1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 #![deny(clippy::pedantic)]
8 
9 use super::{Connection, ConnectionError, ConnectionId, Output, State};
10 use crate::addr_valid::{AddressValidation, ValidateAddress};
11 use crate::cc::{CWND_INITIAL_PKTS, CWND_MIN};
12 use crate::cid::ConnectionIdRef;
13 use crate::events::ConnectionEvent;
14 use crate::path::PATH_MTU_V6;
15 use crate::recovery::ACK_ONLY_SIZE_LIMIT;
16 use crate::stats::{FrameStats, Stats, MAX_PTO_COUNTS};
17 use crate::{
18     ConnectionIdDecoder, ConnectionIdGenerator, ConnectionParameters, Error, StreamId, StreamType,
19 };
20 
21 use std::cell::RefCell;
22 use std::cmp::min;
23 use std::convert::TryFrom;
24 use std::mem;
25 use std::rc::Rc;
26 use std::time::{Duration, Instant};
27 
28 use neqo_common::{event::Provider, qdebug, qtrace, Datagram, Decoder, Role};
29 use neqo_crypto::{random, AllowZeroRtt, AuthenticationStatus, ResumptionToken};
30 use test_fixture::{self, addr, fixture_init, now};
31 
32 // All the tests.
33 mod ackrate;
34 mod cc;
35 mod close;
36 mod datagram;
37 mod fuzzing;
38 mod handshake;
39 mod idle;
40 mod keys;
41 mod migration;
42 mod priority;
43 mod recovery;
44 mod resumption;
45 mod stream;
46 mod vn;
47 mod zerortt;
48 
/// The round-trip time assumed by helpers that simulate network delay
/// (for example `increase_cwnd`).
const DEFAULT_RTT: Duration = Duration::from_millis(100);
/// An arbitrary delay that is assumed to be longer than the PTO timer;
/// used to step time forward far enough for a PTO to fire.
const AT_LEAST_PTO: Duration = Duration::from_secs(1);
/// The payload written to a fresh stream by `send_something`.
const DEFAULT_STREAM_DATA: &[u8] = b"message";
/// The number of 1-RTT packets sent in `force_idle` by a client.
const FORCE_IDLE_CLIENT_1RTT_PACKETS: usize = 3;
54 
/// WARNING!  In this module, this version of the generator needs to be used.
/// This copies the implementation from
/// `test_fixture::CountingConnectionIdGenerator`, but it uses the different
/// types that are exposed to this module.  See also `default_client`.
///
/// This version doesn't randomize the length; as the congestion control tests
/// count the amount of data sent precisely.
#[derive(Debug, Default)]
pub struct CountingConnectionIdGenerator {
    // Incremented once per generated connection ID; its four bytes are
    // written (most significant first) into bytes 1..=4 of each ID.
    counter: u32,
}
66 
67 impl ConnectionIdDecoder for CountingConnectionIdGenerator {
decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>>68     fn decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>> {
69         let len = usize::from(dec.peek_byte().unwrap());
70         dec.decode(len).map(ConnectionIdRef::from)
71     }
72 }
73 
74 impl ConnectionIdGenerator for CountingConnectionIdGenerator {
generate_cid(&mut self) -> Option<ConnectionId>75     fn generate_cid(&mut self) -> Option<ConnectionId> {
76         let mut r = random(20);
77         r[0] = 8;
78         r[1] = u8::try_from(self.counter >> 24).unwrap();
79         r[2] = u8::try_from((self.counter >> 16) & 0xff).unwrap();
80         r[3] = u8::try_from((self.counter >> 8) & 0xff).unwrap();
81         r[4] = u8::try_from(self.counter & 0xff).unwrap();
82         self.counter += 1;
83         Some(ConnectionId::from(&r[..8]))
84     }
85 
as_decoder(&self) -> &dyn ConnectionIdDecoder86     fn as_decoder(&self) -> &dyn ConnectionIdDecoder {
87         self
88     }
89 }
90 
91 // This is fabulous: because test_fixture uses the public API for Connection,
92 // it gets a different type to the ones that are referenced via super::super::*.
93 // Thus, this code can't use default_client() and default_server() from
94 // test_fixture because they produce different - and incompatible - types.
95 //
96 // These are a direct copy of those functions.
new_client(params: ConnectionParameters) -> Connection97 pub fn new_client(params: ConnectionParameters) -> Connection {
98     fixture_init();
99     Connection::new_client(
100         test_fixture::DEFAULT_SERVER_NAME,
101         test_fixture::DEFAULT_ALPN,
102         Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
103         addr(),
104         addr(),
105         params,
106         now(),
107     )
108     .expect("create a default client")
109 }
default_client() -> Connection110 pub fn default_client() -> Connection {
111     new_client(ConnectionParameters::default())
112 }
113 
new_server(params: ConnectionParameters) -> Connection114 pub fn new_server(params: ConnectionParameters) -> Connection {
115     fixture_init();
116 
117     let mut c = Connection::new_server(
118         test_fixture::DEFAULT_KEYS,
119         test_fixture::DEFAULT_ALPN,
120         Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
121         params,
122     )
123     .expect("create a default server");
124     c.server_enable_0rtt(&test_fixture::anti_replay(), AllowZeroRtt {})
125         .expect("enable 0-RTT");
126     c
127 }
default_server() -> Connection128 pub fn default_server() -> Connection {
129     new_server(ConnectionParameters::default())
130 }
131 
132 /// If state is `AuthenticationNeeded` call `authenticated()`. This function will
133 /// consume all outstanding events on the connection.
maybe_authenticate(conn: &mut Connection) -> bool134 pub fn maybe_authenticate(conn: &mut Connection) -> bool {
135     let authentication_needed = |e| matches!(e, ConnectionEvent::AuthenticationNeeded);
136     if conn.events().any(authentication_needed) {
137         conn.authenticated(AuthenticationStatus::Ok, now());
138         return true;
139     }
140     false
141 }
142 
143 /// Drive the handshake between the client and server.
handshake( client: &mut Connection, server: &mut Connection, now: Instant, rtt: Duration, ) -> Instant144 fn handshake(
145     client: &mut Connection,
146     server: &mut Connection,
147     now: Instant,
148     rtt: Duration,
149 ) -> Instant {
150     let mut a = client;
151     let mut b = server;
152     let mut now = now;
153 
154     let mut input = None;
155     let is_done = |c: &mut Connection| {
156         matches!(
157             c.state(),
158             State::Confirmed | State::Closing { .. } | State::Closed(..)
159         )
160     };
161 
162     while !is_done(a) {
163         let _ = maybe_authenticate(a);
164         let had_input = input.is_some();
165         let output = a.process(input, now).dgram();
166         assert!(had_input || output.is_some());
167         input = output;
168         qtrace!("handshake: t += {:?}", rtt / 2);
169         now += rtt / 2;
170         mem::swap(&mut a, &mut b);
171     }
172     if let Some(d) = input {
173         a.process_input(d, now);
174     }
175     now
176 }
177 
connect_fail( client: &mut Connection, server: &mut Connection, client_error: Error, server_error: Error, )178 fn connect_fail(
179     client: &mut Connection,
180     server: &mut Connection,
181     client_error: Error,
182     server_error: Error,
183 ) {
184     handshake(client, server, now(), Duration::new(0, 0));
185     assert_error(client, &ConnectionError::Transport(client_error));
186     assert_error(server, &ConnectionError::Transport(server_error));
187 }
188 
connect_with_rtt( client: &mut Connection, server: &mut Connection, now: Instant, rtt: Duration, ) -> Instant189 fn connect_with_rtt(
190     client: &mut Connection,
191     server: &mut Connection,
192     now: Instant,
193     rtt: Duration,
194 ) -> Instant {
195     fn check_rtt(stats: &Stats, rtt: Duration) {
196         assert_eq!(stats.rtt, rtt);
197         // Confirmation takes 2 round trips,
198         // so rttvar is reduced by 1/4 (from rtt/2).
199         assert_eq!(stats.rttvar, rtt * 3 / 8);
200     }
201     let now = handshake(client, server, now, rtt);
202     assert_eq!(*client.state(), State::Confirmed);
203     assert_eq!(*server.state(), State::Confirmed);
204 
205     check_rtt(&client.stats(), rtt);
206     check_rtt(&server.stats(), rtt);
207     now
208 }
209 
connect(client: &mut Connection, server: &mut Connection)210 fn connect(client: &mut Connection, server: &mut Connection) {
211     connect_with_rtt(client, server, now(), Duration::new(0, 0));
212 }
213 
assert_error(c: &Connection, err: &ConnectionError)214 fn assert_error(c: &Connection, err: &ConnectionError) {
215     match c.state() {
216         State::Closing { error, .. } | State::Draining { error, .. } | State::Closed(error) => {
217             assert_eq!(*error, *err);
218         }
219         _ => panic!("bad state {:?}", c.state()),
220     }
221 }
222 
exchange_ticket( client: &mut Connection, server: &mut Connection, now: Instant, ) -> ResumptionToken223 fn exchange_ticket(
224     client: &mut Connection,
225     server: &mut Connection,
226     now: Instant,
227 ) -> ResumptionToken {
228     let validation = AddressValidation::new(now, ValidateAddress::NoToken).unwrap();
229     let validation = Rc::new(RefCell::new(validation));
230     server.set_validation(Rc::clone(&validation));
231     server.send_ticket(now, &[]).expect("can send ticket");
232     let ticket = server.process_output(now).dgram();
233     assert!(ticket.is_some());
234     client.process_input(ticket.unwrap(), now);
235     assert_eq!(*client.state(), State::Confirmed);
236     get_tokens(client).pop().expect("should have token")
237 }
238 
239 /// Getting the client and server to reach an idle state is surprisingly hard.
240 /// The server sends `HANDSHAKE_DONE` at the end of the handshake, and the client
241 /// doesn't immediately acknowledge it.  Reordering packets does the trick.
force_idle( client: &mut Connection, server: &mut Connection, rtt: Duration, mut now: Instant, ) -> Instant242 fn force_idle(
243     client: &mut Connection,
244     server: &mut Connection,
245     rtt: Duration,
246     mut now: Instant,
247 ) -> Instant {
248     // The client has sent NEW_CONNECTION_ID, so ensure that the server generates
249     // an acknowledgment by sending some reordered packets.
250     qtrace!("force_idle: send reordered client packets");
251     let c1 = send_something(client, now);
252     let c2 = send_something(client, now);
253     now += rtt / 2;
254     server.process_input(c2, now);
255     server.process_input(c1, now);
256 
257     // Now do the same for the server.  (The ACK is in the first one.)
258     qtrace!("force_idle: send reordered server packets");
259     let s1 = send_something(server, now);
260     let s2 = send_something(server, now);
261     now += rtt / 2;
262     // Delivering s2 first at the client causes it to want to ACK.
263     client.process_input(s2, now);
264     // Delivering s1 should not have the client change its mind about the ACK.
265     let ack = client.process(Some(s1), now).dgram();
266     assert!(ack.is_some());
267     let idle_timeout = min(
268         client.conn_params.get_idle_timeout(),
269         server.conn_params.get_idle_timeout(),
270     );
271     assert_eq!(client.process_output(now), Output::Callback(idle_timeout));
272     now += rtt / 2;
273     assert_eq!(server.process(ack, now), Output::Callback(idle_timeout));
274     now
275 }
276 
277 /// Connect with an RTT and then force both peers to be idle.
connect_rtt_idle(client: &mut Connection, server: &mut Connection, rtt: Duration) -> Instant278 fn connect_rtt_idle(client: &mut Connection, server: &mut Connection, rtt: Duration) -> Instant {
279     let now = connect_with_rtt(client, server, now(), rtt);
280     let now = force_idle(client, server, rtt, now);
281     // Drain events from both as well.
282     let _ = client.events().count();
283     let _ = server.events().count();
284     qtrace!("----- connected and idle with RTT {:?}", rtt);
285     now
286 }
287 
connect_force_idle(client: &mut Connection, server: &mut Connection)288 fn connect_force_idle(client: &mut Connection, server: &mut Connection) {
289     connect_rtt_idle(client, server, Duration::new(0, 0));
290 }
291 
fill_stream(c: &mut Connection, stream: StreamId)292 fn fill_stream(c: &mut Connection, stream: StreamId) {
293     const BLOCK_SIZE: usize = 4_096;
294     loop {
295         let bytes_sent = c.stream_send(stream, &[0x42; BLOCK_SIZE]).unwrap();
296         qtrace!("fill_cwnd wrote {} bytes", bytes_sent);
297         if bytes_sent < BLOCK_SIZE {
298             break;
299         }
300     }
301 }
302 
303 /// This fills the congestion window from a single source.
304 /// As the pacer will interfere with this, this moves time forward
305 /// as `Output::Callback` is received.  Because it is hard to tell
306 /// from the return value whether a timeout is an ACK delay, PTO, or
307 /// pacing, this looks at the congestion window to tell when to stop.
308 /// Returns a list of datagrams and the new time.
fill_cwnd(c: &mut Connection, stream: StreamId, mut now: Instant) -> (Vec<Datagram>, Instant)309 fn fill_cwnd(c: &mut Connection, stream: StreamId, mut now: Instant) -> (Vec<Datagram>, Instant) {
310     // Train wreck function to get the remaining congestion window on the primary path.
311     fn cwnd(c: &Connection) -> usize {
312         c.paths.primary().borrow().sender().cwnd_avail()
313     }
314 
315     qtrace!("fill_cwnd starting cwnd: {}", cwnd(c));
316     fill_stream(c, stream);
317 
318     let mut total_dgrams = Vec::new();
319     loop {
320         let pkt = c.process_output(now);
321         qtrace!("fill_cwnd cwnd remaining={}, output: {:?}", cwnd(c), pkt);
322         match pkt {
323             Output::Datagram(dgram) => {
324                 total_dgrams.push(dgram);
325             }
326             Output::Callback(t) => {
327                 if cwnd(c) < ACK_ONLY_SIZE_LIMIT {
328                     break;
329                 }
330                 now += t;
331             }
332             Output::None => panic!(),
333         }
334     }
335 
336     qtrace!(
337         "fill_cwnd sent {} bytes",
338         total_dgrams.iter().map(|d| d.len()).sum::<usize>()
339     );
340     (total_dgrams, now)
341 }
342 
343 /// This function is like the combination of `fill_cwnd` and `ack_bytes`.
344 /// However, it acknowledges everything inline and preserves an RTT of `DEFAULT_RTT`.
increase_cwnd( sender: &mut Connection, receiver: &mut Connection, stream: StreamId, mut now: Instant, ) -> Instant345 fn increase_cwnd(
346     sender: &mut Connection,
347     receiver: &mut Connection,
348     stream: StreamId,
349     mut now: Instant,
350 ) -> Instant {
351     fill_stream(sender, stream);
352     loop {
353         let pkt = sender.process_output(now);
354         match pkt {
355             Output::Datagram(dgram) => {
356                 receiver.process_input(dgram, now + DEFAULT_RTT / 2);
357             }
358             Output::Callback(t) => {
359                 if t < DEFAULT_RTT {
360                     now += t;
361                 } else {
362                     break; // We're on PTO now.
363                 }
364             }
365             Output::None => panic!(),
366         }
367     }
368 
369     // Now acknowledge all those packets at once.
370     now += DEFAULT_RTT / 2;
371     let ack = receiver.process_output(now).dgram();
372     now += DEFAULT_RTT / 2;
373     sender.process_input(ack.unwrap(), now);
374     now
375 }
376 
377 /// Receive multiple packets and generate an ack-only packet.
378 /// # Panics
379 /// The caller is responsible for ensuring that `dest` has received
380 /// enough data that it wants to generate an ACK.  This panics if
381 /// no ACK frame is generated.
ack_bytes<D>(dest: &mut Connection, stream: StreamId, in_dgrams: D, now: Instant) -> Datagram where D: IntoIterator<Item = Datagram>, D::IntoIter: ExactSizeIterator,382 fn ack_bytes<D>(dest: &mut Connection, stream: StreamId, in_dgrams: D, now: Instant) -> Datagram
383 where
384     D: IntoIterator<Item = Datagram>,
385     D::IntoIter: ExactSizeIterator,
386 {
387     let mut srv_buf = [0; 4_096];
388 
389     let in_dgrams = in_dgrams.into_iter();
390     qdebug!([dest], "ack_bytes {} datagrams", in_dgrams.len());
391     for dgram in in_dgrams {
392         dest.process_input(dgram, now);
393     }
394 
395     loop {
396         let (bytes_read, _fin) = dest.stream_recv(stream, &mut srv_buf).unwrap();
397         qtrace!([dest], "ack_bytes read {} bytes", bytes_read);
398         if bytes_read == 0 {
399             break;
400         }
401     }
402 
403     dest.process_output(now).dgram().unwrap()
404 }
405 
406 // Get the current congestion window for the connection.
cwnd(c: &Connection) -> usize407 fn cwnd(c: &Connection) -> usize {
408     c.paths.primary().borrow().sender().cwnd()
409 }
cwnd_avail(c: &Connection) -> usize410 fn cwnd_avail(c: &Connection) -> usize {
411     c.paths.primary().borrow().sender().cwnd_avail()
412 }
413 
induce_persistent_congestion( client: &mut Connection, server: &mut Connection, stream: StreamId, mut now: Instant, ) -> Instant414 fn induce_persistent_congestion(
415     client: &mut Connection,
416     server: &mut Connection,
417     stream: StreamId,
418     mut now: Instant,
419 ) -> Instant {
420     // Note: wait some arbitrary time that should be longer than pto
421     // timer. This is rather brittle.
422     qtrace!([client], "induce_persistent_congestion");
423     now += AT_LEAST_PTO;
424 
425     let mut pto_counts = [0; MAX_PTO_COUNTS];
426     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
427 
428     qtrace!([client], "first PTO");
429     let (c_tx_dgrams, next_now) = fill_cwnd(client, stream, now);
430     now = next_now;
431     assert_eq!(c_tx_dgrams.len(), 2); // Two PTO packets
432 
433     pto_counts[0] = 1;
434     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
435 
436     qtrace!([client], "second PTO");
437     now += AT_LEAST_PTO * 2;
438     let (c_tx_dgrams, next_now) = fill_cwnd(client, stream, now);
439     now = next_now;
440     assert_eq!(c_tx_dgrams.len(), 2); // Two PTO packets
441 
442     pto_counts[0] = 0;
443     pto_counts[1] = 1;
444     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
445 
446     qtrace!([client], "third PTO");
447     now += AT_LEAST_PTO * 4;
448     let (c_tx_dgrams, next_now) = fill_cwnd(client, stream, now);
449     now = next_now;
450     assert_eq!(c_tx_dgrams.len(), 2); // Two PTO packets
451 
452     pto_counts[1] = 0;
453     pto_counts[2] = 1;
454     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
455 
456     // An ACK for the third PTO causes persistent congestion.
457     let s_ack = ack_bytes(server, stream, c_tx_dgrams, now);
458     client.process_input(s_ack, now);
459     assert_eq!(cwnd(client), CWND_MIN);
460     now
461 }
462 
/// This magic number is the size of the client's CWND after the handshake completes.
/// This is the same as the initial congestion window (`CWND_INITIAL_PKTS`
/// packets of `PATH_MTU_V6` bytes each), because during the handshake
/// the cc is app limited and cwnd is not increased.
///
/// As we change how we build packets, or even as NSS changes,
/// this number might be different.  The tests that depend on this
/// value could fail as a result of variations, so it's OK to just
/// change this value, but it is good to first understand where the
/// change came from.
const POST_HANDSHAKE_CWND: usize = PATH_MTU_V6 * CWND_INITIAL_PKTS;
473 
474 /// Determine the number of packets required to fill the CWND.
cwnd_packets(data: usize) -> usize475 const fn cwnd_packets(data: usize) -> usize {
476     // Add one if the last chunk is >= ACK_ONLY_SIZE_LIMIT.
477     (data + PATH_MTU_V6 - ACK_ONLY_SIZE_LIMIT) / PATH_MTU_V6
478 }
479 
480 /// Determine the size of the last packet.
481 /// The minimal size of a packet is `ACK_ONLY_SIZE_LIMIT`.
last_packet(cwnd: usize) -> usize482 fn last_packet(cwnd: usize) -> usize {
483     if (cwnd % PATH_MTU_V6) > ACK_ONLY_SIZE_LIMIT {
484         cwnd % PATH_MTU_V6
485     } else {
486         PATH_MTU_V6
487     }
488 }
489 
490 /// Assert that the set of packets fill the CWND.
assert_full_cwnd(packets: &[Datagram], cwnd: usize)491 fn assert_full_cwnd(packets: &[Datagram], cwnd: usize) {
492     assert_eq!(packets.len(), cwnd_packets(cwnd));
493     let (last, rest) = packets.split_last().unwrap();
494     assert!(rest.iter().all(|d| d.len() == PATH_MTU_V6));
495     assert_eq!(last.len(), last_packet(cwnd));
496 }
497 
498 /// Send something on a stream from `sender` to `receiver`.
499 /// Return the resulting datagram.
500 #[must_use]
send_something(sender: &mut Connection, now: Instant) -> Datagram501 fn send_something(sender: &mut Connection, now: Instant) -> Datagram {
502     let stream_id = sender.stream_create(StreamType::UniDi).unwrap();
503     assert!(sender.stream_send(stream_id, DEFAULT_STREAM_DATA).is_ok());
504     assert!(sender.stream_close_send(stream_id).is_ok());
505     qdebug!([sender], "send_something on {}", stream_id);
506     let dgram = sender.process(None, now).dgram();
507     dgram.expect("should have something to send")
508 }
509 
510 /// Send something on a stream from `sender` to `receiver`.
511 /// Return any ACK that might result.
send_and_receive( sender: &mut Connection, receiver: &mut Connection, now: Instant, ) -> Option<Datagram>512 fn send_and_receive(
513     sender: &mut Connection,
514     receiver: &mut Connection,
515     now: Instant,
516 ) -> Option<Datagram> {
517     let dgram = send_something(sender, now);
518     receiver.process(Some(dgram), now).dgram()
519 }
520 
get_tokens(client: &mut Connection) -> Vec<ResumptionToken>521 fn get_tokens(client: &mut Connection) -> Vec<ResumptionToken> {
522     client
523         .events()
524         .filter_map(|e| {
525             if let ConnectionEvent::ResumptionToken(token) = e {
526                 Some(token)
527             } else {
528                 None
529             }
530         })
531         .collect()
532 }
533 
assert_default_stats(stats: &Stats)534 fn assert_default_stats(stats: &Stats) {
535     assert_eq!(stats.packets_rx, 0);
536     assert_eq!(stats.packets_tx, 0);
537     let dflt_frames = FrameStats::default();
538     assert_eq!(stats.frame_rx, dflt_frames);
539     assert_eq!(stats.frame_tx, dflt_frames);
540 }
541 
/// A freshly created client is in `Init`, has untouched statistics and
/// reports the initial RTT estimate.
#[test]
fn create_client() {
    let initial_rtt = crate::rtt::INITIAL_RTT;

    let client = default_client();
    assert_eq!(client.role(), Role::Client);
    assert!(matches!(client.state(), State::Init));

    let stats = client.stats();
    assert_default_stats(&stats);
    assert_eq!(stats.rtt, initial_rtt);
    assert_eq!(stats.rttvar, initial_rtt / 2);
}
552 
/// A freshly created server is in `Init`, has untouched statistics and
/// reports a zero RTT.
#[test]
fn create_server() {
    let server = default_server();
    assert_eq!(server.role(), Role::Server);
    assert!(matches!(server.state(), State::Init));

    let stats = server.stats();
    assert_default_stats(&stats);
    // Server won't have a default path, so no RTT.
    let no_rtt = Duration::from_secs(0);
    assert_eq!(stats.rtt, no_rtt);
}
563