1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <list>
#include <system_error>
#include <utility>

#include <folly/io/IOBufQueue.h>
#include <folly/io/async/AsyncTransport.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventHandler.h>
26 
27 namespace folly {
28 
29 class AsyncSocketException;
30 
31 /**
32  * Read from a pipe in an async manner.
33  */
34 class AsyncPipeReader : public EventHandler,
35                         public AsyncReader,
36                         public DelayedDestruction {
37  public:
38   using UniquePtr = folly::DelayedDestructionUniquePtr<AsyncPipeReader>;
39 
newReader(folly::EventBase * eventBase,NetworkSocket pipeFd)40   static UniquePtr newReader(
41       folly::EventBase* eventBase, NetworkSocket pipeFd) {
42     return UniquePtr(new AsyncPipeReader(eventBase, pipeFd));
43   }
44 
AsyncPipeReader(folly::EventBase * eventBase,NetworkSocket pipeFd)45   AsyncPipeReader(folly::EventBase* eventBase, NetworkSocket pipeFd)
46       : EventHandler(eventBase, pipeFd), fd_(pipeFd) {}
47 
48   /**
49    * Set the read callback and automatically install/uninstall the handler
50    * for events.
51    */
setReadCB(AsyncReader::ReadCallback * callback)52   void setReadCB(AsyncReader::ReadCallback* callback) override {
53     if (callback == readCallback_) {
54       return;
55     }
56     readCallback_ = callback;
57     if (readCallback_ && !isHandlerRegistered()) {
58       registerHandler(EventHandler::READ | EventHandler::PERSIST);
59     } else if (!readCallback_ && isHandlerRegistered()) {
60       unregisterHandler();
61     }
62   }
63 
64   /**
65    * Get the read callback
66    */
getReadCallback()67   AsyncReader::ReadCallback* getReadCallback() const override {
68     return readCallback_;
69   }
70 
71   /**
72    * Set a special hook to close the socket (otherwise, will call close())
73    */
setCloseCallback(std::function<void (NetworkSocket)> closeCb)74   void setCloseCallback(std::function<void(NetworkSocket)> closeCb) {
75     closeCb_ = closeCb;
76   }
77 
78  private:
79   ~AsyncPipeReader() override;
80 
81   void handlerReady(uint16_t events) noexcept override;
82   void failRead(const AsyncSocketException& ex);
83   void close();
84 
85   NetworkSocket fd_;
86   AsyncReader::ReadCallback* readCallback_{nullptr};
87   std::function<void(NetworkSocket)> closeCb_;
88 };
89 
90 /**
91  * Write to a pipe in an async manner.
92  */
93 class AsyncPipeWriter : public EventHandler,
94                         public AsyncWriter,
95                         public DelayedDestruction {
96  public:
97   using UniquePtr = folly::DelayedDestructionUniquePtr<AsyncPipeWriter>;
98 
newWriter(folly::EventBase * eventBase,NetworkSocket pipeFd)99   static UniquePtr newWriter(
100       folly::EventBase* eventBase, NetworkSocket pipeFd) {
101     return UniquePtr(new AsyncPipeWriter(eventBase, pipeFd));
102   }
103 
AsyncPipeWriter(folly::EventBase * eventBase,NetworkSocket pipeFd)104   AsyncPipeWriter(folly::EventBase* eventBase, NetworkSocket pipeFd)
105       : EventHandler(eventBase, pipeFd), fd_(pipeFd) {}
106 
107   /**
108    * Asynchronously write the given iobuf to this pipe, and invoke the callback
109    * on success/error.
110    */
111   void write(
112       std::unique_ptr<folly::IOBuf> buf,
113       AsyncWriter::WriteCallback* callback = nullptr);
114 
115   /**
116    * Set a special hook to close the socket (otherwise, will call close())
117    */
setCloseCallback(std::function<void (NetworkSocket)> closeCb)118   void setCloseCallback(std::function<void(NetworkSocket)> closeCb) {
119     closeCb_ = closeCb;
120   }
121 
122   /**
123    * Returns true if the pipe is closed
124    */
closed()125   bool closed() const { return (fd_ == NetworkSocket() || closeOnEmpty_); }
126 
127   /**
128    * Notify the pipe to close as soon as all pending writes complete
129    */
130   void closeOnEmpty();
131 
132   /**
133    * Close the pipe immediately, and fail all pending writes
134    */
135   void closeNow();
136 
137   /**
138    * Return true if there are currently writes pending (eg: the pipe is blocked
139    * for writing)
140    */
hasPendingWrites()141   bool hasPendingWrites() const { return !queue_.empty(); }
142 
143   // AsyncWriter methods
144   void write(
145       folly::AsyncWriter::WriteCallback* callback,
146       const void* buf,
147       size_t bytes,
148       WriteFlags flags = WriteFlags::NONE) override {
149     writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
150   }
151   void writev(
152       folly::AsyncWriter::WriteCallback*,
153       const iovec*,
154       size_t,
155       WriteFlags = WriteFlags::NONE) override {
156     throw std::runtime_error("writev is not supported. Please use writeChain.");
157   }
158   void writeChain(
159       folly::AsyncWriter::WriteCallback* callback,
160       std::unique_ptr<folly::IOBuf>&& buf,
161       WriteFlags flags = WriteFlags::NONE) override;
162 
163  private:
164   void handlerReady(uint16_t events) noexcept override;
165   void handleWrite();
166   void failAllWrites(const AsyncSocketException& ex);
167 
168   NetworkSocket fd_;
169   std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
170   bool closeOnEmpty_{false};
171   std::function<void(NetworkSocket)> closeCb_;
172 
~AsyncPipeWriter()173   ~AsyncPipeWriter() override { closeNow(); }
174 };
175 
176 } // namespace folly
177