import collections
import operator
import warnings

import numpy as np
import pandas as pd
import pytest

import dask
import dask.dataframe as dd
from dask.dataframe import _compat
from dask.dataframe._compat import PANDAS_GT_110, tm
from dask.dataframe.utils import assert_dask_graph, assert_eq, assert_max_deps
from dask.utils import M

AGG_FUNCS = [
    "sum",
    "mean",
    "min",
    "max",
    "count",
    "size",
    "std",
    "var",
    "cov",
    "corr",
    "nunique",
    "first",
    "last",
    "prod",
]


@pytest.fixture(params=AGG_FUNCS)
def agg_func(request):
    """
    Aggregations supported for groups
    """
    return request.param


# Wrapper fixture for shuffle_method to auto-apply it to all the tests in this module,
# as we don't want to auto-apply the fixture repo-wide.
@pytest.fixture(autouse=True)
def auto_shuffle_method(shuffle_method):
    yield


@pytest.mark.xfail(reason="uncertain how to handle. See issue #3481.")
def test_groupby_internal_repr_xfail():
    pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
    ddf = dd.from_pandas(pdf, 3)

    gp = pdf.groupby("y")["x"]
    dp = ddf.groupby("y")["x"]
    assert isinstance(dp.obj, dd.Series)
    assert_eq(dp.obj, gp.obj)

    gp = pdf.groupby(pdf.y)["x"]
    dp = ddf.groupby(ddf.y)["x"]
    assert isinstance(dp.obj, dd.Series)


def test_groupby_internal_repr():
    pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
    ddf = dd.from_pandas(pdf, 3)

    gp = pdf.groupby("y")
    dp = ddf.groupby("y")
    assert isinstance(dp, dd.groupby.DataFrameGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    assert isinstance(dp.obj, dd.DataFrame)
    assert_eq(dp.obj, gp.obj)

    gp = pdf.groupby("y")["x"]
    dp = ddf.groupby("y")["x"]
    assert isinstance(dp, dd.groupby.SeriesGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.SeriesGroupBy)

    gp = pdf.groupby("y")[["x"]]
    dp = ddf.groupby("y")[["x"]]
    assert isinstance(dp, dd.groupby.DataFrameGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    # slicing should not affect the internal object
    assert isinstance(dp.obj, dd.DataFrame)
    assert_eq(dp.obj, gp.obj)

    gp = pdf.groupby(pdf.y)["x"]
    dp = ddf.groupby(ddf.y)["x"]
    assert isinstance(dp, dd.groupby.SeriesGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.SeriesGroupBy)

    gp = pdf.groupby(pdf.y)[["x"]]
    dp = ddf.groupby(ddf.y)[["x"]]
    assert isinstance(dp, dd.groupby.DataFrameGroupBy)
    assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    # slicing should not affect the internal object
    assert isinstance(dp.obj, dd.DataFrame)
    assert_eq(dp.obj, gp.obj)


def test_groupby_error():
    pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
    ddf = dd.from_pandas(pdf, 3)

    with pytest.raises(KeyError):
        ddf.groupby("A")

    with pytest.raises(KeyError):
        ddf.groupby(["x", "A"])

    dp = ddf.groupby("y")

    msg = "Column not found: "
    with pytest.raises(KeyError) as err:
        dp["A"]
    assert msg in str(err.value)

    msg = "Columns not found: "
    with pytest.raises(KeyError) as err:
        dp[["x", "A"]]
    assert msg in str(err.value)


def test_full_groupby():
    df = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf = dd.from_pandas(df, npartitions=3)

    pytest.raises(KeyError, lambda: ddf.groupby("does_not_exist"))
    pytest.raises(AttributeError, lambda: ddf.groupby("a").does_not_exist)
    assert "b" in dir(ddf.groupby("a"))

    def func(df):
        return df.assign(b=df.b - df.b.mean())

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        assert ddf.groupby("a").apply(func)._name.startswith("func")

        assert_eq(df.groupby("a").apply(func), ddf.groupby("a").apply(func))


def test_full_groupby_apply_multiarg():
    df = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf = dd.from_pandas(df, npartitions=3)

    def func(df, c, d=3):
        return df.assign(b=df.b - df.b.mean() + c * d)

    c = df.a.sum()
    d = df.b.mean()

    c_scalar = ddf.a.sum()
    d_scalar = ddf.b.mean()
    c_delayed = dask.delayed(lambda: c)()
    d_delayed = dask.delayed(lambda: d)()

    meta = df.groupby("a").apply(func, c)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        assert_eq(
            df.groupby("a").apply(func, c, d=d),
            ddf.groupby("a").apply(func, c, d=d_scalar),
        )

        assert_eq(df.groupby("a").apply(func, c), ddf.groupby("a").apply(func, c))

        assert_eq(
            df.groupby("a").apply(func, c, d=d), ddf.groupby("a").apply(func, c, d=d)
        )

        assert_eq(
            df.groupby("a").apply(func, c),
            ddf.groupby("a").apply(func, c_scalar),
            check_dtype=False,
        )

        assert_eq(
            df.groupby("a").apply(func, c),
            ddf.groupby("a").apply(func, c_scalar, meta=meta),
        )

        assert_eq(
            df.groupby("a").apply(func, c, d=d),
            ddf.groupby("a").apply(func, c, d=d_scalar, meta=meta),
        )

    # Delayed arguments work, but only if metadata is provided
    with pytest.raises(ValueError) as exc:
        ddf.groupby("a").apply(func, c, d=d_delayed)
    assert "dask.delayed" in str(exc.value) and "meta" in str(exc.value)

    with pytest.raises(ValueError) as exc:
        ddf.groupby("a").apply(func, c_delayed, d=d)
    assert "dask.delayed" in str(exc.value) and "meta" in str(exc.value)

    assert_eq(
        df.groupby("a").apply(func, c),
        ddf.groupby("a").apply(func, c_delayed, meta=meta),
    )

    assert_eq(
        df.groupby("a").apply(func, c, d=d),
        ddf.groupby("a").apply(func, c, d=d_delayed, meta=meta),
    )

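
# A minimal sketch (hypothetical helper, not part of the test suite) of the
# pattern exercised above: dask cannot infer metadata from a ``dask.delayed``
# argument, so groupby().apply() needs an explicit ``meta``.
def _delayed_apply_sketch():
    pdf = pd.DataFrame({"a": [1, 1, 2], "b": [1.0, 2.0, 3.0]})
    ddf = dd.from_pandas(pdf, npartitions=2)
    offset = dask.delayed(lambda: 1.0)()
    # compute the metadata eagerly on the pandas frame, then reuse it for dask
    meta = pdf.groupby("a").apply(lambda df, c: df.assign(b=df.b + c), 1.0)
    return ddf.groupby("a").apply(
        lambda df, c: df.assign(b=df.b + c), offset, meta=meta
    )
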


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: ["a"],
        lambda df: ["a", "b"],
        lambda df: df["a"],
        lambda df: [df["a"], df["b"]],
        pytest.param(
            lambda df: [df["a"] > 2, df["b"] > 1],
            marks=pytest.mark.xfail(reason="not yet supported"),
        ),
    ],
)
@pytest.mark.parametrize("reverse", [True, False])
def test_full_groupby_multilevel(grouper, reverse):
    index = [0, 1, 3, 5, 6, 8, 9, 9, 9]
    if reverse:
        index = index[::-1]
    df = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7, 8, 9],
            "d": [1, 2, 3, 4, 5, 6, 7, 8, 9],
            "b": [4, 5, 6, 3, 2, 1, 0, 0, 0],
        },
        index=index,
    )
    ddf = dd.from_pandas(df, npartitions=3)

    def func(df):
        return df.assign(b=df.b - df.b.mean())

    # The last grouper causes a DeprecationWarning from pandas.
    # See https://github.com/pandas-dev/pandas/issues/16481
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        assert_eq(
            df.groupby(grouper(df)).apply(func), ddf.groupby(grouper(ddf)).apply(func)
        )


def test_groupby_dir():
    df = pd.DataFrame({"a": range(10), "b c d e": range(10)})
    ddf = dd.from_pandas(df, npartitions=2)
    g = ddf.groupby("a")
    assert "a" in dir(g)
    assert "b c d e" not in dir(g)


@pytest.mark.parametrize("scheduler", ["sync", "threads"])
def test_groupby_on_index(scheduler):
    pdf = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf = dd.from_pandas(pdf, npartitions=3)

    ddf2 = ddf.set_index("a")
    pdf2 = pdf.set_index("a")
    assert_eq(ddf.groupby("a").b.mean(), ddf2.groupby(ddf2.index).b.mean())

    def func(df):
        return df.assign(b=df.b - df.b.mean())

    def func2(df):
        return df[["b"]] - df[["b"]].mean()

    def func3(df):
        return df.mean()

    with dask.config.set(scheduler=scheduler):
        with pytest.warns(None):
            assert_eq(ddf.groupby("a").apply(func), pdf.groupby("a").apply(func))

            assert_eq(
                ddf.groupby("a").apply(func).set_index("a"),
                pdf.groupby("a").apply(func).set_index("a"),
            )

            assert_eq(
                pdf2.groupby(pdf2.index).apply(func2),
                ddf2.groupby(ddf2.index).apply(func2),
            )

            assert_eq(
                ddf2.b.groupby("a").apply(func3), pdf2.b.groupby("a").apply(func3)
            )

            assert_eq(
                ddf2.b.groupby(ddf2.index).apply(func3),
                pdf2.b.groupby(pdf2.index).apply(func3),
            )


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: df.groupby("a")["b"],
        lambda df: df.groupby(["a", "b"]),
        lambda df: df.groupby(["a", "b"])["c"],
        lambda df: df.groupby(df["a"])[["b", "c"]],
        lambda df: df.groupby("a")[["b", "c"]],
        lambda df: df.groupby("a")[["b"]],
        lambda df: df.groupby(["a", "b", "c"]),
    ],
)
def test_groupby_multilevel_getitem(grouper, agg_func):
    # nunique is not implemented for DataFrameGroupBy
    if agg_func == "nunique":
        return

    df = pd.DataFrame(
        {
            "a": [1, 2, 3, 1, 2, 3],
            "b": [1, 2, 1, 4, 2, 1],
            "c": [1, 3, 2, 1, 1, 2],
            "d": [1, 2, 1, 1, 2, 2],
        }
    )
    ddf = dd.from_pandas(df, 2)

    dask_group = grouper(ddf)
    pandas_group = grouper(df)

    # covariance/correlation only works with N+1 columns
    if isinstance(pandas_group, pd.core.groupby.SeriesGroupBy) and agg_func in (
        "cov",
        "corr",
    ):
        return

    dask_agg = getattr(dask_group, agg_func)
    pandas_agg = getattr(pandas_group, agg_func)

    assert isinstance(dask_group, dd.groupby._GroupBy)
    assert isinstance(pandas_group, pd.core.groupby.GroupBy)

    if agg_func == "mean":
        assert_eq(dask_agg(), pandas_agg().astype(float))
    else:
        a = dask_agg()
        with warnings.catch_warnings():
            # pandas does `.cov([[1], [1]])` which numpy warns on (all NaN).
            # Pandas does strange things with exceptions in groupby.
            warnings.simplefilter("ignore", RuntimeWarning)
            b = pandas_agg()
        assert_eq(a, b)


def test_groupby_multilevel_agg():
    df = pd.DataFrame(
        {
            "a": [1, 2, 3, 1, 2, 3],
            "b": [1, 2, 1, 4, 2, 1],
            "c": [1, 3, 2, 1, 1, 2],
            "d": [1, 2, 1, 1, 2, 2],
        }
    )
    ddf = dd.from_pandas(df, 2)

    sol = df.groupby(["a"]).mean()
    res = ddf.groupby(["a"]).mean()
    assert_eq(res, sol)

    sol = df.groupby(["a", "c"]).mean()
    res = ddf.groupby(["a", "c"]).mean()
    assert_eq(res, sol)

    sol = df.groupby([df["a"], df["c"]]).mean()
    res = ddf.groupby([ddf["a"], ddf["c"]]).mean()
    assert_eq(res, sol)


def test_groupby_get_group():
    dsk = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 6], "b": [4, 2, 7]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 2, 6], "b": [3, 3, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [4, 3, 7], "b": [1, 1, 3]}, index=[9, 9, 9]),
    }
    meta = dsk[("x", 0)]
    d = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9])
    full = d.compute()

    for ddkey, pdkey in [("b", "b"), (d.b, full.b), (d.b + 1, full.b + 1)]:
        ddgrouped = d.groupby(ddkey)
        pdgrouped = full.groupby(pdkey)
        # DataFrame
        assert_eq(ddgrouped.get_group(2), pdgrouped.get_group(2))
        assert_eq(ddgrouped.get_group(3), pdgrouped.get_group(3))
        # Series
        assert_eq(ddgrouped.a.get_group(3), pdgrouped.a.get_group(3))
        assert_eq(ddgrouped.a.get_group(2), pdgrouped.a.get_group(2))


def test_dataframe_groupby_nunique():
    strings = list("aaabbccccdddeee")
    data = np.random.randn(len(strings))
    ps = pd.DataFrame(dict(strings=strings, data=data))
    s = dd.from_pandas(ps, npartitions=3)
    expected = ps.groupby("strings")["data"].nunique()
    assert_eq(s.groupby("strings")["data"].nunique(), expected)


def test_dataframe_groupby_nunique_across_group_same_value():
    strings = list("aaabbccccdddeee")
    data = list(map(int, "123111223323412"))
    ps = pd.DataFrame(dict(strings=strings, data=data))
    s = dd.from_pandas(ps, npartitions=3)
    expected = ps.groupby("strings")["data"].nunique()
    assert_eq(s.groupby("strings")["data"].nunique(), expected)


def test_series_groupby_propagates_names():
    df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
    ddf = dd.from_pandas(df, 2)
    func = lambda df: df["y"].sum()
    with pytest.warns(UserWarning):  # meta inference
        result = ddf.groupby("x").apply(func)
    expected = df.groupby("x").apply(func)
    assert_eq(result, expected)


def test_series_groupby():
    s = pd.Series([1, 2, 2, 1, 1])
    pd_group = s.groupby(s)

    ss = dd.from_pandas(s, npartitions=2)
    dask_group = ss.groupby(ss)

    pd_group2 = s.groupby(s + 1)
    dask_group2 = ss.groupby(ss + 1)

    for dg, pdg in [(dask_group, pd_group), (dask_group2, pd_group2)]:
        assert_eq(dg.count(), pdg.count())
        assert_eq(dg.sum(), pdg.sum())
        assert_eq(dg.min(), pdg.min())
        assert_eq(dg.max(), pdg.max())
        assert_eq(dg.size(), pdg.size())
        assert_eq(dg.first(), pdg.first())
        assert_eq(dg.last(), pdg.last())
        assert_eq(dg.prod(), pdg.prod())


def test_series_groupby_errors():
    s = pd.Series([1, 2, 2, 1, 1])

    ss = dd.from_pandas(s, npartitions=2)

    msg = "No group keys passed!"
    with pytest.raises(ValueError) as err:
        s.groupby([])  # pandas
    assert msg in str(err.value)
    with pytest.raises(ValueError) as err:
        ss.groupby([])  # dask should raise the same error
    assert msg in str(err.value)

    sss = dd.from_pandas(s, npartitions=5)
    with pytest.raises(NotImplementedError):
        ss.groupby(sss)

    with pytest.raises(KeyError):
        s.groupby("x")  # pandas
    with pytest.raises(KeyError):
        ss.groupby("x")  # dask should raise the same error


def test_groupby_index_array():
    df = _compat.makeTimeDataFrame()
    ddf = dd.from_pandas(df, npartitions=2)

    # first select column, then group
    assert_eq(
        df.A.groupby(df.index.month).nunique(),
        ddf.A.groupby(ddf.index.month).nunique(),
        check_names=False,
    )

    # first group, then select column
    assert_eq(
        df.groupby(df.index.month).A.nunique(),
        ddf.groupby(ddf.index.month).A.nunique(),
        check_names=False,
    )


def test_groupby_set_index():
    df = _compat.makeTimeDataFrame()
    ddf = dd.from_pandas(df, npartitions=2)
    pytest.raises(TypeError, lambda: ddf.groupby(df.index.month, as_index=False))


@pytest.mark.parametrize("empty", [True, False])
def test_split_apply_combine_on_series(empty):
    if empty:
        pdf = pd.DataFrame({"a": [1.0], "b": [1.0]}, index=[0]).iloc[:0]
        # There's a bug in pandas where df.groupby(...).var(ddof=0) results in
        # no columns. Just skip these checks for now.
        ddofs = []
    else:
        ddofs = [0, 1, 2]
        pdf = pd.DataFrame(
            {"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]},
            index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
        )
    ddf = dd.from_pandas(pdf, npartitions=3)

    for ddkey, pdkey in [("b", "b"), (ddf.b, pdf.b), (ddf.b + 1, pdf.b + 1)]:
        assert_eq(ddf.groupby(ddkey).a.min(), pdf.groupby(pdkey).a.min())
        assert_eq(ddf.groupby(ddkey).a.max(), pdf.groupby(pdkey).a.max())
        assert_eq(ddf.groupby(ddkey).a.count(), pdf.groupby(pdkey).a.count())
        assert_eq(ddf.groupby(ddkey).a.mean(), pdf.groupby(pdkey).a.mean())
        assert_eq(ddf.groupby(ddkey).a.nunique(), pdf.groupby(pdkey).a.nunique())
        assert_eq(ddf.groupby(ddkey).a.size(), pdf.groupby(pdkey).a.size())
        assert_eq(ddf.groupby(ddkey).a.first(), pdf.groupby(pdkey).a.first())
        assert_eq(ddf.groupby(ddkey).a.last(), pdf.groupby(pdkey).a.last())
        assert_eq(ddf.groupby(ddkey).a.tail(), pdf.groupby(pdkey).a.tail())
        assert_eq(ddf.groupby(ddkey).a.head(), pdf.groupby(pdkey).a.head())
        for ddof in ddofs:
            assert_eq(ddf.groupby(ddkey).a.var(ddof), pdf.groupby(pdkey).a.var(ddof))
            assert_eq(ddf.groupby(ddkey).a.std(ddof), pdf.groupby(pdkey).a.std(ddof))

        assert_eq(ddf.groupby(ddkey).sum(), pdf.groupby(pdkey).sum())
        assert_eq(ddf.groupby(ddkey).min(), pdf.groupby(pdkey).min())
        assert_eq(ddf.groupby(ddkey).max(), pdf.groupby(pdkey).max())
        assert_eq(ddf.groupby(ddkey).count(), pdf.groupby(pdkey).count())
        assert_eq(ddf.groupby(ddkey).mean(), pdf.groupby(pdkey).mean())
        assert_eq(ddf.groupby(ddkey).size(), pdf.groupby(pdkey).size())
        assert_eq(ddf.groupby(ddkey).first(), pdf.groupby(pdkey).first())
        assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last())
        assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod())

        for ddof in ddofs:
            assert_eq(
                ddf.groupby(ddkey).var(ddof),
                pdf.groupby(pdkey).var(ddof),
                check_dtype=False,
            )
            assert_eq(
                ddf.groupby(ddkey).std(ddof),
                pdf.groupby(pdkey).std(ddof),
                check_dtype=False,
            )

    for ddkey, pdkey in [(ddf.b, pdf.b), (ddf.b + 1, pdf.b + 1)]:
        assert_eq(
            ddf.a.groupby(ddkey).sum(), pdf.a.groupby(pdkey).sum(), check_names=False
        )
        assert_eq(
            ddf.a.groupby(ddkey).max(), pdf.a.groupby(pdkey).max(), check_names=False
        )
        assert_eq(
            ddf.a.groupby(ddkey).count(),
            pdf.a.groupby(pdkey).count(),
            check_names=False,
        )
        assert_eq(
            ddf.a.groupby(ddkey).mean(), pdf.a.groupby(pdkey).mean(), check_names=False
        )
        assert_eq(
            ddf.a.groupby(ddkey).nunique(),
            pdf.a.groupby(pdkey).nunique(),
            check_names=False,
        )
        assert_eq(
            ddf.a.groupby(ddkey).first(),
            pdf.a.groupby(pdkey).first(),
            check_names=False,
        )
        assert_eq(
            ddf.a.groupby(ddkey).last(), pdf.a.groupby(pdkey).last(), check_names=False
        )
        assert_eq(
            ddf.a.groupby(ddkey).prod(), pdf.a.groupby(pdkey).prod(), check_names=False
        )

        for ddof in ddofs:
            assert_eq(ddf.a.groupby(ddkey).var(ddof), pdf.a.groupby(pdkey).var(ddof))
            assert_eq(ddf.a.groupby(ddkey).std(ddof), pdf.a.groupby(pdkey).std(ddof))

    for i in [0, 4, 7]:
        assert_eq(ddf.groupby(ddf.b > i).a.sum(), pdf.groupby(pdf.b > i).a.sum())
        assert_eq(ddf.groupby(ddf.b > i).a.min(), pdf.groupby(pdf.b > i).a.min())
        assert_eq(ddf.groupby(ddf.b > i).a.max(), pdf.groupby(pdf.b > i).a.max())
        assert_eq(ddf.groupby(ddf.b > i).a.count(), pdf.groupby(pdf.b > i).a.count())
        assert_eq(ddf.groupby(ddf.b > i).a.mean(), pdf.groupby(pdf.b > i).a.mean())
        assert_eq(
            ddf.groupby(ddf.b > i).a.nunique(), pdf.groupby(pdf.b > i).a.nunique()
        )
        assert_eq(ddf.groupby(ddf.b > i).a.size(), pdf.groupby(pdf.b > i).a.size())
        assert_eq(ddf.groupby(ddf.b > i).a.first(), pdf.groupby(pdf.b > i).a.first())
        assert_eq(ddf.groupby(ddf.b > i).a.last(), pdf.groupby(pdf.b > i).a.last())
        assert_eq(ddf.groupby(ddf.b > i).a.tail(), pdf.groupby(pdf.b > i).a.tail())
        assert_eq(ddf.groupby(ddf.b > i).a.head(), pdf.groupby(pdf.b > i).a.head())
        assert_eq(ddf.groupby(ddf.b > i).a.prod(), pdf.groupby(pdf.b > i).a.prod())

        assert_eq(ddf.groupby(ddf.a > i).b.sum(), pdf.groupby(pdf.a > i).b.sum())
        assert_eq(ddf.groupby(ddf.a > i).b.min(), pdf.groupby(pdf.a > i).b.min())
        assert_eq(ddf.groupby(ddf.a > i).b.max(), pdf.groupby(pdf.a > i).b.max())
        assert_eq(ddf.groupby(ddf.a > i).b.count(), pdf.groupby(pdf.a > i).b.count())
        assert_eq(ddf.groupby(ddf.a > i).b.mean(), pdf.groupby(pdf.a > i).b.mean())
        assert_eq(
            ddf.groupby(ddf.a > i).b.nunique(), pdf.groupby(pdf.a > i).b.nunique()
        )
        assert_eq(ddf.groupby(ddf.b > i).b.size(), pdf.groupby(pdf.b > i).b.size())
        assert_eq(ddf.groupby(ddf.b > i).b.first(), pdf.groupby(pdf.b > i).b.first())
        assert_eq(ddf.groupby(ddf.b > i).b.last(), pdf.groupby(pdf.b > i).b.last())
        assert_eq(ddf.groupby(ddf.b > i).b.tail(), pdf.groupby(pdf.b > i).b.tail())
        assert_eq(ddf.groupby(ddf.b > i).b.head(), pdf.groupby(pdf.b > i).b.head())
        assert_eq(ddf.groupby(ddf.b > i).b.prod(), pdf.groupby(pdf.b > i).b.prod())

        assert_eq(ddf.groupby(ddf.b > i).sum(), pdf.groupby(pdf.b > i).sum())
        assert_eq(ddf.groupby(ddf.b > i).min(), pdf.groupby(pdf.b > i).min())
        assert_eq(ddf.groupby(ddf.b > i).max(), pdf.groupby(pdf.b > i).max())
        assert_eq(ddf.groupby(ddf.b > i).count(), pdf.groupby(pdf.b > i).count())
        assert_eq(ddf.groupby(ddf.b > i).mean(), pdf.groupby(pdf.b > i).mean())
        assert_eq(ddf.groupby(ddf.b > i).size(), pdf.groupby(pdf.b > i).size())
        assert_eq(ddf.groupby(ddf.b > i).first(), pdf.groupby(pdf.b > i).first())
        assert_eq(ddf.groupby(ddf.b > i).last(), pdf.groupby(pdf.b > i).last())
        assert_eq(ddf.groupby(ddf.b > i).prod(), pdf.groupby(pdf.b > i).prod())

        assert_eq(ddf.groupby(ddf.a > i).sum(), pdf.groupby(pdf.a > i).sum())
        assert_eq(ddf.groupby(ddf.a > i).min(), pdf.groupby(pdf.a > i).min())
        assert_eq(ddf.groupby(ddf.a > i).max(), pdf.groupby(pdf.a > i).max())
        assert_eq(ddf.groupby(ddf.a > i).count(), pdf.groupby(pdf.a > i).count())
        assert_eq(ddf.groupby(ddf.a > i).mean(), pdf.groupby(pdf.a > i).mean())
        assert_eq(ddf.groupby(ddf.a > i).size(), pdf.groupby(pdf.a > i).size())
        assert_eq(ddf.groupby(ddf.a > i).first(), pdf.groupby(pdf.a > i).first())
        assert_eq(ddf.groupby(ddf.a > i).last(), pdf.groupby(pdf.a > i).last())
        assert_eq(ddf.groupby(ddf.a > i).prod(), pdf.groupby(pdf.a > i).prod())

        for ddof in ddofs:
            assert_eq(
                ddf.groupby(ddf.b > i).std(ddof), pdf.groupby(pdf.b > i).std(ddof)
            )

    for ddkey, pdkey in [
        ("a", "a"),
        (ddf.a, pdf.a),
        (ddf.a + 1, pdf.a + 1),
        (ddf.a > 3, pdf.a > 3),
    ]:
        assert_eq(ddf.groupby(ddkey).b.sum(), pdf.groupby(pdkey).b.sum())
        assert_eq(ddf.groupby(ddkey).b.min(), pdf.groupby(pdkey).b.min())
        assert_eq(ddf.groupby(ddkey).b.max(), pdf.groupby(pdkey).b.max())
        assert_eq(ddf.groupby(ddkey).b.count(), pdf.groupby(pdkey).b.count())
        assert_eq(ddf.groupby(ddkey).b.mean(), pdf.groupby(pdkey).b.mean())
        assert_eq(ddf.groupby(ddkey).b.nunique(), pdf.groupby(pdkey).b.nunique())
        assert_eq(ddf.groupby(ddkey).b.size(), pdf.groupby(pdkey).b.size())
        assert_eq(ddf.groupby(ddkey).b.first(), pdf.groupby(pdkey).b.first())
        assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last())
        assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod())

        assert_eq(ddf.groupby(ddkey).sum(), pdf.groupby(pdkey).sum())
        assert_eq(ddf.groupby(ddkey).min(), pdf.groupby(pdkey).min())
        assert_eq(ddf.groupby(ddkey).max(), pdf.groupby(pdkey).max())
        assert_eq(ddf.groupby(ddkey).count(), pdf.groupby(pdkey).count())
        assert_eq(ddf.groupby(ddkey).mean(), pdf.groupby(pdkey).mean().astype(float))
        assert_eq(ddf.groupby(ddkey).size(), pdf.groupby(pdkey).size())
        assert_eq(ddf.groupby(ddkey).first(), pdf.groupby(pdkey).first())
        assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last())
        assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod())

        for ddof in ddofs:
            assert_eq(ddf.groupby(ddkey).b.std(ddof), pdf.groupby(pdkey).b.std(ddof))

    assert sorted(ddf.groupby("b").a.sum().dask) == sorted(
        ddf.groupby("b").a.sum().dask
    )
    assert sorted(ddf.groupby(ddf.a > 3).b.mean().dask) == sorted(
        ddf.groupby(ddf.a > 3).b.mean().dask
    )

    # test raises with incorrect key
    pytest.raises(KeyError, lambda: ddf.groupby("x"))
    pytest.raises(KeyError, lambda: ddf.groupby(["a", "x"]))
    pytest.raises(KeyError, lambda: ddf.groupby("a")["x"])
    with warnings.catch_warnings():
        # pandas warns about using tuples before throwing the KeyError
        warnings.simplefilter("ignore", FutureWarning)
        pytest.raises(KeyError, lambda: ddf.groupby("a")["b", "x"])
    pytest.raises(KeyError, lambda: ddf.groupby("a")[["b", "x"]])

    # test graph node labels
    assert_dask_graph(ddf.groupby("b").a.sum(), "series-groupby-sum")
    assert_dask_graph(ddf.groupby("b").a.min(), "series-groupby-min")
    assert_dask_graph(ddf.groupby("b").a.max(), "series-groupby-max")
    assert_dask_graph(ddf.groupby("b").a.count(), "series-groupby-count")
    assert_dask_graph(ddf.groupby("b").a.var(), "series-groupby-var")
    assert_dask_graph(ddf.groupby("b").a.cov(), "series-groupby-cov")
    assert_dask_graph(ddf.groupby("b").a.first(), "series-groupby-first")
    assert_dask_graph(ddf.groupby("b").a.last(), "series-groupby-last")
    assert_dask_graph(ddf.groupby("b").a.tail(), "series-groupby-tail")
    assert_dask_graph(ddf.groupby("b").a.head(), "series-groupby-head")
    assert_dask_graph(ddf.groupby("b").a.prod(), "series-groupby-prod")
    # mean is computed from sum and count operations
    assert_dask_graph(ddf.groupby("b").a.mean(), "series-groupby-sum")
    assert_dask_graph(ddf.groupby("b").a.mean(), "series-groupby-count")
    assert_dask_graph(ddf.groupby("b").a.nunique(), "series-groupby-nunique")
    assert_dask_graph(ddf.groupby("b").a.size(), "series-groupby-size")

    assert_dask_graph(ddf.groupby("b").sum(), "dataframe-groupby-sum")
    assert_dask_graph(ddf.groupby("b").min(), "dataframe-groupby-min")
    assert_dask_graph(ddf.groupby("b").max(), "dataframe-groupby-max")
    assert_dask_graph(ddf.groupby("b").count(), "dataframe-groupby-count")
    assert_dask_graph(ddf.groupby("b").first(), "dataframe-groupby-first")
    assert_dask_graph(ddf.groupby("b").last(), "dataframe-groupby-last")
    assert_dask_graph(ddf.groupby("b").prod(), "dataframe-groupby-prod")
    # mean is computed from sum and count operations
    assert_dask_graph(ddf.groupby("b").mean(), "dataframe-groupby-sum")
    assert_dask_graph(ddf.groupby("b").mean(), "dataframe-groupby-count")
    assert_dask_graph(ddf.groupby("b").size(), "dataframe-groupby-size")


@pytest.mark.parametrize("keyword", ["split_every", "split_out"])
def test_groupby_reduction_split(keyword):
    pdf = pd.DataFrame(
        {"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 100, "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100}
    )
    ddf = dd.from_pandas(pdf, npartitions=15)

    def call(g, m, **kwargs):
        return getattr(g, m)(**kwargs)

    # DataFrame
    for m in AGG_FUNCS:
        # nunique is not implemented for DataFrameGroupBy
        # covariance/correlation is not a series aggregation
        if m in ("nunique", "cov", "corr"):
            continue
        res = call(ddf.groupby("b"), m, **{keyword: 2})
        sol = call(pdf.groupby("b"), m)
        assert_eq(res, sol)
        assert call(ddf.groupby("b"), m)._name != res._name

    res = call(ddf.groupby("b"), "var", ddof=2, **{keyword: 2})
    sol = call(pdf.groupby("b"), "var", ddof=2)
    assert_eq(res, sol)
    assert call(ddf.groupby("b"), "var", ddof=2)._name != res._name

    # Series, post select
    for m in AGG_FUNCS:
        # covariance/correlation is not a series aggregation
        if m in ("cov", "corr"):
            continue
        res = call(ddf.groupby("b").a, m, **{keyword: 2})
        sol = call(pdf.groupby("b").a, m)
        assert_eq(res, sol)
        assert call(ddf.groupby("b").a, m)._name != res._name

    res = call(ddf.groupby("b").a, "var", ddof=2, **{keyword: 2})
    sol = call(pdf.groupby("b").a, "var", ddof=2)
    assert_eq(res, sol)
    assert call(ddf.groupby("b").a, "var", ddof=2)._name != res._name

    # Series, pre select
    for m in AGG_FUNCS:
        # covariance/correlation is not a series aggregation
        if m in ("cov", "corr"):
            continue
        res = call(ddf.a.groupby(ddf.b), m, **{keyword: 2})
        sol = call(pdf.a.groupby(pdf.b), m)
        # There's a bug in pandas 0.18.0 with `pdf.a.groupby(pdf.b).count()`
        # not forwarding the series name. Skip name checks here for now.
        assert_eq(res, sol, check_names=False)
        assert call(ddf.a.groupby(ddf.b), m)._name != res._name

    res = call(ddf.a.groupby(ddf.b), "var", ddof=2, **{keyword: 2})
    sol = call(pdf.a.groupby(pdf.b), "var", ddof=2)

    assert_eq(res, sol)
    assert call(ddf.a.groupby(ddf.b), "var", ddof=2)._name != res._name


@pytest.mark.parametrize(
    "grouped",
    [
        lambda df: df.groupby("A"),
        lambda df: df.groupby(df["A"]),
        lambda df: df.groupby(df["A"] + 1),
        lambda df: df.groupby("A")["B"],
        # SeriesGroupBy:
        lambda df: df.groupby("A")["B"],
        lambda df: df.groupby(df["A"])["B"],
        lambda df: df.groupby(df["A"] + 1)["B"],
        # Series.groupby():
        lambda df: df.B.groupby(df["A"]),
        lambda df: df.B.groupby(df["A"] + 1),
        # DataFrameGroupBy with column slice:
        lambda df: df.groupby("A")[["B", "C"]],
        lambda df: df.groupby(df["A"])[["B", "C"]],
        lambda df: df.groupby(df["A"] + 1)[["B", "C"]],
    ],
)
@pytest.mark.parametrize(
    "func",
    [
        lambda grp: grp.apply(lambda x: x.sum()),
        lambda grp: grp.transform(lambda x: x.sum()),
    ],
)
def test_apply_or_transform_shuffle(grouped, func):
    pdf = pd.DataFrame(
        {
            "A": [1, 2, 3, 4] * 5,
            "B": np.random.randn(20),
            "C": np.random.randn(20),
            "D": np.random.randn(20),
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    with pytest.warns(UserWarning):  # meta inference
        assert_eq(func(grouped(pdf)), func(grouped(ddf)))


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: "AA",
        lambda df: ["AA", "AB"],
        lambda df: df["AA"],
        lambda df: [df["AA"], df["AB"]],
        lambda df: df["AA"] + 1,
        pytest.param(
            lambda df: [df["AA"] + 1, df["AB"] + 1],
            marks=pytest.mark.xfail(reason="NotImplemented"),
        ),
    ],
)
@pytest.mark.parametrize(
    "func",
    [
        lambda grouped: grouped.apply(lambda x: x.sum()),
        lambda grouped: grouped.transform(lambda x: x.sum()),
    ],
)
def test_apply_or_transform_shuffle_multilevel(grouper, func):
    pdf = pd.DataFrame(
        {
            "AB": [1, 2, 3, 4] * 5,
            "AA": [1, 2, 3, 4] * 5,
            "B": np.random.randn(20),
            "C": np.random.randn(20),
            "D": np.random.randn(20),
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    with pytest.warns(UserWarning):
        # DataFrameGroupBy
        assert_eq(func(ddf.groupby(grouper(ddf))), func(pdf.groupby(grouper(pdf))))

        # SeriesGroupBy
        assert_eq(
            func(ddf.groupby(grouper(ddf))["B"]), func(pdf.groupby(grouper(pdf))["B"])
        )

        # DataFrameGroupBy with column slice
        assert_eq(
            func(ddf.groupby(grouper(ddf))[["B", "C"]]),
            func(pdf.groupby(grouper(pdf))[["B", "C"]]),
        )


def test_numeric_column_names():
    # df.groupby(0)[df.columns] fails if all columns are numbers (pandas bug)
    # This ensures that we cast all column iterables to list beforehand.
    df = pd.DataFrame({0: [0, 1, 0, 1], 1: [1, 2, 3, 4], 2: [0, 1, 0, 1]})
    ddf = dd.from_pandas(df, npartitions=2)
    assert_eq(ddf.groupby(0).sum(), df.groupby(0).sum())
    assert_eq(ddf.groupby([0, 2]).sum(), df.groupby([0, 2]).sum())
    assert_eq(
        ddf.groupby(0).apply(lambda x: x, meta={0: int, 1: int, 2: int}),
        df.groupby(0).apply(lambda x: x),
    )


def test_groupby_apply_tasks(shuffle_method):
    if shuffle_method == "disk":
        pytest.skip("Tasks-only shuffle test")

    df = _compat.makeTimeDataFrame()
    df["A"] = df.A // 0.1
    df["B"] = df.B // 0.1
    ddf = dd.from_pandas(df, npartitions=10)

    for ind in [lambda x: "A", lambda x: x.A]:
        a = df.groupby(ind(df)).apply(len)
        with pytest.warns(UserWarning):
            b = ddf.groupby(ind(ddf)).apply(len)
        assert_eq(a, b.compute())
        assert not any("partd" in k[0] for k in b.dask)

        a = df.groupby(ind(df)).B.apply(len)
        with pytest.warns(UserWarning):
            b = ddf.groupby(ind(ddf)).B.apply(len)
        assert_eq(a, b.compute())
        assert not any("partd" in k[0] for k in b.dask)


def test_groupby_multiprocessing():
    df = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": ["1", "1", "a", "a", "a"]})
    ddf = dd.from_pandas(df, npartitions=3)
    with dask.config.set(scheduler="processes"):
        assert_eq(
            ddf.groupby("B").apply(lambda x: x, meta={"A": int, "B": object}),
            df.groupby("B").apply(lambda x: x),
        )


def test_groupby_normalize_index():
    full = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    d = dd.from_pandas(full, npartitions=3)

    assert d.groupby("a").index == "a"
    assert d.groupby(d["a"]).index == "a"
    assert d.groupby(d["a"] > 2).index._name == (d["a"] > 2)._name
    assert d.groupby(["a", "b"]).index == ["a", "b"]

    assert d.groupby([d["a"], d["b"]]).index == ["a", "b"]
    assert d.groupby([d["a"], "b"]).index == ["a", "b"]


def test_aggregate__single_element_groups(agg_func):
    spec = agg_func

    # nunique, cov, and corr are not supported in aggregation specs
    if spec in ("nunique", "cov", "corr"):
        return

    pdf = pd.DataFrame(
        {"a": [1, 1, 3, 3], "b": [4, 4, 16, 16], "c": [1, 1, 4, 4], "d": [1, 1, 3, 3]},
        columns=["c", "b", "a", "d"],
    )
    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = pdf.groupby(["a", "d"]).agg(spec)

    # NOTE: for std the result is not recast to the original dtype
    if spec in {"mean", "var"}:
        expected = expected.astype(float)

    assert_eq(expected, ddf.groupby(["a", "d"]).agg(spec))


def test_aggregate_build_agg_args__reuse_of_intermediates():
    """Aggregate reuses intermediates. For example, with sum, count, and mean
    the sums and counts are only calculated once across the graph and reused to
    compute the mean.
    """
    from dask.dataframe.groupby import _build_agg_args

    no_mean_spec = [("foo", "sum", "input"), ("bar", "count", "input")]

    with_mean_spec = [
        ("foo", "sum", "input"),
        ("bar", "count", "input"),
        ("baz", "mean", "input"),
    ]

    no_mean_chunks, no_mean_aggs, no_mean_finalizers = _build_agg_args(no_mean_spec)
    with_mean_chunks, with_mean_aggs, with_mean_finalizers = _build_agg_args(
        with_mean_spec
    )

    assert len(no_mean_chunks) == len(with_mean_chunks)
    assert len(no_mean_aggs) == len(with_mean_aggs)

    assert len(no_mean_finalizers) == len(no_mean_spec)
    assert len(with_mean_finalizers) == len(with_mean_spec)

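
# For intuition, a minimal sketch (plain pandas, hypothetical data) of why the
# shared sum/count intermediates above suffice: the mean finalizer only divides
# results that the sum and count aggregations have already produced.
def _mean_from_intermediates_sketch():
    s = pd.Series([1.0, 2.0, 3.0, 4.0], index=["x", "x", "y", "y"])
    sums = s.groupby(level=0).sum()  # intermediate shared with the "sum" output
    counts = s.groupby(level=0).count()  # intermediate shared with the "count" output
    return sums / counts  # equals s.groupby(level=0).mean()
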

def test_aggregate_dask():
    dask_holder = collections.namedtuple("dask_holder", ["dask"])
    get_agg_dask = lambda obj: dask_holder(
        {k: v for (k, v) in obj.dask.items() if k[0].startswith("aggregate")}
    )

    specs = [
        {"b": {"c": "mean"}, "c": {"a": "max", "b": "min"}},
        {"b": "mean", "c": ["min", "max"]},
        [
            "sum",
            "mean",
            "min",
            "max",
            "count",
            "size",
            "std",
            "var",
            "first",
            "last",
            "prod",
        ],
        "sum",
        "mean",
        "min",
        "max",
        "count",
        "std",
        "var",
        "first",
        "last",
        "prod"
        # NOTE: the 'size' spec is special since it bypasses aggregate
        # 'size'
    ]

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 100,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 100,
            "d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 100,
        },
        columns=["c", "b", "a", "d"],
    )
    ddf = dd.from_pandas(pdf, npartitions=100)

    for spec in specs:
        result1 = ddf.groupby(["a", "b"]).agg(spec, split_every=2)
        result2 = ddf.groupby(["a", "b"]).agg(spec, split_every=2)

        agg_dask1 = get_agg_dask(result1)
        agg_dask2 = get_agg_dask(result2)

        # check that the number of partitions used is fixed by split_every
        assert_max_deps(agg_dask1, 2)
        assert_max_deps(agg_dask2, 2)

        # check for deterministic key names and values
        assert agg_dask1 == agg_dask2

        # the length of the dask graph does not depend on the passed spec
        for other_spec in specs:
            other = ddf.groupby(["a", "b"]).agg(other_spec, split_every=2)
            assert len(other.dask) == len(result1.dask)
            assert len(other.dask) == len(result2.dask)

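
# A minimal sketch (hypothetical sizes) of what ``split_every=2`` means above:
# per-partition groupby results are combined pairwise in a tree, so no single
# aggregate task depends on more than two inputs (8 -> 4 -> 2 -> 1 layers),
# which is what assert_max_deps verifies.
def _split_every_sketch():
    pdf = pd.DataFrame({"k": [0, 1] * 8, "v": range(16)})
    ddf = dd.from_pandas(pdf, npartitions=8)
    return ddf.groupby("k").v.sum(split_every=2)
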

@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: ["a"],
        lambda df: ["a", "b"],
        lambda df: df["a"],
        lambda df: [df["a"], df["b"]],
        lambda df: [df["a"] > 2, df["b"] > 1],
    ],
)
def test_dataframe_aggregations_multilevel(grouper, agg_func):
    def call(g, m, **kwargs):
        return getattr(g, m)(**kwargs)

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "d": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a", "d"],
    )

    ddf = dd.from_pandas(pdf, npartitions=10)

    # covariance only works with N+1 columns
    if agg_func not in ("cov", "corr"):
        assert_eq(
            call(pdf.groupby(grouper(pdf))["c"], agg_func),
            call(ddf.groupby(grouper(ddf))["c"], agg_func, split_every=2),
        )

    # nunique is not supported for DataFrameGroupBy
    if agg_func != "nunique":
        assert_eq(
            call(pdf.groupby(grouper(pdf))[["c", "d"]], agg_func),
            call(ddf.groupby(grouper(ddf))[["c", "d"]], agg_func, split_every=2),
        )

        if agg_func in ("cov", "corr"):
            # there are sorting issues between pandas and chunk cov w/dask
            df = call(pdf.groupby(grouper(pdf)), agg_func).sort_index()
            cols = sorted(list(df.columns))
            df = df[cols]
            dddf = call(ddf.groupby(grouper(ddf)), agg_func, split_every=2).compute()
            dddf = dddf.sort_index()
            cols = sorted(list(dddf.columns))
            dddf = dddf[cols]
            assert_eq(df, dddf)
        else:
            assert_eq(
                call(pdf.groupby(grouper(pdf)), agg_func),
                call(ddf.groupby(grouper(ddf)), agg_func, split_every=2),
            )


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: df["a"],
        lambda df: [df["a"], df["b"]],
        lambda df: [df["a"] > 2, df["b"] > 1],
    ],
)
def test_series_aggregations_multilevel(grouper, agg_func):
1123    """
1124    similar to ``test_dataframe_aggregations_multilevel``, but series do not
1125    support all groupby args.
1126    """

    def call(g, m, **kwargs):
        return getattr(g, m)(**kwargs)

    # covariance/correlation is not a series aggregation
    if agg_func in ("cov", "corr"):
        return

    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    ddf = dd.from_pandas(pdf, npartitions=10)

    assert_eq(
        call(pdf["c"].groupby(grouper(pdf)), agg_func),
        call(ddf["c"].groupby(grouper(ddf)), agg_func, split_every=2),
        # for pandas ~ 0.18, the name is not properly propagated for
        # the mean aggregation
        check_names=(agg_func not in {"mean", "nunique"}),
    )


@pytest.mark.parametrize(
    "grouper",
    [
        lambda df: df["a"],
        lambda df: df["a"] > 2,
        lambda df: [df["a"], df["b"]],
        lambda df: [df["a"] > 2],
        pytest.param(
            lambda df: [df["a"] > 2, df["b"] > 1],
            marks=pytest.mark.xfail(
                reason="index dtype does not coincide: boolean != empty"
            ),
        ),
    ],
)
@pytest.mark.parametrize(
    "group_and_slice",
    [
        lambda df, grouper: df.groupby(grouper(df)),
        lambda df, grouper: df["c"].groupby(grouper(df)),
        lambda df, grouper: df.groupby(grouper(df))["c"],
    ],
)
def test_groupby_meta_content(group_and_slice, grouper):
    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    ddf = dd.from_pandas(pdf, npartitions=10)

    expected = group_and_slice(pdf, grouper).first().head(0)
    meta = group_and_slice(ddf, grouper)._meta.first()
    meta_nonempty = group_and_slice(ddf, grouper)._meta_nonempty.first().head(0)

    assert_eq(expected, meta)
    assert_eq(expected, meta_nonempty)

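
# For reference: ``_meta`` is an empty pandas object carrying only the schema
# (columns, dtypes, index), while ``_meta_nonempty`` fills it with dummy values
# so operations that fail on empty frames can still be probed. A minimal
# sketch (hypothetical data, not part of the test suite):
def _meta_sketch():
    ddf = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=2)
    assert len(ddf._meta) == 0  # empty, schema only
    assert len(ddf._meta_nonempty) > 0  # dummy rows for probing
    return ddf._meta.dtypes
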

def test_groupby_non_aligned_index():
    pdf = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    ddf3 = dd.from_pandas(pdf, npartitions=3)
    ddf7 = dd.from_pandas(pdf, npartitions=7)

    # working examples
    ddf3.groupby(["a", "b"])
    ddf3.groupby([ddf3["a"], ddf3["b"]])

    # misaligned divisions
    with pytest.raises(NotImplementedError):
        ddf3.groupby(ddf7["a"])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf7["a"], ddf7["b"]])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf7["a"], ddf3["b"]])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf3["a"], ddf7["b"]])

    with pytest.raises(NotImplementedError):
        ddf3.groupby([ddf7["a"], "b"])


def test_groupby_series_wrong_grouper():
    df = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
            "c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
        },
        columns=["c", "b", "a"],
    )

    df = dd.from_pandas(df, npartitions=3)
    s = df["a"]

    # working index values
    s.groupby(s)
    s.groupby([s, s])

    # non working index values
    with pytest.raises(KeyError):
        s.groupby("foo")

    with pytest.raises(KeyError):
        s.groupby([s, "foo"])

    with pytest.raises(ValueError):
        s.groupby(df)

    with pytest.raises(ValueError):
        s.groupby([s, df])


@pytest.mark.parametrize("npartitions", [1, 4, 20])
@pytest.mark.parametrize("split_every", [2, 5])
@pytest.mark.parametrize("split_out", [None, 1, 5, 20])
def test_hash_groupby_aggregate(npartitions, split_every, split_out):
    df = pd.DataFrame({"x": np.arange(100) % 10, "y": np.ones(100)})
    ddf = dd.from_pandas(df, npartitions)

    result = ddf.groupby("x").y.var(split_every=split_every, split_out=split_out)

    dsk = result.__dask_optimize__(result.dask, result.__dask_keys__())
    from dask.core import get_deps

    dependencies, dependents = get_deps(dsk)

    assert result.npartitions == (split_out or 1)
    assert len([k for k, v in dependencies.items() if not v]) == npartitions

    assert_eq(result, df.groupby("x").y.var())


def test_split_out_multi_column_groupby():
    df = pd.DataFrame(
        {"x": np.arange(100) % 10, "y": np.ones(100), "z": [1, 2, 3, 4, 5] * 20}
    )

    ddf = dd.from_pandas(df, npartitions=10)

    result = ddf.groupby(["x", "y"]).z.mean(split_out=4)
    expected = df.groupby(["x", "y"]).z.mean()

    assert_eq(result, expected, check_dtype=False)


def test_groupby_split_out_num():
    # GH 1841
    ddf = dd.from_pandas(
        pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4]}), npartitions=2
    )
    assert ddf.groupby("A").sum().npartitions == 1
    assert ddf.groupby("A").sum(split_out=2).npartitions == 2
    assert ddf.groupby("A").sum(split_out=3).npartitions == 3

    with pytest.raises(TypeError):
        # groupby doesn't accept split_out
        ddf.groupby("A", split_out=2)


def test_groupby_not_supported():
    ddf = dd.from_pandas(
        pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4]}), npartitions=2
    )
    with pytest.raises(TypeError):
        ddf.groupby("A", axis=1)
    with pytest.raises(TypeError):
        ddf.groupby("A", level=1)
    with pytest.raises(TypeError):
        ddf.groupby("A", as_index=False)
    with pytest.raises(TypeError):
        ddf.groupby("A", squeeze=True)


def test_groupby_numeric_column():
    df = pd.DataFrame({"A": ["foo", "foo", "bar"], 0: [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=3)

    assert_eq(ddf.groupby(ddf.A)[0].sum(), df.groupby(df.A)[0].sum())


@pytest.mark.parametrize("sel", ["c", "d", ["c", "d"]])
@pytest.mark.parametrize("key", ["a", ["a", "b"]])
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "cumcount"])
def test_cumulative(func, key, sel):
    df = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 6,
            "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 6,
            "c": np.random.randn(54),
            "d": np.random.randn(54),
        },
        columns=["a", "b", "c", "d"],
    )
    df.iloc[[-18, -12, -6], -1] = np.nan
    ddf = dd.from_pandas(df, npartitions=10)

    g, dg = (d.groupby(key)[sel] for d in (df, ddf))
    assert_eq(getattr(g, func)(), getattr(dg, func)())


@pytest.mark.parametrize("func", ["cumsum", "cumprod"])
def test_cumulative_axis1(func):
    df = pd.DataFrame(
        {
            "a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 2,
            "b": np.random.randn(18),
            "c": np.random.randn(18),
        }
    )
    df.iloc[-6, -1] = np.nan
    ddf = dd.from_pandas(df, npartitions=4)
    assert_eq(
        getattr(df.groupby("a"), func)(axis=1), getattr(ddf.groupby("a"), func)(axis=1)
    )


def test_groupby_unaligned_index():
    df = pd.DataFrame(
        {
            "a": np.random.randint(0, 10, 50),
            "b": np.random.randn(50),
            "c": np.random.randn(50),
        }
    )
    ddf = dd.from_pandas(df, npartitions=5)
    filtered = df[df.b < 0.5]
    dfiltered = ddf[ddf.b < 0.5]

    ddf_group = dfiltered.groupby(ddf.a)
    ds_group = dfiltered.b.groupby(ddf.a)

    bad = [
        ddf_group.mean(),
        ddf_group.var(),
        ddf_group.b.nunique(),
        ddf_group.get_group(0),
        ds_group.mean(),
        ds_group.var(),
        ds_group.nunique(),
        ds_group.get_group(0),
    ]

    for obj in bad:
        with pytest.raises(ValueError):
            obj.compute()

    def add1(x):
        return x + 1

    df_group = filtered.groupby(df.a)
    good = [
        (ddf_group.apply(add1, meta=ddf), df_group.apply(add1)),
        (ddf_group.b.apply(add1, meta=ddf.b), df_group.b.apply(add1)),
    ]

    for (res, sol) in good:
        assert_eq(res, sol)


def test_groupby_string_label():
    df = pd.DataFrame({"foo": [1, 1, 4], "B": [2, 3, 4], "C": [5, 6, 7]})
    ddf = dd.from_pandas(pd.DataFrame(df), npartitions=1)
    ddf_group = ddf.groupby("foo")
    result = ddf_group.get_group(1).compute()

    expected = pd.DataFrame(
        {"foo": [1, 1], "B": [2, 3], "C": [5, 6]}, index=pd.Index([0, 1])
    )

    tm.assert_frame_equal(result, expected)


def test_groupby_dataframe_cum_caching():
    """Test caching behavior of cumulative operations on grouped dataframes.

    Relates to #3756.
    """
    df = pd.DataFrame(
        dict(a=list("aabbcc")), index=pd.date_range(start="20100101", periods=6)
    )
    df["ones"] = 1
    df["twos"] = 2

    ddf = dd.from_pandas(df, npartitions=3)

    ops = ["cumsum", "cumprod"]

    for op in ops:
        ddf0 = getattr(ddf.groupby(["a"]), op)()
        ddf1 = ddf.rename(columns={"ones": "foo", "twos": "bar"})
        ddf1 = getattr(ddf1.groupby(["a"]), op)()

        # the _a and _b results should be equal
1444        res0_a, res1_a = dask.compute(ddf0, ddf1)
1445        res0_b, res1_b = ddf0.compute(), ddf1.compute()
1446
1447        assert res0_a.equals(res0_b)
1448        assert res1_a.equals(res1_b)
1449
1450
1451def test_groupby_series_cum_caching():
1452    """Test caching behavior of cumulative operations on grouped Series
1453
1454    Relates to #3755
1455    """
1456    df = pd.DataFrame(
1457        dict(a=list("aabbcc")), index=pd.date_range(start="20100101", periods=6)
1458    )
1459    df["ones"] = 1
1460    df["twos"] = 2
1461
1462    ops = ["cumsum", "cumprod"]
1463    for op in ops:
1464        ddf = dd.from_pandas(df, npartitions=3)
1465        dcum = ddf.groupby(["a"])
1466        res0_a, res1_a = dask.compute(
1467            getattr(dcum["ones"], op)(), getattr(dcum["twos"], op)()
1468        )
1469        cum = df.groupby(["a"])
1470        res0_b, res1_b = (getattr(cum["ones"], op)(), getattr(cum["twos"], op)())
1471
1472        assert res0_a.equals(res0_b)
1473        assert res1_a.equals(res1_b)
1474
1475
1476def test_groupby_slice_agg_reduces():
1477    d = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2, 3, 4, 5]})
1478    a = dd.from_pandas(d, npartitions=2)
1479    result = a.groupby("a")["b"].agg(["min", "max"])
1480    expected = d.groupby("a")["b"].agg(["min", "max"])
1481    assert_eq(result, expected)
1482
1483
1484def test_groupby_agg_grouper_single():
1485    # https://github.com/dask/dask/issues/2255
1486    d = pd.DataFrame({"a": [1, 2, 3, 4]})
1487    a = dd.from_pandas(d, npartitions=2)
1488
1489    result = a.groupby("a")["a"].agg(["min", "max"])
1490    expected = d.groupby("a")["a"].agg(["min", "max"])
1491    assert_eq(result, expected)
1492
1493
1494@pytest.mark.parametrize("slice_", ["a", ["a"], ["a", "b"], ["b"]])
1495def test_groupby_agg_grouper_multiple(slice_):
1496    # https://github.com/dask/dask/issues/2255
1497    d = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
1498    a = dd.from_pandas(d, npartitions=2)
1499
1500    result = a.groupby("a")[slice_].agg(["min", "max"])
1501    expected = d.groupby("a")[slice_].agg(["min", "max"])
1502    assert_eq(result, expected)
1503
1504
1505@pytest.mark.parametrize(
1506    "agg_func",
1507    [
1508        "cumprod",
1509        "cumcount",
1510        "cumsum",
1511        "var",
1512        "sum",
1513        "mean",
1514        "count",
1515        "size",
1516        "std",
1517        "min",
1518        "max",
1519        "first",
1520        "last",
1521        "prod",
1522    ],
1523)
1524def test_groupby_column_and_index_agg_funcs(agg_func):
1525    def call(g, m, **kwargs):
1526        return getattr(g, m)(**kwargs)
1527
1528    df = pd.DataFrame(
1529        {
1530            "idx": [1, 1, 1, 2, 2, 2],
1531            "a": [1, 2, 1, 2, 1, 2],
1532            "b": np.arange(6),
1533            "c": [1, 1, 1, 2, 2, 2],
1534        }
1535    ).set_index("idx")
1536
1537    ddf = dd.from_pandas(df, npartitions=df.index.nunique())
1538    ddf_no_divs = dd.from_pandas(df, npartitions=df.index.nunique(), sort=False)
1539
1540    # Index and then column
1541
1542    # Compute expected result
1543    expected = call(df.groupby(["idx", "a"]), agg_func)
1544    if agg_func in {"mean", "var"}:
1545        expected = expected.astype(float)
1546
1547    result = call(ddf.groupby(["idx", "a"]), agg_func)
1548    assert_eq(expected, result)
1549
1550    result = call(ddf_no_divs.groupby(["idx", "a"]), agg_func)
1551    assert_eq(expected, result)
1552
1553    # apply-combine-apply aggregation functions
1554    aca_agg = {"sum", "mean", "var", "size", "std", "count", "first", "last", "prod"}

    # Test aggregate strings
    if agg_func in aca_agg:
        result = ddf_no_divs.groupby(["idx", "a"]).agg(agg_func)
        assert_eq(expected, result)

    # Column and then index

    # Compute expected result
    expected = call(df.groupby(["a", "idx"]), agg_func)
    if agg_func in {"mean", "var"}:
        expected = expected.astype(float)

    result = call(ddf.groupby(["a", "idx"]), agg_func)
    assert_eq(expected, result)

    result = call(ddf_no_divs.groupby(["a", "idx"]), agg_func)
    assert_eq(expected, result)

    # Test aggregate strings
    if agg_func in aca_agg:
        result = ddf_no_divs.groupby(["a", "idx"]).agg(agg_func)
        assert_eq(expected, result)

    # Index only

    # Compute expected result
    expected = call(df.groupby("idx"), agg_func)
    if agg_func in {"mean", "var"}:
        expected = expected.astype(float)

    result = call(ddf.groupby("idx"), agg_func)
    assert_eq(expected, result)

    result = call(ddf_no_divs.groupby("idx"), agg_func)
    assert_eq(expected, result)

    # Test aggregate strings
    if agg_func in aca_agg:
        result = ddf_no_divs.groupby("idx").agg(agg_func)
        assert_eq(expected, result)


@pytest.mark.parametrize("group_args", [["idx", "a"], ["a", "idx"], ["idx"], "idx"])
@pytest.mark.parametrize(
    "apply_func", [np.min, np.mean, lambda s: np.max(s) - np.mean(s)]
)
def test_groupby_column_and_index_apply(group_args, apply_func):
    df = pd.DataFrame(
        {"idx": [1, 1, 1, 2, 2, 2], "a": [1, 2, 1, 2, 1, 2], "b": np.arange(6)}
    ).set_index("idx")

    ddf = dd.from_pandas(df, npartitions=df.index.nunique())
    ddf_no_divs = dd.from_pandas(df, npartitions=df.index.nunique(), sort=False)

    # Expected result
    expected = df.groupby(group_args).apply(apply_func)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")

        # Compute on dask DataFrame with divisions (no shuffling)
        result = ddf.groupby(group_args).apply(apply_func)
        assert_eq(expected, result, check_divisions=False)

        # Check that partitioning is preserved
        assert ddf.divisions == result.divisions

        # Check that no shuffling occurred.
        # The groupby operation should add only 1 task per partition
        assert len(result.dask) == (len(ddf.dask) + ddf.npartitions)

        # Compute on dask DataFrame without divisions (requires shuffling)
        result = ddf_no_divs.groupby(group_args).apply(apply_func)
        assert_eq(expected, result, check_divisions=False)

        # Check that divisions were preserved (all None in this case)
        assert ddf_no_divs.divisions == result.divisions

        # Crude check that shuffling was performed.
        # The groupby operation should add more than 1 task per partition
        assert len(result.dask) > (len(ddf_no_divs.dask) + ddf_no_divs.npartitions)


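# dd.Aggregation(name, chunk, agg, finalize=None): ``chunk`` runs on each
# partition's grouped data, ``agg`` combines the chunk results, and the
# optional ``finalize`` maps the combined values to the final output
# (here: total sum divided by total count gives the mean).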
custom_mean = dd.Aggregation(
    "mean",
    lambda s: (s.count(), s.sum()),
    lambda s0, s1: (s0.sum(), s1.sum()),
    lambda s0, s1: s1 / s0,
)

custom_sum = dd.Aggregation("sum", lambda s: s.sum(), lambda s0: s0.sum())
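# ``custom_sum`` omits ``finalize``: the combined per-chunk sums are already
# the final result.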


@pytest.mark.parametrize(
    "pandas_spec, dask_spec, check_dtype",
    [
        ({"b": "mean"}, {"b": custom_mean}, False),
        ({"b": "sum"}, {"b": custom_sum}, True),
        (["mean", "sum"], [custom_mean, custom_sum], False),
        ({"b": ["mean", "sum"]}, {"b": [custom_mean, custom_sum]}, False),
    ],
)
def test_dataframe_groupby_agg_custom_sum(pandas_spec, dask_spec, check_dtype):
    df = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
    ddf = dd.from_pandas(df, npartitions=2)

    expected = df.groupby("g").aggregate(pandas_spec)
    result = ddf.groupby("g").aggregate(dask_spec)

    assert_eq(result, expected, check_dtype=check_dtype)


@pytest.mark.parametrize(
    "pandas_spec, dask_spec",
    [
        ("mean", custom_mean),
        (["mean"], [custom_mean]),
        (["mean", "sum"], [custom_mean, custom_sum]),
    ],
)
def test_series_groupby_agg_custom_mean(pandas_spec, dask_spec):
    d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
    a = dd.from_pandas(d, npartitions=2)

    expected = d["b"].groupby(d["g"]).aggregate(pandas_spec)
    result = a["b"].groupby(a["g"]).aggregate(dask_spec)

    assert_eq(result, expected, check_dtype=False)

def test_groupby_agg_custom__name_clash_with_internal_same_column():
    """For a single input column, the given aggregations must have unique names."""
    d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
    a = dd.from_pandas(d, npartitions=2)

    agg_func = dd.Aggregation("sum", lambda s: s.sum(), lambda s0: s0.sum())

    with pytest.raises(ValueError):
        a.groupby("g").aggregate({"b": [agg_func, "sum"]})


def test_groupby_agg_custom__name_clash_with_internal_different_column():
    """Custom aggregation functions can share the name of a builtin function."""
    d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3, "c": [4, 5, 6] * 3})
    a = dd.from_pandas(d, npartitions=2)

    # NOTE: this function is purposefully misnamed; it is called "sum" but
    # actually computes the mean
    agg_func = dd.Aggregation(
        "sum",
        lambda s: (s.count(), s.sum()),
        lambda s0, s1: (s0.sum(), s1.sum()),
        lambda s0, s1: s1 / s0,
    )

    # NOTE: the name of the custom aggregation is suppressed in the output,
    # since only a single agg func per column was specified
    result = a.groupby("g").aggregate({"b": agg_func, "c": "sum"})
    expected = d.groupby("g").aggregate({"b": "mean", "c": "sum"})

    assert_eq(result, expected, check_dtype=False)


def test_groupby_agg_custom__mode():
    # A custom mode aggregation that passes its intermediates around as plain
    # Python objects. To keep pandas from unpacking the results inside
    # ``apply``, each intermediate is wrapped in a single-item list.
    def agg_mode(s):
        def impl(s):
            # unwrap the single-item list holding the first value counts
            (res,) = s.iloc[0]

            # merge the remaining value counts into the running total
            for (i,) in s.iloc[1:]:
                res = res.add(i, fill_value=0)

            return [res]

        return s.apply(impl)

    agg_func = dd.Aggregation(
        "custom_mode",
        lambda s: s.apply(lambda s: [s.value_counts()]),
        agg_mode,
        lambda s: s.map(lambda i: i[0].idxmax()),
    )

    d = pd.DataFrame(
        {
            "g0": [0, 0, 0, 1, 1] * 3,
            "g1": [0, 0, 0, 1, 1] * 3,
            "cc": [4, 5, 4, 6, 6] * 3,
        }
    )
    a = dd.from_pandas(d, npartitions=5)

    actual = a["cc"].groupby([a["g0"], a["g1"]]).agg(agg_func)

    # cheat to get the correct index
    expected = pd.DataFrame({"g0": [0, 1], "g1": [0, 1], "cc": [4, 6]})
    expected = expected["cc"].groupby([expected["g0"], expected["g1"]]).agg("sum")

    assert_eq(actual, expected)


@pytest.mark.parametrize("func", ["var", list])
def test_groupby_select_column_agg(func):
    pdf = pd.DataFrame(
        {
            "A": [1, 2, 3, 1, 2, 3, 1, 2, 4],
            "B": [-0.776, -0.4, -0.873, 0.054, 1.419, -0.948, -0.967, -1.714, -0.666],
        }
    )
    ddf = dd.from_pandas(pdf, npartitions=4)
    actual = ddf.groupby("A")["B"].agg(func)
    expected = pdf.groupby("A")["B"].agg(func)
    assert_eq(actual, expected)


@pytest.mark.parametrize(
    "func",
    [
        lambda x: x.std(numeric_only=True),
        lambda x: x.groupby("x").std(),
        lambda x: x.groupby("x").var(),
        lambda x: x.groupby("x").mean(),
        lambda x: x.groupby("x").sum(),
        lambda x: x.groupby("x").z.std(),
    ],
)
def test_std_object_dtype(func):
    df = pd.DataFrame({"x": [1, 2, 1], "y": ["a", "b", "c"], "z": [11.0, 22.0, 33.0]})
    ddf = dd.from_pandas(df, npartitions=2)

    assert_eq(func(df), func(ddf))


def test_std_columns_int():
    # Make sure std() works when the grouped DataFrame has integer column
    # names. Non-regression test for issue #3560.
    df = pd.DataFrame({0: [5], 1: [5]})
    ddf = dd.from_pandas(df, npartitions=2)
    by = dask.array.from_array([0, 1]).to_dask_dataframe()
    # smoke test: this should simply not raise
    ddf.groupby(by).std()


def test_timeseries():
    df = dask.datasets.timeseries().partitions[:2]
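    # Both arguments are the same lazy expression, so this checks that
    # evaluating it twice yields consistent results on the synthetic data.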
    assert_eq(df.groupby("name").std(), df.groupby("name").std())


@pytest.mark.parametrize("min_count", [0, 1, 2, 3])
def test_with_min_count(min_count):
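    # ``min_count`` is the number of valid (non-NA) values required per group;
    # groups with fewer valid values should produce NA rather than the empty
    # identity (0 for sum, 1 for prod).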
    dfs = [
        pd.DataFrame(
            {
                "group": ["A", "A", "B"],
                "val1": [np.nan, 2, 3],
                "val2": [np.nan, 5, 6],
                "val3": [5, 4, 9],
            }
        ),
        pd.DataFrame(
            {
                "group": ["A", "A", "B"],
                "val1": [2, np.nan, np.nan],
                "val2": [np.nan, 5, 6],
                "val3": [5, 4, 9],
            }
        ),
    ]
    ddfs = [dd.from_pandas(df, npartitions=4) for df in dfs]

    for df, ddf in zip(dfs, ddfs):
        assert_eq(
            df.groupby("group").sum(min_count=min_count),
            ddf.groupby("group").sum(min_count=min_count),
        )
        assert_eq(
            df.groupby("group").prod(min_count=min_count),
            ddf.groupby("group").prod(min_count=min_count),
        )


def test_groupby_group_keys():
    df = pd.DataFrame({"a": [1, 2, 2, 3], "b": [2, 3, 4, 5]})
    ddf = dd.from_pandas(df, npartitions=2).set_index("a")
    pdf = df.set_index("a")

    func = lambda g: g.copy()
    expected = pdf.groupby("a").apply(func)
    assert_eq(expected, ddf.groupby("a").apply(func, meta=expected))

    expected = pdf.groupby("a", group_keys=False).apply(func)
    assert_eq(expected, ddf.groupby("a", group_keys=False).apply(func, meta=expected))


@pytest.mark.parametrize(
    "columns",
    [["a", "b", "c"], np.array([1.0, 2.0, 3.0]), ["1", "2", "3"], ["", "a", "b"]],
)
def test_groupby_cov(columns):
    rows = 20
    cols = 3
    data = np.random.randn(rows, cols)
    df = pd.DataFrame(data, columns=columns)
    df["key"] = [0] * 10 + [1] * 5 + [2] * 5
    ddf = dd.from_pandas(df, npartitions=3)

    expected = df.groupby("key").cov()
    result = ddf.groupby("key").cov()
    # When numerical values are used for the columns, the column mapping and
    # stacking produce a float-typed MultiIndex, whereas pandas would normally
    # create an object-typed MultiIndex.
    if isinstance(columns, np.ndarray):
        result = result.compute()
        # don't bother checking index -- MultiIndex levels are in a frozenlist
        result.columns = result.columns.astype(np.dtype("O"))
        assert_eq(expected, result, check_index=False)
    else:
        assert_eq(expected, result)


def test_df_groupby_idxmin():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = pd.DataFrame({"group": [1, 2], "value": [0, 3]}).set_index("group")

    result_pd = pdf.groupby("group").idxmin()
    result_dd = ddf.groupby("group").idxmin()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_df_groupby_idxmin_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group").idxmin(skipna=skipna)
    result_dd = ddf.groupby("group").idxmin(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_df_groupby_idxmax():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = pd.DataFrame({"group": [1, 2], "value": [1, 2]}).set_index("group")

    result_pd = pdf.groupby("group").idxmax()
    result_dd = ddf.groupby("group").idxmax()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_df_groupby_idxmax_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group").idxmax(skipna=skipna)
    result_dd = ddf.groupby("group").idxmax(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_series_groupby_idxmin():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = (
        pd.DataFrame({"group": [1, 2], "value": [0, 3]}).set_index("group").squeeze()
    )

    result_pd = pdf.groupby("group")["value"].idxmin()
    result_dd = ddf.groupby("group")["value"].idxmin()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_series_groupby_idxmin_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group")["value"].idxmin(skipna=skipna)
    result_dd = ddf.groupby("group")["value"].idxmin(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_series_groupby_idxmax():
    pdf = pd.DataFrame(
        {"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    expected = (
        pd.DataFrame({"group": [1, 2], "value": [1, 2]}).set_index("group").squeeze()
    )

    result_pd = pdf.groupby("group")["value"].idxmax()
    result_dd = ddf.groupby("group")["value"].idxmax()

    assert_eq(result_pd, result_dd)
    assert_eq(expected, result_dd)


@pytest.mark.parametrize("skipna", [True, False])
def test_series_groupby_idxmax_skipna(skipna):
    pdf = pd.DataFrame(
        {
            "idx": list(range(4)),
            "group": [1, 1, 2, 2],
            "value": [np.nan, 20.1, np.nan, 10.1],
        }
    ).set_index("idx")

    ddf = dd.from_pandas(pdf, npartitions=3)

    result_pd = pdf.groupby("group")["value"].idxmax(skipna=skipna)
    result_dd = ddf.groupby("group")["value"].idxmax(skipna=skipna)

    assert_eq(result_pd, result_dd)


def test_groupby_unique():
    rng = np.random.RandomState(42)
    df = pd.DataFrame(
        {"foo": rng.randint(3, size=100), "bar": rng.randint(10, size=100)}
    )
    ddf = dd.from_pandas(df, npartitions=10)

    pd_gb = df.groupby("foo")["bar"].unique()
    dd_gb = ddf.groupby("foo")["bar"].unique()

    # Use explode because each row holds an array of unique values, for which
    # a direct equality comparison fails
    assert_eq(dd_gb.explode(), pd_gb.explode())


def test_groupby_value_counts():
    rng = np.random.RandomState(42)
    df = pd.DataFrame(
        {"foo": rng.randint(3, size=100), "bar": rng.randint(4, size=100)}
    )
    ddf = dd.from_pandas(df, npartitions=2)

    pd_gb = df.groupby("foo")["bar"].value_counts()
    dd_gb = ddf.groupby("foo")["bar"].value_counts()
    assert_eq(dd_gb, pd_gb)


@pytest.mark.parametrize(
    "transformation", [lambda x: x.sum(), np.sum, "sum", pd.Series.rank]
)
def test_groupby_transform_funcs(transformation):
    pdf = pd.DataFrame(
        {
            "A": [1, 2, 3, 4] * 5,
            "B": np.random.randn(20),
            "C": np.random.randn(20),
            "D": np.random.randn(20),
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    with pytest.warns(UserWarning):
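        # No explicit ``meta`` is provided, so dask warns while it infers the
        # output metadata for ``transform``.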
        # DataFrame
        assert_eq(
            pdf.groupby("A").transform(transformation),
            ddf.groupby("A").transform(transformation),
        )

        # Series
        assert_eq(
            pdf.groupby("A")["B"].transform(transformation),
            ddf.groupby("A")["B"].transform(transformation),
        )


@pytest.mark.parametrize("npartitions", list(range(1, 10)))
@pytest.mark.parametrize("indexed", [True, False])
def test_groupby_transform_ufunc_partitioning(npartitions, indexed):
    pdf = pd.DataFrame({"group": [1, 2, 3, 4, 5] * 20, "value": np.random.randn(100)})

    if indexed:
        pdf = pdf.set_index("group")

    ddf = dd.from_pandas(pdf, npartitions)

    with pytest.warns(UserWarning):
        # DataFrame
        assert_eq(
            pdf.groupby("group").transform(lambda series: series - series.mean()),
            ddf.groupby("group").transform(lambda series: series - series.mean()),
        )

        # Series
        assert_eq(
            pdf.groupby("group")["value"].transform(
                lambda series: series - series.mean()
            ),
            ddf.groupby("group")["value"].transform(
                lambda series: series - series.mean()
            ),
        )


@pytest.mark.parametrize(
    "grouping,agg",
    [
        (
            lambda df: df.drop(columns="category_2").groupby("category_1"),
            lambda grp: grp.mean(),
        ),
        (
            lambda df: df.drop(columns="category_2").groupby("category_1"),
            lambda grp: grp.agg("mean"),
        ),
        (lambda df: df.groupby(["category_1", "category_2"]), lambda grp: grp.mean()),
        (
            lambda df: df.groupby(["category_1", "category_2"]),
            lambda grp: grp.agg("mean"),
        ),
    ],
)
def test_groupby_aggregate_categoricals(grouping, agg):
    pdf = pd.DataFrame(
        {
            "category_1": pd.Categorical(list("AABBCC")),
            "category_2": pd.Categorical(list("ABCABC")),
            "value": np.random.uniform(size=6),
        }
    )
    ddf = dd.from_pandas(pdf, 2)

    # DataFrameGroupBy
    assert_eq(agg(grouping(pdf)), agg(grouping(ddf)))

    # SeriesGroupBy
    assert_eq(agg(grouping(pdf)["value"]), agg(grouping(ddf)["value"]))


@pytest.mark.xfail(
    not PANDAS_GT_110,
    reason="dropna kwarg not supported in pandas < 1.1.0.",
)
@pytest.mark.parametrize("dropna", [False, True])
def test_groupby_dropna_pandas(dropna):
    df = pd.DataFrame(
        {"a": [1, 2, 3, 4, None, None, 7, 8], "e": [4, 5, 6, 3, 2, 1, 0, 0]}
    )
    ddf = dd.from_pandas(df, npartitions=3)

    dask_result = ddf.groupby("a", dropna=dropna).e.sum()
    pd_result = df.groupby("a", dropna=dropna).e.sum()
    assert_eq(dask_result, pd_result)


@pytest.mark.gpu
@pytest.mark.parametrize("dropna", [False, True, None])
@pytest.mark.parametrize("by", ["a", "c", "d", ["a", "b"], ["a", "c"], ["a", "d"]])
def test_groupby_dropna_cudf(dropna, by):
    # NOTE: This test requires cudf/dask_cudf, and will
    # be skipped by non-GPU CI
    cudf = pytest.importorskip("cudf")
    dask_cudf = pytest.importorskip("dask_cudf")

    df = cudf.DataFrame(
        {
            "a": [1, 2, 3, 4, None, None, 7, 8],
            "b": [1, 0] * 4,
            "c": ["a", "b", None, None, "e", "f", "g", "h"],
            "e": [4, 5, 6, 3, 2, 1, 0, 0],
        }
    )
    df["d"] = df["c"].astype("category")
    ddf = dask_cudf.from_cudf(df, npartitions=3)

    if dropna is None:
        dask_result = ddf.groupby(by).e.sum()
        cudf_result = df.groupby(by).e.sum()
    else:
        dask_result = ddf.groupby(by, dropna=dropna).e.sum()
        cudf_result = df.groupby(by, dropna=dropna).e.sum()
    if by in ["c", "d"]:
        # The string/category index name is lost in cudf, so restore it
        # before comparing
        dask_result = dask_result.compute()
        dask_result.index.name = cudf_result.index.name

    assert_eq(dask_result, cudf_result)


@pytest.mark.xfail(
    not PANDAS_GT_110,
    reason="Should work starting from pandas 1.1.0",
)
def test_groupby_dropna_with_agg():
    # https://github.com/dask/dask/issues/6986
    df = pd.DataFrame(
        {"id1": ["a", None, "b"], "id2": [1, 2, None], "v1": [4.5, 5.5, None]}
    )
    expected = df.groupby(["id1", "id2"], dropna=False).agg("sum")

    ddf = dd.from_pandas(df, 1, sort=False)
    actual = ddf.groupby(["id1", "id2"], dropna=False).agg("sum")
    assert_eq(expected, actual)


def test_groupby_observed_with_agg():
    df = pd.DataFrame(
        {
            "cat_1": pd.Categorical(list("AB"), categories=list("ABCDE")),
            "cat_2": pd.Categorical([1, 2], categories=[1, 2, 3]),
            "value_1": np.random.uniform(size=2),
        }
    )
    expected = df.groupby(["cat_1", "cat_2"], observed=True).agg("sum")

    ddf = dd.from_pandas(df, 2)
    actual = ddf.groupby(["cat_1", "cat_2"], observed=True).agg("sum")
    assert_eq(expected, actual)


def test_rounding_negative_var():
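    # All values within a group are identical, so the true variance is 0;
    # the sum-of-squares formula can round to a tiny negative number, and
    # std() must not turn that into NaN by taking sqrt of a negative.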
    x = [-0.00179999999 for _ in range(10)]
    ids = [1 for _ in range(5)] + [2 for _ in range(5)]

    df = pd.DataFrame({"ids": ids, "x": x})

    ddf = dd.from_pandas(df, npartitions=2)
    assert_eq(ddf.groupby("ids").x.std(), df.groupby("ids").x.std())


@pytest.mark.parametrize("split_out", [2, 3])
@pytest.mark.parametrize("column", [["b", "c"], ["b", "d"], ["b", "e"]])
def test_groupby_split_out_multiindex(split_out, column):
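    # ``split_out`` spreads the aggregation output across that many
    # partitions; the combined result should match the ``split_out=1`` answer
    # up to ordering.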
    df = pd.DataFrame(
        {
            "a": np.arange(8),
            "b": [1, 0, 0, 2, 1, 1, 2, 0],
            "c": [0, 1] * 4,
            "d": ["dog", "cat", "cat", "dog", "dog", "dog", "cat", "bird"],
        }
    ).fillna(0)
    df["e"] = df["d"].astype("category")
    ddf = dd.from_pandas(df, npartitions=3)

    ddf_result_so1 = (
        ddf.groupby(column).a.mean(split_out=1).compute().sort_values().dropna()
    )

    ddf_result = (
        ddf.groupby(column).a.mean(split_out=split_out).compute().sort_values().dropna()
    )

    assert_eq(ddf_result, ddf_result_so1, check_index=False)


@pytest.mark.parametrize(
    "backend",
    [
        "pandas",
        pytest.param("cudf", marks=pytest.mark.gpu),
    ],
)
def test_groupby_large_ints_exception(backend):
    data_source = pytest.importorskip(backend)
    if backend == "cudf":
        dask_cudf = pytest.importorskip("dask_cudf")
        data_frame = dask_cudf.from_cudf
    else:
        data_frame = dd.from_pandas
    max_uint64 = np.iinfo(np.uint64).max
    sqrt_max = max_uint64 ** 0.5
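    # Values near sqrt(uint64 max) (about 4.3e9) square to roughly 1.8e19,
    # past the int64 maximum of about 9.2e18, so a naive sum-of-squares
    # variance would overflow; std() has to survive these large ints.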
    series = data_source.Series(
        np.concatenate([sqrt_max * np.arange(5), np.arange(35)])
    ).astype("int64")
    df = data_source.DataFrame({"x": series, "z": np.arange(40), "y": np.arange(40)})
    ddf = data_frame(df, npartitions=1)
    assert_eq(
        df.groupby("x").std(),
        ddf.groupby("x").std().compute(scheduler="single-threaded"),
    )


@pytest.mark.parametrize("by", ["a", "b", "c", ["a", "b"], ["a", "c"]])
@pytest.mark.parametrize("agg", ["count", "mean", "std"])
@pytest.mark.parametrize("sort", [True, False])
def test_groupby_sort_argument(by, agg, sort):
    df = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, None, None, 7, 8],
            "b": [1, 0] * 4,
            "c": ["a", "b", None, None, "e", "f", "g", "h"],
            "e": [4, 5, 6, 3, 2, 1, 0, 0],
        }
    )
    ddf = dd.from_pandas(df, npartitions=3)

    gb = ddf.groupby(by, sort=sort)
    gb_pd = df.groupby(by, sort=sort)

    # Basic groupby aggregation
    result_1 = getattr(gb, agg)
    result_1_pd = getattr(gb_pd, agg)

    # Choose single column
    result_2 = getattr(gb.e, agg)
    result_2_pd = getattr(gb_pd.e, agg)

    # Use the `agg()` API
    result_3 = gb.agg({"e": agg})
    result_3_pd = gb_pd.agg({"e": agg})

    if agg == "mean":
        assert_eq(result_1(), result_1_pd().astype("float"))
        assert_eq(result_2(), result_2_pd().astype("float"))
        assert_eq(result_3, result_3_pd.astype("float"))
    else:
        assert_eq(result_1(), result_1_pd())
        assert_eq(result_2(), result_2_pd())
        assert_eq(result_3, result_3_pd)


@pytest.mark.parametrize("agg", [M.sum, M.prod, M.max, M.min])
@pytest.mark.parametrize("sort", [True, False])
def test_groupby_sort_argument_agg(agg, sort):
    df = pd.DataFrame({"x": [4, 2, 1, 2, 3, 1], "y": [1, 2, 3, 4, 5, 6]})
    ddf = dd.from_pandas(df, npartitions=3)

    result = agg(ddf.groupby("x", sort=sort))
    result_pd = agg(df.groupby("x", sort=sort))

    assert_eq(result, result_pd)
    if sort:
        # Check order of index if sort==True
        # (no guarantee that order will match otherwise)
        assert_eq(result.index, result_pd.index)


def test_groupby_sort_true_split_out():
    df = pd.DataFrame({"x": [4, 2, 1, 2, 3, 1], "y": [1, 2, 3, 4, 5, 6]})
    ddf = dd.from_pandas(df, npartitions=3)

    # Works fine for split_out==1 or sort=False/None
    M.sum(ddf.groupby("x", sort=True), split_out=1)
    M.sum(ddf.groupby("x", sort=False), split_out=2)
    M.sum(ddf.groupby("x"), split_out=2)

    with pytest.raises(NotImplementedError):
        # Cannot use sort=True with split_out>1 (for now)
        M.sum(ddf.groupby("x", sort=True), split_out=2)


@pytest.mark.skipif(
    not PANDAS_GT_110, reason="observed only supported for newer pandas"
)
@pytest.mark.parametrize("known_cats", [True, False])
@pytest.mark.parametrize("ordered_cats", [True, False])
@pytest.mark.parametrize("groupby", ["cat_1", ["cat_1", "cat_2"]])
@pytest.mark.parametrize("observed", [True, False])
def test_groupby_aggregate_categorical_observed(
    known_cats, ordered_cats, agg_func, groupby, observed
):
    if agg_func in ["cov", "corr", "nunique"]:
        pytest.skip("Not implemented for DataFrameGroupBy yet.")
    if agg_func in ["sum", "count", "prod"] and groupby != "cat_1":
        pytest.skip("Gives zeros rather than nans.")
    if agg_func in ["std", "var"] and observed:
        pytest.skip("Can't calculate observed with all nans")

    pdf = pd.DataFrame(
        {
            "cat_1": pd.Categorical(
                list("AB"), categories=list("ABCDE"), ordered=ordered_cats
            ),
            "cat_2": pd.Categorical([1, 2], categories=[1, 2, 3], ordered=ordered_cats),
            "value_1": np.random.uniform(size=2),
        }
    )
    ddf = dd.from_pandas(pdf, 2)

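    # ``as_unknown`` drops the statically known categories from the dask
    # metadata, so the groupby must handle categories it only discovers at
    # compute time.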
    if not known_cats:
        ddf["cat_1"] = ddf["cat_1"].cat.as_unknown()
        ddf["cat_2"] = ddf["cat_2"].cat.as_unknown()

    def agg(grp, **kwargs):
        return getattr(grp, agg_func)(**kwargs)

    # only include numeric columns when passing to "min" or "max"
    # pandas default is numeric_only=False
    if ordered_cats is False and agg_func in ["min", "max"] and groupby == "cat_1":
        pdf = pdf[["cat_1", "value_1"]]
        ddf = ddf[["cat_1", "value_1"]]

    assert_eq(
        agg(pdf.groupby(groupby, observed=observed)),
        agg(ddf.groupby(groupby, observed=observed)),
    )


def test_empty_partitions_with_value_counts():
    # https://github.com/dask/dask/issues/7065
    df = pd.DataFrame(
        data=[
            ["a1", "b1"],
            ["a1", None],
            ["a1", "b1"],
            [None, None],
            [None, None],
            [None, None],
            ["a3", "b3"],
            ["a3", "b3"],
            ["a5", "b5"],
        ],
        columns=["A", "B"],
    )

    expected = df.groupby("A")["B"].value_counts()
    ddf = dd.from_pandas(df, npartitions=3)
    actual = ddf.groupby("A")["B"].value_counts()
    assert_eq(expected, actual)


def test_groupby_with_pd_grouper():
    ddf = dd.from_pandas(
        pd.DataFrame(
            {"key1": ["a", "b", "a"], "key2": ["c", "c", "c"], "value": [1, 2, 3]}
        ),
        npartitions=3,
    )
    with pytest.raises(NotImplementedError):
        ddf.groupby(pd.Grouper(key="key1"))
    with pytest.raises(NotImplementedError):
        ddf.groupby(["key1", pd.Grouper(key="key2")])


@pytest.mark.parametrize("operation", ["head", "tail"])
def test_groupby_empty_partitions_with_rows_operation(operation):
    df = pd.DataFrame(
        data=[
            ["a1", "b1"],
            ["a1", None],
            ["a1", "b1"],
            [None, None],
            [None, None],
            [None, None],
            ["a3", "b3"],
            ["a3", "b3"],
            ["a5", "b5"],
        ],
        columns=["A", "B"],
    )

    caller = operator.methodcaller(operation, 1)
    expected = caller(df.groupby("A")["B"])
    ddf = dd.from_pandas(df, npartitions=3)
    actual = caller(ddf.groupby("A")["B"])
    assert_eq(expected, actual)


@pytest.mark.parametrize("operation", ["head", "tail"])
def test_groupby_with_row_operations(operation):
    df = pd.DataFrame(
        data=[
            ["a0", "b1"],
            ["a0", "b2"],
            ["a1", "b1"],
            ["a3", "b3"],
            ["a3", "b3"],
            ["a5", "b5"],
            ["a1", "b1"],
            ["a1", "b1"],
            ["a1", "b1"],
        ],
        columns=["A", "B"],
    )

    caller = operator.methodcaller(operation)
    expected = caller(df.groupby("A")["B"])
    ddf = dd.from_pandas(df, npartitions=3)
    actual = caller(ddf.groupby("A")["B"])
    assert_eq(expected, actual)


@pytest.mark.parametrize("operation", ["head", "tail"])
def test_groupby_multi_index_with_row_operations(operation):
    df = pd.DataFrame(
        data=[
            ["a0", "b1"],
            ["a0", "b2"],
            ["a1", "b1"],
            ["a3", "b3"],
            ["a3", "b3"],
            ["a5", "b5"],
            ["a1", "b1"],
            ["a1", "b1"],
            ["a1", "b1"],
        ],
        columns=["A", "B"],
    )

    caller = operator.methodcaller(operation)
    expected = caller(df.groupby(["A", df["A"].eq("a1")])["B"])
    ddf = dd.from_pandas(df, npartitions=3)
    actual = caller(ddf.groupby(["A", ddf["A"].eq("a1")])["B"])
    assert_eq(expected, actual)