1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 // Buffering data to send until it is acked.
8 
9 use std::cell::RefCell;
10 use std::cmp::{max, min, Ordering};
11 use std::collections::{BTreeMap, VecDeque};
12 use std::convert::TryFrom;
13 use std::mem;
14 use std::ops::Add;
15 use std::rc::Rc;
16 
17 use indexmap::IndexMap;
18 use smallvec::SmallVec;
19 
20 use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role};
21 
22 use crate::events::ConnectionEvents;
23 use crate::fc::SenderFlowControl;
24 use crate::frame::{Frame, FRAME_TYPE_RESET_STREAM};
25 use crate::packet::PacketBuilder;
26 use crate::recovery::{RecoveryToken, StreamRecoveryToken};
27 use crate::stats::FrameStats;
28 use crate::stream_id::StreamId;
29 use crate::tparams::{self, TransportParameters};
30 use crate::{AppError, Error, Res};
31 
/// Maximum number of bytes a `TxBuffer` will hold; `TxBuffer::send` stops
/// accepting data once this much is buffered.
pub const SEND_BUFFER_SIZE: usize = 0x10_0000; // 1 MiB
33 
/// The priority that is assigned to sending data for the stream.
///
/// Priorities form a total order in which `Critical` compares greatest and
/// `Low` compares least (see the `Ord` implementation below).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransmissionPriority {
    /// This stream is more important than the functioning of the connection.
    /// Don't use this priority unless the stream really is that important.
    /// A stream at this priority can starve out other connection functions,
    /// including flow control, which could be very bad.
    Critical,
    /// The stream is very important.  Stream data will be written ahead of
    /// some of the less critical connection functions, like path validation,
    /// connection ID management, and session tickets.
    Important,
    /// High priority streams are important, but not enough to disrupt
    /// connection operation.  They go ahead of session tickets though.
    High,
    /// The default priority.
    Normal,
    /// Low priority streams get sent last.
    Low,
}
54 
impl Default for TransmissionPriority {
    /// Streams default to `Normal` priority.
    fn default() -> Self {
        Self::Normal
    }
}
60 
impl PartialOrd for TransmissionPriority {
    /// Delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
66 
67 impl Ord for TransmissionPriority {
cmp(&self, other: &Self) -> Ordering68     fn cmp(&self, other: &Self) -> Ordering {
69         if self == other {
70             return Ordering::Equal;
71         }
72         match (self, other) {
73             (Self::Critical, _) => Ordering::Greater,
74             (_, Self::Critical) => Ordering::Less,
75             (Self::Important, _) => Ordering::Greater,
76             (_, Self::Important) => Ordering::Less,
77             (Self::High, _) => Ordering::Greater,
78             (_, Self::High) => Ordering::Less,
79             (Self::Normal, _) => Ordering::Greater,
80             (_, Self::Normal) => Ordering::Less,
81             _ => unreachable!(),
82         }
83     }
84 }
85 
86 impl Add<RetransmissionPriority> for TransmissionPriority {
87     type Output = Self;
add(self, rhs: RetransmissionPriority) -> Self::Output88     fn add(self, rhs: RetransmissionPriority) -> Self::Output {
89         match rhs {
90             RetransmissionPriority::Fixed(fixed) => fixed,
91             RetransmissionPriority::Same => self,
92             RetransmissionPriority::Higher => match self {
93                 Self::Critical => Self::Critical,
94                 Self::Important | Self::High => Self::Important,
95                 Self::Normal => Self::High,
96                 Self::Low => Self::Normal,
97             },
98             RetransmissionPriority::MuchHigher => match self {
99                 Self::Critical | Self::Important => Self::Critical,
100                 Self::High | Self::Normal => Self::Important,
101                 Self::Low => Self::High,
102             },
103         }
104     }
105 }
106 
/// If data is lost, this determines the priority that applies to retransmissions
/// of that data.  Combined with the base priority via `Add` on
/// `TransmissionPriority`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetransmissionPriority {
    /// Prioritize retransmission at a fixed priority.
    /// With this, it is possible to prioritize retransmissions lower than transmissions.
    /// Doing that can create a deadlock with flow control which might cause the connection
    /// to stall unless new data stops arriving fast enough that retransmissions can complete.
    Fixed(TransmissionPriority),
    /// Don't increase priority for retransmission.  This is probably not a good idea
    /// as it could mean starving flow control.
    Same,
    /// Increase the priority of retransmissions (the default).
    /// Retransmissions of `Critical` or `Important` aren't elevated at all.
    Higher,
    /// Increase the priority of retransmissions a lot.
    /// This is useful for streams that are particularly exposed to head-of-line blocking.
    MuchHigher,
}
126 
impl Default for RetransmissionPriority {
    /// Retransmissions are elevated by one priority level by default.
    fn default() -> Self {
        Self::Higher
    }
}
132 
/// State of a tracked range of stream bytes.
#[derive(Debug, PartialEq, Clone, Copy)]
enum RangeState {
    /// The range has been transmitted but not yet acknowledged.
    Sent,
    /// The range has been acknowledged by the peer (implies sent).
    Acked,
}
138 
/// Track ranges in the stream as sent or acked. Acked implies sent. Not in a
/// range implies needing-to-be-sent, either initially or as a retransmission.
/// The chunking logic keeps entries non-overlapping.
#[derive(Debug, Default, PartialEq)]
struct RangeTracker {
    // offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits.
    used: BTreeMap<u64, (u64, RangeState)>,
}
146 
147 impl RangeTracker {
highest_offset(&self) -> u64148     fn highest_offset(&self) -> u64 {
149         self.used
150             .range(..)
151             .next_back()
152             .map_or(0, |(k, (v, _))| *k + *v)
153     }
154 
acked_from_zero(&self) -> u64155     fn acked_from_zero(&self) -> u64 {
156         self.used
157             .get(&0)
158             .filter(|(_, state)| *state == RangeState::Acked)
159             .map_or(0, |(v, _)| *v)
160     }
161 
162     /// Find the first unmarked range. If all are contiguous, this will return
163     /// (highest_offset(), None).
first_unmarked_range(&self) -> (u64, Option<u64>)164     fn first_unmarked_range(&self) -> (u64, Option<u64>) {
165         let mut prev_end = 0;
166 
167         for (cur_off, (cur_len, _)) in &self.used {
168             if prev_end == *cur_off {
169                 prev_end = cur_off + cur_len;
170             } else {
171                 return (prev_end, Some(cur_off - prev_end));
172             }
173         }
174         (prev_end, None)
175     }
176 
177     /// Turn one range into a list of subranges that align with existing
178     /// ranges.
179     /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked.
180     //
181     // e.g. given N is new and ABC are existing:
182     //             NNNNNNNNNNNNNNNN
183     //               AAAAA   BBBCCCCC  ...then we want 5 chunks:
184     //             1122222333444555
185     //
186     // but also if we have this:
187     //             NNNNNNNNNNNNNNNN
188     //           AAAAAAAAAA      BBBB  ...then break existing A and B ranges up:
189     //
190     //             1111111122222233
191     //           aaAAAAAAAA      BBbb
192     //
193     // Doing all this work up front should make handling each chunk much
194     // easier.
chunk_range_on_edges( &mut self, new_off: u64, new_len: u64, new_state: RangeState, ) -> Vec<(u64, u64, RangeState)>195     fn chunk_range_on_edges(
196         &mut self,
197         new_off: u64,
198         new_len: u64,
199         new_state: RangeState,
200     ) -> Vec<(u64, u64, RangeState)> {
201         let mut tmp_off = new_off;
202         let mut tmp_len = new_len;
203         let mut v = Vec::new();
204 
205         // cut previous overlapping range if needed
206         let prev = self.used.range_mut(..tmp_off).next_back();
207         if let Some((prev_off, (prev_len, prev_state))) = prev {
208             let prev_state = *prev_state;
209             let overlap = (*prev_off + *prev_len).saturating_sub(new_off);
210             *prev_len -= overlap;
211             if overlap > 0 {
212                 self.used.insert(new_off, (overlap, prev_state));
213             }
214         }
215 
216         let mut last_existing_remaining = None;
217         for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) {
218             // Create chunk for "overhang" before an existing range
219             if tmp_off < *off {
220                 let sub_len = off - tmp_off;
221                 v.push((tmp_off, sub_len, new_state));
222                 tmp_off += sub_len;
223                 tmp_len -= sub_len;
224             }
225 
226             // Create chunk to match existing range
227             let sub_len = min(*len, tmp_len);
228             let remaining_len = len - sub_len;
229             if new_state == RangeState::Sent && *state == RangeState::Acked {
230                 qinfo!(
231                     "Attempted to downgrade overlapping range Acked range {}-{} with Sent {}-{}",
232                     off,
233                     len,
234                     new_off,
235                     new_len
236                 );
237             } else {
238                 v.push((tmp_off, sub_len, new_state));
239             }
240             tmp_off += sub_len;
241             tmp_len -= sub_len;
242 
243             if remaining_len > 0 {
244                 last_existing_remaining = Some((*off, sub_len, remaining_len, *state));
245             }
246         }
247 
248         // Maybe break last existing range in two so that a final chunk will
249         // have the same length as an existing range entry
250         if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining {
251             *self.used.get_mut(&off).expect("must be there") = (sub_len, state);
252             self.used.insert(off + sub_len, (remaining_len, state));
253         }
254 
255         // Create final chunk if anything remains of the new range
256         if tmp_len > 0 {
257             v.push((tmp_off, tmp_len, new_state))
258         }
259 
260         v
261     }
262 
263     /// Merge contiguous Acked ranges into the first entry (0). This range may
264     /// be dropped from the send buffer.
coalesce_acked_from_zero(&mut self)265     fn coalesce_acked_from_zero(&mut self) {
266         let acked_range_from_zero = self
267             .used
268             .get_mut(&0)
269             .filter(|(_, state)| *state == RangeState::Acked)
270             .map(|(len, _)| *len);
271 
272         if let Some(len_from_zero) = acked_range_from_zero {
273             let mut to_remove = SmallVec::<[_; 8]>::new();
274 
275             let mut new_len_from_zero = len_from_zero;
276 
277             // See if there's another Acked range entry contiguous to this one
278             while let Some((next_len, _)) = self
279                 .used
280                 .get(&new_len_from_zero)
281                 .filter(|(_, state)| *state == RangeState::Acked)
282             {
283                 to_remove.push(new_len_from_zero);
284                 new_len_from_zero += *next_len;
285             }
286 
287             if len_from_zero != new_len_from_zero {
288                 self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero;
289             }
290 
291             for val in to_remove {
292                 self.used.remove(&val);
293             }
294         }
295     }
296 
mark_range(&mut self, off: u64, len: usize, state: RangeState)297     fn mark_range(&mut self, off: u64, len: usize, state: RangeState) {
298         if len == 0 {
299             qinfo!("mark 0-length range at {}", off);
300             return;
301         }
302 
303         let subranges = self.chunk_range_on_edges(off, len as u64, state);
304 
305         for (sub_off, sub_len, sub_state) in subranges {
306             self.used.insert(sub_off, (sub_len, sub_state));
307         }
308 
309         self.coalesce_acked_from_zero()
310     }
311 
unmark_range(&mut self, off: u64, len: usize)312     fn unmark_range(&mut self, off: u64, len: usize) {
313         if len == 0 {
314             qdebug!("unmark 0-length range at {}", off);
315             return;
316         }
317 
318         let len = u64::try_from(len).unwrap();
319         let end_off = off + len;
320 
321         let mut to_remove = SmallVec::<[_; 8]>::new();
322         let mut to_add = None;
323 
324         // Walk backwards through possibly affected existing ranges
325         for (cur_off, (cur_len, cur_state)) in self.used.range_mut(..off + len).rev() {
326             // Maybe fixup range preceding the removed range
327             if *cur_off < off {
328                 // Check for overlap
329                 if *cur_off + *cur_len > off {
330                     if *cur_state == RangeState::Acked {
331                         qdebug!(
332                             "Attempted to unmark Acked range {}-{} with unmark_range {}-{}",
333                             cur_off,
334                             cur_len,
335                             off,
336                             off + len
337                         );
338                     } else {
339                         *cur_len = off - cur_off;
340                     }
341                 }
342                 break;
343             }
344 
345             if *cur_state == RangeState::Acked {
346                 qdebug!(
347                     "Attempted to unmark Acked range {}-{} with unmark_range {}-{}",
348                     cur_off,
349                     cur_len,
350                     off,
351                     off + len
352                 );
353                 continue;
354             }
355 
356             // Add a new range for old subrange extending beyond
357             // to-be-unmarked range
358             let cur_end_off = cur_off + *cur_len;
359             if cur_end_off > end_off {
360                 let new_cur_off = off + len;
361                 let new_cur_len = cur_end_off - end_off;
362                 assert_eq!(to_add, None);
363                 to_add = Some((new_cur_off, new_cur_len, *cur_state));
364             }
365 
366             to_remove.push(*cur_off);
367         }
368 
369         for remove_off in to_remove {
370             self.used.remove(&remove_off);
371         }
372 
373         if let Some((new_cur_off, new_cur_len, cur_state)) = to_add {
374             self.used.insert(new_cur_off, (new_cur_len, cur_state));
375         }
376     }
377 
378     /// Unmark all sent ranges.
unmark_sent(&mut self)379     pub fn unmark_sent(&mut self) {
380         self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap());
381     }
382 }
383 
/// Buffer to contain queued bytes and track their state.
///
/// Offsets in `ranges` are absolute stream offsets; the index into
/// `send_buf` is the stream offset minus `retired`.
#[derive(Debug, Default, PartialEq)]
pub struct TxBuffer {
    retired: u64,           // contig acked bytes, no longer in buffer
    send_buf: VecDeque<u8>, // buffer of not-acked bytes
    ranges: RangeTracker,   // ranges in buffer that have been sent or acked
}
391 
392 impl TxBuffer {
new() -> Self393     pub fn new() -> Self {
394         Self::default()
395     }
396 
397     /// Attempt to add some or all of the passed-in buffer to the TxBuffer.
send(&mut self, buf: &[u8]) -> usize398     pub fn send(&mut self, buf: &[u8]) -> usize {
399         let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len());
400         if can_buffer > 0 {
401             self.send_buf.extend(&buf[..can_buffer]);
402             assert!(self.send_buf.len() <= SEND_BUFFER_SIZE);
403         }
404         can_buffer
405     }
406 
next_bytes(&self) -> Option<(u64, &[u8])>407     pub fn next_bytes(&self) -> Option<(u64, &[u8])> {
408         let (start, maybe_len) = self.ranges.first_unmarked_range();
409 
410         if start == self.retired + u64::try_from(self.buffered()).unwrap() {
411             return None;
412         }
413 
414         // Convert from ranges-relative-to-zero to
415         // ranges-relative-to-buffer-start
416         let buff_off = usize::try_from(start - self.retired).unwrap();
417 
418         // Deque returns two slices. Create a subslice from whichever
419         // one contains the first unmarked data.
420         let slc = if buff_off < self.send_buf.as_slices().0.len() {
421             &self.send_buf.as_slices().0[buff_off..]
422         } else {
423             &self.send_buf.as_slices().1[buff_off - self.send_buf.as_slices().0.len()..]
424         };
425 
426         let len = if let Some(range_len) = maybe_len {
427             // Truncate if range crosses deque slices
428             min(usize::try_from(range_len).unwrap(), slc.len())
429         } else {
430             slc.len()
431         };
432 
433         debug_assert!(len > 0);
434         debug_assert!(len <= slc.len());
435 
436         Some((start, &slc[..len]))
437     }
438 
mark_as_sent(&mut self, offset: u64, len: usize)439     pub fn mark_as_sent(&mut self, offset: u64, len: usize) {
440         self.ranges.mark_range(offset, len, RangeState::Sent)
441     }
442 
mark_as_acked(&mut self, offset: u64, len: usize)443     pub fn mark_as_acked(&mut self, offset: u64, len: usize) {
444         self.ranges.mark_range(offset, len, RangeState::Acked);
445 
446         // We can drop contig acked range from the buffer
447         let new_retirable = self.ranges.acked_from_zero() - self.retired;
448         debug_assert!(new_retirable <= self.buffered() as u64);
449         let keep_len =
450             self.buffered() - usize::try_from(new_retirable).expect("should fit in usize");
451 
452         // Truncate front
453         self.send_buf.rotate_left(self.buffered() - keep_len);
454         self.send_buf.truncate(keep_len);
455 
456         self.retired += new_retirable;
457     }
458 
mark_as_lost(&mut self, offset: u64, len: usize)459     pub fn mark_as_lost(&mut self, offset: u64, len: usize) {
460         self.ranges.unmark_range(offset, len)
461     }
462 
463     /// Forget about anything that was marked as sent.
unmark_sent(&mut self)464     pub fn unmark_sent(&mut self) {
465         self.ranges.unmark_sent();
466     }
467 
buffered(&self) -> usize468     fn buffered(&self) -> usize {
469         self.send_buf.len()
470     }
471 
avail(&self) -> usize472     fn avail(&self) -> usize {
473         SEND_BUFFER_SIZE - self.buffered()
474     }
475 
used(&self) -> u64476     fn used(&self) -> u64 {
477         self.retired + u64::try_from(self.buffered()).unwrap()
478     }
479 }
480 
/// QUIC sending stream states, based on -transport 3.1.
#[derive(Debug)]
pub(crate) enum SendStreamState {
    /// Open, but no data buffered yet; the `TxBuffer` is not allocated
    /// until data is sent (see `tx_avail`).
    Ready {
        fc: SenderFlowControl<StreamId>,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
    },
    /// Data has been buffered and is being (re)transmitted.
    Send {
        fc: SenderFlowControl<StreamId>,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
        send_buf: TxBuffer,
    },
    // Note: `DataSent` is entered when the stream is closed, not when all data has been
    // sent for the first time.
    DataSent {
        send_buf: TxBuffer,
        // Whether a frame carrying FIN has been sent.
        fin_sent: bool,
        // Whether the FIN has been acknowledged.
        fin_acked: bool,
    },
    /// All data has been delivered (QUIC "Data Recvd" state).
    DataRecvd,
    /// A `RESET_STREAM` frame has been sent.
    ResetSent {
        err: AppError,
        final_size: u64,
        // `Some(p)` means the RESET_STREAM frame still needs to be written
        // at priority `p`; `None` once it has been written
        // (see `write_reset_frame` and `reset_lost`).
        priority: Option<TransmissionPriority>,
    },
    /// The peer acknowledged the `RESET_STREAM`.
    ResetRecvd,
}
508 
509 impl SendStreamState {
tx_buf_mut(&mut self) -> Option<&mut TxBuffer>510     fn tx_buf_mut(&mut self) -> Option<&mut TxBuffer> {
511         match self {
512             Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => Some(send_buf),
513             Self::Ready { .. }
514             | Self::DataRecvd { .. }
515             | Self::ResetSent { .. }
516             | Self::ResetRecvd => None,
517         }
518     }
519 
tx_avail(&self) -> usize520     fn tx_avail(&self) -> usize {
521         match self {
522             // In Ready, TxBuffer not yet allocated but size is known
523             Self::Ready { .. } => SEND_BUFFER_SIZE,
524             Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => send_buf.avail(),
525             Self::DataRecvd { .. } | Self::ResetSent { .. } | Self::ResetRecvd => 0,
526         }
527     }
528 
name(&self) -> &str529     fn name(&self) -> &str {
530         match self {
531             Self::Ready { .. } => "Ready",
532             Self::Send { .. } => "Send",
533             Self::DataSent { .. } => "DataSent",
534             Self::DataRecvd { .. } => "DataRecvd",
535             Self::ResetSent { .. } => "ResetSent",
536             Self::ResetRecvd => "ResetRecvd",
537         }
538     }
539 
transition(&mut self, new_state: Self)540     fn transition(&mut self, new_state: Self) {
541         qtrace!("SendStream state {} -> {}", self.name(), new_state.name());
542         *self = new_state;
543     }
544 }
545 
/// Implement a QUIC send stream.
#[derive(Debug)]
pub struct SendStream {
    stream_id: StreamId,
    state: SendStreamState,
    conn_events: ConnectionEvents,
    /// Priority used when sending fresh data.
    priority: TransmissionPriority,
    /// How retransmissions are prioritized relative to `priority`.
    retransmission_priority: RetransmissionPriority,
    /// Data below this offset is eligible during retransmission-only
    /// passes (see `next_bytes`).
    retransmission_offset: u64,
}
556 
557 impl SendStream {
    /// Create a new send stream in the `Ready` state.
    ///
    /// `max_stream_data` seeds the stream-level send flow control and
    /// `conn_fc` is the shared connection-level send flow control.
    /// A writable event is queued immediately if `avail()` reports space.
    pub fn new(
        stream_id: StreamId,
        max_stream_data: u64,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
        conn_events: ConnectionEvents,
    ) -> Self {
        let ss = Self {
            stream_id,
            state: SendStreamState::Ready {
                fc: SenderFlowControl::new(stream_id, max_stream_data),
                conn_fc,
            },
            conn_events,
            priority: TransmissionPriority::default(),
            retransmission_priority: RetransmissionPriority::default(),
            retransmission_offset: 0,
        };
        if ss.avail() > 0 {
            ss.conn_events.send_stream_writable(stream_id);
        }
        ss
    }
580 
set_priority( &mut self, transmission: TransmissionPriority, retransmission: RetransmissionPriority, )581     pub fn set_priority(
582         &mut self,
583         transmission: TransmissionPriority,
584         retransmission: RetransmissionPriority,
585     ) {
586         self.priority = transmission;
587         self.retransmission_priority = retransmission;
588     }
589 
590     /// If all data has been buffered or written, how much was sent.
final_size(&self) -> Option<u64>591     pub fn final_size(&self) -> Option<u64> {
592         match &self.state {
593             SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()),
594             SendStreamState::ResetSent { final_size, .. } => Some(*final_size),
595             _ => None,
596         }
597     }
598 
    /// Return the next range to be sent, if any.
    /// If this is a retransmission, cut off what is sent at the retransmission
    /// offset.
    fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> {
        match self.state {
            SendStreamState::Send { ref send_buf, .. } => {
                send_buf.next_bytes().and_then(|(offset, slice)| {
                    if retransmission_only {
                        qtrace!(
                            [self],
                            "next_bytes apply retransmission limit at {}",
                            self.retransmission_offset
                        );
                        // Only data below `retransmission_offset` qualifies
                        // for a retransmission-only pass; trim the slice to
                        // that limit or return nothing.
                        if self.retransmission_offset > offset {
                            let len = min(
                                usize::try_from(self.retransmission_offset - offset).unwrap(),
                                slice.len(),
                            );
                            Some((offset, &slice[..len]))
                        } else {
                            None
                        }
                    } else {
                        Some((offset, slice))
                    }
                })
            }
            SendStreamState::DataSent {
                ref send_buf,
                fin_sent,
                ..
            } => {
                let bytes = send_buf.next_bytes();
                if bytes.is_some() {
                    bytes
                } else if fin_sent {
                    // Everything, including the fin, has been sent.
                    None
                } else {
                    // Send empty stream frame with fin set
                    Some((send_buf.used(), &[]))
                }
            }
            SendStreamState::Ready { .. }
            | SendStreamState::DataRecvd { .. }
            | SendStreamState::ResetSent { .. }
            | SendStreamState::ResetRecvd => None,
        }
    }
