MH-P2/src/genetic_algorithm.py

229 lines
8.3 KiB
Python

from numpy import sum, append, arange, delete, where
from numpy.random import randint, choice, shuffle
from pandas import DataFrame
from math import ceil
from functools import partial
from multiprocessing import Pool
from preprocessing import parse_file
def get_row_distance(source, destination, data):
    """Look up the distance recorded between two points.

    Args:
        source: Identifier of one endpoint.
        destination: Identifier of the other endpoint.
        data: DataFrame with ``source``, ``destination`` and ``distance``
            columns.

    Returns:
        The ``distance`` value of the first row that links both points.

    Raises:
        IndexError: If no row links the two points.
    """
    # A pair may be stored in either orientation, so both are accepted.
    pair_filter = """(source == @source and destination == @destination) or \
(source == @destination and destination == @source)"""
    matching_rows = data.query(pair_filter)
    recorded_distances = matching_rows["distance"].values
    return recorded_distances[0]
def compute_distance(element, individual, data):
    """Add up the distances from one point to the other points of an individual.

    Args:
        element: Point whose accumulated distance is wanted.
        individual: DataFrame with a ``point`` column.
        data: Distance table handed over to ``get_row_distance``.

    Returns:
        The sum of the distances between ``element`` and every point of
        ``individual`` that differs from it (``0`` when there is none).
    """
    other_points = individual.query(f"point != {element}")["point"]
    total = 0
    for other in other_points:
        total += get_row_distance(source=element, destination=other, data=data)
    return total
def generate_individual(n, m, data):
    """Build a random individual and the per-point distance totals.

    Args:
        n: Upper bound (exclusive) of the point identifiers to draw from.
        m: Number of distinct points the individual holds.
        data: Distance table handed over to ``compute_distance``.

    Returns:
        A DataFrame with ``point``, ``distance`` and ``fitness`` columns.
        ``fitness`` is left unset here.
    """
    candidate = DataFrame(columns=["point", "distance", "fitness"])
    # Sampling without replacement keeps the points of an individual distinct.
    candidate["point"] = choice(n, size=m, replace=False)
    distance_to_rest = partial(compute_distance, individual=candidate, data=data)
    candidate["distance"] = candidate["point"].apply(distance_to_rest)
    return candidate
def evaluate_individual(individual, data):
    """Compute the fitness of an individual and store it in place.

    For every point except the last one, the largest distance among the
    still-available pairs touching that point is recorded, and the pairs
    touching that point are then discarded. The fitness is the sum of the
    recorded values.

    Args:
        individual: DataFrame with a ``point`` column; its ``fitness`` column
            is overwritten.
        data: DataFrame with ``source``, ``destination`` and ``distance``
            columns.

    Returns:
        The same ``individual`` object, with ``fitness`` filled in.
    """
    genotype = individual.point.values
    # Only pairs whose two endpoints belong to the individual are relevant.
    remaining = data.query("source in @genotype and destination in @genotype")
    contributions = []
    for gene in genotype[:-1]:
        touching = remaining.query(f"source == {gene} or destination == {gene}")
        contributions.append(touching["distance"].astype(float).max())
        remaining = remaining.query(f"source != {gene} and destination != {gene}")
    individual["fitness"] = sum(contributions)
    return individual
def select_distinct_genes(matching_genes, parents, m):
    """Split the non-shared genes of two parents at a random cut point.

    Args:
        matching_genes: Values that are excluded from both parents by
            comparing them against the ``point`` column.
            NOTE(review): ``get_matching_genes`` returns positions, not point
            values; confirm that callers pass point values here.
        parents: Sequence whose first two items are DataFrames with a
            ``point`` column.
        m: Unused; kept so the signature stays unchanged.

    Returns:
        A tuple ``(first_parent_genes, second_parent_genes)``: the points of
        the first parent from the cut point onwards and the points of the
        second parent before the cut point.
    """
    first_points = parents[0].query("point not in @matching_genes").point.values
    second_points = parents[1].query("point not in @matching_genes").point.values
    # randint(0) raises ValueError, so an empty remainder uses a cut at 0 and
    # both returned arrays come back empty.
    cutoff = randint(len(first_points)) if len(first_points) else 0
    return first_points[cutoff:], second_points[:cutoff]
