1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  *
7  */
8 
9 #pragma once
10 
11 #include <folly/container/F14Map.h>
12 #include <folly/container/F14Set.h>
13 #include <quic/QuicConstants.h>
14 #include <quic/codec/Types.h>
15 #include <quic/state/QuicStreamPrioritiesObserver.h>
16 #include <quic/state/StreamData.h>
17 #include <quic/state/TransportSettings.h>
18 #include <numeric>
19 #include <set>
20 
21 namespace quic {
namespace detail {

// Divisor applied to a stream id range to turn it into a number of streams
// (see the openable*Streams() accessors of QuicStreamManager).
constexpr uint8_t kStreamIncrement = 0x04;
} // namespace detail
26 
27 class QuicStreamManager {
28  public:
QuicStreamManager(QuicConnectionStateBase & conn,QuicNodeType nodeType,const TransportSettings & transportSettings)29   explicit QuicStreamManager(
30       QuicConnectionStateBase& conn,
31       QuicNodeType nodeType,
32       const TransportSettings& transportSettings)
33       : conn_(conn),
34         nodeType_(nodeType),
35         transportSettings_(&transportSettings) {
36     if (nodeType == QuicNodeType::Server) {
37       nextAcceptablePeerBidirectionalStreamId_ = 0x00;
38       nextAcceptablePeerUnidirectionalStreamId_ = 0x02;
39       nextAcceptableLocalBidirectionalStreamId_ = 0x01;
40       nextAcceptableLocalUnidirectionalStreamId_ = 0x03;
41       nextBidirectionalStreamId_ = 0x01;
42       nextUnidirectionalStreamId_ = 0x03;
43       initialLocalBidirectionalStreamId_ = 0x01;
44       initialLocalUnidirectionalStreamId_ = 0x03;
45       initialRemoteBidirectionalStreamId_ = 0x00;
46       initialRemoteUnidirectionalStreamId_ = 0x02;
47     } else {
48       nextAcceptablePeerBidirectionalStreamId_ = 0x01;
49       nextAcceptablePeerUnidirectionalStreamId_ = 0x03;
50       nextAcceptableLocalBidirectionalStreamId_ = 0x00;
51       nextAcceptableLocalUnidirectionalStreamId_ = 0x02;
52       nextBidirectionalStreamId_ = 0x00;
53       nextUnidirectionalStreamId_ = 0x02;
54       initialLocalBidirectionalStreamId_ = 0x00;
55       initialLocalUnidirectionalStreamId_ = 0x02;
56       initialRemoteBidirectionalStreamId_ = 0x01;
57       initialRemoteUnidirectionalStreamId_ = 0x03;
58     }
59     refreshTransportSettings(transportSettings);
60   }
61 
62   /**
63    * Constructor to facilitate migration of a QuicStreamManager to another
64    * QuicConnectionStateBase
65    */
QuicStreamManager(QuicConnectionStateBase & conn,QuicNodeType nodeType,const TransportSettings & transportSettings,QuicStreamManager && other)66   explicit QuicStreamManager(
67       QuicConnectionStateBase& conn,
68       QuicNodeType nodeType,
69       const TransportSettings& transportSettings,
70       QuicStreamManager&& other)
71       : conn_(conn),
72         nodeType_(nodeType),
73         transportSettings_(&transportSettings) {
74     nextAcceptablePeerBidirectionalStreamId_ =
75         other.nextAcceptablePeerBidirectionalStreamId_;
76     nextAcceptablePeerUnidirectionalStreamId_ =
77         other.nextAcceptablePeerUnidirectionalStreamId_;
78     nextAcceptableLocalBidirectionalStreamId_ =
79         other.nextAcceptableLocalBidirectionalStreamId_;
80     nextAcceptableLocalUnidirectionalStreamId_ =
81         other.nextAcceptableLocalUnidirectionalStreamId_;
82     nextBidirectionalStreamId_ = other.nextBidirectionalStreamId_;
83     nextUnidirectionalStreamId_ = other.nextUnidirectionalStreamId_;
84     maxLocalBidirectionalStreamId_ = other.maxLocalBidirectionalStreamId_;
85     maxLocalUnidirectionalStreamId_ = other.maxLocalUnidirectionalStreamId_;
86     maxRemoteBidirectionalStreamId_ = other.maxRemoteBidirectionalStreamId_;
87     maxRemoteUnidirectionalStreamId_ = other.maxRemoteUnidirectionalStreamId_;
88     initialLocalBidirectionalStreamId_ =
89         other.initialLocalBidirectionalStreamId_;
90     initialLocalUnidirectionalStreamId_ =
91         other.initialLocalUnidirectionalStreamId_;
92     initialRemoteBidirectionalStreamId_ =
93         other.initialRemoteBidirectionalStreamId_;
94     initialRemoteUnidirectionalStreamId_ =
95         other.initialRemoteUnidirectionalStreamId_;
96 
97     streamLimitWindowingFraction_ = other.streamLimitWindowingFraction_;
98     remoteBidirectionalStreamLimitUpdate_ =
99         other.remoteBidirectionalStreamLimitUpdate_;
100     remoteUnidirectionalStreamLimitUpdate_ =
101         other.remoteUnidirectionalStreamLimitUpdate_;
102     numControlStreams_ = other.numControlStreams_;
103     openBidirectionalPeerStreams_ =
104         std::move(other.openBidirectionalPeerStreams_);
105     openUnidirectionalPeerStreams_ =
106         std::move(other.openUnidirectionalPeerStreams_);
107     openBidirectionalLocalStreams_ =
108         std::move(other.openBidirectionalLocalStreams_);
109     openUnidirectionalLocalStreams_ =
110         std::move(other.openUnidirectionalLocalStreams_);
111     newPeerStreams_ = std::move(other.newPeerStreams_);
112     blockedStreams_ = std::move(other.blockedStreams_);
113     stopSendingStreams_ = std::move(other.stopSendingStreams_);
114     streamPriorityLevels_ = std::move(other.streamPriorityLevels_);
115     windowUpdates_ = std::move(other.windowUpdates_);
116     flowControlUpdated_ = std::move(other.flowControlUpdated_);
117     lossStreams_ = std::move(other.lossStreams_);
118     readableStreams_ = std::move(other.readableStreams_);
119     peekableStreams_ = std::move(other.peekableStreams_);
120     writableStreams_ = std::move(other.writableStreams_);
121     writableDSRStreams_ = std::move(other.writableDSRStreams_);
122     writableControlStreams_ = std::move(other.writableControlStreams_);
123     txStreams_ = std::move(other.txStreams_);
124     deliverableStreams_ = std::move(other.deliverableStreams_);
125     closedStreams_ = std::move(other.closedStreams_);
126     isAppIdle_ = other.isAppIdle_;
127     maxLocalBidirectionalStreamIdIncreased_ =
128         other.maxLocalBidirectionalStreamIdIncreased_;
129     maxLocalUnidirectionalStreamIdIncreased_ =
130         other.maxLocalUnidirectionalStreamIdIncreased_;
131 
132     /**
133      * We can't simply std::move the streams as the underlying
134      * QuicStreamState(s) hold a reference to the other.conn_.
135      */
136     for (auto& pair : other.streams_) {
137       streams_.emplace(
138           std::piecewise_construct,
139           std::forward_as_tuple(pair.first),
140           std::forward_as_tuple(
141               /* migrate state to new conn ref */ conn_,
142               std::move(pair.second)));
143     }
144   }
145   /*
146    * Create the state for a stream if it does not exist and return it. Note this
147    * function is only used internally or for testing.
148    */
149   folly::Expected<QuicStreamState*, LocalErrorCode> createStream(
150       StreamId streamId);
151 
152   /*
153    * Create and return the state for the next available bidirectional stream.
154    */
155   folly::Expected<QuicStreamState*, LocalErrorCode>
156   createNextBidirectionalStream();
157 
158   /*
159    * Create and return the state for the next available unidirectional stream.
160    */
161   folly::Expected<QuicStreamState*, LocalErrorCode>
162   createNextUnidirectionalStream();
163 
164   /*
165    * Return the stream state or create it if the state has not yet been created.
166    * Note that this is only valid for streams that are currently open.
167    */
168   QuicStreamState* FOLLY_NULLABLE getStream(StreamId streamId);
169 
170   /*
171    * Remove all the state for a stream that is being closed.
172    */
173   void removeClosedStream(StreamId streamId);
174 
175   /*
176    * Update the current readable streams for the given stream state. This will
177    * either add or remove it from the collection of currently readable streams.
178    */
179   void updateReadableStreams(QuicStreamState& stream);
180 
/*
 * Update the current peekable streams for the given stream state. This will
 * either add or remove it from the collection of currently peekable streams.
 */
185   void updatePeekableStreams(QuicStreamState& stream);
186 
187   /*
188    * Update the current writable streams for the given stream state. This will
189    * either add or remove it from the collection of currently writable streams.
190    */
191   void updateWritableStreams(QuicStreamState& stream);
192 
/*
 * Find an open and active (we have created state for it) stream and return
 * its state.
 */
197   QuicStreamState* FOLLY_NULLABLE findStream(StreamId streamId);
198 
199   /*
200    * Check whether the stream exists. This returns false for the crypto stream,
201    * thus the caller must check separately for the crypto stream.
202    */
203   bool streamExists(StreamId streamId);
204 
openableLocalBidirectionalStreams()205   uint64_t openableLocalBidirectionalStreams() {
206     CHECK_GE(
207         maxLocalBidirectionalStreamId_,
208         nextAcceptableLocalBidirectionalStreamId_);
209     return (maxLocalBidirectionalStreamId_ -
210             nextAcceptableLocalBidirectionalStreamId_) /
211         detail::kStreamIncrement;
212   }
213 
openableLocalUnidirectionalStreams()214   uint64_t openableLocalUnidirectionalStreams() {
215     CHECK_GE(
216         maxLocalUnidirectionalStreamId_,
217         nextAcceptableLocalUnidirectionalStreamId_);
218     return (maxLocalUnidirectionalStreamId_ -
219             nextAcceptableLocalUnidirectionalStreamId_) /
220         detail::kStreamIncrement;
221   }
222 
openableRemoteBidirectionalStreams()223   uint64_t openableRemoteBidirectionalStreams() {
224     CHECK_GE(
225         maxRemoteBidirectionalStreamId_,
226         nextAcceptablePeerBidirectionalStreamId_);
227     return (maxRemoteBidirectionalStreamId_ -
228             nextAcceptablePeerBidirectionalStreamId_) /
229         detail::kStreamIncrement;
230   }
231 
openableRemoteUnidirectionalStreams()232   uint64_t openableRemoteUnidirectionalStreams() {
233     CHECK_GE(
234         maxRemoteUnidirectionalStreamId_,
235         nextAcceptablePeerUnidirectionalStreamId_);
236     return (maxRemoteUnidirectionalStreamId_ -
237             nextAcceptablePeerUnidirectionalStreamId_) /
238         detail::kStreamIncrement;
239   }
240 
241   /*
242    * Clear the new peer streams, presumably after all have been processed.
243    */
clearNewPeerStreams()244   void clearNewPeerStreams() {
245     newPeerStreams_.clear();
246   }
247 
248   /*
249    * Clear all the currently open streams.
250    */
clearOpenStreams()251   void clearOpenStreams() {
252     openBidirectionalLocalStreams_.clear();
253     openUnidirectionalLocalStreams_.clear();
254     openBidirectionalPeerStreams_.clear();
255     openUnidirectionalPeerStreams_.clear();
256     streams_.clear();
257   }
258 
259   /*
260    * Return a const reference to the underlying container holding the stream
261    * state. Only really useful for iterating.
262    */
streams()263   const auto& streams() const {
264     return streams_;
265   }
266 
267   /*
268    * Call the given function on every currently open stream's state.
269    */
streamStateForEach(const std::function<void (QuicStreamState &)> & f)270   void streamStateForEach(const std::function<void(QuicStreamState&)>& f) {
271     for (auto& s : streams_) {
272       f(s.second);
273     }
274   }
275 
hasLoss()276   FOLLY_NODISCARD bool hasLoss() const {
277     return !lossStreams_.empty();
278   }
279 
removeLoss(StreamId id)280   void removeLoss(StreamId id) {
281     lossStreams_.erase(id);
282   }
283 
addLoss(StreamId id)284   void addLoss(StreamId id) {
285     lossStreams_.insert(id);
286   }
287 
updateLossStreams(const QuicStreamState & stream)288   void updateLossStreams(const QuicStreamState& stream) {
289     if (stream.lossBuffer.empty()) {
290       removeLoss(stream.id);
291     } else {
292       addLoss(stream.id);
293     }
294   }
295 
/**
 * Update stream priority if the stream indicated by id exists, and the
 * passed in values are different from the current priority. Returns true if
 * the stream priority was updated, false otherwise.
 */
301   bool setStreamPriority(StreamId id, PriorityLevel level, bool incremental);
302 
303   // TODO figure out a better interface here.
304   /*
305    * Returns a mutable reference to the container holding the writable stream
306    * IDs.
307    */
writableStreams()308   auto& writableStreams() {
309     return writableStreams_;
310   }
311 
writableDSRStreams()312   auto& writableDSRStreams() {
313     return writableDSRStreams_;
314   }
315 
316   // TODO figure out a better interface here.
317   /*
318    * Returns a mutable reference to the container holding the writable stream
319    * IDs.
320    */
writableControlStreams()321   auto& writableControlStreams() {
322     return writableControlStreams_;
323   }
324 
325   /*
326    * Returns if there are any writable streams.
327    */
hasWritable()328   bool hasWritable() const {
329     return !writableStreams_.empty() || !writableDSRStreams_.empty() ||
330         !writableControlStreams_.empty();
331   }
332 
hasDSRWritable()333   FOLLY_NODISCARD bool hasDSRWritable() const {
334     return !writableDSRStreams_.empty();
335   }
336 
hasNonDSRWritable()337   bool hasNonDSRWritable() const {
338     return !writableStreams_.empty() || !writableControlStreams_.empty();
339   }
340 
341   /*
342    * Add a writable stream id.
343    */
addWritable(const QuicStreamState & stream)344   void addWritable(const QuicStreamState& stream) {
345     if (stream.isControl) {
346       writableControlStreams_.insert(stream.id);
347     } else {
348       CHECK(stream.hasWritableData() || !stream.lossBuffer.empty());
349       writableStreams_.insertOrUpdate(stream.id, stream.priority);
350     }
351   }
352 
addDSRWritable(const QuicStreamState & stream)353   void addDSRWritable(const QuicStreamState& stream) {
354     CHECK(!stream.isControl);
355     CHECK(stream.hasWritableBufMeta() || !stream.lossBufMetas.empty());
356     writableDSRStreams_.insertOrUpdate(stream.id, stream.priority);
357   }
358 
359   /*
360    * Remove a writable stream id.
361    */
removeWritable(const QuicStreamState & stream)362   void removeWritable(const QuicStreamState& stream) {
363     if (stream.isControl) {
364       writableControlStreams_.erase(stream.id);
365     } else {
366       writableStreams_.erase(stream.id);
367     }
368   }
369 
removeDSRWritable(const QuicStreamState & stream)370   void removeDSRWritable(const QuicStreamState& stream) {
371     CHECK(!stream.isControl);
372     writableDSRStreams_.erase(stream.id);
373   }
374 
375   /*
376    * Clear the writable streams.
377    */
clearWritable()378   void clearWritable() {
379     writableStreams_.clear();
380     writableDSRStreams_.clear();
381     writableControlStreams_.clear();
382   }
383 
384   /*
385    * Returns a const reference to the underlying blocked streams container.
386    */
blockedStreams()387   const auto& blockedStreams() const {
388     return blockedStreams_;
389   }
390 
391   /*
392    * Queue a blocked event for the given stream id at the given offset.
393    */
queueBlocked(StreamId streamId,uint64_t offset)394   void queueBlocked(StreamId streamId, uint64_t offset) {
395     blockedStreams_.emplace(streamId, StreamDataBlockedFrame(streamId, offset));
396   }
397 
398   /*
399    * Remove a blocked stream.
400    */
removeBlocked(StreamId streamId)401   void removeBlocked(StreamId streamId) {
402     blockedStreams_.erase(streamId);
403   }
404 
405   /*
406    * Returns if there are any blocked streams.
407    */
hasBlocked()408   bool hasBlocked() const {
409     return !blockedStreams_.empty();
410   }
411 
412   /*
413    * Set the max number of local bidirectional streams. Can only be increased
414    * unless force is true.
415    */
416   void setMaxLocalBidirectionalStreams(uint64_t maxStreams, bool force = false);
417 
418   /*
419    * Set the max number of local unidirectional streams. Can only be increased
420    * unless force is true.
421    */
422   void setMaxLocalUnidirectionalStreams(
423       uint64_t maxStreams,
424       bool force = false);
425 
/*
 * Set the max number of remote bidirectional streams. This overload takes no
 * force flag.
 */
430   void setMaxRemoteBidirectionalStreams(uint64_t maxStreams);
431 
/*
 * Set the max number of remote unidirectional streams. This overload takes
 * no force flag.
 */
436   void setMaxRemoteUnidirectionalStreams(uint64_t maxStreams);
437 
438   /*
439    * Returns true if MaxLocalBidirectionalStreamId was increased
440    * since last call of this function (resets flag).
441    */
442   bool consumeMaxLocalBidirectionalStreamIdIncreased();
443 
444   /*
445    * Returns true if MaxLocalUnidirectionalStreamId was increased
446    * since last call of this function (resets flag).
447    */
448   bool consumeMaxLocalUnidirectionalStreamIdIncreased();
449 
450   void refreshTransportSettings(const TransportSettings& settings);
451 
452   /*
453    * Sets the "window-by" fraction for sending stream limit updates. E.g.
454    * setting the fraction to two when the initial stream limit was 100 will
455    * cause the stream manager to update the relevant stream limit update when
456    * 50 streams have been closed.
457    */
setStreamLimitWindowingFraction(uint64_t fraction)458   void setStreamLimitWindowingFraction(uint64_t fraction) {
459     if (fraction > 0) {
460       streamLimitWindowingFraction_ = fraction;
461     }
462   }
463 
464   /*
465    * The next value that should be sent in a bidirectional max streams frame,
466    * if any. This is potentially updated every time a bidirectional stream is
467    * closed. Calling this function "consumes" the update.
468    */
remoteBidirectionalStreamLimitUpdate()469   folly::Optional<uint64_t> remoteBidirectionalStreamLimitUpdate() {
470     auto ret = remoteBidirectionalStreamLimitUpdate_;
471     remoteBidirectionalStreamLimitUpdate_ = folly::none;
472     return ret;
473   }
474 
475   /*
476    * The next value that should be sent in a unidirectional max streams frame,
477    * if any. This is potentially updated every time a unidirectional stream is
478    * closed. Calling this function "consumes" the update.
479    */
remoteUnidirectionalStreamLimitUpdate()480   folly::Optional<uint64_t> remoteUnidirectionalStreamLimitUpdate() {
481     auto ret = remoteUnidirectionalStreamLimitUpdate_;
482     remoteUnidirectionalStreamLimitUpdate_ = folly::none;
483     return ret;
484   }
485 
486   /*
487    * Returns a const reference to the underlying stream window updates
488    * container.
489    */
windowUpdates()490   const auto& windowUpdates() const {
491     return windowUpdates_;
492   }
493 
494   /*
495    * Returns whether a given stream id has a pending window update.
496    */
pendingWindowUpdate(StreamId streamId)497   bool pendingWindowUpdate(StreamId streamId) {
498     return windowUpdates_.count(streamId) > 0;
499   }
500 
501   /*
502    * Queue a pending window update for the given stream id.
503    */
queueWindowUpdate(StreamId streamId)504   void queueWindowUpdate(StreamId streamId) {
505     windowUpdates_.emplace(streamId);
506   }
507 
508   /*
509    * Clear the window updates.
510    */
removeWindowUpdate(StreamId streamId)511   void removeWindowUpdate(StreamId streamId) {
512     windowUpdates_.erase(streamId);
513   }
514 
515   /*
516    * Returns whether any stream has a pending window update.
517    */
hasWindowUpdates()518   bool hasWindowUpdates() const {
519     return !windowUpdates_.empty();
520   }
521 
522   // TODO figure out a better interface here.
523   /*
524    * Return a mutable reference to the underlying closed streams container.
525    */
closedStreams()526   auto& closedStreams() {
527     return closedStreams_;
528   }
529 
530   /*
531    * Add a closed stream.
532    */
addClosed(StreamId streamId)533   void addClosed(StreamId streamId) {
534     closedStreams_.insert(streamId);
535   }
536 
537   /*
538    * Returns a const reference to the underlying deliverable streams container.
539    */
deliverableStreams()540   const auto& deliverableStreams() const {
541     return deliverableStreams_;
542   }
543 
544   /*
545    * Add a deliverable stream.
546    */
addDeliverable(StreamId streamId)547   void addDeliverable(StreamId streamId) {
548     deliverableStreams_.insert(streamId);
549   }
550 
551   /*
552    * Remove a deliverable stream.
553    */
removeDeliverable(StreamId streamId)554   void removeDeliverable(StreamId streamId) {
555     deliverableStreams_.erase(streamId);
556   }
557 
558   /*
559    * Pop a deliverable stream id and return it.
560    */
popDeliverable()561   folly::Optional<StreamId> popDeliverable() {
562     auto itr = deliverableStreams_.begin();
563     if (itr == deliverableStreams_.end()) {
564       return folly::none;
565     }
566     StreamId ret = *itr;
567     deliverableStreams_.erase(itr);
568     return ret;
569   }
570 
571   /*
572    * Returns if there are any deliverable streams.
573    */
hasDeliverable()574   bool hasDeliverable() const {
575     return !deliverableStreams_.empty();
576   }
577 
578   /*
579    * Returns if the stream is in the deliverable container.
580    */
deliverableContains(StreamId streamId)581   bool deliverableContains(StreamId streamId) const {
582     return deliverableStreams_.count(streamId) > 0;
583   }
584 
585   /*
586    * Returns a const reference to the underlying TX streams container.
587    */
txStreams()588   FOLLY_NODISCARD const auto& txStreams() const {
589     return txStreams_;
590   }
591 
592   /*
593    * Add a stream to list of streams that have transmitted.
594    */
addTx(StreamId streamId)595   void addTx(StreamId streamId) {
596     txStreams_.insert(streamId);
597   }
598 
599   /*
600    * Remove a TX stream.
601    */
removeTx(StreamId streamId)602   void removeTx(StreamId streamId) {
603     txStreams_.erase(streamId);
604   }
605 
606   /*
607    * Pop a TX stream id and return it.
608    */
popTx()609   folly::Optional<StreamId> popTx() {
610     auto itr = txStreams_.begin();
611     if (itr == txStreams_.end()) {
612       return folly::none;
613     } else {
614       StreamId ret = *itr;
615       txStreams_.erase(itr);
616       return ret;
617     }
618   }
619 
620   /*
621    * Returns if there are any TX streams.
622    */
hasTx()623   FOLLY_NODISCARD bool hasTx() const {
624     return !txStreams_.empty();
625   }
626 
627   /*
628    * Returns if the stream is in the TX container.
629    */
txContains(StreamId streamId)630   FOLLY_NODISCARD bool txContains(StreamId streamId) const {
631     return txStreams_.count(streamId) > 0;
632   }
633 
634   // TODO figure out a better interface here.
635   /*
636    * Returns a mutable reference to the underlying readable streams container.
637    */
readableStreams()638   auto& readableStreams() {
639     return readableStreams_;
640   }
641 
642   // TODO figure out a better interface here.
643   /*
644    * Returns a mutable reference to the underlying peekable streams container.
645    */
peekableStreams()646   auto& peekableStreams() {
647     return peekableStreams_;
648   }
649 
650   /*
651    * Returns a mutable reference to the underlying container of streams which
652    * had their flow control updated.
653    */
flowControlUpdated()654   const auto& flowControlUpdated() {
655     return flowControlUpdated_;
656   }
657 
658   /*
659    * Consume the flow control updated streams using the parameter vector.
660    */
consumeFlowControlUpdated(std::vector<StreamId> && storage)661   auto consumeFlowControlUpdated(std::vector<StreamId>&& storage) {
662     std::vector<StreamId> result = storage;
663     result.clear();
664     result.reserve(flowControlUpdated_.size());
665     result.insert(
666         result.end(), flowControlUpdated_.begin(), flowControlUpdated_.end());
667     flowControlUpdated_.clear();
668     return result;
669   }
670 
671   /*
672    * Queue a stream which has had its flow control updated.
673    */
queueFlowControlUpdated(StreamId streamId)674   void queueFlowControlUpdated(StreamId streamId) {
675     flowControlUpdated_.emplace(streamId);
676   }
677 
678   /*
679    * Pop and return a stream which has had its flow control updated.
680    */
popFlowControlUpdated()681   folly::Optional<StreamId> popFlowControlUpdated() {
682     auto itr = flowControlUpdated_.begin();
683     if (itr == flowControlUpdated_.end()) {
684       return folly::none;
685     } else {
686       StreamId ret = *itr;
687       flowControlUpdated_.erase(itr);
688       return ret;
689     }
690   }
691 
692   /*
693    * Remove the specified stream from the flow control updated container.
694    */
removeFlowControlUpdated(StreamId streamId)695   void removeFlowControlUpdated(StreamId streamId) {
696     flowControlUpdated_.erase(streamId);
697   }
698 
699   /*
700    * Returns if the the given stream is in the flow control updated container.
701    */
flowControlUpdatedContains(StreamId streamId)702   bool flowControlUpdatedContains(StreamId streamId) {
703     return flowControlUpdated_.count(streamId) > 0;
704   }
705 
706   /*
707    * Clear the flow control updated container.
708    */
clearFlowControlUpdated()709   void clearFlowControlUpdated() {
710     flowControlUpdated_.clear();
711   }
712 
713   // TODO figure out a better interface here.
714   /*
715    * Returns a mutable reference to the underlying open bidirectional peer
716    * streams container.
717    */
openBidirectionalPeerStreams()718   auto& openBidirectionalPeerStreams() {
719     return openBidirectionalPeerStreams_;
720   }
721 
722   // TODO figure out a better interface here.
723   /*
724    * Returns a mutable reference to the underlying open peer unidirectional
725    * streams container.
726    */
openUnidirectionalPeerStreams()727   auto& openUnidirectionalPeerStreams() {
728     return openUnidirectionalPeerStreams_;
729   }
730 
731   // TODO figure out a better interface here.
732   /*
733    * Returns a mutable reference to the underlying open local unidirectional
734    * streams container.
735    */
openUnidirectionalLocalStreams()736   auto& openUnidirectionalLocalStreams() {
737     return openUnidirectionalLocalStreams_;
738   }
739 
740   // TODO figure out a better interface here.
741   /*
742    * Returns a mutable reference to the underlying open local unidirectional
743    * streams container.
744    */
openBidirectionalLocalStreams()745   auto& openBidirectionalLocalStreams() {
746     return openBidirectionalLocalStreams_;
747   }
748 
749   // TODO figure out a better interface here.
750   /*
751    * Returns a mutable reference to the underlying new peer streams container.
752    */
newPeerStreams()753   auto& newPeerStreams() {
754     return newPeerStreams_;
755   }
756 
757   /*
758    * Consume the new peer streams using the parameter vector.
759    */
consumeNewPeerStreams(std::vector<StreamId> && storage)760   auto consumeNewPeerStreams(std::vector<StreamId>&& storage) {
761     std::vector<StreamId> result = storage;
762     result.clear();
763     result.reserve(newPeerStreams_.size());
764     result.insert(result.end(), newPeerStreams_.begin(), newPeerStreams_.end());
765     newPeerStreams_.clear();
766     return result;
767   }
768 
769   /*
770    * Returns the number of streams open and active (for which we have created
771    * the stream state).
772    */
streamCount()773   size_t streamCount() {
774     return streams_.size();
775   }
776 
777   /*
778    * Returns a const reference to the container of streams with pending
779    * StopSending events.
780    */
stopSendingStreams()781   const auto& stopSendingStreams() const {
782     return stopSendingStreams_;
783   }
784 
785   /*
786    * Consume the stop sending streams.
787    */
consumeStopSending()788   auto consumeStopSending() {
789     std::vector<std::pair<StreamId, ApplicationErrorCode>> result;
790     result.reserve(stopSendingStreams_.size());
791     result.insert(
792         result.end(), stopSendingStreams_.begin(), stopSendingStreams_.end());
793     return result;
794   }
795 
796   /*
797    * Clear the StopSending streams.
798    */
clearStopSending()799   void clearStopSending() {
800     stopSendingStreams_.clear();
801   }
802 
803   /*
804    * Add a stream to the StopSending streams.
805    */
addStopSending(StreamId streamId,ApplicationErrorCode error)806   void addStopSending(StreamId streamId, ApplicationErrorCode error) {
807     stopSendingStreams_.emplace(streamId, error);
808   }
809 
810   /*
811    * Returns if the stream manager has any non-control streams.
812    */
hasNonCtrlStreams()813   bool hasNonCtrlStreams() {
814     return streams_.size() != numControlStreams_;
815   }
816 
817   /*
818    * Returns number of control streams.
819    */
numControlStreams()820   auto numControlStreams() {
821     return numControlStreams_;
822   }
823 
824   /*
825    * Sets the given stream to be tracked as a control stream.
826    */
827   void setStreamAsControl(QuicStreamState& stream);
828 
829   /*
830    * Clear the tracking of streams which can trigger API callbacks.
831    */
clearActionable()832   void clearActionable() {
833     deliverableStreams_.clear();
834     txStreams_.clear();
835     readableStreams_.clear();
836     peekableStreams_.clear();
837     flowControlUpdated_.clear();
838   }
839 
840   bool isAppIdle() const;
841 
842   /*
843    * Sets an observer that will be notified whenever the set of stream
844    * priorities changes
845    */
846   void setPriorityChangesObserver(QuicStreamPrioritiesObserver* observer);
847 
848   /*
849    * Stops notifications for changes to the set of stream priorities
850    */
851   void resetPriorityChangesObserver();
852 
853   /*
854    * Returns the highest priority level used by any stream
855    * (Highest priority is lowest value)
856    */
857   [[nodiscard]] PriorityLevel getHighestPriorityLevel() const;
858 
859  private:
860   // Updates the congestion controller app-idle state, after a change in the
861   // number of streams.
862   // App-idle state is set to true if there was at least one non-control
863   // before the update and there are none after. It is set to false if instead
864   // there were no non-control streams before and there is at least one at the
865   // time of calling
866   void updateAppIdleState();
867 
868   QuicStreamState* FOLLY_NULLABLE
869   getOrCreateOpenedLocalStream(StreamId streamId);
870 
871   QuicStreamState* FOLLY_NULLABLE getOrCreatePeerStream(StreamId streamId);
872 
873   void setMaxRemoteBidirectionalStreamsInternal(
874       uint64_t maxStreams,
875       bool force);
876   void setMaxRemoteUnidirectionalStreamsInternal(
877       uint64_t maxStreams,
878       bool force);
879 
880   void addToStreamPriorityMap(const QuicStreamState& streamState);
881   void notifyStreamPriorityChanges();
882 
883   QuicConnectionStateBase& conn_;
884   QuicNodeType nodeType_;
885 
886   // Next acceptable bidirectional stream id that can be opened by the peer.
887   // Used to keep track of closed streams.
888   StreamId nextAcceptablePeerBidirectionalStreamId_{0};
889 
890   // Next acceptable unidirectional stream id that can be opened by the peer.
891   // Used to keep track of closed streams.
892   StreamId nextAcceptablePeerUnidirectionalStreamId_{0};
893 
894   // Next acceptable bidirectional stream id that can be opened locally.
895   // Used to keep track of closed streams.
896   StreamId nextAcceptableLocalBidirectionalStreamId_{0};
897 
// Next acceptable unidirectional stream id that can be opened locally.
// Used to keep track of closed streams.
900   StreamId nextAcceptableLocalUnidirectionalStreamId_{0};
901 
902   // Next bidirectional stream id to use when creating a stream.
903   StreamId nextBidirectionalStreamId_{0};
904 
905   // Next unidirectional stream id to use when creating a stream.
906   StreamId nextUnidirectionalStreamId_{0};
907 
  // Stream id limits, one per (initiator, directionality) combination.
  // NOTE(review): presumably these bound the stream ids that may be opened by
  // the respective side; whether the bound is inclusive or exclusive is
  // decided by the implementation -- confirm there.
  StreamId maxLocalBidirectionalStreamId_{0};

  StreamId maxLocalUnidirectionalStreamId_{0};

  StreamId maxRemoteBidirectionalStreamId_{0};

  StreamId maxRemoteUnidirectionalStreamId_{0};

  // Initial stream id of each (initiator, directionality) combination. The
  // constructor assigns these according to the node type; for a server they
  // are 0x01 (local bidirectional), 0x03 (local unidirectional), 0x00 (remote
  // bidirectional) and 0x02 (remote unidirectional).
  StreamId initialLocalBidirectionalStreamId_{0};

  StreamId initialLocalUnidirectionalStreamId_{0};

  StreamId initialRemoteBidirectionalStreamId_{0};

  StreamId initialRemoteUnidirectionalStreamId_{0};
923 
  // The fraction used to determine the window by which we will signal the
  // need to send stream limit updates. Defaults to 2.
  uint64_t streamLimitWindowingFraction_{2};

  // Contains the value of a stream window update that should be sent for
  // remote bidirectional streams. Empty when there is no such value.
  folly::Optional<uint64_t> remoteBidirectionalStreamLimitUpdate_;

  // Contains the value of a stream window update that should be sent for
  // remote unidirectional streams. Empty when there is no such value.
  folly::Optional<uint64_t> remoteUnidirectionalStreamLimitUpdate_;

  // Count of control streams.
  // NOTE(review): presumably used to tell control from non-control streams
  // when updating the app-idle state -- confirm against the implementation.
  uint64_t numControlStreams_{0};
937 
  // Ids of bidirectional streams that are opened by the peer on the
  // connection.
  folly::F14FastSet<StreamId> openBidirectionalPeerStreams_;

  // Ids of unidirectional streams that are opened by the peer on the
  // connection.
  folly::F14FastSet<StreamId> openUnidirectionalPeerStreams_;

  // Ids of bidirectional streams that are opened locally on the connection.
  folly::F14FastSet<StreamId> openBidirectionalLocalStreams_;

  // Ids of unidirectional streams that are opened locally on the connection.
  folly::F14FastSet<StreamId> openUnidirectionalLocalStreams_;

  // Map from stream id to the state of each stream that is active.
  folly::F14FastMap<StreamId, QuicStreamState> streams_;

  // Ids of recently opened peer streams.
  std::vector<StreamId> newPeerStreams_;

  // Streams that were blocked, keyed by stream id, together with the
  // StreamDataBlockedFrame stored for each.
  folly::F14FastMap<StreamId, StreamDataBlockedFrame> blockedStreams_;

  // Streams where the peer was asked to stop sending, keyed by stream id,
  // together with the ApplicationErrorCode stored for each.
  folly::F14FastMap<StreamId, ApplicationErrorCode> stopSendingStreams_;

  // Priority level of each stream, keyed by stream id.
  folly::F14FastMap<StreamId, PriorityLevel> streamPriorityLevels_;
964 
  // Streams that had their stream window change and potentially need a window
  // update sent.
  folly::F14FastSet<StreamId> windowUpdates_;

  // Streams that had their flow control updated.
  folly::F14FastSet<StreamId> flowControlUpdated_;

  // Streams that have bytes in the loss buffer.
  folly::F14FastSet<StreamId> lossStreams_;

  // Set of streams that have pending reads.
  folly::F14FastSet<StreamId> readableStreams_;

  // Set of streams that have pending peeks.
  folly::F14FastSet<StreamId> peekableStreams_;

  // Non-control streams that have writable data.
  // NOTE(review): writableDSRStreams_ has no comment of its own; presumably it
  // is the DSR counterpart of writableStreams_ -- confirm.
  PriorityQueue writableStreams_;
  PriorityQueue writableDSRStreams_;

  // Set of control streams that have writable data.
  std::set<StreamId> writableControlStreams_;

  // Streams that may be able to invoke a TxCallback.
  folly::F14FastSet<StreamId> txStreams_;

  // Streams that may be able to invoke a DeliveryCallback.
  folly::F14FastSet<StreamId> deliverableStreams_;

  // Streams that are closed but that we still have state for.
  folly::F14FastSet<StreamId> closedStreams_;
996 
  // Observer to notify on changes in the streamPriorityLevels_ map. Defaults
  // to nullptr, i.e. no observer.
  QuicStreamPrioritiesObserver* priorityChangesObserver_{nullptr};

  // Records whether or not we are app-idle.
  bool isAppIdle_{false};

  // Transport settings in use. The constructor stores the address of the
  // TransportSettings object it is given, so the object is not owned here.
  // Annotated FOLLY_NONNULL.
  const TransportSettings* FOLLY_NONNULL transportSettings_;

  // Flags for an increase of the corresponding max local stream id.
  // NOTE(review): presumably set when the limit is raised and cleared once the
  // increase has been reported -- confirm against the implementation.
  bool maxLocalBidirectionalStreamIdIncreased_{false};
  bool maxLocalUnidirectionalStreamIdIncreased_{false};
1007 };
1008 
1009 } // namespace quic
1010