1 use super::*; 2 3 use std::time::Instant; 4 use std::usize; 5 6 /// Tracks Stream related state 7 /// 8 /// # Reference counting 9 /// 10 /// There can be a number of outstanding handles to a single Stream. These are 11 /// tracked using reference counting. The `ref_count` field represents the 12 /// number of outstanding userspace handles that can reach this stream. 13 /// 14 /// It's important to note that when the stream is placed in an internal queue 15 /// (such as an accept queue), this is **not** tracked by a reference count. 16 /// Thus, `ref_count` can be zero and the stream still has to be kept around. 17 #[derive(Debug)] 18 pub(super) struct Stream { 19 /// The h2 stream identifier 20 pub id: StreamId, 21 22 /// Current state of the stream 23 pub state: State, 24 25 /// Set to `true` when the stream is counted against the connection's max 26 /// concurrent streams. 27 pub is_counted: bool, 28 29 /// Number of outstanding handles pointing to this stream 30 pub ref_count: usize, 31 32 // ===== Fields related to sending ===== 33 /// Next node in the accept linked list 34 pub next_pending_send: Option<store::Key>, 35 36 /// Set to true when the stream is pending accept 37 pub is_pending_send: bool, 38 39 /// Send data flow control 40 pub send_flow: FlowControl, 41 42 /// Amount of send capacity that has been requested, but not yet allocated. 43 pub requested_send_capacity: WindowSize, 44 45 /// Amount of data buffered at the prioritization layer. 46 /// TODO: Technically this could be greater than the window size... 47 pub buffered_send_data: WindowSize, 48 49 /// Task tracking additional send capacity (i.e. window updates). 50 send_task: Option<task::Task>, 51 52 /// Frames pending for this stream being sent to the socket 53 pub pending_send: buffer::Deque, 54 55 /// Next node in the linked list of streams waiting for additional 56 /// connection level capacity. 57 pub next_pending_send_capacity: Option<store::Key>, 58 59 /// True if the stream is waiting for outbound connection capacity 60 pub is_pending_send_capacity: bool, 61 62 /// Set to true when the send capacity has been incremented 63 pub send_capacity_inc: bool, 64 65 /// Next node in the open linked list 66 pub next_open: Option<store::Key>, 67 68 /// Set to true when the stream is pending to be opened 69 pub is_pending_open: bool, 70 71 // ===== Fields related to receiving ===== 72 /// Next node in the accept linked list 73 pub next_pending_accept: Option<store::Key>, 74 75 /// Set to true when the stream is pending accept 76 pub is_pending_accept: bool, 77 78 /// Receive data flow control 79 pub recv_flow: FlowControl, 80 81 pub in_flight_recv_data: WindowSize, 82 83 /// Next node in the linked list of streams waiting to send window updates. 84 pub next_window_update: Option<store::Key>, 85 86 /// True if the stream is waiting to send a window update 87 pub is_pending_window_update: bool, 88 89 /// The time when this stream may have been locally reset. 90 pub reset_at: Option<Instant>, 91 92 /// Next node in list of reset streams that should expire eventually 93 pub next_reset_expire: Option<store::Key>, 94 95 /// Frames pending for this stream to read 96 pub pending_recv: buffer::Deque, 97 98 /// Task tracking receiving frames 99 pub recv_task: Option<task::Task>, 100 101 /// The stream's pending push promises 102 pub pending_push_promises: store::Queue<NextAccept>, 103 104 /// Validate content-length headers 105 pub content_length: ContentLength, 106 } 107 108 /// State related to validating a stream's content-length 109 #[derive(Debug)] 110 pub enum ContentLength { 111 Omitted, 112 Head, 113 Remaining(u64), 114 } 115 116 #[derive(Debug)] 117 pub(super) struct NextAccept; 118 119 #[derive(Debug)] 120 pub(super) struct NextSend; 121 122 #[derive(Debug)] 123 pub(super) struct NextSendCapacity; 124 125 #[derive(Debug)] 126 pub(super) struct NextWindowUpdate; 127 128 #[derive(Debug)] 129 pub(super) struct NextOpen; 130 131 #[derive(Debug)] 132 pub(super) struct NextResetExpire; 133 134 impl Stream { new( id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize, ) -> Stream135 pub fn new( 136 id: StreamId, 137 init_send_window: WindowSize, 138 init_recv_window: WindowSize, 139 ) -> Stream { 140 let mut send_flow = FlowControl::new(); 141 let mut recv_flow = FlowControl::new(); 142 143 recv_flow 144 .inc_window(init_recv_window) 145 .ok() 146 .expect("invalid initial receive window"); 147 recv_flow.assign_capacity(init_recv_window); 148 149 send_flow 150 .inc_window(init_send_window) 151 .ok() 152 .expect("invalid initial send window size"); 153 154 Stream { 155 id, 156 state: State::default(), 157 ref_count: 0, 158 is_counted: false, 159 160 // ===== Fields related to sending ===== 161 next_pending_send: None, 162 is_pending_send: false, 163 send_flow: send_flow, 164 requested_send_capacity: 0, 165 buffered_send_data: 0, 166 send_task: None, 167 pending_send: buffer::Deque::new(), 168 is_pending_send_capacity: false, 169 next_pending_send_capacity: None, 170 send_capacity_inc: false, 171 is_pending_open: false, 172 next_open: None, 173 174 // ===== Fields related to receiving ===== 175 next_pending_accept: None, 176 is_pending_accept: false, 177 recv_flow: recv_flow, 178 in_flight_recv_data: 0, 179 next_window_update: None, 180 is_pending_window_update: false, 181 reset_at: None, 182 next_reset_expire: None, 183 pending_recv: buffer::Deque::new(), 184 recv_task: None, 185 pending_push_promises: store::Queue::new(), 186 content_length: ContentLength::Omitted, 187 } 188 } 189 190 /// Increment the stream's ref count ref_inc(&mut self)191 pub fn ref_inc(&mut self) { 192 assert!(self.ref_count < usize::MAX); 193 self.ref_count += 1; 194 } 195 196 /// Decrements the stream's ref count ref_dec(&mut self)197 pub fn ref_dec(&mut self) { 198 assert!(self.ref_count > 0); 199 self.ref_count -= 1; 200 } 201 202 /// Returns true if stream is currently being held for some time because of 203 /// a local reset. is_pending_reset_expiration(&self) -> bool204 pub fn is_pending_reset_expiration(&self) -> bool { 205 self.reset_at.is_some() 206 } 207 208 /// Returns true if the stream is closed is_closed(&self) -> bool209 pub fn is_closed(&self) -> bool { 210 // The state has fully transitioned to closed. 211 self.state.is_closed() && 212 // Because outbound frames transition the stream state before being 213 // buffered, we have to ensure that all frames have been flushed. 214 self.pending_send.is_empty() && 215 // Sometimes large data frames are sent out in chunks. After a chunk 216 // of the frame is sent, the remainder is pushed back onto the send 217 // queue to be rescheduled. 218 // 219 // Checking for additional buffered data lets us catch this case. 220 self.buffered_send_data == 0 221 } 222 223 /// Returns true if the stream is no longer in use is_released(&self) -> bool224 pub fn is_released(&self) -> bool { 225 // The stream is closed and fully flushed 226 self.is_closed() && 227 // There are no more outstanding references to the stream 228 self.ref_count == 0 && 229 // The stream is not in any queue 230 !self.is_pending_send && !self.is_pending_send_capacity && 231 !self.is_pending_accept && !self.is_pending_window_update && 232 !self.is_pending_open && !self.reset_at.is_some() 233 } 234 235 /// Returns true when the consumer of the stream has dropped all handles 236 /// (indicating no further interest in the stream) and the stream state is 237 /// not actually closed. 238 /// 239 /// In this case, a reset should be sent. is_canceled_interest(&self) -> bool240 pub fn is_canceled_interest(&self) -> bool { 241 self.ref_count == 0 && !self.state.is_closed() 242 } 243 assign_capacity(&mut self, capacity: WindowSize)244 pub fn assign_capacity(&mut self, capacity: WindowSize) { 245 debug_assert!(capacity > 0); 246 self.send_capacity_inc = true; 247 self.send_flow.assign_capacity(capacity); 248 249 trace!(" assigned capacity to stream; available={}; buffered={}; id={:?}", 250 self.send_flow.available(), self.buffered_send_data, self.id); 251 252 // Only notify if the capacity exceeds the amount of buffered data 253 if self.send_flow.available() > self.buffered_send_data { 254 trace!(" notifying task"); 255 self.notify_send(); 256 } 257 } 258 259 /// Returns `Err` when the decrement cannot be completed due to overflow. dec_content_length(&mut self, len: usize) -> Result<(), ()>260 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { 261 match self.content_length { 262 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { 263 Some(val) => *rem = val, 264 None => return Err(()), 265 }, 266 ContentLength::Head => return Err(()), 267 _ => {}, 268 } 269 270 Ok(()) 271 } 272 ensure_content_length_zero(&self) -> Result<(), ()>273 pub fn ensure_content_length_zero(&self) -> Result<(), ()> { 274 match self.content_length { 275 ContentLength::Remaining(0) => Ok(()), 276 ContentLength::Remaining(_) => Err(()), 277 _ => Ok(()), 278 } 279 } 280 notify_send(&mut self)281 pub fn notify_send(&mut self) { 282 if let Some(task) = self.send_task.take() { 283 task.notify(); 284 } 285 } 286 wait_send(&mut self)287 pub fn wait_send(&mut self) { 288 self.send_task = Some(task::current()); 289 } 290 notify_recv(&mut self)291 pub fn notify_recv(&mut self) { 292 if let Some(task) = self.recv_task.take() { 293 task.notify(); 294 } 295 } 296 } 297 298 impl store::Next for NextAccept { next(stream: &Stream) -> Option<store::Key>299 fn next(stream: &Stream) -> Option<store::Key> { 300 stream.next_pending_accept 301 } 302 set_next(stream: &mut Stream, key: Option<store::Key>)303 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 304 stream.next_pending_accept = key; 305 } 306 take_next(stream: &mut Stream) -> Option<store::Key>307 fn take_next(stream: &mut Stream) -> Option<store::Key> { 308 stream.next_pending_accept.take() 309 } 310 is_queued(stream: &Stream) -> bool311 fn is_queued(stream: &Stream) -> bool { 312 stream.is_pending_accept 313 } 314 set_queued(stream: &mut Stream, val: bool)315 fn set_queued(stream: &mut Stream, val: bool) { 316 stream.is_pending_accept = val; 317 } 318 } 319 320 impl store::Next for NextSend { next(stream: &Stream) -> Option<store::Key>321 fn next(stream: &Stream) -> Option<store::Key> { 322 stream.next_pending_send 323 } 324 set_next(stream: &mut Stream, key: Option<store::Key>)325 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 326 stream.next_pending_send = key; 327 } 328 take_next(stream: &mut Stream) -> Option<store::Key>329 fn take_next(stream: &mut Stream) -> Option<store::Key> { 330 stream.next_pending_send.take() 331 } 332 is_queued(stream: &Stream) -> bool333 fn is_queued(stream: &Stream) -> bool { 334 stream.is_pending_send 335 } 336 set_queued(stream: &mut Stream, val: bool)337 fn set_queued(stream: &mut Stream, val: bool) { 338 if val { 339 // ensure that stream is not queued for being opened 340 // if it's being put into queue for sending data 341 debug_assert_eq!(stream.is_pending_open, false); 342 } 343 stream.is_pending_send = val; 344 } 345 } 346 347 impl store::Next for NextSendCapacity { next(stream: &Stream) -> Option<store::Key>348 fn next(stream: &Stream) -> Option<store::Key> { 349 stream.next_pending_send_capacity 350 } 351 set_next(stream: &mut Stream, key: Option<store::Key>)352 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 353 stream.next_pending_send_capacity = key; 354 } 355 take_next(stream: &mut Stream) -> Option<store::Key>356 fn take_next(stream: &mut Stream) -> Option<store::Key> { 357 stream.next_pending_send_capacity.take() 358 } 359 is_queued(stream: &Stream) -> bool360 fn is_queued(stream: &Stream) -> bool { 361 stream.is_pending_send_capacity 362 } 363 set_queued(stream: &mut Stream, val: bool)364 fn set_queued(stream: &mut Stream, val: bool) { 365 stream.is_pending_send_capacity = val; 366 } 367 } 368 369 impl store::Next for NextWindowUpdate { next(stream: &Stream) -> Option<store::Key>370 fn next(stream: &Stream) -> Option<store::Key> { 371 stream.next_window_update 372 } 373 set_next(stream: &mut Stream, key: Option<store::Key>)374 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 375 stream.next_window_update = key; 376 } 377 take_next(stream: &mut Stream) -> Option<store::Key>378 fn take_next(stream: &mut Stream) -> Option<store::Key> { 379 stream.next_window_update.take() 380 } 381 is_queued(stream: &Stream) -> bool382 fn is_queued(stream: &Stream) -> bool { 383 stream.is_pending_window_update 384 } 385 set_queued(stream: &mut Stream, val: bool)386 fn set_queued(stream: &mut Stream, val: bool) { 387 stream.is_pending_window_update = val; 388 } 389 } 390 391 impl store::Next for NextOpen { next(stream: &Stream) -> Option<store::Key>392 fn next(stream: &Stream) -> Option<store::Key> { 393 stream.next_open 394 } 395 set_next(stream: &mut Stream, key: Option<store::Key>)396 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 397 stream.next_open = key; 398 } 399 take_next(stream: &mut Stream) -> Option<store::Key>400 fn take_next(stream: &mut Stream) -> Option<store::Key> { 401 stream.next_open.take() 402 } 403 is_queued(stream: &Stream) -> bool404 fn is_queued(stream: &Stream) -> bool { 405 stream.is_pending_open 406 } 407 set_queued(stream: &mut Stream, val: bool)408 fn set_queued(stream: &mut Stream, val: bool) { 409 if val { 410 // ensure that stream is not queued for being sent 411 // if it's being put into queue for opening the stream 412 debug_assert_eq!(stream.is_pending_send, false); 413 } 414 stream.is_pending_open = val; 415 } 416 } 417 418 impl store::Next for NextResetExpire { next(stream: &Stream) -> Option<store::Key>419 fn next(stream: &Stream) -> Option<store::Key> { 420 stream.next_reset_expire 421 } 422 set_next(stream: &mut Stream, key: Option<store::Key>)423 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 424 stream.next_reset_expire = key; 425 } 426 take_next(stream: &mut Stream) -> Option<store::Key>427 fn take_next(stream: &mut Stream) -> Option<store::Key> { 428 stream.next_reset_expire.take() 429 } 430 is_queued(stream: &Stream) -> bool431 fn is_queued(stream: &Stream) -> bool { 432 stream.reset_at.is_some() 433 } 434 set_queued(stream: &mut Stream, val: bool)435 fn set_queued(stream: &mut Stream, val: bool) { 436 if val { 437 stream.reset_at = Some(Instant::now()); 438 } else { 439 stream.reset_at = None; 440 } 441 } 442 } 443 444 // ===== impl ContentLength ===== 445 446 impl ContentLength { is_head(&self) -> bool447 pub fn is_head(&self) -> bool { 448 match *self { 449 ContentLength::Head => true, 450 _ => false, 451 } 452 } 453 } 454