1 //! Client implementation of the HTTP/2 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
14 //! begin the [HTTP/2 handshake]. This returns a future that completes once
15 //! the handshake process is performed and HTTP/2 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2 stream is initialized and
30 //! the request is sent to the server.
31 //!
//! A request body and request trailers are sent using [`SendStream`] and the
33 //! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust, no_run
68 //!
69 //! use h2::client;
70 //!
71 //! use http::{Request, Method};
72 //! use std::error::Error;
73 //! use tokio::net::TcpStream;
74 //!
75 //! #[tokio::main]
76 //! pub async fn main() -> Result<(), Box<dyn Error>> {
77 //!     // Establish TCP connection to the server.
78 //!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 //!     let (h2, connection) = client::handshake(tcp).await?;
80 //!     tokio::spawn(async move {
81 //!         connection.await.unwrap();
82 //!     });
83 //!
84 //!     let mut h2 = h2.ready().await?;
85 //!     // Prepare the HTTP request to send to the server.
86 //!     let request = Request::builder()
87 //!                     .method(Method::GET)
88 //!                     .uri("https://www.example.com/")
89 //!                     .body(())
90 //!                     .unwrap();
91 //!
92 //!     // Send the request. The second tuple item allows the caller
93 //!     // to stream a request body.
94 //!     let (response, _) = h2.send_request(request, true).unwrap();
95 //!
96 //!     let (head, mut body) = response.await?.into_parts();
97 //!
98 //!     println!("Received response: {:?}", head);
99 //!
100 //!     // The `flow_control` handle allows the caller to manage
101 //!     // flow control.
102 //!     //
103 //!     // Whenever data is received, the caller is responsible for
104 //!     // releasing capacity back to the server once it has freed
105 //!     // the data from memory.
106 //!     let mut flow_control = body.flow_control().clone();
107 //!
108 //!     while let Some(chunk) = body.data().await {
109 //!         let chunk = chunk?;
110 //!         println!("RX: {:?}", chunk);
111 //!
112 //!         // Let the server send more data.
113 //!         let _ = flow_control.release_capacity(chunk.len());
114 //!     }
115 //!
116 //!     Ok(())
117 //! }
118 //! ```
119 //!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121 //! [`handshake`]: fn.handshake.html
122 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123 //! [`SendRequest`]: struct.SendRequest.html
124 //! [`SendStream`]: ../struct.SendStream.html
125 //! [Making requests]: #making-requests
126 //! [Managing the connection]: #managing-the-connection
127 //! [`Connection`]: struct.Connection.html
128 //! [`Connection::poll`]: struct.Connection.html#method.poll
129 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131 //! [`SendRequest`]: struct.SendRequest.html
132 //! [`ResponseFuture`]: struct.ResponseFuture.html
133 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134 //! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135 //! [`Builder`]: struct.Builder.html
136 //! [`Error`]: ../struct.Error.html
137 
138 use crate::codec::{Codec, SendError, UserError};
139 use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
140 use crate::proto::{self, Error};
141 use crate::{FlowControl, PingPong, RecvStream, SendStream};
142 
143 use bytes::{Buf, Bytes};
144 use http::{uri, HeaderMap, Method, Request, Response, Version};
145 use std::fmt;
146 use std::future::Future;
147 use std::pin::Pin;
148 use std::task::{Context, Poll};
149 use std::time::Duration;
150 use std::usize;
151 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152 use tracing::Instrument;
153 
154 /// Initializes new HTTP/2 streams on a connection by sending a request.
155 ///
156 /// This type does no work itself. Instead, it is a handle to the inner
157 /// connection state held by [`Connection`]. If the associated connection
158 /// instance is dropped, all `SendRequest` functions will return [`Error`].
159 ///
160 /// [`SendRequest`] instances are able to move to and operate on separate tasks
161 /// / threads than their associated [`Connection`] instance. Internally, there
162 /// is a buffer used to stage requests before they get written to the
163 /// connection. There is no guarantee that requests get written to the
164 /// connection in FIFO order as HTTP/2 prioritization logic can play a role.
165 ///
166 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
167 /// instances that are backed by a single connection.
168 ///
169 /// See [module] level documentation for more details.
170 ///
171 /// [module]: index.html
172 /// [`Connection`]: struct.Connection.html
173 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
174 /// [`Error`]: ../struct.Error.html
pub struct SendRequest<B: Buf> {
    /// Handle to the stream state of the associated connection; every
    /// operation on this type is forwarded to it.
    inner: proto::Streams<B, Peer>,
    /// Reference to the stream created by the last `send_request` call if
    /// that stream was still pending open. `poll_ready` waits on it and
    /// resets it to `None`; clones always start with `None`.
    pending: Option<proto::OpaqueStreamRef>,
}
179 
/// Returns a `SendRequest` instance once it is ready to send at least one
/// request.
///
/// Created by `SendRequest::ready`. Polling this future again after it has
/// completed panics.
#[derive(Debug)]
pub struct ReadySendRequest<B: Buf> {
    /// The wrapped handle. It is taken out (leaving `None`) when the future
    /// completes successfully.
    inner: Option<SendRequest<B>>,
}
186 
187 /// Manages all state associated with an HTTP/2 client connection.
188 ///
189 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
190 /// implements the HTTP/2 client logic for that connection. It is responsible
191 /// for driving the internal state forward, performing the work requested of the
192 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
193 /// [`RecvStream`]).
194 ///
195 /// `Connection` values are created by calling [`handshake`]. Once a
196 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
197 /// until `Ready` is returned. The easiest way to do this is to submit the
198 /// `Connection` instance to an [executor].
199 ///
200 /// [module]: index.html
201 /// [`handshake`]: fn.handshake.html
202 /// [`SendRequest`]: struct.SendRequest.html
203 /// [`ResponseFuture`]: struct.ResponseFuture.html
204 /// [`SendStream`]: ../struct.SendStream.html
205 /// [`RecvStream`]: ../struct.RecvStream.html
206 /// [`poll`]: #method.poll
207 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// # use tokio::io::{AsyncRead, AsyncWrite};
213 /// # use h2::client;
214 /// # use h2::client::*;
215 /// #
216 /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
217 /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
218 /// # {
219 ///     let (send_request, connection) = client::handshake(my_io).await?;
220 ///     // Submit the connection handle to an executor.
221 ///     tokio::spawn(async { connection.await.expect("connection failed"); });
222 ///
223 ///     // Now, use `send_request` to initialize HTTP/2 streams.
224 ///     // ...
225 /// # Ok(())
226 /// # }
227 /// #
228 /// # pub fn main() {}
229 /// ```
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B: Buf = Bytes> {
    /// Protocol-level connection from `crate::proto`, parameterized with the
    /// I/O type `T`, the client-side `Peer` marker and the send buffer type
    /// `B`.
    inner: proto::Connection<T, Peer, B>,
}
234 
/// A future of an HTTP response.
///
/// Returned by `SendRequest::send_request` together with the `SendStream`
/// for the same HTTP/2 stream.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    /// Opaque reference to the stream the request was sent on.
    inner: proto::OpaqueStreamRef,
    /// Initialized to `false` by `send_request`.
    /// NOTE(review): presumably set once the push-promise handle has been
    /// taken from this future -- confirm against the rest of the impl.
    push_promise_consumed: bool,
}
242 
/// A future of a pushed HTTP response.
///
/// Pushed and non-pushed responses are kept as distinct types because of the
/// following requirement in the spec
/// (<https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>):
///
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
/// > that is in either the "open" or "half-closed (remote)" state.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PushedResponseFuture {
    /// The plain response future this type wraps.
    inner: ResponseFuture,
}
254 
/// A pushed response and corresponding request headers.
#[derive(Debug)]
pub struct PushPromise {
    /// The request headers. The body type is `()`, so only the request head
    /// is carried.
    request: Request<()>,

