1 use super::*;
2 
3 use std::task::{Context, Waker};
4 use std::time::Instant;
5 use std::usize;
6 
/// Tracks Stream related state
///
/// # Reference counting
///
/// There can be a number of outstanding handles to a single Stream. These are
/// tracked using reference counting. The `ref_count` field represents the
/// number of outstanding userspace handles that can reach this stream.
///
/// It's important to note that when the stream is placed in an internal queue
/// (such as an accept queue), this is **not** tracked by a reference count.
/// Thus, `ref_count` can be zero and the stream still has to be kept around.
///
/// # Intrusive queues
///
/// Each `next_*` / `is_pending_*` field pair below is the link storage for one
/// intrusive list; the marker types in this module (`NextAccept`, `NextSend`,
/// ...) select which pair a given queue uses through their `store::Next` impl.
#[derive(Debug)]
pub(super) struct Stream {
    /// The h2 stream identifier
    pub id: StreamId,

    /// Current state of the stream
    pub state: State,

    /// Set to `true` when the stream is counted against the connection's max
    /// concurrent streams.
    pub is_counted: bool,

    /// Number of outstanding handles pointing to this stream
    pub ref_count: usize,

    // ===== Fields related to sending =====
    /// Next node in the pending-send linked list (see `NextSend`)
    pub next_pending_send: Option<store::Key>,

    /// Set to true when the stream is queued in the pending-send list
    pub is_pending_send: bool,

    /// Send data flow control
    pub send_flow: FlowControl,

    /// Amount of send capacity that has been requested, but not yet allocated.
    pub requested_send_capacity: WindowSize,

    /// Amount of data buffered at the prioritization layer.
    /// TODO: Technically this could be greater than the window size...
    pub buffered_send_data: usize,

    /// Task tracking additional send capacity (i.e. window updates).
    ///
    /// Registered by `wait_send` and consumed (woken) by `notify_send`.
    send_task: Option<Waker>,

    /// Frames pending for this stream being sent to the socket
    pub pending_send: buffer::Deque,

    /// Next node in the linked list of streams waiting for additional
    /// connection level capacity.
    pub next_pending_send_capacity: Option<store::Key>,

    /// True if the stream is waiting for outbound connection capacity
    pub is_pending_send_capacity: bool,

    /// Set to true when the send capacity has been incremented
    pub send_capacity_inc: bool,

    /// Next node in the open linked list
    pub next_open: Option<store::Key>,

    /// Set to true when the stream is pending to be opened
    pub is_pending_open: bool,

    /// Set to true when a push is pending for this stream
    pub is_pending_push: bool,

    // ===== Fields related to receiving =====
    /// Next node in the accept linked list
    pub next_pending_accept: Option<store::Key>,

    /// Set to true when the stream is pending accept
    pub is_pending_accept: bool,

    /// Receive data flow control
    pub recv_flow: FlowControl,

    /// NOTE(review): undocumented in the original. Presumably a count of
    /// received bytes that have not yet been released back to the peer; it is
    /// initialized to 0 in `Stream::new` — confirm against the receive path.
    pub in_flight_recv_data: WindowSize,

    /// Next node in the linked list of streams waiting to send window updates.
    pub next_window_update: Option<store::Key>,

    /// True if the stream is waiting to send a window update
    pub is_pending_window_update: bool,

    /// The time when this stream may have been locally reset.
    ///
    /// Doubles as the "is queued" flag of the reset-expire list: it is set to
    /// `Some(Instant::now())` when the stream is queued through
    /// `NextResetExpire` and cleared back to `None` when it is dequeued.
    pub reset_at: Option<Instant>,

    /// Next node in list of reset streams that should expire eventually
    pub next_reset_expire: Option<store::Key>,

    /// Frames pending for this stream to read
    pub pending_recv: buffer::Deque,

    /// Task tracking receiving frames
    ///
    /// Consumed (woken) by `notify_recv`.
    pub recv_task: Option<Waker>,

    /// The stream's pending push promises
    pub pending_push_promises: store::Queue<NextAccept>,

