1 //! Server implementation of the HTTP/2.0 protocol.
2 //!
3 //! # Getting started
4 //!
5 //! Running an HTTP/2.0 server requires the caller to manage accepting the
6 //! connections as well as getting the connections to a state that is ready to
7 //! begin the HTTP/2.0 handshake. See [here](../index.html#handshake) for more
8 //! details.
9 //!
10 //! This could be as basic as using Tokio's [`TcpListener`] to accept
11 //! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12 //! upgrades.
13 //!
14 //! Once a connection is obtained, it is passed to [`handshake`],
15 //! which will begin the [HTTP/2.0 handshake]. This returns a future that
16 //! completes once the handshake process is performed and HTTP/2.0 streams may
17 //! be received.
18 //!
19 //! [`handshake`] uses default configuration values. There are a number of
20 //! settings that can be changed by using [`Builder`] instead.
21 //!
22 //! # Inbound streams
23 //!
24 //! The [`Connection`] instance is used to accept inbound HTTP/2.0 streams. It
25 //! does this by implementing [`futures::Stream`]. When a new stream is
26 //! received, a call to [`Connection::accept`] will return `(request, response)`.
27 //! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
28 //! HTTP request head as well as provides a way to receive the inbound data
29 //! stream and the trailers. The `response` handle (of type [`SendResponse`])
30 //! allows responding to the request, stream the response payload, send
31 //! trailers, and send push promises.
32 //!
33 //! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34 //! can be operated independently.
35 //!
36 //! # Managing the connection
37 //!
38 //! The [`Connection`] instance is used to manage connection state. The caller
39 //! is required to call either [`Connection::accept`] or
40 //! [`Connection::poll_close`] in order to advance the connection state. Simply
41 //! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
42 //! connection state is advanced.
43 //!
44 //! It is not required to call **both** [`Connection::accept`] and
45 //! [`Connection::poll_close`]. If the caller is ready to accept a new stream,
46 //! then only [`Connection::accept`] should be called. When the caller **does
47 //! not** want to accept a new stream, [`Connection::poll_close`] should be
48 //! called.
49 //!
50 //! The [`Connection`] instance should only be dropped once
51 //! [`Connection::poll_close`] returns `Ready`. Once [`Connection::accept`]
52 //! returns `Ready(None)`, there will no longer be any more inbound streams. At
53 //! this point, only [`Connection::poll_close`] should be called.
54 //!
55 //! # Shutting down the server
56 //!
57 //! Graceful shutdown of the server is [not yet
58 //! implemented](https://github.com/hyperium/h2/issues/69).
59 //!
60 //! # Example
61 //!
62 //! A basic HTTP/2.0 server example that runs over TCP and assumes [prior
63 //! knowledge], i.e. both the client and the server assume that the TCP socket
64 //! will use the HTTP/2.0 protocol without prior negotiation.
65 //!
66 //! ```no_run
67 //! use h2::server;
68 //! use http::{Response, StatusCode};
69 //! use tokio::net::TcpListener;
70 //!
71 //! #[tokio::main]
72 //! pub async fn main() {
73 //! let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74 //!
75 //! // Accept all incoming TCP connections.
76 //! loop {
77 //! if let Ok((socket, _peer_addr)) = listener.accept().await {
78 //! // Spawn a new task to process each connection.
79 //! tokio::spawn(async {
80 //! // Start the HTTP/2.0 connection handshake
81 //! let mut h2 = server::handshake(socket).await.unwrap();
82 //! // Accept all inbound HTTP/2.0 streams sent over the
83 //! // connection.
84 //! while let Some(request) = h2.accept().await {
85 //! let (request, mut respond) = request.unwrap();
86 //! println!("Received request: {:?}", request);
87 //!
88 //! // Build a response with no body
89 //! let response = Response::builder()
90 //! .status(StatusCode::OK)
91 //! .body(())
92 //! .unwrap();
93 //!
94 //! // Send the response back to the client
95 //! respond.send_response(response, true)
96 //! .unwrap();
97 //! }
98 //!
99 //! });
100 //! }
101 //! }
102 //! }
103 //! ```
104 //!
105 //! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106 //! [`handshake`]: fn.handshake.html
107 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108 //! [`Builder`]: struct.Builder.html
109 //! [`Connection`]: struct.Connection.html
110 //! [`Connection::poll`]: struct.Connection.html#method.poll
111 //! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
112 //! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
113 //! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114 //! [`RecvStream`]: ../struct.RecvStream.html
115 //! [`SendStream`]: ../struct.SendStream.html
116 //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
117
118 use crate::codec::{Codec, UserError};
119 use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120 use crate::proto::{self, Config, Error, Prioritized};
121 use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123 use bytes::{Buf, Bytes};
124 use http::{HeaderMap, Method, Request, Response};
125 use std::future::Future;
126 use std::pin::Pin;
127 use std::task::{Context, Poll};
128 use std::time::Duration;
129 use std::{convert, fmt, io, mem};
130 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131 use tracing::instrument::{Instrument, Instrumented};
132
133 /// In progress HTTP/2.0 connection handshake future.
134 ///
135 /// This type implements `Future`, yielding a `Connection` instance once the
136 /// handshake has completed.
137 ///
138 /// The handshake is completed once the connection preface is fully received
139 /// from the client **and** the initial settings frame is sent to the client.
140 ///
141 /// The handshake future does not wait for the initial settings frame from the
142 /// client.
143 ///
144 /// See [module] level docs for more details.
145 ///
146 /// [module]: index.html
147 #[must_use = "futures do nothing unless polled"]
148 pub struct Handshake<T, B: Buf = Bytes> {
149 /// The config to pass to Connection::new after handshake succeeds.
150 builder: Builder,
151 /// The current state of the handshake.
152 state: Handshaking<T, B>,
153 /// Span tracking the handshake
154 span: tracing::Span,
155 }
156
157 /// Accepts inbound HTTP/2.0 streams on a connection.
158 ///
159 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and
160 /// implements the HTTP/2.0 server logic for that connection. It is responsible
161 /// for receiving inbound streams initiated by the client as well as driving the
162 /// internal state forward.
163 ///
164 /// `Connection` values are created by calling [`handshake`]. Once a
165 /// `Connection` value is obtained, the caller must call [`poll`] or
166 /// [`poll_close`] in order to drive the internal connection state forward.
167 ///
168 /// See [module level] documentation for more details
169 ///
170 /// [module level]: index.html
171 /// [`handshake`]: struct.Connection.html#method.handshake
172 /// [`poll`]: struct.Connection.html#method.poll
173 /// [`poll_close`]: struct.Connection.html#method.poll_close
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// # use tokio::io::{AsyncRead, AsyncWrite};
179 /// # use h2::server;
180 /// # use h2::server::*;
181 /// #
182 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
183 /// let mut server = server::handshake(my_io).await.unwrap();
184 /// while let Some(request) = server.accept().await {
185 /// tokio::spawn(async move {
186 /// let (request, respond) = request.unwrap();
187 /// // Process the request and send the response back to the client
188 /// // using `respond`.
189 /// });
190 /// }
191 /// # }
192 /// #
193 /// # pub fn main() {}
194 /// ```
195 #[must_use = "streams do nothing unless polled"]
196 pub struct Connection<T, B: Buf> {
197 connection: proto::Connection<T, Peer, B>,
198 }
199
200 /// Builds server connections with custom configuration values.
201 ///
202 /// Methods can be chained in order to set the configuration values.
203 ///
204 /// The server is constructed by calling [`handshake`] and passing the I/O
205 /// handle that will back the HTTP/2.0 server.
206 ///
207 /// New instances of `Builder` are obtained via [`Builder::new`].
208 ///
209 /// See function level documentation for details on the various server
210 /// configuration settings.
211 ///
212 /// [`Builder::new`]: struct.Builder.html#method.new
213 /// [`handshake`]: struct.Builder.html#method.handshake
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// # use tokio::io::{AsyncRead, AsyncWrite};
219 /// # use h2::server::*;
220 /// #
221 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
222 /// # -> Handshake<T>
223 /// # {
224 /// // `server_fut` is a future representing the completion of the HTTP/2.0
225 /// // handshake.
226 /// let server_fut = Builder::new()
227 /// .initial_window_size(1_000_000)
228 /// .max_concurrent_streams(1000)
229 /// .handshake(my_io);
230 /// # server_fut
231 /// # }
232 /// #
233 /// # pub fn main() {}
234 /// ```
235 #[derive(Clone, Debug)]
236 pub struct Builder {
237 /// Time to keep locally reset streams around before reaping.
238 reset_stream_duration: Duration,
239
240 /// Maximum number of locally reset streams to keep at a time.
241 reset_stream_max: usize,
242
243 /// Initial `Settings` frame to send as part of the handshake.
244 settings: Settings,
245
246 /// Initial target window size for new connections.
247 initial_target_connection_window_size: Option<u32>,
248 }
249
250 /// Send a response back to the client
251 ///
252 /// A `SendResponse` instance is provided when receiving a request and is used
253 /// to send the associated response back to the client. It is also used to
254 /// explicitly reset the stream with a custom reason.
255 ///
256 /// It will also be used to initiate push promises linked with the associated
257 /// stream.
258 ///
259 /// If the `SendResponse` instance is dropped without sending a response, then
260 /// the HTTP/2.0 stream will be reset.
261 ///
262 /// See [module] level docs for more details.
263 ///
264 /// [module]: index.html
265 #[derive(Debug)]
266 pub struct SendResponse<B: Buf> {
267 inner: proto::StreamRef<B>,
268 }
269
270 /// Send a response to a promised request
271 ///
272 /// A `SendPushedResponse` instance is provided when promising a request and is used
273 /// to send the associated response to the client. It is also used to
274 /// explicitly reset the stream with a custom reason.
275 ///
276 /// It can not be used to initiate push promises.
277 ///
278 /// If the `SendPushedResponse` instance is dropped without sending a response, then
279 /// the HTTP/2.0 stream will be reset.
280 ///
281 /// See [module] level docs for more details.
282 ///
283 /// [module]: index.html
284 pub struct SendPushedResponse<B: Buf> {
285 inner: SendResponse<B>,
286 }
287
288 // Manual implementation necessary because of rust-lang/rust#26925
289 impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result290 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291 write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
292 }
293 }
294
295 /// Stages of an in-progress handshake.
296 enum Handshaking<T, B: Buf> {
297 /// State 1. Connection is flushing pending SETTINGS frame.
298 Flushing(Instrumented<Flush<T, Prioritized<B>>>),
299 /// State 2. Connection is waiting for the client preface.
300 ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
301 /// Dummy state for `mem::replace`.
302 Empty,
303 }
304
305 /// Flush a Sink
306 struct Flush<T, B> {
307 codec: Option<Codec<T, B>>,
308 }
309
310 /// Read the client connection preface
311 struct ReadPreface<T, B> {
312 codec: Option<Codec<T, B>>,
313 pos: usize,
314 }
315
316 #[derive(Debug)]
317 pub(crate) struct Peer;
318
319 const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
320
321 /// Creates a new configured HTTP/2.0 server with default configuration
322 /// values backed by `io`.
323 ///
324 /// It is expected that `io` already be in an appropriate state to commence
325 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
326 ///
327 /// Returns a future which resolves to the [`Connection`] instance once the
328 /// HTTP/2.0 handshake has been completed. The returned [`Connection`]
329 /// instance will be using default configuration values. Use [`Builder`] to
330 /// customize the configuration values used by a [`Connection`] instance.
331 ///
332 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
333 /// [Handshake]: ../index.html#handshake
334 /// [`Connection`]: struct.Connection.html
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// # use tokio::io::{AsyncRead, AsyncWrite};
340 /// # use h2::server;
341 /// # use h2::server::*;
342 /// #
343 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
344 /// # {
345 /// let connection = server::handshake(my_io).await.unwrap();
346 /// // The HTTP/2.0 handshake has completed, now use `connection` to
347 /// // accept inbound HTTP/2.0 streams.
348 /// # }
349 /// #
350 /// # pub fn main() {}
351 /// ```
handshake<T>(io: T) -> Handshake<T, Bytes> where T: AsyncRead + AsyncWrite + Unpin,352 pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
353 where
354 T: AsyncRead + AsyncWrite + Unpin,
355 {
356 Builder::new().handshake(io)
357 }
358
359 // ===== impl Connection =====
360
361 impl<T, B> Connection<T, B>
362 where
363 T: AsyncRead + AsyncWrite + Unpin,
364 B: Buf + 'static,
365 {
handshake2(io: T, builder: Builder) -> Handshake<T, B>366 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
367 let span = tracing::trace_span!("server_handshake", io = %std::any::type_name::<T>());
368 let entered = span.enter();
369
370 // Create the codec.
371 let mut codec = Codec::new(io);
372
373 if let Some(max) = builder.settings.max_frame_size() {
374 codec.set_max_recv_frame_size(max as usize);
375 }
376
377 if let Some(max) = builder.settings.max_header_list_size() {
378 codec.set_max_recv_header_list_size(max as usize);
379 }
380
381 // Send initial settings frame.
382 codec
383 .buffer(builder.settings.clone().into())
384 .expect("invalid SETTINGS frame");
385
386 // Create the handshake future.
387 let state = Handshaking::from(codec);
388
389 drop(entered);
390
391 Handshake {
392 builder,
393 state,
394 span,
395 }
396 }
397
398 /// Accept the next incoming request on this connection.
accept( &mut self, ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>399 pub async fn accept(
400 &mut self,
401 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
402 futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
403 }
404
405 #[doc(hidden)]
poll_accept( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>>406 pub fn poll_accept(
407 &mut self,
408 cx: &mut Context<'_>,
409 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
410 // Always try to advance the internal state. Getting Pending also is
411 // needed to allow this function to return Pending.
412 if let Poll::Ready(_) = self.poll_closed(cx)? {
413 // If the socket is closed, don't return anything
414 // TODO: drop any pending streams
415 return Poll::Ready(None);
416 }
417
418 if let Some(inner) = self.connection.next_incoming() {
419 tracing::trace!("received incoming");
420 let (head, _) = inner.take_request().into_parts();
421 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
422
423 let request = Request::from_parts(head, body);
424 let respond = SendResponse { inner };
425
426 return Poll::Ready(Some(Ok((request, respond))));
427 }
428
429 Poll::Pending
430 }
431
432 /// Sets the target window size for the whole connection.
433 ///
434 /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
435 /// frame will be immediately sent to the remote, increasing the connection
436 /// level window by `size - current_value`.
437 ///
438 /// If `size` is less than the current value, nothing will happen
439 /// immediately. However, as window capacity is released by
440 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
441 /// out until the number of "in flight" bytes drops below `size`.
442 ///
443 /// The default value is 65,535.
444 ///
445 /// See [`FlowControl`] documentation for more details.
446 ///
447 /// [`FlowControl`]: ../struct.FlowControl.html
448 /// [library level]: ../index.html#flow-control
set_target_window_size(&mut self, size: u32)449 pub fn set_target_window_size(&mut self, size: u32) {
450 assert!(size <= proto::MAX_WINDOW_SIZE);
451 self.connection.set_target_window_size(size);
452 }
453
454 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
455 /// flow control for received data.
456 ///
457 /// The `SETTINGS` will be sent to the remote, and only applied once the
458 /// remote acknowledges the change.
459 ///
460 /// This can be used to increase or decrease the window size for existing
461 /// streams.
462 ///
463 /// # Errors
464 ///
465 /// Returns an error if a previous call is still pending acknowledgement
466 /// from the remote endpoint.
set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error>467 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
468 assert!(size <= proto::MAX_WINDOW_SIZE);
469 self.connection.set_initial_window_size(size)?;
470 Ok(())
471 }
472
473 /// Returns `Ready` when the underlying connection has closed.
474 ///
475 /// If any new inbound streams are received during a call to `poll_closed`,
476 /// they will be queued and returned on the next call to [`poll_accept`].
477 ///
478 /// This function will advance the internal connection state, driving
479 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
480 ///
481 /// See [here](index.html#managing-the-connection) for more details.
482 ///
483 /// [`poll_accept`]: struct.Connection.html#method.poll_accept
484 /// [`RecvStream`]: ../struct.RecvStream.html
485 /// [`SendStream`]: ../struct.SendStream.html
poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>486 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
487 self.connection.poll(cx).map_err(Into::into)
488 }
489
490 #[doc(hidden)]
491 #[deprecated(note = "renamed to poll_closed")]
poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>>492 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
493 self.poll_closed(cx)
494 }
495
496 /// Sets the connection to a GOAWAY state.
497 ///
498 /// Does not terminate the connection. Must continue being polled to close
499 /// connection.
500 ///
501 /// After flushing the GOAWAY frame, the connection is closed. Any
502 /// outstanding streams do not prevent the connection from closing. This
503 /// should usually be reserved for shutting down when something bad
504 /// external to `h2` has happened, and open streams cannot be properly
505 /// handled.
506 ///
507 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
abrupt_shutdown(&mut self, reason: Reason)508 pub fn abrupt_shutdown(&mut self, reason: Reason) {
509 self.connection.go_away_from_user(reason);
510 }
511
512 /// Starts a [graceful shutdown][1] process.
513 ///
514 /// Must continue being polled to close connection.
515 ///
516 /// It's possible to receive more requests after calling this method, since
517 /// they might have been in-flight from the client already. After about
518 /// 1 RTT, no new requests should be accepted. Once all active streams
519 /// have completed, the connection is closed.
520 ///
521 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
graceful_shutdown(&mut self)522 pub fn graceful_shutdown(&mut self) {
523 self.connection.go_away_gracefully();
524 }
525
526 /// Takes a `PingPong` instance from the connection.
527 ///
528 /// # Note
529 ///
530 /// This may only be called once. Calling multiple times will return `None`.
ping_pong(&mut self) -> Option<PingPong>531 pub fn ping_pong(&mut self) -> Option<PingPong> {
532 self.connection.take_user_pings().map(PingPong::new)
533 }
534
535 /// Returns the maximum number of concurrent streams that may be initiated
536 /// by the server on this connection.
537 ///
538 /// This limit is configured by the client peer by sending the
539 /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
540 /// This method returns the currently acknowledged value recieved from the
541 /// remote.
542 ///
543 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
max_concurrent_send_streams(&self) -> usize544 pub fn max_concurrent_send_streams(&self) -> usize {
545 self.connection.max_send_streams()
546 }
547
548 /// Returns the maximum number of concurrent streams that may be initiated
549 /// by the client on this connection.
550 ///
551 /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
552 /// parameter][1] sent in a `SETTINGS` frame that has been
553 /// acknowledged by the remote peer. The value to be sent is configured by
554 /// the [`Builder::max_concurrent_streams`][2] method before handshaking
555 /// with the remote peer.
556 ///
557 /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
558 /// [2]: ../struct.Builder.html#method.max_concurrent_streams
max_concurrent_recv_streams(&self) -> usize559 pub fn max_concurrent_recv_streams(&self) -> usize {
560 self.connection.max_recv_streams()
561 }
562 }
563
564 #[cfg(feature = "stream")]
565 impl<T, B> futures_core::Stream for Connection<T, B>
566 where
567 T: AsyncRead + AsyncWrite + Unpin,
568 B: Buf + 'static,
569 {
570 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
571
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>572 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
573 self.poll_accept(cx)
574 }
575 }
576
577 impl<T, B> fmt::Debug for Connection<T, B>
578 where
579 T: fmt::Debug,
580 B: fmt::Debug + Buf,
581 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result582 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
583 fmt.debug_struct("Connection")
584 .field("connection", &self.connection)
585 .finish()
586 }
587 }
588
589 // ===== impl Builder =====
590
591 impl Builder {
592 /// Returns a new server builder instance initialized with default
593 /// configuration values.
594 ///
595 /// Configuration methods can be chained on the return value.
596 ///
597 /// # Examples
598 ///
599 /// ```
600 /// # use tokio::io::{AsyncRead, AsyncWrite};
601 /// # use h2::server::*;
602 /// #
603 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
604 /// # -> Handshake<T>
605 /// # {
606 /// // `server_fut` is a future representing the completion of the HTTP/2.0
607 /// // handshake.
608 /// let server_fut = Builder::new()
609 /// .initial_window_size(1_000_000)
610 /// .max_concurrent_streams(1000)
611 /// .handshake(my_io);
612 /// # server_fut
613 /// # }
614 /// #
615 /// # pub fn main() {}
616 /// ```
new() -> Builder617 pub fn new() -> Builder {
618 Builder {
619 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
620 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
621 settings: Settings::default(),
622 initial_target_connection_window_size: None,
623 }
624 }
625
626 /// Indicates the initial window size (in octets) for stream-level
627 /// flow control for received data.
628 ///
629 /// The initial window of a stream is used as part of flow control. For more
630 /// details, see [`FlowControl`].
631 ///
632 /// The default value is 65,535.
633 ///
634 /// [`FlowControl`]: ../struct.FlowControl.html
635 ///
636 /// # Examples
637 ///
638 /// ```
639 /// # use tokio::io::{AsyncRead, AsyncWrite};
640 /// # use h2::server::*;
641 /// #
642 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
643 /// # -> Handshake<T>
644 /// # {
645 /// // `server_fut` is a future representing the completion of the HTTP/2.0
646 /// // handshake.
647 /// let server_fut = Builder::new()
648 /// .initial_window_size(1_000_000)
649 /// .handshake(my_io);
650 /// # server_fut
651 /// # }
652 /// #
653 /// # pub fn main() {}
654 /// ```
initial_window_size(&mut self, size: u32) -> &mut Self655 pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
656 self.settings.set_initial_window_size(Some(size));
657 self
658 }
659
660 /// Indicates the initial window size (in octets) for connection-level flow control
661 /// for received data.
662 ///
663 /// The initial window of a connection is used as part of flow control. For more details,
664 /// see [`FlowControl`].
665 ///
666 /// The default value is 65,535.
667 ///
668 /// [`FlowControl`]: ../struct.FlowControl.html
669 ///
670 /// # Examples
671 ///
672 /// ```
673 /// # use tokio::io::{AsyncRead, AsyncWrite};
674 /// # use h2::server::*;
675 /// #
676 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
677 /// # -> Handshake<T>
678 /// # {
679 /// // `server_fut` is a future representing the completion of the HTTP/2.0
680 /// // handshake.
681 /// let server_fut = Builder::new()
682 /// .initial_connection_window_size(1_000_000)
683 /// .handshake(my_io);
684 /// # server_fut
685 /// # }
686 /// #
687 /// # pub fn main() {}
688 /// ```
initial_connection_window_size(&mut self, size: u32) -> &mut Self689 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
690 self.initial_target_connection_window_size = Some(size);
691 self
692 }
693
694 /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the
695 /// configured server is able to accept.
696 ///
697 /// The sender may send data frames that are **smaller** than this value,
698 /// but any data larger than `max` will be broken up into multiple `DATA`
699 /// frames.
700 ///
701 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
702 ///
703 /// # Examples
704 ///
705 /// ```
706 /// # use tokio::io::{AsyncRead, AsyncWrite};
707 /// # use h2::server::*;
708 /// #
709 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
710 /// # -> Handshake<T>
711 /// # {
712 /// // `server_fut` is a future representing the completion of the HTTP/2.0
713 /// // handshake.
714 /// let server_fut = Builder::new()
715 /// .max_frame_size(1_000_000)
716 /// .handshake(my_io);
717 /// # server_fut
718 /// # }
719 /// #
720 /// # pub fn main() {}
721 /// ```
722 ///
723 /// # Panics
724 ///
725 /// This function panics if `max` is not within the legal range specified
726 /// above.
max_frame_size(&mut self, max: u32) -> &mut Self727 pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
728 self.settings.set_max_frame_size(Some(max));
729 self
730 }
731
732 /// Sets the max size of received header frames.
733 ///
734 /// This advisory setting informs a peer of the maximum size of header list
735 /// that the sender is prepared to accept, in octets. The value is based on
736 /// the uncompressed size of header fields, including the length of the name
737 /// and value in octets plus an overhead of 32 octets for each header field.
738 ///
739 /// This setting is also used to limit the maximum amount of data that is
740 /// buffered to decode HEADERS frames.
741 ///
742 /// # Examples
743 ///
744 /// ```
745 /// # use tokio::io::{AsyncRead, AsyncWrite};
746 /// # use h2::server::*;
747 /// #
748 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
749 /// # -> Handshake<T>
750 /// # {
751 /// // `server_fut` is a future representing the completion of the HTTP/2.0
752 /// // handshake.
753 /// let server_fut = Builder::new()
754 /// .max_header_list_size(16 * 1024)
755 /// .handshake(my_io);
756 /// # server_fut
757 /// # }
758 /// #
759 /// # pub fn main() {}
760 /// ```
max_header_list_size(&mut self, max: u32) -> &mut Self761 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
762 self.settings.set_max_header_list_size(Some(max));
763 self
764 }
765
766 /// Sets the maximum number of concurrent streams.
767 ///
768 /// The maximum concurrent streams setting only controls the maximum number
769 /// of streams that can be initiated by the remote peer. In other words,
770 /// when this setting is set to 100, this does not limit the number of
771 /// concurrent streams that can be created by the caller.
772 ///
773 /// It is recommended that this value be no smaller than 100, so as to not
774 /// unnecessarily limit parallelism. However, any value is legal, including
775 /// 0. If `max` is set to 0, then the remote will not be permitted to
776 /// initiate streams.
777 ///
778 /// Note that streams in the reserved state, i.e., push promises that have
779 /// been reserved but the stream has not started, do not count against this
780 /// setting.
781 ///
782 /// Also note that if the remote *does* exceed the value set here, it is not
783 /// a protocol level error. Instead, the `h2` library will immediately reset
784 /// the stream.
785 ///
786 /// See [Section 5.1.2] in the HTTP/2.0 spec for more details.
787 ///
788 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
789 ///
790 /// # Examples
791 ///
792 /// ```
793 /// # use tokio::io::{AsyncRead, AsyncWrite};
794 /// # use h2::server::*;
795 /// #
796 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
797 /// # -> Handshake<T>
798 /// # {
799 /// // `server_fut` is a future representing the completion of the HTTP/2.0
800 /// // handshake.
801 /// let server_fut = Builder::new()
802 /// .max_concurrent_streams(1000)
803 /// .handshake(my_io);
804 /// # server_fut
805 /// # }
806 /// #
807 /// # pub fn main() {}
808 /// ```
max_concurrent_streams(&mut self, max: u32) -> &mut Self809 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
810 self.settings.set_max_concurrent_streams(Some(max));
811 self
812 }
813
814 /// Sets the maximum number of concurrent locally reset streams.
815 ///
816 /// When a stream is explicitly reset by either calling
817 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
818 /// before completing the stream, the HTTP/2.0 specification requires that
819 /// any further frames received for that stream must be ignored for "some
820 /// time".
821 ///
822 /// In order to satisfy the specification, internal state must be maintained
823 /// to implement the behavior. This state grows linearly with the number of
824 /// streams that are locally reset.
825 ///
826 /// The `max_concurrent_reset_streams` setting configures sets an upper
827 /// bound on the amount of state that is maintained. When this max value is
828 /// reached, the oldest reset stream is purged from memory.
829 ///
830 /// Once the stream has been fully purged from memory, any additional frames
831 /// received for that stream will result in a connection level protocol
832 /// error, forcing the connection to terminate.
833 ///
834 /// The default value is 10.
835 ///
836 /// # Examples
837 ///
838 /// ```
839 /// # use tokio::io::{AsyncRead, AsyncWrite};
840 /// # use h2::server::*;
841 /// #
842 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
843 /// # -> Handshake<T>
844 /// # {
845 /// // `server_fut` is a future representing the completion of the HTTP/2.0
846 /// // handshake.
847 /// let server_fut = Builder::new()
848 /// .max_concurrent_reset_streams(1000)
849 /// .handshake(my_io);
850 /// # server_fut
851 /// # }
852 /// #
853 /// # pub fn main() {}
854 /// ```
max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self855 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
856 self.reset_stream_max = max;
857 self
858 }
859
860 /// Sets the maximum number of concurrent locally reset streams.
861 ///
862 /// When a stream is explicitly reset by either calling
863 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
864 /// before completing the stream, the HTTP/2.0 specification requires that
865 /// any further frames received for that stream must be ignored for "some
866 /// time".
867 ///
868 /// In order to satisfy the specification, internal state must be maintained
869 /// to implement the behavior. This state grows linearly with the number of
870 /// streams that are locally reset.
871 ///
872 /// The `reset_stream_duration` setting configures the max amount of time
873 /// this state will be maintained in memory. Once the duration elapses, the
874 /// stream state is purged from memory.
875 ///
876 /// Once the stream has been fully purged from memory, any additional frames
877 /// received for that stream will result in a connection level protocol
878 /// error, forcing the connection to terminate.
879 ///
880 /// The default value is 30 seconds.
881 ///
882 /// # Examples
883 ///
884 /// ```
885 /// # use tokio::io::{AsyncRead, AsyncWrite};
886 /// # use h2::server::*;
887 /// # use std::time::Duration;
888 /// #
889 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
890 /// # -> Handshake<T>
891 /// # {
892 /// // `server_fut` is a future representing the completion of the HTTP/2.0
893 /// // handshake.
894 /// let server_fut = Builder::new()
895 /// .reset_stream_duration(Duration::from_secs(10))
896 /// .handshake(my_io);
897 /// # server_fut
898 /// # }
899 /// #
900 /// # pub fn main() {}
901 /// ```
reset_stream_duration(&mut self, dur: Duration) -> &mut Self902 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
903 self.reset_stream_duration = dur;
904 self
905 }
906
907 /// Creates a new configured HTTP/2.0 server backed by `io`.
908 ///
909 /// It is expected that `io` already be in an appropriate state to commence
910 /// the [HTTP/2.0 handshake]. See [Handshake] for more details.
911 ///
912 /// Returns a future which resolves to the [`Connection`] instance once the
913 /// HTTP/2.0 handshake has been completed.
914 ///
915 /// This function also allows the caller to configure the send payload data
916 /// type. See [Outbound data type] for more details.
917 ///
918 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
919 /// [Handshake]: ../index.html#handshake
920 /// [`Connection`]: struct.Connection.html
921 /// [Outbound data type]: ../index.html#outbound-data-type.
922 ///
923 /// # Examples
924 ///
925 /// Basic usage:
926 ///
927 /// ```
928 /// # use tokio::io::{AsyncRead, AsyncWrite};
929 /// # use h2::server::*;
930 /// #
931 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
932 /// # -> Handshake<T>
933 /// # {
934 /// // `server_fut` is a future representing the completion of the HTTP/2.0
935 /// // handshake.
936 /// let server_fut = Builder::new()
937 /// .handshake(my_io);
938 /// # server_fut
939 /// # }
940 /// #
941 /// # pub fn main() {}
942 /// ```
943 ///
944 /// Configures the send-payload data type. In this case, the outbound data
945 /// type will be `&'static [u8]`.
946 ///
947 /// ```
948 /// # use tokio::io::{AsyncRead, AsyncWrite};
949 /// # use h2::server::*;
950 /// #
951 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
952 /// # -> Handshake<T, &'static [u8]>
953 /// # {
954 /// // `server_fut` is a future representing the completion of the HTTP/2.0
955 /// // handshake.
956 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
957 /// .handshake(my_io);
958 /// # server_fut
959 /// # }
960 /// #
961 /// # pub fn main() {}
962 /// ```
handshake<T, B>(&self, io: T) -> Handshake<T, B> where T: AsyncRead + AsyncWrite + Unpin, B: Buf + 'static,963 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
964 where
965 T: AsyncRead + AsyncWrite + Unpin,
966 B: Buf + 'static,
967 {
968 Connection::handshake2(io, self.clone())
969 }
970 }
971
972 impl Default for Builder {
default() -> Builder973 fn default() -> Builder {
974 Builder::new()
975 }
976 }
977
978 // ===== impl SendResponse =====
979
980 impl<B: Buf> SendResponse<B> {
981 /// Send a response to a client request.
982 ///
983 /// On success, a [`SendStream`] instance is returned. This instance can be
984 /// used to stream the response body and send trailers.
985 ///
986 /// If a body or trailers will be sent on the returned [`SendStream`]
987 /// instance, then `end_of_stream` must be set to `false` when calling this
988 /// function.
989 ///
990 /// The [`SendResponse`] instance is already associated with a received
991 /// request. This function may only be called once per instance and only if
992 /// [`send_reset`] has not been previously called.
993 ///
994 /// [`SendResponse`]: #
995 /// [`SendStream`]: ../struct.SendStream.html
996 /// [`send_reset`]: #method.send_reset
send_response( &mut self, response: Response<()>, end_of_stream: bool, ) -> Result<SendStream<B>, crate::Error>997 pub fn send_response(
998 &mut self,
999 response: Response<()>,
1000 end_of_stream: bool,
1001 ) -> Result<SendStream<B>, crate::Error> {
1002 self.inner
1003 .send_response(response, end_of_stream)
1004 .map(|_| SendStream::new(self.inner.clone()))
1005 .map_err(Into::into)
1006 }
1007
1008 /// Push a request and response to the client
1009 ///
1010 /// On success, a [`SendResponse`] instance is returned.
1011 ///
1012 /// [`SendResponse`]: #
push_request( &mut self, request: Request<()>, ) -> Result<SendPushedResponse<B>, crate::Error>1013 pub fn push_request(
1014 &mut self,
1015 request: Request<()>,
1016 ) -> Result<SendPushedResponse<B>, crate::Error> {
1017 self.inner
1018 .send_push_promise(request)
1019 .map(|inner| SendPushedResponse {
1020 inner: SendResponse { inner },
1021 })
1022 .map_err(Into::into)
1023 }
1024
1025 /// Send a stream reset to the peer.
1026 ///
1027 /// This essentially cancels the stream, including any inbound or outbound
1028 /// data streams.
1029 ///
1030 /// If this function is called before [`send_response`], a call to
1031 /// [`send_response`] will result in an error.
1032 ///
1033 /// If this function is called while a [`SendStream`] instance is active,
1034 /// any further use of the instance will result in an error.
1035 ///
1036 /// This function should only be called once.
1037 ///
1038 /// [`send_response`]: #method.send_response
1039 /// [`SendStream`]: ../struct.SendStream.html
send_reset(&mut self, reason: Reason)1040 pub fn send_reset(&mut self, reason: Reason) {
1041 self.inner.send_reset(reason)
1042 }
1043
1044 /// Polls to be notified when the client resets this stream.
1045 ///
1046 /// If stream is still open, this returns `Poll::Pending`, and
1047 /// registers the task to be notified if a `RST_STREAM` is received.
1048 ///
1049 /// If a `RST_STREAM` frame is received for this stream, calling this
1050 /// method will yield the `Reason` for the reset.
1051 ///
1052 /// # Error
1053 ///
1054 /// Calling this method after having called `send_response` will return
1055 /// a user error.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>1056 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1057 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1058 }
1059
1060 /// Returns the stream ID of the response stream.
1061 ///
1062 /// # Panics
1063 ///
1064 /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1065 pub fn stream_id(&self) -> crate::StreamId {
1066 crate::StreamId::from_internal(self.inner.stream_id())
1067 }
1068 }
1069
1070 // ===== impl SendPushedResponse =====
1071
1072 impl<B: Buf> SendPushedResponse<B> {
1073 /// Send a response to a promised request.
1074 ///
1075 /// On success, a [`SendStream`] instance is returned. This instance can be
1076 /// used to stream the response body and send trailers.
1077 ///
1078 /// If a body or trailers will be sent on the returned [`SendStream`]
1079 /// instance, then `end_of_stream` must be set to `false` when calling this
1080 /// function.
1081 ///
1082 /// The [`SendPushedResponse`] instance is associated with a promised
1083 /// request. This function may only be called once per instance and only if
1084 /// [`send_reset`] has not been previously called.
1085 ///
1086 /// [`SendPushedResponse`]: #
1087 /// [`SendStream`]: ../struct.SendStream.html
1088 /// [`send_reset`]: #method.send_reset
send_response( &mut self, response: Response<()>, end_of_stream: bool, ) -> Result<SendStream<B>, crate::Error>1089 pub fn send_response(
1090 &mut self,
1091 response: Response<()>,
1092 end_of_stream: bool,
1093 ) -> Result<SendStream<B>, crate::Error> {
1094 self.inner.send_response(response, end_of_stream)
1095 }
1096
1097 /// Send a stream reset to the peer.
1098 ///
1099 /// This essentially cancels the stream, including any inbound or outbound
1100 /// data streams.
1101 ///
1102 /// If this function is called before [`send_response`], a call to
1103 /// [`send_response`] will result in an error.
1104 ///
1105 /// If this function is called while a [`SendStream`] instance is active,
1106 /// any further use of the instance will result in an error.
1107 ///
1108 /// This function should only be called once.
1109 ///
1110 /// [`send_response`]: #method.send_response
1111 /// [`SendStream`]: ../struct.SendStream.html
send_reset(&mut self, reason: Reason)1112 pub fn send_reset(&mut self, reason: Reason) {
1113 self.inner.send_reset(reason)
1114 }
1115
1116 /// Polls to be notified when the client resets this stream.
1117 ///
1118 /// If stream is still open, this returns `Poll::Pending`, and
1119 /// registers the task to be notified if a `RST_STREAM` is received.
1120 ///
1121 /// If a `RST_STREAM` frame is received for this stream, calling this
1122 /// method will yield the `Reason` for the reset.
1123 ///
1124 /// # Error
1125 ///
1126 /// Calling this method after having called `send_response` will return
1127 /// a user error.
poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>>1128 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1129 self.inner.poll_reset(cx)
1130 }
1131
1132 /// Returns the stream ID of the response stream.
1133 ///
1134 /// # Panics
1135 ///
1136 /// If the lock on the stream store has been poisoned.
stream_id(&self) -> crate::StreamId1137 pub fn stream_id(&self) -> crate::StreamId {
1138 self.inner.stream_id()
1139 }
1140 }
1141
1142 // ===== impl Flush =====
1143
1144 impl<T, B: Buf> Flush<T, B> {
new(codec: Codec<T, B>) -> Self1145 fn new(codec: Codec<T, B>) -> Self {
1146 Flush { codec: Some(codec) }
1147 }
1148 }
1149
1150 impl<T, B> Future for Flush<T, B>
1151 where
1152 T: AsyncWrite + Unpin,
1153 B: Buf,
1154 {
1155 type Output = Result<Codec<T, B>, crate::Error>;
1156
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1157 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1158 // Flush the codec
1159 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1160
1161 // Return the codec
1162 Poll::Ready(Ok(self.codec.take().unwrap()))
1163 }
1164 }
1165
1166 impl<T, B: Buf> ReadPreface<T, B> {
new(codec: Codec<T, B>) -> Self1167 fn new(codec: Codec<T, B>) -> Self {
1168 ReadPreface {
1169 codec: Some(codec),
1170 pos: 0,
1171 }
1172 }
1173
inner_mut(&mut self) -> &mut T1174 fn inner_mut(&mut self) -> &mut T {
1175 self.codec.as_mut().unwrap().get_mut()
1176 }
1177 }
1178
1179 impl<T, B> Future for ReadPreface<T, B>
1180 where
1181 T: AsyncRead + Unpin,
1182 B: Buf,
1183 {
1184 type Output = Result<Codec<T, B>, crate::Error>;
1185
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1186 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1187 let mut buf = [0; 24];
1188 let mut rem = PREFACE.len() - self.pos;
1189
1190 while rem > 0 {
1191 let mut buf = ReadBuf::new(&mut buf[..rem]);
1192 ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1193 .map_err(crate::Error::from_io)?;
1194 let n = buf.filled().len();
1195 if n == 0 {
1196 return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1197 io::ErrorKind::UnexpectedEof,
1198 "connection closed before reading preface",
1199 ))));
1200 }
1201
1202 if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1203 proto_err!(conn: "read_preface: invalid preface");
1204 // TODO: Should this just write the GO_AWAY frame directly?
1205 return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1206 }
1207
1208 self.pos += n;
1209 rem -= n; // TODO test
1210 }
1211
1212 Poll::Ready(Ok(self.codec.take().unwrap()))
1213 }
1214 }
1215
1216 // ===== impl Handshake =====
1217
1218 impl<T, B: Buf> Future for Handshake<T, B>
1219 where
1220 T: AsyncRead + AsyncWrite + Unpin,
1221 B: Buf + 'static,
1222 {
1223 type Output = Result<Connection<T, B>, crate::Error>;
1224
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1225 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1226 let span = self.span.clone(); // XXX(eliza): T_T
1227 let _e = span.enter();
1228 tracing::trace!(state = ?self.state);
1229 use crate::server::Handshaking::*;
1230
1231 self.state = if let Flushing(ref mut flush) = self.state {
1232 // We're currently flushing a pending SETTINGS frame. Poll the
1233 // flush future, and, if it's completed, advance our state to wait
1234 // for the client preface.
1235 let codec = match Pin::new(flush).poll(cx)? {
1236 Poll::Pending => {
1237 tracing::trace!(flush.poll = %"Pending");
1238 return Poll::Pending;
1239 }
1240 Poll::Ready(flushed) => {
1241 tracing::trace!(flush.poll = %"Ready");
1242 flushed
1243 }
1244 };
1245 Handshaking::from(ReadPreface::new(codec))
1246 } else {
1247 // Otherwise, we haven't actually advanced the state, but we have
1248 // to replace it with itself, because we have to return a value.
1249 // (note that the assignment to `self.state` has to be outside of
1250 // the `if let` block above in order to placate the borrow checker).
1251 mem::replace(&mut self.state, Handshaking::Empty)
1252 };
1253 let poll = if let ReadingPreface(ref mut read) = self.state {
1254 // We're now waiting for the client preface. Poll the `ReadPreface`
1255 // future. If it has completed, we will create a `Connection` handle
1256 // for the connection.
1257 Pin::new(read).poll(cx)
1258 // Actually creating the `Connection` has to occur outside of this
1259 // `if let` block, because we've borrowed `self` mutably in order
1260 // to poll the state and won't be able to borrow the SETTINGS frame
1261 // as well until we release the borrow for `poll()`.
1262 } else {
1263 unreachable!("Handshake::poll() state was not advanced completely!")
1264 };
1265 poll?.map(|codec| {
1266 let connection = proto::Connection::new(
1267 codec,
1268 Config {
1269 next_stream_id: 2.into(),
1270 // Server does not need to locally initiate any streams
1271 initial_max_send_streams: 0,
1272 reset_stream_duration: self.builder.reset_stream_duration,
1273 reset_stream_max: self.builder.reset_stream_max,
1274 settings: self.builder.settings.clone(),
1275 },
1276 );
1277
1278 tracing::trace!("connection established!");
1279 let mut c = Connection { connection };
1280 if let Some(sz) = self.builder.initial_target_connection_window_size {
1281 c.set_target_window_size(sz);
1282 }
1283 Ok(c)
1284 })
1285 }
1286 }
1287
1288 impl<T, B> fmt::Debug for Handshake<T, B>
1289 where
1290 T: AsyncRead + AsyncWrite + fmt::Debug,
1291 B: fmt::Debug + Buf,
1292 {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result1293 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1294 write!(fmt, "server::Handshake")
1295 }
1296 }
1297
1298 impl Peer {
convert_send_message( id: StreamId, response: Response<()>, end_of_stream: bool, ) -> frame::Headers1299 pub fn convert_send_message(
1300 id: StreamId,
1301 response: Response<()>,
1302 end_of_stream: bool,
1303 ) -> frame::Headers {
1304 use http::response::Parts;
1305
1306 // Extract the components of the HTTP request
1307 let (
1308 Parts {
1309 status, headers, ..
1310 },
1311 _,
1312 ) = response.into_parts();
1313
1314 // Build the set pseudo header set. All requests will include `method`
1315 // and `path`.
1316 let pseudo = Pseudo::response(status);
1317
1318 // Create the HEADERS frame
1319 let mut frame = frame::Headers::new(id, pseudo, headers);
1320
1321 if end_of_stream {
1322 frame.set_end_stream()
1323 }
1324
1325 frame
1326 }
1327
convert_push_message( stream_id: StreamId, promised_id: StreamId, request: Request<()>, ) -> Result<frame::PushPromise, UserError>1328 pub fn convert_push_message(
1329 stream_id: StreamId,
1330 promised_id: StreamId,
1331 request: Request<()>,
1332 ) -> Result<frame::PushPromise, UserError> {
1333 use http::request::Parts;
1334
1335 if let Err(e) = frame::PushPromise::validate_request(&request) {
1336 use PushPromiseHeaderError::*;
1337 match e {
1338 NotSafeAndCacheable => tracing::debug!(
1339 ?promised_id,
1340 "convert_push_message: method {} is not safe and cacheable",
1341 request.method(),
1342 ),
1343 InvalidContentLength(e) => tracing::debug!(
1344 ?promised_id,
1345 "convert_push_message; promised request has invalid content-length {:?}",
1346 e,
1347 ),
1348 }
1349 return Err(UserError::MalformedHeaders);
1350 }
1351
1352 // Extract the components of the HTTP request
1353 let (
1354 Parts {
1355 method,
1356 uri,
1357 headers,
1358 ..
1359 },
1360 _,
1361 ) = request.into_parts();
1362
1363 let pseudo = Pseudo::request(method, uri);
1364
1365 Ok(frame::PushPromise::new(
1366 stream_id,
1367 promised_id,
1368 pseudo,
1369 headers,
1370 ))
1371 }
1372 }
1373
1374 impl proto::Peer for Peer {
1375 type Poll = Request<()>;
1376
1377 const NAME: &'static str = "Server";
1378
is_server() -> bool1379 fn is_server() -> bool {
1380 true
1381 }
1382
1383 fn r#dyn() -> proto::DynPeer {
1384 proto::DynPeer::Server
1385 }
1386
convert_poll_message( pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId, ) -> Result<Self::Poll, Error>1387 fn convert_poll_message(
1388 pseudo: Pseudo,
1389 fields: HeaderMap,
1390 stream_id: StreamId,
1391 ) -> Result<Self::Poll, Error> {
1392 use http::{uri, Version};
1393
1394 let mut b = Request::builder();
1395
1396 macro_rules! malformed {
1397 ($($arg:tt)*) => {{
1398 tracing::debug!($($arg)*);
1399 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1400 }}
1401 }
1402
1403 b = b.version(Version::HTTP_2);
1404
1405 let is_connect;
1406 if let Some(method) = pseudo.method {
1407 is_connect = method == Method::CONNECT;
1408 b = b.method(method);
1409 } else {
1410 malformed!("malformed headers: missing method");
1411 }
1412
1413 // Specifying :status for a request is a protocol error
1414 if pseudo.status.is_some() {
1415 tracing::trace!("malformed headers: :status field on request; PROTOCOL_ERROR");
1416 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
1417 }
1418
1419 // Convert the URI
1420 let mut parts = uri::Parts::default();
1421
1422 // A request translated from HTTP/1 must not include the :authority
1423 // header
1424 if let Some(authority) = pseudo.authority {
1425 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1426 parts.authority = Some(maybe_authority.or_else(|why| {
1427 malformed!(
1428 "malformed headers: malformed authority ({:?}): {}",
1429 authority,
1430 why,
1431 )
1432 })?);
1433 }
1434
1435 // A :scheme is required, except CONNECT.
1436 if let Some(scheme) = pseudo.scheme {
1437 if is_connect {
1438 malformed!(":scheme in CONNECT");
1439 }
1440 let maybe_scheme = scheme.parse();
1441 let scheme = maybe_scheme.or_else(|why| {
1442 malformed!(
1443 "malformed headers: malformed scheme ({:?}): {}",
1444 scheme,
1445 why,
1446 )
1447 })?;
1448
1449 // It's not possible to build an `Uri` from a scheme and path. So,
1450 // after validating is was a valid scheme, we just have to drop it
1451 // if there isn't an :authority.
1452 if parts.authority.is_some() {
1453 parts.scheme = Some(scheme);
1454 }
1455 } else if !is_connect {
1456 malformed!("malformed headers: missing scheme");
1457 }
1458
1459 if let Some(path) = pseudo.path {
1460 if is_connect {
1461 malformed!(":path in CONNECT");
1462 }
1463
1464 // This cannot be empty
1465 if path.is_empty() {
1466 malformed!("malformed headers: missing path");
1467 }
1468
1469 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1470 parts.path_and_query = Some(maybe_path.or_else(|why| {
1471 malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1472 })?);
1473 }
1474
1475 b = b.uri(parts);
1476
1477 let mut request = match b.body(()) {
1478 Ok(request) => request,
1479 Err(e) => {
1480 // TODO: Should there be more specialized handling for different
1481 // kinds of errors
1482 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1483 return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1484 }
1485 };
1486
1487 *request.headers_mut() = fields;
1488
1489 Ok(request)
1490 }
1491 }
1492
1493 // ===== impl Handshaking =====
1494
1495 impl<T, B> fmt::Debug for Handshaking<T, B>
1496 where
1497 B: Buf,
1498 {
1499 #[inline]
fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error>1500 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1501 match *self {
1502 Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
1503 Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
1504 Handshaking::Empty => write!(f, "Handshaking::Empty"),
1505 }
1506 }
1507 }
1508
1509 impl<T, B> convert::From<Flush<T, Prioritized<B>>> for Handshaking<T, B>
1510 where
1511 T: AsyncRead + AsyncWrite,
1512 B: Buf,
1513 {
1514 #[inline]
from(flush: Flush<T, Prioritized<B>>) -> Self1515 fn from(flush: Flush<T, Prioritized<B>>) -> Self {
1516 Handshaking::Flushing(flush.instrument(tracing::trace_span!("flush")))
1517 }
1518 }
1519
1520 impl<T, B> convert::From<ReadPreface<T, Prioritized<B>>> for Handshaking<T, B>
1521 where
1522 T: AsyncRead + AsyncWrite,
1523 B: Buf,
1524 {
1525 #[inline]
from(read: ReadPreface<T, Prioritized<B>>) -> Self1526 fn from(read: ReadPreface<T, Prioritized<B>>) -> Self {
1527 Handshaking::ReadingPreface(read.instrument(tracing::trace_span!("read_preface")))
1528 }
1529 }
1530
1531 impl<T, B> convert::From<Codec<T, Prioritized<B>>> for Handshaking<T, B>
1532 where
1533 T: AsyncRead + AsyncWrite,
1534 B: Buf,
1535 {
1536 #[inline]
from(codec: Codec<T, Prioritized<B>>) -> Self1537 fn from(codec: Codec<T, Prioritized<B>>) -> Self {
1538 Handshaking::from(Flush::new(codec))
1539 }
1540 }
1541