1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <array>
20 #include <chrono>
21 #include <fmt/core.h>
22 #include <folly/IntrusiveList.h>
23 #include <folly/SocketAddress.h>
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/async/Request.h>
26 #include <thrift/lib/cpp/protocol/TProtocolTypes.h>
27 #include <thrift/lib/cpp2/PluggableFunction.h>
28 #include <thrift/lib/cpp2/transport/core/RequestStateMachine.h>
29 #include <thrift/lib/cpp2/transport/rocket/Types.h>
30 
31 namespace apache {
32 namespace thrift {
33 
34 class Cpp2RequestContext;
35 class ResponseChannelRequest;
36 
37 namespace detail {
38 
39 // Returns the current "tick" of the Thrift server -- a monotonically
40 // increasing counter that effectively determines the size of the time
41 // interval for each bucket in the RecentRequestCounter.
42 THRIFT_PLUGGABLE_FUNC_DECLARE(uint64_t, getCurrentServerTick);
43 
44 } // namespace detail
45 
46 // Helper class to track recently received request counts
47 class RecentRequestCounter {
48  public:
49   static inline constexpr uint64_t kBuckets = 512ul;
50   using ArrivalCount = int32_t;
51   using ActiveCount = int32_t;
52   using Values = std::array<std::pair<ArrivalCount, ActiveCount>, kBuckets>;
53 
54   void increment();
55   void decrement();
56   Values get() const;
57 
58  private:
59   uint64_t getCurrentBucket() const;
60 
61   mutable uint64_t currentBucket_{};
62   mutable uint64_t lastTick_{};
63   mutable Values counts_{};
64   mutable uint64_t currActiveCount_{};
65 };
66 
67 /**
68  * Stores a list of request stubs in memory.
69  *
70  * Each IO worker stores a single RequestsRegistry instance as its
71  * member, so that it can intercept and insert request data into the registry.
72  *
73  * Note that read operations to the request list should be always executed in
74  * the same thread as write operations to avoid race condition, which means
75  * most of the time reads should be issued to the event base which the
76  * corresponding registry belongs to, as a task.
77  */
78 class RequestsRegistry {
79  public:
80   /**
81    * A small piece of information associated with those thrift requests that
82    * we are tracking in the registry. The stub lives alongside the request
83    * in the same chunk of memory.
84    * Requests Registry is just a fancy list of such DebugStubs.
85    *
86    * DebugStub tracks request payload to its corresponding thrift
87    * request. Handles to the payloads can be optionally released by its
88    * parent request registry, indicating the payload memory has been reclaimed
89    * to control memory usage. DebugStub should be unlinked from lists
90    * only during:
91    *   1. Destruction of DebugStub.
92    *   2. Memory collection from RequestsRegistry.
93    */
94   class DebugStub {
95     friend class RequestsRegistry;
96     friend class Cpp2Worker;
97 
98    public:
DebugStub(RequestsRegistry & reqRegistry,ResponseChannelRequest & req,const Cpp2RequestContext & reqContext,std::shared_ptr<folly::RequestContext> rctx,protocol::PROTOCOL_TYPES protoId,rocket::Payload && debugPayload,RequestStateMachine & stateMachine)99     DebugStub(
100         RequestsRegistry& reqRegistry,
101         ResponseChannelRequest& req,
102         const Cpp2RequestContext& reqContext,
103         std::shared_ptr<folly::RequestContext> rctx,
104         protocol::PROTOCOL_TYPES protoId,
105         rocket::Payload&& debugPayload,
106         RequestStateMachine& stateMachine)
107         : req_(&req),
108           reqContext_(&reqContext),
109           rctx_(std::move(rctx)),
110           protoId_(protoId),
111           payload_(std::move(debugPayload)),
112           registry_(&reqRegistry),
113           rootRequestContextId_(rctx_->getRootId()),
114           stateMachine_(stateMachine) {
115       reqRegistry.registerStub(*this);
116     }
117 
118     /**
119      * DebugStub objects are oblivious to memory collection, but they should
120      * notify their owner registry when unlinking themselves.
121      */
~DebugStub()122     ~DebugStub() {
123       if (payload_.hasData()) {
124         DCHECK(activeRequestsPayloadHook_.is_linked());
125         registry_->onStubPayloadUnlinked(*this);
126       }
127     }
128 
getRequest()129     const ResponseChannelRequest* getRequest() const { return req_; }
130 
getCpp2RequestContext()131     const Cpp2RequestContext* getCpp2RequestContext() const {
132       return reqContext_;
133     }
134 
getTimestamp()135     std::chrono::steady_clock::time_point getTimestamp() const {
136       return stateMachine_.started();
137     }
138 
getFinished()139     std::chrono::steady_clock::time_point getFinished() const {
140       return finished_;
141     }
142 
getRootRequestContextId()143     intptr_t getRootRequestContextId() const { return rootRequestContextId_; }
144 
getRequestContext()145     std::shared_ptr<folly::RequestContext> getRequestContext() const {
146       return rctx_;
147     }
148 
149     const std::string& getMethodName() const;
150     const folly::SocketAddress* getLocalAddress() const;
151     const folly::SocketAddress* getPeerAddress() const;
152 
153     /**
154      * Clones the payload buffer to data accessors. If the buffer is already
155      * released by memory collection, returns an empty unique_ptr.
156      * Since RequestsRegistry doesn'y provide synchronization by default,
157      * this should be called from the IO worker which also owns the same
158      * RequestsRegistry.
159      */
clonePayload()160     rocket::Payload clonePayload() const { return payload_.clone(); }
161 
getProtoId()162     protocol::PROTOCOL_TYPES getProtoId() const { return protoId_; }
163 
getStartedProcessing()164     bool getStartedProcessing() const {
165       return stateMachine_.getStartedProcessing();
166     }
167 
168    private:
getPayloadSize()169     uint64_t getPayloadSize() const { return payload_.dataSize(); }
releasePayload()170     void releasePayload() { payload_ = rocket::Payload(); }
171 
172     void prepareAsFinished();
173 
174     void incRef() noexcept;
175     void decRef() noexcept;
176 
177     std::string methodNameIfFinished_;
178     folly::SocketAddress peerAddressIfFinished_;
179     folly::SocketAddress localAddressIfFinished_;
180     ResponseChannelRequest* req_;
181     const Cpp2RequestContext* reqContext_;
182     std::shared_ptr<folly::RequestContext> rctx_;
183     const protocol::PROTOCOL_TYPES protoId_;
184     rocket::Payload payload_;
185     std::chrono::steady_clock::time_point finished_{
186         std::chrono::steady_clock::duration::zero()};
187     RequestsRegistry* registry_;
188     const intptr_t rootRequestContextId_;
189     folly::IntrusiveListHook activeRequestsPayloadHook_;
190     folly::IntrusiveListHook activeRequestsRegistryHook_;
191     size_t refCount_{1};
192     RequestStateMachine& stateMachine_;
193   };
194 
195   class Deleter {
196    public:
stub_(stub)197     Deleter(DebugStub* stub = nullptr) : stub_(stub) {}
198     template <typename U>
Deleter(std::default_delete<U> &&)199     /* implicit */ Deleter(std::default_delete<U>&&) : stub_(nullptr) {}
200 
201     template <typename T>
operator()202     void operator()(T* p) {
203       if (!stub_) {
204         delete p;
205       } else {
206         stub_->registry_->moveToFinishedList(*stub_);
207         p->~T();
208         // We release ownership over the stub, but it still may be held alive
209         // by reqFinishedList_
210         stub_->decRef();
211       }
212     }
213 
214     template <typename U>
215     Deleter& operator=(std::default_delete<U>&&) {
216       stub_ = nullptr;
217       return *this;
218     }
219 
220    private:
221     DebugStub* stub_;
222   };
223 
224   template <typename T, typename... Args>
makeRequest(Args &&...args)225   static std::unique_ptr<T, Deleter> makeRequest(Args&&... args) {
226     static_assert(std::is_base_of<ResponseChannelRequest, T>::value, "");
227     auto offset = sizeof(std::aligned_storage_t<sizeof(DebugStub), alignof(T)>);
228     DebugStub* pStub = reinterpret_cast<DebugStub*>(malloc(offset + sizeof(T)));
229     T* pT = reinterpret_cast<T*>(reinterpret_cast<char*>(pStub) + offset);
230     new (pT) T(*pStub, std::forward<Args>(args)...);
231     return std::unique_ptr<T, Deleter>(pT, pStub);
232   }
233 
234   intptr_t genRootId();
235   static bool isThriftRootId(intptr_t) noexcept;
236   static std::string getRequestId(intptr_t rootid);
237 
238   using ActiveRequestDebugStubList =
239       folly::IntrusiveList<DebugStub, &DebugStub::activeRequestsRegistryHook_>;
240   using ActiveRequestPayloadList =
241       folly::IntrusiveList<DebugStub, &DebugStub::activeRequestsPayloadHook_>;
242 
243   RequestsRegistry(
244       uint64_t requestPayloadMem,
245       uint64_t totalPayloadMem,
246       uint16_t finishedRequestsLimit);
247   ~RequestsRegistry();
248 
getActive()249   const ActiveRequestDebugStubList& getActive() { return reqActiveList_; }
250 
getFinished()251   const ActiveRequestDebugStubList& getFinished() { return reqFinishedList_; }
252 
253   void registerStub(DebugStub& req);
254 
getRequestCounter()255   const RecentRequestCounter& getRequestCounter() const {
256     return requestCounter_;
257   }
258 
259  private:
260   void moveToFinishedList(DebugStub& stub);
261 
evictStubPayloads()262   void evictStubPayloads() {
263     while (payloadMemoryUsage_ > payloadMemoryLimitTotal_) {
264       auto& stub = nextStubToEvict();
265 
266       onStubPayloadUnlinked(stub);
267       reqPayloadList_.erase(reqPayloadList_.iterator_to(stub));
268       stub.releasePayload();
269     }
270   }
nextStubToEvict()271   DebugStub& nextStubToEvict() { return reqPayloadList_.front(); }
onStubPayloadUnlinked(const DebugStub & stub)272   void onStubPayloadUnlinked(const DebugStub& stub) {
273     uint64_t payloadSize = stub.getPayloadSize();
274     DCHECK(payloadMemoryUsage_ >= payloadSize);
275     payloadMemoryUsage_ -= payloadSize;
276   }
277   uint32_t registryId_;
278   uint64_t nextLocalId_{0};
279   uint64_t payloadMemoryLimitPerRequest_;
280   uint64_t payloadMemoryLimitTotal_;
281   uint64_t payloadMemoryUsage_{0};
282   ActiveRequestDebugStubList reqActiveList_;
283   ActiveRequestPayloadList reqPayloadList_;
284   ActiveRequestDebugStubList reqFinishedList_;
285   uint16_t finishedRequestsCount_{0};
286   uint16_t finishedRequestsLimit_;
287   RecentRequestCounter requestCounter_;
288 };
289 
290 } // namespace thrift
291 } // namespace apache
292