1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.http.HttpClient; 31 import java.net.http.HttpResponse; 32 import java.net.http.HttpResponse.BodyHandler; 33 import java.net.http.HttpResponse.BodySubscriber; 34 import java.nio.ByteBuffer; 35 import java.util.Objects; 36 import java.util.concurrent.CompletableFuture; 37 import java.util.LinkedList; 38 import java.util.List; 39 import java.util.concurrent.ConcurrentLinkedDeque; 40 import java.util.concurrent.Executor; 41 import java.util.concurrent.Flow; 42 import jdk.internal.net.http.common.Demand; 43 import jdk.internal.net.http.common.Log; 44 import jdk.internal.net.http.common.FlowTube; 45 import jdk.internal.net.http.common.Logger; 46 import jdk.internal.net.http.common.SequentialScheduler; 47 import jdk.internal.net.http.common.MinimalFuture; 48 import jdk.internal.net.http.common.Utils; 49 import static java.net.http.HttpClient.Version.HTTP_1_1; 50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 51 52 /** 53 * Encapsulates one HTTP/1.1 request/response exchange. 54 */ 55 class Http1Exchange<T> extends ExchangeImpl<T> { 56 57 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 58 final HttpRequestImpl request; // main request 59 final Http1Request requestAction; 60 private volatile Http1Response<T> response; 61 final HttpConnection connection; 62 final HttpClientImpl client; 63 final Executor executor; 64 private final Http1AsyncReceiver asyncReceiver; 65 private volatile boolean upgraded; 66 67 /** Records a possible cancellation raised before any operation 68 * has been initiated, or an error received while sending the request. */ 69 private Throwable failed; 70 private final List<CompletableFuture<?>> operations; // used for cancel 71 72 /** Must be held when operating on any internal state or data. */ 73 private final Object lock = new Object(); 74 75 /** Holds the outgoing data, either the headers or a request body part. Or 76 * an error from the request body publisher. At most there can be ~2 pieces 77 * of outgoing data ( onComplete|onError can be invoked without demand ).*/ 78 final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>(); 79 80 /** The write publisher, responsible for writing the complete request ( both 81 * headers and body ( if any ). */ 82 private final Http1Publisher writePublisher = new Http1Publisher(); 83 84 /** Completed when the header have been published, or there is an error */ 85 private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>(); 86 /** Completed when the body has been published, or there is an error */ 87 private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>(); 88 89 /** The subscriber to the request's body published. Maybe null. */ 90 private volatile Http1BodySubscriber bodySubscriber; 91 92 enum State { INITIAL, 93 HEADERS, 94 BODY, 95 ERROR, // terminal state 96 COMPLETING, 97 COMPLETED } // terminal state 98 99 private State state = State.INITIAL; 100 101 /** A carrier for either data or an error. Used to carry data, and communicate 102 * errors from the request ( both headers and body ) to the exchange. */ 103 static class DataPair { 104 Throwable throwable; 105 List<ByteBuffer> data; DataPair(List<ByteBuffer> data, Throwable throwable)106 DataPair(List<ByteBuffer> data, Throwable throwable){ 107 this.data = data; 108 this.throwable = throwable; 109 } 110 @Override toString()111 public String toString() { 112 return "DataPair [data=" + data + ", throwable=" + throwable + "]"; 113 } 114 } 115 116 /** An abstract supertype for HTTP/1.1 body subscribers. There are two 117 * concrete implementations: {@link Http1Request.StreamSubscriber}, and 118 * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and 119 * fixed length bodies, respectively. */ 120 static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> { 121 final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>(); 122 private volatile Flow.Subscription subscription; 123 volatile boolean complete; 124 private final Logger debug; Http1BodySubscriber(Logger debug)125 Http1BodySubscriber(Logger debug) { 126 assert debug != null; 127 this.debug = debug; 128 } 129 130 /** Final sentinel in the stream of request body. */ 131 static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0)); 132 request(long n)133 final void request(long n) { 134 if (debug.on()) 135 debug.log("Http1BodySubscriber requesting %d, from %s", 136 n, subscription); 137 subscription.request(n); 138 } 139 140 /** A current-state message suitable for inclusion in an exception detail message. */ currentStateMessage()141 abstract String currentStateMessage(); 142 isSubscribed()143 final boolean isSubscribed() { 144 return subscription != null; 145 } 146 setSubscription(Flow.Subscription subscription)147 final void setSubscription(Flow.Subscription subscription) { 148 this.subscription = subscription; 149 whenSubscribed.complete(subscription); 150 } 151 cancelSubscription()152 final void cancelSubscription() { 153 try { 154 subscription.cancel(); 155 } catch(Throwable t) { 156 String msg = "Ignoring exception raised when canceling BodyPublisher subscription"; 157 if (debug.on()) debug.log("%s: %s", msg, t); 158 Log.logError("{0}: {1}", msg, (Object)t); 159 } 160 } 161 completeSubscriber(Logger debug)162 static Http1BodySubscriber completeSubscriber(Logger debug) { 163 return new Http1BodySubscriber(debug) { 164 @Override public void onSubscribe(Flow.Subscription subscription) { error(); } 165 @Override public void onNext(ByteBuffer item) { error(); } 166 @Override public void onError(Throwable throwable) { error(); } 167 @Override public void onComplete() { error(); } 168 @Override String currentStateMessage() { return null; } 169 private void error() { 170 throw new InternalError("should not reach here"); 171 } 172 }; 173 } 174 } 175 176 @Override 177 public String toString() { 178 return "HTTP/1.1 " + request.toString(); 179 } 180 181 HttpRequestImpl request() { 182 return request; 183 } 184 185 Http1Exchange(Exchange<T> exchange, HttpConnection connection) 186 throws IOException 187 { 188 super(exchange); 189 this.request = exchange.request(); 190 this.client = exchange.client(); 191 this.executor = exchange.executor(); 192 this.operations = new LinkedList<>(); 193 operations.add(headersSentCF); 194 operations.add(bodySentCF); 195 if (connection != null) { 196 this.connection = connection; 197 } else { 198 InetSocketAddress addr = request.getAddress(); 199 this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1); 200 } 201 this.requestAction = new Http1Request(request, this); 202 this.asyncReceiver = new Http1AsyncReceiver(executor, this); 203 } 204 205 @Override 206 HttpConnection connection() { 207 return connection; 208 } 209 210 private void connectFlows(HttpConnection connection) { 211 FlowTube tube = connection.getConnectionFlow(); 212 if (debug.on()) debug.log("%s connecting flows", tube); 213 214 // Connect the flow to our Http1TubeSubscriber: 215 // asyncReceiver.subscriber(). 216 tube.connectFlows(writePublisher, 217 asyncReceiver.subscriber()); 218 } 219 220 @Override 221 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 222 // create the response before sending the request headers, so that 223 // the response can set the appropriate receivers. 224 if (debug.on()) debug.log("Sending headers only"); 225 // If the first attempt to read something triggers EOF, or 226 // IOException("channel reset by peer"), we're going to retry. 227 // Instruct the asyncReceiver to throw ConnectionExpiredException 228 // to force a retry. 229 asyncReceiver.setRetryOnError(true); 230 if (response == null) { 231 response = new Http1Response<>(connection, this, asyncReceiver); 232 } 233 234 if (debug.on()) debug.log("response created in advance"); 235 236 CompletableFuture<Void> connectCF; 237 if (!connection.connected()) { 238 if (debug.on()) debug.log("initiating connect async"); 239 connectCF = connection.connectAsync(exchange) 240 .thenCompose(unused -> connection.finishConnect()); 241 Throwable cancelled; 242 synchronized (lock) { 243 if ((cancelled = failed) == null) { 244 operations.add(connectCF); 245 } 246 } 247 if (cancelled != null) { 248 if (client.isSelectorThread()) { 249 executor.execute(() -> 250 connectCF.completeExceptionally(cancelled)); 251 } else { 252 connectCF.completeExceptionally(cancelled); 253 } 254 } 255 } else { 256 connectCF = new MinimalFuture<>(); 257 connectCF.complete(null); 258 } 259 260 return connectCF 261 .thenCompose(unused -> { 262 CompletableFuture<Void> cf = new MinimalFuture<>(); 263 try { 264 asyncReceiver.whenFinished.whenComplete((r,t) -> { 265 if (t != null) { 266 if (debug.on()) 267 debug.log("asyncReceiver finished (failed=%s)", (Object)t); 268 if (!headersSentCF.isDone()) 269 headersSentCF.completeAsync(() -> this, executor); 270 } 271 }); 272 connectFlows(connection); 273 274 if (debug.on()) debug.log("requestAction.headers"); 275 List<ByteBuffer> data = requestAction.headers(); 276 synchronized (lock) { 277 state = State.HEADERS; 278 } 279 if (debug.on()) debug.log("setting outgoing with headers"); 280 assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing; 281 appendToOutgoing(data); 282 cf.complete(null); 283 return cf; 284 } catch (Throwable t) { 285 if (debug.on()) debug.log("Failed to send headers: %s", t); 286 headersSentCF.completeExceptionally(t); 287 bodySentCF.completeExceptionally(t); 288 connection.close(); 289 cf.completeExceptionally(t); 290 return cf; 291 } }) 292 .thenCompose(unused -> headersSentCF); 293 } 294 295 private void cancelIfFailed(Flow.Subscription s) { 296 asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> { 297 if (debug.on()) 298 debug.log("asyncReceiver finished (failed=%s)", (Object)t); 299 if (t != null) { 300 s.cancel(); 301 // Don't complete exceptionally here as 't' 302 // might not be the right exception: it will 303 // not have been decorated yet. 304 // t is an exception raised by the read side, 305 // an EOFException or Broken Pipe... 306 // We are cancelling the BodyPublisher subscription 307 // and completing bodySentCF to allow the next step 308 // to flow and call readHeaderAsync, which will 309 // get the right exception from the asyncReceiver. 310 bodySentCF.complete(this); 311 } 312 }, executor); 313 } 314 315 @Override 316 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 317 assert headersSentCF.isDone(); 318 if (debug.on()) debug.log("sendBodyAsync"); 319 try { 320 bodySubscriber = requestAction.continueRequest(); 321 if (debug.on()) debug.log("bodySubscriber is %s", 322 bodySubscriber == null ? null : bodySubscriber.getClass()); 323 if (bodySubscriber == null) { 324 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug); 325 appendToOutgoing(Http1BodySubscriber.COMPLETED); 326 } else { 327 // start 328 bodySubscriber.whenSubscribed 329 .thenAccept((s) -> cancelIfFailed(s)) 330 .thenAccept((s) -> requestMoreBody()); 331 } 332 } catch (Throwable t) { 333 cancelImpl(t); 334 bodySentCF.completeExceptionally(t); 335 } 336 return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF); 337 } 338 339 @Override 340 CompletableFuture<Response> getResponseAsync(Executor executor) { 341 if (debug.on()) debug.log("reading headers"); 342 CompletableFuture<Response> cf = response.readHeadersAsync(executor); 343 Throwable cause; 344 synchronized (lock) { 345 operations.add(cf); 346 cause = failed; 347 failed = null; 348 } 349 350 if (cause != null) { 351 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]" 352 + "\n\tCompleting exceptionally with {2}\n", 353 request.uri(), 354 request.timeout().isPresent() ? 355 // calling duration.toMillis() can throw an exception. 356 // this is just debugging, we don't care if it overflows. 357 (request.timeout().get().getSeconds() * 1000 358 + request.timeout().get().getNano() / 1000000) : -1, 359 cause); 360 boolean acknowledged = cf.completeExceptionally(cause); 361 if (debug.on()) 362 debug.log(acknowledged ? ("completed response with " + cause) 363 : ("response already completed, ignoring " + cause)); 364 } 365 return Utils.wrapForDebug(debug, "getResponseAsync", cf); 366 } 367 368 @Override 369 CompletableFuture<T> readBodyAsync(BodyHandler<T> handler, 370 boolean returnConnectionToPool, 371 Executor executor) 372 { 373 BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(), 374 response.responseHeaders(), 375 HTTP_1_1)); 376 CompletableFuture<T> bodyCF = response.readBody(bs, 377 returnConnectionToPool, 378 executor); 379 return bodyCF; 380 } 381 382 @Override 383 CompletableFuture<Void> ignoreBody() { 384 return response.ignoreBody(executor); 385 } 386 387 // Used for those response codes that have no body associated 388 @Override 389 public void nullBody(HttpResponse<T> resp, Throwable t) { 390 response.nullBody(resp, t); 391 } 392 393 394 ByteBuffer drainLeftOverBytes() { 395 synchronized (lock) { 396 asyncReceiver.stop(); 397 return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER); 398 } 399 } 400 401 void released() { 402 Http1Response<T> resp = this.response; 403 if (resp != null) resp.completed(); 404 asyncReceiver.clear(); 405 } 406 407 void completed() { 408 Http1Response<T> resp = this.response; 409 if (resp != null) resp.completed(); 410 } 411 412 /** 413 * Cancel checks to see if request and responseAsync finished already. 414 * If not it closes the connection and completes all pending operations 415 */ 416 @Override 417 void cancel() { 418 cancelImpl(new IOException("Request cancelled")); 419 } 420 421 /** 422 * Cancel checks to see if request and responseAsync finished already. 423 * If not it closes the connection and completes all pending operations 424 */ 425 @Override 426 void cancel(IOException cause) { 427 cancelImpl(cause); 428 } 429 430 private void cancelImpl(Throwable cause) { 431 LinkedList<CompletableFuture<?>> toComplete = null; 432 int count = 0; 433 Throwable error; 434 synchronized (lock) { 435 if ((error = failed) == null) { 436 failed = error = cause; 437 } 438 if (debug.on()) { 439 debug.log(request.uri() + ": " + error); 440 } 441 if (requestAction != null && requestAction.finished() 442 && response != null && response.finished()) { 443 return; 444 } 445 writePublisher.writeScheduler.stop(); 446 if (operations.isEmpty()) { 447 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation." 448 + "\n\tCan''t cancel yet with {2}", 449 request.uri(), 450 request.timeout().isPresent() ? 451 // calling duration.toMillis() can throw an exception. 452 // this is just debugging, we don't care if it overflows. 453 (request.timeout().get().getSeconds() * 1000 454 + request.timeout().get().getNano() / 1000000) : -1, 455 cause); 456 } else { 457 for (CompletableFuture<?> cf : operations) { 458 if (!cf.isDone()) { 459 if (toComplete == null) toComplete = new LinkedList<>(); 460 toComplete.add(cf); 461 count++; 462 } 463 } 464 operations.clear(); 465 } 466 } 467 try { 468 Log.logError("Http1Exchange.cancel: count=" + count); 469 if (toComplete != null) { 470 // We might be in the selector thread in case of timeout, when 471 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline() 472 // There may or may not be other places that reach here 473 // from the SelectorManager thread, so just make sure we 474 // don't complete any CF from within the selector manager 475 // thread. 476 Executor exec = client.isSelectorThread() 477 ? executor 478 : this::runInline; 479 Throwable x = error; 480 while (!toComplete.isEmpty()) { 481 CompletableFuture<?> cf = toComplete.poll(); 482 exec.execute(() -> { 483 if (cf.completeExceptionally(x)) { 484 if (debug.on()) 485 debug.log("%s: completed cf with %s", request.uri(), x); 486 } 487 }); 488 } 489 } 490 } finally { 491 if (!upgraded) 492 connection.close(); 493 } 494 } 495 496 void upgraded() { 497 upgraded = true; 498 } 499 500 private void runInline(Runnable run) { 501 assert !client.isSelectorThread(); 502 run.run(); 503 } 504 505 /** Returns true if this exchange was canceled. */ 506 boolean isCanceled() { 507 synchronized (lock) { 508 return failed != null; 509 } 510 } 511 512 /** Returns the cause for which this exchange was canceled, if available. */ 513 Throwable getCancelCause() { 514 synchronized (lock) { 515 return failed; 516 } 517 } 518 519 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */ 520 void appendToOutgoing(Throwable throwable) { 521 appendToOutgoing(new DataPair(null, throwable)); 522 } 523 524 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */ 525 void appendToOutgoing(List<ByteBuffer> item) { 526 appendToOutgoing(new DataPair(item, null)); 527 } 528 529 private void appendToOutgoing(DataPair dp) { 530 if (debug.on()) debug.log("appending to outgoing " + dp); 531 outgoing.add(dp); 532 writePublisher.writeScheduler.runOrSchedule(); 533 } 534 535 /** Tells whether, or not, there is any outgoing data that can be published, 536 * or if there is an error. */ 537 private boolean hasOutgoing() { 538 return !outgoing.isEmpty(); 539 } 540 541 private void requestMoreBody() { 542 try { 543 if (debug.on()) debug.log("requesting more request body from the subscriber"); 544 bodySubscriber.request(1); 545 } catch (Throwable t) { 546 if (debug.on()) debug.log("Subscription::request failed", t); 547 cancelImpl(t); 548 bodySentCF.completeExceptionally(t); 549 } 550 } 551 552 private void cancelUpstreamSubscription() { 553 final Executor exec = client.theExecutor(); 554 if (debug.on()) debug.log("cancelling upstream publisher"); 555 if (bodySubscriber != null) { 556 exec.execute(bodySubscriber::cancelSubscription); 557 } else if (debug.on()) { 558 debug.log("bodySubscriber is null"); 559 } 560 } 561 562 // Invoked only by the publisher 563 // ALL tasks should execute off the Selector-Manager thread 564 /** Returns the next portion of the HTTP request, or the error. */ 565 private DataPair getOutgoing() { 566 final Executor exec = client.theExecutor(); 567 final DataPair dp = outgoing.pollFirst(); 568 569 if (writePublisher.cancelled) { 570 cancelUpstreamSubscription(); 571 headersSentCF.completeAsync(() -> this, exec); 572 bodySentCF.completeAsync(() -> this, exec); 573 return null; 574 } 575 576 if (dp == null) // publisher has not published anything yet 577 return null; 578 579 if (dp.throwable != null) { 580 synchronized (lock) { 581 state = State.ERROR; 582 } 583 exec.execute(() -> { 584 headersSentCF.completeExceptionally(dp.throwable); 585 bodySentCF.completeExceptionally(dp.throwable); 586 connection.close(); 587 }); 588 return dp; 589 } 590 591 switch (state) { 592 case HEADERS: 593 synchronized (lock) { 594 state = State.BODY; 595 } 596 // completeAsync, since dependent tasks should run in another thread 597 if (debug.on()) debug.log("initiating completion of headersSentCF"); 598 headersSentCF.completeAsync(() -> this, exec); 599 break; 600 case BODY: 601 if (dp.data == Http1BodySubscriber.COMPLETED) { 602 synchronized (lock) { 603 state = State.COMPLETING; 604 } 605 if (debug.on()) debug.log("initiating completion of bodySentCF"); 606 bodySentCF.completeAsync(() -> this, exec); 607 } else { 608 exec.execute(this::requestMoreBody); 609 } 610 break; 611 case INITIAL: 612 case ERROR: 613 case COMPLETING: 614 case COMPLETED: 615 default: 616 assert false : "Unexpected state:" + state; 617 } 618 619 return dp; 620 } 621 622 /** A Publisher of HTTP/1.1 headers and request body. */ 623 final class Http1Publisher implements FlowTube.TubePublisher { 624 625 final Logger debug = Utils.getDebugLogger(this::dbgString); 626 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 627 volatile boolean cancelled; 628 final Http1WriteSubscription subscription = new Http1WriteSubscription(); 629 final Demand demand = new Demand(); 630 final SequentialScheduler writeScheduler = 631 SequentialScheduler.synchronizedScheduler(new WriteTask()); 632 633 @Override 634 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 635 assert state == State.INITIAL; 636 Objects.requireNonNull(s); 637 assert subscriber == null; 638 639 subscriber = s; 640 if (debug.on()) debug.log("got subscriber: %s", s); 641 s.onSubscribe(subscription); 642 } 643 644 volatile String dbgTag; 645 String dbgString() { 646 String tag = dbgTag; 647 Object flow = connection.getConnectionFlow(); 648 if (tag == null && flow != null) { 649 dbgTag = tag = "Http1Publisher(" + flow + ")"; 650 } else if (tag == null) { 651 tag = "Http1Publisher(?)"; 652 } 653 return tag; 654 } 655 656 @SuppressWarnings("fallthrough") 657 private boolean checkRequestCancelled() { 658 if (exchange.multi.requestCancelled()) { 659 if (debug.on()) debug.log("request cancelled"); 660 if (subscriber == null) { 661 if (debug.on()) debug.log("no subscriber yet"); 662 return true; 663 } 664 switch (state) { 665 case BODY: 666 cancelUpstreamSubscription(); 667 // fall trough to HEADERS 668 case HEADERS: 669 Throwable cause = getCancelCause(); 670 if (cause == null) cause = new IOException("Request cancelled"); 671 subscriber.onError(cause); 672 writeScheduler.stop(); 673 return true; 674 } 675 } 676 return false; 677 } 678 679 680 final class WriteTask implements Runnable { 681 @Override 682 public void run() { 683 assert state != State.COMPLETED : "Unexpected state:" + state; 684 if (debug.on()) debug.log("WriteTask"); 685 686 if (cancelled) { 687 if (debug.on()) debug.log("handling cancellation"); 688 writeScheduler.stop(); 689 getOutgoing(); 690 return; 691 } 692 693 if (checkRequestCancelled()) return; 694 695 if (subscriber == null) { 696 if (debug.on()) debug.log("no subscriber yet"); 697 return; 698 } 699 700 if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing()); 701 while (hasOutgoing() && demand.tryDecrement()) { 702 DataPair dp = getOutgoing(); 703 if (dp == null) 704 break; 705 706 if (dp.throwable != null) { 707 if (debug.on()) debug.log("onError"); 708 // Do not call the subscriber's onError, it is not required. 709 writeScheduler.stop(); 710 } else { 711 List<ByteBuffer> data = dp.data; 712 if (data == Http1BodySubscriber.COMPLETED) { 713 synchronized (lock) { 714 assert state == State.COMPLETING : "Unexpected state:" + state; 715 state = State.COMPLETED; 716 } 717 if (debug.on()) 718 debug.log("completed, stopping %s", writeScheduler); 719 writeScheduler.stop(); 720 // Do nothing more. Just do not publish anything further. 721 // The next Subscriber will eventually take over. 722 723 } else { 724 if (checkRequestCancelled()) return; 725 if (debug.on()) 726 debug.log("onNext with " + Utils.remaining(data) + " bytes"); 727 subscriber.onNext(data); 728 } 729 } 730 } 731 } 732 } 733 734 final class Http1WriteSubscription implements Flow.Subscription { 735 736 @Override 737 public void request(long n) { 738 if (cancelled) 739 return; //no-op 740 demand.increase(n); 741 if (debug.on()) 742 debug.log("subscription request(%d), demand=%s", n, demand); 743 writeScheduler.runOrSchedule(client.theExecutor()); 744 } 745 746 @Override 747 public void cancel() { 748 if (debug.on()) debug.log("subscription cancelled"); 749 if (cancelled) 750 return; //no-op 751 cancelled = true; 752 writeScheduler.runOrSchedule(client.theExecutor()); 753 } 754 } 755 } 756 757 HttpClient client() { 758 return client; 759 } 760 761 String dbgString() { 762 return "Http1Exchange"; 763 } 764 } 765