Source code for evovaq.DifferentialEvolution

import random
import numpy as np
from evovaq.tools.support import compute_statistics, print_info, Logbook, set_progress_bar, BestIndividualTracker, \
    FinalResult
from typing import Union
from evovaq.problem import Problem


[docs]class DE(object): """ Differential Evolution (DE) is a parallel direct search method designed for minimizing possibly nonlinear and non-differentiable continuous space functions [1]. Starting from an initial random population, the evolutionary cycle that creates a new population for the next generation consists of an appropriate perturbation of each individual. In detail, for each target individual, a mutant individual is generated by adding a weighted difference between two or four randomly chosen individuals to a third individual selected as best or randomly. At this point, a uniform crossover is applied between the target and the mutant to obtain the trial individual. Lastly, the trial individual is compared to the target individual to determine the best member of the next generation [2]. References: [1] R. Storn and K. Price, “Differential evolution–a simple and efficient heuristic for global optimization over continuous spaces,” Journal of global optimization, vol. 11, no. 4, pp. 341–359, 1997. [2] Giovanni Acampora, Angela Chiatto, and Autilia Vitiello, "A comparison of evolutionary algorithms for training variational quantum classifiers", Proceedings of 2023 IEEE Congress on Evolutionary Computation (CEC), pp. 1–8, 2023. Args: variant: The variants are denoted as <selection>/<n_diffs>/<crossover>, where <selection> is `best` or `rand`, <n_diffs> is `1` or `2`, and <crossover> is `bin` or `exp`. differential_weight: Differential weight is a hyperparameter that controls the amplification of the individuals difference used to create the mutant. If float, it must be in the range [0, 2]. If ``(min, max)`` tuple, dithering is employed. Dithering randomly changes the differential weight in each generation so that it can help speed convergence significantly. CR: The probability that a parameter of the trial individual is replaced with the corresponding one of the mutant individual. """ def __init__(self, variant: str = 'best/1/bin', differential_weight: Union[float, tuple] = 0.8, CR: float = 0.9): self.variant = variant self.differential_weight = differential_weight self.CR = CR
[docs] def mut_de(self, population: np.ndarray, fitness: np.ndarray, target_idx: int, F: float) -> np.ndarray: """ Create the mutant individual. Args: population: A population of possible solutions as array of real parameters with (`pop_size`, `n_params`) shape. fitness: A set of fitness values associated to the population as array of real values with (`pop_size`,) shape. target_idx: Index of the target individual in the population. F: Differential weight. Returns: Mutant individual as array of real parameters with (`n_params`,) shape. """ indices = np.arange(len(population)) if self.variant.split('/')[0].lower() == 'rand': mutant_idx = np.random.choice(indices[indices != target_idx]) elif self.variant.split('/')[0].lower() == 'best': mutant_idx = np.argsort(fitness)[0] else: raise ValueError("Please select a valid mutation strategy, i.e. 'best' or 'rand'") mutant = population[mutant_idx].copy() diff_ind_idx = np.random.choice(indices[~np.isin(indices, [target_idx, mutant_idx])], size=int(self.variant.split('/')[1]) * 2, replace=False) if self.variant.split('/')[1] == '1': mutated = mutant + F * (population[diff_ind_idx[0]].copy() - population[diff_ind_idx[1]].copy()) elif self.variant.split('/')[1] == '2': mutated = mutant + F * (population[diff_ind_idx[0]].copy() + population[diff_ind_idx[1]].copy() - population[diff_ind_idx[2]].copy() - population[diff_ind_idx[3]].copy()) else: raise ValueError("Please select a valid mutation strategy, i.e. '1' or '2'") return mutated
[docs] def cx_de(self, target: np.ndarray, mutated: np.ndarray) -> np.ndarray: """ Mate the target and mutant individual. Args: target: Target individual as array of real parameters with (`n_params`,) shape. mutated: Mutant individual as array of real parameters with (`n_params`,) shape. Returns: Trial individual as array of real parameters with (`n_params`,) shape. """ variant_type = self.variant.split('/')[-1].lower() trial = None if variant_type == 'bin': R = np.random.choice(len(target)) pr = np.random.random(size=len(target)) trial = np.array([mutated[i] if pr[i] < self.CR or i == R else target[i] for i in range(len(target))]) elif variant_type == 'exp': print("'exp' crossover is under construction") else: raise ValueError("Please select a valid crossover strategy, i.e. 'bin' or 'exp'") return trial
[docs] def evolve_population( self, problem: Problem, population: np.ndarray, fitness: np.ndarray) -> tuple[np.ndarray, np.ndarray, int]: """ Evolve the population by means of stochastic operators. Args: problem : :class:`~.Problem` to be solved. population: A population of individuals as array of real parameters with (`pop_size`, `n_params`) shape. fitness: A set of fitness values associated to the population as array of real values with (`pop_size`,) shape. Returns: The offspring and fitness values obtained after evolution, and number of fitness evaluations completed during the evolution. """ # Check the dithering if isinstance(self.differential_weight, tuple): _min, _max = sorted(self.differential_weight) F = random.uniform(_min, _max) elif isinstance(self.differential_weight, float): F = self.differential_weight else: raise ValueError('The differential_weight must be a float in ' '(0, 2), or specified as a tuple(min, max)' ' where min < max and min, max are in (0, 2).') # Clone the individuals to produce the offspring offspring = population.copy() fit_offspring = fitness.copy() for target_idx, child in enumerate(offspring): # Mutation step mutated = self.mut_de(population, fitness, target_idx, F) # Crossover step trial = self.cx_de(child, mutated) # Check the bounds of the parameters trial[:] = problem.check_bounds(trial) # Selection step fit_trial = problem.evaluate_fitness(trial) if fit_trial < fit_offspring[target_idx]: child[:] = trial fit_offspring[target_idx] = fit_trial nfev = len(offspring) return offspring, fit_offspring, nfev
[docs] def optimize( self, problem: Problem, pop_size: int, initial_pop: Union[np.ndarray, None] = None, max_nfev: Union[int, None] = None, max_gen: int = 1000, n_run: int = 1, seed: Union[int, float, str, None] = None, verbose: bool = True) -> FinalResult: """ Optimize the parameters of the problem to be solved. Args: problem: :class:`~.Problem` to be solved. pop_size: Population size. initial_pop: Initial population of possible solutions as array of real parameters with (`pop_size`, `n_params`) shape. If None, the initial population is randomly generated from `param_bounds`. max_nfev: Maximum number of fitness evaluations used as stopping criterion. If None, the maximum number of generations `max_gen` is considered as stopping criterion. max_gen: Maximum number of generations used as stopping criterion. If `max_nfev` is not None, this is considered as stopping criterion. n_run: Independent execution number of the algorithm. seed: Initialize the random number generator. If None, the current time is used. verbose: If True, the statistics of fitness values is printed during the evolution. Returns: A :class:`~.FinalResult` containing the optimization result. """ # Set the seed if seed is not None: np.random.seed(seed) random.seed(seed) # Create and initialize the population if initial_pop is None: population = problem.generate_random_pop(pop_size) fitness = np.array(list(map(problem.evaluate_fitness, population))) nfev = len(population) tot_nfev = len(population) else: population = initial_pop[:].copy() fitness = np.array(list(map(problem.evaluate_fitness, population))) nfev = len(population) tot_nfev = len(population) # Store the best solution ever found best_tracker = BestIndividualTracker() best_tracker.update(population, fitness) # Set the progress bar considering the stopping criterion pbar = set_progress_bar(max_gen, max_nfev) if max_nfev is not None: pbar.update(nfev) # Compute the statistics of the fitness values stats = compute_statistics(fitness) # Set the logbook lg = Logbook() lg.record(gen=0, nfev=nfev, **stats) # Print the evolution info if verbose: print_info(n_run=n_run, gen=0, nfev=nfev, **stats, header=True) # Begin the generational process for gen in range(1, max_gen + 1): # Check the stopping criterion if max_nfev is not None and tot_nfev >= max_nfev: pbar.close() break offspring, fit_offspring, nfev = self.evolve_population(problem, population, fitness) tot_nfev += nfev population[:] = offspring fitness[:] = fit_offspring # Store the best solution ever found best_tracker.update(population, fitness) # Compute the statistics of the fitness values stats = compute_statistics(fitness) # Record info in the logbook lg.record(gen=gen, nfev=nfev, **stats) # Print the evolution info if verbose: print_info(n_run=n_run, gen=gen, nfev=nfev, **stats) # Update the progress bar if max_nfev is not None: pbar.update(nfev) else: pbar.update() res = FinalResult(x=best_tracker.get_best(), fun=best_tracker.get_best_fit(), nfev=tot_nfev, gen=gen, log=lg.get_log()) return res