1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Buffering data to send until it is acked. 8 9 use std::cell::RefCell; 10 use std::cmp::{max, min, Ordering}; 11 use std::collections::{BTreeMap, VecDeque}; 12 use std::convert::TryFrom; 13 use std::mem; 14 use std::ops::Add; 15 use std::rc::Rc; 16 17 use indexmap::IndexMap; 18 use smallvec::SmallVec; 19 20 use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role}; 21 22 use crate::events::ConnectionEvents; 23 use crate::fc::SenderFlowControl; 24 use crate::frame::{Frame, FRAME_TYPE_RESET_STREAM}; 25 use crate::packet::PacketBuilder; 26 use crate::recovery::{RecoveryToken, StreamRecoveryToken}; 27 use crate::stats::FrameStats; 28 use crate::stream_id::StreamId; 29 use crate::tparams::{self, TransportParameters}; 30 use crate::{AppError, Error, Res}; 31 32 pub const SEND_BUFFER_SIZE: usize = 0x10_0000; // 1 MiB 33 34 /// The priority that is assigned to sending data for the stream. 35 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 36 pub enum TransmissionPriority { 37 /// This stream is more important than the functioning of the connection. 38 /// Don't use this priority unless the stream really is that important. 39 /// A stream at this priority can starve out other connection functions, 40 /// including flow control, which could be very bad. 41 Critical, 42 /// The stream is very important. Stream data will be written ahead of 43 /// some of the less critical connection functions, like path validation, 44 /// connection ID management, and session tickets. 45 Important, 46 /// High priority streams are important, but not enough to disrupt 47 /// connection operation. They go ahead of session tickets though. 48 High, 49 /// The default priority. 
50 Normal, 51 /// Low priority streams get sent last. 52 Low, 53 } 54 55 impl Default for TransmissionPriority { default() -> Self56 fn default() -> Self { 57 Self::Normal 58 } 59 } 60 61 impl PartialOrd for TransmissionPriority { partial_cmp(&self, other: &Self) -> Option<Ordering>62 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 63 Some(self.cmp(other)) 64 } 65 } 66 67 impl Ord for TransmissionPriority { cmp(&self, other: &Self) -> Ordering68 fn cmp(&self, other: &Self) -> Ordering { 69 if self == other { 70 return Ordering::Equal; 71 } 72 match (self, other) { 73 (Self::Critical, _) => Ordering::Greater, 74 (_, Self::Critical) => Ordering::Less, 75 (Self::Important, _) => Ordering::Greater, 76 (_, Self::Important) => Ordering::Less, 77 (Self::High, _) => Ordering::Greater, 78 (_, Self::High) => Ordering::Less, 79 (Self::Normal, _) => Ordering::Greater, 80 (_, Self::Normal) => Ordering::Less, 81 _ => unreachable!(), 82 } 83 } 84 } 85 86 impl Add<RetransmissionPriority> for TransmissionPriority { 87 type Output = Self; add(self, rhs: RetransmissionPriority) -> Self::Output88 fn add(self, rhs: RetransmissionPriority) -> Self::Output { 89 match rhs { 90 RetransmissionPriority::Fixed(fixed) => fixed, 91 RetransmissionPriority::Same => self, 92 RetransmissionPriority::Higher => match self { 93 Self::Critical => Self::Critical, 94 Self::Important | Self::High => Self::Important, 95 Self::Normal => Self::High, 96 Self::Low => Self::Normal, 97 }, 98 RetransmissionPriority::MuchHigher => match self { 99 Self::Critical | Self::Important => Self::Critical, 100 Self::High | Self::Normal => Self::Important, 101 Self::Low => Self::High, 102 }, 103 } 104 } 105 } 106 107 /// If data is lost, this determines the priority that applies to retransmissions 108 /// of that data. 109 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 110 pub enum RetransmissionPriority { 111 /// Prioritize retransmission at a fixed priority. 
112 /// With this, it is possible to prioritize retransmissions lower than transmissions. 113 /// Doing that can create a deadlock with flow control which might cause the connection 114 /// to stall unless new data stops arriving fast enough that retransmissions can complete. 115 Fixed(TransmissionPriority), 116 /// Don't increase priority for retransmission. This is probably not a good idea 117 /// as it could mean starving flow control. 118 Same, 119 /// Increase the priority of retransmissions (the default). 120 /// Retransmissions of `Critical` or `Important` aren't elevated at all. 121 Higher, 122 /// Increase the priority of retransmissions a lot. 123 /// This is useful for streams that are particularly exposed to head-of-line blocking. 124 MuchHigher, 125 } 126 127 impl Default for RetransmissionPriority { default() -> Self128 fn default() -> Self { 129 Self::Higher 130 } 131 } 132 133 #[derive(Debug, PartialEq, Clone, Copy)] 134 enum RangeState { 135 Sent, 136 Acked, 137 } 138 139 /// Track ranges in the stream as sent or acked. Acked implies sent. Not in a 140 /// range implies needing-to-be-sent, either initially or as a retransmission. 141 #[derive(Debug, Default, PartialEq)] 142 struct RangeTracker { 143 // offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits. 144 used: BTreeMap<u64, (u64, RangeState)>, 145 } 146 147 impl RangeTracker { highest_offset(&self) -> u64148 fn highest_offset(&self) -> u64 { 149 self.used 150 .range(..) 151 .next_back() 152 .map_or(0, |(k, (v, _))| *k + *v) 153 } 154 acked_from_zero(&self) -> u64155 fn acked_from_zero(&self) -> u64 { 156 self.used 157 .get(&0) 158 .filter(|(_, state)| *state == RangeState::Acked) 159 .map_or(0, |(v, _)| *v) 160 } 161 162 /// Find the first unmarked range. If all are contiguous, this will return 163 /// (highest_offset(), None). 
first_unmarked_range(&self) -> (u64, Option<u64>)164 fn first_unmarked_range(&self) -> (u64, Option<u64>) { 165 let mut prev_end = 0; 166 167 for (cur_off, (cur_len, _)) in &self.used { 168 if prev_end == *cur_off { 169 prev_end = cur_off + cur_len; 170 } else { 171 return (prev_end, Some(cur_off - prev_end)); 172 } 173 } 174 (prev_end, None) 175 } 176 177 /// Turn one range into a list of subranges that align with existing 178 /// ranges. 179 /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked. 180 // 181 // e.g. given N is new and ABC are existing: 182 // NNNNNNNNNNNNNNNN 183 // AAAAA BBBCCCCC ...then we want 5 chunks: 184 // 1122222333444555 185 // 186 // but also if we have this: 187 // NNNNNNNNNNNNNNNN 188 // AAAAAAAAAA BBBB ...then break existing A and B ranges up: 189 // 190 // 1111111122222233 191 // aaAAAAAAAA BBbb 192 // 193 // Doing all this work up front should make handling each chunk much 194 // easier. chunk_range_on_edges( &mut self, new_off: u64, new_len: u64, new_state: RangeState, ) -> Vec<(u64, u64, RangeState)>195 fn chunk_range_on_edges( 196 &mut self, 197 new_off: u64, 198 new_len: u64, 199 new_state: RangeState, 200 ) -> Vec<(u64, u64, RangeState)> { 201 let mut tmp_off = new_off; 202 let mut tmp_len = new_len; 203 let mut v = Vec::new(); 204 205 // cut previous overlapping range if needed 206 let prev = self.used.range_mut(..tmp_off).next_back(); 207 if let Some((prev_off, (prev_len, prev_state))) = prev { 208 let prev_state = *prev_state; 209 let overlap = (*prev_off + *prev_len).saturating_sub(new_off); 210 *prev_len -= overlap; 211 if overlap > 0 { 212 self.used.insert(new_off, (overlap, prev_state)); 213 } 214 } 215 216 let mut last_existing_remaining = None; 217 for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) { 218 // Create chunk for "overhang" before an existing range 219 if tmp_off < *off { 220 let sub_len = off - tmp_off; 221 v.push((tmp_off, sub_len, new_state)); 222 tmp_off += 
sub_len; 223 tmp_len -= sub_len; 224 } 225 226 // Create chunk to match existing range 227 let sub_len = min(*len, tmp_len); 228 let remaining_len = len - sub_len; 229 if new_state == RangeState::Sent && *state == RangeState::Acked { 230 qinfo!( 231 "Attempted to downgrade overlapping range Acked range {}-{} with Sent {}-{}", 232 off, 233 len, 234 new_off, 235 new_len 236 ); 237 } else { 238 v.push((tmp_off, sub_len, new_state)); 239 } 240 tmp_off += sub_len; 241 tmp_len -= sub_len; 242 243 if remaining_len > 0 { 244 last_existing_remaining = Some((*off, sub_len, remaining_len, *state)); 245 } 246 } 247 248 // Maybe break last existing range in two so that a final chunk will 249 // have the same length as an existing range entry 250 if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining { 251 *self.used.get_mut(&off).expect("must be there") = (sub_len, state); 252 self.used.insert(off + sub_len, (remaining_len, state)); 253 } 254 255 // Create final chunk if anything remains of the new range 256 if tmp_len > 0 { 257 v.push((tmp_off, tmp_len, new_state)) 258 } 259 260 v 261 } 262 263 /// Merge contiguous Acked ranges into the first entry (0). This range may 264 /// be dropped from the send buffer. 
coalesce_acked_from_zero(&mut self)265 fn coalesce_acked_from_zero(&mut self) { 266 let acked_range_from_zero = self 267 .used 268 .get_mut(&0) 269 .filter(|(_, state)| *state == RangeState::Acked) 270 .map(|(len, _)| *len); 271 272 if let Some(len_from_zero) = acked_range_from_zero { 273 let mut to_remove = SmallVec::<[_; 8]>::new(); 274 275 let mut new_len_from_zero = len_from_zero; 276 277 // See if there's another Acked range entry contiguous to this one 278 while let Some((next_len, _)) = self 279 .used 280 .get(&new_len_from_zero) 281 .filter(|(_, state)| *state == RangeState::Acked) 282 { 283 to_remove.push(new_len_from_zero); 284 new_len_from_zero += *next_len; 285 } 286 287 if len_from_zero != new_len_from_zero { 288 self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero; 289 } 290 291 for val in to_remove { 292 self.used.remove(&val); 293 } 294 } 295 } 296 mark_range(&mut self, off: u64, len: usize, state: RangeState)297 fn mark_range(&mut self, off: u64, len: usize, state: RangeState) { 298 if len == 0 { 299 qinfo!("mark 0-length range at {}", off); 300 return; 301 } 302 303 let subranges = self.chunk_range_on_edges(off, len as u64, state); 304 305 for (sub_off, sub_len, sub_state) in subranges { 306 self.used.insert(sub_off, (sub_len, sub_state)); 307 } 308 309 self.coalesce_acked_from_zero() 310 } 311 unmark_range(&mut self, off: u64, len: usize)312 fn unmark_range(&mut self, off: u64, len: usize) { 313 if len == 0 { 314 qdebug!("unmark 0-length range at {}", off); 315 return; 316 } 317 318 let len = u64::try_from(len).unwrap(); 319 let end_off = off + len; 320 321 let mut to_remove = SmallVec::<[_; 8]>::new(); 322 let mut to_add = None; 323 324 // Walk backwards through possibly affected existing ranges 325 for (cur_off, (cur_len, cur_state)) in self.used.range_mut(..off + len).rev() { 326 // Maybe fixup range preceding the removed range 327 if *cur_off < off { 328 // Check for overlap 329 if *cur_off + *cur_len > off { 330 if *cur_state 
== RangeState::Acked { 331 qdebug!( 332 "Attempted to unmark Acked range {}-{} with unmark_range {}-{}", 333 cur_off, 334 cur_len, 335 off, 336 off + len 337 ); 338 } else { 339 *cur_len = off - cur_off; 340 } 341 } 342 break; 343 } 344 345 if *cur_state == RangeState::Acked { 346 qdebug!( 347 "Attempted to unmark Acked range {}-{} with unmark_range {}-{}", 348 cur_off, 349 cur_len, 350 off, 351 off + len 352 ); 353 continue; 354 } 355 356 // Add a new range for old subrange extending beyond 357 // to-be-unmarked range 358 let cur_end_off = cur_off + *cur_len; 359 if cur_end_off > end_off { 360 let new_cur_off = off + len; 361 let new_cur_len = cur_end_off - end_off; 362 assert_eq!(to_add, None); 363 to_add = Some((new_cur_off, new_cur_len, *cur_state)); 364 } 365 366 to_remove.push(*cur_off); 367 } 368 369 for remove_off in to_remove { 370 self.used.remove(&remove_off); 371 } 372 373 if let Some((new_cur_off, new_cur_len, cur_state)) = to_add { 374 self.used.insert(new_cur_off, (new_cur_len, cur_state)); 375 } 376 } 377 378 /// Unmark all sent ranges. unmark_sent(&mut self)379 pub fn unmark_sent(&mut self) { 380 self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap()); 381 } 382 } 383 384 /// Buffer to contain queued bytes and track their state. 385 #[derive(Debug, Default, PartialEq)] 386 pub struct TxBuffer { 387 retired: u64, // contig acked bytes, no longer in buffer 388 send_buf: VecDeque<u8>, // buffer of not-acked bytes 389 ranges: RangeTracker, // ranges in buffer that have been sent or acked 390 } 391 392 impl TxBuffer { new() -> Self393 pub fn new() -> Self { 394 Self::default() 395 } 396 397 /// Attempt to add some or all of the passed-in buffer to the TxBuffer. 
send(&mut self, buf: &[u8]) -> usize398 pub fn send(&mut self, buf: &[u8]) -> usize { 399 let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len()); 400 if can_buffer > 0 { 401 self.send_buf.extend(&buf[..can_buffer]); 402 assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); 403 } 404 can_buffer 405 } 406 next_bytes(&self) -> Option<(u64, &[u8])>407 pub fn next_bytes(&self) -> Option<(u64, &[u8])> { 408 let (start, maybe_len) = self.ranges.first_unmarked_range(); 409 410 if start == self.retired + u64::try_from(self.buffered()).unwrap() { 411 return None; 412 } 413 414 // Convert from ranges-relative-to-zero to 415 // ranges-relative-to-buffer-start 416 let buff_off = usize::try_from(start - self.retired).unwrap(); 417 418 // Deque returns two slices. Create a subslice from whichever 419 // one contains the first unmarked data. 420 let slc = if buff_off < self.send_buf.as_slices().0.len() { 421 &self.send_buf.as_slices().0[buff_off..] 422 } else { 423 &self.send_buf.as_slices().1[buff_off - self.send_buf.as_slices().0.len()..] 
424 }; 425 426 let len = if let Some(range_len) = maybe_len { 427 // Truncate if range crosses deque slices 428 min(usize::try_from(range_len).unwrap(), slc.len()) 429 } else { 430 slc.len() 431 }; 432 433 debug_assert!(len > 0); 434 debug_assert!(len <= slc.len()); 435 436 Some((start, &slc[..len])) 437 } 438 mark_as_sent(&mut self, offset: u64, len: usize)439 pub fn mark_as_sent(&mut self, offset: u64, len: usize) { 440 self.ranges.mark_range(offset, len, RangeState::Sent) 441 } 442 mark_as_acked(&mut self, offset: u64, len: usize)443 pub fn mark_as_acked(&mut self, offset: u64, len: usize) { 444 self.ranges.mark_range(offset, len, RangeState::Acked); 445 446 // We can drop contig acked range from the buffer 447 let new_retirable = self.ranges.acked_from_zero() - self.retired; 448 debug_assert!(new_retirable <= self.buffered() as u64); 449 let keep_len = 450 self.buffered() - usize::try_from(new_retirable).expect("should fit in usize"); 451 452 // Truncate front 453 self.send_buf.rotate_left(self.buffered() - keep_len); 454 self.send_buf.truncate(keep_len); 455 456 self.retired += new_retirable; 457 } 458 mark_as_lost(&mut self, offset: u64, len: usize)459 pub fn mark_as_lost(&mut self, offset: u64, len: usize) { 460 self.ranges.unmark_range(offset, len) 461 } 462 463 /// Forget about anything that was marked as sent. unmark_sent(&mut self)464 pub fn unmark_sent(&mut self) { 465 self.ranges.unmark_sent(); 466 } 467 buffered(&self) -> usize468 fn buffered(&self) -> usize { 469 self.send_buf.len() 470 } 471 avail(&self) -> usize472 fn avail(&self) -> usize { 473 SEND_BUFFER_SIZE - self.buffered() 474 } 475 used(&self) -> u64476 fn used(&self) -> u64 { 477 self.retired + u64::try_from(self.buffered()).unwrap() 478 } 479 } 480 481 /// QUIC sending stream states, based on -transport 3.1. 
482 #[derive(Debug)] 483 pub(crate) enum SendStreamState { 484 Ready { 485 fc: SenderFlowControl<StreamId>, 486 conn_fc: Rc<RefCell<SenderFlowControl<()>>>, 487 }, 488 Send { 489 fc: SenderFlowControl<StreamId>, 490 conn_fc: Rc<RefCell<SenderFlowControl<()>>>, 491 send_buf: TxBuffer, 492 }, 493 // Note: `DataSent` is entered when the stream is closed, not when all data has been 494 // sent for the first time. 495 DataSent { 496 send_buf: TxBuffer, 497 fin_sent: bool, 498 fin_acked: bool, 499 }, 500 DataRecvd, 501 ResetSent { 502 err: AppError, 503 final_size: u64, 504 priority: Option<TransmissionPriority>, 505 }, 506 ResetRecvd, 507 } 508 509 impl SendStreamState { tx_buf_mut(&mut self) -> Option<&mut TxBuffer>510 fn tx_buf_mut(&mut self) -> Option<&mut TxBuffer> { 511 match self { 512 Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => Some(send_buf), 513 Self::Ready { .. } 514 | Self::DataRecvd { .. } 515 | Self::ResetSent { .. } 516 | Self::ResetRecvd => None, 517 } 518 } 519 tx_avail(&self) -> usize520 fn tx_avail(&self) -> usize { 521 match self { 522 // In Ready, TxBuffer not yet allocated but size is known 523 Self::Ready { .. } => SEND_BUFFER_SIZE, 524 Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => send_buf.avail(), 525 Self::DataRecvd { .. } | Self::ResetSent { .. } | Self::ResetRecvd => 0, 526 } 527 } 528 name(&self) -> &str529 fn name(&self) -> &str { 530 match self { 531 Self::Ready { .. } => "Ready", 532 Self::Send { .. } => "Send", 533 Self::DataSent { .. } => "DataSent", 534 Self::DataRecvd { .. } => "DataRecvd", 535 Self::ResetSent { .. } => "ResetSent", 536 Self::ResetRecvd => "ResetRecvd", 537 } 538 } 539 transition(&mut self, new_state: Self)540 fn transition(&mut self, new_state: Self) { 541 qtrace!("SendStream state {} -> {}", self.name(), new_state.name()); 542 *self = new_state; 543 } 544 } 545 546 /// Implement a QUIC send stream. 
#[derive(Debug)]
pub struct SendStream {
    stream_id: StreamId,
    state: SendStreamState,
    conn_events: ConnectionEvents,
    priority: TransmissionPriority,
    retransmission_priority: RetransmissionPriority,
    // The highest end offset of any range that has been reported lost.
    retransmission_offset: u64,
}

impl SendStream {
    /// Create a stream in the `Ready` state with default priorities.
    /// A writable event is raised immediately if the stream can accept data.
    pub fn new(
        stream_id: StreamId,
        max_stream_data: u64,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
        conn_events: ConnectionEvents,
    ) -> Self {
        let ss = Self {
            stream_id,
            state: SendStreamState::Ready {
                fc: SenderFlowControl::new(stream_id, max_stream_data),
                conn_fc,
            },
            conn_events,
            priority: TransmissionPriority::default(),
            retransmission_priority: RetransmissionPriority::default(),
            retransmission_offset: 0,
        };
        if ss.avail() > 0 {
            ss.conn_events.send_stream_writable(stream_id);
        }
        ss
    }

    /// Set the priority for new data and the rule used to derive the
    /// priority of retransmissions.
    pub fn set_priority(
        &mut self,
        transmission: TransmissionPriority,
        retransmission: RetransmissionPriority,
    ) {
        self.priority = transmission;
        self.retransmission_priority = retransmission;
    }

    /// If all data has been buffered or written, how much was sent.
    pub fn final_size(&self) -> Option<u64> {
        match &self.state {
            SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()),
            SendStreamState::ResetSent { final_size, .. } => Some(*final_size),
            _ => None,
        }
    }

    /// Return the next range to be sent, if any.
    /// If this is a retransmission, cut off what is sent at the retransmission
    /// offset.
next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])>602 fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> { 603 match self.state { 604 SendStreamState::Send { ref send_buf, .. } => { 605 send_buf.next_bytes().and_then(|(offset, slice)| { 606 if retransmission_only { 607 qtrace!( 608 [self], 609 "next_bytes apply retransmission limit at {}", 610 self.retransmission_offset 611 ); 612 if self.retransmission_offset > offset { 613 let len = min( 614 usize::try_from(self.retransmission_offset - offset).unwrap(), 615 slice.len(), 616 ); 617 Some((offset, &slice[..len])) 618 } else { 619 None 620 } 621 } else { 622 Some((offset, slice)) 623 } 624 }) 625 } 626 SendStreamState::DataSent { 627 ref send_buf, 628 fin_sent, 629 .. 630 } => { 631 let bytes = send_buf.next_bytes(); 632 if bytes.is_some() { 633 bytes 634 } else if fin_sent { 635 None 636 } else { 637 // Send empty stream frame with fin set 638 Some((send_buf.used(), &[])) 639 } 640 } 641 SendStreamState::Ready { .. } 642 | SendStreamState::DataRecvd { .. } 643 | SendStreamState::ResetSent { .. } 644 | SendStreamState::ResetRecvd => None, 645 } 646 } 647 648 /// Calculate how many bytes (length) can fit into available space and whether 649 /// the remainder of the space can be filled (or if a length field is needed). length_and_fill(data_len: usize, space: usize) -> (usize, bool)650 fn length_and_fill(data_len: usize, space: usize) -> (usize, bool) { 651 if data_len >= space { 652 // More data than space allows, or an exact fit => fast path. 653 qtrace!("SendStream::length_and_fill fill {}", space); 654 return (space, true); 655 } 656 657 // Estimate size of the length field based on the available space, 658 // less 1, which is the worst case. 659 let length = min(space.saturating_sub(1), data_len); 660 let length_len = Encoder::varint_len(u64::try_from(length).unwrap()); 661 debug_assert!(length_len <= space); // We don't depend on this being true, but it is true. 
662 663 // From here we can always fit `data_len`, but we might as well fill 664 // if there is no space for the length field plus another frame. 665 let fill = data_len + length_len + PacketBuilder::MINIMUM_FRAME_SIZE > space; 666 qtrace!("SendStream::length_and_fill {} fill {}", data_len, fill); 667 (data_len, fill) 668 } 669 670 /// Maybe write a `STREAM` frame. write_stream_frame( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )671 fn write_stream_frame( 672 &mut self, 673 priority: TransmissionPriority, 674 builder: &mut PacketBuilder, 675 tokens: &mut Vec<RecoveryToken>, 676 stats: &mut FrameStats, 677 ) { 678 let retransmission = if priority == self.priority { 679 false 680 } else if priority == self.priority + self.retransmission_priority { 681 true 682 } else { 683 return; 684 }; 685 686 let id = self.stream_id; 687 let final_size = self.final_size(); 688 if let Some((offset, data)) = self.next_bytes(retransmission) { 689 let overhead = 1 // Frame type 690 + Encoder::varint_len(id.as_u64()) 691 + if offset > 0 { 692 Encoder::varint_len(offset) 693 } else { 694 0 695 }; 696 if overhead > builder.remaining() { 697 qtrace!([self], "write_frame no space for header"); 698 return; 699 } 700 701 let (length, fill) = Self::length_and_fill(data.len(), builder.remaining() - overhead); 702 let fin = final_size.map_or(false, |fs| fs == offset + u64::try_from(length).unwrap()); 703 if length == 0 && !fin { 704 qtrace!([self], "write_frame no data, no fin"); 705 return; 706 } 707 708 // Write the stream out. 
709 builder.encode_varint(Frame::stream_type(fin, offset > 0, fill)); 710 builder.encode_varint(id.as_u64()); 711 if offset > 0 { 712 builder.encode_varint(offset); 713 } 714 if fill { 715 builder.encode(&data[..length]); 716 builder.mark_full(); 717 } else { 718 builder.encode_vvec(&data[..length]); 719 } 720 debug_assert!(builder.len() <= builder.limit()); 721 722 self.mark_as_sent(offset, length, fin); 723 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::Stream( 724 SendStreamRecoveryToken { 725 id, 726 offset, 727 length, 728 fin, 729 }, 730 ))); 731 stats.stream += 1; 732 } 733 } 734 reset_acked(&mut self)735 pub fn reset_acked(&mut self) { 736 match self.state { 737 SendStreamState::Ready { .. } 738 | SendStreamState::Send { .. } 739 | SendStreamState::DataSent { .. } 740 | SendStreamState::DataRecvd { .. } => { 741 qtrace!([self], "Reset acked while in {} state?", self.state.name()) 742 } 743 SendStreamState::ResetSent { .. } => self.state.transition(SendStreamState::ResetRecvd), 744 SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"), 745 }; 746 } 747 reset_lost(&mut self)748 pub fn reset_lost(&mut self) { 749 match self.state { 750 SendStreamState::ResetSent { 751 ref mut priority, .. 752 } => { 753 *priority = Some(self.priority + self.retransmission_priority); 754 } 755 SendStreamState::ResetRecvd => (), 756 _ => unreachable!(), 757 } 758 } 759 760 /// Maybe write a `RESET_STREAM` frame. 
write_reset_frame( &mut self, p: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, ) -> bool761 pub fn write_reset_frame( 762 &mut self, 763 p: TransmissionPriority, 764 builder: &mut PacketBuilder, 765 tokens: &mut Vec<RecoveryToken>, 766 stats: &mut FrameStats, 767 ) -> bool { 768 if let SendStreamState::ResetSent { 769 final_size, 770 err, 771 ref mut priority, 772 } = self.state 773 { 774 if *priority != Some(p) { 775 return false; 776 } 777 if builder.write_varint_frame(&[ 778 FRAME_TYPE_RESET_STREAM, 779 self.stream_id.as_u64(), 780 err, 781 final_size, 782 ]) { 783 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::ResetStream { 784 stream_id: self.stream_id, 785 })); 786 stats.reset_stream += 1; 787 *priority = None; 788 true 789 } else { 790 false 791 } 792 } else { 793 false 794 } 795 } 796 blocked_lost(&mut self, limit: u64)797 pub fn blocked_lost(&mut self, limit: u64) { 798 if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } = 799 &mut self.state 800 { 801 fc.frame_lost(limit); 802 } else { 803 qtrace!([self], "Ignoring lost STREAM_DATA_BLOCKED({})", limit); 804 } 805 } 806 807 /// Maybe write a `STREAM_DATA_BLOCKED` frame. write_blocked_frame( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )808 pub fn write_blocked_frame( 809 &mut self, 810 priority: TransmissionPriority, 811 builder: &mut PacketBuilder, 812 tokens: &mut Vec<RecoveryToken>, 813 stats: &mut FrameStats, 814 ) { 815 // Send STREAM_DATA_BLOCKED at normal priority always. 816 if priority == self.priority { 817 if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. 
} = 818 &mut self.state 819 { 820 fc.write_frames(builder, tokens, stats); 821 } 822 } 823 } 824 mark_as_sent(&mut self, offset: u64, len: usize, fin: bool)825 pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) { 826 if let Some(buf) = self.state.tx_buf_mut() { 827 buf.mark_as_sent(offset, len); 828 self.send_blocked_if_space_needed(0); 829 }; 830 831 if fin { 832 if let SendStreamState::DataSent { fin_sent, .. } = &mut self.state { 833 *fin_sent = true; 834 } 835 } 836 } 837 mark_as_acked(&mut self, offset: u64, len: usize, fin: bool)838 pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) { 839 match self.state { 840 SendStreamState::Send { 841 ref mut send_buf, .. 842 } => { 843 send_buf.mark_as_acked(offset, len); 844 if self.avail() > 0 { 845 self.conn_events.send_stream_writable(self.stream_id) 846 } 847 } 848 SendStreamState::DataSent { 849 ref mut send_buf, 850 ref mut fin_acked, 851 .. 852 } => { 853 send_buf.mark_as_acked(offset, len); 854 if fin { 855 *fin_acked = true; 856 } 857 if *fin_acked && send_buf.buffered() == 0 { 858 self.conn_events.send_stream_complete(self.stream_id); 859 self.state.transition(SendStreamState::DataRecvd); 860 } 861 } 862 _ => qtrace!( 863 [self], 864 "mark_as_acked called from state {}", 865 self.state.name() 866 ), 867 } 868 } 869 mark_as_lost(&mut self, offset: u64, len: usize, fin: bool)870 pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) { 871 self.retransmission_offset = max( 872 self.retransmission_offset, 873 offset + u64::try_from(len).unwrap(), 874 ); 875 qtrace!( 876 [self], 877 "mark_as_lost retransmission offset={}", 878 self.retransmission_offset 879 ); 880 if let Some(buf) = self.state.tx_buf_mut() { 881 buf.mark_as_lost(offset, len); 882 } 883 884 if fin { 885 if let SendStreamState::DataSent { 886 fin_sent, 887 fin_acked, 888 .. 889 } = &mut self.state 890 { 891 *fin_sent = *fin_acked; 892 } 893 } 894 } 895 896 /// Bytes sendable on stream. 
/// Constrained by stream credit available,
    /// connection credit available, and space in the tx buffer.
    pub fn avail(&self) -> usize {
        if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
            &self.state
        {
            min(
                min(fc.available(), conn_fc.borrow().available()),
                self.state.tx_avail(),
            )
        } else {
            0
        }
    }

    /// Apply a new stream flow control limit.  A writable event is raised if
    /// the stream had no stream credit before and can now accept data.
    pub fn set_max_stream_data(&mut self, limit: u64) {
        if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
            &mut self.state
        {
            let stream_was_blocked = fc.available() == 0;
            fc.update(limit);
            if stream_was_blocked && self.avail() > 0 {
                self.conn_events.send_stream_writable(self.stream_id)
            }
        }
    }

    /// Whether the stream is in `DataRecvd` or `ResetRecvd`.
    pub fn is_terminal(&self) -> bool {
        matches!(
            self.state,
            SendStreamState::DataRecvd { .. } | SendStreamState::ResetRecvd
        )
    }

    /// Buffer as much of `buf` as currently fits; returns the number of
    /// bytes accepted.
    pub fn send(&mut self, buf: &[u8]) -> Res<usize> {
        self.send_internal(buf, false)
    }

    /// Buffer all of `buf` or none of it; returns the number of bytes
    /// accepted.
    pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> {
        self.send_internal(buf, true)
    }

    /// Mark stream and connection flow control as blocked when the
    /// respective available credit is no more than `needed_space`.
    fn send_blocked_if_space_needed(&mut self, needed_space: usize) {
        if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
            &mut self.state
        {
            if fc.available() <= needed_space {
                fc.blocked();
            }

            if conn_fc.borrow().available() <= needed_space {
                conn_fc.borrow_mut().blocked();
            }
        }
    }

    /// Shared implementation of `send` and `send_atomic`.
    ///
    /// # Errors
    /// `InvalidInput` for an empty `buf`; `FinalSizeError` if the stream is
    /// not in (or cannot enter) the `Send` state.
    fn send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize> {
        if buf.is_empty() {
            qerror!([self], "zero-length send on stream");
            return Err(Error::InvalidInput);
        }

        // The first write moves the stream from Ready to Send, carrying the
        // flow control state over and allocating the send buffer.
        if let SendStreamState::Ready { fc, conn_fc } = &mut self.state {
            let owned_fc = mem::replace(fc, SenderFlowControl::new(self.stream_id, 0));
            let owned_conn_fc = Rc::clone(conn_fc);
            self.state.transition(SendStreamState::Send {
                fc: owned_fc,
                conn_fc: owned_conn_fc,
                send_buf: TxBuffer::new(),
            });
        }

        if !matches!(self.state, SendStreamState::Send { .. }) {
            return Err(Error::FinalSizeError);
        }

        // `buf` is known to be non-empty here, so only capacity matters.
        let avail = self.avail();
        if avail == 0 {
            return Ok(0);
        }
        let buf = if avail < buf.len() {
            if atomic {
                self.send_blocked_if_space_needed(buf.len());
                return Ok(0);
            }
            &buf[..avail]
        } else {
            buf
        };

        match &mut self.state {
            SendStreamState::Ready { .. } => unreachable!(),
            SendStreamState::Send {
                fc,
                conn_fc,
                send_buf,
            } => {
                let sent = send_buf.send(buf);
                fc.consume(sent);
                conn_fc.borrow_mut().consume(sent);
                Ok(sent)
            }
            _ => Err(Error::FinalSizeError),
        }
    }

    /// Close the stream for writing: `Ready` and `Send` move to `DataSent`
    /// with the FIN not yet sent.  Other states are only traced.
    pub fn close(&mut self) {
        match &mut self.state {
            SendStreamState::Ready { .. } => {
                self.state.transition(SendStreamState::DataSent {
                    send_buf: TxBuffer::new(),
                    fin_sent: false,
                    fin_acked: false,
                });
            }
            SendStreamState::Send { send_buf, .. } => {
                // Move the buffered data into the new state.
                let owned_buf = mem::replace(send_buf, TxBuffer::new());
                self.state.transition(SendStreamState::DataSent {
                    send_buf: owned_buf,
                    fin_sent: false,
                    fin_acked: false,
                });
            }
            SendStreamState::DataSent { .. } => qtrace!([self], "already in DataSent state"),
            SendStreamState::DataRecvd { .. } => qtrace!([self], "already in DataRecvd state"),
            SendStreamState::ResetSent { .. } => qtrace!([self], "already in ResetSent state"),
            SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
        }
    }

    /// Abandon the stream with application error `err`, moving to
    /// `ResetSent` with a `RESET_STREAM` frame pending at the stream's
    /// priority.  The final size comes from stream flow control in `Ready`
    /// and `Send`, and from the send buffer in `DataSent`.
    pub fn reset(&mut self, err: AppError) {
        match &self.state {
            SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } => {
                let final_size = fc.used();
                self.state.transition(SendStreamState::ResetSent {
                    err,
                    final_size,
                    priority: Some(self.priority),
                });
            }
            SendStreamState::DataSent { send_buf, .. } => {
                let final_size = send_buf.used();
                self.state.transition(SendStreamState::ResetSent {
                    err,
                    final_size,
                    priority: Some(self.priority),
                });
            }
            SendStreamState::DataRecvd { .. } => qtrace!([self], "already in DataRecvd state"),
            SendStreamState::ResetSent { .. } => qtrace!([self], "already in ResetSent state"),
            SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"),
        };
    }

    #[cfg(test)]
    pub(crate) fn state(&mut self) -> &mut SendStreamState {
        &mut self.state
    }
}

