1 use super::*;
2 
3 use std::time::Instant;
4 use std::usize;
5 
/// Tracks Stream related state
///
/// # Reference counting
///
/// There can be a number of outstanding handles to a single Stream. These are
/// tracked using reference counting. The `ref_count` field represents the
/// number of outstanding userspace handles that can reach this stream.
///
/// It's important to note that when the stream is placed in an internal queue
/// (such as an accept queue), this is **not** tracked by a reference count.
/// Thus, `ref_count` can be zero and the stream still has to be kept around.
///
/// # Queue membership
///
/// Each internal queue is represented by a pair of fields: a `next_*` link
/// holding the key of the following stream and a flag (or, for the reset
/// expiration queue, `reset_at`) recording membership. The `store::Next`
/// implementations in this file map each marker type to its pair.
#[derive(Debug)]
pub(super) struct Stream {
    /// The h2 stream identifier
    pub id: StreamId,

    /// Current state of the stream
    pub state: State,

    /// Set to `true` when the stream is counted against the connection's max
    /// concurrent streams.
    pub is_counted: bool,

    /// Number of outstanding handles pointing to this stream
    pub ref_count: usize,

    // ===== Fields related to sending =====
    /// Next node in the send linked list (see `NextSend`)
    pub next_pending_send: Option<store::Key>,

    /// Set to true when the stream is queued in the send linked list
    pub is_pending_send: bool,

    /// Send data flow control
    pub send_flow: FlowControl,

    /// Amount of send capacity that has been requested, but not yet allocated.
    pub requested_send_capacity: WindowSize,

    /// Amount of data buffered at the prioritization layer.
    /// TODO: Technically this could be greater than the window size...
    pub buffered_send_data: WindowSize,

    /// Task tracking additional send capacity (i.e. window updates).
    ///
    /// Stored by `wait_send` and consumed by `notify_send`.
    send_task: Option<task::Task>,

    /// Frames pending for this stream being sent to the socket
    pub pending_send: buffer::Deque,

    /// Next node in the linked list of streams waiting for additional
    /// connection level capacity.
    pub next_pending_send_capacity: Option<store::Key>,

    /// True if the stream is waiting for outbound connection capacity
    pub is_pending_send_capacity: bool,

    /// Set to true when the send capacity has been incremented
    /// (done by `assign_capacity`).
    pub send_capacity_inc: bool,

    /// Next node in the open linked list
    pub next_open: Option<store::Key>,

    /// Set to true when the stream is pending to be opened
    pub is_pending_open: bool,

    // ===== Fields related to receiving =====
    /// Next node in the accept linked list
    pub next_pending_accept: Option<store::Key>,

    /// Set to true when the stream is pending accept
    pub is_pending_accept: bool,

    /// Receive data flow control
    pub recv_flow: FlowControl,

    /// Starts at zero in `Stream::new` and is not otherwise touched in this
    /// file.
    ///
    /// NOTE(review): presumably the amount of received data that is still
    /// "in flight" with respect to receive flow control -- confirm against
    /// the receive logic.
    pub in_flight_recv_data: WindowSize,

    /// Next node in the linked list of streams waiting to send window updates.
    pub next_window_update: Option<store::Key>,

    /// True if the stream is waiting to send a window update
    pub is_pending_window_update: bool,

    /// The time when this stream may have been locally reset.
    ///
    /// `Some` doubles as the membership flag of the reset expiration queue:
    /// `NextResetExpire::set_queued` stores `Instant::now()` here or clears
    /// it back to `None`.
    pub reset_at: Option<Instant>,

    /// Next node in list of reset streams that should expire eventually
    pub next_reset_expire: Option<store::Key>,

    /// Frames pending for this stream to read
    pub pending_recv: buffer::Deque,

    /// Task tracking receiving frames
    pub recv_task: Option<task::Task>,

    /// The stream's pending push promises
    pub pending_push_promises: store::Queue<NextAccept>,

