1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "internal.h"
24 
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34 
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37 #endif
38 
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41 #endif
42 
43 
44 static void uv__udp_run_completed(uv_udp_t* handle);
45 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
46 static void uv__udp_recvmsg(uv_udp_t* handle);
47 static void uv__udp_sendmsg(uv_udp_t* handle);
48 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
49                                        int domain,
50                                        unsigned int flags);
51 
52 
/* Start closing a UDP handle: close its I/O watcher, stop the handle and, if
 * a socket descriptor is attached, close it and reset the stored fd to -1.
 */
void uv__udp_close(uv_udp_t* handle) {
  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  /* No descriptor attached: nothing further to release. */
  if (handle->io_watcher.fd == -1)
    return;

  uv__close(handle->io_watcher.fd);
  handle->io_watcher.fd = -1;
}
62 
63 
/* Last step of closing a UDP handle.  The watcher must already be inactive
 * and the descriptor already reset to -1 (both are asserted).  Every request
 * still in the write queue is marked UV_ECANCELED and moved to the completed
 * queue, which is then drained; afterwards recv_cb and alloc_cb are cleared.
 */
void uv__udp_finish_close(uv_udp_t* handle) {
  uv_udp_send_t* pending;
  QUEUE* node;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  /* Cancel everything that never made it onto the wire. */
  for (;;) {
    if (QUEUE_EMPTY(&handle->write_queue))
      break;

    node = QUEUE_HEAD(&handle->write_queue);
    QUEUE_REMOVE(node);

    pending = QUEUE_DATA(node, uv_udp_send_t, queue);
    pending->status = UV_ECANCELED;
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &pending->queue);
  }

  uv__udp_run_completed(handle);

  /* Draining the completed queue must have balanced the accounting. */
  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Tear down the handle's callbacks; close_cb is deliberately left alone. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
}
90 
91 
/* Drain write_completed_queue.  For each finished send request: unregister it
 * from the loop, subtract its byte count from send_queue_size, decrement
 * send_queue_count, free a separately allocated buffer array, and invoke
 * send_cb (when set) with 0 for success or the negative status for failure.
 * Once the write queue is empty as well, POLLOUT is stopped and the handle is
 * stopped unless POLLIN is still active.  UV_HANDLE_UDP_PROCESSING is set for
 * the duration of the call and must not be set on entry.
 */
static void uv__udp_run_completed(uv_udp_t* handle) {
  uv_udp_send_t* done;
  QUEUE* node;
  int result;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
    node = QUEUE_HEAD(&handle->write_completed_queue);
    QUEUE_REMOVE(node);

    done = QUEUE_DATA(node, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop, done);

    handle->send_queue_size -= uv__count_bufs(done->bufs, done->nbufs);
    handle->send_queue_count--;

    /* bufsml is embedded in the request; anything else was allocated. */
    if (done->bufs != done->bufsml)
      uv__free(done->bufs);
    done->bufs = NULL;

    if (done->send_cb != NULL) {
      /* status >= 0 is a byte count (reported as 0), status < 0 is an error
       * code that is passed through unchanged.
       */
      result = done->status < 0 ? done->status : 0;
      done->send_cb(done, result);
    }
  }

  if (QUEUE_EMPTY(&handle->write_queue)) {
    /* Neither pending nor completed requests remain: stop watching for
     * writability.
     */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}
134 
135 
/* I/O watcher callback for UDP handles.  POLLIN triggers a receive pass;
 * POLLOUT triggers a send pass followed by draining the completed queue.
 */
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* udp;

  udp = container_of(w, uv_udp_t, io_watcher);
  assert(udp->type == UV_UDP);

  if ((revents & POLLIN) != 0)
    uv__udp_recvmsg(udp);

  if ((revents & POLLOUT) == 0)
    return;

  uv__udp_sendmsg(udp);
  uv__udp_run_completed(udp);
}
150 
151 
/* Receive datagrams from the handle's socket and deliver each one to recv_cb.
 *
 * For every datagram a buffer is requested through alloc_cb (suggested size
 * 64 KiB).  An empty buffer is reported to recv_cb as UV_ENOBUFS and ends the
 * pass.  EAGAIN/EWOULDBLOCK is reported as a zero-length read with a NULL
 * address, other errors as the negated errno.  A truncated datagram
 * (MSG_TRUNC) is flagged with UV_UDP_PARTIAL.
 */
