/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

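/* State for the select() fallback thread (see uv__stream_try_select() and
 * uv__stream_osx_select() below). `fd` is the real device descriptor that
 * the worker `thread` watches with select(2). `fake_fd` and `int_fd` are
 * the two ends of a socketpair: fake_fd replaces the stream's fd on the
 * loop side, and writing to it raises a read event on int_fd to interrupt
 * select(). `async` and `events` publish poll results back to the loop,
 * with `async_sem` and `close_sem` sequencing the handoff and shutdown.
 * `sread`/`swrite` are heap-allocated fd_sets of `sread_sz`/`swrite_sz`
 * bytes.
 */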
struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};

/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
 * EPROTOTYPE can be returned while trying to write to a socket that is
 * shutting down. If we retry the write, we should get the expected EPIPE
 * instead.
 */
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
     (errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted, open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are a socketpair(), so writing to one end
   * raises a read event on the other.
   */
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else  /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif  /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  while (1) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      while (1) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to do it here, otherwise `select()` might be called
   * before the actual `uv__read()`, leading to the blocking syscall
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X.
   * Run select(2) in a separate thread for those fds.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

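  /* Size each fd_set so it has bits for descriptors 0..max_fd: round the
   * bit count up to whole 32-bit words (NBBY is bits per byte), then
   * convert to bytes. The sets are allocated on the heap with exactly this
   * size, so the thread is not bound by a fixed-size fd_set.
   */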
  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;
  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);

  if (stream->shutdown_req) {
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
     * fait accompli at this point. Maybe we should revisit this in v0.11.
     * A possible reason for leaving it unchanged is that it informs the
     * callee that the handle has been destroyed.
     */
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
    stream->shutdown_req = NULL;
  }

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet accepted the connection by calling uv_accept() */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  switch (stream->type) {
  case UV_TCP:
    err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  /* Shutdown? */
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is still
 * more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

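  /* Consume the n bytes that were just written: whole buffers are skipped,
   * and a partially written buffer keeps only its unwritten tail via
   * base/len.
   */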
  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}


static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */

  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;
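    /* The union ensures `scratch.data` is suitably aligned for struct
     * cmsghdr; 64 bytes is assumed to be enough for
     * CMSG_SPACE(sizeof(fd_to_send)) for the single descriptor sent here.
     */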

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return;  /* TODO(bnoordhuis) Start trying to write the next request. */
  }

  /* If this is a blocking stream, try again. */
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  /* We're not done. */
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  /* Notify select() thread about state change */
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;
  socklen_t sslen;
  socklen_t len;
  int type;

  memset(&ss, 0, sizeof(ss));
  sslen = sizeof(ss);

  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
    return UV_UNKNOWN_HANDLE;

  len = sizeof type;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
    /* on AIX/DragonFly the getsockname call returns an empty sa structure
     * for sockets of type AF_UNIX.  For all other types it will
     * return a properly filled in structure.
     */
    if (sslen == 0)
      return UV_NAMED_PIPE;
#endif
    switch (ss.ss_family) {
      case AF_UNIX:
        return UV_NAMED_PIPE;
      case AF_INET:
      case AF_INET6:
        return UV_TCP;
      }
  }

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
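    /* Assumption: uv__stream_queued_fds_t ends with one inline fd slot,
     * which is why the allocations below reserve queue_size - 1 extra
     * slots on top of sizeof(*queued_fds).
     */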
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                              sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


#if defined(__PASE__)
/* On IBMi PASE the control message length cannot exceed 256. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
          cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}


#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
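      /* z/OS: control data (passed fds) may still be pending after the data
       * read; keep reading with an empty iovec and feed each batch through
       * uv__stream_recv_cmsg() until msg_controllen drains.
       */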
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt.
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between Unices, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick, which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;

  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /*
     * blocking streams should never have anything in the queue.
     * if this assert fires then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write.
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}


void uv_try_write_cb(uv_write_t* req, int status) {
  /* Should not be called */
  abort();
}

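/* uv_try_write() runs the regular uv_write() path with a stack-allocated
 * request and then immediately undoes the queueing side effects: whatever
 * did not go out in the synchronous uv__write() attempt is subtracted from
 * write_queue_size again and the request is unregistered, so the call acts
 * as a plain non-blocking write without a callback.
 */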
int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;

  /* Remove the unwritten bytes from the write queue size */
  written = uv__count_bufs(bufs, nbufs);
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);
  else
    req_size = 0;
  written -= req_size;
  stream->write_queue_size -= req_size;

  /* Unqueue the request, whether or not it completed immediately */
  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)
    uv__free(req.bufs);
  req.bufs = NULL;

  /* Do not poll for writability if we weren't polling before this call */
  if (!has_pollout) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (written == 0 && req_size != 0)
    return req.error < 0 ? req.error : UV_EAGAIN;
  else
    return written;
}


int uv_read_start(uv_stream_t* stream,
                  uv_alloc_cb alloc_cb,
                  uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  if (stream->flags & UV_HANDLE_CLOSING)
    return UV_EINVAL;

  if (!(stream->flags & UV_HANDLE_READABLE))
    return UV_ENOTCONN;

  /* The UV_HANDLE_READING flag is independent of the state of the tcp - it
   * just expresses the desired state of the user.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors.  Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}