1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package org.openjdk.tests.java.util.stream;
24 
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Optional;
36 import java.util.Set;
37 import java.util.StringJoiner;
38 import java.util.TreeMap;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentSkipListMap;
41 import java.util.function.BinaryOperator;
42 import java.util.function.Function;
43 import java.util.function.Predicate;
44 import java.util.function.Supplier;
45 import java.util.stream.Collector;
46 import java.util.stream.Collectors;
47 import java.util.stream.LambdaTestHelpers;
48 import java.util.stream.OpTestCase;
49 import java.util.stream.Stream;
50 import java.util.stream.StreamOpFlagTestHelper;
51 import java.util.stream.StreamTestDataProvider;
52 import java.util.stream.TestData;
53 
54 import org.testng.annotations.Test;
55 
56 import static java.util.stream.Collectors.collectingAndThen;
57 import static java.util.stream.Collectors.groupingBy;
58 import static java.util.stream.Collectors.groupingByConcurrent;
59 import static java.util.stream.Collectors.partitioningBy;
60 import static java.util.stream.Collectors.reducing;
61 import static java.util.stream.Collectors.toCollection;
62 import static java.util.stream.Collectors.toConcurrentMap;
63 import static java.util.stream.Collectors.toList;
64 import static java.util.stream.Collectors.toMap;
65 import static java.util.stream.Collectors.toSet;
66 import static java.util.stream.LambdaTestHelpers.assertContents;
67 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
68 import static java.util.stream.LambdaTestHelpers.mDoubler;
69 
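/**
 * TabulatorsTest: exercises the collectors produced by {@link Collectors}
 * (reducing, joining, toMap/toConcurrentMap, groupingBy/groupingByConcurrent,
 * partitioningBy and collectingAndThen) and checks each collected result
 * against a value recomputed directly from the source data.
 *
 * @author Brian Goetz
 */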
75 @SuppressWarnings({"rawtypes", "unchecked"})
76 public class TabulatorsTest extends OpTestCase {
77 
78     private static abstract class TabulationAssertion<T, U> {
assertValue(U value, Supplier<Stream<T>> source, boolean ordered)79         abstract void assertValue(U value,
80                                   Supplier<Stream<T>> source,
81                                   boolean ordered) throws ReflectiveOperationException;
82     }
83 
84     @SuppressWarnings({"rawtypes", "unchecked"})
85     static class GroupedMapAssertion<T, K, V, M extends Map<K, ? extends V>> extends TabulationAssertion<T, M> {
86         private final Class<? extends Map> clazz;
87         private final Function<T, K> classifier;
88         private final TabulationAssertion<T,V> downstream;
89 
GroupedMapAssertion(Function<T, K> classifier, Class<? extends Map> clazz, TabulationAssertion<T, V> downstream)90         protected GroupedMapAssertion(Function<T, K> classifier,
91                                       Class<? extends Map> clazz,
92                                       TabulationAssertion<T, V> downstream) {
93             this.clazz = clazz;
94             this.classifier = classifier;
95             this.downstream = downstream;
96         }
97 
assertValue(M map, Supplier<Stream<T>> source, boolean ordered)98         void assertValue(M map,
99                          Supplier<Stream<T>> source,
100                          boolean ordered) throws ReflectiveOperationException {
101             if (!clazz.isAssignableFrom(map.getClass()))
102                 fail(String.format("Class mismatch in GroupedMapAssertion: %s, %s", clazz, map.getClass()));
103             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
104             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
105                 K key = entry.getKey();
106                 downstream.assertValue(entry.getValue(),
107                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
108                                        ordered);
109             }
110         }
111     }
112 
113     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends TabulationAssertion<T, M> {
114         private final Class<? extends Map> clazz;
115         private final Function<T, K> keyFn;
116         private final Function<T, V> valueFn;
117         private final BinaryOperator<V> mergeFn;
118 
ToMapAssertion(Function<T, K> keyFn, Function<T, V> valueFn, BinaryOperator<V> mergeFn, Class<? extends Map> clazz)119         ToMapAssertion(Function<T, K> keyFn,
120                        Function<T, V> valueFn,
121                        BinaryOperator<V> mergeFn,
122                        Class<? extends Map> clazz) {
123             this.clazz = clazz;
124             this.keyFn = keyFn;
125             this.valueFn = valueFn;
126             this.mergeFn = mergeFn;
127         }
128 
129         @Override
assertValue(M map, Supplier<Stream<T>> source, boolean ordered)130         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
131             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
132             assertTrue(clazz.isAssignableFrom(map.getClass()));
133             assertEquals(uniqueKeys, map.keySet());
134             source.get().forEach(t -> {
135                 K key = keyFn.apply(t);
136                 V v = source.get()
137                             .filter(e -> key.equals(keyFn.apply(e)))
138                             .map(valueFn)
139                             .reduce(mergeFn)
140                             .get();
141                 assertEquals(map.get(key), v);
142             });
143         }
144     }
145 
146     static class PartitionAssertion<T, D> extends TabulationAssertion<T, Map<Boolean,D>> {
147         private final Predicate<T> predicate;
148         private final TabulationAssertion<T,D> downstream;
149 
PartitionAssertion(Predicate<T> predicate, TabulationAssertion<T, D> downstream)150         protected PartitionAssertion(Predicate<T> predicate,
151                                      TabulationAssertion<T, D> downstream) {
152             this.predicate = predicate;
153             this.downstream = downstream;
154         }
155 
assertValue(Map<Boolean, D> map, Supplier<Stream<T>> source, boolean ordered)156         void assertValue(Map<Boolean, D> map,
157                          Supplier<Stream<T>> source,
158                          boolean ordered) throws ReflectiveOperationException {
159             if (!Map.class.isAssignableFrom(map.getClass()))
160                 fail(String.format("Class mismatch in PartitionAssertion: %s", map.getClass()));
161             assertEquals(2, map.size());
162             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
163             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
164         }
165     }
166 
167     @SuppressWarnings({"rawtypes", "unchecked"})
168     static class ListAssertion<T> extends TabulationAssertion<T, List<T>> {
169         @Override
assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)170         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
171                 throws ReflectiveOperationException {
172             if (!List.class.isAssignableFrom(value.getClass()))
173                 fail(String.format("Class mismatch in ListAssertion: %s", value.getClass()));
174             Stream<T> stream = source.get();
175             List<T> result = new ArrayList<>();
176             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
177                 result.add(it.next());
178             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
179                 assertContents(value, result);
180             else
181                 assertContentsUnordered(value, result);
182         }
183     }
184 
185     @SuppressWarnings({"rawtypes", "unchecked"})
186     static class CollectionAssertion<T> extends TabulationAssertion<T, Collection<T>> {
187         private final Class<? extends Collection> clazz;
188         private final boolean targetOrdered;
189 
CollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered)190         protected CollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
191             this.clazz = clazz;
192             this.targetOrdered = targetOrdered;
193         }
194 
195         @Override
assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)196         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
197                 throws ReflectiveOperationException {
198             if (!clazz.isAssignableFrom(value.getClass()))
199                 fail(String.format("Class mismatch in CollectionAssertion: %s, %s", clazz, value.getClass()));
200             Stream<T> stream = source.get();
201             Collection<T> result = clazz.newInstance();
202             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
203                 result.add(it.next());
204             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
205                 assertContents(value, result);
206             else
207                 assertContentsUnordered(value, result);
208         }
209     }
210 
211     static class ReduceAssertion<T, U> extends TabulationAssertion<T, U> {
212         private final U identity;
213         private final Function<T, U> mapper;
214         private final BinaryOperator<U> reducer;
215 
ReduceAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer)216         ReduceAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
217             this.identity = identity;
218             this.mapper = mapper;
219             this.reducer = reducer;
220         }
221 
222         @Override
assertValue(U value, Supplier<Stream<T>> source, boolean ordered)223         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
224                 throws ReflectiveOperationException {
225             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
226             if (value == null)
227                 assertTrue(!reduced.isPresent());
228             else if (!reduced.isPresent()) {
229                 assertEquals(value, identity);
230             }
231             else {
232                 assertEquals(value, reduced.get());
233             }
234         }
235     }
236 
mapTabulationAsserter(boolean ordered)237     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
238         return (act, exp, ord, par) -> {
239             if (par && (!ordered || !ord)) {
240                 TabulatorsTest.nestedMapEqualityAssertion(act, exp);
241             }
242             else {
243                 LambdaTestHelpers.assertContentsEqual(act, exp);
244             }
245         };
246     }
247 
248     private<T, M extends Map>
249     void exerciseMapTabulation(TestData<T, Stream<T>> data,
250                                Collector<T, ?, ? extends M> collector,
251                                TabulationAssertion<T, M> assertion)
252             throws ReflectiveOperationException {
253         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
254 
255         M m = withData(data)
256                 .terminal(s -> s.collect(collector))
257                 .resultAsserter(mapTabulationAsserter(ordered))
258                 .exercise();
259         assertion.assertValue(m, () -> data.stream(), ordered);
260 
261         m = withData(data)
262                 .terminal(s -> s.unordered().collect(collector))
263                 .resultAsserter(mapTabulationAsserter(ordered))
264                 .exercise();
265         assertion.assertValue(m, () -> data.stream(), false);
266     }
267 
268     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
269         if (o1 instanceof Map) {
270             Map m1 = (Map) o1;
271             Map m2 = (Map) o2;
272             assertContentsUnordered(m1.keySet(), m2.keySet());
273             for (Object k : m1.keySet())
274                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
275         }
276         else if (o1 instanceof Collection) {
277             assertContentsUnordered(((Collection) o1), ((Collection) o2));
278         }
279         else
280             assertEquals(o1, o2);
281     }
282 
283     private<T, R> void assertCollect(TestData.OfRef<T> data,
284                                      Collector<T, ?, R> collector,
285                                      Function<Stream<T>, R> streamReduction) {
286         R check = streamReduction.apply(data.stream());
287         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
288     }
289 
    /**
     * Checks reducing, min/max, summing and averaging collectors, plus a
     * hand-written {@code Collector.of}, each against the equivalent direct
     * stream operation on the same data.
     */
    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
    public void testReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
        // Reductions with an identity value
        assertCollect(data, Collectors.reducing(0, Integer::sum),
                      s -> s.reduce(0, Integer::sum));
        assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
                      s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
        assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
                      s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));

        // Reductions without an identity; the expected result is an Optional
        assertCollect(data, Collectors.reducing(Integer::sum),
                      s -> s.reduce(Integer::sum));
        assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
                      s -> s.min(Integer::compare));
        assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
                      s -> s.max(Integer::compare));

        // Map-then-reduce form
        assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
                      s -> s.map(x -> x*2).reduce(0, Integer::sum));

        // Summing collectors for long, int and double
        assertCollect(data, Collectors.summingLong(x -> x * 2L),
                      s -> s.map(x -> x*2L).reduce(0L, Long::sum));
        assertCollect(data, Collectors.summingInt(x -> x * 2),
                      s -> s.map(x -> x*2).reduce(0, Integer::sum));
        assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
                      s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));

        // Averaging collectors; an empty primitive-stream average is mapped to 0
        assertCollect(data, Collectors.averagingInt(x -> x * 2),
                      s -> s.mapToInt(x -> x * 2).average().orElse(0));
        assertCollect(data, Collectors.averagingLong(x -> x * 2),
                      s -> s.mapToLong(x -> x * 2).average().orElse(0));
        assertCollect(data, Collectors.averagingDouble(x -> x * 2),
                      s -> s.mapToDouble(x -> x * 2).average().orElse(0));

        // Test explicit Collector.of
        // The accumulator is a two-slot array: a[0] holds the sum of doubled
        // elements, a[1] the element count; the finisher divides them.
        Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
                                                                   (a, b) -> {
                                                                       a[0] += b * 2;
                                                                       a[1]++;
                                                                   },
                                                                   (a, b) -> {
                                                                       a[0] += b[0];
                                                                       a[1] += b[1];
                                                                       return a;
                                                                   },
                                                                   a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
        assertCollect(data, avg2xint,
                      s -> s.mapToInt(x -> x * 2).average().orElse(0));
    }
338 
339     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
340     public void testJoin(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
341         withData(data)
342                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
343                 .expectedResult(join(data, ""))
344                 .exercise();
345 
346         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
347         withData(data)
348                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
349                 .expectedResult(join(data, ""))
350                 .exercise();
351 
352         withData(data)
353                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
354                 .expectedResult(join(data, ","))
355                 .exercise();
356 
357         withData(data)
358                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
359                 .expectedResult("[" + join(data, ",") + "]")
360                 .exercise();
361 
362         withData(data)
363                 .terminal(s -> s.map(Object::toString)
364                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
365                                 .toString())
366                 .expectedResult(join(data, ""))
367                 .exercise();
368 
369         withData(data)
370                 .terminal(s -> s.map(Object::toString)
371                                 .collect(() -> new StringJoiner(","),
372                                          (sj, cs) -> sj.add(cs),
373                                          (j1, j2) -> j1.merge(j2))
374                                 .toString())
375                 .expectedResult(join(data, ","))
376                 .exercise();
377 
378         withData(data)
379                 .terminal(s -> s.map(Object::toString)
380                                 .collect(() -> new StringJoiner(",", "[", "]"),
381                                          (sj, cs) -> sj.add(cs),
382                                          (j1, j2) -> j1.merge(j2))
383                                 .toString())
384                 .expectedResult("[" + join(data, ",") + "]")
385                 .exercise();
386     }
387 
388     private<T> String join(TestData.OfRef<T> data, String delim) {
389         StringBuilder sb = new StringBuilder();
390         boolean first = true;
391         for (T i : data) {
392             if (!first)
393                 sb.append(delim);
394             sb.append(i.toString());
395             first = false;
396         }
397         return sb.toString();
398     }
399 
400     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
401     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
402         Function<Integer, Integer> keyFn = i -> i * 2;
403         Function<Integer, Integer> valueFn = i -> i * 4;
404 
405         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
406         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
407 
408         BinaryOperator<Integer> sum = Integer::sum;
409         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
410                                                         (u, v) -> v,
411                                                         sum)) {
412             try {
413                 exerciseMapTabulation(data, toMap(keyFn, valueFn),
414                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
415                 if (dataAsList.size() != dataAsSet.size())
416                     fail("Expected ISE on input with duplicates");
417             }
418             catch (IllegalStateException e) {
419                 if (dataAsList.size() == dataAsSet.size())
420                     fail("Expected no ISE on input without duplicates");
421             }
422 
423             exerciseMapTabulation(data, toMap(keyFn, valueFn, op),
424                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
425 
426             exerciseMapTabulation(data, toMap(keyFn, valueFn, op, TreeMap::new),
427                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
428         }
429 
430         // For concurrent maps, only use commutative merge functions
431         try {
432             exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn),
433                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
434             if (dataAsList.size() != dataAsSet.size())
435                 fail("Expected ISE on input with duplicates");
436         }
437         catch (IllegalStateException e) {
438             if (dataAsList.size() == dataAsSet.size())
439                 fail("Expected no ISE on input without duplicates");
440         }
441 
442         exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum),
443                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
444 
445         exerciseMapTabulation(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
446                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
447     }
448 
    /**
     * Checks single-level {@code groupingBy} and {@code groupingByConcurrent},
     * first with their default map and list containers and then with explicit
     * map factories and a {@code HashSet} downstream collection.
     */
    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
    public void testSimpleGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
        Function<Integer, Integer> classifier = i -> i % 3;

        // Single-level groupBy
        exerciseMapTabulation(data, groupingBy(classifier),
                              new GroupedMapAssertion<>(classifier, HashMap.class,
                                                        new ListAssertion<>()));
        exerciseMapTabulation(data, groupingByConcurrent(classifier),
                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
                                                        new ListAssertion<>()));

        // With explicit constructors
        // HashSet is declared as an unordered target (targetOrdered = false).
        exerciseMapTabulation(data,
                              groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
                              new GroupedMapAssertion<>(classifier, TreeMap.class,
                                                        new CollectionAssertion<Integer>(HashSet.class, false)));
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
                                                   toCollection(HashSet::new)),
                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
                                                        new CollectionAssertion<Integer>(HashSet.class, false)));
    }
472 
    /**
     * Checks two-level grouping (a {@code groupingBy} whose downstream is
     * another grouping) for every combination of concurrent and non-concurrent
     * collectors at the two levels, with default containers and then with
     * explicit map factories.
     */
    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
    public void testTwoLevelGroupBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
        // The outer level groups by i % 6, the inner level by i % 23.
        Function<Integer, Integer> classifier = i -> i % 6;
        Function<Integer, Integer> classifier2 = i -> i % 23;

        // Two-level groupBy
        exerciseMapTabulation(data,
                              groupingBy(classifier, groupingBy(classifier2)),
                              new GroupedMapAssertion<>(classifier, HashMap.class,
                                                        new GroupedMapAssertion<>(classifier2, HashMap.class,
                                                                                  new ListAssertion<>())));
        // with concurrent as upstream
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, groupingBy(classifier2)),
                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
                                                        new GroupedMapAssertion<>(classifier2, HashMap.class,
                                                                                  new ListAssertion<>())));
        // with concurrent as downstream
        exerciseMapTabulation(data,
                              groupingBy(classifier, groupingByConcurrent(classifier2)),
                              new GroupedMapAssertion<>(classifier, HashMap.class,
                                                        new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
                                                                                  new ListAssertion<>())));
        // with concurrent as upstream and downstream
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
                                                        new GroupedMapAssertion<>(classifier2, ConcurrentHashMap.class,
                                                                                  new ListAssertion<>())));

        // With explicit constructors
        exerciseMapTabulation(data,
                              groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
                              new GroupedMapAssertion<>(classifier, TreeMap.class,
                                                        new GroupedMapAssertion<>(classifier2, TreeMap.class,
                                                                                  new CollectionAssertion<Integer>(HashSet.class, false))));
        // with concurrent as upstream
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
                                                        new GroupedMapAssertion<>(classifier2, TreeMap.class,
                                                                                  new ListAssertion<>())));
        // with concurrent as downstream
        exerciseMapTabulation(data,
                              groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
                              new GroupedMapAssertion<>(classifier, TreeMap.class,
                                                        new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
                                                                                  new ListAssertion<>())));
        // with concurrent as upstream and downstream
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
                                                        new GroupedMapAssertion<>(classifier2, ConcurrentSkipListMap.class,
                                                                                  new ListAssertion<>())));
    }
528 
    /**
     * Checks {@code groupingBy} and {@code groupingByConcurrent} with a
     * {@code reducing} downstream, both as a plain sum and as a
     * map-then-sum using {@code mDoubler}, with default and explicit map
     * factories.
     */
    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
    public void testGroupedReduce(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
        Function<Integer, Integer> classifier = i -> i % 3;

        // Single-level simple reduce
        exerciseMapTabulation(data,
                              groupingBy(classifier, reducing(0, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, HashMap.class,
                                                        new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
        // with concurrent
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, reducing(0, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
                                                        new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));

        // With explicit constructors
        exerciseMapTabulation(data,
                              groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, TreeMap.class,
                                                        new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
        // with concurrent
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
                                                        new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));

        // Single-level map-reduce
        exerciseMapTabulation(data,
                              groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, HashMap.class,
                                                        new ReduceAssertion<>(0, mDoubler, Integer::sum)));
        // with concurrent
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, ConcurrentHashMap.class,
                                                        new ReduceAssertion<>(0, mDoubler, Integer::sum)));

        // With explicit constructors
        exerciseMapTabulation(data,
                              groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, TreeMap.class,
                                                        new ReduceAssertion<>(0, mDoubler, Integer::sum)));
        // with concurrent
        exerciseMapTabulation(data,
                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
                              new GroupedMapAssertion<>(classifier, ConcurrentSkipListMap.class,
                                                        new ReduceAssertion<>(0, mDoubler, Integer::sum)));
    }
577 
    /**
     * Checks single-level {@code partitioningBy}, once with its default
     * downstream and once with an explicit {@code toList()} downstream; both
     * are verified as partitions of lists.
     */
    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
    public void testSimplePartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
        Predicate<Integer> classifier = i -> i % 3 == 0;

        // Single-level partition to downstream List
        exerciseMapTabulation(data,
                              partitioningBy(classifier),
                              new PartitionAssertion<>(classifier, new ListAssertion<>()));
        exerciseMapTabulation(data,
                              partitioningBy(classifier, toList()),
                              new PartitionAssertion<>(classifier, new ListAssertion<>()));
    }
590 
    /**
     * Checks {@code partitioningBy} with a downstream collector: first a
     * nested {@code partitioningBy}, then a {@code reducing} sum.
     */
    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
    public void testTwoLevelPartition(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
        Predicate<Integer> classifier = i -> i % 3 == 0;
        Predicate<Integer> classifier2 = i -> i % 7 == 0;

        // Two level partition
        // NOTE(review): the inner PartitionAssertion is a raw type, unlike the diamond
        // used everywhere else; presumably a workaround for type inference - confirm
        // before changing it to PartitionAssertion<>.
        exerciseMapTabulation(data,
                              partitioningBy(classifier, partitioningBy(classifier2)),
                              new PartitionAssertion<>(classifier,
                                                       new PartitionAssertion(classifier2, new ListAssertion<>())));

        // Two level partition with reduce
        exerciseMapTabulation(data,
                              partitioningBy(classifier, reducing(0, Integer::sum)),
                              new PartitionAssertion<>(classifier,
                                                       new ReduceAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
    }
608 
609     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
610     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
611         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
612         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
613         assertEquals(asList, asImmutableList);
614         try {
615             asImmutableList.add(0);
616             fail("Expecting immutable result");
617         }
618         catch (UnsupportedOperationException ignored) { }
619     }
620 
621 }
622