    /// Future of the pushed response.
    response: PushedResponseFuture,
}
264 
/// A stream of pushed responses and corresponding promised requests.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct PushPromises {
    /// Opaque stream reference backing this handle.
    inner: proto::OpaqueStreamRef,
}
271 
272 /// Builds client connections with custom configuration values.
273 ///
274 /// Methods can be chained in order to set the configuration values.
275 ///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 connection.
278 ///
279 /// New instances of `Builder` are obtained via [`Builder::new`].
280 ///
281 /// See function level documentation for details on the various client
282 /// configuration settings.
283 ///
284 /// [`Builder::new`]: struct.Builder.html#method.new
285 /// [`handshake`]: struct.Builder.html#method.handshake
286 ///
287 /// # Examples
288 ///
289 /// ```
290 /// # use tokio::io::{AsyncRead, AsyncWrite};
291 /// # use h2::client::*;
292 /// # use bytes::Bytes;
293 /// #
294 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
296 /// # {
297 /// // `client_fut` is a future representing the completion of the HTTP/2
298 /// // handshake.
299 /// let client_fut = Builder::new()
300 ///     .initial_window_size(1_000_000)
301 ///     .max_concurrent_streams(1000)
302 ///     .handshake(my_io);
303 /// # client_fut.await
304 /// # }
305 /// #
306 /// # pub fn main() {}
307 /// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    /// `Builder::new` sets this to `proto::DEFAULT_RESET_STREAM_SECS` seconds.
    reset_stream_duration: Duration,

    /// Initial maximum number of locally initiated (send) streams.
    /// After receiving a Settings frame from the remote peer,
    /// the connection will overwrite this value with the
    /// MAX_CONCURRENT_STREAMS specified in the frame.
    /// `Builder::new` sets this to `usize::MAX`.
    initial_max_send_streams: usize,

    /// Initial target window size for new connections.
    /// `None` (the value set by `Builder::new`) until
    /// `initial_connection_window_size` is called.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum number of locally reset streams to keep at a time.
    /// `Builder::new` sets this to `proto::DEFAULT_RESET_STREAM_MAX`.
    reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    /// Starts out as the `Default` value and is modified by the setter
    /// methods on this builder.
    settings: Settings,

    /// The stream ID of the first (lowest) stream. Subsequent streams will use
    /// monotonically increasing stream IDs.
    /// `Builder::new` sets this to 1.
    stream_id: StreamId,
}
332 
/// Marker type passed as the peer parameter of `proto::Streams` and
/// `proto::Connection` by the client types in this module.
#[derive(Debug)]
pub(crate) struct Peer;
335 
336 // ===== impl SendRequest =====
337 
338 impl<B> SendRequest<B>
339 where
340     B: Buf + 'static,
341 {
342     /// Returns `Ready` when the connection can initialize a new HTTP/2
343     /// stream.
344     ///
345     /// This function must return `Ready` before `send_request` is called. When
346     /// `Poll::Pending` is returned, the task will be notified once the readiness
347     /// state changes.
348     ///
349     /// See [module] level docs for more details.
350     ///
351     /// [module]: index.html
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>352     pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
353         ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
354         self.pending = None;
355         Poll::Ready(Ok(()))
356     }
357 
358     /// Consumes `self`, returning a future that returns `self` back once it is
359     /// ready to send a request.
360     ///
361     /// This function should be called before calling `send_request`.
362     ///
    /// This is a functional combinator for [`poll_ready`]. The returned future
    /// will call `SendRequest::poll_ready` until `Ready`, then return `self` to
    /// the caller.
