"""Tests for dask.dataframe groupby: internal representation, errors, and apply."""
import collections
import operator
import warnings

import numpy as np
import pandas as pd
import pytest

import dask
import dask.dataframe as dd
from dask.dataframe import _compat
from dask.dataframe._compat import PANDAS_GT_110, tm
from dask.dataframe.utils import assert_dask_graph, assert_eq, assert_max_deps
from dask.utils import M

# Aggregation method names exercised by the parametrized fixtures/tests below.
AGG_FUNCS = [
    "sum",
    "mean",
    "min",
    "max",
    "count",
    "size",
    "std",
    "var",
    "cov",
    "corr",
    "nunique",
    "first",
    "last",
    "prod",
]


@pytest.fixture(params=AGG_FUNCS)
def agg_func(request):
    """
    Aggregations supported for groups
    """
    return request.param


# Wrapper fixture for shuffle_method to auto-apply it to all the tests in this module,
# as we don't want to auto-apply the fixture repo-wide.
@pytest.fixture(autouse=True)
def auto_shuffle_method(shuffle_method):
    yield


@pytest.mark.xfail(reason="uncertain how to handle. See issue #3481.")
def test_groupby_internal_repr_xfail():
    """Known-failing checks on the ``.obj`` attribute of a sliced SeriesGroupBy."""
    pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
    ddf = dd.from_pandas(pdf, 3)

    gp = pdf.groupby("y")["x"]
    dp = ddf.groupby("y")["x"]
    assert isinstance(dp.obj, dd.Series)
    assert_eq(dp.obj, gp.obj)

    gp = pdf.groupby(pdf.y)["x"]
    dp = ddf.groupby(ddf.y)["x"]
    assert isinstance(dp.obj, dd.Series)


