1 use super::*; 2 3 use std::usize; 4 5 #[derive(Debug)] 6 pub(super) struct Counts { 7 /// Acting as a client or server. This allows us to track which values to 8 /// inc / dec. 9 peer: peer::Dyn, 10 11 /// Maximum number of locally initiated streams 12 max_send_streams: usize, 13 14 /// Current number of remote initiated streams 15 num_send_streams: usize, 16 17 /// Maximum number of remote initiated streams 18 max_recv_streams: usize, 19 20 /// Current number of locally initiated streams 21 num_recv_streams: usize, 22 23 /// Maximum number of pending locally reset streams 24 max_reset_streams: usize, 25 26 /// Current number of pending locally reset streams 27 num_reset_streams: usize, 28 } 29 30 impl Counts { 31 /// Create a new `Counts` using the provided configuration values. new(peer: peer::Dyn, config: &Config) -> Self32 pub fn new(peer: peer::Dyn, config: &Config) -> Self { 33 Counts { 34 peer, 35 max_send_streams: config.initial_max_send_streams, 36 num_send_streams: 0, 37 max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), 38 num_recv_streams: 0, 39 max_reset_streams: config.local_reset_max, 40 num_reset_streams: 0, 41 } 42 } 43 44 /// Returns the current peer peer(&self) -> peer::Dyn45 pub fn peer(&self) -> peer::Dyn { 46 self.peer 47 } 48 has_streams(&self) -> bool49 pub fn has_streams(&self) -> bool { 50 self.num_send_streams != 0 || self.num_recv_streams != 0 51 } 52 53 /// Returns true if the receive stream concurrency can be incremented can_inc_num_recv_streams(&self) -> bool54 pub fn can_inc_num_recv_streams(&self) -> bool { 55 self.max_recv_streams > self.num_recv_streams 56 } 57 58 /// Increments the number of concurrent receive streams. 59 /// 60 /// # Panics 61 /// 62 /// Panics on failure as this should have been validated before hand. inc_num_recv_streams(&mut self, stream: &mut store::Ptr)63 pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { 64 assert!(self.can_inc_num_recv_streams()); 65 assert!(!stream.is_counted); 66 67 // Increment the number of remote initiated streams 68 self.num_recv_streams += 1; 69 stream.is_counted = true; 70 } 71 72 /// Returns true if the send stream concurrency can be incremented can_inc_num_send_streams(&self) -> bool73 pub fn can_inc_num_send_streams(&self) -> bool { 74 self.max_send_streams > self.num_send_streams 75 } 76 77 /// Increments the number of concurrent send streams. 78 /// 79 /// # Panics 80 /// 81 /// Panics on failure as this should have been validated before hand. inc_num_send_streams(&mut self, stream: &mut store::Ptr)82 pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { 83 assert!(self.can_inc_num_send_streams()); 84 assert!(!stream.is_counted); 85 86 // Increment the number of remote initiated streams 87 self.num_send_streams += 1; 88 stream.is_counted = true; 89 } 90 91 /// Returns true if the number of pending reset streams can be incremented. can_inc_num_reset_streams(&self) -> bool92 pub fn can_inc_num_reset_streams(&self) -> bool { 93 self.max_reset_streams > self.num_reset_streams 94 } 95 96 /// Increments the number of pending reset streams. 97 /// 98 /// # Panics 99 /// 100 /// Panics on failure as this should have been validated before hand. inc_num_reset_streams(&mut self)101 pub fn inc_num_reset_streams(&mut self) { 102 assert!(self.can_inc_num_reset_streams()); 103 104 self.num_reset_streams += 1; 105 } 106 apply_remote_settings(&mut self, settings: &frame::Settings)107 pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { 108 if let Some(val) = settings.max_concurrent_streams() { 109 self.max_send_streams = val as usize; 110 } 111 } 112 113 /// Run a block of code that could potentially transition a stream's state. 114 /// 115 /// If the stream state transitions to closed, this function will perform 116 /// all necessary cleanup. 117 /// 118 /// TODO: Is this function still needed? transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U where F: FnOnce(&mut Self, &mut store::Ptr) -> U,119 pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U 120 where 121 F: FnOnce(&mut Self, &mut store::Ptr) -> U, 122 { 123 // TODO: Does this need to be computed before performing the action? 124 let is_pending_reset = stream.is_pending_reset_expiration(); 125 126 // Run the action 127 let ret = f(self, &mut stream); 128 129 self.transition_after(stream, is_pending_reset); 130 131 ret 132 } 133 134 // TODO: move this to macro? transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool)135 pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { 136 tracing::trace!( 137 "transition_after; stream={:?}; state={:?}; is_closed={:?}; \ 138 pending_send_empty={:?}; buffered_send_data={}; \ 139 num_recv={}; num_send={}", 140 stream.id, 141 stream.state, 142 stream.is_closed(), 143 stream.pending_send.is_empty(), 144 stream.buffered_send_data, 145 self.num_recv_streams, 146 self.num_send_streams 147 ); 148 149 if stream.is_closed() { 150 if !stream.is_pending_reset_expiration() { 151 stream.unlink(); 152 if is_reset_counted { 153 self.dec_num_reset_streams(); 154 } 155 } 156 157 if stream.is_counted { 158 tracing::trace!("dec_num_streams; stream={:?}", stream.id); 159 // Decrement the number of active streams. 160 self.dec_num_streams(&mut stream); 161 } 162 } 163 164 // Release the stream if it requires releasing 165 if stream.is_released() { 166 stream.remove(); 167 } 168 } 169 dec_num_streams(&mut self, stream: &mut store::Ptr)170 fn dec_num_streams(&mut self, stream: &mut store::Ptr) { 171 assert!(stream.is_counted); 172 173 if self.peer.is_local_init(stream.id) { 174 assert!(self.num_send_streams > 0); 175 self.num_send_streams -= 1; 176 stream.is_counted = false; 177 } else { 178 assert!(self.num_recv_streams > 0); 179 self.num_recv_streams -= 1; 180 stream.is_counted = false; 181 } 182 } 183 dec_num_reset_streams(&mut self)184 fn dec_num_reset_streams(&mut self) { 185 assert!(self.num_reset_streams > 0); 186 self.num_reset_streams -= 1; 187 } 188 } 189 190 impl Drop for Counts { drop(&mut self)191 fn drop(&mut self) { 192 use std::thread; 193 194 if !thread::panicking() { 195 debug_assert!(!self.has_streams()); 196 } 197 } 198 } 199