1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 // Collecting a list of events relevant to whoever is using the Connection.
8 
9 use std::cell::RefCell;
10 use std::collections::VecDeque;
11 use std::rc::Rc;
12 
13 use crate::connection::State;
14 use crate::stream_id::{StreamId, StreamType};
15 use crate::AppError;
16 use neqo_common::event::Provider as EventProvider;
17 use neqo_crypto::ResumptionToken;
18 
/// An event queued for whoever is using the connection.
///
/// Variant order matters to the derived `PartialOrd`/`Ord` implementations,
/// so new variants should not be inserted casually in the middle.
///
/// Note that `NewStream` and `SendStreamWritable` carry a [`StreamId`], while
/// the remaining per-stream variants carry the raw `u64` stream id.
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum ConnectionEvent {
    /// Certificate authentication is needed.
    AuthenticationNeeded,
    /// Encrypted client hello fallback occurred.  The certificate for the
    /// public name needs to be authenticated.
    EchFallbackAuthenticationNeeded {
        public_name: String,
    },
    /// A new uni (read) or bidi stream has been opened by the peer.
    NewStream {
        stream_id: StreamId,
    },
    /// Space available in the buffer for an application write to succeed.
    SendStreamWritable {
        stream_id: StreamId,
    },
    /// New bytes available for reading.
    RecvStreamReadable {
        stream_id: u64,
    },
    /// Peer reset the stream.
    RecvStreamReset {
        stream_id: u64,
        app_error: AppError,
    },
    /// Peer has sent STOP_SENDING.
    SendStreamStopSending {
        stream_id: u64,
        app_error: AppError,
    },
    /// Peer has acked everything sent on the stream.
    SendStreamComplete {
        stream_id: u64,
    },
    /// Peer increased MAX_STREAMS for streams of `stream_type`.
    SendStreamCreatable {
        stream_type: StreamType,
    },
    /// Connection state change.
    StateChange(State),
    /// The server rejected 0-RTT.
    /// This event invalidates all state in streams that has been created.
    /// Any data written to streams needs to be written again.
    ZeroRttRejected,
    /// A resumption token is available; queued by
    /// `ConnectionEvents::client_resumption_token`.
    ResumptionToken(ResumptionToken),
}
66 
/// A queue of pending [`ConnectionEvent`]s.
///
/// The queue is held behind `Rc<RefCell<..>>`, so the derived `Clone`
/// produces another handle onto the same queue rather than a copy of the
/// events, and events can be queued through a shared reference.
#[derive(Debug, Default, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct ConnectionEvents {
    /// Pending events, oldest at the front.
    events: Rc<RefCell<VecDeque<ConnectionEvent>>>,
}
72 
73 impl ConnectionEvents {
authentication_needed(&self)74     pub fn authentication_needed(&self) {
75         self.insert(ConnectionEvent::AuthenticationNeeded);
76     }
77 
ech_fallback_authentication_needed(&self, public_name: String)78     pub fn ech_fallback_authentication_needed(&self, public_name: String) {
79         self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name });
80     }
81 
new_stream(&self, stream_id: StreamId)82     pub fn new_stream(&self, stream_id: StreamId) {
83         self.insert(ConnectionEvent::NewStream { stream_id });
84     }
85 
recv_stream_readable(&self, stream_id: StreamId)86     pub fn recv_stream_readable(&self, stream_id: StreamId) {
87         self.insert(ConnectionEvent::RecvStreamReadable {
88             stream_id: stream_id.as_u64(),
89         });
90     }
91 
recv_stream_reset(&self, stream_id: StreamId, app_error: AppError)92     pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) {
93         // If reset, no longer readable.
94         self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
95 
96         self.insert(ConnectionEvent::RecvStreamReset {
97             stream_id: stream_id.as_u64(),
98             app_error,
99         });
100     }
101 
send_stream_writable(&self, stream_id: StreamId)102     pub fn send_stream_writable(&self, stream_id: StreamId) {
103         self.insert(ConnectionEvent::SendStreamWritable { stream_id });
104     }
105 
send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError)106     pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) {
107         // If stopped, no longer writable.
108         self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id.as_u64()));
109 
110         self.insert(ConnectionEvent::SendStreamStopSending {
111             stream_id: stream_id.as_u64(),
112             app_error,
113         });
114     }
115 
send_stream_complete(&self, stream_id: StreamId)116     pub fn send_stream_complete(&self, stream_id: StreamId) {
117         self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id));
118 
119         self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id.as_u64()));
120 
121         self.insert(ConnectionEvent::SendStreamComplete {
122             stream_id: stream_id.as_u64(),
123         });
124     }
125 
send_stream_creatable(&self, stream_type: StreamType)126     pub fn send_stream_creatable(&self, stream_type: StreamType) {
127         self.insert(ConnectionEvent::SendStreamCreatable { stream_type });
128     }
129 
connection_state_change(&self, state: State)130     pub fn connection_state_change(&self, state: State) {
131         // If closing, existing events no longer relevant.
132         match state {
133             State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(),
134             _ => (),
135         }
136         self.insert(ConnectionEvent::StateChange(state));
137     }
138 
client_resumption_token(&self, token: ResumptionToken)139     pub fn client_resumption_token(&self, token: ResumptionToken) {
140         self.insert(ConnectionEvent::ResumptionToken(token));
141     }
142 
client_0rtt_rejected(&self)143     pub fn client_0rtt_rejected(&self) {
144         // If 0rtt rejected, must start over and existing events are no longer
145         // relevant.
146         self.events.borrow_mut().clear();
147         self.insert(ConnectionEvent::ZeroRttRejected);
148     }
149 
recv_stream_complete(&self, stream_id: StreamId)150     pub fn recv_stream_complete(&self, stream_id: StreamId) {
151         // If stopped, no longer readable.
152         self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64()));
153     }
154 
insert(&self, event: ConnectionEvent)155     fn insert(&self, event: ConnectionEvent) {
156         let mut q = self.events.borrow_mut();
157 
158         // Special-case two enums that are not strictly PartialEq equal but that
159         // we wish to avoid inserting duplicates.
160         let already_present = match &event {
161             ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| {
162                 matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. }
163 		                    if *x == *stream_id)
164             }),
165             ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| {
166                 matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. }
167 		                    if *x == *stream_id)
168             }),
169             _ => q.contains(&event),
170         };
171         if !already_present {
172             q.push_back(event);
173         }
174     }
175 
remove<F>(&self, f: F) where F: Fn(&ConnectionEvent) -> bool,176     fn remove<F>(&self, f: F)
177     where
178         F: Fn(&ConnectionEvent) -> bool,
179     {
180         self.events.borrow_mut().retain(|evt| !f(evt))
181     }
182 }
183 
184 impl EventProvider for ConnectionEvents {
185     type Event = ConnectionEvent;
186 
has_events(&self) -> bool187     fn has_events(&self) -> bool {
188         !self.events.borrow().is_empty()
189     }
190 
next_event(&mut self) -> Option<Self::Event>191     fn next_event(&mut self) -> Option<Self::Event> {
192         self.events.borrow_mut().pop_front()
193     }
194 }
195 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ConnectionError, Error};

    /// Drains the queue and reports how many events were pending.
    fn drain_count(evts: &mut ConnectionEvents) -> usize {
        evts.events().count()
    }

    #[test]
    fn event_culling() {
        let mut evts = ConnectionEvents::default();

        // A repeated 0-RTT rejection is reported once, and draining leaves
        // the queue empty.
        evts.client_0rtt_rejected();
        evts.client_0rtt_rejected();
        assert_eq!(drain_count(&mut evts), 1);
        assert_eq!(drain_count(&mut evts), 0);

        // Identical events are not queued twice.
        evts.new_stream(4.into());
        evts.new_stream(4.into());
        assert_eq!(drain_count(&mut evts), 1);

        // A reset replaces the readable event, and a second reset for the
        // same stream is dropped even though its error code differs.
        evts.recv_stream_readable(6.into());
        evts.recv_stream_reset(6.into(), 66);
        evts.recv_stream_reset(6.into(), 65);
        assert_eq!(drain_count(&mut evts), 1);

        // STOP_SENDING replaces the writable event, and the first error code
        // is the one that survives.
        evts.send_stream_writable(8.into());
        evts.send_stream_writable(8.into());
        evts.send_stream_stop_sending(8.into(), 55);
        evts.send_stream_stop_sending(8.into(), 56);
        let pending: Vec<_> = evts.events().collect();
        assert_eq!(
            pending,
            vec![ConnectionEvent::SendStreamStopSending {
                stream_id: 8,
                app_error: 55
            }]
        );

        // Completion replaces both the writable and the STOP_SENDING events.
        evts.send_stream_writable(8.into());
        evts.send_stream_writable(8.into());
        evts.send_stream_stop_sending(8.into(), 55);
        evts.send_stream_stop_sending(8.into(), 56);
        evts.send_stream_complete(8.into());
        assert_eq!(drain_count(&mut evts), 1);

        // Events for unrelated streams do not cull one another.
        evts.send_stream_writable(8.into());
        evts.send_stream_writable(9.into());
        evts.send_stream_stop_sending(10.into(), 55);
        evts.send_stream_stop_sending(11.into(), 56);
        evts.send_stream_complete(12.into());
        assert_eq!(drain_count(&mut evts), 5);

        // A 0-RTT rejection wipes everything queued before it.
        evts.send_stream_writable(8.into());
        evts.send_stream_writable(9.into());
        evts.send_stream_stop_sending(10.into(), 55);
        evts.send_stream_stop_sending(11.into(), 56);
        evts.send_stream_complete(12.into());
        evts.client_0rtt_rejected();
        assert_eq!(drain_count(&mut evts), 1);

        // So does a transition to the closed state.
        evts.send_stream_writable(9.into());
        evts.send_stream_stop_sending(10.into(), 55);
        let closed = State::Closed(ConnectionError::Transport(Error::StreamStateError));
        evts.connection_state_change(closed);
        assert_eq!(drain_count(&mut evts), 1);
    }
}
263