1import numpy as np 2import pytest 3 4from pandas import DataFrame, Series, concat 5import pandas._testing as tm 6 7 8@pytest.mark.parametrize("func", ["cov", "corr"]) 9def test_ewm_pairwise_cov_corr(func, frame): 10 result = getattr(frame.ewm(span=10, min_periods=5), func)() 11 result = result.loc[(slice(None), 1), 5] 12 result.index = result.index.droplevel(1) 13 expected = getattr(frame[1].ewm(span=10, min_periods=5), func)(frame[5]) 14 tm.assert_series_equal(result, expected, check_names=False) 15 16 17@pytest.mark.parametrize("name", ["cov", "corr"]) 18def test_ewm_corr_cov(name): 19 A = Series(np.random.randn(50), index=np.arange(50)) 20 B = A[2:] + np.random.randn(48) 21 22 A[:10] = np.NaN 23 B[-10:] = np.NaN 24 25 result = getattr(A.ewm(com=20, min_periods=5), name)(B) 26 assert np.isnan(result.values[:14]).all() 27 assert not np.isnan(result.values[14:]).any() 28 29 30@pytest.mark.parametrize("min_periods", [0, 1, 2]) 31@pytest.mark.parametrize("name", ["cov", "corr"]) 32def test_ewm_corr_cov_min_periods(name, min_periods): 33 # GH 7898 34 A = Series(np.random.randn(50), index=np.arange(50)) 35 B = A[2:] + np.random.randn(48) 36 37 A[:10] = np.NaN 38 B[-10:] = np.NaN 39 40 result = getattr(A.ewm(com=20, min_periods=min_periods), name)(B) 41 # binary functions (ewmcov, ewmcorr) with bias=False require at 42 # least two values 43 assert np.isnan(result.values[:11]).all() 44 assert not np.isnan(result.values[11:]).any() 45 46 # check series of length 0 47 empty = Series([], dtype=np.float64) 48 result = getattr(empty.ewm(com=50, min_periods=min_periods), name)(empty) 49 tm.assert_series_equal(result, empty) 50 51 # check series of length 1 52 result = getattr(Series([1.0]).ewm(com=50, min_periods=min_periods), name)( 53 Series([1.0]) 54 ) 55 tm.assert_series_equal(result, Series([np.NaN])) 56 57 58@pytest.mark.parametrize("name", ["cov", "corr"]) 59def test_different_input_array_raise_exception(name): 60 A = Series(np.random.randn(50), index=np.arange(50)) 61 
A[:10] = np.NaN 62 63 msg = "Input arrays must be of the same type!" 64 # exception raised is Exception 65 with pytest.raises(Exception, match=msg): 66 getattr(A.ewm(com=20, min_periods=5), name)(np.random.randn(50)) 67 68 69def create_mock_weights(obj, com, adjust, ignore_na): 70 if isinstance(obj, DataFrame): 71 if not len(obj.columns): 72 return DataFrame(index=obj.index, columns=obj.columns) 73 w = concat( 74 [ 75 create_mock_series_weights( 76 obj.iloc[:, i], com=com, adjust=adjust, ignore_na=ignore_na 77 ) 78 for i, _ in enumerate(obj.columns) 79 ], 80 axis=1, 81 ) 82 w.index = obj.index 83 w.columns = obj.columns 84 return w 85 else: 86 return create_mock_series_weights(obj, com, adjust, ignore_na) 87 88 89def create_mock_series_weights(s, com, adjust, ignore_na): 90 w = Series(np.nan, index=s.index) 91 alpha = 1.0 / (1.0 + com) 92 if adjust: 93 count = 0 94 for i in range(len(s)): 95 if s.iat[i] == s.iat[i]: 96 w.iat[i] = pow(1.0 / (1.0 - alpha), count) 97 count += 1 98 elif not ignore_na: 99 count += 1 100 else: 101 sum_wts = 0.0 102 prev_i = -1 103 count = 0 104 for i in range(len(s)): 105 if s.iat[i] == s.iat[i]: 106 if prev_i == -1: 107 w.iat[i] = 1.0 108 else: 109 w.iat[i] = alpha * sum_wts / pow(1.0 - alpha, count - prev_i) 110 sum_wts += w.iat[i] 111 prev_i = count 112 count += 1 113 elif not ignore_na: 114 count += 1 115 return w 116 117 118@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) 119def test_ewm_consistency_mean(consistency_data, adjust, ignore_na, min_periods): 120 x, is_constant, no_nans = consistency_data 121 com = 3.0 122 123 result = x.ewm( 124 com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na 125 ).mean() 126 weights = create_mock_weights(x, com=com, adjust=adjust, ignore_na=ignore_na) 127 expected = ( 128 x.multiply(weights).cumsum().divide(weights.cumsum()).fillna(method="ffill") 129 ) 130 expected[ 131 x.expanding().count() < (max(min_periods, 1) if min_periods else 1) 132 ] = np.nan 133 
tm.assert_equal(result, expected.astype("float64")) 134 135 136@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) 137def test_ewm_consistency_consistent(consistency_data, adjust, ignore_na, min_periods): 138 x, is_constant, no_nans = consistency_data 139 com = 3.0 140 141 if is_constant: 142 count_x = x.expanding().count() 143 mean_x = x.ewm( 144 com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na 145 ).mean() 146 # check that correlation of a series with itself is either 1 or NaN 147 corr_x_x = x.ewm( 148 com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na 149 ).corr(x) 150 exp = x.max() if isinstance(x, Series) else x.max().max() 151 152 # check mean of constant series 153 expected = x * np.nan 154 expected[count_x >= max(min_periods, 1)] = exp 155 tm.assert_equal(mean_x, expected) 156 157 # check correlation of constant series with itself is NaN 158 expected[:] = np.nan 159 tm.assert_equal(corr_x_x, expected) 160 161 162@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) 163def test_ewm_consistency_var_debiasing_factors( 164 consistency_data, adjust, ignore_na, min_periods 165): 166 x, is_constant, no_nans = consistency_data 167 com = 3.0 168 169 # check variance debiasing factors 170 var_unbiased_x = x.ewm( 171 com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na 172 ).var(bias=False) 173 var_biased_x = x.ewm( 174 com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na 175 ).var(bias=True) 176 177 weights = create_mock_weights(x, com=com, adjust=adjust, ignore_na=ignore_na) 178 cum_sum = weights.cumsum().fillna(method="ffill") 179 cum_sum_sq = (weights * weights).cumsum().fillna(method="ffill") 180 numerator = cum_sum * cum_sum 181 denominator = numerator - cum_sum_sq 182 denominator[denominator <= 0.0] = np.nan 183 var_debiasing_factors_x = numerator / denominator 184 185 tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x) 186 187 
@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_moments_consistency_var(
    consistency_data, adjust, ignore_na, min_periods, bias
):
    """ewm var is never negative; biased var equals mean(x^2) - mean(x)^2."""
    x, _, _ = consistency_data
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    mean_x = x.ewm(**ewm_kwargs).mean()
    var_x = x.ewm(**ewm_kwargs).var(bias=bias)
    assert not (var_x < 0).any().any()

    if bias:
        # check that biased var(x) == mean(x^2) - mean(x)^2
        mean_x2 = (x * x).ewm(**ewm_kwargs).mean()
        tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))


