1from functools import partial
2
3import numpy as np
4import pytest
5
6import pandas.util._test_decorators as td
7
8from pandas import DataFrame, Series, concat, isna, notna
9import pandas._testing as tm
10
11import pandas.tseries.offsets as offsets
12
13
@pytest.mark.parametrize("sp_func, roll_func", [["kurtosis", "kurt"], ["skew", "skew"]])
def test_series(series, sp_func, roll_func):
    """
    Check Series rolling kurt/skew against the matching scipy.stats function.

    The last value of a 50-observation rolling window must agree with the
    ``bias=False`` scipy statistic computed over the final 50 points.
    """
    # Skips the test when scipy is unavailable; otherwise returns scipy.stats.
    sp_stats = pytest.importorskip("scipy.stats")

    compare_func = partial(getattr(sp_stats, sp_func), bias=False)
    result = getattr(series.rolling(50), roll_func)()
    assert isinstance(result, Series)
    # Explicit positional slice: the last 50 rows are exactly the final window.
    tm.assert_almost_equal(result.iloc[-1], compare_func(series.iloc[-50:]))
23
24
@pytest.mark.parametrize("sp_func, roll_func", [["kurtosis", "kurt"], ["skew", "skew"]])
def test_frame(raw, frame, sp_func, roll_func):
    """
    Check DataFrame rolling kurt/skew against the matching scipy.stats function.

    The last row of a 50-observation rolling window must agree, column by
    column, with the ``bias=False`` scipy statistic applied to the final 50
    rows. ``raw`` is forwarded to ``DataFrame.apply``.
    """
    # Skips the test when scipy is unavailable; otherwise returns scipy.stats.
    sp_stats = pytest.importorskip("scipy.stats")

    compare_func = partial(getattr(sp_stats, sp_func), bias=False)
    result = getattr(frame.rolling(50), roll_func)()
    assert isinstance(result, DataFrame)
    tm.assert_series_equal(
        result.iloc[-1, :],
        frame.iloc[-50:, :].apply(compare_func, axis=0, raw=raw),
        # The two sides carry different Series names; only values/index matter.
        check_names=False,
    )
38
39
@pytest.mark.parametrize("sp_func, roll_func", [["kurtosis", "kurt"], ["skew", "skew"]])
def test_time_rule_series(series, sp_func, roll_func):
    """
    Check rolling kurt/skew on a business-day-resampled Series.

    Every other observation is resampled to "B" frequency, a 25-row rolling
    statistic (``min_periods=10``) is taken, and its last value is compared
    with the ``bias=False`` scipy statistic over the matching date range of
    the un-resampled data.
    """
    # Skips the test when scipy is unavailable; otherwise returns scipy.stats.
    sp_stats = pytest.importorskip("scipy.stats")

    compare_func = partial(getattr(sp_stats, sp_func), bias=False)
    win = 25
    ser = series[::2].resample("B").mean()
    series_result = getattr(ser.rolling(window=win, min_periods=10), roll_func)()
    last_date = series_result.index[-1]
    # A window of ``win`` rows ending on last_date starts win - 1 rows earlier.
    prev_date = last_date - (win - 1) * offsets.BDay()

    trunc_series = series[::2].truncate(prev_date, last_date)
    # .iloc makes the positional lookup explicit; ``[-1]`` on a non-integer
    # index relies on the deprecated positional fallback of Series.__getitem__.
    tm.assert_almost_equal(series_result.iloc[-1], compare_func(trunc_series))
54
55
@pytest.mark.parametrize("sp_func, roll_func", [["kurtosis", "kurt"], ["skew", "skew"]])
def test_time_rule_frame(raw, frame, sp_func, roll_func):
    """
    Check rolling kurt/skew on a business-day-resampled DataFrame.

    Every other row is resampled to "B" frequency, a 25-row rolling statistic
    (``min_periods=10``) is taken, and its last row is compared with the
    ``bias=False`` scipy statistic applied to the matching date range of the
    un-resampled data. ``raw`` is forwarded to ``DataFrame.apply``.
    """
    # Skips the test when scipy is unavailable; otherwise returns scipy.stats.
    sp_stats = pytest.importorskip("scipy.stats")

    compare_func = partial(getattr(sp_stats, sp_func), bias=False)
    win = 25
    frm = frame[::2].resample("B").mean()
    frame_result = getattr(frm.rolling(window=win, min_periods=10), roll_func)()
    last_date = frame_result.index[-1]
    # A window of ``win`` rows ending on last_date starts win - 1 rows earlier.
    prev_date = last_date - (win - 1) * offsets.BDay()

    trunc_frame = frame[::2].truncate(prev_date, last_date)
    tm.assert_series_equal(
        frame_result.xs(last_date),
        trunc_frame.apply(compare_func, raw=raw),
        # The two sides carry different Series names; only values/index matter.
        check_names=False,
    )