static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage from;
  struct msghdr msg;
  ssize_t nread;
  uv_buf_t buf;
  int budget;
  int flags;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Bound the work done per call so the loop is not starved when data
   * arrives as fast as (or faster than) it can be read.
   * XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  budget = 32;

  for (;;) {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

    memset(&msg, 0, sizeof(msg));
    memset(&from, 0, sizeof(from));
    msg.msg_name = &from;
    msg.msg_namelen = sizeof(from);
    msg.msg_iov = (void*) &buf;
    msg.msg_iovlen = 1;

    /* Retry the system call for as long as it is interrupted. */
    for (;;) {
      nread = recvmsg(handle->io_watcher.fd, &msg, 0);
      if (nread != -1 || errno != EINTR)
        break;
    }

    if (nread != -1) {
      flags = (msg.msg_flags & MSG_TRUNC) ? UV_UDP_PARTIAL : 0;
      handle->recv_cb(handle,
                      nread,
                      &buf,
                      (const struct sockaddr*) &from,
                      flags);
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
      handle->recv_cb(handle, 0, &buf, NULL, 0);
    } else {
      handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }

    /* Stop on error/would-block, when the budget is spent, or when recv_cb
     * paused or closed the handle.
     */
    if (nread == -1)
      break;
    if (budget-- <= 0)
      break;
    if (handle->io_watcher.fd == -1)
      break;
    if (handle->recv_cb == NULL)
      break;
  }
}
209 
210 
/* Try to transmit the requests in the write queue, front to back.
 *
 * A request whose stored address family is AF_UNSPEC is sent without a
 * destination address; AF_INET, AF_INET6 and AF_UNIX select the matching
 * address length, and any other family aborts.  On EAGAIN, EWOULDBLOCK or
 * ENOBUFS the pass stops and the request stays queued.  Otherwise the result
 * (bytes sent or negated errno) is stored in req->status, the request moves
 * to the completed queue and uv__io_feed() is called on the watcher.
 */
static void uv__udp_sendmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* head;
  struct msghdr msg;
  ssize_t nsent;

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    head = QUEUE_HEAD(&handle->write_queue);
    assert(head != NULL);

    req = QUEUE_DATA(head, uv_udp_send_t, queue);
    assert(req != NULL);

    memset(&msg, 0, sizeof(msg));
    switch (req->addr.ss_family) {
    case AF_UNSPEC:
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      break;
    case AF_INET6:
      msg.msg_name = &req->addr;
      msg.msg_namelen = sizeof(struct sockaddr_in6);
      break;
    case AF_INET:
      msg.msg_name = &req->addr;
      msg.msg_namelen = sizeof(struct sockaddr_in);
      break;
    case AF_UNIX:
      msg.msg_name = &req->addr;
      msg.msg_namelen = sizeof(struct sockaddr_un);
      break;
    default:
      assert(0 && "unsupported address family");
      abort();
    }
    msg.msg_iov = (struct iovec*) req->bufs;
    msg.msg_iovlen = req->nbufs;

    /* Retry the system call for as long as it is interrupted. */
    for (;;) {
      nsent = sendmsg(handle->io_watcher.fd, &msg, 0);
      if (nsent != -1 || errno != EINTR)
        break;
    }

    if (nsent == -1) {
      /* Socket not writable right now; leave the request at the head. */
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
        break;
      req->status = UV__ERR(errno);
    } else {
      req->status = nsent;
    }

    /* A datagram is sent atomically: either all of it goes out or none of
     * it does (EMSGSIZE), so there are no partial writes to track.  Move the
     * request from the write queue to the completed queue.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    uv__io_feed(handle->loop, &handle->io_watcher);
  }
}
265 
266 
/* Enable address/port reuse on `fd`.  Returns 0 on success or a negated errno.
 *
 * On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
 * refinements for programs that use multicast.
 *
 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
 * are different from the BSDs: it _shares_ the port rather than steal it
 * from the current listener.  While useful, it's not something we can emulate
 * on other platforms so we don't enable it.
 *
 * zOS does not support getsockname with SO_REUSEPORT option when using
 * AF_UNIX, so there the socket's family is looked up first and AF_UNIX
 * sockets get SO_REUSEADDR instead.
 */
