1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #pragma once
8
9 #include "td/utils/common.h"
10 #include "td/utils/port/thread_local.h"
11 #include "td/utils/Slice.h"
12
13 #include <atomic>
14 #include <limits>
15 #include <memory>
16
17 namespace td {
18
// Header of a reference-counted byte buffer shared between one writer and many readers.
// The struct is over-allocated so that data_ extends data_size_ bytes past the end.
struct BufferRaw {
  explicit BufferRaw(size_t size) : data_size_(size) {
  }
  // Total capacity of data_ in bytes.
  size_t data_size_;

  // Constant after first reader is created.
  // May be changed by writer before it.
  // So writer may do prepends till there is no reader created.
  size_t begin_ = 0;

  // Write by writer.
  // Read by reader.
  std::atomic<size_t> end_{0};

  // Number of outstanding writer/reader pointers; the buffer is freed when it reaches zero.
  mutable std::atomic<int32> ref_cnt_{1};
  // Cleared with release semantics when the writer pointer is destroyed.
  std::atomic<bool> has_writer_{true};
  // Set once a reader is created; after that begin_ must stay constant.
  bool was_reader_{false};

  // Flexible-array-style payload; the actual allocation provides data_size_ bytes here.
  alignas(4) unsigned char data_[1];
};
39
// Allocates BufferRaw buffers and hands out writer/reader smart pointers to them.
class BufferAllocator {
 public:
  // Deleter for the unique writer pointer: marks the buffer writer-less
  // (with release semantics, so readers see all prior writes) and drops one reference.
  class DeleteWriterPtr {
   public:
    void operator()(BufferRaw *ptr) {
      ptr->has_writer_.store(false, std::memory_order_release);
      dec_ref_cnt(ptr);
    }
  };
  // Deleter for reader pointers: only drops one reference.
  class DeleteReaderPtr {
   public:
    void operator()(BufferRaw *ptr) {
      dec_ref_cnt(ptr);
    }
  };

  using WriterPtr = std::unique_ptr<BufferRaw, DeleteWriterPtr>;
  using ReaderPtr = std::unique_ptr<BufferRaw, DeleteReaderPtr>;

  // Creates a writer over a buffer of at least |size| bytes.
  static WriterPtr create_writer(size_t size);

  // Creates a writer with extra space reserved for later prepends and appends.
  static WriterPtr create_writer(size_t size, size_t prepend, size_t append);

  // Creates a reader over a fresh buffer with |size| readable bytes.
  static ReaderPtr create_reader(size_t size);

  // Creates an additional reader for the buffer owned by the given writer.
  static ReaderPtr create_reader(const WriterPtr &raw);

  // Creates an additional reader sharing the buffer of an existing reader.
  static ReaderPtr create_reader(const ReaderPtr &raw);

  // Total memory currently allocated for buffers, in bytes.
  static size_t get_buffer_mem();
  // Aggregate size tracked by alive BufferSlice objects (debug statistic).
  static int64 get_buffer_slice_size();

  // Releases the thread-local cached buffer, if any.
  static void clear_thread_local();

 private:
  friend class BufferSlice;

  // Adds |size| (possibly negative) to the global BufferSlice size statistic.
  static void track_buffer_slice(int64 size);

  // Tries to carve a reader out of the thread-local cached buffer to avoid an allocation.
  static ReaderPtr create_reader_fast(size_t size);

  static WriterPtr create_writer_exact(size_t size);

  struct BufferRawDeleter {
    void operator()(BufferRaw *ptr) {
      dec_ref_cnt(ptr);
    }
  };
  // Per-thread cache holding one buffer, used by create_reader_fast.
  struct BufferRawTls {
    std::unique_ptr<BufferRaw, BufferRawDeleter> buffer_raw;
  };

  static TD_THREAD_LOCAL BufferRawTls *buffer_raw_tls;

  // Decrements the reference count and frees the buffer when it reaches zero.
  static void dec_ref_cnt(BufferRaw *ptr);

  static BufferRaw *create_buffer_raw(size_t size);

  // Total bytes currently allocated for all BufferRaw instances.
  static std::atomic<size_t> buffer_mem;
};
100
101 using BufferWriterPtr = BufferAllocator::WriterPtr;
102 using BufferReaderPtr = BufferAllocator::ReaderPtr;
103
// A movable, non-copyable view into a reference-counted buffer.
// Tracks its byte count in a global debug statistic via debug_track/debug_untrack;
// every mutation of [begin_, end_) is bracketed by an untrack/track pair.
class BufferSlice {
 public:
  BufferSlice() = default;
  // Takes ownership of a reader and covers all currently readable bytes.
  explicit BufferSlice(BufferReaderPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
    if (is_null()) {
      return;
    }
    begin_ = buffer_->begin_;
    end_ = begin_;
    sync_with_writer();  // picks up the writer's current end_ and calls debug_track
  }
  // Covers the explicit range [begin, end) of the underlying buffer.
  BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end)
      : buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) {
    debug_track();
  }
  BufferSlice(const BufferSlice &other) = delete;
  BufferSlice &operator=(const BufferSlice &other) = delete;
  BufferSlice(BufferSlice &&other) noexcept : BufferSlice(std::move(other.buffer_), other.begin_, other.end_) {
    // The delegating constructor has just tracked size(), but these bytes were
    // already tracked by |other|, which is now null and will untrack nothing.
    debug_untrack();  // yes, debug_untrack
  }
  BufferSlice &operator=(BufferSlice &&other) noexcept {
    if (this == &other) {
      return *this;
    }
    // Untrack our old bytes; the bytes taken over from |other| stay tracked on its behalf.
    debug_untrack();
    buffer_ = std::move(other.buffer_);
    begin_ = other.begin_;
    end_ = other.end_;
    return *this;
  }

  // Allocates a fresh buffer with exactly |size| readable bytes.
  explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) {
    end_ = buffer_->end_.load(std::memory_order_relaxed);
    // Assumes create_reader advanced end_ by |size| rounded up to a multiple of 8
    // (note: -8 converted to size_t is ~7); cover the first |size| bytes of that
    // chunk. TODO(review): confirm against BufferAllocator::create_reader.
    begin_ = end_ - ((size + 7) & -8);
    end_ = begin_ + size;
    debug_track();
  }

  // Allocates a buffer and copies |slice| into it.
  explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) {
    as_slice().copy_from(slice);
  }

  BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) {
  }

  ~BufferSlice() {
    debug_untrack();
  }

  // Updates the global BufferSlice size statistic (see BufferAllocator::get_buffer_slice_size).
  void debug_track() const {
    BufferAllocator::track_buffer_slice(static_cast<int64>(size()));
  }
  void debug_untrack() const {
    BufferAllocator::track_buffer_slice(-static_cast<int64>(size()));
  }

  // Returns a slice sharing the same underlying buffer (no copy of the bytes).
  BufferSlice clone() const {
    if (is_null()) {
      return BufferSlice(BufferReaderPtr(), begin_, end_);
    }
    return BufferSlice(BufferAllocator::create_reader(buffer_), begin_, end_);
  }

  // Returns a deep copy of the bytes in a newly allocated buffer.
  BufferSlice copy() const {
    if (is_null()) {
      return BufferSlice(BufferReaderPtr(), begin_, end_);
    }
    return BufferSlice(as_slice());
  }

  Slice as_slice() const {
    if (is_null()) {
      return Slice();
    }
    return Slice(buffer_->data_ + begin_, size());
  }

  operator Slice() const {
    return as_slice();
  }

  MutableSlice as_slice() {
    if (is_null()) {
      return MutableSlice();
    }
    return MutableSlice(buffer_->data_ + begin_, size());
  }

  Slice prepare_read() const {
    return as_slice();
  }

  // Returns the part of the slice after the first |offset| bytes.
  Slice after(size_t offset) const {
    auto full = as_slice();
    full.remove_prefix(offset);
    return full;
  }

  // Consumes |size| bytes from the front; returns true if the slice is now empty.
  bool confirm_read(size_t size) {
    debug_untrack();
    begin_ += size;
    CHECK(begin_ <= end_);
    debug_track();
    return begin_ == end_;
  }

  // Shrinks the slice to at most |limit| bytes.
  void truncate(size_t limit) {
    if (size() > limit) {
      debug_untrack();
      end_ = begin_ + limit;
      debug_track();
    }
  }

  // Wraps |slice|, which must point into this slice's underlying buffer,
  // into a BufferSlice sharing that buffer (no copy).
  BufferSlice from_slice(Slice slice) const {
    auto res = BufferSlice(BufferAllocator::create_reader(buffer_));
    res.debug_untrack();
    res.begin_ = static_cast<size_t>(slice.ubegin() - buffer_->data_);
    res.end_ = static_cast<size_t>(slice.uend() - buffer_->data_);
    res.debug_track();
    CHECK(buffer_->begin_ <= res.begin_);
    CHECK(res.begin_ <= res.end_);
    CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed));
    return res;
  }

  // like in std::string
  char *data() {
    return as_slice().data();
  }
  const char *data() const {
    return as_slice().data();
  }
  char operator[](size_t at) const {
    return as_slice()[at];
  }

  bool empty() const {
    return size() == 0;
  }

  bool is_null() const {
    return !buffer_;
  }

  size_t size() const {
    if (is_null()) {
      return 0;
    }
    return end_ - begin_;
  }

  // like in std::string
  size_t length() const {
    return size();
  }

  // set end_ into writer's end_
  // Returns the number of bytes that just became visible.
  size_t sync_with_writer() {
    debug_untrack();
    CHECK(!is_null());
    auto old_end = end_;
    // Acquire pairs with the writer's release store in confirm_append,
    // making the appended bytes visible to this reader.
    end_ = buffer_->end_.load(std::memory_order_acquire);
    debug_track();
    return end_ - old_end;
  }
  bool is_writer_alive() const {
    CHECK(!is_null());
    return buffer_->has_writer_.load(std::memory_order_acquire);
  }
  // Drops the buffer reference and resets the slice to the null state.
  void clear() {
    debug_untrack();
    begin_ = 0;
    end_ = 0;
    buffer_ = nullptr;
  }

 private:
  BufferReaderPtr buffer_;
  size_t begin_ = 0;
  size_t end_ = 0;
};
286
// Serializes the contents of |buffer_slice| through the storer's store_string.
template <class StorerT>
void store(const BufferSlice &buffer_slice, StorerT &storer) {
  storer.store_string(buffer_slice);
}
291
// Deserializes a BufferSlice from the parser, replacing |buffer_slice|.
template <class ParserT>
void parse(BufferSlice &buffer_slice, ParserT &parser) {
  buffer_slice = parser.template fetch_string<BufferSlice>();
}
296
// Append/prepend interface over the writer end of a buffer.
// Appends are published to readers via a release store to end_.
class BufferWriter {
 public:
  BufferWriter() = default;
  explicit BufferWriter(size_t size) : BufferWriter(BufferAllocator::create_writer(size)) {
  }
  BufferWriter(size_t size, size_t prepend, size_t append)
      : BufferWriter(BufferAllocator::create_writer(size, prepend, append)) {
  }
  // Copies |slice| into a fresh buffer with the requested prepend/append reserves.
  BufferWriter(Slice slice, size_t prepend, size_t append)
      : BufferWriter(BufferAllocator::create_writer(slice.size(), prepend, append)) {
    as_slice().copy_from(slice);
  }
  explicit BufferWriter(BufferWriterPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
  }

