1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Buffering data to send until it is acked. 8 9 use std::cell::RefCell; 10 use std::cmp::{max, min, Ordering}; 11 use std::collections::{BTreeMap, VecDeque}; 12 use std::convert::TryFrom; 13 use std::mem; 14 use std::ops::Add; 15 use std::rc::Rc; 16 17 use indexmap::IndexMap; 18 use smallvec::SmallVec; 19 20 use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role}; 21 22 use crate::events::ConnectionEvents; 23 use crate::fc::SenderFlowControl; 24 use crate::frame::{Frame, FRAME_TYPE_RESET_STREAM}; 25 use crate::packet::PacketBuilder; 26 use crate::recovery::RecoveryToken; 27 use crate::stats::FrameStats; 28 use crate::stream_id::StreamId; 29 use crate::tparams::{self, TransportParameters}; 30 use crate::{AppError, Error, Res}; 31 32 pub const SEND_BUFFER_SIZE: usize = 0x10_0000; // 1 MiB 33 34 /// The priority that is assigned to sending data for the stream. 35 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 36 pub enum TransmissionPriority { 37 /// This stream is more important than the functioning of the connection. 38 /// Don't use this priority unless the stream really is that important. 39 /// A stream at this priority can starve out other connection functions, 40 /// including flow control, which could be very bad. 41 Critical, 42 /// The stream is very important. Stream data will be written ahead of 43 /// some of the less critical connection functions, like path validation, 44 /// connection ID management, and session tickets. 45 Important, 46 /// High priority streams are important, but not enough to disrupt 47 /// connection operation. They go ahead of session tickets though. 48 High, 49 /// The default priority. 
50 Normal, 51 /// Low priority streams get sent last. 52 Low, 53 } 54 55 impl Default for TransmissionPriority { default() -> Self56 fn default() -> Self { 57 Self::Normal 58 } 59 } 60 61 impl PartialOrd for TransmissionPriority { partial_cmp(&self, other: &Self) -> Option<Ordering>62 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 63 Some(self.cmp(other)) 64 } 65 } 66 67 impl Ord for TransmissionPriority { cmp(&self, other: &Self) -> Ordering68 fn cmp(&self, other: &Self) -> Ordering { 69 if self == other { 70 return Ordering::Equal; 71 } 72 match (self, other) { 73 (Self::Critical, _) => Ordering::Greater, 74 (_, Self::Critical) => Ordering::Less, 75 (Self::Important, _) => Ordering::Greater, 76 (_, Self::Important) => Ordering::Less, 77 (Self::High, _) => Ordering::Greater, 78 (_, Self::High) => Ordering::Less, 79 (Self::Normal, _) => Ordering::Greater, 80 (_, Self::Normal) => Ordering::Less, 81 _ => unreachable!(), 82 } 83 } 84 } 85 86 impl Add<RetransmissionPriority> for TransmissionPriority { 87 type Output = Self; add(self, rhs: RetransmissionPriority) -> Self::Output88 fn add(self, rhs: RetransmissionPriority) -> Self::Output { 89 match rhs { 90 RetransmissionPriority::Fixed(fixed) => fixed, 91 RetransmissionPriority::Same => self, 92 RetransmissionPriority::Higher => match self { 93 Self::Critical => Self::Critical, 94 Self::Important | Self::High => Self::Important, 95 Self::Normal => Self::High, 96 Self::Low => Self::Normal, 97 }, 98 RetransmissionPriority::MuchHigher => match self { 99 Self::Critical | Self::Important => Self::Critical, 100 Self::High | Self::Normal => Self::Important, 101 Self::Low => Self::High, 102 }, 103 } 104 } 105 } 106 107 /// If data is lost, this determines the priority that applies to retransmissions 108 /// of that data. 109 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 110 pub enum RetransmissionPriority { 111 /// Prioritize retransmission at a fixed priority. 
112 /// With this, it is possible to prioritize retransmissions lower than transmissions. 113 /// Doing that can create a deadlock with flow control which might cause the connection 114 /// to stall unless new data stops arriving fast enough that retransmissions can complete. 115 Fixed(TransmissionPriority), 116 /// Don't increase priority for retransmission. This is probably not a good idea 117 /// as it could mean starving flow control. 118 Same, 119 /// Increase the priority of retransmissions (the default). 120 /// Retransmissions of `Critical` or `Important` aren't elevated at all. 121 Higher, 122 /// Increase the priority of retransmissions a lot. 123 /// This is useful for streams that are particularly exposed to head-of-line blocking. 124 MuchHigher, 125 } 126 127 impl Default for RetransmissionPriority { default() -> Self128 fn default() -> Self { 129 Self::Higher 130 } 131 } 132 133 #[derive(Debug, PartialEq, Clone, Copy)] 134 enum RangeState { 135 Sent, 136 Acked, 137 } 138 139 /// Track ranges in the stream as sent or acked. Acked implies sent. Not in a 140 /// range implies needing-to-be-sent, either initially or as a retransmission. 141 #[derive(Debug, Default, PartialEq)] 142 struct RangeTracker { 143 // offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits. 144 used: BTreeMap<u64, (u64, RangeState)>, 145 } 146 147 impl RangeTracker { highest_offset(&self) -> u64148 fn highest_offset(&self) -> u64 { 149 self.used 150 .range(..) 151 .next_back() 152 .map_or(0, |(k, (v, _))| *k + *v) 153 } 154 acked_from_zero(&self) -> u64155 fn acked_from_zero(&self) -> u64 { 156 self.used 157 .get(&0) 158 .filter(|(_, state)| *state == RangeState::Acked) 159 .map_or(0, |(v, _)| *v) 160 } 161 162 /// Find the first unmarked range. If all are contiguous, this will return 163 /// (highest_offset(), None). 
first_unmarked_range(&self) -> (u64, Option<u64>)164 fn first_unmarked_range(&self) -> (u64, Option<u64>) { 165 let mut prev_end = 0; 166 167 for (cur_off, (cur_len, _)) in &self.used { 168 if prev_end == *cur_off { 169 prev_end = cur_off + cur_len; 170 } else { 171 return (prev_end, Some(cur_off - prev_end)); 172 } 173 } 174 (prev_end, None) 175 } 176 177 /// Turn one range into a list of subranges that align with existing 178 /// ranges. 179 /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked. 180 // 181 // e.g. given N is new and ABC are existing: 182 // NNNNNNNNNNNNNNNN 183 // AAAAA BBBCCCCC ...then we want 5 chunks: 184 // 1122222333444555 185 // 186 // but also if we have this: 187 // NNNNNNNNNNNNNNNN 188 // AAAAAAAAAA BBBB ...then break existing A and B ranges up: 189 // 190 // 1111111122222233 191 // aaAAAAAAAA BBbb 192 // 193 // Doing all this work up front should make handling each chunk much 194 // easier. chunk_range_on_edges( &mut self, new_off: u64, new_len: u64, new_state: RangeState, ) -> Vec<(u64, u64, RangeState)>195 fn chunk_range_on_edges( 196 &mut self, 197 new_off: u64, 198 new_len: u64, 199 new_state: RangeState, 200 ) -> Vec<(u64, u64, RangeState)> { 201 let mut tmp_off = new_off; 202 let mut tmp_len = new_len; 203 let mut v = Vec::new(); 204 205 // cut previous overlapping range if needed 206 let prev = self.used.range_mut(..tmp_off).next_back(); 207 if let Some((prev_off, (prev_len, prev_state))) = prev { 208 let prev_state = *prev_state; 209 let overlap = (*prev_off + *prev_len).saturating_sub(new_off); 210 *prev_len -= overlap; 211 if overlap > 0 { 212 self.used.insert(new_off, (overlap, prev_state)); 213 } 214 } 215 216 let mut last_existing_remaining = None; 217 for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) { 218 // Create chunk for "overhang" before an existing range 219 if tmp_off < *off { 220 let sub_len = off - tmp_off; 221 v.push((tmp_off, sub_len, new_state)); 222 tmp_off += 
sub_len; 223 tmp_len -= sub_len; 224 } 225 226 // Create chunk to match existing range 227 let sub_len = min(*len, tmp_len); 228 let remaining_len = len - sub_len; 229 if new_state == RangeState::Sent && *state == RangeState::Acked { 230 qinfo!( 231 "Attempted to downgrade overlapping range Acked range {}-{} with Sent {}-{}", 232 off, 233 len, 234 new_off, 235 new_len 236 ); 237 } else { 238 v.push((tmp_off, sub_len, new_state)); 239 } 240 tmp_off += sub_len; 241 tmp_len -= sub_len; 242 243 if remaining_len > 0 { 244 last_existing_remaining = Some((*off, sub_len, remaining_len, *state)); 245 } 246 } 247 248 // Maybe break last existing range in two so that a final chunk will 249 // have the same length as an existing range entry 250 if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining { 251 *self.used.get_mut(&off).expect("must be there") = (sub_len, state); 252 self.used.insert(off + sub_len, (remaining_len, state)); 253 } 254 255 // Create final chunk if anything remains of the new range 256 if tmp_len > 0 { 257 v.push((tmp_off, tmp_len, new_state)) 258 } 259 260 v 261 } 262 263 /// Merge contiguous Acked ranges into the first entry (0). This range may 264 /// be dropped from the send buffer. 
coalesce_acked_from_zero(&mut self)265 fn coalesce_acked_from_zero(&mut self) { 266 let acked_range_from_zero = self 267 .used 268 .get_mut(&0) 269 .filter(|(_, state)| *state == RangeState::Acked) 270 .map(|(len, _)| *len); 271 272 if let Some(len_from_zero) = acked_range_from_zero { 273 let mut to_remove = SmallVec::<[_; 8]>::new(); 274 275 let mut new_len_from_zero = len_from_zero; 276 277 // See if there's another Acked range entry contiguous to this one 278 while let Some((next_len, _)) = self 279 .used 280 .get(&new_len_from_zero) 281 .filter(|(_, state)| *state == RangeState::Acked) 282 { 283 to_remove.push(new_len_from_zero); 284 new_len_from_zero += *next_len; 285 } 286 287 if len_from_zero != new_len_from_zero { 288 self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero; 289 } 290 291 for val in to_remove { 292 self.used.remove(&val); 293 } 294 } 295 } 296 mark_range(&mut self, off: u64, len: usize, state: RangeState)297 fn mark_range(&mut self, off: u64, len: usize, state: RangeState) { 298 if len == 0 { 299 qinfo!("mark 0-length range at {}", off); 300 return; 301 } 302 303 let subranges = self.chunk_range_on_edges(off, len as u64, state); 304 305 for (sub_off, sub_len, sub_state) in subranges { 306 self.used.insert(sub_off, (sub_len, sub_state)); 307 } 308 309 self.coalesce_acked_from_zero() 310 } 311 unmark_range(&mut self, off: u64, len: usize)312 fn unmark_range(&mut self, off: u64, len: usize) { 313 if len == 0 { 314 qdebug!("unmark 0-length range at {}", off); 315 return; 316 } 317 318 let len = u64::try_from(len).unwrap(); 319 let end_off = off + len; 320 321 let mut to_remove = SmallVec::<[_; 8]>::new(); 322 let mut to_add = None; 323 324 // Walk backwards through possibly affected existing ranges 325 for (cur_off, (cur_len, cur_state)) in self.used.range_mut(..off + len).rev() { 326 // Maybe fixup range preceding the removed range 327 if *cur_off < off { 328 // Check for overlap 329 if *cur_off + *cur_len > off { 330 if *cur_state 
== RangeState::Acked { 331 qdebug!( 332 "Attempted to unmark Acked range {}-{} with unmark_range {}-{}", 333 cur_off, 334 cur_len, 335 off, 336 off + len 337 ); 338 } else { 339 *cur_len = off - cur_off; 340 } 341 } 342 break; 343 } 344 345 if *cur_state == RangeState::Acked { 346 qdebug!( 347 "Attempted to unmark Acked range {}-{} with unmark_range {}-{}", 348 cur_off, 349 cur_len, 350 off, 351 off + len 352 ); 353 continue; 354 } 355 356 // Add a new range for old subrange extending beyond 357 // to-be-unmarked range 358 let cur_end_off = cur_off + *cur_len; 359 if cur_end_off > end_off { 360 let new_cur_off = off + len; 361 let new_cur_len = cur_end_off - end_off; 362 assert_eq!(to_add, None); 363 to_add = Some((new_cur_off, new_cur_len, *cur_state)); 364 } 365 366 to_remove.push(*cur_off); 367 } 368 369 for remove_off in to_remove { 370 self.used.remove(&remove_off); 371 } 372 373 if let Some((new_cur_off, new_cur_len, cur_state)) = to_add { 374 self.used.insert(new_cur_off, (new_cur_len, cur_state)); 375 } 376 } 377 378 /// Unmark all sent ranges. unmark_sent(&mut self)379 pub fn unmark_sent(&mut self) { 380 self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap()); 381 } 382 } 383 384 /// Buffer to contain queued bytes and track their state. 385 #[derive(Debug, Default, PartialEq)] 386 pub struct TxBuffer { 387 retired: u64, // contig acked bytes, no longer in buffer 388 send_buf: VecDeque<u8>, // buffer of not-acked bytes 389 ranges: RangeTracker, // ranges in buffer that have been sent or acked 390 } 391 392 impl TxBuffer { new() -> Self393 pub fn new() -> Self { 394 Self::default() 395 } 396 397 /// Attempt to add some or all of the passed-in buffer to the TxBuffer. 
send(&mut self, buf: &[u8]) -> usize398 pub fn send(&mut self, buf: &[u8]) -> usize { 399 let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len()); 400 if can_buffer > 0 { 401 self.send_buf.extend(&buf[..can_buffer]); 402 assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); 403 } 404 can_buffer 405 } 406 next_bytes(&self) -> Option<(u64, &[u8])>407 pub fn next_bytes(&self) -> Option<(u64, &[u8])> { 408 let (start, maybe_len) = self.ranges.first_unmarked_range(); 409 410 if start == self.retired + u64::try_from(self.buffered()).unwrap() { 411 return None; 412 } 413 414 // Convert from ranges-relative-to-zero to 415 // ranges-relative-to-buffer-start 416 let buff_off = usize::try_from(start - self.retired).unwrap(); 417 418 // Deque returns two slices. Create a subslice from whichever 419 // one contains the first unmarked data. 420 let slc = if buff_off < self.send_buf.as_slices().0.len() { 421 &self.send_buf.as_slices().0[buff_off..] 422 } else { 423 &self.send_buf.as_slices().1[buff_off - self.send_buf.as_slices().0.len()..] 
424 }; 425 426 let len = if let Some(range_len) = maybe_len { 427 // Truncate if range crosses deque slices 428 min(usize::try_from(range_len).unwrap(), slc.len()) 429 } else { 430 slc.len() 431 }; 432 433 debug_assert!(len > 0); 434 debug_assert!(len <= slc.len()); 435 436 Some((start, &slc[..len])) 437 } 438 mark_as_sent(&mut self, offset: u64, len: usize)439 pub fn mark_as_sent(&mut self, offset: u64, len: usize) { 440 self.ranges.mark_range(offset, len, RangeState::Sent) 441 } 442 mark_as_acked(&mut self, offset: u64, len: usize)443 pub fn mark_as_acked(&mut self, offset: u64, len: usize) { 444 self.ranges.mark_range(offset, len, RangeState::Acked); 445 446 // We can drop contig acked range from the buffer 447 let new_retirable = self.ranges.acked_from_zero() - self.retired; 448 debug_assert!(new_retirable <= self.buffered() as u64); 449 let keep_len = 450 self.buffered() - usize::try_from(new_retirable).expect("should fit in usize"); 451 452 // Truncate front 453 self.send_buf.rotate_left(self.buffered() - keep_len); 454 self.send_buf.truncate(keep_len); 455 456 self.retired += new_retirable; 457 } 458 mark_as_lost(&mut self, offset: u64, len: usize)459 pub fn mark_as_lost(&mut self, offset: u64, len: usize) { 460 self.ranges.unmark_range(offset, len) 461 } 462 463 /// Forget about anything that was marked as sent. unmark_sent(&mut self)464 pub fn unmark_sent(&mut self) { 465 self.ranges.unmark_sent(); 466 } 467 buffered(&self) -> usize468 fn buffered(&self) -> usize { 469 self.send_buf.len() 470 } 471 avail(&self) -> usize472 fn avail(&self) -> usize { 473 SEND_BUFFER_SIZE - self.buffered() 474 } 475 used(&self) -> u64476 fn used(&self) -> u64 { 477 self.retired + u64::try_from(self.buffered()).unwrap() 478 } 479 } 480 481 /// QUIC sending stream states, based on -transport 3.1. 
482 #[derive(Debug)] 483 pub(crate) enum SendStreamState { 484 Ready { 485 fc: SenderFlowControl<StreamId>, 486 conn_fc: Rc<RefCell<SenderFlowControl<()>>>, 487 }, 488 Send { 489 fc: SenderFlowControl<StreamId>, 490 conn_fc: Rc<RefCell<SenderFlowControl<()>>>, 491 send_buf: TxBuffer, 492 }, 493 // Note: `DataSent` is entered when the stream is closed, not when all data has been 494 // sent for the first time. 495 DataSent { 496 send_buf: TxBuffer, 497 fin_sent: bool, 498 fin_acked: bool, 499 }, 500 DataRecvd, 501 ResetSent { 502 err: AppError, 503 final_size: u64, 504 priority: Option<TransmissionPriority>, 505 }, 506 ResetRecvd, 507 } 508 509 impl SendStreamState { tx_buf_mut(&mut self) -> Option<&mut TxBuffer>510 fn tx_buf_mut(&mut self) -> Option<&mut TxBuffer> { 511 match self { 512 Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => Some(send_buf), 513 Self::Ready { .. } 514 | Self::DataRecvd { .. } 515 | Self::ResetSent { .. } 516 | Self::ResetRecvd => None, 517 } 518 } 519 tx_avail(&self) -> usize520 fn tx_avail(&self) -> usize { 521 match self { 522 // In Ready, TxBuffer not yet allocated but size is known 523 Self::Ready { .. } => SEND_BUFFER_SIZE, 524 Self::Send { send_buf, .. } | Self::DataSent { send_buf, .. } => send_buf.avail(), 525 Self::DataRecvd { .. } | Self::ResetSent { .. } | Self::ResetRecvd => 0, 526 } 527 } 528 name(&self) -> &str529 fn name(&self) -> &str { 530 match self { 531 Self::Ready { .. } => "Ready", 532 Self::Send { .. } => "Send", 533 Self::DataSent { .. } => "DataSent", 534 Self::DataRecvd { .. } => "DataRecvd", 535 Self::ResetSent { .. } => "ResetSent", 536 Self::ResetRecvd => "ResetRecvd", 537 } 538 } 539 transition(&mut self, new_state: Self)540 fn transition(&mut self, new_state: Self) { 541 qtrace!("SendStream state {} -> {}", self.name(), new_state.name()); 542 *self = new_state; 543 } 544 } 545 546 /// Implement a QUIC send stream. 
/// Implement a QUIC send stream.
#[derive(Debug)]
pub struct SendStream {
    stream_id: StreamId,
    state: SendStreamState,
    conn_events: ConnectionEvents,
    // Priority for first transmissions of stream data.
    priority: TransmissionPriority,
    // Policy that, combined with `priority`, gives the retransmission priority.
    retransmission_priority: RetransmissionPriority,
    // Highest end offset of any data that has been reported lost.
    retransmission_offset: u64,
}

impl SendStream {
    /// Create a stream in the `Ready` state with a stream-level flow control
    /// limit of `max_stream_data`, sharing the connection-level flow control
    /// `conn_fc`. A writable event is raised right away if any data can be sent.
    pub fn new(
        stream_id: StreamId,
        max_stream_data: u64,
        conn_fc: Rc<RefCell<SenderFlowControl<()>>>,
        conn_events: ConnectionEvents,
    ) -> Self {
        let state = SendStreamState::Ready {
            fc: SenderFlowControl::new(stream_id, max_stream_data),
            conn_fc,
        };
        let stream = Self {
            stream_id,
            state,
            conn_events,
            priority: TransmissionPriority::default(),
            retransmission_priority: RetransmissionPriority::default(),
            retransmission_offset: 0,
        };
        if stream.avail() > 0 {
            stream.conn_events.send_stream_writable(stream_id);
        }
        stream
    }

    /// Set the priority for new data and the policy for retransmitted data.
    pub fn set_priority(
        &mut self,
        transmission: TransmissionPriority,
        retransmission: RetransmissionPriority,
    ) {
        self.retransmission_priority = retransmission;
        self.priority = transmission;
    }

    /// If all data has been buffered or written, how much was sent.
    ///
    /// This is only known in the `DataSent` and `ResetSent` states.
    pub fn final_size(&self) -> Option<u64> {
        match &self.state {
            SendStreamState::ResetSent { final_size, .. } => Some(*final_size),
            SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()),
            _ => None,
        }
    }

next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])>602 fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> { 603 match self.state { 604 SendStreamState::Send { ref send_buf, .. } => { 605 send_buf.next_bytes().and_then(|(offset, slice)| { 606 if retransmission_only { 607 qtrace!( 608 [self], 609 "next_bytes apply retransmission limit at {}", 610 self.retransmission_offset 611 ); 612 if self.retransmission_offset > offset { 613 let len = min( 614 usize::try_from(self.retransmission_offset - offset).unwrap(), 615 slice.len(), 616 ); 617 Some((offset, &slice[..len])) 618 } else { 619 None 620 } 621 } else { 622 Some((offset, slice)) 623 } 624 }) 625 } 626 SendStreamState::DataSent { 627 ref send_buf, 628 fin_sent, 629 .. 630 } => { 631 let bytes = send_buf.next_bytes(); 632 if bytes.is_some() { 633 bytes 634 } else if fin_sent { 635 None 636 } else { 637 // Send empty stream frame with fin set 638 Some((send_buf.used(), &[])) 639 } 640 } 641 SendStreamState::Ready { .. } 642 | SendStreamState::DataRecvd { .. } 643 | SendStreamState::ResetSent { .. } 644 | SendStreamState::ResetRecvd => None, 645 } 646 } 647 648 /// Calculate how many bytes (length) can fit into available space and whether 649 /// the remainder of the space can be filled (or if a length field is needed). length_and_fill(data_len: usize, space: usize) -> (usize, bool)650 fn length_and_fill(data_len: usize, space: usize) -> (usize, bool) { 651 if data_len >= space { 652 // Either more data than space allows, or an exact fit. 653 qtrace!("SendStream::length_and_fill fill {}", space); 654 return (space, true); 655 } 656 657 // Estimate size of the length field based on the available space, 658 // less 1, which is the worst case. 
659 let length = min(space.saturating_sub(1), data_len); 660 let length_len = Encoder::varint_len(u64::try_from(length).unwrap()); 661 if length_len > space { 662 qtrace!( 663 "SendStream::length_and_fill no room for length of {} in {}", 664 length, 665 space 666 ); 667 return (0, false); 668 } 669 670 let length = min(data_len, space - length_len); 671 qtrace!("SendStream::length_and_fill {} in {}", length, space); 672 (length, false) 673 } 674 675 /// Maybe write a `STREAM` frame. write_stream_frame( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )676 fn write_stream_frame( 677 &mut self, 678 priority: TransmissionPriority, 679 builder: &mut PacketBuilder, 680 tokens: &mut Vec<RecoveryToken>, 681 stats: &mut FrameStats, 682 ) { 683 let retransmission = if priority == self.priority { 684 false 685 } else if priority == self.priority + self.retransmission_priority { 686 true 687 } else { 688 return; 689 }; 690 691 let id = self.stream_id; 692 let final_size = self.final_size(); 693 if let Some((offset, data)) = self.next_bytes(retransmission) { 694 let overhead = 1 // Frame type 695 + Encoder::varint_len(id.as_u64()) 696 + if offset > 0 { 697 Encoder::varint_len(offset) 698 } else { 699 0 700 }; 701 if overhead > builder.remaining() { 702 qtrace!([self], "write_frame no space for header"); 703 return; 704 } 705 706 let (length, fill) = Self::length_and_fill(data.len(), builder.remaining() - overhead); 707 let fin = final_size.map_or(false, |fs| fs == offset + u64::try_from(length).unwrap()); 708 if length == 0 && !fin { 709 qtrace!([self], "write_frame no data, no fin"); 710 return; 711 } 712 713 // Write the stream out. 
714 builder.encode_varint(Frame::stream_type(fin, offset > 0, fill)); 715 builder.encode_varint(id.as_u64()); 716 if offset > 0 { 717 builder.encode_varint(offset); 718 } 719 if fill { 720 builder.encode(&data[..length]); 721 } else { 722 builder.encode_vvec(&data[..length]); 723 } 724 debug_assert!(builder.len() <= builder.limit()); 725 726 self.mark_as_sent(offset, length, fin); 727 tokens.push(RecoveryToken::Stream(StreamRecoveryToken { 728 id, 729 offset, 730 length, 731 fin, 732 })); 733 stats.stream += 1; 734 } 735 } 736 reset_acked(&mut self)737 pub fn reset_acked(&mut self) { 738 match self.state { 739 SendStreamState::Ready { .. } 740 | SendStreamState::Send { .. } 741 | SendStreamState::DataSent { .. } 742 | SendStreamState::DataRecvd { .. } => { 743 qtrace!([self], "Reset acked while in {} state?", self.state.name()) 744 } 745 SendStreamState::ResetSent { .. } => self.state.transition(SendStreamState::ResetRecvd), 746 SendStreamState::ResetRecvd => qtrace!([self], "already in ResetRecvd state"), 747 }; 748 } 749 reset_lost(&mut self)750 pub fn reset_lost(&mut self) { 751 match self.state { 752 SendStreamState::ResetSent { 753 ref mut priority, .. 754 } => { 755 *priority = Some(self.priority + self.retransmission_priority); 756 } 757 SendStreamState::ResetRecvd => (), 758 _ => unreachable!(), 759 } 760 } 761 762 /// Maybe write a `RESET_STREAM` frame. 
write_reset_frame( &mut self, p: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, ) -> bool763 pub fn write_reset_frame( 764 &mut self, 765 p: TransmissionPriority, 766 builder: &mut PacketBuilder, 767 tokens: &mut Vec<RecoveryToken>, 768 stats: &mut FrameStats, 769 ) -> bool { 770 if let SendStreamState::ResetSent { 771 final_size, 772 err, 773 ref mut priority, 774 } = self.state 775 { 776 if *priority != Some(p) { 777 return false; 778 } 779 if builder.write_varint_frame(&[ 780 FRAME_TYPE_RESET_STREAM, 781 self.stream_id.as_u64(), 782 err, 783 final_size, 784 ]) { 785 tokens.push(RecoveryToken::ResetStream { 786 stream_id: self.stream_id, 787 }); 788 stats.reset_stream += 1; 789 *priority = None; 790 true 791 } else { 792 false 793 } 794 } else { 795 false 796 } 797 } 798 blocked_lost(&mut self, limit: u64)799 pub fn blocked_lost(&mut self, limit: u64) { 800 if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } = 801 &mut self.state 802 { 803 fc.frame_lost(limit); 804 } else { 805 qtrace!([self], "Ignoring lost STREAM_DATA_BLOCKED({})", limit); 806 } 807 } 808 809 /// Maybe write a `STREAM_DATA_BLOCKED` frame. write_blocked_frame( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )810 pub fn write_blocked_frame( 811 &mut self, 812 priority: TransmissionPriority, 813 builder: &mut PacketBuilder, 814 tokens: &mut Vec<RecoveryToken>, 815 stats: &mut FrameStats, 816 ) { 817 // Send STREAM_DATA_BLOCKED at normal priority always. 818 if priority == self.priority { 819 if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. 
} = 820 &mut self.state 821 { 822 fc.write_frames(builder, tokens, stats); 823 } 824 } 825 } 826 mark_as_sent(&mut self, offset: u64, len: usize, fin: bool)827 pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) { 828 if let Some(buf) = self.state.tx_buf_mut() { 829 buf.mark_as_sent(offset, len); 830 self.send_blocked_if_space_needed(0); 831 }; 832 833 if fin { 834 if let SendStreamState::DataSent { fin_sent, .. } = &mut self.state { 835 *fin_sent = true; 836 } 837 } 838 } 839 mark_as_acked(&mut self, offset: u64, len: usize, fin: bool)840 pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) { 841 match self.state { 842 SendStreamState::Send { 843 ref mut send_buf, .. 844 } => { 845 send_buf.mark_as_acked(offset, len); 846 if self.avail() > 0 { 847 self.conn_events.send_stream_writable(self.stream_id) 848 } 849 } 850 SendStreamState::DataSent { 851 ref mut send_buf, 852 ref mut fin_acked, 853 .. 854 } => { 855 send_buf.mark_as_acked(offset, len); 856 if fin { 857 *fin_acked = true; 858 } 859 if *fin_acked && send_buf.buffered() == 0 { 860 self.conn_events.send_stream_complete(self.stream_id); 861 self.state.transition(SendStreamState::DataRecvd); 862 } 863 } 864 _ => qtrace!( 865 [self], 866 "mark_as_acked called from state {}", 867 self.state.name() 868 ), 869 } 870 } 871 mark_as_lost(&mut self, offset: u64, len: usize, fin: bool)872 pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) { 873 self.retransmission_offset = max( 874 self.retransmission_offset, 875 offset + u64::try_from(len).unwrap(), 876 ); 877 qtrace!( 878 [self], 879 "mark_as_lost retransmission offset={}", 880 self.retransmission_offset 881 ); 882 if let Some(buf) = self.state.tx_buf_mut() { 883 buf.mark_as_lost(offset, len); 884 } 885 886 if fin { 887 if let SendStreamState::DataSent { 888 fin_sent, 889 fin_acked, 890 .. 891 } = &mut self.state 892 { 893 *fin_sent = *fin_acked; 894 } 895 } 896 } 897 898 /// Bytes sendable on stream. 
// `impl SendStream` (continued): application-facing operations.

    /// Bytes sendable on stream. Constrained by stream credit available,
    /// connection credit available, and space in the tx buffer.
    ///
    /// Always 0 outside the `Ready` and `Send` states.
    pub fn avail(&self) -> usize {
        match &self.state {
            SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } => {
                fc.available()
                    .min(conn_fc.borrow().available())
                    .min(self.state.tx_avail())
            }
            _ => 0,
        }
    }

    /// Apply a new stream-level flow control limit. A writable event is raised
    /// if the stream had no stream credit before and can send afterwards.
    pub fn set_max_stream_data(&mut self, limit: u64) {
        if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
            &mut self.state
        {
            let stream_was_blocked = fc.available() == 0;
            fc.update(limit);
            if stream_was_blocked && self.avail() > 0 {
                self.conn_events.send_stream_writable(self.stream_id)
            }
        }
    }

    /// Whether the stream has reached `DataRecvd` or `ResetRecvd`.
    pub fn is_terminal(&self) -> bool {
        match self.state {
            SendStreamState::DataRecvd | SendStreamState::ResetRecvd => true,
            _ => false,
        }
    }

    /// Queue as much of `buf` as currently fits; returns the number of bytes taken.
    pub fn send(&mut self, buf: &[u8]) -> Res<usize> {
        self.send_internal(buf, false)
    }

    /// Queue all of `buf` or none of it; returns the number of bytes taken.
    pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> {
        self.send_internal(buf, true)
    }

    /// Mark stream and/or connection flow control as blocked when the credit
    /// each has available is no more than `needed_space`.
    fn send_blocked_if_space_needed(&mut self, needed_space: usize) {
        if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
            &mut self.state
        {
            let stream_short = fc.available() <= needed_space;
            if stream_short {
                fc.blocked();
            }

            let conn_short = conn_fc.borrow().available() <= needed_space;
            if conn_short {
                conn_fc.borrow_mut().blocked();
            }
        }
    }

    /// Shared implementation of `send` and `send_atomic`.
    ///
    /// Returns `Error::InvalidInput` for an empty `buf` and
    /// `Error::FinalSizeError` if the stream can no longer accept data.
    /// With `atomic` set, nothing is queued (and `Ok(0)` is returned) unless
    /// all of `buf` fits.
    fn send_internal(&mut self, buf: &[u8], atomic: bool) -> Res<usize> {
        if buf.is_empty() {
            qerror!([self], "zero-length send on stream");
            return Err(Error::InvalidInput);
        }

        // The first write moves the stream from Ready to Send, which is when
        // the transmit buffer is created.
        if let SendStreamState::Ready { fc, conn_fc } = &mut self.state {
            let owned_fc = mem::replace(fc, SenderFlowControl::new(self.stream_id, 0));
            let owned_conn_fc = Rc::clone(conn_fc);
            self.state.transition(SendStreamState::Send {
                fc: owned_fc,
                conn_fc: owned_conn_fc,
                send_buf: TxBuffer::new(),
            });
        }

        match self.state {
            SendStreamState::Send { .. } => {}
            _ => return Err(Error::FinalSizeError),
        }

        let avail = self.avail();
        if avail == 0 {
            return Ok(0);
        }
        let buf = if avail >= buf.len() {
            buf
        } else if atomic {
            self.send_blocked_if_space_needed(buf.len());
            return Ok(0);
        } else {
            &buf[..avail]
        };

        match &mut self.state {
            SendStreamState::Send {
                fc,
                conn_fc,
                send_buf,
            } => {
                let sent = send_buf.send(buf);
                fc.consume(sent);
                conn_fc.borrow_mut().consume(sent);
                Ok(sent)
            }
            SendStreamState::Ready { .. } => unreachable!(),
            _ => Err(Error::FinalSizeError),
        }
    }

    /// Close the stream for writing: `Ready` and `Send` move to `DataSent`
    /// (keeping any buffered data); other states are left alone.
    pub fn close(&mut self) {
        let send_buf = match &mut self.state {
            SendStreamState::Ready { .. } => TxBuffer::new(),
            SendStreamState::Send { send_buf, .. } => mem::replace(send_buf, TxBuffer::new()),
            SendStreamState::DataSent { .. } => {
                qtrace!([self], "already in DataSent state");
                return;
            }
            SendStreamState::DataRecvd => {
                qtrace!([self], "already in DataRecvd state");
                return;
            }
            SendStreamState::ResetSent { .. } => {
                qtrace!([self], "already in ResetSent state");
                return;
            }
            SendStreamState::ResetRecvd => {
                qtrace!([self], "already in ResetRecvd state");
                return;
            }
        };

        self.state.transition(SendStreamState::DataSent {
            send_buf,
            fin_sent: false,
            fin_acked: false,
        });
    }

    /// Abandon the stream with application error `err`: `Ready`, `Send` and
    /// `DataSent` move to `ResetSent` with a `RESET_STREAM` frame pending at
    /// the stream's transmission priority; other states are left alone.
    pub fn reset(&mut self, err: AppError) {
        let final_size = match &self.state {
            SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } => fc.used(),
            SendStreamState::DataSent { send_buf, .. } => send_buf.used(),
            SendStreamState::DataRecvd => {
                qtrace!([self], "already in DataRecvd state");
                return;
            }
            SendStreamState::ResetSent { .. } => {
                qtrace!([self], "already in ResetSent state");
                return;
            }
            SendStreamState::ResetRecvd => {
                qtrace!([self], "already in ResetRecvd state");
                return;
            }
        };

        self.state.transition(SendStreamState::ResetSent {
            err,
            final_size,
            priority: Some(self.priority),
        });
    }

    /// Test-only access to the stream state.
    #[cfg(test)]
    pub(crate) fn state(&mut self) -> &mut SendStreamState {
        &mut self.state
    }
}

