1 /*
2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import javax.net.ssl.SSLContext;
29 import javax.net.ssl.SSLParameters;
30 import java.io.IOException;
31 import java.io.UncheckedIOException;
32 import java.lang.ref.Reference;
33 import java.lang.ref.WeakReference;
34 import java.net.Authenticator;
35 import java.net.ConnectException;
36 import java.net.CookieHandler;
37 import java.net.ProxySelector;
38 import java.net.http.HttpConnectTimeoutException;
39 import java.net.http.HttpTimeoutException;
40 import java.nio.ByteBuffer;
41 import java.nio.channels.CancelledKeyException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.SelectableChannel;
44 import java.nio.channels.SelectionKey;
45 import java.nio.channels.Selector;
46 import java.nio.channels.SocketChannel;
47 import java.security.AccessControlContext;
48 import java.security.AccessController;
49 import java.security.NoSuchAlgorithmException;
50 import java.security.PrivilegedAction;
51 import java.time.Duration;
52 import java.time.Instant;
53 import java.time.temporal.ChronoUnit;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.Iterator;
57 import java.util.LinkedList;
58 import java.util.List;
59 import java.util.Objects;
60 import java.util.Optional;
61 import java.util.Set;
62 import java.util.TreeSet;
63 import java.util.concurrent.CompletableFuture;
64 import java.util.concurrent.CompletionException;
65 import java.util.concurrent.ExecutionException;
66 import java.util.concurrent.Executor;
67 import java.util.concurrent.Executors;
68 import java.util.concurrent.ThreadFactory;
69 import java.util.concurrent.atomic.AtomicInteger;
70 import java.util.concurrent.atomic.AtomicLong;
71 import java.util.function.BooleanSupplier;
72 import java.util.stream.Stream;
73 import java.net.http.HttpClient;
74 import java.net.http.HttpRequest;
75 import java.net.http.HttpResponse;
76 import java.net.http.HttpResponse.BodyHandler;
77 import java.net.http.HttpResponse.PushPromiseHandler;
78 import java.net.http.WebSocket;
79 import jdk.internal.net.http.common.BufferSupplier;
80 import jdk.internal.net.http.common.Log;
81 import jdk.internal.net.http.common.Logger;
82 import jdk.internal.net.http.common.Pair;
83 import jdk.internal.net.http.common.Utils;
84 import jdk.internal.net.http.common.OperationTrackers.Trackable;
85 import jdk.internal.net.http.common.OperationTrackers.Tracker;
86 import jdk.internal.net.http.websocket.BuilderImpl;
87 import jdk.internal.misc.InnocuousThread;
88 
89 /**
90  * Client implementation. Contains all configuration information and also
91  * the selector manager thread which allows async events to be registered
92  * and delivered when they occur. See AsyncEvent.
93  */
94 final class HttpClientImpl extends HttpClient implements Trackable {
95 
    // Development flags. DEBUGELAPSED is on when either Utils.TESTING or
    // Utils.DEBUG is set; DEBUGTIMEOUT is hard-wired to false.
    static final boolean DEBUGELAPSED = Utils.TESTING || Utils.DEBUG;  // dev flag
    static final boolean DEBUGTIMEOUT = false; // dev flag
    // Per-instance loggers, each created from this::dbgString and the flag
    // that enables it.
    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    final Logger debugelapsed = Utils.getDebugLogger(this::dbgString, DEBUGELAPSED);
    final Logger debugtimeout = Utils.getDebugLogger(this::dbgString, DEBUGTIMEOUT);
    // Source of the numeric id assigned to each client instance by the
    // constructor (via incrementAndGet, so the first id handed out is 1).
    static final AtomicLong CLIENT_IDS = new AtomicLong();
102 
103     // Define the default factory as a static inner class
104     // that embeds all the necessary logic to avoid
105     // the risk of using a lambda that might keep a reference on the
106     // HttpClient instance from which it was created (helps with
107     // heapdump analysis).
108     private static final class DefaultThreadFactory implements ThreadFactory {
109         private final String namePrefix;
110         private final AtomicInteger nextId = new AtomicInteger();
111 
DefaultThreadFactory(long clientID)112         DefaultThreadFactory(long clientID) {
113             namePrefix = "HttpClient-" + clientID + "-Worker-";
114         }
115 
116         @Override
newThread(Runnable r)117         public Thread newThread(Runnable r) {
118             String name = namePrefix + nextId.getAndIncrement();
119             Thread t;
120             if (System.getSecurityManager() == null) {
121                 t = new Thread(null, r, name, 0, false);
122             } else {
123                 t = InnocuousThread.newThread(name, r);
124             }
125             t.setDaemon(true);
126             return t;
127         }
128     }
129 
130     /**
131      * A DelegatingExecutor is an executor that delegates tasks to
132      * a wrapped executor when it detects that the current thread
133      * is the SelectorManager thread. If the current thread is not
134      * the selector manager thread the given task is executed inline.
135      */
136     final static class DelegatingExecutor implements Executor {
137         private final BooleanSupplier isInSelectorThread;
138         private final Executor delegate;
DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate)139         DelegatingExecutor(BooleanSupplier isInSelectorThread, Executor delegate) {
140             this.isInSelectorThread = isInSelectorThread;
141             this.delegate = delegate;
142         }
143 
delegate()144         Executor delegate() {
145             return delegate;
146         }
147 
148         @Override
execute(Runnable command)149         public void execute(Runnable command) {
150             if (isInSelectorThread.getAsBoolean()) {
151                 delegate.execute(command);
152             } else {
153                 command.run();
154             }
155         }
156     }
157 
    // Client configuration; the constructor takes most of these values
    // from the builder.
    private final CookieHandler cookieHandler;
    private final Duration connectTimeout;
    private final Redirect followRedirects;
    // The proxy selector supplied through the builder, if any ...
    private final Optional<ProxySelector> userProxySelector;
    // ... and the selector actually in use: the user-supplied one, or the
    // result of getDefaultProxySelector() when none was supplied.
    private final ProxySelector proxySelector;
    private final Authenticator authenticator;
    private final Version version;
    // HTTP/1.1 connection cache (see stop()).
    private final ConnectionPool connections;
    private final DelegatingExecutor delegatingExecutor;
    // true when the builder supplied no executor and the constructor
    // created a cached thread pool instead.
    private final boolean isDefaultExecutor;
    // Security parameters
    private final SSLContext sslContext;
    private final SSLParameters sslParams;
    private final SelectorManager selmgr;
    private final FilterFactory filters;
    // HTTP/2 side of the client (see stop()).
    private final Http2ClientImpl client2;
    // Id drawn from CLIENT_IDS; it is embedded in dbgTag and in the names
    // of the threads created for this client.
    private final long id;
    private final String dbgTag;

    // The SSL DirectBuffer Supplier provides the ability to recycle
    // buffers used between the socket reader and the SSLEngine, or
    // more precisely between the SocketTube publisher and the
    // SSLFlowDelegate reader.
    private final SSLDirectBufferSupplier sslBufferSupplier
            = new SSLDirectBufferSupplier(this);

    // This reference is used to keep track of the facade HttpClient
    // that was returned to the application code.
    // It makes it possible to know when the application no longer
    // holds any reference to the HttpClient.
    // Unfortunately, this information is not enough to know when
    // to exit the SelectorManager thread. Because of the asynchronous
    // nature of the API, we also need to wait until all pending operations
    // have completed.
    private final WeakReference<HttpClientFacade> facadeRef;

    // This counter keeps track of the number of operations pending
    // on the HttpClient. The SelectorManager thread will wait
    // until there are no longer any pending operations and the
    // facadeRef is cleared before exiting.
    //
    // The pendingOperationCount is incremented every time a send/sendAsync
    // operation is invoked on the HttpClient, and is decremented when
    // the HttpResponse<T> object is returned to the user.
    // However, at this point, the body may not have been fully read yet.
    // This is the case when the response T is implemented as a streaming
    // subscriber (such as an InputStream).
    //
    // To take care of this issue the pendingOperationCount will additionally
    // be incremented/decremented in the following cases:
    //
    // 1. For HTTP/2  it is incremented when a stream is added to the
    //    Http2Connection streams map, and decreased when the stream is removed
    //    from the map. This should also take care of push promises.
    // 2. For WebSocket the count is increased when creating a
    //    DetachedConnectionChannel for the socket, and decreased
    //    when the channel is closed.
    //    In addition, the HttpClient facade is passed to the WebSocket builder,
    //    (instead of the client implementation delegate).
    // 3. For HTTP/1.1 the count is incremented before starting to parse the body
    //    response, and decremented when the parser has reached the end of the
    //    response body flow.
    //
    // This should ensure that the selector manager thread remains alive until
    // the response has been fully received or the web socket is closed.
    private final AtomicLong pendingOperationCount = new AtomicLong();
    // Per-kind breakdown of pendingOperationCount. Each counter below is
    // updated together with pendingOperationCount by, respectively,
    // webSocketOpen/webSocketClose, reference/unreference and
    // streamReference/streamUnreference.
    private final AtomicLong pendingWebSocketCount = new AtomicLong();
    private final AtomicLong pendingHttpRequestCount = new AtomicLong();
    private final AtomicLong pendingHttp2StreamCount = new AtomicLong();

    /** A set of timeout events, ordered with the earliest deadline first. */
    private final TreeSet<TimeoutEvent> timeouts;