def test_groupby_internal_repr():
    """Dask groupby objects mirror the pandas groupby class hierarchy.

    Grouping (by label or by series) and column slicing must yield the
    matching dask wrapper type, a pandas groupby ``_meta``, and an ``.obj``
    equal to the pandas equivalent.
    """
    pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
    ddf = dd.from_pandas(pdf, 3)

    gp = pdf.groupby("y")
    dp = ddf.groupby("y")
    assert isinstance(dp, dd.groupby.DataFrameGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    assert isinstance(dp.obj, dd.DataFrame)
    assert_eq(dp.obj, gp.obj)

    gp = pdf.groupby("y")["x"]
    dp = ddf.groupby("y")["x"]
    assert isinstance(dp, dd.groupby.SeriesGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.SeriesGroupBy)

    gp = pdf.groupby("y")[["x"]]
    dp = ddf.groupby("y")[["x"]]
    assert isinstance(dp, dd.groupby.DataFrameGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    # slicing should not affect to internal
    assert isinstance(dp.obj, dd.DataFrame)
    assert_eq(dp.obj, gp.obj)

    gp = pdf.groupby(pdf.y)["x"]
    dp = ddf.groupby(ddf.y)["x"]
    assert isinstance(dp, dd.groupby.SeriesGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.SeriesGroupBy)

    gp = pdf.groupby(pdf.y)[["x"]]
    dp = ddf.groupby(ddf.y)[["x"]]
    assert isinstance(dp, dd.groupby.DataFrameGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    # slicing should not affect to internal
    assert isinstance(dp.obj, dd.DataFrame)
    assert_eq(dp.obj, gp.obj)


def test_groupby_error():
    """Grouping on or slicing to missing columns raises KeyError with a clear message."""
    pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
    ddf = dd.from_pandas(pdf, 3)

    with pytest.raises(KeyError):
        ddf.groupby("A")

    with pytest.raises(KeyError):
        ddf.groupby(["x", "A"])

    dp = ddf.groupby("y")

    msg = "Column not found: "
    with pytest.raises(KeyError) as err:
        dp["A"]
    assert msg in str(err.value)

    msg = "Columns not found: "
    with pytest.raises(KeyError) as err:
        dp[["x", "A"]]
    assert msg in str(err.value)


def test_full_groupby():
    """groupby().apply of a user function matches pandas and names the result task."""
    df = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf = dd.from_pandas(df, npartitions=3)

    pytest.raises(KeyError, lambda: ddf.groupby("does_not_exist"))
    pytest.raises(AttributeError, lambda: ddf.groupby("a").does_not_exist)
    assert "b" in dir(ddf.groupby("a"))

    def func(df):
        return df.assign(b=df.b - df.b.mean())

    # meta-inference warnings are expected here; silence them for the check
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # the graph key should be derived from the applied function's name
        assert ddf.groupby("a").apply(func)._name.startswith("func")

        assert_eq(df.groupby("a").apply(func), ddf.groupby("a").apply(func))


def test_full_groupby_apply_multiarg():
    """groupby().apply forwards extra positional/keyword args.

    Plain values, dask Scalars, and dask.delayed values are all accepted;
    delayed arguments additionally require an explicit ``meta``.
    """
    df = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf = dd.from_pandas(df, npartitions=3)

    def func(df, c, d=3):
        return df.assign(b=df.b - df.b.mean() + c * d)

    c = df.a.sum()
    d = df.b.mean()

    c_scalar = ddf.a.sum()
    d_scalar = ddf.b.mean()
    c_delayed = dask.delayed(lambda: c)()
    d_delayed = dask.delayed(lambda: d)()

    # computed eagerly from pandas so it can be passed as explicit meta below
    meta = df.groupby("a").apply(func, c)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        assert_eq(
            df.groupby("a").apply(func, c, d=d),
            ddf.groupby("a").apply(func, c, d=d_scalar),
        )

        assert_eq(df.groupby("a").apply(func, c), ddf.groupby("a").apply(func, c))

        assert_eq(
            df.groupby("a").apply(func, c, d=d), ddf.groupby("a").apply(func, c, d=d)
        )

        assert_eq(
            df.groupby("a").apply(func, c),
            ddf.groupby("a").apply(func, c_scalar),
            check_dtype=False,
        )

        assert_eq(
            df.groupby("a").apply(func, c),
            ddf.groupby("a").apply(func, c_scalar, meta=meta),
        )

        assert_eq(
            df.groupby("a").apply(func, c, d=d),
            ddf.groupby("a").apply(func, c, d=d_scalar, meta=meta),
        )

    # Delayed arguments work, but only if metadata is provided
    with pytest.raises(ValueError) as exc:
        ddf.groupby("a").apply(func, c, d=d_delayed)
    assert "dask.delayed" in str(exc.value) and "meta" in str(exc.value)

    with pytest.raises(ValueError) as exc:
        ddf.groupby("a").apply(func, c_delayed, d=d)
    assert "dask.delayed" in str(exc.value) and "meta" in str(exc.value)

    assert_eq(
        df.groupby("a").apply(func, c),
        ddf.groupby("a").apply(func, c_delayed, meta=meta),
    )

    assert_eq(
        df.groupby("a").apply(func, c, d=d),
        ddf.groupby("a").apply(func, c, d=d_delayed, meta=meta),
    )

215@pytest.mark.parametrize( 216 "grouper", 217 [ 218 lambda df: ["a"], 219 lambda df: ["a", "b"], 220 lambda df: df["a"], 221 lambda df: [df["a"], df["b"]], 222 pytest.param( 223 lambda df: [df["a"] > 2, df["b"] > 1], 224 marks=pytest.mark.xfail(reason="not yet supported"), 225 ), 226 ], 227) 228@pytest.mark.parametrize("reverse", [True, False]) 229def test_full_groupby_multilevel(grouper, reverse): 230 index = [0, 1, 3, 5, 6, 8, 9, 9, 9] 231 if reverse: 232 index = index[::-1] 233 df = pd.DataFrame( 234 { 235 "a": [1, 2, 3, 4, 5, 6, 7, 8, 9], 236 "d": [1, 2, 3, 4, 5, 6, 7, 8, 9], 237 "b": [4, 5, 6, 3, 2, 1, 0, 0, 0], 238 }, 239 index=index, 240 ) 241 ddf = dd.from_pandas(df, npartitions=3) 242 243 def func(df): 244 return df.assign(b=df.b - df.b.mean()) 245 246 # last one causes a DeprecationWarning from pandas. 247 # See https://github.com/pandas-dev/pandas/issues/16481 248 with warnings.catch_warnings(): 249 warnings.simplefilter("ignore") 250 assert_eq( 251 df.groupby(grouper(df)).apply(func), ddf.groupby(grouper(ddf)).apply(func) 252 ) 253 254 255def test_groupby_dir(): 256 df = pd.DataFrame({"a": range(10), "b c d e": range(10)}) 257 ddf = dd.from_pandas(df, npartitions=2) 258 g = ddf.groupby("a") 259 assert "a" in dir(g) 260 assert "b c d e" not in dir(g) 261 262 263@pytest.mark.parametrize("scheduler", ["sync", "threads"]) 264def test_groupby_on_index(scheduler): 265 pdf = pd.DataFrame( 266 {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]}, 267 index=[0, 1, 3, 5, 6, 8, 9, 9, 9], 268 ) 269 ddf = dd.from_pandas(pdf, npartitions=3) 270 271 ddf2 = ddf.set_index("a") 272 pdf2 = pdf.set_index("a") 273 assert_eq(ddf.groupby("a").b.mean(), ddf2.groupby(ddf2.index).b.mean()) 274 275 def func(df): 276 return df.assign(b=df.b - df.b.mean()) 277 278 def func2(df): 279 return df[["b"]] - df[["b"]].mean() 280 281 def func3(df): 282 return df.mean() 283 284 with dask.config.set(scheduler=scheduler): 285 with pytest.warns(None): 286 
assert_eq(ddf.groupby("a").apply(func), pdf.groupby("a").apply(func)) 287 288 assert_eq( 289 ddf.groupby("a").apply(func).set_index("a"), 290 pdf.groupby("a").apply(func).set_index("a"), 291 ) 292 293 assert_eq( 294 pdf2.groupby(pdf2.index).apply(func2), 295 ddf2.groupby(ddf2.index).apply(func2), 296 ) 297 298 assert_eq( 299 ddf2.b.groupby("a").apply(func3), pdf2.b.groupby("a").apply(func3) 300 ) 301 302 assert_eq( 303 ddf2.b.groupby(ddf2.index).apply(func3), 304 pdf2.b.groupby(pdf2.index).apply(func3), 305 ) 306 307 308@pytest.mark.parametrize( 309 "grouper", 310 [ 311 lambda df: df.groupby("a")["b"], 312 lambda df: df.groupby(["a", "b"]), 313 lambda df: df.groupby(["a", "b"])["c"], 314 lambda df: df.groupby(df["a"])[["b", "c"]], 315 lambda df: df.groupby("a")[["b", "c"]], 316 lambda df: df.groupby("a")[["b"]], 317 lambda df: df.groupby(["a", "b", "c"]), 318 ], 319) 320def test_groupby_multilevel_getitem(grouper, agg_func): 321 # nunique is not implemented for DataFrameGroupBy 322 if agg_func == "nunique": 323 return 324 325 df = pd.DataFrame( 326 { 327 "a": [1, 2, 3, 1, 2, 3], 328 "b": [1, 2, 1, 4, 2, 1], 329 "c": [1, 3, 2, 1, 1, 2], 330 "d": [1, 2, 1, 1, 2, 2], 331 } 332 ) 333 ddf = dd.from_pandas(df, 2) 334 335 dask_group = grouper(ddf) 336 pandas_group = grouper(df) 337 338 # covariance/correlation only works with N+1 columns 339 if isinstance(pandas_group, pd.core.groupby.SeriesGroupBy) and agg_func in ( 340 "cov", 341 "corr", 342 ): 343 return 344 345 dask_agg = getattr(dask_group, agg_func) 346 pandas_agg = getattr(pandas_group, agg_func) 347 348 assert isinstance(dask_group, dd.groupby._GroupBy) 349 assert isinstance(pandas_group, pd.core.groupby.GroupBy) 350 351 if agg_func == "mean": 352 assert_eq(dask_agg(), pandas_agg().astype(float)) 353 else: 354 a = dask_agg() 355 with warnings.catch_warnings(): 356 # pandas does `.cov([[1], [1]])` which numpy warns on (all NaN). 357 # Pandas does strange things with exceptions in groupby. 
358 warnings.simplefilter("ignore", RuntimeWarning) 359 b = pandas_agg() 360 assert_eq(a, b) 361 362 363def test_groupby_multilevel_agg(): 364 df = pd.DataFrame( 365 { 366 "a": [1, 2, 3, 1, 2, 3], 367 "b": [1, 2, 1, 4, 2, 1], 368 "c": [1, 3, 2, 1, 1, 2], 369 "d": [1, 2, 1, 1, 2, 2], 370 } 371 ) 372 ddf = dd.from_pandas(df, 2) 373 374 sol = df.groupby(["a"]).mean() 375 res = ddf.groupby(["a"]).mean() 376 assert_eq(res, sol) 377 378 sol = df.groupby(["a", "c"]).mean() 379 res = ddf.groupby(["a", "c"]).mean() 380 assert_eq(res, sol) 381 382 sol = df.groupby([df["a"], df["c"]]).mean() 383 res = ddf.groupby([ddf["a"], ddf["c"]]).mean() 384 assert_eq(res, sol) 385 386 387def test_groupby_get_group(): 388 dsk = { 389 ("x", 0): pd.DataFrame({"a": [1, 2, 6], "b": [4, 2, 7]}, index=[0, 1, 3]), 390 ("x", 1): pd.DataFrame({"a": [4, 2, 6], "b": [3, 3, 1]}, index=[5, 6, 8]), 391 ("x", 2): pd.DataFrame({"a": [4, 3, 7], "b": [1, 1, 3]}, index=[9, 9, 9]), 392 } 393 meta = dsk[("x", 0)] 394 d = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9]) 395 full = d.compute() 396 397 for ddkey, pdkey in [("b", "b"), (d.b, full.b), (d.b + 1, full.b + 1)]: 398 ddgrouped = d.groupby(ddkey) 399 pdgrouped = full.groupby(pdkey) 400 # DataFrame 401 assert_eq(ddgrouped.get_group(2), pdgrouped.get_group(2)) 402 assert_eq(ddgrouped.get_group(3), pdgrouped.get_group(3)) 403 # Series 404 assert_eq(ddgrouped.a.get_group(3), pdgrouped.a.get_group(3)) 405 assert_eq(ddgrouped.a.get_group(2), pdgrouped.a.get_group(2)) 406 407 408def test_dataframe_groupby_nunique(): 409 strings = list("aaabbccccdddeee") 410 data = np.random.randn(len(strings)) 411 ps = pd.DataFrame(dict(strings=strings, data=data)) 412 s = dd.from_pandas(ps, npartitions=3) 413 expected = ps.groupby("strings")["data"].nunique() 414 assert_eq(s.groupby("strings")["data"].nunique(), expected) 415 416 417def test_dataframe_groupby_nunique_across_group_same_value(): 418 strings = list("aaabbccccdddeee") 419 data = list(map(int, "123111223323412")) 420 ps 
= pd.DataFrame(dict(strings=strings, data=data)) 421 s = dd.from_pandas(ps, npartitions=3) 422 expected = ps.groupby("strings")["data"].nunique() 423 assert_eq(s.groupby("strings")["data"].nunique(), expected) 424 425 426def test_series_groupby_propagates_names(): 427 df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]}) 428 ddf = dd.from_pandas(df, 2) 429 func = lambda df: df["y"].sum() 430 with pytest.warns(UserWarning): # meta inference 431 result = ddf.groupby("x").apply(func) 432 expected = df.groupby("x").apply(func) 433 assert_eq(result, expected) 434 435 436def test_series_groupby(): 437 s = pd.Series([1, 2, 2, 1, 1]) 438 pd_group = s.groupby(s) 439 440 ss = dd.from_pandas(s, npartitions=2) 441 dask_group = ss.groupby(ss) 442 443 pd_group2 = s.groupby(s + 1) 444 dask_group2 = ss.groupby(ss + 1) 445 446 for dg, pdg in [(dask_group, pd_group), (pd_group2, dask_group2)]: 447 assert_eq(dg.count(), pdg.count()) 448 assert_eq(dg.sum(), pdg.sum()) 449 assert_eq(dg.min(), pdg.min()) 450 assert_eq(dg.max(), pdg.max()) 451 assert_eq(dg.size(), pdg.size()) 452 assert_eq(dg.first(), pdg.first()) 453 assert_eq(dg.last(), pdg.last()) 454 assert_eq(dg.prod(), pdg.prod()) 455 456 457def test_series_groupby_errors(): 458 s = pd.Series([1, 2, 2, 1, 1]) 459 460 ss = dd.from_pandas(s, npartitions=2) 461 462 msg = "No group keys passed!" 
463 with pytest.raises(ValueError) as err: 464 s.groupby([]) # pandas 465 assert msg in str(err.value) 466 with pytest.raises(ValueError) as err: 467 ss.groupby([]) # dask should raise the same error 468 assert msg in str(err.value) 469 470 sss = dd.from_pandas(s, npartitions=5) 471 with pytest.raises(NotImplementedError): 472 ss.groupby(sss) 473 474 with pytest.raises(KeyError): 475 s.groupby("x") # pandas 476 with pytest.raises(KeyError): 477 ss.groupby("x") # dask should raise the same error 478 479 480def test_groupby_index_array(): 481 df = _compat.makeTimeDataFrame() 482 ddf = dd.from_pandas(df, npartitions=2) 483 484 # first select column, then group 485 assert_eq( 486 df.A.groupby(df.index.month).nunique(), 487 ddf.A.groupby(ddf.index.month).nunique(), 488 check_names=False, 489 ) 490 491 # first group, then select column 492 assert_eq( 493 df.groupby(df.index.month).A.nunique(), 494 ddf.groupby(ddf.index.month).A.nunique(), 495 check_names=False, 496 ) 497 498 499def test_groupby_set_index(): 500 df = _compat.makeTimeDataFrame() 501 ddf = dd.from_pandas(df, npartitions=2) 502 pytest.raises(TypeError, lambda: ddf.groupby(df.index.month, as_index=False)) 503 504 505@pytest.mark.parametrize("empty", [True, False]) 506def test_split_apply_combine_on_series(empty): 507 if empty: 508 pdf = pd.DataFrame({"a": [1.0], "b": [1.0]}, index=[0]).iloc[:0] 509 # There's a bug in pandas where df.groupby(...).var(ddof=0) results in 510 # no columns. Just skip these checks for now. 
511 ddofs = [] 512 else: 513 ddofs = [0, 1, 2] 514 pdf = pd.DataFrame( 515 {"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]}, 516 index=[0, 1, 3, 5, 6, 8, 9, 9, 9], 517 ) 518 ddf = dd.from_pandas(pdf, npartitions=3) 519 520 for ddkey, pdkey in [("b", "b"), (ddf.b, pdf.b), (ddf.b + 1, pdf.b + 1)]: 521 assert_eq(ddf.groupby(ddkey).a.min(), pdf.groupby(pdkey).a.min()) 522 assert_eq(ddf.groupby(ddkey).a.max(), pdf.groupby(pdkey).a.max()) 523 assert_eq(ddf.groupby(ddkey).a.count(), pdf.groupby(pdkey).a.count()) 524 assert_eq(ddf.groupby(ddkey).a.mean(), pdf.groupby(pdkey).a.mean()) 525 assert_eq(ddf.groupby(ddkey).a.nunique(), pdf.groupby(pdkey).a.nunique()) 526 assert_eq(ddf.groupby(ddkey).a.size(), pdf.groupby(pdkey).a.size()) 527 assert_eq(ddf.groupby(ddkey).a.first(), pdf.groupby(pdkey).a.first()) 528 assert_eq(ddf.groupby(ddkey).a.last(), pdf.groupby(pdkey).a.last()) 529 assert_eq(ddf.groupby(ddkey).a.tail(), pdf.groupby(pdkey).a.tail()) 530 assert_eq(ddf.groupby(ddkey).a.head(), pdf.groupby(pdkey).a.head()) 531 for ddof in ddofs: 532 assert_eq(ddf.groupby(ddkey).a.var(ddof), pdf.groupby(pdkey).a.var(ddof)) 533 assert_eq(ddf.groupby(ddkey).a.std(ddof), pdf.groupby(pdkey).a.std(ddof)) 534 535 assert_eq(ddf.groupby(ddkey).sum(), pdf.groupby(pdkey).sum()) 536 assert_eq(ddf.groupby(ddkey).min(), pdf.groupby(pdkey).min()) 537 assert_eq(ddf.groupby(ddkey).max(), pdf.groupby(pdkey).max()) 538 assert_eq(ddf.groupby(ddkey).count(), pdf.groupby(pdkey).count()) 539 assert_eq(ddf.groupby(ddkey).mean(), pdf.groupby(pdkey).mean()) 540 assert_eq(ddf.groupby(ddkey).size(), pdf.groupby(pdkey).size()) 541 assert_eq(ddf.groupby(ddkey).first(), pdf.groupby(pdkey).first()) 542 assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last()) 543 assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod()) 544 545 for ddof in ddofs: 546 assert_eq( 547 ddf.groupby(ddkey).var(ddof), 548 pdf.groupby(pdkey).var(ddof), 549 check_dtype=False, 550 ) 551 assert_eq( 552 
ddf.groupby(ddkey).std(ddof), 553 pdf.groupby(pdkey).std(ddof), 554 check_dtype=False, 555 ) 556 557 for ddkey, pdkey in [(ddf.b, pdf.b), (ddf.b + 1, pdf.b + 1)]: 558 assert_eq( 559 ddf.a.groupby(ddkey).sum(), pdf.a.groupby(pdkey).sum(), check_names=False 560 ) 561 assert_eq( 562 ddf.a.groupby(ddkey).max(), pdf.a.groupby(pdkey).max(), check_names=False 563 ) 564 assert_eq( 565 ddf.a.groupby(ddkey).count(), 566 pdf.a.groupby(pdkey).count(), 567 check_names=False, 568 ) 569 assert_eq( 570 ddf.a.groupby(ddkey).mean(), pdf.a.groupby(pdkey).mean(), check_names=False 571 ) 572 assert_eq( 573 ddf.a.groupby(ddkey).nunique(), 574 pdf.a.groupby(pdkey).nunique(), 575 check_names=False, 576 ) 577 assert_eq( 578 ddf.a.groupby(ddkey).first(), 579 pdf.a.groupby(pdkey).first(), 580 check_names=False, 581 ) 582 assert_eq( 583 ddf.a.groupby(ddkey).last(), pdf.a.groupby(pdkey).last(), check_names=False 584 ) 585 assert_eq( 586 ddf.a.groupby(ddkey).prod(), pdf.a.groupby(pdkey).prod(), check_names=False 587 ) 588 589 for ddof in ddofs: 590 assert_eq(ddf.a.groupby(ddkey).var(ddof), pdf.a.groupby(pdkey).var(ddof)) 591 assert_eq(ddf.a.groupby(ddkey).std(ddof), pdf.a.groupby(pdkey).std(ddof)) 592 593 for i in [0, 4, 7]: 594 assert_eq(ddf.groupby(ddf.b > i).a.sum(), pdf.groupby(pdf.b > i).a.sum()) 595 assert_eq(ddf.groupby(ddf.b > i).a.min(), pdf.groupby(pdf.b > i).a.min()) 596 assert_eq(ddf.groupby(ddf.b > i).a.max(), pdf.groupby(pdf.b > i).a.max()) 597 assert_eq(ddf.groupby(ddf.b > i).a.count(), pdf.groupby(pdf.b > i).a.count()) 598 assert_eq(ddf.groupby(ddf.b > i).a.mean(), pdf.groupby(pdf.b > i).a.mean()) 599 assert_eq( 600 ddf.groupby(ddf.b > i).a.nunique(), pdf.groupby(pdf.b > i).a.nunique() 601 ) 602 assert_eq(ddf.groupby(ddf.b > i).a.size(), pdf.groupby(pdf.b > i).a.size()) 603 assert_eq(ddf.groupby(ddf.b > i).a.first(), pdf.groupby(pdf.b > i).a.first()) 604 assert_eq(ddf.groupby(ddf.b > i).a.last(), pdf.groupby(pdf.b > i).a.last()) 605 assert_eq(ddf.groupby(ddf.b > i).a.tail(), 
pdf.groupby(pdf.b > i).a.tail()) 606 assert_eq(ddf.groupby(ddf.b > i).a.head(), pdf.groupby(pdf.b > i).a.head()) 607 assert_eq(ddf.groupby(ddf.b > i).a.prod(), pdf.groupby(pdf.b > i).a.prod()) 608 609 assert_eq(ddf.groupby(ddf.a > i).b.sum(), pdf.groupby(pdf.a > i).b.sum()) 610 assert_eq(ddf.groupby(ddf.a > i).b.min(), pdf.groupby(pdf.a > i).b.min()) 611 assert_eq(ddf.groupby(ddf.a > i).b.max(), pdf.groupby(pdf.a > i).b.max()) 612 assert_eq(ddf.groupby(ddf.a > i).b.count(), pdf.groupby(pdf.a > i).b.count()) 613 assert_eq(ddf.groupby(ddf.a > i).b.mean(), pdf.groupby(pdf.a > i).b.mean()) 614 assert_eq( 615 ddf.groupby(ddf.a > i).b.nunique(), pdf.groupby(pdf.a > i).b.nunique() 616 ) 617 assert_eq(ddf.groupby(ddf.b > i).b.size(), pdf.groupby(pdf.b > i).b.size()) 618 assert_eq(ddf.groupby(ddf.b > i).b.first(), pdf.groupby(pdf.b > i).b.first()) 619 assert_eq(ddf.groupby(ddf.b > i).b.last(), pdf.groupby(pdf.b > i).b.last()) 620 assert_eq(ddf.groupby(ddf.b > i).b.tail(), pdf.groupby(pdf.b > i).b.tail()) 621 assert_eq(ddf.groupby(ddf.b > i).b.head(), pdf.groupby(pdf.b > i).b.head()) 622 assert_eq(ddf.groupby(ddf.b > i).b.prod(), pdf.groupby(pdf.b > i).b.prod()) 623 624 assert_eq(ddf.groupby(ddf.b > i).sum(), pdf.groupby(pdf.b > i).sum()) 625 assert_eq(ddf.groupby(ddf.b > i).min(), pdf.groupby(pdf.b > i).min()) 626 assert_eq(ddf.groupby(ddf.b > i).max(), pdf.groupby(pdf.b > i).max()) 627 assert_eq(ddf.groupby(ddf.b > i).count(), pdf.groupby(pdf.b > i).count()) 628 assert_eq(ddf.groupby(ddf.b > i).mean(), pdf.groupby(pdf.b > i).mean()) 629 assert_eq(ddf.groupby(ddf.b > i).size(), pdf.groupby(pdf.b > i).size()) 630 assert_eq(ddf.groupby(ddf.b > i).first(), pdf.groupby(pdf.b > i).first()) 631 assert_eq(ddf.groupby(ddf.b > i).last(), pdf.groupby(pdf.b > i).last()) 632 assert_eq(ddf.groupby(ddf.b > i).prod(), pdf.groupby(pdf.b > i).prod()) 633 634 assert_eq(ddf.groupby(ddf.a > i).sum(), pdf.groupby(pdf.a > i).sum()) 635 assert_eq(ddf.groupby(ddf.a > i).min(), pdf.groupby(pdf.a > 
i).min()) 636 assert_eq(ddf.groupby(ddf.a > i).max(), pdf.groupby(pdf.a > i).max()) 637 assert_eq(ddf.groupby(ddf.a > i).count(), pdf.groupby(pdf.a > i).count()) 638 assert_eq(ddf.groupby(ddf.a > i).mean(), pdf.groupby(pdf.a > i).mean()) 639 assert_eq(ddf.groupby(ddf.a > i).size(), pdf.groupby(pdf.a > i).size()) 640 assert_eq(ddf.groupby(ddf.a > i).first(), pdf.groupby(pdf.a > i).first()) 641 assert_eq(ddf.groupby(ddf.a > i).last(), pdf.groupby(pdf.a > i).last()) 642 assert_eq(ddf.groupby(ddf.a > i).prod(), pdf.groupby(pdf.a > i).prod()) 643 644 for ddof in ddofs: 645 assert_eq( 646 ddf.groupby(ddf.b > i).std(ddof), pdf.groupby(pdf.b > i).std(ddof) 647 ) 648 649 for ddkey, pdkey in [ 650 ("a", "a"), 651 (ddf.a, pdf.a), 652 (ddf.a + 1, pdf.a + 1), 653 (ddf.a > 3, pdf.a > 3), 654 ]: 655 assert_eq(ddf.groupby(ddkey).b.sum(), pdf.groupby(pdkey).b.sum()) 656 assert_eq(ddf.groupby(ddkey).b.min(), pdf.groupby(pdkey).b.min()) 657 assert_eq(ddf.groupby(ddkey).b.max(), pdf.groupby(pdkey).b.max()) 658 assert_eq(ddf.groupby(ddkey).b.count(), pdf.groupby(pdkey).b.count()) 659 assert_eq(ddf.groupby(ddkey).b.mean(), pdf.groupby(pdkey).b.mean()) 660 assert_eq(ddf.groupby(ddkey).b.nunique(), pdf.groupby(pdkey).b.nunique()) 661 assert_eq(ddf.groupby(ddkey).b.size(), pdf.groupby(pdkey).b.size()) 662 assert_eq(ddf.groupby(ddkey).b.first(), pdf.groupby(pdkey).b.first()) 663 assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last()) 664 assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod()) 665 666 assert_eq(ddf.groupby(ddkey).sum(), pdf.groupby(pdkey).sum()) 667 assert_eq(ddf.groupby(ddkey).min(), pdf.groupby(pdkey).min()) 668 assert_eq(ddf.groupby(ddkey).max(), pdf.groupby(pdkey).max()) 669 assert_eq(ddf.groupby(ddkey).count(), pdf.groupby(pdkey).count()) 670 assert_eq(ddf.groupby(ddkey).mean(), pdf.groupby(pdkey).mean().astype(float)) 671 assert_eq(ddf.groupby(ddkey).size(), pdf.groupby(pdkey).size()) 672 assert_eq(ddf.groupby(ddkey).first(), pdf.groupby(pdkey).first()) 
673 assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last()) 674 assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod()) 675 676 for ddof in ddofs: 677 assert_eq(ddf.groupby(ddkey).b.std(ddof), pdf.groupby(pdkey).b.std(ddof)) 678 679 assert sorted(ddf.groupby("b").a.sum().dask) == sorted( 680 ddf.groupby("b").a.sum().dask 681 ) 682 assert sorted(ddf.groupby(ddf.a > 3).b.mean().dask) == sorted( 683 ddf.groupby(ddf.a > 3).b.mean().dask 684 ) 685 686 # test raises with incorrect key 687 pytest.raises(KeyError, lambda: ddf.groupby("x")) 688 pytest.raises(KeyError, lambda: ddf.groupby(["a", "x"])) 689 pytest.raises(KeyError, lambda: ddf.groupby("a")["x"]) 690 with warnings.catch_warnings(): 691 # pandas warns about using tuples before throwing the KeyError 692 warnings.simplefilter("ignore", FutureWarning) 693 pytest.raises(KeyError, lambda: ddf.groupby("a")["b", "x"]) 694 pytest.raises(KeyError, lambda: ddf.groupby("a")[["b", "x"]]) 695 696 # test graph node labels 697 assert_dask_graph(ddf.groupby("b").a.sum(), "series-groupby-sum") 698 assert_dask_graph(ddf.groupby("b").a.min(), "series-groupby-min") 699 assert_dask_graph(ddf.groupby("b").a.max(), "series-groupby-max") 700 assert_dask_graph(ddf.groupby("b").a.count(), "series-groupby-count") 701 assert_dask_graph(ddf.groupby("b").a.var(), "series-groupby-var") 702 assert_dask_graph(ddf.groupby("b").a.cov(), "series-groupby-cov") 703 assert_dask_graph(ddf.groupby("b").a.first(), "series-groupby-first") 704 assert_dask_graph(ddf.groupby("b").a.last(), "series-groupby-last") 705 assert_dask_graph(ddf.groupby("b").a.tail(), "series-groupby-tail") 706 assert_dask_graph(ddf.groupby("b").a.head(), "series-groupby-head") 707 assert_dask_graph(ddf.groupby("b").a.prod(), "series-groupby-prod") 708 # mean consists from sum and count operations 709 assert_dask_graph(ddf.groupby("b").a.mean(), "series-groupby-sum") 710 assert_dask_graph(ddf.groupby("b").a.mean(), "series-groupby-count") 711 
assert_dask_graph(ddf.groupby("b").a.nunique(), "series-groupby-nunique") 712 assert_dask_graph(ddf.groupby("b").a.size(), "series-groupby-size") 713 714 assert_dask_graph(ddf.groupby("b").sum(), "dataframe-groupby-sum") 715 assert_dask_graph(ddf.groupby("b").min(), "dataframe-groupby-min") 716 assert_dask_graph(ddf.groupby("b").max(), "dataframe-groupby-max") 717 assert_dask_graph(ddf.groupby("b").count(), "dataframe-groupby-count") 718 assert_dask_graph(ddf.groupby("b").first(), "dataframe-groupby-first") 719 assert_dask_graph(ddf.groupby("b").last(), "dataframe-groupby-last") 720 assert_dask_graph(ddf.groupby("b").prod(), "dataframe-groupby-prod") 721 # mean consists from sum and count operations 722 assert_dask_graph(ddf.groupby("b").mean(), "dataframe-groupby-sum") 723 assert_dask_graph(ddf.groupby("b").mean(), "dataframe-groupby-count") 724 assert_dask_graph(ddf.groupby("b").size(), "dataframe-groupby-size") 725 726 727@pytest.mark.parametrize("keyword", ["split_every", "split_out"]) 728def test_groupby_reduction_split(keyword): 729 pdf = pd.DataFrame( 730 {"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 100, "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100} 731 ) 732 ddf = dd.from_pandas(pdf, npartitions=15) 733 734 def call(g, m, **kwargs): 735 return getattr(g, m)(**kwargs) 736 737 # DataFrame 738 for m in AGG_FUNCS: 739 # nunique is not implemented for DataFrameGroupBy 740 # covariance/correlation is not a series aggregation 741 if m in ("nunique", "cov", "corr"): 742 continue 743 res = call(ddf.groupby("b"), m, **{keyword: 2}) 744 sol = call(pdf.groupby("b"), m) 745 assert_eq(res, sol) 746 assert call(ddf.groupby("b"), m)._name != res._name 747 748 res = call(ddf.groupby("b"), "var", ddof=2, **{keyword: 2}) 749 sol = call(pdf.groupby("b"), "var", ddof=2) 750 assert_eq(res, sol) 751 assert call(ddf.groupby("b"), "var", ddof=2)._name != res._name 752 753 # Series, post select 754 for m in AGG_FUNCS: 755 # covariance/correlation is not a series aggregation 756 if m in ("cov", 
"corr"): 757 continue 758 res = call(ddf.groupby("b").a, m, **{keyword: 2}) 759 sol = call(pdf.groupby("b").a, m) 760 assert_eq(res, sol) 761 assert call(ddf.groupby("b").a, m)._name != res._name 762 763 res = call(ddf.groupby("b").a, "var", ddof=2, **{keyword: 2}) 764 sol = call(pdf.groupby("b").a, "var", ddof=2) 765 assert_eq(res, sol) 766 assert call(ddf.groupby("b").a, "var", ddof=2)._name != res._name 767 768 # Series, pre select 769 for m in AGG_FUNCS: 770 # covariance/correlation is not a series aggregation 771 if m in ("cov", "corr"): 772 continue 773 res = call(ddf.a.groupby(ddf.b), m, **{keyword: 2}) 774 sol = call(pdf.a.groupby(pdf.b), m) 775 # There's a bug in pandas 0.18.0 with `pdf.a.groupby(pdf.b).count()` 776 # not forwarding the series name. Skip name checks here for now. 777 assert_eq(res, sol, check_names=False) 778 assert call(ddf.a.groupby(ddf.b), m)._name != res._name 779 780 res = call(ddf.a.groupby(ddf.b), "var", ddof=2, **{keyword: 2}) 781 sol = call(pdf.a.groupby(pdf.b), "var", ddof=2) 782 783 assert_eq(res, sol) 784 assert call(ddf.a.groupby(ddf.b), "var", ddof=2)._name != res._name 785 786 787@pytest.mark.parametrize( 788 "grouped", 789 [ 790 lambda df: df.groupby("A"), 791 lambda df: df.groupby(df["A"]), 792 lambda df: df.groupby(df["A"] + 1), 793 lambda df: df.groupby("A")["B"], 794 # SeriesGroupBy: 795 lambda df: df.groupby("A")["B"], 796 lambda df: df.groupby(df["A"])["B"], 797 lambda df: df.groupby(df["A"] + 1)["B"], 798 # Series.groupby(): 799 lambda df: df.B.groupby(df["A"]), 800 lambda df: df.B.groupby(df["A"] + 1), 801 # DataFrameGroupBy with column slice: 802 lambda df: df.groupby("A")[["B", "C"]], 803 lambda df: df.groupby(df["A"])[["B", "C"]], 804 lambda df: df.groupby(df["A"] + 1)[["B", "C"]], 805 ], 806) 807@pytest.mark.parametrize( 808 "func", 809 [ 810 lambda grp: grp.apply(lambda x: x.sum()), 811 lambda grp: grp.transform(lambda x: x.sum()), 812 ], 813) 814def test_apply_or_transform_shuffle(grouped, func): 815 pdf = 
pd.DataFrame( 816 { 817 "A": [1, 2, 3, 4] * 5, 818 "B": np.random.randn(20), 819 "C": np.random.randn(20), 820 "D": np.random.randn(20), 821 } 822 ) 823 ddf = dd.from_pandas(pdf, 3) 824 825 with pytest.warns(UserWarning): # meta inference 826 assert_eq(func(grouped(pdf)), func(grouped(ddf))) 827 828 829@pytest.mark.parametrize( 830 "grouper", 831 [ 832 lambda df: "AA", 833 lambda df: ["AA", "AB"], 834 lambda df: df["AA"], 835 lambda df: [df["AA"], df["AB"]], 836 lambda df: df["AA"] + 1, 837 pytest.param( 838 lambda df: [df["AA"] + 1, df["AB"] + 1], 839 marks=pytest.mark.xfail("NotImplemented"), 840 ), 841 ], 842) 843@pytest.mark.parametrize( 844 "func", 845 [ 846 lambda grouped: grouped.apply(lambda x: x.sum()), 847 lambda grouped: grouped.transform(lambda x: x.sum()), 848 ], 849) 850def test_apply_or_transform_shuffle_multilevel(grouper, func): 851 pdf = pd.DataFrame( 852 { 853 "AB": [1, 2, 3, 4] * 5, 854 "AA": [1, 2, 3, 4] * 5, 855 "B": np.random.randn(20), 856 "C": np.random.randn(20), 857 "D": np.random.randn(20), 858 } 859 ) 860 ddf = dd.from_pandas(pdf, 3) 861 862 with pytest.warns(UserWarning): 863 # DataFrameGroupBy 864 assert_eq(func(ddf.groupby(grouper(ddf))), func(pdf.groupby(grouper(pdf)))) 865 866 # SeriesGroupBy 867 assert_eq( 868 func(ddf.groupby(grouper(ddf))["B"]), func(pdf.groupby(grouper(pdf))["B"]) 869 ) 870 871 # DataFrameGroupBy with column slice 872 assert_eq( 873 func(ddf.groupby(grouper(ddf))[["B", "C"]]), 874 func(pdf.groupby(grouper(pdf))[["B", "C"]]), 875 ) 876 877 878def test_numeric_column_names(): 879 # df.groupby(0)[df.columns] fails if all columns are numbers (pandas bug) 880 # This ensures that we cast all column iterables to list beforehand. 
    df = pd.DataFrame({0: [0, 1, 0, 1], 1: [1, 2, 3, 4], 2: [0, 1, 0, 1]})
    ddf = dd.from_pandas(df, npartitions=2)
    assert_eq(ddf.groupby(0).sum(), df.groupby(0).sum())
    assert_eq(ddf.groupby([0, 2]).sum(), df.groupby([0, 2]).sum())
    assert_eq(
        ddf.groupby(0).apply(lambda x: x, meta={0: int, 1: int, 2: int}),
        df.groupby(0).apply(lambda x: x),
    )


