1 /*
2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
 * @summary Tests that a response body subscriber's onComplete is not invoked before onSubscribe
27  * @library /lib/testlibrary http2/server
28  * @build jdk.testlibrary.SimpleSSLContext
29  * @modules java.base/sun.net.www.http
30  *          java.net.http/jdk.internal.net.http.common
31  *          java.net.http/jdk.internal.net.http.frame
32  *          java.net.http/jdk.internal.net.http.hpack
33  * @run testng/othervm CustomResponseSubscriber
34  */
35 
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.net.InetAddress;
39 import java.net.InetSocketAddress;
40 import java.net.URI;
41 import java.nio.ByteBuffer;
42 import java.util.List;
43 import java.util.concurrent.CompletionStage;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.Flow;
47 import com.sun.net.httpserver.HttpExchange;
48 import com.sun.net.httpserver.HttpHandler;
49 import com.sun.net.httpserver.HttpServer;
50 import com.sun.net.httpserver.HttpsConfigurator;
51 import com.sun.net.httpserver.HttpsServer;
52 import java.net.http.HttpClient;
53 import java.net.http.HttpHeaders;
54 import java.net.http.HttpRequest;
55 import java.net.http.HttpResponse;
56 import java.net.http.HttpResponse.BodyHandler;
57 import java.net.http.HttpResponse.BodySubscriber;
58 import java.net.http.HttpResponse.BodySubscribers;
59 import javax.net.ssl.SSLContext;
60 import jdk.testlibrary.SimpleSSLContext;
61 import org.testng.annotations.AfterTest;
62 import org.testng.annotations.BeforeTest;
63 import org.testng.annotations.DataProvider;
64 import org.testng.annotations.Test;
65 import static java.lang.System.out;
66 import static java.nio.charset.StandardCharsets.UTF_8;
67 import static org.testng.Assert.assertEquals;
68 import static org.testng.Assert.assertTrue;
69 
70 public class CustomResponseSubscriber {
71 
72     SSLContext sslContext;
73     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
74     HttpsServer httpsTestServer;       // HTTPS/1.1
75     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
76     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
77     String httpURI_fixed;
78     String httpURI_chunk;
79     String httpsURI_fixed;
80     String httpsURI_chunk;
81     String http2URI_fixed;
82     String http2URI_chunk;
83     String https2URI_fixed;
84     String https2URI_chunk;
85 
86     static final int ITERATION_COUNT = 10;
87     // a shared executor helps reduce the amount of threads created by the test
88     static final Executor executor = Executors.newCachedThreadPool();
89 
90     @DataProvider(name = "variants")
variants()91     public Object[][] variants() {
92         return new Object[][]{
93                 { httpURI_fixed,    false },
94                 { httpURI_chunk,    false },
95                 { httpsURI_fixed,   false },
96                 { httpsURI_chunk,   false },
97                 { http2URI_fixed,   false },
98                 { http2URI_chunk,   false },
99                 { https2URI_fixed,  false,},
100                 { https2URI_chunk,  false },
101 
102                 { httpURI_fixed,    true },
103                 { httpURI_chunk,    true },
104                 { httpsURI_fixed,   true },
105                 { httpsURI_chunk,   true },
106                 { http2URI_fixed,   true },
107                 { http2URI_chunk,   true },
108                 { https2URI_fixed,  true,},
109                 { https2URI_chunk,  true },
110         };
111     }
112 
newHttpClient()113     HttpClient newHttpClient() {
114         return HttpClient.newBuilder()
115                          .executor(executor)
116                          .sslContext(sslContext)
117                          .build();
118     }
119 
120     @Test(dataProvider = "variants")
testAsString(String uri, boolean sameClient)121     public void testAsString(String uri, boolean sameClient) throws Exception {
122         HttpClient client = null;
123         for (int i=0; i< ITERATION_COUNT; i++) {
124             if (!sameClient || client == null)
125                 client = newHttpClient();
126 
127             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
128                                          .build();
129             BodyHandler<String> handler = new CRSBodyHandler();
130             HttpResponse<String> response = client.send(req, handler);
131             String body = response.body();
132             assertEquals(body, "");
133         }
134     }
135 
136     static class CRSBodyHandler implements BodyHandler<String> {
137         @Override
apply(HttpResponse.ResponseInfo rinfo)138         public BodySubscriber<String> apply(HttpResponse.ResponseInfo rinfo) {
139             assertEquals(rinfo.statusCode(), 200);
140             return new CRSBodySubscriber();
141         }
142     }
143 
144     static class CRSBodySubscriber implements BodySubscriber<String> {
145         private final BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
146         volatile boolean onSubscribeCalled;
147 
148         @Override
onSubscribe(Flow.Subscription subscription)149         public void onSubscribe(Flow.Subscription subscription) {
150             //out.println("onSubscribe ");
151             onSubscribeCalled = true;
152             ofString.onSubscribe(subscription);
153         }
154 
155         @Override
onNext(List<ByteBuffer> item)156         public void onNext(List<ByteBuffer> item) {
157            // out.println("onNext " + item);
158             assertTrue(onSubscribeCalled);
159             ofString.onNext(item);
160         }
161 
162         @Override
onError(Throwable throwable)163         public void onError(Throwable throwable) {
164             //out.println("onError");
165             assertTrue(onSubscribeCalled);
166             ofString.onError(throwable);
167         }
168 
169         @Override
onComplete()170         public void onComplete() {
171             //out.println("onComplete");
172             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
173             ofString.onComplete();
174         }
175 
176         @Override
getBody()177         public CompletionStage<String> getBody() {
178             return ofString.getBody();
179         }
180     }
181 
serverAuthority(HttpServer server)182     static String serverAuthority(HttpServer server) {
183         return InetAddress.getLoopbackAddress().getHostName() + ":"
184                 + server.getAddress().getPort();
185     }
186 
187     @BeforeTest
setup()188     public void setup() throws Exception {
189         sslContext = new SimpleSSLContext().get();
190         if (sslContext == null)
191             throw new AssertionError("Unexpected null sslContext");
192 
193         // HTTP/1.1
194         HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler();
195         HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler();
196         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
197         httpTestServer = HttpServer.create(sa, 0);
198         httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler);
199         httpTestServer.createContext("/http1/chunk", h1_chunkHandler);
200         httpURI_fixed = "http://" + serverAuthority(httpTestServer) + "/http1/fixed";
201         httpURI_chunk = "http://" + serverAuthority(httpTestServer) + "/http1/chunk";
202 
203         httpsTestServer = HttpsServer.create(sa, 0);
204         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
205         httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler);
206         httpsTestServer.createContext("/https1/chunk", h1_chunkHandler);
207         httpsURI_fixed = "https://" + serverAuthority(httpsTestServer) + "/https1/fixed";
208         httpsURI_chunk = "https://" + serverAuthority(httpsTestServer) + "/https1/chunk";
209 
210         // HTTP/2
211         Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler();
212         Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler();
213 
214         http2TestServer = new Http2TestServer("localhost", false, 0);
215         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
216         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
217         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
218         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
219 
220         https2TestServer = new Http2TestServer("localhost", true, sslContext);
221         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
222         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
223         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
224         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
225 
226         httpTestServer.start();
227         httpsTestServer.start();
228         http2TestServer.start();
229         https2TestServer.start();
230     }
231 
232     @AfterTest
teardown()233     public void teardown() throws Exception {
234         httpTestServer.stop(0);
235         httpsTestServer.stop(0);
236         http2TestServer.stop();
237         https2TestServer.stop();
238     }
239 
240     static class HTTP1_FixedLengthHandler implements HttpHandler {
241         @Override
handle(HttpExchange t)242         public void handle(HttpExchange t) throws IOException {
243             out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI());
244             try (InputStream is = t.getRequestBody()) {
245                 is.readAllBytes();
246             }
247             t.sendResponseHeaders(200, -1);  //no body
248         }
249     }
250 
251     static class HTTP1_ChunkedHandler implements HttpHandler {
252         @Override
handle(HttpExchange t)253         public void handle(HttpExchange t) throws IOException {
254             out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI());
255             try (InputStream is = t.getRequestBody()) {
256                 is.readAllBytes();
257             }
258             t.sendResponseHeaders(200, 0); // chunked
259             t.getResponseBody().close();   // no body
260         }
261     }
262 
263     static class HTTP2_FixedLengthHandler implements Http2Handler {
264         @Override
handle(Http2TestExchange t)265         public void handle(Http2TestExchange t) throws IOException {
266             out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI());
267             try (InputStream is = t.getRequestBody()) {
268                 is.readAllBytes();
269             }
270             t.sendResponseHeaders(200, 0);
271             t.getResponseBody().close();
272         }
273     }
274 
275     static class HTTP2_VariableHandler implements Http2Handler {
276         @Override
handle(Http2TestExchange t)277         public void handle(Http2TestExchange t) throws IOException {
278             out.println("HTTP2_VariableHandler received request to " + t.getRequestURI());
279             try (InputStream is = t.getRequestBody()) {
280                 is.readAllBytes();
281             }
282             t.sendResponseHeaders(200, -1); // variable
283             t.getResponseBody().close();  //no body
284         }
285     }
286 }
287