1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_H_ 18 #define THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_H_ 1 19 20 #include <folly/io/async/AsyncTransport.h> 21 #include <thrift/lib/cpp/async/TStreamAsyncChannel.h> 22 23 namespace apache { 24 namespace thrift { 25 namespace async { 26 27 namespace detail { 28 29 /** 30 * Encapsulation of one outstanding write request on a TUnframedAsyncChannel. 31 */ 32 class TUnframedACWriteRequest 33 : public TAsyncChannelWriteRequestBase<TUnframedACWriteRequest> { 34 public: 35 typedef std::function<void()> VoidCallback; 36 37 TUnframedACWriteRequest( 38 const VoidCallback& callback, 39 const VoidCallback& errorCallback, 40 transport::TMemoryBuffer* message, 41 TAsyncEventChannel* channel); 42 43 void write( 44 folly::AsyncTransport* transport, 45 folly::AsyncTransport::WriteCallback* callback) noexcept; 46 47 void writeSuccess() noexcept; 48 void writeError( 49 size_t bytesWritten, const transport::TTransportException& ex) noexcept; 50 }; 51 52 /** 53 * Read state for TUnframedAsyncChannel 54 */ 55 template <typename ProtocolTraits_> 56 class TUnframedACReadState { 57 public: 58 typedef std::function<void()> VoidCallback; 59 typedef ProtocolTraits_ ProtocolTraits; 60 61 TUnframedACReadState(); 62 ~TUnframedACReadState(); 63 64 // Methods required by TStreamAsyncChannel 65 setCallbackBuffer(transport::TMemoryBuffer * buffer)66 void setCallbackBuffer(transport::TMemoryBuffer* buffer) { 67 callbackBuffer_ = buffer; 68 } unsetCallbackBuffer()69 void unsetCallbackBuffer() { callbackBuffer_ = nullptr; } 70 hasReadAheadData()71 bool hasReadAheadData() { return (memBuffer_.available_read() > 0); } hasPartialMessage()72 bool hasPartialMessage() { return (memBuffer_.available_read() > 0); } 73 74 void getReadBuffer(void** bufReturn, size_t* lenReturn); 75 bool readDataAvailable(size_t len); 76 77 // Methods specific to TUnframedACReadState 78 setMaxMessageSize(uint32_t size)79 void setMaxMessageSize(uint32_t size) { maxMessageSize_ = size; } 80 getMaxMessageSize()81 uint32_t getMaxMessageSize() const { return maxMessageSize_; } 82 getProtocolTraits()83 ProtocolTraits_* getProtocolTraits() { return &protocolTraits_; } getProtocolTraits()84 const ProtocolTraits_* getProtocolTraits() const { return &protocolTraits_; } 85 86 private: 87 bool getMessageLength( 88 uint8_t* buffer, uint32_t bufferLength, uint32_t* messageLength); 89 90 /// maximum frame size accepted 91 uint32_t maxMessageSize_; 92 93 apache::thrift::transport::TMemoryBuffer memBuffer_; 94 apache::thrift::transport::TMemoryBuffer* callbackBuffer_; 95 ProtocolTraits_ protocolTraits_; 96 }; 97 98 } // namespace detail 99 100 /** 101 * TUnframedAsyncChannel 102 * 103 * This is a TAsyncChannel implementation that reads and writes raw (unframed) 104 * messages. When reading messages, ProtocolTraits_ is used to determine the 105 * end of a message. 106 */ 107 template <typename ProtocolTraits_> 108 class TUnframedAsyncChannel 109 : public TStreamAsyncChannel< 110 apache::thrift::async::detail::TUnframedACWriteRequest, 111 apache::thrift::async::detail::TUnframedACReadState< 112 ProtocolTraits_>> { 113 private: 114 typedef TStreamAsyncChannel< 115 apache::thrift::async::detail::TUnframedACWriteRequest, 116 apache::thrift::async::detail::TUnframedACReadState<ProtocolTraits_>> 117 Parent; 118 typedef TUnframedAsyncChannel<ProtocolTraits_> Self; 119 120 public: TUnframedAsyncChannel(const std::shared_ptr<folly::AsyncTransport> & transport)121 explicit TUnframedAsyncChannel( 122 const std::shared_ptr<folly::AsyncTransport>& transport) 123 : Parent(transport) {} 124 125 /** 126 * Helper function to create a shared_ptr<TUnframedAsyncChannel>. 127 * 128 * This passes in the correct destructor object, since TUnframedAsyncChannel's 129 * destructor is protected and cannot be invoked directly. 130 */ newChannel(const std::shared_ptr<folly::AsyncTransport> & transport)131 static std::shared_ptr<Self> newChannel( 132 const std::shared_ptr<folly::AsyncTransport>& transport) { 133 return std::shared_ptr<Self>( 134 new Self(transport), typename Self::Destructor()); 135 } 136 137 /// size in bytes beyond which we'll reject a given message. setMaxMessageSize(uint32_t size)138 void setMaxMessageSize(uint32_t size) { 139 this->readState_.setMaxMessageSize(size); 140 } 141 getMaxMessageSize()142 uint32_t getMaxMessageSize() const { 143 return this->readState_.getMaxMessageSize(); 144 } 145 146 protected: 147 /** 148 * Protected destructor. 149 * 150 * Users of TUnframedAsyncChannel must never delete it directly. Instead, 151 * invoke destroy(). 152 */ ~TUnframedAsyncChannel()153 ~TUnframedAsyncChannel() override {} 154 }; 155 156 } // namespace async 157 } // namespace thrift 158 } // namespace apache 159 160 #include <thrift/lib/cpp/async/TUnframedAsyncChannel-inl.h> 161 162 #endif // THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_H_ 163