    /// Validate content-length headers
    pub content_length: ContentLength,
}
111 
/// State related to validating a stream's content-length
#[derive(Debug)]
pub enum ContentLength {
    /// No length is being tracked. `Stream::dec_content_length` and
    /// `Stream::ensure_content_length_zero` always succeed in this state.
    /// This is the value a freshly constructed `Stream` starts with.
    Omitted,
    /// No body bytes are permitted: `Stream::dec_content_length` rejects any
    /// non-zero length.
    /// NOTE(review): presumably set for responses to HEAD requests — confirm
    /// against the code that assigns this variant.
    Head,
    /// Number of body bytes still expected. Reduced by
    /// `Stream::dec_content_length` and required to be exactly zero by
    /// `Stream::ensure_content_length_zero`.
    Remaining(u64),
}
119 
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_accept` and `Stream::is_pending_accept`.
#[derive(Debug)]
pub(super) struct NextAccept;
122 
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_send` and `Stream::is_pending_send`.
#[derive(Debug)]
pub(super) struct NextSend;
125 
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_send_capacity` and
/// `Stream::is_pending_send_capacity`.
#[derive(Debug)]
pub(super) struct NextSendCapacity;
128 
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_window_update` and `Stream::is_pending_window_update`.
#[derive(Debug)]
pub(super) struct NextWindowUpdate;
131 
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_open` and `Stream::is_pending_open`.
#[derive(Debug)]
pub(super) struct NextOpen;
134 
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_reset_expire`; the queued state is derived from
/// `Stream::reset_at` being `Some`.
#[derive(Debug)]
pub(super) struct NextResetExpire;
137 
138 impl Stream {
new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream139     pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
140         let mut send_flow = FlowControl::new();
141         let mut recv_flow = FlowControl::new();
142 
143         recv_flow
144             .inc_window(init_recv_window)
145             .expect("invalid initial receive window");
146         recv_flow.assign_capacity(init_recv_window);
147 
148         send_flow
149             .inc_window(init_send_window)
150             .expect("invalid initial send window size");
151 
152         Stream {
153             id,
154             state: State::default(),
155             ref_count: 0,
156             is_counted: false,
157 
158             // ===== Fields related to sending =====
159             next_pending_send: None,
160             is_pending_send: false,
161             send_flow,
162             requested_send_capacity: 0,
163             buffered_send_data: 0,
164             send_task: None,
165             pending_send: buffer::Deque::new(),
166             is_pending_send_capacity: false,
167             next_pending_send_capacity: None,
168             send_capacity_inc: false,
169             is_pending_open: false,
170             next_open: None,
171             is_pending_push: false,
172 
173             // ===== Fields related to receiving =====
174             next_pending_accept: None,
175             is_pending_accept: false,
176             recv_flow,
177             in_flight_recv_data: 0,
178             next_window_update: None,
179             is_pending_window_update: false,
180             reset_at: None,
181             next_reset_expire: None,
182             pending_recv: buffer::Deque::new(),
183             recv_task: None,
184             pending_push_promises: store::Queue::new(),
185             content_length: ContentLength::Omitted,
186         }
187     }
188 
189     /// Increment the stream's ref count
ref_inc(&mut self)190     pub fn ref_inc(&mut self) {
191         assert!(self.ref_count < usize::MAX);
192         self.ref_count += 1;
193     }
194 
195     /// Decrements the stream's ref count
ref_dec(&mut self)196     pub fn ref_dec(&mut self) {
197         assert!(self.ref_count > 0);
198         self.ref_count -= 1;
199     }
200 
201     /// Returns true if stream is currently being held for some time because of
202     /// a local reset.
is_pending_reset_expiration(&self) -> bool203     pub fn is_pending_reset_expiration(&self) -> bool {
204         self.reset_at.is_some()
205     }
206 
207     /// Returns true if frames for this stream are ready to be sent over the wire
is_send_ready(&self) -> bool208     pub fn is_send_ready(&self) -> bool {
209         // Why do we check pending_open?
210         //
211         // We allow users to call send_request() which schedules a stream to be pending_open
212         // if there is no room according to the concurrency limit (max_send_streams), and we
213         // also allow data to be buffered for send with send_data() if there is no capacity for
214         // the stream to send the data, which attempts to place the stream in pending_send.
215         // If the stream is not open, we don't want the stream to be scheduled for
216         // execution (pending_send). Note that if the stream is in pending_open, it will be
217         // pushed to pending_send when there is room for an open stream.
218         //
219         // In pending_push we track whether a PushPromise still needs to be sent
220         // from a different stream before we can start sending frames on this one.
221         // This is different from the "open" check because reserved streams don't count
222         // toward the concurrency limit.
223         // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
224         !self.is_pending_open && !self.is_pending_push
225     }
226 
227     /// Returns true if the stream is closed
is_closed(&self) -> bool228     pub fn is_closed(&self) -> bool {
229         // The state has fully transitioned to closed.
230         self.state.is_closed() &&
231             // Because outbound frames transition the stream state before being
232             // buffered, we have to ensure that all frames have been flushed.
233             self.pending_send.is_empty() &&
234             // Sometimes large data frames are sent out in chunks. After a chunk
235             // of the frame is sent, the remainder is pushed back onto the send
236             // queue to be rescheduled.
237             //
238             // Checking for additional buffered data lets us catch this case.
239             self.buffered_send_data == 0
240     }
241 
242     /// Returns true if the stream is no longer in use
is_released(&self) -> bool243     pub fn is_released(&self) -> bool {
244         // The stream is closed and fully flushed
245         self.is_closed() &&
246             // There are no more outstanding references to the stream
247             self.ref_count == 0 &&
248             // The stream is not in any queue
249             !self.is_pending_send && !self.is_pending_send_capacity &&
250             !self.is_pending_accept && !self.is_pending_window_update &&
251             !self.is_pending_open && !self.reset_at.is_some()
252     }
253 
254     /// Returns true when the consumer of the stream has dropped all handles
255     /// (indicating no further interest in the stream) and the stream state is
256     /// not actually closed.
257     ///
258     /// In this case, a reset should be sent.
is_canceled_interest(&self) -> bool259     pub fn is_canceled_interest(&self) -> bool {
260         self.ref_count == 0 && !self.state.is_closed()
261     }
262 
assign_capacity(&mut self, capacity: WindowSize)263     pub fn assign_capacity(&mut self, capacity: WindowSize) {
264         debug_assert!(capacity > 0);
265         self.send_capacity_inc = true;
266         self.send_flow.assign_capacity(capacity);
267 
268         tracing::trace!(
269             "  assigned capacity to stream; available={}; buffered={}; id={:?}",
270             self.send_flow.available(),
271             self.buffered_send_data,
272             self.id
273         );
274 
275         // Only notify if the capacity exceeds the amount of buffered data
276         if self.send_flow.available() > self.buffered_send_data {
277             tracing::trace!("  notifying task");
278             self.notify_send();
279         }
280     }
281 
282     /// If the capacity was limited because of the max_send_buffer_size,
283     /// then consider waking the send task again...
notify_if_can_buffer_more(&mut self)284     pub fn notify_if_can_buffer_more(&mut self) {
285         // Only notify if the capacity exceeds the amount of buffered data
286         if self.send_flow.available() > self.buffered_send_data {
287             self.send_capacity_inc = true;
288             tracing::trace!("  notifying task");
289             self.notify_send();
290         }
291     }
292 
293     /// Returns `Err` when the decrement cannot be completed due to overflow.
dec_content_length(&mut self, len: usize) -> Result<(), ()>294     pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
295         match self.content_length {
296             ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
297                 Some(val) => *rem = val,
298                 None => return Err(()),
299             },
300             ContentLength::Head => {
301                 if len != 0 {
302                     return Err(());
303                 }
304             }
305             _ => {}
306         }
307 
308         Ok(())
309     }
310 
ensure_content_length_zero(&self) -> Result<(), ()>311     pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
312         match self.content_length {
313             ContentLength::Remaining(0) => Ok(()),
314             ContentLength::Remaining(_) => Err(()),
315             _ => Ok(()),
316         }
317     }
318 
notify_send(&mut self)319     pub fn notify_send(&mut self) {
320         if let Some(task) = self.send_task.take() {
321             task.wake();
322         }
323     }
324 
wait_send(&mut self, cx: &Context)325     pub fn wait_send(&mut self, cx: &Context) {
326         self.send_task = Some(cx.waker().clone());
327     }
328 
notify_recv(&mut self)329     pub fn notify_recv(&mut self) {
330         if let Some(task) = self.recv_task.take() {
331             task.wake();
332         }
333     }
334 }
335 
336 impl store::Next for NextAccept {
next(stream: &Stream) -> Option<store::Key>337     fn next(stream: &Stream) -> Option<store::Key> {
338         stream.next_pending_accept
339     }
340 
set_next(stream: &mut Stream, key: Option<store::Key>)341     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
342         stream.next_pending_accept = key;
343     }
344 
take_next(stream: &mut Stream) -> Option<store::Key>345     fn take_next(stream: &mut Stream) -> Option<store::Key> {
346         stream.next_pending_accept.take()
347     }
348 
is_queued(stream: &Stream) -> bool349     fn is_queued(stream: &Stream) -> bool {
350         stream.is_pending_accept
351     }
352 
set_queued(stream: &mut Stream, val: bool)353     fn set_queued(stream: &mut Stream, val: bool) {
354         stream.is_pending_accept = val;
355     }
356 }
357 
358 impl store::Next for NextSend {
next(stream: &Stream) -> Option<store::Key>359     fn next(stream: &Stream) -> Option<store::Key> {
360         stream.next_pending_send
361     }
362 
set_next(stream: &mut Stream, key: Option<store::Key>)363     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
364         stream.next_pending_send = key;
365     }
366 
take_next(stream: &mut Stream) -> Option<store::Key>367     fn take_next(stream: &mut Stream) -> Option<store::Key> {
368         stream.next_pending_send.take()
369     }
370 
is_queued(stream: &Stream) -> bool371     fn is_queued(stream: &Stream) -> bool {
372         stream.is_pending_send
373     }
374 
set_queued(stream: &mut Stream, val: bool)375     fn set_queued(stream: &mut Stream, val: bool) {
376         if val {
377             // ensure that stream is not queued for being opened
378             // if it's being put into queue for sending data
379             debug_assert_eq!(stream.is_pending_open, false);
380         }
381         stream.is_pending_send = val;
382     }
383 }
384 
385 impl store::Next for NextSendCapacity {
next(stream: &Stream) -> Option<store::Key>386     fn next(stream: &Stream) -> Option<store::Key> {
387         stream.next_pending_send_capacity
388     }
389 
set_next(stream: &mut Stream, key: Option<store::Key>)390     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
391         stream.next_pending_send_capacity = key;
392     }
393 
take_next(stream: &mut Stream) -> Option<store::Key>394     fn take_next(stream: &mut Stream) -> Option<store::Key> {
395         stream.next_pending_send_capacity.take()
396     }
397 
is_queued(stream: &Stream) -> bool398     fn is_queued(stream: &Stream) -> bool {
399         stream.is_pending_send_capacity
400     }
401 
set_queued(stream: &mut Stream, val: bool)402     fn set_queued(stream: &mut Stream, val: bool) {
403         stream.is_pending_send_capacity = val;
404     }
405 }
406 
407 impl store::Next for NextWindowUpdate {
next(stream: &Stream) -> Option<store::Key>408     fn next(stream: &Stream) -> Option<store::Key> {
409         stream.next_window_update
410     }
411 
set_next(stream: &mut Stream, key: Option<store::Key>)412     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
413         stream.next_window_update = key;
414     }
415 
take_next(stream: &mut Stream) -> Option<store::Key>416     fn take_next(stream: &mut Stream) -> Option<store::Key> {
417         stream.next_window_update.take()
418     }
419 
is_queued(stream: &Stream) -> bool420     fn is_queued(stream: &Stream) -> bool {
421         stream.is_pending_window_update
422     }
423 
set_queued(stream: &mut Stream, val: bool)424     fn set_queued(stream: &mut Stream, val: bool) {
425         stream.is_pending_window_update = val;
426     }
427 }
428 
429 impl store::Next for NextOpen {
next(stream: &Stream) -> Option<store::Key>430     fn next(stream: &Stream) -> Option<store::Key> {
431         stream.next_open
432     }
433 
set_next(stream: &mut Stream, key: Option<store::Key>)434     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
435         stream.next_open = key;
436     }
437 
take_next(stream: &mut Stream) -> Option<store::Key>438     fn take_next(stream: &mut Stream) -> Option<store::Key> {
439         stream.next_open.take()
440     }
441 
is_queued(stream: &Stream) -> bool442     fn is_queued(stream: &Stream) -> bool {
443         stream.is_pending_open
444     }
445 
set_queued(stream: &mut Stream, val: bool)446     fn set_queued(stream: &mut Stream, val: bool) {
447         if val {
448             // ensure that stream is not queued for being sent
449             // if it's being put into queue for opening the stream
450             debug_assert_eq!(stream.is_pending_send, false);
451         }
452         stream.is_pending_open = val;
453     }
454 }
455 
456 impl store::Next for NextResetExpire {
next(stream: &Stream) -> Option<store::Key>457     fn next(stream: &Stream) -> Option<store::Key> {
458         stream.next_reset_expire
459     }
460 
set_next(stream: &mut Stream, key: Option<store::Key>)461     fn set_next(stream: &mut Stream, key: Option<store::Key>) {
462         stream.next_reset_expire = key;
463     }
464 
take_next(stream: &mut Stream) -> Option<store::Key>465     fn take_next(stream: &mut Stream) -> Option<store::Key> {
466         stream.next_reset_expire.take()
467     }
468 
is_queued(stream: &Stream) -> bool469     fn is_queued(stream: &Stream) -> bool {
470         stream.reset_at.is_some()
471     }
472 
set_queued(stream: &mut Stream, val: bool)473     fn set_queued(stream: &mut Stream, val: bool) {
474         if val {
475             stream.reset_at = Some(Instant::now());
476         } else {
477             stream.reset_at = None;
478         }
479     }
480 }
481 
482 // ===== impl ContentLength =====
483 
484 impl ContentLength {
is_head(&self) -> bool485     pub fn is_head(&self) -> bool {
486         match *self {
487             ContentLength::Head => true,
488             _ => false,
489         }
490     }
491 }
492