1 /*
2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 import java.io.BufferedReader;
25 import java.io.ByteArrayInputStream;
26 import java.io.InputStreamReader;
27 import java.io.StringReader;
28 import java.net.http.HttpResponse.BodySubscriber;
29 import java.net.http.HttpResponse.BodySubscribers;
30 import java.nio.ByteBuffer;
31 import java.nio.charset.Charset;
32 import java.nio.charset.MalformedInputException;
33 import java.util.Arrays;
34 import java.util.List;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.SubmissionPublisher;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.stream.Collectors;
39 import java.util.stream.Stream;
40 import org.testng.annotations.Test;
41 import static java.nio.charset.StandardCharsets.UTF_8;
42 import static java.nio.charset.StandardCharsets.UTF_16;
43 import static org.testng.Assert.assertEquals;
44 
45 /*
46  * @test
 * @summary tests for BodySubscribers returned by ofLines.
48  *       In particular tests that surrogate characters are handled
49  *       correctly.
50  * @modules java.net.http java.logging
51  * @run testng/othervm LineStreamsAndSurrogatesTest
52  */
53 
54 public class LineStreamsAndSurrogatesTest {
55 
56 
57     static final Class<NullPointerException> NPE = NullPointerException.class;
58 
lines(String text)59     private static final List<String> lines(String text) {
60         return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList());
61     }
62 
63     @Test
testUncomplete()64     void testUncomplete() throws Exception {
65         // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
66         String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r les\n\n" +
67                 " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00";
68         Charset charset = UTF_8;
69 
70         BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset);
71         AtomicReference<Throwable> errorRef = new AtomicReference<>();
72         Runnable run = () -> {
73             try {
74                 SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
75                 byte[] sbytes = text.getBytes(charset);
76                 byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1);
77                 publisher.subscribe(bodySubscriber);
78                 System.out.println("Publishing " + bytes.length + " bytes");
79                 for (int i = 0; i < bytes.length; i++) {
80                     // ensure that surrogates are split over several buffers.
81                     publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
82                 }
83                 publisher.close();
84             } catch(Throwable t) {
85                 errorRef.set(t);
86             }
87         };
88         Thread thread = new Thread(run,"Publishing");
89         thread.start();
90         try {
91             Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
92             List<String> list = stream.collect(Collectors.toList());
93             String resp = list.stream().collect(Collectors.joining(""));
94             System.out.println("***** Got: " + resp);
95 
96             byte[] sbytes = text.getBytes(UTF_8);
97             byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1);
98             ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
99             BufferedReader reader = new BufferedReader(new InputStreamReader(bais, charset));
100             String resp2 = reader.lines().collect(Collectors.joining(""));
101             System.out.println("***** Got2: " + resp2);
102 
103             assertEquals(resp, resp2);
104             assertEquals(list, List.of("Bient\u00f4t",
105                                        " nous plongerons",
106                                        " dans",
107                                        " les",
108                                        "",
109                                        " fr\u00f4\ud801\udc00des",
110                                        " t\u00e9n\u00e8bres\ufffd"));
111         } catch (ExecutionException x) {
112             Throwable cause = x.getCause();
113             if (cause instanceof MalformedInputException) {
114                 throw new RuntimeException("Unexpected MalformedInputException", cause);
115             }
116             throw x;
117         }
118         if (errorRef.get() != null) {
119             throw new RuntimeException("Unexpected exception", errorRef.get());
120         }
121     }
122 
123     @Test
testStream1()124     void testStream1() throws Exception {
125         // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
126         String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r les\n\n" +
127                 " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
128         Charset charset = UTF_8;
129 
130         BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset);
131         SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
132         byte[] bytes = text.getBytes(charset);
133         AtomicReference<Throwable> errorRef = new AtomicReference<>();
134         Runnable run = () -> {
135             try {
136                 publisher.subscribe(bodySubscriber);
137                 System.out.println("Publishing " + bytes.length + " bytes");
138                 for (int i = 0; i < bytes.length; i++) {
139                     // ensure that surrogates are split over several buffers.
140                     publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
141                 }
142                 publisher.close();
143             } catch(Throwable t) {
144                 errorRef.set(t);
145             }
146         };
147 
148         Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
149         Thread thread = new Thread(run,"Publishing");
150         thread.start();
151         List<String> list = stream.collect(Collectors.toList());
152         String resp = list.stream().collect(Collectors.joining("|"));
153         System.out.println("***** Got: " + resp);
154         assertEquals(resp, text.replace("\r\n", "|")
155                                .replace("\n","|")
156                                .replace("\r","|"));
157         assertEquals(list, List.of("Bient\u00f4t",
158                 " nous plongerons",
159                 " dans",
160                 "",
161                 " les",
162                 "",
163                 " fr\u00f4\ud801\udc00des",
164                 " t\u00e9n\u00e8bres"));
165         assertEquals(list, lines(text));
166         if (errorRef.get() != null) {
167             throw new RuntimeException("Unexpected exception", errorRef.get());
168         }
169     }
170 
171 
172     @Test
testStream2()173     void testStream2() throws Exception {
174         String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
175                 " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
176         Charset charset = UTF_8;
177 
178         BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset);
179         SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
180         byte[] bytes = text.getBytes(charset);
181         AtomicReference<Throwable> errorRef = new AtomicReference<>();
182         Runnable run = () -> {
183             try {
184                 publisher.subscribe(bodySubscriber);
185                 System.out.println("Publishing " + bytes.length + " bytes");
186                 for (int i = 0; i < bytes.length; i++) {
187                     // ensure that surrogates are split over several buffers.
188                     publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
189                 }
190                 publisher.close();
191             } catch(Throwable t) {
192                 errorRef.set(t);
193             }
194         };
195 
196         Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
197         Thread thread = new Thread(run,"Publishing");
198         thread.start();
199         List<String> list = stream.collect(Collectors.toList());
200         String resp = list.stream().collect(Collectors.joining(""));
201         System.out.println("***** Got: " + resp);
202         String expected = Stream.of(text.split("\r\n|\r|\n"))
203                 .collect(Collectors.joining(""));
204         assertEquals(resp, expected);
205         assertEquals(list, List.of("Bient\u00f4t",
206                 " nous plongerons",
207                 " dans",
208                 "",
209                 " les fr\u00f4\ud801\udc00des",
210                 " t\u00e9n\u00e8bres",
211                 ""));
212         assertEquals(list, lines(text));
213         if (errorRef.get() != null) {
214             throw new RuntimeException("Unexpected exception", errorRef.get());
215         }
216     }
217 
218     @Test
testStream3_UTF16()219     void testStream3_UTF16() throws Exception {
220         // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00
221         String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
222                 " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres";
223         Charset charset = UTF_16;
224 
225         BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset);
226         SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
227         byte[] bytes = text.getBytes(charset);
228         AtomicReference<Throwable> errorRef = new AtomicReference<>();
229         Runnable run = () -> {
230             try {
231                 publisher.subscribe(bodySubscriber);
232                 System.out.println("Publishing " + bytes.length + " bytes");
233                 for (int i = 0; i < bytes.length; i++) {
234                     // ensure that surrogates are split over several buffers.
235                     publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
236                 }
237                 publisher.close();
238             } catch(Throwable t) {
239                 errorRef.set(t);
240             }
241         };
242 
243         Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
244         Thread thread = new Thread(run,"Publishing");
245         thread.start();
246         List<String> list = stream.collect(Collectors.toList());
247         String resp = list.stream().collect(Collectors.joining(""));
248         System.out.println("***** Got: " + resp);
249         assertEquals(resp, text.replace("\n","").replace("\r",""));
250         assertEquals(list, List.of("Bient\u00f4t",
251                 " nous plongerons",
252                 " dans",
253                 "",
254                 " les",
255                 "",
256                 " fr\u00f4\ud801\udc00des",
257                 " t\u00e9n\u00e8bres"));
258         assertEquals(list, lines(text));
259         if (errorRef.get() != null) {
260             throw new RuntimeException("Unexpected exception", errorRef.get());
261         }
262     }
263 
264 
265     @Test
testStream4_UTF16()266     void testStream4_UTF16() throws Exception {
267         String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" +
268                 " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r";
269         Charset charset = UTF_16;
270 
271         BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset);
272         SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>();
273         byte[] bytes = text.getBytes(charset);
274         AtomicReference<Throwable> errorRef = new AtomicReference<>();
275         Runnable run = () -> {
276             try {
277                 publisher.subscribe(bodySubscriber);
278                 System.out.println("Publishing " + bytes.length + " bytes");
279                 for (int i = 0; i < bytes.length; i++) {
280                     // ensure that surrogates are split over several buffers.
281                     publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1)));
282                 }
283                 publisher.close();
284             } catch(Throwable t) {
285                 errorRef.set(t);
286             }
287         };
288 
289         Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get();
290         Thread thread = new Thread(run,"Publishing");
291         thread.start();
292         List<String> list = stream.collect(Collectors.toList());
293         String resp = list.stream().collect(Collectors.joining(""));
294         System.out.println("***** Got: " + resp);
295         String expected = Stream.of(text.split("\r\n|\r|\n"))
296                 .collect(Collectors.joining(""));
297         assertEquals(resp, expected);
298         assertEquals(list, List.of("Bient\u00f4t",
299                 " nous plongerons",
300                 " dans",
301                 "",
302                 " les fr\u00f4\ud801\udc00des",
303                 " t\u00e9n\u00e8bres",
304                 ""));
305         assertEquals(list, lines(text));
306         if (errorRef.get() != null) {
307             throw new RuntimeException("Unexpected exception", errorRef.get());
308         }
309     }
310 
311 }
312