647 
    /// Calculate how many bytes (length) can fit into available space and whether
    /// the remainder of the space can be filled (or if a length field is needed).
    ///
    /// Returns `(length, fill)`: `length` is how many data bytes to write and
    /// `fill` is true when the frame should run to the end of the packet
    /// (no explicit length field).
    fn length_and_fill(data_len: usize, space: usize) -> (usize, bool) {
        if data_len >= space {
            // More data than space allows, or an exact fit => fast path.
            qtrace!("SendStream::length_and_fill fill {}", space);
            return (space, true);
        }

        // Estimate size of the length field based on the available space,
        // less 1, which is the worst case.
        // Since `data_len < space` here, `length` always equals `data_len`.
        let length = min(space.saturating_sub(1), data_len);
        let length_len = Encoder::varint_len(u64::try_from(length).unwrap());
        debug_assert!(length_len <= space); // We don't depend on this being true, but it is true.

        // From here we can always fit `data_len`, but we might as well fill
        // if there is no space for the length field plus another frame.
        let fill = data_len + length_len + PacketBuilder::MINIMUM_FRAME_SIZE > space;
        qtrace!("SendStream::length_and_fill {} fill {}", data_len, fill);
        (data_len, fill)
    }
669 
    /// Maybe write a `STREAM` frame.
    ///
    /// Only writes when the pass priority matches either the stream's own
    /// priority (fresh data) or its elevated retransmission priority
    /// (lost data); otherwise this is a no-op.
    fn write_stream_frame(
        &mut self,
        priority: TransmissionPriority,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        let retransmission = if priority == self.priority {
            false
        } else if priority == self.priority + self.retransmission_priority {
            true
        } else {
            return;
        };

        let id = self.stream_id;
        let final_size = self.final_size();
        if let Some((offset, data)) = self.next_bytes(retransmission) {
            // Frame header: type byte, stream ID, and offset (omitted when 0).
            let overhead = 1 // Frame type
                + Encoder::varint_len(id.as_u64())
                + if offset > 0 {
                    Encoder::varint_len(offset)
                } else {
                    0
                };
            if overhead > builder.remaining() {
                qtrace!([self], "write_frame no space for header");
                return;
            }

            let (length, fill) = Self::length_and_fill(data.len(), builder.remaining() - overhead);
            // Set FIN only when this write reaches the stream's final size.
            let fin = final_size.map_or(false, |fs| fs == offset + u64::try_from(length).unwrap());
            if length == 0 && !fin {
                qtrace!([self], "write_frame no data, no fin");
                return;
            }

            // Write the stream out.
            builder.encode_varint(Frame::stream_type(fin, offset > 0, fill));
            builder.encode_varint(id.as_u64());
            if offset > 0 {
                builder.encode_varint(offset);
            }
            if fill {
                // Implicit length: the data runs to the end of the packet.
                builder.encode(&data[..length]);
                builder.mark_full();
            } else {
                builder.encode_vvec(&data[..length]);
            }
            debug_assert!(builder.len() <= builder.limit());

            self.mark_as_sent(offset, length, fin);
            tokens.push(RecoveryToken::Stream(StreamRecoveryToken::Stream(
                SendStreamRecoveryToken {
                    id,
                    offset,
                    length,
                    fin,
                },
            )));
            stats.stream += 1;
        }
    }
