1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 use super::super::{Connection, Error, Output};
8 use super::{connect, default_client, default_server, fill_cwnd, maybe_authenticate};
9 use crate::addr_valid::{AddressValidation, ValidateAddress};
10 use crate::send_stream::{RetransmissionPriority, TransmissionPriority};
11 use crate::{ConnectionEvent, StreamType};
12 
13 use neqo_common::event::Provider;
14 use std::cell::RefCell;
15 use std::mem;
16 use std::rc::Rc;
17 use test_fixture::{self, now};
18 
19 const BLOCK_SIZE: usize = 4_096;
20 
fill_stream(c: &mut Connection, id: u64)21 fn fill_stream(c: &mut Connection, id: u64) {
22     loop {
23         if c.stream_send(id, &[0x42; BLOCK_SIZE]).unwrap() < BLOCK_SIZE {
24             return;
25         }
26     }
27 }
28 
29 /// A receive stream cannot be prioritized (yet).
30 #[test]
receive_stream()31 fn receive_stream() {
32     const MESSAGE: &[u8] = b"hello";
33     let mut client = default_client();
34     let mut server = default_server();
35     connect(&mut client, &mut server);
36 
37     let id = client.stream_create(StreamType::UniDi).unwrap();
38     assert_eq!(MESSAGE.len(), client.stream_send(id, MESSAGE).unwrap());
39     let dgram = client.process_output(now()).dgram();
40 
41     server.process_input(dgram.unwrap(), now());
42     assert_eq!(
43         server
44             .stream_priority(
45                 id,
46                 TransmissionPriority::default(),
47                 RetransmissionPriority::default()
48             )
49             .unwrap_err(),
50         Error::InvalidStreamId,
51         "Priority doesn't apply to inbound unidirectional streams"
52     );
53 
54     // But the stream does exist and can be read.
55     let mut buf = [0; 10];
56     let (len, end) = server.stream_recv(id, &mut buf).unwrap();
57     assert_eq!(MESSAGE, &buf[..len]);
58     assert!(!end);
59 }
60 
61 /// Higher priority streams get sent ahead of lower ones, even when
62 /// the higher priority stream is written to later.
63 #[test]
relative()64 fn relative() {
65     let mut client = default_client();
66     let mut server = default_server();
67     connect(&mut client, &mut server);
68 
69     // id_normal is created first, but it is lower priority.
70     let id_normal = client.stream_create(StreamType::UniDi).unwrap();
71     fill_stream(&mut client, id_normal);
72     let high = client.stream_create(StreamType::UniDi).unwrap();
73     fill_stream(&mut client, high);
74     client
75         .stream_priority(
76             high,
77             TransmissionPriority::High,
78             RetransmissionPriority::default(),
79         )
80         .unwrap();
81 
82     let dgram = client.process_output(now()).dgram();
83     server.process_input(dgram.unwrap(), now());
84 
85     // The "id_normal" stream will get a `NewStream` event, but no data.
86     for e in server.events() {
87         if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
88             assert_ne!(stream_id, id_normal);
89         }
90     }
91 }
92 
93 /// Check that changing priority has effect on the next packet that is sent.
94 #[test]
reprioritize()95 fn reprioritize() {
96     let mut client = default_client();
97     let mut server = default_server();
98     connect(&mut client, &mut server);
99 
100     // id_normal is created first, but it is lower priority.
101     let id_normal = client.stream_create(StreamType::UniDi).unwrap();
102     fill_stream(&mut client, id_normal);
103     let id_high = client.stream_create(StreamType::UniDi).unwrap();
104     fill_stream(&mut client, id_high);
105     client
106         .stream_priority(
107             id_high,
108             TransmissionPriority::High,
109             RetransmissionPriority::default(),
110         )
111         .unwrap();
112 
113     let dgram = client.process_output(now()).dgram();
114     server.process_input(dgram.unwrap(), now());
115 
116     // The "id_normal" stream will get a `NewStream` event, but no data.
117     for e in server.events() {
118         if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
119             assert_ne!(stream_id, id_normal);
120         }
121     }
122 
123     // When the high priority stream drops in priority, the streams are equal
124     // priority and so their stream ID determines what is sent.
125     client
126         .stream_priority(
127             id_high,
128             TransmissionPriority::Normal,
129             RetransmissionPriority::default(),
130         )
131         .unwrap();
132     let dgram = client.process_output(now()).dgram();
133     server.process_input(dgram.unwrap(), now());
134 
135     for e in server.events() {
136         if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
137             assert_ne!(stream_id, id_high);
138         }
139     }
140 }
141 
142 /// Retransmission can be prioritized differently (usually higher).
143 #[test]
repairing_loss()144 fn repairing_loss() {
145     let mut client = default_client();
146     let mut server = default_server();
147     connect(&mut client, &mut server);
148     let mut now = now();
149 
150     // Send a few packets at low priority, lose one.
151     let id_low = client.stream_create(StreamType::UniDi).unwrap();
152     fill_stream(&mut client, id_low);
153     client
154         .stream_priority(
155             id_low,
156             TransmissionPriority::Low,
157             RetransmissionPriority::Higher,
158         )
159         .unwrap();
160 
161     let _lost = client.process_output(now).dgram();
162     for _ in 0..5 {
163         match client.process_output(now) {
164             Output::Datagram(d) => server.process_input(d, now),
165             Output::Callback(delay) => now += delay,
166             Output::None => unreachable!(),
167         }
168     }
169 
170     // Generate an ACK.  The first packet is now considered lost.
171     let ack = server.process_output(now).dgram();
172     let _ = server.events().count(); // Drain events.
173 
174     let id_normal = client.stream_create(StreamType::UniDi).unwrap();
175     fill_stream(&mut client, id_normal);
176 
177     let dgram = client.process(ack, now).dgram();
178     assert_eq!(client.stats().lost, 1); // Client should have noticed the loss.
179     server.process_input(dgram.unwrap(), now);
180 
181     // Only the low priority stream has data as the retransmission of the data from
182     // the lost packet is now more important than new data from the high priority stream.
183     for e in server.events() {
184         println!("Event: {:?}", e);
185         if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
186             assert_eq!(stream_id, id_low);
187         }
188     }
189 
190     // However, only the retransmission is prioritized.
191     // Though this might contain some retransmitted data, as other frames might push
192     // the retransmitted data into a second packet, it will also contain data from the
193     // normal priority stream.
194     let dgram = client.process_output(now).dgram();
195     server.process_input(dgram.unwrap(), now);
196     assert!(server.events().any(
197         |e| matches!(e, ConnectionEvent::RecvStreamReadable { stream_id } if stream_id == id_normal),
198     ));
199 }
200 
201 #[test]
critical()202 fn critical() {
203     let mut client = default_client();
204     let mut server = default_server();
205     let now = now();
206 
207     // Rather than connect, send stream data in 0.5-RTT.
208     // That allows this to test that critical streams pre-empt most frame types.
209     let dgram = client.process_output(now).dgram();
210     let dgram = server.process(dgram, now).dgram();
211     client.process_input(dgram.unwrap(), now);
212     maybe_authenticate(&mut client);
213 
214     let id = server.stream_create(StreamType::UniDi).unwrap();
215     server
216         .stream_priority(
217             id,
218             TransmissionPriority::Critical,
219             RetransmissionPriority::default(),
220         )
221         .unwrap();
222 
223     // Can't use fill_cwnd here because the server is blocked on the amplification
224     // limit, so it can't fill the congestion window.
225     while server.stream_create(StreamType::UniDi).is_ok() {}
226 
227     fill_stream(&mut server, id);
228     let stats_before = server.stats().frame_tx;
229     let dgram = server.process_output(now).dgram();
230     let stats_after = server.stats().frame_tx;
231     assert_eq!(stats_after.crypto, stats_before.crypto);
232     assert_eq!(stats_after.streams_blocked, 0);
233     assert_eq!(stats_after.new_connection_id, 0);
234     assert_eq!(stats_after.new_token, 0);
235     assert_eq!(stats_after.handshake_done, 0);
236 
237     // Complete the handshake.
238     let dgram = client.process(dgram, now).dgram();
239     server.process_input(dgram.unwrap(), now);
240 
241     // Critical beats everything but HANDSHAKE_DONE.
242     let stats_before = server.stats().frame_tx;
243     mem::drop(fill_cwnd(&mut server, id, now));
244     let stats_after = server.stats().frame_tx;
245     assert_eq!(stats_after.crypto, stats_before.crypto);
246     assert_eq!(stats_after.streams_blocked, 0);
247     assert_eq!(stats_after.new_connection_id, 0);
248     assert_eq!(stats_after.new_token, 0);
249     assert_eq!(stats_after.handshake_done, 1);
250 }
251 
252 #[test]
important()253 fn important() {
254     let mut client = default_client();
255     let mut server = default_server();
256     let now = now();
257 
258     // Rather than connect, send stream data in 0.5-RTT.
259     // That allows this to test that important streams pre-empt most frame types.
260     let dgram = client.process_output(now).dgram();
261     let dgram = server.process(dgram, now).dgram();
262     client.process_input(dgram.unwrap(), now);
263     maybe_authenticate(&mut client);
264 
265     let id = server.stream_create(StreamType::UniDi).unwrap();
266     server
267         .stream_priority(
268             id,
269             TransmissionPriority::Important,
270             RetransmissionPriority::default(),
271         )
272         .unwrap();
273     fill_stream(&mut server, id);
274 
275     // Important beats everything but flow control.
276     // Make enough streams to get a STREAMS_BLOCKED frame out.
277     while server.stream_create(StreamType::UniDi).is_ok() {}
278 
279     let stats_before = server.stats().frame_tx;
280     let dgram = server.process_output(now).dgram();
281     let stats_after = server.stats().frame_tx;
282     assert_eq!(stats_after.crypto, stats_before.crypto);
283     assert_eq!(stats_after.streams_blocked, 1);
284     assert_eq!(stats_after.new_connection_id, 0);
285     assert_eq!(stats_after.new_token, 0);
286     assert_eq!(stats_after.handshake_done, 0);
287     assert_eq!(stats_after.stream, stats_before.stream + 1);
288 
289     // Complete the handshake.
290     let dgram = client.process(dgram, now).dgram();
291     server.process_input(dgram.unwrap(), now);
292 
293     // Important beats everything but flow control.
294     let stats_before = server.stats().frame_tx;
295     mem::drop(fill_cwnd(&mut server, id, now));
296     let stats_after = server.stats().frame_tx;
297     assert_eq!(stats_after.crypto, stats_before.crypto);
298     assert_eq!(stats_after.streams_blocked, 1);
299     assert_eq!(stats_after.new_connection_id, 0);
300     assert_eq!(stats_after.new_token, 0);
301     assert_eq!(stats_after.handshake_done, 1);
302     assert!(stats_after.stream > stats_before.stream);
303 }
304 
305 #[test]
high_normal()306 fn high_normal() {
307     let mut client = default_client();
308     let mut server = default_server();
309     let now = now();
310 
311     // Rather than connect, send stream data in 0.5-RTT.
312     // That allows this to test that important streams pre-empt most frame types.
313     let dgram = client.process_output(now).dgram();
314     let dgram = server.process(dgram, now).dgram();
315     client.process_input(dgram.unwrap(), now);
316     maybe_authenticate(&mut client);
317 
318     let id = server.stream_create(StreamType::UniDi).unwrap();
319     server
320         .stream_priority(
321             id,
322             TransmissionPriority::High,
323             RetransmissionPriority::default(),
324         )
325         .unwrap();
326     fill_stream(&mut server, id);
327 
328     // Important beats everything but flow control.
329     // Make enough streams to get a STREAMS_BLOCKED frame out.
330     while server.stream_create(StreamType::UniDi).is_ok() {}
331 
332     let stats_before = server.stats().frame_tx;
333     let dgram = server.process_output(now).dgram();
334     let stats_after = server.stats().frame_tx;
335     assert_eq!(stats_after.crypto, stats_before.crypto);
336     assert_eq!(stats_after.streams_blocked, 1);
337     assert_eq!(stats_after.new_connection_id, 0);
338     assert_eq!(stats_after.new_token, 0);
339     assert_eq!(stats_after.handshake_done, 0);
340     assert_eq!(stats_after.stream, stats_before.stream + 1);
341 
342     // Complete the handshake.
343     let dgram = client.process(dgram, now).dgram();
344     server.process_input(dgram.unwrap(), now);
345 
346     // High or Normal doesn't beat NEW_CONNECTION_ID,
347     // but they beat CRYPTO/NEW_TOKEN.
348     let stats_before = server.stats().frame_tx;
349     server.send_ticket(now, &[]).unwrap();
350     mem::drop(fill_cwnd(&mut server, id, now));
351     let stats_after = server.stats().frame_tx;
352     assert_eq!(stats_after.crypto, stats_before.crypto);
353     assert_eq!(stats_after.streams_blocked, 1);
354     assert_ne!(stats_after.new_connection_id, 0); // Note: > 0
355     assert_eq!(stats_after.new_token, 0);
356     assert_eq!(stats_after.handshake_done, 1);
357     assert!(stats_after.stream > stats_before.stream);
358 }
359 
360 #[test]
low()361 fn low() {
362     let mut client = default_client();
363     let mut server = default_server();
364     let now = now();
365     // Use address validation; note that we need to hold a strong reference
366     // as the server will only hold a weak reference.
367     let validation = Rc::new(RefCell::new(
368         AddressValidation::new(now, ValidateAddress::Never).unwrap(),
369     ));
370     server.set_validation(Rc::clone(&validation));
371     connect(&mut client, &mut server);
372 
373     let id = server.stream_create(StreamType::UniDi).unwrap();
374     server
375         .stream_priority(
376             id,
377             TransmissionPriority::Low,
378             RetransmissionPriority::default(),
379         )
380         .unwrap();
381     fill_stream(&mut server, id);
382 
383     // Send a session ticket and make it big enough to require a whole packet.
384     // The resulting CRYPTO frame beats out the stream data.
385     let stats_before = server.stats().frame_tx;
386     server.send_ticket(now, &[0; 2048]).unwrap();
387     mem::drop(server.process_output(now));
388     let stats_after = server.stats().frame_tx;
389     assert_eq!(stats_after.crypto, stats_before.crypto + 1);
390     assert_eq!(stats_after.stream, stats_before.stream);
391 
392     // The above can't test if NEW_TOKEN wins because once that fits in a packet,
393     // it is very hard to ensure that the STREAM frame won't also fit.
394     // However, we can ensure that the next packet doesn't consist of just STREAM.
395     let stats_before = server.stats().frame_tx;
396     mem::drop(server.process_output(now));
397     let stats_after = server.stats().frame_tx;
398     assert_eq!(stats_after.crypto, stats_before.crypto + 1);
399     assert_eq!(stats_after.new_token, 1);
400     assert_eq!(stats_after.stream, stats_before.stream + 1);
401 }
402