1from datetime import datetime
2
3import numpy as np
4import pytest
5
6import pandas.util._test_decorators as td
7
8from pandas import DataFrame, DatetimeIndex, Index, MultiIndex, Series
9import pandas._testing as tm
10from pandas.core.window.common import flex_binary_moment
11
12
13def _rolling_consistency_cases():
14    for window in [1, 2, 3, 10, 20]:
15        for min_periods in {0, 1, 2, 3, 4, window}:
16            if min_periods and (min_periods > window):
17                continue
18            for center in [False, True]:
19                yield window, min_periods, center
20
21
22# binary moments
def test_rolling_cov(series):
    # The last rolling covariance must match np.cov computed on the final
    # 50 observations of both inputs.
    # NOTE(review): `series` is a fixture defined outside this file;
    # presumably it holds at least 50 rows -- confirm against conftest.
    A = series
    B = A + np.random.randn(len(A))

    result = A.rolling(window=50, min_periods=25).cov(B)
    # .iloc makes the positional lookup explicit instead of relying on
    # Series.__getitem__ falling back to positions for integer keys
    tm.assert_almost_equal(
        result.iloc[-1], np.cov(A.iloc[-50:], B.iloc[-50:])[0, 1]
    )
29
30
def test_rolling_corr(series):
    # The last rolling correlation must match np.corrcoef computed on the
    # final 50 observations of both inputs.
    # NOTE(review): `series` is a fixture defined outside this file;
    # presumably it holds at least 50 rows -- confirm against conftest.
    A = series
    B = A + np.random.randn(len(A))

    result = A.rolling(window=50, min_periods=25).corr(B)
    # .iloc makes the positional lookup explicit instead of relying on
    # Series.__getitem__ falling back to positions for integer keys
    tm.assert_almost_equal(
        result.iloc[-1], np.corrcoef(A.iloc[-50:], B.iloc[-50:])[0, 1]
    )

    # test for correct bias correction
    a = tm.makeTimeSeries()
    b = tm.makeTimeSeries()
    a[:5] = np.nan
    b[:10] = np.nan

    # a window spanning the whole series must agree with Series.corr
    result = a.rolling(window=len(a), min_periods=1).corr(b)
    tm.assert_almost_equal(result.iloc[-1], a.corr(b))
46
47
@pytest.mark.parametrize("func", ["cov", "corr"])
def test_rolling_pairwise_cov_corr(func, frame):
    # the (row label 1, column 5) slice of the pairwise result must equal
    # the explicit column-against-column computation
    pairwise = getattr(frame.rolling(window=10, min_periods=5), func)()
    selected = pairwise.loc[(slice(None), 1), 5]
    selected.index = selected.index.droplevel(1)

    column_roller = frame[1].rolling(window=10, min_periods=5)
    expected = getattr(column_roller, func)(frame[5])
    tm.assert_series_equal(selected, expected, check_names=False)
55
56
@pytest.mark.parametrize("method", ["corr", "cov"])
def test_flex_binary_frame(method, frame):
    # Series-vs-frame and frame-vs-series must both equal applying the
    # binary moment column by column; frame-vs-frame must pair up columns
    # with the same label.
    series = frame[1]

    res = getattr(series.rolling(window=10), method)(frame)
    res2 = getattr(frame.rolling(window=10), method)(series)
    exp = frame.apply(lambda x: getattr(series.rolling(window=10), method)(x))

    tm.assert_frame_equal(res, exp)
    tm.assert_frame_equal(res2, exp)

    # Build the second frame explicitly: writing through ``.values`` is not
    # guaranteed to modify the frame (the array may be a copy or read-only),
    # which would silently leave frame2 identical to frame.
    frame2 = DataFrame(
        np.random.randn(*frame.shape), index=frame.index, columns=frame.columns
    )

    res3 = getattr(frame.rolling(window=10), method)(frame2)
    exp = DataFrame(
        {k: getattr(frame[k].rolling(window=10), method)(frame2[k]) for k in frame}
    )
    tm.assert_frame_equal(res3, exp)
76
77
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum])
def test_rolling_apply_consistency_sum_nans(
    consistency_data, window, min_periods, center, f
):
    x, is_constant, no_nans = consistency_data

    # the np.nansum / min_periods=0 combination is not compared
    if f is np.nansum and min_periods == 0:
        return

    kwargs = dict(window=window, min_periods=min_periods, center=center)
    # built-in rolling sum vs. the same reduction routed through apply
    via_sum = x.rolling(**kwargs).sum()
    via_apply = x.rolling(**kwargs).apply(func=f, raw=True)
    tm.assert_equal(via_sum, via_apply)