734 
reset_acked(&mut self)735     pub fn reset_acked(&mut self) {
736         match self.state {
737             SendStreamState::Ready { .. }
738             | SendStreamState::Send { .. }
739             | SendStreamState::DataSent { .. }
740             | SendStreamState::DataRecvd { .. } => {
741                 qtrace!([self], "Reset acked while in {} state?", self.state.name())
742             }
743             SendStreamState::ResetSent { .. } => self.state.transition(SendStreamState::ResetRecvd),
744             SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
745         };
746     }
747 
    /// A previously-sent `RESET_STREAM` frame was declared lost;
    /// re-arm it for retransmission at elevated priority.
    pub fn reset_lost(&mut self) {
        match self.state {
            SendStreamState::ResetSent {
                ref mut priority, ..
            } => {
                // `write_reset_frame` only writes while this is `Some`.
                *priority = Some(self.priority + self.retransmission_priority);
            }
            // Already acknowledged; nothing to resend.
            SendStreamState::ResetRecvd => (),
            // A RESET_STREAM can only be lost after it was sent.
            _ => unreachable!(),
        }
    }
759 
    /// Maybe write a `RESET_STREAM` frame.
    ///
    /// Writes only when the stream is in `ResetSent` with a pending
    /// priority equal to `p`.  Returns true if a frame was written.
    pub fn write_reset_frame(
        &mut self,
        p: TransmissionPriority,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) -> bool {
        if let SendStreamState::ResetSent {
            final_size,
            err,
            ref mut priority,
        } = self.state
        {
            // `None` means the frame was already written and not declared lost.
            if *priority != Some(p) {
                return false;
            }
            if builder.write_varint_frame(&[
                FRAME_TYPE_RESET_STREAM,
                self.stream_id.as_u64(),
                err,
                final_size,
            ]) {
                tokens.push(RecoveryToken::Stream(StreamRecoveryToken::ResetStream {
                    stream_id: self.stream_id,
                }));
                stats.reset_stream += 1;
                // Clear the pending priority until the frame is lost again.
                *priority = None;
                true
            } else {
                // Not enough space in this packet; try again later.
                false
            }
        } else {
            false
        }
    }
