Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • b093802
  • /
  • test_script_rec_function.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge
swh:1:cnt:299af3454b984231d5180b8f5bbc5436e1f1a3e0
directory badge
swh:1:dir:b09380205ed372b3c80d9423d9c6f5d70cdd352a

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
(requires biblatex-software package)
Generating citation ...
(requires biblatex-software package)
Generating citation ...
test_script_rec_function.py
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 24 17:11:21 2023

@author: luis.pinos-ullauri
"""

from scipy.special import expit
import read_functions as rdf
import math
import pygad
import random as rand
import numpy as np
import time
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter



    
### Function that returns the overall fitness of the solution across all soft skill dimensions
### It checks whether the fitness amongst the different dimensions is compensated or not
### as well as which scoring function to use
def fitness_func(ga_instance, solution, solution_idx):
    soft_skill_scores=[]
    if compensatory:
        fitness=0
    else:
        fitness=1
    for i in range(10):#soft skill id from 0 to 9
        estimated_outcome=soft_skill_estimation_mean(solution,i)
        if score_function==1:
            score=linear(estimated_outcome,i)
        elif score_function==2:
            score=logistic(estimated_outcome,i)
        elif score_function==3:
            score=quadratic(estimated_outcome,i)
        if ga_instance is None:
            soft_skill_scores.append(score)            
        if compensatory:
            fitness=fitness+score*(1/10)
        else:
            fitness=fitness*score
    if ga_instance is None:
        return fitness,soft_skill_scores
    return fitness

### Linear scoring function
### Fair scoring with no bonuses nor penalisations if the
### soft skill proficiency mean reaches or not the expected profile
### Scoring function rescaled so that it ranges from 0 to 1
def linear(estimated_skill,soft_skill_id):
    #function min value
    f_min=min_skill-desired_outcome[soft_skill_id]
    #function max value
    f_max=max_skill-desired_outcome[soft_skill_id]    
    return (estimated_skill-desired_outcome[soft_skill_id]-f_min)/(f_max-f_min)
   
### Logistic scoring function
### Stricter scoring with penalisations and bonuses if the
### soft skill proficiency mean reaches or not the expcted profile
### Scoring function rescaled so that it ranges from 0 to 1     
def logistic(estimated_skill,soft_skill_id):
    #crossing value with linear function at estimated_goal=goal_skill
    fcrossing=(desired_outcome[soft_skill_id]-min_skill)/(max_skill-min_skill)
    return (1)/(1+((1-fcrossing)/fcrossing)*pow(math.e,3*(desired_outcome[soft_skill_id]-estimated_skill)))

### Quadratic Root scoring function
### Less demanding scoring that allows an easier scoring if the
### soft skill proficiency mean reaches or not the expcted profile
### Scoring function rescaled so that it ranges from 0 to 1  
def quadratic(estimated_skill,soft_skill_id):
    #crossing value with linear function at estimated_goal=goal_skill
    fcrossing=(desired_outcome[soft_skill_id]-min_skill)/(max_skill-min_skill)
    #function min value
    f_min=min_skill-desired_outcome[soft_skill_id]
    #function max value
    f_max=max_skill-desired_outcome[soft_skill_id]
    if estimated_skill<=desired_outcome[soft_skill_id]:            
        return -1*pow(estimated_skill-desired_outcome[soft_skill_id],2)/(-f_min)*fcrossing+fcrossing
    else:
        return 1*pow(estimated_skill-desired_outcome[soft_skill_id],2)/(f_max)*(1-fcrossing)+fcrossing

### Function that estimates the soft skill mean based on the ordinal logistic regression model
### It calculates the probability of each level and estimates the mean SUM x*P(X=x)
def soft_skill_estimation_mean(solution,soft_skill_id):
    linear_combination=0
    for i in range(len(solution)):
        if solution[i]!=0 and solution[i]!=104 and solution[i]!=105:
            linear_combination=linear_combination+courses_effects.iloc[soft_skill_id,0]+courses_effects.iloc[soft_skill_id,solution[i]]
        elif solution[i]==104 or solution[i]==105:
            linear_combination=linear_combination+courses_effects.iloc[soft_skill_id,0]+courses_effects.iloc[soft_skill_id,solution[i]-1]
    linear_combination=linear_combination+theta[soft_skill_id]
    eta12=thresholds.iloc[soft_skill_id,0]-linear_combination
    eta23=thresholds.iloc[soft_skill_id,1]-linear_combination
    eta34=thresholds.iloc[soft_skill_id,2]-linear_combination
    p_1=expit(eta12)
    p_2=expit(eta23)-p_1
    p_3=expit(eta34)-p_1-p_2
    p_4=1-p_3-p_2-p_1
    expected_outcome=1*p_1+2*p_2+3*p_3+4*p_4
    return expected_outcome
   
### Function that performs uniform crossover between two parent combinations
### maintaining the constraint of different followed courses
def unifcrossover_func(parents, offspring_size, ga_instance):
    #ct = time.time()
    offspring = []
    idx = 0
    while len(offspring) != offspring_size[0]:
        parent1 = parents[idx % parents.shape[0], :].copy()
        parent2 = parents[(idx + 1) % parents.shape[0], :].copy()
        child=parent2.copy()#child is copy of parent2 by default
        if np.random.random()<=ga_instance.crossover_probability:
            for i in range(len(parent1)):
                if parent1[i] not in parent2:#gene parent1[i] not in parent[2]
                    if rand.random()<0.5:#copy parent1                        
                        child[i]=parent1[i]         
        offspring.append(child)
        idx += 1
    #print("crossover"+'{0:.3f}'.format(time.time() - ct))
    return np.array(offspring)


generation_number=0

### Functions that performs a single point mutation on one course
### It checks whether the new course is already in the combination and if so
### Generates a new one
def mutation_func(offspring, ga_instance):
    global generation_number
    generation_number+=1
    ct = time.time()
    view = offspring.shape[0]
    for chromosome_idx in range(view):
        if np.random.random()<=ga_instance.mutation_probability:            
            availabe_course_for_sol = np.setdiff1d(possible_courses,offspring[chromosome_idx]) #Remove all course of indiv in the catalog to draw a rn value in
            random_gene=np.random.choice(availabe_course_for_sol)
            random_gene_idx = np.random.choice(range(offspring.shape[1]))
            offspring[chromosome_idx][random_gene_idx]=random_gene 
    print(generation_number," generation mutation"+'{0:.3f}'.format(time.time() - ct))
    return offspring

### Function that register the elapsed time between the first generation and the current one
### It saves the values in a global variable which is used in other functions
def on_fitness(ga_instance, population_fitness):
    end_time = time.time()
    elapsed_time=end_time-start_time
    global generation_time
    generation_time.append(elapsed_time)
    #logging.info('{0:.3f}'.format(elapsed_time))

### Function that calculates the fitness of a population and returns a list of the fitness values
def batch_fitness(population):
    pop_fitness=[]
    for combination in population:
        pop_fitness.append(fitness_func(None, combination,None))
    return pop_fitness

### Function that parses an individual into a coded string
### Returns three different versions depending whether it receives a combination of the individual or not
### VERSION WITHOUT FITNESS: course identifier SPACE .... course identifier SKIPLINE
### VERSION WITHOUT COMBINATION: fitness value SPACE soft skill score SPACE ..... soft skill score SKIPLINE
### VERSION WITH COMBINATION AND FITNESS: course identifier SPACE .... course identifier fitness value SKIPLINE
def parse_individual(fitness,combination):
    string=""
    if fitness is None:
        for course in combination:
            string=string+str(course)+" "
        string=string+'\n'
    if combination is None:
        string=string+'{0:.3f}'.format(fitness[0])+" "
        for soft_skill_score in fitness[1]:
            string=string+'{0:.3f}'.format(soft_skill_score)+" "
        string=string+'\n'
    if combination is not None and fitness is not None:
        for course in combination:
            string=string+str(course)+" "
        string=string+'{0:.3f}'.format(fitness)+'\n'        
    return string

### Function that writes the fitness run file of the initial and last population
### There are 200 individuals (lines), 100 of the first and 100 of the last generation
### Calls the parse individual function without the combination version
def write_fit_run(pop_init_fitness,pop_last_fitness,
                  student_row,domain_id,score_function,compensatory,num_generations,
                  crossover_prob,mutation_prob,seed):
    string_file=""
    for individual_fitness in pop_init_fitness:
        string_file=string_file+parse_individual(individual_fitness,None)
    for individual_fitness in pop_last_fitness:
        string_file=string_file+parse_individual(individual_fitness,None)
    if compensatory:
        file_title="fit_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_comp_"+str(num_generations)+"_"+str(int(crossover_prob*100))+"_"+str(int(mutation_prob*100))+"_"+str(seed)
    else:
        file_title="fit_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_parcomp_"+str(num_generations)+"_"+str(int(crossover_prob*100))+"_"+str(int(mutation_prob*100))+"_"+str(seed)
    file=open(file_title,"w")
    file.write(string_file)
    file.close()
    return string_file

### Function that writes the population run file of the initial and last population
### There are 200 individuals (lines), 100 of the first and 100 of the last generation
### Then, there are 200 lines, where each shows the combination solutions
### Receives as parameter the string file of the fit run file and adds the combinations
### Calls the parse individual function with the combination version 
def write_pop_run(initial_population,last_population,fitness_string_file,
                  student_row,domain_id,score_function,compensatory,num_generations,
                  crossover_prob,mutation_prob,seed):
    initial_population.sort()
    last_population.sort()
    string_file=fitness_string_file
    for individual in initial_population:
        string_file=string_file+parse_individual(None,individual)
    for individual in last_population:
        string_file=string_file+parse_individual(None,individual)
    if compensatory:
        file_title="pop_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_comp_"+str(num_generations)+"_"+str(int(crossover_prob*100))+"_"+str(int(mutation_prob*100))+"_"+str(seed)
    else:
        file_title="pop_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_parcomp_"+str(num_generations)+"_"+str(int(crossover_prob*100))+"_"+str(int(mutation_prob*100))+"_"+str(seed)
    file=open(file_title,"w")
    file.write(string_file)
    file.close()

### Function that writes the best solution run file
### It writes the best solution calling the parse individual function
def write_best_sol_run(best_fitness_solution,best_fitness,
              student_row,domain_id,score_function,compensatory,number_generations,
              crossover_probability,mutation_probability,seed):
    best_fitness_solution.sort()
    string_file=parse_individual(best_fitness, best_fitness_solution)
    if compensatory:
        file_title="bestsol_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_comp_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)
    else:
        file_title="bestsol_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_parcomp_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)
    file=open(file_title,"w")
    file.write(string_file)
    file.close()
    
### Function that writes the best all run file
### It writes the best solutions per generation along the elapsed time taken
def write_best_all_run(best_fitness_by_gen,
              student_row,domain_id,score_function,compensatory,number_generations,
              crossover_probability,mutation_probability,seed):
    global generation_time
    for j in range(len(best_fitness_by_gen)):
        if j==0:
            string_file="0.000 "+'{0:.3f}'.format(best_fitness_by_gen[j])+'\n'
        else:
            string_file=string_file+'{0:.3f}'.format(generation_time[j-1])+" "+'{0:.3f}'.format(best_fitness_by_gen[j])+'\n'
    if compensatory:
        file_title="bestall_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_comp_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)
    else:
        file_title="bestall_"+str(student_row)+"_"+str(domain_id)+"_"+str(score_function)+"_parcomp_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)
    file=open(file_title,"w")
    file.write(string_file)
    file.close()
    
### Function that writes the files based on the run results    
def write_files(solution,solution_fitness,best_fitness_by_gen,initial_population,last_population,pop_init_fitness,pop_last_fitness,
                  student_row,domain_id,score_function,compensatory,num_generations,
                  crossover_prob,mutation_prob,seed):
    fitness_string_file=write_fit_run(pop_init_fitness,pop_last_fitness,
                  student_row,domain_id,score_function,compensatory,number_generations,
                  crossover_probability,mutation_probability,seed)
    
    write_pop_run(initial_population,last_population,fitness_string_file,
                  student_row,domain_id,score_function,compensatory,number_generations,
                  crossover_probability,mutation_probability,seed)
    write_best_sol_run(solution,solution_fitness,
                  student_row,domain_id,score_function,compensatory,number_generations,
                  crossover_probability,mutation_probability,seed)
    write_best_all_run(best_fitness_by_gen,
                  student_row,domain_id,score_function,compensatory,number_generations,
                  crossover_probability,mutation_probability,seed)
    
   


parser = ArgumentParser(description="Genetic Algorithm Script",
                                 formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("-s", "--student_row", default=0,type=int, help="Student id row")
parser.add_argument("-d", "--domain_id", default=4, type=int, help="Domain id EE=1,IS=2,MX=3,NU=4")
parser.add_argument("-f", "--score_function", default=1, type=int, help="Score function Linear=1,Logistic=2,Quadratic=3")
parser.add_argument("-c", "--compensatory", default=False, type=lambda x: (str(x).lower() == 'true' or str(x).lower() == 't' or str(x)=='1'), help="Compensatory=True, Partially Compensatory=False")
parser.add_argument("-g", "--number_generations", default=10, type=int, help="Number of generations")
parser.add_argument("-x", "--crossover_probability", default=0.65, type=float, help="Probability of crossover")
parser.add_argument("-m", "--mutation_probability", default=0.15, type=float, help="Probability of mutation")
parser.add_argument("-n", "--sol_per_pop", default=100, type=int, help="Population size")
parser.add_argument("-e", "--keep_elitism", default=1, type=int, help="Number of combinations kept after each generation")
parser.add_argument("-sd", "--seed", default=1, type=int, help="Seed")

args = vars(parser.parse_args())

student_row=args["student_row"]
domain_id=args["domain_id"]
score_function=args["score_function"]
compensatory=args["compensatory"]
number_generations=args["number_generations"]
crossover_probability=args["crossover_probability"]
mutation_probability=args["mutation_probability"]
sol_per_pop=args["sol_per_pop"]
keep_elitism=args["keep_elitism"]
seed=args["seed"]

#Read real data set
real_data=rdf.read_real_data()
#Considering only stage 2
real_data_stage2=real_data.loc[real_data["stage"]==2]
real_data_stage2=real_data_stage2.loc[real_data_stage2["N_courses_followed"]>5]
real_data_stage2=real_data_stage2.loc[real_data_stage2["N_courses_followed"]<12]
real_data_stage2=real_data_stage2.loc[real_data_stage2["domain_id"]==domain_id]
real_data_stage2=real_data_stage2.reset_index(drop=True)
student_id=real_data_stage2.iloc[student_row,0]
N_courses_followed=real_data_stage2.iloc[student_row,25]
thresholds=rdf.get_thresholds()
courses_effects=rdf.get_courses_effects()
min_skill=1
max_skill=4        
theta=rdf.get_student_random_effect(student_id)
desired_outcome=rdf.get_desired_standard(domain_id)
possible_courses=rdf.get_courses_domain(domain_id)        
num_parents_mating=int(sol_per_pop/2)
keep_parents=keep_elitism
parent_selection_type="tournament"
start_time = time.time()
generation_time=[]



ga_instance = pygad.GA(num_generations=number_generations,
                   num_parents_mating=num_parents_mating,
                   fitness_func=fitness_func,
                   crossover_type=unifcrossover_func,
                   mutation_type=mutation_func,
                   crossover_probability=crossover_probability,
                   mutation_probability=mutation_probability,
                   sol_per_pop=sol_per_pop,
                   num_genes=int(N_courses_followed),
                   allow_duplicate_genes=False,
                   parent_selection_type=parent_selection_type,
                   gene_type=int,gene_space=possible_courses,
                   keep_elitism=keep_elitism,
                   on_fitness=on_fitness,
                   random_seed=seed,
                   suppress_warnings=True
                   )
ga_instance.run()
#ga_instance.plot_fitness()
solution, solution_fitness, solution_idx = ga_instance.best_solution()
solution.sort()
print("Parameters of the best solution :",solution)
print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
print("Crossover probability:",ga_instance.crossover_probability)
print("Mutation probability:",ga_instance.mutation_probability)
print(f"Best fitness value reached after {ga_instance.best_solution_generation} generations.")

initial_population=ga_instance.initial_population
last_population=ga_instance.last_generation_offspring_mutation
last_population=np.concatenate((last_population,ga_instance.last_generation_elitism),axis=0)
pop_init_fitness=batch_fitness(initial_population)
pop_last_fitness=batch_fitness(last_population)
best_fitness_by_gen=ga_instance.best_solutions_fitness
write_files(solution,solution_fitness,best_fitness_by_gen,initial_population,last_population,pop_init_fitness,pop_last_fitness,
                  student_row,domain_id,score_function,compensatory,number_generations,
                  crossover_probability,mutation_probability,seed)


back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API