97
98
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum])
def test_rolling_apply_consistency_sum_no_nans(
    consistency_data, window, min_periods, center, f
):
    x, is_constant, no_nans = consistency_data

    # only NaN-free data is checked here
    if not no_nans:
        return
    # the np.nansum / min_periods=0 combination is not compared
    if f is np.nansum and min_periods == 0:
        return

    kwargs = dict(window=window, min_periods=min_periods, center=center)
    # built-in rolling sum vs. the same reduction routed through apply
    via_sum = x.rolling(**kwargs).sum()
    via_apply = x.rolling(**kwargs).apply(func=f, raw=True)
    tm.assert_equal(via_sum, via_apply)
119
120
@pytest.mark.parametrize("window", range(7))
def test_rolling_corr_with_zero_variance(window):
    # GH 18430
    # correlating an all-zero series with anything must give only NaN
    constant = Series(np.zeros(20))
    increasing = Series(np.arange(20))

    corr = constant.rolling(window=window).corr(other=increasing)
    assert corr.isna().all()
128
129
def test_flex_binary_moment():
    # GH3155
    # scalar arguments must be rejected with a TypeError (don't blow the stack)
    expected_msg = (
        "arguments to moment function must be of type np.ndarray/Series/DataFrame"
    )
    with pytest.raises(TypeError, match=expected_msg):
        flex_binary_moment(5, 6, None)
136
137
def test_corr_sanity():
    # GH 3155
    def _all_within_unit_interval(corr):
        # NaN entries are mapped to 0 by nan_to_num, so they always pass
        return all(np.abs(np.nan_to_num(value)) <= 1 for value in corr)

    # fixed data set
    values = np.array(
        [
            [0.87024726, 0.18505595],
            [0.64355431, 0.3091617],
            [0.92372966, 0.50552513],
            [0.00203756, 0.04520709],
            [0.84780328, 0.33394331],
            [0.78369152, 0.63919667],
        ]
    )
    fixed = DataFrame(values)
    fixed_corr = fixed[0].rolling(5, center=True).corr(fixed[1])
    assert _all_within_unit_interval(fixed_corr)

    # random data set
    random_df = DataFrame(np.random.rand(30, 2))
    random_corr = random_df[0].rolling(5, center=True).corr(random_df[1])
    assert _all_within_unit_interval(random_corr)
159
160
def test_rolling_cov_diff_length():
    # GH 7512
    left = Series([1, 2, 3], index=[0, 1, 2])
    expected = Series([None, None, 2.0])

    # a shorter series and a same-length series with a missing value must
    # produce the same rolling covariance
    shorter = Series([1, 3], index=[0, 2])
    with_gap = Series([1, None, 3], index=[0, 1, 2])
    for right in (shorter, with_gap):
        result = left.rolling(window=3, min_periods=2).cov(right)
        tm.assert_series_equal(result, expected)
172
173
def test_rolling_corr_diff_length():
    # GH 7512
    left = Series([1, 2, 3], index=[0, 1, 2])
    expected = Series([None, None, 1.0])

    # a shorter series and a same-length series with a missing value must
    # produce the same rolling correlation
    shorter = Series([1, 3], index=[0, 2])
    with_gap = Series([1, None, 3], index=[0, 1, 2])
    for right in (shorter, with_gap):
        result = left.rolling(window=3, min_periods=2).corr(right)
        tm.assert_series_equal(result, expected)
