1 use super::*; 2 use crate::codec::{RecvError, UserError}; 3 use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; 4 use crate::{frame, proto}; 5 use std::task::Context; 6 7 use http::{HeaderMap, Request, Response}; 8 9 use std::io; 10 use std::task::{Poll, Waker}; 11 use std::time::{Duration, Instant}; 12 13 #[derive(Debug)] 14 pub(super) struct Recv { 15 /// Initial window size of remote initiated streams 16 init_window_sz: WindowSize, 17 18 /// Connection level flow control governing received data 19 flow: FlowControl, 20 21 /// Amount of connection window capacity currently used by outstanding streams. 22 in_flight_data: WindowSize, 23 24 /// The lowest stream ID that is still idle 25 next_stream_id: Result<StreamId, StreamIdOverflow>, 26 27 /// The stream ID of the last processed stream 28 last_processed_id: StreamId, 29 30 /// Any streams with a higher ID are ignored. 31 /// 32 /// This starts as MAX, but is lowered when a GOAWAY is received. 33 /// 34 /// > After sending a GOAWAY frame, the sender can discard frames for 35 /// > streams initiated by the receiver with identifiers higher than 36 /// > the identified last stream. 37 max_stream_id: StreamId, 38 39 /// Streams that have pending window updates 40 pending_window_updates: store::Queue<stream::NextWindowUpdate>, 41 42 /// New streams to be accepted 43 pending_accept: store::Queue<stream::NextAccept>, 44 45 /// Locally reset streams that should be reaped when they expire 46 pending_reset_expired: store::Queue<stream::NextResetExpire>, 47 48 /// How long locally reset streams should ignore received frames 49 reset_duration: Duration, 50 51 /// Holds frames that are waiting to be read 52 buffer: Buffer<Event>, 53 54 /// Refused StreamId, this represents a frame that must be sent out. 55 refused: Option<StreamId>, 56 57 /// If push promises are allowed to be recevied. 58 is_push_enabled: bool, 59 } 60 61 #[derive(Debug)] 62 pub(super) enum Event { 63 Headers(peer::PollMessage), 64 Data(Bytes), 65 Trailers(HeaderMap), 66 } 67 68 #[derive(Debug)] 69 pub(super) enum RecvHeaderBlockError<T> { 70 Oversize(T), 71 State(RecvError), 72 } 73 74 #[derive(Debug)] 75 pub(crate) enum Open { 76 PushPromise, 77 Headers, 78 } 79 80 #[derive(Debug, Clone, Copy)] 81 struct Indices { 82 head: store::Key, 83 tail: store::Key, 84 } 85 86 impl Recv { new(peer: peer::Dyn, config: &Config) -> Self87 pub fn new(peer: peer::Dyn, config: &Config) -> Self { 88 let next_stream_id = if peer.is_server() { 1 } else { 2 }; 89 90 let mut flow = FlowControl::new(); 91 92 // connections always have the default window size, regardless of 93 // settings 94 flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) 95 .expect("invalid initial remote window size"); 96 flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); 97 98 Recv { 99 init_window_sz: config.local_init_window_sz, 100 flow, 101 in_flight_data: 0 as WindowSize, 102 next_stream_id: Ok(next_stream_id.into()), 103 pending_window_updates: store::Queue::new(), 104 last_processed_id: StreamId::ZERO, 105 max_stream_id: StreamId::MAX, 106 pending_accept: store::Queue::new(), 107 pending_reset_expired: store::Queue::new(), 108 reset_duration: config.local_reset_duration, 109 buffer: Buffer::new(), 110 refused: None, 111 is_push_enabled: config.local_push_enabled, 112 } 113 } 114 115 /// Returns the initial receive window size init_window_sz(&self) -> WindowSize116 pub fn init_window_sz(&self) -> WindowSize { 117 self.init_window_sz 118 } 119 120 /// Returns the ID of the last processed stream last_processed_id(&self) -> StreamId121 pub fn last_processed_id(&self) -> StreamId { 122 self.last_processed_id 123 } 124 125 /// Update state reflecting a new, remotely opened stream 126 /// 127 /// Returns the stream state if successful. `None` if refused open( &mut self, id: StreamId, mode: Open, counts: &mut Counts, ) -> Result<Option<StreamId>, RecvError>128 pub fn open( 129 &mut self, 130 id: StreamId, 131 mode: Open, 132 counts: &mut Counts, 133 ) -> Result<Option<StreamId>, RecvError> { 134 assert!(self.refused.is_none()); 135 136 counts.peer().ensure_can_open(id, mode)?; 137 138 let next_id = self.next_stream_id()?; 139 if id < next_id { 140 proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); 141 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 142 } 143 144 self.next_stream_id = id.next_id(); 145 146 if !counts.can_inc_num_recv_streams() { 147 self.refused = Some(id); 148 return Ok(None); 149 } 150 151 Ok(Some(id)) 152 } 153 154 /// Transition the stream state based on receiving headers 155 /// 156 /// The caller ensures that the frame represents headers and not trailers. recv_headers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, counts: &mut Counts, ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>>157 pub fn recv_headers( 158 &mut self, 159 frame: frame::Headers, 160 stream: &mut store::Ptr, 161 counts: &mut Counts, 162 ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> { 163 log::trace!("opening stream; init_window={}", self.init_window_sz); 164 let is_initial = stream.state.recv_open(frame.is_end_stream())?; 165 166 if is_initial { 167 // TODO: be smarter about this logic 168 if frame.stream_id() > self.last_processed_id { 169 self.last_processed_id = frame.stream_id(); 170 } 171 172 // Increment the number of concurrent streams 173 counts.inc_num_recv_streams(stream); 174 } 175 176 if !stream.content_length.is_head() { 177 use super::stream::ContentLength; 178 use http::header; 179 180 if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { 181 let content_length = match frame::parse_u64(content_length.as_bytes()) { 182 Ok(v) => v, 183 Err(()) => { 184 proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); 185 return Err(RecvError::Stream { 186 id: stream.id, 187 reason: Reason::PROTOCOL_ERROR, 188 } 189 .into()); 190 } 191 }; 192 193 stream.content_length = ContentLength::Remaining(content_length); 194 } 195 } 196 197 if frame.is_over_size() { 198 // A frame is over size if the decoded header block was bigger than 199 // SETTINGS_MAX_HEADER_LIST_SIZE. 200 // 201 // > A server that receives a larger header block than it is willing 202 // > to handle can send an HTTP 431 (Request Header Fields Too 203 // > Large) status code [RFC6585]. A client can discard responses 204 // > that it cannot process. 205 // 206 // So, if peer is a server, we'll send a 431. In either case, 207 // an error is recorded, which will send a REFUSED_STREAM, 208 // since we don't want any of the data frames either. 209 log::debug!( 210 "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \ 211 recv_headers: frame is over size; stream={:?}", 212 stream.id 213 ); 214 return if counts.peer().is_server() && is_initial { 215 let mut res = frame::Headers::new( 216 stream.id, 217 frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE), 218 HeaderMap::new(), 219 ); 220 res.set_end_stream(); 221 Err(RecvHeaderBlockError::Oversize(Some(res))) 222 } else { 223 Err(RecvHeaderBlockError::Oversize(None)) 224 }; 225 } 226 227 let stream_id = frame.stream_id(); 228 let (pseudo, fields) = frame.into_parts(); 229 let message = counts 230 .peer() 231 .convert_poll_message(pseudo, fields, stream_id)?; 232 233 // Push the frame onto the stream's recv buffer 234 stream 235 .pending_recv 236 .push_back(&mut self.buffer, Event::Headers(message)); 237 stream.notify_recv(); 238 239 // Only servers can receive a headers frame that initiates the stream. 240 // This is verified in `Streams` before calling this function. 241 if counts.peer().is_server() { 242 self.pending_accept.push(stream); 243 } 244 245 Ok(()) 246 } 247 248 /// Called by the server to get the request 249 /// 250 /// TODO: Should this fn return `Result`? take_request(&mut self, stream: &mut store::Ptr) -> Request<()>251 pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { 252 use super::peer::PollMessage::*; 253 254 match stream.pending_recv.pop_front(&mut self.buffer) { 255 Some(Event::Headers(Server(request))) => request, 256 _ => panic!(), 257 } 258 } 259 260 /// Called by the client to get pushed response poll_pushed( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>>261 pub fn poll_pushed( 262 &mut self, 263 cx: &Context, 264 stream: &mut store::Ptr, 265 ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> { 266 use super::peer::PollMessage::*; 267 268 let mut ppp = stream.pending_push_promises.take(); 269 let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| { 270 match pushed.pending_recv.pop_front(&mut self.buffer) { 271 Some(Event::Headers(Server(headers))) => (headers, pushed.key()), 272 // When frames are pushed into the queue, it is verified that 273 // the first frame is a HEADERS frame. 274 _ => panic!("Headers not set on pushed stream"), 275 } 276 }); 277 stream.pending_push_promises = ppp; 278 if let Some(p) = pushed { 279 Poll::Ready(Some(Ok(p))) 280 } else { 281 let is_open = stream.state.ensure_recv_open()?; 282 283 if is_open { 284 stream.recv_task = Some(cx.waker().clone()); 285 Poll::Pending 286 } else { 287 Poll::Ready(None) 288 } 289 } 290 } 291 292 /// Called by the client to get the response poll_response( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll<Result<Response<()>, proto::Error>>293 pub fn poll_response( 294 &mut self, 295 cx: &Context, 296 stream: &mut store::Ptr, 297 ) -> Poll<Result<Response<()>, proto::Error>> { 298 use super::peer::PollMessage::*; 299 300 // If the buffer is not empty, then the first frame must be a HEADERS 301 // frame or the user violated the contract. 302 match stream.pending_recv.pop_front(&mut self.buffer) { 303 Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), 304 Some(_) => panic!("poll_response called after response returned"), 305 None => { 306 stream.state.ensure_recv_open()?; 307 308 stream.recv_task = Some(cx.waker().clone()); 309 Poll::Pending 310 } 311 } 312 } 313 314 /// Transition the stream based on receiving trailers recv_trailers( &mut self, frame: frame::Headers, stream: &mut store::Ptr, ) -> Result<(), RecvError>315 pub fn recv_trailers( 316 &mut self, 317 frame: frame::Headers, 318 stream: &mut store::Ptr, 319 ) -> Result<(), RecvError> { 320 // Transition the state 321 stream.state.recv_close()?; 322 323 if stream.ensure_content_length_zero().is_err() { 324 proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); 325 return Err(RecvError::Stream { 326 id: stream.id, 327 reason: Reason::PROTOCOL_ERROR, 328 }); 329 } 330 331 let trailers = frame.into_fields(); 332 333 // Push the frame onto the stream's recv buffer 334 stream 335 .pending_recv 336 .push_back(&mut self.buffer, Event::Trailers(trailers)); 337 stream.notify_recv(); 338 339 Ok(()) 340 } 341 342 /// Releases capacity of the connection release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>)343 pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) { 344 log::trace!( 345 "release_connection_capacity; size={}, connection in_flight_data={}", 346 capacity, 347 self.in_flight_data, 348 ); 349 350 // Decrement in-flight data 351 self.in_flight_data -= capacity; 352 353 // Assign capacity to connection 354 self.flow.assign_capacity(capacity); 355 356 if self.flow.unclaimed_capacity().is_some() { 357 if let Some(task) = task.take() { 358 task.wake(); 359 } 360 } 361 } 362 363 /// Releases capacity back to the connection & stream release_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, task: &mut Option<Waker>, ) -> Result<(), UserError>364 pub fn release_capacity( 365 &mut self, 366 capacity: WindowSize, 367 stream: &mut store::Ptr, 368 task: &mut Option<Waker>, 369 ) -> Result<(), UserError> { 370 log::trace!("release_capacity; size={}", capacity); 371 372 if capacity > stream.in_flight_recv_data { 373 return Err(UserError::ReleaseCapacityTooBig); 374 } 375 376 self.release_connection_capacity(capacity, task); 377 378 // Decrement in-flight data 379 stream.in_flight_recv_data -= capacity; 380 381 // Assign capacity to stream 382 stream.recv_flow.assign_capacity(capacity); 383 384 if stream.recv_flow.unclaimed_capacity().is_some() { 385 // Queue the stream for sending the WINDOW_UPDATE frame. 386 self.pending_window_updates.push(stream); 387 388 if let Some(task) = task.take() { 389 task.wake(); 390 } 391 } 392 393 Ok(()) 394 } 395 396 /// Release any unclaimed capacity for a closed stream. release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)397 pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 398 debug_assert_eq!(stream.ref_count, 0); 399 400 if stream.in_flight_recv_data == 0 { 401 return; 402 } 403 404 log::trace!( 405 "auto-release closed stream ({:?}) capacity: {:?}", 406 stream.id, 407 stream.in_flight_recv_data, 408 ); 409 410 self.release_connection_capacity(stream.in_flight_recv_data, task); 411 stream.in_flight_recv_data = 0; 412 413 self.clear_recv_buffer(stream); 414 } 415 416 /// Set the "target" connection window size. 417 /// 418 /// By default, all new connections start with 64kb of window size. As 419 /// streams used and release capacity, we will send WINDOW_UPDATEs for the 420 /// connection to bring it back up to the initial "target". 421 /// 422 /// Setting a target means that we will try to tell the peer about 423 /// WINDOW_UPDATEs so the peer knows it has about `target` window to use 424 /// for the whole connection. 425 /// 426 /// The `task` is an optional parked task for the `Connection` that might 427 /// be blocked on needing more window capacity. set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>)428 pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) { 429 log::trace!( 430 "set_target_connection_window; target={}; available={}, reserved={}", 431 target, 432 self.flow.available(), 433 self.in_flight_data, 434 ); 435 436 // The current target connection window is our `available` plus any 437 // in-flight data reserved by streams. 438 // 439 // Update the flow controller with the difference between the new 440 // target and the current target. 441 let current = (self.flow.available() + self.in_flight_data).checked_size(); 442 if target > current { 443 self.flow.assign_capacity(target - current); 444 } else { 445 self.flow.claim_capacity(current - target); 446 } 447 448 // If changing the target capacity means we gained a bunch of capacity, 449 // enough that we went over the update threshold, then schedule sending 450 // a connection WINDOW_UPDATE. 451 if self.flow.unclaimed_capacity().is_some() { 452 if let Some(task) = task.take() { 453 task.wake(); 454 } 455 } 456 } 457 apply_local_settings( &mut self, settings: &frame::Settings, store: &mut Store, ) -> Result<(), RecvError>458 pub(crate) fn apply_local_settings( 459 &mut self, 460 settings: &frame::Settings, 461 store: &mut Store, 462 ) -> Result<(), RecvError> { 463 let target = if let Some(val) = settings.initial_window_size() { 464 val 465 } else { 466 return Ok(()); 467 }; 468 469 let old_sz = self.init_window_sz; 470 self.init_window_sz = target; 471 472 log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); 473 474 // Per RFC 7540 §6.9.2: 475 // 476 // In addition to changing the flow-control window for streams that are 477 // not yet active, a SETTINGS frame can alter the initial flow-control 478 // window size for streams with active flow-control windows (that is, 479 // streams in the "open" or "half-closed (remote)" state). When the 480 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust 481 // the size of all stream flow-control windows that it maintains by the 482 // difference between the new value and the old value. 483 // 484 // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available 485 // space in a flow-control window to become negative. A sender MUST 486 // track the negative flow-control window and MUST NOT send new 487 // flow-controlled frames until it receives WINDOW_UPDATE frames that 488 // cause the flow-control window to become positive. 489 490 if target < old_sz { 491 // We must decrease the (local) window on every open stream. 492 let dec = old_sz - target; 493 log::trace!("decrementing all windows; dec={}", dec); 494 495 store.for_each(|mut stream| { 496 stream.recv_flow.dec_recv_window(dec); 497 Ok(()) 498 }) 499 } else if target > old_sz { 500 // We must increase the (local) window on every open stream. 501 let inc = target - old_sz; 502 log::trace!("incrementing all windows; inc={}", inc); 503 store.for_each(|mut stream| { 504 // XXX: Shouldn't the peer have already noticed our 505 // overflow and sent us a GOAWAY? 506 stream 507 .recv_flow 508 .inc_window(inc) 509 .map_err(RecvError::Connection)?; 510 stream.recv_flow.assign_capacity(inc); 511 Ok(()) 512 }) 513 } else { 514 // size is the same... so do nothing 515 Ok(()) 516 } 517 } 518 is_end_stream(&self, stream: &store::Ptr) -> bool519 pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { 520 if !stream.state.is_recv_closed() { 521 return false; 522 } 523 524 stream.pending_recv.is_empty() 525 } 526 recv_data( &mut self, frame: frame::Data, stream: &mut store::Ptr, ) -> Result<(), RecvError>527 pub fn recv_data( 528 &mut self, 529 frame: frame::Data, 530 stream: &mut store::Ptr, 531 ) -> Result<(), RecvError> { 532 let sz = frame.payload().len(); 533 534 // This should have been enforced at the codec::FramedRead layer, so 535 // this is just a sanity check. 536 assert!(sz <= MAX_WINDOW_SIZE as usize); 537 538 let sz = sz as WindowSize; 539 540 let is_ignoring_frame = stream.state.is_local_reset(); 541 542 if !is_ignoring_frame && !stream.state.is_recv_streaming() { 543 // TODO: There are cases where this can be a stream error of 544 // STREAM_CLOSED instead... 545 546 // Receiving a DATA frame when not expecting one is a protocol 547 // error. 548 proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); 549 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 550 } 551 552 log::trace!( 553 "recv_data; size={}; connection={}; stream={}", 554 sz, 555 self.flow.window_size(), 556 stream.recv_flow.window_size() 557 ); 558 559 if is_ignoring_frame { 560 log::trace!( 561 "recv_data; frame ignored on locally reset {:?} for some time", 562 stream.id, 563 ); 564 return self.ignore_data(sz); 565 } 566 567 // Ensure that there is enough capacity on the connection before acting 568 // on the stream. 569 self.consume_connection_window(sz)?; 570 571 if stream.recv_flow.window_size() < sz { 572 // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE 573 // > A receiver MAY respond with a stream error (Section 5.4.2) or 574 // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if 575 // > it is unable to accept a frame. 576 // 577 // So, for violating the **stream** window, we can send either a 578 // stream or connection error. We've opted to send a stream 579 // error. 580 return Err(RecvError::Stream { 581 id: stream.id, 582 reason: Reason::FLOW_CONTROL_ERROR, 583 }); 584 } 585 586 if stream.dec_content_length(frame.payload().len()).is_err() { 587 proto_err!(stream: 588 "recv_data: content-length overflow; stream={:?}; len={:?}", 589 stream.id, 590 frame.payload().len(), 591 ); 592 return Err(RecvError::Stream { 593 id: stream.id, 594 reason: Reason::PROTOCOL_ERROR, 595 }); 596 } 597 598 if frame.is_end_stream() { 599 if stream.ensure_content_length_zero().is_err() { 600 proto_err!(stream: 601 "recv_data: content-length underflow; stream={:?}; len={:?}", 602 stream.id, 603 frame.payload().len(), 604 ); 605 return Err(RecvError::Stream { 606 id: stream.id, 607 reason: Reason::PROTOCOL_ERROR, 608 }); 609 } 610 611 if stream.state.recv_close().is_err() { 612 proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); 613 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 614 } 615 } 616 617 // Update stream level flow control 618 stream.recv_flow.send_data(sz); 619 620 // Track the data as in-flight 621 stream.in_flight_recv_data += sz; 622 623 let event = Event::Data(frame.into_payload()); 624 625 // Push the frame onto the recv buffer 626 stream.pending_recv.push_back(&mut self.buffer, event); 627 stream.notify_recv(); 628 629 Ok(()) 630 } 631 ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError>632 pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> { 633 // Ensure that there is enough capacity on the connection... 634 self.consume_connection_window(sz)?; 635 636 // Since we are ignoring this frame, 637 // we aren't returning the frame to the user. That means they 638 // have no way to release the capacity back to the connection. So 639 // we have to release it automatically. 640 // 641 // This call doesn't send a WINDOW_UPDATE immediately, just marks 642 // the capacity as available to be reclaimed. When the available 643 // capacity meets a threshold, a WINDOW_UPDATE is then sent. 644 self.release_connection_capacity(sz, &mut None); 645 Ok(()) 646 } 647 consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError>648 pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { 649 if self.flow.window_size() < sz { 650 log::debug!( 651 "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", 652 self.flow.window_size(), 653 sz, 654 ); 655 return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); 656 } 657 658 // Update connection level flow control 659 self.flow.send_data(sz); 660 661 // Track the data as in-flight 662 self.in_flight_data += sz; 663 Ok(()) 664 } 665 recv_push_promise( &mut self, frame: frame::PushPromise, stream: &mut store::Ptr, ) -> Result<(), RecvError>666 pub fn recv_push_promise( 667 &mut self, 668 frame: frame::PushPromise, 669 stream: &mut store::Ptr, 670 ) -> Result<(), RecvError> { 671 stream.state.reserve_remote()?; 672 if frame.is_over_size() { 673 // A frame is over size if the decoded header block was bigger than 674 // SETTINGS_MAX_HEADER_LIST_SIZE. 675 // 676 // > A server that receives a larger header block than it is willing 677 // > to handle can send an HTTP 431 (Request Header Fields Too 678 // > Large) status code [RFC6585]. A client can discard responses 679 // > that it cannot process. 680 // 681 // So, if peer is a server, we'll send a 431. In either case, 682 // an error is recorded, which will send a REFUSED_STREAM, 683 // since we don't want any of the data frames either. 684 log::debug!( 685 "stream error REFUSED_STREAM -- recv_push_promise: \ 686 headers frame is over size; promised_id={:?};", 687 frame.promised_id(), 688 ); 689 return Err(RecvError::Stream { 690 id: frame.promised_id(), 691 reason: Reason::REFUSED_STREAM, 692 }); 693 } 694 695 let promised_id = frame.promised_id(); 696 let (pseudo, fields) = frame.into_parts(); 697 let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?; 698 699 if let Err(e) = frame::PushPromise::validate_request(&req) { 700 use PushPromiseHeaderError::*; 701 match e { 702 NotSafeAndCacheable => proto_err!( 703 stream: 704 "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}", 705 req.method(), 706 promised_id, 707 ), 708 InvalidContentLength(e) => proto_err!( 709 stream: 710 "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}", 711 e, 712 promised_id, 713 ), 714 } 715 return Err(RecvError::Stream { 716 id: promised_id, 717 reason: Reason::PROTOCOL_ERROR, 718 }); 719 } 720 721 use super::peer::PollMessage::*; 722 stream 723 .pending_recv 724 .push_back(&mut self.buffer, Event::Headers(Server(req))); 725 stream.notify_recv(); 726 Ok(()) 727 } 728 729 /// Ensures that `id` is not in the `Idle` state. ensure_not_idle(&self, id: StreamId) -> Result<(), Reason>730 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { 731 if let Ok(next) = self.next_stream_id { 732 if id >= next { 733 log::debug!( 734 "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", 735 id 736 ); 737 return Err(Reason::PROTOCOL_ERROR); 738 } 739 } 740 // if next_stream_id is overflowed, that's ok. 741 742 Ok(()) 743 } 744 745 /// Handle remote sending an explicit RST_STREAM. recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream)746 pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { 747 // Notify the stream 748 stream 749 .state 750 .recv_reset(frame.reason(), stream.is_pending_send); 751 752 stream.notify_send(); 753 stream.notify_recv(); 754 } 755 756 /// Handle a received error recv_err(&mut self, err: &proto::Error, stream: &mut Stream)757 pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { 758 // Receive an error 759 stream.state.recv_err(err); 760 761 // If a receiver is waiting, notify it 762 stream.notify_send(); 763 stream.notify_recv(); 764 } 765 go_away(&mut self, last_processed_id: StreamId)766 pub fn go_away(&mut self, last_processed_id: StreamId) { 767 assert!(self.max_stream_id >= last_processed_id); 768 self.max_stream_id = last_processed_id; 769 } 770 recv_eof(&mut self, stream: &mut Stream)771 pub fn recv_eof(&mut self, stream: &mut Stream) { 772 stream.state.recv_eof(); 773 stream.notify_send(); 774 stream.notify_recv(); 775 } 776 clear_recv_buffer(&mut self, stream: &mut Stream)777 pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { 778 while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) { 779 // drop it 780 } 781 } 782 783 /// Get the max ID of streams we can receive. 784 /// 785 /// This gets lowered if we send a GOAWAY frame. max_stream_id(&self) -> StreamId786 pub fn max_stream_id(&self) -> StreamId { 787 self.max_stream_id 788 } 789 next_stream_id(&self) -> Result<StreamId, RecvError>790 pub fn next_stream_id(&self) -> Result<StreamId, RecvError> { 791 if let Ok(id) = self.next_stream_id { 792 Ok(id) 793 } else { 794 Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) 795 } 796 } 797 may_have_created_stream(&self, id: StreamId) -> bool798 pub fn may_have_created_stream(&self, id: StreamId) -> bool { 799 if let Ok(next_id) = self.next_stream_id { 800 // Peer::is_local_init should have been called beforehand 801 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); 802 id < next_id 803 } else { 804 true 805 } 806 } 807 808 /// Returns true if the remote peer can reserve a stream with the given ID. ensure_can_reserve(&self) -> Result<(), RecvError>809 pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { 810 if !self.is_push_enabled { 811 proto_err!(conn: "recv_push_promise: push is disabled"); 812 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); 813 } 814 815 Ok(()) 816 } 817 818 /// Add a locally reset stream to queue to be eventually reaped. enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts)819 pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 820 if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() { 821 return; 822 } 823 824 log::trace!("enqueue_reset_expiration; {:?}", stream.id); 825 826 if !counts.can_inc_num_reset_streams() { 827 // try to evict 1 stream if possible 828 // if max allow is 0, this won't be able to evict, 829 // and then we'll just bail after 830 if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { 831 counts.transition_after(evicted, true); 832 } 833 } 834 835 if counts.can_inc_num_reset_streams() { 836 counts.inc_num_reset_streams(); 837 self.pending_reset_expired.push(stream); 838 } 839 } 840 841 /// Send any pending refusals. send_pending_refusal<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,842 pub fn send_pending_refusal<T, B>( 843 &mut self, 844 cx: &mut Context, 845 dst: &mut Codec<T, Prioritized<B>>, 846 ) -> Poll<io::Result<()>> 847 where 848 T: AsyncWrite + Unpin, 849 B: Buf, 850 { 851 if let Some(stream_id) = self.refused { 852 ready!(dst.poll_ready(cx))?; 853 854 // Create the RST_STREAM frame 855 let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); 856 857 // Buffer the frame 858 dst.buffer(frame.into()).expect("invalid RST_STREAM frame"); 859 } 860 861 self.refused = None; 862 863 Poll::Ready(Ok(())) 864 } 865 clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)866 pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 867 let now = Instant::now(); 868 let reset_duration = self.reset_duration; 869 while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { 870 let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); 871 now - reset_at > reset_duration 872 }) { 873 counts.transition_after(stream, true); 874 } 875 } 876 clear_queues( &mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts, )877 pub fn clear_queues( 878 &mut self, 879 clear_pending_accept: bool, 880 store: &mut Store, 881 counts: &mut Counts, 882 ) { 883 self.clear_stream_window_update_queue(store, counts); 884 self.clear_all_reset_streams(store, counts); 885 886 if clear_pending_accept { 887 self.clear_all_pending_accept(store, counts); 888 } 889 } 890 clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts)891 fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) { 892 while let Some(stream) = self.pending_window_updates.pop(store) { 893 counts.transition(stream, |_, stream| { 894 log::trace!("clear_stream_window_update_queue; stream={:?}", stream.id); 895 }) 896 } 897 } 898 899 /// Called on EOF clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts)900 fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { 901 while let Some(stream) = self.pending_reset_expired.pop(store) { 902 counts.transition_after(stream, true); 903 } 904 } 905 clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts)906 fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) { 907 while let Some(stream) = self.pending_accept.pop(store) { 908 counts.transition_after(stream, false); 909 } 910 } 911 poll_complete<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,912 pub fn poll_complete<T, B>( 913 &mut self, 914 cx: &mut Context, 915 store: &mut Store, 916 counts: &mut Counts, 917 dst: &mut Codec<T, Prioritized<B>>, 918 ) -> Poll<io::Result<()>> 919 where 920 T: AsyncWrite + Unpin, 921 B: Buf, 922 { 923 // Send any pending connection level window updates 924 ready!(self.send_connection_window_update(cx, dst))?; 925 926 // Send any pending stream level window updates 927 ready!(self.send_stream_window_updates(cx, store, counts, dst))?; 928 929 Poll::Ready(Ok(())) 930 } 931 932 /// Send connection level window update send_connection_window_update<T, B>( &mut self, cx: &mut Context, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,933 fn send_connection_window_update<T, B>( 934 &mut self, 935 cx: &mut Context, 936 dst: &mut Codec<T, Prioritized<B>>, 937 ) -> Poll<io::Result<()>> 938 where 939 T: AsyncWrite + Unpin, 940 B: Buf, 941 { 942 if let Some(incr) = self.flow.unclaimed_capacity() { 943 let frame = frame::WindowUpdate::new(StreamId::zero(), incr); 944 945 // Ensure the codec has capacity 946 ready!(dst.poll_ready(cx))?; 947 948 // Buffer the WINDOW_UPDATE frame 949 dst.buffer(frame.into()) 950 .expect("invalid WINDOW_UPDATE frame"); 951 952 // Update flow control 953 self.flow 954 .inc_window(incr) 955 .expect("unexpected flow control state"); 956 } 957 958 Poll::Ready(Ok(())) 959 } 960 961 /// Send stream level window update send_stream_window_updates<T, B>( &mut self, cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,962 pub fn send_stream_window_updates<T, B>( 963 &mut self, 964 cx: &mut Context, 965 store: &mut Store, 966 counts: &mut Counts, 967 dst: &mut Codec<T, Prioritized<B>>, 968 ) -> Poll<io::Result<()>> 969 where 970 T: AsyncWrite + Unpin, 971 B: Buf, 972 { 973 loop { 974 // Ensure the codec has capacity 975 ready!(dst.poll_ready(cx))?; 976 977 // Get the next stream 978 let stream = match self.pending_window_updates.pop(store) { 979 Some(stream) => stream, 980 None => return Poll::Ready(Ok(())), 981 }; 982 983 counts.transition(stream, |_, stream| { 984 log::trace!("pending_window_updates -- pop; stream={:?}", stream.id); 985 debug_assert!(!stream.is_pending_window_update); 986 987 if !stream.state.is_recv_streaming() { 988 // No need to send window updates on the stream if the stream is 989 // no longer receiving data. 990 // 991 // TODO: is this correct? We could possibly send a window 992 // update on a ReservedRemote stream if we already know 993 // we want to stream the data faster... 994 return; 995 } 996 997 // TODO: de-dup 998 if let Some(incr) = stream.recv_flow.unclaimed_capacity() { 999 // Create the WINDOW_UPDATE frame 1000 let frame = frame::WindowUpdate::new(stream.id, incr); 1001 1002 // Buffer it 1003 dst.buffer(frame.into()) 1004 .expect("invalid WINDOW_UPDATE frame"); 1005 1006 // Update flow control 1007 stream 1008 .recv_flow 1009 .inc_window(incr) 1010 .expect("unexpected flow control state"); 1011 } 1012 }) 1013 } 1014 } 1015 next_incoming(&mut self, store: &mut Store) -> Option<store::Key>1016 pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> { 1017 self.pending_accept.pop(store).map(|ptr| ptr.key()) 1018 } 1019 poll_data( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<Bytes, proto::Error>>>1020 pub fn poll_data( 1021 &mut self, 1022 cx: &Context, 1023 stream: &mut Stream, 1024 ) -> Poll<Option<Result<Bytes, proto::Error>>> { 1025 // TODO: Return error when the stream is reset 1026 match stream.pending_recv.pop_front(&mut self.buffer) { 1027 Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), 1028 Some(event) => { 1029 // Frame is trailer 1030 stream.pending_recv.push_front(&mut self.buffer, event); 1031 1032 // Notify the recv task. This is done just in case 1033 // `poll_trailers` was called. 1034 // 1035 // It is very likely that `notify_recv` will just be a no-op (as 1036 // the task will be None), so this isn't really much of a 1037 // performance concern. It also means we don't have to track 1038 // state to see if `poll_trailers` was called before `poll_data` 1039 // returned `None`. 1040 stream.notify_recv(); 1041 1042 // No more data frames 1043 Poll::Ready(None) 1044 } 1045 None => self.schedule_recv(cx, stream), 1046 } 1047 } 1048 poll_trailers( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<HeaderMap, proto::Error>>>1049 pub fn poll_trailers( 1050 &mut self, 1051 cx: &Context, 1052 stream: &mut Stream, 1053 ) -> Poll<Option<Result<HeaderMap, proto::Error>>> { 1054 match stream.pending_recv.pop_front(&mut self.buffer) { 1055 Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))), 1056 Some(event) => { 1057 // Frame is not trailers.. not ready to poll trailers yet. 1058 stream.pending_recv.push_front(&mut self.buffer, event); 1059 1060 Poll::Pending 1061 } 1062 None => self.schedule_recv(cx, stream), 1063 } 1064 } 1065 schedule_recv<T>( &mut self, cx: &Context, stream: &mut Stream, ) -> Poll<Option<Result<T, proto::Error>>>1066 fn schedule_recv<T>( 1067 &mut self, 1068 cx: &Context, 1069 stream: &mut Stream, 1070 ) -> Poll<Option<Result<T, proto::Error>>> { 1071 if stream.state.ensure_recv_open()? { 1072 // Request to get notified once more frames arrive 1073 stream.recv_task = Some(cx.waker().clone()); 1074 Poll::Pending 1075 } else { 1076 // No more frames will be received 1077 Poll::Ready(None) 1078 } 1079 } 1080 } 1081 1082 // ===== impl Open ===== 1083 1084 impl Open { is_push_promise(&self) -> bool1085 pub fn is_push_promise(&self) -> bool { 1086 use self::Open::*; 1087 1088 match *self { 1089 PushPromise => true, 1090 _ => false, 1091 } 1092 } 1093 } 1094 1095 // ===== impl RecvHeaderBlockError ===== 1096 1097 impl<T> From<RecvError> for RecvHeaderBlockError<T> { from(err: RecvError) -> Self1098 fn from(err: RecvError) -> Self { 1099 RecvHeaderBlockError::State(err) 1100 } 1101 } 1102