74
75
@pytest.mark.parametrize("sp_func, roll_func", [["kurtosis", "kurt"], ["skew", "skew"]])
def test_nans(sp_func, roll_func):
    """
    Check rolling kurt/skew NaN handling and ``min_periods`` thresholds.

    The main input is a 50-point random Series whose first and last 10 values
    are NaN, leaving 30 valid observations at positions 10..39.
    """
    # Skips the test when scipy is unavailable; otherwise returns scipy.stats.
    sp_stats = pytest.importorskip("scipy.stats")

    compare_func = partial(getattr(sp_stats, sp_func), bias=False)
    obj = Series(np.random.randn(50))
    # np.nan is the canonical spelling; the np.NaN alias is gone in NumPy 2.0.
    obj.iloc[:10] = np.nan
    obj.iloc[-10:] = np.nan

    # The full-width window must match scipy on the 30 valid observations.
    result = getattr(obj.rolling(50, min_periods=30), roll_func)()
    tm.assert_almost_equal(result.iloc[-1], compare_func(obj.iloc[10:-10]))

    # min_periods is working correctly
    result = getattr(obj.rolling(20, min_periods=15), roll_func)()
    # The window ending at position 23 holds 14 valid points; at 24 it holds 15.
    assert isna(result.iloc[23])
    assert not isna(result.iloc[24])

    # Same threshold on the way out, as the trailing NaN run enters the window.
    assert not isna(result.iloc[-6])
    assert isna(result.iloc[-5])

    # NaN-free input: the first non-NaN result appears at the 5th observation.
    obj2 = Series(np.random.randn(20))
    result = getattr(obj2.rolling(10, min_periods=5), roll_func)()
    assert isna(result.iloc[3])
    assert notna(result.iloc[4])

    # min_periods=0 must give the same output as min_periods=1.
    result0 = getattr(obj.rolling(20, min_periods=0), roll_func)()
    result1 = getattr(obj.rolling(20, min_periods=1), roll_func)()
    tm.assert_almost_equal(result0, result1)
105
106
@pytest.mark.parametrize("minp", [0, 99, 100])
@pytest.mark.parametrize("roll_func", ["kurt", "skew"])
def test_min_periods(series, minp, roll_func):
    """
    A window one row longer than the data must behave like a window of exactly
    the data's length: same NaN positions and the same non-NaN values.
    """
    n_obs = len(series)
    oversized = series.rolling(n_obs + 1, min_periods=minp)
    exact = series.rolling(n_obs, min_periods=minp)

    result = getattr(oversized, roll_func)()
    expected = getattr(exact, roll_func)()

    # NaNs must fall in the same places for both window sizes ...
    missing = isna(result)
    tm.assert_series_equal(missing, isna(expected))

    # ... and wherever a value was produced, the two must agree.
    present = ~missing
    tm.assert_almost_equal(result[present], expected[present])
117
118
@pytest.mark.parametrize("roll_func", ["kurt", "skew"])
def test_center(roll_func):
    """
    Check that ``center=True`` matches a shifted trailing window.

    A centered 20-row window is compared with a trailing 20-row window run
    over the same data padded with 9 NaNs, then moved back 9 positions.
    """
    obj = Series(np.random.randn(50))
    # np.nan is the canonical spelling; the np.NaN alias is gone in NumPy 2.0.
    obj.iloc[:10] = np.nan
    obj.iloc[-10:] = np.nan

    result = getattr(obj.rolling(20, center=True), roll_func)()

    padded = concat([obj, Series([np.nan] * 9)])
    expected = (
        getattr(padded.rolling(20), roll_func)()
        # The padded index repeats labels 0..8, so slice by position explicitly.
        .iloc[9:]
        .reset_index(drop=True)
    )
    tm.assert_series_equal(result, expected)
130
131
@pytest.mark.parametrize("roll_func", ["kurt", "skew"])
def test_center_reindex_series(series, roll_func):
    """
    A centered 25-row window on a Series must equal a trailing 25-row window
    computed on the index extended by 12 labels and shifted back 12 rows.
    """
    # 12 extra labels appended to the index so the shift has room to pull from
    s = [f"x{x:d}" for x in range(12)]
    extended = series.reindex(list(series.index) + s)

    trailing = getattr(extended.rolling(window=25), roll_func)()
    # Move the trailing result back and drop the extra labels again.
    series_xp = trailing.shift(-12).reindex(series.index)

    centered = series.rolling(window=25, center=True)
    series_rs = getattr(centered, roll_func)()

    tm.assert_series_equal(series_xp, series_rs)
147
148
@pytest.mark.parametrize("roll_func", ["kurt", "skew"])
def test_center_reindex_frame(frame, roll_func):
    """
    A centered 25-row window on a DataFrame must equal a trailing 25-row
    window computed on the index extended by 12 labels and shifted back 12 rows.
    """
    # 12 extra labels appended to the index so the shift has room to pull from
    s = [f"x{x:d}" for x in range(12)]
    extended = frame.reindex(list(frame.index) + s)

    trailing = getattr(extended.rolling(window=25), roll_func)()
    # Move the trailing result back and drop the extra labels again.
    frame_xp = trailing.shift(-12).reindex(frame.index)

    centered = frame.rolling(window=25, center=True)
    frame_rs = getattr(centered, roll_func)()

    tm.assert_frame_equal(frame_xp, frame_rs)
164