1 //! Server implementation of the HTTP/2.0 protocol. 2 //! 3 //! # Getting started 4 //! 5 //! Running an HTTP/2.0 server requires the caller to manage accepting the 6 //! connections as well as getting the connections to a state that is ready to 7 //! begin the HTTP/2.0 handshake. See [here](../index.html#handshake) for more 8 //! details. 9 //! 10 //! This could be as basic as using Tokio's [`TcpListener`] to accept 11 //! connections, but usually it means using either ALPN or HTTP/1.1 protocol 12 //! upgrades. 13 //! 14 //! Once a connection is obtained, it is passed to [`handshake`], 15 //! which will begin the [HTTP/2.0 handshake]. This returns a future that 16 //! completes once the handshake process is performed and HTTP/2.0 streams may 17 //! be received. 18 //! 19 //! [`handshake`] uses default configuration values. There are a number of 20 //! settings that can be changed by using [`Builder`] instead. 21 //! 22 //! # Inbound streams 23 //! 24 //! The [`Connection`] instance is used to accept inbound HTTP/2.0 streams. It 25 //! does this by implementing [`futures::Stream`]. When a new stream is 26 //! received, a call to [`Connection::poll`] will return `(request, response)`. 27 //! The `request` handle (of type [`http::Request<RecvStream>`]) contains the 28 //! HTTP request head as well as provides a way to receive the inbound data 29 //! stream and the trailers. The `response` handle (of type [`SendStream`]) 30 //! allows responding to the request, stream the response payload, send 31 //! trailers, and send push promises. 32 //! 33 //! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream 34 //! can be operated independently. 35 //! 36 //! # Managing the connection 37 //! 38 //! The [`Connection`] instance is used to manage connection state. The caller 39 //! is required to call either [`Connection::poll`] or 40 //! [`Connection::poll_close`] in order to advance the connection state. Simply 41 //! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the 42 //! connection state is advanced. 43 //! 44 //! It is not required to call **both** [`Connection::poll`] and 45 //! [`Connection::poll_close`]. If the caller is ready to accept a new stream, 46 //! then only [`Connection::poll`] should be called. When the caller **does 47 //! not** want to accept a new stream, [`Connection::poll_close`] should be 48 //! called. 49 //! 50 //! The [`Connection`] instance should only be dropped once 51 //! [`Connection::poll_close`] returns `Ready`. Once [`Connection::poll`] 52 //! returns `Ready(None)`, there will no longer be any more inbound streams. At 53 //! this point, only [`Connection::poll_close`] should be called. 54 //! 55 //! # Shutting down the server 56 //! 57 //! Graceful shutdown of the server is [not yet 58 //! implemented](https://github.com/hyperium/h2/issues/69). 59 //! 60 //! # Example 61 //! 62 //! A basic HTTP/2.0 server example that runs over TCP and assumes [prior 63 //! knowledge], i.e. both the client and the server assume that the TCP socket 64 //! will use the HTTP/2.0 protocol without prior negotiation. 65 //! 66 //! ```no_run 67 //! use h2::server; 68 //! use http::{Response, StatusCode}; 69 //! use tokio::net::TcpListener; 70 //! 71 //! #[tokio::main] 72 //! pub async fn main() { 73 //! let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap(); 74 //! 75 //! // Accept all incoming TCP connections. 76 //! loop { 77 //! if let Ok((socket, _peer_addr)) = listener.accept().await { 78 //! // Spawn a new task to process each connection. 79 //! tokio::spawn(async { 80 //! // Start the HTTP/2.0 connection handshake 81 //! let mut h2 = server::handshake(socket).await.unwrap(); 82 //! // Accept all inbound HTTP/2.0 streams sent over the 83 //! // connection. 84 //! while let Some(request) = h2.accept().await { 85 //! let (request, mut respond) = request.unwrap(); 86 //! println!("Received request: {:?}", request); 87 //! 88 //! // Build a response with no body 89 //! let response = Response::builder() 90 //! .status(StatusCode::OK) 91 //! .body(()) 92 //! .unwrap(); 93 //! 94 //! // Send the response back to the client 95 //! respond.send_response(response, true) 96 //! .unwrap(); 97 //! } 98 //! 99 //! }); 100 //! } 101 //! } 102 //! } 103 //! ``` 104 //! 105 //! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http 106 //! [`handshake`]: fn.handshake.html 107 //! [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader 108 //! [`Builder`]: struct.Builder.html 109 //! [`Connection`]: struct.Connection.html 110 //! [`Connection::poll`]: struct.Connection.html#method.poll 111 //! [`Connection::poll_close`]: struct.Connection.html#method.poll_close 112 //! [`futures::Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html 113 //! [`http::Request<RecvStream>`]: ../struct.RecvStream.html 114 //! [`RecvStream`]: ../struct.RecvStream.html 115 //! [`SendStream`]: ../struct.SendStream.html 116 //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html 117 118 use crate::codec::{Codec, RecvError, UserError}; 119 use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; 120 use crate::proto::{self, Config, Prioritized}; 121 use crate::{FlowControl, PingPong, RecvStream, SendStream}; 122 123 use bytes::{Buf, Bytes}; 124 use http::{HeaderMap, Request, Response}; 125 use std::future::Future; 126 use std::pin::Pin; 127 use std::task::{Context, Poll}; 128 use std::time::Duration; 129 use std::{convert, fmt, io, mem}; 130 use tokio::io::{AsyncRead, AsyncWrite}; 131 132 /// In progress HTTP/2.0 connection handshake future. 133 /// 134 /// This type implements `Future`, yielding a `Connection` instance once the 135 /// handshake has completed. 136 /// 137 /// The handshake is completed once the connection preface is fully received 138 /// from the client **and** the initial settings frame is sent to the client. 139 /// 140 /// The handshake future does not wait for the initial settings frame from the 141 /// client. 142 /// 143 /// See [module] level docs for more details. 144 /// 145 /// [module]: index.html 146 #[must_use = "futures do nothing unless polled"] 147 pub struct Handshake<T, B: Buf = Bytes> { 148 /// The config to pass to Connection::new after handshake succeeds. 149 builder: Builder, 150 /// The current state of the handshake. 151 state: Handshaking<T, B>, 152 } 153 154 /// Accepts inbound HTTP/2.0 streams on a connection. 155 /// 156 /// A `Connection` is backed by an I/O resource (usually a TCP socket) and 157 /// implements the HTTP/2.0 server logic for that connection. It is responsible 158 /// for receiving inbound streams initiated by the client as well as driving the 159 /// internal state forward. 160 /// 161 /// `Connection` values are created by calling [`handshake`]. Once a 162 /// `Connection` value is obtained, the caller must call [`poll`] or 163 /// [`poll_close`] in order to drive the internal connection state forward. 164 /// 165 /// See [module level] documentation for more details 166 /// 167 /// [module level]: index.html 168 /// [`handshake`]: struct.Connection.html#method.handshake 169 /// [`poll`]: struct.Connection.html#method.poll 170 /// [`poll_close`]: struct.Connection.html#method.poll_close 171 /// 172 /// # Examples 173 /// 174 /// ``` 175 /// # use tokio::io::{AsyncRead, AsyncWrite}; 176 /// # use h2::server; 177 /// # use h2::server::*; 178 /// # 179 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) { 180 /// let mut server = server::handshake(my_io).await.unwrap(); 181 /// while let Some(request) = server.accept().await { 182 /// let (request, respond) = request.unwrap(); 183 /// // Process the request and send the response back to the client 184 /// // using `respond`. 185 /// } 186 /// # } 187 /// # 188 /// # pub fn main() {} 189 /// ``` 190 #[must_use = "streams do nothing unless polled"] 191 pub struct Connection<T, B: Buf> { 192 connection: proto::Connection<T, Peer, B>, 193 } 194 195 /// Builds server connections with custom configuration values. 196 /// 197 /// Methods can be chained in order to set the configuration values. 198 /// 199 /// The server is constructed by calling [`handshake`] and passing the I/O 200 /// handle that will back the HTTP/2.0 server. 201 /// 202 /// New instances of `Builder` are obtained via [`Builder::new`]. 203 /// 204 /// See function level documentation for details on the various server 205 /// configuration settings. 206 /// 207 /// [`Builder::new`]: struct.Builder.html#method.new 208 /// [`handshake`]: struct.Builder.html#method.handshake 209 /// 210 /// # Examples 211 /// 212 /// ``` 213 /// # use tokio::io::{AsyncRead, AsyncWrite}; 214 /// # use h2::server::*; 215 /// # 216 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 217 /// # -> Handshake<T> 218 /// # { 219 /// // `server_fut` is a future representing the completion of the HTTP/2.0 220 /// // handshake. 221 /// let server_fut = Builder::new() 222 /// .initial_window_size(1_000_000) 223 /// .max_concurrent_streams(1000) 224 /// .handshake(my_io); 225 /// # server_fut 226 /// # } 227 /// # 228 /// # pub fn main() {} 229 /// ``` 230 #[derive(Clone, Debug)] 231 pub struct Builder { 232 /// Time to keep locally reset streams around before reaping. 233 reset_stream_duration: Duration, 234 235 /// Maximum number of locally reset streams to keep at a time. 236 reset_stream_max: usize, 237 238 /// Initial `Settings` frame to send as part of the handshake. 239 settings: Settings, 240 241 /// Initial target window size for new connections. 242 initial_target_connection_window_size: Option<u32>, 243 } 244 245 /// Send a response back to the client 246 /// 247 /// A `SendResponse` instance is provided when receiving a request and is used 248 /// to send the associated response back to the client. It is also used to 249 /// explicitly reset the stream with a custom reason. 250 /// 251 /// It will also be used to initiate push promises linked with the associated 252 /// stream. 253 /// 254 /// If the `SendResponse` instance is dropped without sending a response, then 255 /// the HTTP/2.0 stream will be reset. 256 /// 257 /// See [module] level docs for more details. 258 /// 259 /// [module]: index.html 260 #[derive(Debug)] 261 pub struct SendResponse<B: Buf> { 262 inner: proto::StreamRef<B>, 263 } 264 265 /// Send a response to a promised request 266 /// 267 /// A `SendPushedResponse` instance is provided when promising a request and is used 268 /// to send the associated response to the client. It is also used to 269 /// explicitly reset the stream with a custom reason. 270 /// 271 /// It can not be used to initiate push promises. 272 /// 273 /// If the `SendPushedResponse` instance is dropped without sending a response, then 274 /// the HTTP/2.0 stream will be reset. 275 /// 276 /// See [module] level docs for more details. 277 /// 278 /// [module]: index.html 279 pub struct SendPushedResponse<B: Buf> { 280 inner: SendResponse<B>, 281 } 282 283 // Manual implementation necessary because of rust-lang/rust#26925 284 impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> { 285 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 286 write!(f, "SendPushedResponse {{ {:?} }}", self.inner) 287 } 288 } 289 290 /// Stages of an in-progress handshake. 291 enum Handshaking<T, B: Buf> { 292 /// State 1. Connection is flushing pending SETTINGS frame. 293 Flushing(Flush<T, Prioritized<B>>), 294 /// State 2. Connection is waiting for the client preface. 295 ReadingPreface(ReadPreface<T, Prioritized<B>>), 296 /// Dummy state for `mem::replace`. 297 Empty, 298 } 299 300 /// Flush a Sink 301 struct Flush<T, B> { 302 codec: Option<Codec<T, B>>, 303 } 304 305 /// Read the client connection preface 306 struct ReadPreface<T, B> { 307 codec: Option<Codec<T, B>>, 308 pos: usize, 309 } 310 311 #[derive(Debug)] 312 pub(crate) struct Peer; 313 314 const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; 315 316 /// Creates a new configured HTTP/2.0 server with default configuration 317 /// values backed by `io`. 318 /// 319 /// It is expected that `io` already be in an appropriate state to commence 320 /// the [HTTP/2.0 handshake]. See [Handshake] for more details. 321 /// 322 /// Returns a future which resolves to the [`Connection`] instance once the 323 /// HTTP/2.0 handshake has been completed. The returned [`Connection`] 324 /// instance will be using default configuration values. Use [`Builder`] to 325 /// customize the configuration values used by a [`Connection`] instance. 326 /// 327 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader 328 /// [Handshake]: ../index.html#handshake 329 /// [`Connection`]: struct.Connection.html 330 /// 331 /// # Examples 332 /// 333 /// ``` 334 /// # use tokio::io::{AsyncRead, AsyncWrite}; 335 /// # use h2::server; 336 /// # use h2::server::*; 337 /// # 338 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 339 /// # { 340 /// let connection = server::handshake(my_io).await.unwrap(); 341 /// // The HTTP/2.0 handshake has completed, now use `connection` to 342 /// // accept inbound HTTP/2.0 streams. 343 /// # } 344 /// # 345 /// # pub fn main() {} 346 /// ``` 347 pub fn handshake<T>(io: T) -> Handshake<T, Bytes> 348 where 349 T: AsyncRead + AsyncWrite + Unpin, 350 { 351 Builder::new().handshake(io) 352 } 353 354 // ===== impl Connection ===== 355 356 impl<T, B> Connection<T, B> 357 where 358 T: AsyncRead + AsyncWrite + Unpin, 359 B: Buf + 'static, 360 { 361 fn handshake2(io: T, builder: Builder) -> Handshake<T, B> { 362 // Create the codec. 363 let mut codec = Codec::new(io); 364 365 if let Some(max) = builder.settings.max_frame_size() { 366 codec.set_max_recv_frame_size(max as usize); 367 } 368 369 if let Some(max) = builder.settings.max_header_list_size() { 370 codec.set_max_recv_header_list_size(max as usize); 371 } 372 373 // Send initial settings frame. 374 codec 375 .buffer(builder.settings.clone().into()) 376 .expect("invalid SETTINGS frame"); 377 378 // Create the handshake future. 379 let state = Handshaking::from(codec); 380 381 Handshake { builder, state } 382 } 383 384 /// Accept the next incoming request on this connection. 385 pub async fn accept( 386 &mut self, 387 ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> { 388 futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await 389 } 390 391 #[doc(hidden)] 392 pub fn poll_accept( 393 &mut self, 394 cx: &mut Context<'_>, 395 ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> { 396 // Always try to advance the internal state. Getting Pending also is 397 // needed to allow this function to return Pending. 398 if let Poll::Ready(_) = self.poll_closed(cx)? { 399 // If the socket is closed, don't return anything 400 // TODO: drop any pending streams 401 return Poll::Ready(None); 402 } 403 404 if let Some(inner) = self.connection.next_incoming() { 405 log::trace!("received incoming"); 406 let (head, _) = inner.take_request().into_parts(); 407 let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque())); 408 409 let request = Request::from_parts(head, body); 410 let respond = SendResponse { inner }; 411 412 return Poll::Ready(Some(Ok((request, respond)))); 413 } 414 415 Poll::Pending 416 } 417 418 /// Sets the target window size for the whole connection. 419 /// 420 /// If `size` is greater than the current value, then a `WINDOW_UPDATE` 421 /// frame will be immediately sent to the remote, increasing the connection 422 /// level window by `size - current_value`. 423 /// 424 /// If `size` is less than the current value, nothing will happen 425 /// immediately. However, as window capacity is released by 426 /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent 427 /// out until the number of "in flight" bytes drops below `size`. 428 /// 429 /// The default value is 65,535. 430 /// 431 /// See [`FlowControl`] documentation for more details. 432 /// 433 /// [`FlowControl`]: ../struct.FlowControl.html 434 /// [library level]: ../index.html#flow-control 435 pub fn set_target_window_size(&mut self, size: u32) { 436 assert!(size <= proto::MAX_WINDOW_SIZE); 437 self.connection.set_target_window_size(size); 438 } 439 440 /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level 441 /// flow control for received data. 442 /// 443 /// The `SETTINGS` will be sent to the remote, and only applied once the 444 /// remote acknowledges the change. 445 /// 446 /// This can be used to increase or decrease the window size for existing 447 /// streams. 448 /// 449 /// # Errors 450 /// 451 /// Returns an error if a previous call is still pending acknowledgement 452 /// from the remote endpoint. 453 pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { 454 assert!(size <= proto::MAX_WINDOW_SIZE); 455 self.connection.set_initial_window_size(size)?; 456 Ok(()) 457 } 458 459 /// Returns `Ready` when the underlying connection has closed. 460 /// 461 /// If any new inbound streams are received during a call to `poll_closed`, 462 /// they will be queued and returned on the next call to [`poll_accept`]. 463 /// 464 /// This function will advance the internal connection state, driving 465 /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]). 466 /// 467 /// See [here](index.html#managing-the-connection) for more details. 468 /// 469 /// [`poll_accept`]: struct.Connection.html#method.poll_accept 470 /// [`RecvStream`]: ../struct.RecvStream.html 471 /// [`SendStream`]: ../struct.SendStream.html 472 pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { 473 self.connection.poll(cx).map_err(Into::into) 474 } 475 476 #[doc(hidden)] 477 #[deprecated(note = "renamed to poll_closed")] 478 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { 479 self.poll_closed(cx) 480 } 481 482 /// Sets the connection to a GOAWAY state. 483 /// 484 /// Does not terminate the connection. Must continue being polled to close 485 /// connection. 486 /// 487 /// After flushing the GOAWAY frame, the connection is closed. Any 488 /// outstanding streams do not prevent the connection from closing. This 489 /// should usually be reserved for shutting down when something bad 490 /// external to `h2` has happened, and open streams cannot be properly 491 /// handled. 492 /// 493 /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown). 494 pub fn abrupt_shutdown(&mut self, reason: Reason) { 495 self.connection.go_away_from_user(reason); 496 } 497 498 /// Starts a [graceful shutdown][1] process. 499 /// 500 /// Must continue being polled to close connection. 501 /// 502 /// It's possible to receive more requests after calling this method, since 503 /// they might have been in-flight from the client already. After about 504 /// 1 RTT, no new requests should be accepted. Once all active streams 505 /// have completed, the connection is closed. 506 /// 507 /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY 508 pub fn graceful_shutdown(&mut self) { 509 self.connection.go_away_gracefully(); 510 } 511 512 /// Takes a `PingPong` instance from the connection. 513 /// 514 /// # Note 515 /// 516 /// This may only be called once. Calling multiple times will return `None`. 517 pub fn ping_pong(&mut self) -> Option<PingPong> { 518 self.connection.take_user_pings().map(PingPong::new) 519 } 520 } 521 522 #[cfg(feature = "stream")] 523 impl<T, B> futures_core::Stream for Connection<T, B> 524 where 525 T: AsyncRead + AsyncWrite + Unpin, 526 B: Buf + 'static, 527 { 528 type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>; 529 530 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 531 self.poll_accept(cx) 532 } 533 } 534 535 impl<T, B> fmt::Debug for Connection<T, B> 536 where 537 T: fmt::Debug, 538 B: fmt::Debug + Buf, 539 { 540 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 541 fmt.debug_struct("Connection") 542 .field("connection", &self.connection) 543 .finish() 544 } 545 } 546 547 // ===== impl Builder ===== 548 549 impl Builder { 550 /// Returns a new server builder instance initialized with default 551 /// configuration values. 552 /// 553 /// Configuration methods can be chained on the return value. 554 /// 555 /// # Examples 556 /// 557 /// ``` 558 /// # use tokio::io::{AsyncRead, AsyncWrite}; 559 /// # use h2::server::*; 560 /// # 561 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 562 /// # -> Handshake<T> 563 /// # { 564 /// // `server_fut` is a future representing the completion of the HTTP/2.0 565 /// // handshake. 566 /// let server_fut = Builder::new() 567 /// .initial_window_size(1_000_000) 568 /// .max_concurrent_streams(1000) 569 /// .handshake(my_io); 570 /// # server_fut 571 /// # } 572 /// # 573 /// # pub fn main() {} 574 /// ``` 575 pub fn new() -> Builder { 576 Builder { 577 reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), 578 reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, 579 settings: Settings::default(), 580 initial_target_connection_window_size: None, 581 } 582 } 583 584 /// Indicates the initial window size (in octets) for stream-level 585 /// flow control for received data. 586 /// 587 /// The initial window of a stream is used as part of flow control. For more 588 /// details, see [`FlowControl`]. 589 /// 590 /// The default value is 65,535. 591 /// 592 /// [`FlowControl`]: ../struct.FlowControl.html 593 /// 594 /// # Examples 595 /// 596 /// ``` 597 /// # use tokio::io::{AsyncRead, AsyncWrite}; 598 /// # use h2::server::*; 599 /// # 600 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 601 /// # -> Handshake<T> 602 /// # { 603 /// // `server_fut` is a future representing the completion of the HTTP/2.0 604 /// // handshake. 605 /// let server_fut = Builder::new() 606 /// .initial_window_size(1_000_000) 607 /// .handshake(my_io); 608 /// # server_fut 609 /// # } 610 /// # 611 /// # pub fn main() {} 612 /// ``` 613 pub fn initial_window_size(&mut self, size: u32) -> &mut Self { 614 self.settings.set_initial_window_size(Some(size)); 615 self 616 } 617 618 /// Indicates the initial window size (in octets) for connection-level flow control 619 /// for received data. 620 /// 621 /// The initial window of a connection is used as part of flow control. For more details, 622 /// see [`FlowControl`]. 623 /// 624 /// The default value is 65,535. 625 /// 626 /// [`FlowControl`]: ../struct.FlowControl.html 627 /// 628 /// # Examples 629 /// 630 /// ``` 631 /// # use tokio::io::{AsyncRead, AsyncWrite}; 632 /// # use h2::server::*; 633 /// # 634 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 635 /// # -> Handshake<T> 636 /// # { 637 /// // `server_fut` is a future representing the completion of the HTTP/2.0 638 /// // handshake. 639 /// let server_fut = Builder::new() 640 /// .initial_connection_window_size(1_000_000) 641 /// .handshake(my_io); 642 /// # server_fut 643 /// # } 644 /// # 645 /// # pub fn main() {} 646 /// ``` 647 pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self { 648 self.initial_target_connection_window_size = Some(size); 649 self 650 } 651 652 /// Indicates the size (in octets) of the largest HTTP/2.0 frame payload that the 653 /// configured server is able to accept. 654 /// 655 /// The sender may send data frames that are **smaller** than this value, 656 /// but any data larger than `max` will be broken up into multiple `DATA` 657 /// frames. 658 /// 659 /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384. 660 /// 661 /// # Examples 662 /// 663 /// ``` 664 /// # use tokio::io::{AsyncRead, AsyncWrite}; 665 /// # use h2::server::*; 666 /// # 667 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 668 /// # -> Handshake<T> 669 /// # { 670 /// // `server_fut` is a future representing the completion of the HTTP/2.0 671 /// // handshake. 672 /// let server_fut = Builder::new() 673 /// .max_frame_size(1_000_000) 674 /// .handshake(my_io); 675 /// # server_fut 676 /// # } 677 /// # 678 /// # pub fn main() {} 679 /// ``` 680 /// 681 /// # Panics 682 /// 683 /// This function panics if `max` is not within the legal range specified 684 /// above. 685 pub fn max_frame_size(&mut self, max: u32) -> &mut Self { 686 self.settings.set_max_frame_size(Some(max)); 687 self 688 } 689 690 /// Sets the max size of received header frames. 691 /// 692 /// This advisory setting informs a peer of the maximum size of header list 693 /// that the sender is prepared to accept, in octets. The value is based on 694 /// the uncompressed size of header fields, including the length of the name 695 /// and value in octets plus an overhead of 32 octets for each header field. 696 /// 697 /// This setting is also used to limit the maximum amount of data that is 698 /// buffered to decode HEADERS frames. 699 /// 700 /// # Examples 701 /// 702 /// ``` 703 /// # use tokio::io::{AsyncRead, AsyncWrite}; 704 /// # use h2::server::*; 705 /// # 706 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 707 /// # -> Handshake<T> 708 /// # { 709 /// // `server_fut` is a future representing the completion of the HTTP/2.0 710 /// // handshake. 711 /// let server_fut = Builder::new() 712 /// .max_header_list_size(16 * 1024) 713 /// .handshake(my_io); 714 /// # server_fut 715 /// # } 716 /// # 717 /// # pub fn main() {} 718 /// ``` 719 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { 720 self.settings.set_max_header_list_size(Some(max)); 721 self 722 } 723 724 /// Sets the maximum number of concurrent streams. 725 /// 726 /// The maximum concurrent streams setting only controls the maximum number 727 /// of streams that can be initiated by the remote peer. In other words, 728 /// when this setting is set to 100, this does not limit the number of 729 /// concurrent streams that can be created by the caller. 730 /// 731 /// It is recommended that this value be no smaller than 100, so as to not 732 /// unnecessarily limit parallelism. However, any value is legal, including 733 /// 0. If `max` is set to 0, then the remote will not be permitted to 734 /// initiate streams. 735 /// 736 /// Note that streams in the reserved state, i.e., push promises that have 737 /// been reserved but the stream has not started, do not count against this 738 /// setting. 739 /// 740 /// Also note that if the remote *does* exceed the value set here, it is not 741 /// a protocol level error. Instead, the `h2` library will immediately reset 742 /// the stream. 743 /// 744 /// See [Section 5.1.2] in the HTTP/2.0 spec for more details. 745 /// 746 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 747 /// 748 /// # Examples 749 /// 750 /// ``` 751 /// # use tokio::io::{AsyncRead, AsyncWrite}; 752 /// # use h2::server::*; 753 /// # 754 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 755 /// # -> Handshake<T> 756 /// # { 757 /// // `server_fut` is a future representing the completion of the HTTP/2.0 758 /// // handshake. 759 /// let server_fut = Builder::new() 760 /// .max_concurrent_streams(1000) 761 /// .handshake(my_io); 762 /// # server_fut 763 /// # } 764 /// # 765 /// # pub fn main() {} 766 /// ``` 767 pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self { 768 self.settings.set_max_concurrent_streams(Some(max)); 769 self 770 } 771 772 /// Sets the maximum number of concurrent locally reset streams. 773 /// 774 /// When a stream is explicitly reset by either calling 775 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance 776 /// before completing the stream, the HTTP/2.0 specification requires that 777 /// any further frames received for that stream must be ignored for "some 778 /// time". 779 /// 780 /// In order to satisfy the specification, internal state must be maintained 781 /// to implement the behavior. This state grows linearly with the number of 782 /// streams that are locally reset. 783 /// 784 /// The `max_concurrent_reset_streams` setting configures sets an upper 785 /// bound on the amount of state that is maintained. When this max value is 786 /// reached, the oldest reset stream is purged from memory. 787 /// 788 /// Once the stream has been fully purged from memory, any additional frames 789 /// received for that stream will result in a connection level protocol 790 /// error, forcing the connection to terminate. 791 /// 792 /// The default value is 10. 793 /// 794 /// # Examples 795 /// 796 /// ``` 797 /// # use tokio::io::{AsyncRead, AsyncWrite}; 798 /// # use h2::server::*; 799 /// # 800 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 801 /// # -> Handshake<T> 802 /// # { 803 /// // `server_fut` is a future representing the completion of the HTTP/2.0 804 /// // handshake. 805 /// let server_fut = Builder::new() 806 /// .max_concurrent_reset_streams(1000) 807 /// .handshake(my_io); 808 /// # server_fut 809 /// # } 810 /// # 811 /// # pub fn main() {} 812 /// ``` 813 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { 814 self.reset_stream_max = max; 815 self 816 } 817 818 /// Sets the maximum number of concurrent locally reset streams. 819 /// 820 /// When a stream is explicitly reset by either calling 821 /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance 822 /// before completing the stream, the HTTP/2.0 specification requires that 823 /// any further frames received for that stream must be ignored for "some 824 /// time". 825 /// 826 /// In order to satisfy the specification, internal state must be maintained 827 /// to implement the behavior. This state grows linearly with the number of 828 /// streams that are locally reset. 829 /// 830 /// The `reset_stream_duration` setting configures the max amount of time 831 /// this state will be maintained in memory. Once the duration elapses, the 832 /// stream state is purged from memory. 833 /// 834 /// Once the stream has been fully purged from memory, any additional frames 835 /// received for that stream will result in a connection level protocol 836 /// error, forcing the connection to terminate. 837 /// 838 /// The default value is 30 seconds. 839 /// 840 /// # Examples 841 /// 842 /// ``` 843 /// # use tokio::io::{AsyncRead, AsyncWrite}; 844 /// # use h2::server::*; 845 /// # use std::time::Duration; 846 /// # 847 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 848 /// # -> Handshake<T> 849 /// # { 850 /// // `server_fut` is a future representing the completion of the HTTP/2.0 851 /// // handshake. 852 /// let server_fut = Builder::new() 853 /// .reset_stream_duration(Duration::from_secs(10)) 854 /// .handshake(my_io); 855 /// # server_fut 856 /// # } 857 /// # 858 /// # pub fn main() {} 859 /// ``` 860 pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { 861 self.reset_stream_duration = dur; 862 self 863 } 864 865 /// Creates a new configured HTTP/2.0 server backed by `io`. 866 /// 867 /// It is expected that `io` already be in an appropriate state to commence 868 /// the [HTTP/2.0 handshake]. See [Handshake] for more details. 869 /// 870 /// Returns a future which resolves to the [`Connection`] instance once the 871 /// HTTP/2.0 handshake has been completed. 872 /// 873 /// This function also allows the caller to configure the send payload data 874 /// type. See [Outbound data type] for more details. 875 /// 876 /// [HTTP/2.0 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader 877 /// [Handshake]: ../index.html#handshake 878 /// [`Connection`]: struct.Connection.html 879 /// [Outbound data type]: ../index.html#outbound-data-type. 880 /// 881 /// # Examples 882 /// 883 /// Basic usage: 884 /// 885 /// ``` 886 /// # use tokio::io::{AsyncRead, AsyncWrite}; 887 /// # use h2::server::*; 888 /// # 889 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 890 /// # -> Handshake<T> 891 /// # { 892 /// // `server_fut` is a future representing the completion of the HTTP/2.0 893 /// // handshake. 894 /// let server_fut = Builder::new() 895 /// .handshake(my_io); 896 /// # server_fut 897 /// # } 898 /// # 899 /// # pub fn main() {} 900 /// ``` 901 /// 902 /// Configures the send-payload data type. In this case, the outbound data 903 /// type will be `&'static [u8]`. 904 /// 905 /// ``` 906 /// # use tokio::io::{AsyncRead, AsyncWrite}; 907 /// # use h2::server::*; 908 /// # 909 /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) 910 /// # -> Handshake<T, &'static [u8]> 911 /// # { 912 /// // `server_fut` is a future representing the completion of the HTTP/2.0 913 /// // handshake. 914 /// let server_fut: Handshake<_, &'static [u8]> = Builder::new() 915 /// .handshake(my_io); 916 /// # server_fut 917 /// # } 918 /// # 919 /// # pub fn main() {} 920 /// ``` 921 pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B> 922 where 923 T: AsyncRead + AsyncWrite + Unpin, 924 B: Buf + 'static, 925 { 926 Connection::handshake2(io, self.clone()) 927 } 928 } 929 930 impl Default for Builder { 931 fn default() -> Builder { 932 Builder::new() 933 } 934 } 935 936 // ===== impl SendResponse ===== 937 938 impl<B: Buf> SendResponse<B> { 939 /// Send a response to a client request. 940 /// 941 /// On success, a [`SendStream`] instance is returned. This instance can be 942 /// used to stream the response body and send trailers. 943 /// 944 /// If a body or trailers will be sent on the returned [`SendStream`] 945 /// instance, then `end_of_stream` must be set to `false` when calling this 946 /// function. 947 /// 948 /// The [`SendResponse`] instance is already associated with a received 949 /// request. This function may only be called once per instance and only if 950 /// [`send_reset`] has not been previously called. 951 /// 952 /// [`SendResponse`]: # 953 /// [`SendStream`]: ../struct.SendStream.html 954 /// [`send_reset`]: #method.send_reset 955 pub fn send_response( 956 &mut self, 957 response: Response<()>, 958 end_of_stream: bool, 959 ) -> Result<SendStream<B>, crate::Error> { 960 self.inner 961 .send_response(response, end_of_stream) 962 .map(|_| SendStream::new(self.inner.clone())) 963 .map_err(Into::into) 964 } 965 966 /// Push a request and response to the client 967 /// 968 /// On success, a [`SendResponse`] instance is returned. 969 /// 970 /// [`SendResponse`]: # 971 pub fn push_request( 972 &mut self, 973 request: Request<()>, 974 ) -> Result<SendPushedResponse<B>, crate::Error> { 975 self.inner 976 .send_push_promise(request) 977 .map(|inner| SendPushedResponse { 978 inner: SendResponse { inner }, 979 }) 980 .map_err(Into::into) 981 } 982 983 /// Send a stream reset to the peer. 984 /// 985 /// This essentially cancels the stream, including any inbound or outbound 986 /// data streams. 987 /// 988 /// If this function is called before [`send_response`], a call to 989 /// [`send_response`] will result in an error. 990 /// 991 /// If this function is called while a [`SendStream`] instance is active, 992 /// any further use of the instance will result in an error. 993 /// 994 /// This function should only be called once. 995 /// 996 /// [`send_response`]: #method.send_response 997 /// [`SendStream`]: ../struct.SendStream.html 998 pub fn send_reset(&mut self, reason: Reason) { 999 self.inner.send_reset(reason) 1000 } 1001 1002 /// Polls to be notified when the client resets this stream. 1003 /// 1004 /// If stream is still open, this returns `Poll::Pending`, and 1005 /// registers the task to be notified if a `RST_STREAM` is received. 1006 /// 1007 /// If a `RST_STREAM` frame is received for this stream, calling this 1008 /// method will yield the `Reason` for the reset. 1009 /// 1010 /// # Error 1011 /// 1012 /// Calling this method after having called `send_response` will return 1013 /// a user error. 1014 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> { 1015 self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders) 1016 } 1017 1018 /// Returns the stream ID of the response stream. 1019 /// 1020 /// # Panics 1021 /// 1022 /// If the lock on the strean store has been poisoned. 1023 pub fn stream_id(&self) -> crate::StreamId { 1024 crate::StreamId::from_internal(self.inner.stream_id()) 1025 } 1026 } 1027 1028 // ===== impl SendPushedResponse ===== 1029 1030 impl<B: Buf> SendPushedResponse<B> { 1031 /// Send a response to a promised request. 1032 /// 1033 /// On success, a [`SendStream`] instance is returned. This instance can be 1034 /// used to stream the response body and send trailers. 1035 /// 1036 /// If a body or trailers will be sent on the returned [`SendStream`] 1037 /// instance, then `end_of_stream` must be set to `false` when calling this 1038 /// function. 1039 /// 1040 /// The [`SendPushedResponse`] instance is associated with a promised 1041 /// request. This function may only be called once per instance and only if 1042 /// [`send_reset`] has not been previously called. 1043 /// 1044 /// [`SendPushedResponse`]: # 1045 /// [`SendStream`]: ../struct.SendStream.html 1046 /// [`send_reset`]: #method.send_reset 1047 pub fn send_response( 1048 &mut self, 1049 response: Response<()>, 1050 end_of_stream: bool, 1051 ) -> Result<SendStream<B>, crate::Error> { 1052 self.inner.send_response(response, end_of_stream) 1053 } 1054 1055 /// Send a stream reset to the peer. 1056 /// 1057 /// This essentially cancels the stream, including any inbound or outbound 1058 /// data streams. 1059 /// 1060 /// If this function is called before [`send_response`], a call to 1061 /// [`send_response`] will result in an error. 1062 /// 1063 /// If this function is called while a [`SendStream`] instance is active, 1064 /// any further use of the instance will result in an error. 1065 /// 1066 /// This function should only be called once. 1067 /// 1068 /// [`send_response`]: #method.send_response 1069 /// [`SendStream`]: ../struct.SendStream.html 1070 pub fn send_reset(&mut self, reason: Reason) { 1071 self.inner.send_reset(reason) 1072 } 1073 1074 /// Polls to be notified when the client resets this stream. 1075 /// 1076 /// If stream is still open, this returns `Poll::Pending`, and 1077 /// registers the task to be notified if a `RST_STREAM` is received. 1078 /// 1079 /// If a `RST_STREAM` frame is received for this stream, calling this 1080 /// method will yield the `Reason` for the reset. 1081 /// 1082 /// # Error 1083 /// 1084 /// Calling this method after having called `send_response` will return 1085 /// a user error. 1086 pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> { 1087 self.inner.poll_reset(cx) 1088 } 1089 1090 /// Returns the stream ID of the response stream. 1091 /// 1092 /// # Panics 1093 /// 1094 /// If the lock on the strean store has been poisoned. 1095 pub fn stream_id(&self) -> crate::StreamId { 1096 self.inner.stream_id() 1097 } 1098 } 1099 1100 // ===== impl Flush ===== 1101 1102 impl<T, B: Buf> Flush<T, B> { 1103 fn new(codec: Codec<T, B>) -> Self { 1104 Flush { codec: Some(codec) } 1105 } 1106 } 1107 1108 impl<T, B> Future for Flush<T, B> 1109 where 1110 T: AsyncWrite + Unpin, 1111 B: Buf, 1112 { 1113 type Output = Result<Codec<T, B>, crate::Error>; 1114 1115 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1116 // Flush the codec 1117 ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?; 1118 1119 // Return the codec 1120 Poll::Ready(Ok(self.codec.take().unwrap())) 1121 } 1122 } 1123 1124 impl<T, B: Buf> ReadPreface<T, B> { 1125 fn new(codec: Codec<T, B>) -> Self { 1126 ReadPreface { 1127 codec: Some(codec), 1128 pos: 0, 1129 } 1130 } 1131 1132 fn inner_mut(&mut self) -> &mut T { 1133 self.codec.as_mut().unwrap().get_mut() 1134 } 1135 } 1136 1137 impl<T, B> Future for ReadPreface<T, B> 1138 where 1139 T: AsyncRead + Unpin, 1140 B: Buf, 1141 { 1142 type Output = Result<Codec<T, B>, crate::Error>; 1143 1144 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1145 let mut buf = [0; 24]; 1146 let mut rem = PREFACE.len() - self.pos; 1147 1148 while rem > 0 { 1149 let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem])) 1150 .map_err(crate::Error::from_io)?; 1151 if n == 0 { 1152 return Poll::Ready(Err(crate::Error::from_io(io::Error::new( 1153 io::ErrorKind::UnexpectedEof, 1154 "connection closed before reading preface", 1155 )))); 1156 } 1157 1158 if PREFACE[self.pos..self.pos + n] != buf[..n] { 1159 proto_err!(conn: "read_preface: invalid preface"); 1160 // TODO: Should this just write the GO_AWAY frame directly? 1161 return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into())); 1162 } 1163 1164 self.pos += n; 1165 rem -= n; // TODO test 1166 } 1167 1168 Poll::Ready(Ok(self.codec.take().unwrap())) 1169 } 1170 } 1171 1172 // ===== impl Handshake ===== 1173 1174 impl<T, B: Buf> Future for Handshake<T, B> 1175 where 1176 T: AsyncRead + AsyncWrite + Unpin, 1177 B: Buf + 'static, 1178 { 1179 type Output = Result<Connection<T, B>, crate::Error>; 1180 1181 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1182 log::trace!("Handshake::poll(); state={:?};", self.state); 1183 use crate::server::Handshaking::*; 1184 1185 self.state = if let Flushing(ref mut flush) = self.state { 1186 // We're currently flushing a pending SETTINGS frame. Poll the 1187 // flush future, and, if it's completed, advance our state to wait 1188 // for the client preface. 1189 let codec = match Pin::new(flush).poll(cx)? { 1190 Poll::Pending => { 1191 log::trace!("Handshake::poll(); flush.poll()=Pending"); 1192 return Poll::Pending; 1193 } 1194 Poll::Ready(flushed) => { 1195 log::trace!("Handshake::poll(); flush.poll()=Ready"); 1196 flushed 1197 } 1198 }; 1199 Handshaking::from(ReadPreface::new(codec)) 1200 } else { 1201 // Otherwise, we haven't actually advanced the state, but we have 1202 // to replace it with itself, because we have to return a value. 1203 // (note that the assignment to `self.state` has to be outside of 1204 // the `if let` block above in order to placate the borrow checker). 1205 mem::replace(&mut self.state, Handshaking::Empty) 1206 }; 1207 let poll = if let ReadingPreface(ref mut read) = self.state { 1208 // We're now waiting for the client preface. Poll the `ReadPreface` 1209 // future. If it has completed, we will create a `Connection` handle 1210 // for the connection. 1211 Pin::new(read).poll(cx) 1212 // Actually creating the `Connection` has to occur outside of this 1213 // `if let` block, because we've borrowed `self` mutably in order 1214 // to poll the state and won't be able to borrow the SETTINGS frame 1215 // as well until we release the borrow for `poll()`. 1216 } else { 1217 unreachable!("Handshake::poll() state was not advanced completely!") 1218 }; 1219 poll?.map(|codec| { 1220 let connection = proto::Connection::new( 1221 codec, 1222 Config { 1223 next_stream_id: 2.into(), 1224 // Server does not need to locally initiate any streams 1225 initial_max_send_streams: 0, 1226 reset_stream_duration: self.builder.reset_stream_duration, 1227 reset_stream_max: self.builder.reset_stream_max, 1228 settings: self.builder.settings.clone(), 1229 }, 1230 ); 1231 1232 log::trace!("Handshake::poll(); connection established!"); 1233 let mut c = Connection { connection }; 1234 if let Some(sz) = self.builder.initial_target_connection_window_size { 1235 c.set_target_window_size(sz); 1236 } 1237 Ok(c) 1238 }) 1239 } 1240 } 1241 1242 impl<T, B> fmt::Debug for Handshake<T, B> 1243 where 1244 T: AsyncRead + AsyncWrite + fmt::Debug, 1245 B: fmt::Debug + Buf, 1246 { 1247 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 1248 write!(fmt, "server::Handshake") 1249 } 1250 } 1251 1252 impl Peer { 1253 pub fn convert_send_message( 1254 id: StreamId, 1255 response: Response<()>, 1256 end_of_stream: bool, 1257 ) -> frame::Headers { 1258 use http::response::Parts; 1259 1260 // Extract the components of the HTTP request 1261 let ( 1262 Parts { 1263 status, headers, .. 1264 }, 1265 _, 1266 ) = response.into_parts(); 1267 1268 // Build the set pseudo header set. All requests will include `method` 1269 // and `path`. 1270 let pseudo = Pseudo::response(status); 1271 1272 // Create the HEADERS frame 1273 let mut frame = frame::Headers::new(id, pseudo, headers); 1274 1275 if end_of_stream { 1276 frame.set_end_stream() 1277 } 1278 1279 frame 1280 } 1281 1282 pub fn convert_push_message( 1283 stream_id: StreamId, 1284 promised_id: StreamId, 1285 request: Request<()>, 1286 ) -> Result<frame::PushPromise, UserError> { 1287 use http::request::Parts; 1288 1289 if let Err(e) = frame::PushPromise::validate_request(&request) { 1290 use PushPromiseHeaderError::*; 1291 match e { 1292 NotSafeAndCacheable => log::debug!( 1293 "convert_push_message: method {} is not safe and cacheable; promised_id={:?}", 1294 request.method(), 1295 promised_id, 1296 ), 1297 InvalidContentLength(e) => log::debug!( 1298 "convert_push_message; promised request has invalid content-length {:?}; promised_id={:?}", 1299 e, 1300 promised_id, 1301 ), 1302 } 1303 return Err(UserError::MalformedHeaders); 1304 } 1305 1306 // Extract the components of the HTTP request 1307 let ( 1308 Parts { 1309 method, 1310 uri, 1311 headers, 1312 .. 1313 }, 1314 _, 1315 ) = request.into_parts(); 1316 1317 let pseudo = Pseudo::request(method, uri); 1318 1319 Ok(frame::PushPromise::new( 1320 stream_id, 1321 promised_id, 1322 pseudo, 1323 headers, 1324 )) 1325 } 1326 } 1327 1328 impl proto::Peer for Peer { 1329 type Poll = Request<()>; 1330 1331 fn is_server() -> bool { 1332 true 1333 } 1334 1335 fn r#dyn() -> proto::DynPeer { 1336 proto::DynPeer::Server 1337 } 1338 1339 fn convert_poll_message( 1340 pseudo: Pseudo, 1341 fields: HeaderMap, 1342 stream_id: StreamId, 1343 ) -> Result<Self::Poll, RecvError> { 1344 use http::{uri, Version}; 1345 1346 let mut b = Request::builder(); 1347 1348 macro_rules! malformed { 1349 ($($arg:tt)*) => {{ 1350 log::debug!($($arg)*); 1351 return Err(RecvError::Stream { 1352 id: stream_id, 1353 reason: Reason::PROTOCOL_ERROR, 1354 }); 1355 }} 1356 }; 1357 1358 b = b.version(Version::HTTP_2); 1359 1360 if let Some(method) = pseudo.method { 1361 b = b.method(method); 1362 } else { 1363 malformed!("malformed headers: missing method"); 1364 } 1365 1366 // Specifying :status for a request is a protocol error 1367 if pseudo.status.is_some() { 1368 log::trace!("malformed headers: :status field on request; PROTOCOL_ERROR"); 1369 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 1370 } 1371 1372 // Convert the URI 1373 let mut parts = uri::Parts::default(); 1374 1375 // A request translated from HTTP/1 must not include the :authority 1376 // header 1377 if let Some(authority) = pseudo.authority { 1378 let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner()); 1379 parts.authority = Some(maybe_authority.or_else(|why| { 1380 malformed!( 1381 "malformed headers: malformed authority ({:?}): {}", 1382 authority, 1383 why, 1384 ) 1385 })?); 1386 } 1387 1388 // A :scheme is always required. 1389 if let Some(scheme) = pseudo.scheme { 1390 let maybe_scheme = scheme.parse(); 1391 let scheme = maybe_scheme.or_else(|why| { 1392 malformed!( 1393 "malformed headers: malformed scheme ({:?}): {}", 1394 scheme, 1395 why, 1396 ) 1397 })?; 1398 1399 // It's not possible to build an `Uri` from a scheme and path. So, 1400 // after validating is was a valid scheme, we just have to drop it 1401 // if there isn't an :authority. 1402 if parts.authority.is_some() { 1403 parts.scheme = Some(scheme); 1404 } 1405 } else { 1406 malformed!("malformed headers: missing scheme"); 1407 } 1408 1409 if let Some(path) = pseudo.path { 1410 // This cannot be empty 1411 if path.is_empty() { 1412 malformed!("malformed headers: missing path"); 1413 } 1414 1415 let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner()); 1416 parts.path_and_query = Some(maybe_path.or_else(|why| { 1417 malformed!("malformed headers: malformed path ({:?}): {}", path, why,) 1418 })?); 1419 } 1420 1421 b = b.uri(parts); 1422 1423 let mut request = match b.body(()) { 1424 Ok(request) => request, 1425 Err(e) => { 1426 // TODO: Should there be more specialized handling for different 1427 // kinds of errors 1428 proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id); 1429 return Err(RecvError::Stream { 1430 id: stream_id, 1431 reason: Reason::PROTOCOL_ERROR, 1432 }); 1433 } 1434 }; 1435 1436 *request.headers_mut() = fields; 1437 1438 Ok(request) 1439 } 1440 } 1441 1442 // ===== impl Handshaking ===== 1443 1444 impl<T, B> fmt::Debug for Handshaking<T, B> 1445 where 1446 B: Buf, 1447 { 1448 #[inline] 1449 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { 1450 match *self { 1451 Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"), 1452 Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"), 1453 Handshaking::Empty => write!(f, "Handshaking::Empty"), 1454 } 1455 } 1456 } 1457 1458 impl<T, B> convert::From<Flush<T, Prioritized<B>>> for Handshaking<T, B> 1459 where 1460 T: AsyncRead + AsyncWrite, 1461 B: Buf, 1462 { 1463 #[inline] 1464 fn from(flush: Flush<T, Prioritized<B>>) -> Self { 1465 Handshaking::Flushing(flush) 1466 } 1467 } 1468 1469 impl<T, B> convert::From<ReadPreface<T, Prioritized<B>>> for Handshaking<T, B> 1470 where 1471 T: AsyncRead + AsyncWrite, 1472 B: Buf, 1473 { 1474 #[inline] 1475 fn from(read: ReadPreface<T, Prioritized<B>>) -> Self { 1476 Handshaking::ReadingPreface(read) 1477 } 1478 } 1479 1480 impl<T, B> convert::From<Codec<T, Prioritized<B>>> for Handshaking<T, B> 1481 where 1482 T: AsyncRead + AsyncWrite, 1483 B: Buf, 1484 { 1485 #[inline] 1486 fn from(codec: Codec<T, Prioritized<B>>) -> Self { 1487 Handshaking::from(Flush::new(codec)) 1488 } 1489 } 1490