1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @summary Verify that dependent synchronous actions added before the promise CF 27 * completes are executed either asynchronously in an executor when the 28 * CF later completes, or in the user thread that joins. 29 * @library /test/lib http2/server 30 * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters DependentPromiseActionsTest 31 * @modules java.base/sun.net.www.http 32 * java.net.http/jdk.internal.net.http.common 33 * java.net.http/jdk.internal.net.http.frame 34 * java.net.http/jdk.internal.net.http.hpack 35 * @run testng/othervm -Djdk.internal.httpclient.debug=true DependentPromiseActionsTest 36 * @run testng/othervm/java.security.policy=dependent.policy 37 * -Djdk.internal.httpclient.debug=true DependentPromiseActionsTest 38 */ 39 40 import java.io.BufferedReader; 41 import java.io.InputStreamReader; 42 import java.lang.StackWalker.StackFrame; 43 import jdk.test.lib.net.SimpleSSLContext; 44 import org.testng.annotations.AfterTest; 45 import org.testng.annotations.AfterClass; 46 import org.testng.annotations.BeforeTest; 47 import org.testng.annotations.DataProvider; 48 import org.testng.annotations.Test; 49 50 import javax.net.ssl.SSLContext; 51 import java.io.IOException; 52 import java.io.InputStream; 53 import java.io.OutputStream; 54 import java.net.URI; 55 import java.net.URISyntaxException; 56 import java.net.http.HttpClient; 57 import java.net.http.HttpHeaders; 58 import java.net.http.HttpRequest; 59 import java.net.http.HttpResponse; 60 import java.net.http.HttpResponse.BodyHandler; 61 import java.net.http.HttpResponse.BodyHandlers; 62 import java.net.http.HttpResponse.BodySubscriber; 63 import java.net.http.HttpResponse.PushPromiseHandler; 64 import java.nio.ByteBuffer; 65 import java.nio.charset.StandardCharsets; 66 import java.util.EnumSet; 67 import java.util.List; 68 import java.util.Map; 69 import java.util.Optional; 70 import java.util.concurrent.CompletableFuture; 71 import java.util.concurrent.CompletionException; 72 import java.util.concurrent.CompletionStage; 73 import java.util.concurrent.ConcurrentHashMap; 74 import java.util.concurrent.ConcurrentMap; 75 import java.util.concurrent.Executor; 76 import java.util.concurrent.Executors; 77 import java.util.concurrent.Flow; 78 import java.util.concurrent.Semaphore; 79 import java.util.concurrent.atomic.AtomicLong; 80 import java.util.concurrent.atomic.AtomicReference; 81 import java.util.function.BiPredicate; 82 import java.util.function.Consumer; 83 import java.util.function.Function; 84 import java.util.function.Supplier; 85 import java.util.stream.Collectors; 86 import java.util.stream.Stream; 87 88 import static java.lang.System.err; 89 import static java.lang.System.out; 90 import static java.lang.String.format; 91 import static java.nio.charset.StandardCharsets.UTF_8; 92 import static org.testng.Assert.assertEquals; 93 import static org.testng.Assert.assertTrue; 94 95 public class DependentPromiseActionsTest implements HttpServerAdapters { 96 97 SSLContext sslContext; 98 HttpTestServer http2TestServer; // HTTP/2 ( h2c ) 99 HttpTestServer https2TestServer; // HTTP/2 ( h2 ) 100 String http2URI_fixed; 101 String http2URI_chunk; 102 String https2URI_fixed; 103 String https2URI_chunk; 104 105 static final StackWalker WALKER = 106 StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE); 107 108 static final int ITERATION_COUNT = 1; 109 // a shared executor helps reduce the amount of threads created by the test 110 static final Executor executor = new TestExecutor(Executors.newCachedThreadPool()); 111 static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>(); 112 static volatile boolean tasksFailed; 113 static final AtomicLong serverCount = new AtomicLong(); 114 static final AtomicLong clientCount = new AtomicLong(); 115 static final long start = System.nanoTime(); now()116 public static String now() { 117 long now = System.nanoTime() - start; 118 long secs = now / 1000_000_000; 119 long mill = (now % 1000_000_000) / 1000_000; 120 long nan = now % 1000_000; 121 return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan); 122 } 123 124 private volatile HttpClient sharedClient; 125 126 static class TestExecutor implements Executor { 127 final AtomicLong tasks = new AtomicLong(); 128 Executor executor; TestExecutor(Executor executor)129 TestExecutor(Executor executor) { 130 this.executor = executor; 131 } 132 133 @Override execute(Runnable command)134 public void execute(Runnable command) { 135 long id = tasks.incrementAndGet(); 136 executor.execute(() -> { 137 try { 138 command.run(); 139 } catch (Throwable t) { 140 tasksFailed = true; 141 System.out.printf(now() + "Task %s failed: %s%n", id, t); 142 System.err.printf(now() + "Task %s failed: %s%n", id, t); 143 FAILURES.putIfAbsent("Task " + id, t); 144 throw t; 145 } 146 }); 147 } 148 } 149 150 @AfterClass printFailedTests()151 static final void printFailedTests() { 152 out.println("\n========================="); 153 try { 154 out.printf("%n%sCreated %d servers and %d clients%n", 155 now(), serverCount.get(), clientCount.get()); 156 if (FAILURES.isEmpty()) return; 157 out.println("Failed tests: "); 158 FAILURES.entrySet().forEach((e) -> { 159 out.printf("\t%s: %s%n", e.getKey(), e.getValue()); 160 e.getValue().printStackTrace(out); 161 e.getValue().printStackTrace(); 162 }); 163 if (tasksFailed) { 164 System.out.println("WARNING: Some tasks failed"); 165 } 166 } finally { 167 out.println("\n=========================\n"); 168 } 169 } 170 uris()171 private String[] uris() { 172 return new String[] { 173 http2URI_fixed, 174 http2URI_chunk, 175 https2URI_fixed, 176 https2URI_chunk, 177 }; 178 } 179 180 enum SubscriberType {EAGER, LAZZY} 181 182 static final class SemaphoreStallerSupplier 183 implements Supplier<SemaphoreStaller> { 184 @Override get()185 public SemaphoreStaller get() { 186 return new SemaphoreStaller(); 187 } 188 @Override toString()189 public String toString() { 190 return "SemaphoreStaller"; 191 } 192 } 193 194 @DataProvider(name = "noStalls") noThrows()195 public Object[][] noThrows() { 196 String[] uris = uris(); 197 Object[][] result = new Object[uris.length * 2][]; 198 int i = 0; 199 for (boolean sameClient : List.of(false, true)) { 200 for (String uri: uris()) { 201 result[i++] = new Object[] {uri, sameClient}; 202 } 203 } 204 assert i == uris.length * 2; 205 return result; 206 } 207 208 @DataProvider(name = "variants") variants()209 public Object[][] variants() { 210 String[] uris = uris(); 211 Object[][] result = new Object[uris.length * 2][]; 212 int i = 0; 213 Supplier<? extends Staller> s = new SemaphoreStallerSupplier(); 214 for (Supplier<? extends Staller> staller : List.of(s)) { 215 for (boolean sameClient : List.of(false, true)) { 216 for (String uri : uris()) { 217 result[i++] = new Object[]{uri, sameClient, staller}; 218 } 219 } 220 } 221 assert i == uris.length * 2; 222 return result; 223 } 224 makeNewClient()225 private HttpClient makeNewClient() { 226 clientCount.incrementAndGet(); 227 return HttpClient.newBuilder() 228 .executor(executor) 229 .sslContext(sslContext) 230 .build(); 231 } 232 newHttpClient(boolean share)233 HttpClient newHttpClient(boolean share) { 234 if (!share) return makeNewClient(); 235 HttpClient shared = sharedClient; 236 if (shared != null) return shared; 237 synchronized (this) { 238 shared = sharedClient; 239 if (shared == null) { 240 shared = sharedClient = makeNewClient(); 241 } 242 return shared; 243 } 244 } 245 246 @Test(dataProvider = "noStalls") testNoStalls(String uri, boolean sameClient)247 public void testNoStalls(String uri, boolean sameClient) 248 throws Exception { 249 HttpClient client = null; 250 out.printf("%ntestNoStalls(%s, %b)%n", uri, sameClient); 251 for (int i=0; i< ITERATION_COUNT; i++) { 252 if (!sameClient || client == null) 253 client = newHttpClient(sameClient); 254 255 HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) 256 .build(); 257 BodyHandler<Stream<String>> handler = 258 new StallingBodyHandler((w) -> {}, 259 BodyHandlers.ofLines()); 260 Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> pushPromises = 261 new ConcurrentHashMap<>(); 262 PushPromiseHandler<Stream<String>> pushHandler = new PushPromiseHandler<>() { 263 @Override 264 public void applyPushPromise(HttpRequest initiatingRequest, 265 HttpRequest pushPromiseRequest, 266 Function<BodyHandler<Stream<String>>, 267 CompletableFuture<HttpResponse<Stream<String>>>> 268 acceptor) { 269 pushPromises.putIfAbsent(pushPromiseRequest, acceptor.apply(handler)); 270 } 271 }; 272 HttpResponse<Stream<String>> response = 273 client.sendAsync(req, BodyHandlers.ofLines(), pushHandler).get(); 274 String body = response.body().collect(Collectors.joining("|")); 275 assertEquals(URI.create(body).getPath(), URI.create(uri).getPath()); 276 for (HttpRequest promised : pushPromises.keySet()) { 277 out.printf("%s Received promise: %s%n\tresponse: %s%n", 278 now(), promised, pushPromises.get(promised).get()); 279 String promisedBody = pushPromises.get(promised).get().body() 280 .collect(Collectors.joining("|")); 281 assertEquals(promisedBody, promised.uri().toASCIIString()); 282 } 283 assertEquals(3, pushPromises.size()); 284 } 285 } 286 287 @Test(dataProvider = "variants") testAsStringAsync(String uri, boolean sameClient, Supplier<Staller> stallers)288 public void testAsStringAsync(String uri, 289 boolean sameClient, 290 Supplier<Staller> stallers) 291 throws Exception 292 { 293 String test = format("testAsStringAsync(%s, %b, %s)", 294 uri, sameClient, stallers); 295 testDependent(test, uri, sameClient, BodyHandlers::ofString, 296 this::finish, this::extractString, stallers, 297 SubscriberType.EAGER); 298 } 299 300 @Test(dataProvider = "variants") testAsLinesAsync(String uri, boolean sameClient, Supplier<Staller> stallers)301 public void testAsLinesAsync(String uri, 302 boolean sameClient, 303 Supplier<Staller> stallers) 304 throws Exception 305 { 306 String test = format("testAsLinesAsync(%s, %b, %s)", 307 uri, sameClient, stallers); 308 testDependent(test, uri, sameClient, BodyHandlers::ofLines, 309 this::finish, this::extractStream, stallers, 310 SubscriberType.LAZZY); 311 } 312 313 @Test(dataProvider = "variants") testAsInputStreamAsync(String uri, boolean sameClient, Supplier<Staller> stallers)314 public void testAsInputStreamAsync(String uri, 315 boolean sameClient, 316 Supplier<Staller> stallers) 317 throws Exception 318 { 319 String test = format("testAsInputStreamAsync(%s, %b, %s)", 320 uri, sameClient, stallers); 321 testDependent(test, uri, sameClient, BodyHandlers::ofInputStream, 322 this::finish, this::extractInputStream, stallers, 323 SubscriberType.LAZZY); 324 } 325 testDependent(String name, String uri, boolean sameClient, Supplier<BodyHandler<T>> handlers, Finisher finisher, Extractor<T> extractor, Supplier<Staller> stallers, SubscriberType subscriberType)326 private <T,U> void testDependent(String name, String uri, boolean sameClient, 327 Supplier<BodyHandler<T>> handlers, 328 Finisher finisher, 329 Extractor<T> extractor, 330 Supplier<Staller> stallers, 331 SubscriberType subscriberType) 332 throws Exception 333 { 334 out.printf("%n%s%s%n", now(), name); 335 try { 336 testDependent(uri, sameClient, handlers, finisher, 337 extractor, stallers, subscriberType); 338 } catch (Error | Exception x) { 339 FAILURES.putIfAbsent(name, x); 340 throw x; 341 } 342 } 343 testDependent(String uri, boolean sameClient, Supplier<BodyHandler<T>> handlers, Finisher finisher, Extractor<T> extractor, Supplier<Staller> stallers, SubscriberType subscriberType)344 private <T,U> void testDependent(String uri, boolean sameClient, 345 Supplier<BodyHandler<T>> handlers, 346 Finisher finisher, 347 Extractor<T> extractor, 348 Supplier<Staller> stallers, 349 SubscriberType subscriberType) 350 throws Exception 351 { 352 HttpClient client = null; 353 for (Where where : EnumSet.of(Where.BODY_HANDLER)) { 354 if (!sameClient || client == null) 355 client = newHttpClient(sameClient); 356 357 HttpRequest req = HttpRequest. 358 newBuilder(URI.create(uri)) 359 .build(); 360 StallingPushPromiseHandler<T> promiseHandler = 361 new StallingPushPromiseHandler<>(where, handlers, stallers); 362 BodyHandler<T> handler = handlers.get(); 363 System.out.println("try stalling in " + where); 364 CompletableFuture<HttpResponse<T>> responseCF = 365 client.sendAsync(req, handler, promiseHandler); 366 assert subscriberType == SubscriberType.LAZZY || !responseCF.isDone(); 367 finisher.finish(where, responseCF, promiseHandler, extractor); 368 } 369 } 370 371 enum Where { 372 ON_PUSH_PROMISE, BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF; select(Consumer<Where> consumer)373 public Consumer<Where> select(Consumer<Where> consumer) { 374 return new Consumer<Where>() { 375 @Override 376 public void accept(Where where) { 377 if (Where.this == where) { 378 consumer.accept(where); 379 } 380 } 381 }; 382 } 383 } 384 385 static final class StallingPushPromiseHandler<T> implements PushPromiseHandler<T> { 386 387 static final class Tuple<U> { 388 public final CompletableFuture<HttpResponse<U>> response; 389 public final Staller staller; 390 public final AtomicReference<RuntimeException> failed; 391 Tuple(AtomicReference<RuntimeException> failed, 392 CompletableFuture<HttpResponse<U>> response, 393 Staller staller) { 394 this.response = response; 395 this.staller = staller; 396 this.failed = failed; 397 } 398 } 399 400 public final ConcurrentMap<HttpRequest, Tuple<T>> promiseMap = 401 new ConcurrentHashMap<>(); 402 private final Supplier<Staller> stallers; 403 private final Supplier<BodyHandler<T>> handlers; 404 private final Where where; 405 private final Thread thread = Thread.currentThread(); // main thread 406 407 StallingPushPromiseHandler(Where where, 408 Supplier<BodyHandler<T>> handlers, 409 Supplier<Staller> stallers) { 410 this.where = where; 411 this.handlers = handlers; 412 this.stallers = stallers; 413 } 414 415 @Override 416 public void applyPushPromise(HttpRequest initiatingRequest, 417 HttpRequest pushPromiseRequest, 418 Function<BodyHandler<T>, 419 CompletableFuture<HttpResponse<T>>> acceptor) { 420 AtomicReference<RuntimeException> failed = new AtomicReference<>(); 421 Staller staller = stallers.get(); 422 staller.acquire(); 423 assert staller.willStall(); 424 try { 425 BodyHandler handler = new StallingBodyHandler<>( 426 where.select(staller), handlers.get()); 427 CompletableFuture<HttpResponse<T>> cf = acceptor.apply(handler); 428 Tuple<T> tuple = new Tuple(failed, cf, staller); 429 promiseMap.putIfAbsent(pushPromiseRequest, tuple); 430 CompletableFuture<?> done = cf.whenComplete( 431 (r, t) -> checkThreadAndStack(thread, failed, r, t)); 432 assert !cf.isDone(); 433 } finally { 434 staller.release(); 435 } 436 } 437 } 438 439 interface Extractor<T> { 440 public List<String> extract(HttpResponse<T> resp); 441 } 442 443 final List<String> extractString(HttpResponse<String> resp) { 444 return List.of(resp.body()); 445 } 446 447 final List<String> extractStream(HttpResponse<Stream<String>> resp) { 448 return resp.body().collect(Collectors.toList()); 449 } 450 451 final List<String> extractInputStream(HttpResponse<InputStream> resp) { 452 try (InputStream is = resp.body()) { 453 return new BufferedReader(new InputStreamReader(is)) 454 .lines().collect(Collectors.toList()); 455 } catch (IOException x) { 456 throw new CompletionException(x); 457 } 458 } 459 460 interface Finisher<T> { 461 public void finish(Where w, 462 CompletableFuture<HttpResponse<T>> cf, 463 StallingPushPromiseHandler<T> ph, 464 Extractor<T> extractor); 465 } 466 467 static Optional<StackFrame> findFrame(Stream<StackFrame> s, String name) { 468 return s.filter((f) -> f.getClassName().contains(name)) 469 .filter((f) -> f.getDeclaringClass().getModule().equals(HttpClient.class.getModule())) 470 .findFirst(); 471 } 472 473 static <T> void checkThreadAndStack(Thread thread, 474 AtomicReference<RuntimeException> failed, 475 T result, 476 Throwable error) { 477 if (Thread.currentThread() == thread) { 478 //failed.set(new RuntimeException("Dependant action was executed in " + thread)); 479 List<StackFrame> httpStack = WALKER.walk(s -> s.filter(f -> f.getDeclaringClass() 480 .getModule().equals(HttpClient.class.getModule())) 481 .collect(Collectors.toList())); 482 if (!httpStack.isEmpty()) { 483 System.out.println("Found unexpected trace: "); 484 httpStack.forEach(f -> System.out.printf("\t%s%n", f)); 485 failed.set(new RuntimeException("Dependant action has unexpected frame in " + 486 Thread.currentThread() + ": " + httpStack.get(0))); 487 488 } return; 489 } else if (System.getSecurityManager() != null) { 490 Optional<StackFrame> sf = WALKER.walk(s -> findFrame(s, "PrivilegedRunnable")); 491 if (!sf.isPresent()) { 492 failed.set(new RuntimeException("Dependant action does not have expected frame in " 493 + Thread.currentThread())); 494 return; 495 } else { 496 System.out.println("Found expected frame: " + sf.get()); 497 } 498 } else { 499 List<StackFrame> httpStack = WALKER.walk(s -> s.filter(f -> f.getDeclaringClass() 500 .getModule().equals(HttpClient.class.getModule())) 501 .collect(Collectors.toList())); 502 if (!httpStack.isEmpty()) { 503 System.out.println("Found unexpected trace: "); 504 httpStack.forEach(f -> System.out.printf("\t%s%n", f)); 505 failed.set(new RuntimeException("Dependant action has unexpected frame in " + 506 Thread.currentThread() + ": " + httpStack.get(0))); 507 508 } 509 } 510 } 511 512 <T> void finish(Where w, 513 StallingPushPromiseHandler.Tuple<T> tuple, 514 Extractor<T> extractor) { 515 AtomicReference<RuntimeException> failed = tuple.failed; 516 CompletableFuture<HttpResponse<T>> done = tuple.response; 517 Staller staller = tuple.staller; 518 try { 519 HttpResponse<T> response = done.join(); 520 List<String> result = extractor.extract(response); 521 URI uri = response.uri(); 522 RuntimeException error = failed.get(); 523 if (error != null) { 524 throw new RuntimeException("Test failed in " 525 + w + ": " + uri, error); 526 } 527 assertEquals(result, List.of(response.request().uri().toASCIIString())); 528 } finally { 529 staller.reset(); 530 } 531 } 532 533 <T> void finish(Where w, 534 CompletableFuture<HttpResponse<T>> cf, 535 StallingPushPromiseHandler<T> ph, 536 Extractor<T> extractor) { 537 HttpResponse<T> response = cf.join(); 538 List<String> result = extractor.extract(response); 539 for (HttpRequest req : ph.promiseMap.keySet()) { 540 finish(w, ph.promiseMap.get(req), extractor); 541 } 542 assertEquals(ph.promiseMap.size(), 3, 543 "Expected 3 push promises for " + w + " in " 544 + response.request().uri()); 545 assertEquals(result, List.of(response.request().uri().toASCIIString())); 546 547 } 548 549 interface Staller extends Consumer<Where> { 550 void release(); 551 void acquire(); 552 void reset(); 553 boolean willStall(); 554 } 555 556 static final class SemaphoreStaller implements Staller { 557 final Semaphore sem = new Semaphore(1); 558 @Override 559 public void accept(Where where) { 560 sem.acquireUninterruptibly(); 561 } 562 563 @Override 564 public void release() { 565 sem.release(); 566 } 567 568 @Override 569 public void acquire() { 570 sem.acquireUninterruptibly(); 571 } 572 573 @Override 574 public void reset() { 575 sem.drainPermits(); 576 sem.release(); 577 } 578 579 @Override 580 public boolean willStall() { 581 return sem.availablePermits() <= 0; 582 } 583 584 @Override 585 public String toString() { 586 return "SemaphoreStaller"; 587 } 588 } 589 590 static final class StallingBodyHandler<T> implements BodyHandler<T> { 591 final Consumer<Where> stalling; 592 final BodyHandler<T> bodyHandler; 593 StallingBodyHandler(Consumer<Where> stalling, BodyHandler<T> bodyHandler) { 594 this.stalling = stalling; 595 this.bodyHandler = bodyHandler; 596 } 597 @Override 598 public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) { 599 stalling.accept(Where.BODY_HANDLER); 600 BodySubscriber<T> subscriber = bodyHandler.apply(rinfo); 601 return new StallingBodySubscriber(stalling, subscriber); 602 } 603 } 604 605 static final class StallingBodySubscriber<T> implements BodySubscriber<T> { 606 private final BodySubscriber<T> subscriber; 607 volatile boolean onSubscribeCalled; 608 final Consumer<Where> stalling; 609 StallingBodySubscriber(Consumer<Where> stalling, BodySubscriber<T> subscriber) { 610 this.stalling = stalling; 611 this.subscriber = subscriber; 612 } 613 614 @Override 615 public void onSubscribe(Flow.Subscription subscription) { 616 //out.println("onSubscribe "); 617 onSubscribeCalled = true; 618 stalling.accept(Where.ON_SUBSCRIBE); 619 subscriber.onSubscribe(subscription); 620 } 621 622 @Override 623 public void onNext(List<ByteBuffer> item) { 624 // out.println("onNext " + item); 625 assertTrue(onSubscribeCalled); 626 stalling.accept(Where.ON_NEXT); 627 subscriber.onNext(item); 628 } 629 630 @Override 631 public void onError(Throwable throwable) { 632 //out.println("onError"); 633 assertTrue(onSubscribeCalled); 634 stalling.accept(Where.ON_ERROR); 635 subscriber.onError(throwable); 636 } 637 638 @Override 639 public void onComplete() { 640 //out.println("onComplete"); 641 assertTrue(onSubscribeCalled, "onComplete called before onSubscribe"); 642 stalling.accept(Where.ON_COMPLETE); 643 subscriber.onComplete(); 644 } 645 646 @Override 647 public CompletionStage<T> getBody() { 648 stalling.accept(Where.GET_BODY); 649 try { 650 stalling.accept(Where.BODY_CF); 651 } catch (Throwable t) { 652 return CompletableFuture.failedFuture(t); 653 } 654 return subscriber.getBody(); 655 } 656 } 657 658 659 @BeforeTest 660 public void setup() throws Exception { 661 sslContext = new SimpleSSLContext().get(); 662 if (sslContext == null) 663 throw new AssertionError("Unexpected null sslContext"); 664 665 // HTTP/2 666 HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler(); 667 HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler(); 668 669 http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0)); 670 http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed"); 671 http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk"); 672 http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/y"; 673 http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/y"; 674 675 https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext)); 676 https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed"); 677 https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk"); 678 https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/y"; 679 https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/y"; 680 681 serverCount.addAndGet(4); 682 http2TestServer.start(); 683 https2TestServer.start(); 684 } 685 686 @AfterTest 687 public void teardown() throws Exception { 688 sharedClient = null; 689 http2TestServer.stop(); 690 https2TestServer.stop(); 691 } 692 693 static final BiPredicate<String,String> ACCEPT_ALL = (x, y) -> true; 694 695 private static void pushPromiseFor(HttpTestExchange t, 696 URI requestURI, 697 String pushPath, 698 boolean fixed) 699 throws IOException 700 { 701 try { 702 URI promise = new URI(requestURI.getScheme(), 703 requestURI.getAuthority(), 704 pushPath, null, null); 705 byte[] promiseBytes = promise.toASCIIString().getBytes(UTF_8); 706 out.printf("TestServer: %s Pushing promise: %s%n", now(), promise); 707 err.printf("TestServer: %s Pushing promise: %s%n", now(), promise); 708 HttpHeaders headers; 709 if (fixed) { 710 String length = String.valueOf(promiseBytes.length); 711 headers = HttpHeaders.of(Map.of("Content-Length", List.of(length)), 712 ACCEPT_ALL); 713 } else { 714 headers = HttpHeaders.of(Map.of(), ACCEPT_ALL); // empty 715 } 716 t.serverPush(promise, headers, promiseBytes); 717 } catch (URISyntaxException x) { 718 throw new IOException(x.getMessage(), x); 719 } 720 } 721 722 static class HTTP_FixedLengthHandler implements HttpTestHandler { 723 @Override 724 public void handle(HttpTestExchange t) throws IOException { 725 out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI()); 726 try (InputStream is = t.getRequestBody()) { 727 is.readAllBytes(); 728 } 729 URI requestURI = t.getRequestURI(); 730 for (int i = 1; i<2; i++) { 731 String path = requestURI.getPath() + "/before/promise-" + i; 732 pushPromiseFor(t, requestURI, path, true); 733 } 734 byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8); 735 t.sendResponseHeaders(200, resp.length); //fixed content length 736 try (OutputStream os = t.getResponseBody()) { 737 int bytes = resp.length/3; 738 for (int i = 0; i<2; i++) { 739 String path = requestURI.getPath() + "/after/promise-" + (i + 2); 740 os.write(resp, i * bytes, bytes); 741 os.flush(); 742 pushPromiseFor(t, requestURI, path, true); 743 } 744 os.write(resp, 2*bytes, resp.length - 2*bytes); 745 } 746 } 747 748 } 749 750 static class HTTP_ChunkedHandler implements HttpTestHandler { 751 @Override 752 public void handle(HttpTestExchange t) throws IOException { 753 out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI()); 754 byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8); 755 try (InputStream is = t.getRequestBody()) { 756 is.readAllBytes(); 757 } 758 URI requestURI = t.getRequestURI(); 759 for (int i = 1; i<2; i++) { 760 String path = requestURI.getPath() + "/before/promise-" + i; 761 pushPromiseFor(t, requestURI, path, false); 762 } 763 t.sendResponseHeaders(200, -1); // chunked/variable 764 try (OutputStream os = t.getResponseBody()) { 765 int bytes = resp.length/3; 766 for (int i = 0; i<2; i++) { 767 String path = requestURI.getPath() + "/after/promise-" + (i + 2); 768 os.write(resp, i * bytes, bytes); 769 os.flush(); 770 pushPromiseFor(t, requestURI, path, false); 771 } 772 os.write(resp, 2*bytes, resp.length - 2*bytes); 773 } 774 } 775 } 776 777 778 } 779