1 use super::*; 2 3 use std::task::{Context, Waker}; 4 use std::time::Instant; 5 use std::usize; 6 7 /// Tracks Stream related state 8 /// 9 /// # Reference counting 10 /// 11 /// There can be a number of outstanding handles to a single Stream. These are 12 /// tracked using reference counting. The `ref_count` field represents the 13 /// number of outstanding userspace handles that can reach this stream. 14 /// 15 /// It's important to note that when the stream is placed in an internal queue 16 /// (such as an accept queue), this is **not** tracked by a reference count. 17 /// Thus, `ref_count` can be zero and the stream still has to be kept around. 18 #[derive(Debug)] 19 pub(super) struct Stream { 20 /// The h2 stream identifier 21 pub id: StreamId, 22 23 /// Current state of the stream 24 pub state: State, 25 26 /// Set to `true` when the stream is counted against the connection's max 27 /// concurrent streams. 28 pub is_counted: bool, 29 30 /// Number of outstanding handles pointing to this stream 31 pub ref_count: usize, 32 33 // ===== Fields related to sending ===== 34 /// Next node in the accept linked list 35 pub next_pending_send: Option<store::Key>, 36 37 /// Set to true when the stream is pending accept 38 pub is_pending_send: bool, 39 40 /// Send data flow control 41 pub send_flow: FlowControl, 42 43 /// Amount of send capacity that has been requested, but not yet allocated. 44 pub requested_send_capacity: WindowSize, 45 46 /// Amount of data buffered at the prioritization layer. 47 /// TODO: Technically this could be greater than the window size... 48 pub buffered_send_data: usize, 49 50 /// Task tracking additional send capacity (i.e. window updates). 51 send_task: Option<Waker>, 52 53 /// Frames pending for this stream being sent to the socket 54 pub pending_send: buffer::Deque, 55 56 /// Next node in the linked list of streams waiting for additional 57 /// connection level capacity. 58 pub next_pending_send_capacity: Option<store::Key>, 59 60 /// True if the stream is waiting for outbound connection capacity 61 pub is_pending_send_capacity: bool, 62 63 /// Set to true when the send capacity has been incremented 64 pub send_capacity_inc: bool, 65 66 /// Next node in the open linked list 67 pub next_open: Option<store::Key>, 68 69 /// Set to true when the stream is pending to be opened 70 pub is_pending_open: bool, 71 72 /// Set to true when a push is pending for this stream 73 pub is_pending_push: bool, 74 75 // ===== Fields related to receiving ===== 76 /// Next node in the accept linked list 77 pub next_pending_accept: Option<store::Key>, 78 79 /// Set to true when the stream is pending accept 80 pub is_pending_accept: bool, 81 82 /// Receive data flow control 83 pub recv_flow: FlowControl, 84 85 pub in_flight_recv_data: WindowSize, 86 87 /// Next node in the linked list of streams waiting to send window updates. 88 pub next_window_update: Option<store::Key>, 89 90 /// True if the stream is waiting to send a window update 91 pub is_pending_window_update: bool, 92 93 /// The time when this stream may have been locally reset. 94 pub reset_at: Option<Instant>, 95 96 /// Next node in list of reset streams that should expire eventually 97 pub next_reset_expire: Option<store::Key>, 98 99 /// Frames pending for this stream to read 100 pub pending_recv: buffer::Deque, 101 102 /// Task tracking receiving frames 103 pub recv_task: Option<Waker>, 104 105 /// The stream's pending push promises 106 pub pending_push_promises: store::Queue<NextAccept>, 107 108 /// Validate content-length headers 109 pub content_length: ContentLength, 110 } 111 112 /// State related to validating a stream's content-length 113 #[derive(Debug)] 114 pub enum ContentLength { 115 Omitted, 116 Head, 117 Remaining(u64), 118 } 119 120 #[derive(Debug)] 121 pub(super) struct NextAccept; 122 123 #[derive(Debug)] 124 pub(super) struct NextSend; 125 126 #[derive(Debug)] 127 pub(super) struct NextSendCapacity; 128 129 #[derive(Debug)] 130 pub(super) struct NextWindowUpdate; 131 132 #[derive(Debug)] 133 pub(super) struct NextOpen; 134 135 #[derive(Debug)] 136 pub(super) struct NextResetExpire; 137 138 impl Stream { new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream139 pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { 140 let mut send_flow = FlowControl::new(); 141 let mut recv_flow = FlowControl::new(); 142 143 recv_flow 144 .inc_window(init_recv_window) 145 .expect("invalid initial receive window"); 146 recv_flow.assign_capacity(init_recv_window); 147 148 send_flow 149 .inc_window(init_send_window) 150 .expect("invalid initial send window size"); 151 152 Stream { 153 id, 154 state: State::default(), 155 ref_count: 0, 156 is_counted: false, 157 158 // ===== Fields related to sending ===== 159 next_pending_send: None, 160 is_pending_send: false, 161 send_flow, 162 requested_send_capacity: 0, 163 buffered_send_data: 0, 164 send_task: None, 165 pending_send: buffer::Deque::new(), 166 is_pending_send_capacity: false, 167 next_pending_send_capacity: None, 168 send_capacity_inc: false, 169 is_pending_open: false, 170 next_open: None, 171 is_pending_push: false, 172 173 // ===== Fields related to receiving ===== 174 next_pending_accept: None, 175 is_pending_accept: false, 176 recv_flow, 177 in_flight_recv_data: 0, 178 next_window_update: None, 179 is_pending_window_update: false, 180 reset_at: None, 181 next_reset_expire: None, 182 pending_recv: buffer::Deque::new(), 183 recv_task: None, 184 pending_push_promises: store::Queue::new(), 185 content_length: ContentLength::Omitted, 186 } 187 } 188 189 /// Increment the stream's ref count ref_inc(&mut self)190 pub fn ref_inc(&mut self) { 191 assert!(self.ref_count < usize::MAX); 192 self.ref_count += 1; 193 } 194 195 /// Decrements the stream's ref count ref_dec(&mut self)196 pub fn ref_dec(&mut self) { 197 assert!(self.ref_count > 0); 198 self.ref_count -= 1; 199 } 200 201 /// Returns true if stream is currently being held for some time because of 202 /// a local reset. is_pending_reset_expiration(&self) -> bool203 pub fn is_pending_reset_expiration(&self) -> bool { 204 self.reset_at.is_some() 205 } 206 207 /// Returns true if frames for this stream are ready to be sent over the wire is_send_ready(&self) -> bool208 pub fn is_send_ready(&self) -> bool { 209 // Why do we check pending_open? 210 // 211 // We allow users to call send_request() which schedules a stream to be pending_open 212 // if there is no room according to the concurrency limit (max_send_streams), and we 213 // also allow data to be buffered for send with send_data() if there is no capacity for 214 // the stream to send the data, which attempts to place the stream in pending_send. 215 // If the stream is not open, we don't want the stream to be scheduled for 216 // execution (pending_send). Note that if the stream is in pending_open, it will be 217 // pushed to pending_send when there is room for an open stream. 218 // 219 // In pending_push we track whether a PushPromise still needs to be sent 220 // from a different stream before we can start sending frames on this one. 221 // This is different from the "open" check because reserved streams don't count 222 // toward the concurrency limit. 223 // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 224 !self.is_pending_open && !self.is_pending_push 225 } 226 227 /// Returns true if the stream is closed is_closed(&self) -> bool228 pub fn is_closed(&self) -> bool { 229 // The state has fully transitioned to closed. 230 self.state.is_closed() && 231 // Because outbound frames transition the stream state before being 232 // buffered, we have to ensure that all frames have been flushed. 233 self.pending_send.is_empty() && 234 // Sometimes large data frames are sent out in chunks. After a chunk 235 // of the frame is sent, the remainder is pushed back onto the send 236 // queue to be rescheduled. 237 // 238 // Checking for additional buffered data lets us catch this case. 239 self.buffered_send_data == 0 240 } 241 242 /// Returns true if the stream is no longer in use is_released(&self) -> bool243 pub fn is_released(&self) -> bool { 244 // The stream is closed and fully flushed 245 self.is_closed() && 246 // There are no more outstanding references to the stream 247 self.ref_count == 0 && 248 // The stream is not in any queue 249 !self.is_pending_send && !self.is_pending_send_capacity && 250 !self.is_pending_accept && !self.is_pending_window_update && 251 !self.is_pending_open && !self.reset_at.is_some() 252 } 253 254 /// Returns true when the consumer of the stream has dropped all handles 255 /// (indicating no further interest in the stream) and the stream state is 256 /// not actually closed. 257 /// 258 /// In this case, a reset should be sent. is_canceled_interest(&self) -> bool259 pub fn is_canceled_interest(&self) -> bool { 260 self.ref_count == 0 && !self.state.is_closed() 261 } 262 assign_capacity(&mut self, capacity: WindowSize)263 pub fn assign_capacity(&mut self, capacity: WindowSize) { 264 debug_assert!(capacity > 0); 265 self.send_capacity_inc = true; 266 self.send_flow.assign_capacity(capacity); 267 268 tracing::trace!( 269 " assigned capacity to stream; available={}; buffered={}; id={:?}", 270 self.send_flow.available(), 271 self.buffered_send_data, 272 self.id 273 ); 274 275 // Only notify if the capacity exceeds the amount of buffered data 276 if self.send_flow.available() > self.buffered_send_data { 277 tracing::trace!(" notifying task"); 278 self.notify_send(); 279 } 280 } 281 282 /// Returns `Err` when the decrement cannot be completed due to overflow. dec_content_length(&mut self, len: usize) -> Result<(), ()>283 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { 284 match self.content_length { 285 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { 286 Some(val) => *rem = val, 287 None => return Err(()), 288 }, 289 ContentLength::Head => { 290 if len != 0 { 291 return Err(()); 292 } 293 } 294 _ => {} 295 } 296 297 Ok(()) 298 } 299 ensure_content_length_zero(&self) -> Result<(), ()>300 pub fn ensure_content_length_zero(&self) -> Result<(), ()> { 301 match self.content_length { 302 ContentLength::Remaining(0) => Ok(()), 303 ContentLength::Remaining(_) => Err(()), 304 _ => Ok(()), 305 } 306 } 307 notify_send(&mut self)308 pub fn notify_send(&mut self) { 309 if let Some(task) = self.send_task.take() { 310 task.wake(); 311 } 312 } 313 wait_send(&mut self, cx: &Context)314 pub fn wait_send(&mut self, cx: &Context) { 315 self.send_task = Some(cx.waker().clone()); 316 } 317 notify_recv(&mut self)318 pub fn notify_recv(&mut self) { 319 if let Some(task) = self.recv_task.take() { 320 task.wake(); 321 } 322 } 323 } 324 325 impl store::Next for NextAccept { next(stream: &Stream) -> Option<store::Key>326 fn next(stream: &Stream) -> Option<store::Key> { 327 stream.next_pending_accept 328 } 329 set_next(stream: &mut Stream, key: Option<store::Key>)330 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 331 stream.next_pending_accept = key; 332 } 333 take_next(stream: &mut Stream) -> Option<store::Key>334 fn take_next(stream: &mut Stream) -> Option<store::Key> { 335 stream.next_pending_accept.take() 336 } 337 is_queued(stream: &Stream) -> bool338 fn is_queued(stream: &Stream) -> bool { 339 stream.is_pending_accept 340 } 341 set_queued(stream: &mut Stream, val: bool)342 fn set_queued(stream: &mut Stream, val: bool) { 343 stream.is_pending_accept = val; 344 } 345 } 346 347 impl store::Next for NextSend { next(stream: &Stream) -> Option<store::Key>348 fn next(stream: &Stream) -> Option<store::Key> { 349 stream.next_pending_send 350 } 351 set_next(stream: &mut Stream, key: Option<store::Key>)352 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 353 stream.next_pending_send = key; 354 } 355 take_next(stream: &mut Stream) -> Option<store::Key>356 fn take_next(stream: &mut Stream) -> Option<store::Key> { 357 stream.next_pending_send.take() 358 } 359 is_queued(stream: &Stream) -> bool360 fn is_queued(stream: &Stream) -> bool { 361 stream.is_pending_send 362 } 363 set_queued(stream: &mut Stream, val: bool)364 fn set_queued(stream: &mut Stream, val: bool) { 365 if val { 366 // ensure that stream is not queued for being opened 367 // if it's being put into queue for sending data 368 debug_assert_eq!(stream.is_pending_open, false); 369 } 370 stream.is_pending_send = val; 371 } 372 } 373 374 impl store::Next for NextSendCapacity { next(stream: &Stream) -> Option<store::Key>375 fn next(stream: &Stream) -> Option<store::Key> { 376 stream.next_pending_send_capacity 377 } 378 set_next(stream: &mut Stream, key: Option<store::Key>)379 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 380 stream.next_pending_send_capacity = key; 381 } 382 take_next(stream: &mut Stream) -> Option<store::Key>383 fn take_next(stream: &mut Stream) -> Option<store::Key> { 384 stream.next_pending_send_capacity.take() 385 } 386 is_queued(stream: &Stream) -> bool387 fn is_queued(stream: &Stream) -> bool { 388 stream.is_pending_send_capacity 389 } 390 set_queued(stream: &mut Stream, val: bool)391 fn set_queued(stream: &mut Stream, val: bool) { 392 stream.is_pending_send_capacity = val; 393 } 394 } 395 396 impl store::Next for NextWindowUpdate { next(stream: &Stream) -> Option<store::Key>397 fn next(stream: &Stream) -> Option<store::Key> { 398 stream.next_window_update 399 } 400 set_next(stream: &mut Stream, key: Option<store::Key>)401 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 402 stream.next_window_update = key; 403 } 404 take_next(stream: &mut Stream) -> Option<store::Key>405 fn take_next(stream: &mut Stream) -> Option<store::Key> { 406 stream.next_window_update.take() 407 } 408 is_queued(stream: &Stream) -> bool409 fn is_queued(stream: &Stream) -> bool { 410 stream.is_pending_window_update 411 } 412 set_queued(stream: &mut Stream, val: bool)413 fn set_queued(stream: &mut Stream, val: bool) { 414 stream.is_pending_window_update = val; 415 } 416 } 417 418 impl store::Next for NextOpen { next(stream: &Stream) -> Option<store::Key>419 fn next(stream: &Stream) -> Option<store::Key> { 420 stream.next_open 421 } 422 set_next(stream: &mut Stream, key: Option<store::Key>)423 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 424 stream.next_open = key; 425 } 426 take_next(stream: &mut Stream) -> Option<store::Key>427 fn take_next(stream: &mut Stream) -> Option<store::Key> { 428 stream.next_open.take() 429 } 430 is_queued(stream: &Stream) -> bool431 fn is_queued(stream: &Stream) -> bool { 432 stream.is_pending_open 433 } 434 set_queued(stream: &mut Stream, val: bool)435 fn set_queued(stream: &mut Stream, val: bool) { 436 if val { 437 // ensure that stream is not queued for being sent 438 // if it's being put into queue for opening the stream 439 debug_assert_eq!(stream.is_pending_send, false); 440 } 441 stream.is_pending_open = val; 442 } 443 } 444 445 impl store::Next for NextResetExpire { next(stream: &Stream) -> Option<store::Key>446 fn next(stream: &Stream) -> Option<store::Key> { 447 stream.next_reset_expire 448 } 449 set_next(stream: &mut Stream, key: Option<store::Key>)450 fn set_next(stream: &mut Stream, key: Option<store::Key>) { 451 stream.next_reset_expire = key; 452 } 453 take_next(stream: &mut Stream) -> Option<store::Key>454 fn take_next(stream: &mut Stream) -> Option<store::Key> { 455 stream.next_reset_expire.take() 456 } 457 is_queued(stream: &Stream) -> bool458 fn is_queued(stream: &Stream) -> bool { 459 stream.reset_at.is_some() 460 } 461 set_queued(stream: &mut Stream, val: bool)462 fn set_queued(stream: &mut Stream, val: bool) { 463 if val { 464 stream.reset_at = Some(Instant::now()); 465 } else { 466 stream.reset_at = None; 467 } 468 } 469 } 470 471 // ===== impl ContentLength ===== 472 473 impl ContentLength { is_head(&self) -> bool474 pub fn is_head(&self) -> bool { 475 match *self { 476 ContentLength::Head => true, 477 _ => false, 478 } 479 } 480 } 481