1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
// Tracks possibly-redundant flow control signals from other code and converts
// them into flow control frames that need to be sent to the remote.
9 
10 use crate::frame::{
11     FRAME_TYPE_DATA_BLOCKED, FRAME_TYPE_MAX_DATA, FRAME_TYPE_MAX_STREAMS_BIDI,
12     FRAME_TYPE_MAX_STREAMS_UNIDI, FRAME_TYPE_MAX_STREAM_DATA, FRAME_TYPE_STREAMS_BLOCKED_BIDI,
13     FRAME_TYPE_STREAMS_BLOCKED_UNIDI, FRAME_TYPE_STREAM_DATA_BLOCKED,
14 };
15 use crate::packet::PacketBuilder;
16 use crate::recovery::{RecoveryToken, StreamRecoveryToken};
17 use crate::stats::FrameStats;
18 use crate::stream_id::{StreamId, StreamType};
19 use crate::{Error, Res};
20 use neqo_common::{qtrace, Role};
21 
22 use std::convert::TryFrom;
23 use std::fmt::Debug;
24 use std::ops::{Deref, DerefMut};
25 use std::ops::{Index, IndexMut};
26 
/// Sender-side flow control for a limit that the remote endpoint controls.
///
/// Records the current `limit`, how much of it has been `used`, and whether a
/// blocked frame still needs to be sent for the limit at which the sender last
/// reported being blocked.
#[derive(Debug)]
pub struct SenderFlowControl<T>
where
    T: Debug + Sized,
{
    /// The thing that we're counting for.
    subject: T,
    /// The limit.
    limit: u64,
    /// How much of that limit we've used.
    used: u64,
    /// The point at which blocking occurred.  This is updated each time
    /// the sender decides that it is blocked.  It only ever changes
    /// when blocking occurs.  This ensures that blocking at any given limit
    /// is only reported once.
    /// Note: All values are one greater than the corresponding `limit` to
    /// allow distinguishing between blocking at a limit of 0 and no blocking.
    blocked_at: u64,
    /// Whether a blocked frame should be sent.
    blocked_frame: bool,
}

