/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <limits>
#include <stdexcept>
#include <string>
#include <utility>

#include <glog/logging.h>

#include <folly/Likely.h>
#include <folly/Portability.h>
#include <folly/Range.h>
#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>

namespace folly {

namespace io {
enum class CursorAccess;
template <CursorAccess>
class RWCursor;
} // namespace io

/**
 * An IOBufQueue encapsulates a chain of IOBufs and provides
 * convenience functions to append data to the back of the chain
 * and remove data from the front.
 *
 * You may also prepend data into the headroom of the first buffer in the
 * chain, if any.
 */
class IOBufQueue {
 private:
  template <io::CursorAccess>
  friend class io::RWCursor;

  /**
   * This guard should be taken by any method that intends to make changes
   * to the data in this queue (e.g. appending to it).
   *
   * It flushes the writable tail cache and refills it on destruction.
   */
  auto updateGuard() {
    flushCache();
    return folly::makeGuard([this] { updateWritableTailCache(); });
  }

  struct WritableRangeCacheData {
    std::pair<uint8_t*, uint8_t*> cachedRange;
    bool attached{false};

    WritableRangeCacheData() = default;

    WritableRangeCacheData(WritableRangeCacheData&& other)
        : cachedRange(other.cachedRange), attached(other.attached) {
      other.cachedRange = {nullptr, nullptr};
      other.attached = false;
    }
    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
      cachedRange = other.cachedRange;
      attached = other.attached;

      other.cachedRange = {nullptr, nullptr};
      other.attached = false;

      return *this;
    }

    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
  };

 public:
  struct Options {
    Options() : cacheChainLength(false) {}
    bool cacheChainLength;
  };

  /**
   * Commonly used Options, currently the only possible value other than
   * the default.
   */
  static Options cacheChainLength() {
    Options options;
    options.cacheChainLength = true;
    return options;
  }

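  // Illustrative usage sketch (based only on the public API declared in this
  // header; not part of the documented contract): construct a queue with
  // chain-length caching enabled so that chainLength(), declared below, is
  // valid.
  //
  //   folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
  //   queue.append("abc", 3);
  //   size_t n = queue.chainLength(); // OK: length caching was requested
  //
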
  /**
   * WritableRangeCache represents a cache of the current writable tail and
   * provides a cheap, simple interface for appending to it that avoids paying
   * the cost of the preallocate/postallocate pair (i.e. indirections and
   * checks).
   *
   * The cache is flushed on destruction/copy/move and on non-const accesses to
   * the underlying IOBufQueue.
   *
   * Note: there can be only one active cache for a given IOBufQueue, i.e.
   *       filling one cache object automatically invalidates any other
   *       cache.
   */
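  //
  // Usage sketch (illustrative only; assumes the API documented above):
  //
  //   folly::IOBufQueue queue;
  //   queue.preallocate(16, 64); // ensure the tail has writable room
  //   folly::IOBufQueue::WritableRangeCache cache(&queue); // caches the tail
  //   if (cache.length() >= 5) {
  //     memcpy(cache.writableData(), "hello", 5);
  //     cache.append(5); // cheap: no indirection through the queue
  //   }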
  class WritableRangeCache {
   public:
    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
      if (queue_) {
        fillCache();
      }
    }

    /**
     * Move constructor/assignment can move the cached range, but must update
     * the reference in IOBufQueue.
     */
    WritableRangeCache(WritableRangeCache&& other)
        : data_(std::move(other.data_)), queue_(other.queue_) {
      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }
    }
    WritableRangeCache& operator=(WritableRangeCache&& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      data_ = std::move(other.data_);
      queue_ = other.queue_;

      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }

      return *this;
    }

    /**
     * Copy constructor/assignment cannot copy the cached range.
     */
    WritableRangeCache(const WritableRangeCache& other)
        : queue_(other.queue_) {}
    WritableRangeCache& operator=(const WritableRangeCache& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = other.queue_;

      return *this;
    }

    ~WritableRangeCache() {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }
    }

    /**
     * Reset the underlying IOBufQueue; flushes the current cache if present.
     */
    void reset(IOBufQueue* q) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = q;

      if (queue_) {
        fillCache();
      }
    }

    /**
     * Get a pointer to the underlying IOBufQueue object.
     */
    IOBufQueue* queue() { return queue_; }

    /**
     * Return a pointer to the start of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    uint8_t* writableData() {
      dcheckIntegrity();
      return data_.cachedRange.first;
    }

    /**
     * Return the length of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    size_t length() {
      dcheckIntegrity();
      return data_.cachedRange.second - data_.cachedRange.first;
    }

    /**
     * Mark n bytes as occupied (e.g. postallocate).
     */
    void append(size_t n) {
      dcheckIntegrity();
      // The cache can be empty here only if somebody is misusing the
      // interface, e.g. calling append() after touching the IOBufQueue or
      // without checking length().
      if (LIKELY(data_.cachedRange.first != nullptr)) {
        DCHECK_LE(n, length());
        data_.cachedRange.first += n;
      } else {
        appendSlow(n);
      }
    }

    /**
     * Same as append(n), but avoids checking if there is a cache.
     * The caller must guarantee that the cache is set (e.g. the caller just
     * called fillCache or checked that it's not empty).
     */
    void appendUnsafe(size_t n) { data_.cachedRange.first += n; }

    /**
     * Fill the cache of writable tail from the underlying IOBufQueue.
     */
    void fillCache() { queue_->fillWritableRangeCache(data_); }

   private:
    WritableRangeCacheData data_;
    IOBufQueue* queue_;

    FOLLY_NOINLINE void appendSlow(size_t n) { queue_->postallocate(n); }

    void dcheckIntegrity() {
      // Tail start should never be past tail end.
      DCHECK_LE(
          (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
      DCHECK(
          data_.cachedRange.first != nullptr ||
          data_.cachedRange.second == nullptr);

      // The cached range should always be empty if the cache is not attached.
      DCHECK(
          data_.attached ||
          (data_.cachedRange.first == nullptr &&
           data_.cachedRange.second == nullptr));

      // We cannot be in the attached state if the queue_ is not set.
      DCHECK(queue_ != nullptr || !data_.attached);

      // If we're attached and the cache is not empty, then it should coincide
      // with the tail buffer.
      DCHECK(
          !data_.attached || data_.cachedRange.first == nullptr ||
          (queue_->head_ != nullptr &&
           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
           data_.cachedRange.second ==
               queue_->head_->prev()->writableTail() +
                   queue_->head_->prev()->tailroom()));
    }
  };

  explicit IOBufQueue(const Options& options = Options());
  ~IOBufQueue();

  /**
   * Return a pointer to the space available for prepending bytes, together
   * with the amount of headroom available.
   */
  std::pair<void*, std::size_t> headroom();

  /**
   * Indicate that n bytes from the headroom have been used.
   */
  void markPrepended(std::size_t n);

  /**
   * Prepend an existing range; throws std::overflow_error if not enough
   * room.
   */
  void prepend(const void* buf, std::size_t n);

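  // Illustrative sketch: reserving headroom for a length prefix and filling it
  // in once the payload size is known. This is not from the original
  // documentation; it assumes a head buffer that was given headroom via
  // IOBuf::advance(), since prepend() throws std::overflow_error otherwise.
  //
  //   auto buf = folly::IOBuf::create(256);
  //   buf->advance(sizeof(uint32_t)); // reserve headroom for the prefix
  //   folly::IOBufQueue queue;
  //   queue.append(std::move(buf));
  //   queue.append("payload", 7);
  //   uint32_t frameLen = 7;
  //   queue.prepend(&frameLen, sizeof(frameLen));
  //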
  /**
   * Add a buffer or buffer chain to the end of this queue. The
   * queue takes ownership of buf.
   *
   * If pack is true, we try to reduce wastage at the end of this queue
   * by copying some data from the first buffers in the buf chain (and
   * releasing the buffers), if possible.  If pack is false, we leave
   * the chain topology unchanged.
   *
   * If allowTailReuse is true, the current writable tail is reappended at the
   * end of the chain when possible and beneficial.
   */
  void append(
      std::unique_ptr<folly::IOBuf>&& buf,
      bool pack = false,
      bool allowTailReuse = false);
  void append(
      const folly::IOBuf& buf, bool pack = false, bool allowTailReuse = false);

  /**
   * Add a queue to the end of this queue. The queue takes ownership of
   * all buffers from the other queue.
   */
  void append(
      IOBufQueue& other, bool pack = false, bool allowTailReuse = false);
  void append(
      IOBufQueue&& other, bool pack = false, bool allowTailReuse = false) {
    append(other, pack, allowTailReuse);
  }

  /**
   * Copy len bytes, starting at buf, to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(const void* buf, size_t len);

  /**
   * Copy a string to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(StringPiece sp) { append(sp.data(), sp.size()); }

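  // Illustrative sketch of the append() overloads declared above: copying
  // small data in, and handing over ownership of an existing buffer with
  // pack=true to reduce wasted tailroom.
  //
  //   folly::IOBufQueue queue(folly::IOBufQueue::cacheChainLength());
  //   queue.append("hello ", 6);                             // copies
  //   queue.append(folly::IOBuf::copyBuffer("world"), true); // takes ownership
  //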
  /**
   * Append a chain of IOBuf objects that point to consecutive regions
   * within buf.
   *
   * Just like IOBuf::wrapBuffer, this should only be used when the caller
   * knows ahead of time and can ensure that all IOBuf objects that will point
   * to this buffer will be destroyed before the buffer itself is destroyed;
   * all other caveats from wrapBuffer also apply.
   *
   * Every buffer except for the last will wrap exactly blockSize bytes.
   * Importantly, this method may be used to wrap buffers larger than 4GB.
   */
  void wrapBuffer(
      const void* buf,
      size_t len,
      std::size_t blockSize = (1U << 31)); // default block size: 2GB

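  // Illustrative sketch: wrapping caller-owned memory without copying.
  // `region` is a hypothetical buffer used only for this example; the caller
  // must keep it alive until the queue (and anything split or moved out of it)
  // no longer references it.
  //
  //   std::vector<char> region(1 << 20);
  //   queue.wrapBuffer(region.data(), region.size());
  //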
  /**
   * Obtain a writable block of contiguous bytes at the end of this
   * queue, allocating more space if necessary.  The amount of space
   * reserved will be at least min.  If min contiguous space is not
   * available at the end of the queue, an IOBuf of size newAllocationSize
   * is appended to the chain and returned.  The actual available space
   * may be larger than newAllocationSize, but will be truncated to max,
   * if specified.
   *
   * If the caller subsequently writes anything into the returned space,
   * it must call the postallocate() method.
   *
   * @return The starting address of the block and the length in bytes.
   *
   * @note The point of the preallocate()/postallocate() mechanism is
   *       to support I/O APIs such as AsyncSocket::ReadCallback that
   *       request a buffer from the application and then, in a later
   *       callback, tell the application how much of the buffer they
   *       have filled with data.
   */
  std::pair<void*, std::size_t> preallocate(
      std::size_t min,
      std::size_t newAllocationSize,
      std::size_t max = std::numeric_limits<std::size_t>::max()) {
    dcheckCacheIntegrity();

    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
      return std::make_pair(
          writableTail(), std::min<std::size_t>(max, tailroom()));
    }

    return preallocateSlow(min, newAllocationSize, max);
  }

  /**
   * Tell the queue that the caller has written data into the first n
   * bytes provided by the previous preallocate() call.
   *
   * @note n should be less than or equal to the size returned by
   *       preallocate().  If n is zero, the caller may skip the call
   *       to postallocate().  If n is nonzero, the caller must not
   *       invoke any other non-const methods on this IOBufQueue between
   *       the call to preallocate and the call to postallocate().
   */
  void postallocate(std::size_t n) {
    dcheckCacheIntegrity();
    DCHECK_LE(
        (void*)(cachePtr_->cachedRange.first + n),
        (void*)cachePtr_->cachedRange.second);
    cachePtr_->cachedRange.first += n;
  }

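  // Typical preallocate()/postallocate() flow (illustrative sketch; readSome()
  // is a hypothetical function standing in for whatever fills the buffer):
  //
  //   auto [buf, len] = queue.preallocate(1024, 4096);
  //   size_t nRead = readSome(buf, len); // write up to len bytes into buf
  //   queue.postallocate(nRead);         // commit exactly what was written
  //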
  /**
   * Obtain a writable block of n contiguous bytes, allocating more space
   * if necessary, and mark it as used.  The caller can fill it later.
   */
  void* allocate(std::size_t n) {
    void* p = preallocate(n, n).first;
    postallocate(n);
    return p;
  }

  void* writableTail() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.first;
  }

  size_t tailroom() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
  }

  /**
   * Split off the first n bytes of the queue into a separate IOBuf chain,
   * and transfer ownership of the new chain to the caller.  The IOBufQueue
   * retains ownership of everything after the split point.
   *
   * @warning If the split point lies in the middle of some IOBuf within
   *          the chain, this function may, as an implementation detail,
   *          clone that IOBuf.
   *
   * @throws std::underflow_error if n exceeds the number of bytes
   *         in the queue.
   */
  std::unique_ptr<folly::IOBuf> split(size_t n) { return split(n, true); }

  /**
   * Similar to split, but will return the entire queue instead of throwing
   * if n exceeds the number of bytes in the queue.
   */
  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
    return split(n, false);
  }

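  // Illustrative sketch: peeling a fixed-size record off the front of the
  // queue once enough bytes have accumulated. recordSize is a hypothetical
  // constant, and chainLength() requires the cacheChainLength option.
  //
  //   constexpr size_t recordSize = 64;
  //   if (queue.chainLength() >= recordSize) {
  //     std::unique_ptr<folly::IOBuf> record = queue.split(recordSize);
  //     // ... parse `record` ...
  //   }
  //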
  /**
   * Similar to IOBuf::trimStart, but works on the whole queue.  Will
   * pop off buffers that have been completely trimmed.
   */
  void trimStart(size_t amount);

  /**
   * Similar to trimStart, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimStartAtMost(size_t amount);

  /**
   * Similar to IOBuf::trimEnd, but works on the whole queue.  Will
   * pop off buffers that have been completely trimmed.
   */
  void trimEnd(size_t amount);

  /**
   * Similar to trimEnd, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimEndAtMost(size_t amount);

  /**
   * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
  std::unique_ptr<folly::IOBuf> move() {
    auto guard = updateGuard();
    std::unique_ptr<folly::IOBuf> res = std::move(head_);
    chainLength_ = 0;
    return res;
  }

  folly::IOBuf moveAsValue() { return std::move(*move()); }

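  // Illustrative sketch: handing the accumulated chain off once a full message
  // has been built. writeChainSomewhere() is a hypothetical consumer that
  // takes ownership of the chain.
  //
  //   std::unique_ptr<folly::IOBuf> chain = queue.move(); // queue is now empty
  //   writeChainSomewhere(std::move(chain));
  //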
  /**
   * Access the front IOBuf.
   *
   * Note: caller will see the current state of the chain, but may not see
   *       future updates immediately, due to the presence of a tail cache.
   * Note: the caller may potentially clone the chain, thus marking all buffers
   *       as shared. We may still continue writing to the tail of the last
   *       IOBuf without checking if it's shared, but this is fine, since the
   *       cloned IOBufs won't reference that data.
   */
  const folly::IOBuf* front() const {
    flushCache();
    return head_.get();
  }

  /**
   * Returns the first IOBuf in the chain and removes it from the chain.
   *
   * @return first IOBuf in the chain or nullptr if none.
   */
  std::unique_ptr<folly::IOBuf> pop_front();

  /**
   * Total chain length, only valid if cacheChainLength was specified in the
   * constructor.
   */
  size_t chainLength() const {
    if (UNLIKELY(!options_.cacheChainLength)) {
      throw std::invalid_argument("IOBufQueue: chain length not cached");
    }
    dcheckCacheIntegrity();
    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
  }

  /**
   * Returns true iff the IOBuf chain length is 0.
   */
  bool empty() const {
    dcheckCacheIntegrity();
    return !head_ ||
        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
  }

  const Options& options() const { return options_; }

  /**
   * Clear the queue, freeing all the buffers. Options are preserved.
   */
  void reset() { move(); }

  /**
   * Clear the queue, but try to clear and keep the largest buffer for reuse
   * when possible. Options are preserved.
   */
  void clearAndTryReuseLargestBuffer();

  /**
   * Append the queue to a std::string. Non-destructive.
   */
  void appendToString(std::string& out) const;

  /**
   * Calls IOBuf::gather() on the head of the queue, if it exists.
   */
  void gather(std::size_t maxLength);

  /** Movable */
  IOBufQueue(IOBufQueue&&) noexcept;
  IOBufQueue& operator=(IOBufQueue&&);

  static constexpr size_t kMaxPackCopy = 4096;

 private:
  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);

  static const size_t kChainLengthNotCached = (size_t)-1;
  /** Not copyable */
  IOBufQueue(const IOBufQueue&) = delete;
  IOBufQueue& operator=(const IOBufQueue&) = delete;

  Options options_;

  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
  // because doing it unchecked in postallocate() is faster (no (mis)predicted
  // branch)
  mutable size_t chainLength_{0};
  /**
   * Everything that has been appended but not yet discarded or moved out.
   * Note: anything that needs to operate on the tail should either call
   * flushCache() or grab updateGuard() (it will flush the cache itself).
   */
  std::unique_ptr<folly::IOBuf> head_;

  mutable uint8_t* tailStart_{nullptr};
  WritableRangeCacheData* cachePtr_{nullptr};
  WritableRangeCacheData localCache_;

  // Non-null only if it points to the current tail buffer, and that buffer was
  // originally created by this IOBufQueue, so it can be safely
  // reused. Initially set by preallocateSlow() and updated by maybeReuseTail()
  // or invalidated by updateGuard().
  folly::IOBuf* reusableTail_ = nullptr;

  void dcheckCacheIntegrity() const {
    // Tail start should never be past tail end.
    DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
    DCHECK_LE(
        (void*)cachePtr_->cachedRange.first,
        (void*)cachePtr_->cachedRange.second);
    DCHECK(
        cachePtr_->cachedRange.first != nullptr ||
        cachePtr_->cachedRange.second == nullptr);

    // There is always an attached cache instance.
    DCHECK(cachePtr_->attached);

    // Either the cache is empty or it coincides with the tail.
    if (cachePtr_->cachedRange.first != nullptr) {
      DCHECK(head_ != nullptr);
      DCHECK(tailStart_ == head_->prev()->writableTail());
      DCHECK(tailStart_ <= cachePtr_->cachedRange.first);
      DCHECK(cachePtr_->cachedRange.first >= head_->prev()->writableTail());
      DCHECK(
          cachePtr_->cachedRange.second ==
          head_->prev()->writableTail() + head_->prev()->tailroom());
    }

    // If reusableTail_ is not null, it should point to the current tail
    // buffer.
    if (reusableTail_ != nullptr) {
      DCHECK(head_ != nullptr);
      DCHECK(reusableTail_ == head_->prev());
    }
  }

  /**
   * Populate dest with the writable tail range cache.
   */
  void fillWritableRangeCache(WritableRangeCacheData& dest) {
    dcheckCacheIntegrity();
    if (cachePtr_ != &dest) {
      dest = std::move(*cachePtr_);
      cachePtr_ = &dest;
    }
  }

  /**
   * Clear the current writable tail cache and reset it to localCache_.
   */
  void clearWritableRangeCache() {
    flushCache();

    if (cachePtr_ != &localCache_) {
      localCache_ = std::move(*cachePtr_);
      cachePtr_ = &localCache_;
    }

    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
  }

  /**
   * Commit any pending changes to the tail of the queue.
   */
  void flushCache() const {
    dcheckCacheIntegrity();

    if (tailStart_ != cachePtr_->cachedRange.first) {
      auto buf = head_->prev();
      DCHECK_EQ(
          (void*)(buf->writableTail() + buf->tailroom()),
          (void*)cachePtr_->cachedRange.second);
      auto len = cachePtr_->cachedRange.first - tailStart_;
      buf->append(len);
      chainLength_ += len;
      tailStart_ += len;
    }
  }

  // For WritableRangeCache move assignment/construction.
  void updateCacheRef(WritableRangeCacheData& newRef) { cachePtr_ = &newRef; }

  /**
   * Update the cached writable tail range. Called by updateGuard().
   */
  void updateWritableTailCache() {
    if (head_ == nullptr || reusableTail_ != head_->prev()) {
      reusableTail_ = nullptr;
    }

    if (LIKELY(head_ != nullptr)) {
      IOBuf* buf = head_->prev();
      if (LIKELY(!buf->isSharedOne())) {
        tailStart_ = buf->writableTail();
        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
            tailStart_, tailStart_ + buf->tailroom());
        return;
      }
    }
    tailStart_ = nullptr;
    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
  }

  std::pair<void*, std::size_t> preallocateSlow(
      std::size_t min, std::size_t newAllocationSize, std::size_t max);

  void maybeReuseTail();
};

} // namespace folly