1 /*
2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.http.HttpClient;
31 import java.net.http.HttpResponse.BodyHandler;
32 import java.net.http.HttpResponse.BodySubscriber;
33 import java.nio.ByteBuffer;
34 import java.util.Objects;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.Flow;
41 import jdk.internal.net.http.common.Demand;
42 import jdk.internal.net.http.common.Log;
43 import jdk.internal.net.http.common.FlowTube;
44 import jdk.internal.net.http.common.Logger;
45 import jdk.internal.net.http.common.SequentialScheduler;
46 import jdk.internal.net.http.common.MinimalFuture;
47 import jdk.internal.net.http.common.Utils;
48 import static java.net.http.HttpClient.Version.HTTP_1_1;
49 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
50 
51 /**
52  * Encapsulates one HTTP/1.1 request/response exchange.
53  */
54 class Http1Exchange<T> extends ExchangeImpl<T> {
55 
56     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
57     final HttpRequestImpl request; // main request
58     final Http1Request requestAction;
59     private volatile Http1Response<T> response;
60     final HttpConnection connection;
61     final HttpClientImpl client;
62     final Executor executor;
63     private final Http1AsyncReceiver asyncReceiver;
64 
65     /** Records a possible cancellation raised before any operation
66      * has been initiated, or an error received while sending the request. */
67     private Throwable failed;
68     private final List<CompletableFuture<?>> operations; // used for cancel
69 
70     /** Must be held when operating on any internal state or data. */
71     private final Object lock = new Object();
72 
73     /** Holds the outgoing data, either the headers or a request body part. Or
74      * an error from the request body publisher. At most there can be ~2 pieces
75      * of outgoing data ( onComplete|onError can be invoked without demand ).*/
76     final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
77 
    /** The write publisher, responsible for writing the complete request
     * (both the headers and the body, if any). */
80     private final Http1Publisher writePublisher = new Http1Publisher();
81 
    /** Completed when the headers have been published, or there is an error */
83     private final CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
84      /** Completed when the body has been published, or there is an error */
85     private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
86 
    /** The subscriber to the request's body publisher. May be null. */
88     private volatile Http1BodySubscriber bodySubscriber;
89 
    /**
     * Progress of the request-writing side of this exchange, as tracked by
     * the {@code state} field.
     *
     * INITIAL    - value the {@code state} field starts with.
     * HEADERS    - set by sendHeadersAsync() just before the request headers
     *              are appended to the outgoing queue.
     * BODY       - set by getOutgoing() when the headers are handed out.
     * ERROR      - set by getOutgoing() when the polled item carries a
     *              throwable.
     * COMPLETING - set by getOutgoing() when the COMPLETED sentinel is polled.
     * COMPLETED  - set by the write task after it has seen the COMPLETED
     *              sentinel.
     */
    enum State { INITIAL,
                 HEADERS,
                 BODY,
                 ERROR,          // terminal state
                 COMPLETING,
                 COMPLETED }     // terminal state
96 
97     private State state = State.INITIAL;
98 
99     /** A carrier for either data or an error. Used to carry data, and communicate
100      * errors from the request ( both headers and body ) to the exchange. */
101     static class DataPair {
102         Throwable throwable;
103         List<ByteBuffer> data;
DataPair(List<ByteBuffer> data, Throwable throwable)104         DataPair(List<ByteBuffer> data, Throwable throwable){
105             this.data = data;
106             this.throwable = throwable;
107         }
108         @Override
toString()109         public String toString() {
110             return "DataPair [data=" + data + ", throwable=" + throwable + "]";
111         }
112     }
113 
114     /** An abstract supertype for HTTP/1.1 body subscribers. There are two
115      * concrete implementations: {@link Http1Request.StreamSubscriber}, and
116      * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
117      * fixed length bodies, respectively. */
118     static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
119         final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
120         private volatile Flow.Subscription subscription;
121         volatile boolean complete;
122         private final Logger debug;
Http1BodySubscriber(Logger debug)123         Http1BodySubscriber(Logger debug) {
124             assert debug != null;
125             this.debug = debug;
126         }
127 
128         /** Final sentinel in the stream of request body. */
129         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
130 
request(long n)131         final void request(long n) {
132             if (debug.on())
133                 debug.log("Http1BodySubscriber requesting %d, from %s",
134                           n, subscription);
135             subscription.request(n);
136         }
137 
138         /** A current-state message suitable for inclusion in an exception detail message. */
currentStateMessage()139         abstract String currentStateMessage();
140 
isSubscribed()141         final boolean isSubscribed() {
142             return subscription != null;
143         }
144 
setSubscription(Flow.Subscription subscription)145         final void setSubscription(Flow.Subscription subscription) {
146             this.subscription = subscription;
147             whenSubscribed.complete(subscription);
148         }
149 
cancelSubscription()150         final void cancelSubscription() {
151             try {
152                 subscription.cancel();
153             } catch(Throwable t) {
154                 String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
155                 if (debug.on()) debug.log("%s: %s", msg, t);
156                 Log.logError("{0}: {1}", msg, (Object)t);
157             }
158         }
159 
completeSubscriber(Logger debug)160         static Http1BodySubscriber completeSubscriber(Logger debug) {
161             return new Http1BodySubscriber(debug) {
162                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
163                 @Override public void onNext(ByteBuffer item) { error(); }
164                 @Override public void onError(Throwable throwable) { error(); }
165                 @Override public void onComplete() { error(); }
166                 @Override String currentStateMessage() { return null; }
167                 private void error() {
168                     throw new InternalError("should not reach here");
169                 }
170             };
171         }
172     }
173 
174     @Override
175     public String toString() {
176         return "HTTP/1.1 " + request.toString();
177     }
178 
179     HttpRequestImpl request() {
180         return request;
181     }
182 
183     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
184         throws IOException
185     {
186         super(exchange);
187         this.request = exchange.request();
188         this.client = exchange.client();
189         this.executor = exchange.executor();
190         this.operations = new LinkedList<>();
191         operations.add(headersSentCF);
192         operations.add(bodySentCF);
193         if (connection != null) {
194             this.connection = connection;
195         } else {
196             InetSocketAddress addr = request.getAddress();
197             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
198         }
199         this.requestAction = new Http1Request(request, this);
200         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
201     }
202 
203     @Override
204     HttpConnection connection() {
205         return connection;
206     }
207 
208     private void connectFlows(HttpConnection connection) {
209         FlowTube tube =  connection.getConnectionFlow();
210         if (debug.on()) debug.log("%s connecting flows", tube);
211 
212         // Connect the flow to our Http1TubeSubscriber:
213         //   asyncReceiver.subscriber().
214         tube.connectFlows(writePublisher,
215                           asyncReceiver.subscriber());
216     }
217 
218     @Override
219     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
220         // create the response before sending the request headers, so that
221         // the response can set the appropriate receivers.
222         if (debug.on()) debug.log("Sending headers only");
223         // If the first attempt to read something triggers EOF, or
224         // IOException("channel reset by peer"), we're going to retry.
225         // Instruct the asyncReceiver to throw ConnectionExpiredException
226         // to force a retry.
227         asyncReceiver.setRetryOnError(true);
228         if (response == null) {
229             response = new Http1Response<>(connection, this, asyncReceiver);
230         }
231 
232         if (debug.on()) debug.log("response created in advance");
233 
234         CompletableFuture<Void> connectCF;
235         if (!connection.connected()) {
236             if (debug.on()) debug.log("initiating connect async");
237             connectCF = connection.connectAsync(exchange)
238                     .thenCompose(unused -> connection.finishConnect());
239             Throwable cancelled;
240             synchronized (lock) {
241                 if ((cancelled = failed) == null) {
242                     operations.add(connectCF);
243                 }
244             }
245             if (cancelled != null) {
246                 if (client.isSelectorThread()) {
247                     executor.execute(() ->
248                         connectCF.completeExceptionally(cancelled));
249                 } else {
250                     connectCF.completeExceptionally(cancelled);
251                 }
252             }
253         } else {
254             connectCF = new MinimalFuture<>();
255             connectCF.complete(null);
256         }
257 
258         return connectCF
259                 .thenCompose(unused -> {
260                     CompletableFuture<Void> cf = new MinimalFuture<>();
261                     try {
262                         asyncReceiver.whenFinished.whenComplete((r,t) -> {
263                             if (t != null) {
264                                 if (debug.on())
265                                     debug.log("asyncReceiver finished (failed=%s)", (Object)t);
266                                 if (!headersSentCF.isDone())
267                                     headersSentCF.completeAsync(() -> this, executor);
268                             }
269                         });
270                         connectFlows(connection);
271 
272                         if (debug.on()) debug.log("requestAction.headers");
273                         List<ByteBuffer> data = requestAction.headers();
274                         synchronized (lock) {
275                             state = State.HEADERS;
276                         }
277                         if (debug.on()) debug.log("setting outgoing with headers");
278                         assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
279                         appendToOutgoing(data);
280                         cf.complete(null);
281                         return cf;
282                     } catch (Throwable t) {
283                         if (debug.on()) debug.log("Failed to send headers: %s", t);
284                         headersSentCF.completeExceptionally(t);
285                         bodySentCF.completeExceptionally(t);
286                         connection.close();
287                         cf.completeExceptionally(t);
288                         return cf;
289                     } })
290                 .thenCompose(unused -> headersSentCF);
291     }
292 
293     private void cancelIfFailed(Flow.Subscription s) {
294         asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
295             if (debug.on())
296                 debug.log("asyncReceiver finished (failed=%s)", (Object)t);
297             if (t != null) {
298                 s.cancel();
299                 // Don't complete exceptionally here as 't'
300                 // might not be the right exception: it will
301                 // not have been decorated yet.
302                 // t is an exception raised by the read side,
303                 // an EOFException or Broken Pipe...
304                 // We are cancelling the BodyPublisher subscription
305                 // and completing bodySentCF to allow the next step
306                 // to flow and call readHeaderAsync, which will
307                 // get the right exception from the asyncReceiver.
308                 bodySentCF.complete(this);
309             }
310         }, executor);
311     }
312 
313     @Override
314     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
315         assert headersSentCF.isDone();
316         if (debug.on()) debug.log("sendBodyAsync");
317         try {
318             bodySubscriber = requestAction.continueRequest();
319             if (debug.on()) debug.log("bodySubscriber is %s",
320                     bodySubscriber == null ? null : bodySubscriber.getClass());
321             if (bodySubscriber == null) {
322                 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
323                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
324             } else {
325                 // start
326                 bodySubscriber.whenSubscribed
327                         .thenAccept((s) -> cancelIfFailed(s))
328                         .thenAccept((s) -> requestMoreBody());
329             }
330         } catch (Throwable t) {
331             cancelImpl(t);
332             bodySentCF.completeExceptionally(t);
333         }
334         return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
335     }
336 
337     @Override
338     CompletableFuture<Response> getResponseAsync(Executor executor) {
339         if (debug.on()) debug.log("reading headers");
340         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
341         Throwable cause;
342         synchronized (lock) {
343             operations.add(cf);
344             cause = failed;
345             failed = null;
346         }
347 
348         if (cause != null) {
349             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
350                             + "\n\tCompleting exceptionally with {2}\n",
351                          request.uri(),
352                          request.timeout().isPresent() ?
353                             // calling duration.toMillis() can throw an exception.
354                             // this is just debugging, we don't care if it overflows.
355                             (request.timeout().get().getSeconds() * 1000
356                              + request.timeout().get().getNano() / 1000000) : -1,
357                          cause);
358             boolean acknowledged = cf.completeExceptionally(cause);
359             if (debug.on())
360                 debug.log(acknowledged ? ("completed response with " + cause)
361                           : ("response already completed, ignoring " + cause));
362         }
363         return Utils.wrapForDebug(debug, "getResponseAsync", cf);
364     }
365 
366     @Override
367     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
368                                        boolean returnConnectionToPool,
369                                        Executor executor)
370     {
371         BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
372                                                                   response.responseHeaders(),
373                                                                   HTTP_1_1));
374         CompletableFuture<T> bodyCF = response.readBody(bs,
375                                                         returnConnectionToPool,
376                                                         executor);
377         return bodyCF;
378     }
379 
380     @Override
381     CompletableFuture<Void> ignoreBody() {
382         return response.ignoreBody(executor);
383     }
384 
385     ByteBuffer drainLeftOverBytes() {
386         synchronized (lock) {
387             asyncReceiver.stop();
388             return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
389         }
390     }
391 
392     void released() {
393         Http1Response<T> resp = this.response;
394         if (resp != null) resp.completed();
395         asyncReceiver.clear();
396     }
397 
398     void completed() {
399         Http1Response<T> resp = this.response;
400         if (resp != null) resp.completed();
401     }
402 
403     /**
404      * Cancel checks to see if request and responseAsync finished already.
405      * If not it closes the connection and completes all pending operations
406      */
407     @Override
408     void cancel() {
409         cancelImpl(new IOException("Request cancelled"));
410     }
411 
412     /**
413      * Cancel checks to see if request and responseAsync finished already.
414      * If not it closes the connection and completes all pending operations
415      */
416     @Override
417     void cancel(IOException cause) {
418         cancelImpl(cause);
419     }
420 
    /**
     * Common implementation of cancellation and of failure handling.
     *
     * <p>Under the lock: records {@code cause} in {@code failed} unless a
     * failure is already recorded (the first failure wins and is the one
     * reported), returns early if both the request and the response have
     * finished, stops the write scheduler, and collects the registered
     * operations that are not yet done. Outside the lock: completes the
     * collected futures exceptionally and, in all cases, closes the
     * connection.
     *
     * @param cause the reason for cancelling
     */
    private void cancelImpl(Throwable cause) {
        LinkedList<CompletableFuture<?>> toComplete = null;
        int count = 0;
        Throwable error;
        synchronized (lock) {
            // Keep the first recorded failure; 'error' is what gets reported.
            if ((error = failed) == null) {
                failed = error = cause;
            }
            if (debug.on()) {
                debug.log(request.uri() + ": " + error);
            }
            // Nothing left to cancel: note that the connection is not
            // closed on this path.
            if (requestAction != null && requestAction.finished()
                    && response != null && response.finished()) {
                return;
            }
            writePublisher.writeScheduler.stop();
            if (operations.isEmpty()) {
                Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
                                + "\n\tCan''t cancel yet with {2}",
                             request.uri(),
                             request.timeout().isPresent() ?
                                // calling duration.toMillis() can throw an exception.
                                // this is just debugging, we don't care if it overflows.
                                (request.timeout().get().getSeconds() * 1000
                                 + request.timeout().get().getNano() / 1000000) : -1,
                             cause);
            } else {
                // Collect the futures to fail; they are completed after the
                // lock has been released.
                for (CompletableFuture<?> cf : operations) {
                    if (!cf.isDone()) {
                        if (toComplete == null) toComplete = new LinkedList<>();
                        toComplete.add(cf);
                        count++;
                    }
                }
                operations.clear();
            }
        }
        try {
            Log.logError("Http1Exchange.cancel: count=" + count);
            if (toComplete != null) {
                // We might be in the selector thread in case of timeout, when
                // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
                // There may or may not be other places that reach here
                // from the SelectorManager thread, so just make sure we
                // don't complete any CF from within the selector manager
                // thread.
                Executor exec = client.isSelectorThread()
                        ? executor
                        : this::runInline;
                Throwable x = error;
                while (!toComplete.isEmpty()) {
                    CompletableFuture<?> cf = toComplete.poll();
                    exec.execute(() -> {
                        if (cf.completeExceptionally(x)) {
                            if (debug.on())
                                debug.log("%s: completed cf with %s", request.uri(), x);
                        }
                    });
                }
            }
        } finally {
            // Always close the connection once cancellation has proceeded
            // past the early return above.
            connection.close();
        }
    }
