1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 // This file implements a server that can handle multiple connections.
8 
9 use neqo_common::{
10     hex, matches, qerror, qinfo, qtrace, qwarn, timer::Timer, Datagram, Decoder, Encoder,
11 };
12 use neqo_crypto::{
13     constants::{TLS_AES_128_GCM_SHA256, TLS_VERSION_1_3},
14     selfencrypt::SelfEncrypt,
15     AntiReplay,
16 };
17 
18 use crate::cid::{ConnectionId, ConnectionIdDecoder, ConnectionIdManager, ConnectionIdRef};
19 use crate::connection::{Connection, Output, State};
20 use crate::packet::{PacketBuilder, PacketType, PublicPacket};
21 use crate::Res;
22 
23 use std::cell::RefCell;
24 use std::collections::{HashMap, HashSet, VecDeque};
25 use std::convert::TryFrom;
26 use std::mem;
27 use std::net::{IpAddr, SocketAddr};
28 use std::ops::{Deref, DerefMut};
29 use std::rc::Rc;
30 use std::time::{Duration, Instant};
31 
/// The possible decisions for an incoming Initial packet.
///
/// NOTE(review): nothing in this file constructs or matches on this type, so
/// the variant descriptions below are inferred from their names — confirm
/// against the callers that use it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InitialResult {
    /// Presumably: accept the connection attempt.
    Accept,
    /// Presumably: silently discard the packet.
    Drop,
    /// Presumably: answer with a Retry; the payload looks like the token bytes
    /// to send — TODO confirm.
    Retry(Vec<u8>),
}
37 
/// MIN_INITIAL_PACKET_SIZE is the smallest packet that can be used to establish
/// a new connection across all QUIC versions this server supports.
/// `Server::process_input` drops any long-header datagram shorter than this
/// when it does not match an existing connection.
const MIN_INITIAL_PACKET_SIZE: usize = 1200;
/// Granularity argument given to `Timer::new` for the server's connection timers.
/// NOTE(review): presumably the resolution at which timers are grouped —
/// confirm against `neqo_common::timer::Timer`.
const TIMER_GRANULARITY: Duration = Duration::from_millis(10);
/// Capacity argument given to `Timer::new` for the server's connection timers.
/// NOTE(review): presumably the number of timer slots — confirm against
/// `neqo_common::timer::Timer`.
const TIMER_CAPACITY: usize = 16384;
43 
/// Shared, interior-mutable handle to the server-side state of one connection.
type StateRef = Rc<RefCell<ServerConnectionState>>;
/// Shared, interior-mutable handle to a connection ID manager trait object.
type CidMgr = Rc<RefCell<dyn ConnectionIdManager>>;
/// Shared, interior-mutable table that maps each connection ID to the
/// connection that owns it.
type ConnectionTableRef = Rc<RefCell<HashMap<ConnectionId, StateRef>>>;
47 
/// A `Connection` together with the bookkeeping the server keeps for it.
#[derive(Debug)]
pub struct ServerConnectionState {
    /// The wrapped connection.
    c: Connection,
    /// The instant recorded for this connection's timer. It starts as the
    /// connection's creation time and is overwritten each time a new timer is
    /// registered; `Server::remove_timer` uses it to find the timer entry.
    last_timer: Instant,
}
53 
54 impl Deref for ServerConnectionState {
55     type Target = Connection;
deref(&self) -> &Self::Target56     fn deref(&self) -> &Self::Target {
57         &self.c
58     }
59 }
60 
61 impl DerefMut for ServerConnectionState {
deref_mut(&mut self) -> &mut Self::Target62     fn deref_mut(&mut self) -> &mut Self::Target {
63         &mut self.c
64     }
65 }
66 
/// The outcome of `RetryToken::validate` for the token in an Initial packet.
enum RetryTokenResult {
    /// The token was empty and Retry is not required; the connection is
    /// accepted without an original connection ID.
    Pass,
    /// The token decrypted successfully and has not expired; carries the
    /// connection ID that was stored inside the token.
    Valid(ConnectionId),
    /// The token was empty and Retry is required; the server answers with a
    /// Retry packet.
    Validate,
    /// A non-empty token could not be decrypted or has expired; the packet is
    /// dropped.
    Invalid,
}
73 
/// State used to create and check the tokens carried in Retry packets.
struct RetryToken {
    /// Whether to send a Retry.
    require_retry: bool,
    /// A self-encryption object used for protecting Retry tokens.
    self_encrypt: SelfEncrypt,
    /// When this object was created.
    /// Token expiry times are encoded as milliseconds relative to this instant.
    start_time: Instant,
}
82 
83 impl RetryToken {
new(now: Instant) -> Res<Self>84     fn new(now: Instant) -> Res<Self> {
85         Ok(Self {
86             require_retry: false,
87             self_encrypt: SelfEncrypt::new(TLS_VERSION_1_3, TLS_AES_128_GCM_SHA256)?,
88             start_time: now,
89         })
90     }
91 
encode_peer_address(peer_address: SocketAddr) -> Vec<u8>92     fn encode_peer_address(peer_address: SocketAddr) -> Vec<u8> {
93         // Let's be "clever" by putting the peer's address in the AAD.
94         // We don't need to encode these into the token as they should be
95         // available when we need to check the token.
96         let mut encoded_address = Encoder::default();
97         match peer_address.ip() {
98             IpAddr::V4(a) => {
99                 encoded_address.encode_byte(4);
100                 encoded_address.encode(&a.octets());
101             }
102             IpAddr::V6(a) => {
103                 encoded_address.encode_byte(6);
104                 encoded_address.encode(&a.octets());
105             }
106         }
107         encoded_address.encode_uint(2, peer_address.port());
108         encoded_address.into()
109     }
110 
111     /// This generates a token for use with Retry.
generate_token( &mut self, dcid: &ConnectionId, peer_address: SocketAddr, now: Instant, ) -> Res<Vec<u8>>112     pub fn generate_token(
113         &mut self,
114         dcid: &ConnectionId,
115         peer_address: SocketAddr,
116         now: Instant,
117     ) -> Res<Vec<u8>> {
118         // TODO(mt) rotate keys on a fixed schedule.
119         let mut token = Encoder::default();
120         const EXPIRATION: Duration = Duration::from_secs(5);
121         let end = now + EXPIRATION;
122         let end_millis = u32::try_from(end.duration_since(self.start_time).as_millis())?;
123         token.encode_uint(4, end_millis);
124         token.encode(dcid);
125         let peer_addr = Self::encode_peer_address(peer_address);
126         Ok(self.self_encrypt.seal(&peer_addr, &token)?)
127     }
128 
set_retry_required(&mut self, retry: bool)129     pub fn set_retry_required(&mut self, retry: bool) {
130         self.require_retry = retry;
131     }
132 
133     /// Decrypts `token` and returns the connection Id it contains.
134     /// Returns `None` if the date is invalid in any way (such as it being expired or garbled).
decrypt_token( &self, token: &[u8], peer_address: SocketAddr, now: Instant, ) -> Option<ConnectionId>135     fn decrypt_token(
136         &self,
137         token: &[u8],
138         peer_address: SocketAddr,
139         now: Instant,
140     ) -> Option<ConnectionId> {
141         let peer_addr = Self::encode_peer_address(peer_address);
142         let data = if let Ok(d) = self.self_encrypt.open(&peer_addr, token) {
143             d
144         } else {
145             return None;
146         };
147         let mut dec = Decoder::new(&data);
148         match dec.decode_uint(4) {
149             Some(d) => {
150                 let end = self.start_time + Duration::from_millis(d);
151                 if end < now {
152                     return None;
153                 }
154             }
155             _ => return None,
156         }
157         Some(ConnectionId::from(dec.decode_remainder()))
158     }
159 
validate( &self, token: &[u8], peer_address: SocketAddr, now: Instant, ) -> RetryTokenResult160     pub fn validate(
161         &self,
162         token: &[u8],
163         peer_address: SocketAddr,
164         now: Instant,
165     ) -> RetryTokenResult {
166         if token.is_empty() {
167             if self.require_retry {
168                 RetryTokenResult::Validate
169             } else {
170                 RetryTokenResult::Pass
171             }
172         } else if let Some(cid) = self.decrypt_token(token, peer_address, now) {
173             RetryTokenResult::Valid(cid)
174         } else {
175             RetryTokenResult::Invalid
176         }
177     }
178 }
179 
/// A server that can handle multiple connections.
pub struct Server {
    /// The names of certificates.
    certs: Vec<String>,
    /// The ALPN values that the server supports.
    protocols: Vec<String>,
    /// The anti-replay context that is passed to every new server connection.
    anti_replay: AntiReplay,
    /// A connection ID manager.
    cid_manager: CidMgr,
    /// All connections, keyed by ConnectionId.
    connections: ConnectionTableRef,
    /// The connections that have new events.
    active: HashSet<ActiveConnectionRef>,
    /// The set of connections that need immediate processing.
    waiting: VecDeque<StateRef>,
    /// Outstanding timers for connections.
    timers: Timer<StateRef>,
    /// Retry state: whether a Retry packet will be sent in response to new
    /// Initial packets, and the means to create and check Retry tokens.
    retry: RetryToken,
}
200 
201 impl Server {
202     /// Construct a new server.
203     /// `now` is the time that the server is instantiated.
204     /// `cid_manager` is responsible for generating connection IDs and parsing them;
205     /// connection IDs produced by the manager cannot be zero-length.
206     /// `certs` is a list of the certificates that should be configured.
207     /// `protocols` is the preference list of ALPN values.
208     /// `anti_replay` is an anti-replay context.
new( now: Instant, certs: &[impl AsRef<str>], protocols: &[impl AsRef<str>], anti_replay: AntiReplay, cid_manager: CidMgr, ) -> Res<Self>209     pub fn new(
210         now: Instant,
211         certs: &[impl AsRef<str>],
212         protocols: &[impl AsRef<str>],
213         anti_replay: AntiReplay,
214         cid_manager: CidMgr,
215     ) -> Res<Self> {
216         Ok(Self {
217             certs: certs.iter().map(|x| String::from(x.as_ref())).collect(),
218             protocols: protocols.iter().map(|x| String::from(x.as_ref())).collect(),
219             anti_replay,
220             cid_manager,
221             connections: Rc::default(),
222             active: HashSet::default(),
223             waiting: VecDeque::default(),
224             timers: Timer::new(now, TIMER_GRANULARITY, TIMER_CAPACITY),
225             retry: RetryToken::new(now)?,
226         })
227     }
228 
set_retry_required(&mut self, require_retry: bool)229     pub fn set_retry_required(&mut self, require_retry: bool) {
230         self.retry.set_retry_required(require_retry);
231     }
232 
remove_timer(&mut self, c: &StateRef)233     fn remove_timer(&mut self, c: &StateRef) {
234         let last = c.borrow().last_timer;
235         self.timers.remove(last, |t| Rc::ptr_eq(t, c));
236     }
237 
process_connection( &mut self, c: StateRef, dgram: Option<Datagram>, now: Instant, ) -> Option<Datagram>238     fn process_connection(
239         &mut self,
240         c: StateRef,
241         dgram: Option<Datagram>,
242         now: Instant,
243     ) -> Option<Datagram> {
244         qtrace!([self], "Process connection {:?}", c);
245         let out = c.borrow_mut().process(dgram, now);
246         match out {
247             Output::Datagram(_) => {
248                 qtrace!([self], "Sending packet, added to waiting connections");
249                 self.waiting.push_back(c.clone());
250             }
251             Output::Callback(delay) => {
252                 let next = now + delay;
253                 if next != c.borrow().last_timer {
254                     qtrace!([self], "Change timer to {:?}", next);
255                     self.remove_timer(&c);
256                     c.borrow_mut().last_timer = next;
257                     self.timers.add(next, c.clone());
258                 }
259             }
260             _ => {
261                 self.remove_timer(&c);
262             }
263         }
264         if c.borrow().has_events() {
265             qtrace!([self], "Connection active: {:?}", c);
266             self.active.insert(ActiveConnectionRef { c: c.clone() });
267         }
268         if matches!(c.borrow().state(), State::Closed(_)) {
269             self.connections
270                 .borrow_mut()
271                 .retain(|_, v| !Rc::ptr_eq(v, &c));
272         }
273         out.dgram()
274     }
275 
connection(&self, cid: &ConnectionIdRef) -> Option<StateRef>276     fn connection(&self, cid: &ConnectionIdRef) -> Option<StateRef> {
277         if let Some(c) = self.connections.borrow().get(&cid[..]) {
278             Some(c.clone())
279         } else {
280             None
281         }
282     }
283 
handle_initial( &mut self, dcid: ConnectionId, scid: ConnectionId, token: Vec<u8>, dgram: Datagram, now: Instant, ) -> Option<Datagram>284     fn handle_initial(
285         &mut self,
286         dcid: ConnectionId,
287         scid: ConnectionId,
288         token: Vec<u8>,
289         dgram: Datagram,
290         now: Instant,
291     ) -> Option<Datagram> {
292         match self.retry.validate(&token, dgram.source(), now) {
293             RetryTokenResult::Invalid => None,
294             RetryTokenResult::Pass => self.accept_connection(None, dgram, now),
295             RetryTokenResult::Valid(dcid) => self.accept_connection(Some(dcid), dgram, now),
296             RetryTokenResult::Validate => {
297                 qinfo!([self], "Send retry for {:?}", dcid);
298 
299                 let res = self.retry.generate_token(&dcid, dgram.source(), now);
300                 let token = if let Ok(t) = res {
301                     t
302                 } else {
303                     qerror!([self], "unable to generate token, dropping packet");
304                     return None;
305                 };
306                 let new_dcid = self.cid_manager.borrow_mut().generate_cid();
307                 let packet = PacketBuilder::retry(&scid, &new_dcid, &token, &dcid);
308                 if let Ok(p) = packet {
309                     let retry = Datagram::new(dgram.destination(), dgram.source(), p);
310                     Some(retry)
311                 } else {
312                     qerror!([self], "unable to encode retry, dropping packet");
313                     None
314                 }
315             }
316         }
317     }
318 
accept_connection( &mut self, odcid: Option<ConnectionId>, dgram: Datagram, now: Instant, ) -> Option<Datagram>319     fn accept_connection(
320         &mut self,
321         odcid: Option<ConnectionId>,
322         dgram: Datagram,
323         now: Instant,
324     ) -> Option<Datagram> {
325         qinfo!([self], "Accept connection");
326         // The internal connection ID manager that we use is not used directly.
327         // Instead, wrap it so that we can save connection IDs.
328         let cid_mgr = Rc::new(RefCell::new(ServerConnectionIdManager {
329             c: None,
330             cid_manager: self.cid_manager.clone(),
331             connections: self.connections.clone(),
332         }));
333         let sconn = Connection::new_server(
334             &self.certs,
335             &self.protocols,
336             &self.anti_replay,
337             cid_mgr.clone(),
338         );
339         if let Ok(mut c) = sconn {
340             if let Some(odcid) = odcid {
341                 c.original_connection_id(&odcid);
342             }
343             let c = Rc::new(RefCell::new(ServerConnectionState { c, last_timer: now }));
344             cid_mgr.borrow_mut().c = Some(c.clone());
345             self.process_connection(c, Some(dgram), now)
346         } else {
347             qwarn!([self], "Unable to create connection");
348             None
349         }
350     }
351 
process_input(&mut self, dgram: Datagram, now: Instant) -> Option<Datagram>352     fn process_input(&mut self, dgram: Datagram, now: Instant) -> Option<Datagram> {
353         qtrace!("Process datagram: {}", hex(&dgram[..]));
354 
355         // This is only looking at the first packet header in the datagram.
356         // All packets in the datagram are routed to the same connection.
357         let res = PublicPacket::decode(&dgram[..], self.cid_manager.borrow().as_decoder());
358         let (packet, _remainder) = match res {
359             Ok(res) => res,
360             _ => {
361                 qtrace!([self], "Discarding {:?}", dgram);
362                 return None;
363             }
364         };
365 
366         // Finding an existing connection. Should be the most common case.
367         if let Some(c) = self.connection(packet.dcid()) {
368             return self.process_connection(c, Some(dgram), now);
369         }
370 
371         if packet.packet_type() == PacketType::Short {
372             // TODO send a stateless reset here.
373             qtrace!([self], "Short header packet for an unknown connection");
374             return None;
375         }
376 
377         if dgram.len() < MIN_INITIAL_PACKET_SIZE {
378             qtrace!([self], "Bogus packet: too short");
379             return None;
380         }
381         if packet.packet_type() == PacketType::OtherVersion {
382             let vn = PacketBuilder::version_negotiation(packet.scid(), packet.dcid());
383             return Some(Datagram::new(dgram.destination(), dgram.source(), vn));
384         }
385 
386         // Copy values from `packet` because they are currently still borrowing from `dgram`.
387         let dcid = ConnectionId::from(packet.dcid());
388         let scid = ConnectionId::from(packet.scid());
389         let token = packet.token().to_vec();
390         self.handle_initial(dcid, scid, token, dgram, now)
391     }
392 
393     /// Iterate through the pending connections looking for any that might want
394     /// to send a datagram.  Stop at the first one that does.
process_next_output(&mut self, now: Instant) -> Option<Datagram>395     fn process_next_output(&mut self, now: Instant) -> Option<Datagram> {
396         qtrace!([self], "No packet to send, look at waiting connections");
397         while let Some(c) = self.waiting.pop_front() {
398             if let Some(d) = self.process_connection(c, None, now) {
399                 return Some(d);
400             }
401         }
402         qtrace!([self], "No packet to send still, run timers");
403         while let Some(c) = self.timers.take_next(now) {
404             if let Some(d) = self.process_connection(c, None, now) {
405                 return Some(d);
406             }
407         }
408         None
409     }
410 
next_time(&mut self, now: Instant) -> Option<Duration>411     fn next_time(&mut self, now: Instant) -> Option<Duration> {
412         if self.waiting.is_empty() {
413             self.timers.next_time().map(|x| x - now)
414         } else {
415             Some(Duration::new(0, 0))
416         }
417     }
418 
process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output419     pub fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
420         let out = if let Some(d) = dgram {
421             self.process_input(d, now)
422         } else {
423             None
424         };
425         let out = out.or_else(|| self.process_next_output(now));
426         match out {
427             Some(d) => {
428                 qtrace!([self], "Send packet: {:?}", d);
429                 Output::Datagram(d)
430             }
431             _ => match self.next_time(now) {
432                 Some(delay) => {
433                     qtrace!([self], "Wait: {:?}", delay);
434                     Output::Callback(delay)
435                 }
436                 _ => {
437                     qtrace!([self], "Go dormant");
438                     Output::None
439                 }
440             },
441         }
442     }
443 
444     /// This lists the connections that have received new events
445     /// as a result of calling `process()`.
active_connections(&mut self) -> Vec<ActiveConnectionRef>446     pub fn active_connections(&mut self) -> Vec<ActiveConnectionRef> {
447         mem::take(&mut self.active).into_iter().collect()
448     }
449 
add_to_waiting(&mut self, c: ActiveConnectionRef)450     pub fn add_to_waiting(&mut self, c: ActiveConnectionRef) {
451         self.waiting.push_back(c.connection());
452     }
453 }
454 
/// A handle to a connection, as returned by `Server::active_connections`.
/// Two handles are equal, and hash alike, when they point at the same connection.
#[derive(Clone, Debug)]
pub struct ActiveConnectionRef {
    /// The shared state of the connection this handle refers to.
    c: StateRef,
}
459 
460 impl ActiveConnectionRef {
borrow<'a>(&'a self) -> impl Deref<Target = Connection> + 'a461     pub fn borrow<'a>(&'a self) -> impl Deref<Target = Connection> + 'a {
462         std::cell::Ref::map(self.c.borrow(), |c| &c.c)
463     }
464 
borrow_mut<'a>(&'a mut self) -> impl DerefMut<Target = Connection> + 'a465     pub fn borrow_mut<'a>(&'a mut self) -> impl DerefMut<Target = Connection> + 'a {
466         std::cell::RefMut::map(self.c.borrow_mut(), |c| &mut c.c)
467     }
468 
connection(&self) -> StateRef469     pub fn connection(&self) -> StateRef {
470         self.c.clone()
471     }
472 }
473 
474 impl std::hash::Hash for ActiveConnectionRef {
hash<H: std::hash::Hasher>(&self, state: &mut H)475     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
476         let ptr: *const _ = self.c.as_ref();
477         ptr.hash(state)
478     }
479 }
480 
481 impl PartialEq for ActiveConnectionRef {
eq(&self, other: &Self) -> bool482     fn eq(&self, other: &Self) -> bool {
483         Rc::ptr_eq(&self.c, &other.c)
484     }
485 }
486 
// Equality is pointer identity, which is a full equivalence relation; this
// marker impl also lets `ActiveConnectionRef` be stored in a `HashSet`.
impl Eq for ActiveConnectionRef {}
488 
/// A per-connection wrapper around the server's connection ID manager.
/// Every connection ID it generates is also recorded in the server's connection
/// table, so that incoming packets can be routed to the right connection.
struct ServerConnectionIdManager {
    /// The connection that generated connection IDs are registered to.
    /// This is `None` until `Server::accept_connection` has created the connection.
    c: Option<StateRef>,
    /// The server's table of connections, shared with the `Server`.
    connections: ConnectionTableRef,
    /// The wrapped manager that actually generates and decodes connection IDs.
    cid_manager: CidMgr,
}
494 
495 impl ConnectionIdDecoder for ServerConnectionIdManager {
decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>>496     fn decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>> {
497         self.cid_manager.borrow_mut().decode_cid(dec)
498     }
499 }
500 impl ConnectionIdManager for ServerConnectionIdManager {
generate_cid(&mut self) -> ConnectionId501     fn generate_cid(&mut self) -> ConnectionId {
502         let cid = self.cid_manager.borrow_mut().generate_cid();
503         assert!(!cid.is_empty());
504         let v = self
505             .connections
506             .borrow_mut()
507             .insert(cid.clone(), self.c.as_ref().unwrap().clone());
508         if let Some(v) = v {
509             debug_assert!(Rc::ptr_eq(&v, self.c.as_ref().unwrap()));
510         }
511         cid
512     }
as_decoder(&self) -> &dyn ConnectionIdDecoder513     fn as_decoder(&self) -> &dyn ConnectionIdDecoder {
514         self
515     }
516 }
517 
518 impl ::std::fmt::Display for Server {
fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result519     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
520         write!(f, "Server")
521     }
522 }
523