impl<T> SenderFlowControl<T>
where
    T: Debug + Sized,
{
    /// Make a new instance with the initial value and subject.
    ///
    /// Nothing is used yet and no blocking has been recorded.
    pub fn new(subject: T, initial: u64) -> Self {
        Self {
            subject,
            limit: initial,
            used: 0,
            blocked_at: 0,
            blocked_frame: false,
        }
    }

    /// Update the maximum.  Returns `true` if the change was an increase.
    ///
    /// A value that does not exceed the current limit is ignored.  An increase
    /// also cancels any blocked frame that was waiting to be sent.
    pub fn update(&mut self, limit: u64) -> bool {
        debug_assert!(limit < u64::MAX);
        if limit > self.limit {
            self.limit = limit;
            self.blocked_frame = false;
            true
        } else {
            false
        }
    }

    /// Consume flow control.
    ///
    /// Debug builds assert that `count` does not take `used` past `limit`;
    /// release builds do not check.
    pub fn consume(&mut self, count: usize) {
        let amt = u64::try_from(count).unwrap();
        debug_assert!(self.used + amt <= self.limit);
        self.used += amt;
    }

    /// Get available flow control.
    ///
    /// Returns `limit - used`, clamped to `usize::MAX` if it does not fit in a
    /// `usize`.  The subtraction saturates at zero: `consume` only checks the
    /// `used <= limit` invariant in debug builds, so a violation must not wrap
    /// around here and be reported as an enormous amount of credit.
    pub fn available(&self) -> usize {
        usize::try_from(self.limit.saturating_sub(self.used)).unwrap_or(usize::MAX)
    }

    /// How much data has been written.
    pub fn used(&self) -> u64 {
        self.used
    }

    /// Mark flow control as blocked.
    /// This only does something if the current limit exceeds the last reported blocking limit.
    pub fn blocked(&mut self) {
        // `blocked_at` stores the blocked limit plus one (0 means "never blocked"),
        // so `>=` here means "strictly above the limit last reported".
        if self.limit >= self.blocked_at {
            self.blocked_at = self.limit + 1;
            self.blocked_frame = true;
        }
    }

    /// Return whether a blocking frame needs to be sent.
    /// This is `Some` with the active limit if `blocked` has been called,
    /// if a blocking frame has not been sent (or it has been lost), and
    /// if the blocking condition remains.
    fn blocked_needed(&self) -> Option<u64> {
        if self.blocked_frame && self.limit < self.blocked_at {
            // Undo the +1 offset to recover the limit that blocking occurred at.
            Some(self.blocked_at - 1)
        } else {
            None
        }
    }

    /// Clear the need to send a blocked frame.
    fn blocked_sent(&mut self) {
        self.blocked_frame = false;
    }

    /// Mark a blocked frame as having been lost.
    /// Only send again if value of `self.blocked_at` hasn't increased since sending.
    /// That would imply that the limit has since increased.
    pub fn frame_lost(&mut self, limit: u64) {
        if self.blocked_at == limit + 1 {
            self.blocked_frame = true;
        }
    }
}
128 
129 impl SenderFlowControl<()> {
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )130     pub fn write_frames(
131         &mut self,
132         builder: &mut PacketBuilder,
133         tokens: &mut Vec<RecoveryToken>,
134         stats: &mut FrameStats,
135     ) {
136         if let Some(limit) = self.blocked_needed() {
137             if builder.write_varint_frame(&[FRAME_TYPE_DATA_BLOCKED, limit]) {
138                 stats.data_blocked += 1;
139                 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::DataBlocked(
140                     limit,
141                 )));
142                 self.blocked_sent();
143             }
144         }
145     }
146 }
147 
148 impl SenderFlowControl<StreamId> {
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )149     pub fn write_frames(
150         &mut self,
151         builder: &mut PacketBuilder,
152         tokens: &mut Vec<RecoveryToken>,
153         stats: &mut FrameStats,
154     ) {
155         if let Some(limit) = self.blocked_needed() {
156             if builder.write_varint_frame(&[
157                 FRAME_TYPE_STREAM_DATA_BLOCKED,
158                 self.subject.as_u64(),
159                 limit,
160             ]) {
161                 stats.stream_data_blocked += 1;
162                 tokens.push(RecoveryToken::Stream(
163                     StreamRecoveryToken::StreamDataBlocked {
164                         stream_id: self.subject,
165                         limit,
166                     },
167                 ));
168                 self.blocked_sent();
169             }
170         }
171     }
172 }
173 
174 impl SenderFlowControl<StreamType> {
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )175     pub fn write_frames(
176         &mut self,
177         builder: &mut PacketBuilder,
178         tokens: &mut Vec<RecoveryToken>,
179         stats: &mut FrameStats,
180     ) {
181         if let Some(limit) = self.blocked_needed() {
182             let frame = match self.subject {
183                 StreamType::BiDi => FRAME_TYPE_STREAMS_BLOCKED_BIDI,
184                 StreamType::UniDi => FRAME_TYPE_STREAMS_BLOCKED_UNIDI,
185             };
186             if builder.write_varint_frame(&[frame, limit]) {
187                 stats.streams_blocked += 1;
188                 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::StreamsBlocked {
189                     stream_type: self.subject,
190                     limit,
191                 }));
192                 self.blocked_sent();
193             }
194         }
195     }
196 }
197 
/// Receiver-side flow control: tracks the limit last advertised to the remote,
/// how much has been consumed and retired, and whether a new limit needs to be
/// advertised.
#[derive(Debug)]
pub struct ReceiverFlowControl<T>
where
    T: Debug + Sized,
{
    /// What is being counted.
    subject: T,
    /// How many items may be active at once (for example, the receive buffer size).
    max_active: u64,
    /// The most recent limit recorded by `frame_sent` (initially the value given to `new`).
    max_allowed: u64,
    /// Items received but not yet retired.
    /// For byte flow control, each stream remembers its largest received byte
    /// offset, and session flow control remembers the sum of all bytes consumed
    /// by all streams.
    consumed: u64,
    /// Items that have been retired.
    retired: u64,
    /// Whether a frame advertising a new limit is waiting to be sent.
    frame_pending: bool,
}