796 
blocked_lost(&mut self, limit: u64)797     pub fn blocked_lost(&mut self, limit: u64) {
798         if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
799             &mut self.state
800         {
801             fc.frame_lost(limit);
802         } else {
803             qtrace!([self], "Ignoring lost STREAM_DATA_BLOCKED({})", limit);
804         }
805     }
806 
807     /// Maybe write a `STREAM_DATA_BLOCKED` frame.
write_blocked_frame( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )808     pub fn write_blocked_frame(
809         &mut self,
810         priority: TransmissionPriority,
811         builder: &mut PacketBuilder,
812         tokens: &mut Vec<RecoveryToken>,
813         stats: &mut FrameStats,
814     ) {
815         // Send STREAM_DATA_BLOCKED at normal priority always.
816         if priority == self.priority {
817             if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
818                 &mut self.state
819             {
820                 fc.write_frames(builder, tokens, stats);
821             }
822         }
823     }
824 
mark_as_sent(&mut self, offset: u64, len: usize, fin: bool)825     pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) {
826         if let Some(buf) = self.state.tx_buf_mut() {
827             buf.mark_as_sent(offset, len);
828             self.send_blocked_if_space_needed(0);
829         };
830 
831         if fin {
832             if let SendStreamState::DataSent { fin_sent, .. } = &mut self.state {
833                 *fin_sent = true;
834             }
835         }
836     }
837 
mark_as_acked(&mut self, offset: u64, len: usize, fin: bool)838     pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) {
839         match self.state {
840             SendStreamState::Send {
841                 ref mut send_buf, ..
842             } => {
843                 send_buf.mark_as_acked(offset, len);
844                 if self.avail() > 0 {
845                     self.conn_events.send_stream_writable(self.stream_id)
846                 }
847             }
848             SendStreamState::DataSent {
849                 ref mut send_buf,
850                 ref mut fin_acked,
851                 ..
852             } => {
853                 send_buf.mark_as_acked(offset, len);
854                 if fin {
855                     *fin_acked = true;
856                 }
857                 if *fin_acked && send_buf.buffered() == 0 {
858                     self.conn_events.send_stream_complete(self.stream_id);
859                     self.state.transition(SendStreamState::DataRecvd);
860                 }
861             }
862             _ => qtrace!(
863                 [self],
864                 "mark_as_acked called from state {}",
865                 self.state.name()
866             ),
867         }
868     }
869 
mark_as_lost(&mut self, offset: u64, len: usize, fin: bool)870     pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) {
871         self.retransmission_offset = max(
872             self.retransmission_offset,
873             offset + u64::try_from(len).unwrap(),
874         );
875         qtrace!(
876             [self],
877             "mark_as_lost retransmission offset={}",
878             self.retransmission_offset
879         );
880         if let Some(buf) = self.state.tx_buf_mut() {
881             buf.mark_as_lost(offset, len);
882         }
883 
884         if fin {
885             if let SendStreamState::DataSent {
886                 fin_sent,
887                 fin_acked,
888                 ..
889             } = &mut self.state
890             {
891                 *fin_sent = *fin_acked;
892             }
893         }
894     }
895 
896     /// Bytes sendable on stream. Constrained by stream credit available,
897     /// connection credit available, and space in the tx buffer.
avail(&self) -> usize898     pub fn avail(&self) -> usize {
899         if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
900             &self.state
901         {
902             min(
903                 min(fc.available(), conn_fc.borrow().available()),
904                 self.state.tx_avail(),
905             )
906         } else {
907             0
908         }
909     }
910 
set_max_stream_data(&mut self, limit: u64)911     pub fn set_max_stream_data(&mut self, limit: u64) {
912         if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
913             &mut self.state
914         {
915             let stream_was_blocked = fc.available() == 0;
916             fc.update(limit);
917             if stream_was_blocked && self.avail() > 0 {
918                 self.conn_events.send_stream_writable(self.stream_id)
919             }
920         }
921     }
922 
is_terminal(&self) -> bool923     pub fn is_terminal(&self) -> bool {
924         matches!(
925             self.state,
926             SendStreamState::DataRecvd { .. } | SendStreamState::ResetRecvd
927         )
928     }
929 
send(&mut self, buf: &[u8]) -> Res<usize>930     pub fn send(&mut self, buf: &[u8]) -> Res<usize> {
931         self.send_internal(buf, false)
932     }
933 
send_atomic(&mut self, buf: &[u8]) -> Res<usize>934     pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> {
935         self.send_internal(buf, true)
936     }
937 
send_blocked_if_space_needed(&mut self, needed_space: usize)938     fn send_blocked_if_space_needed(&mut self, needed_space: usize) {
939         if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
940             &mut self.state
941         {
942             if fc.available() <= needed_space {
943                 fc.blocked();
944             }
945 
946             if conn_fc.borrow().available() <= needed_space {
947                 conn_fc.borrow_mut().blocked();
948             }
949         }
950     }
951 
send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize>952     fn send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize> {
953         if buf.is_empty() {
954             qerror!([self], "zero-length send on stream");
955             return Err(Error::InvalidInput);
956         }
957 
958         if let SendStreamState::Ready { fc, conn_fc } = &mut self.state {
959             let owned_fc = mem::replace(fc, SenderFlowControl::new(self.stream_id, 0));
960             let owned_conn_fc = Rc::clone(conn_fc);
961             self.state.transition(SendStreamState::Send {
962                 fc: owned_fc,
963                 conn_fc: owned_conn_fc,
964                 send_buf: TxBuffer::new(),
965             });
966         }
967 
968         if !matches!(self.state, SendStreamState::Send { .. }) {
969             return Err(Error::FinalSizeError);
970         }
971 
972         let buf = if buf.is_empty() || (self.avail() == 0) {
973             return Ok(0);
974         } else if self.avail() < buf.len() {
975             if atomic {
976                 self.send_blocked_if_space_needed(buf.len());
977                 return Ok(0);
978             } else {
979                 &buf[..self.avail()]
980             }
981         } else {
982             buf
983         };
984 
985         match &mut self.state {
986             SendStreamState::Ready { .. } => unreachable!(),
987             SendStreamState::Send {
988                 fc,
989                 conn_fc,
990                 send_buf,
991             } => {
992                 let sent = send_buf.send(buf);
993                 fc.consume(sent);
994                 conn_fc.borrow_mut().consume(sent);
995                 Ok(sent)
996             }
997             _ => Err(Error::FinalSizeError),
998         }
999     }
1000 
close(&mut self)1001     pub fn close(&mut self) {
1002         match &mut self.state {
1003             SendStreamState::Ready { .. } => {
1004                 self.state.transition(SendStreamState::DataSent {
1005                     send_buf: TxBuffer::new(),
1006                     fin_sent: false,
1007                     fin_acked: false,
1008                 });
1009             }
1010             SendStreamState::Send { send_buf, .. } => {
1011                 let owned_buf = mem::replace(send_buf, TxBuffer::new());
1012                 self.state.transition(SendStreamState::DataSent {
1013                     send_buf: owned_buf,
1014                     fin_sent: false,
1015                     fin_acked: false,
1016                 });
1017             }
1018             SendStreamState::DataSent { .. } => qtrace!([self], "already in DataSent state"),
1019             SendStreamState::DataRecvd { .. } => qtrace!([self], "already in DataRecvd state"),
1020             SendStreamState::ResetSent { .. } => qtrace!([self], "already in ResetSent state"),
1021             SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
1022         }
1023     }
1024 
reset(&mut self, err: AppError)1025     pub fn reset(&mut self, err: AppError) {
1026         match &self.state {
1027             SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } => {
1028                 let final_size = fc.used();
1029                 self.state.transition(SendStreamState::ResetSent {
1030                     err,
1031                     final_size,
1032                     priority: Some(self.priority),
1033                 });
1034             }
1035             SendStreamState::DataSent { send_buf, .. } => {
1036                 let final_size = send_buf.used();
1037                 self.state.transition(SendStreamState::ResetSent {
1038                     err,
1039                     final_size,
1040                     priority: Some(self.priority),
1041                 });
1042             }
1043             SendStreamState::DataRecvd { .. } => qtrace!([self], "already in DataRecvd state"),
1044             SendStreamState::ResetSent { .. } => qtrace!([self], "already in ResetSent state"),
1045             SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
1046         };
1047     }
1048 
    /// Test-only access to the internal state machine.
    #[cfg(test)]
    pub(crate) fn state(&mut self) -> &mut SendStreamState {
        &mut self.state
    }
