1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thrift/lib/cpp2/async/RpcOptions.h>
18 
19 #include <thrift/lib/cpp2/async/Interaction.h>
20 
21 namespace apache {
22 namespace thrift {
23 namespace {
kEmptyMap()24 const transport::THeader::StringToStringMap& kEmptyMap() {
25   static const transport::THeader::StringToStringMap& map =
26       *(new transport::THeader::StringToStringMap);
27   return map;
28 }
29 
validateTimeout(std::chrono::milliseconds timeout)30 uint32_t validateTimeout(std::chrono::milliseconds timeout) {
31   using rep = std::chrono::milliseconds::rep;
32   static constexpr rep min = 0;
33   static constexpr rep max = std::numeric_limits<uint32_t>::max();
34   const auto ms = timeout.count();
35   DCHECK_GE(ms, min) << "Timeout should be >= 0";
36   DCHECK_LE(ms, max) << "Timeout should not exceed " << max << " ms";
37   return std::max(min, std::min(ms, max));
38 }
39 } // namespace
40 
setTimeout(std::chrono::milliseconds timeout)41 RpcOptions& RpcOptions::setTimeout(std::chrono::milliseconds timeout) {
42   timeout_ = validateTimeout(timeout);
43   return *this;
44 }
45 
getTimeout() const46 std::chrono::milliseconds RpcOptions::getTimeout() const {
47   return std::chrono::milliseconds(timeout_);
48 }
49 
setPriority(RpcOptions::PRIORITY priority)50 RpcOptions& RpcOptions::setPriority(RpcOptions::PRIORITY priority) {
51   priority_ = static_cast<uint8_t>(priority);
52   return *this;
53 }
54 
getPriority() const55 RpcOptions::PRIORITY RpcOptions::getPriority() const {
56   return static_cast<RpcOptions::PRIORITY>(priority_);
57 }
58 
setClientOnlyTimeouts(bool val)59 RpcOptions& RpcOptions::setClientOnlyTimeouts(bool val) {
60   clientOnlyTimeouts_ = val;
61   return *this;
62 }
63 
getClientOnlyTimeouts() const64 bool RpcOptions::getClientOnlyTimeouts() const {
65   return clientOnlyTimeouts_;
66 }
67 
setEnableChecksum(bool val)68 RpcOptions& RpcOptions::setEnableChecksum(bool val) {
69   enableChecksum_ = val;
70   return *this;
71 }
72 
getEnableChecksum() const73 bool RpcOptions::getEnableChecksum() const {
74   return enableChecksum_;
75 }
76 
setChunkTimeout(std::chrono::milliseconds chunkTimeout)77 RpcOptions& RpcOptions::setChunkTimeout(
78     std::chrono::milliseconds chunkTimeout) {
79   chunkTimeout_ = validateTimeout(chunkTimeout);
80   return *this;
81 }
82 
getChunkTimeout() const83 std::chrono::milliseconds RpcOptions::getChunkTimeout() const {
84   return std::chrono::milliseconds(chunkTimeout_);
85 }
86 
setChunkBufferSize(int32_t chunkBufferSize)87 RpcOptions& RpcOptions::setChunkBufferSize(int32_t chunkBufferSize) {
88   CHECK_EQ(bufferOptions_.memSize, 0)
89       << "Only one of setMemoryBufferSize and setChunkBufferSize should be called";
90   bufferOptions_.chunkSize = chunkBufferSize;
91   return *this;
92 }
93 
setMemoryBufferSize(size_t targetBytes,int32_t initialChunks,int32_t maxChunks)94 RpcOptions& RpcOptions::setMemoryBufferSize(
95     size_t targetBytes, int32_t initialChunks, int32_t maxChunks) {
96   CHECK_EQ(bufferOptions_.chunkSize, 100)
97       << "Only one of setMemoryBufferSize and setChunkBufferSize should be called";
98   CHECK_GT(targetBytes, 0);
99   CHECK_LE(0, initialChunks);
100   CHECK_LE(initialChunks, maxChunks);
101   bufferOptions_.memSize = targetBytes;
102   bufferOptions_.chunkSize = initialChunks;
103   bufferOptions_.maxChunkSize = maxChunks;
104   return *this;
105 }
106 
getChunkBufferSize() const107 int32_t RpcOptions::getChunkBufferSize() const {
108   return bufferOptions_.chunkSize;
109 }
110 
getBufferOptions() const111 const BufferOptions& RpcOptions::getBufferOptions() const {
112   return bufferOptions_;
113 }
114 
setQueueTimeout(std::chrono::milliseconds queueTimeout)115 RpcOptions& RpcOptions::setQueueTimeout(
116     std::chrono::milliseconds queueTimeout) {
117   queueTimeout_ = validateTimeout(queueTimeout);
118   return *this;
119 }
120 
getQueueTimeout() const121 std::chrono::milliseconds RpcOptions::getQueueTimeout() const {
122   return std::chrono::milliseconds(queueTimeout_);
123 }
124 
setOverallTimeout(std::chrono::milliseconds overallTimeout)125 RpcOptions& RpcOptions::setOverallTimeout(
126     std::chrono::milliseconds overallTimeout) {
127   overallTimeout_ = validateTimeout(overallTimeout);
128   return *this;
129 }
130 
getOverallTimeout() const131 std::chrono::milliseconds RpcOptions::getOverallTimeout() const {
132   return std::chrono::milliseconds(overallTimeout_);
133 }
134 
setProcessingTimeout(std::chrono::milliseconds processingTimeout)135 RpcOptions& RpcOptions::setProcessingTimeout(
136     std::chrono::milliseconds processingTimeout) {
137   processingTimeout_ = validateTimeout(processingTimeout);
138   return *this;
139 }
140 
getProcessingTimeout() const141 std::chrono::milliseconds RpcOptions::getProcessingTimeout() const {
142   return std::chrono::milliseconds(processingTimeout_);
143 }
144 
setRoutingKey(std::string routingKey)145 RpcOptions& RpcOptions::setRoutingKey(std::string routingKey) {
146   routingKey_ = std::move(routingKey);
147   return *this;
148 }
149 
getRoutingKey() const150 const std::string& RpcOptions::getRoutingKey() const {
151   return routingKey_;
152 }
153 
setShardId(std::string shardId)154 RpcOptions& RpcOptions::setShardId(std::string shardId) {
155   shardId_ = std::move(shardId);
156   return *this;
157 }
158 
getShardId() const159 const std::string& RpcOptions::getShardId() const {
160   return shardId_;
161 }
162 
setReadHeaders(transport::THeader::StringToStringMap && readHeaders)163 void RpcOptions::setReadHeaders(
164     transport::THeader::StringToStringMap&& readHeaders) {
165   readHeaders_ = std::move(readHeaders);
166 }
167 
getReadHeaders() const168 const transport::THeader::StringToStringMap& RpcOptions::getReadHeaders()
169     const {
170   return readHeaders_ ? *readHeaders_ : kEmptyMap();
171 }
172 
setWriteHeader(const std::string & key,const std::string & value)173 void RpcOptions::setWriteHeader(
174     const std::string& key, const std::string& value) {
175   if (!writeHeaders_) {
176     writeHeaders_.emplace();
177   }
178   (*writeHeaders_)[key] = value;
179 }
180 
getWriteHeaders() const181 const transport::THeader::StringToStringMap& RpcOptions::getWriteHeaders()
182     const {
183   return writeHeaders_ ? *writeHeaders_ : kEmptyMap();
184 }
185 
releaseWriteHeaders()186 transport::THeader::StringToStringMap RpcOptions::releaseWriteHeaders() {
187   return std::exchange(writeHeaders_, std::nullopt).value_or(kEmptyMap());
188 }
189 
getMemAllocType() const190 RpcOptions::MemAllocType RpcOptions::getMemAllocType() const {
191   return memAllocType_;
192 }
193 
setMemAllocType(MemAllocType memAllocType)194 RpcOptions& RpcOptions::setMemAllocType(MemAllocType memAllocType) {
195   memAllocType_ = memAllocType;
196   return *this;
197 }
198 
setInteractionId(const InteractionId & id)199 RpcOptions& RpcOptions::setInteractionId(const InteractionId& id) {
200   interactionId_ = id;
201   DCHECK_GT(interactionId_, 0);
202   return *this;
203 }
204 
getInteractionId() const205 int64_t RpcOptions::getInteractionId() const {
206   return interactionId_;
207 }
208 
setLoggingContext(std::string loggingContext)209 RpcOptions& RpcOptions::setLoggingContext(std::string loggingContext) {
210   loggingContext_ = std::move(loggingContext);
211   return *this;
212 }
213 
getLoggingContext() const214 const std::string& RpcOptions::getLoggingContext() const {
215   return loggingContext_;
216 }
217 
setRoutingData(std::shared_ptr<void> data)218 RpcOptions& RpcOptions::setRoutingData(std::shared_ptr<void> data) {
219   routingData_ = std::move(data);
220   return *this;
221 }
222 
getRoutingData() const223 const std::shared_ptr<void>& RpcOptions::getRoutingData() const {
224   return routingData_;
225 }
226 
setRoutingHint(uint64_t hint)227 RpcOptions& RpcOptions::setRoutingHint(uint64_t hint) {
228   routingHint_ = hint;
229   return *this;
230 }
231 
getRoutingHint() const232 uint64_t RpcOptions::getRoutingHint() const {
233   return routingHint_;
234 }
235 
setContextPropMask(uint8_t mask)236 RpcOptions& RpcOptions::setContextPropMask(uint8_t mask) {
237   contextPropComponentEnabledMask_ = mask;
238   return *this;
239 }
240 
getContextPropMask() const241 uint8_t RpcOptions::getContextPropMask() const {
242   return contextPropComponentEnabledMask_;
243 }
244 
setCallerContext(std::shared_ptr<void> callerContext)245 RpcOptions& RpcOptions::setCallerContext(std::shared_ptr<void> callerContext) {
246   callerContext_ = std::move(callerContext);
247   return *this;
248 }
249 
getCallerContext() const250 const std::shared_ptr<void>& RpcOptions::getCallerContext() const {
251   return callerContext_;
252 }
253 
254 } // namespace thrift
255 } // namespace apache
256