1 /* 2 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.nio.ByteBuffer; 29 import java.util.ArrayList; 30 import java.util.Collections; 31 import java.util.List; 32 import java.util.ListIterator; 33 import java.util.Objects; 34 import java.util.concurrent.CompletionStage; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.net.http.HttpResponse.BodySubscriber; 38 import jdk.internal.net.http.common.Demand; 39 import jdk.internal.net.http.common.SequentialScheduler; 40 import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber; 41 42 /** 43 * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given 44 * amount ( in bytes ) of a publisher's data before pushing it to a downstream 45 * subscriber. 46 */ 47 public class BufferingSubscriber<T> implements TrustedSubscriber<T> 48 { 49 /** The downstream consumer of the data. */ 50 private final BodySubscriber<T> downstreamSubscriber; 51 /** The amount of data to be accumulate before pushing downstream. */ 52 private final int bufferSize; 53 54 /** The subscription, created lazily. */ 55 private volatile Flow.Subscription subscription; 56 /** The downstream subscription, created lazily. */ 57 private volatile DownstreamSubscription downstreamSubscription; 58 59 /** Must be held when accessing the internal buffers. */ 60 private final Object buffersLock = new Object(); 61 /** The internal buffers holding the buffered data. */ 62 private ArrayList<ByteBuffer> internalBuffers; 63 /** The actual accumulated remaining bytes in internalBuffers. */ 64 private int accumulatedBytes; 65 66 /** Holds the Throwable from upstream's onError. */ 67 private volatile Throwable throwable; 68 69 /** State of the buffering subscriber: 70 * 1) [UNSUBSCRIBED] when initially created 71 * 2) [ACTIVE] when subscribed and can receive data 72 * 3) [ERROR | CANCELLED | COMPLETE] (terminal state) 73 */ 74 static final int UNSUBSCRIBED = 0x01; 75 static final int ACTIVE = 0x02; 76 static final int ERROR = 0x04; 77 static final int CANCELLED = 0x08; 78 static final int COMPLETE = 0x10; 79 80 private volatile int state; 81 BufferingSubscriber(BodySubscriber<T> downstreamSubscriber, int bufferSize)82 public BufferingSubscriber(BodySubscriber<T> downstreamSubscriber, 83 int bufferSize) { 84 this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber); 85 this.bufferSize = bufferSize; 86 synchronized (buffersLock) { 87 internalBuffers = new ArrayList<>(); 88 } 89 state = UNSUBSCRIBED; 90 } 91 92 /** Returns the number of bytes remaining in the given buffers. */ remaining(List<ByteBuffer> buffers)93 private static final long remaining(List<ByteBuffer> buffers) { 94 return buffers.stream().mapToLong(ByteBuffer::remaining).sum(); 95 } 96 97 @Override needsExecutor()98 public boolean needsExecutor() { 99 return TrustedSubscriber.needsExecutor(downstreamSubscriber); 100 } 101 102 /** 103 * Tells whether, or not, there is at least a sufficient number of bytes 104 * accumulated in the internal buffers. If the subscriber is COMPLETE, and 105 * has some buffered data, then there is always enough ( to pass downstream ). 106 */ hasEnoughAccumulatedBytes()107 private final boolean hasEnoughAccumulatedBytes() { 108 assert Thread.holdsLock(buffersLock); 109 return accumulatedBytes >= bufferSize 110 || (state == COMPLETE && accumulatedBytes > 0); 111 } 112 113 /** 114 * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the 115 * amount of data as required before pushing downstream. The amount of data 116 * may be less than required ( bufferSize ), in the case where the subscriber 117 * is COMPLETE. 118 */ fromInternalBuffers()119 private List<ByteBuffer> fromInternalBuffers() { 120 assert Thread.holdsLock(buffersLock); 121 int leftToFill = bufferSize; 122 int state = this.state; 123 assert (state == ACTIVE || state == CANCELLED) 124 ? accumulatedBytes >= leftToFill : true; 125 List<ByteBuffer> dsts = new ArrayList<>(); 126 127 ListIterator<ByteBuffer> itr = internalBuffers.listIterator(); 128 while (itr.hasNext()) { 129 ByteBuffer b = itr.next(); 130 if (b.remaining() <= leftToFill) { 131 itr.remove(); 132 if (b.position() != 0) 133 b = b.slice(); // ensure position = 0 when propagated 134 dsts.add(b); 135 leftToFill -= b.remaining(); 136 accumulatedBytes -= b.remaining(); 137 if (leftToFill == 0) 138 break; 139 } else { 140 int prevLimit = b.limit(); 141 b.limit(b.position() + leftToFill); 142 ByteBuffer slice = b.slice(); 143 dsts.add(slice); 144 b.limit(prevLimit); 145 b.position(b.position() + leftToFill); 146 accumulatedBytes -= leftToFill; 147 leftToFill = 0; 148 break; 149 } 150 } 151 assert (state == ACTIVE || state == CANCELLED) 152 ? leftToFill == 0 : state == COMPLETE; 153 assert (state == ACTIVE || state == CANCELLED) 154 ? remaining(dsts) == bufferSize : state == COMPLETE; 155 assert accumulatedBytes >= 0; 156 assert dsts.stream().noneMatch(b -> b.position() != 0); 157 return Collections.unmodifiableList(dsts); 158 } 159 160 /** Subscription that is passed to the downstream subscriber. */ 161 private class DownstreamSubscription implements Flow.Subscription { 162 private final AtomicBoolean cancelled = new AtomicBoolean(); // false 163 private final Demand demand = new Demand(); 164 private volatile boolean illegalArg; 165 166 @Override request(long n)167 public void request(long n) { 168 if (cancelled.get() || illegalArg) { 169 return; 170 } 171 if (n <= 0L) { 172 // pass the "bad" value upstream so the Publisher can deal with 173 // it appropriately, i.e. invoke onError 174 illegalArg = true; 175 subscription.request(n); 176 return; 177 } 178 179 demand.increase(n); 180 181 pushDemanded(); 182 } 183 184 private final SequentialScheduler pushDemandedScheduler = 185 new SequentialScheduler(new PushDemandedTask()); 186 pushDemanded()187 void pushDemanded() { 188 if (cancelled.get()) 189 return; 190 pushDemandedScheduler.runOrSchedule(); 191 } 192 193 class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask { 194 @Override run()195 public void run() { 196 try { 197 Throwable t = throwable; 198 if (t != null) { 199 pushDemandedScheduler.stop(); // stop the demand scheduler 200 downstreamSubscriber.onError(t); 201 return; 202 } 203 204 while (true) { 205 List<ByteBuffer> item; 206 synchronized (buffersLock) { 207 if (cancelled.get()) 208 return; 209 if (!hasEnoughAccumulatedBytes()) 210 break; 211 if (!demand.tryDecrement()) 212 break; 213 item = fromInternalBuffers(); 214 } 215 assert item != null; 216 217 downstreamSubscriber.onNext(item); 218 } 219 if (cancelled.get()) 220 return; 221 222 // complete only if all data consumed 223 boolean complete; 224 synchronized (buffersLock) { 225 complete = state == COMPLETE && internalBuffers.isEmpty(); 226 } 227 if (complete) { 228 assert internalBuffers.isEmpty(); 229 pushDemandedScheduler.stop(); // stop the demand scheduler 230 downstreamSubscriber.onComplete(); 231 return; 232 } 233 } catch (Throwable t) { 234 cancel(); // cancel if there is any error 235 throw t; 236 } 237 238 boolean requestMore = false; 239 synchronized (buffersLock) { 240 if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) { 241 // request more upstream data 242 requestMore = true; 243 } 244 } 245 if (requestMore) 246 subscription.request(1); 247 } 248 } 249 250 @Override cancel()251 public void cancel() { 252 if (cancelled.compareAndExchange(false, true)) 253 return; // already cancelled 254 255 state = CANCELLED; // set CANCELLED state of upstream subscriber 256 subscription.cancel(); // cancel upstream subscription 257 pushDemandedScheduler.stop(); // stop the demand scheduler 258 } 259 } 260 261 @Override onSubscribe(Flow.Subscription subscription)262 public void onSubscribe(Flow.Subscription subscription) { 263 Objects.requireNonNull(subscription); 264 if (this.subscription != null) { 265 subscription.cancel(); 266 return; 267 } 268 269 int s = this.state; 270 assert s == UNSUBSCRIBED; 271 state = ACTIVE; 272 this.subscription = subscription; 273 downstreamSubscription = new DownstreamSubscription(); 274 downstreamSubscriber.onSubscribe(downstreamSubscription); 275 } 276 277 @Override onNext(List<ByteBuffer> item)278 public void onNext(List<ByteBuffer> item) { 279 Objects.requireNonNull(item); 280 281 int s = state; 282 if (s == CANCELLED) 283 return; 284 285 if (s != ACTIVE) 286 throw new InternalError("onNext on inactive subscriber"); 287 288 synchronized (buffersLock) { 289 internalBuffers.addAll(item); 290 accumulatedBytes += remaining(item); 291 } 292 293 downstreamSubscription.pushDemanded(); 294 } 295 296 @Override onError(Throwable incomingThrowable)297 public void onError(Throwable incomingThrowable) { 298 Objects.requireNonNull(incomingThrowable); 299 int s = state; 300 assert s == ACTIVE : "Expected ACTIVE, got:" + s; 301 state = ERROR; 302 Throwable t = this.throwable; 303 assert t == null : "Expected null, got:" + t; 304 this.throwable = incomingThrowable; 305 downstreamSubscription.pushDemanded(); 306 } 307 308 @Override onComplete()309 public void onComplete() { 310 int s = state; 311 assert s == ACTIVE : "Expected ACTIVE, got:" + s; 312 state = COMPLETE; 313 downstreamSubscription.pushDemanded(); 314 } 315 316 @Override getBody()317 public CompletionStage<T> getBody() { 318 return downstreamSubscriber.getBody(); 319 } 320 } 321