impl<T> ReceiverFlowControl<T>
where
    T: Debug + Sized,
{
    /// Creates an instance for `subject`, using `max` both as the number of
    /// items that may be active and as the initial limit.
    pub fn new(subject: T, max: u64) -> Self {
        Self {
            subject,
            max_active: max,
            max_allowed: max,
            consumed: 0,
            retired: 0,
            frame_pending: false,
        }
    }

    /// Records the absolute amount retired so far; values that do not advance
    /// the current amount are ignored.
    ///
    /// An update becomes pending once `retired + max_active / 2` exceeds the
    /// current limit.
    pub fn retire(&mut self, retired: u64) {
        if retired > self.retired {
            self.retired = retired;
            let half_window = self.max_active / 2;
            self.frame_pending |= self.retired + half_window > self.max_allowed;
        }
    }

    /// Called when a STREAM_DATA_BLOCKED frame is received.
    /// Marks an update as pending if a larger limit than the current one can
    /// be offered.
    pub fn send_flowc_update(&mut self) {
        let can_raise_limit = self.retired + self.max_active > self.max_allowed;
        if can_raise_limit {
            self.frame_pending = true;
        }
    }

    /// Whether a frame advertising a new limit is waiting to be sent.
    pub fn frame_needed(&self) -> bool {
        self.frame_pending
    }

    /// The limit that the next update would advertise: `retired + max_active`.
    pub fn next_limit(&self) -> u64 {
        self.max_active + self.retired
    }

    /// The number of items that may be active at once.
    pub fn max_active(&self) -> u64 {
        self.max_active
    }

    /// Reports the loss of a frame that advertised `maximum_data`.
    /// An update becomes pending only if that value is still the current limit.
    pub fn frame_lost(&mut self, maximum_data: u64) {
        let lost_is_current = self.max_allowed == maximum_data;
        if lost_is_current {
            self.frame_pending = true;
        }
    }

    /// Records that `new_max` has been advertised and clears the pending flag.
    fn frame_sent(&mut self, new_max: u64) {
        self.frame_pending = false;
        self.max_allowed = new_max;
    }

    /// Changes the number of items that may be active at once.
    /// An increase makes an update pending immediately; a decrease leaves the
    /// pending flag as it was.
    pub fn set_max_active(&mut self, max: u64) {
        if max > self.max_active {
            self.frame_pending = true;
        }
        self.max_active = max;
    }

    /// The amount retired so far.
    pub fn retired(&self) -> u64 {
        self.retired
    }

    /// The amount consumed so far.
    pub fn consumed(&self) -> u64 {
        self.consumed
    }
}
293 
294 impl ReceiverFlowControl<()> {
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )295     pub fn write_frames(
296         &mut self,
297         builder: &mut PacketBuilder,
298         tokens: &mut Vec<RecoveryToken>,
299         stats: &mut FrameStats,
300     ) {
301         if !self.frame_needed() {
302             return;
303         }
304         let max_allowed = self.next_limit();
305         if builder.write_varint_frame(&[FRAME_TYPE_MAX_DATA, max_allowed]) {
306             stats.max_data += 1;
307             tokens.push(RecoveryToken::Stream(StreamRecoveryToken::MaxData(
308                 max_allowed,
309             )));
310             self.frame_sent(max_allowed);
311         }
312     }
313 
add_retired(&mut self, count: u64)314     pub fn add_retired(&mut self, count: u64) {
315         debug_assert!(self.retired + count <= self.consumed);
316         self.retired += count;
317         if self.retired + self.max_active / 2 > self.max_allowed {
318             self.frame_pending = true;
319         }
320     }
321 
consume(&mut self, count: u64) -> Res<()>322     pub fn consume(&mut self, count: u64) -> Res<()> {
323         if self.consumed + count > self.max_allowed {
324             qtrace!(
325                 "Session RX window exceeded: consumed:{} new:{} limit:{}",
326                 self.consumed,
327                 count,
328                 self.max_allowed
329             );
330             return Err(Error::FlowControlError);
331         }
332         self.consumed += count;
333         Ok(())
334     }
335 }
336 
337 impl Default for ReceiverFlowControl<()> {
default() -> Self338     fn default() -> Self {
339         Self::new((), 0)
340     }
341 }
342 
343 impl ReceiverFlowControl<StreamId> {
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )344     pub fn write_frames(
345         &mut self,
346         builder: &mut PacketBuilder,
347         tokens: &mut Vec<RecoveryToken>,
348         stats: &mut FrameStats,
349     ) {
350         if !self.frame_needed() {
351             return;
352         }
353         let max_allowed = self.next_limit();
354         if builder.write_varint_frame(&[
355             FRAME_TYPE_MAX_STREAM_DATA,
356             self.subject.as_u64(),
357             max_allowed,
358         ]) {
359             stats.max_stream_data += 1;
360             tokens.push(RecoveryToken::Stream(StreamRecoveryToken::MaxStreamData {
361                 stream_id: self.subject,
362                 max_data: max_allowed,
363             }));
364             self.frame_sent(max_allowed);
365         }
366     }
367 
add_retired(&mut self, count: u64)368     pub fn add_retired(&mut self, count: u64) {
369         debug_assert!(self.retired + count <= self.consumed);
370         self.retired += count;
371         if self.retired + self.max_active / 2 > self.max_allowed {
372             self.frame_pending = true;
373         }
374     }
375 
set_consumed(&mut self, consumed: u64) -> Res<u64>376     pub fn set_consumed(&mut self, consumed: u64) -> Res<u64> {
377         if consumed <= self.consumed {
378             return Ok(0);
379         }
380 
381         if consumed > self.max_allowed {
382             qtrace!("Stream RX window exceeded: {}", consumed);
383             return Err(Error::FlowControlError);
384         }
385         let new_consumed = consumed - self.consumed;
386         self.consumed = consumed;
387         Ok(new_consumed)
388     }
389 }
390 
391 impl Default for ReceiverFlowControl<StreamId> {
default() -> Self392     fn default() -> Self {
393         Self::new(StreamId::new(0), 0)
394     }
395 }
396 
397 impl ReceiverFlowControl<StreamType> {
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )398     pub fn write_frames(
399         &mut self,
400         builder: &mut PacketBuilder,
401         tokens: &mut Vec<RecoveryToken>,
402         stats: &mut FrameStats,
403     ) {
404         if !self.frame_needed() {
405             return;
406         }
407         let max_streams = self.next_limit();
408         let frame = match self.subject {
409             StreamType::BiDi => FRAME_TYPE_MAX_STREAMS_BIDI,
410             StreamType::UniDi => FRAME_TYPE_MAX_STREAMS_UNIDI,
411         };
412         if builder.write_varint_frame(&[frame, max_streams]) {
413             stats.max_streams += 1;
414             tokens.push(RecoveryToken::Stream(StreamRecoveryToken::MaxStreams {
415                 stream_type: self.subject,
416                 max_streams,
417             }));
418             self.frame_sent(max_streams);
419         }
420     }
421 
422     /// Check if received item exceeds the allowed flow control limit.
check_allowed(&self, new_end: u64) -> bool423     pub fn check_allowed(&self, new_end: u64) -> bool {
424         new_end < self.max_allowed
425     }
426 
427     /// Retire given amount of additional data.
428     /// This function will send flow updates immediately.
add_retired(&mut self, count: u64)429     pub fn add_retired(&mut self, count: u64) {
430         self.retired += count;
431         if count > 0 {
432             self.send_flowc_update();
433         }
434     }
435 }
436 
/// The limit on streams of one type that the remote peer may open, paired
/// with the next stream ID of that type to hand out.
pub struct RemoteStreamLimit {
    /// Receiver flow control that counts streams of this type.
    streams_fc: ReceiverFlowControl<StreamType>,
    /// The stream ID that `take_stream_id` returns next.
    next_stream: StreamId,
}
441 
442 impl RemoteStreamLimit {
new(stream_type: StreamType, max_streams: u64, role: Role) -> Self443     pub fn new(stream_type: StreamType, max_streams: u64, role: Role) -> Self {
444         Self {
445             streams_fc: ReceiverFlowControl::new(stream_type, max_streams),
446             // // This is for a stream created by a peer, therefore we use role.remote().
447             next_stream: StreamId::init(stream_type, role.remote()),
448         }
449     }
450 
is_allowed(&self, stream_id: StreamId) -> bool451     pub fn is_allowed(&self, stream_id: StreamId) -> bool {
452         let stream_idx = stream_id.as_u64() >> 2;
453         self.streams_fc.check_allowed(stream_idx)
454     }
455 
is_new_stream(&self, stream_id: StreamId) -> Res<bool>456     pub fn is_new_stream(&self, stream_id: StreamId) -> Res<bool> {
457         if !self.is_allowed(stream_id) {
458             return Err(Error::StreamLimitError);
459         }
460         Ok(stream_id >= self.next_stream)
461     }
462 
take_stream_id(&mut self) -> StreamId463     pub fn take_stream_id(&mut self) -> StreamId {
464         let new_stream = self.next_stream;
465         self.next_stream.next();
466         assert!(self.is_allowed(new_stream));
467         new_stream
468     }
469 }
470 
471 impl Deref for RemoteStreamLimit {
472     type Target = ReceiverFlowControl<StreamType>;
deref(&self) -> &Self::Target473     fn deref(&self) -> &Self::Target {
474         &self.streams_fc
475     }
476 }
477 
478 impl DerefMut for RemoteStreamLimit {
deref_mut(&mut self) -> &mut Self::Target479     fn deref_mut(&mut self) -> &mut Self::Target {
480         &mut self.streams_fc
481     }
482 }
483 
/// The limits on streams opened by the remote peer, one per stream type.
/// Index with a `StreamType` to reach the limit for that type.
pub struct RemoteStreamLimits {
    /// Limit for bidirectional streams.
    bidirectional: RemoteStreamLimit,
    /// Limit for unidirectional streams.
    unidirectional: RemoteStreamLimit,
}
488 
489 impl RemoteStreamLimits {
new(local_max_stream_bidi: u64, local_max_stream_uni: u64, role: Role) -> Self490     pub fn new(local_max_stream_bidi: u64, local_max_stream_uni: u64, role: Role) -> Self {
491         Self {
492             bidirectional: RemoteStreamLimit::new(StreamType::BiDi, local_max_stream_bidi, role),
493             unidirectional: RemoteStreamLimit::new(StreamType::UniDi, local_max_stream_uni, role),
494         }
495     }
496 }
497 
498 impl Index<StreamType> for RemoteStreamLimits {
499     type Output = RemoteStreamLimit;
500 
index(&self, idx: StreamType) -> &Self::Output501     fn index(&self, idx: StreamType) -> &Self::Output {
502         match idx {
503             StreamType::BiDi => &self.bidirectional,
504             StreamType::UniDi => &self.unidirectional,
505         }
506     }
507 }
508 
509 impl IndexMut<StreamType> for RemoteStreamLimits {
index_mut(&mut self, idx: StreamType) -> &mut Self::Output510     fn index_mut(&mut self, idx: StreamType) -> &mut Self::Output {
511         match idx {
512             StreamType::BiDi => &mut self.bidirectional,
513             StreamType::UniDi => &mut self.unidirectional,
514         }
515     }
516 }
517 
/// The limits on streams opened locally, one per stream type.
/// Index with a `StreamType` to reach the flow control for that type.
pub struct LocalStreamLimits {
    /// Sender flow control counting bidirectional streams.
    bidirectional: SenderFlowControl<StreamType>,
    /// Sender flow control counting unidirectional streams.
    unidirectional: SenderFlowControl<StreamType>,
    /// The value of `StreamId::role_bit` for the local role; it is added into
    /// every stream ID produced by `take_stream_id`.
    role_bit: u64,
}
523 
524 impl LocalStreamLimits {
new(role: Role) -> Self525     pub fn new(role: Role) -> Self {
526         Self {
527             bidirectional: SenderFlowControl::new(StreamType::BiDi, 0),
528             unidirectional: SenderFlowControl::new(StreamType::UniDi, 0),
529             role_bit: StreamId::role_bit(role),
530         }
531     }
532 
take_stream_id(&mut self, stream_type: StreamType) -> Option<StreamId>533     pub fn take_stream_id(&mut self, stream_type: StreamType) -> Option<StreamId> {
534         let fc = match stream_type {
535             StreamType::BiDi => &mut self.bidirectional,
536             StreamType::UniDi => &mut self.unidirectional,
537         };
538         if fc.available() > 0 {
539             let new_stream = fc.used();
540             fc.consume(1);
541             let type_bit = match stream_type {
542                 StreamType::BiDi => 0,
543                 StreamType::UniDi => 2,
544             };
545             Some(StreamId::from((new_stream << 2) + type_bit + self.role_bit))
546         } else {
547             fc.blocked();
548             None
549         }
550     }
551 }
552 
553 impl Index<StreamType> for LocalStreamLimits {
554     type Output = SenderFlowControl<StreamType>;
555 
index(&self, idx: StreamType) -> &Self::Output556     fn index(&self, idx: StreamType) -> &Self::Output {
557         match idx {
558             StreamType::BiDi => &self.bidirectional,
559             StreamType::UniDi => &self.unidirectional,
560         }
561     }
562 }
563 
564 impl IndexMut<StreamType> for LocalStreamLimits {
index_mut(&mut self, idx: StreamType) -> &mut Self::Output565     fn index_mut(&mut self, idx: StreamType) -> &mut Self::Output {
566         match idx {
567             StreamType::BiDi => &mut self.bidirectional,
568             StreamType::UniDi => &mut self.unidirectional,
569         }
570     }
571 }
572 
// Unit tests for the sender and receiver flow control state machines and for
// the local and remote stream limits.
#[cfg(test)]
mod test {
    use super::{LocalStreamLimits, ReceiverFlowControl, RemoteStreamLimits, SenderFlowControl};
    use crate::packet::PacketBuilder;
    use crate::stats::FrameStats;
    use crate::stream_id::{StreamId, StreamType};
    use crate::Error;
    use neqo_common::{Encoder, Role};

