1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // This file implements a server that can handle multiple connections. 8 9 use neqo_common::{ 10 hex, matches, qerror, qinfo, qtrace, qwarn, timer::Timer, Datagram, Decoder, Encoder, 11 }; 12 use neqo_crypto::{ 13 constants::{TLS_AES_128_GCM_SHA256, TLS_VERSION_1_3}, 14 selfencrypt::SelfEncrypt, 15 AntiReplay, 16 }; 17 18 use crate::cid::{ConnectionId, ConnectionIdDecoder, ConnectionIdManager, ConnectionIdRef}; 19 use crate::connection::{Connection, Output, State}; 20 use crate::packet::{PacketBuilder, PacketType, PublicPacket}; 21 use crate::Res; 22 23 use std::cell::RefCell; 24 use std::collections::{HashMap, HashSet, VecDeque}; 25 use std::convert::TryFrom; 26 use std::mem; 27 use std::net::{IpAddr, SocketAddr}; 28 use std::ops::{Deref, DerefMut}; 29 use std::rc::Rc; 30 use std::time::{Duration, Instant}; 31 32 pub enum InitialResult { 33 Accept, 34 Drop, 35 Retry(Vec<u8>), 36 } 37 38 /// MIN_INITIAL_PACKET_SIZE is the smallest packet that can be used to establish 39 /// a new connection across all QUIC versions this server supports. 40 const MIN_INITIAL_PACKET_SIZE: usize = 1200; 41 const TIMER_GRANULARITY: Duration = Duration::from_millis(10); 42 const TIMER_CAPACITY: usize = 16384; 43 44 type StateRef = Rc<RefCell<ServerConnectionState>>; 45 type CidMgr = Rc<RefCell<dyn ConnectionIdManager>>; 46 type ConnectionTableRef = Rc<RefCell<HashMap<ConnectionId, StateRef>>>; 47 48 #[derive(Debug)] 49 pub struct ServerConnectionState { 50 c: Connection, 51 last_timer: Instant, 52 } 53 54 impl Deref for ServerConnectionState { 55 type Target = Connection; deref(&self) -> &Self::Target56 fn deref(&self) -> &Self::Target { 57 &self.c 58 } 59 } 60 61 impl DerefMut for ServerConnectionState { deref_mut(&mut self) -> &mut Self::Target62 fn deref_mut(&mut self) -> &mut Self::Target { 63 &mut self.c 64 } 65 } 66 67 enum RetryTokenResult { 68 Pass, 69 Valid(ConnectionId), 70 Validate, 71 Invalid, 72 } 73 74 struct RetryToken { 75 /// Whether to send a Retry. 76 require_retry: bool, 77 /// A self-encryption object used for protecting Retry tokens. 78 self_encrypt: SelfEncrypt, 79 /// When this object was created. 80 start_time: Instant, 81 } 82 83 impl RetryToken { new(now: Instant) -> Res<Self>84 fn new(now: Instant) -> Res<Self> { 85 Ok(Self { 86 require_retry: false, 87 self_encrypt: SelfEncrypt::new(TLS_VERSION_1_3, TLS_AES_128_GCM_SHA256)?, 88 start_time: now, 89 }) 90 } 91 encode_peer_address(peer_address: SocketAddr) -> Vec<u8>92 fn encode_peer_address(peer_address: SocketAddr) -> Vec<u8> { 93 // Let's be "clever" by putting the peer's address in the AAD. 94 // We don't need to encode these into the token as they should be 95 // available when we need to check the token. 96 let mut encoded_address = Encoder::default(); 97 match peer_address.ip() { 98 IpAddr::V4(a) => { 99 encoded_address.encode_byte(4); 100 encoded_address.encode(&a.octets()); 101 } 102 IpAddr::V6(a) => { 103 encoded_address.encode_byte(6); 104 encoded_address.encode(&a.octets()); 105 } 106 } 107 encoded_address.encode_uint(2, peer_address.port()); 108 encoded_address.into() 109 } 110 111 /// This generates a token for use with Retry. generate_token( &mut self, dcid: &ConnectionId, peer_address: SocketAddr, now: Instant, ) -> Res<Vec<u8>>112 pub fn generate_token( 113 &mut self, 114 dcid: &ConnectionId, 115 peer_address: SocketAddr, 116 now: Instant, 117 ) -> Res<Vec<u8>> { 118 // TODO(mt) rotate keys on a fixed schedule. 119 let mut token = Encoder::default(); 120 const EXPIRATION: Duration = Duration::from_secs(5); 121 let end = now + EXPIRATION; 122 let end_millis = u32::try_from(end.duration_since(self.start_time).as_millis())?; 123 token.encode_uint(4, end_millis); 124 token.encode(dcid); 125 let peer_addr = Self::encode_peer_address(peer_address); 126 Ok(self.self_encrypt.seal(&peer_addr, &token)?) 127 } 128 set_retry_required(&mut self, retry: bool)129 pub fn set_retry_required(&mut self, retry: bool) { 130 self.require_retry = retry; 131 } 132 133 /// Decrypts `token` and returns the connection Id it contains. 134 /// Returns `None` if the date is invalid in any way (such as it being expired or garbled). decrypt_token( &self, token: &[u8], peer_address: SocketAddr, now: Instant, ) -> Option<ConnectionId>135 fn decrypt_token( 136 &self, 137 token: &[u8], 138 peer_address: SocketAddr, 139 now: Instant, 140 ) -> Option<ConnectionId> { 141 let peer_addr = Self::encode_peer_address(peer_address); 142 let data = if let Ok(d) = self.self_encrypt.open(&peer_addr, token) { 143 d 144 } else { 145 return None; 146 }; 147 let mut dec = Decoder::new(&data); 148 match dec.decode_uint(4) { 149 Some(d) => { 150 let end = self.start_time + Duration::from_millis(d); 151 if end < now { 152 return None; 153 } 154 } 155 _ => return None, 156 } 157 Some(ConnectionId::from(dec.decode_remainder())) 158 } 159 validate( &self, token: &[u8], peer_address: SocketAddr, now: Instant, ) -> RetryTokenResult160 pub fn validate( 161 &self, 162 token: &[u8], 163 peer_address: SocketAddr, 164 now: Instant, 165 ) -> RetryTokenResult { 166 if token.is_empty() { 167 if self.require_retry { 168 RetryTokenResult::Validate 169 } else { 170 RetryTokenResult::Pass 171 } 172 } else if let Some(cid) = self.decrypt_token(token, peer_address, now) { 173 RetryTokenResult::Valid(cid) 174 } else { 175 RetryTokenResult::Invalid 176 } 177 } 178 } 179 180 pub struct Server { 181 /// The names of certificates. 182 certs: Vec<String>, 183 /// The ALPN values that the server supports. 184 protocols: Vec<String>, 185 anti_replay: AntiReplay, 186 /// A connection ID manager. 187 cid_manager: CidMgr, 188 /// All connections, keyed by ConnectionId. 189 connections: ConnectionTableRef, 190 /// The connections that have new events. 191 active: HashSet<ActiveConnectionRef>, 192 /// The set of connections that need immediate processing. 193 waiting: VecDeque<StateRef>, 194 /// Outstanding timers for connections. 195 timers: Timer<StateRef>, 196 /// Whether a Retry packet will be sent in response to new 197 /// Initial packets. 198 retry: RetryToken, 199 } 200 201 impl Server { 202 /// Construct a new server. 203 /// `now` is the time that the server is instantiated. 204 /// `cid_manager` is responsible for generating connection IDs and parsing them; 205 /// connection IDs produced by the manager cannot be zero-length. 206 /// `certs` is a list of the certificates that should be configured. 207 /// `protocols` is the preference list of ALPN values. 208 /// `anti_replay` is an anti-replay context. new( now: Instant, certs: &[impl AsRef<str>], protocols: &[impl AsRef<str>], anti_replay: AntiReplay, cid_manager: CidMgr, ) -> Res<Self>209 pub fn new( 210 now: Instant, 211 certs: &[impl AsRef<str>], 212 protocols: &[impl AsRef<str>], 213 anti_replay: AntiReplay, 214 cid_manager: CidMgr, 215 ) -> Res<Self> { 216 Ok(Self { 217 certs: certs.iter().map(|x| String::from(x.as_ref())).collect(), 218 protocols: protocols.iter().map(|x| String::from(x.as_ref())).collect(), 219 anti_replay, 220 cid_manager, 221 connections: Rc::default(), 222 active: HashSet::default(), 223 waiting: VecDeque::default(), 224 timers: Timer::new(now, TIMER_GRANULARITY, TIMER_CAPACITY), 225 retry: RetryToken::new(now)?, 226 }) 227 } 228 set_retry_required(&mut self, require_retry: bool)229 pub fn set_retry_required(&mut self, require_retry: bool) { 230 self.retry.set_retry_required(require_retry); 231 } 232 remove_timer(&mut self, c: &StateRef)233 fn remove_timer(&mut self, c: &StateRef) { 234 let last = c.borrow().last_timer; 235 self.timers.remove(last, |t| Rc::ptr_eq(t, c)); 236 } 237 process_connection( &mut self, c: StateRef, dgram: Option<Datagram>, now: Instant, ) -> Option<Datagram>238 fn process_connection( 239 &mut self, 240 c: StateRef, 241 dgram: Option<Datagram>, 242 now: Instant, 243 ) -> Option<Datagram> { 244 qtrace!([self], "Process connection {:?}", c); 245 let out = c.borrow_mut().process(dgram, now); 246 match out { 247 Output::Datagram(_) => { 248 qtrace!([self], "Sending packet, added to waiting connections"); 249 self.waiting.push_back(c.clone()); 250 } 251 Output::Callback(delay) => { 252 let next = now + delay; 253 if next != c.borrow().last_timer { 254 qtrace!([self], "Change timer to {:?}", next); 255 self.remove_timer(&c); 256 c.borrow_mut().last_timer = next; 257 self.timers.add(next, c.clone()); 258 } 259 } 260 _ => { 261 self.remove_timer(&c); 262 } 263 } 264 if c.borrow().has_events() { 265 qtrace!([self], "Connection active: {:?}", c); 266 self.active.insert(ActiveConnectionRef { c: c.clone() }); 267 } 268 if matches!(c.borrow().state(), State::Closed(_)) { 269 self.connections 270 .borrow_mut() 271 .retain(|_, v| !Rc::ptr_eq(v, &c)); 272 } 273 out.dgram() 274 } 275 connection(&self, cid: &ConnectionIdRef) -> Option<StateRef>276 fn connection(&self, cid: &ConnectionIdRef) -> Option<StateRef> { 277 if let Some(c) = self.connections.borrow().get(&cid[..]) { 278 Some(c.clone()) 279 } else { 280 None 281 } 282 } 283 handle_initial( &mut self, dcid: ConnectionId, scid: ConnectionId, token: Vec<u8>, dgram: Datagram, now: Instant, ) -> Option<Datagram>284 fn handle_initial( 285 &mut self, 286 dcid: ConnectionId, 287 scid: ConnectionId, 288 token: Vec<u8>, 289 dgram: Datagram, 290 now: Instant, 291 ) -> Option<Datagram> { 292 match self.retry.validate(&token, dgram.source(), now) { 293 RetryTokenResult::Invalid => None, 294 RetryTokenResult::Pass => self.accept_connection(None, dgram, now), 295 RetryTokenResult::Valid(dcid) => self.accept_connection(Some(dcid), dgram, now), 296 RetryTokenResult::Validate => { 297 qinfo!([self], "Send retry for {:?}", dcid); 298 299 let res = self.retry.generate_token(&dcid, dgram.source(), now); 300 let token = if let Ok(t) = res { 301 t 302 } else { 303 qerror!([self], "unable to generate token, dropping packet"); 304 return None; 305 }; 306 let new_dcid = self.cid_manager.borrow_mut().generate_cid(); 307 let packet = PacketBuilder::retry(&scid, &new_dcid, &token, &dcid); 308 if let Ok(p) = packet { 309 let retry = Datagram::new(dgram.destination(), dgram.source(), p); 310 Some(retry) 311 } else { 312 qerror!([self], "unable to encode retry, dropping packet"); 313 None 314 } 315 } 316 } 317 } 318 accept_connection( &mut self, odcid: Option<ConnectionId>, dgram: Datagram, now: Instant, ) -> Option<Datagram>319 fn accept_connection( 320 &mut self, 321 odcid: Option<ConnectionId>, 322 dgram: Datagram, 323 now: Instant, 324 ) -> Option<Datagram> { 325 qinfo!([self], "Accept connection"); 326 // The internal connection ID manager that we use is not used directly. 327 // Instead, wrap it so that we can save connection IDs. 328 let cid_mgr = Rc::new(RefCell::new(ServerConnectionIdManager { 329 c: None, 330 cid_manager: self.cid_manager.clone(), 331 connections: self.connections.clone(), 332 })); 333 let sconn = Connection::new_server( 334 &self.certs, 335 &self.protocols, 336 &self.anti_replay, 337 cid_mgr.clone(), 338 ); 339 if let Ok(mut c) = sconn { 340 if let Some(odcid) = odcid { 341 c.original_connection_id(&odcid); 342 } 343 let c = Rc::new(RefCell::new(ServerConnectionState { c, last_timer: now })); 344 cid_mgr.borrow_mut().c = Some(c.clone()); 345 self.process_connection(c, Some(dgram), now) 346 } else { 347 qwarn!([self], "Unable to create connection"); 348 None 349 } 350 } 351 process_input(&mut self, dgram: Datagram, now: Instant) -> Option<Datagram>352 fn process_input(&mut self, dgram: Datagram, now: Instant) -> Option<Datagram> { 353 qtrace!("Process datagram: {}", hex(&dgram[..])); 354 355 // This is only looking at the first packet header in the datagram. 356 // All packets in the datagram are routed to the same connection. 357 let res = PublicPacket::decode(&dgram[..], self.cid_manager.borrow().as_decoder()); 358 let (packet, _remainder) = match res { 359 Ok(res) => res, 360 _ => { 361 qtrace!([self], "Discarding {:?}", dgram); 362 return None; 363 } 364 }; 365 366 // Finding an existing connection. Should be the most common case. 367 if let Some(c) = self.connection(packet.dcid()) { 368 return self.process_connection(c, Some(dgram), now); 369 } 370 371 if packet.packet_type() == PacketType::Short { 372 // TODO send a stateless reset here. 373 qtrace!([self], "Short header packet for an unknown connection"); 374 return None; 375 } 376 377 if dgram.len() < MIN_INITIAL_PACKET_SIZE { 378 qtrace!([self], "Bogus packet: too short"); 379 return None; 380 } 381 if packet.packet_type() == PacketType::OtherVersion { 382 let vn = PacketBuilder::version_negotiation(packet.scid(), packet.dcid()); 383 return Some(Datagram::new(dgram.destination(), dgram.source(), vn)); 384 } 385 386 // Copy values from `packet` because they are currently still borrowing from `dgram`. 387 let dcid = ConnectionId::from(packet.dcid()); 388 let scid = ConnectionId::from(packet.scid()); 389 let token = packet.token().to_vec(); 390 self.handle_initial(dcid, scid, token, dgram, now) 391 } 392 393 /// Iterate through the pending connections looking for any that might want 394 /// to send a datagram. Stop at the first one that does. process_next_output(&mut self, now: Instant) -> Option<Datagram>395 fn process_next_output(&mut self, now: Instant) -> Option<Datagram> { 396 qtrace!([self], "No packet to send, look at waiting connections"); 397 while let Some(c) = self.waiting.pop_front() { 398 if let Some(d) = self.process_connection(c, None, now) { 399 return Some(d); 400 } 401 } 402 qtrace!([self], "No packet to send still, run timers"); 403 while let Some(c) = self.timers.take_next(now) { 404 if let Some(d) = self.process_connection(c, None, now) { 405 return Some(d); 406 } 407 } 408 None 409 } 410 next_time(&mut self, now: Instant) -> Option<Duration>411 fn next_time(&mut self, now: Instant) -> Option<Duration> { 412 if self.waiting.is_empty() { 413 self.timers.next_time().map(|x| x - now) 414 } else { 415 Some(Duration::new(0, 0)) 416 } 417 } 418 process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output419 pub fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output { 420 let out = if let Some(d) = dgram { 421 self.process_input(d, now) 422 } else { 423 None 424 }; 425 let out = out.or_else(|| self.process_next_output(now)); 426 match out { 427 Some(d) => { 428 qtrace!([self], "Send packet: {:?}", d); 429 Output::Datagram(d) 430 } 431 _ => match self.next_time(now) { 432 Some(delay) => { 433 qtrace!([self], "Wait: {:?}", delay); 434 Output::Callback(delay) 435 } 436 _ => { 437 qtrace!([self], "Go dormant"); 438 Output::None 439 } 440 }, 441 } 442 } 443 444 /// This lists the connections that have received new events 445 /// as a result of calling `process()`. active_connections(&mut self) -> Vec<ActiveConnectionRef>446 pub fn active_connections(&mut self) -> Vec<ActiveConnectionRef> { 447 mem::take(&mut self.active).into_iter().collect() 448 } 449 add_to_waiting(&mut self, c: ActiveConnectionRef)450 pub fn add_to_waiting(&mut self, c: ActiveConnectionRef) { 451 self.waiting.push_back(c.connection()); 452 } 453 } 454 455 #[derive(Clone, Debug)] 456 pub struct ActiveConnectionRef { 457 c: StateRef, 458 } 459 460 impl ActiveConnectionRef { borrow<'a>(&'a self) -> impl Deref<Target = Connection> + 'a461 pub fn borrow<'a>(&'a self) -> impl Deref<Target = Connection> + 'a { 462 std::cell::Ref::map(self.c.borrow(), |c| &c.c) 463 } 464 borrow_mut<'a>(&'a mut self) -> impl DerefMut<Target = Connection> + 'a465 pub fn borrow_mut<'a>(&'a mut self) -> impl DerefMut<Target = Connection> + 'a { 466 std::cell::RefMut::map(self.c.borrow_mut(), |c| &mut c.c) 467 } 468 connection(&self) -> StateRef469 pub fn connection(&self) -> StateRef { 470 self.c.clone() 471 } 472 } 473 474 impl std::hash::Hash for ActiveConnectionRef { hash<H: std::hash::Hasher>(&self, state: &mut H)475 fn hash<H: std::hash::Hasher>(&self, state: &mut H) { 476 let ptr: *const _ = self.c.as_ref(); 477 ptr.hash(state) 478 } 479 } 480 481 impl PartialEq for ActiveConnectionRef { eq(&self, other: &Self) -> bool482 fn eq(&self, other: &Self) -> bool { 483 Rc::ptr_eq(&self.c, &other.c) 484 } 485 } 486 487 impl Eq for ActiveConnectionRef {} 488 489 struct ServerConnectionIdManager { 490 c: Option<StateRef>, 491 connections: ConnectionTableRef, 492 cid_manager: CidMgr, 493 } 494 495 impl ConnectionIdDecoder for ServerConnectionIdManager { decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>>496 fn decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>> { 497 self.cid_manager.borrow_mut().decode_cid(dec) 498 } 499 } 500 impl ConnectionIdManager for ServerConnectionIdManager { generate_cid(&mut self) -> ConnectionId501 fn generate_cid(&mut self) -> ConnectionId { 502 let cid = self.cid_manager.borrow_mut().generate_cid(); 503 assert!(!cid.is_empty()); 504 let v = self 505 .connections 506 .borrow_mut() 507 .insert(cid.clone(), self.c.as_ref().unwrap().clone()); 508 if let Some(v) = v { 509 debug_assert!(Rc::ptr_eq(&v, self.c.as_ref().unwrap())); 510 } 511 cid 512 } as_decoder(&self) -> &dyn ConnectionIdDecoder513 fn as_decoder(&self) -> &dyn ConnectionIdDecoder { 514 self 515 } 516 } 517 518 impl ::std::fmt::Display for Server { fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result519 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { 520 write!(f, "Server") 521 } 522 } 523