1"""Collection of approximation functions.""" 2import logging 3 4import numpy 5import chaospy 6 7 8def approximate_inverse( 9 distribution, 10 idx, 11 qloc, 12 bounds=None, 13 cache=None, 14 parameters=None, 15 xloc0=None, 16 iterations=300, 17 tolerance=1e-12, 18): 19 """ 20 Calculate the approximation of the inverse Rosenblatt transformation. 21 22 Uses a hybrid Newton-Raphson and binary search method to converge to the 23 inverse values. Includes forward Rosenblatt transformations, probability 24 density function (its derivative), and if not provided, boundary function. 25 26 Args: 27 distribution (Distribution): 28 Distribution to estimate inverse Rosenblatt on. 29 idx (int): 30 The dimension to take approximation along. 31 qloc (numpy.ndarray): 32 Input values. All values must be on unit interval ``(0, 1)`` and 33 ``qloc.shape == (dim,size)`` where dim is the number of dimensions 34 in distribution and size is the number of values to calculate 35 simultaneously. 36 bounds (Optional[Tuple[numpy.ndarray, numpy.ndarray]]): 37 Assuming lower and upper bounds is not available, this provides 38 outer bounds for lower and upper to use instead. 39 cache (Optional[Dict[Distribution, numpy.ndarray]]): 40 Memory cache for the location in the evaluation so far. 41 parameters (Optional[Dict[str, Any]]): 42 The parameters to use. If omitted, get the parameters from 43 distribution. 44 iterations (int): 45 The maximum number of iterations allowed. 46 tolerance (float): 47 Tolerance criterion determining convergence. 48 49 Returns: 50 (numpy.ndarray): 51 Approximation of inverse Rosenblatt transformation. 52 53 Example: 54 >>> distribution = chaospy.Normal(1000, 10) 55 >>> qloc = numpy.array([0.1, 0.2, 0.9]) 56 >>> inverse = distribution.inv(qloc) 57 >>> inverse.round(4) 58 array([ 987.1845, 991.5838, 1012.8155]) 59 >>> distribution._ppf = None 60 >>> numpy.allclose( 61 ... approximate_inverse(distribution, 0, qloc), inverse) 62 True 63 64 """ 65 logger = logging.getLogger(__name__) 66 logger.debug("init approximate_inverse: %s", distribution) 67 logger.debug("cache: %s", cache) 68 69 # lots of initial values: 70 if cache is None: 71 cache = {} 72 if bounds is None: 73 xlower = distribution._get_lower(idx, cache.copy()) 74 xupper = distribution._get_upper(idx, cache.copy()) 75 else: 76 xlower = numpy.broadcast_to(bounds[0], qloc.shape) 77 xupper = numpy.broadcast_to(bounds[1], qloc.shape) 78 _lower, distribution._lower = distribution._lower, lambda **kws: xlower 79 _upper, distribution._upper = distribution._upper, lambda **kws: xupper 80 81 xloc = xlower+qloc*(xlower+xupper) if xloc0 is None else xloc0 82 uloc = numpy.zeros(qloc.shape) 83 ulower = -qloc 84 uupper = 1-qloc 85 indices = numpy.ones(qloc.shape[-1], dtype=bool) 86 87 cache_copy = cache.copy() 88 parameters_copy = distribution._parameters.copy() 89 if parameters is not None: 90 91 assert not any([isinstance(value, chaospy.Distribution) 92 for value in parameters.values()]) 93 distribution._parameters.update(parameters) 94 95 for idx_ in range(2*iterations): 96 97 cache.clear() 98 cache.update(cache_copy) 99 # evaluate function: 100 uloc = numpy.where( 101 indices, distribution._get_fwd(xloc, idx, cache)-qloc, uloc) 102 103 # convergence criteria: 104 indices &= numpy.abs(uloc) > tolerance 105 if not numpy.any(indices): 106 break 107 108 # narrow down boundaries: 109 ulower = numpy.where(indices & (uloc < 0), uloc, ulower) 110 xlower = numpy.where(indices & (uloc < 0), xloc, xlower) 111 uupper = numpy.where(indices & (uloc > 0), uloc, uupper) 112 xupper = numpy.where(indices & (uloc > 0), xloc, xupper) 113 114 # Newton increment every second iteration: 115 xloc_ = numpy.inf 116 if idx_ % 2 == 0: 117 cache.clear() 118 cache.update(cache_copy) 119 try: 120 derivative = distribution._get_pdf(xloc, idx, cache) 121 except chaospy.UnsupportedFeature: 122 cache.clear() 123 cache.update(cache_copy) 124 derivative = approximate_density(distribution, idx, xloc, cache) 125 derivative = numpy.where(derivative, derivative, 1) 126 xloc_ = xloc-uloc/derivative 127 128 # use binary search if Newton increment is outside bounds: 129 weight = numpy.random.random() 130 xloc_ = numpy.where((xloc_ < xupper) & (xloc_ > xlower), 131 xloc_, weight*xupper+(1-weight)*xlower) 132 xloc = numpy.where(indices, xloc_, xloc) 133 134 else: 135 logger.warning( 136 "Too many iterations required to estimate inverse.") 137 logger.info("%d out of %d did not converge.", 138 numpy.sum(indices), len(indices)) 139 # print("Too many iterations required to estimate inverse.") 140 # print("%d out of %d did not converge." % (numpy.sum(indices), len(indices))) 141 142 cache.clear() 143 cache.update(cache_copy) 144 distribution._parameters.clear() 145 distribution._parameters.update(parameters_copy) 146 if bounds is not None: 147 distribution._lower = _lower 148 distribution._upper = _upper 149 logger.debug("%s: ppf approx used %d steps", distribution, idx_/2) 150 # print("%s: ppf approx used %d steps" % (distribution, idx_/2)) 151 return xloc 152 153MOMENTS_QUADS = {} 154 155 156def approximate_moment( 157 distribution, 158 k_loc, 159 order=100000, 160 rule="clenshaw_curtis", 161 **kwargs 162): 163 """ 164 Approximation method for estimation of raw statistical moments. 165 166 Uses quadrature integration to estimate the values. 167 168 Args: 169 distribution (Distribution): 170 Distribution domain with dim=len(distribution) 171 k_loc (Sequence[int, ...]): 172 The exponents of the moments of interest with ``shape == (dim,)``. 173 order (int): 174 The quadrature order used in approximation. 175 rule (str): 176 Quadrature rule for integrating moments. 177 kwargs: 178 Extra args passed to :func:`chaospy.generate_quadrature`. 179 180 Examples: 181 >>> distribution = chaospy.Uniform(1, 4) 182 >>> round(chaospy.approximate_moment(distribution, (1,)), 4) 183 2.5 184 >>> round(chaospy.approximate_moment(distribution, (2,)), 4) 185 7.0 186 187 """ 188 assert isinstance(distribution, chaospy.Distribution) 189 k_loc = numpy.asarray(k_loc) 190 if len(distribution) > 1: 191 assert not distribution.stochastic_dependent, ( 192 "Dependent distributions does not support moment approximation.") 193 assert len(k_loc) == len(distribution), "incorrect size of exponents" 194 return numpy.prod([ 195 approximate_moment(distribution[idx], (k_loc[idx],), 196 order=order, rule=rule, **kwargs) 197 for idx in range(len(distribution)) 198 ], axis=0) 199 200 k_loc = tuple(k_loc.tolist()) 201 assert all([isinstance(k, int) for k in k_loc]), ( 202 "exponents must be integers: %s found" % type(k_loc[0])) 203 204 order = int(1e5 if order is None else order) 205 if (distribution, order) not in MOMENTS_QUADS: 206 MOMENTS_QUADS[distribution, order] = chaospy.generate_quadrature( 207 order, distribution, rule=rule, **kwargs) 208 X, W = MOMENTS_QUADS[distribution, order] 209 210 if k_loc in distribution._mom_cache: 211 return distribution._mom_cache[k_loc] 212 213 out = float(numpy.sum(numpy.prod(X.T**k_loc, 1)*W)) 214 distribution._mom_cache[k_loc] = out 215 return out 216 217 218def approximate_density( 219 distribution, 220 idx, 221 xloc, 222 cache=None, 223 step_size=1e-7 224): 225 """ 226 Approximate the probability density function. 227 228 Args: 229 distribution (Distribution): 230 Distribution in question. May not be an advanced variable. 231 idx (int): 232 The dimension to take approximation along. 233 xloc (numpy.ndarray): 234 Location coordinates. Requires that xloc.shape=(len(distribution), K). 235 cache (Optional[Dict[Distribution, Tuple[numpy.ndarray, numpy.ndarray]]]): 236 Current state in the evaluation graph. If omitted, assume that 237 evaluations should be done from scratch. 238 step_size (float): 239 The relative step size between two points used to calculate the 240 derivative. 241 242 Returns: 243 numpy.ndarray: Local probability density function with 244 ``out.shape == xloc.shape``. To calculate actual density function, 245 evaluate ``numpy.prod(out, 0)``. 246 247 Example: 248 >>> distribution = chaospy.Normal(1000, 10) 249 >>> xloc = numpy.array([990, 1000, 1010]) 250 >>> approximate_density(distribution, 0, xloc).round(4) 251 array([0.0242, 0.0399, 0.0242]) 252 >>> distribution.pdf(xloc).round(4) 253 array([0.0242, 0.0399, 0.0242]) 254 255 """ 256 cache = {} if cache is None else cache 257 assert (idx, distribution) not in cache 258 xloc = numpy.asfarray(xloc) 259 assert xloc.ndim == 1 260 lower, upper = numpy.min(xloc), numpy.max(xloc) 261 middle = .5*(lower+upper) 262 step_size = (numpy.where(xloc < middle, step_size, -step_size)* 263 numpy.clip(numpy.abs(xloc), 1, None)) 264 265 cache1 = cache.copy() 266 floc1 = distribution._get_fwd(xloc, idx, cache=cache1) 267 cache2 = cache.copy() 268 floc2 = distribution._get_fwd(xloc+step_size, idx, cache=cache2) 269 floc = numpy.abs((floc2-floc1)/step_size) 270 271 # weave a history of pdf from two cdf streams 272 for key in set(cache1).difference(cache): 273 cache[key] = cache1[key][0], ((cache2[key][1]-cache1[key][1])/ 274 (cache2[key][0]-cache1[key][0])) 275 276 277 assert floc.shape == xloc.shape 278 return floc 279