1 //! Client implementation of the HTTP/2.0 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2.0 client requires the caller to establish the underlying
6 //! connection as well as get the connection to a state that is ready to begin
7 //! the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11 //! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12 //!
13 //! Once a connection is obtained, it is passed to [`handshake`], which will
14 //! begin the [HTTP/2.0 handshake]. This returns a future that completes once
15 //! the handshake process is performed and HTTP/2.0 streams may be initialized.
16 //!
17 //! [`handshake`] uses default configuration values. There are a number of
18 //! settings that can be changed by using [`Builder`] instead.
19 //!
20 //! Once the handshake future completes, the caller is provided with a
21 //! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22 //! instance is used to drive the connection (see [Managing the connection]).
23 //! The [`SendRequest`] instance is used to initialize new streams (see [Making
24 //! requests]).
25 //!
26 //! # Making requests
27 //!
28 //! Requests are made using the [`SendRequest`] handle provided by the handshake
29 //! future. Once a request is submitted, an HTTP/2.0 stream is initialized and
30 //! the request is sent to the server.
31 //!
//! A request body and request trailers are sent using [`SendStream`] and the
//! server's response is returned once the [`ResponseFuture`] future completes.
34 //! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35 //! [`SendRequest::send_request`] and are tied to the HTTP/2.0 stream
36 //! initialized by the sent request.
37 //!
38 //! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2.0
39 //! stream can be created, i.e. as long as the current number of active streams
40 //! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41 //! caller will be notified once an existing stream closes, freeing capacity for
42 //! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43 //! capacity before sending a request to the server.
44 //!
45 //! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46 //! must not send a request if `poll_ready` does not return `Ready`. Attempting
47 //! to do so will result in an [`Error`] being returned.
48 //!
49 //! # Managing the connection
50 //!
51 //! The [`Connection`] instance is used to manage connection state. The caller
52 //! is required to call [`Connection::poll`] in order to advance state.
53 //! [`SendRequest::send_request`] and other functions have no effect unless
54 //! [`Connection::poll`] is called.
55 //!
56 //! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57 //! returns `Ready`. At this point, the underlying socket has been closed and no
58 //! further work needs to be done.
59 //!
60 //! The easiest way to ensure that the [`Connection`] instance gets polled is to
61 //! submit the [`Connection`] instance to an [executor]. The executor will then
62 //! manage polling the connection until the connection is complete.
63 //! Alternatively, the caller can call `poll` manually.
64 //!
65 //! # Example
66 //!
67 //! ```rust, no_run
68 //!
69 //! use h2::client;
70 //!
71 //! use http::{Request, Method};
72 //! use std::error::Error;
73 //! use tokio::net::TcpStream;
74 //!
75 //! #[tokio::main]
76 //! pub async fn main() -> Result<(), Box<dyn Error>> {
77 //!     // Establish TCP connection to the server.
78 //!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79 //!     let (h2, connection) = client::handshake(tcp).await?;
80 //!     tokio::spawn(async move {
81 //!         connection.await.unwrap();
82 //!     });
83 //!
84 //!     let mut h2 = h2.ready().await?;
85 //!     // Prepare the HTTP request to send to the server.
86 //!     let request = Request::builder()
87 //!                     .method(Method::GET)
88 //!                     .uri("https://www.example.com/")
89 //!                     .body(())
90 //!                     .unwrap();
91 //!
92 //!     // Send the request. The second tuple item allows the caller
93 //!     // to stream a request body.
94 //!     let (response, _) = h2.send_request(request, true).unwrap();
95 //!
96 //!     let (head, mut body) = response.await?.into_parts();
97 //!
98 //!     println!("Received response: {:?}", head);
99 //!
100 //!     // The `flow_control` handle allows the caller to manage
101 //!     // flow control.
102 //!     //
103 //!     // Whenever data is received, the caller is responsible for
104 //!     // releasing capacity back to the server once it has freed
105 //!     // the data from memory.
106 //!     let mut flow_control = body.flow_control().clone();
107 //!
108 //!     while let Some(chunk) = body.data().await {
109 //!         let chunk = chunk?;
110 //!         println!("RX: {:?}", chunk);
111 //!
112 //!         // Let the server send more data.
113 //!         let _ = flow_control.release_capacity(chunk.len());
114 //!     }
115 //!
116 //!     Ok(())
117 //! }
118 //! ```
119 //!
//! [`TcpStream`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html
121 //! [`handshake`]: fn.handshake.html
122 //! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123 //! [`SendRequest`]: struct.SendRequest.html
124 //! [`SendStream`]: ../struct.SendStream.html
125 //! [Making requests]: #making-requests
126 //! [Managing the connection]: #managing-the-connection
127 //! [`Connection`]: struct.Connection.html
128 //! [`Connection::poll`]: struct.Connection.html#method.poll
129 //! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130 //! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131 //! [`SendRequest`]: struct.SendRequest.html
132 //! [`ResponseFuture`]: struct.ResponseFuture.html
133 //! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135 //! [`Builder`]: struct.Builder.html
136 //! [`Error`]: ../struct.Error.html
137 
138 use crate::codec::{Codec, RecvError, SendError, UserError};
139 use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
140 use crate::proto;
141 use crate::{FlowControl, PingPong, RecvStream, SendStream};
142 
143 use bytes::{Buf, Bytes};
144 use http::{uri, HeaderMap, Method, Request, Response, Version};
145 use std::fmt;
146 use std::future::Future;
147 use std::pin::Pin;
148 use std::task::{Context, Poll};
149 use std::time::Duration;
150 use std::usize;
151 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
152 
153 /// Initializes new HTTP/2.0 streams on a connection by sending a request.
154 ///
155 /// This type does no work itself. Instead, it is a handle to the inner
156 /// connection state held by [`Connection`]. If the associated connection
157 /// instance is dropped, all `SendRequest` functions will return [`Error`].
158 ///
159 /// [`SendRequest`] instances are able to move to and operate on separate tasks
160 /// / threads than their associated [`Connection`] instance. Internally, there
161 /// is a buffer used to stage requests before they get written to the
162 /// connection. There is no guarantee that requests get written to the
163 /// connection in FIFO order as HTTP/2.0 prioritization logic can play a role.
164 ///
165 /// [`SendRequest`] implements [`Clone`], enabling the creation of many
166 /// instances that are backed by a single connection.
167 ///
168 /// See [module] level documentation for more details.
169 ///
170 /// [module]: index.html
171 /// [`Connection`]: struct.Connection.html
172 /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
173 /// [`Error`]: ../struct.Error.html
pub struct SendRequest<B: Buf> {
    /// Handle to the stream state this `SendRequest` operates on. Readiness
    /// checks and new requests are delegated to it.
    inner: proto::Streams<B, Peer>,
    /// The stream created by the most recent `send_request` call, recorded
    /// only while that stream reports `is_pending_open()`. `poll_ready` passes
    /// it to the stream state and resets this to `None` once ready. Clones
    /// always start with `None`.
    pending: Option<proto::OpaqueStreamRef>,
}
178 
179 /// Returns a `SendRequest` instance once it is ready to send at least one
180 /// request.
181 #[derive(Debug)]
182 pub struct ReadySendRequest<B: Buf> {
183     inner: Option<SendRequest<B>>,
184 }
185 
186 /// Manages all state associated with an HTTP/2.0 client connection.
187 ///
188 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
189 /// implements the HTTP/2.0 client logic for that connection. It is responsible
190 /// for driving the internal state forward, performing the work requested of the
191 /// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
192 /// [`RecvStream`]).
193 ///
194 /// `Connection` values are created by calling [`handshake`]. Once a
195 /// `Connection` value is obtained, the caller must repeatedly call [`poll`]
196 /// until `Ready` is returned. The easiest way to do this is to submit the
197 /// `Connection` instance to an [executor].
198 ///
199 /// [module]: index.html
200 /// [`handshake`]: fn.handshake.html
201 /// [`SendRequest`]: struct.SendRequest.html
202 /// [`ResponseFuture`]: struct.ResponseFuture.html
203 /// [`SendStream`]: ../struct.SendStream.html
204 /// [`RecvStream`]: ../struct.RecvStream.html
205 /// [`poll`]: #method.poll
206 /// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// # use tokio::io::{AsyncRead, AsyncWrite};
212 /// # use h2::client;
213 /// # use h2::client::*;
214 /// #
215 /// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
216 /// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
217 /// # {
218 ///     let (send_request, connection) = client::handshake(my_io).await?;
219 ///     // Submit the connection handle to an executor.
220 ///     tokio::spawn(async { connection.await.expect("connection failed"); });
221 ///
222 ///     // Now, use `send_request` to initialize HTTP/2.0 streams.
223 ///     // ...
224 /// # Ok(())
225 /// # }
226 /// #
227 /// # pub fn main() {}
228 /// ```
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B: Buf = Bytes> {
    /// Protocol-level connection state, specialized for the client side via
    /// the `Peer` marker type. `T` is the I/O resource backing the connection
    /// and `B` is the `Buf` type shared with the associated `SendRequest`
    /// handle (defaults to `Bytes`).
    inner: proto::Connection<T, Peer, B>,
}
233 
234 /// A future of an HTTP response.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    /// Opaque reference to the stream the request was sent on.
    inner: proto::OpaqueStreamRef,
    /// Starts out `false` when the future is created by
    /// `SendRequest::send_request`.
    // NOTE(review): presumably set to `true` once the push promises of this
    // stream have been handed to the caller -- confirm against the
    // `ResponseFuture` impl, which is not visible from here.
    push_promise_consumed: bool,
}
241 
242 /// A future of a pushed HTTP response.
243 ///
244 /// We have to differentiate between pushed and non pushed because of the spec
245 /// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
246 /// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
247 /// > that is in either the "open" or "half-closed (remote)" state.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PushedResponseFuture {
    /// The wrapped response future. Keeping it behind a distinct type is what
    /// differentiates a pushed response from a regular one at the type level.
    inner: ResponseFuture,
}
253 
254 /// A pushed response and corresponding request headers
#[derive(Debug)]
pub struct PushPromise {
    /// The request headers of the promised request. The body type is `()`,
    /// so only the request head is carried here.
    request: Request<()>,