impl ::std::fmt::Display for SendStream {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "SendStream {}", self.stream_id)
    }
}

/// The set of send streams, keyed by stream ID.
#[derive(Debug, Default)]
pub(crate) struct SendStreams(IndexMap<StreamId, SendStream>);

impl SendStreams {
    /// Look up a stream.
    ///
    /// # Errors
    /// `InvalidStreamId` if there is no stream with this ID.
    pub fn get(&self, id: StreamId) -> Res<&SendStream> {
        self.0.get(&id).ok_or(Error::InvalidStreamId)
    }

    /// Look up a stream for modification.
    ///
    /// # Errors
    /// `InvalidStreamId` if there is no stream with this ID.
    pub fn get_mut(&mut self, id: StreamId) -> Res<&mut SendStream> {
        self.0.get_mut(&id).ok_or(Error::InvalidStreamId)
    }

    /// Whether a stream with this ID is present.
    pub fn exists(&self, id: StreamId) -> bool {
        self.0.contains_key(&id)
    }

    /// Add a stream under `id`.
    pub fn insert(&mut self, id: StreamId, stream: SendStream) {
        self.0.insert(id, stream);
    }

    /// Pass an acknowledgment to the stream named in `token`, if it still exists.
    pub fn acked(&mut self, token: &SendStreamRecoveryToken) {
        if let Some(ss) = self.0.get_mut(&token.id) {
            ss.mark_as_acked(token.offset, token.length, token.fin);
        }
    }