366     ///
367     /// # Examples
368     ///
369     /// ```rust
370     /// # use h2::client::*;
371     /// # use http::*;
372     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
373     /// # {
374     /// // First, wait until the `send_request` handle is ready to send a new
375     /// // request
376     /// let mut send_request = send_request.ready().await.unwrap();
377     /// // Use `send_request` here.
378     /// # }
379     /// # pub fn main() {}
380     /// ```
381     ///
382     /// See [module] level docs for more details.
383     ///
384     /// [`poll_ready`]: #method.poll_ready
385     /// [module]: index.html
ready(self) -> ReadySendRequest<B>386     pub fn ready(self) -> ReadySendRequest<B> {
387         ReadySendRequest { inner: Some(self) }
388     }
389 
390     /// Sends a HTTP/2 request to the server.
391     ///
392     /// `send_request` initializes a new HTTP/2 stream on the associated
393     /// connection, then sends the given request using this new stream. Only the
394     /// request head is sent.
395     ///
396     /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
397     /// are returned. The [`ResponseFuture`] instance is used to get the
398     /// server's response and the [`SendStream`] instance is used to send a
399     /// request body or trailers to the server over the same HTTP/2 stream.
400     ///
401     /// To send a request body or trailers, set `end_of_stream` to `false`.
402     /// Then, use the returned [`SendStream`] instance to stream request body
403     /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
404     /// then attempting to call [`SendStream::send_data`] or
405     /// [`SendStream::send_trailers`] will result in an error.
406     ///
407     /// If no request body or trailers are to be sent, set `end_of_stream` to
408     /// `true` and drop the returned [`SendStream`] instance.
409     ///
410     /// # A note on HTTP versions
411     ///
412     /// The provided `Request` will be encoded differently depending on the
    /// value of its version field. If the version is set to 2.0, then the
    /// request is encoded as the specification recommends.
415     ///
416     /// If the version is set to a lower value, then the request is encoded to
417     /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
418     /// headers are permitted and the `:authority` pseudo header is not
419     /// included.
420     ///
421     /// The caller should always set the request's version field to 2.0 unless
422     /// specifically transmitting an HTTP 1.1 request over 2.0.
423     ///
424     /// # Examples
425     ///
426     /// Sending a request with no body
427     ///
428     /// ```rust
429     /// # use h2::client::*;
430     /// # use http::*;
431     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
432     /// # {
433     /// // First, wait until the `send_request` handle is ready to send a new
434     /// // request
435     /// let mut send_request = send_request.ready().await.unwrap();
436     /// // Prepare the HTTP request to send to the server.
437     /// let request = Request::get("https://www.example.com/")
438     ///     .body(())
439     ///     .unwrap();
440     ///
441     /// // Send the request to the server. Since we are not sending a
442     /// // body or trailers, we can drop the `SendStream` instance.
443     /// let (response, _) = send_request.send_request(request, true).unwrap();
444     /// let response = response.await.unwrap();
445     /// // Process the response
446     /// # }
447     /// # pub fn main() {}
448     /// ```
449     ///
450     /// Sending a request with a body and trailers
451     ///
452     /// ```rust
453     /// # use h2::client::*;
454     /// # use http::*;
455     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
456     /// # {
457     /// // First, wait until the `send_request` handle is ready to send a new
458     /// // request
459     /// let mut send_request = send_request.ready().await.unwrap();
460     ///
461     /// // Prepare the HTTP request to send to the server.
462     /// let request = Request::get("https://www.example.com/")
463     ///     .body(())
464     ///     .unwrap();
465     ///
    /// // Send the request to the server. Since we are sending a body and
    /// // trailers, we keep the returned `SendStream` instance.
468     /// let (response, mut send_stream) = send_request
469     ///     .send_request(request, false).unwrap();
470     ///
471     /// // At this point, one option would be to wait for send capacity.
472     /// // Doing so would allow us to not hold data in memory that
473     /// // cannot be sent. However, this is not a requirement, so this
474     /// // example will skip that step. See `SendStream` documentation
475     /// // for more details.
476     /// send_stream.send_data(b"hello", false).unwrap();
477     /// send_stream.send_data(b"world", false).unwrap();
478     ///
479     /// // Send the trailers.
480     /// let mut trailers = HeaderMap::new();
481     /// trailers.insert(
482     ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
483     ///     header::HeaderValue::from_bytes(b"hello").unwrap());
484     ///
485     /// send_stream.send_trailers(trailers).unwrap();
486     ///
487     /// let response = response.await.unwrap();
488     /// // Process the response
489     /// # }
490     /// # pub fn main() {}
491     /// ```
492     ///
493     /// [`ResponseFuture`]: struct.ResponseFuture.html
494     /// [`SendStream`]: ../struct.SendStream.html
495     /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
496     /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
send_request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result<(ResponseFuture, SendStream<B>), crate::Error>497     pub fn send_request(
498         &mut self,
499         request: Request<()>,
500         end_of_stream: bool,
501     ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
502         self.inner
503             .send_request(request, end_of_stream, self.pending.as_ref())
504             .map_err(Into::into)
505             .map(|stream| {
506                 if stream.is_pending_open() {
507                     self.pending = Some(stream.clone_to_opaque());
508                 }
509 
510                 let response = ResponseFuture {
511                     inner: stream.clone_to_opaque(),
512                     push_promise_consumed: false,
513                 };
514 
515                 let stream = SendStream::new(stream);
516 
517                 (response, stream)
518             })
519     }
520 }
521 
522 impl<B> fmt::Debug for SendRequest<B>
523 where
524     B: Buf,
525 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result526     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
527         fmt.debug_struct("SendRequest").finish()
528     }
529 }
530 
531 impl<B> Clone for SendRequest<B>
532 where
533     B: Buf,
534 {
clone(&self) -> Self535     fn clone(&self) -> Self {
536         SendRequest {
537             inner: self.inner.clone(),
538             pending: None,
539         }
540     }
541 }
542 
#[cfg(feature = "unstable")]
impl<B: Buf> SendRequest<B> {
    /// Returns how many streams are currently active.
    ///
    /// A stream counts as active until it has transitioned to a closed
    /// state.
    pub fn num_active_streams(&self) -> usize {
        let streams = &self.inner;
        streams.num_active_streams()
    }