    /// Future of the pushed response that corresponds to `request`.
    response: PushedResponseFuture,
}
263 
264 /// A stream of pushed responses and corresponding promised requests
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct PushPromises {
    /// Opaque reference to a stream.
    // NOTE(review): presumably the request stream on which the server's push
    // promises arrive -- confirm against the `PushPromises` impl.
    inner: proto::OpaqueStreamRef,
}
270 
271 /// Builds client connections with custom configuration values.
272 ///
273 /// Methods can be chained in order to set the configuration values.
274 ///
/// The client is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2.0 connection.
277 ///
278 /// New instances of `Builder` are obtained via [`Builder::new`].
279 ///
280 /// See function level documentation for details on the various client
281 /// configuration settings.
282 ///
283 /// [`Builder::new`]: struct.Builder.html#method.new
284 /// [`handshake`]: struct.Builder.html#method.handshake
285 ///
286 /// # Examples
287 ///
288 /// ```
289 /// # use tokio::io::{AsyncRead, AsyncWrite};
290 /// # use h2::client::*;
291 /// # use bytes::Bytes;
292 /// #
293 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
295 /// # {
296 /// // `client_fut` is a future representing the completion of the HTTP/2.0
297 /// // handshake.
298 /// let client_fut = Builder::new()
299 ///     .initial_window_size(1_000_000)
300 ///     .max_concurrent_streams(1000)
301 ///     .handshake(my_io);
302 /// # client_fut.await
303 /// # }
304 /// #
305 /// # pub fn main() {}
306 /// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    ///
    /// `Builder::new` initializes this to `proto::DEFAULT_RESET_STREAM_SECS`
    /// seconds.
    reset_stream_duration: Duration,

    /// Initial maximum number of locally initiated (send) streams.
    /// After receiving a Settings frame from the remote peer,
    /// the connection will overwrite this value with the
    /// MAX_CONCURRENT_STREAMS specified in the frame.
    ///
    /// `Builder::new` initializes this to `usize::MAX`.
    initial_max_send_streams: usize,

    /// Initial target window size for new connections.
    ///
    /// `None` (the value set by `Builder::new`) until the caller provides one
    /// through `initial_connection_window_size`.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum number of locally reset streams to keep at a time.
    ///
    /// `Builder::new` initializes this to `proto::DEFAULT_RESET_STREAM_MAX`.
    reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    ///
    /// Starts as `Settings::default()`. The window size, frame size, header
    /// list size and concurrent stream setters write into this value.
    settings: Settings,

    /// The stream ID of the first (lowest) stream. Subsequent streams will use
    /// monotonically increasing stream IDs.
    ///
    /// `Builder::new` initializes this to `1`.
    stream_id: StreamId,
}
331 
/// Marker type that the client types in this module pass as the peer
/// parameter of `proto::Streams` and `proto::Connection`.
#[derive(Debug)]
pub(crate) struct Peer;
334 
335 // ===== impl SendRequest =====
336 
337 impl<B> SendRequest<B>
338 where
339     B: Buf + 'static,
340 {
341     /// Returns `Ready` when the connection can initialize a new HTTP/2.0
342     /// stream.
343     ///
344     /// This function must return `Ready` before `send_request` is called. When
345     /// `Poll::Pending` is returned, the task will be notified once the readiness
346     /// state changes.
347     ///
348     /// See [module] level docs for more details.
349     ///
350     /// [module]: index.html
poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>351     pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
352         ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
353         self.pending = None;
354         Poll::Ready(Ok(()))
355     }
356 
357     /// Consumes `self`, returning a future that returns `self` back once it is
358     /// ready to send a request.
359     ///
360     /// This function should be called before calling `send_request`.
361     ///
    /// This is a functional combinator for [`poll_ready`]. The returned future
    /// will call `SendRequest::poll_ready` until `Ready`, then returns `self` to
    /// the caller.
