1# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2#
3# This source code is licensed under the MIT license found in the
4# LICENSE file in the root directory of this source tree.
5
6import numpy as np
7import nevergrad.common.typing as tp
8from nevergrad.parametrization import parameter as p
9from .hypervolume import HypervolumeIndicator
10
11
# default number of initial evaluations used by HypervolumePareto to estimate
# the upper bounds when none are provided explicitly
AUTO_BOUND = 15
13
14
15# pylint: disable=too-many-instance-attributes
class HypervolumePareto:
    """Given several functions, and threshold on their values (above which solutions are pointless),
    this object can be used as a single-objective function, the minimization of which
    yields a solution to the original multiobjective problem.

    Parameters
    -----------
    upper_bounds: Tuple of float or np.ndarray
        upper_bounds[i] is a threshold above which x is pointless if function(x)[i] > upper_bounds[i].
    auto_bound: int
        if no upper bounds are provided, number of initial points used to estimate the upper bounds. Their
        loss will be 0 (except if they are uniformly worse than the previous points).
    seed: optional int or RandomState
        seed to use for selecting random subsamples of the pareto

    Notes
    -----
    - This function is not stationary!
    - The minimum value obtained for this objective function is -h,
      where h is the hypervolume of the Pareto front obtained, given upper_bounds as a reference point.
    - The callable keeps track of the pareto front (see method pareto_front) and is therefore stateful.
      For this reason it cannot be distributed. A user can however call the multiobjective_function
      remotely, and aggregate locally. This is what happens in the "minimize" method of optimizers.
    """

    def __init__(
        self,
        upper_bounds: tp.Optional[tp.ArrayLike] = None,
        auto_bound: int = AUTO_BOUND,
        seed: tp.Optional[tp.Union[int, np.random.RandomState]] = None,
    ) -> None:
        # countdown of initial points still used to estimate the upper bounds
        # (stays 0 when explicit bounds are provided, so the estimation phase is skipped)
        self._auto_bound = 0
        # placeholder -inf bounds until estimated from the first auto_bound points
        self._upper_bounds = (
            np.array([-float("inf")]) if upper_bounds is None else np.array(upper_bounds, copy=False)
        )
        if upper_bounds is None:
            self._auto_bound = auto_bound
        # accept an already-built RandomState, otherwise seed a fresh one (seed may be None)
        self._rng = seed if isinstance(seed, np.random.RandomState) else np.random.RandomState(seed)
        # current (possibly unfiltered) pareto set; dominated points are pruned lazily
        self._pareto: tp.List[p.Parameter] = []
        # best hypervolume reached so far (negative until a point within bounds is added)
        self._best_volume = -float("Inf")
        # created lazily once the upper bounds are known/estimated
        self._hypervolume: tp.Optional[HypervolumeIndicator] = None
        self._pareto_needs_filtering = False

    @property
    def num_objectives(self) -> int:
        # number of objectives, inferred from the size of the (provided or estimated) upper bounds
        return self._upper_bounds.size

    @property
    def best_volume(self) -> float:
        # best hypervolume reached so far (-inf before any in-bounds point has been added)
        return self._best_volume

    def _add_to_pareto(self, parameter: p.Parameter) -> None:
        # append without filtering; dominated points are removed lazily on the
        # next pareto_front() call (see _filter_pareto_front)
        self._pareto.append(parameter)
        self._pareto_needs_filtering = True

    def extend(self, parameters: tp.Sequence[p.Parameter]) -> float:
        """Adds a sequence of parameters one by one (see add) and returns the
        single-objective loss of the last one (0.0 if the sequence is empty).
        """
        output = 0.0
        for param in parameters:
            output = self.add(param)
        return output

    def add(self, parameter: p.Parameter) -> float:
        """Given parameters and the multiobjective loss, this computes the hypervolume
        and update the state of the function with new points if it belongs to the pareto front

        Parameters
        ----------
        parameter: p.Parameter
            the evaluated parameter; its multiobjective losses are read from parameter.losses
            (which must be an np.ndarray)

        Returns
        -------
        float
            a single-objective loss: -hypervolume for improving in-bounds points,
            a positive penalty for points exceeding the upper bounds, and a
            distance-to-pareto based value otherwise

        Raises
        ------
        TypeError
            if parameter is not a ng.p.Parameter, or if its losses are not an np.ndarray
        """
        if not isinstance(parameter, p.Parameter):
            raise TypeError(
                f"{self.__class__.__name__}.add should receive a ng.p.Parameter, but got: {parameter}."
            )
        losses = parameter.losses
        if not isinstance(losses, np.ndarray):
            raise TypeError(
                f"Parameter should have multivalue as losses, but parameter.losses={losses} ({type(losses)})."
            )
        if self._auto_bound > 0:
            # still in the bound-estimation phase: grow the bounds to cover this point
            self._auto_bound -= 1
            # once every bound is finite, reject points that are worse on every objective
            if (self._upper_bounds > -float("inf")).all() and (losses > self._upper_bounds).all():
                return float("inf")  # Avoid uniformly worst points
            self._upper_bounds = np.maximum(self._upper_bounds, losses)
            self._add_to_pareto(parameter)
            return 0.0
        if self._hypervolume is None:
            # bounds are now fixed: the hypervolume indicator can be instantiated
            self._hypervolume = HypervolumeIndicator(self._upper_bounds)
        # get rid of points over the upper bounds:
        # they get a positive penalty equal to the total amount by which they exceed the bounds
        if (losses - self._upper_bounds > 0).any():
            loss = -float(np.sum(np.maximum(0, losses - self._upper_bounds)))
            if loss > self._best_volume:
                self._best_volume = loss
            if self._best_volume < 0:
                # no in-bounds point has been seen yet: keep this one as a fallback
                self._add_to_pareto(parameter)
            return -loss
        # We compute the hypervolume of the current pareto set extended with the new point
        new_volume = self._hypervolume.compute([pa.losses for pa in self._pareto] + [losses])
        if new_volume > self._best_volume:
            # This point is good! Let us give him a great mono-fitness value.
            self._best_volume = new_volume
            self._add_to_pareto(parameter)
            return -new_volume
        else:
            # This point is not on the front
            # First we prune (pareto_front() filters out dominated points).
            # Penalize by how far behind the front the point is: the smallest
            # coordinate-wise gap to any dominating pareto point.
            distance_to_pareto = float("Inf")
            for param in self.pareto_front():
                stored_losses = param.losses
                # TODO the following is probably not good at all:
                # -> +inf if no point is strictly better (but lower if it is)
                if (stored_losses <= losses).all():
                    distance_to_pareto = min(distance_to_pareto, min(losses - stored_losses))
            assert distance_to_pareto >= 0
            return -new_volume + distance_to_pareto

    def _filter_pareto_front(self) -> None:
        """Filters the Pareto front, removing all dominated points
        (quadratic in the size of the stored set).
        """
        new_pareto: tp.List[p.Parameter] = []
        for param in self._pareto:  # quadratic :(
            should_be_added = True
            for other in self._pareto:
                # "other" dominates "param" if it is no worse everywhere and strictly better somewhere
                if (other.losses <= param.losses).all() and (other.losses < param.losses).any():
                    should_be_added = False
                    break
            if should_be_added:
                new_pareto.append(param)
        self._pareto = new_pareto
        self._pareto_needs_filtering = False

    # pylint: disable=too-many-branches
    def pareto_front(
        self, size: tp.Optional[int] = None, subset: str = "random", subset_tentatives: int = 12
    ) -> tp.List[p.Parameter]:
        """Pareto front, as a list of Parameter. The losses can be accessed through
        parameter.losses

        Parameters
        ------------
        size:  int (optional)
            if provided, selects a subset of the full pareto front with the given maximum size
        subset: str
            method for selecting the subset ("random", "loss-covering", "EPS", "domain-covering", "hypervolume")
            EPS is the epsilon indicator described e.g.
                here: https://hal.archives-ouvertes.fr/hal-01159961v2/document
        subset_tentatives: int
            number of random tentatives for finding a better subset

        Returns
        --------
        list
            the list of Parameter of the pareto front
        """
        if self._pareto_needs_filtering:
            self._filter_pareto_front()
        if size is None or size >= len(self._pareto):  # No limit: we return the full set.
            return self._pareto
        if subset == "random":
            return self._rng.choice(self._pareto, size).tolist()  # type: ignore
        # draw several random candidate subsets and keep the one with the best (lowest) score
        tentatives = [self._rng.choice(self._pareto, size).tolist() for _ in range(subset_tentatives)]
        if self._hypervolume is None:
            raise RuntimeError("Hypervolume not initialized, not supported")  # TODO fix
        hypervolume = self._hypervolume
        scores: tp.List[float] = []
        for tentative in tentatives:
            if subset == "hypervolume":
                # larger hypervolume is better, hence the negated score (lower is better below)
                scores += [-hypervolume.compute([pa.losses for pa in tentative])]
            else:
                score: float = 0.0
                for v in self._pareto:
                    # for each full-front point, distance to its best representative in the subset
                    best_score = float("inf") if subset != "EPS" else 0.0
                    for pa in tentative:
                        if subset == "loss-covering":  # Equivalent to IGD.
                            best_score = min(best_score, np.linalg.norm(pa.losses - v.losses))
                        elif subset == "EPS":  # Cone Epsilon-Dominance.
                            best_score = min(best_score, max(pa.losses - v.losses))
                        elif subset == "domain-covering":
                            best_score = min(
                                best_score, np.linalg.norm(pa.get_standardized_data(reference=v))
                            )  # TODO verify
                        else:
                            raise ValueError(f'Unknown subset for Pareto-Set subsampling: "{subset}"')
                    # NOTE(review): for "EPS" this adds max(score, best_score) onto the running
                    # score rather than taking a plain maximum -- verify this is the intended
                    # epsilon-indicator aggregation
                    score += best_score ** 2 if subset != "EPS" else max(score, best_score)
                scores += [score]
        # the candidate subset with the lowest score wins
        return tentatives[scores.index(min(scores))]  # type: ignore
196