1 /* 2 * Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 /* 25 * @test 26 * @bug 8252374 27 * @library /test/lib http2/server 28 * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters 29 * ReferenceTracker AggregateRequestBodyTest 30 * @modules java.base/sun.net.www.http 31 * java.net.http/jdk.internal.net.http.common 32 * java.net.http/jdk.internal.net.http.frame 33 * java.net.http/jdk.internal.net.http.hpack 34 * @run testng/othervm -Djdk.internal.httpclient.debug=true 35 * -Djdk.httpclient.HttpClient.log=requests,responses,errors 36 * AggregateRequestBodyTest 37 * @summary Tests HttpRequest.BodyPublishers::concat 38 */ 39 40 import java.net.InetAddress; 41 import java.net.InetSocketAddress; 42 import java.net.URI; 43 import java.net.http.HttpClient; 44 import java.net.http.HttpRequest; 45 import java.net.http.HttpRequest.BodyPublisher; 46 import java.net.http.HttpRequest.BodyPublishers; 47 import java.net.http.HttpResponse; 48 import java.net.http.HttpResponse.BodyHandlers; 49 import java.nio.ByteBuffer; 50 import java.util.Arrays; 51 import java.util.LinkedHashMap; 52 import java.util.List; 53 import java.util.Map; 54 import java.util.concurrent.CompletableFuture; 55 import java.util.concurrent.CompletionException; 56 import java.util.concurrent.ConcurrentHashMap; 57 import java.util.concurrent.ConcurrentLinkedDeque; 58 import java.util.concurrent.ConcurrentMap; 59 import java.util.concurrent.Executor; 60 import java.util.concurrent.Executors; 61 import java.util.concurrent.Flow; 62 import java.util.concurrent.Flow.Subscriber; 63 import java.util.concurrent.Flow.Subscription; 64 import java.util.concurrent.TimeUnit; 65 import java.util.concurrent.TimeoutException; 66 import java.util.concurrent.atomic.AtomicLong; 67 import java.util.concurrent.atomic.AtomicReference; 68 import java.util.function.Consumer; 69 import java.util.function.Supplier; 70 import java.util.stream.Collectors; 71 import java.util.stream.LongStream; 72 import java.util.stream.Stream; 73 import javax.net.ssl.SSLContext; 74 75 import com.sun.net.httpserver.HttpServer; 76 import com.sun.net.httpserver.HttpsConfigurator; 77 import com.sun.net.httpserver.HttpsServer; 78 import jdk.test.lib.net.SimpleSSLContext; 79 import org.testng.Assert; 80 import org.testng.ITestContext; 81 import org.testng.ITestResult; 82 import org.testng.SkipException; 83 import org.testng.annotations.AfterClass; 84 import org.testng.annotations.AfterTest; 85 import org.testng.annotations.BeforeMethod; 86 import org.testng.annotations.BeforeTest; 87 import org.testng.annotations.DataProvider; 88 import org.testng.annotations.Test; 89 90 import static java.lang.System.out; 91 import static org.testng.Assert.assertEquals; 92 import static org.testng.Assert.assertFalse; 93 import static org.testng.Assert.assertTrue; 94 import static org.testng.Assert.expectThrows; 95 96 public class AggregateRequestBodyTest implements HttpServerAdapters { 97 98 SSLContext sslContext; 99 HttpTestServer http1TestServer; // HTTP/1.1 ( http ) 100 HttpTestServer https1TestServer; // HTTPS/1.1 ( https ) 101 HttpTestServer http2TestServer; // HTTP/2 ( h2c ) 102 HttpTestServer https2TestServer; // HTTP/2 ( h2 ) 103 String http1URI; 104 String https1URI; 105 String http2URI; 106 String https2URI; 107 108 static final int RESPONSE_CODE = 200; 109 static final int ITERATION_COUNT = 4; 110 static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class; 111 static final Class<CompletionException> CE = CompletionException.class; 112 // a shared executor helps reduce the amount of threads created by the test 113 static final Executor executor = new TestExecutor(Executors.newCachedThreadPool()); 114 static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>(); 115 static volatile boolean tasksFailed; 116 static final AtomicLong serverCount = new AtomicLong(); 117 static final AtomicLong clientCount = new AtomicLong(); 118 static final long start = System.nanoTime(); now()119 public static String now() { 120 long now = System.nanoTime() - start; 121 long secs = now / 1000_000_000; 122 long mill = (now % 1000_000_000) / 1000_000; 123 long nan = now % 1000_000; 124 return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan); 125 } 126 127 final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE; 128 private volatile HttpClient sharedClient; 129 130 static class TestExecutor implements Executor { 131 final AtomicLong tasks = new AtomicLong(); 132 Executor executor; TestExecutor(Executor executor)133 TestExecutor(Executor executor) { 134 this.executor = executor; 135 } 136 137 @Override execute(Runnable command)138 public void execute(Runnable command) { 139 long id = tasks.incrementAndGet(); 140 executor.execute(() -> { 141 try { 142 command.run(); 143 } catch (Throwable t) { 144 tasksFailed = true; 145 System.out.printf(now() + "Task %s failed: %s%n", id, t); 146 System.err.printf(now() + "Task %s failed: %s%n", id, t); 147 FAILURES.putIfAbsent("Task " + id, t); 148 throw t; 149 } 150 }); 151 } 152 } 153 stopAfterFirstFailure()154 protected boolean stopAfterFirstFailure() { 155 return Boolean.getBoolean("jdk.internal.httpclient.debug"); 156 } 157 158 final AtomicReference<SkipException> skiptests = new AtomicReference<>(); checkSkip()159 void checkSkip() { 160 var skip = skiptests.get(); 161 if (skip != null) throw skip; 162 } name(ITestResult result)163 static String name(ITestResult result) { 164 var params = result.getParameters(); 165 return result.getName() 166 + (params == null ? "()" : Arrays.toString(result.getParameters())); 167 } 168 169 @BeforeMethod beforeMethod(ITestContext context)170 void beforeMethod(ITestContext context) { 171 if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) { 172 if (skiptests.get() == null) { 173 SkipException skip = new SkipException("some tests failed"); 174 skip.setStackTrace(new StackTraceElement[0]); 175 skiptests.compareAndSet(null, skip); 176 } 177 } 178 } 179 180 @AfterClass printFailedTests(ITestContext context)181 static final void printFailedTests(ITestContext context) { 182 out.println("\n========================="); 183 try { 184 var failed = context.getFailedTests().getAllResults().stream() 185 .collect(Collectors.toMap(r -> name(r), ITestResult::getThrowable)); 186 FAILURES.putAll(failed); 187 188 out.printf("%n%sCreated %d servers and %d clients%n", 189 now(), serverCount.get(), clientCount.get()); 190 if (FAILURES.isEmpty()) return; 191 out.println("Failed tests: "); 192 FAILURES.entrySet().forEach((e) -> { 193 out.printf("\t%s: %s%n", e.getKey(), e.getValue()); 194 e.getValue().printStackTrace(out); 195 e.getValue().printStackTrace(); 196 }); 197 if (tasksFailed) { 198 System.out.println("WARNING: Some tasks failed"); 199 } 200 } finally { 201 out.println("\n=========================\n"); 202 } 203 } 204 uris()205 private String[] uris() { 206 return new String[] { 207 http1URI, 208 https1URI, 209 http2URI, 210 https2URI, 211 }; 212 } 213 214 static AtomicLong URICOUNT = new AtomicLong(); 215 216 @DataProvider(name = "variants") variants(ITestContext context)217 public Object[][] variants(ITestContext context) { 218 if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) { 219 return new Object[0][]; 220 } 221 String[] uris = uris(); 222 Object[][] result = new Object[uris.length * 2][]; 223 int i = 0; 224 for (boolean sameClient : List.of(false, true)) { 225 for (String uri : uris()) { 226 result[i++] = new Object[]{uri, sameClient}; 227 } 228 } 229 assert i == uris.length * 2; 230 return result; 231 } 232 makeNewClient()233 private HttpClient makeNewClient() { 234 clientCount.incrementAndGet(); 235 HttpClient client = HttpClient.newBuilder() 236 .proxy(HttpClient.Builder.NO_PROXY) 237 .executor(executor) 238 .sslContext(sslContext) 239 .build(); 240 return TRACKER.track(client); 241 } 242 newHttpClient(boolean share)243 HttpClient newHttpClient(boolean share) { 244 if (!share) return makeNewClient(); 245 HttpClient shared = sharedClient; 246 if (shared != null) return shared; 247 synchronized (this) { 248 shared = sharedClient; 249 if (shared == null) { 250 shared = sharedClient = makeNewClient(); 251 } 252 return shared; 253 } 254 } 255 256 static final List<String> BODIES = List.of( 257 "Lorem ipsum", 258 "dolor sit amet", 259 "consectetur adipiscing elit, sed do eiusmod tempor", 260 "quis nostrud exercitation ullamco", 261 "laboris nisi", 262 "ut", 263 "aliquip ex ea commodo consequat." + 264 "Duis aute irure dolor in reprehenderit in voluptate velit esse" + 265 "cillum dolore eu fugiat nulla pariatur.", 266 "Excepteur sint occaecat cupidatat non proident." 267 ); 268 publishers(String... content)269 static BodyPublisher[] publishers(String... content) { 270 if (content == null) return null; 271 BodyPublisher[] result = new BodyPublisher[content.length]; 272 for (int i=0; i < content.length ; i++) { 273 result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]); 274 } 275 return result; 276 } 277 strings(String... s)278 static String[] strings(String... s) { 279 return s; 280 } 281 282 @DataProvider(name = "sparseContent") nulls()283 Object[][] nulls() { 284 return new Object[][] { 285 {"null array", null}, 286 {"null element", strings((String)null)}, 287 {"null first element", strings(null, "one")}, 288 {"null second element", strings( "one", null)}, 289 {"null third element", strings( "one", "two", null)}, 290 {"null fourth element", strings( "one", "two", "three", null)}, 291 {"null random element", strings( "one", "two", "three", null, "five")}, 292 }; 293 } 294 lengths(long... lengths)295 static List<Long> lengths(long... lengths) { 296 return LongStream.of(lengths) 297 .mapToObj(Long::valueOf) 298 .collect(Collectors.toList()); 299 } 300 301 @DataProvider(name = "contentLengths") contentLengths()302 Object[][] contentLengths() { 303 return new Object[][] { 304 {-1, lengths(-1)}, 305 {-42, lengths(-42)}, 306 {42, lengths(42)}, 307 {42, lengths(10, 0, 20, 0, 12)}, 308 {-1, lengths(10, 0, 20, -1, 12)}, 309 {-1, lengths(-1, 0, 20, 10, 12)}, 310 {-1, lengths(10, 0, 20, 12, -1)}, 311 {-1, lengths(10, 0, 20, -10, 12)}, 312 {-1, lengths(-10, 0, 20, 10, 12)}, 313 {-1, lengths(10, 0, 20, 12, -10)}, 314 {-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)}, 315 {-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)}, 316 {-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)}, 317 {Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)}, 318 {-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)}, 319 {-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)}, 320 {-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)}, 321 {-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)}, 322 {-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)}, 323 {-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)}, 324 {-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)}, 325 {-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)} 326 }; 327 } 328 329 @DataProvider(name="negativeRequests") negativeRequests()330 Object[][] negativeRequests() { 331 return new Object[][] { 332 {0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE} 333 }; 334 } 335 336 337 static class ContentLengthPublisher implements BodyPublisher { 338 final long length; ContentLengthPublisher(long length)339 ContentLengthPublisher(long length) { 340 this.length = length; 341 } 342 @Override contentLength()343 public long contentLength() { 344 return length; 345 } 346 347 @Override subscribe(Subscriber<? super ByteBuffer> subscriber)348 public void subscribe(Subscriber<? super ByteBuffer> subscriber) { 349 } 350 of(List<Long> lengths)351 static ContentLengthPublisher[] of(List<Long> lengths) { 352 return lengths.stream() 353 .map(ContentLengthPublisher::new) 354 .toArray(ContentLengthPublisher[]::new); 355 } 356 } 357 358 /** 359 * A dummy publisher that allows to call onError on its subscriber (or not...). 360 */ 361 static class PublishWithError implements BodyPublisher { 362 final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>(); 363 final long length; 364 final List<String> content; 365 final int errorAt; 366 final Supplier<? extends Throwable> errorSupplier; PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier)367 PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) { 368 this.content = content; 369 this.errorAt = errorAt; 370 this.errorSupplier = supplier; 371 length = content.stream().mapToInt(String::length).sum(); 372 } 373 hasErrors()374 boolean hasErrors() { 375 return errorAt < content.size(); 376 } 377 378 @Override contentLength()379 public long contentLength() { 380 return length; 381 } 382 383 @Override subscribe(Subscriber<? super ByteBuffer> subscriber)384 public void subscribe(Subscriber<? super ByteBuffer> subscriber) { 385 ErrorSubscription subscription = new ErrorSubscription(subscriber); 386 subscribers.put(subscriber, subscription); 387 subscriber.onSubscribe(subscription); 388 } 389 390 class ErrorSubscription implements Flow.Subscription { 391 volatile boolean cancelled; 392 volatile int at; 393 final Subscriber<? super ByteBuffer> subscriber; ErrorSubscription(Subscriber<? super ByteBuffer> subscriber)394 ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) { 395 this.subscriber = subscriber; 396 } 397 @Override request(long n)398 public void request(long n) { 399 while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) { 400 if (at++ == errorAt) { 401 subscriber.onError(errorSupplier.get()); 402 return; 403 } else if (at <= content.size()){ 404 subscriber.onNext(ByteBuffer.wrap( 405 content.get(at-1).getBytes())); 406 if (at == content.size()) { 407 subscriber.onComplete(); 408 return; 409 } 410 } 411 } 412 } 413 414 @Override cancel()415 public void cancel() { 416 cancelled = true; 417 } 418 } 419 } 420 421 static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { 422 CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>(); 423 ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>(); 424 CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>(); 425 426 @Override onSubscribe(Subscription subscription)427 public void onSubscribe(Subscription subscription) { 428 this.subscriptionCF.complete(subscription); 429 } 430 431 @Override onNext(ByteBuffer item)432 public void onNext(ByteBuffer item) { 433 items.addLast(item); 434 } 435 436 @Override onError(Throwable throwable)437 public void onError(Throwable throwable) { 438 resultCF.completeExceptionally(throwable); 439 } 440 441 @Override onComplete()442 public void onComplete() { 443 resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList())); 444 } 445 resultCF()446 CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; } 447 } 448 stringFromBuffer(ByteBuffer buffer)449 static String stringFromBuffer(ByteBuffer buffer) { 450 byte[] bytes = new byte[buffer.remaining()]; 451 buffer.get(bytes); 452 return new String(bytes); 453 } 454 stringFromBytes(Stream<ByteBuffer> buffers)455 String stringFromBytes(Stream<ByteBuffer> buffers) { 456 return buffers.map(AggregateRequestBodyTest::stringFromBuffer) 457 .collect(Collectors.joining()); 458 } 459 withNoError(String content)460 static PublishWithError withNoError(String content) { 461 return new PublishWithError(List.of(content), 1, 462 () -> new AssertionError("Should not happen!")); 463 } 464 withNoError(List<String> content)465 static PublishWithError withNoError(List<String> content) { 466 return new PublishWithError(content, content.size(), 467 () -> new AssertionError("Should not happen!")); 468 } 469 470 @Test(dataProvider = "sparseContent") // checks that NPE is thrown testNullPointerException(String description, String[] content)471 public void testNullPointerException(String description, String[] content) { 472 checkSkip(); 473 BodyPublisher[] publishers = publishers(content); 474 Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers)); 475 } 476 477 // Verifies that an empty array creates a "noBody" publisher 478 @Test testEmpty()479 public void testEmpty() { 480 checkSkip(); 481 BodyPublisher publisher = BodyPublishers.concat(); 482 RequestSubscriber subscriber = new RequestSubscriber(); 483 assertEquals(publisher.contentLength(), 0); 484 publisher.subscribe(subscriber); 485 subscriber.subscriptionCF.thenAccept(s -> s.request(1)); 486 List<ByteBuffer> result = subscriber.resultCF.join(); 487 assertEquals(result, List.of()); 488 assertTrue(subscriber.items.isEmpty());; 489 } 490 491 // verifies that error emitted by upstream publishers are propagated downstream. 492 @Test(dataProvider = "sparseContent") // nulls are replaced with error publisher testOnError(String description, String[] content)493 public void testOnError(String description, String[] content) { 494 checkSkip(); 495 final RequestSubscriber subscriber = new RequestSubscriber(); 496 final PublishWithError errorPublisher; 497 final BodyPublisher[] publishers; 498 String result = BODIES.stream().collect(Collectors.joining()); 499 if (content == null) { 500 content = List.of(result).toArray(String[]::new); 501 errorPublisher = new PublishWithError(BODIES, BODIES.size(), 502 () -> new AssertionError("Unexpected!!")); 503 publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]); 504 description = "No error"; 505 } else { 506 publishers = publishers(content); 507 description = description.replace("null", "error at"); 508 errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected")); 509 } 510 result = ""; 511 boolean hasErrors = false; 512 for (int i=0; i < content.length; i++) { 513 if (content[i] == null) { 514 publishers[i] = errorPublisher; 515 if (hasErrors) continue; 516 if (!errorPublisher.hasErrors()) { 517 result = result + errorPublisher 518 .content.stream().collect(Collectors.joining()); 519 } else { 520 result = result + errorPublisher.content 521 .stream().limit(errorPublisher.errorAt) 522 .collect(Collectors.joining()); 523 result = result + "<error>"; 524 hasErrors = true; 525 } 526 } else if (!hasErrors) { 527 result = result + content[i]; 528 } 529 } 530 BodyPublisher publisher = BodyPublishers.concat(publishers); 531 publisher.subscribe(subscriber); 532 subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE)); 533 if (errorPublisher.hasErrors()) { 534 CompletionException ce = expectThrows(CompletionException.class, 535 () -> subscriber.resultCF.join()); 536 out.println(description + ": got expected " + ce); 537 assertEquals(ce.getCause().getClass(), Exception.class); 538 assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result); 539 } else { 540 assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result); 541 out.println(description + ": got expected result: " + result); 542 } 543 } 544 545 // Verifies that if an upstream publisher has an unknown length, the 546 // aggregate publisher will have an unknown length as well. Otherwise 547 // the length should be known. 548 @Test(dataProvider = "sparseContent") // nulls are replaced with unknown length testUnknownContentLength(String description, String[] content)549 public void testUnknownContentLength(String description, String[] content) { 550 checkSkip(); 551 if (content == null) { 552 content = BODIES.toArray(String[]::new); 553 description = "BODIES (known length)"; 554 } else { 555 description = description.replace("null", "length(-1)"); 556 } 557 BodyPublisher[] publishers = publishers(content); 558 BodyPublisher nolength = new BodyPublisher() { 559 final BodyPublisher missing = BodyPublishers.ofString("missing"); 560 @Override 561 public long contentLength() { return -1; } 562 @Override 563 public void subscribe(Subscriber<? super ByteBuffer> subscriber) { 564 missing.subscribe(subscriber); 565 } 566 }; 567 long length = 0; 568 for (int i=0; i < content.length; i++) { 569 if (content[i] == null) { 570 publishers[i] = nolength; 571 length = -1; 572 } else if (length >= 0) { 573 length += content[i].length(); 574 } 575 } 576 out.printf("testUnknownContentLength(%s): %d%n", description, length); 577 BodyPublisher publisher = BodyPublishers.concat(publishers); 578 assertEquals(publisher.contentLength(), length, 579 description.replace("null", "length(-1)")); 580 } 581 completionCause(CompletionException x)582 private static final Throwable completionCause(CompletionException x) { 583 while (x.getCause() instanceof CompletionException) { 584 x = (CompletionException)x.getCause(); 585 } 586 return x.getCause(); 587 } 588 589 @Test(dataProvider = "negativeRequests") testNegativeRequest(long n)590 public void testNegativeRequest(long n) { 591 checkSkip(); 592 assert n <= 0 : "test for negative request called with n > 0 : " + n; 593 BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L)); 594 BodyPublisher publisher = BodyPublishers.concat(publishers); 595 RequestSubscriber subscriber = new RequestSubscriber(); 596 publisher.subscribe(subscriber); 597 Subscription subscription = subscriber.subscriptionCF.join(); 598 subscription.request(n); 599 CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join()); 600 Throwable cause = completionCause(expected); 601 if (cause instanceof IllegalArgumentException) { 602 System.out.printf("Got expected IAE for %d: %s%n", n, cause); 603 } else { 604 throw new AssertionError("Unexpected exception: " + cause, 605 (cause == null) ? expected : cause); 606 } 607 } 608 ofStrings(String... strings)609 static BodyPublisher[] ofStrings(String... strings) { 610 return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new); 611 } 612 613 @Test testPositiveRequests()614 public void testPositiveRequests() { 615 checkSkip(); 616 // A composite array of publishers 617 BodyPublisher[] publishers = Stream.of( 618 Stream.of(ofStrings("Lorem", " ", "ipsum", " ")), 619 Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))), 620 Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))), 621 Stream.of(ofStrings(" ")), 622 Stream.of(BodyPublishers.concat(ofStrings("elit", "."))) 623 ).flatMap((s) -> s).toArray(BodyPublisher[]::new); 624 BodyPublisher publisher = BodyPublishers.concat(publishers); 625 626 // Test that we can request all 13 items in a single request call. 627 RequestSubscriber requestSubscriber1 = new RequestSubscriber(); 628 publisher.subscribe(requestSubscriber1); 629 Subscription subscription1 = requestSubscriber1.subscriptionCF.join(); 630 subscription1.request(16); 631 assertTrue(requestSubscriber1.resultCF().isDone()); 632 List<ByteBuffer> list1 = requestSubscriber1.resultCF().join(); 633 String result1 = stringFromBytes(list1.stream()); 634 assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."); 635 System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1)); 636 637 // Test that we can split our requests call any which way we want 638 // (whether in the 'middle of a publisher' or at the boundaries. 639 RequestSubscriber requestSubscriber2 = new RequestSubscriber(); 640 publisher.subscribe(requestSubscriber2); 641 Subscription subscription2 = requestSubscriber2.subscriptionCF.join(); 642 subscription2.request(1); 643 assertFalse(requestSubscriber2.resultCF().isDone()); 644 subscription2.request(10); 645 assertFalse(requestSubscriber2.resultCF().isDone()); 646 subscription2.request(4); 647 assertFalse(requestSubscriber2.resultCF().isDone()); 648 subscription2.request(1); 649 assertTrue(requestSubscriber2.resultCF().isDone()); 650 List<ByteBuffer> list2 = requestSubscriber2.resultCF().join(); 651 String result2 = stringFromBytes(list2.stream()); 652 assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit."); 653 System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1)); 654 } 655 656 @Test(dataProvider = "contentLengths") testContentLength(long expected, List<Long> lengths)657 public void testContentLength(long expected, List<Long> lengths) { 658 checkSkip(); 659 BodyPublisher[] publishers = ContentLengthPublisher.of(lengths); 660 BodyPublisher aggregate = BodyPublishers.concat(publishers); 661 assertEquals(aggregate.contentLength(), expected, 662 "Unexpected result for %s".formatted(lengths)); 663 } 664 665 // Verifies that cancelling the subscription ensure that downstream 666 // publishers are no longer subscribed etc... 667 @Test testCancel()668 public void testCancel() { 669 checkSkip(); 670 BodyPublisher[] publishers = BODIES.stream() 671 .map(BodyPublishers::ofString) 672 .toArray(BodyPublisher[]::new); 673 BodyPublisher publisher = BodyPublishers.concat(publishers); 674 675 assertEquals(publisher.contentLength(), 676 BODIES.stream().mapToInt(String::length).sum()); 677 Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>(); 678 679 for (int n=0; n < BODIES.size(); n++) { 680 681 String description = String.format( 682 "cancel after %d/%d onNext() invocations", 683 n, BODIES.size()); 684 RequestSubscriber subscriber = new RequestSubscriber(); 685 publisher.subscribe(subscriber); 686 Subscription subscription = subscriber.subscriptionCF.join(); 687 subscribers.put(subscriber, description); 688 689 // receive half the data 690 for (int i = 0; i < n; i++) { 691 subscription.request(1); 692 ByteBuffer buffer = subscriber.items.pop(); 693 } 694 695 // cancel subscription 696 subscription.cancel(); 697 // request the rest... 698 subscription.request(Long.MAX_VALUE); 699 } 700 701 CompletableFuture[] results = subscribers.keySet() 702 .stream().map(RequestSubscriber::resultCF) 703 .toArray(CompletableFuture[]::new); 704 CompletableFuture<?> any = CompletableFuture.anyOf(results); 705 706 // subscription was cancelled, so nothing should be received... 707 try { 708 TimeoutException x = Assert.expectThrows(TimeoutException.class, 709 () -> any.get(5, TimeUnit.SECONDS)); 710 out.println("Got expected " + x); 711 } finally { 712 subscribers.keySet().stream() 713 .filter(rs -> rs.resultCF.isDone()) 714 .forEach(rs -> System.err.printf( 715 "Failed: %s completed with %s", 716 subscribers.get(rs), rs.resultCF)); 717 } 718 Consumer<RequestSubscriber> check = (rs) -> { 719 Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items"); 720 Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled"); 721 out.println(subscribers.get(rs) + ": PASSED"); 722 }; 723 subscribers.keySet().stream().forEach(check); 724 } 725 726 // Verifies that cancelling the subscription is propagated downstream 727 @Test testCancelSubscription()728 public void testCancelSubscription() { 729 checkSkip(); 730 PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(), 731 () -> new AssertionError("should not come here")); 732 BodyPublisher publisher = BodyPublishers.concat(upstream); 733 734 assertEquals(publisher.contentLength(), 735 BODIES.stream().mapToInt(String::length).sum()); 736 Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>(); 737 738 for (int n=0; n < BODIES.size(); n++) { 739 740 String description = String.format( 741 "cancel after %d/%d onNext() invocations", 742 n, BODIES.size()); 743 RequestSubscriber subscriber = new RequestSubscriber(); 744 publisher.subscribe(subscriber); 745 Subscription subscription = subscriber.subscriptionCF.join(); 746 subscribers.put(subscriber, description); 747 748 // receive half the data 749 for (int i = 0; i < n; i++) { 750 subscription.request(1); 751 ByteBuffer buffer = subscriber.items.pop(); 752 } 753 754 // cancel subscription 755 subscription.cancel(); 756 // request the rest... 757 subscription.request(Long.MAX_VALUE); 758 assertTrue(upstream.subscribers.get(subscriber).cancelled, 759 description + " upstream subscription not cancelled"); 760 out.println(description + " upstream subscription was properly cancelled"); 761 } 762 763 CompletableFuture[] results = subscribers.keySet() 764 .stream().map(RequestSubscriber::resultCF) 765 .toArray(CompletableFuture[]::new); 766 CompletableFuture<?> any = CompletableFuture.anyOf(results); 767 768 // subscription was cancelled, so nothing should be received... 769 try { 770 TimeoutException x = Assert.expectThrows(TimeoutException.class, 771 () -> any.get(5, TimeUnit.SECONDS)); 772 out.println("Got expected " + x); 773 } finally { 774 subscribers.keySet().stream() 775 .filter(rs -> rs.resultCF.isDone()) 776 .forEach(rs -> System.err.printf( 777 "Failed: %s completed with %s", 778 subscribers.get(rs), rs.resultCF)); 779 } 780 Consumer<RequestSubscriber> check = (rs) -> { 781 Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items"); 782 Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled"); 783 out.println(subscribers.get(rs) + ": PASSED"); 784 }; 785 subscribers.keySet().stream().forEach(check); 786 787 } 788 789 @Test(dataProvider = "variants") test(String uri, boolean sameClient)790 public void test(String uri, boolean sameClient) throws Exception { 791 checkSkip(); 792 System.out.println("Request to " + uri); 793 794 HttpClient client = newHttpClient(sameClient); 795 796 BodyPublisher publisher = BodyPublishers.concat( 797 BODIES.stream() 798 .map(BodyPublishers::ofString) 799 .toArray(HttpRequest.BodyPublisher[]::new) 800 ); 801 HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) 802 .POST(publisher) 803 .build(); 804 for (int i = 0; i < ITERATION_COUNT; i++) { 805 System.out.println("Iteration: " + i); 806 HttpResponse<String> response = client.send(request, BodyHandlers.ofString()); 807 int expectedResponse = RESPONSE_CODE; 808 if (response.statusCode() != expectedResponse) 809 throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode())); 810 assertEquals(response.body(), BODIES.stream().collect(Collectors.joining())); 811 } 812 System.out.println("test: DONE"); 813 } 814 815 @BeforeTest setup()816 public void setup() throws Exception { 817 sslContext = new SimpleSSLContext().get(); 818 if (sslContext == null) 819 throw new AssertionError("Unexpected null sslContext"); 820 821 HttpTestHandler handler = new HttpTestEchoHandler(); 822 InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); 823 824 HttpServer http1 = HttpServer.create(loopback, 0); 825 http1TestServer = HttpTestServer.of(http1); 826 http1TestServer.addHandler(handler, "/http1/echo/"); 827 http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x"; 828 829 HttpsServer https1 = HttpsServer.create(loopback, 0); 830 https1.setHttpsConfigurator(new HttpsConfigurator(sslContext)); 831 https1TestServer = HttpTestServer.of(https1); 832 https1TestServer.addHandler(handler, "/https1/echo/"); 833 https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x"; 834 835 // HTTP/2 836 http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0)); 837 http2TestServer.addHandler(handler, "/http2/echo/"); 838 http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x"; 839 840 https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext)); 841 https2TestServer.addHandler(handler, "/https2/echo/"); 842 https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x"; 843 844 serverCount.addAndGet(4); 845 http1TestServer.start(); 846 https1TestServer.start(); 847 http2TestServer.start(); 848 https2TestServer.start(); 849 } 850 851 @AfterTest teardown()852 public void teardown() throws Exception { 853 String sharedClientName = 854 sharedClient == null ? null : sharedClient.toString(); 855 sharedClient = null; 856 Thread.sleep(100); 857 AssertionError fail = TRACKER.check(500); 858 try { 859 http1TestServer.stop(); 860 https1TestServer.stop(); 861 http2TestServer.stop(); 862 https2TestServer.stop(); 863 } finally { 864 if (fail != null) { 865 if (sharedClientName != null) { 866 System.err.println("Shared client name is: " + sharedClientName); 867 } 868 throw fail; 869 } 870 } 871 } 872 } 873