static int uv__set_reuse(int fd) {
  int optname;
  int enable;
#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in name;
  unsigned int namelen;
#endif

  enable = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  namelen = sizeof(name);
  if (getsockname(fd, (struct sockaddr*) &name, &namelen) == -1)
    return UV__ERR(errno);
  optname = name.sin_family == AF_UNIX ? SO_REUSEADDR : SO_REUSEPORT;
#elif defined(SO_REUSEPORT) && !defined(__linux__)
  optname = SO_REUSEPORT;
#else
  optname = SO_REUSEADDR;
#endif

  if (setsockopt(fd, SOL_SOCKET, optname, &enable, sizeof(enable)))
    return UV__ERR(errno);

  return 0;
}
304 
305 
/* Bind a UDP handle to `addr`, creating the socket first when the handle has
 * none.
 *
 * flags may combine UV_UDP_IPV6ONLY and UV_UDP_REUSEADDR; any other bit, or
 * UV_UDP_IPV6ONLY with a non-AF_INET6 address, yields UV_EINVAL.  Returns 0
 * on success, otherwise a negative error code.  On success the handle is
 * flagged UV_HANDLE_BOUND, plus UV_HANDLE_IPV6 for AF_INET6 addresses.
 */
int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int sockfd;
  int rc;
  int on;

  /* Reject unknown flag bits. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR))
    return UV_EINVAL;

  /* IPv6-only mode makes no sense on a non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  sockfd = handle->io_watcher.fd;
  if (sockfd == -1) {
    sockfd = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (sockfd < 0)
      return sockfd;
    handle->io_watcher.fd = sockfd;
  }

  if (flags & UV_UDP_REUSEADDR) {
    rc = uv__set_reuse(sockfd);
    if (rc)
      return rc;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    on = 1;
    if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) == -1)
      return UV__ERR(errno);
#else
    return UV_ENOTSUP;
#endif
  }

  if (bind(sockfd, addr, addrlen)) {
    /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
     * socket created with AF_INET to an AF_INET6 address or vice versa;
     * report that as UV_EINVAL.
     */
    if (errno == EAFNOSUPPORT)
      return UV_EINVAL;
    return UV__ERR(errno);
  }

  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}
365 
366 
/* Bind the handle to the wildcard address of `domain` with port 0, but only
 * when it has no socket yet; a handle that already owns a descriptor is left
 * untouched and 0 is returned.  `flags` is forwarded to uv__udp_bind().
 * Domains other than AF_INET and AF_INET6 abort.
 */
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union {
    struct sockaddr_in6 in6;
    struct sockaddr_in in;
    struct sockaddr addr;
  } any;
  socklen_t len;

  if (handle->io_watcher.fd != -1)
    return 0;

  if (domain == AF_INET) {
    memset(&any.in, 0, sizeof(any.in));
    any.in.sin_family = AF_INET;
    any.in.sin_addr.s_addr = INADDR_ANY;
    len = sizeof(any.in);
  } else if (domain == AF_INET6) {
    memset(&any.in6, 0, sizeof(any.in6));
    any.in6.sin6_family = AF_INET6;
    any.in6.sin6_addr = in6addr_any;
    len = sizeof(any.in6);
  } else {
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &any.addr, len, flags);
}
406 
407 
/* Associate the handle's socket with the remote address `addr`, binding it to
 * a wildcard address first if it has no socket yet.  Returns 0 and sets
 * UV_HANDLE_UDP_CONNECTED on success, otherwise a negative error code.
 */
int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (rc != 0)
    return rc;

  /* Retry the system call for as long as it is interrupted. */
  for (;;) {
    errno = 0;
    rc = connect(handle->io_watcher.fd, addr, addrlen);
    if (rc != -1 || errno != EINTR)
      break;
  }

  if (rc != 0)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
