1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 *
7 */
8
9 #pragma once
10
11 #include <folly/container/F14Map.h>
12 #include <quic/QuicConstants.h>
13 #include <quic/codec/Types.h>
14 #include <quic/common/SmallVec.h>
15 #include <quic/dsr/DSRPacketizationRequestSender.h>
16 #include <quic/state/QuicPriorityQueue.h>
17
18 namespace quic {
19
/**
 * Describes a buffer by its size alone; no payload bytes are held. This type
 * is part of the public facing interface.
 *
 * This is experimental.
 */
struct BufferMeta {
  // Number of bytes the described buffer covers.
  size_t length;

  // Explicit so that a plain integer is never silently treated as a
  // BufferMeta.
  explicit BufferMeta(size_t lengthIn) : length{lengthIn} {}
};
31
32 /**
33 * A write buffer representation without the actual data. This is used for
34 * write buffer management in a stream.
35 *
36 * This is experimental.
37 */
38 struct WriteBufferMeta {
39 size_t length{0};
40 size_t offset{0};
41 bool eof{false};
42
43 WriteBufferMeta() = default;
44
45 struct Builder {
setLengthWriteBufferMeta::Builder46 Builder& setLength(size_t lengthIn) {
47 length_ = lengthIn;
48 return *this;
49 }
50
setOffsetWriteBufferMeta::Builder51 Builder& setOffset(size_t offsetIn) {
52 offset_ = offsetIn;
53 return *this;
54 }
55
setEOFWriteBufferMeta::Builder56 Builder& setEOF(bool val) {
57 eof_ = val;
58 return *this;
59 }
60
buildWriteBufferMeta::Builder61 WriteBufferMeta build() {
62 return WriteBufferMeta(length_, offset_, eof_);
63 }
64
65 private:
66 size_t length_{0};
67 size_t offset_{0};
68 bool eof_{false};
69 };
70
splitWriteBufferMeta71 WriteBufferMeta split(size_t splitLen) {
72 CHECK_GE(length, splitLen);
73 auto splitEof = splitLen == length && eof;
74 WriteBufferMeta splitOf(splitLen, offset, splitEof);
75 offset += splitLen;
76 length -= splitLen;
77 return splitOf;
78 }
79
80 private:
WriteBufferMetaWriteBufferMeta81 explicit WriteBufferMeta(size_t lengthIn, size_t offsetIn, bool eofIn)
82 : length(lengthIn), offset(offsetIn), eof(eofIn) {}
83 };
84
85 struct StreamBuffer {
86 BufQueue data;
87 uint64_t offset;
88 bool eof{false};
89
90 StreamBuffer(Buf dataIn, uint64_t offsetIn, bool eofIn = false) noexcept
dataStreamBuffer91 : data(std::move(dataIn)), offset(offsetIn), eof(eofIn) {}
92
93 StreamBuffer(StreamBuffer&& other) = default;
94 StreamBuffer& operator=(StreamBuffer&& other) = default;
95 };
96
97 struct QuicStreamLike {
98 QuicStreamLike() = default;
99
100 QuicStreamLike(QuicStreamLike&&) = default;
101
102 virtual ~QuicStreamLike() = default;
103
104 // List of bytes that have been read and buffered. We need to buffer
105 // bytes in case we get bytes out of order.
106 std::deque<StreamBuffer> readBuffer;
107
108 // List of bytes that have been written to the QUIC layer.
109 BufQueue writeBuffer{};
110
111 // Stores a map of offset:buffers which have been written to the socket and
112 // are currently un-acked. Each one represents one StreamFrame that was
113 // written. We need to buffer these because these might be retransmitted in
114 // the future. These are associated with the starting offset of the buffer.
115 // Note: the offset in the StreamBuffer itself can be >= the offset on which
116 // it is keyed due to partial reliability - when data is skipped the offset
117 // in the StreamBuffer may be incremented, but the keyed offset must remain
118 // the same so it can be removed from the buffer on ACK.
119 folly::F14FastMap<uint64_t, std::unique_ptr<StreamBuffer>>
120 retransmissionBuffer;
121
122 // Tracks intervals which we have received ACKs for. E.g. in the case of all
123 // data being acked this would contain one internval from 0 -> the largest
124 // offseet ACKed. This allows us to track which delivery callbacks can be
125 // called.
126 template <class T>
127 using IntervalSetVec = SmallVec<T, 32, uint16_t>;
128 using AckedIntervals = IntervalSet<uint64_t, 1, IntervalSetVec>;
129 AckedIntervals ackedIntervals;
130
131 // Stores a list of buffers which have been marked as loss by loss detector.
132 // Each one represents one StreamFrame that was written.
133 std::deque<StreamBuffer> lossBuffer;
134
135 // Current offset of the start bytes in the write buffer.
136 // This changes when we pop stuff off the writeBuffer.
137 // In a non-DSR stream, when we are finished writing out all the bytes until
138 // FIN, this will be one greater than finalWriteOffset.
139 // When DSR is used, this still points to the starting bytes in the write
140 // buffer. Its value won't change with WriteBufferMetas are appended and sent
141 // for a stream.
142 uint64_t currentWriteOffset{0};
143
144 // the minimum offset requires retransmit
145 // N.B. used in QUIC partial reliability
146 uint64_t minimumRetransmittableOffset{0};
147
148 // Offset of the next expected bytes that we need to read from
149 // the read buffer.
150 uint64_t currentReadOffset{0};
151
152 // the smallest data offset that we expect the peer to send.
153 // N.B. used in QUIC partial reliability
154 uint64_t currentReceiveOffset{0};
155
156 // Maximum byte offset observed on the stream.
157 uint64_t maxOffsetObserved{0};
158
159 // If an EOF is observed on the stream, the position of the EOF. It could be
160 // either from FIN or RST. Right now we use one value to represent both FIN
161 // and RST. We may split write EOF into two values in the future.
162 // Read side eof offset.
163 folly::Optional<uint64_t> finalReadOffset;
164
165 // Current cumulative number of packets sent for this stream. It only counts
166 // egress packets that contains a *new* STREAM frame for this stream.
167 uint64_t numPacketsTxWithNewData{0};
168
169 /*
170 * Either insert a new entry into the loss buffer, or merge the buffer with
171 * an existing entry.
172 */
insertIntoLossBufferQuicStreamLike173 void insertIntoLossBuffer(std::unique_ptr<StreamBuffer> buf) {
174 // We assume here that we won't try to insert an overlapping buffer, as
175 // that should never happen in the loss buffer.
176 auto lossItr = std::upper_bound(
177 lossBuffer.begin(),
178 lossBuffer.end(),
179 buf->offset,
180 [](auto offset, const auto& buffer) { return offset < buffer.offset; });
181 if (!lossBuffer.empty() && lossItr != lossBuffer.begin() &&
182 std::prev(lossItr)->offset + std::prev(lossItr)->data.chainLength() ==
183 buf->offset) {
184 std::prev(lossItr)->data.append(buf->data.move());
185 std::prev(lossItr)->eof = buf->eof;
186 } else {
187 lossBuffer.insert(lossItr, std::move(*buf));
188 }
189 }
190 };
191
// Forward declaration: only a reference to the connection state is needed
// below, so the full definition is not required here.
struct QuicConnectionStateBase;

// States of the send side of a stream.
enum class StreamSendState : uint8_t {
  Open,
  ResetSent,
  Closed,
  Invalid,
};

// States of the receive side of a stream.
enum class StreamRecvState : uint8_t {
  Open,
  Closed,
  Invalid,
};
197
streamStateToString(StreamSendState state)198 inline folly::StringPiece streamStateToString(StreamSendState state) {
199 switch (state) {
200 case StreamSendState::Open:
201 return "Open";
202 case StreamSendState::ResetSent:
203 return "ResetSent";
204 case StreamSendState::Closed:
205 return "Closed";
206 case StreamSendState::Invalid:
207 return "Invalid";
208 }
209 return "Unknown";
210 }
211
streamStateToString(StreamRecvState state)212 inline folly::StringPiece streamStateToString(StreamRecvState state) {
213 switch (state) {
214 case StreamRecvState::Open:
215 return "Open";
216 case StreamRecvState::Closed:
217 return "Closed";
218 case StreamRecvState::Invalid:
219 return "Invalid";
220 }
221 return "Unknown";
222 }
223
224 struct QuicStreamState : public QuicStreamLike {
225 virtual ~QuicStreamState() override = default;
226
227 QuicStreamState(StreamId id, QuicConnectionStateBase& conn);
228
229 QuicStreamState(QuicStreamState&&) = default;
230
231 /**
232 * Constructor to migrate QuicStreamState to another
233 * QuicConnectionStateBase.
234 */
QuicStreamStateQuicStreamState235 QuicStreamState(QuicConnectionStateBase& connIn, QuicStreamState&& other)
236 : QuicStreamLike(std::move(other)), conn(connIn), id(other.id) {
237 // QuicStreamState fields
238 finalWriteOffset = other.finalWriteOffset;
239 flowControlState = other.flowControlState;
240 streamReadError = other.streamReadError;
241 streamWriteError = other.streamWriteError;
242 sendState = other.sendState;
243 recvState = other.recvState;
244 isControl = other.isControl;
245 lastHolbTime = other.lastHolbTime;
246 totalHolbTime = other.totalHolbTime;
247 holbCount = other.holbCount;
248 priority = other.priority;
249 dsrSender = std::move(other.dsrSender);
250 writeBufMeta = other.writeBufMeta;
251 retransmissionBufMetas = std::move(other.retransmissionBufMetas);
252 lossBufMetas = std::move(other.lossBufMetas);
253 }
254
255 // Connection that this stream is associated with.
256 QuicConnectionStateBase& conn;
257
258 // Stream id of the connection.
259 StreamId id;
260
261 // Write side eof offset. This represents only the final FIN offset.
262 folly::Optional<uint64_t> finalWriteOffset;
263
264 struct StreamFlowControlState {
265 uint64_t windowSize{0};
266 uint64_t advertisedMaxOffset{0};
267 uint64_t peerAdvertisedMaxOffset{0};
268 // Time at which the last flow control update was sent by the transport.
269 folly::Optional<TimePoint> timeOfLastFlowControlUpdate;
270 };
271
272 StreamFlowControlState flowControlState;
273
274 // Stream level read error occured.
275 folly::Optional<QuicErrorCode> streamReadError;
276 // Stream level write error occured.
277 folly::Optional<QuicErrorCode> streamWriteError;
278
279 // State machine data
280 StreamSendState sendState{StreamSendState::Open};
281
282 // State machine data
283 StreamRecvState recvState{StreamRecvState::Open};
284
285 // Tells whether this stream is a control stream.
286 // It is set by the app via setControlStream and the transport can use this
287 // knowledge for optimizations e.g. for setting the app limited state on
288 // congestion control with control streams still active.
289 bool isControl{false};
290
291 // The last time we detected we were head of line blocked on the stream.
292 folly::Optional<Clock::time_point> lastHolbTime;
293
294 // The total amount of time we are head line blocked on the stream.
295 std::chrono::microseconds totalHolbTime{0us};
296
297 // Number of times the stream has entered the HOLB state
298 // lastHolbTime indicates whether the stream is HOL blocked at the moment.
299 uint32_t holbCount{0};
300
301 Priority priority{kDefaultPriority};
302
303 // Returns true if both send and receive state machines are in a terminal
304 // state
inTerminalStatesQuicStreamState305 bool inTerminalStates() const {
306 bool sendInTerminalState = sendState == StreamSendState::Closed ||
307 sendState == StreamSendState::Invalid;
308
309 bool recvInTerminalState = recvState == StreamRecvState::Closed ||
310 recvState == StreamRecvState::Invalid;
311
312 return sendInTerminalState && recvInTerminalState;
313 }
314
315 // If the stream is still writable.
writableQuicStreamState316 bool writable() const {
317 return sendState == StreamSendState::Open && !finalWriteOffset.has_value();
318 }
319
shouldSendFlowControlQuicStreamState320 bool shouldSendFlowControl() const {
321 return recvState == StreamRecvState::Open;
322 }
323
324 // If the stream has writable data that's not backed by DSR. That is, in a
325 // regular stream write, it will be able to write something. So it either
326 // needs to have writeBuffer, or it has EOF to send.
hasWritableDataQuicStreamState327 bool hasWritableData() const {
328 if (!writeBuffer.empty()) {
329 return flowControlState.peerAdvertisedMaxOffset - currentWriteOffset > 0;
330 }
331 if (finalWriteOffset) {
332 /**
333 * This is the case that EOF/FIN is the only thing we can write in a
334 * non-DSR write for a stream. It's actually OK to send out a FIN with
335 * correct offset before we send out DSRed bytes. Peer is supposed to be
336 * able to handle this. But it's also not hard to limit it. So here i'm
337 * gonna go with the safer path: do not write FIN only stream frame if we
338 * still have BufMetas to send.
339 */
340 return writeBufMeta.length == 0 &&
341 currentWriteOffset <= *finalWriteOffset &&
342 writeBufMeta.offset <= *finalWriteOffset;
343 }
344 return false;
345 }
346
hasWritableBufMetaQuicStreamState347 FOLLY_NODISCARD bool hasWritableBufMeta() const {
348 if (writeBufMeta.offset == 0) {
349 return false;
350 }
351 if (writeBufMeta.length > 0) {
352 return flowControlState.peerAdvertisedMaxOffset - writeBufMeta.offset > 0;
353 }
354 if (finalWriteOffset) {
355 return writeBufMeta.offset <= *finalWriteOffset;
356 }
357 return false;
358 }
359
hasSentFINQuicStreamState360 FOLLY_NODISCARD bool hasSentFIN() const {
361 if (!finalWriteOffset) {
362 return false;
363 }
364 return currentWriteOffset > *finalWriteOffset ||
365 writeBufMeta.offset > *finalWriteOffset;
366 }
367
nextOffsetToWriteQuicStreamState368 FOLLY_NODISCARD uint64_t nextOffsetToWrite() const {
369 // The stream has never had WriteBufferMetas. Then currentWriteOffset
370 // always points to the next offset we send. This of course relies on the
371 // current contract of DSR: Real data always comes first. This code (and a
372 // lot other code) breaks when that contract is breached.
373 if (writeBufMeta.offset == 0) {
374 return currentWriteOffset;
375 }
376 if (!writeBuffer.empty()) {
377 return currentWriteOffset;
378 }
379 return writeBufMeta.offset;
380 }
381
hasReadableDataQuicStreamState382 bool hasReadableData() const {
383 return (readBuffer.size() > 0 &&
384 currentReadOffset == readBuffer.front().offset) ||
385 (finalReadOffset && currentReadOffset == *finalReadOffset);
386 }
387
hasPeekableDataQuicStreamState388 bool hasPeekableData() const {
389 return readBuffer.size() > 0;
390 }
391
392 std::unique_ptr<DSRPacketizationRequestSender> dsrSender;
393
394 // BufferMeta that has been writen to the QUIC layer.
395 // When offset is 0, nothing has been written to it. On first write, its
396 // starting offset will be currentWriteOffset + writeBuffer.chainLength().
397 WriteBufferMeta writeBufMeta;
398
399 // A map to store sent WriteBufferMetas for potential retransmission.
400 folly::F14FastMap<uint64_t, WriteBufferMeta> retransmissionBufMetas;
401
402 // WriteBufferMetas that's already marked lost. They will be retransmitted.
403 std::deque<WriteBufferMeta> lossBufMetas;
404
405 /**
406 * Insert a new WriteBufferMeta into lossBufMetas. If the new WriteBufferMeta
407 * can be append to an existing WriteBufferMeta, it will be appended. Note
408 * it won't be prepended to an existing WriteBufferMeta. And it will also not
409 * merge 3 WriteBufferMetas together if the new one happens to fill up a hole
410 * between 2 existing WriteBufferMetas.
411 */
insertIntoLossBufMetaQuicStreamState412 void insertIntoLossBufMeta(WriteBufferMeta bufMeta) {
413 auto lossItr = std::upper_bound(
414 lossBufMetas.begin(),
415 lossBufMetas.end(),
416 bufMeta.offset,
417 [](auto offset, const auto& bufMeta) {
418 return offset < bufMeta.offset;
419 });
420 if (!lossBufMetas.empty() && lossItr != lossBufMetas.begin() &&
421 std::prev(lossItr)->offset + std::prev(lossItr)->length ==
422 bufMeta.offset) {
423 std::prev(lossItr)->length += bufMeta.length;
424 std::prev(lossItr)->eof = bufMeta.eof;
425 } else {
426 lossBufMetas.insert(lossItr, bufMeta);
427 }
428 }
429 };
430 } // namespace quic
431