1053 }
1054 
impl ::std::fmt::Display for SendStream {
    // Format as `SendStream <id>` for log output.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "SendStream {}", self.stream_id)
    }
}
1060 
/// All send streams for a connection, keyed by stream ID.  An `IndexMap`
/// keeps iteration in insertion order, which fixes the order in which
/// streams get to write frames.
#[derive(Debug, Default)]
pub(crate) struct SendStreams(IndexMap<StreamId, SendStream>);
1063 
1064 impl SendStreams {
get(&self, id: StreamId) -> Res<&SendStream>1065     pub fn get(&self, id: StreamId) -> Res<&SendStream> {
1066         self.0.get(&id).ok_or(Error::InvalidStreamId)
1067     }
1068 
get_mut(&mut self, id: StreamId) -> Res<&mut SendStream>1069     pub fn get_mut(&mut self, id: StreamId) -> Res<&mut SendStream> {
1070         self.0.get_mut(&id).ok_or(Error::InvalidStreamId)
1071     }
1072 
exists(&self, id: StreamId) -> bool1073     pub fn exists(&self, id: StreamId) -> bool {
1074         self.0.contains_key(&id)
1075     }
1076 
insert(&mut self, id: StreamId, stream: SendStream)1077     pub fn insert(&mut self, id: StreamId, stream: SendStream) {
1078         self.0.insert(id, stream);
1079     }
1080 
acked(&mut self, token: &SendStreamRecoveryToken)1081     pub fn acked(&mut self, token: &SendStreamRecoveryToken) {
1082         if let Some(ss) = self.0.get_mut(&token.id) {
1083             ss.mark_as_acked(token.offset, token.length, token.fin);
1084         }
1085     }
1086 
reset_acked(&mut self, id: StreamId)1087     pub fn reset_acked(&mut self, id: StreamId) {
1088         if let Some(ss) = self.0.get_mut(&id) {
1089             ss.reset_acked()
1090         }
1091     }
1092 
lost(&mut self, token: &SendStreamRecoveryToken)1093     pub fn lost(&mut self, token: &SendStreamRecoveryToken) {
1094         if let Some(ss) = self.0.get_mut(&token.id) {
1095             ss.mark_as_lost(token.offset, token.length, token.fin);
1096         }
1097     }
1098 
reset_lost(&mut self, stream_id: StreamId)1099     pub fn reset_lost(&mut self, stream_id: StreamId) {
1100         if let Some(ss) = self.0.get_mut(&stream_id) {
1101             ss.reset_lost();
1102         }
1103     }
1104 
blocked_lost(&mut self, stream_id: StreamId, limit: u64)1105     pub fn blocked_lost(&mut self, stream_id: StreamId, limit: u64) {
1106         if let Some(ss) = self.0.get_mut(&stream_id) {
1107             ss.blocked_lost(limit);
1108         }
1109     }
1110 
clear(&mut self)1111     pub fn clear(&mut self) {
1112         self.0.clear()
1113     }
1114 
clear_terminal(&mut self)1115     pub fn clear_terminal(&mut self) {
1116         self.0.retain(|_, stream| !stream.is_terminal())
1117     }
1118 
write_frames( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )1119     pub(crate) fn write_frames(
1120         &mut self,
1121         priority: TransmissionPriority,
1122         builder: &mut PacketBuilder,
1123         tokens: &mut Vec<RecoveryToken>,
1124         stats: &mut FrameStats,
1125     ) {
1126         qtrace!("write STREAM frames at priority {:?}", priority);
1127         for stream in self.0.values_mut() {
1128             if !stream.write_reset_frame(priority, builder, tokens, stats) {
1129                 stream.write_blocked_frame(priority, builder, tokens, stats);
1130                 stream.write_stream_frame(priority, builder, tokens, stats);
1131             }
1132         }
1133     }
1134 
update_initial_limit(&mut self, remote: &TransportParameters)1135     pub fn update_initial_limit(&mut self, remote: &TransportParameters) {
1136         for (id, ss) in self.0.iter_mut() {
1137             let limit = if id.is_bidi() {
1138                 assert!(!id.is_remote_initiated(Role::Client));
1139                 remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE)
1140             } else {
1141                 remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_UNI)
1142             };
1143             ss.set_max_stream_data(limit);
1144         }
1145     }
1146 }
1147 
impl<'a> IntoIterator for &'a mut SendStreams {
    type Item = (&'a StreamId, &'a mut SendStream);
    type IntoIter = indexmap::map::IterMut<'a, StreamId, SendStream>;

