1 /*
2  * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
26  * @bug 8196389
27  * @summary Should HttpClient support SETTINGS_MAX_CONCURRENT_STREAMS from the server
28  *
29  * @modules java.base/sun.net.www.http
30  *          java.net.http/jdk.internal.net.http.common
31  *          java.net.http/jdk.internal.net.http.frame
32  *          java.net.http/jdk.internal.net.http.hpack
33  *          java.logging
34  *          jdk.httpserver
35  * @library /test/lib http2/server
36  * @build Http2TestServer
37  * @build jdk.test.lib.net.SimpleSSLContext
38  * @run testng/othervm -ea -esa MaxStreams
39  */
40 
41 import java.io.IOException;
42 import java.io.InputStream;
43 import java.io.OutputStream;
44 import java.net.InetAddress;
45 import java.net.InetSocketAddress;
46 import java.net.URI;
47 import java.util.List;
48 import java.util.LinkedList;
49 import java.util.Properties;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.CompletableFuture;
52 import java.util.concurrent.CompletionException;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Semaphore;
57 import javax.net.ssl.SSLContext;
58 import java.net.http.HttpClient;
59 import java.net.http.HttpRequest;
60 import java.net.http.HttpResponse;
61 import java.net.http.HttpResponse.BodyHandler;
62 import java.net.http.HttpResponse.BodyHandlers;
63 import jdk.test.lib.net.SimpleSSLContext;
64 import org.testng.annotations.AfterTest;
65 import org.testng.annotations.BeforeTest;
66 import org.testng.annotations.DataProvider;
67 import org.testng.annotations.Test;
68 import static java.nio.charset.StandardCharsets.UTF_8;
69 import static java.net.http.HttpResponse.BodyHandlers.discarding;
70 import static org.testng.Assert.assertEquals;
71 import static org.testng.Assert.assertFalse;
72 import static org.testng.Assert.fail;
73 
74 public class MaxStreams {
75 
76     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
77     Http2TestServer https2TestServer;   // HTTP/2 ( h2 )
78     final Http2FixedHandler handler = new Http2FixedHandler();
79     SSLContext ctx;
80     String http2FixedURI;
81     String https2FixedURI;
82     volatile CountDownLatch latch;
83     ExecutorService exec;
84     final Semaphore canStartTestRun = new Semaphore(1);
85 
86     // we send an initial warm up request, then MAX_STREAMS+1 requests
87     // in parallel. The last of them should hit the limit.
88     // Then we wait for all the responses and send a further request
89     // which should succeed. The server should see (and respond to)
90     // MAX_STREAMS+2 requests per test run.
91 
92     static final int MAX_STREAMS = 10;
93     static final String RESPONSE = "Hello world";
94 
95     @DataProvider(name = "uris")
variants()96     public Object[][] variants() {
97         return new Object[][]{
98                 {http2FixedURI},
99                 {https2FixedURI},
100                 {http2FixedURI},
101                 {https2FixedURI}
102         };
103     }
104 
105 
106     @Test(dataProvider = "uris", timeOut=20000)
107     void testAsString(String uri) throws Exception {
108         System.err.println("Semaphore acquire");
109         canStartTestRun.acquire();
110         latch = new CountDownLatch(1);
111         handler.setLatch(latch);
112         HttpClient client = HttpClient.newBuilder().sslContext(ctx).build();
113         List<CompletableFuture<HttpResponse<String>>> responses = new LinkedList<>();
114 
115         HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
116                                          .version(HttpClient.Version.HTTP_2)
117                                          .GET()
newHttpClient()118                                          .build();
119         // send warmup to ensure we only have one Http2Connection
120         System.err.println("Sending warmup request");
121         HttpResponse<String> warmup = client.send(request, BodyHandlers.ofString());
122         if (warmup.statusCode() != 200 || !warmup.body().equals(RESPONSE))
123             throw new RuntimeException();
124 
125         for (int i=0;i<MAX_STREAMS+1; i++) {
testAsBytes(String uri, boolean sameClient)126             System.err.println("Sending request " + i);
127             responses.add(client.sendAsync(request, BodyHandlers.ofString()));
128         }
129 
130         // wait until we get local exception before allow server to proceed
131         try {
132             System.err.println("Waiting for first exception");
133             CompletableFuture.anyOf(responses.toArray(new CompletableFuture<?>[0])).join();
134         } catch (Exception ee) {
135             System.err.println("Expected exception 1 " + ee);
136         }
137 
138         latch.countDown();
139 
140         // check the first MAX_STREAMS requests succeeded
141         try {
142             System.err.println("Waiting for second exception");
apply(HttpResponse.ResponseInfo rinfo)143             CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).join();
144             System.err.println("Did not get Expected exception 2 ");
145         } catch (Exception ee) {
146             System.err.println("Expected exception 2 " + ee);
147         }
148         int count = 0;
149         int failures = 0;
150         for (CompletableFuture<HttpResponse<String>> cf : responses) {
151             HttpResponse<String> r = null;
152             try {
153                 count++;
154                 r = cf.join();
155                 if (r.statusCode() != 200 || !r.body().equals(RESPONSE))
onSubscribe(Flow.Subscription subscription)156                     throw new RuntimeException();
157             } catch (Throwable t) {
158                 failures++;
159                 System.err.printf("Failure %d at count %d\n", failures, count);
160                 System.err.println(t);
161                 t.printStackTrace();
162             }
onNext(List<ByteBuffer> item)163         }
164         if (failures != 1) {
165             String msg = "Expected 1 failure. Got " + failures;
166             throw new RuntimeException(msg);
167         }
168 
169         System.err.println("Sending last request");
onError(Throwable throwable)170         // make sure it succeeds now as number of streams == 0 now
171         HttpResponse<String> warmdown = client.send(request, BodyHandlers.ofString());
172         if (warmdown.statusCode() != 200 || !warmdown.body().equals(RESPONSE))
173             throw new RuntimeException();
174         System.err.println("Test OK");
175     }
176 
onComplete()177     @BeforeTest
178     public void setup() throws Exception {
179         ctx = (new SimpleSSLContext()).get();
180         exec = Executors.newCachedThreadPool();
181 
182         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
183 
184         Properties props = new Properties();
185         props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS));
186         http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null);
187         http2TestServer.addHandler(handler, "/http2/fixed");
188         http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed";
189         http2TestServer.start();
190 
191         https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx);
192         https2TestServer.addHandler(handler, "/https2/fixed");
193         https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/https2/fixed";
194         https2TestServer.start();
195     }
196 
197     @AfterTest
198     public void teardown() throws Exception {
199         System.err.println("Stopping test server now");
200         http2TestServer.stop();
201     }
202 
203     class Http2FixedHandler implements Http2Handler {
204         final AtomicInteger counter = new AtomicInteger(0);
205         volatile CountDownLatch latch;
206 
207         synchronized void setLatch(CountDownLatch latch) {
208             this.latch = latch;
209         }
210 
211         synchronized CountDownLatch getLatch() {
212             return latch;
213         }
214 
215         @Override
216         public void handle(Http2TestExchange t) throws IOException {
217             int c = -1;
218             try (InputStream is = t.getRequestBody();
219                  OutputStream os = t.getResponseBody()) {
220 
221                 is.readAllBytes();
222                 c = counter.getAndIncrement();
223                 if (c > 0 && c <= MAX_STREAMS) {
224                     // Wait for latch.
225                     try {
226                         // don't send any replies until all requests are sent
227                         System.err.println("Latch await");
228                         getLatch().await();
229                         System.err.println("Latch resume");
230                     } catch (InterruptedException ee) {}
231                 }
232                 t.sendResponseHeaders(200, RESPONSE.length());
233                 os.write(RESPONSE.getBytes());
234             } finally {
235                 // client issues MAX_STREAMS + 3 requests in total
236                 // but server should only see MAX_STREAMS + 2 in total. One is rejected by client
237                 // counter c captured before increment so final value is MAX_STREAMS + 1
238                 if (c == MAX_STREAMS + 1) {
239                     System.err.println("Semaphore release");
teardown()240                     counter.set(0);
241                     canStartTestRun.release();
242                 }
243             }
244         }
245     }
246 }
247