365     ///
366     /// # Examples
367     ///
368     /// ```rust
369     /// # use h2::client::*;
370     /// # use http::*;
371     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
372     /// # {
373     /// // First, wait until the `send_request` handle is ready to send a new
374     /// // request
375     /// let mut send_request = send_request.ready().await.unwrap();
376     /// // Use `send_request` here.
377     /// # }
378     /// # pub fn main() {}
379     /// ```
380     ///
381     /// See [module] level docs for more details.
382     ///
383     /// [`poll_ready`]: #method.poll_ready
384     /// [module]: index.html
ready(self) -> ReadySendRequest<B>385     pub fn ready(self) -> ReadySendRequest<B> {
386         ReadySendRequest { inner: Some(self) }
387     }
388 
389     /// Sends a HTTP/2.0 request to the server.
390     ///
391     /// `send_request` initializes a new HTTP/2.0 stream on the associated
392     /// connection, then sends the given request using this new stream. Only the
393     /// request head is sent.
394     ///
395     /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
396     /// are returned. The [`ResponseFuture`] instance is used to get the
397     /// server's response and the [`SendStream`] instance is used to send a
398     /// request body or trailers to the server over the same HTTP/2.0 stream.
399     ///
400     /// To send a request body or trailers, set `end_of_stream` to `false`.
401     /// Then, use the returned [`SendStream`] instance to stream request body
402     /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
403     /// then attempting to call [`SendStream::send_data`] or
404     /// [`SendStream::send_trailers`] will result in an error.
405     ///
406     /// If no request body or trailers are to be sent, set `end_of_stream` to
407     /// `true` and drop the returned [`SendStream`] instance.
408     ///
409     /// # A note on HTTP versions
410     ///
411     /// The provided `Request` will be encoded differently depending on the
    /// value of its version field. If the version is set to 2.0, then the
    /// request is encoded as the specification recommends.
414     ///
415     /// If the version is set to a lower value, then the request is encoded to
416     /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
417     /// headers are permitted and the `:authority` pseudo header is not
418     /// included.
419     ///
420     /// The caller should always set the request's version field to 2.0 unless
421     /// specifically transmitting an HTTP 1.1 request over 2.0.
422     ///
423     /// # Examples
424     ///
425     /// Sending a request with no body
426     ///
427     /// ```rust
428     /// # use h2::client::*;
429     /// # use http::*;
430     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
431     /// # {
432     /// // First, wait until the `send_request` handle is ready to send a new
433     /// // request
434     /// let mut send_request = send_request.ready().await.unwrap();
435     /// // Prepare the HTTP request to send to the server.
436     /// let request = Request::get("https://www.example.com/")
437     ///     .body(())
438     ///     .unwrap();
439     ///
440     /// // Send the request to the server. Since we are not sending a
441     /// // body or trailers, we can drop the `SendStream` instance.
442     /// let (response, _) = send_request.send_request(request, true).unwrap();
443     /// let response = response.await.unwrap();
444     /// // Process the response
445     /// # }
446     /// # pub fn main() {}
447     /// ```
448     ///
449     /// Sending a request with a body and trailers
450     ///
451     /// ```rust
452     /// # use h2::client::*;
453     /// # use http::*;
454     /// # async fn doc(send_request: SendRequest<&'static [u8]>)
455     /// # {
456     /// // First, wait until the `send_request` handle is ready to send a new
457     /// // request
458     /// let mut send_request = send_request.ready().await.unwrap();
459     ///
460     /// // Prepare the HTTP request to send to the server.
461     /// let request = Request::get("https://www.example.com/")
462     ///     .body(())
463     ///     .unwrap();
464     ///
    /// // Send the request to the server. Since we are sending a body and
    /// // trailers, we keep the `SendStream` instance to stream them.