def test_groupby_apply_tasks(shuffle_method):
    """Grouped apply with the tasks shuffle must not fall back to partd/disk."""
    if shuffle_method == "disk":
        pytest.skip("Tasks-only shuffle test")

    df = _compat.makeTimeDataFrame()
    df["A"] = df.A // 0.1
    df["B"] = df.B // 0.1
    ddf = dd.from_pandas(df, npartitions=10)

    # exercise both a column-name grouper and a Series grouper
    for ind in [lambda x: "A", lambda x: x.A]:
        a = df.groupby(ind(df)).apply(len)
        with pytest.warns(UserWarning):  # meta-inference warning
            b = ddf.groupby(ind(ddf)).apply(len)
        assert_eq(a, b.compute())
        # no disk-shuffle ("partd") keys may appear in the graph
        assert not any("partd" in k[0] for k in b.dask)

        a = df.groupby(ind(df)).B.apply(len)
        with pytest.warns(UserWarning):  # meta-inference warning
            b = ddf.groupby(ind(ddf)).B.apply(len)
        assert_eq(a, b.compute())
        assert not any("partd" in k[0] for k in b.dask)


def test_groupby_multiprocessing():
    """Grouped apply with explicit meta works on the processes scheduler."""
    df = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": ["1", "1", "a", "a", "a"]})
    ddf = dd.from_pandas(df, npartitions=3)
    with dask.config.set(scheduler="processes"):
        assert_eq(
            ddf.groupby("B").apply(lambda x: x, meta={"A": int, "B": object}),
            df.groupby("B").apply(lambda x: x),
        )


