"""Consistency and edge-case tests for rolling-window moment functions."""
from datetime import datetime

import numpy as np
import pytest

import pandas.util._test_decorators as td

from pandas import DataFrame, DatetimeIndex, Index, MultiIndex, Series
import pandas._testing as tm
from pandas.core.window.common import flex_binary_moment


def _rolling_consistency_cases():
    """Yield ``(window, min_periods, center)`` combinations for parametrization.

    Combinations where a non-zero ``min_periods`` exceeds ``window`` are
    skipped. ``min_periods`` candidates are de-duplicated through a set and
    iterated in sorted order so the generated order is explicit.
    """
    for window in [1, 2, 3, 10, 20]:
        for min_periods in sorted({0, 1, 2, 3, 4, window}):
            if min_periods and (min_periods > window):
                continue
            for center in [False, True]:
                yield window, min_periods, center


def _series_with_crowded_last_day():
    """Build a float Series over 1975-01-01..05 with three points on the last day.

    The values are 0..4 at midnight of each day plus 10 and 20 at 01:00 and
    02:00 on 1975-01-05, so that day holds 4, 10 and 20. The result is sorted
    chronologically.
    """
    indices = [datetime(1975, 1, i) for i in range(1, 6)]
    # So that we can have 3 datapoints on last day (4, 10, and 20)
    indices.append(datetime(1975, 1, 5, 1))
    indices.append(datetime(1975, 1, 5, 2))
    series = Series(list(range(5)) + [10, 20], index=indices)
    # Use floats instead of ints as values
    series = series.map(float)
    # Sort chronologically
    return series.sort_index()


# binary moments
def test_rolling_cov(series):
    """Last rolling covariance value matches ``np.cov`` on the last 50 points."""
    A = series
    B = A + np.random.randn(len(A))

    result = A.rolling(window=50, min_periods=25).cov(B)
    # .iloc makes the positional "last element" access explicit instead of
    # relying on the positional fallback of Series.__getitem__.
    tm.assert_almost_equal(result.iloc[-1], np.cov(A[-50:], B[-50:])[0, 1])


def test_rolling_corr(series):
    """Last rolling correlation matches ``np.corrcoef`` and ``Series.corr``."""
    A = series
    B = A + np.random.randn(len(A))

    result = A.rolling(window=50, min_periods=25).corr(B)
    tm.assert_almost_equal(result.iloc[-1], np.corrcoef(A[-50:], B[-50:])[0, 1])

    # test for correct bias correction
    a = tm.makeTimeSeries()
    b = tm.makeTimeSeries()
    a[:5] = np.nan
    b[:10] = np.nan

    result = a.rolling(window=len(a), min_periods=1).corr(b)
    tm.assert_almost_equal(result.iloc[-1], a.corr(b))


@pytest.mark.parametrize("func", ["cov", "corr"])
def test_rolling_pairwise_cov_corr(func, frame):
    """Pairwise frame result for columns (1, 5) equals the Series-vs-Series result."""
    result = getattr(frame.rolling(window=10, min_periods=5), func)()
    result = result.loc[(slice(None), 1), 5]
    result.index = result.index.droplevel(1)
    expected = getattr(frame[1].rolling(window=10, min_periods=5), func)(frame[5])
    tm.assert_series_equal(result, expected, check_names=False)


@pytest.mark.parametrize("method", ["corr", "cov"])
def test_flex_binary_frame(method, frame):
    """Series/frame and frame/frame binary moments agree with column-wise results."""
    series = frame[1]

    res = getattr(series.rolling(window=10), method)(frame)
    res2 = getattr(frame.rolling(window=10), method)(series)
    exp = frame.apply(lambda x: getattr(series.rolling(window=10), method)(x))

    tm.assert_frame_equal(res, exp)
    tm.assert_frame_equal(res2, exp)

    # Build the random frame directly rather than writing through ``.values``,
    # which is not guaranteed to be a writable view of the frame's data.
    frame2 = DataFrame(
        np.random.randn(*frame.shape), index=frame.index, columns=frame.columns
    )

    res3 = getattr(frame.rolling(window=10), method)(frame2)
    exp = DataFrame(
        {k: getattr(frame[k].rolling(window=10), method)(frame2[k]) for k in frame}
    )
    tm.assert_frame_equal(res3, exp)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum])
