"""The :mod:`search` module defines algorithms to search for Push programs."""
from abc import ABC, abstractmethod
from typing import Union, Tuple, Optional
import numpy as np
import math
from functools import partial
from multiprocessing import Pool, Manager
from pyshgp.push.program import ProgramSignature
from pyshgp.tap import tap
from pyshgp.utils import DiscreteProbDistrib
from pyshgp.gp.evaluation import Evaluator
from pyshgp.gp.genome import GeneSpawner, GenomeSimplifier
from pyshgp.gp.individual import Individual
from pyshgp.gp.population import Population
from pyshgp.gp.selection import Selector, get_selector
from pyshgp.gp.variation import VariationOperator, get_variation_operator
from pyshgp.utils import instantiate_using
class ParallelContext:
    """Holds the objects needed to coordinate parallelism."""

    def __init__(self,
                 spawner: GeneSpawner,
                 evaluator: Evaluator,
                 n_proc: Optional[int] = None):
        # A manager-hosted namespace holds the spawner and evaluator so that
        # pool workers can reach them through the manager's proxy.
        self.manager = Manager()
        self.ns = self.manager.Namespace()
        self.ns.spawner = spawner
        self.ns.evaluator = evaluator
        # Pool() with no argument defaults to one process per available CPU.
        self.pool = Pool() if n_proc is None else Pool(n_proc)

    def close(self):
        """Close the worker pool, if one exists."""
        if self.pool is not None:
            self.pool.close()
class SearchConfiguration:
    """Configuration of a search algorithm.

    Parameters
    ----------
    signature : ProgramSignature
        The program signature used when constructing Individuals produced
        by the search.
    evaluator : Evaluator
        The Evaluator to use when evaluating individuals.
    spawner : GeneSpawner
        The GeneSpawner to use when producing Genomes during initialization and
        variation.
    selection : Union[Selector, DiscreteProbDistrib, str], optional
        A Selector, or DiscreteProbDistrib of selectors, to use when selecting
        parents. The default is lexicase selection.
    variation : Union[VariationOperator, DiscreteProbDistrib, str], optional
        A VariationOperator, or DiscreteProbDistrib of VariationOperators, to
        use during variation. Default is "umad".
    population_size : int, optional
        The number of individuals to hold in the population each generation.
        Default is 500.
    max_generations : int, optional
        The number of generations to run the search algorithm. Default is 100.
    error_threshold : float, optional
        If the search algorithm finds an Individual with a total error less
        than or equal to this value, stop searching. Default is 0.0.
    initial_genome_size : Tuple[int, int], optional
        The range of genome sizes to produce during initialization. Default is
        (10, 50).
    simplification_steps : int, optional
        The number of simplification iterations to apply to the best Push program
        produced by the search algorithm. Default is 2000.
    parallelism : Union[int, bool], optional
        Set the number of processes to spawn for use when performing embarrassingly
        parallel tasks. If False, no processes will spawn and computation will be
        serial. Default is True, which spawns one process per available cpu.
    **kwargs
        Extra keyword arguments. Stored in ``self.ext`` and forwarded to the
        selector / variation-operator factories when ``selection`` or
        ``variation`` is given by name.
    """

    # @todo change to a PClass

    def __init__(self,
                 signature: ProgramSignature,
                 evaluator: Evaluator,
                 spawner: GeneSpawner,
                 selection: Union[Selector, DiscreteProbDistrib, str] = "lexicase",
                 variation: Union[VariationOperator, DiscreteProbDistrib, str] = "umad",
                 population_size: int = 500,
                 max_generations: int = 100,
                 error_threshold: float = 0.0,
                 initial_genome_size: Tuple[int, int] = (10, 50),
                 simplification_steps: int = 2000,
                 parallelism: Union[int, bool] = True,
                 **kwargs):
        self.signature = signature
        self.evaluator = evaluator
        self.spawner = spawner
        self.population_size = population_size
        self.max_generations = max_generations
        self.error_threshold = error_threshold
        self.initial_genome_size = initial_genome_size
        self.simplification_steps = simplification_steps
        self.ext = kwargs

        # bool must be checked before the int comparison because bool is a
        # subclass of int (True would otherwise hit the `> 1` branch as 1).
        self.parallel_context = None
        if isinstance(parallelism, bool):
            if parallelism:
                self.parallel_context = ParallelContext(spawner, evaluator)
        elif parallelism > 1:
            self.parallel_context = ParallelContext(spawner, evaluator, parallelism)

        # Normalize `selection` into a DiscreteProbDistrib of Selectors.
        if isinstance(selection, Selector):
            self.selection = DiscreteProbDistrib().add(selection, 1.0)
        elif isinstance(selection, DiscreteProbDistrib):
            self.selection = selection
        else:
            selector = get_selector(selection, **self.ext)
            self.selection = DiscreteProbDistrib().add(selector, 1.0)

        # Normalize `variation` into a DiscreteProbDistrib of VariationOperators.
        if isinstance(variation, VariationOperator):
            self.variation = DiscreteProbDistrib().add(variation, 1.0)
        elif isinstance(variation, DiscreteProbDistrib):
            self.variation = variation
        else:
            variation_op = get_variation_operator(variation, **self.ext)
            self.variation = DiscreteProbDistrib().add(variation_op, 1.0)

    def get_selector(self):
        """Return a Selector sampled from the selection distribution."""
        return self.selection.sample()

    def get_variation_op(self):
        """Return a VariationOperator sampled from the variation distribution."""
        return self.variation.sample()

    def tear_down(self):
        """Release any parallelism resources held by this configuration."""
        if self.parallel_context is not None:
            self.parallel_context.close()
