1import numpy as np
2import pytest
3
4from pandas import DataFrame, Index, MultiIndex, Series, isna, notna
5import pandas._testing as tm
6
7
def test_expanding_corr(series):
    """Expanding corr agrees with a rolling corr over a full-length window."""
    left = series.dropna()
    # Add gaussian noise, then drop the last five points so that the two
    # operands differ in length.
    noise = np.random.randn(len(left))
    right = (left + noise)[:-5]

    full_window = left.rolling(window=len(left), min_periods=1)
    rolling_corr = full_window.corr(right)
    expanding_corr = left.expanding().corr(right)

    tm.assert_almost_equal(rolling_corr, expanding_corr)
17
18
def test_expanding_count(series):
    """Expanding count equals a full-window rolling count (min_periods=0)."""
    expanding_count = series.expanding(min_periods=0).count()
    rolling_count = series.rolling(window=len(series), min_periods=0).count()
    tm.assert_almost_equal(expanding_count, rolling_count)
24
25
def test_expanding_quantile(series):
    """Expanding 0.5-quantile equals the full-window rolling 0.5-quantile."""
    q = 0.5
    full_window = series.rolling(window=len(series), min_periods=1)

    expanding_q = series.expanding().quantile(q)
    rolling_q = full_window.quantile(q)

    tm.assert_almost_equal(expanding_q, rolling_q)
32
33
def test_expanding_cov(series):
    """Expanding cov agrees with a rolling cov over a full-length window."""
    left = series
    # Noisy copy of the input, shortened by five points.
    right = (left + np.random.randn(len(left)))[:-5]

    n_obs = len(left)
    rolling_cov = left.rolling(window=n_obs, min_periods=1).cov(right)
    expanding_cov = left.expanding().cov(right)

    tm.assert_almost_equal(rolling_cov, expanding_cov)
43
44
def test_expanding_cov_pairwise(frame):
    """Frame-level expanding cov equals the full-window rolling cov."""
    full_window = frame.rolling(window=len(frame), min_periods=1)

    expanding_cov = frame.expanding().cov()
    rolling_cov = full_window.cov()

    tm.assert_frame_equal(expanding_cov, rolling_cov)
51
52
def test_expanding_corr_pairwise(frame):
    """Frame-level expanding corr equals the full-window rolling corr."""
    full_window = frame.rolling(window=len(frame), min_periods=1)

    expanding_corr = frame.expanding().corr()
    rolling_corr = full_window.corr()

    tm.assert_frame_equal(expanding_corr, rolling_corr)
58
59
@pytest.mark.parametrize(
    "func,static_comp",
    [("sum", np.sum), ("mean", np.mean), ("max", np.max), ("min", np.min)],
    ids=["sum", "mean", "max", "min"],
)
def test_expanding_func(func, static_comp, frame_or_series):
    """Check an expanding reduction against its static NumPy counterpart.

    Parameters
    ----------
    func : str
        Name of the method invoked on the expanding window.
    static_comp : callable
        Reduction applied to the first eleven rows to build the expectation.
    frame_or_series : type
        Constructor used to wrap the input data; the result must be an
        instance of it.
    """
    # Ten numeric values followed by ten NaNs.
    data = frame_or_series(np.array(list(range(10)) + [np.nan] * 10))
    # ``axis`` is not passed: 0 is already the default, and newer pandas
    # releases deprecate the keyword on ``expanding``.
    result = getattr(data.expanding(min_periods=1), func)()
    assert isinstance(result, frame_or_series)

    if frame_or_series is Series:
        tm.assert_almost_equal(result[10], static_comp(data[:11]))
    else:
        tm.assert_series_equal(
            result.iloc[10], static_comp(data[:11]), check_names=False
        )