  // Creates a reader BufferSlice over the bytes written so far.
  BufferSlice as_buffer_slice() const {
    return BufferSlice(BufferAllocator::create_reader(buffer_));
  }
  bool is_null() const {
    return !buffer_;
  }
  bool empty() const {
    return size() == 0;
  }
  // Number of bytes currently written, i.e. the [begin_, end_) span.
  size_t size() const {
    if (is_null()) {
      return 0;
    }
    return buffer_->end_.load(std::memory_order_relaxed) - buffer_->begin_;
  }
  MutableSlice as_slice() {
    auto end = buffer_->end_.load(std::memory_order_relaxed);
    return MutableSlice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
  }
  Slice as_slice() const {
    auto end = buffer_->end_.load(std::memory_order_relaxed);
    return Slice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
  }

  // Space reserved in front of begin_ for prepending.
  MutableSlice prepare_prepend() {
    if (is_null()) {
      return MutableSlice();
    }
    // begin_ must stay constant once a reader has been created (see BufferRaw).
    CHECK(!buffer_->was_reader_);
    return MutableSlice(buffer_->data_, buffer_->begin_);
  }
  // Remaining writable space after the current end.
  MutableSlice prepare_append() {
    if (is_null()) {
      return MutableSlice();
    }
    auto end = buffer_->end_.load(std::memory_order_relaxed);
    return MutableSlice(buffer_->data_ + end, buffer_->data_size_ - end);
  }
  // Publishes |size| freshly written bytes to readers (release store pairs
  // with the acquire load in BufferSlice::sync_with_writer).
  void confirm_append(size_t size) {
    if (is_null()) {
      CHECK(size == 0);
      return;
    }
    auto new_end = buffer_->end_.load(std::memory_order_relaxed) + size;
    CHECK(new_end <= buffer_->data_size_);
    buffer_->end_.store(new_end, std::memory_order_release);
  }
  // Claims |size| bytes of the prepend reserve by moving begin_ backwards.
  void confirm_prepend(size_t size) {
    if (is_null()) {
      CHECK(size == 0);
      return;
    }
    CHECK(buffer_->begin_ >= size);
    buffer_->begin_ -= size;
  }