    /// Returns how many streams are currently held in memory.
    ///
    /// A wired stream is either an active stream or a closed stream that
    /// must stay in memory for some reason, for example because there are
    /// still outstanding userspace handles pointing to its slot.
    pub fn num_wired_streams(&self) -> usize {
        let streams = &self.inner;
        streams.num_wired_streams()
    }
}
565 
566 // ===== impl ReadySendRequest =====
567 
568 impl<B> Future for ReadySendRequest<B>
569 where
570     B: Buf + 'static,
571 {
572     type Output = Result<SendRequest<B>, crate::Error>;
573 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>574     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
575         match &mut self.inner {
576             Some(send_request) => {
577                 ready!(send_request.poll_ready(cx))?;
578             }
579             None => panic!("called `poll` after future completed"),
580         }
581 
582         Poll::Ready(Ok(self.inner.take().unwrap()))
583     }
584 }
585 
586 // ===== impl Builder =====
587 
588 impl Builder {
589     /// Returns a new client builder instance initialized with default
590     /// configuration values.
591     ///
592     /// Configuration methods can be chained on the return value.
593     ///
594     /// # Examples
595     ///
596     /// ```
597     /// # use tokio::io::{AsyncRead, AsyncWrite};
598     /// # use h2::client::*;
599     /// # use bytes::Bytes;
600     /// #
601     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
602     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
603     /// # {
604     /// // `client_fut` is a future representing the completion of the HTTP/2
605     /// // handshake.
606     /// let client_fut = Builder::new()
607     ///     .initial_window_size(1_000_000)
608     ///     .max_concurrent_streams(1000)
609     ///     .handshake(my_io);
610     /// # client_fut.await
611     /// # }
612     /// #
613     /// # pub fn main() {}
614     /// ```
new() -> Builder615     pub fn new() -> Builder {
616         Builder {
617             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
618             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
619             initial_target_connection_window_size: None,
620             initial_max_send_streams: usize::MAX,
621             settings: Default::default(),
622             stream_id: 1.into(),
623         }
624     }
625 
626     /// Indicates the initial window size (in octets) for stream-level
627     /// flow control for received data.
628     ///
629     /// The initial window of a stream is used as part of flow control. For more
630     /// details, see [`FlowControl`].
631     ///
632     /// The default value is 65,535.
633     ///
634     /// [`FlowControl`]: ../struct.FlowControl.html
635     ///
636     /// # Examples
637     ///
638     /// ```
639     /// # use tokio::io::{AsyncRead, AsyncWrite};
640     /// # use h2::client::*;
641     /// # use bytes::Bytes;
642     /// #
643     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
644     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
645     /// # {
646     /// // `client_fut` is a future representing the completion of the HTTP/2
647     /// // handshake.
648     /// let client_fut = Builder::new()
649     ///     .initial_window_size(1_000_000)
650     ///     .handshake(my_io);
651     /// # client_fut.await
652     /// # }
653     /// #
654     /// # pub fn main() {}
655     /// ```
initial_window_size(&mut self, size: u32) -> &mut Self656     pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
657         self.settings.set_initial_window_size(Some(size));
658         self
659     }
660 
661     /// Indicates the initial window size (in octets) for connection-level flow control
662     /// for received data.
663     ///
664     /// The initial window of a connection is used as part of flow control. For more details,
665     /// see [`FlowControl`].
666     ///
667     /// The default value is 65,535.
668     ///
669     /// [`FlowControl`]: ../struct.FlowControl.html
670     ///
671     /// # Examples
672     ///
673     /// ```
674     /// # use tokio::io::{AsyncRead, AsyncWrite};
675     /// # use h2::client::*;
676     /// # use bytes::Bytes;
677     /// #
678     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
679     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
680     /// # {
681     /// // `client_fut` is a future representing the completion of the HTTP/2
682     /// // handshake.
683     /// let client_fut = Builder::new()
684     ///     .initial_connection_window_size(1_000_000)
685     ///     .handshake(my_io);
686     /// # client_fut.await
687     /// # }
688     /// #
689     /// # pub fn main() {}
690     /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self691     pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
692         self.initial_target_connection_window_size = Some(size);
693         self
694     }
695 
696     /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
697     /// configured client is able to accept.
698     ///
699     /// The sender may send data frames that are **smaller** than this value,
700     /// but any data larger than `max` will be broken up into multiple `DATA`
701     /// frames.
702     ///
703     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
704     ///
705     /// # Examples
706     ///
707     /// ```
708     /// # use tokio::io::{AsyncRead, AsyncWrite};
709     /// # use h2::client::*;
710     /// # use bytes::Bytes;
711     /// #
712     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
713     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
714     /// # {
715     /// // `client_fut` is a future representing the completion of the HTTP/2
716     /// // handshake.
717     /// let client_fut = Builder::new()
718     ///     .max_frame_size(1_000_000)
719     ///     .handshake(my_io);
720     /// # client_fut.await
721     /// # }
722     /// #
723     /// # pub fn main() {}
724     /// ```
725     ///
726     /// # Panics
727     ///
728     /// This function panics if `max` is not within the legal range specified
729     /// above.
max_frame_size(&mut self, max: u32) -> &mut Self730     pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
731         self.settings.set_max_frame_size(Some(max));
732         self
733     }
734 
735     /// Sets the max size of received header frames.
736     ///
737     /// This advisory setting informs a peer of the maximum size of header list
738     /// that the sender is prepared to accept, in octets. The value is based on
739     /// the uncompressed size of header fields, including the length of the name
740     /// and value in octets plus an overhead of 32 octets for each header field.
741     ///
742     /// This setting is also used to limit the maximum amount of data that is
743     /// buffered to decode HEADERS frames.
744     ///
745     /// # Examples
746     ///
747     /// ```
748     /// # use tokio::io::{AsyncRead, AsyncWrite};
749     /// # use h2::client::*;
750     /// # use bytes::Bytes;
751     /// #
752     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
753     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
754     /// # {
755     /// // `client_fut` is a future representing the completion of the HTTP/2
756     /// // handshake.
757     /// let client_fut = Builder::new()
758     ///     .max_header_list_size(16 * 1024)
759     ///     .handshake(my_io);
760     /// # client_fut.await
761     /// # }
762     /// #
763     /// # pub fn main() {}
764     /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self765     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
766         self.settings.set_max_header_list_size(Some(max));
767         self
768     }
769 
770     /// Sets the maximum number of concurrent streams.
771     ///
772     /// The maximum concurrent streams setting only controls the maximum number
773     /// of streams that can be initiated by the remote peer. In other words,
774     /// when this setting is set to 100, this does not limit the number of
775     /// concurrent streams that can be created by the caller.
776     ///
777     /// It is recommended that this value be no smaller than 100, so as to not
778     /// unnecessarily limit parallelism. However, any value is legal, including
779     /// 0. If `max` is set to 0, then the remote will not be permitted to
780     /// initiate streams.
781     ///
782     /// Note that streams in the reserved state, i.e., push promises that have
783     /// been reserved but the stream has not started, do not count against this
784     /// setting.
785     ///
786     /// Also note that if the remote *does* exceed the value set here, it is not
787     /// a protocol level error. Instead, the `h2` library will immediately reset
788     /// the stream.
789     ///
790     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
791     ///
792     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
793     ///
794     /// # Examples
795     ///
796     /// ```
797     /// # use tokio::io::{AsyncRead, AsyncWrite};
798     /// # use h2::client::*;
799     /// # use bytes::Bytes;
800     /// #
801     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
802     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
803     /// # {
804     /// // `client_fut` is a future representing the completion of the HTTP/2
805     /// // handshake.
806     /// let client_fut = Builder::new()
807     ///     .max_concurrent_streams(1000)
808     ///     .handshake(my_io);
809     /// # client_fut.await
810     /// # }
811     /// #
812     /// # pub fn main() {}
813     /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self814     pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
815         self.settings.set_max_concurrent_streams(Some(max));
816         self
817     }
818 
819     /// Sets the initial maximum of locally initiated (send) streams.
820     ///
821     /// The initial settings will be overwritten by the remote peer when
822     /// the Settings frame is received. The new value will be set to the
823     /// `max_concurrent_streams()` from the frame.
824     ///
825     /// This setting prevents the caller from exceeding this number of
826     /// streams that are counted towards the concurrency limit.
827     ///
828     /// Sending streams past the limit returned by the peer will be treated
829     /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
830     ///
831     /// See [Section 5.1.2] in the HTTP/2 spec for more details.
832     ///
833     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
834     ///
835     /// # Examples
836     ///
837     /// ```
838     /// # use tokio::io::{AsyncRead, AsyncWrite};
839     /// # use h2::client::*;
840     /// # use bytes::Bytes;
841     /// #
842     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
843     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
844     /// # {
845     /// // `client_fut` is a future representing the completion of the HTTP/2
846     /// // handshake.
847     /// let client_fut = Builder::new()
848     ///     .initial_max_send_streams(1000)
849     ///     .handshake(my_io);
850     /// # client_fut.await
851     /// # }
852     /// #
853     /// # pub fn main() {}
854     /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
        // Kept on the builder itself rather than in `settings`; the value is
        // copied into the connection's `proto::Config` during the handshake.
        self.initial_max_send_streams = initial;
        self
    }
