1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <thrift/lib/cpp2/async/RpcOptions.h>
18
19 #include <thrift/lib/cpp2/async/Interaction.h>
20
21 namespace apache {
22 namespace thrift {
23 namespace {
kEmptyMap()24 const transport::THeader::StringToStringMap& kEmptyMap() {
25 static const transport::THeader::StringToStringMap& map =
26 *(new transport::THeader::StringToStringMap);
27 return map;
28 }
29
validateTimeout(std::chrono::milliseconds timeout)30 uint32_t validateTimeout(std::chrono::milliseconds timeout) {
31 using rep = std::chrono::milliseconds::rep;
32 static constexpr rep min = 0;
33 static constexpr rep max = std::numeric_limits<uint32_t>::max();
34 const auto ms = timeout.count();
35 DCHECK_GE(ms, min) << "Timeout should be >= 0";
36 DCHECK_LE(ms, max) << "Timeout should not exceed " << max << " ms";
37 return std::max(min, std::min(ms, max));
38 }
39 } // namespace
40
setTimeout(std::chrono::milliseconds timeout)41 RpcOptions& RpcOptions::setTimeout(std::chrono::milliseconds timeout) {
42 timeout_ = validateTimeout(timeout);
43 return *this;
44 }
45
getTimeout() const46 std::chrono::milliseconds RpcOptions::getTimeout() const {
47 return std::chrono::milliseconds(timeout_);
48 }
49
setPriority(RpcOptions::PRIORITY priority)50 RpcOptions& RpcOptions::setPriority(RpcOptions::PRIORITY priority) {
51 priority_ = static_cast<uint8_t>(priority);
52 return *this;
53 }
54
getPriority() const55 RpcOptions::PRIORITY RpcOptions::getPriority() const {
56 return static_cast<RpcOptions::PRIORITY>(priority_);
57 }
58
setClientOnlyTimeouts(bool val)59 RpcOptions& RpcOptions::setClientOnlyTimeouts(bool val) {
60 clientOnlyTimeouts_ = val;
61 return *this;
62 }
63
getClientOnlyTimeouts() const64 bool RpcOptions::getClientOnlyTimeouts() const {
65 return clientOnlyTimeouts_;
66 }
67
setEnableChecksum(bool val)68 RpcOptions& RpcOptions::setEnableChecksum(bool val) {
69 enableChecksum_ = val;
70 return *this;
71 }
72
getEnableChecksum() const73 bool RpcOptions::getEnableChecksum() const {
74 return enableChecksum_;
75 }
76
setChunkTimeout(std::chrono::milliseconds chunkTimeout)77 RpcOptions& RpcOptions::setChunkTimeout(
78 std::chrono::milliseconds chunkTimeout) {
79 chunkTimeout_ = validateTimeout(chunkTimeout);
80 return *this;
81 }
82
getChunkTimeout() const83 std::chrono::milliseconds RpcOptions::getChunkTimeout() const {
84 return std::chrono::milliseconds(chunkTimeout_);
85 }
86
setChunkBufferSize(int32_t chunkBufferSize)87 RpcOptions& RpcOptions::setChunkBufferSize(int32_t chunkBufferSize) {
88 CHECK_EQ(bufferOptions_.memSize, 0)
89 << "Only one of setMemoryBufferSize and setChunkBufferSize should be called";
90 bufferOptions_.chunkSize = chunkBufferSize;
91 return *this;
92 }
93
setMemoryBufferSize(size_t targetBytes,int32_t initialChunks,int32_t maxChunks)94 RpcOptions& RpcOptions::setMemoryBufferSize(
95 size_t targetBytes, int32_t initialChunks, int32_t maxChunks) {
96 CHECK_EQ(bufferOptions_.chunkSize, 100)
97 << "Only one of setMemoryBufferSize and setChunkBufferSize should be called";
98 CHECK_GT(targetBytes, 0);
99 CHECK_LE(0, initialChunks);
100 CHECK_LE(initialChunks, maxChunks);
101 bufferOptions_.memSize = targetBytes;
102 bufferOptions_.chunkSize = initialChunks;
103 bufferOptions_.maxChunkSize = maxChunks;
104 return *this;
105 }
106
getChunkBufferSize() const107 int32_t RpcOptions::getChunkBufferSize() const {
108 return bufferOptions_.chunkSize;
109 }
110
getBufferOptions() const111 const BufferOptions& RpcOptions::getBufferOptions() const {
112 return bufferOptions_;
113 }
114
setQueueTimeout(std::chrono::milliseconds queueTimeout)115 RpcOptions& RpcOptions::setQueueTimeout(
116 std::chrono::milliseconds queueTimeout) {
117 queueTimeout_ = validateTimeout(queueTimeout);
118 return *this;
119 }
120
getQueueTimeout() const121 std::chrono::milliseconds RpcOptions::getQueueTimeout() const {
122 return std::chrono::milliseconds(queueTimeout_);
123 }
124
setOverallTimeout(std::chrono::milliseconds overallTimeout)125 RpcOptions& RpcOptions::setOverallTimeout(
126 std::chrono::milliseconds overallTimeout) {
127 overallTimeout_ = validateTimeout(overallTimeout);
128 return *this;
129 }
130
getOverallTimeout() const131 std::chrono::milliseconds RpcOptions::getOverallTimeout() const {
132 return std::chrono::milliseconds(overallTimeout_);
133 }
134
setProcessingTimeout(std::chrono::milliseconds processingTimeout)135 RpcOptions& RpcOptions::setProcessingTimeout(
136 std::chrono::milliseconds processingTimeout) {
137 processingTimeout_ = validateTimeout(processingTimeout);
138 return *this;
139 }
140
getProcessingTimeout() const141 std::chrono::milliseconds RpcOptions::getProcessingTimeout() const {
142 return std::chrono::milliseconds(processingTimeout_);
143 }
144
setRoutingKey(std::string routingKey)145 RpcOptions& RpcOptions::setRoutingKey(std::string routingKey) {
146 routingKey_ = std::move(routingKey);
147 return *this;
148 }
149
getRoutingKey() const150 const std::string& RpcOptions::getRoutingKey() const {
151 return routingKey_;
152 }
153
setShardId(std::string shardId)154 RpcOptions& RpcOptions::setShardId(std::string shardId) {
155 shardId_ = std::move(shardId);
156 return *this;
157 }
158
getShardId() const159 const std::string& RpcOptions::getShardId() const {
160 return shardId_;
161 }
162
setReadHeaders(transport::THeader::StringToStringMap && readHeaders)163 void RpcOptions::setReadHeaders(
164 transport::THeader::StringToStringMap&& readHeaders) {
165 readHeaders_ = std::move(readHeaders);
166 }
167
getReadHeaders() const168 const transport::THeader::StringToStringMap& RpcOptions::getReadHeaders()
169 const {
170 return readHeaders_ ? *readHeaders_ : kEmptyMap();
171 }
172
setWriteHeader(const std::string & key,const std::string & value)173 void RpcOptions::setWriteHeader(
174 const std::string& key, const std::string& value) {
175 if (!writeHeaders_) {
176 writeHeaders_.emplace();
177 }
178 (*writeHeaders_)[key] = value;
179 }
180
getWriteHeaders() const181 const transport::THeader::StringToStringMap& RpcOptions::getWriteHeaders()
182 const {
183 return writeHeaders_ ? *writeHeaders_ : kEmptyMap();
184 }
185
releaseWriteHeaders()186 transport::THeader::StringToStringMap RpcOptions::releaseWriteHeaders() {
187 return std::exchange(writeHeaders_, std::nullopt).value_or(kEmptyMap());
188 }
189
getMemAllocType() const190 RpcOptions::MemAllocType RpcOptions::getMemAllocType() const {
191 return memAllocType_;
192 }
193
setMemAllocType(MemAllocType memAllocType)194 RpcOptions& RpcOptions::setMemAllocType(MemAllocType memAllocType) {
195 memAllocType_ = memAllocType;
196 return *this;
197 }
198
setInteractionId(const InteractionId & id)199 RpcOptions& RpcOptions::setInteractionId(const InteractionId& id) {
200 interactionId_ = id;
201 DCHECK_GT(interactionId_, 0);
202 return *this;
203 }
204
getInteractionId() const205 int64_t RpcOptions::getInteractionId() const {
206 return interactionId_;
207 }
208
setLoggingContext(std::string loggingContext)209 RpcOptions& RpcOptions::setLoggingContext(std::string loggingContext) {
210 loggingContext_ = std::move(loggingContext);
211 return *this;
212 }
213
getLoggingContext() const214 const std::string& RpcOptions::getLoggingContext() const {
215 return loggingContext_;
216 }
217
setRoutingData(std::shared_ptr<void> data)218 RpcOptions& RpcOptions::setRoutingData(std::shared_ptr<void> data) {
219 routingData_ = std::move(data);
220 return *this;
221 }
222
getRoutingData() const223 const std::shared_ptr<void>& RpcOptions::getRoutingData() const {
224 return routingData_;
225 }
226
setRoutingHint(uint64_t hint)227 RpcOptions& RpcOptions::setRoutingHint(uint64_t hint) {
228 routingHint_ = hint;
229 return *this;
230 }
231
getRoutingHint() const232 uint64_t RpcOptions::getRoutingHint() const {
233 return routingHint_;
234 }
235
setContextPropMask(uint8_t mask)236 RpcOptions& RpcOptions::setContextPropMask(uint8_t mask) {
237 contextPropComponentEnabledMask_ = mask;
238 return *this;
239 }
240
getContextPropMask() const241 uint8_t RpcOptions::getContextPropMask() const {
242 return contextPropComponentEnabledMask_;
243 }
244
setCallerContext(std::shared_ptr<void> callerContext)245 RpcOptions& RpcOptions::setCallerContext(std::shared_ptr<void> callerContext) {
246 callerContext_ = std::move(callerContext);
247 return *this;
248 }
249
getCallerContext() const250 const std::shared_ptr<void>& RpcOptions::getCallerContext() const {
251 return callerContext_;
252 }
253
254 } // namespace thrift
255 } // namespace apache
256