1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <thrift/lib/cpp2/server/RequestsRegistry.h>
18
19 #include <atomic>
20 #include <fmt/format.h>
21 #include <thrift/lib/cpp2/server/Cpp2ConnContext.h>
22
23 namespace apache {
24 namespace thrift {
25
26 namespace {
// RequestId storage.
// Reserve some high bits for future use. Currently the maximum id supported
// is 2^52, so thrift servers theoretically can generate unique request ids
// for ~12 years, assuming the QPS is ~10 million.
// Number of low-order bits of a root id that carry the per-registry counter.
constexpr size_t kLsbBits = 52;
// Bit mask selecting exactly those kLsbBits low-order bits.
constexpr uintptr_t kLsbMask = (1ull << kLsbBits) - 1;
33
// Hands out small integer ids and recycles the ones that are given back.
// The struct does no locking of its own; registryIdManager() wraps the
// shared instance in folly::Synchronized.
struct RegistryIdManager {
 public:
  // Returns the smallest id that was previously returned and is still free;
  // if there is none, returns the next never-issued id. Aborts via CHECK if
  // a freshly issued id would be 4096 or larger.
  // NOTE(review): 4096 presumably matches the 12 bits left above kLsbBits in
  // a 64-bit root id (see genRootId) -- confirm.
  uint32_t getId() {
    if (!freeIds_.empty()) {
      const auto smallest = freeIds_.begin();
      const uint32_t id = *smallest;
      freeIds_.erase(smallest);
      return id;
    }

    const uint32_t id = nextId_++;
    CHECK(id < 4096);
    return id;
  }

  // Gives `id` back. Afterwards, as long as the largest free id is the most
  // recently issued one (nextId_ - 1), it is dropped from the free set and
  // nextId_ is lowered, so the free set only holds ids below the high-water
  // mark.
  void returnId(uint32_t id) {
    freeIds_.insert(id);
    while (!freeIds_.empty()) {
      const uint32_t largestId = *freeIds_.rbegin();
      if (largestId < nextId_ - 1) {
        return;
      }
      DCHECK(largestId == nextId_ - 1);
      --nextId_;
      freeIds_.erase(largestId);
    }
  }

 private:
  // Ids that were returned and are below the current high-water mark.
  std::set<uint32_t> freeIds_;
  // Next never-issued id. Initialized explicitly so that the manager starts
  // at 0 no matter how the object is constructed (previously this relied on
  // the caller value-initializing the struct).
  uint32_t nextId_{0};
};
65
registryIdManager()66 folly::Synchronized<RegistryIdManager>& registryIdManager() {
67 static auto* registryIdManagerPtr =
68 new folly::Synchronized<RegistryIdManager>();
69 return *registryIdManagerPtr;
70 }
71 } // namespace
72
73 namespace detail {
74
// Body registered here for the pluggable getCurrentServerTick function: it
// always reports tick 0. With this body, RecentRequestCounter never sees the
// tick advance, so it keeps using the same bucket.
// NOTE(review): presumably a real tick source is plugged in elsewhere --
// confirm.
THRIFT_PLUGGABLE_FUNC_REGISTER(uint64_t, getCurrentServerTick) {
  return 0;
}
78
79 } // namespace detail
80
increment()81 void RecentRequestCounter::increment() {
82 auto currBucket = getCurrentBucket();
83 counts_[currBucket].first += 1;
84 counts_[currBucket].second = ++currActiveCount_;
85 }
86
decrement()87 void RecentRequestCounter::decrement() {
88 if (currActiveCount_ > 0) {
89 auto currBucket = getCurrentBucket();
90 counts_[currBucket].second = --currActiveCount_;
91 }
92 }
93
get() const94 RecentRequestCounter::Values RecentRequestCounter::get() const {
95 Values ret;
96 uint64_t currentBucket = getCurrentBucket();
97 uint64_t i = currentBucket + kBuckets;
98
99 for (auto& val : ret) {
100 val = counts_[i-- % kBuckets];
101 }
102
103 return ret;
104 }
105
getCurrentBucket() const106 uint64_t RecentRequestCounter::getCurrentBucket() const {
107 // Remove old request counts from counts_ and update lastTick_
108 uint64_t currentTick = detail::getCurrentServerTick();
109
110 if (lastTick_ < currentTick) {
111 uint64_t tickDiff = currentTick - lastTick_;
112 uint64_t ticksToClear = tickDiff < kBuckets ? tickDiff : kBuckets;
113
114 while (ticksToClear) {
115 auto index = (lastTick_ + ticksToClear--) % kBuckets;
116 counts_[index].first = 0;
117 counts_[index].second = currActiveCount_;
118 }
119 lastTick_ = currentTick;
120 currentBucket_ = lastTick_ % kBuckets;
121 }
122
123 return currentBucket_;
124 }
125
// Creates a registry.
//   requestPayloadMem     - stored as the per-request payload memory limit.
//   totalPayloadMem       - stored as the total payload memory limit.
//   finishedRequestsLimit - stored as the cap on retained finished requests.
// Also takes a registry id from the shared id manager (under its write
// lock); the destructor hands that id back.
RequestsRegistry::RequestsRegistry(
    uint64_t requestPayloadMem,
    uint64_t totalPayloadMem,
    uint16_t finishedRequestsLimit)
    : registryId_(registryIdManager().wlock()->getId()),
      payloadMemoryLimitPerRequest_(requestPayloadMem),
      payloadMemoryLimitTotal_(totalPayloadMem),
      finishedRequestsLimit_(finishedRequestsLimit) {}
134
~RequestsRegistry()135 RequestsRegistry::~RequestsRegistry() {
136 while (!reqFinishedList_.empty()) {
137 --finishedRequestsCount_;
138 auto& front = reqFinishedList_.front();
139 reqFinishedList_.pop_front();
140 front.decRef();
141 }
142 DCHECK(finishedRequestsCount_ == 0);
143 registryIdManager().wlock()->returnId(registryId_);
144 }
145
getRequestId(intptr_t rootid)146 /* static */ std::string RequestsRegistry::getRequestId(intptr_t rootid) {
147 return fmt::format("{:016x}", static_cast<uintptr_t>(rootid));
148 }
149
isThriftRootId(intptr_t rootid)150 bool RequestsRegistry::isThriftRootId(intptr_t rootid) noexcept {
151 return rootid & 0x1;
152 }
153
genRootId()154 intptr_t RequestsRegistry::genRootId() {
155 // Ensure rootid's LSB is always 1.
156 // This is to prevent any collision with rootids on folly::RequestsContext() -
157 // those are addresses of folly::RequestContext objects.
158 return 0x1 | ((nextLocalId_++ << 1) & kLsbMask) |
159 (static_cast<uintptr_t>(registryId_) << kLsbBits);
160 }
161
registerStub(DebugStub & req)162 void RequestsRegistry::registerStub(DebugStub& req) {
163 if (req.stateMachine_.includeInRecentRequests()) {
164 requestCounter_.increment();
165 }
166 uint64_t payloadSize = req.getPayloadSize();
167 reqActiveList_.push_back(req);
168 if (payloadSize > payloadMemoryLimitPerRequest_) {
169 req.releasePayload();
170 return;
171 }
172 reqPayloadList_.push_back(req);
173 payloadMemoryUsage_ += payloadSize;
174 evictStubPayloads();
175 }
176
moveToFinishedList(RequestsRegistry::DebugStub & stub)177 void RequestsRegistry::moveToFinishedList(RequestsRegistry::DebugStub& stub) {
178 if (stub.stateMachine_.includeInRecentRequests()) {
179 requestCounter_.decrement();
180 }
181 if (finishedRequestsLimit_ == 0) {
182 return;
183 }
184
185 stub.activeRequestsRegistryHook_.unlink();
186 stub.incRef();
187 stub.prepareAsFinished();
188 ++finishedRequestsCount_;
189 reqFinishedList_.push_back(stub);
190
191 if (finishedRequestsCount_ > finishedRequestsLimit_) {
192 DCHECK(finishedRequestsLimit_ > 0);
193 --finishedRequestsCount_;
194 auto& front = reqFinishedList_.front();
195 reqFinishedList_.pop_front();
196 front.decRef();
197 }
198 }
199
getMethodName() const200 const std::string& RequestsRegistry::DebugStub::getMethodName() const {
201 return getCpp2RequestContext() ? getCpp2RequestContext()->getMethodName()
202 : methodNameIfFinished_;
203 }
204
getLocalAddress() const205 const folly::SocketAddress* RequestsRegistry::DebugStub::getLocalAddress()
206 const {
207 return getCpp2RequestContext() ? getCpp2RequestContext()->getLocalAddress()
208 : &localAddressIfFinished_;
209 }
210
getPeerAddress() const211 const folly::SocketAddress* RequestsRegistry::DebugStub::getPeerAddress()
212 const {
213 return getCpp2RequestContext() ? getCpp2RequestContext()->getPeerAddress()
214 : &peerAddressIfFinished_;
215 }
216
// Converts this stub into its "finished" form: stamps the finish time,
// saves the method name and both socket addresses into the *IfFinished_
// members, and clears the pointers to the request context and request.
void RequestsRegistry::DebugStub::prepareAsFinished() {
  // Completion timestamp.
  finished_ = std::chrono::steady_clock::now();
  rctx_.reset();
  // Take the method name out of the request context. The const_cast is
  // needed to call releaseMethodName() through reqContext_.
  // NOTE(review): presumably releaseMethodName() moves the string out, so
  // the context's own method name is no longer usable afterwards -- confirm.
  methodNameIfFinished_ =
      const_cast<Cpp2RequestContext*>(reqContext_)->releaseMethodName();
  // Copy the addresses by value before the context pointer is dropped.
  peerAddressIfFinished_ = *reqContext_->getPeerAddress();
  localAddressIfFinished_ = *reqContext_->getLocalAddress();
  // From here on the stub no longer points at the request context/request.
  reqContext_ = nullptr;
  req_ = nullptr;
}
227
incRef()228 void RequestsRegistry::DebugStub::incRef() noexcept {
229 refCount_++;
230 }
231
decRef()232 void RequestsRegistry::DebugStub::decRef() noexcept {
233 if (--refCount_ == 0) {
234 this->~DebugStub();
235 free(this);
236 }
237 }
238
239 } // namespace thrift
240 } // namespace apache
241