 private:
  BufferWriterPtr buffer_;
};
371
// One link of a singly linked chain of BufferSlices, shared between one writer
// and many readers via intrusive reference counting.
struct ChainBufferNode {
  friend struct DeleteWriterPtr;
  // Deleter for the writer pointer: marks the node writer-less (release store)
  // and drops one reference.
  struct DeleteWriterPtr {
    void operator()(ChainBufferNode *ptr) {
      ptr->has_writer_.store(false, std::memory_order_release);
      dec_ref_cnt(ptr);
    }
  };
  friend struct DeleteReaderPtr;
  struct DeleteReaderPtr {
    void operator()(ChainBufferNode *ptr) {
      dec_ref_cnt(ptr);
    }
  };
  using WriterPtr = std::unique_ptr<ChainBufferNode, DeleteWriterPtr>;
  using ReaderPtr = std::unique_ptr<ChainBufferNode, DeleteReaderPtr>;

  // Takes the initial reference of a freshly created (not yet shared) node;
  // relaxed stores are sufficient because no other thread can see it yet.
  static WriterPtr make_writer_ptr(ChainBufferNode *ptr) {
    ptr->ref_cnt_.store(1, std::memory_order_relaxed);
    ptr->has_writer_.store(true, std::memory_order_relaxed);
    return WriterPtr(ptr);
  }
  // Adds one more reference to an already-referenced node.
  static ReaderPtr make_reader_ptr(ChainBufferNode *ptr) {
    ptr->ref_cnt_.fetch_add(1, std::memory_order_acq_rel);
    return ReaderPtr(ptr);
  }