185
186
@pytest.mark.parametrize(
    "f",
    [
        lambda obj: obj.rolling(window=10, min_periods=5).cov(obj, pairwise=False),
        lambda obj: obj.rolling(window=10, min_periods=5).corr(obj, pairwise=False),
        lambda obj: obj.rolling(window=10, min_periods=5).max(),
        lambda obj: obj.rolling(window=10, min_periods=5).min(),
        lambda obj: obj.rolling(window=10, min_periods=5).sum(),
        lambda obj: obj.rolling(window=10, min_periods=5).mean(),
        lambda obj: obj.rolling(window=10, min_periods=5).std(),
        lambda obj: obj.rolling(window=10, min_periods=5).var(),
        lambda obj: obj.rolling(window=10, min_periods=5).skew(),
        lambda obj: obj.rolling(window=10, min_periods=5).kurt(),
        lambda obj: obj.rolling(window=10, min_periods=5).quantile(quantile=0.5),
        lambda obj: obj.rolling(window=10, min_periods=5).median(),
        lambda obj: obj.rolling(window=10, min_periods=5).apply(sum, raw=False),
        lambda obj: obj.rolling(window=10, min_periods=5).apply(sum, raw=True),
        pytest.param(
            lambda obj: obj.rolling(
                win_type="boxcar", window=10, min_periods=5
            ).mean(),
            marks=td.skip_if_no_scipy,
        ),
    ],
)
def test_rolling_functions_window_non_shrinkage(f):
    # GH 7764
    ser = Series(range(4))
    frame = DataFrame([[1, 5], [3, 2], [3, 9], [-1, 0]], columns=["A", "B"])

    # the inputs have only 4 rows while every case asks for min_periods=5,
    # so the result keeps the input's index and is NaN everywhere
    ser_expected = Series(np.nan, index=ser.index)
    frame_expected = DataFrame(np.nan, index=frame.index, columns=frame.columns)

    tm.assert_series_equal(f(ser), ser_expected)
    tm.assert_frame_equal(f(frame), frame_expected)
222
223
@pytest.mark.parametrize(
    "f",
    [
        lambda obj: obj.rolling(window=10, min_periods=5).cov(obj, pairwise=True),
        lambda obj: obj.rolling(window=10, min_periods=5).corr(obj, pairwise=True),
    ],
)
def test_rolling_functions_window_non_shrinkage_binary(f):
    # corr/cov return a MI DataFrame
    source = DataFrame(
        [[1, 5], [3, 2], [3, 9], [-1, 0]],
        columns=Index(["A", "B"], name="foo"),
        index=Index(range(4), name="bar"),
    )

    # expected: one row per (index label, column label) pair, no values set
    pair_index = MultiIndex.from_product(
        [source.index, source.columns], names=["bar", "foo"]
    )
    expected = DataFrame(
        columns=Index(["A", "B"], name="foo"),
        index=pair_index,
        dtype="float64",
    )

    tm.assert_frame_equal(f(source), expected)
246
247
248def test_rolling_skew_edge_cases():
249
250    all_nan = Series([np.NaN] * 5)
251
252    # yields all NaN (0 variance)
253    d = Series([1] * 5)
254    x = d.rolling(window=5).skew()
255    tm.assert_series_equal(all_nan, x)
256
257    # yields all NaN (window too small)
258    d = Series(np.random.randn(5))
259    x = d.rolling(window=2).skew()
260    tm.assert_series_equal(all_nan, x)
261
262    # yields [NaN, NaN, NaN, 0.177994, 1.548824]
263    d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401])
264    expected = Series([np.NaN, np.NaN, np.NaN, 0.177994, 1.548824])
265    x = d.rolling(window=4).skew()
266    tm.assert_series_equal(expected, x)
267
268
269def test_rolling_kurt_edge_cases():
270
271    all_nan = Series([np.NaN] * 5)
272
273    # yields all NaN (0 variance)
274    d = Series([1] * 5)
275    x = d.rolling(window=5).kurt()
276    tm.assert_series_equal(all_nan, x)
277
278    # yields all NaN (window too small)
279    d = Series(np.random.randn(5))
280    x = d.rolling(window=3).kurt()
281    tm.assert_series_equal(all_nan, x)
282
283    # yields [NaN, NaN, NaN, 1.224307, 2.671499]
284    d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401])
285    expected = Series([np.NaN, np.NaN, np.NaN, 1.224307, 2.671499])
286    x = d.rolling(window=4).kurt()
287    tm.assert_series_equal(expected, x)
288
289
290def test_rolling_skew_eq_value_fperr():
291    # #18804 all rolling skew for all equal values should return Nan
292    a = Series([1.1] * 15).rolling(window=10).skew()
293    assert np.isnan(a).all()
294
295
296def test_rolling_kurt_eq_value_fperr():
297    # #18804 all rolling kurt for all equal values should return Nan
298    a = Series([1.1] * 15).rolling(window=10).kurt()
299    assert np.isnan(a).all()
300
301
def test_rolling_max_gh6297():
    """Replicate result expected in GH #6297"""
    stamps = [datetime(1975, 1, day) for day in range(1, 6)]
    # a second observation on Jan 3 so that one day holds 2 datapoints
    stamps.append(datetime(1975, 1, 3, 6, 0))

    # float values 1.0 .. 6.0, ordered chronologically
    series = Series(range(1, 7), index=stamps).map(float).sort_index()

    daily = DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D")
    expected = Series([1.0, 2.0, 6.0, 4.0, 5.0], index=daily)

    x = series.resample("D").max().rolling(window=1).max()
    tm.assert_series_equal(expected, x)