def test_rolling_apply_consistency_sum_nans(
    consistency_data, window, min_periods, center, f
):
    """``rolling().sum()`` equals ``rolling().apply(f, raw=True)`` for sum-like ``f``."""
    x, is_constant, no_nans = consistency_data

    # The np.nansum / min_periods == 0 combination is intentionally not
    # compared (the original test skipped it without stating why).
    if f is np.nansum and min_periods == 0:
        return

    rolling_f_result = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).sum()
    rolling_apply_f_result = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).apply(func=f, raw=True)
    tm.assert_equal(rolling_f_result, rolling_apply_f_result)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("f", [lambda v: Series(v).sum(), np.nansum, np.sum])
def test_rolling_apply_consistency_sum_no_nans(
    consistency_data, window, min_periods, center, f
):
    """Same as the NaN variant, additionally covering ``np.sum`` on NaN-free data."""
    x, is_constant, no_nans = consistency_data

    # Only data flagged as NaN-free by the fixture is checked here.
    if not no_nans:
        return
    # See test_rolling_apply_consistency_sum_nans: combination not compared.
    if f is np.nansum and min_periods == 0:
        return

    rolling_f_result = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).sum()
    rolling_apply_f_result = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).apply(func=f, raw=True)
    tm.assert_equal(rolling_f_result, rolling_apply_f_result)


@pytest.mark.parametrize("window", range(7))
def test_rolling_corr_with_zero_variance(window):
    """Correlation against an all-zero series is NaN everywhere."""
    # GH 18430
    s = Series(np.zeros(20))
    other = Series(np.arange(20))

    assert s.rolling(window=window).corr(other=other).isna().all()


def test_flex_binary_moment():
    """Non array-like arguments raise ``TypeError`` with the expected message."""
    # GH3155
    # don't blow the stack
    msg = "arguments to moment function must be of type np.ndarray/Series/DataFrame"
    with pytest.raises(TypeError, match=msg):
        flex_binary_moment(5, 6, None)


def test_corr_sanity():
    """Centered rolling correlations stay within [-1, 1] (NaN treated as 0)."""
    # GH 3155
    df = DataFrame(
        np.array(
            [
                [0.87024726, 0.18505595],
                [0.64355431, 0.3091617],
                [0.92372966, 0.50552513],
                [0.00203756, 0.04520709],
                [0.84780328, 0.33394331],
                [0.78369152, 0.63919667],
            ]
        )
    )

    res = df[0].rolling(5, center=True).corr(df[1])
    assert all(np.abs(np.nan_to_num(x)) <= 1 for x in res)

    df = DataFrame(np.random.rand(30, 2))
    res = df[0].rolling(5, center=True).corr(df[1])
    assert all(np.abs(np.nan_to_num(x)) <= 1 for x in res)


def test_rolling_cov_diff_length():
    """Covariance with a shorter or NaN-holed series gives the same result."""
    # GH 7512
    s1 = Series([1, 2, 3], index=[0, 1, 2])
    s2 = Series([1, 3], index=[0, 2])
    result = s1.rolling(window=3, min_periods=2).cov(s2)
    expected = Series([None, None, 2.0])
    tm.assert_series_equal(result, expected)

    s2a = Series([1, None, 3], index=[0, 1, 2])
    result = s1.rolling(window=3, min_periods=2).cov(s2a)
    tm.assert_series_equal(result, expected)


def test_rolling_corr_diff_length():
    """Correlation with a shorter or NaN-holed series gives the same result."""
    # GH 7512
    s1 = Series([1, 2, 3], index=[0, 1, 2])
    s2 = Series([1, 3], index=[0, 2])
    result = s1.rolling(window=3, min_periods=2).corr(s2)
    expected = Series([None, None, 1.0])
    tm.assert_series_equal(result, expected)

    s2a = Series([1, None, 3], index=[0, 1, 2])
    result = s1.rolling(window=3, min_periods=2).corr(s2a)
    tm.assert_series_equal(result, expected)


