1 use super::*; 2 use crate::codec::{RecvError, UserError}; 3 use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; 4 use crate::{frame, proto}; 5 use std::task::Context; 6 7 use http::{HeaderMap, Request, Response}; 8 9 use std::io; 10 use std::task::{Poll, Waker}; 11 use std::time::{Duration, Instant}; 12 13 #[derive(Debug)] 14 pub(super) struct Recv { 15 /// Initial window size of remote initiated streams 16 init_window_sz: WindowSize, 17 18 /// Connection level flow control governing received data 19 flow: FlowControl, 20 21 /// Amount of connection window capacity currently used by outstanding streams. 22 in_flight_data: WindowSize, 23 24 /// The lowest stream ID that is still idle 25 next_stream_id: Result<StreamId, StreamIdOverflow>, 26 27 /// The stream ID of the last processed stream 28 last_processed_id: StreamId, 29 30 /// Any streams with a higher ID are ignored. 31 /// 32 /// This starts as MAX, but is lowered when a GOAWAY is received. 33 /// 34 /// > After sending a GOAWAY frame, the sender can discard frames for 35 /// > streams initiated by the receiver with identifiers higher than 36 /// > the identified last stream. 37 max_stream_id: StreamId, 38 39 /// Streams that have pending window updates 40 pending_window_updates: store::Queue<stream::NextWindowUpdate>, 41 42 /// New streams to be accepted 43 pending_accept: store::Queue<stream::NextAccept>, 44 45 /// Locally reset streams that should be reaped when they expire 46 pending_reset_expired: store::Queue<stream::NextResetExpire>, 47 48 /// How long locally reset streams should ignore received frames 49 reset_duration: Duration, 50 51 /// Holds frames that are waiting to be read 52 buffer: Buffer<Event>, 53 54 /// Refused StreamId, this represents a frame that must be sent out. 55 refused: Option<StreamId>, 56 57 /// If push promises are allowed to be received. 58 is_push_enabled: bool, 59 } 60 61 #[derive(Debug)] 62 pub(super) enum Event { 63 Headers(peer::PollMessage), 64 Data(Bytes), 65 Trailers(HeaderMap), 66 } 67 68 #[derive(Debug)] 69 pub(super) enum RecvHeaderBlockError<T> { 70 Oversize(T), 71 State(RecvError), 72 } 73 74 #[derive(Debug)] 75 pub(crate) enum Open { 76 PushPromise, 77 Headers, 78 } 79 80 #[derive(Debug, Clone, Copy)] 81 struct Indices { 82 head: store::Key, 83 tail: store::Key, 84 } 85 86 impl Recv { new(peer: peer::Dyn, config: &Config) -> Self87 pub fn new(peer: peer::Dyn, config: &Config) -> Self { 88 let next_stream_id = if peer.is_server() { 1 } else { 2 }; 89 90 let mut flow = FlowControl::new(); 91 92 // connections always have the default window size, regardless of 93 // settings 94 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) 95 .expect("invalid initial remote window size"); 96 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); 97 98 Recv { 99 init_window_sz: config.local_init_window_sz, 100 flow, 101 in_flight_data: 0 as WindowSize, 102 next_stream_id: Ok(next_stream_id.into()), 103 pending_window_updates: store::Queue::new(), 104 last_processed_id: StreamId::ZERO, 105 max_stream_id: StreamId::MAX, 106 pending_accept: store::Queue::new(), 107 pending_reset_expired: store::Queue::new(), 108 reset_duration: config.local_reset_duration, 109 buffer: Buffer::new(), 110 refused: None, 111 is_push_enabled: config.local_push_enabled, 112 } 113 } 114 115 /// Returns the initial receive window size init_window_sz(&self) -> WindowSize116 pub fn init_window_sz(&self) -> WindowSize { 117 self.init_window_sz 118 } 119 120 /// Returns the ID of the last processed stream last_processed_id(&self) -> StreamId121 pub fn last_processed_id(&self) -> StreamId { 122 self.last_processed_id 123 } 124 125 /// Update state reflecting a new, remotely opened stream 126 /// 127 /// Returns the stream state if successful. `None` if refused open( &mut self, id: StreamId, mode: Open, counts: &mut Counts, ) -> Result<Option<StreamId>, RecvError>128 pub fn open( 129 &mut self, 130 id: StreamId, 131 mode: Open, 132 counts: &mut Counts, 133 ) -> Result<Option<StreamId>, RecvError> { 134 assert!(self.refused.is_none()); 135 136 counts.peer().ensure_can_open(id, mode)?; 137 138 let next_id = self.next_stream_id()?; 139 if id < next_id { 140 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); 141 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 142 } 143 144 self.next_stream_id = id.next_id(); 145 146 if !counts.can_inc_num_recv_streams() { 147 self.refused = Some(id); 148 return Ok(None); 149 } 150 151 Ok(Some(id)) 152 } 153 154 /// Transition the stream state based on receiving headers 155 /// 156 /// The caller ensures that the frame represents headers and not trailers. recv_headers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, counts: &mut Counts, ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>>157 pub fn recv_headers( 158 &mut self, 159 frame: frame::Headers, 160 stream: &mut store::Ptr, 161 counts: &mut Counts, 162 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> { 163 tracing::trace!("opening stream; init_window={}", self.init_window_sz); 164 let is_initial = stream.state.recv_open(&frame)?; 165 166 if is_initial { 167 // TODO: be smarter about this logic 168 if frame.stream_id() > self.last_processed_id { 169 self.last_processed_id = frame.stream_id(); 170 } 171 172 // Increment the number of concurrent streams 173 counts.inc_num_recv_streams(stream); 174 } 175 176 if !stream.content_length.is_head() { 177 use super::stream::ContentLength; 178 use http::header; 179 180 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { 181 let content_length = match frame::parse_u64(content_length.as_bytes()) { 182 Ok(v) => v, 183 Err(()) => { 184 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); 185 return Err(RecvError::Stream { 186 id: stream.id, 187 reason: Reason::PROTOCOL_ERROR, 188 } 189 .into()); 190 } 191 }; 192 193 stream.content_length = ContentLength::Remaining(content_length); 194 } 195 } 196 197 if frame.is_over_size() { 198 // A frame is over size if the decoded header block was bigger than 199 // SETTINGS_MAX_HEADER_LIST_SIZE. 200 // 201 // > A server that receives a larger header block than it is willing 202 // > to handle can send an HTTP 431 (Request Header Fields Too 203 // > Large) status code [RFC6585]. A client can discard responses 204 // > that it cannot process. 205 // 206 // So, if peer is a server, we'll send a 431. In either case, 207 // an error is recorded, which will send a REFUSED_STREAM, 208 // since we don't want any of the data frames either. 209 tracing::debug!( 210 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \ 211 recv_headers: frame is over size; stream={:?}", 212 stream.id 213 ); 214 return if counts.peer().is_server() && is_initial { 215 let mut res = frame::Headers::new( 216 stream.id, 217 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE), 218 HeaderMap::new(), 219 ); 220 res.set_end_stream(); 221 Err(RecvHeaderBlockError::Oversize(Some(res))) 222 } else { 223 Err(RecvHeaderBlockError::Oversize(None)) 224 }; 225 } 226 227 let stream_id = frame.stream_id(); 228 let (pseudo, fields) = frame.into_parts(); 229 if !pseudo.is_informational() { 230 let message = counts 231 .peer() 232 .convert_poll_message(pseudo, fields, stream_id)?; 233 234 // Push the frame onto the stream's recv buffer 235 stream 236 .pending_recv 237 .push_back(&mut self.buffer, Event::Headers(message)); 238 stream.notify_recv(); 239 } 240 241 // Only servers can receive a headers frame that initiates the stream. 242 // This is verified in `Streams` before calling this function. 243 if counts.peer().is_server() { 244 self.pending_accept.push(stream); 245 } 246 247 Ok(()) 248 } 249 250 /// Called by the server to get the request 251 /// 252 /// TODO: Should this fn return `Result`? take_request(&mut self, stream: &mut store::Ptr) -> Request<()>253 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { 254 use super::peer::PollMessage::*; 255 256 match stream.pending_recv.pop_front(&mut self.buffer) { 257 Some(Event::Headers(Server(request))) => request, 258 _ => panic!(), 259 } 260 } 261 262 /// Called by the client to get pushed response poll_pushed( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>>263 pub fn poll_pushed( 264 &mut self, 265 cx: &Context, 266 stream: &mut store::Ptr, 267 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> { 268 use super::peer::PollMessage::*; 269 270 let mut ppp = stream.pending_push_promises.take(); 271 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| { 272 match pushed.pending_recv.pop_front(&mut self.buffer) { 273 Some(Event::Headers(Server(headers))) => (headers, pushed.key()), 274 // When frames are pushed into the queue, it is verified that 275 // the first frame is a HEADERS frame. 276 _ => panic!("Headers not set on pushed stream"), 277 } 278 }); 279 stream.pending_push_promises = ppp; 280 if let Some(p) = pushed { 281 Poll::Ready(Some(Ok(p))) 282 } else { 283 let is_open = stream.state.ensure_recv_open()?; 284 285 if is_open { 286 stream.recv_task = Some(cx.waker().clone()); 287 Poll::Pending 288 } else { 289 Poll::Ready(None) 290 } 291 } 292 } 293 294 /// Called by the client to get the response poll_response( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Result<Response<()>, proto::Error>>295 pub fn poll_response( 296 &mut self, 297 cx: &Context, 298 stream: &mut store::Ptr, 299 ) -> Poll<Result<Response<()>, proto::Error>> { 300 use super::peer::PollMessage::*; 301 302 // If the buffer is not empty, then the first frame must be a HEADERS 303 // frame or the user violated the contract. 304 match stream.pending_recv.pop_front(&mut self.buffer) { 305 Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), 306 Some(_) => panic!("poll_response called after response returned"), 307 None => { 308 stream.state.ensure_recv_open()?; 309 310 stream.recv_task = Some(cx.waker().clone()); 311 Poll::Pending 312 } 313 } 314 } 315 316 /// Transition the stream based on receiving trailers recv_trailers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, ) -> Result<(), RecvError>317 pub fn recv_trailers( 318 &mut self, 319 frame: frame::Headers, 320 stream: &mut store::Ptr, 321 ) -> Result<(), RecvError> { 322 // Transition the state 323 stream.state.recv_close()?; 324 325 if stream.ensure_content_length_zero().is_err() { 326 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); 327 return Err(RecvError::Stream { 328 id: stream.id, 329 reason: Reason::PROTOCOL_ERROR, 330 }); 331 } 332 333 let trailers = frame.into_fields(); 334 335 // Push the frame onto the stream's recv buffer 336 stream 337 .pending_recv 338 .push_back(&mut self.buffer, Event::Trailers(trailers)); 339 stream.notify_recv(); 340 341 Ok(()) 342 } 343 344 /// Releases capacity of the connection release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>)345 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) { 346 tracing::trace!( 347 "release_connection_capacity; size={}, connection in_flight_data={}", 348 capacity, 349 self.in_flight_data, 350 ); 351 352 // Decrement in-flight data 353 self.in_flight_data -= capacity; 354 355 // Assign capacity to connection 356 self.flow.assign_capacity(capacity); 357 358 if self.flow.unclaimed_capacity().is_some() { 359 if let Some(task) = task.take() { 360 task.wake(); 361 } 362 } 363 } 364 365 /// Releases capacity back to the connection & stream release_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, task: &mut Option<Waker>, ) -> Result<(), UserError>366 pub fn release_capacity( 367 &mut self, 368 capacity: WindowSize, 369 stream: &mut store::Ptr, 370 task: &mut Option<Waker>, 371 ) -> Result<(), UserError> { 372 tracing::trace!("release_capacity; size={}", capacity); 373 374 if capacity > stream.in_flight_recv_data { 375 return Err(UserError::ReleaseCapacityTooBig); 376 } 377 378 self.release_connection_capacity(capacity, task); 379 380 // Decrement in-flight data 381 stream.in_flight_recv_data -= capacity; 382 383 // Assign capacity to stream 384 stream.recv_flow.assign_capacity(capacity); 385 386 if stream.recv_flow.unclaimed_capacity().is_some() { 387 // Queue the stream for sending the WINDOW_UPDATE frame. 388 self.pending_window_updates.push(stream); 389 390 if let Some(task) = task.take() { 391 task.wake(); 392 } 393 } 394 395 Ok(()) 396 } 397 398 /// Release any unclaimed capacity for a closed stream. release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)399 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 400 debug_assert_eq!(stream.ref_count, 0); 401 402 if stream.in_flight_recv_data == 0 { 403 return; 404 } 405 406 tracing::trace!( 407 "auto-release closed stream ({:?}) capacity: {:?}", 408 stream.id, 409 stream.in_flight_recv_data, 410 ); 411 412 self.release_connection_capacity(stream.in_flight_recv_data, task); 413 stream.in_flight_recv_data = 0; 414 415 self.clear_recv_buffer(stream); 416 } 417 418 /// Set the "target" connection window size. 419 /// 420 /// By default, all new connections start with 64kb of window size. As 421 /// streams used and release capacity, we will send WINDOW_UPDATEs for the 422 /// connection to bring it back up to the initial "target". 423 /// 424 /// Setting a target means that we will try to tell the peer about 425 /// WINDOW_UPDATEs so the peer knows it has about `target` window to use 426 /// for the whole connection. 427 /// 428 /// The `task` is an optional parked task for the `Connection` that might 429 /// be blocked on needing more window capacity. set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>)430 pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) { 431 tracing::trace!( 432 "set_target_connection_window; target={}; available={}, reserved={}", 433 target, 434 self.flow.available(), 435 self.in_flight_data, 436 ); 437 438 // The current target connection window is our `available` plus any 439 // in-flight data reserved by streams. 440 // 441 // Update the flow controller with the difference between the new 442 // target and the current target. 443 let current = (self.flow.available() + self.in_flight_data).checked_size(); 444 if target > current { 445 self.flow.assign_capacity(target - current); 446 } else { 447 self.flow.claim_capacity(current - target); 448 } 449 450 // If changing the target capacity means we gained a bunch of capacity, 451 // enough that we went over the update threshold, then schedule sending 452 // a connection WINDOW_UPDATE. 453 if self.flow.unclaimed_capacity().is_some() { 454 if let Some(task) = task.take() { 455 task.wake(); 456 } 457 } 458 } 459 apply_local_settings( &mut self, settings: &frame::Settings, store: &mut Store, ) -> Result<(), RecvError>460 pub(crate) fn apply_local_settings( 461 &mut self, 462 settings: &frame::Settings, 463 store: &mut Store, 464 ) -> Result<(), RecvError> { 465 let target = if let Some(val) = settings.initial_window_size() { 466 val 467 } else { 468 return Ok(()); 469 }; 470 471 let old_sz = self.init_window_sz; 472 self.init_window_sz = target; 473 474 tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); 475 476 // Per RFC 7540 §6.9.2: 477 // 478 // In addition to changing the flow-control window for streams that are 479 // not yet active, a SETTINGS frame can alter the initial flow-control 480 // window size for streams with active flow-control windows (that is, 481 // streams in the "open" or "half-closed (remote)" state). When the 482 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust 483 // the size of all stream flow-control windows that it maintains by the 484 // difference between the new value and the old value. 485 // 486 // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available 487 // space in a flow-control window to become negative. A sender MUST 488 // track the negative flow-control window and MUST NOT send new 489 // flow-controlled frames until it receives WINDOW_UPDATE frames that 490 // cause the flow-control window to become positive. 491 492 if target < old_sz { 493 // We must decrease the (local) window on every open stream. 494 let dec = old_sz - target; 495 tracing::trace!("decrementing all windows; dec={}", dec); 496 497 store.for_each(|mut stream| { 498 stream.recv_flow.dec_recv_window(dec); 499 Ok(()) 500 }) 501 } else if target > old_sz { 502 // We must increase the (local) window on every open stream. 503 let inc = target - old_sz; 504 tracing::trace!("incrementing all windows; inc={}", inc); 505 store.for_each(|mut stream| { 506 // XXX: Shouldn't the peer have already noticed our 507 // overflow and sent us a GOAWAY? 508 stream 509 .recv_flow 510 .inc_window(inc) 511 .map_err(RecvError::Connection)?; 512 stream.recv_flow.assign_capacity(inc); 513 Ok(()) 514 }) 515 } else { 516 // size is the same... so do nothing 517 Ok(()) 518 } 519 } 520 is_end_stream(&self, stream: &store::Ptr) -> bool521 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { 522 if !stream.state.is_recv_closed() { 523 return false; 524 } 525 526 stream.pending_recv.is_empty() 527 } 528 recv_data( &mut self, frame: frame::Data, stream: &mut store::Ptr, ) -> Result<(), RecvError>529 pub fn recv_data( 530 &mut self, 531 frame: frame::Data, 532 stream: &mut store::Ptr, 533 ) -> Result<(), RecvError> { 534 let sz = frame.payload().len(); 535 536 // This should have been enforced at the codec::FramedRead layer, so 537 // this is just a sanity check. 538 assert!(sz <= MAX_WINDOW_SIZE as usize); 539 540 let sz = sz as WindowSize; 541 542 let is_ignoring_frame = stream.state.is_local_reset(); 543 544 if !is_ignoring_frame && !stream.state.is_recv_streaming() { 545 // TODO: There are cases where this can be a stream error of 546 // STREAM_CLOSED instead... 547 548 // Receiving a DATA frame when not expecting one is a protocol 549 // error. 550 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); 551 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 552 } 553 554 tracing::trace!( 555 "recv_data; size={}; connection={}; stream={}", 556 sz, 557 self.flow.window_size(), 558 stream.recv_flow.window_size() 559 ); 560 561 if is_ignoring_frame { 562 tracing::trace!( 563 "recv_data; frame ignored on locally reset {:?} for some time", 564 stream.id, 565 ); 566 return self.ignore_data(sz); 567 } 568 569 // Ensure that there is enough capacity on the connection before acting 570 // on the stream. 571 self.consume_connection_window(sz)?; 572 573 if stream.recv_flow.window_size() < sz { 574 // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE 575 // > A receiver MAY respond with a stream error (Section 5.4.2) or 576 // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if 577 // > it is unable to accept a frame. 578 // 579 // So, for violating the **stream** window, we can send either a 580 // stream or connection error. We've opted to send a stream 581 // error. 582 return Err(RecvError::Stream { 583 id: stream.id, 584 reason: Reason::FLOW_CONTROL_ERROR, 585 }); 586 } 587 588 if stream.dec_content_length(frame.payload().len()).is_err() { 589 proto_err!(stream: 590 "recv_data: content-length overflow; stream={:?}; len={:?}", 591 stream.id, 592 frame.payload().len(), 593 ); 594 return Err(RecvError::Stream { 595 id: stream.id, 596 reason: Reason::PROTOCOL_ERROR, 597 }); 598 } 599 600 if frame.is_end_stream() { 601 if stream.ensure_content_length_zero().is_err() { 602 proto_err!(stream: 603 "recv_data: content-length underflow; stream={:?}; len={:?}", 604 stream.id, 605 frame.payload().len(), 606 ); 607 return Err(RecvError::Stream { 608 id: stream.id, 609 reason: Reason::PROTOCOL_ERROR, 610 }); 611 } 612 613 if stream.state.recv_close().is_err() { 614 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); 615 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 616 } 617 } 618 619 // Update stream level flow control 620 stream.recv_flow.send_data(sz); 621 622 // Track the data as in-flight 623 stream.in_flight_recv_data += sz; 624 625 let event = Event::Data(frame.into_payload()); 626 627 // Push the frame onto the recv buffer 628 stream.pending_recv.push_back(&mut self.buffer, event); 629 stream.notify_recv(); 630 631 Ok(()) 632 } 633 ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError>634 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> { 635 // Ensure that there is enough capacity on the connection... 636 self.consume_connection_window(sz)?; 637 638 // Since we are ignoring this frame, 639 // we aren't returning the frame to the user. That means they 640 // have no way to release the capacity back to the connection. So 641 // we have to release it automatically. 642 // 643 // This call doesn't send a WINDOW_UPDATE immediately, just marks 644 // the capacity as available to be reclaimed. When the available 645 // capacity meets a threshold, a WINDOW_UPDATE is then sent. 646 self.release_connection_capacity(sz, &mut None); 647 Ok(()) 648 } 649 consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError>650 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { 651 if self.flow.window_size() < sz { 652 tracing::debug!( 653 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", 654 self.flow.window_size(), 655 sz, 656 ); 657 return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); 658 } 659 660 // Update connection level flow control 661 self.flow.send_data(sz); 662 663 // Track the data as in-flight 664 self.in_flight_data += sz; 665 Ok(()) 666 } 667 recv_push_promise( &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, ) -> Result<(), RecvError>668 pub fn recv_push_promise( 669 &mut self, 670 frame: frame::PushPromise, 671 stream: &mut store::Ptr, 672 ) -> Result<(), RecvError> { 673 stream.state.reserve_remote()?; 674 if frame.is_over_size() { 675 // A frame is over size if the decoded header block was bigger than 676 // SETTINGS_MAX_HEADER_LIST_SIZE. 677 // 678 // > A server that receives a larger header block than it is willing 679 // > to handle can send an HTTP 431 (Request Header Fields Too 680 // > Large) status code [RFC6585]. A client can discard responses 681 // > that it cannot process. 682 // 683 // So, if peer is a server, we'll send a 431. In either case, 684 // an error is recorded, which will send a REFUSED_STREAM, 685 // since we don't want any of the data frames either. 686 tracing::debug!( 687 "stream error REFUSED_STREAM -- recv_push_promise: \ 688 headers frame is over size; promised_id={:?};", 689 frame.promised_id(), 690 ); 691 return Err(RecvError::Stream { 692 id: frame.promised_id(), 693 reason: Reason::REFUSED_STREAM, 694 }); 695 } 696 697 let promised_id = frame.promised_id(); 698 let (pseudo, fields) = frame.into_parts(); 699 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?; 700 701 if let Err(e) = frame::PushPromise::validate_request(&req) { 702 use PushPromiseHeaderError::*; 703 match e { 704 NotSafeAndCacheable => proto_err!( 705 stream: 706 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}", 707 req.method(), 708 promised_id, 709 ), 710 InvalidContentLength(e) => proto_err!( 711 stream: 712 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}", 713 e, 714 promised_id, 715 ), 716 } 717 return Err(RecvError::Stream { 718 id: promised_id, 719 reason: Reason::PROTOCOL_ERROR, 720 }); 721 } 722 723 use super::peer::PollMessage::*; 724 stream 725 .pending_recv 726 .push_back(&mut self.buffer, Event::Headers(Server(req))); 727 stream.notify_recv(); 728 Ok(()) 729 } 730 731 /// Ensures that `id` is not in the `Idle` state. ensure_not_idle(&self, id: StreamId) -> Result<(), Reason>732 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { 733 if let Ok(next) = self.next_stream_id { 734 if id >= next { 735 tracing::debug!( 736 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", 737 id 738 ); 739 return Err(Reason::PROTOCOL_ERROR); 740 } 741 } 742 // if next_stream_id is overflowed, that's ok. 743 744 Ok(()) 745 } 746 747 /// Handle remote sending an explicit RST_STREAM. recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream)748 pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { 749 // Notify the stream 750 stream 751 .state 752 .recv_reset(frame.reason(), stream.is_pending_send); 753 754 stream.notify_send(); 755 stream.notify_recv(); 756 } 757 758 /// Handle a received error recv_err(&mut self, err: &proto::Error, stream: &mut Stream)759 pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { 760 // Receive an error 761 stream.state.recv_err(err); 762 763 // If a receiver is waiting, notify it 764 stream.notify_send(); 765 stream.notify_recv(); 766 } 767 go_away(&mut self, last_processed_id: StreamId)768 pub fn go_away(&mut self, last_processed_id: StreamId) { 769 assert!(self.max_stream_id >= last_processed_id); 770 self.max_stream_id = last_processed_id; 771 } 772 recv_eof(&mut self, stream: &mut Stream)773 pub fn recv_eof(&mut self, stream: &mut Stream) { 774 stream.state.recv_eof(); 775 stream.notify_send(); 776 stream.notify_recv(); 777 } 778 clear_recv_buffer(&mut self, stream: &mut Stream)779 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { 780 while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) { 781 // drop it 782 } 783 } 784 785 /// Get the max ID of streams we can receive. 786 /// 787 /// This gets lowered if we send a GOAWAY frame. max_stream_id(&self) -> StreamId788 pub fn max_stream_id(&self) -> StreamId { 789 self.max_stream_id 790 } 791 next_stream_id(&self) -> Result<StreamId, RecvError>792 pub fn next_stream_id(&self) -> Result<StreamId, RecvError> { 793 if let Ok(id) = self.next_stream_id { 794 Ok(id) 795 } else { 796 Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) 797 } 798 } 799 may_have_created_stream(&self, id: StreamId) -> bool800 pub fn may_have_created_stream(&self, id: StreamId) -> bool { 801 if let Ok(next_id) = self.next_stream_id { 802 // Peer::is_local_init should have been called beforehand 803 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); 804 id < next_id 805 } else { 806 true 807 } 808 } 809 810 /// Returns true if the remote peer can reserve a stream with the given ID. ensure_can_reserve(&self) -> Result<(), RecvError>811 pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { 812 if !self.is_push_enabled { 813 proto_err!(conn: "recv_push_promise: push is disabled"); 814 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 815 } 816 817 Ok(()) 818 } 819 820 /// Add a locally reset stream to queue to be eventually reaped. enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts)821 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 822 if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() { 823 return; 824 } 825 826 tracing::trace!("enqueue_reset_expiration; {:?}", stream.id); 827 828 if !counts.can_inc_num_reset_streams() { 829 // try to evict 1 stream if possible 830 // if max allow is 0, this won't be able to evict, 831 // and then we'll just bail after 832 if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { 833 counts.transition_after(evicted, true); 834 } 835 } 836 837 if counts.can_inc_num_reset_streams() { 838 counts.inc_num_reset_streams(); 839 self.pending_reset_expired.push(stream); 840 } 841 } 842 843 /// Send any pending refusals. send_pending_refusal<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,844 pub fn send_pending_refusal<T, B>( 845 &mut self, 846 cx: &mut Context, 847 dst: &mut Codec<T, Prioritized<B>>, 848 ) -> Poll<io::Result<()>> 849 where 850 T: AsyncWrite + Unpin, 851 B: Buf, 852 { 853 if let Some(stream_id) = self.refused { 854 ready!(dst.poll_ready(cx))?; 855 856 // Create the RST_STREAM frame 857 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); 858 859 // Buffer the frame 860 dst.buffer(frame.into()).expect("invalid RST_STREAM frame"); 861 } 862 863 self.refused = None; 864 865 Poll::Ready(Ok(())) 866 } 867 clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)868 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 869 if !self.pending_reset_expired.is_empty() { 870 let now = Instant::now(); 871 let reset_duration = self.reset_duration; 872 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { 873 let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); 874 now - reset_at > reset_duration 875 }) { 876 counts.transition_after(stream, true); 877 } 878 } 879 } 880 clear_queues( &mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts, )881 pub fn clear_queues( 882 &mut self, 883 clear_pending_accept: bool, 884 store: &mut Store, 885 counts: &mut Counts, 886 ) { 887 self.clear_stream_window_update_queue(store, counts); 888 self.clear_all_reset_streams(store, counts); 889 890 if clear_pending_accept { 891 self.clear_all_pending_accept(store, counts); 892 } 893 } 894 clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts)895 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) { 896 while let Some(stream) = self.pending_window_updates.pop(store) { 897 counts.transition(stream, |_, stream| { 898 tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id); 899 }) 900 } 901 } 902 903 /// Called on EOF clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)904 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 905 while let Some(stream) = self.pending_reset_expired.pop(store) { 906 counts.transition_after(stream, true); 907 } 908 } 909 clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts)910 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) { 911 while let Some(stream) = self.pending_accept.pop(store) { 912 counts.transition_after(stream, false); 913 } 914 } 915 poll_complete<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,916 pub fn poll_complete<T, B>( 917 &mut self, 918 cx: &mut Context, 919 store: &mut Store, 920 counts: &mut Counts, 921 dst: &mut Codec<T, Prioritized<B>>, 922 ) -> Poll<io::Result<()>> 923 where 924 T: AsyncWrite + Unpin, 925 B: Buf, 926 { 927 // Send any pending connection level window updates 928 ready!(self.send_connection_window_update(cx, dst))?; 929 930 // Send any pending stream level window updates 931 ready!(self.send_stream_window_updates(cx, store, counts, dst))?; 932 933 Poll::Ready(Ok(())) 934 } 935 936 /// Send connection level window update send_connection_window_update<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,937 fn send_connection_window_update<T, B>( 938 &mut self, 939 cx: &mut Context, 940 dst: &mut Codec<T, Prioritized<B>>, 941 ) -> Poll<io::Result<()>> 942 where 943 T: AsyncWrite + Unpin, 944 B: Buf, 945 { 946 if let Some(incr) = self.flow.unclaimed_capacity() { 947 let frame = frame::WindowUpdate::new(StreamId::zero(), incr); 948 949 // Ensure the codec has capacity 950 ready!(dst.poll_ready(cx))?; 951 952 // Buffer the WINDOW_UPDATE frame 953 dst.buffer(frame.into()) 954 .expect("invalid WINDOW_UPDATE frame"); 955 956 // Update flow control 957 self.flow 958 .inc_window(incr) 959 .expect("unexpected flow control state"); 960 } 961 962 Poll::Ready(Ok(())) 963 } 964 965 /// Send stream level window update send_stream_window_updates<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,966 pub fn send_stream_window_updates<T, B>( 967 &mut self, 968 cx: &mut Context, 969 store: &mut Store, 970 counts: &mut Counts, 971 dst: &mut Codec<T, Prioritized<B>>, 972 ) -> Poll<io::Result<()>> 973 where 974 T: AsyncWrite + Unpin, 975 B: Buf, 976 { 977 loop { 978 // Ensure the codec has capacity 979 ready!(dst.poll_ready(cx))?; 980 981 // Get the next stream 982 let stream = match self.pending_window_updates.pop(store) { 983 Some(stream) => stream, 984 None => return Poll::Ready(Ok(())), 985 }; 986 987 counts.transition(stream, |_, stream| { 988 tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id); 989 debug_assert!(!stream.is_pending_window_update); 990 991 if !stream.state.is_recv_streaming() { 992 // No need to send window updates on the stream if the stream is 993 // no longer receiving data. 994 // 995 // TODO: is this correct? We could possibly send a window 996 // update on a ReservedRemote stream if we already know 997 // we want to stream the data faster... 998 return; 999 } 1000 1001 // TODO: de-dup 1002 if let Some(incr) = stream.recv_flow.unclaimed_capacity() { 1003 // Create the WINDOW_UPDATE frame 1004 let frame = frame::WindowUpdate::new(stream.id, incr); 1005 1006 // Buffer it 1007 dst.buffer(frame.into()) 1008 .expect("invalid WINDOW_UPDATE frame"); 1009 1010 // Update flow control 1011 stream 1012 .recv_flow 1013 .inc_window(incr) 1014 .expect("unexpected flow control state"); 1015 } 1016 }) 1017 } 1018 } 1019 next_incoming(&mut self, store: &mut Store) -> Option<store::Key>1020 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> { 1021 self.pending_accept.pop(store).map(|ptr| ptr.key()) 1022 } 1023 poll_data( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<Bytes, proto::Error>>>1024 pub fn poll_data( 1025 &mut self, 1026 cx: &Context, 1027 stream: &mut Stream, 1028 ) -> Poll<Option<Result<Bytes, proto::Error>>> { 1029 // TODO: Return error when the stream is reset 1030 match stream.pending_recv.pop_front(&mut self.buffer) { 1031 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), 1032 Some(event) => { 1033 // Frame is trailer 1034 stream.pending_recv.push_front(&mut self.buffer, event); 1035 1036 // Notify the recv task. This is done just in case 1037 // `poll_trailers` was called. 1038 // 1039 // It is very likely that `notify_recv` will just be a no-op (as 1040 // the task will be None), so this isn't really much of a 1041 // performance concern. It also means we don't have to track 1042 // state to see if `poll_trailers` was called before `poll_data` 1043 // returned `None`. 1044 stream.notify_recv(); 1045 1046 // No more data frames 1047 Poll::Ready(None) 1048 } 1049 None => self.schedule_recv(cx, stream), 1050 } 1051 } 1052 poll_trailers( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<HeaderMap, proto::Error>>>1053 pub fn poll_trailers( 1054 &mut self, 1055 cx: &Context, 1056 stream: &mut Stream, 1057 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> { 1058 match stream.pending_recv.pop_front(&mut self.buffer) { 1059 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))), 1060 Some(event) => { 1061 // Frame is not trailers.. not ready to poll trailers yet. 1062 stream.pending_recv.push_front(&mut self.buffer, event); 1063 1064 Poll::Pending 1065 } 1066 None => self.schedule_recv(cx, stream), 1067 } 1068 } 1069 schedule_recv<T>( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<T, proto::Error>>>1070 fn schedule_recv<T>( 1071 &mut self, 1072 cx: &Context, 1073 stream: &mut Stream, 1074 ) -> Poll<Option<Result<T, proto::Error>>> { 1075 if stream.state.ensure_recv_open()? { 1076 // Request to get notified once more frames arrive 1077 stream.recv_task = Some(cx.waker().clone()); 1078 Poll::Pending 1079 } else { 1080 // No more frames will be received 1081 Poll::Ready(None) 1082 } 1083 } 1084 } 1085 1086 // ===== impl Open ===== 1087 1088 impl Open { is_push_promise(&self) -> bool1089 pub fn is_push_promise(&self) -> bool { 1090 use self::Open::*; 1091 1092 match *self { 1093 PushPromise => true, 1094 _ => false, 1095 } 1096 } 1097 } 1098 1099 // ===== impl RecvHeaderBlockError ===== 1100 1101 impl<T> From<RecvError> for RecvHeaderBlockError<T> { from(err: RecvError) -> Self1102 fn from(err: RecvError) -> Self { 1103 RecvHeaderBlockError::State(err) 1104 } 1105 } 1106