/*
 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;

/**
 * Encapsulates one HTTP/1.1 request/response exchange.
 *
 * <p> The request (headers, then the optional body) is written through the
 * {@link Http1Publisher}; the response is read through the
 * {@code Http1AsyncReceiver} owned by this exchange.
 */
class Http1Exchange<T> extends ExchangeImpl<T> {

    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    final HttpRequestImpl request; // main request
    final Http1Request requestAction;
    private volatile Http1Response<T> response;
    final HttpConnection connection;
    final HttpClientImpl client;
    final Executor executor;
    private final Http1AsyncReceiver asyncReceiver;

    /** Records a possible cancellation raised before any operation
     * has been initiated, or an error received while sending the request. */
    private Throwable failed;
    private final List<CompletableFuture<?>> operations; // used for cancel

    /** Must be held when operating on any internal state or data. */
    private final Object lock = new Object();

    /** Holds the outgoing data, either the headers or a request body part, or
     * an error from the request body publisher. At most there can be ~2 pieces
     * of outgoing data ( onComplete|onError can be invoked without demand ). */
    final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();

    /** The write publisher, responsible for writing the complete request
     * ( both headers and body, if any ). */
    private final Http1Publisher writePublisher = new Http1Publisher();

    /** Completed when the headers have been published, or there is an error */
    private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>();
    /** Completed when the body has been published, or there is an error */
    private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();

    /** The subscriber to the request's body publisher. Maybe null. */
    private volatile Http1BodySubscriber bodySubscriber;

    enum State { INITIAL,
                 HEADERS,
                 BODY,
                 ERROR,          // terminal state
                 COMPLETING,
                 COMPLETED }     // terminal state

    private State state = State.INITIAL;

    /** A carrier for either data or an error. Used to carry data, and communicate
     * errors from the request ( both headers and body ) to the exchange. */
    static class DataPair {
        Throwable throwable;
        List<ByteBuffer> data;
        DataPair(List<ByteBuffer> data, Throwable throwable){
            this.data = data;
            this.throwable = throwable;
        }
        @Override
        public String toString() {
            return "DataPair [data=" + data + ", throwable=" + throwable + "]";
        }
    }

    /** An abstract supertype for HTTP/1.1 body subscribers. There are two
     * concrete implementations: {@link Http1Request.StreamSubscriber}, and
     * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
     * fixed length bodies, respectively. */
    static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
        /** Completed with the subscription by {@link #setSubscription}. */
        final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
        private volatile Flow.Subscription subscription;
        volatile boolean complete;
        private final Logger debug;

        Http1BodySubscriber(Logger debug) {
            assert debug != null;
            this.debug = debug;
        }