319
320
def test_rolling_max_resample():
    stamps = [datetime(1975, 1, day) for day in range(1, 6)]
    # two extra observations on the last day so it holds 4, 10 and 20
    stamps += [datetime(1975, 1, 5, 1), datetime(1975, 1, 5, 2)]

    # float values, ordered chronologically
    series = Series(list(range(0, 5)) + [10, 20], index=stamps)
    series = series.map(float).sort_index()

    daily = DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D")

    # the daily aggregation decides the value of the last day:
    # max -> 20, median -> 10, mean -> (4 + 10 + 20) / 3
    last_day_value = {
        "max": 20.0,
        "median": 10.0,
        "mean": (4.0 + 10.0 + 20.0) / 3.0,
    }
    for how, last_value in last_day_value.items():
        expected = Series([0.0, 1.0, 2.0, 3.0, last_value], index=daily)
        x = getattr(series.resample("D"), how)().rolling(window=1).max()
        tm.assert_series_equal(expected, x)
357
358
def test_rolling_min_resample():
    stamps = [datetime(1975, 1, day) for day in range(1, 6)]
    # two extra observations on the last day so it holds 4, 10 and 20
    stamps += [datetime(1975, 1, 5, 1), datetime(1975, 1, 5, 2)]

    # float values, ordered chronologically
    series = Series(list(range(0, 5)) + [10, 20], index=stamps)
    series = series.map(float).sort_index()

    # daily min keeps 4.0 for the last day
    daily = DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D")
    expected = Series([0.0, 1.0, 2.0, 3.0, 4.0], index=daily)

    roller = series.resample("D").min().rolling(window=1)
    tm.assert_series_equal(expected, roller.min())
378
379
def test_rolling_median_resample():
    stamps = [datetime(1975, 1, day) for day in range(1, 6)]
    # two extra observations on the last day so it holds 4, 10 and 20
    stamps += [datetime(1975, 1, 5, 1), datetime(1975, 1, 5, 2)]

    # float values, ordered chronologically
    series = Series(list(range(0, 5)) + [10, 20], index=stamps)
    series = series.map(float).sort_index()

    # daily median keeps 10.0 for the last day
    daily = DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D")
    expected = Series([0.0, 1.0, 2.0, 3.0, 10.0], index=daily)

    x = series.resample("D").median().rolling(window=1).median()
    tm.assert_series_equal(expected, x)
399
400
def test_rolling_median_memory_error():
    # GH11722
    n = 20000
    # the rolling median is computed twice, each time on fresh random data;
    # the test only requires that neither call raises
    for _ in range(2):
        Series(np.random.randn(n)).rolling(window=2, center=False).median()
406
407
@pytest.mark.parametrize(
    "data_type",
    [np.dtype(f"f{width}") for width in [4, 8]]
    + [np.dtype(f"{sign}{width}") for width in [1, 2, 4, 8] for sign in "ui"],
)
def test_rolling_min_max_numeric_types(data_type):
    # GH12373

    # Only checks that max/min run without raising for every input dtype
    # and that the result dtype is float64; quantitative correctness is
    # covered by other tests
    float64 = np.dtype("f8")
    for agg in ("max", "min"):
        roller = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5)
        result = getattr(roller, agg)()
        assert result.dtypes[0] == float64