def select_random_genes(matching_genes, parents, m):
    """Return the non-matching genes of a random parent in random order.

    Args:
        matching_genes: Positions (0-based) to leave out.
        parents: Sequence of DataFrames with a ``point`` column.
        m: Number of positions of an individual.

    Returns:
        A shuffled NumPy array with the points found at every position of
        ``range(m)`` that is not listed in ``matching_genes``.
    """
    random_parent = parents[randint(len(parents))]
    distinct_indexes = delete(arange(m), matching_genes)
    # Shuffle an explicit copy: the array exposed by ``.values`` can be
    # read-only (pandas Copy-on-Write), which makes an in-place shuffle fail.
    genes = random_parent.point.iloc[distinct_indexes].to_numpy(copy=True)
    shuffle(genes)
    return genes
def repair_offspring(offspring, parents, m):
    """Trim or complete an offspring so it holds exactly ``m`` distinct points.

    Surplus rows are removed one at a time, always taking the row with the
    largest ``distance``. Missing rows are filled with the unused point of a
    randomly chosen parent that has the largest ``distance``; the remaining
    parents are tried when the chosen one has nothing left to offer.

    Neither ``offspring`` nor the parents are modified.

    Args:
        offspring: DataFrame with ``point``, ``distance`` and ``fitness``
            columns.
        parents: Sequence of DataFrames with ``point`` and ``distance``
            columns.
        m: Required number of rows.

    Returns:
        A new DataFrame with ``m`` rows and a fresh 0..m-1 index. Rows added
        here carry ``distance`` and ``fitness`` set to 0.

    Raises:
        ValueError: If the parents do not hold enough unused points.
    """
    # DataFrame.append was removed in pandas 2.0; concat replaces it. The
    # import is local because the module itself only imports DataFrame.
    from pandas import concat

    # Repeated points are collapsed and the index rebuilt: with duplicate
    # labels a label-based drop would remove several rows at once.
    repaired = offspring.drop_duplicates(subset="point").reset_index(drop=True)

    while len(repaired) > m:
        surplus_label = repaired["distance"].idxmax()
        repaired = repaired.drop(index=surplus_label).reset_index(drop=True)

    while len(repaired) < m:
        first_choice = randint(len(parents))
        for offset in range(len(parents)):
            donor = parents[(first_choice + offset) % len(parents)]
            # Filter instead of dropping rows so the parent stays intact.
            unused = donor[~donor["point"].isin(repaired["point"])]
            if not unused.empty:
                break
        else:
            raise ValueError(
                "parents do not hold enough unused points to repair the offspring"
            )
        unused = unused.reset_index(drop=True)
        best_point = unused.loc[unused["distance"].idxmax(), "point"]
        new_row = DataFrame([{"point": best_point, "distance": 0, "fitness": 0}])
        repaired = concat([repaired, new_row], ignore_index=True)

    return repaired
def get_matching_genes(parents):
    """Find the positions at which the first two parents hold the same point.

    Args:
        parents: Sequence whose first two items are DataFrames with a
            ``point`` column.

    Returns:
        A NumPy array with the 0-based positions where both ``point`` columns
        are equal.
    """
    same_gene = parents[0].point.values == parents[1].point.values
    positions = where(same_gene)[0]
    return positions
def populate_offspring(values):
    """Build an offspring DataFrame from several groups of points.

    Args:
        values: Iterable of point collections (lists or arrays). The groups
            are laid out one after another, in the order given.

    Returns:
        A DataFrame with ``point``, ``distance`` and ``fitness`` columns, one
        row per point, a 0..k-1 index, and ``distance``/``fitness`` set to 0.
    """
    # Every supplied gene is kept: the previous ``offspring[1:]`` slice threw
    # the first one away, leaving each child one gene short.
    points = [point for group in values for point in group]
    # Built in a single step: DataFrame.append no longer exists in pandas 2.x
    # and stacking the groups with it repeated index labels.
    offspring = DataFrame({"point": points})
    offspring["distance"] = 0
    offspring["fitness"] = 0
    return offspring
