1 use super::store::Resolve;
2 use super::*;
3 
4 use crate::frame::{Reason, StreamId};
5 
6 use crate::codec::UserError;
7 use crate::codec::UserError::*;
8 
9 use bytes::buf::ext::{BufExt, Take};
10 use std::io;
11 use std::task::{Context, Poll, Waker};
12 use std::{cmp, fmt, mem};
13 
14 /// # Warning
15 ///
16 /// Queued streams are ordered by stream ID, as we need to ensure that
17 /// lower-numbered streams are sent headers before higher-numbered ones.
18 /// This is because "idle" stream IDs – those which have been initiated but
19 /// have yet to receive frames – will be implicitly closed on receipt of a
20 /// frame on a higher stream ID. If these queues was not ordered by stream
21 /// IDs, some mechanism would be necessary to ensure that the lowest-numberedh]
22 /// idle stream is opened first.
23 #[derive(Debug)]
24 pub(super) struct Prioritize {
25     /// Queue of streams waiting for socket capacity to send a frame.
26     pending_send: store::Queue<stream::NextSend>,
27 
28     /// Queue of streams waiting for window capacity to produce data.
29     pending_capacity: store::Queue<stream::NextSendCapacity>,
30 
31     /// Streams waiting for capacity due to max concurrency
32     ///
33     /// The `SendRequest` handle is `Clone`. This enables initiating requests
34     /// from many tasks. However, offering this capability while supporting
35     /// backpressure at some level is tricky. If there are many `SendRequest`
36     /// handles and a single stream becomes available, which handle gets
37     /// assigned that stream? Maybe that handle is no longer ready to send a
38     /// request.
39     ///
40     /// The strategy used is to allow each `SendRequest` handle one buffered
41     /// request. A `SendRequest` handle is ready to send a request if it has no
42     /// associated buffered requests. This is the same strategy as `mpsc` in the
43     /// futures library.
44     pending_open: store::Queue<stream::NextOpen>,
45 
46     /// Connection level flow control governing sent data
47     flow: FlowControl,
48 
49     /// Stream ID of the last stream opened.
50     last_opened_id: StreamId,
51 
52     /// What `DATA` frame is currently being sent in the codec.
53     in_flight_data_frame: InFlightData,
54 }
55 
56 #[derive(Debug, Eq, PartialEq)]
57 enum InFlightData {
58     /// There is no `DATA` frame in flight.
59     Nothing,
60     /// There is a `DATA` frame in flight belonging to the given stream.
61     DataFrame(store::Key),
62     /// There was a `DATA` frame, but the stream's queue was since cleared.
63     Drop,
64 }
65 
66 pub(crate) struct Prioritized<B> {
67     // The buffer
68     inner: Take<B>,
69 
70     end_of_stream: bool,
71 
72     // The stream that this is associated with
73     stream: store::Key,
74 }
75 
76 // ===== impl Prioritize =====
77 
78 impl Prioritize {
new(config: &Config) -> Prioritize79     pub fn new(config: &Config) -> Prioritize {
80         let mut flow = FlowControl::new();
81 
82         flow.inc_window(config.remote_init_window_sz)
83             .expect("invalid initial window size");
84 
85         flow.assign_capacity(config.remote_init_window_sz);
86 
87         log::trace!("Prioritize::new; flow={:?}", flow);
88 
89         Prioritize {
90             pending_send: store::Queue::new(),
91             pending_capacity: store::Queue::new(),
92             pending_open: store::Queue::new(),
93             flow,
94             last_opened_id: StreamId::ZERO,
95             in_flight_data_frame: InFlightData::Nothing,
96         }
97     }
98 
99     /// Queue a frame to be sent to the remote
queue_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, task: &mut Option<Waker>, )100     pub fn queue_frame<B>(
101         &mut self,
102         frame: Frame<B>,
103         buffer: &mut Buffer<Frame<B>>,
104         stream: &mut store::Ptr,
105         task: &mut Option<Waker>,
106     ) {
107         // Queue the frame in the buffer
108         stream.pending_send.push_back(buffer, frame);
109         self.schedule_send(stream, task);
110     }
111 
schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>)112     pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
113         // If the stream is waiting to be opened, nothing more to do.
114         if stream.is_send_ready() {
115             log::trace!("schedule_send; {:?}", stream.id);
116             // Queue the stream
117             self.pending_send.push(stream);
118 
119             // Notify the connection.
120             if let Some(task) = task.take() {
121                 task.wake();
122             }
123         }
124     }
125 
queue_open(&mut self, stream: &mut store::Ptr)126     pub fn queue_open(&mut self, stream: &mut store::Ptr) {
127         self.pending_open.push(stream);
128     }
129 
130     /// Send a data frame
send_data<B>( &mut self, frame: frame::Data<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option<Waker>, ) -> Result<(), UserError> where B: Buf,131     pub fn send_data<B>(
132         &mut self,
133         frame: frame::Data<B>,
134         buffer: &mut Buffer<Frame<B>>,
135         stream: &mut store::Ptr,
136         counts: &mut Counts,
137         task: &mut Option<Waker>,
138     ) -> Result<(), UserError>
139     where
140         B: Buf,
141     {
142         let sz = frame.payload().remaining();
143 
144         if sz > MAX_WINDOW_SIZE as usize {
145             return Err(UserError::PayloadTooBig);
146         }
147 
148         let sz = sz as WindowSize;
149 
150         if !stream.state.is_send_streaming() {
151             if stream.state.is_closed() {
152                 return Err(InactiveStreamId);
153             } else {
154                 return Err(UnexpectedFrameType);
155             }
156         }
157 
158         // Update the buffered data counter
159         stream.buffered_send_data += sz;
160 
161         log::trace!(
162             "send_data; sz={}; buffered={}; requested={}",
163             sz,
164             stream.buffered_send_data,
165             stream.requested_send_capacity
166         );
167 
168         // Implicitly request more send capacity if not enough has been
169         // requested yet.
170         if stream.requested_send_capacity < stream.buffered_send_data {
171             // Update the target requested capacity
172             stream.requested_send_capacity = stream.buffered_send_data;
173 
174             self.try_assign_capacity(stream);
175         }
176 
177         if frame.is_end_stream() {
178             stream.state.send_close();
179             self.reserve_capacity(0, stream, counts);
180         }
181 
182         log::trace!(
183             "send_data (2); available={}; buffered={}",
184             stream.send_flow.available(),
185             stream.buffered_send_data
186         );
187 
188         // The `stream.buffered_send_data == 0` check is here so that, if a zero
189         // length data frame is queued to the front (there is no previously
190         // queued data), it gets sent out immediately even if there is no
191         // available send window.
192         //
193         // Sending out zero length data frames can be done to signal
194         // end-of-stream.
195         //
196         if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
197             // The stream currently has capacity to send the data frame, so
198             // queue it up and notify the connection task.
199             self.queue_frame(frame.into(), buffer, stream, task);
200         } else {
201             // The stream has no capacity to send the frame now, save it but
202             // don't notify the connection task. Once additional capacity
203             // becomes available, the frame will be flushed.
204             stream.pending_send.push_back(buffer, frame.into());
205         }
206 
207         Ok(())
208     }
209 
210     /// Request capacity to send data
reserve_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, counts: &mut Counts, )211     pub fn reserve_capacity(
212         &mut self,
213         capacity: WindowSize,
214         stream: &mut store::Ptr,
215         counts: &mut Counts,
216     ) {
217         log::trace!(
218             "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
219             stream.id,
220             capacity,
221             capacity + stream.buffered_send_data,
222             stream.requested_send_capacity
223         );
224 
225         // Actual capacity is `capacity` + the current amount of buffered data.
226         // If it were less, then we could never send out the buffered data.
227         let capacity = capacity + stream.buffered_send_data;
228 
229         if capacity == stream.requested_send_capacity {
230             // Nothing to do
231         } else if capacity < stream.requested_send_capacity {
232             // Update the target requested capacity
233             stream.requested_send_capacity = capacity;
234 
235             // Currently available capacity assigned to the stream
236             let available = stream.send_flow.available().as_size();
237 
238             // If the stream has more assigned capacity than requested, reclaim
239             // some for the connection
240             if available > capacity {
241                 let diff = available - capacity;
242 
243                 stream.send_flow.claim_capacity(diff);
244 
245                 self.assign_connection_capacity(diff, stream, counts);
246             }
247         } else {
248             // If trying to *add* capacity, but the stream send side is closed,
249             // there's nothing to be done.
250             if stream.state.is_send_closed() {
251                 return;
252             }
253 
254             // Update the target requested capacity
255             stream.requested_send_capacity = capacity;
256 
257             // Try to assign additional capacity to the stream. If none is
258             // currently available, the stream will be queued to receive some
259             // when more becomes available.
260             self.try_assign_capacity(stream);
261         }
262     }
263 
recv_stream_window_update( &mut self, inc: WindowSize, stream: &mut store::Ptr, ) -> Result<(), Reason>264     pub fn recv_stream_window_update(
265         &mut self,
266         inc: WindowSize,
267         stream: &mut store::Ptr,
268     ) -> Result<(), Reason> {
269         log::trace!(
270             "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
271             stream.id,
272             stream.state,
273             inc,
274             stream.send_flow
275         );
276 
277         if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
278             // We can't send any data, so don't bother doing anything else.
279             return Ok(());
280         }
281 
282         // Update the stream level flow control.
283         stream.send_flow.inc_window(inc)?;
284 
285         // If the stream is waiting on additional capacity, then this will
286         // assign it (if available on the connection) and notify the producer
287         self.try_assign_capacity(stream);
288 
289         Ok(())
290     }
291 
recv_connection_window_update( &mut self, inc: WindowSize, store: &mut Store, counts: &mut Counts, ) -> Result<(), Reason>292     pub fn recv_connection_window_update(
293         &mut self,
294         inc: WindowSize,
295         store: &mut Store,
296         counts: &mut Counts,
297     ) -> Result<(), Reason> {
298         // Update the connection's window
299         self.flow.inc_window(inc)?;
300 
301         self.assign_connection_capacity(inc, store, counts);
302         Ok(())
303     }
304 
305     /// Reclaim all capacity assigned to the stream and re-assign it to the
306     /// connection
reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)307     pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
308         let available = stream.send_flow.available().as_size();
309         stream.send_flow.claim_capacity(available);
310         // Re-assign all capacity to the connection
311         self.assign_connection_capacity(available, stream, counts);
312     }
313 
314     /// Reclaim just reserved capacity, not buffered capacity, and re-assign
315     /// it to the connection
reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts)316     pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
317         // only reclaim requested capacity that isn't already buffered
318         if stream.requested_send_capacity > stream.buffered_send_data {
319             let reserved = stream.requested_send_capacity - stream.buffered_send_data;
320 
321             stream.send_flow.claim_capacity(reserved);
322             self.assign_connection_capacity(reserved, stream, counts);
323         }
324     }
325 
clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts)326     pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
327         while let Some(stream) = self.pending_capacity.pop(store) {
328             counts.transition(stream, |_, stream| {
329                 log::trace!("clear_pending_capacity; stream={:?}", stream.id);
330             })
331         }
332     }
333 
assign_connection_capacity<R>( &mut self, inc: WindowSize, store: &mut R, counts: &mut Counts, ) where R: Resolve,334     pub fn assign_connection_capacity<R>(
335         &mut self,
336         inc: WindowSize,
337         store: &mut R,
338         counts: &mut Counts,
339     ) where
340         R: Resolve,
341     {
342         log::trace!("assign_connection_capacity; inc={}", inc);
343 
344         self.flow.assign_capacity(inc);
345 
346         // Assign newly acquired capacity to streams pending capacity.
347         while self.flow.available() > 0 {
348             let stream = match self.pending_capacity.pop(store) {
349                 Some(stream) => stream,
350                 None => return,
351             };
352 
353             // Streams pending capacity may have been reset before capacity
354             // became available. In that case, the stream won't want any
355             // capacity, and so we shouldn't "transition" on it, but just evict
356             // it and continue the loop.
357             if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
358                 continue;
359             }
360 
361             counts.transition(stream, |_, mut stream| {
362                 // Try to assign capacity to the stream. This will also re-queue the
363                 // stream if there isn't enough connection level capacity to fulfill
364                 // the capacity request.
365                 self.try_assign_capacity(&mut stream);
366             })
367         }
368     }
369 
370     /// Request capacity to send data
try_assign_capacity(&mut self, stream: &mut store::Ptr)371     fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
372         let total_requested = stream.requested_send_capacity;
373 
374         // Total requested should never go below actual assigned
375         // (Note: the window size can go lower than assigned)
376         debug_assert!(total_requested >= stream.send_flow.available());
377 
378         // The amount of additional capacity that the stream requests.
379         // Don't assign more than the window has available!
380         let additional = cmp::min(
381             total_requested - stream.send_flow.available().as_size(),
382             // Can't assign more than what is available
383             stream.send_flow.window_size() - stream.send_flow.available().as_size(),
384         );
385 
386         log::trace!(
387             "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
388             stream.id,
389             total_requested,
390             additional,
391             stream.buffered_send_data,
392             stream.send_flow.window_size(),
393             self.flow.available()
394         );
395 
396         if additional == 0 {
397             // Nothing more to do
398             return;
399         }
400 
401         // If the stream has requested capacity, then it must be in the
402         // streaming state (more data could be sent) or there is buffered data
403         // waiting to be sent.
404         debug_assert!(
405             stream.state.is_send_streaming() || stream.buffered_send_data > 0,
406             "state={:?}",
407             stream.state
408         );
409 
410         // The amount of currently available capacity on the connection
411         let conn_available = self.flow.available().as_size();
412 
413         // First check if capacity is immediately available
414         if conn_available > 0 {
415             // The amount of capacity to assign to the stream
416             // TODO: Should prioritization factor into this?
417             let assign = cmp::min(conn_available, additional);
418 
419             log::trace!("  assigning; stream={:?}, capacity={}", stream.id, assign,);
420 
421             // Assign the capacity to the stream
422             stream.assign_capacity(assign);
423 
424             // Claim the capacity from the connection
425             self.flow.claim_capacity(assign);
426         }
427 
428         log::trace!(
429             "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
430             stream.send_flow.available(),
431             stream.requested_send_capacity,
432             stream.buffered_send_data,
433             stream.send_flow.has_unavailable()
434         );
435 
436         if stream.send_flow.available() < stream.requested_send_capacity
437             && stream.send_flow.has_unavailable()
438         {
439             // The stream requires additional capacity and the stream's
440             // window has available capacity, but the connection window
441             // does not.
442             //
443             // In this case, the stream needs to be queued up for when the
444             // connection has more capacity.
445             self.pending_capacity.push(stream);
446         }
447 
448         // If data is buffered and the stream is send ready, then
449         // schedule the stream for execution
450         if stream.buffered_send_data > 0 && stream.is_send_ready() {
451             // TODO: This assertion isn't *exactly* correct. There can still be
452             // buffered send data while the stream's pending send queue is
453             // empty. This can happen when a large data frame is in the process
454             // of being **partially** sent. Once the window has been sent, the
455             // data frame will be returned to the prioritization layer to be
456             // re-scheduled.
457             //
458             // That said, it would be nice to figure out how to make this
459             // assertion correctly.
460             //
461             // debug_assert!(!stream.pending_send.is_empty());
462 
463             self.pending_send.push(stream);
464         }
465     }
466 
poll_complete<T, B>( &mut self, cx: &mut Context, buffer: &mut Buffer<Frame<B>>, store: &mut Store, counts: &mut Counts, dst: &mut Codec<T, Prioritized<B>>, ) -> Poll<io::Result<()>> where T: AsyncWrite + Unpin, B: Buf,467     pub fn poll_complete<T, B>(
468         &mut self,
469         cx: &mut Context,
470         buffer: &mut Buffer<Frame<B>>,
471         store: &mut Store,
472         counts: &mut Counts,
473         dst: &mut Codec<T, Prioritized<B>>,
474     ) -> Poll<io::Result<()>>
475     where
476         T: AsyncWrite + Unpin,
477         B: Buf,
478     {
479         // Ensure codec is ready
480         ready!(dst.poll_ready(cx))?;
481 
482         // Reclaim any frame that has previously been written
483         self.reclaim_frame(buffer, store, dst);
484 
485         // The max frame length
486         let max_frame_len = dst.max_send_frame_size();
487 
488         log::trace!("poll_complete");
489 
490         loop {
491             self.schedule_pending_open(store, counts);
492 
493             match self.pop_frame(buffer, store, max_frame_len, counts) {
494                 Some(frame) => {
495                     log::trace!("writing frame={:?}", frame);
496 
497                     debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
498                     if let Frame::Data(ref frame) = frame {
499                         self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
500                     }
501                     dst.buffer(frame).expect("invalid frame");
502 
503                     // Ensure the codec is ready to try the loop again.
504                     ready!(dst.poll_ready(cx))?;
505 
506                     // Because, always try to reclaim...
507                     self.reclaim_frame(buffer, store, dst);
508                 }
509                 None => {
510                     // Try to flush the codec.
511                     ready!(dst.flush(cx))?;
512 
513                     // This might release a data frame...
514                     if !self.reclaim_frame(buffer, store, dst) {
515                         return Poll::Ready(Ok(()));
516                     }
517 
518                     // No need to poll ready as poll_complete() does this for
519                     // us...
520                 }
521             }
522         }
523     }
524 
525     /// Tries to reclaim a pending data frame from the codec.
526     ///
527     /// Returns true if a frame was reclaimed.
528     ///
529     /// When a data frame is written to the codec, it may not be written in its
530     /// entirety (large chunks are split up into potentially many data frames).
531     /// In this case, the stream needs to be reprioritized.
reclaim_frame<T, B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, dst: &mut Codec<T, Prioritized<B>>, ) -> bool where B: Buf,532     fn reclaim_frame<T, B>(
533         &mut self,
534         buffer: &mut Buffer<Frame<B>>,
535         store: &mut Store,
536         dst: &mut Codec<T, Prioritized<B>>,
537     ) -> bool
538     where
539         B: Buf,
540     {
541         log::trace!("try reclaim frame");
542 
543         // First check if there are any data chunks to take back
544         if let Some(frame) = dst.take_last_data_frame() {
545             log::trace!(
546                 "  -> reclaimed; frame={:?}; sz={}",
547                 frame,
548                 frame.payload().inner.get_ref().remaining()
549             );
550 
551             let mut eos = false;
552             let key = frame.payload().stream;
553 
554             match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
555                 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
556                 InFlightData::Drop => {
557                     log::trace!("not reclaiming frame for cancelled stream");
558                     return false;
559                 }
560                 InFlightData::DataFrame(k) => {
561                     debug_assert_eq!(k, key);
562                 }
563             }
564 
565             let mut frame = frame.map(|prioritized| {
566                 // TODO: Ensure fully written
567                 eos = prioritized.end_of_stream;
568                 prioritized.inner.into_inner()
569             });
570 
571             if frame.payload().has_remaining() {
572                 let mut stream = store.resolve(key);
573 
574                 if eos {
575                     frame.set_end_stream(true);
576                 }
577 
578                 self.push_back_frame(frame.into(), buffer, &mut stream);
579 
580                 return true;
581             }
582         }
583 
584         false
585     }
586 
587     /// Push the frame to the front of the stream's deque, scheduling the
588     /// stream if needed.
push_back_frame<B>( &mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr, )589     fn push_back_frame<B>(
590         &mut self,
591         frame: Frame<B>,
592         buffer: &mut Buffer<Frame<B>>,
593         stream: &mut store::Ptr,
594     ) {
595         // Push the frame to the front of the stream's deque
596         stream.pending_send.push_front(buffer, frame);
597 
598         // If needed, schedule the sender
599         if stream.send_flow.available() > 0 {
600             debug_assert!(!stream.pending_send.is_empty());
601             self.pending_send.push(stream);
602         }
603     }
604 
clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr)605     pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
606         log::trace!("clear_queue; stream={:?}", stream.id);
607 
608         // TODO: make this more efficient?
609         while let Some(frame) = stream.pending_send.pop_front(buffer) {
610             log::trace!("dropping; frame={:?}", frame);
611         }
612 
613         stream.buffered_send_data = 0;
614         stream.requested_send_capacity = 0;
615         if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
616             if stream.key() == key {
617                 // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
618                 self.in_flight_data_frame = InFlightData::Drop;
619             }
620         }
621     }
622 
clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts)623     pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
624         while let Some(stream) = self.pending_send.pop(store) {
625             let is_pending_reset = stream.is_pending_reset_expiration();
626             counts.transition_after(stream, is_pending_reset);
627         }
628     }
629 
clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts)630     pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
631         while let Some(stream) = self.pending_open.pop(store) {
632             let is_pending_reset = stream.is_pending_reset_expiration();
633             counts.transition_after(stream, is_pending_reset);
634         }
635     }
636 
pop_frame<B>( &mut self, buffer: &mut Buffer<Frame<B>>, store: &mut Store, max_len: usize, counts: &mut Counts, ) -> Option<Frame<Prioritized<B>>> where B: Buf,637     fn pop_frame<B>(
638         &mut self,
639         buffer: &mut Buffer<Frame<B>>,
640         store: &mut Store,
641         max_len: usize,
642         counts: &mut Counts,
643     ) -> Option<Frame<Prioritized<B>>>
644     where
645         B: Buf,
646     {
647         log::trace!("pop_frame");
648 
649         loop {
650             match self.pending_send.pop(store) {
651                 Some(mut stream) => {
652                     log::trace!(
653                         "pop_frame; stream={:?}; stream.state={:?}",
654                         stream.id,
655                         stream.state
656                     );
657 
658                     // It's possible that this stream, besides having data to send,
659                     // is also queued to send a reset, and thus is already in the queue
660                     // to wait for "some time" after a reset.
661                     //
662                     // To be safe, we just always ask the stream.
663                     let is_pending_reset = stream.is_pending_reset_expiration();
664 
665                     log::trace!(
666                         " --> stream={:?}; is_pending_reset={:?};",
667                         stream.id,
668                         is_pending_reset
669                     );
670 
671                     let frame = match stream.pending_send.pop_front(buffer) {
672                         Some(Frame::Data(mut frame)) => {
673                             // Get the amount of capacity remaining for stream's
674                             // window.
675                             let stream_capacity = stream.send_flow.available();
676                             let sz = frame.payload().remaining();
677 
678                             log::trace!(
679                                 " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
680                                  available={}; requested={}; buffered={};",
681                                 frame.stream_id(),
682                                 sz,
683                                 frame.is_end_stream(),
684                                 stream_capacity,
685                                 stream.send_flow.available(),
686                                 stream.requested_send_capacity,
687                                 stream.buffered_send_data,
688                             );
689 
690                             // Zero length data frames always have capacity to
691                             // be sent.
692                             if sz > 0 && stream_capacity == 0 {
693                                 log::trace!(
694                                     " --> stream capacity is 0; requested={}",
695                                     stream.requested_send_capacity
696                                 );
697 
698                                 // Ensure that the stream is waiting for
699                                 // connection level capacity
700                                 //
701                                 // TODO: uncomment
702                                 // debug_assert!(stream.is_pending_send_capacity);
703 
704                                 // The stream has no more capacity, this can
705                                 // happen if the remote reduced the stream
706                                 // window. In this case, we need to buffer the
707                                 // frame and wait for a window update...
708                                 stream.pending_send.push_front(buffer, frame.into());
709 
710                                 continue;
711                             }
712 
713                             // Only send up to the max frame length
714                             let len = cmp::min(sz, max_len);
715 
716                             // Only send up to the stream's window capacity
717                             let len =
718                                 cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
719 
720                             // There *must* be be enough connection level
721                             // capacity at this point.
722                             debug_assert!(len <= self.flow.window_size());
723 
724                             log::trace!(" --> sending data frame; len={}", len);
725 
726                             // Update the flow control
727                             log::trace!(" -- updating stream flow --");
728                             stream.send_flow.send_data(len);
729 
730                             // Decrement the stream's buffered data counter
731                             debug_assert!(stream.buffered_send_data >= len);
732                             stream.buffered_send_data -= len;
733                             stream.requested_send_capacity -= len;
734 
735                             // Assign the capacity back to the connection that
736                             // was just consumed from the stream in the previous
737                             // line.
738                             self.flow.assign_capacity(len);
739 
740                             log::trace!(" -- updating connection flow --");
741                             self.flow.send_data(len);
742 
743                             // Wrap the frame's data payload to ensure that the
744                             // correct amount of data gets written.
745 
746                             let eos = frame.is_end_stream();
747                             let len = len as usize;
748 
749                             if frame.payload().remaining() > len {
750                                 frame.set_end_stream(false);
751                             }
752 
753                             Frame::Data(frame.map(|buf| Prioritized {
754                                 inner: buf.take(len),
755                                 end_of_stream: eos,
756                                 stream: stream.key(),
757                             }))
758                         }
759                         Some(Frame::PushPromise(pp)) => {
760                             let mut pushed =
761                                 stream.store_mut().find_mut(&pp.promised_id()).unwrap();
762                             pushed.is_pending_push = false;
763                             // Transition stream from pending_push to pending_open
764                             // if possible
765                             if !pushed.pending_send.is_empty() {
766                                 if counts.can_inc_num_send_streams() {
767                                     counts.inc_num_send_streams(&mut pushed);
768                                     self.pending_send.push(&mut pushed);
769                                 } else {
770                                     self.queue_open(&mut pushed);
771                                 }
772                             }
773                             Frame::PushPromise(pp)
774                         }
775                         Some(frame) => frame.map(|_| {
776                             unreachable!(
777                                 "Frame::map closure will only be called \
778                                  on DATA frames."
779                             )
780                         }),
781                         None => {
782                             if let Some(reason) = stream.state.get_scheduled_reset() {
783                                 stream.state.set_reset(reason);
784 
785                                 let frame = frame::Reset::new(stream.id, reason);
786                                 Frame::Reset(frame)
787                             } else {
788                                 // If the stream receives a RESET from the peer, it may have
789                                 // had data buffered to be sent, but all the frames are cleared
790                                 // in clear_queue(). Instead of doing O(N) traversal through queue
791                                 // to remove, lets just ignore the stream here.
792                                 log::trace!("removing dangling stream from pending_send");
793                                 // Since this should only happen as a consequence of `clear_queue`,
794                                 // we must be in a closed state of some kind.
795                                 debug_assert!(stream.state.is_closed());
796                                 counts.transition_after(stream, is_pending_reset);
797                                 continue;
798                             }
799                         }
800                     };
801 
802                     log::trace!("pop_frame; frame={:?}", frame);
803 
804                     if cfg!(debug_assertions) && stream.state.is_idle() {
805                         debug_assert!(stream.id > self.last_opened_id);
806                         self.last_opened_id = stream.id;
807                     }
808 
809                     if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
810                         // TODO: Only requeue the sender IF it is ready to send
811                         // the next frame. i.e. don't requeue it if the next
812                         // frame is a data frame and the stream does not have
813                         // any more capacity.
814                         self.pending_send.push(&mut stream);
815                     }
816 
817                     counts.transition_after(stream, is_pending_reset);
818 
819                     return Some(frame);
820                 }
821                 None => return None,
822             }
823         }
824     }
825 
schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts)826     fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
827         log::trace!("schedule_pending_open");
828         // check for any pending open streams
829         while counts.can_inc_num_send_streams() {
830             if let Some(mut stream) = self.pending_open.pop(store) {
831                 log::trace!("schedule_pending_open; stream={:?}", stream.id);
832 
833                 counts.inc_num_send_streams(&mut stream);
834                 self.pending_send.push(&mut stream);
835                 stream.notify_send();
836             } else {
837                 return;
838             }
839         }
840     }
841 }
842 
843 // ===== impl Prioritized =====
844 
845 impl<B> Buf for Prioritized<B>
846 where
847     B: Buf,
848 {
remaining(&self) -> usize849     fn remaining(&self) -> usize {
850         self.inner.remaining()
851     }
852 
bytes(&self) -> &[u8]853     fn bytes(&self) -> &[u8] {
854         self.inner.bytes()
855     }
856 
advance(&mut self, cnt: usize)857     fn advance(&mut self, cnt: usize) {
858         self.inner.advance(cnt)
859     }
860 }
861 
862 impl<B: Buf> fmt::Debug for Prioritized<B> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result863     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
864         fmt.debug_struct("Prioritized")
865             .field("remaining", &self.inner.get_ref().remaining())
866             .field("end_of_stream", &self.end_of_stream)
867             .field("stream", &self.stream)
868             .finish()
869     }
870 }
871