def _spawn_individual(spawner, genome_size, program_signature: ProgramSignature, *args):
    """Return a new Individual with a randomly spawned genome.

    Extra positional args are accepted (and ignored) so this can be mapped
    over an iterable by a process pool.
    """
    genome = spawner.spawn_genome(genome_size)
    return Individual(genome, program_signature)
class SearchAlgorithm(ABC):
    """Base class for all search algorithms.

    Parameters
    ----------
    config : SearchConfiguration
        The configuration of the search algorithm.

    Attributes
    ----------
    config : SearchConfiguration
        The configuration of the search algorithm.
    generation : int
        The current generation, or iteration, of the search.
    best_seen : Individual
        The best Individual, with respect to total error, seen so far.
    population : Population
        The current Population of individuals.
    """

    def __init__(self, config: SearchConfiguration):
        self.config = config
        self._p_context = config.parallel_context
        self.generation = 0
        self.best_seen = None
        self.population = None
        self.init_population()

    def init_population(self):
        """Initialize the population with randomly spawned individuals."""
        spawner = self.config.spawner
        init_gn_size = self.config.initial_genome_size
        pop_size = self.config.population_size
        signature = self.config.signature
        self.population = Population()
        if self._p_context is not None:
            # Spawn individuals in parallel. imap_unordered feeds each worker
            # an index from range(pop_size), which _spawn_individual accepts
            # via *args and ignores.
            gen_func = partial(_spawn_individual, self._p_context.ns.spawner, init_gn_size, signature)
            for indiv in self._p_context.pool.imap_unordered(gen_func, range(pop_size)):
                self.population.add(indiv)
        else:
            for i in range(pop_size):
                self.population.add(_spawn_individual(spawner, init_gn_size, signature))

    @tap
    @abstractmethod
    def step(self) -> bool:
        """Perform one generation (step) of the search.

        The step method should assume an evaluated Population, and must only
        perform parent selection and variation (producing children). The step
        method should modify the search algorithms population in-place, or
        assign a new Population to the population attribute.
        """
        pass

    def _full_step(self) -> bool:
        # One full generation: evaluate, update best_seen, then step.
        # Returns False to signal the run loop to stop (error threshold met).
        self.generation += 1
        if self._p_context is not None:
            # Parallel evaluation via the shared evaluator and worker pool.
            self.population.p_evaluate(self._p_context.ns.evaluator, self._p_context.pool)
        else:
            self.population.evaluate(self.config.evaluator)
        best_this_gen = self.population.best()
        if self.best_seen is None or best_this_gen.total_error < self.best_seen.total_error:
            self.best_seen = best_this_gen
            if self.best_seen.total_error <= self.config.error_threshold:
                return False
        self.step()
        return True

    def is_solved(self) -> bool:
        """Return ``True`` if the search algorithm has found a solution or ``False`` otherwise."""
        return self.best_seen.total_error <= self.config.error_threshold

    @tap
    def run(self) -> Individual:
        """Run the algorithm until termination."""
        while self._full_step():
            if self.generation >= self.config.max_generations:
                break

        # Simplify the best individual for a better generalization and interpretation.
        simplifier = GenomeSimplifier(
            self.config.evaluator,
            self.config.signature
        )
        simp_genome, simp_error_vector = simplifier.simplify(
            self.best_seen.genome,
            self.best_seen.error_vector,
            self.config.simplification_steps
        )
        simplified_best = Individual(simp_genome, self.config.signature)
        simplified_best.error_vector = simp_error_vector
        self.best_seen = simplified_best
        return self.best_seen