def uniform_crossover(parents, m):
    """Cross two parents into a single repaired offspring.

    The points both parents hold at the same position are kept, the remaining
    points are taken from both parents around a random cut, and the result is
    repaired so that it ends up with ``m`` rows.

    Args:
        parents: Sequence whose first two items are DataFrames with a
            ``point`` column.
        m: Number of points of an individual.

    Returns:
        The DataFrame returned by ``repair_offspring``.
    """
    matching_positions = get_matching_genes(parents)
    # get_matching_genes yields positions; the helpers below compare against
    # and store point values, so the positions are translated first.
    shared_points = parents[0].point.values[matching_positions]
    first_genes, second_genes = select_distinct_genes(shared_points, parents, m)
    offspring = populate_offspring(values=[shared_points, first_genes, second_genes])
    return repair_offspring(offspring, parents, m)
def position_crossover(parents, m):
    """Cross two parents into two offspring by shuffling the non-shared genes.

    Each child starts with the points both parents hold at the same position
    and is completed with the remaining points of a randomly chosen parent,
    in an independently drawn random order.

    Args:
        parents: Sequence whose first two items are DataFrames with a
            ``point`` column.
        m: Number of points of an individual.

    Returns:
        A list with two offspring DataFrames.
    """
    matching_positions = get_matching_genes(parents)
    # get_matching_genes yields positions, while populate_offspring stores
    # point values, so the shared points are read from the first parent.
    shared_points = parents[0].point.values[matching_positions]
    offspring = []
    for _ in range(2):
        # A fresh shuffle per child; reusing a single shuffle made both
        # children identical.
        shuffled_genes = select_random_genes(matching_positions, parents, m)
        offspring.append(populate_offspring(values=[shared_points, shuffled_genes]))
    return offspring
def crossover(mode, parents, m):
    """Apply the selected crossover operator to consecutive pairs of parents.

    Args:
        mode: ``"uniform"`` selects ``uniform_crossover``; any other value
            selects ``position_crossover``.
        parents: List of parents, paired up as (0, 1), (2, 3), ...
        m: Number of points of an individual, forwarded to the operator.

    Returns:
        A list with one operator result per pair.
    """
    operator = uniform_crossover if mode == "uniform" else position_crossover
    mate = partial(operator, m=m)
    couples = [parents[start : start + 2] for start in range(0, len(parents), 2)]
    return [mate(couple) for couple in couples]
def element_in_dataframe(individual, element):
    """Tell whether a point already belongs to an individual.

    Args:
        individual: DataFrame with a ``point`` column.
        element: Point value to look for.

    Returns:
        ``True`` when at least one row has ``point`` equal to ``element``.
    """
    hits = individual.query(f"point == {element}")
    return len(hits) > 0
def select_new_gene(individual, n):
    """Pick a random point from ``range(n)`` that the individual does not hold.

    Args:
        individual: DataFrame with a ``point`` column.
        n: Upper bound (exclusive) of the candidate points.

    Returns:
        An integer in ``range(n)`` that is absent from ``individual``.

    Raises:
        ValueError: If every candidate is already part of the individual.
    """
    used = set(individual["point"].tolist())
    available = [gene for gene in range(n) if gene not in used]
    if not available:
        # Retrying random draws until an unused one turns up would never
        # finish in this situation, so it is reported instead.
        raise ValueError("the individual already holds every available point")
    return available[randint(len(available))]
