1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef THRIFT_ASYNC_RESPONSECHANNEL_H_ 18 #define THRIFT_ASYNC_RESPONSECHANNEL_H_ 1 19 20 #include <chrono> 21 #include <limits> 22 #include <memory> 23 24 #include <folly/Portability.h> 25 26 #include <thrift/lib/cpp/Thrift.h> 27 #include <thrift/lib/cpp/server/TServerObserver.h> 28 #include <thrift/lib/cpp2/async/MessageChannel.h> 29 #include <thrift/lib/cpp2/async/ServerStream.h> 30 #include <thrift/lib/cpp2/async/Sink.h> 31 #include <thrift/lib/cpp2/async/StreamCallbacks.h> 32 #include <thrift/lib/cpp2/server/RequestsRegistry.h> 33 34 namespace folly { 35 class IOBuf; 36 } 37 38 extern const std::string kUnknownErrorCode; 39 extern const std::string kOverloadedErrorCode; 40 extern const std::string kAppOverloadedErrorCode; 41 extern const std::string kAppClientErrorCode; 42 extern const std::string kAppServerErrorCode; 43 extern const std::string kTaskExpiredErrorCode; 44 extern const std::string kProxyTransportExceptionErrorCode; 45 extern const std::string kProxyClientProtocolExceptionErrorCode; 46 extern const std::string kQueueOverloadedErrorCode; 47 extern const std::string kProxyHeaderParseExceptionErrorCode; 48 extern const std::string kProxyAuthExceptionErrorCode; 49 extern const std::string kProxyLookupTransportExceptionErrorCode; 50 extern const std::string kProxyLookupAppExceptionErrorCode; 51 extern const std::string kProxyWhitelistExceptionErrorCode; 52 extern const std::string kProxyClientAppExceptionErrorCode; 53 extern const std::string kProxyProtocolMismatchExceptionErrorCode; 54 extern const std::string kProxyQPSThrottledExceptionErrorCode; 55 extern const std::string kProxyResponseSizeThrottledExceptionErrorCode; 56 extern const std::string kInjectedFailureErrorCode; 57 extern const std::string kServerQueueTimeoutErrorCode; 58 extern const std::string kResponseTooBigErrorCode; 59 extern const std::string kProxyAclCheckExceptionErrorCode; 60 extern const std::string kProxyOverloadedErrorCode; 61 extern const std::string kProxyLoopbackErrorCode; 62 extern const std::string kRequestTypeDoesntMatchServiceFunctionType; 63 extern const std::string kMethodUnknownErrorCode; 64 extern const std::string kInteractionIdUnknownErrorCode; 65 extern const std::string kInteractionConstructorErrorErrorCode; 66 extern const std::string kConnectionClosingErrorCode; 67 extern const std::string kRequestParsingErrorCode; 68 extern const std::string kServerIngressMemoryLimitExceededErrorCode; 69 extern const std::string kChecksumMismatchErrorCode; 70 71 namespace apache { 72 namespace thrift { 73 74 class ResponseChannelRequest { 75 public: 76 using UniquePtr = 77 std::unique_ptr<ResponseChannelRequest, RequestsRegistry::Deleter>; 78 79 virtual bool isActive() const = 0; 80 81 virtual bool isOneway() const = 0; 82 isStream()83 virtual bool isStream() const { return false; } 84 isSink()85 virtual bool isSink() const { return false; } 86 87 virtual bool includeEnvelope() const = 0; 88 rpcKind()89 apache::thrift::RpcKind rpcKind() const { 90 if (isStream()) { 91 return apache::thrift::RpcKind::SINGLE_REQUEST_STREAMING_RESPONSE; 92 } 93 if (isSink()) { 94 return apache::thrift::RpcKind::SINK; 95 } 96 if (isOneway()) { 97 return apache::thrift::RpcKind::SINGLE_REQUEST_NO_RESPONSE; 98 } 99 return apache::thrift::RpcKind::SINGLE_REQUEST_SINGLE_RESPONSE; 100 } 101 isReplyChecksumNeeded()102 virtual bool isReplyChecksumNeeded() const { return false; } 103 104 virtual void sendReply( 105 ResponsePayload&&, 106 MessageChannel::SendCallback* cb = nullptr, 107 folly::Optional<uint32_t> crc32 = folly::none) = 0; 108 109 virtual void sendStreamReply( 110 ResponsePayload&&, 111 apache::thrift::detail::ServerStreamFactory&&, 112 folly::Optional<uint32_t> = folly::none) { 113 throw std::logic_error("unimplemented"); 114 } 115 116 FOLLY_NODISCARD static bool sendStreamReply( 117 ResponseChannelRequest::UniquePtr request, 118 folly::EventBase* eb, 119 ResponsePayload&& payload, 120 StreamServerCallbackPtr callback, 121 folly::Optional<uint32_t> crc32 = folly::none) { 122 // Destroying request can call onStreamCancel inline, which would be a 123 // contract violation if we did it inline and returned true. 124 SCOPE_EXIT { 125 eb->runInEventBaseThread([request = std::move(request)] {}); 126 }; 127 return request->sendStreamReply( 128 std::move(payload), std::move(callback), crc32); 129 } 130 131 #if FOLLY_HAS_COROUTINES 132 virtual void sendSinkReply( 133 ResponsePayload&&, 134 apache::thrift::detail::SinkConsumerImpl&&, 135 folly::Optional<uint32_t> = folly::none) { 136 throw std::logic_error("unimplemented"); 137 } 138 139 FOLLY_NODISCARD static bool sendSinkReply( 140 ResponseChannelRequest::UniquePtr request, 141 folly::EventBase* eb, 142 ResponsePayload&& payload, 143 SinkServerCallbackPtr callback, 144 folly::Optional<uint32_t> crc32 = folly::none) { 145 SCOPE_EXIT { 146 eb->runInEventBaseThread([request = std::move(request)] {}); 147 }; 148 return request->sendSinkReply( 149 std::move(payload), std::move(callback), crc32); 150 } 151 #endif 152 153 virtual void sendException( 154 ResponsePayload&& response, MessageChannel::SendCallback* cb = nullptr) { 155 // Until we start requesting payloads without the envelope we can pass any 156 // sendException calls to sendReply 157 sendReply(std::move(response), cb, folly::none); 158 } 159 160 virtual void sendErrorWrapped( 161 folly::exception_wrapper ex, std::string exCode) = 0; 162 sendQueueTimeoutResponse()163 virtual void sendQueueTimeoutResponse() {} 164 165 virtual ~ResponseChannelRequest() = default; 166 getShouldStartProcessing()167 bool getShouldStartProcessing() { 168 if (!tryStartProcessing()) { 169 return false; 170 } 171 return true; 172 } 173 174 protected: 175 // callTryStartProcessing is a helper method used in ResponseChannelRequest 176 // wrapper subclasses to delegate tryStartProcessing() calls to the wrapped 177 // ResponseChannelRequest. This is necessary due to the protected nature of 178 // tryStartProcessing(). callTryStartProcessing(ResponseChannelRequest * request)179 static bool callTryStartProcessing(ResponseChannelRequest* request) { 180 return request->tryStartProcessing(); 181 } 182 virtual bool tryStartProcessing() = 0; 183 184 FOLLY_NODISCARD virtual bool sendStreamReply( 185 ResponsePayload&&, 186 StreamServerCallbackPtr, 187 folly::Optional<uint32_t> = folly::none) { 188 throw std::logic_error("unimplemented"); 189 } 190 FOLLY_NODISCARD virtual bool sendSinkReply( 191 ResponsePayload&&, 192 SinkServerCallbackPtr, 193 folly::Optional<uint32_t> = folly::none) { 194 throw std::logic_error("unimplemented"); 195 } 196 197 bool startedProcessing_{false}; 198 }; 199 200 /** 201 * ResponseChannel defines an asynchronous API for servers. 202 */ 203 class ResponseChannel : virtual public folly::DelayedDestruction { 204 public: 205 static const uint32_t ONEWAY_REQUEST_ID = 206 std::numeric_limits<uint32_t>::max(); 207 208 class Callback { 209 public: 210 /** 211 * reason is empty if closed due to EOF, or a pointer to an exception 212 * if closed due to some sort of error. 213 */ 214 virtual void channelClosed(folly::exception_wrapper&&) = 0; 215 ~Callback()216 virtual ~Callback() {} 217 }; 218 219 /** 220 * The callback will be invoked on each new request. 221 * It will remain installed until explicitly uninstalled, or until 222 * channelClosed() is called. 223 */ 224 virtual void setCallback(Callback*) = 0; 225 226 protected: ~ResponseChannel()227 ~ResponseChannel() override {} 228 }; 229 230 } // namespace thrift 231 } // namespace apache 232 233 #endif // #ifndef THRIFT_ASYNC_RESPONSECHANNEL_H_ 234