76
77
@pytest.mark.parametrize(
    "func,static_comp",
    [("sum", np.sum), ("mean", np.mean), ("max", np.max), ("min", np.min)],
    ids=["sum", "mean", "max", "min"],
)
def test_expanding_min_periods(func, static_comp):
    """Check that ``min_periods`` gates the output of an expanding reduction.

    Parameters
    ----------
    func : str
        Name of the method invoked on the expanding window.
    static_comp : callable
        Reduction applied to the whole series to build the expectation for
        the last element.
    """

    def expanding_stat(obj, min_periods):
        # ``axis`` is not passed: 0 is already the default, and newer pandas
        # releases deprecate the keyword on ``expanding``.
        return getattr(obj.expanding(min_periods=min_periods), func)()

    ser = Series(np.random.randn(50))

    result = expanding_stat(ser, 30)
    assert result[:29].isna().all()
    tm.assert_almost_equal(result.iloc[-1], static_comp(ser[:50]))

    # min_periods is working correctly
    result = expanding_stat(ser, 15)
    assert isna(result.iloc[13])
    assert notna(result.iloc[14])

    ser2 = Series(np.random.randn(20))
    result = expanding_stat(ser2, 5)
    assert isna(result[3])
    assert notna(result[4])

    # min_periods=0 must give the same output as min_periods=1
    result0 = expanding_stat(ser, 0)
    result1 = expanding_stat(ser, 1)
    tm.assert_almost_equal(result0, result1)

    result = expanding_stat(ser, 1)
    tm.assert_almost_equal(result.iloc[-1], static_comp(ser[:50]))
107
108
def test_expanding_apply(engine_and_raw, frame_or_series):
    """Expanding ``apply`` of a mean matches ``np.mean`` on a leading slice."""
    engine, raw = engine_and_raw

    # Ten numeric values followed by ten NaNs.
    values = np.array(list(range(10)) + [np.nan] * 10)
    data = frame_or_series(values)

    window = data.expanding(min_periods=1)
    result = window.apply(lambda x: x.mean(), raw=raw, engine=engine)
    assert isinstance(result, frame_or_series)

    expected = np.mean(data[:11])
    if frame_or_series is not Series:
        tm.assert_series_equal(result.iloc[9], expected, check_names=False)
    else:
        tm.assert_almost_equal(result[9], expected)
121
122
def test_expanding_min_periods_apply(engine_and_raw):
    """``min_periods`` gates the output of an expanding ``apply`` of a mean."""
    engine, raw = engine_and_raw

    def expanding_mean(obj, min_periods):
        # Every call builds a fresh window and a fresh callable.
        window = obj.expanding(min_periods=min_periods)
        return window.apply(lambda x: x.mean(), raw=raw, engine=engine)

    ser = Series(np.random.randn(50))

    out = expanding_mean(ser, 30)
    assert out[:29].isna().all()
    tm.assert_almost_equal(out.iloc[-1], np.mean(ser[:50]))

    # the first valid value appears exactly at position min_periods - 1
    out = expanding_mean(ser, 15)
    assert isna(out.iloc[13])
    assert notna(out.iloc[14])

    ser2 = Series(np.random.randn(20))
    out = expanding_mean(ser2, 5)
    assert isna(out[3])
    assert notna(out[4])

    # min_periods=0 and min_periods=1 must agree
    with_zero = expanding_mean(ser, 0)
    with_one = expanding_mean(ser, 1)
    tm.assert_almost_equal(with_zero, with_one)

    out = expanding_mean(ser, 1)
    tm.assert_almost_equal(out.iloc[-1], np.mean(ser[:50]))
160
161
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum])
def test_expanding_apply_consistency_sum_nans(consistency_data, min_periods, f):
    """Expanding ``sum`` matches expanding ``apply`` of a sum function."""
    x, _is_constant, _no_nans = consistency_data

    # No assertion is made for np.nansum combined with min_periods=0.
    if f is np.nansum and min_periods == 0:
        return

    builtin_sum = x.expanding(min_periods=min_periods).sum()
    applied_sum = x.expanding(min_periods=min_periods).apply(func=f, raw=True)
    tm.assert_equal(builtin_sum, applied_sum)