    // Blocking at a limit of zero is distinguishable from not being blocked.
    #[test]
    fn blocked_at_zero() {
        let mut fc = SenderFlowControl::new((), 0);
        fc.blocked();
        assert_eq!(fc.blocked_needed(), Some(0));
    }

    // Blocking reports the limit at which it occurred.
    #[test]
    fn blocked() {
        let mut fc = SenderFlowControl::new((), 10);
        fc.blocked();
        assert_eq!(fc.blocked_needed(), Some(10));
    }

    // Consuming reduces what is available; only a larger limit adds more.
    #[test]
    fn update_consume() {
        let mut fc = SenderFlowControl::new((), 10);
        fc.consume(10);
        assert_eq!(fc.available(), 0);
        fc.update(5); // An update lower than the current limit does nothing.
        assert_eq!(fc.available(), 0);
        fc.update(15);
        assert_eq!(fc.available(), 5);
        fc.consume(3);
        assert_eq!(fc.available(), 2);
    }

    // An increase of the limit cancels a blocked frame that has not been sent.
    #[test]
    fn update_clears_blocked() {
        let mut fc = SenderFlowControl::new((), 10);
        fc.blocked();
        assert_eq!(fc.blocked_needed(), Some(10));
        fc.update(5); // An update lower than the current limit does nothing.
        assert_eq!(fc.blocked_needed(), Some(10));
        fc.update(11);
        assert_eq!(fc.blocked_needed(), None);
    }