@pytest.mark.parametrize(
    "f",
    [
        lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=False),
        lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=False),
        lambda x: x.rolling(window=10, min_periods=5).max(),
        lambda x: x.rolling(window=10, min_periods=5).min(),
        lambda x: x.rolling(window=10, min_periods=5).sum(),
        lambda x: x.rolling(window=10, min_periods=5).mean(),
        lambda x: x.rolling(window=10, min_periods=5).std(),
        lambda x: x.rolling(window=10, min_periods=5).var(),
        lambda x: x.rolling(window=10, min_periods=5).skew(),
        lambda x: x.rolling(window=10, min_periods=5).kurt(),
        lambda x: x.rolling(window=10, min_periods=5).quantile(quantile=0.5),
        lambda x: x.rolling(window=10, min_periods=5).median(),
        lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=False),
        lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=True),
        pytest.param(
            lambda x: x.rolling(win_type="boxcar", window=10, min_periods=5).mean(),
            marks=td.skip_if_no_scipy,
        ),
    ],
)
def test_rolling_functions_window_non_shrinkage(f):
    """With 4 rows and ``min_periods=5`` every rolling result is all-NaN."""
    # GH 7764
    s = Series(range(4))
    s_expected = Series(np.nan, index=s.index)
    df = DataFrame([[1, 5], [3, 2], [3, 9], [-1, 0]], columns=["A", "B"])
    df_expected = DataFrame(np.nan, index=df.index, columns=df.columns)

    s_result = f(s)
    tm.assert_series_equal(s_result, s_expected)

    df_result = f(df)
    tm.assert_frame_equal(df_result, df_expected)


@pytest.mark.parametrize(
    "f",
    [
        lambda x: (x.rolling(window=10, min_periods=5).cov(x, pairwise=True)),
        lambda x: (x.rolling(window=10, min_periods=5).corr(x, pairwise=True)),
    ],
)
def test_rolling_functions_window_non_shrinkage_binary(f):
    """Pairwise cov/corr on 4 rows with ``min_periods=5`` is an all-NaN MI frame."""
    # corr/cov return a MI DataFrame
    df = DataFrame(
        [[1, 5], [3, 2], [3, 9], [-1, 0]],
        columns=Index(["A", "B"], name="foo"),
        index=Index(range(4), name="bar"),
    )
    df_expected = DataFrame(
        columns=Index(["A", "B"], name="foo"),
        index=MultiIndex.from_product([df.index, df.columns], names=["bar", "foo"]),
        dtype="float64",
    )
    df_result = f(df)
    tm.assert_frame_equal(df_result, df_expected)


def test_rolling_skew_edge_cases():
    """Rolling skew: zero variance, too-small window, and a known-value case."""
    # np.nan (not the np.NaN alias, which NumPy 2.0 removed)
    all_nan = Series([np.nan] * 5)

    # yields all NaN (0 variance)
    d = Series([1] * 5)
    x = d.rolling(window=5).skew()
    tm.assert_series_equal(all_nan, x)

    # yields all NaN (window too small)
    d = Series(np.random.randn(5))
    x = d.rolling(window=2).skew()
    tm.assert_series_equal(all_nan, x)

    # yields [NaN, NaN, NaN, 0.177994, 1.548824]
    d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401])
    expected = Series([np.nan, np.nan, np.nan, 0.177994, 1.548824])
    x = d.rolling(window=4).skew()
    tm.assert_series_equal(expected, x)


def test_rolling_kurt_edge_cases():
    """Rolling kurt: zero variance, too-small window, and a known-value case."""
    # np.nan (not the np.NaN alias, which NumPy 2.0 removed)
    all_nan = Series([np.nan] * 5)

    # yields all NaN (0 variance)
    d = Series([1] * 5)
    x = d.rolling(window=5).kurt()
    tm.assert_series_equal(all_nan, x)

    # yields all NaN (window too small)
    d = Series(np.random.randn(5))
    x = d.rolling(window=3).kurt()
    tm.assert_series_equal(all_nan, x)

    # yields [NaN, NaN, NaN, 1.224307, 2.671499]
    d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401])
    expected = Series([np.nan, np.nan, np.nan, 1.224307, 2.671499])
    x = d.rolling(window=4).kurt()
    tm.assert_series_equal(expected, x)


def test_rolling_skew_eq_value_fperr():
    """Rolling skew over all-equal float values is NaN everywhere."""
    # #18804 all rolling skew for all equal values should return Nan
    a = Series([1.1] * 15).rolling(window=10).skew()
    assert np.isnan(a).all()


def test_rolling_kurt_eq_value_fperr():
    """Rolling kurt over all-equal float values is NaN everywhere."""
    # #18804 all rolling kurt for all equal values should return Nan
    a = Series([1.1] * 15).rolling(window=10).kurt()
    assert np.isnan(a).all()