        /** Final sentinel in the stream of request body. */
        static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));

        /** Requests {@code n} more items from the current subscription. */
        final void request(long n) {
            if (debug.on())
                debug.log("Http1BodySubscriber requesting %d, from %s",
                          n, subscription);
            subscription.request(n);
        }

        /** A current-state message suitable for inclusion in an exception detail message. */
        abstract String currentStateMessage();

        /** Tells whether a subscription has been set. */
        final boolean isSubscribed() {
            return subscription != null;
        }

        /** Records the subscription and completes {@link #whenSubscribed} with it. */
        final void setSubscription(Flow.Subscription subscription) {
            this.subscription = subscription;
            whenSubscribed.complete(subscription);
        }

        /** Cancels the subscription; anything thrown while doing so is
         * logged and otherwise ignored. */
        final void cancelSubscription() {
            try {
                subscription.cancel();
            } catch(Throwable t) {
                String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
                if (debug.on()) debug.log("%s: %s", msg, t);
                Log.logError("{0}: {1}", msg, (Object)t);
            }
        }

        /** Returns a placeholder subscriber, used when the request has no
         * body: every {@code Flow.Subscriber} callback throws
         * {@code InternalError}. */
        static Http1BodySubscriber completeSubscriber(Logger debug) {
            return new Http1BodySubscriber(debug) {
                @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
                @Override public void onNext(ByteBuffer item) { error(); }
                @Override public void onError(Throwable throwable) { error(); }
                @Override public void onComplete() { error(); }
                @Override String currentStateMessage() { return null; }
                private void error() {
                    throw new InternalError("should not reach here");
                }
            };
        }
    }

    @Override
    public String toString() {
        return "HTTP/1.1 " + request.toString();
    }

    HttpRequestImpl request() {
        return request;
    }

    /**
     * Creates an HTTP/1.1 exchange for the given {@code exchange}.
     *
     * @param exchange   the exchange supplying the request, client and executor
     * @param connection the connection to use; if {@code null} one is
     *                   obtained from {@code HttpConnection.getConnection}
     *                   for the request's address
     */
    Http1Exchange(Exchange<T> exchange, HttpConnection connection)
        throws IOException
    {
        super(exchange);
        this.request = exchange.request();
        this.client = exchange.client();
        this.executor = exchange.executor();
        this.operations = new ArrayList<>();
        operations.add(headersSentCF);
        operations.add(bodySentCF);
        if (connection != null) {
            this.connection = connection;
        } else {
            InetSocketAddress addr = request.getAddress();
            this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
        }
        this.requestAction = new Http1Request(request, this);
        this.asyncReceiver = new Http1AsyncReceiver(executor, this);
    }

    @Override
    HttpConnection connection() {
        return connection;
    }

    /** Connects the write publisher and the async receiver's subscriber
     * to the connection's flow tube. */
    private void connectFlows(HttpConnection connection) {
        FlowTube tube =  connection.getConnectionFlow();
        if (debug.on()) debug.log("%s connecting flows", tube);

        // Connect the flow to our Http1TubeSubscriber:
        //   asyncReceiver.subscriber().
        tube.connectFlows(writePublisher,
                          asyncReceiver.subscriber());
    }

    /**
     * Connects the connection if needed, connects the flows, and queues the
     * request headers for writing.
     *
     * @return a future composed with {@code headersSentCF}; it completes
     *         exceptionally if connecting or preparing the headers fails
     */
    @Override
    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
        // create the response before sending the request headers, so that
        // the response can set the appropriate receivers.
        if (debug.on()) debug.log("Sending headers only");
        // If the first attempt to read something triggers EOF, or
        // IOException("channel reset by peer"), we're going to retry.
        // Instruct the asyncReceiver to throw ConnectionExpiredException
        // to force a retry.
        asyncReceiver.setRetryOnError(true);
        if (response == null) {
            response = new Http1Response<>(connection, this, asyncReceiver);
        }

        if (debug.on()) debug.log("response created in advance");

        CompletableFuture<Void> connectCF;
        if (!connection.connected()) {
            if (debug.on()) debug.log("initiating connect async");
            connectCF = connection.connectAsync(exchange)
                    .thenCompose(unused -> connection.finishConnect());
            Throwable cancelled;
            synchronized (lock) {
                // Only register the connect operation for cancellation if
                // the exchange has not already been cancelled.
                if ((cancelled = failed) == null) {
                    operations.add(connectCF);
                }
            }
            if (cancelled != null) {
                // Do not complete a CF from within the selector thread.
                if (client.isSelectorThread()) {
                    executor.execute(() ->
                        connectCF.completeExceptionally(cancelled));
                } else {
                    connectCF.completeExceptionally(cancelled);
                }
            }
        } else {
            connectCF = new MinimalFuture<>();
            connectCF.complete(null);
        }

        return connectCF
                .thenCompose(unused -> {
                    CompletableFuture<Void> cf = new MinimalFuture<>();
                    try {
                        asyncReceiver.whenFinished.whenComplete((r,t) -> {
                            if (t != null) {
                                if (debug.on())
                                    debug.log("asyncReceiver finished (failed=%s)", (Object)t);
                                if (!headersSentCF.isDone())
                                    headersSentCF.completeAsync(() -> this, executor);
                            }
                        });
                        connectFlows(connection);

                        if (debug.on()) debug.log("requestAction.headers");
                        List<ByteBuffer> data = requestAction.headers();
                        synchronized (lock) {
                            state = State.HEADERS;
                        }
                        if (debug.on()) debug.log("setting outgoing with headers");
                        assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
                        appendToOutgoing(data);
                        cf.complete(null);
                        return cf;
                    } catch (Throwable t) {
                        if (debug.on()) debug.log("Failed to send headers: %s", t);
                        headersSentCF.completeExceptionally(t);
                        bodySentCF.completeExceptionally(t);
                        connection.close();
                        cf.completeExceptionally(t);
                        return cf;
                    } })
                .thenCompose(unused -> headersSentCF);
    }

    /** Arranges for the given request body subscription to be cancelled,
     * and {@code bodySentCF} to be completed, if the asyncReceiver
     * finishes with an error. */
    private void cancelIfFailed(Flow.Subscription s) {
        asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
            if (debug.on())
                debug.log("asyncReceiver finished (failed=%s)", (Object)t);
            if (t != null) {
                s.cancel();
                // Don't complete exceptionally here as 't'
                // might not be the right exception: it will
                // not have been decorated yet.
                // t is an exception raised by the read side,
                // an EOFException or Broken Pipe...
                // We are cancelling the BodyPublisher subscription
                // and completing bodySentCF to allow the next step
                // to flow and call readHeaderAsync, which will
                // get the right exception from the asyncReceiver.
                bodySentCF.complete(this);
            }
        }, executor);
    }

    /**
     * Starts sending the request body, if any. When there is no body the
     * {@code COMPLETED} sentinel is queued immediately.
     *
     * @return {@code bodySentCF}, wrapped for debug logging
     */
    @Override
    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
        assert headersSentCF.isDone();
        if (debug.on()) debug.log("sendBodyAsync");
        try {
            bodySubscriber = requestAction.continueRequest();
            if (debug.on()) debug.log("bodySubscriber is %s",
                    bodySubscriber == null ? null : bodySubscriber.getClass());
            if (bodySubscriber == null) {
                bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
                appendToOutgoing(Http1BodySubscriber.COMPLETED);
            } else {
                // start
                bodySubscriber.whenSubscribed
                        .thenAccept((s) -> cancelIfFailed(s))
                        .thenAccept((s) -> requestMoreBody());
            }
        } catch (Throwable t) {
            cancelImpl(t);
            bodySentCF.completeExceptionally(t);
        }
        return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
    }

    /**
     * Starts reading the response headers. If a failure was recorded
     * earlier, the returned future is completed exceptionally with it and
     * the recorded failure is cleared.
     */
    @Override
    CompletableFuture<Response> getResponseAsync(Executor executor) {
        if (debug.on()) debug.log("reading headers");
        CompletableFuture<Response> cf = response.readHeadersAsync(executor);
        Throwable cause;
        synchronized (lock) {
            operations.add(cf);
            cause = failed;
            failed = null;
        }

        if (cause != null) {
            Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
                            + "\n\tCompleting exceptionally with {2}\n",
                         request.uri(),
                         timeoutMillisForLogging(),
                         cause);
            boolean acknowledged = cf.completeExceptionally(cause);
            if (debug.on())
                debug.log(acknowledged ? ("completed response with " + cause)
                          : ("response already completed, ignoring " + cause));
        }
        return Utils.wrapForDebug(debug, "getResponseAsync", cf);
    }

    /** Applies the handler to the response status and headers, and reads
     * the response body with the resulting body subscriber. */
    @Override
    CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
                                       boolean returnConnectionToPool,
                                       Executor executor)
    {
        BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
                                                                  response.responseHeaders(),
                                                                  HTTP_1_1));
        CompletableFuture<T> bodyCF = response.readBody(bs,
                                                        returnConnectionToPool,
                                                        executor);
        return bodyCF;
    }

    @Override
    CompletableFuture<Void> ignoreBody() {
        return response.ignoreBody(executor);
    }

    /** Stops the asyncReceiver and returns whatever it drains. */
    ByteBuffer drainLeftOverBytes() {
        synchronized (lock) {
            asyncReceiver.stop();
            return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
        }
    }

    /** Marks the response (if any) completed and clears the asyncReceiver. */
    void released() {
        Http1Response<T> resp = this.response;
        if (resp != null) resp.completed();
        asyncReceiver.clear();
    }

    /** Marks the response (if any) completed. */
    void completed() {
        Http1Response<T> resp = this.response;
        if (resp != null) resp.completed();
    }

    /**
     * Cancel checks to see if request and responseAsync finished already.
     * If not it closes the connection and completes all pending operations
     */
    @Override
    void cancel() {
        cancelImpl(new IOException("Request cancelled"));
    }

    /**
     * Same as {@link #cancel()}, but pending operations are completed with
     * the supplied {@code cause} ( unless an earlier failure was already
     * recorded, in which case that earlier failure is used ).
     */
    @Override
    void cancel(IOException cause) {
        cancelImpl(cause);
    }

    /**
     * Records the failure ( first one wins ), stops the write scheduler,
     * completes every pending operation exceptionally and closes the
     * connection. Does nothing more than recording the failure if both the
     * request and the response are already finished.
     */
    private void cancelImpl(Throwable cause) {
        List<CompletableFuture<?>> toComplete = null;
        int count = 0;
        Throwable error;
        synchronized (lock) {
            if ((error = failed) == null) {
                failed = error = cause;
            }
            if (debug.on()) {
                debug.log(request.uri() + ": " + error);
            }
            if (requestAction != null && requestAction.finished()
                    && response != null && response.finished()) {
                return;
            }
            writePublisher.writeScheduler.stop();
            if (operations.isEmpty()) {
                Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
                                + "\n\tCan''t cancel yet with {2}",
                             request.uri(),
                             timeoutMillisForLogging(),
                             cause);
            } else {
                for (CompletableFuture<?> cf : operations) {
                    if (!cf.isDone()) {
                        if (toComplete == null) toComplete = new ArrayList<>();
                        toComplete.add(cf);
                        count++;
                    }
                }
                operations.clear();
            }
        }
        try {
            Log.logError("Http1Exchange.cancel: count=" + count);
            if (toComplete != null) {
                // We might be in the selector thread in case of timeout, when
                // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
                // There may or may not be other places that reach here
                // from the SelectorManager thread, so just make sure we
                // don't complete any CF from within the selector manager
                // thread.
                Executor exec = client.isSelectorThread()
                        ? executor
                        : this::runInline;
                Throwable x = error;
                for (CompletableFuture<?> cf : toComplete) {
                    exec.execute(() -> {
                        if (cf.completeExceptionally(x)) {
                            if (debug.on())
                                debug.log("%s: completed cf with %s", request.uri(), x);
                        }
                    });
                }
            }
        } finally {
            connection.close();
        }
    }

    /** Runs the task in the calling thread, which must not be the
     * selector thread. */
    private void runInline(Runnable run) {
        assert !client.isSelectorThread();
        run.run();
    }

    /**
     * Returns the request timeout in milliseconds, or -1 if the request has
     * no timeout. For trace logging only.
     */
    private long timeoutMillisForLogging() {
        // calling duration.toMillis() can throw an exception.
        // this is just debugging, we don't care if it overflows.
        return request.timeout()
                .map(d -> d.getSeconds() * 1000 + d.getNano() / 1000000)
                .orElse(-1L);
    }

    /** Returns true if this exchange was canceled. */
    boolean isCanceled() {
        synchronized (lock) {
            return failed != null;
        }
    }

    /** Returns the cause for which this exchange was canceled, if available. */
    Throwable getCancelCause() {
        synchronized (lock) {
            return failed;
        }
    }

    /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
    void appendToOutgoing(Throwable throwable) {
        appendToOutgoing(new DataPair(null, throwable));
    }

    /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
    void appendToOutgoing(List<ByteBuffer> item) {
        appendToOutgoing(new DataPair(item, null));
    }

    /** Queues the pair and triggers the write scheduler. */
    private void appendToOutgoing(DataPair dp) {
        if (debug.on()) debug.log("appending to outgoing " + dp);
        outgoing.add(dp);
        writePublisher.writeScheduler.runOrSchedule();
    }

    /** Tells whether, or not, there is any outgoing data that can be published,
     * or if there is an error. */
    private boolean hasOutgoing() {
        return !outgoing.isEmpty();
    }

    /** Requests one more item from the request body subscription; on
     * failure cancels the exchange and fails {@code bodySentCF}. */
    private void requestMoreBody() {
        try {
            if (debug.on()) debug.log("requesting more request body from the subscriber");
            bodySubscriber.request(1);
        } catch (Throwable t) {
            if (debug.on()) debug.log("Subscription::request failed", t);
            cancelImpl(t);
            bodySentCF.completeExceptionally(t);
        }
    }

    // Invoked only by the publisher
    // ALL tasks should execute off the Selector-Manager thread
    /** Returns the next portion of the HTTP request, or the error. */
    private DataPair getOutgoing() {
        final Executor exec = client.theExecutor();
        final DataPair dp = outgoing.pollFirst();

        if (writePublisher.cancelled) {
            if (debug.on()) debug.log("cancelling upstream publisher");
            if (bodySubscriber != null) {
                exec.execute(bodySubscriber::cancelSubscription);
            } else if (debug.on()) {
                debug.log("bodySubscriber is null");
            }
            headersSentCF.completeAsync(() -> this, exec);
            bodySentCF.completeAsync(() -> this, exec);
            return null;
        }

        if (dp == null)  // publisher has not published anything yet
            return null;

        if (dp.throwable != null) {
            synchronized (lock) {
                state = State.ERROR;
            }
            exec.execute(() -> {
                headersSentCF.completeExceptionally(dp.throwable);
                bodySentCF.completeExceptionally(dp.throwable);
                connection.close();
            });
            return dp;
        }

        // NOTE(review): 'state' is read here without holding 'lock', while
        // all writes are done under it - confirm this relies on the write
        // scheduler for visibility.
        switch (state) {
            case HEADERS:
                synchronized (lock) {
                    state = State.BODY;
                }
                // completeAsync, since dependent tasks should run in another thread
                if (debug.on()) debug.log("initiating completion of headersSentCF");
                headersSentCF.completeAsync(() -> this, exec);
                break;
            case BODY:
                if (dp.data == Http1BodySubscriber.COMPLETED) {
                    synchronized (lock) {
                        state = State.COMPLETING;
                    }
                    if (debug.on()) debug.log("initiating completion of bodySentCF");
                    bodySentCF.completeAsync(() -> this, exec);
                } else {
                    exec.execute(this::requestMoreBody);
                }
                break;
            case INITIAL:
            case ERROR:
            case COMPLETING:
            case COMPLETED:
            default:
                assert false : "Unexpected state:" + state;
        }

        return dp;
    }

    /** A Publisher of HTTP/1.1 headers and request body. */
    final class Http1Publisher implements FlowTube.TubePublisher {

        final Logger debug = Utils.getDebugLogger(this::dbgString);
        volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        volatile boolean cancelled;
        final Http1WriteSubscription subscription = new Http1WriteSubscription();
        final Demand demand = new Demand();
        final SequentialScheduler writeScheduler =
                SequentialScheduler.synchronizedScheduler(new WriteTask());

        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
            assert state == State.INITIAL;
            Objects.requireNonNull(s);
            assert subscriber == null;

            subscriber = s;
            if (debug.on()) debug.log("got subscriber: %s", s);
            s.onSubscribe(subscription);
        }

        volatile String dbgTag;
        /** Returns the debug tag; it is cached once the connection flow
         * is available. */
        String dbgString() {
            String tag = dbgTag;
            Object flow = connection.getConnectionFlow();
            if (tag == null && flow != null) {
                dbgTag = tag = "Http1Publisher(" + flow + ")";
            } else if (tag == null) {
                tag = "Http1Publisher(?)";
            }
            return tag;
        }

        /** The task run by the write scheduler: pushes queued outgoing
         * data to the subscriber while there is demand. */
        final class WriteTask implements Runnable {
            @Override
            public void run() {
                assert state != State.COMPLETED : "Unexpected state:" + state;
                if (debug.on()) debug.log("WriteTask");

                if (cancelled) {
                    if (debug.on()) debug.log("handling cancellation");
                    writeScheduler.stop();
                    getOutgoing();
                    return;
                }

                if (subscriber == null) {
                    if (debug.on()) debug.log("no subscriber yet");
                    return;
                }
                if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
                while (hasOutgoing() && demand.tryDecrement()) {
                    DataPair dp = getOutgoing();
                    if (dp == null)
                        break;

                    if (dp.throwable != null) {
                        if (debug.on()) debug.log("onError");
                        // Do not call the subscriber's onError, it is not required.
                        writeScheduler.stop();
                    } else {
                        List<ByteBuffer> data = dp.data;
                        if (data == Http1BodySubscriber.COMPLETED) {
                            synchronized (lock) {
                                assert state == State.COMPLETING : "Unexpected state:" + state;
                                state = State.COMPLETED;
                            }
                            if (debug.on())
                                debug.log("completed, stopping %s", writeScheduler);
                            writeScheduler.stop();
                            // Do nothing more. Just do not publish anything further.
                            // The next Subscriber will eventually take over.

                        } else {
                            if (debug.on())
                                debug.log("onNext with " + Utils.remaining(data) + " bytes");
                            subscriber.onNext(data);
                        }
                    }
                }
            }
        }

        /** The subscription handed to the subscriber of this publisher. */
        final class Http1WriteSubscription implements Flow.Subscription {

            @Override
            public void request(long n) {
                if (cancelled)
                    return;  //no-op
                demand.increase(n);
                if (debug.on())
                    debug.log("subscription request(%d), demand=%s", n, demand);
                writeScheduler.runOrSchedule(client.theExecutor());
            }

            @Override
            public void cancel() {
                if (debug.on()) debug.log("subscription cancelled");
                if (cancelled)
                    return;  //no-op
                cancelled = true;
                writeScheduler.runOrSchedule(client.theExecutor());
            }
        }
    }

    HttpClient client() {
        return client;
    }

    String dbgString() {
        return "Http1Exchange";
    }
}