def test_groupby_normalize_index():
    """Column-name, Series, and mixed groupers are normalized onto ``.index``."""
    full = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    d = dd.from_pandas(full, npartitions=3)

    assert d.groupby("a").index == "a"
    assert d.groupby(d["a"]).index == "a"
    assert d.groupby(d["a"] > 2).index._name == (d["a"] > 2)._name
    assert d.groupby(["a", "b"]).index == ["a", "b"]

    assert d.groupby([d["a"], d["b"]]).index == ["a", "b"]
    assert d.groupby([d["a"], "b"]).index == ["a", "b"]


def test_aggregate__single_element_groups(agg_func):
    spec = agg_func

    # nunique/cov is not supported in specs
    if spec in ("nunique", "cov", "corr"):
        return

    pdf = pd.DataFrame(
        {"a": [1, 1, 3, 3], "b": [4, 4, 16, 16], "c": [1, 1, 4, 4], "d": [1, 1, 3, 3]},
        columns=["c", "b", "a", "d"],
    )
    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = pdf.groupby(["a", "d"]).agg(spec)

    # NOTE: for std the result is not recast to the original dtype
    if spec in {"mean", "var"}:
        expected = expected.astype(float)

    assert_eq(expected, ddf.groupby(["a", "d"]).agg(spec))


def test_aggregate_build_agg_args__reuse_of_intermediates():
    """Aggregate reuses intermediates. For example, with sum, count, and mean
    the sums and counts are only calculated once across the graph and reused to
    compute the mean.
    """
    from dask.dataframe.groupby import _build_agg_args

    no_mean_spec = [("foo", "sum", "input"), ("bar", "count", "input")]

    with_mean_spec = [
        ("foo", "sum", "input"),
        ("bar", "count", "input"),
        ("baz", "mean", "input"),
    ]

    no_mean_chunks, no_mean_aggs, no_mean_finalizers = _build_agg_args(no_mean_spec)
    with_mean_chunks, with_mean_aggs, with_mean_finalizers = _build_agg_args(
        with_mean_spec
    )

    # adding mean must not add chunk/agg work: sum and count are reused ...
    assert len(no_mean_chunks) == len(with_mean_chunks)
    assert len(no_mean_aggs) == len(with_mean_aggs)

    # ... but every requested output still gets its own finalizer
    assert len(no_mean_finalizers) == len(no_mean_spec)
    assert len(with_mean_finalizers) == len(with_mean_spec)


