1 /*
2  * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http.common;
27 
28 import java.io.Closeable;
29 import java.nio.ByteBuffer;
30 import java.util.List;
31 import java.util.Objects;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.Flow;
35 import java.util.concurrent.Flow.Subscriber;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38 
39 /**
40  * A wrapper for a Flow.Subscriber. This wrapper delivers data to the wrapped
41  * Subscriber which is supplied to the constructor. This class takes care of
42  * downstream flow control automatically and upstream flow control automatically
43  * by default.
44  * <p>
45  * Processing is done by implementing the {@link #incoming(List, boolean)} method
46  * which supplies buffers from upstream. This method (or any other method)
47  * can then call the outgoing() method to deliver processed buffers downstream.
48  * <p>
49  * Upstream error signals are delivered downstream directly. Cancellation from
50  * downstream is also propagated upstream immediately.
51  * <p>
52  * Each SubscriberWrapper has a {@link java.util.concurrent.CompletableFuture}{@code <Void>}
53  * which propagates completion/errors from downstream to upstream. Normal completion
54  * can only occur after onComplete() is called, but errors can be propagated upwards
55  * at any time.
56  */
57 public abstract class SubscriberWrapper
58     implements FlowTube.TubeSubscriber, Closeable, Flow.Processor<List<ByteBuffer>,List<ByteBuffer>>
59                 // TODO: SSLTube Subscriber will never change? Does this really need to be a TS?
60 {
61     final Logger debug =
62             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
63 
64     public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE }
65 
66     volatile Flow.Subscription upstreamSubscription;
67     final SubscriptionBase downstreamSubscription;
68     volatile boolean upstreamCompleted;
69     volatile boolean downstreamCompleted;
70     volatile boolean completionAcknowledged;
71     private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber;
72     // processed byte to send to the downstream subscriber.
73     private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ;
74     private final CompletableFuture<Void> cf;
75     private final SequentialScheduler pushScheduler;
76     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
77     final AtomicLong upstreamWindow = new AtomicLong(0);
78 
79     /**
80      * Wraps the given downstream subscriber. For each call to {@link
81      * #onNext(List<ByteBuffer>) } the given filter function is invoked
82      * and the list (if not empty) returned is passed downstream.
83      *
84      * A {@code CompletableFuture} is supplied which can be used to signal an
85      * error from downstream and which terminates the wrapper or which signals
86      * completion of downstream activity which can be propagated upstream. Error
87      * completion can be signaled at any time, but normal completion must not be
88      * signaled before onComplete() is called.
89      */
SubscriberWrapper()90     public SubscriberWrapper()
91     {
92         this.outputQ = new ConcurrentLinkedQueue<>();
93         this.cf = new MinimalFuture<Void>();
94         cf.whenComplete((v,t) -> {
95             if (t != null)
96                 errorCommon(t);
97         });
98         this.pushScheduler =
99                 SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
100         this.downstreamSubscription = new SubscriptionBase(pushScheduler,
101                                                            this::downstreamCompletion);
102     }
103 
104     @Override
subscribe(Subscriber<? super List<ByteBuffer>> downstreamSubscriber)105     public final void subscribe(Subscriber<?  super List<ByteBuffer>> downstreamSubscriber) {
106         Objects.requireNonNull(downstreamSubscriber);
107         this.downstreamSubscriber = downstreamSubscriber;
108     }
109 
110     /**
111      * Wraps the given downstream wrapper in this. For each call to
112      * {@link #onNext(List<ByteBuffer>) } the incoming() method is called.
113      *
114      * The {@code downstreamCF} from the downstream wrapper is linked to this
115      * wrappers notifier.
116      *
117      * @param downstreamWrapper downstream destination
118      */
SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper)119     public SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper)
120     {
121         this();
122         subscribe(downstreamWrapper);
123     }
124 
125     /**
126      * Delivers data to be processed by this wrapper. Generated data to be sent
127      * downstream, must be provided to the {@link #outgoing(List, boolean)}}
128      * method.
129      *
130      * @param buffers a List of ByteBuffers.
131      * @param complete if true then no more data will be added to the list
132      */
incoming(List<ByteBuffer> buffers, boolean complete)133     protected abstract void incoming(List<ByteBuffer> buffers, boolean complete);
134 
135     /**
136      * This method is called to determine the window size to use at any time. The
137      * current window is supplied together with the current downstream queue size.
138      * {@code 0} should be returned if no change is
139      * required or a positive integer which will be added to the current window.
140      * The default implementation maintains a downstream queue size of no greater
141      * than 5. The method can be overridden if required.
142      *
143      * @param currentWindow the current upstream subscription window
144      * @param downstreamQsize the current number of buffers waiting to be sent
145      *                        downstream
146      *
147      * @return value to add to currentWindow
148      */
upstreamWindowUpdate(long currentWindow, long downstreamQsize)149     protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
150         if (downstreamQsize > 5) {
151             return 0;
152         }
153 
154         if (currentWindow == 0) {
155             return 1;
156         } else {
157             return 0;
158         }
159     }
160 
161     /**
162      * Override this if anything needs to be done after the upstream subscriber
163      * has subscribed
164      */
onSubscribe()165     protected void onSubscribe() {
166     }
167 
168     /**
169      * Override this if anything needs to be done before checking for error
170      * and processing the input queue.
171      * @return
172      */
enterScheduling()173     protected SchedulingAction enterScheduling() {
174         return SchedulingAction.CONTINUE;
175     }
176 
signalScheduling()177     protected boolean signalScheduling() {
178         if (downstreamCompleted || pushScheduler.isStopped()) {
179             return false;
180         }
181         pushScheduler.runOrSchedule();
182         return true;
183     }
184 
185     /**
186      * Delivers buffers of data downstream. After incoming()
187      * has been called complete == true signifying completion of the upstream
188      * subscription, data may continue to be delivered, up to when outgoing() is
189      * called complete == true, after which, the downstream subscription is
190      * completed.
191      *
192      * It's an error to call outgoing() with complete = true if incoming() has
193      * not previously been called with it.
194      */
outgoing(ByteBuffer buffer, boolean complete)195     public void outgoing(ByteBuffer buffer, boolean complete) {
196         Objects.requireNonNull(buffer);
197         assert !complete || !buffer.hasRemaining();
198         outgoing(List.of(buffer), complete);
199     }
200 
201     /**
202      * Sometime it might be necessary to complete the downstream subscriber
203      * before the upstream completes. For instance, when an SSL server
204      * sends a notify_close. In that case we should let the outgoing
205      * complete before upstream is completed.
206      * @return true, may be overridden by subclasses.
207      */
closing()208     public boolean closing() {
209         return false;
210     }
211 
outgoing(List<ByteBuffer> buffers, boolean complete)212     public void outgoing(List<ByteBuffer> buffers, boolean complete) {
213         Objects.requireNonNull(buffers);
214         if (complete) {
215             assert Utils.remaining(buffers) == 0;
216             boolean closing = closing();
217             if (debug.on())
218                 debug.log("completionAcknowledged upstreamCompleted:%s,"
219                           + " downstreamCompleted:%s, closing:%s",
220                           upstreamCompleted, downstreamCompleted, closing);
221             if (!upstreamCompleted && !closing) {
222                 throw new IllegalStateException("upstream not completed");
223             }
224             completionAcknowledged = true;
225         } else {
226             if (debug.on())
227                 debug.log("Adding %d to outputQ queue", Utils.remaining(buffers));
228             outputQ.add(buffers);
229         }
230         if (debug.on())
231             debug.log("pushScheduler" +(pushScheduler.isStopped() ? " is stopped!" : " is alive"));
232         pushScheduler.runOrSchedule();
233     }
234 
235     /**
236      * Returns a CompletableFuture which completes when this wrapper completes.
237      * Normal completion happens with the following steps (in order):
238      *   1. onComplete() is called
239      *   2. incoming() called with complete = true
240      *   3. outgoing() may continue to be called normally
241      *   4. outgoing called with complete = true
242      *   5. downstream subscriber is called onComplete()
243      *
244      * If the subscription is canceled or onComplete() is invoked the
245      * CompletableFuture completes exceptionally. Exceptional completion
246      * also occurs if downstreamCF completes exceptionally.
247      */
completion()248     public CompletableFuture<Void> completion() {
249         return cf;
250     }
251 
252     /**
253      * Invoked whenever it 'may' be possible to push buffers downstream.
254      */
255     class DownstreamPusher implements Runnable {
256         @Override
run()257         public void run() {
258             try {
259                 run1();
260             } catch (Throwable t) {
261                 if (debug.on())
262                     debug.log("DownstreamPusher threw: " + t);
263                 errorCommon(t);
264             }
265         }
266 
run1()267         private void run1() {
268             if (downstreamCompleted) {
269                 if (debug.on())
270                     debug.log("DownstreamPusher: downstream is already completed");
271                 return;
272             }
273             switch (enterScheduling()) {
274                 case CONTINUE: break;
275                 case RESCHEDULE: pushScheduler.runOrSchedule(); return;
276                 case RETURN: return;
277                 default:
278                     errorRef.compareAndSet(null,
279                             new InternalError("unknown scheduling command"));
280                     break;
281             }
282             // If there was an error, send it downstream.
283             Throwable error = errorRef.get();
284             if (error != null && outputQ.isEmpty()) {
285                 synchronized(this) {
286                     if (downstreamCompleted)
287                         return;
288                     downstreamCompleted = true;
289                 }
290                 if (debug.on())
291                     debug.log("DownstreamPusher: forwarding error downstream: " + error);
292                 pushScheduler.stop();
293                 outputQ.clear();
294                 downstreamSubscriber.onError(error);
295                 cf.completeExceptionally(error);
296                 return;
297             }
298 
299             // OK - no error, let's proceed
300             if (!outputQ.isEmpty()) {
301                 if (debug.on())
302                     debug.log("DownstreamPusher: queue not empty, downstreamSubscription: %s",
303                               downstreamSubscription);
304             } else {
305                 if (debug.on())
306                     debug.log("DownstreamPusher: queue empty, downstreamSubscription: %s",
307                                downstreamSubscription);
308             }
309 
310             boolean datasent = false;
311             while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
312                 List<ByteBuffer> b = outputQ.poll();
313                 if (debug.on())
314                     debug.log("DownstreamPusher: Pushing %d bytes downstream",
315                               Utils.remaining(b));
316                 downstreamSubscriber.onNext(b);
317                 datasent = true;
318             }
319 
320             // If we have sent some decrypted data downstream,
321             // or if:
322             //    - there's nothing more available to send downstream
323             //    - and we still have some demand from downstream
324             //    - and upstream is not completed yet
325             //    - and our demand from upstream has reached 0,
326             // then check whether we should request more data from
327             // upstream
328             if (datasent || outputQ.isEmpty()
329                     && !downstreamSubscription.demand.isFulfilled()
330                     && !upstreamCompleted
331                     && upstreamWindow.get() == 0) {
332                 upstreamWindowUpdate();
333             }
334             checkCompletion();
335         }
336     }
337 
outputQueueSize()338     final int outputQueueSize() {
339         return outputQ.size();
340     }
341 
hasNoOutputData()342     final boolean hasNoOutputData() {
343         return outputQ.isEmpty();
344     }
345 
upstreamWindowUpdate()346     void upstreamWindowUpdate() {
347         long downstreamQueueSize = outputQ.size();
348         long upstreamWindowSize = upstreamWindow.get();
349         long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize);
350         if (debug.on())
351             debug.log("upstreamWindowUpdate, "
352                       + "downstreamQueueSize:%d, upstreamWindow:%d",
353                       downstreamQueueSize, upstreamWindowSize);
354         if (n > 0)
355             upstreamRequest(n);
356     }
357 
358     @Override
onSubscribe(Flow.Subscription subscription)359     public void onSubscribe(Flow.Subscription subscription) {
360         if (upstreamSubscription != null) {
361             throw new IllegalStateException("Single shot publisher");
362         }
363         this.upstreamSubscription = subscription;
364         upstreamRequest(initialUpstreamDemand());
365         if (debug.on())
366             debug.log("calling downstreamSubscriber::onSubscribe on %s",
367                       downstreamSubscriber);
368         downstreamSubscriber.onSubscribe(downstreamSubscription);
369         onSubscribe();
370     }
371 
372     @Override
onNext(List<ByteBuffer> item)373     public void onNext(List<ByteBuffer> item) {
374         if (debug.on()) debug.log("onNext");
375         long prev = upstreamWindow.getAndDecrement();
376         if (prev <= 0)
377             throw new IllegalStateException("invalid onNext call");
378         incomingCaller(item, false);
379     }
380 
upstreamRequest(long n)381     private void upstreamRequest(long n) {
382         if (debug.on()) debug.log("requesting %d", n);
383         upstreamWindow.getAndAdd(n);
384         upstreamSubscription.request(n);
385     }
386 
387     /**
388      * Initial demand that should be requested
389      * from upstream when we get the upstream subscription
390      * from {@link #onSubscribe(Flow.Subscription)}.
391      * @return The initial demand to request from upstream.
392      */
initialUpstreamDemand()393     protected long initialUpstreamDemand() {
394         return 1;
395     }
396 
requestMore()397     protected void requestMore() {
398         if (upstreamWindow.get() == 0) {
399             upstreamRequest(1);
400         }
401     }
402 
upstreamWindow()403     public long upstreamWindow() {
404         return upstreamWindow.get();
405     }
406 
407     @Override
onError(Throwable throwable)408     public void onError(Throwable throwable) {
409         if (debug.on()) debug.log("onError: " + throwable);
410         errorCommon(Objects.requireNonNull(throwable));
411     }
412 
errorCommon(Throwable throwable)413     protected boolean errorCommon(Throwable throwable) {
414         assert throwable != null ||
415                 (throwable = new AssertionError("null throwable")) != null;
416         if (errorRef.compareAndSet(null, throwable)) {
417             if (debug.on()) debug.log("error", throwable);
418             upstreamCompleted = true;
419             pushScheduler.runOrSchedule();
420             return true;
421         }
422         return false;
423     }
424 
425     @Override
close()426     public void close() {
427         errorCommon(new RuntimeException("wrapper closed"));
428     }
429 
close(Throwable t)430     public void close(Throwable t) {
431         errorCommon(t);
432     }
433 
incomingCaller(List<ByteBuffer> l, boolean complete)434     private void incomingCaller(List<ByteBuffer> l, boolean complete) {
435         try {
436             incoming(l, complete);
437         } catch(Throwable t) {
438             errorCommon(t);
439         }
440     }
441 
442     @Override
onComplete()443     public void onComplete() {
444         if (debug.on()) debug.log("upstream completed: " + toString());
445         upstreamCompleted = true;
446         incomingCaller(Utils.EMPTY_BB_LIST, true);
447         // pushScheduler will call checkCompletion()
448         pushScheduler.runOrSchedule();
449     }
450 
451     /** Adds the given data to the input queue. */
addData(ByteBuffer l)452     public void addData(ByteBuffer l) {
453         if (upstreamSubscription == null) {
454             throw new IllegalStateException("can't add data before upstream subscriber subscribes");
455         }
456         incomingCaller(List.of(l), false);
457     }
458 
checkCompletion()459     void checkCompletion() {
460         if (downstreamCompleted || !upstreamCompleted) {
461             return;
462         }
463         if (!outputQ.isEmpty()) {
464             return;
465         }
466         if (errorRef.get() != null) {
467             pushScheduler.runOrSchedule();
468             return;
469         }
470         if (completionAcknowledged) {
471             if (debug.on()) debug.log("calling downstreamSubscriber.onComplete()");
472             downstreamSubscriber.onComplete();
473             // Fix me subscriber.onComplete.run();
474             downstreamCompleted = true;
475             cf.complete(null);
476         }
477     }
478 
479     // called from the downstream Subscription.cancel()
downstreamCompletion()480     void downstreamCompletion() {
481         upstreamSubscription.cancel();
482         cf.complete(null);
483     }
484 
resetDownstreamDemand()485     public void resetDownstreamDemand() {
486         downstreamSubscription.demand.reset();
487     }
488 
489     @Override
toString()490     public String toString() {
491         StringBuilder sb = new StringBuilder();
492         sb.append("SubscriberWrapper:")
493           .append(" upstreamCompleted: ").append(Boolean.toString(upstreamCompleted))
494           .append(" upstreamWindow: ").append(upstreamWindow.toString())
495           .append(" downstreamCompleted: ").append(Boolean.toString(downstreamCompleted))
496           .append(" completionAcknowledged: ").append(Boolean.toString(completionAcknowledged))
497           .append(" outputQ size: ").append(Integer.toString(outputQ.size()))
498           //.append(" outputQ: ").append(outputQ.toString())
499           .append(" cf: ").append(cf.toString())
500           .append(" downstreamSubscription: ").append(downstreamSubscription)
501           .append(" downstreamSubscriber: ").append(downstreamSubscriber);
502 
503         return sb.toString();
504     }
505 
dbgString()506     public String dbgString() {
507         return "SubscriberWrapper";
508     }
509 }
510