1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  *
7  */
8 
9 #include <quic/api/QuicPacketScheduler.h>
10 #include <quic/flowcontrol/QuicFlowController.h>
11 
12 namespace {
13 using namespace quic;
14 
15 /**
16  * A helper iterator adaptor class that starts iteration of streams from a
17  * specific stream id.
18  */
19 class MiddleStartingIterationWrapper {
20  public:
21   using MapType = std::set<StreamId>;
22 
23   class MiddleStartingIterator
24       : public boost::iterator_facade<
25             MiddleStartingIterator,
26             const MiddleStartingIterationWrapper::MapType::value_type,
27             boost::forward_traversal_tag> {
28     friend class boost::iterator_core_access;
29 
30    public:
31     using MapType = MiddleStartingIterationWrapper::MapType;
32 
33     MiddleStartingIterator() = delete;
34 
MiddleStartingIterator(const MapType * streams,const MapType::key_type & start)35     MiddleStartingIterator(
36         const MapType* streams,
37         const MapType::key_type& start)
38         : streams_(streams) {
39       itr_ = streams_->lower_bound(start);
40       checkForWrapAround();
41       // We don't want to mark it as wrapped around initially, instead just
42       // act as if start was the first element.
43       wrappedAround_ = false;
44     }
45 
MiddleStartingIterator(const MapType * streams,MapType::const_iterator itr)46     MiddleStartingIterator(const MapType* streams, MapType::const_iterator itr)
47         : streams_(streams), itr_(itr) {
48       checkForWrapAround();
49       // We don't want to mark it as wrapped around initially, instead just
50       // act as if start was the first element.
51       wrappedAround_ = false;
52     }
53 
dereference() const54     FOLLY_NODISCARD const MapType::value_type& dereference() const {
55       return *itr_;
56     }
57 
rawIterator() const58     FOLLY_NODISCARD MapType::const_iterator rawIterator() const {
59       return itr_;
60     }
61 
equal(const MiddleStartingIterator & other) const62     FOLLY_NODISCARD bool equal(const MiddleStartingIterator& other) const {
63       return wrappedAround_ == other.wrappedAround_ && itr_ == other.itr_;
64     }
65 
increment()66     void increment() {
67       ++itr_;
68       checkForWrapAround();
69     }
70 
checkForWrapAround()71     void checkForWrapAround() {
72       if (itr_ == streams_->cend()) {
73         wrappedAround_ = true;
74         itr_ = streams_->cbegin();
75       }
76     }
77 
78    private:
79     friend class MiddleStartingIterationWrapper;
80     bool wrappedAround_{false};
81     const MapType* streams_{nullptr};
82     MapType::const_iterator itr_;
83   };
84 
MiddleStartingIterationWrapper(const MapType & streams,const MapType::key_type & start)85   MiddleStartingIterationWrapper(
86       const MapType& streams,
87       const MapType::key_type& start)
88       : streams_(streams), start_(&streams_, start) {}
89 
MiddleStartingIterationWrapper(const MapType & streams,const MapType::const_iterator & start)90   MiddleStartingIterationWrapper(
91       const MapType& streams,
92       const MapType::const_iterator& start)
93       : streams_(streams), start_(&streams_, start) {}
94 
cbegin() const95   FOLLY_NODISCARD MiddleStartingIterator cbegin() const {
96     return start_;
97   }
98 
cend() const99   FOLLY_NODISCARD MiddleStartingIterator cend() const {
100     MiddleStartingIterator itr(start_);
101     itr.wrappedAround_ = true;
102     return itr;
103   }
104 
105  private:
106   const MapType& streams_;
107   const MiddleStartingIterator start_;
108 };
109 
110 } // namespace
111 
112 namespace quic {
113 
hasAcksToSchedule(const AckState & ackState)114 bool hasAcksToSchedule(const AckState& ackState) {
115   folly::Optional<PacketNum> largestAckSend = largestAckToSend(ackState);
116   if (!largestAckSend) {
117     return false;
118   }
119   if (!ackState.largestAckScheduled) {
120     // Never scheduled an ack, we need to send
121     return true;
122   }
123   return *largestAckSend > *(ackState.largestAckScheduled);
124 }
125 
largestAckToSend(const AckState & ackState)126 folly::Optional<PacketNum> largestAckToSend(const AckState& ackState) {
127   if (ackState.acks.empty()) {
128     return folly::none;
129   }
130   return ackState.acks.back().end;
131 }
132 
133 // Schedulers
134 
Builder(QuicConnectionStateBase & conn,EncryptionLevel encryptionLevel,PacketNumberSpace packetNumberSpace,folly::StringPiece name)135 FrameScheduler::Builder::Builder(
136     QuicConnectionStateBase& conn,
137     EncryptionLevel encryptionLevel,
138     PacketNumberSpace packetNumberSpace,
139     folly::StringPiece name)
140     : conn_(conn),
141       encryptionLevel_(encryptionLevel),
142       packetNumberSpace_(packetNumberSpace),
143       name_(std::move(name)) {}
144 
streamFrames()145 FrameScheduler::Builder& FrameScheduler::Builder::streamFrames() {
146   streamFrameScheduler_ = true;
147   return *this;
148 }
149 
ackFrames()150 FrameScheduler::Builder& FrameScheduler::Builder::ackFrames() {
151   ackScheduler_ = true;
152   return *this;
153 }
154 
resetFrames()155 FrameScheduler::Builder& FrameScheduler::Builder::resetFrames() {
156   rstScheduler_ = true;
157   return *this;
158 }
159 
windowUpdateFrames()160 FrameScheduler::Builder& FrameScheduler::Builder::windowUpdateFrames() {
161   windowUpdateScheduler_ = true;
162   return *this;
163 }
164 
blockedFrames()165 FrameScheduler::Builder& FrameScheduler::Builder::blockedFrames() {
166   blockedScheduler_ = true;
167   return *this;
168 }
169 
cryptoFrames()170 FrameScheduler::Builder& FrameScheduler::Builder::cryptoFrames() {
171   cryptoStreamScheduler_ = true;
172   return *this;
173 }
174 
simpleFrames()175 FrameScheduler::Builder& FrameScheduler::Builder::simpleFrames() {
176   simpleFrameScheduler_ = true;
177   return *this;
178 }
179 
pingFrames()180 FrameScheduler::Builder& FrameScheduler::Builder::pingFrames() {
181   pingFrameScheduler_ = true;
182   return *this;
183 }
184 
datagramFrames()185 FrameScheduler::Builder& FrameScheduler::Builder::datagramFrames() {
186   datagramFrameScheduler_ = true;
187   return *this;
188 }
189 
build()190 FrameScheduler FrameScheduler::Builder::build() && {
191   FrameScheduler scheduler(std::move(name_));
192   if (streamFrameScheduler_) {
193     scheduler.streamFrameScheduler_.emplace(StreamFrameScheduler(conn_));
194   }
195   if (ackScheduler_) {
196     scheduler.ackScheduler_.emplace(
197         AckScheduler(conn_, getAckState(conn_, packetNumberSpace_)));
198   }
199   if (rstScheduler_) {
200     scheduler.rstScheduler_.emplace(RstStreamScheduler(conn_));
201   }
202   if (windowUpdateScheduler_) {
203     scheduler.windowUpdateScheduler_.emplace(WindowUpdateScheduler(conn_));
204   }
205   if (blockedScheduler_) {
206     scheduler.blockedScheduler_.emplace(BlockedScheduler(conn_));
207   }
208   if (cryptoStreamScheduler_) {
209     scheduler.cryptoStreamScheduler_.emplace(CryptoStreamScheduler(
210         conn_, *getCryptoStream(*conn_.cryptoState, encryptionLevel_)));
211   }
212   if (simpleFrameScheduler_) {
213     scheduler.simpleFrameScheduler_.emplace(SimpleFrameScheduler(conn_));
214   }
215   if (pingFrameScheduler_) {
216     scheduler.pingFrameScheduler_.emplace(PingFrameScheduler(conn_));
217   }
218   if (datagramFrameScheduler_) {
219     scheduler.datagramFrameScheduler_.emplace(DatagramFrameScheduler(conn_));
220   }
221   return scheduler;
222 }
223 
FrameScheduler(folly::StringPiece name)224 FrameScheduler::FrameScheduler(folly::StringPiece name) : name_(name) {}
225 
scheduleFramesForPacket(PacketBuilderInterface && builder,uint32_t writableBytes)226 SchedulingResult FrameScheduler::scheduleFramesForPacket(
227     PacketBuilderInterface&& builder,
228     uint32_t writableBytes) {
229   builder.encodePacketHeader();
230   // We need to keep track of writable bytes after writing header.
231   writableBytes = writableBytes > builder.getHeaderBytes()
232       ? writableBytes - builder.getHeaderBytes()
233       : 0;
234   // We cannot return early if the writablyBytes drops to 0 here, since pure
235   // acks can skip writableBytes entirely.
236   PacketBuilderWrapper wrapper(builder, writableBytes);
237   bool cryptoDataWritten = false;
238   bool rstWritten = false;
239   if (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) {
240     cryptoDataWritten = cryptoStreamScheduler_->writeCryptoData(wrapper);
241   }
242   if (rstScheduler_ && rstScheduler_->hasPendingRsts()) {
243     rstWritten = rstScheduler_->writeRsts(wrapper);
244   }
245   // Long time ago we decided RST has higher priority than Acks.
246   if (ackScheduler_ && ackScheduler_->hasPendingAcks()) {
247     if (cryptoDataWritten || rstWritten) {
248       // If packet has non ack data, it is subject to congestion control. We
249       // need to use the wrapper/
250       ackScheduler_->writeNextAcks(wrapper);
251     } else {
252       // If we start with writing acks, we will let the ack scheduler write
253       // up to the full packet space. If the ack bytes exceeds the writable
254       // bytes, this will be a pure ack packet and it will skip congestion
255       // controller. Otherwise, we will give other schedulers an opportunity to
256       // write up to writable bytes.
257       ackScheduler_->writeNextAcks(builder);
258     }
259   }
260   if (windowUpdateScheduler_ &&
261       windowUpdateScheduler_->hasPendingWindowUpdates()) {
262     windowUpdateScheduler_->writeWindowUpdates(wrapper);
263   }
264   if (blockedScheduler_ && blockedScheduler_->hasPendingBlockedFrames()) {
265     blockedScheduler_->writeBlockedFrames(wrapper);
266   }
267   // Simple frames should be scheduled before stream frames and retx frames
268   // because those frames might fill up all available bytes for writing.
269   // If we are trying to send a PathChallenge frame it may be blocked by those,
270   // causing a connection to proceed slowly because of path validation rate
271   // limiting.
272   if (simpleFrameScheduler_ &&
273       simpleFrameScheduler_->hasPendingSimpleFrames()) {
274     simpleFrameScheduler_->writeSimpleFrames(wrapper);
275   }
276   if (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) {
277     pingFrameScheduler_->writePing(wrapper);
278   }
279   if (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) {
280     streamFrameScheduler_->writeStreams(wrapper);
281   }
282   if (datagramFrameScheduler_ &&
283       datagramFrameScheduler_->hasPendingDatagramFrames()) {
284     datagramFrameScheduler_->writeDatagramFrames(wrapper);
285   }
286 
287   if (builder.hasFramesPending()) {
288     const LongHeader* longHeader = builder.getPacketHeader().asLong();
289     bool initialPacket =
290         longHeader && longHeader->getHeaderType() == LongHeader::Types::Initial;
291     if (initialPacket) {
292       // This is the initial packet, we need to fill er up.
293       while (wrapper.remainingSpaceInPkt() > 0) {
294         writeFrame(PaddingFrame(), builder);
295       }
296     }
297   }
298 
299   return SchedulingResult(folly::none, std::move(builder).buildPacket());
300 }
301 
hasData() const302 bool FrameScheduler::hasData() const {
303   return (ackScheduler_ && ackScheduler_->hasPendingAcks()) ||
304       hasImmediateData();
305 }
306 
hasImmediateData() const307 bool FrameScheduler::hasImmediateData() const {
308   return (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) ||
309       (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) ||
310       (rstScheduler_ && rstScheduler_->hasPendingRsts()) ||
311       (windowUpdateScheduler_ &&
312        windowUpdateScheduler_->hasPendingWindowUpdates()) ||
313       (blockedScheduler_ && blockedScheduler_->hasPendingBlockedFrames()) ||
314       (simpleFrameScheduler_ &&
315        simpleFrameScheduler_->hasPendingSimpleFrames()) ||
316       (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) ||
317       (datagramFrameScheduler_ &&
318        datagramFrameScheduler_->hasPendingDatagramFrames());
319 }
320 
name() const321 folly::StringPiece FrameScheduler::name() const {
322   return name_;
323 }
324 
writeStreamLossBuffers(PacketBuilderInterface & builder,QuicStreamState & stream)325 bool StreamFrameScheduler::writeStreamLossBuffers(
326     PacketBuilderInterface& builder,
327     QuicStreamState& stream) {
328   bool wroteStreamFrame = false;
329   for (auto buffer = stream.lossBuffer.cbegin();
330        buffer != stream.lossBuffer.cend();
331        ++buffer) {
332     auto bufferLen = buffer->data.chainLength();
333     auto dataLen = writeStreamFrameHeader(
334         builder,
335         stream.id,
336         buffer->offset,
337         bufferLen, // writeBufferLen -- only the len of the single buffer.
338         bufferLen, // flowControlLen -- not relevant, already flow controlled.
339         buffer->eof,
340         folly::none /* skipLenHint */);
341     if (dataLen) {
342       wroteStreamFrame = true;
343       writeStreamFrameData(builder, buffer->data, *dataLen);
344       VLOG(4) << "Wrote loss data for stream=" << stream.id
345               << " offset=" << buffer->offset << " bytes=" << *dataLen
346               << " fin=" << (buffer->eof && *dataLen == bufferLen) << " "
347               << conn_;
348     } else {
349       // Either we filled the packet or ran out of data for this stream (EOF?)
350       break;
351     }
352   }
353   return wroteStreamFrame;
354 }
355 
StreamFrameScheduler(QuicConnectionStateBase & conn)356 StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
357     : conn_(conn) {}
358 
writeSingleStream(PacketBuilderInterface & builder,QuicStreamState & stream,uint64_t & connWritableBytes)359 bool StreamFrameScheduler::writeSingleStream(
360     PacketBuilderInterface& builder,
361     QuicStreamState& stream,
362     uint64_t& connWritableBytes) {
363   if (!stream.lossBuffer.empty()) {
364     if (!writeStreamLossBuffers(builder, stream)) {
365       return false;
366     }
367   }
368   if (stream.hasWritableData() && connWritableBytes > 0) {
369     if (!writeStreamFrame(builder, stream, connWritableBytes)) {
370       return false;
371     }
372   }
373   return true;
374 }
375 
writeStreamsHelper(PacketBuilderInterface & builder,const std::set<StreamId> & writableStreams,StreamId nextScheduledStream,uint64_t & connWritableBytes,bool streamPerPacket)376 StreamId StreamFrameScheduler::writeStreamsHelper(
377     PacketBuilderInterface& builder,
378     const std::set<StreamId>& writableStreams,
379     StreamId nextScheduledStream,
380     uint64_t& connWritableBytes,
381     bool streamPerPacket) {
382   MiddleStartingIterationWrapper wrapper(writableStreams, nextScheduledStream);
383   auto writableStreamItr = wrapper.cbegin();
384   // This will write the stream frames in a round robin fashion ordered by
385   // stream id. The iterator will wrap around the collection at the end, and we
386   // keep track of the value at the next iteration. This allows us to start
387   // writing at the next stream when building the next packet.
388   while (writableStreamItr != wrapper.cend()) {
389     auto stream = conn_.streamManager->findStream(*writableStreamItr);
390     CHECK(stream);
391     if (!writeSingleStream(builder, *stream, connWritableBytes)) {
392       break;
393     }
394     writableStreamItr++;
395     if (streamPerPacket) {
396       break;
397     }
398   }
399   return *writableStreamItr;
400 }
401 
writeStreamsHelper(PacketBuilderInterface & builder,PriorityQueue & writableStreams,uint64_t & connWritableBytes,bool streamPerPacket)402 void StreamFrameScheduler::writeStreamsHelper(
403     PacketBuilderInterface& builder,
404     PriorityQueue& writableStreams,
405     uint64_t& connWritableBytes,
406     bool streamPerPacket) {
407   // Fill a packet with non-control stream data, in priority order
408   for (size_t index = 0; index < writableStreams.levels.size() &&
409        builder.remainingSpaceInPkt() > 0;
410        index++) {
411     PriorityQueue::Level& level = writableStreams.levels[index];
412     if (level.streams.empty()) {
413       // No data here, keep going
414       continue;
415     }
416     if (level.incremental) {
417       // Round robin the streams at this level
418       MiddleStartingIterationWrapper wrapper(level.streams, level.next);
419       auto writableStreamItr = wrapper.cbegin();
420       while (writableStreamItr != wrapper.cend()) {
421         auto stream = conn_.streamManager->findStream(*writableStreamItr);
422         CHECK(stream);
423         if (!writeSingleStream(builder, *stream, connWritableBytes)) {
424           break;
425         }
426         writableStreamItr++;
427         if (streamPerPacket) {
428           level.next = writableStreamItr.rawIterator();
429           return;
430         }
431       }
432       level.next = writableStreamItr.rawIterator();
433     } else {
434       // walk the sequential streams in order until we run out of space
435       for (auto streamIt = level.streams.begin();
436            streamIt != level.streams.end() && connWritableBytes > 0;
437            ++streamIt) {
438         auto stream = conn_.streamManager->findStream(*streamIt);
439         CHECK(stream);
440         if (!writeSingleStream(builder, *stream, connWritableBytes)) {
441           break;
442         }
443         if (streamPerPacket) {
444           return;
445         }
446       }
447     }
448   }
449 }
450 
writeStreams(PacketBuilderInterface & builder)451 void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
452   DCHECK(conn_.streamManager->hasWritable());
453   uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
454   // Write the control streams first as a naive binary priority mechanism.
455   const auto& writableControlStreams =
456       conn_.streamManager->writableControlStreams();
457   if (!writableControlStreams.empty()) {
458     conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper(
459         builder,
460         writableControlStreams,
461         conn_.schedulingState.nextScheduledControlStream,
462         connWritableBytes,
463         conn_.transportSettings.streamFramePerPacket);
464   }
465   auto& writableStreams = conn_.streamManager->writableStreams();
466   if (!writableStreams.empty()) {
467     writeStreamsHelper(
468         builder,
469         writableStreams,
470         connWritableBytes,
471         conn_.transportSettings.streamFramePerPacket);
472   }
473 } // namespace quic
474 
hasPendingData() const475 bool StreamFrameScheduler::hasPendingData() const {
476   return conn_.streamManager->hasNonDSRWritable() &&
477       getSendConnFlowControlBytesWire(conn_) > 0;
478 }
479 
writeStreamFrame(PacketBuilderInterface & builder,QuicStreamState & stream,uint64_t & connWritableBytes)480 bool StreamFrameScheduler::writeStreamFrame(
481     PacketBuilderInterface& builder,
482     QuicStreamState& stream,
483     uint64_t& connWritableBytes) {
484   if (builder.remainingSpaceInPkt() == 0) {
485     return false;
486   }
487 
488   // hasWritableData is the condition which has to be satisfied for the
489   // stream to be in writableList
490   CHECK(stream.hasWritableData());
491 
492   uint64_t flowControlLen =
493       std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes);
494   uint64_t bufferLen = stream.writeBuffer.chainLength();
495   // We can't write FIN directly from here if writeBufMeta has pending bytes to
496   // send.
497   bool canWriteFin = stream.finalWriteOffset.has_value() &&
498       bufferLen <= flowControlLen && stream.writeBufMeta.length == 0;
499   auto writeOffset = stream.currentWriteOffset;
500   if (canWriteFin && stream.writeBuffer.empty()) {
501     // If we are writing FIN only from here, do not use currentWriteOffset in
502     // case some bufMeta has been sent before.
503     writeOffset = stream.finalWriteOffset.value();
504   }
505   auto dataLen = writeStreamFrameHeader(
506       builder,
507       stream.id,
508       writeOffset,
509       bufferLen,
510       flowControlLen,
511       canWriteFin,
512       folly::none /* skipLenHint */);
513   if (!dataLen) {
514     return false;
515   }
516   writeStreamFrameData(builder, stream.writeBuffer, *dataLen);
517   VLOG(4) << "Wrote stream frame stream=" << stream.id
518           << " offset=" << stream.currentWriteOffset
519           << " bytesWritten=" << *dataLen
520           << " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " "
521           << conn_;
522   connWritableBytes -= dataLen.value();
523   return true;
524 }
525 
AckScheduler(const QuicConnectionStateBase & conn,const AckState & ackState)526 AckScheduler::AckScheduler(
527     const QuicConnectionStateBase& conn,
528     const AckState& ackState)
529     : conn_(conn), ackState_(ackState) {}
530 
writeNextAcks(PacketBuilderInterface & builder)531 folly::Optional<PacketNum> AckScheduler::writeNextAcks(
532     PacketBuilderInterface& builder) {
533   // Use default ack delay for long headers. Usually long headers are sent
534   // before crypto negotiation, so the peer might not know about the ack delay
535   // exponent yet, so we use the default.
536   uint8_t ackDelayExponentToUse =
537       builder.getPacketHeader().getHeaderForm() == HeaderForm::Long
538       ? kDefaultAckDelayExponent
539       : conn_.transportSettings.ackDelayExponent;
540   auto largestAckedPacketNum = *largestAckToSend(ackState_);
541   auto ackingTime = Clock::now();
542   DCHECK(ackState_.largestRecvdPacketTime.hasValue())
543       << "Missing received time for the largest acked packet";
544   // assuming that we're going to ack the largest received with hightest pri
545   auto receivedTime = *ackState_.largestRecvdPacketTime;
546   std::chrono::microseconds ackDelay =
547       (ackingTime > receivedTime
548            ? std::chrono::duration_cast<std::chrono::microseconds>(
549                  ackingTime - receivedTime)
550            : 0us);
551   AckFrameMetaData meta(ackState_.acks, ackDelay, ackDelayExponentToUse);
552   auto ackWriteResult = writeAckFrame(meta, builder);
553   if (!ackWriteResult) {
554     return folly::none;
555   }
556   return largestAckedPacketNum;
557 }
558 
hasPendingAcks() const559 bool AckScheduler::hasPendingAcks() const {
560   return hasAcksToSchedule(ackState_);
561 }
562 
RstStreamScheduler(const QuicConnectionStateBase & conn)563 RstStreamScheduler::RstStreamScheduler(const QuicConnectionStateBase& conn)
564     : conn_(conn) {}
565 
hasPendingRsts() const566 bool RstStreamScheduler::hasPendingRsts() const {
567   return !conn_.pendingEvents.resets.empty();
568 }
569 
writeRsts(PacketBuilderInterface & builder)570 bool RstStreamScheduler::writeRsts(PacketBuilderInterface& builder) {
571   bool rstWritten = false;
572   for (const auto& resetStream : conn_.pendingEvents.resets) {
573     auto bytesWritten = writeFrame(resetStream.second, builder);
574     if (!bytesWritten) {
575       break;
576     }
577     rstWritten = true;
578   }
579   return rstWritten;
580 }
581 
SimpleFrameScheduler(const QuicConnectionStateBase & conn)582 SimpleFrameScheduler::SimpleFrameScheduler(const QuicConnectionStateBase& conn)
583     : conn_(conn) {}
584 
hasPendingSimpleFrames() const585 bool SimpleFrameScheduler::hasPendingSimpleFrames() const {
586   return conn_.pendingEvents.pathChallenge ||
587       !conn_.pendingEvents.frames.empty();
588 }
589 
writeSimpleFrames(PacketBuilderInterface & builder)590 bool SimpleFrameScheduler::writeSimpleFrames(PacketBuilderInterface& builder) {
591   auto& pathChallenge = conn_.pendingEvents.pathChallenge;
592   if (pathChallenge &&
593       !writeSimpleFrame(QuicSimpleFrame(*pathChallenge), builder)) {
594     return false;
595   }
596 
597   bool framesWritten = false;
598   for (auto& frame : conn_.pendingEvents.frames) {
599     auto bytesWritten = writeSimpleFrame(QuicSimpleFrame(frame), builder);
600     if (!bytesWritten) {
601       break;
602     }
603     framesWritten = true;
604   }
605   return framesWritten;
606 }
607 
PingFrameScheduler(const QuicConnectionStateBase & conn)608 PingFrameScheduler::PingFrameScheduler(const QuicConnectionStateBase& conn)
609     : conn_(conn) {}
610 
hasPingFrame() const611 bool PingFrameScheduler::hasPingFrame() const {
612   return conn_.pendingEvents.sendPing;
613 }
614 
writePing(PacketBuilderInterface & builder)615 bool PingFrameScheduler::writePing(PacketBuilderInterface& builder) {
616   return 0 != writeFrame(PingFrame(), builder);
617 }
618 
DatagramFrameScheduler(QuicConnectionStateBase & conn)619 DatagramFrameScheduler::DatagramFrameScheduler(QuicConnectionStateBase& conn)
620     : conn_(conn) {}
621 
hasPendingDatagramFrames() const622 bool DatagramFrameScheduler::hasPendingDatagramFrames() const {
623   return !conn_.datagramState.writeBuffer.empty();
624 }
625 
writeDatagramFrames(PacketBuilderInterface & builder)626 bool DatagramFrameScheduler::writeDatagramFrames(
627     PacketBuilderInterface& builder) {
628   bool sent = false;
629   for (size_t i = 0; i <= conn_.datagramState.writeBuffer.size(); ++i) {
630     auto& payload = conn_.datagramState.writeBuffer.front();
631     auto datagramFrame = DatagramFrame(payload.chainLength(), payload.move());
632     if (writeFrame(datagramFrame, builder) > 0) {
633       conn_.datagramState.writeBuffer.pop_front();
634       sent = true;
635     } else {
636       payload = datagramFrame.data.move();
637       break;
638     }
639     if (conn_.transportSettings.datagramConfig.framePerPacket) {
640       break;
641     }
642   }
643   return sent;
644 }
645 
WindowUpdateScheduler(const QuicConnectionStateBase & conn)646 WindowUpdateScheduler::WindowUpdateScheduler(
647     const QuicConnectionStateBase& conn)
648     : conn_(conn) {}
649 
hasPendingWindowUpdates() const650 bool WindowUpdateScheduler::hasPendingWindowUpdates() const {
651   return conn_.streamManager->hasWindowUpdates() ||
652       conn_.pendingEvents.connWindowUpdate;
653 }
654 
writeWindowUpdates(PacketBuilderInterface & builder)655 void WindowUpdateScheduler::writeWindowUpdates(
656     PacketBuilderInterface& builder) {
657   if (conn_.pendingEvents.connWindowUpdate) {
658     auto maxDataFrame = generateMaxDataFrame(conn_);
659     auto maximumData = maxDataFrame.maximumData;
660     auto bytes = writeFrame(std::move(maxDataFrame), builder);
661     if (bytes) {
662       VLOG(4) << "Wrote max_data=" << maximumData << " " << conn_;
663     }
664   }
665   for (const auto& windowUpdateStream : conn_.streamManager->windowUpdates()) {
666     auto stream = conn_.streamManager->findStream(windowUpdateStream);
667     if (!stream) {
668       continue;
669     }
670     auto maxStreamDataFrame = generateMaxStreamDataFrame(*stream);
671     auto maximumData = maxStreamDataFrame.maximumData;
672     auto bytes = writeFrame(std::move(maxStreamDataFrame), builder);
673     if (!bytes) {
674       break;
675     }
676     VLOG(4) << "Wrote max_stream_data stream=" << stream->id
677             << " maximumData=" << maximumData << " " << conn_;
678   }
679 }
680 
BlockedScheduler(const QuicConnectionStateBase & conn)681 BlockedScheduler::BlockedScheduler(const QuicConnectionStateBase& conn)
682     : conn_(conn) {}
683 
hasPendingBlockedFrames() const684 bool BlockedScheduler::hasPendingBlockedFrames() const {
685   return !conn_.streamManager->blockedStreams().empty() ||
686       conn_.pendingEvents.sendDataBlocked;
687 }
688 
writeBlockedFrames(PacketBuilderInterface & builder)689 void BlockedScheduler::writeBlockedFrames(PacketBuilderInterface& builder) {
690   if (conn_.pendingEvents.sendDataBlocked) {
691     // Connection is write blocked due to connection level flow control.
692     DataBlockedFrame blockedFrame(
693         conn_.flowControlState.peerAdvertisedMaxOffset);
694     auto result = writeFrame(blockedFrame, builder);
695     if (!result) {
696       // If there is not enough room to write data blocked frame in the
697       // curretn packet, we won't be able to write stream blocked frames either
698       // so just return.
699       return;
700     }
701   }
702   for (const auto& blockedStream : conn_.streamManager->blockedStreams()) {
703     auto bytesWritten = writeFrame(blockedStream.second, builder);
704     if (!bytesWritten) {
705       break;
706     }
707   }
708 }
709 
CryptoStreamScheduler(const QuicConnectionStateBase & conn,const QuicCryptoStream & cryptoStream)710 CryptoStreamScheduler::CryptoStreamScheduler(
711     const QuicConnectionStateBase& conn,
712     const QuicCryptoStream& cryptoStream)
713     : conn_(conn), cryptoStream_(cryptoStream) {}
714 
writeCryptoData(PacketBuilderInterface & builder)715 bool CryptoStreamScheduler::writeCryptoData(PacketBuilderInterface& builder) {
716   bool cryptoDataWritten = false;
717   uint64_t writableData =
718       folly::to<uint64_t>(cryptoStream_.writeBuffer.chainLength());
719   // We use the crypto scheduler to reschedule the retransmissions of the
720   // crypto streams so that we know that retransmissions of the crypto data
721   // will always take precedence over the crypto data.
722   for (const auto& buffer : cryptoStream_.lossBuffer) {
723     auto res = writeCryptoFrame(buffer.offset, buffer.data, builder);
724     if (!res) {
725       return cryptoDataWritten;
726     }
727     VLOG(4) << "Wrote retransmitted crypto"
728             << " offset=" << buffer.offset << " bytes=" << res->len << " "
729             << conn_;
730     cryptoDataWritten = true;
731   }
732 
733   if (writableData != 0) {
734     auto res = writeCryptoFrame(
735         cryptoStream_.currentWriteOffset, cryptoStream_.writeBuffer, builder);
736     if (res) {
737       VLOG(4) << "Wrote crypto frame"
738               << " offset=" << cryptoStream_.currentWriteOffset
739               << " bytesWritten=" << res->len << " " << conn_;
740       cryptoDataWritten = true;
741     }
742   }
743   return cryptoDataWritten;
744 }
745 
hasData() const746 bool CryptoStreamScheduler::hasData() const {
747   return !cryptoStream_.writeBuffer.empty() ||
748       !cryptoStream_.lossBuffer.empty();
749 }
750 
CloningScheduler(FrameScheduler & scheduler,QuicConnectionStateBase & conn,const folly::StringPiece name,uint64_t cipherOverhead)751 CloningScheduler::CloningScheduler(
752     FrameScheduler& scheduler,
753     QuicConnectionStateBase& conn,
754     const folly::StringPiece name,
755     uint64_t cipherOverhead)
756     : frameScheduler_(scheduler),
757       conn_(conn),
758       name_(name),
759       cipherOverhead_(cipherOverhead) {}
760 
hasData() const761 bool CloningScheduler::hasData() const {
762   // TODO: I'm not completely convinced d6d.outstandingProbes has been updated
763   // correctly.
764   return frameScheduler_.hasData() ||
765       conn_.outstandings.numOutstanding() >
766       conn_.d6d.outstandingProbes + conn_.outstandings.dsrCount;
767 }
768 
/**
 * Schedule frames into `builder`. If the wrapped FrameScheduler has fresh
 * data, delegate to it; otherwise try to clone the frames of one existing
 * outstanding packet (skipping lost, d6d-probe, and DSR packets) into a
 * newly-built packet in the same packet-number space.
 *
 * Returns a SchedulingResult whose PacketEvent is set when a clone was
 * produced, and (folly::none, folly::none) when nothing could be written.
 */