230 
231     /**
232      * This is a bit tricky:
233      * 1. an HttpClientFacade has a final HttpClientImpl field.
234      * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
235      *    where the referent is the facade created for that instance.
236      * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
237      *    constructor, because it would be only weakly referenced and could
238      *    be GC'ed before we can return it.
239      * The solution is to use an instance of SingleFacadeFactory which will
240      * allow the caller of new HttpClientImpl(...) to retrieve the facade
241      * after the HttpClientImpl has been created.
242      */
243     private static final class SingleFacadeFactory {
244         HttpClientFacade facade;
createFacade(HttpClientImpl impl)245         HttpClientFacade createFacade(HttpClientImpl impl) {
246             assert facade == null;
247             return (facade = new HttpClientFacade(impl));
248         }
249     }
250 
create(HttpClientBuilderImpl builder)251     static HttpClientFacade create(HttpClientBuilderImpl builder) {
252         SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
253         HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
254         impl.start();
255         assert facadeFactory.facade != null;
256         assert impl.facadeRef.get() == facadeFactory.facade;
257         return facadeFactory.facade;
258     }
259 
HttpClientImpl(HttpClientBuilderImpl builder, SingleFacadeFactory facadeFactory)260     private HttpClientImpl(HttpClientBuilderImpl builder,
261                            SingleFacadeFactory facadeFactory) {
262         id = CLIENT_IDS.incrementAndGet();
263         dbgTag = "HttpClientImpl(" + id +")";
264         if (builder.sslContext == null) {
265             try {
266                 sslContext = SSLContext.getDefault();
267             } catch (NoSuchAlgorithmException ex) {
268                 throw new InternalError(ex);
269             }
270         } else {
271             sslContext = builder.sslContext;
272         }
273         Executor ex = builder.executor;
274         if (ex == null) {
275             ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
276             isDefaultExecutor = true;
277         } else {
278             isDefaultExecutor = false;
279         }
280         delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
281         facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
282         client2 = new Http2ClientImpl(this);
283         cookieHandler = builder.cookieHandler;
284         connectTimeout = builder.connectTimeout;
285         followRedirects = builder.followRedirects == null ?
286                 Redirect.NEVER : builder.followRedirects;
287         this.userProxySelector = Optional.ofNullable(builder.proxy);
288         this.proxySelector = userProxySelector
289                 .orElseGet(HttpClientImpl::getDefaultProxySelector);
290         if (debug.on())
291             debug.log("proxySelector is %s (user-supplied=%s)",
292                       this.proxySelector, userProxySelector.isPresent());
293         authenticator = builder.authenticator;
294         if (builder.version == null) {
295             version = HttpClient.Version.HTTP_2;
296         } else {
297             version = builder.version;
298         }
299         if (builder.sslParams == null) {
300             sslParams = getDefaultParams(sslContext);
301         } else {
302             sslParams = builder.sslParams;
303         }
304         connections = new ConnectionPool(id);
305         connections.start();
306         timeouts = new TreeSet<>();
307         try {
308             selmgr = new SelectorManager(this);
309         } catch (IOException e) {
310             // unlikely
311             throw new InternalError(e);
312         }
313         selmgr.setDaemon(true);
314         filters = new FilterFactory();
315         initFilters();
316         assert facadeRef.get() != null;
317     }
318 
start()319     private void start() {
320         selmgr.start();
321     }
322 
323     // Called from the SelectorManager thread, just before exiting.
324     // Clears the HTTP/1.1 and HTTP/2 cache, ensuring that the connections
325     // that may be still lingering there are properly closed (and their
326     // possibly still opened SocketChannel released).
stop()327     private void stop() {
328         // Clears HTTP/1.1 cache and close its connections
329         connections.stop();
330         // Clears HTTP/2 cache and close its connections.
331         client2.stop();
332     }
333 
getDefaultParams(SSLContext ctx)334     private static SSLParameters getDefaultParams(SSLContext ctx) {
335         SSLParameters params = ctx.getSupportedSSLParameters();
336         String[] protocols = params.getProtocols();
337         boolean found13 = false;
338         for (String proto : protocols) {
339             if (proto.equals("TLSv1.3")) {
340                 found13 = true;
341                 break;
342             }
343         }
344         if (found13)
345             params.setProtocols(new String[] {"TLSv1.3", "TLSv1.2"});
346         else
347             params.setProtocols(new String[] {"TLSv1.2"});
348         return params;
349     }
350 
getDefaultProxySelector()351     private static ProxySelector getDefaultProxySelector() {
352         PrivilegedAction<ProxySelector> action = ProxySelector::getDefault;
353         return AccessController.doPrivileged(action);
354     }
355 
356     // Returns the facade that was returned to the application code.
357     // May be null if that facade is no longer referenced.
facade()358     final HttpClientFacade facade() {
359         return facadeRef.get();
360     }
361 
362     // Increments the pendingOperationCount.
reference()363     final long reference() {
364         pendingHttpRequestCount.incrementAndGet();
365         return pendingOperationCount.incrementAndGet();
366     }
367 
368     // Decrements the pendingOperationCount.
unreference()369     final long unreference() {
370         final long count = pendingOperationCount.decrementAndGet();
371         final long httpCount = pendingHttpRequestCount.decrementAndGet();
372         final long http2Count = pendingHttp2StreamCount.get();
373         final long webSocketCount = pendingWebSocketCount.get();
374         if (count == 0 && facade() == null) {
375             selmgr.wakeupSelector();
376         }
377         assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
378         assert http2Count >= 0 : "count of HTTP/2 operations < 0";
379         assert webSocketCount >= 0 : "count of WS operations < 0";
380         assert count >= 0 : "count of pending operations < 0";
381         return count;
382     }
383 
384     // Increments the pendingOperationCount.
streamReference()385     final long streamReference() {
386         pendingHttp2StreamCount.incrementAndGet();
387         return pendingOperationCount.incrementAndGet();
388     }
389 
390     // Decrements the pendingOperationCount.
streamUnreference()391     final long streamUnreference() {
392         final long count = pendingOperationCount.decrementAndGet();
393         final long http2Count = pendingHttp2StreamCount.decrementAndGet();
394         final long httpCount = pendingHttpRequestCount.get();
395         final long webSocketCount = pendingWebSocketCount.get();
396         if (count == 0 && facade() == null) {
397             selmgr.wakeupSelector();
398         }
399         assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
400         assert http2Count >= 0 : "count of HTTP/2 operations < 0";
401         assert webSocketCount >= 0 : "count of WS operations < 0";
402         assert count >= 0 : "count of pending operations < 0";
403         return count;
404     }
405 
406     // Increments the pendingOperationCount.
webSocketOpen()407     final long webSocketOpen() {
408         pendingWebSocketCount.incrementAndGet();
409         return pendingOperationCount.incrementAndGet();
410     }
411 
412     // Decrements the pendingOperationCount.
webSocketClose()413     final long webSocketClose() {
414         final long count = pendingOperationCount.decrementAndGet();
415         final long webSocketCount = pendingWebSocketCount.decrementAndGet();
416         final long httpCount = pendingHttpRequestCount.get();
417         final long http2Count = pendingHttp2StreamCount.get();
418         if (count == 0 && facade() == null) {
419             selmgr.wakeupSelector();
420         }
421         assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
422         assert http2Count >= 0 : "count of HTTP/2 operations < 0";
423         assert webSocketCount >= 0 : "count of WS operations < 0";
424         assert count >= 0 : "count of pending operations < 0";
425         return count;
426     }
427 
428     // Returns the pendingOperationCount.
referenceCount()429     final long referenceCount() {
430         return pendingOperationCount.get();
431     }
432 
433     final static class HttpClientTracker implements Tracker {
434         final AtomicLong httpCount;
435         final AtomicLong http2Count;
436         final AtomicLong websocketCount;
437         final AtomicLong operationsCount;
438         final Reference<?> reference;
439         final String name;
HttpClientTracker(AtomicLong http, AtomicLong http2, AtomicLong ws, AtomicLong ops, Reference<?> ref, String name)440         HttpClientTracker(AtomicLong http,
441                           AtomicLong http2,
442                           AtomicLong ws,
443                           AtomicLong ops,
444                           Reference<?> ref,
445                           String name) {
446             this.httpCount = http;
447             this.http2Count = http2;
448             this.websocketCount = ws;
449             this.operationsCount = ops;
450             this.reference = ref;
451             this.name = name;
452         }
453         @Override
getOutstandingOperations()454         public long getOutstandingOperations() {
455             return operationsCount.get();
456         }
457         @Override
getOutstandingHttpOperations()458         public long getOutstandingHttpOperations() {
459             return httpCount.get();
460         }
461         @Override
getOutstandingHttp2Streams()462         public long getOutstandingHttp2Streams() { return http2Count.get(); }
463         @Override
getOutstandingWebSocketOperations()464         public long getOutstandingWebSocketOperations() {
465             return websocketCount.get();
466         }
467         @Override
isFacadeReferenced()468         public boolean isFacadeReferenced() {
469             return reference.get() != null;
470         }
471         @Override
getName()472         public String getName() {
473             return name;
474         }
475     }
476 
getOperationsTracker()477     public Tracker getOperationsTracker() {
478         return new HttpClientTracker(pendingHttpRequestCount,
479                 pendingHttp2StreamCount,
480                 pendingWebSocketCount,
481                 pendingOperationCount,
482                 facadeRef,
483                 dbgTag);
484     }
485 
486     // Called by the SelectorManager thread to figure out whether it's time
487     // to terminate.
isReferenced()488     final boolean isReferenced() {
489         HttpClient facade = facade();
490         return facade != null || referenceCount() > 0;
491     }
492 
493     /**
494      * Wait for activity on given exchange.
495      * The following occurs in the SelectorManager thread.
496      *
497      *  1) add to selector
498      *  2) If selector fires for this exchange then
499      *     call AsyncEvent.handle()
500      *
501      * If exchange needs to change interest ops, then call registerEvent() again.
502      */
registerEvent(AsyncEvent exchange)503     void registerEvent(AsyncEvent exchange) throws IOException {
504         selmgr.register(exchange);
505     }
506 
507     /**
508      * Allows an AsyncEvent to modify its interestOps.
509      * @param event The modified event.
510      */
eventUpdated(AsyncEvent event)511     void eventUpdated(AsyncEvent event) throws ClosedChannelException {
512         assert !(event instanceof AsyncTriggerEvent);
513         selmgr.eventUpdated(event);
514     }
515 
isSelectorThread()516     boolean isSelectorThread() {
517         return Thread.currentThread() == selmgr;
518     }
519 
client2()520     Http2ClientImpl client2() {
521         return client2;
522     }
523 
debugCompleted(String tag, long startNanos, HttpRequest req)524     private void debugCompleted(String tag, long startNanos, HttpRequest req) {
525         if (debugelapsed.on()) {
526             debugelapsed.log(tag + " elapsed "
527                     + (System.nanoTime() - startNanos)/1000_000L
528                     + " millis for " + req.method()
529                     + " to " + req.uri());
530         }
531     }
532 
533     @Override
534     public <T> HttpResponse<T>
send(HttpRequest req, BodyHandler<T> responseHandler)535     send(HttpRequest req, BodyHandler<T> responseHandler)
536         throws IOException, InterruptedException
537     {
538         CompletableFuture<HttpResponse<T>> cf = null;
539         try {
540             cf = sendAsync(req, responseHandler, null, null);
541             return cf.get();
542         } catch (InterruptedException ie) {
543             if (cf != null )
544                 cf.cancel(true);
545             throw ie;
546         } catch (ExecutionException e) {
547             final Throwable throwable = e.getCause();
548             final String msg = throwable.getMessage();
549 
550             if (throwable instanceof IllegalArgumentException) {
551                 throw new IllegalArgumentException(msg, throwable);
552             } else if (throwable instanceof SecurityException) {
553                 throw new SecurityException(msg, throwable);
554             } else if (throwable instanceof HttpConnectTimeoutException) {
555                 HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
556                 hcte.initCause(throwable);
557                 throw hcte;
558             } else if (throwable instanceof HttpTimeoutException) {
559                 throw new HttpTimeoutException(msg);
560             } else if (throwable instanceof ConnectException) {
561                 ConnectException ce = new ConnectException(msg);
562                 ce.initCause(throwable);
563                 throw ce;
564             } else if (throwable instanceof IOException) {
565                 throw new IOException(msg, throwable);
566             } else {
567                 throw new IOException(msg, throwable);
568             }
569         }
570     }
571 
    // The default asynchronous executor of CompletableFuture, obtained from
    // a throw-away instance. sendAsync passes it to whenCompleteAsync so that
    // dependent actions of the returned future run in that executor.
    private static final Executor ASYNC_POOL = new CompletableFuture<Void>().defaultExecutor();