  bool has_writer() {
    return has_writer_.load(std::memory_order_acquire);
  }

  bool unique() {
    return ref_cnt_.load(std::memory_order_acquire) == 1;
  }

  ChainBufferNode(BufferSlice slice, bool sync_flag) : slice_(std::move(slice)), sync_flag_(sync_flag) {
  }

  // reader
  // There are two options
  // 1. Fixed slice of Buffer
  // 2. Slice with non-fixed right end
  // In each case slice_ is const. Reader should read it and use sync_with_writer on its own copy.
  const BufferSlice slice_;
  const bool sync_flag_{false};  // should we call slice_.sync_with_writer or not.

  // writer
  ReaderPtr next_{nullptr};

 private:
  std::atomic<int> ref_cnt_{0};
  std::atomic<bool> has_writer_{0};

  // Destroys a chain iteratively: each loop step detaches next_ before the
  // node is deleted, so releasing a long chain of uniquely owned nodes cannot
  // overflow the stack through recursive dec_ref_cnt calls.
  static void clear_nonrecursive(ReaderPtr ptr) {
    while (ptr && ptr->unique()) {
      ptr = std::move(ptr->next_);
    }
  }
  // Drops one reference; the last owner unlinks the tail and deletes the node.
  static void dec_ref_cnt(ChainBufferNode *ptr) {
    int left = --ptr->ref_cnt_;
    if (left == 0) {
      clear_nonrecursive(std::move(ptr->next_));
      // TODO(refact): move memory management into allocator (?)
      delete ptr;
    }
  }
};
439
440 using ChainBufferNodeWriterPtr = ChainBufferNode::WriterPtr;
441 using ChainBufferNodeReaderPtr = ChainBufferNode::ReaderPtr;
442
443 class ChainBufferNodeAllocator {
444 public:
create(BufferSlice slice,bool sync_flag)445 static ChainBufferNodeWriterPtr create(BufferSlice slice, bool sync_flag) {
446 auto *ptr = new ChainBufferNode(std::move(slice), sync_flag);
447 return ChainBufferNode::make_writer_ptr(ptr);
448 }
clone(const ChainBufferNodeReaderPtr & ptr)449 static ChainBufferNodeReaderPtr clone(const ChainBufferNodeReaderPtr &ptr) {
450 if (!ptr) {
451 return ChainBufferNodeReaderPtr();
452 }
453 return ChainBufferNode::make_reader_ptr(ptr.get());
454 }
clone(ChainBufferNodeWriterPtr & ptr)455 static ChainBufferNodeReaderPtr clone(ChainBufferNodeWriterPtr &ptr) {
456 if (!ptr) {
457 return ChainBufferNodeReaderPtr();
458 }
459 return ChainBufferNode::make_reader_ptr(ptr.get());
460 }
461 };
462
// Read cursor over a chain of ChainBufferNodes: yields contiguous chunks and
// advances node by node, tracking the total bytes consumed in offset_.
class ChainBufferIterator {
 public:
  ChainBufferIterator() = default;
  explicit ChainBufferIterator(ChainBufferNodeReaderPtr head) : head_(std::move(head)) {
    load_head();
  }
  ChainBufferIterator clone() const {
    return ChainBufferIterator(ChainBufferNodeAllocator::clone(head_), reader_.clone(), need_sync_, offset_);
  }

