1 /*
2  * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.nio.ByteBuffer;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.ListIterator;
33 import java.util.Objects;
34 import java.util.concurrent.CompletionStage;
35 import java.util.concurrent.Flow;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.net.http.HttpResponse.BodySubscriber;
38 import jdk.internal.net.http.common.Demand;
39 import jdk.internal.net.http.common.SequentialScheduler;
40 import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
41 
42 /**
43  * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given
44  * amount ( in bytes ) of a publisher's data before pushing it to a downstream
45  * subscriber.
46  */
47 public class BufferingSubscriber<T> implements TrustedSubscriber<T>
48 {
49     /** The downstream consumer of the data. */
50     private final BodySubscriber<T> downstreamSubscriber;
51     /** The amount of data to be accumulate before pushing downstream. */
52     private final int bufferSize;
53 
54     /** The subscription, created lazily. */
55     private volatile Flow.Subscription subscription;
56     /** The downstream subscription, created lazily. */
57     private volatile DownstreamSubscription downstreamSubscription;
58 
59     /** Must be held when accessing the internal buffers. */
60     private final Object buffersLock = new Object();
61     /** The internal buffers holding the buffered data. */
62     private ArrayList<ByteBuffer> internalBuffers;
63     /** The actual accumulated remaining bytes in internalBuffers. */
64     private int accumulatedBytes;
65 
66     /** Holds the Throwable from upstream's onError. */
67     private volatile Throwable throwable;
68 
69     /** State of the buffering subscriber:
70      *  1) [UNSUBSCRIBED] when initially created
71      *  2) [ACTIVE] when subscribed and can receive data
72      *  3) [ERROR | CANCELLED | COMPLETE] (terminal state)
73      */
74     static final int UNSUBSCRIBED = 0x01;
75     static final int ACTIVE       = 0x02;
76     static final int ERROR        = 0x04;
77     static final int CANCELLED    = 0x08;
78     static final int COMPLETE     = 0x10;
79 
80     private volatile int state;
81 
BufferingSubscriber(BodySubscriber<T> downstreamSubscriber, int bufferSize)82     public BufferingSubscriber(BodySubscriber<T> downstreamSubscriber,
83                                int bufferSize) {
84         this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber);
85         this.bufferSize = bufferSize;
86         synchronized (buffersLock) {
87             internalBuffers = new ArrayList<>();
88         }
89         state = UNSUBSCRIBED;
90     }
91 
92     /** Returns the number of bytes remaining in the given buffers. */
remaining(List<ByteBuffer> buffers)93     private static final long remaining(List<ByteBuffer> buffers) {
94         return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
95     }
96 
97     @Override
needsExecutor()98     public boolean needsExecutor() {
99         return TrustedSubscriber.needsExecutor(downstreamSubscriber);
100     }
101 
102     /**
103      * Tells whether, or not, there is at least a sufficient number of bytes
104      * accumulated in the internal buffers. If the subscriber is COMPLETE, and
105      * has some buffered data, then there is always enough ( to pass downstream ).
106      */
hasEnoughAccumulatedBytes()107     private final boolean hasEnoughAccumulatedBytes() {
108         assert Thread.holdsLock(buffersLock);
109         return accumulatedBytes >= bufferSize
110                 || (state == COMPLETE && accumulatedBytes > 0);
111     }
112 
113     /**
114      * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the
115      * amount of data as required before pushing downstream. The amount of data
116      * may be less than required ( bufferSize ), in the case where the subscriber
117      * is COMPLETE.
118      */
fromInternalBuffers()119     private List<ByteBuffer> fromInternalBuffers() {
120         assert Thread.holdsLock(buffersLock);
121         int leftToFill = bufferSize;
122         int state = this.state;
123         assert (state == ACTIVE || state == CANCELLED)
124                 ? accumulatedBytes >= leftToFill : true;
125         List<ByteBuffer> dsts = new ArrayList<>();
126 
127         ListIterator<ByteBuffer> itr = internalBuffers.listIterator();
128         while (itr.hasNext()) {
129             ByteBuffer b = itr.next();
130             if (b.remaining() <= leftToFill) {
131                 itr.remove();
132                 if (b.position() != 0)
133                     b = b.slice();  // ensure position = 0 when propagated
134                 dsts.add(b);
135                 leftToFill -= b.remaining();
136                 accumulatedBytes -= b.remaining();
137                 if (leftToFill == 0)
138                     break;
139             } else {
140                 int prevLimit = b.limit();
141                 b.limit(b.position() + leftToFill);
142                 ByteBuffer slice = b.slice();
143                 dsts.add(slice);
144                 b.limit(prevLimit);
145                 b.position(b.position() + leftToFill);
146                 accumulatedBytes -= leftToFill;
147                 leftToFill = 0;
148                 break;
149             }
150         }
151         assert (state == ACTIVE || state == CANCELLED)
152                 ? leftToFill == 0 : state == COMPLETE;
153         assert (state == ACTIVE || state == CANCELLED)
154                 ? remaining(dsts) == bufferSize : state == COMPLETE;
155         assert accumulatedBytes >= 0;
156         assert dsts.stream().noneMatch(b -> b.position() != 0);
157         return Collections.unmodifiableList(dsts);
158     }
159 
160     /** Subscription that is passed to the downstream subscriber. */
161     private class DownstreamSubscription implements Flow.Subscription {
162         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
163         private final Demand demand = new Demand();
164         private volatile boolean illegalArg;
165 
166         @Override
request(long n)167         public void request(long n) {
168             if (cancelled.get() || illegalArg) {
169                 return;
170             }
171             if (n <= 0L) {
172                 // pass the "bad" value upstream so the Publisher can deal with
173                 // it appropriately, i.e. invoke onError
174                 illegalArg = true;
175                 subscription.request(n);
176                 return;
177             }
178 
179             demand.increase(n);
180 
181             pushDemanded();
182         }
183 
184         private final SequentialScheduler pushDemandedScheduler =
185                 new SequentialScheduler(new PushDemandedTask());
186 
pushDemanded()187         void pushDemanded() {
188             if (cancelled.get())
189                 return;
190             pushDemandedScheduler.runOrSchedule();
191         }
192 
193         class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {
194             @Override
run()195             public void run() {
196                 try {
197                     Throwable t = throwable;
198                     if (t != null) {
199                         pushDemandedScheduler.stop(); // stop the demand scheduler
200                         downstreamSubscriber.onError(t);
201                         return;
202                     }
203 
204                     while (true) {
205                         List<ByteBuffer> item;
206                         synchronized (buffersLock) {
207                             if (cancelled.get())
208                                 return;
209                             if (!hasEnoughAccumulatedBytes())
210                                 break;
211                             if (!demand.tryDecrement())
212                                 break;
213                             item = fromInternalBuffers();
214                         }
215                         assert item != null;
216 
217                         downstreamSubscriber.onNext(item);
218                     }
219                     if (cancelled.get())
220                         return;
221 
222                     // complete only if all data consumed
223                     boolean complete;
224                     synchronized (buffersLock) {
225                         complete = state == COMPLETE && internalBuffers.isEmpty();
226                     }
227                     if (complete) {
228                         assert internalBuffers.isEmpty();
229                         pushDemandedScheduler.stop(); // stop the demand scheduler
230                         downstreamSubscriber.onComplete();
231                         return;
232                     }
233                 } catch (Throwable t) {
234                     cancel();  // cancel if there is any error
235                     throw t;
236                 }
237 
238                 boolean requestMore = false;
239                 synchronized (buffersLock) {
240                     if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {
241                         // request more upstream data
242                         requestMore = true;
243                     }
244                 }
245                 if (requestMore)
246                     subscription.request(1);
247             }
248         }
249 
250         @Override
cancel()251         public void cancel() {
252             if (cancelled.compareAndExchange(false, true))
253                 return;  // already cancelled
254 
255             state = CANCELLED;  // set CANCELLED state of upstream subscriber
256             subscription.cancel();  // cancel upstream subscription
257             pushDemandedScheduler.stop(); // stop the demand scheduler
258         }
259     }
260 
261     @Override
onSubscribe(Flow.Subscription subscription)262     public void onSubscribe(Flow.Subscription subscription) {
263         Objects.requireNonNull(subscription);
264         if (this.subscription != null) {
265             subscription.cancel();
266             return;
267         }
268 
269         int s = this.state;
270         assert s == UNSUBSCRIBED;
271         state = ACTIVE;
272         this.subscription = subscription;
273         downstreamSubscription = new DownstreamSubscription();
274         downstreamSubscriber.onSubscribe(downstreamSubscription);
275     }
276 
277     @Override
onNext(List<ByteBuffer> item)278     public void onNext(List<ByteBuffer> item) {
279         Objects.requireNonNull(item);
280 
281         int s = state;
282         if (s == CANCELLED)
283             return;
284 
285         if (s != ACTIVE)
286             throw new InternalError("onNext on inactive subscriber");
287 
288         synchronized (buffersLock) {
289             internalBuffers.addAll(item);
290             accumulatedBytes += remaining(item);
291         }
292 
293         downstreamSubscription.pushDemanded();
294     }
295 
296     @Override
onError(Throwable incomingThrowable)297     public void onError(Throwable incomingThrowable) {
298         Objects.requireNonNull(incomingThrowable);
299         int s = state;
300         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
301         state = ERROR;
302         Throwable t = this.throwable;
303         assert t == null : "Expected null, got:" + t;
304         this.throwable = incomingThrowable;
305         downstreamSubscription.pushDemanded();
306     }
307 
308     @Override
onComplete()309     public void onComplete() {
310         int s = state;
311         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
312         state = COMPLETE;
313         downstreamSubscription.pushDemanded();
314     }
315 
316     @Override
getBody()317     public CompletionStage<T> getBody() {
318         return downstreamSubscriber.getBody();
319     }
320 }
321