https://github.com/F-FIDELO-19-008-FLEURY/course-recommender
Tip revision: 40f04981dce459338256cd143adb2beea5fcb258 authored by luispinos on 08 July 2024, 10:00:53 UTC
Update codemeta.json
Update codemeta.json
Tip revision: 40f0498
script_rec_function.py
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 24 17:11:21 2023
@author: luis.pinos-ullauri
"""
from scipy.special import expit
import read_functions as rdf
import math
import os
import pygad
import random as rand
import numpy as np
import time
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
############### User-defined functions for the Genetic Algorithm ##############
########## Estimation of the fitness and score functions
### Function that returns the overall batch fitness of the list solutions across all soft skill dimensions
### It checks whether the fitness amongst the different dimensions is compensated or not
### as well as which scoring function to use
def fitness_func_batch(ga_instance, solutions, solutions_indices):
batch_fitness = []
soft_skill_scores=[]
for i in range(len(solutions)):
solution=solutions[i]
soft_skill_scores_solution=[]
if compensatory:
fitness=0
else:
fitness=1
for i in range(10):#soft skill id from 0 to 9
estimated_outcome=soft_skill_estimation_mean(solution,i)
if score_function==1:
score=linear(estimated_outcome,i)
elif score_function==2:
score=logistic(estimated_outcome,i)
elif score_function==3:
score=quadratic(estimated_outcome,i)
if ga_instance is None:
soft_skill_scores_solution.append(score)
if compensatory:
fitness=fitness+score*(1/10)
else:
fitness=fitness*score
batch_fitness.append(fitness)
if ga_instance is None:
soft_skill_scores.append(soft_skill_scores_solution)
if ga_instance is None:
return batch_fitness,soft_skill_scores
else:
return batch_fitness
### Function that returns the overall fitness of the solution across all soft skill dimensions
### It checks whether the fitness amongst the different dimensions is compensated or not
### as well as which scoring function to use
def fitness_func(ga_instance, solution, solution_idx):
soft_skill_scores=[]
if compensatory:
fitness=0
else:
fitness=1
for i in range(10):#soft skill id from 0 to 9
estimated_outcome=soft_skill_estimation_mean(solution,i)
if score_function==1:
score=linear(estimated_outcome,i)
elif score_function==2:
score=logistic(estimated_outcome,i)
elif score_function==3:
score=quadratic(estimated_outcome,i)
if ga_instance is None:
soft_skill_scores.append(score)
if compensatory:
fitness=fitness+score*(1/10)
else:
fitness=fitness*score
if ga_instance is None:
return fitness,soft_skill_scores
return fitness
### Linear scoring function
### Fair scoring with no bonuses nor penalisations if the
### soft skill proficiency mean reaches or not the expected profile
### Scoring function rescaled so that it ranges from 0 to 1
def linear(estimated_skill,soft_skill_id):
#function min value
f_min=min_skill-desired_outcome[soft_skill_id]
#function max value
f_max=max_skill-desired_outcome[soft_skill_id]
return (estimated_skill-desired_outcome[soft_skill_id]-f_min)/(f_max-f_min)
### Logistic scoring function
### Stricter scoring with penalisations and bonuses if the
### soft skill proficiency mean reaches or not the expcted profile
### Scoring function rescaled so that it ranges from 0 to 1
def logistic(estimated_skill,soft_skill_id):
#crossing value with linear function at estimated_goal=goal_skill
fcrossing=(desired_outcome[soft_skill_id]-min_skill)/(max_skill-min_skill)
return (1)/(1+((1-fcrossing)/fcrossing)*pow(math.e,3*(desired_outcome[soft_skill_id]-estimated_skill)))
### Quadratic Root scoring function
### Less demanding scoring that allows an easier scoring if the
### soft skill proficiency mean reaches or not the expcted profile
### Scoring function rescaled so that it ranges from 0 to 1
def quadratic(estimated_skill,soft_skill_id):
#crossing value with linear function at estimated_goal=goal_skill
fcrossing=(desired_outcome[soft_skill_id]-min_skill)/(max_skill-min_skill)
#function min value
f_min=-1*pow(min_skill-desired_outcome[soft_skill_id],2)
#function max value
f_max=1*pow(max_skill-desired_outcome[soft_skill_id],2)
if estimated_skill<=desired_outcome[soft_skill_id]:
return -1*pow(estimated_skill-desired_outcome[soft_skill_id],2)/(-f_min)*fcrossing+fcrossing
else:
return 1*pow(estimated_skill-desired_outcome[soft_skill_id],2)/(f_max)*(1-fcrossing)+fcrossing
### Function that estimates the soft skill mean based on the ordinal logistic regression model
### It calculates the probability of each level and estimates the mean SUM x*P(X=x)
def soft_skill_estimation_mean(solution,soft_skill_id):
linear_combination=0
for i in range(len(solution)):
if solution[i]!=0:
linear_combination=linear_combination+courses_effects.iloc[soft_skill_id,0]+courses_effects.iloc[soft_skill_id,solution[i]]
linear_combination=linear_combination+theta[soft_skill_id]
eta12=thresholds.iloc[soft_skill_id,0]-linear_combination
eta23=thresholds.iloc[soft_skill_id,1]-linear_combination
eta34=thresholds.iloc[soft_skill_id,2]-linear_combination
p_1=expit(eta12)
p_2=expit(eta23)-p_1
p_3=expit(eta34)-p_1-p_2
p_4=1-p_3-p_2-p_1
expected_outcome=1*p_1+2*p_2+3*p_3+4*p_4
return expected_outcome
########## Genetic Algorithm user-defined functions
### Function that performs uniform crossover between two parent combinations
### maintaining the constraint of different followed courses
def unifcrossover_func(parents, offspring_size, ga_instance):
offspring = []
idx = 0
while len(offspring) != offspring_size[0]:
parent1 = parents[idx % parents.shape[0], :].copy()
parent2 = parents[(idx + 1) % parents.shape[0], :].copy()
child=parent2.copy()#child is copy of parent2 by default
if np.random.random()<=ga_instance.crossover_probability:
for i in range(len(parent1)):
if parent1[i] not in parent2:#gene parent1[i] not in parent[2]
if rand.random()<0.5:#copy parent1
child[i]=parent1[i]
offspring.append(child)
idx += 1
return np.array(offspring)
### Function that performs a single point mutation on one course
### It checks whether the new course is already in the combination and if so
### Generates a new one
def mutation_func(offspring, ga_instance):
view = offspring.shape[0]
if np.random.random()<=ga_instance.mutation_probability:
for chromosome_idx in range(view):
available_course_for_sol = np.setdiff1d(possible_courses,offspring[chromosome_idx]) #Remove all course of indiv in the catalog to draw a rn value in
if len(available_course_for_sol)>=1:
random_gene=np.random.choice(available_course_for_sol)
random_gene_idx = np.random.choice(range(offspring.shape[1]))
offspring[chromosome_idx,random_gene_idx]=random_gene
return offspring
### Function that selects parents based on a tournament
### Random parents are selected, which then fight against one another and the ones with the best fitness are selected
def parent_selection_func(fitness, num_parents, ga_instance):
parents = np.empty((num_parents, ga_instance.population.shape[1]))
parents_indices = []
for parent_num in range(num_parents):
# Generate random indices for the candiadate solutions.
rand_indices = np.random.randint(low=0.0, high=len(fitness), size=ga_instance.K_tournament-1)
K_fitnesses = fitness[rand_indices]
selected_parent_idx = rand_indices[np.where(K_fitnesses == np.max(K_fitnesses))[0][0]]
# Append the index of the selected parent.
parents_indices.append(selected_parent_idx)
# Insert the selected parent.
parents[parent_num, :] = ga_instance.population[selected_parent_idx, :].copy()
return parents,np.array(parents_indices)
### Function that register the elapsed time between the first generation and the current one
### It saves the values in a global variable which is used in other functions
def on_fitness(ga_instance, population_fitness):
end_time = time.time()
elapsed_time=end_time-start_time
global generation_time
generation_time.append(elapsed_time)
################# Writing functions for the solutions #########################
### Function that parses an individual into a coded string
### Returns a string based on three different versions depending on the fitness and combination
def parse_individual(fitness,combination):
string=""
### VERSION WITHOUT FITNESS: course identifier SPACE .... course identifier SKIPLINE
if fitness is None and combination is not None:
for course in combination:
string=string+str(course)+" "
string=string+'\n'
### VERSION WITHOUT COMBINATION: fitness value SPACE soft skill score SPACE ..... soft skill score SKIPLINE
if combination is None and type(fitness) is list:
#overall fitness
string=string+'{0:.3f}'.format(fitness[0])
#soft skills scores
for soft_skill_score in fitness[1]:
string=string+" "+'{0:.3f}'.format(soft_skill_score)
string=string+'\n'
### VERSION WITH COMBINATION AND FITNESS: course identifier SPACE .... course identifier fitness value SKIPLINE
if combination is not None and fitness is not None:
for course in combination:
string=string+str(course)+" "
string=string+'{0:.3f}'.format(fitness[0])
if type(fitness) is tuple:
#soft skills scores
for soft_skill_score in fitness[1]:
string=string+" "+'{0:.3f}'.format(soft_skill_score)
string=string+'\n'
return string
### Function that writes the fitness run file of the initial and last population
### There are 200 individuals (lines), 100 of the first and 100 of the last generation
### Calls the parse individual function without the combination version
def write_fit_run(pop_init_fitness,pop_last_fitness,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed):
string_file=""
for i in range(2):
if i==0:
population_fitness=pop_init_fitness
else:
population_fitness=pop_last_fitness
for i in range(len(population_fitness[0])):
#List containing the overall fitness and the soft skill scores
individual_fitness=[]
individual_fitness.append(population_fitness[0][i])
individual_fitness.append(population_fitness[1][i])
string_file=string_file+parse_individual(individual_fitness,None)
#file name
file_name=path+"fit_"+str(student_id)+"_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)+".garcs"
file=open(file_name,"w")
file.write(string_file)
file.close()
return string_file
### Function that writes the population run file of the initial and last population
### There are 200 individuals (lines), 100 of the first and 100 of the last generation
### Then, there are 200 lines, where each shows the combination solutions
### Receives as parameter the string file of the fit run file and adds the combinations
### Calls the parse individual function with the combination version
def write_pop_run(initial_population,last_population,fitness_string_file,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed):
#sort the solutions of the initial and last population
initial_population.sort()
last_population.sort()
#fitness string file of both populations
string_file=fitness_string_file
for i in range(2):
if i==0:
population=initial_population
else:
population=last_population
for combination in population:
string_file=string_file+parse_individual(None,combination)
#file name
file_name=path+"pop_"+str(student_id)+"_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)+".garcs"
file=open(file_name,"w")
file.write(string_file)
file.close()
### Function that writes the best solution run file
### It writes the best solution calling the parse individual function
def write_best_sol_run(best_solution,best_fitness,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed):
#sort the best solution
best_solution.sort()
#parsing the solution
string_file=parse_individual(best_fitness, best_solution)
#file name
file_name=path+"bestsol_"+str(student_id)+"_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)+".garcs"
file=open(file_name,"w")
file.write(string_file)
file.close()
### Function that writes the best all run file
### It writes the best solutions per generation along the elapsed time taken
### The number of lines is equal to n+1 generations
def write_best_all_run(best_fitness_by_gen,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed):
global generation_time
for j in range(len(best_fitness_by_gen)):
if j==0:
string_file="0.000 "+'{0:.3f}'.format(best_fitness_by_gen[j])+'\n'
else:
string_file=string_file+'{0:.3f}'.format(generation_time[j-1])+" "+'{0:.3f}'.format(best_fitness_by_gen[j])+'\n'
#file title
file_name=path+"bestall_"+str(student_id)+"_"+str(number_generations)+"_"+str(int(crossover_probability*100))+"_"+str(int(mutation_probability*100))+"_"+str(seed)+".garcs"
file=open(file_name,"w")
file.write(string_file)
file.close()
### Function that calls the other writing functions based on the run results
def write_files(solution,solution_fitness,best_fitness_by_gen,initial_population,last_population,pop_init_fitness,pop_last_fitness,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed):
fitness_string_file=write_fit_run(pop_init_fitness,pop_last_fitness,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed)
write_pop_run(initial_population,last_population,fitness_string_file,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed)
write_best_sol_run(solution,solution_fitness,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed)
write_best_all_run(best_fitness_by_gen,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed)
###################### Parameter input for the script #########################
### Formatting of the parser
parser = ArgumentParser(description="Genetic Algorithm Script",formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument("-s", "--student_row", default=0,type=int, help="Student id row")
parser.add_argument("-d", "--domain_id", default=1, type=int, help="Domain id EE=1,IS=2,MX=3,NU=4")
parser.add_argument("-f", "--score_function", default=1, type=int, help="Score function Linear=1,Logistic=2,Quadratic=3")
parser.add_argument("-c", "--compensatory", default=False, type=lambda x: (str(x).lower() == 'true' or str(x).lower() == 't' or str(x)=='1'), help="Compensatory=True, Partially Compensatory=False")
parser.add_argument("-g", "--number_generations", default=100, type=int, help="Number of generations")
parser.add_argument("-x", "--crossover_probability", default=0.65, type=float, help="Probability of crossover")
parser.add_argument("-m", "--mutation_probability", default=0.15, type=float, help="Probability of mutation")
parser.add_argument("-n", "--sol_per_pop", default=100, type=int, help="Population size")
parser.add_argument("-e", "--keep_elitism", default=1, type=int, help="Number of combinations kept after each generation")
parser.add_argument("-i", "--seed", default=1, type=int, help="Seed")
### Parsing the arguments
args = vars(parser.parse_args())
### Parameter reading
student_row=args["student_row"]
domain_id=args["domain_id"]
score_function=args["score_function"]
compensatory=args["compensatory"]
number_generations=args["number_generations"]
crossover_probability=args["crossover_probability"]
mutation_probability=args["mutation_probability"]
sol_per_pop=args["sol_per_pop"]
keep_elitism=args["keep_elitism"]
seed=args["seed"]
### Setting the Genetic Algorithm parameters
#Read real data set
real_data=rdf.read_real_data()
#Considering only stage 2
real_data_stage2=real_data.loc[real_data["stage"]==2]
#Considering only students with more than 5 courses
real_data_stage2=real_data_stage2.loc[real_data_stage2["N_courses_followed"]>5]
#Shrinking the dataset for domain of interest
real_data_stage2=real_data_stage2.loc[real_data_stage2["domain_id"]==domain_id]
real_data_stage2=real_data_stage2.reset_index(drop=True)
#student id
student_id=real_data_stage2.iloc[student_row,0]
#Number of followed courses
N_courses_followed=real_data_stage2.iloc[student_row,25]
#Thresholds
thresholds=rdf.get_thresholds()
#Course effects
courses_effects=rdf.get_courses_effects()
#Minimum soft skill score
min_skill=1
#Maximum soft skill score
max_skill=4
#Student random effects
theta=rdf.get_student_random_effect(student_id)
#Expected profile of domain of interest
desired_outcome=rdf.get_desired_outcome(domain_id)
#List of possible courses for the gene space
possible_courses=rdf.get_courses_domain(domain_id)
#Number of parent combinations mating
num_parents_mating=int(sol_per_pop/2)
#Number of parents to keep for the next generation
keep_parents=keep_elitism
#Parent selection function
parent_selection_type=parent_selection_func
#Time variables
start_time = time.time()
generation_time=[]
############################ Genetic Algorithm Launch #########################
ga_instance = pygad.GA(num_generations=number_generations,
num_parents_mating=num_parents_mating,
fitness_func=fitness_func,
crossover_type=unifcrossover_func,
mutation_type=mutation_func,
crossover_probability=crossover_probability,
mutation_probability=mutation_probability,
#fitness_batch_size=10,
sol_per_pop=sol_per_pop,
num_genes=int(N_courses_followed),
allow_duplicate_genes=False,
parent_selection_type=parent_selection_type,
gene_type=int,gene_space=possible_courses,
keep_elitism=keep_elitism,
on_fitness=on_fitness,
random_seed=seed,
suppress_warnings=True
)
ga_instance.run()
solution, solution_fitness, solution_idx = ga_instance.best_solution()
solution.sort()
print("Parameters of the best solution :",solution)
print("Fitness value of the best solution = {solution_fitness}".format(solution_fitness=solution_fitness))
print("Crossover probability:",ga_instance.crossover_probability)
print("Mutation probability:",ga_instance.mutation_probability)
print(f"Best fitness value reached after {ga_instance.best_solution_generation} generations.")
#initial population
initial_population=ga_instance.initial_population.copy()
#last population
last_population=ga_instance.population.copy()
#fitness of initial population
pop_init_fitness=fitness_func_batch(None,initial_population,None)
#fitness of last population
pop_last_fitness=fitness_func_batch(None,last_population,None)
#best fitness by generation
best_fitness_by_gen=ga_instance.best_solutions_fitness
#Fitness with scores of the solution
solution_fitness=fitness_func(None,solution,None)
#directory setting
file_directory=rdf.toStringfilestructure(domain_id,compensatory,score_function,student_id,folder="/results/")
path="/home/luis.pinos-ullauri"+file_directory
mode = 0o777
if not os.path.exists(path):
os.makedirs(path,mode)
#writing results
write_files(solution,solution_fitness,best_fitness_by_gen,initial_population,last_population,pop_init_fitness,pop_last_fitness,
student_id,domain_id,score_function,compensatory,number_generations,
crossover_probability,mutation_probability,seed)