def test_aggregate_dask():
    dask_holder = collections.namedtuple("dask_holder", ["dask"])
    # keep only the "aggregate*" tasks of the graph for dependency checks
    get_agg_dask = lambda obj: dask_holder(
        {k: v for (k, v) in obj.dask.items() if k[0].startswith("aggregate")}
    )

    specs = [
        {"b": {"c": "mean"}, "c": {"a": "max", "b": "min"}},
        {"b": "mean", "c": ["min", "max"]},
        [
            "sum",
            "mean",
            "min",
            "max",
            "count",
            "size",
            "std",
            "var",
            "first",
            "last",
            "prod",
        ],
        "sum",
        "mean",
        "min",
        "max",
        "count",
        "std",
        "var",
        "first",
        "last",
        "prod"
        # NOTE: the 'size' spec is special since it bypasses aggregate
        # 'size'
    ]

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 100,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 100,
            "d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 100,
        },
        columns=["c", "b", "a", "d"],
    )
    ddf = dd.from_pandas(pdf, npartitions=100)

    for spec in specs:
        result1 = ddf.groupby(["a", "b"]).agg(spec, split_every=2)
        result2 = ddf.groupby(["a", "b"]).agg(spec, split_every=2)

        agg_dask1 = get_agg_dask(result1)
        agg_dask2 = get_agg_dask(result2)

        # check that the number of partitions used is fixed by split_every
        assert_max_deps(agg_dask1, 2)
        assert_max_deps(agg_dask2, 2)

        # check for deterministic key names and values
        assert agg_dask1 == agg_dask2

        # the length of the dask does not depend on the passed spec
        for other_spec in specs:
            other = ddf.groupby(["a", "b"]).agg(other_spec, split_every=2)
            assert len(other.dask) == len(result1.dask)
            assert len(other.dask) == len(result2.dask)


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: ["a"],
        lambda df: ["a", "b"],
        lambda df: df["a"],
        lambda df: [df["a"], df["b"]],
        lambda df: [df["a"] > 2, df["b"] > 1],
    ],
)
def test_dataframe_aggregations_multilevel(grouper, agg_func):
    """Each supported aggregation agrees with pandas for multilevel groupers."""

    def call(g, m, **kwargs):
        # call aggregation method named `m` on groupby object `g`
        return getattr(g, m)(**kwargs)

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "d": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a", "d"],
    )

    ddf = dd.from_pandas(pdf, npartitions=10)

    # covariance only works with N+1 columns
    if agg_func not in ("cov", "corr"):
        assert_eq(
            call(pdf.groupby(grouper(pdf))["c"], agg_func),
            call(ddf.groupby(grouper(ddf))["c"], agg_func, split_every=2),
        )

    # not supported by pandas
    if agg_func != "nunique":
        assert_eq(
            call(pdf.groupby(grouper(pdf))[["c", "d"]], agg_func),
            call(ddf.groupby(grouper(ddf))[["c", "d"]], agg_func, split_every=2),
        )

    if agg_func in ("cov", "corr"):
        # there are sorting issues between pandas and chunk cov w/dask
        df = call(pdf.groupby(grouper(pdf)), agg_func).sort_index()
        cols = sorted(list(df.columns))
        df = df[cols]
        dddf = call(ddf.groupby(grouper(ddf)), agg_func, split_every=2).compute()
        dddf = dddf.sort_index()
        cols = sorted(list(dddf.columns))
        dddf = dddf[cols]
        assert_eq(df, dddf)
    else:
        assert_eq(
            call(pdf.groupby(grouper(pdf)), agg_func),
            call(ddf.groupby(grouper(ddf)), agg_func, split_every=2),
        )


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: df["a"],
        lambda df: [df["a"], df["b"]],
        lambda df: [df["a"] > 2, df["b"] > 1],
    ],
)
def test_series_aggregations_multilevel(grouper, agg_func):
    """
    similar to ``test_dataframe_aggregations_multilevel``, but series do not
    support all groupby args.
    """

    def call(g, m, **kwargs):
        # call aggregation method named `m` on groupby object `g`
        return getattr(g, m)(**kwargs)

    # covariance/correlation is not a series aggregation
    if agg_func in ("cov", "corr"):
        return

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    ddf = dd.from_pandas(pdf, npartitions=10)

    assert_eq(
        call(pdf["c"].groupby(grouper(pdf)), agg_func),
        call(ddf["c"].groupby(grouper(ddf)), agg_func, split_every=2),
        # for pandas ~ 0.18, the name is not properly propagated for
        # the mean aggregation
        check_names=(agg_func not in {"mean", "nunique"}),
    )


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: df["a"],
        lambda df: df["a"] > 2,
        lambda df: [df["a"], df["b"]],
        lambda df: [df["a"] > 2],
        pytest.param(
            lambda df: [df["a"] > 2, df["b"] > 1],
            marks=pytest.mark.xfail(
                reason="index dtype does not coincide: boolean != empty"
            ),
        ),
    ],
)
@pytest.mark.parametrize(
    "group_and_slice",
    [
        lambda df, grouper: df.groupby(grouper(df)),
        lambda df, grouper: df["c"].groupby(grouper(df)),
        lambda df, grouper: df.groupby(grouper(df))["c"],
    ],
)
def test_groupby_meta_content(group_and_slice, grouper):
    """``_meta``/``_meta_nonempty`` of a groupby match the empty pandas result."""
    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    ddf = dd.from_pandas(pdf, npartitions=10)

    expected = group_and_slice(pdf, grouper).first().head(0)
    meta = group_and_slice(ddf, grouper)._meta.first()
    meta_nonempty = group_and_slice(ddf, grouper)._meta_nonempty.first().head(0)

    assert_eq(expected, meta)
    assert_eq(expected, meta_nonempty)


def test_groupy_non_aligned_index():
    """Grouping by Series with misaligned divisions raises NotImplementedError."""
    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    ddf3 = dd.from_pandas(pdf, npartitions=3)
    ddf7 = dd.from_pandas(pdf, npartitions=7)

    # working examples
    ddf3.groupby(["a", "b"])
    ddf3.groupby([ddf3["a"], ddf3["b"]])

    # misaligned divisions
    with pytest.raises(NotImplementedError):
        ddf3.groupby(ddf7["a"])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf7["a"], ddf7["b"]])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf7["a"], ddf3["b"]])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf3["a"], ddf7["b"]])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf7["a"], "b"])


def test_groupy_series_wrong_grouper():
    """Invalid groupers on a Series raise KeyError/ValueError as appropriate."""
    df = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    df = dd.from_pandas(df, npartitions=3)
    s = df["a"]

    # working index values
    s.groupby(s)
    s.groupby([s, s])

    # non working index values
    with pytest.raises(KeyError):
        s.groupby("foo")

    with pytest.raises(KeyError):
        s.groupby([s, "foo"])

    with pytest.raises(ValueError):
        s.groupby(df)

    with pytest.raises(ValueError):
        s.groupby([s, df])


@pytest.mark.parametrize("npartitions", [1, 4, 20])
@pytest.mark.parametrize("split_every", [2, 5])
@pytest.mark.parametrize("split_out", [None, 1, 5, 20])
def test_hash_groupby_aggregate(npartitions, split_every, split_out):
    """Hash-based groupby honors split_out and consumes every input partition."""
    df = pd.DataFrame({"x": np.arange(100) % 10, "y": np.ones(100)})
    ddf = dd.from_pandas(df, npartitions)

    result = ddf.groupby("x").y.var(split_every=split_every, split_out=split_out)

    dsk = result.__dask_optimize__(result.dask, result.__dask_keys__())
    from dask.core import get_deps

    dependencies, dependents = get_deps(dsk)

    assert result.npartitions == (split_out or 1)
    # each input partition must appear as a root (dependency-free) task
    assert len([k for k, v in dependencies.items() if not v]) == npartitions

    assert_eq(result, df.groupby("x").y.var())


def test_split_out_multi_column_groupby():
    df = pd.DataFrame(
        {"x": np.arange(100) % 10, "y": np.ones(100), "z": [1, 2, 3, 4, 5] * 20}
    )

    ddf = dd.from_pandas(df, npartitions=10)

    result = ddf.groupby(["x", "y"]).z.mean(split_out=4)
    expected = df.groupby(["x", "y"]).z.mean()

    assert_eq(result, expected, check_dtype=False)


def test_groupby_split_out_num():
    # GH 1841
    ddf = dd.from_pandas(
        pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4]}), npartitions=2
    )
    assert ddf.groupby("A").sum().npartitions == 1
    assert ddf.groupby("A").sum(split_out=2).npartitions == 2
    assert ddf.groupby("A").sum(split_out=3).npartitions == 3

    with pytest.raises(TypeError):
        # groupby doesn't accept split_out
        ddf.groupby("A", split_out=2)


def test_groupby_not_supported():
    """Unsupported pandas groupby keyword arguments raise TypeError."""
    ddf = dd.from_pandas(
        pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4]}), npartitions=2
    )
    with pytest.raises(TypeError):
        ddf.groupby("A", axis=1)
    with pytest.raises(TypeError):
        ddf.groupby("A", level=1)
    with pytest.raises(TypeError):
        ddf.groupby("A", as_index=False)
    with pytest.raises(TypeError):
        ddf.groupby("A", squeeze=True)


def test_groupby_numeric_column():
    df = pd.DataFrame({"A": ["foo", "foo", "bar"], 0: [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=3)

    assert_eq(ddf.groupby(ddf.A)[0].sum(), df.groupby(df.A)[0].sum())


@pytest.mark.parametrize("sel", ["c", "d", ["c", "d"]])
@pytest.mark.parametrize("key", ["a", ["a", "b"]])
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "cumcount"])
def test_cumulative(func, key, sel):
    """Grouped cumulative ops agree with pandas, including NaN entries."""
    df = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 6,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 6,
            "c": np.random.randn(54),
            "d": np.random.randn(54),
        },
        columns=["a", "b", "c", "d"],
    )
    # sprinkle NaNs into the last column to exercise skipping behavior
    df.iloc[[-18, -12, -6], -1] = np.nan
    ddf = dd.from_pandas(df, npartitions=10)

    g, dg = (d.groupby(key)[sel] for d in (df, ddf))
    assert_eq(getattr(g, func)(), getattr(dg, func)())


@pytest.mark.parametrize("func", ["cumsum", "cumprod"])
def test_cumulative_axis1(func):
    df = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 2,
            "b": np.random.randn(18),
            "c": np.random.randn(18),
        }
    )
    df.iloc[-6, -1] = np.nan
    ddf = dd.from_pandas(df, npartitions=4)
    assert_eq(
        getattr(df.groupby("a"), func)(axis=1), getattr(ddf.groupby("a"), func)(axis=1)
    )


def test_groupby_unaligned_index():
    """Aggregations on frames grouped by an unaligned Series raise ValueError;
    apply with explicit meta still works."""
    df = pd.DataFrame(
        {
            "a": np.random.randint(0, 10, 50),
            "b": np.random.randn(50),
            "c": np.random.randn(50),
        }
    )
    ddf = dd.from_pandas(df, npartitions=5)
    filtered = df[df.b < 0.5]
    dfiltered = ddf[ddf.b < 0.5]

    # grouper comes from the unfiltered frame -> index no longer aligned
    ddf_group = dfiltered.groupby(ddf.a)
    ds_group = dfiltered.b.groupby(ddf.a)

    bad = [
        ddf_group.mean(),
        ddf_group.var(),
        ddf_group.b.nunique(),
        ddf_group.get_group(0),
        ds_group.mean(),
        ds_group.var(),
        ds_group.nunique(),
        ds_group.get_group(0),
    ]

    for obj in bad:
        with pytest.raises(ValueError):
            obj.compute()

    def add1(x):
        return x + 1

    df_group = filtered.groupby(df.a)
    good = [
        (ddf_group.apply(add1, meta=ddf), df_group.apply(add1)),
        (ddf_group.b.apply(add1, meta=ddf.b), df_group.b.apply(add1)),
    ]

    for (res, sol) in good:
        assert_eq(res, sol)


def test_groupby_string_label():
    df = pd.DataFrame({"foo": [1, 1, 4], "B": [2, 3, 4], "C": [5, 6, 7]})
    ddf = dd.from_pandas(pd.DataFrame(df), npartitions=1)
    ddf_group = ddf.groupby("foo")
    result = ddf_group.get_group(1).compute()

    expected = pd.DataFrame(
        {"foo": [1, 1], "B": [2, 3], "C": [5, 6]}, index=pd.Index([0, 1])
    )

    tm.assert_frame_equal(result, expected)


def test_groupby_dataframe_cum_caching():
    """Test caching behavior of cumulative operations on grouped dataframes.

    Relates to #3756.
    """
    df = pd.DataFrame(
        dict(a=list("aabbcc")), index=pd.date_range(start="20100101", periods=6)
    )
    df["ones"] = 1
    df["twos"] = 2

    ddf = dd.from_pandas(df, npartitions=3)

    ops = ["cumsum", "cumprod"]

    for op in ops:
        ddf0 = getattr(ddf.groupby(["a"]), op)()
        ddf1 = ddf.rename(columns={"ones": "foo", "twos": "bar"})
        ddf1 = getattr(ddf1.groupby(["a"]), op)()

        # _a and _b dataframe should be equal
        res0_a, res1_a = dask.compute(ddf0, ddf1)
        res0_b, res1_b = ddf0.compute(), ddf1.compute()

        assert res0_a.equals(res0_b)
        assert res1_a.equals(res1_b)


def test_groupby_series_cum_caching():
    """Test caching behavior of cumulative operations on grouped Series

    Relates to #3755
    """
    df = pd.DataFrame(
        dict(a=list("aabbcc")), index=pd.date_range(start="20100101", periods=6)
    )
    df["ones"] = 1
    df["twos"] = 2

    ops = ["cumsum", "cumprod"]
    for op in ops:
        ddf = dd.from_pandas(df, npartitions=3)
        dcum = ddf.groupby(["a"])
        res0_a, res1_a = dask.compute(
            getattr(dcum["ones"], op)(), getattr(dcum["twos"], op)()
        )
        cum = df.groupby(["a"])
        res0_b, res1_b = (getattr(cum["ones"], op)(), getattr(cum["twos"], op)())

        assert res0_a.equals(res0_b)
        assert res1_a.equals(res1_b)