467     /// let (response, mut send_stream) = send_request
468     ///     .send_request(request, false).unwrap();
469     ///
470     /// // At this point, one option would be to wait for send capacity.
471     /// // Doing so would allow us to not hold data in memory that
472     /// // cannot be sent. However, this is not a requirement, so this
473     /// // example will skip that step. See `SendStream` documentation
474     /// // for more details.
475     /// send_stream.send_data(b"hello", false).unwrap();
476     /// send_stream.send_data(b"world", false).unwrap();
477     ///
478     /// // Send the trailers.
479     /// let mut trailers = HeaderMap::new();
480     /// trailers.insert(
481     ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
482     ///     header::HeaderValue::from_bytes(b"hello").unwrap());
483     ///
484     /// send_stream.send_trailers(trailers).unwrap();
485     ///
486     /// let response = response.await.unwrap();
487     /// // Process the response
488     /// # }
489     /// # pub fn main() {}
490     /// ```
491     ///
492     /// [`ResponseFuture`]: struct.ResponseFuture.html
493     /// [`SendStream`]: ../struct.SendStream.html
494     /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
495     /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
send_request( &mut self, request: Request<()>, end_of_stream: bool, ) -> Result<(ResponseFuture, SendStream<B>), crate::Error>496     pub fn send_request(
497         &mut self,
498         request: Request<()>,
499         end_of_stream: bool,
500     ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
501         self.inner
502             .send_request(request, end_of_stream, self.pending.as_ref())
503             .map_err(Into::into)
504             .map(|stream| {
505                 if stream.is_pending_open() {
506                     self.pending = Some(stream.clone_to_opaque());
507                 }
508 
509                 let response = ResponseFuture {
510                     inner: stream.clone_to_opaque(),
511                     push_promise_consumed: false,
512                 };
513 
514                 let stream = SendStream::new(stream);
515 
516                 (response, stream)
517             })
518     }
519 }
520 
521 impl<B> fmt::Debug for SendRequest<B>
522 where
523     B: Buf,
524 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result525     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
526         fmt.debug_struct("SendRequest").finish()
527     }
528 }
529 
530 impl<B> Clone for SendRequest<B>
531 where
532     B: Buf,
533 {
clone(&self) -> Self534     fn clone(&self) -> Self {
535         SendRequest {
536             inner: self.inner.clone(),
537             pending: None,
538         }
539     }
540 }
541 
#[cfg(feature = "unstable")]
impl<B> SendRequest<B>
where
    B: Buf,
{
    /// Returns the number of active streams.
    ///
    /// An active stream is a stream that has not yet transitioned to a closed
    /// state.
    ///
    /// This method is only compiled when the `unstable` crate feature is
    /// enabled. The count is read from the stream state behind this handle.
    pub fn num_active_streams(&self) -> usize {
        self.inner.num_active_streams()
    }

    /// Returns the number of streams that are held in memory.
    ///
    /// A wired stream is a stream that is either active or is closed but must
    /// stay in memory for some reason. For example, there are still outstanding
    /// userspace handles pointing to the slot.
    ///
    /// This method is only compiled when the `unstable` crate feature is
    /// enabled. The count is read from the stream state behind this handle.
    pub fn num_wired_streams(&self) -> usize {
        self.inner.num_wired_streams()
    }
}
564 
565 // ===== impl ReadySendRequest =====
566 
567 impl<B> Future for ReadySendRequest<B>
568 where
569     B: Buf + 'static,
570 {
571     type Output = Result<SendRequest<B>, crate::Error>;
572 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>573     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
574         match &mut self.inner {
575             Some(send_request) => {
576                 ready!(send_request.poll_ready(cx))?;
577             }
578             None => panic!("called `poll` after future completed"),
579         }
580 
581         Poll::Ready(Ok(self.inner.take().unwrap()))
582     }
583 }
584 
585 // ===== impl Builder =====
586 
587 impl Builder {
588     /// Returns a new client builder instance initialized with default
589     /// configuration values.
590     ///
591     /// Configuration methods can be chained on the return value.
592     ///
593     /// # Examples
594     ///
595     /// ```
596     /// # use tokio::io::{AsyncRead, AsyncWrite};
597     /// # use h2::client::*;
598     /// # use bytes::Bytes;
599     /// #
600     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
601     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
602     /// # {
603     /// // `client_fut` is a future representing the completion of the HTTP/2.0
604     /// // handshake.
605     /// let client_fut = Builder::new()
606     ///     .initial_window_size(1_000_000)
607     ///     .max_concurrent_streams(1000)
608     ///     .handshake(my_io);
609     /// # client_fut.await
610     /// # }
611     /// #
612     /// # pub fn main() {}
613     /// ```
new() -> Builder614     pub fn new() -> Builder {
615         Builder {
616             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
617             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
618             initial_target_connection_window_size: None,
619             initial_max_send_streams: usize::MAX,
620             settings: Default::default(),
621             stream_id: 1.into(),
622         }
623     }
624 
625     /// Indicates the initial window size (in octets) for stream-level
626     /// flow control for received data.
627     ///
628     /// The initial window of a stream is used as part of flow control. For more
629     /// details, see [`FlowControl`].
630     ///
631     /// The default value is 65,535.
632     ///
633     /// [`FlowControl`]: ../struct.FlowControl.html
634     ///
635     /// # Examples
636     ///
637     /// ```
638     /// # use tokio::io::{AsyncRead, AsyncWrite};
639     /// # use h2::client::*;
640     /// # use bytes::Bytes;
641     /// #
642     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
643     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
644     /// # {
645     /// // `client_fut` is a future representing the completion of the HTTP/2.0
646     /// // handshake.
647     /// let client_fut = Builder::new()
648     ///     .initial_window_size(1_000_000)
649     ///     .handshake(my_io);
650     /// # client_fut.await
651     /// # }
652     /// #
653     /// # pub fn main() {}
654     /// ```
initial_window_size(&mut self, size: u32) -> &mut Self655     pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
656         self.settings.set_initial_window_size(Some(size));
657         self
658     }
659 
660     /// Indicates the initial window size (in octets) for connection-level flow control
661     /// for received data.
662     ///
663     /// The initial window of a connection is used as part of flow control. For more details,
664     /// see [`FlowControl`].
665     ///
666     /// The default value is 65,535.
667     ///
668     /// [`FlowControl`]: ../struct.FlowControl.html
669     ///
670     /// # Examples
671     ///
672     /// ```
673     /// # use tokio::io::{AsyncRead, AsyncWrite};
674     /// # use h2::client::*;
675     /// # use bytes::Bytes;
676     /// #
677     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
678     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
679     /// # {
680     /// // `client_fut` is a future representing the completion of the HTTP/2.0
681     /// // handshake.
682     /// let client_fut = Builder::new()
683     ///     .initial_connection_window_size(1_000_000)
684     ///     .handshake(my_io);
685     /// # client_fut.await
686     /// # }
687     /// #
688     /// # pub fn main() {}
689     /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self690     pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
691         self.initial_target_connection_window_size = Some(size);
692         self
693     }
694 
695     /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
696     /// configured client is able to accept.
697     ///
698     /// The sender may send data frames that are **smaller** than this value,
699     /// but any data larger than `max` will be broken up into multiple `DATA`
700     /// frames.
701     ///
702     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
703     ///
704     /// # Examples
705     ///
706     /// ```
707     /// # use tokio::io::{AsyncRead, AsyncWrite};
708     /// # use h2::client::*;
709     /// # use bytes::Bytes;
710     /// #
711     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
712     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
713     /// # {
714     /// // `client_fut` is a future representing the completion of the HTTP/2.0
715     /// // handshake.
716     /// let client_fut = Builder::new()
717     ///     .max_frame_size(1_000_000)
718     ///     .handshake(my_io);
719     /// # client_fut.await
720     /// # }
721     /// #
722     /// # pub fn main() {}
723     /// ```
724     ///
725     /// # Panics
726     ///
727     /// This function panics if `max` is not within the legal range specified
728     /// above.
max_frame_size(&mut self, max: u32) -> &mut Self729     pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
730         self.settings.set_max_frame_size(Some(max));
731         self
732     }
733 
734     /// Sets the max size of received header frames.
735     ///
736     /// This advisory setting informs a peer of the maximum size of header list
737     /// that the sender is prepared to accept, in octets. The value is based on
738     /// the uncompressed size of header fields, including the length of the name
739     /// and value in octets plus an overhead of 32 octets for each header field.
740     ///
741     /// This setting is also used to limit the maximum amount of data that is
742     /// buffered to decode HEADERS frames.
743     ///
744     /// # Examples
745     ///
746     /// ```
747     /// # use tokio::io::{AsyncRead, AsyncWrite};
748     /// # use h2::client::*;
749     /// # use bytes::Bytes;
750     /// #
751     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
752     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
753     /// # {
754     /// // `client_fut` is a future representing the completion of the HTTP/2.0
755     /// // handshake.
756     /// let client_fut = Builder::new()
757     ///     .max_header_list_size(16 * 1024)
758     ///     .handshake(my_io);
759     /// # client_fut.await
760     /// # }
761     /// #
762     /// # pub fn main() {}
763     /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self764     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
765         self.settings.set_max_header_list_size(Some(max));
766         self
767     }
768 
769     /// Sets the maximum number of concurrent streams.
770     ///
771     /// The maximum concurrent streams setting only controls the maximum number
772     /// of streams that can be initiated by the remote peer. In other words,
773     /// when this setting is set to 100, this does not limit the number of
774     /// concurrent streams that can be created by the caller.
775     ///
776     /// It is recommended that this value be no smaller than 100, so as to not
777     /// unnecessarily limit parallelism. However, any value is legal, including
778     /// 0. If `max` is set to 0, then the remote will not be permitted to
779     /// initiate streams.
780     ///
781     /// Note that streams in the reserved state, i.e., push promises that have
782     /// been reserved but the stream has not started, do not count against this
783     /// setting.
784     ///
785     /// Also note that if the remote *does* exceed the value set here, it is not
786     /// a protocol level error. Instead, the `h2` library will immediately reset
787     /// the stream.
788     ///
789     /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
790     ///
791     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
792     ///
793     /// # Examples
794     ///
795     /// ```
796     /// # use tokio::io::{AsyncRead, AsyncWrite};
797     /// # use h2::client::*;
798     /// # use bytes::Bytes;
799     /// #
800     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
801     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
802     /// # {
803     /// // `client_fut` is a future representing the completion of the HTTP/2.0
804     /// // handshake.
805     /// let client_fut = Builder::new()
806     ///     .max_concurrent_streams(1000)
807     ///     .handshake(my_io);
808     /// # client_fut.await
809     /// # }
810     /// #
811     /// # pub fn main() {}
812     /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self813     pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
814         self.settings.set_max_concurrent_streams(Some(max));
815         self
816     }
817 
818     /// Sets the initial maximum of locally initiated (send) streams.
819     ///
820     /// The initial settings will be overwritten by the remote peer when
821     /// the Settings frame is received. The new value will be set to the
822     /// `max_concurrent_streams()` from the frame.
823     ///
824     /// This setting prevents the caller from exceeding this number of
825     /// streams that are counted towards the concurrency limit.
826     ///
827     /// Sending streams past the limit returned by the peer will be treated
828     /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
829     ///
830     /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
831     ///
832     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
833     ///
834     /// # Examples
835     ///
836     /// ```
837     /// # use tokio::io::{AsyncRead, AsyncWrite};
838     /// # use h2::client::*;
839     /// # use bytes::Bytes;
840     /// #
841     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
842     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
843     /// # {
844     /// // `client_fut` is a future representing the completion of the HTTP/2.0
845     /// // handshake.
846     /// let client_fut = Builder::new()
847     ///     .initial_max_send_streams(1000)
848     ///     .handshake(my_io);
849     /// # client_fut.await
850     /// # }
851     /// #
852     /// # pub fn main() {}
853     /// ```
    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
        // Stored on the builder itself; returning `self` lets calls be chained.
        self.initial_max_send_streams = initial;
        self
    }