859 
860     /// Sets the maximum number of concurrent locally reset streams.
861     ///
862     /// When a stream is explicitly reset, the HTTP/2 specification requires
863     /// that any further frames received for that stream must be ignored for
864     /// "some time".
865     ///
866     /// In order to satisfy the specification, internal state must be maintained
867     /// to implement the behavior. This state grows linearly with the number of
868     /// streams that are locally reset.
869     ///
    /// The `max_concurrent_reset_streams` setting configures an upper
871     /// bound on the amount of state that is maintained. When this max value is
872     /// reached, the oldest reset stream is purged from memory.
873     ///
874     /// Once the stream has been fully purged from memory, any additional frames
875     /// received for that stream will result in a connection level protocol
876     /// error, forcing the connection to terminate.
877     ///
878     /// The default value is 10.
879     ///
880     /// # Examples
881     ///
882     /// ```
883     /// # use tokio::io::{AsyncRead, AsyncWrite};
884     /// # use h2::client::*;
885     /// # use bytes::Bytes;
886     /// #
887     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
888     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
889     /// # {
890     /// // `client_fut` is a future representing the completion of the HTTP/2
891     /// // handshake.
892     /// let client_fut = Builder::new()
893     ///     .max_concurrent_reset_streams(1000)
894     ///     .handshake(my_io);
895     /// # client_fut.await
896     /// # }
897     /// #
898     /// # pub fn main() {}
899     /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        // Kept on the builder itself rather than in `settings`; the value is
        // copied into the connection's `proto::Config` during the handshake.
        self.reset_stream_max = max;
        self
    }
