1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Collecting a list of events relevant to whoever is using the Connection. 8 9 use std::cell::RefCell; 10 use std::collections::VecDeque; 11 use std::rc::Rc; 12 13 use crate::connection::State; 14 use crate::stream_id::{StreamId, StreamType}; 15 use crate::AppError; 16 use neqo_common::event::Provider as EventProvider; 17 use neqo_crypto::ResumptionToken; 18 19 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] 20 pub enum ConnectionEvent { 21 /// Cert authentication needed 22 AuthenticationNeeded, 23 /// Encrypted client hello fallback occurred. The certificate for the 24 /// public name needs to be authenticated. 25 EchFallbackAuthenticationNeeded { 26 public_name: String, 27 }, 28 /// A new uni (read) or bidi stream has been opened by the peer. 29 NewStream { 30 stream_id: StreamId, 31 }, 32 /// Space available in the buffer for an application write to succeed. 33 SendStreamWritable { 34 stream_id: StreamId, 35 }, 36 /// New bytes available for reading. 37 RecvStreamReadable { 38 stream_id: u64, 39 }, 40 /// Peer reset the stream. 41 RecvStreamReset { 42 stream_id: u64, 43 app_error: AppError, 44 }, 45 /// Peer has sent STOP_SENDING 46 SendStreamStopSending { 47 stream_id: u64, 48 app_error: AppError, 49 }, 50 /// Peer has acked everything sent on the stream. 51 SendStreamComplete { 52 stream_id: u64, 53 }, 54 /// Peer increased MAX_STREAMS 55 SendStreamCreatable { 56 stream_type: StreamType, 57 }, 58 /// Connection state change. 59 StateChange(State), 60 /// The server rejected 0-RTT. 61 /// This event invalidates all state in streams that has been created. 62 /// Any data written to streams needs to be written again. 63 ZeroRttRejected, 64 ResumptionToken(ResumptionToken), 65 } 66 67 #[derive(Debug, Default, Clone)] 68 #[allow(clippy::module_name_repetitions)] 69 pub struct ConnectionEvents { 70 events: Rc<RefCell<VecDeque<ConnectionEvent>>>, 71 } 72 73 impl ConnectionEvents { authentication_needed(&self)74 pub fn authentication_needed(&self) { 75 self.insert(ConnectionEvent::AuthenticationNeeded); 76 } 77 ech_fallback_authentication_needed(&self, public_name: String)78 pub fn ech_fallback_authentication_needed(&self, public_name: String) { 79 self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name }); 80 } 81 new_stream(&self, stream_id: StreamId)82 pub fn new_stream(&self, stream_id: StreamId) { 83 self.insert(ConnectionEvent::NewStream { stream_id }); 84 } 85 recv_stream_readable(&self, stream_id: StreamId)86 pub fn recv_stream_readable(&self, stream_id: StreamId) { 87 self.insert(ConnectionEvent::RecvStreamReadable { 88 stream_id: stream_id.as_u64(), 89 }); 90 } 91 recv_stream_reset(&self, stream_id: StreamId, app_error: AppError)92 pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) { 93 // If reset, no longer readable. 94 self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); 95 96 self.insert(ConnectionEvent::RecvStreamReset { 97 stream_id: stream_id.as_u64(), 98 app_error, 99 }); 100 } 101 send_stream_writable(&self, stream_id: StreamId)102 pub fn send_stream_writable(&self, stream_id: StreamId) { 103 self.insert(ConnectionEvent::SendStreamWritable { stream_id }); 104 } 105 send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError)106 pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) { 107 // If stopped, no longer writable. 108 self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id.as_u64())); 109 110 self.insert(ConnectionEvent::SendStreamStopSending { 111 stream_id: stream_id.as_u64(), 112 app_error, 113 }); 114 } 115 send_stream_complete(&self, stream_id: StreamId)116 pub fn send_stream_complete(&self, stream_id: StreamId) { 117 self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id)); 118 119 self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id.as_u64())); 120 121 self.insert(ConnectionEvent::SendStreamComplete { 122 stream_id: stream_id.as_u64(), 123 }); 124 } 125 send_stream_creatable(&self, stream_type: StreamType)126 pub fn send_stream_creatable(&self, stream_type: StreamType) { 127 self.insert(ConnectionEvent::SendStreamCreatable { stream_type }); 128 } 129 connection_state_change(&self, state: State)130 pub fn connection_state_change(&self, state: State) { 131 // If closing, existing events no longer relevant. 132 match state { 133 State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(), 134 _ => (), 135 } 136 self.insert(ConnectionEvent::StateChange(state)); 137 } 138 client_resumption_token(&self, token: ResumptionToken)139 pub fn client_resumption_token(&self, token: ResumptionToken) { 140 self.insert(ConnectionEvent::ResumptionToken(token)); 141 } 142 client_0rtt_rejected(&self)143 pub fn client_0rtt_rejected(&self) { 144 // If 0rtt rejected, must start over and existing events are no longer 145 // relevant. 146 self.events.borrow_mut().clear(); 147 self.insert(ConnectionEvent::ZeroRttRejected); 148 } 149 recv_stream_complete(&self, stream_id: StreamId)150 pub fn recv_stream_complete(&self, stream_id: StreamId) { 151 // If stopped, no longer readable. 152 self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); 153 } 154 insert(&self, event: ConnectionEvent)155 fn insert(&self, event: ConnectionEvent) { 156 let mut q = self.events.borrow_mut(); 157 158 // Special-case two enums that are not strictly PartialEq equal but that 159 // we wish to avoid inserting duplicates. 160 let already_present = match &event { 161 ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| { 162 matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } 163 if *x == *stream_id) 164 }), 165 ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| { 166 matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. } 167 if *x == *stream_id) 168 }), 169 _ => q.contains(&event), 170 }; 171 if !already_present { 172 q.push_back(event); 173 } 174 } 175 remove<F>(&self, f: F) where F: Fn(&ConnectionEvent) -> bool,176 fn remove<F>(&self, f: F) 177 where 178 F: Fn(&ConnectionEvent) -> bool, 179 { 180 self.events.borrow_mut().retain(|evt| !f(evt)) 181 } 182 } 183 184 impl EventProvider for ConnectionEvents { 185 type Event = ConnectionEvent; 186 has_events(&self) -> bool187 fn has_events(&self) -> bool { 188 !self.events.borrow().is_empty() 189 } 190 next_event(&mut self) -> Option<Self::Event>191 fn next_event(&mut self) -> Option<Self::Event> { 192 self.events.borrow_mut().pop_front() 193 } 194 } 195 196 #[cfg(test)] 197 mod tests { 198 use super::*; 199 use crate::{ConnectionError, Error}; 200 201 #[test] event_culling()202 fn event_culling() { 203 let mut evts = ConnectionEvents::default(); 204 205 evts.client_0rtt_rejected(); 206 evts.client_0rtt_rejected(); 207 assert_eq!(evts.events().count(), 1); 208 assert_eq!(evts.events().count(), 0); 209 210 evts.new_stream(4.into()); 211 evts.new_stream(4.into()); 212 assert_eq!(evts.events().count(), 1); 213 214 evts.recv_stream_readable(6.into()); 215 evts.recv_stream_reset(6.into(), 66); 216 evts.recv_stream_reset(6.into(), 65); 217 assert_eq!(evts.events().count(), 1); 218 219 evts.send_stream_writable(8.into()); 220 evts.send_stream_writable(8.into()); 221 evts.send_stream_stop_sending(8.into(), 55); 222 evts.send_stream_stop_sending(8.into(), 56); 223 let events = evts.events().collect::<Vec<_>>(); 224 assert_eq!(events.len(), 1); 225 assert_eq!( 226 events[0], 227 ConnectionEvent::SendStreamStopSending { 228 stream_id: 8, 229 app_error: 55 230 } 231 ); 232 233 evts.send_stream_writable(8.into()); 234 evts.send_stream_writable(8.into()); 235 evts.send_stream_stop_sending(8.into(), 55); 236 evts.send_stream_stop_sending(8.into(), 56); 237 evts.send_stream_complete(8.into()); 238 assert_eq!(evts.events().count(), 1); 239 240 evts.send_stream_writable(8.into()); 241 evts.send_stream_writable(9.into()); 242 evts.send_stream_stop_sending(10.into(), 55); 243 evts.send_stream_stop_sending(11.into(), 56); 244 evts.send_stream_complete(12.into()); 245 assert_eq!(evts.events().count(), 5); 246 247 evts.send_stream_writable(8.into()); 248 evts.send_stream_writable(9.into()); 249 evts.send_stream_stop_sending(10.into(), 55); 250 evts.send_stream_stop_sending(11.into(), 56); 251 evts.send_stream_complete(12.into()); 252 evts.client_0rtt_rejected(); 253 assert_eq!(evts.events().count(), 1); 254 255 evts.send_stream_writable(9.into()); 256 evts.send_stream_stop_sending(10.into(), 55); 257 evts.connection_state_change(State::Closed(ConnectionError::Transport( 258 Error::StreamStateError, 259 ))); 260 assert_eq!(evts.events().count(), 1); 261 } 262 } 263