1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @bug 8201186 27 * @summary Tests an asynchronous BodySubscriber that completes 28 * immediately with a Publisher<List<ByteBuffer>> 29 * @library /lib/testlibrary http2/server 30 * @build jdk.testlibrary.SimpleSSLContext 31 * @modules java.base/sun.net.www.http 32 * java.net.http/jdk.internal.net.http.common 33 * java.net.http/jdk.internal.net.http.frame 34 * java.net.http/jdk.internal.net.http.hpack 35 * @run testng/othervm ResponsePublisher 36 */ 37 38 import com.sun.net.httpserver.HttpExchange; 39 import com.sun.net.httpserver.HttpHandler; 40 import com.sun.net.httpserver.HttpServer; 41 import com.sun.net.httpserver.HttpsConfigurator; 42 import com.sun.net.httpserver.HttpsServer; 43 import jdk.testlibrary.SimpleSSLContext; 44 import org.testng.annotations.AfterTest; 45 import org.testng.annotations.BeforeTest; 46 import org.testng.annotations.DataProvider; 47 import org.testng.annotations.Test; 48 49 import javax.net.ssl.SSLContext; 50 import java.io.IOException; 51 import java.io.InputStream; 52 import java.io.OutputStream; 53 import java.net.InetAddress; 54 import java.net.InetSocketAddress; 55 import java.net.URI; 56 import java.net.http.HttpClient; 57 import java.net.http.HttpHeaders; 58 import java.net.http.HttpRequest; 59 import java.net.http.HttpResponse; 60 import java.net.http.HttpResponse.BodyHandler; 61 import java.net.http.HttpResponse.BodyHandlers; 62 import java.net.http.HttpResponse.BodySubscriber; 63 import java.net.http.HttpResponse.BodySubscribers; 64 import java.nio.ByteBuffer; 65 import java.util.List; 66 import java.util.Objects; 67 import java.util.concurrent.CompletableFuture; 68 import java.util.concurrent.CompletionException; 69 import java.util.concurrent.CompletionStage; 70 import java.util.concurrent.Executor; 71 import java.util.concurrent.Executors; 72 import java.util.concurrent.Flow; 73 import java.util.concurrent.Flow.Publisher; 74 import java.util.concurrent.atomic.AtomicReference; 75 import java.util.function.Supplier; 76 77 import static java.lang.System.out; 78 import static java.nio.charset.StandardCharsets.UTF_8; 79 import static org.testng.Assert.assertEquals; 80 import static org.testng.Assert.assertNotNull; 81 import static org.testng.Assert.assertTrue; 82 83 public class ResponsePublisher implements HttpServerAdapters { 84 85 SSLContext sslContext; 86 HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ] 87 HttpTestServer httpsTestServer; // HTTPS/1.1 88 HttpTestServer http2TestServer; // HTTP/2 ( h2c ) 89 HttpTestServer https2TestServer; // HTTP/2 ( h2 ) 90 String httpURI_fixed; 91 String httpURI_chunk; 92 String httpsURI_fixed; 93 String httpsURI_chunk; 94 String http2URI_fixed; 95 String http2URI_chunk; 96 String https2URI_fixed; 97 String https2URI_chunk; 98 99 static final int ITERATION_COUNT = 3; 100 // a shared executor helps reduce the amount of threads created by the test 101 static final Executor executor = Executors.newCachedThreadPool(); 102 103 interface BHS extends Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> { of(BHS impl, String name)104 static BHS of(BHS impl, String name) { 105 return new BHSImpl(impl, name); 106 } 107 } 108 109 static final class BHSImpl implements BHS { 110 final BHS supplier; 111 final String name; BHSImpl(BHS impl, String name)112 BHSImpl(BHS impl, String name) { 113 this.supplier = impl; 114 this.name = name; 115 } 116 @Override toString()117 public String toString() { 118 return name; 119 } 120 121 @Override get()122 public BodyHandler<Publisher<List<ByteBuffer>>> get() { 123 return supplier.get(); 124 } 125 } 126 127 static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_API = 128 BHS.of(BodyHandlers::ofPublisher, "BodyHandlers::ofPublisher"); 129 static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_TEST = 130 BHS.of(PublishingBodyHandler::new, "PublishingBodyHandler::new"); 131 132 @DataProvider(name = "variants") variants()133 public Object[][] variants() { 134 return new Object[][]{ 135 { httpURI_fixed, false, OF_PUBLISHER_API }, 136 { httpURI_chunk, false, OF_PUBLISHER_API }, 137 { httpsURI_fixed, false, OF_PUBLISHER_API }, 138 { httpsURI_chunk, false, OF_PUBLISHER_API }, 139 { http2URI_fixed, false, OF_PUBLISHER_API }, 140 { http2URI_chunk, false, OF_PUBLISHER_API }, 141 { https2URI_fixed, false, OF_PUBLISHER_API }, 142 { https2URI_chunk, false, OF_PUBLISHER_API }, 143 144 { httpURI_fixed, true, OF_PUBLISHER_API }, 145 { httpURI_chunk, true, OF_PUBLISHER_API }, 146 { httpsURI_fixed, true, OF_PUBLISHER_API }, 147 { httpsURI_chunk, true, OF_PUBLISHER_API }, 148 { http2URI_fixed, true, OF_PUBLISHER_API }, 149 { http2URI_chunk, true, OF_PUBLISHER_API }, 150 { https2URI_fixed, true, OF_PUBLISHER_API }, 151 { https2URI_chunk, true, OF_PUBLISHER_API }, 152 153 { httpURI_fixed, false, OF_PUBLISHER_TEST }, 154 { httpURI_chunk, false, OF_PUBLISHER_TEST }, 155 { httpsURI_fixed, false, OF_PUBLISHER_TEST }, 156 { httpsURI_chunk, false, OF_PUBLISHER_TEST }, 157 { http2URI_fixed, false, OF_PUBLISHER_TEST }, 158 { http2URI_chunk, false, OF_PUBLISHER_TEST }, 159 { https2URI_fixed, false, OF_PUBLISHER_TEST }, 160 { https2URI_chunk, false, OF_PUBLISHER_TEST }, 161 162 { httpURI_fixed, true, OF_PUBLISHER_TEST }, 163 { httpURI_chunk, true, OF_PUBLISHER_TEST }, 164 { httpsURI_fixed, true, OF_PUBLISHER_TEST }, 165 { httpsURI_chunk, true, OF_PUBLISHER_TEST }, 166 { http2URI_fixed, true, OF_PUBLISHER_TEST }, 167 { http2URI_chunk, true, OF_PUBLISHER_TEST }, 168 { https2URI_fixed, true, OF_PUBLISHER_TEST }, 169 { https2URI_chunk, true, OF_PUBLISHER_TEST }, 170 }; 171 } 172 173 final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE; newHttpClient()174 HttpClient newHttpClient() { 175 return TRACKER.track(HttpClient.newBuilder() 176 .executor(executor) 177 .sslContext(sslContext) 178 .build()); 179 } 180 181 @Test(dataProvider = "variants") testExceptions(String uri, boolean sameClient, BHS handlers)182 public void testExceptions(String uri, boolean sameClient, BHS handlers) throws Exception { 183 HttpClient client = null; 184 for (int i=0; i< ITERATION_COUNT; i++) { 185 if (!sameClient || client == null) 186 client = newHttpClient(); 187 188 HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) 189 .build(); 190 BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); 191 HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler); 192 try { 193 response.body().subscribe(null); 194 throw new RuntimeException("Expected NPE not thrown"); 195 } catch (NullPointerException x) { 196 System.out.println("Got expected NPE: " + x); 197 } 198 // We can reuse our BodySubscribers implementations to subscribe to the 199 // Publisher<List<ByteBuffer>> 200 BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); 201 response.body().subscribe(ofString); 202 203 BodySubscriber<String> ofString2 = BodySubscribers.ofString(UTF_8); 204 response.body().subscribe(ofString2); 205 try { 206 ofString2.getBody().toCompletableFuture().join(); 207 throw new RuntimeException("Expected ISE not thrown"); 208 } catch (CompletionException x) { 209 Throwable cause = x.getCause(); 210 if (cause instanceof IllegalStateException) { 211 System.out.println("Got expected ISE: " + cause); 212 } else { 213 throw x; 214 } 215 } 216 // Get the final result and compare it with the expected body 217 String body = ofString.getBody().toCompletableFuture().get(); 218 assertEquals(body, ""); 219 } 220 } 221 222 @Test(dataProvider = "variants") testNoBody(String uri, boolean sameClient, BHS handlers)223 public void testNoBody(String uri, boolean sameClient, BHS handlers) throws Exception { 224 HttpClient client = null; 225 for (int i=0; i< ITERATION_COUNT; i++) { 226 if (!sameClient || client == null) 227 client = newHttpClient(); 228 229 HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) 230 .build(); 231 BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); 232 HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler); 233 // We can reuse our BodySubscribers implementations to subscribe to the 234 // Publisher<List<ByteBuffer>> 235 BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); 236 // get the Publisher<List<ByteBuffer>> and 237 // subscribe to it. 238 response.body().subscribe(ofString); 239 // Get the final result and compare it with the expected body 240 String body = ofString.getBody().toCompletableFuture().get(); 241 assertEquals(body, ""); 242 } 243 } 244 245 @Test(dataProvider = "variants") testNoBodyAsync(String uri, boolean sameClient, BHS handlers)246 public void testNoBodyAsync(String uri, boolean sameClient, BHS handlers) throws Exception { 247 HttpClient client = null; 248 for (int i=0; i< ITERATION_COUNT; i++) { 249 if (!sameClient || client == null) 250 client = newHttpClient(); 251 252 HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) 253 .build(); 254 BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); 255 // We can reuse our BodySubscribers implementations to subscribe to the 256 // Publisher<List<ByteBuffer>> 257 BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); 258 CompletableFuture<String> result = 259 client.sendAsync(req, handler).thenCompose( 260 (responsePublisher) -> { 261 // get the Publisher<List<ByteBuffer>> and 262 // subscribe to it. 263 responsePublisher.body().subscribe(ofString); 264 return ofString.getBody(); 265 }); 266 // Get the final result and compare it with the expected body 267 assertEquals(result.get(), ""); 268 } 269 } 270 271 @Test(dataProvider = "variants") testAsString(String uri, boolean sameClient, BHS handlers)272 public void testAsString(String uri, boolean sameClient, BHS handlers) throws Exception { 273 HttpClient client = null; 274 for (int i=0; i< ITERATION_COUNT; i++) { 275 if (!sameClient || client == null) 276 client = newHttpClient(); 277 278 HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody")) 279 .build(); 280 BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); 281 HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler); 282 // We can reuse our BodySubscribers implementations to subscribe to the 283 // Publisher<List<ByteBuffer>> 284 BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); 285 // get the Publisher<List<ByteBuffer>> and 286 // subscribe to it. 287 response.body().subscribe(ofString); 288 // Get the final result and compare it with the expected body 289 String body = ofString.getBody().toCompletableFuture().get(); 290 assertEquals(body, WITH_BODY); 291 } 292 } 293 294 @Test(dataProvider = "variants") testAsStringAsync(String uri, boolean sameClient, BHS handlers)295 public void testAsStringAsync(String uri, boolean sameClient, BHS handlers) throws Exception { 296 HttpClient client = null; 297 for (int i=0; i< ITERATION_COUNT; i++) { 298 if (!sameClient || client == null) 299 client = newHttpClient(); 300 301 HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody")) 302 .build(); 303 BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get(); 304 // We can reuse our BodySubscribers implementations to subscribe to the 305 // Publisher<List<ByteBuffer>> 306 BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8); 307 CompletableFuture<String> result = client.sendAsync(req, handler) 308 .thenCompose((responsePublisher) -> { 309 // get the Publisher<List<ByteBuffer>> and 310 // subscribe to it. 311 responsePublisher.body().subscribe(ofString); 312 return ofString.getBody(); 313 }); 314 // Get the final result and compare it with the expected body 315 String body = result.get(); 316 assertEquals(body, WITH_BODY); 317 } 318 } 319 320 // A BodyHandler that returns PublishingBodySubscriber instances 321 static class PublishingBodyHandler implements BodyHandler<Publisher<List<ByteBuffer>>> { 322 @Override apply(HttpResponse.ResponseInfo rinfo)323 public BodySubscriber<Publisher<List<ByteBuffer>>> apply(HttpResponse.ResponseInfo rinfo) { 324 assertEquals(rinfo.statusCode(), 200); 325 return new PublishingBodySubscriber(); 326 } 327 } 328 329 // A BodySubscriber that returns a Publisher<List<ByteBuffer>> 330 static class PublishingBodySubscriber implements BodySubscriber<Publisher<List<ByteBuffer>>> { 331 private final CompletableFuture<Flow.Subscription> subscriptionCF = new CompletableFuture<>(); 332 private final CompletableFuture<Flow.Subscriber<? super List<ByteBuffer>>> subscribedCF = new CompletableFuture<>(); 333 private AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>(); 334 private final CompletionStage<Publisher<List<ByteBuffer>>> body = 335 subscriptionCF.thenCompose((s) -> CompletableFuture.completedStage(this::subscribe)); 336 //CompletableFuture.completedStage(this::subscribe); 337 subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber)338 private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { 339 Objects.requireNonNull(subscriber, "subscriber must not be null"); 340 if (subscriberRef.compareAndSet(null, subscriber)) { 341 subscriptionCF.thenAccept((s) -> { 342 subscriber.onSubscribe(s); 343 subscribedCF.complete(subscriber); 344 }); 345 } else { 346 subscriber.onSubscribe(new Flow.Subscription() { 347 @Override public void request(long n) { } 348 @Override public void cancel() { } 349 }); 350 subscriber.onError( 351 new IllegalStateException("This publisher has already one subscriber")); 352 } 353 } 354 355 @Override onSubscribe(Flow.Subscription subscription)356 public void onSubscribe(Flow.Subscription subscription) { 357 subscriptionCF.complete(subscription); 358 } 359 360 @Override onNext(List<ByteBuffer> item)361 public void onNext(List<ByteBuffer> item) { 362 assert subscriptionCF.isDone(); // cannot be called before onSubscribe() 363 Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get(); 364 assert subscriber != null; // cannot be called before subscriber calls request(1) 365 subscriber.onNext(item); 366 } 367 368 @Override onError(Throwable throwable)369 public void onError(Throwable throwable) { 370 assert subscriptionCF.isDone(); // cannot be called before onSubscribe() 371 // onError can be called before request(1), and therefore can 372 // be called before subscriberRef is set. 373 subscribedCF.thenAccept(s -> s.onError(throwable)); 374 } 375 376 @Override onComplete()377 public void onComplete() { 378 assert subscriptionCF.isDone(); // cannot be called before onSubscribe() 379 // onComplete can be called before request(1), and therefore can 380 // be called before subscriberRef is set. 381 subscribedCF.thenAccept(s -> s.onComplete()); 382 } 383 384 @Override getBody()385 public CompletionStage<Publisher<List<ByteBuffer>>> getBody() { 386 return body; 387 } 388 } 389 serverAuthority(HttpServer server)390 static String serverAuthority(HttpServer server) { 391 return InetAddress.getLoopbackAddress().getHostName() + ":" 392 + server.getAddress().getPort(); 393 } 394 395 @BeforeTest setup()396 public void setup() throws Exception { 397 sslContext = new SimpleSSLContext().get(); 398 if (sslContext == null) 399 throw new AssertionError("Unexpected null sslContext"); 400 401 // HTTP/1.1 402 HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler(); 403 HttpTestHandler h1_chunkHandler = new HTTP_VariableLengthHandler(); 404 InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); 405 httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0)); 406 httpTestServer.addHandler( h1_fixedLengthHandler, "/http1/fixed"); 407 httpTestServer.addHandler(h1_chunkHandler,"/http1/chunk"); 408 httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed"; 409 httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk"; 410 411 HttpsServer httpsServer = HttpsServer.create(sa, 0); 412 httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); 413 httpsTestServer = HttpTestServer.of(httpsServer); 414 httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed"); 415 httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk"); 416 httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed"; 417 httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk"; 418 419 // HTTP/2 420 HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler(); 421 HttpTestHandler h2_chunkedHandler = new HTTP_VariableLengthHandler(); 422 423 http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0)); 424 http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed"); 425 http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk"); 426 http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed"; 427 http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk"; 428 429 https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext)); 430 https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed"); 431 https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk"); 432 https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed"; 433 https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk"; 434 435 httpTestServer.start(); 436 httpsTestServer.start(); 437 http2TestServer.start(); 438 https2TestServer.start(); 439 } 440 441 @AfterTest teardown()442 public void teardown() throws Exception { 443 Thread.sleep(100); 444 AssertionError fail = TRACKER.check(500); 445 try { 446 httpTestServer.stop(); 447 httpsTestServer.stop(); 448 http2TestServer.stop(); 449 https2TestServer.stop(); 450 } finally { 451 if (fail != null) { 452 throw fail; 453 } 454 } 455 } 456 457 static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" + 458 " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" + 459 " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" + 460 " exercitation ullamco laboris nisi ut aliquip ex ea" + 461 " commodo consequat. Duis aute irure dolor in reprehenderit in " + 462 "voluptate velit esse cillum dolore eu fugiat nulla pariatur." + 463 " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" + 464 " officia deserunt mollit anim id est laborum."; 465 466 static class HTTP_FixedLengthHandler implements HttpTestHandler { 467 @Override handle(HttpTestExchange t)468 public void handle(HttpTestExchange t) throws IOException { 469 out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI()); 470 try (InputStream is = t.getRequestBody()) { 471 is.readAllBytes(); 472 } 473 if (t.getRequestURI().getPath().endsWith("/withBody")) { 474 byte[] bytes = WITH_BODY.getBytes(UTF_8); 475 t.sendResponseHeaders(200, bytes.length); // body 476 try (OutputStream os = t.getResponseBody()) { 477 os.write(bytes); 478 } 479 } else { 480 t.sendResponseHeaders(200, 0); //no body 481 } 482 } 483 } 484 485 static class HTTP_VariableLengthHandler implements HttpTestHandler { 486 @Override handle(HttpTestExchange t)487 public void handle(HttpTestExchange t) throws IOException { 488 out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI()); 489 try (InputStream is = t.getRequestBody()) { 490 is.readAllBytes(); 491 } 492 t.sendResponseHeaders(200, -1); //chunked or variable 493 if (t.getRequestURI().getPath().endsWith("/withBody")) { 494 byte[] bytes = WITH_BODY.getBytes(UTF_8); 495 try (OutputStream os = t.getResponseBody()) { 496 int chunkLen = bytes.length/10; 497 if (chunkLen == 0) { 498 os.write(bytes); 499 } else { 500 int count = 0; 501 for (int i=0; i<10; i++) { 502 os.write(bytes, count, chunkLen); 503 os.flush(); 504 count += chunkLen; 505 } 506 os.write(bytes, count, bytes.length % chunkLen); 507 count += bytes.length % chunkLen; 508 assert count == bytes.length; 509 } 510 } 511 } else { 512 t.getResponseBody().close(); // no body 513 } 514 } 515 } 516 } 517