904 
905     /// Sets the duration to remember locally reset streams.
906     ///
907     /// When a stream is explicitly reset, the HTTP/2 specification requires
908     /// that any further frames received for that stream must be ignored for
909     /// "some time".
910     ///
911     /// In order to satisfy the specification, internal state must be maintained
912     /// to implement the behavior. This state grows linearly with the number of
913     /// streams that are locally reset.
914     ///
915     /// The `reset_stream_duration` setting configures the max amount of time
916     /// this state will be maintained in memory. Once the duration elapses, the
917     /// stream state is purged from memory.
918     ///
919     /// Once the stream has been fully purged from memory, any additional frames
920     /// received for that stream will result in a connection level protocol
921     /// error, forcing the connection to terminate.
922     ///
923     /// The default value is 30 seconds.
924     ///
925     /// # Examples
926     ///
927     /// ```
928     /// # use tokio::io::{AsyncRead, AsyncWrite};
929     /// # use h2::client::*;
930     /// # use std::time::Duration;
931     /// # use bytes::Bytes;
932     /// #
933     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
934     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
935     /// # {
936     /// // `client_fut` is a future representing the completion of the HTTP/2
937     /// // handshake.
938     /// let client_fut = Builder::new()
939     ///     .reset_stream_duration(Duration::from_secs(10))
940     ///     .handshake(my_io);
941     /// # client_fut.await
942     /// # }
943     /// #
944     /// # pub fn main() {}
945     /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        // Kept on the builder itself rather than in `settings`; the value is
        // copied into the connection's `proto::Config` during the handshake.
        self.reset_stream_duration = dur;
        self
    }
950 
951     /// Enables or disables server push promises.
952     ///
    /// This value is included in the initial SETTINGS handshake. When set to
    /// `false`, the server MUST NOT send a push promise. Setting this value to
    /// `false` in the initial SETTINGS handshake guarantees that the remote
    /// server will never send a push promise.
957     ///
958     /// This setting can be changed during the life of a single HTTP/2
959     /// connection by sending another settings frame updating the value.
960     ///
961     /// Default value: `true`.
962     ///
963     /// # Examples
964     ///
965     /// ```
966     /// # use tokio::io::{AsyncRead, AsyncWrite};
967     /// # use h2::client::*;
968     /// # use std::time::Duration;
969     /// # use bytes::Bytes;
970     /// #
971     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
972     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
973     /// # {
974     /// // `client_fut` is a future representing the completion of the HTTP/2
975     /// // handshake.
976     /// let client_fut = Builder::new()
977     ///     .enable_push(false)
978     ///     .handshake(my_io);
979     /// # client_fut.await
980     /// # }
981     /// #
982     /// # pub fn main() {}
983     /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
        // Stored in the settings that are buffered as the initial SETTINGS
        // frame when the handshake runs.
        self.settings.set_enable_push(enabled);
        self
    }
988 
989     /// Sets the first stream ID to something other than 1.
990     #[cfg(feature = "unstable")]
initial_stream_id(&mut self, stream_id: u32) -> &mut Self991     pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
992         self.stream_id = stream_id.into();
993         assert!(
994             self.stream_id.is_client_initiated(),
995             "stream id must be odd"
996         );
997         self
998     }
999 
1000     /// Creates a new configured HTTP/2 client backed by `io`.
1001     ///
1002     /// It is expected that `io` already be in an appropriate state to commence
1003     /// the [HTTP/2 handshake]. The handshake is completed once both the connection
    /// preface and the initial settings frame are sent by the client.
