1import numpy as np
2import pytest
3
4from pandas import DataFrame, Series, concat
5import pandas._testing as tm
6
7
@pytest.mark.parametrize("func", ["cov", "corr"])
def test_ewm_pairwise_cov_corr(func, frame):
    """Frame-level ewm ``cov``/``corr`` must agree with the Series-level call.

    The frame-level result (called without an ``other`` argument) is sliced
    down to the entry pairing columns ``1`` and ``5`` and compared with the
    same statistic computed directly from those two columns.

    ``frame`` is a fixture defined outside this file.
    """
    ewm_kwargs = {"span": 10, "min_periods": 5}

    pairwise = getattr(frame.ewm(**ewm_kwargs), func)()
    # Keep rows whose second index level is 1, then take column 5.
    result = pairwise.loc[(slice(None), 1), 5]
    # Drop the second index level so the index lines up with ``expected``.
    result.index = result.index.droplevel(1)

    column_ewm = frame[1].ewm(**ewm_kwargs)
    expected = getattr(column_ewm, func)(frame[5])

    # Names are not compared.
    tm.assert_series_equal(result, expected, check_names=False)
15
16
@pytest.mark.parametrize("name", ["cov", "corr"])
def test_ewm_corr_cov(name):
    """Check the NaN pattern of ewm ``cov``/``corr`` between two Series.

    ``A`` has its first 10 values blanked out; ``B`` is built from ``A``
    without its first two entries and has its last 10 values blanked out.
    With ``min_periods=5`` the first 14 results are asserted to be NaN and
    all later results are asserted to be non-NaN.
    """
    A = Series(np.random.randn(50), index=np.arange(50))
    # Positional slice made explicit with ``.iloc``.
    B = A.iloc[2:] + np.random.randn(48)

    # ``np.nan`` is used instead of the ``np.NaN`` alias, which was removed
    # in NumPy 2.0 and raises AttributeError there.
    A.iloc[:10] = np.nan
    B.iloc[-10:] = np.nan

    result = getattr(A.ewm(com=20, min_periods=5), name)(B)
    assert np.isnan(result.values[:14]).all()
    assert not np.isnan(result.values[14:]).any()
28
29
@pytest.mark.parametrize("min_periods", [0, 1, 2])
@pytest.mark.parametrize("name", ["cov", "corr"])
def test_ewm_corr_cov_min_periods(name, min_periods):
    """Check ewm ``cov``/``corr`` NaN handling for small ``min_periods``.

    Covers three inputs: a pair of partially-missing Series, an empty
    Series, and a single-element Series.
    """
    # GH 7898
    A = Series(np.random.randn(50), index=np.arange(50))
    # Positional slice made explicit with ``.iloc``.
    B = A.iloc[2:] + np.random.randn(48)

    # ``np.nan`` is used instead of the ``np.NaN`` alias, which was removed
    # in NumPy 2.0 and raises AttributeError there.
    A.iloc[:10] = np.nan
    B.iloc[-10:] = np.nan

    result = getattr(A.ewm(com=20, min_periods=min_periods), name)(B)
    # binary functions (ewmcov, ewmcorr) with bias=False require at
    # least two values
    assert np.isnan(result.values[:11]).all()
    assert not np.isnan(result.values[11:]).any()

    # check series of length 0: the result is asserted to equal the empty input
    empty = Series([], dtype=np.float64)
    result = getattr(empty.ewm(com=50, min_periods=min_periods), name)(empty)
    tm.assert_series_equal(result, empty)

    # check series of length 1: the single result is asserted to be NaN
    result = getattr(Series([1.0]).ewm(com=50, min_periods=min_periods), name)(
        Series([1.0])
    )
    tm.assert_series_equal(result, Series([np.nan]))