def test_groupby_slice_agg_reduces():
    d = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2, 3, 4, 5]})
    a = dd.from_pandas(d, npartitions=2)
    result = a.groupby("a")["b"].agg(["min", "max"])
    expected = d.groupby("a")["b"].agg(["min", "max"])
    assert_eq(result, expected)


def test_groupby_agg_grouper_single():
    # https://github.com/dask/dask/issues/2255
    d = pd.DataFrame({"a": [1, 2, 3, 4]})
    a = dd.from_pandas(d, npartitions=2)

    result = a.groupby("a")["a"].agg(["min", "max"])
    expected = d.groupby("a")["a"].agg(["min", "max"])
    assert_eq(result, expected)


@pytest.mark.parametrize("slice_", ["a", ["a"], ["a", "b"], ["b"]])
def test_groupby_agg_grouper_multiple(slice_):
    # https://github.com/dask/dask/issues/2255
    d = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
    a = dd.from_pandas(d, npartitions=2)

    result = a.groupby("a")[slice_].agg(["min", "max"])
    expected = d.groupby("a")[slice_].agg(["min", "max"])
    assert_eq(result, expected)


@pytest.mark.parametrize(
    "agg_func",
    [
        "cumprod",
        "cumcount",
        "cumsum",
        "var",
        "sum",
        "mean",
        "count",
        "size",
        "std",
        "min",
        "max",
        "first",
        "last",
        "prod",
    ],
)
def test_groupby_column_and_index_agg_funcs(agg_func):
    """Aggregations work when grouping by mixes of index level and columns."""

    def call(g, m, **kwargs):
        # call aggregation method named `m` on groupby object `g`
        return getattr(g, m)(**kwargs)

    df = pd.DataFrame(
        {
            "idx": [1, 1, 1, 2, 2, 2],
            "a": [1, 2, 1, 2, 1, 2],
            "b": np.arange(6),
            "c": [1, 1, 1, 2, 2, 2],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(df, npartitions=df.index.nunique())
    ddf_no_divs = dd.from_pandas(df, npartitions=df.index.nunique(), sort=False)

    # Index and then column

    # Compute expected result
    expected = call(df.groupby(["idx", "a"]), agg_func)
    if agg_func in {"mean", "var"}:
        expected = expected.astype(float)

    result = call(ddf.groupby(["idx", "a"]), agg_func)
    assert_eq(expected, result)

    result = call(ddf_no_divs.groupby(["idx", "a"]), agg_func)
    assert_eq(expected, result)

    # apply-combine-apply aggregation functions
    aca_agg = {"sum", "mean", "var", "size", "std", "count", "first", "last", "prod"}

    # Test aggregate strings
    if agg_func in aca_agg:
        result = ddf_no_divs.groupby(["idx", "a"]).agg(agg_func)
        assert_eq(expected, result)

    # Column and then index

    # Compute expected result
    expected = call(df.groupby(["a", "idx"]), agg_func)
    if agg_func in {"mean", "var"}:
        expected = expected.astype(float)

    result = call(ddf.groupby(["a", "idx"]), agg_func)
    assert_eq(expected, result)

    result = call(ddf_no_divs.groupby(["a", "idx"]), agg_func)
    assert_eq(expected, result)

    # Test aggregate strings
    if agg_func in aca_agg:
        result = ddf_no_divs.groupby(["a", "idx"]).agg(agg_func)
        assert_eq(expected, result)

    # Index only

    # Compute expected result
    expected = call(df.groupby("idx"), agg_func)
    if agg_func in {"mean", "var"}:
        expected = expected.astype(float)

    result = call(ddf.groupby("idx"), agg_func)
    assert_eq(expected, result)

    result = call(ddf_no_divs.groupby("idx"), agg_func)
    assert_eq(expected, result)

    # Test aggregate strings
    if agg_func in aca_agg:
        result = ddf_no_divs.groupby("idx").agg(agg_func)
        assert_eq(expected, result)


@pytest.mark.parametrize("group_args", [["idx", "a"], ["a", "idx"], ["idx"], "idx"])
@pytest.mark.parametrize(
    "apply_func", [np.min, np.mean, lambda s: np.max(s) - np.mean(s)]
)
def test_groupby_column_and_index_apply(group_args, apply_func):
    df = pd.DataFrame(
        {"idx": [1, 1, 1, 2, 2, 2], "a": [1, 2, 1, 2, 1, 2], "b": np.arange(6)}
    ).set_index("idx")

    ddf = dd.from_pandas(df, npartitions=df.index.nunique())
    ddf_no_divs = dd.from_pandas(df, npartitions=df.index.nunique(), sort=False)

    # Expected result
    expected = df.groupby(group_args).apply(apply_func)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        # Compute on dask DataFrame with divisions (no shuffling)
        result = ddf.groupby(group_args).apply(apply_func)
        assert_eq(expected, result, check_divisions=False)

        # Check that partitioning is preserved
        assert ddf.divisions == result.divisions

        # Check that no shuffling occurred.
        # The groupby operation should add only 1 task per partition
        assert len(result.dask) == (len(ddf.dask) + ddf.npartitions)

        # Compute on dask DataFrame without divisions (requires shuffling)
        result = ddf_no_divs.groupby(group_args).apply(apply_func)
        assert_eq(expected, result, check_divisions=False)

        # Check that divisions were preserved (all None in this case)
        assert ddf_no_divs.divisions == result.divisions

        # Crude check to see if shuffling was performed.
        # The groupby operation should add more than 1 task per partition
        assert len(result.dask) > (len(ddf_no_divs.dask) + ddf_no_divs.npartitions)


# custom Aggregation equivalent to "mean": chunk -> (count, sum),
# combine -> summed counts/sums, finalize -> sum / count
custom_mean = dd.Aggregation(
    "mean",
    lambda s: (s.count(), s.sum()),
    lambda s0, s1: (s0.sum(), s1.sum()),
    lambda s0, s1: s1 / s0,
)

# custom Aggregation equivalent to "sum"
custom_sum = dd.Aggregation("sum", lambda s: s.sum(), lambda s0: s0.sum())


@pytest.mark.parametrize(
    "pandas_spec, dask_spec, check_dtype",
    [
        ({"b": "mean"}, {"b": custom_mean}, False),
        ({"b": "sum"}, {"b": custom_sum}, True),
        (["mean", "sum"], [custom_mean, custom_sum], False),
        ({"b": ["mean", "sum"]}, {"b": [custom_mean, custom_sum]}, False),
    ],
)
def test_dataframe_groupby_agg_custom_sum(pandas_spec, dask_spec, check_dtype):
    df = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
    ddf = dd.from_pandas(df, npartitions=2)

    expected = df.groupby("g").aggregate(pandas_spec)
    result = ddf.groupby("g").aggregate(dask_spec)

    assert_eq(result, expected, check_dtype=check_dtype)


@pytest.mark.parametrize(
    "pandas_spec, dask_spec",
    [
        ("mean", custom_mean),
        (["mean"], [custom_mean]),
        (["mean", "sum"], [custom_mean, custom_sum]),
    ],
)
def test_series_groupby_agg_custom_mean(pandas_spec, dask_spec):
    d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
    a = dd.from_pandas(d, npartitions=2)

    expected = d["b"].groupby(d["g"]).aggregate(pandas_spec)
    result = a["b"].groupby(a["g"]).aggregate(dask_spec)

    assert_eq(result, expected, check_dtype=False)


def test_groupby_agg_custom__name_clash_with_internal_same_column():
    """for a single input column only unique names are allowed"""
    d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
    a = dd.from_pandas(d, npartitions=2)

    agg_func = dd.Aggregation("sum", lambda s: s.sum(), lambda s0: s0.sum())

    with pytest.raises(ValueError):
        a.groupby("g").aggregate({"b": [agg_func, "sum"]})


def test_groupby_agg_custom__name_clash_with_internal_different_column():
    """custom aggregation functions can share the name of a builtin function"""
    d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3, "c": [4, 5, 6] * 3})
    a = dd.from_pandas(d, npartitions=2)

    # NOTE: this function is purposefully misnamed
    agg_func = dd.Aggregation(
        "sum",
        lambda s: (s.count(), s.sum()),
        lambda s0, s1: (s0.sum(), s1.sum()),
        lambda s0, s1: s1 / s0,
    )

    # NOTE: the name of agg-func is suppressed in the output,
    # since only a single agg func per column was specified
    result = a.groupby("g").aggregate({"b": agg_func, "c": "sum"})
    expected = d.groupby("g").aggregate({"b": "mean", "c": "sum"})

    assert_eq(result, expected, check_dtype=False)


def test_groupby_agg_custom__mode():
    # mode function passing intermediates as pure python objects around. to protect
    # results from pandas in apply use return results as single-item lists
    def agg_mode(s):
        def impl(s):
            (res,) = s.iloc[0]

            for (i,) in s.iloc[1:]:
                res = res.add(i, fill_value=0)

            return [res]

        return s.apply(impl)

    agg_func = dd.Aggregation(
        "custom_mode",
        lambda s: s.apply(lambda s: [s.value_counts()]),
        agg_mode,
        lambda s: s.map(lambda i: i[0].idxmax()),
    )

    d = pd.DataFrame(
        {
            "g0": [0, 0, 0, 1, 1] * 3,
            "g1": [0, 0, 0, 1, 1] * 3,
            "cc": [4, 5, 4, 6, 6] * 3,
        }
    )
    a = dd.from_pandas(d, npartitions=5)

    actual = a["cc"].groupby([a["g0"], a["g1"]]).agg(agg_func)

    # cheat to get the correct index
    expected = pd.DataFrame({"g0": [0, 1], "g1": [0, 1], "cc": [4, 6]})
    expected = expected["cc"].groupby([expected["g0"], expected["g1"]]).agg("sum")

    assert_eq(actual, expected)


@pytest.mark.parametrize("func", ["var", list])
def test_groupby_select_column_agg(func):
    pdf = pd.DataFrame(
        {
            "A": [1, 2, 3, 1, 2, 3, 1, 2, 4],
            "B": [-0.776, -0.4, -0.873, 0.054, 1.419, -0.948, -0.967, -1.714, -0.666],
        }
    )
    ddf = dd.from_pandas(pdf, npartitions=4)
    actual = ddf.groupby("A")["B"].agg(func)
    expected = pdf.groupby("A")["B"].agg(func)
    assert_eq(actual, expected)


@pytest.mark.parametrize(
    "func",
    [
        lambda x: x.std(numeric_only=True),
        lambda x: x.groupby("x").std(),
        lambda x: x.groupby("x").var(),
        lambda x: x.groupby("x").mean(),
        lambda x: x.groupby("x").sum(),
        lambda x: x.groupby("x").z.std(),
    ],
)
def test_std_object_dtype(func):
    """std/var/mean/sum work on frames that contain an object-dtype column."""
    df = pd.DataFrame({"x": [1, 2, 1], "y": ["a", "b", "c"], "z": [11.0, 22.0, 33.0]})
    ddf = dd.from_pandas(df, npartitions=2)

    assert_eq(func(df), func(ddf))


def test_std_columns_int():
    # Make sure std() works when index_by is a df with integer column names
    # Non regression test for issue #3560
    df = pd.DataFrame({0: [5], 1: [5]})
    ddf = dd.from_pandas(df, npartitions=2)
    by = dask.array.from_array([0, 1]).to_dask_dataframe()
    ddf.groupby(by).std()


def test_timeseries():
    df = dask.datasets.timeseries().partitions[:2]
    # NOTE(review): both sides are the same expression — presumably this checks
    # that repeated graph construction is deterministic / computable; confirm intent
    assert_eq(df.groupby("name").std(), df.groupby("name").std())