1005     ///
1006     /// The handshake future does not wait for the initial settings frame from the
1007     /// server.
1008     ///
1009     /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1010     /// tuple once the HTTP/2 handshake has been completed.
1011     ///
1012     /// This function also allows the caller to configure the send payload data
1013     /// type. See [Outbound data type] for more details.
1014     ///
1015     /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1016     /// [`Connection`]: struct.Connection.html
1017     /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1019     ///
1020     /// # Examples
1021     ///
1022     /// Basic usage:
1023     ///
1024     /// ```
1025     /// # use tokio::io::{AsyncRead, AsyncWrite};
1026     /// # use h2::client::*;
1027     /// # use bytes::Bytes;
1028     /// #
1029     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1031     /// # {
1032     /// // `client_fut` is a future representing the completion of the HTTP/2
1033     /// // handshake.
1034     /// let client_fut = Builder::new()
1035     ///     .handshake(my_io);
1036     /// # client_fut.await
1037     /// # }
1038     /// #
1039     /// # pub fn main() {}
1040     /// ```
1041     ///
1042     /// Configures the send-payload data type. In this case, the outbound data
1043     /// type will be `&'static [u8]`.
1044     ///
1045     /// ```
1046     /// # use tokio::io::{AsyncRead, AsyncWrite};
1047     /// # use h2::client::*;
1048     /// #
1049     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1050     /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1051     /// # {
1052     /// // `client_fut` is a future representing the completion of the HTTP/2
1053     /// // handshake.
1054     /// let client_fut = Builder::new()
1055     ///     .handshake::<_, &'static [u8]>(my_io);
1056     /// # client_fut.await
1057     /// # }
1058     /// #
1059     /// # pub fn main() {}
1060     /// ```
handshake<T, B>( &self, io: T, ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static,1061     pub fn handshake<T, B>(
1062         &self,
1063         io: T,
1064     ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1065     where
1066         T: AsyncRead + AsyncWrite + Unpin,
1067         B: Buf + 'static,
1068     {
1069         Connection::handshake2(io, self.clone())
1070     }
1071 }
1072 
1073 impl Default for Builder {
default() -> Builder1074     fn default() -> Builder {
1075         Builder::new()
1076     }
1077 }
1078 
1079 /// Creates a new configured HTTP/2 client with default configuration
1080 /// values backed by `io`.
1081 ///
1082 /// It is expected that `io` already be in an appropriate state to commence
1083 /// the [HTTP/2 handshake]. See [Handshake] for more details.
1084 ///
1085 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1086 /// tuple once the HTTP/2 handshake has been completed. The returned
1087 /// [`Connection`] instance will be using default configuration values. Use
1088 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1089 /// instance.
1090 ///
1091 /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1092 /// [Handshake]: ../index.html#handshake
1093 /// [`Connection`]: struct.Connection.html
1094 /// [`SendRequest`]: struct.SendRequest.html
1095 ///
1096 /// # Examples
1097 ///
1098 /// ```
1099 /// # use tokio::io::{AsyncRead, AsyncWrite};
1100 /// # use h2::client;
1101 /// # use h2::client::*;
1102 /// #
1103 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1104 /// # {
1105 /// let (send_request, connection) = client::handshake(my_io).await?;
1106 /// // The HTTP/2 handshake has completed, now start polling
1107 /// // `connection` and use `send_request` to send requests to the
1108 /// // server.
1109 /// # Ok(())
1110 /// # }
1111 /// #
1112 /// # pub fn main() {}
1113 /// ```
handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> where T: AsyncRead + AsyncWrite + Unpin,1114 pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1115 where
1116     T: AsyncRead + AsyncWrite + Unpin,
1117 {
1118     let builder = Builder::new();
1119     builder
1120         .handshake(io)
1121         .instrument(tracing::trace_span!("client_handshake", io = %std::any::type_name::<T>()))
1122         .await
1123 }
1124 
1125 // ===== impl Connection =====
1126 
bind_connection<T>(io: &mut T) -> Result<(), crate::Error> where T: AsyncRead + AsyncWrite + Unpin,1127 async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1128 where
1129     T: AsyncRead + AsyncWrite + Unpin,
1130 {
1131     tracing::debug!("binding client connection");
1132 
1133     let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1134     io.write_all(msg).await.map_err(crate::Error::from_io)?;
1135 
1136     tracing::debug!("client connection bound");
1137 
1138     Ok(())
1139 }
1140 
1141 impl<T, B> Connection<T, B>
1142 where
1143     T: AsyncRead + AsyncWrite + Unpin,
1144     B: Buf + 'static,
1145 {
handshake2( mut io: T, builder: Builder, ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error>1146     async fn handshake2(
1147         mut io: T,
1148         builder: Builder,
1149     ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1150         bind_connection(&mut io).await?;
1151 
1152         // Create the codec
1153         let mut codec = Codec::new(io);
1154 
1155         if let Some(max) = builder.settings.max_frame_size() {
1156             codec.set_max_recv_frame_size(max as usize);
1157         }
1158 
1159         if let Some(max) = builder.settings.max_header_list_size() {
1160             codec.set_max_recv_header_list_size(max as usize);
1161         }
1162 
1163         // Send initial settings frame
1164         codec
1165             .buffer(builder.settings.clone().into())
1166             .expect("invalid SETTINGS frame");
1167 
1168         let inner = proto::Connection::new(
1169             codec,
1170             proto::Config {
1171                 next_stream_id: builder.stream_id,
1172                 initial_max_send_streams: builder.initial_max_send_streams,
1173                 reset_stream_duration: builder.reset_stream_duration,
1174                 reset_stream_max: builder.reset_stream_max,
1175                 settings: builder.settings.clone(),
1176             },
1177         );
1178         let send_request = SendRequest {
1179             inner: inner.streams().clone(),
1180             pending: None,
1181         };
1182 
1183         let mut connection = Connection { inner };
1184         if let Some(sz) = builder.initial_target_connection_window_size {
1185             connection.set_target_window_size(sz);
1186         }
1187 
1188         Ok((send_request, connection))
1189     }
1190 
    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See the [library level] flow control documentation and the
    /// [`FlowControl`] documentation for more details.
    ///
    /// # Panics
    ///
    /// Panics if `size` is greater than the maximum window size supported by
    /// the library.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_target_window_size(size);
    }
1212 
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    ///
    /// # Panics
    ///
    /// Panics if `size` is greater than the maximum window size supported by
    /// the library.
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.inner.set_initial_window_size(size)?;
        Ok(())
    }
1231 
1232     /// Takes a `PingPong` instance from the connection.
1233     ///
1234     /// # Note
1235     ///
1236     /// This may only be called once. Calling multiple times will return `None`.
ping_pong(&mut self) -> Option<PingPong>1237     pub fn ping_pong(&mut self) -> Option<PingPong> {
1238         self.inner.take_user_pings().map(PingPong::new)
1239     }
1240 
    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this client.
    ///
    /// This limit is configured by the server peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.inner.max_send_streams()
    }
1253 
    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.inner.max_recv_streams()
    }