    // Iterate over all streams, mutably, in insertion order.
    fn into_iter(self) -> indexmap::map::IterMut<'a, StreamId, SendStream> {
        self.0.iter_mut()
    }
}
1156 
/// Identifies the span of stream data carried by a single STREAM frame,
/// so an ack or loss of that frame can be applied to the right bytes.
#[derive(Debug, Clone)]
pub struct SendStreamRecoveryToken {
    pub(crate) id: StreamId,
    // Stream offset of the first byte carried by the frame.
    offset: u64,
    // Number of bytes of stream data in the frame.
    length: usize,
    // Whether the frame carried the FIN bit.
    fin: bool,
}
1164 
1165 #[cfg(test)]
1166 mod tests {
1167     use super::*;
1168 
1169     use crate::events::ConnectionEvent;
1170     use neqo_common::{event::Provider, hex_with_len, qtrace};
1171 
connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>>1172     fn connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>> {
1173         Rc::new(RefCell::new(SenderFlowControl::new((), limit)))
1174     }
1175 
1176     #[test]
test_mark_range()1177     fn test_mark_range() {
1178         let mut rt = RangeTracker::default();
1179 
1180         // ranges can go from nothing->Sent if queued for retrans and then
1181         // acks arrive
1182         rt.mark_range(5, 5, RangeState::Acked);
1183         assert_eq!(rt.highest_offset(), 10);
1184         assert_eq!(rt.acked_from_zero(), 0);
1185         rt.mark_range(10, 4, RangeState::Acked);
1186         assert_eq!(rt.highest_offset(), 14);
1187         assert_eq!(rt.acked_from_zero(), 0);
1188 
1189         rt.mark_range(0, 5, RangeState::Sent);
1190         assert_eq!(rt.highest_offset(), 14);
1191         assert_eq!(rt.acked_from_zero(), 0);
1192         rt.mark_range(0, 5, RangeState::Acked);
1193         assert_eq!(rt.highest_offset(), 14);
1194         assert_eq!(rt.acked_from_zero(), 14);
1195 
1196         rt.mark_range(12, 20, RangeState::Acked);
1197         assert_eq!(rt.highest_offset(), 32);
1198         assert_eq!(rt.acked_from_zero(), 32);
1199 
1200         // ack the lot
1201         rt.mark_range(0, 400, RangeState::Acked);
1202         assert_eq!(rt.highest_offset(), 400);
1203         assert_eq!(rt.acked_from_zero(), 400);
1204 
1205         // acked trumps sent
1206         rt.mark_range(0, 200, RangeState::Sent);
1207         assert_eq!(rt.highest_offset(), 400);
1208         assert_eq!(rt.acked_from_zero(), 400);
1209     }
1210 
1211     #[test]
unmark_sent_start()1212     fn unmark_sent_start() {
1213         let mut rt = RangeTracker::default();
1214 
1215         rt.mark_range(0, 5, RangeState::Sent);
1216         assert_eq!(rt.highest_offset(), 5);
1217         assert_eq!(rt.acked_from_zero(), 0);
1218 
1219         rt.unmark_sent();
1220         assert_eq!(rt.highest_offset(), 0);
1221         assert_eq!(rt.acked_from_zero(), 0);
1222         assert_eq!(rt.first_unmarked_range(), (0, None));
1223     }
1224 
1225     #[test]
unmark_sent_middle()1226     fn unmark_sent_middle() {
1227         let mut rt = RangeTracker::default();
1228 
1229         rt.mark_range(0, 5, RangeState::Acked);
1230         assert_eq!(rt.highest_offset(), 5);
1231         assert_eq!(rt.acked_from_zero(), 5);
1232         rt.mark_range(5, 5, RangeState::Sent);
1233         assert_eq!(rt.highest_offset(), 10);
1234         assert_eq!(rt.acked_from_zero(), 5);
1235         rt.mark_range(10, 5, RangeState::Acked);
1236         assert_eq!(rt.highest_offset(), 15);
1237         assert_eq!(rt.acked_from_zero(), 5);
1238         assert_eq!(rt.first_unmarked_range(), (15, None));
1239 
1240         rt.unmark_sent();
1241         assert_eq!(rt.highest_offset(), 15);
1242         assert_eq!(rt.acked_from_zero(), 5);
1243         assert_eq!(rt.first_unmarked_range(), (5, Some(5)));
1244     }
1245 
1246     #[test]
unmark_sent_end()1247     fn unmark_sent_end() {
1248         let mut rt = RangeTracker::default();
1249 
1250         rt.mark_range(0, 5, RangeState::Acked);
1251         assert_eq!(rt.highest_offset(), 5);
1252         assert_eq!(rt.acked_from_zero(), 5);
1253         rt.mark_range(5, 5, RangeState::Sent);
1254         assert_eq!(rt.highest_offset(), 10);
1255         assert_eq!(rt.acked_from_zero(), 5);
1256         assert_eq!(rt.first_unmarked_range(), (10, None));
1257 
1258         rt.unmark_sent();
1259         assert_eq!(rt.highest_offset(), 5);
1260         assert_eq!(rt.acked_from_zero(), 5);
1261         assert_eq!(rt.first_unmarked_range(), (5, None));
1262     }
1263 
1264     #[test]
truncate_front()1265     fn truncate_front() {
1266         let mut v = VecDeque::new();
1267         v.push_back(5);
1268         v.push_back(6);
1269         v.push_back(7);
1270         v.push_front(4usize);
1271 
1272         v.rotate_left(1);
1273         v.truncate(3);
1274         assert_eq!(*v.front().unwrap(), 5);
1275         assert_eq!(*v.back().unwrap(), 7);
1276     }
1277 
1278     #[test]
test_unmark_range()1279     fn test_unmark_range() {
1280         let mut rt = RangeTracker::default();
1281 
1282         rt.mark_range(5, 5, RangeState::Acked);
1283         rt.mark_range(10, 5, RangeState::Sent);
1284 
1285         // Should unmark sent but not acked range
1286         rt.unmark_range(7, 6);
1287 
1288         let res = rt.first_unmarked_range();
1289         assert_eq!(res, (0, Some(5)));
1290         assert_eq!(
1291             rt.used.iter().next().unwrap(),
1292             (&5, &(5, RangeState::Acked))
1293         );
1294         assert_eq!(
1295             rt.used.iter().nth(1).unwrap(),
1296             (&13, &(2, RangeState::Sent))
1297         );
1298         assert!(rt.used.iter().nth(2).is_none());
1299         rt.mark_range(0, 5, RangeState::Sent);
1300 
1301         let res = rt.first_unmarked_range();
1302         assert_eq!(res, (10, Some(3)));
1303         rt.mark_range(10, 3, RangeState::Sent);
1304 
1305         let res = rt.first_unmarked_range();
1306         assert_eq!(res, (15, None));
1307     }
1308 
1309     #[test]
1310     #[allow(clippy::cognitive_complexity)]
tx_buffer_next_bytes_1()1311     fn tx_buffer_next_bytes_1() {
1312         let mut txb = TxBuffer::new();
1313 
1314         assert_eq!(txb.avail(), SEND_BUFFER_SIZE);
1315 
1316         // Fill the buffer
1317         assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
1318         assert!(matches!(txb.next_bytes(),
1319 			 Some((0, x)) if x.len()==SEND_BUFFER_SIZE
1320 			 && x.iter().all(|ch| *ch == 1)));
1321 
1322         // Mark almost all as sent. Get what's left
1323         let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1;
1324         txb.mark_as_sent(0, one_byte_from_end as usize);
1325         assert!(matches!(txb.next_bytes(),
1326 			 Some((start, x)) if x.len() == 1
1327 			 && start == one_byte_from_end
1328 			 && x.iter().all(|ch| *ch == 1)));
1329 
1330         // Mark all as sent. Get nothing
1331         txb.mark_as_sent(0, SEND_BUFFER_SIZE);
1332         assert!(matches!(txb.next_bytes(), None));
1333 
1334         // Mark as lost. Get it again
1335         txb.mark_as_lost(one_byte_from_end, 1);
1336         assert!(matches!(txb.next_bytes(),
1337 			 Some((start, x)) if x.len() == 1
1338 			 && start == one_byte_from_end
1339 			 && x.iter().all(|ch| *ch == 1)));
1340 
1341         // Mark a larger range lost, including beyond what's in the buffer even.
1342         // Get a little more
1343         let five_bytes_from_end = SEND_BUFFER_SIZE as u64 - 5;
1344         txb.mark_as_lost(five_bytes_from_end, 100);
1345         assert!(matches!(txb.next_bytes(),
1346 			 Some((start, x)) if x.len() == 5
1347 			 && start == five_bytes_from_end
1348 			 && x.iter().all(|ch| *ch == 1)));
1349 
1350         // Contig acked range at start means it can be removed from buffer
1351         // Impl of vecdeque should now result in a split buffer when more data
1352         // is sent
1353         txb.mark_as_acked(0, five_bytes_from_end as usize);
1354         assert_eq!(txb.send(&[2; 30]), 30);
1355         // Just get 5 even though there is more
1356         assert!(matches!(txb.next_bytes(),
1357 			 Some((start, x)) if x.len() == 5
1358 			 && start == five_bytes_from_end
1359 			 && x.iter().all(|ch| *ch == 1)));
1360         assert_eq!(txb.retired, five_bytes_from_end);
1361         assert_eq!(txb.buffered(), 35);
1362 
1363         // Marking that bit as sent should let the last contig bit be returned
1364         // when called again
1365         txb.mark_as_sent(five_bytes_from_end, 5);
1366         assert!(matches!(txb.next_bytes(),
1367 			 Some((start, x)) if x.len() == 30
1368 			 && start == SEND_BUFFER_SIZE as u64
1369 			 && x.iter().all(|ch| *ch == 2)));
1370     }
1371 
1372     #[test]
tx_buffer_next_bytes_2()1373     fn tx_buffer_next_bytes_2() {
1374         let mut txb = TxBuffer::new();
1375 
1376         assert_eq!(txb.avail(), SEND_BUFFER_SIZE);
1377 
1378         // Fill the buffer
1379         assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
1380         assert!(matches!(txb.next_bytes(),
1381 			 Some((0, x)) if x.len()==SEND_BUFFER_SIZE
1382 			 && x.iter().all(|ch| *ch == 1)));
1383 
1384         // As above
1385         let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40;
1386 
1387         txb.mark_as_acked(0, forty_bytes_from_end as usize);
1388         assert!(matches!(txb.next_bytes(),
1389                  Some((start, x)) if x.len() == 40
1390                  && start == forty_bytes_from_end
1391         ));
1392 
1393         // Valid new data placed in split locations
1394         assert_eq!(txb.send(&[2; 100]), 100);
1395 
1396         // Mark a little more as sent
1397         txb.mark_as_sent(forty_bytes_from_end, 10);
1398         let thirty_bytes_from_end = forty_bytes_from_end + 10;
1399         assert!(matches!(txb.next_bytes(),
1400 			 Some((start, x)) if x.len() == 30
1401 			 && start == thirty_bytes_from_end
1402 			 && x.iter().all(|ch| *ch == 1)));
1403 
1404         // Mark a range 'A' in second slice as sent. Should still return the same
1405         let range_a_start = SEND_BUFFER_SIZE as u64 + 30;
1406         let range_a_end = range_a_start + 10;
1407         txb.mark_as_sent(range_a_start, 10);
1408         assert!(matches!(txb.next_bytes(),
1409 			 Some((start, x)) if x.len() == 30
1410 			 && start == thirty_bytes_from_end
1411 			 && x.iter().all(|ch| *ch == 1)));
1412 
1413         // Ack entire first slice and into second slice
1414         let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10;
1415         txb.mark_as_acked(0, ten_bytes_past_end as usize);
1416 
1417         // Get up to marked range A
1418         assert!(matches!(txb.next_bytes(),
1419 			 Some((start, x)) if x.len() == 20
1420 			 && start == ten_bytes_past_end
1421 			 && x.iter().all(|ch| *ch == 2)));
1422 
1423         txb.mark_as_sent(ten_bytes_past_end, 20);
1424 
1425         // Get bit after earlier marked range A
1426         assert!(matches!(txb.next_bytes(),
1427 			 Some((start, x)) if x.len() == 60
1428 			 && start == range_a_end
1429 			 && x.iter().all(|ch| *ch == 2)));
1430 
1431         // No more bytes.
1432         txb.mark_as_sent(range_a_end, 60);
1433         assert!(matches!(txb.next_bytes(), None));
1434     }
1435 
1436     #[test]
test_stream_tx()1437     fn test_stream_tx() {
1438         let conn_fc = connection_fc(4096);
1439         let conn_events = ConnectionEvents::default();
1440 
1441         let mut s = SendStream::new(4.into(), 1024, Rc::clone(&conn_fc), conn_events);
1442 
1443         let res = s.send(&[4; 100]).unwrap();
1444         assert_eq!(res, 100);
1445         s.mark_as_sent(0, 50, false);
1446         if let SendStreamState::Send { fc, .. } = s.state() {
1447             assert_eq!(fc.used(), 100);
1448         } else {
1449             panic!("unexpected stream state");
1450         }
1451 
1452         // Should hit stream flow control limit before filling up send buffer
1453         let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
1454         assert_eq!(res, 1024 - 100);
1455 
1456         // should do nothing, max stream data already 1024
1457         s.set_max_stream_data(1024);
1458         let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
1459         assert_eq!(res, 0);
1460 
1461         // should now hit the conn flow control (4096)
1462         s.set_max_stream_data(1_048_576);
1463         let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
1464         assert_eq!(res, 3072);
1465 
1466         // should now hit the tx buffer size
1467         conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64);
1468         let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap();
1469         assert_eq!(res, SEND_BUFFER_SIZE - 4096);
1470 
1471         // TODO(agrover@mozilla.com): test ooo acks somehow
1472         s.mark_as_acked(0, 40, false);
1473     }
1474 
1475     #[test]
test_tx_buffer_acks()1476     fn test_tx_buffer_acks() {
1477         let mut tx = TxBuffer::new();
1478         assert_eq!(tx.send(&[4; 100]), 100);
1479         let res = tx.next_bytes().unwrap();
1480         assert_eq!(res.0, 0);
1481         assert_eq!(res.1.len(), 100);
1482         tx.mark_as_sent(0, 100);
1483         let res = tx.next_bytes();
1484         assert_eq!(res, None);
1485 
1486         tx.mark_as_acked(0, 100);
1487         let res = tx.next_bytes();
1488         assert_eq!(res, None);
1489     }
1490 
1491     #[test]
send_stream_writable_event_gen()1492     fn send_stream_writable_event_gen() {
1493         let conn_fc = connection_fc(2);
1494         let mut conn_events = ConnectionEvents::default();
1495 
1496         let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone());
1497 
1498         // Stream is initially blocked (conn:2, stream:0)
1499         // and will not accept data.
1500         assert_eq!(s.send(b"hi").unwrap(), 0);
1501 
1502         // increasing to (conn:2, stream:2) will allow 2 bytes, and also
1503         // generate a SendStreamWritable event.
1504         s.set_max_stream_data(2);
1505         let evts = conn_events.events().collect::<Vec<_>>();
1506         assert_eq!(evts.len(), 1);
1507         assert!(matches!(
1508             evts[0],
1509             ConnectionEvent::SendStreamWritable { .. }
1510         ));
1511         assert_eq!(s.send(b"hello").unwrap(), 2);
1512 
1513         // increasing to (conn:2, stream:4) will not generate an event or allow
1514         // sending anything.
1515         s.set_max_stream_data(4);
1516         assert_eq!(conn_events.events().count(), 0);
1517         assert_eq!(s.send(b"hello").unwrap(), 0);
1518 
1519         // Increasing conn max (conn:4, stream:4) will unblock but not emit
1520         // event b/c that happens in Connection::emit_frame() (tested in
1521         // connection.rs)
1522         assert!(conn_fc.borrow_mut().update(4));
1523         assert_eq!(conn_events.events().count(), 0);
1524         assert_eq!(s.avail(), 2);
1525         assert_eq!(s.send(b"hello").unwrap(), 2);
1526 
1527         // No event because still blocked by conn
1528         s.set_max_stream_data(1_000_000_000);
1529         assert_eq!(conn_events.events().count(), 0);
1530 
1531         // No event because happens in emit_frame()
1532         conn_fc.borrow_mut().update(1_000_000_000);
1533         assert_eq!(conn_events.events().count(), 0);
1534 
1535         // Unblocking both by a large amount will cause avail() to be limited by
1536         // tx buffer size.
1537         assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4);
1538 
1539         assert_eq!(
1540             s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(),
1541             SEND_BUFFER_SIZE - 4
1542         );
1543 
1544         // No event because still blocked by tx buffer full
1545         s.set_max_stream_data(2_000_000_000);
1546         assert_eq!(conn_events.events().count(), 0);
1547         assert_eq!(s.send(b"hello").unwrap(), 0);
1548     }
1549 
1550     #[test]
send_stream_writable_event_new_stream()1551     fn send_stream_writable_event_new_stream() {
1552         let conn_fc = connection_fc(2);
1553         let mut conn_events = ConnectionEvents::default();
1554 
1555         let _s = SendStream::new(4.into(), 100, conn_fc, conn_events.clone());
1556 
1557         // Creating a new stream with conn and stream credits should result in
1558         // an event.
1559         let evts = conn_events.events().collect::<Vec<_>>();
1560         assert_eq!(evts.len(), 1);
1561         assert!(matches!(
1562             evts[0],
1563             ConnectionEvent::SendStreamWritable { .. }
1564         ));
1565     }
1566 
as_stream_token(t: &RecoveryToken) -> &SendStreamRecoveryToken1567     fn as_stream_token(t: &RecoveryToken) -> &SendStreamRecoveryToken {
1568         if let RecoveryToken::Stream(StreamRecoveryToken::Stream(rt)) = &t {
1569             rt
1570         } else {
1571             panic!();
1572         }
1573     }
1574 
    #[test]
    // Verify lost frames handle fin properly
    fn send_stream_get_frame_data() {
        let conn_fc = connection_fc(100);
        let conn_events = ConnectionEvents::default();

        // Queue 10 bytes and close, so the stream has data plus a fin to send.
        let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events);
        s.send(&[0; 10]).unwrap();
        s.close();

        let mut ss = SendStreams::default();
        ss.insert(StreamId::from(0), s);

        let mut tokens = Vec::new();
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);

        // Write a small frame: no fin.
        // With only 6 bytes of space, just part of the data fits, so this
        // frame cannot carry the fin.
        let written = builder.len();
        builder.set_limit(written + 6);
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 6);
        assert_eq!(tokens.len(), 1);
        let f1_token = tokens.remove(0);
        assert!(!as_stream_token(&f1_token).fin);

        // Write the rest: fin.
        // Ample space: the remaining data and the fin go out together.
        let written = builder.len();
        builder.set_limit(written + 200);
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 10);
        assert_eq!(tokens.len(), 1);
        let f2_token = tokens.remove(0);
        assert!(as_stream_token(&f2_token).fin);

        // Should be no more data to frame.
        let written = builder.len();
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written);
        assert!(tokens.is_empty());

        // Mark frame 1 as lost
        ss.lost(as_stream_token(&f1_token));

        // Next frame should not set fin even though stream has fin but frame
        // does not include end of stream
        let written = builder.len();
        ss.write_frames(
            TransmissionPriority::default() + RetransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 7); // Needs a length this time.
        assert_eq!(tokens.len(), 1);
        let f4_token = tokens.remove(0);
        assert!(!as_stream_token(&f4_token).fin);

        // Mark frame 2 as lost
        ss.lost(as_stream_token(&f2_token));

        // Next frame should set fin because it includes end of stream
        let written = builder.len();
        ss.write_frames(
            TransmissionPriority::default() + RetransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 10);
        assert_eq!(tokens.len(), 1);
        let f5_token = tokens.remove(0);
        assert!(as_stream_token(&f5_token).fin);
    }
