# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import numpy as np
import nevergrad.common.typing as tp
from nevergrad.parametrization import parameter as p
from .hypervolume import HypervolumeIndicator


# Default number of initial points used to auto-estimate upper bounds
# when the user does not provide any.
AUTO_BOUND = 15


# pylint: disable=too-many-instance-attributes
class HypervolumePareto:
    """Given several functions, and threshold on their values (above which solutions are pointless),
    this object can be used as a single-objective function, the minimization of which
    yields a solution to the original multiobjective problem.

    Parameters
    -----------
    upper_bounds: Tuple of float or np.ndarray
        upper_bounds[i] is a threshold above which x is pointless if function(x)[i] > upper_bounds[i].
    auto_bound: int
        if no upper bounds are provided, number of initial points used to estimate the upper bounds. Their
        loss will be 0 (except if they are uniformly worse than the previous points).
    seed: optional int or RandomState
        seed to use for selecting random subsamples of the pareto

    Notes
    -----
    - This function is not stationary!
    - The minimum value obtained for this objective function is -h,
      where h is the hypervolume of the Pareto front obtained, given upper_bounds as a reference point.
    - The callable keeps track of the pareto front (see method pareto_front) and is therefore stateful.
      For this reason it cannot be distributed. A user can however call the multiobjective_function
      remotely, and aggregate locally. This is what happens in the "minimize" method of optimizers.
    """

    def __init__(
        self,
        upper_bounds: tp.Optional[tp.ArrayLike] = None,
        auto_bound: int = AUTO_BOUND,
        seed: tp.Optional[tp.Union[int, np.random.RandomState]] = None,
    ) -> None:
        # Number of remaining "calibration" points (only used when bounds are auto-estimated).
        self._auto_bound = 0
        # np.asarray keeps the original "avoid a copy when possible" intent;
        # np.array(..., copy=False) raises ValueError on NumPy >= 2.0 whenever a copy is needed.
        self._upper_bounds = (
            np.array([-float("inf")]) if upper_bounds is None else np.asarray(upper_bounds)
        )
        if upper_bounds is None:
            self._auto_bound = auto_bound
        # Accept either a ready-made RandomState or a seed for a fresh one.
        self._rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)
        self._pareto: tp.List[p.Parameter] = []
        self._best_volume = -float("Inf")
        # Lazily created once the upper bounds are final (after auto-bounding).
        self._hypervolume: tp.Optional[HypervolumeIndicator] = None
        # Dominated points are pruned lazily, only when pareto_front() is queried.
        self._pareto_needs_filtering = False

    @property
    def num_objectives(self) -> int:
        """Number of objectives, inferred from the size of the upper bounds."""
        return self._upper_bounds.size

    @property
    def best_volume(self) -> float:
        """Best (largest) hypervolume reached so far."""
        return self._best_volume

    def _add_to_pareto(self, parameter: p.Parameter) -> None:
        """Appends a candidate and defers the (quadratic) dominance filtering."""
        self._pareto.append(parameter)
        self._pareto_needs_filtering = True

    def extend(self, parameters: tp.Sequence[p.Parameter]) -> float:
        """Adds a sequence of parameters, returning the value of the last add
        (0.0 if the sequence is empty).
        """
        output = 0.0
        for param in parameters:
            output = self.add(param)
        return output

    def add(self, parameter: p.Parameter) -> float:
        """Given parameters and the multiobjective loss, this computes the hypervolume
        and updates the state of the function with new points if they belong to the pareto front.

        Returns the single-objective surrogate value: -hypervolume for improving points,
        and a penalized (larger) value for dominated or out-of-bounds points.
        """
        if not isinstance(parameter, p.Parameter):
            raise TypeError(
                f"{self.__class__.__name__}.add should receive a ng.p.Parameter, but got: {parameter}."
            )
        losses = parameter.losses
        if not isinstance(losses, np.ndarray):
            raise TypeError(
                f"Parameter should have multivalue as losses, but parameter.losses={losses} ({type(losses)})."
            )
        if self._auto_bound > 0:
            # Calibration phase: grow the upper bounds from the first few points.
            self._auto_bound -= 1
            if (self._upper_bounds > -float("inf")).all() and (losses > self._upper_bounds).all():
                return float("inf")  # Avoid uniformly worst points
            self._upper_bounds = np.maximum(self._upper_bounds, losses)
            self._add_to_pareto(parameter)
            return 0.0
        if self._hypervolume is None:
            # Bounds are now fixed: the indicator can be instantiated.
            self._hypervolume = HypervolumeIndicator(self._upper_bounds)
        # get rid of points over the upper bounds
        if (losses - self._upper_bounds > 0).any():
            # Penalty: negative of the total overshoot (closer to the bounds is better).
            loss = -float(np.sum(np.maximum(0, losses - self._upper_bounds)))
            if loss > self._best_volume:
                self._best_volume = loss
            if self._best_volume < 0:
                self._add_to_pareto(parameter)
            return -loss
        # We compute the hypervolume
        new_volume = self._hypervolume.compute([pa.losses for pa in self._pareto] + [losses])
        if new_volume > self._best_volume:
            # This point is good! Let us give him a great mono-fitness value.
            self._best_volume = new_volume
            self._add_to_pareto(parameter)
            return -new_volume
        else:
            # This point is not on the front
            # First we prune.
            distance_to_pareto = float("Inf")
            for param in self.pareto_front():
                stored_losses = param.losses
                # TODO the following is probably not good at all:
                # -> +inf if no point is strictly better (but lower if it is)
                if (stored_losses <= losses).all():
                    distance_to_pareto = min(distance_to_pareto, min(losses - stored_losses))
            assert distance_to_pareto >= 0
            return -new_volume + distance_to_pareto

    def _filter_pareto_front(self) -> None:
        """Filters the Pareto front, removing dominated points."""
        new_pareto: tp.List[p.Parameter] = []
        for param in self._pareto:  # quadratic :(
            should_be_added = True
            for other in self._pareto:
                # "other" weakly dominates everywhere and strictly somewhere -> param is dominated.
                if (other.losses <= param.losses).all() and (other.losses < param.losses).any():
                    should_be_added = False
                    break
            if should_be_added:
                new_pareto.append(param)
        self._pareto = new_pareto
        self._pareto_needs_filtering = False

    # pylint: disable=too-many-branches
    def pareto_front(
        self, size: tp.Optional[int] = None, subset: str = "random", subset_tentatives: int = 12
    ) -> tp.List[p.Parameter]:
        """Pareto front, as a list of Parameter. The losses can be accessed through
        parameter.losses

        Parameters
        ------------
        size: int (optional)
            if provided, selects a subset of the full pareto front with the given maximum size
        subset: str
            method for selecting the subset ("random", "loss-covering", "EPS", "domain-covering", "hypervolume")
            EPS is the epsilon indicator described e.g.
            here: https://hal.archives-ouvertes.fr/hal-01159961v2/document
        subset_tentatives: int
            number of random tentatives for finding a better subset

        Returns
        --------
        list
            the list of Parameter of the pareto front
        """
        if self._pareto_needs_filtering:
            self._filter_pareto_front()
        if size is None or size >= len(self._pareto):  # No limit: we return the full set.
            return self._pareto
        if subset == "random":
            # NOTE(review): rng.choice samples WITH replacement by default, so the
            # returned "subset" may contain duplicates — confirm this is intended.
            return self._rng.choice(self._pareto, size).tolist()  # type: ignore
        tentatives = [self._rng.choice(self._pareto, size).tolist() for _ in range(subset_tentatives)]
        if self._hypervolume is None:
            raise RuntimeError("Hypervolume not initialized, not supported")  # TODO fix
        hypervolume = self._hypervolume
        scores: tp.List[float] = []
        # Score each random tentative (lower is better) and keep the best one.
        for tentative in tentatives:
            if subset == "hypervolume":
                scores += [-hypervolume.compute([pa.losses for pa in tentative])]
            else:
                score: float = 0.0
                for v in self._pareto:
                    best_score = float("inf") if subset != "EPS" else 0.0
                    for pa in tentative:
                        if subset == "loss-covering":  # Equivalent to IGD.
                            best_score = min(best_score, np.linalg.norm(pa.losses - v.losses))
                        elif subset == "EPS":  # Cone Epsilon-Dominance.
                            best_score = min(best_score, max(pa.losses - v.losses))
                        elif subset == "domain-covering":
                            best_score = min(
                                best_score, np.linalg.norm(pa.get_standardized_data(reference=v))
                            )  # TODO verify
                        else:
                            raise ValueError(f'Unknown subset for Pareto-Set subsampling: "{subset}"')
                    # NOTE(review): for EPS this ADDS max(score, best_score) instead of
                    # taking score = max(score, best_score) — looks suspicious; confirm upstream.
                    score += best_score ** 2 if subset != "EPS" else max(score, best_score)
                scores += [score]
        return tentatives[scores.index(min(scores))]  # type: ignore