    /// Pass a `RESET_STREAM` acknowledgment to stream `id`, if it still exists.
    pub fn reset_acked(&mut self, id: StreamId) {
        if let Some(ss) = self.0.get_mut(&id) {
            ss.reset_acked()
        }
    }

    /// Pass a loss report to the stream named in `token`, if it still exists.
    pub fn lost(&mut self, token: &SendStreamRecoveryToken) {
        if let Some(ss) = self.0.get_mut(&token.id) {
            ss.mark_as_lost(token.offset, token.length, token.fin);
        }
    }
/// Tell stream `stream_id` that its reset was lost.
/// Streams that are no longer present are ignored.
pub fn reset_lost(&mut self, stream_id: StreamId) {
    if let Some(stream) = self.0.get_mut(&stream_id) {
        stream.reset_lost();
    }
}

/// Tell stream `stream_id` that the blocked indication for `limit` was lost.
/// Streams that are no longer present are ignored.
pub fn blocked_lost(&mut self, stream_id: StreamId, limit: u64) {
    if let Some(stream) = self.0.get_mut(&stream_id) {
        stream.blocked_lost(limit);
    }
}

/// Drop every stream.
pub fn clear(&mut self) {
    self.0.clear();
}

/// Drop the streams that report `is_terminal()`, keeping all others.
pub fn clear_terminal(&mut self) {
    self.0.retain(|_, s| !s.is_terminal());
}

/// Give every stream, in map order, a chance to write frames at `priority`.
///
/// For each stream a reset frame is attempted first; only when none was
/// written are the blocked frame and the stream frame attempted.
pub(crate) fn write_frames(
    &mut self,
    priority: TransmissionPriority,
    builder: &mut PacketBuilder,
    tokens: &mut Vec<RecoveryToken>,
    stats: &mut FrameStats,
) {
    qtrace!("write STREAM frames at priority {:?}", priority);
    for stream in self.0.values_mut() {
        if stream.write_reset_frame(priority, builder, tokens, stats) {
            continue;
        }
        stream.write_blocked_frame(priority, builder, tokens, stats);
        stream.write_stream_frame(priority, builder, tokens, stats);
    }
}

/// Apply the peer's initial per-stream limits from `remote` to every stream.
///
/// Bidirectional streams take `INITIAL_MAX_STREAM_DATA_BIDI_REMOTE` (and must
/// not be remote-initiated from a client's point of view); all others take
/// `INITIAL_MAX_STREAM_DATA_UNI`.
pub fn update_initial_limit(&mut self, remote: &TransportParameters) {
    for (id, stream) in self.0.iter_mut() {
        let param = if id.is_bidi() {
            assert!(!id.is_remote_initiated(Role::Client));
            tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE
        } else {
            tparams::INITIAL_MAX_STREAM_DATA_UNI
        };
        stream.set_max_stream_data(remote.get_integer(param));
    }
}
} // end of `impl SendStreams`