def test_rolling_max_gh6297():
    """Replicate result expected in GH #6297"""
    indices = [datetime(1975, 1, i) for i in range(1, 6)]
    # So that we can have 2 datapoints on one of the days
    indices.append(datetime(1975, 1, 3, 6, 0))
    series = Series(range(1, 7), index=indices)
    # Use floats instead of ints as values
    series = series.map(float)
    # Sort chronologically
    series = series.sort_index()

    expected = Series(
        [1.0, 2.0, 6.0, 4.0, 5.0],
        index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"),
    )
    x = series.resample("D").max().rolling(window=1).max()
    tm.assert_series_equal(expected, x)


def test_rolling_max_resample():
    """Rolling max (window=1) after daily max/median/mean resampling."""
    series = _series_with_crowded_last_day()

    # Default how should be max
    expected = Series(
        [0.0, 1.0, 2.0, 3.0, 20.0],
        index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"),
    )
    x = series.resample("D").max().rolling(window=1).max()
    tm.assert_series_equal(expected, x)

    # Now specify median (10.0)
    expected = Series(
        [0.0, 1.0, 2.0, 3.0, 10.0],
        index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"),
    )
    x = series.resample("D").median().rolling(window=1).max()
    tm.assert_series_equal(expected, x)

    # Now specify mean (4+10+20)/3
    v = (4.0 + 10.0 + 20.0) / 3.0
    expected = Series(
        [0.0, 1.0, 2.0, 3.0, v],
        index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"),
    )
    x = series.resample("D").mean().rolling(window=1).max()
    tm.assert_series_equal(expected, x)


def test_rolling_min_resample():
    """Rolling min (window=1) after daily min resampling."""
    series = _series_with_crowded_last_day()

    # Default how should be min
    expected = Series(
        [0.0, 1.0, 2.0, 3.0, 4.0],
        index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"),
    )
    r = series.resample("D").min().rolling(window=1)
    tm.assert_series_equal(expected, r.min())


def test_rolling_median_resample():
    """Rolling median (window=1) after daily median resampling."""
    series = _series_with_crowded_last_day()

    # Default how should be median
    expected = Series(
        [0.0, 1.0, 2.0, 3.0, 10],
        index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"),
    )
    x = series.resample("D").median().rolling(window=1).median()
    tm.assert_series_equal(expected, x)


def test_rolling_median_memory_error():
    """Two consecutive rolling medians over 20000 points complete without error."""
    # GH11722
    n = 20000
    # The call is deliberately issued twice, as in the original test.
    Series(np.random.randn(n)).rolling(window=2, center=False).median()
    Series(np.random.randn(n)).rolling(window=2, center=False).median()


@pytest.mark.parametrize(
    "data_type",
    [np.dtype(f"f{width}") for width in [4, 8]]
    + [np.dtype(f"{sign}{width}") for width in [1, 2, 4, 8] for sign in "ui"],
)
def test_rolling_min_max_numeric_types(data_type):
    """Rolling min/max return float64 for every float/int/uint input dtype."""
    # GH12373

    # Just testing that these don't throw exceptions and that
    # the return type is float64. Other tests will cover quantitative
    # correctness
    result = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5).max()
    assert result.dtypes[0] == np.dtype("f8")
    result = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5).min()
    assert result.dtypes[0] == np.dtype("f8")


@pytest.mark.parametrize(
    "f",
    [
        lambda x: x.rolling(window=10, min_periods=0).count(),
        lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=False),
        lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=False),
        lambda x: x.rolling(window=10, min_periods=5).max(),
        lambda x: x.rolling(window=10, min_periods=5).min(),
        lambda x: x.rolling(window=10, min_periods=5).sum(),
        lambda x: x.rolling(window=10, min_periods=5).mean(),
        lambda x: x.rolling(window=10, min_periods=5).std(),
        lambda x: x.rolling(window=10, min_periods=5).var(),
        lambda x: x.rolling(window=10, min_periods=5).skew(),
        lambda x: x.rolling(window=10, min_periods=5).kurt(),
        lambda x: x.rolling(window=10, min_periods=5).quantile(0.5),
        lambda x: x.rolling(window=10, min_periods=5).median(),
        lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=False),
        lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=True),
        pytest.param(
            lambda x: x.rolling(win_type="boxcar", window=10, min_periods=5).mean(),
            marks=td.skip_if_no_scipy,
        ),
    ],
)
def test_moment_functions_zero_length(f):
    """Rolling functions on empty Series/DataFrames return the empty input."""
    # GH 8056
    s = Series(dtype=np.float64)
    s_expected = s
    df1 = DataFrame()
    df1_expected = df1
    df2 = DataFrame(columns=["a"])
    df2["a"] = df2["a"].astype("float64")
    df2_expected = df2

    s_result = f(s)
    tm.assert_series_equal(s_result, s_expected)

    df1_result = f(df1)
    tm.assert_frame_equal(df1_result, df1_expected)

    df2_result = f(df2)
    tm.assert_frame_equal(df2_result, df2_expected)


