1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thrift/lib/cpp2/async/RetryingRequestChannel.h>
18 
19 #include <folly/io/async/AsyncSocketException.h>
20 
21 namespace apache {
22 namespace thrift {
23 
24 class RetryingRequestChannel::RequestCallbackBase {
25  protected:
RequestCallbackBase(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)26   RequestCallbackBase(
27       folly::Executor::KeepAlive<> ka,
28       RetryingRequestChannel::ImplPtr impl,
29       int retries,
30       const apache::thrift::RpcOptions& options,
31       folly::StringPiece methodName,
32       SerializedRequest&& request,
33       std::shared_ptr<apache::thrift::transport::THeader> header)
34       : impl_(std::move(impl)),
35         retriesLeft_(retries),
36         options_(options),
37         methodName_(methodName.str()),
38         request_(std::move(request)),
39         header_(std::move(header)) {
40     if (retriesLeft_) {
41       ka_ = std::move(ka);
42     }
43   }
44 
shouldRetry(folly::exception_wrapper & ex)45   bool shouldRetry(folly::exception_wrapper& ex) {
46     if (!ex.is_compatible_with<
47             apache::thrift::transport::TTransportException>()) {
48       return false;
49     }
50     return retriesLeft_ > 0;
51   }
52 
53   folly::Executor::KeepAlive<> ka_;
54   RetryingRequestChannel::ImplPtr impl_;
55   int retriesLeft_;
56   apache::thrift::RpcOptions options_;
57   std::string methodName_;
58   SerializedRequest request_;
59   std::shared_ptr<apache::thrift::transport::THeader> header_;
60 };
61 
62 class RetryingRequestChannel::RequestCallback
63     : public RetryingRequestChannel::RequestCallbackBase,
64       public apache::thrift::RequestClientCallback {
65  public:
RequestCallback(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,apache::thrift::RequestClientCallback::Ptr cob,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)66   RequestCallback(
67       folly::Executor::KeepAlive<> ka,
68       RetryingRequestChannel::ImplPtr impl,
69       int retries,
70       const apache::thrift::RpcOptions& options,
71       apache::thrift::RequestClientCallback::Ptr cob,
72       folly::StringPiece methodName,
73       SerializedRequest&& request,
74       std::shared_ptr<apache::thrift::transport::THeader> header)
75       : RequestCallbackBase(
76             std::move(ka),
77             std::move(impl),
78             retries,
79             options,
80             std::move(methodName),
81             std::move(request),
82             header),
83         cob_(std::move(cob)) {}
84 
onResponse(apache::thrift::ClientReceiveState && state)85   void onResponse(
86       apache::thrift::ClientReceiveState&& state) noexcept override {
87     cob_->onRequestSent();
88     cob_.release()->onResponse(std::move(state));
89     delete this;
90   }
91 
onResponseError(folly::exception_wrapper ex)92   void onResponseError(folly::exception_wrapper ex) noexcept override {
93     if (shouldRetry(ex)) {
94       retry();
95     } else {
96       cob_.release()->onResponseError(std::move(ex));
97       delete this;
98     }
99   }
100 
retry()101   void retry() {
102     if (!--retriesLeft_) {
103       ka_.reset();
104     }
105 
106     impl_->sendRequestResponse(
107         options_,
108         methodName_,
109         SerializedRequest(request_.buffer->clone()),
110         header_,
111         RequestClientCallback::Ptr(this));
112   }
113 
114  private:
115   RequestClientCallback::Ptr cob_;
116 };
117 
118 class RetryingRequestChannel::StreamCallback
119     : public RetryingRequestChannel::RequestCallbackBase,
120       public apache::thrift::StreamClientCallback {
121  public:
StreamCallback(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,apache::thrift::StreamClientCallback & clientCallback,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)122   StreamCallback(
123       folly::Executor::KeepAlive<> ka,
124       RetryingRequestChannel::ImplPtr impl,
125       int retries,
126       const apache::thrift::RpcOptions& options,
127       apache::thrift::StreamClientCallback& clientCallback,
128       folly::StringPiece methodName,
129       SerializedRequest&& request,
130       std::shared_ptr<apache::thrift::transport::THeader> header)
131       : RequestCallbackBase(
132             std::move(ka),
133             std::move(impl),
134             retries,
135             options,
136             methodName,
137             std::move(request),
138             header),
139         clientCallback_(clientCallback) {}
140 
onFirstResponse(FirstResponsePayload && pload,folly::EventBase * evb,StreamServerCallback * serverCallback)141   bool onFirstResponse(
142       FirstResponsePayload&& pload,
143       folly::EventBase* evb,
144       StreamServerCallback* serverCallback) noexcept override {
145     SCOPE_EXIT { delete this; };
146     serverCallback->resetClientCallback(clientCallback_);
147     return clientCallback_.onFirstResponse(
148         std::move(pload), evb, serverCallback);
149   }
150 
onFirstResponseError(folly::exception_wrapper ex)151   void onFirstResponseError(folly::exception_wrapper ex) noexcept override {
152     if (shouldRetry(ex)) {
153       retry();
154     } else {
155       clientCallback_.onFirstResponseError(std::move(ex));
156       delete this;
157     }
158   }
159 
onStreamNext(StreamPayload &&)160   bool onStreamNext(StreamPayload&&) override { std::terminate(); }
161 
onStreamError(folly::exception_wrapper)162   void onStreamError(folly::exception_wrapper) override { std::terminate(); }
163 
onStreamComplete()164   void onStreamComplete() override { std::terminate(); }
165 
onStreamHeaders(HeadersPayload &&)166   bool onStreamHeaders(HeadersPayload&&) override { std::terminate(); }
167 
resetServerCallback(StreamServerCallback &)168   void resetServerCallback(StreamServerCallback&) override { std::terminate(); }
169 
170  private:
retry()171   void retry() {
172     if (!--retriesLeft_) {
173       ka_.reset();
174     }
175 
176     impl_->sendRequestStream(
177         options_,
178         methodName_,
179         SerializedRequest(request_.buffer->clone()),
180         header_,
181         this);
182   }
183 
184   StreamClientCallback& clientCallback_;
185 };
186 
187 class RetryingRequestChannel::SinkCallback
188     : public RetryingRequestChannel::RequestCallbackBase,
189       public apache::thrift::SinkClientCallback {
190  public:
SinkCallback(folly::Executor::KeepAlive<> ka,RetryingRequestChannel::ImplPtr impl,int retries,const apache::thrift::RpcOptions & options,apache::thrift::SinkClientCallback & clientCallback,folly::StringPiece methodName,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header)191   SinkCallback(
192       folly::Executor::KeepAlive<> ka,
193       RetryingRequestChannel::ImplPtr impl,
194       int retries,
195       const apache::thrift::RpcOptions& options,
196       apache::thrift::SinkClientCallback& clientCallback,
197       folly::StringPiece methodName,
198       SerializedRequest&& request,
199       std::shared_ptr<apache::thrift::transport::THeader> header)
200       : RequestCallbackBase(
201             std::move(ka),
202             std::move(impl),
203             retries,
204             options,
205             methodName,
206             std::move(request),
207             header),
208         clientCallback_(clientCallback) {}
209 
onFirstResponse(FirstResponsePayload && pload,folly::EventBase * evb,SinkServerCallback * serverCallback)210   bool onFirstResponse(
211       FirstResponsePayload&& pload,
212       folly::EventBase* evb,
213       SinkServerCallback* serverCallback) noexcept override {
214     SCOPE_EXIT { delete this; };
215     serverCallback->resetClientCallback(clientCallback_);
216     return clientCallback_.onFirstResponse(
217         std::move(pload), evb, serverCallback);
218   }
219 
onFirstResponseError(folly::exception_wrapper ex)220   void onFirstResponseError(folly::exception_wrapper ex) noexcept override {
221     if (shouldRetry(ex)) {
222       retry();
223     } else {
224       clientCallback_.onFirstResponseError(std::move(ex));
225       delete this;
226     }
227   }
228 
onFinalResponse(StreamPayload &&)229   void onFinalResponse(StreamPayload&&) override { std::terminate(); }
230 
onFinalResponseError(folly::exception_wrapper)231   void onFinalResponseError(folly::exception_wrapper) override {
232     std::terminate();
233   }
234 
onSinkRequestN(uint64_t)235   bool onSinkRequestN(uint64_t) override { std::terminate(); }
236 
resetServerCallback(SinkServerCallback &)237   void resetServerCallback(SinkServerCallback&) override { std::terminate(); }
238 
239  private:
retry()240   void retry() {
241     if (!--retriesLeft_) {
242       ka_.reset();
243     }
244 
245     impl_->sendRequestSink(
246         options_,
247         methodName_,
248         SerializedRequest(request_.buffer->clone()),
249         header_,
250         this);
251   }
252 
253   SinkClientCallback& clientCallback_;
254 };
255 
sendRequestStream(const apache::thrift::RpcOptions & rpcOptions,MethodMetadata && methodMetadata,apache::thrift::SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header,apache::thrift::StreamClientCallback * clientCallback)256 void RetryingRequestChannel::sendRequestStream(
257     const apache::thrift::RpcOptions& rpcOptions,
258     MethodMetadata&& methodMetadata,
259     apache::thrift::SerializedRequest&& request,
260     std::shared_ptr<apache::thrift::transport::THeader> header,
261     apache::thrift::StreamClientCallback* clientCallback) {
262   apache::thrift::StreamClientCallback* streamCallback = new StreamCallback(
263       folly::getKeepAliveToken(evb_),
264       impl_,
265       numRetries_,
266       rpcOptions,
267       *clientCallback,
268       methodMetadata.name_view(),
269       SerializedRequest(request.buffer->clone()),
270       header);
271 
272   return impl_->sendRequestStream(
273       rpcOptions,
274       std::move(methodMetadata),
275       std::move(request),
276       std::move(header),
277       streamCallback);
278 }
279 
sendRequestSink(const apache::thrift::RpcOptions & rpcOptions,apache::thrift::MethodMetadata && methodMetadata,apache::thrift::SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header,apache::thrift::SinkClientCallback * clientCallback)280 void RetryingRequestChannel::sendRequestSink(
281     const apache::thrift::RpcOptions& rpcOptions,
282     apache::thrift::MethodMetadata&& methodMetadata,
283     apache::thrift::SerializedRequest&& request,
284     std::shared_ptr<apache::thrift::transport::THeader> header,
285     apache::thrift::SinkClientCallback* clientCallback) {
286   apache::thrift::SinkClientCallback* sinkCallback = new SinkCallback(
287       folly::getKeepAliveToken(evb_),
288       impl_,
289       numRetries_,
290       rpcOptions,
291       *clientCallback,
292       methodMetadata.name_view(),
293       SerializedRequest(request.buffer->clone()),
294       header);
295 
296   return impl_->sendRequestSink(
297       rpcOptions,
298       std::move(methodMetadata),
299       std::move(request),
300       std::move(header),
301       sinkCallback);
302 }
303 
sendRequestResponse(const apache::thrift::RpcOptions & options,MethodMetadata && methodMetadata,SerializedRequest && request,std::shared_ptr<apache::thrift::transport::THeader> header,RequestClientCallback::Ptr cob)304 void RetryingRequestChannel::sendRequestResponse(
305     const apache::thrift::RpcOptions& options,
306     MethodMetadata&& methodMetadata,
307     SerializedRequest&& request,
308     std::shared_ptr<apache::thrift::transport::THeader> header,
309     RequestClientCallback::Ptr cob) {
310   cob = RequestClientCallback::Ptr(new RequestCallback(
311       folly::getKeepAliveToken(evb_),
312       impl_,
313       numRetries_,
314       options,
315       std::move(cob),
316       methodMetadata.name_view(),
317       SerializedRequest(request.buffer->clone()),
318       header));
319 
320   return impl_->sendRequestResponse(
321       options,
322       std::move(methodMetadata),
323       std::move(request),
324       std::move(header),
325       std::move(cob));
326 }
327 } // namespace thrift
328 } // namespace apache
329