1 /*
2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.io.IOException;
29 import java.net.InetSocketAddress;
30 import java.net.http.HttpClient;
31 import java.net.http.HttpResponse;
32 import java.net.http.HttpResponse.BodyHandler;
33 import java.net.http.HttpResponse.BodySubscriber;
34 import java.nio.ByteBuffer;
35 import java.util.Objects;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.concurrent.ConcurrentLinkedDeque;
40 import java.util.concurrent.Executor;
41 import java.util.concurrent.Flow;
42 import jdk.internal.net.http.common.Demand;
43 import jdk.internal.net.http.common.Log;
44 import jdk.internal.net.http.common.FlowTube;
45 import jdk.internal.net.http.common.Logger;
46 import jdk.internal.net.http.common.SequentialScheduler;
47 import jdk.internal.net.http.common.MinimalFuture;
48 import jdk.internal.net.http.common.Utils;
49 import static java.net.http.HttpClient.Version.HTTP_1_1;
50 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;
51 
52 /**
53  * Encapsulates one HTTP/1.1 request/response exchange.
54  */
55 class Http1Exchange<T> extends ExchangeImpl<T> {
56 
57     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
58     final HttpRequestImpl request; // main request
59     final Http1Request requestAction;
60     private volatile Http1Response<T> response;
61     final HttpConnection connection;
62     final HttpClientImpl client;
63     final Executor executor;
64     private final Http1AsyncReceiver asyncReceiver;
65     private volatile boolean upgraded;
66 
67     /** Records a possible cancellation raised before any operation
68      * has been initiated, or an error received while sending the request. */
69     private Throwable failed;
70     private final List<CompletableFuture<?>> operations; // used for cancel
71 
72     /** Must be held when operating on any internal state or data. */
73     private final Object lock = new Object();
74 
75     /** Holds the outgoing data, either the headers or a request body part. Or
76      * an error from the request body publisher. At most there can be ~2 pieces
77      * of outgoing data ( onComplete|onError can be invoked without demand ).*/
78     final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();
79 
80     /** The write publisher, responsible for writing the complete request ( both
81      * headers and body ( if any ). */
82     private final Http1Publisher writePublisher = new Http1Publisher();
83 
84     /** Completed when the header have been published, or there is an error */
85     private final CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
86      /** Completed when the body has been published, or there is an error */
87     private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
88 
89     /** The subscriber to the request's body published. Maybe null. */
90     private volatile Http1BodySubscriber bodySubscriber;
91 
92     enum State { INITIAL,
93                  HEADERS,
94                  BODY,
95                  ERROR,          // terminal state
96                  COMPLETING,
97                  COMPLETED }     // terminal state
98 
99     private State state = State.INITIAL;
100 
101     /** A carrier for either data or an error. Used to carry data, and communicate
102      * errors from the request ( both headers and body ) to the exchange. */
103     static class DataPair {
104         Throwable throwable;
105         List<ByteBuffer> data;
DataPair(List<ByteBuffer> data, Throwable throwable)106         DataPair(List<ByteBuffer> data, Throwable throwable){
107             this.data = data;
108             this.throwable = throwable;
109         }
110         @Override
toString()111         public String toString() {
112             return "DataPair [data=" + data + ", throwable=" + throwable + "]";
113         }
114     }
115 
116     /** An abstract supertype for HTTP/1.1 body subscribers. There are two
117      * concrete implementations: {@link Http1Request.StreamSubscriber}, and
118      * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
119      * fixed length bodies, respectively. */
120     static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
121         final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
122         private volatile Flow.Subscription subscription;
123         volatile boolean complete;
124         private final Logger debug;
Http1BodySubscriber(Logger debug)125         Http1BodySubscriber(Logger debug) {
126             assert debug != null;
127             this.debug = debug;
128         }
129 
130         /** Final sentinel in the stream of request body. */
131         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
132 
request(long n)133         final void request(long n) {
134             if (debug.on())
135                 debug.log("Http1BodySubscriber requesting %d, from %s",
136                           n, subscription);
137             subscription.request(n);
138         }
139 
140         /** A current-state message suitable for inclusion in an exception detail message. */
currentStateMessage()141         abstract String currentStateMessage();
142 
isSubscribed()143         final boolean isSubscribed() {
144             return subscription != null;
145         }
146 
setSubscription(Flow.Subscription subscription)147         final void setSubscription(Flow.Subscription subscription) {
148             this.subscription = subscription;
149             whenSubscribed.complete(subscription);
150         }
151 
cancelSubscription()152         final void cancelSubscription() {
153             try {
154                 subscription.cancel();
155             } catch(Throwable t) {
156                 String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
157                 if (debug.on()) debug.log("%s: %s", msg, t);
158                 Log.logError("{0}: {1}", msg, (Object)t);
159             }
160         }
161 
completeSubscriber(Logger debug)162         static Http1BodySubscriber completeSubscriber(Logger debug) {
163             return new Http1BodySubscriber(debug) {
164                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
165                 @Override public void onNext(ByteBuffer item) { error(); }
166                 @Override public void onError(Throwable throwable) { error(); }
167                 @Override public void onComplete() { error(); }
168                 @Override String currentStateMessage() { return null; }
169                 private void error() {
170                     throw new InternalError("should not reach here");
171                 }
172             };
173         }
174     }
175 
176     @Override
177     public String toString() {
178         return "HTTP/1.1 " + request.toString();
179     }
180 
181     HttpRequestImpl request() {
182         return request;
183     }
184 
185     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
186         throws IOException
187     {
188         super(exchange);
189         this.request = exchange.request();
190         this.client = exchange.client();
191         this.executor = exchange.executor();
192         this.operations = new LinkedList<>();
193         operations.add(headersSentCF);
194         operations.add(bodySentCF);
195         if (connection != null) {
196             this.connection = connection;
197         } else {
198             InetSocketAddress addr = request.getAddress();
199             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
200         }
201         this.requestAction = new Http1Request(request, this);
202         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
203     }
204 
205     @Override
206     HttpConnection connection() {
207         return connection;
208     }
209 
210     private void connectFlows(HttpConnection connection) {
211         FlowTube tube =  connection.getConnectionFlow();
212         if (debug.on()) debug.log("%s connecting flows", tube);
213 
214         // Connect the flow to our Http1TubeSubscriber:
215         //   asyncReceiver.subscriber().
216         tube.connectFlows(writePublisher,
217                           asyncReceiver.subscriber());
218     }
219 
220     @Override
221     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
222         // create the response before sending the request headers, so that
223         // the response can set the appropriate receivers.
224         if (debug.on()) debug.log("Sending headers only");
225         // If the first attempt to read something triggers EOF, or
226         // IOException("channel reset by peer"), we're going to retry.
227         // Instruct the asyncReceiver to throw ConnectionExpiredException
228         // to force a retry.
229         asyncReceiver.setRetryOnError(true);
230         if (response == null) {
231             response = new Http1Response<>(connection, this, asyncReceiver);
232         }
233 
234         if (debug.on()) debug.log("response created in advance");
235 
236         CompletableFuture<Void> connectCF;
237         if (!connection.connected()) {
238             if (debug.on()) debug.log("initiating connect async");
239             connectCF = connection.connectAsync(exchange)
240                     .thenCompose(unused -> connection.finishConnect());
241             Throwable cancelled;
242             synchronized (lock) {
243                 if ((cancelled = failed) == null) {
244                     operations.add(connectCF);
245                 }
246             }
247             if (cancelled != null) {
248                 if (client.isSelectorThread()) {
249                     executor.execute(() ->
250                         connectCF.completeExceptionally(cancelled));
251                 } else {
252                     connectCF.completeExceptionally(cancelled);
253                 }
254             }
255         } else {
256             connectCF = new MinimalFuture<>();
257             connectCF.complete(null);
258         }
259 
260         return connectCF
261                 .thenCompose(unused -> {
262                     CompletableFuture<Void> cf = new MinimalFuture<>();
263                     try {
264                         asyncReceiver.whenFinished.whenComplete((r,t) -> {
265                             if (t != null) {
266                                 if (debug.on())
267                                     debug.log("asyncReceiver finished (failed=%s)", (Object)t);
268                                 if (!headersSentCF.isDone())
269                                     headersSentCF.completeAsync(() -> this, executor);
270                             }
271                         });
272                         connectFlows(connection);
273 
274                         if (debug.on()) debug.log("requestAction.headers");
275                         List<ByteBuffer> data = requestAction.headers();
276                         synchronized (lock) {
277                             state = State.HEADERS;
278                         }
279                         if (debug.on()) debug.log("setting outgoing with headers");
280                         assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
281                         appendToOutgoing(data);
282                         cf.complete(null);
283                         return cf;
284                     } catch (Throwable t) {
285                         if (debug.on()) debug.log("Failed to send headers: %s", t);
286                         headersSentCF.completeExceptionally(t);
287                         bodySentCF.completeExceptionally(t);
288                         connection.close();
289                         cf.completeExceptionally(t);
290                         return cf;
291                     } })
292                 .thenCompose(unused -> headersSentCF);
293     }
294 
295     private void cancelIfFailed(Flow.Subscription s) {
296         asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
297             if (debug.on())
298                 debug.log("asyncReceiver finished (failed=%s)", (Object)t);
299             if (t != null) {
300                 s.cancel();
301                 // Don't complete exceptionally here as 't'
302                 // might not be the right exception: it will
303                 // not have been decorated yet.
304                 // t is an exception raised by the read side,
305                 // an EOFException or Broken Pipe...
306                 // We are cancelling the BodyPublisher subscription
307                 // and completing bodySentCF to allow the next step
308                 // to flow and call readHeaderAsync, which will
309                 // get the right exception from the asyncReceiver.
310                 bodySentCF.complete(this);
311             }
312         }, executor);
313     }
314 
315     @Override
316     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
317         assert headersSentCF.isDone();
318         if (debug.on()) debug.log("sendBodyAsync");
319         try {
320             bodySubscriber = requestAction.continueRequest();
321             if (debug.on()) debug.log("bodySubscriber is %s",
322                     bodySubscriber == null ? null : bodySubscriber.getClass());
323             if (bodySubscriber == null) {
324                 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
325                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
326             } else {
327                 // start
328                 bodySubscriber.whenSubscribed
329                         .thenAccept((s) -> cancelIfFailed(s))
330                         .thenAccept((s) -> requestMoreBody());
331             }
332         } catch (Throwable t) {
333             cancelImpl(t);
334             bodySentCF.completeExceptionally(t);
335         }
336         return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF);
337     }
338 
339     @Override
340     CompletableFuture<Response> getResponseAsync(Executor executor) {
341         if (debug.on()) debug.log("reading headers");
342         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
343         Throwable cause;
344         synchronized (lock) {
345             operations.add(cf);
346             cause = failed;
347             failed = null;
348         }
349 
350         if (cause != null) {
351             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
352                             + "\n\tCompleting exceptionally with {2}\n",
353                          request.uri(),
354                          request.timeout().isPresent() ?
355                             // calling duration.toMillis() can throw an exception.
356                             // this is just debugging, we don't care if it overflows.
357                             (request.timeout().get().getSeconds() * 1000
358                              + request.timeout().get().getNano() / 1000000) : -1,
359                          cause);
360             boolean acknowledged = cf.completeExceptionally(cause);
361             if (debug.on())
362                 debug.log(acknowledged ? ("completed response with " + cause)
363                           : ("response already completed, ignoring " + cause));
364         }
365         return Utils.wrapForDebug(debug, "getResponseAsync", cf);
366     }
367 
368     @Override
369     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
370                                        boolean returnConnectionToPool,
371                                        Executor executor)
372     {
373         BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
374                                                                   response.responseHeaders(),
375                                                                   HTTP_1_1));
376         CompletableFuture<T> bodyCF = response.readBody(bs,
377                                                         returnConnectionToPool,
378                                                         executor);
379         return bodyCF;
380     }
381 
382     @Override
383     CompletableFuture<Void> ignoreBody() {
384         return response.ignoreBody(executor);
385     }
386 
387     // Used for those response codes that have no body associated
388     @Override
389     public void nullBody(HttpResponse<T> resp, Throwable t) {
390        response.nullBody(resp, t);
391     }
392 
393 
394     ByteBuffer drainLeftOverBytes() {
395         synchronized (lock) {
396             asyncReceiver.stop();
397             return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
398         }
399     }
400 
401     void released() {
402         Http1Response<T> resp = this.response;
403         if (resp != null) resp.completed();
404         asyncReceiver.clear();
405     }
406 
407     void completed() {
408         Http1Response<T> resp = this.response;
409         if (resp != null) resp.completed();
410     }
411 
412     /**
413      * Cancel checks to see if request and responseAsync finished already.
414      * If not it closes the connection and completes all pending operations
415      */
416     @Override
417     void cancel() {
418         cancelImpl(new IOException("Request cancelled"));
419     }
420 
421     /**
422      * Cancel checks to see if request and responseAsync finished already.
423      * If not it closes the connection and completes all pending operations
424      */
425     @Override
426     void cancel(IOException cause) {
427         cancelImpl(cause);
428     }
429 
430     private void cancelImpl(Throwable cause) {
431         LinkedList<CompletableFuture<?>> toComplete = null;
432         int count = 0;
433         Throwable error;
434         synchronized (lock) {
435             if ((error = failed) == null) {
436                 failed = error = cause;
437             }
438             if (debug.on()) {
439                 debug.log(request.uri() + ": " + error);
440             }
441             if (requestAction != null && requestAction.finished()
442                     && response != null && response.finished()) {
443                 return;
444             }
445             writePublisher.writeScheduler.stop();
446             if (operations.isEmpty()) {
447                 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
448                                 + "\n\tCan''t cancel yet with {2}",
449                              request.uri(),
450                              request.timeout().isPresent() ?
451                                 // calling duration.toMillis() can throw an exception.
452                                 // this is just debugging, we don't care if it overflows.
453                                 (request.timeout().get().getSeconds() * 1000
454                                  + request.timeout().get().getNano() / 1000000) : -1,
455                              cause);
456             } else {
457                 for (CompletableFuture<?> cf : operations) {
458                     if (!cf.isDone()) {
459                         if (toComplete == null) toComplete = new LinkedList<>();
460                         toComplete.add(cf);
461                         count++;
462                     }
463                 }
464                 operations.clear();
465             }
466         }
467         try {
468             Log.logError("Http1Exchange.cancel: count=" + count);
469             if (toComplete != null) {
470                 // We might be in the selector thread in case of timeout, when
471                 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
472                 // There may or may not be other places that reach here
473                 // from the SelectorManager thread, so just make sure we
474                 // don't complete any CF from within the selector manager
475                 // thread.
476                 Executor exec = client.isSelectorThread()
477                         ? executor
478                         : this::runInline;
479                 Throwable x = error;
480                 while (!toComplete.isEmpty()) {
481                     CompletableFuture<?> cf = toComplete.poll();
482                     exec.execute(() -> {
483                         if (cf.completeExceptionally(x)) {
484                             if (debug.on())
485                                 debug.log("%s: completed cf with %s", request.uri(), x);
486                         }
487                     });
488                 }
489             }
490         } finally {
491             if (!upgraded)
492                 connection.close();
493         }
494     }
495 
496     void upgraded() {
497         upgraded = true;
498     }
499 
500     private void runInline(Runnable run) {
501         assert !client.isSelectorThread();
502         run.run();
503     }
504 
505     /** Returns true if this exchange was canceled. */
506     boolean isCanceled() {
507         synchronized (lock) {
508             return failed != null;
509         }
510     }
511 
512     /** Returns the cause for which this exchange was canceled, if available. */
513     Throwable getCancelCause() {
514         synchronized (lock) {
515             return failed;
516         }
517     }
518 
519     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
520     void appendToOutgoing(Throwable throwable) {
521         appendToOutgoing(new DataPair(null, throwable));
522     }
523 
524     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
525     void appendToOutgoing(List<ByteBuffer> item) {
526         appendToOutgoing(new DataPair(item, null));
527     }
528 
529     private void appendToOutgoing(DataPair dp) {
530         if (debug.on()) debug.log("appending to outgoing " + dp);
531         outgoing.add(dp);
532         writePublisher.writeScheduler.runOrSchedule();
533     }
534 
535     /** Tells whether, or not, there is any outgoing data that can be published,
536      * or if there is an error. */
537     private boolean hasOutgoing() {
538         return !outgoing.isEmpty();
539     }
540 
541     private void requestMoreBody() {
542         try {
543             if (debug.on()) debug.log("requesting more request body from the subscriber");
544             bodySubscriber.request(1);
545         } catch (Throwable t) {
546             if (debug.on()) debug.log("Subscription::request failed", t);
547             cancelImpl(t);
548             bodySentCF.completeExceptionally(t);
549         }
550     }
551 
552     private void cancelUpstreamSubscription() {
553         final Executor exec = client.theExecutor();
554         if (debug.on()) debug.log("cancelling upstream publisher");
555         if (bodySubscriber != null) {
556             exec.execute(bodySubscriber::cancelSubscription);
557         } else if (debug.on()) {
558             debug.log("bodySubscriber is null");
559         }
560     }
561 
562     // Invoked only by the publisher
563     // ALL tasks should execute off the Selector-Manager thread
564     /** Returns the next portion of the HTTP request, or the error. */
565     private DataPair getOutgoing() {
566         final Executor exec = client.theExecutor();
567         final DataPair dp = outgoing.pollFirst();
568 
569         if (writePublisher.cancelled) {
570             cancelUpstreamSubscription();
571             headersSentCF.completeAsync(() -> this, exec);
572             bodySentCF.completeAsync(() -> this, exec);
573             return null;
574         }
575 
576         if (dp == null)  // publisher has not published anything yet
577             return null;
578 
579         if (dp.throwable != null) {
580             synchronized (lock) {
581                 state = State.ERROR;
582             }
583             exec.execute(() -> {
584                 headersSentCF.completeExceptionally(dp.throwable);
585                 bodySentCF.completeExceptionally(dp.throwable);
586                 connection.close();
587             });
588             return dp;
589         }
590 
591         switch (state) {
592             case HEADERS:
593                 synchronized (lock) {
594                     state = State.BODY;
595                 }
596                 // completeAsync, since dependent tasks should run in another thread
597                 if (debug.on()) debug.log("initiating completion of headersSentCF");
598                 headersSentCF.completeAsync(() -> this, exec);
599                 break;
600             case BODY:
601                 if (dp.data == Http1BodySubscriber.COMPLETED) {
602                     synchronized (lock) {
603                         state = State.COMPLETING;
604                     }
605                     if (debug.on()) debug.log("initiating completion of bodySentCF");
606                     bodySentCF.completeAsync(() -> this, exec);
607                 } else {
608                     exec.execute(this::requestMoreBody);
609                 }
610                 break;
611             case INITIAL:
612             case ERROR:
613             case COMPLETING:
614             case COMPLETED:
615             default:
616                 assert false : "Unexpected state:" + state;
617         }
618 
619         return dp;
620     }
621 
622     /** A Publisher of HTTP/1.1 headers and request body. */
623     final class Http1Publisher implements FlowTube.TubePublisher {
624 
625         final Logger debug = Utils.getDebugLogger(this::dbgString);
626         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
627         volatile boolean cancelled;
628         final Http1WriteSubscription subscription = new Http1WriteSubscription();
629         final Demand demand = new Demand();
630         final SequentialScheduler writeScheduler =
631                 SequentialScheduler.synchronizedScheduler(new WriteTask());
632 
633         @Override
634         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
635             assert state == State.INITIAL;
636             Objects.requireNonNull(s);
637             assert subscriber == null;
638 
639             subscriber = s;
640             if (debug.on()) debug.log("got subscriber: %s", s);
641             s.onSubscribe(subscription);
642         }
643 
644         volatile String dbgTag;
645         String dbgString() {
646             String tag = dbgTag;
647             Object flow = connection.getConnectionFlow();
648             if (tag == null && flow != null) {
649                 dbgTag = tag = "Http1Publisher(" + flow + ")";
650             } else if (tag == null) {
651                 tag = "Http1Publisher(?)";
652             }
653             return tag;
654         }
655 
656         @SuppressWarnings("fallthrough")
657         private boolean checkRequestCancelled() {
658             if (exchange.multi.requestCancelled()) {
659                 if (debug.on()) debug.log("request cancelled");
660                 if (subscriber == null) {
661                     if (debug.on()) debug.log("no subscriber yet");
662                     return true;
663                 }
664                 switch (state) {
665                     case BODY:
666                         cancelUpstreamSubscription();
667                         // fall trough to HEADERS
668                     case HEADERS:
669                         Throwable cause = getCancelCause();
670                         if (cause == null) cause = new IOException("Request cancelled");
671                         subscriber.onError(cause);
672                         writeScheduler.stop();
673                         return true;
674                 }
675             }
676             return false;
677         }
678 
679 
680         final class WriteTask implements Runnable {
681             @Override
682             public void run() {
683                 assert state != State.COMPLETED : "Unexpected state:" + state;
684                 if (debug.on()) debug.log("WriteTask");
685 
686                 if (cancelled) {
687                     if (debug.on()) debug.log("handling cancellation");
688                     writeScheduler.stop();
689                     getOutgoing();
690                     return;
691                 }
692 
693                 if (checkRequestCancelled()) return;
694 
695                 if (subscriber == null) {
696                     if (debug.on()) debug.log("no subscriber yet");
697                     return;
698                 }
699 
700                 if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
701                 while (hasOutgoing() && demand.tryDecrement()) {
702                     DataPair dp = getOutgoing();
703                     if (dp == null)
704                         break;
705 
706                     if (dp.throwable != null) {
707                         if (debug.on()) debug.log("onError");
708                         // Do not call the subscriber's onError, it is not required.
709                         writeScheduler.stop();
710                     } else {
711                         List<ByteBuffer> data = dp.data;
712                         if (data == Http1BodySubscriber.COMPLETED) {
713                             synchronized (lock) {
714                                 assert state == State.COMPLETING : "Unexpected state:" + state;
715                                 state = State.COMPLETED;
716                             }
717                             if (debug.on())
718                                 debug.log("completed, stopping %s", writeScheduler);
719                             writeScheduler.stop();
720                             // Do nothing more. Just do not publish anything further.
721                             // The next Subscriber will eventually take over.
722 
723                         } else {
724                             if (checkRequestCancelled()) return;
725                             if (debug.on())
726                                 debug.log("onNext with " + Utils.remaining(data) + " bytes");
727                             subscriber.onNext(data);
728                         }
729                     }
730                 }
731             }
732         }
733 
734         final class Http1WriteSubscription implements Flow.Subscription {
735 
736             @Override
737             public void request(long n) {
738                 if (cancelled)
739                     return;  //no-op
740                 demand.increase(n);
741                 if (debug.on())
742                     debug.log("subscription request(%d), demand=%s", n, demand);
743                 writeScheduler.runOrSchedule(client.theExecutor());
744             }
745 
746             @Override
747             public void cancel() {
748                 if (debug.on()) debug.log("subscription cancelled");
749                 if (cancelled)
750                     return;  //no-op
751                 cancelled = true;
752                 writeScheduler.runOrSchedule(client.theExecutor());
753             }
754         }
755     }
756 
757     HttpClient client() {
758         return client;
759     }
760 
761     String dbgString() {
762         return "Http1Exchange";
763     }
764 }
765