1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <chrono>
20 #include <utility>
21 
22 #include <folly/ExceptionWrapper.h>
23 #include <folly/Function.h>
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/IOBufQueue.h>
26 #include <folly/io/async/AsyncTransport.h>
27 
28 #include <thrift/lib/cpp2/Flags.h>
29 #include <thrift/lib/cpp2/async/RpcOptions.h>
30 
31 THRIFT_FLAG_DECLARE_int64(rocket_parser_resize_period_seconds);
32 THRIFT_FLAG_DECLARE_bool(rocket_parser_dont_hold_buffer_enabled);
33 THRIFT_FLAG_DECLARE_bool(rocket_parser_hybrid_buffer_enabled);
34 
35 namespace apache {
36 namespace thrift {
37 namespace rocket {
38 
39 template <class T>
40 class Parser final : public folly::AsyncTransport::ReadCallback,
41                      public folly::HHWheelTimer::Callback {
42  public:
43   explicit Parser(
44       T& owner,
45       std::chrono::milliseconds resizeBufferTimeout =
46           kDefaultBufferResizeInterval)
newBufferLogicEnabled_(THRIFT_FLAG (rocket_parser_dont_hold_buffer_enabled))47       : newBufferLogicEnabled_(
48             THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled)),
49         hybridBufferLogicEnabled_(
50             THRIFT_FLAG(rocket_parser_hybrid_buffer_enabled) &&
51             !newBufferLogicEnabled_),
52         owner_(owner),
53         resizeBufferTimeout_(resizeBufferTimeout),
54         periodicResizeBufferTimeout_(
55             THRIFT_FLAG(rocket_parser_resize_period_seconds)),
56         readBuffer_(
57             folly::IOBuf::CreateOp(),
58             hybridBufferLogicEnabled_ ? kStaticBufferSize : bufferSize_) {}
59 
~Parser()60   ~Parser() override {
61     if (currentFrameLength_) {
62       owner_.decMemoryUsage(currentFrameLength_);
63     }
64   }
65 
66   // AsyncTransport::ReadCallback implementation
67   FOLLY_NOINLINE void getReadBuffer(void** bufout, size_t* lenout) override;
68   FOLLY_NOINLINE void readDataAvailable(size_t nbytes) noexcept override;
69   FOLLY_NOINLINE void readEOF() noexcept override;
70   FOLLY_NOINLINE void readErr(
71       const folly::AsyncSocketException&) noexcept override;
72   FOLLY_NOINLINE void readBufferAvailable(
73       std::unique_ptr<folly::IOBuf> /*readBuf*/) noexcept override;
74 
isBufferMovable()75   bool isBufferMovable() noexcept override { return true; }
76 
77   // TODO: This should be removed once the new buffer logic controlled by
78   // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable.
79   void timeoutExpired() noexcept override;
80 
81   // TODO: This should be removed once the new buffer logic controlled by
82   // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable.
getReadBuffer()83   const folly::IOBuf& getReadBuffer() const { return readBuffer_; }
84 
85   // TODO: This should be removed once the new buffer logic controlled by
86   // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable.
setReadBuffer(folly::IOBuf && buffer)87   void setReadBuffer(folly::IOBuf&& buffer) { readBuffer_ = std::move(buffer); }
88 
getReadBufferSize()89   size_t getReadBufferSize() const { return bufferSize_; }
90 
setReadBufferSize(size_t size)91   void setReadBufferSize(size_t size) { bufferSize_ = size; }
92 
93   // TODO: This should be removed once the new buffer logic controlled by
94   // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable.
95   void resizeBuffer();
96 
getReadBufLength()97   size_t getReadBufLength() const {
98     if (newBufferLogicEnabled_) {
99       return readBufQueue_.chainLength();
100     } else if (hybridBufferLogicEnabled_) {
101       return dynamicBuffer_ ? dynamicBuffer_->length() : readBuffer_.length();
102     }
103     return readBuffer_.computeChainDataLength();
104   }
105 
getNewBufferLogicEnabled()106   bool getNewBufferLogicEnabled() const { return newBufferLogicEnabled_; }
107 
108   static constexpr size_t kMinBufferSize{256};
109   static constexpr size_t kMaxBufferSize{4096};
110 
111  private:
112   bool customAlloc(folly::IOBuf& buffer, size_t startOffset, size_t frameSize);
113   bool customAlloc(
114       folly::IOBufQueue& bufQueue, size_t startOffset, size_t frameSize);
115   std::unique_ptr<folly::IOBuf> getCustomAllocBuf(
116       size_t numBytes, size_t startOffset, size_t trimLength);
117 
118   // "old" logic: maintain read buffer in Parser and resize as necessary, hand
119   // out frames as IOBufs pointing to the buffer
120   // TODO: remove once hybrid logic is stable
121   void getReadBufferOld(void** bufout, size_t* lenout);
122   void readDataAvailableOld(size_t nbytes);
123   // "new" logic: allocate space for frames and transfer ownership to
124   // application immediately
125   // TODO: remove once hybrid logic is stable
126   void getReadBufferNew(void** bufout, size_t* lenout);
127   void readDataAvailableNew(size_t nbytes);
128   // hybrid logic: maintain small, static read buffer in Parser (like "old"
129   // logic), allocate space in dynamic buffer and immediately transfer ownership
130   // if necessary (like "new" logic)
131   void getReadBufferHybrid(void** bufout, size_t* lenout);
132   void readDataAvailableHybrid(size_t nbytes);
133 
134   // Flag that controls if the parser should use the new buffer logic
135   const bool newBufferLogicEnabled_;
136   // Flag that controls if the parser should use the hybrid buffer logic
137   const bool hybridBufferLogicEnabled_;
138 
139   // TODO: This should be removed once the new buffer logic controlled by
140   // THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is stable.
141   static constexpr std::chrono::milliseconds kDefaultBufferResizeInterval{
142       std::chrono::seconds(3)};
143 
144   T& owner_;
145   size_t bufferSize_{kMinBufferSize};
146   // TODO: readBuffer_, lastResizeTime_, resizeBufferTimeout_ and
147   // periodicResizeBufferTimeout_ should be removed once the new buffer logic
148   // controlled by THRIFT_FLAG(rocket_parser_dont_hold_buffer_enabled) is
149   // stable.
150   std::chrono::steady_clock::time_point lastResizeTime_{
151       std::chrono::steady_clock::now()};
152   const std::chrono::milliseconds resizeBufferTimeout_;
153   const int64_t periodicResizeBufferTimeout_;
154   apache::thrift::RpcOptions::MemAllocType allocType_{
155       apache::thrift::RpcOptions::MemAllocType::ALLOC_DEFAULT};
156   size_t currentFrameLength_{0};
157   uint8_t currentFrameType_{0};
158   // used by readDataAvailable or readBufferAvailable API (only one will be
159   // invoked for a given AsyncTransport)
160   folly::IOBufQueue readBufQueue_{folly::IOBufQueue::cacheChainLength()};
161   folly::IOBuf readBuffer_;
162 
163   // hybrid logic
164   static constexpr size_t kStaticBufferSize = 1024;
165   static constexpr size_t kReallocateThreshold = 64;
166   std::unique_ptr<folly::IOBuf> dynamicBuffer_{nullptr};
167   bool reallocateIfShared_{false};
168 };
169 
170 } // namespace rocket
171 } // namespace thrift
172 } // namespace apache
173 
174 #include <thrift/lib/cpp2/transport/rocket/framing/Parser-inl.h>
175