1 //! Client implementation of the HTTP/2 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
14 //! begin the [HTTP/2 handshake]. This returns a future that completes once
//! the handshake process has finished and HTTP/2 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2 stream is initialized and
30 //! the request is sent to the server.
31 //!
//! A request body and request trailers are sent using [`SendStream`] and the
33 //! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust, no_run
68 //!
69 //! use h2::client;
70 //!
71 //! use http::{Request, Method};
72 //! use std::error::Error;
73 //! use tokio::net::TcpStream;
74 //!
75 //! #[tokio::main]
76 //! pub async fn main() -> Result<(), Box<dyn Error>> {
77 //!     // Establish TCP connection to the server.
78 //!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 //!     let (h2, connection) = client::handshake(tcp).await?;
80 //!     tokio::spawn(async move {
81 //!         connection.await.unwrap();
82 //!     });
83 //!
84 //!     let mut h2 = h2.ready().await?;
85 //!     // Prepare the HTTP request to send to the server.
86 //!     let request = Request::builder()
87 //!                     .method(Method::GET)
88 //!                     .uri("https://www.example.com/")
89 //!                     .body(())
90 //!                     .unwrap();
91 //!
92 //!     // Send the request. The second tuple item allows the caller
93 //!     // to stream a request body.
94 //!     let (response, _) = h2.send_request(request, true).unwrap();
95 //!
96 //!     let (head, mut body) = response.await?.into_parts();
97 //!
98 //!     println!("Received response: {:?}", head);
99 //!
100 //!     // The `flow_control` handle allows the caller to manage
101 //!     // flow control.
102 //!     //
103 //!     // Whenever data is received, the caller is responsible for
104 //!     // releasing capacity back to the server once it has freed
105 //!     // the data from memory.
106 //!     let mut flow_control = body.flow_control().clone();
107 //!
108 //!     while let Some(chunk) = body.data().await {
109 //!         let chunk = chunk?;
110 //!         println!("RX: {:?}", chunk);
111 //!
112 //!         // Let the server send more data.
113 //!         let _ = flow_control.release_capacity(chunk.len());
114 //!     }
115 //!
116 //!     Ok(())
117 //! }
118 //! ```
119 //!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121 //! [`handshake`]: fn.handshake.html
122 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123 //! [`SendRequest`]: struct.SendRequest.html
124 //! [`SendStream`]: ../struct.SendStream.html
125 //! [Making requests]: #making-requests
126 //! [Managing the connection]: #managing-the-connection
127 //! [`Connection`]: struct.Connection.html
128 //! [`Connection::poll`]: struct.Connection.html#method.poll
129 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
132 //! [`ResponseFuture`]: struct.ResponseFuture.html
133 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134 //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135 //! [`Builder`]: struct.Builder.html
136 //! [`Error`]: ../struct.Error.html
137 
138 use crate::codec::{Codec, SendError, UserError};
139 use crate::ext::Protocol;
140 use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141 use crate::proto::{self, Error};
142 use crate::{FlowControl, PingPong, RecvStream, SendStream};
143 
144 use bytes::{Buf, Bytes};
145 use http::{uri, HeaderMap, Method, Request, Response, Version};
146 use std::fmt;
147 use std::future::Future;
148 use std::pin::Pin;
149 use std::task::{Context, Poll};
150 use std::time::Duration;
151 use std::usize;
152 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
153 use tracing::Instrument;
154 
155 /// Initializes new HTTP/2 streams on a connection by sending a request.
156 ///
157 /// This type does no work itself. Instead, it is a handle to the inner
158 /// connection state held by [`Connection`]. If the associated connection
159 /// instance is dropped, all `SendRequest` functions will return [`Error`].
160 ///
161 /// [`SendRequest`] instances are able to move to and operate on separate tasks
162 /// / threads than their associated [`Connection`] instance. Internally, there
163 /// is a buffer used to stage requests before they get written to the
164 /// connection. There is no guarantee that requests get written to the
165 /// connection in FIFO order as HTTP/2 prioritization logic can play a role.
166 ///
167 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
168 /// instances that are backed by a single connection.
169 ///
170 /// See [module] level documentation for more details.
171 ///
172 /// [module]: index.html
173 /// [`Connection`]: struct.Connection.html
174 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
175 /// [`Error`]: ../struct.Error.html
176 pub struct SendRequest<B: Buf> {
177     inner: proto::Streams<B, Peer>,
178     pending: Option<proto::OpaqueStreamRef>,
179 }
180 
/// A future that resolves to a `SendRequest` instance once it is ready to
/// send at least one request.
183 #[derive(Debug)]
184 pub struct ReadySendRequest<B: Buf> {
185     inner: Option<SendRequest<B>>,
186 }
187 
188 /// Manages all state associated with an HTTP/2 client connection.
189 ///
190 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
191 /// implements the HTTP/2 client logic for that connection. It is responsible
192 /// for driving the internal state forward, performing the work requested of the
193 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
194 /// [`RecvStream`]).
195 ///
196 /// `Connection` values are created by calling [`handshake`]. Once a
197 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
198 /// until `Ready` is returned. The easiest way to do this is to submit the
199 /// `Connection` instance to an [executor].
200 ///
201 /// [module]: index.html
202 /// [`handshake`]: fn.handshake.html
203 /// [`SendRequest`]: struct.SendRequest.html
204 /// [`ResponseFuture`]: struct.ResponseFuture.html
205 /// [`SendStream`]: ../struct.SendStream.html
206 /// [`RecvStream`]: ../struct.RecvStream.html
207 /// [`poll`]: #method.poll
208 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
209 ///
210 /// # Examples
211 ///
212 /// ```
213 /// # use tokio::io::{AsyncRead, AsyncWrite};
214 /// # use h2::client;
215 /// # use h2::client::*;
216 /// #
217 /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
218 /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
219 /// # {
220 ///     let (send_request, connection) = client::handshake(my_io).await?;
221 ///     // Submit the connection handle to an executor.
222 ///     tokio::spawn(async { connection.await.expect("connection failed"); });
223 ///
224 ///     // Now, use `send_request` to initialize HTTP/2 streams.
225 ///     // ...
226 /// # Ok(())
227 /// # }
228 /// #
229 /// # pub fn main() {}
230 /// ```
231 #[must_use = "futures do nothing unless polled"]
232 pub struct Connection<T, B: Buf = Bytes> {
233     inner: proto::Connection<T, Peer, B>,
234 }
235 
236 /// A future of an HTTP response.
237 #[derive(Debug)]
238 #[must_use = "futures do nothing unless polled"]
239 pub struct ResponseFuture {
240     inner: proto::OpaqueStreamRef,
241     push_promise_consumed: bool,
242 }
243 
244 /// A future of a pushed HTTP response.
245 ///
/// We have to differentiate between pushed and non-pushed responses because of the spec
247 /// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
248 /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
249 /// > that is in either the "open" or "half-closed (remote)" state.
250 #[derive(Debug)]
251 #[must_use = "futures do nothing unless polled"]
252 pub struct PushedResponseFuture {
253     inner: ResponseFuture,
254 }
255 
256 /// A pushed response and corresponding request headers
257 #[derive(Debug)]
258 pub struct PushPromise {
259     /// The request headers
260     request: Request<()>,
261 
262     /// The pushed response
263     response: PushedResponseFuture,
264 }
265 
266 /// A stream of pushed responses and corresponding promised requests
267 #[derive(Debug)]
268 #[must_use = "streams do nothing unless polled"]
269 pub struct PushPromises {
270     inner: proto::OpaqueStreamRef,
271 }
272 
273 /// Builds client connections with custom configuration values.
274 ///
275 /// Methods can be chained in order to set the configuration values.
276 ///
277 /// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 client connection.
279 ///
280 /// New instances of `Builder` are obtained via [`Builder::new`].
281 ///
282 /// See function level documentation for details on the various client
283 /// configuration settings.
284 ///
285 /// [`Builder::new`]: struct.Builder.html#method.new
286 /// [`handshake`]: struct.Builder.html#method.handshake
287 ///
288 /// # Examples
289 ///
290 /// ```
291 /// # use tokio::io::{AsyncRead, AsyncWrite};
292 /// # use h2::client::*;
293 /// # use bytes::Bytes;
294 /// #
295 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
296 ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
297 /// # {
298 /// // `client_fut` is a future representing the completion of the HTTP/2
299 /// // handshake.
300 /// let client_fut = Builder::new()
301 ///     .initial_window_size(1_000_000)
302 ///     .max_concurrent_streams(1000)
303 ///     .handshake(my_io);
304 /// # client_fut.await
305 /// # }
306 /// #
307 /// # pub fn main() {}
308 /// ```
309 #[derive(Clone, Debug)]
310 pub struct Builder {
311     /// Time to keep locally reset streams around before reaping.
312     reset_stream_duration: Duration,
313 
314     /// Initial maximum number of locally initiated (send) streams.
315     /// After receiving a Settings frame from the remote peer,
316     /// the connection will overwrite this value with the
317     /// MAX_CONCURRENT_STREAMS specified in the frame.
318     initial_max_send_streams: usize,
319 
320     /// Initial target window size for new connections.
321     initial_target_connection_window_size: Option<u32>,
322 
323     /// Maximum amount of bytes to "buffer" for writing per stream.
324     max_send_buffer_size: usize,
325 
326     /// Maximum number of locally reset streams to keep at a time.
327     reset_stream_max: usize,
328 
329     /// Initial `Settings` frame to send as part of the handshake.
330     settings: Settings,
331 
332     /// The stream ID of the first (lowest) stream. Subsequent streams will use
333     /// monotonically increasing stream IDs.
334     stream_id: StreamId,
335 }
336 
337 #[derive(Debug)]
338 pub(crate) struct Peer;
339 
340 // ===== impl SendRequest =====
341 
342 impl<B> SendRequest<B>
343 where
344     B: Buf + 'static,
345 {
346     /// Returns `Ready` when the connection can initialize a new HTTP/2
347     /// stream.
348     ///
349     /// This function must return `Ready` before `send_request` is called. When
350     /// `Poll::Pending` is returned, the task will be notified once the readiness
351     /// state changes.
352     ///
353     /// See [module] level docs for more details.
354     ///
355     /// [module]: index.html
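    ///
    /// # Examples
    ///
    /// A minimal sketch of driving `poll_ready` directly with
    /// `futures_util::future::poll_fn`; in most cases the
    /// [`ready`](#method.ready) combinator is more convenient.
    ///
    /// ```rust
    /// # use h2::client::*;
    /// # async fn doc(mut send_request: SendRequest<&'static [u8]>)
    /// # {
    /// // Wait until a new HTTP/2 stream may be initialized.
    /// futures_util::future::poll_fn(|cx| send_request.poll_ready(cx))
    ///     .await
    ///     .unwrap();
    /// // `send_request` may now be used to send a request.
    /// # }
    /// # pub fn main() {}
    /// ```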
    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
357         ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
358         self.pending = None;
359         Poll::Ready(Ok(()))
360     }
361 
362     /// Consumes `self`, returning a future that returns `self` back once it is
363     /// ready to send a request.
364     ///
365     /// This function should be called before calling `send_request`.
366     ///
367     /// This is a functional combinator for [`poll_ready`]. The returned future
    /// will call `SendRequest::poll_ready` until `Ready`, then returns `self` to
369     /// the caller.
370     ///
371     /// # Examples
372     ///
373     /// ```rust
374     /// # use h2::client::*;
375     /// # use http::*;
376     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
377     /// # {
378     /// // First, wait until the `send_request` handle is ready to send a new
379     /// // request
380     /// let mut send_request = send_request.ready().await.unwrap();
381     /// // Use `send_request` here.
382     /// # }
383     /// # pub fn main() {}
384     /// ```
385     ///
386     /// See [module] level docs for more details.
387     ///
388     /// [`poll_ready`]: #method.poll_ready
389     /// [module]: index.html
    pub fn ready(self) -> ReadySendRequest<B> {
391         ReadySendRequest { inner: Some(self) }
392     }
393 
    /// Sends an HTTP/2 request to the server.
395     ///
396     /// `send_request` initializes a new HTTP/2 stream on the associated
397     /// connection, then sends the given request using this new stream. Only the
398     /// request head is sent.
399     ///
400     /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
401     /// are returned. The [`ResponseFuture`] instance is used to get the
402     /// server's response and the [`SendStream`] instance is used to send a
403     /// request body or trailers to the server over the same HTTP/2 stream.
404     ///
405     /// To send a request body or trailers, set `end_of_stream` to `false`.
406     /// Then, use the returned [`SendStream`] instance to stream request body
    /// chunks or send trailers. If `end_of_stream` is set to `true`, then
    /// attempting to call [`SendStream::send_data`] or
    /// [`SendStream::send_trailers`] will result in an error.
410     ///
411     /// If no request body or trailers are to be sent, set `end_of_stream` to
412     /// `true` and drop the returned [`SendStream`] instance.
413     ///
414     /// # A note on HTTP versions
415     ///
416     /// The provided `Request` will be encoded differently depending on the
417     /// value of its version field. If the version is set to 2.0, then the
    /// request is encoded as the specification recommends.
419     ///
420     /// If the version is set to a lower value, then the request is encoded to
421     /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
422     /// headers are permitted and the `:authority` pseudo header is not
423     /// included.
424     ///
425     /// The caller should always set the request's version field to 2.0 unless
426     /// specifically transmitting an HTTP 1.1 request over 2.0.
427     ///
428     /// # Examples
429     ///
430     /// Sending a request with no body
431     ///
432     /// ```rust
433     /// # use h2::client::*;
434     /// # use http::*;
435     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
436     /// # {
437     /// // First, wait until the `send_request` handle is ready to send a new
438     /// // request
439     /// let mut send_request = send_request.ready().await.unwrap();
440     /// // Prepare the HTTP request to send to the server.
441     /// let request = Request::get("https://www.example.com/")
442     ///     .body(())
443     ///     .unwrap();
444     ///
445     /// // Send the request to the server. Since we are not sending a
446     /// // body or trailers, we can drop the `SendStream` instance.
447     /// let (response, _) = send_request.send_request(request, true).unwrap();
448     /// let response = response.await.unwrap();
449     /// // Process the response
450     /// # }
451     /// # pub fn main() {}
452     /// ```
453     ///
454     /// Sending a request with a body and trailers
455     ///
456     /// ```rust
457     /// # use h2::client::*;
458     /// # use http::*;
459     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
460     /// # {
461     /// // First, wait until the `send_request` handle is ready to send a new
462     /// // request
463     /// let mut send_request = send_request.ready().await.unwrap();
464     ///
465     /// // Prepare the HTTP request to send to the server.
466     /// let request = Request::get("https://www.example.com/")
467     ///     .body(())
468     ///     .unwrap();
469     ///
    /// // Send the request to the server. Since we will also send a body
    /// // and trailers, we keep the returned `SendStream` instance.
472     /// let (response, mut send_stream) = send_request
473     ///     .send_request(request, false).unwrap();
474     ///
475     /// // At this point, one option would be to wait for send capacity.
476     /// // Doing so would allow us to not hold data in memory that
477     /// // cannot be sent. However, this is not a requirement, so this
478     /// // example will skip that step. See `SendStream` documentation
479     /// // for more details.
480     /// send_stream.send_data(b"hello", false).unwrap();
481     /// send_stream.send_data(b"world", false).unwrap();
482     ///
483     /// // Send the trailers.
484     /// let mut trailers = HeaderMap::new();
485     /// trailers.insert(
486     ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
487     ///     header::HeaderValue::from_bytes(b"hello").unwrap());
488     ///
489     /// send_stream.send_trailers(trailers).unwrap();
490     ///
491     /// let response = response.await.unwrap();
492     /// // Process the response
493     /// # }
494     /// # pub fn main() {}
495     /// ```
496     ///
497     /// [`ResponseFuture`]: struct.ResponseFuture.html
498     /// [`SendStream`]: ../struct.SendStream.html
499     /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
500     /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
    pub fn send_request(
502         &mut self,
503         request: Request<()>,
504         end_of_stream: bool,
505     ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
506         self.inner
507             .send_request(request, end_of_stream, self.pending.as_ref())
508             .map_err(Into::into)
509             .map(|stream| {
510                 if stream.is_pending_open() {
511                     self.pending = Some(stream.clone_to_opaque());
512                 }
513 
514                 let response = ResponseFuture {
515                     inner: stream.clone_to_opaque(),
516                     push_promise_consumed: false,
517                 };
518 
519                 let stream = SendStream::new(stream);
520 
521                 (response, stream)
522             })
523     }
524 
525     /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
526     ///
527     /// This setting is configured by the server peer by sending the
528     /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
530     /// remote.
531     ///
532     /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
533     /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
535         self.inner.is_extended_connect_protocol_enabled()
536     }
537 }
538 
539 impl<B> fmt::Debug for SendRequest<B>
540 where
541     B: Buf,
542 {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
544         fmt.debug_struct("SendRequest").finish()
545     }
546 }
547 
548 impl<B> Clone for SendRequest<B>
549 where
550     B: Buf,
551 {
    fn clone(&self) -> Self {
553         SendRequest {
554             inner: self.inner.clone(),
555             pending: None,
556         }
557     }
558 }
559 
560 #[cfg(feature = "unstable")]
561 impl<B> SendRequest<B>
562 where
563     B: Buf,
564 {
565     /// Returns the number of active streams.
566     ///
567     /// An active stream is a stream that has not yet transitioned to a closed
568     /// state.
    pub fn num_active_streams(&self) -> usize {
570         self.inner.num_active_streams()
571     }
572 
573     /// Returns the number of streams that are held in memory.
574     ///
575     /// A wired stream is a stream that is either active or is closed but must
576     /// stay in memory for some reason. For example, there are still outstanding
577     /// userspace handles pointing to the slot.
    pub fn num_wired_streams(&self) -> usize {
579         self.inner.num_wired_streams()
580     }
581 }
582 
583 // ===== impl ReadySendRequest =====
584 
585 impl<B> Future for ReadySendRequest<B>
586 where
587     B: Buf + 'static,
588 {
589     type Output = Result<SendRequest<B>, crate::Error>;
590 
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
592         match &mut self.inner {
593             Some(send_request) => {
594                 ready!(send_request.poll_ready(cx))?;
595             }
596             None => panic!("called `poll` after future completed"),
597         }
598 
599         Poll::Ready(Ok(self.inner.take().unwrap()))
600     }
601 }
602 
603 // ===== impl Builder =====
604 
605 impl Builder {
606     /// Returns a new client builder instance initialized with default
607     /// configuration values.
608     ///
609     /// Configuration methods can be chained on the return value.
610     ///
611     /// # Examples
612     ///
613     /// ```
614     /// # use tokio::io::{AsyncRead, AsyncWrite};
615     /// # use h2::client::*;
616     /// # use bytes::Bytes;
617     /// #
618     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
619     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
620     /// # {
621     /// // `client_fut` is a future representing the completion of the HTTP/2
622     /// // handshake.
623     /// let client_fut = Builder::new()
624     ///     .initial_window_size(1_000_000)
625     ///     .max_concurrent_streams(1000)
626     ///     .handshake(my_io);
627     /// # client_fut.await
628     /// # }
629     /// #
630     /// # pub fn main() {}
631     /// ```
    pub fn new() -> Builder {
633         Builder {
634             max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
635             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
636             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
637             initial_target_connection_window_size: None,
638             initial_max_send_streams: usize::MAX,
639             settings: Default::default(),
640             stream_id: 1.into(),
641         }
642     }
643 
644     /// Indicates the initial window size (in octets) for stream-level
645     /// flow control for received data.
646     ///
647     /// The initial window of a stream is used as part of flow control. For more
648     /// details, see [`FlowControl`].
649     ///
650     /// The default value is 65,535.
651     ///
652     /// [`FlowControl`]: ../struct.FlowControl.html
653     ///
654     /// # Examples
655     ///
656     /// ```
657     /// # use tokio::io::{AsyncRead, AsyncWrite};
658     /// # use h2::client::*;
659     /// # use bytes::Bytes;
660     /// #
661     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
662     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
663     /// # {
664     /// // `client_fut` is a future representing the completion of the HTTP/2
665     /// // handshake.
666     /// let client_fut = Builder::new()
667     ///     .initial_window_size(1_000_000)
668     ///     .handshake(my_io);
669     /// # client_fut.await
670     /// # }
671     /// #
672     /// # pub fn main() {}
673     /// ```
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
675         self.settings.set_initial_window_size(Some(size));
676         self
677     }
678 
679     /// Indicates the initial window size (in octets) for connection-level flow control
680     /// for received data.
681     ///
682     /// The initial window of a connection is used as part of flow control. For more details,
683     /// see [`FlowControl`].
684     ///
685     /// The default value is 65,535.
686     ///
687     /// [`FlowControl`]: ../struct.FlowControl.html
688     ///
689     /// # Examples
690     ///
691     /// ```
692     /// # use tokio::io::{AsyncRead, AsyncWrite};
693     /// # use h2::client::*;
694     /// # use bytes::Bytes;
695     /// #
696     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
697     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
698     /// # {
699     /// // `client_fut` is a future representing the completion of the HTTP/2
700     /// // handshake.
701     /// let client_fut = Builder::new()
702     ///     .initial_connection_window_size(1_000_000)
703     ///     .handshake(my_io);
704     /// # client_fut.await
705     /// # }
706     /// #
707     /// # pub fn main() {}
708     /// ```
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
710         self.initial_target_connection_window_size = Some(size);
711         self
712     }
713 
714     /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
715     /// configured client is able to accept.
716     ///
717     /// The sender may send data frames that are **smaller** than this value,
718     /// but any data larger than `max` will be broken up into multiple `DATA`
719     /// frames.
720     ///
721     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
722     ///
723     /// # Examples
724     ///
725     /// ```
726     /// # use tokio::io::{AsyncRead, AsyncWrite};
727     /// # use h2::client::*;
728     /// # use bytes::Bytes;
729     /// #
730     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
731     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
732     /// # {
733     /// // `client_fut` is a future representing the completion of the HTTP/2
734     /// // handshake.
735     /// let client_fut = Builder::new()
736     ///     .max_frame_size(1_000_000)
737     ///     .handshake(my_io);
738     /// # client_fut.await
739     /// # }
740     /// #
741     /// # pub fn main() {}
742     /// ```
743     ///
744     /// # Panics
745     ///
746     /// This function panics if `max` is not within the legal range specified
747     /// above.
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
749         self.settings.set_max_frame_size(Some(max));
750         self
751     }
752 
753     /// Sets the max size of received header frames.
754     ///
755     /// This advisory setting informs a peer of the maximum size of header list
756     /// that the sender is prepared to accept, in octets. The value is based on
757     /// the uncompressed size of header fields, including the length of the name
758     /// and value in octets plus an overhead of 32 octets for each header field.
759     ///
760     /// This setting is also used to limit the maximum amount of data that is
761     /// buffered to decode HEADERS frames.
762     ///
763     /// # Examples
764     ///
765     /// ```
766     /// # use tokio::io::{AsyncRead, AsyncWrite};
767     /// # use h2::client::*;
768     /// # use bytes::Bytes;
769     /// #
770     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
771     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
772     /// # {
773     /// // `client_fut` is a future representing the completion of the HTTP/2
774     /// // handshake.
775     /// let client_fut = Builder::new()
776     ///     .max_header_list_size(16 * 1024)
777     ///     .handshake(my_io);
778     /// # client_fut.await
779     /// # }
780     /// #
781     /// # pub fn main() {}
782     /// ```
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
784         self.settings.set_max_header_list_size(Some(max));
785         self
786     }
787 
788     /// Sets the maximum number of concurrent streams.
789     ///
790     /// The maximum concurrent streams setting only controls the maximum number
791     /// of streams that can be initiated by the remote peer. In other words,
792     /// when this setting is set to 100, this does not limit the number of
793     /// concurrent streams that can be created by the caller.
794     ///
795     /// It is recommended that this value be no smaller than 100, so as to not
796     /// unnecessarily limit parallelism. However, any value is legal, including
797     /// 0. If `max` is set to 0, then the remote will not be permitted to
798     /// initiate streams.
799     ///
800     /// Note that streams in the reserved state, i.e., push promises that have
801     /// been reserved but the stream has not started, do not count against this
802     /// setting.
803     ///
804     /// Also note that if the remote *does* exceed the value set here, it is not
805     /// a protocol level error. Instead, the `h2` library will immediately reset
806     /// the stream.
807     ///
808     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
809     ///
810     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
811     ///
812     /// # Examples
813     ///
814     /// ```
815     /// # use tokio::io::{AsyncRead, AsyncWrite};
816     /// # use h2::client::*;
817     /// # use bytes::Bytes;
818     /// #
819     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
820     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
821     /// # {
822     /// // `client_fut` is a future representing the completion of the HTTP/2
823     /// // handshake.
824     /// let client_fut = Builder::new()
825     ///     .max_concurrent_streams(1000)
826     ///     .handshake(my_io);
827     /// # client_fut.await
828     /// # }
829     /// #
830     /// # pub fn main() {}
831     /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
833         self.settings.set_max_concurrent_streams(Some(max));
834         self
835     }
836 
    /// Sets the initial maximum number of locally initiated (send) streams.
838     ///
839     /// The initial settings will be overwritten by the remote peer when
840     /// the Settings frame is received. The new value will be set to the
841     /// `max_concurrent_streams()` from the frame.
842     ///
    /// This setting prevents the caller from initiating more than this
    /// number of streams that count towards the concurrency limit.
845     ///
846     /// Sending streams past the limit returned by the peer will be treated
847     /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
848     ///
849     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
850     ///
851     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
852     ///
853     /// # Examples
854     ///
855     /// ```
856     /// # use tokio::io::{AsyncRead, AsyncWrite};
857     /// # use h2::client::*;
858     /// # use bytes::Bytes;
859     /// #
860     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
861     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
862     /// # {
863     /// // `client_fut` is a future representing the completion of the HTTP/2
864     /// // handshake.
865     /// let client_fut = Builder::new()
866     ///     .initial_max_send_streams(1000)
867     ///     .handshake(my_io);
868     /// # client_fut.await
869     /// # }
870     /// #
871     /// # pub fn main() {}
872     /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
874         self.initial_max_send_streams = initial;
875         self
876     }
877 
878     /// Sets the maximum number of concurrent locally reset streams.
879     ///
880     /// When a stream is explicitly reset, the HTTP/2 specification requires
881     /// that any further frames received for that stream must be ignored for
882     /// "some time".
883     ///
884     /// In order to satisfy the specification, internal state must be maintained
885     /// to implement the behavior. This state grows linearly with the number of
886     /// streams that are locally reset.
887     ///
    /// The `max_concurrent_reset_streams` setting sets an upper
889     /// bound on the amount of state that is maintained. When this max value is
890     /// reached, the oldest reset stream is purged from memory.
891     ///
892     /// Once the stream has been fully purged from memory, any additional frames
893     /// received for that stream will result in a connection level protocol
894     /// error, forcing the connection to terminate.
895     ///
896     /// The default value is 10.
897     ///
898     /// # Examples
899     ///
900     /// ```
901     /// # use tokio::io::{AsyncRead, AsyncWrite};
902     /// # use h2::client::*;
903     /// # use bytes::Bytes;
904     /// #
905     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
906     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
907     /// # {
908     /// // `client_fut` is a future representing the completion of the HTTP/2
909     /// // handshake.
910     /// let client_fut = Builder::new()
911     ///     .max_concurrent_reset_streams(1000)
912     ///     .handshake(my_io);
913     /// # client_fut.await
914     /// # }
915     /// #
916     /// # pub fn main() {}
917     /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
919         self.reset_stream_max = max;
920         self
921     }
922 
923     /// Sets the duration to remember locally reset streams.
924     ///
925     /// When a stream is explicitly reset, the HTTP/2 specification requires
926     /// that any further frames received for that stream must be ignored for
927     /// "some time".
928     ///
929     /// In order to satisfy the specification, internal state must be maintained
930     /// to implement the behavior. This state grows linearly with the number of
931     /// streams that are locally reset.
932     ///
933     /// The `reset_stream_duration` setting configures the max amount of time
934     /// this state will be maintained in memory. Once the duration elapses, the
935     /// stream state is purged from memory.
936     ///
937     /// Once the stream has been fully purged from memory, any additional frames
938     /// received for that stream will result in a connection level protocol
939     /// error, forcing the connection to terminate.
940     ///
941     /// The default value is 30 seconds.
942     ///
943     /// # Examples
944     ///
945     /// ```
946     /// # use tokio::io::{AsyncRead, AsyncWrite};
947     /// # use h2::client::*;
948     /// # use std::time::Duration;
949     /// # use bytes::Bytes;
950     /// #
951     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
952     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
953     /// # {
954     /// // `client_fut` is a future representing the completion of the HTTP/2
955     /// // handshake.
956     /// let client_fut = Builder::new()
957     ///     .reset_stream_duration(Duration::from_secs(10))
958     ///     .handshake(my_io);
959     /// # client_fut.await
960     /// # }
961     /// #
962     /// # pub fn main() {}
963     /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
965         self.reset_stream_duration = dur;
966         self
967     }
968 
969     /// Sets the maximum send buffer size per stream.
970     ///
971     /// Once a stream has buffered up to (or over) the maximum, the stream's
972     /// flow control will not "poll" additional capacity. Once bytes for the
973     /// stream have been written to the connection, the send buffer capacity
974     /// will be freed up again.
975     ///
    /// The default is currently ~400KB, but may change.
977     ///
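    /// # Examples
    ///
    /// A short sketch in the style of the other builder methods; the size
    /// shown is arbitrary.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client::*;
    /// # use bytes::Bytes;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), h2::Error>
    /// # {
    /// // `client_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let client_fut = Builder::new()
    ///     .max_send_buffer_size(1024 * 1024)
    ///     .handshake(my_io);
    /// # client_fut.await
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///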
978     /// # Panics
979     ///
980     /// This function panics if `max` is larger than `u32::MAX`.
    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
982         assert!(max <= std::u32::MAX as usize);
983         self.max_send_buffer_size = max;
984         self
985     }
986 
987     /// Enables or disables server push promises.
988     ///
    /// This value is included in the initial SETTINGS handshake. When set to
    /// `false`, the server MUST NOT send a push promise. Setting this value to
    /// `false` in the initial SETTINGS handshake guarantees that the remote server
    /// will never send a push promise.
993     ///
994     /// This setting can be changed during the life of a single HTTP/2
995     /// connection by sending another settings frame updating the value.
996     ///
997     /// Default value: `true`.
998     ///
999     /// # Examples
1000     ///
1001     /// ```
1002     /// # use tokio::io::{AsyncRead, AsyncWrite};
1003     /// # use h2::client::*;
1004     /// # use std::time::Duration;
1005     /// # use bytes::Bytes;
1006     /// #
1007     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1008     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1009     /// # {
1010     /// // `client_fut` is a future representing the completion of the HTTP/2
1011     /// // handshake.
1012     /// let client_fut = Builder::new()
1013     ///     .enable_push(false)
1014     ///     .handshake(my_io);
1015     /// # client_fut.await
1016     /// # }
1017     /// #
1018     /// # pub fn main() {}
1019     /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1021         self.settings.set_enable_push(enabled);
1022         self
1023     }
1024 
1025     /// Sets the first stream ID to something other than 1.
1026     #[cfg(feature = "unstable")]
    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1028         self.stream_id = stream_id.into();
1029         assert!(
1030             self.stream_id.is_client_initiated(),
1031             "stream id must be odd"
1032         );
1033         self
1034     }
1035 
1036     /// Creates a new configured HTTP/2 client backed by `io`.
1037     ///
1038     /// It is expected that `io` already be in an appropriate state to commence
1039     /// the [HTTP/2 handshake]. The handshake is completed once both the connection
    /// preface and the initial settings frame have been sent by the client.
1041     ///
1042     /// The handshake future does not wait for the initial settings frame from the
1043     /// server.
1044     ///
1045     /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1046     /// tuple once the HTTP/2 handshake has been completed.
1047     ///
1048     /// This function also allows the caller to configure the send payload data
1049     /// type. See [Outbound data type] for more details.
1050     ///
1051     /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1052     /// [`Connection`]: struct.Connection.html
1053     /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1055     ///
1056     /// # Examples
1057     ///
1058     /// Basic usage:
1059     ///
1060     /// ```
1061     /// # use tokio::io::{AsyncRead, AsyncWrite};
1062     /// # use h2::client::*;
1063     /// # use bytes::Bytes;
1064     /// #
1065     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1066     ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1067     /// # {
1068     /// // `client_fut` is a future representing the completion of the HTTP/2
1069     /// // handshake.
1070     /// let client_fut = Builder::new()
1071     ///     .handshake(my_io);
1072     /// # client_fut.await
1073     /// # }
1074     /// #
1075     /// # pub fn main() {}
1076     /// ```
1077     ///
1078     /// Configures the send-payload data type. In this case, the outbound data
1079     /// type will be `&'static [u8]`.
1080     ///
1081     /// ```
1082     /// # use tokio::io::{AsyncRead, AsyncWrite};
1083     /// # use h2::client::*;
1084     /// #
1085     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1086     /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1087     /// # {
1088     /// // `client_fut` is a future representing the completion of the HTTP/2
1089     /// // handshake.
1090     /// let client_fut = Builder::new()
1091     ///     .handshake::<_, &'static [u8]>(my_io);
1092     /// # client_fut.await
1093     /// # }
1094     /// #
1095     /// # pub fn main() {}
1096     /// ```
    pub fn handshake<T, B>(
1098         &self,
1099         io: T,
1100     ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1101     where
1102         T: AsyncRead + AsyncWrite + Unpin,
1103         B: Buf + 'static,
1104     {
1105         Connection::handshake2(io, self.clone())
1106     }
1107 }
1108 
1109 impl Default for Builder {
    fn default() -> Builder {
1111         Builder::new()
1112     }
1113 }
1114 
1115 /// Creates a new configured HTTP/2 client with default configuration
1116 /// values backed by `io`.
1117 ///
1118 /// It is expected that `io` already be in an appropriate state to commence
1119 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1120 ///
1121 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1122 /// tuple once the HTTP/2 handshake has been completed. The returned
1123 /// [`Connection`] instance will be using default configuration values. Use
1124 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1125 /// instance.
1126 ///
1127 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1128 /// [Handshake]: ../index.html#handshake
1129 /// [`Connection`]: struct.Connection.html
1130 /// [`SendRequest`]: struct.SendRequest.html
1131 ///
1132 /// # Examples
1133 ///
1134 /// ```
1135 /// # use tokio::io::{AsyncRead, AsyncWrite};
1136 /// # use h2::client;
1137 /// # use h2::client::*;
1138 /// #
1139 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1140 /// # {
1141 /// let (send_request, connection) = client::handshake(my_io).await?;
1142 /// // The HTTP/2 handshake has completed, now start polling
1143 /// // `connection` and use `send_request` to send requests to the
1144 /// // server.
1145 /// # Ok(())
1146 /// # }
1147 /// #
1148 /// # pub fn main() {}
1149 /// ```
pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1151 where
1152     T: AsyncRead + AsyncWrite + Unpin,
1153 {
1154     let builder = Builder::new();
1155     builder
1156         .handshake(io)
1157         .instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::<T>()))
1158         .await
1159 }
1160 
1161 // ===== impl Connection =====
1162 
async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1164 where
1165     T: AsyncRead + AsyncWrite + Unpin,
1166 {
1167     tracing::debug!("binding client connection");
1168 
1169     let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1170     io.write_all(msg).await.map_err(crate::Error::from_io)?;
1171 
1172     tracing::debug!("client connection bound");
1173 
1174     Ok(())
1175 }
1176 
1177 impl<T, B> Connection<T, B>
1178 where
1179     T: AsyncRead + AsyncWrite + Unpin,
1180     B: Buf + 'static,
1181 {
    async fn handshake2(
1183         mut io: T,
1184         builder: Builder,
1185     ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1186         bind_connection(&mut io).await?;
1187 
1188         // Create the codec
1189         let mut codec = Codec::new(io);
1190 
1191         if let Some(max) = builder.settings.max_frame_size() {
1192             codec.set_max_recv_frame_size(max as usize);
1193         }
1194 
1195         if let Some(max) = builder.settings.max_header_list_size() {
1196             codec.set_max_recv_header_list_size(max as usize);
1197         }
1198 
1199         // Send initial settings frame
1200         codec
1201             .buffer(builder.settings.clone().into())
1202             .expect("invalid SETTINGS frame");
1203 
1204         let inner = proto::Connection::new(
1205             codec,
1206             proto::Config {
1207                 next_stream_id: builder.stream_id,
1208                 initial_max_send_streams: builder.initial_max_send_streams,
1209                 max_send_buffer_size: builder.max_send_buffer_size,
1210                 reset_stream_duration: builder.reset_stream_duration,
1211                 reset_stream_max: builder.reset_stream_max,
1212                 settings: builder.settings.clone(),
1213             },
1214         );
1215         let send_request = SendRequest {
1216             inner: inner.streams().clone(),
1217             pending: None,
1218         };
1219 
1220         let mut connection = Connection { inner };
1221         if let Some(sz) = builder.initial_target_connection_window_size {
1222             connection.set_target_window_size(sz);
1223         }
1224 
1225         Ok((send_request, connection))
1226     }
1227 
1228     /// Sets the target window size for the whole connection.
1229     ///
1230     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1231     /// frame will be immediately sent to the remote, increasing the connection
1232     /// level window by `size - current_value`.
1233     ///
1234     /// If `size` is less than the current value, nothing will happen
1235     /// immediately. However, as window capacity is released by
1236     /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1237     /// out until the number of "in flight" bytes drops below `size`.
1238     ///
1239     /// The default value is 65,535.
1240     ///
1241     /// See [`FlowControl`] documentation for more details.
1242     ///
1243     /// [`FlowControl`]: ../struct.FlowControl.html
1244     /// [library level]: ../index.html#flow-control
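    ///
    /// # Examples
    ///
    /// A minimal sketch of raising the connection-level window after the
    /// handshake; the size shown is arbitrary.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// // Allow the server to send up to 1MB before waiting for window updates.
    /// connection.set_target_window_size(1_000_000);
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```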
    pub fn set_target_window_size(&mut self, size: u32) {
1246         assert!(size <= proto::MAX_WINDOW_SIZE);
1247         self.inner.set_target_window_size(size);
1248     }
1249 
1250     /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1251     /// flow control for received data.
1252     ///
1253     /// The `SETTINGS` will be sent to the remote, and only applied once the
1254     /// remote acknowledges the change.
1255     ///
1256     /// This can be used to increase or decrease the window size for existing
1257     /// streams.
1258     ///
1259     /// # Errors
1260     ///
1261     /// Returns an error if a previous call is still pending acknowledgement
1262     /// from the remote endpoint.
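    ///
    /// # Examples
    ///
    /// A minimal sketch of requesting a larger per-stream window:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client;
    /// #
    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// // Ask the server to use a 1MB initial window for each new stream.
    /// connection.set_initial_window_size(1_000_000)?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```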
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1264         assert!(size <= proto::MAX_WINDOW_SIZE);
1265         self.inner.set_initial_window_size(size)?;
1266         Ok(())
1267     }
1268 
1269     /// Takes a `PingPong` instance from the connection.
1270     ///
1271     /// # Note
1272     ///
1273     /// This may only be called once. Calling multiple times will return `None`.
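    ///
    /// # Examples
    ///
    /// A minimal sketch of taking the handle and sending a single opaque PING;
    /// the connection must be polled (here via `tokio::spawn`) for the PING to
    /// actually go out.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::client;
    /// # use h2::Ping;
    /// #
    /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
    /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
    /// # {
    /// let (_send_request, mut connection) = client::handshake(my_io).await?;
    /// let mut ping_pong = connection.ping_pong().expect("taken only once");
    /// tokio::spawn(async move { connection.await.expect("connection failed"); });
    ///
    /// // Send a PING and wait for the server's PONG.
    /// ping_pong.ping(Ping::opaque()).await?;
    /// # Ok(())
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```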
    pub fn ping_pong(&mut self) -> Option<PingPong> {
1275         self.inner.take_user_pings().map(PingPong::new)
1276     }
1277 
1278     /// Returns the maximum number of concurrent streams that may be initiated
1279     /// by this client.
1280     ///
1281     /// This limit is configured by the server peer by sending the
1282     /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
1284     /// remote.
1285     ///
1286     /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
1288         self.inner.max_send_streams()
1289     }
1290     /// Returns the maximum number of concurrent streams that may be initiated
1291     /// by the server on this connection.
1292     ///
1293     /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1294     /// parameter][1] sent in a `SETTINGS` frame that has been
1295     /// acknowledged by the remote peer. The value to be sent is configured by
1296     /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1297     /// with the remote peer.
1298     ///
1299     /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1300     /// [2]: ../struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
1302         self.inner.max_recv_streams()
1303     }
1304 }
1305 
1306 impl<T, B> Future for Connection<T, B>
1307 where
1308     T: AsyncRead + AsyncWrite + Unpin,
1309     B: Buf + 'static,
1310 {
1311     type Output = Result<(), crate::Error>;
1312 
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1314         self.inner.maybe_close_connection_if_no_streams();
1315         self.inner.poll(cx).map_err(Into::into)
1316     }
1317 }
1318 
1319 impl<T, B> fmt::Debug for Connection<T, B>
1320 where
1321     T: AsyncRead + AsyncWrite,
1322     T: fmt::Debug,
1323     B: fmt::Debug + Buf,
1324 {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1326         fmt::Debug::fmt(&self.inner, fmt)
1327     }
1328 }
1329 
1330 // ===== impl ResponseFuture =====
1331 
1332 impl Future for ResponseFuture {
1333     type Output = Result<Response<RecvStream>, crate::Error>;
1334 
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1336         let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1337         let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1338 
1339         Poll::Ready(Ok(Response::from_parts(parts, body)))
1340     }
1341 }
1342 
1343 impl ResponseFuture {
1344     /// Returns the stream ID of the response stream.
1345     ///
1346     /// # Panics
1347     ///
1348     /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
1350         crate::StreamId::from_internal(self.inner.stream_id())
1351     }
1352     /// Returns a stream of PushPromises
1353     ///
1354     /// # Panics
1355     ///
    /// If this method has been called before,
    /// or if the response stream was itself pushed.
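    ///
    /// # Examples
    ///
    /// A minimal sketch of draining the pushed responses associated with a
    /// request:
    ///
    /// ```
    /// # use h2::client::ResponseFuture;
    /// # async fn doc(mut response: ResponseFuture) -> Result<(), h2::Error>
    /// # {
    /// let mut push_promises = response.push_promises();
    /// while let Some(push) = push_promises.push_promise().await {
    ///     let (request, pushed_response) = push?.into_parts();
    ///     println!("promised request: {:?}", request);
    ///     // Await `pushed_response` to receive the pushed response itself.
    ///     drop(pushed_response);
    /// }
    /// # Ok(())
    /// # }
    /// # pub fn main() {}
    /// ```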
    pub fn push_promises(&mut self) -> PushPromises {
1359         if self.push_promise_consumed {
1360             panic!("Reference to push promises stream taken!");
1361         }
1362         self.push_promise_consumed = true;
1363         PushPromises {
1364             inner: self.inner.clone(),
1365         }
1366     }
1367 }
1368 
1369 // ===== impl PushPromises =====
1370 
1371 impl PushPromises {
1372     /// Get the next `PushPromise`.
    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1374         futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
1375     }
1376 
1377     #[doc(hidden)]
    pub fn poll_push_promise(
1379         &mut self,
1380         cx: &mut Context<'_>,
1381     ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1382         match self.inner.poll_pushed(cx) {
1383             Poll::Ready(Some(Ok((request, response)))) => {
1384                 let response = PushedResponseFuture {
1385                     inner: ResponseFuture {
1386                         inner: response,
1387                         push_promise_consumed: false,
1388                     },
1389                 };
1390                 Poll::Ready(Some(Ok(PushPromise { request, response })))
1391             }
1392             Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1393             Poll::Ready(None) => Poll::Ready(None),
1394             Poll::Pending => Poll::Pending,
1395         }
1396     }
1397 }
1398 
1399 #[cfg(feature = "stream")]
1400 impl futures_core::Stream for PushPromises {
1401     type Item = Result<PushPromise, crate::Error>;
1402 
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1404         self.poll_push_promise(cx)
1405     }
1406 }
1407 
1408 // ===== impl PushPromise =====
1409 
1410 impl PushPromise {
1411     /// Returns a reference to the push promise's request headers.
    pub fn request(&self) -> &Request<()> {
1413         &self.request
1414     }
1415 
1416     /// Returns a mutable reference to the push promise's request headers.
    pub fn request_mut(&mut self) -> &mut Request<()> {
1418         &mut self.request
1419     }
1420 
1421     /// Consumes `self`, returning the push promise's request headers and
1422     /// response future.
    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1424         (self.request, self.response)
1425     }
1426 }
1427 
1428 // ===== impl PushedResponseFuture =====
1429 
1430 impl Future for PushedResponseFuture {
1431     type Output = Result<Response<RecvStream>, crate::Error>;
1432 
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1434         Pin::new(&mut self.inner).poll(cx)
1435     }
1436 }
1437 
1438 impl PushedResponseFuture {
1439     /// Returns the stream ID of the response stream.
1440     ///
1441     /// # Panics
1442     ///
1443     /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
1445         self.inner.stream_id()
1446     }
1447 }
1448 
1449 // ===== impl Peer =====
1450 
1451 impl Peer {
    pub fn convert_send_message(
1453         id: StreamId,
1454         request: Request<()>,
1455         protocol: Option<Protocol>,
1456         end_of_stream: bool,
1457     ) -> Result<Headers, SendError> {
1458         use http::request::Parts;
1459 
1460         let (
1461             Parts {
1462                 method,
1463                 uri,
1464                 headers,
1465                 version,
1466                 ..
1467             },
1468             _,
1469         ) = request.into_parts();
1470 
1471         let is_connect = method == Method::CONNECT;
1472 
        // Build the pseudo header set. All requests will include `method`
1474         // and `path`.
1475         let mut pseudo = Pseudo::request(method, uri, protocol);
1476 
1477         if pseudo.scheme.is_none() {
            // If the scheme is not set, then there are two options.
1479             //
1480             // 1) Authority is not set. In this case, a request was issued with
1481             //    a relative URI. This is permitted **only** when forwarding
1482             //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1483             //    this is an error.
1484             //
            //    2) Authority is set. In this case, the HTTP method *must* be CONNECT.
1486             //
1487             // It is not possible to have a scheme but not an authority set (the
1488             // `http` crate does not allow it).
1489             //
1490             if pseudo.authority.is_none() {
1491                 if version == Version::HTTP_2 {
1492                     return Err(UserError::MissingUriSchemeAndAuthority.into());
1493                 } else {
1494                     // This is acceptable as per the above comment. However,
1495                     // HTTP/2 requires that a scheme is set. Since we are
1496                     // forwarding an HTTP 1.1 request, the scheme is set to
1497                     // "http".
1498                     pseudo.set_scheme(uri::Scheme::HTTP);
1499                 }
1500             } else if !is_connect {
1501                 // TODO: Error
1502             }
1503         }
1504 
1505         // Create the HEADERS frame
1506         let mut frame = Headers::new(id, pseudo, headers);
1507 
1508         if end_of_stream {
1509             frame.set_end_stream()
1510         }
1511 
1512         Ok(frame)
1513     }
1514 }
1515 
1516 impl proto::Peer for Peer {
1517     type Poll = Response<()>;
1518 
1519     const NAME: &'static str = "Client";
1520 
1521     fn r#dyn() -> proto::DynPeer {
1522         proto::DynPeer::Client
1523     }
1524 
    fn is_server() -> bool {
1526         false
1527     }
1528 
    fn convert_poll_message(
1530         pseudo: Pseudo,
1531         fields: HeaderMap,
1532         stream_id: StreamId,
1533     ) -> Result<Self::Poll, Error> {
1534         let mut b = Response::builder();
1535 
1536         b = b.version(Version::HTTP_2);
1537 
1538         if let Some(status) = pseudo.status {
1539             b = b.status(status);
1540         }
1541 
1542         let mut response = match b.body(()) {
1543             Ok(response) => response,
1544             Err(_) => {
1545                 // TODO: Should there be more specialized handling for different
1546                 // kinds of errors
1547                 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1548             }
1549         };
1550 
1551         *response.headers_mut() = fields;
1552 
1553         Ok(response)
1554     }
1555 }
1556