1 /* 2 * Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.common; 27 28 import java.io.Closeable; 29 import java.nio.ByteBuffer; 30 import java.util.List; 31 import java.util.Objects; 32 import java.util.concurrent.CompletableFuture; 33 import java.util.concurrent.ConcurrentLinkedQueue; 34 import java.util.concurrent.Flow; 35 import java.util.concurrent.Flow.Subscriber; 36 import java.util.concurrent.atomic.AtomicLong; 37 import java.util.concurrent.atomic.AtomicReference; 38 39 /** 40 * A wrapper for a Flow.Subscriber. This wrapper delivers data to the wrapped 41 * Subscriber which is supplied to the constructor. This class takes care of 42 * downstream flow control automatically and upstream flow control automatically 43 * by default. 44 * <p> 45 * Processing is done by implementing the {@link #incoming(List, boolean)} method 46 * which supplies buffers from upstream. This method (or any other method) 47 * can then call the outgoing() method to deliver processed buffers downstream. 48 * <p> 49 * Upstream error signals are delivered downstream directly. Cancellation from 50 * downstream is also propagated upstream immediately. 51 * <p> 52 * Each SubscriberWrapper has a {@link java.util.concurrent.CompletableFuture}{@code <Void>} 53 * which propagates completion/errors from downstream to upstream. Normal completion 54 * can only occur after onComplete() is called, but errors can be propagated upwards 55 * at any time. 56 */ 57 public abstract class SubscriberWrapper 58 implements FlowTube.TubeSubscriber, Closeable, Flow.Processor<List<ByteBuffer>,List<ByteBuffer>> 59 // TODO: SSLTube Subscriber will never change? Does this really need to be a TS? 60 { 61 final Logger debug = 62 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 63 64 public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE } 65 66 volatile Flow.Subscription upstreamSubscription; 67 final SubscriptionBase downstreamSubscription; 68 volatile boolean upstreamCompleted; 69 volatile boolean downstreamCompleted; 70 volatile boolean completionAcknowledged; 71 private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber; 72 // processed byte to send to the downstream subscriber. 73 private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ; 74 private final CompletableFuture<Void> cf; 75 private final SequentialScheduler pushScheduler; 76 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 77 final AtomicLong upstreamWindow = new AtomicLong(); 78 79 /** 80 * Wraps the given downstream subscriber. For each call to {@link 81 * #onNext(List<ByteBuffer>) } the given filter function is invoked 82 * and the list (if not empty) returned is passed downstream. 83 * 84 * A {@code CompletableFuture} is supplied which can be used to signal an 85 * error from downstream and which terminates the wrapper or which signals 86 * completion of downstream activity which can be propagated upstream. Error 87 * completion can be signaled at any time, but normal completion must not be 88 * signaled before onComplete() is called. 89 */ SubscriberWrapper()90 public SubscriberWrapper() 91 { 92 this.outputQ = new ConcurrentLinkedQueue<>(); 93 this.cf = new MinimalFuture<Void>(); 94 cf.whenComplete((v,t) -> { 95 if (t != null) 96 errorCommon(t); 97 }); 98 this.pushScheduler = 99 SequentialScheduler.lockingScheduler(new DownstreamPusher()); 100 this.downstreamSubscription = new SubscriptionBase(pushScheduler, 101 this::downstreamCompletion); 102 } 103 104 @Override subscribe(Subscriber<? super List<ByteBuffer>> downstreamSubscriber)105 public final void subscribe(Subscriber<? super List<ByteBuffer>> downstreamSubscriber) { 106 Objects.requireNonNull(downstreamSubscriber); 107 this.downstreamSubscriber = downstreamSubscriber; 108 } 109 110 /** 111 * Wraps the given downstream wrapper in this. For each call to 112 * {@link #onNext(List<ByteBuffer>) } the incoming() method is called. 113 * 114 * The {@code downstreamCF} from the downstream wrapper is linked to this 115 * wrappers notifier. 116 * 117 * @param downstreamWrapper downstream destination 118 */ SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper)119 public SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper) 120 { 121 this(); 122 subscribe(downstreamWrapper); 123 } 124 125 /** 126 * Delivers data to be processed by this wrapper. Generated data to be sent 127 * downstream, must be provided to the {@link #outgoing(List, boolean)}} 128 * method. 129 * 130 * @param buffers a List of ByteBuffers. 131 * @param complete if true then no more data will be added to the list 132 */ incoming(List<ByteBuffer> buffers, boolean complete)133 protected abstract void incoming(List<ByteBuffer> buffers, boolean complete); 134 135 /** 136 * This method is called to determine the window size to use at any time. The 137 * current window is supplied together with the current downstream queue size. 138 * {@code 0} should be returned if no change is 139 * required or a positive integer which will be added to the current window. 140 * The default implementation maintains a downstream queue size of no greater 141 * than 5. The method can be overridden if required. 142 * 143 * @param currentWindow the current upstream subscription window 144 * @param downstreamQsize the current number of buffers waiting to be sent 145 * downstream 146 * 147 * @return value to add to currentWindow 148 */ upstreamWindowUpdate(long currentWindow, long downstreamQsize)149 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 150 if (downstreamQsize > 5) { 151 return 0; 152 } 153 154 if (currentWindow == 0) { 155 return 1; 156 } else { 157 return 0; 158 } 159 } 160 161 /** 162 * Override this if anything needs to be done after the upstream subscriber 163 * has subscribed 164 */ onSubscribe()165 protected void onSubscribe() { 166 } 167 168 /** 169 * Override this if anything needs to be done before checking for error 170 * and processing the input queue. 171 * @return 172 */ enterScheduling()173 protected SchedulingAction enterScheduling() { 174 return SchedulingAction.CONTINUE; 175 } 176 signalScheduling()177 protected boolean signalScheduling() { 178 if (downstreamCompleted || pushScheduler.isStopped()) { 179 return false; 180 } 181 pushScheduler.runOrSchedule(); 182 return true; 183 } 184 185 /** 186 * Delivers buffers of data downstream. After incoming() 187 * has been called complete == true signifying completion of the upstream 188 * subscription, data may continue to be delivered, up to when outgoing() is 189 * called complete == true, after which, the downstream subscription is 190 * completed. 191 * 192 * It's an error to call outgoing() with complete = true if incoming() has 193 * not previously been called with it. 194 */ outgoing(ByteBuffer buffer, boolean complete)195 public void outgoing(ByteBuffer buffer, boolean complete) { 196 Objects.requireNonNull(buffer); 197 assert !complete || !buffer.hasRemaining(); 198 outgoing(List.of(buffer), complete); 199 } 200 201 /** 202 * Sometime it might be necessary to complete the downstream subscriber 203 * before the upstream completes. For instance, when an SSL server 204 * sends a notify_close. In that case we should let the outgoing 205 * complete before upstream is completed. 206 * @return true, may be overridden by subclasses. 207 */ closing()208 public boolean closing() { 209 return false; 210 } 211 outgoing(List<ByteBuffer> buffers, boolean complete)212 public void outgoing(List<ByteBuffer> buffers, boolean complete) { 213 Objects.requireNonNull(buffers); 214 if (complete) { 215 assert Utils.remaining(buffers) == 0; 216 boolean closing = closing(); 217 if (debug.on()) 218 debug.log("completionAcknowledged upstreamCompleted:%s," 219 + " downstreamCompleted:%s, closing:%s", 220 upstreamCompleted, downstreamCompleted, closing); 221 if (!upstreamCompleted && !closing) { 222 throw new IllegalStateException("upstream not completed"); 223 } 224 completionAcknowledged = true; 225 } else { 226 if (debug.on()) 227 debug.log("Adding %d to outputQ queue", Utils.remaining(buffers)); 228 outputQ.add(buffers); 229 } 230 if (debug.on()) 231 debug.log("pushScheduler" +(pushScheduler.isStopped() ? " is stopped!" : " is alive")); 232 pushScheduler.runOrSchedule(); 233 } 234 235 /** 236 * Returns a CompletableFuture which completes when this wrapper completes. 237 * Normal completion happens with the following steps (in order): 238 * 1. onComplete() is called 239 * 2. incoming() called with complete = true 240 * 3. outgoing() may continue to be called normally 241 * 4. outgoing called with complete = true 242 * 5. downstream subscriber is called onComplete() 243 * 244 * If the subscription is canceled or onComplete() is invoked the 245 * CompletableFuture completes exceptionally. Exceptional completion 246 * also occurs if downstreamCF completes exceptionally. 247 */ completion()248 public CompletableFuture<Void> completion() { 249 return cf; 250 } 251 252 /** 253 * Invoked whenever it 'may' be possible to push buffers downstream. 254 */ 255 class DownstreamPusher implements Runnable { 256 @Override run()257 public void run() { 258 try { 259 run1(); 260 } catch (Throwable t) { 261 if (debug.on()) 262 debug.log("DownstreamPusher threw: " + t); 263 errorCommon(t); 264 } 265 } 266 run1()267 private void run1() { 268 if (downstreamCompleted) { 269 if (debug.on()) 270 debug.log("DownstreamPusher: downstream is already completed"); 271 return; 272 } 273 switch (enterScheduling()) { 274 case CONTINUE: break; 275 case RESCHEDULE: pushScheduler.runOrSchedule(); return; 276 case RETURN: return; 277 default: 278 errorRef.compareAndSet(null, 279 new InternalError("unknown scheduling command")); 280 break; 281 } 282 // If there was an error, send it downstream. 283 Throwable error = errorRef.get(); 284 if (error != null && outputQ.isEmpty()) { 285 synchronized(this) { 286 if (downstreamCompleted) 287 return; 288 downstreamCompleted = true; 289 } 290 if (debug.on()) 291 debug.log("DownstreamPusher: forwarding error downstream: " + error); 292 pushScheduler.stop(); 293 outputQ.clear(); 294 downstreamSubscriber.onError(error); 295 cf.completeExceptionally(error); 296 return; 297 } 298 299 // OK - no error, let's proceed 300 if (!outputQ.isEmpty()) { 301 if (debug.on()) 302 debug.log("DownstreamPusher: queue not empty, downstreamSubscription: %s", 303 downstreamSubscription); 304 } else { 305 if (debug.on()) 306 debug.log("DownstreamPusher: queue empty, downstreamSubscription: %s", 307 downstreamSubscription); 308 } 309 310 boolean datasent = false; 311 while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) { 312 List<ByteBuffer> b = outputQ.poll(); 313 if (debug.on()) 314 debug.log("DownstreamPusher: Pushing %d bytes downstream", 315 Utils.remaining(b)); 316 downstreamSubscriber.onNext(b); 317 datasent = true; 318 } 319 320 // If we have sent some decrypted data downstream, 321 // or if: 322 // - there's nothing more available to send downstream 323 // - and we still have some demand from downstream 324 // - and upstream is not completed yet 325 // - and our demand from upstream has reached 0, 326 // then check whether we should request more data from 327 // upstream 328 if (datasent || outputQ.isEmpty() 329 && !downstreamSubscription.demand.isFulfilled() 330 && !upstreamCompleted 331 && upstreamWindow.get() == 0) { 332 upstreamWindowUpdate(); 333 } 334 checkCompletion(); 335 } 336 } 337 outputQueueSize()338 final int outputQueueSize() { 339 return outputQ.size(); 340 } 341 hasNoOutputData()342 final boolean hasNoOutputData() { 343 return outputQ.isEmpty(); 344 } 345 upstreamWindowUpdate()346 void upstreamWindowUpdate() { 347 long downstreamQueueSize = outputQ.size(); 348 long upstreamWindowSize = upstreamWindow.get(); 349 long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize); 350 if (debug.on()) 351 debug.log("upstreamWindowUpdate, " 352 + "downstreamQueueSize:%d, upstreamWindow:%d", 353 downstreamQueueSize, upstreamWindowSize); 354 if (n > 0) 355 upstreamRequest(n); 356 } 357 358 @Override onSubscribe(Flow.Subscription subscription)359 public void onSubscribe(Flow.Subscription subscription) { 360 if (upstreamSubscription != null) { 361 throw new IllegalStateException("Single shot publisher"); 362 } 363 this.upstreamSubscription = subscription; 364 upstreamRequest(initialUpstreamDemand()); 365 if (debug.on()) 366 debug.log("calling downstreamSubscriber::onSubscribe on %s", 367 downstreamSubscriber); 368 downstreamSubscriber.onSubscribe(downstreamSubscription); 369 onSubscribe(); 370 } 371 372 @Override onNext(List<ByteBuffer> item)373 public void onNext(List<ByteBuffer> item) { 374 if (debug.on()) debug.log("onNext"); 375 long prev = upstreamWindow.getAndDecrement(); 376 if (prev <= 0) 377 throw new IllegalStateException("invalid onNext call"); 378 incomingCaller(item, false); 379 } 380 upstreamRequest(long n)381 private void upstreamRequest(long n) { 382 if (debug.on()) debug.log("requesting %d", n); 383 upstreamWindow.getAndAdd(n); 384 upstreamSubscription.request(n); 385 } 386 387 /** 388 * Initial demand that should be requested 389 * from upstream when we get the upstream subscription 390 * from {@link #onSubscribe(Flow.Subscription)}. 391 * @return The initial demand to request from upstream. 392 */ initialUpstreamDemand()393 protected long initialUpstreamDemand() { 394 return 1; 395 } 396 requestMore()397 protected void requestMore() { 398 if (upstreamWindow.get() == 0) { 399 upstreamRequest(1); 400 } 401 } 402 upstreamWindow()403 public long upstreamWindow() { 404 return upstreamWindow.get(); 405 } 406 407 @Override onError(Throwable throwable)408 public void onError(Throwable throwable) { 409 if (debug.on()) debug.log("onError: " + throwable); 410 errorCommon(Objects.requireNonNull(throwable)); 411 } 412 errorCommon(Throwable throwable)413 protected boolean errorCommon(Throwable throwable) { 414 assert throwable != null || 415 (throwable = new AssertionError("null throwable")) != null; 416 if (errorRef.compareAndSet(null, throwable)) { 417 if (debug.on()) debug.log("error", throwable); 418 upstreamCompleted = true; 419 pushScheduler.runOrSchedule(); 420 return true; 421 } 422 return false; 423 } 424 425 @Override close()426 public void close() { 427 errorCommon(new RuntimeException("wrapper closed")); 428 } 429 close(Throwable t)430 public void close(Throwable t) { 431 errorCommon(t); 432 } 433 incomingCaller(List<ByteBuffer> l, boolean complete)434 private void incomingCaller(List<ByteBuffer> l, boolean complete) { 435 try { 436 incoming(l, complete); 437 } catch(Throwable t) { 438 errorCommon(t); 439 } 440 } 441 442 @Override onComplete()443 public void onComplete() { 444 if (debug.on()) debug.log("upstream completed: " + toString()); 445 upstreamCompleted = true; 446 incomingCaller(Utils.EMPTY_BB_LIST, true); 447 // pushScheduler will call checkCompletion() 448 pushScheduler.runOrSchedule(); 449 } 450 451 /** Adds the given data to the input queue. */ addData(ByteBuffer l)452 public void addData(ByteBuffer l) { 453 if (upstreamSubscription == null) { 454 throw new IllegalStateException("can't add data before upstream subscriber subscribes"); 455 } 456 incomingCaller(List.of(l), false); 457 } 458 checkCompletion()459 void checkCompletion() { 460 if (downstreamCompleted || !upstreamCompleted) { 461 return; 462 } 463 if (!outputQ.isEmpty()) { 464 return; 465 } 466 if (errorRef.get() != null) { 467 pushScheduler.runOrSchedule(); 468 return; 469 } 470 if (completionAcknowledged) { 471 if (debug.on()) debug.log("calling downstreamSubscriber.onComplete()"); 472 downstreamSubscriber.onComplete(); 473 // Fix me subscriber.onComplete.run(); 474 downstreamCompleted = true; 475 cf.complete(null); 476 } 477 } 478 479 // called from the downstream Subscription.cancel() downstreamCompletion()480 void downstreamCompletion() { 481 upstreamSubscription.cancel(); 482 cf.complete(null); 483 } 484 resetDownstreamDemand()485 public void resetDownstreamDemand() { 486 downstreamSubscription.demand.reset(); 487 } 488 489 @Override toString()490 public String toString() { 491 StringBuilder sb = new StringBuilder(); 492 sb.append("SubscriberWrapper:") 493 .append(" upstreamCompleted: ").append(Boolean.toString(upstreamCompleted)) 494 .append(" upstreamWindow: ").append(upstreamWindow.toString()) 495 .append(" downstreamCompleted: ").append(Boolean.toString(downstreamCompleted)) 496 .append(" completionAcknowledged: ").append(Boolean.toString(completionAcknowledged)) 497 .append(" outputQ size: ").append(Integer.toString(outputQ.size())) 498 //.append(" outputQ: ").append(outputQ.toString()) 499 .append(" cf: ").append(cf.toString()) 500 .append(" downstreamSubscription: ").append(downstreamSubscription) 501 .append(" downstreamSubscriber: ").append(downstreamSubscriber); 502 503 return sb.toString(); 504 } 505 dbgString()506 public String dbgString() { 507 return "SubscriberWrapper"; 508 } 509 } 510