1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <array> 20 #include <chrono> 21 #include <fmt/core.h> 22 #include <folly/IntrusiveList.h> 23 #include <folly/SocketAddress.h> 24 #include <folly/io/IOBuf.h> 25 #include <folly/io/async/Request.h> 26 #include <thrift/lib/cpp/protocol/TProtocolTypes.h> 27 #include <thrift/lib/cpp2/PluggableFunction.h> 28 #include <thrift/lib/cpp2/transport/core/RequestStateMachine.h> 29 #include <thrift/lib/cpp2/transport/rocket/Types.h> 30 31 namespace apache { 32 namespace thrift { 33 34 class Cpp2RequestContext; 35 class ResponseChannelRequest; 36 37 namespace detail { 38 39 // Returns the current "tick" of the Thrift server -- a monotonically 40 // increasing counter that effectively determines the size of the time 41 // interval for each bucket in the RecentRequestCounter. 42 THRIFT_PLUGGABLE_FUNC_DECLARE(uint64_t, getCurrentServerTick); 43 44 } // namespace detail 45 46 // Helper class to track recently received request counts 47 class RecentRequestCounter { 48 public: 49 static inline constexpr uint64_t kBuckets = 512ul; 50 using ArrivalCount = int32_t; 51 using ActiveCount = int32_t; 52 using Values = std::array<std::pair<ArrivalCount, ActiveCount>, kBuckets>; 53 54 void increment(); 55 void decrement(); 56 Values get() const; 57 58 private: 59 uint64_t getCurrentBucket() const; 60 61 mutable uint64_t currentBucket_{}; 62 mutable uint64_t lastTick_{}; 63 mutable Values counts_{}; 64 mutable uint64_t currActiveCount_{}; 65 }; 66 67 /** 68 * Stores a list of request stubs in memory. 69 * 70 * Each IO worker stores a single RequestsRegistry instance as its 71 * member, so that it can intercept and insert request data into the registry. 72 * 73 * Note that read operations to the request list should be always executed in 74 * the same thread as write operations to avoid race condition, which means 75 * most of the time reads should be issued to the event base which the 76 * corresponding registry belongs to, as a task. 77 */ 78 class RequestsRegistry { 79 public: 80 /** 81 * A small piece of information associated with those thrift requests that 82 * we are tracking in the registry. The stub lives alongside the request 83 * in the same chunk of memory. 84 * Requests Registry is just a fancy list of such DebugStubs. 85 * 86 * DebugStub tracks request payload to its corresponding thrift 87 * request. Handles to the payloads can be optionally released by its 88 * parent request registry, indicating the payload memory has been reclaimed 89 * to control memory usage. DebugStub should be unlinked from lists 90 * only during: 91 * 1. Destruction of DebugStub. 92 * 2. Memory collection from RequestsRegistry. 93 */ 94 class DebugStub { 95 friend class RequestsRegistry; 96 friend class Cpp2Worker; 97 98 public: DebugStub(RequestsRegistry & reqRegistry,ResponseChannelRequest & req,const Cpp2RequestContext & reqContext,std::shared_ptr<folly::RequestContext> rctx,protocol::PROTOCOL_TYPES protoId,rocket::Payload && debugPayload,RequestStateMachine & stateMachine)99 DebugStub( 100 RequestsRegistry& reqRegistry, 101 ResponseChannelRequest& req, 102 const Cpp2RequestContext& reqContext, 103 std::shared_ptr<folly::RequestContext> rctx, 104 protocol::PROTOCOL_TYPES protoId, 105 rocket::Payload&& debugPayload, 106 RequestStateMachine& stateMachine) 107 : req_(&req), 108 reqContext_(&reqContext), 109 rctx_(std::move(rctx)), 110 protoId_(protoId), 111 payload_(std::move(debugPayload)), 112 registry_(&reqRegistry), 113 rootRequestContextId_(rctx_->getRootId()), 114 stateMachine_(stateMachine) { 115 reqRegistry.registerStub(*this); 116 } 117 118 /** 119 * DebugStub objects are oblivious to memory collection, but they should 120 * notify their owner registry when unlinking themselves. 121 */ ~DebugStub()122 ~DebugStub() { 123 if (payload_.hasData()) { 124 DCHECK(activeRequestsPayloadHook_.is_linked()); 125 registry_->onStubPayloadUnlinked(*this); 126 } 127 } 128 getRequest()129 const ResponseChannelRequest* getRequest() const { return req_; } 130 getCpp2RequestContext()131 const Cpp2RequestContext* getCpp2RequestContext() const { 132 return reqContext_; 133 } 134 getTimestamp()135 std::chrono::steady_clock::time_point getTimestamp() const { 136 return stateMachine_.started(); 137 } 138 getFinished()139 std::chrono::steady_clock::time_point getFinished() const { 140 return finished_; 141 } 142 getRootRequestContextId()143 intptr_t getRootRequestContextId() const { return rootRequestContextId_; } 144 getRequestContext()145 std::shared_ptr<folly::RequestContext> getRequestContext() const { 146 return rctx_; 147 } 148 149 const std::string& getMethodName() const; 150 const folly::SocketAddress* getLocalAddress() const; 151 const folly::SocketAddress* getPeerAddress() const; 152 153 /** 154 * Clones the payload buffer to data accessors. If the buffer is already 155 * released by memory collection, returns an empty unique_ptr. 156 * Since RequestsRegistry doesn'y provide synchronization by default, 157 * this should be called from the IO worker which also owns the same 158 * RequestsRegistry. 159 */ clonePayload()160 rocket::Payload clonePayload() const { return payload_.clone(); } 161 getProtoId()162 protocol::PROTOCOL_TYPES getProtoId() const { return protoId_; } 163 getStartedProcessing()164 bool getStartedProcessing() const { 165 return stateMachine_.getStartedProcessing(); 166 } 167 168 private: getPayloadSize()169 uint64_t getPayloadSize() const { return payload_.dataSize(); } releasePayload()170 void releasePayload() { payload_ = rocket::Payload(); } 171 172 void prepareAsFinished(); 173 174 void incRef() noexcept; 175 void decRef() noexcept; 176 177 std::string methodNameIfFinished_; 178 folly::SocketAddress peerAddressIfFinished_; 179 folly::SocketAddress localAddressIfFinished_; 180 ResponseChannelRequest* req_; 181 const Cpp2RequestContext* reqContext_; 182 std::shared_ptr<folly::RequestContext> rctx_; 183 const protocol::PROTOCOL_TYPES protoId_; 184 rocket::Payload payload_; 185 std::chrono::steady_clock::time_point finished_{ 186 std::chrono::steady_clock::duration::zero()}; 187 RequestsRegistry* registry_; 188 const intptr_t rootRequestContextId_; 189 folly::IntrusiveListHook activeRequestsPayloadHook_; 190 folly::IntrusiveListHook activeRequestsRegistryHook_; 191 size_t refCount_{1}; 192 RequestStateMachine& stateMachine_; 193 }; 194 195 class Deleter { 196 public: stub_(stub)197 Deleter(DebugStub* stub = nullptr) : stub_(stub) {} 198 template <typename U> Deleter(std::default_delete<U> &&)199 /* implicit */ Deleter(std::default_delete<U>&&) : stub_(nullptr) {} 200 201 template <typename T> operator()202 void operator()(T* p) { 203 if (!stub_) { 204 delete p; 205 } else { 206 stub_->registry_->moveToFinishedList(*stub_); 207 p->~T(); 208 // We release ownership over the stub, but it still may be held alive 209 // by reqFinishedList_ 210 stub_->decRef(); 211 } 212 } 213 214 template <typename U> 215 Deleter& operator=(std::default_delete<U>&&) { 216 stub_ = nullptr; 217 return *this; 218 } 219 220 private: 221 DebugStub* stub_; 222 }; 223 224 template <typename T, typename... Args> makeRequest(Args &&...args)225 static std::unique_ptr<T, Deleter> makeRequest(Args&&... args) { 226 static_assert(std::is_base_of<ResponseChannelRequest, T>::value, ""); 227 auto offset = sizeof(std::aligned_storage_t<sizeof(DebugStub), alignof(T)>); 228 DebugStub* pStub = reinterpret_cast<DebugStub*>(malloc(offset + sizeof(T))); 229 T* pT = reinterpret_cast<T*>(reinterpret_cast<char*>(pStub) + offset); 230 new (pT) T(*pStub, std::forward<Args>(args)...); 231 return std::unique_ptr<T, Deleter>(pT, pStub); 232 } 233 234 intptr_t genRootId(); 235 static bool isThriftRootId(intptr_t) noexcept; 236 static std::string getRequestId(intptr_t rootid); 237 238 using ActiveRequestDebugStubList = 239 folly::IntrusiveList<DebugStub, &DebugStub::activeRequestsRegistryHook_>; 240 using ActiveRequestPayloadList = 241 folly::IntrusiveList<DebugStub, &DebugStub::activeRequestsPayloadHook_>; 242 243 RequestsRegistry( 244 uint64_t requestPayloadMem, 245 uint64_t totalPayloadMem, 246 uint16_t finishedRequestsLimit); 247 ~RequestsRegistry(); 248 getActive()249 const ActiveRequestDebugStubList& getActive() { return reqActiveList_; } 250 getFinished()251 const ActiveRequestDebugStubList& getFinished() { return reqFinishedList_; } 252 253 void registerStub(DebugStub& req); 254 getRequestCounter()255 const RecentRequestCounter& getRequestCounter() const { 256 return requestCounter_; 257 } 258 259 private: 260 void moveToFinishedList(DebugStub& stub); 261 evictStubPayloads()262 void evictStubPayloads() { 263 while (payloadMemoryUsage_ > payloadMemoryLimitTotal_) { 264 auto& stub = nextStubToEvict(); 265 266 onStubPayloadUnlinked(stub); 267 reqPayloadList_.erase(reqPayloadList_.iterator_to(stub)); 268 stub.releasePayload(); 269 } 270 } nextStubToEvict()271 DebugStub& nextStubToEvict() { return reqPayloadList_.front(); } onStubPayloadUnlinked(const DebugStub & stub)272 void onStubPayloadUnlinked(const DebugStub& stub) { 273 uint64_t payloadSize = stub.getPayloadSize(); 274 DCHECK(payloadMemoryUsage_ >= payloadSize); 275 payloadMemoryUsage_ -= payloadSize; 276 } 277 uint32_t registryId_; 278 uint64_t nextLocalId_{0}; 279 uint64_t payloadMemoryLimitPerRequest_; 280 uint64_t payloadMemoryLimitTotal_; 281 uint64_t payloadMemoryUsage_{0}; 282 ActiveRequestDebugStubList reqActiveList_; 283 ActiveRequestPayloadList reqPayloadList_; 284 ActiveRequestDebugStubList reqFinishedList_; 285 uint16_t finishedRequestsCount_{0}; 286 uint16_t finishedRequestsLimit_; 287 RecentRequestCounter requestCounter_; 288 }; 289 290 } // namespace thrift 291 } // namespace apache 292