56
57
@pytest.mark.parametrize("name", ["cov", "corr"])
def test_different_input_array_raise_exception(name):
    """Passing a raw ndarray as ``other`` to ewm ``cov``/``corr`` must raise."""
    A = Series(np.random.randn(50), index=np.arange(50))
    # ``np.nan`` is used instead of the ``np.NaN`` alias, which was removed
    # in NumPy 2.0 and raises AttributeError there.
    A.iloc[:10] = np.nan

    msg = "Input arrays must be of the same type!"
    # exception raised is Exception
    with pytest.raises(Exception, match=msg):
        getattr(A.ewm(com=20, min_periods=5), name)(np.random.randn(50))
67
68
def create_mock_weights(obj, com, adjust, ignore_na):
    """Build reference ewm weights with the same shape and labels as ``obj``.

    Parameters
    ----------
    obj : Series or DataFrame
        Data whose missing-value pattern drives the weights.
    com, adjust, ignore_na
        Forwarded unchanged to ``create_mock_series_weights``.

    Returns
    -------
    Series or DataFrame
        For a DataFrame, one weight column per input column (selected by
        position), relabelled with ``obj``'s index and columns.  A DataFrame
        without columns yields an empty frame with the same labels.  Anything
        else is delegated directly to ``create_mock_series_weights``.
    """
    if not isinstance(obj, DataFrame):
        return create_mock_series_weights(obj, com, adjust, ignore_na)

    n_columns = len(obj.columns)
    if n_columns == 0:
        # Nothing to concatenate; hand back an empty frame with equal labels.
        return DataFrame(index=obj.index, columns=obj.columns)

    # Columns are addressed by position, not by label.
    per_column = [
        create_mock_series_weights(
            obj.iloc[:, position], com=com, adjust=adjust, ignore_na=ignore_na
        )
        for position in range(n_columns)
    ]
    weights = concat(per_column, axis=1)
    weights.index = obj.index
    weights.columns = obj.columns
    return weights
87
88
def create_mock_series_weights(s, com, adjust, ignore_na):
    """Compute reference ewm weights for a single Series.

    Parameters
    ----------
    s : Series
        Input data; an entry counts as observed when it compares equal to
        itself (so NaN is treated as missing).
    com : float
        Center of mass; the smoothing factor is ``alpha = 1 / (1 + com)``.
    adjust : bool
        If true, each observed entry gets ``(1 / (1 - alpha)) ** step``.
        Otherwise the first observed entry gets ``1.0`` and each later one
        gets ``alpha * (sum of earlier weights) / (1 - alpha) ** gap``, where
        ``gap`` is the number of steps since the previous observed entry.
    ignore_na : bool
        If true, missing entries do not advance the step counter.

    Returns
    -------
    Series
        Weights on ``s.index``; NaN wherever ``s`` is missing.
    """
    alpha = 1.0 / (1.0 + com)
    weights = Series(np.nan, index=s.index)
    # NaN is the value that fails the self-equality check.
    observed = [s.iat[pos] == s.iat[pos] for pos in range(len(s))]

    step = 0
    if adjust:
        for pos, is_observed in enumerate(observed):
            if is_observed:
                weights.iat[pos] = pow(1.0 / (1.0 - alpha), step)
            # Observed entries always advance the step; missing ones only
            # when they are not ignored.
            if is_observed or not ignore_na:
                step += 1
        return weights

    weight_total = 0.0
    last_step = -1  # -1 means "no observed entry seen yet"
    for pos, is_observed in enumerate(observed):
        if is_observed:
            if last_step == -1:
                weights.iat[pos] = 1.0
            else:
                weights.iat[pos] = (
                    alpha * weight_total / pow(1.0 - alpha, step - last_step)
                )
            weight_total += weights.iat[pos]
            last_step = step
        if is_observed or not ignore_na:
            step += 1
    return weights
