1 use super::store::Resolve; 2 use super::*; 3 4 use crate::frame::{Reason, StreamId}; 5 6 use crate::codec::UserError; 7 use crate::codec::UserError::*; 8 9 use bytes::buf::ext::{BufExt, Take}; 10 use std::io; 11 use std::task::{Context, Poll, Waker}; 12 use std::{cmp, fmt, mem}; 13 14 /// # Warning 15 /// 16 /// Queued streams are ordered by stream ID, as we need to ensure that 17 /// lower-numbered streams are sent headers before higher-numbered ones. 18 /// This is because "idle" stream IDs – those which have been initiated but 19 /// have yet to receive frames – will be implicitly closed on receipt of a 20 /// frame on a higher stream ID. If these queues was not ordered by stream 21 /// IDs, some mechanism would be necessary to ensure that the lowest-numberedh] 22 /// idle stream is opened first. 23 #[derive(Debug)] 24 pub(super) struct Prioritize { 25 /// Queue of streams waiting for socket capacity to send a frame. 26 pending_send: store::Queue<stream::NextSend>, 27 28 /// Queue of streams waiting for window capacity to produce data. 29 pending_capacity: store::Queue<stream::NextSendCapacity>, 30 31 /// Streams waiting for capacity due to max concurrency 32 /// 33 /// The `SendRequest` handle is `Clone`. This enables initiating requests 34 /// from many tasks. However, offering this capability while supporting 35 /// backpressure at some level is tricky. If there are many `SendRequest` 36 /// handles and a single stream becomes available, which handle gets 37 /// assigned that stream? Maybe that handle is no longer ready to send a 38 /// request. 39 /// 40 /// The strategy used is to allow each `SendRequest` handle one buffered 41 /// request. A `SendRequest` handle is ready to send a request if it has no 42 /// associated buffered requests. This is the same strategy as `mpsc` in the 43 /// futures library. 44 pending_open: store::Queue<stream::NextOpen>, 45 46 /// Connection level flow control governing sent data 47 flow: FlowControl, 48 49 /// Stream ID of the last stream opened. 50 last_opened_id: StreamId, 51 52 /// What `DATA` frame is currently being sent in the codec. 53 in_flight_data_frame: InFlightData, 54 } 55 56 #[derive(Debug, Eq, PartialEq)] 57 enum InFlightData { 58 /// There is no `DATA` frame in flight. 59 Nothing, 60 /// There is a `DATA` frame in flight belonging to the given stream. 61 DataFrame(store::Key), 62 /// There was a `DATA` frame, but the stream's queue was since cleared. 63 Drop, 64 } 65 66 pub(crate) struct Prioritized<B> { 67 // The buffer 68 inner: Take<B>, 69 70 end_of_stream: bool, 71 72 // The stream that this is associated with 73 stream: store::Key, 74 } 75 76 // ===== impl Prioritize ===== 77 78 impl Prioritize { new(config: &Config) -> Prioritize79 pub fn new(config: &Config) -> Prioritize { 80 let mut flow = FlowControl::new(); 81 82 flow.inc_window(config.remote_init_window_sz) 83 .expect("invalid initial window size"); 84 85 flow.assign_capacity(config.remote_init_window_sz); 86 87 log::trace!("Prioritize::new; flow={:?}", flow); 88 89 Prioritize { 90 pending_send: store::Queue::new(), 91 pending_capacity: store::Queue::new(), 92 pending_open: store::Queue::new(), 93 flow, 94 last_opened_id: StreamId::ZERO, 95 in_flight_data_frame: InFlightData::Nothing, 96 } 97 } 98 99 /// Queue a frame to be sent to the remote queue_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, task: &mut Option<Waker>, )100 pub fn queue_frame<B>( 101 &mut self, 102 frame: Frame<B>, 103 buffer: &mut Buffer<Frame<B>>, 104 stream: &mut store::Ptr, 105 task: &mut Option<Waker>, 106 ) { 107 // Queue the frame in the buffer 108 stream.pending_send.push_back(buffer, frame); 109 self.schedule_send(stream, task); 110 } 111 schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)112 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { 113 // If the stream is waiting to be opened, nothing more to do. 114 if stream.is_send_ready() { 115 log::trace!("schedule_send; {:?}", stream.id); 116 // Queue the stream 117 self.pending_send.push(stream); 118 119 // Notify the connection. 120 if let Some(task) = task.take() { 121 task.wake(); 122 } 123 } 124 } 125 queue_open(&mut self, stream: &mut store::Ptr)126 pub fn queue_open(&mut self, stream: &mut store::Ptr) { 127 self.pending_open.push(stream); 128 } 129 130 /// Send a data frame send_data<B>( &mut self, frame: frame::Data<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option<Waker>, ) -> Result<(), UserError> where B: Buf,131 pub fn send_data<B>( 132 &mut self, 133 frame: frame::Data<B>, 134 buffer: &mut Buffer<Frame<B>>, 135 stream: &mut store::Ptr, 136 counts: &mut Counts, 137 task: &mut Option<Waker>, 138 ) -> Result<(), UserError> 139 where 140 B: Buf, 141 { 142 let sz = frame.payload().remaining(); 143 144 if sz > MAX_WINDOW_SIZE as usize { 145 return Err(UserError::PayloadTooBig); 146 } 147 148 let sz = sz as WindowSize; 149 150 if !stream.state.is_send_streaming() { 151 if stream.state.is_closed() { 152 return Err(InactiveStreamId); 153 } else { 154 return Err(UnexpectedFrameType); 155 } 156 } 157 158 // Update the buffered data counter 159 stream.buffered_send_data += sz; 160 161 log::trace!( 162 "send_data; sz={}; buffered={}; requested={}", 163 sz, 164 stream.buffered_send_data, 165 stream.requested_send_capacity 166 ); 167 168 // Implicitly request more send capacity if not enough has been 169 // requested yet. 170 if stream.requested_send_capacity < stream.buffered_send_data { 171 // Update the target requested capacity 172 stream.requested_send_capacity = stream.buffered_send_data; 173 174 self.try_assign_capacity(stream); 175 } 176 177 if frame.is_end_stream() { 178 stream.state.send_close(); 179 self.reserve_capacity(0, stream, counts); 180 } 181 182 log::trace!( 183 "send_data (2); available={}; buffered={}", 184 stream.send_flow.available(), 185 stream.buffered_send_data 186 ); 187 188 // The `stream.buffered_send_data == 0` check is here so that, if a zero 189 // length data frame is queued to the front (there is no previously 190 // queued data), it gets sent out immediately even if there is no 191 // available send window. 192 // 193 // Sending out zero length data frames can be done to signal 194 // end-of-stream. 195 // 196 if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { 197 // The stream currently has capacity to send the data frame, so 198 // queue it up and notify the connection task. 199 self.queue_frame(frame.into(), buffer, stream, task); 200 } else { 201 // The stream has no capacity to send the frame now, save it but 202 // don't notify the connection task. Once additional capacity 203 // becomes available, the frame will be flushed. 204 stream.pending_send.push_back(buffer, frame.into()); 205 } 206 207 Ok(()) 208 } 209 210 /// Request capacity to send data reserve_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, counts: &mut Counts, )211 pub fn reserve_capacity( 212 &mut self, 213 capacity: WindowSize, 214 stream: &mut store::Ptr, 215 counts: &mut Counts, 216 ) { 217 log::trace!( 218 "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", 219 stream.id, 220 capacity, 221 capacity + stream.buffered_send_data, 222 stream.requested_send_capacity 223 ); 224 225 // Actual capacity is `capacity` + the current amount of buffered data. 226 // If it were less, then we could never send out the buffered data. 227 let capacity = capacity + stream.buffered_send_data; 228 229 if capacity == stream.requested_send_capacity { 230 // Nothing to do 231 } else if capacity < stream.requested_send_capacity { 232 // Update the target requested capacity 233 stream.requested_send_capacity = capacity; 234 235 // Currently available capacity assigned to the stream 236 let available = stream.send_flow.available().as_size(); 237 238 // If the stream has more assigned capacity than requested, reclaim 239 // some for the connection 240 if available > capacity { 241 let diff = available - capacity; 242 243 stream.send_flow.claim_capacity(diff); 244 245 self.assign_connection_capacity(diff, stream, counts); 246 } 247 } else { 248 // If trying to *add* capacity, but the stream send side is closed, 249 // there's nothing to be done. 250 if stream.state.is_send_closed() { 251 return; 252 } 253 254 // Update the target requested capacity 255 stream.requested_send_capacity = capacity; 256 257 // Try to assign additional capacity to the stream. If none is 258 // currently available, the stream will be queued to receive some 259 // when more becomes available. 260 self.try_assign_capacity(stream); 261 } 262 } 263 recv_stream_window_update( &mut self, inc: WindowSize, stream: &mut store::Ptr, ) -> Result<(), Reason>264 pub fn recv_stream_window_update( 265 &mut self, 266 inc: WindowSize, 267 stream: &mut store::Ptr, 268 ) -> Result<(), Reason> { 269 log::trace!( 270 "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", 271 stream.id, 272 stream.state, 273 inc, 274 stream.send_flow 275 ); 276 277 if stream.state.is_send_closed() && stream.buffered_send_data == 0 { 278 // We can't send any data, so don't bother doing anything else. 279 return Ok(()); 280 } 281 282 // Update the stream level flow control. 283 stream.send_flow.inc_window(inc)?; 284 285 // If the stream is waiting on additional capacity, then this will 286 // assign it (if available on the connection) and notify the producer 287 self.try_assign_capacity(stream); 288 289 Ok(()) 290 } 291 recv_connection_window_update( &mut self, inc: WindowSize, store: &mut Store, counts: &mut Counts, ) -> Result<(), Reason>292 pub fn recv_connection_window_update( 293 &mut self, 294 inc: WindowSize, 295 store: &mut Store, 296 counts: &mut Counts, 297 ) -> Result<(), Reason> { 298 // Update the connection's window 299 self.flow.inc_window(inc)?; 300 301 self.assign_connection_capacity(inc, store, counts); 302 Ok(()) 303 } 304 305 /// Reclaim all capacity assigned to the stream and re-assign it to the 306 /// connection reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)307 pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 308 let available = stream.send_flow.available().as_size(); 309 stream.send_flow.claim_capacity(available); 310 // Re-assign all capacity to the connection 311 self.assign_connection_capacity(available, stream, counts); 312 } 313 314 /// Reclaim just reserved capacity, not buffered capacity, and re-assign 315 /// it to the connection reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)316 pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { 317 // only reclaim requested capacity that isn't already buffered 318 if stream.requested_send_capacity > stream.buffered_send_data { 319 let reserved = stream.requested_send_capacity - stream.buffered_send_data; 320 321 stream.send_flow.claim_capacity(reserved); 322 self.assign_connection_capacity(reserved, stream, counts); 323 } 324 } 325 clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts)326 pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { 327 while let Some(stream) = self.pending_capacity.pop(store) { 328 counts.transition(stream, |_, stream| { 329 log::trace!("clear_pending_capacity; stream={:?}", stream.id); 330 }) 331 } 332 } 333 assign_connection_capacity<R>( &mut self, inc: WindowSize, store: &mut R, counts: &mut Counts, ) where R: Resolve,334 pub fn assign_connection_capacity<R>( 335 &mut self, 336 inc: WindowSize, 337 store: &mut R, 338 counts: &mut Counts, 339 ) where 340 R: Resolve, 341 { 342 log::trace!("assign_connection_capacity; inc={}", inc); 343 344 self.flow.assign_capacity(inc); 345 346 // Assign newly acquired capacity to streams pending capacity. 347 while self.flow.available() > 0 { 348 let stream = match self.pending_capacity.pop(store) { 349 Some(stream) => stream, 350 None => return, 351 }; 352 353 // Streams pending capacity may have been reset before capacity 354 // became available. In that case, the stream won't want any 355 // capacity, and so we shouldn't "transition" on it, but just evict 356 // it and continue the loop. 357 if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { 358 continue; 359 } 360 361 counts.transition(stream, |_, mut stream| { 362 // Try to assign capacity to the stream. This will also re-queue the 363 // stream if there isn't enough connection level capacity to fulfill 364 // the capacity request. 365 self.try_assign_capacity(&mut stream); 366 }) 367 } 368 } 369 370 /// Request capacity to send data try_assign_capacity(&mut self, stream: &mut store::Ptr)371 fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { 372 let total_requested = stream.requested_send_capacity; 373 374 // Total requested should never go below actual assigned 375 // (Note: the window size can go lower than assigned) 376 debug_assert!(total_requested >= stream.send_flow.available()); 377 378 // The amount of additional capacity that the stream requests. 379 // Don't assign more than the window has available! 380 let additional = cmp::min( 381 total_requested - stream.send_flow.available().as_size(), 382 // Can't assign more than what is available 383 stream.send_flow.window_size() - stream.send_flow.available().as_size(), 384 ); 385 386 log::trace!( 387 "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}", 388 stream.id, 389 total_requested, 390 additional, 391 stream.buffered_send_data, 392 stream.send_flow.window_size(), 393 self.flow.available() 394 ); 395 396 if additional == 0 { 397 // Nothing more to do 398 return; 399 } 400 401 // If the stream has requested capacity, then it must be in the 402 // streaming state (more data could be sent) or there is buffered data 403 // waiting to be sent. 404 debug_assert!( 405 stream.state.is_send_streaming() || stream.buffered_send_data > 0, 406 "state={:?}", 407 stream.state 408 ); 409 410 // The amount of currently available capacity on the connection 411 let conn_available = self.flow.available().as_size(); 412 413 // First check if capacity is immediately available 414 if conn_available > 0 { 415 // The amount of capacity to assign to the stream 416 // TODO: Should prioritization factor into this? 417 let assign = cmp::min(conn_available, additional); 418 419 log::trace!(" assigning; stream={:?}, capacity={}", stream.id, assign,); 420 421 // Assign the capacity to the stream 422 stream.assign_capacity(assign); 423 424 // Claim the capacity from the connection 425 self.flow.claim_capacity(assign); 426 } 427 428 log::trace!( 429 "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}", 430 stream.send_flow.available(), 431 stream.requested_send_capacity, 432 stream.buffered_send_data, 433 stream.send_flow.has_unavailable() 434 ); 435 436 if stream.send_flow.available() < stream.requested_send_capacity 437 && stream.send_flow.has_unavailable() 438 { 439 // The stream requires additional capacity and the stream's 440 // window has available capacity, but the connection window 441 // does not. 442 // 443 // In this case, the stream needs to be queued up for when the 444 // connection has more capacity. 445 self.pending_capacity.push(stream); 446 } 447 448 // If data is buffered and the stream is send ready, then 449 // schedule the stream for execution 450 if stream.buffered_send_data > 0 && stream.is_send_ready() { 451 // TODO: This assertion isn't *exactly* correct. There can still be 452 // buffered send data while the stream's pending send queue is 453 // empty. This can happen when a large data frame is in the process 454 // of being **partially** sent. Once the window has been sent, the 455 // data frame will be returned to the prioritization layer to be 456 // re-scheduled. 457 // 458 // That said, it would be nice to figure out how to make this 459 // assertion correctly. 460 // 461 // debug_assert!(!stream.pending_send.is_empty()); 462 463 self.pending_send.push(stream); 464 } 465 } 466 poll_complete<T, B>( &mut self, cx: &mut Context, buffer: &mut Buffer<Frame<B>>, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,467 pub fn poll_complete<T, B>( 468 &mut self, 469 cx: &mut Context, 470 buffer: &mut Buffer<Frame<B>>, 471 store: &mut Store, 472 counts: &mut Counts, 473 dst: &mut Codec<T, Prioritized<B>>, 474 ) -> Poll<io::Result<()>> 475 where 476 T: AsyncWrite + Unpin, 477 B: Buf, 478 { 479 // Ensure codec is ready 480 ready!(dst.poll_ready(cx))?; 481 482 // Reclaim any frame that has previously been written 483 self.reclaim_frame(buffer, store, dst); 484 485 // The max frame length 486 let max_frame_len = dst.max_send_frame_size(); 487 488 log::trace!("poll_complete"); 489 490 loop { 491 self.schedule_pending_open(store, counts); 492 493 match self.pop_frame(buffer, store, max_frame_len, counts) { 494 Some(frame) => { 495 log::trace!("writing frame={:?}", frame); 496 497 debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); 498 if let Frame::Data(ref frame) = frame { 499 self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); 500 } 501 dst.buffer(frame).expect("invalid frame"); 502 503 // Ensure the codec is ready to try the loop again. 504 ready!(dst.poll_ready(cx))?; 505 506 // Because, always try to reclaim... 507 self.reclaim_frame(buffer, store, dst); 508 } 509 None => { 510 // Try to flush the codec. 511 ready!(dst.flush(cx))?; 512 513 // This might release a data frame... 514 if !self.reclaim_frame(buffer, store, dst) { 515 return Poll::Ready(Ok(())); 516 } 517 518 // No need to poll ready as poll_complete() does this for 519 // us... 520 } 521 } 522 } 523 } 524 525 /// Tries to reclaim a pending data frame from the codec. 526 /// 527 /// Returns true if a frame was reclaimed. 528 /// 529 /// When a data frame is written to the codec, it may not be written in its 530 /// entirety (large chunks are split up into potentially many data frames). 531 /// In this case, the stream needs to be reprioritized. reclaim_frame<T, B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, dst: &mut Codec<T, Prioritized<B>>, ) -> bool where B: Buf,532 fn reclaim_frame<T, B>( 533 &mut self, 534 buffer: &mut Buffer<Frame<B>>, 535 store: &mut Store, 536 dst: &mut Codec<T, Prioritized<B>>, 537 ) -> bool 538 where 539 B: Buf, 540 { 541 log::trace!("try reclaim frame"); 542 543 // First check if there are any data chunks to take back 544 if let Some(frame) = dst.take_last_data_frame() { 545 log::trace!( 546 " -> reclaimed; frame={:?}; sz={}", 547 frame, 548 frame.payload().inner.get_ref().remaining() 549 ); 550 551 let mut eos = false; 552 let key = frame.payload().stream; 553 554 match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { 555 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), 556 InFlightData::Drop => { 557 log::trace!("not reclaiming frame for cancelled stream"); 558 return false; 559 } 560 InFlightData::DataFrame(k) => { 561 debug_assert_eq!(k, key); 562 } 563 } 564 565 let mut frame = frame.map(|prioritized| { 566 // TODO: Ensure fully written 567 eos = prioritized.end_of_stream; 568 prioritized.inner.into_inner() 569 }); 570 571 if frame.payload().has_remaining() { 572 let mut stream = store.resolve(key); 573 574 if eos { 575 frame.set_end_stream(true); 576 } 577 578 self.push_back_frame(frame.into(), buffer, &mut stream); 579 580 return true; 581 } 582 } 583 584 false 585 } 586 587 /// Push the frame to the front of the stream's deque, scheduling the 588 /// stream if needed. push_back_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, )589 fn push_back_frame<B>( 590 &mut self, 591 frame: Frame<B>, 592 buffer: &mut Buffer<Frame<B>>, 593 stream: &mut store::Ptr, 594 ) { 595 // Push the frame to the front of the stream's deque 596 stream.pending_send.push_front(buffer, frame); 597 598 // If needed, schedule the sender 599 if stream.send_flow.available() > 0 { 600 debug_assert!(!stream.pending_send.is_empty()); 601 self.pending_send.push(stream); 602 } 603 } 604 clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr)605 pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { 606 log::trace!("clear_queue; stream={:?}", stream.id); 607 608 // TODO: make this more efficient? 609 while let Some(frame) = stream.pending_send.pop_front(buffer) { 610 log::trace!("dropping; frame={:?}", frame); 611 } 612 613 stream.buffered_send_data = 0; 614 stream.requested_send_capacity = 0; 615 if let InFlightData::DataFrame(key) = self.in_flight_data_frame { 616 if stream.key() == key { 617 // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. 618 self.in_flight_data_frame = InFlightData::Drop; 619 } 620 } 621 } 622 clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts)623 pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { 624 while let Some(stream) = self.pending_send.pop(store) { 625 let is_pending_reset = stream.is_pending_reset_expiration(); 626 counts.transition_after(stream, is_pending_reset); 627 } 628 } 629 clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts)630 pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { 631 while let Some(stream) = self.pending_open.pop(store) { 632 let is_pending_reset = stream.is_pending_reset_expiration(); 633 counts.transition_after(stream, is_pending_reset); 634 } 635 } 636 pop_frame<B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, max_len: usize, counts: &mut Counts, ) -> Option<Frame<Prioritized<B>>> where B: Buf,637 fn pop_frame<B>( 638 &mut self, 639 buffer: &mut Buffer<Frame<B>>, 640 store: &mut Store, 641 max_len: usize, 642 counts: &mut Counts, 643 ) -> Option<Frame<Prioritized<B>>> 644 where 645 B: Buf, 646 { 647 log::trace!("pop_frame"); 648 649 loop { 650 match self.pending_send.pop(store) { 651 Some(mut stream) => { 652 log::trace!( 653 "pop_frame; stream={:?}; stream.state={:?}", 654 stream.id, 655 stream.state 656 ); 657 658 // It's possible that this stream, besides having data to send, 659 // is also queued to send a reset, and thus is already in the queue 660 // to wait for "some time" after a reset. 661 // 662 // To be safe, we just always ask the stream. 663 let is_pending_reset = stream.is_pending_reset_expiration(); 664 665 log::trace!( 666 " --> stream={:?}; is_pending_reset={:?};", 667 stream.id, 668 is_pending_reset 669 ); 670 671 let frame = match stream.pending_send.pop_front(buffer) { 672 Some(Frame::Data(mut frame)) => { 673 // Get the amount of capacity remaining for stream's 674 // window. 675 let stream_capacity = stream.send_flow.available(); 676 let sz = frame.payload().remaining(); 677 678 log::trace!( 679 " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \ 680 available={}; requested={}; buffered={};", 681 frame.stream_id(), 682 sz, 683 frame.is_end_stream(), 684 stream_capacity, 685 stream.send_flow.available(), 686 stream.requested_send_capacity, 687 stream.buffered_send_data, 688 ); 689 690 // Zero length data frames always have capacity to 691 // be sent. 692 if sz > 0 && stream_capacity == 0 { 693 log::trace!( 694 " --> stream capacity is 0; requested={}", 695 stream.requested_send_capacity 696 ); 697 698 // Ensure that the stream is waiting for 699 // connection level capacity 700 // 701 // TODO: uncomment 702 // debug_assert!(stream.is_pending_send_capacity); 703 704 // The stream has no more capacity, this can 705 // happen if the remote reduced the stream 706 // window. In this case, we need to buffer the 707 // frame and wait for a window update... 708 stream.pending_send.push_front(buffer, frame.into()); 709 710 continue; 711 } 712 713 // Only send up to the max frame length 714 let len = cmp::min(sz, max_len); 715 716 // Only send up to the stream's window capacity 717 let len = 718 cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; 719 720 // There *must* be be enough connection level 721 // capacity at this point. 722 debug_assert!(len <= self.flow.window_size()); 723 724 log::trace!(" --> sending data frame; len={}", len); 725 726 // Update the flow control 727 log::trace!(" -- updating stream flow --"); 728 stream.send_flow.send_data(len); 729 730 // Decrement the stream's buffered data counter 731 debug_assert!(stream.buffered_send_data >= len); 732 stream.buffered_send_data -= len; 733 stream.requested_send_capacity -= len; 734 735 // Assign the capacity back to the connection that 736 // was just consumed from the stream in the previous 737 // line. 738 self.flow.assign_capacity(len); 739 740 log::trace!(" -- updating connection flow --"); 741 self.flow.send_data(len); 742 743 // Wrap the frame's data payload to ensure that the 744 // correct amount of data gets written. 745 746 let eos = frame.is_end_stream(); 747 let len = len as usize; 748 749 if frame.payload().remaining() > len { 750 frame.set_end_stream(false); 751 } 752 753 Frame::Data(frame.map(|buf| Prioritized { 754 inner: buf.take(len), 755 end_of_stream: eos, 756 stream: stream.key(), 757 })) 758 } 759 Some(Frame::PushPromise(pp)) => { 760 let mut pushed = 761 stream.store_mut().find_mut(&pp.promised_id()).unwrap(); 762 pushed.is_pending_push = false; 763 // Transition stream from pending_push to pending_open 764 // if possible 765 if !pushed.pending_send.is_empty() { 766 if counts.can_inc_num_send_streams() { 767 counts.inc_num_send_streams(&mut pushed); 768 self.pending_send.push(&mut pushed); 769 } else { 770 self.queue_open(&mut pushed); 771 } 772 } 773 Frame::PushPromise(pp) 774 } 775 Some(frame) => frame.map(|_| { 776 unreachable!( 777 "Frame::map closure will only be called \ 778 on DATA frames." 779 ) 780 }), 781 None => { 782 if let Some(reason) = stream.state.get_scheduled_reset() { 783 stream.state.set_reset(reason); 784 785 let frame = frame::Reset::new(stream.id, reason); 786 Frame::Reset(frame) 787 } else { 788 // If the stream receives a RESET from the peer, it may have 789 // had data buffered to be sent, but all the frames are cleared 790 // in clear_queue(). Instead of doing O(N) traversal through queue 791 // to remove, lets just ignore the stream here. 792 log::trace!("removing dangling stream from pending_send"); 793 // Since this should only happen as a consequence of `clear_queue`, 794 // we must be in a closed state of some kind. 795 debug_assert!(stream.state.is_closed()); 796 counts.transition_after(stream, is_pending_reset); 797 continue; 798 } 799 } 800 }; 801 802 log::trace!("pop_frame; frame={:?}", frame); 803 804 if cfg!(debug_assertions) && stream.state.is_idle() { 805 debug_assert!(stream.id > self.last_opened_id); 806 self.last_opened_id = stream.id; 807 } 808 809 if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { 810 // TODO: Only requeue the sender IF it is ready to send 811 // the next frame. i.e. don't requeue it if the next 812 // frame is a data frame and the stream does not have 813 // any more capacity. 814 self.pending_send.push(&mut stream); 815 } 816 817 counts.transition_after(stream, is_pending_reset); 818 819 return Some(frame); 820 } 821 None => return None, 822 } 823 } 824 } 825 schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts)826 fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { 827 log::trace!("schedule_pending_open"); 828 // check for any pending open streams 829 while counts.can_inc_num_send_streams() { 830 if let Some(mut stream) = self.pending_open.pop(store) { 831 log::trace!("schedule_pending_open; stream={:?}", stream.id); 832 833 counts.inc_num_send_streams(&mut stream); 834 self.pending_send.push(&mut stream); 835 stream.notify_send(); 836 } else { 837 return; 838 } 839 } 840 } 841 } 842 843 // ===== impl Prioritized ===== 844 845 impl<B> Buf for Prioritized<B> 846 where 847 B: Buf, 848 { remaining(&self) -> usize849 fn remaining(&self) -> usize { 850 self.inner.remaining() 851 } 852 bytes(&self) -> &[u8]853 fn bytes(&self) -> &[u8] { 854 self.inner.bytes() 855 } 856 advance(&mut self, cnt: usize)857 fn advance(&mut self, cnt: usize) { 858 self.inner.advance(cnt) 859 } 860 } 861 862 impl<B: Buf> fmt::Debug for Prioritized<B> { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result863 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 864 fmt.debug_struct("Prioritized") 865 .field("remaining", &self.inner.get_ref().remaining()) 866 .field("end_of_stream", &self.end_of_stream) 867 .field("stream", &self.stream) 868 .finish() 869 } 870 } 871