1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Collecting a list of events relevant to whoever is using the Connection. 8 9 use std::cell::RefCell; 10 use std::collections::VecDeque; 11 use std::rc::Rc; 12 13 use crate::connection::State; 14 use crate::quic_datagrams::DatagramTracking; 15 use crate::stream_id::{StreamId, StreamType}; 16 use crate::{AppError, Stats}; 17 use neqo_common::event::Provider as EventProvider; 18 use neqo_crypto::ResumptionToken; 19 20 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] 21 pub enum OutgoingDatagramOutcome { 22 DroppedTooBig, 23 DroppedQueueFull, 24 Lost, 25 Acked, 26 } 27 28 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] 29 pub enum ConnectionEvent { 30 /// Cert authentication needed 31 AuthenticationNeeded, 32 /// Encrypted client hello fallback occurred. The certificate for the 33 /// public name needs to be authenticated. 34 EchFallbackAuthenticationNeeded { 35 public_name: String, 36 }, 37 /// A new uni (read) or bidi stream has been opened by the peer. 38 NewStream { 39 stream_id: StreamId, 40 }, 41 /// Space available in the buffer for an application write to succeed. 42 SendStreamWritable { 43 stream_id: StreamId, 44 }, 45 /// New bytes available for reading. 46 RecvStreamReadable { 47 stream_id: StreamId, 48 }, 49 /// Peer reset the stream. 50 RecvStreamReset { 51 stream_id: StreamId, 52 app_error: AppError, 53 }, 54 /// Peer has sent STOP_SENDING 55 SendStreamStopSending { 56 stream_id: StreamId, 57 app_error: AppError, 58 }, 59 /// Peer has acked everything sent on the stream. 60 SendStreamComplete { 61 stream_id: StreamId, 62 }, 63 /// Peer increased MAX_STREAMS 64 SendStreamCreatable { 65 stream_type: StreamType, 66 }, 67 /// Connection state change. 68 StateChange(State), 69 /// The server rejected 0-RTT. 70 /// This event invalidates all state in streams that has been created. 71 /// Any data written to streams needs to be written again. 72 ZeroRttRejected, 73 ResumptionToken(ResumptionToken), 74 Datagram(Vec<u8>), 75 OutgoingDatagramOutcome { 76 id: u64, 77 outcome: OutgoingDatagramOutcome, 78 }, 79 IncomingDatagramDropped, 80 } 81 82 #[derive(Debug, Default, Clone)] 83 #[allow(clippy::module_name_repetitions)] 84 pub struct ConnectionEvents { 85 events: Rc<RefCell<VecDeque<ConnectionEvent>>>, 86 } 87 88 impl ConnectionEvents { authentication_needed(&self)89 pub fn authentication_needed(&self) { 90 self.insert(ConnectionEvent::AuthenticationNeeded); 91 } 92 ech_fallback_authentication_needed(&self, public_name: String)93 pub fn ech_fallback_authentication_needed(&self, public_name: String) { 94 self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name }); 95 } 96 new_stream(&self, stream_id: StreamId)97 pub fn new_stream(&self, stream_id: StreamId) { 98 self.insert(ConnectionEvent::NewStream { stream_id }); 99 } 100 recv_stream_readable(&self, stream_id: StreamId)101 pub fn recv_stream_readable(&self, stream_id: StreamId) { 102 self.insert(ConnectionEvent::RecvStreamReadable { stream_id }); 103 } 104 recv_stream_reset(&self, stream_id: StreamId, app_error: AppError)105 pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) { 106 // If reset, no longer readable. 107 self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); 108 109 self.insert(ConnectionEvent::RecvStreamReset { 110 stream_id, 111 app_error, 112 }); 113 } 114 send_stream_writable(&self, stream_id: StreamId)115 pub fn send_stream_writable(&self, stream_id: StreamId) { 116 self.insert(ConnectionEvent::SendStreamWritable { stream_id }); 117 } 118 send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError)119 pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) { 120 // If stopped, no longer writable. 121 self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id)); 122 123 self.insert(ConnectionEvent::SendStreamStopSending { 124 stream_id, 125 app_error, 126 }); 127 } 128 send_stream_complete(&self, stream_id: StreamId)129 pub fn send_stream_complete(&self, stream_id: StreamId) { 130 self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id)); 131 132 self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id.as_u64())); 133 134 self.insert(ConnectionEvent::SendStreamComplete { stream_id }); 135 } 136 send_stream_creatable(&self, stream_type: StreamType)137 pub fn send_stream_creatable(&self, stream_type: StreamType) { 138 self.insert(ConnectionEvent::SendStreamCreatable { stream_type }); 139 } 140 connection_state_change(&self, state: State)141 pub fn connection_state_change(&self, state: State) { 142 // If closing, existing events no longer relevant. 143 match state { 144 State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(), 145 _ => (), 146 } 147 self.insert(ConnectionEvent::StateChange(state)); 148 } 149 client_resumption_token(&self, token: ResumptionToken)150 pub fn client_resumption_token(&self, token: ResumptionToken) { 151 self.insert(ConnectionEvent::ResumptionToken(token)); 152 } 153 client_0rtt_rejected(&self)154 pub fn client_0rtt_rejected(&self) { 155 // If 0rtt rejected, must start over and existing events are no longer 156 // relevant. 157 self.events.borrow_mut().clear(); 158 self.insert(ConnectionEvent::ZeroRttRejected); 159 } 160 recv_stream_complete(&self, stream_id: StreamId)161 pub fn recv_stream_complete(&self, stream_id: StreamId) { 162 // If stopped, no longer readable. 163 self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); 164 } 165 166 // The number of datagrams in the events queue is limited to max_queued_datagrams. 167 // This function ensure this and deletes the oldest datagrams if needed. check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats)168 fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) { 169 let mut q = self.events.borrow_mut(); 170 let mut remove = None; 171 if q.iter() 172 .filter(|evt| matches!(evt, ConnectionEvent::Datagram(_))) 173 .count() 174 == max_queued_datagrams 175 { 176 if let Some(d) = q 177 .iter() 178 .rev() 179 .enumerate() 180 .filter(|(_, evt)| matches!(evt, ConnectionEvent::Datagram(_))) 181 .take(1) 182 .next() 183 { 184 remove = Some(d.0); 185 } 186 } 187 if let Some(r) = remove { 188 q.remove(r); 189 q.push_back(ConnectionEvent::IncomingDatagramDropped); 190 stats.incoming_datagram_dropped += 1; 191 } 192 } 193 add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats)194 pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) { 195 self.check_datagram_queued(max_queued_datagrams, stats); 196 self.events 197 .borrow_mut() 198 .push_back(ConnectionEvent::Datagram(data.to_vec())); 199 } 200 datagram_outcome( &self, dgram_tracker: &DatagramTracking, outcome: OutgoingDatagramOutcome, )201 pub fn datagram_outcome( 202 &self, 203 dgram_tracker: &DatagramTracking, 204 outcome: OutgoingDatagramOutcome, 205 ) { 206 if let DatagramTracking::Id(id) = dgram_tracker { 207 self.events 208 .borrow_mut() 209 .push_back(ConnectionEvent::OutgoingDatagramOutcome { id: *id, outcome }); 210 } 211 } 212 insert(&self, event: ConnectionEvent)213 fn insert(&self, event: ConnectionEvent) { 214 let mut q = self.events.borrow_mut(); 215 216 // Special-case two enums that are not strictly PartialEq equal but that 217 // we wish to avoid inserting duplicates. 218 let already_present = match &event { 219 ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| { 220 matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } 221 if *x == *stream_id) 222 }), 223 ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| { 224 matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. } 225 if *x == *stream_id) 226 }), 227 _ => q.contains(&event), 228 }; 229 if !already_present { 230 q.push_back(event); 231 } 232 } 233 remove<F>(&self, f: F) where F: Fn(&ConnectionEvent) -> bool,234 fn remove<F>(&self, f: F) 235 where 236 F: Fn(&ConnectionEvent) -> bool, 237 { 238 self.events.borrow_mut().retain(|evt| !f(evt)) 239 } 240 } 241 242 impl EventProvider for ConnectionEvents { 243 type Event = ConnectionEvent; 244 has_events(&self) -> bool245 fn has_events(&self) -> bool { 246 !self.events.borrow().is_empty() 247 } 248 next_event(&mut self) -> Option<Self::Event>249 fn next_event(&mut self) -> Option<Self::Event> { 250 self.events.borrow_mut().pop_front() 251 } 252 } 253 254 #[cfg(test)] 255 mod tests { 256 use super::*; 257 use crate::{ConnectionError, Error}; 258 259 #[test] event_culling()260 fn event_culling() { 261 let mut evts = ConnectionEvents::default(); 262 263 evts.client_0rtt_rejected(); 264 evts.client_0rtt_rejected(); 265 assert_eq!(evts.events().count(), 1); 266 assert_eq!(evts.events().count(), 0); 267 268 evts.new_stream(4.into()); 269 evts.new_stream(4.into()); 270 assert_eq!(evts.events().count(), 1); 271 272 evts.recv_stream_readable(6.into()); 273 evts.recv_stream_reset(6.into(), 66); 274 evts.recv_stream_reset(6.into(), 65); 275 assert_eq!(evts.events().count(), 1); 276 277 evts.send_stream_writable(8.into()); 278 evts.send_stream_writable(8.into()); 279 evts.send_stream_stop_sending(8.into(), 55); 280 evts.send_stream_stop_sending(8.into(), 56); 281 let events = evts.events().collect::<Vec<_>>(); 282 assert_eq!(events.len(), 1); 283 assert_eq!( 284 events[0], 285 ConnectionEvent::SendStreamStopSending { 286 stream_id: StreamId::new(8), 287 app_error: 55 288 } 289 ); 290 291 evts.send_stream_writable(8.into()); 292 evts.send_stream_writable(8.into()); 293 evts.send_stream_stop_sending(8.into(), 55); 294 evts.send_stream_stop_sending(8.into(), 56); 295 evts.send_stream_complete(8.into()); 296 assert_eq!(evts.events().count(), 1); 297 298 evts.send_stream_writable(8.into()); 299 evts.send_stream_writable(9.into()); 300 evts.send_stream_stop_sending(10.into(), 55); 301 evts.send_stream_stop_sending(11.into(), 56); 302 evts.send_stream_complete(12.into()); 303 assert_eq!(evts.events().count(), 5); 304 305 evts.send_stream_writable(8.into()); 306 evts.send_stream_writable(9.into()); 307 evts.send_stream_stop_sending(10.into(), 55); 308 evts.send_stream_stop_sending(11.into(), 56); 309 evts.send_stream_complete(12.into()); 310 evts.client_0rtt_rejected(); 311 assert_eq!(evts.events().count(), 1); 312 313 evts.send_stream_writable(9.into()); 314 evts.send_stream_stop_sending(10.into(), 55); 315 evts.connection_state_change(State::Closed(ConnectionError::Transport( 316 Error::StreamStateError, 317 ))); 318 assert_eq!(evts.events().count(), 1); 319 } 320 } 321