1 #pragma once
2 
3 
4 #include <algorithm>
5 #include <iterator>
6 #include <cstddef>
7 #include <utility>
8 #include <memory>
9 #include <uv.h>
10 #include "request.hpp"
11 #include "handle.hpp"
12 #include "loop.hpp"
13 
14 
15 namespace uvw {
16 
17 
18 /**
19  * @brief ConnectEvent event.
20  *
21  * It will be emitted by StreamHandle according with its functionalities.
22  */
23 struct ConnectEvent {};
24 
25 
26 /**
27  * @brief EndEvent event.
28  *
29  * It will be emitted by StreamHandle according with its functionalities.
30  */
31 struct EndEvent {};
32 
33 
34 /**
35  * @brief ListenEvent event.
36  *
37  * It will be emitted by StreamHandle according with its functionalities.
38  */
39 struct ListenEvent {};
40 
41 
42 /**
43  * @brief ShutdownEvent event.
44  *
45  * It will be emitted by StreamHandle according with its functionalities.
46  */
47 struct ShutdownEvent {};
48 
49 
50 /**
51  * @brief WriteEvent event.
52  *
53  * It will be emitted by StreamHandle according with its functionalities.
54  */
55 struct WriteEvent {};
56 
57 
58 /**
59  * @brief DataEvent event.
60  *
61  * It will be emitted by StreamHandle according with its functionalities.
62  */
63 struct DataEvent {
DataEventuvw::DataEvent64     explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept
65         : data{std::move(buf)}, length{len}
66     {}
67 
68     std::unique_ptr<char[]> data; /*!< A bunch of data read on the stream. */
69     std::size_t length; /*!< The amount of data read on the stream. */
70 };
71 
72 
73 namespace details {
74 
75 
76 struct ConnectReq final: public Request<ConnectReq, uv_connect_t> {
77     using Request::Request;
78 
79     template<typename F, typename... Args>
connectuvw::details::ConnectReq80     void connect(F &&f, Args&&... args) {
81         invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
82     }
83 };
84 
85 
86 struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
87     using Request::Request;
88 
shutdownuvw::details::ShutdownReq89     void shutdown(uv_stream_t *handle) {
90         invoke(&uv_shutdown, get(), handle, &defaultCallback<ShutdownEvent>);
91     }
92 };
93 
94 
95 class WriteReq final: public Request<WriteReq, uv_write_t> {
96 public:
97     using Deleter = void(*)(char *);
98 
WriteReq(ConstructorAccess ca,std::shared_ptr<Loop> loop,std::unique_ptr<char[],Deleter> dt,unsigned int len)99     WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len)
100         : Request<WriteReq, uv_write_t>{ca, std::move(loop)},
101           data{std::move(dt)},
102           buf{uv_buf_init(data.get(), len)}
103     {}
104 
write(uv_stream_t * handle)105     void write(uv_stream_t *handle) {
106         invoke(&uv_write, get(), handle, &buf, 1, &defaultCallback<WriteEvent>);
107     }
108 
write(uv_stream_t * handle,uv_stream_t * send)109     void write(uv_stream_t *handle, uv_stream_t *send) {
110         invoke(&uv_write2, get(), handle, &buf, 1, send, &defaultCallback<WriteEvent>);
111     }
112 
113 private:
114     std::unique_ptr<char[], Deleter> data;
115     uv_buf_t buf;
116 };
117 
118 
119 }
120 
121 
122 /**
123  * @brief The StreamHandle handle.
124  *
125  * Stream handles provide an abstraction of a duplex communication channel.
126  * StreamHandle is an intermediate type, `uvw` provides three stream
127  * implementations: TcpHandle, PipeHandle and TTYHandle.
128  */
129 template<typename T, typename U>
130 class StreamHandle: public Handle<T, U> {
131     static constexpr unsigned int DEFAULT_BACKLOG = 128;
132 
readCallback(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)133     static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
134         T &ref = *(static_cast<T*>(handle->data));
135         // data will be destroyed no matter of what the value of nread is
136         std::unique_ptr<char[]> data{buf->base};
137 
138         // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
139         // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
140         // for we don't have data to emit though, it's fine to suppress it
141 
142         if(nread == UV_EOF) {
143             // end of stream
144             ref.publish(EndEvent{});
145         } else if(nread > 0) {
146             // data available
147             ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)});
148         } else if(nread < 0) {
149             // transmission error
150             ref.publish(ErrorEvent(nread));
151         }
152     }
153 
listenCallback(uv_stream_t * handle,int status)154     static void listenCallback(uv_stream_t *handle, int status) {
155         T &ref = *(static_cast<T*>(handle->data));
156         if(status) { ref.publish(ErrorEvent{status}); }
157         else { ref.publish(ListenEvent{}); }
158     }
159 
160 public:
161 #ifdef _MSC_VER
StreamHandle(typename Handle<T,U>::ConstructorAccess ca,std::shared_ptr<Loop> ref)162     StreamHandle(typename Handle<T, U>::ConstructorAccess ca, std::shared_ptr<Loop> ref)
163         : Handle<T, U>{ca, std::move(ref)}
164     {}
165 #else
166     using Handle<T, U>::Handle;
167 #endif
168 
169     /**
170      * @brief Shutdowns the outgoing (write) side of a duplex stream.
171      *
172      * It waits for pending write requests to complete. The handle should refer
173      * to a initialized stream.<br/>
174      * A ShutdownEvent event will be emitted after shutdown is complete.
175      */
shutdown()176     void shutdown() {
177         auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
178             ptr->publish(event);
179         };
180 
181         auto shutdown = this->loop().template resource<details::ShutdownReq>();
182         shutdown->template once<ErrorEvent>(listener);
183         shutdown->template once<ShutdownEvent>(listener);
184         shutdown->shutdown(this->template get<uv_stream_t>());
185     }
186 
187     /**
188      * @brief Starts listening for incoming connections.
189      *
190      * When a new incoming connection is received, a ListenEvent event is
191      * emitted.<br/>
192      * An ErrorEvent event will be emitted in case of errors.
193      *
194      * @param backlog Indicates the number of connections the kernel might
195      * queue, same as listen(2).
196      */
listen(int backlog=DEFAULT_BACKLOG)197     void listen(int backlog = DEFAULT_BACKLOG) {
198         this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback);
199     }
200 
201     /**
202      * @brief Accepts incoming connections.
203      *
204      * This call is used in conjunction with `listen()` to accept incoming
205      * connections. Call this function after receiving a ListenEvent event to
206      * accept the connection. Before calling this function, the submitted handle
207      * must be initialized.<br>
208      * An ErrorEvent event will be emitted in case of errors.
209      *
210      * When the ListenEvent event is emitted it is guaranteed that this
211      * function will complete successfully the first time. If you attempt to use
212      * it more than once, it may fail.<br/>
213      * It is suggested to only call this function once per ListenEvent event.
214      *
215      * @note
216      * Both the handles must be running on the same loop.
217      *
218      * @param ref An initialized handle to be used to accept the connection.
219      */
220     template<typename S>
accept(S & ref)221     void accept(S &ref) {
222         this->invoke(&uv_accept, this->template get<uv_stream_t>(), ref.template get<uv_stream_t>());
223     }
224 
225     /**
226      * @brief Starts reading data from an incoming stream.
227      *
228      * A DataEvent event will be emitted several times until there is no more
229      * data to read or `stop()` is called.<br/>
230      * An EndEvent event will be emitted when there is no more data to read.
231      */
read()232     void read() {
233         this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
234     }
235 
236     /**
237      * @brief Stops reading data from the stream.
238      *
239      * This function is idempotent and may be safely called on a stopped stream.
240      */
stop()241     void stop() {
242         this->invoke(&uv_read_stop, this->template get<uv_stream_t>());
243     }
244 
245     /**
246      * @brief Writes data to the stream.
247      *
248      * Data are written in order. The handle takes the ownership of the data and
249      * it is in charge of delete them.
250      *
251      * A WriteEvent event will be emitted when the data have been written.<br/>
252      * An ErrorEvent event will be emitted in case of errors.
253      *
254      * @param data The data to be written to the stream.
255      * @param len The lenght of the submitted data.
256      */
write(std::unique_ptr<char[]> data,unsigned int len)257     void write(std::unique_ptr<char[]> data, unsigned int len) {
258         auto req = this->loop().template resource<details::WriteReq>(
259                     std::unique_ptr<char[], details::WriteReq::Deleter>{
260                         data.release(), [](char *ptr) { delete[] ptr; }
261                     }, len);
262 
263         auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
264             ptr->publish(event);
265         };
266 
267         req->template once<ErrorEvent>(listener);
268         req->template once<WriteEvent>(listener);
269         req->write(this->template get<uv_stream_t>());
270     }
271 
272     /**
273      * @brief Writes data to the stream.
274      *
275      * Data are written in order. The handle doesn't take the ownership of the
276      * data. Be sure that their lifetime overcome the one of the request.
277      *
278      * A WriteEvent event will be emitted when the data have been written.<br/>
279      * An ErrorEvent event will be emitted in case of errors.
280      *
281      * @param data The data to be written to the stream.
282      * @param len The lenght of the submitted data.
283      */
write(char * data,unsigned int len)284     void write(char *data, unsigned int len) {
285         auto req = this->loop().template resource<details::WriteReq>(
286                     std::unique_ptr<char[], details::WriteReq::Deleter>{
287                         data, [](char *) {}
288                     }, len);
289 
290         auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
291             ptr->publish(event);
292         };
293 
294         req->template once<ErrorEvent>(listener);
295         req->template once<WriteEvent>(listener);
296         req->write(this->template get<uv_stream_t>());
297     }
298 
299     /**
300      * @brief Extended write function for sending handles over a pipe handle.
301      *
302      * The pipe must be initialized with `ipc == true`.
303      *
304      * `send` must be a TcpHandle or PipeHandle handle, which is a server or a
305      * connection (listening or connected state). Bound sockets or pipes will be
306      * assumed to be servers.
307      *
308      * The handle takes the ownership of the data and it is in charge of delete
309      * them.
310      *
311      * A WriteEvent event will be emitted when the data have been written.<br/>
312      * An ErrorEvent wvent will be emitted in case of errors.
313      *
314      * @param send The handle over which to write data.
315      * @param data The data to be written to the stream.
316      * @param len The lenght of the submitted data.
317      */
318     template<typename S>
write(S & send,std::unique_ptr<char[]> data,unsigned int len)319     void write(S &send, std::unique_ptr<char[]> data, unsigned int len) {
320         auto req = this->loop().template resource<details::WriteReq>(
321                     std::unique_ptr<char[], details::WriteReq::Deleter>{
322                         data.release(), [](char *ptr) { delete[] ptr; }
323                     }, len);
324 
325         auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
326             ptr->publish(event);
327         };
328 
329         req->template once<ErrorEvent>(listener);
330         req->template once<WriteEvent>(listener);
331         req->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>());
332     }
333 
334     /**
335      * @brief Extended write function for sending handles over a pipe handle.
336      *
337      * The pipe must be initialized with `ipc == true`.
338      *
339      * `send` must be a TcpHandle or PipeHandle handle, which is a server or a
340      * connection (listening or connected state). Bound sockets or pipes will be
341      * assumed to be servers.
342      *
343      * The handle doesn't take the ownership of the data. Be sure that their
344      * lifetime overcome the one of the request.
345      *
346      * A WriteEvent event will be emitted when the data have been written.<br/>
347      * An ErrorEvent wvent will be emitted in case of errors.
348      *
349      * @param send The handle over which to write data.
350      * @param data The data to be written to the stream.
351      * @param len The lenght of the submitted data.
352      */
353     template<typename S>
write(S & send,char * data,unsigned int len)354     void write(S &send, char *data, unsigned int len) {
355         auto req = this->loop().template resource<details::WriteReq>(
356                     std::unique_ptr<char[], details::WriteReq::Deleter>{
357                         data, [](char *) {}
358                     }, len);
359 
360         auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
361             ptr->publish(event);
362         };
363 
364         req->template once<ErrorEvent>(listener);
365         req->template once<WriteEvent>(listener);
366         req->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>());
367     }
368 
369     /**
370      * @brief Queues a write request if it can be completed immediately.
371      *
372      * Same as `write()`, but won’t queue a write request if it can’t be
373      * completed immediately.<br/>
374      * An ErrorEvent event will be emitted in case of errors.
375      *
376      * @param data The data to be written to the stream.
377      * @param len The lenght of the submitted data.
378      * @return Number of bytes written.
379      */
tryWrite(std::unique_ptr<char[]> data,unsigned int len)380     int tryWrite(std::unique_ptr<char[]> data, unsigned int len) {
381         uv_buf_t bufs[] = { uv_buf_init(data.get(), len) };
382         auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
383 
384         if(bw < 0) {
385             this->publish(ErrorEvent{bw});
386             bw = 0;
387         }
388 
389         return bw;
390     }
391 
392     /**
393      * @brief Queues a write request if it can be completed immediately.
394      *
395      * Same as `write()`, but won’t queue a write request if it can’t be
396      * completed immediately.<br/>
397      * An ErrorEvent event will be emitted in case of errors.
398      *
399      * @param data The data to be written to the stream.
400      * @param len The lenght of the submitted data.
401      * @return Number of bytes written.
402      */
tryWrite(char * data,unsigned int len)403     int tryWrite(char *data, unsigned int len) {
404         uv_buf_t bufs[] = { uv_buf_init(data, len) };
405         auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
406 
407         if(bw < 0) {
408             this->publish(ErrorEvent{bw});
409             bw = 0;
410         }
411 
412         return bw;
413     }
414 
415     /**
416      * @brief Checks if the stream is readable.
417      * @return True if the stream is readable, false otherwise.
418      */
readable() const419     bool readable() const noexcept {
420         return (uv_is_readable(this->template get<uv_stream_t>()) == 1);
421     }
422 
423     /**
424      * @brief Checks if the stream is writable.
425      * @return True if the stream is writable, false otherwise.
426      */
writable() const427     bool writable() const noexcept {
428         return (uv_is_writable(this->template get<uv_stream_t>()) == 1);
429     }
430 
431     /**
432      * @brief Enables or disables blocking mode for a stream.
433      *
434      * When blocking mode is enabled all writes complete synchronously. The
435      * interface remains unchanged otherwise, e.g. completion or failure of the
436      * operation will still be reported through events which are emitted
437      * asynchronously.
438      *
439      * See the official
440      * [documentation](http://docs.libuv.org/en/v1.x/stream.html#c.uv_stream_set_blocking)
441      * for further details.
442      *
443      * @param enable True to enable blocking mode, false otherwise.
444      * @return True in case of success, false otherwise.
445      */
blocking(bool enable=false)446     bool blocking(bool enable = false) {
447         return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable));
448     }
449 
450     /**
451      * @brief Gets the amount of queued bytes waiting to be sent.
452      * @return Amount of queued bytes waiting to be sent.
453      */
writeQueueSize() const454     size_t writeQueueSize() const noexcept {
455         return uv_stream_get_write_queue_size(this->template get<uv_stream_t>());
456     }
457 };
458 
459 
460 }
461