485 
486     private void runInline(Runnable run) {
487         assert !client.isSelectorThread();
488         run.run();
489     }
490 
491     /** Returns true if this exchange was canceled. */
492     boolean isCanceled() {
493         synchronized (lock) {
494             return failed != null;
495         }
496     }
497 
498     /** Returns the cause for which this exchange was canceled, if available. */
499     Throwable getCancelCause() {
500         synchronized (lock) {
501             return failed;
502         }
503     }
504 
505     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
506     void appendToOutgoing(Throwable throwable) {
507         appendToOutgoing(new DataPair(null, throwable));
508     }
509 
510     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
511     void appendToOutgoing(List<ByteBuffer> item) {
512         appendToOutgoing(new DataPair(item, null));
513     }
514 
515     private void appendToOutgoing(DataPair dp) {
516         if (debug.on()) debug.log("appending to outgoing " + dp);
517         outgoing.add(dp);
518         writePublisher.writeScheduler.runOrSchedule();
519     }
520 
521     /** Tells whether, or not, there is any outgoing data that can be published,
522      * or if there is an error. */
523     private boolean hasOutgoing() {
524         return !outgoing.isEmpty();
525     }
526 
527     private void requestMoreBody() {
528         try {
529             if (debug.on()) debug.log("requesting more request body from the subscriber");
530             bodySubscriber.request(1);
531         } catch (Throwable t) {
532             if (debug.on()) debug.log("Subscription::request failed", t);
533             cancelImpl(t);
534             bodySentCF.completeExceptionally(t);
535         }
536     }
537 
    // Invoked only by the publisher
    // ALL tasks should execute off the Selector-Manager thread
    /**
     * Returns the next portion of the HTTP request, or the error.
     *
     * <p>Polls the head of the outgoing queue and advances the exchange
     * state accordingly. Completion of {@code headersSentCF} and
     * {@code bodySentCF}, and further body requests, are dispatched to the
     * client's executor rather than run on the calling thread.
     *
     * @return the polled item, or {@code null} if the write publisher has
     *         been cancelled or the queue is empty
     */
    private DataPair getOutgoing() {
        final Executor exec = client.theExecutor();
        // The head is removed before the cancellation check below, so an
        // item polled while cancelled is dropped.
        final DataPair dp = outgoing.pollFirst();

        if (writePublisher.cancelled) {
            if (debug.on()) debug.log("cancelling upstream publisher");
            if (bodySubscriber != null) {
                exec.execute(bodySubscriber::cancelSubscription);
            } else if (debug.on()) {
                debug.log("bodySubscriber is null");
            }
            // Complete both futures normally so that the exchange proceeds.
            headersSentCF.completeAsync(() -> this, exec);
            bodySentCF.completeAsync(() -> this, exec);
            return null;
        }

        if (dp == null)  // publisher has not published anything yet
            return null;

        if (dp.throwable != null) {
            synchronized (lock) {
                state = State.ERROR;
            }
            // Fail both futures and close the connection on the executor.
            exec.execute(() -> {
                headersSentCF.completeExceptionally(dp.throwable);
                bodySentCF.completeExceptionally(dp.throwable);
                connection.close();
            });
            return dp;
        }

        // NOTE(review): 'state' is read here without holding 'lock',
        // although writes to it are made under the lock.
        switch (state) {
            case HEADERS:
                // The polled item is the headers: move on to the body.
                synchronized (lock) {
                    state = State.BODY;
                }
                // completeAsync, since dependent tasks should run in another thread
                if (debug.on()) debug.log("initiating completion of headersSentCF");
                headersSentCF.completeAsync(() -> this, exec);
                break;
            case BODY:
                // COMPLETED is a sentinel, compared by identity.
                if (dp.data == Http1BodySubscriber.COMPLETED) {
                    synchronized (lock) {
                        state = State.COMPLETING;
                    }
                    if (debug.on()) debug.log("initiating completion of bodySentCF");
                    bodySentCF.completeAsync(() -> this, exec);
                } else {
                    // A regular body part: ask the body subscriber for the next one.
                    exec.execute(this::requestMoreBody);
                }
                break;
            case INITIAL:
            case ERROR:
            case COMPLETING:
            case COMPLETED:
            default:
                assert false : "Unexpected state:" + state;
        }

        return dp;
    }
