1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.concurrent.CompletableFuture; 33 import java.util.concurrent.CompletionStage; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.util.concurrent.atomic.AtomicLong; 38 import java.util.function.Consumer; 39 import java.util.function.Function; 40 import java.net.http.HttpHeaders; 41 import java.net.http.HttpResponse; 42 import jdk.internal.net.http.ResponseContent.BodyParser; 43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser; 44 import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber; 45 import jdk.internal.net.http.common.Log; 46 import jdk.internal.net.http.common.Logger; 47 import jdk.internal.net.http.common.MinimalFuture; 48 import jdk.internal.net.http.common.Utils; 49 import static java.net.http.HttpClient.Version.HTTP_1_1; 50 import static java.net.http.HttpResponse.BodySubscribers.discarding; 51 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 52 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED; 53 54 /** 55 * Handles a HTTP/1.1 response (headers + body). 56 * There can be more than one of these per Http exchange. 57 */ 58 class Http1Response<T> { 59 60 private volatile ResponseContent content; 61 private final HttpRequestImpl request; 62 private Response response; 63 private final HttpConnection connection; 64 private HttpHeaders headers; 65 private int responseCode; 66 private final Http1Exchange<T> exchange; 67 private boolean return2Cache; // return connection to cache when finished 68 private final HeadersReader headersReader; // used to read the headers 69 private final BodyReader bodyReader; // used to read the body 70 private final Http1AsyncReceiver asyncReceiver; 71 private volatile EOFException eof; 72 private volatile BodyParser bodyParser; 73 // max number of bytes of (fixed length) body to ignore on redirect 74 private final static int MAX_IGNORE = 1024; 75 76 // Revisit: can we get rid of this? 77 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} 78 private volatile State readProgress = State.INITIAL; 79 80 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 81 final static AtomicLong responseCount = new AtomicLong(); 82 final long id = responseCount.incrementAndGet(); 83 private Http1HeaderParser hd; 84 Http1Response(HttpConnection conn, Http1Exchange<T> exchange, Http1AsyncReceiver asyncReceiver)85 Http1Response(HttpConnection conn, 86 Http1Exchange<T> exchange, 87 Http1AsyncReceiver asyncReceiver) { 88 this.readProgress = State.INITIAL; 89 this.request = exchange.request(); 90 this.exchange = exchange; 91 this.connection = conn; 92 this.asyncReceiver = asyncReceiver; 93 headersReader = new HeadersReader(this::advance); 94 bodyReader = new BodyReader(this::advance); 95 96 hd = new Http1HeaderParser(); 97 readProgress = State.READING_HEADERS; 98 headersReader.start(hd); 99 asyncReceiver.subscribe(headersReader); 100 } 101 102 String dbgTag; dbgString()103 private String dbgString() { 104 String dbg = dbgTag; 105 if (dbg == null) { 106 String cdbg = connection.dbgTag; 107 if (cdbg != null) { 108 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; 109 } else { 110 dbg = "Http1Response(id=" + id + ")"; 111 } 112 } 113 return dbg; 114 } 115 116 // The ClientRefCountTracker is used to track the state 117 // of a pending operation. Altough there usually is a single 118 // point where the operation starts, it may terminate at 119 // different places. 120 private final class ClientRefCountTracker { 121 final HttpClientImpl client = connection.client(); 122 // state & 0x01 != 0 => acquire called 123 // state & 0x02 != 0 => tryRelease called 124 byte state; 125 acquire()126 public synchronized void acquire() { 127 if (state == 0) { 128 // increment the reference count on the HttpClientImpl 129 // to prevent the SelectorManager thread from exiting 130 // until our operation is complete. 131 if (debug.on()) 132 debug.log("Operation started: incrementing ref count for %s", client); 133 client.reference(); 134 state = 0x01; 135 } else { 136 if (debug.on()) 137 debug.log("Operation ref count for %s is already %s", 138 client, ((state & 0x2) == 0x2) ? "released." : "incremented!" ); 139 assert (state & 0x01) == 0 : "reference count already incremented"; 140 } 141 } 142 tryRelease()143 public synchronized void tryRelease() { 144 if (state == 0x01) { 145 // decrement the reference count on the HttpClientImpl 146 // to allow the SelectorManager thread to exit if no 147 // other operation is pending and the facade is no 148 // longer referenced. 149 if (debug.on()) 150 debug.log("Operation finished: decrementing ref count for %s", client); 151 client.unreference(); 152 } else if (state == 0) { 153 if (debug.on()) 154 debug.log("Operation finished: releasing ref count for %s", client); 155 } else if ((state & 0x02) == 0x02) { 156 if (debug.on()) 157 debug.log("ref count for %s already released", client); 158 } 159 state |= 0x02; 160 } 161 } 162 163 private volatile boolean firstTimeAround = true; 164 readHeadersAsync(Executor executor)165 public CompletableFuture<Response> readHeadersAsync(Executor executor) { 166 if (debug.on()) 167 debug.log("Reading Headers: (remaining: " 168 + asyncReceiver.remaining() +") " + readProgress); 169 170 if (firstTimeAround) { 171 if (debug.on()) debug.log("First time around"); 172 firstTimeAround = false; 173 } else { 174 // with expect continue we will resume reading headers + body. 175 asyncReceiver.unsubscribe(bodyReader); 176 bodyReader.reset(); 177 178 hd = new Http1HeaderParser(); 179 readProgress = State.READING_HEADERS; 180 headersReader.reset(); 181 headersReader.start(hd); 182 asyncReceiver.subscribe(headersReader); 183 } 184 185 CompletableFuture<State> cf = headersReader.completion(); 186 assert cf != null : "parsing not started"; 187 if (debug.on()) { 188 debug.log("headersReader is %s", 189 cf == null ? "not yet started" 190 : cf.isDone() ? "already completed" 191 : "not yet completed"); 192 } 193 194 Function<State, Response> lambda = (State completed) -> { 195 assert completed == State.READING_HEADERS; 196 if (debug.on()) 197 debug.log("Reading Headers: creating Response object;" 198 + " state is now " + readProgress); 199 asyncReceiver.unsubscribe(headersReader); 200 responseCode = hd.responseCode(); 201 headers = hd.headers(); 202 203 response = new Response(request, 204 exchange.getExchange(), 205 headers, 206 connection, 207 responseCode, 208 HTTP_1_1); 209 210 if (Log.headers()) { 211 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 212 Log.dumpHeaders(sb, " ", headers); 213 Log.logHeaders(sb.toString()); 214 } 215 216 return response; 217 }; 218 219 if (executor != null) { 220 return cf.thenApplyAsync(lambda, executor); 221 } else { 222 return cf.thenApply(lambda); 223 } 224 } 225 226 private boolean finished; 227 completed()228 synchronized void completed() { 229 finished = true; 230 } 231 finished()232 synchronized boolean finished() { 233 return finished; 234 } 235 236 /** 237 * Return known fixed content length or -1 if chunked, or -2 if no content-length 238 * information in which case, connection termination delimits the response body 239 */ fixupContentLen(long clen)240 long fixupContentLen(long clen) { 241 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { 242 return 0L; 243 } 244 if (clen == -1L) { 245 if (headers.firstValue("Transfer-encoding").orElse("") 246 .equalsIgnoreCase("chunked")) { 247 return -1L; 248 } 249 if (responseCode == 101) { 250 // this is a h2c or websocket upgrade, contentlength must be zero 251 return 0L; 252 } 253 return -2L; 254 } 255 return clen; 256 } 257 258 /** 259 * Read up to MAX_IGNORE bytes discarding 260 */ ignoreBody(Executor executor)261 public CompletableFuture<Void> ignoreBody(Executor executor) { 262 int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 263 if (clen == -1 || clen > MAX_IGNORE) { 264 connection.close(); 265 return MinimalFuture.completedFuture(null); // not treating as error 266 } else { 267 return readBody(discarding(), !request.isWebSocket(), executor); 268 } 269 } 270 271 // Used for those response codes that have no body associated nullBody(HttpResponse<T> resp, Throwable t)272 public void nullBody(HttpResponse<T> resp, Throwable t) { 273 if (t != null) connection.close(); 274 else { 275 return2Cache = !request.isWebSocket(); 276 onFinished(); 277 } 278 } 279 280 static final Flow.Subscription NOP = new Flow.Subscription() { 281 @Override 282 public void request(long n) { } 283 public void cancel() { } 284 }; 285 286 /** 287 * The Http1AsyncReceiver ensures that all calls to 288 * the subscriber, including onSubscribe, occur sequentially. 289 * There could however be some race conditions that could happen 290 * in case of unexpected errors thrown at unexpected places, which 291 * may cause onError to be called multiple times. 292 * The Http1BodySubscriber will ensure that the user subscriber 293 * is actually completed only once - and only after it is 294 * subscribed. 295 * @param <U> The type of response. 296 */ 297 final static class Http1BodySubscriber<U> implements TrustedSubscriber<U> { 298 final HttpResponse.BodySubscriber<U> userSubscriber; 299 final AtomicBoolean completed = new AtomicBoolean(); 300 volatile Throwable withError; 301 volatile boolean subscribed; Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber)302 Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) { 303 this.userSubscriber = userSubscriber; 304 } 305 306 @Override needsExecutor()307 public boolean needsExecutor() { 308 return TrustedSubscriber.needsExecutor(userSubscriber); 309 } 310 311 // propagate the error to the user subscriber, even if not 312 // subscribed yet. propagateError(Throwable t)313 private void propagateError(Throwable t) { 314 assert t != null; 315 try { 316 // if unsubscribed at this point, it will not 317 // get subscribed later - so do it now and 318 // propagate the error 319 if (subscribed == false) { 320 subscribed = true; 321 userSubscriber.onSubscribe(NOP); 322 } 323 } finally { 324 // if onError throws then there is nothing to do 325 // here: let the caller deal with it by logging 326 // and closing the connection. 327 userSubscriber.onError(t); 328 } 329 } 330 331 // complete the subscriber, either normally or exceptionally 332 // ensure that the subscriber is completed only once. complete(Throwable t)333 private void complete(Throwable t) { 334 if (completed.compareAndSet(false, true)) { 335 t = withError = Utils.getCompletionCause(t); 336 if (t == null) { 337 assert subscribed; 338 try { 339 userSubscriber.onComplete(); 340 } catch (Throwable x) { 341 // Simply propagate the error by calling 342 // onError on the user subscriber, and let the 343 // connection be reused since we should have received 344 // and parsed all the bytes when we reach here. 345 // If onError throws in turn, then we will simply 346 // let that new exception flow up to the caller 347 // and let it deal with it. 348 // (i.e: log and close the connection) 349 // Note that rethrowing here could introduce a 350 // race that might cause the next send() operation to 351 // fail as the connection has already been put back 352 // into the cache when we reach here. 353 propagateError(t = withError = Utils.getCompletionCause(x)); 354 } 355 } else { 356 propagateError(t); 357 } 358 } 359 } 360 361 @Override getBody()362 public CompletionStage<U> getBody() { 363 return userSubscriber.getBody(); 364 } 365 366 @Override onSubscribe(Flow.Subscription subscription)367 public void onSubscribe(Flow.Subscription subscription) { 368 if (!subscribed) { 369 subscribed = true; 370 userSubscriber.onSubscribe(subscription); 371 } else { 372 // could be already subscribed and completed 373 // if an unexpected error occurred before the actual 374 // subscription - though that's not supposed 375 // happen. 376 assert completed.get(); 377 } 378 } 379 @Override onNext(List<ByteBuffer> item)380 public void onNext(List<ByteBuffer> item) { 381 assert !completed.get(); 382 userSubscriber.onNext(item); 383 } 384 @Override onError(Throwable throwable)385 public void onError(Throwable throwable) { 386 complete(throwable); 387 } 388 @Override onComplete()389 public void onComplete() { 390 complete(null); 391 } 392 } 393 readBody(HttpResponse.BodySubscriber<U> p, boolean return2Cache, Executor executor)394 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, 395 boolean return2Cache, 396 Executor executor) { 397 if (debug.on()) { 398 debug.log("readBody: return2Cache: " + return2Cache); 399 if (request.isWebSocket() && return2Cache && connection != null) { 400 debug.log("websocket connection will be returned to cache: " 401 + connection.getClass() + "/" + connection ); 402 } 403 } 404 assert !return2Cache || !request.isWebSocket(); 405 this.return2Cache = return2Cache; 406 final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p); 407 408 final CompletableFuture<U> cf = new MinimalFuture<>(); 409 410 long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L); 411 final long clen = fixupContentLen(clen0); 412 413 // expect-continue reads headers and body twice. 414 // if we reach here, we must reset the headersReader state. 415 asyncReceiver.unsubscribe(headersReader); 416 headersReader.reset(); 417 ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); 418 419 // We need to keep hold on the client facade until the 420 // tracker has been incremented. 421 connection.client().reference(); 422 executor.execute(() -> { 423 try { 424 content = new ResponseContent( 425 connection, clen, headers, subscriber, 426 this::onFinished 427 ); 428 if (cf.isCompletedExceptionally()) { 429 // if an error occurs during subscription 430 connection.close(); 431 return; 432 } 433 // increment the reference count on the HttpClientImpl 434 // to prevent the SelectorManager thread from exiting until 435 // the body is fully read. 436 refCountTracker.acquire(); 437 bodyParser = content.getBodyParser( 438 (t) -> { 439 try { 440 if (t != null) { 441 try { 442 subscriber.onError(t); 443 } finally { 444 cf.completeExceptionally(t); 445 } 446 } 447 } finally { 448 bodyReader.onComplete(t); 449 if (t != null) { 450 connection.close(); 451 } 452 } 453 }); 454 bodyReader.start(bodyParser); 455 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); 456 asyncReceiver.subscribe(bodyReader); 457 assert bodyReaderCF != null : "parsing not started"; 458 // Make sure to keep a reference to asyncReceiver from 459 // within this 460 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { 461 t = Utils.getCompletionCause(t); 462 try { 463 if (t == null) { 464 if (debug.on()) debug.log("Finished reading body: " + s); 465 assert s == State.READING_BODY; 466 } 467 if (t != null) { 468 subscriber.onError(t); 469 cf.completeExceptionally(t); 470 } 471 } catch (Throwable x) { 472 // not supposed to happen 473 asyncReceiver.onReadError(x); 474 } finally { 475 // we're done: release the ref count for 476 // the current operation. 477 refCountTracker.tryRelease(); 478 } 479 }); 480 connection.addTrailingOperation(trailingOp); 481 } catch (Throwable t) { 482 if (debug.on()) debug.log("Failed reading body: " + t); 483 try { 484 subscriber.onError(t); 485 cf.completeExceptionally(t); 486 } finally { 487 asyncReceiver.onReadError(t); 488 } 489 } finally { 490 connection.client().unreference(); 491 } 492 }); 493 494 ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> { 495 cf.completeExceptionally(t); 496 asyncReceiver.setRetryOnError(false); 497 asyncReceiver.onReadError(t); 498 }); 499 500 return cf.whenComplete((s,t) -> { 501 if (t != null) { 502 // If an exception occurred, release the 503 // ref count for the current operation, as 504 // it may never be triggered otherwise 505 // (BodySubscriber ofInputStream) 506 // If there was no exception then the 507 // ref count will be/have been released when 508 // the last byte of the response is/was received 509 refCountTracker.tryRelease(); 510 } 511 }); 512 } 513 514 515 private void onFinished() { 516 asyncReceiver.clear(); 517 if (return2Cache) { 518 Log.logTrace("Attempting to return connection to the pool: {0}", connection); 519 // TODO: need to do something here? 520 // connection.setAsyncCallbacks(null, null, null); 521 522 // don't return the connection to the cache if EOF happened. 523 if (debug.on()) 524 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool"); 525 connection.closeOrReturnToCache(eof == null ? headers : null); 526 } 527 } 528 529 HttpHeaders responseHeaders() { 530 return headers; 531 } 532 533 int responseCode() { 534 return responseCode; 535 } 536 537 // ================ Support for plugging into Http1Receiver ================= 538 // ============================================================================ 539 540 // Callback: Error receiver: Consumer of Throwable. 541 void onReadError(Throwable t) { 542 Log.logError(t); 543 Receiver<?> receiver = receiver(readProgress); 544 if (t instanceof EOFException) { 545 debug.log(Level.DEBUG, "onReadError: received EOF"); 546 eof = (EOFException) t; 547 } 548 CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); 549 debug.log(Level.DEBUG, () -> "onReadError: cf is " 550 + (cf == null ? "null" 551 : (cf.isDone() ? "already completed" 552 : "not yet completed"))); 553 if (cf != null) { 554 cf.completeExceptionally(t); 555 } else { 556 debug.log(Level.DEBUG, "onReadError", t); 557 } 558 debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); 559 connection.close(); 560 } 561 562 // ======================================================================== 563 564 private State advance(State previous) { 565 assert readProgress == previous; 566 switch(previous) { 567 case READING_HEADERS: 568 asyncReceiver.unsubscribe(headersReader); 569 return readProgress = State.READING_BODY; 570 case READING_BODY: 571 asyncReceiver.unsubscribe(bodyReader); 572 return readProgress = State.DONE; 573 default: 574 throw new InternalError("can't advance from " + previous); 575 } 576 } 577 578 Receiver<?> receiver(State state) { 579 switch(state) { 580 case READING_HEADERS: return headersReader; 581 case READING_BODY: return bodyReader; 582 default: return null; 583 } 584 585 } 586 587 static abstract class Receiver<T> 588 implements Http1AsyncReceiver.Http1AsyncDelegate { 589 abstract void start(T parser); 590 abstract CompletableFuture<State> completion(); 591 // accepts a buffer from upstream. 592 // this should be implemented as a simple call to 593 // accept(ref, parser, cf) 594 public abstract boolean tryAsyncReceive(ByteBuffer buffer); 595 public abstract void onReadError(Throwable t); 596 // handle a byte buffer received from upstream. 597 // this method should set the value of Http1Response.buffer 598 // to ref.get() before beginning parsing. 599 abstract void handle(ByteBuffer buf, T parser, 600 CompletableFuture<State> cf); 601 // resets this objects state so that it can be reused later on 602 // typically puts the reference to parser and completion to null 603 abstract void reset(); 604 605 // accepts a byte buffer received from upstream 606 // returns true if the buffer is fully parsed and more data can 607 // be accepted, false otherwise. 608 final boolean accept(ByteBuffer buf, T parser, 609 CompletableFuture<State> cf) { 610 if (cf == null || parser == null || cf.isDone()) return false; 611 handle(buf, parser, cf); 612 return !cf.isDone(); 613 } 614 public abstract void onSubscribe(AbstractSubscription s); 615 public abstract AbstractSubscription subscription(); 616 617 } 618 619 // Invoked with each new ByteBuffer when reading headers... 620 final class HeadersReader extends Receiver<Http1HeaderParser> { 621 final Consumer<State> onComplete; 622 volatile Http1HeaderParser parser; 623 volatile CompletableFuture<State> cf; 624 volatile long count; // bytes parsed (for debug) 625 volatile AbstractSubscription subscription; 626 627 HeadersReader(Consumer<State> onComplete) { 628 this.onComplete = onComplete; 629 } 630 631 @Override 632 public AbstractSubscription subscription() { 633 return subscription; 634 } 635 636 @Override 637 public void onSubscribe(AbstractSubscription s) { 638 this.subscription = s; 639 s.request(1); 640 } 641 642 @Override 643 void reset() { 644 cf = null; 645 parser = null; 646 count = 0; 647 subscription = null; 648 } 649 650 // Revisit: do we need to support restarting? 651 @Override 652 final void start(Http1HeaderParser hp) { 653 count = 0; 654 cf = new MinimalFuture<>(); 655 parser = hp; 656 } 657 658 @Override 659 CompletableFuture<State> completion() { 660 return cf; 661 } 662 663 @Override 664 public final boolean tryAsyncReceive(ByteBuffer ref) { 665 boolean hasDemand = subscription.demand().tryDecrement(); 666 assert hasDemand; 667 boolean needsMore = accept(ref, parser, cf); 668 if (needsMore) subscription.request(1); 669 return needsMore; 670 } 671 672 @Override 673 public final void onReadError(Throwable t) { 674 t = wrapWithExtraDetail(t, parser::currentStateMessage); 675 Http1Response.this.onReadError(t); 676 } 677 678 @Override 679 final void handle(ByteBuffer b, 680 Http1HeaderParser parser, 681 CompletableFuture<State> cf) { 682 assert cf != null : "parsing not started"; 683 assert parser != null : "no parser"; 684 try { 685 count += b.remaining(); 686 if (debug.on()) 687 debug.log("Sending " + b.remaining() + "/" + b.capacity() 688 + " bytes to header parser"); 689 if (parser.parse(b)) { 690 count -= b.remaining(); 691 if (debug.on()) 692 debug.log("Parsing headers completed. bytes=" + count); 693 onComplete.accept(State.READING_HEADERS); 694 cf.complete(State.READING_HEADERS); 695 } 696 } catch (Throwable t) { 697 if (debug.on()) 698 debug.log("Header parser failed to handle buffer: " + t); 699 cf.completeExceptionally(t); 700 } 701 } 702 703 @Override 704 public void close(Throwable error) { 705 // if there's no error nothing to do: the cf should/will 706 // be completed. 707 if (error != null) { 708 CompletableFuture<State> cf = this.cf; 709 if (cf != null) { 710 if (debug.on()) 711 debug.log("close: completing header parser CF with " + error); 712 cf.completeExceptionally(error); 713 } 714 } 715 } 716 } 717 718 // Invoked with each new ByteBuffer when reading bodies... 719 final class BodyReader extends Receiver<BodyParser> { 720 final Consumer<State> onComplete; 721 volatile BodyParser parser; 722 volatile CompletableFuture<State> cf; 723 volatile AbstractSubscription subscription; 724 BodyReader(Consumer<State> onComplete) { 725 this.onComplete = onComplete; 726 } 727 728 @Override 729 void reset() { 730 parser = null; 731 cf = null; 732 subscription = null; 733 } 734 735 // Revisit: do we need to support restarting? 736 @Override 737 final void start(BodyParser parser) { 738 cf = new MinimalFuture<>(); 739 this.parser = parser; 740 } 741 742 @Override 743 CompletableFuture<State> completion() { 744 return cf; 745 } 746 747 @Override 748 public final boolean tryAsyncReceive(ByteBuffer b) { 749 return accept(b, parser, cf); 750 } 751 752 @Override 753 public final void onReadError(Throwable t) { 754 if (t instanceof EOFException && bodyParser != null && 755 bodyParser instanceof UnknownLengthBodyParser) { 756 ((UnknownLengthBodyParser)bodyParser).complete(); 757 return; 758 } 759 t = wrapWithExtraDetail(t, parser::currentStateMessage); 760 Http1Response.this.onReadError(t); 761 } 762 763 @Override 764 public AbstractSubscription subscription() { 765 return subscription; 766 } 767 768 @Override 769 public void onSubscribe(AbstractSubscription s) { 770 this.subscription = s; 771 try { 772 parser.onSubscribe(s); 773 } catch (Throwable t) { 774 cf.completeExceptionally(t); 775 throw t; 776 } 777 } 778 779 @Override 780 final void handle(ByteBuffer b, 781 BodyParser parser, 782 CompletableFuture<State> cf) { 783 assert cf != null : "parsing not started"; 784 assert parser != null : "no parser"; 785 try { 786 if (debug.on()) 787 debug.log("Sending " + b.remaining() + "/" + b.capacity() 788 + " bytes to body parser"); 789 parser.accept(b); 790 } catch (Throwable t) { 791 if (debug.on()) 792 debug.log("Body parser failed to handle buffer: " + t); 793 if (!cf.isDone()) { 794 cf.completeExceptionally(t); 795 } 796 } 797 } 798 799 final void onComplete(Throwable closedExceptionally) { 800 if (cf.isDone()) return; 801 if (closedExceptionally != null) { 802 cf.completeExceptionally(closedExceptionally); 803 } else { 804 onComplete.accept(State.READING_BODY); 805 cf.complete(State.READING_BODY); 806 } 807 } 808 809 @Override 810 public final void close(Throwable error) { 811 CompletableFuture<State> cf = this.cf; 812 if (cf != null && !cf.isDone()) { 813 // we want to make sure dependent actions are triggered 814 // in order to make sure the client reference count 815 // is decremented 816 if (error != null) { 817 if (debug.on()) 818 debug.log("close: completing body parser CF with " + error); 819 cf.completeExceptionally(error); 820 } else { 821 if (debug.on()) 822 debug.log("close: completing body parser CF"); 823 cf.complete(State.READING_BODY); 824 } 825 } 826 } 827 828 @Override 829 public String toString() { 830 return super.toString() + "/parser=" + String.valueOf(parser); 831 } 832 } 833 } 834