    // A lost blocked frame is sent again while the limit is unchanged.
    #[test]
    fn lost_blocked_resent() {
        let mut fc = SenderFlowControl::new((), 10);
        fc.blocked();
        fc.blocked_sent();
        assert_eq!(fc.blocked_needed(), None);
        fc.frame_lost(10);
        assert_eq!(fc.blocked_needed(), Some(10));
    }

    // A lost blocked frame is not sent again once the limit has increased.
    #[test]
    fn lost_after_increase() {
        let mut fc = SenderFlowControl::new((), 10);
        fc.blocked();
        fc.blocked_sent();
        assert_eq!(fc.blocked_needed(), None);
        fc.update(11);
        fc.frame_lost(10);
        assert_eq!(fc.blocked_needed(), None);
    }

    // Losing a blocked frame for an old limit does not cause a resend after
    // blocking has been reported at a higher limit.
    #[test]
    fn lost_after_higher_blocked() {
        let mut fc = SenderFlowControl::new((), 10);
        fc.blocked();
        fc.blocked_sent();
        fc.update(11);
        fc.blocked();
        assert_eq!(fc.blocked_needed(), Some(11));
        fc.blocked_sent();
        fc.frame_lost(10);
        assert_eq!(fc.blocked_needed(), None);
    }

    // A new receiver does not need to send an update.
    #[test]
    fn do_no_need_max_allowed_frame_at_start() {
        let fc = ReceiverFlowControl::new((), 0);
        assert!(!fc.frame_needed());
    }