602 
    /**
     * A Publisher of HTTP/1.1 headers and request body.
     *
     * <p>Items are taken from the enclosing exchange's outgoing queue via
     * {@code getOutgoing()} and handed to the single subscriber, one item
     * per unit of demand. The work is done by {@link WriteTask}, which is
     * run through {@code writeScheduler}.
     */
    final class Http1Publisher implements FlowTube.TubePublisher {

        final Logger debug = Utils.getDebugLogger(this::dbgString);
        /** The downstream subscriber; {@code null} until {@link #subscribe} is called. */
        volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        /** Set to {@code true} when the downstream subscriber cancels its subscription. */
        volatile boolean cancelled;
        final Http1WriteSubscription subscription = new Http1WriteSubscription();
        /** Outstanding demand signalled by the downstream subscriber. */
        final Demand demand = new Demand();
        // presumably guarantees that WriteTask executions do not overlap;
        // confirm against SequentialScheduler's documentation.
        final SequentialScheduler writeScheduler =
                SequentialScheduler.synchronizedScheduler(new WriteTask());

        /**
         * Registers the subscriber and hands it this publisher's
         * subscription. Asserts that no subscriber is registered yet and
         * that the exchange is still in its INITIAL state.
         *
         * @throws NullPointerException if {@code s} is {@code null}
         */
        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
            assert state == State.INITIAL;
            Objects.requireNonNull(s);
            assert subscriber == null;

            subscriber = s;
            if (debug.on()) debug.log("got subscriber: %s", s);
            s.onSubscribe(subscription);
        }

        /** Cached debug tag; set once the connection flow is available. */
        volatile String dbgTag;
        /**
         * Returns the tag used in debug traces. The tag is cached only once
         * the connection flow is non-null; until then a placeholder is
         * returned without being cached.
         */
        String dbgString() {
            String tag = dbgTag;
            Object flow = connection.getConnectionFlow();
            if (tag == null && flow != null) {
                dbgTag = tag = "Http1Publisher(" + flow + ")";
            } else if (tag == null) {
                tag = "Http1Publisher(?)";
            }
            return tag;
        }

        /**
         * The task run by {@code writeScheduler}: pushes queued outgoing
         * items to the subscriber for as long as there are both items and
         * demand.
         */
        final class WriteTask implements Runnable {
            @Override
            public void run() {
                assert state != State.COMPLETED : "Unexpected state:" + state;
                if (debug.on()) debug.log("WriteTask");

                if (cancelled) {
                    if (debug.on()) debug.log("handling cancellation");
                    writeScheduler.stop();
                    // getOutgoing() performs the cancellation handling; its
                    // return value is not needed here.
                    getOutgoing();
                    return;
                }

                if (subscriber == null) {
                    if (debug.on()) debug.log("no subscriber yet");
                    return;
                }
                if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
                // hasOutgoing() is checked first so that demand is only
                // decremented when there is something to publish.
                while (hasOutgoing() && demand.tryDecrement()) {
                    DataPair dp = getOutgoing();
                    if (dp == null)
                        break;

                    if (dp.throwable != null) {
                        if (debug.on()) debug.log("onError");
                        // Do not call the subscriber's onError, it is not required.
                        writeScheduler.stop();
                    } else {
                        List<ByteBuffer> data = dp.data;
                        // COMPLETED is a sentinel, compared by identity; it
                        // is never passed to the subscriber.
                        if (data == Http1BodySubscriber.COMPLETED) {
                            synchronized (lock) {
                                assert state == State.COMPLETING : "Unexpected state:" + state;
                                state = State.COMPLETED;
                            }
                            if (debug.on())
                                debug.log("completed, stopping %s", writeScheduler);
                            writeScheduler.stop();
                            // Do nothing more. Just do not publish anything further.
                            // The next Subscriber will eventually take over.

                        } else {
                            if (debug.on())
                                debug.log("onNext with " + Utils.remaining(data) + " bytes");
                            subscriber.onNext(data);
                        }
                    }
                }
            }
        }

        /** The subscription handed to the downstream subscriber. */
        final class Http1WriteSubscription implements Flow.Subscription {

            /**
             * Adds {@code n} to the demand and triggers the write task on
             * the client's executor. Ignored once cancelled.
             */
            @Override
            public void request(long n) {
                if (cancelled)
                    return;  //no-op
                demand.increase(n);
                if (debug.on())
                    debug.log("subscription request(%d), demand=%s", n, demand);
                writeScheduler.runOrSchedule(client.theExecutor());
            }

            /**
             * Marks the publisher as cancelled and triggers the write task,
             * which handles the cancellation. Subsequent calls are no-ops.
             */
            @Override
            public void cancel() {
                if (debug.on()) debug.log("subscription cancelled");
                if (cancelled)
                    return;  //no-op
                cancelled = true;
                writeScheduler.runOrSchedule(client.theExecutor());
            }
        }
    }
709 
710     HttpClient client() {
711         return client;
712     }
713 
714     String dbgString() {
715         return "Http1Exchange";
716     }
717 }
718