1 use super::store::Resolve; 2 use super::*; 3 4 use crate::frame::{Reason, StreamId}; 5 6 use crate::codec::UserError; 7 use crate::codec::UserError::*; 8 9 use bytes::buf::{Buf, Take}; 10 use std::io; 11 use std::task::{Context, Poll, Waker}; 12 use std::{cmp, fmt, mem}; 13 14 /// # Warning 15 /// 16 /// Queued streams are ordered by stream ID, as we need to ensure that 17 /// lower-numbered streams are sent headers before higher-numbered ones. 18 /// This is because "idle" stream IDs – those which have been initiated but 19 /// have yet to receive frames – will be implicitly closed on receipt of a 20 /// frame on a higher stream ID. If these queues was not ordered by stream 21 /// IDs, some mechanism would be necessary to ensure that the lowest-numbered] 22 /// idle stream is opened first. 23 #[derive(Debug)] 24 pub(super) struct Prioritize { 25 /// Queue of streams waiting for socket capacity to send a frame. 26 pending_send: store::Queue<stream::NextSend>, 27 28 /// Queue of streams waiting for window capacity to produce data. 29 pending_capacity: store::Queue<stream::NextSendCapacity>, 30 31 /// Streams waiting for capacity due to max concurrency 32 /// 33 /// The `SendRequest` handle is `Clone`. This enables initiating requests 34 /// from many tasks. However, offering this capability while supporting 35 /// backpressure at some level is tricky. If there are many `SendRequest` 36 /// handles and a single stream becomes available, which handle gets 37 /// assigned that stream? Maybe that handle is no longer ready to send a 38 /// request. 39 /// 40 /// The strategy used is to allow each `SendRequest` handle one buffered 41 /// request. A `SendRequest` handle is ready to send a request if it has no 42 /// associated buffered requests. This is the same strategy as `mpsc` in the 43 /// futures library. 44 pending_open: store::Queue<stream::NextOpen>, 45 46 /// Connection level flow control governing sent data 47 flow: FlowControl, 48 49 /// Stream ID of the last stream opened. 50 last_opened_id: StreamId, 51 52 /// What `DATA` frame is currently being sent in the codec. 53 in_flight_data_frame: InFlightData, 54 } 55 56 #[derive(Debug, Eq, PartialEq)] 57 enum InFlightData { 58 /// There is no `DATA` frame in flight. 59 Nothing, 60 /// There is a `DATA` frame in flight belonging to the given stream. 61 DataFrame(store::Key), 62 /// There was a `DATA` frame, but the stream's queue was since cleared. 63 Drop, 64 } 65 66 pub(crate) struct Prioritized<B> { 67 // The buffer 68 inner: Take<B>, 69 70 end_of_stream: bool, 71 72 // The stream that this is associated with 73 stream: store::Key, 74 } 75 76 // ===== impl Prioritize ===== 77 78 impl Prioritize { new(config: &Config) -> Prioritize79 pub fn new(config: &Config) -> Prioritize { 80 let mut flow = FlowControl::new(); 81 82 flow.inc_window(config.remote_init_window_sz) 83 .expect("invalid initial window size"); 84 85 flow.assign_capacity(config.remote_init_window_sz); 86 87 tracing::trace!("Prioritize::new; flow={:?}", flow); 88 89 Prioritize { 90 pending_send: store::Queue::new(), 91 pending_capacity: store::Queue::new(), 92 pending_open: store::Queue::new(), 93 flow, 94 last_opened_id: StreamId::ZERO, 95 in_flight_data_frame: InFlightData::Nothing, 96 } 97 } 98 99 /// Queue a frame to be sent to the remote queue_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, task: &mut Option<Waker>, )100 pub fn queue_frame<B>( 101 &mut self, 102 frame: Frame<B>, 103 buffer: &mut Buffer<Frame<B>>, 104 stream: &mut store::Ptr, 105 task: &mut Option<Waker>, 106 ) { 107 let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); 108 let _e = span.enter(); 109 // Queue the frame in the buffer 110 stream.pending_send.push_back(buffer, frame); 111 self.schedule_send(stream, task); 112 } 113 schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)114 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 115 // If the stream is waiting to be opened, nothing more to do. 116 if stream.is_send_ready() { 117 tracing::trace!(?stream.id, "schedule_send"); 118 // Queue the stream 119 self.pending_send.push(stream); 120 121 // Notify the connection. 122 if let Some(task) = task.take() { 123 task.wake(); 124 } 125 } 126 } 127 queue_open(&mut self, stream: &mut store::Ptr)128 pub fn queue_open(&mut self, stream: &mut store::Ptr) { 129 self.pending_open.push(stream); 130 } 131 132 /// Send a data frame send_data<B>( &mut self, frame: frame::Data<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option<Waker>, ) -> Result<(), UserError> where B: Buf,133 pub fn send_data<B>( 134 &mut self, 135 frame: frame::Data<B>, 136 buffer: &mut Buffer<Frame<B>>, 137 stream: &mut store::Ptr, 138 counts: &mut Counts, 139 task: &mut Option<Waker>, 140 ) -> Result<(), UserError> 141 where 142 B: Buf, 143 { 144 let sz = frame.payload().remaining(); 145 146 if sz > MAX_WINDOW_SIZE as usize { 147 return Err(UserError::PayloadTooBig); 148 } 149 150 let sz = sz as WindowSize; 151 152 if !stream.state.is_send_streaming() { 153 if stream.state.is_closed() { 154 return Err(InactiveStreamId); 155 } else { 156 return Err(UnexpectedFrameType); 157 } 158 } 159 160 // Update the buffered data counter 161 stream.buffered_send_data += sz; 162 163 let span = 164 tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); 165 let _e = span.enter(); 166 tracing::trace!(buffered = stream.buffered_send_data); 167 168 // Implicitly request more send capacity if not enough has been 169 // requested yet. 170 if stream.requested_send_capacity < stream.buffered_send_data { 171 // Update the target requested capacity 172 stream.requested_send_capacity = stream.buffered_send_data; 173 174 self.try_assign_capacity(stream); 175 } 176 177 if frame.is_end_stream() { 178 stream.state.send_close(); 179 self.reserve_capacity(0, stream, counts); 180 } 181 182 tracing::trace!( 183 available = %stream.send_flow.available(), 184 buffered = stream.buffered_send_data, 185 ); 186 187 // The `stream.buffered_send_data == 0` check is here so that, if a zero 188 // length data frame is queued to the front (there is no previously 189 // queued data), it gets sent out immediately even if there is no 190 // available send window. 191 // 192 // Sending out zero length data frames can be done to signal 193 // end-of-stream. 194 // 195 if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { 196 // The stream currently has capacity to send the data frame, so 197 // queue it up and notify the connection task. 198 self.queue_frame(frame.into(), buffer, stream, task); 199 } else { 200 // The stream has no capacity to send the frame now, save it but 201 // don't notify the connection task. Once additional capacity 202 // becomes available, the frame will be flushed. 203 stream.pending_send.push_back(buffer, frame.into()); 204 } 205 206 Ok(()) 207 } 208 209 /// Request capacity to send data reserve_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, counts: &mut Counts, )210 pub fn reserve_capacity( 211 &mut self, 212 capacity: WindowSize, 213 stream: &mut store::Ptr, 214 counts: &mut Counts, 215 ) { 216 let span = tracing::trace_span!( 217 "reserve_capacity", 218 ?stream.id, 219 requested = capacity, 220 effective = capacity + stream.buffered_send_data, 221 curr = stream.requested_send_capacity 222 ); 223 let _e = span.enter(); 224 225 // Actual capacity is `capacity` + the current amount of buffered data. 226 // If it were less, then we could never send out the buffered data. 227 let capacity = capacity + stream.buffered_send_data; 228 229 if capacity == stream.requested_send_capacity { 230 // Nothing to do 231 } else if capacity < stream.requested_send_capacity { 232 // Update the target requested capacity 233 stream.requested_send_capacity = capacity; 234 235 // Currently available capacity assigned to the stream 236 let available = stream.send_flow.available().as_size(); 237 238 // If the stream has more assigned capacity than requested, reclaim 239 // some for the connection 240 if available > capacity { 241 let diff = available - capacity; 242 243 stream.send_flow.claim_capacity(diff); 244 245 self.assign_connection_capacity(diff, stream, counts); 246 } 247 } else { 248 // If trying to *add* capacity, but the stream send side is closed, 249 // there's nothing to be done. 250 if stream.state.is_send_closed() { 251 return; 252 } 253 254 // Update the target requested capacity 255 stream.requested_send_capacity = capacity; 256 257 // Try to assign additional capacity to the stream. If none is 258 // currently available, the stream will be queued to receive some 259 // when more becomes available. 260 self.try_assign_capacity(stream); 261 } 262 } 263 recv_stream_window_update( &mut self, inc: WindowSize, stream: &mut store::Ptr, ) -> Result<(), Reason>264 pub fn recv_stream_window_update( 265 &mut self, 266 inc: WindowSize, 267 stream: &mut store::Ptr, 268 ) -> Result<(), Reason> { 269 let span = tracing::trace_span!( 270 "recv_stream_window_update", 271 ?stream.id, 272 ?stream.state, 273 inc, 274 flow = ?stream.send_flow 275 ); 276 let _e = span.enter(); 277 278 if stream.state.is_send_closed() && stream.buffered_send_data == 0 { 279 // We can't send any data, so don't bother doing anything else. 280 return Ok(()); 281 } 282 283 // Update the stream level flow control. 284 stream.send_flow.inc_window(inc)?; 285 286 // If the stream is waiting on additional capacity, then this will 287 // assign it (if available on the connection) and notify the producer 288 self.try_assign_capacity(stream); 289 290 Ok(()) 291 } 292 recv_connection_window_update( &mut self, inc: WindowSize, store: &mut Store, counts: &mut Counts, ) -> Result<(), Reason>293 pub fn recv_connection_window_update( 294 &mut self, 295 inc: WindowSize, 296 store: &mut Store, 297 counts: &mut Counts, 298 ) -> Result<(), Reason> { 299 // Update the connection's window 300 self.flow.inc_window(inc)?; 301 302 self.assign_connection_capacity(inc, store, counts); 303 Ok(()) 304 } 305 306 /// Reclaim all capacity assigned to the stream and re-assign it to the 307 /// connection reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)308 pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 309 let available = stream.send_flow.available().as_size(); 310 stream.send_flow.claim_capacity(available); 311 // Re-assign all capacity to the connection 312 self.assign_connection_capacity(available, stream, counts); 313 } 314 315 /// Reclaim just reserved capacity, not buffered capacity, and re-assign 316 /// it to the connection reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)317 pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 318 // only reclaim requested capacity that isn't already buffered 319 if stream.requested_send_capacity > stream.buffered_send_data { 320 let reserved = stream.requested_send_capacity - stream.buffered_send_data; 321 322 stream.send_flow.claim_capacity(reserved); 323 self.assign_connection_capacity(reserved, stream, counts); 324 } 325 } 326 clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts)327 pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { 328 let span = tracing::trace_span!("clear_pending_capacity"); 329 let _e = span.enter(); 330 while let Some(stream) = self.pending_capacity.pop(store) { 331 counts.transition(stream, |_, stream| { 332 tracing::trace!(?stream.id, "clear_pending_capacity"); 333 }) 334 } 335 } 336 assign_connection_capacity<R>( &mut self, inc: WindowSize, store: &mut R, counts: &mut Counts, ) where R: Resolve,337 pub fn assign_connection_capacity<R>( 338 &mut self, 339 inc: WindowSize, 340 store: &mut R, 341 counts: &mut Counts, 342 ) where 343 R: Resolve, 344 { 345 let span = tracing::trace_span!("assign_connection_capacity", inc); 346 let _e = span.enter(); 347 348 self.flow.assign_capacity(inc); 349 350 // Assign newly acquired capacity to streams pending capacity. 351 while self.flow.available() > 0 { 352 let stream = match self.pending_capacity.pop(store) { 353 Some(stream) => stream, 354 None => return, 355 }; 356 357 // Streams pending capacity may have been reset before capacity 358 // became available. In that case, the stream won't want any 359 // capacity, and so we shouldn't "transition" on it, but just evict 360 // it and continue the loop. 361 if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { 362 continue; 363 } 364 365 counts.transition(stream, |_, mut stream| { 366 // Try to assign capacity to the stream. This will also re-queue the 367 // stream if there isn't enough connection level capacity to fulfill 368 // the capacity request. 369 self.try_assign_capacity(&mut stream); 370 }) 371 } 372 } 373 374 /// Request capacity to send data try_assign_capacity(&mut self, stream: &mut store::Ptr)375 fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { 376 let total_requested = stream.requested_send_capacity; 377 378 // Total requested should never go below actual assigned 379 // (Note: the window size can go lower than assigned) 380 debug_assert!(total_requested >= stream.send_flow.available()); 381 382 // The amount of additional capacity that the stream requests. 383 // Don't assign more than the window has available! 384 let additional = cmp::min( 385 total_requested - stream.send_flow.available().as_size(), 386 // Can't assign more than what is available 387 stream.send_flow.window_size() - stream.send_flow.available().as_size(), 388 ); 389 let span = tracing::trace_span!("try_assign_capacity", ?stream.id); 390 let _e = span.enter(); 391 tracing::trace!( 392 requested = total_requested, 393 additional, 394 buffered = stream.buffered_send_data, 395 window = stream.send_flow.window_size(), 396 conn = %self.flow.available() 397 ); 398 399 if additional == 0 { 400 // Nothing more to do 401 return; 402 } 403 404 // If the stream has requested capacity, then it must be in the 405 // streaming state (more data could be sent) or there is buffered data 406 // waiting to be sent. 407 debug_assert!( 408 stream.state.is_send_streaming() || stream.buffered_send_data > 0, 409 "state={:?}", 410 stream.state 411 ); 412 413 // The amount of currently available capacity on the connection 414 let conn_available = self.flow.available().as_size(); 415 416 // First check if capacity is immediately available 417 if conn_available > 0 { 418 // The amount of capacity to assign to the stream 419 // TODO: Should prioritization factor into this? 420 let assign = cmp::min(conn_available, additional); 421 422 tracing::trace!(capacity = assign, "assigning"); 423 424 // Assign the capacity to the stream 425 stream.assign_capacity(assign); 426 427 // Claim the capacity from the connection 428 self.flow.claim_capacity(assign); 429 } 430 431 tracing::trace!( 432 available = %stream.send_flow.available(), 433 requested = stream.requested_send_capacity, 434 buffered = stream.buffered_send_data, 435 has_unavailable = %stream.send_flow.has_unavailable() 436 ); 437 438 if stream.send_flow.available() < stream.requested_send_capacity 439 && stream.send_flow.has_unavailable() 440 { 441 // The stream requires additional capacity and the stream's 442 // window has available capacity, but the connection window 443 // does not. 444 // 445 // In this case, the stream needs to be queued up for when the 446 // connection has more capacity. 447 self.pending_capacity.push(stream); 448 } 449 450 // If data is buffered and the stream is send ready, then 451 // schedule the stream for execution 452 if stream.buffered_send_data > 0 && stream.is_send_ready() { 453 // TODO: This assertion isn't *exactly* correct. There can still be 454 // buffered send data while the stream's pending send queue is 455 // empty. This can happen when a large data frame is in the process 456 // of being **partially** sent. Once the window has been sent, the 457 // data frame will be returned to the prioritization layer to be 458 // re-scheduled. 459 // 460 // That said, it would be nice to figure out how to make this 461 // assertion correctly. 462 // 463 // debug_assert!(!stream.pending_send.is_empty()); 464 465 self.pending_send.push(stream); 466 } 467 } 468 poll_complete<T, B>( &mut self, cx: &mut Context, buffer: &mut Buffer<Frame<B>>, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,469 pub fn poll_complete<T, B>( 470 &mut self, 471 cx: &mut Context, 472 buffer: &mut Buffer<Frame<B>>, 473 store: &mut Store, 474 counts: &mut Counts, 475 dst: &mut Codec<T, Prioritized<B>>, 476 ) -> Poll<io::Result<()>> 477 where 478 T: AsyncWrite + Unpin, 479 B: Buf, 480 { 481 // Ensure codec is ready 482 ready!(dst.poll_ready(cx))?; 483 484 // Reclaim any frame that has previously been written 485 self.reclaim_frame(buffer, store, dst); 486 487 // The max frame length 488 let max_frame_len = dst.max_send_frame_size(); 489 490 tracing::trace!("poll_complete"); 491 492 loop { 493 self.schedule_pending_open(store, counts); 494 495 match self.pop_frame(buffer, store, max_frame_len, counts) { 496 Some(frame) => { 497 tracing::trace!(?frame, "writing"); 498 499 debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); 500 if let Frame::Data(ref frame) = frame { 501 self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); 502 } 503 dst.buffer(frame).expect("invalid frame"); 504 505 // Ensure the codec is ready to try the loop again. 506 ready!(dst.poll_ready(cx))?; 507 508 // Because, always try to reclaim... 509 self.reclaim_frame(buffer, store, dst); 510 } 511 None => { 512 // Try to flush the codec. 513 ready!(dst.flush(cx))?; 514 515 // This might release a data frame... 516 if !self.reclaim_frame(buffer, store, dst) { 517 return Poll::Ready(Ok(())); 518 } 519 520 // No need to poll ready as poll_complete() does this for 521 // us... 522 } 523 } 524 } 525 } 526 527 /// Tries to reclaim a pending data frame from the codec. 528 /// 529 /// Returns true if a frame was reclaimed. 530 /// 531 /// When a data frame is written to the codec, it may not be written in its 532 /// entirety (large chunks are split up into potentially many data frames). 533 /// In this case, the stream needs to be reprioritized. reclaim_frame<T, B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, dst: &mut Codec<T, Prioritized<B>>, ) -> bool where B: Buf,534 fn reclaim_frame<T, B>( 535 &mut self, 536 buffer: &mut Buffer<Frame<B>>, 537 store: &mut Store, 538 dst: &mut Codec<T, Prioritized<B>>, 539 ) -> bool 540 where 541 B: Buf, 542 { 543 let span = tracing::trace_span!("try_reclaim_frame"); 544 let _e = span.enter(); 545 546 // First check if there are any data chunks to take back 547 if let Some(frame) = dst.take_last_data_frame() { 548 self.reclaim_frame_inner(buffer, store, frame) 549 } else { 550 false 551 } 552 } 553 reclaim_frame_inner<B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, frame: frame::Data<Prioritized<B>>, ) -> bool where B: Buf,554 fn reclaim_frame_inner<B>( 555 &mut self, 556 buffer: &mut Buffer<Frame<B>>, 557 store: &mut Store, 558 frame: frame::Data<Prioritized<B>>, 559 ) -> bool 560 where 561 B: Buf, 562 { 563 tracing::trace!( 564 ?frame, 565 sz = frame.payload().inner.get_ref().remaining(), 566 "reclaimed" 567 ); 568 569 let mut eos = false; 570 let key = frame.payload().stream; 571 572 match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { 573 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), 574 InFlightData::Drop => { 575 tracing::trace!("not reclaiming frame for cancelled stream"); 576 return false; 577 } 578 InFlightData::DataFrame(k) => { 579 debug_assert_eq!(k, key); 580 } 581 } 582 583 let mut frame = frame.map(|prioritized| { 584 // TODO: Ensure fully written 585 eos = prioritized.end_of_stream; 586 prioritized.inner.into_inner() 587 }); 588 589 if frame.payload().has_remaining() { 590 let mut stream = store.resolve(key); 591 592 if eos { 593 frame.set_end_stream(true); 594 } 595 596 self.push_back_frame(frame.into(), buffer, &mut stream); 597 598 return true; 599 } 600 601 false 602 } 603 604 /// Push the frame to the front of the stream's deque, scheduling the 605 /// stream if needed. push_back_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, )606 fn push_back_frame<B>( 607 &mut self, 608 frame: Frame<B>, 609 buffer: &mut Buffer<Frame<B>>, 610 stream: &mut store::Ptr, 611 ) { 612 // Push the frame to the front of the stream's deque 613 stream.pending_send.push_front(buffer, frame); 614 615 // If needed, schedule the sender 616 if stream.send_flow.available() > 0 { 617 debug_assert!(!stream.pending_send.is_empty()); 618 self.pending_send.push(stream); 619 } 620 } 621 clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr)622 pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { 623 let span = tracing::trace_span!("clear_queue", ?stream.id); 624 let _e = span.enter(); 625 626 // TODO: make this more efficient? 627 while let Some(frame) = stream.pending_send.pop_front(buffer) { 628 tracing::trace!(?frame, "dropping"); 629 } 630 631 stream.buffered_send_data = 0; 632 stream.requested_send_capacity = 0; 633 if let InFlightData::DataFrame(key) = self.in_flight_data_frame { 634 if stream.key() == key { 635 // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. 636 self.in_flight_data_frame = InFlightData::Drop; 637 } 638 } 639 } 640 clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts)641 pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { 642 while let Some(stream) = self.pending_send.pop(store) { 643 let is_pending_reset = stream.is_pending_reset_expiration(); 644 counts.transition_after(stream, is_pending_reset); 645 } 646 } 647 clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts)648 pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { 649 while let Some(stream) = self.pending_open.pop(store) { 650 let is_pending_reset = stream.is_pending_reset_expiration(); 651 counts.transition_after(stream, is_pending_reset); 652 } 653 } 654 pop_frame<B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, max_len: usize, counts: &mut Counts, ) -> Option<Frame<Prioritized<B>>> where B: Buf,655 fn pop_frame<B>( 656 &mut self, 657 buffer: &mut Buffer<Frame<B>>, 658 store: &mut Store, 659 max_len: usize, 660 counts: &mut Counts, 661 ) -> Option<Frame<Prioritized<B>>> 662 where 663 B: Buf, 664 { 665 let span = tracing::trace_span!("pop_frame"); 666 let _e = span.enter(); 667 668 loop { 669 match self.pending_send.pop(store) { 670 Some(mut stream) => { 671 let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); 672 let _e = span.enter(); 673 674 // It's possible that this stream, besides having data to send, 675 // is also queued to send a reset, and thus is already in the queue 676 // to wait for "some time" after a reset. 677 // 678 // To be safe, we just always ask the stream. 679 let is_pending_reset = stream.is_pending_reset_expiration(); 680 681 tracing::trace!(is_pending_reset); 682 683 let frame = match stream.pending_send.pop_front(buffer) { 684 Some(Frame::Data(mut frame)) => { 685 // Get the amount of capacity remaining for stream's 686 // window. 687 let stream_capacity = stream.send_flow.available(); 688 let sz = frame.payload().remaining(); 689 690 tracing::trace!( 691 sz, 692 eos = frame.is_end_stream(), 693 window = %stream_capacity, 694 available = %stream.send_flow.available(), 695 requested = stream.requested_send_capacity, 696 buffered = stream.buffered_send_data, 697 "data frame" 698 ); 699 700 // Zero length data frames always have capacity to 701 // be sent. 702 if sz > 0 && stream_capacity == 0 { 703 tracing::trace!("stream capacity is 0"); 704 705 // Ensure that the stream is waiting for 706 // connection level capacity 707 // 708 // TODO: uncomment 709 // debug_assert!(stream.is_pending_send_capacity); 710 711 // The stream has no more capacity, this can 712 // happen if the remote reduced the stream 713 // window. In this case, we need to buffer the 714 // frame and wait for a window update... 715 stream.pending_send.push_front(buffer, frame.into()); 716 717 continue; 718 } 719 720 // Only send up to the max frame length 721 let len = cmp::min(sz, max_len); 722 723 // Only send up to the stream's window capacity 724 let len = 725 cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; 726 727 // There *must* be be enough connection level 728 // capacity at this point. 729 debug_assert!(len <= self.flow.window_size()); 730 731 tracing::trace!(len, "sending data frame"); 732 733 // Update the flow control 734 tracing::trace_span!("updating stream flow").in_scope(|| { 735 stream.send_flow.send_data(len); 736 737 // Decrement the stream's buffered data counter 738 debug_assert!(stream.buffered_send_data >= len); 739 stream.buffered_send_data -= len; 740 stream.requested_send_capacity -= len; 741 742 // Assign the capacity back to the connection that 743 // was just consumed from the stream in the previous 744 // line. 745 self.flow.assign_capacity(len); 746 }); 747 748 let (eos, len) = tracing::trace_span!("updating connection flow") 749 .in_scope(|| { 750 self.flow.send_data(len); 751 752 // Wrap the frame's data payload to ensure that the 753 // correct amount of data gets written. 754 755 let eos = frame.is_end_stream(); 756 let len = len as usize; 757 758 if frame.payload().remaining() > len { 759 frame.set_end_stream(false); 760 } 761 (eos, len) 762 }); 763 764 Frame::Data(frame.map(|buf| Prioritized { 765 inner: buf.take(len), 766 end_of_stream: eos, 767 stream: stream.key(), 768 })) 769 } 770 Some(Frame::PushPromise(pp)) => { 771 let mut pushed = 772 stream.store_mut().find_mut(&pp.promised_id()).unwrap(); 773 pushed.is_pending_push = false; 774 // Transition stream from pending_push to pending_open 775 // if possible 776 if !pushed.pending_send.is_empty() { 777 if counts.can_inc_num_send_streams() { 778 counts.inc_num_send_streams(&mut pushed); 779 self.pending_send.push(&mut pushed); 780 } else { 781 self.queue_open(&mut pushed); 782 } 783 } 784 Frame::PushPromise(pp) 785 } 786 Some(frame) => frame.map(|_| { 787 unreachable!( 788 "Frame::map closure will only be called \ 789 on DATA frames." 790 ) 791 }), 792 None => { 793 if let Some(reason) = stream.state.get_scheduled_reset() { 794 stream.state.set_reset(reason); 795 796 let frame = frame::Reset::new(stream.id, reason); 797 Frame::Reset(frame) 798 } else { 799 // If the stream receives a RESET from the peer, it may have 800 // had data buffered to be sent, but all the frames are cleared 801 // in clear_queue(). Instead of doing O(N) traversal through queue 802 // to remove, lets just ignore the stream here. 803 tracing::trace!("removing dangling stream from pending_send"); 804 // Since this should only happen as a consequence of `clear_queue`, 805 // we must be in a closed state of some kind. 806 debug_assert!(stream.state.is_closed()); 807 counts.transition_after(stream, is_pending_reset); 808 continue; 809 } 810 } 811 }; 812 813 tracing::trace!("pop_frame; frame={:?}", frame); 814 815 if cfg!(debug_assertions) && stream.state.is_idle() { 816 debug_assert!(stream.id > self.last_opened_id); 817 self.last_opened_id = stream.id; 818 } 819 820 if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { 821 // TODO: Only requeue the sender IF it is ready to send 822 // the next frame. i.e. don't requeue it if the next 823 // frame is a data frame and the stream does not have 824 // any more capacity. 825 self.pending_send.push(&mut stream); 826 } 827 828 counts.transition_after(stream, is_pending_reset); 829 830 return Some(frame); 831 } 832 None => return None, 833 } 834 } 835 } 836 schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts)837 fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { 838 tracing::trace!("schedule_pending_open"); 839 // check for any pending open streams 840 while counts.can_inc_num_send_streams() { 841 if let Some(mut stream) = self.pending_open.pop(store) { 842 tracing::trace!("schedule_pending_open; stream={:?}", stream.id); 843 844 counts.inc_num_send_streams(&mut stream); 845 self.pending_send.push(&mut stream); 846 stream.notify_send(); 847 } else { 848 return; 849 } 850 } 851 } 852 } 853 854 // ===== impl Prioritized ===== 855 856 impl<B> Buf for Prioritized<B> 857 where 858 B: Buf, 859 { remaining(&self) -> usize860 fn remaining(&self) -> usize { 861 self.inner.remaining() 862 } 863 chunk(&self) -> &[u8]864 fn chunk(&self) -> &[u8] { 865 self.inner.chunk() 866 } 867 chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize868 fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { 869 self.inner.chunks_vectored(dst) 870 } 871 advance(&mut self, cnt: usize)872 fn advance(&mut self, cnt: usize) { 873 self.inner.advance(cnt) 874 } 875 } 876 877 impl<B: Buf> fmt::Debug for Prioritized<B> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result878 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 879 fmt.debug_struct("Prioritized") 880 .field("remaining", &self.inner.get_ref().remaining()) 881 .field("end_of_stream", &self.end_of_stream) 882 .field("stream", &self.stream) 883 .finish() 884 } 885 } 886