1 /* 2 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Objects; 29 import java.util.Spliterator; 30 import java.util.concurrent.ConcurrentHashMap; 31 import java.util.concurrent.ForkJoinPool; 32 import java.util.concurrent.atomic.AtomicLong; 33 import java.util.function.BooleanSupplier; 34 import java.util.function.Consumer; 35 import java.util.function.DoubleConsumer; 36 import java.util.function.DoubleSupplier; 37 import java.util.function.IntConsumer; 38 import java.util.function.IntSupplier; 39 import java.util.function.LongConsumer; 40 import java.util.function.LongSupplier; 41 import java.util.function.Supplier; 42 43 /** 44 * Spliterator implementations for wrapping and delegating spliterators, used 45 * in the implementation of the {@link Stream#spliterator()} method. 46 * 47 * @since 1.8 48 */ 49 class StreamSpliterators { 50 51 /** 52 * Abstract wrapping spliterator that binds to the spliterator of a 53 * pipeline helper on first operation. 54 * 55 * <p>This spliterator is not late-binding and will bind to the source 56 * spliterator when first operated on. 57 * 58 * <p>A wrapping spliterator produced from a sequential stream 59 * cannot be split if there are stateful operations present. 60 */ 61 private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT, 62 T_BUFFER extends AbstractSpinedBuffer> 63 implements Spliterator<P_OUT> { 64 65 // @@@ Detect if stateful operations are present or not 66 // If not then can split otherwise cannot 67 68 /** 69 * True if this spliterator supports splitting 70 */ 71 final boolean isParallel; 72 73 final PipelineHelper<P_OUT> ph; 74 75 /** 76 * Supplier for the source spliterator. Client provides either a 77 * spliterator or a supplier. 78 */ 79 private Supplier<Spliterator<P_IN>> spliteratorSupplier; 80 81 /** 82 * Source spliterator. Either provided from client or obtained from 83 * supplier. 
84 */ 85 Spliterator<P_IN> spliterator; 86 87 /** 88 * Sink chain for the downstream stages of the pipeline, ultimately 89 * leading to the buffer. Used during partial traversal. 90 */ 91 Sink<P_IN> bufferSink; 92 93 /** 94 * A function that advances one element of the spliterator, pushing 95 * it to bufferSink. Returns whether any elements were processed. 96 * Used during partial traversal. 97 */ 98 BooleanSupplier pusher; 99 100 /** Next element to consume from the buffer, used during partial traversal */ 101 long nextToConsume; 102 103 /** Buffer into which elements are pushed. Used during partial traversal. */ 104 T_BUFFER buffer; 105 106 /** 107 * True if full traversal has occurred (with possible cancelation). 108 * If doing a partial traversal, there may be still elements in buffer. 109 */ 110 boolean finished; 111 112 /** 113 * Construct an AbstractWrappingSpliterator from a 114 * {@code Supplier<Spliterator>}. 115 */ AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)116 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 117 Supplier<Spliterator<P_IN>> spliteratorSupplier, 118 boolean parallel) { 119 this.ph = ph; 120 this.spliteratorSupplier = spliteratorSupplier; 121 this.spliterator = null; 122 this.isParallel = parallel; 123 } 124 125 /** 126 * Construct an AbstractWrappingSpliterator from a 127 * {@code Spliterator}. 128 */ AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)129 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 130 Spliterator<P_IN> spliterator, 131 boolean parallel) { 132 this.ph = ph; 133 this.spliteratorSupplier = null; 134 this.spliterator = spliterator; 135 this.isParallel = parallel; 136 } 137 138 /** 139 * Called before advancing to set up spliterator, if needed. 
140 */ init()141 final void init() { 142 if (spliterator == null) { 143 spliterator = spliteratorSupplier.get(); 144 spliteratorSupplier = null; 145 } 146 } 147 148 /** 149 * Get an element from the source, pushing it into the sink chain, 150 * setting up the buffer if needed 151 * @return whether there are elements to consume from the buffer 152 */ doAdvance()153 final boolean doAdvance() { 154 if (buffer == null) { 155 if (finished) 156 return false; 157 158 init(); 159 initPartialTraversalState(); 160 nextToConsume = 0; 161 bufferSink.begin(spliterator.getExactSizeIfKnown()); 162 return fillBuffer(); 163 } 164 else { 165 ++nextToConsume; 166 boolean hasNext = nextToConsume < buffer.count(); 167 if (!hasNext) { 168 nextToConsume = 0; 169 buffer.clear(); 170 hasNext = fillBuffer(); 171 } 172 return hasNext; 173 } 174 } 175 176 /** 177 * Invokes the shape-specific constructor with the provided arguments 178 * and returns the result. 179 */ 180 abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s); 181 182 /** 183 * Initializes buffer, sink chain, and pusher for a shape-specific 184 * implementation. 185 */ 186 abstract void initPartialTraversalState(); 187 188 @Override 189 public Spliterator<P_OUT> trySplit() { 190 if (isParallel && !finished) { 191 init(); 192 193 Spliterator<P_IN> split = spliterator.trySplit(); 194 return (split == null) ? null : wrap(split); 195 } 196 else 197 return null; 198 } 199 200 /** 201 * If the buffer is empty, push elements into the sink chain until 202 * the source is empty or cancellation is requested. 
203 * @return whether there are elements to consume from the buffer 204 */ 205 private boolean fillBuffer() { 206 while (buffer.count() == 0) { 207 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) { 208 if (finished) 209 return false; 210 else { 211 bufferSink.end(); // might trigger more elements 212 finished = true; 213 } 214 } 215 } 216 return true; 217 } 218 219 @Override 220 public final long estimateSize() { 221 init(); 222 // Use the estimate of the wrapped spliterator 223 // Note this may not be accurate if there are filter/flatMap 224 // operations filtering or adding elements to the stream 225 return spliterator.estimateSize(); 226 } 227 228 @Override 229 public final long getExactSizeIfKnown() { 230 init(); 231 return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) 232 ? spliterator.getExactSizeIfKnown() 233 : -1; 234 } 235 236 @Override 237 public final int characteristics() { 238 init(); 239 240 // Get the characteristics from the pipeline 241 int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags())); 242 243 // Mask off the size and uniform characteristics and replace with 244 // those of the spliterator 245 // Note that a non-uniform spliterator can change from something 246 // with an exact size to an estimate for a sub-split, for example 247 // with HashSet where the size is known at the top level spliterator 248 // but for sub-splits only an estimate is known 249 if ((c & Spliterator.SIZED) != 0) { 250 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED); 251 c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED)); 252 } 253 254 return c; 255 } 256 257 @Override 258 public Comparator<? 
super P_OUT> getComparator() { 259 if (!hasCharacteristics(SORTED)) 260 throw new IllegalStateException(); 261 return null; 262 } 263 264 @Override 265 public final String toString() { 266 return String.format("%s[%s]", getClass().getName(), spliterator); 267 } 268 } 269 270 static final class WrappingSpliterator<P_IN, P_OUT> 271 extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> { 272 273 WrappingSpliterator(PipelineHelper<P_OUT> ph, 274 Supplier<Spliterator<P_IN>> supplier, 275 boolean parallel) { 276 super(ph, supplier, parallel); 277 } 278 279 WrappingSpliterator(PipelineHelper<P_OUT> ph, 280 Spliterator<P_IN> spliterator, 281 boolean parallel) { 282 super(ph, spliterator, parallel); 283 } 284 285 @Override 286 WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) { 287 return new WrappingSpliterator<>(ph, s, isParallel); 288 } 289 290 @Override 291 void initPartialTraversalState() { 292 SpinedBuffer<P_OUT> b = new SpinedBuffer<>(); 293 buffer = b; 294 bufferSink = ph.wrapSink(b::accept); 295 pusher = () -> spliterator.tryAdvance(bufferSink); 296 } 297 298 @Override tryAdvance(Consumer<? super P_OUT> consumer)299 public boolean tryAdvance(Consumer<? super P_OUT> consumer) { 300 Objects.requireNonNull(consumer); 301 boolean hasNext = doAdvance(); 302 if (hasNext) 303 consumer.accept(buffer.get(nextToConsume)); 304 return hasNext; 305 } 306 307 @Override forEachRemaining(Consumer<? super P_OUT> consumer)308 public void forEachRemaining(Consumer<? 
super P_OUT> consumer) { 309 if (buffer == null && !finished) { 310 Objects.requireNonNull(consumer); 311 init(); 312 313 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator); 314 finished = true; 315 } 316 else { 317 do { } while (tryAdvance(consumer)); 318 } 319 } 320 } 321 322 static final class IntWrappingSpliterator<P_IN> 323 extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt> 324 implements Spliterator.OfInt { 325 IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)326 IntWrappingSpliterator(PipelineHelper<Integer> ph, 327 Supplier<Spliterator<P_IN>> supplier, 328 boolean parallel) { 329 super(ph, supplier, parallel); 330 } 331 IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)332 IntWrappingSpliterator(PipelineHelper<Integer> ph, 333 Spliterator<P_IN> spliterator, 334 boolean parallel) { 335 super(ph, spliterator, parallel); 336 } 337 338 @Override wrap(Spliterator<P_IN> s)339 AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) { 340 return new IntWrappingSpliterator<>(ph, s, isParallel); 341 } 342 343 @Override initPartialTraversalState()344 void initPartialTraversalState() { 345 SpinedBuffer.OfInt b = new SpinedBuffer.OfInt(); 346 buffer = b; 347 bufferSink = ph.wrapSink((Sink.OfInt) b::accept); 348 pusher = () -> spliterator.tryAdvance(bufferSink); 349 } 350 351 @Override trySplit()352 public Spliterator.OfInt trySplit() { 353 return (Spliterator.OfInt) super.trySplit(); 354 } 355 356 @Override tryAdvance(IntConsumer consumer)357 public boolean tryAdvance(IntConsumer consumer) { 358 Objects.requireNonNull(consumer); 359 boolean hasNext = doAdvance(); 360 if (hasNext) 361 consumer.accept(buffer.get(nextToConsume)); 362 return hasNext; 363 } 364 365 @Override forEachRemaining(IntConsumer consumer)366 public void forEachRemaining(IntConsumer consumer) { 367 if (buffer == null && !finished) { 368 
Objects.requireNonNull(consumer); 369 init(); 370 371 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator); 372 finished = true; 373 } 374 else { 375 do { } while (tryAdvance(consumer)); 376 } 377 } 378 } 379 380 static final class LongWrappingSpliterator<P_IN> 381 extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong> 382 implements Spliterator.OfLong { 383 LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)384 LongWrappingSpliterator(PipelineHelper<Long> ph, 385 Supplier<Spliterator<P_IN>> supplier, 386 boolean parallel) { 387 super(ph, supplier, parallel); 388 } 389 LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)390 LongWrappingSpliterator(PipelineHelper<Long> ph, 391 Spliterator<P_IN> spliterator, 392 boolean parallel) { 393 super(ph, spliterator, parallel); 394 } 395 396 @Override wrap(Spliterator<P_IN> s)397 AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) { 398 return new LongWrappingSpliterator<>(ph, s, isParallel); 399 } 400 401 @Override initPartialTraversalState()402 void initPartialTraversalState() { 403 SpinedBuffer.OfLong b = new SpinedBuffer.OfLong(); 404 buffer = b; 405 bufferSink = ph.wrapSink((Sink.OfLong) b::accept); 406 pusher = () -> spliterator.tryAdvance(bufferSink); 407 } 408 409 @Override trySplit()410 public Spliterator.OfLong trySplit() { 411 return (Spliterator.OfLong) super.trySplit(); 412 } 413 414 @Override tryAdvance(LongConsumer consumer)415 public boolean tryAdvance(LongConsumer consumer) { 416 Objects.requireNonNull(consumer); 417 boolean hasNext = doAdvance(); 418 if (hasNext) 419 consumer.accept(buffer.get(nextToConsume)); 420 return hasNext; 421 } 422 423 @Override forEachRemaining(LongConsumer consumer)424 public void forEachRemaining(LongConsumer consumer) { 425 if (buffer == null && !finished) { 426 Objects.requireNonNull(consumer); 427 init(); 428 429 
ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator); 430 finished = true; 431 } 432 else { 433 do { } while (tryAdvance(consumer)); 434 } 435 } 436 } 437 438 static final class DoubleWrappingSpliterator<P_IN> 439 extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble> 440 implements Spliterator.OfDouble { 441 DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)442 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 443 Supplier<Spliterator<P_IN>> supplier, 444 boolean parallel) { 445 super(ph, supplier, parallel); 446 } 447 DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)448 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 449 Spliterator<P_IN> spliterator, 450 boolean parallel) { 451 super(ph, spliterator, parallel); 452 } 453 454 @Override wrap(Spliterator<P_IN> s)455 AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) { 456 return new DoubleWrappingSpliterator<>(ph, s, isParallel); 457 } 458 459 @Override initPartialTraversalState()460 void initPartialTraversalState() { 461 SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble(); 462 buffer = b; 463 bufferSink = ph.wrapSink((Sink.OfDouble) b::accept); 464 pusher = () -> spliterator.tryAdvance(bufferSink); 465 } 466 467 @Override trySplit()468 public Spliterator.OfDouble trySplit() { 469 return (Spliterator.OfDouble) super.trySplit(); 470 } 471 472 @Override tryAdvance(DoubleConsumer consumer)473 public boolean tryAdvance(DoubleConsumer consumer) { 474 Objects.requireNonNull(consumer); 475 boolean hasNext = doAdvance(); 476 if (hasNext) 477 consumer.accept(buffer.get(nextToConsume)); 478 return hasNext; 479 } 480 481 @Override forEachRemaining(DoubleConsumer consumer)482 public void forEachRemaining(DoubleConsumer consumer) { 483 if (buffer == null && !finished) { 484 Objects.requireNonNull(consumer); 485 init(); 486 487 ph.wrapAndCopyInto((Sink.OfDouble) 
consumer::accept, spliterator); 488 finished = true; 489 } 490 else { 491 do { } while (tryAdvance(consumer)); 492 } 493 } 494 } 495 496 /** 497 * Spliterator implementation that delegates to an underlying spliterator, 498 * acquiring the spliterator from a {@code Supplier<Spliterator>} on the 499 * first call to any spliterator method. 500 * @param <T> 501 */ 502 static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>> 503 implements Spliterator<T> { 504 private final Supplier<? extends T_SPLITR> supplier; 505 506 private T_SPLITR s; 507 DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)508 DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) { 509 this.supplier = supplier; 510 } 511 get()512 T_SPLITR get() { 513 if (s == null) { 514 s = supplier.get(); 515 } 516 return s; 517 } 518 519 @Override 520 @SuppressWarnings("unchecked") trySplit()521 public T_SPLITR trySplit() { 522 return (T_SPLITR) get().trySplit(); 523 } 524 525 @Override tryAdvance(Consumer<? super T> consumer)526 public boolean tryAdvance(Consumer<? super T> consumer) { 527 return get().tryAdvance(consumer); 528 } 529 530 @Override forEachRemaining(Consumer<? super T> consumer)531 public void forEachRemaining(Consumer<? super T> consumer) { 532 get().forEachRemaining(consumer); 533 } 534 535 @Override estimateSize()536 public long estimateSize() { 537 return get().estimateSize(); 538 } 539 540 @Override characteristics()541 public int characteristics() { 542 return get().characteristics(); 543 } 544 545 @Override getComparator()546 public Comparator<? 
super T> getComparator() { 547 return get().getComparator(); 548 } 549 550 @Override getExactSizeIfKnown()551 public long getExactSizeIfKnown() { 552 return get().getExactSizeIfKnown(); 553 } 554 555 @Override toString()556 public String toString() { 557 return getClass().getName() + "[" + get() + "]"; 558 } 559 560 static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> 561 extends DelegatingSpliterator<T, T_SPLITR> 562 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { OfPrimitive(Supplier<? extends T_SPLITR> supplier)563 OfPrimitive(Supplier<? extends T_SPLITR> supplier) { 564 super(supplier); 565 } 566 567 @Override tryAdvance(T_CONS consumer)568 public boolean tryAdvance(T_CONS consumer) { 569 return get().tryAdvance(consumer); 570 } 571 572 @Override forEachRemaining(T_CONS consumer)573 public void forEachRemaining(T_CONS consumer) { 574 get().forEachRemaining(consumer); 575 } 576 } 577 578 static final class OfInt 579 extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt> 580 implements Spliterator.OfInt { 581 OfInt(Supplier<Spliterator.OfInt> supplier)582 OfInt(Supplier<Spliterator.OfInt> supplier) { 583 super(supplier); 584 } 585 } 586 587 static final class OfLong 588 extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong> 589 implements Spliterator.OfLong { 590 OfLong(Supplier<Spliterator.OfLong> supplier)591 OfLong(Supplier<Spliterator.OfLong> supplier) { 592 super(supplier); 593 } 594 } 595 596 static final class OfDouble 597 extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble> 598 implements Spliterator.OfDouble { 599 OfDouble(Supplier<Spliterator.OfDouble> supplier)600 OfDouble(Supplier<Spliterator.OfDouble> supplier) { 601 super(supplier); 602 } 603 } 604 } 605 606 /** 607 * A slice Spliterator from a source Spliterator that reports 608 * {@code SUBSIZED}. 
609 * 610 */ 611 static abstract class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> { 612 // The start index of the slice 613 final long sliceOrigin; 614 // One past the last index of the slice 615 final long sliceFence; 616 617 // The spliterator to slice 618 T_SPLITR s; 619 // current (absolute) index, modified on advance/split 620 long index; 621 // one past last (absolute) index or sliceFence, which ever is smaller 622 long fence; 623 SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)624 SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) { 625 assert s.hasCharacteristics(Spliterator.SUBSIZED); 626 this.s = s; 627 this.sliceOrigin = sliceOrigin; 628 this.sliceFence = sliceFence; 629 this.index = origin; 630 this.fence = fence; 631 } 632 makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)633 protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence); 634 trySplit()635 public T_SPLITR trySplit() { 636 if (sliceOrigin >= fence) 637 return null; 638 639 if (index >= fence) 640 return null; 641 642 // Keep splitting until the left and right splits intersect with the slice 643 // thereby ensuring the size estimate decreases. 644 // This also avoids creating empty spliterators which can result in 645 // existing and additionally created F/J tasks that perform 646 // redundant work on no elements. 
647 while (true) { 648 @SuppressWarnings("unchecked") 649 T_SPLITR leftSplit = (T_SPLITR) s.trySplit(); 650 if (leftSplit == null) 651 return null; 652 653 long leftSplitFenceUnbounded = index + leftSplit.estimateSize(); 654 long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence); 655 if (sliceOrigin >= leftSplitFence) { 656 // The left split does not intersect with, and is to the left of, the slice 657 // The right split does intersect 658 // Discard the left split and split further with the right split 659 index = leftSplitFence; 660 } 661 else if (leftSplitFence >= sliceFence) { 662 // The right split does not intersect with, and is to the right of, the slice 663 // The left split does intersect 664 // Discard the right split and split further with the left split 665 s = leftSplit; 666 fence = leftSplitFence; 667 } 668 else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) { 669 // The left split is contained within the slice, return the underlying left split 670 // Right split is contained within or intersects with the slice 671 index = leftSplitFence; 672 return leftSplit; 673 } else { 674 // The left split intersects with the slice 675 // Right split is contained within or intersects with the slice 676 return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence); 677 } 678 } 679 } 680 estimateSize()681 public long estimateSize() { 682 return (sliceOrigin < fence) 683 ? 
fence - Math.max(sliceOrigin, index) : 0; 684 } 685 characteristics()686 public int characteristics() { 687 return s.characteristics(); 688 } 689 690 static final class OfRef<T> 691 extends SliceSpliterator<T, Spliterator<T>> 692 implements Spliterator<T> { 693 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)694 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) { 695 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); 696 } 697 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)698 private OfRef(Spliterator<T> s, 699 long sliceOrigin, long sliceFence, long origin, long fence) { 700 super(s, sliceOrigin, sliceFence, origin, fence); 701 } 702 703 @Override makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)704 protected Spliterator<T> makeSpliterator(Spliterator<T> s, 705 long sliceOrigin, long sliceFence, 706 long origin, long fence) { 707 return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence); 708 } 709 710 @Override tryAdvance(Consumer<? super T> action)711 public boolean tryAdvance(Consumer<? super T> action) { 712 Objects.requireNonNull(action); 713 714 if (sliceOrigin >= fence) 715 return false; 716 717 while (sliceOrigin > index) { 718 s.tryAdvance(e -> {}); 719 index++; 720 } 721 722 if (index >= fence) 723 return false; 724 725 index++; 726 return s.tryAdvance(action); 727 } 728 729 @Override forEachRemaining(Consumer<? super T> action)730 public void forEachRemaining(Consumer<? 
super T> action) { 731 Objects.requireNonNull(action); 732 733 if (sliceOrigin >= fence) 734 return; 735 736 if (index >= fence) 737 return; 738 739 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { 740 // The spliterator is contained within the slice 741 s.forEachRemaining(action); 742 index = fence; 743 } else { 744 // The spliterator intersects with the slice 745 while (sliceOrigin > index) { 746 s.tryAdvance(e -> {}); 747 index++; 748 } 749 // Traverse elements up to the fence 750 for (;index < fence; index++) { 751 s.tryAdvance(action); 752 } 753 } 754 } 755 } 756 757 static abstract class OfPrimitive<T, 758 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>, 759 T_CONS> 760 extends SliceSpliterator<T, T_SPLITR> 761 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { 762 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)763 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) { 764 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); 765 } 766 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)767 private OfPrimitive(T_SPLITR s, 768 long sliceOrigin, long sliceFence, long origin, long fence) { 769 super(s, sliceOrigin, sliceFence, origin, fence); 770 } 771 772 @Override tryAdvance(T_CONS action)773 public boolean tryAdvance(T_CONS action) { 774 Objects.requireNonNull(action); 775 776 if (sliceOrigin >= fence) 777 return false; 778 779 while (sliceOrigin > index) { 780 s.tryAdvance(emptyConsumer()); 781 index++; 782 } 783 784 if (index >= fence) 785 return false; 786 787 index++; 788 return s.tryAdvance(action); 789 } 790 791 @Override forEachRemaining(T_CONS action)792 public void forEachRemaining(T_CONS action) { 793 Objects.requireNonNull(action); 794 795 if (sliceOrigin >= fence) 796 return; 797 798 if (index >= fence) 799 return; 800 801 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { 802 // The spliterator is contained 
within the slice 803 s.forEachRemaining(action); 804 index = fence; 805 } else { 806 // The spliterator intersects with the slice 807 while (sliceOrigin > index) { 808 s.tryAdvance(emptyConsumer()); 809 index++; 810 } 811 // Traverse elements up to the fence 812 for (;index < fence; index++) { 813 s.tryAdvance(action); 814 } 815 } 816 } 817 emptyConsumer()818 protected abstract T_CONS emptyConsumer(); 819 } 820 821 static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer> 822 implements Spliterator.OfInt { OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)823 OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) { 824 super(s, sliceOrigin, sliceFence); 825 } 826 OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)827 OfInt(Spliterator.OfInt s, 828 long sliceOrigin, long sliceFence, long origin, long fence) { 829 super(s, sliceOrigin, sliceFence, origin, fence); 830 } 831 832 @Override makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)833 protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s, 834 long sliceOrigin, long sliceFence, 835 long origin, long fence) { 836 return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence); 837 } 838 839 @Override emptyConsumer()840 protected IntConsumer emptyConsumer() { 841 return e -> {}; 842 } 843 } 844 845 static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer> 846 implements Spliterator.OfLong { OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)847 OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) { 848 super(s, sliceOrigin, sliceFence); 849 } 850 OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)851 OfLong(Spliterator.OfLong s, 852 long sliceOrigin, long sliceFence, long origin, long fence) { 853 super(s, sliceOrigin, sliceFence, origin, fence); 854 } 855 856 @Override 
makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)857 protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s, 858 long sliceOrigin, long sliceFence, 859 long origin, long fence) { 860 return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence); 861 } 862 863 @Override emptyConsumer()864 protected LongConsumer emptyConsumer() { 865 return e -> {}; 866 } 867 } 868 869 static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer> 870 implements Spliterator.OfDouble { OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)871 OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) { 872 super(s, sliceOrigin, sliceFence); 873 } 874 OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)875 OfDouble(Spliterator.OfDouble s, 876 long sliceOrigin, long sliceFence, long origin, long fence) { 877 super(s, sliceOrigin, sliceFence, origin, fence); 878 } 879 880 @Override makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)881 protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s, 882 long sliceOrigin, long sliceFence, 883 long origin, long fence) { 884 return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence); 885 } 886 887 @Override emptyConsumer()888 protected DoubleConsumer emptyConsumer() { 889 return e -> {}; 890 } 891 } 892 } 893 894 /** 895 * A slice Spliterator that does not preserve order, if any, of a source 896 * Spliterator. 897 * 898 * Note: The source spliterator may report {@code ORDERED} since that 899 * spliterator be the result of a previous pipeline stage that was 900 * collected to a {@code Node}. It is the order of the pipeline stage 901 * that governs whether this slice spliterator is to be used or not. 
902 */ 903 static abstract class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> { 904 static final int CHUNK_SIZE = 1 << 7; 905 906 // The spliterator to slice 907 protected final T_SPLITR s; 908 protected final boolean unlimited; 909 protected final int chunkSize; 910 private final long skipThreshold; 911 private final AtomicLong permits; 912 UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit)913 UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) { 914 this.s = s; 915 this.unlimited = limit < 0; 916 this.skipThreshold = limit >= 0 ? limit : 0; 917 this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE, 918 ((skip + limit) / AbstractTask.getLeafTarget()) + 1) : CHUNK_SIZE; 919 this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); 920 } 921 UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent)922 UnorderedSliceSpliterator(T_SPLITR s, 923 UnorderedSliceSpliterator<T, T_SPLITR> parent) { 924 this.s = s; 925 this.unlimited = parent.unlimited; 926 this.permits = parent.permits; 927 this.skipThreshold = parent.skipThreshold; 928 this.chunkSize = parent.chunkSize; 929 } 930 931 /** 932 * Acquire permission to skip or process elements. The caller must 933 * first acquire the elements, then consult this method for guidance 934 * as to what to do with the data. 935 * 936 * <p>We use an {@code AtomicLong} to atomically maintain a counter, 937 * which is initialized as skip+limit if we are limiting, or skip only 938 * if we are not limiting. The user should consult the method 939 * {@code checkPermits()} before acquiring data elements. 940 * 941 * @param numElements the number of elements the caller has in hand 942 * @return the number of elements that should be processed; any 943 * remaining elements should be discarded. 
944 */ acquirePermits(long numElements)945 protected final long acquirePermits(long numElements) { 946 long remainingPermits; 947 long grabbing; 948 // permits never increase, and don't decrease below zero 949 assert numElements > 0; 950 do { 951 remainingPermits = permits.get(); 952 if (remainingPermits == 0) 953 return unlimited ? numElements : 0; 954 grabbing = Math.min(remainingPermits, numElements); 955 } while (grabbing > 0 && 956 !permits.compareAndSet(remainingPermits, remainingPermits - grabbing)); 957 958 if (unlimited) 959 return Math.max(numElements - grabbing, 0); 960 else if (remainingPermits > skipThreshold) 961 return Math.max(grabbing - (remainingPermits - skipThreshold), 0); 962 else 963 return grabbing; 964 } 965 966 enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED } 967 968 /** Call to check if permits might be available before acquiring data */ permitStatus()969 protected final PermitStatus permitStatus() { 970 if (permits.get() > 0) 971 return PermitStatus.MAYBE_MORE; 972 else 973 return unlimited ? PermitStatus.UNLIMITED : PermitStatus.NO_MORE; 974 } 975 trySplit()976 public final T_SPLITR trySplit() { 977 // Stop splitting when there are no more limit permits 978 if (permits.get() == 0) 979 return null; 980 @SuppressWarnings("unchecked") 981 T_SPLITR split = (T_SPLITR) s.trySplit(); 982 return split == null ? 
null : makeSpliterator(split); 983 } 984 makeSpliterator(T_SPLITR s)985 protected abstract T_SPLITR makeSpliterator(T_SPLITR s); 986 estimateSize()987 public final long estimateSize() { 988 return s.estimateSize(); 989 } 990 characteristics()991 public final int characteristics() { 992 return s.characteristics() & 993 ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED); 994 } 995 996 static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>> 997 implements Spliterator<T>, Consumer<T> { 998 T tmpSlot; 999 OfRef(Spliterator<T> s, long skip, long limit)1000 OfRef(Spliterator<T> s, long skip, long limit) { 1001 super(s, skip, limit); 1002 } 1003 OfRef(Spliterator<T> s, OfRef<T> parent)1004 OfRef(Spliterator<T> s, OfRef<T> parent) { 1005 super(s, parent); 1006 } 1007 1008 @Override accept(T t)1009 public final void accept(T t) { 1010 tmpSlot = t; 1011 } 1012 1013 @Override tryAdvance(Consumer<? super T> action)1014 public boolean tryAdvance(Consumer<? super T> action) { 1015 Objects.requireNonNull(action); 1016 1017 while (permitStatus() != PermitStatus.NO_MORE) { 1018 if (!s.tryAdvance(this)) 1019 return false; 1020 else if (acquirePermits(1) == 1) { 1021 action.accept(tmpSlot); 1022 tmpSlot = null; 1023 return true; 1024 } 1025 } 1026 return false; 1027 } 1028 1029 @Override forEachRemaining(Consumer<? super T> action)1030 public void forEachRemaining(Consumer<? 
super T> action) { 1031 Objects.requireNonNull(action); 1032 1033 ArrayBuffer.OfRef<T> sb = null; 1034 PermitStatus permitStatus; 1035 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { 1036 if (permitStatus == PermitStatus.MAYBE_MORE) { 1037 // Optimistically traverse elements up to a threshold of chunkSize 1038 if (sb == null) 1039 sb = new ArrayBuffer.OfRef<>(chunkSize); 1040 else 1041 sb.reset(); 1042 long permitsRequested = 0; 1043 do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize); 1044 if (permitsRequested == 0) 1045 return; 1046 sb.forEach(action, acquirePermits(permitsRequested)); 1047 } 1048 else { 1049 // Must be UNLIMITED; let 'er rip 1050 s.forEachRemaining(action); 1051 return; 1052 } 1053 } 1054 } 1055 1056 @Override makeSpliterator(Spliterator<T> s)1057 protected Spliterator<T> makeSpliterator(Spliterator<T> s) { 1058 return new UnorderedSliceSpliterator.OfRef<>(s, this); 1059 } 1060 } 1061 1062 /** 1063 * Concrete sub-types must also be an instance of type {@code T_CONS}. 1064 * 1065 * @param <T_BUFF> the type of the spined buffer. Must also be a type of 1066 * {@code T_CONS}. 
1067 */ 1068 static abstract class OfPrimitive< 1069 T, 1070 T_CONS, 1071 T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>, 1072 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> 1073 extends UnorderedSliceSpliterator<T, T_SPLITR> 1074 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { OfPrimitive(T_SPLITR s, long skip, long limit)1075 OfPrimitive(T_SPLITR s, long skip, long limit) { 1076 super(s, skip, limit); 1077 } 1078 OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent)1079 OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) { 1080 super(s, parent); 1081 } 1082 1083 @Override tryAdvance(T_CONS action)1084 public boolean tryAdvance(T_CONS action) { 1085 Objects.requireNonNull(action); 1086 @SuppressWarnings("unchecked") 1087 T_CONS consumer = (T_CONS) this; 1088 1089 while (permitStatus() != PermitStatus.NO_MORE) { 1090 if (!s.tryAdvance(consumer)) 1091 return false; 1092 else if (acquirePermits(1) == 1) { 1093 acceptConsumed(action); 1094 return true; 1095 } 1096 } 1097 return false; 1098 } 1099 acceptConsumed(T_CONS action)1100 protected abstract void acceptConsumed(T_CONS action); 1101 1102 @Override forEachRemaining(T_CONS action)1103 public void forEachRemaining(T_CONS action) { 1104 Objects.requireNonNull(action); 1105 1106 T_BUFF sb = null; 1107 PermitStatus permitStatus; 1108 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { 1109 if (permitStatus == PermitStatus.MAYBE_MORE) { 1110 // Optimistically traverse elements up to a threshold of chunkSize 1111 if (sb == null) 1112 sb = bufferCreate(chunkSize); 1113 else 1114 sb.reset(); 1115 @SuppressWarnings("unchecked") 1116 T_CONS sbc = (T_CONS) sb; 1117 long permitsRequested = 0; 1118 do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize); 1119 if (permitsRequested == 0) 1120 return; 1121 sb.forEach(action, acquirePermits(permitsRequested)); 1122 } 1123 else { 1124 // 
Must be UNLIMITED; let 'er rip 1125 s.forEachRemaining(action); 1126 return; 1127 } 1128 } 1129 } 1130 bufferCreate(int initialCapacity)1131 protected abstract T_BUFF bufferCreate(int initialCapacity); 1132 } 1133 1134 static final class OfInt 1135 extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt> 1136 implements Spliterator.OfInt, IntConsumer { 1137 1138 int tmpValue; 1139 OfInt(Spliterator.OfInt s, long skip, long limit)1140 OfInt(Spliterator.OfInt s, long skip, long limit) { 1141 super(s, skip, limit); 1142 } 1143 OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent)1144 OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) { 1145 super(s, parent); 1146 } 1147 1148 @Override accept(int value)1149 public void accept(int value) { 1150 tmpValue = value; 1151 } 1152 1153 @Override acceptConsumed(IntConsumer action)1154 protected void acceptConsumed(IntConsumer action) { 1155 action.accept(tmpValue); 1156 } 1157 1158 @Override bufferCreate(int initialCapacity)1159 protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) { 1160 return new ArrayBuffer.OfInt(initialCapacity); 1161 } 1162 1163 @Override makeSpliterator(Spliterator.OfInt s)1164 protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { 1165 return new UnorderedSliceSpliterator.OfInt(s, this); 1166 } 1167 } 1168 1169 static final class OfLong 1170 extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong> 1171 implements Spliterator.OfLong, LongConsumer { 1172 1173 long tmpValue; 1174 OfLong(Spliterator.OfLong s, long skip, long limit)1175 OfLong(Spliterator.OfLong s, long skip, long limit) { 1176 super(s, skip, limit); 1177 } 1178 OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent)1179 OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) { 1180 super(s, parent); 1181 } 1182 1183 @Override accept(long value)1184 public void accept(long value) { 1185 tmpValue = value; 1186 } 1187 1188 
@Override acceptConsumed(LongConsumer action)1189 protected void acceptConsumed(LongConsumer action) { 1190 action.accept(tmpValue); 1191 } 1192 1193 @Override bufferCreate(int initialCapacity)1194 protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) { 1195 return new ArrayBuffer.OfLong(initialCapacity); 1196 } 1197 1198 @Override makeSpliterator(Spliterator.OfLong s)1199 protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { 1200 return new UnorderedSliceSpliterator.OfLong(s, this); 1201 } 1202 } 1203 1204 static final class OfDouble 1205 extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble> 1206 implements Spliterator.OfDouble, DoubleConsumer { 1207 1208 double tmpValue; 1209 OfDouble(Spliterator.OfDouble s, long skip, long limit)1210 OfDouble(Spliterator.OfDouble s, long skip, long limit) { 1211 super(s, skip, limit); 1212 } 1213 OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent)1214 OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) { 1215 super(s, parent); 1216 } 1217 1218 @Override accept(double value)1219 public void accept(double value) { 1220 tmpValue = value; 1221 } 1222 1223 @Override acceptConsumed(DoubleConsumer action)1224 protected void acceptConsumed(DoubleConsumer action) { 1225 action.accept(tmpValue); 1226 } 1227 1228 @Override bufferCreate(int initialCapacity)1229 protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) { 1230 return new ArrayBuffer.OfDouble(initialCapacity); 1231 } 1232 1233 @Override makeSpliterator(Spliterator.OfDouble s)1234 protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { 1235 return new UnorderedSliceSpliterator.OfDouble(s, this); 1236 } 1237 } 1238 } 1239 1240 /** 1241 * A wrapping spliterator that only reports distinct elements of the 1242 * underlying spliterator. Does not preserve size and encounter order. 
*/
    static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {

        // The value to represent null in the ConcurrentHashMap
        private static final Object NULL_VALUE = new Object();

        // The underlying spliterator
        private final Spliterator<T> s;

        // ConcurrentHashMap holding distinct elements as keys
        private final ConcurrentHashMap<T, Boolean> seen;

        // Temporary element, only used with tryAdvance
        private T tmpSlot;

        /**
         * Creates a distinct spliterator with a fresh, empty set of seen
         * elements.
         *
         * @param s the underlying spliterator
         */
        DistinctSpliterator(Spliterator<T> s) {
            this(s, new ConcurrentHashMap<>());
        }

        /**
         * Creates a distinct spliterator that records elements in the given
         * map; used by {@code trySplit} so that splits share one map.
         *
         * @param s the underlying spliterator
         * @param seen the map whose keys are the elements already reported
         */
        private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
            this.s = s;
            this.seen = seen;
        }

        /** Captures the element handed over by the underlying spliterator. */
        @Override
        public void accept(T t) {
            this.tmpSlot = t;
        }

        /**
         * Substitutes the {@code NULL_VALUE} sentinel for {@code null} so the
         * element can be used as a map key.
         */
        @SuppressWarnings("unchecked")
        private T mapNull(T t) {
            return t != null ? t : (T) NULL_VALUE;
        }

        /**
         * Advances the underlying spliterator until an element not seen
         * before is found and passes that element to {@code action}.
         *
         * @param action the action to receive the next distinct element
         * @return {@code true} if an element was passed to {@code action},
         *         {@code false} if the underlying spliterator is exhausted
         * @throws NullPointerException if {@code action} is null
         */
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Fail fast, before any element is consumed or recorded as seen
            Objects.requireNonNull(action);

            while (s.tryAdvance(this)) {
                if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
                    action.accept(tmpSlot);
                    tmpSlot = null;
                    return true;
                }
            }
            return false;
        }

        /**
         * Passes every remaining element that has not been seen before to
         * {@code action}.
         *
         * @param action the action to receive the distinct elements
         * @throws NullPointerException if {@code action} is null
         */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            // Without this check a null action goes unnoticed on an empty source
            Objects.requireNonNull(action);

            s.forEachRemaining(t -> {
                if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
                    action.accept(t);
                }
            });
        }

        /**
         * Splits the underlying spliterator; the returned spliterator shares
         * this spliterator's map of seen elements.
         *
         * @return the wrapped split, or {@code null} if the underlying
         *         spliterator cannot be split
         */
        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = s.trySplit();
            return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
        }

        /** Reports the size estimate of the underlying spliterator unchanged. */
        @Override
        public long estimateSize() {
            return s.estimateSize();
        }

        /**
         * Reports the underlying characteristics with {@code SIZED},
         * {@code SUBSIZED}, {@code SORTED} and {@code ORDERED} removed and
         * {@code DISTINCT} added.
         */
        @Override
        public int characteristics() {
            return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
                                            Spliterator.SORTED | Spliterator.ORDERED))
                   | Spliterator.DISTINCT;
        }

        /**
         * Delegates to the underlying spliterator.
         */
        // NOTE(review): characteristics() never reports SORTED, yet this still
        // hands out the source comparator when the source is sorted. Left
        // unchanged to preserve behavior; confirm whether it should throw.
        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }
    }

    /**
     * A Spliterator that infinitely supplies elements in no particular order.
     *
     * <p>Splitting divides the estimated size in two and stops when the
     * estimated size is 0.
     *
     * <p>The {@code forEachRemaining} method if invoked will never terminate.
     * The {@code tryAdvance} method always returns true.
     *
     */
    static abstract class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
        // Current size estimate; halved on every successful split
        long estimate;

        /**
         * @param estimate the initial size estimate to report
         */
        protected InfiniteSupplyingSpliterator(long estimate) {
            this.estimate = estimate;
        }

        @Override
        public long estimateSize() {
            return estimate;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE;
        }

        /** Supplies object references obtained from a {@code Supplier}. */
        static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
            final Supplier<T> s;

            OfRef(long size, Supplier<T> s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);

                action.accept(s.get());
                return true;
            }

            @Override
            public Spliterator<T> trySplit() {
                if (estimate == 0)
                    return null;
                // Halve this estimate; the split gets the same halved value
                return new InfiniteSupplyingSpliterator.OfRef<>(estimate = estimate >>> 1, s);
            }
        }

        /** Supplies {@code int} values obtained from an {@code IntSupplier}. */
        static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
                implements Spliterator.OfInt {
            final IntSupplier s;

            OfInt(long size, IntSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(IntConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsInt());
                return true;
            }

            @Override
            public Spliterator.OfInt trySplit() {
                if (estimate == 0)
                    return null;
                // Halve this estimate; the split gets the same halved value
                return new InfiniteSupplyingSpliterator.OfInt(estimate = estimate >>> 1, s);
            }
        }

        /** Supplies {@code long} values obtained from a {@code LongSupplier}. */
        static final class OfLong extends InfiniteSupplyingSpliterator<Long>
                implements Spliterator.OfLong {
            final LongSupplier s;

            OfLong(long size, LongSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(LongConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsLong());
                return true;
            }

            @Override
            public Spliterator.OfLong trySplit() {
                if (estimate == 0)
                    return null;
                // Halve this estimate; the split gets the same halved value
                return new InfiniteSupplyingSpliterator.OfLong(estimate = estimate >>> 1, s);
            }
        }

        /** Supplies {@code double} values obtained from a {@code DoubleSupplier}. */
        static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
                implements Spliterator.OfDouble {
            final DoubleSupplier s;

            OfDouble(long size, DoubleSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(DoubleConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsDouble());
                return true;
            }

            @Override
            public Spliterator.OfDouble trySplit() {
                if (estimate == 0)
                    return null;
                // Halve this estimate; the split gets the same halved value
                return new InfiniteSupplyingSpliterator.OfDouble(estimate = estimate >>> 1, s);
            }
        }
    }

    // @@@ Consolidate with Node.Builder
    /**
     * Fixed-capacity array buffers that collect elements as a consumer and
     * replay a prefix of them on demand.
     */
    static abstract class ArrayBuffer {
        // Position at which the next accepted element is stored
        int index;

        /** Rewinds the write position so the buffer can be refilled. */
        void reset() {
            index = 0;
        }

        /** Buffer of object references. */
        static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
            final Object[] array;

            OfRef(int size) {
                this.array = new Object[size];
            }

            @Override
            public void accept(T t) {
                array[index++] = t;
            }

            /**
             * Passes the elements at positions {@code [0, fence)} to the
             * given action.
             *
             * @param action the action to receive the elements
             * @param fence one past the last position to pass on
             */
            public void forEach(Consumer<? super T> action, long fence) {
                for (int i = 0; i < fence; i++) {
                    @SuppressWarnings("unchecked")
                    T t = (T) array[i];
                    action.accept(t);
                }
            }
        }

        /**
         * Base of the primitive buffers. The write position and
         * {@code reset()} are inherited from {@code ArrayBuffer}; no field is
         * re-declared here, so nothing hides the inherited {@code index}.
         */
        static abstract class OfPrimitive<T_CONS> extends ArrayBuffer {
            /**
             * Passes the values at positions {@code [0, fence)} to the given
             * action.
             *
             * @param action the action to receive the values
             * @param fence one past the last position to pass on
             */
            abstract void forEach(T_CONS action, long fence);
        }

        /** Buffer of {@code int} values. */
        static final class OfInt extends OfPrimitive<IntConsumer>
                implements IntConsumer {
            final int[] array;

            OfInt(int size) {
                this.array = new int[size];
            }

            @Override
            public void accept(int t) {
                array[index++] = t;
            }

            @Override
            public void forEach(IntConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }

        /** Buffer of {@code long} values. */
        static final class OfLong extends OfPrimitive<LongConsumer>
                implements LongConsumer {
            final long[] array;

            OfLong(int size) {
                this.array = new long[size];
            }

            @Override
            public void accept(long t) {
                array[index++] = t;
            }

            @Override
            public void forEach(LongConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }

        /** Buffer of {@code double} values. */
        static final class OfDouble extends OfPrimitive<DoubleConsumer>
                implements DoubleConsumer {
            final double[] array;

            OfDouble(int size) {
                this.array = new double[size];
            }

            @Override
            public void accept(double t) {
                array[index++] = t;
            }

            @Override
            void forEach(DoubleConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }
    }
}