1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef THRIFT_ASYNC_RESPONSECHANNEL_H_
18 #define THRIFT_ASYNC_RESPONSECHANNEL_H_ 1
19 
20 #include <chrono>
21 #include <limits>
22 #include <memory>
23 
24 #include <folly/Portability.h>
25 
26 #include <thrift/lib/cpp/Thrift.h>
27 #include <thrift/lib/cpp/server/TServerObserver.h>
28 #include <thrift/lib/cpp2/async/MessageChannel.h>
29 #include <thrift/lib/cpp2/async/ServerStream.h>
30 #include <thrift/lib/cpp2/async/Sink.h>
31 #include <thrift/lib/cpp2/async/StreamCallbacks.h>
32 #include <thrift/lib/cpp2/server/RequestsRegistry.h>
33 
// Forward declaration of folly::IOBuf. The type is not referred to by name
// anywhere else in this header; presumably it is kept for the benefit of
// files that include this one -- confirm before removing.
34 namespace folly {
35 class IOBuf;
36 }
37 
// String constants naming error codes. They are only declared here (extern);
// their definitions and actual values live in another translation unit.
// Note that they are declared in the global namespace, not in apache::thrift.
// NOTE(review): presumably these are the values passed as `exCode` to
// ResponseChannelRequest::sendErrorWrapped() -- confirm against callers.
38 extern const std::string kUnknownErrorCode;
39 extern const std::string kOverloadedErrorCode;
40 extern const std::string kAppOverloadedErrorCode;
41 extern const std::string kAppClientErrorCode;
42 extern const std::string kAppServerErrorCode;
43 extern const std::string kTaskExpiredErrorCode;
44 extern const std::string kProxyTransportExceptionErrorCode;
45 extern const std::string kProxyClientProtocolExceptionErrorCode;
46 extern const std::string kQueueOverloadedErrorCode;
47 extern const std::string kProxyHeaderParseExceptionErrorCode;
48 extern const std::string kProxyAuthExceptionErrorCode;
49 extern const std::string kProxyLookupTransportExceptionErrorCode;
50 extern const std::string kProxyLookupAppExceptionErrorCode;
51 extern const std::string kProxyWhitelistExceptionErrorCode;
52 extern const std::string kProxyClientAppExceptionErrorCode;
53 extern const std::string kProxyProtocolMismatchExceptionErrorCode;
54 extern const std::string kProxyQPSThrottledExceptionErrorCode;
55 extern const std::string kProxyResponseSizeThrottledExceptionErrorCode;
56 extern const std::string kInjectedFailureErrorCode;
57 extern const std::string kServerQueueTimeoutErrorCode;
58 extern const std::string kResponseTooBigErrorCode;
59 extern const std::string kProxyAclCheckExceptionErrorCode;
60 extern const std::string kProxyOverloadedErrorCode;
61 extern const std::string kProxyLoopbackErrorCode;
62 extern const std::string kRequestTypeDoesntMatchServiceFunctionType;
63 extern const std::string kMethodUnknownErrorCode;
64 extern const std::string kInteractionIdUnknownErrorCode;
65 extern const std::string kInteractionConstructorErrorErrorCode;
66 extern const std::string kConnectionClosingErrorCode;
67 extern const std::string kRequestParsingErrorCode;
68 extern const std::string kServerIngressMemoryLimitExceededErrorCode;
69 extern const std::string kChecksumMismatchErrorCode;
70 
71 namespace apache {
72 namespace thrift {
73 
74 class ResponseChannelRequest {
75  public:
76   using UniquePtr =
77       std::unique_ptr<ResponseChannelRequest, RequestsRegistry::Deleter>;
78 
79   virtual bool isActive() const = 0;
80 
81   virtual bool isOneway() const = 0;
82 
isStream()83   virtual bool isStream() const { return false; }
84 
isSink()85   virtual bool isSink() const { return false; }
86 
87   virtual bool includeEnvelope() const = 0;
88 
rpcKind()89   apache::thrift::RpcKind rpcKind() const {
90     if (isStream()) {
91       return apache::thrift::RpcKind::SINGLE_REQUEST_STREAMING_RESPONSE;
92     }
93     if (isSink()) {
94       return apache::thrift::RpcKind::SINK;
95     }
96     if (isOneway()) {
97       return apache::thrift::RpcKind::SINGLE_REQUEST_NO_RESPONSE;
98     }
99     return apache::thrift::RpcKind::SINGLE_REQUEST_SINGLE_RESPONSE;
100   }
101 
isReplyChecksumNeeded()102   virtual bool isReplyChecksumNeeded() const { return false; }
103 
104   virtual void sendReply(
105       ResponsePayload&&,
106       MessageChannel::SendCallback* cb = nullptr,
107       folly::Optional<uint32_t> crc32 = folly::none) = 0;
108 
109   virtual void sendStreamReply(
110       ResponsePayload&&,
111       apache::thrift::detail::ServerStreamFactory&&,
112       folly::Optional<uint32_t> = folly::none) {
113     throw std::logic_error("unimplemented");
114   }
115 
116   FOLLY_NODISCARD static bool sendStreamReply(
117       ResponseChannelRequest::UniquePtr request,
118       folly::EventBase* eb,
119       ResponsePayload&& payload,
120       StreamServerCallbackPtr callback,
121       folly::Optional<uint32_t> crc32 = folly::none) {
122     // Destroying request can call onStreamCancel inline, which would be a
123     // contract violation if we did it inline and returned true.
124     SCOPE_EXIT {
125       eb->runInEventBaseThread([request = std::move(request)] {});
126     };
127     return request->sendStreamReply(
128         std::move(payload), std::move(callback), crc32);
129   }
130 
131 #if FOLLY_HAS_COROUTINES
132   virtual void sendSinkReply(
133       ResponsePayload&&,
134       apache::thrift::detail::SinkConsumerImpl&&,
135       folly::Optional<uint32_t> = folly::none) {
136     throw std::logic_error("unimplemented");
137   }
138 
139   FOLLY_NODISCARD static bool sendSinkReply(
140       ResponseChannelRequest::UniquePtr request,
141       folly::EventBase* eb,
142       ResponsePayload&& payload,
143       SinkServerCallbackPtr callback,
144       folly::Optional<uint32_t> crc32 = folly::none) {
145     SCOPE_EXIT {
146       eb->runInEventBaseThread([request = std::move(request)] {});
147     };
148     return request->sendSinkReply(
149         std::move(payload), std::move(callback), crc32);
150   }
151 #endif
152 
153   virtual void sendException(
154       ResponsePayload&& response, MessageChannel::SendCallback* cb = nullptr) {
155     // Until we start requesting payloads without the envelope we can pass any
156     // sendException calls to sendReply
157     sendReply(std::move(response), cb, folly::none);
158   }
159 
160   virtual void sendErrorWrapped(
161       folly::exception_wrapper ex, std::string exCode) = 0;
162 
sendQueueTimeoutResponse()163   virtual void sendQueueTimeoutResponse() {}
164 
165   virtual ~ResponseChannelRequest() = default;
166 
getShouldStartProcessing()167   bool getShouldStartProcessing() {
168     if (!tryStartProcessing()) {
169       return false;
170     }
171     return true;
172   }
173 
174  protected:
175   // callTryStartProcessing is a helper method used in ResponseChannelRequest
176   // wrapper subclasses to delegate tryStartProcessing() calls to the wrapped
177   // ResponseChannelRequest. This is necessary due to the protected nature of
178   // tryStartProcessing().
callTryStartProcessing(ResponseChannelRequest * request)179   static bool callTryStartProcessing(ResponseChannelRequest* request) {
180     return request->tryStartProcessing();
181   }
182   virtual bool tryStartProcessing() = 0;
183 
184   FOLLY_NODISCARD virtual bool sendStreamReply(
185       ResponsePayload&&,
186       StreamServerCallbackPtr,
187       folly::Optional<uint32_t> = folly::none) {
188     throw std::logic_error("unimplemented");
189   }
190   FOLLY_NODISCARD virtual bool sendSinkReply(
191       ResponsePayload&&,
192       SinkServerCallbackPtr,
193       folly::Optional<uint32_t> = folly::none) {
194     throw std::logic_error("unimplemented");
195   }
196 
197   bool startedProcessing_{false};
198 };
199 
200 /**
201  * ResponseChannel defines an asynchronous API for servers.
202  */
203 class ResponseChannel : virtual public folly::DelayedDestruction {
204  public:
205   static const uint32_t ONEWAY_REQUEST_ID =
206       std::numeric_limits<uint32_t>::max();
207 
208   class Callback {
209    public:
210     /**
211      * reason is empty if closed due to EOF, or a pointer to an exception
212      * if closed due to some sort of error.
213      */
214     virtual void channelClosed(folly::exception_wrapper&&) = 0;
215 
~Callback()216     virtual ~Callback() {}
217   };
218 
219   /**
220    * The callback will be invoked on each new request.
221    * It will remain installed until explicitly uninstalled, or until
222    * channelClosed() is called.
223    */
224   virtual void setCallback(Callback*) = 0;
225 
226  protected:
~ResponseChannel()227   ~ResponseChannel() override {}
228 };
229 
230 } // namespace thrift
231 } // namespace apache
232 
233 #endif // #ifndef THRIFT_ASYNC_RESPONSECHANNEL_H_
234