429 
430 
/* Dissolve the socket's peer association by connecting it to an AF_UNSPEC
 * address.  A failure with EAFNOSUPPORT is treated as success.  Returns 0 and
 * clears UV_HANDLE_UDP_CONNECTED on success, otherwise a negated errno.
 */
int uv__udp_disconnect(uv_udp_t* handle) {
  struct sockaddr unspec;
  int rc;

  memset(&unspec, 0, sizeof(unspec));
  unspec.sa_family = AF_UNSPEC;

  /* Retry the system call for as long as it is interrupted. */
  for (;;) {
    errno = 0;
    rc = connect(handle->io_watcher.fd, &unspec, sizeof(unspec));
    if (rc != -1 || errno != EINTR)
      break;
  }

  if (rc == -1 && errno != EAFNOSUPPORT)
    return UV__ERR(errno);

  handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
  return 0;
}
450 
451 
/* Queue a datagram made of `nbufs` buffers for sending on `handle`.
 *
 * With a non-NULL `addr` the handle is bound on demand and the address is
 * copied into the request; with a NULL `addr` the request's family is set to
 * AF_UNSPEC, which makes uv__udp_sendmsg() send without a destination.  The
 * buffer descriptors (not the data) are copied into the request.  If nothing
 * else is outstanding and the completed queue is not being processed, a send
 * is attempted immediately; otherwise, or if the attempt leaves work queued,
 * POLLOUT is started on the watcher.
 *
 * Returns 0 on success, an error from the deferred bind, or UV_ENOMEM.
 */
int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int was_idle;
  int rc;

  assert(nbufs > 0);

  if (addr != NULL) {
    rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (rc != 0)
      return rc;
  }

  /* send_queue_count may legitimately be > 0 while the write queue is empty:
   * failed requests waiting in write_completed_queue still count until they
   * are processed and the size/count bookkeeping is adjusted.
   */
  was_idle = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->addr));
  if (addr != NULL)
    memcpy(&req->addr, addr, addrlen);
  else
    req->addr.ss_family = AF_UNSPEC;
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  /* Use the array embedded in the request when it is large enough. */
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
  else
    req->bufs = req->bufsml;

  if (req->bufs == NULL) {
    uv__req_unregister(handle->loop, req);
    return UV_ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  if (was_idle && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* Everything went out synchronously; no need to wait for POLLOUT. */
    if (QUEUE_EMPTY(&handle->write_queue))
      return 0;
  }

  /* The write could not (or must not) complete right now: have the watcher
   * report when the socket becomes writable.
   */
  uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);

  return 0;
}
516 
517 
/* Attempt to send a datagram immediately, without queueing.
 *
 * Returns the number of bytes sent, UV_EAGAIN when requests are already
 * outstanding on the handle or the socket cannot take the datagram right now
 * (EAGAIN, EWOULDBLOCK, ENOBUFS), or another negative error code.  With a
 * NULL `addr` the handle is asserted to be connected.
 */
int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  struct msghdr msg;
  ssize_t nsent;
  int rc;

  assert(nbufs > 0);

  /* Other sends are still outstanding on this handle. */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr == NULL) {
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  } else {
    rc = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (rc != 0)
      return rc;
  }

  memset(&msg, 0, sizeof(msg));
  msg.msg_name = (struct sockaddr*) addr;
  msg.msg_namelen = addrlen;
  msg.msg_iov = (struct iovec*) bufs;
  msg.msg_iovlen = nbufs;

  /* Retry the system call for as long as it is interrupted. */
  for (;;) {
    nsent = sendmsg(handle->io_watcher.fd, &msg, 0);
    if (nsent != -1 || errno != EINTR)
      break;
  }

  if (nsent != -1)
    return nsent;

  if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    return UV_EAGAIN;

  return UV__ERR(errno);
}
560 
561 
/* Join or leave the IPv4 multicast group `multicast_addr` on the interface
 * given as the textual address `interface_addr` (INADDR_ANY when NULL).
 * Returns 0 on success, the uv_inet_pton() error for a bad interface address,
 * UV_EINVAL for an unknown `membership`, or a negated errno from setsockopt
 * (on z/OS, ENXIO is reported as UV_ENODEV).
 */
