1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <chrono> 20 #include <utility> 21 22 #include <folly/ExceptionWrapper.h> 23 #include <folly/Function.h> 24 #include <folly/io/IOBuf.h> 25 #include <folly/io/IOBufQueue.h> 26 #include <folly/io/async/AsyncTransport.h> 27 28 #include <thrift/lib/cpp2/Flags.h> 29 #include <thrift/lib/cpp2/async/RpcOptions.h> 30 31 THRIFT_FLAG_DECLARE_int64(rocket_parser_resize_period_seconds); 32 THRIFT_FLAG_DECLARE_bool(rocket_parser_dont_hold_buffer_enabled); 33 THRIFT_FLAG_DECLARE_bool(rocket_parser_hybrid_buffer_enabled); 34 35 namespace apache { 36 namespace thrift { 37 namespace rocket { 38 39 template <class T> 40 class Parser final : public folly::AsyncTransport::ReadCallback, 41 public folly::HHWheelTimer::Callback { 42 public: 43 explicit Parser( 44 T& owner, 45 std::chrono::milliseconds resizeBufferTimeout = 46 kDefaultBufferResizeInterval) newBufferLogicEnabled_(THRIFT_FLAG (rocket_parser_dont_hold_buffer_enabled))47 : newBufferLogicEnabled_( 48 THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled)), 49 hybridBufferLogicEnabled_( 50 THRIFT_FLAG(rocket_parser_hybrid_buffer_enabled) && 51 !newBufferLogicEnabled_), 52 owner_(owner), 53 resizeBufferTimeout_(resizeBufferTimeout), 54 periodicResizeBufferTimeout_( 55 THRIFT_FLAG(rocket_parser_resize_period_seconds)), 56 readBuffer_( 57 folly::IOBuf::CreateOp(), 58 hybridBufferLogicEnabled_ ? kStaticBufferSize : bufferSize_) {} 59 ~Parser()60 ~Parser() override { 61 if (currentFrameLength_) { 62 owner_.decMemoryUsage(currentFrameLength_); 63 } 64 } 65 66 // AsyncTransport::ReadCallback implementation 67 FOLLY_NOINLINE void getReadBuffer(void** bufout, size_t* lenout) override; 68 FOLLY_NOINLINE void readDataAvailable(size_t nbytes) noexcept override; 69 FOLLY_NOINLINE void readEOF() noexcept override; 70 FOLLY_NOINLINE void readErr( 71 const folly::AsyncSocketException&) noexcept override; 72 FOLLY_NOINLINE void readBufferAvailable( 73 std::unique_ptr<folly::IOBuf> /*readBuf*/) noexcept override; 74 isBufferMovable()75 bool isBufferMovable() noexcept override { return true; } 76 77 // TODO: This should be removed once the new buffer logic controlled by 78 // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. 79 void timeoutExpired() noexcept override; 80 81 // TODO: This should be removed once the new buffer logic controlled by 82 // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. getReadBuffer()83 const folly::IOBuf& getReadBuffer() const { return readBuffer_; } 84 85 // TODO: This should be removed once the new buffer logic controlled by 86 // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. setReadBuffer(folly::IOBuf && buffer)87 void setReadBuffer(folly::IOBuf&& buffer) { readBuffer_ = std::move(buffer); } 88 getReadBufferSize()89 size_t getReadBufferSize() const { return bufferSize_; } 90 setReadBufferSize(size_t size)91 void setReadBufferSize(size_t size) { bufferSize_ = size; } 92 93 // TODO: This should be removed once the new buffer logic controlled by 94 // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. 95 void resizeBuffer(); 96 getReadBufLength()97 size_t getReadBufLength() const { 98 if (newBufferLogicEnabled_) { 99 return readBufQueue_.chainLength(); 100 } else if (hybridBufferLogicEnabled_) { 101 return dynamicBuffer_ ? dynamicBuffer_->length() : readBuffer_.length(); 102 } 103 return readBuffer_.computeChainDataLength(); 104 } 105 getNewBufferLogicEnabled()106 bool getNewBufferLogicEnabled() const { return newBufferLogicEnabled_; } 107 108 static constexpr size_t kMinBufferSize{256}; 109 static constexpr size_t kMaxBufferSize{4096}; 110 111 private: 112 bool customAlloc(folly::IOBuf& buffer, size_t startOffset, size_t frameSize); 113 bool customAlloc( 114 folly::IOBufQueue& bufQueue, size_t startOffset, size_t frameSize); 115 std::unique_ptr<folly::IOBuf> getCustomAllocBuf( 116 size_t numBytes, size_t startOffset, size_t trimLength); 117 118 // "old" logic: maintain read buffer in Parser and resize as necessary, hand 119 // out frames as IOBufs pointing to the buffer 120 // TODO: remove once hybrid logic is stable 121 void getReadBufferOld(void** bufout, size_t* lenout); 122 void readDataAvailableOld(size_t nbytes); 123 // "new" logic: allocate space for frames and transfer ownership to 124 // application immediately 125 // TODO: remove once hybrid logic is stable 126 void getReadBufferNew(void** bufout, size_t* lenout); 127 void readDataAvailableNew(size_t nbytes); 128 // hybrid logic: maintain small, static read buffer in Parser (like "old" 129 // logic), allocate space in dynamic buffer and immediately transfer ownership 130 // if necessary (like "new" logic) 131 void getReadBufferHybrid(void** bufout, size_t* lenout); 132 void readDataAvailableHybrid(size_t nbytes); 133 134 // Flag that controls if the parser should use the new buffer logic 135 const bool newBufferLogicEnabled_; 136 // Flag that controls if the parser should use the hybrid buffer logic 137 const bool hybridBufferLogicEnabled_; 138 139 // TODO: This should be removed once the new buffer logic controlled by 140 // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable. 141 static constexpr std::chrono::milliseconds kDefaultBufferResizeInterval{ 142 std::chrono::seconds(3)}; 143 144 T& owner_; 145 size_t bufferSize_{kMinBufferSize}; 146 // TODO: readBuffer_, lastResizeTime_, resizeBufferTimeout_ and 147 // periodicResizeBufferTimeout_ should be removed once the new buffer logic 148 // controlled by THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is 149 // stable. 150 std::chrono::steady_clock::time_point lastResizeTime_{ 151 std::chrono::steady_clock::now()}; 152 const std::chrono::milliseconds resizeBufferTimeout_; 153 const int64_t periodicResizeBufferTimeout_; 154 apache::thrift::RpcOptions::MemAllocType allocType_{ 155 apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT}; 156 size_t currentFrameLength_{0}; 157 uint8_t currentFrameType_{0}; 158 // used by readDataAvailable or readBufferAvailable API (only one will be 159 // invoked for a given AsyncTransport) 160 folly::IOBufQueue readBufQueue_{folly::IOBufQueue::cacheChainLength()}; 161 folly::IOBuf readBuffer_; 162 163 // hybrid logic 164 static constexpr size_t kStaticBufferSize = 1024; 165 static constexpr size_t kReallocateThreshold = 64; 166 std::unique_ptr<folly::IOBuf> dynamicBuffer_{nullptr}; 167 bool reallocateIfShared_{false}; 168 }; 169 170 } // namespace rocket 171 } // namespace thrift 172 } // namespace apache 173 174 #include <thrift/lib/cpp2/transport/rocket/framing/Parser-inl.h> 175