175
176
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum])
def test_expanding_apply_consistency_sum_no_nans(consistency_data, min_periods, f):
    """Like the NaN variant, but only checked when ``no_nans`` is set."""
    x, _is_constant, no_nans = consistency_data

    # Only inputs flagged as NaN-free are checked.
    if not no_nans:
        return
    # No assertion is made for np.nansum combined with min_periods=0.
    if f is np.nansum and min_periods == 0:
        return

    builtin_sum = x.expanding(min_periods=min_periods).sum()
    applied_sum = x.expanding(min_periods=min_periods).apply(func=f, raw=True)
    tm.assert_equal(builtin_sum, applied_sum)
192
193
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var(consistency_data, min_periods, ddof):
    """Expanding var is never negative; biased var equals E[x^2] - E[x]^2."""
    x, _is_constant, _no_nans = consistency_data

    def expanding(obj):
        return obj.expanding(min_periods=min_periods)

    mean_x = expanding(x).mean()
    var_x = expanding(x).var(ddof=ddof)

    has_negative = (var_x < 0).any().any()
    assert not has_negative

    if ddof != 0:
        return

    # check that biased var(x) == mean(x^2) - mean(x)^2
    mean_of_squares = expanding(x * x).mean()
    square_of_mean = mean_x * mean_x
    tm.assert_equal(var_x, mean_of_squares - square_of_mean)
207
208
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var_constant(consistency_data, min_periods, ddof):
    """For constant input, expanding var is 0 wherever it is defined."""
    x, is_constant, _no_nans = consistency_data

    # Only inputs flagged as constant are checked.
    if not is_constant:
        return

    count_x = x.expanding(min_periods=min_periods).count()
    var_x = x.expanding(min_periods=min_periods).var(ddof=ddof)

    # variance of a constant input must never be positive
    has_positive = (var_x > 0).any().any()
    assert not has_positive

    # Start from all-NaN and fill 0.0 where enough observations exist.
    required = max(min_periods, 1)
    expected = x * np.nan
    expected[count_x >= required] = 0.0
    if ddof == 1:
        # with ddof=1 fewer than two observations yield NaN
        expected[count_x < 2] = np.nan

    tm.assert_equal(var_x, expected)
225
226
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("ddof", [0, 1])
def test_expanding_consistency_std(consistency_data, min_periods, ddof):
    """Expanding var and std are non-negative and var equals std squared."""
    x, _is_constant, _no_nans = consistency_data

    var_x = x.expanding(min_periods=min_periods).var(ddof=ddof)
    std_x = x.expanding(min_periods=min_periods).std(ddof=ddof)

    var_has_negative = (var_x < 0).any().any()
    assert not var_has_negative
    std_has_negative = (std_x < 0).any().any()
    assert not std_has_negative

    # check that var(x) == std(x)^2
    std_squared = std_x * std_x
    tm.assert_equal(var_x, std_squared)
239
240
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("ddof", [0, 1])
def test_expanding_consistency_cov(consistency_data, min_periods, ddof):
    """Expanding cov of x with itself equals expanding var of x."""
    x, _is_constant, _no_nans = consistency_data

    var_x = x.expanding(min_periods=min_periods).var(ddof=ddof)
    var_has_negative = (var_x < 0).any().any()
    assert not var_has_negative

    self_cov = x.expanding(min_periods=min_periods).cov(x, ddof=ddof)
    cov_has_negative = (self_cov < 0).any().any()
    assert not cov_has_negative

    # check that var(x) == cov(x, x)
    tm.assert_equal(var_x, self_cov)