static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq req;
  int optname;
  int rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &req.imr_interface.s_addr);
    if (rc != 0)
      return rc;
  }

  req.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &req,
                 sizeof(req)) == 0) {
    return 0;
  }

#if defined(__MVS__)
  if (errno == ENXIO)
    return UV_ENODEV;
#endif

  return UV__ERR(errno);
}
607 
608 
/* Join or leave the IPv6 multicast group `multicast_addr`.  When
 * `interface_addr` is given it is parsed with uv_ip6_addr() and the resulting
 * scope id selects the interface; otherwise interface index 0 is used.
 * Returns 0 on success, UV_EINVAL for an unparsable interface address or an
 * unknown `membership`, or a negated errno from setsockopt (on z/OS, ENXIO is
 * reported as UV_ENODEV).
 */
static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct sockaddr_in6 iface;
  struct ipv6_mreq req;
  int optname;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.ipv6mr_interface = 0;
  } else {
    if (uv_ip6_addr(interface_addr, 0, &iface))
      return UV_EINVAL;
    req.ipv6mr_interface = iface.sin6_scope_id;
  }

  req.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IPV6_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IPV6_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &req,
                 sizeof(req)) == 0) {
    return 0;
  }

#if defined(__MVS__)
  if (errno == ENXIO)
    return UV_ENODEV;
#endif

  return UV__ERR(errno);
}
654 
655 
656 #if !defined(__OpenBSD__) && !defined(__NetBSD__)
/* Join or leave the source-specific IPv4 multicast group identified by
 * (`multicast_addr`, `source_addr`) on the interface given as the textual
 * address `interface_addr` (INADDR_ANY when NULL).  A handle without a socket
 * is first bound to the IPv4 wildcard address with UV_UDP_REUSEADDR.
 * Returns 0 on success, UV_EINVAL for an unknown `membership`, or another
 * negative error code.
 */
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source req;
  int optname;
  int rc;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (rc != 0)
    return rc;

  memset(&req, 0, sizeof(req));

  if (interface_addr == NULL) {
    req.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &req.imr_interface.s_addr);
    if (rc != 0)
      return rc;
  }

  req.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  req.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IP_ADD_SOURCE_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IP_DROP_SOURCE_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &req,
                 sizeof(req)) != 0) {
    return UV__ERR(errno);
  }

  return 0;
}
700 
701 
/* Join or leave the source-specific IPv6 multicast group identified by
 * (`multicast_addr`, `source_addr`).  When `interface_addr` is given it is
 * parsed with uv_ip6_addr() and the resulting scope id selects the interface;
 * otherwise interface index 0 is used.  A handle without a socket is first
 * bound to the IPv6 wildcard address with UV_UDP_REUSEADDR.
 * Returns 0 on success, UV_EINVAL for an unknown `membership`, or another
 * negative error code.
 */
static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req mreq;
  struct sockaddr_in6 addr6;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_ip6_addr(interface_addr, 0, &addr6);
    if (err)
      return err;
    mreq.gsr_interface = addr6.sin6_scope_id;
  } else {
    mreq.gsr_interface = 0;
  }

  /* gsr_group and gsr_source are wider than a sockaddr_in6.  Copy only the
   * bytes the source objects actually have; sizing the copy by the
   * destination would read past the end of the sockaddr_in6 arguments.  The
   * remainder of each field stays zero from the memset above.
   */
  memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));

  if (membership == UV_JOIN_GROUP)
    optname = MCAST_JOIN_SOURCE_GROUP;
  else if (membership == UV_LEAVE_GROUP)
    optname = MCAST_LEAVE_SOURCE_GROUP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
747 #endif
748 
749 
/* Initialize a UDP handle.  The low 8 bits of `flags` carry the address
 * family: AF_INET or AF_INET6 create a datagram socket right away, AF_UNSPEC
 * leaves the handle without a socket (fd -1).  Any other family, or any bit
 * above the low 8, yields UV_EINVAL.  Returns 0 on success or the negative
 * error from uv__socket().
 */
