use super::store::Resolve;
use super::*;

use crate::frame::{Reason, StreamId};

use crate::codec::UserError;
use crate::codec::UserError::*;

use bytes::buf::{Buf, Take};
use std::io;
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt, mem};

/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// IDs, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,
}

#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared.
    Drop,
}

pub(crate) struct Prioritized<B> {
    // The buffer
    inner: Take<B>,

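    // Whether the original frame was marked end-of-stream; restored when a
    // partially written frame is reclaimed and requeued.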
    end_of_stream: bool,

    // The stream that this is associated with
    stream: store::Key,
}

// ===== impl Prioritize =====

impl Prioritize {
    pub fn new(config: &Config) -> Prioritize {
        let mut flow = FlowControl::new();

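        // The connection-level send window starts at the remote's initial
        // window size, and all of it is immediately available to be assigned
        // to streams.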
        flow.inc_window(config.remote_init_window_sz)
            .expect("invalid initial window size");

        flow.assign_capacity(config.remote_init_window_sz);

        tracing::trace!("Prioritize::new; flow={:?}", flow);

        Prioritize {
            pending_send: store::Queue::new(),
            pending_capacity: store::Queue::new(),
            pending_open: store::Queue::new(),
            flow,
            last_opened_id: StreamId::ZERO,
            in_flight_data_frame: InFlightData::Nothing,
        }
    }

    /// Queue a frame to be sent to the remote
    pub fn queue_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) {
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
        let _e = span.enter();
        // Queue the frame in the buffer
        stream.pending_send.push_back(buffer, frame);
        self.schedule_send(stream, task);
    }

    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        // If the stream is waiting to be opened, nothing more to do.
        if stream.is_send_ready() {
            tracing::trace!(?stream.id, "schedule_send");
            // Queue the stream
            self.pending_send.push(stream);

            // Notify the connection.
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
        self.pending_open.push(stream);
    }

    /// Send a data frame
    pub fn send_data<B>(
        &mut self,
        frame: frame::Data<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError>
    where
        B: Buf,
    {
        let sz = frame.payload().remaining();

        if sz > MAX_WINDOW_SIZE as usize {
            return Err(UserError::PayloadTooBig);
        }

        let sz = sz as WindowSize;

        if !stream.state.is_send_streaming() {
            if stream.state.is_closed() {
                return Err(InactiveStreamId);
            } else {
                return Err(UnexpectedFrameType);
            }
        }

        // Update the buffered data counter
        stream.buffered_send_data += sz;

        let span =
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
        let _e = span.enter();
        tracing::trace!(buffered = stream.buffered_send_data);

        // Implicitly request more send capacity if not enough has been
        // requested yet.
        if stream.requested_send_capacity < stream.buffered_send_data {
            // Update the target requested capacity
            stream.requested_send_capacity = stream.buffered_send_data;

            self.try_assign_capacity(stream);
        }

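        // An end-of-stream frame closes the send side; reserving zero here
        // drops any capacity that was requested beyond what is still buffered,
        // returning it to the connection.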
        if frame.is_end_stream() {
            stream.state.send_close();
            self.reserve_capacity(0, stream, counts);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            buffered = stream.buffered_send_data,
        );

        // The `stream.buffered_send_data == 0` check is here so that, if a zero
        // length data frame is queued to the front (there is no previously
        // queued data), it gets sent out immediately even if there is no
        // available send window.
        //
        // Sending out zero length data frames can be done to signal
        // end-of-stream.
        //
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
            // The stream currently has capacity to send the data frame, so
            // queue it up and notify the connection task.
            self.queue_frame(frame.into(), buffer, stream, task);
        } else {
            // The stream has no capacity to send the frame now, save it but
            // don't notify the connection task. Once additional capacity
            // becomes available, the frame will be flushed.
            stream.pending_send.push_back(buffer, frame.into());
        }

        Ok(())
    }

    /// Request capacity to send data
    pub fn reserve_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) {
        let span = tracing::trace_span!(
            "reserve_capacity",
            ?stream.id,
            requested = capacity,
            effective = capacity + stream.buffered_send_data,
            curr = stream.requested_send_capacity
        );
        let _e = span.enter();

        // Actual capacity is `capacity` + the current amount of buffered data.
        // If it were less, then we could never send out the buffered data.
        let capacity = capacity + stream.buffered_send_data;

        if capacity == stream.requested_send_capacity {
            // Nothing to do
        } else if capacity < stream.requested_send_capacity {
            // Update the target requested capacity
            stream.requested_send_capacity = capacity;

            // Currently available capacity assigned to the stream
            let available = stream.send_flow.available().as_size();

            // If the stream has more assigned capacity than requested, reclaim
            // some for the connection
            if available > capacity {
                let diff = available - capacity;

                stream.send_flow.claim_capacity(diff);

                self.assign_connection_capacity(diff, stream, counts);
            }
        } else {
            // If trying to *add* capacity, but the stream send side is closed,
            // there's nothing to be done.
            if stream.state.is_send_closed() {
                return;
            }

            // Update the target requested capacity
            stream.requested_send_capacity = capacity;

            // Try to assign additional capacity to the stream. If none is
            // currently available, the stream will be queued to receive some
            // when more becomes available.
            self.try_assign_capacity(stream);
        }
    }

    pub fn recv_stream_window_update(
        &mut self,
        inc: WindowSize,
        stream: &mut store::Ptr,
    ) -> Result<(), Reason> {
        let span = tracing::trace_span!(
            "recv_stream_window_update",
            ?stream.id,
            ?stream.state,
            inc,
            flow = ?stream.send_flow
        );
        let _e = span.enter();

        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
            // We can't send any data, so don't bother doing anything else.
            return Ok(());
        }

        // Update the stream level flow control.
        stream.send_flow.inc_window(inc)?;

        // If the stream is waiting on additional capacity, then this will
        // assign it (if available on the connection) and notify the producer
        self.try_assign_capacity(stream);

        Ok(())
    }

    pub fn recv_connection_window_update(
        &mut self,
        inc: WindowSize,
        store: &mut Store,
        counts: &mut Counts,
    ) -> Result<(), Reason> {
        // Update the connection's window
        self.flow.inc_window(inc)?;

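        // Hand the newly granted connection-level capacity to any streams
        // waiting in `pending_capacity`.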
        self.assign_connection_capacity(inc, store, counts);
        Ok(())
    }

    /// Reclaim all capacity assigned to the stream and re-assign it to the
    /// connection
    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        let available = stream.send_flow.available().as_size();
        stream.send_flow.claim_capacity(available);
        // Re-assign all capacity to the connection
        self.assign_connection_capacity(available, stream, counts);
    }

    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
    /// it to the connection
    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        // only reclaim requested capacity that isn't already buffered
        if stream.requested_send_capacity > stream.buffered_send_data {
            let reserved = stream.requested_send_capacity - stream.buffered_send_data;

            stream.send_flow.claim_capacity(reserved);
            self.assign_connection_capacity(reserved, stream, counts);
        }
    }

    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
        let span = tracing::trace_span!("clear_pending_capacity");
        let _e = span.enter();
        while let Some(stream) = self.pending_capacity.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!(?stream.id, "clear_pending_capacity");
            })
        }
    }

    pub fn assign_connection_capacity<R>(
        &mut self,
        inc: WindowSize,
        store: &mut R,
        counts: &mut Counts,
    ) where
        R: Resolve,
    {
        let span = tracing::trace_span!("assign_connection_capacity", inc);
        let _e = span.enter();

        self.flow.assign_capacity(inc);

        // Assign newly acquired capacity to streams pending capacity.
        while self.flow.available() > 0 {
            let stream = match self.pending_capacity.pop(store) {
                Some(stream) => stream,
                None => return,
            };

            // Streams pending capacity may have been reset before capacity
            // became available. In that case, the stream won't want any
            // capacity, and so we shouldn't "transition" on it, but just evict
            // it and continue the loop.
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
                continue;
            }

            counts.transition(stream, |_, mut stream| {
                // Try to assign capacity to the stream. This will also re-queue the
                // stream if there isn't enough connection level capacity to fulfill
                // the capacity request.
                self.try_assign_capacity(&mut stream);
            })
        }
    }

    /// Try to assign available connection capacity to a stream that has
    /// requested more than it currently holds.
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
        let total_requested = stream.requested_send_capacity;

        // Total requested should never go below actual assigned
        // (Note: the window size can go lower than assigned)
        debug_assert!(total_requested >= stream.send_flow.available());

        // The amount of additional capacity that the stream requests.
        // Don't assign more than the window has available!
        let additional = cmp::min(
            total_requested - stream.send_flow.available().as_size(),
            // Can't assign more than what is available
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
        );
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
        let _e = span.enter();
        tracing::trace!(
            requested = total_requested,
            additional,
            buffered = stream.buffered_send_data,
            window = stream.send_flow.window_size(),
            conn = %self.flow.available()
        );

        if additional == 0 {
            // Nothing more to do
            return;
        }

        // If the stream has requested capacity, then it must be in the
        // streaming state (more data could be sent) or there is buffered data
        // waiting to be sent.
        debug_assert!(
            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
            "state={:?}",
            stream.state
        );

        // The amount of currently available capacity on the connection
        let conn_available = self.flow.available().as_size();

        // First check if capacity is immediately available
        if conn_available > 0 {
            // The amount of capacity to assign to the stream
            // TODO: Should prioritization factor into this?
            let assign = cmp::min(conn_available, additional);

            tracing::trace!(capacity = assign, "assigning");

            // Assign the capacity to the stream
            stream.assign_capacity(assign);

            // Claim the capacity from the connection
            self.flow.claim_capacity(assign);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            requested = stream.requested_send_capacity,
            buffered = stream.buffered_send_data,
            has_unavailable = %stream.send_flow.has_unavailable()
        );

        if stream.send_flow.available() < stream.requested_send_capacity
            && stream.send_flow.has_unavailable()
        {
            // The stream requires additional capacity and the stream's
            // window has available capacity, but the connection window
            // does not.
            //
            // In this case, the stream needs to be queued up for when the
            // connection has more capacity.
            self.pending_capacity.push(stream);
        }

        // If data is buffered and the stream is send ready, then
        // schedule the stream for execution
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
            // empty. This can happen when a large data frame is in the process
            // of being **partially** sent. Once the window has been sent, the
            // data frame will be returned to the prioritization layer to be
            // re-scheduled.
            //
            // That said, it would be nice to figure out how to make this
            // assertion correctly.
            //
            // debug_assert!(!stream.pending_send.is_empty());

            self.pending_send.push(stream);
        }
    }

    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Ensure codec is ready
        ready!(dst.poll_ready(cx))?;

        // Reclaim any frame that has previously been written
        self.reclaim_frame(buffer, store, dst);

        // The max frame length
        let max_frame_len = dst.max_send_frame_size();

        tracing::trace!("poll_complete");

        loop {
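            // Promote any streams that were blocked on the concurrency limit
            // into the send queue before popping frames below.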
            self.schedule_pending_open(store, counts);

            match self.pop_frame(buffer, store, max_frame_len, counts) {
                Some(frame) => {
                    tracing::trace!(?frame, "writing");

                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
                    if let Frame::Data(ref frame) = frame {
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
                    }
                    dst.buffer(frame).expect("invalid frame");

                    // Ensure the codec is ready to try the loop again.
                    ready!(dst.poll_ready(cx))?;

                    // The frame may have been written only partially, so
                    // always try to reclaim the remainder before continuing.
                    self.reclaim_frame(buffer, store, dst);
                }
                None => {
                    // Try to flush the codec.
                    ready!(dst.flush(cx))?;

                    // This might release a data frame...
                    if !self.reclaim_frame(buffer, store, dst) {
                        return Poll::Ready(Ok(()));
                    }

                    // No need to poll ready as poll_complete() does this for
                    // us...
                }
            }
        }
    }

    /// Tries to reclaim a pending data frame from the codec.
    ///
    /// Returns true if a frame was reclaimed.
    ///
    /// When a data frame is written to the codec, it may not be written in its
    /// entirety (large chunks are split up into potentially many data frames).
    /// In this case, the stream needs to be reprioritized.
    fn reclaim_frame<T, B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        let span = tracing::trace_span!("try_reclaim_frame");
        let _e = span.enter();

        // First check if there are any data chunks to take back
        if let Some(frame) = dst.take_last_data_frame() {
            self.reclaim_frame_inner(buffer, store, frame)
        } else {
            false
        }
    }

    fn reclaim_frame_inner<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        frame: frame::Data<Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        tracing::trace!(
            ?frame,
            sz = frame.payload().inner.get_ref().remaining(),
            "reclaimed"
        );

        let mut eos = false;
        let key = frame.payload().stream;

        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
            InFlightData::Drop => {
                tracing::trace!("not reclaiming frame for cancelled stream");
                return false;
            }
            InFlightData::DataFrame(k) => {
                debug_assert_eq!(k, key);
            }
        }

        let mut frame = frame.map(|prioritized| {
            // TODO: Ensure fully written
            eos = prioritized.end_of_stream;
            prioritized.inner.into_inner()
        });

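        // If the codec did not consume the whole payload, restore the
        // end-of-stream flag and requeue the remainder at the front of the
        // stream's send queue.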
        if frame.payload().has_remaining() {
            let mut stream = store.resolve(key);

            if eos {
                frame.set_end_stream(true);
            }

            self.push_back_frame(frame.into(), buffer, &mut stream);

            return true;
        }

        false
    }

    /// Push the frame to the front of the stream's deque, scheduling the
    /// stream if needed.
    fn push_back_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
    ) {
        // Push the frame to the front of the stream's deque
        stream.pending_send.push_front(buffer, frame);

        // If needed, schedule the sender
        if stream.send_flow.available() > 0 {
            debug_assert!(!stream.pending_send.is_empty());
            self.pending_send.push(stream);
        }
    }

    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
        let span = tracing::trace_span!("clear_queue", ?stream.id);
        let _e = span.enter();

        // TODO: make this more efficient?
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
            tracing::trace!(?frame, "dropping");
        }

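        // All queued frames are gone, so the stream no longer has buffered
        // data or an outstanding capacity request.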
        stream.buffered_send_data = 0;
        stream.requested_send_capacity = 0;
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
            if stream.key() == key {
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
                self.in_flight_data_frame = InFlightData::Drop;
            }
        }
    }

    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_send.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_open.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

    fn pop_frame<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        max_len: usize,
        counts: &mut Counts,
    ) -> Option<Frame<Prioritized<B>>>
    where
        B: Buf,
    {
        let span = tracing::trace_span!("pop_frame");
        let _e = span.enter();

        loop {
            match self.pending_send.pop(store) {
                Some(mut stream) => {
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
                    let _e = span.enter();

                    // It's possible that this stream, besides having data to send,
                    // is also queued to send a reset, and thus is already in the queue
                    // to wait for "some time" after a reset.
                    //
                    // To be safe, we just always ask the stream.
                    let is_pending_reset = stream.is_pending_reset_expiration();

                    tracing::trace!(is_pending_reset);

                    let frame = match stream.pending_send.pop_front(buffer) {
                        Some(Frame::Data(mut frame)) => {
                            // Get the amount of capacity remaining for stream's
                            // window.
                            let stream_capacity = stream.send_flow.available();
                            let sz = frame.payload().remaining();

                            tracing::trace!(
                                sz,
                                eos = frame.is_end_stream(),
                                window = %stream_capacity,
                                available = %stream.send_flow.available(),
                                requested = stream.requested_send_capacity,
                                buffered = stream.buffered_send_data,
                                "data frame"
                            );

                            // Zero length data frames always have capacity to
                            // be sent.
                            if sz > 0 && stream_capacity == 0 {
                                tracing::trace!("stream capacity is 0");

                                // Ensure that the stream is waiting for
                                // connection level capacity
                                //
                                // TODO: uncomment
                                // debug_assert!(stream.is_pending_send_capacity);

                                // The stream has no more capacity, this can
                                // happen if the remote reduced the stream
                                // window. In this case, we need to buffer the
                                // frame and wait for a window update...
                                stream.pending_send.push_front(buffer, frame.into());

                                continue;
                            }

                            // Only send up to the max frame length
                            let len = cmp::min(sz, max_len);

                            // Only send up to the stream's window capacity
                            let len =
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;

                            // There *must* be enough connection level
                            // capacity at this point.
                            debug_assert!(len <= self.flow.window_size());

                            tracing::trace!(len, "sending data frame");

                            // Update the flow control
                            tracing::trace_span!("updating stream flow").in_scope(|| {
                                stream.send_flow.send_data(len);

                                // Decrement the stream's buffered data counter
                                debug_assert!(stream.buffered_send_data >= len);
                                stream.buffered_send_data -= len;
                                stream.requested_send_capacity -= len;

                                // Assign the capacity back to the connection that
                                // was just consumed from the stream in the previous
                                // line.
                                self.flow.assign_capacity(len);
                            });

                            let (eos, len) = tracing::trace_span!("updating connection flow")
                                .in_scope(|| {
                                    self.flow.send_data(len);

                                    // Wrap the frame's data payload to ensure that the
                                    // correct amount of data gets written.

                                    let eos = frame.is_end_stream();
                                    let len = len as usize;

                                    if frame.payload().remaining() > len {
                                        frame.set_end_stream(false);
                                    }
                                    (eos, len)
                                });

                            Frame::Data(frame.map(|buf| Prioritized {
                                inner: buf.take(len),
                                end_of_stream: eos,
                                stream: stream.key(),
                            }))
                        }
                        Some(Frame::PushPromise(pp)) => {
                            let mut pushed =
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
                            pushed.is_pending_push = false;
                            // Transition stream from pending_push to pending_open
                            // if possible
                            if !pushed.pending_send.is_empty() {
                                if counts.can_inc_num_send_streams() {
                                    counts.inc_num_send_streams(&mut pushed);
                                    self.pending_send.push(&mut pushed);
                                } else {
                                    self.queue_open(&mut pushed);
                                }
                            }
                            Frame::PushPromise(pp)
                        }
                        Some(frame) => frame.map(|_| {
                            unreachable!(
                                "Frame::map closure will only be called \
                                 on DATA frames."
                            )
                        }),
                        None => {
                            if let Some(reason) = stream.state.get_scheduled_reset() {
                                stream.state.set_reset(reason);

                                let frame = frame::Reset::new(stream.id, reason);
                                Frame::Reset(frame)
                            } else {
                                // If the stream receives a RESET from the peer, it may have
                                // had data buffered to be sent, but all the frames are cleared
                                // in clear_queue(). Instead of doing an O(N) traversal through
                                // the queue to remove them, let's just ignore the stream here.
                                tracing::trace!("removing dangling stream from pending_send");
                                // Since this should only happen as a consequence of `clear_queue`,
                                // we must be in a closed state of some kind.
                                debug_assert!(stream.state.is_closed());
                                counts.transition_after(stream, is_pending_reset);
                                continue;
                            }
                        }
                    };

                    tracing::trace!("pop_frame; frame={:?}", frame);

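                    // Sending a frame on an idle stream implicitly opens it.
                    // In debug builds, check that streams open in increasing
                    // stream-ID order (see the ordering warning on
                    // `Prioritize`) and record the newly opened ID.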
                    if cfg!(debug_assertions) && stream.state.is_idle() {
                        debug_assert!(stream.id > self.last_opened_id);
                        self.last_opened_id = stream.id;
                    }

                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
                        // TODO: Only requeue the sender IF it is ready to send
                        // the next frame. i.e. don't requeue it if the next
                        // frame is a data frame and the stream does not have
                        // any more capacity.
                        self.pending_send.push(&mut stream);
                    }

                    counts.transition_after(stream, is_pending_reset);

                    return Some(frame);
                }
                None => return None,
            }
        }
    }

    fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
        tracing::trace!("schedule_pending_open");
        // check for any pending open streams
        while counts.can_inc_num_send_streams() {
            if let Some(mut stream) = self.pending_open.pop(store) {
                tracing::trace!("schedule_pending_open; stream={:?}", stream.id);

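                // Claim one of the available concurrency slots, queue the
                // stream for sending, and wake the task waiting on this stream
                // so it can make progress.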
                counts.inc_num_send_streams(&mut stream);
                self.pending_send.push(&mut stream);
                stream.notify_send();
            } else {
                return;
            }
        }
    }
}

// ===== impl Prioritized =====

impl<B> Buf for Prioritized<B>
where
    B: Buf,
{
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
        self.inner.chunks_vectored(dst)
    }

    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }
}

impl<B: Buf> fmt::Debug for Prioritized<B> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Prioritized")
            .field("remaining", &self.inner.get_ref().remaining())
            .field("end_of_stream", &self.end_of_stream)
            .field("stream", &self.stream)
            .finish()
    }
}