116
117
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
def test_ewm_consistency_mean(consistency_data, adjust, ignore_na, min_periods):
    """Compare ``ewm(...).mean()`` with a mean built from explicit weights.

    The expected value is the cumulative weighted sum of ``x`` divided by the
    cumulative sum of the weights from ``create_mock_weights``, forward-filled
    and then blanked wherever too few observations have been seen.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file; only the first element of ``consistency_data`` is used.
    """
    x, _, _ = consistency_data
    com = 3.0

    result = x.ewm(
        com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
    ).mean()

    weights = create_mock_weights(x, com=com, adjust=adjust, ignore_na=ignore_na)
    # ``.ffill()`` replaces ``fillna(method="ffill")``, whose ``method``
    # argument is deprecated since pandas 2.1.
    expected = x.multiply(weights).cumsum().divide(weights.cumsum()).ffill()
    # At least one observation is always required, even for min_periods=0.
    expected[x.expanding().count() < max(min_periods, 1)] = np.nan
    tm.assert_equal(result, expected.astype("float64"))
134
135
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
def test_ewm_consistency_consistent(consistency_data, adjust, ignore_na, min_periods):
    """For constant input, check the ewm mean and the self-correlation.

    The mean must equal the constant wherever enough observations have been
    seen (and NaN elsewhere); the correlation of the data with itself must be
    NaN everywhere.  Non-constant inputs are not checked.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file.
    """
    x, is_constant, _ = consistency_data
    if not is_constant:
        return

    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }
    observations_seen = x.expanding().count()
    ewm_mean = x.ewm(**ewm_kwargs).mean()
    self_corr = x.ewm(**ewm_kwargs).corr(x)
    # The overall maximum stands in for "the" constant value.
    constant = x.max() if isinstance(x, Series) else x.max().max()

    # check mean of constant series
    expected = x * np.nan
    expected[observations_seen >= max(min_periods, 1)] = constant
    tm.assert_equal(ewm_mean, expected)

    # check correlation of constant series with itself is NaN
    expected[:] = np.nan
    tm.assert_equal(self_corr, expected)
160
161
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
def test_ewm_consistency_var_debiasing_factors(
    consistency_data, adjust, ignore_na, min_periods
):
    """Check that unbiased ewm variance equals biased variance times a factor.

    The factor is computed from the reference weights as
    ``(sum w)^2 / ((sum w)^2 - sum w^2)`` using forward-filled cumulative
    sums; non-positive denominators are replaced with NaN.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file; only the first element of ``consistency_data`` is used.
    """
    x, _, _ = consistency_data
    com = 3.0

    # check variance debiasing factors
    var_unbiased_x = x.ewm(
        com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
    ).var(bias=False)
    var_biased_x = x.ewm(
        com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
    ).var(bias=True)

    weights = create_mock_weights(x, com=com, adjust=adjust, ignore_na=ignore_na)
    # ``.ffill()`` replaces ``fillna(method="ffill")``, whose ``method``
    # argument is deprecated since pandas 2.1.
    cum_sum = weights.cumsum().ffill()
    cum_sum_sq = (weights * weights).cumsum().ffill()
    numerator = cum_sum * cum_sum
    denominator = numerator - cum_sum_sq
    # Mask out non-positive denominators instead of dividing by them.
    denominator[denominator <= 0.0] = np.nan
    var_debiasing_factors_x = numerator / denominator

    tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)
186
187
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_moments_consistency_var(
    consistency_data, adjust, ignore_na, min_periods, bias
):
    """Check that ewm variance is never negative and, when biased, matches moments.

    For ``bias=True`` the variance must equal ``mean(x^2) - mean(x)^2``.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file.
    """
    x, _, _ = consistency_data
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    ewm_mean = x.ewm(**ewm_kwargs).mean()
    ewm_var = x.ewm(**ewm_kwargs).var(bias=bias)
    assert not (ewm_var < 0).any().any()

    if not bias:
        return

    # check that biased var(x) == mean(x^2) - mean(x)^2
    squares = x * x
    ewm_mean_of_squares = squares.ewm(**ewm_kwargs).mean()
    tm.assert_equal(ewm_var, ewm_mean_of_squares - (ewm_mean * ewm_mean))