858 
859     /// Sets the maximum number of concurrent locally reset streams.
860     ///
861     /// When a stream is explicitly reset, the HTTP/2.0 specification requires
862     /// that any further frames received for that stream must be ignored for
863     /// "some time".
864     ///
865     /// In order to satisfy the specification, internal state must be maintained
866     /// to implement the behavior. This state grows linearly with the number of
867     /// streams that are locally reset.
868     ///
    /// The `max_concurrent_reset_streams` setting sets an upper
870     /// bound on the amount of state that is maintained. When this max value is
871     /// reached, the oldest reset stream is purged from memory.
872     ///
873     /// Once the stream has been fully purged from memory, any additional frames
874     /// received for that stream will result in a connection level protocol
875     /// error, forcing the connection to terminate.
876     ///
877     /// The default value is 10.
878     ///
879     /// # Examples
880     ///
881     /// ```
882     /// # use tokio::io::{AsyncRead, AsyncWrite};
883     /// # use h2::client::*;
884     /// # use bytes::Bytes;
885     /// #
886     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
887     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
888     /// # {
889     /// // `client_fut` is a future representing the completion of the HTTP/2.0
890     /// // handshake.
891     /// let client_fut = Builder::new()
892     ///     .max_concurrent_reset_streams(1000)
893     ///     .handshake(my_io);
894     /// # client_fut.await
895     /// # }
896     /// #
897     /// # pub fn main() {}
898     /// ```
    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        // Stored on the builder itself; returning `self` lets calls be chained.
        self.reset_stream_max = max;
        self
    }