impl ::std::fmt::Display for SendStream {
    /// Formats as `SendStream <stream id>`.
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "SendStream {}", self.stream_id)
    }
}

/// The send streams of a connection, keyed by stream ID.
#[derive(Debug, Default)]
pub(crate) struct SendStreams(IndexMap<StreamId, SendStream>);

impl SendStreams {
    /// Look up a stream; `Error::InvalidStreamId` if it is not present.
    pub fn get(&self, id: StreamId) -> Res<&SendStream> {
        match self.0.get(&id) {
            Some(stream) => Ok(stream),
            None => Err(Error::InvalidStreamId),
        }
    }

    /// Look up a stream for modification; `Error::InvalidStreamId` if it is not present.
    pub fn get_mut(&mut self, id: StreamId) -> Res<&mut SendStream> {
        match self.0.get_mut(&id) {
            Some(stream) => Ok(stream),
            None => Err(Error::InvalidStreamId),
        }
    }

    /// Whether a stream with this ID is present.
    pub fn exists(&self, id: StreamId) -> bool {
        self.0.contains_key(&id)
    }

    /// Add a stream under `id`.
    pub fn insert(&mut self, id: StreamId, stream: SendStream) {
        self.0.insert(id, stream);
    }

    /// Pass an acknowledged `STREAM` frame to its stream, if the stream still exists.
    pub fn acked(&mut self, token: &StreamRecoveryToken) {
        if let Some(stream) = self.0.get_mut(&token.id) {
            stream.mark_as_acked(token.offset, token.length, token.fin);
        }
    }