    /// Validate content-length headers
    pub content_length: ContentLength,
}
107 
/// State related to validating a stream's content-length
#[derive(Debug)]
pub enum ContentLength {
    /// The value every stream starts with (see `Stream::new`).
    ///
    /// `Stream::dec_content_length` and `Stream::ensure_content_length_zero`
    /// both succeed without checking anything for this variant.
    /// NOTE(review): presumably means no content-length header was supplied
    /// -- confirm against the code that sets this field.
    Omitted,
    /// `Stream::dec_content_length` always fails for this variant, while
    /// `Stream::ensure_content_length_zero` accepts it.
    ///
    /// NOTE(review): looks like this marks a response to a HEAD request,
    /// which must not carry body data -- confirm against the code that sets it.
    Head,
    /// The length that is still outstanding.
    ///
    /// `Stream::dec_content_length` subtracts from it (failing on underflow)
    /// and `Stream::ensure_content_length_zero` succeeds only once it is zero.
    Remaining(u64),
}
115 
/// Marker type for the pending-accept list.
///
/// Its `store::Next` implementation links streams through
/// `Stream::next_pending_accept` and uses `Stream::is_pending_accept` as the
/// membership flag. It is also the marker of `Stream::pending_push_promises`.
#[derive(Debug)]
pub(super) struct NextAccept;
118 
/// Marker type for the pending-send list.
///
/// Its `store::Next` implementation links streams through
/// `Stream::next_pending_send` and uses `Stream::is_pending_send` as the
/// membership flag.
#[derive(Debug)]
pub(super) struct NextSend;
121 
/// Marker type for the list of streams waiting for send capacity.
///
/// Its `store::Next` implementation links streams through
/// `Stream::next_pending_send_capacity` and uses
/// `Stream::is_pending_send_capacity` as the membership flag.
#[derive(Debug)]
pub(super) struct NextSendCapacity;
124 
/// Marker type for the list of streams waiting to send a window update.
///
/// Its `store::Next` implementation links streams through
/// `Stream::next_window_update` and uses `Stream::is_pending_window_update`
/// as the membership flag.
#[derive(Debug)]
pub(super) struct NextWindowUpdate;
127 
/// Marker type for the list of streams pending to be opened.
///
/// Its `store::Next` implementation links streams through
/// `Stream::next_open` and uses `Stream::is_pending_open` as the membership
/// flag.
#[derive(Debug)]
pub(super) struct NextOpen;
130 
/// Marker type for the list of reset streams awaiting expiration.
///
/// Its `store::Next` implementation links streams through
/// `Stream::next_reset_expire`; membership is derived from
/// `Stream::reset_at` being `Some` rather than from a dedicated flag.
#[derive(Debug)]
pub(super) struct NextResetExpire;
133 
134 impl Stream {
new( id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize, ) -> Stream135     pub fn new(
136         id: StreamId,
137         init_send_window: WindowSize,
138         init_recv_window: WindowSize,
139     ) -> Stream {
140         let mut send_flow = FlowControl::new();
141         let mut recv_flow = FlowControl::new();
142 
143         recv_flow
144             .inc_window(init_recv_window)
145             .ok()
146             .expect("invalid initial receive window");
147         recv_flow.assign_capacity(init_recv_window);
148 
149         send_flow
150             .inc_window(init_send_window)
151             .ok()
152             .expect("invalid initial send window size");
153 
154         Stream {
155             id,
156             state: State::default(),
157             ref_count: 0,
158             is_counted: false,
159 
160             // ===== Fields related to sending =====
161             next_pending_send: None,
162             is_pending_send: false,
163             send_flow: send_flow,
164             requested_send_capacity: 0,
165             buffered_send_data: 0,
166             send_task: None,
167             pending_send: buffer::Deque::new(),
168             is_pending_send_capacity: false,
169             next_pending_send_capacity: None,
170             send_capacity_inc: false,
171             is_pending_open: false,
172             next_open: None,
173 
174             // ===== Fields related to receiving =====
175             next_pending_accept: None,
176             is_pending_accept: false,
177             recv_flow: recv_flow,
178             in_flight_recv_data: 0,
179             next_window_update: None,
180             is_pending_window_update: false,
181             reset_at: None,
182             next_reset_expire: None,
183             pending_recv: buffer::Deque::new(),
184             recv_task: None,
185             pending_push_promises: store::Queue::new(),
186             content_length: ContentLength::Omitted,
187         }
188     }
189 
190     /// Increment the stream's ref count
ref_inc(&mut self)191     pub fn ref_inc(&mut self) {
192         assert!(self.ref_count < usize::MAX);
193         self.ref_count += 1;
194     }
195 
196     /// Decrements the stream's ref count
ref_dec(&mut self)197     pub fn ref_dec(&mut self) {
198         assert!(self.ref_count > 0);
199         self.ref_count -= 1;
200     }
201 
202     /// Returns true if stream is currently being held for some time because of
203     /// a local reset.
is_pending_reset_expiration(&self) -> bool204     pub fn is_pending_reset_expiration(&self) -> bool {
205         self.reset_at.is_some()
206     }
207 
208     /// Returns true if the stream is closed
is_closed(&self) -> bool209     pub fn is_closed(&self) -> bool {
210         // The state has fully transitioned to closed.
211         self.state.is_closed() &&
212             // Because outbound frames transition the stream state before being
213             // buffered, we have to ensure that all frames have been flushed.
214             self.pending_send.is_empty() &&
215             // Sometimes large data frames are sent out in chunks. After a chunk
216             // of the frame is sent, the remainder is pushed back onto the send
217             // queue to be rescheduled.
218             //
219             // Checking for additional buffered data lets us catch this case.
220             self.buffered_send_data == 0
221     }
222 
223     /// Returns true if the stream is no longer in use
is_released(&self) -> bool224     pub fn is_released(&self) -> bool {
225         // The stream is closed and fully flushed
226         self.is_closed() &&
227             // There are no more outstanding references to the stream
228             self.ref_count == 0 &&
229             // The stream is not in any queue
230             !self.is_pending_send && !self.is_pending_send_capacity &&
231             !self.is_pending_accept && !self.is_pending_window_update &&
232             !self.is_pending_open && !self.reset_at.is_some()
233     }
234 
235     /// Returns true when the consumer of the stream has dropped all handles
236     /// (indicating no further interest in the stream) and the stream state is
237     /// not actually closed.
238     ///
239     /// In this case, a reset should be sent.
is_canceled_interest(&self) -> bool240     pub fn is_canceled_interest(&self) -> bool {
241         self.ref_count == 0 && !self.state.is_closed()
242     }
243 
assign_capacity(&mut self, capacity: WindowSize)244     pub fn assign_capacity(&mut self, capacity: WindowSize) {
245         debug_assert!(capacity > 0);
246         self.send_capacity_inc = true;
247         self.send_flow.assign_capacity(capacity);
248 
249         trace!("  assigned capacity to stream; available={}; buffered={}; id={:?}",
250                self.send_flow.available(), self.buffered_send_data, self.id);
251 
252         // Only notify if the capacity exceeds the amount of buffered data
253         if self.send_flow.available() > self.buffered_send_data {
254             trace!("  notifying task");
255             self.notify_send();
256         }
257     }
258 
259     /// Returns `Err` when the decrement cannot be completed due to overflow.
dec_content_length(&mut self, len: usize) -> Result<(), ()>260     pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
261         match self.content_length {
262             ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
263                 Some(val) => *rem = val,
264                 None => return Err(()),
265             },
266             ContentLength::Head => return Err(()),
267             _ => {},
268         }
269 
270         Ok(())
271     }
272 
ensure_content_length_zero(&self) -> Result<(), ()>273     pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
274         match self.content_length {
275             ContentLength::Remaining(0) => Ok(()),
276             ContentLength::Remaining(_) => Err(()),
277             _ => Ok(()),
278         }
279     }
280 
notify_send(&mut self)281     pub fn notify_send(&mut self) {
282         if let Some(task) = self.send_task.take() {
283             task.notify();
284         }
285     }
286 
wait_send(&mut self)287     pub fn wait_send(&mut self) {
288         self.send_task = Some(task::current());
289     }
290 
notify_recv(&mut self)291     pub fn notify_recv(&mut self) {
292         if let Some(task) = self.recv_task.take() {
293             task.notify();
294         }
295     }
296 }
297 
/// Pending-accept list plumbing: the link lives in
/// `Stream::next_pending_accept` and the membership flag in
/// `Stream::is_pending_accept`.
impl store::Next for NextAccept {
    /// Returns the key currently stored in the stream's accept link.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_pending_accept
    }

    /// Overwrites the stream's accept link with `key`.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_pending_accept = key;
    }

    /// Removes and returns the stream's accept link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_pending_accept.take()
    }

    /// Reports whether the stream is flagged as pending accept.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_accept
    }

    /// Sets or clears the pending-accept flag.
    fn set_queued(stream: &mut Stream, val: bool) {
        stream.is_pending_accept = val;
    }
}
319 
/// Pending-send list plumbing: the link lives in `Stream::next_pending_send`
/// and the membership flag in `Stream::is_pending_send`.
impl store::Next for NextSend {
    /// Returns the key currently stored in the stream's send link.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_pending_send
    }

    /// Overwrites the stream's send link with `key`.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_pending_send = key;
    }

    /// Removes and returns the stream's send link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_pending_send.take()
    }

    /// Reports whether the stream is flagged as pending send.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_send
    }

    /// Sets or clears the pending-send flag.
    ///
    /// Debug builds assert that a stream being marked as queued is not also
    /// flagged as pending open.
    fn set_queued(stream: &mut Stream, val: bool) {
        if val {
            // ensure that stream is not queued for being opened
            // if it's being put into queue for sending data
            debug_assert_eq!(stream.is_pending_open, false);
        }
        stream.is_pending_send = val;
    }
}
346 
/// Send-capacity wait list plumbing: the link lives in
/// `Stream::next_pending_send_capacity` and the membership flag in
/// `Stream::is_pending_send_capacity`.
impl store::Next for NextSendCapacity {
    /// Returns the key currently stored in the stream's send-capacity link.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_pending_send_capacity
    }

    /// Overwrites the stream's send-capacity link with `key`.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_pending_send_capacity = key;
    }

    /// Removes and returns the stream's send-capacity link, leaving `None`
    /// behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_pending_send_capacity.take()
    }

    /// Reports whether the stream is flagged as waiting for send capacity.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_send_capacity
    }

    /// Sets or clears the waiting-for-send-capacity flag.
    fn set_queued(stream: &mut Stream, val: bool) {
        stream.is_pending_send_capacity = val;
    }
}
368 
/// Window-update list plumbing: the link lives in
/// `Stream::next_window_update` and the membership flag in
/// `Stream::is_pending_window_update`.
impl store::Next for NextWindowUpdate {
    /// Returns the key currently stored in the stream's window-update link.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_window_update
    }

    /// Overwrites the stream's window-update link with `key`.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_window_update = key;
    }

    /// Removes and returns the stream's window-update link, leaving `None`
    /// behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_window_update.take()
    }

    /// Reports whether the stream is flagged as pending a window update.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_window_update
    }

    /// Sets or clears the pending-window-update flag.
    fn set_queued(stream: &mut Stream, val: bool) {
        stream.is_pending_window_update = val;
    }
}
390 
/// Pending-open list plumbing: the link lives in `Stream::next_open` and the
/// membership flag in `Stream::is_pending_open`.
impl store::Next for NextOpen {
    /// Returns the key currently stored in the stream's open link.
    fn next(stream: &Stream) -> Option<store::Key> {
        stream.next_open
    }

