1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_H_
18 #define THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_H_ 1
19 
20 #include <folly/io/async/AsyncTransport.h>
21 #include <thrift/lib/cpp/async/TStreamAsyncChannel.h>
22 
23 namespace apache {
24 namespace thrift {
25 namespace async {
26 
27 namespace detail {
28 
29 /**
30  * Encapsulation of one outstanding write request on a TUnframedAsyncChannel.
31  */
32 class TUnframedACWriteRequest
33     : public TAsyncChannelWriteRequestBase<TUnframedACWriteRequest> {
34  public:
35   typedef std::function<void()> VoidCallback;
36 
37   TUnframedACWriteRequest(
38       const VoidCallback& callback,
39       const VoidCallback& errorCallback,
40       transport::TMemoryBuffer* message,
41       TAsyncEventChannel* channel);
42 
43   void write(
44       folly::AsyncTransport* transport,
45       folly::AsyncTransport::WriteCallback* callback) noexcept;
46 
47   void writeSuccess() noexcept;
48   void writeError(
49       size_t bytesWritten, const transport::TTransportException& ex) noexcept;
50 };
51 
52 /**
53  * Read state for TUnframedAsyncChannel
54  */
55 template <typename ProtocolTraits_>
56 class TUnframedACReadState {
57  public:
58   typedef std::function<void()> VoidCallback;
59   typedef ProtocolTraits_ ProtocolTraits;
60 
61   TUnframedACReadState();
62   ~TUnframedACReadState();
63 
64   // Methods required by TStreamAsyncChannel
65 
setCallbackBuffer(transport::TMemoryBuffer * buffer)66   void setCallbackBuffer(transport::TMemoryBuffer* buffer) {
67     callbackBuffer_ = buffer;
68   }
unsetCallbackBuffer()69   void unsetCallbackBuffer() { callbackBuffer_ = nullptr; }
70 
hasReadAheadData()71   bool hasReadAheadData() { return (memBuffer_.available_read() > 0); }
hasPartialMessage()72   bool hasPartialMessage() { return (memBuffer_.available_read() > 0); }
73 
74   void getReadBuffer(void** bufReturn, size_t* lenReturn);
75   bool readDataAvailable(size_t len);
76 
77   // Methods specific to TUnframedACReadState
78 
setMaxMessageSize(uint32_t size)79   void setMaxMessageSize(uint32_t size) { maxMessageSize_ = size; }
80 
getMaxMessageSize()81   uint32_t getMaxMessageSize() const { return maxMessageSize_; }
82 
getProtocolTraits()83   ProtocolTraits_* getProtocolTraits() { return &protocolTraits_; }
getProtocolTraits()84   const ProtocolTraits_* getProtocolTraits() const { return &protocolTraits_; }
85 
86  private:
87   bool getMessageLength(
88       uint8_t* buffer, uint32_t bufferLength, uint32_t* messageLength);
89 
90   /// maximum frame size accepted
91   uint32_t maxMessageSize_;
92 
93   apache::thrift::transport::TMemoryBuffer memBuffer_;
94   apache::thrift::transport::TMemoryBuffer* callbackBuffer_;
95   ProtocolTraits_ protocolTraits_;
96 };
97 
98 } // namespace detail
99 
100 /**
101  * TUnframedAsyncChannel
102  *
103  * This is a TAsyncChannel implementation that reads and writes raw (unframed)
104  * messages.  When reading messages, ProtocolTraits_ is used to determine the
105  * end of a message.
106  */
107 template <typename ProtocolTraits_>
108 class TUnframedAsyncChannel
109     : public TStreamAsyncChannel<
110           apache::thrift::async::detail::TUnframedACWriteRequest,
111           apache::thrift::async::detail::TUnframedACReadState<
112               ProtocolTraits_>> {
113  private:
114   typedef TStreamAsyncChannel<
115       apache::thrift::async::detail::TUnframedACWriteRequest,
116       apache::thrift::async::detail::TUnframedACReadState<ProtocolTraits_>>
117       Parent;
118   typedef TUnframedAsyncChannel<ProtocolTraits_> Self;
119 
120  public:
TUnframedAsyncChannel(const std::shared_ptr<folly::AsyncTransport> & transport)121   explicit TUnframedAsyncChannel(
122       const std::shared_ptr<folly::AsyncTransport>& transport)
123       : Parent(transport) {}
124 
125   /**
126    * Helper function to create a shared_ptr<TUnframedAsyncChannel>.
127    *
128    * This passes in the correct destructor object, since TUnframedAsyncChannel's
129    * destructor is protected and cannot be invoked directly.
130    */
newChannel(const std::shared_ptr<folly::AsyncTransport> & transport)131   static std::shared_ptr<Self> newChannel(
132       const std::shared_ptr<folly::AsyncTransport>& transport) {
133     return std::shared_ptr<Self>(
134         new Self(transport), typename Self::Destructor());
135   }
136 
137   /// size in bytes beyond which we'll reject a given message.
setMaxMessageSize(uint32_t size)138   void setMaxMessageSize(uint32_t size) {
139     this->readState_.setMaxMessageSize(size);
140   }
141 
getMaxMessageSize()142   uint32_t getMaxMessageSize() const {
143     return this->readState_.getMaxMessageSize();
144   }
145 
146  protected:
147   /**
148    * Protected destructor.
149    *
150    * Users of TUnframedAsyncChannel must never delete it directly.  Instead,
151    * invoke destroy().
152    */
~TUnframedAsyncChannel()153   ~TUnframedAsyncChannel() override {}
154 };
155 
156 } // namespace async
157 } // namespace thrift
158 } // namespace apache
159 
160 #include <thrift/lib/cpp/async/TUnframedAsyncChannel-inl.h>
161 
162 #endif // THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_H_
163