    /// Pass an acknowledged `RESET_STREAM` frame to its stream, if the stream still exists.
    pub fn reset_acked(&mut self, id: StreamId) {
        if let Some(stream) = self.0.get_mut(&id) {
            stream.reset_acked()
        }
    }

    /// Pass a lost `STREAM` frame to its stream, if the stream still exists.
    pub fn lost(&mut self, token: &StreamRecoveryToken) {
        if let Some(stream) = self.0.get_mut(&token.id) {
            stream.mark_as_lost(token.offset, token.length, token.fin);
        }
    }

reset_lost(&mut self, stream_id: StreamId)1101 pub fn reset_lost(&mut self, stream_id: StreamId) { 1102 if let Some(ss) = self.0.get_mut(&stream_id) { 1103 ss.reset_lost(); 1104 } 1105 } 1106 blocked_lost(&mut self, stream_id: StreamId, limit: u64)1107 pub fn blocked_lost(&mut self, stream_id: StreamId, limit: u64) { 1108 if let Some(ss) = self.0.get_mut(&stream_id) { 1109 ss.blocked_lost(limit); 1110 } 1111 } 1112 clear(&mut self)1113 pub fn clear(&mut self) { 1114 self.0.clear() 1115 } 1116 clear_terminal(&mut self)1117 pub fn clear_terminal(&mut self) { 1118 self.0.retain(|_, stream| !stream.is_terminal()) 1119 } 1120 write_frames( &mut self, priority: TransmissionPriority, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )1121 pub(crate) fn write_frames( 1122 &mut self, 1123 priority: TransmissionPriority, 1124 builder: &mut PacketBuilder, 1125 tokens: &mut Vec<RecoveryToken>, 1126 stats: &mut FrameStats, 1127 ) { 1128 qtrace!("write STREAM frames at priority {:?}", priority); 1129 for stream in self.0.values_mut() { 1130 if !stream.write_reset_frame(priority, builder, tokens, stats) { 1131 stream.write_blocked_frame(priority, builder, tokens, stats); 1132 stream.write_stream_frame(priority, builder, tokens, stats); 1133 } 1134 } 1135 } 1136 update_initial_limit(&mut self, remote: &TransportParameters)1137 pub fn update_initial_limit(&mut self, remote: &TransportParameters) { 1138 for (id, ss) in self.0.iter_mut() { 1139 let limit = if id.is_bidi() { 1140 assert!(!id.is_remote_initiated(Role::Client)); 1141 remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE) 1142 } else { 1143 remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_UNI) 1144 }; 1145 ss.set_max_stream_data(limit); 1146 } 1147 } 1148 } 1149 1150 impl<'a> IntoIterator for &'a mut SendStreams { 1151 type Item = (&'a StreamId, &'a mut SendStream); 1152 type IntoIter = indexmap::map::IterMut<'a, StreamId, SendStream>; 1153 into_iter(self) -> 
/// Describes one range of stream data so that it can later be reported back
/// as acknowledged or lost.
///
/// `SendStreams::acked` and `SendStreams::lost` look the stream up by `id`
/// and pass `offset`, `length` and `fin` on to it.
#[derive(Debug, Clone)]
pub struct StreamRecoveryToken {
    // The stream that the range belongs to.
    pub(crate) id: StreamId,
    // Stream offset at which the range starts.
    offset: u64,
    // Number of bytes in the range; the tests show 0 for a frame that
    // carries only the fin.
    length: usize,
    // Whether the range was marked with fin.
    fin: bool,
}
/// Checks how `VecDeque` behaves when an element pushed at the front is
/// rotated to the back and then truncated away.
#[test]
fn truncate_front() {
    // Three pushes at the back followed by one at the front: [4, 5, 6, 7].
    let mut queue = VecDeque::new();
    for value in 5..=7 {
        queue.push_back(value);
    }
    queue.push_front(4usize);

    // Rotating left by one gives [5, 6, 7, 4]; truncating drops the 4.
    queue.rotate_left(1);
    queue.truncate(3);

    assert_eq!(queue.front(), Some(&5));
    assert_eq!(queue.back(), Some(&7));
}
/// `unmark_range` must clear the part of a `Sent` range that it covers and
/// leave an overlapping `Acked` range alone.
#[test]
fn test_unmark_range() {
    let mut tracker = RangeTracker::default();

    tracker.mark_range(5, 5, RangeState::Acked);
    tracker.mark_range(10, 5, RangeState::Sent);

    // Covers the end of the acked range and the start of the sent range;
    // only the sent part may be unmarked.
    tracker.unmark_range(7, 6);

    assert_eq!(tracker.first_unmarked_range(), (0, Some(5)));
    {
        // Exactly two entries remain: the intact acked range and the
        // shortened sent range.
        let mut entries = tracker.used.iter();
        assert_eq!(entries.next().unwrap(), (&5, &(5, RangeState::Acked)));
        assert_eq!(entries.next().unwrap(), (&13, &(2, RangeState::Sent)));
        assert!(entries.next().is_none());
    }

    // Fill the hole at the start; the next hole is the unmarked 10..13.
    tracker.mark_range(0, 5, RangeState::Sent);
    assert_eq!(tracker.first_unmarked_range(), (10, Some(3)));

    // Fill that one too; everything up to 15 is now marked.
    tracker.mark_range(10, 3, RangeState::Sent);
    assert_eq!(tracker.first_unmarked_range(), (15, None));
}
Get it again 1337 txb.mark_as_lost(one_byte_from_end, 1); 1338 assert!(matches!(txb.next_bytes(), 1339 Some((start, x)) if x.len() == 1 1340 && start == one_byte_from_end 1341 && x.iter().all(|ch| *ch == 1))); 1342 1343 // Mark a larger range lost, including beyond what's in the buffer even. 1344 // Get a little more 1345 let five_bytes_from_end = SEND_BUFFER_SIZE as u64 - 5; 1346 txb.mark_as_lost(five_bytes_from_end, 100); 1347 assert!(matches!(txb.next_bytes(), 1348 Some((start, x)) if x.len() == 5 1349 && start == five_bytes_from_end 1350 && x.iter().all(|ch| *ch == 1))); 1351 1352 // Contig acked range at start means it can be removed from buffer 1353 // Impl of vecdeque should now result in a split buffer when more data 1354 // is sent 1355 txb.mark_as_acked(0, five_bytes_from_end as usize); 1356 assert_eq!(txb.send(&[2; 30]), 30); 1357 // Just get 5 even though there is more 1358 assert!(matches!(txb.next_bytes(), 1359 Some((start, x)) if x.len() == 5 1360 && start == five_bytes_from_end 1361 && x.iter().all(|ch| *ch == 1))); 1362 assert_eq!(txb.retired, five_bytes_from_end); 1363 assert_eq!(txb.buffered(), 35); 1364 1365 // Marking that bit as sent should let the last contig bit be returned 1366 // when called again 1367 txb.mark_as_sent(five_bytes_from_end, 5); 1368 assert!(matches!(txb.next_bytes(), 1369 Some((start, x)) if x.len() == 30 1370 && start == SEND_BUFFER_SIZE as u64 1371 && x.iter().all(|ch| *ch == 2))); 1372 } 1373 1374 #[test] tx_buffer_next_bytes_2()1375 fn tx_buffer_next_bytes_2() { 1376 let mut txb = TxBuffer::new(); 1377 1378 assert_eq!(txb.avail(), SEND_BUFFER_SIZE); 1379 1380 // Fill the buffer 1381 assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); 1382 assert!(matches!(txb.next_bytes(), 1383 Some((0, x)) if x.len()==SEND_BUFFER_SIZE 1384 && x.iter().all(|ch| *ch == 1))); 1385 1386 // As above 1387 let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40; 1388 1389 txb.mark_as_acked(0, forty_bytes_from_end as usize); 
1390 assert!(matches!(txb.next_bytes(), 1391 Some((start, x)) if x.len() == 40 1392 && start == forty_bytes_from_end 1393 )); 1394 1395 // Valid new data placed in split locations 1396 assert_eq!(txb.send(&[2; 100]), 100); 1397 1398 // Mark a little more as sent 1399 txb.mark_as_sent(forty_bytes_from_end, 10); 1400 let thirty_bytes_from_end = forty_bytes_from_end + 10; 1401 assert!(matches!(txb.next_bytes(), 1402 Some((start, x)) if x.len() == 30 1403 && start == thirty_bytes_from_end 1404 && x.iter().all(|ch| *ch == 1))); 1405 1406 // Mark a range 'A' in second slice as sent. Should still return the same 1407 let range_a_start = SEND_BUFFER_SIZE as u64 + 30; 1408 let range_a_end = range_a_start + 10; 1409 txb.mark_as_sent(range_a_start, 10); 1410 assert!(matches!(txb.next_bytes(), 1411 Some((start, x)) if x.len() == 30 1412 && start == thirty_bytes_from_end 1413 && x.iter().all(|ch| *ch == 1))); 1414 1415 // Ack entire first slice and into second slice 1416 let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10; 1417 txb.mark_as_acked(0, ten_bytes_past_end as usize); 1418 1419 // Get up to marked range A 1420 assert!(matches!(txb.next_bytes(), 1421 Some((start, x)) if x.len() == 20 1422 && start == ten_bytes_past_end 1423 && x.iter().all(|ch| *ch == 2))); 1424 1425 txb.mark_as_sent(ten_bytes_past_end, 20); 1426 1427 // Get bit after earlier marked range A 1428 assert!(matches!(txb.next_bytes(), 1429 Some((start, x)) if x.len() == 60 1430 && start == range_a_end 1431 && x.iter().all(|ch| *ch == 2))); 1432 1433 // No more bytes. 
/// Walks a stream through each limit on `send` in turn: stream flow
/// control, connection flow control, and finally the send buffer size.
#[test]
fn test_stream_tx() {
    let conn_fc = connection_fc(4096);
    let mut s = SendStream::new(
        4.into(),
        1024,
        Rc::clone(&conn_fc),
        ConnectionEvents::default(),
    );

    assert_eq!(s.send(&[4; 100]).unwrap(), 100);
    s.mark_as_sent(0, 50, false);
    match s.state() {
        SendStreamState::Send { fc, .. } => assert_eq!(fc.used(), 100),
        _ => panic!("unexpected stream state"),
    }

    // Should hit stream flow control limit before filling up send buffer
    assert_eq!(s.send(&[4; SEND_BUFFER_SIZE]).unwrap(), 1024 - 100);

    // should do nothing, max stream data already 1024
    s.set_max_stream_data(1024);
    assert_eq!(s.send(&[4; SEND_BUFFER_SIZE]).unwrap(), 0);

    // should now hit the conn flow control (4096)
    s.set_max_stream_data(1_048_576);
    assert_eq!(s.send(&[4; SEND_BUFFER_SIZE]).unwrap(), 3072);

    // should now hit the tx buffer size
    conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64);
    assert_eq!(
        s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap(),
        SEND_BUFFER_SIZE - 4096
    );

    // TODO(agrover@mozilla.com): test ooo acks somehow
    s.mark_as_acked(0, 40, false);
}