@pytest.mark.parametrize("min_count", [0, 1, 2, 3])
def test_with_min_count(min_count):
    """sum/prod forward min_count and agree with pandas in the presence of NaN."""
    dfs = [
        pd.DataFrame(
            {
                "group": ["A", "A", "B"],
                "val1": [np.nan, 2, 3],
                "val2": [np.nan, 5, 6],
                "val3": [5, 4, 9],
            }
        ),
        pd.DataFrame(
            {
                "group": ["A", "A", "B"],
                "val1": [2, np.nan, np.nan],
                "val2": [np.nan, 5, 6],
                "val3": [5, 4, 9],
            }
        ),
    ]
    ddfs = [dd.from_pandas(df, npartitions=4) for df in dfs]

    for df, ddf in zip(dfs, ddfs):
        assert_eq(
            df.groupby("group").sum(min_count=min_count),
            ddf.groupby("group").sum(min_count=min_count),
        )
        assert_eq(
            df.groupby("group").prod(min_count=min_count),
            ddf.groupby("group").prod(min_count=min_count),
        )


def test_groupby_group_keys():
    df = pd.DataFrame({"a": [1, 2, 2, 3], "b": [2, 3, 4, 5]})
    ddf = dd.from_pandas(df, npartitions=2).set_index("a")
    pdf = df.set_index("a")

    func = lambda g: g.copy()
    expected = pdf.groupby("a").apply(func)
    assert_eq(expected, ddf.groupby("a").apply(func, meta=expected))

    expected = pdf.groupby("a", group_keys=False).apply(func)
    assert_eq(expected, ddf.groupby("a", group_keys=False).apply(func, meta=expected))


@pytest.mark.parametrize(
    "columns",
    [["a", "b", "c"], np.array([1.0, 2.0, 3.0]), ["1", "2", "3"], ["", "a", "b"]],
)
def test_groupby_cov(columns):
    rows = 20
    cols = 3
    data = np.random.randn(rows, cols)
    df = pd.DataFrame(data, columns=columns)
    df["key"] = [0] * 10 + [1] * 5 + [2] * 5
    ddf = dd.from_pandas(df, npartitions=3)

    expected = df.groupby("key").cov()
    result = ddf.groupby("key").cov()
    # when using numerical values for columns
    # the column mapping and stacking leads to a float typed
    # MultiIndex. Pandas will normally create a object typed
    # MultiIndex
    if isinstance(columns, np.ndarray):
        result = result.compute()
        # don't bother checking index -- MultiIndex levels are in a frozenlist
        result.columns = result.columns.astype(np.dtype("O"))
        assert_eq(expected, result, check_index=False)
    else:
        assert_eq(expected, result)