1268 }
1269 
1270 impl<T, B> Future for Connection<T, B>
1271 where
1272     T: AsyncRead + AsyncWrite + Unpin,
1273     B: Buf + 'static,
1274 {
1275     type Output = Result<(), crate::Error>;
1276 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1277     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1278         self.inner.maybe_close_connection_if_no_streams();
1279         self.inner.poll(cx).map_err(Into::into)
1280     }
1281 }
1282 
1283 impl<T, B> fmt::Debug for Connection<T, B>
1284 where
1285     T: AsyncRead + AsyncWrite,
1286     T: fmt::Debug,
1287     B: fmt::Debug + Buf,
1288 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1289     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1290         fmt::Debug::fmt(&self.inner, fmt)
1291     }
1292 }
1293 
1294 // ===== impl ResponseFuture =====
1295 
1296 impl Future for ResponseFuture {
1297     type Output = Result<Response<RecvStream>, crate::Error>;
1298 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1299     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1300         let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1301         let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1302 
1303         Poll::Ready(Ok(Response::from_parts(parts, body)))
1304     }
1305 }
1306 
1307 impl ResponseFuture {
1308     /// Returns the stream ID of the response stream.
1309     ///
1310     /// # Panics
1311     ///
1312     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1313     pub fn stream_id(&self) -> crate::StreamId {
1314         crate::StreamId::from_internal(self.inner.stream_id())
1315     }
1316     /// Returns a stream of PushPromises
1317     ///
1318     /// # Panics
1319     ///
1320     /// If this method has been called before
1321     /// or the stream was itself was pushed
push_promises(&mut self) -> PushPromises1322     pub fn push_promises(&mut self) -> PushPromises {
1323         if self.push_promise_consumed {
1324             panic!("Reference to push promises stream taken!");
1325         }
1326         self.push_promise_consumed = true;
1327         PushPromises {
1328             inner: self.inner.clone(),
1329         }
1330     }
1331 }
1332 
1333 // ===== impl PushPromises =====
1334 
1335 impl PushPromises {
1336     /// Get the next `PushPromise`.
push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>>1337     pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1338         futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
1339     }
1340 
1341     #[doc(hidden)]
poll_push_promise( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<PushPromise, crate::Error>>>1342     pub fn poll_push_promise(
1343         &mut self,
1344         cx: &mut Context<'_>,
1345     ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1346         match self.inner.poll_pushed(cx) {
1347             Poll::Ready(Some(Ok((request, response)))) => {
1348                 let response = PushedResponseFuture {
1349                     inner: ResponseFuture {
1350                         inner: response,
1351                         push_promise_consumed: false,
1352                     },
1353                 };
1354                 Poll::Ready(Some(Ok(PushPromise { request, response })))
1355             }
1356             Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1357             Poll::Ready(None) => Poll::Ready(None),
1358             Poll::Pending => Poll::Pending,
1359         }
1360     }
1361 }
1362 
#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    /// Delegates to [`PushPromises::poll_push_promise`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.get_mut().poll_push_promise(cx)
    }
}
1371 
1372 // ===== impl PushPromise =====
1373 
1374 impl PushPromise {
1375     /// Returns a reference to the push promise's request headers.
request(&self) -> &Request<()>1376     pub fn request(&self) -> &Request<()> {
1377         &self.request
1378     }
1379 
1380     /// Returns a mutable reference to the push promise's request headers.
request_mut(&mut self) -> &mut Request<()>1381     pub fn request_mut(&mut self) -> &mut Request<()> {
1382         &mut self.request
1383     }
1384 
1385     /// Consumes `self`, returning the push promise's request headers and
1386     /// response future.
into_parts(self) -> (Request<()>, PushedResponseFuture)1387     pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1388         (self.request, self.response)
1389     }
1390 }
1391 
1392 // ===== impl PushedResponseFuture =====
1393 
1394 impl Future for PushedResponseFuture {
1395     type Output = Result<Response<RecvStream>, crate::Error>;
1396 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1397     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1398         Pin::new(&mut self.inner).poll(cx)
1399     }
1400 }
1401 
1402 impl PushedResponseFuture {
1403     /// Returns the stream ID of the response stream.
1404     ///
1405     /// # Panics
1406     ///
1407     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1408     pub fn stream_id(&self) -> crate::StreamId {
1409         self.inner.stream_id()
1410     }
1411 }
1412 
1413 // ===== impl Peer =====
1414 
1415 impl Peer {
convert_send_message( id: StreamId, request: Request<()>, end_of_stream: bool, ) -> Result<Headers, SendError>1416     pub fn convert_send_message(
1417         id: StreamId,
1418         request: Request<()>,
1419         end_of_stream: bool,
1420     ) -> Result<Headers, SendError> {
1421         use http::request::Parts;
1422 
1423         let (
1424             Parts {
1425                 method,
1426                 uri,
1427                 headers,
1428                 version,
1429                 ..
1430             },
1431             _,
1432         ) = request.into_parts();
1433 
1434         let is_connect = method == Method::CONNECT;
1435 
1436         // Build the set pseudo header set. All requests will include `method`
1437         // and `path`.
1438         let mut pseudo = Pseudo::request(method, uri);
1439 
1440         if pseudo.scheme.is_none() {
1441             // If the scheme is not set, then there are a two options.
1442             //
1443             // 1) Authority is not set. In this case, a request was issued with
1444             //    a relative URI. This is permitted **only** when forwarding
1445             //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1446             //    this is an error.
1447             //
1448             // 2) Authority is set, then the HTTP method *must* be CONNECT.
1449             //
1450             // It is not possible to have a scheme but not an authority set (the
1451             // `http` crate does not allow it).
1452             //
1453             if pseudo.authority.is_none() {
1454                 if version == Version::HTTP_2 {
1455                     return Err(UserError::MissingUriSchemeAndAuthority.into());
1456                 } else {
1457                     // This is acceptable as per the above comment. However,
1458                     // HTTP/2 requires that a scheme is set. Since we are
1459                     // forwarding an HTTP 1.1 request, the scheme is set to
1460                     // "http".
1461                     pseudo.set_scheme(uri::Scheme::HTTP);
1462                 }
1463             } else if !is_connect {
1464                 // TODO: Error
1465             }
1466         }
1467 
1468         // Create the HEADERS frame
1469         let mut frame = Headers::new(id, pseudo, headers);
1470 
1471         if end_of_stream {
1472             frame.set_end_stream()
1473         }
1474 
1475         Ok(frame)
1476     }
1477 }
1478 
1479 impl proto::Peer for Peer {
1480     type Poll = Response<()>;
1481 
1482     const NAME: &'static str = "Client";
1483 
1484     fn r#dyn() -> proto::DynPeer {
1485         proto::DynPeer::Client
1486     }
1487 
is_server() -> bool1488     fn is_server() -> bool {
1489         false
1490     }
1491 
convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result<Self::Poll, Error>1492     fn convert_poll_message(
1493         pseudo: Pseudo,
1494         fields: HeaderMap,
1495         stream_id: StreamId,
1496     ) -> Result<Self::Poll, Error> {
1497         let mut b = Response::builder();
1498 
1499         b = b.version(Version::HTTP_2);
1500 
1501         if let Some(status) = pseudo.status {
1502             b = b.status(status);
1503         }
1504 
1505         let mut response = match b.body(()) {
1506             Ok(response) => response,
1507             Err(_) => {
1508                 // TODO: Should there be more specialized handling for different
1509                 // kinds of errors
1510                 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1511             }
1512         };
1513 
1514         *response.headers_mut() = fields;
1515 
1516         Ok(response)
1517     }
1518 }
1519