1 // Copyright (c) 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/third_party/quiche/src/quic/core/quic_stream_sequencer_buffer.h"
6
7 #include <algorithm>
8 #include <cstddef>
9 #include <memory>
10 #include <string>
11
12 #include "absl/strings/string_view.h"
13 #include "net/third_party/quiche/src/quic/core/quic_constants.h"
14 #include "net/third_party/quiche/src/quic/core/quic_interval.h"
15 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
16 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
17 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
18 #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
19 #include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
20
21 namespace quic {
22 namespace {
23
CalculateBlockCount(size_t max_capacity_bytes)24 size_t CalculateBlockCount(size_t max_capacity_bytes) {
25 return (max_capacity_bytes + QuicStreamSequencerBuffer::kBlockSizeBytes - 1) /
26 QuicStreamSequencerBuffer::kBlockSizeBytes;
27 }
28
29 // Upper limit of how many gaps allowed in buffer, which ensures a reasonable
30 // number of iterations needed to find the right gap to fill when a frame
31 // arrives.
32 const size_t kMaxNumDataIntervalsAllowed = 2 * kMaxPacketGap;
33
34 // Number of blocks allocated initially.
35 constexpr size_t kInitialBlockCount = 8u;
36
37 // How fast block pointers container grow in size.
38 // Choose 4 to reduce the amount of reallocation.
39 constexpr int kBlocksGrowthFactor = 4;
40
41 } // namespace
42
QuicStreamSequencerBuffer(size_t max_capacity_bytes)43 QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
44 : max_buffer_capacity_bytes_(max_capacity_bytes),
45 max_blocks_count_(CalculateBlockCount(max_capacity_bytes)),
46 current_blocks_count_(0u),
47 total_bytes_read_(0),
48 blocks_(nullptr) {
49 if (allocate_blocks_on_demand_) {
50 DCHECK_GE(max_blocks_count_, kInitialBlockCount);
51 }
52 Clear();
53 }
54
~QuicStreamSequencerBuffer()55 QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() {
56 Clear();
57 }
58
Clear()59 void QuicStreamSequencerBuffer::Clear() {
60 if (blocks_ != nullptr) {
61 size_t blocks_to_clear =
62 allocate_blocks_on_demand_ ? current_blocks_count_ : max_blocks_count_;
63 for (size_t i = 0; i < blocks_to_clear; ++i) {
64 if (blocks_[i] != nullptr) {
65 RetireBlock(i);
66 }
67 }
68 }
69 num_bytes_buffered_ = 0;
70 bytes_received_.Clear();
71 bytes_received_.Add(0, total_bytes_read_);
72 }
73
RetireBlock(size_t index)74 bool QuicStreamSequencerBuffer::RetireBlock(size_t index) {
75 if (blocks_[index] == nullptr) {
76 QUIC_BUG << "Try to retire block twice";
77 return false;
78 }
79 delete blocks_[index];
80 blocks_[index] = nullptr;
81 QUIC_DVLOG(1) << "Retired block with index: " << index;
82 return true;
83 }
84
MaybeAddMoreBlocks(size_t next_expected_byte)85 void QuicStreamSequencerBuffer::MaybeAddMoreBlocks(size_t next_expected_byte) {
86 if (current_blocks_count_ == max_blocks_count_) {
87 return;
88 }
89 size_t last_byte = next_expected_byte - 1;
90 size_t num_of_blocks_needed;
91 // As long as last_byte does not wrap around, its index plus one blocks are
92 // needed. Otherwise, block_count_ blocks are needed.
93 if (last_byte < max_buffer_capacity_bytes_) {
94 num_of_blocks_needed =
95 std::max(GetBlockIndex(last_byte) + 1, kInitialBlockCount);
96 } else {
97 num_of_blocks_needed = max_blocks_count_;
98 }
99 if (current_blocks_count_ >= num_of_blocks_needed) {
100 return;
101 }
102 size_t new_block_count = kBlocksGrowthFactor * current_blocks_count_;
103 new_block_count = std::min(std::max(new_block_count, num_of_blocks_needed),
104 max_blocks_count_);
105 auto new_blocks = std::make_unique<BufferBlock*[]>(new_block_count);
106 if (blocks_ != nullptr) {
107 memcpy(new_blocks.get(), blocks_.get(),
108 current_blocks_count_ * sizeof(BufferBlock*));
109 }
110 blocks_ = std::move(new_blocks);
111 current_blocks_count_ = new_block_count;
112 }
113
OnStreamData(QuicStreamOffset starting_offset,absl::string_view data,size_t * const bytes_buffered,std::string * error_details)114 QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
115 QuicStreamOffset starting_offset,
116 absl::string_view data,
117 size_t* const bytes_buffered,
118 std::string* error_details) {
119 *bytes_buffered = 0;
120 size_t size = data.size();
121 if (size == 0) {
122 *error_details = "Received empty stream frame without FIN.";
123 return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
124 }
125 // Write beyond the current range this buffer is covering.
126 if (starting_offset + size > total_bytes_read_ + max_buffer_capacity_bytes_ ||
127 starting_offset + size < starting_offset) {
128 *error_details = "Received data beyond available range.";
129 return QUIC_INTERNAL_ERROR;
130 }
131 if (allocate_blocks_on_demand_) {
132 QUIC_RELOADABLE_FLAG_COUNT(
133 quic_allocate_stream_sequencer_buffer_blocks_on_demand);
134 MaybeAddMoreBlocks(starting_offset + size);
135 }
136
137 if (bytes_received_.Empty() ||
138 starting_offset >= bytes_received_.rbegin()->max() ||
139 bytes_received_.IsDisjoint(QuicInterval<QuicStreamOffset>(
140 starting_offset, starting_offset + size))) {
141 // Optimization for the typical case, when all data is newly received.
142 bytes_received_.AddOptimizedForAppend(starting_offset,
143 starting_offset + size);
144 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
145 // This frame is going to create more intervals than allowed. Stop
146 // processing.
147 *error_details = "Too many data intervals received for this stream.";
148 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
149 }
150
151 size_t bytes_copy = 0;
152 if (!CopyStreamData(starting_offset, data, &bytes_copy, error_details)) {
153 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
154 }
155 *bytes_buffered += bytes_copy;
156 num_bytes_buffered_ += *bytes_buffered;
157 return QUIC_NO_ERROR;
158 }
159 // Slow path, received data overlaps with received data.
160 QuicIntervalSet<QuicStreamOffset> newly_received(starting_offset,
161 starting_offset + size);
162 newly_received.Difference(bytes_received_);
163 if (newly_received.Empty()) {
164 return QUIC_NO_ERROR;
165 }
166 bytes_received_.Add(starting_offset, starting_offset + size);
167 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
168 // This frame is going to create more intervals than allowed. Stop
169 // processing.
170 *error_details = "Too many data intervals received for this stream.";
171 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
172 }
173 for (const auto& interval : newly_received) {
174 const QuicStreamOffset copy_offset = interval.min();
175 const QuicByteCount copy_length = interval.max() - interval.min();
176 size_t bytes_copy = 0;
177 if (!CopyStreamData(copy_offset,
178 data.substr(copy_offset - starting_offset, copy_length),
179 &bytes_copy, error_details)) {
180 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
181 }
182 *bytes_buffered += bytes_copy;
183 }
184 num_bytes_buffered_ += *bytes_buffered;
185 return QUIC_NO_ERROR;
186 }
187
CopyStreamData(QuicStreamOffset offset,absl::string_view data,size_t * bytes_copy,std::string * error_details)188 bool QuicStreamSequencerBuffer::CopyStreamData(QuicStreamOffset offset,
189 absl::string_view data,
190 size_t* bytes_copy,
191 std::string* error_details) {
192 *bytes_copy = 0;
193 size_t source_remaining = data.size();
194 if (source_remaining == 0) {
195 return true;
196 }
197 const char* source = data.data();
198 // Write data block by block. If corresponding block has not created yet,
199 // create it first.
200 // Stop when all data are written or reaches the logical end of the buffer.
201 while (source_remaining > 0) {
202 const size_t write_block_num = GetBlockIndex(offset);
203 const size_t write_block_offset = GetInBlockOffset(offset);
204 size_t current_blocks_count =
205 allocate_blocks_on_demand_ ? current_blocks_count_ : max_blocks_count_;
206 DCHECK_GT(current_blocks_count, write_block_num);
207
208 size_t block_capacity = GetBlockCapacity(write_block_num);
209 size_t bytes_avail = block_capacity - write_block_offset;
210
211 // If this write meets the upper boundary of the buffer,
212 // reduce the available free bytes.
213 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
214 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
215 }
216
217 if (!allocate_blocks_on_demand_) {
218 if (blocks_ == nullptr) {
219 blocks_.reset(new BufferBlock*[max_blocks_count_]());
220 for (size_t i = 0; i < max_blocks_count_; ++i) {
221 blocks_[i] = nullptr;
222 }
223 }
224 }
225
226 if (write_block_num >= current_blocks_count) {
227 *error_details = quiche::QuicheStrCat(
228 "QuicStreamSequencerBuffer error: OnStreamData() exceed array bounds."
229 "write offset = ",
230 offset, " write_block_num = ", write_block_num,
231 " current_blocks_count_ = ", current_blocks_count);
232 return false;
233 }
234 if (blocks_ == nullptr) {
235 *error_details =
236 "QuicStreamSequencerBuffer error: OnStreamData() blocks_ is null";
237 return false;
238 }
239 if (blocks_[write_block_num] == nullptr) {
240 // TODO(danzh): Investigate if using a freelist would improve performance.
241 // Same as RetireBlock().
242 blocks_[write_block_num] = new BufferBlock();
243 }
244
245 const size_t bytes_to_copy =
246 std::min<size_t>(bytes_avail, source_remaining);
247 char* dest = blocks_[write_block_num]->buffer + write_block_offset;
248 QUIC_DVLOG(1) << "Write at offset: " << offset
249 << " length: " << bytes_to_copy;
250
251 if (dest == nullptr || source == nullptr) {
252 *error_details = quiche::QuicheStrCat(
253 "QuicStreamSequencerBuffer error: OnStreamData()"
254 " dest == nullptr: ",
255 (dest == nullptr), " source == nullptr: ", (source == nullptr),
256 " Writing at offset ", offset,
257 " Received frames: ", ReceivedFramesDebugString(),
258 " total_bytes_read_ = ", total_bytes_read_);
259 return false;
260 }
261 memcpy(dest, source, bytes_to_copy);
262 source += bytes_to_copy;
263 source_remaining -= bytes_to_copy;
264 offset += bytes_to_copy;
265 *bytes_copy += bytes_to_copy;
266 }
267 return true;
268 }
269
Readv(const iovec * dest_iov,size_t dest_count,size_t * bytes_read,std::string * error_details)270 QuicErrorCode QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
271 size_t dest_count,
272 size_t* bytes_read,
273 std::string* error_details) {
274 *bytes_read = 0;
275 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
276 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
277 DCHECK(dest != nullptr);
278 size_t dest_remaining = dest_iov[i].iov_len;
279 while (dest_remaining > 0 && ReadableBytes() > 0) {
280 size_t block_idx = NextBlockToRead();
281 size_t start_offset_in_block = ReadOffset();
282 size_t block_capacity = GetBlockCapacity(block_idx);
283 size_t bytes_available_in_block = std::min<size_t>(
284 ReadableBytes(), block_capacity - start_offset_in_block);
285 size_t bytes_to_copy =
286 std::min<size_t>(bytes_available_in_block, dest_remaining);
287 DCHECK_GT(bytes_to_copy, 0u);
288 if (blocks_[block_idx] == nullptr || dest == nullptr) {
289 *error_details = quiche::QuicheStrCat(
290 "QuicStreamSequencerBuffer error:"
291 " Readv() dest == nullptr: ",
292 (dest == nullptr), " blocks_[", block_idx,
293 "] == nullptr: ", (blocks_[block_idx] == nullptr),
294 " Received frames: ", ReceivedFramesDebugString(),
295 " total_bytes_read_ = ", total_bytes_read_);
296 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
297 }
298 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
299 bytes_to_copy);
300 dest += bytes_to_copy;
301 dest_remaining -= bytes_to_copy;
302 num_bytes_buffered_ -= bytes_to_copy;
303 total_bytes_read_ += bytes_to_copy;
304 *bytes_read += bytes_to_copy;
305
306 // Retire the block if all the data is read out and no other data is
307 // stored in this block.
308 // In case of failing to retire a block which is ready to retire, return
309 // immediately.
310 if (bytes_to_copy == bytes_available_in_block) {
311 bool retire_successfully = RetireBlockIfEmpty(block_idx);
312 if (!retire_successfully) {
313 *error_details = quiche::QuicheStrCat(
314 "QuicStreamSequencerBuffer error: fail to retire block ",
315 block_idx,
316 " as the block is already released, total_bytes_read_ = ",
317 total_bytes_read_,
318 " Received frames: ", ReceivedFramesDebugString());
319 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
320 }
321 }
322 }
323 }
324
325 return QUIC_NO_ERROR;
326 }
327
GetReadableRegions(struct iovec * iov,int iov_len) const328 int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
329 int iov_len) const {
330 DCHECK(iov != nullptr);
331 DCHECK_GT(iov_len, 0);
332
333 if (ReadableBytes() == 0) {
334 iov[0].iov_base = nullptr;
335 iov[0].iov_len = 0;
336 return 0;
337 }
338
339 size_t start_block_idx = NextBlockToRead();
340 QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
341 DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
342 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
343 size_t end_block_idx = GetBlockIndex(readable_offset_end);
344
345 // If readable region is within one block, deal with it seperately.
346 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
347 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
348 iov[0].iov_len = ReadableBytes();
349 QUIC_DVLOG(1) << "Got only a single block with index: " << start_block_idx;
350 return 1;
351 }
352
353 // Get first block
354 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
355 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
356 QUIC_DVLOG(1) << "Got first block " << start_block_idx << " with len "
357 << iov[0].iov_len;
358 DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
359 << "there should be more available data";
360
361 // Get readable regions of the rest blocks till either 2nd to last block
362 // before gap is met or |iov| is filled. For these blocks, one whole block is
363 // a region.
364 int iov_used = 1;
365 size_t block_idx = (start_block_idx + iov_used) % max_blocks_count_;
366 while (block_idx != end_block_idx && iov_used < iov_len) {
367 DCHECK(nullptr != blocks_[block_idx]);
368 iov[iov_used].iov_base = blocks_[block_idx]->buffer;
369 iov[iov_used].iov_len = GetBlockCapacity(block_idx);
370 QUIC_DVLOG(1) << "Got block with index: " << block_idx;
371 ++iov_used;
372 block_idx = (start_block_idx + iov_used) % max_blocks_count_;
373 }
374
375 // Deal with last block if |iov| can hold more.
376 if (iov_used < iov_len) {
377 DCHECK(nullptr != blocks_[block_idx]);
378 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
379 iov[iov_used].iov_len = end_block_offset + 1;
380 QUIC_DVLOG(1) << "Got last block with index: " << end_block_idx;
381 ++iov_used;
382 }
383 return iov_used;
384 }
385
GetReadableRegion(iovec * iov) const386 bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov) const {
387 return GetReadableRegions(iov, 1) == 1;
388 }
389
PeekRegion(QuicStreamOffset offset,iovec * iov) const390 bool QuicStreamSequencerBuffer::PeekRegion(QuicStreamOffset offset,
391 iovec* iov) const {
392 DCHECK(iov);
393
394 if (offset < total_bytes_read_) {
395 // Data at |offset| has already been consumed.
396 return false;
397 }
398
399 if (offset >= FirstMissingByte()) {
400 // Data at |offset| has not been received yet.
401 return false;
402 }
403
404 // Beginning of region.
405 size_t block_idx = GetBlockIndex(offset);
406 size_t block_offset = GetInBlockOffset(offset);
407 iov->iov_base = blocks_[block_idx]->buffer + block_offset;
408
409 // Determine if entire block has been received.
410 size_t end_block_idx = GetBlockIndex(FirstMissingByte());
411 if (block_idx == end_block_idx) {
412 // Only read part of block before FirstMissingByte().
413 iov->iov_len = GetInBlockOffset(FirstMissingByte()) - block_offset;
414 } else {
415 // Read entire block.
416 iov->iov_len = GetBlockCapacity(block_idx) - block_offset;
417 }
418
419 return true;
420 }
421
MarkConsumed(size_t bytes_consumed)422 bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_consumed) {
423 if (bytes_consumed > ReadableBytes()) {
424 return false;
425 }
426 size_t bytes_to_consume = bytes_consumed;
427 while (bytes_to_consume > 0) {
428 size_t block_idx = NextBlockToRead();
429 size_t offset_in_block = ReadOffset();
430 size_t bytes_available = std::min<size_t>(
431 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
432 size_t bytes_read = std::min<size_t>(bytes_to_consume, bytes_available);
433 total_bytes_read_ += bytes_read;
434 num_bytes_buffered_ -= bytes_read;
435 bytes_to_consume -= bytes_read;
436 // If advanced to the end of current block and end of buffer hasn't wrapped
437 // to this block yet.
438 if (bytes_available == bytes_read) {
439 RetireBlockIfEmpty(block_idx);
440 }
441 }
442
443 return true;
444 }
445
FlushBufferedFrames()446 size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
447 size_t prev_total_bytes_read = total_bytes_read_;
448 total_bytes_read_ = NextExpectedByte();
449 Clear();
450 return total_bytes_read_ - prev_total_bytes_read;
451 }
452
ReleaseWholeBuffer()453 void QuicStreamSequencerBuffer::ReleaseWholeBuffer() {
454 Clear();
455 current_blocks_count_ = 0;
456 blocks_.reset(nullptr);
457 }
458
ReadableBytes() const459 size_t QuicStreamSequencerBuffer::ReadableBytes() const {
460 return FirstMissingByte() - total_bytes_read_;
461 }
462
HasBytesToRead() const463 bool QuicStreamSequencerBuffer::HasBytesToRead() const {
464 return ReadableBytes() > 0;
465 }
466
BytesConsumed() const467 QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
468 return total_bytes_read_;
469 }
470
BytesBuffered() const471 size_t QuicStreamSequencerBuffer::BytesBuffered() const {
472 return num_bytes_buffered_;
473 }
474
GetBlockIndex(QuicStreamOffset offset) const475 size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
476 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
477 }
478
GetInBlockOffset(QuicStreamOffset offset) const479 size_t QuicStreamSequencerBuffer::GetInBlockOffset(
480 QuicStreamOffset offset) const {
481 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
482 }
483
ReadOffset() const484 size_t QuicStreamSequencerBuffer::ReadOffset() const {
485 return GetInBlockOffset(total_bytes_read_);
486 }
487
NextBlockToRead() const488 size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
489 return GetBlockIndex(total_bytes_read_);
490 }
491
RetireBlockIfEmpty(size_t block_index)492 bool QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
493 DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0)
494 << "RetireBlockIfEmpty() should only be called when advancing to next "
495 << "block or a gap has been reached.";
496 // If the whole buffer becomes empty, the last piece of data has been read.
497 if (Empty()) {
498 return RetireBlock(block_index);
499 }
500
501 // Check where the logical end of this buffer is.
502 // Not empty if the end of circular buffer has been wrapped to this block.
503 if (GetBlockIndex(NextExpectedByte() - 1) == block_index) {
504 return true;
505 }
506
507 // Read index remains in this block, which means a gap has been reached.
508 if (NextBlockToRead() == block_index) {
509 if (bytes_received_.Size() > 1) {
510 auto it = bytes_received_.begin();
511 ++it;
512 if (GetBlockIndex(it->min()) == block_index) {
513 // Do not retire the block if next data interval is in this block.
514 return true;
515 }
516 } else {
517 QUIC_BUG << "Read stopped at where it shouldn't.";
518 return false;
519 }
520 }
521 return RetireBlock(block_index);
522 }
523
Empty() const524 bool QuicStreamSequencerBuffer::Empty() const {
525 return bytes_received_.Empty() ||
526 (bytes_received_.Size() == 1 && total_bytes_read_ > 0 &&
527 bytes_received_.begin()->max() == total_bytes_read_);
528 }
529
GetBlockCapacity(size_t block_index) const530 size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
531 if ((block_index + 1) == max_blocks_count_) {
532 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
533 if (result == 0) { // whole block
534 result = kBlockSizeBytes;
535 }
536 return result;
537 } else {
538 return kBlockSizeBytes;
539 }
540 }
541
ReceivedFramesDebugString() const542 std::string QuicStreamSequencerBuffer::ReceivedFramesDebugString() const {
543 return bytes_received_.ToString();
544 }
545
FirstMissingByte() const546 QuicStreamOffset QuicStreamSequencerBuffer::FirstMissingByte() const {
547 if (bytes_received_.Empty() || bytes_received_.begin()->min() > 0) {
548 // Offset 0 is not received yet.
549 return 0;
550 }
551 return bytes_received_.begin()->max();
552 }
553
NextExpectedByte() const554 QuicStreamOffset QuicStreamSequencerBuffer::NextExpectedByte() const {
555 if (bytes_received_.Empty()) {
556 return 0;
557 }
558 return bytes_received_.rbegin()->max();
559 }
560
561 } // namespace quic
562