/// Iterate over `(id, stream)` pairs with mutable access to each stream.
impl<'a> IntoIterator for &'a mut SendStreams {
    type Item = (&'a StreamId, &'a mut SendStream);
    type IntoIter = indexmap::map::IterMut<'a, StreamId, SendStream>;

    fn into_iter(self) -> Self::IntoIter {
        self.0.iter_mut()
    }
}

/// Identifies a range of stream data (and whether it carried the fin) so that
/// a later ack or loss can be reported to the owning stream.
#[derive(Debug, Clone)]
pub struct SendStreamRecoveryToken {
    pub(crate) id: StreamId,
    offset: u64,
    length: usize,
    fin: bool,
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::events::ConnectionEvent;
    use neqo_common::{event::Provider, hex_with_len, qtrace};

    /// Build a shared connection-level flow controller with the given limit.
    fn connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>> {
        let fc = SenderFlowControl::new((), limit);
        Rc::new(RefCell::new(fc))
    }

    /// Marking ranges as Sent/Acked updates the highest offset and the
    /// contiguous acked prefix as expected.
    #[test]
    fn test_mark_range() {
        let mut tracker = RangeTracker::default();

        // Ranges that were never marked Sent can still be marked Acked
        // (e.g. queued for retransmission, then the acks arrive).
        tracker.mark_range(5, 5, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 10);
        assert_eq!(tracker.acked_from_zero(), 0);
        tracker.mark_range(10, 4, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 14);
        assert_eq!(tracker.acked_from_zero(), 0);

        // Fill the hole at the front: first Sent, then Acked.
        tracker.mark_range(0, 5, RangeState::Sent);
        assert_eq!(tracker.highest_offset(), 14);
        assert_eq!(tracker.acked_from_zero(), 0);
        tracker.mark_range(0, 5, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 14);
        assert_eq!(tracker.acked_from_zero(), 14);

        // An overlapping ack extends the acked prefix.
        tracker.mark_range(12, 20, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 32);
        assert_eq!(tracker.acked_from_zero(), 32);

        // Ack everything.
        tracker.mark_range(0, 400, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 400);
        assert_eq!(tracker.acked_from_zero(), 400);

        // Acked wins over Sent.
        tracker.mark_range(0, 200, RangeState::Sent);
        assert_eq!(tracker.highest_offset(), 400);
        assert_eq!(tracker.acked_from_zero(), 400);
    }