  // Total number of bytes confirmed as read across all nodes so far.
  size_t offset() const {
    return offset_;
  }

  void clear() {
    *this = ChainBufferIterator();
  }

  // Returns the next contiguous readable chunk, moving to following nodes as
  // needed; an empty Slice means no data is available right now.
  Slice prepare_read() {
    if (!head_) {
      return Slice();
    }
    while (true) {
      auto res = reader_.prepare_read();
      if (!res.empty()) {
        return res;
      }
      // Load has_writer_ BEFORE syncing. If the writer died before this load,
      // its release store guarantees the sync below sees all appended data, so
      // moving to the next node loses nothing. If it died just after, we simply
      // return empty and pick the data up on the next call.
      auto has_writer = head_->has_writer();
      if (need_sync_) {
        reader_.sync_with_writer();
        res = reader_.prepare_read();
        if (!res.empty()) {
          return res;
        }
      }
      if (has_writer) {
        // The writer may still append to this node; do not skip past it.
        return Slice();
      }
      head_ = ChainBufferNodeAllocator::clone(head_->next_);
      if (!head_) {
        return Slice();
      }
      load_head();
    }
  }

  // returns only head
  // Reads up to |limit| bytes, but never past the end of the current node.
  BufferSlice read_as_buffer_slice(size_t limit) {
    prepare_read();
    auto res = reader_.clone();
    res.truncate(limit);
    confirm_read(res.size());
    return res;
  }