class GeneticAlgorithm(SearchAlgorithm):
    """Genetic algorithm to synthesize Push programs.

    An initial Population of random Individuals is created. Each generation
    begins by evaluating all Individuals in the population. The current
    Population is then replaced with children, produced by selecting parents
    from the Population and applying VariationOperators to them.
    """

    def __init__(self, config: SearchConfiguration):
        super().__init__(config)

    def _make_child(self) -> Individual:
        # Sample a variation operator first, then a selector: both draws
        # consume shared randomness, so the order is kept as-is.
        variation_op = self.config.get_variation_op()
        parent_selector = self.config.get_selector()
        parents = parent_selector.select(self.population, n=variation_op.num_parents)
        parent_genomes = [parent.genome for parent in parents]
        child_genome = variation_op.produce(parent_genomes, self.config.spawner)
        return Individual(child_genome, self.config.signature)

    @tap
    def step(self):
        """Perform one generation (step) of the genetic algorithm.

        The step method assumes an evaluated Population and performs parent
        selection and variation (producing children).
        """
        super().step()
        children = []
        for _ in range(self.config.population_size):
            children.append(self._make_child())
        self.population = Population(children)
class SimulatedAnnealing(SearchAlgorithm):
    """Algorithm to synthesize Push programs with Simulated Annealing.

    At each step (generation), the simulated annealing heuristic mutates the
    current Individual, and probabilistically decides between accepting or
    rejecting the child. If the child is accepted, it becomes the new current
    Individual.

    After each step, the simulated annealing system cools its temperature.
    As the temperature lowers, the probability of accepting a child that does
    not have a lower total error than the current Individual decreases.
    """

    def __init__(self, config: SearchConfiguration):
        # Simulated annealing tracks a single current Individual.
        config.population_size = 1
        for op in config.variation.elements:
            # Fixed typo in the error message ("parant" -> "parent").
            assert op.num_parents <= 1, "SimulatedAnnealing cannot take multiple parent variation operators."
        super().__init__(config)

    def _get_temp(self):
        """Return the temperature, cooled linearly from 1.0 to 0.0 over the run."""
        return 1.0 - (self.generation / self.config.max_generations)

    def _acceptance(self, next_error):
        """Return probability of acceptance given an error.

        Improvements (or any finite error when the current error is infinite)
        are always accepted; otherwise the standard Boltzmann criterion
        exp(-delta / temperature) is used.
        """
        current_error = self.population.best().total_error
        if np.isinf(current_error) or next_error < current_error:
            return 1
        else:
            return math.exp(-(next_error - current_error) / self._get_temp())

    @tap
    def step(self):
        """Perform one generation, or step, of the Simulated Annealing.

        The step method assumes an evaluated Population of one Individual and
        produces a single candidate Individual. If the candidate individual
        passes the acceptance function, it becomes the Individual in the
        Population.
        """
        super().step()
        # Once fully cooled, stop producing candidates. This also guards the
        # division by the temperature in _acceptance.
        if self._get_temp() <= 0:
            return
        candidate = Individual(
            self.config.get_variation_op().produce(
                [self.population.best().genome],
                self.config.spawner
            ),
            self.config.signature
        )
        candidate.error_vector = self.config.evaluator.evaluate(candidate.program)
        acceptance_probability = self._acceptance(candidate.total_error)
        if np.random.random() < acceptance_probability:
            self.population = Population().add(candidate)
# @TODO: class EvolutionaryStrategy(SearchAlgorithm):
# ...
def get_search_algo(name: str, **kwargs) -> SearchAlgorithm:
    """Return the search algorithm class with the given name."""
    supported = {
        "GA": GeneticAlgorithm,
        "SA": SimulatedAnnealing,
        # "ES": EvolutionaryStrategy,
    }
    algo_cls = supported.get(name)
    if algo_cls is None:
        msg = "No search algo '{nm}'. Supported names: {lst}.".format(
            nm=name,
            lst=list(supported.keys())
        )
        raise ValueError(msg)
    return instantiate_using(algo_cls, kwargs)