1 /*
2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
26  * @bug 8201186
27  * @summary Tests an asynchronous BodySubscriber that completes
28  *          immediately with a Publisher<List<ByteBuffer>>
29  * @library /lib/testlibrary http2/server
30  * @build jdk.testlibrary.SimpleSSLContext
31  * @modules java.base/sun.net.www.http
32  *          java.net.http/jdk.internal.net.http.common
33  *          java.net.http/jdk.internal.net.http.frame
34  *          java.net.http/jdk.internal.net.http.hpack
35  * @run testng/othervm ResponsePublisher
36  */
37 
38 import com.sun.net.httpserver.HttpExchange;
39 import com.sun.net.httpserver.HttpHandler;
40 import com.sun.net.httpserver.HttpServer;
41 import com.sun.net.httpserver.HttpsConfigurator;
42 import com.sun.net.httpserver.HttpsServer;
43 import jdk.testlibrary.SimpleSSLContext;
44 import org.testng.annotations.AfterTest;
45 import org.testng.annotations.BeforeTest;
46 import org.testng.annotations.DataProvider;
47 import org.testng.annotations.Test;
48 
49 import javax.net.ssl.SSLContext;
50 import java.io.IOException;
51 import java.io.InputStream;
52 import java.io.OutputStream;
53 import java.net.InetAddress;
54 import java.net.InetSocketAddress;
55 import java.net.URI;
56 import java.net.http.HttpClient;
57 import java.net.http.HttpHeaders;
58 import java.net.http.HttpRequest;
59 import java.net.http.HttpResponse;
60 import java.net.http.HttpResponse.BodyHandler;
61 import java.net.http.HttpResponse.BodyHandlers;
62 import java.net.http.HttpResponse.BodySubscriber;
63 import java.net.http.HttpResponse.BodySubscribers;
64 import java.nio.ByteBuffer;
65 import java.util.List;
66 import java.util.Objects;
67 import java.util.concurrent.CompletableFuture;
68 import java.util.concurrent.CompletionException;
69 import java.util.concurrent.CompletionStage;
70 import java.util.concurrent.Executor;
71 import java.util.concurrent.Executors;
72 import java.util.concurrent.Flow;
73 import java.util.concurrent.Flow.Publisher;
74 import java.util.concurrent.atomic.AtomicReference;
75 import java.util.function.Supplier;
76 
77 import static java.lang.System.out;
78 import static java.nio.charset.StandardCharsets.UTF_8;
79 import static org.testng.Assert.assertEquals;
80 import static org.testng.Assert.assertNotNull;
81 import static org.testng.Assert.assertTrue;
82 
83 public class ResponsePublisher implements HttpServerAdapters {
84 
85     SSLContext sslContext;
86     HttpTestServer httpTestServer;    // HTTP/1.1    [ 4 servers ]
87     HttpTestServer httpsTestServer;   // HTTPS/1.1
88     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
89     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
90     String httpURI_fixed;
91     String httpURI_chunk;
92     String httpsURI_fixed;
93     String httpsURI_chunk;
94     String http2URI_fixed;
95     String http2URI_chunk;
96     String https2URI_fixed;
97     String https2URI_chunk;
98 
99     static final int ITERATION_COUNT = 3;
100     // a shared executor helps reduce the amount of threads created by the test
101     static final Executor executor = Executors.newCachedThreadPool();
102 
103     interface BHS extends Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> {
of(BHS impl, String name)104         static BHS of(BHS impl, String name) {
105             return new BHSImpl(impl, name);
106         }
107     }
108 
109     static final class BHSImpl implements BHS {
110         final BHS supplier;
111         final String name;
BHSImpl(BHS impl, String name)112         BHSImpl(BHS impl, String name) {
113             this.supplier = impl;
114             this.name = name;
115         }
116         @Override
toString()117         public String toString() {
118             return name;
119         }
120 
121         @Override
get()122         public BodyHandler<Publisher<List<ByteBuffer>>> get() {
123             return supplier.get();
124         }
125     }
126 
127     static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_API =
128             BHS.of(BodyHandlers::ofPublisher, "BodyHandlers::ofPublisher");
129     static final Supplier<BodyHandler<Publisher<List<ByteBuffer>>>> OF_PUBLISHER_TEST =
130             BHS.of(PublishingBodyHandler::new, "PublishingBodyHandler::new");
131 
132     @DataProvider(name = "variants")
variants()133     public Object[][] variants() {
134         return new Object[][]{
135                 { httpURI_fixed,    false, OF_PUBLISHER_API },
136                 { httpURI_chunk,    false, OF_PUBLISHER_API },
137                 { httpsURI_fixed,   false, OF_PUBLISHER_API },
138                 { httpsURI_chunk,   false, OF_PUBLISHER_API },
139                 { http2URI_fixed,   false, OF_PUBLISHER_API },
140                 { http2URI_chunk,   false, OF_PUBLISHER_API },
141                 { https2URI_fixed,  false, OF_PUBLISHER_API },
142                 { https2URI_chunk,  false, OF_PUBLISHER_API },
143 
144                 { httpURI_fixed,    true, OF_PUBLISHER_API },
145                 { httpURI_chunk,    true, OF_PUBLISHER_API },
146                 { httpsURI_fixed,   true, OF_PUBLISHER_API },
147                 { httpsURI_chunk,   true, OF_PUBLISHER_API },
148                 { http2URI_fixed,   true, OF_PUBLISHER_API },
149                 { http2URI_chunk,   true, OF_PUBLISHER_API },
150                 { https2URI_fixed,  true, OF_PUBLISHER_API },
151                 { https2URI_chunk,  true, OF_PUBLISHER_API },
152 
153                 { httpURI_fixed,    false, OF_PUBLISHER_TEST },
154                 { httpURI_chunk,    false, OF_PUBLISHER_TEST },
155                 { httpsURI_fixed,   false, OF_PUBLISHER_TEST },
156                 { httpsURI_chunk,   false, OF_PUBLISHER_TEST },
157                 { http2URI_fixed,   false, OF_PUBLISHER_TEST },
158                 { http2URI_chunk,   false, OF_PUBLISHER_TEST },
159                 { https2URI_fixed,  false, OF_PUBLISHER_TEST },
160                 { https2URI_chunk,  false, OF_PUBLISHER_TEST },
161 
162                 { httpURI_fixed,    true, OF_PUBLISHER_TEST },
163                 { httpURI_chunk,    true, OF_PUBLISHER_TEST },
164                 { httpsURI_fixed,   true, OF_PUBLISHER_TEST },
165                 { httpsURI_chunk,   true, OF_PUBLISHER_TEST },
166                 { http2URI_fixed,   true, OF_PUBLISHER_TEST },
167                 { http2URI_chunk,   true, OF_PUBLISHER_TEST },
168                 { https2URI_fixed,  true, OF_PUBLISHER_TEST },
169                 { https2URI_chunk,  true, OF_PUBLISHER_TEST },
170         };
171     }
172 
173     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
newHttpClient()174     HttpClient newHttpClient() {
175         return TRACKER.track(HttpClient.newBuilder()
176                          .executor(executor)
177                          .sslContext(sslContext)
178                          .build());
179     }
180 
181     @Test(dataProvider = "variants")
testExceptions(String uri, boolean sameClient, BHS handlers)182     public void testExceptions(String uri, boolean sameClient, BHS handlers) throws Exception {
183         HttpClient client = null;
184         for (int i=0; i< ITERATION_COUNT; i++) {
185             if (!sameClient || client == null)
186                 client = newHttpClient();
187 
188             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
189                     .build();
190             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
191             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
192             try {
193                 response.body().subscribe(null);
194                 throw new RuntimeException("Expected NPE not thrown");
195             } catch (NullPointerException x) {
196                 System.out.println("Got expected NPE: " + x);
197             }
198             // We can reuse our BodySubscribers implementations to subscribe to the
199             // Publisher<List<ByteBuffer>>
200             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
201             response.body().subscribe(ofString);
202 
203             BodySubscriber<String> ofString2 = BodySubscribers.ofString(UTF_8);
204             response.body().subscribe(ofString2);
205             try {
206                 ofString2.getBody().toCompletableFuture().join();
207                 throw new RuntimeException("Expected ISE not thrown");
208             } catch (CompletionException x) {
209                 Throwable cause = x.getCause();
210                 if (cause instanceof  IllegalStateException) {
211                     System.out.println("Got expected ISE: " + cause);
212                 } else {
213                     throw x;
214                 }
215             }
216             // Get the final result and compare it with the expected body
217             String body = ofString.getBody().toCompletableFuture().get();
218             assertEquals(body, "");
219         }
220     }
221 
222     @Test(dataProvider = "variants")
testNoBody(String uri, boolean sameClient, BHS handlers)223     public void testNoBody(String uri, boolean sameClient, BHS handlers) throws Exception {
224         HttpClient client = null;
225         for (int i=0; i< ITERATION_COUNT; i++) {
226             if (!sameClient || client == null)
227                 client = newHttpClient();
228 
229             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
230                     .build();
231             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
232             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
233             // We can reuse our BodySubscribers implementations to subscribe to the
234             // Publisher<List<ByteBuffer>>
235             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
236             // get the Publisher<List<ByteBuffer>> and
237             // subscribe to it.
238             response.body().subscribe(ofString);
239             // Get the final result and compare it with the expected body
240             String body = ofString.getBody().toCompletableFuture().get();
241             assertEquals(body, "");
242         }
243     }
244 
245     @Test(dataProvider = "variants")
testNoBodyAsync(String uri, boolean sameClient, BHS handlers)246     public void testNoBodyAsync(String uri, boolean sameClient, BHS handlers) throws Exception {
247         HttpClient client = null;
248         for (int i=0; i< ITERATION_COUNT; i++) {
249             if (!sameClient || client == null)
250                 client = newHttpClient();
251 
252             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
253                     .build();
254             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
255             // We can reuse our BodySubscribers implementations to subscribe to the
256             // Publisher<List<ByteBuffer>>
257             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
258             CompletableFuture<String> result =
259                     client.sendAsync(req, handler).thenCompose(
260                             (responsePublisher) -> {
261                                 // get the Publisher<List<ByteBuffer>> and
262                                 // subscribe to it.
263                                 responsePublisher.body().subscribe(ofString);
264                                 return ofString.getBody();
265                             });
266             // Get the final result and compare it with the expected body
267             assertEquals(result.get(), "");
268         }
269     }
270 
271     @Test(dataProvider = "variants")
testAsString(String uri, boolean sameClient, BHS handlers)272     public void testAsString(String uri, boolean sameClient, BHS handlers) throws Exception {
273         HttpClient client = null;
274         for (int i=0; i< ITERATION_COUNT; i++) {
275             if (!sameClient || client == null)
276                 client = newHttpClient();
277 
278             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
279                     .build();
280             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
281             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
282             // We can reuse our BodySubscribers implementations to subscribe to the
283             // Publisher<List<ByteBuffer>>
284             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
285             // get the Publisher<List<ByteBuffer>> and
286             // subscribe to it.
287             response.body().subscribe(ofString);
288             // Get the final result and compare it with the expected body
289             String body = ofString.getBody().toCompletableFuture().get();
290             assertEquals(body, WITH_BODY);
291         }
292     }
293 
294     @Test(dataProvider = "variants")
testAsStringAsync(String uri, boolean sameClient, BHS handlers)295     public void testAsStringAsync(String uri, boolean sameClient, BHS handlers) throws Exception {
296         HttpClient client = null;
297         for (int i=0; i< ITERATION_COUNT; i++) {
298             if (!sameClient || client == null)
299                 client = newHttpClient();
300 
301             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
302                     .build();
303             BodyHandler<Publisher<List<ByteBuffer>>> handler = handlers.get();
304             // We can reuse our BodySubscribers implementations to subscribe to the
305             // Publisher<List<ByteBuffer>>
306             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
307             CompletableFuture<String> result = client.sendAsync(req, handler)
308                     .thenCompose((responsePublisher) -> {
309                         // get the Publisher<List<ByteBuffer>> and
310                         // subscribe to it.
311                         responsePublisher.body().subscribe(ofString);
312                         return ofString.getBody();
313                     });
314             // Get the final result and compare it with the expected body
315             String body = result.get();
316             assertEquals(body, WITH_BODY);
317         }
318     }
319 
320     // A BodyHandler that returns PublishingBodySubscriber instances
321     static class PublishingBodyHandler implements BodyHandler<Publisher<List<ByteBuffer>>> {
322         @Override
apply(HttpResponse.ResponseInfo rinfo)323         public BodySubscriber<Publisher<List<ByteBuffer>>> apply(HttpResponse.ResponseInfo rinfo) {
324             assertEquals(rinfo.statusCode(), 200);
325             return new PublishingBodySubscriber();
326         }
327     }
328 
329     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
330     static class PublishingBodySubscriber implements BodySubscriber<Publisher<List<ByteBuffer>>> {
331         private final CompletableFuture<Flow.Subscription> subscriptionCF = new CompletableFuture<>();
332         private final CompletableFuture<Flow.Subscriber<? super List<ByteBuffer>>> subscribedCF = new CompletableFuture<>();
333         private AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
334         private final CompletionStage<Publisher<List<ByteBuffer>>> body =
335                 subscriptionCF.thenCompose((s) -> CompletableFuture.completedStage(this::subscribe));
336                 //CompletableFuture.completedStage(this::subscribe);
337 
subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber)338         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
339             Objects.requireNonNull(subscriber, "subscriber must not be null");
340             if (subscriberRef.compareAndSet(null, subscriber)) {
341                 subscriptionCF.thenAccept((s) -> {
342                     subscriber.onSubscribe(s);
343                     subscribedCF.complete(subscriber);
344                 });
345             } else {
346                 subscriber.onSubscribe(new Flow.Subscription() {
347                     @Override public void request(long n) { }
348                     @Override public void cancel() { }
349                 });
350                 subscriber.onError(
351                         new IllegalStateException("This publisher has already one subscriber"));
352             }
353         }
354 
355         @Override
onSubscribe(Flow.Subscription subscription)356         public void onSubscribe(Flow.Subscription subscription) {
357             subscriptionCF.complete(subscription);
358         }
359 
360         @Override
onNext(List<ByteBuffer> item)361         public void onNext(List<ByteBuffer> item) {
362             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
363             Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
364             assert subscriber != null; // cannot be called before subscriber calls request(1)
365             subscriber.onNext(item);
366         }
367 
368         @Override
onError(Throwable throwable)369         public void onError(Throwable throwable) {
370             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
371             // onError can be called before request(1), and therefore can
372             // be called before subscriberRef is set.
373             subscribedCF.thenAccept(s -> s.onError(throwable));
374         }
375 
376         @Override
onComplete()377         public void onComplete() {
378             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
379             // onComplete can be called before request(1), and therefore can
380             // be called before subscriberRef is set.
381             subscribedCF.thenAccept(s -> s.onComplete());
382         }
383 
384         @Override
getBody()385         public CompletionStage<Publisher<List<ByteBuffer>>> getBody() {
386             return body;
387         }
388     }
389 
serverAuthority(HttpServer server)390     static String serverAuthority(HttpServer server) {
391         return InetAddress.getLoopbackAddress().getHostName() + ":"
392                 + server.getAddress().getPort();
393     }
394 
395     @BeforeTest
setup()396     public void setup() throws Exception {
397         sslContext = new SimpleSSLContext().get();
398         if (sslContext == null)
399             throw new AssertionError("Unexpected null sslContext");
400 
401         // HTTP/1.1
402         HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
403         HttpTestHandler h1_chunkHandler = new HTTP_VariableLengthHandler();
404         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
405         httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
406         httpTestServer.addHandler( h1_fixedLengthHandler, "/http1/fixed");
407         httpTestServer.addHandler(h1_chunkHandler,"/http1/chunk");
408         httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed";
409         httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk";
410 
411         HttpsServer httpsServer = HttpsServer.create(sa, 0);
412         httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
413         httpsTestServer = HttpTestServer.of(httpsServer);
414         httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
415         httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
416         httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed";
417         httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk";
418 
419         // HTTP/2
420         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
421         HttpTestHandler h2_chunkedHandler = new HTTP_VariableLengthHandler();
422 
423         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
424         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
425         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
426         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
427         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
428 
429         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
430         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
431         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
432         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
433         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
434 
435         httpTestServer.start();
436         httpsTestServer.start();
437         http2TestServer.start();
438         https2TestServer.start();
439     }
440 
441     @AfterTest
teardown()442     public void teardown() throws Exception {
443         Thread.sleep(100);
444         AssertionError fail = TRACKER.check(500);
445         try {
446             httpTestServer.stop();
447             httpsTestServer.stop();
448             http2TestServer.stop();
449             https2TestServer.stop();
450         } finally {
451             if (fail != null) {
452                 throw fail;
453             }
454         }
455     }
456 
457     static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" +
458             " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" +
459             " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" +
460             " exercitation ullamco laboris nisi ut aliquip ex ea" +
461             " commodo consequat. Duis aute irure dolor in reprehenderit in " +
462             "voluptate velit esse cillum dolore eu fugiat nulla pariatur." +
463             " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" +
464             " officia deserunt mollit anim id est laborum.";
465 
466     static class HTTP_FixedLengthHandler implements HttpTestHandler {
467         @Override
handle(HttpTestExchange t)468         public void handle(HttpTestExchange t) throws IOException {
469             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
470             try (InputStream is = t.getRequestBody()) {
471                 is.readAllBytes();
472             }
473             if (t.getRequestURI().getPath().endsWith("/withBody")) {
474                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
475                 t.sendResponseHeaders(200, bytes.length);  // body
476                 try (OutputStream os = t.getResponseBody()) {
477                     os.write(bytes);
478                 }
479             } else {
480                 t.sendResponseHeaders(200, 0);  //no body
481             }
482         }
483     }
484 
485     static class HTTP_VariableLengthHandler implements HttpTestHandler {
486         @Override
handle(HttpTestExchange t)487         public void handle(HttpTestExchange t) throws IOException {
488             out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI());
489             try (InputStream is = t.getRequestBody()) {
490                 is.readAllBytes();
491             }
492             t.sendResponseHeaders(200, -1);  //chunked or variable
493             if (t.getRequestURI().getPath().endsWith("/withBody")) {
494                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
495                 try (OutputStream os = t.getResponseBody()) {
496                     int chunkLen = bytes.length/10;
497                     if (chunkLen == 0) {
498                         os.write(bytes);
499                     } else {
500                         int count = 0;
501                         for (int i=0; i<10; i++) {
502                             os.write(bytes, count, chunkLen);
503                             os.flush();
504                             count += chunkLen;
505                         }
506                         os.write(bytes, count, bytes.length % chunkLen);
507                         count += bytes.length % chunkLen;
508                         assert count == bytes.length;
509                     }
510                 }
511             } else {
512                 t.getResponseBody().close();   // no body
513             }
514         }
515     }
516 }
517