1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <list> 20 #include <system_error> 21 22 #include <folly/io/IOBufQueue.h> 23 #include <folly/io/async/AsyncTransport.h> 24 #include <folly/io/async/DelayedDestruction.h> 25 #include <folly/io/async/EventHandler.h> 26 27 namespace folly { 28 29 class AsyncSocketException; 30 31 /** 32 * Read from a pipe in an async manner. 33 */ 34 class AsyncPipeReader : public EventHandler, 35 public AsyncReader, 36 public DelayedDestruction { 37 public: 38 using UniquePtr = folly::DelayedDestructionUniquePtr<AsyncPipeReader>; 39 newReader(folly::EventBase * eventBase,NetworkSocket pipeFd)40 static UniquePtr newReader( 41 folly::EventBase* eventBase, NetworkSocket pipeFd) { 42 return UniquePtr(new AsyncPipeReader(eventBase, pipeFd)); 43 } 44 AsyncPipeReader(folly::EventBase * eventBase,NetworkSocket pipeFd)45 AsyncPipeReader(folly::EventBase* eventBase, NetworkSocket pipeFd) 46 : EventHandler(eventBase, pipeFd), fd_(pipeFd) {} 47 48 /** 49 * Set the read callback and automatically install/uninstall the handler 50 * for events. 51 */ setReadCB(AsyncReader::ReadCallback * callback)52 void setReadCB(AsyncReader::ReadCallback* callback) override { 53 if (callback == readCallback_) { 54 return; 55 } 56 readCallback_ = callback; 57 if (readCallback_ && !isHandlerRegistered()) { 58 registerHandler(EventHandler::READ | EventHandler::PERSIST); 59 } else if (!readCallback_ && isHandlerRegistered()) { 60 unregisterHandler(); 61 } 62 } 63 64 /** 65 * Get the read callback 66 */ getReadCallback()67 AsyncReader::ReadCallback* getReadCallback() const override { 68 return readCallback_; 69 } 70 71 /** 72 * Set a special hook to close the socket (otherwise, will call close()) 73 */ setCloseCallback(std::function<void (NetworkSocket)> closeCb)74 void setCloseCallback(std::function<void(NetworkSocket)> closeCb) { 75 closeCb_ = closeCb; 76 } 77 78 private: 79 ~AsyncPipeReader() override; 80 81 void handlerReady(uint16_t events) noexcept override; 82 void failRead(const AsyncSocketException& ex); 83 void close(); 84 85 NetworkSocket fd_; 86 AsyncReader::ReadCallback* readCallback_{nullptr}; 87 std::function<void(NetworkSocket)> closeCb_; 88 }; 89 90 /** 91 * Write to a pipe in an async manner. 92 */ 93 class AsyncPipeWriter : public EventHandler, 94 public AsyncWriter, 95 public DelayedDestruction { 96 public: 97 using UniquePtr = folly::DelayedDestructionUniquePtr<AsyncPipeWriter>; 98 newWriter(folly::EventBase * eventBase,NetworkSocket pipeFd)99 static UniquePtr newWriter( 100 folly::EventBase* eventBase, NetworkSocket pipeFd) { 101 return UniquePtr(new AsyncPipeWriter(eventBase, pipeFd)); 102 } 103 AsyncPipeWriter(folly::EventBase * eventBase,NetworkSocket pipeFd)104 AsyncPipeWriter(folly::EventBase* eventBase, NetworkSocket pipeFd) 105 : EventHandler(eventBase, pipeFd), fd_(pipeFd) {} 106 107 /** 108 * Asynchronously write the given iobuf to this pipe, and invoke the callback 109 * on success/error. 110 */ 111 void write( 112 std::unique_ptr<folly::IOBuf> buf, 113 AsyncWriter::WriteCallback* callback = nullptr); 114 115 /** 116 * Set a special hook to close the socket (otherwise, will call close()) 117 */ setCloseCallback(std::function<void (NetworkSocket)> closeCb)118 void setCloseCallback(std::function<void(NetworkSocket)> closeCb) { 119 closeCb_ = closeCb; 120 } 121 122 /** 123 * Returns true if the pipe is closed 124 */ closed()125 bool closed() const { return (fd_ == NetworkSocket() || closeOnEmpty_); } 126 127 /** 128 * Notify the pipe to close as soon as all pending writes complete 129 */ 130 void closeOnEmpty(); 131 132 /** 133 * Close the pipe immediately, and fail all pending writes 134 */ 135 void closeNow(); 136 137 /** 138 * Return true if there are currently writes pending (eg: the pipe is blocked 139 * for writing) 140 */ hasPendingWrites()141 bool hasPendingWrites() const { return !queue_.empty(); } 142 143 // AsyncWriter methods 144 void write( 145 folly::AsyncWriter::WriteCallback* callback, 146 const void* buf, 147 size_t bytes, 148 WriteFlags flags = WriteFlags::NONE) override { 149 writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags); 150 } 151 void writev( 152 folly::AsyncWriter::WriteCallback*, 153 const iovec*, 154 size_t, 155 WriteFlags = WriteFlags::NONE) override { 156 throw std::runtime_error("writev is not supported. Please use writeChain."); 157 } 158 void writeChain( 159 folly::AsyncWriter::WriteCallback* callback, 160 std::unique_ptr<folly::IOBuf>&& buf, 161 WriteFlags flags = WriteFlags::NONE) override; 162 163 private: 164 void handlerReady(uint16_t events) noexcept override; 165 void handleWrite(); 166 void failAllWrites(const AsyncSocketException& ex); 167 168 NetworkSocket fd_; 169 std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_; 170 bool closeOnEmpty_{false}; 171 std::function<void(NetworkSocket)> closeCb_; 172 ~AsyncPipeWriter()173 ~AsyncPipeWriter() override { closeNow(); } 174 }; 175 176 } // namespace folly 177