903 
904     /// Sets the duration to remember locally reset streams.
905     ///
906     /// When a stream is explicitly reset, the HTTP/2.0 specification requires
907     /// that any further frames received for that stream must be ignored for
908     /// "some time".
909     ///
910     /// In order to satisfy the specification, internal state must be maintained
911     /// to implement the behavior. This state grows linearly with the number of
912     /// streams that are locally reset.
913     ///
914     /// The `reset_stream_duration` setting configures the max amount of time
915     /// this state will be maintained in memory. Once the duration elapses, the
916     /// stream state is purged from memory.
917     ///
918     /// Once the stream has been fully purged from memory, any additional frames
919     /// received for that stream will result in a connection level protocol
920     /// error, forcing the connection to terminate.
921     ///
922     /// The default value is 30 seconds.
923     ///
924     /// # Examples
925     ///
926     /// ```
927     /// # use tokio::io::{AsyncRead, AsyncWrite};
928     /// # use h2::client::*;
929     /// # use std::time::Duration;
930     /// # use bytes::Bytes;
931     /// #
932     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
933     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
934     /// # {
935     /// // `client_fut` is a future representing the completion of the HTTP/2.0
936     /// // handshake.
937     /// let client_fut = Builder::new()
938     ///     .reset_stream_duration(Duration::from_secs(10))
939     ///     .handshake(my_io);
940     /// # client_fut.await
941     /// # }
942     /// #
943     /// # pub fn main() {}
944     /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        // Stored on the builder itself; returning `self` lets calls be chained.
        self.reset_stream_duration = dur;
        self
    }
949 
950     /// Enables or disables server push promises.
951     ///
    /// This value is included in the initial SETTINGS handshake. When set to
    /// `false`, the server MUST NOT send a push promise. Setting this value to
    /// `false` in the initial SETTINGS handshake guarantees that the remote server
    /// will never send a push promise.
956     ///
957     /// This setting can be changed during the life of a single HTTP/2.0
958     /// connection by sending another settings frame updating the value.
959     ///
960     /// Default value: `true`.
961     ///
962     /// # Examples
963     ///
964     /// ```
965     /// # use tokio::io::{AsyncRead, AsyncWrite};
966     /// # use h2::client::*;
967     /// # use std::time::Duration;
968     /// # use bytes::Bytes;
969     /// #
970     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
971     /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
972     /// # {
973     /// // `client_fut` is a future representing the completion of the HTTP/2.0
974     /// // handshake.
975     /// let client_fut = Builder::new()
976     ///     .enable_push(false)
977     ///     .handshake(my_io);
978     /// # client_fut.await
979     /// # }
980     /// #
981     /// # pub fn main() {}
982     /// ```
    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
        // Recorded in the builder's settings; returning `self` lets calls be
        // chained.
        self.settings.set_enable_push(enabled);
        self
    }
987 
988     /// Sets the first stream ID to something other than 1.
989     #[cfg(feature = "unstable")]
initial_stream_id(&mut self, stream_id: u32) -> &mut Self990     pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
991         self.stream_id = stream_id.into();
992         assert!(
993             self.stream_id.is_client_initiated(),
994             "stream id must be odd"
995         );
996         self
997     }
998 
999     /// Creates a new configured HTTP/2.0 client backed by `io`.
1000     ///
1001     /// It is expected that `io` already be in an appropriate state to commence
1002     /// the [HTTP/2.0 handshake]. The handshake is completed once both the connection
    /// preface and the initial settings frame are sent by the client.
