1 /*
2  * Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
26  * @bug 8252374
27  * @library /test/lib http2/server
28  * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters
29  *       ReferenceTracker AggregateRequestBodyTest
30  * @modules java.base/sun.net.www.http
31  *          java.net.http/jdk.internal.net.http.common
32  *          java.net.http/jdk.internal.net.http.frame
33  *          java.net.http/jdk.internal.net.http.hpack
34  * @run testng/othervm -Djdk.internal.httpclient.debug=true
35  *                     -Djdk.httpclient.HttpClient.log=requests,responses,errors
36  *                     AggregateRequestBodyTest
37  * @summary Tests HttpRequest.BodyPublishers::concat
38  */
39 
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.URI;
43 import java.net.http.HttpClient;
44 import java.net.http.HttpRequest;
45 import java.net.http.HttpRequest.BodyPublisher;
46 import java.net.http.HttpRequest.BodyPublishers;
47 import java.net.http.HttpResponse;
48 import java.net.http.HttpResponse.BodyHandlers;
49 import java.nio.ByteBuffer;
50 import java.util.Arrays;
51 import java.util.LinkedHashMap;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.concurrent.CompletableFuture;
55 import java.util.concurrent.CompletionException;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ConcurrentLinkedDeque;
58 import java.util.concurrent.ConcurrentMap;
59 import java.util.concurrent.Executor;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.Flow;
62 import java.util.concurrent.Flow.Subscriber;
63 import java.util.concurrent.Flow.Subscription;
64 import java.util.concurrent.TimeUnit;
65 import java.util.concurrent.TimeoutException;
66 import java.util.concurrent.atomic.AtomicLong;
67 import java.util.concurrent.atomic.AtomicReference;
68 import java.util.function.Consumer;
69 import java.util.function.Supplier;
70 import java.util.stream.Collectors;
71 import java.util.stream.LongStream;
72 import java.util.stream.Stream;
73 import javax.net.ssl.SSLContext;
74 
75 import com.sun.net.httpserver.HttpServer;
76 import com.sun.net.httpserver.HttpsConfigurator;
77 import com.sun.net.httpserver.HttpsServer;
78 import jdk.test.lib.net.SimpleSSLContext;
79 import org.testng.Assert;
80 import org.testng.ITestContext;
81 import org.testng.ITestResult;
82 import org.testng.SkipException;
83 import org.testng.annotations.AfterClass;
84 import org.testng.annotations.AfterTest;
85 import org.testng.annotations.BeforeMethod;
86 import org.testng.annotations.BeforeTest;
87 import org.testng.annotations.DataProvider;
88 import org.testng.annotations.Test;
89 
90 import static java.lang.System.out;
91 import static org.testng.Assert.assertEquals;
92 import static org.testng.Assert.assertFalse;
93 import static org.testng.Assert.assertTrue;
94 import static org.testng.Assert.expectThrows;
95 
96 public class AggregateRequestBodyTest implements HttpServerAdapters {
97 
98     SSLContext sslContext;
99     HttpTestServer http1TestServer;   // HTTP/1.1 ( http )
100     HttpTestServer https1TestServer;  // HTTPS/1.1 ( https  )
101     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
102     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
103     String http1URI;
104     String https1URI;
105     String http2URI;
106     String https2URI;
107 
108     static final int RESPONSE_CODE = 200;
109     static final int ITERATION_COUNT = 4;
110     static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
111     static final Class<CompletionException> CE = CompletionException.class;
112     // a shared executor helps reduce the amount of threads created by the test
113     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
114     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
115     static volatile boolean tasksFailed;
116     static final AtomicLong serverCount = new AtomicLong();
117     static final AtomicLong clientCount = new AtomicLong();
118     static final long start = System.nanoTime();
now()119     public static String now() {
120         long now = System.nanoTime() - start;
121         long secs = now / 1000_000_000;
122         long mill = (now % 1000_000_000) / 1000_000;
123         long nan = now % 1000_000;
124         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
125     }
126 
127     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
128     private volatile HttpClient sharedClient;
129 
130     static class TestExecutor implements Executor {
131         final AtomicLong tasks = new AtomicLong();
132         Executor executor;
TestExecutor(Executor executor)133         TestExecutor(Executor executor) {
134             this.executor = executor;
135         }
136 
137         @Override
execute(Runnable command)138         public void execute(Runnable command) {
139             long id = tasks.incrementAndGet();
140             executor.execute(() -> {
141                 try {
142                     command.run();
143                 } catch (Throwable t) {
144                     tasksFailed = true;
145                     System.out.printf(now() + "Task %s failed: %s%n", id, t);
146                     System.err.printf(now() + "Task %s failed: %s%n", id, t);
147                     FAILURES.putIfAbsent("Task " + id, t);
148                     throw t;
149                 }
150             });
151         }
152     }
153 
stopAfterFirstFailure()154     protected boolean stopAfterFirstFailure() {
155         return Boolean.getBoolean("jdk.internal.httpclient.debug");
156     }
157 
158     final AtomicReference<SkipException> skiptests = new AtomicReference<>();
checkSkip()159     void checkSkip() {
160         var skip = skiptests.get();
161         if (skip != null) throw skip;
162     }
name(ITestResult result)163     static String name(ITestResult result) {
164         var params = result.getParameters();
165         return result.getName()
166                 + (params == null ? "()" : Arrays.toString(result.getParameters()));
167     }
168 
169     @BeforeMethod
beforeMethod(ITestContext context)170     void beforeMethod(ITestContext context) {
171         if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
172             if (skiptests.get() == null) {
173                 SkipException skip = new SkipException("some tests failed");
174                 skip.setStackTrace(new StackTraceElement[0]);
175                 skiptests.compareAndSet(null, skip);
176             }
177         }
178     }
179 
180     @AfterClass
printFailedTests(ITestContext context)181     static final void printFailedTests(ITestContext context) {
182         out.println("\n=========================");
183         try {
184             var failed = context.getFailedTests().getAllResults().stream()
185                     .collect(Collectors.toMap(r -> name(r), ITestResult::getThrowable));
186             FAILURES.putAll(failed);
187 
188             out.printf("%n%sCreated %d servers and %d clients%n",
189                     now(), serverCount.get(), clientCount.get());
190             if (FAILURES.isEmpty()) return;
191             out.println("Failed tests: ");
192             FAILURES.entrySet().forEach((e) -> {
193                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
194                 e.getValue().printStackTrace(out);
195                 e.getValue().printStackTrace();
196             });
197             if (tasksFailed) {
198                 System.out.println("WARNING: Some tasks failed");
199             }
200         } finally {
201             out.println("\n=========================\n");
202         }
203     }
204 
uris()205     private String[] uris() {
206         return new String[] {
207                 http1URI,
208                 https1URI,
209                 http2URI,
210                 https2URI,
211         };
212     }
213 
214     static AtomicLong URICOUNT = new AtomicLong();
215 
216     @DataProvider(name = "variants")
variants(ITestContext context)217     public Object[][] variants(ITestContext context) {
218         if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
219             return new Object[0][];
220         }
221         String[] uris = uris();
222         Object[][] result = new Object[uris.length * 2][];
223         int i = 0;
224         for (boolean sameClient : List.of(false, true)) {
225             for (String uri : uris()) {
226                 result[i++] = new Object[]{uri, sameClient};
227             }
228         }
229         assert i == uris.length * 2;
230         return result;
231     }
232 
makeNewClient()233     private HttpClient makeNewClient() {
234         clientCount.incrementAndGet();
235         HttpClient client =  HttpClient.newBuilder()
236                 .proxy(HttpClient.Builder.NO_PROXY)
237                 .executor(executor)
238                 .sslContext(sslContext)
239                 .build();
240         return TRACKER.track(client);
241     }
242 
newHttpClient(boolean share)243     HttpClient newHttpClient(boolean share) {
244         if (!share) return makeNewClient();
245         HttpClient shared = sharedClient;
246         if (shared != null) return shared;
247         synchronized (this) {
248             shared = sharedClient;
249             if (shared == null) {
250                 shared = sharedClient = makeNewClient();
251             }
252             return shared;
253         }
254     }
255 
256     static final List<String> BODIES = List.of(
257             "Lorem ipsum",
258             "dolor sit amet",
259             "consectetur adipiscing elit, sed do eiusmod tempor",
260             "quis nostrud exercitation ullamco",
261             "laboris nisi",
262             "ut",
263             "aliquip ex ea commodo consequat." +
264                     "Duis aute irure dolor in reprehenderit in voluptate velit esse" +
265                     "cillum dolore eu fugiat nulla pariatur.",
266             "Excepteur sint occaecat cupidatat non proident."
267     );
268 
publishers(String... content)269     static BodyPublisher[] publishers(String... content) {
270         if (content == null) return null;
271         BodyPublisher[] result = new BodyPublisher[content.length];
272         for (int i=0; i < content.length ; i++) {
273             result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]);
274         }
275         return result;
276     }
277 
strings(String... s)278     static String[] strings(String... s) {
279         return s;
280     }
281 
282     @DataProvider(name = "sparseContent")
nulls()283     Object[][] nulls() {
284         return new Object[][] {
285                 {"null array", null},
286                 {"null element", strings((String)null)},
287                 {"null first element", strings(null, "one")},
288                 {"null second element", strings( "one", null)},
289                 {"null third element", strings( "one", "two", null)},
290                 {"null fourth element", strings( "one", "two", "three", null)},
291                 {"null random element", strings( "one", "two", "three", null, "five")},
292         };
293     }
294 
lengths(long... lengths)295     static List<Long> lengths(long... lengths) {
296         return LongStream.of(lengths)
297                 .mapToObj(Long::valueOf)
298                 .collect(Collectors.toList());
299     }
300 
301     @DataProvider(name = "contentLengths")
contentLengths()302     Object[][] contentLengths() {
303         return new Object[][] {
304                 {-1, lengths(-1)},
305                 {-42, lengths(-42)},
306                 {42, lengths(42)},
307                 {42, lengths(10, 0, 20, 0, 12)},
308                 {-1, lengths(10, 0, 20, -1, 12)},
309                 {-1, lengths(-1, 0, 20, 10, 12)},
310                 {-1, lengths(10, 0, 20, 12, -1)},
311                 {-1, lengths(10, 0, 20, -10, 12)},
312                 {-1, lengths(-10, 0, 20, 10, 12)},
313                 {-1, lengths(10, 0, 20, 12, -10)},
314                 {-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)},
315                 {-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)},
316                 {-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)},
317                 {Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)},
318                 {-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)},
319                 {-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)},
320                 {-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)},
321                 {-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)},
322                 {-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)},
323                 {-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
324                 {-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
325                 {-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)}
326         };
327     }
328 
329     @DataProvider(name="negativeRequests")
negativeRequests()330     Object[][] negativeRequests() {
331         return new Object[][] {
332                 {0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE}
333         };
334     }
335 
336 
337     static class ContentLengthPublisher implements BodyPublisher {
338         final long length;
ContentLengthPublisher(long length)339         ContentLengthPublisher(long length) {
340             this.length = length;
341         }
342         @Override
contentLength()343         public long contentLength() {
344             return length;
345         }
346 
347         @Override
subscribe(Subscriber<? super ByteBuffer> subscriber)348         public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
349         }
350 
of(List<Long> lengths)351         static ContentLengthPublisher[] of(List<Long> lengths) {
352             return lengths.stream()
353                     .map(ContentLengthPublisher::new)
354                     .toArray(ContentLengthPublisher[]::new);
355         }
356     }
357 
358     /**
359      * A dummy publisher that allows to call onError on its subscriber (or not...).
360      */
361     static class PublishWithError implements BodyPublisher {
362         final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>();
363         final long length;
364         final List<String> content;
365         final int errorAt;
366         final Supplier<? extends Throwable> errorSupplier;
PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier)367         PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) {
368             this.content = content;
369             this.errorAt = errorAt;
370             this.errorSupplier = supplier;
371             length = content.stream().mapToInt(String::length).sum();
372         }
373 
hasErrors()374         boolean hasErrors() {
375             return errorAt < content.size();
376         }
377 
378         @Override
contentLength()379         public long contentLength() {
380             return length;
381         }
382 
383         @Override
subscribe(Subscriber<? super ByteBuffer> subscriber)384         public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
385             ErrorSubscription subscription = new ErrorSubscription(subscriber);
386             subscribers.put(subscriber, subscription);
387             subscriber.onSubscribe(subscription);
388         }
389 
390         class ErrorSubscription implements Flow.Subscription {
391             volatile boolean cancelled;
392             volatile int at;
393             final Subscriber<? super ByteBuffer> subscriber;
ErrorSubscription(Subscriber<? super ByteBuffer> subscriber)394             ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) {
395                 this.subscriber = subscriber;
396             }
397             @Override
request(long n)398             public void request(long n) {
399                 while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) {
400                     if (at++ == errorAt) {
401                         subscriber.onError(errorSupplier.get());
402                         return;
403                     } else if (at <= content.size()){
404                         subscriber.onNext(ByteBuffer.wrap(
405                                 content.get(at-1).getBytes()));
406                         if (at == content.size()) {
407                             subscriber.onComplete();
408                             return;
409                         }
410                     }
411                 }
412             }
413 
414             @Override
cancel()415             public void cancel() {
416                 cancelled = true;
417             }
418         }
419     }
420 
421     static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
422         CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
423         ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();
424         CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();
425 
426         @Override
onSubscribe(Subscription subscription)427         public void onSubscribe(Subscription subscription) {
428             this.subscriptionCF.complete(subscription);
429         }
430 
431         @Override
onNext(ByteBuffer item)432         public void onNext(ByteBuffer item) {
433             items.addLast(item);
434         }
435 
436         @Override
onError(Throwable throwable)437         public void onError(Throwable throwable) {
438             resultCF.completeExceptionally(throwable);
439         }
440 
441         @Override
onComplete()442         public void onComplete() {
443             resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));
444         }
445 
resultCF()446         CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }
447     }
448 
stringFromBuffer(ByteBuffer buffer)449     static String stringFromBuffer(ByteBuffer buffer) {
450         byte[] bytes = new byte[buffer.remaining()];
451         buffer.get(bytes);
452         return new String(bytes);
453     }
454 
stringFromBytes(Stream<ByteBuffer> buffers)455     String stringFromBytes(Stream<ByteBuffer> buffers) {
456         return buffers.map(AggregateRequestBodyTest::stringFromBuffer)
457                 .collect(Collectors.joining());
458     }
459 
withNoError(String content)460     static PublishWithError withNoError(String content) {
461         return new PublishWithError(List.of(content), 1,
462                 () -> new AssertionError("Should not happen!"));
463     }
464 
withNoError(List<String> content)465     static PublishWithError withNoError(List<String> content) {
466         return new PublishWithError(content, content.size(),
467                 () -> new AssertionError("Should not happen!"));
468     }
469 
470     @Test(dataProvider = "sparseContent") // checks that NPE is thrown
testNullPointerException(String description, String[] content)471     public void testNullPointerException(String description, String[] content) {
472         checkSkip();
473         BodyPublisher[] publishers = publishers(content);
474         Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers));
475     }
476 
477     // Verifies that an empty array creates a "noBody" publisher
478     @Test
testEmpty()479     public void testEmpty() {
480         checkSkip();
481         BodyPublisher publisher = BodyPublishers.concat();
482         RequestSubscriber subscriber = new RequestSubscriber();
483         assertEquals(publisher.contentLength(), 0);
484         publisher.subscribe(subscriber);
485         subscriber.subscriptionCF.thenAccept(s -> s.request(1));
486         List<ByteBuffer> result = subscriber.resultCF.join();
487         assertEquals(result, List.of());
488         assertTrue(subscriber.items.isEmpty());;
489     }
490 
491     // verifies that error emitted by upstream publishers are propagated downstream.
492     @Test(dataProvider = "sparseContent") // nulls are replaced with error publisher
testOnError(String description, String[] content)493     public void testOnError(String description, String[] content) {
494         checkSkip();
495         final RequestSubscriber subscriber = new RequestSubscriber();
496         final PublishWithError errorPublisher;
497         final BodyPublisher[] publishers;
498         String result = BODIES.stream().collect(Collectors.joining());
499         if (content == null) {
500             content = List.of(result).toArray(String[]::new);
501             errorPublisher = new PublishWithError(BODIES, BODIES.size(),
502                     () -> new AssertionError("Unexpected!!"));
503             publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]);
504             description = "No error";
505         } else {
506             publishers = publishers(content);
507             description = description.replace("null", "error at");
508             errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected"));
509         }
510         result = "";
511         boolean hasErrors = false;
512         for (int i=0; i < content.length; i++) {
513             if (content[i] == null) {
514                 publishers[i] = errorPublisher;
515                 if (hasErrors) continue;
516                 if (!errorPublisher.hasErrors()) {
517                     result = result + errorPublisher
518                             .content.stream().collect(Collectors.joining());
519                 } else {
520                     result = result + errorPublisher.content
521                             .stream().limit(errorPublisher.errorAt)
522                             .collect(Collectors.joining());
523                     result = result + "<error>";
524                     hasErrors = true;
525                 }
526             } else if (!hasErrors) {
527                 result = result + content[i];
528             }
529         }
530         BodyPublisher publisher = BodyPublishers.concat(publishers);
531         publisher.subscribe(subscriber);
532         subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE));
533         if (errorPublisher.hasErrors()) {
534             CompletionException ce = expectThrows(CompletionException.class,
535                     () -> subscriber.resultCF.join());
536             out.println(description + ": got expected " + ce);
537             assertEquals(ce.getCause().getClass(), Exception.class);
538             assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result);
539         } else {
540             assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result);
541             out.println(description + ": got expected result: " + result);
542         }
543     }
544 
545     // Verifies that if an upstream publisher has an unknown length, the
546     // aggregate publisher will have an unknown length as well. Otherwise
547     // the length should be known.
548     @Test(dataProvider = "sparseContent") // nulls are replaced with unknown length
testUnknownContentLength(String description, String[] content)549     public void testUnknownContentLength(String description, String[] content) {
550         checkSkip();
551         if (content == null) {
552             content = BODIES.toArray(String[]::new);
553             description = "BODIES (known length)";
554         } else {
555             description = description.replace("null", "length(-1)");
556         }
557         BodyPublisher[] publishers = publishers(content);
558         BodyPublisher nolength = new BodyPublisher() {
559             final BodyPublisher missing = BodyPublishers.ofString("missing");
560             @Override
561             public long contentLength() { return -1; }
562             @Override
563             public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
564                 missing.subscribe(subscriber);
565             }
566         };
567         long length = 0;
568         for (int i=0; i < content.length; i++) {
569             if (content[i] == null) {
570                 publishers[i] = nolength;
571                 length = -1;
572             } else if (length >= 0) {
573                 length += content[i].length();
574             }
575         }
576         out.printf("testUnknownContentLength(%s): %d%n", description, length);
577         BodyPublisher publisher = BodyPublishers.concat(publishers);
578         assertEquals(publisher.contentLength(), length,
579                 description.replace("null", "length(-1)"));
580     }
581 
completionCause(CompletionException x)582     private static final Throwable completionCause(CompletionException x) {
583         while (x.getCause() instanceof CompletionException) {
584             x = (CompletionException)x.getCause();
585         }
586         return x.getCause();
587     }
588 
589     @Test(dataProvider = "negativeRequests")
testNegativeRequest(long n)590     public void testNegativeRequest(long n) {
591         checkSkip();
592         assert n <= 0 : "test for negative request called with n > 0 : " + n;
593         BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L));
594         BodyPublisher publisher = BodyPublishers.concat(publishers);
595         RequestSubscriber subscriber = new RequestSubscriber();
596         publisher.subscribe(subscriber);
597         Subscription subscription = subscriber.subscriptionCF.join();
598         subscription.request(n);
599         CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());
600         Throwable cause = completionCause(expected);
601         if (cause instanceof IllegalArgumentException) {
602             System.out.printf("Got expected IAE for %d: %s%n", n, cause);
603         } else {
604             throw new AssertionError("Unexpected exception: " + cause,
605                     (cause == null) ? expected : cause);
606         }
607     }
608 
ofStrings(String... strings)609     static BodyPublisher[] ofStrings(String... strings) {
610         return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new);
611     }
612 
613     @Test
testPositiveRequests()614     public void testPositiveRequests()  {
615         checkSkip();
616         // A composite array of publishers
617         BodyPublisher[] publishers = Stream.of(
618                 Stream.of(ofStrings("Lorem", " ", "ipsum", " ")),
619                 Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))),
620                 Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))),
621                 Stream.of(ofStrings(" ")),
622                 Stream.of(BodyPublishers.concat(ofStrings("elit", ".")))
623         ).flatMap((s) -> s).toArray(BodyPublisher[]::new);
624         BodyPublisher publisher = BodyPublishers.concat(publishers);
625 
626         // Test that we can request all 13 items in a single request call.
627         RequestSubscriber requestSubscriber1 = new RequestSubscriber();
628         publisher.subscribe(requestSubscriber1);
629         Subscription subscription1 = requestSubscriber1.subscriptionCF.join();
630         subscription1.request(16);
631         assertTrue(requestSubscriber1.resultCF().isDone());
632         List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();
633         String result1 = stringFromBytes(list1.stream());
634         assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
635         System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));
636 
637         // Test that we can split our requests call any which way we want
638         // (whether in the 'middle of a publisher' or at the boundaries.
639         RequestSubscriber requestSubscriber2 = new RequestSubscriber();
640         publisher.subscribe(requestSubscriber2);
641         Subscription subscription2 = requestSubscriber2.subscriptionCF.join();
642         subscription2.request(1);
643         assertFalse(requestSubscriber2.resultCF().isDone());
644         subscription2.request(10);
645         assertFalse(requestSubscriber2.resultCF().isDone());
646         subscription2.request(4);
647         assertFalse(requestSubscriber2.resultCF().isDone());
648         subscription2.request(1);
649         assertTrue(requestSubscriber2.resultCF().isDone());
650         List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();
651         String result2 = stringFromBytes(list2.stream());
652         assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
653         System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));
654     }
655 
656     @Test(dataProvider = "contentLengths")
testContentLength(long expected, List<Long> lengths)657     public void testContentLength(long expected, List<Long> lengths) {
658         checkSkip();
659         BodyPublisher[] publishers = ContentLengthPublisher.of(lengths);
660         BodyPublisher aggregate = BodyPublishers.concat(publishers);
661         assertEquals(aggregate.contentLength(), expected,
662                 "Unexpected result for %s".formatted(lengths));
663     }
664 
665     // Verifies that cancelling the subscription ensure that downstream
666     // publishers are no longer subscribed etc...
667     @Test
testCancel()668     public void testCancel() {
669         checkSkip();
670         BodyPublisher[] publishers = BODIES.stream()
671                 .map(BodyPublishers::ofString)
672                 .toArray(BodyPublisher[]::new);
673         BodyPublisher publisher = BodyPublishers.concat(publishers);
674 
675         assertEquals(publisher.contentLength(),
676                 BODIES.stream().mapToInt(String::length).sum());
677         Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
678 
679         for (int n=0; n < BODIES.size(); n++) {
680 
681             String description = String.format(
682                     "cancel after %d/%d onNext() invocations",
683                     n, BODIES.size());
684             RequestSubscriber subscriber = new RequestSubscriber();
685             publisher.subscribe(subscriber);
686             Subscription subscription = subscriber.subscriptionCF.join();
687             subscribers.put(subscriber, description);
688 
689             // receive half the data
690             for (int i = 0; i < n; i++) {
691                 subscription.request(1);
692                 ByteBuffer buffer = subscriber.items.pop();
693             }
694 
695             // cancel subscription
696             subscription.cancel();
697             // request the rest...
698             subscription.request(Long.MAX_VALUE);
699         }
700 
701         CompletableFuture[] results = subscribers.keySet()
702                 .stream().map(RequestSubscriber::resultCF)
703                 .toArray(CompletableFuture[]::new);
704         CompletableFuture<?> any = CompletableFuture.anyOf(results);
705 
706         // subscription was cancelled, so nothing should be received...
707         try {
708             TimeoutException x = Assert.expectThrows(TimeoutException.class,
709                     () -> any.get(5, TimeUnit.SECONDS));
710             out.println("Got expected " + x);
711         } finally {
712             subscribers.keySet().stream()
713                     .filter(rs -> rs.resultCF.isDone())
714                     .forEach(rs -> System.err.printf(
715                             "Failed: %s completed with %s",
716                             subscribers.get(rs), rs.resultCF));
717         }
718         Consumer<RequestSubscriber> check = (rs) -> {
719             Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
720             Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
721             out.println(subscribers.get(rs) + ": PASSED");
722         };
723         subscribers.keySet().stream().forEach(check);
724     }
725 
726     // Verifies that cancelling the subscription is propagated downstream
727     @Test
testCancelSubscription()728     public void testCancelSubscription() {
729         checkSkip();
730         PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(),
731                 () -> new AssertionError("should not come here"));
732         BodyPublisher publisher = BodyPublishers.concat(upstream);
733 
734         assertEquals(publisher.contentLength(),
735                 BODIES.stream().mapToInt(String::length).sum());
736         Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
737 
738         for (int n=0; n < BODIES.size(); n++) {
739 
740             String description = String.format(
741                     "cancel after %d/%d onNext() invocations",
742                     n, BODIES.size());
743             RequestSubscriber subscriber = new RequestSubscriber();
744             publisher.subscribe(subscriber);
745             Subscription subscription = subscriber.subscriptionCF.join();
746             subscribers.put(subscriber, description);
747 
748             // receive half the data
749             for (int i = 0; i < n; i++) {
750                 subscription.request(1);
751                 ByteBuffer buffer = subscriber.items.pop();
752             }
753 
754             // cancel subscription
755             subscription.cancel();
756             // request the rest...
757             subscription.request(Long.MAX_VALUE);
758             assertTrue(upstream.subscribers.get(subscriber).cancelled,
759                     description + " upstream subscription not cancelled");
760             out.println(description + " upstream subscription was properly cancelled");
761         }
762 
763         CompletableFuture[] results = subscribers.keySet()
764                 .stream().map(RequestSubscriber::resultCF)
765                 .toArray(CompletableFuture[]::new);
766         CompletableFuture<?> any = CompletableFuture.anyOf(results);
767 
768         // subscription was cancelled, so nothing should be received...
769         try {
770             TimeoutException x = Assert.expectThrows(TimeoutException.class,
771                     () -> any.get(5, TimeUnit.SECONDS));
772             out.println("Got expected " + x);
773         } finally {
774             subscribers.keySet().stream()
775                     .filter(rs -> rs.resultCF.isDone())
776                     .forEach(rs -> System.err.printf(
777                             "Failed: %s completed with %s",
778                             subscribers.get(rs), rs.resultCF));
779         }
780         Consumer<RequestSubscriber> check = (rs) -> {
781             Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
782             Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
783             out.println(subscribers.get(rs) + ": PASSED");
784         };
785         subscribers.keySet().stream().forEach(check);
786 
787     }
788 
789     @Test(dataProvider = "variants")
test(String uri, boolean sameClient)790     public void test(String uri, boolean sameClient) throws Exception {
791         checkSkip();
792         System.out.println("Request to " + uri);
793 
794         HttpClient client = newHttpClient(sameClient);
795 
796         BodyPublisher publisher = BodyPublishers.concat(
797                 BODIES.stream()
798                         .map(BodyPublishers::ofString)
799                         .toArray(HttpRequest.BodyPublisher[]::new)
800                 );
801         HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
802                 .POST(publisher)
803                 .build();
804         for (int i = 0; i < ITERATION_COUNT; i++) {
805             System.out.println("Iteration: " + i);
806             HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
807             int expectedResponse =  RESPONSE_CODE;
808             if (response.statusCode() != expectedResponse)
809                 throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode()));
810             assertEquals(response.body(), BODIES.stream().collect(Collectors.joining()));
811         }
812         System.out.println("test: DONE");
813     }
814 
815     @BeforeTest
setup()816     public void setup() throws Exception {
817         sslContext = new SimpleSSLContext().get();
818         if (sslContext == null)
819             throw new AssertionError("Unexpected null sslContext");
820 
821         HttpTestHandler handler = new HttpTestEchoHandler();
822         InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
823 
824         HttpServer http1 = HttpServer.create(loopback, 0);
825         http1TestServer = HttpTestServer.of(http1);
826         http1TestServer.addHandler(handler, "/http1/echo/");
827         http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x";
828 
829         HttpsServer https1 = HttpsServer.create(loopback, 0);
830         https1.setHttpsConfigurator(new HttpsConfigurator(sslContext));
831         https1TestServer = HttpTestServer.of(https1);
832         https1TestServer.addHandler(handler, "/https1/echo/");
833         https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x";
834 
835         // HTTP/2
836         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
837         http2TestServer.addHandler(handler, "/http2/echo/");
838         http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x";
839 
840         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
841         https2TestServer.addHandler(handler, "/https2/echo/");
842         https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x";
843 
844         serverCount.addAndGet(4);
845         http1TestServer.start();
846         https1TestServer.start();
847         http2TestServer.start();
848         https2TestServer.start();
849     }
850 
851     @AfterTest
teardown()852     public void teardown() throws Exception {
853         String sharedClientName =
854                 sharedClient == null ? null : sharedClient.toString();
855         sharedClient = null;
856         Thread.sleep(100);
857         AssertionError fail = TRACKER.check(500);
858         try {
859             http1TestServer.stop();
860             https1TestServer.stop();
861             http2TestServer.stop();
862             https2TestServer.stop();
863         } finally {
864             if (fail != null) {
865                 if (sharedClientName != null) {
866                     System.err.println("Shared client name is: " + sharedClientName);
867                 }
868                 throw fail;
869             }
870         }
871     }
872 }
873