/*
 * Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpHeaders;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Utils;
import static java.net.http.HttpClient.Version.HTTP_2;

/**
 * Wraps socket channel layer and takes care of SSL also.
 *
 * Subtypes are:
 *      PlainHttpConnection: regular direct TCP connection to server
 *      PlainProxyConnection: plain text proxy connection
 *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
 *      AsyncSSLConnection: TLS channel direct to server
 *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
 */
abstract class HttpConnection implements Closeable {

    final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    static final Logger DEBUG_LOGGER = Utils.getDebugLogger(
            () -> "HttpConnection(SocketTube(?))", Utils.DEBUG);

    /** The address this connection is connected to. Could be a server or a proxy. */
    final InetSocketAddress address;
    private final HttpClientImpl client;
    private final TrailingOperations trailingOperations;

    /**
     * Creates a connection bound to the given address and owning client.
     *
     * @param address the server or proxy address this connection targets
     * @param client  the client this connection belongs to
     */
    HttpConnection(InetSocketAddress address, HttpClientImpl client) {
        this.address = address;
        this.client = client;
        trailingOperations = new TrailingOperations();
    }

    /**
     * Tracks, by identity, the completion stages registered through
     * {@link #addTrailingOperation(CompletionStage)} that have not yet
     * completed. All accesses to the backing map are synchronized on it.
     */
    private static final class TrailingOperations {
        private final Map<CompletionStage<?>, Boolean> operations =
                new IdentityHashMap<>();

        /**
         * Registers {@code cf} and arranges for it to be unregistered as
         * soon as it completes (normally or exceptionally).
         */
        void add(CompletionStage<?> cf) {
            synchronized(operations) {
                operations.put(cf, Boolean.TRUE);
                // Registered after the put so that, if cf is already
                // complete and the action runs immediately on this thread,
                // the entry is present when remove(cf) is invoked.
                cf.whenComplete((r,t)-> remove(cf));
            }
        }

        /**
         * Unregisters {@code cf}.
         *
         * @return {@code true} if {@code cf} was registered, {@code false}
         *         otherwise
         */
        boolean remove(CompletionStage<?> cf) {
            synchronized(operations) {
                // Map.remove returns null when there is no mapping; compare
                // against null instead of auto-unboxing, which would throw
                // a NullPointerException for an unknown key.
                return operations.remove(cf) != null;
            }
        }
    }

    /** Registers an operation that is tracked until it completes. */
    final void addTrailingOperation(CompletionStage<?> cf) {
        trailingOperations.add(cf);
    }

//    final void removeTrailingOperation(CompletableFuture<?> cf) {
//        trailingOperations.remove(cf);
//    }

    /** Returns the client this connection was created with. */
    final HttpClientImpl client() {
        return client;
    }

