1 /*
2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package org.openjdk.tests.java.util.stream;
24 
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Optional;
36 import java.util.Set;
37 import java.util.StringJoiner;
38 import java.util.TreeMap;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentSkipListMap;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.BinaryOperator;
43 import java.util.function.Function;
44 import java.util.function.Predicate;
45 import java.util.function.Supplier;
46 import java.util.stream.Collector;
47 import java.util.stream.Collectors;
48 import java.util.stream.LambdaTestHelpers;
49 import java.util.stream.OpTestCase;
50 import java.util.stream.Stream;
51 import java.util.stream.StreamOpFlagTestHelper;
52 import java.util.stream.StreamTestDataProvider;
53 import java.util.stream.TestData;
54 
55 import org.testng.annotations.Test;
56 
57 import static java.util.stream.Collectors.collectingAndThen;
58 import static java.util.stream.Collectors.flatMapping;
59 import static java.util.stream.Collectors.filtering;
60 import static java.util.stream.Collectors.groupingBy;
61 import static java.util.stream.Collectors.groupingByConcurrent;
62 import static java.util.stream.Collectors.mapping;
63 import static java.util.stream.Collectors.partitioningBy;
64 import static java.util.stream.Collectors.reducing;
65 import static java.util.stream.Collectors.toCollection;
66 import static java.util.stream.Collectors.toConcurrentMap;
67 import static java.util.stream.Collectors.toList;
68 import static java.util.stream.Collectors.toMap;
69 import static java.util.stream.Collectors.toSet;
70 import static java.util.stream.LambdaTestHelpers.assertContents;
71 import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
72 import static java.util.stream.LambdaTestHelpers.mDoubler;
73 
74 /*
75  * @test
76  * @bug 8071600 8144675
77  * @summary Test for collectors.
78  */
79 public class CollectorsTest extends OpTestCase {
80 
81     private abstract static class CollectorAssertion<T, U> {
assertValue(U value, Supplier<Stream<T>> source, boolean ordered)82         abstract void assertValue(U value,
83                                   Supplier<Stream<T>> source,
84                                   boolean ordered) throws ReflectiveOperationException;
85     }
86 
87     static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
88         private final Function<T, V> mapper;
89         private final CollectorAssertion<V, R> downstream;
90 
MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream)91         MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
92             this.mapper = mapper;
93             this.downstream = downstream;
94         }
95 
96         @Override
assertValue(R value, Supplier<Stream<T>> source, boolean ordered)97         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
98             downstream.assertValue(value,
99                                    () -> source.get().map(mapper::apply),
100                                    ordered);
101         }
102     }
103 
104     static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
105         private final Function<T, Stream<V>> mapper;
106         private final CollectorAssertion<V, R> downstream;
107 
FlatMappingAssertion(Function<T, Stream<V>> mapper, CollectorAssertion<V, R> downstream)108         FlatMappingAssertion(Function<T, Stream<V>> mapper,
109                              CollectorAssertion<V, R> downstream) {
110             this.mapper = mapper;
111             this.downstream = downstream;
112         }
113 
114         @Override
assertValue(R value, Supplier<Stream<T>> source, boolean ordered)115         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
116             downstream.assertValue(value,
117                                    () -> source.get().flatMap(mapper::apply),
118                                    ordered);
119         }
120     }
121 
122     static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> {
123         private final Predicate<T> filter;
124         private final CollectorAssertion<T, R> downstream;
125 
FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream)126         public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) {
127             this.filter = filter;
128             this.downstream = downstream;
129         }
130 
131         @Override
assertValue(R value, Supplier<Stream<T>> source, boolean ordered)132         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
133             downstream.assertValue(value,
134                                    () -> source.get().filter(filter),
135                                    ordered);
136         }
137     }
138 
139     static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
140         private final Class<? extends Map> clazz;
141         private final Function<T, K> classifier;
142         private final CollectorAssertion<T,V> downstream;
143 
GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz, CollectorAssertion<T, V> downstream)144         GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
145                             CollectorAssertion<T, V> downstream) {
146             this.clazz = clazz;
147             this.classifier = classifier;
148             this.downstream = downstream;
149         }
150 
151         @Override
assertValue(M map, Supplier<Stream<T>> source, boolean ordered)152         void assertValue(M map,
153                          Supplier<Stream<T>> source,
154                          boolean ordered) throws ReflectiveOperationException {
155             if (!clazz.isAssignableFrom(map.getClass()))
156                 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
157             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
158             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
159                 K key = entry.getKey();
160                 downstream.assertValue(entry.getValue(),
161                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
162                                        ordered);
163             }
164         }
165     }
166 
167     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
168         private final Class<? extends Map> clazz;
169         private final Function<T, K> keyFn;
170         private final Function<T, V> valueFn;
171         private final BinaryOperator<V> mergeFn;
172 
ToMapAssertion(Function<T, K> keyFn, Function<T, V> valueFn, BinaryOperator<V> mergeFn, Class<? extends Map> clazz)173         ToMapAssertion(Function<T, K> keyFn,
174                        Function<T, V> valueFn,
175                        BinaryOperator<V> mergeFn,
176                        Class<? extends Map> clazz) {
177             this.clazz = clazz;
178             this.keyFn = keyFn;
179             this.valueFn = valueFn;
180             this.mergeFn = mergeFn;
181         }
182 
183         @Override
assertValue(M map, Supplier<Stream<T>> source, boolean ordered)184         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
185             if (!clazz.isAssignableFrom(map.getClass()))
186                 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
187             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
188             assertEquals(uniqueKeys, map.keySet());
189             source.get().forEach(t -> {
190                 K key = keyFn.apply(t);
191                 V v = source.get()
192                             .filter(e -> key.equals(keyFn.apply(e)))
193                             .map(valueFn)
194                             .reduce(mergeFn)
195                             .get();
196                 assertEquals(map.get(key), v);
197             });
198         }
199     }
200 
201     static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
202         private final Predicate<T> predicate;
203         private final CollectorAssertion<T,D> downstream;
204 
PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream)205         PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
206             this.predicate = predicate;
207             this.downstream = downstream;
208         }
209 
210         @Override
assertValue(Map<Boolean, D> map, Supplier<Stream<T>> source, boolean ordered)211         void assertValue(Map<Boolean, D> map,
212                          Supplier<Stream<T>> source,
213                          boolean ordered) throws ReflectiveOperationException {
214             if (!Map.class.isAssignableFrom(map.getClass()))
215                 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
216             assertEquals(2, map.size());
217             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
218             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
219         }
220     }
221 
222     static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
223         @Override
assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)224         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
225                 throws ReflectiveOperationException {
226             if (!List.class.isAssignableFrom(value.getClass()))
227                 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
228             Stream<T> stream = source.get();
229             List<T> result = new ArrayList<>();
230             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
231                 result.add(it.next());
232             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
233                 assertContents(value, result);
234             else
235                 assertContentsUnordered(value, result);
236         }
237     }
238 
239     static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
240         private final Class<? extends Collection> clazz;
241         private final boolean targetOrdered;
242 
ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered)243         ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
244             this.clazz = clazz;
245             this.targetOrdered = targetOrdered;
246         }
247 
248         @Override
assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)249         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
250                 throws ReflectiveOperationException {
251             if (!clazz.isAssignableFrom(value.getClass()))
252                 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
253             Stream<T> stream = source.get();
254             Collection<T> result = clazz.newInstance();
255             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
256                 result.add(it.next());
257             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
258                 assertContents(value, result);
259             else
260                 assertContentsUnordered(value, result);
261         }
262     }
263 
264     static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
265         private final U identity;
266         private final Function<T, U> mapper;
267         private final BinaryOperator<U> reducer;
268 
ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer)269         ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
270             this.identity = identity;
271             this.mapper = mapper;
272             this.reducer = reducer;
273         }
274 
275         @Override
assertValue(U value, Supplier<Stream<T>> source, boolean ordered)276         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
277                 throws ReflectiveOperationException {
278             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
279             if (value == null)
280                 assertTrue(!reduced.isPresent());
281             else if (!reduced.isPresent()) {
282                 assertEquals(value, identity);
283             }
284             else {
285                 assertEquals(value, reduced.get());
286             }
287         }
288     }
289 
mapTabulationAsserter(boolean ordered)290     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
291         return (act, exp, ord, par) -> {
292             if (par && (!ordered || !ord)) {
293                 CollectorsTest.nestedMapEqualityAssertion(act, exp);
294             }
295             else {
296                 LambdaTestHelpers.assertContentsEqual(act, exp);
297             }
298         };
299     }
300 
301     private<T, M extends Map>
302     void exerciseMapCollection(TestData<T, Stream<T>> data,
303                                Collector<T, ?, ? extends M> collector,
304                                CollectorAssertion<T, M> assertion)
305             throws ReflectiveOperationException {
306         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
307 
308         M m = withData(data)
309                 .terminal(s -> s.collect(collector))
310                 .resultAsserter(mapTabulationAsserter(ordered))
311                 .exercise();
312         assertion.assertValue(m, () -> data.stream(), ordered);
313 
314         m = withData(data)
315                 .terminal(s -> s.unordered().collect(collector))
316                 .resultAsserter(mapTabulationAsserter(ordered))
317                 .exercise();
318         assertion.assertValue(m, () -> data.stream(), false);
319     }
320 
321     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
322         if (o1 instanceof Map) {
323             Map m1 = (Map) o1;
324             Map m2 = (Map) o2;
325             assertContentsUnordered(m1.keySet(), m2.keySet());
326             for (Object k : m1.keySet())
327                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
328         }
329         else if (o1 instanceof Collection) {
330             assertContentsUnordered(((Collection) o1), ((Collection) o2));
331         }
332         else
333             assertEquals(o1, o2);
334     }
335 
336     private<T, R> void assertCollect(TestData.OfRef<T> data,
337                                      Collector<T, ?, R> collector,
338                                      Function<Stream<T>, R> streamReduction) {
339         R check = streamReduction.apply(data.stream());
340         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
341     }
342 
343     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
344     public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
345         assertCollect(data, Collectors.reducing(0, Integer::sum),
346                       s -> s.reduce(0, Integer::sum));
347         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
348                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
349         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
350                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
351 
352         assertCollect(data, Collectors.reducing(Integer::sum),
353                       s -> s.reduce(Integer::sum));
354         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
355                       s -> s.min(Integer::compare));
356         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
357                       s -> s.max(Integer::compare));
358 
359         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
360                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
361 
362         assertCollect(data, Collectors.summingLong(x -> x * 2L),
363                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
364         assertCollect(data, Collectors.summingInt(x -> x * 2),
365                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
366         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
367                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
368 
369         assertCollect(data, Collectors.averagingInt(x -> x * 2),
370                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
371         assertCollect(data, Collectors.averagingLong(x -> x * 2),
372                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
373         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
374                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
375 
376         // Test explicit Collector.of
377         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
378                                                                    (a, b) -> {
379                                                                        a[0] += b * 2;
380                                                                        a[1]++;
381                                                                    },
382                                                                    (a, b) -> {
383                                                                        a[0] += b[0];
384                                                                        a[1] += b[1];
385                                                                        return a;
386                                                                    },
387                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
388         assertCollect(data, avg2xint,
389                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
390     }
391 
392     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
393     public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
394         withData(data)
395                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
396                 .expectedResult(join(data, ""))
397                 .exercise();
398 
399         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
400         withData(data)
401                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
402                 .expectedResult(join(data, ""))
403                 .exercise();
404 
405         withData(data)
406                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
407                 .expectedResult(join(data, ","))
408                 .exercise();
409 
410         withData(data)
411                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
412                 .expectedResult("[" + join(data, ",") + "]")
413                 .exercise();
414 
415         withData(data)
416                 .terminal(s -> s.map(Object::toString)
417                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
418                                 .toString())
419                 .expectedResult(join(data, ""))
420                 .exercise();
421 
422         withData(data)
423                 .terminal(s -> s.map(Object::toString)
424                                 .collect(() -> new StringJoiner(","),
425                                          (sj, cs) -> sj.add(cs),
426                                          (j1, j2) -> j1.merge(j2))
427                                 .toString())
428                 .expectedResult(join(data, ","))
429                 .exercise();
430 
431         withData(data)
432                 .terminal(s -> s.map(Object::toString)
433                                 .collect(() -> new StringJoiner(",", "[", "]"),
434                                          (sj, cs) -> sj.add(cs),
435                                          (j1, j2) -> j1.merge(j2))
436                                 .toString())
437                 .expectedResult("[" + join(data, ",") + "]")
438                 .exercise();
439     }
440 
441     private<T> String join(TestData.OfRef<T> data, String delim) {
442         StringBuilder sb = new StringBuilder();
443         boolean first = true;
444         for (T i : data) {
445             if (!first)
446                 sb.append(delim);
447             sb.append(i.toString());
448             first = false;
449         }
450         return sb.toString();
451     }
452 
453     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
454     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
455         Function<Integer, Integer> keyFn = i -> i * 2;
456         Function<Integer, Integer> valueFn = i -> i * 4;
457 
458         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
459         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
460 
461         BinaryOperator<Integer> sum = Integer::sum;
462         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
463                                                         (u, v) -> v,
464                                                         sum)) {
465             try {
466                 exerciseMapCollection(data, toMap(keyFn, valueFn),
467                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
468                 if (dataAsList.size() != dataAsSet.size())
469                     fail("Expected ISE on input with duplicates");
470             }
471             catch (IllegalStateException e) {
472                 if (dataAsList.size() == dataAsSet.size())
473                     fail("Expected no ISE on input without duplicates");
474             }
475 
476             exerciseMapCollection(data, toMap(keyFn, valueFn, op),
477                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
478 
479             exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
480                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
481         }
482 
483         // For concurrent maps, only use commutative merge functions
484         try {
485             exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
486                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
487             if (dataAsList.size() != dataAsSet.size())
488                 fail("Expected ISE on input with duplicates");
489         }
490         catch (IllegalStateException e) {
491             if (dataAsList.size() == dataAsSet.size())
492                 fail("Expected no ISE on input without duplicates");
493         }
494 
495         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
496                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
497 
498         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
499                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
500     }
501 
502     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
503     public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
504         Function<Integer, Integer> classifier = i -> i % 3;
505 
506         // Single-level groupBy
507         exerciseMapCollection(data, groupingBy(classifier),
508                               new GroupingByAssertion<>(classifier, HashMap.class,
509                                                         new ToListAssertion<>()));
510         exerciseMapCollection(data, groupingByConcurrent(classifier),
511                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
512                                                         new ToListAssertion<>()));
513 
514         // With explicit constructors
515         exerciseMapCollection(data,
516                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
517                               new GroupingByAssertion<>(classifier, TreeMap.class,
518                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
519         exerciseMapCollection(data,
520                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
521                                                    toCollection(HashSet::new)),
522                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
523                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
524     }
525 
526     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
527     public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
528         Function<Integer, Integer> classifier = i -> i % 3;
529         Function<Integer, Integer> mapper = i -> i * 2;
530 
531         exerciseMapCollection(data,
532                               groupingBy(classifier, mapping(mapper, toList())),
533                               new GroupingByAssertion<>(classifier, HashMap.class,
534                                                         new MappingAssertion<>(mapper,
535                                                                                new ToListAssertion<>())));
536     }
537 
538     @Test(groups = { "serialization-hostile" })
539     public void testFlatMappingClose() {
540         Function<Integer, Integer> classifier = i -> i;
541         AtomicInteger ai = new AtomicInteger();
542         Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
543         Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
544         assertEquals(m.size(), ai.get());
545     }
546 
547     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
548     public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
549         Function<Integer, Integer> classifier = i -> i % 3;
550         Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
551         Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
552         Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
553 
554         exerciseMapCollection(data,
555                               groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
556                               new GroupingByAssertion<>(classifier, HashMap.class,
557                                                         new FlatMappingAssertion<>(flatMapperBy0,
558                                                                                    new ToListAssertion<>())));
559         exerciseMapCollection(data,
560                               groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
561                               new GroupingByAssertion<>(classifier, HashMap.class,
562                                                         new FlatMappingAssertion<>(flatMapperBy0,
563                                                                                    new ToListAssertion<>())));
564         exerciseMapCollection(data,
565                               groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
566                               new GroupingByAssertion<>(classifier, HashMap.class,
567                                                         new FlatMappingAssertion<>(flatMapperBy2,
568                                                                                    new ToListAssertion<>())));
569     }
570 
571     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
572     public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
573         Function<Integer, Integer> classifier = i -> i % 3;
574         Predicate<Integer> filteringByMod2 = i -> i % 2 == 0;
575         Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100;
576         Predicate<Integer> filteringByTrue = i -> true;
577         Predicate<Integer> filteringByFalse = i -> false;
578 
579         exerciseMapCollection(data,
580                               groupingBy(classifier, filtering(filteringByMod2, toList())),
581                               new GroupingByAssertion<>(classifier, HashMap.class,
582                                                         new FilteringAssertion<>(filteringByMod2,
583                                                                                    new ToListAssertion<>())));
584         exerciseMapCollection(data,
585                               groupingBy(classifier, filtering(filteringByUnder100, toList())),
586                               new GroupingByAssertion<>(classifier, HashMap.class,
587                                                         new FilteringAssertion<>(filteringByUnder100,
588                                                                                    new ToListAssertion<>())));
589         exerciseMapCollection(data,
590                               groupingBy(classifier, filtering(filteringByTrue, toList())),
591                               new GroupingByAssertion<>(classifier, HashMap.class,
592                                                         new FilteringAssertion<>(filteringByTrue,
593                                                                                    new ToListAssertion<>())));
594         exerciseMapCollection(data,
595                               groupingBy(classifier, filtering(filteringByFalse, toList())),
596                               new GroupingByAssertion<>(classifier, HashMap.class,
597                                                         new FilteringAssertion<>(filteringByFalse,
598                                                                                    new ToListAssertion<>())));
599     }
600 
601     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
602     public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
603         Function<Integer, Integer> classifier = i -> i % 6;
604         Function<Integer, Integer> classifier2 = i -> i % 23;
605 
606         // Two-level groupBy
607         exerciseMapCollection(data,
608                               groupingBy(classifier, groupingBy(classifier2)),
609                               new GroupingByAssertion<>(classifier, HashMap.class,
610                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
611                                                                                   new ToListAssertion<>())));
612         // with concurrent as upstream
613         exerciseMapCollection(data,
614                               groupingByConcurrent(classifier, groupingBy(classifier2)),
615                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
616                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
617                                                                                   new ToListAssertion<>())));
618         // with concurrent as downstream
619         exerciseMapCollection(data,
620                               groupingBy(classifier, groupingByConcurrent(classifier2)),
621                               new GroupingByAssertion<>(classifier, HashMap.class,
622                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
623                                                                                   new ToListAssertion<>())));
624         // with concurrent as upstream and downstream
625         exerciseMapCollection(data,
626                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
627                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
628                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
629                                                                                   new ToListAssertion<>())));
630 
631         // With explicit constructors
632         exerciseMapCollection(data,
633                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
634                               new GroupingByAssertion<>(classifier, TreeMap.class,
635                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
636                                                                                   new ToCollectionAssertion<Integer>(HashSet.class, false))));
637         // with concurrent as upstream
638         exerciseMapCollection(data,
639                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
640                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
641                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
642                                                                                   new ToListAssertion<>())));
643         // with concurrent as downstream
644         exerciseMapCollection(data,
645                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
646                               new GroupingByAssertion<>(classifier, TreeMap.class,
647                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
648                                                                                   new ToListAssertion<>())));
649         // with concurrent as upstream and downstream
650         exerciseMapCollection(data,
651                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
652                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
653                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
654                                                                                   new ToListAssertion<>())));
655     }
656 
657     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
658     public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
659         Function<Integer, Integer> classifier = i -> i % 3;
660 
661         // Single-level simple reduce
662         exerciseMapCollection(data,
663                               groupingBy(classifier, reducing(0, Integer::sum)),
664                               new GroupingByAssertion<>(classifier, HashMap.class,
665                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
666         // with concurrent
667         exerciseMapCollection(data,
668                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
669                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
670                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
671 
672         // With explicit constructors
673         exerciseMapCollection(data,
674                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
675                               new GroupingByAssertion<>(classifier, TreeMap.class,
676                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
677         // with concurrent
678         exerciseMapCollection(data,
679                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
680                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
681                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
682 
683         // Single-level map-reduce
684         exerciseMapCollection(data,
685                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
686                               new GroupingByAssertion<>(classifier, HashMap.class,
687                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
688         // with concurrent
689         exerciseMapCollection(data,
690                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
691                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
692                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
693 
694         // With explicit constructors
695         exerciseMapCollection(data,
696                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
697                               new GroupingByAssertion<>(classifier, TreeMap.class,
698                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
699         // with concurrent
700         exerciseMapCollection(data,
701                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
702                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
703                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
704     }
705 
706     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
707     public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
708         Predicate<Integer> classifier = i -> i % 3 == 0;
709 
710         // Single-level partition to downstream List
711         exerciseMapCollection(data,
712                               partitioningBy(classifier),
713                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
714         exerciseMapCollection(data,
715                               partitioningBy(classifier, toList()),
716                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
717     }
718 
719     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
720     public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
721         Predicate<Integer> classifier = i -> i % 3 == 0;
722         Predicate<Integer> classifier2 = i -> i % 7 == 0;
723 
724         // Two level partition
725         exerciseMapCollection(data,
726                               partitioningBy(classifier, partitioningBy(classifier2)),
727                               new PartitioningByAssertion<>(classifier,
728                                                             new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
729 
730         // Two level partition with reduce
731         exerciseMapCollection(data,
732                               partitioningBy(classifier, reducing(0, Integer::sum)),
733                               new PartitioningByAssertion<>(classifier,
734                                                             new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
735     }
736 
737     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
738     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
739         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
740         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
741         assertEquals(asList, asImmutableList);
742         try {
743             asImmutableList.add(0);
744             fail("Expecting immutable result");
745         }
746         catch (UnsupportedOperationException ignored) { }
747     }
748 
749 }
750