1from functools import partial
2
3import numpy as np
4import pytest
5
6from pandas import DataFrame, Series, concat, isna, notna
7import pandas._testing as tm
8
9import pandas.tseries.offsets as offsets
10
11
def scoreatpercentile(a, per):
    """
    Return the value at quantile ``per`` of ``a`` using linear interpolation.

    Reference implementation that the rolling-quantile tests in this module
    compare against.

    Parameters
    ----------
    a : array-like
        Values to evaluate. They are sorted along axis 0, so for 2-D input
        each column is handled independently.
    per : float
        Quantile to evaluate, expressed as a fraction in ``[0, 1]``.

    Returns
    -------
    scalar or numpy.ndarray
        Interpolated value at the requested quantile (one entry per column
        for 2-D input).

    Raises
    ------
    ValueError
        If ``a`` is empty along axis 0, or ``per`` lies outside ``[0, 1]``.
    """
    values = np.sort(a, axis=0)

    if values.shape[0] == 0:
        raise ValueError("cannot compute a percentile of empty input")
    # Without this guard an out-of-range ``per`` yields a negative or
    # out-of-bounds index and silently returns an unrelated element.
    if not 0 <= per <= 1:
        raise ValueError(f"per must be within [0, 1], got {per!r}")

    last = values.shape[0] - 1
    # Position of the sorted element at or just below the requested quantile.
    idx = int(per / 1.0 * last)

    if idx == last:
        # Nothing above to interpolate towards (also covers a single element).
        return values[-1]

    # Quantiles that correspond exactly to the two neighbouring elements.
    qlow = float(idx) / float(last)
    qhig = float(idx + 1) / float(last)
    vlow = values[idx]
    vhig = values[idx + 1]
    return vlow + (vhig - vlow) * (per - qlow) / (qhig - qlow)
28
29
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_series(series, q):
    """
    The last value of a 50-period rolling quantile on a Series must match
    ``scoreatpercentile`` applied to the final 50 observations.

    ``series`` is a pytest fixture that is not defined in this file.
    """
    window = 50
    rolled = series.rolling(window).quantile(q)
    assert isinstance(rolled, Series)

    # Reference value computed from the trailing ``window`` observations only.
    expected_last = scoreatpercentile(series[-window:], per=q)
    tm.assert_almost_equal(rolled.iloc[-1], expected_last)
36
37
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_frame(raw, frame, q):
    """
    The last row of a 50-period rolling quantile on a DataFrame must match
    ``scoreatpercentile`` applied column-wise to the final 50 rows.

    ``raw`` and ``frame`` are pytest fixtures that are not defined in this
    file; ``raw`` is forwarded to ``DataFrame.apply``.
    """
    window = 50
    rolled = frame.rolling(window).quantile(q)
    assert isinstance(rolled, DataFrame)

    # Reference: run the helper over each column of the trailing rows.
    reference = partial(scoreatpercentile, per=q)
    tail = frame.iloc[-window:, :]
    expected_last = tail.apply(reference, axis=0, raw=raw)

    actual_last = rolled.iloc[-1, :]
    tm.assert_series_equal(actual_last, expected_last, check_names=False)
48
49
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_time_rule_series(series, q):
    """
    Rolling quantile over a Series resampled to business days must match
    ``scoreatpercentile`` on the original observations inside the last window.

    ``series`` is a pytest fixture that is not defined in this file.
    """
    compare_func = partial(scoreatpercentile, per=q)
    win = 25
    # Keep every second observation, then resample with the "B" rule.
    ser = series[::2].resample("B").mean()
    series_result = ser.rolling(window=win, min_periods=10).quantile(q)
    last_date = series_result.index[-1]
    # A window of ``win`` rows ending on last_date starts win - 1 business
    # days earlier.
    prev_date = last_date - (win - 1) * offsets.BDay()

    trunc_series = series[::2].truncate(prev_date, last_date)
    # Use .iloc for positional access: integer keys passed to [] on a
    # non-integer index rely on a deprecated positional fallback.
    tm.assert_almost_equal(series_result.iloc[-1], compare_func(trunc_series))
61
62
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_time_rule_frame(raw, frame, q):
    """
    Rolling quantile over a DataFrame resampled to business days must match
    ``scoreatpercentile`` applied column-wise to the original rows that fall
    inside the last window.

    ``raw`` and ``frame`` are pytest fixtures that are not defined in this
    file; ``raw`` is forwarded to ``DataFrame.apply``.
    """
    window = 25
    # Every second row of the fixture; used for both sides of the comparison.
    thinned = frame[::2]

    resampled = thinned.resample("B").mean()
    rolled = resampled.rolling(window=window, min_periods=10).quantile(q)

    end = rolled.index[-1]
    start = end - (window - 1) * offsets.BDay()

    reference = partial(scoreatpercentile, per=q)
    expected = thinned.truncate(start, end).apply(reference, raw=raw)
    tm.assert_series_equal(rolled.xs(end), expected, check_names=False)
