Source code for evol.population

"""
Population objects in `evol` are a collection of chromosomes
at some point in an evolutionary algorithm. You can apply
evolutionary steps by directly calling methods on the population
or by applying an `evol.Evolution` object.
"""
from abc import ABCMeta, abstractmethod
from copy import copy
from itertools import cycle, islice
from math import ceil
from random import choices, randint
from typing import Any, Callable, Generator, Iterable, Iterator, List, Optional, Sequence, TYPE_CHECKING
from uuid import uuid4

from multiprocess.pool import Pool

from evol import Individual
from evol.conditions import Condition
from evol.exceptions import StopEvolution
from evol.helpers.groups import group_random
from evol.utils import offspring_generator, select_arguments
from evol.serialization import SimpleSerializer

if TYPE_CHECKING:
    from .evolution import Evolution


[docs]class BasePopulation(metaclass=ABCMeta): def __init__(self, chromosomes: Iterable[Any], eval_function: Callable, checkpoint_target: Optional[str] = None, concurrent_workers: Optional[int] = 1, maximize: bool = True, generation: int = 0, intended_size: Optional[int] = None, serializer=None): self.concurrent_workers = concurrent_workers self.documented_best = None self.eval_function = eval_function self.generation = generation self.id = str(uuid4())[:6] self.individuals = [Individual(chromosome=chromosome) for chromosome in chromosomes] self.intended_size = intended_size or len(self.individuals) self.maximize = maximize self.serializer = serializer or SimpleSerializer(target=checkpoint_target) self.pool = None if concurrent_workers == 1 else Pool(concurrent_workers) def __iter__(self) -> Iterator[Individual]: return self.individuals.__iter__() def __getitem__(self, i) -> Individual: return self.individuals[i] def __len__(self): return len(self.individuals) def __repr__(self): return f"<Population with size {len(self)} at {id(self)}>" @property def current_best(self) -> Individual: evaluated_individuals = tuple(filter(lambda x: x.fitness is not None, self.individuals)) if len(evaluated_individuals) > 0: return max(evaluated_individuals, key=lambda x: x.fitness if self.maximize else -x.fitness) @property def current_worst(self) -> Individual: evaluated_individuals = tuple(filter(lambda x: x.fitness is not None, self.individuals)) if len(evaluated_individuals) > 0: return min(evaluated_individuals, key=lambda x: x.fitness if self.maximize else -x.fitness) @property def chromosomes(self) -> Generator[Any, None, None]: for individual in self.individuals: yield individual.chromosome @property def is_evaluated(self) -> bool: return all(individual.fitness is not None for individual in self)
[docs] @classmethod def generate(cls, init_function: Callable[[], Any], eval_function: Callable[..., float], size: int = 100, **kwargs) -> 'BasePopulation': """Generate a population from an initialisation function. :param init_function: Function that returns a chromosome. :param eval_function: Function that reduces a chromosome to a fitness. :param size: Number of individuals to generate. Defaults to 100. :return: BasePopulation """ chromosomes = [init_function() for _ in range(size)] return cls(chromosomes=chromosomes, eval_function=eval_function, **kwargs)
[docs] @classmethod def load(cls, target: str, eval_function: Callable[..., float], **kwargs) -> 'BasePopulation': """Load a population from a checkpoint. :param target: Path to checkpoint directory or file. :param eval_function: Function that reduces a chromosome to a fitness. :param kwargs: Any argument the init method accepts. :return: Population """ result = cls(chromosomes=[], eval_function=eval_function, **kwargs) result.individuals = result.serializer.load(target=target) return result
[docs] def checkpoint(self, target: Optional[str] = None, method: str = 'pickle') -> 'BasePopulation': """Checkpoint the population. :param target: Directory to write checkpoint to. If None, the Serializer default target is taken, which can be provided upon initialisation. Defaults to None. :param method: One of 'pickle' or 'json'. When 'json', the chromosomes need to be json-serializable. Defaults to 'pickle'. :return: Population """ self.serializer.checkpoint(individuals=self.individuals, target=target, method=method) return self
@property def _individual_weights(self): try: min_fitness = min(individual.fitness for individual in self) max_fitness = max(individual.fitness for individual in self) except TypeError: raise RuntimeError('Individual weights can not be computed if the individuals are not evaluated.') if min_fitness == max_fitness: return [1] * len(self) elif self.maximize: return [(individual.fitness - min_fitness) / (max_fitness - min_fitness) for individual in self] else: return [1 - (individual.fitness - min_fitness) / (max_fitness - min_fitness) for individual in self]
[docs] def evolve(self, evolution: 'Evolution', n: int = 1) -> 'BasePopulation': # noqa: F821 """Evolve the population according to an Evolution. :param evolution: Evolution to follow :param n: Times to apply the evolution. Defaults to 1. :return: Population """ result = copy(self) try: for _ in range(n): Condition.check(result) for step in evolution: result = step.apply(result) except StopEvolution: pass return result
[docs] @abstractmethod def evaluate(self, lazy: bool = False) -> 'BasePopulation': pass
[docs] def breed(self, parent_picker: Callable[..., Sequence[Individual]], combiner: Callable, population_size: Optional[int] = None, **kwargs) -> 'BasePopulation': """Create new individuals by combining existing individuals. This increments the generation of the Population. :param parent_picker: Function that selects parents from a collection of individuals. :param combiner: Function that combines chromosomes into a new chromosome. Must be able to handle the number of chromosomes that the combiner returns. :param population_size: Intended population size after breeding. If None, take the previous intended population size. Defaults to None. :param kwargs: Kwargs to pass to the parent_picker and combiner. Arguments are only passed to the functions if they accept them. :return: self """ if population_size: self.intended_size = population_size offspring = offspring_generator(parents=self.individuals, parent_picker=select_arguments(parent_picker), combiner=select_arguments(combiner), **kwargs) self.individuals += list(islice(offspring, self.intended_size - len(self.individuals))) self.generation += 1 return self
[docs] def mutate(self, mutate_function: Callable[..., Any], probability: float = 1.0, elitist: bool = False, **kwargs) -> 'BasePopulation': """Mutate the chromosome of each individual. :param mutate_function: Function that accepts a chromosome and returns a mutated chromosome. :param probability: Probability that the individual mutates. The function is only applied in the given fraction of cases. Defaults to 1.0. :param elitist: If True, do not mutate the current best individual(s). Note that this only applies to evaluated individuals. Any unevaluated individual will be treated as normal. Defaults to False. :param kwargs: Arguments to pass to the mutation function. :return: self """ elite_fitness: Optional[float] = self.current_best.fitness if elitist else None for individual in self.individuals: if elite_fitness is None or individual.fitness != elite_fitness: individual.mutate(mutate_function, probability=probability, **kwargs) return self
[docs] def map(self, func: Callable[..., Individual], **kwargs) -> 'BasePopulation': """Apply the provided function to each individual in the population. :param func: A function to apply to each individual in the population, which when called returns a modified individual. :param kwargs: Arguments to pass to the function. :return: self """ self.individuals = [func(individual, **kwargs) for individual in self.individuals] return self
[docs] def filter(self, func: Callable[..., bool], **kwargs) -> 'BasePopulation': """Add a filter step to the Evolution. Filters the individuals in the population using the provided function. :param func: Function to filter the individuals in the population by, which returns a boolean when called on an individual. :param kwargs: Arguments to pass to the function. :return: self """ self.individuals = [individual for individual in self.individuals if func(individual, **kwargs)] return self
[docs] def survive(self, fraction: Optional[float] = None, n: Optional[int] = None, luck: bool = False) -> 'BasePopulation': """Let part of the population survive. Remove part of the population. If both fraction and n are specified, the minimum resulting population size is taken. :param fraction: Fraction of the original population that survives. Defaults to None. :param n: Number of individuals of the population that survive. Defaults to None. :param luck: If True, individuals randomly survive (with replacement!) with chances proportional to their fitness. Defaults to False. :return: self """ if fraction is None: if n is None: raise ValueError('everyone survives! must provide either "fraction" and/or "n".') resulting_size = n elif n is None: resulting_size = round(fraction * len(self.individuals)) else: resulting_size = min(round(fraction * len(self.individuals)), n) self.evaluate(lazy=True) if resulting_size == 0: raise RuntimeError(f'No individual out of {len(self.individuals)} survived!') if resulting_size > len(self.individuals): raise ValueError(f'everyone survives in population {self.id}: ' f'{resulting_size} out of {len(self.individuals)} must survive.') if luck: self.individuals = choices(self.individuals, k=resulting_size, weights=self._individual_weights) else: sorted_individuals = sorted(self.individuals, key=lambda x: x.fitness, reverse=self.maximize) self.individuals = sorted_individuals[:resulting_size] return self
[docs] def callback(self, callback_function: Callable[..., None], **kwargs) -> 'BasePopulation': """ Performs a callback function on the population. Can be used for custom logging/checkpointing. :param callback_function: Function that accepts the population as a first argument. :return: """ self.evaluate(lazy=True) callback_function(self, **kwargs) return self
[docs] def group(self, grouping_function: Callable[..., List[List[int]]] = group_random, **kwargs) -> List['BasePopulation']: """ Group a population into islands. Divides the population into multiple island populations, each of which contains a subset of the original population. An individual from the original population may end up in multiple (>= 0) island populations. :param grouping_function: Function that allocates individuals to the island populations. It will be passed a list of individuals plus the kwargs passed to this method, and must return a list of lists of integers, each sub-list representing an island and the integers representing the index of an individual in the list. Each island must contain at least one individual, and individual may be copied to multiple islands. :param kwargs: Additional keyworded arguments are passed to the grouping function. :return: List[Population] """ group_indexes = grouping_function(self.individuals, **kwargs) if len(group_indexes) == 0: raise ValueError('Group yielded zero islands.') result = [self._subset(index=index, subset_id=str(i)) for i, index in enumerate(group_indexes)] return result
[docs] @classmethod def combine(cls, *populations: 'BasePopulation', intended_size: Optional[int] = None, pool: Optional[Pool] = None) -> 'BasePopulation': """ Combine multiple island populations into a single population. The resulting population is reduced to its intended size. :param populations: Populations to combine. :param intended_size: Intended size of the resulting population. Defaults to the sum of the intended sizes of the islands. :param pool: Optionally provide a multiprocessing pool to be used by the population. :return: Population """ if len(populations) == 0: raise ValueError('Cannot combine zero islands into one.') result = copy(populations[0]) for pop in populations[1:]: result.individuals += pop.individuals result.intended_size = intended_size or sum([pop.intended_size for pop in populations]) result.pool = pool result.id = result.id.split('-')[0] return result.survive(n=result.intended_size)
def _subset(self, index: List[int], subset_id: str) -> 'BasePopulation': """Create a new population that is a subset of this population.""" if len(index) == 0: raise ValueError('Grouping yielded an empty island.') result = copy(self) result.individuals = [result.individuals[i] for i in index] result.intended_size = len(result.individuals) result.pool = None # Subsets shouldn't parallelize anything result.id += '-' + subset_id return result def _update_documented_best(self): """Update the documented best""" current_best = self.current_best if (self.documented_best is None or (self.maximize and current_best.fitness > self.documented_best.fitness) or (not self.maximize and current_best.fitness < self.documented_best.fitness)): self.documented_best = copy(current_best)
[docs]class Population(BasePopulation): """Population of Individuals :param chromosomes: Iterable of initial chromosomes of the Population. :param eval_function: Function that reduces a chromosome to a fitness. :param maximize: If True, fitness will be maximized, otherwise minimized. Defaults to True. :param generation: Generation of the Population. This is incremented after each breed call. Defaults to 0. :param intended_size: Intended size of the Population. The population will be replenished to this size by .breed(). Defaults to the number of chromosomes provided. :param checkpoint_target: Target for the serializer of the Population. If a serializer is provided, this target is ignored. Defaults to None. :param serializer: Serializer for the Population. If None, a new SimpleSerializer is created. Defaults to None. :param concurrent_workers: If > 1, evaluate individuals in {concurrent_workers} separate processes. If None, concurrent_workers is set to n_cpus. Defaults to 1. """ def __init__(self, chromosomes: Iterable, eval_function: Callable[..., float], maximize: bool = True, generation: int = 0, intended_size: Optional[int] = None, checkpoint_target: Optional[str] = None, serializer=None, concurrent_workers: Optional[int] = 1): super().__init__(chromosomes=chromosomes, eval_function=eval_function, checkpoint_target=checkpoint_target, concurrent_workers=concurrent_workers, maximize=maximize, generation=generation, intended_size=intended_size, serializer=serializer) def __copy__(self): result = self.__class__(chromosomes=[], eval_function=self.eval_function, maximize=self.maximize, serializer=self.serializer, intended_size=self.intended_size, generation=self.generation, concurrent_workers=1) # Prevent new pool from being made result.individuals = [copy(individual) for individual in self.individuals] result.concurrent_workers = self.concurrent_workers result.pool = self.pool result.documented_best = self.documented_best result.id = self.id return result
[docs] def evaluate(self, lazy: bool = False) -> 'Population': """Evaluate the individuals in the population. This evaluates the fitness of all individuals. If lazy is True, the fitness is only evaluated when a fitness value is not yet known. In most situations adding an explicit evaluation step is not needed, as lazy evaluation is implicitly included in the operations that need it (most notably in the survive operation). :param lazy: If True, do no re-evaluate the fitness if the fitness is known. :return: self """ if self.pool: f = self.eval_function # We cannot refer to self in the map scores = self.pool.map(lambda i: i.fitness if (i.fitness and lazy) else f(i.chromosome), self.individuals) for individual, fitness in zip(self.individuals, scores): individual.fitness = fitness else: for individual in self.individuals: individual.evaluate(eval_function=self.eval_function, lazy=lazy) self._update_documented_best() return self
[docs]class Contest: """A single contest among a group of competitors. This is encapsulated in an object so that scores for many sets of competitors can be evaluated concurrently without resorting to a dict or some similar madness to correlate score vectors with an ever-widening matrix of contests and competitors. :param competitors: Iterable of Individuals in this Contest. """ def __init__(self, competitors: Iterable[Individual]): self.competitors = list(competitors)
[docs] def assign_scores(self, scores: Sequence[float]) -> None: for competitor, score in zip(self.competitors, scores): competitor.fitness += score
@property def competitor_chromosomes(self): return [competitor.chromosome for competitor in self.competitors]
[docs] @classmethod def generate(cls, individuals: Sequence[Individual], individuals_per_contest: int, contests_per_round: int) -> List['Contest']: """Generate contests for a round of evaluations. :param individuals: A sequence of competing Individuals. :param individuals_per_contest: Number of Individuals participating in each Contest. :param contests_per_round: Minimum number of contests each individual takes part in for each evaluation round. The actual number of contests per round is a multiple of individuals_per_contest. :return: List of Contests """ contests = [] n_rounds = ceil(contests_per_round / individuals_per_contest) for _ in range(n_rounds): offsets = [0] + [randint(0, len(individuals) - 1) for _ in range(individuals_per_contest - 1)] generators = [islice(cycle(individuals), offset, None) for offset in offsets] for competitors in islice(zip(*generators), len(individuals)): contests.append(Contest(competitors)) return contests
[docs]class ContestPopulation(BasePopulation): """Population which is evaluated through contests. This variant of the Population is used when individuals cannot be evaluated on a one-by-one basis, but instead can only be compared to each other. This is typically the case for AI that performs some task (i.e. plays a game), but can be useful in many other cases. For each round of evaluation, each individual participates in a given number of contests, in which a given number of individuals take part. The resulting scores of these contests are summed to form the fitness. Since the fitness of an individual is dependent on the other individuals in the population, the fitness of all individuals is recalculated when new individuals are present, and the fitness of all individuals is reset when the population is modified (e.g. by calling survive, mutate etc). :param chromosomes: Iterable of initial chromosomes of the Population. :param eval_function: Function that reduces a chromosome to a fitness. :param maximize: If True, fitness will be maximized, otherwise minimized. Defaults to True. :param individuals_per_contest: Number of individuals that take part in each contest. The size of the population must be divisible by this number. Defaults to 2. :param contests_per_round: Minimum number of contests each individual takes part in for each evaluation round. The actual number of contests per round is a multiple of individuals_per_contest. Defaults to 10. :param generation: Generation of the Population. This is incremented after echo survive call. Defaults to 0. :param intended_size: Intended size of the Population. The population will be replenished to this size by .breed(). Defaults to the number of chromosomes provided. :param checkpoint_target: Target for the serializer of the Population. If a serializer is provided, this target is ignored. Defaults to None. :param serializer: Serializer for the Population. If None, a new SimpleSerializer is created. Defaults to None. :param concurrent_workers: If > 1, evaluate individuals in {concurrent_workers} separate processes. If None, concurrent_workers is set to n_cpus. Defaults to 1. """ eval_function: Callable[..., Sequence[float]] # This population expects a different eval signature def __init__(self, chromosomes: Iterable, eval_function: Callable[..., Sequence[float]], maximize: bool = True, individuals_per_contest=2, contests_per_round=10, generation: int = 0, intended_size: Optional[int] = None, checkpoint_target: Optional[int] = None, serializer=None, concurrent_workers: Optional[int] = 1): super().__init__(chromosomes=chromosomes, eval_function=eval_function, maximize=maximize, generation=generation, intended_size=intended_size, checkpoint_target=checkpoint_target, serializer=serializer, concurrent_workers=concurrent_workers) self.contests_per_round = contests_per_round self.individuals_per_contest = individuals_per_contest def __copy__(self): result = self.__class__(chromosomes=[], eval_function=self.eval_function, maximize=self.maximize, contests_per_round=self.contests_per_round, individuals_per_contest=self.individuals_per_contest, serializer=self.serializer, intended_size=self.intended_size, generation=self.generation, concurrent_workers=1) result.individuals = [copy(individual) for individual in self.individuals] result.pool = self.pool result.concurrent_workers = self.concurrent_workers result.documented_best = None result.id = self.id return result
[docs] def evaluate(self, lazy: bool = False, contests_per_round: Optional[int] = None, individuals_per_contest: Optional[int] = None) -> 'ContestPopulation': """Evaluate the individuals in the population. This evaluates the fitness of all individuals. For each round of evaluation, each individual participates in a given number of contests, in which a given number of individuals take part. The resulting scores of these contests are summed to form the fitness. This means that the score of the individual is influenced by other chromosomes in the population. Note that in the `ContestPopulation` two settings are passed at initialisation which affect how we are evaluating individuals: contests_per_round and individuals_per_contest. You may overwrite them here if you wish. If lazy is True, the fitness is only evaluated when a fitness value is not yet known for all individuals. In most situations adding an explicit evaluation step is not needed, as lazy evaluation is implicitly included in the operations that need it (most notably in the survive operation). :param lazy: If True, do no re-evaluate the fitness if the fitness is known. :param contests_per_round: If set, overwrites the population setting for the number of contests there will be every round. :param individuals_per_contest: If set, overwrites the population setting for number of individuals to have in a contest during the evaluation. :return: self """ if contests_per_round is None: contests_per_round = self.contests_per_round if individuals_per_contest is None: individuals_per_contest = self.individuals_per_contest if lazy and all(individual.fitness is not None for individual in self): return self for individual in self.individuals: individual.fitness = 0 contests = Contest.generate(individuals=self.individuals, individuals_per_contest=individuals_per_contest, contests_per_round=contests_per_round) if self.pool is None: for contest in contests: contest.assign_scores(self.eval_function(*contest.competitor_chromosomes)) else: f = self.eval_function # We cannot refer to self in the map results = self.pool.map(lambda c: f(*c.competitor_chromosomes), contests) for result, contest in zip(results, contests): contest.assign_scores(result) return self
[docs] def map(self, func: Callable[..., Individual], **kwargs) -> 'ContestPopulation': """Apply the provided function to each individual in the population. Resets the fitness of all individuals. :param func: A function to apply to each individual in the population, which when called returns a modified individual. :param kwargs: Arguments to pass to the function. :return: self """ BasePopulation.map(self, func=func, **kwargs) self.reset_fitness() return self
[docs] def filter(self, func: Callable[..., bool], **kwargs) -> 'ContestPopulation': """Add a filter step to the Evolution. Filters the individuals in the population using the provided function. Resets the fitness of all individuals. :param func: Function to filter the individuals in the population by, which returns a boolean when called on an individual. :param kwargs: Arguments to pass to the function. :return: self """ BasePopulation.filter(self, func=func, **kwargs) self.reset_fitness() return self
[docs] def survive(self, fraction: Optional[float] = None, n: Optional[int] = None, luck: bool = False) -> 'ContestPopulation': """Let part of the population survive. Remove part of the population. If both fraction and n are specified, the minimum resulting population size is taken. Resets the fitness of all individuals. :param fraction: Fraction of the original population that survives. Defaults to None. :param n: Number of individuals of the population that survive. Defaults to None. :param luck: If True, individuals randomly survive (with replacement!) with chances proportional to their fitness. Defaults to False. :return: self """ BasePopulation.survive(self, fraction=fraction, n=n, luck=luck) self.reset_fitness() return self
[docs] def reset_fitness(self): """Reset the fitness of all individuals.""" for individual in self: individual.fitness = None