1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Tracks possibly-redundant flow control signals from other code and converts 8 // into flow control frames needing to be sent to the remote. 9 10 use std::collections::HashMap; 11 use std::mem; 12 13 use neqo_common::qwarn; 14 use smallvec::{smallvec, SmallVec}; 15 16 use crate::frame::{write_varint_frame, Frame}; 17 use crate::packet::PacketBuilder; 18 use crate::recovery::RecoveryToken; 19 use crate::stats::FrameStats; 20 use crate::stream_id::{StreamIndex, StreamIndexes, StreamType}; 21 use crate::Res; 22 23 type FlowFrame = Frame<'static>; 24 pub type FlowControlRecoveryToken = FlowFrame; 25 26 #[derive(Debug, Default)] 27 pub struct FlowMgr { 28 // (stream_type, discriminant) as key ensures only 1 of every frame type 29 // per stream type will be queued. 30 from_stream_types: HashMap<(StreamType, mem::Discriminant<FlowFrame>), FlowFrame>, 31 } 32 33 impl FlowMgr { 34 // -- frames scoped on stream type -- 35 max_streams(&mut self, stream_limit: StreamIndex, stream_type: StreamType)36 pub fn max_streams(&mut self, stream_limit: StreamIndex, stream_type: StreamType) { 37 let frame = Frame::MaxStreams { 38 stream_type, 39 maximum_streams: stream_limit, 40 }; 41 self.from_stream_types 42 .insert((stream_type, mem::discriminant(&frame)), frame); 43 } 44 streams_blocked(&mut self, stream_limit: StreamIndex, stream_type: StreamType)45 pub fn streams_blocked(&mut self, stream_limit: StreamIndex, stream_type: StreamType) { 46 let frame = Frame::StreamsBlocked { 47 stream_type, 48 stream_limit, 49 }; 50 self.from_stream_types 51 .insert((stream_type, mem::discriminant(&frame)), frame); 52 } 53 peek(&self) -> Option<&Frame>54 pub fn peek(&self) -> Option<&Frame> { 55 if let Some(key) = self.from_stream_types.keys().next() { 56 return self.from_stream_types.get(key); 57 } 58 None 59 } 60 lost(&mut self, token: &FlowControlRecoveryToken, indexes: &mut StreamIndexes)61 pub(crate) fn lost(&mut self, token: &FlowControlRecoveryToken, indexes: &mut StreamIndexes) { 62 match *token { 63 // Resend MaxStreams if lost (with updated value) 64 Frame::MaxStreams { stream_type, .. } => { 65 let local_max = match stream_type { 66 StreamType::BiDi => &mut indexes.local_max_stream_bidi, 67 StreamType::UniDi => &mut indexes.local_max_stream_uni, 68 }; 69 70 self.max_streams(*local_max, stream_type) 71 } 72 // Only resend "*Blocked" frames if still blocked 73 Frame::StreamsBlocked { stream_type, .. } => match stream_type { 74 StreamType::UniDi => { 75 if indexes.remote_next_stream_uni >= indexes.remote_max_stream_uni { 76 self.streams_blocked(indexes.remote_max_stream_uni, StreamType::UniDi); 77 } 78 } 79 StreamType::BiDi => { 80 if indexes.remote_next_stream_bidi >= indexes.remote_max_stream_bidi { 81 self.streams_blocked(indexes.remote_max_stream_bidi, StreamType::BiDi); 82 } 83 } 84 }, 85 _ => qwarn!("Unexpected Flow frame {:?} lost, not re-sent", token), 86 } 87 } 88 write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, ) -> Res<()>89 pub(crate) fn write_frames( 90 &mut self, 91 builder: &mut PacketBuilder, 92 tokens: &mut Vec<RecoveryToken>, 93 stats: &mut FrameStats, 94 ) -> Res<()> { 95 while let Some(frame) = self.peek() { 96 // All these frames are bags of varints, so we can just extract the 97 // varints and use common code for writing. 98 let (mut values, stat): (SmallVec<[_; 3]>, _) = match frame { 99 Frame::MaxStreams { 100 maximum_streams, .. 101 } => (smallvec![maximum_streams.as_u64()], &mut stats.max_streams), 102 Frame::StreamsBlocked { stream_limit, .. } => { 103 (smallvec![stream_limit.as_u64()], &mut stats.streams_blocked) 104 } 105 _ => unreachable!("{:?}", frame), 106 }; 107 values.insert(0, frame.get_type()); 108 debug_assert!(!values.spilled()); 109 110 if write_varint_frame(builder, &values)? { 111 tokens.push(RecoveryToken::Flow(self.next().unwrap())); 112 *stat += 1; 113 } else { 114 return Ok(()); 115 } 116 } 117 Ok(()) 118 } 119 } 120 121 impl Iterator for FlowMgr { 122 type Item = FlowFrame; 123 124 /// Used by generator to get a flow control frame. next(&mut self) -> Option<Self::Item>125 fn next(&mut self) -> Option<Self::Item> { 126 let first_key = self.from_stream_types.keys().next(); 127 if let Some(&first_key) = first_key { 128 return self.from_stream_types.remove(&first_key); 129 } 130 None 131 } 132 } 133