1 //! Server implementation of the HTTP/2.0 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2.0 server requires the caller to manage accepting the
6 //! connections as well as getting the connections to a state that is ready to
7 //! begin the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpListener`] to accept
11 //! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12 //! upgrades.
13 //!
14 //! Once a connection is obtained, it is passed to [`handshake`],
15 //! which will begin the [HTTP/2.0 handshake]. This returns a future that
16 //! completes once the handshake process is performed and HTTP/2.0 streams may
17 //! be received.
18 //!
19 //! [`handshake`] uses default configuration values. There are a number of
20 //! settings that can be changed by using [`Builder`] instead.
21 //!
22 //! # Inbound streams
23 //!
//! The [`Connection`] instance is used to accept inbound HTTP/2.0 streams. It
//! does this through
//! [`Connection::accept`](struct.Connection.html#method.accept) (and, when the
//! `stream` feature is enabled, by implementing `Stream`). When a new stream
//! is received, `accept` yields `(request, response)`.
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
//! HTTP request head as well as provides a way to receive the inbound data
//! stream and the trailers. The `response` handle (of type
//! [`SendResponse`](struct.SendResponse.html)) allows responding to the
//! request, streaming the response payload, sending trailers, and sending
//! push promises.
32 //!
33 //! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34 //! can be operated independently.
35 //!
36 //! # Managing the connection
37 //!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either
//! [`Connection::accept`](struct.Connection.html#method.accept) or
//! [`Connection::poll_closed`](struct.Connection.html#method.poll_closed) in
//! order to advance the connection state. Simply operating on [`SendStream`]
//! or [`RecvStream`] will have no effect unless the connection state is
//! advanced.
//!
//! It is not required to call **both** `Connection::accept` and
//! `Connection::poll_closed`. If the caller is ready to accept a new stream,
//! then only `Connection::accept` should be called. When the caller **does
//! not** want to accept a new stream, `Connection::poll_closed` should be
//! called.
//!
//! The [`Connection`] instance should only be dropped once
//! `Connection::poll_closed` returns `Ready`. Once `Connection::accept`
//! returns `None`, there will no longer be any more inbound streams. At
//! this point, only `Connection::poll_closed` should be called.
54 //!
55 //! # Shutting down the server
56 //!
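//! A graceful shutdown is started with
//! [`Connection::graceful_shutdown`](struct.Connection.html#method.graceful_shutdown),
//! while
//! [`Connection::abrupt_shutdown`](struct.Connection.html#method.abrupt_shutdown)
//! closes the connection without waiting for outstanding streams. In both
//! cases the connection must continue to be polled until it has closed.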
59 //!
60 //! # Example
61 //!
62 //! A basic HTTP/2.0 server example that runs over TCP and assumes [prior
63 //! knowledge], i.e. both the client and the server assume that the TCP socket
64 //! will use the HTTP/2.0 protocol without prior negotiation.
65 //!
66 //! ```no_run
67 //! use h2::server;
68 //! use http::{Response, StatusCode};
69 //! use tokio::net::TcpListener;
70 //!
71 //! #[tokio::main]
72 //! pub async fn main() {
73 //!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74 //!
75 //!     // Accept all incoming TCP connections.
76 //!     loop {
77 //!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78 //!             // Spawn a new task to process each connection.
79 //!             tokio::spawn(async {
80 //!                 // Start the HTTP/2.0 connection handshake
81 //!                 let mut h2 = server::handshake(socket).await.unwrap();
82 //!                 // Accept all inbound HTTP/2.0 streams sent over the
83 //!                 // connection.
84 //!                 while let Some(request) = h2.accept().await {
85 //!                     let (request, mut respond) = request.unwrap();
86 //!                     println!("Received request: {:?}", request);
87 //!
88 //!                     // Build a response with no body
89 //!                     let response = Response::builder()
90 //!                         .status(StatusCode::OK)
91 //!                         .body(())
92 //!                         .unwrap();
93 //!
94 //!                     // Send the response back to the client
95 //!                     respond.send_response(response, true)
96 //!                         .unwrap();
97 //!                 }
98 //!
99 //!             });
100 //!         }
101 //!     }
102 //! }
103 //! ```
104 //!
105 //! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106 //! [`handshake`]: fn.handshake.html
107 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108 //! [`Builder`]: struct.Builder.html
109 //! [`Connection`]: struct.Connection.html
110 //! [`Connection::poll`]: struct.Connection.html#method.poll
111 //! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112 //! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113 //! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114 //! [`RecvStream`]: ../struct.RecvStream.html
115 //! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117 
118 use crate::codec::{Codec, RecvError, UserError};
119 use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120 use crate::proto::{self, Config, Prioritized};
121 use crate::{FlowControl, PingPong, RecvStream, SendStream};
122 
123 use bytes::{Buf, Bytes};
124 use http::{HeaderMap, Request, Response};
125 use std::future::Future;
126 use std::pin::Pin;
127 use std::task::{Context, Poll};
128 use std::time::Duration;
129 use std::{convert, fmt, io, mem};
130 use tokio::io::{AsyncRead, AsyncWrite};
131 
/// In progress HTTP/2.0 connection handshake future.
///
/// This type implements `Future`, yielding a `Connection` instance once the
/// handshake has completed.
///
/// The handshake is completed once the connection preface is fully received
/// from the client **and** the initial settings frame is sent to the client.
///
/// The handshake future does not wait for the initial settings frame from the
/// client.
///
/// The `B` type parameter defaults to `Bytes`.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<T, B: Buf = Bytes> {
    /// The config to pass to Connection::new after handshake succeeds.
    builder: Builder,
    /// The current state of the handshake (see `Handshaking` for the stages).
    state: Handshaking<T, B>,
}
153 
/// Accepts inbound HTTP/2.0 streams on a connection.
///
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
/// implements the HTTP/2.0 server logic for that connection. It is responsible
/// for receiving inbound streams initiated by the client as well as driving the
/// internal state forward.
///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details.
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server;
/// # use h2::server::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.accept().await {
///     let (request, respond) = request.unwrap();
///     // Process the request and send the response back to the client
///     // using `respond`.
/// }
/// # }
/// #
/// # pub fn main() {}
/// ```
#[must_use = "streams do nothing unless polled"]
pub struct Connection<T, B: Buf> {
    /// The protocol-level connection, parameterized with the server `Peer`.
    connection: proto::Connection<T, Peer, B>,
}
194 
/// Builds server connections with custom configuration values.
///
/// Methods can be chained in order to set the configuration values.
///
/// The server is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2.0 server.
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various server
/// configuration settings.
///
/// [`Builder::new`]: struct.Builder.html#method.new
/// [`handshake`]: struct.Builder.html#method.handshake
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2.0
/// // handshake.
/// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    /// Set through `reset_stream_duration`.
    reset_stream_duration: Duration,

    /// Maximum number of locally reset streams to keep at a time.
    /// Set through `max_concurrent_reset_streams`.
    reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    settings: Settings,

    /// Initial target window size for new connections.
    /// `None` until `initial_connection_window_size` is called.
    initial_target_connection_window_size: Option<u32>,
}
244 
/// Send a response back to the client.
///
/// A `SendResponse` instance is provided when receiving a request and is used
/// to send the associated response back to the client. It is also used to
/// explicitly reset the stream with a custom reason.
///
/// It will also be used to initiate push promises linked with the associated
/// stream.
///
/// If the `SendResponse` instance is dropped without sending a response, then
/// the HTTP/2.0 stream will be reset.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
#[derive(Debug)]
pub struct SendResponse<B: Buf> {
    /// Handle to the underlying protocol-level stream.
    inner: proto::StreamRef<B>,
}
264 
/// Send a response to a promised request.
///
/// A `SendPushedResponse` instance is provided when promising a request and is
/// used to send the associated response to the client. It is also used to
/// explicitly reset the stream with a custom reason.
///
/// It cannot be used to initiate push promises.
///
/// If the `SendPushedResponse` instance is dropped without sending a response,
/// then the HTTP/2.0 stream will be reset.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
pub struct SendPushedResponse<B: Buf> {
    /// The wrapped response handle for the promised stream.
    inner: SendResponse<B>,
}
282 
283 // Manual implementation necessary because of rust-lang/rust#26925
284 impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
285     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286         write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
287     }
288 }
289 
/// Stages of an in-progress handshake.
///
/// The numbered states below are listed in the order they are expected to
/// occur: the SETTINGS frame is flushed first, then the client preface is
/// read.
enum Handshaking<T, B: Buf> {
    /// State 1. Connection is flushing pending SETTINGS frame.
    Flushing(Flush<T, Prioritized<B>>),
    /// State 2. Connection is waiting for the client preface.
    ReadingPreface(ReadPreface<T, Prioritized<B>>),
    /// Dummy state for `mem::replace`.
    // NOTE(review): presumably swapped in while the previous state is moved
    // out; the code that does so is outside this excerpt.
    Empty,
}
299 
/// Flush a Sink.
///
/// Wraps the codec whose buffered frames are being flushed; used by the
/// `Handshaking::Flushing` state.
struct Flush<T, B> {
    // NOTE(review): presumably an `Option` so the codec can be taken out once
    // flushing completes — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
}
304 
/// Read the client connection preface.
///
/// Used by the `Handshaking::ReadingPreface` state.
struct ReadPreface<T, B> {
    // NOTE(review): presumably an `Option` so the codec can be taken out once
    // the preface has been read — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
    // NOTE(review): presumably the number of `PREFACE` bytes received so far —
    // confirm against the `Future` impl.
    pos: usize,
}
310 
/// Marker type identifying the server side of a connection; it is the peer
/// parameter of the `proto::Connection` held by `Connection`.
#[derive(Debug)]
pub(crate) struct Peer;
313 
/// The 24-byte client connection preface.
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
315 
/// Creates a new configured HTTP/2.0 server with default configuration
/// values backed by `io`.
///
/// It is expected that `io` already be in an appropriate state to commence
/// the [HTTP/2.0 handshake]. See [Handshake] for more details.
///
/// Returns a future which resolves to the [`Connection`] instance once the
/// HTTP/2.0 handshake has been completed. The returned [`Connection`]
/// instance will be using default configuration values. Use [`Builder`] to
/// customize the configuration values used by a [`Connection`] instance.
///
/// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
/// [Handshake]: ../index.html#handshake
/// [`Connection`]: struct.Connection.html
/// [`Builder`]: struct.Builder.html
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server;
/// # use h2::server::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # {
/// let connection = server::handshake(my_io).await.unwrap();
/// // The HTTP/2.0 handshake has completed, now use `connection` to
/// // accept inbound HTTP/2.0 streams.
/// # }
/// #
/// # pub fn main() {}
/// ```
pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    // Equivalent to a `Builder` with every setting left at its default.
    Builder::new().handshake(io)
}
353 
354 // ===== impl Connection =====
355 
356 impl<T, B> Connection<T, B>
357 where
358     T: AsyncRead + AsyncWrite + Unpin,
359     B: Buf + 'static,
360 {
361     fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
362         // Create the codec.
363         let mut codec = Codec::new(io);
364 
365         if let Some(max) = builder.settings.max_frame_size() {
366             codec.set_max_recv_frame_size(max as usize);
367         }
368 
369         if let Some(max) = builder.settings.max_header_list_size() {
370             codec.set_max_recv_header_list_size(max as usize);
371         }
372 
373         // Send initial settings frame.
374         codec
375             .buffer(builder.settings.clone().into())
376             .expect("invalid SETTINGS frame");
377 
378         // Create the handshake future.
379         let state = Handshaking::from(codec);
380 
381         Handshake { builder, state }
382     }
383 
384     /// Accept the next incoming request on this connection.
385     pub async fn accept(
386         &mut self,
387     ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
388         futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
389     }
390 
391     #[doc(hidden)]
392     pub fn poll_accept(
393         &mut self,
394         cx: &mut Context<'_>,
395     ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
396         // Always try to advance the internal state. Getting Pending also is
397         // needed to allow this function to return Pending.
398         if let Poll::Ready(_) = self.poll_closed(cx)? {
399             // If the socket is closed, don't return anything
400             // TODO: drop any pending streams
401             return Poll::Ready(None);
402         }
403 
404         if let Some(inner) = self.connection.next_incoming() {
405             log::trace!("received incoming");
406             let (head, _) = inner.take_request().into_parts();
407             let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
408 
409             let request = Request::from_parts(head, body);
410             let respond = SendResponse { inner };
411 
412             return Poll::Ready(Some(Ok((request, respond))));
413         }
414 
415         Poll::Pending
416     }
417 
418     /// Sets the target window size for the whole connection.
419     ///
420     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
421     /// frame will be immediately sent to the remote, increasing the connection
422     /// level window by `size - current_value`.
423     ///
424     /// If `size` is less than the current value, nothing will happen
425     /// immediately. However, as window capacity is released by
426     /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
427     /// out until the number of "in flight" bytes drops below `size`.
428     ///
429     /// The default value is 65,535.
430     ///
431     /// See [`FlowControl`] documentation for more details.
432     ///
433     /// [`FlowControl`]: ../struct.FlowControl.html
434     /// [library level]: ../index.html#flow-control
435     pub fn set_target_window_size(&mut self, size: u32) {
436         assert!(size <= proto::MAX_WINDOW_SIZE);
437         self.connection.set_target_window_size(size);
438     }
439 
440     /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
441     /// flow control for received data.
442     ///
443     /// The `SETTINGS` will be sent to the remote, and only applied once the
444     /// remote acknowledges the change.
445     ///
446     /// This can be used to increase or decrease the window size for existing
447     /// streams.
448     ///
449     /// # Errors
450     ///
451     /// Returns an error if a previous call is still pending acknowledgement
452     /// from the remote endpoint.
453     pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
454         assert!(size <= proto::MAX_WINDOW_SIZE);
455         self.connection.set_initial_window_size(size)?;
456         Ok(())
457     }
458 
459     /// Returns `Ready` when the underlying connection has closed.
460     ///
461     /// If any new inbound streams are received during a call to `poll_closed`,
462     /// they will be queued and returned on the next call to [`poll_accept`].
463     ///
464     /// This function will advance the internal connection state, driving
465     /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
466     ///
467     /// See [here](index.html#managing-the-connection) for more details.
468     ///
469     /// [`poll_accept`]: struct.Connection.html#method.poll_accept
470     /// [`RecvStream`]: ../struct.RecvStream.html
471     /// [`SendStream`]: ../struct.SendStream.html
472     pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
473         self.connection.poll(cx).map_err(Into::into)
474     }
475 
476     #[doc(hidden)]
477     #[deprecated(note = "renamed to poll_closed")]
478     pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
479         self.poll_closed(cx)
480     }
481 
482     /// Sets the connection to a GOAWAY state.
483     ///
484     /// Does not terminate the connection. Must continue being polled to close
485     /// connection.
486     ///
487     /// After flushing the GOAWAY frame, the connection is closed. Any
488     /// outstanding streams do not prevent the connection from closing. This
489     /// should usually be reserved for shutting down when something bad
490     /// external to `h2` has happened, and open streams cannot be properly
491     /// handled.
492     ///
493     /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
494     pub fn abrupt_shutdown(&mut self, reason: Reason) {
495         self.connection.go_away_from_user(reason);
496     }
497 
498     /// Starts a [graceful shutdown][1] process.
499     ///
500     /// Must continue being polled to close connection.
501     ///
502     /// It's possible to receive more requests after calling this method, since
503     /// they might have been in-flight from the client already. After about
504     /// 1 RTT, no new requests should be accepted. Once all active streams
505     /// have completed, the connection is closed.
506     ///
507     /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
508     pub fn graceful_shutdown(&mut self) {
509         self.connection.go_away_gracefully();
510     }
511 
512     /// Takes a `PingPong` instance from the connection.
513     ///
514     /// # Note
515     ///
516     /// This may only be called once. Calling multiple times will return `None`.
517     pub fn ping_pong(&mut self) -> Option<PingPong> {
518         self.connection.take_user_pings().map(PingPong::new)
519     }
520 }
521 
/// `Stream` adapter over `Connection::poll_accept`, available with the
/// `stream` feature.
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf + 'static,
{
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

    /// Forwards to `poll_accept`, so each item is one inbound request.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.get_mut().poll_accept(cx)
    }
}
534 
535 impl<T, B> fmt::Debug for Connection<T, B>
536 where
537     T: fmt::Debug,
538     B: fmt::Debug + Buf,
539 {
540     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
541         fmt.debug_struct("Connection")
542             .field("connection", &self.connection)
543             .finish()
544     }
545 }
546 
547 // ===== impl Builder =====
548 
549 impl Builder {
550     /// Returns a new server builder instance initialized with default
551     /// configuration values.
552     ///
553     /// Configuration methods can be chained on the return value.
554     ///
555     /// # Examples
556     ///
557     /// ```
558     /// # use tokio::io::{AsyncRead, AsyncWrite};
559     /// # use h2::server::*;
560     /// #
561     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
562     /// # -> Handshake<T>
563     /// # {
564     /// // `server_fut` is a future representing the completion of the HTTP/2.0
565     /// // handshake.
566     /// let server_fut = Builder::new()
567     ///     .initial_window_size(1_000_000)
568     ///     .max_concurrent_streams(1000)
569     ///     .handshake(my_io);
570     /// # server_fut
571     /// # }
572     /// #
573     /// # pub fn main() {}
574     /// ```
575     pub fn new() -> Builder {
576         Builder {
577             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
578             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
579             settings: Settings::default(),
580             initial_target_connection_window_size: None,
581         }
582     }
583 
584     /// Indicates the initial window size (in octets) for stream-level
585     /// flow control for received data.
586     ///
587     /// The initial window of a stream is used as part of flow control. For more
588     /// details, see [`FlowControl`].
589     ///
590     /// The default value is 65,535.
591     ///
592     /// [`FlowControl`]: ../struct.FlowControl.html
593     ///
594     /// # Examples
595     ///
596     /// ```
597     /// # use tokio::io::{AsyncRead, AsyncWrite};
598     /// # use h2::server::*;
599     /// #
600     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
601     /// # -> Handshake<T>
602     /// # {
603     /// // `server_fut` is a future representing the completion of the HTTP/2.0
604     /// // handshake.
605     /// let server_fut = Builder::new()
606     ///     .initial_window_size(1_000_000)
607     ///     .handshake(my_io);
608     /// # server_fut
609     /// # }
610     /// #
611     /// # pub fn main() {}
612     /// ```
613     pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
614         self.settings.set_initial_window_size(Some(size));
615         self
616     }
617 
618     /// Indicates the initial window size (in octets) for connection-level flow control
619     /// for received data.
620     ///
621     /// The initial window of a connection is used as part of flow control. For more details,
622     /// see [`FlowControl`].
623     ///
624     /// The default value is 65,535.
625     ///
626     /// [`FlowControl`]: ../struct.FlowControl.html
627     ///
628     /// # Examples
629     ///
630     /// ```
631     /// # use tokio::io::{AsyncRead, AsyncWrite};
632     /// # use h2::server::*;
633     /// #
634     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
635     /// # -> Handshake<T>
636     /// # {
637     /// // `server_fut` is a future representing the completion of the HTTP/2.0
638     /// // handshake.
639     /// let server_fut = Builder::new()
640     ///     .initial_connection_window_size(1_000_000)
641     ///     .handshake(my_io);
642     /// # server_fut
643     /// # }
644     /// #
645     /// # pub fn main() {}
646     /// ```
647     pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
648         self.initial_target_connection_window_size = Some(size);
649         self
650     }
651 
652     /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
653     /// configured server is able to accept.
654     ///
655     /// The sender may send data frames that are **smaller** than this value,
656     /// but any data larger than `max` will be broken up into multiple `DATA`
657     /// frames.
658     ///
659     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
660     ///
661     /// # Examples
662     ///
663     /// ```
664     /// # use tokio::io::{AsyncRead, AsyncWrite};
665     /// # use h2::server::*;
666     /// #
667     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
668     /// # -> Handshake<T>
669     /// # {
670     /// // `server_fut` is a future representing the completion of the HTTP/2.0
671     /// // handshake.
672     /// let server_fut = Builder::new()
673     ///     .max_frame_size(1_000_000)
674     ///     .handshake(my_io);
675     /// # server_fut
676     /// # }
677     /// #
678     /// # pub fn main() {}
679     /// ```
680     ///
681     /// # Panics
682     ///
683     /// This function panics if `max` is not within the legal range specified
684     /// above.
685     pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
686         self.settings.set_max_frame_size(Some(max));
687         self
688     }
689 
690     /// Sets the max size of received header frames.
691     ///
692     /// This advisory setting informs a peer of the maximum size of header list
693     /// that the sender is prepared to accept, in octets. The value is based on
694     /// the uncompressed size of header fields, including the length of the name
695     /// and value in octets plus an overhead of 32 octets for each header field.
696     ///
697     /// This setting is also used to limit the maximum amount of data that is
698     /// buffered to decode HEADERS frames.
699     ///
700     /// # Examples
701     ///
702     /// ```
703     /// # use tokio::io::{AsyncRead, AsyncWrite};
704     /// # use h2::server::*;
705     /// #
706     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
707     /// # -> Handshake<T>
708     /// # {
709     /// // `server_fut` is a future representing the completion of the HTTP/2.0
710     /// // handshake.
711     /// let server_fut = Builder::new()
712     ///     .max_header_list_size(16 * 1024)
713     ///     .handshake(my_io);
714     /// # server_fut
715     /// # }
716     /// #
717     /// # pub fn main() {}
718     /// ```
719     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
720         self.settings.set_max_header_list_size(Some(max));
721         self
722     }
723 
724     /// Sets the maximum number of concurrent streams.
725     ///
726     /// The maximum concurrent streams setting only controls the maximum number
727     /// of streams that can be initiated by the remote peer. In other words,
728     /// when this setting is set to 100, this does not limit the number of
729     /// concurrent streams that can be created by the caller.
730     ///
731     /// It is recommended that this value be no smaller than 100, so as to not
732     /// unnecessarily limit parallelism. However, any value is legal, including
733     /// 0. If `max` is set to 0, then the remote will not be permitted to
734     /// initiate streams.
735     ///
736     /// Note that streams in the reserved state, i.e., push promises that have
737     /// been reserved but the stream has not started, do not count against this
738     /// setting.
739     ///
740     /// Also note that if the remote *does* exceed the value set here, it is not
741     /// a protocol level error. Instead, the `h2` library will immediately reset
742     /// the stream.
743     ///
744     /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
745     ///
746     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
747     ///
748     /// # Examples
749     ///
750     /// ```
751     /// # use tokio::io::{AsyncRead, AsyncWrite};
752     /// # use h2::server::*;
753     /// #
754     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
755     /// # -> Handshake<T>
756     /// # {
757     /// // `server_fut` is a future representing the completion of the HTTP/2.0
758     /// // handshake.
759     /// let server_fut = Builder::new()
760     ///     .max_concurrent_streams(1000)
761     ///     .handshake(my_io);
762     /// # server_fut
763     /// # }
764     /// #
765     /// # pub fn main() {}
766     /// ```
767     pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
768         self.settings.set_max_concurrent_streams(Some(max));
769         self
770     }
771 
772     /// Sets the maximum number of concurrent locally reset streams.
773     ///
774     /// When a stream is explicitly reset by either calling
775     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
776     /// before completing the stream, the HTTP/2.0 specification requires that
777     /// any further frames received for that stream must be ignored for "some
778     /// time".
779     ///
780     /// In order to satisfy the specification, internal state must be maintained
781     /// to implement the behavior. This state grows linearly with the number of
782     /// streams that are locally reset.
783     ///
784     /// The `max_concurrent_reset_streams` setting configures sets an upper
785     /// bound on the amount of state that is maintained. When this max value is
786     /// reached, the oldest reset stream is purged from memory.
787     ///
788     /// Once the stream has been fully purged from memory, any additional frames
789     /// received for that stream will result in a connection level protocol
790     /// error, forcing the connection to terminate.
791     ///
792     /// The default value is 10.
793     ///
794     /// # Examples
795     ///
796     /// ```
797     /// # use tokio::io::{AsyncRead, AsyncWrite};
798     /// # use h2::server::*;
799     /// #
800     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
801     /// # -> Handshake<T>
802     /// # {
803     /// // `server_fut` is a future representing the completion of the HTTP/2.0
804     /// // handshake.
805     /// let server_fut = Builder::new()
806     ///     .max_concurrent_reset_streams(1000)
807     ///     .handshake(my_io);
808     /// # server_fut
809     /// # }
810     /// #
811     /// # pub fn main() {}
812     /// ```
813     pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
814         self.reset_stream_max = max;
815         self
816     }
817 
    /// Sets the duration to remember locally reset streams.
    ///
    /// When a stream is explicitly reset by either calling
    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
    /// before completing the stream, the HTTP/2.0 specification requires that
    /// any further frames received for that stream must be ignored for "some
    /// time".
    ///
    /// In order to satisfy the specification, internal state must be maintained
    /// to implement the behavior. This state grows linearly with the number of
    /// streams that are locally reset.
    ///
    /// The `reset_stream_duration` setting configures the max amount of time
    /// this state will be maintained in memory. Once the duration elapses, the
    /// stream state is purged from memory.
    ///
    /// Once the stream has been fully purged from memory, any additional frames
    /// received for that stream will result in a connection level protocol
    /// error, forcing the connection to terminate.
    ///
    /// The default value is 30 seconds.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// # use std::time::Duration;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2.0
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .reset_stream_duration(Duration::from_secs(10))
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
        self.reset_stream_duration = dur;
        // Hand back `&mut Self` so further builder calls can be chained.
        self
    }
864 
    /// Creates a new configured HTTP/2.0 server backed by `io`.
    ///
    /// It is expected that `io` already be in an appropriate state to commence
    /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
    ///
    /// Returns a future which resolves to the [`Connection`] instance once the
    /// HTTP/2.0 handshake has been completed.
    ///
    /// This function also allows the caller to configure the send payload data
    /// type. See [Outbound data type] for more details.
    ///
    /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
    /// [Handshake]: ../index.html#handshake
    /// [`Connection`]: struct.Connection.html
    /// [Outbound data type]: ../index.html#outbound-data-type
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2.0
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// Configures the send-payload data type. In this case, the outbound data
    /// type will be `&'static [u8]`.
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T, &'static [u8]>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2.0
    /// // handshake.
    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
    where
        T: AsyncRead + AsyncWrite + Unpin,
        B: Buf + 'static,
    {
        // The builder is only borrowed here, so a clone of the configuration
        // is handed over to the handshake.
        Connection::handshake2(io, self.clone())
    }
928 }
929 
930 impl Default for Builder {
931     fn default() -> Builder {
932         Builder::new()
933     }
934 }
935 
936 // ===== impl SendResponse =====
937 
938 impl<B: Buf> SendResponse<B> {
939     /// Send a response to a client request.
940     ///
941     /// On success, a [`SendStream`] instance is returned. This instance can be
942     /// used to stream the response body and send trailers.
943     ///
944     /// If a body or trailers will be sent on the returned [`SendStream`]
945     /// instance, then `end_of_stream` must be set to `false` when calling this
946     /// function.
947     ///
948     /// The [`SendResponse`] instance is already associated with a received
949     /// request.  This function may only be called once per instance and only if
950     /// [`send_reset`] has not been previously called.
951     ///
952     /// [`SendResponse`]: #
953     /// [`SendStream`]: ../struct.SendStream.html
954     /// [`send_reset`]: #method.send_reset
955     pub fn send_response(
956         &mut self,
957         response: Response<()>,
958         end_of_stream: bool,
959     ) -> Result<SendStream<B>, crate::Error> {
960         self.inner
961             .send_response(response, end_of_stream)
962             .map(|_| SendStream::new(self.inner.clone()))
963             .map_err(Into::into)
964     }
965 
966     /// Push a request and response to the client
967     ///
968     /// On success, a [`SendResponse`] instance is returned.
969     ///
970     /// [`SendResponse`]: #
971     pub fn push_request(
972         &mut self,
973         request: Request<()>,
974     ) -> Result<SendPushedResponse<B>, crate::Error> {
975         self.inner
976             .send_push_promise(request)
977             .map(|inner| SendPushedResponse {
978                 inner: SendResponse { inner },
979             })
980             .map_err(Into::into)
981     }
982 
983     /// Send a stream reset to the peer.
984     ///
985     /// This essentially cancels the stream, including any inbound or outbound
986     /// data streams.
987     ///
988     /// If this function is called before [`send_response`], a call to
989     /// [`send_response`] will result in an error.
990     ///
991     /// If this function is called while a [`SendStream`] instance is active,
992     /// any further use of the instance will result in an error.
993     ///
994     /// This function should only be called once.
995     ///
996     /// [`send_response`]: #method.send_response
997     /// [`SendStream`]: ../struct.SendStream.html
998     pub fn send_reset(&mut self, reason: Reason) {
999         self.inner.send_reset(reason)
1000     }
1001 
1002     /// Polls to be notified when the client resets this stream.
1003     ///
1004     /// If stream is still open, this returns `Poll::Pending`, and
1005     /// registers the task to be notified if a `RST_STREAM` is received.
1006     ///
1007     /// If a `RST_STREAM` frame is received for this stream, calling this
1008     /// method will yield the `Reason` for the reset.
1009     ///
1010     /// # Error
1011     ///
1012     /// Calling this method after having called `send_response` will return
1013     /// a user error.
1014     pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1015         self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1016     }
1017 
1018     /// Returns the stream ID of the response stream.
1019     ///
1020     /// # Panics
1021     ///
1022     /// If the lock on the strean store has been poisoned.
1023     pub fn stream_id(&self) -> crate::StreamId {
1024         crate::StreamId::from_internal(self.inner.stream_id())
1025     }
1026 }
1027 
// ===== impl SendPushedResponse =====
//
// Every method below forwards to the wrapped `SendResponse` (`self.inner`).

impl<B: Buf> SendPushedResponse<B> {
    /// Send a response to a promised request.
    ///
    /// On success, a [`SendStream`] instance is returned. This instance can be
    /// used to stream the response body and send trailers.
    ///
    /// If a body or trailers will be sent on the returned [`SendStream`]
    /// instance, then `end_of_stream` must be set to `false` when calling this
    /// function.
    ///
    /// The [`SendPushedResponse`] instance is associated with a promised
    /// request.  This function may only be called once per instance and only if
    /// [`send_reset`] has not been previously called.
    ///
    /// [`SendPushedResponse`]: #
    /// [`SendStream`]: ../struct.SendStream.html
    /// [`send_reset`]: #method.send_reset
    pub fn send_response(
        &mut self,
        response: Response<()>,
        end_of_stream: bool,
    ) -> Result<SendStream<B>, crate::Error> {
        self.inner.send_response(response, end_of_stream)
    }

    /// Send a stream reset to the peer.
    ///
    /// This essentially cancels the stream, including any inbound or outbound
    /// data streams.
    ///
    /// If this function is called before [`send_response`], a call to
    /// [`send_response`] will result in an error.
    ///
    /// If this function is called while a [`SendStream`] instance is active,
    /// any further use of the instance will result in an error.
    ///
    /// This function should only be called once.
    ///
    /// [`send_response`]: #method.send_response
    /// [`SendStream`]: ../struct.SendStream.html
    pub fn send_reset(&mut self, reason: Reason) {
        self.inner.send_reset(reason)
    }

    /// Polls to be notified when the client resets this stream.
    ///
    /// If stream is still open, this returns `Poll::Pending`, and
    /// registers the task to be notified if a `RST_STREAM` is received.
    ///
    /// If a `RST_STREAM` frame is received for this stream, calling this
    /// method will yield the `Reason` for the reset.
    ///
    /// # Errors
    ///
    /// Calling this method after having called `send_response` will return
    /// a user error.
    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
        self.inner.poll_reset(cx)
    }

    /// Returns the stream ID of the response stream.
    ///
    /// # Panics
    ///
    /// If the lock on the stream store has been poisoned.
    pub fn stream_id(&self) -> crate::StreamId {
        self.inner.stream_id()
    }
}
1099 
1100 // ===== impl Flush =====
1101 
1102 impl<T, B: Buf> Flush<T, B> {
1103     fn new(codec: Codec<T, B>) -> Self {
1104         Flush { codec: Some(codec) }
1105     }
1106 }
1107 
1108 impl<T, B> Future for Flush<T, B>
1109 where
1110     T: AsyncWrite + Unpin,
1111     B: Buf,
1112 {
1113     type Output = Result<Codec<T, B>, crate::Error>;
1114 
1115     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1116         // Flush the codec
1117         ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1118 
1119         // Return the codec
1120         Poll::Ready(Ok(self.codec.take().unwrap()))
1121     }
1122 }
1123 
1124 impl<T, B: Buf> ReadPreface<T, B> {
1125     fn new(codec: Codec<T, B>) -> Self {
1126         ReadPreface {
1127             codec: Some(codec),
1128             pos: 0,
1129         }
1130     }
1131 
1132     fn inner_mut(&mut self) -> &mut T {
1133         self.codec.as_mut().unwrap().get_mut()
1134     }
1135 }
1136 
1137 impl<T, B> Future for ReadPreface<T, B>
1138 where
1139     T: AsyncRead + Unpin,
1140     B: Buf,
1141 {
1142     type Output = Result<Codec<T, B>, crate::Error>;
1143 
1144     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1145         let mut buf = [0; 24];
1146         let mut rem = PREFACE.len() - self.pos;
1147 
1148         while rem > 0 {
1149             let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem]))
1150                 .map_err(crate::Error::from_io)?;
1151             if n == 0 {
1152                 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1153                     io::ErrorKind::UnexpectedEof,
1154                     "connection closed before reading preface",
1155                 ))));
1156             }
1157 
1158             if PREFACE[self.pos..self.pos + n] != buf[..n] {
1159                 proto_err!(conn: "read_preface: invalid preface");
1160                 // TODO: Should this just write the GO_AWAY frame directly?
1161                 return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into()));
1162             }
1163 
1164             self.pos += n;
1165             rem -= n; // TODO test
1166         }
1167 
1168         Poll::Ready(Ok(self.codec.take().unwrap()))
1169     }
1170 }
1171 
1172 // ===== impl Handshake =====
1173 
// NOTE(review): `B: Buf` is stated both inline and in the `where` clause
// below; the inline bound looks redundant.
impl<T, B: Buf> Future for Handshake<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf + 'static,
{
    type Output = Result<Connection<T, B>, crate::Error>;

    /// Drives the server side of the handshake.
    ///
    /// The state moves from `Flushing` (flushing the pending SETTINGS frame)
    /// to `ReadingPreface` (waiting for the client preface). Once the preface
    /// has been read, a `Connection` is built from the codec and the builder's
    /// configuration.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!` if, on entry, the state is neither `Flushing` nor
    /// `ReadingPreface` (i.e. it is `Empty`).
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        log::trace!("Handshake::poll(); state={:?};", self.state);
        use crate::server::Handshaking::*;

        self.state = if let Flushing(ref mut flush) = self.state {
            // We're currently flushing a pending SETTINGS frame. Poll the
            // flush future, and, if it's completed, advance our state to wait
            // for the client preface.
            let codec = match Pin::new(flush).poll(cx)? {
                Poll::Pending => {
                    log::trace!("Handshake::poll(); flush.poll()=Pending");
                    return Poll::Pending;
                }
                Poll::Ready(flushed) => {
                    log::trace!("Handshake::poll(); flush.poll()=Ready");
                    flushed
                }
            };
            Handshaking::from(ReadPreface::new(codec))
        } else {
            // Otherwise, we haven't actually advanced the state, but we have
            // to replace it with itself, because we have to return a value.
            // (note that the assignment to `self.state` has to be outside of
            // the `if let` block above in order to placate the borrow checker).
            mem::replace(&mut self.state, Handshaking::Empty)
        };
        let poll = if let ReadingPreface(ref mut read) = self.state {
            // We're now waiting for the client preface. Poll the `ReadPreface`
            // future. If it has completed, we will create a `Connection` handle
            // for the connection.
            Pin::new(read).poll(cx)
        // Actually creating the `Connection` has to occur outside of this
        // `if let` block, because we've borrowed `self` mutably in order
        // to poll the state and won't be able to borrow the SETTINGS frame
        // as well until we release the borrow for `poll()`.
        } else {
            unreachable!("Handshake::poll() state was not advanced completely!")
        };
        // `poll?` returns early on a preface error; a pending poll passes
        // through `map` untouched, and a ready codec becomes the connection.
        poll?.map(|codec| {
            let connection = proto::Connection::new(
                codec,
                Config {
                    // NOTE(review): presumably 2 because server-initiated
                    // stream ids are even (RFC 7540 section 5.1.1) -- confirm.
                    next_stream_id: 2.into(),
                    // Server does not need to locally initiate any streams
                    initial_max_send_streams: 0,
                    reset_stream_duration: self.builder.reset_stream_duration,
                    reset_stream_max: self.builder.reset_stream_max,
                    settings: self.builder.settings.clone(),
                },
            );

            log::trace!("Handshake::poll(); connection established!");
            let mut c = Connection { connection };
            // Apply the target connection window only if the builder set one.
            if let Some(sz) = self.builder.initial_target_connection_window_size {
                c.set_target_window_size(sz);
            }
            Ok(c)
        })
    }
}
1241 
1242 impl<T, B> fmt::Debug for Handshake<T, B>
1243 where
1244     T: AsyncRead + AsyncWrite + fmt::Debug,
1245     B: fmt::Debug + Buf,
1246 {
1247     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1248         write!(fmt, "server::Handshake")
1249     }
1250 }
1251 
1252 impl Peer {
1253     pub fn convert_send_message(
1254         id: StreamId,
1255         response: Response<()>,
1256         end_of_stream: bool,
1257     ) -> frame::Headers {
1258         use http::response::Parts;
1259 
1260         // Extract the components of the HTTP request
1261         let (
1262             Parts {
1263                 status, headers, ..
1264             },
1265             _,
1266         ) = response.into_parts();
1267 
1268         // Build the set pseudo header set. All requests will include `method`
1269         // and `path`.
1270         let pseudo = Pseudo::response(status);
1271 
1272         // Create the HEADERS frame
1273         let mut frame = frame::Headers::new(id, pseudo, headers);
1274 
1275         if end_of_stream {
1276             frame.set_end_stream()
1277         }
1278 
1279         frame
1280     }
1281 
1282     pub fn convert_push_message(
1283         stream_id: StreamId,
1284         promised_id: StreamId,
1285         request: Request<()>,
1286     ) -> Result<frame::PushPromise, UserError> {
1287         use http::request::Parts;
1288 
1289         if let Err(e) = frame::PushPromise::validate_request(&request) {
1290             use PushPromiseHeaderError::*;
1291             match e {
1292                 NotSafeAndCacheable => log::debug!(
1293                     "convert_push_message: method {} is not safe and cacheable; promised_id={:?}",
1294                     request.method(),
1295                     promised_id,
1296                 ),
1297                 InvalidContentLength(e) => log::debug!(
1298                     "convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}",
1299                     e,
1300                     promised_id,
1301                 ),
1302             }
1303             return Err(UserError::MalformedHeaders);
1304         }
1305 
1306         // Extract the components of the HTTP request
1307         let (
1308             Parts {
1309                 method,
1310                 uri,
1311                 headers,
1312                 ..
1313             },
1314             _,
1315         ) = request.into_parts();
1316 
1317         let pseudo = Pseudo::request(method, uri);
1318 
1319         Ok(frame::PushPromise::new(
1320             stream_id,
1321             promised_id,
1322             pseudo,
1323             headers,
1324         ))
1325     }
1326 }
1327 
impl proto::Peer for Peer {
    /// What the server receives from a peer: a request head with no body.
    type Poll = Request<()>;

    /// Always `true`: this is the server-side peer.
    fn is_server() -> bool {
        true
    }

    /// The dynamic (runtime) counterpart of this peer type.
    fn r#dyn() -> proto::DynPeer {
        proto::DynPeer::Server
    }

    /// Builds an `http::Request<()>` from the decoded pseudo-headers and
    /// regular header fields of a received HEADERS frame.
    ///
    /// # Errors
    ///
    /// * A stream-level `PROTOCOL_ERROR` when `:method` or `:scheme` is
    ///   missing, when `:authority`, `:scheme` or `:path` fails to parse, when
    ///   `:path` is empty, or when the request builder fails.
    /// * A connection-level `PROTOCOL_ERROR` when `:status` is present.
    fn convert_poll_message(
        pseudo: Pseudo,
        fields: HeaderMap,
        stream_id: StreamId,
    ) -> Result<Self::Poll, RecvError> {
        use http::{uri, Version};

        let mut b = Request::builder();

        // Logs at debug level and returns a stream-level PROTOCOL_ERROR.
        // Note that the `return` leaves whatever function body the macro is
        // expanded in: inside the `or_else` closures below it only returns
        // from the closure, and the trailing `?` then propagates the error.
        macro_rules! malformed {
            ($($arg:tt)*) => {{
                log::debug!($($arg)*);
                return Err(RecvError::Stream {
                    id: stream_id,
                    reason: Reason::PROTOCOL_ERROR,
                });
            }}
        };

        b = b.version(Version::HTTP_2);

        if let Some(method) = pseudo.method {
            b = b.method(method);
        } else {
            malformed!("malformed headers: missing method");
        }

        // Specifying :status for a request is a protocol error
        // (reported at connection level, unlike the stream-level errors above
        // and below).
        if pseudo.status.is_some() {
            log::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
            return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
        }

        // Convert the URI
        let mut parts = uri::Parts::default();

        // A request translated from HTTP/1 must not include the :authority
        // header
        if let Some(authority) = pseudo.authority {
            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
            parts.authority = Some(maybe_authority.or_else(|why| {
                malformed!(
                    "malformed headers: malformed authority ({:?}): {}",
                    authority,
                    why,
                )
            })?);
        }

        // A :scheme is always required.
        if let Some(scheme) = pseudo.scheme {
            let maybe_scheme = scheme.parse();
            let scheme = maybe_scheme.or_else(|why| {
                malformed!(
                    "malformed headers: malformed scheme ({:?}): {}",
                    scheme,
                    why,
                )
            })?;

            // It's not possible to build an `Uri` from a scheme and path. So,
            // after validating it was a valid scheme, we just have to drop it
            // if there isn't an :authority.
            if parts.authority.is_some() {
                parts.scheme = Some(scheme);
            }
        } else {
            malformed!("malformed headers: missing scheme");
        }

        // NOTE(review): there is no `else` here, so an absent `:path` is not
        // rejected by this function itself -- confirm that is intended.
        if let Some(path) = pseudo.path {
            // This cannot be empty
            if path.is_empty() {
                malformed!("malformed headers: missing path");
            }

            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
            parts.path_and_query = Some(maybe_path.or_else(|why| {
                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
            })?);
        }

        b = b.uri(parts);

        let mut request = match b.body(()) {
            Ok(request) => request,
            Err(e) => {
                // TODO: Should there be more specialized handling for different
                // kinds of errors
                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
                return Err(RecvError::Stream {
                    id: stream_id,
                    reason: Reason::PROTOCOL_ERROR,
                });
            }
        };

        // Install the regular header fields wholesale on the built request.
        *request.headers_mut() = fields;

        Ok(request)
    }
}
1441 
1442 // ===== impl Handshaking =====
1443 
1444 impl<T, B> fmt::Debug for Handshaking<T, B>
1445 where
1446     B: Buf,
1447 {
1448     #[inline]
1449     fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1450         match *self {
1451             Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
1452             Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
1453             Handshaking::Empty => write!(f, "Handshaking::Empty"),
1454         }
1455     }
1456 }
1457 
1458 impl<T, B> convert::From<Flush<T, Prioritized<B>>> for Handshaking<T, B>
1459 where
1460     T: AsyncRead + AsyncWrite,
1461     B: Buf,
1462 {
1463     #[inline]
1464     fn from(flush: Flush<T, Prioritized<B>>) -> Self {
1465         Handshaking::Flushing(flush)
1466     }
1467 }
1468 
1469 impl<T, B> convert::From<ReadPreface<T, Prioritized<B>>> for Handshaking<T, B>
1470 where
1471     T: AsyncRead + AsyncWrite,
1472     B: Buf,
1473 {
1474     #[inline]
1475     fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
1476         Handshaking::ReadingPreface(read)
1477     }
1478 }
1479 
1480 impl<T, B> convert::From<Codec<T, Prioritized<B>>> for Handshaking<T, B>
1481 where
1482     T: AsyncRead + AsyncWrite,
1483     B: Buf,
1484 {
1485     #[inline]
1486     fn from(codec: Codec<T, Prioritized<B>>) -> Self {
1487         Handshaking::from(Flush::new(codec))
1488     }
1489 }
1490