1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 // Tracks possibly-redundant flow control signals from other code and converts
8 // into flow control frames needing to be sent to the remote.
9 
10 use std::collections::HashMap;
11 use std::mem;
12 
13 use neqo_common::qwarn;
14 use smallvec::{smallvec, SmallVec};
15 
16 use crate::frame::{write_varint_frame, Frame};
17 use crate::packet::PacketBuilder;
18 use crate::recovery::RecoveryToken;
19 use crate::stats::FrameStats;
20 use crate::stream_id::{StreamIndex, StreamIndexes, StreamType};
21 use crate::Res;
22 
// Frames queued by this module use the `'static` lifetime, i.e. they hold no
// data borrowed from a shorter-lived buffer.
type FlowFrame = Frame<'static>;
// The frame itself serves as the recovery token: `write_frames` wraps each
// frame it sends in `RecoveryToken::Flow`, and `lost` takes one back by
// reference to decide whether it needs to be queued again.
pub type FlowControlRecoveryToken = FlowFrame;
25 
/// Queue of pending stream-limit flow control frames (`MaxStreams` and
/// `StreamsBlocked`) waiting to be written into a packet.
#[derive(Debug, Default)]
pub struct FlowMgr {
    // Keyed by (stream_type, frame discriminant), so at most one frame of each
    // frame type is queued per stream type; queuing another frame with the
    // same key overwrites the earlier one.
    from_stream_types: HashMap<(StreamType, mem::Discriminant<FlowFrame>), FlowFrame>,
}
32 
33 impl FlowMgr {
34     // -- frames scoped on stream type --
35 
max_streams(&mut self, stream_limit: StreamIndex, stream_type: StreamType)36     pub fn max_streams(&mut self, stream_limit: StreamIndex, stream_type: StreamType) {
37         let frame = Frame::MaxStreams {
38             stream_type,
39             maximum_streams: stream_limit,
40         };
41         self.from_stream_types
42             .insert((stream_type, mem::discriminant(&frame)), frame);
43     }
44 
streams_blocked(&mut self, stream_limit: StreamIndex, stream_type: StreamType)45     pub fn streams_blocked(&mut self, stream_limit: StreamIndex, stream_type: StreamType) {
46         let frame = Frame::StreamsBlocked {
47             stream_type,
48             stream_limit,
49         };
50         self.from_stream_types
51             .insert((stream_type, mem::discriminant(&frame)), frame);
52     }
53 
peek(&self) -> Option<&Frame>54     pub fn peek(&self) -> Option<&Frame> {
55         if let Some(key) = self.from_stream_types.keys().next() {
56             return self.from_stream_types.get(key);
57         }
58         None
59     }
60 
lost(&mut self, token: &FlowControlRecoveryToken, indexes: &mut StreamIndexes)61     pub(crate) fn lost(&mut self, token: &FlowControlRecoveryToken, indexes: &mut StreamIndexes) {
62         match *token {
63             // Resend MaxStreams if lost (with updated value)
64             Frame::MaxStreams { stream_type, .. } => {
65                 let local_max = match stream_type {
66                     StreamType::BiDi => &mut indexes.local_max_stream_bidi,
67                     StreamType::UniDi => &mut indexes.local_max_stream_uni,
68                 };
69 
70                 self.max_streams(*local_max, stream_type)
71             }
72             // Only resend "*Blocked" frames if still blocked
73             Frame::StreamsBlocked { stream_type, .. } => match stream_type {
74                 StreamType::UniDi => {
75                     if indexes.remote_next_stream_uni >= indexes.remote_max_stream_uni {
76                         self.streams_blocked(indexes.remote_max_stream_uni, StreamType::UniDi);
77                     }
78                 }
79                 StreamType::BiDi => {
80                     if indexes.remote_next_stream_bidi >= indexes.remote_max_stream_bidi {
81                         self.streams_blocked(indexes.remote_max_stream_bidi, StreamType::BiDi);
82                     }
83                 }
84             },
85             _ => qwarn!("Unexpected Flow frame {:?} lost, not re-sent", token),
86         }
87     }
88 
write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, ) -> Res<()>89     pub(crate) fn write_frames(
90         &mut self,
91         builder: &mut PacketBuilder,
92         tokens: &mut Vec<RecoveryToken>,
93         stats: &mut FrameStats,
94     ) -> Res<()> {
95         while let Some(frame) = self.peek() {
96             // All these frames are bags of varints, so we can just extract the
97             // varints and use common code for writing.
98             let (mut values, stat): (SmallVec<[_; 3]>, _) = match frame {
99                 Frame::MaxStreams {
100                     maximum_streams, ..
101                 } => (smallvec![maximum_streams.as_u64()], &mut stats.max_streams),
102                 Frame::StreamsBlocked { stream_limit, .. } => {
103                     (smallvec![stream_limit.as_u64()], &mut stats.streams_blocked)
104                 }
105                 _ => unreachable!("{:?}", frame),
106             };
107             values.insert(0, frame.get_type());
108             debug_assert!(!values.spilled());
109 
110             if write_varint_frame(builder, &values)? {
111                 tokens.push(RecoveryToken::Flow(self.next().unwrap()));
112                 *stat += 1;
113             } else {
114                 return Ok(());
115             }
116         }
117         Ok(())
118     }
119 }
120 
121 impl Iterator for FlowMgr {
122     type Item = FlowFrame;
123 
124     /// Used by generator to get a flow control frame.
next(&mut self) -> Option<Self::Item>125     fn next(&mut self) -> Option<Self::Item> {
126         let first_key = self.from_stream_types.keys().next();
127         if let Some(&first_key) = first_key {
128             return self.from_stream_types.remove(&first_key);
129         }
130         None
131     }
132 }
133