1 /* 2 * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26 package java.net.http; 27 28 import java.io.BufferedReader; 29 import java.io.IOException; 30 import java.io.InputStream; 31 import java.net.URI; 32 import java.nio.ByteBuffer; 33 import java.nio.charset.Charset; 34 import java.nio.channels.FileChannel; 35 import java.nio.charset.StandardCharsets; 36 import java.nio.file.OpenOption; 37 import java.nio.file.Path; 38 import java.util.List; 39 import java.util.Objects; 40 import java.util.Optional; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionStage; 43 import java.util.concurrent.ConcurrentMap; 44 import java.util.concurrent.Flow; 45 import java.util.concurrent.Flow.Subscriber; 46 import java.util.concurrent.Flow.Publisher; 47 import java.util.concurrent.Flow.Subscription; 48 import java.util.function.Consumer; 49 import java.util.function.Function; 50 import java.util.function.Supplier; 51 import java.util.stream.Stream; 52 import javax.net.ssl.SSLSession; 53 import jdk.internal.net.http.BufferingSubscriber; 54 import jdk.internal.net.http.LineSubscriberAdapter; 55 import jdk.internal.net.http.ResponseBodyHandlers.FileDownloadBodyHandler; 56 import jdk.internal.net.http.ResponseBodyHandlers.PathBodyHandler; 57 import jdk.internal.net.http.ResponseBodyHandlers.PushPromisesHandlerWithMap; 58 import jdk.internal.net.http.ResponseSubscribers; 59 import jdk.internal.net.http.ResponseSubscribers.PathSubscriber; 60 import static java.nio.file.StandardOpenOption.*; 61 import static jdk.internal.net.http.common.Utils.charsetFrom; 62 63 /** 64 * An HTTP response. 65 * 66 * <p> An {@code HttpResponse} is not created directly, but rather returned as 67 * a result of sending an {@link HttpRequest}. An {@code HttpResponse} is 68 * made available when the response status code and headers have been received, 69 * and typically after the response body has also been completely received. 
70 * Whether or not the {@code HttpResponse} is made available before the response 71 * body has been completely received depends on the {@link BodyHandler 72 * BodyHandler} provided when sending the {@code HttpRequest}. 73 * 74 * <p> This class provides methods for accessing the response status code, 75 * headers, the response body, and the {@code HttpRequest} corresponding 76 * to this response. 77 * 78 * <p> The following is an example of retrieving a response as a String: 79 * 80 * <pre>{@code HttpResponse<String> response = client 81 * .send(request, BodyHandlers.ofString()); }</pre> 82 * 83 * <p> The class {@link BodyHandlers BodyHandlers} provides implementations 84 * of many common response handlers. Alternatively, a custom {@code BodyHandler} 85 * implementation can be used. 86 * 87 * @param <T> the response body type 88 * @since 11 89 */ 90 public interface HttpResponse<T> { 91 92 93 /** 94 * Returns the status code for this response. 95 * 96 * @return the response code 97 */ statusCode()98 public int statusCode(); 99 100 /** 101 * Returns the {@link HttpRequest} corresponding to this response. 102 * 103 * <p> The returned {@code HttpRequest} may not be the initiating request 104 * provided when {@linkplain HttpClient#send(HttpRequest, BodyHandler) 105 * sending}. For example, if the initiating request was redirected, then the 106 * request returned by this method will have the redirected URI, which will 107 * be different from the initiating request URI. 108 * 109 * @see #previousResponse() 110 * 111 * @return the request 112 */ request()113 public HttpRequest request(); 114 115 /** 116 * Returns an {@code Optional} containing the previous intermediate response 117 * if one was received. An intermediate response is one that is received 118 * as a result of redirection or authentication. If no previous response 119 * was received then an empty {@code Optional} is returned. 120 * 121 * @return an Optional containing the HttpResponse, if any. 
122 */ previousResponse()123 public Optional<HttpResponse<T>> previousResponse(); 124 125 /** 126 * Returns the received response headers. 127 * 128 * @return the response headers 129 */ headers()130 public HttpHeaders headers(); 131 132 /** 133 * Returns the body. Depending on the type of {@code T}, the returned body 134 * may represent the body after it was read (such as {@code byte[]}, or 135 * {@code String}, or {@code Path}) or it may represent an object with 136 * which the body is read, such as an {@link java.io.InputStream}. 137 * 138 * <p> If this {@code HttpResponse} was returned from an invocation of 139 * {@link #previousResponse()} then this method returns {@code null} 140 * 141 * @return the body 142 */ body()143 public T body(); 144 145 /** 146 * Returns an {@link Optional} containing the {@link SSLSession} in effect 147 * for this response. Returns an empty {@code Optional} if this is not a 148 * <i>HTTPS</i> response. 149 * 150 * @return an {@code Optional} containing the {@code SSLSession} associated 151 * with the response 152 */ sslSession()153 public Optional<SSLSession> sslSession(); 154 155 /** 156 * Returns the {@code URI} that the response was received from. This may be 157 * different from the request {@code URI} if redirection occurred. 158 * 159 * @return the URI of the response 160 */ uri()161 public URI uri(); 162 163 /** 164 * Returns the HTTP protocol version that was used for this response. 165 * 166 * @return HTTP protocol version 167 */ version()168 public HttpClient.Version version(); 169 170 171 /** 172 * Initial response information supplied to a {@link BodyHandler BodyHandler} 173 * when a response is initially received and before the body is processed. 174 */ 175 public interface ResponseInfo { 176 /** 177 * Provides the response status code. 178 * @return the response status code 179 */ statusCode()180 public int statusCode(); 181 182 /** 183 * Provides the response headers. 
184 * @return the response headers 185 */ headers()186 public HttpHeaders headers(); 187 188 /** 189 * Provides the response protocol version. 190 * @return the response protocol version 191 */ version()192 public HttpClient.Version version(); 193 } 194 195 /** 196 * A handler for response bodies. The class {@link BodyHandlers BodyHandlers} 197 * provides implementations of many common body handlers. 198 * 199 * <p> The {@code BodyHandler} interface allows inspection of the response 200 * code and headers, before the actual response body is received, and is 201 * responsible for creating the response {@link BodySubscriber 202 * BodySubscriber}. The {@code BodySubscriber} consumes the actual response 203 * body bytes and, typically, converts them into a higher-level Java type. 204 * 205 * <p> A {@code BodyHandler} is a function that takes a {@link ResponseInfo 206 * ResponseInfo} object; and which returns a {@code BodySubscriber}. The 207 * {@code BodyHandler} is invoked when the response status code and headers 208 * are available, but before the response body bytes are received. 209 * 210 * <p> The following example uses one of the {@linkplain BodyHandlers 211 * predefined body handlers} that always process the response body in the 212 * same way ( streams the response body to a file ). 213 * 214 * <pre>{@code HttpRequest request = HttpRequest.newBuilder() 215 * .uri(URI.create("http://www.foo.com/")) 216 * .build(); 217 * client.sendAsync(request, BodyHandlers.ofFile(Paths.get("/tmp/f"))) 218 * .thenApply(HttpResponse::body) 219 * .thenAccept(System.out::println); }</pre> 220 * 221 * Note, that even though the pre-defined handlers do not examine the 222 * response code, the response code and headers are always retrievable from 223 * the {@link HttpResponse}, when it is returned. 224 * 225 * <p> In the second example, the function returns a different subscriber 226 * depending on the status code. 
227 * <pre>{@code HttpRequest request = HttpRequest.newBuilder() 228 * .uri(URI.create("http://www.foo.com/")) 229 * .build(); 230 * BodyHandler<Path> bodyHandler = (rspInfo) -> rspInfo.statusCode() == 200 231 * ? BodySubscribers.ofFile(Paths.get("/tmp/f")) 232 * : BodySubscribers.replacing(Paths.get("/NULL")); 233 * client.sendAsync(request, bodyHandler) 234 * .thenApply(HttpResponse::body) 235 * .thenAccept(System.out::println); }</pre> 236 * 237 * @param <T> the response body type 238 * @see BodyHandlers 239 * @since 11 240 */ 241 @FunctionalInterface 242 public interface BodyHandler<T> { 243 244 /** 245 * Returns a {@link BodySubscriber BodySubscriber} considering the 246 * given response status code and headers. This method is invoked before 247 * the actual response body bytes are read and its implementation must 248 * return a {@link BodySubscriber BodySubscriber} to consume the response 249 * body bytes. 250 * 251 * <p> The response body can be discarded using one of {@link 252 * BodyHandlers#discarding() discarding} or {@link 253 * BodyHandlers#replacing(Object) replacing}. 254 * 255 * @param responseInfo the response info 256 * @return a body subscriber 257 */ apply(ResponseInfo responseInfo)258 public BodySubscriber<T> apply(ResponseInfo responseInfo); 259 } 260 261 /** 262 * Implementations of {@link BodyHandler BodyHandler} that implement various 263 * useful handlers, such as handling the response body as a String, or 264 * streaming the response body to a file. 265 * 266 * <p> These implementations do not examine the status code, meaning the 267 * body is always accepted. They typically return an equivalently named 268 * {@code BodySubscriber}. Alternatively, a custom handler can be used to 269 * examine the status code and headers, and return a different body 270 * subscriber, of the same type, as appropriate. 
271 * 272 * <p>The following are examples of using the predefined body handlers to 273 * convert a flow of response body data into common high-level Java objects: 274 * 275 * <pre>{@code // Receives the response body as a String 276 * HttpResponse<String> response = client 277 * .send(request, BodyHandlers.ofString()); 278 * 279 * // Receives the response body as a file 280 * HttpResponse<Path> response = client 281 * .send(request, BodyHandlers.ofFile(Paths.get("example.html"))); 282 * 283 * // Receives the response body as an InputStream 284 * HttpResponse<InputStream> response = client 285 * .send(request, BodyHandlers.ofInputStream()); 286 * 287 * // Discards the response body 288 * HttpResponse<Void> response = client 289 * .send(request, BodyHandlers.discarding()); }</pre> 290 * 291 * @since 11 292 */ 293 public static class BodyHandlers { 294 BodyHandlers()295 private BodyHandlers() { } 296 297 /** 298 * Returns a response body handler that returns a {@link BodySubscriber 299 * BodySubscriber}{@code <Void>} obtained from {@link 300 * BodySubscribers#fromSubscriber(Subscriber)}, with the given 301 * {@code subscriber}. 302 * 303 * <p> The response body is not available through this, or the {@code 304 * HttpResponse} API, but instead all response body is forwarded to the 305 * given {@code subscriber}, which should make it available, if 306 * appropriate, through some other mechanism, e.g. an entry in a 307 * database, etc. 308 * 309 * @apiNote This method can be used as an adapter between {@code 310 * BodySubscriber} and {@code Flow.Subscriber}. 
311 * 312 * <p> For example: 313 * <pre> {@code TextSubscriber subscriber = new TextSubscriber(); 314 * HttpResponse<Void> response = client.sendAsync(request, 315 * BodyHandlers.fromSubscriber(subscriber)).join(); 316 * System.out.println(response.statusCode()); }</pre> 317 * 318 * @param subscriber the subscriber 319 * @return a response body handler 320 */ 321 public static BodyHandler<Void> fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber)322 fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { 323 Objects.requireNonNull(subscriber); 324 return (responseInfo) -> BodySubscribers.fromSubscriber(subscriber, 325 s -> null); 326 } 327 328 /** 329 * Returns a response body handler that returns a {@link BodySubscriber 330 * BodySubscriber}{@code <T>} obtained from {@link 331 * BodySubscribers#fromSubscriber(Subscriber, Function)}, with the 332 * given {@code subscriber} and {@code finisher} function. 333 * 334 * <p> The given {@code finisher} function is applied after the given 335 * subscriber's {@code onComplete} has been invoked. The {@code finisher} 336 * function is invoked with the given subscriber, and returns a value 337 * that is set as the response's body. 338 * 339 * @apiNote This method can be used as an adapter between {@code 340 * BodySubscriber} and {@code Flow.Subscriber}. 341 * 342 * <p> For example: 343 * <pre> {@code TextSubscriber subscriber = ...; // accumulates bytes and transforms them into a String 344 * HttpResponse<String> response = client.sendAsync(request, 345 * BodyHandlers.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join(); 346 * String text = response.body(); }</pre> 347 * 348 * @param <S> the type of the Subscriber 349 * @param <T> the type of the response body 350 * @param subscriber the subscriber 351 * @param finisher a function to be applied after the subscriber has completed 352 * @return a response body handler 353 */ 354 public static <S extends Subscriber<? 
super List<ByteBuffer>>,T> BodyHandler<T> fromSubscriber(S subscriber, Function<? super S,? extends T> finisher)355 fromSubscriber(S subscriber, Function<? super S,? extends T> finisher) { 356 Objects.requireNonNull(subscriber); 357 Objects.requireNonNull(finisher); 358 return (responseInfo) -> BodySubscribers.fromSubscriber(subscriber, 359 finisher); 360 } 361 362 /** 363 * Returns a response body handler that returns a {@link BodySubscriber 364 * BodySubscriber}{@code <Void>} obtained from {@link 365 * BodySubscribers#fromLineSubscriber(Subscriber, Function, Charset, String) 366 * BodySubscribers.fromLineSubscriber(subscriber, s -> null, charset, null)}, 367 * with the given {@code subscriber}. 368 * The {@link Charset charset} used to decode the response body bytes is 369 * obtained from the HTTP response headers as specified by {@link #ofString()}, 370 * and lines are delimited in the manner of {@link BufferedReader#readLine()}. 371 * 372 * <p> The response body is not available through this, or the {@code 373 * HttpResponse} API, but instead all response body is forwarded to the 374 * given {@code subscriber}, which should make it available, if 375 * appropriate, through some other mechanism, e.g. an entry in a 376 * database, etc. 377 * 378 * @apiNote This method can be used as an adapter between a {@code 379 * BodySubscriber} and a text based {@code Flow.Subscriber} that parses 380 * text line by line. 
381 * 382 * <p> For example: 383 * <pre> {@code // A PrintSubscriber that implements Flow.Subscriber<String> 384 * // and print lines received by onNext() on System.out 385 * PrintSubscriber subscriber = new PrintSubscriber(System.out); 386 * client.sendAsync(request, BodyHandlers.fromLineSubscriber(subscriber)) 387 * .thenApply(HttpResponse::statusCode) 388 * .thenAccept((status) -> { 389 * if (status != 200) { 390 * System.err.printf("ERROR: %d status received%n", status); 391 * } 392 * }); }</pre> 393 * 394 * @param subscriber the subscriber 395 * @return a response body handler 396 */ 397 public static BodyHandler<Void> fromLineSubscriber(Subscriber<? super String> subscriber)398 fromLineSubscriber(Subscriber<? super String> subscriber) { 399 Objects.requireNonNull(subscriber); 400 return (responseInfo) -> 401 BodySubscribers.fromLineSubscriber(subscriber, 402 s -> null, 403 charsetFrom(responseInfo.headers()), 404 null); 405 } 406 407 /** 408 * Returns a response body handler that returns a {@link BodySubscriber 409 * BodySubscriber}{@code <T>} obtained from {@link 410 * BodySubscribers#fromLineSubscriber(Subscriber, Function, Charset, String) 411 * BodySubscribers.fromLineSubscriber(subscriber, finisher, charset, lineSeparator)}, 412 * with the given {@code subscriber}, {@code finisher} function, and line separator. 413 * The {@link Charset charset} used to decode the response body bytes is 414 * obtained from the HTTP response headers as specified by {@link #ofString()}. 415 * 416 * <p> The given {@code finisher} function is applied after the given 417 * subscriber's {@code onComplete} has been invoked. The {@code finisher} 418 * function is invoked with the given subscriber, and returns a value 419 * that is set as the response's body. 420 * 421 * @apiNote This method can be used as an adapter between a {@code 422 * BodySubscriber} and a text based {@code Flow.Subscriber} that parses 423 * text line by line. 
424 * 425 * <p> For example: 426 * <pre> {@code // A LineParserSubscriber that implements Flow.Subscriber<String> 427 * // and accumulates lines that match a particular pattern 428 * Pattern pattern = ...; 429 * LineParserSubscriber subscriber = new LineParserSubscriber(pattern); 430 * HttpResponse<List<String>> response = client.send(request, 431 * BodyHandlers.fromLineSubscriber(subscriber, s -> s.getMatchingLines(), "\n")); 432 * if (response.statusCode() != 200) { 433 * System.err.printf("ERROR: %d status received%n", response.statusCode()); 434 * } }</pre> 435 * 436 * 437 * @param <S> the type of the Subscriber 438 * @param <T> the type of the response body 439 * @param subscriber the subscriber 440 * @param finisher a function to be applied after the subscriber has completed 441 * @param lineSeparator an optional line separator: can be {@code null}, 442 * in which case lines will be delimited in the manner of 443 * {@link BufferedReader#readLine()}. 444 * @return a response body handler 445 * @throws IllegalArgumentException if the supplied {@code lineSeparator} 446 * is the empty string 447 */ 448 public static <S extends Subscriber<? super String>,T> BodyHandler<T> fromLineSubscriber(S subscriber, Function<? super S,? extends T> finisher, String lineSeparator)449 fromLineSubscriber(S subscriber, 450 Function<? super S,? extends T> finisher, 451 String lineSeparator) { 452 Objects.requireNonNull(subscriber); 453 Objects.requireNonNull(finisher); 454 // implicit null check 455 if (lineSeparator != null && lineSeparator.isEmpty()) 456 throw new IllegalArgumentException("empty line separator"); 457 return (responseInfo) -> 458 BodySubscribers.fromLineSubscriber(subscriber, 459 finisher, 460 charsetFrom(responseInfo.headers()), 461 lineSeparator); 462 } 463 464 /** 465 * Returns a response body handler that discards the response body. 
466 * 467 * @return a response body handler 468 */ discarding()469 public static BodyHandler<Void> discarding() { 470 return (responseInfo) -> BodySubscribers.discarding(); 471 } 472 473 /** 474 * Returns a response body handler that returns the given replacement 475 * value, after discarding the response body. 476 * 477 * @param <U> the response body type 478 * @param value the value of U to return as the body, may be {@code null} 479 * @return a response body handler 480 */ replacing(U value)481 public static <U> BodyHandler<U> replacing(U value) { 482 return (responseInfo) -> BodySubscribers.replacing(value); 483 } 484 485 /** 486 * Returns a {@code BodyHandler<String>} that returns a 487 * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from 488 * {@link BodySubscribers#ofString(Charset) BodySubscribers.ofString(Charset)}. 489 * The body is decoded using the given character set. 490 * 491 * @param charset the character set to convert the body with 492 * @return a response body handler 493 */ ofString(Charset charset)494 public static BodyHandler<String> ofString(Charset charset) { 495 Objects.requireNonNull(charset); 496 return (responseInfo) -> BodySubscribers.ofString(charset); 497 } 498 499 /** 500 * Returns a {@code BodyHandler<Path>} that returns a 501 * {@link BodySubscriber BodySubscriber}{@code <Path>} obtained from 502 * {@link BodySubscribers#ofFile(Path, OpenOption...) 503 * BodySubscribers.ofFile(Path,OpenOption...)}. 504 * 505 * <p> When the {@code HttpResponse} object is returned, the body has 506 * been completely written to the file, and {@link #body()} returns a 507 * reference to its {@link Path}. 508 * 509 * <p> In the case of the default file system provider, security manager 510 * permission checks are performed in this factory method, when the 511 * {@code BodyHandler} is created. Otherwise, 512 * {@linkplain FileChannel#open(Path, OpenOption...) 
permission checks} 513 * may be performed asynchronously against the caller's context 514 * at file access time. 515 * Care must be taken that the {@code BodyHandler} is not shared with 516 * untrusted code. 517 * 518 * @param file the file to store the body in 519 * @param openOptions any options to use when opening/creating the file 520 * @return a response body handler 521 * @throws IllegalArgumentException if an invalid set of open options 522 * are specified 523 * @throws SecurityException in the case of the default file system 524 * provider, and a security manager is installed, 525 * {@link SecurityManager#checkWrite(String) checkWrite} 526 * is invoked to check write access to the given file 527 */ ofFile(Path file, OpenOption... openOptions)528 public static BodyHandler<Path> ofFile(Path file, OpenOption... openOptions) { 529 Objects.requireNonNull(file); 530 List<OpenOption> opts = List.of(openOptions); 531 if (opts.contains(DELETE_ON_CLOSE) || opts.contains(READ)) { 532 // these options make no sense, since the FileChannel is not exposed 533 throw new IllegalArgumentException("invalid openOptions: " + opts); 534 } 535 return PathBodyHandler.create(file, opts); 536 } 537 538 /** 539 * Returns a {@code BodyHandler<Path>} that returns a 540 * {@link BodySubscriber BodySubscriber}{@code <Path>}. 541 * 542 * <p> Equivalent to: {@code ofFile(file, CREATE, WRITE)} 543 * 544 * <p> In the case of the default file system provider, security manager 545 * permission checks are performed in this factory method, when the 546 * {@code BodyHandler} is created. Otherwise, 547 * {@linkplain FileChannel#open(Path, OpenOption...) permission checks} 548 * may be performed asynchronously against the caller's context 549 * at file access time. 550 * Care must be taken that the {@code BodyHandler} is not shared with 551 * untrusted code. 
552 * 553 * @param file the file to store the body in 554 * @return a response body handler 555 * @throws SecurityException in the case of the default file system 556 * provider, and a security manager is installed, 557 * {@link SecurityManager#checkWrite(String) checkWrite} 558 * is invoked to check write access to the given file 559 */ ofFile(Path file)560 public static BodyHandler<Path> ofFile(Path file) { 561 return BodyHandlers.ofFile(file, CREATE, WRITE); 562 } 563 564 /** 565 * Returns a {@code BodyHandler<Path>} that returns a 566 * {@link BodySubscriber BodySubscriber}<{@link Path}> 567 * where the download directory is specified, but the filename is 568 * obtained from the {@code Content-Disposition} response header. The 569 * {@code Content-Disposition} header must specify the <i>attachment</i> 570 * type and must also contain a <i>filename</i> parameter. If the 571 * filename specifies multiple path components only the final component 572 * is used as the filename (with the given directory name). 573 * 574 * <p> When the {@code HttpResponse} object is returned, the body has 575 * been completely written to the file and {@link #body()} returns a 576 * {@code Path} object for the file. The returned {@code Path} is the 577 * combination of the supplied directory name and the file name supplied 578 * by the server. If the destination directory does not exist or cannot 579 * be written to, then the response will fail with an {@link IOException}. 580 * 581 * <p> Security manager permission checks are performed in this factory 582 * method, when the {@code BodyHandler} is created. Care must be taken 583 * that the {@code BodyHandler} is not shared with untrusted code. 
584 * 585 * @param directory the directory to store the file in 586 * @param openOptions open options used when opening the file 587 * @return a response body handler 588 * @throws IllegalArgumentException if the given path does not exist, 589 * is not of the default file system, is not a directory, 590 * is not writable, or if an invalid set of open options 591 * are specified 592 * @throws SecurityException in the case of the default file system 593 * provider and a security manager has been installed, 594 * and it denies 595 * {@linkplain SecurityManager#checkRead(String) read access} 596 * to the directory, or it denies 597 * {@linkplain SecurityManager#checkWrite(String) write access} 598 * to the directory, or it denies 599 * {@linkplain SecurityManager#checkWrite(String) write access} 600 * to the files within the directory. 601 */ ofFileDownload(Path directory, OpenOption... openOptions)602 public static BodyHandler<Path> ofFileDownload(Path directory, 603 OpenOption... openOptions) { 604 Objects.requireNonNull(directory); 605 List<OpenOption> opts = List.of(openOptions); 606 if (opts.contains(DELETE_ON_CLOSE)) { 607 throw new IllegalArgumentException("invalid option: " + DELETE_ON_CLOSE); 608 } 609 return FileDownloadBodyHandler.create(directory, opts); 610 } 611 612 /** 613 * Returns a {@code BodyHandler<InputStream>} that returns a 614 * {@link BodySubscriber BodySubscriber}{@code <InputStream>} obtained from 615 * {@link BodySubscribers#ofInputStream() BodySubscribers.ofInputStream}. 616 * 617 * <p> When the {@code HttpResponse} object is returned, the response 618 * headers will have been completely read, but the body may not have 619 * been fully received yet. The {@link #body()} method returns an 620 * {@link InputStream} from which the body can be read as it is received. 621 * 622 * @apiNote See {@link BodySubscribers#ofInputStream()} for more 623 * information. 
624 * 625 * @return a response body handler 626 */ ofInputStream()627 public static BodyHandler<InputStream> ofInputStream() { 628 return (responseInfo) -> BodySubscribers.ofInputStream(); 629 } 630 631 /** 632 * Returns a {@code BodyHandler<Stream<String>>} that returns a 633 * {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained 634 * from {@link BodySubscribers#ofLines(Charset) BodySubscribers.ofLines(charset)}. 635 * The {@link Charset charset} used to decode the response body bytes is 636 * obtained from the HTTP response headers as specified by {@link #ofString()}, 637 * and lines are delimited in the manner of {@link BufferedReader#readLine()}. 638 * 639 * <p> When the {@code HttpResponse} object is returned, the body may 640 * not have been completely received. 641 * 642 * @return a response body handler 643 */ ofLines()644 public static BodyHandler<Stream<String>> ofLines() { 645 return (responseInfo) -> 646 BodySubscribers.ofLines(charsetFrom(responseInfo.headers())); 647 } 648 649 /** 650 * Returns a {@code BodyHandler<Void>} that returns a 651 * {@link BodySubscriber BodySubscriber}{@code <Void>} obtained from 652 * {@link BodySubscribers#ofByteArrayConsumer(Consumer) 653 * BodySubscribers.ofByteArrayConsumer(Consumer)}. 654 * 655 * <p> When the {@code HttpResponse} object is returned, the body has 656 * been completely written to the consumer. 657 * 658 * @apiNote 659 * The subscriber returned by this handler is not flow controlled. 660 * Therefore, the supplied consumer must be able to process whatever 661 * amount of data is delivered in a timely fashion. 
662 * 663 * @param consumer a Consumer to accept the response body 664 * @return a response body handler 665 */ 666 public static BodyHandler<Void> ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer)667 ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) { 668 Objects.requireNonNull(consumer); 669 return (responseInfo) -> BodySubscribers.ofByteArrayConsumer(consumer); 670 } 671 672 /** 673 * Returns a {@code BodyHandler<byte[]>} that returns a 674 * {@link BodySubscriber BodySubscriber}{@code <byte[]>} obtained 675 * from {@link BodySubscribers#ofByteArray() BodySubscribers.ofByteArray()}. 676 * 677 * <p> When the {@code HttpResponse} object is returned, the body has 678 * been completely written to the byte array. 679 * 680 * @return a response body handler 681 */ ofByteArray()682 public static BodyHandler<byte[]> ofByteArray() { 683 return (responseInfo) -> BodySubscribers.ofByteArray(); 684 } 685 686 /** 687 * Returns a {@code BodyHandler<String>} that returns a 688 * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from 689 * {@link BodySubscribers#ofString(Charset) BodySubscribers.ofString(Charset)}. 690 * The body is decoded using the character set specified in 691 * the {@code Content-Type} response header. If there is no such 692 * header, or the character set is not supported, then 693 * {@link StandardCharsets#UTF_8 UTF_8} is used. 694 * 695 * <p> When the {@code HttpResponse} object is returned, the body has 696 * been completely written to the string. 697 * 698 * @return a response body handler 699 */ ofString()700 public static BodyHandler<String> ofString() { 701 return (responseInfo) -> BodySubscribers.ofString(charsetFrom(responseInfo.headers())); 702 } 703 704 /** 705 * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a 706 * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>} 707 * obtained from {@link BodySubscribers#ofPublisher() 708 * BodySubscribers.ofPublisher()}. 
709 * 710 * <p> When the {@code HttpResponse} object is returned, the response 711 * headers will have been completely read, but the body may not have 712 * been fully received yet. The {@link #body()} method returns a 713 * {@link Publisher Publisher}{@code <List<ByteBuffer>>} from which the body 714 * response bytes can be obtained as they are received. The publisher 715 * can and must be subscribed to only once. 716 * 717 * @apiNote See {@link BodySubscribers#ofPublisher()} for more 718 * information. 719 * 720 * @return a response body handler 721 */ ofPublisher()722 public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() { 723 return (responseInfo) -> BodySubscribers.ofPublisher(); 724 } 725 726 /** 727 * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain 728 * BodySubscribers#buffering(BodySubscriber,int) buffering BodySubscriber} 729 * that buffers data before delivering it to the downstream subscriber. 730 * These {@code BodySubscriber} instances are created by calling 731 * {@link BodySubscribers#buffering(BodySubscriber,int) 732 * BodySubscribers.buffering} with a subscriber obtained from the given 733 * downstream handler and the {@code bufferSize} parameter. 
734 * 735 * @param <T> the response body type 736 * @param downstreamHandler the downstream handler 737 * @param bufferSize the buffer size parameter passed to {@link 738 * BodySubscribers#buffering(BodySubscriber,int) BodySubscribers.buffering} 739 * @return a body handler 740 * @throws IllegalArgumentException if {@code bufferSize <= 0} 741 */ buffering(BodyHandler<T> downstreamHandler, int bufferSize)742 public static <T> BodyHandler<T> buffering(BodyHandler<T> downstreamHandler, 743 int bufferSize) { 744 Objects.requireNonNull(downstreamHandler); 745 if (bufferSize <= 0) 746 throw new IllegalArgumentException("must be greater than 0"); 747 return (responseInfo) -> BodySubscribers 748 .buffering(downstreamHandler.apply(responseInfo), 749 bufferSize); 750 } 751 } 752 753 /** 754 * A handler for push promises. 755 * 756 * <p> A <i>push promise</i> is a synthetic request sent by an HTTP/2 server 757 * when retrieving an initiating client-sent request. The server has 758 * determined, possibly through inspection of the initiating request, that 759 * the client will likely need the promised resource, and hence pushes a 760 * synthetic push request, in the form of a push promise, to the client. The 761 * client can choose to accept or reject the push promise request. 762 * 763 * <p> A push promise request may be received up to the point where the 764 * response body of the initiating client-sent request has been fully 765 * received. The delivery of a push promise response, however, is not 766 * coordinated with the delivery of the response to the initiating 767 * client-sent request. 768 * 769 * @param <T> the push promise response body type 770 * @since 11 771 */ 772 public interface PushPromiseHandler<T> { 773 774 /** 775 * Notification of an incoming push promise. 776 * 777 * <p> This method is invoked once for each push promise received, up 778 * to the point where the response body of the initiating client-sent 779 * request has been fully received. 
780 * 781 * <p> A push promise is accepted by invoking the given {@code acceptor} 782 * function. The {@code acceptor} function must be passed a non-null 783 * {@code BodyHandler}, that is to be used to handle the promise's 784 * response body. The acceptor function will return a {@code 785 * CompletableFuture} that completes with the promise's response. 786 * 787 * <p> If the {@code acceptor} function is not successfully invoked, 788 * then the push promise is rejected. The {@code acceptor} function will 789 * throw an {@code IllegalStateException} if invoked more than once. 790 * 791 * @param initiatingRequest the initiating client-send request 792 * @param pushPromiseRequest the synthetic push request 793 * @param acceptor the acceptor function that must be successfully 794 * invoked to accept the push promise 795 */ applyPushPromise( HttpRequest initiatingRequest, HttpRequest pushPromiseRequest, Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor )796 public void applyPushPromise( 797 HttpRequest initiatingRequest, 798 HttpRequest pushPromiseRequest, 799 Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor 800 ); 801 802 803 /** 804 * Returns a push promise handler that accumulates push promises, and 805 * their responses, into the given map. 806 * 807 * <p> Entries are added to the given map for each push promise accepted. 808 * The entry's key is the push request, and the entry's value is a 809 * {@code CompletableFuture} that completes with the response 810 * corresponding to the key's push request. A push request is rejected / 811 * cancelled if there is already an entry in the map whose key is 812 * {@linkplain HttpRequest#equals equal} to it. A push request is 813 * rejected / cancelled if it does not have the same origin as its 814 * initiating request. 815 * 816 * <p> Entries are added to the given map as soon as practically 817 * possible when a push promise is received and accepted. 
That way code, 818 * using such a map like a cache, can determine if a push promise has 819 * been issued by the server and avoid making, possibly, unnecessary 820 * requests. 821 * 822 * <p> The delivery of a push promise response is not coordinated with 823 * the delivery of the response to the initiating client-sent request. 824 * However, when the response body for the initiating client-sent 825 * request has been fully received, the map is guaranteed to be fully 826 * populated, that is, no more entries will be added. The individual 827 * {@code CompletableFutures} contained in the map may or may not 828 * already be completed at this point. 829 * 830 * @param <T> the push promise response body type 831 * @param pushPromiseHandler t he body handler to use for push promises 832 * @param pushPromisesMap a map to accumulate push promises into 833 * @return a push promise handler 834 */ 835 public static <T> PushPromiseHandler<T> of(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler, ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap)836 of(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler, 837 ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) { 838 return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap); 839 } 840 } 841 842 /** 843 * A {@code BodySubscriber} consumes response body bytes and converts them 844 * into a higher-level Java type. The class {@link BodySubscribers 845 * BodySubscribers} provides implementations of many common body subscribers. 846 * 847 * <p> The object acts as a {@link Flow.Subscriber}<{@link List}<{@link 848 * ByteBuffer}>> to the HTTP Client implementation, which publishes 849 * lists of ByteBuffers containing the response body. The Flow of data, as 850 * well as the order of ByteBuffers in the Flow lists, is a strictly ordered 851 * representation of the response body. 
* Both the Lists and the ByteBuffers,
     * once passed to the subscriber, are no longer used by the HTTP Client. The
     * subscriber converts the incoming buffers of data to some higher-level
     * Java type {@code T}.
     *
     * <p> The {@link #getBody()} method returns a
     * {@link CompletionStage}{@code <T>} that provides the response body
     * object. The {@code CompletionStage} must be obtainable at any time. When
     * it completes depends on the nature of type {@code T}. In many cases,
     * when {@code T} represents the entire body after being consumed then
     * the {@code CompletionStage} completes after the body has been consumed.
     * If {@code T} is a streaming type, such as {@link java.io.InputStream
     * InputStream}, then it completes before the body has been read, because
     * the calling code uses the {@code InputStream} to consume the data.
     *
     * @apiNote To ensure that all resources associated with the corresponding
     * HTTP exchange are properly released, an implementation of {@code
     * BodySubscriber} should ensure to {@linkplain Flow.Subscription#request
     * request} more data until one of {@link #onComplete() onComplete} or
     * {@link #onError(Throwable) onError} are signalled, or {@link
     * Flow.Subscription#cancel() cancel} its {@linkplain
     * #onSubscribe(Flow.Subscription) subscription} if unable or unwilling to
     * do so. Calling {@code cancel} before exhausting the response body data
     * may cause the underlying HTTP connection to be closed and prevent it
     * from being reused for subsequent operations.
     *
     * @implNote The flow of data containing the response body is immutable.
     * Specifically, it is a flow of unmodifiable lists of read-only ByteBuffers.
879 * 880 * @param <T> the response body type 881 * @see BodySubscribers 882 * @since 11 883 */ 884 public interface BodySubscriber<T> 885 extends Flow.Subscriber<List<ByteBuffer>> { 886 887 /** 888 * Returns a {@code CompletionStage} which when completed will return 889 * the response body object. This method can be called at any time 890 * relative to the other {@link Flow.Subscriber} methods and is invoked 891 * using the client's {@link HttpClient#executor() executor}. 892 * 893 * @return a CompletionStage for the response body 894 */ getBody()895 public CompletionStage<T> getBody(); 896 } 897 898 /** 899 * Implementations of {@link BodySubscriber BodySubscriber} that implement 900 * various useful subscribers, such as converting the response body bytes 901 * into a String, or streaming the bytes to a file. 902 * 903 * <p>The following are examples of using the predefined body subscribers 904 * to convert a flow of response body data into common high-level Java 905 * objects: 906 * 907 * <pre>{@code // Streams the response body to a File 908 * HttpResponse<Path> response = client 909 * .send(request, responseInfo -> BodySubscribers.ofFile(Paths.get("example.html")); 910 * 911 * // Accumulates the response body and returns it as a byte[] 912 * HttpResponse<byte[]> response = client 913 * .send(request, responseInfo -> BodySubscribers.ofByteArray()); 914 * 915 * // Discards the response body 916 * HttpResponse<Void> response = client 917 * .send(request, responseInfo -> BodySubscribers.discarding()); 918 * 919 * // Accumulates the response body as a String then maps it to its bytes 920 * HttpResponse<byte[]> response = client 921 * .send(request, responseInfo -> 922 * BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), String::getBytes)); 923 * }</pre> 924 * 925 * @since 11 926 */ 927 public static class BodySubscribers { 928 BodySubscribers()929 private BodySubscribers() { } 930 931 /** 932 * Returns a body subscriber that forwards all response body to the 
933 * given {@code Flow.Subscriber}. The {@linkplain BodySubscriber#getBody() 934 * completion stage} of the returned body subscriber completes after one 935 * of the given subscribers {@code onComplete} or {@code onError} has 936 * been invoked. 937 * 938 * @apiNote This method can be used as an adapter between {@code 939 * BodySubscriber} and {@code Flow.Subscriber}. 940 * 941 * @param subscriber the subscriber 942 * @return a body subscriber 943 */ 944 public static BodySubscriber<Void> fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber)945 fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { 946 return new ResponseSubscribers.SubscriberAdapter<>(subscriber, s -> null); 947 } 948 949 /** 950 * Returns a body subscriber that forwards all response body to the 951 * given {@code Flow.Subscriber}. The {@linkplain BodySubscriber#getBody() 952 * completion stage} of the returned body subscriber completes after one 953 * of the given subscribers {@code onComplete} or {@code onError} has 954 * been invoked. 955 * 956 * <p> The given {@code finisher} function is applied after the given 957 * subscriber's {@code onComplete} has been invoked. The {@code finisher} 958 * function is invoked with the given subscriber, and returns a value 959 * that is set as the response's body. 960 * 961 * @apiNote This method can be used as an adapter between {@code 962 * BodySubscriber} and {@code Flow.Subscriber}. 963 * 964 * @param <S> the type of the Subscriber 965 * @param <T> the type of the response body 966 * @param subscriber the subscriber 967 * @param finisher a function to be applied after the subscriber has 968 * completed 969 * @return a body subscriber 970 */ 971 public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodySubscriber<T> fromSubscriber(S subscriber, Function<? super S,? extends T> finisher)972 fromSubscriber(S subscriber, 973 Function<? super S,? 
extends T> finisher) { 974 return new ResponseSubscribers.SubscriberAdapter<>(subscriber, finisher); 975 } 976 977 /** 978 * Returns a body subscriber that forwards all response body to the 979 * given {@code Flow.Subscriber}, line by line. 980 * The {@linkplain BodySubscriber#getBody() completion 981 * stage} of the returned body subscriber completes after one of the 982 * given subscribers {@code onComplete} or {@code onError} has been 983 * invoked. 984 * Bytes are decoded using the {@link StandardCharsets#UTF_8 985 * UTF-8} charset, and lines are delimited in the manner of 986 * {@link BufferedReader#readLine()}. 987 * 988 * @apiNote This method can be used as an adapter between {@code 989 * BodySubscriber} and {@code Flow.Subscriber}. 990 * 991 * @implNote This is equivalent to calling <pre>{@code 992 * fromLineSubscriber(subscriber, s -> null, StandardCharsets.UTF_8, null) 993 * }</pre> 994 * 995 * @param subscriber the subscriber 996 * @return a body subscriber 997 */ 998 public static BodySubscriber<Void> fromLineSubscriber(Subscriber<? super String> subscriber)999 fromLineSubscriber(Subscriber<? super String> subscriber) { 1000 return fromLineSubscriber(subscriber, s -> null, 1001 StandardCharsets.UTF_8, null); 1002 } 1003 1004 /** 1005 * Returns a body subscriber that forwards all response body to the 1006 * given {@code Flow.Subscriber}, line by line. The {@linkplain 1007 * BodySubscriber#getBody() completion stage} of the returned body 1008 * subscriber completes after one of the given subscribers 1009 * {@code onComplete} or {@code onError} has been invoked. 1010 * 1011 * <p> The given {@code finisher} function is applied after the given 1012 * subscriber's {@code onComplete} has been invoked. The {@code finisher} 1013 * function is invoked with the given subscriber, and returns a value 1014 * that is set as the response's body. 
1015 * 1016 * @apiNote This method can be used as an adapter between {@code 1017 * BodySubscriber} and {@code Flow.Subscriber}. 1018 * 1019 * @param <S> the type of the Subscriber 1020 * @param <T> the type of the response body 1021 * @param subscriber the subscriber 1022 * @param finisher a function to be applied after the subscriber has 1023 * completed 1024 * @param charset a {@link Charset} to decode the bytes 1025 * @param lineSeparator an optional line separator: can be {@code null}, 1026 * in which case lines will be delimited in the manner of 1027 * {@link BufferedReader#readLine()}. 1028 * @return a body subscriber 1029 * @throws IllegalArgumentException if the supplied {@code lineSeparator} 1030 * is the empty string 1031 */ 1032 public static <S extends Subscriber<? super String>,T> BodySubscriber<T> fromLineSubscriber(S subscriber, Function<? super S,? extends T> finisher, Charset charset, String lineSeparator)1033 fromLineSubscriber(S subscriber, 1034 Function<? super S,? extends T> finisher, 1035 Charset charset, 1036 String lineSeparator) { 1037 return LineSubscriberAdapter.create(subscriber, 1038 finisher, charset, lineSeparator); 1039 } 1040 1041 /** 1042 * Returns a body subscriber which stores the response body as a {@code 1043 * String} converted using the given {@code Charset}. 1044 * 1045 * <p> The {@link HttpResponse} using this subscriber is available after 1046 * the entire response has been read. 1047 * 1048 * @param charset the character set to convert the String with 1049 * @return a body subscriber 1050 */ ofString(Charset charset)1051 public static BodySubscriber<String> ofString(Charset charset) { 1052 Objects.requireNonNull(charset); 1053 return new ResponseSubscribers.ByteArraySubscriber<>( 1054 bytes -> new String(bytes, charset) 1055 ); 1056 } 1057 1058 /** 1059 * Returns a {@code BodySubscriber} which stores the response body as a 1060 * byte array. 
1061 * 1062 * <p> The {@link HttpResponse} using this subscriber is available after 1063 * the entire response has been read. 1064 * 1065 * @return a body subscriber 1066 */ ofByteArray()1067 public static BodySubscriber<byte[]> ofByteArray() { 1068 return new ResponseSubscribers.ByteArraySubscriber<>( 1069 Function.identity() // no conversion 1070 ); 1071 } 1072 1073 /** 1074 * Returns a {@code BodySubscriber} which stores the response body in a 1075 * file opened with the given options and name. The file will be opened 1076 * with the given options using {@link FileChannel#open(Path,OpenOption...) 1077 * FileChannel.open} just before the body is read. Any exception thrown 1078 * will be returned or thrown from {@link HttpClient#send(HttpRequest, 1079 * BodyHandler) HttpClient::send} or {@link HttpClient#sendAsync(HttpRequest, 1080 * BodyHandler) HttpClient::sendAsync} as appropriate. 1081 * 1082 * <p> The {@link HttpResponse} using this subscriber is available after 1083 * the entire response has been read. 1084 * 1085 * <p> In the case of the default file system provider, security manager 1086 * permission checks are performed in this factory method, when the 1087 * {@code BodySubscriber} is created. Otherwise, 1088 * {@linkplain FileChannel#open(Path, OpenOption...) permission checks} 1089 * may be performed asynchronously against the caller's context 1090 * at file access time. 1091 * Care must be taken that the {@code BodySubscriber} is not shared with 1092 * untrusted code. 
1093 * 1094 * @param file the file to store the body in 1095 * @param openOptions the list of options to open the file with 1096 * @return a body subscriber 1097 * @throws IllegalArgumentException if an invalid set of open options 1098 * are specified 1099 * @throws SecurityException in the case of the default file system 1100 * provider, and a security manager is installed, 1101 * {@link SecurityManager#checkWrite(String) checkWrite} 1102 * is invoked to check write access to the given file 1103 */ ofFile(Path file, OpenOption... openOptions)1104 public static BodySubscriber<Path> ofFile(Path file, OpenOption... openOptions) { 1105 Objects.requireNonNull(file); 1106 List<OpenOption> opts = List.of(openOptions); 1107 if (opts.contains(DELETE_ON_CLOSE) || opts.contains(READ)) { 1108 // these options make no sense, since the FileChannel is not exposed 1109 throw new IllegalArgumentException("invalid openOptions: " + opts); 1110 } 1111 return PathSubscriber.create(file, opts); 1112 } 1113 1114 /** 1115 * Returns a {@code BodySubscriber} which stores the response body in a 1116 * file opened with the given name. 1117 * 1118 * <p> Equivalent to: {@code ofFile(file, CREATE, WRITE)} 1119 * 1120 * <p> In the case of the default file system provider, security manager 1121 * permission checks are performed in this factory method, when the 1122 * {@code BodySubscriber} is created. Otherwise, 1123 * {@linkplain FileChannel#open(Path, OpenOption...) permission checks} 1124 * may be performed asynchronously against the caller's context 1125 * at file access time. 1126 * Care must be taken that the {@code BodySubscriber} is not shared with 1127 * untrusted code. 
1128 * 1129 * @param file the file to store the body in 1130 * @return a body subscriber 1131 * @throws SecurityException in the case of the default file system 1132 * provider, and a security manager is installed, 1133 * {@link SecurityManager#checkWrite(String) checkWrite} 1134 * is invoked to check write access to the given file 1135 */ ofFile(Path file)1136 public static BodySubscriber<Path> ofFile(Path file) { 1137 return ofFile(file, CREATE, WRITE); 1138 } 1139 1140 /** 1141 * Returns a {@code BodySubscriber} which provides the incoming body 1142 * data to the provided Consumer of {@code Optional<byte[]>}. Each 1143 * call to {@link Consumer#accept(java.lang.Object) Consumer.accept()} 1144 * will contain a non empty {@code Optional}, except for the final 1145 * invocation after all body data has been read, when the {@code 1146 * Optional} will be empty. 1147 * 1148 * <p> The {@link HttpResponse} using this subscriber is available after 1149 * the entire response has been read. 1150 * 1151 * @apiNote 1152 * This subscriber is not flow controlled. 1153 * Therefore, the supplied consumer must be able to process whatever 1154 * amount of data is delivered in a timely fashion. 1155 * 1156 * @param consumer a Consumer of byte arrays 1157 * @return a BodySubscriber 1158 */ 1159 public static BodySubscriber<Void> ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer)1160 ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) { 1161 return new ResponseSubscribers.ConsumerSubscriber(consumer); 1162 } 1163 1164 /** 1165 * Returns a {@code BodySubscriber} which streams the response body as 1166 * an {@link InputStream}. 1167 * 1168 * <p> The {@link HttpResponse} using this subscriber is available 1169 * immediately after the response headers have been read, without 1170 * requiring to wait for the entire body to be processed. The response 1171 * body can then be read directly from the {@link InputStream}. 
1172 * 1173 * @apiNote To ensure that all resources associated with the 1174 * corresponding exchange are properly released the caller must 1175 * ensure to either read all bytes until EOF is reached, or call 1176 * {@link InputStream#close} if it is unable or unwilling to do so. 1177 * Calling {@code close} before exhausting the stream may cause 1178 * the underlying HTTP connection to be closed and prevent it 1179 * from being reused for subsequent operations. 1180 * 1181 * @return a body subscriber that streams the response body as an 1182 * {@link InputStream}. 1183 */ ofInputStream()1184 public static BodySubscriber<InputStream> ofInputStream() { 1185 return new ResponseSubscribers.HttpResponseInputStream(); 1186 } 1187 1188 /** 1189 * Returns a {@code BodySubscriber} which streams the response body as 1190 * a {@link Stream Stream}{@code <String>}, where each string in the stream 1191 * corresponds to a line as defined by {@link BufferedReader#lines()}. 1192 * 1193 * <p> The {@link HttpResponse} using this subscriber is available 1194 * immediately after the response headers have been read, without 1195 * requiring to wait for the entire body to be processed. The response 1196 * body can then be read directly from the {@link Stream}. 1197 * 1198 * @apiNote To ensure that all resources associated with the 1199 * corresponding exchange are properly released the caller must 1200 * ensure to either read all lines until the stream is exhausted, 1201 * or call {@link Stream#close} if it is unable or unwilling to do so. 1202 * Calling {@code close} before exhausting the stream may cause 1203 * the underlying HTTP connection to be closed and prevent it 1204 * from being reused for subsequent operations. 1205 * 1206 * @param charset the character set to use when converting bytes to characters 1207 * @return a body subscriber that streams the response body as a 1208 * {@link Stream Stream}{@code <String>}. 
1209 * 1210 * @see BufferedReader#lines() 1211 */ ofLines(Charset charset)1212 public static BodySubscriber<Stream<String>> ofLines(Charset charset) { 1213 return ResponseSubscribers.createLineStream(charset); 1214 } 1215 1216 /** 1217 * Returns a response subscriber which publishes the response body 1218 * through a {@code Publisher<List<ByteBuffer>>}. 1219 * 1220 * <p> The {@link HttpResponse} using this subscriber is available 1221 * immediately after the response headers have been read, without 1222 * requiring to wait for the entire body to be processed. The response 1223 * body bytes can then be obtained by subscribing to the publisher 1224 * returned by the {@code HttpResponse} {@link HttpResponse#body() body} 1225 * method. 1226 * 1227 * <p>The publisher returned by the {@link HttpResponse#body() body} 1228 * method can be subscribed to only once. The first subscriber will 1229 * receive the body response bytes if successfully subscribed, or will 1230 * cause the subscription to be cancelled otherwise. 1231 * If more subscriptions are attempted, the subsequent subscribers will 1232 * be immediately subscribed with an empty subscription and their 1233 * {@link Subscriber#onError(Throwable) onError} method 1234 * will be invoked with an {@code IllegalStateException}. 1235 * 1236 * @apiNote To ensure that all resources associated with the 1237 * corresponding exchange are properly released the caller must 1238 * ensure that the provided publisher is subscribed once, and either 1239 * {@linkplain Subscription#request(long) requests} all bytes 1240 * until {@link Subscriber#onComplete() onComplete} or 1241 * {@link Subscriber#onError(Throwable) onError} are invoked, or 1242 * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription) 1243 * subscription} if it is unable or unwilling to do so. 
1244 * Note that depending on the actual HTTP protocol {@linkplain 1245 * HttpClient.Version version} used for the exchange, cancelling the 1246 * subscription instead of exhausting the flow may cause the underlying 1247 * HTTP connection to be closed and prevent it from being reused for 1248 * subsequent operations. 1249 * 1250 * @return A {@code BodySubscriber} which publishes the response body 1251 * through a {@code Publisher<List<ByteBuffer>>}. 1252 */ ofPublisher()1253 public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() { 1254 return ResponseSubscribers.createPublisher(); 1255 } 1256 1257 /** 1258 * Returns a response subscriber which discards the response body. The 1259 * supplied value is the value that will be returned from 1260 * {@link HttpResponse#body()}. 1261 * 1262 * @param <U> the type of the response body 1263 * @param value the value to return from HttpResponse.body(), may be {@code null} 1264 * @return a {@code BodySubscriber} 1265 */ replacing(U value)1266 public static <U> BodySubscriber<U> replacing(U value) { 1267 return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value)); 1268 } 1269 1270 /** 1271 * Returns a response subscriber which discards the response body. 1272 * 1273 * @return a response body subscriber 1274 */ discarding()1275 public static BodySubscriber<Void> discarding() { 1276 return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(null)); 1277 } 1278 1279 /** 1280 * Returns a {@code BodySubscriber} which buffers data before delivering 1281 * it to the given downstream subscriber. The subscriber guarantees to 1282 * deliver {@code bufferSize} bytes of data to each invocation of the 1283 * downstream's {@link BodySubscriber#onNext(Object) onNext} method, 1284 * except for the final invocation, just before 1285 * {@link BodySubscriber#onComplete() onComplete} is invoked. The final 1286 * invocation of {@code onNext} may contain fewer than {@code bufferSize} 1287 * bytes. 
1288 * 1289 * <p> The returned subscriber delegates its {@link BodySubscriber#getBody() 1290 * getBody()} method to the downstream subscriber. 1291 * 1292 * @param <T> the type of the response body 1293 * @param downstream the downstream subscriber 1294 * @param bufferSize the buffer size 1295 * @return a buffering body subscriber 1296 * @throws IllegalArgumentException if {@code bufferSize <= 0} 1297 */ buffering(BodySubscriber<T> downstream, int bufferSize)1298 public static <T> BodySubscriber<T> buffering(BodySubscriber<T> downstream, 1299 int bufferSize) { 1300 if (bufferSize <= 0) 1301 throw new IllegalArgumentException("must be greater than 0"); 1302 return new BufferingSubscriber<>(downstream, bufferSize); 1303 } 1304 1305 /** 1306 * Returns a {@code BodySubscriber} whose response body value is that of 1307 * the result of applying the given function to the body object of the 1308 * given {@code upstream} {@code BodySubscriber}. 1309 * 1310 * <p> The mapping function is executed using the client's {@linkplain 1311 * HttpClient#executor() executor}, and can therefore be used to map any 1312 * response body type, including blocking {@link InputStream}. 1313 * However, performing any blocking operation in the mapper function 1314 * runs the risk of blocking the executor's thread for an unknown 1315 * amount of time (at least until the blocking operation finishes), 1316 * which may end up starving the executor of available threads. 1317 * Therefore, in the case where mapping to the desired type might 1318 * block (e.g. by reading on the {@code InputStream}), then mapping 1319 * to a {@link java.util.function.Supplier Supplier} of the desired 1320 * type and deferring the blocking operation until {@link Supplier#get() 1321 * Supplier::get} is invoked by the caller's thread should be preferred, 1322 * as shown in the following example which uses a well-known JSON parser to 1323 * convert an {@code InputStream} into any annotated Java type. 
1324 * 1325 * <p>For example: 1326 * <pre> {@code public static <W> BodySubscriber<Supplier<W>> asJSON(Class<W> targetType) { 1327 * BodySubscriber<InputStream> upstream = BodySubscribers.ofInputStream(); 1328 * 1329 * BodySubscriber<Supplier<W>> downstream = BodySubscribers.mapping( 1330 * upstream, 1331 * (InputStream is) -> () -> { 1332 * try (InputStream stream = is) { 1333 * ObjectMapper objectMapper = new ObjectMapper(); 1334 * return objectMapper.readValue(stream, targetType); 1335 * } catch (IOException e) { 1336 * throw new UncheckedIOException(e); 1337 * } 1338 * }); 1339 * return downstream; 1340 * } }</pre> 1341 * 1342 * @param <T> the upstream body type 1343 * @param <U> the type of the body subscriber returned 1344 * @param upstream the body subscriber to be mapped 1345 * @param mapper the mapping function 1346 * @return a mapping body subscriber 1347 */ mapping(BodySubscriber<T> upstream, Function<? super T, ? extends U> mapper)1348 public static <T,U> BodySubscriber<U> mapping(BodySubscriber<T> upstream, 1349 Function<? super T, ? extends U> mapper) 1350 { 1351 return new ResponseSubscribers.MappingSubscriber<>(upstream, mapper); 1352 } 1353 } 1354 } 1355