1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
6 
7 #![allow(clippy::module_name_repetitions)]
8 
9 use super::Node;
10 use neqo_common::{qtrace, Datagram};
11 use neqo_transport::Output;
12 use std::cmp::max;
13 use std::collections::VecDeque;
14 use std::convert::TryFrom;
15 use std::fmt::{self, Debug};
16 use std::time::{Duration, Instant};
17 
18 /// One second in nanoseconds.
19 const ONE_SECOND_NS: u128 = 1_000_000_000;
20 
21 /// This models a link with a tail drop router at the front of it.
22 pub struct TailDrop {
23     /// An overhead associated with each entry.  This accounts for
24     /// layer 2, IP, and UDP overheads.
25     overhead: usize,
26     /// The rate at which bytes egress the link, in bytes per second.
27     rate: usize,
28     /// The depth of the queue, in bytes.
29     capacity: usize,
30 
31     /// A counter for how many bytes are enqueued.
32     used: usize,
33     /// A queue of unsent bytes.
34     queue: VecDeque<Datagram>,
35     /// The time that the next datagram can enter the link.
36     next_deque: Option<Instant>,
37 
38     /// Any sub-ns delay from the last enqueue.
39     sub_ns_delay: u32,
40     /// The time it takes a byte to exit the other end of the link.
41     delay: Duration,
42     /// The packets that are on the link and when they can be delivered.
43     on_link: VecDeque<(Instant, Datagram)>,
44 
45     /// The number of packets received.
46     received: usize,
47     /// The number of packets dropped.
48     dropped: usize,
49     /// The number of packets delivered.
50     delivered: usize,
51     /// The maximum amount of queue capacity ever used.
52     /// As packets leave the queue as soon as they start being used, this doesn't
53     /// count them.
54     maxq: usize,
55 }
56 
57 impl TailDrop {
58     /// Make a new taildrop node with the given rate, queue capacity, and link delay.
new(rate: usize, capacity: usize, delay: Duration) -> Self59     pub fn new(rate: usize, capacity: usize, delay: Duration) -> Self {
60         Self {
61             overhead: 64,
62             rate,
63             capacity,
64             used: 0,
65             queue: VecDeque::new(),
66             next_deque: None,
67             sub_ns_delay: 0,
68             delay,
69             on_link: VecDeque::new(),
70             received: 0,
71             dropped: 0,
72             delivered: 0,
73             maxq: 0,
74         }
75     }
76 
77     /// A tail drop queue on a 10Mbps link (approximated to 1 million bytes per second)
78     /// with a fat 32k buffer (about 30ms), and the default forward delay of 50ms.
dsl_uplink() -> Self79     pub fn dsl_uplink() -> Self {
80         TailDrop::new(1_000_000, 32_768, Duration::from_millis(50))
81     }
82 
83     /// Cut downlink to one fifth of the uplink (2Mbps), and reduce the buffer to 1/4.
dsl_downlink() -> Self84     pub fn dsl_downlink() -> Self {
85         TailDrop::new(200_000, 8_192, Duration::from_millis(50))
86     }
87 
88     /// How "big" is this datagram, accounting for overheads.
89     /// This approximates by using the same overhead for storing in the queue
90     /// and for sending on the wire.
size(&self, d: &Datagram) -> usize91     fn size(&self, d: &Datagram) -> usize {
92         d.len() + self.overhead
93     }
94 
95     /// Start sending a datagram.
send(&mut self, d: Datagram, now: Instant)96     fn send(&mut self, d: Datagram, now: Instant) {
97         // How many bytes are we "transmitting"?
98         let sz = u128::try_from(self.size(&d)).unwrap();
99 
100         // Calculate how long it takes to put the packet on the link.
101         // Perform the calculation based on 2^32 seconds and save any remainder.
102         // This ensures that high rates and small packets don't result in rounding
103         // down times too badly.
104         // Duration consists of a u64 and a u32, so we have 32 high bits to spare.
105         let t = sz * (ONE_SECOND_NS << 32) / u128::try_from(self.rate).unwrap()
106             + u128::from(self.sub_ns_delay);
107         let send_ns = u64::try_from(t >> 32).unwrap();
108         assert_ne!(send_ns, 0, "sending a packet takes <1ns");
109         self.sub_ns_delay = u32::try_from(t & u128::from(u32::MAX)).unwrap();
110         let deque_time = now + Duration::from_nanos(send_ns);
111         self.next_deque = Some(deque_time);
112 
113         // Now work out when the packet is fully received at the other end of
114         // the link. Setup to deliver the packet then.
115         let delivery_time = deque_time + self.delay;
116         self.on_link.push_back((delivery_time, d));
117     }
118 
119     /// Enqueue for sending.  Maybe.  If this overflows the queue, drop it instead.
maybe_enqueue(&mut self, d: Datagram, now: Instant)120     fn maybe_enqueue(&mut self, d: Datagram, now: Instant) {
121         self.received += 1;
122         if self.next_deque.is_none() {
123             // Nothing in the queue and nothing still sending.
124             debug_assert!(self.queue.is_empty());
125             self.send(d, now);
126         } else if self.used + self.size(&d) <= self.capacity {
127             self.used += self.size(&d);
128             self.maxq = max(self.maxq, self.used);
129             self.queue.push_back(d);
130         } else {
131             qtrace!("taildrop dropping {} bytes", d.len());
132             self.dropped += 1;
133         }
134     }
135 
136     /// If the last packet that was sending has been sent, start sending
137     /// the next one.
maybe_send(&mut self, now: Instant)138     fn maybe_send(&mut self, now: Instant) {
139         if self.next_deque.as_ref().map_or(false, |t| *t <= now) {
140             if let Some(d) = self.queue.pop_front() {
141                 self.used -= self.size(&d);
142                 self.send(d, now);
143             } else {
144                 self.next_deque = None;
145                 self.sub_ns_delay = 0;
146             }
147         }
148     }
149 }
150 
151 impl Node for TailDrop {
process(&mut self, d: Option<Datagram>, now: Instant) -> Output152     fn process(&mut self, d: Option<Datagram>, now: Instant) -> Output {
153         if let Some(dgram) = d {
154             self.maybe_enqueue(dgram, now);
155         }
156 
157         self.maybe_send(now);
158 
159         if let Some((t, _)) = self.on_link.front() {
160             if *t <= now {
161                 let (_, d) = self.on_link.pop_front().unwrap();
162                 self.delivered += 1;
163                 Output::Datagram(d)
164             } else {
165                 Output::Callback(*t - now)
166             }
167         } else {
168             Output::None
169         }
170     }
171 
print_summary(&self, test_name: &str)172     fn print_summary(&self, test_name: &str) {
173         println!(
174             "{}: taildrop: rx {} drop {} tx {} maxq {}",
175             test_name, self.received, self.dropped, self.delivered, self.maxq,
176         );
177     }
178 }
179 
180 impl Debug for TailDrop {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result181     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182         f.write_str("taildrop")
183     }
184 }
185