    /// `unmark_sent` with a single Sent range at the start.
    #[test]
    fn unmark_sent_start() {
        let mut tracker = RangeTracker::default();

        tracker.mark_range(0, 5, RangeState::Sent);
        assert_eq!(tracker.highest_offset(), 5);
        assert_eq!(tracker.acked_from_zero(), 0);

        tracker.unmark_sent();
        assert_eq!(tracker.highest_offset(), 0);
        assert_eq!(tracker.acked_from_zero(), 0);
        assert_eq!(tracker.first_unmarked_range(), (0, None));
    }

    /// `unmark_sent` with a Sent range sandwiched between two Acked ranges.
    #[test]
    fn unmark_sent_middle() {
        let mut tracker = RangeTracker::default();

        tracker.mark_range(0, 5, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 5);
        assert_eq!(tracker.acked_from_zero(), 5);
        tracker.mark_range(5, 5, RangeState::Sent);
        assert_eq!(tracker.highest_offset(), 10);
        assert_eq!(tracker.acked_from_zero(), 5);
        tracker.mark_range(10, 5, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 15);
        assert_eq!(tracker.acked_from_zero(), 5);
        assert_eq!(tracker.first_unmarked_range(), (15, None));

        tracker.unmark_sent();
        assert_eq!(tracker.highest_offset(), 15);
        assert_eq!(tracker.acked_from_zero(), 5);
        assert_eq!(tracker.first_unmarked_range(), (5, Some(5)));
    }

    /// `unmark_sent` with a Sent range following an Acked range.
    #[test]
    fn unmark_sent_end() {
        let mut tracker = RangeTracker::default();

        tracker.mark_range(0, 5, RangeState::Acked);
        assert_eq!(tracker.highest_offset(), 5);
        assert_eq!(tracker.acked_from_zero(), 5);
        tracker.mark_range(5, 5, RangeState::Sent);
        assert_eq!(tracker.highest_offset(), 10);
        assert_eq!(tracker.acked_from_zero(), 5);
        assert_eq!(tracker.first_unmarked_range(), (10, None));

        tracker.unmark_sent();
        assert_eq!(tracker.highest_offset(), 5);
        assert_eq!(tracker.acked_from_zero(), 5);
        assert_eq!(tracker.first_unmarked_range(), (5, None));
    }