  const BufferSlice &head() const {
    return reader_;
  }

  void confirm_read(size_t size) {
    offset_ += size;
    reader_.confirm_read(size);
  }

  // Skips all data that is currently available.
  void advance_till_end() {
    while (true) {
      auto ready = prepare_read();
      if (ready.empty()) {
        break;
      }
      confirm_read(ready.size());
    }
  }

  // Skips up to |offset| bytes, copying them into |dest| while it has room.
  // Returns the number of bytes actually skipped.
  size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
    size_t skipped = 0;
    while (offset != 0) {
      auto ready = prepare_read();
      if (ready.empty()) {
        break;
      }

      // read no more than offset
      ready.truncate(offset);
      offset -= ready.size();
      skipped += ready.size();

      // copy to dest if possible
      auto to_dest_size = min(ready.size(), dest.size());
      if (to_dest_size != 0) {
        dest.copy_from(ready.substr(0, to_dest_size));
        dest.remove_prefix(to_dest_size);
      }

      confirm_read(ready.size());
    }
    return skipped;
  }

 private:
  ChainBufferNodeReaderPtr head_;
  BufferSlice reader_;      // copy of head_->slice_
  bool need_sync_ = false;  // copy of head_->sync_flag_
  size_t offset_ = 0;       // position in the union of all nodes

  ChainBufferIterator(ChainBufferNodeReaderPtr head, BufferSlice reader, bool need_sync, size_t offset)
      : head_(std::move(head)), reader_(std::move(reader)), need_sync_(need_sync), offset_(offset) {
  }
  // Caches the current node's slice and sync flag into this iterator.
  void load_head() {
    reader_ = head_->slice_.clone();
    need_sync_ = head_->sync_flag_;
  }
};
576
// A [begin_, end_) window over a node chain; begin_ is used for reading,
// end_ only defines the window's size.
class ChainBufferReader {
 public:
  ChainBufferReader() = default;
  explicit ChainBufferReader(ChainBufferNodeReaderPtr head)
      : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
    end_.advance_till_end();
  }
  ChainBufferReader(ChainBufferIterator begin, ChainBufferIterator end, bool sync_flag)
      : begin_(std::move(begin)), end_(std::move(end)), sync_flag_(sync_flag) {
  }
  // Reader over exactly |size| already-available bytes starting at |head|.
  ChainBufferReader(ChainBufferNodeReaderPtr head, size_t size)
      : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
    auto advanced = end_.advance(size);
    CHECK(advanced == size);
  }
  ChainBufferReader(ChainBufferReader &&) = default;
  ChainBufferReader &operator=(ChainBufferReader &&) = default;
  ChainBufferReader(const ChainBufferReader &) = delete;
  ChainBufferReader &operator=(const ChainBufferReader &) = delete;
  ~ChainBufferReader() = default;

  ChainBufferReader clone() {
    return ChainBufferReader(begin_.clone(), end_.clone(), sync_flag_);
  }

  // Next readable chunk, clipped to this reader's window.
  Slice prepare_read() {
    auto res = begin_.prepare_read();
    res.truncate(size());
    return res;
  }

  void confirm_read(size_t size) {
    CHECK(size <= this->size());
    begin_.confirm_read(size);
  }

  size_t advance(size_t offset, MutableSlice dest = MutableSlice());

  size_t size() const {
    return end_.offset() - begin_.offset();
  }
  bool empty() const {
    return size() == 0;
  }

  // Extends the window over bytes the writer appended since the last sync.
  void sync_with_writer() {
    if (sync_flag_) {
      end_.advance_till_end();
    }
  }
  void advance_end(size_t size) {
    end_.advance(size);
  }
  const ChainBufferIterator &begin() {
    return begin_;
  }
  const ChainBufferIterator &end() {
    return end_;
  }

  // Return [begin_, tail.begin_)
  // *this = tail
  ChainBufferReader cut_head(ChainBufferIterator pos) TD_WARN_UNUSED_RESULT {
    auto tmp = begin_.clone();
    begin_ = pos.clone();
    return ChainBufferReader(std::move(tmp), std::move(pos), false);
  }

  // Splits off the first |offset| bytes into a separate reader.
  ChainBufferReader cut_head(size_t offset) TD_WARN_UNUSED_RESULT {
    CHECK(offset <= size());
    auto it = begin_.clone();
    it.advance(offset);
    return cut_head(std::move(it));
  }

  // Extracts all remaining bytes as one BufferSlice and resets the reader.
  // Avoids a copy when the current head node already contains everything.
  BufferSlice move_as_buffer_slice() {
    BufferSlice res;
    if (begin_.head().size() >= size()) {
      res = begin_.read_as_buffer_slice(size());
    } else {
      auto save_size = size();
      res = BufferSlice{save_size};
      advance(save_size, res.as_slice());
    }
    *this = ChainBufferReader();
    return res;
  }

  // Reads at most min(limit, size()) bytes from the head node.
  BufferSlice read_as_buffer_slice(size_t limit = std::numeric_limits<size_t>::max()) {
    return begin_.read_as_buffer_slice(min(limit, size()));
  }

 private:
  ChainBufferIterator begin_;  // use it for prepare_read. Fix result with size()
  ChainBufferIterator end_;    // keep end as far as we can. use it for size()
  bool sync_flag_ = true;      // auto sync of end_

  // 1. We have fixed size. Then end_ is useless.
  // 2. No fixed size. One has to sync end_ with end_.advance_till_end() in order to calculate size.
};
677
// Write end of a node chain: appends into the current tail node's BufferWriter
// and links new nodes when space runs out or when a slice can be adopted zero-copy.
class ChainBufferWriter {
 public:
  ChainBufferWriter() {
    init();
  }