212
213
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_moments_consistency_var_constant(
    consistency_data, adjust, ignore_na, min_periods, bias
):
    """For constant input, ewm variance must be exactly zero where defined.

    The result is NaN until enough observations have been seen; for the
    unbiased estimate it is also NaN while fewer than two have been seen.
    Non-constant inputs are not checked.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file.
    """
    x, is_constant, _ = consistency_data
    if not is_constant:
        return

    observations_seen = x.expanding(min_periods=min_periods).count()
    ewm_var = x.ewm(
        com=3.0, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
    ).var(bias=bias)

    # check that variance of constant series is identically 0
    assert not (ewm_var > 0).any().any()

    expected = x * np.nan
    enough_observations = observations_seen >= max(min_periods, 1)
    expected[enough_observations] = 0.0
    if not bias:
        # The unbiased estimate is expected to be NaN below two observations.
        expected[observations_seen < 2] = np.nan
    tm.assert_equal(ewm_var, expected)
234
235
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_std(consistency_data, adjust, ignore_na, min_periods, bias):
    """Check that ewm ``var`` equals the square of ewm ``std``.

    Both statistics are also asserted to contain no negative values.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file.
    """
    x, _, _ = consistency_data
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    ewm_var = x.ewm(**ewm_kwargs).var(bias=bias)
    ewm_std = x.ewm(**ewm_kwargs).std(bias=bias)

    for statistic in (ewm_var, ewm_std):
        assert not (statistic < 0).any().any()

    # check that var(x) == std(x)^2
    tm.assert_equal(ewm_var, ewm_std * ewm_std)
252
253
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_cov(consistency_data, adjust, ignore_na, min_periods, bias):
    """Check that ewm ``var(x)`` equals ewm ``cov(x, x)``.

    Both statistics are also asserted to contain no negative values.

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file.
    """
    x, _, _ = consistency_data
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    ewm_var = x.ewm(**ewm_kwargs).var(bias=bias)
    assert not (ewm_var < 0).any().any()

    self_cov = x.ewm(**ewm_kwargs).cov(x, bias=bias)
    assert not (self_cov < 0).any().any()

    # check that var(x) == cov(x, x)
    tm.assert_equal(ewm_var, self_cov)
271
272
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_series_cov_corr(
    consistency_data, adjust, ignore_na, min_periods, bias
):
    """Check identities linking ewm ``cov``, ``corr``, ``var``, ``std`` and ``mean``.

    Only Series inputs are checked, and the series is paired with itself
    (``y`` is ``x``):

    * ``cov(x, y) == (var(x + y) - var(x) - var(y)) / 2``
    * ``corr(x, y) == cov(x, y) / (std(x) * std(y))``
    * for ``bias=True``: ``cov(x, y) == mean(x * y) - mean(x) * mean(y)``

    ``consistency_data``, ``adjust`` and ``ignore_na`` are fixtures defined
    outside this file.
    """
    x, _, _ = consistency_data
    if not isinstance(x, Series):
        return

    def ewm_of(obj):
        # Same ewm configuration for every statistic in this test.
        return obj.ewm(
            com=3.0, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
        )

    var_of_sum = ewm_of(x + x).var(bias=bias)
    var_first = ewm_of(x).var(bias=bias)
    var_second = ewm_of(x).var(bias=bias)
    cov_pair = ewm_of(x).cov(x, bias=bias)
    # check that cov(x, y) == (var(x+y) - var(x) -
    # var(y)) / 2
    tm.assert_equal(cov_pair, 0.5 * (var_of_sum - var_first - var_second))

    # check that corr(x, y) == cov(x, y) / (std(x) *
    # std(y))
    corr_pair = ewm_of(x).corr(x, bias=bias)
    std_first = ewm_of(x).std(bias=bias)
    std_second = ewm_of(x).std(bias=bias)
    tm.assert_equal(corr_pair, cov_pair / (std_first * std_second))

    if not bias:
        return

    # check that biased cov(x, y) == mean(x*y) -
    # mean(x)*mean(y)
    mean_first = ewm_of(x).mean()
    mean_second = ewm_of(x).mean()
    mean_of_product = ewm_of(x * x).mean()
    tm.assert_equal(cov_pair, mean_of_product - (mean_first * mean_second))
330