def mutate(population, n, probability=0.001):
    """Replace randomly chosen genes of randomly chosen individuals, in place.

    Args:
        population: List of DataFrames with ``point`` and ``distance``
            columns.
        n: Upper bound (exclusive) of the candidate points; it also scales
            the number of mutations.
        probability: Mutation rate used to derive the number of mutations.

    Returns:
        The same ``population`` list. The affected individuals are modified
        in place and the ``distance`` of every mutated gene is reset to 0.
    """
    expected_mutations = len(population) * n * probability
    for _ in range(ceil(expected_mutations)):
        # Bounded by the population size: drawing the individual with
        # randint(n) goes out of range when the population holds fewer than
        # n individuals.
        individual = population[randint(len(population))]
        if individual.empty:
            continue
        # A 0-based row position is drawn directly; an index label taken from
        # ``sample()`` is not a valid position in general.
        position = randint(len(individual))
        new_gene = select_new_gene(individual, n)
        # One positional assignment per cell: chained ``df[col].iloc[i] = x``
        # may write to a temporary copy and leave the individual untouched.
        individual.iloc[position, individual.columns.get_loc("point")] = new_gene
        individual.iloc[position, individual.columns.get_loc("distance")] = 0
    return population
def tournament_selection(m, population):
    """Draw two random individuals and return the fitter one.

    Args:
        m: Unused; kept so the signature stays unchanged. Contestants are
            drawn from the whole population, because bounding the draw by
            ``m`` fails when ``m`` exceeds the population size.
        population: Non-empty list of DataFrames whose ``fitness`` column
            holds the individual's fitness in every row.

    Returns:
        The contestant with the larger fitness (the first one drawn on ties).
    """
    contestants = [population[randint(len(population))] for _ in range(2)]
    # The key is the fitness value itself: ``all(x.fitness)`` only tells
    # whether the fitness is non-zero.
    return max(contestants, key=lambda individual: individual["fitness"].iloc[0])
def generational_replacement(previous_population, current_population):
    """Return the current population, keeping the best previous individual.

    When the fittest individual of ``previous_population`` has no equal in
    ``current_population``, it takes the place of the least fit individual
    there.

    Args:
        previous_population: List of DataFrames with a ``fitness`` column.
        current_population: List of DataFrames with a ``fitness`` column; it
            is modified in place.

    Returns:
        ``current_population`` (the same list object).
    """
    new_population = current_population
    if not previous_population or not new_population:
        return new_population

    def fitness_of(individual):
        # The fitness value itself; ``all(x.fitness)`` is only a boolean.
        return individual["fitness"].iloc[0]

    best_previous_individual = max(previous_population, key=fitness_of)
    # DataFrames cannot be compared with ``in`` or ``list.index`` (their ``==``
    # is element-wise), so membership is checked with ``DataFrame.equals``.
    survived = any(
        best_previous_individual.equals(individual) for individual in new_population
    )
    if not survived:
        worst_index = min(
            range(len(new_population)),
            key=lambda index: fitness_of(new_population[index]),
        )
        new_population[worst_index] = best_previous_individual
    return new_population
def get_best_elements(population):
    """Return the positions of the two fittest individuals.

    Args:
        population: List of DataFrames whose ``fitness`` column holds the
            individual's fitness in every row. The list is left untouched.

    Returns:
        A tuple ``(first_index, second_index)`` with the positions of the
        fittest and the second fittest individual in ``population``.

    Raises:
        ValueError: If the population holds fewer than two individuals.
    """
    if len(population) < 2:
        raise ValueError("at least two individuals are required")
    # Positions are ranked without popping: removing the winner shrank the
    # caller's list and shifted the second position. The key is the fitness
    # value itself; ``all(x.fitness)`` is only a boolean.
    ranking = sorted(
        range(len(population)),
        key=lambda index: population[index]["fitness"].iloc[0],
        reverse=True,
    )
    return ranking[0], ranking[1]
def get_worst_elements(population):
    """Return the positions of the two least fit individuals.

    Args:
        population: List of DataFrames whose ``fitness`` column holds the
            individual's fitness in every row. The list is left untouched.

    Returns:
        A tuple ``(first_index, second_index)`` with the positions of the
        least fit and the second least fit individual in ``population``.

    Raises:
        ValueError: If the population holds fewer than two individuals.
    """
    if len(population) < 2:
        raise ValueError("at least two individuals are required")
    # Positions are ranked without popping: removing the first pick shrank
    # the caller's list and shifted the second position. The key is the
    # fitness value itself; ``all(x.fitness)`` is only a boolean.
    ranking = sorted(
        range(len(population)),
        key=lambda index: population[index]["fitness"].iloc[0],
    )
    return ranking[0], ranking[1]
