1 use super::*;
2 
3 use std::usize;
4 
/// Bookkeeping for stream concurrency on a single connection.
///
/// Tracks, for each category of stream, the current tally alongside the
/// limit it must stay under: streams initiated locally ("send"), streams
/// initiated by the remote peer ("recv"), and locally reset streams that are
/// still pending.
#[derive(Debug)]
pub(super) struct Counts {
    /// Acting as a client or server. This allows us to track which values to
    /// inc / dec.
    peer: peer::Dyn,

    /// Maximum number of locally initiated streams
    max_send_streams: usize,

    /// Current number of locally initiated streams
    num_send_streams: usize,

    /// Maximum number of remote initiated streams
    max_recv_streams: usize,

    /// Current number of remote initiated streams
    num_recv_streams: usize,

    /// Maximum number of pending locally reset streams
    max_reset_streams: usize,

    /// Current number of pending locally reset streams
    num_reset_streams: usize,
}
29 
30 impl Counts {
31     /// Create a new `Counts` using the provided configuration values.
new(peer: peer::Dyn, config: &Config) -> Self32     pub fn new(peer: peer::Dyn, config: &Config) -> Self {
33         Counts {
34             peer,
35             max_send_streams: config.initial_max_send_streams,
36             num_send_streams: 0,
37             max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
38             num_recv_streams: 0,
39             max_reset_streams: config.local_reset_max,
40             num_reset_streams: 0,
41         }
42     }
43 
44     /// Returns the current peer
peer(&self) -> peer::Dyn45     pub fn peer(&self) -> peer::Dyn {
46         self.peer
47     }
48 
has_streams(&self) -> bool49     pub fn has_streams(&self) -> bool {
50         self.num_send_streams != 0 || self.num_recv_streams != 0
51     }
52 
53     /// Returns true if the receive stream concurrency can be incremented
can_inc_num_recv_streams(&self) -> bool54     pub fn can_inc_num_recv_streams(&self) -> bool {
55         self.max_recv_streams > self.num_recv_streams
56     }
57 
58     /// Increments the number of concurrent receive streams.
59     ///
60     /// # Panics
61     ///
62     /// Panics on failure as this should have been validated before hand.
inc_num_recv_streams(&mut self, stream: &mut store::Ptr)63     pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
64         assert!(self.can_inc_num_recv_streams());
65         assert!(!stream.is_counted);
66 
67         // Increment the number of remote initiated streams
68         self.num_recv_streams += 1;
69         stream.is_counted = true;
70     }
71 
72     /// Returns true if the send stream concurrency can be incremented
can_inc_num_send_streams(&self) -> bool73     pub fn can_inc_num_send_streams(&self) -> bool {
74         self.max_send_streams > self.num_send_streams
75     }
76 
77     /// Increments the number of concurrent send streams.
78     ///
79     /// # Panics
80     ///
81     /// Panics on failure as this should have been validated before hand.
inc_num_send_streams(&mut self, stream: &mut store::Ptr)82     pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
83         assert!(self.can_inc_num_send_streams());
84         assert!(!stream.is_counted);
85 
86         // Increment the number of remote initiated streams
87         self.num_send_streams += 1;
88         stream.is_counted = true;
89     }
90 
91     /// Returns true if the number of pending reset streams can be incremented.
can_inc_num_reset_streams(&self) -> bool92     pub fn can_inc_num_reset_streams(&self) -> bool {
93         self.max_reset_streams > self.num_reset_streams
94     }
95 
96     /// Increments the number of pending reset streams.
97     ///
98     /// # Panics
99     ///
100     /// Panics on failure as this should have been validated before hand.
inc_num_reset_streams(&mut self)101     pub fn inc_num_reset_streams(&mut self) {
102         assert!(self.can_inc_num_reset_streams());
103 
104         self.num_reset_streams += 1;
105     }
106 
apply_remote_settings(&mut self, settings: &frame::Settings)107     pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
108         if let Some(val) = settings.max_concurrent_streams() {
109             self.max_send_streams = val as usize;
110         }
111     }
112 
113     /// Run a block of code that could potentially transition a stream's state.
114     ///
115     /// If the stream state transitions to closed, this function will perform
116     /// all necessary cleanup.
117     ///
118     /// TODO: Is this function still needed?
transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U where F: FnOnce(&mut Self, &mut store::Ptr) -> U,119     pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
120     where
121         F: FnOnce(&mut Self, &mut store::Ptr) -> U,
122     {
123         // TODO: Does this need to be computed before performing the action?
124         let is_pending_reset = stream.is_pending_reset_expiration();
125 
126         // Run the action
127         let ret = f(self, &mut stream);
128 
129         self.transition_after(stream, is_pending_reset);
130 
131         ret
132     }
133 
134     // TODO: move this to macro?
transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool)135     pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
136         tracing::trace!(
137             "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
138              pending_send_empty={:?}; buffered_send_data={}; \
139              num_recv={}; num_send={}",
140             stream.id,
141             stream.state,
142             stream.is_closed(),
143             stream.pending_send.is_empty(),
144             stream.buffered_send_data,
145             self.num_recv_streams,
146             self.num_send_streams
147         );
148 
149         if stream.is_closed() {
150             if !stream.is_pending_reset_expiration() {
151                 stream.unlink();
152                 if is_reset_counted {
153                     self.dec_num_reset_streams();
154                 }
155             }
156 
157             if stream.is_counted {
158                 tracing::trace!("dec_num_streams; stream={:?}", stream.id);
159                 // Decrement the number of active streams.
160                 self.dec_num_streams(&mut stream);
161             }
162         }
163 
164         // Release the stream if it requires releasing
165         if stream.is_released() {
166             stream.remove();
167         }
168     }
169 
170     /// Returns the maximum number of streams that can be initiated by this
171     /// peer.
max_send_streams(&self) -> usize172     pub(crate) fn max_send_streams(&self) -> usize {
173         self.max_send_streams
174     }
175 
176     /// Returns the maximum number of streams that can be initiated by the
177     /// remote peer.
max_recv_streams(&self) -> usize178     pub(crate) fn max_recv_streams(&self) -> usize {
179         self.max_recv_streams
180     }
181 
dec_num_streams(&mut self, stream: &mut store::Ptr)182     fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
183         assert!(stream.is_counted);
184 
185         if self.peer.is_local_init(stream.id) {
186             assert!(self.num_send_streams > 0);
187             self.num_send_streams -= 1;
188             stream.is_counted = false;
189         } else {
190             assert!(self.num_recv_streams > 0);
191             self.num_recv_streams -= 1;
192             stream.is_counted = false;
193         }
194     }
195 
dec_num_reset_streams(&mut self)196     fn dec_num_reset_streams(&mut self) {
197         assert!(self.num_reset_streams > 0);
198         self.num_reset_streams -= 1;
199     }
200 }
201 
202 impl Drop for Counts {
drop(&mut self)203     fn drop(&mut self) {
204         use std::thread;
205 
206         if !thread::panicking() {
207             debug_assert!(!self.has_streams());
208         }
209     }
210 }
211