1 /* 2 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.EOFException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.concurrent.CompletableFuture; 33 import java.util.concurrent.CompletionStage; 34 import java.util.concurrent.Executor; 35 import java.util.concurrent.Flow; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.util.concurrent.atomic.AtomicLong; 38 import java.util.function.Consumer; 39 import java.util.function.Function; 40 import java.net.http.HttpHeaders; 41 import java.net.http.HttpResponse; 42 import jdk.internal.net.http.ResponseContent.BodyParser; 43 import jdk.internal.net.http.ResponseContent.UnknownLengthBodyParser; 44 import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber; 45 import jdk.internal.net.http.common.Log; 46 import jdk.internal.net.http.common.Logger; 47 import jdk.internal.net.http.common.MinimalFuture; 48 import jdk.internal.net.http.common.Utils; 49 import static java.net.http.HttpClient.Version.HTTP_1_1; 50 import static java.net.http.HttpResponse.BodySubscribers.discarding; 51 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 52 import static jdk.internal.net.http.RedirectFilter.HTTP_NOT_MODIFIED; 53 54 /** 55 * Handles a HTTP/1.1 response (headers + body). 56 * There can be more than one of these per Http exchange. 57 */ 58 class Http1Response<T> { 59 60 private volatile ResponseContent content; 61 private final HttpRequestImpl request; 62 private Response response; 63 private final HttpConnection connection; 64 private HttpHeaders headers; 65 private int responseCode; 66 private final Http1Exchange<T> exchange; 67 private boolean return2Cache; // return connection to cache when finished 68 private final HeadersReader headersReader; // used to read the headers 69 private final BodyReader bodyReader; // used to read the body 70 private final Http1AsyncReceiver asyncReceiver; 71 private volatile EOFException eof; 72 private volatile BodyParser bodyParser; 73 // max number of bytes of (fixed length) body to ignore on redirect 74 private final static int MAX_IGNORE = 1024; 75 76 // Revisit: can we get rid of this? 77 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} 78 private volatile State readProgress = State.INITIAL; 79 80 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 81 final static AtomicLong responseCount = new AtomicLong(); 82 final long id = responseCount.incrementAndGet(); 83 private Http1HeaderParser hd; 84 Http1Response(HttpConnection conn, Http1Exchange<T> exchange, Http1AsyncReceiver asyncReceiver)85 Http1Response(HttpConnection conn, 86 Http1Exchange<T> exchange, 87 Http1AsyncReceiver asyncReceiver) { 88 this.readProgress = State.INITIAL; 89 this.request = exchange.request(); 90 this.exchange = exchange; 91 this.connection = conn; 92 this.asyncReceiver = asyncReceiver; 93 headersReader = new HeadersReader(this::advance); 94 bodyReader = new BodyReader(this::advance); 95 96 hd = new Http1HeaderParser(); 97 readProgress = State.READING_HEADERS; 98 headersReader.start(hd); 99 asyncReceiver.subscribe(headersReader); 100 } 101 102 String dbgTag; dbgString()103 private String dbgString() { 104 String dbg = dbgTag; 105 if (dbg == null) { 106 String cdbg = connection.dbgTag; 107 if (cdbg != null) { 108 dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; 109 } else { 110 dbg = "Http1Response(id=" + id + ")"; 111 } 112 } 113 return dbg; 114 } 115 116 // The ClientRefCountTracker is used to track the state 117 // of a pending operation. Altough there usually is a single 118 // point where the operation starts, it may terminate at 119 // different places. 120 private final class ClientRefCountTracker { 121 final HttpClientImpl client = connection.client(); 122 // state & 0x01 != 0 => acquire called 123 // state & 0x02 != 0 => tryRelease called 124 byte state; 125 acquire()126 public synchronized void acquire() { 127 if (state == 0) { 128 // increment the reference count on the HttpClientImpl 129 // to prevent the SelectorManager thread from exiting 130 // until our operation is complete. 131 if (debug.on()) 132 debug.log("Operation started: incrementing ref count for %s", client); 133 client.reference(); 134 state = 0x01; 135 } else { 136 if (debug.on()) 137 debug.log("Operation ref count for %s is already %s", 138 client, ((state & 0x2) == 0x2) ? "released." : "incremented!" ); 139 assert (state & 0x01) == 0 : "reference count already incremented"; 140 } 141 } 142 tryRelease()143 public synchronized void tryRelease() { 144 if (state == 0x01) { 145 // decrement the reference count on the HttpClientImpl 146 // to allow the SelectorManager thread to exit if no 147 // other operation is pending and the facade is no 148 // longer referenced. 149 if (debug.on()) 150 debug.log("Operation finished: decrementing ref count for %s", client); 151 client.unreference(); 152 } else if (state == 0) { 153 if (debug.on()) 154 debug.log("Operation finished: releasing ref count for %s", client); 155 } else if ((state & 0x02) == 0x02) { 156 if (debug.on()) 157 debug.log("ref count for %s already released", client); 158 } 159 state |= 0x02; 160 } 161 } 162 163 private volatile boolean firstTimeAround = true; 164 readHeadersAsync(Executor executor)165 public CompletableFuture<Response> readHeadersAsync(Executor executor) { 166 if (debug.on()) 167 debug.log("Reading Headers: (remaining: " 168 + asyncReceiver.remaining() +") " + readProgress); 169 170 if (firstTimeAround) { 171 if (debug.on()) debug.log("First time around"); 172 firstTimeAround = false; 173 } else { 174 // with expect continue we will resume reading headers + body. 175 asyncReceiver.unsubscribe(bodyReader); 176 bodyReader.reset(); 177 178 hd = new Http1HeaderParser(); 179 readProgress = State.READING_HEADERS; 180 headersReader.reset(); 181 headersReader.start(hd); 182 asyncReceiver.subscribe(headersReader); 183 } 184 185 CompletableFuture<State> cf = headersReader.completion(); 186 assert cf != null : "parsing not started"; 187 if (debug.on()) { 188 debug.log("headersReader is %s", 189 cf == null ? "not yet started" 190 : cf.isDone() ? "already completed" 191 : "not yet completed"); 192 } 193 194 Function<State, Response> lambda = (State completed) -> { 195 assert completed == State.READING_HEADERS; 196 if (debug.on()) 197 debug.log("Reading Headers: creating Response object;" 198 + " state is now " + readProgress); 199 asyncReceiver.unsubscribe(headersReader); 200 responseCode = hd.responseCode(); 201 headers = hd.headers(); 202 203 response = new Response(request, 204 exchange.getExchange(), 205 headers, 206 connection, 207 responseCode, 208 HTTP_1_1); 209 210 if (Log.headers()) { 211 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 212 Log.dumpHeaders(sb, " ", headers); 213 Log.logHeaders(sb.toString()); 214 } 215 216 return response; 217 }; 218 219 if (executor != null) { 220 return cf.thenApplyAsync(lambda, executor); 221 } else { 222 return cf.thenApply(lambda); 223 } 224 } 225 226 private boolean finished; 227 completed()228 synchronized void completed() { 229 finished = true; 230 } 231 finished()232 synchronized boolean finished() { 233 return finished; 234 } 235 236 /** 237 * Return known fixed content length or -1 if chunked, or -2 if no content-length 238 * information in which case, connection termination delimits the response body 239 */ fixupContentLen(long clen)240 long fixupContentLen(long clen) { 241 if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { 242 return 0L; 243 } 244 if (clen == -1L) { 245 if (headers.firstValue("Transfer-encoding").orElse("") 246 .equalsIgnoreCase("chunked")) { 247 return -1L; 248 } 249 if (responseCode == 101) { 250 // this is a h2c or websocket upgrade, contentlength must be zero 251 return 0L; 252 } 253 return -2L; 254 } 255 return clen; 256 } 257 258 /** 259 * Read up to MAX_IGNORE bytes discarding 260 */ ignoreBody(Executor executor)261 public CompletableFuture<Void> ignoreBody(Executor executor) { 262 int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 263 if (clen == -1 || clen > MAX_IGNORE) { 264 connection.close(); 265 return MinimalFuture.completedFuture(null); // not treating as error 266 } else { 267 return readBody(discarding(), true, executor); 268 } 269 } 270 271 // Used for those response codes that have no body associated nullBody(HttpResponse<T> resp, Throwable t)272 public void nullBody(HttpResponse<T> resp, Throwable t) { 273 if (t != null) connection.close(); 274 else { 275 return2Cache = !request.isWebSocket(); 276 onFinished(); 277 } 278 } 279 280 static final Flow.Subscription NOP = new Flow.Subscription() { 281 @Override 282 public void request(long n) { } 283 public void cancel() { } 284 }; 285 286 /** 287 * The Http1AsyncReceiver ensures that all calls to 288 * the subscriber, including onSubscribe, occur sequentially. 289 * There could however be some race conditions that could happen 290 * in case of unexpected errors thrown at unexpected places, which 291 * may cause onError to be called multiple times. 292 * The Http1BodySubscriber will ensure that the user subscriber 293 * is actually completed only once - and only after it is 294 * subscribed. 295 * @param <U> The type of response. 296 */ 297 final static class Http1BodySubscriber<U> implements TrustedSubscriber<U> { 298 final HttpResponse.BodySubscriber<U> userSubscriber; 299 final AtomicBoolean completed = new AtomicBoolean(); 300 volatile Throwable withError; 301 volatile boolean subscribed; Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber)302 Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) { 303 this.userSubscriber = userSubscriber; 304 } 305 306 @Override needsExecutor()307 public boolean needsExecutor() { 308 return TrustedSubscriber.needsExecutor(userSubscriber); 309 } 310 311 // propagate the error to the user subscriber, even if not 312 // subscribed yet. propagateError(Throwable t)313 private void propagateError(Throwable t) { 314 assert t != null; 315 try { 316 // if unsubscribed at this point, it will not 317 // get subscribed later - so do it now and 318 // propagate the error 319 if (subscribed == false) { 320 subscribed = true; 321 userSubscriber.onSubscribe(NOP); 322 } 323 } finally { 324 // if onError throws then there is nothing to do 325 // here: let the caller deal with it by logging 326 // and closing the connection. 327 userSubscriber.onError(t); 328 } 329 } 330 331 // complete the subscriber, either normally or exceptionally 332 // ensure that the subscriber is completed only once. complete(Throwable t)333 private void complete(Throwable t) { 334 if (completed.compareAndSet(false, true)) { 335 t = withError = Utils.getCompletionCause(t); 336 if (t == null) { 337 assert subscribed; 338 try { 339 userSubscriber.onComplete(); 340 } catch (Throwable x) { 341 // Simply propagate the error by calling 342 // onError on the user subscriber, and let the 343 // connection be reused since we should have received 344 // and parsed all the bytes when we reach here. 345 // If onError throws in turn, then we will simply 346 // let that new exception flow up to the caller 347 // and let it deal with it. 348 // (i.e: log and close the connection) 349 // Note that rethrowing here could introduce a 350 // race that might cause the next send() operation to 351 // fail as the connection has already been put back 352 // into the cache when we reach here. 353 propagateError(t = withError = Utils.getCompletionCause(x)); 354 } 355 } else { 356 propagateError(t); 357 } 358 } 359 } 360 361 @Override getBody()362 public CompletionStage<U> getBody() { 363 return userSubscriber.getBody(); 364 } 365 366 @Override onSubscribe(Flow.Subscription subscription)367 public void onSubscribe(Flow.Subscription subscription) { 368 if (!subscribed) { 369 subscribed = true; 370 userSubscriber.onSubscribe(subscription); 371 } else { 372 // could be already subscribed and completed 373 // if an unexpected error occurred before the actual 374 // subscription - though that's not supposed 375 // happen. 376 assert completed.get(); 377 } 378 } 379 @Override onNext(List<ByteBuffer> item)380 public void onNext(List<ByteBuffer> item) { 381 assert !completed.get(); 382 userSubscriber.onNext(item); 383 } 384 @Override onError(Throwable throwable)385 public void onError(Throwable throwable) { 386 complete(throwable); 387 } 388 @Override onComplete()389 public void onComplete() { 390 complete(null); 391 } 392 } 393 readBody(HttpResponse.BodySubscriber<U> p, boolean return2Cache, Executor executor)394 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, 395 boolean return2Cache, 396 Executor executor) { 397 this.return2Cache = return2Cache; 398 final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p); 399 400 final CompletableFuture<U> cf = new MinimalFuture<>(); 401 402 long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L); 403 final long clen = fixupContentLen(clen0); 404 405 // expect-continue reads headers and body twice. 406 // if we reach here, we must reset the headersReader state. 407 asyncReceiver.unsubscribe(headersReader); 408 headersReader.reset(); 409 ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); 410 411 // We need to keep hold on the client facade until the 412 // tracker has been incremented. 413 connection.client().reference(); 414 executor.execute(() -> { 415 try { 416 content = new ResponseContent( 417 connection, clen, headers, subscriber, 418 this::onFinished 419 ); 420 if (cf.isCompletedExceptionally()) { 421 // if an error occurs during subscription 422 connection.close(); 423 return; 424 } 425 // increment the reference count on the HttpClientImpl 426 // to prevent the SelectorManager thread from exiting until 427 // the body is fully read. 428 refCountTracker.acquire(); 429 bodyParser = content.getBodyParser( 430 (t) -> { 431 try { 432 if (t != null) { 433 try { 434 subscriber.onError(t); 435 } finally { 436 cf.completeExceptionally(t); 437 } 438 } 439 } finally { 440 bodyReader.onComplete(t); 441 if (t != null) { 442 connection.close(); 443 } 444 } 445 }); 446 bodyReader.start(bodyParser); 447 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); 448 asyncReceiver.subscribe(bodyReader); 449 assert bodyReaderCF != null : "parsing not started"; 450 // Make sure to keep a reference to asyncReceiver from 451 // within this 452 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { 453 t = Utils.getCompletionCause(t); 454 try { 455 if (t == null) { 456 if (debug.on()) debug.log("Finished reading body: " + s); 457 assert s == State.READING_BODY; 458 } 459 if (t != null) { 460 subscriber.onError(t); 461 cf.completeExceptionally(t); 462 } 463 } catch (Throwable x) { 464 // not supposed to happen 465 asyncReceiver.onReadError(x); 466 } finally { 467 // we're done: release the ref count for 468 // the current operation. 469 refCountTracker.tryRelease(); 470 } 471 }); 472 connection.addTrailingOperation(trailingOp); 473 } catch (Throwable t) { 474 if (debug.on()) debug.log("Failed reading body: " + t); 475 try { 476 subscriber.onError(t); 477 cf.completeExceptionally(t); 478 } finally { 479 asyncReceiver.onReadError(t); 480 } 481 } finally { 482 connection.client().unreference(); 483 } 484 }); 485 486 ResponseSubscribers.getBodyAsync(executor, p, cf, (t) -> { 487 cf.completeExceptionally(t); 488 asyncReceiver.setRetryOnError(false); 489 asyncReceiver.onReadError(t); 490 }); 491 492 return cf.whenComplete((s,t) -> { 493 if (t != null) { 494 // If an exception occurred, release the 495 // ref count for the current operation, as 496 // it may never be triggered otherwise 497 // (BodySubscriber ofInputStream) 498 // If there was no exception then the 499 // ref count will be/have been released when 500 // the last byte of the response is/was received 501 refCountTracker.tryRelease(); 502 } 503 }); 504 } 505 506 507 private void onFinished() { 508 asyncReceiver.clear(); 509 if (return2Cache) { 510 Log.logTrace("Attempting to return connection to the pool: {0}", connection); 511 // TODO: need to do something here? 512 // connection.setAsyncCallbacks(null, null, null); 513 514 // don't return the connection to the cache if EOF happened. 515 if (debug.on()) 516 debug.log(connection.getConnectionFlow() + ": return to HTTP/1.1 pool"); 517 connection.closeOrReturnToCache(eof == null ? headers : null); 518 } 519 } 520 521 HttpHeaders responseHeaders() { 522 return headers; 523 } 524 525 int responseCode() { 526 return responseCode; 527 } 528 529 // ================ Support for plugging into Http1Receiver ================= 530 // ============================================================================ 531 532 // Callback: Error receiver: Consumer of Throwable. 533 void onReadError(Throwable t) { 534 Log.logError(t); 535 Receiver<?> receiver = receiver(readProgress); 536 if (t instanceof EOFException) { 537 debug.log(Level.DEBUG, "onReadError: received EOF"); 538 eof = (EOFException) t; 539 } 540 CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); 541 debug.log(Level.DEBUG, () -> "onReadError: cf is " 542 + (cf == null ? "null" 543 : (cf.isDone() ? "already completed" 544 : "not yet completed"))); 545 if (cf != null) { 546 cf.completeExceptionally(t); 547 } else { 548 debug.log(Level.DEBUG, "onReadError", t); 549 } 550 debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); 551 connection.close(); 552 } 553 554 // ======================================================================== 555 556 private State advance(State previous) { 557 assert readProgress == previous; 558 switch(previous) { 559 case READING_HEADERS: 560 asyncReceiver.unsubscribe(headersReader); 561 return readProgress = State.READING_BODY; 562 case READING_BODY: 563 asyncReceiver.unsubscribe(bodyReader); 564 return readProgress = State.DONE; 565 default: 566 throw new InternalError("can't advance from " + previous); 567 } 568 } 569 570 Receiver<?> receiver(State state) { 571 switch(state) { 572 case READING_HEADERS: return headersReader; 573 case READING_BODY: return bodyReader; 574 default: return null; 575 } 576 577 } 578 579 static abstract class Receiver<T> 580 implements Http1AsyncReceiver.Http1AsyncDelegate { 581 abstract void start(T parser); 582 abstract CompletableFuture<State> completion(); 583 // accepts a buffer from upstream. 584 // this should be implemented as a simple call to 585 // accept(ref, parser, cf) 586 public abstract boolean tryAsyncReceive(ByteBuffer buffer); 587 public abstract void onReadError(Throwable t); 588 // handle a byte buffer received from upstream. 589 // this method should set the value of Http1Response.buffer 590 // to ref.get() before beginning parsing. 591 abstract void handle(ByteBuffer buf, T parser, 592 CompletableFuture<State> cf); 593 // resets this objects state so that it can be reused later on 594 // typically puts the reference to parser and completion to null 595 abstract void reset(); 596 597 // accepts a byte buffer received from upstream 598 // returns true if the buffer is fully parsed and more data can 599 // be accepted, false otherwise. 600 final boolean accept(ByteBuffer buf, T parser, 601 CompletableFuture<State> cf) { 602 if (cf == null || parser == null || cf.isDone()) return false; 603 handle(buf, parser, cf); 604 return !cf.isDone(); 605 } 606 public abstract void onSubscribe(AbstractSubscription s); 607 public abstract AbstractSubscription subscription(); 608 609 } 610 611 // Invoked with each new ByteBuffer when reading headers... 612 final class HeadersReader extends Receiver<Http1HeaderParser> { 613 final Consumer<State> onComplete; 614 volatile Http1HeaderParser parser; 615 volatile CompletableFuture<State> cf; 616 volatile long count; // bytes parsed (for debug) 617 volatile AbstractSubscription subscription; 618 619 HeadersReader(Consumer<State> onComplete) { 620 this.onComplete = onComplete; 621 } 622 623 @Override 624 public AbstractSubscription subscription() { 625 return subscription; 626 } 627 628 @Override 629 public void onSubscribe(AbstractSubscription s) { 630 this.subscription = s; 631 s.request(1); 632 } 633 634 @Override 635 void reset() { 636 cf = null; 637 parser = null; 638 count = 0; 639 subscription = null; 640 } 641 642 // Revisit: do we need to support restarting? 643 @Override 644 final void start(Http1HeaderParser hp) { 645 count = 0; 646 cf = new MinimalFuture<>(); 647 parser = hp; 648 } 649 650 @Override 651 CompletableFuture<State> completion() { 652 return cf; 653 } 654 655 @Override 656 public final boolean tryAsyncReceive(ByteBuffer ref) { 657 boolean hasDemand = subscription.demand().tryDecrement(); 658 assert hasDemand; 659 boolean needsMore = accept(ref, parser, cf); 660 if (needsMore) subscription.request(1); 661 return needsMore; 662 } 663 664 @Override 665 public final void onReadError(Throwable t) { 666 t = wrapWithExtraDetail(t, parser::currentStateMessage); 667 Http1Response.this.onReadError(t); 668 } 669 670 @Override 671 final void handle(ByteBuffer b, 672 Http1HeaderParser parser, 673 CompletableFuture<State> cf) { 674 assert cf != null : "parsing not started"; 675 assert parser != null : "no parser"; 676 try { 677 count += b.remaining(); 678 if (debug.on()) 679 debug.log("Sending " + b.remaining() + "/" + b.capacity() 680 + " bytes to header parser"); 681 if (parser.parse(b)) { 682 count -= b.remaining(); 683 if (debug.on()) 684 debug.log("Parsing headers completed. bytes=" + count); 685 onComplete.accept(State.READING_HEADERS); 686 cf.complete(State.READING_HEADERS); 687 } 688 } catch (Throwable t) { 689 if (debug.on()) 690 debug.log("Header parser failed to handle buffer: " + t); 691 cf.completeExceptionally(t); 692 } 693 } 694 695 @Override 696 public void close(Throwable error) { 697 // if there's no error nothing to do: the cf should/will 698 // be completed. 699 if (error != null) { 700 CompletableFuture<State> cf = this.cf; 701 if (cf != null) { 702 if (debug.on()) 703 debug.log("close: completing header parser CF with " + error); 704 cf.completeExceptionally(error); 705 } 706 } 707 } 708 } 709 710 // Invoked with each new ByteBuffer when reading bodies... 711 final class BodyReader extends Receiver<BodyParser> { 712 final Consumer<State> onComplete; 713 volatile BodyParser parser; 714 volatile CompletableFuture<State> cf; 715 volatile AbstractSubscription subscription; 716 BodyReader(Consumer<State> onComplete) { 717 this.onComplete = onComplete; 718 } 719 720 @Override 721 void reset() { 722 parser = null; 723 cf = null; 724 subscription = null; 725 } 726 727 // Revisit: do we need to support restarting? 728 @Override 729 final void start(BodyParser parser) { 730 cf = new MinimalFuture<>(); 731 this.parser = parser; 732 } 733 734 @Override 735 CompletableFuture<State> completion() { 736 return cf; 737 } 738 739 @Override 740 public final boolean tryAsyncReceive(ByteBuffer b) { 741 return accept(b, parser, cf); 742 } 743 744 @Override 745 public final void onReadError(Throwable t) { 746 if (t instanceof EOFException && bodyParser != null && 747 bodyParser instanceof UnknownLengthBodyParser) { 748 ((UnknownLengthBodyParser)bodyParser).complete(); 749 return; 750 } 751 t = wrapWithExtraDetail(t, parser::currentStateMessage); 752 Http1Response.this.onReadError(t); 753 } 754 755 @Override 756 public AbstractSubscription subscription() { 757 return subscription; 758 } 759 760 @Override 761 public void onSubscribe(AbstractSubscription s) { 762 this.subscription = s; 763 try { 764 parser.onSubscribe(s); 765 } catch (Throwable t) { 766 cf.completeExceptionally(t); 767 throw t; 768 } 769 } 770 771 @Override 772 final void handle(ByteBuffer b, 773 BodyParser parser, 774 CompletableFuture<State> cf) { 775 assert cf != null : "parsing not started"; 776 assert parser != null : "no parser"; 777 try { 778 if (debug.on()) 779 debug.log("Sending " + b.remaining() + "/" + b.capacity() 780 + " bytes to body parser"); 781 parser.accept(b); 782 } catch (Throwable t) { 783 if (debug.on()) 784 debug.log("Body parser failed to handle buffer: " + t); 785 if (!cf.isDone()) { 786 cf.completeExceptionally(t); 787 } 788 } 789 } 790 791 final void onComplete(Throwable closedExceptionally) { 792 if (cf.isDone()) return; 793 if (closedExceptionally != null) { 794 cf.completeExceptionally(closedExceptionally); 795 } else { 796 onComplete.accept(State.READING_BODY); 797 cf.complete(State.READING_BODY); 798 } 799 } 800 801 @Override 802 public final void close(Throwable error) { 803 CompletableFuture<State> cf = this.cf; 804 if (cf != null && !cf.isDone()) { 805 // we want to make sure dependent actions are triggered 806 // in order to make sure the client reference count 807 // is decremented 808 if (error != null) { 809 if (debug.on()) 810 debug.log("close: completing body parser CF with " + error); 811 cf.completeExceptionally(error); 812 } else { 813 if (debug.on()) 814 debug.log("close: completing body parser CF"); 815 cf.complete(State.READING_BODY); 816 } 817 } 818 } 819 820 @Override 821 public String toString() { 822 return super.toString() + "/parser=" + String.valueOf(parser); 823 } 824 } 825 } 826