    /// Rotating a `VecDeque` left and truncating drops the old front element.
    #[test]
    fn truncate_front() {
        let mut queue = VecDeque::new();
        queue.push_back(5);
        queue.push_back(6);
        queue.push_back(7);
        queue.push_front(4usize);

        queue.rotate_left(1);
        queue.truncate(3);
        assert_eq!(*queue.front().unwrap(), 5);
        assert_eq!(*queue.back().unwrap(), 7);
    }
/// `unmark_range` over a span that overlaps an Acked range and a Sent range
/// clears only the Sent part.
#[test]
fn test_unmark_range() {
    let mut rt = RangeTracker::default();

    rt.mark_range(5, 5, RangeState::Acked);
    rt.mark_range(10, 5, RangeState::Sent);

    // Should unmark sent but not acked range
    rt.unmark_range(7, 6);

    let res = rt.first_unmarked_range();
    assert_eq!(res, (0, Some(5)));
    // The Acked range 5..10 is untouched.
    assert_eq!(
        rt.used.iter().next().unwrap(),
        (&5, &(5, RangeState::Acked))
    );
    // Only the tail 13..15 of the Sent range remains.
    assert_eq!(
        rt.used.iter().nth(1).unwrap(),
        (&13, &(2, RangeState::Sent))
    );
    assert!(rt.used.iter().nth(2).is_none());
    rt.mark_range(0, 5, RangeState::Sent);

    let res = rt.first_unmarked_range();
    assert_eq!(res, (10, Some(3)));
    rt.mark_range(10, 3, RangeState::Sent);

    let res = rt.first_unmarked_range();
    assert_eq!(res, (15, None));
}

/// Walks a full `TxBuffer` through sent / lost / acked marking and checks
/// what `next_bytes()` offers at each step.
#[test]
#[allow(clippy::cognitive_complexity)]
fn tx_buffer_next_bytes_1() {
    let mut txb = TxBuffer::new();

    assert_eq!(txb.avail(), SEND_BUFFER_SIZE);

    // Fill the buffer
    assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
    assert!(matches!(txb.next_bytes(),
                     Some((0, x)) if x.len()==SEND_BUFFER_SIZE
                     && x.iter().all(|ch| *ch == 1)));

    // Mark almost all as sent. Get what's left
    let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1;
    txb.mark_as_sent(0, one_byte_from_end as usize);
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 1
                     && start == one_byte_from_end
                     && x.iter().all(|ch| *ch == 1)));

    // Mark all as sent. Get nothing
    txb.mark_as_sent(0, SEND_BUFFER_SIZE);
    assert!(matches!(txb.next_bytes(), None));

    // Mark as lost. Get it again
    txb.mark_as_lost(one_byte_from_end, 1);
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 1
                     && start == one_byte_from_end
                     && x.iter().all(|ch| *ch == 1)));

    // Mark a larger range lost, including beyond what's in the buffer even.
    // Get a little more
    let five_bytes_from_end = SEND_BUFFER_SIZE as u64 - 5;
    txb.mark_as_lost(five_bytes_from_end, 100);
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 5
                     && start == five_bytes_from_end
                     && x.iter().all(|ch| *ch == 1)));

    // Contig acked range at start means it can be removed from buffer
    // Impl of vecdeque should now result in a split buffer when more data
    // is sent
    txb.mark_as_acked(0, five_bytes_from_end as usize);
    assert_eq!(txb.send(&[2; 30]), 30);
    // Just get 5 even though there is more
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 5
                     && start == five_bytes_from_end
                     && x.iter().all(|ch| *ch == 1)));
    assert_eq!(txb.retired, five_bytes_from_end);
    assert_eq!(txb.buffered(), 35);

    // Marking that bit as sent should let the last contig bit be returned
    // when called again
    txb.mark_as_sent(five_bytes_from_end, 5);
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 30
                     && start == SEND_BUFFER_SIZE as u64
                     && x.iter().all(|ch| *ch == 2)));
}

/// Like `tx_buffer_next_bytes_1`, but with new data appended after part of
/// the buffer was acked, and with a range in the newer data marked as sent
/// out of order.
#[test]
fn tx_buffer_next_bytes_2() {
    let mut txb = TxBuffer::new();

    assert_eq!(txb.avail(), SEND_BUFFER_SIZE);

    // Fill the buffer
    assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
    assert!(matches!(txb.next_bytes(),
                     Some((0, x)) if x.len()==SEND_BUFFER_SIZE
                     && x.iter().all(|ch| *ch == 1)));

    // As above
    let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40;

    txb.mark_as_acked(0, forty_bytes_from_end as usize);
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 40
                     && start == forty_bytes_from_end
    ));

    // Valid new data placed in split locations
    assert_eq!(txb.send(&[2; 100]), 100);

    // Mark a little more as sent
    txb.mark_as_sent(forty_bytes_from_end, 10);
    let thirty_bytes_from_end = forty_bytes_from_end + 10;
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 30
                     && start == thirty_bytes_from_end
                     && x.iter().all(|ch| *ch == 1)));

    // Mark a range 'A' in second slice as sent. Should still return the same
    let range_a_start = SEND_BUFFER_SIZE as u64 + 30;
    let range_a_end = range_a_start + 10;
    txb.mark_as_sent(range_a_start, 10);
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 30
                     && start == thirty_bytes_from_end
                     && x.iter().all(|ch| *ch == 1)));

    // Ack entire first slice and into second slice
    let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10;
    txb.mark_as_acked(0, ten_bytes_past_end as usize);

    // Get up to marked range A
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 20
                     && start == ten_bytes_past_end
                     && x.iter().all(|ch| *ch == 2)));

    txb.mark_as_sent(ten_bytes_past_end, 20);

    // Get bit after earlier marked range A
    assert!(matches!(txb.next_bytes(),
                     Some((start, x)) if x.len() == 60
                     && start == range_a_end
                     && x.iter().all(|ch| *ch == 2)));

    // No more bytes.
    txb.mark_as_sent(range_a_end, 60);
    assert!(matches!(txb.next_bytes(), None));
}

/// `send()` is limited in turn by the stream limit, the connection limit and
/// the send buffer size.
#[test]
fn test_stream_tx() {
    let conn_fc = connection_fc(4096);
    let conn_events = ConnectionEvents::default();

    let mut s = SendStream::new(4.into(), 1024, Rc::clone(&conn_fc), conn_events);

    let res = s.send(&[4; 100]).unwrap();
    assert_eq!(res, 100);
    s.mark_as_sent(0, 50, false);
    // Flow control counts everything buffered, not just what was sent.
    if let SendStreamState::Send { fc, .. } = s.state() {
        assert_eq!(fc.used(), 100);
    } else {
        panic!("unexpected stream state");
    }

    // Should hit stream flow control limit before filling up send buffer
    let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
    assert_eq!(res, 1024 - 100);

    // should do nothing, max stream data already 1024
    s.set_max_stream_data(1024);
    let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
    assert_eq!(res, 0);

    // should now hit the conn flow control (4096)
    s.set_max_stream_data(1_048_576);
    let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
    assert_eq!(res, 3072);

    // should now hit the tx buffer size
    conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64);
    let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap();
    assert_eq!(res, SEND_BUFFER_SIZE - 4096);

    // TODO(agrover@mozilla.com): test ooo acks somehow
    s.mark_as_acked(0, 40, false);
}

/// Once data is marked as sent (and later acked), `next_bytes()` offers
/// nothing more.
#[test]
fn test_tx_buffer_acks() {
    let mut tx = TxBuffer::new();
    assert_eq!(tx.send(&[4; 100]), 100);
    let res = tx.next_bytes().unwrap();
    assert_eq!(res.0, 0);
    assert_eq!(res.1.len(), 100);
    tx.mark_as_sent(0, 100);
    let res = tx.next_bytes();
    assert_eq!(res, None);

    tx.mark_as_acked(0, 100);
    let res = tx.next_bytes();
    assert_eq!(res, None);
}

/// Checks when raising limits produces a `SendStreamWritable` event and how
/// much `send()` accepts afterwards.
#[test]
fn send_stream_writable_event_gen() {
    let conn_fc = connection_fc(2);
    let mut conn_events = ConnectionEvents::default();

    let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone());

    // Stream is initially blocked (conn:2, stream:0)
    // and will not accept data.
    assert_eq!(s.send(b"hi").unwrap(), 0);

    // increasing to (conn:2, stream:2) will allow 2 bytes, and also
    // generate a SendStreamWritable event.
    s.set_max_stream_data(2);
    let evts = conn_events.events().collect::<Vec<_>>();
    assert_eq!(evts.len(), 1);
    assert!(matches!(
        evts[0],
        ConnectionEvent::SendStreamWritable { .. }
    ));
    assert_eq!(s.send(b"hello").unwrap(), 2);

    // increasing to (conn:2, stream:4) will not generate an event or allow
    // sending anything.
    s.set_max_stream_data(4);
    assert_eq!(conn_events.events().count(), 0);
    assert_eq!(s.send(b"hello").unwrap(), 0);

    // Increasing conn max (conn:4, stream:4) will unblock but not emit
    // event b/c that happens in Connection::emit_frame() (tested in
    // connection.rs)
    assert!(conn_fc.borrow_mut().update(4));
    assert_eq!(conn_events.events().count(), 0);
    assert_eq!(s.avail(), 2);
    assert_eq!(s.send(b"hello").unwrap(), 2);

    // No event because still blocked by conn
    s.set_max_stream_data(1_000_000_000);
    assert_eq!(conn_events.events().count(), 0);

    // No event because happens in emit_frame()
    conn_fc.borrow_mut().update(1_000_000_000);
    assert_eq!(conn_events.events().count(), 0);

    // Unblocking both by a large amount will cause avail() to be limited by
    // tx buffer size.
    assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4);

    assert_eq!(
        s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(),
        SEND_BUFFER_SIZE - 4
    );

    // No event because still blocked by tx buffer full
    s.set_max_stream_data(2_000_000_000);
    assert_eq!(conn_events.events().count(), 0);
    assert_eq!(s.send(b"hello").unwrap(), 0);
}