253
254
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("ddof", [0, 1])
def test_expanding_consistency_series_cov_corr(consistency_data, min_periods, ddof):
    """Check cov/corr identities on Series input, using x as both operands."""
    x, _is_constant, _no_nans = consistency_data

    # Nothing is checked for non-Series input.
    if not isinstance(x, Series):
        return

    def expanding(obj):
        return obj.expanding(min_periods=min_periods)

    # "y" is x itself throughout this test.
    var_sum = expanding(x + x).var(ddof=ddof)
    var_x = expanding(x).var(ddof=ddof)
    var_y = expanding(x).var(ddof=ddof)
    cov_x_y = expanding(x).cov(x, ddof=ddof)

    # check that cov(x, y) == (var(x+y) - var(x) - var(y)) / 2
    half_excess = 0.5 * (var_sum - var_x - var_y)
    tm.assert_equal(cov_x_y, half_excess)

    # check that corr(x, y) == cov(x, y) / (std(x) * std(y))
    corr_x_y = expanding(x).corr(x)
    std_x = expanding(x).std(ddof=ddof)
    std_y = expanding(x).std(ddof=ddof)
    tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))

    if ddof != 0:
        return

    # check that biased cov(x, y) == mean(x*y) - mean(x)*mean(y)
    mean_x = expanding(x).mean()
    mean_y = expanding(x).mean()
    mean_xy = expanding(x * x).mean()
    tm.assert_equal(cov_x_y, mean_xy - (mean_x * mean_y))
283
284
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
def test_expanding_consistency_mean(consistency_data, min_periods):
    """Expanding mean equals expanding sum divided by expanding count."""
    x, _is_constant, _no_nans = consistency_data

    total = x.expanding(min_periods=min_periods).sum()
    count = x.expanding(min_periods=min_periods).count()
    expected = (total / count).astype("float64")

    result = x.expanding(min_periods=min_periods).mean()
    tm.assert_equal(result, expected)
295
296
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
def test_expanding_consistency_constant(consistency_data, min_periods):
    """For constant input: mean is the constant, self-correlation is NaN."""
    x, is_constant, _no_nans = consistency_data

    # Only inputs flagged as constant are checked.
    if not is_constant:
        return

    # The count uses the default min_periods, not the parametrized one.
    count_x = x.expanding().count()
    mean_x = x.expanding(min_periods=min_periods).mean()
    corr_x_x = x.expanding(min_periods=min_periods).corr(x)

    # The overall maximum stands in for the constant value.
    if isinstance(x, Series):
        constant = x.max()
    else:
        constant = x.max().max()

    # check mean of constant series
    required = max(min_periods, 1)
    expected = x * np.nan
    expected[count_x >= required] = constant
    tm.assert_equal(mean_x, expected)

    # check correlation of constant series with itself is NaN
    expected[:] = np.nan
    tm.assert_equal(corr_x_x, expected)
317
318
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
def test_expanding_consistency_var_debiasing_factors(consistency_data, min_periods):
    """Default var equals ddof=0 var scaled by count / (count - 1)."""
    x, _is_constant, _no_nans = consistency_data

    unbiased = x.expanding(min_periods=min_periods).var()
    biased = x.expanding(min_periods=min_periods).var(ddof=0)

    # Debiasing factor count / (count - 1); a zero denominator becomes NaN.
    numerator = x.expanding().count()
    denominator = (x.expanding().count() - 1.0).replace(0.0, np.nan)
    debiasing_factors = numerator / denominator

    tm.assert_equal(unbiased, biased * debiasing_factors)
330
331
@pytest.mark.parametrize(
    "f",
    [
        lambda x: (x.expanding(min_periods=5).cov(x, pairwise=True)),
        lambda x: (x.expanding(min_periods=5).corr(x, pairwise=True)),
    ],
)
def test_moment_functions_zero_length_pairwise(f):
    """Pairwise cov/corr on zero-length frames give empty MultiIndexed frames."""
    # Case 1: a completely empty frame.
    empty = DataFrame()
    empty_index = MultiIndex.from_product([empty.index, empty.columns])
    empty_expected = DataFrame(index=empty_index, columns=Index([]))

    # Case 2: zero rows, one float64 column, named index and columns.
    named = DataFrame(
        columns=Index(["a"], name="foo"), index=Index([], name="bar")
    )
    named["a"] = named["a"].astype("float64")
    named_index = MultiIndex.from_product(
        [named.index, named.columns], names=["bar", "foo"]
    )
    named_expected = DataFrame(
        index=named_index,
        columns=Index(["a"], name="foo"),
        dtype="float64",
    )

    tm.assert_frame_equal(f(empty), empty_expected)
    tm.assert_frame_equal(f(named), named_expected)
