1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef THRIFT_ASYNC_CPP2CONNECTION_H_
18 #define THRIFT_ASYNC_CPP2CONNECTION_H_ 1
19 
20 #include <memory>
21 #include <unordered_set>
22 
23 #include <folly/Optional.h>
24 #include <folly/SocketAddress.h>
25 #include <folly/io/async/AsyncTransport.h>
26 #include <folly/io/async/HHWheelTimer.h>
27 #include <thrift/lib/cpp/TApplicationException.h>
28 #include <thrift/lib/cpp/concurrency/Util.h>
29 #include <thrift/lib/cpp2/GeneratedCodeHelper.h>
30 #include <thrift/lib/cpp2/async/DuplexChannel.h>
31 #include <thrift/lib/cpp2/async/HeaderServerChannel.h>
32 #include <thrift/lib/cpp2/server/Cpp2ConnContext.h>
33 #include <thrift/lib/cpp2/server/Cpp2Worker.h>
34 #include <thrift/lib/cpp2/server/LoggingEvent.h>
35 #include <thrift/lib/cpp2/server/RequestsRegistry.h>
36 #include <thrift/lib/cpp2/server/ThriftServer.h>
37 #include <thrift/lib/cpp2/transport/core/RequestStateMachine.h>
38 #include <thrift/lib/cpp2/transport/rocket/Types.h>
39 #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h>
40 #include <wangle/acceptor/ManagedConnection.h>
41 
42 namespace apache {
43 namespace thrift {
44 
45 constexpr folly::StringPiece kQueueLatencyHeader("queue_latency_us");
46 constexpr folly::StringPiece kProcessLatencyHeader("process_latency_us");
47 
48 /**
49  * Represents a connection that is handled via libevent. This connection
50  * essentially encapsulates a socket that has some associated libevent state.
51  */
52 class Cpp2Connection : public HeaderServerChannel::Callback,
53                        public wangle::ManagedConnection {
54  public:
55   /**
56    * Constructor for Cpp2Connection.
57    *
58    * @param asyncSocket shared pointer to the async socket
59    * @param address the peer address of this connection
60    * @param worker the worker instance that is handling this connection
61    * @param serverChannel server channel to use in duplex mode,
62    *        should be nullptr in normal mode
63    */
64   Cpp2Connection(
65       const std::shared_ptr<folly::AsyncTransport>& transport,
66       const folly::SocketAddress* address,
67       std::shared_ptr<Cpp2Worker> worker,
68       const std::shared_ptr<HeaderServerChannel>& serverChannel = nullptr);
69 
70   /// Destructor -- close down the connection.
71   ~Cpp2Connection() override;
72 
73   // HeaderServerChannel callbacks
74   void requestReceived(
75       std::unique_ptr<HeaderServerChannel::HeaderRequest>&&) override;
76   void channelClosed(folly::exception_wrapper&&) override;
77 
start()78   void start() { channel_->setCallback(this); }
79 
80   void stop();
81 
82   void timeoutExpired() noexcept override;
83 
84   void requestTimeoutExpired();
85 
86   void queueTimeoutExpired();
87 
88   bool pending();
89 
90   // Managed Connection callbacks
describe(std::ostream &)91   void describe(std::ostream&) const override {}
isBusy()92   bool isBusy() const override { return activeRequests_.empty(); }
notifyPendingShutdown()93   void notifyPendingShutdown() override {}
closeWhenIdle()94   void closeWhenIdle() override { stop(); }
95   void dropConnection(const std::string& /* errorMsg */ = "") override {
96     stop();
97   }
dumpConnectionState(uint8_t)98   void dumpConnectionState(uint8_t /* loglevel */) override {}
addConnection(std::shared_ptr<Cpp2Connection> conn)99   void addConnection(std::shared_ptr<Cpp2Connection> conn) { this_ = conn; }
100 
101   typedef apache::thrift::ThriftPresult<true>
102       RocketUpgrade_upgradeToRocket_presult;
103   template <class ProtocolWriter>
upgradeToRocketReply(int32_t protoSeqId)104   ResponsePayload upgradeToRocketReply(int32_t protoSeqId) {
105     folly::IOBufQueue queue;
106     ProtocolWriter w;
107     w.setOutput(&queue);
108     w.writeMessageBegin("upgradeToRocket", MessageType::T_REPLY, protoSeqId);
109     RocketUpgrade_upgradeToRocket_presult result;
110     apache::thrift::detail::serializeResponseBody(&w, &result);
111     w.writeMessageEnd();
112     return ResponsePayload::create(queue.move());
113   }
114 
115  protected:
116   apache::thrift::AsyncProcessorFactory& processorFactory_;
117   Cpp2Worker::PerServiceMetadata& serviceMetadata_;
118   std::unique_ptr<apache::thrift::AsyncProcessor> processor_;
119   std::unique_ptr<DuplexChannel> duplexChannel_;
120   std::shared_ptr<apache::thrift::HeaderServerChannel> channel_;
121 
122   std::shared_ptr<Cpp2Worker> worker_;
getWorker()123   Cpp2Worker* getWorker() { return worker_.get(); }
124   Cpp2ConnContext context_;
125 
126   std::shared_ptr<folly::AsyncTransport> transport_;
127   std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager_;
128 
129   /**
130    * Wrap the request in our own request.  This is done for 2 reasons:
131    * a) To have task timeouts for all requests,
132    * b) To ensure the channel is not destroyed before callback is called
133    */
134   class Cpp2Request final : public ResponseChannelRequest {
135    public:
136     friend class Cpp2Connection;
137 
138     class QueueTimeout : public folly::HHWheelTimer::Callback {
139       Cpp2Request* request_;
140       void timeoutExpired() noexcept override;
141       friend class Cpp2Request;
142     };
143     class TaskTimeout : public folly::HHWheelTimer::Callback {
144       Cpp2Request* request_;
145       void timeoutExpired() noexcept override;
146       friend class Cpp2Request;
147     };
148     friend class QueueTimeout;
149     friend class TaskTimeout;
150 
151     Cpp2Request(
152         RequestsRegistry::DebugStub& debugStubToInit,
153         std::unique_ptr<HeaderServerChannel::HeaderRequest> req,
154         std::shared_ptr<folly::RequestContext> rctx,
155         std::shared_ptr<Cpp2Connection> con,
156         rocket::Payload&& debugPayload,
157         std::string&& methodName);
158 
isActive()159     bool isActive() const final { return stateMachine_.isActive(); }
160 
tryCancel()161     bool tryCancel() {
162       return stateMachine_.tryCancel(connection_->getWorker()->getEventBase());
163     }
164 
isOneway()165     bool isOneway() const override { return req_->isOneway(); }
166 
isStream()167     bool isStream() const override { return req_->isStream(); }
168 
includeEnvelope()169     bool includeEnvelope() const override { return req_->includeEnvelope(); }
170 
171     void sendReply(
172         ResponsePayload&& response,
173         MessageChannel::SendCallback* notUsed = nullptr,
174         folly::Optional<uint32_t> crc32c = folly::none) override;
175     void sendException(
176         ResponsePayload&& response,
177         MessageChannel::SendCallback* notUsed = nullptr) override;
178     void sendErrorWrapped(
179         folly::exception_wrapper ew, std::string exCode) override;
180     void sendQueueTimeoutResponse() override;
181     void sendTimeoutResponse(
182         apache::thrift::HeaderServerChannel::HeaderRequest::TimeoutResponseType
183             responseType);
184 
185     ~Cpp2Request() override;
186 
187     // Cancel request is ususally called from a different thread than sendReply.
188     virtual void cancelRequest();
189 
getContext()190     Cpp2RequestContext* getContext() { return &reqContext_; }
191 
getTimestamps()192     server::TServerObserver::CallTimestamps& getTimestamps() {
193       return static_cast<server::TServerObserver::CallTimestamps&>(
194           reqContext_.getTimestamps());
195     }
196 
197    protected:
tryStartProcessing()198     bool tryStartProcessing() final {
199       return stateMachine_.tryStartProcessing();
200     }
201 
202    private:
203     MessageChannel::SendCallback* prepareSendCallback(
204         MessageChannel::SendCallback* sendCallback,
205         apache::thrift::server::TServerObserver* observer);
206 
207     std::unique_ptr<HeaderServerChannel::HeaderRequest> req_;
208 
209     // The order of these two fields matters; to save a shared_ptr operation, we
210     // move into connection_ first and then use the pointer in connection_ to
211     // initialize reqContext_; since field initialization happens in order of
212     // definition, connection_ needs to appear before reqContext_.
213     std::shared_ptr<Cpp2Connection> connection_;
214     Cpp2RequestContext reqContext_;
215 
216     QueueTimeout queueTimeout_;
217     TaskTimeout taskTimeout_;
218 
219     RequestStateMachine stateMachine_;
220 
221     Cpp2Worker::ActiveRequestsGuard activeRequestsGuard_;
222 
cancelTimeout()223     void cancelTimeout() {
224       queueTimeout_.cancelTimeout();
225       taskTimeout_.cancelTimeout();
226     }
227     void markProcessEnd(
228         transport::THeader::StringToStringMap* newHeaders = nullptr);
229     void setLatencyHeaders(
230         const apache::thrift::server::TServerObserver::CallTimestamps&,
231         transport::THeader::StringToStringMap* newHeaders = nullptr) const;
232     void setLatencyHeader(
233         const std::string& key,
234         const std::string& value,
235         transport::THeader::StringToStringMap* newHeaders = nullptr) const;
236   };
237 
238   class Cpp2Sample : public MessageChannel::SendCallback {
239    public:
240     Cpp2Sample(
241         apache::thrift::server::TServerObserver::CallTimestamps& timestamps,
242         apache::thrift::server::TServerObserver* observer,
243         MessageChannel::SendCallback* chainedCallback = nullptr);
244 
245     void sendQueued() override;
246     void messageSent() override;
247     void messageSendError(folly::exception_wrapper&& e) override;
248     ~Cpp2Sample() override;
249 
250    private:
251     apache::thrift::server::TServerObserver::CallTimestamps timestamps_;
252     apache::thrift::server::TServerObserver* observer_;
253     MessageChannel::SendCallback* chainedCallback_;
254   };
255 
256   folly::once_flag setupLoggingFlag_;
257   folly::once_flag clientInfoFlag_;
258 
259   std::unordered_set<Cpp2Request*> activeRequests_;
260 
261   void removeRequest(Cpp2Request* req);
262   void handleAppError(
263       std::unique_ptr<HeaderServerChannel::HeaderRequest> req,
264       const std::string& name,
265       const std::string& message,
266       bool isClientError);
267   void killRequest(
268       std::unique_ptr<HeaderServerChannel::HeaderRequest> req,
269       TApplicationException::TApplicationExceptionType reason,
270       const std::string& errorCode,
271       const char* comment);
272   void disconnect(const char* comment) noexcept;
273 
274   void setServerHeaders(transport::THeader::StringToStringMap& writeHeaders);
275   void setServerHeaders(HeaderServerChannel::HeaderRequest& request);
276 
277   friend class Cpp2Request;
278 
279   std::shared_ptr<Cpp2Connection> this_;
280 };
281 
282 } // namespace thrift
283 } // namespace apache
284 
285 #endif // #ifndef THRIFT_ASYNC_CPP2CONNECTION_H_
286