1# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
2#
3# This source code is licensed under the MIT license found in the
4# LICENSE file in the root directory of this source tree.
5
6import warnings
7import numpy as np
8import scipy.stats
9import nevergrad.common.typing as tp
10
11
12# Nevergrad, in the most fundamental layer, uses continuous variables only.
13# Discrete variables are handled in one of the following ways:
14# - by a softmax transformation, a k-valued categorical variable is converted into k continuous variables.
15# - by a discretization - as we often use Gaussian random values, we discretize according to quantiles of the normal
16#   distribution.
17def threshold_discretization(x: tp.ArrayLike, arity: int = 2) -> tp.List[int]:
18    """Discretize by casting values from 0 to arity -1, assuming that x values
19    follow a normal distribution.
20
21    Parameters
22    ----------
23    x: list/array
24       values to discretize
25    arity: int
26       the number of possible integer values (arity n will lead to values from 0 to n - 1)
27
28    Note
29    ----
30    - nans are processed as negative infs (yields 0)
31    """
32    x = np.array(x, copy=True)
33    if np.any(np.isnan(x)):
34        warnings.warn("Encountered NaN values for discretization")
35        x[np.isnan(x)] = -np.inf
36    if arity == 2:  # special case, to have 0 yield 0
37        return (np.array(x) > 0).astype(int).tolist()  # type: ignore
38    else:
39        return np.clip(arity * scipy.stats.norm.cdf(x), 0, arity - 1).astype(int).tolist()  # type: ignore
40
41
42# The function below is the opposite of the function above.
43def inverse_threshold_discretization(indexes: tp.List[int], arity: int = 2) -> np.ndarray:
44    indexes_arr = np.array(indexes, copy=True)
45    assert not np.any(np.isnan(indexes_arr))
46    pdf_bin_size = 1 / arity
47    # We take the center of each bin (in the pdf space)
48    x = scipy.stats.norm.ppf(indexes_arr * pdf_bin_size + (pdf_bin_size / 2))  # type: ignore
49    nan_indices = np.where(np.isnan(x))
50    x[nan_indices] = np.sign(indexes_arr[nan_indices] - (arity / 2.0)) * np.finfo(np.dtype("float")).max
51    return x
52
53
54# The discretization is, by nature, not one to one.
55# In the function below, we randomly draw one of the possible inverse values - this is therefore noisy.
56def noisy_inverse_threshold_discretization(
57    indexes: tp.List[int], arity: int = 2, gen: tp.Any = None
58) -> np.ndarray:
59    indexes_arr = np.array(indexes, copy=True)
60    pdf_bin_size = 1 / arity
61    # We take a random point in the bin.
62    return scipy.stats.norm.ppf(indexes_arr * pdf_bin_size + gen.rand() * pdf_bin_size)  # type: ignore
63
64
65def weight_for_reset(arity: int) -> float:
66    """p is an arbitrary probability that the provided arg will be sampled with the returned point"""
67    p = (1 / arity) * 1.5
68    w = float(np.log((p * (arity - 1)) / (1 - p)))
69    return w
70
71
72class Encoder:
73    """Handles softmax weights which need to be turned into probabilities and sampled
74    This class is expected to evolve to be more usable and include more features (like
75    conversion from probabilities to weights?)
76    It will replace most of the code above if possible
77
78    Parameters
79    ----------
80    weights: array
81        the weights of size samples x options, that will be turned to probabilities
82        using softmax.
83    rng: RandomState
84        random number generator for sampling following the probabilities
85
86    Notes
87    -----
88    - if one or several inf values are present in a row, only those are considered
89    - in case of tie, the deterministic value is the first one (lowest) of the tie
90    - nans and -infs are ignored, except if all are (then uniform random choice)
91    """
92
93    def __init__(self, weights: np.ndarray, rng: np.random.RandomState) -> None:
94        self.weights = np.array(weights, copy=True, dtype=float)
95        self.weights[np.isnan(self.weights)] = -np.inf  # 0 proba for nan values
96        self._rng = rng
97
98    def probabilities(self) -> np.ndarray:
99        """Creates the probability matrix from the weights"""
100        axis = 1
101        maxv = np.max(self.weights, axis=1, keepdims=True)
102        hasposinf = np.isposinf(maxv)
103        maxv[np.isinf(maxv)] = 0  # avoid indeterminations
104        exp: np.ndarray = np.exp(self.weights - maxv)
105        # deal with infinite positives special case
106        # by ignoring (0 proba) non-infinte on same row
107        if np.any(hasposinf):
108            is_inf = np.isposinf(self.weights)
109            is_ignored = np.logical_and(np.logical_not(is_inf), hasposinf)
110            exp[is_inf] = 1
111            exp[is_ignored] = 0
112        # random choice if sums to 0
113        sums0 = np.sum(exp, axis=axis) == 0
114        exp[sums0, :] = 1
115        exp /= np.sum(exp, axis=axis, keepdims=True)  # normalize
116        return exp
117
118    def encode(self, deterministic: bool = False) -> np.ndarray:
119        """Sample an index from each row depending on the provided probabilities.
120
121        Parameters
122        ----------
123        deterministic: bool
124            set to True for sampling deterministically the more likely option
125            (largest probability)
126        """
127        axis = 1
128        if deterministic:
129            return np.argmax(self.weights, axis=1)  # type: ignore
130        cumprob = np.cumsum(self.probabilities(), axis=axis)
131        rand = self._rng.rand(cumprob.shape[0], 1)
132        return np.argmin(cumprob < rand, axis=axis)  # type: ignore
133