1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 #![deny(clippy::pedantic)]
8 
9 use super::{Connection, ConnectionError, ConnectionId, Output, State, LOCAL_IDLE_TIMEOUT};
10 use crate::addr_valid::{AddressValidation, ValidateAddress};
11 use crate::cc::{CWND_INITIAL_PKTS, CWND_MIN};
12 use crate::cid::ConnectionIdRef;
13 use crate::events::ConnectionEvent;
14 use crate::path::PATH_MTU_V6;
15 use crate::recovery::ACK_ONLY_SIZE_LIMIT;
16 use crate::stats::MAX_PTO_COUNTS;
17 use crate::{ConnectionIdDecoder, ConnectionIdGenerator, ConnectionParameters, Error, StreamType};
18 
19 use std::cell::RefCell;
20 use std::convert::TryFrom;
21 use std::mem;
22 use std::rc::Rc;
23 use std::time::{Duration, Instant};
24 
25 use neqo_common::{event::Provider, qdebug, qtrace, Datagram, Decoder};
26 use neqo_crypto::{random, AllowZeroRtt, AuthenticationStatus, ResumptionToken};
27 use test_fixture::{self, addr, fixture_init, now};
28 
29 // All the tests.
30 mod ackrate;
31 mod cc;
32 mod close;
33 mod fuzzing;
34 mod handshake;
35 mod idle;
36 mod keys;
37 mod migration;
38 mod priority;
39 mod recovery;
40 mod resumption;
41 mod stream;
42 mod vn;
43 mod zerortt;
44 
45 const DEFAULT_RTT: Duration = Duration::from_millis(100);
46 const AT_LEAST_PTO: Duration = Duration::from_secs(1);
47 const DEFAULT_STREAM_DATA: &[u8] = b"message";
48 /// The number of 1-RTT packets sent in `force_idle` by a client.
49 const FORCE_IDLE_CLIENT_1RTT_PACKETS: usize = 3;
50 
51 /// WARNING!  In this module, this version of the generator needs to be used.
52 /// This copies the implementation from
53 /// `test_fixture::CountingConnectionIdGenerator`, but it uses the different
54 /// types that are exposed to this module.  See also `default_client`.
55 ///
56 /// This version doesn't randomize the length; as the congestion control tests
57 /// count the amount of data sent precisely.
58 #[derive(Debug, Default)]
59 pub struct CountingConnectionIdGenerator {
60     counter: u32,
61 }
62 
63 impl ConnectionIdDecoder for CountingConnectionIdGenerator {
decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>>64     fn decode_cid<'a>(&self, dec: &mut Decoder<'a>) -> Option<ConnectionIdRef<'a>> {
65         let len = usize::from(dec.peek_byte().unwrap());
66         dec.decode(len).map(ConnectionIdRef::from)
67     }
68 }
69 
70 impl ConnectionIdGenerator for CountingConnectionIdGenerator {
generate_cid(&mut self) -> Option<ConnectionId>71     fn generate_cid(&mut self) -> Option<ConnectionId> {
72         let mut r = random(20);
73         r[0] = 8;
74         r[1] = u8::try_from(self.counter >> 24).unwrap();
75         r[2] = u8::try_from((self.counter >> 16) & 0xff).unwrap();
76         r[3] = u8::try_from((self.counter >> 8) & 0xff).unwrap();
77         r[4] = u8::try_from(self.counter & 0xff).unwrap();
78         self.counter += 1;
79         Some(ConnectionId::from(&r[..8]))
80     }
81 
as_decoder(&self) -> &dyn ConnectionIdDecoder82     fn as_decoder(&self) -> &dyn ConnectionIdDecoder {
83         self
84     }
85 }
86 
87 // This is fabulous: because test_fixture uses the public API for Connection,
88 // it gets a different type to the ones that are referenced via super::super::*.
89 // Thus, this code can't use default_client() and default_server() from
90 // test_fixture because they produce different - and incompatible - types.
91 //
92 // These are a direct copy of those functions.
new_client(params: ConnectionParameters) -> Connection93 pub fn new_client(params: ConnectionParameters) -> Connection {
94     fixture_init();
95     Connection::new_client(
96         test_fixture::DEFAULT_SERVER_NAME,
97         test_fixture::DEFAULT_ALPN,
98         Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
99         addr(),
100         addr(),
101         params,
102         now(),
103     )
104     .expect("create a default client")
105 }
default_client() -> Connection106 pub fn default_client() -> Connection {
107     new_client(ConnectionParameters::default())
108 }
109 
new_server(params: ConnectionParameters) -> Connection110 pub fn new_server(params: ConnectionParameters) -> Connection {
111     fixture_init();
112 
113     let mut c = Connection::new_server(
114         test_fixture::DEFAULT_KEYS,
115         test_fixture::DEFAULT_ALPN,
116         Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
117         params,
118     )
119     .expect("create a default server");
120     c.server_enable_0rtt(&test_fixture::anti_replay(), AllowZeroRtt {})
121         .expect("enable 0-RTT");
122     c
123 }
default_server() -> Connection124 pub fn default_server() -> Connection {
125     new_server(ConnectionParameters::default())
126 }
127 
128 /// If state is `AuthenticationNeeded` call `authenticated()`. This function will
129 /// consume all outstanding events on the connection.
maybe_authenticate(conn: &mut Connection) -> bool130 pub fn maybe_authenticate(conn: &mut Connection) -> bool {
131     let authentication_needed = |e| matches!(e, ConnectionEvent::AuthenticationNeeded);
132     if conn.events().any(authentication_needed) {
133         conn.authenticated(AuthenticationStatus::Ok, now());
134         return true;
135     }
136     false
137 }
138 
139 /// Drive the handshake between the client and server.
handshake( client: &mut Connection, server: &mut Connection, now: Instant, rtt: Duration, ) -> Instant140 fn handshake(
141     client: &mut Connection,
142     server: &mut Connection,
143     now: Instant,
144     rtt: Duration,
145 ) -> Instant {
146     let mut a = client;
147     let mut b = server;
148     let mut now = now;
149 
150     let mut input = None;
151     let is_done = |c: &mut Connection| {
152         matches!(
153             c.state(),
154             State::Confirmed | State::Closing { .. } | State::Closed(..)
155         )
156     };
157 
158     while !is_done(a) {
159         let _ = maybe_authenticate(a);
160         let had_input = input.is_some();
161         let output = a.process(input, now).dgram();
162         assert!(had_input || output.is_some());
163         input = output;
164         qtrace!("handshake: t += {:?}", rtt / 2);
165         now += rtt / 2;
166         mem::swap(&mut a, &mut b);
167     }
168     if let Some(d) = input {
169         a.process_input(d, now);
170     }
171     now
172 }
173 
connect_fail( client: &mut Connection, server: &mut Connection, client_error: Error, server_error: Error, )174 fn connect_fail(
175     client: &mut Connection,
176     server: &mut Connection,
177     client_error: Error,
178     server_error: Error,
179 ) {
180     handshake(client, server, now(), Duration::new(0, 0));
181     assert_error(client, &ConnectionError::Transport(client_error));
182     assert_error(server, &ConnectionError::Transport(server_error));
183 }
184 
connect_with_rtt( client: &mut Connection, server: &mut Connection, now: Instant, rtt: Duration, ) -> Instant185 fn connect_with_rtt(
186     client: &mut Connection,
187     server: &mut Connection,
188     now: Instant,
189     rtt: Duration,
190 ) -> Instant {
191     let now = handshake(client, server, now, rtt);
192     assert_eq!(*client.state(), State::Confirmed);
193     assert_eq!(*server.state(), State::Confirmed);
194 
195     assert_eq!(client.paths.rtt(), rtt);
196     assert_eq!(server.paths.rtt(), rtt);
197     now
198 }
199 
connect(client: &mut Connection, server: &mut Connection)200 fn connect(client: &mut Connection, server: &mut Connection) {
201     connect_with_rtt(client, server, now(), Duration::new(0, 0));
202 }
203 
assert_error(c: &Connection, err: &ConnectionError)204 fn assert_error(c: &Connection, err: &ConnectionError) {
205     match c.state() {
206         State::Closing { error, .. } | State::Draining { error, .. } | State::Closed(error) => {
207             assert_eq!(*error, *err);
208         }
209         _ => panic!("bad state {:?}", c.state()),
210     }
211 }
212 
exchange_ticket( client: &mut Connection, server: &mut Connection, now: Instant, ) -> ResumptionToken213 fn exchange_ticket(
214     client: &mut Connection,
215     server: &mut Connection,
216     now: Instant,
217 ) -> ResumptionToken {
218     let validation = AddressValidation::new(now, ValidateAddress::NoToken).unwrap();
219     let validation = Rc::new(RefCell::new(validation));
220     server.set_validation(Rc::clone(&validation));
221     server.send_ticket(now, &[]).expect("can send ticket");
222     let ticket = server.process_output(now).dgram();
223     assert!(ticket.is_some());
224     client.process_input(ticket.unwrap(), now);
225     assert_eq!(*client.state(), State::Confirmed);
226     get_tokens(client).pop().expect("should have token")
227 }
228 
229 /// Getting the client and server to reach an idle state is surprisingly hard.
230 /// The server sends `HANDSHAKE_DONE` at the end of the handshake, and the client
231 /// doesn't immediately acknowledge it.  Reordering packets does the trick.
force_idle( client: &mut Connection, server: &mut Connection, rtt: Duration, mut now: Instant, ) -> Instant232 fn force_idle(
233     client: &mut Connection,
234     server: &mut Connection,
235     rtt: Duration,
236     mut now: Instant,
237 ) -> Instant {
238     // The client has sent NEW_CONNECTION_ID, so ensure that the server generates
239     // an acknowledgment by sending some reordered packets.
240     qtrace!("force_idle: send reordered client packets");
241     let c1 = send_something(client, now);
242     let c2 = send_something(client, now);
243     now += rtt / 2;
244     server.process_input(c2, now);
245     server.process_input(c1, now);
246 
247     // Now do the same for the server.  (The ACK is in the first one.)
248     qtrace!("force_idle: send reordered server packets");
249     let s1 = send_something(server, now);
250     let s2 = send_something(server, now);
251     now += rtt / 2;
252     // Delivering s2 first at the client causes it to want to ACK.
253     client.process_input(s2, now);
254     // Delivering s1 should not have the client change its mind about the ACK.
255     let ack = client.process(Some(s1), now).dgram();
256     assert!(ack.is_some());
257     assert_eq!(
258         client.process_output(now),
259         Output::Callback(LOCAL_IDLE_TIMEOUT)
260     );
261     now += rtt / 2;
262     assert_eq!(
263         server.process(ack, now),
264         Output::Callback(LOCAL_IDLE_TIMEOUT)
265     );
266     now
267 }
268 
269 /// Connect with an RTT and then force both peers to be idle.
connect_rtt_idle(client: &mut Connection, server: &mut Connection, rtt: Duration) -> Instant270 fn connect_rtt_idle(client: &mut Connection, server: &mut Connection, rtt: Duration) -> Instant {
271     let now = connect_with_rtt(client, server, now(), rtt);
272     let now = force_idle(client, server, rtt, now);
273     // Drain events from both as well.
274     let _ = client.events().count();
275     let _ = server.events().count();
276     qtrace!("----- connected and idle with RTT {:?}", rtt);
277     now
278 }
279 
connect_force_idle(client: &mut Connection, server: &mut Connection)280 fn connect_force_idle(client: &mut Connection, server: &mut Connection) {
281     connect_rtt_idle(client, server, Duration::new(0, 0));
282 }
283 
fill_stream(c: &mut Connection, stream: u64)284 fn fill_stream(c: &mut Connection, stream: u64) {
285     const BLOCK_SIZE: usize = 4_096;
286     loop {
287         let bytes_sent = c.stream_send(stream, &[0x42; BLOCK_SIZE]).unwrap();
288         qtrace!("fill_cwnd wrote {} bytes", bytes_sent);
289         if bytes_sent < BLOCK_SIZE {
290             break;
291         }
292     }
293 }
294 
295 /// This fills the congestion window from a single source.
296 /// As the pacer will interfere with this, this moves time forward
297 /// as `Output::Callback` is received.  Because it is hard to tell
298 /// from the return value whether a timeout is an ACK delay, PTO, or
299 /// pacing, this looks at the congestion window to tell when to stop.
300 /// Returns a list of datagrams and the new time.
fill_cwnd(c: &mut Connection, stream: u64, mut now: Instant) -> (Vec<Datagram>, Instant)301 fn fill_cwnd(c: &mut Connection, stream: u64, mut now: Instant) -> (Vec<Datagram>, Instant) {
302     // Train wreck function to get the remaining congestion window on the primary path.
303     fn cwnd(c: &Connection) -> usize {
304         c.paths.primary().borrow().sender().cwnd_avail()
305     }
306 
307     qtrace!("fill_cwnd starting cwnd: {}", cwnd(c));
308     fill_stream(c, stream);
309 
310     let mut total_dgrams = Vec::new();
311     loop {
312         let pkt = c.process_output(now);
313         qtrace!("fill_cwnd cwnd remaining={}, output: {:?}", cwnd(c), pkt);
314         match pkt {
315             Output::Datagram(dgram) => {
316                 total_dgrams.push(dgram);
317             }
318             Output::Callback(t) => {
319                 if cwnd(c) < ACK_ONLY_SIZE_LIMIT {
320                     break;
321                 }
322                 now += t;
323             }
324             Output::None => panic!(),
325         }
326     }
327 
328     qtrace!(
329         "fill_cwnd sent {} bytes",
330         total_dgrams.iter().map(|d| d.len()).sum::<usize>()
331     );
332     (total_dgrams, now)
333 }
334 
335 /// This function is like the combination of `fill_cwnd` and `ack_bytes`.
336 /// However, it acknowledges everything inline and preserves an RTT of `DEFAULT_RTT`.
increase_cwnd( sender: &mut Connection, receiver: &mut Connection, stream: u64, mut now: Instant, ) -> Instant337 fn increase_cwnd(
338     sender: &mut Connection,
339     receiver: &mut Connection,
340     stream: u64,
341     mut now: Instant,
342 ) -> Instant {
343     fill_stream(sender, stream);
344     loop {
345         let pkt = sender.process_output(now);
346         match pkt {
347             Output::Datagram(dgram) => {
348                 receiver.process_input(dgram, now + DEFAULT_RTT / 2);
349             }
350             Output::Callback(t) => {
351                 if t < DEFAULT_RTT {
352                     now += t;
353                 } else {
354                     break; // We're on PTO now.
355                 }
356             }
357             Output::None => panic!(),
358         }
359     }
360 
361     // Now acknowledge all those packets at once.
362     now += DEFAULT_RTT / 2;
363     let ack = receiver.process_output(now).dgram();
364     now += DEFAULT_RTT / 2;
365     sender.process_input(ack.unwrap(), now);
366     now
367 }
368 
369 /// Receive multiple packets and generate an ack-only packet.
370 /// # Panics
371 /// The caller is responsible for ensuring that `dest` has received
372 /// enough data that it wants to generate an ACK.  This panics if
373 /// no ACK frame is generated.
ack_bytes<D>(dest: &mut Connection, stream: u64, in_dgrams: D, now: Instant) -> Datagram where D: IntoIterator<Item = Datagram>, D::IntoIter: ExactSizeIterator,374 fn ack_bytes<D>(dest: &mut Connection, stream: u64, in_dgrams: D, now: Instant) -> Datagram
375 where
376     D: IntoIterator<Item = Datagram>,
377     D::IntoIter: ExactSizeIterator,
378 {
379     let mut srv_buf = [0; 4_096];
380 
381     let in_dgrams = in_dgrams.into_iter();
382     qdebug!([dest], "ack_bytes {} datagrams", in_dgrams.len());
383     for dgram in in_dgrams {
384         dest.process_input(dgram, now);
385     }
386 
387     loop {
388         let (bytes_read, _fin) = dest.stream_recv(stream, &mut srv_buf).unwrap();
389         qtrace!([dest], "ack_bytes read {} bytes", bytes_read);
390         if bytes_read == 0 {
391             break;
392         }
393     }
394 
395     dest.process_output(now).dgram().unwrap()
396 }
397 
398 // Get the current congestion window for the connection.
cwnd(c: &Connection) -> usize399 fn cwnd(c: &Connection) -> usize {
400     c.paths.primary().borrow().sender().cwnd()
401 }
cwnd_avail(c: &Connection) -> usize402 fn cwnd_avail(c: &Connection) -> usize {
403     c.paths.primary().borrow().sender().cwnd_avail()
404 }
405 
induce_persistent_congestion( client: &mut Connection, server: &mut Connection, stream: u64, mut now: Instant, ) -> Instant406 fn induce_persistent_congestion(
407     client: &mut Connection,
408     server: &mut Connection,
409     stream: u64,
410     mut now: Instant,
411 ) -> Instant {
412     // Note: wait some arbitrary time that should be longer than pto
413     // timer. This is rather brittle.
414     qtrace!([client], "induce_persistent_congestion");
415     now += AT_LEAST_PTO;
416 
417     let mut pto_counts = [0; MAX_PTO_COUNTS];
418     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
419 
420     qtrace!([client], "first PTO");
421     let (c_tx_dgrams, next_now) = fill_cwnd(client, stream, now);
422     now = next_now;
423     assert_eq!(c_tx_dgrams.len(), 2); // Two PTO packets
424 
425     pto_counts[0] = 1;
426     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
427 
428     qtrace!([client], "second PTO");
429     now += AT_LEAST_PTO * 2;
430     let (c_tx_dgrams, next_now) = fill_cwnd(client, stream, now);
431     now = next_now;
432     assert_eq!(c_tx_dgrams.len(), 2); // Two PTO packets
433 
434     pto_counts[0] = 0;
435     pto_counts[1] = 1;
436     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
437 
438     qtrace!([client], "third PTO");
439     now += AT_LEAST_PTO * 4;
440     let (c_tx_dgrams, next_now) = fill_cwnd(client, stream, now);
441     now = next_now;
442     assert_eq!(c_tx_dgrams.len(), 2); // Two PTO packets
443 
444     pto_counts[1] = 0;
445     pto_counts[2] = 1;
446     assert_eq!(client.stats.borrow().pto_counts, pto_counts);
447 
448     // An ACK for the third PTO causes persistent congestion.
449     let s_ack = ack_bytes(server, stream, c_tx_dgrams, now);
450     client.process_input(s_ack, now);
451     assert_eq!(cwnd(client), CWND_MIN);
452     now
453 }
454 
455 /// This magic number is the size of the client's CWND after the handshake completes.
456 /// This is the same as the initial congestion window, because during the handshake
457 /// the cc is app limited and cwnd is not increased.
458 ///
459 /// As we change how we build packets, or even as NSS changes,
460 /// this number might be different.  The tests that depend on this
461 /// value could fail as a result of variations, so it's OK to just
462 /// change this value, but it is good to first understand where the
463 /// change came from.
464 const POST_HANDSHAKE_CWND: usize = PATH_MTU_V6 * CWND_INITIAL_PKTS;
465 
466 /// Determine the number of packets required to fill the CWND.
cwnd_packets(data: usize) -> usize467 const fn cwnd_packets(data: usize) -> usize {
468     // Add one if the last chunk is >= ACK_ONLY_SIZE_LIMIT.
469     (data + PATH_MTU_V6 - ACK_ONLY_SIZE_LIMIT) / PATH_MTU_V6
470 }
471 
472 /// Determine the size of the last packet.
473 /// The minimal size of a packet is `ACK_ONLY_SIZE_LIMIT`.
last_packet(cwnd: usize) -> usize474 fn last_packet(cwnd: usize) -> usize {
475     if (cwnd % PATH_MTU_V6) > ACK_ONLY_SIZE_LIMIT {
476         cwnd % PATH_MTU_V6
477     } else {
478         PATH_MTU_V6
479     }
480 }
481 
482 /// Assert that the set of packets fill the CWND.
assert_full_cwnd(packets: &[Datagram], cwnd: usize)483 fn assert_full_cwnd(packets: &[Datagram], cwnd: usize) {
484     assert_eq!(packets.len(), cwnd_packets(cwnd));
485     let (last, rest) = packets.split_last().unwrap();
486     assert!(rest.iter().all(|d| d.len() == PATH_MTU_V6));
487     assert_eq!(last.len(), last_packet(cwnd));
488 }
489 
490 /// Send something on a stream from `sender` to `receiver`.
491 /// Return the resulting datagram.
492 #[must_use]
send_something(sender: &mut Connection, now: Instant) -> Datagram493 fn send_something(sender: &mut Connection, now: Instant) -> Datagram {
494     let stream_id = sender.stream_create(StreamType::UniDi).unwrap();
495     assert!(sender.stream_send(stream_id, DEFAULT_STREAM_DATA).is_ok());
496     assert!(sender.stream_close_send(stream_id).is_ok());
497     qdebug!([sender], "send_something on {}", stream_id);
498     let dgram = sender.process(None, now).dgram();
499     dgram.expect("should have something to send")
500 }
501 
502 /// Send something on a stream from `sender` to `receiver`.
503 /// Return any ACK that might result.
send_and_receive( sender: &mut Connection, receiver: &mut Connection, now: Instant, ) -> Option<Datagram>504 fn send_and_receive(
505     sender: &mut Connection,
506     receiver: &mut Connection,
507     now: Instant,
508 ) -> Option<Datagram> {
509     let dgram = send_something(sender, now);
510     receiver.process(Some(dgram), now).dgram()
511 }
512 
get_tokens(client: &mut Connection) -> Vec<ResumptionToken>513 fn get_tokens(client: &mut Connection) -> Vec<ResumptionToken> {
514     client
515         .events()
516         .filter_map(|e| {
517             if let ConnectionEvent::ResumptionToken(token) = e {
518                 Some(token)
519             } else {
520                 None
521             }
522         })
523         .collect()
524 }
525