    /**
     * Initiates the connect phase.
     *
     * Returns a CompletableFuture that completes when the underlying
     * TCP connection has been established or an error occurs.
     */
    public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);

    /**
     * Finishes the connection phase.
     *
     * Returns a CompletableFuture that completes when any additional,
     * type specific, setup has been done. Must be called after connectAsync.
     */
    public abstract CompletableFuture<Void> finishConnect();

    /** Tells whether, or not, this connection is connected to its destination. */
    abstract boolean connected();

    /** Tells whether, or not, this connection is secure ( over SSL ) */
    abstract boolean isSecure();

    /**
     * Tells whether, or not, this connection is proxied.
     * Returns true for tunnel connections, or clear connection to
     * any host through proxy.
     */
    abstract boolean isProxied();

    /**
     * Tells whether, or not, this connection is open: the channel must be
     * open and, once the connection is connected, its connection flow must
     * not be finished.
     */
    final boolean isOpen() {
        return channel().isOpen() &&
                (!connected() || !getConnectionFlow().isFinished());
    }

    /**
     * A tube publisher to which buffers can be queued, either in order or
     * ahead of the ordered ones, and which is then signalled to publish them.
     */
    interface HttpPublisher extends FlowTube.TubePublisher {
        void enqueue(List<ByteBuffer> buffers) throws IOException;
        void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
        void signalEnqueued() throws IOException;
    }

    /**
     * Returns the HTTP publisher associated with this connection.  May be null
     * if invoked before connecting.
     */
    abstract HttpPublisher publisher();

    // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
    private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
            proto.equals("TLSv1.2") || proto.equals("TLSv1.3");

    /**
     * Returns true if the given client's SSL parameter protocols contains at
     * least one TLS version that HTTP/2 requires. Returns false if the
     * client's SSL parameters carry no protocol list.
     */
    private static boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
        String[] protos = client.sslParameters().getProtocols();
        return protos != null
                && Arrays.stream(protos).anyMatch(testRequiredHTTP2TLSVersion);
    }

    /**
     * Factory for retrieving HttpConnections. A connection can be retrieved
     * from the connection pool, or a new one created if none available.
     *
     * The given {@code addr} is the ultimate destination. Any proxies,
     * etc, are determined from the request. Returns a concrete instance which
     * is either a pooled connection or a new instance of one of the following:
     *      {@link PlainHttpConnection}
     *      {@link PlainProxyConnection}
     *      {@link PlainTunnelingConnection}
     *      {@link AsyncSSLConnection}
     *      {@link AsyncSSLTunnelConnection}
     *
     * The returned connection, if not from the connection pool, must have its,
     * connect() or connectAsync() method invoked, which ( when it completes
     * successfully ) renders the connection usable for requests.
     *
     * @param addr    the ultimate destination
     * @param client  the client on whose behalf the connection is obtained
     * @param request the request; supplies the proxy and whether TLS is needed
     * @param version the HTTP version requested
     * @return a pooled, open connection or a new unconnected one
     */
    public static HttpConnection getConnection(InetSocketAddress addr,
                                               HttpClientImpl client,
                                               HttpRequestImpl request,
                                               Version version) {
        // The default proxy selector may select a proxy whose address is
        // unresolved. We must resolve the address before connecting to it.
        InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
        HttpConnection c = null;
        boolean secure = request.secure();
        ConnectionPool pool = client.connectionPool();

        if (!secure) {
            c = pool.getConnection(false, addr, proxy);
            if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(c.getConnectionFlow()
                                     + ": plain connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                return getPlainConnection(addr, proxy, request, client);
            }
        } else {  // secure
            if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
                c = pool.getConnection(true, addr, proxy);
            }
            if (c != null && c.isOpen()) {
                if (DEBUG_LOGGER.on())
                    DEBUG_LOGGER.log(c.getConnectionFlow()
                                     + ": SSL connection retrieved from HTTP/1.1 pool");
                return c;
            } else {
                // ALPN is only offered when HTTP/2 was requested and the
                // client's TLS protocols allow it; otherwise alpn stays null.
                String[] alpn = null;
                if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
                    alpn = new String[] { "h2", "http/1.1" };
                }
                return getSSLConnection(addr, proxy, alpn, request, client);
            }
        }
    }

    /**
     * Creates a new TLS connection: tunnelled through {@code proxy} when one
     * is given, direct to {@code addr} otherwise.
     */
    private static HttpConnection getSSLConnection(InetSocketAddress addr,
                                                   InetSocketAddress proxy,
                                                   String[] alpn,
                                                   HttpRequestImpl request,
                                                   HttpClientImpl client) {
        if (proxy != null)
            return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
                                                proxyTunnelHeaders(request));
        else
            return new AsyncSSLConnection(addr, client, alpn);
    }

    /**
     * This method is used to build a filter that will accept or
     * veto (header-name, value) tuple for transmission on the
     * wire.
     * The filter is applied to the headers when sending the headers
     * to the remote party.
     * Which tuple is accepted/vetoed depends on:
     * <pre>
     *    - whether the connection is a tunnel connection
     *      [talking to a server through a proxy tunnel]
     *    - whether the method is CONNECT
     *      [establishing a CONNECT tunnel through a proxy]
     *    - whether the request is using a proxy
     *      (and the connection is not a tunnel)
     *      [talking to a server through a proxy]
     *    - whether the request is a direct connection to
     *      a server (no tunnel, no proxy).
     * </pre>
     * @param request the request whose headers are about to be sent
     * @return the (header-name, value) filter matching the situation above
     */
    BiPredicate<String,String> headerFilter(HttpRequestImpl request) {
        if (isTunnel()) {
            // talking to a server through a proxy tunnel
            // don't send proxy-* headers to a plain server
            assert !request.isConnect();
            return Utils.NO_PROXY_HEADERS_FILTER;
        } else if (request.isConnect()) {
            // establishing a proxy tunnel
            // check for proxy tunnel disabled schemes
            // assert !this.isTunnel();
            assert request.proxy() == null;
            return Utils.PROXY_TUNNEL_FILTER;
        } else if (request.proxy() != null) {
            // talking to a server through a proxy (no tunnel)
            // check for proxy disabled schemes
            // assert !isTunnel() && !request.isConnect();
            return Utils.PROXY_FILTER;
        } else {
            // talking to a server directly (no tunnel, no proxy)
            // don't send proxy-* headers to a plain server
            // assert request.proxy() == null && !request.isConnect();
            return Utils.NO_PROXY_HEADERS_FILTER;
        }
    }

    /**
     * Returns the restricted-header predicate to use for the given request:
     * the proxy-tunnel variant when this connection is not a tunnel and the
     * request is a CONNECT, the general context variant otherwise.
     */
    BiPredicate<String,String> contextRestricted(HttpRequestImpl request, HttpClient client) {
        if (!isTunnel() && request.isConnect()) {
            // establishing a proxy tunnel
            assert request.proxy() == null;
            return Utils.PROXY_TUNNEL_RESTRICTED(client);
        } else {
            return Utils.CONTEXT_RESTRICTED(client);
        }
    }

    // Composes a new immutable HttpHeaders that combines the
    // user and system header but only keeps those headers that
    // start with "proxy-"
    private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
        Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        combined.putAll(request.getSystemHeadersBuilder().map());
        combined.putAll(request.headers().map()); // let user override system

        // keep only proxy-* - and also strip authorization headers
        // for disabled schemes
        return HttpHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
    }

    /* Returns a new plain connection: a plain tunnelling connection for
     * proxied WebSocket, a plain HTTP connection when there is no proxy,
     * or a plain proxy connection otherwise. */
    private static HttpConnection getPlainConnection(InetSocketAddress addr,
                                                     InetSocketAddress proxy,
                                                     HttpRequestImpl request,
                                                     HttpClientImpl client) {
        if (request.isWebSocket() && proxy != null)
            return new PlainTunnelingConnection(addr, proxy, client,
                                                proxyTunnelHeaders(request));

        if (proxy == null)
            return new PlainHttpConnection(addr, client);
        else
            return new PlainProxyConnection(proxy, client);
    }

    /**
     * Returns this connection to the client's connection pool if it can be
     * reused, and closes it otherwise.
     *
     * The connection is closed when {@code hdrs} is null, when the client is
     * no longer available, when the first {@code Connection} header value is
     * {@code close} (case-insensitive), or when the connection is not open.
     *
     * @param hdrs the response headers, or null if none were received
     */
    void closeOrReturnToCache(HttpHeaders hdrs) {
        if (hdrs == null) {
            // the connection was closed by server, eof
            Log.logTrace("Cannot return connection to pool: closing {0}", this);
            close();
            return;
        }
        HttpClientImpl client = client();
        if (client == null) {
            Log.logTrace("Client released: closing {0}", this);
            close();
            return;
        }
        ConnectionPool pool = client.connectionPool();
        boolean keepAlive = hdrs.firstValue("Connection")
                .map((s) -> !s.equalsIgnoreCase("close"))
                .orElse(true);
        // Sampled once so that the value logged below is the one the
        // decision was actually based on.
        boolean open = isOpen();

        if (keepAlive && open) {
            Log.logTrace("Returning connection to the pool: {0}", this);
            pool.returnToPool(this);
        } else {
            Log.logTrace("Closing connection (keepAlive={0}, isOpen={1}): {2}",
                    keepAlive, open, this);
            close();
        }
    }

    /* Tells whether or not this connection is a tunnel through a proxy */
    boolean isTunnel() { return false; }

    /** Returns the socket channel underlying this connection. */
    abstract SocketChannel channel();

    /** Returns the address this connection is connected to (server or proxy). */
    final InetSocketAddress address() {
        return address;
    }

    /** Returns the key under which this connection is held in the pool. */
    abstract ConnectionPool.CacheKey cacheKey();

    /**
     * Closes this connection. This is what {@link
     * #closeOrReturnToCache(HttpHeaders)} invokes when the connection is
     * not handed back to the connection pool.
     */
    @Override
    public abstract void close();

    /** Returns the flow tube through which this connection reads and writes. */
    abstract FlowTube getConnectionFlow();

    /**
     * A publisher that makes it possible to publish (write) ordered (normal
     * priority) and unordered (high priority) buffers downstream.
     */
    final class PlainHttpPublisher implements HttpPublisher {
        final Object reading;
        PlainHttpPublisher() {
            this(new Object());
        }
        PlainHttpPublisher(Object readingLock) {
            this.reading = readingLock;
        }
        final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
        final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
        volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        volatile HttpWriteSubscription subscription;
        final SequentialScheduler writeScheduler =
                    new SequentialScheduler(this::flushTask);

        /**
         * Installs {@code subscriber} as the current subscriber, creating the
         * write subscription on first use, then hands it the subscription and
         * triggers a flush of anything already queued.
         */
        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            synchronized (reading) {
                //assert this.subscription == null;
                //assert this.subscriber == null;
                if (subscription == null) {
                    subscription = new HttpWriteSubscription();
                }
                this.subscriber = subscriber;
            }
            // TODO: should we do this in the flow?
            subscriber.onSubscribe(subscription);
            signal();
        }

        /**
         * Task run by the write scheduler: flushes the current subscription,
         * if any, and always completes the given completer.
         */
        void flushTask(DeferredCompleter completer) {
            try {
                HttpWriteSubscription sub = subscription;
                if (sub != null) sub.flush();
            } finally {
                completer.complete();
            }
        }

        /** Asks the write scheduler to run the flush task. */
        void signal() {
            writeScheduler.runOrSchedule();
        }

        /**
         * The subscription handed to the downstream subscriber. It records
         * the subscriber's demand and pushes queued buffers while both
         * demand and buffers are available.
         */
        final class HttpWriteSubscription implements Flow.Subscription {
            final Demand demand = new Demand();

            @Override
            public void request(long n) {
                if (n <= 0) throw new IllegalArgumentException("non-positive request");
                demand.increase(n);
                if (debug.on())
                    debug.log("HttpPublisher: got request of "  + n + " from "
                               + getConnectionFlow());
                writeScheduler.runOrSchedule();
            }

            @Override
            public void cancel() {
                // Only logged here; no publisher state is changed.
                if (debug.on())
                    debug.log("HttpPublisher: cancelled by " + getConnectionFlow());
            }

            private boolean isEmpty() {
                return queue.isEmpty() && priority.isEmpty();
            }

            // Priority (unordered) buffers are always served before the
            // ordered queue.
            private List<ByteBuffer> poll() {
                List<ByteBuffer> elem = priority.poll();
                return elem == null ? queue.poll() : elem;
            }

            /** Sends queued buffers downstream for as long as there is demand. */
            void flush() {
                while (!isEmpty() && demand.tryDecrement()) {
                    List<ByteBuffer> elem = poll();
                    if (debug.on())
                        debug.log("HttpPublisher: sending "
                                    + Utils.remaining(elem) + " bytes ("
                                    + elem.size() + " buffers) to "
                                    + getConnectionFlow());
                    subscriber.onNext(elem);
                }
            }
        }

        /** Appends {@code buffers} to the ordered write queue. */
        @Override
        public void enqueue(List<ByteBuffer> buffers) throws IOException {
            // The byte count is only needed for the debug message, so it is
            // computed only when debugging is on, and - as in
            // enqueueUnordered - before the buffers are handed to the queue.
            boolean logging = debug.on();
            int bytes = logging
                    ? buffers.stream().mapToInt(ByteBuffer::remaining).sum()
                    : 0;
            queue.add(buffers);
            if (logging)
                debug.log("added %d bytes to the write queue", bytes);
        }

        /** Appends {@code buffers} to the priority write queue. */
        @Override
        public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
            // Unordered frames are sent before existing frames.
            boolean logging = debug.on();
            int bytes = logging
                    ? buffers.stream().mapToInt(ByteBuffer::remaining).sum()
                    : 0;
            priority.add(buffers);
            if (logging)
                debug.log("added %d bytes in the priority write queue", bytes);
        }

        /** Signals that buffers have been enqueued and should be flushed. */
        @Override
        public void signalEnqueued() throws IOException {
            if (debug.on())
                debug.log("signalling the publisher of the write queue");
            signal();
        }
    }

    String dbgTag;

    /**
     * Returns the tag used by the debug logger for this connection. The tag
     * is cached in {@code dbgTag} once the connection flow is available;
     * until then a placeholder tag is returned and nothing is cached.
     */
    final String dbgString() {
        FlowTube flow = getConnectionFlow();
        String tag = dbgTag;
        if (tag == null && flow != null) {
            dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
        } else if (tag == null) {
            tag = this.getClass().getSimpleName() + "(?)";
        }
        return tag;
    }

    @Override
    public String toString() {
        return "HttpConnection: " + channel().toString();
    }
}