1 /*
2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package jdk.internal.net.http;
27 
28 import java.io.IOException;
29 import java.lang.System.Logger.Level;
30 import java.net.InetSocketAddress;
31 import java.net.ProxySelector;
32 import java.net.URI;
33 import java.net.URISyntaxException;
34 import java.net.URLPermission;
35 import java.security.AccessControlContext;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.CompletableFuture;
39 import java.util.concurrent.Executor;
40 import java.util.function.Function;
41 import java.net.http.HttpClient;
42 import java.net.http.HttpHeaders;
43 import java.net.http.HttpResponse;
44 import java.net.http.HttpTimeoutException;
45 
46 import jdk.internal.net.http.common.Logger;
47 import jdk.internal.net.http.common.MinimalFuture;
48 import jdk.internal.net.http.common.Utils;
49 import jdk.internal.net.http.common.Log;
50 
51 import static jdk.internal.net.http.common.Utils.permissionForProxy;
52 
53 /**
54  * One request/response exchange (handles 100/101 intermediate response also).
55  * depth field used to track number of times a new request is being sent
56  * for a given API request. If limit exceeded exception is thrown.
57  *
58  * Security check is performed here:
59  * - uses AccessControlContext captured at API level
60  * - checks for appropriate URLPermission for request
61  * - if permission allowed, grants equivalent SocketPermission to call
62  * - in case of direct HTTP proxy, checks additionally for access to proxy
63  *    (CONNECT proxying uses its own Exchange, so check done there)
64  *
65  */
66 final class Exchange<T> {
67 
68     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
69 
70     final HttpRequestImpl request;
71     final HttpClientImpl client;
72     volatile ExchangeImpl<T> exchImpl;
73     volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
74     volatile CompletableFuture<Void> bodyIgnored;
75 
76     // used to record possible cancellation raised before the exchImpl
77     // has been established.
78     private volatile IOException failed;
79     final AccessControlContext acc;
80     final MultiExchange<T> multi;
81     final Executor parentExecutor;
82     boolean upgrading; // to HTTP/2
83     final PushGroup<T> pushGroup;
84     final String dbgTag;
85 
86     // Keeps track of the underlying connection when establishing an HTTP/2
87     // exchange so that it can be aborted/timed out mid setup.
88     final ConnectionAborter connectionAborter = new ConnectionAborter();
89 
Exchange(HttpRequestImpl request, MultiExchange<T> multi)90     Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
91         this.request = request;
92         this.upgrading = false;
93         this.client = multi.client();
94         this.multi = multi;
95         this.acc = multi.acc;
96         this.parentExecutor = multi.executor;
97         this.pushGroup = multi.pushGroup;
98         this.dbgTag = "Exchange";
99     }
100 
101     /* If different AccessControlContext to be used  */
Exchange(HttpRequestImpl request, MultiExchange<T> multi, AccessControlContext acc)102     Exchange(HttpRequestImpl request,
103              MultiExchange<T> multi,
104              AccessControlContext acc)
105     {
106         this.request = request;
107         this.acc = acc;
108         this.upgrading = false;
109         this.client = multi.client();
110         this.multi = multi;
111         this.parentExecutor = multi.executor;
112         this.pushGroup = multi.pushGroup;
113         this.dbgTag = "Exchange";
114     }
115 
getPushGroup()116     PushGroup<T> getPushGroup() {
117         return pushGroup;
118     }
119 
executor()120     Executor executor() {
121         return parentExecutor;
122     }
123 
request()124     public HttpRequestImpl request() {
125         return request;
126     }
127 
client()128     HttpClientImpl client() {
129         return client;
130     }
131 
132     // Keeps track of the underlying connection when establishing an HTTP/2
133     // exchange so that it can be aborted/timed out mid setup.
134     static final class ConnectionAborter {
135         private volatile HttpConnection connection;
136 
connection(HttpConnection connection)137         void connection(HttpConnection connection) {
138             this.connection = connection;
139         }
140 
closeConnection()141         void closeConnection() {
142             HttpConnection connection = this.connection;
143             this.connection = null;
144             if (connection != null) {
145                 try {
146                     connection.close();
147                 } catch (Throwable t) {
148                     // ignore
149                 }
150             }
151         }
152     }
153 
readBodyAsync(HttpResponse.BodyHandler<T> handler)154     public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
155         // If we received a 407 while establishing the exchange
156         // there will be no body to read: bodyIgnored will be true,
157         // and exchImpl will be null (if we were trying to establish
158         // an HTTP/2 tunnel through an HTTP/1.1 proxy)
159         if (bodyIgnored != null) return MinimalFuture.completedFuture(null);
160 
161         // The connection will not be returned to the pool in the case of WebSocket
162         return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
163                 .whenComplete((r,t) -> exchImpl.completed());
164     }
165 
166     /**
167      * Called after a redirect or similar kind of retry where a body might
168      * be sent but we don't want it. Should send a RESET in h2. For http/1.1
169      * we can consume small quantity of data, or close the connection in
170      * other cases.
171      */
ignoreBody()172     public CompletableFuture<Void> ignoreBody() {
173         if (bodyIgnored != null) return bodyIgnored;
174         return exchImpl.ignoreBody();
175     }
176 
177     /**
178      * Called when a new exchange is created to replace this exchange.
179      * At this point it is guaranteed that readBody/readBodyAsync will
180      * not be called.
181      */
released()182     public void released() {
183         ExchangeImpl<?> impl = exchImpl;
184         if (impl != null) impl.released();
185         // Don't set exchImpl to null here. We need to keep
186         // it alive until it's replaced by a Stream in wrapForUpgrade.
187         // Setting it to null here might get it GC'ed too early, because
188         // the Http1Response is now only weakly referenced by the Selector.
189     }
190 
cancel()191     public void cancel() {
192         // cancel can be called concurrently before or at the same time
193         // that the exchange impl is being established.
194         // In that case we won't be able to propagate the cancellation
195         // right away
196         if (exchImpl != null) {
197             exchImpl.cancel();
198         } else {
199             // no impl - can't cancel impl yet.
200             // call cancel(IOException) instead which takes care
201             // of race conditions between impl/cancel.
202             cancel(new IOException("Request cancelled"));
203         }
204     }
205 
cancel(IOException cause)206     public void cancel(IOException cause) {
207         if (debug.on()) debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause);
208         // If the impl is non null, propagate the exception right away.
209         // Otherwise record it so that it can be propagated once the
210         // exchange impl has been established.
211         ExchangeImpl<?> impl = exchImpl;
212         if (impl != null) {
213             // propagate the exception to the impl
214             if (debug.on()) debug.log("Cancelling exchImpl: %s", exchImpl);
215             impl.cancel(cause);
216         } else {
217             // no impl yet. record the exception
218             failed = cause;
219 
220             // abort/close the connection if setting up the exchange. This can
221             // be important when setting up HTTP/2
222             connectionAborter.closeConnection();
223 
224             // now call checkCancelled to recheck the impl.
225             // if the failed state is set and the impl is not null, reset
226             // the failed state and propagate the exception to the impl.
227             checkCancelled();
228         }
229     }
230 
231     // This method will raise an exception if one was reported and if
232     // it is possible to do so. If the exception can be raised, then
233     // the failed state will be reset. Otherwise, the failed state
234     // will persist until the exception can be raised and the failed state
235     // can be cleared.
236     // Takes care of possible race conditions.
checkCancelled()237     private void checkCancelled() {
238         ExchangeImpl<?> impl = null;
239         IOException cause = null;
240         CompletableFuture<? extends ExchangeImpl<T>> cf = null;
241         if (failed != null) {
242             synchronized(this) {
243                 cause = failed;
244                 impl = exchImpl;
245                 cf = exchangeCF;
246             }
247         }
248         if (cause == null) return;
249         if (impl != null) {
250             // The exception is raised by propagating it to the impl.
251             if (debug.on()) debug.log("Cancelling exchImpl: %s", impl);
252             impl.cancel(cause);
253             failed = null;
254         } else {
255             Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
256                          + "\n\tCan''t cancel yet with {2}",
257                          request.uri(),
258                          request.timeout().isPresent() ?
259                          // calling duration.toMillis() can throw an exception.
260                          // this is just debugging, we don't care if it overflows.
261                          (request.timeout().get().getSeconds() * 1000
262                           + request.timeout().get().getNano() / 1000000) : -1,
263                          cause);
264             if (cf != null) cf.completeExceptionally(cause);
265         }
266     }
267 
h2Upgrade()268     public void h2Upgrade() {
269         upgrading = true;
270         request.setH2Upgrade(client.client2());
271     }
272 
getCancelCause()273     synchronized IOException getCancelCause() {
274         return failed;
275     }
276 
277     // get/set the exchange impl, solving race condition issues with
278     // potential concurrent calls to cancel() or cancel(IOException)
279     private CompletableFuture<? extends ExchangeImpl<T>>
establishExchange(HttpConnection connection)280     establishExchange(HttpConnection connection) {
281         if (debug.on()) {
282             debug.log("establishing exchange for %s,%n\t proxy=%s",
283                       request, request.proxy());
284         }
285         // check if we have been cancelled first.
286         Throwable t = getCancelCause();
287         checkCancelled();
288         if (t != null) {
289             return MinimalFuture.failedFuture(t);
290         }
291 
292         CompletableFuture<? extends ExchangeImpl<T>> cf, res;
293         cf = ExchangeImpl.get(this, connection);
294         // We should probably use a VarHandle to get/set exchangeCF
295         // instead - as we need CAS semantics.
296         synchronized (this) { exchangeCF = cf; };
297         res = cf.whenComplete((r,x) -> {
298             synchronized(Exchange.this) {
299                 if (exchangeCF == cf) exchangeCF = null;
300             }
301         });
302         checkCancelled();
303         return res.thenCompose((eimpl) -> {
304                     // recheck for cancelled, in case of race conditions
305                     exchImpl = eimpl;
306                     IOException tt = getCancelCause();
307                     checkCancelled();
308                     if (tt != null) {
309                         return MinimalFuture.failedFuture(tt);
310                     } else {
311                         // Now we're good to go. Because exchImpl is no longer
312                         // null cancel() will be able to propagate directly to
313                         // the impl after this point ( if needed ).
314                         return MinimalFuture.completedFuture(eimpl);
315                     } });
316     }
317 
318     // Completed HttpResponse will be null if response succeeded
319     // will be a non null responseAsync if expect continue returns an error
320 
responseAsync()321     public CompletableFuture<Response> responseAsync() {
322         return responseAsyncImpl(null);
323     }
324 
responseAsyncImpl(HttpConnection connection)325     CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
326         SecurityException e = checkPermissions();
327         if (e != null) {
328             return MinimalFuture.failedFuture(e);
329         } else {
330             return responseAsyncImpl0(connection);
331         }
332     }
333 
334     // check whether the headersSentCF was completed exceptionally with
335     // ProxyAuthorizationRequired. If so the Response embedded in the
336     // exception is returned. Otherwise we proceed.
checkFor407(ExchangeImpl<T> ex, Throwable t, Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen)337     private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
338                                                     Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
339         t = Utils.getCompletionCause(t);
340         if (t instanceof ProxyAuthenticationRequired) {
341             if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
342             bodyIgnored = MinimalFuture.completedFuture(null);
343             Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
344             HttpConnection c = ex == null ? null : ex.connection();
345             Response syntheticResponse = new Response(request, this,
346                     proxyResponse.headers, c, proxyResponse.statusCode,
347                     proxyResponse.version, true);
348             return MinimalFuture.completedFuture(syntheticResponse);
349         } else if (t != null) {
350             if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
351             return MinimalFuture.failedFuture(t);
352         } else {
353             if (debug.on()) debug.log("checkFor407: all clear");
354             return andThen.apply(ex);
355         }
356     }
357 
358     // After sending the request headers, if no ProxyAuthorizationRequired
359     // was raised and the expectContinue flag is on, we need to wait
360     // for the 100-Continue response
expectContinue(ExchangeImpl<T> ex)361     private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
362         assert request.expectContinue();
363         return ex.getResponseAsync(parentExecutor)
364                 .thenCompose((Response r1) -> {
365             Log.logResponse(r1::toString);
366             int rcode = r1.statusCode();
367             if (rcode == 100) {
368                 Log.logTrace("Received 100-Continue: sending body");
369                 if (debug.on()) debug.log("Received 100-Continue for %s", r1);
370                 CompletableFuture<Response> cf =
371                         exchImpl.sendBodyAsync()
372                                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
373                 cf = wrapForUpgrade(cf);
374                 cf = wrapForLog(cf);
375                 return cf;
376             } else {
377                 Log.logTrace("Expectation failed: Received {0}",
378                         rcode);
379                 if (debug.on()) debug.log("Expect-Continue failed (%d) for: %s", rcode, r1);
380                 if (upgrading && rcode == 101) {
381                     IOException failed = new IOException(
382                             "Unable to handle 101 while waiting for 100");
383                     return MinimalFuture.failedFuture(failed);
384                 }
385                 return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
386                         .thenApply(v ->  r1);
387             }
388         });
389     }
390 
391     // After sending the request headers, if no ProxyAuthorizationRequired
392     // was raised and the expectContinue flag is off, we can immediately
393     // send the request body and proceed.
394     private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
395         assert !request.expectContinue();
396         if (debug.on()) debug.log("sendRequestBody");
397         CompletableFuture<Response> cf = ex.sendBodyAsync()
398                 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
399         cf = wrapForUpgrade(cf);
400         cf = wrapForLog(cf);
401         return cf;
402     }
403 
404     CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
405         Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
406         bodyIgnored = null;
407         if (request.expectContinue()) {
408             request.addSystemHeader("Expect", "100-Continue");
409             Log.logTrace("Sending Expect: 100-Continue");
410             // wait for 100-Continue before sending body
411             after407Check = this::expectContinue;
412         } else {
413             // send request body and proceed.
414             after407Check = this::sendRequestBody;
415         }
416         // The ProxyAuthorizationRequired can be triggered either by
417         // establishExchange (case of HTTP/2 SSL tunneling through HTTP/1.1 proxy
418         // or by sendHeaderAsync (case of HTTP/1.1 SSL tunneling through HTTP/1.1 proxy
419         // Therefore we handle it with a call to this checkFor407(...) after these
420         // two places.
421         Function<ExchangeImpl<T>, CompletableFuture<Response>> afterExch407Check =
422                 (ex) -> ex.sendHeadersAsync()
423                         .handle((r,t) -> this.checkFor407(r, t, after407Check))
424                         .thenCompose(Function.identity());
425         return establishExchange(connection)
426                 .handle((r,t) -> this.checkFor407(r,t, afterExch407Check))
427                 .thenCompose(Function.identity());
428     }
429 
430     private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
431         if (upgrading) {
432             return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
433         }
434         return cf;
435     }
436 
437     private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
438         if (Log.requests()) {
439             return cf.thenApply(response -> {
440                 Log.logResponse(response::toString);
441                 return response;
442             });
443         }
444         return cf;
445     }
446 
447     HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
448         return HttpResponse.BodySubscribers.replacing(null);
449     }
450 
451     // if this response was received in reply to an upgrade
452     // then create the Http2Connection from the HttpConnection
453     // initialize it and wait for the real response on a newly created Stream
454 
455     private CompletableFuture<Response>
456     checkForUpgradeAsync(Response resp,
457                          ExchangeImpl<T> ex) {
458 
459         int rcode = resp.statusCode();
460         if (upgrading && (rcode == 101)) {
461             Http1Exchange<T> e = (Http1Exchange<T>)ex;
462             // check for 101 switching protocols
463             // 101 responses are not supposed to contain a body.
464             //    => should we fail if there is one?
465             if (debug.on()) debug.log("Upgrading async %s", e.connection());
466             return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
467                 .thenCompose((T v) -> {// v is null
468                     debug.log("Ignored body");
469                     // we pass e::getBuffer to allow the ByteBuffers to accumulate
470                     // while we build the Http2Connection
471                     return Http2Connection.createAsync(e.connection(),
472                                                  client.client2(),
473                                                  this, e::drainLeftOverBytes)
474                         .thenCompose((Http2Connection c) -> {
475                             boolean cached = c.offerConnection();
476                             Stream<T> s = c.getStream(1);
477 
478                             if (s == null) {
479                                 // s can be null if an exception occurred
480                                 // asynchronously while sending the preface.
481                                 Throwable t = c.getRecordedCause();
482                                 IOException ioe;
483                                 if (t != null) {
484                                     if (!cached)
485                                         c.close();
486                                     ioe = new IOException("Can't get stream 1: " + t, t);
487                                 } else {
488                                     ioe = new IOException("Can't get stream 1");
489                                 }
490                                 return MinimalFuture.failedFuture(ioe);
491                             }
492                             exchImpl.released();
493                             Throwable t;
494                             // There's a race condition window where an external
495                             // thread (SelectorManager) might complete the
496                             // exchange in timeout at the same time where we're
497                             // trying to switch the exchange impl.
498                             // 'failed' will be reset to null after
499                             // exchImpl.cancel() has completed, so either we
500                             // will observe failed != null here, or we will
501                             // observe e.getCancelCause() != null, or the
502                             // timeout exception will be routed to 's'.
503                             // Either way, we need to relay it to s.
504                             synchronized (this) {
505                                 exchImpl = s;
506                                 t = failed;
507                             }
508                             // Check whether the HTTP/1.1 was cancelled.
509                             if (t == null) t = e.getCancelCause();
510                             // if HTTP/1.1 exchange was timed out, don't
511                             // try to go further.
512                             if (t instanceof HttpTimeoutException) {
513                                  s.cancelImpl(t);
514                                  return MinimalFuture.failedFuture(t);
515                             }
516                             if (debug.on())
517                                 debug.log("Getting response async %s", s);
518                             return s.getResponseAsync(null);
519                         });}
520                 );
521         }
522         return MinimalFuture.completedFuture(resp);
523     }
524 
525     private URI getURIForSecurityCheck() {
526         URI u;
527         String method = request.method();
528         InetSocketAddress authority = request.authority();
529         URI uri = request.uri();
530 
531         // CONNECT should be restricted at API level
532         if (method.equalsIgnoreCase("CONNECT")) {
533             try {
534                 u = new URI("socket",
535                              null,
536                              authority.getHostString(),
537                              authority.getPort(),
538                              null,
539                              null,
540                              null);
541             } catch (URISyntaxException e) {
542                 throw new InternalError(e); // shouldn't happen
543             }
544         } else {
545             u = uri;
546         }
547         return u;
548     }
549 
550     /**
551      * Returns the security permission required for the given details.
552      * If method is CONNECT, then uri must be of form "scheme://host:port"
553      */
554     private static URLPermission permissionForServer(URI uri,
555                                                      String method,
556                                                      Map<String, List<String>> headers) {
557         if (method.equals("CONNECT")) {
558             return new URLPermission(uri.toString(), "CONNECT");
559         } else {
560             return Utils.permissionForServer(uri, method, headers.keySet().stream());
561         }
562     }
563 
564     /**
565      * Performs the necessary security permission checks required to retrieve
566      * the response. Returns a security exception representing the denied
567      * permission, or null if all checks pass or there is no security manager.
568      */
569     private SecurityException checkPermissions() {
570         String method = request.method();
571         SecurityManager sm = System.getSecurityManager();
572         if (sm == null || method.equals("CONNECT")) {
573             // tunneling will have a null acc, which is fine. The proxy
574             // permission check will have already been preformed.
575             return null;
576         }
577 
578         HttpHeaders userHeaders = request.getUserHeaders();
579         URI u = getURIForSecurityCheck();
580         URLPermission p = permissionForServer(u, method, userHeaders.map());
581 
582         try {
583             assert acc != null;
584             sm.checkPermission(p, acc);
585         } catch (SecurityException e) {
586             return e;
587         }
588         String hostHeader = userHeaders.firstValue("Host").orElse(null);
589         if (hostHeader != null && !hostHeader.equalsIgnoreCase(u.getHost())) {
590             // user has set a Host header different to request URI
591             // must check that for URLPermission also
592             URI u1 = replaceHostInURI(u, hostHeader);
593             URLPermission p1 = permissionForServer(u1, method, userHeaders.map());
594             try {
595                 assert acc != null;
596                 sm.checkPermission(p1, acc);
597             } catch (SecurityException e) {
598                 return e;
599             }
600         }
601         ProxySelector ps = client.proxySelector();
602         if (ps != null) {
603             if (!method.equals("CONNECT")) {
604                 // a non-tunneling HTTP proxy. Need to check access
605                 URLPermission proxyPerm = permissionForProxy(request.proxy());
606                 if (proxyPerm != null) {
607                     try {
608                         sm.checkPermission(proxyPerm, acc);
609                     } catch (SecurityException e) {
610                         return e;
611                     }
612                 }
613             }
614         }
615         return null;
616     }
617 
618     private static URI replaceHostInURI(URI u, String hostPort) {
619         StringBuilder sb = new StringBuilder();
620         sb.append(u.getScheme())
621                 .append("://")
622                 .append(hostPort)
623                 .append(u.getRawPath());
624         return URI.create(sb.toString());
625     }
626 
627     HttpClient.Version version() {
628         return multi.version();
629     }
630 
631     String dbgString() {
632         return dbgTag;
633     }
634 }
635