  // (Re)creates the chain with a single node backed by a buffer of |size| bytes.
  void init(size_t size = 0) {
    writer_ = BufferWriter(size);
    tail_ = ChainBufferNodeAllocator::create(writer_.as_buffer_slice(), true);
    head_ = ChainBufferNodeAllocator::clone(tail_);
  }

  // Returns writable space, allocating a new tail node if the current one is full.
  MutableSlice prepare_append(size_t hint = 0) {
    CHECK(!empty());
    auto res = prepare_append_inplace();
    if (res.empty()) {
      return prepare_append_alloc(hint);
    }
    return res;
  }
  // Guarantees at least |size| bytes of contiguous writable space.
  MutableSlice prepare_append_at_least(size_t size) {
    CHECK(!empty());
    auto res = prepare_append_inplace();
    if (res.size() < size) {
      return prepare_append_alloc(size);
    }
    return res;
  }
  MutableSlice prepare_append_inplace() {
    CHECK(!empty());
    return writer_.prepare_append();
  }
  // Allocates a fresh tail node of at least |hint| bytes (minimum 4 KiB) and
  // links it after the current tail.
  MutableSlice prepare_append_alloc(size_t hint = 0) {
    CHECK(!empty());
    if (hint < (1 << 10)) {
      hint = 1 << 12;
    }
    BufferWriter new_writer(hint);
    auto new_tail = ChainBufferNodeAllocator::create(new_writer.as_buffer_slice(), true);
    tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
    writer_ = std::move(new_writer);
    tail_ = std::move(new_tail);  // release tail_
    return writer_.prepare_append();
  }
  void confirm_append(size_t size) {
    CHECK(!empty());
    writer_.confirm_append(size);
  }

