1'''Quantizing a continuous distribution in 2d
2
3Author: josef-pktd
4'''
5from statsmodels.compat.python import lmap
6import numpy as np
7
8def prob_bv_rectangle(lower, upper, cdf):
9    '''helper function for probability of a rectangle in a bivariate distribution
10
11    Parameters
12    ----------
13    lower : array_like
14        tuple of lower integration bounds
15    upper : array_like
16        tuple of upper integration bounds
17    cdf : callable
18        cdf(x,y), cumulative distribution function of bivariate distribution
19
20
21    how does this generalize to more than 2 variates ?
22    '''
23    probuu = cdf(*upper)
24    probul = cdf(upper[0], lower[1])
25    problu = cdf(lower[0], upper[1])
26    probll = cdf(*lower)
27    return probuu - probul - problu + probll
28
29def prob_mv_grid(bins, cdf, axis=-1):
30    '''helper function for probability of a rectangle grid in a multivariate distribution
31
32    how does this generalize to more than 2 variates ?
33
34    bins : tuple
35        tuple of bin edges, currently it is assumed that they broadcast
36        correctly
37
38    '''
39    if not isinstance(bins, np.ndarray):
40        bins = lmap(np.asarray, bins)
41        n_dim = len(bins)
42        bins_ = []
43        #broadcast if binedges are 1d
44        if all(lmap(np.ndim, bins) == np.ones(n_dim)):
45            for d in range(n_dim):
46                sl = [None]*n_dim
47                sl[d] = slice(None)
48                bins_.append(bins[d][sl])
49    else: #assume it is already correctly broadcasted
50        n_dim = bins.shape[0]
51        bins_ = bins
52
53    print(len(bins))
54    cdf_values = cdf(bins_)
55    probs = cdf_values.copy()
56    for d in range(n_dim):
57        probs = np.diff(probs, axis=d)
58
59    return probs
60
61
62def prob_quantize_cdf(binsx, binsy, cdf):
63    '''quantize a continuous distribution given by a cdf
64
65    Parameters
66    ----------
67    binsx : array_like, 1d
68        binedges
69
70    '''
71    binsx = np.asarray(binsx)
72    binsy = np.asarray(binsy)
73    nx = len(binsx) - 1
74    ny = len(binsy) - 1
75    probs = np.nan * np.ones((nx, ny)) #np.empty(nx,ny)
76    cdf_values = cdf(binsx[:,None], binsy)
77    cdf_func = lambda x, y: cdf_values[x,y]
78    for xind in range(1, nx+1):
79        for yind in range(1, ny+1):
80            upper = (xind, yind)
81            lower = (xind-1, yind-1)
82            #print upper,lower,
83            probs[xind-1,yind-1] = prob_bv_rectangle(lower, upper, cdf_func)
84
85    assert not np.isnan(probs).any()
86    return probs
87
88def prob_quantize_cdf_old(binsx, binsy, cdf):
89    '''quantize a continuous distribution given by a cdf
90
91    old version without precomputing cdf values
92
93    Parameters
94    ----------
95    binsx : array_like, 1d
96        binedges
97
98    '''
99    binsx = np.asarray(binsx)
100    binsy = np.asarray(binsy)
101    nx = len(binsx) - 1
102    ny = len(binsy) - 1
103    probs = np.nan * np.ones((nx, ny)) #np.empty(nx,ny)
104    for xind in range(1, nx+1):
105        for yind in range(1, ny+1):
106            upper = (binsx[xind], binsy[yind])
107            lower = (binsx[xind-1], binsy[yind-1])
108            #print upper,lower,
109            probs[xind-1,yind-1] = prob_bv_rectangle(lower, upper, cdf)
110
111    assert not np.isnan(probs).any()
112    return probs
113
114
115
116
117if __name__ == '__main__':
118    from numpy.testing import assert_almost_equal
119    unif_2d = lambda x,y: x*y
120    assert_almost_equal(prob_bv_rectangle([0,0], [1,0.5], unif_2d), 0.5, 14)
121    assert_almost_equal(prob_bv_rectangle([0,0], [0.5,0.5], unif_2d), 0.25, 14)
122
123    arr1b = np.array([[ 0.05,  0.05,  0.05,  0.05],
124                       [ 0.05,  0.05,  0.05,  0.05],
125                       [ 0.05,  0.05,  0.05,  0.05],
126                       [ 0.05,  0.05,  0.05,  0.05],
127                       [ 0.05,  0.05,  0.05,  0.05]])
128
129    arr1a = prob_quantize_cdf(np.linspace(0,1,6), np.linspace(0,1,5), unif_2d)
130    assert_almost_equal(arr1a, arr1b, 14)
131
132    arr2b = np.array([[ 0.25],
133                      [ 0.25],
134                      [ 0.25],
135                      [ 0.25]])
136    arr2a = prob_quantize_cdf(np.linspace(0,1,5), np.linspace(0,1,2), unif_2d)
137    assert_almost_equal(arr2a, arr2b, 14)
138
139    arr3b = np.array([[ 0.25,  0.25,  0.25,  0.25]])
140    arr3a = prob_quantize_cdf(np.linspace(0,1,2), np.linspace(0,1,5), unif_2d)
141    assert_almost_equal(arr3a, arr3b, 14)
142