/// A stream created with both stream and connection credit immediately
/// produces a `SendStreamWritable` event.
#[test]
fn send_stream_writable_event_new_stream() {
    let conn_fc = connection_fc(2);
    let mut conn_events = ConnectionEvents::default();

    let _s = SendStream::new(4.into(), 100, conn_fc, conn_events.clone());

    // Creating a new stream with conn and stream credits should result in
    // an event.
    let evts = conn_events.events().collect::<Vec<_>>();
    assert_eq!(evts.len(), 1);
    assert!(matches!(
        evts[0],
        ConnectionEvent::SendStreamWritable { .. }
    ));
}

/// Unwrap a `RecoveryToken` that is expected to hold a
/// `SendStreamRecoveryToken`; panics for any other kind of token.
fn as_stream_token(t: &RecoveryToken) -> &SendStreamRecoveryToken {
    if let RecoveryToken::Stream(StreamRecoveryToken::Stream(rt)) = &t {
        rt
    } else {
        panic!();
    }
}

#[test]
// Verify lost frames handle fin properly
fn send_stream_get_frame_data() {
    let conn_fc = connection_fc(100);
    let conn_events = ConnectionEvents::default();

    let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events);
    s.send(&[0; 10]).unwrap();
    s.close();

    let mut ss = SendStreams::default();
    ss.insert(StreamId::from(0), s);

    let mut tokens = Vec::new();
    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);

    // Write a small frame: no fin.
    let written = builder.len();
    builder.set_limit(written + 6);
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    assert_eq!(builder.len(), written + 6);
    assert_eq!(tokens.len(), 1);
    let f1_token = tokens.remove(0);
    assert!(!as_stream_token(&f1_token).fin);

    // Write the rest: fin.
    let written = builder.len();
    builder.set_limit(written + 200);
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    assert_eq!(builder.len(), written + 10);
    assert_eq!(tokens.len(), 1);
    let f2_token = tokens.remove(0);
    assert!(as_stream_token(&f2_token).fin);

    // Should be no more data to frame.
    let written = builder.len();
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    assert_eq!(builder.len(), written);
    assert!(tokens.is_empty());

    // Mark frame 1 as lost
    ss.lost(as_stream_token(&f1_token));

    // Next frame should not set fin even though stream has fin but frame
    // does not include end of stream
    let written = builder.len();
    ss.write_frames(
        TransmissionPriority::default() + RetransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    assert_eq!(builder.len(), written + 7); // Needs a length this time.
    assert_eq!(tokens.len(), 1);
    let f4_token = tokens.remove(0);
    assert!(!as_stream_token(&f4_token).fin);

    // Mark frame 2 as lost
    ss.lost(as_stream_token(&f2_token));

    // Next frame should set fin because it includes end of stream
    let written = builder.len();
    ss.write_frames(
        TransmissionPriority::default() + RetransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    assert_eq!(builder.len(), written + 10);
    assert_eq!(tokens.len(), 1);
    let f5_token = tokens.remove(0);
    assert!(as_stream_token(&f5_token).fin);
}

#[test]
#[allow(clippy::cognitive_complexity)]
// Verify lost frames handle fin properly with zero length fin
fn send_stream_get_frame_zerolength_fin() {
    let conn_fc = connection_fc(100);
    let conn_events = ConnectionEvents::default();

    let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events);
    s.send(&[0; 10]).unwrap();

    let mut ss = SendStreams::default();
    ss.insert(StreamId::from(0), s);

    let mut tokens = Vec::new();
    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    let f1_token = tokens.remove(0);
    assert_eq!(as_stream_token(&f1_token).offset, 0);
    assert_eq!(as_stream_token(&f1_token).length, 10);
    assert!(!as_stream_token(&f1_token).fin);

    // Should be no more data to frame
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    assert!(tokens.is_empty());

    // Closing after all data was written leaves only a zero-length fin.
    ss.get_mut(StreamId::from(0)).unwrap().close();

    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    let f2_token = tokens.remove(0);
    assert_eq!(as_stream_token(&f2_token).offset, 10);
    assert_eq!(as_stream_token(&f2_token).length, 0);
    assert!(as_stream_token(&f2_token).fin);

    // Mark frame 2 as lost
    ss.lost(as_stream_token(&f2_token));

    // Next frame should set fin
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    let f3_token = tokens.remove(0);
    assert_eq!(as_stream_token(&f3_token).offset, 10);
    assert_eq!(as_stream_token(&f3_token).length, 0);
    assert!(as_stream_token(&f3_token).fin);

    // Mark frame 1 as lost
    ss.lost(as_stream_token(&f1_token));

    // Next frame should set fin and include all data
    ss.write_frames(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut FrameStats::default(),
    );
    let f4_token = tokens.remove(0);
    assert_eq!(as_stream_token(&f4_token).offset, 0);
    assert_eq!(as_stream_token(&f4_token).length, 10);
    assert!(as_stream_token(&f4_token).fin);
}

/// Blocked frames (stream-level, then connection-level) are only reported
/// once the data that used up the last credit has been marked as sent.
#[test]
fn data_blocked() {
    let conn_fc = connection_fc(5);
    let conn_events = ConnectionEvents::default();

    let stream_id = StreamId::from(4);
    let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events);

    // Only two bytes can be sent due to the stream limit.
    assert_eq!(s.send(b"abc").unwrap(), 2);
    assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..])));

    // This doesn't report blocking yet.
    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
    let mut tokens = Vec::new();
    let mut stats = FrameStats::default();
    s.write_blocked_frame(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut stats,
    );
    assert_eq!(stats.stream_data_blocked, 0);

    // Blocking is reported after sending the last available credit.
    s.mark_as_sent(0, 2, false);
    s.write_blocked_frame(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut stats,
    );
    assert_eq!(stats.stream_data_blocked, 1);

    // Now increase the stream limit and test the connection limit.
    s.set_max_stream_data(10);

    assert_eq!(s.send(b"abcd").unwrap(), 3);
    assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..])));
    // DATA_BLOCKED is not sent yet.
    conn_fc
        .borrow_mut()
        .write_frames(&mut builder, &mut tokens, &mut stats);
    assert_eq!(stats.data_blocked, 0);

    // DATA_BLOCKED is queued once bytes using all credit are sent.
    s.mark_as_sent(2, 3, false);
    conn_fc
        .borrow_mut()
        .write_frames(&mut builder, &mut tokens, &mut stats);
    assert_eq!(stats.data_blocked, 1);
}

/// An atomic write that does not fit is rejected as a whole and causes the
/// matching blocked frame to be reported straight away.
#[test]
fn data_blocked_atomic() {
    let conn_fc = connection_fc(5);
    let conn_events = ConnectionEvents::default();

    let stream_id = StreamId::from(4);
    let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events);

    // Stream is initially blocked (conn:5, stream:2)
    // and will not accept atomic write of 3 bytes.
    assert_eq!(s.send_atomic(b"abc").unwrap(), 0);

    // Assert that STREAM_DATA_BLOCKED is sent.
    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
    let mut tokens = Vec::new();
    let mut stats = FrameStats::default();
    s.write_blocked_frame(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut stats,
    );
    assert_eq!(stats.stream_data_blocked, 1);

    // Assert that a non-atomic write works.
    assert_eq!(s.send(b"abc").unwrap(), 2);
    assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..])));
    s.mark_as_sent(0, 2, false);

    // Set limits to (conn:5, stream:10).
    s.set_max_stream_data(10);

    // An atomic write of 4 bytes exceeds the remaining limit of 3.
    assert_eq!(s.send_atomic(b"abcd").unwrap(), 0);

    // Assert that DATA_BLOCKED is sent.
    conn_fc
        .borrow_mut()
        .write_frames(&mut builder, &mut tokens, &mut stats);
    assert_eq!(stats.data_blocked, 1);

    // Check that a non-atomic write works.
    assert_eq!(s.send(b"abcd").unwrap(), 3);
    assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..])));
    s.mark_as_sent(2, 3, false);

    // Increase limits to (conn:15, stream:15).
    s.set_max_stream_data(15);
    conn_fc.borrow_mut().update(15);

    // Check that atomic writing right up to the limit works.
    assert_eq!(s.send_atomic(b"abcdefghij").unwrap(), 10);
}

/// The stream becomes terminal even when the fin is acknowledged before the
/// data.
#[test]
fn ack_fin_first() {
    const MESSAGE: &[u8] = b"hello";
    let len_u64 = u64::try_from(MESSAGE.len()).unwrap();

    let conn_fc = connection_fc(len_u64);
    let conn_events = ConnectionEvents::default();

    let mut s = SendStream::new(StreamId::new(100), 0, conn_fc, conn_events);
    s.set_max_stream_data(len_u64);

    // Send all the data, then the fin.
    let _ = s.send(MESSAGE).unwrap();
    s.mark_as_sent(0, MESSAGE.len(), false);
    s.close();
    s.mark_as_sent(len_u64, 0, true);

    // Ack the fin, then the data.
    s.mark_as_acked(len_u64, 0, true);
    s.mark_as_acked(0, MESSAGE.len(), false);
    assert!(s.is_terminal());
}