573 
574     @Override
575     public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)576     sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
577     {
578         return sendAsync(userRequest, responseHandler, null);
579     }
580 
581     @Override
582     public <T> CompletableFuture<HttpResponse<T>>
sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler, PushPromiseHandler<T> pushPromiseHandler)583     sendAsync(HttpRequest userRequest,
584               BodyHandler<T> responseHandler,
585               PushPromiseHandler<T> pushPromiseHandler) {
586         return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
587     }
588 
    /**
     * Common implementation behind send(...) and the public sendAsync(...)
     * overloads.
     *
     * The client is referenced (see reference()) for the duration of the
     * exchange and unreferenced when the response future completes, or
     * immediately if setting up the exchange throws.
     *
     * @param userRequest        the request supplied by the caller; must not be null
     * @param responseHandler    the response body handler; must not be null
     * @param pushPromiseHandler the push promise handler, may be null
     * @param exchangeExecutor   the executor to run the exchange with, or null
     *                           to use the client's delegating executor
     * @return a future completed with the response
     * @throws NullPointerException     if userRequest or responseHandler is null
     * @throws IllegalArgumentException if the request method is CONNECT
     */
    private <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest,
              BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler,
              Executor exchangeExecutor)    {

        Objects.requireNonNull(userRequest);
        Objects.requireNonNull(responseHandler);

        // The caller's access control context is only captured when a
        // security manager is installed.
        AccessControlContext acc = null;
        if (System.getSecurityManager() != null)
            acc = AccessController.getContext();

        // Clone the possibly untrusted HttpRequest
        HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);
        if (requestImpl.method().equals("CONNECT"))
            throw new IllegalArgumentException("Unsupported method CONNECT");

        long start = DEBUGELAPSED ? System.nanoTime() : 0;
        // From here on the client is referenced; every exit path below must
        // end up calling unreference() exactly once.
        reference();
        try {
            if (debugelapsed.on())
                debugelapsed.log("ClientImpl (async) send %s", userRequest);

            // When using sendAsync(...) we explicitly pass the
            // executor's delegate as exchange executor to force
            // asynchronous scheduling of the exchange.
            // When using send(...) we don't specify any executor
            // and default to using the client's delegating executor
            // which only spawns asynchronous tasks if it detects
            // that the current thread is the selector manager
            // thread. This will cause everything to execute inline
            // until we need to schedule some event with the selector.
            Executor executor = exchangeExecutor == null
                    ? this.delegatingExecutor : exchangeExecutor;

            MultiExchange<T> mex = new MultiExchange<>(userRequest,
                                                            requestImpl,
                                                            this,
                                                            responseHandler,
                                                            pushPromiseHandler,
                                                            acc);
            // Release the reference when the response future completes,
            // whether normally or exceptionally.
            CompletableFuture<HttpResponse<T>> res =
                    mex.responseAsync(executor).whenComplete((b,t) -> unreference());
            if (DEBUGELAPSED) {
                res = res.whenComplete(
                        (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
            }

            // makes sure that any dependent actions happen in the CF default
            // executor. This is only needed for sendAsync(...), when
            // exchangeExecutor is non-null.
            if (exchangeExecutor != null) {
                res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
            }
            return res;
        } catch(Throwable t) {
            // Setting up the exchange failed synchronously: release the
            // reference here, since no future will do it.
            unreference();
            debugCompleted("ClientImpl (async)", start, userRequest);
            throw t;
        }
    }
651 
652     // Main loop for this client's selector
653     private final static class SelectorManager extends Thread {
654 
655         // For testing purposes we have an internal System property that
656         // can control the frequency at which the selector manager will wake
657         // up when there are no pending operations.
658         // Increasing the frequency (shorter delays) might allow the selector
659         // to observe that the facade is no longer referenced and might allow
660         // the selector thread to terminate more timely - for when nothing is
661         // ongoing it will only check for that condition every NODEADLINE ms.
662         // To avoid misuse of the property, the delay that can be specified
663         // is comprised between [MIN_NODEADLINE, MAX_NODEADLINE], and its default
664         // value if unspecified (or <= 0) is DEF_NODEADLINE = 3000ms
665         // The property is -Djdk.internal.httpclient.selectorTimeout=<millis>
666         private static final int MIN_NODEADLINE = 1000; // ms
667         private static final int MAX_NODEADLINE = 1000 * 1200; // ms
668         private static final int DEF_NODEADLINE = 3000; // ms
669         private static final long NODEADLINE; // default is DEF_NODEADLINE ms
670         static {
671             // ensure NODEADLINE is initialized with some valid value.
672             long deadline =  Utils.getIntegerProperty(
673                 "jdk.internal.httpclient.selectorTimeout",
674                 DEF_NODEADLINE); // millis
675             if (deadline <= 0) deadline = DEF_NODEADLINE;
676             deadline = Math.max(deadline, MIN_NODEADLINE);
677             NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
678         }
679 
        // The selector opened by the constructor.
        private final Selector selector;
        // NOTE(review): volatile, so presumably read by threads other than
        // the one that sets it — confirm against the rest of the class.
        private volatile boolean closed;
        // Both lists are created empty by the constructor.
        private final List<AsyncEvent> registrations;
        private final List<AsyncTriggerEvent> deregistrations;
        // The same logger instances as the owning HttpClientImpl.
        private final Logger debug;
        private final Logger debugtimeout;
        // The owning client and its connection pool, set by the constructor.
        HttpClientImpl owner;
        ConnectionPool pool;
688 
SelectorManager(HttpClientImpl ref)689         SelectorManager(HttpClientImpl ref) throws IOException {
690             super(null, null,
691                   "HttpClient-" + ref.id + "-SelectorManager",
692                   0, false);
693             owner = ref;
694             debug = ref.debug;
695             debugtimeout = ref.debugtimeout;
696             pool = ref.connectionPool();
697             registrations = new ArrayList<>();
698             deregistrations = new ArrayList<>();
699             selector = Selector.open();
700         }
701 
eventUpdated(AsyncEvent e)702         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
703             if (Thread.currentThread() == this) {
704                 SelectionKey key = e.channel().keyFor(selector);
705                 if (key != null && key.isValid()) {
706                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
707                     sa.register(e);
708                 } else if (e.interestOps() != 0){
709                     // We don't care about paused events.
710                     // These are actually handled by
711                     // SelectorAttachment::resetInterestOps later on.
712                     // But if we reach here when trying to resume an
713                     // event then it's better to fail fast.
714                     if (debug.on()) debug.log("No key for channel");
715                     e.abort(new IOException("No key for channel"));
716                 }
717             } else {
718                 register(e);
719             }
720         }
721 
722         // This returns immediately. So caller not allowed to send/receive
723         // on connection.
register(AsyncEvent e)724         synchronized void register(AsyncEvent e) {
725             registrations.add(e);
726             selector.wakeup();
727         }
728 
cancel(SocketChannel e)729         synchronized void cancel(SocketChannel e) {
730             SelectionKey key = e.keyFor(selector);
731             if (key != null) {
732                 key.cancel();
733             }
734             selector.wakeup();
735         }
736 
wakeupSelector()737         void wakeupSelector() {
738             selector.wakeup();
739         }
740 
shutdown()741         synchronized void shutdown() {
742             Log.logTrace("{0}: shutting down", getName());
743             if (debug.on()) debug.log("SelectorManager shutting down");
744             closed = true;
745             try {
746                 selector.close();
747             } catch (IOException ignored) {
748             } finally {
749                 owner.stop();
750             }
751         }
752 
753         @Override
run()754         public void run() {
755             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
756             List<AsyncEvent> readyList = new ArrayList<>();
757             List<Runnable> resetList = new ArrayList<>();
758             try {
759                 if (Log.channel()) Log.logChannel(getName() + ": starting");
760                 while (!Thread.currentThread().isInterrupted()) {
761                     synchronized (this) {
762                         assert errorList.isEmpty();
763                         assert readyList.isEmpty();
764                         assert resetList.isEmpty();
765                         for (AsyncTriggerEvent event : deregistrations) {
766                             event.handle();
767                         }
768                         deregistrations.clear();
769                         for (AsyncEvent event : registrations) {
770                             if (event instanceof AsyncTriggerEvent) {
771                                 readyList.add(event);
772                                 continue;
773                             }
774                             SelectableChannel chan = event.channel();
775                             SelectionKey key = null;
776                             try {
777                                 key = chan.keyFor(selector);
778                                 SelectorAttachment sa;
779                                 if (key == null || !key.isValid()) {
780                                     if (key != null) {
781                                         // key is canceled.
782                                         // invoke selectNow() to purge it
783                                         // before registering the new event.
784                                         selector.selectNow();
785                                     }
786                                     sa = new SelectorAttachment(chan, selector);
787                                 } else {
788                                     sa = (SelectorAttachment) key.attachment();
789                                 }
790                                 // may throw IOE if channel closed: that's OK
791                                 sa.register(event);
792                                 if (!chan.isOpen()) {
793                                     throw new IOException("Channel closed");
794                                 }
795                             } catch (IOException e) {
796                                 Log.logTrace("{0}: {1}", getName(), e);
797                                 if (debug.on())
798                                     debug.log("Got " + e.getClass().getName()
799                                               + " while handling registration events");
800                                 chan.close();
801                                 // let the event abort deal with it
802                                 errorList.add(new Pair<>(event, e));
803                                 if (key != null) {
804                                     key.cancel();
805                                     selector.selectNow();
806                                 }
807                             }
808                         }
809                         registrations.clear();
810                         selector.selectedKeys().clear();
811                     }
812 
813                     for (AsyncEvent event : readyList) {
814                         assert event instanceof AsyncTriggerEvent;
815                         event.handle();
816                     }
817                     readyList.clear();
818 
819                     for (Pair<AsyncEvent,IOException> error : errorList) {
820                         // an IOException was raised and the channel closed.
821                         handleEvent(error.first, error.second);
822                     }
823                     errorList.clear();
824 
825                     // Check whether client is still alive, and if not,
826                     // gracefully stop this thread
827                     if (!owner.isReferenced()) {
828                         Log.logTrace("{0}: {1}",
829                                 getName(),
830                                 "HttpClient no longer referenced. Exiting...");
831                         return;
832                     }
833 
834                     // Timeouts will have milliseconds granularity. It is important
835                     // to handle them in a timely fashion.
836                     long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
837                     if (debugtimeout.on())
838                         debugtimeout.log("next timeout: %d", nextTimeout);
839 
840                     // Keep-alive have seconds granularity. It's not really an
841                     // issue if we keep connections linger a bit more in the keep
842                     // alive cache.
843                     long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
844                     if (debugtimeout.on())
845                         debugtimeout.log("next expired: %d", nextExpiry);
846 
847                     assert nextTimeout >= 0;
848                     assert nextExpiry >= 0;
849 
850                     // Don't wait for ever as it might prevent the thread to
851                     // stop gracefully. millis will be 0 if no deadline was found.
852                     if (nextTimeout <= 0) nextTimeout = NODEADLINE;
853 
854                     // Clip nextExpiry at NODEADLINE limit. The default
855                     // keep alive is 1200 seconds (half an hour) - we don't
856                     // want to wait that long.
857                     if (nextExpiry <= 0) nextExpiry = NODEADLINE;
858                     else nextExpiry = Math.min(NODEADLINE, nextExpiry);
859 
860                     // takes the least of the two.
861                     long millis = Math.min(nextExpiry, nextTimeout);
862 
863                     if (debugtimeout.on())
864                         debugtimeout.log("Next deadline is %d",
865                                          (millis == 0 ? NODEADLINE : millis));
866                     //debugPrint(selector);
867                     int n = selector.select(millis == 0 ? NODEADLINE : millis);
868                     if (n == 0) {
869                         // Check whether client is still alive, and if not,
870                         // gracefully stop this thread
871                         if (!owner.isReferenced()) {
872                             Log.logTrace("{0}: {1}",
873                                     getName(),
874                                     "HttpClient no longer referenced. Exiting...");
875                             return;
876                         }
877                         owner.purgeTimeoutsAndReturnNextDeadline();
878                         continue;
879                     }
880 
881                     Set<SelectionKey> keys = selector.selectedKeys();
882                     assert errorList.isEmpty();
883 
884                     for (SelectionKey key : keys) {
885                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
886                         if (!key.isValid()) {
887                             IOException ex = sa.chan.isOpen()
888                                     ? new IOException("Invalid key")
889                                     : new ClosedChannelException();
890                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
891                             sa.pending.clear();
892                             continue;
893                         }
894 
895                         int eventsOccurred;
896                         try {
897                             eventsOccurred = key.readyOps();
898                         } catch (CancelledKeyException ex) {
899                             IOException io = Utils.getIOException(ex);
900                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
901                             sa.pending.clear();
902                             continue;
903                         }
904                         sa.events(eventsOccurred).forEach(readyList::add);
905                         resetList.add(() -> sa.resetInterestOps(eventsOccurred));
906                     }
907 
908                     selector.selectNow(); // complete cancellation
909                     selector.selectedKeys().clear();
910 
911                     // handle selected events
912                     readyList.forEach((e) -> handleEvent(e, null));
913                     readyList.clear();
914 
915                     // handle errors (closed channels etc...)
916                     errorList.forEach((p) -> handleEvent(p.first, p.second));
917                     errorList.clear();
918 
919                     // reset interest ops for selected channels
920                     resetList.forEach(r -> r.run());
921                     resetList.clear();
922 
923                 }
924             } catch (Throwable e) {
925                 if (!closed) {
926                     // This terminates thread. So, better just print stack trace
927                     String err = Utils.stackTrace(e);
928                     Log.logError("{0}: {1}: {2}", getName(),
929                             "HttpClientImpl shutting down due to fatal error", err);
930                 }
931                 if (debug.on()) debug.log("shutting down", e);
932                 if (Utils.ASSERTIONSENABLED && !debug.on()) {
933                     e.printStackTrace(System.err); // always print the stack
934                 }
935             } finally {
936                 if (Log.channel()) Log.logChannel(getName() + ": stopping");
937                 shutdown();
938             }
939         }
940 
941 //        void debugPrint(Selector selector) {
942 //            System.err.println("Selector: debugprint start");
943 //            Set<SelectionKey> keys = selector.keys();
944 //            for (SelectionKey key : keys) {
945 //                SelectableChannel c = key.channel();
946 //                int ops = key.interestOps();
947 //                System.err.printf("selector chan:%s ops:%d\n", c, ops);
948 //            }
949 //            System.err.println("Selector: debugprint end");
950 //        }
951 
952         /** Handles the given event. The given ioe may be null. */
handleEvent(AsyncEvent event, IOException ioe)953         void handleEvent(AsyncEvent event, IOException ioe) {
954             if (closed || ioe != null) {
955                 event.abort(ioe);
956             } else {
957                 event.handle();
958             }
959         }
960     }
961 
debugInterestOps(SelectableChannel channel)962     final String debugInterestOps(SelectableChannel channel) {
963         try {
964             SelectionKey key = channel.keyFor(selmgr.selector);
965             if (key == null) return "channel not registered with selector";
966             String keyInterestOps = key.isValid()
967                     ? "key.interestOps=" + key.interestOps() : "invalid key";
968             return String.format("channel registered with selector, %s, sa.interestOps=%s",
969                                  keyInterestOps,
970                                  ((SelectorAttachment)key.attachment()).interestOps);
971         } catch (Throwable t) {
972             return String.valueOf(t);
973         }
974     }
975 
976     /**
977      * Tracks multiple user level registrations associated with one NIO
978      * registration (SelectionKey). In this implementation, registrations
979      * are one-off and when an event is posted the registration is cancelled
980      * until explicitly registered again.
981      *
982      * <p> No external synchronization required as this class is only used
983      * by the SelectorManager thread. One of these objects required per
984      * connection.
985      */
986     private static class SelectorAttachment {
987         private final SelectableChannel chan;
988         private final Selector selector;
989         private final Set<AsyncEvent> pending;
990         private final static Logger debug =
991                 Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
992         private int interestOps;
993 
SelectorAttachment(SelectableChannel chan, Selector selector)994         SelectorAttachment(SelectableChannel chan, Selector selector) {
995             this.pending = new HashSet<>();
996             this.chan = chan;
997             this.selector = selector;
998         }
999 
register(AsyncEvent e)1000         void register(AsyncEvent e) throws ClosedChannelException {
1001             int newOps = e.interestOps();
1002             // re register interest if we are not already interested
1003             // in the event. If the event is paused, then the pause will
1004             // be taken into account later when resetInterestOps is called.
1005             boolean reRegister = (interestOps & newOps) != newOps;
1006             interestOps |= newOps;
1007             pending.add(e);
1008             if (debug.on())
1009                 debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
1010             if (reRegister) {
1011                 // first time registration happens here also
1012                 try {
1013                     chan.register(selector, interestOps, this);
1014                 } catch (Throwable x) {
1015                     abortPending(x);
1016                 }
1017             } else if (!chan.isOpen()) {
1018                 abortPending(new ClosedChannelException());
1019             }
1020         }
1021 
1022         /**
1023          * Returns a Stream<AsyncEvents> containing only events that are
1024          * registered with the given {@code interestOps}.
1025          */
events(int interestOps)1026         Stream<AsyncEvent> events(int interestOps) {
1027             return pending.stream()
1028                     .filter(ev -> (ev.interestOps() & interestOps) != 0);
1029         }
1030 
1031         /**
1032          * Removes any events with the given {@code interestOps}, and if no
1033          * events remaining, cancels the associated SelectionKey.
1034          */
resetInterestOps(int interestOps)1035         void resetInterestOps(int interestOps) {
1036             int newOps = 0;
1037 
1038             Iterator<AsyncEvent> itr = pending.iterator();
1039             while (itr.hasNext()) {
1040                 AsyncEvent event = itr.next();
1041                 int evops = event.interestOps();
1042                 if (event.repeating()) {
1043                     newOps |= evops;
1044                     continue;
1045                 }
1046                 if ((evops & interestOps) != 0) {
1047                     itr.remove();
1048                 } else {
1049                     newOps |= evops;
1050                 }
1051             }
1052 
1053             this.interestOps = newOps;
1054             SelectionKey key = chan.keyFor(selector);
1055             if (newOps == 0 && key != null && pending.isEmpty()) {
1056                 key.cancel();
1057             } else {
1058                 try {
1059                     if (key == null || !key.isValid()) {
1060                         throw new CancelledKeyException();
1061                     }
1062                     key.interestOps(newOps);
1063                     // double check after
1064                     if (!chan.isOpen()) {
1065                         abortPending(new ClosedChannelException());
1066                         return;
1067                     }
1068                     assert key.interestOps() == newOps;
1069                 } catch (CancelledKeyException x) {
1070                     // channel may have been closed
1071                     if (debug.on()) debug.log("key cancelled for " + chan);
1072                     abortPending(x);
1073                 }
1074             }
1075         }
1076 
abortPending(Throwable x)1077         void abortPending(Throwable x) {
1078             if (!pending.isEmpty()) {
1079                 AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
1080                 pending.clear();
1081                 IOException io = Utils.getIOException(x);
1082                 for (AsyncEvent event : evts) {
1083                     event.abort(io);
1084                 }
1085             }
1086         }
1087     }
1088 
theSSLContext()1089     /*package-private*/ SSLContext theSSLContext() {
1090         return sslContext;
1091     }
1092 
1093     @Override
sslContext()1094     public SSLContext sslContext() {
1095         return sslContext;
1096     }
1097 
1098     @Override
sslParameters()1099     public SSLParameters sslParameters() {
1100         return Utils.copySSLParameters(sslParams);
1101     }
1102 
1103     @Override
authenticator()1104     public Optional<Authenticator> authenticator() {
1105         return Optional.ofNullable(authenticator);
1106     }
1107 
theExecutor()1108     /*package-private*/ final DelegatingExecutor theExecutor() {
1109         return delegatingExecutor;
1110     }
1111 
1112     @Override
executor()1113     public final Optional<Executor> executor() {
1114         return isDefaultExecutor
1115                 ? Optional.empty()
1116                 : Optional.of(delegatingExecutor.delegate());
1117     }
1118 
connectionPool()1119     ConnectionPool connectionPool() {
1120         return connections;
1121     }
1122 
1123     @Override
followRedirects()1124     public Redirect followRedirects() {
1125         return followRedirects;
1126     }
1127 
1128 
1129     @Override
cookieHandler()1130     public Optional<CookieHandler> cookieHandler() {
1131         return Optional.ofNullable(cookieHandler);
1132     }
1133 
1134     @Override
connectTimeout()1135     public Optional<Duration> connectTimeout() {
1136         return Optional.ofNullable(connectTimeout);
1137     }
1138 
1139     @Override
proxy()1140     public Optional<ProxySelector> proxy() {
1141         return this.userProxySelector;
1142     }
1143 
1144     // Return the effective proxy that this client uses.
proxySelector()1145     ProxySelector proxySelector() {
1146         return proxySelector;
1147     }
1148 
1149     @Override
newWebSocketBuilder()1150     public WebSocket.Builder newWebSocketBuilder() {
1151         // Make sure to pass the HttpClientFacade to the WebSocket builder.
1152         // This will ensure that the facade is not released before the
1153         // WebSocket has been created, at which point the pendingOperationCount
1154         // will have been incremented by the RawChannelTube.
1155         // See RawChannelTube.
1156         return new BuilderImpl(this.facade(), proxySelector);
1157     }
1158 
1159     @Override
version()1160     public Version version() {
1161         return version;
1162     }
1163 
dbgString()1164     String dbgString() {
1165         return dbgTag;
1166     }
1167 
1168     @Override
toString()1169     public String toString() {
1170         // Used by tests to get the client's id and compute the
1171         // name of the SelectorManager thread.
1172         return super.toString() + ("(" + id + ")");
1173     }
1174 
initFilters()1175     private void initFilters() {
1176         addFilter(AuthenticationFilter.class);
1177         addFilter(RedirectFilter.class);
1178         if (this.cookieHandler != null) {
1179             addFilter(CookieFilter.class);
1180         }
1181     }
1182 
addFilter(Class<? extends HeaderFilter> f)1183     private void addFilter(Class<? extends HeaderFilter> f) {
1184         filters.addFilter(f);
1185     }
1186 
filterChain()1187     final LinkedList<HeaderFilter> filterChain() {
1188         return filters.getFilterChain();
1189     }
1190 
1191     // Timer controls.
1192     // Timers are implemented through timed Selector.select() calls.
1193 
registerTimer(TimeoutEvent event)1194     synchronized void registerTimer(TimeoutEvent event) {
1195         Log.logTrace("Registering timer {0}", event);
1196         timeouts.add(event);
1197         selmgr.wakeupSelector();
1198     }
1199 
cancelTimer(TimeoutEvent event)1200     synchronized void cancelTimer(TimeoutEvent event) {
1201         Log.logTrace("Canceling timer {0}", event);
1202         timeouts.remove(event);
1203     }
1204 
1205     /**
1206      * Purges ( handles ) timer events that have passed their deadline, and
1207      * returns the amount of time, in milliseconds, until the next earliest
1208      * event. A return value of 0 means that there are no events.
1209      */
purgeTimeoutsAndReturnNextDeadline()1210     private long purgeTimeoutsAndReturnNextDeadline() {
1211         long diff = 0L;
1212         List<TimeoutEvent> toHandle = null;
1213         int remaining = 0;
1214         // enter critical section to retrieve the timeout event to handle
1215         synchronized(this) {
1216             if (timeouts.isEmpty()) return 0L;
1217 
1218             Instant now = Instant.now();
1219             Iterator<TimeoutEvent> itr = timeouts.iterator();
1220             while (itr.hasNext()) {
1221                 TimeoutEvent event = itr.next();
1222                 diff = now.until(event.deadline(), ChronoUnit.MILLIS);
1223                 if (diff <= 0) {
1224                     itr.remove();
1225                     toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
1226                     toHandle.add(event);
1227                 } else {
1228                     break;
1229                 }
1230             }
1231             remaining = timeouts.size();
1232         }
1233 
1234         // can be useful for debugging
1235         if (toHandle != null && Log.trace()) {
1236             Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
1237                     +  toHandle.size() + " events, "
1238                     + "remaining " + remaining
1239                     + ", next deadline: " + (diff < 0 ? 0L : diff));
1240         }
1241 
1242         // handle timeout events out of critical section
1243         if (toHandle != null) {
1244             Throwable failed = null;
1245             for (TimeoutEvent event : toHandle) {
1246                 try {
1247                    Log.logTrace("Firing timer {0}", event);
1248                    event.handle();
1249                 } catch (Error | RuntimeException e) {
1250                     // Not expected. Handle remaining events then throw...
1251                     // If e is an OOME or SOE it might simply trigger a new
1252                     // error from here - but in this case there's not much we
1253                     // could do anyway. Just let it flow...
1254                     if (failed == null) failed = e;
1255                     else failed.addSuppressed(e);
1256                     Log.logTrace("Failed to handle event {0}: {1}", event, e);
1257                 }
1258             }
1259             if (failed instanceof Error) throw (Error) failed;
1260             if (failed instanceof RuntimeException) throw (RuntimeException) failed;
1261         }
1262 
1263         // return time to wait until next event. 0L if there's no more events.
1264         return diff < 0 ? 0L : diff;
1265     }
1266 
1267     // used for the connection window
getReceiveBufferSize()1268     int getReceiveBufferSize() {
1269         return Utils.getIntegerNetProperty(
1270                 "jdk.httpclient.receiveBufferSize",
1271                 0 // only set the size if > 0
1272         );
1273     }
1274 
1275     // Optimization for reading SSL encrypted data
1276     // --------------------------------------------
1277 
1278     // Returns a BufferSupplier that can be used for reading
1279     // encrypted bytes of the channel. These buffers can then
1280     // be recycled by the SSLFlowDelegate::Reader after their
1281     // content has been copied in the SSLFlowDelegate::Reader
1282     // readBuf.
1283     // Because allocating, reading, copying, and recycling
1284     // all happen in the SelectorManager thread,
1285     // then this BufferSupplier can be shared between all
1286     // the SSL connections managed by this client.
getSSLBufferSupplier()1287     BufferSupplier getSSLBufferSupplier() {
1288         return sslBufferSupplier;
1289     }
1290 
1291     // An implementation of BufferSupplier that manages a pool of
1292     // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
1293     // are used for reading encrypted bytes off the channel before
1294     // copying and subsequent unwrapping.
1295     private static final class SSLDirectBufferSupplier implements BufferSupplier {
1296         private static final int POOL_SIZE = SocketTube.MAX_BUFFERS;
1297         private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
1298         private final HttpClientImpl client;
1299         private final Logger debug;
1300         private int tail, count; // no need for volatile: only accessed in SM thread.
1301 
SSLDirectBufferSupplier(HttpClientImpl client)1302         SSLDirectBufferSupplier(HttpClientImpl client) {
1303             this.client = Objects.requireNonNull(client);
1304             this.debug = client.debug;
1305         }
1306 
1307         // Gets a buffer from the pool, or allocates a new one if needed.
1308         @Override
get()1309         public ByteBuffer get() {
1310             assert client.isSelectorThread();
1311             assert tail <= POOL_SIZE : "allocate tail is " + tail;
1312             ByteBuffer buf;
1313             if (tail == 0) {
1314                 if (debug.on()) {
1315                     // should not appear more than SocketTube.MAX_BUFFERS
1316                     debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
1317                 }
1318                 assert count++ < POOL_SIZE : "trying to allocate more than "
1319                             + POOL_SIZE + " buffers";
1320                 buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
1321             } else {
1322                 assert tail > 0 : "non positive tail value: " + tail;
1323                 tail--;
1324                 buf = pool[tail];
1325                 pool[tail] = null;
1326             }
1327             assert buf.isDirect();
1328             assert buf.position() == 0;
1329             assert buf.hasRemaining();
1330             assert buf.limit() == Utils.BUFSIZE;
1331             assert tail < POOL_SIZE;
1332             assert tail >= 0;
1333             return buf;
1334         }
1335 
1336         // Returns the given buffer to the pool.
1337         @Override
recycle(ByteBuffer buffer)1338         public void recycle(ByteBuffer buffer) {
1339             assert client.isSelectorThread();
1340             assert buffer.isDirect();
1341             assert !buffer.hasRemaining();
1342             assert tail < POOL_SIZE : "recycle tail is " + tail;
1343             assert tail >= 0;
1344             buffer.position(0);
1345             buffer.limit(buffer.capacity());
1346             // don't fail if assertions are off. we have asserted above.
1347             if (tail < POOL_SIZE) {
1348                 pool[tail] = buffer;
1349                 tail++;
1350             }
1351             assert tail <= POOL_SIZE;
1352             assert tail > 0;
1353         }
1354     }
1355 
1356 }
1357