SchedulingResult CloningScheduler::scheduleFramesForPacket(
    PacketBuilderInterface&& builder,
    uint32_t writableBytes) {
  // The writableBytes in this function shouldn't be limited by cwnd, since
  // we only use CloningScheduler for the cases that we want to bypass cwnd for
  // now.
  if (frameScheduler_.hasData()) {
    // Note that there is a possibility that we end up writing nothing here. But
    // if frameScheduler_ hasData() to write, we shouldn't invoke the cloning
    // path if the write fails.
    return frameScheduler_.scheduleFramesForPacket(
        std::move(builder), writableBytes);
  }
  // TODO: We can avoid the copy & rebuild of the header by creating an
  // independent header builder.
  auto header = builder.getPacketHeader();
  // Give up the passed-in builder's output buffer: each clone attempt below
  // constructs its own builder instead.
  // NOTE(review): `builder` is still queried for its header below after this
  // move — relies on releaseOutputBuffer() leaving the header readable;
  // confirm against PacketBuilderInterface's contract.
  std::move(builder).releaseOutputBuffer();
  // Look for an outstanding packet that's no larger than the writableBytes
  for (auto& outstandingPacket : conn_.outstandings.packets) {
    // Lost, d6d-probe, and DSR packets are never cloned.
    if (outstandingPacket.declaredLost ||
        outstandingPacket.metadata.isD6DProbe ||
        outstandingPacket.isDSRPacket) {
      continue;
    }
    auto opPnSpace = outstandingPacket.packet.header.getPacketNumberSpace();
    // Reusing the RegularQuicPacketBuilder throughout loop bodies will lead to
    // frames belong to different original packets being written into the same
    // clone packet. So re-create a RegularQuicPacketBuilder every time.
    // TODO: We can avoid the copy & rebuild of the header by creating an
    // independent header builder.
    auto builderPnSpace = builder.getPacketHeader().getPacketNumberSpace();
    // Only clone packets from the same packet-number space as the builder.
    if (opPnSpace != builderPnSpace) {
      continue;
    }
    // Snapshot the shared write buffer's length so a failed rebuild can be
    // rolled back (ContinuousMemory / Inplace data path only).
    size_t prevSize = 0;
    if (conn_.transportSettings.dataPathType ==
        DataPathType::ContinuousMemory) {
      ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
      prevSize = scopedBufAccessor.buf()->length();
    }
    // Reusing the same builder throughout loop bodies will lead to frames
    // belong to different original packets being written into the same clone
    // packet. So re-create a builder every time.
    std::unique_ptr<PacketBuilderInterface> internalBuilder;
    if (conn_.transportSettings.dataPathType == DataPathType::ChainedMemory) {
      internalBuilder = std::make_unique<RegularQuicPacketBuilder>(
          conn_.udpSendPacketLen,
          header,
          getAckState(conn_, builderPnSpace).largestAckedByPeer.value_or(0));
    } else {
      CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
      internalBuilder = std::make_unique<InplaceQuicPacketBuilder>(
          *conn_.bufAccessor,
          conn_.udpSendPacketLen,
          header,
          getAckState(conn_, builderPnSpace).largestAckedByPeer.value_or(0));
    }
    // If the packet is already a clone that has been processed, we don't clone
    // it again.
    // NOTE(review): this check and the size check below could run before the
    // builder above is constructed, saving work on skipped packets — confirm
    // builder construction has no side effects before hoisting.
    if (outstandingPacket.associatedEvent &&
        conn_.outstandings.packetEvents.count(
            *outstandingPacket.associatedEvent) == 0) {
      continue;
    }
    // I think this only fail if udpSendPacketLen somehow shrinks in the middle
    // of a connection.
    if (outstandingPacket.metadata.encodedSize >
        writableBytes + cipherOverhead_) {
      continue;
    }

    internalBuilder->accountForCipherOverhead(cipherOverhead_);
    internalBuilder->encodePacketHeader();
    PacketRebuilder rebuilder(*internalBuilder, conn_);

    // TODO: It's possible we write out a packet that's larger than the packet
    // size limit. For example, when the packet sequence number has advanced to
    // a point where we need more bytes to encoded it than that of the original
    // packet. In that case, if the original packet is already at the packet
    // size limit, we will generate a packet larger than the limit. We can
    // either ignore the problem, hoping the packet will be able to travel the
    // network just fine; Or we can throw away the built packet and send a ping.

    // Rebuilder will write the rest of frames
    auto rebuildResult = rebuilder.rebuildFromPacket(outstandingPacket);
    if (rebuildResult) {
      // Success: hand back the clone event and the finished packet.
      return SchedulingResult(
          std::move(rebuildResult), std::move(*internalBuilder).buildPacket());
    } else if (
        conn_.transportSettings.dataPathType ==
        DataPathType::ContinuousMemory) {
      // When we use Inplace packet building and reuse the write buffer, even if
      // the packet rebuild has failed, there might be some bytes already
      // written into the buffer and the buffer tail pointer has already moved.
      // We need to roll back the tail pointer to the position before the packet
      // building to exclude those bytes. Otherwise these bytes will be sitting
      // in between legit packets inside the buffer and will either cause errors
      // further down the write path, or be sent out and then dropped at peer
      // when peer fail to parse them.
      internalBuilder.reset();
      CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
      ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
      auto& buf = scopedBufAccessor.buf();
      buf->trimEnd(buf->length() - prevSize);
    }
  }
  // No outstanding packet qualified for cloning.
  return SchedulingResult(folly::none, folly::none);
}
877 
name() const878 folly::StringPiece CloningScheduler::name() const {
879   return name_;
880 }
881 
// Builds PMTU-discovery (d6d) probe packets. probeSize is the exact on-wire
// size each probe must have once cipherOverhead bytes of AEAD expansion are
// included; the scheduler pads a PING packet up to that size.
D6DProbeScheduler::D6DProbeScheduler(
    QuicConnectionStateBase& conn,
    folly::StringPiece name,
    uint64_t cipherOverhead,
    uint32_t probeSize)
    : conn_(conn),
      name_(name),
      cipherOverhead_(cipherOverhead),
      probeSize_(probeSize) {}
