// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// Buffering data to send until it is acked.

use std::cell::RefCell;
use std::cmp::{max, min, Ordering};
use std::collections::{BTreeMap, VecDeque};
use std::convert::TryFrom;
use std::mem;
use std::ops::Add;
use std::rc::Rc;

use indexmap::IndexMap;
use smallvec::SmallVec;

use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role};

use crate::events::ConnectionEvents;
use crate::fc::SenderFlowControl;
use crate::frame::{Frame, FRAME_TYPE_RESET_STREAM};
use crate::packet::PacketBuilder;
use crate::recovery::RecoveryToken;
use crate::stats::FrameStats;
use crate::stream_id::StreamId;
use crate::tparams::{self, TransportParameters};
use crate::{AppError, Error, Res};

pub const SEND_BUFFER_SIZE: usize = 0x10_0000; // 1 MiB

/// The priority that is assigned to sending data for the stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransmissionPriority {
    /// This stream is more important than the functioning of the connection.
    /// Don't use this priority unless the stream really is that important.
    /// A stream at this priority can starve out other connection functions,
    /// including flow control, which could be very bad.
    Critical,
    /// The stream is very important.  Stream data will be written ahead of
    /// some of the less critical connection functions, like path validation,
    /// connection ID management, and session tickets.
    Important,
    /// High priority streams are important, but not enough to disrupt
    /// connection operation.  They go ahead of session tickets though.
    High,
    /// The default priority.
    Normal,
    /// Low priority streams get sent last.
    Low,
}

impl Default for TransmissionPriority {
    fn default() -> Self {
        Self::Normal
    }
}

impl PartialOrd for TransmissionPriority {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TransmissionPriority {
    fn cmp(&self, other: &Self) -> Ordering {
        if self == other {
            return Ordering::Equal;
        }
        match (self, other) {
            (Self::Critical, _) => Ordering::Greater,
            (_, Self::Critical) => Ordering::Less,
            (Self::Important, _) => Ordering::Greater,
            (_, Self::Important) => Ordering::Less,
            (Self::High, _) => Ordering::Greater,
            (_, Self::High) => Ordering::Less,
            (Self::Normal, _) => Ordering::Greater,
            (_, Self::Normal) => Ordering::Less,
            _ => unreachable!(),
        }
    }
}

impl Add<RetransmissionPriority> for TransmissionPriority {
    type Output = Self;
    fn add(self, rhs: RetransmissionPriority) -> Self::Output {
        match rhs {
            RetransmissionPriority::Fixed(fixed) => fixed,
            RetransmissionPriority::Same => self,
            RetransmissionPriority::Higher => match self {
                Self::Critical => Self::Critical,
                Self::Important | Self::High => Self::Important,
                Self::Normal => Self::High,
                Self::Low => Self::Normal,
            },
            RetransmissionPriority::MuchHigher => match self {
                Self::Critical | Self::Important => Self::Critical,
                Self::High | Self::Normal => Self::Important,
                Self::Low => Self::High,
            },
        }
    }
}
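
// Illustrative note (added commentary, not part of the original code): the
// `Add` impl above bumps a stream by one or two priority levels when data has
// to be retransmitted, for example:
//
//   TransmissionPriority::Normal + RetransmissionPriority::Higher
//       == TransmissionPriority::High
//   TransmissionPriority::Low + RetransmissionPriority::MuchHigher
//       == TransmissionPriority::High
//   TransmissionPriority::Normal + RetransmissionPriority::Fixed(TransmissionPriority::Low)
//       == TransmissionPriority::Low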

/// If data is lost, this determines the priority that applies to retransmissions
/// of that data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetransmissionPriority {
    /// Prioritize retransmission at a fixed priority.
    /// With this, it is possible to prioritize retransmissions lower than transmissions.
    /// Doing that can create a deadlock with flow control which might cause the connection
    /// to stall unless new data stops arriving fast enough that retransmissions can complete.
    Fixed(TransmissionPriority),
    /// Don't increase priority for retransmission.  This is probably not a good idea
    /// as it could mean starving flow control.
    Same,
    /// Increase the priority of retransmissions (the default).
    /// Retransmissions of `Critical` or `Important` aren't elevated at all.
    Higher,
    /// Increase the priority of retransmissions a lot.
    /// This is useful for streams that are particularly exposed to head-of-line blocking.
    MuchHigher,
}

impl Default for RetransmissionPriority {
    fn default() -> Self {
        Self::Higher
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum RangeState {
    Sent,
    Acked,
}

/// Track ranges in the stream as sent or acked. Acked implies sent. Not in a
/// range implies needing-to-be-sent, either initially or as a retransmission.
#[derive(Debug, Default, PartialEq)]
struct RangeTracker {
    // offset, (len, RangeState). Use u64 for len because ranges can exceed 32 bits.
    used: BTreeMap<u64, (u64, RangeState)>,
}

impl RangeTracker {
    fn highest_offset(&self) -> u64 {
        self.used
            .range(..)
            .next_back()
            .map_or(0, |(k, (v, _))| *k + *v)
    }

    fn acked_from_zero(&self) -> u64 {
        self.used
            .get(&0)
            .filter(|(_, state)| *state == RangeState::Acked)
            .map_or(0, |(v, _)| *v)
    }

    /// Find the first unmarked range. If all are contiguous, this will return
    /// (highest_offset(), None).
    fn first_unmarked_range(&self) -> (u64, Option<u64>) {
        let mut prev_end = 0;

        for (cur_off, (cur_len, _)) in &self.used {
            if prev_end == *cur_off {
                prev_end = cur_off + cur_len;
            } else {
                return (prev_end, Some(cur_off - prev_end));
            }
        }
        (prev_end, None)
    }
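
    // Worked example (added for illustration; the values are hypothetical):
    // with `used = {0 => (10, Sent), 20 => (5, Acked)}`, the gap 10..20 is
    // unmarked, so `first_unmarked_range()` returns `(10, Some(10))`.  With no
    // gaps it returns `(highest_offset(), None)`.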

    /// Turn one range into a list of subranges that align with existing
    /// ranges.
    /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked.
    //
    // e.g. given N is new and ABC are existing:
    //             NNNNNNNNNNNNNNNN
    //               AAAAA   BBBCCCCC  ...then we want 5 chunks:
    //             1122222333444555
    //
    // but also if we have this:
    //             NNNNNNNNNNNNNNNN
    //           AAAAAAAAAA      BBBB  ...then break existing A and B ranges up:
    //
    //             1111111122222233
    //           aaAAAAAAAA      BBbb
    //
    // Doing all this work up front should make handling each chunk much
    // easier.
    fn chunk_range_on_edges(
        &mut self,
        new_off: u64,
        new_len: u64,
        new_state: RangeState,
    ) -> Vec<(u64, u64, RangeState)> {
        let mut tmp_off = new_off;
        let mut tmp_len = new_len;
        let mut v = Vec::new();

        // cut previous overlapping range if needed
        let prev = self.used.range_mut(..tmp_off).next_back();
        if let Some((prev_off, (prev_len, prev_state))) = prev {
            let prev_state = *prev_state;
            let overlap = (*prev_off + *prev_len).saturating_sub(new_off);
            *prev_len -= overlap;
            if overlap > 0 {
                self.used.insert(new_off, (overlap, prev_state));
            }
        }

        let mut last_existing_remaining = None;
        for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) {
            // Create chunk for "overhang" before an existing range
            if tmp_off < *off {
                let sub_len = off - tmp_off;
                v.push((tmp_off, sub_len, new_state));
                tmp_off += sub_len;
                tmp_len -= sub_len;
            }

            // Create chunk to match existing range
            let sub_len = min(*len, tmp_len);
            let remaining_len = len - sub_len;
            if new_state == RangeState::Sent && *state == RangeState::Acked {
                qinfo!(
                    "Attempted to downgrade overlapping Acked range {}-{} with Sent range {}-{}",
                    off,
                    len,
                    new_off,
                    new_len
                );
            } else {
                v.push((tmp_off, sub_len, new_state));
            }
            tmp_off += sub_len;
            tmp_len -= sub_len;

            if remaining_len > 0 {
                last_existing_remaining = Some((*off, sub_len, remaining_len, *state));
            }
        }

        // Maybe break last existing range in two so that a final chunk will
        // have the same length as an existing range entry
        if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining {
            *self.used.get_mut(&off).expect("must be there") = (sub_len, state);
            self.used.insert(off + sub_len, (remaining_len, state));
        }

        // Create final chunk if anything remains of the new range
        if tmp_len > 0 {
            v.push((tmp_off, tmp_len, new_state))
        }

        v
    }

    /// Merge contiguous Acked ranges into the first entry (0). This range may
    /// be dropped from the send buffer.
    fn coalesce_acked_from_zero(&mut self) {
        let acked_range_from_zero = self
            .used
            .get_mut(&0)
            .filter(|(_, state)| *state == RangeState::Acked)
            .map(|(len, _)| *len);

        if let Some(len_from_zero) = acked_range_from_zero {
            let mut to_remove = SmallVec::<[_; 8]>::new();

            let mut new_len_from_zero = len_from_zero;

            // See if there's another Acked range entry contiguous to this one
            while let Some((next_len, _)) = self
                .used
                .get(&new_len_from_zero)
                .filter(|(_, state)| *state == RangeState::Acked)
            {
                to_remove.push(new_len_from_zero);
                new_len_from_zero += *next_len;
            }

            if len_from_zero != new_len_from_zero {
                self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero;
            }

            for val in to_remove {
                self.used.remove(&val);
            }
        }
    }

    fn mark_range(&mut self, off: u64, len: usize, state: RangeState) {
        if len == 0 {
            qinfo!("mark 0-length range at {}", off);
            return;
        }

        let subranges = self.chunk_range_on_edges(off, len as u64, state);

        for (sub_off, sub_len, sub_state) in subranges {
            self.used.insert(sub_off, (sub_len, sub_state));
        }

        self.coalesce_acked_from_zero()
    }

    fn unmark_range(&mut self, off: u64, len: usize) {
        if len == 0 {
            qdebug!("unmark 0-length range at {}", off);
            return;
        }

        let len = u64::try_from(len).unwrap();
        let end_off = off + len;

        let mut to_remove = SmallVec::<[_; 8]>::new();
        let mut to_add = None;

        // Walk backwards through possibly affected existing ranges
        for (cur_off, (cur_len, cur_state)) in self.used.range_mut(..off + len).rev() {
            // Maybe fixup range preceding the removed range
            if *cur_off < off {
                // Check for overlap
                if *cur_off + *cur_len > off {
                    if *cur_state == RangeState::Acked {
                        qdebug!(
                            "Attempted to unmark Acked range {}-{} with unmark_range {}-{}",
                            cur_off,
                            cur_len,
                            off,
                            off + len
                        );
                    } else {
                        *cur_len = off - cur_off;
                    }
                }
                break;
            }

            if *cur_state == RangeState::Acked {
                qdebug!(
                    "Attempted to unmark Acked range {}-{} with unmark_range {}-{}",
                    cur_off,
                    cur_len,
                    off,
                    off + len
                );
                continue;
            }

            // Add a new range for old subrange extending beyond
            // to-be-unmarked range
            let cur_end_off = cur_off + *cur_len;
            if cur_end_off > end_off {
                let new_cur_off = off + len;
                let new_cur_len = cur_end_off - end_off;
                assert_eq!(to_add, None);
                to_add = Some((new_cur_off, new_cur_len, *cur_state));
            }

            to_remove.push(*cur_off);
        }

        for remove_off in to_remove {
            self.used.remove(&remove_off);
        }

        if let Some((new_cur_off, new_cur_len, cur_state)) = to_add {
            self.used.insert(new_cur_off, (new_cur_len, cur_state));
        }
    }

    /// Unmark all sent ranges.
    pub fn unmark_sent(&mut self) {
        self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap());
    }
}

/// Buffer to contain queued bytes and track their state.
#[derive(Debug, Default, PartialEq)]
pub struct TxBuffer {
    retired: u64,           // contig acked bytes, no longer in buffer
    send_buf: VecDeque<u8>, // buffer of not-acked bytes
    ranges: RangeTracker,   // ranges in buffer that have been sent or acked
}

impl TxBuffer {
    pub fn new() -> Self {
        Self::default()
    }

    /// Attempt to add some or all of the passed-in buffer to the TxBuffer.
    pub fn send(&mut self, buf: &[u8]) -> usize {
        let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len());
        if can_buffer > 0 {
            self.send_buf.extend(&buf[..can_buffer]);
            assert!(self.send_buf.len() <= SEND_BUFFER_SIZE);
        }
        can_buffer
    }

    pub fn next_bytes(&self) -> Option<(u64, &[u8])> {
        let (start, maybe_len) = self.ranges.first_unmarked_range();

        if start == self.retired + u64::try_from(self.buffered()).unwrap() {
            return None;
        }

        // Convert from ranges-relative-to-zero to
        // ranges-relative-to-buffer-start
        let buff_off = usize::try_from(start - self.retired).unwrap();

        // Deque returns two slices. Create a subslice from whichever
        // one contains the first unmarked data.
        let slc = if buff_off < self.send_buf.as_slices().0.len() {
            &self.send_buf.as_slices().0[buff_off..]
        } else {
            &self.send_buf.as_slices().1[buff_off - self.send_buf.as_slices().0.len()..]
        };

        let len = if let Some(range_len) = maybe_len {
            // Truncate if range crosses deque slices
            min(usize::try_from(range_len).unwrap(), slc.len())
        } else {
            slc.len()
        };

        debug_assert!(len > 0);
        debug_assert!(len <= slc.len());

        Some((start, &slc[..len]))
    }

    pub fn mark_as_sent(&mut self, offset: u64, len: usize) {
        self.ranges.mark_range(offset, len, RangeState::Sent)
    }

    pub fn mark_as_acked(&mut self, offset: u64, len: usize) {
        self.ranges.mark_range(offset, len, RangeState::Acked);

        // We can drop contig acked range from the buffer
        let new_retirable = self.ranges.acked_from_zero() - self.retired;
        debug_assert!(new_retirable <= self.buffered() as u64);
        let keep_len =
            self.buffered() - usize::try_from(new_retirable).expect("should fit in usize");

        // Truncate front
        self.send_buf.rotate_left(self.buffered() - keep_len);
        self.send_buf.truncate(keep_len);

        self.retired += new_retirable;
    }

    pub fn mark_as_lost(&mut self, offset: u64, len: usize) {
        self.ranges.unmark_range(offset, len)
    }

    /// Forget about anything that was marked as sent.
    pub fn unmark_sent(&mut self) {
        self.ranges.unmark_sent();
    }

    fn buffered(&self) -> usize {
        self.send_buf.len()
    }

    fn avail(&self) -> usize {
        SEND_BUFFER_SIZE - self.buffered()
    }

    fn used(&self) -> u64 {
        self.retired + u64::try_from(self.buffered()).unwrap()
    }
}
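
// A minimal usage sketch of `TxBuffer` (added for illustration only; the unit
// tests at the bottom of this file are the authoritative examples).  Data moves
// through three steps: buffer it, mark it sent, then mark it acked so the
// contiguous prefix can be retired and dropped from the buffer:
//
//   let mut txb = TxBuffer::new();
//   let n = txb.send(b"hello");                  // n == 5, bytes are buffered
//   let (off, data) = txb.next_bytes().unwrap(); // (0, b"hello")
//   txb.mark_as_sent(off, data.len());           // next_bytes() now returns None
//   txb.mark_as_acked(0, 5);                     // retired == 5, buffer is empty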

/// QUIC sending stream states, based on -transport 3.1.
#[derive(Debug)]
pub(crate) enum SendStreamState {
    Ready {
        fc: SenderFlowControl<StreamId>,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
    },
    Send {
        fc: SenderFlowControl<StreamId>,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
        send_buf: TxBuffer,
    },
    // Note: `DataSent` is entered when the stream is closed, not when all data has been
    // sent for the first time.
    DataSent {
        send_buf: TxBuffer,
        fin_sent: bool,
        fin_acked: bool,
    },
    DataRecvd,
    ResetSent {
        err: AppError,
        final_size: u64,
        priority: Option<TransmissionPriority>,
    },
    ResetRecvd,
}
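
// Added summary (not part of the original code): the usual state progressions
// implemented below are
//
//   Ready -> Send -> DataSent -> DataRecvd           (close(), then all data and fin acked)
//   Ready | Send | DataSent -> ResetSent -> ResetRecvd   (reset(), then the reset is acked)
//
// `Ready` moves to `Send` on the first send() and directly to `DataSent` if the
// stream is closed before any data is written.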

impl SendStreamState {
    fn tx_buf_mut(&mut self) -> Option<&mut TxBuffer> {
        match self {
            Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => Some(send_buf),
            Self::Ready { .. }
            | Self::DataRecvd { .. }
            | Self::ResetSent { .. }
            | Self::ResetRecvd => None,
        }
    }

    fn tx_avail(&self) -> usize {
        match self {
            // In Ready, TxBuffer not yet allocated but size is known
            Self::Ready { .. } => SEND_BUFFER_SIZE,
            Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => send_buf.avail(),
            Self::DataRecvd { .. } | Self::ResetSent { .. } | Self::ResetRecvd => 0,
        }
    }

    fn name(&self) -> &str {
        match self {
            Self::Ready { .. } => "Ready",
            Self::Send { .. } => "Send",
            Self::DataSent { .. } => "DataSent",
            Self::DataRecvd { .. } => "DataRecvd",
            Self::ResetSent { .. } => "ResetSent",
            Self::ResetRecvd => "ResetRecvd",
        }
    }

    fn transition(&mut self, new_state: Self) {
        qtrace!("SendStream state {} -> {}", self.name(), new_state.name());
        *self = new_state;
    }
}

/// Implement a QUIC send stream.
#[derive(Debug)]
pub struct SendStream {
    stream_id: StreamId,
    state: SendStreamState,
    conn_events: ConnectionEvents,
    priority: TransmissionPriority,
    retransmission_priority: RetransmissionPriority,
    retransmission_offset: u64,
}

impl SendStream {
    pub fn new(
        stream_id: StreamId,
        max_stream_data: u64,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
        conn_events: ConnectionEvents,
    ) -> Self {
        let ss = Self {
            stream_id,
            state: SendStreamState::Ready {
                fc: SenderFlowControl::new(stream_id, max_stream_data),
                conn_fc,
            },
            conn_events,
            priority: TransmissionPriority::default(),
            retransmission_priority: RetransmissionPriority::default(),
            retransmission_offset: 0,
        };
        if ss.avail() > 0 {
            ss.conn_events.send_stream_writable(stream_id);
        }
        ss
    }

    pub fn set_priority(
        &mut self,
        transmission: TransmissionPriority,
        retransmission: RetransmissionPriority,
    ) {
        self.priority = transmission;
        self.retransmission_priority = retransmission;
    }

    /// The final size of the stream, if it is known: the total amount of data
    /// that was buffered or sent. `None` unless the stream has been closed or reset.
    pub fn final_size(&self) -> Option<u64> {
        match &self.state {
            SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()),
            SendStreamState::ResetSent { final_size, .. } => Some(*final_size),
            _ => None,
        }
    }

    /// Return the next range to be sent, if any.
    /// If this is a retransmission, cut off what is sent at the retransmission
    /// offset.
    fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> {
        match self.state {
            SendStreamState::Send { ref send_buf, .. } => {
                send_buf.next_bytes().and_then(|(offset, slice)| {
                    if retransmission_only {
                        qtrace!(
                            [self],
                            "next_bytes apply retransmission limit at {}",
                            self.retransmission_offset
                        );
                        if self.retransmission_offset > offset {
                            let len = min(
                                usize::try_from(self.retransmission_offset - offset).unwrap(),
                                slice.len(),
                            );
                            Some((offset, &slice[..len]))
                        } else {
                            None
                        }
                    } else {
                        Some((offset, slice))
                    }
                })
            }
            SendStreamState::DataSent {
                ref send_buf,
                fin_sent,
                ..
            } => {
                let bytes = send_buf.next_bytes();
                if bytes.is_some() {
                    bytes
                } else if fin_sent {
                    None
                } else {
                    // Send empty stream frame with fin set
                    Some((send_buf.used(), &[]))
                }
            }
            SendStreamState::Ready { .. }
            | SendStreamState::DataRecvd { .. }
            | SendStreamState::ResetSent { .. }
            | SendStreamState::ResetRecvd => None,
        }
    }

    /// Calculate how many bytes (length) can fit into available space and whether
    /// the remainder of the space can be filled (or if a length field is needed).
    fn length_and_fill(data_len: usize, space: usize) -> (usize, bool) {
        if data_len >= space {
            // Either more data than space allows, or an exact fit.
            qtrace!("SendStream::length_and_fill fill {}", space);
            return (space, true);
        }

        // Estimate size of the length field based on the available space,
        // less 1, which is the worst case.
        let length = min(space.saturating_sub(1), data_len);
        let length_len = Encoder::varint_len(u64::try_from(length).unwrap());
        if length_len > space {
            qtrace!(
                "SendStream::length_and_fill no room for length of {} in {}",
                length,
                space
            );
            return (0, false);
        }

        let length = min(data_len, space - length_len);
        qtrace!("SendStream::length_and_fill {} in {}", length, space);
        (length, false)
    }
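
    // Worked example (added for illustration): with 10 bytes of data and 100
    // bytes of space, a 1-byte length field fits, so the result is (10, false)
    // and an explicit length is written.  With 200 bytes of data and 100 bytes
    // of space, the frame fills the rest of the packet and the result is
    // (100, true), so no length field is needed.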

    /// Maybe write a `STREAM` frame.
    fn write_stream_frame(
        &mut self,
        priority: TransmissionPriority,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        let retransmission = if priority == self.priority {
            false
        } else if priority == self.priority + self.retransmission_priority {
            true
        } else {
            return;
        };

        let id = self.stream_id;
        let final_size = self.final_size();
        if let Some((offset, data)) = self.next_bytes(retransmission) {
            let overhead = 1 // Frame type
                + Encoder::varint_len(id.as_u64())
                + if offset > 0 {
                    Encoder::varint_len(offset)
                } else {
                    0
                };
            if overhead > builder.remaining() {
                qtrace!([self], "write_frame no space for header");
                return;
            }

            let (length, fill) = Self::length_and_fill(data.len(), builder.remaining() - overhead);
            let fin = final_size.map_or(false, |fs| fs == offset + u64::try_from(length).unwrap());
            if length == 0 && !fin {
                qtrace!([self], "write_frame no data, no fin");
                return;
            }

            // Write the stream out.
            builder.encode_varint(Frame::stream_type(fin, offset > 0, fill));
            builder.encode_varint(id.as_u64());
            if offset > 0 {
                builder.encode_varint(offset);
            }
            if fill {
                builder.encode(&data[..length]);
            } else {
                builder.encode_vvec(&data[..length]);
            }
            debug_assert!(builder.len() <= builder.limit());

            self.mark_as_sent(offset, length, fin);
            tokens.push(RecoveryToken::Stream(StreamRecoveryToken {
                id,
                offset,
                length,
                fin,
            }));
            stats.stream += 1;
        }
    }
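
    // Added sketch of the wire layout that write_stream_frame() produces
    // (a STREAM frame as defined in RFC 9000, Section 19.8):
    //
    //   STREAM frame {
    //     Type (varint),       // OFF/LEN/FIN bits chosen by Frame::stream_type()
    //     Stream ID (varint),
    //     [Offset (varint)],   // only written when offset > 0
    //     [Length (varint)],   // omitted when the frame fills the packet
    //     Stream Data (..),
    //   }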

    pub fn reset_acked(&mut self) {
        match self.state {
            SendStreamState::Ready { .. }
            | SendStreamState::Send { .. }
            | SendStreamState::DataSent { .. }
            | SendStreamState::DataRecvd { .. } => {
                qtrace!([self], "Reset acked while in {} state?", self.state.name())
            }
            SendStreamState::ResetSent { .. } => self.state.transition(SendStreamState::ResetRecvd),
            SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
        };
    }

    pub fn reset_lost(&mut self) {
        match self.state {
            SendStreamState::ResetSent {
                ref mut priority, ..
            } => {
                *priority = Some(self.priority + self.retransmission_priority);
            }
            SendStreamState::ResetRecvd => (),
            _ => unreachable!(),
        }
    }

    /// Maybe write a `RESET_STREAM` frame.
    pub fn write_reset_frame(
        &mut self,
        p: TransmissionPriority,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) -> bool {
        if let SendStreamState::ResetSent {
            final_size,
            err,
            ref mut priority,
        } = self.state
        {
            if *priority != Some(p) {
                return false;
            }
            if builder.write_varint_frame(&[
                FRAME_TYPE_RESET_STREAM,
                self.stream_id.as_u64(),
                err,
                final_size,
            ]) {
                tokens.push(RecoveryToken::ResetStream {
                    stream_id: self.stream_id,
                });
                stats.reset_stream += 1;
                *priority = None;
                true
            } else {
                false
            }
        } else {
            false
        }
    }

    pub fn blocked_lost(&mut self, limit: u64) {
        if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
            &mut self.state
        {
            fc.frame_lost(limit);
        } else {
            qtrace!([self], "Ignoring lost STREAM_DATA_BLOCKED({})", limit);
        }
    }

    /// Maybe write a `STREAM_DATA_BLOCKED` frame.
    pub fn write_blocked_frame(
        &mut self,
        priority: TransmissionPriority,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        // Only send STREAM_DATA_BLOCKED during the pass at this stream's
        // regular (non-retransmission) priority.
        if priority == self.priority {
            if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
                &mut self.state
            {
                fc.write_frames(builder, tokens, stats);
            }
        }
    }

    pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) {
        if let Some(buf) = self.state.tx_buf_mut() {
            buf.mark_as_sent(offset, len);
            self.send_blocked_if_space_needed(0);
        };

        if fin {
            if let SendStreamState::DataSent { fin_sent, .. } = &mut self.state {
                *fin_sent = true;
            }
        }
    }

    pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) {
        match self.state {
            SendStreamState::Send {
                ref mut send_buf, ..
            } => {
                send_buf.mark_as_acked(offset, len);
                if self.avail() > 0 {
                    self.conn_events.send_stream_writable(self.stream_id)
                }
            }
            SendStreamState::DataSent {
                ref mut send_buf,
                ref mut fin_acked,
                ..
            } => {
                send_buf.mark_as_acked(offset, len);
                if fin {
                    *fin_acked = true;
                }
                if *fin_acked && send_buf.buffered() == 0 {
                    self.conn_events.send_stream_complete(self.stream_id);
                    self.state.transition(SendStreamState::DataRecvd);
                }
            }
            _ => qtrace!(
                [self],
                "mark_as_acked called from state {}",
                self.state.name()
            ),
        }
    }

    pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) {
        self.retransmission_offset = max(
            self.retransmission_offset,
            offset + u64::try_from(len).unwrap(),
        );
        qtrace!(
            [self],
            "mark_as_lost retransmission offset={}",
            self.retransmission_offset
        );
        if let Some(buf) = self.state.tx_buf_mut() {
            buf.mark_as_lost(offset, len);
        }

        if fin {
            if let SendStreamState::DataSent {
                fin_sent,
                fin_acked,
                ..
            } = &mut self.state
            {
                *fin_sent = *fin_acked;
            }
        }
    }

    /// Bytes sendable on stream. Constrained by stream credit available,
    /// connection credit available, and space in the tx buffer.
    pub fn avail(&self) -> usize {
        if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
            &self.state
        {
            min(
                min(fc.available(), conn_fc.borrow().available()),
                self.state.tx_avail(),
            )
        } else {
            0
        }
    }
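
    // Illustrative example (added; the numbers are hypothetical): with 1000
    // bytes of stream credit, 500 bytes of connection credit, and an empty
    // send buffer, avail() returns min(min(1000, 500), SEND_BUFFER_SIZE) == 500.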
912 
set_max_stream_data(&mut self, limit: u64)913     pub fn set_max_stream_data(&mut self, limit: u64) {
914         if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
915             &mut self.state
916         {
917             let stream_was_blocked = fc.available() == 0;
918             fc.update(limit);
919             if stream_was_blocked && self.avail() > 0 {
920                 self.conn_events.send_stream_writable(self.stream_id)
921             }
922         }
923     }
924 
is_terminal(&self) -> bool925     pub fn is_terminal(&self) -> bool {
926         matches!(
927             self.state,
928             SendStreamState::DataRecvd { .. } | SendStreamState::ResetRecvd
929         )
930     }
931 
send(&mut self, buf: &[u8]) -> Res<usize>932     pub fn send(&mut self, buf: &[u8]) -> Res<usize> {
933         self.send_internal(buf, false)
934     }
935 
send_atomic(&mut self, buf: &[u8]) -> Res<usize>936     pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> {
937         self.send_internal(buf, true)
938     }
939 
send_blocked_if_space_needed(&mut self, needed_space: usize)940     fn send_blocked_if_space_needed(&mut self, needed_space: usize) {
941         if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
942             &mut self.state
943         {
944             if fc.available() <= needed_space {
945                 fc.blocked();
946             }
947 
948             if conn_fc.borrow().available() <= needed_space {
949                 conn_fc.borrow_mut().blocked();
950             }
951         }
952     }
953 
send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize>954     fn send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize> {
955         if buf.is_empty() {
956             qerror!([self], "zero-length send on stream");
957             return Err(Error::InvalidInput);
958         }
959 
960         if let SendStreamState::Ready { fc, conn_fc } = &mut self.state {
961             let owned_fc = mem::replace(fc, SenderFlowControl::new(self.stream_id, 0));
962             let owned_conn_fc = Rc::clone(conn_fc);
963             self.state.transition(SendStreamState::Send {
964                 fc: owned_fc,
965                 conn_fc: owned_conn_fc,
966                 send_buf: TxBuffer::new(),
967             });
968         }
969 
970         if !matches!(self.state, SendStreamState::Send { .. }) {
971             return Err(Error::FinalSizeError);
972         }
973 
974         let buf = if buf.is_empty() || (self.avail() == 0) {
975             return Ok(0);
976         } else if self.avail() < buf.len() {
977             if atomic {
978                 self.send_blocked_if_space_needed(buf.len());
979                 return Ok(0);
980             } else {
981                 &buf[..self.avail()]
982             }
983         } else {
984             buf
985         };
986 
987         match &mut self.state {
988             SendStreamState::Ready { .. } => unreachable!(),
989             SendStreamState::Send {
990                 fc,
991                 conn_fc,
992                 send_buf,
993             } => {
994                 let sent = send_buf.send(buf);
995                 fc.consume(sent);
996                 conn_fc.borrow_mut().consume(sent);
997                 Ok(sent)
998             }
999             _ => Err(Error::FinalSizeError),
1000         }
1001     }
1002 
close(&mut self)1003     pub fn close(&mut self) {
1004         match &mut self.state {
1005             SendStreamState::Ready { .. } => {
1006                 self.state.transition(SendStreamState::DataSent {
1007                     send_buf: TxBuffer::new(),
1008                     fin_sent: false,
1009                     fin_acked: false,
1010                 });
1011             }
1012             SendStreamState::Send { send_buf, .. } => {
1013                 let owned_buf = mem::replace(send_buf, TxBuffer::new());
1014                 self.state.transition(SendStreamState::DataSent {
1015                     send_buf: owned_buf,
1016                     fin_sent: false,
1017                     fin_acked: false,
1018                 });
1019             }
1020             SendStreamState::DataSent { .. } => qtrace!([self], "already in DataSent state"),
1021             SendStreamState::DataRecvd { .. } => qtrace!([self], "already in DataRecvd state"),
1022             SendStreamState::ResetSent { .. } => qtrace!([self], "already in ResetSent state"),
1023             SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
1024         }
1025     }
1026 
reset(&mut self, err: AppError)1027     pub fn reset(&mut self, err: AppError) {
1028         match &self.state {
1029             SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } => {
1030                 let final_size = fc.used();
1031                 self.state.transition(SendStreamState::ResetSent {
1032                     err,
1033                     final_size,
1034                     priority: Some(self.priority),
1035                 });
1036             }
1037             SendStreamState::DataSent { send_buf, .. } => {
1038                 let final_size = send_buf.used();
1039                 self.state.transition(SendStreamState::ResetSent {
1040                     err,
1041                     final_size,
1042                     priority: Some(self.priority),
1043                 });
1044             }
1045             SendStreamState::DataRecvd { .. } => qtrace!([self], "already in DataRecvd state"),
1046             SendStreamState::ResetSent { .. } => qtrace!([self], "already in ResetSent state"),
1047             SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
1048         };
1049     }
1050 
1051     #[cfg(test)]
state(&mut self) -> &mut SendStreamState1052     pub(crate) fn state(&mut self) -> &mut SendStreamState {
1053         &mut self.state
1054     }
1055 }
1056 
1057 impl ::std::fmt::Display for SendStream {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result1058     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
1059         write!(f, "SendStream {}", self.stream_id)
1060     }
1061 }
1062 
1063 #[derive(Debug, Default)]
1064 pub(crate) struct SendStreams(IndexMap<StreamId, SendStream>);
1065 
1066 impl SendStreams {
get(&self, id: StreamId) -> Res<&SendStream>1067     pub fn get(&self, id: StreamId) -> Res<&SendStream> {
1068         self.0.get(&id).ok_or(Error::InvalidStreamId)
1069     }
1070 
get_mut(&mut self, id: StreamId) -> Res<&mut SendStream>1071     pub fn get_mut(&mut self, id: StreamId) -> Res<&mut SendStream> {
1072         self.0.get_mut(&id).ok_or(Error::InvalidStreamId)
1073     }
1074 
exists(&self, id: StreamId) -> bool1075     pub fn exists(&self, id: StreamId) -> bool {
1076         self.0.contains_key(&id)
1077     }
1078 
insert(&mut self, id: StreamId, stream: SendStream)1079     pub fn insert(&mut self, id: StreamId, stream: SendStream) {
1080         self.0.insert(id, stream);
1081     }
1082 
acked(&mut self, token: &StreamRecoveryToken)1083     pub fn acked(&mut self, token: &StreamRecoveryToken) {
1084         if let Some(ss) = self.0.get_mut(&token.id) {
1085             ss.mark_as_acked(token.offset, token.length, token.fin);
1086         }
1087     }
1088 
reset_acked(&mut self, id: StreamId)1089     pub fn reset_acked(&mut self, id: StreamId) {
1090         if let Some(ss) = self.0.get_mut(&id) {
1091             ss.reset_acked()
1092         }
1093     }
1094 
lost(&mut self, token: &StreamRecoveryToken)1095     pub fn lost(&mut self, token: &StreamRecoveryToken) {
1096         if let Some(ss) = self.0.get_mut(&token.id) {
1097             ss.mark_as_lost(token.offset, token.length, token.fin);
1098         }
1099     }
1100 
reset_lost(&mut self, stream_id: StreamId)1101     pub fn reset_lost(&mut self, stream_id: StreamId) {
1102         if let Some(ss) = self.0.get_mut(&stream_id) {
1103             ss.reset_lost();
1104         }
1105     }
1106 
blocked_lost(&mut self, stream_id: StreamId, limit: u64)1107     pub fn blocked_lost(&mut self, stream_id: StreamId, limit: u64) {
1108         if let Some(ss) = self.0.get_mut(&stream_id) {
1109             ss.blocked_lost(limit);
1110         }
1111     }
1112 
clear(&mut self)1113     pub fn clear(&mut self) {
1114         self.0.clear()
1115     }
1116 
clear_terminal(&mut self)1117     pub fn clear_terminal(&mut self) {
1118         self.0.retain(|_, stream| !stream.is_terminal())
1119     }
1120 
write_frames( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )1121     pub(crate) fn write_frames(
1122         &mut self,
1123         priority: TransmissionPriority,
1124         builder: &mut PacketBuilder,
1125         tokens: &mut Vec<RecoveryToken>,
1126         stats: &mut FrameStats,
1127     ) {
1128         qtrace!("write STREAM frames at priority {:?}", priority);
1129         for stream in self.0.values_mut() {
1130             if !stream.write_reset_frame(priority, builder, tokens, stats) {
1131                 stream.write_blocked_frame(priority, builder, tokens, stats);
1132                 stream.write_stream_frame(priority, builder, tokens, stats);
1133             }
1134         }
1135     }
1136 
update_initial_limit(&mut self, remote: &TransportParameters)1137     pub fn update_initial_limit(&mut self, remote: &TransportParameters) {
1138         for (id, ss) in self.0.iter_mut() {
1139             let limit = if id.is_bidi() {
1140                 assert!(!id.is_remote_initiated(Role::Client));
1141                 remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE)
1142             } else {
1143                 remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_UNI)
1144             };
1145             ss.set_max_stream_data(limit);
1146         }
1147     }
1148 }
1149 
1150 impl<'a> IntoIterator for &'a mut SendStreams {
1151     type Item = (&'a StreamId, &'a mut SendStream);
1152     type IntoIter = indexmap::map::IterMut<'a, StreamId, SendStream>;
1153 
into_iter(self) -> indexmap::map::IterMut<'a, StreamId, SendStream>1154     fn into_iter(self) -> indexmap::map::IterMut<'a, StreamId, SendStream> {
1155         self.0.iter_mut()
1156     }
1157 }
1158 
1159 #[derive(Debug, Clone)]
1160 pub struct StreamRecoveryToken {
1161     pub(crate) id: StreamId,
1162     offset: u64,
1163     length: usize,
1164     fin: bool,
1165 }
1166 
1167 #[cfg(test)]
1168 mod tests {
1169     use super::*;
1170 
1171     use crate::events::ConnectionEvent;
1172     use neqo_common::{event::Provider, hex_with_len, qtrace};
1173 
connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>>1174     fn connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>> {
1175         Rc::new(RefCell::new(SenderFlowControl::new((), limit)))
1176     }
1177 
1178     #[test]
test_mark_range()1179     fn test_mark_range() {
1180         let mut rt = RangeTracker::default();
1181 
1182         // ranges can go from nothing->Sent if queued for retrans and then
1183         // acks arrive
1184         rt.mark_range(5, 5, RangeState::Acked);
1185         assert_eq!(rt.highest_offset(), 10);
1186         assert_eq!(rt.acked_from_zero(), 0);
1187         rt.mark_range(10, 4, RangeState::Acked);
1188         assert_eq!(rt.highest_offset(), 14);
1189         assert_eq!(rt.acked_from_zero(), 0);
1190 
1191         rt.mark_range(0, 5, RangeState::Sent);
1192         assert_eq!(rt.highest_offset(), 14);
1193         assert_eq!(rt.acked_from_zero(), 0);
1194         rt.mark_range(0, 5, RangeState::Acked);
1195         assert_eq!(rt.highest_offset(), 14);
1196         assert_eq!(rt.acked_from_zero(), 14);
1197 
1198         rt.mark_range(12, 20, RangeState::Acked);
1199         assert_eq!(rt.highest_offset(), 32);
1200         assert_eq!(rt.acked_from_zero(), 32);
1201 
1202         // ack the lot
1203         rt.mark_range(0, 400, RangeState::Acked);
1204         assert_eq!(rt.highest_offset(), 400);
1205         assert_eq!(rt.acked_from_zero(), 400);
1206 
1207         // acked trumps sent
1208         rt.mark_range(0, 200, RangeState::Sent);
1209         assert_eq!(rt.highest_offset(), 400);
1210         assert_eq!(rt.acked_from_zero(), 400);
1211     }
1212 
1213     #[test]
unmark_sent_start()1214     fn unmark_sent_start() {
1215         let mut rt = RangeTracker::default();
1216 
1217         rt.mark_range(0, 5, RangeState::Sent);
1218         assert_eq!(rt.highest_offset(), 5);
1219         assert_eq!(rt.acked_from_zero(), 0);
1220 
1221         rt.unmark_sent();
1222         assert_eq!(rt.highest_offset(), 0);
1223         assert_eq!(rt.acked_from_zero(), 0);
1224         assert_eq!(rt.first_unmarked_range(), (0, None));
1225     }
1226 
1227     #[test]
unmark_sent_middle()1228     fn unmark_sent_middle() {
1229         let mut rt = RangeTracker::default();
1230 
1231         rt.mark_range(0, 5, RangeState::Acked);
1232         assert_eq!(rt.highest_offset(), 5);
1233         assert_eq!(rt.acked_from_zero(), 5);
1234         rt.mark_range(5, 5, RangeState::Sent);
1235         assert_eq!(rt.highest_offset(), 10);
1236         assert_eq!(rt.acked_from_zero(), 5);
1237         rt.mark_range(10, 5, RangeState::Acked);
1238         assert_eq!(rt.highest_offset(), 15);
1239         assert_eq!(rt.acked_from_zero(), 5);
1240         assert_eq!(rt.first_unmarked_range(), (15, None));
1241 
1242         rt.unmark_sent();
1243         assert_eq!(rt.highest_offset(), 15);
1244         assert_eq!(rt.acked_from_zero(), 5);
1245         assert_eq!(rt.first_unmarked_range(), (5, Some(5)));
1246     }
1247 
1248     #[test]
unmark_sent_end()1249     fn unmark_sent_end() {
1250         let mut rt = RangeTracker::default();
1251 
1252         rt.mark_range(0, 5, RangeState::Acked);
1253         assert_eq!(rt.highest_offset(), 5);
1254         assert_eq!(rt.acked_from_zero(), 5);
1255         rt.mark_range(5, 5, RangeState::Sent);
1256         assert_eq!(rt.highest_offset(), 10);
1257         assert_eq!(rt.acked_from_zero(), 5);
1258         assert_eq!(rt.first_unmarked_range(), (10, None));
1259 
1260         rt.unmark_sent();
1261         assert_eq!(rt.highest_offset(), 5);
1262         assert_eq!(rt.acked_from_zero(), 5);
1263         assert_eq!(rt.first_unmarked_range(), (5, None));
1264     }
1265 
1266     #[test]
truncate_front()1267     fn truncate_front() {
1268         let mut v = VecDeque::new();
1269         v.push_back(5);
1270         v.push_back(6);
1271         v.push_back(7);
1272         v.push_front(4usize);
1273 
1274         v.rotate_left(1);
1275         v.truncate(3);
1276         assert_eq!(*v.front().unwrap(), 5);
1277         assert_eq!(*v.back().unwrap(), 7);
1278     }
1279 
1280     #[test]
test_unmark_range()1281     fn test_unmark_range() {
1282         let mut rt = RangeTracker::default();
1283 
1284         rt.mark_range(5, 5, RangeState::Acked);
1285         rt.mark_range(10, 5, RangeState::Sent);
1286 
1287         // Should unmark sent but not acked range
1288         rt.unmark_range(7, 6);
1289 
1290         let res = rt.first_unmarked_range();
1291         assert_eq!(res, (0, Some(5)));
1292         assert_eq!(
1293             rt.used.iter().next().unwrap(),
1294             (&5, &(5, RangeState::Acked))
1295         );
1296         assert_eq!(
1297             rt.used.iter().nth(1).unwrap(),
1298             (&13, &(2, RangeState::Sent))
1299         );
1300         assert!(rt.used.iter().nth(2).is_none());
1301         rt.mark_range(0, 5, RangeState::Sent);
1302 
1303         let res = rt.first_unmarked_range();
1304         assert_eq!(res, (10, Some(3)));
1305         rt.mark_range(10, 3, RangeState::Sent);
1306 
1307         let res = rt.first_unmarked_range();
1308         assert_eq!(res, (15, None));
1309     }
1310 
1311     #[test]
1312     #[allow(clippy::cognitive_complexity)]
tx_buffer_next_bytes_1()1313     fn tx_buffer_next_bytes_1() {
1314         let mut txb = TxBuffer::new();
1315 
1316         assert_eq!(txb.avail(), SEND_BUFFER_SIZE);
1317 
1318         // Fill the buffer
1319         assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
1320         assert!(matches!(txb.next_bytes(),
1321 			 Some((0, x)) if x.len()==SEND_BUFFER_SIZE
1322 			 && x.iter().all(|ch| *ch == 1)));
1323 
1324         // Mark almost all as sent. Get what's left
1325         let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1;
1326         txb.mark_as_sent(0, one_byte_from_end as usize);
1327         assert!(matches!(txb.next_bytes(),
1328 			 Some((start, x)) if x.len() == 1
1329 			 && start == one_byte_from_end
1330 			 && x.iter().all(|ch| *ch == 1)));
1331 
1332         // Mark all as sent. Get nothing
1333         txb.mark_as_sent(0, SEND_BUFFER_SIZE);
1334         assert!(matches!(txb.next_bytes(), None));
1335 
1336         // Mark as lost. Get it again
1337         txb.mark_as_lost(one_byte_from_end, 1);
1338         assert!(matches!(txb.next_bytes(),
1339 			 Some((start, x)) if x.len() == 1
1340 			 && start == one_byte_from_end
1341 			 && x.iter().all(|ch| *ch == 1)));
1342 
1343         // Mark a larger range lost, including beyond what's in the buffer even.
1344         // Get a little more
1345         let five_bytes_from_end = SEND_BUFFER_SIZE as u64 - 5;
1346         txb.mark_as_lost(five_bytes_from_end, 100);
1347         assert!(matches!(txb.next_bytes(),
1348 			 Some((start, x)) if x.len() == 5
1349 			 && start == five_bytes_from_end
1350 			 && x.iter().all(|ch| *ch == 1)));
1351 
1352         // Contig acked range at start means it can be removed from buffer
1353         // Impl of vecdeque should now result in a split buffer when more data
1354         // is sent
1355         txb.mark_as_acked(0, five_bytes_from_end as usize);
1356         assert_eq!(txb.send(&[2; 30]), 30);
1357         // Just get 5 even though there is more
1358         assert!(matches!(txb.next_bytes(),
1359 			 Some((start, x)) if x.len() == 5
1360 			 && start == five_bytes_from_end
1361 			 && x.iter().all(|ch| *ch == 1)));
1362         assert_eq!(txb.retired, five_bytes_from_end);
1363         assert_eq!(txb.buffered(), 35);
1364 
1365         // Marking that bit as sent should let the last contig bit be returned
1366         // when called again
1367         txb.mark_as_sent(five_bytes_from_end, 5);
1368         assert!(matches!(txb.next_bytes(),
1369 			 Some((start, x)) if x.len() == 30
1370 			 && start == SEND_BUFFER_SIZE as u64
1371 			 && x.iter().all(|ch| *ch == 2)));
1372     }
1373 
    #[test]
    fn tx_buffer_next_bytes_2() {
        let mut txb = TxBuffer::new();

        assert_eq!(txb.avail(), SEND_BUFFER_SIZE);

        // Fill the buffer
        assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
        assert!(matches!(txb.next_bytes(),
            Some((0, x)) if x.len() == SEND_BUFFER_SIZE
            && x.iter().all(|ch| *ch == 1)));

        // As above, ack everything except the last 40 bytes.
        let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40;

        txb.mark_as_acked(0, forty_bytes_from_end as usize);
        assert!(matches!(txb.next_bytes(),
            Some((start, x)) if x.len() == 40
            && start == forty_bytes_from_end
        ));

        // New data lands in the now-split (wrapped) buffer.
        assert_eq!(txb.send(&[2; 100]), 100);

        // Mark a little more as sent
        txb.mark_as_sent(forty_bytes_from_end, 10);
        let thirty_bytes_from_end = forty_bytes_from_end + 10;
        assert!(matches!(txb.next_bytes(),
            Some((start, x)) if x.len() == 30
            && start == thirty_bytes_from_end
            && x.iter().all(|ch| *ch == 1)));

        // Mark a range 'A' in the second slice as sent; the same range as
        // before should still be returned.
        let range_a_start = SEND_BUFFER_SIZE as u64 + 30;
        let range_a_end = range_a_start + 10;
        txb.mark_as_sent(range_a_start, 10);
        assert!(matches!(txb.next_bytes(),
            Some((start, x)) if x.len() == 30
            && start == thirty_bytes_from_end
            && x.iter().all(|ch| *ch == 1)));

        // Ack entire first slice and into second slice
        let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10;
        txb.mark_as_acked(0, ten_bytes_past_end as usize);

        // Get up to marked range A
        assert!(matches!(txb.next_bytes(),
            Some((start, x)) if x.len() == 20
            && start == ten_bytes_past_end
            && x.iter().all(|ch| *ch == 2)));

        txb.mark_as_sent(ten_bytes_past_end, 20);

        // Get the chunk after the earlier-marked range A
        assert!(matches!(txb.next_bytes(),
            Some((start, x)) if x.len() == 60
            && start == range_a_end
            && x.iter().all(|ch| *ch == 2)));

        // No more bytes.
        txb.mark_as_sent(range_a_end, 60);
        assert!(matches!(txb.next_bytes(), None));
    }

    #[test]
    fn test_stream_tx() {
        let conn_fc = connection_fc(4096);
        let conn_events = ConnectionEvents::default();

        let mut s = SendStream::new(4.into(), 1024, Rc::clone(&conn_fc), conn_events);

        let res = s.send(&[4; 100]).unwrap();
        assert_eq!(res, 100);
        s.mark_as_sent(0, 50, false);
        if let SendStreamState::Send { fc, .. } = s.state() {
            assert_eq!(fc.used(), 100);
        } else {
            panic!("unexpected stream state");
        }

        // Should hit stream flow control limit before filling up send buffer
        let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
        assert_eq!(res, 1024 - 100);

        // should do nothing, max stream data already 1024
        s.set_max_stream_data(1024);
        let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
        assert_eq!(res, 0);

        // should now hit the conn flow control (4096)
        s.set_max_stream_data(1_048_576);
        let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
        assert_eq!(res, 3072);

        // should now hit the tx buffer size
        conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64);
        let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap();
        assert_eq!(res, SEND_BUFFER_SIZE - 4096);

        // TODO(agrover@mozilla.com): test ooo acks somehow
        s.mark_as_acked(0, 40, false);
    }

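    // The simplest send/mark-as-sent/ack cycle: once everything buffered has
    // been sent and acknowledged, next_bytes() has nothing left to offer.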
    #[test]
    fn test_tx_buffer_acks() {
        let mut tx = TxBuffer::new();
        assert_eq!(tx.send(&[4; 100]), 100);
        let res = tx.next_bytes().unwrap();
        assert_eq!(res.0, 0);
        assert_eq!(res.1.len(), 100);
        tx.mark_as_sent(0, 100);
        let res = tx.next_bytes();
        assert_eq!(res, None);

        tx.mark_as_acked(0, 100);
        let res = tx.next_bytes();
        assert_eq!(res, None);
    }

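    // Walk the stream through the various flow control limits and check when
    // SendStreamWritable events are (and are not) generated.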
    #[test]
    fn send_stream_writable_event_gen() {
        let conn_fc = connection_fc(2);
        let mut conn_events = ConnectionEvents::default();

        let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone());

        // Stream is initially blocked (conn:2, stream:0)
        // and will not accept data.
        assert_eq!(s.send(b"hi").unwrap(), 0);

        // increasing to (conn:2, stream:2) will allow 2 bytes, and also
        // generate a SendStreamWritable event.
        s.set_max_stream_data(2);
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 1);
        assert!(matches!(
            evts[0],
            ConnectionEvent::SendStreamWritable { .. }
        ));
        assert_eq!(s.send(b"hello").unwrap(), 2);

        // increasing to (conn:2, stream:4) will not generate an event or allow
        // sending anything.
        s.set_max_stream_data(4);
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 0);
        assert_eq!(s.send(b"hello").unwrap(), 0);

        // Increasing conn max (conn:4, stream:4) will unblock but not emit an
        // event, because that happens in Connection::emit_frame() (tested in
        // connection.rs).
        assert!(conn_fc.borrow_mut().update(4));
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 0);
        assert_eq!(s.avail(), 2);
        assert_eq!(s.send(b"hello").unwrap(), 2);

        // No event because still blocked by conn
        s.set_max_stream_data(1_000_000_000);
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 0);

        // No event because happens in emit_frame()
        conn_fc.borrow_mut().update(1_000_000_000);
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 0);

        // Unblocking both by a large amount will cause avail() to be limited by
        // tx buffer size.
        assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4);

        assert_eq!(
            s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(),
            SEND_BUFFER_SIZE - 4
        );

        // No event because still blocked by tx buffer full
        s.set_max_stream_data(2_000_000_000);
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 0);
        assert_eq!(s.send(b"hello").unwrap(), 0);
    }

    #[test]
    fn send_stream_writable_event_new_stream() {
        let conn_fc = connection_fc(2);
        let mut conn_events = ConnectionEvents::default();

        let _s = SendStream::new(4.into(), 100, conn_fc, conn_events.clone());

        // Creating a new stream with conn and stream credits should result in
        // an event.
        let evts = conn_events.events().collect::<Vec<_>>();
        assert_eq!(evts.len(), 1);
        assert!(matches!(
            evts[0],
            ConnectionEvent::SendStreamWritable { .. }
        ));
    }

    #[test]
    // Verify lost frames handle fin properly
    fn send_stream_get_frame_data() {
        let conn_fc = connection_fc(100);
        let conn_events = ConnectionEvents::default();

        let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events);
        s.send(&[0; 10]).unwrap();
        s.close();

        let mut ss = SendStreams::default();
        ss.insert(StreamId::from(0), s);

        let mut tokens = Vec::new();
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);

        // Write a small frame: no fin.
        let written = builder.len();
        builder.set_limit(written + 6);
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 6);
        assert_eq!(tokens.len(), 1);
        let f1_token = tokens.remove(0);
        assert!(matches!(&f1_token, RecoveryToken::Stream(x) if !x.fin));

        // Write the rest: fin.
        let written = builder.len();
        builder.set_limit(written + 200);
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 10);
        assert_eq!(tokens.len(), 1);
        let f2_token = tokens.remove(0);
        assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.fin));

        // Should be no more data to frame.
        let written = builder.len();
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written);
        assert!(tokens.is_empty());

        // Mark frame 1 as lost
        if let RecoveryToken::Stream(rt) = f1_token {
            ss.lost(&rt);
        } else {
            panic!();
        }

        // The next frame should not set fin: the stream has a pending fin, but
        // this frame does not include the end of the stream.
        let written = builder.len();
        ss.write_frames(
            TransmissionPriority::default() + RetransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 7); // Needs a length this time.
        assert_eq!(tokens.len(), 1);
        let f4_token = tokens.remove(0);
        assert!(matches!(&f4_token, RecoveryToken::Stream(x) if !x.fin));

        // Mark frame 2 as lost
        if let RecoveryToken::Stream(rt) = f2_token {
            ss.lost(&rt);
        } else {
            panic!();
        }

        // The next frame should set fin because it includes the end of the stream.
        let written = builder.len();
        ss.write_frames(
            TransmissionPriority::default() + RetransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert_eq!(builder.len(), written + 10);
        assert_eq!(tokens.len(), 1);
        let f5_token = tokens.remove(0);
        assert!(matches!(&f5_token, RecoveryToken::Stream(x) if x.fin));
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    // Verify lost frames handle fin properly with zero length fin
    fn send_stream_get_frame_zerolength_fin() {
        let conn_fc = connection_fc(100);
        let conn_events = ConnectionEvents::default();

        let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events);
        s.send(&[0; 10]).unwrap();

        let mut ss = SendStreams::default();
        ss.insert(StreamId::from(0), s);

        let mut tokens = Vec::new();
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f1_token = tokens.remove(0);
        assert!(matches!(&f1_token, RecoveryToken::Stream(x) if x.offset == 0));
        assert!(matches!(&f1_token, RecoveryToken::Stream(x) if x.length == 10));
        assert!(matches!(&f1_token, RecoveryToken::Stream(x) if !x.fin));

        // Should be no more data to frame
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        assert!(tokens.is_empty());

        ss.get_mut(StreamId::from(0)).unwrap().close();

        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f2_token = tokens.remove(0);
        assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.offset == 10));
        assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.length == 0));
        assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.fin));

        // Mark frame 2 as lost
        if let RecoveryToken::Stream(rt) = f2_token {
            ss.lost(&rt);
        } else {
            panic!();
        }

        // Next frame should set fin
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f3_token = tokens.remove(0);
        assert!(matches!(&f3_token, RecoveryToken::Stream(x) if x.offset == 10));
        assert!(matches!(&f3_token, RecoveryToken::Stream(x) if x.length == 0));
        assert!(matches!(&f3_token, RecoveryToken::Stream(x) if x.fin));

        // Mark frame 1 as lost
        if let RecoveryToken::Stream(rt) = f1_token {
            ss.lost(&rt);
        } else {
            panic!();
        }

        // Next frame should set fin and include all data
        ss.write_frames(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut FrameStats::default(),
        );
        let f4_token = tokens.remove(0);
        assert!(matches!(&f4_token, RecoveryToken::Stream(x) if x.offset == 0));
        assert!(matches!(&f4_token, RecoveryToken::Stream(x) if x.length == 10));
        assert!(matches!(&f4_token, RecoveryToken::Stream(x) if x.fin));
    }

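    // STREAM_DATA_BLOCKED and DATA_BLOCKED should only be queued once the last
    // available flow control credit has actually been marked as sent.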
    #[test]
    fn data_blocked() {
        let conn_fc = connection_fc(5);
        let conn_events = ConnectionEvents::default();

        let stream_id = StreamId::from(4);
        let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events);

        // Only two bytes can be sent due to the stream limit.
        assert_eq!(s.send(b"abc").unwrap(), 2);
        assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..])));

        // This doesn't report blocking yet.
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_blocked_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream_data_blocked, 0);

        // Blocking is reported after sending the last available credit.
        s.mark_as_sent(0, 2, false);
        s.write_blocked_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream_data_blocked, 1);

        // Now increase the stream limit and test the connection limit.
        s.set_max_stream_data(10);

        assert_eq!(s.send(b"abcd").unwrap(), 3);
        assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..])));
        // DATA_BLOCKED is not sent yet.
        conn_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut tokens, &mut stats);
        assert_eq!(stats.data_blocked, 0);

        // DATA_BLOCKED is queued once bytes using all credit are sent.
        s.mark_as_sent(2, 3, false);
        conn_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut tokens, &mut stats);
        assert_eq!(stats.data_blocked, 1);
    }

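    // Atomic writes are all-or-nothing: if the data does not fit within the
    // current flow control credit, nothing is accepted and the relevant
    // blocked frame becomes available right away.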
    #[test]
    fn data_blocked_atomic() {
        let conn_fc = connection_fc(5);
        let conn_events = ConnectionEvents::default();

        let stream_id = StreamId::from(4);
        let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events);

        // Stream is initially blocked (conn:5, stream:2)
        // and will not accept atomic write of 3 bytes.
        assert_eq!(s.send_atomic(b"abc").unwrap(), 0);

        // Assert that STREAM_DATA_BLOCKED is sent.
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_blocked_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream_data_blocked, 1);

        // Assert that a non-atomic write works.
        assert_eq!(s.send(b"abc").unwrap(), 2);
        assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..])));
        s.mark_as_sent(0, 2, false);

        // Set limits to (conn:5, stream:10).
        s.set_max_stream_data(10);

        // An atomic write of 4 bytes exceeds the remaining limit of 3.
        assert_eq!(s.send_atomic(b"abcd").unwrap(), 0);

        // Assert that DATA_BLOCKED is sent.
        conn_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut tokens, &mut stats);
        assert_eq!(stats.data_blocked, 1);

        // Check that a non-atomic write works.
        assert_eq!(s.send(b"abcd").unwrap(), 3);
        assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..])));
        s.mark_as_sent(2, 3, false);

        // Increase limits to (conn:15, stream:15).
        s.set_max_stream_data(15);
        conn_fc.borrow_mut().update(15);

        // Check that atomic writing right up to the limit works.
        assert_eq!(s.send_atomic(b"abcdefghij").unwrap(), 10);
    }

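    // Acknowledging the empty fin-carrying range before the data itself should
    // still drive the stream to a terminal state once the data is acked too.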
    #[test]
    fn ack_fin_first() {
        const MESSAGE: &[u8] = b"hello";
        let len_u64 = u64::try_from(MESSAGE.len()).unwrap();

        let conn_fc = connection_fc(len_u64);
        let conn_events = ConnectionEvents::default();

        let mut s = SendStream::new(StreamId::new(100), 0, conn_fc, conn_events);
        s.set_max_stream_data(len_u64);

        // Send all the data, then the fin.
        let _ = s.send(MESSAGE).unwrap();
        s.mark_as_sent(0, MESSAGE.len(), false);
        s.close();
        s.mark_as_sent(len_u64, 0, true);

        // Ack the fin, then the data.
        s.mark_as_acked(len_u64, 0, true);
        s.mark_as_acked(0, MESSAGE.len(), false);
        assert!(s.is_terminal());
    }

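    // A fin that has already been acknowledged and is later reported lost must
    // not be retransmitted: the earlier ack wins.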
    #[test]
    fn ack_then_lose_fin() {
        const MESSAGE: &[u8] = b"hello";
        let len_u64 = u64::try_from(MESSAGE.len()).unwrap();

        let conn_fc = connection_fc(len_u64);
        let conn_events = ConnectionEvents::default();

        let id = StreamId::new(100);
        let mut s = SendStream::new(id, 0, conn_fc, conn_events);
        s.set_max_stream_data(len_u64);

        // Send all the data, then the fin.
        let _ = s.send(MESSAGE).unwrap();
        s.mark_as_sent(0, MESSAGE.len(), false);
        s.close();
        s.mark_as_sent(len_u64, 0, true);

        // Ack the fin, then mark it lost.
        s.mark_as_acked(len_u64, 0, true);
        s.mark_as_lost(len_u64, 0, true);

        // No frame should be sent here.
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream, 0);
    }

    /// Create a `SendStream` and force it into a state where it believes that
    /// `offset` bytes have already been sent and acknowledged.
    fn stream_with_sent(stream: u64, offset: usize) -> SendStream {
        const MAX_VARINT: u64 = (1 << 62) - 1;

        let conn_fc = connection_fc(MAX_VARINT);
        let mut s = SendStream::new(
            StreamId::from(stream),
            MAX_VARINT,
            conn_fc,
            ConnectionEvents::default(),
        );

        let mut send_buf = TxBuffer::new();
        send_buf.retired = u64::try_from(offset).unwrap();
        send_buf.ranges.mark_range(0, offset, RangeState::Acked);
        let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT);
        fc.consume(offset);
        let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT)));
        s.state = SendStreamState::Send {
            fc,
            conn_fc,
            send_buf,
        };
        s
    }

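    /// Try to write a single STREAM frame for stream `stream`, pretending that
    /// `offset` bytes have already been sent and acknowledged, with `len` new
    /// bytes (and optionally a fin) and only `space` bytes of packet space
    /// remaining.  Returns whether a frame was written at all.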
    fn frame_sent_sid(stream: u64, offset: usize, len: usize, fin: bool, space: usize) -> bool {
        const BUF: &[u8] = &[0x42; 128];
        let mut s = stream_with_sent(stream, offset);

        // Now write out the prescribed data and maybe close.
        if len > 0 {
            s.send(&BUF[..len]).unwrap();
        }
        if fin {
            s.close();
        }

        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let header_len = builder.len();
        builder.set_limit(header_len + space);

        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        qtrace!("STREAM frame: {}", hex_with_len(&builder[header_len..]));
        stats.stream > 0
    }

    fn frame_sent(offset: usize, len: usize, fin: bool, space: usize) -> bool {
        frame_sent_sid(0, offset, len, fin, space)
    }

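    // Size arithmetic for the next few tests: for stream 0 at offset 10 the
    // frame header alone is 3 bytes (type, 1-byte stream ID, 1-byte offset),
    // so 3 bytes of space fits a fin-only frame but no data; at offset 0 the
    // offset field is omitted, so a fin-only frame fits in 2 bytes.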
    #[test]
    fn stream_frame_empty() {
        // Stream frames with empty data and no fin never work.
        assert!(!frame_sent(10, 0, false, 2));
        assert!(!frame_sent(10, 0, false, 3));
        assert!(!frame_sent(10, 0, false, 4));
        assert!(!frame_sent(10, 0, false, 5));
        assert!(!frame_sent(10, 0, false, 100));

        // Empty data with fin is only a problem if there is no space.
        assert!(!frame_sent(0, 0, true, 1));
        assert!(frame_sent(0, 0, true, 2));
        assert!(!frame_sent(10, 0, true, 2));
        assert!(frame_sent(10, 0, true, 3));
        assert!(frame_sent(10, 0, true, 4));
        assert!(frame_sent(10, 0, true, 5));
        assert!(frame_sent(10, 0, true, 100));
    }

    #[test]
    fn stream_frame_minimum() {
        // Add minimum data
        assert!(!frame_sent(10, 1, false, 3));
        assert!(!frame_sent(10, 1, true, 3));
        assert!(frame_sent(10, 1, false, 4));
        assert!(frame_sent(10, 1, true, 4));
        assert!(frame_sent(10, 1, false, 5));
        assert!(frame_sent(10, 1, true, 5));
        assert!(frame_sent(10, 1, false, 100));
        assert!(frame_sent(10, 1, true, 100));
    }

    #[test]
    fn stream_frame_more() {
        // Try more data
        assert!(!frame_sent(10, 100, false, 3));
        assert!(!frame_sent(10, 100, true, 3));
        assert!(frame_sent(10, 100, false, 4));
        assert!(frame_sent(10, 100, true, 4));
        assert!(frame_sent(10, 100, false, 5));
        assert!(frame_sent(10, 100, true, 5));
        assert!(frame_sent(10, 100, false, 100));
        assert!(frame_sent(10, 100, true, 100));

        assert!(frame_sent(10, 100, false, 1000));
        assert!(frame_sent(10, 100, true, 1000));
    }

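    // With an 8-byte stream ID varint and an 8-byte offset varint, the header
    // alone takes 17 bytes, so that is the smallest space that fits even a
    // fin-only frame.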
    #[test]
    fn stream_frame_big_id() {
        // A value that requires the largest (8-byte) varint encoding.
        const BIG: u64 = 1 << 30;
        const BIGSZ: usize = 1 << 30;

        assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 16));
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, true, 16));
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 17));
        assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 17));
        assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 18));
        assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 18));

        assert!(!frame_sent_sid(BIG, BIGSZ, 1, false, 17));
        assert!(!frame_sent_sid(BIG, BIGSZ, 1, true, 17));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 18));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 18));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 19));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 19));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 100));
        assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 100));
    }

    #[test]
    fn stream_frame_16384() {
        const DATA16384: &[u8] = &[0x43; 16384];

        // 16383/16384 is an odd boundary in STREAM frame construction.
        // That is the boundary where a length goes from 2 bytes to 4 bytes.
        // If the data fits in the available space, then it is simple:
        let mut s = stream_with_sent(0, 0);
        s.send(DATA16384).unwrap();
        s.close();

        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let header_len = builder.len();
        builder.set_limit(header_len + DATA16384.len() + 2);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream, 1);
        // Expect STREAM + FIN only.
        assert_eq!(&builder[header_len..header_len + 2], &[0b1001, 0]);
        assert_eq!(&builder[header_len + 2..], DATA16384);

        s.mark_as_lost(0, DATA16384.len(), true);

        // However, if there is one extra byte of space, we will try to add a length.
        // That length will then make the frame too large and the data will be
        // truncated.  The frame could carry one more byte of data, but it's a corner
        // case we don't want to address as it should be rare (if not impossible).
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let header_len = builder.len();
        builder.set_limit(header_len + DATA16384.len() + 3);
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream, 2);
        // Expect STREAM + LEN + FIN.
        assert_eq!(
            &builder[header_len..header_len + 4],
            &[0b1010, 0, 0x7f, 0xfd]
        );
        assert_eq!(
            &builder[header_len + 4..],
            &DATA16384[..DATA16384.len() - 3]
        );
    }

    #[test]
    fn stream_frame_64() {
        const DATA64: &[u8] = &[0x43; 64];

        // Unlike 16383/16384, the boundary at 63/64 is easy because the difference
        // is just one byte.  We lose just the last byte when there is more space.
        let mut s = stream_with_sent(0, 0);
        s.send(DATA64).unwrap();
        s.close();

        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let header_len = builder.len();
        builder.set_limit(header_len + 66);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream, 1);
        // Expect STREAM + FIN only.
        assert_eq!(&builder[header_len..header_len + 2], &[0b1001, 0]);
        assert_eq!(&builder[header_len + 2..], DATA64);

        s.mark_as_lost(0, DATA64.len(), true);

        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let header_len = builder.len();
        builder.set_limit(header_len + 67);
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream, 2);
        // Expect STREAM + LEN, not FIN.
        assert_eq!(&builder[header_len..header_len + 3], &[0b1010, 0, 63]);
        assert_eq!(&builder[header_len + 3..], &DATA64[..63]);
    }
}