1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef THRIFT_ASYNC_CPP2CONNECTION_H_ 18 #define THRIFT_ASYNC_CPP2CONNECTION_H_ 1 19 20 #include <memory> 21 #include <unordered_set> 22 23 #include <folly/Optional.h> 24 #include <folly/SocketAddress.h> 25 #include <folly/io/async/AsyncTransport.h> 26 #include <folly/io/async/HHWheelTimer.h> 27 #include <thrift/lib/cpp/TApplicationException.h> 28 #include <thrift/lib/cpp/concurrency/Util.h> 29 #include <thrift/lib/cpp2/GeneratedCodeHelper.h> 30 #include <thrift/lib/cpp2/async/DuplexChannel.h> 31 #include <thrift/lib/cpp2/async/HeaderServerChannel.h> 32 #include <thrift/lib/cpp2/server/Cpp2ConnContext.h> 33 #include <thrift/lib/cpp2/server/Cpp2Worker.h> 34 #include <thrift/lib/cpp2/server/LoggingEvent.h> 35 #include <thrift/lib/cpp2/server/RequestsRegistry.h> 36 #include <thrift/lib/cpp2/server/ThriftServer.h> 37 #include <thrift/lib/cpp2/transport/core/RequestStateMachine.h> 38 #include <thrift/lib/cpp2/transport/rocket/Types.h> 39 #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h> 40 #include <wangle/acceptor/ManagedConnection.h> 41 42 namespace apache { 43 namespace thrift { 44 45 constexpr folly::StringPiece kQueueLatencyHeader("queue_latency_us"); 46 constexpr folly::StringPiece kProcessLatencyHeader("process_latency_us"); 47 48 /** 49 * Represents a connection that is handled via libevent. This connection 50 * essentially encapsulates a socket that has some associated libevent state. 51 */ 52 class Cpp2Connection : public HeaderServerChannel::Callback, 53 public wangle::ManagedConnection { 54 public: 55 /** 56 * Constructor for Cpp2Connection. 57 * 58 * @param asyncSocket shared pointer to the async socket 59 * @param address the peer address of this connection 60 * @param worker the worker instance that is handling this connection 61 * @param serverChannel server channel to use in duplex mode, 62 * should be nullptr in normal mode 63 */ 64 Cpp2Connection( 65 const std::shared_ptr<folly::AsyncTransport>& transport, 66 const folly::SocketAddress* address, 67 std::shared_ptr<Cpp2Worker> worker, 68 const std::shared_ptr<HeaderServerChannel>& serverChannel = nullptr); 69 70 /// Destructor -- close down the connection. 71 ~Cpp2Connection() override; 72 73 // HeaderServerChannel callbacks 74 void requestReceived( 75 std::unique_ptr<HeaderServerChannel::HeaderRequest>&&) override; 76 void channelClosed(folly::exception_wrapper&&) override; 77 start()78 void start() { channel_->setCallback(this); } 79 80 void stop(); 81 82 void timeoutExpired() noexcept override; 83 84 void requestTimeoutExpired(); 85 86 void queueTimeoutExpired(); 87 88 bool pending(); 89 90 // Managed Connection callbacks describe(std::ostream &)91 void describe(std::ostream&) const override {} isBusy()92 bool isBusy() const override { return activeRequests_.empty(); } notifyPendingShutdown()93 void notifyPendingShutdown() override {} closeWhenIdle()94 void closeWhenIdle() override { stop(); } 95 void dropConnection(const std::string& /* errorMsg */ = "") override { 96 stop(); 97 } dumpConnectionState(uint8_t)98 void dumpConnectionState(uint8_t /* loglevel */) override {} addConnection(std::shared_ptr<Cpp2Connection> conn)99 void addConnection(std::shared_ptr<Cpp2Connection> conn) { this_ = conn; } 100 101 typedef apache::thrift::ThriftPresult<true> 102 RocketUpgrade_upgradeToRocket_presult; 103 template <class ProtocolWriter> upgradeToRocketReply(int32_t protoSeqId)104 ResponsePayload upgradeToRocketReply(int32_t protoSeqId) { 105 folly::IOBufQueue queue; 106 ProtocolWriter w; 107 w.setOutput(&queue); 108 w.writeMessageBegin("upgradeToRocket", MessageType::T_REPLY, protoSeqId); 109 RocketUpgrade_upgradeToRocket_presult result; 110 apache::thrift::detail::serializeResponseBody(&w, &result); 111 w.writeMessageEnd(); 112 return ResponsePayload::create(queue.move()); 113 } 114 115 protected: 116 apache::thrift::AsyncProcessorFactory& processorFactory_; 117 Cpp2Worker::PerServiceMetadata& serviceMetadata_; 118 std::unique_ptr<apache::thrift::AsyncProcessor> processor_; 119 std::unique_ptr<DuplexChannel> duplexChannel_; 120 std::shared_ptr<apache::thrift::HeaderServerChannel> channel_; 121 122 std::shared_ptr<Cpp2Worker> worker_; getWorker()123 Cpp2Worker* getWorker() { return worker_.get(); } 124 Cpp2ConnContext context_; 125 126 std::shared_ptr<folly::AsyncTransport> transport_; 127 std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager_; 128 129 /** 130 * Wrap the request in our own request. This is done for 2 reasons: 131 * a) To have task timeouts for all requests, 132 * b) To ensure the channel is not destroyed before callback is called 133 */ 134 class Cpp2Request final : public ResponseChannelRequest { 135 public: 136 friend class Cpp2Connection; 137 138 class QueueTimeout : public folly::HHWheelTimer::Callback { 139 Cpp2Request* request_; 140 void timeoutExpired() noexcept override; 141 friend class Cpp2Request; 142 }; 143 class TaskTimeout : public folly::HHWheelTimer::Callback { 144 Cpp2Request* request_; 145 void timeoutExpired() noexcept override; 146 friend class Cpp2Request; 147 }; 148 friend class QueueTimeout; 149 friend class TaskTimeout; 150 151 Cpp2Request( 152 RequestsRegistry::DebugStub& debugStubToInit, 153 std::unique_ptr<HeaderServerChannel::HeaderRequest> req, 154 std::shared_ptr<folly::RequestContext> rctx, 155 std::shared_ptr<Cpp2Connection> con, 156 rocket::Payload&& debugPayload, 157 std::string&& methodName); 158 isActive()159 bool isActive() const final { return stateMachine_.isActive(); } 160 tryCancel()161 bool tryCancel() { 162 return stateMachine_.tryCancel(connection_->getWorker()->getEventBase()); 163 } 164 isOneway()165 bool isOneway() const override { return req_->isOneway(); } 166 isStream()167 bool isStream() const override { return req_->isStream(); } 168 includeEnvelope()169 bool includeEnvelope() const override { return req_->includeEnvelope(); } 170 171 void sendReply( 172 ResponsePayload&& response, 173 MessageChannel::SendCallback* notUsed = nullptr, 174 folly::Optional<uint32_t> crc32c = folly::none) override; 175 void sendException( 176 ResponsePayload&& response, 177 MessageChannel::SendCallback* notUsed = nullptr) override; 178 void sendErrorWrapped( 179 folly::exception_wrapper ew, std::string exCode) override; 180 void sendQueueTimeoutResponse() override; 181 void sendTimeoutResponse( 182 apache::thrift::HeaderServerChannel::HeaderRequest::TimeoutResponseType 183 responseType); 184 185 ~Cpp2Request() override; 186 187 // Cancel request is ususally called from a different thread than sendReply. 188 virtual void cancelRequest(); 189 getContext()190 Cpp2RequestContext* getContext() { return &reqContext_; } 191 getTimestamps()192 server::TServerObserver::CallTimestamps& getTimestamps() { 193 return static_cast<server::TServerObserver::CallTimestamps&>( 194 reqContext_.getTimestamps()); 195 } 196 197 protected: tryStartProcessing()198 bool tryStartProcessing() final { 199 return stateMachine_.tryStartProcessing(); 200 } 201 202 private: 203 MessageChannel::SendCallback* prepareSendCallback( 204 MessageChannel::SendCallback* sendCallback, 205 apache::thrift::server::TServerObserver* observer); 206 207 std::unique_ptr<HeaderServerChannel::HeaderRequest> req_; 208 209 // The order of these two fields matters; to save a shared_ptr operation, we 210 // move into connection_ first and then use the pointer in connection_ to 211 // initialize reqContext_; since field initialization happens in order of 212 // definition, connection_ needs to appear before reqContext_. 213 std::shared_ptr<Cpp2Connection> connection_; 214 Cpp2RequestContext reqContext_; 215 216 QueueTimeout queueTimeout_; 217 TaskTimeout taskTimeout_; 218 219 RequestStateMachine stateMachine_; 220 221 Cpp2Worker::ActiveRequestsGuard activeRequestsGuard_; 222 cancelTimeout()223 void cancelTimeout() { 224 queueTimeout_.cancelTimeout(); 225 taskTimeout_.cancelTimeout(); 226 } 227 void markProcessEnd( 228 transport::THeader::StringToStringMap* newHeaders = nullptr); 229 void setLatencyHeaders( 230 const apache::thrift::server::TServerObserver::CallTimestamps&, 231 transport::THeader::StringToStringMap* newHeaders = nullptr) const; 232 void setLatencyHeader( 233 const std::string& key, 234 const std::string& value, 235 transport::THeader::StringToStringMap* newHeaders = nullptr) const; 236 }; 237 238 class Cpp2Sample : public MessageChannel::SendCallback { 239 public: 240 Cpp2Sample( 241 apache::thrift::server::TServerObserver::CallTimestamps& timestamps, 242 apache::thrift::server::TServerObserver* observer, 243 MessageChannel::SendCallback* chainedCallback = nullptr); 244 245 void sendQueued() override; 246 void messageSent() override; 247 void messageSendError(folly::exception_wrapper&& e) override; 248 ~Cpp2Sample() override; 249 250 private: 251 apache::thrift::server::TServerObserver::CallTimestamps timestamps_; 252 apache::thrift::server::TServerObserver* observer_; 253 MessageChannel::SendCallback* chainedCallback_; 254 }; 255 256 folly::once_flag setupLoggingFlag_; 257 folly::once_flag clientInfoFlag_; 258 259 std::unordered_set<Cpp2Request*> activeRequests_; 260 261 void removeRequest(Cpp2Request* req); 262 void handleAppError( 263 std::unique_ptr<HeaderServerChannel::HeaderRequest> req, 264 const std::string& name, 265 const std::string& message, 266 bool isClientError); 267 void killRequest( 268 std::unique_ptr<HeaderServerChannel::HeaderRequest> req, 269 TApplicationException::TApplicationExceptionType reason, 270 const std::string& errorCode, 271 const char* comment); 272 void disconnect(const char* comment) noexcept; 273 274 void setServerHeaders(transport::THeader::StringToStringMap& writeHeaders); 275 void setServerHeaders(HeaderServerChannel::HeaderRequest& request); 276 277 friend class Cpp2Request; 278 279 std::shared_ptr<Cpp2Connection> this_; 280 }; 281 282 } // namespace thrift 283 } // namespace apache 284 285 #endif // #ifndef THRIFT_ASYNC_CPP2CONNECTION_H_ 286