1 use super::*; 2 3 use std::task::{Context, Waker}; 4 use std::time::Instant; 5 use std::usize; 6 7 /// Tracks Stream related state 8 /// 9 /// # Reference counting 10 /// 11 /// There can be a number of outstanding handles to a single Stream. These are 12 /// tracked using reference counting. The `ref_count` field represents the 13 /// number of outstanding userspace handles that can reach this stream. 14 /// 15 /// It's important to note that when the stream is placed in an internal queue 16 /// (such as an accept queue), this is **not** tracked by a reference count. 17 /// Thus, `ref_count` can be zero and the stream still has to be kept around. 18 #[derive(Debug)] 19 pub(super) struct Stream { 20 /// The h2 stream identifier 21 pub id: StreamId, 22 23 /// Current state of the stream 24 pub state: State, 25 26 /// Set to `true` when the stream is counted against the connection's max 27 /// concurrent streams. 28 pub is_counted: bool, 29 30 /// Number of outstanding handles pointing to this stream 31 pub ref_count: usize, 32 33 // ===== Fields related to sending ===== 34 /// Next node in the accept linked list 35 pub next_pending_send: Option<store::Key>, 36 37 /// Set to true when the stream is pending accept 38 pub is_pending_send: bool, 39 40 /// Send data flow control 41 pub send_flow: FlowControl, 42 43 /// Amount of send capacity that has been requested, but not yet allocated. 44 pub requested_send_capacity: WindowSize, 45 46 /// Amount of data buffered at the prioritization layer. 47 /// TODO: Technically this could be greater than the window size... 48 pub buffered_send_data: usize, 49 50 /// Task tracking additional send capacity (i.e. window updates). 51 send_task: Option<Waker>, 52 53 /// Frames pending for this stream being sent to the socket 54 pub pending_send: buffer::Deque, 55 56 /// Next node in the linked list of streams waiting for additional 57 /// connection level capacity. 58 pub next_pending_send_capacity: Option<store::Key>, 59 60 /// True if the stream is waiting for outbound connection capacity 61 pub is_pending_send_capacity: bool, 62 63 /// Set to true when the send capacity has been incremented 64 pub send_capacity_inc: bool, 65 66 /// Next node in the open linked list 67 pub next_open: Option<store::Key>, 68 69 /// Set to true when the stream is pending to be opened 70 pub is_pending_open: bool, 71 72 /// Set to true when a push is pending for this stream 73 pub is_pending_push: bool, 74 75 // ===== Fields related to receiving ===== 76 /// Next node in the accept linked list 77 pub next_pending_accept: Option<store::Key>, 78 79 /// Set to true when the stream is pending accept 80 pub is_pending_accept: bool, 81 82 /// Receive data flow control 83 pub recv_flow: FlowControl, 84 85 pub in_flight_recv_data: WindowSize, 86 87 /// Next node in the linked list of streams waiting to send window updates. 88 pub next_window_update: Option<store::Key>, 89 90 /// True if the stream is waiting to send a window update 91 pub is_pending_window_update: bool, 92 93 /// The time when this stream may have been locally reset. 94 pub reset_at: Option<Instant>, 95 96 /// Next node in list of reset streams that should expire eventually 97 pub next_reset_expire: Option<store::Key>, 98 99 /// Frames pending for this stream to read 100 pub pending_recv: buffer::Deque, 101 102 /// Task tracking receiving frames 103 pub recv_task: Option<Waker>, 104 105 /// The stream's pending push promises 106 pub pending_push_promises: store::Queue<NextAccept>, 107 108 /// Validate content-length headers 109 pub content_length: ContentLength, 110 } 111 112 /// State related to validating a stream's content-length 113 #[derive(Debug)] 114 pub enum ContentLength { 115 Omitted, 116 Head, 117 Remaining(u64), 118 } 119 120 #[derive(Debug)] 121 pub(super) struct NextAccept; 122 123 #[derive(Debug)] 124 pub(super) struct NextSend; 125 126 #[derive(Debug)] 127 pub(super) struct NextSendCapacity; 128 129 #[derive(Debug)] 130 pub(super) struct NextWindowUpdate; 131 132 #[derive(Debug)] 133 pub(super) struct NextOpen; 134 135 #[derive(Debug)] 136 pub(super) struct NextResetExpire; 137 138 impl Stream { new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream139 pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { 140 let mut send_flow = FlowControl::new(); 141 let mut recv_flow = FlowControl::new(); 142 143 recv_flow 144 .inc_window(init_recv_window) 145 .expect("invalid initial receive window"); 146 recv_flow.assign_capacity(init_recv_window); 147 148 send_flow 149 .inc_window(init_send_window) 150 .expect("invalid initial send window size"); 151 152 Stream { 153 id, 154 state: State::default(), 155 ref_count: 0, 156 is_counted: false, 157 158 // ===== Fields related to sending ===== 159 next_pending_send: None, 160 is_pending_send: false, 161 send_flow, 162 requested_send_capacity: 0, 163 buffered_send_data: 0, 164 send_task: None, 165 pending_send: buffer::Deque::new(), 166 is_pending_send_capacity: false, 167 next_pending_send_capacity: None, 168 send_capacity_inc: false, 169 is_pending_open: false, 170 next_open: None, 171 is_pending_push: false, 172 173 // ===== Fields related to receiving ===== 174 next_pending_accept: None, 175 is_pending_accept: false, 176 recv_flow, 177 in_flight_recv_data: 0, 178 next_window_update: None, 179 is_pending_window_update: false, 180 reset_at: None, 181 next_reset_expire: None, 182 pending_recv: buffer::Deque::new(), 183 recv_task: None, 184 pending_push_promises: store::Queue::new(), 185 content_length: ContentLength::Omitted, 186 } 187 } 188 189 /// Increment the stream's ref count ref_inc(&mut self)190 pub fn ref_inc(&mut self) { 191 assert!(self.ref_count < usize::MAX); 192 self.ref_count += 1; 193 } 194 195 /// Decrements the stream's ref count ref_dec(&mut self)196 pub fn ref_dec(&mut self) { 197 assert!(self.ref_count > 0); 198 self.ref_count -= 1; 199 } 200 201 /// Returns true if stream is currently being held for some time because of 202 /// a local reset. is_pending_reset_expiration(&self) -> bool203 pub fn is_pending_reset_expiration(&self) -> bool { 204 self.reset_at.is_some() 205 } 206 207 /// Returns true if frames for this stream are ready to be sent over the wire is_send_ready(&self) -> bool208 pub fn is_send_ready(&self) -> bool { 209 // Why do we check pending_open? 210 // 211 // We allow users to call send_request() which schedules a stream to be pending_open 212 // if there is no room according to the concurrency limit (max_send_streams), and we 213 // also allow data to be buffered for send with send_data() if there is no capacity for 214 // the stream to send the data, which attempts to place the stream in pending_send. 215 // If the stream is not open, we don't want the stream to be scheduled for 216 // execution (pending_send). Note that if the stream is in pending_open, it will be 217 // pushed to pending_send when there is room for an open stream. 218 // 219 // In pending_push we track whether a PushPromise still needs to be sent 220 // from a different stream before we can start sending frames on this one. 221 // This is different from the "open" check because reserved streams don't count 222 // toward the concurrency limit. 223 // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 224 !self.is_pending_open && !self.is_pending_push 225 } 226 227 /// Returns true if the stream is closed is_closed(&self) -> bool228 pub fn is_closed(&self) -> bool { 229 // The state has fully transitioned to closed. 230 self.state.is_closed() && 231 // Because outbound frames transition the stream state before being 232 // buffered, we have to ensure that all frames have been flushed. 233 self.pending_send.is_empty() && 234 // Sometimes large data frames are sent out in chunks. After a chunk 235 // of the frame is sent, the remainder is pushed back onto the send 236 // queue to be rescheduled. 237 // 238 // Checking for additional buffered data lets us catch this case. 239 self.buffered_send_data == 0 240 } 241 242 /// Returns true if the stream is no longer in use is_released(&self) -> bool243 pub fn is_released(&self) -> bool { 244 // The stream is closed and fully flushed 245 self.is_closed() && 246 // There are no more outstanding references to the stream 247 self.ref_count == 0 && 248 // The stream is not in any queue 249 !self.is_pending_send && !self.is_pending_send_capacity && 250 !self.is_pending_accept && !self.is_pending_window_update && 251 !self.is_pending_open && !self.reset_at.is_some() 252 } 253 254 /// Returns true when the consumer of the stream has dropped all handles 255 /// (indicating no further interest in the stream) and the stream state is 256 /// not actually closed. 257 /// 258 /// In this case, a reset should be sent. is_canceled_interest(&self) -> bool259 pub fn is_canceled_interest(&self) -> bool { 260 self.ref_count == 0 && !self.state.is_closed() 261 } 262 assign_capacity(&mut self, capacity: WindowSize)263 pub fn assign_capacity(&mut self, capacity: WindowSize) { 264 debug_assert!(capacity > 0); 265 self.send_capacity_inc = true; 266 self.send_flow.assign_capacity(capacity); 267 268 tracing::trace!( 269 " assigned capacity to stream; available={}; buffered={}; id={:?}", 270 self.send_flow.available(), 271 self.buffered_send_data, 272 self.id 273 ); 274 275 // Only notify if the capacity exceeds the amount of buffered data 276 if self.send_flow.available() > self.buffered_send_data { 277 tracing::trace!(" notifying task"); 278 self.notify_send(); 279 } 280 } 281 282 /// If the capacity was limited because of the max_send_buffer_size, 283 /// then consider waking the send task again... notify_if_can_buffer_more(&mut self)284 pub fn notify_if_can_buffer_more(&mut self) { 285 // Only notify if the capacity exceeds the amount of buffered data 286 if self.send_flow.available() > self.buffered_send_data { 287 self.send_capacity_inc = true; 288 tracing::trace!(" notifying task"); 289 self.notify_send(); 290 } 291 } 292 293 /// Returns `Err` when the decrement cannot be completed due to overflow. dec_content_length(&mut self, len: usize) -> Result<(), ()>294 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { 295 match self.content_length { 296 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { 297 Some(val) => *rem = val, 298 None => return Err(()), 299 }, 300 ContentLength::Head => { 301 if len != 0 { 302 return Err(()); 303 } 304 } 305 _ => {} 306 } 307 308 Ok(()) 309 } 310 ensure_content_length_zero(&self) -> Result<(), ()>311 pub fn ensure_content_length_zero(&self) -> Result<(), ()> { 312 match self.content_length { 313 ContentLength::Remaining(0) => Ok(()), 314 ContentLength::Remaining(_) => Err(()), 315 _ => Ok(()), 316 } 317 } 318 notify_send(&mut self)319 pub fn notify_send(&mut self) { 320 if let Some(task) = self.send_task.take() { 321 task.wake(); 322 } 323 } 324 wait_send(&mut self, cx: &Context)325 pub fn wait_send(&mut self, cx: &Context) { 326 self.send_task = Some(cx.waker().clone()); 327 } 328 notify_recv(&mut self)329 pub fn notify_recv(&mut self) { 330 if let Some(task) = self.recv_task.take() { 331 task.wake(); 332 } 333 } 334 } 335 336 impl store::Next for NextAccept { next(stream: &Stream) -> Option<store::Key>337 fn next(stream: &Stream) -> Option<store::Key> { 338 stream.next_pending_accept 339 } 340 set_next(stream: &mut Stream, key: Option<store::Key>)341 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 342 stream.next_pending_accept = key; 343 } 344 take_next(stream: &mut Stream) -> Option<store::Key>345 fn take_next(stream: &mut Stream) -> Option<store::Key> { 346 stream.next_pending_accept.take() 347 } 348 is_queued(stream: &Stream) -> bool349 fn is_queued(stream: &Stream) -> bool { 350 stream.is_pending_accept 351 } 352 set_queued(stream: &mut Stream, val: bool)353 fn set_queued(stream: &mut Stream, val: bool) { 354 stream.is_pending_accept = val; 355 } 356 } 357 358 impl store::Next for NextSend { next(stream: &Stream) -> Option<store::Key>359 fn next(stream: &Stream) -> Option<store::Key> { 360 stream.next_pending_send 361 } 362 set_next(stream: &mut Stream, key: Option<store::Key>)363 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 364 stream.next_pending_send = key; 365 } 366 take_next(stream: &mut Stream) -> Option<store::Key>367 fn take_next(stream: &mut Stream) -> Option<store::Key> { 368 stream.next_pending_send.take() 369 } 370 is_queued(stream: &Stream) -> bool371 fn is_queued(stream: &Stream) -> bool { 372 stream.is_pending_send 373 } 374 set_queued(stream: &mut Stream, val: bool)375 fn set_queued(stream: &mut Stream, val: bool) { 376 if val { 377 // ensure that stream is not queued for being opened 378 // if it's being put into queue for sending data 379 debug_assert_eq!(stream.is_pending_open, false); 380 } 381 stream.is_pending_send = val; 382 } 383 } 384 385 impl store::Next for NextSendCapacity { next(stream: &Stream) -> Option<store::Key>386 fn next(stream: &Stream) -> Option<store::Key> { 387 stream.next_pending_send_capacity 388 } 389 set_next(stream: &mut Stream, key: Option<store::Key>)390 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 391 stream.next_pending_send_capacity = key; 392 } 393 take_next(stream: &mut Stream) -> Option<store::Key>394 fn take_next(stream: &mut Stream) -> Option<store::Key> { 395 stream.next_pending_send_capacity.take() 396 } 397 is_queued(stream: &Stream) -> bool398 fn is_queued(stream: &Stream) -> bool { 399 stream.is_pending_send_capacity 400 } 401 set_queued(stream: &mut Stream, val: bool)402 fn set_queued(stream: &mut Stream, val: bool) { 403 stream.is_pending_send_capacity = val; 404 } 405 } 406 407 impl store::Next for NextWindowUpdate { next(stream: &Stream) -> Option<store::Key>408 fn next(stream: &Stream) -> Option<store::Key> { 409 stream.next_window_update 410 } 411 set_next(stream: &mut Stream, key: Option<store::Key>)412 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 413 stream.next_window_update = key; 414 } 415 take_next(stream: &mut Stream) -> Option<store::Key>416 fn take_next(stream: &mut Stream) -> Option<store::Key> { 417 stream.next_window_update.take() 418 } 419 is_queued(stream: &Stream) -> bool420 fn is_queued(stream: &Stream) -> bool { 421 stream.is_pending_window_update 422 } 423 set_queued(stream: &mut Stream, val: bool)424 fn set_queued(stream: &mut Stream, val: bool) { 425 stream.is_pending_window_update = val; 426 } 427 } 428 429 impl store::Next for NextOpen { next(stream: &Stream) -> Option<store::Key>430 fn next(stream: &Stream) -> Option<store::Key> { 431 stream.next_open 432 } 433 set_next(stream: &mut Stream, key: Option<store::Key>)434 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 435 stream.next_open = key; 436 } 437 take_next(stream: &mut Stream) -> Option<store::Key>438 fn take_next(stream: &mut Stream) -> Option<store::Key> { 439 stream.next_open.take() 440 } 441 is_queued(stream: &Stream) -> bool442 fn is_queued(stream: &Stream) -> bool { 443 stream.is_pending_open 444 } 445 set_queued(stream: &mut Stream, val: bool)446 fn set_queued(stream: &mut Stream, val: bool) { 447 if val { 448 // ensure that stream is not queued for being sent 449 // if it's being put into queue for opening the stream 450 debug_assert_eq!(stream.is_pending_send, false); 451 } 452 stream.is_pending_open = val; 453 } 454 } 455 456 impl store::Next for NextResetExpire { next(stream: &Stream) -> Option<store::Key>457 fn next(stream: &Stream) -> Option<store::Key> { 458 stream.next_reset_expire 459 } 460 set_next(stream: &mut Stream, key: Option<store::Key>)461 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 462 stream.next_reset_expire = key; 463 } 464 take_next(stream: &mut Stream) -> Option<store::Key>465 fn take_next(stream: &mut Stream) -> Option<store::Key> { 466 stream.next_reset_expire.take() 467 } 468 is_queued(stream: &Stream) -> bool469 fn is_queued(stream: &Stream) -> bool { 470 stream.reset_at.is_some() 471 } 472 set_queued(stream: &mut Stream, val: bool)473 fn set_queued(stream: &mut Stream, val: bool) { 474 if val { 475 stream.reset_at = Some(Instant::now()); 476 } else { 477 stream.reset_at = None; 478 } 479 } 480 } 481 482 // ===== impl ContentLength ===== 483 484 impl ContentLength { is_head(&self) -> bool485 pub fn is_head(&self) -> bool { 486 match *self { 487 ContentLength::Head => true, 488 _ => false, 489 } 490 } 491 } 492