359
360
@pytest.mark.parametrize(
    "f",
    [
        lambda x: x.expanding().count(),
        lambda x: x.expanding(min_periods=5).cov(x, pairwise=False),
        lambda x: x.expanding(min_periods=5).corr(x, pairwise=False),
        lambda x: x.expanding(min_periods=5).max(),
        lambda x: x.expanding(min_periods=5).min(),
        lambda x: x.expanding(min_periods=5).sum(),
        lambda x: x.expanding(min_periods=5).mean(),
        lambda x: x.expanding(min_periods=5).std(),
        lambda x: x.expanding(min_periods=5).var(),
        lambda x: x.expanding(min_periods=5).skew(),
        lambda x: x.expanding(min_periods=5).kurt(),
        lambda x: x.expanding(min_periods=5).quantile(0.5),
        lambda x: x.expanding(min_periods=5).median(),
        lambda x: x.expanding(min_periods=5).apply(sum, raw=False),
        lambda x: x.expanding(min_periods=5).apply(sum, raw=True),
    ],
)
def test_moment_functions_zero_length(f):
    """Each expanding operation on an empty input must equal that input."""
    # GH 8056
    empty_series = Series(dtype=np.float64)
    empty_frame = DataFrame()
    # Zero rows with a single float64 column.
    empty_float_frame = DataFrame(columns=["a"])
    empty_float_frame["a"] = empty_float_frame["a"].astype("float64")

    # The expectation in every case is the input object itself.
    tm.assert_series_equal(f(empty_series), empty_series)
    tm.assert_frame_equal(f(empty_frame), empty_frame)
    tm.assert_frame_equal(f(empty_float_frame), empty_float_frame)
399
400
def test_expanding_apply_empty_series(engine_and_raw):
    """Expanding ``apply`` on an empty float Series returns an equal Series."""
    engine, raw = engine_and_raw
    empty = Series([], dtype=np.float64)

    applied = empty.expanding().apply(lambda x: x.mean(), raw=raw, engine=engine)

    tm.assert_series_equal(empty, applied)
407
408
def test_expanding_apply_min_periods_0(engine_and_raw):
    """With min_periods=0 the function is applied even to all-missing data."""
    # GH 8080
    engine, raw = engine_and_raw
    all_missing = Series([None, None, None])

    window = all_missing.expanding(min_periods=0)
    window_lengths = window.apply(lambda x: len(x), raw=raw, engine=engine)

    tm.assert_series_equal(window_lengths, Series([1.0, 2.0, 3.0]))
416
417
def test_expanding_cov_diff_index():
    """Expanding cov between Series whose indexes only partly overlap."""
    # GH 7512
    base = Series([1, 2, 3], index=[0, 1, 2])
    # Each case is (left operand, right operand, expected result).
    cases = [
        # right operand lacks label 1
        (base, Series([1, 3], index=[0, 2]), Series([None, None, 2.0])),
        # right operand has an explicit missing value at label 1
        (
            base,
            Series([1, None, 3], index=[0, 1, 2]),
            Series([None, None, 2.0]),
        ),
        # each operand has a label the other lacks
        (
            Series([7, 8, 10], index=[0, 1, 3]),
            Series([7, 9, 10], index=[0, 2, 3]),
            Series([None, None, None, 4.5]),
        ),
    ]

    for left, right, expected in cases:
        result = left.expanding().cov(right)
        tm.assert_series_equal(result, expected)
