1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thrift/lib/cpp2/server/RequestsRegistry.h>
18 
19 #include <atomic>
20 #include <fmt/format.h>
21 #include <thrift/lib/cpp2/server/Cpp2ConnContext.h>
22 
23 namespace apache {
24 namespace thrift {
25 
26 namespace {
27 // RequestId storage.
28 // Reserve some high bits for future use. Currently the maximum id supported
29 // is 10^52, so thrift servers theoretically can generate unique request id
30 // for ~12 years, assuming the QPS is ~10 million.
31 const size_t kLsbBits = 52;
32 const uintptr_t kLsbMask = (1ull << kLsbBits) - 1;
33 
34 struct RegistryIdManager {
35  public:
getIdapache::thrift::__anonaeba8d170111::RegistryIdManager36   uint32_t getId() {
37     if (!freeIds_.empty()) {
38       auto id = *freeIds_.begin();
39       freeIds_.erase(freeIds_.begin());
40       return id;
41     }
42 
43     auto id = nextId_++;
44     CHECK(id < 4096);
45     return id;
46   }
47 
returnIdapache::thrift::__anonaeba8d170111::RegistryIdManager48   void returnId(uint32_t id) {
49     freeIds_.insert(id);
50     while (!freeIds_.empty()) {
51       auto largestId = *(--freeIds_.end());
52       if (largestId < nextId_ - 1) {
53         return;
54       }
55       DCHECK(largestId == nextId_ - 1);
56       --nextId_;
57       freeIds_.erase(largestId);
58     }
59   }
60 
61  private:
62   std::set<uint32_t> freeIds_;
63   uint32_t nextId_;
64 };
65 
registryIdManager()66 folly::Synchronized<RegistryIdManager>& registryIdManager() {
67   static auto* registryIdManagerPtr =
68       new folly::Synchronized<RegistryIdManager>();
69   return *registryIdManagerPtr;
70 }
71 } // namespace
72 
73 namespace detail {
74 
THRIFT_PLUGGABLE_FUNC_REGISTER(uint64_t,getCurrentServerTick)75 THRIFT_PLUGGABLE_FUNC_REGISTER(uint64_t, getCurrentServerTick) {
76   return 0;
77 }
78 
79 } // namespace detail
80 
increment()81 void RecentRequestCounter::increment() {
82   auto currBucket = getCurrentBucket();
83   counts_[currBucket].first += 1;
84   counts_[currBucket].second = ++currActiveCount_;
85 }
86 
decrement()87 void RecentRequestCounter::decrement() {
88   if (currActiveCount_ > 0) {
89     auto currBucket = getCurrentBucket();
90     counts_[currBucket].second = --currActiveCount_;
91   }
92 }
93 
get() const94 RecentRequestCounter::Values RecentRequestCounter::get() const {
95   Values ret;
96   uint64_t currentBucket = getCurrentBucket();
97   uint64_t i = currentBucket + kBuckets;
98 
99   for (auto& val : ret) {
100     val = counts_[i-- % kBuckets];
101   }
102 
103   return ret;
104 }
105 
getCurrentBucket() const106 uint64_t RecentRequestCounter::getCurrentBucket() const {
107   // Remove old request counts from counts_ and update lastTick_
108   uint64_t currentTick = detail::getCurrentServerTick();
109 
110   if (lastTick_ < currentTick) {
111     uint64_t tickDiff = currentTick - lastTick_;
112     uint64_t ticksToClear = tickDiff < kBuckets ? tickDiff : kBuckets;
113 
114     while (ticksToClear) {
115       auto index = (lastTick_ + ticksToClear--) % kBuckets;
116       counts_[index].first = 0;
117       counts_[index].second = currActiveCount_;
118     }
119     lastTick_ = currentTick;
120     currentBucket_ = lastTick_ % kBuckets;
121   }
122 
123   return currentBucket_;
124 }
125 
RequestsRegistry(uint64_t requestPayloadMem,uint64_t totalPayloadMem,uint16_t finishedRequestsLimit)126 RequestsRegistry::RequestsRegistry(
127     uint64_t requestPayloadMem,
128     uint64_t totalPayloadMem,
129     uint16_t finishedRequestsLimit)
130     : registryId_(registryIdManager().wlock()->getId()),
131       payloadMemoryLimitPerRequest_(requestPayloadMem),
132       payloadMemoryLimitTotal_(totalPayloadMem),
133       finishedRequestsLimit_(finishedRequestsLimit) {}
134 
~RequestsRegistry()135 RequestsRegistry::~RequestsRegistry() {
136   while (!reqFinishedList_.empty()) {
137     --finishedRequestsCount_;
138     auto& front = reqFinishedList_.front();
139     reqFinishedList_.pop_front();
140     front.decRef();
141   }
142   DCHECK(finishedRequestsCount_ == 0);
143   registryIdManager().wlock()->returnId(registryId_);
144 }
145 
getRequestId(intptr_t rootid)146 /* static */ std::string RequestsRegistry::getRequestId(intptr_t rootid) {
147   return fmt::format("{:016x}", static_cast<uintptr_t>(rootid));
148 }
149 
isThriftRootId(intptr_t rootid)150 bool RequestsRegistry::isThriftRootId(intptr_t rootid) noexcept {
151   return rootid & 0x1;
152 }
153 
genRootId()154 intptr_t RequestsRegistry::genRootId() {
155   // Ensure rootid's LSB is always 1.
156   // This is to prevent any collision with rootids on folly::RequestsContext() -
157   // those are addresses of folly::RequestContext objects.
158   return 0x1 | ((nextLocalId_++ << 1) & kLsbMask) |
159       (static_cast<uintptr_t>(registryId_) << kLsbBits);
160 }
161 
registerStub(DebugStub & req)162 void RequestsRegistry::registerStub(DebugStub& req) {
163   if (req.stateMachine_.includeInRecentRequests()) {
164     requestCounter_.increment();
165   }
166   uint64_t payloadSize = req.getPayloadSize();
167   reqActiveList_.push_back(req);
168   if (payloadSize > payloadMemoryLimitPerRequest_) {
169     req.releasePayload();
170     return;
171   }
172   reqPayloadList_.push_back(req);
173   payloadMemoryUsage_ += payloadSize;
174   evictStubPayloads();
175 }
176 
moveToFinishedList(RequestsRegistry::DebugStub & stub)177 void RequestsRegistry::moveToFinishedList(RequestsRegistry::DebugStub& stub) {
178   if (stub.stateMachine_.includeInRecentRequests()) {
179     requestCounter_.decrement();
180   }
181   if (finishedRequestsLimit_ == 0) {
182     return;
183   }
184 
185   stub.activeRequestsRegistryHook_.unlink();
186   stub.incRef();
187   stub.prepareAsFinished();
188   ++finishedRequestsCount_;
189   reqFinishedList_.push_back(stub);
190 
191   if (finishedRequestsCount_ > finishedRequestsLimit_) {
192     DCHECK(finishedRequestsLimit_ > 0);
193     --finishedRequestsCount_;
194     auto& front = reqFinishedList_.front();
195     reqFinishedList_.pop_front();
196     front.decRef();
197   }
198 }
199 
getMethodName() const200 const std::string& RequestsRegistry::DebugStub::getMethodName() const {
201   return getCpp2RequestContext() ? getCpp2RequestContext()->getMethodName()
202                                  : methodNameIfFinished_;
203 }
204 
getLocalAddress() const205 const folly::SocketAddress* RequestsRegistry::DebugStub::getLocalAddress()
206     const {
207   return getCpp2RequestContext() ? getCpp2RequestContext()->getLocalAddress()
208                                  : &localAddressIfFinished_;
209 }
210 
getPeerAddress() const211 const folly::SocketAddress* RequestsRegistry::DebugStub::getPeerAddress()
212     const {
213   return getCpp2RequestContext() ? getCpp2RequestContext()->getPeerAddress()
214                                  : &peerAddressIfFinished_;
215 }
216 
prepareAsFinished()217 void RequestsRegistry::DebugStub::prepareAsFinished() {
218   finished_ = std::chrono::steady_clock::now();
219   rctx_.reset();
220   methodNameIfFinished_ =
221       const_cast<Cpp2RequestContext*>(reqContext_)->releaseMethodName();
222   peerAddressIfFinished_ = *reqContext_->getPeerAddress();
223   localAddressIfFinished_ = *reqContext_->getLocalAddress();
224   reqContext_ = nullptr;
225   req_ = nullptr;
226 }
227 
incRef()228 void RequestsRegistry::DebugStub::incRef() noexcept {
229   refCount_++;
230 }
231 
decRef()232 void RequestsRegistry::DebugStub::decRef() noexcept {
233   if (--refCount_ == 0) {
234     this->~DebugStub();
235     free(this);
236   }
237 }
238 
239 } // namespace thrift
240 } // namespace apache
241