def test_df_groupby_idxmin():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = pd.DataFrame({"group": [1, 2], "value": [0, 3]}).set_index("group")

    result_pd = pdf.groupby("group").idxmin()
    result_dd = ddf.groupby("group").idxmin()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_df_groupby_idxmin_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group").idxmin(skipna=skipna)
    result_dd = ddf.groupby("group").idxmin(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_df_groupby_idxmax():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = pd.DataFrame({"group": [1, 2], "value": [1, 2]}).set_index("group")

    result_pd = pdf.groupby("group").idxmax()
    result_dd = ddf.groupby("group").idxmax()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_df_groupby_idxmax_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group").idxmax(skipna=skipna)
    result_dd = ddf.groupby("group").idxmax(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_series_groupby_idxmin():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = (
        pd.DataFrame({"group": [1, 2], "value": [0, 3]}).set_index("group").squeeze()
    )

    result_pd = pdf.groupby("group")["value"].idxmin()
    result_dd = ddf.groupby("group")["value"].idxmin()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_series_groupby_idxmin_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group")["value"].idxmin(skipna=skipna)
    result_dd = ddf.groupby("group")["value"].idxmin(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_series_groupby_idxmax():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = (
        pd.DataFrame({"group": [1, 2], "value": [1, 2]}).set_index("group").squeeze()
    )

    result_pd = pdf.groupby("group")["value"].idxmax()
    result_dd = ddf.groupby("group")["value"].idxmax()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_series_groupby_idxmax_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group")["value"].idxmax(skipna=skipna)
    result_dd = ddf.groupby("group")["value"].idxmax(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_groupby_unique():
    rng = np.random.RandomState(42)
    df = pd.DataFrame(
        {"foo": rng.randint(3, size=100), "bar": rng.randint(10, size=100)}
    )
    ddf = dd.from_pandas(df, npartitions=10)

    pd_gb = df.groupby("foo")["bar"].unique()
    dd_gb = ddf.groupby("foo")["bar"].unique()

    # Use explode because each DataFrame row is a list; equality fails
    assert_eq(dd_gb.explode(), pd_gb.explode())


def test_groupby_value_counts():
    rng = np.random.RandomState(42)
    df = pd.DataFrame(
        {"foo": rng.randint(3, size=100), "bar": rng.randint(4, size=100)}
    )
    ddf = dd.from_pandas(df, npartitions=2)

    pd_gb = df.groupby("foo")["bar"].value_counts()
    dd_gb = ddf.groupby("foo")["bar"].value_counts()
    assert_eq(dd_gb, pd_gb)


@pytest.mark.parametrize(
    "transformation", [lambda x: x.sum(), np.sum, "sum", pd.Series.rank]
)
def test_groupby_transform_funcs(transformation):
    pdf = pd.DataFrame(
        {
            "A": [1, 2, 3, 4] * 5,
            "B": np.random.randn(20),
            "C": np.random.randn(20),
            "D": np.random.randn(20),
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    with pytest.warns(UserWarning):
# DataFrame 2059 assert_eq( 2060 pdf.groupby("A").transform(transformation), 2061 ddf.groupby("A").transform(transformation), 2062 ) 2063 2064 # Series 2065 assert_eq( 2066 pdf.groupby("A")["B"].transform(transformation), 2067 ddf.groupby("A")["B"].transform(transformation), 2068 ) 2069 2070 2071@pytest.mark.parametrize("npartitions", list(range(1, 10))) 2072@pytest.mark.parametrize("indexed", [True, False]) 2073def test_groupby_transform_ufunc_partitioning(npartitions, indexed): 2074 pdf = pd.DataFrame({"group": [1, 2, 3, 4, 5] * 20, "value": np.random.randn(100)}) 2075 2076 if indexed: 2077 pdf = pdf.set_index("group") 2078 2079 ddf = dd.from_pandas(pdf, npartitions) 2080 2081 with pytest.warns(UserWarning): 2082 # DataFrame 2083 assert_eq( 2084 pdf.groupby("group").transform(lambda series: series - series.mean()), 2085 ddf.groupby("group").transform(lambda series: series - series.mean()), 2086 ) 2087 2088 # Series 2089 assert_eq( 2090 pdf.groupby("group")["value"].transform( 2091 lambda series: series - series.mean() 2092 ), 2093 ddf.groupby("group")["value"].transform( 2094 lambda series: series - series.mean() 2095 ), 2096 ) 2097 2098 2099@pytest.mark.parametrize( 2100 "grouping,agg", 2101 [ 2102 ( 2103 lambda df: df.drop(columns="category_2").groupby("category_1"), 2104 lambda grp: grp.mean(), 2105 ), 2106 ( 2107 lambda df: df.drop(columns="category_2").groupby("category_1"), 2108 lambda grp: grp.agg("mean"), 2109 ), 2110 (lambda df: df.groupby(["category_1", "category_2"]), lambda grp: grp.mean()), 2111 ( 2112 lambda df: df.groupby(["category_1", "category_2"]), 2113 lambda grp: grp.agg("mean"), 2114 ), 2115 ], 2116) 2117def test_groupby_aggregate_categoricals(grouping, agg): 2118 pdf = pd.DataFrame( 2119 { 2120 "category_1": pd.Categorical(list("AABBCC")), 2121 "category_2": pd.Categorical(list("ABCABC")), 2122 "value": np.random.uniform(size=6), 2123 } 2124 ) 2125 ddf = dd.from_pandas(pdf, 2) 2126 2127 # DataFrameGroupBy 2128 assert_eq(agg(grouping(pdf)), 
agg(grouping(ddf))) 2129 2130 # SeriesGroupBy 2131 assert_eq(agg(grouping(pdf)["value"]), agg(grouping(ddf)["value"])) 2132 2133 2134@pytest.mark.xfail( 2135 not dask.dataframe.utils.PANDAS_GT_110, 2136 reason="dropna kwarg not supported in pandas < 1.1.0.", 2137) 2138@pytest.mark.parametrize("dropna", [False, True]) 2139def test_groupby_dropna_pandas(dropna): 2140 df = pd.DataFrame( 2141 {"a": [1, 2, 3, 4, None, None, 7, 8], "e": [4, 5, 6, 3, 2, 1, 0, 0]} 2142 ) 2143 ddf = dd.from_pandas(df, npartitions=3) 2144 2145 dask_result = ddf.groupby("a", dropna=dropna).e.sum() 2146 pd_result = df.groupby("a", dropna=dropna).e.sum() 2147 assert_eq(dask_result, pd_result) 2148 2149 2150@pytest.mark.gpu 2151@pytest.mark.parametrize("dropna", [False, True, None]) 2152@pytest.mark.parametrize("by", ["a", "c", "d", ["a", "b"], ["a", "c"], ["a", "d"]]) 2153def test_groupby_dropna_cudf(dropna, by): 2154 2155 # NOTE: This test requires cudf/dask_cudf, and will 2156 # be skipped by non-GPU CI 2157 2158 cudf = pytest.importorskip("cudf") 2159 dask_cudf = pytest.importorskip("dask_cudf") 2160 2161 df = cudf.DataFrame( 2162 { 2163 "a": [1, 2, 3, 4, None, None, 7, 8], 2164 "b": [1, 0] * 4, 2165 "c": ["a", "b", None, None, "e", "f", "g", "h"], 2166 "e": [4, 5, 6, 3, 2, 1, 0, 0], 2167 } 2168 ) 2169 df["d"] = df["c"].astype("category") 2170 ddf = dask_cudf.from_cudf(df, npartitions=3) 2171 2172 if dropna is None: 2173 dask_result = ddf.groupby(by).e.sum() 2174 cudf_result = df.groupby(by).e.sum() 2175 else: 2176 dask_result = ddf.groupby(by, dropna=dropna).e.sum() 2177 cudf_result = df.groupby(by, dropna=dropna).e.sum() 2178 if by in ["c", "d"]: 2179 # Lose string/category index name in cudf... 
2180 dask_result = dask_result.compute() 2181 dask_result.index.name = cudf_result.index.name 2182 2183 assert_eq(dask_result, cudf_result) 2184 2185 2186@pytest.mark.xfail( 2187 not dask.dataframe.utils.PANDAS_GT_110, 2188 reason="Should work starting from pandas 1.1.0", 2189) 2190def test_groupby_dropna_with_agg(): 2191 # https://github.com/dask/dask/issues/6986 2192 df = pd.DataFrame( 2193 {"id1": ["a", None, "b"], "id2": [1, 2, None], "v1": [4.5, 5.5, None]} 2194 ) 2195 expected = df.groupby(["id1", "id2"], dropna=False).agg("sum") 2196 2197 ddf = dd.from_pandas(df, 1, sort=False) 2198 actual = ddf.groupby(["id1", "id2"], dropna=False).agg("sum") 2199 assert_eq(expected, actual) 2200 2201 2202def test_groupby_observed_with_agg(): 2203 df = pd.DataFrame( 2204 { 2205 "cat_1": pd.Categorical(list("AB"), categories=list("ABCDE")), 2206 "cat_2": pd.Categorical([1, 2], categories=[1, 2, 3]), 2207 "value_1": np.random.uniform(size=2), 2208 } 2209 ) 2210 expected = df.groupby(["cat_1", "cat_2"], observed=True).agg("sum") 2211 2212 ddf = dd.from_pandas(df, 2) 2213 actual = ddf.groupby(["cat_1", "cat_2"], observed=True).agg("sum") 2214 assert_eq(expected, actual) 2215 2216 2217def test_rounding_negative_var(): 2218 x = [-0.00179999999 for _ in range(10)] 2219 ids = [1 for _ in range(5)] + [2 for _ in range(5)] 2220 2221 df = pd.DataFrame({"ids": ids, "x": x}) 2222 2223 ddf = dd.from_pandas(df, npartitions=2) 2224 assert_eq(ddf.groupby("ids").x.std(), df.groupby("ids").x.std()) 2225 2226 2227@pytest.mark.parametrize("split_out", [2, 3]) 2228@pytest.mark.parametrize("column", [["b", "c"], ["b", "d"], ["b", "e"]]) 2229def test_groupby_split_out_multiindex(split_out, column): 2230 df = pd.DataFrame( 2231 { 2232 "a": np.arange(8), 2233 "b": [1, 0, 0, 2, 1, 1, 2, 0], 2234 "c": [0, 1] * 4, 2235 "d": ["dog", "cat", "cat", "dog", "dog", "dog", "cat", "bird"], 2236 } 2237 ).fillna(0) 2238 df["e"] = df["d"].astype("category") 2239 ddf = dd.from_pandas(df, npartitions=3) 2240 2241 
ddf_result_so1 = ( 2242 ddf.groupby(column).a.mean(split_out=1).compute().sort_values().dropna() 2243 ) 2244 2245 ddf_result = ( 2246 ddf.groupby(column).a.mean(split_out=split_out).compute().sort_values().dropna() 2247 ) 2248 2249 assert_eq(ddf_result, ddf_result_so1, check_index=False) 2250 2251 2252@pytest.mark.parametrize( 2253 "backend", 2254 [ 2255 "pandas", 2256 pytest.param("cudf", marks=pytest.mark.gpu), 2257 ], 2258) 2259def test_groupby_large_ints_exception(backend): 2260 data_source = pytest.importorskip(backend) 2261 if backend == "cudf": 2262 dask_cudf = pytest.importorskip("dask_cudf") 2263 data_frame = dask_cudf.from_cudf 2264 else: 2265 data_frame = dd.from_pandas 2266 max = np.iinfo(np.uint64).max 2267 sqrt = max ** 0.5 2268 series = data_source.Series( 2269 np.concatenate([sqrt * np.arange(5), np.arange(35)]) 2270 ).astype("int64") 2271 df = data_source.DataFrame({"x": series, "z": np.arange(40), "y": np.arange(40)}) 2272 ddf = data_frame(df, npartitions=1) 2273 assert_eq( 2274 df.groupby("x").std(), 2275 ddf.groupby("x").std().compute(scheduler="single-threaded"), 2276 ) 2277 2278 2279@pytest.mark.parametrize("by", ["a", "b", "c", ["a", "b"], ["a", "c"]]) 2280@pytest.mark.parametrize("agg", ["count", "mean", "std"]) 2281@pytest.mark.parametrize("sort", [True, False]) 2282def test_groupby_sort_argument(by, agg, sort): 2283 2284 df = pd.DataFrame( 2285 { 2286 "a": [1, 2, 3, 4, None, None, 7, 8], 2287 "b": [1, 0] * 4, 2288 "c": ["a", "b", None, None, "e", "f", "g", "h"], 2289 "e": [4, 5, 6, 3, 2, 1, 0, 0], 2290 } 2291 ) 2292 ddf = dd.from_pandas(df, npartitions=3) 2293 2294 gb = ddf.groupby(by, sort=sort) 2295 gb_pd = df.groupby(by, sort=sort) 2296 2297 # Basic groupby aggregation 2298 result_1 = getattr(gb, agg) 2299 result_1_pd = getattr(gb_pd, agg) 2300 2301 # Choose single column 2302 result_2 = getattr(gb.e, agg) 2303 result_2_pd = getattr(gb_pd.e, agg) 2304 2305 # Use `agg()` api 2306 result_3 = gb.agg({"e": agg}) 2307 result_3_pd = 
gb_pd.agg({"e": agg}) 2308 2309 if agg == "mean": 2310 assert_eq(result_1(), result_1_pd().astype("float")) 2311 assert_eq(result_2(), result_2_pd().astype("float")) 2312 assert_eq(result_3, result_3_pd.astype("float")) 2313 else: 2314 assert_eq(result_1(), result_1_pd()) 2315 assert_eq(result_2(), result_2_pd()) 2316 assert_eq(result_3, result_3_pd) 2317 2318 2319@pytest.mark.parametrize("agg", [M.sum, M.prod, M.max, M.min]) 2320@pytest.mark.parametrize("sort", [True, False]) 2321def test_groupby_sort_argument_agg(agg, sort): 2322 df = pd.DataFrame({"x": [4, 2, 1, 2, 3, 1], "y": [1, 2, 3, 4, 5, 6]}) 2323 ddf = dd.from_pandas(df, npartitions=3) 2324 2325 result = agg(ddf.groupby("x", sort=sort)) 2326 result_pd = agg(df.groupby("x", sort=sort)) 2327 2328 assert_eq(result, result_pd) 2329 if sort: 2330 # Check order of index if sort==True 2331 # (no guarantee that order will match otherwise) 2332 assert_eq(result.index, result_pd.index) 2333 2334 2335def test_groupby_sort_true_split_out(): 2336 df = pd.DataFrame({"x": [4, 2, 1, 2, 3, 1], "y": [1, 2, 3, 4, 5, 6]}) 2337 ddf = dd.from_pandas(df, npartitions=3) 2338 2339 # Works fine for split_out==1 or sort=False/None 2340 M.sum(ddf.groupby("x", sort=True), split_out=1) 2341 M.sum(ddf.groupby("x", sort=False), split_out=2) 2342 M.sum(ddf.groupby("x"), split_out=2) 2343 2344 with pytest.raises(NotImplementedError): 2345 # Cannot use sort=True with split_out>1 (for now) 2346 M.sum(ddf.groupby("x", sort=True), split_out=2) 2347 2348 2349@pytest.mark.skipif( 2350 not PANDAS_GT_110, reason="observed only supported for newer pandas" 2351) 2352@pytest.mark.parametrize("known_cats", [True, False]) 2353@pytest.mark.parametrize("ordered_cats", [True, False]) 2354@pytest.mark.parametrize("groupby", ["cat_1", ["cat_1", "cat_2"]]) 2355@pytest.mark.parametrize("observed", [True, False]) 2356def test_groupby_aggregate_categorical_observed( 2357 known_cats, ordered_cats, agg_func, groupby, observed 2358): 2359 if agg_func in ["cov", 
"corr", "nunique"]: 2360 pytest.skip("Not implemented for DataFrameGroupBy yet.") 2361 if agg_func in ["sum", "count", "prod"] and groupby != "cat_1": 2362 pytest.skip("Gives zeros rather than nans.") 2363 if agg_func in ["std", "var"] and observed: 2364 pytest.skip("Can't calculate observed with all nans") 2365 2366 pdf = pd.DataFrame( 2367 { 2368 "cat_1": pd.Categorical( 2369 list("AB"), categories=list("ABCDE"), ordered=ordered_cats 2370 ), 2371 "cat_2": pd.Categorical([1, 2], categories=[1, 2, 3], ordered=ordered_cats), 2372 "value_1": np.random.uniform(size=2), 2373 } 2374 ) 2375 ddf = dd.from_pandas(pdf, 2) 2376 2377 if not known_cats: 2378 ddf["cat_1"] = ddf["cat_1"].cat.as_unknown() 2379 ddf["cat_2"] = ddf["cat_2"].cat.as_unknown() 2380 2381 def agg(grp, **kwargs): 2382 return getattr(grp, agg_func)(**kwargs) 2383 2384 # only include numeric columns when passing to "min" or "max" 2385 # pandas default is numeric_only=False 2386 if ordered_cats is False and agg_func in ["min", "max"] and groupby == "cat_1": 2387 pdf = pdf[["cat_1", "value_1"]] 2388 ddf = ddf[["cat_1", "value_1"]] 2389 2390 assert_eq( 2391 agg(pdf.groupby(groupby, observed=observed)), 2392 agg(ddf.groupby(groupby, observed=observed)), 2393 ) 2394 2395 2396def test_empty_partitions_with_value_counts(): 2397 # https://github.com/dask/dask/issues/7065 2398 df = pd.DataFrame( 2399 data=[ 2400 ["a1", "b1"], 2401 ["a1", None], 2402 ["a1", "b1"], 2403 [None, None], 2404 [None, None], 2405 [None, None], 2406 ["a3", "b3"], 2407 ["a3", "b3"], 2408 ["a5", "b5"], 2409 ], 2410 columns=["A", "B"], 2411 ) 2412 2413 expected = df.groupby("A")["B"].value_counts() 2414 ddf = dd.from_pandas(df, npartitions=3) 2415 actual = ddf.groupby("A")["B"].value_counts() 2416 assert_eq(expected, actual) 2417 2418 2419def test_groupby_with_pd_grouper(): 2420 ddf = dd.from_pandas( 2421 pd.DataFrame( 2422 {"key1": ["a", "b", "a"], "key2": ["c", "c", "c"], "value": [1, 2, 3]} 2423 ), 2424 npartitions=3, 2425 ) 2426 with 
pytest.raises(NotImplementedError): 2427 ddf.groupby(pd.Grouper(key="key1")) 2428 with pytest.raises(NotImplementedError): 2429 ddf.groupby(["key1", pd.Grouper(key="key2")]) 2430 2431 2432@pytest.mark.parametrize("operation", ["head", "tail"]) 2433def test_groupby_empty_partitions_with_rows_operation(operation): 2434 2435 df = pd.DataFrame( 2436 data=[ 2437 ["a1", "b1"], 2438 ["a1", None], 2439 ["a1", "b1"], 2440 [None, None], 2441 [None, None], 2442 [None, None], 2443 ["a3", "b3"], 2444 ["a3", "b3"], 2445 ["a5", "b5"], 2446 ], 2447 columns=["A", "B"], 2448 ) 2449 2450 caller = operator.methodcaller(operation, 1) 2451 expected = caller(df.groupby("A")["B"]) 2452 ddf = dd.from_pandas(df, npartitions=3) 2453 actual = caller(ddf.groupby("A")["B"]) 2454 assert_eq(expected, actual) 2455 2456 2457@pytest.mark.parametrize("operation", ["head", "tail"]) 2458def test_groupby_with_row_operations(operation): 2459 df = pd.DataFrame( 2460 data=[ 2461 ["a0", "b1"], 2462 ["a0", "b2"], 2463 ["a1", "b1"], 2464 ["a3", "b3"], 2465 ["a3", "b3"], 2466 ["a5", "b5"], 2467 ["a1", "b1"], 2468 ["a1", "b1"], 2469 ["a1", "b1"], 2470 ], 2471 columns=["A", "B"], 2472 ) 2473 2474 caller = operator.methodcaller(operation) 2475 expected = caller(df.groupby("A")["B"]) 2476 ddf = dd.from_pandas(df, npartitions=3) 2477 actual = caller(ddf.groupby("A")["B"]) 2478 assert_eq(expected, actual) 2479 2480 2481@pytest.mark.parametrize("operation", ["head", "tail"]) 2482def test_groupby_multi_index_with_row_operations(operation): 2483 df = pd.DataFrame( 2484 data=[ 2485 ["a0", "b1"], 2486 ["a0", "b2"], 2487 ["a1", "b1"], 2488 ["a3", "b3"], 2489 ["a3", "b3"], 2490 ["a5", "b5"], 2491 ["a1", "b1"], 2492 ["a1", "b1"], 2493 ["a1", "b1"], 2494 ], 2495 columns=["A", "B"], 2496 ) 2497 2498 caller = operator.methodcaller(operation) 2499 expected = caller(df.groupby(["A", df["A"].eq("a1")])["B"]) 2500 ddf = dd.from_pandas(df, npartitions=3) 2501 actual = caller(ddf.groupby(["A", ddf["A"].eq("a1")])["B"]) 2502 
assert_eq(expected, actual) 2503