/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <stdexcept>
#include <string>

#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>

namespace folly {

namespace io {
enum class CursorAccess;
template <CursorAccess>
class RWCursor;
} // namespace io

/**
 * An IOBufQueue encapsulates a chain of IOBufs and provides
 * convenience functions to append data to the back of the chain
 * and remove data from the front.
 *
 * You may also prepend data into the headroom of the first buffer in the
 * chain, if any.
 */
class IOBufQueue {
 private:
  template <io::CursorAccess>
  friend class io::RWCursor;

  /**
   * This guard should be taken by any method that intends to make any
   * changes to the data in the queue (e.g. appending to it).
   *
   * It flushes the writable tail cache and refills it on destruction.
   */
  auto updateGuard() {
    flushCache();
    return folly::makeGuard([this] { updateWritableTailCache(); });
  }

  struct WritableRangeCacheData {
    std::pair<uint8_t*, uint8_t*> cachedRange;
    bool attached{false};

    WritableRangeCacheData() = default;

    WritableRangeCacheData(WritableRangeCacheData&& other)
        : cachedRange(other.cachedRange), attached(other.attached) {
      other.cachedRange = {nullptr, nullptr};
      other.attached = false;
    }
    WritableRangeCacheData& operator=(WritableRangeCacheData&& other) {
      cachedRange = other.cachedRange;
      attached = other.attached;

      other.cachedRange = {nullptr, nullptr};
      other.attached = false;

      return *this;
    }

    WritableRangeCacheData(const WritableRangeCacheData&) = delete;
    WritableRangeCacheData& operator=(const WritableRangeCacheData&) = delete;
  };

 public:
  struct Options {
    Options() : cacheChainLength(false) {}
    bool cacheChainLength;
  };

  /**
   * Commonly used Options, currently the only possible value other than
   * the default.
   */
  static Options cacheChainLength() {
    Options options;
    options.cacheChainLength = true;
    return options;
  }

  /**
   * WritableRangeCache represents a cache of the current writable tail and
   * provides a cheap and simple interface to append to it that avoids paying
   * the cost of a preallocate/postallocate pair (i.e. indirections and
   * checks).
   *
   * The cache is flushed on destruction/copy/move and on non-const accesses
   * to the underlying IOBufQueue.
   *
   * Note: there can be only one active cache for a given IOBufQueue, i.e.
   *       when you fill a cache object it automatically invalidates any
   *       other cache.
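   *
   * A minimal usage sketch (illustrative only; real callers should check
   * length() before writing and refill the cache as needed):
   *
   *   folly::IOBufQueue queue;
   *   queue.preallocate(64, 4096);   // make sure there is a writable tail
   *   folly::IOBufQueue::WritableRangeCache cache(&queue);
   *   if (cache.length() >= 1) {
   *     cache.writableData()[0] = 42; // write one byte into the tail
   *     cache.append(1);              // mark it as occupied via the cache
   *   }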
   */
  class WritableRangeCache {
   public:
    explicit WritableRangeCache(folly::IOBufQueue* q = nullptr) : queue_(q) {
      if (queue_) {
        fillCache();
      }
    }

    /**
     * Move constructor/assignment can move the cached range, but must update
     * the reference in IOBufQueue.
     */
    WritableRangeCache(WritableRangeCache&& other)
        : data_(std::move(other.data_)), queue_(other.queue_) {
      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }
    }
    WritableRangeCache& operator=(WritableRangeCache&& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      data_ = std::move(other.data_);
      queue_ = other.queue_;

      if (data_.attached) {
        queue_->updateCacheRef(data_);
      }

      return *this;
    }

    /**
     * Copy constructor/assignment cannot copy the cached range.
     */
    WritableRangeCache(const WritableRangeCache& other)
        : queue_(other.queue_) {}
    WritableRangeCache& operator=(const WritableRangeCache& other) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = other.queue_;

      return *this;
    }

    ~WritableRangeCache() {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }
    }

    /**
     * Reset the underlying IOBufQueue; flushes the current cache if present.
     */
    void reset(IOBufQueue* q) {
      if (data_.attached) {
        queue_->clearWritableRangeCache();
      }

      queue_ = q;

      if (queue_) {
        fillCache();
      }
    }

    /**
     * Get a pointer to the underlying IOBufQueue object.
     */
    IOBufQueue* queue() { return queue_; }

    /**
     * Return a pointer to the start of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    uint8_t* writableData() {
      dcheckIntegrity();
      return data_.cachedRange.first;
    }

    /**
     * Return the length of the cached writable tail.
     *
     * Note: doesn't populate the cache.
     */
    size_t length() {
      dcheckIntegrity();
      return data_.cachedRange.second - data_.cachedRange.first;
    }

    /**
     * Mark n bytes as occupied (e.g. postallocate).
     */
    void append(size_t n) {
      dcheckIntegrity();
      // The slow path can be hit only if somebody is misusing the interface,
      // e.g. calling append() after touching the IOBufQueue or without
      // checking length().
      if (LIKELY(data_.cachedRange.first != nullptr)) {
        DCHECK_LE(n, length());
        data_.cachedRange.first += n;
      } else {
        appendSlow(n);
      }
    }

    /**
     * Same as append(n), but avoids checking if there is a cache.
     * The caller must guarantee that the cache is set (e.g. the caller just
     * called fillCache or checked that it's not empty).
     */
    void appendUnsafe(size_t n) { data_.cachedRange.first += n; }

    /**
     * Fill the cache of the writable tail from the underlying IOBufQueue.
     */
    void fillCache() { queue_->fillWritableRangeCache(data_); }

   private:
    WritableRangeCacheData data_;
    IOBufQueue* queue_;

    FOLLY_NOINLINE void appendSlow(size_t n) { queue_->postallocate(n); }

    void dcheckIntegrity() {
      // Tail start should always be less than or equal to tail end.
      DCHECK_LE(
          (void*)data_.cachedRange.first, (void*)data_.cachedRange.second);
      DCHECK(
          data_.cachedRange.first != nullptr ||
          data_.cachedRange.second == nullptr);

      // Cached range should always be empty if the cache is not attached.
      DCHECK(
          data_.attached ||
          (data_.cachedRange.first == nullptr &&
           data_.cachedRange.second == nullptr));

      // We cannot be in attached state if the queue_ is not set.
      DCHECK(queue_ != nullptr || !data_.attached);

      // If we're attached and the cache is not empty, then it should coincide
      // with the tail buffer.
      DCHECK(
          !data_.attached || data_.cachedRange.first == nullptr ||
          (queue_->head_ != nullptr &&
           data_.cachedRange.first >= queue_->head_->prev()->writableTail() &&
           data_.cachedRange.second ==
               queue_->head_->prev()->writableTail() +
                   queue_->head_->prev()->tailroom()));
    }
  };

  explicit IOBufQueue(const Options& options = Options());
  ~IOBufQueue();

  /**
   * Return a pointer to the space available for prepending bytes and the
   * amount of headroom available.
   */
  std::pair<void*, std::size_t> headroom();

  /**
   * Indicate that n bytes from the headroom have been used.
   */
  void markPrepended(std::size_t n);

  /**
   * Prepend an existing range; throws std::overflow_error if not enough
   * room.
   */
  void prepend(const void* buf, std::size_t n);

  /**
   * Add a buffer or buffer chain to the end of this queue. The
   * queue takes ownership of buf.
   *
   * If pack is true, we try to reduce wastage at the end of this queue
   * by copying some data from the first buffers in the buf chain (and
   * releasing the buffers), if possible. If pack is false, we leave
   * the chain topology unchanged.
   *
   * If allowTailReuse is true, the current writable tail is reappended at the
   * end of the chain when possible and beneficial.
   */
  void append(
      std::unique_ptr<folly::IOBuf>&& buf,
      bool pack = false,
      bool allowTailReuse = false);
  void append(
      const folly::IOBuf& buf, bool pack = false, bool allowTailReuse = false);

  /**
   * Add a queue to the end of this queue. The queue takes ownership of
   * all buffers from the other queue.
   */
  void append(
      IOBufQueue& other, bool pack = false, bool allowTailReuse = false);
  void append(
      IOBufQueue&& other, bool pack = false, bool allowTailReuse = false) {
    append(other, pack, allowTailReuse);
  }

  /**
   * Copy len bytes, starting at buf, to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(const void* buf, size_t len);

  /**
   * Copy a string to the end of this queue.
   * The caller retains ownership of the source data.
   */
  void append(StringPiece sp) { append(sp.data(), sp.size()); }

  /**
   * Append a chain of IOBuf objects that point to consecutive regions
   * within buf.
   *
   * Just like IOBuf::wrapBuffer, this should only be used when the caller
   * knows ahead of time and can ensure that all IOBuf objects that will point
   * to this buffer will be destroyed before the buffer itself is destroyed;
   * all other caveats from wrapBuffer also apply.
   *
   * Every buffer except for the last will wrap exactly blockSize bytes.
   * Importantly, this method may be used to wrap buffers larger than 4GB.
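   *
   * A sketch (assuming `hugeRegion` and `hugeRegionSize` describe a
   * caller-owned allocation that outlives every IOBuf wrapping it):
   *
   *   folly::IOBufQueue queue;
   *   queue.wrapBuffer(hugeRegion, hugeRegionSize); // zero-copy, 2GB blocks
   *   // queue now holds a chain of IOBufs covering the region; no data is
   *   // copied, so the region must stay alive until the chain is destroyed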
   */
  void wrapBuffer(
      const void* buf,
      size_t len,
      std::size_t blockSize = (1U << 31)); // default block size: 2GB

  /**
   * Obtain a writable block of contiguous bytes at the end of this
   * queue, allocating more space if necessary. The amount of space
   * reserved will be at least min. If min contiguous space is not
   * available at the end of the queue, an IOBuf with size newAllocationSize
   * is appended to the chain and returned. The actual available space
   * may be larger than newAllocationSize, but will be truncated to max,
   * if specified.
   *
   * If the caller subsequently writes anything into the returned space,
   * it must call the postallocate() method.
   *
   * @return The starting address of the block and the length in bytes.
   *
   * @note The point of the preallocate()/postallocate() mechanism is
   *       to support I/O APIs such as AsyncSocket::ReadCallback that
   *       request a buffer from the application and then, in a later
   *       callback, tell the application how much of the buffer they
   *       have filled with data.
   */
  std::pair<void*, std::size_t> preallocate(
      std::size_t min,
      std::size_t newAllocationSize,
      std::size_t max = std::numeric_limits<std::size_t>::max()) {
    dcheckCacheIntegrity();

    if (LIKELY(writableTail() != nullptr && tailroom() >= min)) {
      return std::make_pair(
          writableTail(), std::min<std::size_t>(max, tailroom()));
    }

    return preallocateSlow(min, newAllocationSize, max);
  }

  /**
   * Tell the queue that the caller has written data into the first n
   * bytes provided by the previous preallocate() call.
   *
   * @note n should be less than or equal to the size returned by
   *       preallocate(). If n is zero, the caller may skip the call
   *       to postallocate(). If n is nonzero, the caller must not
   *       invoke any other non-const methods on this IOBufQueue between
   *       the call to preallocate() and the call to postallocate().
   */
  void postallocate(std::size_t n) {
    dcheckCacheIntegrity();
    DCHECK_LE(
        (void*)(cachePtr_->cachedRange.first + n),
        (void*)cachePtr_->cachedRange.second);
    cachePtr_->cachedRange.first += n;
  }

  /**
   * Obtain a writable block of n contiguous bytes, allocating more space
   * if necessary, and mark it as used. The caller can fill it later.
   */
  void* allocate(std::size_t n) {
    void* p = preallocate(n, n).first;
    postallocate(n);
    return p;
  }

  void* writableTail() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.first;
  }

  size_t tailroom() const {
    dcheckCacheIntegrity();
    return cachePtr_->cachedRange.second - cachePtr_->cachedRange.first;
  }

  /**
   * Split off the first n bytes of the queue into a separate IOBuf chain,
   * and transfer ownership of the new chain to the caller. The IOBufQueue
   * retains ownership of everything after the split point.
   *
   * @warning If the split point lies in the middle of some IOBuf within
   *          the chain, this function may, as an implementation detail,
   *          clone that IOBuf.
   *
   * @throws std::underflow_error if n exceeds the number of bytes
   *         in the queue.
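   *
   * A minimal sketch (names are illustrative):
   *
   *   folly::IOBufQueue queue;
   *   queue.append("hello world", 11);
   *   auto head = queue.split(5);  // `head` owns "hello"
   *   // the queue retains ownership of the remaining " world"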
   */
  std::unique_ptr<folly::IOBuf> split(size_t n) { return split(n, true); }

  /**
   * Similar to split, but will return the entire queue instead of throwing
   * if n exceeds the number of bytes in the queue.
   */
  std::unique_ptr<folly::IOBuf> splitAtMost(size_t n) {
    return split(n, false);
  }

  /**
   * Similar to IOBuf::trimStart, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimStart(size_t amount);

  /**
   * Similar to trimStart, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimStartAtMost(size_t amount);

  /**
   * Similar to IOBuf::trimEnd, but works on the whole queue. Will
   * pop off buffers that have been completely trimmed.
   */
  void trimEnd(size_t amount);

  /**
   * Similar to trimEnd, but will trim at most amount bytes and return
   * the number of bytes trimmed.
   */
  size_t trimEndAtMost(size_t amount);

  /**
   * Transfer ownership of the queue's entire IOBuf chain to the caller.
   */
  std::unique_ptr<folly::IOBuf> move() {
    auto guard = updateGuard();
    std::unique_ptr<folly::IOBuf> res = std::move(head_);
    chainLength_ = 0;
    return res;
  }

  folly::IOBuf moveAsValue() { return std::move(*move()); }

  /**
   * Access the front IOBuf.
   *
   * Note: the caller will see the current state of the chain, but may not see
   *       future updates immediately, due to the presence of a tail cache.
   * Note: the caller may potentially clone the chain, thus marking all buffers
   *       as shared. We may still continue writing to the tail of the last
   *       IOBuf without checking if it's shared, but this is fine, since the
   *       cloned IOBufs won't reference that data.
   */
  const folly::IOBuf* front() const {
    flushCache();
    return head_.get();
  }

  /**
   * Returns the first IOBuf in the chain and removes it from the chain.
   *
   * @return first IOBuf in the chain or nullptr if none.
   */
  std::unique_ptr<folly::IOBuf> pop_front();

  /**
   * Total chain length; only valid if the cacheChainLength option was
   * specified in the constructor.
   */
  size_t chainLength() const {
    if (UNLIKELY(!options_.cacheChainLength)) {
      throw std::invalid_argument("IOBufQueue: chain length not cached");
    }
    dcheckCacheIntegrity();
    return chainLength_ + (cachePtr_->cachedRange.first - tailStart_);
  }

  /**
   * Returns true iff the IOBuf chain length is 0.
   */
  bool empty() const {
    dcheckCacheIntegrity();
    return !head_ ||
        (head_->empty() && cachePtr_->cachedRange.first == tailStart_);
  }

  const Options& options() const { return options_; }

  /**
   * Clear the queue, freeing all the buffers. Options are preserved.
   */
  void reset() { move(); }

  /**
   * Clear the queue, but try to clear and keep the largest buffer for reuse
   * when possible. Options are preserved.
   */
  void clearAndTryReuseLargestBuffer();

  /**
   * Append the queue to a std::string. Non-destructive.
   */
  void appendToString(std::string& out) const;

  /**
   * Calls IOBuf::gather() on the head of the queue, if it exists.
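   *
   * A sketch of using gather() to get contiguous access to the first bytes
   * of the queue (`queue` is a hypothetical IOBufQueue holding at least 16
   * bytes):
   *
   *   queue.gather(16);
   *   const uint8_t* p = queue.front()->data(); // first 16 bytes contiguous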
   */
  void gather(std::size_t maxLength);

  /** Movable */
  IOBufQueue(IOBufQueue&&) noexcept;
  IOBufQueue& operator=(IOBufQueue&&);

  static constexpr size_t kMaxPackCopy = 4096;

 private:
  std::unique_ptr<folly::IOBuf> split(size_t n, bool throwOnUnderflow);

  static const size_t kChainLengthNotCached = (size_t)-1;
  /** Not copyable */
  IOBufQueue(const IOBufQueue&) = delete;
  IOBufQueue& operator=(const IOBufQueue&) = delete;

  Options options_;

  // NOTE that chainLength_ is still updated even if !options_.cacheChainLength
  // because doing it unchecked in postallocate() is faster (no (mis)predicted
  // branch)
  mutable size_t chainLength_{0};
  /**
   * Everything that has been appended but not yet discarded or moved out.
   * Note: anything that needs to operate on the tail should either call
   *       flushCache() or grab updateGuard() (it will flush the cache itself).
   */
  std::unique_ptr<folly::IOBuf> head_;

  mutable uint8_t* tailStart_{nullptr};
  WritableRangeCacheData* cachePtr_{nullptr};
  WritableRangeCacheData localCache_;

  // Non-null only if it points to the current tail buffer, and that buffer was
  // originally created by this IOBufQueue, so it can be safely reused.
  // Initially set by preallocateSlow() and updated by maybeReuseTail()
  // or invalidated by updateGuard().
  folly::IOBuf* reusableTail_ = nullptr;

  void dcheckCacheIntegrity() const {
    // Tail start should always be less than or equal to tail end.
    DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
    DCHECK_LE(
        (void*)cachePtr_->cachedRange.first,
        (void*)cachePtr_->cachedRange.second);
    DCHECK(
        cachePtr_->cachedRange.first != nullptr ||
        cachePtr_->cachedRange.second == nullptr);

    // There is always an attached cache instance.
    DCHECK(cachePtr_->attached);

    // Either the cache is empty or it coincides with the tail.
    if (cachePtr_->cachedRange.first != nullptr) {
      DCHECK(head_ != nullptr);
      DCHECK(tailStart_ == head_->prev()->writableTail());
      DCHECK(tailStart_ <= cachePtr_->cachedRange.first);
      DCHECK(cachePtr_->cachedRange.first >= head_->prev()->writableTail());
      DCHECK(
          cachePtr_->cachedRange.second ==
          head_->prev()->writableTail() + head_->prev()->tailroom());
    }

    // If reusableTail_ is not null it should point to the current tail buffer.
    if (reusableTail_ != nullptr) {
      DCHECK(head_ != nullptr);
      DCHECK(reusableTail_ == head_->prev());
    }
  }

  /**
   * Populate dest with the writable tail range cache.
   */
  void fillWritableRangeCache(WritableRangeCacheData& dest) {
    dcheckCacheIntegrity();
    if (cachePtr_ != &dest) {
      dest = std::move(*cachePtr_);
      cachePtr_ = &dest;
    }
  }

  /**
   * Clear the current writable tail cache and reset it to localCache_.
   */
  void clearWritableRangeCache() {
    flushCache();

    if (cachePtr_ != &localCache_) {
      localCache_ = std::move(*cachePtr_);
      cachePtr_ = &localCache_;
    }

    DCHECK(cachePtr_ == &localCache_ && localCache_.attached);
  }

  /**
   * Commit any pending changes to the tail of the queue.
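   *
   * Bytes recorded through the attached cache (postallocate() or
   * WritableRangeCache::append()) are only appended to the tail IOBuf and
   * folded into chainLength_ when this runs. It can be const because the
   * bookkeeping members it touches (chainLength_, tailStart_) are mutable
   * and the tail IOBuf is reached through a pointer.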
   */
  void flushCache() const {
    dcheckCacheIntegrity();

    if (tailStart_ != cachePtr_->cachedRange.first) {
      auto buf = head_->prev();
      DCHECK_EQ(
          (void*)(buf->writableTail() + buf->tailroom()),
          (void*)cachePtr_->cachedRange.second);
      auto len = cachePtr_->cachedRange.first - tailStart_;
      buf->append(len);
      chainLength_ += len;
      tailStart_ += len;
    }
  }

  // For WritableRangeCache move assignment/construction.
  void updateCacheRef(WritableRangeCacheData& newRef) { cachePtr_ = &newRef; }

  /**
   * Update cached writable tail range. Called by updateGuard().
   */
  void updateWritableTailCache() {
    if (head_ == nullptr || reusableTail_ != head_->prev()) {
      reusableTail_ = nullptr;
    }

    if (LIKELY(head_ != nullptr)) {
      IOBuf* buf = head_->prev();
      if (LIKELY(!buf->isSharedOne())) {
        tailStart_ = buf->writableTail();
        cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
            tailStart_, tailStart_ + buf->tailroom());
        return;
      }
    }
    tailStart_ = nullptr;
    cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>();
  }

  std::pair<void*, std::size_t> preallocateSlow(
      std::size_t min, std::size_t newAllocationSize, std::size_t max);

  void maybeReuseTail();
};

} // namespace folly