1 /*
2  * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.io.Closeable;
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.SocketChannel;
33 import java.util.Arrays;
34 import java.util.IdentityHashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.TreeMap;
38 import java.util.concurrent.CompletableFuture;
39 import java.util.concurrent.CompletionStage;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.Flow;
42 import java.util.function.BiPredicate;
43 import java.util.function.Predicate;
44 import java.net.http.HttpClient;
45 import java.net.http.HttpClient.Version;
46 import java.net.http.HttpHeaders;
47 import jdk.internal.net.http.common.Demand;
48 import jdk.internal.net.http.common.FlowTube;
49 import jdk.internal.net.http.common.Logger;
50 import jdk.internal.net.http.common.SequentialScheduler;
51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
52 import jdk.internal.net.http.common.Log;
53 import jdk.internal.net.http.common.Utils;
54 import static java.net.http.HttpClient.Version.HTTP_2;
55 
56 /**
57  * Wraps socket channel layer and takes care of SSL also.
58  *
59  * Subtypes are:
60  *      PlainHttpConnection: regular direct TCP connection to server
61  *      PlainProxyConnection: plain text proxy connection
62  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
63  *      AsyncSSLConnection: TLS channel direct to server
64  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
65  */
66 abstract class HttpConnection implements Closeable {
67 
68     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
69     final static Logger DEBUG_LOGGER = Utils.getDebugLogger(
70             () -> "HttpConnection(SocketTube(?))", Utils.DEBUG);
71 
72     /** The address this connection is connected to. Could be a server or a proxy. */
73     final InetSocketAddress address;
74     private final HttpClientImpl client;
75     private final TrailingOperations trailingOperations;
76 
HttpConnection(InetSocketAddress address, HttpClientImpl client)77     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
78         this.address = address;
79         this.client = client;
80         trailingOperations = new TrailingOperations();
81     }
82 
83     private static final class TrailingOperations {
84         private final Map<CompletionStage<?>, Boolean> operations =
85                 new IdentityHashMap<>();
add(CompletionStage<?> cf)86         void add(CompletionStage<?> cf) {
87             synchronized(operations) {
88                 operations.put(cf, Boolean.TRUE);
89                 cf.whenComplete((r,t)-> remove(cf));
90             }
91         }
remove(CompletionStage<?> cf)92         boolean remove(CompletionStage<?> cf) {
93             synchronized(operations) {
94                 return operations.remove(cf);
95             }
96         }
97     }
98 
addTrailingOperation(CompletionStage<?> cf)99     final void addTrailingOperation(CompletionStage<?> cf) {
100         trailingOperations.add(cf);
101     }
102 
103 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
104 //        trailingOperations.remove(cf);
105 //    }
106 
client()107     final HttpClientImpl client() {
108         return client;
109     }
110 
111     /**
112      * Initiates the connect phase.
113      *
114      * Returns a CompletableFuture that completes when the underlying
115      * TCP connection has been established or an error occurs.
116      */
connectAsync(Exchange<?> exchange)117     public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
118 
119     /**
120      * Finishes the connection phase.
121      *
122      * Returns a CompletableFuture that completes when any additional,
123      * type specific, setup has been done. Must be called after connectAsync. */
finishConnect()124     public abstract CompletableFuture<Void> finishConnect();
125 
126     /** Tells whether, or not, this connection is connected to its destination. */
connected()127     abstract boolean connected();
128 
129     /** Tells whether, or not, this connection is secure ( over SSL ) */
isSecure()130     abstract boolean isSecure();
131 
132     /**
133      * Tells whether, or not, this connection is proxied.
134      * Returns true for tunnel connections, or clear connection to
135      * any host through proxy.
136      */
isProxied()137     abstract boolean isProxied();
138 
139     /** Tells whether, or not, this connection is open. */
isOpen()140     final boolean isOpen() {
141         return channel().isOpen() &&
142                 (connected() ? !getConnectionFlow().isFinished() : true);
143     }
144 
145     interface HttpPublisher extends FlowTube.TubePublisher {
enqueue(List<ByteBuffer> buffers)146         void enqueue(List<ByteBuffer> buffers) throws IOException;
enqueueUnordered(List<ByteBuffer> buffers)147         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
signalEnqueued()148         void signalEnqueued() throws IOException;
149     }
150 
151     /**
152      * Returns the HTTP publisher associated with this connection.  May be null
153      * if invoked before connecting.
154      */
publisher()155     abstract HttpPublisher publisher();
156 
157     // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
158     private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
159             proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
160 
161    /**
162     * Returns true if the given client's SSL parameter protocols contains at
163     * least one TLS version that HTTP/2 requires.
164     */
hasRequiredHTTP2TLSVersion(HttpClient client)165    private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
166        String[] protos = client.sslParameters().getProtocols();
167        if (protos != null) {
168            return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
169        } else {
170            return false;
171        }
172    }
173 
174     /**
175      * Factory for retrieving HttpConnections. A connection can be retrieved
176      * from the connection pool, or a new one created if none available.
177      *
178      * The given {@code addr} is the ultimate destination. Any proxies,
179      * etc, are determined from the request. Returns a concrete instance which
180      * is one of the following:
181      *      {@link PlainHttpConnection}
182      *      {@link PlainTunnelingConnection}
183      *
184      * The returned connection, if not from the connection pool, must have its,
185      * connect() or connectAsync() method invoked, which ( when it completes
186      * successfully ) renders the connection usable for requests.
187      */
getConnection(InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request, Version version)188     public static HttpConnection getConnection(InetSocketAddress addr,
189                                                HttpClientImpl client,
190                                                HttpRequestImpl request,
191                                                Version version) {
192         // The default proxy selector may select a proxy whose  address is
193         // unresolved. We must resolve the address before connecting to it.
194         InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
195         HttpConnection c = null;
196         boolean secure = request.secure();
197         ConnectionPool pool = client.connectionPool();
198 
199         if (!secure) {
200             c = pool.getConnection(false, addr, proxy);
201             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
202                 final HttpConnection conn = c;
203                 if (DEBUG_LOGGER.on())
204                     DEBUG_LOGGER.log(conn.getConnectionFlow()
205                                      + ": plain connection retrieved from HTTP/1.1 pool");
206                 return c;
207             } else {
208                 return getPlainConnection(addr, proxy, request, client);
209             }
210         } else {  // secure
211             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
212                 c = pool.getConnection(true, addr, proxy);
213             }
214             if (c != null && c.isOpen()) {
215                 final HttpConnection conn = c;
216                 if (DEBUG_LOGGER.on())
217                     DEBUG_LOGGER.log(conn.getConnectionFlow()
218                                      + ": SSL connection retrieved from HTTP/1.1 pool");
219                 return c;
220             } else {
221                 String[] alpn = null;
222                 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
223                     alpn = new String[] { "h2", "http/1.1" };
224                 }
225                 return getSSLConnection(addr, proxy, alpn, request, client);
226             }
227         }
228     }
229 
getSSLConnection(InetSocketAddress addr, InetSocketAddress proxy, String[] alpn, HttpRequestImpl request, HttpClientImpl client)230     private static HttpConnection getSSLConnection(InetSocketAddress addr,
231                                                    InetSocketAddress proxy,
232                                                    String[] alpn,
233                                                    HttpRequestImpl request,
234                                                    HttpClientImpl client) {
235         if (proxy != null)
236             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
237                                                 proxyTunnelHeaders(request));
238         else
239             return new AsyncSSLConnection(addr, client, alpn);
240     }
241 
242     /**
243      * This method is used to build a filter that will accept or
244      * veto (header-name, value) tuple for transmission on the
245      * wire.
246      * The filter is applied to the headers when sending the headers
247      * to the remote party.
248      * Which tuple is accepted/vetoed depends on:
249      * <pre>
250      *    - whether the connection is a tunnel connection
251      *      [talking to a server through a proxy tunnel]
252      *    - whether the method is CONNECT
253      *      [establishing a CONNECT tunnel through a proxy]
254      *    - whether the request is using a proxy
255      *      (and the connection is not a tunnel)
256      *      [talking to a server through a proxy]
257      *    - whether the request is a direct connection to
258      *      a server (no tunnel, no proxy).
259      * </pre>
260      * @param request
261      * @return
262      */
headerFilter(HttpRequestImpl request)263     BiPredicate<String,String> headerFilter(HttpRequestImpl request) {
264         if (isTunnel()) {
265             // talking to a server through a proxy tunnel
266             // don't send proxy-* headers to a plain server
267             assert !request.isConnect();
268             return Utils.NO_PROXY_HEADERS_FILTER;
269         } else if (request.isConnect()) {
270             // establishing a proxy tunnel
271             // check for proxy tunnel disabled schemes
272             // assert !this.isTunnel();
273             assert request.proxy() == null;
274             return Utils.PROXY_TUNNEL_FILTER;
275         } else if (request.proxy() != null) {
276             // talking to a server through a proxy (no tunnel)
277             // check for proxy disabled schemes
278             // assert !isTunnel() && !request.isConnect();
279             return Utils.PROXY_FILTER;
280         } else {
281             // talking to a server directly (no tunnel, no proxy)
282             // don't send proxy-* headers to a plain server
283             // assert request.proxy() == null && !request.isConnect();
284             return Utils.NO_PROXY_HEADERS_FILTER;
285         }
286     }
287 
contextRestricted(HttpRequestImpl request, HttpClient client)288     BiPredicate<String,String> contextRestricted(HttpRequestImpl request, HttpClient client) {
289         if (!isTunnel() && request.isConnect()) {
290             // establishing a proxy tunnel
291             assert request.proxy() == null;
292             return Utils.PROXY_TUNNEL_RESTRICTED(client);
293         } else {
294             return Utils.CONTEXT_RESTRICTED(client);
295         }
296     }
297 
298     // Composes a new immutable HttpHeaders that combines the
299     // user and system header but only keeps those headers that
300     // start with "proxy-"
proxyTunnelHeaders(HttpRequestImpl request)301     private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
302         Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
303         combined.putAll(request.getSystemHeadersBuilder().map());
304         combined.putAll(request.headers().map()); // let user override system
305 
306         // keep only proxy-* - and also strip authorization headers
307         // for disabled schemes
308         return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
309     }
310 
311     /* Returns either a plain HTTP connection or a plain tunnelling connection
312      * for proxied WebSocket */
getPlainConnection(InetSocketAddress addr, InetSocketAddress proxy, HttpRequestImpl request, HttpClientImpl client)313     private static HttpConnection getPlainConnection(InetSocketAddress addr,
314                                                      InetSocketAddress proxy,
315                                                      HttpRequestImpl request,
316                                                      HttpClientImpl client) {
317         if (request.isWebSocket() && proxy != null)
318             return new PlainTunnelingConnection(addr, proxy, client,
319                                                 proxyTunnelHeaders(request));
320 
321         if (proxy == null)
322             return new PlainHttpConnection(addr, client);
323         else
324             return new PlainProxyConnection(proxy, client);
325     }
326 
closeOrReturnToCache(HttpHeaders hdrs)327     void closeOrReturnToCache(HttpHeaders hdrs) {
328         if (hdrs == null) {
329             // the connection was closed by server, eof
330             Log.logTrace("Cannot return connection to pool: closing {0}", this);
331             close();
332             return;
333         }
334         HttpClientImpl client = client();
335         if (client == null) {
336             Log.logTrace("Client released: closing {0}", this);
337             close();
338             return;
339         }
340         ConnectionPool pool = client.connectionPool();
341         boolean keepAlive = hdrs.firstValue("Connection")
342                 .map((s) -> !s.equalsIgnoreCase("close"))
343                 .orElse(true);
344 
345         if (keepAlive && isOpen()) {
346             Log.logTrace("Returning connection to the pool: {0}", this);
347             pool.returnToPool(this);
348         } else {
349             Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}",
350                     keepAlive, isOpen(), this);
351             close();
352         }
353     }
354 
355     /* Tells whether or not this connection is a tunnel through a proxy */
isTunnel()356     boolean isTunnel() { return false; }
357 
channel()358     abstract SocketChannel channel();
359 
address()360     final InetSocketAddress address() {
361         return address;
362     }
363 
cacheKey()364     abstract ConnectionPool.CacheKey cacheKey();
365 
366     /**
367      * Closes this connection, by returning the socket to its connection pool.
368      */
369     @Override
close()370     public abstract void close();
371 
getConnectionFlow()372     abstract FlowTube getConnectionFlow();
373 
374     /**
375      * A publisher that makes it possible to publish (write) ordered (normal
376      * priority) and unordered (high priority) buffers downstream.
377      */
378     final class PlainHttpPublisher implements HttpPublisher {
379         final Object reading;
PlainHttpPublisher()380         PlainHttpPublisher() {
381             this(new Object());
382         }
PlainHttpPublisher(Object readingLock)383         PlainHttpPublisher(Object readingLock) {
384             this.reading = readingLock;
385         }
386         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
387         final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
388         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
389         volatile HttpWriteSubscription subscription;
390         final SequentialScheduler writeScheduler =
391                     new SequentialScheduler(this::flushTask);
392         @Override
subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber)393         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
394             synchronized (reading) {
395                 //assert this.subscription == null;
396                 //assert this.subscriber == null;
397                 if (subscription == null) {
398                     subscription = new HttpWriteSubscription();
399                 }
400                 this.subscriber = subscriber;
401             }
402             // TODO: should we do this in the flow?
403             subscriber.onSubscribe(subscription);
404             signal();
405         }
406 
flushTask(DeferredCompleter completer)407         void flushTask(DeferredCompleter completer) {
408             try {
409                 HttpWriteSubscription sub = subscription;
410                 if (sub != null) sub.flush();
411             } finally {
412                 completer.complete();
413             }
414         }
415 
signal()416         void signal() {
417             writeScheduler.runOrSchedule();
418         }
419 
420         final class HttpWriteSubscription implements Flow.Subscription {
421             final Demand demand = new Demand();
422 
423             @Override
request(long n)424             public void request(long n) {
425                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
426                 demand.increase(n);
427                 if (debug.on())
428                     debug.log("HttpPublisher: got request of "  + n + " from "
429                                + getConnectionFlow());
430                 writeScheduler.runOrSchedule();
431             }
432 
433             @Override
cancel()434             public void cancel() {
435                 if (debug.on())
436                     debug.log("HttpPublisher: cancelled by " + getConnectionFlow());
437             }
438 
isEmpty()439             private boolean isEmpty() {
440                 return queue.isEmpty() && priority.isEmpty();
441             }
442 
poll()443             private List<ByteBuffer> poll() {
444                 List<ByteBuffer> elem = priority.poll();
445                 return elem == null ? queue.poll() : elem;
446             }
447 
flush()448             void flush() {
449                 while (!isEmpty() && demand.tryDecrement()) {
450                     List<ByteBuffer> elem = poll();
451                     if (debug.on())
452                         debug.log("HttpPublisher: sending "
453                                     + Utils.remaining(elem) + " bytes ("
454                                     + elem.size() + " buffers) to "
455                                     + getConnectionFlow());
456                     subscriber.onNext(elem);
457                 }
458             }
459         }
460 
461         @Override
enqueue(List<ByteBuffer> buffers)462         public void enqueue(List<ByteBuffer> buffers) throws IOException {
463             queue.add(buffers);
464             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
465             debug.log("added %d bytes to the write queue", bytes);
466         }
467 
468         @Override
enqueueUnordered(List<ByteBuffer> buffers)469         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
470             // Unordered frames are sent before existing frames.
471             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
472             priority.add(buffers);
473             debug.log("added %d bytes in the priority write queue", bytes);
474         }
475 
476         @Override
signalEnqueued()477         public void signalEnqueued() throws IOException {
478             debug.log("signalling the publisher of the write queue");
479             signal();
480         }
481     }
482 
483     String dbgTag;
dbgString()484     final String dbgString() {
485         FlowTube flow = getConnectionFlow();
486         String tag = dbgTag;
487         if (tag == null && flow != null) {
488             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
489         } else if (tag == null) {
490             tag = this.getClass().getSimpleName() + "(?)";
491         }
492         return tag;
493     }
494 
495     @Override
toString()496     public String toString() {
497         return "HttpConnection: " + channel().toString();
498     }
499 }
500