1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * This source code is licensed under the MIT license found in the 5 * LICENSE file in the root directory of this source tree. 6 * 7 */ 8 9 #pragma once 10 11 #include <folly/container/F14Map.h> 12 #include <folly/container/F14Set.h> 13 #include <quic/QuicConstants.h> 14 #include <quic/codec/Types.h> 15 #include <quic/state/QuicStreamPrioritiesObserver.h> 16 #include <quic/state/StreamData.h> 17 #include <quic/state/TransportSettings.h> 18 #include <numeric> 19 #include <set> 20 21 namespace quic { 22 namespace detail { 23 24 constexpr uint8_t kStreamIncrement = 0x04; 25 } 26 27 class QuicStreamManager { 28 public: QuicStreamManager(QuicConnectionStateBase & conn,QuicNodeType nodeType,const TransportSettings & transportSettings)29 explicit QuicStreamManager( 30 QuicConnectionStateBase& conn, 31 QuicNodeType nodeType, 32 const TransportSettings& transportSettings) 33 : conn_(conn), 34 nodeType_(nodeType), 35 transportSettings_(&transportSettings) { 36 if (nodeType == QuicNodeType::Server) { 37 nextAcceptablePeerBidirectionalStreamId_ = 0x00; 38 nextAcceptablePeerUnidirectionalStreamId_ = 0x02; 39 nextAcceptableLocalBidirectionalStreamId_ = 0x01; 40 nextAcceptableLocalUnidirectionalStreamId_ = 0x03; 41 nextBidirectionalStreamId_ = 0x01; 42 nextUnidirectionalStreamId_ = 0x03; 43 initialLocalBidirectionalStreamId_ = 0x01; 44 initialLocalUnidirectionalStreamId_ = 0x03; 45 initialRemoteBidirectionalStreamId_ = 0x00; 46 initialRemoteUnidirectionalStreamId_ = 0x02; 47 } else { 48 nextAcceptablePeerBidirectionalStreamId_ = 0x01; 49 nextAcceptablePeerUnidirectionalStreamId_ = 0x03; 50 nextAcceptableLocalBidirectionalStreamId_ = 0x00; 51 nextAcceptableLocalUnidirectionalStreamId_ = 0x02; 52 nextBidirectionalStreamId_ = 0x00; 53 nextUnidirectionalStreamId_ = 0x02; 54 initialLocalBidirectionalStreamId_ = 0x00; 55 initialLocalUnidirectionalStreamId_ = 0x02; 56 initialRemoteBidirectionalStreamId_ = 0x01; 57 initialRemoteUnidirectionalStreamId_ = 0x03; 58 } 59 refreshTransportSettings(transportSettings); 60 } 61 62 /** 63 * Constructor to facilitate migration of a QuicStreamManager to another 64 * QuicConnectionStateBase 65 */ QuicStreamManager(QuicConnectionStateBase & conn,QuicNodeType nodeType,const TransportSettings & transportSettings,QuicStreamManager && other)66 explicit QuicStreamManager( 67 QuicConnectionStateBase& conn, 68 QuicNodeType nodeType, 69 const TransportSettings& transportSettings, 70 QuicStreamManager&& other) 71 : conn_(conn), 72 nodeType_(nodeType), 73 transportSettings_(&transportSettings) { 74 nextAcceptablePeerBidirectionalStreamId_ = 75 other.nextAcceptablePeerBidirectionalStreamId_; 76 nextAcceptablePeerUnidirectionalStreamId_ = 77 other.nextAcceptablePeerUnidirectionalStreamId_; 78 nextAcceptableLocalBidirectionalStreamId_ = 79 other.nextAcceptableLocalBidirectionalStreamId_; 80 nextAcceptableLocalUnidirectionalStreamId_ = 81 other.nextAcceptableLocalUnidirectionalStreamId_; 82 nextBidirectionalStreamId_ = other.nextBidirectionalStreamId_; 83 nextUnidirectionalStreamId_ = other.nextUnidirectionalStreamId_; 84 maxLocalBidirectionalStreamId_ = other.maxLocalBidirectionalStreamId_; 85 maxLocalUnidirectionalStreamId_ = other.maxLocalUnidirectionalStreamId_; 86 maxRemoteBidirectionalStreamId_ = other.maxRemoteBidirectionalStreamId_; 87 maxRemoteUnidirectionalStreamId_ = other.maxRemoteUnidirectionalStreamId_; 88 initialLocalBidirectionalStreamId_ = 89 other.initialLocalBidirectionalStreamId_; 90 initialLocalUnidirectionalStreamId_ = 91 other.initialLocalUnidirectionalStreamId_; 92 initialRemoteBidirectionalStreamId_ = 93 other.initialRemoteBidirectionalStreamId_; 94 initialRemoteUnidirectionalStreamId_ = 95 other.initialRemoteUnidirectionalStreamId_; 96 97 streamLimitWindowingFraction_ = other.streamLimitWindowingFraction_; 98 remoteBidirectionalStreamLimitUpdate_ = 99 other.remoteBidirectionalStreamLimitUpdate_; 100 remoteUnidirectionalStreamLimitUpdate_ = 101 other.remoteUnidirectionalStreamLimitUpdate_; 102 numControlStreams_ = other.numControlStreams_; 103 openBidirectionalPeerStreams_ = 104 std::move(other.openBidirectionalPeerStreams_); 105 openUnidirectionalPeerStreams_ = 106 std::move(other.openUnidirectionalPeerStreams_); 107 openBidirectionalLocalStreams_ = 108 std::move(other.openBidirectionalLocalStreams_); 109 openUnidirectionalLocalStreams_ = 110 std::move(other.openUnidirectionalLocalStreams_); 111 newPeerStreams_ = std::move(other.newPeerStreams_); 112 blockedStreams_ = std::move(other.blockedStreams_); 113 stopSendingStreams_ = std::move(other.stopSendingStreams_); 114 streamPriorityLevels_ = std::move(other.streamPriorityLevels_); 115 windowUpdates_ = std::move(other.windowUpdates_); 116 flowControlUpdated_ = std::move(other.flowControlUpdated_); 117 lossStreams_ = std::move(other.lossStreams_); 118 readableStreams_ = std::move(other.readableStreams_); 119 peekableStreams_ = std::move(other.peekableStreams_); 120 writableStreams_ = std::move(other.writableStreams_); 121 writableDSRStreams_ = std::move(other.writableDSRStreams_); 122 writableControlStreams_ = std::move(other.writableControlStreams_); 123 txStreams_ = std::move(other.txStreams_); 124 deliverableStreams_ = std::move(other.deliverableStreams_); 125 closedStreams_ = std::move(other.closedStreams_); 126 isAppIdle_ = other.isAppIdle_; 127 maxLocalBidirectionalStreamIdIncreased_ = 128 other.maxLocalBidirectionalStreamIdIncreased_; 129 maxLocalUnidirectionalStreamIdIncreased_ = 130 other.maxLocalUnidirectionalStreamIdIncreased_; 131 132 /** 133 * We can't simply std::move the streams as the underlying 134 * QuicStreamState(s) hold a reference to the other.conn_. 135 */ 136 for (auto& pair : other.streams_) { 137 streams_.emplace( 138 std::piecewise_construct, 139 std::forward_as_tuple(pair.first), 140 std::forward_as_tuple( 141 /* migrate state to new conn ref */ conn_, 142 std::move(pair.second))); 143 } 144 } 145 /* 146 * Create the state for a stream if it does not exist and return it. Note this 147 * function is only used internally or for testing. 148 */ 149 folly::Expected<QuicStreamState*, LocalErrorCode> createStream( 150 StreamId streamId); 151 152 /* 153 * Create and return the state for the next available bidirectional stream. 154 */ 155 folly::Expected<QuicStreamState*, LocalErrorCode> 156 createNextBidirectionalStream(); 157 158 /* 159 * Create and return the state for the next available unidirectional stream. 160 */ 161 folly::Expected<QuicStreamState*, LocalErrorCode> 162 createNextUnidirectionalStream(); 163 164 /* 165 * Return the stream state or create it if the state has not yet been created. 166 * Note that this is only valid for streams that are currently open. 167 */ 168 QuicStreamState* FOLLY_NULLABLE getStream(StreamId streamId); 169 170 /* 171 * Remove all the state for a stream that is being closed. 172 */ 173 void removeClosedStream(StreamId streamId); 174 175 /* 176 * Update the current readable streams for the given stream state. This will 177 * either add or remove it from the collection of currently readable streams. 178 */ 179 void updateReadableStreams(QuicStreamState& stream); 180 181 /* 182 * Update the current peehable streams for the given stream state. This will 183 * either add or remove it from the collection of currently peekable streams. 184 */ 185 void updatePeekableStreams(QuicStreamState& stream); 186 187 /* 188 * Update the current writable streams for the given stream state. This will 189 * either add or remove it from the collection of currently writable streams. 190 */ 191 void updateWritableStreams(QuicStreamState& stream); 192 193 /* 194 * Find a open and active (we have created state for it) stream and return its 195 * state. 196 */ 197 QuicStreamState* FOLLY_NULLABLE findStream(StreamId streamId); 198 199 /* 200 * Check whether the stream exists. This returns false for the crypto stream, 201 * thus the caller must check separately for the crypto stream. 202 */ 203 bool streamExists(StreamId streamId); 204 openableLocalBidirectionalStreams()205 uint64_t openableLocalBidirectionalStreams() { 206 CHECK_GE( 207 maxLocalBidirectionalStreamId_, 208 nextAcceptableLocalBidirectionalStreamId_); 209 return (maxLocalBidirectionalStreamId_ - 210 nextAcceptableLocalBidirectionalStreamId_) / 211 detail::kStreamIncrement; 212 } 213 openableLocalUnidirectionalStreams()214 uint64_t openableLocalUnidirectionalStreams() { 215 CHECK_GE( 216 maxLocalUnidirectionalStreamId_, 217 nextAcceptableLocalUnidirectionalStreamId_); 218 return (maxLocalUnidirectionalStreamId_ - 219 nextAcceptableLocalUnidirectionalStreamId_) / 220 detail::kStreamIncrement; 221 } 222 openableRemoteBidirectionalStreams()223 uint64_t openableRemoteBidirectionalStreams() { 224 CHECK_GE( 225 maxRemoteBidirectionalStreamId_, 226 nextAcceptablePeerBidirectionalStreamId_); 227 return (maxRemoteBidirectionalStreamId_ - 228 nextAcceptablePeerBidirectionalStreamId_) / 229 detail::kStreamIncrement; 230 } 231 openableRemoteUnidirectionalStreams()232 uint64_t openableRemoteUnidirectionalStreams() { 233 CHECK_GE( 234 maxRemoteUnidirectionalStreamId_, 235 nextAcceptablePeerUnidirectionalStreamId_); 236 return (maxRemoteUnidirectionalStreamId_ - 237 nextAcceptablePeerUnidirectionalStreamId_) / 238 detail::kStreamIncrement; 239 } 240 241 /* 242 * Clear the new peer streams, presumably after all have been processed. 243 */ clearNewPeerStreams()244 void clearNewPeerStreams() { 245 newPeerStreams_.clear(); 246 } 247 248 /* 249 * Clear all the currently open streams. 250 */ clearOpenStreams()251 void clearOpenStreams() { 252 openBidirectionalLocalStreams_.clear(); 253 openUnidirectionalLocalStreams_.clear(); 254 openBidirectionalPeerStreams_.clear(); 255 openUnidirectionalPeerStreams_.clear(); 256 streams_.clear(); 257 } 258 259 /* 260 * Return a const reference to the underlying container holding the stream 261 * state. Only really useful for iterating. 262 */ streams()263 const auto& streams() const { 264 return streams_; 265 } 266 267 /* 268 * Call the given function on every currently open stream's state. 269 */ streamStateForEach(const std::function<void (QuicStreamState &)> & f)270 void streamStateForEach(const std::function<void(QuicStreamState&)>& f) { 271 for (auto& s : streams_) { 272 f(s.second); 273 } 274 } 275 hasLoss()276 FOLLY_NODISCARD bool hasLoss() const { 277 return !lossStreams_.empty(); 278 } 279 removeLoss(StreamId id)280 void removeLoss(StreamId id) { 281 lossStreams_.erase(id); 282 } 283 addLoss(StreamId id)284 void addLoss(StreamId id) { 285 lossStreams_.insert(id); 286 } 287 updateLossStreams(const QuicStreamState & stream)288 void updateLossStreams(const QuicStreamState& stream) { 289 if (stream.lossBuffer.empty()) { 290 removeLoss(stream.id); 291 } else { 292 addLoss(stream.id); 293 } 294 } 295 296 /** 297 * Update stream priority if the stream indicated by id exists, and the 298 * passed in values are different from current priority. Return true if 299 * stream priority is update, false otherwise. 300 */ 301 bool setStreamPriority(StreamId id, PriorityLevel level, bool incremental); 302 303 // TODO figure out a better interface here. 304 /* 305 * Returns a mutable reference to the container holding the writable stream 306 * IDs. 307 */ writableStreams()308 auto& writableStreams() { 309 return writableStreams_; 310 } 311 writableDSRStreams()312 auto& writableDSRStreams() { 313 return writableDSRStreams_; 314 } 315 316 // TODO figure out a better interface here. 317 /* 318 * Returns a mutable reference to the container holding the writable stream 319 * IDs. 320 */ writableControlStreams()321 auto& writableControlStreams() { 322 return writableControlStreams_; 323 } 324 325 /* 326 * Returns if there are any writable streams. 327 */ hasWritable()328 bool hasWritable() const { 329 return !writableStreams_.empty() || !writableDSRStreams_.empty() || 330 !writableControlStreams_.empty(); 331 } 332 hasDSRWritable()333 FOLLY_NODISCARD bool hasDSRWritable() const { 334 return !writableDSRStreams_.empty(); 335 } 336 hasNonDSRWritable()337 bool hasNonDSRWritable() const { 338 return !writableStreams_.empty() || !writableControlStreams_.empty(); 339 } 340 341 /* 342 * Add a writable stream id. 343 */ addWritable(const QuicStreamState & stream)344 void addWritable(const QuicStreamState& stream) { 345 if (stream.isControl) { 346 writableControlStreams_.insert(stream.id); 347 } else { 348 CHECK(stream.hasWritableData() || !stream.lossBuffer.empty()); 349 writableStreams_.insertOrUpdate(stream.id, stream.priority); 350 } 351 } 352 addDSRWritable(const QuicStreamState & stream)353 void addDSRWritable(const QuicStreamState& stream) { 354 CHECK(!stream.isControl); 355 CHECK(stream.hasWritableBufMeta() || !stream.lossBufMetas.empty()); 356 writableDSRStreams_.insertOrUpdate(stream.id, stream.priority); 357 } 358 359 /* 360 * Remove a writable stream id. 361 */ removeWritable(const QuicStreamState & stream)362 void removeWritable(const QuicStreamState& stream) { 363 if (stream.isControl) { 364 writableControlStreams_.erase(stream.id); 365 } else { 366 writableStreams_.erase(stream.id); 367 } 368 } 369 removeDSRWritable(const QuicStreamState & stream)370 void removeDSRWritable(const QuicStreamState& stream) { 371 CHECK(!stream.isControl); 372 writableDSRStreams_.erase(stream.id); 373 } 374 375 /* 376 * Clear the writable streams. 377 */ clearWritable()378 void clearWritable() { 379 writableStreams_.clear(); 380 writableDSRStreams_.clear(); 381 writableControlStreams_.clear(); 382 } 383 384 /* 385 * Returns a const reference to the underlying blocked streams container. 386 */ blockedStreams()387 const auto& blockedStreams() const { 388 return blockedStreams_; 389 } 390 391 /* 392 * Queue a blocked event for the given stream id at the given offset. 393 */ queueBlocked(StreamId streamId,uint64_t offset)394 void queueBlocked(StreamId streamId, uint64_t offset) { 395 blockedStreams_.emplace(streamId, StreamDataBlockedFrame(streamId, offset)); 396 } 397 398 /* 399 * Remove a blocked stream. 400 */ removeBlocked(StreamId streamId)401 void removeBlocked(StreamId streamId) { 402 blockedStreams_.erase(streamId); 403 } 404 405 /* 406 * Returns if there are any blocked streams. 407 */ hasBlocked()408 bool hasBlocked() const { 409 return !blockedStreams_.empty(); 410 } 411 412 /* 413 * Set the max number of local bidirectional streams. Can only be increased 414 * unless force is true. 415 */ 416 void setMaxLocalBidirectionalStreams(uint64_t maxStreams, bool force = false); 417 418 /* 419 * Set the max number of local unidirectional streams. Can only be increased 420 * unless force is true. 421 */ 422 void setMaxLocalUnidirectionalStreams( 423 uint64_t maxStreams, 424 bool force = false); 425 426 /* 427 * Set the max number of remote bidirectional streams. Can only be increased 428 * unless force is true. 429 */ 430 void setMaxRemoteBidirectionalStreams(uint64_t maxStreams); 431 432 /* 433 * Set the max number of remote unidirectional streams. Can only be increased 434 * unless force is true. 435 */ 436 void setMaxRemoteUnidirectionalStreams(uint64_t maxStreams); 437 438 /* 439 * Returns true if MaxLocalBidirectionalStreamId was increased 440 * since last call of this function (resets flag). 441 */ 442 bool consumeMaxLocalBidirectionalStreamIdIncreased(); 443 444 /* 445 * Returns true if MaxLocalUnidirectionalStreamId was increased 446 * since last call of this function (resets flag). 447 */ 448 bool consumeMaxLocalUnidirectionalStreamIdIncreased(); 449 450 void refreshTransportSettings(const TransportSettings& settings); 451 452 /* 453 * Sets the "window-by" fraction for sending stream limit updates. E.g. 454 * setting the fraction to two when the initial stream limit was 100 will 455 * cause the stream manager to update the relevant stream limit update when 456 * 50 streams have been closed. 457 */ setStreamLimitWindowingFraction(uint64_t fraction)458 void setStreamLimitWindowingFraction(uint64_t fraction) { 459 if (fraction > 0) { 460 streamLimitWindowingFraction_ = fraction; 461 } 462 } 463 464 /* 465 * The next value that should be sent in a bidirectional max streams frame, 466 * if any. This is potentially updated every time a bidirectional stream is 467 * closed. Calling this function "consumes" the update. 468 */ remoteBidirectionalStreamLimitUpdate()469 folly::Optional<uint64_t> remoteBidirectionalStreamLimitUpdate() { 470 auto ret = remoteBidirectionalStreamLimitUpdate_; 471 remoteBidirectionalStreamLimitUpdate_ = folly::none; 472 return ret; 473 } 474 475 /* 476 * The next value that should be sent in a unidirectional max streams frame, 477 * if any. This is potentially updated every time a unidirectional stream is 478 * closed. Calling this function "consumes" the update. 479 */ remoteUnidirectionalStreamLimitUpdate()480 folly::Optional<uint64_t> remoteUnidirectionalStreamLimitUpdate() { 481 auto ret = remoteUnidirectionalStreamLimitUpdate_; 482 remoteUnidirectionalStreamLimitUpdate_ = folly::none; 483 return ret; 484 } 485 486 /* 487 * Returns a const reference to the underlying stream window updates 488 * container. 489 */ windowUpdates()490 const auto& windowUpdates() const { 491 return windowUpdates_; 492 } 493 494 /* 495 * Returns whether a given stream id has a pending window update. 496 */ pendingWindowUpdate(StreamId streamId)497 bool pendingWindowUpdate(StreamId streamId) { 498 return windowUpdates_.count(streamId) > 0; 499 } 500 501 /* 502 * Queue a pending window update for the given stream id. 503 */ queueWindowUpdate(StreamId streamId)504 void queueWindowUpdate(StreamId streamId) { 505 windowUpdates_.emplace(streamId); 506 } 507 508 /* 509 * Clear the window updates. 510 */ removeWindowUpdate(StreamId streamId)511 void removeWindowUpdate(StreamId streamId) { 512 windowUpdates_.erase(streamId); 513 } 514 515 /* 516 * Returns whether any stream has a pending window update. 517 */ hasWindowUpdates()518 bool hasWindowUpdates() const { 519 return !windowUpdates_.empty(); 520 } 521 522 // TODO figure out a better interface here. 523 /* 524 * Return a mutable reference to the underlying closed streams container. 525 */ closedStreams()526 auto& closedStreams() { 527 return closedStreams_; 528 } 529 530 /* 531 * Add a closed stream. 532 */ addClosed(StreamId streamId)533 void addClosed(StreamId streamId) { 534 closedStreams_.insert(streamId); 535 } 536 537 /* 538 * Returns a const reference to the underlying deliverable streams container. 539 */ deliverableStreams()540 const auto& deliverableStreams() const { 541 return deliverableStreams_; 542 } 543 544 /* 545 * Add a deliverable stream. 546 */ addDeliverable(StreamId streamId)547 void addDeliverable(StreamId streamId) { 548 deliverableStreams_.insert(streamId); 549 } 550 551 /* 552 * Remove a deliverable stream. 553 */ removeDeliverable(StreamId streamId)554 void removeDeliverable(StreamId streamId) { 555 deliverableStreams_.erase(streamId); 556 } 557 558 /* 559 * Pop a deliverable stream id and return it. 560 */ popDeliverable()561 folly::Optional<StreamId> popDeliverable() { 562 auto itr = deliverableStreams_.begin(); 563 if (itr == deliverableStreams_.end()) { 564 return folly::none; 565 } 566 StreamId ret = *itr; 567 deliverableStreams_.erase(itr); 568 return ret; 569 } 570 571 /* 572 * Returns if there are any deliverable streams. 573 */ hasDeliverable()574 bool hasDeliverable() const { 575 return !deliverableStreams_.empty(); 576 } 577 578 /* 579 * Returns if the stream is in the deliverable container. 580 */ deliverableContains(StreamId streamId)581 bool deliverableContains(StreamId streamId) const { 582 return deliverableStreams_.count(streamId) > 0; 583 } 584 585 /* 586 * Returns a const reference to the underlying TX streams container. 587 */ txStreams()588 FOLLY_NODISCARD const auto& txStreams() const { 589 return txStreams_; 590 } 591 592 /* 593 * Add a stream to list of streams that have transmitted. 594 */ addTx(StreamId streamId)595 void addTx(StreamId streamId) { 596 txStreams_.insert(streamId); 597 } 598 599 /* 600 * Remove a TX stream. 601 */ removeTx(StreamId streamId)602 void removeTx(StreamId streamId) { 603 txStreams_.erase(streamId); 604 } 605 606 /* 607 * Pop a TX stream id and return it. 608 */ popTx()609 folly::Optional<StreamId> popTx() { 610 auto itr = txStreams_.begin(); 611 if (itr == txStreams_.end()) { 612 return folly::none; 613 } else { 614 StreamId ret = *itr; 615 txStreams_.erase(itr); 616 return ret; 617 } 618 } 619 620 /* 621 * Returns if there are any TX streams. 622 */ hasTx()623 FOLLY_NODISCARD bool hasTx() const { 624 return !txStreams_.empty(); 625 } 626 627 /* 628 * Returns if the stream is in the TX container. 629 */ txContains(StreamId streamId)630 FOLLY_NODISCARD bool txContains(StreamId streamId) const { 631 return txStreams_.count(streamId) > 0; 632 } 633 634 // TODO figure out a better interface here. 635 /* 636 * Returns a mutable reference to the underlying readable streams container. 637 */ readableStreams()638 auto& readableStreams() { 639 return readableStreams_; 640 } 641 642 // TODO figure out a better interface here. 643 /* 644 * Returns a mutable reference to the underlying peekable streams container. 645 */ peekableStreams()646 auto& peekableStreams() { 647 return peekableStreams_; 648 } 649 650 /* 651 * Returns a mutable reference to the underlying container of streams which 652 * had their flow control updated. 653 */ flowControlUpdated()654 const auto& flowControlUpdated() { 655 return flowControlUpdated_; 656 } 657 658 /* 659 * Consume the flow control updated streams using the parameter vector. 660 */ consumeFlowControlUpdated(std::vector<StreamId> && storage)661 auto consumeFlowControlUpdated(std::vector<StreamId>&& storage) { 662 std::vector<StreamId> result = storage; 663 result.clear(); 664 result.reserve(flowControlUpdated_.size()); 665 result.insert( 666 result.end(), flowControlUpdated_.begin(), flowControlUpdated_.end()); 667 flowControlUpdated_.clear(); 668 return result; 669 } 670 671 /* 672 * Queue a stream which has had its flow control updated. 673 */ queueFlowControlUpdated(StreamId streamId)674 void queueFlowControlUpdated(StreamId streamId) { 675 flowControlUpdated_.emplace(streamId); 676 } 677 678 /* 679 * Pop and return a stream which has had its flow control updated. 680 */ popFlowControlUpdated()681 folly::Optional<StreamId> popFlowControlUpdated() { 682 auto itr = flowControlUpdated_.begin(); 683 if (itr == flowControlUpdated_.end()) { 684 return folly::none; 685 } else { 686 StreamId ret = *itr; 687 flowControlUpdated_.erase(itr); 688 return ret; 689 } 690 } 691 692 /* 693 * Remove the specified stream from the flow control updated container. 694 */ removeFlowControlUpdated(StreamId streamId)695 void removeFlowControlUpdated(StreamId streamId) { 696 flowControlUpdated_.erase(streamId); 697 } 698 699 /* 700 * Returns if the the given stream is in the flow control updated container. 701 */ flowControlUpdatedContains(StreamId streamId)702 bool flowControlUpdatedContains(StreamId streamId) { 703 return flowControlUpdated_.count(streamId) > 0; 704 } 705 706 /* 707 * Clear the flow control updated container. 708 */ clearFlowControlUpdated()709 void clearFlowControlUpdated() { 710 flowControlUpdated_.clear(); 711 } 712 713 // TODO figure out a better interface here. 714 /* 715 * Returns a mutable reference to the underlying open bidirectional peer 716 * streams container. 717 */ openBidirectionalPeerStreams()718 auto& openBidirectionalPeerStreams() { 719 return openBidirectionalPeerStreams_; 720 } 721 722 // TODO figure out a better interface here. 723 /* 724 * Returns a mutable reference to the underlying open peer unidirectional 725 * streams container. 726 */ openUnidirectionalPeerStreams()727 auto& openUnidirectionalPeerStreams() { 728 return openUnidirectionalPeerStreams_; 729 } 730 731 // TODO figure out a better interface here. 732 /* 733 * Returns a mutable reference to the underlying open local unidirectional 734 * streams container. 735 */ openUnidirectionalLocalStreams()736 auto& openUnidirectionalLocalStreams() { 737 return openUnidirectionalLocalStreams_; 738 } 739 740 // TODO figure out a better interface here. 741 /* 742 * Returns a mutable reference to the underlying open local unidirectional 743 * streams container. 744 */ openBidirectionalLocalStreams()745 auto& openBidirectionalLocalStreams() { 746 return openBidirectionalLocalStreams_; 747 } 748 749 // TODO figure out a better interface here. 750 /* 751 * Returns a mutable reference to the underlying new peer streams container. 752 */ newPeerStreams()753 auto& newPeerStreams() { 754 return newPeerStreams_; 755 } 756 757 /* 758 * Consume the new peer streams using the parameter vector. 759 */ consumeNewPeerStreams(std::vector<StreamId> && storage)760 auto consumeNewPeerStreams(std::vector<StreamId>&& storage) { 761 std::vector<StreamId> result = storage; 762 result.clear(); 763 result.reserve(newPeerStreams_.size()); 764 result.insert(result.end(), newPeerStreams_.begin(), newPeerStreams_.end()); 765 newPeerStreams_.clear(); 766 return result; 767 } 768 769 /* 770 * Returns the number of streams open and active (for which we have created 771 * the stream state). 772 */ streamCount()773 size_t streamCount() { 774 return streams_.size(); 775 } 776 777 /* 778 * Returns a const reference to the container of streams with pending 779 * StopSending events. 780 */ stopSendingStreams()781 const auto& stopSendingStreams() const { 782 return stopSendingStreams_; 783 } 784 785 /* 786 * Consume the stop sending streams. 787 */ consumeStopSending()788 auto consumeStopSending() { 789 std::vector<std::pair<StreamId, ApplicationErrorCode>> result; 790 result.reserve(stopSendingStreams_.size()); 791 result.insert( 792 result.end(), stopSendingStreams_.begin(), stopSendingStreams_.end()); 793 return result; 794 } 795 796 /* 797 * Clear the StopSending streams. 798 */ clearStopSending()799 void clearStopSending() { 800 stopSendingStreams_.clear(); 801 } 802 803 /* 804 * Add a stream to the StopSending streams. 805 */ addStopSending(StreamId streamId,ApplicationErrorCode error)806 void addStopSending(StreamId streamId, ApplicationErrorCode error) { 807 stopSendingStreams_.emplace(streamId, error); 808 } 809 810 /* 811 * Returns if the stream manager has any non-control streams. 812 */ hasNonCtrlStreams()813 bool hasNonCtrlStreams() { 814 return streams_.size() != numControlStreams_; 815 } 816 817 /* 818 * Returns number of control streams. 819 */ numControlStreams()820 auto numControlStreams() { 821 return numControlStreams_; 822 } 823 824 /* 825 * Sets the given stream to be tracked as a control stream. 826 */ 827 void setStreamAsControl(QuicStreamState& stream); 828 829 /* 830 * Clear the tracking of streams which can trigger API callbacks. 831 */ clearActionable()832 void clearActionable() { 833 deliverableStreams_.clear(); 834 txStreams_.clear(); 835 readableStreams_.clear(); 836 peekableStreams_.clear(); 837 flowControlUpdated_.clear(); 838 } 839 840 bool isAppIdle() const; 841 842 /* 843 * Sets an observer that will be notified whenever the set of stream 844 * priorities changes 845 */ 846 void setPriorityChangesObserver(QuicStreamPrioritiesObserver* observer); 847 848 /* 849 * Stops notifications for changes to the set of stream priorities 850 */ 851 void resetPriorityChangesObserver(); 852 853 /* 854 * Returns the highest priority level used by any stream 855 * (Highest priority is lowest value) 856 */ 857 [[nodiscard]] PriorityLevel getHighestPriorityLevel() const; 858 859 private: 860 // Updates the congestion controller app-idle state, after a change in the 861 // number of streams. 862 // App-idle state is set to true if there was at least one non-control 863 // before the update and there are none after. It is set to false if instead 864 // there were no non-control streams before and there is at least one at the 865 // time of calling 866 void updateAppIdleState(); 867 868 QuicStreamState* FOLLY_NULLABLE 869 getOrCreateOpenedLocalStream(StreamId streamId); 870 871 QuicStreamState* FOLLY_NULLABLE getOrCreatePeerStream(StreamId streamId); 872 873 void setMaxRemoteBidirectionalStreamsInternal( 874 uint64_t maxStreams, 875 bool force); 876 void setMaxRemoteUnidirectionalStreamsInternal( 877 uint64_t maxStreams, 878 bool force); 879 880 void addToStreamPriorityMap(const QuicStreamState& streamState); 881 void notifyStreamPriorityChanges(); 882 883 QuicConnectionStateBase& conn_; 884 QuicNodeType nodeType_; 885 886 // Next acceptable bidirectional stream id that can be opened by the peer. 887 // Used to keep track of closed streams. 888 StreamId nextAcceptablePeerBidirectionalStreamId_{0}; 889 890 // Next acceptable unidirectional stream id that can be opened by the peer. 891 // Used to keep track of closed streams. 892 StreamId nextAcceptablePeerUnidirectionalStreamId_{0}; 893 894 // Next acceptable bidirectional stream id that can be opened locally. 895 // Used to keep track of closed streams. 896 StreamId nextAcceptableLocalBidirectionalStreamId_{0}; 897 898 // Next acceptable bidirectional stream id that can be opened locally. 899 // Used to keep track of closed streams. 900 StreamId nextAcceptableLocalUnidirectionalStreamId_{0}; 901 902 // Next bidirectional stream id to use when creating a stream. 903 StreamId nextBidirectionalStreamId_{0}; 904 905 // Next unidirectional stream id to use when creating a stream. 906 StreamId nextUnidirectionalStreamId_{0}; 907 908 StreamId maxLocalBidirectionalStreamId_{0}; 909 910 StreamId maxLocalUnidirectionalStreamId_{0}; 911 912 StreamId maxRemoteBidirectionalStreamId_{0}; 913 914 StreamId maxRemoteUnidirectionalStreamId_{0}; 915 916 StreamId initialLocalBidirectionalStreamId_{0}; 917 918 StreamId initialLocalUnidirectionalStreamId_{0}; 919 920 StreamId initialRemoteBidirectionalStreamId_{0}; 921 922 StreamId initialRemoteUnidirectionalStreamId_{0}; 923 924 // The fraction to determine the window by which we will signal the need to 925 // send stream limit updates 926 uint64_t streamLimitWindowingFraction_{2}; 927 928 // Contains the value of a stream window update that should be sent for 929 // remote bidirectional streams. 930 folly::Optional<uint64_t> remoteBidirectionalStreamLimitUpdate_; 931 932 // Contains the value of a stream window update that should be sent for 933 // remote bidirectional streams. 934 folly::Optional<uint64_t> remoteUnidirectionalStreamLimitUpdate_; 935 936 uint64_t numControlStreams_{0}; 937 938 // Bidirectional streams that are opened by the peer on the connection. 939 folly::F14FastSet<StreamId> openBidirectionalPeerStreams_; 940 941 // Unidirectional streams that are opened by the peer on the connection. 942 folly::F14FastSet<StreamId> openUnidirectionalPeerStreams_; 943 944 // Bidirectional streams that are opened locally on the connection. 945 folly::F14FastSet<StreamId> openBidirectionalLocalStreams_; 946 947 // Unidirectional streams that are opened locally on the connection. 948 folly::F14FastSet<StreamId> openUnidirectionalLocalStreams_; 949 950 // A map of streams that are active. 951 folly::F14FastMap<StreamId, QuicStreamState> streams_; 952 953 // Recently opened peer streams. 954 std::vector<StreamId> newPeerStreams_; 955 956 // Map of streams that were blocked 957 folly::F14FastMap<StreamId, StreamDataBlockedFrame> blockedStreams_; 958 959 // Map of streams where the peer was asked to stop sending 960 folly::F14FastMap<StreamId, ApplicationErrorCode> stopSendingStreams_; 961 962 // Map of stream priority levels 963 folly::F14FastMap<StreamId, PriorityLevel> streamPriorityLevels_; 964 965 // Streams that had their stream window change and potentially need a window 966 // update sent 967 folly::F14FastSet<StreamId> windowUpdates_; 968 969 // Streams that had their flow control updated 970 folly::F14FastSet<StreamId> flowControlUpdated_; 971 972 // Streams that have bytes in loss buffer 973 folly::F14FastSet<StreamId> lossStreams_; 974 975 // Set of streams that have pending reads 976 folly::F14FastSet<StreamId> readableStreams_; 977 978 // Set of streams that have pending peeks 979 folly::F14FastSet<StreamId> peekableStreams_; 980 981 // Set of !control streams that have writable data 982 PriorityQueue writableStreams_; 983 PriorityQueue writableDSRStreams_; 984 985 // Set of control streams that have writable data 986 std::set<StreamId> writableControlStreams_; 987 988 // Streams that may be able to call TxCallback 989 folly::F14FastSet<StreamId> txStreams_; 990 991 // Streams that may be able to callback DeliveryCallback 992 folly::F14FastSet<StreamId> deliverableStreams_; 993 994 // Streams that are closed but we still have state for 995 folly::F14FastSet<StreamId> closedStreams_; 996 997 // Observer to notify on changes in the streamPriorityLevels_ map 998 QuicStreamPrioritiesObserver* priorityChangesObserver_{nullptr}; 999 1000 // Record whether or not we are app-idle. 1001 bool isAppIdle_{false}; 1002 1003 const TransportSettings* FOLLY_NONNULL transportSettings_; 1004 1005 bool maxLocalBidirectionalStreamIdIncreased_{false}; 1006 bool maxLocalUnidirectionalStreamIdIncreased_{false}; 1007 }; 1008 1009 } // namespace quic 1010