1 use super::*; 2 use crate::codec::UserError; 3 use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; 4 use crate::proto::{self, Error}; 5 use std::task::Context; 6 7 use http::{HeaderMap, Request, Response}; 8 9 use std::io; 10 use std::task::{Poll, Waker}; 11 use std::time::{Duration, Instant}; 12 13 #[derive(Debug)] 14 pub(super) struct Recv { 15 /// Initial window size of remote initiated streams 16 init_window_sz: WindowSize, 17 18 /// Connection level flow control governing received data 19 flow: FlowControl, 20 21 /// Amount of connection window capacity currently used by outstanding streams. 22 in_flight_data: WindowSize, 23 24 /// The lowest stream ID that is still idle 25 next_stream_id: Result<StreamId, StreamIdOverflow>, 26 27 /// The stream ID of the last processed stream 28 last_processed_id: StreamId, 29 30 /// Any streams with a higher ID are ignored. 31 /// 32 /// This starts as MAX, but is lowered when a GOAWAY is received. 33 /// 34 /// > After sending a GOAWAY frame, the sender can discard frames for 35 /// > streams initiated by the receiver with identifiers higher than 36 /// > the identified last stream. 37 max_stream_id: StreamId, 38 39 /// Streams that have pending window updates 40 pending_window_updates: store::Queue<stream::NextWindowUpdate>, 41 42 /// New streams to be accepted 43 pending_accept: store::Queue<stream::NextAccept>, 44 45 /// Locally reset streams that should be reaped when they expire 46 pending_reset_expired: store::Queue<stream::NextResetExpire>, 47 48 /// How long locally reset streams should ignore received frames 49 reset_duration: Duration, 50 51 /// Holds frames that are waiting to be read 52 buffer: Buffer<Event>, 53 54 /// Refused StreamId, this represents a frame that must be sent out. 55 refused: Option<StreamId>, 56 57 /// If push promises are allowed to be received. 58 is_push_enabled: bool, 59 } 60 61 #[derive(Debug)] 62 pub(super) enum Event { 63 Headers(peer::PollMessage), 64 Data(Bytes), 65 Trailers(HeaderMap), 66 } 67 68 #[derive(Debug)] 69 pub(super) enum RecvHeaderBlockError<T> { 70 Oversize(T), 71 State(Error), 72 } 73 74 #[derive(Debug)] 75 pub(crate) enum Open { 76 PushPromise, 77 Headers, 78 } 79 80 impl Recv { new(peer: peer::Dyn, config: &Config) -> Self81 pub fn new(peer: peer::Dyn, config: &Config) -> Self { 82 let next_stream_id = if peer.is_server() { 1 } else { 2 }; 83 84 let mut flow = FlowControl::new(); 85 86 // connections always have the default window size, regardless of 87 // settings 88 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) 89 .expect("invalid initial remote window size"); 90 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); 91 92 Recv { 93 init_window_sz: config.local_init_window_sz, 94 flow, 95 in_flight_data: 0 as WindowSize, 96 next_stream_id: Ok(next_stream_id.into()), 97 pending_window_updates: store::Queue::new(), 98 last_processed_id: StreamId::ZERO, 99 max_stream_id: StreamId::MAX, 100 pending_accept: store::Queue::new(), 101 pending_reset_expired: store::Queue::new(), 102 reset_duration: config.local_reset_duration, 103 buffer: Buffer::new(), 104 refused: None, 105 is_push_enabled: config.local_push_enabled, 106 } 107 } 108 109 /// Returns the initial receive window size init_window_sz(&self) -> WindowSize110 pub fn init_window_sz(&self) -> WindowSize { 111 self.init_window_sz 112 } 113 114 /// Returns the ID of the last processed stream last_processed_id(&self) -> StreamId115 pub fn last_processed_id(&self) -> StreamId { 116 self.last_processed_id 117 } 118 119 /// Update state reflecting a new, remotely opened stream 120 /// 121 /// Returns the stream state if successful. `None` if refused open( &mut self, id: StreamId, mode: Open, counts: &mut Counts, ) -> Result<Option<StreamId>, Error>122 pub fn open( 123 &mut self, 124 id: StreamId, 125 mode: Open, 126 counts: &mut Counts, 127 ) -> Result<Option<StreamId>, Error> { 128 assert!(self.refused.is_none()); 129 130 counts.peer().ensure_can_open(id, mode)?; 131 132 let next_id = self.next_stream_id()?; 133 if id < next_id { 134 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); 135 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 136 } 137 138 self.next_stream_id = id.next_id(); 139 140 if !counts.can_inc_num_recv_streams() { 141 self.refused = Some(id); 142 return Ok(None); 143 } 144 145 Ok(Some(id)) 146 } 147 148 /// Transition the stream state based on receiving headers 149 /// 150 /// The caller ensures that the frame represents headers and not trailers. recv_headers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, counts: &mut Counts, ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>>151 pub fn recv_headers( 152 &mut self, 153 frame: frame::Headers, 154 stream: &mut store::Ptr, 155 counts: &mut Counts, 156 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> { 157 tracing::trace!("opening stream; init_window={}", self.init_window_sz); 158 let is_initial = stream.state.recv_open(&frame)?; 159 160 if is_initial { 161 // TODO: be smarter about this logic 162 if frame.stream_id() > self.last_processed_id { 163 self.last_processed_id = frame.stream_id(); 164 } 165 166 // Increment the number of concurrent streams 167 counts.inc_num_recv_streams(stream); 168 } 169 170 if !stream.content_length.is_head() { 171 use super::stream::ContentLength; 172 use http::header; 173 174 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { 175 let content_length = match frame::parse_u64(content_length.as_bytes()) { 176 Ok(v) => v, 177 Err(()) => { 178 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); 179 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); 180 } 181 }; 182 183 stream.content_length = ContentLength::Remaining(content_length); 184 } 185 } 186 187 if frame.is_over_size() { 188 // A frame is over size if the decoded header block was bigger than 189 // SETTINGS_MAX_HEADER_LIST_SIZE. 190 // 191 // > A server that receives a larger header block than it is willing 192 // > to handle can send an HTTP 431 (Request Header Fields Too 193 // > Large) status code [RFC6585]. A client can discard responses 194 // > that it cannot process. 195 // 196 // So, if peer is a server, we'll send a 431. In either case, 197 // an error is recorded, which will send a REFUSED_STREAM, 198 // since we don't want any of the data frames either. 199 tracing::debug!( 200 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \ 201 recv_headers: frame is over size; stream={:?}", 202 stream.id 203 ); 204 return if counts.peer().is_server() && is_initial { 205 let mut res = frame::Headers::new( 206 stream.id, 207 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE), 208 HeaderMap::new(), 209 ); 210 res.set_end_stream(); 211 Err(RecvHeaderBlockError::Oversize(Some(res))) 212 } else { 213 Err(RecvHeaderBlockError::Oversize(None)) 214 }; 215 } 216 217 let stream_id = frame.stream_id(); 218 let (pseudo, fields) = frame.into_parts(); 219 if !pseudo.is_informational() { 220 let message = counts 221 .peer() 222 .convert_poll_message(pseudo, fields, stream_id)?; 223 224 // Push the frame onto the stream's recv buffer 225 stream 226 .pending_recv 227 .push_back(&mut self.buffer, Event::Headers(message)); 228 stream.notify_recv(); 229 } 230 231 // Only servers can receive a headers frame that initiates the stream. 232 // This is verified in `Streams` before calling this function. 233 if counts.peer().is_server() { 234 self.pending_accept.push(stream); 235 } 236 237 Ok(()) 238 } 239 240 /// Called by the server to get the request 241 /// 242 /// TODO: Should this fn return `Result`? take_request(&mut self, stream: &mut store::Ptr) -> Request<()>243 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { 244 use super::peer::PollMessage::*; 245 246 match stream.pending_recv.pop_front(&mut self.buffer) { 247 Some(Event::Headers(Server(request))) => request, 248 _ => panic!(), 249 } 250 } 251 252 /// Called by the client to get pushed response poll_pushed( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>>253 pub fn poll_pushed( 254 &mut self, 255 cx: &Context, 256 stream: &mut store::Ptr, 257 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> { 258 use super::peer::PollMessage::*; 259 260 let mut ppp = stream.pending_push_promises.take(); 261 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| { 262 match pushed.pending_recv.pop_front(&mut self.buffer) { 263 Some(Event::Headers(Server(headers))) => (headers, pushed.key()), 264 // When frames are pushed into the queue, it is verified that 265 // the first frame is a HEADERS frame. 266 _ => panic!("Headers not set on pushed stream"), 267 } 268 }); 269 stream.pending_push_promises = ppp; 270 if let Some(p) = pushed { 271 Poll::Ready(Some(Ok(p))) 272 } else { 273 let is_open = stream.state.ensure_recv_open()?; 274 275 if is_open { 276 stream.recv_task = Some(cx.waker().clone()); 277 Poll::Pending 278 } else { 279 Poll::Ready(None) 280 } 281 } 282 } 283 284 /// Called by the client to get the response poll_response( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Result<Response<()>, proto::Error>>285 pub fn poll_response( 286 &mut self, 287 cx: &Context, 288 stream: &mut store::Ptr, 289 ) -> Poll<Result<Response<()>, proto::Error>> { 290 use super::peer::PollMessage::*; 291 292 // If the buffer is not empty, then the first frame must be a HEADERS 293 // frame or the user violated the contract. 294 match stream.pending_recv.pop_front(&mut self.buffer) { 295 Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), 296 Some(_) => panic!("poll_response called after response returned"), 297 None => { 298 stream.state.ensure_recv_open()?; 299 300 stream.recv_task = Some(cx.waker().clone()); 301 Poll::Pending 302 } 303 } 304 } 305 306 /// Transition the stream based on receiving trailers recv_trailers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, ) -> Result<(), Error>307 pub fn recv_trailers( 308 &mut self, 309 frame: frame::Headers, 310 stream: &mut store::Ptr, 311 ) -> Result<(), Error> { 312 // Transition the state 313 stream.state.recv_close()?; 314 315 if stream.ensure_content_length_zero().is_err() { 316 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); 317 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); 318 } 319 320 let trailers = frame.into_fields(); 321 322 // Push the frame onto the stream's recv buffer 323 stream 324 .pending_recv 325 .push_back(&mut self.buffer, Event::Trailers(trailers)); 326 stream.notify_recv(); 327 328 Ok(()) 329 } 330 331 /// Releases capacity of the connection release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>)332 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) { 333 tracing::trace!( 334 "release_connection_capacity; size={}, connection in_flight_data={}", 335 capacity, 336 self.in_flight_data, 337 ); 338 339 // Decrement in-flight data 340 self.in_flight_data -= capacity; 341 342 // Assign capacity to connection 343 self.flow.assign_capacity(capacity); 344 345 if self.flow.unclaimed_capacity().is_some() { 346 if let Some(task) = task.take() { 347 task.wake(); 348 } 349 } 350 } 351 352 /// Releases capacity back to the connection & stream release_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, task: &mut Option<Waker>, ) -> Result<(), UserError>353 pub fn release_capacity( 354 &mut self, 355 capacity: WindowSize, 356 stream: &mut store::Ptr, 357 task: &mut Option<Waker>, 358 ) -> Result<(), UserError> { 359 tracing::trace!("release_capacity; size={}", capacity); 360 361 if capacity > stream.in_flight_recv_data { 362 return Err(UserError::ReleaseCapacityTooBig); 363 } 364 365 self.release_connection_capacity(capacity, task); 366 367 // Decrement in-flight data 368 stream.in_flight_recv_data -= capacity; 369 370 // Assign capacity to stream 371 stream.recv_flow.assign_capacity(capacity); 372 373 if stream.recv_flow.unclaimed_capacity().is_some() { 374 // Queue the stream for sending the WINDOW_UPDATE frame. 375 self.pending_window_updates.push(stream); 376 377 if let Some(task) = task.take() { 378 task.wake(); 379 } 380 } 381 382 Ok(()) 383 } 384 385 /// Release any unclaimed capacity for a closed stream. release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)386 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 387 debug_assert_eq!(stream.ref_count, 0); 388 389 if stream.in_flight_recv_data == 0 { 390 return; 391 } 392 393 tracing::trace!( 394 "auto-release closed stream ({:?}) capacity: {:?}", 395 stream.id, 396 stream.in_flight_recv_data, 397 ); 398 399 self.release_connection_capacity(stream.in_flight_recv_data, task); 400 stream.in_flight_recv_data = 0; 401 402 self.clear_recv_buffer(stream); 403 } 404 405 /// Set the "target" connection window size. 406 /// 407 /// By default, all new connections start with 64kb of window size. As 408 /// streams used and release capacity, we will send WINDOW_UPDATEs for the 409 /// connection to bring it back up to the initial "target". 410 /// 411 /// Setting a target means that we will try to tell the peer about 412 /// WINDOW_UPDATEs so the peer knows it has about `target` window to use 413 /// for the whole connection. 414 /// 415 /// The `task` is an optional parked task for the `Connection` that might 416 /// be blocked on needing more window capacity. set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>)417 pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) { 418 tracing::trace!( 419 "set_target_connection_window; target={}; available={}, reserved={}", 420 target, 421 self.flow.available(), 422 self.in_flight_data, 423 ); 424 425 // The current target connection window is our `available` plus any 426 // in-flight data reserved by streams. 427 // 428 // Update the flow controller with the difference between the new 429 // target and the current target. 430 let current = (self.flow.available() + self.in_flight_data).checked_size(); 431 if target > current { 432 self.flow.assign_capacity(target - current); 433 } else { 434 self.flow.claim_capacity(current - target); 435 } 436 437 // If changing the target capacity means we gained a bunch of capacity, 438 // enough that we went over the update threshold, then schedule sending 439 // a connection WINDOW_UPDATE. 440 if self.flow.unclaimed_capacity().is_some() { 441 if let Some(task) = task.take() { 442 task.wake(); 443 } 444 } 445 } 446 apply_local_settings( &mut self, settings: &frame::Settings, store: &mut Store, ) -> Result<(), proto::Error>447 pub(crate) fn apply_local_settings( 448 &mut self, 449 settings: &frame::Settings, 450 store: &mut Store, 451 ) -> Result<(), proto::Error> { 452 let target = if let Some(val) = settings.initial_window_size() { 453 val 454 } else { 455 return Ok(()); 456 }; 457 458 let old_sz = self.init_window_sz; 459 self.init_window_sz = target; 460 461 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); 462 463 // Per RFC 7540 §6.9.2: 464 // 465 // In addition to changing the flow-control window for streams that are 466 // not yet active, a SETTINGS frame can alter the initial flow-control 467 // window size for streams with active flow-control windows (that is, 468 // streams in the "open" or "half-closed (remote)" state). When the 469 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust 470 // the size of all stream flow-control windows that it maintains by the 471 // difference between the new value and the old value. 472 // 473 // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available 474 // space in a flow-control window to become negative. A sender MUST 475 // track the negative flow-control window and MUST NOT send new 476 // flow-controlled frames until it receives WINDOW_UPDATE frames that 477 // cause the flow-control window to become positive. 478 479 if target < old_sz { 480 // We must decrease the (local) window on every open stream. 481 let dec = old_sz - target; 482 tracing::trace!("decrementing all windows; dec={}", dec); 483 484 store.for_each(|mut stream| { 485 stream.recv_flow.dec_recv_window(dec); 486 Ok(()) 487 }) 488 } else if target > old_sz { 489 // We must increase the (local) window on every open stream. 490 let inc = target - old_sz; 491 tracing::trace!("incrementing all windows; inc={}", inc); 492 store.for_each(|mut stream| { 493 // XXX: Shouldn't the peer have already noticed our 494 // overflow and sent us a GOAWAY? 495 stream 496 .recv_flow 497 .inc_window(inc) 498 .map_err(proto::Error::library_go_away)?; 499 stream.recv_flow.assign_capacity(inc); 500 Ok(()) 501 }) 502 } else { 503 // size is the same... so do nothing 504 Ok(()) 505 } 506 } 507 is_end_stream(&self, stream: &store::Ptr) -> bool508 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { 509 if !stream.state.is_recv_closed() { 510 return false; 511 } 512 513 stream.pending_recv.is_empty() 514 } 515 recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error>516 pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> { 517 let sz = frame.payload().len(); 518 519 // This should have been enforced at the codec::FramedRead layer, so 520 // this is just a sanity check. 521 assert!(sz <= MAX_WINDOW_SIZE as usize); 522 523 let sz = sz as WindowSize; 524 525 let is_ignoring_frame = stream.state.is_local_reset(); 526 527 if !is_ignoring_frame && !stream.state.is_recv_streaming() { 528 // TODO: There are cases where this can be a stream error of 529 // STREAM_CLOSED instead... 530 531 // Receiving a DATA frame when not expecting one is a protocol 532 // error. 533 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); 534 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 535 } 536 537 tracing::trace!( 538 "recv_data; size={}; connection={}; stream={}", 539 sz, 540 self.flow.window_size(), 541 stream.recv_flow.window_size() 542 ); 543 544 if is_ignoring_frame { 545 tracing::trace!( 546 "recv_data; frame ignored on locally reset {:?} for some time", 547 stream.id, 548 ); 549 return Ok(self.ignore_data(sz)?); 550 } 551 552 // Ensure that there is enough capacity on the connection before acting 553 // on the stream. 554 self.consume_connection_window(sz)?; 555 556 if stream.recv_flow.window_size() < sz { 557 // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE 558 // > A receiver MAY respond with a stream error (Section 5.4.2) or 559 // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if 560 // > it is unable to accept a frame. 561 // 562 // So, for violating the **stream** window, we can send either a 563 // stream or connection error. We've opted to send a stream 564 // error. 565 return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR)); 566 } 567 568 if stream.dec_content_length(frame.payload().len()).is_err() { 569 proto_err!(stream: 570 "recv_data: content-length overflow; stream={:?}; len={:?}", 571 stream.id, 572 frame.payload().len(), 573 ); 574 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); 575 } 576 577 if frame.is_end_stream() { 578 if stream.ensure_content_length_zero().is_err() { 579 proto_err!(stream: 580 "recv_data: content-length underflow; stream={:?}; len={:?}", 581 stream.id, 582 frame.payload().len(), 583 ); 584 return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); 585 } 586 587 if stream.state.recv_close().is_err() { 588 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); 589 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); 590 } 591 } 592 593 // Update stream level flow control 594 stream.recv_flow.send_data(sz); 595 596 // Track the data as in-flight 597 stream.in_flight_recv_data += sz; 598 599 let event = Event::Data(frame.into_payload()); 600 601 // Push the frame onto the recv buffer 602 stream.pending_recv.push_back(&mut self.buffer, event); 603 stream.notify_recv(); 604 605 Ok(()) 606 } 607 ignore_data(&mut self, sz: WindowSize) -> Result<(), Error>608 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> { 609 // Ensure that there is enough capacity on the connection... 610 self.consume_connection_window(sz)?; 611 612 // Since we are ignoring this frame, 613 // we aren't returning the frame to the user. That means they 614 // have no way to release the capacity back to the connection. So 615 // we have to release it automatically. 616 // 617 // This call doesn't send a WINDOW_UPDATE immediately, just marks 618 // the capacity as available to be reclaimed. When the available 619 // capacity meets a threshold, a WINDOW_UPDATE is then sent. 620 self.release_connection_capacity(sz, &mut None); 621 Ok(()) 622 } 623 consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error>624 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> { 625 if self.flow.window_size() < sz { 626 tracing::debug!( 627 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", 628 self.flow.window_size(), 629 sz, 630 ); 631 return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR)); 632 } 633 634 // Update connection level flow control 635 self.flow.send_data(sz); 636 637 // Track the data as in-flight 638 self.in_flight_data += sz; 639 Ok(()) 640 } 641 recv_push_promise( &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, ) -> Result<(), Error>642 pub fn recv_push_promise( 643 &mut self, 644 frame: frame::PushPromise, 645 stream: &mut store::Ptr, 646 ) -> Result<(), Error> { 647 stream.state.reserve_remote()?; 648 if frame.is_over_size() { 649 // A frame is over size if the decoded header block was bigger than 650 // SETTINGS_MAX_HEADER_LIST_SIZE. 651 // 652 // > A server that receives a larger header block than it is willing 653 // > to handle can send an HTTP 431 (Request Header Fields Too 654 // > Large) status code [RFC6585]. A client can discard responses 655 // > that it cannot process. 656 // 657 // So, if peer is a server, we'll send a 431. In either case, 658 // an error is recorded, which will send a REFUSED_STREAM, 659 // since we don't want any of the data frames either. 660 tracing::debug!( 661 "stream error REFUSED_STREAM -- recv_push_promise: \ 662 headers frame is over size; promised_id={:?};", 663 frame.promised_id(), 664 ); 665 return Err(Error::library_reset( 666 frame.promised_id(), 667 Reason::REFUSED_STREAM, 668 )); 669 } 670 671 let promised_id = frame.promised_id(); 672 let (pseudo, fields) = frame.into_parts(); 673 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?; 674 675 if let Err(e) = frame::PushPromise::validate_request(&req) { 676 use PushPromiseHeaderError::*; 677 match e { 678 NotSafeAndCacheable => proto_err!( 679 stream: 680 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}", 681 req.method(), 682 promised_id, 683 ), 684 InvalidContentLength(e) => proto_err!( 685 stream: 686 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}", 687 e, 688 promised_id, 689 ), 690 } 691 return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR)); 692 } 693 694 use super::peer::PollMessage::*; 695 stream 696 .pending_recv 697 .push_back(&mut self.buffer, Event::Headers(Server(req))); 698 stream.notify_recv(); 699 Ok(()) 700 } 701 702 /// Ensures that `id` is not in the `Idle` state. ensure_not_idle(&self, id: StreamId) -> Result<(), Reason>703 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { 704 if let Ok(next) = self.next_stream_id { 705 if id >= next { 706 tracing::debug!( 707 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", 708 id 709 ); 710 return Err(Reason::PROTOCOL_ERROR); 711 } 712 } 713 // if next_stream_id is overflowed, that's ok. 714 715 Ok(()) 716 } 717 718 /// Handle remote sending an explicit RST_STREAM. recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream)719 pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { 720 // Notify the stream 721 stream.state.recv_reset(frame, stream.is_pending_send); 722 723 stream.notify_send(); 724 stream.notify_recv(); 725 } 726 727 /// Handle a connection-level error handle_error(&mut self, err: &proto::Error, stream: &mut Stream)728 pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) { 729 // Receive an error 730 stream.state.handle_error(err); 731 732 // If a receiver is waiting, notify it 733 stream.notify_send(); 734 stream.notify_recv(); 735 } 736 go_away(&mut self, last_processed_id: StreamId)737 pub fn go_away(&mut self, last_processed_id: StreamId) { 738 assert!(self.max_stream_id >= last_processed_id); 739 self.max_stream_id = last_processed_id; 740 } 741 recv_eof(&mut self, stream: &mut Stream)742 pub fn recv_eof(&mut self, stream: &mut Stream) { 743 stream.state.recv_eof(); 744 stream.notify_send(); 745 stream.notify_recv(); 746 } 747 clear_recv_buffer(&mut self, stream: &mut Stream)748 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { 749 while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) { 750 // drop it 751 } 752 } 753 754 /// Get the max ID of streams we can receive. 755 /// 756 /// This gets lowered if we send a GOAWAY frame. max_stream_id(&self) -> StreamId757 pub fn max_stream_id(&self) -> StreamId { 758 self.max_stream_id 759 } 760 next_stream_id(&self) -> Result<StreamId, Error>761 pub fn next_stream_id(&self) -> Result<StreamId, Error> { 762 if let Ok(id) = self.next_stream_id { 763 Ok(id) 764 } else { 765 Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) 766 } 767 } 768 may_have_created_stream(&self, id: StreamId) -> bool769 pub fn may_have_created_stream(&self, id: StreamId) -> bool { 770 if let Ok(next_id) = self.next_stream_id { 771 // Peer::is_local_init should have been called beforehand 772 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); 773 id < next_id 774 } else { 775 true 776 } 777 } 778 779 /// Returns true if the remote peer can reserve a stream with the given ID. ensure_can_reserve(&self) -> Result<(), Error>780 pub fn ensure_can_reserve(&self) -> Result<(), Error> { 781 if !self.is_push_enabled { 782 proto_err!(conn: "recv_push_promise: push is disabled"); 783 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); 784 } 785 786 Ok(()) 787 } 788 789 /// Add a locally reset stream to queue to be eventually reaped. enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts)790 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 791 if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() { 792 return; 793 } 794 795 tracing::trace!("enqueue_reset_expiration; {:?}", stream.id); 796 797 if !counts.can_inc_num_reset_streams() { 798 // try to evict 1 stream if possible 799 // if max allow is 0, this won't be able to evict, 800 // and then we'll just bail after 801 if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { 802 counts.transition_after(evicted, true); 803 } 804 } 805 806 if counts.can_inc_num_reset_streams() { 807 counts.inc_num_reset_streams(); 808 self.pending_reset_expired.push(stream); 809 } 810 } 811 812 /// Send any pending refusals. send_pending_refusal<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,813 pub fn send_pending_refusal<T, B>( 814 &mut self, 815 cx: &mut Context, 816 dst: &mut Codec<T, Prioritized<B>>, 817 ) -> Poll<io::Result<()>> 818 where 819 T: AsyncWrite + Unpin, 820 B: Buf, 821 { 822 if let Some(stream_id) = self.refused { 823 ready!(dst.poll_ready(cx))?; 824 825 // Create the RST_STREAM frame 826 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); 827 828 // Buffer the frame 829 dst.buffer(frame.into()).expect("invalid RST_STREAM frame"); 830 } 831 832 self.refused = None; 833 834 Poll::Ready(Ok(())) 835 } 836 clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)837 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 838 if !self.pending_reset_expired.is_empty() { 839 let now = Instant::now(); 840 let reset_duration = self.reset_duration; 841 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { 842 let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); 843 now - reset_at > reset_duration 844 }) { 845 counts.transition_after(stream, true); 846 } 847 } 848 } 849 clear_queues( &mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts, )850 pub fn clear_queues( 851 &mut self, 852 clear_pending_accept: bool, 853 store: &mut Store, 854 counts: &mut Counts, 855 ) { 856 self.clear_stream_window_update_queue(store, counts); 857 self.clear_all_reset_streams(store, counts); 858 859 if clear_pending_accept { 860 self.clear_all_pending_accept(store, counts); 861 } 862 } 863 clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts)864 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) { 865 while let Some(stream) = self.pending_window_updates.pop(store) { 866 counts.transition(stream, |_, stream| { 867 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id); 868 }) 869 } 870 } 871 872 /// Called on EOF clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)873 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 874 while let Some(stream) = self.pending_reset_expired.pop(store) { 875 counts.transition_after(stream, true); 876 } 877 } 878 clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts)879 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) { 880 while let Some(stream) = self.pending_accept.pop(store) { 881 counts.transition_after(stream, false); 882 } 883 } 884 poll_complete<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,885 pub fn poll_complete<T, B>( 886 &mut self, 887 cx: &mut Context, 888 store: &mut Store, 889 counts: &mut Counts, 890 dst: &mut Codec<T, Prioritized<B>>, 891 ) -> Poll<io::Result<()>> 892 where 893 T: AsyncWrite + Unpin, 894 B: Buf, 895 { 896 // Send any pending connection level window updates 897 ready!(self.send_connection_window_update(cx, dst))?; 898 899 // Send any pending stream level window updates 900 ready!(self.send_stream_window_updates(cx, store, counts, dst))?; 901 902 Poll::Ready(Ok(())) 903 } 904 905 /// Send connection level window update send_connection_window_update<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,906 fn send_connection_window_update<T, B>( 907 &mut self, 908 cx: &mut Context, 909 dst: &mut Codec<T, Prioritized<B>>, 910 ) -> Poll<io::Result<()>> 911 where 912 T: AsyncWrite + Unpin, 913 B: Buf, 914 { 915 if let Some(incr) = self.flow.unclaimed_capacity() { 916 let frame = frame::WindowUpdate::new(StreamId::zero(), incr); 917 918 // Ensure the codec has capacity 919 ready!(dst.poll_ready(cx))?; 920 921 // Buffer the WINDOW_UPDATE frame 922 dst.buffer(frame.into()) 923 .expect("invalid WINDOW_UPDATE frame"); 924 925 // Update flow control 926 self.flow 927 .inc_window(incr) 928 .expect("unexpected flow control state"); 929 } 930 931 Poll::Ready(Ok(())) 932 } 933 934 /// Send stream level window update send_stream_window_updates<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,935 pub fn send_stream_window_updates<T, B>( 936 &mut self, 937 cx: &mut Context, 938 store: &mut Store, 939 counts: &mut Counts, 940 dst: &mut Codec<T, Prioritized<B>>, 941 ) -> Poll<io::Result<()>> 942 where 943 T: AsyncWrite + Unpin, 944 B: Buf, 945 { 946 loop { 947 // Ensure the codec has capacity 948 ready!(dst.poll_ready(cx))?; 949 950 // Get the next stream 951 let stream = match self.pending_window_updates.pop(store) { 952 Some(stream) => stream, 953 None => return Poll::Ready(Ok(())), 954 }; 955 956 counts.transition(stream, |_, stream| { 957 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id); 958 debug_assert!(!stream.is_pending_window_update); 959 960 if !stream.state.is_recv_streaming() { 961 // No need to send window updates on the stream if the stream is 962 // no longer receiving data. 963 // 964 // TODO: is this correct? We could possibly send a window 965 // update on a ReservedRemote stream if we already know 966 // we want to stream the data faster... 967 return; 968 } 969 970 // TODO: de-dup 971 if let Some(incr) = stream.recv_flow.unclaimed_capacity() { 972 // Create the WINDOW_UPDATE frame 973 let frame = frame::WindowUpdate::new(stream.id, incr); 974 975 // Buffer it 976 dst.buffer(frame.into()) 977 .expect("invalid WINDOW_UPDATE frame"); 978 979 // Update flow control 980 stream 981 .recv_flow 982 .inc_window(incr) 983 .expect("unexpected flow control state"); 984 } 985 }) 986 } 987 } 988 next_incoming(&mut self, store: &mut Store) -> Option<store::Key>989 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> { 990 self.pending_accept.pop(store).map(|ptr| ptr.key()) 991 } 992 poll_data( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<Bytes, proto::Error>>>993 pub fn poll_data( 994 &mut self, 995 cx: &Context, 996 stream: &mut Stream, 997 ) -> Poll<Option<Result<Bytes, proto::Error>>> { 998 // TODO: Return error when the stream is reset 999 match stream.pending_recv.pop_front(&mut self.buffer) { 1000 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), 1001 Some(event) => { 1002 // Frame is trailer 1003 stream.pending_recv.push_front(&mut self.buffer, event); 1004 1005 // Notify the recv task. This is done just in case 1006 // `poll_trailers` was called. 1007 // 1008 // It is very likely that `notify_recv` will just be a no-op (as 1009 // the task will be None), so this isn't really much of a 1010 // performance concern. It also means we don't have to track 1011 // state to see if `poll_trailers` was called before `poll_data` 1012 // returned `None`. 1013 stream.notify_recv(); 1014 1015 // No more data frames 1016 Poll::Ready(None) 1017 } 1018 None => self.schedule_recv(cx, stream), 1019 } 1020 } 1021 poll_trailers( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<HeaderMap, proto::Error>>>1022 pub fn poll_trailers( 1023 &mut self, 1024 cx: &Context, 1025 stream: &mut Stream, 1026 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> { 1027 match stream.pending_recv.pop_front(&mut self.buffer) { 1028 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))), 1029 Some(event) => { 1030 // Frame is not trailers.. not ready to poll trailers yet. 1031 stream.pending_recv.push_front(&mut self.buffer, event); 1032 1033 Poll::Pending 1034 } 1035 None => self.schedule_recv(cx, stream), 1036 } 1037 } 1038 schedule_recv<T>( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<T, proto::Error>>>1039 fn schedule_recv<T>( 1040 &mut self, 1041 cx: &Context, 1042 stream: &mut Stream, 1043 ) -> Poll<Option<Result<T, proto::Error>>> { 1044 if stream.state.ensure_recv_open()? { 1045 // Request to get notified once more frames arrive 1046 stream.recv_task = Some(cx.waker().clone()); 1047 Poll::Pending 1048 } else { 1049 // No more frames will be received 1050 Poll::Ready(None) 1051 } 1052 } 1053 } 1054 1055 // ===== impl Open ===== 1056 1057 impl Open { is_push_promise(&self) -> bool1058 pub fn is_push_promise(&self) -> bool { 1059 use self::Open::*; 1060 1061 match *self { 1062 PushPromise => true, 1063 _ => false, 1064 } 1065 } 1066 } 1067 1068 // ===== impl RecvHeaderBlockError ===== 1069 1070 impl<T> From<Error> for RecvHeaderBlockError<T> { from(err: Error) -> Self1071 fn from(err: Error) -> Self { 1072 RecvHeaderBlockError::State(err) 1073 } 1074 } 1075