423
424
@pytest.mark.parametrize(
    "f",
    [
        lambda obj: obj.rolling(window=10, min_periods=0).count(),
        lambda obj: obj.rolling(window=10, min_periods=5).cov(obj, pairwise=False),
        lambda obj: obj.rolling(window=10, min_periods=5).corr(obj, pairwise=False),
        lambda obj: obj.rolling(window=10, min_periods=5).max(),
        lambda obj: obj.rolling(window=10, min_periods=5).min(),
        lambda obj: obj.rolling(window=10, min_periods=5).sum(),
        lambda obj: obj.rolling(window=10, min_periods=5).mean(),
        lambda obj: obj.rolling(window=10, min_periods=5).std(),
        lambda obj: obj.rolling(window=10, min_periods=5).var(),
        lambda obj: obj.rolling(window=10, min_periods=5).skew(),
        lambda obj: obj.rolling(window=10, min_periods=5).kurt(),
        lambda obj: obj.rolling(window=10, min_periods=5).quantile(0.5),
        lambda obj: obj.rolling(window=10, min_periods=5).median(),
        lambda obj: obj.rolling(window=10, min_periods=5).apply(sum, raw=False),
        lambda obj: obj.rolling(window=10, min_periods=5).apply(sum, raw=True),
        pytest.param(
            lambda obj: obj.rolling(
                win_type="boxcar", window=10, min_periods=5
            ).mean(),
            marks=td.skip_if_no_scipy,
        ),
    ],
)
def test_moment_functions_zero_length(f):
    # GH 8056
    empty_ser = Series(dtype=np.float64)
    empty_frame = DataFrame()
    empty_col_frame = DataFrame(columns=["a"])
    empty_col_frame["a"] = empty_col_frame["a"].astype("float64")

    # for every zero-length input the result must equal the input itself
    tm.assert_series_equal(f(empty_ser), empty_ser)
    tm.assert_frame_equal(f(empty_frame), empty_frame)
    tm.assert_frame_equal(f(empty_col_frame), empty_col_frame)
467
468
@pytest.mark.parametrize(
    "f",
    [
        lambda obj: obj.rolling(window=10, min_periods=5).cov(obj, pairwise=True),
        lambda obj: obj.rolling(window=10, min_periods=5).corr(obj, pairwise=True),
    ],
)
def test_moment_functions_zero_length_pairwise(f):
    # completely empty frame: expected result is indexed by the (empty)
    # product of its index and columns
    bare = DataFrame()
    bare_expected = DataFrame(
        index=MultiIndex.from_product([bare.index, bare.columns]),
        columns=Index([]),
    )

    # frame with one named float column and a named, empty index
    named = DataFrame(columns=Index(["a"], name="foo"), index=Index([], name="bar"))
    named["a"] = named["a"].astype("float64")
    named_expected = DataFrame(
        index=MultiIndex.from_product(
            [named.index, named.columns], names=["bar", "foo"]
        ),
        columns=Index(["a"], name="foo"),
        dtype="float64",
    )

    tm.assert_frame_equal(f(bare), bare_expected)
    tm.assert_frame_equal(f(named), named_expected)
496
497
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var(consistency_data, window, min_periods, center, ddof):
    x, is_constant, no_nans = consistency_data
    kwargs = dict(window=window, min_periods=min_periods, center=center)

    mean_x = x.rolling(**kwargs).mean()
    var_x = x.rolling(**kwargs).var(ddof=ddof)
    # a variance is never negative
    assert not (var_x < 0).any().any()

    if ddof != 0:
        return

    # check that biased var(x) == mean(x^2) - mean(x)^2
    mean_x2 = (x * x).rolling(**kwargs).mean()
    tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))
519
520
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var_constant(
    consistency_data, window, min_periods, center, ddof
):
    x, is_constant, no_nans = consistency_data

    # only constant inputs are checked here
    if not is_constant:
        return

    kwargs = dict(window=window, min_periods=min_periods, center=center)
    count_x = x.rolling(**kwargs).count()
    var_x = x.rolling(**kwargs).var(ddof=ddof)

    # check that variance of constant series is identically 0
    assert not (var_x > 0).any().any()

    # 0 wherever enough observations are present, NaN elsewhere
    expected = x * np.nan
    expected[count_x >= max(min_periods, 1)] = 0.0
    if ddof == 1:
        # with ddof=1, fewer than two observations give NaN
        expected[count_x < 2] = np.nan
    tm.assert_equal(var_x, expected)
545
546
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_std(consistency_data, window, min_periods, center, ddof):
    x, is_constant, no_nans = consistency_data
    kwargs = dict(window=window, min_periods=min_periods, center=center)

    var_x = x.rolling(**kwargs).var(ddof=ddof)
    std_x = x.rolling(**kwargs).std(ddof=ddof)

    # neither statistic may be negative
    assert not (var_x < 0).any().any()
    assert not (std_x < 0).any().any()

    # check that var(x) == std(x)^2
    tm.assert_equal(var_x, std_x * std_x)