@pytest.mark.parametrize(
    "f",
    [
        lambda x: (x.rolling(window=10, min_periods=5).cov(x, pairwise=True)),
        lambda x: (x.rolling(window=10, min_periods=5).corr(x, pairwise=True)),
    ],
)
def test_moment_functions_zero_length_pairwise(f):
    """Pairwise cov/corr on empty frames return empty MultiIndex-ed frames."""
    df1 = DataFrame()
    df2 = DataFrame(columns=Index(["a"], name="foo"), index=Index([], name="bar"))
    df2["a"] = df2["a"].astype("float64")

    df1_expected = DataFrame(
        index=MultiIndex.from_product([df1.index, df1.columns]), columns=Index([])
    )
    df2_expected = DataFrame(
        index=MultiIndex.from_product([df2.index, df2.columns], names=["bar", "foo"]),
        columns=Index(["a"], name="foo"),
        dtype="float64",
    )

    df1_result = f(df1)
    tm.assert_frame_equal(df1_result, df1_expected)

    df2_result = f(df2)
    tm.assert_frame_equal(df2_result, df2_expected)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var(consistency_data, window, min_periods, center, ddof):
    """Rolling variance is non-negative; biased var equals mean(x^2) - mean(x)^2."""
    x, is_constant, no_nans = consistency_data

    mean_x = x.rolling(window=window, min_periods=min_periods, center=center).mean()
    var_x = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=ddof
    )
    assert not (var_x < 0).any().any()

    if ddof == 0:
        # check that biased var(x) == mean(x^2) - mean(x)^2
        mean_x2 = (
            (x * x)
            .rolling(window=window, min_periods=min_periods, center=center)
            .mean()
        )
        tm.assert_equal(var_x, mean_x2 - (mean_x * mean_x))


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_moments_consistency_var_constant(
    consistency_data, window, min_periods, center, ddof
):
    """Rolling variance of constant data is 0 wherever enough observations exist."""
    x, is_constant, no_nans = consistency_data

    # Only data flagged as constant by the fixture is checked here.
    if not is_constant:
        return

    count_x = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).count()
    var_x = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=ddof
    )

    # check that variance of constant series is identically 0
    assert not (var_x > 0).any().any()
    expected = x * np.nan
    expected[count_x >= max(min_periods, 1)] = 0.0
    if ddof == 1:
        # unbiased variance needs at least two observations
        expected[count_x < 2] = np.nan
    tm.assert_equal(var_x, expected)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_std(consistency_data, window, min_periods, center, ddof):
    """Rolling var and std are non-negative and satisfy var == std ** 2."""
    x, is_constant, no_nans = consistency_data

    var_x = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=ddof
    )
    std_x = x.rolling(window=window, min_periods=min_periods, center=center).std(
        ddof=ddof
    )
    assert not (var_x < 0).any().any()
    assert not (std_x < 0).any().any()

    # check that var(x) == std(x)^2
    tm.assert_equal(var_x, std_x * std_x)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_cov(consistency_data, window, min_periods, center, ddof):
    """Rolling cov(x, x) is non-negative and equals rolling var(x)."""
    x, is_constant, no_nans = consistency_data
    var_x = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=ddof
    )
    assert not (var_x < 0).any().any()

    cov_x_x = x.rolling(window=window, min_periods=min_periods, center=center).cov(
        x, ddof=ddof
    )
    assert not (cov_x_x < 0).any().any()

    # check that var(x) == cov(x, x)
    tm.assert_equal(var_x, cov_x_x)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