int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) {
  int family;
  int sockfd;

  /* The address family lives in the lower 8 bits. */
  family = flags & 0xFF;
  if (family != AF_INET && family != AF_INET6 && family != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFF)
    return UV_EINVAL;

  sockfd = -1;
  if (family != AF_UNSPEC) {
    sockfd = uv__socket(family, SOCK_DGRAM, 0);
    if (sockfd < 0)
      return sockfd;
  }

  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
  handle->send_queue_size = 0;
  handle->send_queue_count = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, sockfd);
  QUEUE_INIT(&handle->write_queue);
  QUEUE_INIT(&handle->write_completed_queue);

  return 0;
}
783 
784 
/* Initialize a UDP handle without creating a socket: forwards to
 * uv_udp_init_ex() with AF_UNSPEC as the flags.
 */
int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
  const unsigned int no_socket_flags = AF_UNSPEC;

  return uv_udp_init_ex(loop, handle, no_socket_flags);
}
788 
789 
/* Attach an existing socket descriptor to a UDP handle.
 *
 * Returns UV_EBUSY when the handle already has a descriptor and UV_EEXIST
 * when uv__fd_exists() reports `sock` for this loop.  The socket is switched
 * to non-blocking mode and address reuse is enabled before it is stored; a
 * failure of either step is returned and the handle stays without a socket.
 * A socket that is already connected gets UV_HANDLE_UDP_CONNECTED.
 */
int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int rc;

  /* The handle already owns a socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  rc = uv__nonblock(sock, 1);
  if (rc == 0)
    rc = uv__set_reuse(sock);
  if (rc != 0)
    return rc;

  handle->io_watcher.fd = sock;
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
814 
815 
/* Join or leave the multicast group named by the textual address
 * `multicast_addr`.  The address is tried as IPv4 first, then as IPv6; if it
 * parses as neither, UV_EINVAL is returned.  A handle without a socket is
 * first bound to the wildcard address of the matching family with
 * UV_UDP_REUSEADDR.
 */
int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  struct sockaddr_in6 group6;
  struct sockaddr_in group4;
  int rc;

  if (uv_ip4_addr(multicast_addr, 0, &group4) == 0) {
    rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (rc != 0)
      return rc;
    return uv__udp_set_membership4(handle, &group4, interface_addr, membership);
  }

  if (uv_ip6_addr(multicast_addr, 0, &group6) != 0)
    return UV_EINVAL;

  rc = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (rc != 0)
    return rc;

  return uv__udp_set_membership6(handle, &group6, interface_addr, membership);
}
838 
839 
/* Join or leave a source-specific multicast group given as textual addresses.
 * `multicast_addr` is tried as IPv4 first and as IPv6 second; `source_addr`
 * must then parse in the same family.  Parse errors are returned as reported
 * by uv_ip4_addr()/uv_ip6_addr().  On OpenBSD and NetBSD the function is
 * compiled to return UV_ENOSYS.
 */
int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) && !defined(__NetBSD__)
  struct sockaddr_storage group;
  struct sockaddr_storage source;
  int rc;

  /* IPv4 group address. */
  if (uv_ip4_addr(multicast_addr, 0, (struct sockaddr_in*) &group) == 0) {
    rc = uv_ip4_addr(source_addr, 0, (struct sockaddr_in*) &source);
    if (rc != 0)
      return rc;

    return uv__udp_set_source_membership4(handle,
                                          (struct sockaddr_in*) &group,
                                          interface_addr,
                                          (struct sockaddr_in*) &source,
                                          membership);
  }

  /* Not IPv4: fall back to IPv6. */
  rc = uv_ip6_addr(multicast_addr, 0, (struct sockaddr_in6*) &group);
  if (rc != 0)
    return rc;

  rc = uv_ip6_addr(source_addr, 0, (struct sockaddr_in6*) &source);
  if (rc != 0)
    return rc;

  return uv__udp_set_source_membership6(handle,
                                        (struct sockaddr_in6*) &group,
                                        interface_addr,
                                        (struct sockaddr_in6*) &source,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}
886 
887 
/* Set a socket option on the handle's descriptor, using `option6` at level
 * IPPROTO_IPV6 when the handle is flagged UV_HANDLE_IPV6 and `option4` at
 * level IPPROTO_IP otherwise.  Returns 0 on success or a negated errno.
 */
