1from functools import partial 2 3import numpy as np 4import pytest 5 6from pandas import DataFrame, Series, concat, isna, notna 7import pandas._testing as tm 8 9import pandas.tseries.offsets as offsets 10 11 12def scoreatpercentile(a, per): 13 values = np.sort(a, axis=0) 14 15 idx = int(per / 1.0 * (values.shape[0] - 1)) 16 17 if idx == values.shape[0] - 1: 18 retval = values[-1] 19 20 else: 21 qlow = float(idx) / float(values.shape[0] - 1) 22 qhig = float(idx + 1) / float(values.shape[0] - 1) 23 vlow = values[idx] 24 vhig = values[idx + 1] 25 retval = vlow + (vhig - vlow) * (per - qlow) / (qhig - qlow) 26 27 return retval 28 29 30@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 31def test_series(series, q): 32 compare_func = partial(scoreatpercentile, per=q) 33 result = series.rolling(50).quantile(q) 34 assert isinstance(result, Series) 35 tm.assert_almost_equal(result.iloc[-1], compare_func(series[-50:])) 36 37 38@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 39def test_frame(raw, frame, q): 40 compare_func = partial(scoreatpercentile, per=q) 41 result = frame.rolling(50).quantile(q) 42 assert isinstance(result, DataFrame) 43 tm.assert_series_equal( 44 result.iloc[-1, :], 45 frame.iloc[-50:, :].apply(compare_func, axis=0, raw=raw), 46 check_names=False, 47 ) 48 49 50@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 51def test_time_rule_series(series, q): 52 compare_func = partial(scoreatpercentile, per=q) 53 win = 25 54 ser = series[::2].resample("B").mean() 55 series_result = ser.rolling(window=win, min_periods=10).quantile(q) 56 last_date = series_result.index[-1] 57 prev_date = last_date - 24 * offsets.BDay() 58 59 trunc_series = series[::2].truncate(prev_date, last_date) 60 tm.assert_almost_equal(series_result[-1], compare_func(trunc_series)) 61 62 63@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 64def test_time_rule_frame(raw, frame, q): 65 compare_func = partial(scoreatpercentile, per=q) 66 win = 25 67 frm = frame[::2].resample("B").mean() 68 frame_result = frm.rolling(window=win, min_periods=10).quantile(q) 69 last_date = frame_result.index[-1] 70 prev_date = last_date - 24 * offsets.BDay() 71 72 trunc_frame = frame[::2].truncate(prev_date, last_date) 73 tm.assert_series_equal( 74 frame_result.xs(last_date), 75 trunc_frame.apply(compare_func, raw=raw), 76 check_names=False, 77 ) 78 79 80@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 81def test_nans(q): 82 compare_func = partial(scoreatpercentile, per=q) 83 obj = Series(np.random.randn(50)) 84 obj[:10] = np.NaN 85 obj[-10:] = np.NaN 86 87 result = obj.rolling(50, min_periods=30).quantile(q) 88 tm.assert_almost_equal(result.iloc[-1], compare_func(obj[10:-10])) 89 90 # min_periods is working correctly 91 result = obj.rolling(20, min_periods=15).quantile(q) 92 assert isna(result.iloc[23]) 93 assert not isna(result.iloc[24]) 94 95 assert not isna(result.iloc[-6]) 96 assert isna(result.iloc[-5]) 97 98 obj2 = Series(np.random.randn(20)) 99 result = obj2.rolling(10, min_periods=5).quantile(q) 100 assert isna(result.iloc[3]) 101 assert notna(result.iloc[4]) 102 103 result0 = obj.rolling(20, min_periods=0).quantile(q) 104 result1 = obj.rolling(20, min_periods=1).quantile(q) 105 tm.assert_almost_equal(result0, result1) 106 107 108@pytest.mark.parametrize("minp", [0, 99, 100]) 109@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 110def test_min_periods(series, minp, q): 111 result = series.rolling(len(series) + 1, min_periods=minp).quantile(q) 112 expected = series.rolling(len(series), min_periods=minp).quantile(q) 113 nan_mask = isna(result) 114 tm.assert_series_equal(nan_mask, isna(expected)) 115 116 nan_mask = ~nan_mask 117 tm.assert_almost_equal(result[nan_mask], expected[nan_mask]) 118 119 120@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 121def test_center(q): 122 obj = Series(np.random.randn(50)) 123 obj[:10] = np.NaN 124 obj[-10:] = np.NaN 125 126 result = obj.rolling(20, center=True).quantile(q) 127 expected = ( 128 concat([obj, Series([np.NaN] * 9)]) 129 .rolling(20) 130 .quantile(q)[9:] 131 .reset_index(drop=True) 132 ) 133 tm.assert_series_equal(result, expected) 134 135 136@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 137def test_center_reindex_series(series, q): 138 # shifter index 139 s = [f"x{x:d}" for x in range(12)] 140 141 series_xp = ( 142 series.reindex(list(series.index) + s) 143 .rolling(window=25) 144 .quantile(q) 145 .shift(-12) 146 .reindex(series.index) 147 ) 148 149 series_rs = series.rolling(window=25, center=True).quantile(q) 150 tm.assert_series_equal(series_xp, series_rs) 151 152 153@pytest.mark.parametrize("q", [0.0, 0.1, 0.5, 0.9, 1.0]) 154def test_center_reindex_frame(frame, q): 155 # shifter index 156 s = [f"x{x:d}" for x in range(12)] 157 158 frame_xp = ( 159 frame.reindex(list(frame.index) + s) 160 .rolling(window=25) 161 .quantile(q) 162 .shift(-12) 163 .reindex(frame.index) 164 ) 165 frame_rs = frame.rolling(window=25, center=True).quantile(q) 166 tm.assert_frame_equal(frame_xp, frame_rs) 167