1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 // Collecting a list of events relevant to whoever is using the Connection.
8 
9 use std::cell::RefCell;
10 use std::collections::VecDeque;
11 use std::rc::Rc;
12 
13 use crate::connection::State;
14 use crate::quic_datagrams::DatagramTracking;
15 use crate::stream_id::{StreamId, StreamType};
16 use crate::{AppError, Stats};
17 use neqo_common::event::Provider as EventProvider;
18 use neqo_crypto::ResumptionToken;
19 
20 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
21 pub enum OutgoingDatagramOutcome {
22     DroppedTooBig,
23     DroppedQueueFull,
24     Lost,
25     Acked,
26 }
27 
28 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
29 pub enum ConnectionEvent {
30     /// Cert authentication needed
31     AuthenticationNeeded,
32     /// Encrypted client hello fallback occurred.  The certificate for the
33     /// public name needs to be authenticated.
34     EchFallbackAuthenticationNeeded {
35         public_name: String,
36     },
37     /// A new uni (read) or bidi stream has been opened by the peer.
38     NewStream {
39         stream_id: StreamId,
40     },
41     /// Space available in the buffer for an application write to succeed.
42     SendStreamWritable {
43         stream_id: StreamId,
44     },
45     /// New bytes available for reading.
46     RecvStreamReadable {
47         stream_id: StreamId,
48     },
49     /// Peer reset the stream.
50     RecvStreamReset {
51         stream_id: StreamId,
52         app_error: AppError,
53     },
54     /// Peer has sent STOP_SENDING
55     SendStreamStopSending {
56         stream_id: StreamId,
57         app_error: AppError,
58     },
59     /// Peer has acked everything sent on the stream.
60     SendStreamComplete {
61         stream_id: StreamId,
62     },
63     /// Peer increased MAX_STREAMS
64     SendStreamCreatable {
65         stream_type: StreamType,
66     },
67     /// Connection state change.
68     StateChange(State),
69     /// The server rejected 0-RTT.
70     /// This event invalidates all state in streams that has been created.
71     /// Any data written to streams needs to be written again.
72     ZeroRttRejected,
73     ResumptionToken(ResumptionToken),
74     Datagram(Vec<u8>),
75     OutgoingDatagramOutcome {
76         id: u64,
77         outcome: OutgoingDatagramOutcome,
78     },
79     IncomingDatagramDropped,
80 }
81 
82 #[derive(Debug, Default, Clone)]
83 #[allow(clippy::module_name_repetitions)]
84 pub struct ConnectionEvents {
85     events: Rc<RefCell<VecDeque<ConnectionEvent>>>,
86 }
87 
88 impl ConnectionEvents {
authentication_needed(&self)89     pub fn authentication_needed(&self) {
90         self.insert(ConnectionEvent::AuthenticationNeeded);
91     }
92 
ech_fallback_authentication_needed(&self, public_name: String)93     pub fn ech_fallback_authentication_needed(&self, public_name: String) {
94         self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name });
95     }
96 
new_stream(&self, stream_id: StreamId)97     pub fn new_stream(&self, stream_id: StreamId) {
98         self.insert(ConnectionEvent::NewStream { stream_id });
99     }
100 
recv_stream_readable(&self, stream_id: StreamId)101     pub fn recv_stream_readable(&self, stream_id: StreamId) {
102         self.insert(ConnectionEvent::RecvStreamReadable { stream_id });
103     }
104 
recv_stream_reset(&self, stream_id: StreamId, app_error: AppError)105     pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) {
106         // If reset, no longer readable.
107         self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
108 
109         self.insert(ConnectionEvent::RecvStreamReset {
110             stream_id,
111             app_error,
112         });
113     }
114 
send_stream_writable(&self, stream_id: StreamId)115     pub fn send_stream_writable(&self, stream_id: StreamId) {
116         self.insert(ConnectionEvent::SendStreamWritable { stream_id });
117     }
118 
send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError)119     pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) {
120         // If stopped, no longer writable.
121         self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id));
122 
123         self.insert(ConnectionEvent::SendStreamStopSending {
124             stream_id,
125             app_error,
126         });
127     }
128 
send_stream_complete(&self, stream_id: StreamId)129     pub fn send_stream_complete(&self, stream_id: StreamId) {
130         self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id));
131 
132         self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id.as_u64()));
133 
134         self.insert(ConnectionEvent::SendStreamComplete { stream_id });
135     }
136 
send_stream_creatable(&self, stream_type: StreamType)137     pub fn send_stream_creatable(&self, stream_type: StreamType) {
138         self.insert(ConnectionEvent::SendStreamCreatable { stream_type });
139     }
140 
connection_state_change(&self, state: State)141     pub fn connection_state_change(&self, state: State) {
142         // If closing, existing events no longer relevant.
143         match state {
144             State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(),
145             _ => (),
146         }
147         self.insert(ConnectionEvent::StateChange(state));
148     }
149 
client_resumption_token(&self, token: ResumptionToken)150     pub fn client_resumption_token(&self, token: ResumptionToken) {
151         self.insert(ConnectionEvent::ResumptionToken(token));
152     }
153 
client_0rtt_rejected(&self)154     pub fn client_0rtt_rejected(&self) {
155         // If 0rtt rejected, must start over and existing events are no longer
156         // relevant.
157         self.events.borrow_mut().clear();
158         self.insert(ConnectionEvent::ZeroRttRejected);
159     }
160 
recv_stream_complete(&self, stream_id: StreamId)161     pub fn recv_stream_complete(&self, stream_id: StreamId) {
162         // If stopped, no longer readable.
163         self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
164     }
165 
166     // The number of datagrams in the events queue is limited to max_queued_datagrams.
167     // This function ensure this and deletes the oldest datagrams if needed.
check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats)168     fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) {
169         let mut q = self.events.borrow_mut();
170         let mut remove = None;
171         if q.iter()
172             .filter(|evt| matches!(evt, ConnectionEvent::Datagram(_)))
173             .count()
174             == max_queued_datagrams
175         {
176             if let Some(d) = q
177                 .iter()
178                 .rev()
179                 .enumerate()
180                 .filter(|(_, evt)| matches!(evt, ConnectionEvent::Datagram(_)))
181                 .take(1)
182                 .next()
183             {
184                 remove = Some(d.0);
185             }
186         }
187         if let Some(r) = remove {
188             q.remove(r);
189             q.push_back(ConnectionEvent::IncomingDatagramDropped);
190             stats.incoming_datagram_dropped += 1;
191         }
192     }
193 
add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats)194     pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) {
195         self.check_datagram_queued(max_queued_datagrams, stats);
196         self.events
197             .borrow_mut()
198             .push_back(ConnectionEvent::Datagram(data.to_vec()));
199     }
200 
datagram_outcome( &self, dgram_tracker: &DatagramTracking, outcome: OutgoingDatagramOutcome, )201     pub fn datagram_outcome(
202         &self,
203         dgram_tracker: &DatagramTracking,
204         outcome: OutgoingDatagramOutcome,
205     ) {
206         if let DatagramTracking::Id(id) = dgram_tracker {
207             self.events
208                 .borrow_mut()
209                 .push_back(ConnectionEvent::OutgoingDatagramOutcome { id: *id, outcome });
210         }
211     }
212 
insert(&self, event: ConnectionEvent)213     fn insert(&self, event: ConnectionEvent) {
214         let mut q = self.events.borrow_mut();
215 
216         // Special-case two enums that are not strictly PartialEq equal but that
217         // we wish to avoid inserting duplicates.
218         let already_present = match &event {
219             ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| {
220                 matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. }
221 		                    if *x == *stream_id)
222             }),
223             ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| {
224                 matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. }
225 		                    if *x == *stream_id)
226             }),
227             _ => q.contains(&event),
228         };
229         if !already_present {
230             q.push_back(event);
231         }
232     }
233 
remove<F>(&self, f: F) where F: Fn(&ConnectionEvent) -> bool,234     fn remove<F>(&self, f: F)
235     where
236         F: Fn(&ConnectionEvent) -> bool,
237     {
238         self.events.borrow_mut().retain(|evt| !f(evt))
239     }
240 }
241 
242 impl EventProvider for ConnectionEvents {
243     type Event = ConnectionEvent;
244 
has_events(&self) -> bool245     fn has_events(&self) -> bool {
246         !self.events.borrow().is_empty()
247     }
248 
next_event(&mut self) -> Option<Self::Event>249     fn next_event(&mut self) -> Option<Self::Event> {
250         self.events.borrow_mut().pop_front()
251     }
252 }
253 
254 #[cfg(test)]
255 mod tests {
256     use super::*;
257     use crate::{ConnectionError, Error};
258 
259     #[test]
event_culling()260     fn event_culling() {
261         let mut evts = ConnectionEvents::default();
262 
263         evts.client_0rtt_rejected();
264         evts.client_0rtt_rejected();
265         assert_eq!(evts.events().count(), 1);
266         assert_eq!(evts.events().count(), 0);
267 
268         evts.new_stream(4.into());
269         evts.new_stream(4.into());
270         assert_eq!(evts.events().count(), 1);
271 
272         evts.recv_stream_readable(6.into());
273         evts.recv_stream_reset(6.into(), 66);
274         evts.recv_stream_reset(6.into(), 65);
275         assert_eq!(evts.events().count(), 1);
276 
277         evts.send_stream_writable(8.into());
278         evts.send_stream_writable(8.into());
279         evts.send_stream_stop_sending(8.into(), 55);
280         evts.send_stream_stop_sending(8.into(), 56);
281         let events = evts.events().collect::<Vec<_>>();
282         assert_eq!(events.len(), 1);
283         assert_eq!(
284             events[0],
285             ConnectionEvent::SendStreamStopSending {
286                 stream_id: StreamId::new(8),
287                 app_error: 55
288             }
289         );
290 
291         evts.send_stream_writable(8.into());
292         evts.send_stream_writable(8.into());
293         evts.send_stream_stop_sending(8.into(), 55);
294         evts.send_stream_stop_sending(8.into(), 56);
295         evts.send_stream_complete(8.into());
296         assert_eq!(evts.events().count(), 1);
297 
298         evts.send_stream_writable(8.into());
299         evts.send_stream_writable(9.into());
300         evts.send_stream_stop_sending(10.into(), 55);
301         evts.send_stream_stop_sending(11.into(), 56);
302         evts.send_stream_complete(12.into());
303         assert_eq!(evts.events().count(), 5);
304 
305         evts.send_stream_writable(8.into());
306         evts.send_stream_writable(9.into());
307         evts.send_stream_stop_sending(10.into(), 55);
308         evts.send_stream_stop_sending(11.into(), 56);
309         evts.send_stream_complete(12.into());
310         evts.client_0rtt_rejected();
311         assert_eq!(evts.events().count(), 1);
312 
313         evts.send_stream_writable(9.into());
314         evts.send_stream_stop_sending(10.into(), 55);
315         evts.connection_state_change(State::Closed(ConnectionError::Transport(
316             Error::StreamStateError,
317         )));
318         assert_eq!(evts.events().count(), 1);
319     }
320 }
321