/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * This source code is licensed under the MIT license found in the
 * LICENSE file in the root directory of this source tree.
 *
 */
8 
9 #pragma once
10 
11 #include <folly/container/F14Map.h>
12 #include <quic/QuicConstants.h>
13 #include <quic/codec/Types.h>
14 #include <quic/common/SmallVec.h>
15 #include <quic/dsr/DSRPacketizationRequestSender.h>
16 #include <quic/state/QuicPriorityQueue.h>
17 
18 namespace quic {
19 
/**
 * A buffer representation without the actual data. This is part of the public
 * facing interface.
 *
 * This is experimental.
 */
struct BufferMeta {
  // Length of the data this meta object stands for (presumably in bytes).
  size_t length;

  // Explicit so that a bare size_t never converts to a BufferMeta silently.
  explicit BufferMeta(size_t lengthIn) : length(lengthIn) {}
};
31 
32 /**
33  * A write buffer representation without the actual data. This is used for
34  * write buffer management in a stream.
35  *
36  * This is experimental.
37  */
38 struct WriteBufferMeta {
39   size_t length{0};
40   size_t offset{0};
41   bool eof{false};
42 
43   WriteBufferMeta() = default;
44 
45   struct Builder {
setLengthWriteBufferMeta::Builder46     Builder& setLength(size_t lengthIn) {
47       length_ = lengthIn;
48       return *this;
49     }
50 
setOffsetWriteBufferMeta::Builder51     Builder& setOffset(size_t offsetIn) {
52       offset_ = offsetIn;
53       return *this;
54     }
55 
setEOFWriteBufferMeta::Builder56     Builder& setEOF(bool val) {
57       eof_ = val;
58       return *this;
59     }
60 
buildWriteBufferMeta::Builder61     WriteBufferMeta build() {
62       return WriteBufferMeta(length_, offset_, eof_);
63     }
64 
65    private:
66     size_t length_{0};
67     size_t offset_{0};
68     bool eof_{false};
69   };
70 
splitWriteBufferMeta71   WriteBufferMeta split(size_t splitLen) {
72     CHECK_GE(length, splitLen);
73     auto splitEof = splitLen == length && eof;
74     WriteBufferMeta splitOf(splitLen, offset, splitEof);
75     offset += splitLen;
76     length -= splitLen;
77     return splitOf;
78   }
79 
80  private:
WriteBufferMetaWriteBufferMeta81   explicit WriteBufferMeta(size_t lengthIn, size_t offsetIn, bool eofIn)
82       : length(lengthIn), offset(offsetIn), eof(eofIn) {}
83 };
84 
85 struct StreamBuffer {
86   BufQueue data;
87   uint64_t offset;
88   bool eof{false};
89 
90   StreamBuffer(Buf dataIn, uint64_t offsetIn, bool eofIn = false) noexcept
dataStreamBuffer91       : data(std::move(dataIn)), offset(offsetIn), eof(eofIn) {}
92 
93   StreamBuffer(StreamBuffer&& other) = default;
94   StreamBuffer& operator=(StreamBuffer&& other) = default;
95 };
96 
97 struct QuicStreamLike {
98   QuicStreamLike() = default;
99 
100   QuicStreamLike(QuicStreamLike&&) = default;
101 
102   virtual ~QuicStreamLike() = default;
103 
104   // List of bytes that have been read and buffered. We need to buffer
105   // bytes in case we get bytes out of order.
106   std::deque<StreamBuffer> readBuffer;
107 
108   // List of bytes that have been written to the QUIC layer.
109   BufQueue writeBuffer{};
110 
111   // Stores a map of offset:buffers which have been written to the socket and
112   // are currently un-acked. Each one represents one StreamFrame that was
113   // written. We need to buffer these because these might be retransmitted in
114   // the future. These are associated with the starting offset of the buffer.
115   // Note: the offset in the StreamBuffer itself can be >= the offset on which
116   // it is keyed due to partial reliability - when data is skipped the offset
117   // in the StreamBuffer may be incremented, but the keyed offset must remain
118   // the same so it can be removed from the buffer on ACK.
119   folly::F14FastMap<uint64_t, std::unique_ptr<StreamBuffer>>
120       retransmissionBuffer;
121 
122   // Tracks intervals which we have received ACKs for. E.g. in the case of all
123   // data being acked this would contain one internval from 0 -> the largest
124   // offseet ACKed. This allows us to track which delivery callbacks can be
125   // called.
126   template <class T>
127   using IntervalSetVec = SmallVec<T, 32, uint16_t>;
128   using AckedIntervals = IntervalSet<uint64_t, 1, IntervalSetVec>;
129   AckedIntervals ackedIntervals;
130 
131   // Stores a list of buffers which have been marked as loss by loss detector.
132   // Each one represents one StreamFrame that was written.
133   std::deque<StreamBuffer> lossBuffer;
134 
135   // Current offset of the start bytes in the write buffer.
136   // This changes when we pop stuff off the writeBuffer.
137   // In a non-DSR stream, when we are finished writing out all the bytes until
138   // FIN, this will be one greater than finalWriteOffset.
139   // When DSR is used, this still points to the starting bytes in the write
140   // buffer. Its value won't change with WriteBufferMetas are appended and sent
141   // for a stream.
142   uint64_t currentWriteOffset{0};
143 
144   // the minimum offset requires retransmit
145   // N.B. used in QUIC partial reliability
146   uint64_t minimumRetransmittableOffset{0};
147 
148   // Offset of the next expected bytes that we need to read from
149   // the read buffer.
150   uint64_t currentReadOffset{0};
151 
152   // the smallest data offset that we expect the peer to send.
153   // N.B. used in QUIC partial reliability
154   uint64_t currentReceiveOffset{0};
155 
156   // Maximum byte offset observed on the stream.
157   uint64_t maxOffsetObserved{0};
158 
159   // If an EOF is observed on the stream, the position of the EOF. It could be
160   // either from FIN or RST. Right now we use one value to represent both FIN
161   // and RST. We may split write EOF into two values in the future.
162   // Read side eof offset.
163   folly::Optional<uint64_t> finalReadOffset;
164 
165   // Current cumulative number of packets sent for this stream. It only counts
166   // egress packets that contains a *new* STREAM frame for this stream.
167   uint64_t numPacketsTxWithNewData{0};
168 
169   /*
170    * Either insert a new entry into the loss buffer, or merge the buffer with
171    * an existing entry.
172    */
insertIntoLossBufferQuicStreamLike173   void insertIntoLossBuffer(std::unique_ptr<StreamBuffer> buf) {
174     // We assume here that we won't try to insert an overlapping buffer, as
175     // that should never happen in the loss buffer.
176     auto lossItr = std::upper_bound(
177         lossBuffer.begin(),
178         lossBuffer.end(),
179         buf->offset,
180         [](auto offset, const auto& buffer) { return offset < buffer.offset; });
181     if (!lossBuffer.empty() && lossItr != lossBuffer.begin() &&
182         std::prev(lossItr)->offset + std::prev(lossItr)->data.chainLength() ==
183             buf->offset) {
184       std::prev(lossItr)->data.append(buf->data.move());
185       std::prev(lossItr)->eof = buf->eof;
186     } else {
187       lossBuffer.insert(lossItr, std::move(*buf));
188     }
189   }
190 };
191 
struct QuicConnectionStateBase;

// States of a stream's send-side state machine.
enum class StreamSendState : uint8_t { Open, ResetSent, Closed, Invalid };

// States of a stream's receive-side state machine.
enum class StreamRecvState : uint8_t { Open, Closed, Invalid };
197 
streamStateToString(StreamSendState state)198 inline folly::StringPiece streamStateToString(StreamSendState state) {
199   switch (state) {
200     case StreamSendState::Open:
201       return "Open";
202     case StreamSendState::ResetSent:
203       return "ResetSent";
204     case StreamSendState::Closed:
205       return "Closed";
206     case StreamSendState::Invalid:
207       return "Invalid";
208   }
209   return "Unknown";
210 }
211 
streamStateToString(StreamRecvState state)212 inline folly::StringPiece streamStateToString(StreamRecvState state) {
213   switch (state) {
214     case StreamRecvState::Open:
215       return "Open";
216     case StreamRecvState::Closed:
217       return "Closed";
218     case StreamRecvState::Invalid:
219       return "Invalid";
220   }
221   return "Unknown";
222 }
223 
224 struct QuicStreamState : public QuicStreamLike {
225   virtual ~QuicStreamState() override = default;
226 
227   QuicStreamState(StreamId id, QuicConnectionStateBase& conn);
228 
229   QuicStreamState(QuicStreamState&&) = default;
230 
231   /**
232    * Constructor to migrate QuicStreamState to another
233    * QuicConnectionStateBase.
234    */
QuicStreamStateQuicStreamState235   QuicStreamState(QuicConnectionStateBase& connIn, QuicStreamState&& other)
236       : QuicStreamLike(std::move(other)), conn(connIn), id(other.id) {
237     // QuicStreamState fields
238     finalWriteOffset = other.finalWriteOffset;
239     flowControlState = other.flowControlState;
240     streamReadError = other.streamReadError;
241     streamWriteError = other.streamWriteError;
242     sendState = other.sendState;
243     recvState = other.recvState;
244     isControl = other.isControl;
245     lastHolbTime = other.lastHolbTime;
246     totalHolbTime = other.totalHolbTime;
247     holbCount = other.holbCount;
248     priority = other.priority;
249     dsrSender = std::move(other.dsrSender);
250     writeBufMeta = other.writeBufMeta;
251     retransmissionBufMetas = std::move(other.retransmissionBufMetas);
252     lossBufMetas = std::move(other.lossBufMetas);
253   }
254 
255   // Connection that this stream is associated with.
256   QuicConnectionStateBase& conn;
257 
258   // Stream id of the connection.
259   StreamId id;
260 
261   // Write side eof offset. This represents only the final FIN offset.
262   folly::Optional<uint64_t> finalWriteOffset;
263 
264   struct StreamFlowControlState {
265     uint64_t windowSize{0};
266     uint64_t advertisedMaxOffset{0};
267     uint64_t peerAdvertisedMaxOffset{0};
268     // Time at which the last flow control update was sent by the transport.
269     folly::Optional<TimePoint> timeOfLastFlowControlUpdate;
270   };
271 
272   StreamFlowControlState flowControlState;
273 
274   // Stream level read error occured.
275   folly::Optional<QuicErrorCode> streamReadError;
276   // Stream level write error occured.
277   folly::Optional<QuicErrorCode> streamWriteError;
278 
279   // State machine data
280   StreamSendState sendState{StreamSendState::Open};
281 
282   // State machine data
283   StreamRecvState recvState{StreamRecvState::Open};
284 
285   // Tells whether this stream is a control stream.
286   // It is set by the app via setControlStream and the transport can use this
287   // knowledge for optimizations e.g. for setting the app limited state on
288   // congestion control with control streams still active.
289   bool isControl{false};
290 
291   // The last time we detected we were head of line blocked on the stream.
292   folly::Optional<Clock::time_point> lastHolbTime;
293 
294   // The total amount of time we are head line blocked on the stream.
295   std::chrono::microseconds totalHolbTime{0us};
296 
297   // Number of times the stream has entered the HOLB state
298   // lastHolbTime indicates whether the stream is HOL blocked at the moment.
299   uint32_t holbCount{0};
300 
301   Priority priority{kDefaultPriority};
302 
303   // Returns true if both send and receive state machines are in a terminal
304   // state
inTerminalStatesQuicStreamState305   bool inTerminalStates() const {
306     bool sendInTerminalState = sendState == StreamSendState::Closed ||
307         sendState == StreamSendState::Invalid;
308 
309     bool recvInTerminalState = recvState == StreamRecvState::Closed ||
310         recvState == StreamRecvState::Invalid;
311 
312     return sendInTerminalState && recvInTerminalState;
313   }
314 
315   // If the stream is still writable.
writableQuicStreamState316   bool writable() const {
317     return sendState == StreamSendState::Open && !finalWriteOffset.has_value();
318   }
319 
shouldSendFlowControlQuicStreamState320   bool shouldSendFlowControl() const {
321     return recvState == StreamRecvState::Open;
322   }
323 
324   // If the stream has writable data that's not backed by DSR. That is, in a
325   // regular stream write, it will be able to write something. So it either
326   // needs to have writeBuffer, or it has EOF to send.
hasWritableDataQuicStreamState327   bool hasWritableData() const {
328     if (!writeBuffer.empty()) {
329       return flowControlState.peerAdvertisedMaxOffset - currentWriteOffset > 0;
330     }
331     if (finalWriteOffset) {
332       /**
333        * This is the case that EOF/FIN is the only thing we can write in a
334        * non-DSR write for a stream. It's actually OK to send out a FIN with
335        * correct offset before we send out DSRed bytes. Peer is supposed to be
336        * able to handle this. But it's also not hard to limit it. So here i'm
337        * gonna go with the safer path: do not write FIN only stream frame if we
338        * still have BufMetas to send.
339        */
340       return writeBufMeta.length == 0 &&
341           currentWriteOffset <= *finalWriteOffset &&
342           writeBufMeta.offset <= *finalWriteOffset;
343     }
344     return false;
345   }
346 
hasWritableBufMetaQuicStreamState347   FOLLY_NODISCARD bool hasWritableBufMeta() const {
348     if (writeBufMeta.offset == 0) {
349       return false;
350     }
351     if (writeBufMeta.length > 0) {
352       return flowControlState.peerAdvertisedMaxOffset - writeBufMeta.offset > 0;
353     }
354     if (finalWriteOffset) {
355       return writeBufMeta.offset <= *finalWriteOffset;
356     }
357     return false;
358   }
359 
hasSentFINQuicStreamState360   FOLLY_NODISCARD bool hasSentFIN() const {
361     if (!finalWriteOffset) {
362       return false;
363     }
364     return currentWriteOffset > *finalWriteOffset ||
365         writeBufMeta.offset > *finalWriteOffset;
366   }
367 
nextOffsetToWriteQuicStreamState368   FOLLY_NODISCARD uint64_t nextOffsetToWrite() const {
369     // The stream has never had WriteBufferMetas. Then currentWriteOffset
370     // always points to the next offset we send. This of course relies on the
371     // current contract of DSR: Real data always comes first. This code (and a
372     // lot other code) breaks when that contract is breached.
373     if (writeBufMeta.offset == 0) {
374       return currentWriteOffset;
375     }
376     if (!writeBuffer.empty()) {
377       return currentWriteOffset;
378     }
379     return writeBufMeta.offset;
380   }
381 
hasReadableDataQuicStreamState382   bool hasReadableData() const {
383     return (readBuffer.size() > 0 &&
384             currentReadOffset == readBuffer.front().offset) ||
385         (finalReadOffset && currentReadOffset == *finalReadOffset);
386   }
387 
hasPeekableDataQuicStreamState388   bool hasPeekableData() const {
389     return readBuffer.size() > 0;
390   }
391 
392   std::unique_ptr<DSRPacketizationRequestSender> dsrSender;
393 
394   // BufferMeta that has been writen to the QUIC layer.
395   // When offset is 0, nothing has been written to it. On first write, its
396   // starting offset will be currentWriteOffset + writeBuffer.chainLength().
397   WriteBufferMeta writeBufMeta;
398 
399   // A map to store sent WriteBufferMetas for potential retransmission.
400   folly::F14FastMap<uint64_t, WriteBufferMeta> retransmissionBufMetas;
401 
402   // WriteBufferMetas that's already marked lost. They will be retransmitted.
403   std::deque<WriteBufferMeta> lossBufMetas;
404 
405   /**
406    * Insert a new WriteBufferMeta into lossBufMetas. If the new WriteBufferMeta
407    * can be append to an existing WriteBufferMeta, it will be appended. Note
408    * it won't be prepended to an existing WriteBufferMeta. And it will also not
409    * merge 3 WriteBufferMetas together if the new one happens to fill up a hole
410    * between 2 existing WriteBufferMetas.
411    */
insertIntoLossBufMetaQuicStreamState412   void insertIntoLossBufMeta(WriteBufferMeta bufMeta) {
413     auto lossItr = std::upper_bound(
414         lossBufMetas.begin(),
415         lossBufMetas.end(),
416         bufMeta.offset,
417         [](auto offset, const auto& bufMeta) {
418           return offset < bufMeta.offset;
419         });
420     if (!lossBufMetas.empty() && lossItr != lossBufMetas.begin() &&
421         std::prev(lossItr)->offset + std::prev(lossItr)->length ==
422             bufMeta.offset) {
423       std::prev(lossItr)->length += bufMeta.length;
424       std::prev(lossItr)->eof = bufMeta.eof;
425     } else {
426       lossBufMetas.insert(lossItr, bufMeta);
427     }
428   }
429 };
430 } // namespace quic
431