435
436
def test_expanding_corr_diff_index():
    """Expanding corr between Series whose indexes only partly overlap."""
    # GH 7512
    base = Series([1, 2, 3], index=[0, 1, 2])
    # Each case is (left operand, right operand, expected result).
    cases = [
        # right operand lacks label 1
        (base, Series([1, 3], index=[0, 2]), Series([None, None, 1.0])),
        # right operand has an explicit missing value at label 1
        (
            base,
            Series([1, None, 3], index=[0, 1, 2]),
            Series([None, None, 1.0]),
        ),
        # each operand has a label the other lacks
        (
            Series([7, 8, 10], index=[0, 1, 3]),
            Series([7, 9, 10], index=[0, 2, 3]),
            Series([None, None, None, 1.0]),
        ),
    ]

    for left, right, expected in cases:
        result = left.expanding().corr(right)
        tm.assert_series_equal(result, expected)
454
455
def test_expanding_cov_pairwise_diff_length():
    """Pairwise expanding cov gives the same block for full and sparse frames."""
    # GH 7512
    left_cols = Index(["A", "B"], name="foo")
    right_cols = Index(["X", "Y"], name="foo")

    # Three-row frames and two-row variants without the row labelled 1.
    df1 = DataFrame([[1, 5], [3, 2], [3, 9]], columns=left_cols)
    df1a = DataFrame([[1, 5], [3, 9]], index=[0, 2], columns=left_cols)
    df2 = DataFrame([[5, 6], [None, None], [2, 1]], columns=right_cols)
    df2a = DataFrame([[5, 6], [2, 1]], index=[0, 2], columns=right_cols)

    expected = DataFrame(
        [[-3.0, -6.0], [-5.0, -10.0]],
        columns=Index(["A", "B"], name="foo"),
        index=Index(["X", "Y"], name="foo"),
    )

    # TODO: xref gh-15826
    # .loc is not preserving the names
    for left in (df1, df1a):
        for right in (df2, df2a):
            result = left.expanding().cov(right, pairwise=True).loc[2]
            tm.assert_frame_equal(result, expected)
483
484
def test_expanding_corr_pairwise_diff_length():
    """Pairwise expanding corr gives the same block for full and sparse frames."""
    # GH 7512
    full_index = Index(range(3), name="bar")
    sparse_index = Index([0, 2], name="bar")

    # Three-row frames and two-row variants without the row labelled 1.
    df1 = DataFrame([[1, 2], [3, 2], [3, 4]], columns=["A", "B"], index=full_index)
    df1a = DataFrame([[1, 2], [3, 4]], index=sparse_index, columns=["A", "B"])
    df2 = DataFrame(
        [[5, 6], [None, None], [2, 1]], columns=["X", "Y"], index=full_index
    )
    df2a = DataFrame([[5, 6], [2, 1]], index=sparse_index, columns=["X", "Y"])

    expected = DataFrame(
        [[-1.0, -1.0], [-1.0, -1.0]], columns=["A", "B"], index=Index(["X", "Y"])
    )

    for left in (df1, df1a):
        for right in (df2, df2a):
            result = left.expanding().corr(right, pairwise=True).loc[2]
            tm.assert_frame_equal(result, expected)
512
513
def test_expanding_apply_args_kwargs(engine_and_raw):
    """Extra arguments reach the applied function via ``args`` and ``kwargs``."""
    engine, raw = engine_and_raw

    def shifted_mean(x, const):
        return np.mean(x) + const

    df = DataFrame(np.random.rand(20, 3))

    # Baseline: plain expanding mean with the constant added afterwards.
    plain_mean = df.expanding().apply(np.mean, engine=engine, raw=raw)
    expected = plain_mean + 20.0

    via_args = df.expanding().apply(
        shifted_mean, engine=engine, raw=raw, args=(20,)
    )
    tm.assert_frame_equal(via_args, expected)

    # NOTE(review): ``engine`` is not forwarded in this call, so the default
    # is used -- presumably deliberate; confirm before changing.
    via_kwargs = df.expanding().apply(shifted_mean, raw=raw, kwargs={"const": 20})
    tm.assert_frame_equal(via_kwargs, expected)
529