    // An update is needed once `retired + max_active / 2` exceeds the limit.
    #[test]
    fn max_allowed_after_items_retired() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.retire(49);
        assert!(!fc.frame_needed());
        fc.retire(51);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 151);
    }

    // Losing the frame that carried the current limit makes an update needed again.
    #[test]
    fn need_max_allowed_frame_after_loss() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.retire(100);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 200);
        fc.frame_sent(200);
        assert!(!fc.frame_needed());
        fc.frame_lost(200);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 200);
    }

    // Losing a frame that carried a superseded limit does not trigger an update.
    #[test]
    fn no_max_allowed_frame_after_old_loss() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.retire(51);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 151);
        fc.frame_sent(151);
        assert!(!fc.frame_needed());
        fc.retire(102);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 202);
        fc.frame_sent(202);
        assert!(!fc.frame_needed());
        fc.frame_lost(151);
        assert!(!fc.frame_needed());
    }

    // Retiring a small amount (10 of 100) does not make an update needed.
    // NOTE(review): despite the name, nothing is forced in this test.
    #[test]
    fn force_send_max_allowed() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.retire(10);
        assert!(!fc.frame_needed());
    }

    // While an update is pending, further retiring keeps it pending and moves
    // the limit that will be advertised.
    #[test]
    fn multiple_retries_after_frame_pending_is_set() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.retire(51);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 151);
        fc.retire(61);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 161);
        fc.retire(88);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 188);
        fc.retire(90);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 190);
        fc.frame_sent(190);
        assert!(!fc.frame_needed());
        fc.retire(141);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 241);
        fc.frame_sent(241);
        assert!(!fc.frame_needed());
    }

    // When the frame carrying the current limit is lost, the replacement
    // advertises a limit that includes items retired in the meantime.
    #[test]
    fn new_retired_before_loss() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.retire(51);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 151);
        fc.frame_sent(151);
        assert!(!fc.frame_needed());
        fc.retire(62);
        assert!(!fc.frame_needed());
        fc.frame_lost(151);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 162);
    }

    // Lowering `max_active` does not trigger an update; raising it does.
    #[test]
    fn changing_max_active() {
        let mut fc = ReceiverFlowControl::new((), 100);
        fc.set_max_active(50);
        // No flow control update frame is needed.
        assert!(!fc.frame_needed());
        // We can still retire more than 50.
        fc.retire(60);
        // No flow control update frame is needed yet.
        assert!(!fc.frame_needed());
        fc.retire(76);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 126);

        // Increase max_active.
        fc.set_max_active(60);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 136);

        // We can retire more than 60.
        fc.retire(136);
        assert!(fc.frame_needed());
        assert_eq!(fc.next_limit(), 196);
    }

    // Shared body for the remote stream limit tests.  `role` is the local role;
    // `bidi` and `unidi` are the first bidirectional and unidirectional stream
    // IDs that the remote peer would use.
    fn remote_stream_limits(role: Role, bidi: u64, unidi: u64) {
        // Allow the peer two bidirectional streams and one unidirectional stream.
        let mut fc = RemoteStreamLimits::new(2, 1, role);
        assert!(fc[StreamType::BiDi]
            .is_new_stream(StreamId::from(bidi))
            .unwrap());
        assert!(fc[StreamType::BiDi]
            .is_new_stream(StreamId::from(bidi + 4))
            .unwrap());
        assert!(fc[StreamType::UniDi]
            .is_new_stream(StreamId::from(unidi))
            .unwrap());

        // Exceed limits
        assert_eq!(
            fc[StreamType::BiDi].is_new_stream(StreamId::from(bidi + 8)),
            Err(Error::StreamLimitError)
        );
        assert_eq!(
            fc[StreamType::UniDi].is_new_stream(StreamId::from(unidi + 4)),
            Err(Error::StreamLimitError)
        );

        assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(bidi));
        assert_eq!(
            fc[StreamType::BiDi].take_stream_id(),
            StreamId::from(bidi + 4)
        );
        assert_eq!(
            fc[StreamType::UniDi].take_stream_id(),
            StreamId::from(unidi)
        );

        fc[StreamType::BiDi].add_retired(1);
        fc[StreamType::BiDi].send_flowc_update();
        // consume the frame
        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let mut tokens = Vec::new();
        fc[StreamType::BiDi].write_frames(&mut builder, &mut tokens, &mut FrameStats::default());
        assert_eq!(tokens.len(), 1);

        // Now the third bidirectional stream (`bidi + 8`) can be a new StreamId.
        assert!(fc[StreamType::BiDi]
            .is_new_stream(StreamId::from(bidi + 8))
            .unwrap());
        assert_eq!(
            fc[StreamType::BiDi].take_stream_id(),
            StreamId::from(bidi + 8)
        );
        // The fourth bidirectional stream (`bidi + 12`) still exceeds limits.
        assert_eq!(
            fc[StreamType::BiDi].is_new_stream(StreamId::from(bidi + 12)),
            Err(Error::StreamLimitError)
        );

        fc[StreamType::UniDi].add_retired(1);
        fc[StreamType::UniDi].send_flowc_update();
        // consume the frame
        fc[StreamType::UniDi].write_frames(&mut builder, &mut tokens, &mut FrameStats::default());
        assert_eq!(tokens.len(), 2);

        // Now the second unidirectional stream (`unidi + 4`) can be a new StreamId.
        assert!(fc[StreamType::UniDi]
            .is_new_stream(StreamId::from(unidi + 4))
            .unwrap());
        assert_eq!(
            fc[StreamType::UniDi].take_stream_id(),
            StreamId::from(unidi + 4)
        );
        // The third unidirectional stream (`unidi + 8`) exceeds limits.
        assert_eq!(
            fc[StreamType::UniDi].is_new_stream(StreamId::from(unidi + 8)),
            Err(Error::StreamLimitError)
        );
    }

    // As a client, the remote streams start at IDs 1 (bidi) and 3 (unidi).
    #[test]
    fn remote_stream_limits_new_stream_client() {
        remote_stream_limits(Role::Client, 1, 3);
    }

    // As a server, the remote streams start at IDs 0 (bidi) and 2 (unidi).
    #[test]
    fn remote_stream_limits_new_stream_server() {
        remote_stream_limits(Role::Server, 0, 2);
    }

    // Taking a third stream ID when only two are allowed trips the assertion
    // in `take_stream_id`.
    #[should_panic]
    #[test]
    fn remote_stream_limits_asserts_if_limit_exceeded() {
        let mut fc = RemoteStreamLimits::new(2, 1, Role::Client);
        assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(1));
        assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(5));
        let _ = fc[StreamType::BiDi].take_stream_id();
    }

    // Shared body for the local stream limit tests.  `role` is the local role;
    // `bidi` and `unidi` are the first bidirectional and unidirectional stream
    // IDs expected for locally opened streams.
    fn local_stream_limits(role: Role, bidi: u64, unidi: u64) {
        let mut fc = LocalStreamLimits::new(role);

        fc[StreamType::BiDi].update(2);
        fc[StreamType::UniDi].update(1);

        // Add streams
        assert_eq!(
            fc.take_stream_id(StreamType::BiDi).unwrap(),
            StreamId::from(bidi)
        );
        assert_eq!(
            fc.take_stream_id(StreamType::BiDi).unwrap(),
            StreamId::from(bidi + 4)
        );
        assert_eq!(fc.take_stream_id(StreamType::BiDi), None);
        assert_eq!(
            fc.take_stream_id(StreamType::UniDi).unwrap(),
            StreamId::from(unidi)
        );
        assert_eq!(fc.take_stream_id(StreamType::UniDi), None);

        // Increase limit
        fc[StreamType::BiDi].update(3);
        fc[StreamType::UniDi].update(2);
        assert_eq!(
            fc.take_stream_id(StreamType::BiDi).unwrap(),
            StreamId::from(bidi + 8)
        );
        assert_eq!(fc.take_stream_id(StreamType::BiDi), None);
        assert_eq!(
            fc.take_stream_id(StreamType::UniDi).unwrap(),
            StreamId::from(unidi + 4)
        );
        assert_eq!(fc.take_stream_id(StreamType::UniDi), None);
    }

    // As a client, local streams start at IDs 0 (bidi) and 2 (unidi).
    #[test]
    fn local_stream_limits_new_stream_client() {
        local_stream_limits(Role::Client, 0, 2);
    }

    // As a server, local streams start at IDs 1 (bidi) and 3 (unidi).
    #[test]
    fn local_stream_limits_new_stream_server() {
        local_stream_limits(Role::Server, 1, 3);
    }
}
912