1 /*
2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 /*
25  * @test
26  * @summary Tests mapped response subscriber
27  * @library /lib/testlibrary http2/server
28  * @build jdk.testlibrary.SimpleSSLContext
29  * @modules java.base/sun.net.www.http
30  *          java.net.http/jdk.internal.net.http.common
31  *          java.net.http/jdk.internal.net.http.frame
32  *          java.net.http/jdk.internal.net.http.hpack
33  * @run testng/othervm
34  *       -Djdk.internal.httpclient.debug=true
35  *      MappingResponseSubscriber
36  */
37 
38 import java.io.IOException;
39 import java.io.InputStream;
40 import java.io.OutputStream;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.URI;
44 import java.nio.ByteBuffer;
45 import java.util.List;
46 import java.util.concurrent.CompletionStage;
47 import java.util.concurrent.Executor;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.Flow;
50 import com.sun.net.httpserver.HttpExchange;
51 import com.sun.net.httpserver.HttpHandler;
52 import com.sun.net.httpserver.HttpServer;
53 import com.sun.net.httpserver.HttpsConfigurator;
54 import com.sun.net.httpserver.HttpsServer;
55 import java.net.http.HttpClient;
56 import java.net.http.HttpHeaders;
57 import java.net.http.HttpRequest;
58 import java.net.http.HttpResponse;
59 import java.net.http.HttpResponse.BodyHandler;
60 import java.net.http.HttpResponse.BodyHandlers;
61 import java.net.http.HttpResponse.BodySubscribers;
62 import  java.net.http.HttpResponse.BodySubscriber;
63 import java.util.function.Function;
64 import javax.net.ssl.SSLContext;
65 import jdk.testlibrary.SimpleSSLContext;
66 import org.testng.annotations.AfterTest;
67 import org.testng.annotations.BeforeTest;
68 import org.testng.annotations.DataProvider;
69 import org.testng.annotations.Test;
70 import static java.lang.System.out;
71 import static java.nio.charset.StandardCharsets.UTF_8;
72 import static org.testng.Assert.assertEquals;
73 import static org.testng.Assert.assertTrue;
74 
75 public class MappingResponseSubscriber {
76 
77     SSLContext sslContext;
78     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
79     HttpsServer httpsTestServer;       // HTTPS/1.1
80     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
81     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
82     String httpURI_fixed;
83     String httpURI_chunk;
84     String httpsURI_fixed;
85     String httpsURI_chunk;
86     String http2URI_fixed;
87     String http2URI_chunk;
88     String https2URI_fixed;
89     String https2URI_chunk;
90 
91     static final int ITERATION_COUNT = 3;
92     // a shared executor helps reduce the amount of threads created by the test
93     static final Executor executor = Executors.newCachedThreadPool();
94 
95     @DataProvider(name = "variants")
variants()96     public Object[][] variants() {
97         return new Object[][]{
98                 { httpURI_fixed,    false },
99                 { httpURI_chunk,    false },
100                 { httpsURI_fixed,   false },
101                 { httpsURI_chunk,   false },
102                 { http2URI_fixed,   false },
103                 { http2URI_chunk,   false },
104                 { https2URI_fixed,  false,},
105                 { https2URI_chunk,  false },
106 
107                 { httpURI_fixed,    true },
108                 { httpURI_chunk,    true },
109                 { httpsURI_fixed,   true },
110                 { httpsURI_chunk,   true },
111                 { http2URI_fixed,   true },
112                 { http2URI_chunk,   true },
113                 { https2URI_fixed,  true,},
114                 { https2URI_chunk,  true },
115         };
116     }
117 
newHttpClient()118     HttpClient newHttpClient() {
119         return HttpClient.newBuilder()
120                          .executor(executor)
121                          .sslContext(sslContext)
122                          .build();
123     }
124 
125     @Test(dataProvider = "variants")
testAsBytes(String uri, boolean sameClient)126     public void testAsBytes(String uri, boolean sameClient) throws Exception {
127         HttpClient client = null;
128         for (int i = 0; i < ITERATION_COUNT; i++) {
129             if (!sameClient || client == null)
130                 client = newHttpClient();
131 
132             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
133                                          .build();
134             BodyHandler<byte[]> handler = new CRSBodyHandler();
135             HttpResponse<byte[]> response = client.send(req, handler);
136             byte[] body = response.body();
137             assertEquals(body, bytes);
138         }
139     }
140 
141     static class CRSBodyHandler implements BodyHandler<byte[]> {
142         @Override
apply(HttpResponse.ResponseInfo rinfo)143         public BodySubscriber<byte[]> apply(HttpResponse.ResponseInfo rinfo) {
144             assertEquals(rinfo.statusCode(), 200);
145             return BodySubscribers.mapping(
146                 new CRSBodySubscriber(), (s) -> s.getBytes(UTF_8)
147             );
148         }
149     }
150 
151     static class CRSBodySubscriber implements BodySubscriber<String> {
152         private final BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
153         volatile boolean onSubscribeCalled;
154 
155         @Override
onSubscribe(Flow.Subscription subscription)156         public void onSubscribe(Flow.Subscription subscription) {
157             //out.println("onSubscribe ");
158             onSubscribeCalled = true;
159             ofString.onSubscribe(subscription);
160         }
161 
162         @Override
onNext(List<ByteBuffer> item)163         public void onNext(List<ByteBuffer> item) {
164            // out.println("onNext " + item);
165             assertTrue(onSubscribeCalled);
166             ofString.onNext(item);
167         }
168 
169         @Override
onError(Throwable throwable)170         public void onError(Throwable throwable) {
171             //out.println("onError");
172             assertTrue(onSubscribeCalled);
173             ofString.onError(throwable);
174         }
175 
176         @Override
onComplete()177         public void onComplete() {
178             //out.println("onComplete");
179             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
180             ofString.onComplete();
181         }
182 
183         @Override
getBody()184         public CompletionStage<String> getBody() {
185             return ofString.getBody();
186         }
187     }
188 
serverAuthority(HttpServer server)189     static String serverAuthority(HttpServer server) {
190         return InetAddress.getLoopbackAddress().getHostName() + ":"
191                 + server.getAddress().getPort();
192     }
193 
194     @BeforeTest
setup()195     public void setup() throws Exception {
196         sslContext = new SimpleSSLContext().get();
197         if (sslContext == null)
198             throw new AssertionError("Unexpected null sslContext");
199 
200         // HTTP/1.1
201         HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler();
202         HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler();
203         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
204         httpTestServer = HttpServer.create(sa, 0);
205         httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler);
206         httpTestServer.createContext("/http1/chunk", h1_chunkHandler);
207         httpURI_fixed = "http://" + serverAuthority(httpTestServer) + "/http1/fixed";
208         httpURI_chunk = "http://" + serverAuthority(httpTestServer) + "/http1/chunk";
209 
210         httpsTestServer = HttpsServer.create(sa, 0);
211         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
212         httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler);
213         httpsTestServer.createContext("/https1/chunk", h1_chunkHandler);
214         httpsURI_fixed = "https://" + serverAuthority(httpsTestServer) + "/https1/fixed";
215         httpsURI_chunk = "https://" + serverAuthority(httpsTestServer) + "/https1/chunk";
216 
217         // HTTP/2
218         Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler();
219         Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler();
220 
221         http2TestServer = new Http2TestServer("localhost", false, 0);
222         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
223         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
224         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
225         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
226 
227         https2TestServer = new Http2TestServer("localhost", true, sslContext);
228         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
229         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
230         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
231         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
232 
233         httpTestServer.start();
234         httpsTestServer.start();
235         http2TestServer.start();
236         https2TestServer.start();
237     }
238 
239     @AfterTest
teardown()240     public void teardown() throws Exception {
241         httpTestServer.stop(0);
242         httpsTestServer.stop(0);
243         http2TestServer.stop();
244         https2TestServer.stop();
245     }
246 
247     static byte[] bytes;
248 
249     static {
250         bytes = new byte[128 * 1024];
251         int b = 'A';
252 
253         for (int i=0; i< bytes.length; i++) {
254             bytes[i] = (byte)b;
255             b = b == 'Z'? 'A' : b + 1;
256         }
257     }
258 
259     static class HTTP1_FixedLengthHandler implements HttpHandler {
260         @Override
handle(HttpExchange t)261         public void handle(HttpExchange t) throws IOException {
262             out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI());
263             try (InputStream is = t.getRequestBody()) {
264                 is.readAllBytes();
265             }
266             t.sendResponseHeaders(200, bytes.length);  //no body
267             OutputStream os = t.getResponseBody();
268             os.write(bytes);
269             os.close();
270         }
271     }
272 
273     static class HTTP1_ChunkedHandler implements HttpHandler {
274         @Override
handle(HttpExchange t)275         public void handle(HttpExchange t) throws IOException {
276             out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI());
277             try (InputStream is = t.getRequestBody()) {
278                 is.readAllBytes();
279             }
280             t.sendResponseHeaders(200, 0); // chunked
281             OutputStream os = t.getResponseBody();
282             os.write(bytes);
283             os.close();
284         }
285     }
286 
287     static class HTTP2_FixedLengthHandler implements Http2Handler {
288         @Override
handle(Http2TestExchange t)289         public void handle(Http2TestExchange t) throws IOException {
290             out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI());
291             try (InputStream is = t.getRequestBody()) {
292                 is.readAllBytes();
293             }
294             t.sendResponseHeaders(200, 0); // chunked
295             OutputStream os = t.getResponseBody();
296             os.write(bytes);
297             os.close();
298         }
299     }
300 
301     static class HTTP2_VariableHandler implements Http2Handler {
302         @Override
handle(Http2TestExchange t)303         public void handle(Http2TestExchange t) throws IOException {
304             out.println("HTTP2_VariableHandler received request to " + t.getRequestURI());
305             try (InputStream is = t.getRequestBody()) {
306                 is.readAllBytes();
307             }
308             t.sendResponseHeaders(200, bytes.length);  //no body
309             OutputStream os = t.getResponseBody();
310             os.write(bytes);
311             os.close();
312         }
313     }
314 
315     // -- Compile only. Verifies generic signatures
316 
317     static final Function<CharSequence,Integer> f1 = subscriber -> 1;
318     static final Function<CharSequence,Number> f2 = subscriber -> 2;
319     static final Function<String,Integer> f3 = subscriber -> 3;
320     static final Function<String,Number> f4 = subscriber -> 4;
321 
compileOnly()322     public void compileOnly() throws Exception {
323         HttpClient client = null;
324         HttpRequest req = null;
325 
326         HttpResponse<Integer> r1 = client.send(req, (ri) ->
327                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> 1));
328         HttpResponse<Number>  r2 = client.send(req, (ri) ->
329                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> 1));
330         HttpResponse<String>  r3 = client.send(req, (ri) ->
331                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> "s"));
332         HttpResponse<CharSequence> r4 = client.send(req, (ri) ->
333                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), s -> "s"));
334 
335         HttpResponse<Integer> x1 = client.send(req, (ri) ->
336                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f1));
337         HttpResponse<Number>  x2 = client.send(req, (ri) ->
338                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f1));
339         HttpResponse<Number>  x3 = client.send(req, (ri) ->
340                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f2));
341         HttpResponse<Integer> x4 = client.send(req, (ri) ->
342                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f3));
343         HttpResponse<Number>  x5 = client.send(req, (ri) ->
344                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f3));
345         HttpResponse<Number>  x7 = client.send(req, (ri) ->
346                 BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), f4));
347     }
348 }
349