def stationary_replacement(prev_population, current_population):
    """Let the two best new individuals compete with the two worst old ones.

    The best of ``current_population`` is compared with the worst of
    ``prev_population`` and the second best with the second worst. A new
    individual takes the old one's place only when its fitness is strictly
    larger.

    Args:
        prev_population: List of DataFrames with a ``fitness`` column; it is
            modified in place.
        current_population: List of DataFrames with a ``fitness`` column.

    Returns:
        ``prev_population`` (the same list object) after the replacements.
    """
    new_population = prev_population
    worst_indexes = get_worst_elements(prev_population)
    best_indexes = get_best_elements(current_population)
    for worst, best in zip(worst_indexes, best_indexes):
        challenger = current_population[best]
        incumbent = prev_population[worst]
        # Scalars are compared: comparing the whole ``fitness`` columns gives
        # a Series, which cannot be used as an ``if`` condition.
        if challenger["fitness"].iloc[0] > incumbent["fitness"].iloc[0]:
            new_population[worst] = challenger
    return new_population
def replace_population(prev_population, current_population, mode):
    """Dispatch to the replacement scheme selected by ``mode``.

    Args:
        prev_population: Population before the current generation.
        current_population: Newly produced population.
        mode: ``"generational"`` selects ``generational_replacement``; any
            other value selects ``stationary_replacement``.

    Returns:
        Whatever the selected replacement function returns.
    """
    if mode == "generational":
        scheme = generational_replacement
    else:
        scheme = stationary_replacement
    return scheme(prev_population, current_population)
def evaluate_population(population, data, cores=4):
    """Evaluate every individual of a population in a pool of processes.

    Args:
        population: Iterable of individuals accepted by
            ``evaluate_individual``.
        data: Distance table forwarded to ``evaluate_individual``.
        cores: Number of worker processes.

    Returns:
        A list with the result of ``evaluate_individual`` for each
        individual, in the same order as ``population``.
    """
    score = partial(evaluate_individual, data=data)
    with Pool(cores) as workers:
        return workers.map(score, population)
def select_new_population(population, n, m, mode):
    """Select parents through repeated tournaments.

    Args:
        population: List of individuals to choose from.
        n: Number of tournaments run when ``mode`` is ``"generational"``.
        m: Forwarded to ``tournament_selection``.
        mode: ``"generational"`` runs ``n`` tournaments; any other value
            runs two.

    Returns:
        A list with the winner of each tournament.
    """
    tournaments = n if mode == "generational" else 2
    return [tournament_selection(m, population) for _ in range(tournaments)]
def genetic_algorithm(n, m, data, mode, max_iterations=100000):
    """Create and evaluate an initial population, then run the selection loop.

    Args:
        n: Upper bound of the point identifiers; also the number of
            individuals generated.
        m: Number of points per individual.
        data: Distance table used to build and evaluate individuals.
        mode: Selection mode forwarded to ``select_new_population``.
        max_iterations: Number of loop iterations.

    Returns:
        None. NOTE(review): each iteration only selects parents and discards
        them; crossover, mutation and replacement are not wired in here.
    """
    initial_population = [generate_individual(n, m, data) for _ in range(n)]
    population = evaluate_population(initial_population, data)
    for _generation in range(max_iterations):
        # The selected parents are not used any further at this point.
        select_new_population(population, n, m, mode)
if __name__ == "__main__":
    # Guarded entry point: the module uses multiprocessing.Pool, and worker
    # processes that re-import this module must not start the run again.
    n, m, data = parse_file("data/GKD-c_11_n500_m50.txt")
    # NOTE(review): the parsed n and m are not used; the call below passes
    # the literals n=10 and m=5 instead. Confirm this is intentional.
    genetic_algorithm(n=10, m=5, data=data, mode="generational", max_iterations=1)