1 /* 2 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.BufferedReader; 25 import java.io.ByteArrayInputStream; 26 import java.io.InputStreamReader; 27 import java.io.StringReader; 28 import java.net.http.HttpResponse.BodySubscriber; 29 import java.net.http.HttpResponse.BodySubscribers; 30 import java.nio.ByteBuffer; 31 import java.nio.charset.Charset; 32 import java.nio.charset.MalformedInputException; 33 import java.util.Arrays; 34 import java.util.List; 35 import java.util.concurrent.ExecutionException; 36 import java.util.concurrent.SubmissionPublisher; 37 import java.util.concurrent.atomic.AtomicReference; 38 import java.util.stream.Collectors; 39 import java.util.stream.Stream; 40 import org.testng.annotations.Test; 41 import static java.nio.charset.StandardCharsets.UTF_8; 42 import static java.nio.charset.StandardCharsets.UTF_16; 43 import static org.testng.Assert.assertEquals; 44 45 /* 46 * @test 47 * @summary tests for BodySubscribers returned by asLines. 48 * In particular tests that surrogate characters are handled 49 * correctly. 50 * @modules java.net.http java.logging 51 * @run testng/othervm LineStreamsAndSurrogatesTest 52 */ 53 54 public class LineStreamsAndSurrogatesTest { 55 56 57 static final Class<NullPointerException> NPE = NullPointerException.class; 58 lines(String text)59 private static final List<String> lines(String text) { 60 return new BufferedReader(new StringReader(text)).lines().collect(Collectors.toList()); 61 } 62 63 @Test testUncomplete()64 void testUncomplete() throws Exception { 65 // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00 66 String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r les\n\n" + 67 " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\ud801\udc00"; 68 Charset charset = UTF_8; 69 70 BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset); 71 AtomicReference<Throwable> errorRef = new AtomicReference<>(); 72 Runnable run = () -> { 73 try { 74 SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>(); 75 byte[] sbytes = text.getBytes(charset); 76 byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1); 77 publisher.subscribe(bodySubscriber); 78 System.out.println("Publishing " + bytes.length + " bytes"); 79 for (int i = 0; i < bytes.length; i++) { 80 // ensure that surrogates are split over several buffers. 81 publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); 82 } 83 publisher.close(); 84 } catch(Throwable t) { 85 errorRef.set(t); 86 } 87 }; 88 Thread thread = new Thread(run,"Publishing"); 89 thread.start(); 90 try { 91 Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get(); 92 List<String> list = stream.collect(Collectors.toList()); 93 String resp = list.stream().collect(Collectors.joining("")); 94 System.out.println("***** Got: " + resp); 95 96 byte[] sbytes = text.getBytes(UTF_8); 97 byte[] bytes = Arrays.copyOfRange(sbytes, 0, sbytes.length - 1); 98 ByteArrayInputStream bais = new ByteArrayInputStream(bytes); 99 BufferedReader reader = new BufferedReader(new InputStreamReader(bais, charset)); 100 String resp2 = reader.lines().collect(Collectors.joining("")); 101 System.out.println("***** Got2: " + resp2); 102 103 assertEquals(resp, resp2); 104 assertEquals(list, List.of("Bient\u00f4t", 105 " nous plongerons", 106 " dans", 107 " les", 108 "", 109 " fr\u00f4\ud801\udc00des", 110 " t\u00e9n\u00e8bres\ufffd")); 111 } catch (ExecutionException x) { 112 Throwable cause = x.getCause(); 113 if (cause instanceof MalformedInputException) { 114 throw new RuntimeException("Unexpected MalformedInputException", cause); 115 } 116 throw x; 117 } 118 if (errorRef.get() != null) { 119 throw new RuntimeException("Unexpected exception", errorRef.get()); 120 } 121 } 122 123 @Test testStream1()124 void testStream1() throws Exception { 125 // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00 126 String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r les\n\n" + 127 " fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres"; 128 Charset charset = UTF_8; 129 130 BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset); 131 SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>(); 132 byte[] bytes = text.getBytes(charset); 133 AtomicReference<Throwable> errorRef = new AtomicReference<>(); 134 Runnable run = () -> { 135 try { 136 publisher.subscribe(bodySubscriber); 137 System.out.println("Publishing " + bytes.length + " bytes"); 138 for (int i = 0; i < bytes.length; i++) { 139 // ensure that surrogates are split over several buffers. 140 publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); 141 } 142 publisher.close(); 143 } catch(Throwable t) { 144 errorRef.set(t); 145 } 146 }; 147 148 Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get(); 149 Thread thread = new Thread(run,"Publishing"); 150 thread.start(); 151 List<String> list = stream.collect(Collectors.toList()); 152 String resp = list.stream().collect(Collectors.joining("|")); 153 System.out.println("***** Got: " + resp); 154 assertEquals(resp, text.replace("\r\n", "|") 155 .replace("\n","|") 156 .replace("\r","|")); 157 assertEquals(list, List.of("Bient\u00f4t", 158 " nous plongerons", 159 " dans", 160 "", 161 " les", 162 "", 163 " fr\u00f4\ud801\udc00des", 164 " t\u00e9n\u00e8bres")); 165 assertEquals(list, lines(text)); 166 if (errorRef.get() != null) { 167 throw new RuntimeException("Unexpected exception", errorRef.get()); 168 } 169 } 170 171 172 @Test testStream2()173 void testStream2() throws Exception { 174 String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" + 175 " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r"; 176 Charset charset = UTF_8; 177 178 BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset); 179 SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>(); 180 byte[] bytes = text.getBytes(charset); 181 AtomicReference<Throwable> errorRef = new AtomicReference<>(); 182 Runnable run = () -> { 183 try { 184 publisher.subscribe(bodySubscriber); 185 System.out.println("Publishing " + bytes.length + " bytes"); 186 for (int i = 0; i < bytes.length; i++) { 187 // ensure that surrogates are split over several buffers. 188 publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); 189 } 190 publisher.close(); 191 } catch(Throwable t) { 192 errorRef.set(t); 193 } 194 }; 195 196 Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get(); 197 Thread thread = new Thread(run,"Publishing"); 198 thread.start(); 199 List<String> list = stream.collect(Collectors.toList()); 200 String resp = list.stream().collect(Collectors.joining("")); 201 System.out.println("***** Got: " + resp); 202 String expected = Stream.of(text.split("\r\n|\r|\n")) 203 .collect(Collectors.joining("")); 204 assertEquals(resp, expected); 205 assertEquals(list, List.of("Bient\u00f4t", 206 " nous plongerons", 207 " dans", 208 "", 209 " les fr\u00f4\ud801\udc00des", 210 " t\u00e9n\u00e8bres", 211 "")); 212 assertEquals(list, lines(text)); 213 if (errorRef.get() != null) { 214 throw new RuntimeException("Unexpected exception", errorRef.get()); 215 } 216 } 217 218 @Test testStream3_UTF16()219 void testStream3_UTF16() throws Exception { 220 // Uses U+10400 which is encoded as the surrogate pair U+D801 U+DC00 221 String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" + 222 " les\n\n fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres"; 223 Charset charset = UTF_16; 224 225 BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset); 226 SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>(); 227 byte[] bytes = text.getBytes(charset); 228 AtomicReference<Throwable> errorRef = new AtomicReference<>(); 229 Runnable run = () -> { 230 try { 231 publisher.subscribe(bodySubscriber); 232 System.out.println("Publishing " + bytes.length + " bytes"); 233 for (int i = 0; i < bytes.length; i++) { 234 // ensure that surrogates are split over several buffers. 235 publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); 236 } 237 publisher.close(); 238 } catch(Throwable t) { 239 errorRef.set(t); 240 } 241 }; 242 243 Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get(); 244 Thread thread = new Thread(run,"Publishing"); 245 thread.start(); 246 List<String> list = stream.collect(Collectors.toList()); 247 String resp = list.stream().collect(Collectors.joining("")); 248 System.out.println("***** Got: " + resp); 249 assertEquals(resp, text.replace("\n","").replace("\r","")); 250 assertEquals(list, List.of("Bient\u00f4t", 251 " nous plongerons", 252 " dans", 253 "", 254 " les", 255 "", 256 " fr\u00f4\ud801\udc00des", 257 " t\u00e9n\u00e8bres")); 258 assertEquals(list, lines(text)); 259 if (errorRef.get() != null) { 260 throw new RuntimeException("Unexpected exception", errorRef.get()); 261 } 262 } 263 264 265 @Test testStream4_UTF16()266 void testStream4_UTF16() throws Exception { 267 String text = "Bient\u00f4t\r\n nous plongerons\r\n dans\r\r" + 268 " les fr\u00f4\ud801\udc00des\r\n t\u00e9n\u00e8bres\r\r"; 269 Charset charset = UTF_16; 270 271 BodySubscriber<Stream<String>> bodySubscriber = BodySubscribers.ofLines(charset); 272 SubmissionPublisher<List<ByteBuffer>> publisher = new SubmissionPublisher<>(); 273 byte[] bytes = text.getBytes(charset); 274 AtomicReference<Throwable> errorRef = new AtomicReference<>(); 275 Runnable run = () -> { 276 try { 277 publisher.subscribe(bodySubscriber); 278 System.out.println("Publishing " + bytes.length + " bytes"); 279 for (int i = 0; i < bytes.length; i++) { 280 // ensure that surrogates are split over several buffers. 281 publisher.submit(List.of(ByteBuffer.wrap(bytes, i, 1))); 282 } 283 publisher.close(); 284 } catch(Throwable t) { 285 errorRef.set(t); 286 } 287 }; 288 289 Stream<String> stream = bodySubscriber.getBody().toCompletableFuture().get(); 290 Thread thread = new Thread(run,"Publishing"); 291 thread.start(); 292 List<String> list = stream.collect(Collectors.toList()); 293 String resp = list.stream().collect(Collectors.joining("")); 294 System.out.println("***** Got: " + resp); 295 String expected = Stream.of(text.split("\r\n|\r|\n")) 296 .collect(Collectors.joining("")); 297 assertEquals(resp, expected); 298 assertEquals(list, List.of("Bient\u00f4t", 299 " nous plongerons", 300 " dans", 301 "", 302 " les fr\u00f4\ud801\udc00des", 303 " t\u00e9n\u00e8bres", 304 "")); 305 assertEquals(list, lines(text)); 306 if (errorRef.get() != null) { 307 throw new RuntimeException("Unexpected exception", errorRef.get()); 308 } 309 } 310 311 } 312