1 /*
2  * Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package java.net.http;
27 
28 import java.io.BufferedReader;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.net.URI;
32 import java.nio.ByteBuffer;
33 import java.nio.charset.Charset;
34 import java.nio.channels.FileChannel;
35 import java.nio.charset.StandardCharsets;
36 import java.nio.file.OpenOption;
37 import java.nio.file.Path;
38 import java.util.List;
39 import java.util.Objects;
40 import java.util.Optional;
41 import java.util.concurrent.CompletableFuture;
42 import java.util.concurrent.CompletionStage;
43 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.Flow;
45 import java.util.concurrent.Flow.Subscriber;
46 import java.util.concurrent.Flow.Publisher;
47 import java.util.concurrent.Flow.Subscription;
48 import java.util.function.Consumer;
49 import java.util.function.Function;
50 import java.util.function.Supplier;
51 import java.util.stream.Stream;
52 import javax.net.ssl.SSLSession;
53 import jdk.internal.net.http.BufferingSubscriber;
54 import jdk.internal.net.http.LineSubscriberAdapter;
55 import jdk.internal.net.http.ResponseBodyHandlers.FileDownloadBodyHandler;
56 import jdk.internal.net.http.ResponseBodyHandlers.PathBodyHandler;
57 import jdk.internal.net.http.ResponseBodyHandlers.PushPromisesHandlerWithMap;
58 import jdk.internal.net.http.ResponseSubscribers;
59 import jdk.internal.net.http.ResponseSubscribers.PathSubscriber;
60 import static java.nio.file.StandardOpenOption.*;
61 import static jdk.internal.net.http.common.Utils.charsetFrom;
62 
63 /**
64  * An HTTP response.
65  *
66  * <p> An {@code HttpResponse} is not created directly, but rather returned as
67  * a result of sending an {@link HttpRequest}. An {@code HttpResponse} is
68  * made available when the response status code and headers have been received,
69  * and typically after the response body has also been completely received.
70  * Whether or not the {@code HttpResponse} is made available before the response
71  * body has been completely received depends on the {@link BodyHandler
72  * BodyHandler} provided when sending the {@code HttpRequest}.
73  *
74  * <p> This class provides methods for accessing the response status code,
75  * headers, the response body, and the {@code HttpRequest} corresponding
76  * to this response.
77  *
78  * <p> The following is an example of retrieving a response as a String:
79  *
80  * <pre>{@code    HttpResponse<String> response = client
81  *     .send(request, BodyHandlers.ofString()); }</pre>
82  *
83  * <p> The class {@link BodyHandlers BodyHandlers} provides implementations
84  * of many common response handlers. Alternatively, a custom {@code BodyHandler}
85  * implementation can be used.
86  *
87  * @param <T> the response body type
88  * @since 11
89  */
90 public interface HttpResponse<T> {
91 
92 
93     /**
94      * Returns the status code for this response.
95      *
96      * @return the response code
97      */
statusCode()98     public int statusCode();
99 
100     /**
101      * Returns the {@link HttpRequest} corresponding to this response.
102      *
103      * <p> The returned {@code HttpRequest} may not be the initiating request
104      * provided when {@linkplain HttpClient#send(HttpRequest, BodyHandler)
105      * sending}. For example, if the initiating request was redirected, then the
106      * request returned by this method will have the redirected URI, which will
107      * be different from the initiating request URI.
108      *
109      * @see #previousResponse()
110      *
111      * @return the request
112      */
request()113     public HttpRequest request();
114 
115     /**
116      * Returns an {@code Optional} containing the previous intermediate response
117      * if one was received. An intermediate response is one that is received
118      * as a result of redirection or authentication. If no previous response
119      * was received then an empty {@code Optional} is returned.
120      *
121      * @return an Optional containing the HttpResponse, if any.
122      */
previousResponse()123     public Optional<HttpResponse<T>> previousResponse();
124 
125     /**
126      * Returns the received response headers.
127      *
128      * @return the response headers
129      */
headers()130     public HttpHeaders headers();
131 
132     /**
133      * Returns the body. Depending on the type of {@code T}, the returned body
134      * may represent the body after it was read (such as {@code byte[]}, or
135      * {@code String}, or {@code Path}) or it may represent an object with
136      * which the body is read, such as an {@link java.io.InputStream}.
137      *
138      * <p> If this {@code HttpResponse} was returned from an invocation of
139      * {@link #previousResponse()} then this method returns {@code null}
140      *
141      * @return the body
142      */
body()143     public T body();
144 
145     /**
146      * Returns an {@link Optional} containing the {@link SSLSession} in effect
147      * for this response. Returns an empty {@code Optional} if this is not a
148      * <i>HTTPS</i> response.
149      *
150      * @return an {@code Optional} containing the {@code SSLSession} associated
151      *         with the response
152      */
sslSession()153     public Optional<SSLSession> sslSession();
154 
155     /**
156      * Returns the {@code URI} that the response was received from. This may be
157      * different from the request {@code URI} if redirection occurred.
158      *
159      * @return the URI of the response
160      */
uri()161      public URI uri();
162 
163     /**
164      * Returns the HTTP protocol version that was used for this response.
165      *
166      * @return HTTP protocol version
167      */
version()168     public HttpClient.Version version();
169 
170 
171     /**
172      * Initial response information supplied to a {@link BodyHandler BodyHandler}
173      * when a response is initially received and before the body is processed.
174      */
175     public interface ResponseInfo {
176         /**
177          * Provides the response status code.
178          * @return the response status code
179          */
statusCode()180         public int statusCode();
181 
182         /**
183          * Provides the response headers.
184          * @return the response headers
185          */
headers()186         public HttpHeaders headers();
187 
188         /**
189          * Provides the response protocol version.
190          * @return the response protocol version
191          */
version()192         public HttpClient.Version version();
193     }
194 
195     /**
196      * A handler for response bodies.  The class {@link BodyHandlers BodyHandlers}
197      * provides implementations of many common body handlers.
198      *
199      * <p> The {@code BodyHandler} interface allows inspection of the response
200      * code and headers, before the actual response body is received, and is
201      * responsible for creating the response {@link BodySubscriber
202      * BodySubscriber}. The {@code BodySubscriber} consumes the actual response
203      * body bytes and, typically, converts them into a higher-level Java type.
204      *
205      * <p> A {@code BodyHandler} is a function that takes a {@link ResponseInfo
206      * ResponseInfo} object; and which returns a {@code BodySubscriber}. The
207      * {@code BodyHandler} is invoked when the response status code and headers
208      * are available, but before the response  body bytes are received.
209      *
210      * <p> The following example uses one of the {@linkplain BodyHandlers
211      * predefined body handlers} that always process the response body in the
212      * same way ( streams the response body to a file ).
213      *
214      * <pre>{@code   HttpRequest request = HttpRequest.newBuilder()
215      *        .uri(URI.create("http://www.foo.com/"))
216      *        .build();
217      *  client.sendAsync(request, BodyHandlers.ofFile(Paths.get("/tmp/f")))
218      *        .thenApply(HttpResponse::body)
219      *        .thenAccept(System.out::println); }</pre>
220      *
221      * Note, that even though the pre-defined handlers do not examine the
222      * response code, the response code and headers are always retrievable from
223      * the {@link HttpResponse}, when it is returned.
224      *
225      * <p> In the second example, the function returns a different subscriber
226      * depending on the status code.
227      * <pre>{@code   HttpRequest request = HttpRequest.newBuilder()
228      *        .uri(URI.create("http://www.foo.com/"))
229      *        .build();
230      *  BodyHandler<Path> bodyHandler = (rspInfo) -> rspInfo.statusCode() == 200
231      *                      ? BodySubscribers.ofFile(Paths.get("/tmp/f"))
232      *                      : BodySubscribers.replacing(Paths.get("/NULL"));
233      *  client.sendAsync(request, bodyHandler)
234      *        .thenApply(HttpResponse::body)
235      *        .thenAccept(System.out::println); }</pre>
236      *
237      * @param <T> the response body type
238      * @see BodyHandlers
239      * @since 11
240      */
241     @FunctionalInterface
242     public interface BodyHandler<T> {
243 
244         /**
245          * Returns a {@link BodySubscriber BodySubscriber} considering the
246          * given response status code and headers. This method is invoked before
247          * the actual response body bytes are read and its implementation must
248          * return a {@link BodySubscriber BodySubscriber} to consume the response
249          * body bytes.
250          *
251          * <p> The response body can be discarded using one of {@link
252          * BodyHandlers#discarding() discarding} or {@link
253          * BodyHandlers#replacing(Object) replacing}.
254          *
255          * @param responseInfo the response info
256          * @return a body subscriber
257          */
apply(ResponseInfo responseInfo)258         public BodySubscriber<T> apply(ResponseInfo responseInfo);
259     }
260 
261     /**
262      * Implementations of {@link BodyHandler BodyHandler} that implement various
263      * useful handlers, such as handling the response body as a String, or
264      * streaming the response body to a file.
265      *
266      * <p> These implementations do not examine the status code, meaning the
267      * body is always accepted. They typically return an equivalently named
268      * {@code BodySubscriber}. Alternatively, a custom handler can be used to
269      * examine the status code and headers, and return a different body
270      * subscriber, of the same type, as appropriate.
271      *
272      * <p>The following are examples of using the predefined body handlers to
273      * convert a flow of response body data into common high-level Java objects:
274      *
275      * <pre>{@code    // Receives the response body as a String
276      *   HttpResponse<String> response = client
277      *     .send(request, BodyHandlers.ofString());
278      *
279      *   // Receives the response body as a file
280      *   HttpResponse<Path> response = client
281      *     .send(request, BodyHandlers.ofFile(Paths.get("example.html")));
282      *
283      *   // Receives the response body as an InputStream
284      *   HttpResponse<InputStream> response = client
285      *     .send(request, BodyHandlers.ofInputStream());
286      *
287      *   // Discards the response body
288      *   HttpResponse<Void> response = client
289      *     .send(request, BodyHandlers.discarding());  }</pre>
290      *
291      * @since 11
292      */
293     public static class BodyHandlers {
294 
BodyHandlers()295         private BodyHandlers() { }
296 
297         /**
298          * Returns a response body handler that returns a {@link BodySubscriber
299          * BodySubscriber}{@code <Void>} obtained from {@link
300          * BodySubscribers#fromSubscriber(Subscriber)}, with the given
301          * {@code subscriber}.
302          *
303          * <p> The response body is not available through this, or the {@code
304          * HttpResponse} API, but instead all response body is forwarded to the
305          * given {@code subscriber}, which should make it available, if
306          * appropriate, through some other mechanism, e.g. an entry in a
307          * database, etc.
308          *
309          * @apiNote This method can be used as an adapter between {@code
310          * BodySubscriber} and {@code Flow.Subscriber}.
311          *
312          * <p> For example:
313          * <pre> {@code  TextSubscriber subscriber = new TextSubscriber();
314          *  HttpResponse<Void> response = client.sendAsync(request,
315          *      BodyHandlers.fromSubscriber(subscriber)).join();
316          *  System.out.println(response.statusCode()); }</pre>
317          *
318          * @param subscriber the subscriber
319          * @return a response body handler
320          */
321         public static BodyHandler<Void>
fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber)322         fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
323             Objects.requireNonNull(subscriber);
324             return (responseInfo) -> BodySubscribers.fromSubscriber(subscriber,
325                                                                        s -> null);
326         }
327 
328         /**
329          * Returns a response body handler that returns a {@link BodySubscriber
330          * BodySubscriber}{@code <T>} obtained from {@link
331          * BodySubscribers#fromSubscriber(Subscriber, Function)}, with the
332          * given {@code subscriber} and {@code finisher} function.
333          *
334          * <p> The given {@code finisher} function is applied after the given
335          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
336          * function is invoked with the given subscriber, and returns a value
337          * that is set as the response's body.
338          *
339          * @apiNote This method can be used as an adapter between {@code
340          * BodySubscriber} and {@code Flow.Subscriber}.
341          *
342          * <p> For example:
343          * <pre> {@code  TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
344          *  HttpResponse<String> response = client.sendAsync(request,
345          *      BodyHandlers.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
346          *  String text = response.body(); }</pre>
347          *
348          * @param <S> the type of the Subscriber
349          * @param <T> the type of the response body
350          * @param subscriber the subscriber
351          * @param finisher a function to be applied after the subscriber has completed
352          * @return a response body handler
353          */
354         public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
fromSubscriber(S subscriber, Function<? super S,? extends T> finisher)355         fromSubscriber(S subscriber, Function<? super S,? extends T> finisher) {
356             Objects.requireNonNull(subscriber);
357             Objects.requireNonNull(finisher);
358             return (responseInfo) -> BodySubscribers.fromSubscriber(subscriber,
359                                                                       finisher);
360         }
361 
362         /**
363          * Returns a response body handler that returns a {@link BodySubscriber
364          * BodySubscriber}{@code <Void>} obtained from {@link
365          * BodySubscribers#fromLineSubscriber(Subscriber, Function, Charset, String)
366          * BodySubscribers.fromLineSubscriber(subscriber, s -> null, charset, null)},
367          * with the given {@code subscriber}.
368          * The {@link Charset charset} used to decode the response body bytes is
369          * obtained from the HTTP response headers as specified by {@link #ofString()},
370          * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
371          *
372          * <p> The response body is not available through this, or the {@code
373          * HttpResponse} API, but instead all response body is forwarded to the
374          * given {@code subscriber}, which should make it available, if
375          * appropriate, through some other mechanism, e.g. an entry in a
376          * database, etc.
377          *
378          * @apiNote This method can be used as an adapter between a {@code
379          * BodySubscriber} and a text based {@code Flow.Subscriber} that parses
380          * text line by line.
381          *
382          * <p> For example:
383          * <pre> {@code  // A PrintSubscriber that implements Flow.Subscriber<String>
384          *  // and print lines received by onNext() on System.out
385          *  PrintSubscriber subscriber = new PrintSubscriber(System.out);
386          *  client.sendAsync(request, BodyHandlers.fromLineSubscriber(subscriber))
387          *      .thenApply(HttpResponse::statusCode)
388          *      .thenAccept((status) -> {
389          *          if (status != 200) {
390          *              System.err.printf("ERROR: %d status received%n", status);
391          *          }
392          *      }); }</pre>
393          *
394          * @param subscriber the subscriber
395          * @return a response body handler
396          */
397         public static BodyHandler<Void>
fromLineSubscriber(Subscriber<? super String> subscriber)398         fromLineSubscriber(Subscriber<? super String> subscriber) {
399             Objects.requireNonNull(subscriber);
400             return (responseInfo) ->
401                         BodySubscribers.fromLineSubscriber(subscriber,
402                                                            s -> null,
403                                                            charsetFrom(responseInfo.headers()),
404                                                            null);
405         }
406 
407         /**
408          * Returns a response body handler that returns a {@link BodySubscriber
409          * BodySubscriber}{@code <T>} obtained from {@link
410          * BodySubscribers#fromLineSubscriber(Subscriber, Function, Charset, String)
411          * BodySubscribers.fromLineSubscriber(subscriber, finisher, charset, lineSeparator)},
412          * with the given {@code subscriber}, {@code finisher} function, and line separator.
413          * The {@link Charset charset} used to decode the response body bytes is
414          * obtained from the HTTP response headers as specified by {@link #ofString()}.
415          *
416          * <p> The given {@code finisher} function is applied after the given
417          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
418          * function is invoked with the given subscriber, and returns a value
419          * that is set as the response's body.
420          *
421          * @apiNote This method can be used as an adapter between a {@code
422          * BodySubscriber} and a text based {@code Flow.Subscriber} that parses
423          * text line by line.
424          *
425          * <p> For example:
426          * <pre> {@code  // A LineParserSubscriber that implements Flow.Subscriber<String>
427          *  // and accumulates lines that match a particular pattern
428          *  Pattern pattern = ...;
429          *  LineParserSubscriber subscriber = new LineParserSubscriber(pattern);
430          *  HttpResponse<List<String>> response = client.send(request,
431          *      BodyHandlers.fromLineSubscriber(subscriber, s -> s.getMatchingLines(), "\n"));
432          *  if (response.statusCode() != 200) {
433          *      System.err.printf("ERROR: %d status received%n", response.statusCode());
434          *  } }</pre>
435          *
436          *
437          * @param <S> the type of the Subscriber
438          * @param <T> the type of the response body
439          * @param subscriber the subscriber
440          * @param finisher a function to be applied after the subscriber has completed
441          * @param lineSeparator an optional line separator: can be {@code null},
442          *                      in which case lines will be delimited in the manner of
443          *                      {@link BufferedReader#readLine()}.
444          * @return a response body handler
445          * @throws IllegalArgumentException if the supplied {@code lineSeparator}
446          *         is the empty string
447          */
448         public static <S extends Subscriber<? super String>,T> BodyHandler<T>
fromLineSubscriber(S subscriber, Function<? super S,? extends T> finisher, String lineSeparator)449         fromLineSubscriber(S subscriber,
450                            Function<? super S,? extends T> finisher,
451                            String lineSeparator) {
452             Objects.requireNonNull(subscriber);
453             Objects.requireNonNull(finisher);
454             // implicit null check
455             if (lineSeparator != null && lineSeparator.isEmpty())
456                 throw new IllegalArgumentException("empty line separator");
457             return (responseInfo) ->
458                         BodySubscribers.fromLineSubscriber(subscriber,
459                                                            finisher,
460                                                            charsetFrom(responseInfo.headers()),
461                                                            lineSeparator);
462         }
463 
464         /**
465          * Returns a response body handler that discards the response body.
466          *
467          * @return a response body handler
468          */
discarding()469         public static BodyHandler<Void> discarding() {
470             return (responseInfo) -> BodySubscribers.discarding();
471         }
472 
473         /**
474          * Returns a response body handler that returns the given replacement
475          * value, after discarding the response body.
476          *
477          * @param <U> the response body type
478          * @param value the value of U to return as the body, may be {@code null}
479          * @return a response body handler
480          */
replacing(U value)481         public static <U> BodyHandler<U> replacing(U value) {
482             return (responseInfo) -> BodySubscribers.replacing(value);
483         }
484 
485         /**
486          * Returns a {@code BodyHandler<String>} that returns a
487          * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from
488          * {@link BodySubscribers#ofString(Charset) BodySubscribers.ofString(Charset)}.
489          * The body is decoded using the given character set.
490          *
491          * @param charset the character set to convert the body with
492          * @return a response body handler
493          */
ofString(Charset charset)494         public static BodyHandler<String> ofString(Charset charset) {
495             Objects.requireNonNull(charset);
496             return (responseInfo) -> BodySubscribers.ofString(charset);
497         }
498 
499         /**
500          * Returns a {@code BodyHandler<Path>} that returns a
501          * {@link BodySubscriber BodySubscriber}{@code <Path>} obtained from
502          * {@link BodySubscribers#ofFile(Path, OpenOption...)
503          * BodySubscribers.ofFile(Path,OpenOption...)}.
504          *
505          * <p> When the {@code HttpResponse} object is returned, the body has
506          * been completely written to the file, and {@link #body()} returns a
507          * reference to its {@link Path}.
508          *
509          * <p> In the case of the default file system provider, security manager
510          * permission checks are performed in this factory method, when the
511          * {@code BodyHandler} is created. Otherwise,
512          * {@linkplain FileChannel#open(Path, OpenOption...) permission checks}
513          * may be performed asynchronously against the caller's context
514          * at file access time.
515          * Care must be taken that the {@code BodyHandler} is not shared with
516          * untrusted code.
517          *
518          * @param  file the file to store the body in
519          * @param  openOptions any options to use when opening/creating the file
520          * @return a response body handler
521          * @throws IllegalArgumentException if an invalid set of open options
522          *         are specified
523          * @throws SecurityException in the case of the default file system
524          *         provider, and a security manager is installed,
525          *         {@link SecurityManager#checkWrite(String) checkWrite}
526          *         is invoked to check write access to the given file
527          */
ofFile(Path file, OpenOption... openOptions)528         public static BodyHandler<Path> ofFile(Path file, OpenOption... openOptions) {
529             Objects.requireNonNull(file);
530             List<OpenOption> opts = List.of(openOptions);
531             if (opts.contains(DELETE_ON_CLOSE) || opts.contains(READ)) {
532                 // these options make no sense, since the FileChannel is not exposed
533                 throw new IllegalArgumentException("invalid openOptions: " + opts);
534             }
535             return PathBodyHandler.create(file, opts);
536         }
537 
538         /**
539          * Returns a {@code BodyHandler<Path>} that returns a
540          * {@link BodySubscriber BodySubscriber}{@code <Path>}.
541          *
542          * <p> Equivalent to: {@code ofFile(file, CREATE, WRITE)}
543          *
544          * <p> In the case of the default file system provider, security manager
545          * permission checks are performed in this factory method, when the
546          * {@code BodyHandler} is created. Otherwise,
547          * {@linkplain FileChannel#open(Path, OpenOption...) permission checks}
548          * may be performed asynchronously against the caller's context
549          * at file access time.
550          * Care must be taken that the {@code BodyHandler} is not shared with
551          * untrusted code.
552          *
553          * @param  file the file to store the body in
554          * @return a response body handler
555          * @throws SecurityException in the case of the default file system
556          *         provider, and a security manager is installed,
557          *         {@link SecurityManager#checkWrite(String) checkWrite}
558          *         is invoked to check write access to the given file
559          */
ofFile(Path file)560         public static BodyHandler<Path> ofFile(Path file) {
561             return BodyHandlers.ofFile(file, CREATE, WRITE);
562         }
563 
564         /**
565          * Returns a {@code BodyHandler<Path>} that returns a
566          * {@link BodySubscriber BodySubscriber}&lt;{@link Path}&gt;
567          * where the download directory is specified, but the filename is
568          * obtained from the {@code Content-Disposition} response header. The
569          * {@code Content-Disposition} header must specify the <i>attachment</i>
570          * type and must also contain a <i>filename</i> parameter. If the
571          * filename specifies multiple path components only the final component
572          * is used as the filename (with the given directory name).
573          *
574          * <p> When the {@code HttpResponse} object is returned, the body has
575          * been completely written to the file and {@link #body()} returns a
576          * {@code Path} object for the file. The returned {@code Path} is the
577          * combination of the supplied directory name and the file name supplied
578          * by the server. If the destination directory does not exist or cannot
579          * be written to, then the response will fail with an {@link IOException}.
580          *
581          * <p> Security manager permission checks are performed in this factory
582          * method, when the {@code BodyHandler} is created. Care must be taken
583          * that the {@code BodyHandler} is not shared with untrusted code.
584          *
585          * @param  directory the directory to store the file in
586          * @param  openOptions open options used when opening the file
587          * @return a response body handler
588          * @throws IllegalArgumentException if the given path does not exist,
589          *         is not of the default file system, is not a directory,
590          *         is not writable, or if an invalid set of open options
591          *         are specified
592          * @throws SecurityException in the case of the default file system
593          *         provider and a security manager has been installed,
594          *         and it denies
595          *         {@linkplain SecurityManager#checkRead(String) read access}
596          *         to the directory, or it denies
597          *         {@linkplain SecurityManager#checkWrite(String) write access}
598          *         to the directory, or it denies
599          *         {@linkplain SecurityManager#checkWrite(String) write access}
600          *         to the files within the directory.
601          */
ofFileDownload(Path directory, OpenOption... openOptions)602         public static BodyHandler<Path> ofFileDownload(Path directory,
603                                                        OpenOption... openOptions) {
604             Objects.requireNonNull(directory);
605             List<OpenOption> opts = List.of(openOptions);
606             if (opts.contains(DELETE_ON_CLOSE)) {
607                 throw new IllegalArgumentException("invalid option: " + DELETE_ON_CLOSE);
608             }
609             return FileDownloadBodyHandler.create(directory, opts);
610         }
611 
612         /**
613          * Returns a {@code BodyHandler<InputStream>} that returns a
614          * {@link BodySubscriber BodySubscriber}{@code <InputStream>} obtained from
615          * {@link BodySubscribers#ofInputStream() BodySubscribers.ofInputStream}.
616          *
617          * <p> When the {@code HttpResponse} object is returned, the response
618          * headers will have been completely read, but the body may not have
619          * been fully received yet. The {@link #body()} method returns an
620          * {@link InputStream} from which the body can be read as it is received.
621          *
622          * @apiNote See {@link BodySubscribers#ofInputStream()} for more
623          * information.
624          *
625          * @return a response body handler
626          */
ofInputStream()627         public static BodyHandler<InputStream> ofInputStream() {
628             return (responseInfo) -> BodySubscribers.ofInputStream();
629         }
630 
631         /**
632          * Returns a {@code BodyHandler<Stream<String>>} that returns a
633          * {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained
634          * from {@link BodySubscribers#ofLines(Charset) BodySubscribers.ofLines(charset)}.
635          * The {@link Charset charset} used to decode the response body bytes is
636          * obtained from the HTTP response headers as specified by {@link #ofString()},
637          * and lines are delimited in the manner of {@link BufferedReader#readLine()}.
638          *
639          * <p> When the {@code HttpResponse} object is returned, the body may
640          * not have been completely received.
641          *
642          * @return a response body handler
643          */
ofLines()644         public static BodyHandler<Stream<String>> ofLines() {
645             return (responseInfo) ->
646                     BodySubscribers.ofLines(charsetFrom(responseInfo.headers()));
647         }
648 
649         /**
650          * Returns a {@code BodyHandler<Void>} that returns a
651          * {@link BodySubscriber BodySubscriber}{@code <Void>} obtained from
652          * {@link BodySubscribers#ofByteArrayConsumer(Consumer)
653          * BodySubscribers.ofByteArrayConsumer(Consumer)}.
654          *
655          * <p> When the {@code HttpResponse} object is returned, the body has
656          * been completely written to the consumer.
657          *
658          * @apiNote
659          * The subscriber returned by this handler is not flow controlled.
660          * Therefore, the supplied consumer must be able to process whatever
661          * amount of data is delivered in a timely fashion.
662          *
663          * @param consumer a Consumer to accept the response body
664          * @return a response body handler
665          */
666         public static BodyHandler<Void>
ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer)667         ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) {
668             Objects.requireNonNull(consumer);
669             return (responseInfo) -> BodySubscribers.ofByteArrayConsumer(consumer);
670         }
671 
672         /**
673          * Returns a {@code BodyHandler<byte[]>} that returns a
674          * {@link BodySubscriber BodySubscriber}{@code <byte[]>} obtained
675          * from {@link BodySubscribers#ofByteArray() BodySubscribers.ofByteArray()}.
676          *
677          * <p> When the {@code HttpResponse} object is returned, the body has
678          * been completely written to the byte array.
679          *
680          * @return a response body handler
681          */
ofByteArray()682         public static BodyHandler<byte[]> ofByteArray() {
683             return (responseInfo) -> BodySubscribers.ofByteArray();
684         }
685 
686         /**
687          * Returns a {@code BodyHandler<String>} that returns a
688          * {@link BodySubscriber BodySubscriber}{@code <String>} obtained from
689          * {@link BodySubscribers#ofString(Charset) BodySubscribers.ofString(Charset)}.
690          * The body is decoded using the character set specified in
691          * the {@code Content-Type} response header. If there is no such
692          * header, or the character set is not supported, then
693          * {@link StandardCharsets#UTF_8 UTF_8} is used.
694          *
695          * <p> When the {@code HttpResponse} object is returned, the body has
696          * been completely written to the string.
697          *
698          * @return a response body handler
699          */
ofString()700         public static BodyHandler<String> ofString() {
701             return (responseInfo) -> BodySubscribers.ofString(charsetFrom(responseInfo.headers()));
702         }
703 
704         /**
705          * Returns a {@code BodyHandler<Publisher<List<ByteBuffer>>>} that creates a
706          * {@link BodySubscriber BodySubscriber}{@code <Publisher<List<ByteBuffer>>>}
707          * obtained from {@link BodySubscribers#ofPublisher()
708          * BodySubscribers.ofPublisher()}.
709          *
710          * <p> When the {@code HttpResponse} object is returned, the response
711          * headers will have been completely read, but the body may not have
712          * been fully received yet. The {@link #body()} method returns a
713          * {@link Publisher Publisher}{@code <List<ByteBuffer>>} from which the body
714          * response bytes can be obtained as they are received. The publisher
715          * can and must be subscribed to only once.
716          *
717          * @apiNote See {@link BodySubscribers#ofPublisher()} for more
718          * information.
719          *
720          * @return a response body handler
721          */
ofPublisher()722         public static BodyHandler<Publisher<List<ByteBuffer>>> ofPublisher() {
723             return (responseInfo) -> BodySubscribers.ofPublisher();
724         }
725 
726         /**
727          * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain
728          * BodySubscribers#buffering(BodySubscriber,int) buffering BodySubscriber}
729          * that buffers data before delivering it to the downstream subscriber.
730          * These {@code BodySubscriber} instances are created by calling
731          * {@link BodySubscribers#buffering(BodySubscriber,int)
732          * BodySubscribers.buffering} with a subscriber obtained from the given
733          * downstream handler and the {@code bufferSize} parameter.
734          *
735          * @param <T> the response body type
736          * @param downstreamHandler the downstream handler
737          * @param bufferSize the buffer size parameter passed to {@link
738          *        BodySubscribers#buffering(BodySubscriber,int) BodySubscribers.buffering}
739          * @return a body handler
740          * @throws IllegalArgumentException if {@code bufferSize <= 0}
741          */
buffering(BodyHandler<T> downstreamHandler, int bufferSize)742          public static <T> BodyHandler<T> buffering(BodyHandler<T> downstreamHandler,
743                                                     int bufferSize) {
744              Objects.requireNonNull(downstreamHandler);
745              if (bufferSize <= 0)
746                  throw new IllegalArgumentException("must be greater than 0");
747              return (responseInfo) -> BodySubscribers
748                      .buffering(downstreamHandler.apply(responseInfo),
749                                 bufferSize);
750          }
751     }
752 
753     /**
754      * A handler for push promises.
755      *
756      * <p> A <i>push promise</i> is a synthetic request sent by an HTTP/2 server
757      * when retrieving an initiating client-sent request. The server has
758      * determined, possibly through inspection of the initiating request, that
759      * the client will likely need the promised resource, and hence pushes a
760      * synthetic push request, in the form of a push promise, to the client. The
761      * client can choose to accept or reject the push promise request.
762      *
763      * <p> A push promise request may be received up to the point where the
764      * response body of the initiating client-sent request has been fully
765      * received. The delivery of a push promise response, however, is not
766      * coordinated with the delivery of the response to the initiating
767      * client-sent request.
768      *
769      * @param <T> the push promise response body type
770      * @since 11
771      */
772     public interface PushPromiseHandler<T> {
773 
774         /**
775          * Notification of an incoming push promise.
776          *
777          * <p> This method is invoked once for each push promise received, up
778          * to the point where the response body of the initiating client-sent
779          * request has been fully received.
780          *
781          * <p> A push promise is accepted by invoking the given {@code acceptor}
782          * function. The {@code acceptor} function must be passed a non-null
783          * {@code BodyHandler}, that is to be used to handle the promise's
784          * response body. The acceptor function will return a {@code
785          * CompletableFuture} that completes with the promise's response.
786          *
787          * <p> If the {@code acceptor} function is not successfully invoked,
788          * then the push promise is rejected. The {@code acceptor} function will
789          * throw an {@code IllegalStateException} if invoked more than once.
790          *
791          * @param initiatingRequest the initiating client-send request
792          * @param pushPromiseRequest the synthetic push request
793          * @param acceptor the acceptor function that must be successfully
794          *                 invoked to accept the push promise
795          */
applyPushPromise( HttpRequest initiatingRequest, HttpRequest pushPromiseRequest, Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor )796         public void applyPushPromise(
797             HttpRequest initiatingRequest,
798             HttpRequest pushPromiseRequest,
799             Function<HttpResponse.BodyHandler<T>,CompletableFuture<HttpResponse<T>>> acceptor
800         );
801 
802 
803         /**
804          * Returns a push promise handler that accumulates push promises, and
805          * their responses, into the given map.
806          *
807          * <p> Entries are added to the given map for each push promise accepted.
808          * The entry's key is the push request, and the entry's value is a
809          * {@code CompletableFuture} that completes with the response
810          * corresponding to the key's push request. A push request is rejected /
811          * cancelled if there is already an entry in the map whose key is
812          * {@linkplain HttpRequest#equals equal} to it. A push request is
813          * rejected / cancelled if it  does not have the same origin as its
814          * initiating request.
815          *
816          * <p> Entries are added to the given map as soon as practically
817          * possible when a push promise is received and accepted. That way code,
818          * using such a map like a cache, can determine if a push promise has
819          * been issued by the server and avoid making, possibly, unnecessary
820          * requests.
821          *
822          * <p> The delivery of a push promise response is not coordinated with
823          * the delivery of the response to the initiating client-sent request.
824          * However, when the response body for the initiating client-sent
825          * request has been fully received, the map is guaranteed to be fully
826          * populated, that is, no more entries will be added. The individual
827          * {@code CompletableFutures} contained in the map may or may not
828          * already be completed at this point.
829          *
830          * @param <T> the push promise response body type
831          * @param pushPromiseHandler t he body handler to use for push promises
832          * @param pushPromisesMap a map to accumulate push promises into
833          * @return a push promise handler
834          */
835         public static <T> PushPromiseHandler<T>
of(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler, ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap)836         of(Function<HttpRequest,BodyHandler<T>> pushPromiseHandler,
837            ConcurrentMap<HttpRequest,CompletableFuture<HttpResponse<T>>> pushPromisesMap) {
838             return new PushPromisesHandlerWithMap<>(pushPromiseHandler, pushPromisesMap);
839         }
840     }
841 
842     /**
843      * A {@code BodySubscriber} consumes response body bytes and converts them
844      * into a higher-level Java type.  The class {@link BodySubscribers
845      * BodySubscribers} provides implementations of many common body subscribers.
846      *
847      * <p> The object acts as a {@link Flow.Subscriber}&lt;{@link List}&lt;{@link
848      * ByteBuffer}&gt;&gt; to the HTTP Client implementation, which publishes
849      * lists of ByteBuffers containing the response body. The Flow of data, as
850      * well as the order of ByteBuffers in the Flow lists, is a strictly ordered
851      * representation of the response body. Both the Lists and the ByteBuffers,
852      * once passed to the subscriber, are no longer used by the HTTP Client. The
853      * subscriber converts the incoming buffers of data to some higher-level
854      * Java type {@code T}.
855      *
856      * <p> The {@link #getBody()} method returns a
857      * {@link CompletionStage}{@code <T>} that provides the response body
858      * object. The {@code CompletionStage} must be obtainable at any time. When
859      * it completes depends on the nature of type {@code T}. In many cases,
860      * when {@code T} represents the entire body after being consumed then
861      * the {@code CompletionStage} completes after the body has been consumed.
862      * If  {@code T} is a streaming type, such as {@link java.io.InputStream
863      * InputStream}, then it completes before the body has been read, because
864      * the calling code uses the {@code InputStream} to consume the data.
865      *
866      * @apiNote To ensure that all resources associated with the corresponding
867      * HTTP exchange are properly released, an implementation of {@code
868      * BodySubscriber} should ensure to {@linkplain Flow.Subscription#request
869      * request} more data until one of {@link #onComplete() onComplete} or
870      * {@link #onError(Throwable) onError} are signalled, or {@link
871      * Flow.Subscription#request cancel} its {@linkplain
872      * #onSubscribe(Flow.Subscription) subscription} if unable or unwilling to
873      * do so. Calling {@code cancel} before exhausting the response body data
874      * may cause the underlying HTTP connection to be closed and prevent it
875      * from being reused for subsequent operations.
876      *
877      * @implNote The flow of data containing the response body is immutable.
878      * Specifically, it is a flow of unmodifiable lists of read-only ByteBuffers.
879      *
880      * @param <T> the response body type
881      * @see BodySubscribers
882      * @since 11
883      */
884     public interface BodySubscriber<T>
885             extends Flow.Subscriber<List<ByteBuffer>> {
886 
887         /**
888          * Returns a {@code CompletionStage} which when completed will return
889          * the response body object. This method can be called at any time
890          * relative to the other {@link Flow.Subscriber} methods and is invoked
891          * using the client's {@link HttpClient#executor() executor}.
892          *
893          * @return a CompletionStage for the response body
894          */
getBody()895         public CompletionStage<T> getBody();
896     }
897 
898     /**
899      * Implementations of {@link BodySubscriber BodySubscriber} that implement
900      * various useful subscribers, such as converting the response body bytes
901      * into a String, or streaming the bytes to a file.
902      *
903      * <p>The following are examples of using the predefined body subscribers
904      * to convert a flow of response body data into common high-level Java
905      * objects:
906      *
907      * <pre>{@code    // Streams the response body to a File
908      *   HttpResponse<Path> response = client
909      *     .send(request, responseInfo -> BodySubscribers.ofFile(Paths.get("example.html"));
910      *
911      *   // Accumulates the response body and returns it as a byte[]
912      *   HttpResponse<byte[]> response = client
913      *     .send(request, responseInfo -> BodySubscribers.ofByteArray());
914      *
915      *   // Discards the response body
916      *   HttpResponse<Void> response = client
917      *     .send(request, responseInfo -> BodySubscribers.discarding());
918      *
919      *   // Accumulates the response body as a String then maps it to its bytes
920      *   HttpResponse<byte[]> response = client
921      *     .send(request, responseInfo ->
922      *        BodySubscribers.mapping(BodySubscribers.ofString(UTF_8), String::getBytes));
923      * }</pre>
924      *
925      * @since 11
926      */
927     public static class BodySubscribers {
928 
BodySubscribers()929         private BodySubscribers() { }
930 
931         /**
932          * Returns a body subscriber that forwards all response body to the
933          * given {@code Flow.Subscriber}. The {@linkplain BodySubscriber#getBody()
934          * completion stage} of the returned body subscriber completes after one
935          * of the given subscribers {@code onComplete} or {@code onError} has
936          * been invoked.
937          *
938          * @apiNote This method can be used as an adapter between {@code
939          * BodySubscriber} and {@code Flow.Subscriber}.
940          *
941          * @param subscriber the subscriber
942          * @return a body subscriber
943          */
944         public static BodySubscriber<Void>
fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber)945         fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
946             return new ResponseSubscribers.SubscriberAdapter<>(subscriber, s -> null);
947         }
948 
949         /**
950          * Returns a body subscriber that forwards all response body to the
951          * given {@code Flow.Subscriber}. The {@linkplain BodySubscriber#getBody()
952          * completion stage} of the returned body subscriber completes after one
953          * of the given subscribers {@code onComplete} or {@code onError} has
954          * been invoked.
955          *
956          * <p> The given {@code finisher} function is applied after the given
957          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
958          * function is invoked with the given subscriber, and returns a value
959          * that is set as the response's body.
960          *
961          * @apiNote This method can be used as an adapter between {@code
962          * BodySubscriber} and {@code Flow.Subscriber}.
963          *
964          * @param <S> the type of the Subscriber
965          * @param <T> the type of the response body
966          * @param subscriber the subscriber
967          * @param finisher a function to be applied after the subscriber has
968          *                 completed
969          * @return a body subscriber
970          */
971         public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodySubscriber<T>
fromSubscriber(S subscriber, Function<? super S,? extends T> finisher)972         fromSubscriber(S subscriber,
973                        Function<? super S,? extends T> finisher) {
974             return new ResponseSubscribers.SubscriberAdapter<>(subscriber, finisher);
975         }
976 
977         /**
978          * Returns a body subscriber that forwards all response body to the
979          * given {@code Flow.Subscriber}, line by line.
980          * The {@linkplain BodySubscriber#getBody() completion
981          * stage} of the returned body subscriber completes after one of the
982          * given subscribers {@code onComplete} or {@code onError} has been
983          * invoked.
984          * Bytes are decoded using the {@link StandardCharsets#UTF_8
985          * UTF-8} charset, and lines are delimited in the manner of
986          * {@link BufferedReader#readLine()}.
987          *
988          * @apiNote This method can be used as an adapter between {@code
989          * BodySubscriber} and {@code Flow.Subscriber}.
990          *
991          * @implNote This is equivalent to calling <pre>{@code
992          *      fromLineSubscriber(subscriber, s -> null, StandardCharsets.UTF_8, null)
993          * }</pre>
994          *
995          * @param subscriber the subscriber
996          * @return a body subscriber
997          */
998         public static BodySubscriber<Void>
fromLineSubscriber(Subscriber<? super String> subscriber)999         fromLineSubscriber(Subscriber<? super String> subscriber) {
1000             return fromLineSubscriber(subscriber,  s -> null,
1001                     StandardCharsets.UTF_8, null);
1002         }
1003 
1004         /**
1005          * Returns a body subscriber that forwards all response body to the
1006          * given {@code Flow.Subscriber}, line by line. The {@linkplain
1007          * BodySubscriber#getBody() completion stage} of the returned body
1008          * subscriber completes after one of the given subscribers
1009          * {@code onComplete} or {@code onError} has been invoked.
1010          *
1011          * <p> The given {@code finisher} function is applied after the given
1012          * subscriber's {@code onComplete} has been invoked. The {@code finisher}
1013          * function is invoked with the given subscriber, and returns a value
1014          * that is set as the response's body.
1015          *
1016          * @apiNote This method can be used as an adapter between {@code
1017          * BodySubscriber} and {@code Flow.Subscriber}.
1018          *
1019          * @param <S> the type of the Subscriber
1020          * @param <T> the type of the response body
1021          * @param subscriber the subscriber
1022          * @param finisher a function to be applied after the subscriber has
1023          *                 completed
1024          * @param charset a {@link Charset} to decode the bytes
1025          * @param lineSeparator an optional line separator: can be {@code null},
1026          *                      in which case lines will be delimited in the manner of
1027          *                      {@link BufferedReader#readLine()}.
1028          * @return a body subscriber
1029          * @throws IllegalArgumentException if the supplied {@code lineSeparator}
1030          *         is the empty string
1031          */
1032         public static <S extends Subscriber<? super String>,T> BodySubscriber<T>
fromLineSubscriber(S subscriber, Function<? super S,? extends T> finisher, Charset charset, String lineSeparator)1033         fromLineSubscriber(S subscriber,
1034                            Function<? super S,? extends T> finisher,
1035                            Charset charset,
1036                            String lineSeparator) {
1037             return LineSubscriberAdapter.create(subscriber,
1038                     finisher, charset, lineSeparator);
1039         }
1040 
1041         /**
1042          * Returns a body subscriber which stores the response body as a {@code
1043          * String} converted using the given {@code Charset}.
1044          *
1045          * <p> The {@link HttpResponse} using this subscriber is available after
1046          * the entire response has been read.
1047          *
1048          * @param charset the character set to convert the String with
1049          * @return a body subscriber
1050          */
ofString(Charset charset)1051         public static BodySubscriber<String> ofString(Charset charset) {
1052             Objects.requireNonNull(charset);
1053             return new ResponseSubscribers.ByteArraySubscriber<>(
1054                     bytes -> new String(bytes, charset)
1055             );
1056         }
1057 
1058         /**
1059          * Returns a {@code BodySubscriber} which stores the response body as a
1060          * byte array.
1061          *
1062          * <p> The {@link HttpResponse} using this subscriber is available after
1063          * the entire response has been read.
1064          *
1065          * @return a body subscriber
1066          */
ofByteArray()1067         public static BodySubscriber<byte[]> ofByteArray() {
1068             return new ResponseSubscribers.ByteArraySubscriber<>(
1069                     Function.identity() // no conversion
1070             );
1071         }
1072 
1073         /**
1074          * Returns a {@code BodySubscriber} which stores the response body in a
1075          * file opened with the given options and name. The file will be opened
1076          * with the given options using {@link FileChannel#open(Path,OpenOption...)
1077          * FileChannel.open} just before the body is read. Any exception thrown
1078          * will be returned or thrown from {@link HttpClient#send(HttpRequest,
1079          * BodyHandler) HttpClient::send} or {@link HttpClient#sendAsync(HttpRequest,
1080          * BodyHandler) HttpClient::sendAsync} as appropriate.
1081          *
1082          * <p> The {@link HttpResponse} using this subscriber is available after
1083          * the entire response has been read.
1084          *
1085          * <p> In the case of the default file system provider, security manager
1086          * permission checks are performed in this factory method, when the
1087          * {@code BodySubscriber} is created. Otherwise,
1088          * {@linkplain FileChannel#open(Path, OpenOption...) permission checks}
1089          * may be performed asynchronously against the caller's context
1090          * at file access time.
1091          * Care must be taken that the {@code BodySubscriber} is not shared with
1092          * untrusted code.
1093          *
1094          * @param  file the file to store the body in
1095          * @param  openOptions the list of options to open the file with
1096          * @return a body subscriber
1097          * @throws IllegalArgumentException if an invalid set of open options
1098          *         are specified
1099          * @throws SecurityException in the case of the default file system
1100          *         provider, and a security manager is installed,
1101          *         {@link SecurityManager#checkWrite(String) checkWrite}
1102          *         is invoked to check write access to the given file
1103          */
ofFile(Path file, OpenOption... openOptions)1104         public static BodySubscriber<Path> ofFile(Path file, OpenOption... openOptions) {
1105             Objects.requireNonNull(file);
1106             List<OpenOption> opts = List.of(openOptions);
1107             if (opts.contains(DELETE_ON_CLOSE) || opts.contains(READ)) {
1108                 // these options make no sense, since the FileChannel is not exposed
1109                 throw new IllegalArgumentException("invalid openOptions: " + opts);
1110             }
1111             return PathSubscriber.create(file, opts);
1112         }
1113 
1114         /**
1115          * Returns a {@code BodySubscriber} which stores the response body in a
1116          * file opened with the given name.
1117          *
1118          * <p> Equivalent to: {@code ofFile(file, CREATE, WRITE)}
1119          *
1120          * <p> In the case of the default file system provider, security manager
1121          * permission checks are performed in this factory method, when the
1122          * {@code BodySubscriber} is created. Otherwise,
1123          * {@linkplain FileChannel#open(Path, OpenOption...) permission checks}
1124          * may be performed asynchronously against the caller's context
1125          * at file access time.
1126          * Care must be taken that the {@code BodySubscriber} is not shared with
1127          * untrusted code.
1128          *
1129          * @param  file the file to store the body in
1130          * @return a body subscriber
1131          * @throws SecurityException in the case of the default file system
1132          *         provider, and a security manager is installed,
1133          *         {@link SecurityManager#checkWrite(String) checkWrite}
1134          *         is invoked to check write access to the given file
1135          */
ofFile(Path file)1136         public static BodySubscriber<Path> ofFile(Path file) {
1137             return ofFile(file, CREATE, WRITE);
1138         }
1139 
1140         /**
1141          * Returns a {@code BodySubscriber} which provides the incoming body
1142          * data to the provided Consumer of {@code Optional<byte[]>}. Each
1143          * call to {@link Consumer#accept(java.lang.Object) Consumer.accept()}
1144          * will contain a non empty {@code Optional}, except for the final
1145          * invocation after all body data has been read, when the {@code
1146          * Optional} will be empty.
1147          *
1148          * <p> The {@link HttpResponse} using this subscriber is available after
1149          * the entire response has been read.
1150          *
1151          * @apiNote
1152          * This subscriber is not flow controlled.
1153          * Therefore, the supplied consumer must be able to process whatever
1154          * amount of data is delivered in a timely fashion.
1155          *
1156          * @param consumer a Consumer of byte arrays
1157          * @return a BodySubscriber
1158          */
1159         public static BodySubscriber<Void>
ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer)1160         ofByteArrayConsumer(Consumer<Optional<byte[]>> consumer) {
1161             return new ResponseSubscribers.ConsumerSubscriber(consumer);
1162         }
1163 
1164         /**
1165          * Returns a {@code BodySubscriber} which streams the response body as
1166          * an {@link InputStream}.
1167          *
1168          * <p> The {@link HttpResponse} using this subscriber is available
1169          * immediately after the response headers have been read, without
1170          * requiring to wait for the entire body to be processed. The response
1171          * body can then be read directly from the {@link InputStream}.
1172          *
1173          * @apiNote To ensure that all resources associated with the
1174          * corresponding exchange are properly released the caller must
1175          * ensure to either read all bytes until EOF is reached, or call
1176          * {@link InputStream#close} if it is unable or unwilling to do so.
1177          * Calling {@code close} before exhausting the stream may cause
1178          * the underlying HTTP connection to be closed and prevent it
1179          * from being reused for subsequent operations.
1180          *
1181          * @return a body subscriber that streams the response body as an
1182          *         {@link InputStream}.
1183          */
ofInputStream()1184         public static BodySubscriber<InputStream> ofInputStream() {
1185             return new ResponseSubscribers.HttpResponseInputStream();
1186         }
1187 
1188         /**
1189          * Returns a {@code BodySubscriber} which streams the response body as
1190          * a {@link Stream Stream}{@code <String>}, where each string in the stream
1191          * corresponds to a line as defined by {@link BufferedReader#lines()}.
1192          *
1193          * <p> The {@link HttpResponse} using this subscriber is available
1194          * immediately after the response headers have been read, without
1195          * requiring to wait for the entire body to be processed. The response
1196          * body can then be read directly from the {@link Stream}.
1197          *
1198          * @apiNote To ensure that all resources associated with the
1199          * corresponding exchange are properly released the caller must
1200          * ensure to either read all lines until the stream is exhausted,
1201          * or call {@link Stream#close} if it is unable or unwilling to do so.
1202          * Calling {@code close} before exhausting the stream may cause
1203          * the underlying HTTP connection to be closed and prevent it
1204          * from being reused for subsequent operations.
1205          *
1206          * @param charset the character set to use when converting bytes to characters
1207          * @return a body subscriber that streams the response body as a
1208          *         {@link Stream Stream}{@code <String>}.
1209          *
1210          * @see BufferedReader#lines()
1211          */
ofLines(Charset charset)1212         public static BodySubscriber<Stream<String>> ofLines(Charset charset) {
1213             return ResponseSubscribers.createLineStream(charset);
1214         }
1215 
1216         /**
1217          * Returns a response subscriber which publishes the response body
1218          * through a {@code Publisher<List<ByteBuffer>>}.
1219          *
1220          * <p> The {@link HttpResponse} using this subscriber is available
1221          * immediately after the response headers have been read, without
1222          * requiring to wait for the entire body to be processed. The response
1223          * body bytes can then be obtained by subscribing to the publisher
1224          * returned by the {@code HttpResponse} {@link HttpResponse#body() body}
1225          * method.
1226          *
1227          * <p>The publisher returned by the {@link HttpResponse#body() body}
1228          * method can be subscribed to only once. The first subscriber will
1229          * receive the body response bytes if successfully subscribed, or will
1230          * cause the subscription to be cancelled otherwise.
1231          * If more subscriptions are attempted, the subsequent subscribers will
1232          * be immediately subscribed with an empty subscription and their
1233          * {@link Subscriber#onError(Throwable) onError} method
1234          * will be invoked with an {@code IllegalStateException}.
1235          *
1236          * @apiNote To ensure that all resources associated with the
1237          * corresponding exchange are properly released the caller must
1238          * ensure that the provided publisher is subscribed once, and either
1239          * {@linkplain Subscription#request(long) requests} all bytes
1240          * until {@link Subscriber#onComplete() onComplete} or
1241          * {@link Subscriber#onError(Throwable) onError} are invoked, or
1242          * cancel the provided {@linkplain Subscriber#onSubscribe(Subscription)
1243          * subscription} if it is unable or unwilling to do so.
1244          * Note that depending on the actual HTTP protocol {@linkplain
1245          * HttpClient.Version version} used for the exchange, cancelling the
1246          * subscription instead of exhausting the flow may cause the underlying
1247          * HTTP connection to be closed and prevent it from being reused for
1248          * subsequent operations.
1249          *
1250          * @return A {@code BodySubscriber} which publishes the response body
1251          *         through a {@code Publisher<List<ByteBuffer>>}.
1252          */
ofPublisher()1253         public static BodySubscriber<Publisher<List<ByteBuffer>>> ofPublisher() {
1254             return ResponseSubscribers.createPublisher();
1255         }
1256 
1257         /**
1258          * Returns a response subscriber which discards the response body. The
1259          * supplied value is the value that will be returned from
1260          * {@link HttpResponse#body()}.
1261          *
1262          * @param <U> the type of the response body
1263          * @param value the value to return from HttpResponse.body(), may be {@code null}
1264          * @return a {@code BodySubscriber}
1265          */
replacing(U value)1266         public static <U> BodySubscriber<U> replacing(U value) {
1267             return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value));
1268         }
1269 
1270         /**
1271          * Returns a response subscriber which discards the response body.
1272          *
1273          * @return a response body subscriber
1274          */
discarding()1275         public static BodySubscriber<Void> discarding() {
1276             return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(null));
1277         }
1278 
1279         /**
1280          * Returns a {@code BodySubscriber} which buffers data before delivering
1281          * it to the given downstream subscriber. The subscriber guarantees to
1282          * deliver {@code bufferSize} bytes of data to each invocation of the
1283          * downstream's {@link BodySubscriber#onNext(Object) onNext} method,
1284          * except for the final invocation, just before
1285          * {@link BodySubscriber#onComplete() onComplete} is invoked. The final
1286          * invocation of {@code onNext} may contain fewer than {@code bufferSize}
1287          * bytes.
1288          *
1289          * <p> The returned subscriber delegates its {@link BodySubscriber#getBody()
1290          * getBody()} method to the downstream subscriber.
1291          *
1292          * @param <T> the type of the response body
1293          * @param downstream the downstream subscriber
1294          * @param bufferSize the buffer size
1295          * @return a buffering body subscriber
1296          * @throws IllegalArgumentException if {@code bufferSize <= 0}
1297          */
buffering(BodySubscriber<T> downstream, int bufferSize)1298          public static <T> BodySubscriber<T> buffering(BodySubscriber<T> downstream,
1299                                                        int bufferSize) {
1300              if (bufferSize <= 0)
1301                  throw new IllegalArgumentException("must be greater than 0");
1302              return new BufferingSubscriber<>(downstream, bufferSize);
1303          }
1304 
1305         /**
1306          * Returns a {@code BodySubscriber} whose response body value is that of
1307          * the result of applying the given function to the body object of the
1308          * given {@code upstream} {@code BodySubscriber}.
1309          *
1310          * <p> The mapping function is executed using the client's {@linkplain
1311          * HttpClient#executor() executor}, and can therefore be used to map any
1312          * response body type, including blocking {@link InputStream}.
1313          * However, performing any blocking operation in the mapper function
1314          * runs the risk of blocking the executor's thread for an unknown
1315          * amount of time (at least until the blocking operation finishes),
1316          * which may end up starving the executor of available threads.
1317          * Therefore, in the case where mapping to the desired type might
1318          * block (e.g. by reading on the {@code InputStream}), then mapping
1319          * to a {@link java.util.function.Supplier Supplier} of the desired
1320          * type and deferring the blocking operation until {@link Supplier#get()
1321          * Supplier::get} is invoked by the caller's thread should be preferred,
1322          * as shown in the following example which uses a well-known JSON parser to
1323          * convert an {@code InputStream} into any annotated Java type.
1324          *
1325          * <p>For example:
1326          * <pre> {@code  public static <W> BodySubscriber<Supplier<W>> asJSON(Class<W> targetType) {
1327          *     BodySubscriber<InputStream> upstream = BodySubscribers.ofInputStream();
1328          *
1329          *     BodySubscriber<Supplier<W>> downstream = BodySubscribers.mapping(
1330          *           upstream,
1331          *           (InputStream is) -> () -> {
1332          *               try (InputStream stream = is) {
1333          *                   ObjectMapper objectMapper = new ObjectMapper();
1334          *                   return objectMapper.readValue(stream, targetType);
1335          *               } catch (IOException e) {
1336          *                   throw new UncheckedIOException(e);
1337          *               }
1338          *           });
1339          *    return downstream;
1340          *  } }</pre>
1341          *
1342          * @param <T> the upstream body type
1343          * @param <U> the type of the body subscriber returned
1344          * @param upstream the body subscriber to be mapped
1345          * @param mapper the mapping function
1346          * @return a mapping body subscriber
1347          */
mapping(BodySubscriber<T> upstream, Function<? super T, ? extends U> mapper)1348         public static <T,U> BodySubscriber<U> mapping(BodySubscriber<T> upstream,
1349                                                       Function<? super T, ? extends U> mapper)
1350         {
1351             return new ResponseSubscribers.MappingSubscriber<>(upstream, mapper);
1352         }
1353     }
1354 }
1355