1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.concurrent.CompletableFuture; 33 import java.util.concurrent.CompletionStage; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.util.concurrent.atomic.AtomicLong; 38 import java.util.function.Consumer; 39 import java.util.function.Function; 40 import java.net.http.HttpHeaders; 41 import java.net.http.HttpResponse; 42 import jdk.internal.net.http.ResponseContent.BodyParser; 43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser; 44 import jdk.internal.net.http.common.Log; 45 import jdk.internal.net.http.common.Logger; 46 import jdk.internal.net.http.common.MinimalFuture; 47 import jdk.internal.net.http.common.Utils; 48 import static java.net.http.HttpClient.Version.HTTP_1_1; 49 import static java.net.http.HttpResponse.BodySubscribers.discarding; 50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 51 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED; 52 53 /** 54 * Handles a HTTP/1.1 response (headers + body). 55 * There can be more than one of these per Http exchange. 56 */ 57 class Http1Response<T> { 58 59 private volatile ResponseContent content; 60 private final HttpRequestImpl request; 61 private Response response; 62 private final HttpConnection connection; 63 private HttpHeaders headers; 64 private int responseCode; 65 private final Http1Exchange<T> exchange; 66 private boolean return2Cache; // return connection to cache when finished 67 private final HeadersReader headersReader; // used to read the headers 68 private final BodyReader bodyReader; // used to read the body 69 private final Http1AsyncReceiver asyncReceiver; 70 private volatile EOFException eof; 71 private volatile BodyParser bodyParser; 72 // max number of bytes of (fixed length) body to ignore on redirect 73 private final static int MAX_IGNORE = 1024; 74 75 // Revisit: can we get rid of this? 76 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} 77 private volatile State readProgress = State.INITIAL; 78 79 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 80 final static AtomicLong responseCount = new AtomicLong(); 81 final long id = responseCount.incrementAndGet(); 82 private Http1HeaderParser hd; 83 Http1Response(HttpConnection conn, Http1Exchange<T> exchange, Http1AsyncReceiver asyncReceiver)84 Http1Response(HttpConnection conn, 85 Http1Exchange<T> exchange, 86 Http1AsyncReceiver asyncReceiver) { 87 this.readProgress = State.INITIAL; 88 this.request = exchange.request(); 89 this.exchange = exchange; 90 this.connection = conn; 91 this.asyncReceiver = asyncReceiver; 92 headersReader = new HeadersReader(this::advance); 93 bodyReader = new BodyReader(this::advance); 94 95 hd = new Http1HeaderParser(); 96 readProgress = State.READING_HEADERS; 97 headersReader.start(hd); 98 asyncReceiver.subscribe(headersReader); 99 } 100 101 String dbgTag; dbgString()102 private String dbgString() { 103 String dbg = dbgTag; 104 if (dbg == null) { 105 String cdbg = connection.dbgTag; 106 if (cdbg != null) { 107 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; 108 } else { 109 dbg = "Http1Response(id=" + id + ")"; 110 } 111 } 112 return dbg; 113 } 114 115 // The ClientRefCountTracker is used to track the state 116 // of a pending operation. Altough there usually is a single 117 // point where the operation starts, it may terminate at 118 // different places. 119 private final class ClientRefCountTracker { 120 final HttpClientImpl client = connection.client(); 121 // state & 0x01 != 0 => acquire called 122 // state & 0x02 != 0 => tryRelease called 123 byte state; 124 acquire()125 public synchronized void acquire() { 126 if (state == 0) { 127 // increment the reference count on the HttpClientImpl 128 // to prevent the SelectorManager thread from exiting 129 // until our operation is complete. 130 if (debug.on()) 131 debug.log("Operation started: incrementing ref count for %s", client); 132 client.reference(); 133 state = 0x01; 134 } else { 135 if (debug.on()) 136 debug.log("Operation ref count for %s is already %s", 137 client, ((state & 0x2) == 0x2) ? "released." : "incremented!" ); 138 assert (state & 0x01) == 0 : "reference count already incremented"; 139 } 140 } 141 tryRelease()142 public synchronized void tryRelease() { 143 if (state == 0x01) { 144 // decrement the reference count on the HttpClientImpl 145 // to allow the SelectorManager thread to exit if no 146 // other operation is pending and the facade is no 147 // longer referenced. 148 if (debug.on()) 149 debug.log("Operation finished: decrementing ref count for %s", client); 150 client.unreference(); 151 } else if (state == 0) { 152 if (debug.on()) 153 debug.log("Operation finished: releasing ref count for %s", client); 154 } else if ((state & 0x02) == 0x02) { 155 if (debug.on()) 156 debug.log("ref count for %s already released", client); 157 } 158 state |= 0x02; 159 } 160 } 161 162 private volatile boolean firstTimeAround = true; 163 readHeadersAsync(Executor executor)164 public CompletableFuture<Response> readHeadersAsync(Executor executor) { 165 if (debug.on()) 166 debug.log("Reading Headers: (remaining: " 167 + asyncReceiver.remaining() +") " + readProgress); 168 169 if (firstTimeAround) { 170 if (debug.on()) debug.log("First time around"); 171 firstTimeAround = false; 172 } else { 173 // with expect continue we will resume reading headers + body. 174 asyncReceiver.unsubscribe(bodyReader); 175 bodyReader.reset(); 176 177 hd = new Http1HeaderParser(); 178 readProgress = State.READING_HEADERS; 179 headersReader.reset(); 180 headersReader.start(hd); 181 asyncReceiver.subscribe(headersReader); 182 } 183 184 CompletableFuture<State> cf = headersReader.completion(); 185 assert cf != null : "parsing not started"; 186 if (debug.on()) { 187 debug.log("headersReader is %s", 188 cf == null ? "not yet started" 189 : cf.isDone() ? "already completed" 190 : "not yet completed"); 191 } 192 193 Function<State, Response> lambda = (State completed) -> { 194 assert completed == State.READING_HEADERS; 195 if (debug.on()) 196 debug.log("Reading Headers: creating Response object;" 197 + " state is now " + readProgress); 198 asyncReceiver.unsubscribe(headersReader); 199 responseCode = hd.responseCode(); 200 headers = hd.headers(); 201 202 response = new Response(request, 203 exchange.getExchange(), 204 headers, 205 connection, 206 responseCode, 207 HTTP_1_1); 208 209 if (Log.headers()) { 210 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 211 Log.dumpHeaders(sb, " ", headers); 212 Log.logHeaders(sb.toString()); 213 } 214 215 return response; 216 }; 217 218 if (executor != null) { 219 return cf.thenApplyAsync(lambda, executor); 220 } else { 221 return cf.thenApply(lambda); 222 } 223 } 224 225 private boolean finished; 226 completed()227 synchronized void completed() { 228 finished = true; 229 } 230 finished()231 synchronized boolean finished() { 232 return finished; 233 } 234 235 /** 236 * Return known fixed content length or -1 if chunked, or -2 if no content-length 237 * information in which case, connection termination delimits the response body 238 */ fixupContentLen(long clen)239 long fixupContentLen(long clen) { 240 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { 241 return 0L; 242 } 243 if (clen == -1L) { 244 if (headers.firstValue("Transfer-encoding").orElse("") 245 .equalsIgnoreCase("chunked")) { 246 return -1L; 247 } 248 if (responseCode == 101) { 249 // this is a h2c or websocket upgrade, contentlength must be zero 250 return 0L; 251 } 252 return -2L; 253 } 254 return clen; 255 } 256 257 /** 258 * Read up to MAX_IGNORE bytes discarding 259 */ ignoreBody(Executor executor)260 public CompletableFuture<Void> ignoreBody(Executor executor) { 261 int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 262 if (clen == -1 || clen > MAX_IGNORE) { 263 connection.close(); 264 return MinimalFuture.completedFuture(null); // not treating as error 265 } else { 266 return readBody(discarding(), true, executor); 267 } 268 } 269 270 static final Flow.Subscription NOP = new Flow.Subscription() { 271 @Override 272 public void request(long n) { } 273 public void cancel() { } 274 }; 275 276 /** 277 * The Http1AsyncReceiver ensures that all calls to 278 * the subscriber, including onSubscribe, occur sequentially. 279 * There could however be some race conditions that could happen 280 * in case of unexpected errors thrown at unexpected places, which 281 * may cause onError to be called multiple times. 282 * The Http1BodySubscriber will ensure that the user subscriber 283 * is actually completed only once - and only after it is 284 * subscribed. 285 * @param <U> The type of response. 286 */ 287 final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> { 288 final HttpResponse.BodySubscriber<U> userSubscriber; 289 final AtomicBoolean completed = new AtomicBoolean(); 290 volatile Throwable withError; 291 volatile boolean subscribed; Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber)292 Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) { 293 this.userSubscriber = userSubscriber; 294 } 295 296 // propagate the error to the user subscriber, even if not 297 // subscribed yet. propagateError(Throwable t)298 private void propagateError(Throwable t) { 299 assert t != null; 300 try { 301 // if unsubscribed at this point, it will not 302 // get subscribed later - so do it now and 303 // propagate the error 304 if (subscribed == false) { 305 subscribed = true; 306 userSubscriber.onSubscribe(NOP); 307 } 308 } finally { 309 // if onError throws then there is nothing to do 310 // here: let the caller deal with it by logging 311 // and closing the connection. 312 userSubscriber.onError(t); 313 } 314 } 315 316 // complete the subscriber, either normally or exceptionally 317 // ensure that the subscriber is completed only once. complete(Throwable t)318 private void complete(Throwable t) { 319 if (completed.compareAndSet(false, true)) { 320 t = withError = Utils.getCompletionCause(t); 321 if (t == null) { 322 assert subscribed; 323 try { 324 userSubscriber.onComplete(); 325 } catch (Throwable x) { 326 // Simply propagate the error by calling 327 // onError on the user subscriber, and let the 328 // connection be reused since we should have received 329 // and parsed all the bytes when we reach here. 330 // If onError throws in turn, then we will simply 331 // let that new exception flow up to the caller 332 // and let it deal with it. 333 // (i.e: log and close the connection) 334 // Note that rethrowing here could introduce a 335 // race that might cause the next send() operation to 336 // fail as the connection has already been put back 337 // into the cache when we reach here. 338 propagateError(t = withError = Utils.getCompletionCause(x)); 339 } 340 } else { 341 propagateError(t); 342 } 343 } 344 } 345 346 @Override getBody()347 public CompletionStage<U> getBody() { 348 return userSubscriber.getBody(); 349 } 350 @Override onSubscribe(Flow.Subscription subscription)351 public void onSubscribe(Flow.Subscription subscription) { 352 if (!subscribed) { 353 subscribed = true; 354 userSubscriber.onSubscribe(subscription); 355 } else { 356 // could be already subscribed and completed 357 // if an unexpected error occurred before the actual 358 // subscription - though that's not supposed 359 // happen. 360 assert completed.get(); 361 } 362 } 363 @Override onNext(List<ByteBuffer> item)364 public void onNext(List<ByteBuffer> item) { 365 assert !completed.get(); 366 userSubscriber.onNext(item); 367 } 368 @Override onError(Throwable throwable)369 public void onError(Throwable throwable) { 370 complete(throwable); 371 } 372 @Override onComplete()373 public void onComplete() { 374 complete(null); 375 } 376 } 377 readBody(HttpResponse.BodySubscriber<U> p, boolean return2Cache, Executor executor)378 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, 379 boolean return2Cache, 380 Executor executor) { 381 this.return2Cache = return2Cache; 382 final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p); 383 384 final CompletableFuture<U> cf = new MinimalFuture<>(); 385 386 long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L); 387 final long clen = fixupContentLen(clen0); 388 389 // expect-continue reads headers and body twice. 390 // if we reach here, we must reset the headersReader state. 391 asyncReceiver.unsubscribe(headersReader); 392 headersReader.reset(); 393 ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); 394 395 // We need to keep hold on the client facade until the 396 // tracker has been incremented. 397 connection.client().reference(); 398 executor.execute(() -> { 399 try { 400 content = new ResponseContent( 401 connection, clen, headers, subscriber, 402 this::onFinished 403 ); 404 if (cf.isCompletedExceptionally()) { 405 // if an error occurs during subscription 406 connection.close(); 407 return; 408 } 409 // increment the reference count on the HttpClientImpl 410 // to prevent the SelectorManager thread from exiting until 411 // the body is fully read. 412 refCountTracker.acquire(); 413 bodyParser = content.getBodyParser( 414 (t) -> { 415 try { 416 if (t != null) { 417 try { 418 subscriber.onError(t); 419 } finally { 420 cf.completeExceptionally(t); 421 } 422 } 423 } finally { 424 bodyReader.onComplete(t); 425 if (t != null) { 426 connection.close(); 427 } 428 } 429 }); 430 bodyReader.start(bodyParser); 431 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); 432 asyncReceiver.subscribe(bodyReader); 433 assert bodyReaderCF != null : "parsing not started"; 434 // Make sure to keep a reference to asyncReceiver from 435 // within this 436 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { 437 t = Utils.getCompletionCause(t); 438 try { 439 if (t == null) { 440 if (debug.on()) debug.log("Finished reading body: " + s); 441 assert s == State.READING_BODY; 442 } 443 if (t != null) { 444 subscriber.onError(t); 445 cf.completeExceptionally(t); 446 } 447 } catch (Throwable x) { 448 // not supposed to happen 449 asyncReceiver.onReadError(x); 450 } finally { 451 // we're done: release the ref count for 452 // the current operation. 453 refCountTracker.tryRelease(); 454 } 455 }); 456 connection.addTrailingOperation(trailingOp); 457 } catch (Throwable t) { 458 if (debug.on()) debug.log("Failed reading body: " + t); 459 try { 460 subscriber.onError(t); 461 cf.completeExceptionally(t); 462 } finally { 463 asyncReceiver.onReadError(t); 464 } 465 } finally { 466 connection.client().unreference(); 467 } 468 }); 469 try { 470 p.getBody().whenComplete((U u, Throwable t) -> { 471 if (t == null) 472 cf.complete(u); 473 else 474 cf.completeExceptionally(t); 475 }); 476 } catch (Throwable t) { 477 cf.completeExceptionally(t); 478 asyncReceiver.setRetryOnError(false); 479 asyncReceiver.onReadError(t); 480 } 481 482 return cf.whenComplete((s,t) -> { 483 if (t != null) { 484 // If an exception occurred, release the 485 // ref count for the current operation, as 486 // it may never be triggered otherwise 487 // (BodySubscriber ofInputStream) 488 // If there was no exception then the 489 // ref count will be/have been released when 490 // the last byte of the response is/was received 491 refCountTracker.tryRelease(); 492 } 493 }); 494 } 495 496 497 private void onFinished() { 498 asyncReceiver.clear(); 499 if (return2Cache) { 500 Log.logTrace("Attempting to return connection to the pool: {0}", connection); 501 // TODO: need to do something here? 502 // connection.setAsyncCallbacks(null, null, null); 503 504 // don't return the connection to the cache if EOF happened. 505 if (debug.on()) 506 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool"); 507 connection.closeOrReturnToCache(eof == null ? headers : null); 508 } 509 } 510 511 HttpHeaders responseHeaders() { 512 return headers; 513 } 514 515 int responseCode() { 516 return responseCode; 517 } 518 519 // ================ Support for plugging into Http1Receiver ================= 520 // ============================================================================ 521 522 // Callback: Error receiver: Consumer of Throwable. 523 void onReadError(Throwable t) { 524 Log.logError(t); 525 Receiver<?> receiver = receiver(readProgress); 526 if (t instanceof EOFException) { 527 debug.log(Level.DEBUG, "onReadError: received EOF"); 528 eof = (EOFException) t; 529 } 530 CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); 531 debug.log(Level.DEBUG, () -> "onReadError: cf is " 532 + (cf == null ? "null" 533 : (cf.isDone() ? "already completed" 534 : "not yet completed"))); 535 if (cf != null) { 536 cf.completeExceptionally(t); 537 } else { 538 debug.log(Level.DEBUG, "onReadError", t); 539 } 540 debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); 541 connection.close(); 542 } 543 544 // ======================================================================== 545 546 private State advance(State previous) { 547 assert readProgress == previous; 548 switch(previous) { 549 case READING_HEADERS: 550 asyncReceiver.unsubscribe(headersReader); 551 return readProgress = State.READING_BODY; 552 case READING_BODY: 553 asyncReceiver.unsubscribe(bodyReader); 554 return readProgress = State.DONE; 555 default: 556 throw new InternalError("can't advance from " + previous); 557 } 558 } 559 560 Receiver<?> receiver(State state) { 561 switch(state) { 562 case READING_HEADERS: return headersReader; 563 case READING_BODY: return bodyReader; 564 default: return null; 565 } 566 567 } 568 569 static abstract class Receiver<T> 570 implements Http1AsyncReceiver.Http1AsyncDelegate { 571 abstract void start(T parser); 572 abstract CompletableFuture<State> completion(); 573 // accepts a buffer from upstream. 574 // this should be implemented as a simple call to 575 // accept(ref, parser, cf) 576 public abstract boolean tryAsyncReceive(ByteBuffer buffer); 577 public abstract void onReadError(Throwable t); 578 // handle a byte buffer received from upstream. 579 // this method should set the value of Http1Response.buffer 580 // to ref.get() before beginning parsing. 581 abstract void handle(ByteBuffer buf, T parser, 582 CompletableFuture<State> cf); 583 // resets this objects state so that it can be reused later on 584 // typically puts the reference to parser and completion to null 585 abstract void reset(); 586 587 // accepts a byte buffer received from upstream 588 // returns true if the buffer is fully parsed and more data can 589 // be accepted, false otherwise. 590 final boolean accept(ByteBuffer buf, T parser, 591 CompletableFuture<State> cf) { 592 if (cf == null || parser == null || cf.isDone()) return false; 593 handle(buf, parser, cf); 594 return !cf.isDone(); 595 } 596 public abstract void onSubscribe(AbstractSubscription s); 597 public abstract AbstractSubscription subscription(); 598 599 } 600 601 // Invoked with each new ByteBuffer when reading headers... 602 final class HeadersReader extends Receiver<Http1HeaderParser> { 603 final Consumer<State> onComplete; 604 volatile Http1HeaderParser parser; 605 volatile CompletableFuture<State> cf; 606 volatile long count; // bytes parsed (for debug) 607 volatile AbstractSubscription subscription; 608 609 HeadersReader(Consumer<State> onComplete) { 610 this.onComplete = onComplete; 611 } 612 613 @Override 614 public AbstractSubscription subscription() { 615 return subscription; 616 } 617 618 @Override 619 public void onSubscribe(AbstractSubscription s) { 620 this.subscription = s; 621 s.request(1); 622 } 623 624 @Override 625 void reset() { 626 cf = null; 627 parser = null; 628 count = 0; 629 subscription = null; 630 } 631 632 // Revisit: do we need to support restarting? 633 @Override 634 final void start(Http1HeaderParser hp) { 635 count = 0; 636 cf = new MinimalFuture<>(); 637 parser = hp; 638 } 639 640 @Override 641 CompletableFuture<State> completion() { 642 return cf; 643 } 644 645 @Override 646 public final boolean tryAsyncReceive(ByteBuffer ref) { 647 boolean hasDemand = subscription.demand().tryDecrement(); 648 assert hasDemand; 649 boolean needsMore = accept(ref, parser, cf); 650 if (needsMore) subscription.request(1); 651 return needsMore; 652 } 653 654 @Override 655 public final void onReadError(Throwable t) { 656 t = wrapWithExtraDetail(t, parser::currentStateMessage); 657 Http1Response.this.onReadError(t); 658 } 659 660 @Override 661 final void handle(ByteBuffer b, 662 Http1HeaderParser parser, 663 CompletableFuture<State> cf) { 664 assert cf != null : "parsing not started"; 665 assert parser != null : "no parser"; 666 try { 667 count += b.remaining(); 668 if (debug.on()) 669 debug.log("Sending " + b.remaining() + "/" + b.capacity() 670 + " bytes to header parser"); 671 if (parser.parse(b)) { 672 count -= b.remaining(); 673 if (debug.on()) 674 debug.log("Parsing headers completed. bytes=" + count); 675 onComplete.accept(State.READING_HEADERS); 676 cf.complete(State.READING_HEADERS); 677 } 678 } catch (Throwable t) { 679 if (debug.on()) 680 debug.log("Header parser failed to handle buffer: " + t); 681 cf.completeExceptionally(t); 682 } 683 } 684 685 @Override 686 public void close(Throwable error) { 687 // if there's no error nothing to do: the cf should/will 688 // be completed. 689 if (error != null) { 690 CompletableFuture<State> cf = this.cf; 691 if (cf != null) { 692 if (debug.on()) 693 debug.log("close: completing header parser CF with " + error); 694 cf.completeExceptionally(error); 695 } 696 } 697 } 698 } 699 700 // Invoked with each new ByteBuffer when reading bodies... 701 final class BodyReader extends Receiver<BodyParser> { 702 final Consumer<State> onComplete; 703 volatile BodyParser parser; 704 volatile CompletableFuture<State> cf; 705 volatile AbstractSubscription subscription; 706 BodyReader(Consumer<State> onComplete) { 707 this.onComplete = onComplete; 708 } 709 710 @Override 711 void reset() { 712 parser = null; 713 cf = null; 714 subscription = null; 715 } 716 717 // Revisit: do we need to support restarting? 718 @Override 719 final void start(BodyParser parser) { 720 cf = new MinimalFuture<>(); 721 this.parser = parser; 722 } 723 724 @Override 725 CompletableFuture<State> completion() { 726 return cf; 727 } 728 729 @Override 730 public final boolean tryAsyncReceive(ByteBuffer b) { 731 return accept(b, parser, cf); 732 } 733 734 @Override 735 public final void onReadError(Throwable t) { 736 if (t instanceof EOFException && bodyParser != null && 737 bodyParser instanceof UnknownLengthBodyParser) { 738 ((UnknownLengthBodyParser)bodyParser).complete(); 739 return; 740 } 741 t = wrapWithExtraDetail(t, parser::currentStateMessage); 742 Http1Response.this.onReadError(t); 743 } 744 745 @Override 746 public AbstractSubscription subscription() { 747 return subscription; 748 } 749 750 @Override 751 public void onSubscribe(AbstractSubscription s) { 752 this.subscription = s; 753 try { 754 parser.onSubscribe(s); 755 } catch (Throwable t) { 756 cf.completeExceptionally(t); 757 throw t; 758 } 759 } 760 761 @Override 762 final void handle(ByteBuffer b, 763 BodyParser parser, 764 CompletableFuture<State> cf) { 765 assert cf != null : "parsing not started"; 766 assert parser != null : "no parser"; 767 try { 768 if (debug.on()) 769 debug.log("Sending " + b.remaining() + "/" + b.capacity() 770 + " bytes to body parser"); 771 parser.accept(b); 772 } catch (Throwable t) { 773 if (debug.on()) 774 debug.log("Body parser failed to handle buffer: " + t); 775 if (!cf.isDone()) { 776 cf.completeExceptionally(t); 777 } 778 } 779 } 780 781 final void onComplete(Throwable closedExceptionally) { 782 if (cf.isDone()) return; 783 if (closedExceptionally != null) { 784 cf.completeExceptionally(closedExceptionally); 785 } else { 786 onComplete.accept(State.READING_BODY); 787 cf.complete(State.READING_BODY); 788 } 789 } 790 791 @Override 792 public final void close(Throwable error) { 793 CompletableFuture<State> cf = this.cf; 794 if (cf != null && !cf.isDone()) { 795 // we want to make sure dependent actions are triggered 796 // in order to make sure the client reference count 797 // is decremented 798 if (error != null) { 799 if (debug.on()) 800 debug.log("close: completing body parser CF with " + error); 801 cf.completeExceptionally(error); 802 } else { 803 if (debug.on()) 804 debug.log("close: completing body parser CF"); 805 cf.complete(State.READING_BODY); 806 } 807 } 808 } 809 810 @Override 811 public String toString() { 812 return super.toString() + "/parser=" + String.valueOf(parser); 813 } 814 } 815 } 816