1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <thrift/lib/cpp2/async/RetryingRequestChannel.h>
18
19 #include <folly/io/async/AsyncSocketException.h>
20
21 namespace apache {
22 namespace thrift {
23
24 class RetryingRequestChannel::RequestCallbackBase {
25 protected:
RequestCallbackBase(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)26 RequestCallbackBase(
27 folly::Executor::KeepAlive<> ka,
28 RetryingRequestChannel::ImplPtr impl,
29 int retries,
30 const apache::thrift::RpcOptions& options,
31 folly::StringPiece methodName,
32 SerializedRequest&& request,
33 std::shared_ptr<apache::thrift::transport::THeader> header)
34 : impl_(std::move(impl)),
35 retriesLeft_(retries),
36 options_(options),
37 methodName_(methodName.str()),
38 request_(std::move(request)),
39 header_(std::move(header)) {
40 if (retriesLeft_) {
41 ka_ = std::move(ka);
42 }
43 }
44
shouldRetry(folly::exception_wrapper & ex)45 bool shouldRetry(folly::exception_wrapper& ex) {
46 if (!ex.is_compatible_with<
47 apache::thrift::transport::TTransportException>()) {
48 return false;
49 }
50 return retriesLeft_ > 0;
51 }
52
53 folly::Executor::KeepAlive<> ka_;
54 RetryingRequestChannel::ImplPtr impl_;
55 int retriesLeft_;
56 apache::thrift::RpcOptions options_;
57 std::string methodName_;
58 SerializedRequest request_;
59 std::shared_ptr<apache::thrift::transport::THeader> header_;
60 };
61
62 class RetryingRequestChannel::RequestCallback
63 : public RetryingRequestChannel::RequestCallbackBase,
64 public apache::thrift::RequestClientCallback {
65 public:
RequestCallback(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,apache::thrift::RequestClientCallback::Ptr cob,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)66 RequestCallback(
67 folly::Executor::KeepAlive<> ka,
68 RetryingRequestChannel::ImplPtr impl,
69 int retries,
70 const apache::thrift::RpcOptions& options,
71 apache::thrift::RequestClientCallback::Ptr cob,
72 folly::StringPiece methodName,
73 SerializedRequest&& request,
74 std::shared_ptr<apache::thrift::transport::THeader> header)
75 : RequestCallbackBase(
76 std::move(ka),
77 std::move(impl),
78 retries,
79 options,
80 std::move(methodName),
81 std::move(request),
82 header),
83 cob_(std::move(cob)) {}
84
onResponse(apache::thrift::ClientReceiveState && state)85 void onResponse(
86 apache::thrift::ClientReceiveState&& state) noexcept override {
87 cob_->onRequestSent();
88 cob_.release()->onResponse(std::move(state));
89 delete this;
90 }
91
onResponseError(folly::exception_wrapper ex)92 void onResponseError(folly::exception_wrapper ex) noexcept override {
93 if (shouldRetry(ex)) {
94 retry();
95 } else {
96 cob_.release()->onResponseError(std::move(ex));
97 delete this;
98 }
99 }
100
retry()101 void retry() {
102 if (!--retriesLeft_) {
103 ka_.reset();
104 }
105
106 impl_->sendRequestResponse(
107 options_,
108 methodName_,
109 SerializedRequest(request_.buffer->clone()),
110 header_,
111 RequestClientCallback::Ptr(this));
112 }
113
114 private:
115 RequestClientCallback::Ptr cob_;
116 };
117
118 class RetryingRequestChannel::StreamCallback
119 : public RetryingRequestChannel::RequestCallbackBase,
120 public apache::thrift::StreamClientCallback {
121 public:
StreamCallback(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,apache::thrift::StreamClientCallback & clientCallback,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)122 StreamCallback(
123 folly::Executor::KeepAlive<> ka,
124 RetryingRequestChannel::ImplPtr impl,
125 int retries,
126 const apache::thrift::RpcOptions& options,
127 apache::thrift::StreamClientCallback& clientCallback,
128 folly::StringPiece methodName,
129 SerializedRequest&& request,
130 std::shared_ptr<apache::thrift::transport::THeader> header)
131 : RequestCallbackBase(
132 std::move(ka),
133 std::move(impl),
134 retries,
135 options,
136 methodName,
137 std::move(request),
138 header),
139 clientCallback_(clientCallback) {}
140
onFirstResponse(FirstResponsePayload && pload,folly::EventBase * evb,StreamServerCallback * serverCallback)141 bool onFirstResponse(
142 FirstResponsePayload&& pload,
143 folly::EventBase* evb,
144 StreamServerCallback* serverCallback) noexcept override {
145 SCOPE_EXIT { delete this; };
146 serverCallback->resetClientCallback(clientCallback_);
147 return clientCallback_.onFirstResponse(
148 std::move(pload), evb, serverCallback);
149 }
150
onFirstResponseError(folly::exception_wrapper ex)151 void onFirstResponseError(folly::exception_wrapper ex) noexcept override {
152 if (shouldRetry(ex)) {
153 retry();
154 } else {
155 clientCallback_.onFirstResponseError(std::move(ex));
156 delete this;
157 }
158 }
159
onStreamNext(StreamPayload &&)160 bool onStreamNext(StreamPayload&&) override { std::terminate(); }
161
onStreamError(folly::exception_wrapper)162 void onStreamError(folly::exception_wrapper) override { std::terminate(); }
163
onStreamComplete()164 void onStreamComplete() override { std::terminate(); }
165
onStreamHeaders(HeadersPayload &&)166 bool onStreamHeaders(HeadersPayload&&) override { std::terminate(); }
167
resetServerCallback(StreamServerCallback &)168 void resetServerCallback(StreamServerCallback&) override { std::terminate(); }
169
170 private:
retry()171 void retry() {
172 if (!--retriesLeft_) {
173 ka_.reset();
174 }
175
176 impl_->sendRequestStream(
177 options_,
178 methodName_,
179 SerializedRequest(request_.buffer->clone()),
180 header_,
181 this);
182 }
183
184 StreamClientCallback& clientCallback_;
185 };
186
187 class RetryingRequestChannel::SinkCallback
188 : public RetryingRequestChannel::RequestCallbackBase,
189 public apache::thrift::SinkClientCallback {
190 public:
SinkCallback(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,apache::thrift::SinkClientCallback & clientCallback,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)191 SinkCallback(
192 folly::Executor::KeepAlive<> ka,
193 RetryingRequestChannel::ImplPtr impl,
194 int retries,
195 const apache::thrift::RpcOptions& options,
196 apache::thrift::SinkClientCallback& clientCallback,
197 folly::StringPiece methodName,
198 SerializedRequest&& request,
199 std::shared_ptr<apache::thrift::transport::THeader> header)
200 : RequestCallbackBase(
201 std::move(ka),
202 std::move(impl),
203 retries,
204 options,
205 methodName,
206 std::move(request),
207 header),
208 clientCallback_(clientCallback) {}
209
onFirstResponse(FirstResponsePayload && pload,folly::EventBase * evb,SinkServerCallback * serverCallback)210 bool onFirstResponse(
211 FirstResponsePayload&& pload,
212 folly::EventBase* evb,
213 SinkServerCallback* serverCallback) noexcept override {
214 SCOPE_EXIT { delete this; };
215 serverCallback->resetClientCallback(clientCallback_);
216 return clientCallback_.onFirstResponse(
217 std::move(pload), evb, serverCallback);
218 }
219
onFirstResponseError(folly::exception_wrapper ex)220 void onFirstResponseError(folly::exception_wrapper ex) noexcept override {
221 if (shouldRetry(ex)) {
222 retry();
223 } else {
224 clientCallback_.onFirstResponseError(std::move(ex));
225 delete this;
226 }
227 }
228
onFinalResponse(StreamPayload &&)229 void onFinalResponse(StreamPayload&&) override { std::terminate(); }
230
onFinalResponseError(folly::exception_wrapper)231 void onFinalResponseError(folly::exception_wrapper) override {
232 std::terminate();
233 }
234
onSinkRequestN(uint64_t)235 bool onSinkRequestN(uint64_t) override { std::terminate(); }
236
resetServerCallback(SinkServerCallback &)237 void resetServerCallback(SinkServerCallback&) override { std::terminate(); }
238
239 private:
retry()240 void retry() {
241 if (!--retriesLeft_) {
242 ka_.reset();
243 }
244
245 impl_->sendRequestSink(
246 options_,
247 methodName_,
248 SerializedRequest(request_.buffer->clone()),
249 header_,
250 this);
251 }
252
253 SinkClientCallback& clientCallback_;
254 };
255
sendRequestStream(const apache::thrift::RpcOptions & rpcOptions,MethodMetadata && methodMetadata,apache::thrift::SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header,apache::thrift::StreamClientCallback * clientCallback)256 void RetryingRequestChannel::sendRequestStream(
257 const apache::thrift::RpcOptions& rpcOptions,
258 MethodMetadata&& methodMetadata,
259 apache::thrift::SerializedRequest&& request,
260 std::shared_ptr<apache::thrift::transport::THeader> header,
261 apache::thrift::StreamClientCallback* clientCallback) {
262 apache::thrift::StreamClientCallback* streamCallback = new StreamCallback(
263 folly::getKeepAliveToken(evb_),
264 impl_,
265 numRetries_,
266 rpcOptions,
267 *clientCallback,
268 methodMetadata.name_view(),
269 SerializedRequest(request.buffer->clone()),
270 header);
271
272 return impl_->sendRequestStream(
273 rpcOptions,
274 std::move(methodMetadata),
275 std::move(request),
276 std::move(header),
277 streamCallback);
278 }
279
sendRequestSink(const apache::thrift::RpcOptions & rpcOptions,apache::thrift::MethodMetadata && methodMetadata,apache::thrift::SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header,apache::thrift::SinkClientCallback * clientCallback)280 void RetryingRequestChannel::sendRequestSink(
281 const apache::thrift::RpcOptions& rpcOptions,
282 apache::thrift::MethodMetadata&& methodMetadata,
283 apache::thrift::SerializedRequest&& request,
284 std::shared_ptr<apache::thrift::transport::THeader> header,
285 apache::thrift::SinkClientCallback* clientCallback) {
286 apache::thrift::SinkClientCallback* sinkCallback = new SinkCallback(
287 folly::getKeepAliveToken(evb_),
288 impl_,
289 numRetries_,
290 rpcOptions,
291 *clientCallback,
292 methodMetadata.name_view(),
293 SerializedRequest(request.buffer->clone()),
294 header);
295
296 return impl_->sendRequestSink(
297 rpcOptions,
298 std::move(methodMetadata),
299 std::move(request),
300 std::move(header),
301 sinkCallback);
302 }
303
sendRequestResponse(const apache::thrift::RpcOptions & options,MethodMetadata && methodMetadata,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header,RequestClientCallback::Ptr cob)304 void RetryingRequestChannel::sendRequestResponse(
305 const apache::thrift::RpcOptions& options,
306 MethodMetadata&& methodMetadata,
307 SerializedRequest&& request,
308 std::shared_ptr<apache::thrift::transport::THeader> header,
309 RequestClientCallback::Ptr cob) {
310 cob = RequestClientCallback::Ptr(new RequestCallback(
311 folly::getKeepAliveToken(evb_),
312 impl_,
313 numRetries_,
314 options,
315 std::move(cob),
316 methodMetadata.name_view(),
317 SerializedRequest(request.buffer->clone()),
318 header));
319
320 return impl_->sendRequestResponse(
321 options,
322 std::move(methodMetadata),
323 std::move(request),
324 std::move(header),
325 std::move(cob));
326 }
327 } // namespace thrift
328 } // namespace apache
329