@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_moments_consistency_var_constant(
    consistency_data, adjust, ignore_na, min_periods, bias
):
    """For constant data, ewm var is 0 wherever enough observations exist."""
    x, is_constant, _ = consistency_data
    if not is_constant:
        return

    count_x = x.expanding(min_periods=min_periods).count()
    var_x = x.ewm(
        com=3.0, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na
    ).var(bias=bias)

    # check that variance of constant series is identically 0
    assert not (var_x > 0).any().any()
    expected = x * np.nan
    expected[count_x >= max(min_periods, 1)] = 0.0
    if not bias:
        # the unbiased estimate is expected to be NaN below two observations
        expected[count_x < 2] = np.nan
    tm.assert_equal(var_x, expected)


@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_std(consistency_data, adjust, ignore_na, min_periods, bias):
    """ewm var and std are non-negative and var(x) == std(x)^2."""
    x, _, _ = consistency_data
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    var_x = x.ewm(**ewm_kwargs).var(bias=bias)
    std_x = x.ewm(**ewm_kwargs).std(bias=bias)
    assert not (var_x < 0).any().any()
    assert not (std_x < 0).any().any()

    # check that var(x) == std(x)^2
    tm.assert_equal(var_x, std_x * std_x)


@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_cov(consistency_data, adjust, ignore_na, min_periods, bias):
    """ewm cov of x with itself is non-negative and equals ewm var of x."""
    x, _, _ = consistency_data
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    var_x = x.ewm(**ewm_kwargs).var(bias=bias)
    assert not (var_x < 0).any().any()

    cov_x_x = x.ewm(**ewm_kwargs).cov(x, bias=bias)
    assert not (cov_x_x < 0).any().any()

    # check that var(x) == cov(x, x)
    tm.assert_equal(var_x, cov_x_x)


@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4])
@pytest.mark.parametrize("bias", [True, False])
def test_ewm_consistency_series_cov_corr(
    consistency_data, adjust, ignore_na, min_periods, bias
):
    """Series-only identities relating ewm cov, corr, var, std and mean."""
    x, _, _ = consistency_data
    if not isinstance(x, Series):
        return

    # The second operand of every identity below is x itself.
    y = x
    ewm_kwargs = {
        "com": 3.0,
        "min_periods": min_periods,
        "adjust": adjust,
        "ignore_na": ignore_na,
    }

    var_x_plus_y = (x + y).ewm(**ewm_kwargs).var(bias=bias)
    var_x = x.ewm(**ewm_kwargs).var(bias=bias)
    var_y = y.ewm(**ewm_kwargs).var(bias=bias)
    cov_x_y = x.ewm(**ewm_kwargs).cov(y, bias=bias)
    # check that cov(x, y) == (var(x+y) - var(x) -
    # var(y)) / 2
    tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))

    # check that corr(x, y) == cov(x, y) / (std(x) *
    # std(y))
    corr_x_y = x.ewm(**ewm_kwargs).corr(y, bias=bias)
    std_x = x.ewm(**ewm_kwargs).std(bias=bias)
    std_y = y.ewm(**ewm_kwargs).std(bias=bias)
    tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))

    if bias:
        # check that biased cov(x, y) == mean(x*y) -
        # mean(x)*mean(y)
        mean_x = x.ewm(**ewm_kwargs).mean()
        mean_y = y.ewm(**ewm_kwargs).mean()
        mean_x_times_y = (x * y).ewm(**ewm_kwargs).mean()
        tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))