1 /*
2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
26  * @summary Verify that dependent synchronous actions added before the promise CF
27  *          completes are executed either asynchronously in an executor when the
28  *          CF later completes, or in the user thread that joins.
29  * @library /test/lib http2/server
30  * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters DependentPromiseActionsTest
31  * @modules java.base/sun.net.www.http
32  *          java.net.http/jdk.internal.net.http.common
33  *          java.net.http/jdk.internal.net.http.frame
34  *          java.net.http/jdk.internal.net.http.hpack
35  * @run testng/othervm -Djdk.internal.httpclient.debug=true DependentPromiseActionsTest
36  * @run testng/othervm/java.security.policy=dependent.policy
37   *           -Djdk.internal.httpclient.debug=true DependentPromiseActionsTest
38  */
39 
40 import java.io.BufferedReader;
41 import java.io.InputStreamReader;
42 import java.lang.StackWalker.StackFrame;
43 import jdk.test.lib.net.SimpleSSLContext;
44 import org.testng.annotations.AfterTest;
45 import org.testng.annotations.AfterClass;
46 import org.testng.annotations.BeforeTest;
47 import org.testng.annotations.DataProvider;
48 import org.testng.annotations.Test;
49 
50 import javax.net.ssl.SSLContext;
51 import java.io.IOException;
52 import java.io.InputStream;
53 import java.io.OutputStream;
54 import java.net.URI;
55 import java.net.URISyntaxException;
56 import java.net.http.HttpClient;
57 import java.net.http.HttpHeaders;
58 import java.net.http.HttpRequest;
59 import java.net.http.HttpResponse;
60 import java.net.http.HttpResponse.BodyHandler;
61 import java.net.http.HttpResponse.BodyHandlers;
62 import java.net.http.HttpResponse.BodySubscriber;
63 import java.net.http.HttpResponse.PushPromiseHandler;
64 import java.nio.ByteBuffer;
65 import java.nio.charset.StandardCharsets;
66 import java.util.EnumSet;
67 import java.util.List;
68 import java.util.Map;
69 import java.util.Optional;
70 import java.util.concurrent.CompletableFuture;
71 import java.util.concurrent.CompletionException;
72 import java.util.concurrent.CompletionStage;
73 import java.util.concurrent.ConcurrentHashMap;
74 import java.util.concurrent.ConcurrentMap;
75 import java.util.concurrent.Executor;
76 import java.util.concurrent.Executors;
77 import java.util.concurrent.Flow;
78 import java.util.concurrent.Semaphore;
79 import java.util.concurrent.atomic.AtomicLong;
80 import java.util.concurrent.atomic.AtomicReference;
81 import java.util.function.BiPredicate;
82 import java.util.function.Consumer;
83 import java.util.function.Function;
84 import java.util.function.Supplier;
85 import java.util.stream.Collectors;
86 import java.util.stream.Stream;
87 
88 import static java.lang.System.err;
89 import static java.lang.System.out;
90 import static java.lang.String.format;
91 import static java.nio.charset.StandardCharsets.UTF_8;
92 import static org.testng.Assert.assertEquals;
93 import static org.testng.Assert.assertTrue;
94 
95 public class DependentPromiseActionsTest implements HttpServerAdapters {
96 
97     SSLContext sslContext;
98     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
99     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
100     String http2URI_fixed;
101     String http2URI_chunk;
102     String https2URI_fixed;
103     String https2URI_chunk;
104 
105     static final StackWalker WALKER =
106             StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE);
107 
108     static final int ITERATION_COUNT = 1;
109     // a shared executor helps reduce the amount of threads created by the test
110     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
111     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
112     static volatile boolean tasksFailed;
113     static final AtomicLong serverCount = new AtomicLong();
114     static final AtomicLong clientCount = new AtomicLong();
115     static final long start = System.nanoTime();
now()116     public static String now() {
117         long now = System.nanoTime() - start;
118         long secs = now / 1000_000_000;
119         long mill = (now % 1000_000_000) / 1000_000;
120         long nan = now % 1000_000;
121         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
122     }
123 
124     private volatile HttpClient sharedClient;
125 
126     static class TestExecutor implements Executor {
127         final AtomicLong tasks = new AtomicLong();
128         Executor executor;
TestExecutor(Executor executor)129         TestExecutor(Executor executor) {
130             this.executor = executor;
131         }
132 
133         @Override
execute(Runnable command)134         public void execute(Runnable command) {
135             long id = tasks.incrementAndGet();
136             executor.execute(() -> {
137                 try {
138                     command.run();
139                 } catch (Throwable t) {
140                     tasksFailed = true;
141                     System.out.printf(now() + "Task %s failed: %s%n", id, t);
142                     System.err.printf(now() + "Task %s failed: %s%n", id, t);
143                     FAILURES.putIfAbsent("Task " + id, t);
144                     throw t;
145                 }
146             });
147         }
148     }
149 
150     @AfterClass
printFailedTests()151     static final void printFailedTests() {
152         out.println("\n=========================");
153         try {
154             out.printf("%n%sCreated %d servers and %d clients%n",
155                     now(), serverCount.get(), clientCount.get());
156             if (FAILURES.isEmpty()) return;
157             out.println("Failed tests: ");
158             FAILURES.entrySet().forEach((e) -> {
159                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
160                 e.getValue().printStackTrace(out);
161                 e.getValue().printStackTrace();
162             });
163             if (tasksFailed) {
164                 System.out.println("WARNING: Some tasks failed");
165             }
166         } finally {
167             out.println("\n=========================\n");
168         }
169     }
170 
uris()171     private String[] uris() {
172         return new String[] {
173                 http2URI_fixed,
174                 http2URI_chunk,
175                 https2URI_fixed,
176                 https2URI_chunk,
177         };
178     }
179 
180     enum SubscriberType {EAGER, LAZZY}
181 
182     static final class SemaphoreStallerSupplier
183             implements Supplier<SemaphoreStaller> {
184         @Override
get()185         public SemaphoreStaller get() {
186             return new SemaphoreStaller();
187         }
188         @Override
toString()189         public String toString() {
190             return "SemaphoreStaller";
191         }
192     }
193 
194     @DataProvider(name = "noStalls")
noThrows()195     public Object[][] noThrows() {
196         String[] uris = uris();
197         Object[][] result = new Object[uris.length * 2][];
198         int i = 0;
199         for (boolean sameClient : List.of(false, true)) {
200             for (String uri: uris()) {
201                 result[i++] = new Object[] {uri, sameClient};
202             }
203         }
204         assert i == uris.length * 2;
205         return result;
206     }
207 
208     @DataProvider(name = "variants")
variants()209     public Object[][] variants() {
210         String[] uris = uris();
211         Object[][] result = new Object[uris.length * 2][];
212         int i = 0;
213         Supplier<? extends Staller> s = new SemaphoreStallerSupplier();
214         for (Supplier<? extends Staller> staller : List.of(s)) {
215             for (boolean sameClient : List.of(false, true)) {
216                 for (String uri : uris()) {
217                     result[i++] = new Object[]{uri, sameClient, staller};
218                 }
219             }
220         }
221         assert i == uris.length * 2;
222         return result;
223     }
224 
makeNewClient()225     private HttpClient makeNewClient() {
226         clientCount.incrementAndGet();
227         return HttpClient.newBuilder()
228                 .executor(executor)
229                 .sslContext(sslContext)
230                 .build();
231     }
232 
newHttpClient(boolean share)233     HttpClient newHttpClient(boolean share) {
234         if (!share) return makeNewClient();
235         HttpClient shared = sharedClient;
236         if (shared != null) return shared;
237         synchronized (this) {
238             shared = sharedClient;
239             if (shared == null) {
240                 shared = sharedClient = makeNewClient();
241             }
242             return shared;
243         }
244     }
245 
246     @Test(dataProvider = "noStalls")
testNoStalls(String uri, boolean sameClient)247     public void testNoStalls(String uri, boolean sameClient)
248             throws Exception {
249         HttpClient client = null;
250         out.printf("%ntestNoStalls(%s, %b)%n", uri, sameClient);
251         for (int i=0; i< ITERATION_COUNT; i++) {
252             if (!sameClient || client == null)
253                 client = newHttpClient(sameClient);
254 
255             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
256                     .build();
257             BodyHandler<Stream<String>> handler =
258                     new StallingBodyHandler((w) -> {},
259                             BodyHandlers.ofLines());
260             Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> pushPromises =
261                     new ConcurrentHashMap<>();
262             PushPromiseHandler<Stream<String>> pushHandler = new PushPromiseHandler<>() {
263                 @Override
264                 public void applyPushPromise(HttpRequest initiatingRequest,
265                                              HttpRequest pushPromiseRequest,
266                                              Function<BodyHandler<Stream<String>>,
267                                                      CompletableFuture<HttpResponse<Stream<String>>>>
268                                                      acceptor) {
269                     pushPromises.putIfAbsent(pushPromiseRequest, acceptor.apply(handler));
270                 }
271             };
272             HttpResponse<Stream<String>> response =
273                     client.sendAsync(req, BodyHandlers.ofLines(), pushHandler).get();
274             String body = response.body().collect(Collectors.joining("|"));
275             assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());
276             for (HttpRequest promised : pushPromises.keySet()) {
277                 out.printf("%s Received promise: %s%n\tresponse: %s%n",
278                         now(), promised, pushPromises.get(promised).get());
279                 String promisedBody = pushPromises.get(promised).get().body()
280                         .collect(Collectors.joining("|"));
281                 assertEquals(promisedBody, promised.uri().toASCIIString());
282             }
283             assertEquals(3, pushPromises.size());
284         }
285     }
286 
287     @Test(dataProvider = "variants")
testAsStringAsync(String uri, boolean sameClient, Supplier<Staller> stallers)288     public void testAsStringAsync(String uri,
289                                   boolean sameClient,
290                                   Supplier<Staller> stallers)
291             throws Exception
292     {
293         String test = format("testAsStringAsync(%s, %b, %s)",
294                 uri, sameClient, stallers);
295         testDependent(test, uri, sameClient, BodyHandlers::ofString,
296                 this::finish, this::extractString, stallers,
297                 SubscriberType.EAGER);
298     }
299 
300     @Test(dataProvider = "variants")
testAsLinesAsync(String uri, boolean sameClient, Supplier<Staller> stallers)301     public void testAsLinesAsync(String uri,
302                                  boolean sameClient,
303                                  Supplier<Staller> stallers)
304             throws Exception
305     {
306         String test = format("testAsLinesAsync(%s, %b, %s)",
307                 uri, sameClient, stallers);
308         testDependent(test, uri, sameClient, BodyHandlers::ofLines,
309                 this::finish, this::extractStream, stallers,
310                 SubscriberType.LAZZY);
311     }
312 
313     @Test(dataProvider = "variants")
testAsInputStreamAsync(String uri, boolean sameClient, Supplier<Staller> stallers)314     public void testAsInputStreamAsync(String uri,
315                                        boolean sameClient,
316                                        Supplier<Staller> stallers)
317             throws Exception
318     {
319         String test = format("testAsInputStreamAsync(%s, %b, %s)",
320                 uri, sameClient, stallers);
321         testDependent(test, uri, sameClient, BodyHandlers::ofInputStream,
322                 this::finish, this::extractInputStream, stallers,
323                 SubscriberType.LAZZY);
324     }
325 
testDependent(String name, String uri, boolean sameClient, Supplier<BodyHandler<T>> handlers, Finisher finisher, Extractor<T> extractor, Supplier<Staller> stallers, SubscriberType subscriberType)326     private <T,U> void testDependent(String name, String uri, boolean sameClient,
327                                      Supplier<BodyHandler<T>> handlers,
328                                      Finisher finisher,
329                                      Extractor<T> extractor,
330                                      Supplier<Staller> stallers,
331                                      SubscriberType subscriberType)
332             throws Exception
333     {
334         out.printf("%n%s%s%n", now(), name);
335         try {
336             testDependent(uri, sameClient, handlers, finisher,
337                           extractor, stallers, subscriberType);
338         } catch (Error | Exception x) {
339             FAILURES.putIfAbsent(name, x);
340             throw x;
341         }
342     }
343 
testDependent(String uri, boolean sameClient, Supplier<BodyHandler<T>> handlers, Finisher finisher, Extractor<T> extractor, Supplier<Staller> stallers, SubscriberType subscriberType)344     private <T,U> void testDependent(String uri, boolean sameClient,
345                                      Supplier<BodyHandler<T>> handlers,
346                                      Finisher finisher,
347                                      Extractor<T> extractor,
348                                      Supplier<Staller> stallers,
349                                      SubscriberType subscriberType)
350             throws Exception
351     {
352         HttpClient client = null;
353         for (Where where : EnumSet.of(Where.BODY_HANDLER)) {
354             if (!sameClient || client == null)
355                 client = newHttpClient(sameClient);
356 
357             HttpRequest req = HttpRequest.
358                     newBuilder(URI.create(uri))
359                     .build();
360             StallingPushPromiseHandler<T> promiseHandler =
361                     new StallingPushPromiseHandler<>(where, handlers, stallers);
362             BodyHandler<T> handler = handlers.get();
363             System.out.println("try stalling in " + where);
364             CompletableFuture<HttpResponse<T>> responseCF =
365                     client.sendAsync(req, handler, promiseHandler);
366             assert subscriberType == SubscriberType.LAZZY || !responseCF.isDone();
367             finisher.finish(where, responseCF, promiseHandler, extractor);
368         }
369     }
370 
371     enum Where {
372         ON_PUSH_PROMISE, BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;
select(Consumer<Where> consumer)373         public Consumer<Where> select(Consumer<Where> consumer) {
374             return new Consumer<Where>() {
375                 @Override
376                 public void accept(Where where) {
377                     if (Where.this == where) {
378                         consumer.accept(where);
379                     }
380                 }
381             };
382         }
383     }
384 
385     static final class StallingPushPromiseHandler<T> implements PushPromiseHandler<T> {
386 
387         static final class Tuple<U> {
388             public final CompletableFuture<HttpResponse<U>> response;
389             public final Staller staller;
390             public final AtomicReference<RuntimeException> failed;
391             Tuple(AtomicReference<RuntimeException> failed,
392                   CompletableFuture<HttpResponse<U>> response,
393                   Staller staller) {
394                 this.response = response;
395                 this.staller = staller;
396                 this.failed = failed;
397             }
398         }
399 
400         public final ConcurrentMap<HttpRequest, Tuple<T>> promiseMap =
401                 new ConcurrentHashMap<>();
402         private final Supplier<Staller> stallers;
403         private final Supplier<BodyHandler<T>> handlers;
404         private final Where where;
405         private final Thread thread = Thread.currentThread(); // main thread
406 
407         StallingPushPromiseHandler(Where where,
408                                    Supplier<BodyHandler<T>> handlers,
409                                    Supplier<Staller> stallers) {
410             this.where = where;
411             this.handlers = handlers;
412             this.stallers = stallers;
413         }
414 
415         @Override
416         public void applyPushPromise(HttpRequest initiatingRequest,
417                                      HttpRequest pushPromiseRequest,
418                                      Function<BodyHandler<T>,
419                                              CompletableFuture<HttpResponse<T>>> acceptor) {
420             AtomicReference<RuntimeException> failed = new AtomicReference<>();
421             Staller staller = stallers.get();
422             staller.acquire();
423             assert staller.willStall();
424             try {
425                 BodyHandler handler = new StallingBodyHandler<>(
426                         where.select(staller), handlers.get());
427                 CompletableFuture<HttpResponse<T>> cf = acceptor.apply(handler);
428                 Tuple<T> tuple = new Tuple(failed, cf, staller);
429                 promiseMap.putIfAbsent(pushPromiseRequest, tuple);
430                 CompletableFuture<?> done = cf.whenComplete(
431                         (r, t) -> checkThreadAndStack(thread, failed, r, t));
432                 assert !cf.isDone();
433             } finally {
434                 staller.release();
435             }
436         }
437     }
438 
439     interface Extractor<T> {
440         public List<String> extract(HttpResponse<T> resp);
441     }
442 
443     final List<String> extractString(HttpResponse<String> resp) {
444         return List.of(resp.body());
445     }
446 
447     final List<String> extractStream(HttpResponse<Stream<String>> resp) {
448         return resp.body().collect(Collectors.toList());
449     }
450 
451     final List<String> extractInputStream(HttpResponse<InputStream> resp) {
452         try (InputStream is = resp.body()) {
453             return new BufferedReader(new InputStreamReader(is))
454                     .lines().collect(Collectors.toList());
455         } catch (IOException x) {
456             throw new CompletionException(x);
457         }
458     }
459 
460     interface Finisher<T> {
461         public void finish(Where w,
462                            CompletableFuture<HttpResponse<T>> cf,
463                            StallingPushPromiseHandler<T> ph,
464                            Extractor<T> extractor);
465     }
466 
467     static Optional<StackFrame> findFrame(Stream<StackFrame> s, String name) {
468         return s.filter((f) -> f.getClassName().contains(name))
469                 .filter((f) -> f.getDeclaringClass().getModule().equals(HttpClient.class.getModule()))
470                 .findFirst();
471     }
472 
473     static <T> void checkThreadAndStack(Thread thread,
474                                         AtomicReference<RuntimeException> failed,
475                                         T result,
476                                         Throwable error) {
477         if (Thread.currentThread() == thread) {
478             //failed.set(new RuntimeException("Dependant action was executed in " + thread));
479             List<StackFrame> httpStack = WALKER.walk(s -> s.filter(f -> f.getDeclaringClass()
480                     .getModule().equals(HttpClient.class.getModule()))
481                     .collect(Collectors.toList()));
482             if (!httpStack.isEmpty()) {
483                 System.out.println("Found unexpected trace: ");
484                 httpStack.forEach(f -> System.out.printf("\t%s%n", f));
485                 failed.set(new RuntimeException("Dependant action has unexpected frame in " +
486                         Thread.currentThread() + ": " + httpStack.get(0)));
487 
488             }            return;
489         } else if (System.getSecurityManager() != null) {
490             Optional<StackFrame> sf = WALKER.walk(s -> findFrame(s, "PrivilegedRunnable"));
491             if (!sf.isPresent()) {
492                 failed.set(new RuntimeException("Dependant action does not have expected frame in "
493                         + Thread.currentThread()));
494                 return;
495             } else {
496                 System.out.println("Found expected frame: " + sf.get());
497             }
498         } else {
499             List<StackFrame> httpStack = WALKER.walk(s -> s.filter(f -> f.getDeclaringClass()
500                     .getModule().equals(HttpClient.class.getModule()))
501                     .collect(Collectors.toList()));
502             if (!httpStack.isEmpty()) {
503                 System.out.println("Found unexpected trace: ");
504                 httpStack.forEach(f -> System.out.printf("\t%s%n", f));
505                 failed.set(new RuntimeException("Dependant action has unexpected frame in " +
506                         Thread.currentThread() + ": " + httpStack.get(0)));
507 
508             }
509         }
510     }
511 
512     <T> void finish(Where w,
513                     StallingPushPromiseHandler.Tuple<T> tuple,
514                     Extractor<T> extractor) {
515         AtomicReference<RuntimeException> failed = tuple.failed;
516         CompletableFuture<HttpResponse<T>> done = tuple.response;
517         Staller staller = tuple.staller;
518         try {
519             HttpResponse<T> response = done.join();
520             List<String> result = extractor.extract(response);
521             URI uri = response.uri();
522             RuntimeException error = failed.get();
523             if (error != null) {
524                 throw new RuntimeException("Test failed in "
525                         + w + ": " + uri, error);
526             }
527             assertEquals(result, List.of(response.request().uri().toASCIIString()));
528         } finally {
529             staller.reset();
530         }
531     }
532 
533     <T> void finish(Where w,
534                     CompletableFuture<HttpResponse<T>> cf,
535                     StallingPushPromiseHandler<T> ph,
536                     Extractor<T> extractor) {
537         HttpResponse<T> response = cf.join();
538         List<String> result = extractor.extract(response);
539         for (HttpRequest req : ph.promiseMap.keySet()) {
540             finish(w, ph.promiseMap.get(req), extractor);
541         }
542         assertEquals(ph.promiseMap.size(), 3,
543                 "Expected 3 push promises for " + w + " in "
544                         + response.request().uri());
545         assertEquals(result, List.of(response.request().uri().toASCIIString()));
546 
547     }
548 
549     interface Staller extends Consumer<Where> {
550         void release();
551         void acquire();
552         void reset();
553         boolean willStall();
554     }
555 
556     static final class SemaphoreStaller implements Staller {
557         final Semaphore sem = new Semaphore(1);
558         @Override
559         public void accept(Where where) {
560             sem.acquireUninterruptibly();
561         }
562 
563         @Override
564         public void release() {
565             sem.release();
566         }
567 
568         @Override
569         public void acquire() {
570             sem.acquireUninterruptibly();
571         }
572 
573         @Override
574         public void reset() {
575             sem.drainPermits();
576             sem.release();
577         }
578 
579         @Override
580         public boolean willStall() {
581             return sem.availablePermits() <= 0;
582         }
583 
584         @Override
585         public String toString() {
586             return "SemaphoreStaller";
587         }
588     }
589 
590     static final class StallingBodyHandler<T> implements BodyHandler<T> {
591         final Consumer<Where> stalling;
592         final BodyHandler<T> bodyHandler;
593         StallingBodyHandler(Consumer<Where> stalling, BodyHandler<T> bodyHandler) {
594             this.stalling = stalling;
595             this.bodyHandler = bodyHandler;
596         }
597         @Override
598         public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {
599             stalling.accept(Where.BODY_HANDLER);
600             BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);
601             return new StallingBodySubscriber(stalling, subscriber);
602         }
603     }
604 
605     static final class StallingBodySubscriber<T> implements BodySubscriber<T> {
606         private final BodySubscriber<T> subscriber;
607         volatile boolean onSubscribeCalled;
608         final Consumer<Where> stalling;
609         StallingBodySubscriber(Consumer<Where> stalling, BodySubscriber<T> subscriber) {
610             this.stalling = stalling;
611             this.subscriber = subscriber;
612         }
613 
614         @Override
615         public void onSubscribe(Flow.Subscription subscription) {
616             //out.println("onSubscribe ");
617             onSubscribeCalled = true;
618             stalling.accept(Where.ON_SUBSCRIBE);
619             subscriber.onSubscribe(subscription);
620         }
621 
622         @Override
623         public void onNext(List<ByteBuffer> item) {
624             // out.println("onNext " + item);
625             assertTrue(onSubscribeCalled);
626             stalling.accept(Where.ON_NEXT);
627             subscriber.onNext(item);
628         }
629 
630         @Override
631         public void onError(Throwable throwable) {
632             //out.println("onError");
633             assertTrue(onSubscribeCalled);
634             stalling.accept(Where.ON_ERROR);
635             subscriber.onError(throwable);
636         }
637 
638         @Override
639         public void onComplete() {
640             //out.println("onComplete");
641             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
642             stalling.accept(Where.ON_COMPLETE);
643             subscriber.onComplete();
644         }
645 
646         @Override
647         public CompletionStage<T> getBody() {
648             stalling.accept(Where.GET_BODY);
649             try {
650                 stalling.accept(Where.BODY_CF);
651             } catch (Throwable t) {
652                 return CompletableFuture.failedFuture(t);
653             }
654             return subscriber.getBody();
655         }
656     }
657 
658 
659     @BeforeTest
660     public void setup() throws Exception {
661         sslContext = new SimpleSSLContext().get();
662         if (sslContext == null)
663             throw new AssertionError("Unexpected null sslContext");
664 
665         // HTTP/2
666         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
667         HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();
668 
669         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
670         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
671         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
672         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/y";
673         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/y";
674 
675         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
676         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
677         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
678         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/y";
679         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/y";
680 
681         serverCount.addAndGet(4);
682         http2TestServer.start();
683         https2TestServer.start();
684     }
685 
686     @AfterTest
687     public void teardown() throws Exception {
688         sharedClient = null;
689         http2TestServer.stop();
690         https2TestServer.stop();
691     }
692 
693     static final BiPredicate<String,String> ACCEPT_ALL = (x, y) -> true;
694 
695     private static void pushPromiseFor(HttpTestExchange t,
696                                        URI requestURI,
697                                        String pushPath,
698                                        boolean fixed)
699         throws IOException
700     {
701         try {
702             URI promise = new URI(requestURI.getScheme(),
703                     requestURI.getAuthority(),
704                     pushPath, null, null);
705             byte[] promiseBytes = promise.toASCIIString().getBytes(UTF_8);
706             out.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
707             err.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
708             HttpHeaders headers;
709             if (fixed) {
710                 String length = String.valueOf(promiseBytes.length);
711                 headers = HttpHeaders.of(Map.of("Content-Length", List.of(length)),
712                                          ACCEPT_ALL);
713             } else {
714                 headers = HttpHeaders.of(Map.of(), ACCEPT_ALL); // empty
715             }
716             t.serverPush(promise, headers, promiseBytes);
717         } catch (URISyntaxException x) {
718             throw new IOException(x.getMessage(), x);
719         }
720     }
721 
722     static class HTTP_FixedLengthHandler implements HttpTestHandler {
723         @Override
724         public void handle(HttpTestExchange t) throws IOException {
725             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
726             try (InputStream is = t.getRequestBody()) {
727                 is.readAllBytes();
728             }
729             URI requestURI = t.getRequestURI();
730             for (int i = 1; i<2; i++) {
731                 String path = requestURI.getPath() + "/before/promise-" + i;
732                 pushPromiseFor(t, requestURI, path, true);
733             }
734             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
735             t.sendResponseHeaders(200, resp.length);  //fixed content length
736             try (OutputStream os = t.getResponseBody()) {
737                 int bytes = resp.length/3;
738                 for (int i = 0; i<2; i++) {
739                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
740                     os.write(resp, i * bytes, bytes);
741                     os.flush();
742                     pushPromiseFor(t, requestURI, path, true);
743                 }
744                 os.write(resp, 2*bytes, resp.length - 2*bytes);
745             }
746         }
747 
748     }
749 
750     static class HTTP_ChunkedHandler implements HttpTestHandler {
751         @Override
752         public void handle(HttpTestExchange t) throws IOException {
753             out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
754             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
755             try (InputStream is = t.getRequestBody()) {
756                 is.readAllBytes();
757             }
758             URI requestURI = t.getRequestURI();
759             for (int i = 1; i<2; i++) {
760                 String path = requestURI.getPath() + "/before/promise-" + i;
761                 pushPromiseFor(t, requestURI, path, false);
762             }
763             t.sendResponseHeaders(200, -1); // chunked/variable
764             try (OutputStream os = t.getResponseBody()) {
765                 int bytes = resp.length/3;
766                 for (int i = 0; i<2; i++) {
767                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
768                     os.write(resp, i * bytes, bytes);
769                     os.flush();
770                     pushPromiseFor(t, requestURI, path, false);
771                 }
772                 os.write(resp, 2*bytes, resp.length - 2*bytes);
773             }
774         }
775     }
776 
777 
778 }
779