  // Copies |slice| into the chain, allocating new nodes as needed.
  void append(Slice slice, size_t hint = 0) {
    while (!slice.empty()) {
      auto ready = prepare_append(td::max(slice.size(), hint));
      auto shift = min(ready.size(), slice.size());
      ready.copy_from(slice.substr(0, shift));
      confirm_append(shift);
      slice.remove_prefix(shift);
    }
  }

  // Appends a BufferSlice; large slices that do not fit into the already
  // available space are linked in as a node without copying the bytes.
  void append(BufferSlice slice) {
    auto ready = prepare_append_inplace();
    // TODO(perf): we have to store some stats in ChainBufferWriter
    // for better append logic
    if (slice.size() < (1 << 8) || ready.size() >= slice.size()) {
      return append(slice.as_slice());
    }

    auto new_tail = ChainBufferNodeAllocator::create(std::move(slice), false);
    tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
    writer_ = BufferWriter();  // the adopted slice is fixed; nothing to write in place
    tail_ = std::move(new_tail);  // release tail_
  }

  void append(ChainBufferReader &&reader) {
    while (!reader.empty()) {
      append(reader.read_as_buffer_slice());
    }
  }
  void append(ChainBufferReader &reader) {
    while (!reader.empty()) {
      append(reader.read_as_buffer_slice());
    }
  }

  // Hands the head of the chain to a reader. NOTE(review): head_ is moved out,
  // so this looks like a one-shot operation — confirm with callers.
  ChainBufferReader extract_reader() {
    CHECK(head_);
    return ChainBufferReader(std::move(head_));
  }

 private:
  bool empty() const {
    return !tail_;
  }

  ChainBufferNodeReaderPtr head_;
  ChainBufferNodeWriterPtr tail_;
  BufferWriter writer_;
};
776
777 class BufferBuilder {
778 public:
779 BufferBuilder() = default;
BufferBuilder(Slice slice,size_t prepend_size,size_t append_size)780 BufferBuilder(Slice slice, size_t prepend_size, size_t append_size)
781 : buffer_writer_(slice, prepend_size, append_size) {
782 }
BufferBuilder(BufferWriter && buffer_writer)783 explicit BufferBuilder(BufferWriter &&buffer_writer) : buffer_writer_(std::move(buffer_writer)) {
784 }
785
786 void append(BufferSlice slice);
787 void append(Slice slice);
788
789 void prepend(BufferSlice slice);
790 void prepend(Slice slice);
791
792 template <class F>
for_each(F && f)793 void for_each(F &&f) const & {
794 for (auto i = to_prepend_.size(); i > 0; i--) {
795 f(to_prepend_[i - 1].as_slice());
796 }
797 if (!buffer_writer_.empty()) {
798 f(buffer_writer_.as_slice());
799 }
800 for (auto &slice : to_append_) {
801 f(slice.as_slice());
802 }
803 }
804 template <class F>
for_each(F && f)805 void for_each(F &&f) && {
806 for (auto i = to_prepend_.size(); i > 0; i--) {
807 f(std::move(to_prepend_[i - 1]));
808 }
809 if (!buffer_writer_.empty()) {
810 f(buffer_writer_.as_buffer_slice());
811 }
812 for (auto &slice : to_append_) {
813 f(std::move(slice));
814 }
815 }
816 size_t size() const;
817
818 BufferSlice extract();
819
820 private:
821 BufferWriter buffer_writer_;
822 std::vector<BufferSlice> to_append_;
823 std::vector<BufferSlice> to_prepend_;
824
825 bool append_inplace(Slice slice);
826 void append_slow(BufferSlice slice);
827 bool prepend_inplace(Slice slice);
828 void prepend_slow(BufferSlice slice);
829 };
830
// Read-only view of the slice contents.
inline Slice as_slice(const BufferSlice &value) {
  return value.as_slice();
}
// Mutable view of the slice contents.
inline MutableSlice as_slice(BufferSlice &value) {
  return value.as_slice();
}
// Explicitly named mutable view of the slice contents.
inline MutableSlice as_mutable_slice(BufferSlice &value) {
  return value.as_slice();
}
840
841 } // namespace td
842