    /// Overwrites the stream's open link with `key`.
    fn set_next(stream: &mut Stream, key: Option<store::Key>) {
        stream.next_open = key;
    }

    /// Removes and returns the stream's open link, leaving `None` behind.
    fn take_next(stream: &mut Stream) -> Option<store::Key> {
        stream.next_open.take()
    }

    /// Reports whether the stream is flagged as pending open.
    fn is_queued(stream: &Stream) -> bool {
        stream.is_pending_open
    }

    /// Sets or clears the pending-open flag.
    ///
    /// Debug builds assert that a stream being marked as queued is not also
    /// flagged as pending send.
    fn set_queued(stream: &mut Stream, val: bool) {
        if val {
            // ensure that stream is not queued for being sent
            // if it's being put into queue for opening the stream
            debug_assert_eq!(stream.is_pending_send, false);
        }
        stream.is_pending_open = val;
    }
}
417 
418 impl store::Next for NextResetExpire {
next(stream: &Stream) -> Option<store::Key>419     fn next(stream: &Stream) -> Option<store::Key> {
420         stream.next_reset_expire
421     }
422 
set_next(stream: &mut Stream, key: Option<store::Key>)423     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
424         stream.next_reset_expire = key;
425     }
426 
take_next(stream: &mut Stream) -> Option<store::Key>427     fn take_next(stream: &mut Stream) -> Option<store::Key> {
428         stream.next_reset_expire.take()
429     }
430 
is_queued(stream: &Stream) -> bool431     fn is_queued(stream: &Stream) -> bool {
432         stream.reset_at.is_some()
433     }
434 
set_queued(stream: &mut Stream, val: bool)435     fn set_queued(stream: &mut Stream, val: bool) {
436         if val {
437             stream.reset_at = Some(Instant::now());
438         } else {
439             stream.reset_at = None;
440         }
441     }
442 }
443 
444 // ===== impl ContentLength =====
445 
446 impl ContentLength {
is_head(&self) -> bool447     pub fn is_head(&self) -> bool {
448         match *self {
449             ContentLength::Head => true,
450             _ => false,
451         }
452     }
453 }
454