1663 
    #[test]
    #[allow(clippy::cognitive_complexity)]
    // Verify lost frames handle fin properly with zero length fin
    fn send_stream_get_frame_zerolength_fin() {
        let conn_fc = connection_fc(100);
        let conn_events = ConnectionEvents::default();

        // Queue 10 bytes, but don't close yet: the fin comes later.
        let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events);
        s.send(&[0; 10]).unwrap();

        let mut ss = SendStreams::default();
        ss.insert(StreamId::from(0), s);

        let mut tokens = Vec::new();
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        // First frame carries all queued data; no fin has been requested yet.
        let f1_token = tokens.remove(0);
        assert_eq!(as_stream_token(&f1_token).offset, 0);
        assert_eq!(as_stream_token(&f1_token).length, 10);
        assert!(!as_stream_token(&f1_token).fin);

        // Should be no more data to frame
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert!(tokens.is_empty());

        // Closing after the data was framed produces a zero-length fin frame.
        ss.get_mut(StreamId::from(0)).unwrap().close();

        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f2_token = tokens.remove(0);
        assert_eq!(as_stream_token(&f2_token).offset, 10);
        assert_eq!(as_stream_token(&f2_token).length, 0);
        assert!(as_stream_token(&f2_token).fin);

        // Mark frame 2 as lost
        ss.lost(as_stream_token(&f2_token));

        // Next frame should set fin
        // (retransmitted as another zero-length frame at the same offset).
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f3_token = tokens.remove(0);
        assert_eq!(as_stream_token(&f3_token).offset, 10);
        assert_eq!(as_stream_token(&f3_token).length, 0);
        assert!(as_stream_token(&f3_token).fin);

        // Mark frame 1 as lost
        ss.lost(as_stream_token(&f1_token));

        // Next frame should set fin and include all data
        // (the retransmission covers the lost range through to the end).
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f4_token = tokens.remove(0);
        assert_eq!(as_stream_token(&f4_token).offset, 0);
        assert_eq!(as_stream_token(&f4_token).length, 10);
        assert!(as_stream_token(&f4_token).fin);
    }
1742 
    #[test]
    // STREAM_DATA_BLOCKED and DATA_BLOCKED should only be queued once all of
    // the available flow-control credit has actually been marked as sent.
    fn data_blocked() {
        let conn_fc = connection_fc(5);
        let conn_events = ConnectionEvents::default();

        let stream_id = StreamId::from(4);
        let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events);

        // Only two bytes can be sent due to the stream limit.
        assert_eq!(s.send(b"abc").unwrap(), 2);
        assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..])));

        // This doesn't report blocking yet.
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_blocked_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream_data_blocked, 0);

        // Blocking is reported after sending the last available credit.
        s.mark_as_sent(0, 2, false);
        s.write_blocked_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream_data_blocked, 1);

        // Now increase the stream limit and test the connection limit.
        s.set_max_stream_data(10);

        // Only 3 more bytes fit: the connection limit of 5 minus the 2 sent.
        assert_eq!(s.send(b"abcd").unwrap(), 3);
        assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..])));
        // DATA_BLOCKED is not sent yet.
        conn_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut tokens, &mut stats);
        assert_eq!(stats.data_blocked, 0);

        // DATA_BLOCKED is queued once bytes using all credit are sent.
        s.mark_as_sent(2, 3, false);
        conn_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut tokens, &mut stats);
        assert_eq!(stats.data_blocked, 1);
    }
1795 
    #[test]
    // An atomic send writes either everything or nothing; a refused atomic
    // write still queues the relevant blocked frame.
    fn data_blocked_atomic() {
        let conn_fc = connection_fc(5);
        let conn_events = ConnectionEvents::default();

        let stream_id = StreamId::from(4);
        let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events);

        // Stream is initially blocked (conn:5, stream:2)
        // and will not accept atomic write of 3 bytes.
        assert_eq!(s.send_atomic(b"abc").unwrap(), 0);

        // Assert that STREAM_DATA_BLOCKED is sent.
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_blocked_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream_data_blocked, 1);

        // Assert that a non-atomic write works (accepting a partial 2 bytes).
        assert_eq!(s.send(b"abc").unwrap(), 2);
        assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..])));
        s.mark_as_sent(0, 2, false);

        // Set limits to (conn:5, stream:10).
        s.set_max_stream_data(10);

        // An atomic write of 4 bytes exceeds the remaining limit of 3.
        assert_eq!(s.send_atomic(b"abcd").unwrap(), 0);

        // Assert that DATA_BLOCKED is sent.
        conn_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut tokens, &mut stats);
        assert_eq!(stats.data_blocked, 1);

        // Check that a non-atomic write works.
        assert_eq!(s.send(b"abcd").unwrap(), 3);
        assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..])));
        s.mark_as_sent(2, 3, false);

        // Increase limits to (conn:15, stream:15).
        s.set_max_stream_data(15);
        conn_fc.borrow_mut().update(15);

        // Check that atomic writing right up to the limit works.
        assert_eq!(s.send_atomic(b"abcdefghij").unwrap(), 10);
    }
