1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6
7 use super::super::{Connection, Error, Output};
8 use super::{connect, default_client, default_server, fill_cwnd, maybe_authenticate};
9 use crate::addr_valid::{AddressValidation, ValidateAddress};
10 use crate::send_stream::{RetransmissionPriority, TransmissionPriority};
11 use crate::{ConnectionEvent, StreamType};
12
13 use neqo_common::event::Provider;
14 use std::cell::RefCell;
15 use std::mem;
16 use std::rc::Rc;
17 use test_fixture::{self, now};
18
19 const BLOCK_SIZE: usize = 4_096;
20
fill_stream(c: &mut Connection, id: u64)21 fn fill_stream(c: &mut Connection, id: u64) {
22 loop {
23 if c.stream_send(id, &[0x42; BLOCK_SIZE]).unwrap() < BLOCK_SIZE {
24 return;
25 }
26 }
27 }
28
/// A receive stream cannot be prioritized (yet): asking for a priority on an
/// inbound unidirectional stream yields `Error::InvalidStreamId`.
#[test]
fn receive_stream() {
    const MESSAGE: &[u8] = b"hello";
    let mut client = default_client();
    let mut server = default_server();
    connect(&mut client, &mut server);

    // The client opens a unidirectional stream and writes the whole message.
    let id = client.stream_create(StreamType::UniDi).unwrap();
    let written = client.stream_send(id, MESSAGE).unwrap();
    assert_eq!(MESSAGE.len(), written);

    let dgram = client.process_output(now()).dgram().unwrap();
    server.process_input(dgram, now());

    // From the server's side this stream is receive-only.
    let outcome = server.stream_priority(
        id,
        TransmissionPriority::default(),
        RetransmissionPriority::default(),
    );
    assert_eq!(
        outcome.unwrap_err(),
        Error::InvalidStreamId,
        "Priority doesn't apply to inbound unidirectional streams"
    );

    // The stream itself is still there and its data can be read.
    let mut buf = [0; 10];
    let (count, fin) = server.stream_recv(id, &mut buf).unwrap();
    assert_eq!(MESSAGE, &buf[..count]);
    assert!(!fin);
}
60
/// A higher priority stream is sent ahead of a lower priority one, even if
/// the higher priority stream was created and written to later.
#[test]
fn relative() {
    let mut client = default_client();
    let mut server = default_server();
    connect(&mut client, &mut server);

    // The normal priority stream comes first in creation order...
    let id_normal = client.stream_create(StreamType::UniDi).unwrap();
    fill_stream(&mut client, id_normal);
    // ...and the stream that is about to be raised to high priority second.
    let id_high = client.stream_create(StreamType::UniDi).unwrap();
    fill_stream(&mut client, id_high);
    let retx = RetransmissionPriority::default();
    client
        .stream_priority(id_high, TransmissionPriority::High, retx)
        .unwrap();

    let dgram = client.process_output(now()).dgram().unwrap();
    server.process_input(dgram, now());

    // The "id_normal" stream will get a `NewStream` event, but no data.
    let readable = server.events().filter_map(|e| match e {
        ConnectionEvent::RecvStreamReadable { stream_id } => Some(stream_id),
        _ => None,
    });
    for stream_id in readable {
        assert_ne!(stream_id, id_normal);
    }
}
92
/// A change of priority takes effect for the very next packet that is sent.
#[test]
fn reprioritize() {
    let mut client = default_client();
    let mut server = default_server();
    connect(&mut client, &mut server);

    // Two full streams: the first one created stays at normal priority,
    // the second one is raised to high priority.
    let id_normal = client.stream_create(StreamType::UniDi).unwrap();
    fill_stream(&mut client, id_normal);
    let id_high = client.stream_create(StreamType::UniDi).unwrap();
    fill_stream(&mut client, id_high);
    let retx = RetransmissionPriority::default();
    client
        .stream_priority(id_high, TransmissionPriority::High, retx)
        .unwrap();

    let first = client.process_output(now()).dgram().unwrap();
    server.process_input(first, now());

    // The "id_normal" stream will get a `NewStream` event, but no data.
    server
        .events()
        .filter_map(|e| match e {
            ConnectionEvent::RecvStreamReadable { stream_id } => Some(stream_id),
            _ => None,
        })
        .for_each(|stream_id| assert_ne!(stream_id, id_normal));

    // Put the high priority stream back to normal. With both streams at the
    // same priority, the stream ID decides which one gets sent.
    let retx = RetransmissionPriority::default();
    client
        .stream_priority(id_high, TransmissionPriority::Normal, retx)
        .unwrap();
    let second = client.process_output(now()).dgram().unwrap();
    server.process_input(second, now());

    // This time nothing from the formerly high priority stream may show up.
    server
        .events()
        .filter_map(|e| match e {
            ConnectionEvent::RecvStreamReadable { stream_id } => Some(stream_id),
            _ => None,
        })
        .for_each(|stream_id| assert_ne!(stream_id, id_high));
}
141
/// Retransmission can be prioritized differently (usually higher) than the
/// first transmission of stream data.
#[test]
fn repairing_loss() {
    let mut client = default_client();
    let mut server = default_server();
    connect(&mut client, &mut server);
    let mut now = now();

    // A low priority stream whose retransmissions are given a higher priority.
    let id_low = client.stream_create(StreamType::UniDi).unwrap();
    fill_stream(&mut client, id_low);
    client
        .stream_priority(
            id_low,
            TransmissionPriority::Low,
            RetransmissionPriority::Higher,
        )
        .unwrap();

    // The first packet is never delivered; the next few outputs are either
    // delivered to the server or, for a callback, used to advance the clock.
    let _lost = client.process_output(now).dgram();
    for _ in 0..5 {
        match client.process_output(now) {
            Output::Datagram(pkt) => server.process_input(pkt, now),
            Output::Callback(wait) => now += wait,
            Output::None => unreachable!(),
        }
    }

    // Generate an ACK. The first packet is now considered lost.
    let ack = server.process_output(now).dgram();
    let _drained = server.events().count(); // Drain events.

    // New data on a second stream that is left at its default priority.
    let id_normal = client.stream_create(StreamType::UniDi).unwrap();
    fill_stream(&mut client, id_normal);

    let dgram = client.process(ack, now).dgram();
    assert_eq!(client.stats().lost, 1); // Client should have noticed the loss.
    server.process_input(dgram.unwrap(), now);

    // Only the low priority stream has data, as the retransmission of the data
    // from the lost packet is now more important than new data from the normal
    // priority stream.
    server
        .events()
        .inspect(|e| println!("Event: {:?}", e))
        .filter_map(|e| match e {
            ConnectionEvent::RecvStreamReadable { stream_id } => Some(stream_id),
            _ => None,
        })
        .for_each(|stream_id| assert_eq!(stream_id, id_low));

    // However, only the retransmission is prioritized.
    // Though this might contain some retransmitted data, as other frames might push
    // the retransmitted data into a second packet, it will also contain data from the
    // normal priority stream.
    let dgram = client.process_output(now).dgram();
    server.process_input(dgram.unwrap(), now);
    let saw_normal = server.events().any(
        |e| matches!(e, ConnectionEvent::RecvStreamReadable { stream_id } if stream_id == id_normal),
    );
    assert!(saw_normal);
}
200
/// Data on a `Critical` priority stream pre-empts the other frame types
/// counted here; only HANDSHAKE_DONE is still sent ahead of it.
#[test]
fn critical() {
    let mut client = default_client();
    let mut server = default_server();
    let now = now();

    // Rather than connect, send stream data in 0.5-RTT.
    // That allows this to test that critical streams pre-empt most frame types.
    let client_first = client.process_output(now).dgram();
    let server_first = server.process(client_first, now).dgram();
    client.process_input(server_first.unwrap(), now);
    maybe_authenticate(&mut client);

    let id = server.stream_create(StreamType::UniDi).unwrap();
    server
        .stream_priority(
            id,
            TransmissionPriority::Critical,
            RetransmissionPriority::default(),
        )
        .unwrap();

    // Can't use fill_cwnd here because the server is blocked on the amplification
    // limit, so it can't fill the congestion window.
    // Instead, create streams until creating another one fails.
    loop {
        if server.stream_create(StreamType::UniDi).is_err() {
            break;
        }
    }

    fill_stream(&mut server, id);
    let before = server.stats().frame_tx;
    let dgram = server.process_output(now).dgram();
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto);
    assert_eq!(after.streams_blocked, 0);
    assert_eq!(after.new_connection_id, 0);
    assert_eq!(after.new_token, 0);
    assert_eq!(after.handshake_done, 0);

    // Complete the handshake.
    let client_reply = client.process(dgram, now).dgram();
    server.process_input(client_reply.unwrap(), now);

    // Critical beats everything but HANDSHAKE_DONE.
    let before = server.stats().frame_tx;
    mem::drop(fill_cwnd(&mut server, id, now));
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto);
    assert_eq!(after.streams_blocked, 0);
    assert_eq!(after.new_connection_id, 0);
    assert_eq!(after.new_token, 0);
    assert_eq!(after.handshake_done, 1);
}
251
/// Data on an `Important` priority stream pre-empts most frame types, but
/// flow control frames (STREAMS_BLOCKED here) are still sent.
#[test]
fn important() {
    let mut client = default_client();
    let mut server = default_server();
    let now = now();

    // Rather than connect, send stream data in 0.5-RTT.
    // That allows this to test that important streams pre-empt most frame types.
    let client_first = client.process_output(now).dgram();
    let server_first = server.process(client_first, now).dgram();
    client.process_input(server_first.unwrap(), now);
    maybe_authenticate(&mut client);

    let id = server.stream_create(StreamType::UniDi).unwrap();
    server
        .stream_priority(
            id,
            TransmissionPriority::Important,
            RetransmissionPriority::default(),
        )
        .unwrap();
    fill_stream(&mut server, id);

    // Important beats everything but flow control.
    // Make enough streams to get a STREAMS_BLOCKED frame out.
    loop {
        if server.stream_create(StreamType::UniDi).is_err() {
            break;
        }
    }

    let before = server.stats().frame_tx;
    let dgram = server.process_output(now).dgram();
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto);
    assert_eq!(after.streams_blocked, 1);
    assert_eq!(after.new_connection_id, 0);
    assert_eq!(after.new_token, 0);
    assert_eq!(after.handshake_done, 0);
    assert_eq!(after.stream, before.stream + 1);

    // Complete the handshake.
    let client_reply = client.process(dgram, now).dgram();
    server.process_input(client_reply.unwrap(), now);

    // Important beats everything but flow control.
    let before = server.stats().frame_tx;
    mem::drop(fill_cwnd(&mut server, id, now));
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto);
    assert_eq!(after.streams_blocked, 1);
    assert_eq!(after.new_connection_id, 0);
    assert_eq!(after.new_token, 0);
    assert_eq!(after.handshake_done, 1);
    assert!(after.stream > before.stream);
}
304
/// Data on a `High` priority stream does not beat NEW_CONNECTION_ID, but is
/// sent ahead of CRYPTO and NEW_TOKEN frames.
#[test]
fn high_normal() {
    let mut client = default_client();
    let mut server = default_server();
    let now = now();

    // Rather than connect, send stream data in 0.5-RTT.
    // That allows this to test which frame types a high priority stream pre-empts.
    let client_first = client.process_output(now).dgram();
    let server_first = server.process(client_first, now).dgram();
    client.process_input(server_first.unwrap(), now);
    maybe_authenticate(&mut client);

    let id = server.stream_create(StreamType::UniDi).unwrap();
    server
        .stream_priority(
            id,
            TransmissionPriority::High,
            RetransmissionPriority::default(),
        )
        .unwrap();
    fill_stream(&mut server, id);

    // Make enough streams to get a STREAMS_BLOCKED frame out; that frame is
    // expected to be sent along with one STREAM frame.
    loop {
        if server.stream_create(StreamType::UniDi).is_err() {
            break;
        }
    }

    let before = server.stats().frame_tx;
    let dgram = server.process_output(now).dgram();
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto);
    assert_eq!(after.streams_blocked, 1);
    assert_eq!(after.new_connection_id, 0);
    assert_eq!(after.new_token, 0);
    assert_eq!(after.handshake_done, 0);
    assert_eq!(after.stream, before.stream + 1);

    // Complete the handshake.
    let client_reply = client.process(dgram, now).dgram();
    server.process_input(client_reply.unwrap(), now);

    // High or Normal doesn't beat NEW_CONNECTION_ID,
    // but they beat CRYPTO/NEW_TOKEN.
    let before = server.stats().frame_tx;
    server.send_ticket(now, &[]).unwrap();
    mem::drop(fill_cwnd(&mut server, id, now));
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto);
    assert_eq!(after.streams_blocked, 1);
    assert_ne!(after.new_connection_id, 0); // Note: > 0
    assert_eq!(after.new_token, 0);
    assert_eq!(after.handshake_done, 1);
    assert!(after.stream > before.stream);
}
359
/// Data on a `Low` priority stream is held back while a CRYPTO frame
/// carrying a session ticket is being sent.
#[test]
fn low() {
    let mut client = default_client();
    let mut server = default_server();
    let now = now();
    // Use address validation; note that we need to hold a strong reference
    // as the server will only hold a weak reference.
    let validation = AddressValidation::new(now, ValidateAddress::Never).unwrap();
    let validation = Rc::new(RefCell::new(validation));
    server.set_validation(Rc::clone(&validation));
    connect(&mut client, &mut server);

    let id = server.stream_create(StreamType::UniDi).unwrap();
    server
        .stream_priority(
            id,
            TransmissionPriority::Low,
            RetransmissionPriority::default(),
        )
        .unwrap();
    fill_stream(&mut server, id);

    // Send a session ticket and make it big enough to require a whole packet.
    // The resulting CRYPTO frame beats out the stream data.
    let before = server.stats().frame_tx;
    server.send_ticket(now, &[0; 2048]).unwrap();
    mem::drop(server.process_output(now));
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto + 1);
    assert_eq!(after.stream, before.stream);

    // The above can't test if NEW_TOKEN wins because once that fits in a packet,
    // it is very hard to ensure that the STREAM frame won't also fit.
    // However, we can ensure that the next packet doesn't consist of just STREAM.
    let before = server.stats().frame_tx;
    mem::drop(server.process_output(now));
    let after = server.stats().frame_tx;
    assert_eq!(after.crypto, before.crypto + 1);
    assert_eq!(after.new_token, 1);
    assert_eq!(after.stream, before.stream + 1);
}
402