1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 *
7 */
8
9 #include <quic/api/QuicPacketScheduler.h>
10 #include <quic/flowcontrol/QuicFlowController.h>
11
12 namespace {
13 using namespace quic;
14
15 /**
16 * A helper iterator adaptor class that starts iteration of streams from a
17 * specific stream id.
18 */
19 class MiddleStartingIterationWrapper {
20 public:
21 using MapType = std::set<StreamId>;
22
23 class MiddleStartingIterator
24 : public boost::iterator_facade<
25 MiddleStartingIterator,
26 const MiddleStartingIterationWrapper::MapType::value_type,
27 boost::forward_traversal_tag> {
28 friend class boost::iterator_core_access;
29
30 public:
31 using MapType = MiddleStartingIterationWrapper::MapType;
32
33 MiddleStartingIterator() = delete;
34
MiddleStartingIterator(const MapType * streams,const MapType::key_type & start)35 MiddleStartingIterator(
36 const MapType* streams,
37 const MapType::key_type& start)
38 : streams_(streams) {
39 itr_ = streams_->lower_bound(start);
40 checkForWrapAround();
41 // We don't want to mark it as wrapped around initially, instead just
42 // act as if start was the first element.
43 wrappedAround_ = false;
44 }
45
MiddleStartingIterator(const MapType * streams,MapType::const_iterator itr)46 MiddleStartingIterator(const MapType* streams, MapType::const_iterator itr)
47 : streams_(streams), itr_(itr) {
48 checkForWrapAround();
49 // We don't want to mark it as wrapped around initially, instead just
50 // act as if start was the first element.
51 wrappedAround_ = false;
52 }
53
dereference() const54 FOLLY_NODISCARD const MapType::value_type& dereference() const {
55 return *itr_;
56 }
57
rawIterator() const58 FOLLY_NODISCARD MapType::const_iterator rawIterator() const {
59 return itr_;
60 }
61
equal(const MiddleStartingIterator & other) const62 FOLLY_NODISCARD bool equal(const MiddleStartingIterator& other) const {
63 return wrappedAround_ == other.wrappedAround_ && itr_ == other.itr_;
64 }
65
increment()66 void increment() {
67 ++itr_;
68 checkForWrapAround();
69 }
70
checkForWrapAround()71 void checkForWrapAround() {
72 if (itr_ == streams_->cend()) {
73 wrappedAround_ = true;
74 itr_ = streams_->cbegin();
75 }
76 }
77
78 private:
79 friend class MiddleStartingIterationWrapper;
80 bool wrappedAround_{false};
81 const MapType* streams_{nullptr};
82 MapType::const_iterator itr_;
83 };
84
MiddleStartingIterationWrapper(const MapType & streams,const MapType::key_type & start)85 MiddleStartingIterationWrapper(
86 const MapType& streams,
87 const MapType::key_type& start)
88 : streams_(streams), start_(&streams_, start) {}
89
MiddleStartingIterationWrapper(const MapType & streams,const MapType::const_iterator & start)90 MiddleStartingIterationWrapper(
91 const MapType& streams,
92 const MapType::const_iterator& start)
93 : streams_(streams), start_(&streams_, start) {}
94
cbegin() const95 FOLLY_NODISCARD MiddleStartingIterator cbegin() const {
96 return start_;
97 }
98
cend() const99 FOLLY_NODISCARD MiddleStartingIterator cend() const {
100 MiddleStartingIterator itr(start_);
101 itr.wrappedAround_ = true;
102 return itr;
103 }
104
105 private:
106 const MapType& streams_;
107 const MiddleStartingIterator start_;
108 };
109
110 } // namespace
111
112 namespace quic {
113
hasAcksToSchedule(const AckState & ackState)114 bool hasAcksToSchedule(const AckState& ackState) {
115 folly::Optional<PacketNum> largestAckSend = largestAckToSend(ackState);
116 if (!largestAckSend) {
117 return false;
118 }
119 if (!ackState.largestAckScheduled) {
120 // Never scheduled an ack, we need to send
121 return true;
122 }
123 return *largestAckSend > *(ackState.largestAckScheduled);
124 }
125
largestAckToSend(const AckState & ackState)126 folly::Optional<PacketNum> largestAckToSend(const AckState& ackState) {
127 if (ackState.acks.empty()) {
128 return folly::none;
129 }
130 return ackState.acks.back().end;
131 }
132
133 // Schedulers
134
Builder(QuicConnectionStateBase & conn,EncryptionLevel encryptionLevel,PacketNumberSpace packetNumberSpace,folly::StringPiece name)135 FrameScheduler::Builder::Builder(
136 QuicConnectionStateBase& conn,
137 EncryptionLevel encryptionLevel,
138 PacketNumberSpace packetNumberSpace,
139 folly::StringPiece name)
140 : conn_(conn),
141 encryptionLevel_(encryptionLevel),
142 packetNumberSpace_(packetNumberSpace),
143 name_(std::move(name)) {}
144
streamFrames()145 FrameScheduler::Builder& FrameScheduler::Builder::streamFrames() {
146 streamFrameScheduler_ = true;
147 return *this;
148 }
149
ackFrames()150 FrameScheduler::Builder& FrameScheduler::Builder::ackFrames() {
151 ackScheduler_ = true;
152 return *this;
153 }
154
resetFrames()155 FrameScheduler::Builder& FrameScheduler::Builder::resetFrames() {
156 rstScheduler_ = true;
157 return *this;
158 }
159
windowUpdateFrames()160 FrameScheduler::Builder& FrameScheduler::Builder::windowUpdateFrames() {
161 windowUpdateScheduler_ = true;
162 return *this;
163 }
164
blockedFrames()165 FrameScheduler::Builder& FrameScheduler::Builder::blockedFrames() {
166 blockedScheduler_ = true;
167 return *this;
168 }
169
cryptoFrames()170 FrameScheduler::Builder& FrameScheduler::Builder::cryptoFrames() {
171 cryptoStreamScheduler_ = true;
172 return *this;
173 }
174
simpleFrames()175 FrameScheduler::Builder& FrameScheduler::Builder::simpleFrames() {
176 simpleFrameScheduler_ = true;
177 return *this;
178 }
179
pingFrames()180 FrameScheduler::Builder& FrameScheduler::Builder::pingFrames() {
181 pingFrameScheduler_ = true;
182 return *this;
183 }
184
datagramFrames()185 FrameScheduler::Builder& FrameScheduler::Builder::datagramFrames() {
186 datagramFrameScheduler_ = true;
187 return *this;
188 }
189
build()190 FrameScheduler FrameScheduler::Builder::build() && {
191 FrameScheduler scheduler(std::move(name_));
192 if (streamFrameScheduler_) {
193 scheduler.streamFrameScheduler_.emplace(StreamFrameScheduler(conn_));
194 }
195 if (ackScheduler_) {
196 scheduler.ackScheduler_.emplace(
197 AckScheduler(conn_, getAckState(conn_, packetNumberSpace_)));
198 }
199 if (rstScheduler_) {
200 scheduler.rstScheduler_.emplace(RstStreamScheduler(conn_));
201 }
202 if (windowUpdateScheduler_) {
203 scheduler.windowUpdateScheduler_.emplace(WindowUpdateScheduler(conn_));
204 }
205 if (blockedScheduler_) {
206 scheduler.blockedScheduler_.emplace(BlockedScheduler(conn_));
207 }
208 if (cryptoStreamScheduler_) {
209 scheduler.cryptoStreamScheduler_.emplace(CryptoStreamScheduler(
210 conn_, *getCryptoStream(*conn_.cryptoState, encryptionLevel_)));
211 }
212 if (simpleFrameScheduler_) {
213 scheduler.simpleFrameScheduler_.emplace(SimpleFrameScheduler(conn_));
214 }
215 if (pingFrameScheduler_) {
216 scheduler.pingFrameScheduler_.emplace(PingFrameScheduler(conn_));
217 }
218 if (datagramFrameScheduler_) {
219 scheduler.datagramFrameScheduler_.emplace(DatagramFrameScheduler(conn_));
220 }
221 return scheduler;
222 }
223
FrameScheduler(folly::StringPiece name)224 FrameScheduler::FrameScheduler(folly::StringPiece name) : name_(name) {}
225
/**
 * Writes frames into the packet in a fixed priority order: crypto, resets,
 * acks, window updates, blocked, simple frames (incl. path challenge), ping,
 * stream data, then datagrams. Initial packets with frames are padded to
 * fill the remaining space.
 */
SchedulingResult FrameScheduler::scheduleFramesForPacket(
    PacketBuilderInterface&& builder,
    uint32_t writableBytes) {
  builder.encodePacketHeader();
  // We need to keep track of writable bytes after writing header.
  writableBytes = writableBytes > builder.getHeaderBytes()
      ? writableBytes - builder.getHeaderBytes()
      : 0;
  // We cannot return early if the writableBytes drops to 0 here, since pure
  // acks can skip writableBytes entirely.
  PacketBuilderWrapper wrapper(builder, writableBytes);
  bool cryptoDataWritten = false;
  bool rstWritten = false;
  if (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) {
    cryptoDataWritten = cryptoStreamScheduler_->writeCryptoData(wrapper);
  }
  if (rstScheduler_ && rstScheduler_->hasPendingRsts()) {
    rstWritten = rstScheduler_->writeRsts(wrapper);
  }
  // Long time ago we decided RST has higher priority than Acks.
  if (ackScheduler_ && ackScheduler_->hasPendingAcks()) {
    if (cryptoDataWritten || rstWritten) {
      // If packet has non ack data, it is subject to congestion control. We
      // need to use the wrapper.
      ackScheduler_->writeNextAcks(wrapper);
    } else {
      // If we start with writing acks, we will let the ack scheduler write
      // up to the full packet space. If the ack bytes exceeds the writable
      // bytes, this will be a pure ack packet and it will skip congestion
      // controller. Otherwise, we will give other schedulers an opportunity to
      // write up to writable bytes.
      ackScheduler_->writeNextAcks(builder);
    }
  }
  if (windowUpdateScheduler_ &&
      windowUpdateScheduler_->hasPendingWindowUpdates()) {
    windowUpdateScheduler_->writeWindowUpdates(wrapper);
  }
  if (blockedScheduler_ && blockedScheduler_->hasPendingBlockedFrames()) {
    blockedScheduler_->writeBlockedFrames(wrapper);
  }
  // Simple frames should be scheduled before stream frames and retx frames
  // because those frames might fill up all available bytes for writing.
  // If we are trying to send a PathChallenge frame it may be blocked by those,
  // causing a connection to proceed slowly because of path validation rate
  // limiting.
  if (simpleFrameScheduler_ &&
      simpleFrameScheduler_->hasPendingSimpleFrames()) {
    simpleFrameScheduler_->writeSimpleFrames(wrapper);
  }
  if (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) {
    pingFrameScheduler_->writePing(wrapper);
  }
  if (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) {
    streamFrameScheduler_->writeStreams(wrapper);
  }
  if (datagramFrameScheduler_ &&
      datagramFrameScheduler_->hasPendingDatagramFrames()) {
    datagramFrameScheduler_->writeDatagramFrames(wrapper);
  }

  if (builder.hasFramesPending()) {
    const LongHeader* longHeader = builder.getPacketHeader().asLong();
    bool initialPacket =
        longHeader && longHeader->getHeaderType() == LongHeader::Types::Initial;
    if (initialPacket) {
      // This is the initial packet, we need to fill er up.
      while (wrapper.remainingSpaceInPkt() > 0) {
        writeFrame(PaddingFrame(), builder);
      }
    }
  }

  return SchedulingResult(folly::none, std::move(builder).buildPacket());
}
301
hasData() const302 bool FrameScheduler::hasData() const {
303 return (ackScheduler_ && ackScheduler_->hasPendingAcks()) ||
304 hasImmediateData();
305 }
306
hasImmediateData() const307 bool FrameScheduler::hasImmediateData() const {
308 return (cryptoStreamScheduler_ && cryptoStreamScheduler_->hasData()) ||
309 (streamFrameScheduler_ && streamFrameScheduler_->hasPendingData()) ||
310 (rstScheduler_ && rstScheduler_->hasPendingRsts()) ||
311 (windowUpdateScheduler_ &&
312 windowUpdateScheduler_->hasPendingWindowUpdates()) ||
313 (blockedScheduler_ && blockedScheduler_->hasPendingBlockedFrames()) ||
314 (simpleFrameScheduler_ &&
315 simpleFrameScheduler_->hasPendingSimpleFrames()) ||
316 (pingFrameScheduler_ && pingFrameScheduler_->hasPingFrame()) ||
317 (datagramFrameScheduler_ &&
318 datagramFrameScheduler_->hasPendingDatagramFrames());
319 }
320
name() const321 folly::StringPiece FrameScheduler::name() const {
322 return name_;
323 }
324
writeStreamLossBuffers(PacketBuilderInterface & builder,QuicStreamState & stream)325 bool StreamFrameScheduler::writeStreamLossBuffers(
326 PacketBuilderInterface& builder,
327 QuicStreamState& stream) {
328 bool wroteStreamFrame = false;
329 for (auto buffer = stream.lossBuffer.cbegin();
330 buffer != stream.lossBuffer.cend();
331 ++buffer) {
332 auto bufferLen = buffer->data.chainLength();
333 auto dataLen = writeStreamFrameHeader(
334 builder,
335 stream.id,
336 buffer->offset,
337 bufferLen, // writeBufferLen -- only the len of the single buffer.
338 bufferLen, // flowControlLen -- not relevant, already flow controlled.
339 buffer->eof,
340 folly::none /* skipLenHint */);
341 if (dataLen) {
342 wroteStreamFrame = true;
343 writeStreamFrameData(builder, buffer->data, *dataLen);
344 VLOG(4) << "Wrote loss data for stream=" << stream.id
345 << " offset=" << buffer->offset << " bytes=" << *dataLen
346 << " fin=" << (buffer->eof && *dataLen == bufferLen) << " "
347 << conn_;
348 } else {
349 // Either we filled the packet or ran out of data for this stream (EOF?)
350 break;
351 }
352 }
353 return wroteStreamFrame;
354 }
355
StreamFrameScheduler(QuicConnectionStateBase & conn)356 StreamFrameScheduler::StreamFrameScheduler(QuicConnectionStateBase& conn)
357 : conn_(conn) {}
358
writeSingleStream(PacketBuilderInterface & builder,QuicStreamState & stream,uint64_t & connWritableBytes)359 bool StreamFrameScheduler::writeSingleStream(
360 PacketBuilderInterface& builder,
361 QuicStreamState& stream,
362 uint64_t& connWritableBytes) {
363 if (!stream.lossBuffer.empty()) {
364 if (!writeStreamLossBuffers(builder, stream)) {
365 return false;
366 }
367 }
368 if (stream.hasWritableData() && connWritableBytes > 0) {
369 if (!writeStreamFrame(builder, stream, connWritableBytes)) {
370 return false;
371 }
372 }
373 return true;
374 }
375
writeStreamsHelper(PacketBuilderInterface & builder,const std::set<StreamId> & writableStreams,StreamId nextScheduledStream,uint64_t & connWritableBytes,bool streamPerPacket)376 StreamId StreamFrameScheduler::writeStreamsHelper(
377 PacketBuilderInterface& builder,
378 const std::set<StreamId>& writableStreams,
379 StreamId nextScheduledStream,
380 uint64_t& connWritableBytes,
381 bool streamPerPacket) {
382 MiddleStartingIterationWrapper wrapper(writableStreams, nextScheduledStream);
383 auto writableStreamItr = wrapper.cbegin();
384 // This will write the stream frames in a round robin fashion ordered by
385 // stream id. The iterator will wrap around the collection at the end, and we
386 // keep track of the value at the next iteration. This allows us to start
387 // writing at the next stream when building the next packet.
388 while (writableStreamItr != wrapper.cend()) {
389 auto stream = conn_.streamManager->findStream(*writableStreamItr);
390 CHECK(stream);
391 if (!writeSingleStream(builder, *stream, connWritableBytes)) {
392 break;
393 }
394 writableStreamItr++;
395 if (streamPerPacket) {
396 break;
397 }
398 }
399 return *writableStreamItr;
400 }
401
/**
 * Writes stream data in strict priority-level order. Within a level,
 * incremental levels are served round robin (resuming from level.next, which
 * is persisted across packets) while non-incremental levels are walked in
 * stream id order. Stops when the packet fills, connection flow control runs
 * out (sequential case), or after the first written stream when
 * streamPerPacket is set.
 */
void StreamFrameScheduler::writeStreamsHelper(
    PacketBuilderInterface& builder,
    PriorityQueue& writableStreams,
    uint64_t& connWritableBytes,
    bool streamPerPacket) {
  // Fill a packet with non-control stream data, in priority order
  for (size_t index = 0; index < writableStreams.levels.size() &&
       builder.remainingSpaceInPkt() > 0;
       index++) {
    PriorityQueue::Level& level = writableStreams.levels[index];
    if (level.streams.empty()) {
      // No data here, keep going
      continue;
    }
    if (level.incremental) {
      // Round robin the streams at this level, resuming from level.next.
      MiddleStartingIterationWrapper wrapper(level.streams, level.next);
      auto writableStreamItr = wrapper.cbegin();
      while (writableStreamItr != wrapper.cend()) {
        auto stream = conn_.streamManager->findStream(*writableStreamItr);
        CHECK(stream);
        if (!writeSingleStream(builder, *stream, connWritableBytes)) {
          break;
        }
        writableStreamItr++;
        if (streamPerPacket) {
          // Persist the round-robin position before the early return.
          level.next = writableStreamItr.rawIterator();
          return;
        }
      }
      // Persist where the next packet should resume within this level.
      level.next = writableStreamItr.rawIterator();
    } else {
      // walk the sequential streams in order until we run out of space
      for (auto streamIt = level.streams.begin();
           streamIt != level.streams.end() && connWritableBytes > 0;
           ++streamIt) {
        auto stream = conn_.streamManager->findStream(*streamIt);
        CHECK(stream);
        if (!writeSingleStream(builder, *stream, connWritableBytes)) {
          break;
        }
        if (streamPerPacket) {
          return;
        }
      }
    }
  }
}
450
writeStreams(PacketBuilderInterface & builder)451 void StreamFrameScheduler::writeStreams(PacketBuilderInterface& builder) {
452 DCHECK(conn_.streamManager->hasWritable());
453 uint64_t connWritableBytes = getSendConnFlowControlBytesWire(conn_);
454 // Write the control streams first as a naive binary priority mechanism.
455 const auto& writableControlStreams =
456 conn_.streamManager->writableControlStreams();
457 if (!writableControlStreams.empty()) {
458 conn_.schedulingState.nextScheduledControlStream = writeStreamsHelper(
459 builder,
460 writableControlStreams,
461 conn_.schedulingState.nextScheduledControlStream,
462 connWritableBytes,
463 conn_.transportSettings.streamFramePerPacket);
464 }
465 auto& writableStreams = conn_.streamManager->writableStreams();
466 if (!writableStreams.empty()) {
467 writeStreamsHelper(
468 builder,
469 writableStreams,
470 connWritableBytes,
471 conn_.transportSettings.streamFramePerPacket);
472 }
473 } // namespace quic
474
hasPendingData() const475 bool StreamFrameScheduler::hasPendingData() const {
476 return conn_.streamManager->hasNonDSRWritable() &&
477 getSendConnFlowControlBytesWire(conn_) > 0;
478 }
479
/**
 * Writes new (non-retransmitted) data for one stream, honoring both
 * stream-level and connection-level flow control. Decrements
 * connWritableBytes by the amount written. Returns false when nothing could
 * be written (packet full), signalling the caller to stop scheduling more
 * streams into this packet.
 */
bool StreamFrameScheduler::writeStreamFrame(
    PacketBuilderInterface& builder,
    QuicStreamState& stream,
    uint64_t& connWritableBytes) {
  if (builder.remainingSpaceInPkt() == 0) {
    return false;
  }

  // hasWritableData is the condition which has to be satisfied for the
  // stream to be in writableList
  CHECK(stream.hasWritableData());

  // The stream may send no more than the smaller of its own and the
  // connection's remaining flow control credit.
  uint64_t flowControlLen =
      std::min(getSendStreamFlowControlBytesWire(stream), connWritableBytes);
  uint64_t bufferLen = stream.writeBuffer.chainLength();
  // We can't write FIN directly from here if writeBufMeta has pending bytes to
  // send.
  bool canWriteFin = stream.finalWriteOffset.has_value() &&
      bufferLen <= flowControlLen && stream.writeBufMeta.length == 0;
  auto writeOffset = stream.currentWriteOffset;
  if (canWriteFin && stream.writeBuffer.empty()) {
    // If we are writing FIN only from here, do not use currentWriteOffset in
    // case some bufMeta has been sent before.
    writeOffset = stream.finalWriteOffset.value();
  }
  auto dataLen = writeStreamFrameHeader(
      builder,
      stream.id,
      writeOffset,
      bufferLen,
      flowControlLen,
      canWriteFin,
      folly::none /* skipLenHint */);
  if (!dataLen) {
    return false;
  }
  writeStreamFrameData(builder, stream.writeBuffer, *dataLen);
  VLOG(4) << "Wrote stream frame stream=" << stream.id
          << " offset=" << stream.currentWriteOffset
          << " bytesWritten=" << *dataLen
          << " finWritten=" << (canWriteFin && *dataLen == bufferLen) << " "
          << conn_;
  connWritableBytes -= dataLen.value();
  return true;
}
525
AckScheduler(const QuicConnectionStateBase & conn,const AckState & ackState)526 AckScheduler::AckScheduler(
527 const QuicConnectionStateBase& conn,
528 const AckState& ackState)
529 : conn_(conn), ackState_(ackState) {}
530
writeNextAcks(PacketBuilderInterface & builder)531 folly::Optional<PacketNum> AckScheduler::writeNextAcks(
532 PacketBuilderInterface& builder) {
533 // Use default ack delay for long headers. Usually long headers are sent
534 // before crypto negotiation, so the peer might not know about the ack delay
535 // exponent yet, so we use the default.
536 uint8_t ackDelayExponentToUse =
537 builder.getPacketHeader().getHeaderForm() == HeaderForm::Long
538 ? kDefaultAckDelayExponent
539 : conn_.transportSettings.ackDelayExponent;
540 auto largestAckedPacketNum = *largestAckToSend(ackState_);
541 auto ackingTime = Clock::now();
542 DCHECK(ackState_.largestRecvdPacketTime.hasValue())
543 << "Missing received time for the largest acked packet";
544 // assuming that we're going to ack the largest received with hightest pri
545 auto receivedTime = *ackState_.largestRecvdPacketTime;
546 std::chrono::microseconds ackDelay =
547 (ackingTime > receivedTime
548 ? std::chrono::duration_cast<std::chrono::microseconds>(
549 ackingTime - receivedTime)
550 : 0us);
551 AckFrameMetaData meta(ackState_.acks, ackDelay, ackDelayExponentToUse);
552 auto ackWriteResult = writeAckFrame(meta, builder);
553 if (!ackWriteResult) {
554 return folly::none;
555 }
556 return largestAckedPacketNum;
557 }
558
hasPendingAcks() const559 bool AckScheduler::hasPendingAcks() const {
560 return hasAcksToSchedule(ackState_);
561 }
562
RstStreamScheduler(const QuicConnectionStateBase & conn)563 RstStreamScheduler::RstStreamScheduler(const QuicConnectionStateBase& conn)
564 : conn_(conn) {}
565
hasPendingRsts() const566 bool RstStreamScheduler::hasPendingRsts() const {
567 return !conn_.pendingEvents.resets.empty();
568 }
569
writeRsts(PacketBuilderInterface & builder)570 bool RstStreamScheduler::writeRsts(PacketBuilderInterface& builder) {
571 bool rstWritten = false;
572 for (const auto& resetStream : conn_.pendingEvents.resets) {
573 auto bytesWritten = writeFrame(resetStream.second, builder);
574 if (!bytesWritten) {
575 break;
576 }
577 rstWritten = true;
578 }
579 return rstWritten;
580 }
581
SimpleFrameScheduler(const QuicConnectionStateBase & conn)582 SimpleFrameScheduler::SimpleFrameScheduler(const QuicConnectionStateBase& conn)
583 : conn_(conn) {}
584
hasPendingSimpleFrames() const585 bool SimpleFrameScheduler::hasPendingSimpleFrames() const {
586 return conn_.pendingEvents.pathChallenge ||
587 !conn_.pendingEvents.frames.empty();
588 }
589
writeSimpleFrames(PacketBuilderInterface & builder)590 bool SimpleFrameScheduler::writeSimpleFrames(PacketBuilderInterface& builder) {
591 auto& pathChallenge = conn_.pendingEvents.pathChallenge;
592 if (pathChallenge &&
593 !writeSimpleFrame(QuicSimpleFrame(*pathChallenge), builder)) {
594 return false;
595 }
596
597 bool framesWritten = false;
598 for (auto& frame : conn_.pendingEvents.frames) {
599 auto bytesWritten = writeSimpleFrame(QuicSimpleFrame(frame), builder);
600 if (!bytesWritten) {
601 break;
602 }
603 framesWritten = true;
604 }
605 return framesWritten;
606 }
607
PingFrameScheduler(const QuicConnectionStateBase & conn)608 PingFrameScheduler::PingFrameScheduler(const QuicConnectionStateBase& conn)
609 : conn_(conn) {}
610
hasPingFrame() const611 bool PingFrameScheduler::hasPingFrame() const {
612 return conn_.pendingEvents.sendPing;
613 }
614
writePing(PacketBuilderInterface & builder)615 bool PingFrameScheduler::writePing(PacketBuilderInterface& builder) {
616 return 0 != writeFrame(PingFrame(), builder);
617 }
618
DatagramFrameScheduler(QuicConnectionStateBase & conn)619 DatagramFrameScheduler::DatagramFrameScheduler(QuicConnectionStateBase& conn)
620 : conn_(conn) {}
621
hasPendingDatagramFrames() const622 bool DatagramFrameScheduler::hasPendingDatagramFrames() const {
623 return !conn_.datagramState.writeBuffer.empty();
624 }
625
/**
 * Writes buffered DATAGRAM frames into the packet, popping each payload that
 * fits. Returns true when at least one datagram was written.
 *
 * NOTE(review): the `<=` bound re-evaluates size() while pop_front() shrinks
 * the buffer, so a single call can stop before draining the queue even when
 * space remains — presumably intentional spreading of datagrams across
 * packets; confirm before changing the bound.
 */
bool DatagramFrameScheduler::writeDatagramFrames(
    PacketBuilderInterface& builder) {
  bool sent = false;
  for (size_t i = 0; i <= conn_.datagramState.writeBuffer.size(); ++i) {
    auto& payload = conn_.datagramState.writeBuffer.front();
    // The frame temporarily takes ownership of the payload buffer.
    auto datagramFrame = DatagramFrame(payload.chainLength(), payload.move());
    if (writeFrame(datagramFrame, builder) > 0) {
      conn_.datagramState.writeBuffer.pop_front();
      sent = true;
    } else {
      // The frame did not fit: hand the buffer back to the queue untouched.
      payload = datagramFrame.data.move();
      break;
    }
    if (conn_.transportSettings.datagramConfig.framePerPacket) {
      break;
    }
  }
  return sent;
}
645
WindowUpdateScheduler(const QuicConnectionStateBase & conn)646 WindowUpdateScheduler::WindowUpdateScheduler(
647 const QuicConnectionStateBase& conn)
648 : conn_(conn) {}
649
hasPendingWindowUpdates() const650 bool WindowUpdateScheduler::hasPendingWindowUpdates() const {
651 return conn_.streamManager->hasWindowUpdates() ||
652 conn_.pendingEvents.connWindowUpdate;
653 }
654
writeWindowUpdates(PacketBuilderInterface & builder)655 void WindowUpdateScheduler::writeWindowUpdates(
656 PacketBuilderInterface& builder) {
657 if (conn_.pendingEvents.connWindowUpdate) {
658 auto maxDataFrame = generateMaxDataFrame(conn_);
659 auto maximumData = maxDataFrame.maximumData;
660 auto bytes = writeFrame(std::move(maxDataFrame), builder);
661 if (bytes) {
662 VLOG(4) << "Wrote max_data=" << maximumData << " " << conn_;
663 }
664 }
665 for (const auto& windowUpdateStream : conn_.streamManager->windowUpdates()) {
666 auto stream = conn_.streamManager->findStream(windowUpdateStream);
667 if (!stream) {
668 continue;
669 }
670 auto maxStreamDataFrame = generateMaxStreamDataFrame(*stream);
671 auto maximumData = maxStreamDataFrame.maximumData;
672 auto bytes = writeFrame(std::move(maxStreamDataFrame), builder);
673 if (!bytes) {
674 break;
675 }
676 VLOG(4) << "Wrote max_stream_data stream=" << stream->id
677 << " maximumData=" << maximumData << " " << conn_;
678 }
679 }
680
BlockedScheduler(const QuicConnectionStateBase & conn)681 BlockedScheduler::BlockedScheduler(const QuicConnectionStateBase& conn)
682 : conn_(conn) {}
683
hasPendingBlockedFrames() const684 bool BlockedScheduler::hasPendingBlockedFrames() const {
685 return !conn_.streamManager->blockedStreams().empty() ||
686 conn_.pendingEvents.sendDataBlocked;
687 }
688
writeBlockedFrames(PacketBuilderInterface & builder)689 void BlockedScheduler::writeBlockedFrames(PacketBuilderInterface& builder) {
690 if (conn_.pendingEvents.sendDataBlocked) {
691 // Connection is write blocked due to connection level flow control.
692 DataBlockedFrame blockedFrame(
693 conn_.flowControlState.peerAdvertisedMaxOffset);
694 auto result = writeFrame(blockedFrame, builder);
695 if (!result) {
696 // If there is not enough room to write data blocked frame in the
697 // curretn packet, we won't be able to write stream blocked frames either
698 // so just return.
699 return;
700 }
701 }
702 for (const auto& blockedStream : conn_.streamManager->blockedStreams()) {
703 auto bytesWritten = writeFrame(blockedStream.second, builder);
704 if (!bytesWritten) {
705 break;
706 }
707 }
708 }
709
CryptoStreamScheduler(const QuicConnectionStateBase & conn,const QuicCryptoStream & cryptoStream)710 CryptoStreamScheduler::CryptoStreamScheduler(
711 const QuicConnectionStateBase& conn,
712 const QuicCryptoStream& cryptoStream)
713 : conn_(conn), cryptoStream_(cryptoStream) {}
714
writeCryptoData(PacketBuilderInterface & builder)715 bool CryptoStreamScheduler::writeCryptoData(PacketBuilderInterface& builder) {
716 bool cryptoDataWritten = false;
717 uint64_t writableData =
718 folly::to<uint64_t>(cryptoStream_.writeBuffer.chainLength());
719 // We use the crypto scheduler to reschedule the retransmissions of the
720 // crypto streams so that we know that retransmissions of the crypto data
721 // will always take precedence over the crypto data.
722 for (const auto& buffer : cryptoStream_.lossBuffer) {
723 auto res = writeCryptoFrame(buffer.offset, buffer.data, builder);
724 if (!res) {
725 return cryptoDataWritten;
726 }
727 VLOG(4) << "Wrote retransmitted crypto"
728 << " offset=" << buffer.offset << " bytes=" << res->len << " "
729 << conn_;
730 cryptoDataWritten = true;
731 }
732
733 if (writableData != 0) {
734 auto res = writeCryptoFrame(
735 cryptoStream_.currentWriteOffset, cryptoStream_.writeBuffer, builder);
736 if (res) {
737 VLOG(4) << "Wrote crypto frame"
738 << " offset=" << cryptoStream_.currentWriteOffset
739 << " bytesWritten=" << res->len << " " << conn_;
740 cryptoDataWritten = true;
741 }
742 }
743 return cryptoDataWritten;
744 }
745
hasData() const746 bool CryptoStreamScheduler::hasData() const {
747 return !cryptoStream_.writeBuffer.empty() ||
748 !cryptoStream_.lossBuffer.empty();
749 }
750
CloningScheduler(FrameScheduler & scheduler,QuicConnectionStateBase & conn,const folly::StringPiece name,uint64_t cipherOverhead)751 CloningScheduler::CloningScheduler(
752 FrameScheduler& scheduler,
753 QuicConnectionStateBase& conn,
754 const folly::StringPiece name,
755 uint64_t cipherOverhead)
756 : frameScheduler_(scheduler),
757 conn_(conn),
758 name_(name),
759 cipherOverhead_(cipherOverhead) {}
760
hasData() const761 bool CloningScheduler::hasData() const {
762 // TODO: I'm not completely convinced d6d.outstandingProbes has been updated
763 // correctly.
764 return frameScheduler_.hasData() ||
765 conn_.outstandings.numOutstanding() >
766 conn_.d6d.outstandingProbes + conn_.outstandings.dsrCount;
767 }
768
scheduleFramesForPacket(PacketBuilderInterface && builder,uint32_t writableBytes)769 SchedulingResult CloningScheduler::scheduleFramesForPacket(
770 PacketBuilderInterface&& builder,
771 uint32_t writableBytes) {
772 // The writableBytes in this function shouldn't be limited by cwnd, since
773 // we only use CloningScheduler for the cases that we want to bypass cwnd for
774 // now.
775 if (frameScheduler_.hasData()) {
776 // Note that there is a possibility that we end up writing nothing here. But
777 // if frameScheduler_ hasData() to write, we shouldn't invoke the cloning
778 // path if the write fails.
779 return frameScheduler_.scheduleFramesForPacket(
780 std::move(builder), writableBytes);
781 }
782 // TODO: We can avoid the copy & rebuild of the header by creating an
783 // independent header builder.
784 auto header = builder.getPacketHeader();
785 std::move(builder).releaseOutputBuffer();
786 // Look for an outstanding packet that's no larger than the writableBytes
787 for (auto& outstandingPacket : conn_.outstandings.packets) {
788 if (outstandingPacket.declaredLost ||
789 outstandingPacket.metadata.isD6DProbe ||
790 outstandingPacket.isDSRPacket) {
791 continue;
792 }
793 auto opPnSpace = outstandingPacket.packet.header.getPacketNumberSpace();
794 // Reusing the RegularQuicPacketBuilder throughout loop bodies will lead to
795 // frames belong to different original packets being written into the same
796 // clone packet. So re-create a RegularQuicPacketBuilder every time.
797 // TODO: We can avoid the copy & rebuild of the header by creating an
798 // independent header builder.
799 auto builderPnSpace = builder.getPacketHeader().getPacketNumberSpace();
800 if (opPnSpace != builderPnSpace) {
801 continue;
802 }
803 size_t prevSize = 0;
804 if (conn_.transportSettings.dataPathType ==
805 DataPathType::ContinuousMemory) {
806 ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
807 prevSize = scopedBufAccessor.buf()->length();
808 }
809 // Reusing the same builder throughout loop bodies will lead to frames
810 // belong to different original packets being written into the same clone
811 // packet. So re-create a builder every time.
812 std::unique_ptr<PacketBuilderInterface> internalBuilder;
813 if (conn_.transportSettings.dataPathType == DataPathType::ChainedMemory) {
814 internalBuilder = std::make_unique<RegularQuicPacketBuilder>(
815 conn_.udpSendPacketLen,
816 header,
817 getAckState(conn_, builderPnSpace).largestAckedByPeer.value_or(0));
818 } else {
819 CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
820 internalBuilder = std::make_unique<InplaceQuicPacketBuilder>(
821 *conn_.bufAccessor,
822 conn_.udpSendPacketLen,
823 header,
824 getAckState(conn_, builderPnSpace).largestAckedByPeer.value_or(0));
825 }
826 // If the packet is already a clone that has been processed, we don't clone
827 // it again.
828 if (outstandingPacket.associatedEvent &&
829 conn_.outstandings.packetEvents.count(
830 *outstandingPacket.associatedEvent) == 0) {
831 continue;
832 }
833 // I think this only fail if udpSendPacketLen somehow shrinks in the middle
834 // of a connection.
835 if (outstandingPacket.metadata.encodedSize >
836 writableBytes + cipherOverhead_) {
837 continue;
838 }
839
840 internalBuilder->accountForCipherOverhead(cipherOverhead_);
841 internalBuilder->encodePacketHeader();
842 PacketRebuilder rebuilder(*internalBuilder, conn_);
843
844 // TODO: It's possible we write out a packet that's larger than the packet
845 // size limit. For example, when the packet sequence number has advanced to
846 // a point where we need more bytes to encoded it than that of the original
847 // packet. In that case, if the original packet is already at the packet
848 // size limit, we will generate a packet larger than the limit. We can
849 // either ignore the problem, hoping the packet will be able to travel the
850 // network just fine; Or we can throw away the built packet and send a ping.
851
852 // Rebuilder will write the rest of frames
853 auto rebuildResult = rebuilder.rebuildFromPacket(outstandingPacket);
854 if (rebuildResult) {
855 return SchedulingResult(
856 std::move(rebuildResult), std::move(*internalBuilder).buildPacket());
857 } else if (
858 conn_.transportSettings.dataPathType ==
859 DataPathType::ContinuousMemory) {
860 // When we use Inplace packet building and reuse the write buffer, even if
861 // the packet rebuild has failed, there might be some bytes already
862 // written into the buffer and the buffer tail pointer has already moved.
863 // We need to roll back the tail pointer to the position before the packet
864 // building to exclude those bytes. Otherwise these bytes will be sitting
865 // in between legit packets inside the buffer and will either cause errors
      // further down the write path, or be sent out and then dropped at the
      // peer when the peer fails to parse them.
868 internalBuilder.reset();
869 CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
870 ScopedBufAccessor scopedBufAccessor(conn_.bufAccessor);
871 auto& buf = scopedBufAccessor.buf();
872 buf->trimEnd(buf->length() - prevSize);
873 }
874 }
875 return SchedulingResult(folly::none, folly::none);
876 }
877
name() const878 folly::StringPiece CloningScheduler::name() const {
879 return name_;
880 }
881
D6DProbeScheduler(QuicConnectionStateBase & conn,folly::StringPiece name,uint64_t cipherOverhead,uint32_t probeSize)882 D6DProbeScheduler::D6DProbeScheduler(
883 QuicConnectionStateBase& conn,
884 folly::StringPiece name,
885 uint64_t cipherOverhead,
886 uint32_t probeSize)
887 : conn_(conn),
888 name_(name),
889 cipherOverhead_(cipherOverhead),
890 probeSize_(probeSize) {}
891
892 /**
893 * This scheduler always has data since all it does is send PING with PADDINGs
894 */
hasData() const895 bool D6DProbeScheduler::hasData() const {
896 return !probeSent_;
897 }
898
899 /**
900 * D6DProbeScheduler ignores writableBytes because it does not respect
901 * congestion control. The reasons it doesn't are that
902 * - d6d probes are occasional burst of bytes in a single packet
903 * - no rtx needed when probe lost
904 */
scheduleFramesForPacket(PacketBuilderInterface && builder,uint32_t)905 SchedulingResult D6DProbeScheduler::scheduleFramesForPacket(
906 PacketBuilderInterface&& builder,
907 uint32_t /* writableBytes */) {
908 builder.encodePacketHeader();
909 int res = writeFrame(PingFrame(), builder);
910 CHECK_GT(res, 0) << __func__ << " "
911 << "failed to write ping frame"
912 << "remainingBytes: " << builder.remainingSpaceInPkt();
913 CHECK(builder.canBuildPacket()) << __func__ << " "
914 << "inner builder cannot build packet";
915
916 auto pingOnlyPacket = std::move(builder).buildPacket();
917
918 std::unique_ptr<WrapperPacketBuilderInterface> sizeEnforcedBuilder;
919 if (conn_.transportSettings.dataPathType == DataPathType::ChainedMemory) {
920 sizeEnforcedBuilder = std::make_unique<RegularSizeEnforcedPacketBuilder>(
921 std::move(pingOnlyPacket), probeSize_, cipherOverhead_);
922 } else {
923 CHECK(conn_.bufAccessor && conn_.bufAccessor->ownsBuffer());
924 sizeEnforcedBuilder = std::make_unique<InplaceSizeEnforcedPacketBuilder>(
925 *conn_.bufAccessor,
926 std::move(pingOnlyPacket),
927 probeSize_,
928 cipherOverhead_);
929 }
930 CHECK(sizeEnforcedBuilder->canBuildPacket())
931 << __func__ << " "
932 << "sizeEnforcedBuilder cannot build packet";
933
934 auto resultPacket = std::move(*sizeEnforcedBuilder).buildPacket();
935
936 auto resultPacketSize = resultPacket.header->computeChainDataLength() +
937 resultPacket.body->computeChainDataLength() + cipherOverhead_;
938 CHECK_EQ(resultPacketSize, probeSize_)
939 << __func__ << " "
940 << "result packet does not have enforced size,"
941 << " expecting: " << probeSize_ << " getting: " << resultPacketSize;
942
943 VLOG_IF(4, conn_.d6d.lastProbe.has_value())
944 << __func__ << " "
945 << "invalidating old non-acked d6d probe,"
946 << " seq: " << conn_.d6d.lastProbe->packetNum
947 << " packet size: " << conn_.d6d.lastProbe->packetSize;
948
949 conn_.d6d.lastProbe = D6DProbePacket(
950 resultPacket.packet.header.getPacketSequenceNum(), probeSize_);
951
952 probeSent_ = true;
953 return SchedulingResult(folly::none, std::move(resultPacket));
954 }
955
name() const956 folly::StringPiece D6DProbeScheduler::name() const {
957 return name_;
958 }
959
960 } // namespace quic
961