/// Once everything is marked as sent, and later as acked, the buffer has
/// nothing further to offer.
#[test]
fn test_tx_buffer_acks() {
    let mut tx = TxBuffer::new();
    assert_eq!(tx.send(&[4; 100]), 100);

    let (offset, data) = tx.next_bytes().unwrap();
    assert_eq!(offset, 0);
    assert_eq!(data.len(), 100);

    tx.mark_as_sent(0, 100);
    assert_eq!(tx.next_bytes(), None);

    tx.mark_as_acked(0, 100);
    assert_eq!(tx.next_bytes(), None);
}
= connection_fc(2); 1496 let mut conn_events = ConnectionEvents::default(); 1497 1498 let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone()); 1499 1500 // Stream is initially blocked (conn:2, stream:0) 1501 // and will not accept data. 1502 assert_eq!(s.send(b"hi").unwrap(), 0); 1503 1504 // increasing to (conn:2, stream:2) will allow 2 bytes, and also 1505 // generate a SendStreamWritable event. 1506 s.set_max_stream_data(2); 1507 let evts = conn_events.events().collect::<Vec<_>>(); 1508 assert_eq!(evts.len(), 1); 1509 assert!(matches!( 1510 evts[0], 1511 ConnectionEvent::SendStreamWritable { .. } 1512 )); 1513 assert_eq!(s.send(b"hello").unwrap(), 2); 1514 1515 // increasing to (conn:2, stream:4) will not generate an event or allow 1516 // sending anything. 1517 s.set_max_stream_data(4); 1518 let evts = conn_events.events().collect::<Vec<_>>(); 1519 assert_eq!(evts.len(), 0); 1520 assert_eq!(s.send(b"hello").unwrap(), 0); 1521 1522 // Increasing conn max (conn:4, stream:4) will unblock but not emit 1523 // event b/c that happens in Connection::emit_frame() (tested in 1524 // connection.rs) 1525 assert!(conn_fc.borrow_mut().update(4)); 1526 let evts = conn_events.events().collect::<Vec<_>>(); 1527 assert_eq!(evts.len(), 0); 1528 assert_eq!(s.avail(), 2); 1529 assert_eq!(s.send(b"hello").unwrap(), 2); 1530 1531 // No event because still blocked by conn 1532 s.set_max_stream_data(1_000_000_000); 1533 let evts = conn_events.events().collect::<Vec<_>>(); 1534 assert_eq!(evts.len(), 0); 1535 1536 // No event because happens in emit_frame() 1537 conn_fc.borrow_mut().update(1_000_000_000); 1538 let evts = conn_events.events().collect::<Vec<_>>(); 1539 assert_eq!(evts.len(), 0); 1540 1541 // Unblocking both by a large amount will cause avail() to be limited by 1542 // tx buffer size. 
/// Creating a stream that has both connection and stream credit must queue
/// exactly one `SendStreamWritable` event.
#[test]
fn send_stream_writable_event_new_stream() {
    let mut conn_events = ConnectionEvents::default();

    // The stream is kept alive until the end of the test but not used.
    let _s = SendStream::new(4.into(), 100, connection_fc(2), conn_events.clone());

    let queued: Vec<_> = conn_events.events().collect();
    assert!(matches!(
        queued.as_slice(),
        [ConnectionEvent::SendStreamWritable { .. }]
    ));
}
1605 let written = builder.len(); 1606 builder.set_limit(written + 200); 1607 ss.write_frames( 1608 TransmissionPriority::default(), 1609 &mut builder, 1610 &mut tokens, 1611 &mut FrameStats::default(), 1612 ); 1613 assert_eq!(builder.len(), written + 10); 1614 assert_eq!(tokens.len(), 1); 1615 let f2_token = tokens.remove(0); 1616 assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.fin)); 1617 1618 // Should be no more data to frame. 1619 let written = builder.len(); 1620 ss.write_frames( 1621 TransmissionPriority::default(), 1622 &mut builder, 1623 &mut tokens, 1624 &mut FrameStats::default(), 1625 ); 1626 assert_eq!(builder.len(), written); 1627 assert!(tokens.is_empty()); 1628 1629 // Mark frame 1 as lost 1630 if let RecoveryToken::Stream(rt) = f1_token { 1631 ss.lost(&rt); 1632 } else { 1633 panic!(); 1634 } 1635 1636 // Next frame should not set fin even though stream has fin but frame 1637 // does not include end of stream 1638 let written = builder.len(); 1639 ss.write_frames( 1640 TransmissionPriority::default() + RetransmissionPriority::default(), 1641 &mut builder, 1642 &mut tokens, 1643 &mut FrameStats::default(), 1644 ); 1645 assert_eq!(builder.len(), written + 7); // Needs a length this time. 
1646 assert_eq!(tokens.len(), 1); 1647 let f4_token = tokens.remove(0); 1648 assert!(matches!(&f4_token, RecoveryToken::Stream(x) if !x.fin)); 1649 1650 // Mark frame 2 as lost 1651 if let RecoveryToken::Stream(rt) = f2_token { 1652 ss.lost(&rt); 1653 } else { 1654 panic!(); 1655 } 1656 1657 // Next frame should set fin because it includes end of stream 1658 let written = builder.len(); 1659 ss.write_frames( 1660 TransmissionPriority::default() + RetransmissionPriority::default(), 1661 &mut builder, 1662 &mut tokens, 1663 &mut FrameStats::default(), 1664 ); 1665 assert_eq!(builder.len(), written + 10); 1666 assert_eq!(tokens.len(), 1); 1667 let f5_token = tokens.remove(0); 1668 assert!(matches!(&f5_token, RecoveryToken::Stream(x) if x.fin)); 1669 } 1670 1671 #[test] 1672 #[allow(clippy::cognitive_complexity)] 1673 // Verify lost frames handle fin properly with zero length fin send_stream_get_frame_zerolength_fin()1674 fn send_stream_get_frame_zerolength_fin() { 1675 let conn_fc = connection_fc(100); 1676 let conn_events = ConnectionEvents::default(); 1677 1678 let mut s = SendStream::new(0.into(), 100, conn_fc, conn_events); 1679 s.send(&[0; 10]).unwrap(); 1680 1681 let mut ss = SendStreams::default(); 1682 ss.insert(StreamId::from(0), s); 1683 1684 let mut tokens = Vec::new(); 1685 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 1686 ss.write_frames( 1687 TransmissionPriority::default(), 1688 &mut builder, 1689 &mut tokens, 1690 &mut FrameStats::default(), 1691 ); 1692 let f1_token = tokens.remove(0); 1693 assert!(matches!(&f1_token, RecoveryToken::Stream(x) if x.offset == 0)); 1694 assert!(matches!(&f1_token, RecoveryToken::Stream(x) if x.length == 10)); 1695 assert!(matches!(&f1_token, RecoveryToken::Stream(x) if !x.fin)); 1696 1697 // Should be no more data to frame 1698 ss.write_frames( 1699 TransmissionPriority::default(), 1700 &mut builder, 1701 &mut tokens, 1702 &mut FrameStats::default(), 1703 ); 1704 assert!(tokens.is_empty()); 1705 
1706 ss.get_mut(StreamId::from(0)).unwrap().close(); 1707 1708 ss.write_frames( 1709 TransmissionPriority::default(), 1710 &mut builder, 1711 &mut tokens, 1712 &mut FrameStats::default(), 1713 ); 1714 let f2_token = tokens.remove(0); 1715 assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.offset == 10)); 1716 assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.length == 0)); 1717 assert!(matches!(&f2_token, RecoveryToken::Stream(x) if x.fin)); 1718 1719 // Mark frame 2 as lost 1720 if let RecoveryToken::Stream(rt) = f2_token { 1721 ss.lost(&rt); 1722 } else { 1723 panic!(); 1724 } 1725 1726 // Next frame should set fin 1727 ss.write_frames( 1728 TransmissionPriority::default(), 1729 &mut builder, 1730 &mut tokens, 1731 &mut FrameStats::default(), 1732 ); 1733 let f3_token = tokens.remove(0); 1734 assert!(matches!(&f3_token, RecoveryToken::Stream(x) if x.offset == 10)); 1735 assert!(matches!(&f3_token, RecoveryToken::Stream(x) if x.length == 0)); 1736 assert!(matches!(&f3_token, RecoveryToken::Stream(x) if x.fin)); 1737 1738 // Mark frame 1 as lost 1739 if let RecoveryToken::Stream(rt) = f1_token { 1740 ss.lost(&rt); 1741 } else { 1742 panic!(); 1743 } 1744 1745 // Next frame should set fin and include all data 1746 ss.write_frames( 1747 TransmissionPriority::default(), 1748 &mut builder, 1749 &mut tokens, 1750 &mut FrameStats::default(), 1751 ); 1752 let f4_token = tokens.remove(0); 1753 assert!(matches!(&f4_token, RecoveryToken::Stream(x) if x.offset == 0)); 1754 assert!(matches!(&f4_token, RecoveryToken::Stream(x) if x.length == 10)); 1755 assert!(matches!(&f4_token, RecoveryToken::Stream(x) if x.fin)); 1756 } 1757 1758 #[test] data_blocked()1759 fn data_blocked() { 1760 let conn_fc = connection_fc(5); 1761 let conn_events = ConnectionEvents::default(); 1762 1763 let stream_id = StreamId::from(4); 1764 let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events); 1765 1766 // Only two bytes can be sent due to the stream limit. 
1767 assert_eq!(s.send(b"abc").unwrap(), 2); 1768 assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..]))); 1769 1770 // This doesn't report blocking yet. 1771 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 1772 let mut tokens = Vec::new(); 1773 let mut stats = FrameStats::default(); 1774 s.write_blocked_frame( 1775 TransmissionPriority::default(), 1776 &mut builder, 1777 &mut tokens, 1778 &mut stats, 1779 ); 1780 assert_eq!(stats.stream_data_blocked, 0); 1781 1782 // Blocking is reported after sending the last available credit. 1783 s.mark_as_sent(0, 2, false); 1784 s.write_blocked_frame( 1785 TransmissionPriority::default(), 1786 &mut builder, 1787 &mut tokens, 1788 &mut stats, 1789 ); 1790 assert_eq!(stats.stream_data_blocked, 1); 1791 1792 // Now increase the stream limit and test the connection limit. 1793 s.set_max_stream_data(10); 1794 1795 assert_eq!(s.send(b"abcd").unwrap(), 3); 1796 assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..]))); 1797 // DATA_BLOCKED is not sent yet. 1798 conn_fc 1799 .borrow_mut() 1800 .write_frames(&mut builder, &mut tokens, &mut stats); 1801 assert_eq!(stats.data_blocked, 0); 1802 1803 // DATA_BLOCKED is queued once bytes using all credit are sent. 1804 s.mark_as_sent(2, 3, false); 1805 conn_fc 1806 .borrow_mut() 1807 .write_frames(&mut builder, &mut tokens, &mut stats); 1808 assert_eq!(stats.data_blocked, 1); 1809 } 1810 1811 #[test] data_blocked_atomic()1812 fn data_blocked_atomic() { 1813 let conn_fc = connection_fc(5); 1814 let conn_events = ConnectionEvents::default(); 1815 1816 let stream_id = StreamId::from(4); 1817 let mut s = SendStream::new(stream_id, 2, Rc::clone(&conn_fc), conn_events); 1818 1819 // Stream is initially blocked (conn:5, stream:2) 1820 // and will not accept atomic write of 3 bytes. 1821 assert_eq!(s.send_atomic(b"abc").unwrap(), 0); 1822 1823 // Assert that STREAM_DATA_BLOCKED is sent. 
1824 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 1825 let mut tokens = Vec::new(); 1826 let mut stats = FrameStats::default(); 1827 s.write_blocked_frame( 1828 TransmissionPriority::default(), 1829 &mut builder, 1830 &mut tokens, 1831 &mut stats, 1832 ); 1833 assert_eq!(stats.stream_data_blocked, 1); 1834 1835 // Assert that a non-atomic write works. 1836 assert_eq!(s.send(b"abc").unwrap(), 2); 1837 assert_eq!(s.next_bytes(false), Some((0, &b"ab"[..]))); 1838 s.mark_as_sent(0, 2, false); 1839 1840 // Set limits to (conn:5, stream:10). 1841 s.set_max_stream_data(10); 1842 1843 // An atomic write of 4 bytes exceeds the remaining limit of 3. 1844 assert_eq!(s.send_atomic(b"abcd").unwrap(), 0); 1845 1846 // Assert that DATA_BLOCKED is sent. 1847 conn_fc 1848 .borrow_mut() 1849 .write_frames(&mut builder, &mut tokens, &mut stats); 1850 assert_eq!(stats.data_blocked, 1); 1851 1852 // Check that a non-atomic write works. 1853 assert_eq!(s.send(b"abcd").unwrap(), 3); 1854 assert_eq!(s.next_bytes(false), Some((2, &b"abc"[..]))); 1855 s.mark_as_sent(2, 3, false); 1856 1857 // Increase limits to (conn:15, stream:15). 1858 s.set_max_stream_data(15); 1859 conn_fc.borrow_mut().update(15); 1860 1861 // Check that atomic writing right up to the limit works. 1862 assert_eq!(s.send_atomic(b"abcdefghij").unwrap(), 10); 1863 } 1864 1865 #[test] ack_fin_first()1866 fn ack_fin_first() { 1867 const MESSAGE: &[u8] = b"hello"; 1868 let len_u64 = u64::try_from(MESSAGE.len()).unwrap(); 1869 1870 let conn_fc = connection_fc(len_u64); 1871 let conn_events = ConnectionEvents::default(); 1872 1873 let mut s = SendStream::new(StreamId::new(100), 0, conn_fc, conn_events); 1874 s.set_max_stream_data(len_u64); 1875 1876 // Send all the data, then the fin. 1877 let _ = s.send(MESSAGE).unwrap(); 1878 s.mark_as_sent(0, MESSAGE.len(), false); 1879 s.close(); 1880 s.mark_as_sent(len_u64, 0, true); 1881 1882 // Ack the fin, then the data. 
/// A fin that has been acknowledged must not be sent again, even if it is
/// later reported as lost.
#[test]
fn ack_then_lose_fin() {
    const MESSAGE: &[u8] = b"hello";
    let end = u64::try_from(MESSAGE.len()).unwrap();

    let mut stream = SendStream::new(
        StreamId::new(100),
        0,
        connection_fc(end),
        ConnectionEvents::default(),
    );
    stream.set_max_stream_data(end);

    // Send all the data, then the fin.
    stream.send(MESSAGE).unwrap();
    stream.mark_as_sent(0, MESSAGE.len(), false);
    stream.close();
    stream.mark_as_sent(end, 0, true);

    // Ack the fin, then mark it lost.
    stream.mark_as_acked(end, 0, true);
    stream.mark_as_lost(end, 0, true);

    // No frame should be sent here.
    let mut stats = FrameStats::default();
    let mut tokens = Vec::new();
    let mut builder = PacketBuilder::short(Encoder::new(), false, &[]);
    stream.write_stream_frame(
        TransmissionPriority::default(),
        &mut builder,
        &mut tokens,
        &mut stats,
    );
    assert_eq!(stats.stream, 0);
}
stream_with_sent(stream: u64, offset: usize) -> SendStream1925 fn stream_with_sent(stream: u64, offset: usize) -> SendStream { 1926 const MAX_VARINT: u64 = (1 << 62) - 1; 1927 1928 let conn_fc = connection_fc(MAX_VARINT); 1929 let mut s = SendStream::new( 1930 StreamId::from(stream), 1931 MAX_VARINT, 1932 conn_fc, 1933 ConnectionEvents::default(), 1934 ); 1935 1936 let mut send_buf = TxBuffer::new(); 1937 send_buf.retired = u64::try_from(offset).unwrap(); 1938 send_buf.ranges.mark_range(0, offset, RangeState::Acked); 1939 let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT); 1940 fc.consume(offset); 1941 let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT))); 1942 s.state = SendStreamState::Send { 1943 fc, 1944 conn_fc, 1945 send_buf, 1946 }; 1947 s 1948 } 1949 frame_sent_sid(stream: u64, offset: usize, len: usize, fin: bool, space: usize) -> bool1950 fn frame_sent_sid(stream: u64, offset: usize, len: usize, fin: bool, space: usize) -> bool { 1951 const BUF: &[u8] = &[0x42; 128]; 1952 let mut s = stream_with_sent(stream, offset); 1953 1954 // Now write out the proscribed data and maybe close. 
1955 if len > 0 { 1956 s.send(&BUF[..len]).unwrap(); 1957 } 1958 if fin { 1959 s.close(); 1960 } 1961 1962 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 1963 let header_len = builder.len(); 1964 builder.set_limit(header_len + space); 1965 1966 let mut tokens = Vec::new(); 1967 let mut stats = FrameStats::default(); 1968 s.write_stream_frame( 1969 TransmissionPriority::default(), 1970 &mut builder, 1971 &mut tokens, 1972 &mut stats, 1973 ); 1974 qtrace!("STREAM frame: {}", hex_with_len(&builder[header_len..])); 1975 stats.stream > 0 1976 } 1977 frame_sent(offset: usize, len: usize, fin: bool, space: usize) -> bool1978 fn frame_sent(offset: usize, len: usize, fin: bool, space: usize) -> bool { 1979 frame_sent_sid(0, offset, len, fin, space) 1980 } 1981 1982 #[test] stream_frame_empty()1983 fn stream_frame_empty() { 1984 // Stream frames with empty data and no fin never work. 1985 assert!(!frame_sent(10, 0, false, 2)); 1986 assert!(!frame_sent(10, 0, false, 3)); 1987 assert!(!frame_sent(10, 0, false, 4)); 1988 assert!(!frame_sent(10, 0, false, 5)); 1989 assert!(!frame_sent(10, 0, false, 100)); 1990 1991 // Empty data with fin is only a problem if there is no space. 
/// One byte of data at offset 10 needs at least four bytes of space, with
/// or without fin.
#[test]
fn stream_frame_minimum() {
    // Three bytes of space is never enough.
    for &fin in [false, true].iter() {
        assert!(!frame_sent(10, 1, fin, 3));
    }
    // Four or more always is.
    for &space in [4, 5, 100].iter() {
        for &fin in [false, true].iter() {
            assert!(frame_sent(10, 1, fin, space));
        }
    }
}