static int uv__setsockopt(uv_udp_t* handle,
                         int option4,
                         int option6,
                         const void* val,
                         size_t size) {
  int optname;
  int level;

  if (handle->flags & UV_HANDLE_IPV6) {
    level = IPPROTO_IPV6;
    optname = option6;
  } else {
    level = IPPROTO_IP;
    optname = option4;
  }

  if (setsockopt(handle->io_watcher.fd, level, optname, val, size))
    return UV__ERR(errno);

  return 0;
}
912 
/* Set an option whose value must lie in 0..255.  The value is handed to
 * uv__setsockopt() as a char on Solaris, AIX and z/OS, as an unsigned char on
 * OpenBSD, and as an int everywhere else.  Out-of-range values yield
 * UV_EINVAL.
 */
static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char optval;
#elif defined(__OpenBSD__)
  unsigned char optval;
#else
  int optval;
#endif

  if (val < 0 || val > 255)
    return UV_EINVAL;

  optval = val;
  return uv__setsockopt(handle, option4, option6, &optval, sizeof(optval));
}
930 
931 
/* Set SO_BROADCAST on the handle's socket to `on`.  Returns 0 on success or a
 * negated errno.
 */
int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  int rc;

  rc = setsockopt(handle->io_watcher.fd,
                  SOL_SOCKET,
                  SO_BROADCAST,
                  &on,
                  sizeof(on));
  if (rc != 0)
    return UV__ERR(errno);

  return 0;
}
943 
944 
/* Set the unicast TTL (IPv4) or hop limit (IPv6) of the handle's socket.
 * `ttl` must be within 1..255, otherwise UV_EINVAL is returned.  On z/OS,
 * setting the TTL of an IPv4 socket is not supported and yields UV_ENOTSUP.
 *
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, so the option size is
 * hardcoded there (and on AIX, OpenBSD and z/OS); other platforms go through
 * the general uv__setsockopt_maybe_char() call.
 */
int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (!(ttl >= 1 && ttl <= 255))
    return UV_EINVAL;

#if defined(__MVS__)
  /* zOS does not support setting ttl for IPv4 */
  if ((handle->flags & UV_HANDLE_IPV6) == 0)
    return UV_ENOTSUP;
#endif

#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  return uv__setsockopt(handle, IP_TTL, IPV6_UNICAST_HOPS, &ttl, sizeof(ttl));
#else
  return uv__setsockopt_maybe_char(handle, IP_TTL, IPV6_UNICAST_HOPS, ttl);
#endif
}
980 
981 
/* Set the multicast TTL / hop limit for `handle`.
 *
 * Both IP_MULTICAST_TTL and IPV6_MULTICAST_HOPS are passed to the setsockopt
 * helper. Returns whatever that helper returns.
 */
int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  /*
   * On Solaris and derivatives such as SmartOS, IPV6_MULTICAST_HOPS takes an
   * option of length sizeof(int) while IP_MULTICAST_TTL takes sizeof(char).
   * The IPv6 case therefore hardcodes the int size; AIX, OpenBSD and z/OS
   * share this path. Anything else falls through to the general call below.
   */
  if ((handle->flags & UV_HANDLE_IPV6) != 0) {
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
  }
#endif /* __sun || _AIX || __OpenBSD__ || __MVS__ */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}
1005 
1006 
/* Set the multicast loopback option for `handle` to `on`.
 *
 * Both IP_MULTICAST_LOOP and IPV6_MULTICAST_LOOP are passed to the setsockopt
 * helper. Returns whatever that helper returns.
 */
int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  /*
   * On Solaris and derivatives such as SmartOS, IPV6_MULTICAST_LOOP takes an
   * option of length sizeof(int) while IP_MULTICAST_LOOP takes sizeof(char).
   * The IPv6 case therefore hardcodes the int size; AIX, OpenBSD and z/OS
   * share this path. Anything else falls through to the general call below.
   */
  if ((handle->flags & UV_HANDLE_IPV6) != 0) {
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
  }