565
566
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_cov(consistency_data, window, min_periods, center, ddof):
    x, is_constant, no_nans = consistency_data
    kwargs = dict(window=window, min_periods=min_periods, center=center)

    var_x = x.rolling(**kwargs).var(ddof=ddof)
    assert not (var_x < 0).any().any()

    # covariance of x with itself may not be negative either
    cov_x_x = x.rolling(**kwargs).cov(x, ddof=ddof)
    assert not (cov_x_x < 0).any().any()

    # check that var(x) == cov(x, x)
    tm.assert_equal(var_x, cov_x_x)
585
586
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_series_cov_corr(
    consistency_data, window, min_periods, center, ddof
):
    x, is_constant, no_nans = consistency_data

    # these identities are only checked for Series inputs
    if not isinstance(x, Series):
        return

    # the second operand of every binary moment below is x itself
    y = x
    kwargs = dict(window=window, min_periods=min_periods, center=center)

    var_x_plus_y = (x + y).rolling(**kwargs).var(ddof=ddof)
    var_x = x.rolling(**kwargs).var(ddof=ddof)
    var_y = y.rolling(**kwargs).var(ddof=ddof)
    cov_x_y = x.rolling(**kwargs).cov(y, ddof=ddof)
    # check that cov(x, y) == (var(x+y) - var(x) - var(y)) / 2
    tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))

    # check that corr(x, y) == cov(x, y) / (std(x) * std(y))
    corr_x_y = x.rolling(**kwargs).corr(y)
    std_x = x.rolling(**kwargs).std(ddof=ddof)
    std_y = y.rolling(**kwargs).std(ddof=ddof)
    tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))

    if ddof != 0:
        return

    # check that biased cov(x, y) == mean(x*y) - mean(x)*mean(y)
    mean_x = x.rolling(**kwargs).mean()
    mean_y = y.rolling(**kwargs).mean()
    mean_x_times_y = (x * y).rolling(**kwargs).mean()
    tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))
643
644
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
def test_rolling_consistency_mean(consistency_data, window, min_periods, center):
    x, is_constant, no_nans = consistency_data
    kwargs = dict(window=window, min_periods=min_periods, center=center)

    result = x.rolling(**kwargs).mean()

    # check that mean(x) == sum(x) / count(x)
    rolling_sum = x.rolling(**kwargs).sum()
    rolling_count = x.rolling(**kwargs).count()
    expected = rolling_sum.divide(rolling_count)
    tm.assert_equal(result, expected.astype("float64"))
660
661
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
def test_rolling_consistency_constant(consistency_data, window, min_periods, center):
    x, is_constant, no_nans = consistency_data

    # only constant inputs are checked here
    if not is_constant:
        return

    kwargs = dict(window=window, min_periods=min_periods, center=center)
    count_x = x.rolling(**kwargs).count()
    mean_x = x.rolling(**kwargs).mean()
    corr_x_x = x.rolling(**kwargs).corr(x)

    # overall maximum of x, reduced once for a Series and twice otherwise
    if isinstance(x, Series):
        exp = x.max()
    else:
        exp = x.max().max()

    # check mean of constant series: the constant wherever enough
    # observations are present, NaN elsewhere
    expected = x * np.nan
    expected[count_x >= max(min_periods, 1)] = exp
    tm.assert_equal(mean_x, expected)

    # check correlation of constant series with itself is NaN
    expected[:] = np.nan
    tm.assert_equal(corr_x_x, expected)
688
689
@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
def test_rolling_consistency_var_debiasing_factors(
    consistency_data, window, min_periods, center
):
    x, is_constant, no_nans = consistency_data
    kwargs = dict(window=window, min_periods=min_periods, center=center)

    # check variance debiasing factors
    var_unbiased_x = x.rolling(**kwargs).var()
    var_biased_x = x.rolling(**kwargs).var(ddof=0)

    # factor is count / (count - 1); a zero denominator is replaced by NaN
    count_x = x.rolling(**kwargs).count()
    denominator = (count_x - 1.0).replace(0.0, np.nan)
    var_debiasing_factors_x = count_x.divide(denominator)

    tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)
716