/// With 100 bytes queued the same threshold applies: a frame is written as
/// soon as four bytes of space are available.
#[test]
fn stream_frame_more() {
    for &fin in [false, true].iter() {
        assert!(!frame_sent(10, 100, fin, 3));
    }
    for &space in [4, 5, 100, 1000].iter() {
        for &fin in [false, true].iter() {
            assert!(frame_sent(10, 100, fin, space));
        }
    }
}
2033 const BIG: u64 = 1 << 30; 2034 const BIGSZ: usize = 1 << 30; 2035 2036 assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 16)); 2037 assert!(!frame_sent_sid(BIG, BIGSZ, 0, true, 16)); 2038 assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 17)); 2039 assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 17)); 2040 assert!(!frame_sent_sid(BIG, BIGSZ, 0, false, 18)); 2041 assert!(frame_sent_sid(BIG, BIGSZ, 0, true, 18)); 2042 2043 assert!(!frame_sent_sid(BIG, BIGSZ, 1, false, 17)); 2044 assert!(!frame_sent_sid(BIG, BIGSZ, 1, true, 17)); 2045 assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 18)); 2046 assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 18)); 2047 assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 19)); 2048 assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 19)); 2049 assert!(frame_sent_sid(BIG, BIGSZ, 1, false, 100)); 2050 assert!(frame_sent_sid(BIG, BIGSZ, 1, true, 100)); 2051 } 2052 2053 #[test] stream_frame_16384()2054 fn stream_frame_16384() { 2055 const DATA16384: &[u8] = &[0x43; 16384]; 2056 2057 // 16383/16384 is an odd boundary in STREAM frame construction. 2058 // That is the boundary where a length goes from 2 bytes to 4 bytes. 2059 // If the data fits in the available space, then it is simple: 2060 let mut s = stream_with_sent(0, 0); 2061 s.send(DATA16384).unwrap(); 2062 s.close(); 2063 2064 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 2065 let header_len = builder.len(); 2066 builder.set_limit(header_len + DATA16384.len() + 2); 2067 let mut tokens = Vec::new(); 2068 let mut stats = FrameStats::default(); 2069 s.write_stream_frame( 2070 TransmissionPriority::default(), 2071 &mut builder, 2072 &mut tokens, 2073 &mut stats, 2074 ); 2075 assert_eq!(stats.stream, 1); 2076 // Expect STREAM + FIN only. 2077 assert_eq!(&builder[header_len..header_len + 2], &[0b1001, 0]); 2078 assert_eq!(&builder[header_len + 2..], DATA16384); 2079 2080 s.mark_as_lost(0, DATA16384.len(), true); 2081 2082 // However, if there is one extra byte of space, we will try to add a length. 
2083 // That length will then make the frame to be too large and the data will be 2084 // truncated. The frame could carry one more byte of data, but it's a corner 2085 // case we don't want to address as it should be rare (if not impossible). 2086 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 2087 let header_len = builder.len(); 2088 builder.set_limit(header_len + DATA16384.len() + 3); 2089 s.write_stream_frame( 2090 TransmissionPriority::default(), 2091 &mut builder, 2092 &mut tokens, 2093 &mut stats, 2094 ); 2095 assert_eq!(stats.stream, 2); 2096 // Expect STREAM + LEN + FIN. 2097 assert_eq!( 2098 &builder[header_len..header_len + 4], 2099 &[0b1010, 0, 0x7f, 0xfd] 2100 ); 2101 assert_eq!( 2102 &builder[header_len + 4..], 2103 &DATA16384[..DATA16384.len() - 3] 2104 ); 2105 } 2106 2107 #[test] stream_frame_64()2108 fn stream_frame_64() { 2109 const DATA64: &[u8] = &[0x43; 64]; 2110 2111 // Unlike 16383/16384, the boundary at 63/64 is easy because the difference 2112 // is just one byte. We lose just the last byte when there is more space. 2113 let mut s = stream_with_sent(0, 0); 2114 s.send(DATA64).unwrap(); 2115 s.close(); 2116 2117 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 2118 let header_len = builder.len(); 2119 builder.set_limit(header_len + 66); 2120 let mut tokens = Vec::new(); 2121 let mut stats = FrameStats::default(); 2122 s.write_stream_frame( 2123 TransmissionPriority::default(), 2124 &mut builder, 2125 &mut tokens, 2126 &mut stats, 2127 ); 2128 assert_eq!(stats.stream, 1); 2129 // Expect STREAM + FIN only. 
2130 assert_eq!(&builder[header_len..header_len + 2], &[0b1001, 0]); 2131 assert_eq!(&builder[header_len + 2..], DATA64); 2132 2133 s.mark_as_lost(0, DATA64.len(), true); 2134 2135 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 2136 let header_len = builder.len(); 2137 builder.set_limit(header_len + 67); 2138 s.write_stream_frame( 2139 TransmissionPriority::default(), 2140 &mut builder, 2141 &mut tokens, 2142 &mut stats, 2143 ); 2144 assert_eq!(stats.stream, 2); 2145 // Expect STREAM + LEN, not FIN. 2146 assert_eq!(&builder[header_len..header_len + 3], &[0b1010, 0, 63]); 2147 assert_eq!(&builder[header_len + 3..], &DATA64[..63]); 2148 } 2149 } 2150