import numpy as np
import random
from evovaq.tools.support import compute_statistics, print_info, Logbook, set_progress_bar, BestIndividualTracker, \
FinalResult, Particle
from evovaq.tools.operators import sel_best
from typing import Union
from evovaq.problem import Problem
[docs]class PSO(object):
"""
Particle Swarm Optimization (PSO) is an optimization method based on a swarm of candidate solutions, named particles,
moving in the search space according to appropriate position and velocity equation [1]. Starting from a swarm of
particles with random positions and velocities, each particle’s velocity is updated by combining its own best
position (pbest) and the global best position (gbest) ever found in the search space with some random perturbations
influenced by two hyperparameters, denoted as `phi1` and `phi2`. At this point, each particle’s position is updated
by adding its resulting velocity to the current position. The final goal is to move the whole swarm close to an
optimal position in the search space.
In `evovaq`, PSO with inertia weight (`inertia_weight`) described in [2] is implemented.
References:
[1] Giovanni Acampora, Angela Chiatto, and Autilia Vitiello, "A comparison of evolutionary algorithms for
training variational quantum classifiers", Proceedings of 2023 IEEE Congress on Evolutionary Computation (CEC),
pp. 1–8, 2023.
[2] Poli R., Kennedy J., & Blackwell T., "Particle swarm optimization: An overview", Swarm intelligence, vol. 1,
pp. 33-57, 2007.
Args:
vmin: Lower value(s) of the velocity. If None, no limits are considered.
vmax: Upper value(s) of the velocity. If None, no limits are considered.
inertia_weight: Inertia weight.
phi1: Acceleration coefficient determining the magnitude of the random force in the direction of personal
best solution.
phi2: Acceleration coefficient determining the magnitude of the random force in the direction of global best
solution.
"""
def __init__(self, vmin: Union[float, np.ndarray, None] = None, vmax: Union[float, np.ndarray, None] = None,
inertia_weight: float = 0.7298, phi1: float = 1.49618, phi2: float = 1.49618):
self.vmin = vmin
self.vmax = vmax
self.inertia_weight = inertia_weight
self.phi1 = phi1
self.phi2 = phi2
[docs] @staticmethod
def initialize_swarm(population: np.ndarray, fitness: np.ndarray) -> list[Particle]:
"""
Initialize the swarm.
Args:
population: A population of individuals as array of real parameters with (`pop_size`, `n_params`)
shape.
fitness: A set of fitness values associated to the population as array of real values with (`pop_size`,)
shape.
Returns:
List of :class:`~.Particle` elements.
"""
swarm = []
for position, fit in zip(population, fitness):
velocity = np.random.uniform(-1, 1, len(position))
part = Particle(position, velocity, fit)
swarm.append(part)
return swarm
[docs] def optimize(
self, problem: Problem, pop_size: int, initial_pop: Union[np.ndarray, None] = None,
max_nfev: Union[int, None] = None, max_gen: int = 1000, n_run: int = 1, seed: Union[int, float, str, None] = None,
verbose: bool = True) -> FinalResult:
"""
Optimize the parameters of the problem to be solved.
Args:
problem: :class:`~.Problem` to be solved.
pop_size: Population size.
initial_pop: Initial population of possible solutions as array of real parameters with (`pop_size`, `n_params`)
shape. If None, the initial population is randomly generated from `param_bounds`.
max_nfev: Maximum number of fitness evaluations used as stopping criterion. If None, the maximum number of
generations `max_gen` is considered as stopping criterion.
max_gen: Maximum number of generations used as stopping criterion. If `max_nfev` is not None, this is
considered as stopping criterion.
n_run: Independent execution number of the algorithm.
seed: Initialize the random number generator. If None, the current time is used.
verbose: If True, the statistics of fitness values is printed during the evolution.
Returns:
A :class:`~.FinalResult` containing the optimization result.
"""
# Set the seed
if seed is not None:
np.random.seed(seed)
random.seed(seed)
# Create and initialize the population
if initial_pop is None:
population = problem.generate_random_pop(pop_size)
fitness = np.array(list(map(problem.evaluate_fitness, population)))
nfev = len(population)
tot_nfev = len(population)
else:
population = initial_pop[:].copy()
fitness = np.array(list(map(problem.evaluate_fitness, population)))
nfev = len(population)
tot_nfev = len(population)
# Store the best solution ever found
best_tracker = BestIndividualTracker()
best_tracker.update(population, fitness)
# Set the progress bar considering the stopping criterion
pbar = set_progress_bar(max_gen, max_nfev)
if max_nfev is not None:
pbar.update(nfev)
# Compute the statistics of the fitness values
stats = compute_statistics(fitness)
# Set the logbook
lg = Logbook()
lg.record(gen=0, nfev=nfev, **stats)
# Print the evolution info
if verbose:
print_info(n_run=n_run, gen=0, nfev=nfev, **stats, header=True)
swarm = self.initialize_swarm(population, fitness)
# Begin the generational process
for gen in range(1, max_gen + 1):
# Check the stopping criterion
if max_nfev is not None and tot_nfev >= max_nfev:
pbar.close()
break
gbest, fit_gbest = sel_best(population, fitness, 1)
offspring = population.copy()
fit_offspring = fitness.copy()
for idx, part in enumerate(swarm):
part.update_velocity(gbest[0], self.inertia_weight, self.phi1, self.phi2, self.vmin, self.vmax)
part.update_position(problem.param_bounds)
new_fitness = problem.evaluate_fitness(part.position)
part.update_fitness(new_fitness)
part.update_pbest()
offspring[idx] = part.position
fit_offspring[idx] = part.fitness
nfev = len(offspring)
tot_nfev += len(offspring)
population[:] = offspring
fitness[:] = fit_offspring
# Store the best solution ever found
best_tracker.update(population, fitness)
# Compute the statistics of the fitness values
stats = compute_statistics(fitness)
# Record info in the logbook
lg.record(gen=gen, nfev=nfev, **stats)
# Print the evolution info
if verbose:
print_info(n_run=n_run, gen=gen, nfev=nfev, **stats)
# Update the progress bar
if max_nfev is not None:
pbar.update(nfev)
else:
pbar.update()
res = FinalResult(x=best_tracker.get_best(), fun=best_tracker.get_best_fit(), nfev=tot_nfev, gen=gen,
log=lg.get_log())
return res