@pytest.mark.parametrize("ddof", [0, 1])
def test_rolling_consistency_series_cov_corr(
    consistency_data, window, min_periods, center, ddof
):
    """Check cov/corr identities for Series inputs, using ``x`` as both operands."""
    x, is_constant, no_nans = consistency_data

    # The identities below are only checked for Series inputs.
    if not isinstance(x, Series):
        return

    var_x_plus_y = (
        (x + x)
        .rolling(window=window, min_periods=min_periods, center=center)
        .var(ddof=ddof)
    )
    var_x = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=ddof
    )
    var_y = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=ddof
    )
    cov_x_y = x.rolling(window=window, min_periods=min_periods, center=center).cov(
        x, ddof=ddof
    )
    # check that cov(x, y) == (var(x+y) - var(x) -
    # var(y)) / 2
    tm.assert_equal(cov_x_y, 0.5 * (var_x_plus_y - var_x - var_y))

    # check that corr(x, y) == cov(x, y) / (std(x) *
    # std(y))
    corr_x_y = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).corr(x)
    std_x = x.rolling(window=window, min_periods=min_periods, center=center).std(
        ddof=ddof
    )
    std_y = x.rolling(window=window, min_periods=min_periods, center=center).std(
        ddof=ddof
    )
    tm.assert_equal(corr_x_y, cov_x_y / (std_x * std_y))

    if ddof == 0:
        # check that biased cov(x, y) == mean(x*y) -
        # mean(x)*mean(y)
        mean_x = x.rolling(
            window=window, min_periods=min_periods, center=center
        ).mean()
        mean_y = x.rolling(
            window=window, min_periods=min_periods, center=center
        ).mean()
        mean_x_times_y = (
            (x * x)
            .rolling(window=window, min_periods=min_periods, center=center)
            .mean()
        )
        tm.assert_equal(cov_x_y, mean_x_times_y - (mean_x * mean_y))


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
def test_rolling_consistency_mean(consistency_data, window, min_periods, center):
    """Rolling mean equals rolling sum divided by rolling count."""
    x, is_constant, no_nans = consistency_data

    result = x.rolling(window=window, min_periods=min_periods, center=center).mean()
    expected = (
        x.rolling(window=window, min_periods=min_periods, center=center)
        .sum()
        .divide(
            x.rolling(window=window, min_periods=min_periods, center=center).count()
        )
    )
    tm.assert_equal(result, expected.astype("float64"))


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
def test_rolling_consistency_constant(consistency_data, window, min_periods, center):
    """For constant data: rolling mean is the constant, self-correlation is NaN."""
    x, is_constant, no_nans = consistency_data

    # Only data flagged as constant by the fixture is checked here.
    if not is_constant:
        return

    count_x = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).count()
    mean_x = x.rolling(window=window, min_periods=min_periods, center=center).mean()
    # check that correlation of a series with itself is either 1 or NaN
    corr_x_x = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).corr(x)

    # x.max() is a scalar for a Series; a second .max() reduces the
    # per-column result to a scalar otherwise.
    exp = x.max() if isinstance(x, Series) else x.max().max()

    # check mean of constant series
    expected = x * np.nan
    expected[count_x >= max(min_periods, 1)] = exp
    tm.assert_equal(mean_x, expected)

    # check correlation of constant series with itself is NaN
    expected[:] = np.nan
    tm.assert_equal(corr_x_x, expected)


@pytest.mark.parametrize(
    "window,min_periods,center", list(_rolling_consistency_cases())
)
def test_rolling_consistency_var_debiasing_factors(
    consistency_data, window, min_periods, center
):
    """Unbiased var equals biased var times count / (count - 1)."""
    x, is_constant, no_nans = consistency_data

    # check variance debiasing factors
    var_unbiased_x = x.rolling(
        window=window, min_periods=min_periods, center=center
    ).var()
    var_biased_x = x.rolling(window=window, min_periods=min_periods, center=center).var(
        ddof=0
    )
    var_debiasing_factors_x = (
        x.rolling(window=window, min_periods=min_periods, center=center)
        .count()
        .divide(
            (
                x.rolling(window=window, min_periods=min_periods, center=center).count()
                - 1.0
            # a zero denominator (count == 1) becomes NaN instead of dividing by 0
            ).replace(0.0, np.nan)
        )
    )
    tm.assert_equal(var_unbiased_x, var_biased_x * var_debiasing_factors_x)