1849 
1850     #[test]
ack_fin_first()1851     fn ack_fin_first() {
1852         const MESSAGE: &[u8] = b"hello";
1853         let len_u64 = u64::try_from(MESSAGE.len()).unwrap();
1854 
1855         let conn_fc = connection_fc(len_u64);
1856         let conn_events = ConnectionEvents::default();
1857 
1858         let mut s = SendStream::new(StreamId::new(100), 0, conn_fc, conn_events);
1859         s.set_max_stream_data(len_u64);
1860 
1861         // Send all the data, then the fin.
1862         let _ = s.send(MESSAGE).unwrap();
1863         s.mark_as_sent(0, MESSAGE.len(), false);
1864         s.close();
1865         s.mark_as_sent(len_u64, 0, true);
1866 
1867         // Ack the fin, then the data.
1868         s.mark_as_acked(len_u64, 0, true);
1869         s.mark_as_acked(0, MESSAGE.len(), false);
1870         assert!(s.is_terminal());
1871     }
1872 
1873     #[test]
ack_then_lose_fin()1874     fn ack_then_lose_fin() {
1875         const MESSAGE: &[u8] = b"hello";
1876         let len_u64 = u64::try_from(MESSAGE.len()).unwrap();
1877 
1878         let conn_fc = connection_fc(len_u64);
1879         let conn_events = ConnectionEvents::default();
1880 
1881         let id = StreamId::new(100);
1882         let mut s = SendStream::new(id, 0, conn_fc, conn_events);
1883         s.set_max_stream_data(len_u64);
1884 
1885         // Send all the data, then the fin.
1886         let _ = s.send(MESSAGE).unwrap();
1887         s.mark_as_sent(0, MESSAGE.len(), false);
1888         s.close();
1889         s.mark_as_sent(len_u64, 0, true);
1890 
1891         // Ack the fin, then mark it lost.
1892         s.mark_as_acked(len_u64, 0, true);
1893         s.mark_as_lost(len_u64, 0, true);
1894 
1895         // No frame should be sent here.
1896         let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
1897         let mut tokens = Vec::new();
1898         let mut stats = FrameStats::default();
1899         s.write_stream_frame(
1900             TransmissionPriority::default(),
1901             &mut builder,
1902             &mut tokens,
1903             &mut stats,
1904         );
1905         assert_eq!(stats.stream, 0);
1906     }
1907 
    /// Create a `SendStream` and force it into a state where it believes that
    /// `offset` bytes have already been sent and acknowledged.
    fn stream_with_sent(stream: u64, offset: usize) -> SendStream {
        const MAX_VARINT: u64 = (1 << 62) - 1;

        let conn_fc = connection_fc(MAX_VARINT);
        let mut s = SendStream::new(
            StreamId::from(stream),
            MAX_VARINT,
            conn_fc,
            ConnectionEvents::default(),
        );

        // Hand-build the internal state: a send buffer that has retired
        // `offset` bytes, with that whole range marked as acknowledged.
        let mut send_buf = TxBuffer::new();
        send_buf.retired = u64::try_from(offset).unwrap();
        send_buf.ranges.mark_range(0, offset, RangeState::Acked);
        // Stream-level flow control has already consumed `offset` credit.
        let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT);
        fc.consume(offset);
        // A fresh, effectively unlimited connection-level flow controller is
        // installed here; it shadows the one passed to the constructor above.
        let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT)));
        s.state = SendStreamState::Send {
            fc,
            conn_fc,
            send_buf,
        };
        s
    }
1934 
frame_sent_sid(stream: u64, offset: usize, len: usize, fin: bool, space: usize) -> bool1935     fn frame_sent_sid(stream: u64, offset: usize, len: usize, fin: bool, space: usize) -> bool {
1936         const BUF: &[u8] = &[0x42; 128];
1937 
1938         qtrace!(
1939             "frame_sent stream={} offset={} len={} fin={}, space={}",
1940             stream,
1941             offset,
1942             len,
1943             fin,
1944             space
1945         );
1946 
1947         let mut s = stream_with_sent(stream, offset);
1948 
1949         // Now write out the proscribed data and maybe close.
1950         if len > 0 {
1951             s.send(&BUF[..len]).unwrap();
1952         }
1953         if fin {
1954             s.close();
1955         }
1956 
1957         let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
1958         let header_len = builder.len();
1959         builder.set_limit(header_len + space);
1960 
1961         let mut tokens = Vec::new();
1962         let mut stats = FrameStats::default();
1963         s.write_stream_frame(
1964             TransmissionPriority::default(),
1965             &mut builder,
1966             &mut tokens,
1967             &mut stats,
1968         );
1969         qtrace!("STREAM frame: {}", hex_with_len(&builder[header_len..]));
1970         stats.stream > 0
1971     }
1972 
frame_sent(offset: usize, len: usize, fin: bool, space: usize) -> bool1973     fn frame_sent(offset: usize, len: usize, fin: bool, space: usize) -> bool {
1974         frame_sent_sid(0, offset, len, fin, space)
1975     }
1976 
1977     #[test]
stream_frame_empty()1978     fn stream_frame_empty() {
1979         // Stream frames with empty data and no fin never work.
1980         assert!(!frame_sent(10, 0, false, 2));
1981         assert!(!frame_sent(10, 0, false, 3));
1982         assert!(!frame_sent(10, 0, false, 4));
1983         assert!(!frame_sent(10, 0, false, 5));
1984         assert!(!frame_sent(10, 0, false, 100));
1985 
1986         // Empty data with fin is only a problem if there is no space.
1987         assert!(!frame_sent(0, 0, true, 1));
1988         assert!(frame_sent(0, 0, true, 2));
1989         assert!(!frame_sent(10, 0, true, 2));
1990         assert!(frame_sent(10, 0, true, 3));
1991         assert!(frame_sent(10, 0, true, 4));
1992         assert!(frame_sent(10, 0, true, 5));
1993         assert!(frame_sent(10, 0, true, 100));
1994     }
1995 
1996     #[test]
stream_frame_minimum()1997     fn stream_frame_minimum() {
1998         // Add minimum data
1999         assert!(!frame_sent(10, 1, false, 3));
2000         assert!(!frame_sent(10, 1, true, 3));
2001         assert!(frame_sent(10, 1, false, 4));
2002         assert!(frame_sent(10, 1, true, 4));
2003         assert!(frame_sent(10, 1, false, 5));
2004         assert!(frame_sent(10, 1, true, 5));
2005         assert!(frame_sent(10, 1, false, 100));
2006         assert!(frame_sent(10, 1, true, 100));
2007     }
2008 
2009     #[test]
stream_frame_more()2010     fn stream_frame_more() {
2011         // Try more data
2012         assert!(!frame_sent(10, 100, false, 3));
2013         assert!(!frame_sent(10, 100, true, 3));
2014         assert!(frame_sent(10, 100, false, 4));
2015         assert!(frame_sent(10, 100, true, 4));
2016         assert!(frame_sent(10, 100, false, 5));
2017         assert!(frame_sent(10, 100, true, 5));
2018         assert!(frame_sent(10, 100, false, 100));
2019         assert!(frame_sent(10, 100, true, 100));
2020 
2021         assert!(frame_sent(10, 100, false, 1000));
2022         assert!(frame_sent(10, 100, true, 1000));
2023     }
2024 
    #[test]
    fn stream_frame_big_id() {
        // A stream ID and offset large enough to need the longest (8-byte)
        // varint encoding: 4-byte varints top out at (1 << 30) - 1.
        const BIG: u64 = 1 << 30;
        const BIGSZ: usize = 1 << 30;

        // Zero-length: only a fin frame can be sent, and only once the
        // type, 8-byte stream ID, and 8-byte offset fit (17 bytes).
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 16));
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, true, 16));
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 17));
        assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 17));
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 18));
        assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 18));

        // One byte of data needs one more byte of space.
        assert!(!frame_sent_sid(BIG, BIGSZ, 1, false, 17));
        assert!(!frame_sent_sid(BIG, BIGSZ, 1, true, 17));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 18));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 18));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 19));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 19));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 100));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 100));
    }
2047 
stream_frame_at_boundary(data: &[u8])2048     fn stream_frame_at_boundary(data: &[u8]) {
2049         fn send_with_extra_capacity(data: &[u8], extra: usize, expect_full: bool) -> Vec<u8> {
2050             qtrace!("send_with_extra_capacity {} + {}", data.len(), extra);
2051             let mut s = stream_with_sent(0, 0);
2052             s.send(data).unwrap();
2053             s.close();
2054 
2055             let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
2056             let header_len = builder.len();
2057             // Add 2 for the frame type and stream ID, then add the extra.
2058             builder.set_limit(header_len + data.len() + 2 + extra);
2059             let mut tokens = Vec::new();
2060             let mut stats = FrameStats::default();
2061             s.write_stream_frame(
2062                 TransmissionPriority::default(),
2063                 &mut builder,
2064                 &mut tokens,
2065                 &mut stats,
2066             );
2067             assert_eq!(stats.stream, 1);
2068             assert_eq!(builder.is_full(), expect_full);
2069             Vec::from(Encoder::from(builder)).split_off(header_len)
2070         }
2071 
2072         // The minimum amount of extra space for getting another frame in.
2073         let mut enc = Encoder::new();
2074         enc.encode_varint(u64::try_from(data.len()).unwrap());
2075         let len_buf = Vec::from(enc);
2076         let minimum_extra = len_buf.len() + PacketBuilder::MINIMUM_FRAME_SIZE;
2077 
2078         // For anything short of the minimum extra, the frame should fill the packet.
2079         for i in 0..minimum_extra {
2080             let frame = send_with_extra_capacity(data, i, true);
2081             let (header, body) = frame.split_at(2);
2082             assert_eq!(header, &[0b1001, 0]);
2083             assert_eq!(body, data);
2084         }
2085 
2086         // Once there is space for another packet AND a length field,
2087         // then a length will be added.
2088         let frame = send_with_extra_capacity(data, minimum_extra, false);
2089         let (header, rest) = frame.split_at(2);
2090         assert_eq!(header, &[0b1011, 0]);
2091         let (len, body) = rest.split_at(len_buf.len());
2092         assert_eq!(len, &len_buf);
2093         assert_eq!(body, data);
2094     }
2095 
2096     /// 16383/16384 is an odd boundary in STREAM frame construction.
2097     /// That is the boundary where a length goes from 2 bytes to 4 bytes.
2098     /// Test that we correctly add a length field to the frame; and test
2099     /// that if we don't, then we don't allow other frames to be added.
2100     #[test]
stream_frame_16384()2101     fn stream_frame_16384() {
2102         stream_frame_at_boundary(&[4; 16383]);
2103         stream_frame_at_boundary(&[4; 16384]);
2104     }
2105 
2106     /// 63/64 is the other odd boundary.
2107     #[test]
stream_frame_64()2108     fn stream_frame_64() {
2109         stream_frame_at_boundary(&[2; 63]);
2110         stream_frame_at_boundary(&[2; 64]);
2111     }
2112 }
2113