891 
892 /**
893  * This scheduler always has data since all it does is send PING with PADDINGs
894  */
hasData() const895 bool D6DProbeScheduler::hasData() const {
896   return !probeSent_;
897 }
898 
899 /**
900  * D6DProbeScheduler ignores writableBytes because it does not respect
901  * congestion control. The reasons it doesn't are that
902  * - d6d probes are occasional burst of bytes in a single packet
903  * - no rtx needed when probe lost
904  */
scheduleFramesForPacket(PacketBuilderInterface && builder,uint32_t)905 SchedulingResult D6DProbeScheduler::scheduleFramesForPacket(
906     PacketBuilderInterface&& builder,
907     uint32_t /* writableBytes */) {
908   builder.encodePacketHeader();
909   int res = writeFrame(PingFrame(), builder);
910   CHECK_GT(res, 0) << __func__ << " "
911                    << "failed to write ping frame"
912                    << "remainingBytes: " << builder.remainingSpaceInPkt();
913   CHECK(builder.canBuildPacket()) << __func__ << " "
914                                   << "inner builder cannot build packet";
915 
916   auto pingOnlyPacket = std::move(builder).buildPacket();
917 
918   std::unique_ptr<WrapperPacketBuilderInterface> sizeEnforcedBuilder;
919   if (conn_.transportSettings.dataPathType == DataPathType::ChainedMemory) {
920     sizeEnforcedBuilder = std::make_unique<RegularSizeEnforcedPacketBuilder>(
921         std::move(pingOnlyPacket), probeSize_, cipherOverhead_);
922   } else {
923     CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
924     sizeEnforcedBuilder = std::make_unique<InplaceSizeEnforcedPacketBuilder>(
925         *conn_.bufAccessor,
926         std::move(pingOnlyPacket),
927         probeSize_,
928         cipherOverhead_);
929   }
930   CHECK(sizeEnforcedBuilder->canBuildPacket())
931       << __func__ << " "
932       << "sizeEnforcedBuilder cannot build packet";
933 
934   auto resultPacket = std::move(*sizeEnforcedBuilder).buildPacket();
935 
936   auto resultPacketSize = resultPacket.header->computeChainDataLength() +
937       resultPacket.body->computeChainDataLength() + cipherOverhead_;
938   CHECK_EQ(resultPacketSize, probeSize_)
939       << __func__ << " "
940       << "result packet does not have enforced size,"
941       << " expecting: " << probeSize_ << " getting: " << resultPacketSize;
942 
943   VLOG_IF(4, conn_.d6d.lastProbe.has_value())
944       << __func__ << " "
945       << "invalidating old non-acked d6d probe,"
946       << " seq: " << conn_.d6d.lastProbe->packetNum
947       << " packet size: " << conn_.d6d.lastProbe->packetSize;
948 
949   conn_.d6d.lastProbe = D6DProbePacket(
950       resultPacket.packet.header.getPacketSequenceNum(), probeSize_);
951 
952   probeSent_ = true;
953   return SchedulingResult(folly::none, std::move(resultPacket));
954 }
955 
name() const956 folly::StringPiece D6DProbeScheduler::name() const {
957   return name_;
958 }
959 
960 } // namespace quic
961