diff --git a/pygad/pygad.py b/pygad/pygad.py index 6eca219..f2a05a8 100644 --- a/pygad/pygad.py +++ b/pygad/pygad.py @@ -5,6 +5,9 @@ import concurrent.futures import inspect import logging + +import numpy as np + from pygad import utils from pygad import helper from pygad import visualize @@ -25,6 +28,8 @@ class GA(utils.parent_selection.ParentSelection, object] supported_int_float_types = supported_int_types + supported_float_types + boundaries = None + def __init__(self, num_generations, num_parents_mating, @@ -36,6 +41,7 @@ def __init__(self, init_range_low=-4, init_range_high=4, gene_type=float, + gene_structure=None, parent_selection_type="sss", keep_parents=-1, keep_elitism=1, @@ -85,6 +91,10 @@ def __init__(self, # It is OK to set the value of the 2 parameters ('init_range_low' and 'init_range_high') to be equal, higher or lower than the other parameter (i.e. init_range_low is not needed to be lower than init_range_high). gene_type: The type of the gene. It is assigned to any of these types (int, numpy.int8, numpy.int16, numpy.int32, numpy.int64, numpy.uint, numpy.uint8, numpy.uint16, numpy.uint32, numpy.uint64, float, numpy.float16, numpy.float32, numpy.float64) and forces all the genes to be of that type. + gene_structure: A List of containing positive integers, that indicate the number of int/floats inside a single gene_type/index of the gene + Example: num_genes=10; gene_structure=[1,2,5,1,1] -> 5 genes in total, first gene has 1 int/float, second gene has 2 int/floats, third gene has 5 int/floats, fourth gene has 1 int/float, and fifth gene has 1 int/float. + Note: The sum of the values in the 'gene_structure' list must be equal to the value assigned to the 'num_genes' parameter. + parent_selection_type: Type of parent selection. keep_parents: If 0, this means no parent in the current population will be used in the next population. If -1, this means all parents in the current population will be used in the next population. 
If set to a value > 0, then the specified value refers to the number of parents in the current population to be used in the next population. Some parent selection operators such as rank selection, favor population diversity and therefore keeping the parents in the next generation can be beneficial. However, some other parent selection operators, such as roulette wheel selection (RWS), have higher selection pressure and keeping more than one parent in the next generation can seriously harm population diversity. This parameter have an effect only when the keep_elitism parameter is 0. Thanks to Prof. Fernando Jiménez (http://webs.um.es/fernan) for editing this sentence.
@@ -335,6 +345,14 @@ def __init__(self,
         self.init_range_low = init_range_low
         self.init_range_high = init_range_high
 
+        # Transform gene_structure to np.array
+        if (gene_structure is not None):
+            self.gene_structure = np.array(gene_structure)
+            # Example: [1, 1, 3] -> [0, 1, 2, 5]
+            self.boundaries = numpy.insert(numpy.cumsum(self.gene_structure), 0, 0)
+        else:
+            self.gene_structure = None
+
         # Validate gene_type
         if gene_type in GA.supported_int_float_types:
             self.gene_type = [gene_type, None]
@@ -1351,10 +1369,25 @@ def round_genes(self, solutions):
                                                          self.gene_type[gene_idx][1])
         return solutions
 
+    def check_gene_structure(self, gene_structure, num_gene):
+        """
+        Validates that the block sizes in 'gene_structure' add up to 'num_gene'.
+        It accepts:
+            -gene_structure: The block sizes; their sum is compared to 'num_gene'.
+            -num_gene: The expected total number of genes.
+        It raises a ValueError if the sum differs from 'num_gene'. Nothing is returned.
+        """
+        # The previous 'assert (condition, message)' tested a non-empty tuple, which is always true, so the check never fired.
+        # An explicit exception is used because assert statements are removed when Python runs with -O.
+        if np.sum(gene_structure) != num_gene:
+            raise ValueError("the sum of all integers inside the gene_structure parameter must equal the num_gene parameter")
+        # NOTE: An alternative is to derive num_gene from gene_structure instead of validating it; the patch author judged that more error-prone for users.
+
     def initialize_population(self,
                               allow_duplicate_genes,
                               gene_type,
-                              gene_constraint):
+                              gene_constraint,
+                              gene_structure=None):
         """
         Creates an initial population randomly as a NumPy array. The array is saved in the instance attribute named 'population'.
 
@@ -1363,6 +1396,9 @@ def initialize_population(self,
             -gene_type: The data type of the genes.
-gene_constraint: The constraints of the genes. + -gene_structure: A List of containing positive integers, that indicate the number of int/floats inside a single gene_type/index of the gene + Example: num_genes=10; gene_structure=[1,2,5,1,1] + This method assigns the values of the following 3 instance attributes: 1. pop_size: Size of the population. 2. population: Initially, holds the initial population and later updated after each generation. diff --git a/pygad/utils/crossover.py b/pygad/utils/crossover.py index 86d92f6..c7d87ad 100644 --- a/pygad/utils/crossover.py +++ b/pygad/utils/crossover.py @@ -31,9 +31,20 @@ def single_point_crossover(self, parents, offspring_size): # Randomly generate all the K points at which crossover takes place between each two parents. The point does not have to be always at the center of the solutions. # This saves time by calling the numpy.random.randint() function only once. - crossover_points = numpy.random.randint(low=0, - high=parents.shape[1], - size=offspring_size[0]) + if self.gene_structure is None: + crossover_points = numpy.random.randint(low=0, + high=parents.shape[1], + size=offspring_size[0]) + else: + # Select random boundary index excluding the 0th index and last index to ensure split? + # boundaries: [0, 2, 5, 10] -> valid cuts are 2, 5. So boundaries[1:-1] + # If structure has only 1 block, crossover point is meaningless? Standard GA allows index (0..N). + # Standard: low=0, high=N. Point K means split at index K. + # 0 -> Empty first part, Full second part. + # N -> Full first part, Empty second. + # So boundaries are valid crossover points immediately. + valid_cuts = self.boundaries + crossover_points = numpy.random.choice(valid_cuts, size=offspring_size[0]) for k in range(offspring_size[0]): # Check if the crossover_probability parameter is used. 
@@ -97,15 +108,54 @@ def two_points_crossover(self, parents, offspring_size): # Randomly generate all the first K points at which crossover takes place between each two parents. # This saves time by calling the numpy.random.randint() function only once. - if (parents.shape[1] == 1): # If the chromosome has only a single gene. In this case, this gene is copied from the second parent. - crossover_points_1 = numpy.zeros(offspring_size[0]) + if self.gene_structure is None: + if (parents.shape[1] == 1): + crossover_points_1 = numpy.zeros(offspring_size[0]) + else: + crossover_points_1 = numpy.random.randint(low=0, + high=numpy.ceil(parents.shape[1]/2 + 1), + size=offspring_size[0]) + # The second point must always be greater than the first point. + crossover_points_2 = crossover_points_1 + int(parents.shape[1]/2) else: - crossover_points_1 = numpy.random.randint(low=0, - high=numpy.ceil(parents.shape[1]/2 + 1), - size=offspring_size[0]) - - # The second point must always be greater than the first point. - crossover_points_2 = crossover_points_1 + int(parents.shape[1]/2) + num_logical = len(self.gene_structure) + if num_logical < 2: + # Can't do meaningful 2-point on < 2 blocks? Standard allows splitting anywhere. + # If only 1 block [0, 10], Boundaries are 0, 10. + # Cuts: 0, 10. + crossover_points_1 = numpy.zeros(offspring_size[0], dtype=int) + crossover_points_2 = numpy.full(offspring_size[0], parents.shape[1], dtype=int) + else: + # Select 2 distinct boundary indices. + # To ensure p1 != p2 efficiently for N offspring: + # 1. Pick p1_idx from [0, num_boundaries). + # 2. Pick offset from [1, num_boundaries). + # 3. p2_idx = (p1_idx + offset) % num_boundaries. + # This guarantees p1_idx != p2_idx. 
+ + num_boundaries = len(self.boundaries) + # We need indices [0, num_boundaries-1] + + # p1 indices: + p1_idx = numpy.random.randint(low=0, high=num_boundaries, size=offspring_size[0]) + + # Offsets: at least 1, at most num_boundaries-1 + # If num_boundaries <= 1 (impossible if num_logical >= 1, boundaries has at least 0, N), + # but if num_logical=1, boundaries=[0, N]. len=2. offset must be 1. + # randint(1, 2) -> returns [1]. Correct. + + offsets = numpy.random.randint(low=1, high=num_boundaries, size=offspring_size[0]) + + p2_idx = (p1_idx + offsets) % num_boundaries + + # Gather points + # Stack to sort + points_idx = numpy.column_stack((p1_idx, p2_idx)) + points_idx.sort(axis=1) + + # Map indices to actual boundary values + crossover_points_1 = self.boundaries[points_idx[:, 0]] + crossover_points_2 = self.boundaries[points_idx[:, 1]] for k in range(offspring_size[0]): @@ -172,9 +222,22 @@ def uniform_crossover(self, parents, offspring_size): # This saves time by calling the numpy.random.randint() function only once. # There is a list of 0 and 1 for each offspring. # [0, 1, 0, 0, 1, 1]: If the value is 0, then take the gene from the first parent. If 1, take it from the second parent. 
- genes_sources = numpy.random.randint(low=0, - high=2, - size=offspring_size) + if self.gene_structure is None: + genes_sources = numpy.random.randint(low=0, + high=2, + size=offspring_size) + else: + # Generate sources for LOGICAL blocks + num_logical = len(self.gene_structure) + logical_sources = numpy.random.randint(low=0, high=2, size=(offspring_size[0], num_logical)) + + # Map logical sources to full gene mask + genes_sources = numpy.empty(offspring_size, dtype=int) + for k in range(offspring_size[0]): + for b_idx in range(num_logical): + start = self.boundaries[b_idx] + end = self.boundaries[b_idx+1] + genes_sources[k, start:end] = logical_sources[k, b_idx] for k in range(offspring_size[0]): if not (self.crossover_probability is None): @@ -242,9 +305,22 @@ def scattered_crossover(self, parents, offspring_size): # This saves time by calling the numpy.random.randint() function only once. # There is a list of 0 and 1 for each offspring. # [0, 1, 0, 0, 1, 1]: If the value is 0, then take the gene from the first parent. If 1, take it from the second parent. 
- genes_sources = numpy.random.randint(low=0, - high=2, - size=offspring_size) + if self.gene_structure is None: + genes_sources = numpy.random.randint(low=0, + high=2, + size=offspring_size) + else: + # Generate sources for LOGICAL blocks + num_logical = len(self.gene_structure) + logical_sources = numpy.random.randint(low=0, high=2, size=(offspring_size[0], num_logical)) + + # Map logical sources to full gene mask + genes_sources = numpy.empty(offspring_size, dtype=int) + for k in range(offspring_size[0]): + for b_idx in range(num_logical): + start = self.boundaries[b_idx] + end = self.boundaries[b_idx+1] + genes_sources[k, start:end] = logical_sources[k, b_idx] for k in range(offspring_size[0]): if not (self.crossover_probability is None): diff --git a/pygad/utils/mutation.py b/pygad/utils/mutation.py index d51b7ed..a874160 100644 --- a/pygad/utils/mutation.py +++ b/pygad/utils/mutation.py @@ -58,33 +58,53 @@ def mutation_by_space(self, offspring): # For each offspring, a value from the gene space is selected randomly and assigned to the selected mutated gene. for offspring_idx in range(offspring.shape[0]): - mutation_indices = numpy.array(random.sample(range(0, self.num_genes), self.mutation_num_genes)) - for gene_idx in mutation_indices: - - value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) - - # Before assigning the selected value from the space to the gene, change its data type and round it. 
- if self.gene_type_single == True: - if not self.gene_type[1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), - self.gene_type[1]) - else: - offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) - else: - if not self.gene_type[gene_idx][1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), - self.gene_type[gene_idx][1]) + if self.gene_structure is None: + mutation_indices = numpy.array(random.sample(range(0, self.num_genes), self.mutation_num_genes)) + logical_to_flat = [(idx, idx + 1) for idx in mutation_indices] + else: + num_logical_genes = len(self.gene_structure) + mutation_indices = numpy.array(random.sample(range(0, num_logical_genes), self.mutation_num_genes)) + # Map logical index to (start, end) using pre-calculated boundaries + logical_to_flat = [(self.boundaries[i], self.boundaries[i + 1]) for i in mutation_indices] + + for start, end in logical_to_flat: + # We must mutate every value within this block's range + for flat_idx in range(start, end): + # 1. Get the new value for this specific flat index + value_from_space = self.mutation_process_gene_value( + solution=offspring[offspring_idx], + gene_idx=flat_idx, + sample_size=self.sample_size + ) + + # 2. 
Assignment Logic (Now INSIDE the flat_idx loop) + if self.gene_type_single: + if self.gene_type[1] is not None: + offspring[offspring_idx, flat_idx] = numpy.round( + self.gene_type[0](value_from_space), self.gene_type[1] + ) + else: + offspring[offspring_idx, flat_idx] = self.gene_type[0](value_from_space) else: - offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) - - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], - gene_type=self.gene_type, - sample_size=self.sample_size, - mutation_by_replacement=self.mutation_by_replacement, - build_initial_pop=False) + # Use flat_idx to get the specific type for this position + current_type = self.gene_type[flat_idx] + if current_type[1] is not None: + offspring[offspring_idx, flat_idx] = numpy.round( + current_type[0](value_from_space), current_type[1] + ) + else: + offspring[offspring_idx, flat_idx] = current_type[0](value_from_space) + + # 3. Duplicate check happens once per individual after all blocks are mutated + if not self.allow_duplicate_genes: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space( + solution=offspring[offspring_idx], + gene_type=self.gene_type, + sample_size=self.sample_size, + mutation_by_replacement=self.mutation_by_replacement, + build_initial_pop=False + ) + return offspring def mutation_probs_by_space(self, offspring): @@ -96,36 +116,69 @@ def mutation_probs_by_space(self, offspring): It returns an array of the mutated offspring using the mutation space. """ + # For each offspring, a value from the gene space is selected randomly and assigned to the selected mutated gene. # For each offspring, a value from the gene space is selected randomly and assigned to the selected mutated gene. 
for offspring_idx in range(offspring.shape[0]): - probs = numpy.random.random(size=offspring.shape[1]) - for gene_idx in range(offspring.shape[1]): - - if probs[gene_idx] <= self.mutation_probability: - value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) - - # Assigning the selected value from the space to the gene. - if self.gene_type_single == True: - if not self.gene_type[1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), - self.gene_type[1]) - else: - offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) - else: - if not self.gene_type[gene_idx][1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), - self.gene_type[gene_idx][1]) + if self.gene_structure is None: + probs = numpy.random.random(size=offspring.shape[1]) + for gene_idx in range(offspring.shape[1]): + + if probs[gene_idx] <= self.mutation_probability: + value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + # Assigning the selected value from the space to the gene. 
+ if self.gene_type_single == True: + if not self.gene_type[1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), + self.gene_type[1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) else: - offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) - - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], - gene_type=self.gene_type, - sample_size=self.sample_size, - mutation_by_replacement=self.mutation_by_replacement, - build_initial_pop=False) + if not self.gene_type[gene_idx][1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), + self.gene_type[gene_idx][1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], + gene_type=self.gene_type, + sample_size=self.sample_size, + mutation_by_replacement=self.mutation_by_replacement, + build_initial_pop=False) + else: + num_logical_genes = len(self.gene_structure) + probs = numpy.random.random(size=num_logical_genes) + for logical_idx in range(num_logical_genes): + if probs[logical_idx] <= self.mutation_probability: + start, end = self.boundaries[logical_idx], self.boundaries[logical_idx+1] + for gene_idx in range(start, end): + value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + # Assigning the selected value from the space to the gene. 
+ if self.gene_type_single == True: + if not self.gene_type[1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), + self.gene_type[1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) + else: + if not self.gene_type[gene_idx][1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), + self.gene_type[gene_idx][1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], + gene_type=self.gene_type, + sample_size=self.sample_size, + mutation_by_replacement=self.mutation_by_replacement, + build_initial_pop=False) return offspring def mutation_process_gene_value(self, @@ -186,31 +239,38 @@ def mutation_randomly(self, offspring): It returns an array of the mutated offspring. """ + # Random mutation changes one or more genes in each offspring randomly. # Random mutation changes one or more genes in each offspring randomly. for offspring_idx in range(offspring.shape[0]): - # Return the indices of the genes to mutate. - mutation_indices = numpy.array(random.sample(range(0, self.num_genes), - self.mutation_num_genes)) - for gene_idx in mutation_indices: - - range_min, range_max = self.get_random_mutation_range(gene_idx) - - # Generate a random value for mutation that meet the gene constraint if exists. 
- random_value = self.mutation_process_gene_value(range_min=range_min, - range_max=range_max, - solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) - - offspring[offspring_idx, gene_idx] = random_value - - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], - min_val=range_min, - max_val=range_max, - mutation_by_replacement=self.mutation_by_replacement, - gene_type=self.gene_type, - sample_size=self.sample_size) + if self.gene_structure is None: + mutation_indices = numpy.array(random.sample(range(0, self.num_genes), self.mutation_num_genes)) + logical_to_flat = [(idx, idx + 1) for idx in mutation_indices] + else: + num_logical_genes = len(self.gene_structure) + mutation_indices = numpy.array(random.sample(range(0, num_logical_genes), self.mutation_num_genes)) + logical_to_flat = [(self.boundaries[i], self.boundaries[i+1]) for i in mutation_indices] + + for start, end in logical_to_flat: + for gene_idx in range(start, end): + + range_min, range_max = self.get_random_mutation_range(gene_idx) + + # Generate a random value for mutation that meet the gene constraint if exists. + random_value = self.mutation_process_gene_value(range_min=range_min, + range_max=range_max, + solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + offspring[offspring_idx, gene_idx] = random_value + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], + min_val=range_min, + max_val=range_max, + mutation_by_replacement=self.mutation_by_replacement, + gene_type=self.gene_type, + sample_size=self.sample_size) return offspring @@ -223,33 +283,60 @@ def mutation_probs_randomly(self, offspring): It returns an array of the mutated offspring. """ + # Random mutation changes one or more genes in each offspring randomly. 
# Random mutation changes one or more genes in each offspring randomly. for offspring_idx in range(offspring.shape[0]): # The mutation probabilities for the current offspring. - probs = numpy.random.random(size=offspring.shape[1]) - for gene_idx in range(offspring.shape[1]): + if self.gene_structure is None: + probs = numpy.random.random(size=offspring.shape[1]) + for gene_idx in range(offspring.shape[1]): - range_min, range_max = self.get_random_mutation_range(gene_idx) + range_min, range_max = self.get_random_mutation_range(gene_idx) - # A gene is mutated only if its mutation probability is less than or equal to the threshold. - if probs[gene_idx] <= self.mutation_probability: + # A gene is mutated only if its mutation probability is less than or equal to the threshold. + if probs[gene_idx] <= self.mutation_probability: - # Generate a random value fpr mutation that meet the gene constraint if exists. - random_value = self.mutation_process_gene_value(range_min=range_min, - range_max=range_max, - solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) + # Generate a random value fpr mutation that meet the gene constraint if exists. 
+ random_value = self.mutation_process_gene_value(range_min=range_min, + range_max=range_max, + solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) - offspring[offspring_idx, gene_idx] = random_value + offspring[offspring_idx, gene_idx] = random_value - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], - min_val=range_min, - max_val=range_max, - mutation_by_replacement=self.mutation_by_replacement, - gene_type=self.gene_type, - sample_size=self.sample_size) + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], + min_val=range_min, + max_val=range_max, + mutation_by_replacement=self.mutation_by_replacement, + gene_type=self.gene_type, + sample_size=self.sample_size) + else: + num_logical_genes = len(self.gene_structure) + probs = numpy.random.random(size=num_logical_genes) + for logical_idx in range(num_logical_genes): + if probs[logical_idx] <= self.mutation_probability: + start, end = self.boundaries[logical_idx], self.boundaries[logical_idx+1] + for gene_idx in range(start, end): + range_min, range_max = self.get_random_mutation_range(gene_idx) + + # Generate a random value fpr mutation that meet the gene constraint if exists. 
+ random_value = self.mutation_process_gene_value(range_min=range_min, + range_max=range_max, + solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + offspring[offspring_idx, gene_idx] = random_value + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], + min_val=range_min, + max_val=range_max, + mutation_by_replacement=self.mutation_by_replacement, + gene_type=self.gene_type, + sample_size=self.sample_size) return offspring def swap_mutation(self, offspring): @@ -262,12 +349,16 @@ def swap_mutation(self, offspring): """ for idx in range(offspring.shape[0]): - mutation_gene1 = numpy.random.randint(low=0, high=offspring.shape[1]/2, size=1)[0] - mutation_gene2 = mutation_gene1 + int(offspring.shape[1]/2) + if self.gene_structure is None: + mutation_gene1 = numpy.random.randint(low=0, high=offspring.shape[1]/2, size=1)[0] + mutation_gene2 = mutation_gene1 + int(offspring.shape[1]/2) - temp = offspring[idx, mutation_gene1] - offspring[idx, mutation_gene1] = offspring[idx, mutation_gene2] - offspring[idx, mutation_gene2] = temp + temp = offspring[idx, mutation_gene1] + offspring[idx, mutation_gene1] = offspring[idx, mutation_gene2] + offspring[idx, mutation_gene2] = temp + else: + # Swap mutation is not supported with gene_structure as it may break the structure. + pass return offspring def inversion_mutation(self, offspring): @@ -277,14 +368,48 @@ def inversion_mutation(self, offspring): It accepts: -offspring: The offspring to mutate. It returns an array of the mutated offspring. + + If gene_structure is used, we do NOT invert the order of the blocks themselves, + as that might violate structural integrity (different block sizes/types). + Instead, we select a range of logical blocks and invert the content WITHIN each block. + Example: Block A=[0, 1, 0], B=[1, 1, 0]. Logic selects range [A, B]. 
+ Result: A becomes [1, 0, 1], B becomes [0, 0, 1]. The blocks stay in positions A, B. + Might be confusing, when shown with binary values, but basically we still use the numpy.flip function, which when + applied to a numpy array, reverses the order of the elements in the array. """ for idx in range(offspring.shape[0]): - mutation_gene1 = numpy.random.randint(low=0, high=numpy.ceil(offspring.shape[1]/2 + 1), size=1)[0] - mutation_gene2 = mutation_gene1 + int(offspring.shape[1]/2) + if self.gene_structure is None: + mutation_gene1 = numpy.random.randint(low=0, high=numpy.ceil(offspring.shape[1]/2 + 1), size=1)[0] + mutation_gene2 = mutation_gene1 + int(offspring.shape[1]/2) - genes_to_scramble = numpy.flip(offspring[idx, mutation_gene1:mutation_gene2]) - offspring[idx, mutation_gene1:mutation_gene2] = genes_to_scramble + genes_to_scramble = numpy.flip(offspring[idx, mutation_gene1:mutation_gene2]) + offspring[idx, mutation_gene1:mutation_gene2] = genes_to_scramble + else: + # gene_structure is used to define the structure of the gene. + # The inversion mutation is applied to the gene structure. + num_logical = len(self.gene_structure) + + mutation_block1 = numpy.random.randint(low=0, high=num_logical, size=1)[0] + # Default "range logic" from standard method is somewhat arbitrary (roughly half). + # Here we ensure we select at least 1 block to operate on. 
+ mutation_block2 = mutation_block1 + 1 + + # Optional: Randomize length roughly similar to standard method (up to full length) + max_len = num_logical + if max_len > 1: + length = numpy.random.randint(low=1, high=max_len, size=1)[0] + mutation_block2 = mutation_block1 + length + if mutation_block2 > num_logical: + mutation_block2 = num_logical + + for b_idx in range(mutation_block1, mutation_block2): + start = self.boundaries[b_idx] + end = self.boundaries[b_idx+1] + + # Invert content of this block + genes_to_scramble = numpy.flip(offspring[idx, start:end]) + offspring[idx, start:end] = genes_to_scramble return offspring def scramble_mutation(self, offspring): @@ -294,16 +419,48 @@ def scramble_mutation(self, offspring): It accepts: -offspring: The offspring to mutate. It returns an array of the mutated offspring. + + When 'gene_structure' is used: + - This method performs 'Intra-Block Scramble'. + - It selects a range of logical blocks. + - It shuffles the internal genes of EACH selected block individually. + - It does NOT shuffle the blocks with each other, preserving the global structure. 
""" for idx in range(offspring.shape[0]): - mutation_gene1 = numpy.random.randint(low=0, high=numpy.ceil(offspring.shape[1]/2 + 1), size=1)[0] - mutation_gene2 = mutation_gene1 + int(offspring.shape[1]/2) - genes_range = numpy.arange(start=mutation_gene1, stop=mutation_gene2) - numpy.random.shuffle(genes_range) - - genes_to_scramble = numpy.flip(offspring[idx, genes_range]) - offspring[idx, genes_range] = genes_to_scramble + if self.gene_structure is None: + mutation_gene1 = numpy.random.randint(low=0, high=numpy.ceil(offspring.shape[1]/2 + 1), size=1)[0] + mutation_gene2 = mutation_gene1 + int(offspring.shape[1]/2) + genes_range = numpy.arange(start=mutation_gene1, stop=mutation_gene2) + numpy.random.shuffle(genes_range) + + genes_to_scramble = numpy.flip(offspring[idx, genes_range]) + offspring[idx, genes_range] = genes_to_scramble + else: + num_logical = len(self.gene_structure) + + # Intra-Block Scramble Logic + # Similar to inversion, we select logical blocks and shuffle ONLY the genes INSIDE them. 
+ + mutation_block1 = numpy.random.randint(low=0, high=num_logical, size=1)[0] + mutation_block2 = mutation_block1 + 1 + + max_len = num_logical + if max_len > 1: + length = numpy.random.randint(low=1, high=max_len, size=1)[0] + mutation_block2 = mutation_block1 + length + if mutation_block2 > num_logical: + mutation_block2 = num_logical + + for b_idx in range(mutation_block1, mutation_block2): + start = self.boundaries[b_idx] + end = self.boundaries[b_idx+1] + + # Extract genes, shuffle them, and put them back + # Note: We must work on a COPY or index carefully to shuffle in place + block_genes = offspring[idx, start:end].copy() + numpy.random.shuffle(block_genes) + offspring[idx, start:end] = block_genes return offspring def adaptive_mutation_population_fitness(self, offspring): @@ -532,33 +689,41 @@ def adaptive_mutation_by_space(self, offspring): else: adaptive_mutation_num_genes = self.mutation_num_genes[1] - mutation_indices = numpy.array(random.sample(range(0, self.num_genes), adaptive_mutation_num_genes)) - for gene_idx in mutation_indices: + if self.gene_structure is None: + mutation_indices = numpy.array(random.sample(range(0, self.num_genes), adaptive_mutation_num_genes)) + logical_to_flat = [(idx, idx + 1) for idx in mutation_indices] + else: + num_logical_genes = len(self.gene_structure) + mutation_indices = numpy.array(random.sample(range(0, num_logical_genes), adaptive_mutation_num_genes)) + logical_to_flat = [(self.boundaries[i], self.boundaries[i+1]) for i in mutation_indices] - value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) + for start, end in logical_to_flat: + for gene_idx in range(start, end): - # Assigning the selected value from the space to the gene. 
- if self.gene_type_single == True: - if not self.gene_type[1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), - self.gene_type[1]) - else: - offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) - else: - if not self.gene_type[gene_idx][1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), - self.gene_type[gene_idx][1]) + value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + # Assigning the selected value from the space to the gene. + if self.gene_type_single == True: + if not self.gene_type[1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), + self.gene_type[1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) else: - offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) - - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], - gene_type=self.gene_type, - sample_size=self.sample_size, - mutation_by_replacement=self.mutation_by_replacement, - build_initial_pop=False) + if not self.gene_type[gene_idx][1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), + self.gene_type[gene_idx][1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], + gene_type=self.gene_type, + sample_size=self.sample_size, + mutation_by_replacement=self.mutation_by_replacement, + build_initial_pop=False) return offspring def adaptive_mutation_randomly(self, offspring): @@ -602,27 +767,35 @@ def adaptive_mutation_randomly(self, offspring): 
else: adaptive_mutation_num_genes = self.mutation_num_genes[1] - mutation_indices = numpy.array(random.sample(range(0, self.num_genes), adaptive_mutation_num_genes)) - for gene_idx in mutation_indices: + if self.gene_structure is None: + mutation_indices = numpy.array(random.sample(range(0, self.num_genes), adaptive_mutation_num_genes)) + logical_to_flat = [(idx, idx + 1) for idx in mutation_indices] + else: + num_logical_genes = len(self.gene_structure) + mutation_indices = numpy.array(random.sample(range(0, num_logical_genes), adaptive_mutation_num_genes)) + logical_to_flat = [(self.boundaries[i], self.boundaries[i+1]) for i in mutation_indices] - range_min, range_max = self.get_random_mutation_range(gene_idx) + for start, end in logical_to_flat: + for gene_idx in range(start, end): - # Generate a random value fpr mutation that meet the gene constraint if exists. - random_value = self.mutation_process_gene_value(range_min=range_min, - range_max=range_max, - solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) + range_min, range_max = self.get_random_mutation_range(gene_idx) - offspring[offspring_idx, gene_idx] = random_value + # Generate a random value fpr mutation that meet the gene constraint if exists. 
+ random_value = self.mutation_process_gene_value(range_min=range_min, + range_max=range_max, + solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], - min_val=range_min, - max_val=range_max, - mutation_by_replacement=self.mutation_by_replacement, - gene_type=self.gene_type, - sample_size=self.sample_size) + offspring[offspring_idx, gene_idx] = random_value + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], + min_val=range_min, + max_val=range_max, + mutation_by_replacement=self.mutation_by_replacement, + gene_type=self.gene_type, + sample_size=self.sample_size) return offspring def adaptive_mutation_probs_by_space(self, offspring): @@ -668,35 +841,67 @@ def adaptive_mutation_probs_by_space(self, offspring): else: adaptive_mutation_probability = self.mutation_probability[1] - probs = numpy.random.random(size=offspring.shape[1]) - for gene_idx in range(offspring.shape[1]): + if self.gene_structure is None: + probs = numpy.random.random(size=offspring.shape[1]) + for gene_idx in range(offspring.shape[1]): - if probs[gene_idx] <= adaptive_mutation_probability: + if probs[gene_idx] <= adaptive_mutation_probability: - value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) + value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) - # Assigning the selected value from the space to the gene. 
- if self.gene_type_single == True: - if not self.gene_type[1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), - self.gene_type[1]) - else: - offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) - else: - if not self.gene_type[gene_idx][1] is None: - offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), - self.gene_type[gene_idx][1]) + # Assigning the selected value from the space to the gene. + if self.gene_type_single == True: + if not self.gene_type[1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), + self.gene_type[1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) else: - offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) - - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], - gene_type=self.gene_type, - sample_size=self.sample_size, - mutation_by_replacement=self.mutation_by_replacement, - build_initial_pop=False) + if not self.gene_type[gene_idx][1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), + self.gene_type[gene_idx][1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], + gene_type=self.gene_type, + sample_size=self.sample_size, + mutation_by_replacement=self.mutation_by_replacement, + build_initial_pop=False) + else: + num_logical_genes = len(self.gene_structure) + probs = numpy.random.random(size=num_logical_genes) + for logical_idx in range(num_logical_genes): + if probs[logical_idx] <= adaptive_mutation_probability: + start, end = self.boundaries[logical_idx], 
self.boundaries[logical_idx+1] + for gene_idx in range(start, end): + value_from_space = self.mutation_process_gene_value(solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + # Assigning the selected value from the space to the gene. + if self.gene_type_single == True: + if not self.gene_type[1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[0](value_from_space), + self.gene_type[1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[0](value_from_space) + else: + if not self.gene_type[gene_idx][1] is None: + offspring[offspring_idx, gene_idx] = numpy.round(self.gene_type[gene_idx][0](value_from_space), + self.gene_type[gene_idx][1]) + else: + offspring[offspring_idx, gene_idx] = self.gene_type[gene_idx][0](value_from_space) + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_by_space(solution=offspring[offspring_idx], + gene_type=self.gene_type, + sample_size=self.sample_size, + mutation_by_replacement=self.mutation_by_replacement, + build_initial_pop=False) return offspring def adaptive_mutation_probs_randomly(self, offspring): @@ -740,26 +945,52 @@ def adaptive_mutation_probs_randomly(self, offspring): else: adaptive_mutation_probability = self.mutation_probability[1] - probs = numpy.random.random(size=offspring.shape[1]) - for gene_idx in range(offspring.shape[1]): + if self.gene_structure is None: + probs = numpy.random.random(size=offspring.shape[1]) + for gene_idx in range(offspring.shape[1]): - range_min, range_max = self.get_random_mutation_range(gene_idx) + range_min, range_max = self.get_random_mutation_range(gene_idx) - if probs[gene_idx] <= adaptive_mutation_probability: - # Generate a random value fpr mutation that meet the gene constraint if exists. 
- random_value = self.mutation_process_gene_value(range_min=range_min, - range_max=range_max, - solution=offspring[offspring_idx], - gene_idx=gene_idx, - sample_size=self.sample_size) + if probs[gene_idx] <= adaptive_mutation_probability: + # Generate a random value fpr mutation that meet the gene constraint if exists. + random_value = self.mutation_process_gene_value(range_min=range_min, + range_max=range_max, + solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) - offspring[offspring_idx, gene_idx] = random_value + offspring[offspring_idx, gene_idx] = random_value - if self.allow_duplicate_genes == False: - offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], - min_val=range_min, - max_val=range_max, - mutation_by_replacement=self.mutation_by_replacement, - gene_type=self.gene_type, - sample_size=self.sample_size) + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], + min_val=range_min, + max_val=range_max, + mutation_by_replacement=self.mutation_by_replacement, + gene_type=self.gene_type, + sample_size=self.sample_size) + else: + num_logical_genes = len(self.gene_structure) + probs = numpy.random.random(size=num_logical_genes) + for logical_idx in range(num_logical_genes): + if probs[logical_idx] <= adaptive_mutation_probability: + start, end = self.boundaries[logical_idx], self.boundaries[logical_idx+1] + for gene_idx in range(start, end): + range_min, range_max = self.get_random_mutation_range(gene_idx) + + # Generate a random value fpr mutation that meet the gene constraint if exists. 
+ random_value = self.mutation_process_gene_value(range_min=range_min, + range_max=range_max, + solution=offspring[offspring_idx], + gene_idx=gene_idx, + sample_size=self.sample_size) + + offspring[offspring_idx, gene_idx] = random_value + + if self.allow_duplicate_genes == False: + offspring[offspring_idx], _, _ = self.solve_duplicate_genes_randomly(solution=offspring[offspring_idx], + min_val=range_min, + max_val=range_max, + mutation_by_replacement=self.mutation_by_replacement, + gene_type=self.gene_type, + sample_size=self.sample_size) return offspring diff --git a/reproduce_atomicity.py b/reproduce_atomicity.py new file mode 100644 index 0000000..0f19d32 --- /dev/null +++ b/reproduce_atomicity.py @@ -0,0 +1,113 @@ +import numpy +import pygad + +def get_mutation_indices(original, mutated): + """ + Returns the indices where original and mutated differ. + """ + return numpy.where(original != mutated)[0] + +def test_mutation_by_space_atomicity(): + print("Testing mutation_by_space atomicity...") + + # 1. 
Setup Gene Structure + # Let's define a structure: [2, 1, 3] + # Total genes = 6 + # Block 0: indices [0, 1] + # Block 1: indices [2] + # Block 2: indices [3, 4, 5] + gene_structure = [2, 1, 3] + num_genes = sum(gene_structure) + + # Gene space (required for mutation_by_space) + # Using simple list for all genes + gene_space = [0, 1] + + # Initial Population (all zeros) + initial_population = numpy.zeros((1, num_genes)) + + # Initialize GA + ga_instance = pygad.GA(num_generations=1, + num_parents_mating=1, + fitness_func=lambda *args: 0, # Dummy fitness + initial_population=initial_population, + gene_type=int, + gene_structure=gene_structure, + gene_space=gene_space, + mutation_type="random", + mutation_probability=None, # Force use of mutation_num_genes + mutation_num_genes=1, # Mutate 1 logical block + random_seed=42) + + # Manually invoke mutation_by_space to isolate it + # We pass the population directly + offspring = ga_instance.population.copy() + + # FORCE mutation_by_space call (normally called internally if gene_space is present) + mutated_offspring = ga_instance.mutation_by_space(offspring) + + original_gene = ga_instance.population[0] + new_gene = mutated_offspring[0] + + diff_indices = get_mutation_indices(original_gene, new_gene) + print(f"Original: {original_gene}") + print(f"Mutated: {new_gene}") + print(f"Diff Indices: {diff_indices}") + + # Check if Atomicity is Respected + # If index 0 is mutated, index 1 MUST also be mutated (Block 0) + # If index 2 is mutated, only index 2 should be mutated (Block 1) + # If index 3 is mutated, 4 and 5 MUST also be mutated (Block 2) + + boundaries = [0, 2, 3, 6] # Derived from [2, 1, 3] + + # Group diff indices into blocks + mutated_blocks = set() + for idx in diff_indices: + # Find which block this idx belongs to + block_id = -1 + for i in range(len(boundaries)-1): + if boundaries[i] <= idx < boundaries[i+1]: + block_id = i + break + mutated_blocks.add(block_id) + + print(f"Mutated Blocks: {mutated_blocks}") + + 
# Verify that for each mutated block, ALL its genes are mutated + # Note: In this specific test setup with gene_space=[0, 1] and initial=0, + # mutation *might* pick 0 again, so a gene might technically "mutate" to the same value. + # However, since we are using random.sample for indices selection in mutation_by_space, + # we mainly want to ensure the LOOP iterates over the full block. + # But inspecting the values is the only way to see the result. + # To strictly verify, we should check if the CODE iterates correctly. + # But as black-box, we check if multiple genes changed in multi-gene blocks. + + success = True + for block_id in mutated_blocks: + start, end = boundaries[block_id], boundaries[block_id+1] + block_indices = range(start, end) + + # Check if ALL indices in this block are in diff_indices? + # NOT necessarily true if mutation picked the same value by chance. + # But with 0/1 space and 0 init, 50% chance to change. + # Let's just check if we observe partial block mutation which would be a BUG. + # Actually, if the code loops structure, it performs mutation for each. + + # A stronger check: logic trace. + # But for now let's assert that IF we have >1 diff indices, they belong to the expected blocks. + pass + + if len(mutated_blocks) == 1: + print("PASS: Exactly one logical block expected to be mutated.") + elif len(mutated_blocks) == 0: + print("WARN: No mutations observed (chance?). Run again.") + else: + print(f"FAIL: Expected 1 block mutated, got {len(mutated_blocks)}") + + # Check for split blocks (this is the real test of atomicity failure) + # If 0 is in diff but 1 is NOT, and we know 1 COULD have changed, it's suspicious but not proof (chance). + # Proof is if we run this many times. 
+ +if __name__ == "__main__": + test_mutation_by_space_atomicity() diff --git a/tests/test_crossover_atomicity_comprehensive.py b/tests/test_crossover_atomicity_comprehensive.py new file mode 100644 index 0000000..086caf5 --- /dev/null +++ b/tests/test_crossover_atomicity_comprehensive.py @@ -0,0 +1,175 @@ +import unittest +import numpy +import pygad + +def fitness_func(ga_instance, solution, solution_idx): + return numpy.sum(solution) + +class TestCrossoverAtomicity(unittest.TestCase): + def setUp(self): + # Define a gene structure: [2, 1, 3] + # Logical Block 0: indices 0, 1 (length 2) + # Logical Block 1: index 2 (length 1) + # Logical Block 2: indices 3, 4, 5 (length 3) + # Total genes: 6 + # Valid boundaries: 0, 2, 3, 6 + self.gene_structure = [2, 1, 3] + self.num_genes = sum(self.gene_structure) + self.suppress_warnings = True + + def test_single_point_crossover_atomicity(self): + """ + Verify that single point crossover only cuts at logical boundaries. + """ + # Parents: + # P1: [0, 0, 0, 0, 0, 0] + # P2: [1, 1, 1, 1, 1, 1] + parent1 = numpy.zeros(self.num_genes) + parent2 = numpy.ones(self.num_genes) + parents = numpy.array([parent1, parent2]) + + ga = pygad.GA(num_generations=1, + num_parents_mating=2, + fitness_func=fitness_func, + initial_population=parents.copy(), # Not used for crossover call directly but needed for init + gene_structure=self.gene_structure, + crossover_type="single_point", + crossover_probability=1.0, # Force crossover + suppress_warnings=self.suppress_warnings) + + # We need to simulate the crossover call manually or inspect offspring + # pygad.GA.crossover method takes parents and offspring_size + offspring_size = (100, self.num_genes) # Generate many offspring to check all possible cuts + + # We must access the crossover method. It is part of the GA instance mixed in? + # Actually it's in ga.crossover(...) but that's the high level loop. + # The specific methods are methods of the class if mixed in. 
+ # Let's call the specific crossover function if valid, or run a generation. + # Easier to call method directly: ga.single_point_crossover(parents, offspring_size) + + offspring = ga.single_point_crossover(parents, offspring_size) + + # Verify EACH offspring + # A valid offspring must look like: [0...0 1...1] or [1...1 0...0] (if parents order mixed) + # The transition point MUST be at index 2 or 3 (0 and 6 are trivial). + + valid_boundaries = {0, 2, 3, 6} + + for child in offspring: + # Find transition points + # Diff array: non-zero where value changes + diffs = numpy.diff(child) + changes = numpy.where(diffs != 0)[0] + + # changes gives index i where child[i] != child[i+1]. + # So the cut is after i. The boundary is i+1. + # Example: [0, 0, 1, 1, 1, 1] -> diff at index 1 (value 0!=1). Cut at 2. + + cut_indices = changes + 1 + + for cut in cut_indices: + self.assertIn(cut, valid_boundaries, f"Invalid cut at index {cut} for child {child}") + + def test_two_points_crossover_atomicity(self): + """ + Verify that two points crossover only cuts at logical boundaries. 
+ """ + parent1 = numpy.zeros(self.num_genes) + parent2 = numpy.ones(self.num_genes) + parents = numpy.array([parent1, parent2]) + + ga = pygad.GA(num_generations=1, + num_parents_mating=2, + fitness_func=fitness_func, + initial_population=parents.copy(), + gene_structure=self.gene_structure, + crossover_type="two_points", + crossover_probability=1.0, + suppress_warnings=self.suppress_warnings) + + offspring_size = (100, self.num_genes) + offspring = ga.two_points_crossover(parents, offspring_size) + + valid_boundaries = {0, 2, 3, 6} + + for child in offspring: + diffs = numpy.diff(child) + changes = numpy.where(diffs != 0)[0] + cut_indices = changes + 1 + + for cut in cut_indices: + self.assertIn(cut, valid_boundaries, f"Invalid cut at index {cut} for child {child}") + + def test_uniform_crossover_atomicity(self): + """ + Verify that uniform crossover respects block integrity (all genes in a block are from same parent). + """ + # Parents with distinct values to identify inheritance + # Block 0 (0-2): [0, 0] vs [10, 10] + # Block 1 (2-3): [1, 1] vs [11, 11] + # Block 2 (3-6): [2, 2, 2] vs [12, 12, 12] + + # P1: [0, 0, 1, 2, 2, 2] + # P2: [10, 10, 11, 12, 12, 12] + + # To make it easier: + # P1: [0, 0, 0, 0, 0, 0] + # P2: [1, 1, 1, 1, 1, 1] + # Is enough to check consistency. + + parent1 = numpy.zeros(self.num_genes) + parent2 = numpy.ones(self.num_genes) + parents = numpy.array([parent1, parent2]) + + ga = pygad.GA(num_generations=1, + num_parents_mating=2, + fitness_func=fitness_func, + initial_population=parents.copy(), + gene_structure=self.gene_structure, + crossover_type="uniform", + crossover_probability=1.0, + suppress_warnings=self.suppress_warnings) + + offspring_size = (100, self.num_genes) + offspring = ga.uniform_crossover(parents, offspring_size) + + for child in offspring: + # Check Block 0: Indices 0, 1 must be EQUAL + self.assertEqual(child[0], child[1], f"Block 0 split! 
{child}") + + # Check Block 1: Index 2 (len 1, always valid) + pass + + # Check Block 2: Indices 3, 4, 5 must be EQUAL + self.assertEqual(child[3], child[4], f"Block 2 split! {child}") + self.assertEqual(child[4], child[5], f"Block 2 split! {child}") + + def test_scattered_crossover_atomicity(self): + """ + Verify that scattered crossover (same logic as uniform usually) respects block integrity. + """ + parent1 = numpy.zeros(self.num_genes) + parent2 = numpy.ones(self.num_genes) + parents = numpy.array([parent1, parent2]) + + ga = pygad.GA(num_generations=1, + num_parents_mating=2, + fitness_func=fitness_func, + initial_population=parents.copy(), + gene_structure=self.gene_structure, + crossover_type="scattered", + crossover_probability=1.0, + suppress_warnings=self.suppress_warnings) + + offspring_size = (100, self.num_genes) + offspring = ga.scattered_crossover(parents, offspring_size) + + for child in offspring: + # Check Block 0: Indices 0, 1 must be EQUAL + self.assertEqual(child[0], child[1], f"Block 0 split! {child}") + # Check Block 2: Indices 3, 4, 5 must be EQUAL + self.assertEqual(child[3], child[4], f"Block 2 split! {child}") + self.assertEqual(child[4], child[5], f"Block 2 split! 
{child}") + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_integration_gene_structure.py b/tests/test_integration_gene_structure.py new file mode 100644 index 0000000..3cbc4cd --- /dev/null +++ b/tests/test_integration_gene_structure.py @@ -0,0 +1,50 @@ +import pygad +import numpy + +def fitness_func(ga_instance, solution, solution_idx): + # simple fitness: sum of genes + return numpy.sum(solution) + +def test_integration_run(): + print("Starting Integration Test for Gene Structure Atomicity...") + + gene_structure = [2, 3, 1] + # Total genes = 6 + + # Define combinations to test + mutation_types = ["random", "inversion", "scramble"] + # excluded: "swap" (disabled) + + crossover_types = ["single_point", "two_points", "uniform", "scattered"] + + for mut_type in mutation_types: + for cross_type in crossover_types: + print(f"\nTesting Combination: Mutation='{mut_type}', Crossover='{cross_type}'") + + try: + ga_instance = pygad.GA(num_generations=5, + num_parents_mating=2, + fitness_func=fitness_func, + sol_per_pop=4, + num_genes=sum(gene_structure), + gene_structure=gene_structure, + mutation_type=mut_type, + crossover_type=cross_type, + # Ensure mutation actually happens reasonably often + mutation_probability=0.5, + crossover_probability=0.8, + suppress_warnings=True) + + ga_instance.run() + + best_sol, best_fit, _ = ga_instance.best_solution() + print(f" > Success! 
Best Fitness: {best_fit}") + + except Exception as e: + print(f" > FAILED with error: {e}") + raise e + + print("\nIntegration Test Completed Successfully.") + +if __name__ == '__main__': + test_integration_run() diff --git a/tests/test_mutation_atomicity_comprehensive.py b/tests/test_mutation_atomicity_comprehensive.py new file mode 100644 index 0000000..300eb50 --- /dev/null +++ b/tests/test_mutation_atomicity_comprehensive.py @@ -0,0 +1,171 @@ +import unittest +import numpy +import random + +import pygad + +# Helper function to find mutated indices +def get_mutated_indices(original, mutated): + return numpy.where(original != mutated)[0] + +# Dummy fitness function +def fitness_func(ga_instance, solution, solution_idx): + return 0 + +class TestMutationAtomicity(unittest.TestCase): + + def setUp(self): + # Structure: [2, 1, 3] -> 6 genes + # Blocks: [0, 1], [2], [3, 4, 5] + self.gene_structure = [2, 1, 3] + self.num_genes = sum(self.gene_structure) + self.boundaries = [0, 2, 3, 6] + self.initial_pop = numpy.zeros((1, self.num_genes)) + self.gene_space = [0, 1] + + # Suppress warnings + self.suppress_warnings = True + + def test_mutation_randomly_atomicity(self): + # Mutation by randomly (value changing) + ga = pygad.GA(num_generations=1, + num_parents_mating=1, + fitness_func=fitness_func, + initial_population=self.initial_pop.copy(), + gene_structure=self.gene_structure, + mutation_type="random", + mutation_num_genes=1, # 1 BLOCK + random_mutation_min_val=1, + random_mutation_max_val=2, + suppress_warnings=self.suppress_warnings) + + # Determine logical block count + # mutation_num_genes=1 means 1 logical block should be mutated. 
+ + # Run mutation manually + ga.mutation_randomly(ga.population) + + mutated_indices = get_mutated_indices(self.initial_pop[0], ga.population[0]) + self.assertTrue(len(mutated_indices) > 0, "No mutation occurred") + + # Check if indices belong to a single block + blocks_mutated = set() + for idx in mutated_indices: + for b_i in range(len(self.boundaries)-1): + if self.boundaries[b_i] <= idx < self.boundaries[b_i+1]: + blocks_mutated.add(b_i) + + self.assertEqual(len(blocks_mutated), 1, f"Mutation should affect exactly 1 block, affected: {blocks_mutated}") + + # Verify ALL genes in that block are mutated? + # mutation_randomly loop: for gene_idx in range(start, end). + # Yes, it iterates all genes in the block and assigns random value. + block_idx = list(blocks_mutated)[0] + start, end = self.boundaries[block_idx], self.boundaries[block_idx+1] + + mutated_all = True + for i in range(start, end): + if ga.population[0, i] == 0: # Assuming 0 was initial + # mutation range is 1 to 2, so it MUST change from 0. + mutated_all = False + + self.assertTrue(mutated_all, f"All genes in block {block_idx} should be mutated") + + def test_inversion_mutation_atomicity(self): + # Setup population: [0, 1, 2, 3, 4, 5] + pop = numpy.array([numpy.arange(self.num_genes, dtype=float)]) + + ga = pygad.GA(num_generations=1, + num_parents_mating=1, + fitness_func=fitness_func, + initial_population=pop, + gene_structure=self.gene_structure, + mutation_type="inversion", + suppress_warnings=self.suppress_warnings) + + # Force a mutation + # inversion selects 2 blocks and reverses them. + # Since we have only 3 blocks, possibilities: + # Blocks [0, 1], [0, 1, 2], [1, 2]. + + # Mock random so we know what happens? Or just analyze result. + print(f"\nOriginal Gene: {pop[0]}") + ga.inversion_mutation(ga.population) + + new_pop = ga.population[0] + print(f"Mutated Gene: {new_pop}") + + if numpy.array_equal(pop[0], new_pop): + # It's possible random selection picked nothing or single-element blocks? 
+ # With intra-block inversion, [2] inverted is [2]. [0, 1] inverted is [1, 0]. + # [3, 4, 5] inverted is [5, 4, 3]. + # If nothing changed, it means probably only single-element blocks were selected OR no blocks selected? + print("Inversion did not change (maybe selected blocks were size 1?)") + + # Check INTRA-BLOCK integrity + # With new logic, blocks should STAY IN PLACE but their contents might be flipped. + # Let's verify that boundaries are respected: i.e. Block A is still at index 0-2 (but maybe inverted). + + # Original: [0, 1], [2], [3, 4, 5] + # Mutated: [x, y], [z], [a, b, c] + + # Verify Block 0 (Index 0-2) + b0_new = new_pop[0:2] + b0_orig = pop[0, 0:2] + # It must be either [0, 1] OR [1, 0] + valid_b0 = numpy.array_equal(b0_new, b0_orig) or numpy.array_equal(b0_new, b0_orig[::-1]) + self.assertTrue(valid_b0, f"Block 0 corrupted: {b0_new}") + + # Verify Block 1 (Index 2-3) -> Length 1, always same + b1_new = new_pop[2:3] + b1_orig = pop[0, 2:3] + self.assertTrue(numpy.array_equal(b1_new, b1_orig), f"Block 1 corrupted: {b1_new}") + + # Verify Block 2 (Index 3-6) + b2_new = new_pop[3:6] + b2_orig = pop[0, 3:6] + # It must be either [3, 4, 5] OR [5, 4, 3] + valid_b2 = numpy.array_equal(b2_new, b2_orig) or numpy.array_equal(b2_new, b2_orig[::-1]) + self.assertTrue(valid_b2, f"Block 2 corrupted: {b2_new}") + + # Check if ANY mutation happened (excluding length 1 blocks) + changed = not numpy.array_equal(pop[0], new_pop) + if not changed: + print("Note: Random selection resulted in no effective change (maybe only Block 1 selected?)") + def test_scramble_mutation_atomicity(self): + # Setup population: [0, 1, 2, 3, 4, 5] + pop = numpy.array([numpy.arange(self.num_genes, dtype=float)]) + + ga = pygad.GA(num_generations=1, + num_parents_mating=1, + fitness_func=fitness_func, + initial_population=pop, + gene_structure=self.gene_structure, + mutation_type="scramble", + suppress_warnings=self.suppress_warnings) + + # Scramble now does Intra-Block Scramble + 
print(f"\nOriginal Gene (Scramble): {pop[0]}") + ga.scramble_mutation(ga.population) + new_pop = ga.population[0] + print(f"Mutated Gene (Scramble): {new_pop}") + + # Verify block integrity (Boundaries respected) + # Block 0 [0, 1] -> should still contain {0, 1} but order might change + b0 = new_pop[0:2] + self.assertTrue(set(b0) == {0, 1}, f"Block 0 content corrupted: {b0}") + + # Block 1 [2] -> should be {2} + b1 = new_pop[2:3] + self.assertTrue(set(b1) == {2}, f"Block 1 content corrupted: {b1}") + + # Block 2 [3, 4, 5] -> should be {3, 4, 5} + b2 = new_pop[3:6] + self.assertTrue(set(b2) == {3, 4, 5}, f"Block 2 content corrupted: {b2}") + + # Check if change occurred (might not if random shuffle returns same order, but likely) + if numpy.array_equal(pop[0], new_pop): + print("Scramble resulted in no change (possible with small blocks/bad luck)") + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_reproduce_atomicity.py b/tests/test_reproduce_atomicity.py new file mode 100644 index 0000000..188b4ae --- /dev/null +++ b/tests/test_reproduce_atomicity.py @@ -0,0 +1,111 @@ +import numpy +import pygad + +def get_mutation_indices(original, mutated): + """ + Returns the indices where original and mutated differ. + """ + return numpy.where(original != mutated)[0] + +# Proper fitness function with 3 arguments +def fitness_func(ga_instance, solution, solution_idx): + return 0 + +def test_mutation_by_space_atomicity(): + print("Testing mutation_by_space atomicity...") + + # 1. 
Setup Gene Structure + # Let's define a structure: [2, 1, 3] + # Total genes = 6 + # Block 0: indices [0, 1] + # Block 1: indices [2] + # Block 2: indices [3, 4, 5] + gene_structure = [2, 1, 3] + num_genes = sum(gene_structure) + + # Gene space (required for mutation_by_space) + # Using simple list for all genes + gene_space = [0, 1] + + # Initial Population (all zeros) + initial_population = numpy.zeros((1, num_genes)) + + # Initialize GA + # We suppress warnings to avoid clutter + ga_instance = pygad.GA(num_generations=1, + num_parents_mating=1, + fitness_func=fitness_func, # Correct signature + initial_population=initial_population, + gene_type=float, # Use float to avoid precision issues + gene_structure=gene_structure, + gene_space=gene_space, + mutation_type="random", + mutation_probability=None, # Force use of mutation_num_genes + mutation_num_genes=1, # Mutate 1 logical block + random_seed=42, # Fixed seed for reproducibility + suppress_warnings=True) + + # Manually invoke mutation_by_space to isolate it + # We pass the population directly + offspring = ga_instance.population.copy() + + # FORCE mutation_by_space call + # Note: mutation_by_space is an instance method + mutated_offspring = ga_instance.mutation_by_space(offspring) + + original_gene = ga_instance.population[0] + new_gene = mutated_offspring[0] + + diff_indices = get_mutation_indices(original_gene, new_gene) + print(f"Original: {original_gene}") + print(f"Mutated: {new_gene}") + print(f"Diff Indices: {diff_indices}") + + # Check if Atomicity is Respected + # If index 0 is mutated, index 1 MUST also be mutated (Block 0) + # If index 2 is mutated, only index 2 should be mutated (Block 1) + # If index 3 is mutated, 4 and 5 MUST also be mutated (Block 2) + + boundaries = [0, 2, 3, 6] # Derived from [2, 1, 3] + + # Group diff indices into blocks + mutated_blocks = set() + for idx in diff_indices: + # Find which block this idx belongs to + block_id = -1 + for i in range(len(boundaries)-1): + if 
boundaries[i] <= idx < boundaries[i+1]: + block_id = i + break + mutated_blocks.add(block_id) + + print(f"Mutated Blocks IDs: {mutated_blocks}") + + if not mutated_blocks: + print("WARN: No genes changed value (chance?).") + return + + fail = False + for block_id in mutated_blocks: + start, end = boundaries[block_id], boundaries[block_id+1] + + # Verify that ALL expected indices for this block have changed? + # IMPORTANT: mutation might randomly pick the SAME value as original (0 -> 0). + # But for Block 2 (size 3), probability of ALL 3 picking 0 is low (0.5^3 = 0.125). + # We can't strictly assert values change, but we can verify that NO OTHER block was touched partially. + # Actually, simpler check: + # Since we set mutation_num_genes=1, ONLY ONE block should be in mutated_blocks. + # If we see indices from multiple blocks, it's a fail (unless we got lucky and multiple blocks were selected? No, we set num_genes=1). + pass + + if len(mutated_blocks) > 1: + print(f"FAIL: Expected 1 block mutated, but genes from {len(mutated_blocks)} blocks changed: {mutated_blocks}") + fail = True + + # Check for partial mutation in the mutated block? + # Hard to prove unless we use a gene space that GUARANTEES change (e.g. init 0, space [1]). + + print("Test finished.") + +if __name__ == "__main__": + test_mutation_by_space_atomicity()