1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #pragma once
8 
9 #include "td/utils/common.h"
10 #include "td/utils/port/thread_local.h"
11 #include "td/utils/Slice.h"
12 
13 #include <atomic>
14 #include <limits>
15 #include <memory>
16 
17 namespace td {
18 
19 struct BufferRaw {
BufferRawBufferRaw20   explicit BufferRaw(size_t size) : data_size_(size) {
21   }
22   size_t data_size_;
23 
24   // Constant after first reader is created.
25   // May be change by writer before it.
26   // So writer may do prepends till there is no reader created.
27   size_t begin_ = 0;
28 
29   // Write by writer.
30   // Read by reader.
31   std::atomic<size_t> end_{0};
32 
33   mutable std::atomic<int32> ref_cnt_{1};
34   std::atomic<bool> has_writer_{true};
35   bool was_reader_{false};
36 
37   alignas(4) unsigned char data_[1];
38 };
39 
40 class BufferAllocator {
41  public:
42   class DeleteWriterPtr {
43    public:
operator()44     void operator()(BufferRaw *ptr) {
45       ptr->has_writer_.store(false, std::memory_order_release);
46       dec_ref_cnt(ptr);
47     }
48   };
49   class DeleteReaderPtr {
50    public:
operator()51     void operator()(BufferRaw *ptr) {
52       dec_ref_cnt(ptr);
53     }
54   };
55 
56   using WriterPtr = std::unique_ptr<BufferRaw, DeleteWriterPtr>;
57   using ReaderPtr = std::unique_ptr<BufferRaw, DeleteReaderPtr>;
58 
59   static WriterPtr create_writer(size_t size);
60 
61   static WriterPtr create_writer(size_t size, size_t prepend, size_t append);
62 
63   static ReaderPtr create_reader(size_t size);
64 
65   static ReaderPtr create_reader(const WriterPtr &raw);
66 
67   static ReaderPtr create_reader(const ReaderPtr &raw);
68 
69   static size_t get_buffer_mem();
70   static int64 get_buffer_slice_size();
71 
72   static void clear_thread_local();
73 
74  private:
75   friend class BufferSlice;
76 
77   static void track_buffer_slice(int64 size);
78 
79   static ReaderPtr create_reader_fast(size_t size);
80 
81   static WriterPtr create_writer_exact(size_t size);
82 
83   struct BufferRawDeleter {
operatorBufferRawDeleter84     void operator()(BufferRaw *ptr) {
85       dec_ref_cnt(ptr);
86     }
87   };
88   struct BufferRawTls {
89     std::unique_ptr<BufferRaw, BufferRawDeleter> buffer_raw;
90   };
91 
92   static TD_THREAD_LOCAL BufferRawTls *buffer_raw_tls;
93 
94   static void dec_ref_cnt(BufferRaw *ptr);
95 
96   static BufferRaw *create_buffer_raw(size_t size);
97 
98   static std::atomic<size_t> buffer_mem;
99 };
100 
101 using BufferWriterPtr = BufferAllocator::WriterPtr;
102 using BufferReaderPtr = BufferAllocator::ReaderPtr;
103 
104 class BufferSlice {
105  public:
106   BufferSlice() = default;
BufferSlice(BufferReaderPtr buffer_ptr)107   explicit BufferSlice(BufferReaderPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
108     if (is_null()) {
109       return;
110     }
111     begin_ = buffer_->begin_;
112     end_ = begin_;
113     sync_with_writer();
114   }
BufferSlice(BufferReaderPtr buffer_ptr,size_t begin,size_t end)115   BufferSlice(BufferReaderPtr buffer_ptr, size_t begin, size_t end)
116       : buffer_(std::move(buffer_ptr)), begin_(begin), end_(end) {
117     debug_track();
118   }
119   BufferSlice(const BufferSlice &other) = delete;
120   BufferSlice &operator=(const BufferSlice &other) = delete;
BufferSlice(BufferSlice && other)121   BufferSlice(BufferSlice &&other) noexcept : BufferSlice(std::move(other.buffer_), other.begin_, other.end_) {
122     debug_untrack();  // yes, debug_untrack
123   }
124   BufferSlice &operator=(BufferSlice &&other) noexcept {
125     if (this == &other) {
126       return *this;
127     }
128     debug_untrack();
129     buffer_ = std::move(other.buffer_);
130     begin_ = other.begin_;
131     end_ = other.end_;
132     return *this;
133   }
134 
BufferSlice(size_t size)135   explicit BufferSlice(size_t size) : buffer_(BufferAllocator::create_reader(size)) {
136     end_ = buffer_->end_.load(std::memory_order_relaxed);
137     begin_ = end_ - ((size + 7) & -8);
138     end_ = begin_ + size;
139     debug_track();
140   }
141 
BufferSlice(Slice slice)142   explicit BufferSlice(Slice slice) : BufferSlice(slice.size()) {
143     as_slice().copy_from(slice);
144   }
145 
BufferSlice(const char * ptr,size_t size)146   BufferSlice(const char *ptr, size_t size) : BufferSlice(Slice(ptr, size)) {
147   }
148 
~BufferSlice()149   ~BufferSlice() {
150     debug_untrack();
151   }
152 
debug_track()153   void debug_track() const {
154     BufferAllocator::track_buffer_slice(static_cast<int64>(size()));
155   }
debug_untrack()156   void debug_untrack() const {
157     BufferAllocator::track_buffer_slice(-static_cast<int64>(size()));
158   }
159 
clone()160   BufferSlice clone() const {
161     if (is_null()) {
162       return BufferSlice(BufferReaderPtr(), begin_, end_);
163     }
164     return BufferSlice(BufferAllocator::create_reader(buffer_), begin_, end_);
165   }
166 
copy()167   BufferSlice copy() const {
168     if (is_null()) {
169       return BufferSlice(BufferReaderPtr(), begin_, end_);
170     }
171     return BufferSlice(as_slice());
172   }
173 
as_slice()174   Slice as_slice() const {
175     if (is_null()) {
176       return Slice();
177     }
178     return Slice(buffer_->data_ + begin_, size());
179   }
180 
Slice()181   operator Slice() const {
182     return as_slice();
183   }
184 
as_slice()185   MutableSlice as_slice() {
186     if (is_null()) {
187       return MutableSlice();
188     }
189     return MutableSlice(buffer_->data_ + begin_, size());
190   }
191 
prepare_read()192   Slice prepare_read() const {
193     return as_slice();
194   }
195 
after(size_t offset)196   Slice after(size_t offset) const {
197     auto full = as_slice();
198     full.remove_prefix(offset);
199     return full;
200   }
201 
confirm_read(size_t size)202   bool confirm_read(size_t size) {
203     debug_untrack();
204     begin_ += size;
205     CHECK(begin_ <= end_);
206     debug_track();
207     return begin_ == end_;
208   }
209 
truncate(size_t limit)210   void truncate(size_t limit) {
211     if (size() > limit) {
212       debug_untrack();
213       end_ = begin_ + limit;
214       debug_track();
215     }
216   }
217 
from_slice(Slice slice)218   BufferSlice from_slice(Slice slice) const {
219     auto res = BufferSlice(BufferAllocator::create_reader(buffer_));
220     res.debug_untrack();
221     res.begin_ = static_cast<size_t>(slice.ubegin() - buffer_->data_);
222     res.end_ = static_cast<size_t>(slice.uend() - buffer_->data_);
223     res.debug_track();
224     CHECK(buffer_->begin_ <= res.begin_);
225     CHECK(res.begin_ <= res.end_);
226     CHECK(res.end_ <= buffer_->end_.load(std::memory_order_relaxed));
227     return res;
228   }
229 
230   // like in std::string
data()231   char *data() {
232     return as_slice().data();
233   }
data()234   const char *data() const {
235     return as_slice().data();
236   }
237   char operator[](size_t at) const {
238     return as_slice()[at];
239   }
240 
empty()241   bool empty() const {
242     return size() == 0;
243   }
244 
is_null()245   bool is_null() const {
246     return !buffer_;
247   }
248 
size()249   size_t size() const {
250     if (is_null()) {
251       return 0;
252     }
253     return end_ - begin_;
254   }
255 
256   // like in std::string
length()257   size_t length() const {
258     return size();
259   }
260 
261   // set end_ into writer's end_
sync_with_writer()262   size_t sync_with_writer() {
263     debug_untrack();
264     CHECK(!is_null());
265     auto old_end = end_;
266     end_ = buffer_->end_.load(std::memory_order_acquire);
267     debug_track();
268     return end_ - old_end;
269   }
is_writer_alive()270   bool is_writer_alive() const {
271     CHECK(!is_null());
272     return buffer_->has_writer_.load(std::memory_order_acquire);
273   }
clear()274   void clear() {
275     debug_untrack();
276     begin_ = 0;
277     end_ = 0;
278     buffer_ = nullptr;
279   }
280 
281  private:
282   BufferReaderPtr buffer_;
283   size_t begin_ = 0;
284   size_t end_ = 0;
285 };
286 
287 template <class StorerT>
store(const BufferSlice & buffer_slice,StorerT & storer)288 void store(const BufferSlice &buffer_slice, StorerT &storer) {
289   storer.store_string(buffer_slice);
290 }
291 
292 template <class ParserT>
parse(BufferSlice & buffer_slice,ParserT & parser)293 void parse(BufferSlice &buffer_slice, ParserT &parser) {
294   buffer_slice = parser.template fetch_string<BufferSlice>();
295 }
296 
297 class BufferWriter {
298  public:
299   BufferWriter() = default;
BufferWriter(size_t size)300   explicit BufferWriter(size_t size) : BufferWriter(BufferAllocator::create_writer(size)) {
301   }
BufferWriter(size_t size,size_t prepend,size_t append)302   BufferWriter(size_t size, size_t prepend, size_t append)
303       : BufferWriter(BufferAllocator::create_writer(size, prepend, append)) {
304   }
BufferWriter(Slice slice,size_t prepend,size_t append)305   BufferWriter(Slice slice, size_t prepend, size_t append)
306       : BufferWriter(BufferAllocator::create_writer(slice.size(), prepend, append)) {
307     as_slice().copy_from(slice);
308   }
BufferWriter(BufferWriterPtr buffer_ptr)309   explicit BufferWriter(BufferWriterPtr buffer_ptr) : buffer_(std::move(buffer_ptr)) {
310   }
311 
as_buffer_slice()312   BufferSlice as_buffer_slice() const {
313     return BufferSlice(BufferAllocator::create_reader(buffer_));
314   }
is_null()315   bool is_null() const {
316     return !buffer_;
317   }
empty()318   bool empty() const {
319     return size() == 0;
320   }
size()321   size_t size() const {
322     if (is_null()) {
323       return 0;
324     }
325     return buffer_->end_.load(std::memory_order_relaxed) - buffer_->begin_;
326   }
as_slice()327   MutableSlice as_slice() {
328     auto end = buffer_->end_.load(std::memory_order_relaxed);
329     return MutableSlice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
330   }
as_slice()331   Slice as_slice() const {
332     auto end = buffer_->end_.load(std::memory_order_relaxed);
333     return Slice(buffer_->data_ + buffer_->begin_, buffer_->data_ + end);
334   }
335 
prepare_prepend()336   MutableSlice prepare_prepend() {
337     if (is_null()) {
338       return MutableSlice();
339     }
340     CHECK(!buffer_->was_reader_);
341     return MutableSlice(buffer_->data_, buffer_->begin_);
342   }
prepare_append()343   MutableSlice prepare_append() {
344     if (is_null()) {
345       return MutableSlice();
346     }
347     auto end = buffer_->end_.load(std::memory_order_relaxed);
348     return MutableSlice(buffer_->data_ + end, buffer_->data_size_ - end);
349   }
confirm_append(size_t size)350   void confirm_append(size_t size) {
351     if (is_null()) {
352       CHECK(size == 0);
353       return;
354     }
355     auto new_end = buffer_->end_.load(std::memory_order_relaxed) + size;
356     CHECK(new_end <= buffer_->data_size_);
357     buffer_->end_.store(new_end, std::memory_order_release);
358   }
confirm_prepend(size_t size)359   void confirm_prepend(size_t size) {
360     if (is_null()) {
361       CHECK(size == 0);
362       return;
363     }
364     CHECK(buffer_->begin_ >= size);
365     buffer_->begin_ -= size;
366   }
367 
368  private:
369   BufferWriterPtr buffer_;
370 };
371 
372 struct ChainBufferNode {
373   friend struct DeleteWriterPtr;
374   struct DeleteWriterPtr {
operatorChainBufferNode::DeleteWriterPtr375     void operator()(ChainBufferNode *ptr) {
376       ptr->has_writer_.store(false, std::memory_order_release);
377       dec_ref_cnt(ptr);
378     }
379   };
380   friend struct DeleteReaderPtr;
381   struct DeleteReaderPtr {
operatorChainBufferNode::DeleteReaderPtr382     void operator()(ChainBufferNode *ptr) {
383       dec_ref_cnt(ptr);
384     }
385   };
386   using WriterPtr = std::unique_ptr<ChainBufferNode, DeleteWriterPtr>;
387   using ReaderPtr = std::unique_ptr<ChainBufferNode, DeleteReaderPtr>;
388 
make_writer_ptrChainBufferNode389   static WriterPtr make_writer_ptr(ChainBufferNode *ptr) {
390     ptr->ref_cnt_.store(1, std::memory_order_relaxed);
391     ptr->has_writer_.store(true, std::memory_order_relaxed);
392     return WriterPtr(ptr);
393   }
make_reader_ptrChainBufferNode394   static ReaderPtr make_reader_ptr(ChainBufferNode *ptr) {
395     ptr->ref_cnt_.fetch_add(1, std::memory_order_acq_rel);
396     return ReaderPtr(ptr);
397   }
398 
has_writerChainBufferNode399   bool has_writer() {
400     return has_writer_.load(std::memory_order_acquire);
401   }
402 
uniqueChainBufferNode403   bool unique() {
404     return ref_cnt_.load(std::memory_order_acquire) == 1;
405   }
406 
ChainBufferNodeChainBufferNode407   ChainBufferNode(BufferSlice slice, bool sync_flag) : slice_(std::move(slice)), sync_flag_(sync_flag) {
408   }
409 
410   // reader
411   // There are two options
412   // 1. Fixed slice of Buffer
413   // 2. Slice with non-fixed right end
414   // In each case slice_ is const. Reader should read it and use sync_with_writer on its own copy.
415   const BufferSlice slice_;
416   const bool sync_flag_{false};  // should we call slice_.sync_with_writer or not.
417 
418   // writer
419   ReaderPtr next_{nullptr};
420 
421  private:
422   std::atomic<int> ref_cnt_{0};
423   std::atomic<bool> has_writer_{false};
424 
clear_nonrecursiveChainBufferNode425   static void clear_nonrecursive(ReaderPtr ptr) {
426     while (ptr && ptr->unique()) {
427       ptr = std::move(ptr->next_);
428     }
429   }
dec_ref_cntChainBufferNode430   static void dec_ref_cnt(ChainBufferNode *ptr) {
431     int left = --ptr->ref_cnt_;
432     if (left == 0) {
433       clear_nonrecursive(std::move(ptr->next_));
434       // TODO(refact): move memory management into allocator (?)
435       delete ptr;
436     }
437   }
438 };
439 
440 using ChainBufferNodeWriterPtr = ChainBufferNode::WriterPtr;
441 using ChainBufferNodeReaderPtr = ChainBufferNode::ReaderPtr;
442 
443 class ChainBufferNodeAllocator {
444  public:
create(BufferSlice slice,bool sync_flag)445   static ChainBufferNodeWriterPtr create(BufferSlice slice, bool sync_flag) {
446     auto *ptr = new ChainBufferNode(std::move(slice), sync_flag);
447     return ChainBufferNode::make_writer_ptr(ptr);
448   }
clone(const ChainBufferNodeReaderPtr & ptr)449   static ChainBufferNodeReaderPtr clone(const ChainBufferNodeReaderPtr &ptr) {
450     if (!ptr) {
451       return ChainBufferNodeReaderPtr();
452     }
453     return ChainBufferNode::make_reader_ptr(ptr.get());
454   }
clone(ChainBufferNodeWriterPtr & ptr)455   static ChainBufferNodeReaderPtr clone(ChainBufferNodeWriterPtr &ptr) {
456     if (!ptr) {
457       return ChainBufferNodeReaderPtr();
458     }
459     return ChainBufferNode::make_reader_ptr(ptr.get());
460   }
461 };
462 
463 class ChainBufferIterator {
464  public:
465   ChainBufferIterator() = default;
ChainBufferIterator(ChainBufferNodeReaderPtr head)466   explicit ChainBufferIterator(ChainBufferNodeReaderPtr head) : head_(std::move(head)) {
467     load_head();
468   }
clone()469   ChainBufferIterator clone() const {
470     return ChainBufferIterator(ChainBufferNodeAllocator::clone(head_), reader_.clone(), need_sync_, offset_);
471   }
472 
offset()473   size_t offset() const {
474     return offset_;
475   }
476 
clear()477   void clear() {
478     *this = ChainBufferIterator();
479   }
480 
prepare_read()481   Slice prepare_read() {
482     if (!head_) {
483       return Slice();
484     }
485     while (true) {
486       auto res = reader_.prepare_read();
487       if (!res.empty()) {
488         return res;
489       }
490       auto has_writer = head_->has_writer();
491       if (need_sync_) {
492         reader_.sync_with_writer();
493         res = reader_.prepare_read();
494         if (!res.empty()) {
495           return res;
496         }
497       }
498       if (has_writer) {
499         return Slice();
500       }
501       head_ = ChainBufferNodeAllocator::clone(head_->next_);
502       if (!head_) {
503         return Slice();
504       }
505       load_head();
506     }
507   }
508 
509   // returns only head
read_as_buffer_slice(size_t limit)510   BufferSlice read_as_buffer_slice(size_t limit) {
511     prepare_read();
512     auto res = reader_.clone();
513     res.truncate(limit);
514     confirm_read(res.size());
515     return res;
516   }
517 
head()518   const BufferSlice &head() const {
519     return reader_;
520   }
521 
confirm_read(size_t size)522   void confirm_read(size_t size) {
523     offset_ += size;
524     reader_.confirm_read(size);
525   }
526 
advance_till_end()527   void advance_till_end() {
528     while (true) {
529       auto ready = prepare_read();
530       if (ready.empty()) {
531         break;
532       }
533       confirm_read(ready.size());
534     }
535   }
536 
537   size_t advance(size_t offset, MutableSlice dest = MutableSlice()) {
538     size_t skipped = 0;
539     while (offset != 0) {
540       auto ready = prepare_read();
541       if (ready.empty()) {
542         break;
543       }
544 
545       // read no more than offset
546       ready.truncate(offset);
547       offset -= ready.size();
548       skipped += ready.size();
549 
550       // copy to dest if possible
551       auto to_dest_size = min(ready.size(), dest.size());
552       if (to_dest_size != 0) {
553         dest.copy_from(ready.substr(0, to_dest_size));
554         dest.remove_prefix(to_dest_size);
555       }
556 
557       confirm_read(ready.size());
558     }
559     return skipped;
560   }
561 
562  private:
563   ChainBufferNodeReaderPtr head_;
564   BufferSlice reader_;      // copy of head_->slice_
565   bool need_sync_ = false;  // copy of head_->sync_flag_
566   size_t offset_ = 0;       // position in the union of all nodes
567 
ChainBufferIterator(ChainBufferNodeReaderPtr head,BufferSlice reader,bool need_sync,size_t offset)568   ChainBufferIterator(ChainBufferNodeReaderPtr head, BufferSlice reader, bool need_sync, size_t offset)
569       : head_(std::move(head)), reader_(std::move(reader)), need_sync_(need_sync), offset_(offset) {
570   }
load_head()571   void load_head() {
572     reader_ = head_->slice_.clone();
573     need_sync_ = head_->sync_flag_;
574   }
575 };
576 
577 class ChainBufferReader {
578  public:
579   ChainBufferReader() = default;
ChainBufferReader(ChainBufferNodeReaderPtr head)580   explicit ChainBufferReader(ChainBufferNodeReaderPtr head)
581       : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
582     end_.advance_till_end();
583   }
ChainBufferReader(ChainBufferIterator begin,ChainBufferIterator end,bool sync_flag)584   ChainBufferReader(ChainBufferIterator begin, ChainBufferIterator end, bool sync_flag)
585       : begin_(std::move(begin)), end_(std::move(end)), sync_flag_(sync_flag) {
586   }
ChainBufferReader(ChainBufferNodeReaderPtr head,size_t size)587   ChainBufferReader(ChainBufferNodeReaderPtr head, size_t size)
588       : begin_(ChainBufferNodeAllocator::clone(head)), end_(std::move(head)) {
589     auto advanced = end_.advance(size);
590     CHECK(advanced == size);
591   }
592   ChainBufferReader(ChainBufferReader &&) = default;
593   ChainBufferReader &operator=(ChainBufferReader &&) = default;
594   ChainBufferReader(const ChainBufferReader &) = delete;
595   ChainBufferReader &operator=(const ChainBufferReader &) = delete;
596   ~ChainBufferReader() = default;
597 
clone()598   ChainBufferReader clone() {
599     return ChainBufferReader(begin_.clone(), end_.clone(), sync_flag_);
600   }
601 
prepare_read()602   Slice prepare_read() {
603     auto res = begin_.prepare_read();
604     res.truncate(size());
605     return res;
606   }
607 
confirm_read(size_t size)608   void confirm_read(size_t size) {
609     CHECK(size <= this->size());
610     begin_.confirm_read(size);
611   }
612 
613   size_t advance(size_t offset, MutableSlice dest = MutableSlice());
614 
size()615   size_t size() const {
616     return end_.offset() - begin_.offset();
617   }
empty()618   bool empty() const {
619     return size() == 0;
620   }
621 
sync_with_writer()622   void sync_with_writer() {
623     if (sync_flag_) {
624       end_.advance_till_end();
625     }
626   }
advance_end(size_t size)627   void advance_end(size_t size) {
628     end_.advance(size);
629   }
begin()630   const ChainBufferIterator &begin() {
631     return begin_;
632   }
end()633   const ChainBufferIterator &end() {
634     return end_;
635   }
636 
637   // Return [begin_, tail.begin_)
638   // *this = tail
cut_head(ChainBufferIterator pos)639   ChainBufferReader cut_head(ChainBufferIterator pos) TD_WARN_UNUSED_RESULT {
640     auto tmp = begin_.clone();
641     begin_ = pos.clone();
642     return ChainBufferReader(std::move(tmp), std::move(pos), false);
643   }
644 
cut_head(size_t offset)645   ChainBufferReader cut_head(size_t offset) TD_WARN_UNUSED_RESULT {
646     CHECK(offset <= size());
647     auto it = begin_.clone();
648     it.advance(offset);
649     return cut_head(std::move(it));
650   }
651 
move_as_buffer_slice()652   BufferSlice move_as_buffer_slice() {
653     BufferSlice res;
654     if (begin_.head().size() >= size()) {
655       res = begin_.read_as_buffer_slice(size());
656     } else {
657       auto save_size = size();
658       res = BufferSlice{save_size};
659       advance(save_size, res.as_slice());
660     }
661     *this = ChainBufferReader();
662     return res;
663   }
664 
665   BufferSlice read_as_buffer_slice(size_t limit = std::numeric_limits<size_t>::max()) {
666     return begin_.read_as_buffer_slice(min(limit, size()));
667   }
668 
669  private:
670   ChainBufferIterator begin_;  // use it for prepare_read. Fix result with size()
671   ChainBufferIterator end_;    // keep end as far as we can. use it for size()
672   bool sync_flag_ = true;      // auto sync of end_
673 
674   // 1. We have fixed size. Than end_ is useless.
675   // 2. No fixed size. One has to sync end_ with end_.advance_till_end() in order to calculate size.
676 };
677 
678 class ChainBufferWriter {
679  public:
ChainBufferWriter()680   ChainBufferWriter() {
681     init();
682   }
683 
684   void init(size_t size = 0) {
685     writer_ = BufferWriter(size);
686     tail_ = ChainBufferNodeAllocator::create(writer_.as_buffer_slice(), true);
687     head_ = ChainBufferNodeAllocator::clone(tail_);
688   }
689 
690   MutableSlice prepare_append(size_t hint = 0) {
691     CHECK(!empty());
692     auto res = prepare_append_inplace();
693     if (res.empty()) {
694       return prepare_append_alloc(hint);
695     }
696     return res;
697   }
prepare_append_at_least(size_t size)698   MutableSlice prepare_append_at_least(size_t size) {
699     CHECK(!empty());
700     auto res = prepare_append_inplace();
701     if (res.size() < size) {
702       return prepare_append_alloc(size);
703     }
704     return res;
705   }
prepare_append_inplace()706   MutableSlice prepare_append_inplace() {
707     CHECK(!empty());
708     return writer_.prepare_append();
709   }
710   MutableSlice prepare_append_alloc(size_t hint = 0) {
711     CHECK(!empty());
712     if (hint < (1 << 10)) {
713       hint = 1 << 12;
714     }
715     BufferWriter new_writer(hint);
716     auto new_tail = ChainBufferNodeAllocator::create(new_writer.as_buffer_slice(), true);
717     tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
718     writer_ = std::move(new_writer);
719     tail_ = std::move(new_tail);  // release tail_
720     return writer_.prepare_append();
721   }
confirm_append(size_t size)722   void confirm_append(size_t size) {
723     CHECK(!empty());
724     writer_.confirm_append(size);
725   }
726 
727   void append(Slice slice, size_t hint = 0) {
728     while (!slice.empty()) {
729       auto ready = prepare_append(td::max(slice.size(), hint));
730       auto shift = min(ready.size(), slice.size());
731       ready.copy_from(slice.substr(0, shift));
732       confirm_append(shift);
733       slice.remove_prefix(shift);
734     }
735   }
736 
append(BufferSlice slice)737   void append(BufferSlice slice) {
738     auto ready = prepare_append_inplace();
739     // TODO(perf): we have to store some stats in ChainBufferWriter
740     // for better append logic
741     if (slice.size() < (1 << 8) || ready.size() >= slice.size()) {
742       return append(slice.as_slice());
743     }
744 
745     auto new_tail = ChainBufferNodeAllocator::create(std::move(slice), false);
746     tail_->next_ = ChainBufferNodeAllocator::clone(new_tail);
747     writer_ = BufferWriter();
748     tail_ = std::move(new_tail);  // release tail_
749   }
750 
append(ChainBufferReader && reader)751   void append(ChainBufferReader &&reader) {
752     while (!reader.empty()) {
753       append(reader.read_as_buffer_slice());
754     }
755   }
append(ChainBufferReader & reader)756   void append(ChainBufferReader &reader) {
757     while (!reader.empty()) {
758       append(reader.read_as_buffer_slice());
759     }
760   }
761 
extract_reader()762   ChainBufferReader extract_reader() {
763     CHECK(head_);
764     return ChainBufferReader(std::move(head_));
765   }
766 
767  private:
empty()768   bool empty() const {
769     return !tail_;
770   }
771 
772   ChainBufferNodeReaderPtr head_;
773   ChainBufferNodeWriterPtr tail_;
774   BufferWriter writer_;
775 };
776 
777 class BufferBuilder {
778  public:
779   BufferBuilder() = default;
BufferBuilder(Slice slice,size_t prepend_size,size_t append_size)780   BufferBuilder(Slice slice, size_t prepend_size, size_t append_size)
781       : buffer_writer_(slice, prepend_size, append_size) {
782   }
BufferBuilder(BufferWriter && buffer_writer)783   explicit BufferBuilder(BufferWriter &&buffer_writer) : buffer_writer_(std::move(buffer_writer)) {
784   }
785 
786   void append(BufferSlice slice);
787   void append(Slice slice);
788 
789   void prepend(BufferSlice slice);
790   void prepend(Slice slice);
791 
792   template <class F>
for_each(F && f)793   void for_each(F &&f) const & {
794     for (auto i = to_prepend_.size(); i > 0; i--) {
795       f(to_prepend_[i - 1].as_slice());
796     }
797     if (!buffer_writer_.empty()) {
798       f(buffer_writer_.as_slice());
799     }
800     for (auto &slice : to_append_) {
801       f(slice.as_slice());
802     }
803   }
804   template <class F>
for_each(F && f)805   void for_each(F &&f) && {
806     for (auto i = to_prepend_.size(); i > 0; i--) {
807       f(std::move(to_prepend_[i - 1]));
808     }
809     if (!buffer_writer_.empty()) {
810       f(buffer_writer_.as_buffer_slice());
811     }
812     for (auto &slice : to_append_) {
813       f(std::move(slice));
814     }
815   }
816   size_t size() const;
817 
818   BufferSlice extract();
819 
820  private:
821   BufferWriter buffer_writer_;
822   std::vector<BufferSlice> to_append_;
823   std::vector<BufferSlice> to_prepend_;
824 
825   bool append_inplace(Slice slice);
826   void append_slow(BufferSlice slice);
827   bool prepend_inplace(Slice slice);
828   void prepend_slow(BufferSlice slice);
829 };
830 
as_slice(const BufferSlice & value)831 inline Slice as_slice(const BufferSlice &value) {
832   return value.as_slice();
833 }
as_slice(BufferSlice & value)834 inline MutableSlice as_slice(BufferSlice &value) {
835   return value.as_slice();
836 }
as_mutable_slice(BufferSlice & value)837 inline MutableSlice as_mutable_slice(BufferSlice &value) {
838   return value.as_slice();
839 }
840 
841 }  // namespace td
842