#endif /* __sun || _AIX || __OpenBSD__ || __MVS__ */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}
1030 
/* Select the outgoing multicast interface for `handle`.
 *
 * `interface_addr` is a textual IPv4 or IPv6 address, or NULL. With NULL the
 * option is set from a zeroed address: INADDR_ANY for IPv4 handles, scope id
 * 0 for handles flagged UV_HANDLE_IPV6.
 *
 * Returns 0 on success, UV_EINVAL when the string parses as neither IPv4 nor
 * IPv6, or UV__ERR(errno) when setsockopt() fails.
 */
int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
  struct sockaddr_storage addr_st;
  struct sockaddr_in* addr4;
  struct sockaddr_in6* addr6;
  int rc;

  /* Two typed views onto the same storage. */
  addr4 = (struct sockaddr_in*) &addr_st;
  addr6 = (struct sockaddr_in6*) &addr_st;

  if (interface_addr == NULL) {
    memset(&addr_st, 0, sizeof addr_st);
    if (handle->flags & UV_HANDLE_IPV6) {
      addr_st.ss_family = AF_INET6;
      addr6->sin6_scope_id = 0;
    } else {
      addr_st.ss_family = AF_INET;
      addr4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, addr4) != 0 &&
             uv_ip6_addr(interface_addr, 0, addr6) != 0) {
    /* IPv4 parsing is tried first; IPv6 only if that failed. */
    return UV_EINVAL;
  }

  switch (addr_st.ss_family) {
  case AF_INET:
    rc = setsockopt(handle->io_watcher.fd,
                    IPPROTO_IP,
                    IP_MULTICAST_IF,
                    (void*) &addr4->sin_addr,
                    sizeof(addr4->sin_addr));
    break;

  case AF_INET6:
    /* The IPv6 option is set from the scope id, not the address itself. */
    rc = setsockopt(handle->io_watcher.fd,
                    IPPROTO_IPV6,
                    IPV6_MULTICAST_IF,
                    &addr6->sin6_scope_id,
                    sizeof(addr6->sin6_scope_id));
    break;

  default:
    assert(0 && "unexpected address family");
    abort();
  }

  if (rc == -1)
    return UV__ERR(errno);

  return 0;
}
1079 
uv_udp_getpeername(const uv_udp_t * handle,struct sockaddr * name,int * namelen)1080 int uv_udp_getpeername(const uv_udp_t* handle,
1081                        struct sockaddr* name,
1082                        int* namelen) {
1083 
1084   return uv__getsockpeername((const uv_handle_t*) handle,
1085                              getpeername,
1086                              name,
1087                              namelen);
1088 }
1089 
uv_udp_getsockname(const uv_udp_t * handle,struct sockaddr * name,int * namelen)1090 int uv_udp_getsockname(const uv_udp_t* handle,
1091                        struct sockaddr* name,
1092                        int* namelen) {
1093 
1094   return uv__getsockpeername((const uv_handle_t*) handle,
1095                              getsockname,
1096                              name,
1097                              namelen);
1098 }
1099 
1100 
/* Begin receiving on `handle`.
 *
 * Returns UV_EINVAL if either callback is NULL, UV_EALREADY if the watcher is
 * already active for POLLIN, or the error from the deferred bind. On success
 * the callbacks are stored on the handle, POLLIN is started on the watcher,
 * the handle is started, and 0 is returned.
 */
int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int rc;

  if (!alloc_cb || !recv_cb)
    return UV_EINVAL;

  /* FIXME(bnoordhuis) Should be UV_EBUSY. */
  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;

  /* Deferred bind is requested with AF_INET and flags 0. */
  rc = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (rc != 0)
    return rc;

  handle->alloc_cb = alloc_cb;
  handle->recv_cb = recv_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}
1124 
1125 
/* Stop receiving on `handle`: stop POLLIN on the watcher, stop the handle
 * unless the watcher is still active for POLLOUT, and clear both receive
 * callbacks. Always returns 0.
 */
int uv__udp_recv_stop(uv_udp_t* handle) {
  int still_writing;

  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  /* The handle is left started while POLLOUT is still being watched. */
  still_writing = uv__io_active(&handle->io_watcher, POLLOUT);
  if (!still_writing) {
    uv__handle_stop(handle);
  }

  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;

  return 0;
}
1137