1 /* 2 * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @bug 8196389 27 * @summary Should HttpClient support SETTINGS_MAX_CONCURRENT_STREAMS from the server 28 * 29 * @modules java.base/sun.net.www.http 30 * java.net.http/jdk.internal.net.http.common 31 * java.net.http/jdk.internal.net.http.frame 32 * java.net.http/jdk.internal.net.http.hpack 33 * java.logging 34 * jdk.httpserver 35 * @library /test/lib http2/server 36 * @build Http2TestServer 37 * @build jdk.test.lib.net.SimpleSSLContext 38 * @run testng/othervm -ea -esa MaxStreams 39 */ 40 41 import java.io.IOException; 42 import java.io.InputStream; 43 import java.io.OutputStream; 44 import java.net.InetAddress; 45 import java.net.InetSocketAddress; 46 import java.net.URI; 47 import java.util.List; 48 import java.util.LinkedList; 49 import java.util.Properties; 50 import java.util.concurrent.atomic.AtomicInteger; 51 import java.util.concurrent.CompletableFuture; 52 import java.util.concurrent.CompletionException; 53 import java.util.concurrent.CountDownLatch; 54 import java.util.concurrent.Executors; 55 import java.util.concurrent.ExecutorService; 56 import java.util.concurrent.Semaphore; 57 import javax.net.ssl.SSLContext; 58 import java.net.http.HttpClient; 59 import java.net.http.HttpRequest; 60 import java.net.http.HttpResponse; 61 import java.net.http.HttpResponse.BodyHandler; 62 import java.net.http.HttpResponse.BodyHandlers; 63 import jdk.test.lib.net.SimpleSSLContext; 64 import org.testng.annotations.AfterTest; 65 import org.testng.annotations.BeforeTest; 66 import org.testng.annotations.DataProvider; 67 import org.testng.annotations.Test; 68 import static java.nio.charset.StandardCharsets.UTF_8; 69 import static java.net.http.HttpResponse.BodyHandlers.discarding; 70 import static org.testng.Assert.assertEquals; 71 import static org.testng.Assert.assertFalse; 72 import static org.testng.Assert.fail; 73 74 public class MaxStreams { 75 76 Http2TestServer http2TestServer; // HTTP/2 ( h2c ) 77 Http2TestServer https2TestServer; // HTTP/2 ( h2 ) 78 final Http2FixedHandler handler = new Http2FixedHandler(); 79 SSLContext ctx; 80 String http2FixedURI; 81 String https2FixedURI; 82 volatile CountDownLatch latch; 83 ExecutorService exec; 84 final Semaphore canStartTestRun = new Semaphore(1); 85 86 // we send an initial warm up request, then MAX_STREAMS+1 requests 87 // in parallel. The last of them should hit the limit. 88 // Then we wait for all the responses and send a further request 89 // which should succeed. The server should see (and respond to) 90 // MAX_STREAMS+2 requests per test run. 91 92 static final int MAX_STREAMS = 10; 93 static final String RESPONSE = "Hello world"; 94 95 @DataProvider(name = "uris") variants()96 public Object[][] variants() { 97 return new Object[][]{ 98 {http2FixedURI}, 99 {https2FixedURI}, 100 {http2FixedURI}, 101 {https2FixedURI} 102 }; 103 } 104 105 106 @Test(dataProvider = "uris", timeOut=20000) 107 void testAsString(String uri) throws Exception { 108 System.err.println("Semaphore acquire"); 109 canStartTestRun.acquire(); 110 latch = new CountDownLatch(1); 111 handler.setLatch(latch); 112 HttpClient client = HttpClient.newBuilder().sslContext(ctx).build(); 113 List<CompletableFuture<HttpResponse<String>>> responses = new LinkedList<>(); 114 115 HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) 116 .version(HttpClient.Version.HTTP_2) 117 .GET() newHttpClient()118 .build(); 119 // send warmup to ensure we only have one Http2Connection 120 System.err.println("Sending warmup request"); 121 HttpResponse<String> warmup = client.send(request, BodyHandlers.ofString()); 122 if (warmup.statusCode() != 200 || !warmup.body().equals(RESPONSE)) 123 throw new RuntimeException(); 124 125 for (int i=0;i<MAX_STREAMS+1; i++) { testAsBytes(String uri, boolean sameClient)126 System.err.println("Sending request " + i); 127 responses.add(client.sendAsync(request, BodyHandlers.ofString())); 128 } 129 130 // wait until we get local exception before allow server to proceed 131 try { 132 System.err.println("Waiting for first exception"); 133 CompletableFuture.anyOf(responses.toArray(new CompletableFuture<?>[0])).join(); 134 } catch (Exception ee) { 135 System.err.println("Expected exception 1 " + ee); 136 } 137 138 latch.countDown(); 139 140 // check the first MAX_STREAMS requests succeeded 141 try { 142 System.err.println("Waiting for second exception"); apply(HttpResponse.ResponseInfo rinfo)143 CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).join(); 144 System.err.println("Did not get Expected exception 2 "); 145 } catch (Exception ee) { 146 System.err.println("Expected exception 2 " + ee); 147 } 148 int count = 0; 149 int failures = 0; 150 for (CompletableFuture<HttpResponse<String>> cf : responses) { 151 HttpResponse<String> r = null; 152 try { 153 count++; 154 r = cf.join(); 155 if (r.statusCode() != 200 || !r.body().equals(RESPONSE)) onSubscribe(Flow.Subscription subscription)156 throw new RuntimeException(); 157 } catch (Throwable t) { 158 failures++; 159 System.err.printf("Failure %d at count %d\n", failures, count); 160 System.err.println(t); 161 t.printStackTrace(); 162 } onNext(List<ByteBuffer> item)163 } 164 if (failures != 1) { 165 String msg = "Expected 1 failure. Got " + failures; 166 throw new RuntimeException(msg); 167 } 168 169 System.err.println("Sending last request"); onError(Throwable throwable)170 // make sure it succeeds now as number of streams == 0 now 171 HttpResponse<String> warmdown = client.send(request, BodyHandlers.ofString()); 172 if (warmdown.statusCode() != 200 || !warmdown.body().equals(RESPONSE)) 173 throw new RuntimeException(); 174 System.err.println("Test OK"); 175 } 176 onComplete()177 @BeforeTest 178 public void setup() throws Exception { 179 ctx = (new SimpleSSLContext()).get(); 180 exec = Executors.newCachedThreadPool(); 181 182 InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); 183 184 Properties props = new Properties(); 185 props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS)); 186 http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null); 187 http2TestServer.addHandler(handler, "/http2/fixed"); 188 http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed"; 189 http2TestServer.start(); 190 191 https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx); 192 https2TestServer.addHandler(handler, "/https2/fixed"); 193 https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/https2/fixed"; 194 https2TestServer.start(); 195 } 196 197 @AfterTest 198 public void teardown() throws Exception { 199 System.err.println("Stopping test server now"); 200 http2TestServer.stop(); 201 } 202 203 class Http2FixedHandler implements Http2Handler { 204 final AtomicInteger counter = new AtomicInteger(0); 205 volatile CountDownLatch latch; 206 207 synchronized void setLatch(CountDownLatch latch) { 208 this.latch = latch; 209 } 210 211 synchronized CountDownLatch getLatch() { 212 return latch; 213 } 214 215 @Override 216 public void handle(Http2TestExchange t) throws IOException { 217 int c = -1; 218 try (InputStream is = t.getRequestBody(); 219 OutputStream os = t.getResponseBody()) { 220 221 is.readAllBytes(); 222 c = counter.getAndIncrement(); 223 if (c > 0 && c <= MAX_STREAMS) { 224 // Wait for latch. 225 try { 226 // don't send any replies until all requests are sent 227 System.err.println("Latch await"); 228 getLatch().await(); 229 System.err.println("Latch resume"); 230 } catch (InterruptedException ee) {} 231 } 232 t.sendResponseHeaders(200, RESPONSE.length()); 233 os.write(RESPONSE.getBytes()); 234 } finally { 235 // client issues MAX_STREAMS + 3 requests in total 236 // but server should only see MAX_STREAMS + 2 in total. One is rejected by client 237 // counter c captured before increment so final value is MAX_STREAMS + 1 238 if (c == MAX_STREAMS + 1) { 239 System.err.println("Semaphore release"); teardown()240 counter.set(0); 241 canStartTestRun.release(); 242 } 243 } 244 } 245 } 246 } 247