1 #pragma once 2 3 4 #include <algorithm> 5 #include <iterator> 6 #include <cstddef> 7 #include <utility> 8 #include <memory> 9 #include <uv.h> 10 #include "request.hpp" 11 #include "handle.hpp" 12 #include "loop.hpp" 13 14 15 namespace uvw { 16 17 18 /** 19 * @brief ConnectEvent event. 20 * 21 * It will be emitted by StreamHandle according with its functionalities. 22 */ 23 struct ConnectEvent {}; 24 25 26 /** 27 * @brief EndEvent event. 28 * 29 * It will be emitted by StreamHandle according with its functionalities. 30 */ 31 struct EndEvent {}; 32 33 34 /** 35 * @brief ListenEvent event. 36 * 37 * It will be emitted by StreamHandle according with its functionalities. 38 */ 39 struct ListenEvent {}; 40 41 42 /** 43 * @brief ShutdownEvent event. 44 * 45 * It will be emitted by StreamHandle according with its functionalities. 46 */ 47 struct ShutdownEvent {}; 48 49 50 /** 51 * @brief WriteEvent event. 52 * 53 * It will be emitted by StreamHandle according with its functionalities. 54 */ 55 struct WriteEvent {}; 56 57 58 /** 59 * @brief DataEvent event. 60 * 61 * It will be emitted by StreamHandle according with its functionalities. 62 */ 63 struct DataEvent { DataEventuvw::DataEvent64 explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept 65 : data{std::move(buf)}, length{len} 66 {} 67 68 std::unique_ptr<char[]> data; /*!< A bunch of data read on the stream. */ 69 std::size_t length; /*!< The amount of data read on the stream. */ 70 }; 71 72 73 namespace details { 74 75 76 struct ConnectReq final: public Request<ConnectReq, uv_connect_t> { 77 using Request::Request; 78 79 template<typename F, typename... Args> connectuvw::details::ConnectReq80 void connect(F &&f, Args&&... args) { 81 invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>); 82 } 83 }; 84 85 86 struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> { 87 using Request::Request; 88 shutdownuvw::details::ShutdownReq89 void shutdown(uv_stream_t *handle) { 90 invoke(&uv_shutdown, get(), handle, &defaultCallback<ShutdownEvent>); 91 } 92 }; 93 94 95 class WriteReq final: public Request<WriteReq, uv_write_t> { 96 public: 97 using Deleter = void(*)(char *); 98 WriteReq(ConstructorAccess ca,std::shared_ptr<Loop> loop,std::unique_ptr<char[],Deleter> dt,unsigned int len)99 WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len) 100 : Request<WriteReq, uv_write_t>{ca, std::move(loop)}, 101 data{std::move(dt)}, 102 buf{uv_buf_init(data.get(), len)} 103 {} 104 write(uv_stream_t * handle)105 void write(uv_stream_t *handle) { 106 invoke(&uv_write, get(), handle, &buf, 1, &defaultCallback<WriteEvent>); 107 } 108 write(uv_stream_t * handle,uv_stream_t * send)109 void write(uv_stream_t *handle, uv_stream_t *send) { 110 invoke(&uv_write2, get(), handle, &buf, 1, send, &defaultCallback<WriteEvent>); 111 } 112 113 private: 114 std::unique_ptr<char[], Deleter> data; 115 uv_buf_t buf; 116 }; 117 118 119 } 120 121 122 /** 123 * @brief The StreamHandle handle. 124 * 125 * Stream handles provide an abstraction of a duplex communication channel. 126 * StreamHandle is an intermediate type, `uvw` provides three stream 127 * implementations: TcpHandle, PipeHandle and TTYHandle. 128 */ 129 template<typename T, typename U> 130 class StreamHandle: public Handle<T, U> { 131 static constexpr unsigned int DEFAULT_BACKLOG = 128; 132 readCallback(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)133 static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { 134 T &ref = *(static_cast<T*>(handle->data)); 135 // data will be destroyed no matter of what the value of nread is 136 std::unique_ptr<char[]> data{buf->base}; 137 138 // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html) 139 // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error 140 // for we don't have data to emit though, it's fine to suppress it 141 142 if(nread == UV_EOF) { 143 // end of stream 144 ref.publish(EndEvent{}); 145 } else if(nread > 0) { 146 // data available 147 ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)}); 148 } else if(nread < 0) { 149 // transmission error 150 ref.publish(ErrorEvent(nread)); 151 } 152 } 153 listenCallback(uv_stream_t * handle,int status)154 static void listenCallback(uv_stream_t *handle, int status) { 155 T &ref = *(static_cast<T*>(handle->data)); 156 if(status) { ref.publish(ErrorEvent{status}); } 157 else { ref.publish(ListenEvent{}); } 158 } 159 160 public: 161 #ifdef _MSC_VER StreamHandle(typename Handle<T,U>::ConstructorAccess ca,std::shared_ptr<Loop> ref)162 StreamHandle(typename Handle<T, U>::ConstructorAccess ca, std::shared_ptr<Loop> ref) 163 : Handle<T, U>{ca, std::move(ref)} 164 {} 165 #else 166 using Handle<T, U>::Handle; 167 #endif 168 169 /** 170 * @brief Shutdowns the outgoing (write) side of a duplex stream. 171 * 172 * It waits for pending write requests to complete. The handle should refer 173 * to a initialized stream.<br/> 174 * A ShutdownEvent event will be emitted after shutdown is complete. 175 */ shutdown()176 void shutdown() { 177 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) { 178 ptr->publish(event); 179 }; 180 181 auto shutdown = this->loop().template resource<details::ShutdownReq>(); 182 shutdown->template once<ErrorEvent>(listener); 183 shutdown->template once<ShutdownEvent>(listener); 184 shutdown->shutdown(this->template get<uv_stream_t>()); 185 } 186 187 /** 188 * @brief Starts listening for incoming connections. 189 * 190 * When a new incoming connection is received, a ListenEvent event is 191 * emitted.<br/> 192 * An ErrorEvent event will be emitted in case of errors. 193 * 194 * @param backlog Indicates the number of connections the kernel might 195 * queue, same as listen(2). 196 */ listen(int backlog=DEFAULT_BACKLOG)197 void listen(int backlog = DEFAULT_BACKLOG) { 198 this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback); 199 } 200 201 /** 202 * @brief Accepts incoming connections. 203 * 204 * This call is used in conjunction with `listen()` to accept incoming 205 * connections. Call this function after receiving a ListenEvent event to 206 * accept the connection. Before calling this function, the submitted handle 207 * must be initialized.<br> 208 * An ErrorEvent event will be emitted in case of errors. 209 * 210 * When the ListenEvent event is emitted it is guaranteed that this 211 * function will complete successfully the first time. If you attempt to use 212 * it more than once, it may fail.<br/> 213 * It is suggested to only call this function once per ListenEvent event. 214 * 215 * @note 216 * Both the handles must be running on the same loop. 217 * 218 * @param ref An initialized handle to be used to accept the connection. 219 */ 220 template<typename S> accept(S & ref)221 void accept(S &ref) { 222 this->invoke(&uv_accept, this->template get<uv_stream_t>(), ref.template get<uv_stream_t>()); 223 } 224 225 /** 226 * @brief Starts reading data from an incoming stream. 227 * 228 * A DataEvent event will be emitted several times until there is no more 229 * data to read or `stop()` is called.<br/> 230 * An EndEvent event will be emitted when there is no more data to read. 231 */ read()232 void read() { 233 this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback); 234 } 235 236 /** 237 * @brief Stops reading data from the stream. 238 * 239 * This function is idempotent and may be safely called on a stopped stream. 240 */ stop()241 void stop() { 242 this->invoke(&uv_read_stop, this->template get<uv_stream_t>()); 243 } 244 245 /** 246 * @brief Writes data to the stream. 247 * 248 * Data are written in order. The handle takes the ownership of the data and 249 * it is in charge of delete them. 250 * 251 * A WriteEvent event will be emitted when the data have been written.<br/> 252 * An ErrorEvent event will be emitted in case of errors. 253 * 254 * @param data The data to be written to the stream. 255 * @param len The lenght of the submitted data. 256 */ write(std::unique_ptr<char[]> data,unsigned int len)257 void write(std::unique_ptr<char[]> data, unsigned int len) { 258 auto req = this->loop().template resource<details::WriteReq>( 259 std::unique_ptr<char[], details::WriteReq::Deleter>{ 260 data.release(), [](char *ptr) { delete[] ptr; } 261 }, len); 262 263 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) { 264 ptr->publish(event); 265 }; 266 267 req->template once<ErrorEvent>(listener); 268 req->template once<WriteEvent>(listener); 269 req->write(this->template get<uv_stream_t>()); 270 } 271 272 /** 273 * @brief Writes data to the stream. 274 * 275 * Data are written in order. The handle doesn't take the ownership of the 276 * data. Be sure that their lifetime overcome the one of the request. 277 * 278 * A WriteEvent event will be emitted when the data have been written.<br/> 279 * An ErrorEvent event will be emitted in case of errors. 280 * 281 * @param data The data to be written to the stream. 282 * @param len The lenght of the submitted data. 283 */ write(char * data,unsigned int len)284 void write(char *data, unsigned int len) { 285 auto req = this->loop().template resource<details::WriteReq>( 286 std::unique_ptr<char[], details::WriteReq::Deleter>{ 287 data, [](char *) {} 288 }, len); 289 290 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) { 291 ptr->publish(event); 292 }; 293 294 req->template once<ErrorEvent>(listener); 295 req->template once<WriteEvent>(listener); 296 req->write(this->template get<uv_stream_t>()); 297 } 298 299 /** 300 * @brief Extended write function for sending handles over a pipe handle. 301 * 302 * The pipe must be initialized with `ipc == true`. 303 * 304 * `send` must be a TcpHandle or PipeHandle handle, which is a server or a 305 * connection (listening or connected state). Bound sockets or pipes will be 306 * assumed to be servers. 307 * 308 * The handle takes the ownership of the data and it is in charge of delete 309 * them. 310 * 311 * A WriteEvent event will be emitted when the data have been written.<br/> 312 * An ErrorEvent wvent will be emitted in case of errors. 313 * 314 * @param send The handle over which to write data. 315 * @param data The data to be written to the stream. 316 * @param len The lenght of the submitted data. 317 */ 318 template<typename S> write(S & send,std::unique_ptr<char[]> data,unsigned int len)319 void write(S &send, std::unique_ptr<char[]> data, unsigned int len) { 320 auto req = this->loop().template resource<details::WriteReq>( 321 std::unique_ptr<char[], details::WriteReq::Deleter>{ 322 data.release(), [](char *ptr) { delete[] ptr; } 323 }, len); 324 325 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) { 326 ptr->publish(event); 327 }; 328 329 req->template once<ErrorEvent>(listener); 330 req->template once<WriteEvent>(listener); 331 req->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>()); 332 } 333 334 /** 335 * @brief Extended write function for sending handles over a pipe handle. 336 * 337 * The pipe must be initialized with `ipc == true`. 338 * 339 * `send` must be a TcpHandle or PipeHandle handle, which is a server or a 340 * connection (listening or connected state). Bound sockets or pipes will be 341 * assumed to be servers. 342 * 343 * The handle doesn't take the ownership of the data. Be sure that their 344 * lifetime overcome the one of the request. 345 * 346 * A WriteEvent event will be emitted when the data have been written.<br/> 347 * An ErrorEvent wvent will be emitted in case of errors. 348 * 349 * @param send The handle over which to write data. 350 * @param data The data to be written to the stream. 351 * @param len The lenght of the submitted data. 352 */ 353 template<typename S> write(S & send,char * data,unsigned int len)354 void write(S &send, char *data, unsigned int len) { 355 auto req = this->loop().template resource<details::WriteReq>( 356 std::unique_ptr<char[], details::WriteReq::Deleter>{ 357 data, [](char *) {} 358 }, len); 359 360 auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) { 361 ptr->publish(event); 362 }; 363 364 req->template once<ErrorEvent>(listener); 365 req->template once<WriteEvent>(listener); 366 req->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>()); 367 } 368 369 /** 370 * @brief Queues a write request if it can be completed immediately. 371 * 372 * Same as `write()`, but won’t queue a write request if it can’t be 373 * completed immediately.<br/> 374 * An ErrorEvent event will be emitted in case of errors. 375 * 376 * @param data The data to be written to the stream. 377 * @param len The lenght of the submitted data. 378 * @return Number of bytes written. 379 */ tryWrite(std::unique_ptr<char[]> data,unsigned int len)380 int tryWrite(std::unique_ptr<char[]> data, unsigned int len) { 381 uv_buf_t bufs[] = { uv_buf_init(data.get(), len) }; 382 auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1); 383 384 if(bw < 0) { 385 this->publish(ErrorEvent{bw}); 386 bw = 0; 387 } 388 389 return bw; 390 } 391 392 /** 393 * @brief Queues a write request if it can be completed immediately. 394 * 395 * Same as `write()`, but won’t queue a write request if it can’t be 396 * completed immediately.<br/> 397 * An ErrorEvent event will be emitted in case of errors. 398 * 399 * @param data The data to be written to the stream. 400 * @param len The lenght of the submitted data. 401 * @return Number of bytes written. 402 */ tryWrite(char * data,unsigned int len)403 int tryWrite(char *data, unsigned int len) { 404 uv_buf_t bufs[] = { uv_buf_init(data, len) }; 405 auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1); 406 407 if(bw < 0) { 408 this->publish(ErrorEvent{bw}); 409 bw = 0; 410 } 411 412 return bw; 413 } 414 415 /** 416 * @brief Checks if the stream is readable. 417 * @return True if the stream is readable, false otherwise. 418 */ readable() const419 bool readable() const noexcept { 420 return (uv_is_readable(this->template get<uv_stream_t>()) == 1); 421 } 422 423 /** 424 * @brief Checks if the stream is writable. 425 * @return True if the stream is writable, false otherwise. 426 */ writable() const427 bool writable() const noexcept { 428 return (uv_is_writable(this->template get<uv_stream_t>()) == 1); 429 } 430 431 /** 432 * @brief Enables or disables blocking mode for a stream. 433 * 434 * When blocking mode is enabled all writes complete synchronously. The 435 * interface remains unchanged otherwise, e.g. completion or failure of the 436 * operation will still be reported through events which are emitted 437 * asynchronously. 438 * 439 * See the official 440 * [documentation](http://docs.libuv.org/en/v1.x/stream.html#c.uv_stream_set_blocking) 441 * for further details. 442 * 443 * @param enable True to enable blocking mode, false otherwise. 444 * @return True in case of success, false otherwise. 445 */ blocking(bool enable=false)446 bool blocking(bool enable = false) { 447 return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable)); 448 } 449 450 /** 451 * @brief Gets the amount of queued bytes waiting to be sent. 452 * @return Amount of queued bytes waiting to be sent. 453 */ writeQueueSize() const454 size_t writeQueueSize() const noexcept { 455 return uv_stream_get_write_queue_size(this->template get<uv_stream_t>()); 456 } 457 }; 458 459 460 } 461