1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 4 // option. This file may not be copied, modified, or distributed 5 // except according to those terms. 6 7 // Tracks possibly-redundant flow control signals from other code and converts 8 // into flow control frames needing to be sent to the remote. 9 10 use crate::frame::{ 11 FRAME_TYPE_DATA_BLOCKED, FRAME_TYPE_MAX_DATA, FRAME_TYPE_MAX_STREAMS_BIDI, 12 FRAME_TYPE_MAX_STREAMS_UNIDI, FRAME_TYPE_MAX_STREAM_DATA, FRAME_TYPE_STREAMS_BLOCKED_BIDI, 13 FRAME_TYPE_STREAMS_BLOCKED_UNIDI, FRAME_TYPE_STREAM_DATA_BLOCKED, 14 }; 15 use crate::packet::PacketBuilder; 16 use crate::recovery::{RecoveryToken, StreamRecoveryToken}; 17 use crate::stats::FrameStats; 18 use crate::stream_id::{StreamId, StreamType}; 19 use crate::{Error, Res}; 20 use neqo_common::{qtrace, Role}; 21 22 use std::convert::TryFrom; 23 use std::fmt::Debug; 24 use std::ops::{Deref, DerefMut}; 25 use std::ops::{Index, IndexMut}; 26 27 #[derive(Debug)] 28 pub struct SenderFlowControl<T> 29 where 30 T: Debug + Sized, 31 { 32 /// The thing that we're counting for. 33 subject: T, 34 /// The limit. 35 limit: u64, 36 /// How much of that limit we've used. 37 used: u64, 38 /// The point at which blocking occurred. This is updated each time 39 /// the sender decides that it is blocked. It only ever changes 40 /// when blocking occurs. This ensures that blocking at any given limit 41 /// is only reported once. 42 /// Note: All values are one greater than the corresponding `limit` to 43 /// allow distinguishing between blocking at a limit of 0 and no blocking. 44 blocked_at: u64, 45 /// Whether a blocked frame should be sent. 46 blocked_frame: bool, 47 } 48 49 impl<T> SenderFlowControl<T> 50 where 51 T: Debug + Sized, 52 { 53 /// Make a new instance with the initial value and subject. new(subject: T, initial: u64) -> Self54 pub fn new(subject: T, initial: u64) -> Self { 55 Self { 56 subject, 57 limit: initial, 58 used: 0, 59 blocked_at: 0, 60 blocked_frame: false, 61 } 62 } 63 64 /// Update the maximum. Returns `true` if the change was an increase. update(&mut self, limit: u64) -> bool65 pub fn update(&mut self, limit: u64) -> bool { 66 debug_assert!(limit < u64::MAX); 67 if limit > self.limit { 68 self.limit = limit; 69 self.blocked_frame = false; 70 true 71 } else { 72 false 73 } 74 } 75 76 /// Consume flow control. consume(&mut self, count: usize)77 pub fn consume(&mut self, count: usize) { 78 let amt = u64::try_from(count).unwrap(); 79 debug_assert!(self.used + amt <= self.limit); 80 self.used += amt; 81 } 82 83 /// Get available flow control. available(&self) -> usize84 pub fn available(&self) -> usize { 85 usize::try_from(self.limit - self.used).unwrap_or(usize::MAX) 86 } 87 88 /// How much data has been written. used(&self) -> u6489 pub fn used(&self) -> u64 { 90 self.used 91 } 92 93 /// Mark flow control as blocked. 94 /// This only does something if the current limit exceeds the last reported blocking limit. blocked(&mut self)95 pub fn blocked(&mut self) { 96 if self.limit >= self.blocked_at { 97 self.blocked_at = self.limit + 1; 98 self.blocked_frame = true; 99 } 100 } 101 102 /// Return whether a blocking frame needs to be sent. 103 /// This is `Some` with the active limit if `blocked` has been called, 104 /// if a blocking frame has not been sent (or it has been lost), and 105 /// if the blocking condition remains. blocked_needed(&self) -> Option<u64>106 fn blocked_needed(&self) -> Option<u64> { 107 if self.blocked_frame && self.limit < self.blocked_at { 108 Some(self.blocked_at - 1) 109 } else { 110 None 111 } 112 } 113 114 /// Clear the need to send a blocked frame. blocked_sent(&mut self)115 fn blocked_sent(&mut self) { 116 self.blocked_frame = false; 117 } 118 119 /// Mark a blocked frame as having been lost. 120 /// Only send again if value of `self.blocked_at` hasn't increased since sending. 121 /// That would imply that the limit has since increased. frame_lost(&mut self, limit: u64)122 pub fn frame_lost(&mut self, limit: u64) { 123 if self.blocked_at == limit + 1 { 124 self.blocked_frame = true; 125 } 126 } 127 } 128 129 impl SenderFlowControl<()> { write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )130 pub fn write_frames( 131 &mut self, 132 builder: &mut PacketBuilder, 133 tokens: &mut Vec<RecoveryToken>, 134 stats: &mut FrameStats, 135 ) { 136 if let Some(limit) = self.blocked_needed() { 137 if builder.write_varint_frame(&[FRAME_TYPE_DATA_BLOCKED, limit]) { 138 stats.data_blocked += 1; 139 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::DataBlocked( 140 limit, 141 ))); 142 self.blocked_sent(); 143 } 144 } 145 } 146 } 147 148 impl SenderFlowControl<StreamId> { write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )149 pub fn write_frames( 150 &mut self, 151 builder: &mut PacketBuilder, 152 tokens: &mut Vec<RecoveryToken>, 153 stats: &mut FrameStats, 154 ) { 155 if let Some(limit) = self.blocked_needed() { 156 if builder.write_varint_frame(&[ 157 FRAME_TYPE_STREAM_DATA_BLOCKED, 158 self.subject.as_u64(), 159 limit, 160 ]) { 161 stats.stream_data_blocked += 1; 162 tokens.push(RecoveryToken::Stream( 163 StreamRecoveryToken::StreamDataBlocked { 164 stream_id: self.subject, 165 limit, 166 }, 167 )); 168 self.blocked_sent(); 169 } 170 } 171 } 172 } 173 174 impl SenderFlowControl<StreamType> { write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )175 pub fn write_frames( 176 &mut self, 177 builder: &mut PacketBuilder, 178 tokens: &mut Vec<RecoveryToken>, 179 stats: &mut FrameStats, 180 ) { 181 if let Some(limit) = self.blocked_needed() { 182 let frame = match self.subject { 183 StreamType::BiDi => FRAME_TYPE_STREAMS_BLOCKED_BIDI, 184 StreamType::UniDi => FRAME_TYPE_STREAMS_BLOCKED_UNIDI, 185 }; 186 if builder.write_varint_frame(&[frame, limit]) { 187 stats.streams_blocked += 1; 188 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::StreamsBlocked { 189 stream_type: self.subject, 190 limit, 191 })); 192 self.blocked_sent(); 193 } 194 } 195 } 196 } 197 198 #[derive(Debug)] 199 pub struct ReceiverFlowControl<T> 200 where 201 T: Debug + Sized, 202 { 203 /// The thing that we're counting for. 204 subject: T, 205 /// The maximum amount of items that can be active (e.g., the size of the receive buffer). 206 max_active: u64, 207 /// Last max allowed sent. 208 max_allowed: u64, 209 /// Item received, but not retired yet. 210 /// This will be used for byte flow control: each stream will remember is largest byte 211 /// offset received and session flow control will remember the sum of all bytes consumed 212 /// by all streams. 213 consumed: u64, 214 /// Retired items. 215 retired: u64, 216 frame_pending: bool, 217 } 218 219 impl<T> ReceiverFlowControl<T> 220 where 221 T: Debug + Sized, 222 { 223 /// Make a new instance with the initial value and subject. new(subject: T, max: u64) -> Self224 pub fn new(subject: T, max: u64) -> Self { 225 Self { 226 subject, 227 max_active: max, 228 max_allowed: max, 229 consumed: 0, 230 retired: 0, 231 frame_pending: false, 232 } 233 } 234 235 /// Retired some items and maybe send flow control 236 /// update. retire(&mut self, retired: u64)237 pub fn retire(&mut self, retired: u64) { 238 if retired <= self.retired { 239 return; 240 } 241 242 self.retired = retired; 243 if self.retired + self.max_active / 2 > self.max_allowed { 244 self.frame_pending = true; 245 } 246 } 247 248 /// This function is called when STREAM_DATA_BLOCKED frame is received. 249 /// The flow control will try to send an update if possible. send_flowc_update(&mut self)250 pub fn send_flowc_update(&mut self) { 251 if self.retired + self.max_active > self.max_allowed { 252 self.frame_pending = true; 253 } 254 } 255 frame_needed(&self) -> bool256 pub fn frame_needed(&self) -> bool { 257 self.frame_pending 258 } 259 next_limit(&self) -> u64260 pub fn next_limit(&self) -> u64 { 261 self.retired + self.max_active 262 } 263 max_active(&self) -> u64264 pub fn max_active(&self) -> u64 { 265 self.max_active 266 } 267 frame_lost(&mut self, maximum_data: u64)268 pub fn frame_lost(&mut self, maximum_data: u64) { 269 if maximum_data == self.max_allowed { 270 self.frame_pending = true; 271 } 272 } 273 frame_sent(&mut self, new_max: u64)274 fn frame_sent(&mut self, new_max: u64) { 275 self.max_allowed = new_max; 276 self.frame_pending = false; 277 } 278 set_max_active(&mut self, max: u64)279 pub fn set_max_active(&mut self, max: u64) { 280 // If max_active has been increased, send an update immediately. 281 self.frame_pending |= self.max_active < max; 282 self.max_active = max; 283 } 284 retired(&self) -> u64285 pub fn retired(&self) -> u64 { 286 self.retired 287 } 288 consumed(&self) -> u64289 pub fn consumed(&self) -> u64 { 290 self.consumed 291 } 292 } 293 294 impl ReceiverFlowControl<()> { write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )295 pub fn write_frames( 296 &mut self, 297 builder: &mut PacketBuilder, 298 tokens: &mut Vec<RecoveryToken>, 299 stats: &mut FrameStats, 300 ) { 301 if !self.frame_needed() { 302 return; 303 } 304 let max_allowed = self.next_limit(); 305 if builder.write_varint_frame(&[FRAME_TYPE_MAX_DATA, max_allowed]) { 306 stats.max_data += 1; 307 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::MaxData( 308 max_allowed, 309 ))); 310 self.frame_sent(max_allowed); 311 } 312 } 313 add_retired(&mut self, count: u64)314 pub fn add_retired(&mut self, count: u64) { 315 debug_assert!(self.retired + count <= self.consumed); 316 self.retired += count; 317 if self.retired + self.max_active / 2 > self.max_allowed { 318 self.frame_pending = true; 319 } 320 } 321 consume(&mut self, count: u64) -> Res<()>322 pub fn consume(&mut self, count: u64) -> Res<()> { 323 if self.consumed + count > self.max_allowed { 324 qtrace!( 325 "Session RX window exceeded: consumed:{} new:{} limit:{}", 326 self.consumed, 327 count, 328 self.max_allowed 329 ); 330 return Err(Error::FlowControlError); 331 } 332 self.consumed += count; 333 Ok(()) 334 } 335 } 336 337 impl Default for ReceiverFlowControl<()> { default() -> Self338 fn default() -> Self { 339 Self::new((), 0) 340 } 341 } 342 343 impl ReceiverFlowControl<StreamId> { write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )344 pub fn write_frames( 345 &mut self, 346 builder: &mut PacketBuilder, 347 tokens: &mut Vec<RecoveryToken>, 348 stats: &mut FrameStats, 349 ) { 350 if !self.frame_needed() { 351 return; 352 } 353 let max_allowed = self.next_limit(); 354 if builder.write_varint_frame(&[ 355 FRAME_TYPE_MAX_STREAM_DATA, 356 self.subject.as_u64(), 357 max_allowed, 358 ]) { 359 stats.max_stream_data += 1; 360 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::MaxStreamData { 361 stream_id: self.subject, 362 max_data: max_allowed, 363 })); 364 self.frame_sent(max_allowed); 365 } 366 } 367 add_retired(&mut self, count: u64)368 pub fn add_retired(&mut self, count: u64) { 369 debug_assert!(self.retired + count <= self.consumed); 370 self.retired += count; 371 if self.retired + self.max_active / 2 > self.max_allowed { 372 self.frame_pending = true; 373 } 374 } 375 set_consumed(&mut self, consumed: u64) -> Res<u64>376 pub fn set_consumed(&mut self, consumed: u64) -> Res<u64> { 377 if consumed <= self.consumed { 378 return Ok(0); 379 } 380 381 if consumed > self.max_allowed { 382 qtrace!("Stream RX window exceeded: {}", consumed); 383 return Err(Error::FlowControlError); 384 } 385 let new_consumed = consumed - self.consumed; 386 self.consumed = consumed; 387 Ok(new_consumed) 388 } 389 } 390 391 impl Default for ReceiverFlowControl<StreamId> { default() -> Self392 fn default() -> Self { 393 Self::new(StreamId::new(0), 0) 394 } 395 } 396 397 impl ReceiverFlowControl<StreamType> { write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, )398 pub fn write_frames( 399 &mut self, 400 builder: &mut PacketBuilder, 401 tokens: &mut Vec<RecoveryToken>, 402 stats: &mut FrameStats, 403 ) { 404 if !self.frame_needed() { 405 return; 406 } 407 let max_streams = self.next_limit(); 408 let frame = match self.subject { 409 StreamType::BiDi => FRAME_TYPE_MAX_STREAMS_BIDI, 410 StreamType::UniDi => FRAME_TYPE_MAX_STREAMS_UNIDI, 411 }; 412 if builder.write_varint_frame(&[frame, max_streams]) { 413 stats.max_streams += 1; 414 tokens.push(RecoveryToken::Stream(StreamRecoveryToken::MaxStreams { 415 stream_type: self.subject, 416 max_streams, 417 })); 418 self.frame_sent(max_streams); 419 } 420 } 421 422 /// Check if received item exceeds the allowed flow control limit. check_allowed(&self, new_end: u64) -> bool423 pub fn check_allowed(&self, new_end: u64) -> bool { 424 new_end < self.max_allowed 425 } 426 427 /// Retire given amount of additional data. 428 /// This function will send flow updates immediately. add_retired(&mut self, count: u64)429 pub fn add_retired(&mut self, count: u64) { 430 self.retired += count; 431 if count > 0 { 432 self.send_flowc_update(); 433 } 434 } 435 } 436 437 pub struct RemoteStreamLimit { 438 streams_fc: ReceiverFlowControl<StreamType>, 439 next_stream: StreamId, 440 } 441 442 impl RemoteStreamLimit { new(stream_type: StreamType, max_streams: u64, role: Role) -> Self443 pub fn new(stream_type: StreamType, max_streams: u64, role: Role) -> Self { 444 Self { 445 streams_fc: ReceiverFlowControl::new(stream_type, max_streams), 446 // // This is for a stream created by a peer, therefore we use role.remote(). 447 next_stream: StreamId::init(stream_type, role.remote()), 448 } 449 } 450 is_allowed(&self, stream_id: StreamId) -> bool451 pub fn is_allowed(&self, stream_id: StreamId) -> bool { 452 let stream_idx = stream_id.as_u64() >> 2; 453 self.streams_fc.check_allowed(stream_idx) 454 } 455 is_new_stream(&self, stream_id: StreamId) -> Res<bool>456 pub fn is_new_stream(&self, stream_id: StreamId) -> Res<bool> { 457 if !self.is_allowed(stream_id) { 458 return Err(Error::StreamLimitError); 459 } 460 Ok(stream_id >= self.next_stream) 461 } 462 take_stream_id(&mut self) -> StreamId463 pub fn take_stream_id(&mut self) -> StreamId { 464 let new_stream = self.next_stream; 465 self.next_stream.next(); 466 assert!(self.is_allowed(new_stream)); 467 new_stream 468 } 469 } 470 471 impl Deref for RemoteStreamLimit { 472 type Target = ReceiverFlowControl<StreamType>; deref(&self) -> &Self::Target473 fn deref(&self) -> &Self::Target { 474 &self.streams_fc 475 } 476 } 477 478 impl DerefMut for RemoteStreamLimit { deref_mut(&mut self) -> &mut Self::Target479 fn deref_mut(&mut self) -> &mut Self::Target { 480 &mut self.streams_fc 481 } 482 } 483 484 pub struct RemoteStreamLimits { 485 bidirectional: RemoteStreamLimit, 486 unidirectional: RemoteStreamLimit, 487 } 488 489 impl RemoteStreamLimits { new(local_max_stream_bidi: u64, local_max_stream_uni: u64, role: Role) -> Self490 pub fn new(local_max_stream_bidi: u64, local_max_stream_uni: u64, role: Role) -> Self { 491 Self { 492 bidirectional: RemoteStreamLimit::new(StreamType::BiDi, local_max_stream_bidi, role), 493 unidirectional: RemoteStreamLimit::new(StreamType::UniDi, local_max_stream_uni, role), 494 } 495 } 496 } 497 498 impl Index<StreamType> for RemoteStreamLimits { 499 type Output = RemoteStreamLimit; 500 index(&self, idx: StreamType) -> &Self::Output501 fn index(&self, idx: StreamType) -> &Self::Output { 502 match idx { 503 StreamType::BiDi => &self.bidirectional, 504 StreamType::UniDi => &self.unidirectional, 505 } 506 } 507 } 508 509 impl IndexMut<StreamType> for RemoteStreamLimits { index_mut(&mut self, idx: StreamType) -> &mut Self::Output510 fn index_mut(&mut self, idx: StreamType) -> &mut Self::Output { 511 match idx { 512 StreamType::BiDi => &mut self.bidirectional, 513 StreamType::UniDi => &mut self.unidirectional, 514 } 515 } 516 } 517 518 pub struct LocalStreamLimits { 519 bidirectional: SenderFlowControl<StreamType>, 520 unidirectional: SenderFlowControl<StreamType>, 521 role_bit: u64, 522 } 523 524 impl LocalStreamLimits { new(role: Role) -> Self525 pub fn new(role: Role) -> Self { 526 Self { 527 bidirectional: SenderFlowControl::new(StreamType::BiDi, 0), 528 unidirectional: SenderFlowControl::new(StreamType::UniDi, 0), 529 role_bit: StreamId::role_bit(role), 530 } 531 } 532 take_stream_id(&mut self, stream_type: StreamType) -> Option<StreamId>533 pub fn take_stream_id(&mut self, stream_type: StreamType) -> Option<StreamId> { 534 let fc = match stream_type { 535 StreamType::BiDi => &mut self.bidirectional, 536 StreamType::UniDi => &mut self.unidirectional, 537 }; 538 if fc.available() > 0 { 539 let new_stream = fc.used(); 540 fc.consume(1); 541 let type_bit = match stream_type { 542 StreamType::BiDi => 0, 543 StreamType::UniDi => 2, 544 }; 545 Some(StreamId::from((new_stream << 2) + type_bit + self.role_bit)) 546 } else { 547 fc.blocked(); 548 None 549 } 550 } 551 } 552 553 impl Index<StreamType> for LocalStreamLimits { 554 type Output = SenderFlowControl<StreamType>; 555 index(&self, idx: StreamType) -> &Self::Output556 fn index(&self, idx: StreamType) -> &Self::Output { 557 match idx { 558 StreamType::BiDi => &self.bidirectional, 559 StreamType::UniDi => &self.unidirectional, 560 } 561 } 562 } 563 564 impl IndexMut<StreamType> for LocalStreamLimits { index_mut(&mut self, idx: StreamType) -> &mut Self::Output565 fn index_mut(&mut self, idx: StreamType) -> &mut Self::Output { 566 match idx { 567 StreamType::BiDi => &mut self.bidirectional, 568 StreamType::UniDi => &mut self.unidirectional, 569 } 570 } 571 } 572 573 #[cfg(test)] 574 mod test { 575 use super::{LocalStreamLimits, ReceiverFlowControl, RemoteStreamLimits, SenderFlowControl}; 576 use crate::packet::PacketBuilder; 577 use crate::stats::FrameStats; 578 use crate::stream_id::{StreamId, StreamType}; 579 use crate::Error; 580 use neqo_common::{Encoder, Role}; 581 582 #[test] blocked_at_zero()583 fn blocked_at_zero() { 584 let mut fc = SenderFlowControl::new((), 0); 585 fc.blocked(); 586 assert_eq!(fc.blocked_needed(), Some(0)); 587 } 588 589 #[test] blocked()590 fn blocked() { 591 let mut fc = SenderFlowControl::new((), 10); 592 fc.blocked(); 593 assert_eq!(fc.blocked_needed(), Some(10)); 594 } 595 596 #[test] update_consume()597 fn update_consume() { 598 let mut fc = SenderFlowControl::new((), 10); 599 fc.consume(10); 600 assert_eq!(fc.available(), 0); 601 fc.update(5); // An update lower than the current limit does nothing. 602 assert_eq!(fc.available(), 0); 603 fc.update(15); 604 assert_eq!(fc.available(), 5); 605 fc.consume(3); 606 assert_eq!(fc.available(), 2); 607 } 608 609 #[test] update_clears_blocked()610 fn update_clears_blocked() { 611 let mut fc = SenderFlowControl::new((), 10); 612 fc.blocked(); 613 assert_eq!(fc.blocked_needed(), Some(10)); 614 fc.update(5); // An update lower than the current limit does nothing. 615 assert_eq!(fc.blocked_needed(), Some(10)); 616 fc.update(11); 617 assert_eq!(fc.blocked_needed(), None); 618 } 619 620 #[test] lost_blocked_resent()621 fn lost_blocked_resent() { 622 let mut fc = SenderFlowControl::new((), 10); 623 fc.blocked(); 624 fc.blocked_sent(); 625 assert_eq!(fc.blocked_needed(), None); 626 fc.frame_lost(10); 627 assert_eq!(fc.blocked_needed(), Some(10)); 628 } 629 630 #[test] lost_after_increase()631 fn lost_after_increase() { 632 let mut fc = SenderFlowControl::new((), 10); 633 fc.blocked(); 634 fc.blocked_sent(); 635 assert_eq!(fc.blocked_needed(), None); 636 fc.update(11); 637 fc.frame_lost(10); 638 assert_eq!(fc.blocked_needed(), None); 639 } 640 641 #[test] lost_after_higher_blocked()642 fn lost_after_higher_blocked() { 643 let mut fc = SenderFlowControl::new((), 10); 644 fc.blocked(); 645 fc.blocked_sent(); 646 fc.update(11); 647 fc.blocked(); 648 assert_eq!(fc.blocked_needed(), Some(11)); 649 fc.blocked_sent(); 650 fc.frame_lost(10); 651 assert_eq!(fc.blocked_needed(), None); 652 } 653 654 #[test] do_no_need_max_allowed_frame_at_start()655 fn do_no_need_max_allowed_frame_at_start() { 656 let fc = ReceiverFlowControl::new((), 0); 657 assert!(!fc.frame_needed()); 658 } 659 660 #[test] max_allowed_after_items_retired()661 fn max_allowed_after_items_retired() { 662 let mut fc = ReceiverFlowControl::new((), 100); 663 fc.retire(49); 664 assert!(!fc.frame_needed()); 665 fc.retire(51); 666 assert!(fc.frame_needed()); 667 assert_eq!(fc.next_limit(), 151); 668 } 669 670 #[test] need_max_allowed_frame_after_loss()671 fn need_max_allowed_frame_after_loss() { 672 let mut fc = ReceiverFlowControl::new((), 100); 673 fc.retire(100); 674 assert!(fc.frame_needed()); 675 assert_eq!(fc.next_limit(), 200); 676 fc.frame_sent(200); 677 assert!(!fc.frame_needed()); 678 fc.frame_lost(200); 679 assert!(fc.frame_needed()); 680 assert_eq!(fc.next_limit(), 200); 681 } 682 683 #[test] no_max_allowed_frame_after_old_loss()684 fn no_max_allowed_frame_after_old_loss() { 685 let mut fc = ReceiverFlowControl::new((), 100); 686 fc.retire(51); 687 assert!(fc.frame_needed()); 688 assert_eq!(fc.next_limit(), 151); 689 fc.frame_sent(151); 690 assert!(!fc.frame_needed()); 691 fc.retire(102); 692 assert!(fc.frame_needed()); 693 assert_eq!(fc.next_limit(), 202); 694 fc.frame_sent(202); 695 assert!(!fc.frame_needed()); 696 fc.frame_lost(151); 697 assert!(!fc.frame_needed()); 698 } 699 700 #[test] force_send_max_allowed()701 fn force_send_max_allowed() { 702 let mut fc = ReceiverFlowControl::new((), 100); 703 fc.retire(10); 704 assert!(!fc.frame_needed()); 705 } 706 707 #[test] multiple_retries_after_frame_pending_is_set()708 fn multiple_retries_after_frame_pending_is_set() { 709 let mut fc = ReceiverFlowControl::new((), 100); 710 fc.retire(51); 711 assert!(fc.frame_needed()); 712 assert_eq!(fc.next_limit(), 151); 713 fc.retire(61); 714 assert!(fc.frame_needed()); 715 assert_eq!(fc.next_limit(), 161); 716 fc.retire(88); 717 assert!(fc.frame_needed()); 718 assert_eq!(fc.next_limit(), 188); 719 fc.retire(90); 720 assert!(fc.frame_needed()); 721 assert_eq!(fc.next_limit(), 190); 722 fc.frame_sent(190); 723 assert!(!fc.frame_needed()); 724 fc.retire(141); 725 assert!(fc.frame_needed()); 726 assert_eq!(fc.next_limit(), 241); 727 fc.frame_sent(241); 728 assert!(!fc.frame_needed()); 729 } 730 731 #[test] new_retired_before_loss()732 fn new_retired_before_loss() { 733 let mut fc = ReceiverFlowControl::new((), 100); 734 fc.retire(51); 735 assert!(fc.frame_needed()); 736 assert_eq!(fc.next_limit(), 151); 737 fc.frame_sent(151); 738 assert!(!fc.frame_needed()); 739 fc.retire(62); 740 assert!(!fc.frame_needed()); 741 fc.frame_lost(151); 742 assert!(fc.frame_needed()); 743 assert_eq!(fc.next_limit(), 162); 744 } 745 746 #[test] changing_max_active()747 fn changing_max_active() { 748 let mut fc = ReceiverFlowControl::new((), 100); 749 fc.set_max_active(50); 750 // There is no MAX_STREAM_DATA frame needed. 751 assert!(!fc.frame_needed()); 752 // We can still retire more than 50. 753 fc.retire(60); 754 // There is no MAX_STREAM_DATA fame needed yet. 755 assert!(!fc.frame_needed()); 756 fc.retire(76); 757 assert!(fc.frame_needed()); 758 assert_eq!(fc.next_limit(), 126); 759 760 // Increase max_active. 761 fc.set_max_active(60); 762 assert!(fc.frame_needed()); 763 assert_eq!(fc.next_limit(), 136); 764 765 // We can retire more than 60. 766 fc.retire(136); 767 assert!(fc.frame_needed()); 768 assert_eq!(fc.next_limit(), 196); 769 } 770 remote_stream_limits(role: Role, bidi: u64, unidi: u64)771 fn remote_stream_limits(role: Role, bidi: u64, unidi: u64) { 772 let mut fc = RemoteStreamLimits::new(2, 1, role); 773 assert!(fc[StreamType::BiDi] 774 .is_new_stream(StreamId::from(bidi)) 775 .unwrap()); 776 assert!(fc[StreamType::BiDi] 777 .is_new_stream(StreamId::from(bidi + 4)) 778 .unwrap()); 779 assert!(fc[StreamType::UniDi] 780 .is_new_stream(StreamId::from(unidi)) 781 .unwrap()); 782 783 // Exceed limits 784 assert_eq!( 785 fc[StreamType::BiDi].is_new_stream(StreamId::from(bidi + 8)), 786 Err(Error::StreamLimitError) 787 ); 788 assert_eq!( 789 fc[StreamType::UniDi].is_new_stream(StreamId::from(unidi + 4)), 790 Err(Error::StreamLimitError) 791 ); 792 793 assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(bidi)); 794 assert_eq!( 795 fc[StreamType::BiDi].take_stream_id(), 796 StreamId::from(bidi + 4) 797 ); 798 assert_eq!( 799 fc[StreamType::UniDi].take_stream_id(), 800 StreamId::from(unidi) 801 ); 802 803 fc[StreamType::BiDi].add_retired(1); 804 fc[StreamType::BiDi].send_flowc_update(); 805 // consume the frame 806 let mut builder = PacketBuilder::short(Encoder::new(), false, &[]); 807 let mut tokens = Vec::new(); 808 fc[StreamType::BiDi].write_frames(&mut builder, &mut tokens, &mut FrameStats::default()); 809 assert_eq!(tokens.len(), 1); 810 811 // Now 9 can be a new StreamId. 812 assert!(fc[StreamType::BiDi] 813 .is_new_stream(StreamId::from(bidi + 8)) 814 .unwrap()); 815 assert_eq!( 816 fc[StreamType::BiDi].take_stream_id(), 817 StreamId::from(bidi + 8) 818 ); 819 // 13 still exceeds limits 820 assert_eq!( 821 fc[StreamType::BiDi].is_new_stream(StreamId::from(bidi + 12)), 822 Err(Error::StreamLimitError) 823 ); 824 825 fc[StreamType::UniDi].add_retired(1); 826 fc[StreamType::UniDi].send_flowc_update(); 827 // consume the frame 828 fc[StreamType::UniDi].write_frames(&mut builder, &mut tokens, &mut FrameStats::default()); 829 assert_eq!(tokens.len(), 2); 830 831 // Now 7 can be a new StreamId. 832 assert!(fc[StreamType::UniDi] 833 .is_new_stream(StreamId::from(unidi + 4)) 834 .unwrap()); 835 assert_eq!( 836 fc[StreamType::UniDi].take_stream_id(), 837 StreamId::from(unidi + 4) 838 ); 839 // 11 exceeds limits 840 assert_eq!( 841 fc[StreamType::UniDi].is_new_stream(StreamId::from(unidi + 8)), 842 Err(Error::StreamLimitError) 843 ); 844 } 845 846 #[test] remote_stream_limits_new_stream_client()847 fn remote_stream_limits_new_stream_client() { 848 remote_stream_limits(Role::Client, 1, 3); 849 } 850 851 #[test] remote_stream_limits_new_stream_server()852 fn remote_stream_limits_new_stream_server() { 853 remote_stream_limits(Role::Server, 0, 2); 854 } 855 856 #[should_panic] 857 #[test] remote_stream_limits_asserts_if_limit_exceeded()858 fn remote_stream_limits_asserts_if_limit_exceeded() { 859 let mut fc = RemoteStreamLimits::new(2, 1, Role::Client); 860 assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(1)); 861 assert_eq!(fc[StreamType::BiDi].take_stream_id(), StreamId::from(5)); 862 let _ = fc[StreamType::BiDi].take_stream_id(); 863 } 864 local_stream_limits(role: Role, bidi: u64, unidi: u64)865 fn local_stream_limits(role: Role, bidi: u64, unidi: u64) { 866 let mut fc = LocalStreamLimits::new(role); 867 868 fc[StreamType::BiDi].update(2); 869 fc[StreamType::UniDi].update(1); 870 871 // Add streams 872 assert_eq!( 873 fc.take_stream_id(StreamType::BiDi).unwrap(), 874 StreamId::from(bidi) 875 ); 876 assert_eq!( 877 fc.take_stream_id(StreamType::BiDi).unwrap(), 878 StreamId::from(bidi + 4) 879 ); 880 assert_eq!(fc.take_stream_id(StreamType::BiDi), None); 881 assert_eq!( 882 fc.take_stream_id(StreamType::UniDi).unwrap(), 883 StreamId::from(unidi) 884 ); 885 assert_eq!(fc.take_stream_id(StreamType::UniDi), None); 886 887 // Increase limit 888 fc[StreamType::BiDi].update(3); 889 fc[StreamType::UniDi].update(2); 890 assert_eq!( 891 fc.take_stream_id(StreamType::BiDi).unwrap(), 892 StreamId::from(bidi + 8) 893 ); 894 assert_eq!(fc.take_stream_id(StreamType::BiDi), None); 895 assert_eq!( 896 fc.take_stream_id(StreamType::UniDi).unwrap(), 897 StreamId::from(unidi + 4) 898 ); 899 assert_eq!(fc.take_stream_id(StreamType::UniDi), None); 900 } 901 902 #[test] local_stream_limits_new_stream_client()903 fn local_stream_limits_new_stream_client() { 904 local_stream_limits(Role::Client, 0, 2); 905 } 906 907 #[test] local_stream_limits_new_stream_server()908 fn local_stream_limits_new_stream_server() { 909 local_stream_limits(Role::Server, 1, 3); 910 } 911 } 912