1004     ///
1005     /// The handshake future does not wait for the initial settings frame from the
1006     /// server.
1007     ///
1008     /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1009     /// tuple once the HTTP/2.0 handshake has been completed.
1010     ///
1011     /// This function also allows the caller to configure the send payload data
1012     /// type. See [Outbound data type] for more details.
1013     ///
1014     /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1015     /// [`Connection`]: struct.Connection.html
1016     /// [`SendRequest`]: struct.SendRequest.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1018     ///
1019     /// # Examples
1020     ///
1021     /// Basic usage:
1022     ///
1023     /// ```
1024     /// # use tokio::io::{AsyncRead, AsyncWrite};
1025     /// # use h2::client::*;
1026     /// # use bytes::Bytes;
1027     /// #
1028     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1030     /// # {
1031     /// // `client_fut` is a future representing the completion of the HTTP/2.0
1032     /// // handshake.
1033     /// let client_fut = Builder::new()
1034     ///     .handshake(my_io);
1035     /// # client_fut.await
1036     /// # }
1037     /// #
1038     /// # pub fn main() {}
1039     /// ```
1040     ///
1041     /// Configures the send-payload data type. In this case, the outbound data
1042     /// type will be `&'static [u8]`.
1043     ///
1044     /// ```
1045     /// # use tokio::io::{AsyncRead, AsyncWrite};
1046     /// # use h2::client::*;
1047     /// #
1048     /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1049     /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1050     /// # {
1051     /// // `client_fut` is a future representing the completion of the HTTP/2.0
1052     /// // handshake.
1053     /// let client_fut = Builder::new()
1054     ///     .handshake::<_, &'static [u8]>(my_io);
1055     /// # client_fut.await
1056     /// # }
1057     /// #
1058     /// # pub fn main() {}
1059     /// ```
handshake<T, B>( &self, io: T, ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>> where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static,1060     pub fn handshake<T, B>(
1061         &self,
1062         io: T,
1063     ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1064     where
1065         T: AsyncRead + AsyncWrite + Unpin,
1066         B: Buf + 'static,
1067     {
1068         Connection::handshake2(io, self.clone())
1069     }
1070 }
1071 
1072 impl Default for Builder {
default() -> Builder1073     fn default() -> Builder {
1074         Builder::new()
1075     }
1076 }
1077 
1078 /// Creates a new configured HTTP/2.0 client with default configuration
1079 /// values backed by `io`.
1080 ///
1081 /// It is expected that `io` already be in an appropriate state to commence
1082 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
1083 ///
1084 /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1085 /// tuple once the HTTP/2.0 handshake has been completed. The returned
1086 /// [`Connection`] instance will be using default configuration values. Use
1087 /// [`Builder`] to customize the configuration values used by a [`Connection`]
1088 /// instance.
1089 ///
1090 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1091 /// [Handshake]: ../index.html#handshake
1092 /// [`Connection`]: struct.Connection.html
1093 /// [`SendRequest`]: struct.SendRequest.html
1094 ///
1095 /// # Examples
1096 ///
1097 /// ```
1098 /// # use tokio::io::{AsyncRead, AsyncWrite};
1099 /// # use h2::client;
1100 /// # use h2::client::*;
1101 /// #
1102 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1103 /// # {
1104 /// let (send_request, connection) = client::handshake(my_io).await?;
1105 /// // The HTTP/2.0 handshake has completed, now start polling
1106 /// // `connection` and use `send_request` to send requests to the
1107 /// // server.
1108 /// # Ok(())
1109 /// # }
1110 /// #
1111 /// # pub fn main() {}
1112 /// ```
handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error> where T: AsyncRead + AsyncWrite + Unpin,1113 pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1114 where
1115     T: AsyncRead + AsyncWrite + Unpin,
1116 {
1117     let builder = Builder::new();
1118     builder.handshake(io).await
1119 }
1120 
1121 // ===== impl Connection =====
1122 
1123 impl<T, B> Connection<T, B>
1124 where
1125     T: AsyncRead + AsyncWrite + Unpin,
1126     B: Buf + 'static,
1127 {
handshake2( mut io: T, builder: Builder, ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error>1128     async fn handshake2(
1129         mut io: T,
1130         builder: Builder,
1131     ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1132         log::debug!("binding client connection");
1133 
1134         let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1135         io.write_all(msg).await.map_err(crate::Error::from_io)?;
1136 
1137         log::debug!("client connection bound");
1138 
1139         // Create the codec
1140         let mut codec = Codec::new(io);
1141 
1142         if let Some(max) = builder.settings.max_frame_size() {
1143             codec.set_max_recv_frame_size(max as usize);
1144         }
1145 
1146         if let Some(max) = builder.settings.max_header_list_size() {
1147             codec.set_max_recv_header_list_size(max as usize);
1148         }
1149 
1150         // Send initial settings frame
1151         codec
1152             .buffer(builder.settings.clone().into())
1153             .expect("invalid SETTINGS frame");
1154 
1155         let inner = proto::Connection::new(
1156             codec,
1157             proto::Config {
1158                 next_stream_id: builder.stream_id,
1159                 initial_max_send_streams: builder.initial_max_send_streams,
1160                 reset_stream_duration: builder.reset_stream_duration,
1161                 reset_stream_max: builder.reset_stream_max,
1162                 settings: builder.settings.clone(),
1163             },
1164         );
1165         let send_request = SendRequest {
1166             inner: inner.streams().clone(),
1167             pending: None,
1168         };
1169 
1170         let mut connection = Connection { inner };
1171         if let Some(sz) = builder.initial_target_connection_window_size {
1172             connection.set_target_window_size(sz);
1173         }
1174 
1175         Ok((send_request, connection))
1176     }
1177 
1178     /// Sets the target window size for the whole connection.
1179     ///
1180     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1181     /// frame will be immediately sent to the remote, increasing the connection
1182     /// level window by `size - current_value`.
1183     ///
1184     /// If `size` is less than the current value, nothing will happen
1185     /// immediately. However, as window capacity is released by
1186     /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1187     /// out until the number of "in flight" bytes drops below `size`.
1188     ///
1189     /// The default value is 65,535.
1190     ///
1191     /// See [`FlowControl`] documentation for more details.
1192     ///
1193     /// [`FlowControl`]: ../struct.FlowControl.html
1194     /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)1195     pub fn set_target_window_size(&mut self, size: u32) {
1196         assert!(size <= proto::MAX_WINDOW_SIZE);
1197         self.inner.set_target_window_size(size);
1198     }
1199 
1200     /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1201     /// flow control for received data.
1202     ///
1203     /// The `SETTINGS` will be sent to the remote, and only applied once the
1204     /// remote acknowledges the change.
1205     ///
1206     /// This can be used to increase or decrease the window size for existing
1207     /// streams.
1208     ///
1209     /// # Errors
1210     ///
1211     /// Returns an error if a previous call is still pending acknowledgement
1212     /// from the remote endpoint.
set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error>1213     pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1214         assert!(size <= proto::MAX_WINDOW_SIZE);
1215         self.inner.set_initial_window_size(size)?;
1216         Ok(())
1217     }
1218 
1219     /// Takes a `PingPong` instance from the connection.
1220     ///
1221     /// # Note
1222     ///
1223     /// This may only be called once. Calling multiple times will return `None`.
ping_pong(&mut self) -> Option<PingPong>1224     pub fn ping_pong(&mut self) -> Option<PingPong> {
1225         self.inner.take_user_pings().map(PingPong::new)
1226     }
1227 }
1228 
1229 impl<T, B> Future for Connection<T, B>
1230 where
1231     T: AsyncRead + AsyncWrite + Unpin,
1232     B: Buf + 'static,
1233 {
1234     type Output = Result<(), crate::Error>;
1235 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1236     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1237         self.inner.maybe_close_connection_if_no_streams();
1238         self.inner.poll(cx).map_err(Into::into)
1239     }
1240 }
1241 
1242 impl<T, B> fmt::Debug for Connection<T, B>
1243 where
1244     T: AsyncRead + AsyncWrite,
1245     T: fmt::Debug,
1246     B: fmt::Debug + Buf,
1247 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1248     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1249         fmt::Debug::fmt(&self.inner, fmt)
1250     }
1251 }
1252 
1253 // ===== impl ResponseFuture =====
1254 
1255 impl Future for ResponseFuture {
1256     type Output = Result<Response<RecvStream>, crate::Error>;
1257 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1258     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1259         let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1260         let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1261 
1262         Poll::Ready(Ok(Response::from_parts(parts, body)))
1263     }
1264 }
1265 
1266 impl ResponseFuture {
1267     /// Returns the stream ID of the response stream.
1268     ///
1269     /// # Panics
1270     ///
1271     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1272     pub fn stream_id(&self) -> crate::StreamId {
1273         crate::StreamId::from_internal(self.inner.stream_id())
1274     }
1275     /// Returns a stream of PushPromises
1276     ///
1277     /// # Panics
1278     ///
1279     /// If this method has been called before
1280     /// or the stream was itself was pushed
push_promises(&mut self) -> PushPromises1281     pub fn push_promises(&mut self) -> PushPromises {
1282         if self.push_promise_consumed {
1283             panic!("Reference to push promises stream taken!");
1284         }
1285         self.push_promise_consumed = true;
1286         PushPromises {
1287             inner: self.inner.clone(),
1288         }
1289     }
1290 }
1291 
1292 // ===== impl PushPromises =====
1293 
1294 impl PushPromises {
1295     /// Get the next `PushPromise`.
push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>>1296     pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1297         futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
1298     }
1299 
1300     #[doc(hidden)]
poll_push_promise( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<PushPromise, crate::Error>>>1301     pub fn poll_push_promise(
1302         &mut self,
1303         cx: &mut Context<'_>,
1304     ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1305         match self.inner.poll_pushed(cx) {
1306             Poll::Ready(Some(Ok((request, response)))) => {
1307                 let response = PushedResponseFuture {
1308                     inner: ResponseFuture {
1309                         inner: response,
1310                         push_promise_consumed: false,
1311                     },
1312                 };
1313                 Poll::Ready(Some(Ok(PushPromise { request, response })))
1314             }
1315             Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1316             Poll::Ready(None) => Poll::Ready(None),
1317             Poll::Pending => Poll::Pending,
1318         }
1319     }
1320 }
1321 
#[cfg(feature = "stream")]
impl futures_core::Stream for PushPromises {
    type Item = Result<PushPromise, crate::Error>;