78
79
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_nans(q):
    """
    Rolling quantile must skip NaNs and honour ``min_periods``.

    The input has 50 values whose first and last 10 entries are NaN, leaving
    30 valid observations at positions 10..39.
    """
    compare_func = partial(scoreatpercentile, per=q)
    obj = Series(np.random.randn(50))
    # ``np.nan`` rather than the ``np.NaN`` alias, which NumPy 2.0 removed.
    obj[:10] = np.nan
    obj[-10:] = np.nan

    # The full-length window sees exactly the 30 valid values.
    result = obj.rolling(50, min_periods=30).quantile(q)
    tm.assert_almost_equal(result.iloc[-1], compare_func(obj[10:-10]))

    # min_periods is working correctly
    result = obj.rolling(20, min_periods=15).quantile(q)
    # The window ending at position 23 holds 14 valid values; at 24 it holds 15.
    assert isna(result.iloc[23])
    assert not isna(result.iloc[24])

    # The window ending at position 44 still holds 15 valid values; at 45
    # only 14 remain.
    assert not isna(result.iloc[-6])
    assert isna(result.iloc[-5])

    # Without NaNs, the first non-null result appears once min_periods
    # observations have been seen.
    obj2 = Series(np.random.randn(20))
    result = obj2.rolling(10, min_periods=5).quantile(q)
    assert isna(result.iloc[3])
    assert notna(result.iloc[4])

    # min_periods=0 and min_periods=1 must produce the same output.
    result0 = obj.rolling(20, min_periods=0).quantile(q)
    result1 = obj.rolling(20, min_periods=1).quantile(q)
    tm.assert_almost_equal(result0, result1)
106
107
@pytest.mark.parametrize("minp", [0, 99, 100])
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_min_periods(series, minp, q):
    """
    A window one element longer than the data must give the same rolling
    quantile as a window exactly as long as the data, for each ``min_periods``.

    ``series`` is a pytest fixture that is not defined in this file.
    """
    n_obs = len(series)
    oversized = series.rolling(n_obs + 1, min_periods=minp).quantile(q)
    exact = series.rolling(n_obs, min_periods=minp).quantile(q)

    # Both results must be missing at the same positions ...
    missing = isna(oversized)
    tm.assert_series_equal(missing, isna(exact))

    # ... and agree wherever a value is present.
    present = ~missing
    tm.assert_almost_equal(oversized[present], exact[present])
118
119
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_center(q):
    """
    A centered rolling quantile must equal a trailing rolling quantile on the
    same data padded with 9 trailing NaNs, with the first 9 results dropped.
    """
    obj = Series(np.random.randn(50))
    # ``np.nan`` rather than the ``np.NaN`` alias, which NumPy 2.0 removed.
    obj[:10] = np.nan
    obj[-10:] = np.nan

    result = obj.rolling(20, center=True).quantile(q)
    # Build the expected values from a trailing window: pad the end, roll,
    # discard the first 9 rows, and renumber the index from zero.
    expected = (
        concat([obj, Series([np.nan] * 9)])
        .rolling(20)
        .quantile(q)[9:]
        .reset_index(drop=True)
    )
    tm.assert_series_equal(result, expected)
134
135
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_center_reindex_series(series, q):
    """
    A centered 25-period rolling quantile on a Series must equal a trailing
    one computed on the Series extended by 12 labels and shifted back 12 rows.

    ``series`` is a pytest fixture that is not defined in this file.
    """
    # Extra labels appended after the real index; reindexing onto them adds
    # 12 rows of missing values at the end.
    pad_labels = [f"x{x:d}" for x in range(12)]
    padded = series.reindex(list(series.index) + pad_labels)

    trailing = padded.rolling(window=25).quantile(q)
    series_xp = trailing.shift(-12).reindex(series.index)

    series_rs = series.rolling(window=25, center=True).quantile(q)
    tm.assert_series_equal(series_xp, series_rs)
151
152
@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0])
def test_center_reindex_frame(frame, q):
    """
    A centered 25-period rolling quantile on a DataFrame must equal a trailing
    one computed on the frame extended by 12 labels and shifted back 12 rows.

    ``frame`` is a pytest fixture that is not defined in this file.
    """
    # Extra labels appended after the real index; reindexing onto them adds
    # 12 rows of missing values at the end.
    pad_labels = [f"x{x:d}" for x in range(12)]
    padded = frame.reindex(list(frame.index) + pad_labels)

    trailing = padded.rolling(window=25).quantile(q)
    frame_xp = trailing.shift(-12).reindex(frame.index)

    frame_rs = frame.rolling(window=25, center=True).quantile(q)
    tm.assert_frame_equal(frame_xp, frame_rs)
167