1 //! Server implementation of the HTTP/2.0 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2.0 server requires the caller to manage accepting the
6 //! connections as well as getting the connections to a state that is ready to
7 //! begin the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpListener`] to accept
11 //! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12 //! upgrades.
13 //!
14 //! Once a connection is obtained, it is passed to [`handshake`],
15 //! which will begin the [HTTP/2.0 handshake]. This returns a future that
16 //! completes once the handshake process is performed and HTTP/2.0 streams may
17 //! be received.
18 //!
19 //! [`handshake`] uses default configuration values. There are a number of
20 //! settings that can be changed by using [`Builder`] instead.
21 //!
22 //! # Inbound streams
23 //!
24 //! The [`Connection`] instance is used to accept inbound HTTP/2.0 streams. It
25 //! does this by implementing [`futures::Stream`]. When a new stream is
26 //! received, a call to [`Connection::accept`] will return `(request, response)`.
27 //! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28 //! HTTP request head as well as provides a way to receive the inbound data
29 //! stream and the trailers. The `response` handle (of type [`SendResponse`])
30 //! allows responding to the request, stream the response payload, send
31 //! trailers, and send push promises.
32 //!
33 //! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34 //! can be operated independently.
35 //!
36 //! # Managing the connection
37 //!
38 //! The [`Connection`] instance is used to manage connection state. The caller
39 //! is required to call either [`Connection::accept`] or
40 //! [`Connection::poll_close`] in order to advance the connection state. Simply
41 //! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42 //! connection state is advanced.
43 //!
44 //! It is not required to call **both** [`Connection::accept`] and
45 //! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46 //! then only [`Connection::accept`] should be called. When the caller **does
47 //! not** want to accept a new stream, [`Connection::poll_close`] should be
48 //! called.
49 //!
50 //! The [`Connection`] instance should only be dropped once
51 //! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52 //! returns `Ready(None)`, there will no longer be any more inbound streams. At
53 //! this point, only [`Connection::poll_close`] should be called.
54 //!
55 //! # Shutting down the server
56 //!
57 //! Graceful shutdown of the server is [not yet
58 //! implemented](https://github.com/hyperium/h2/issues/69).
59 //!
60 //! # Example
61 //!
62 //! A basic HTTP/2.0 server example that runs over TCP and assumes [prior
63 //! knowledge], i.e. both the client and the server assume that the TCP socket
64 //! will use the HTTP/2.0 protocol without prior negotiation.
65 //!
66 //! ```no_run
67 //! use h2::server;
68 //! use http::{Response, StatusCode};
69 //! use tokio::net::TcpListener;
70 //!
71 //! #[tokio::main]
72 //! pub async fn main() {
73 //!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74 //!
75 //!     // Accept all incoming TCP connections.
76 //!     loop {
77 //!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78 //!             // Spawn a new task to process each connection.
79 //!             tokio::spawn(async {
80 //!                 // Start the HTTP/2.0 connection handshake
81 //!                 let mut h2 = server::handshake(socket).await.unwrap();
82 //!                 // Accept all inbound HTTP/2.0 streams sent over the
83 //!                 // connection.
84 //!                 while let Some(request) = h2.accept().await {
85 //!                     let (request, mut respond) = request.unwrap();
86 //!                     println!("Received request: {:?}", request);
87 //!
88 //!                     // Build a response with no body
89 //!                     let response = Response::builder()
90 //!                         .status(StatusCode::OK)
91 //!                         .body(())
92 //!                         .unwrap();
93 //!
94 //!                     // Send the response back to the client
95 //!                     respond.send_response(response, true)
96 //!                         .unwrap();
97 //!                 }
98 //!
99 //!             });
100 //!         }
101 //!     }
102 //! }
103 //! ```
104 //!
105 //! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106 //! [`handshake`]: fn.handshake.html
107 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108 //! [`Builder`]: struct.Builder.html
109 //! [`Connection`]: struct.Connection.html
110 //! [`Connection::poll`]: struct.Connection.html#method.poll
111 //! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112 //! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113 //! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114 //! [`RecvStream`]: ../struct.RecvStream.html
115 //! [`SendStream`]: ../struct.SendStream.html
116 //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
117 
118 use crate::codec::{Codec, UserError};
119 use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120 use crate::proto::{self, Config, Error, Prioritized};
121 use crate::{FlowControl, PingPong, RecvStream, SendStream};
122 
123 use bytes::{Buf, Bytes};
124 use http::{HeaderMap, Method, Request, Response};
125 use std::future::Future;
126 use std::pin::Pin;
127 use std::task::{Context, Poll};
128 use std::time::Duration;
129 use std::{convert, fmt, io, mem};
130 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131 use tracing::instrument::{Instrument, Instrumented};
132 
133 /// In progress HTTP/2.0 connection handshake future.
134 ///
135 /// This type implements `Future`, yielding a `Connection` instance once the
136 /// handshake has completed.
137 ///
138 /// The handshake is completed once the connection preface is fully received
139 /// from the client **and** the initial settings frame is sent to the client.
140 ///
141 /// The handshake future does not wait for the initial settings frame from the
142 /// client.
143 ///
144 /// See [module] level docs for more details.
145 ///
146 /// [module]: index.html
147 #[must_use = "futures do nothing unless polled"]
148 pub struct Handshake<T, B: Buf = Bytes> {
149     /// The config to pass to Connection::new after handshake succeeds.
150     builder: Builder,
151     /// The current state of the handshake.
152     state: Handshaking<T, B>,
153     /// Span tracking the handshake
154     span: tracing::Span,
155 }
156 
157 /// Accepts inbound HTTP/2.0 streams on a connection.
158 ///
159 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160 /// implements the HTTP/2.0 server logic for that connection. It is responsible
161 /// for receiving inbound streams initiated by the client as well as driving the
162 /// internal state forward.
163 ///
164 /// `Connection` values are created by calling [`handshake`]. Once a
165 /// `Connection` value is obtained, the caller must call [`poll`] or
166 /// [`poll_close`] in order to drive the internal connection state forward.
167 ///
168 /// See [module level] documentation for more details
169 ///
170 /// [module level]: index.html
171 /// [`handshake`]: struct.Connection.html#method.handshake
172 /// [`poll`]: struct.Connection.html#method.poll
173 /// [`poll_close`]: struct.Connection.html#method.poll_close
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// # use tokio::io::{AsyncRead, AsyncWrite};
179 /// # use h2::server;
180 /// # use h2::server::*;
181 /// #
182 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183 /// let mut server = server::handshake(my_io).await.unwrap();
184 /// while let Some(request) = server.accept().await {
185 ///     tokio::spawn(async move {
186 ///         let (request, respond) = request.unwrap();
187 ///         // Process the request and send the response back to the client
188 ///         // using `respond`.
189 ///     });
190 /// }
191 /// # }
192 /// #
193 /// # pub fn main() {}
194 /// ```
195 #[must_use = "streams do nothing unless polled"]
196 pub struct Connection<T, B: Buf> {
197     connection: proto::Connection<T, Peer, B>,
198 }
199 
200 /// Builds server connections with custom configuration values.
201 ///
202 /// Methods can be chained in order to set the configuration values.
203 ///
204 /// The server is constructed by calling [`handshake`] and passing the I/O
205 /// handle that will back the HTTP/2.0 server.
206 ///
207 /// New instances of `Builder` are obtained via [`Builder::new`].
208 ///
209 /// See function level documentation for details on the various server
210 /// configuration settings.
211 ///
212 /// [`Builder::new`]: struct.Builder.html#method.new
213 /// [`handshake`]: struct.Builder.html#method.handshake
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// # use tokio::io::{AsyncRead, AsyncWrite};
219 /// # use h2::server::*;
220 /// #
221 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222 /// # -> Handshake<T>
223 /// # {
224 /// // `server_fut` is a future representing the completion of the HTTP/2.0
225 /// // handshake.
226 /// let server_fut = Builder::new()
227 ///     .initial_window_size(1_000_000)
228 ///     .max_concurrent_streams(1000)
229 ///     .handshake(my_io);
230 /// # server_fut
231 /// # }
232 /// #
233 /// # pub fn main() {}
234 /// ```
235 #[derive(Clone, Debug)]
236 pub struct Builder {
237     /// Time to keep locally reset streams around before reaping.
238     reset_stream_duration: Duration,
239 
240     /// Maximum number of locally reset streams to keep at a time.
241     reset_stream_max: usize,
242 
243     /// Initial `Settings` frame to send as part of the handshake.
244     settings: Settings,
245 
246     /// Initial target window size for new connections.
247     initial_target_connection_window_size: Option<u32>,
248 }
249 
250 /// Send a response back to the client
251 ///
252 /// A `SendResponse` instance is provided when receiving a request and is used
253 /// to send the associated response back to the client. It is also used to
254 /// explicitly reset the stream with a custom reason.
255 ///
256 /// It will also be used to initiate push promises linked with the associated
257 /// stream.
258 ///
259 /// If the `SendResponse` instance is dropped without sending a response, then
260 /// the HTTP/2.0 stream will be reset.
261 ///
262 /// See [module] level docs for more details.
263 ///
264 /// [module]: index.html
265 #[derive(Debug)]
266 pub struct SendResponse<B: Buf> {
267     inner: proto::StreamRef<B>,
268 }
269 
270 /// Send a response to a promised request
271 ///
272 /// A `SendPushedResponse` instance is provided when promising a request and is used
273 /// to send the associated response to the client. It is also used to
274 /// explicitly reset the stream with a custom reason.
275 ///
276 /// It can not be used to initiate push promises.
277 ///
278 /// If the `SendPushedResponse` instance is dropped without sending a response, then
279 /// the HTTP/2.0 stream will be reset.
280 ///
281 /// See [module] level docs for more details.
282 ///
283 /// [module]: index.html
284 pub struct SendPushedResponse<B: Buf> {
285     inner: SendResponse<B>,
286 }
287 
288 // Manual implementation necessary because of rust-lang/rust#26925
289 impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result290     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291         write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
292     }
293 }
294 
295 /// Stages of an in-progress handshake.
296 enum Handshaking<T, B: Buf> {
297     /// State 1. Connection is flushing pending SETTINGS frame.
298     Flushing(Instrumented<Flush<T, Prioritized<B>>>),
299     /// State 2. Connection is waiting for the client preface.
300     ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
301     /// Dummy state for `mem::replace`.
302     Empty,
303 }
304 
305 /// Flush a Sink
306 struct Flush<T, B> {
307     codec: Option<Codec<T, B>>,
308 }
309 
310 /// Read the client connection preface
311 struct ReadPreface<T, B> {
312     codec: Option<Codec<T, B>>,
313     pos: usize,
314 }
315 
316 #[derive(Debug)]
317 pub(crate) struct Peer;
318 
319 const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
320 
321 /// Creates a new configured HTTP/2.0 server with default configuration
322 /// values backed by `io`.
323 ///
324 /// It is expected that `io` already be in an appropriate state to commence
325 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
326 ///
327 /// Returns a future which resolves to the [`Connection`] instance once the
328 /// HTTP/2.0 handshake has been completed. The returned [`Connection`]
329 /// instance will be using default configuration values. Use [`Builder`] to
330 /// customize the configuration values used by a [`Connection`] instance.
331 ///
332 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
333 /// [Handshake]: ../index.html#handshake
334 /// [`Connection`]: struct.Connection.html
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// # use tokio::io::{AsyncRead, AsyncWrite};
340 /// # use h2::server;
341 /// # use h2::server::*;
342 /// #
343 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
344 /// # {
345 /// let connection = server::handshake(my_io).await.unwrap();
346 /// // The HTTP/2.0 handshake has completed, now use `connection` to
347 /// // accept inbound HTTP/2.0 streams.
348 /// # }
349 /// #
350 /// # pub fn main() {}
351 /// ```
handshake<T>(io: T) -> Handshake<T, Bytes> where T: AsyncRead + AsyncWrite + Unpin,352 pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
353 where
354     T: AsyncRead + AsyncWrite + Unpin,
355 {
356     Builder::new().handshake(io)
357 }
358 
359 // ===== impl Connection =====
360 
361 impl<T, B> Connection<T, B>
362 where
363     T: AsyncRead + AsyncWrite + Unpin,
364     B: Buf + 'static,
365 {
handshake2(io: T, builder: Builder) -> Handshake<T, B>366     fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
367         let span = tracing::trace_span!("server_handshake", io = %std::any::type_name::<T>());
368         let entered = span.enter();
369 
370         // Create the codec.
371         let mut codec = Codec::new(io);
372 
373         if let Some(max) = builder.settings.max_frame_size() {
374             codec.set_max_recv_frame_size(max as usize);
375         }
376 
377         if let Some(max) = builder.settings.max_header_list_size() {
378             codec.set_max_recv_header_list_size(max as usize);
379         }
380 
381         // Send initial settings frame.
382         codec
383             .buffer(builder.settings.clone().into())
384             .expect("invalid SETTINGS frame");
385 
386         // Create the handshake future.
387         let state = Handshaking::from(codec);
388 
389         drop(entered);
390 
391         Handshake {
392             builder,
393             state,
394             span,
395         }
396     }
397 
398     /// Accept the next incoming request on this connection.
accept( &mut self, ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>399     pub async fn accept(
400         &mut self,
401     ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
402         futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
403     }
404 
405     #[doc(hidden)]
poll_accept( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>>406     pub fn poll_accept(
407         &mut self,
408         cx: &mut Context<'_>,
409     ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
410         // Always try to advance the internal state. Getting Pending also is
411         // needed to allow this function to return Pending.
412         if let Poll::Ready(_) = self.poll_closed(cx)? {
413             // If the socket is closed, don't return anything
414             // TODO: drop any pending streams
415             return Poll::Ready(None);
416         }
417 
418         if let Some(inner) = self.connection.next_incoming() {
419             tracing::trace!("received incoming");
420             let (head, _) = inner.take_request().into_parts();
421             let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
422 
423             let request = Request::from_parts(head, body);
424             let respond = SendResponse { inner };
425 
426             return Poll::Ready(Some(Ok((request, respond))));
427         }
428 
429         Poll::Pending
430     }
431 
432     /// Sets the target window size for the whole connection.
433     ///
434     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
435     /// frame will be immediately sent to the remote, increasing the connection
436     /// level window by `size - current_value`.
437     ///
438     /// If `size` is less than the current value, nothing will happen
439     /// immediately. However, as window capacity is released by
440     /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
441     /// out until the number of "in flight" bytes drops below `size`.
442     ///
443     /// The default value is 65,535.
444     ///
445     /// See [`FlowControl`] documentation for more details.
446     ///
447     /// [`FlowControl`]: ../struct.FlowControl.html
448     /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)449     pub fn set_target_window_size(&mut self, size: u32) {
450         assert!(size <= proto::MAX_WINDOW_SIZE);
451         self.connection.set_target_window_size(size);
452     }
453 
454     /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
455     /// flow control for received data.
456     ///
457     /// The `SETTINGS` will be sent to the remote, and only applied once the
458     /// remote acknowledges the change.
459     ///
460     /// This can be used to increase or decrease the window size for existing
461     /// streams.
462     ///
463     /// # Errors
464     ///
465     /// Returns an error if a previous call is still pending acknowledgement
466     /// from the remote endpoint.
set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error>467     pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
468         assert!(size <= proto::MAX_WINDOW_SIZE);
469         self.connection.set_initial_window_size(size)?;
470         Ok(())
471     }
472 
473     /// Returns `Ready` when the underlying connection has closed.
474     ///
475     /// If any new inbound streams are received during a call to `poll_closed`,
476     /// they will be queued and returned on the next call to [`poll_accept`].
477     ///
478     /// This function will advance the internal connection state, driving
479     /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
480     ///
481     /// See [here](index.html#managing-the-connection) for more details.
482     ///
483     /// [`poll_accept`]: struct.Connection.html#method.poll_accept
484     /// [`RecvStream`]: ../struct.RecvStream.html
485     /// [`SendStream`]: ../struct.SendStream.html
poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>486     pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
487         self.connection.poll(cx).map_err(Into::into)
488     }
489 
490     #[doc(hidden)]
491     #[deprecated(note = "renamed to poll_closed")]
poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>492     pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
493         self.poll_closed(cx)
494     }
495 
496     /// Sets the connection to a GOAWAY state.
497     ///
498     /// Does not terminate the connection. Must continue being polled to close
499     /// connection.
500     ///
501     /// After flushing the GOAWAY frame, the connection is closed. Any
502     /// outstanding streams do not prevent the connection from closing. This
503     /// should usually be reserved for shutting down when something bad
504     /// external to `h2` has happened, and open streams cannot be properly
505     /// handled.
506     ///
507     /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
abrupt_shutdown(&mut self, reason: Reason)508     pub fn abrupt_shutdown(&mut self, reason: Reason) {
509         self.connection.go_away_from_user(reason);
510     }
511 
512     /// Starts a [graceful shutdown][1] process.
513     ///
514     /// Must continue being polled to close connection.
515     ///
516     /// It's possible to receive more requests after calling this method, since
517     /// they might have been in-flight from the client already. After about
518     /// 1 RTT, no new requests should be accepted. Once all active streams
519     /// have completed, the connection is closed.
520     ///
521     /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
graceful_shutdown(&mut self)522     pub fn graceful_shutdown(&mut self) {
523         self.connection.go_away_gracefully();
524     }
525 
526     /// Takes a `PingPong` instance from the connection.
527     ///
528     /// # Note
529     ///
530     /// This may only be called once. Calling multiple times will return `None`.
ping_pong(&mut self) -> Option<PingPong>531     pub fn ping_pong(&mut self) -> Option<PingPong> {
532         self.connection.take_user_pings().map(PingPong::new)
533     }
534 
535     /// Returns the maximum number of concurrent streams that may be initiated
536     /// by the server on this connection.
537     ///
538     /// This limit is configured by the client peer by sending the
539     /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
540     /// This method returns the currently acknowledged value recieved from the
541     /// remote.
542     ///
543     /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
max_concurrent_send_streams(&self) -> usize544     pub fn max_concurrent_send_streams(&self) -> usize {
545         self.connection.max_send_streams()
546     }
547 
548     /// Returns the maximum number of concurrent streams that may be initiated
549     /// by the client on this connection.
550     ///
551     /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
552     /// parameter][1] sent in a `SETTINGS` frame that has been
553     /// acknowledged by the remote peer. The value to be sent is configured by
554     /// the [`Builder::max_concurrent_streams`][2] method before handshaking
555     /// with the remote peer.
556     ///
557     /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
558     /// [2]: ../struct.Builder.html#method.max_concurrent_streams
max_concurrent_recv_streams(&self) -> usize559     pub fn max_concurrent_recv_streams(&self) -> usize {
560         self.connection.max_recv_streams()
561     }
562 }
563 
564 #[cfg(feature = "stream")]
565 impl<T, B> futures_core::Stream for Connection<T, B>
566 where
567     T: AsyncRead + AsyncWrite + Unpin,
568     B: Buf + 'static,
569 {
570     type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
571 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>572     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
573         self.poll_accept(cx)
574     }
575 }
576 
577 impl<T, B> fmt::Debug for Connection<T, B>
578 where
579     T: fmt::Debug,
580     B: fmt::Debug + Buf,
581 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result582     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
583         fmt.debug_struct("Connection")
584             .field("connection", &self.connection)
585             .finish()
586     }
587 }
588 
589 // ===== impl Builder =====
590 
591 impl Builder {
592     /// Returns a new server builder instance initialized with default
593     /// configuration values.
594     ///
595     /// Configuration methods can be chained on the return value.
596     ///
597     /// # Examples
598     ///
599     /// ```
600     /// # use tokio::io::{AsyncRead, AsyncWrite};
601     /// # use h2::server::*;
602     /// #
603     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
604     /// # -> Handshake<T>
605     /// # {
606     /// // `server_fut` is a future representing the completion of the HTTP/2.0
607     /// // handshake.
608     /// let server_fut = Builder::new()
609     ///     .initial_window_size(1_000_000)
610     ///     .max_concurrent_streams(1000)
611     ///     .handshake(my_io);
612     /// # server_fut
613     /// # }
614     /// #
615     /// # pub fn main() {}
616     /// ```
new() -> Builder617     pub fn new() -> Builder {
618         Builder {
619             reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
620             reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
621             settings: Settings::default(),
622             initial_target_connection_window_size: None,
623         }
624     }
625 
626     /// Indicates the initial window size (in octets) for stream-level
627     /// flow control for received data.
628     ///
629     /// The initial window of a stream is used as part of flow control. For more
630     /// details, see [`FlowControl`].
631     ///
632     /// The default value is 65,535.
633     ///
634     /// [`FlowControl`]: ../struct.FlowControl.html
635     ///
636     /// # Examples
637     ///
638     /// ```
639     /// # use tokio::io::{AsyncRead, AsyncWrite};
640     /// # use h2::server::*;
641     /// #
642     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
643     /// # -> Handshake<T>
644     /// # {
645     /// // `server_fut` is a future representing the completion of the HTTP/2.0
646     /// // handshake.
647     /// let server_fut = Builder::new()
648     ///     .initial_window_size(1_000_000)
649     ///     .handshake(my_io);
650     /// # server_fut
651     /// # }
652     /// #
653     /// # pub fn main() {}
654     /// ```
initial_window_size(&mut self, size: u32) -> &mut Self655     pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
656         self.settings.set_initial_window_size(Some(size));
657         self
658     }
659 
660     /// Indicates the initial window size (in octets) for connection-level flow control
661     /// for received data.
662     ///
663     /// The initial window of a connection is used as part of flow control. For more details,
664     /// see [`FlowControl`].
665     ///
666     /// The default value is 65,535.
667     ///
668     /// [`FlowControl`]: ../struct.FlowControl.html
669     ///
670     /// # Examples
671     ///
672     /// ```
673     /// # use tokio::io::{AsyncRead, AsyncWrite};
674     /// # use h2::server::*;
675     /// #
676     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
677     /// # -> Handshake<T>
678     /// # {
679     /// // `server_fut` is a future representing the completion of the HTTP/2.0
680     /// // handshake.
681     /// let server_fut = Builder::new()
682     ///     .initial_connection_window_size(1_000_000)
683     ///     .handshake(my_io);
684     /// # server_fut
685     /// # }
686     /// #
687     /// # pub fn main() {}
688     /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self689     pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
690         self.initial_target_connection_window_size = Some(size);
691         self
692     }
693 
694     /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
695     /// configured server is able to accept.
696     ///
697     /// The sender may send data frames that are **smaller** than this value,
698     /// but any data larger than `max` will be broken up into multiple `DATA`
699     /// frames.
700     ///
701     /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
702     ///
703     /// # Examples
704     ///
705     /// ```
706     /// # use tokio::io::{AsyncRead, AsyncWrite};
707     /// # use h2::server::*;
708     /// #
709     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
710     /// # -> Handshake<T>
711     /// # {
712     /// // `server_fut` is a future representing the completion of the HTTP/2.0
713     /// // handshake.
714     /// let server_fut = Builder::new()
715     ///     .max_frame_size(1_000_000)
716     ///     .handshake(my_io);
717     /// # server_fut
718     /// # }
719     /// #
720     /// # pub fn main() {}
721     /// ```
722     ///
723     /// # Panics
724     ///
725     /// This function panics if `max` is not within the legal range specified
726     /// above.
max_frame_size(&mut self, max: u32) -> &mut Self727     pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
728         self.settings.set_max_frame_size(Some(max));
729         self
730     }
731 
732     /// Sets the max size of received header frames.
733     ///
734     /// This advisory setting informs a peer of the maximum size of header list
735     /// that the sender is prepared to accept, in octets. The value is based on
736     /// the uncompressed size of header fields, including the length of the name
737     /// and value in octets plus an overhead of 32 octets for each header field.
738     ///
739     /// This setting is also used to limit the maximum amount of data that is
740     /// buffered to decode HEADERS frames.
741     ///
742     /// # Examples
743     ///
744     /// ```
745     /// # use tokio::io::{AsyncRead, AsyncWrite};
746     /// # use h2::server::*;
747     /// #
748     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
749     /// # -> Handshake<T>
750     /// # {
751     /// // `server_fut` is a future representing the completion of the HTTP/2.0
752     /// // handshake.
753     /// let server_fut = Builder::new()
754     ///     .max_header_list_size(16 * 1024)
755     ///     .handshake(my_io);
756     /// # server_fut
757     /// # }
758     /// #
759     /// # pub fn main() {}
760     /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self761     pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
762         self.settings.set_max_header_list_size(Some(max));
763         self
764     }
765 
766     /// Sets the maximum number of concurrent streams.
767     ///
768     /// The maximum concurrent streams setting only controls the maximum number
769     /// of streams that can be initiated by the remote peer. In other words,
770     /// when this setting is set to 100, this does not limit the number of
771     /// concurrent streams that can be created by the caller.
772     ///
773     /// It is recommended that this value be no smaller than 100, so as to not
774     /// unnecessarily limit parallelism. However, any value is legal, including
775     /// 0. If `max` is set to 0, then the remote will not be permitted to
776     /// initiate streams.
777     ///
778     /// Note that streams in the reserved state, i.e., push promises that have
779     /// been reserved but the stream has not started, do not count against this
780     /// setting.
781     ///
782     /// Also note that if the remote *does* exceed the value set here, it is not
783     /// a protocol level error. Instead, the `h2` library will immediately reset
784     /// the stream.
785     ///
786     /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
787     ///
788     /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
789     ///
790     /// # Examples
791     ///
792     /// ```
793     /// # use tokio::io::{AsyncRead, AsyncWrite};
794     /// # use h2::server::*;
795     /// #
796     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
797     /// # -> Handshake<T>
798     /// # {
799     /// // `server_fut` is a future representing the completion of the HTTP/2.0
800     /// // handshake.
801     /// let server_fut = Builder::new()
802     ///     .max_concurrent_streams(1000)
803     ///     .handshake(my_io);
804     /// # server_fut
805     /// # }
806     /// #
807     /// # pub fn main() {}
808     /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self809     pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
810         self.settings.set_max_concurrent_streams(Some(max));
811         self
812     }
813 
814     /// Sets the maximum number of concurrent locally reset streams.
815     ///
816     /// When a stream is explicitly reset by either calling
817     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
818     /// before completing the stream, the HTTP/2.0 specification requires that
819     /// any further frames received for that stream must be ignored for "some
820     /// time".
821     ///
822     /// In order to satisfy the specification, internal state must be maintained
823     /// to implement the behavior. This state grows linearly with the number of
824     /// streams that are locally reset.
825     ///
826     /// The `max_concurrent_reset_streams` setting configures sets an upper
827     /// bound on the amount of state that is maintained. When this max value is
828     /// reached, the oldest reset stream is purged from memory.
829     ///
830     /// Once the stream has been fully purged from memory, any additional frames
831     /// received for that stream will result in a connection level protocol
832     /// error, forcing the connection to terminate.
833     ///
834     /// The default value is 10.
835     ///
836     /// # Examples
837     ///
838     /// ```
839     /// # use tokio::io::{AsyncRead, AsyncWrite};
840     /// # use h2::server::*;
841     /// #
842     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
843     /// # -> Handshake<T>
844     /// # {
845     /// // `server_fut` is a future representing the completion of the HTTP/2.0
846     /// // handshake.
847     /// let server_fut = Builder::new()
848     ///     .max_concurrent_reset_streams(1000)
849     ///     .handshake(my_io);
850     /// # server_fut
851     /// # }
852     /// #
853     /// # pub fn main() {}
854     /// ```
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self855     pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
856         self.reset_stream_max = max;
857         self
858     }
859 
860     /// Sets the maximum number of concurrent locally reset streams.
861     ///
862     /// When a stream is explicitly reset by either calling
863     /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
864     /// before completing the stream, the HTTP/2.0 specification requires that
865     /// any further frames received for that stream must be ignored for "some
866     /// time".
867     ///
868     /// In order to satisfy the specification, internal state must be maintained
869     /// to implement the behavior. This state grows linearly with the number of
870     /// streams that are locally reset.
871     ///
872     /// The `reset_stream_duration` setting configures the max amount of time
873     /// this state will be maintained in memory. Once the duration elapses, the
874     /// stream state is purged from memory.
875     ///
876     /// Once the stream has been fully purged from memory, any additional frames
877     /// received for that stream will result in a connection level protocol
878     /// error, forcing the connection to terminate.
879     ///
880     /// The default value is 30 seconds.
881     ///
882     /// # Examples
883     ///
884     /// ```
885     /// # use tokio::io::{AsyncRead, AsyncWrite};
886     /// # use h2::server::*;
887     /// # use std::time::Duration;
888     /// #
889     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
890     /// # -> Handshake<T>
891     /// # {
892     /// // `server_fut` is a future representing the completion of the HTTP/2.0
893     /// // handshake.
894     /// let server_fut = Builder::new()
895     ///     .reset_stream_duration(Duration::from_secs(10))
896     ///     .handshake(my_io);
897     /// # server_fut
898     /// # }
899     /// #
900     /// # pub fn main() {}
901     /// ```
reset_stream_duration(&mut self, dur: Duration) -> &mut Self902     pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
903         self.reset_stream_duration = dur;
904         self
905     }
906 
907     /// Creates a new configured HTTP/2.0 server backed by `io`.
908     ///
909     /// It is expected that `io` already be in an appropriate state to commence
910     /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
911     ///
912     /// Returns a future which resolves to the [`Connection`] instance once the
913     /// HTTP/2.0 handshake has been completed.
914     ///
915     /// This function also allows the caller to configure the send payload data
916     /// type. See [Outbound data type] for more details.
917     ///
918     /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
919     /// [Handshake]: ../index.html#handshake
920     /// [`Connection`]: struct.Connection.html
921     /// [Outbound data type]: ../index.html#outbound-data-type.
922     ///
923     /// # Examples
924     ///
925     /// Basic usage:
926     ///
927     /// ```
928     /// # use tokio::io::{AsyncRead, AsyncWrite};
929     /// # use h2::server::*;
930     /// #
931     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
932     /// # -> Handshake<T>
933     /// # {
934     /// // `server_fut` is a future representing the completion of the HTTP/2.0
935     /// // handshake.
936     /// let server_fut = Builder::new()
937     ///     .handshake(my_io);
938     /// # server_fut
939     /// # }
940     /// #
941     /// # pub fn main() {}
942     /// ```
943     ///
944     /// Configures the send-payload data type. In this case, the outbound data
945     /// type will be `&'static [u8]`.
946     ///
947     /// ```
948     /// # use tokio::io::{AsyncRead, AsyncWrite};
949     /// # use h2::server::*;
950     /// #
951     /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
952     /// # -> Handshake<T, &'static [u8]>
953     /// # {
954     /// // `server_fut` is a future representing the completion of the HTTP/2.0
955     /// // handshake.
956     /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
957     ///     .handshake(my_io);
958     /// # server_fut
959     /// # }
960     /// #
961     /// # pub fn main() {}
962     /// ```
handshake<T, B>(&self, io: T) -> Handshake<T, B> where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static,963     pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
964     where
965         T: AsyncRead + AsyncWrite + Unpin,
966         B: Buf + 'static,
967     {
968         Connection::handshake2(io, self.clone())
969     }
970 }
971 
972 impl Default for Builder {
default() -> Builder973     fn default() -> Builder {
974         Builder::new()
975     }
976 }
977 
978 // ===== impl SendResponse =====
979 
980 impl<B: Buf> SendResponse<B> {
981     /// Send a response to a client request.
982     ///
983     /// On success, a [`SendStream`] instance is returned. This instance can be
984     /// used to stream the response body and send trailers.
985     ///
986     /// If a body or trailers will be sent on the returned [`SendStream`]
987     /// instance, then `end_of_stream` must be set to `false` when calling this
988     /// function.
989     ///
990     /// The [`SendResponse`] instance is already associated with a received
991     /// request.  This function may only be called once per instance and only if
992     /// [`send_reset`] has not been previously called.
993     ///
994     /// [`SendResponse`]: #
995     /// [`SendStream`]: ../struct.SendStream.html
996     /// [`send_reset`]: #method.send_reset
send_response( &mut self, response: Response<()>, end_of_stream: bool, ) -> Result<SendStream<B>, crate::Error>997     pub fn send_response(
998         &mut self,
999         response: Response<()>,
1000         end_of_stream: bool,
1001     ) -> Result<SendStream<B>, crate::Error> {
1002         self.inner
1003             .send_response(response, end_of_stream)
1004             .map(|_| SendStream::new(self.inner.clone()))
1005             .map_err(Into::into)
1006     }
1007 
1008     /// Push a request and response to the client
1009     ///
1010     /// On success, a [`SendResponse`] instance is returned.
1011     ///
1012     /// [`SendResponse`]: #
push_request( &mut self, request: Request<()>, ) -> Result<SendPushedResponse<B>, crate::Error>1013     pub fn push_request(
1014         &mut self,
1015         request: Request<()>,
1016     ) -> Result<SendPushedResponse<B>, crate::Error> {
1017         self.inner
1018             .send_push_promise(request)
1019             .map(|inner| SendPushedResponse {
1020                 inner: SendResponse { inner },
1021             })
1022             .map_err(Into::into)
1023     }
1024 
1025     /// Send a stream reset to the peer.
1026     ///
1027     /// This essentially cancels the stream, including any inbound or outbound
1028     /// data streams.
1029     ///
1030     /// If this function is called before [`send_response`], a call to
1031     /// [`send_response`] will result in an error.
1032     ///
1033     /// If this function is called while a [`SendStream`] instance is active,
1034     /// any further use of the instance will result in an error.
1035     ///
1036     /// This function should only be called once.
1037     ///
1038     /// [`send_response`]: #method.send_response
1039     /// [`SendStream`]: ../struct.SendStream.html
send_reset(&mut self, reason: Reason)1040     pub fn send_reset(&mut self, reason: Reason) {
1041         self.inner.send_reset(reason)
1042     }
1043 
1044     /// Polls to be notified when the client resets this stream.
1045     ///
1046     /// If stream is still open, this returns `Poll::Pending`, and
1047     /// registers the task to be notified if a `RST_STREAM` is received.
1048     ///
1049     /// If a `RST_STREAM` frame is received for this stream, calling this
1050     /// method will yield the `Reason` for the reset.
1051     ///
1052     /// # Error
1053     ///
1054     /// Calling this method after having called `send_response` will return
1055     /// a user error.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>1056     pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1057         self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1058     }
1059 
1060     /// Returns the stream ID of the response stream.
1061     ///
1062     /// # Panics
1063     ///
1064     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1065     pub fn stream_id(&self) -> crate::StreamId {
1066         crate::StreamId::from_internal(self.inner.stream_id())
1067     }
1068 }
1069 
1070 // ===== impl SendPushedResponse =====
1071 
1072 impl<B: Buf> SendPushedResponse<B> {
1073     /// Send a response to a promised request.
1074     ///
1075     /// On success, a [`SendStream`] instance is returned. This instance can be
1076     /// used to stream the response body and send trailers.
1077     ///
1078     /// If a body or trailers will be sent on the returned [`SendStream`]
1079     /// instance, then `end_of_stream` must be set to `false` when calling this
1080     /// function.
1081     ///
1082     /// The [`SendPushedResponse`] instance is associated with a promised
1083     /// request.  This function may only be called once per instance and only if
1084     /// [`send_reset`] has not been previously called.
1085     ///
1086     /// [`SendPushedResponse`]: #
1087     /// [`SendStream`]: ../struct.SendStream.html
1088     /// [`send_reset`]: #method.send_reset
send_response( &mut self, response: Response<()>, end_of_stream: bool, ) -> Result<SendStream<B>, crate::Error>1089     pub fn send_response(
1090         &mut self,
1091         response: Response<()>,
1092         end_of_stream: bool,
1093     ) -> Result<SendStream<B>, crate::Error> {
1094         self.inner.send_response(response, end_of_stream)
1095     }
1096 
1097     /// Send a stream reset to the peer.
1098     ///
1099     /// This essentially cancels the stream, including any inbound or outbound
1100     /// data streams.
1101     ///
1102     /// If this function is called before [`send_response`], a call to
1103     /// [`send_response`] will result in an error.
1104     ///
1105     /// If this function is called while a [`SendStream`] instance is active,
1106     /// any further use of the instance will result in an error.
1107     ///
1108     /// This function should only be called once.
1109     ///
1110     /// [`send_response`]: #method.send_response
1111     /// [`SendStream`]: ../struct.SendStream.html
send_reset(&mut self, reason: Reason)1112     pub fn send_reset(&mut self, reason: Reason) {
1113         self.inner.send_reset(reason)
1114     }
1115 
1116     /// Polls to be notified when the client resets this stream.
1117     ///
1118     /// If stream is still open, this returns `Poll::Pending`, and
1119     /// registers the task to be notified if a `RST_STREAM` is received.
1120     ///
1121     /// If a `RST_STREAM` frame is received for this stream, calling this
1122     /// method will yield the `Reason` for the reset.
1123     ///
1124     /// # Error
1125     ///
1126     /// Calling this method after having called `send_response` will return
1127     /// a user error.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>1128     pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1129         self.inner.poll_reset(cx)
1130     }
1131 
1132     /// Returns the stream ID of the response stream.
1133     ///
1134     /// # Panics
1135     ///
1136     /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1137     pub fn stream_id(&self) -> crate::StreamId {
1138         self.inner.stream_id()
1139     }
1140 }
1141 
1142 // ===== impl Flush =====
1143 
1144 impl<T, B: Buf> Flush<T, B> {
new(codec: Codec<T, B>) -> Self1145     fn new(codec: Codec<T, B>) -> Self {
1146         Flush { codec: Some(codec) }
1147     }
1148 }
1149 
1150 impl<T, B> Future for Flush<T, B>
1151 where
1152     T: AsyncWrite + Unpin,
1153     B: Buf,
1154 {
1155     type Output = Result<Codec<T, B>, crate::Error>;
1156 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1157     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1158         // Flush the codec
1159         ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1160 
1161         // Return the codec
1162         Poll::Ready(Ok(self.codec.take().unwrap()))
1163     }
1164 }
1165 
1166 impl<T, B: Buf> ReadPreface<T, B> {
new(codec: Codec<T, B>) -> Self1167     fn new(codec: Codec<T, B>) -> Self {
1168         ReadPreface {
1169             codec: Some(codec),
1170             pos: 0,
1171         }
1172     }
1173 
inner_mut(&mut self) -> &mut T1174     fn inner_mut(&mut self) -> &mut T {
1175         self.codec.as_mut().unwrap().get_mut()
1176     }
1177 }
1178 
1179 impl<T, B> Future for ReadPreface<T, B>
1180 where
1181     T: AsyncRead + Unpin,
1182     B: Buf,
1183 {
1184     type Output = Result<Codec<T, B>, crate::Error>;
1185 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1186     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1187         let mut buf = [0; 24];
1188         let mut rem = PREFACE.len() - self.pos;
1189 
1190         while rem > 0 {
1191             let mut buf = ReadBuf::new(&mut buf[..rem]);
1192             ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1193                 .map_err(crate::Error::from_io)?;
1194             let n = buf.filled().len();
1195             if n == 0 {
1196                 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1197                     io::ErrorKind::UnexpectedEof,
1198                     "connection closed before reading preface",
1199                 ))));
1200             }
1201 
1202             if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1203                 proto_err!(conn: "read_preface: invalid preface");
1204                 // TODO: Should this just write the GO_AWAY frame directly?
1205                 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1206             }
1207 
1208             self.pos += n;
1209             rem -= n; // TODO test
1210         }
1211 
1212         Poll::Ready(Ok(self.codec.take().unwrap()))
1213     }
1214 }
1215 
1216 // ===== impl Handshake =====
1217 
1218 impl<T, B: Buf> Future for Handshake<T, B>
1219 where
1220     T: AsyncRead + AsyncWrite + Unpin,
1221     B: Buf + 'static,
1222 {
1223     type Output = Result<Connection<T, B>, crate::Error>;
1224 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1225     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1226         let span = self.span.clone(); // XXX(eliza): T_T
1227         let _e = span.enter();
1228         tracing::trace!(state = ?self.state);
1229         use crate::server::Handshaking::*;
1230 
1231         self.state = if let Flushing(ref mut flush) = self.state {
1232             // We're currently flushing a pending SETTINGS frame. Poll the
1233             // flush future, and, if it's completed, advance our state to wait
1234             // for the client preface.
1235             let codec = match Pin::new(flush).poll(cx)? {
1236                 Poll::Pending => {
1237                     tracing::trace!(flush.poll = %"Pending");
1238                     return Poll::Pending;
1239                 }
1240                 Poll::Ready(flushed) => {
1241                     tracing::trace!(flush.poll = %"Ready");
1242                     flushed
1243                 }
1244             };
1245             Handshaking::from(ReadPreface::new(codec))
1246         } else {
1247             // Otherwise, we haven't actually advanced the state, but we have
1248             // to replace it with itself, because we have to return a value.
1249             // (note that the assignment to `self.state` has to be outside of
1250             // the `if let` block above in order to placate the borrow checker).
1251             mem::replace(&mut self.state, Handshaking::Empty)
1252         };
1253         let poll = if let ReadingPreface(ref mut read) = self.state {
1254             // We're now waiting for the client preface. Poll the `ReadPreface`
1255             // future. If it has completed, we will create a `Connection` handle
1256             // for the connection.
1257             Pin::new(read).poll(cx)
1258         // Actually creating the `Connection` has to occur outside of this
1259         // `if let` block, because we've borrowed `self` mutably in order
1260         // to poll the state and won't be able to borrow the SETTINGS frame
1261         // as well until we release the borrow for `poll()`.
1262         } else {
1263             unreachable!("Handshake::poll() state was not advanced completely!")
1264         };
1265         poll?.map(|codec| {
1266             let connection = proto::Connection::new(
1267                 codec,
1268                 Config {
1269                     next_stream_id: 2.into(),
1270                     // Server does not need to locally initiate any streams
1271                     initial_max_send_streams: 0,
1272                     reset_stream_duration: self.builder.reset_stream_duration,
1273                     reset_stream_max: self.builder.reset_stream_max,
1274                     settings: self.builder.settings.clone(),
1275                 },
1276             );
1277 
1278             tracing::trace!("connection established!");
1279             let mut c = Connection { connection };
1280             if let Some(sz) = self.builder.initial_target_connection_window_size {
1281                 c.set_target_window_size(sz);
1282             }
1283             Ok(c)
1284         })
1285     }
1286 }
1287 
1288 impl<T, B> fmt::Debug for Handshake<T, B>
1289 where
1290     T: AsyncRead + AsyncWrite + fmt::Debug,
1291     B: fmt::Debug + Buf,
1292 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1293     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1294         write!(fmt, "server::Handshake")
1295     }
1296 }
1297 
1298 impl Peer {
convert_send_message( id: StreamId, response: Response<()>, end_of_stream: bool, ) -> frame::Headers1299     pub fn convert_send_message(
1300         id: StreamId,
1301         response: Response<()>,
1302         end_of_stream: bool,
1303     ) -> frame::Headers {
1304         use http::response::Parts;
1305 
1306         // Extract the components of the HTTP request
1307         let (
1308             Parts {
1309                 status, headers, ..
1310             },
1311             _,
1312         ) = response.into_parts();
1313 
1314         // Build the set pseudo header set. All requests will include `method`
1315         // and `path`.
1316         let pseudo = Pseudo::response(status);
1317 
1318         // Create the HEADERS frame
1319         let mut frame = frame::Headers::new(id, pseudo, headers);
1320 
1321         if end_of_stream {
1322             frame.set_end_stream()
1323         }
1324 
1325         frame
1326     }
1327 
convert_push_message( stream_id: StreamId, promised_id: StreamId, request: Request<()>, ) -> Result<frame::PushPromise, UserError>1328     pub fn convert_push_message(
1329         stream_id: StreamId,
1330         promised_id: StreamId,
1331         request: Request<()>,
1332     ) -> Result<frame::PushPromise, UserError> {
1333         use http::request::Parts;
1334 
1335         if let Err(e) = frame::PushPromise::validate_request(&request) {
1336             use PushPromiseHeaderError::*;
1337             match e {
1338                 NotSafeAndCacheable => tracing::debug!(
1339                     ?promised_id,
1340                     "convert_push_message: method {} is not safe and cacheable",
1341                     request.method(),
1342                 ),
1343                 InvalidContentLength(e) => tracing::debug!(
1344                     ?promised_id,
1345                     "convert_push_message; promised request has invalid content-length {:?}",
1346                     e,
1347                 ),
1348             }
1349             return Err(UserError::MalformedHeaders);
1350         }
1351 
1352         // Extract the components of the HTTP request
1353         let (
1354             Parts {
1355                 method,
1356                 uri,
1357                 headers,
1358                 ..
1359             },
1360             _,
1361         ) = request.into_parts();
1362 
1363         let pseudo = Pseudo::request(method, uri);
1364 
1365         Ok(frame::PushPromise::new(
1366             stream_id,
1367             promised_id,
1368             pseudo,
1369             headers,
1370         ))
1371     }
1372 }
1373 
1374 impl proto::Peer for Peer {
1375     type Poll = Request<()>;
1376 
1377     const NAME: &'static str = "Server";
1378 
is_server() -> bool1379     fn is_server() -> bool {
1380         true
1381     }
1382 
1383     fn r#dyn() -> proto::DynPeer {
1384         proto::DynPeer::Server
1385     }
1386 
convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result<Self::Poll, Error>1387     fn convert_poll_message(
1388         pseudo: Pseudo,
1389         fields: HeaderMap,
1390         stream_id: StreamId,
1391     ) -> Result<Self::Poll, Error> {
1392         use http::{uri, Version};
1393 
1394         let mut b = Request::builder();
1395 
1396         macro_rules! malformed {
1397             ($($arg:tt)*) => {{
1398                 tracing::debug!($($arg)*);
1399                 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1400             }}
1401         }
1402 
1403         b = b.version(Version::HTTP_2);
1404 
1405         let is_connect;
1406         if let Some(method) = pseudo.method {
1407             is_connect = method == Method::CONNECT;
1408             b = b.method(method);
1409         } else {
1410             malformed!("malformed headers: missing method");
1411         }
1412 
1413         // Specifying :status for a request is a protocol error
1414         if pseudo.status.is_some() {
1415             tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
1416             return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
1417         }
1418 
1419         // Convert the URI
1420         let mut parts = uri::Parts::default();
1421 
1422         // A request translated from HTTP/1 must not include the :authority
1423         // header
1424         if let Some(authority) = pseudo.authority {
1425             let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1426             parts.authority = Some(maybe_authority.or_else(|why| {
1427                 malformed!(
1428                     "malformed headers: malformed authority ({:?}): {}",
1429                     authority,
1430                     why,
1431                 )
1432             })?);
1433         }
1434 
1435         // A :scheme is required, except CONNECT.
1436         if let Some(scheme) = pseudo.scheme {
1437             if is_connect {
1438                 malformed!(":scheme in CONNECT");
1439             }
1440             let maybe_scheme = scheme.parse();
1441             let scheme = maybe_scheme.or_else(|why| {
1442                 malformed!(
1443                     "malformed headers: malformed scheme ({:?}): {}",
1444                     scheme,
1445                     why,
1446                 )
1447             })?;
1448 
1449             // It's not possible to build an `Uri` from a scheme and path. So,
1450             // after validating is was a valid scheme, we just have to drop it
1451             // if there isn't an :authority.
1452             if parts.authority.is_some() {
1453                 parts.scheme = Some(scheme);
1454             }
1455         } else if !is_connect {
1456             malformed!("malformed headers: missing scheme");
1457         }
1458 
1459         if let Some(path) = pseudo.path {
1460             if is_connect {
1461                 malformed!(":path in CONNECT");
1462             }
1463 
1464             // This cannot be empty
1465             if path.is_empty() {
1466                 malformed!("malformed headers: missing path");
1467             }
1468 
1469             let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1470             parts.path_and_query = Some(maybe_path.or_else(|why| {
1471                 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1472             })?);
1473         }
1474 
1475         b = b.uri(parts);
1476 
1477         let mut request = match b.body(()) {
1478             Ok(request) => request,
1479             Err(e) => {
1480                 // TODO: Should there be more specialized handling for different
1481                 // kinds of errors
1482                 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1483                 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1484             }
1485         };
1486 
1487         *request.headers_mut() = fields;
1488 
1489         Ok(request)
1490     }
1491 }
1492 
1493 // ===== impl Handshaking =====
1494 
1495 impl<T, B> fmt::Debug for Handshaking<T, B>
1496 where
1497     B: Buf,
1498 {
1499     #[inline]
fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error>1500     fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1501         match *self {
1502             Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
1503             Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
1504             Handshaking::Empty => write!(f, "Handshaking::Empty"),
1505         }
1506     }
1507 }
1508 
1509 impl<T, B> convert::From<Flush<T, Prioritized<B>>> for Handshaking<T, B>
1510 where
1511     T: AsyncRead + AsyncWrite,
1512     B: Buf,
1513 {
1514     #[inline]
from(flush: Flush<T, Prioritized<B>>) -> Self1515     fn from(flush: Flush<T, Prioritized<B>>) -> Self {
1516         Handshaking::Flushing(flush.instrument(tracing::trace_span!("flush")))
1517     }
1518 }
1519 
1520 impl<T, B> convert::From<ReadPreface<T, Prioritized<B>>> for Handshaking<T, B>
1521 where
1522     T: AsyncRead + AsyncWrite,
1523     B: Buf,
1524 {
1525     #[inline]
from(read: ReadPreface<T, Prioritized<B>>) -> Self1526     fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
1527         Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface")))
1528     }
1529 }
1530 
1531 impl<T, B> convert::From<Codec<T, Prioritized<B>>> for Handshaking<T, B>
1532 where
1533     T: AsyncRead + AsyncWrite,
1534     B: Buf,
1535 {
1536     #[inline]
from(codec: Codec<T, Prioritized<B>>) -> Self1537     fn from(codec: Codec<T, Prioritized<B>>) -> Self {
1538         Handshaking::from(Flush::new(codec))
1539     }
1540 }
1541