    /// Yields the next push promise by delegating to `poll_push_promise`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.poll_push_promise(cx)
    }
}
1330 
1331 // ===== impl PushPromise =====
1332 
1333 impl PushPromise {
1334     /// Returns a reference to the push promise's request headers.
request(&self) -> &Request<()>1335     pub fn request(&self) -> &Request<()> {
1336         &self.request
1337     }
1338 
1339     /// Returns a mutable reference to the push promise's request headers.
request_mut(&mut self) -> &mut Request<()>1340     pub fn request_mut(&mut self) -> &mut Request<()> {
1341         &mut self.request
1342     }
1343 
1344     /// Consumes `self`, returning the push promise's request headers and
1345     /// response future.
into_parts(self) -> (Request<()>, PushedResponseFuture)1346     pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1347         (self.request, self.response)
1348     }
1349 }
1350 
1351 // ===== impl PushedResponseFuture =====
1352 
1353 impl Future for PushedResponseFuture {
1354     type Output = Result<Response<RecvStream>, crate::Error>;
1355 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1356     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1357         Pin::new(&mut self.inner).poll(cx)
1358     }
1359 }
1360 
impl PushedResponseFuture {
    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        // Delegates to the wrapped `ResponseFuture`.
        self.inner.stream_id()
    }
}
1371 
1372 // ===== impl Peer =====
1373 
1374 impl Peer {
convert_send_message( id: StreamId, request: Request<()>, end_of_stream: bool, ) -> Result<Headers, SendError>1375     pub fn convert_send_message(
1376         id: StreamId,
1377         request: Request<()>,
1378         end_of_stream: bool,
1379     ) -> Result<Headers, SendError> {
1380         use http::request::Parts;
1381 
1382         let (
1383             Parts {
1384                 method,
1385                 uri,
1386                 headers,
1387                 version,
1388                 ..
1389             },
1390             _,
1391         ) = request.into_parts();
1392 
1393         let is_connect = method == Method::CONNECT;
1394 
1395         // Build the set pseudo header set. All requests will include `method`
1396         // and `path`.
1397         let mut pseudo = Pseudo::request(method, uri);
1398 
1399         if pseudo.scheme.is_none() {
1400             // If the scheme is not set, then there are a two options.
1401             //
1402             // 1) Authority is not set. In this case, a request was issued with
1403             //    a relative URI. This is permitted **only** when forwarding
1404             //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1405             //    this is an error.
1406             //
1407             // 2) Authority is set, then the HTTP method *must* be CONNECT.
1408             //
1409             // It is not possible to have a scheme but not an authority set (the
1410             // `http` crate does not allow it).
1411             //
1412             if pseudo.authority.is_none() {
1413                 if version == Version::HTTP_2 {
1414                     return Err(UserError::MissingUriSchemeAndAuthority.into());
1415                 } else {
1416                     // This is acceptable as per the above comment. However,
1417                     // HTTP/2.0 requires that a scheme is set. Since we are
1418                     // forwarding an HTTP 1.1 request, the scheme is set to
1419                     // "http".
1420                     pseudo.set_scheme(uri::Scheme::HTTP);
1421                 }
1422             } else if !is_connect {
1423                 // TODO: Error
1424             }
1425         }
1426 
1427         // Create the HEADERS frame
1428         let mut frame = Headers::new(id, pseudo, headers);
1429 
1430         if end_of_stream {
1431             frame.set_end_stream()
1432         }
1433 
1434         Ok(frame)
1435     }
1436 }
1437 
1438 impl proto::Peer for Peer {
1439     type Poll = Response<()>;
1440 
1441     fn r#dyn() -> proto::DynPeer {
1442         proto::DynPeer::Client
1443     }
1444 
is_server() -> bool1445     fn is_server() -> bool {
1446         false
1447     }
1448 
convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result<Self::Poll, RecvError>1449     fn convert_poll_message(
1450         pseudo: Pseudo,
1451         fields: HeaderMap,
1452         stream_id: StreamId,
1453     ) -> Result<Self::Poll, RecvError> {
1454         let mut b = Response::builder();
1455 
1456         b = b.version(Version::HTTP_2);
1457 
1458         if let Some(status) = pseudo.status {
1459             b = b.status(status);
1460         }
1461 
1462         let mut response = match b.body(()) {
1463             Ok(response) => response,
1464             Err(_) => {
1465                 // TODO: Should there be more specialized handling for different
1466                 // kinds of errors
1467                 return Err(RecvError::Stream {
1468                     id: stream_id,
1469                     reason: Reason::PROTOCOL_ERROR,
1470                 });
1471             }
1472         };
1473 
1474         *response.headers_mut() = fields;
1475 
1476         Ok(response)
1477     }
1478 }
1479