/// A fin that was acknowledged and is afterwards reported lost must not be
/// sent again.
#[test]
fn ack_then_lose_fin() {
    const MESSAGE: &[u8] = b"hello";
    let len_u64 = u64::try_from(MESSAGE.len()).unwrap();

    let conn_fc = connection_fc(len_u64);
    let conn_events = ConnectionEvents::default();

    let id = StreamId::new(100);
    let mut s = SendStream::new(id, 0, conn_fc, conn_events);
    s.set_max_stream_data(len_u64);

    // Send all the data, then the fin.
    let _ = s.send(MESSAGE).unwrap();
    s.mark_as_sent(0, MESSAGE.len(), false);
    s.close();
    s.mark_as_sent(len_u64, 0, true);

    // Ack the fin, then mark it lost.
    s.mark_as_acked(len_u64, 0, true);
    s.mark_as_lost(len_u64, 0, true);

    // No frame should be sent here.
    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
    let mut tokens = Vec::new();
    let mut stats = FrameStats::default();
    s.write_stream_frame(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut stats,
    );
    assert_eq!(stats.stream, 0);
}

/// Create a `SendStream` and force it into a state where it believes that
/// `offset` bytes have already been sent and acknowledged.
fn stream_with_sent(stream: u64, offset: usize) -> SendStream {
    const MAX_VARINT: u64 = (1 << 62) - 1;

    let conn_fc = connection_fc(MAX_VARINT);
    let mut s = SendStream::new(
        StreamId::from(stream),
        MAX_VARINT,
        conn_fc,
        ConnectionEvents::default(),
    );

    // Hand-build the `Send` state: a buffer whose first `offset` bytes are
    // retired and acked, and a stream flow controller that has consumed them.
    let mut send_buf = TxBuffer::new();
    send_buf.retired = u64::try_from(offset).unwrap();
    send_buf.ranges.mark_range(0, offset, RangeState::Acked);
    let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT);
    fc.consume(offset);
    // A fresh connection-level controller is installed with the state.
    let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT)));
    s.state = SendStreamState::Send {
        fc,
        conn_fc,
        send_buf,
    };
    s
}

/// On stream `stream`, starting at `offset`, queue `len` bytes (at most 128),
/// optionally close the stream, and try to write one STREAM frame into a
/// packet with `space` bytes left after the header.
///
/// Returns `true` if a STREAM frame was written.
fn frame_sent_sid(stream: u64, offset: usize, len: usize, fin: bool, space: usize) -> bool {
    const BUF: &[u8] = &[0x42; 128];

    qtrace!(
        "frame_sent stream={} offset={} len={} fin={}, space={}",
        stream,
        offset,
        len,
        fin,
        space
    );

    let mut s = stream_with_sent(stream, offset);

    // Now write out the prescribed data and maybe close.
    if len > 0 {
        s.send(&BUF[..len]).unwrap();
    }
    if fin {
        s.close();
    }

    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
    let header_len = builder.len();
    builder.set_limit(header_len + space);

    let mut tokens = Vec::new();
    let mut stats = FrameStats::default();
    s.write_stream_frame(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut stats,
    );
    qtrace!("STREAM frame: {}", hex_with_len(&builder[header_len..]));
    stats.stream > 0
}

/// `frame_sent_sid` on stream 0.
fn frame_sent(offset: usize, len: usize, fin: bool, space: usize) -> bool {
    frame_sent_sid(0, offset, len, fin, space)
}

/// Frames without data: never written without fin; with fin, written only
/// when there is enough space.
#[test]
fn stream_frame_empty() {
    // Stream frames with empty data and no fin never work.
    assert!(!frame_sent(10, 0, false, 2));
    assert!(!frame_sent(10, 0, false, 3));
    assert!(!frame_sent(10, 0, false, 4));
    assert!(!frame_sent(10, 0, false, 5));
    assert!(!frame_sent(10, 0, false, 100));

    // Empty data with fin is only a problem if there is no space.
    assert!(!frame_sent(0, 0, true, 1));
    assert!(frame_sent(0, 0, true, 2));
    assert!(!frame_sent(10, 0, true, 2));
    assert!(frame_sent(10, 0, true, 3));
    assert!(frame_sent(10, 0, true, 4));
    assert!(frame_sent(10, 0, true, 5));
    assert!(frame_sent(10, 0, true, 100));
}

/// One byte of data at offset 10 needs at least 4 bytes of space.
#[test]
fn stream_frame_minimum() {
    // Add minimum data
    assert!(!frame_sent(10, 1, false, 3));
    assert!(!frame_sent(10, 1, true, 3));
    assert!(frame_sent(10, 1, false, 4));
    assert!(frame_sent(10, 1, true, 4));
    assert!(frame_sent(10, 1, false, 5));
    assert!(frame_sent(10, 1, true, 5));
    assert!(frame_sent(10, 1, false, 100));
    assert!(frame_sent(10, 1, true, 100));
}

/// With more data queued than fits, a frame is still written as soon as the
/// minimum space is available.
#[test]
fn stream_frame_more() {
    // Try more data
    assert!(!frame_sent(10, 100, false, 3));
    assert!(!frame_sent(10, 100, true, 3));
    assert!(frame_sent(10, 100, false, 4));
    assert!(frame_sent(10, 100, true, 4));
    assert!(frame_sent(10, 100, false, 5));
    assert!(frame_sent(10, 100, true, 5));
    assert!(frame_sent(10, 100, false, 100));
    assert!(frame_sent(10, 100, true, 100));

    assert!(frame_sent(10, 100, false, 1000));
    assert!(frame_sent(10, 100, true, 1000));
}

/// Same space thresholds, but with a large stream ID and a large offset.
#[test]
fn stream_frame_big_id() {
    // A value that encodes to the largest varint.
    const BIG: u64 = 1 << 30;
    const BIGSZ: usize = 1 << 30;

    assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 16));
    assert!(!frame_sent_sid(BIG, BIGSZ, 0, true, 16));
    assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 17));
    assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 17));
    assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 18));
    assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 18));

    assert!(!frame_sent_sid(BIG, BIGSZ, 1, false, 17));
    assert!(!frame_sent_sid(BIG, BIGSZ, 1, true, 17));
    assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 18));
    assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 18));
    assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 19));
    assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 19));
    assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 100));
    assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 100));
}

/// Send `data` followed by fin in packets with slightly varying amounts of
/// spare room, and check when the STREAM frame carries an explicit length
/// field and when it fills the packet instead.
fn stream_frame_at_boundary(data: &[u8]) {
    /// Write `data` + fin into a packet with room for the frame plus `extra`
    /// bytes and return the encoded frame (packet header stripped).
    /// Asserts that exactly one frame was written and that the builder's
    /// fullness matches `expect_full`.
    fn send_with_extra_capacity(data: &[u8], extra: usize, expect_full: bool) -> Vec<u8> {
        qtrace!("send_with_extra_capacity {} + {}", data.len(), extra);
        let mut s = stream_with_sent(0, 0);
        s.send(data).unwrap();
        s.close();

        let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
        let header_len = builder.len();
        // Add 2 for the frame type and stream ID, then add the extra.
        builder.set_limit(header_len + data.len() + 2 + extra);
        let mut tokens = Vec::new();
        let mut stats = FrameStats::default();
        s.write_stream_frame(
            TransmissionPriority::default(),
            &mut builder,
            &mut tokens,
            &mut stats,
        );
        assert_eq!(stats.stream, 1);
        assert_eq!(builder.is_full(), expect_full);
        Vec::from(Encoder::from(builder)).split_off(header_len)
    }

    // The minimum amount of extra space for getting another frame in.
    let mut enc = Encoder::new();
    enc.encode_varint(u64::try_from(data.len()).unwrap());
    let len_buf = Vec::from(enc);
    let minimum_extra = len_buf.len() + PacketBuilder::MINIMUM_FRAME_SIZE;

    // For anything short of the minimum extra, the frame should fill the packet.
    for i in 0..minimum_extra {
        let frame = send_with_extra_capacity(data, i, true);
        let (header, body) = frame.split_at(2);
        // Frame type 0b1001 (no length field), stream ID 0.
        assert_eq!(header, &[0b1001, 0]);
        assert_eq!(body, data);
    }

    // Once there is space for another frame AND a length field,
    // then a length will be added.
    let frame = send_with_extra_capacity(data, minimum_extra, false);
    let (header, rest) = frame.split_at(2);
    // Frame type 0b1011 (length field present), stream ID 0.
    assert_eq!(header, &[0b1011, 0]);
    let (len, body) = rest.split_at(len_buf.len());
    assert_eq!(len, &len_buf);
    assert_eq!(body, data);
}

/// 16383/16384 is an odd boundary in STREAM frame construction.
/// That is the boundary where a length goes from 2 bytes to 4 bytes.
/// Test that we correctly add a length field to the frame; and test
/// that if we don't, then we don't allow other frames to be added.
#[test]
fn stream_frame_16384() {
    stream_frame_at_boundary(&[4; 16383]);
    stream_frame_at_boundary(&[4; 16384]);
}

/// 63/64 is the other odd boundary.
#[test]
fn stream_frame_64() {
    stream_frame_at_boundary(&[2; 63]);
    stream_frame_at_boundary(&[2; 64]);
}
} // end of `mod tests`