1 // Copyright (c) 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "net/third_party/quiche/src/quic/core/quic_stream_sequencer_buffer.h"
6 
7 #include <string>
8 
9 #include "net/third_party/quiche/src/quic/core/quic_constants.h"
10 #include "net/third_party/quiche/src/quic/core/quic_interval.h"
11 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
12 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
13 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
14 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
15 #include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
16 #include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
17 
18 namespace quic {
19 namespace {
20 
CalculateBlockCount(size_t max_capacity_bytes)21 size_t CalculateBlockCount(size_t max_capacity_bytes) {
22   return (max_capacity_bytes + QuicStreamSequencerBuffer::kBlockSizeBytes - 1) /
23          QuicStreamSequencerBuffer::kBlockSizeBytes;
24 }
25 
26 // Upper limit of how many gaps allowed in buffer, which ensures a reasonable
27 // number of iterations needed to find the right gap to fill when a frame
28 // arrives.
29 const size_t kMaxNumDataIntervalsAllowed = 2 * kMaxPacketGap;
30 
31 }  // namespace
32 
QuicStreamSequencerBuffer(size_t max_capacity_bytes)33 QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
34     : max_buffer_capacity_bytes_(max_capacity_bytes),
35       blocks_count_(CalculateBlockCount(max_capacity_bytes)),
36       total_bytes_read_(0),
37       blocks_(nullptr) {
38   Clear();
39 }
40 
~QuicStreamSequencerBuffer()41 QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() {
42   Clear();
43 }
44 
Clear()45 void QuicStreamSequencerBuffer::Clear() {
46   if (blocks_ != nullptr) {
47     for (size_t i = 0; i < blocks_count_; ++i) {
48       if (blocks_[i] != nullptr) {
49         RetireBlock(i);
50       }
51     }
52   }
53   num_bytes_buffered_ = 0;
54   bytes_received_.Clear();
55   bytes_received_.Add(0, total_bytes_read_);
56 }
57 
RetireBlock(size_t index)58 bool QuicStreamSequencerBuffer::RetireBlock(size_t index) {
59   if (blocks_[index] == nullptr) {
60     QUIC_BUG << "Try to retire block twice";
61     return false;
62   }
63   delete blocks_[index];
64   blocks_[index] = nullptr;
65   QUIC_DVLOG(1) << "Retired block with index: " << index;
66   return true;
67 }
68 
OnStreamData(QuicStreamOffset starting_offset,quiche::QuicheStringPiece data,size_t * const bytes_buffered,std::string * error_details)69 QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
70     QuicStreamOffset starting_offset,
71     quiche::QuicheStringPiece data,
72     size_t* const bytes_buffered,
73     std::string* error_details) {
74   *bytes_buffered = 0;
75   size_t size = data.size();
76   if (size == 0) {
77     *error_details = "Received empty stream frame without FIN.";
78     return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
79   }
80   // Write beyond the current range this buffer is covering.
81   if (starting_offset + size > total_bytes_read_ + max_buffer_capacity_bytes_ ||
82       starting_offset + size < starting_offset) {
83     *error_details = "Received data beyond available range.";
84     return QUIC_INTERNAL_ERROR;
85   }
86   if (bytes_received_.Empty() ||
87       starting_offset >= bytes_received_.rbegin()->max() ||
88       bytes_received_.IsDisjoint(QuicInterval<QuicStreamOffset>(
89           starting_offset, starting_offset + size))) {
90     // Optimization for the typical case, when all data is newly received.
91     bytes_received_.AddOptimizedForAppend(starting_offset,
92                                           starting_offset + size);
93     if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
94       // This frame is going to create more intervals than allowed. Stop
95       // processing.
96       *error_details = "Too many data intervals received for this stream.";
97       return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
98     }
99 
100     size_t bytes_copy = 0;
101     if (!CopyStreamData(starting_offset, data, &bytes_copy, error_details)) {
102       return QUIC_STREAM_SEQUENCER_INVALID_STATE;
103     }
104     *bytes_buffered += bytes_copy;
105     num_bytes_buffered_ += *bytes_buffered;
106     return QUIC_NO_ERROR;
107   }
108   // Slow path, received data overlaps with received data.
109   QuicIntervalSet<QuicStreamOffset> newly_received(starting_offset,
110                                                    starting_offset + size);
111   newly_received.Difference(bytes_received_);
112   if (newly_received.Empty()) {
113     return QUIC_NO_ERROR;
114   }
115   bytes_received_.Add(starting_offset, starting_offset + size);
116   if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
117     // This frame is going to create more intervals than allowed. Stop
118     // processing.
119     *error_details = "Too many data intervals received for this stream.";
120     return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
121   }
122   for (const auto& interval : newly_received) {
123     const QuicStreamOffset copy_offset = interval.min();
124     const QuicByteCount copy_length = interval.max() - interval.min();
125     size_t bytes_copy = 0;
126     if (!CopyStreamData(copy_offset,
127                         data.substr(copy_offset - starting_offset, copy_length),
128                         &bytes_copy, error_details)) {
129       return QUIC_STREAM_SEQUENCER_INVALID_STATE;
130     }
131     *bytes_buffered += bytes_copy;
132   }
133   num_bytes_buffered_ += *bytes_buffered;
134   return QUIC_NO_ERROR;
135 }
136 
CopyStreamData(QuicStreamOffset offset,quiche::QuicheStringPiece data,size_t * bytes_copy,std::string * error_details)137 bool QuicStreamSequencerBuffer::CopyStreamData(QuicStreamOffset offset,
138                                                quiche::QuicheStringPiece data,
139                                                size_t* bytes_copy,
140                                                std::string* error_details) {
141   *bytes_copy = 0;
142   size_t source_remaining = data.size();
143   if (source_remaining == 0) {
144     return true;
145   }
146   const char* source = data.data();
147   // Write data block by block. If corresponding block has not created yet,
148   // create it first.
149   // Stop when all data are written or reaches the logical end of the buffer.
150   while (source_remaining > 0) {
151     const size_t write_block_num = GetBlockIndex(offset);
152     const size_t write_block_offset = GetInBlockOffset(offset);
153     DCHECK_GT(blocks_count_, write_block_num);
154 
155     size_t block_capacity = GetBlockCapacity(write_block_num);
156     size_t bytes_avail = block_capacity - write_block_offset;
157 
158     // If this write meets the upper boundary of the buffer,
159     // reduce the available free bytes.
160     if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
161       bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
162     }
163 
164     if (blocks_ == nullptr) {
165       blocks_.reset(new BufferBlock*[blocks_count_]());
166       for (size_t i = 0; i < blocks_count_; ++i) {
167         blocks_[i] = nullptr;
168       }
169     }
170 
171     if (write_block_num >= blocks_count_) {
172       *error_details = quiche::QuicheStrCat(
173           "QuicStreamSequencerBuffer error: OnStreamData() exceed array bounds."
174           "write offset = ",
175           offset, " write_block_num = ", write_block_num,
176           " blocks_count_ = ", blocks_count_);
177       return false;
178     }
179     if (blocks_ == nullptr) {
180       *error_details =
181           "QuicStreamSequencerBuffer error: OnStreamData() blocks_ is null";
182       return false;
183     }
184     if (blocks_[write_block_num] == nullptr) {
185       // TODO(danzh): Investigate if using a freelist would improve performance.
186       // Same as RetireBlock().
187       blocks_[write_block_num] = new BufferBlock();
188     }
189 
190     const size_t bytes_to_copy =
191         std::min<size_t>(bytes_avail, source_remaining);
192     char* dest = blocks_[write_block_num]->buffer + write_block_offset;
193     QUIC_DVLOG(1) << "Write at offset: " << offset
194                   << " length: " << bytes_to_copy;
195 
196     if (dest == nullptr || source == nullptr) {
197       *error_details = quiche::QuicheStrCat(
198           "QuicStreamSequencerBuffer error: OnStreamData()"
199           " dest == nullptr: ",
200           (dest == nullptr), " source == nullptr: ", (source == nullptr),
201           " Writing at offset ", offset,
202           " Received frames: ", ReceivedFramesDebugString(),
203           " total_bytes_read_ = ", total_bytes_read_);
204       return false;
205     }
206     memcpy(dest, source, bytes_to_copy);
207     source += bytes_to_copy;
208     source_remaining -= bytes_to_copy;
209     offset += bytes_to_copy;
210     *bytes_copy += bytes_to_copy;
211   }
212   return true;
213 }
214 
Readv(const iovec * dest_iov,size_t dest_count,size_t * bytes_read,std::string * error_details)215 QuicErrorCode QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
216                                                size_t dest_count,
217                                                size_t* bytes_read,
218                                                std::string* error_details) {
219   *bytes_read = 0;
220   for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
221     char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
222     DCHECK(dest != nullptr);
223     size_t dest_remaining = dest_iov[i].iov_len;
224     while (dest_remaining > 0 && ReadableBytes() > 0) {
225       size_t block_idx = NextBlockToRead();
226       size_t start_offset_in_block = ReadOffset();
227       size_t block_capacity = GetBlockCapacity(block_idx);
228       size_t bytes_available_in_block = std::min<size_t>(
229           ReadableBytes(), block_capacity - start_offset_in_block);
230       size_t bytes_to_copy =
231           std::min<size_t>(bytes_available_in_block, dest_remaining);
232       DCHECK_GT(bytes_to_copy, 0u);
233       if (blocks_[block_idx] == nullptr || dest == nullptr) {
234         *error_details = quiche::QuicheStrCat(
235             "QuicStreamSequencerBuffer error:"
236             " Readv() dest == nullptr: ",
237             (dest == nullptr), " blocks_[", block_idx,
238             "] == nullptr: ", (blocks_[block_idx] == nullptr),
239             " Received frames: ", ReceivedFramesDebugString(),
240             " total_bytes_read_ = ", total_bytes_read_);
241         return QUIC_STREAM_SEQUENCER_INVALID_STATE;
242       }
243       memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
244              bytes_to_copy);
245       dest += bytes_to_copy;
246       dest_remaining -= bytes_to_copy;
247       num_bytes_buffered_ -= bytes_to_copy;
248       total_bytes_read_ += bytes_to_copy;
249       *bytes_read += bytes_to_copy;
250 
251       // Retire the block if all the data is read out and no other data is
252       // stored in this block.
253       // In case of failing to retire a block which is ready to retire, return
254       // immediately.
255       if (bytes_to_copy == bytes_available_in_block) {
256         bool retire_successfully = RetireBlockIfEmpty(block_idx);
257         if (!retire_successfully) {
258           *error_details = quiche::QuicheStrCat(
259               "QuicStreamSequencerBuffer error: fail to retire block ",
260               block_idx,
261               " as the block is already released, total_bytes_read_ = ",
262               total_bytes_read_,
263               " Received frames: ", ReceivedFramesDebugString());
264           return QUIC_STREAM_SEQUENCER_INVALID_STATE;
265         }
266       }
267     }
268   }
269 
270   return QUIC_NO_ERROR;
271 }
272 
GetReadableRegions(struct iovec * iov,int iov_len) const273 int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
274                                                   int iov_len) const {
275   DCHECK(iov != nullptr);
276   DCHECK_GT(iov_len, 0);
277 
278   if (ReadableBytes() == 0) {
279     iov[0].iov_base = nullptr;
280     iov[0].iov_len = 0;
281     return 0;
282   }
283 
284   size_t start_block_idx = NextBlockToRead();
285   QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
286   DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
287   size_t end_block_offset = GetInBlockOffset(readable_offset_end);
288   size_t end_block_idx = GetBlockIndex(readable_offset_end);
289 
290   // If readable region is within one block, deal with it seperately.
291   if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
292     iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
293     iov[0].iov_len = ReadableBytes();
294     QUIC_DVLOG(1) << "Got only a single block with index: " << start_block_idx;
295     return 1;
296   }
297 
298   // Get first block
299   iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
300   iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
301   QUIC_DVLOG(1) << "Got first block " << start_block_idx << " with len "
302                 << iov[0].iov_len;
303   DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
304       << "there should be more available data";
305 
306   // Get readable regions of the rest blocks till either 2nd to last block
307   // before gap is met or |iov| is filled. For these blocks, one whole block is
308   // a region.
309   int iov_used = 1;
310   size_t block_idx = (start_block_idx + iov_used) % blocks_count_;
311   while (block_idx != end_block_idx && iov_used < iov_len) {
312     DCHECK(nullptr != blocks_[block_idx]);
313     iov[iov_used].iov_base = blocks_[block_idx]->buffer;
314     iov[iov_used].iov_len = GetBlockCapacity(block_idx);
315     QUIC_DVLOG(1) << "Got block with index: " << block_idx;
316     ++iov_used;
317     block_idx = (start_block_idx + iov_used) % blocks_count_;
318   }
319 
320   // Deal with last block if |iov| can hold more.
321   if (iov_used < iov_len) {
322     DCHECK(nullptr != blocks_[block_idx]);
323     iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
324     iov[iov_used].iov_len = end_block_offset + 1;
325     QUIC_DVLOG(1) << "Got last block with index: " << end_block_idx;
326     ++iov_used;
327   }
328   return iov_used;
329 }
330 
GetReadableRegion(iovec * iov) const331 bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov) const {
332   return GetReadableRegions(iov, 1) == 1;
333 }
334 
PeekRegion(QuicStreamOffset offset,iovec * iov) const335 bool QuicStreamSequencerBuffer::PeekRegion(QuicStreamOffset offset,
336                                            iovec* iov) const {
337   DCHECK(iov);
338 
339   if (offset < total_bytes_read_) {
340     // Data at |offset| has already been consumed.
341     return false;
342   }
343 
344   if (offset >= FirstMissingByte()) {
345     // Data at |offset| has not been received yet.
346     return false;
347   }
348 
349   // Beginning of region.
350   size_t block_idx = GetBlockIndex(offset);
351   size_t block_offset = GetInBlockOffset(offset);
352   iov->iov_base = blocks_[block_idx]->buffer + block_offset;
353 
354   // Determine if entire block has been received.
355   size_t end_block_idx = GetBlockIndex(FirstMissingByte());
356   if (block_idx == end_block_idx) {
357     // Only read part of block before FirstMissingByte().
358     iov->iov_len = GetInBlockOffset(FirstMissingByte()) - block_offset;
359   } else {
360     // Read entire block.
361     iov->iov_len = GetBlockCapacity(block_idx) - block_offset;
362   }
363 
364   return true;
365 }
366 
MarkConsumed(size_t bytes_consumed)367 bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_consumed) {
368   if (bytes_consumed > ReadableBytes()) {
369     return false;
370   }
371   size_t bytes_to_consume = bytes_consumed;
372   while (bytes_to_consume > 0) {
373     size_t block_idx = NextBlockToRead();
374     size_t offset_in_block = ReadOffset();
375     size_t bytes_available = std::min<size_t>(
376         ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
377     size_t bytes_read = std::min<size_t>(bytes_to_consume, bytes_available);
378     total_bytes_read_ += bytes_read;
379     num_bytes_buffered_ -= bytes_read;
380     bytes_to_consume -= bytes_read;
381     // If advanced to the end of current block and end of buffer hasn't wrapped
382     // to this block yet.
383     if (bytes_available == bytes_read) {
384       RetireBlockIfEmpty(block_idx);
385     }
386   }
387 
388   return true;
389 }
390 
FlushBufferedFrames()391 size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
392   size_t prev_total_bytes_read = total_bytes_read_;
393   total_bytes_read_ = NextExpectedByte();
394   Clear();
395   return total_bytes_read_ - prev_total_bytes_read;
396 }
397 
ReleaseWholeBuffer()398 void QuicStreamSequencerBuffer::ReleaseWholeBuffer() {
399   Clear();
400   blocks_.reset(nullptr);
401 }
402 
ReadableBytes() const403 size_t QuicStreamSequencerBuffer::ReadableBytes() const {
404   return FirstMissingByte() - total_bytes_read_;
405 }
406 
HasBytesToRead() const407 bool QuicStreamSequencerBuffer::HasBytesToRead() const {
408   return ReadableBytes() > 0;
409 }
410 
BytesConsumed() const411 QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
412   return total_bytes_read_;
413 }
414 
BytesBuffered() const415 size_t QuicStreamSequencerBuffer::BytesBuffered() const {
416   return num_bytes_buffered_;
417 }
418 
GetBlockIndex(QuicStreamOffset offset) const419 size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
420   return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
421 }
422 
GetInBlockOffset(QuicStreamOffset offset) const423 size_t QuicStreamSequencerBuffer::GetInBlockOffset(
424     QuicStreamOffset offset) const {
425   return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
426 }
427 
ReadOffset() const428 size_t QuicStreamSequencerBuffer::ReadOffset() const {
429   return GetInBlockOffset(total_bytes_read_);
430 }
431 
NextBlockToRead() const432 size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
433   return GetBlockIndex(total_bytes_read_);
434 }
435 
RetireBlockIfEmpty(size_t block_index)436 bool QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
437   DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0)
438       << "RetireBlockIfEmpty() should only be called when advancing to next "
439       << "block or a gap has been reached.";
440   // If the whole buffer becomes empty, the last piece of data has been read.
441   if (Empty()) {
442     return RetireBlock(block_index);
443   }
444 
445   // Check where the logical end of this buffer is.
446   // Not empty if the end of circular buffer has been wrapped to this block.
447   if (GetBlockIndex(NextExpectedByte() - 1) == block_index) {
448     return true;
449   }
450 
451   // Read index remains in this block, which means a gap has been reached.
452   if (NextBlockToRead() == block_index) {
453     if (bytes_received_.Size() > 1) {
454       auto it = bytes_received_.begin();
455       ++it;
456       if (GetBlockIndex(it->min()) == block_index) {
457         // Do not retire the block if next data interval is in this block.
458         return true;
459       }
460     } else {
461       QUIC_BUG << "Read stopped at where it shouldn't.";
462       return false;
463     }
464   }
465   return RetireBlock(block_index);
466 }
467 
Empty() const468 bool QuicStreamSequencerBuffer::Empty() const {
469   return bytes_received_.Empty() ||
470          (bytes_received_.Size() == 1 && total_bytes_read_ > 0 &&
471           bytes_received_.begin()->max() == total_bytes_read_);
472 }
473 
GetBlockCapacity(size_t block_index) const474 size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
475   if ((block_index + 1) == blocks_count_) {
476     size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
477     if (result == 0) {  // whole block
478       result = kBlockSizeBytes;
479     }
480     return result;
481   } else {
482     return kBlockSizeBytes;
483   }
484 }
485 
ReceivedFramesDebugString() const486 std::string QuicStreamSequencerBuffer::ReceivedFramesDebugString() const {
487   return bytes_received_.ToString();
488 }
489 
FirstMissingByte() const490 QuicStreamOffset QuicStreamSequencerBuffer::FirstMissingByte() const {
491   if (bytes_received_.Empty() || bytes_received_.begin()->min() > 0) {
492     // Offset 0 is not received yet.
493     return 0;
494   }
495   return bytes_received_.begin()->max();
496 }
497 
NextExpectedByte() const498 QuicStreamOffset QuicStreamSequencerBuffer::NextExpectedByte() const {
499   if (bytes_received_.Empty()) {
500     return 0;
501   }
502   return bytes_received_.rbegin()->max();
503 }
504 
505 }  //  namespace quic
506