1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv.h"
23 #include "internal.h"
24
25 #include <assert.h>
26 #include <string.h>
27 #include <errno.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #if defined(__MVS__)
31 #include <xti.h>
32 #endif
33 #include <sys/un.h>
34
35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
37 #endif
38
39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
41 #endif
42
43
44 static void uv__udp_run_completed(uv_udp_t* handle);
45 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
46 static void uv__udp_recvmsg(uv_udp_t* handle);
47 static void uv__udp_sendmsg(uv_udp_t* handle);
48 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
49 int domain,
50 unsigned int flags);
51
52
/* First stage of closing a UDP handle: stop the I/O watcher and release
 * the file descriptor immediately so the kernel can reuse it.  The rest of
 * the teardown (failing pending sends) happens in uv__udp_finish_close().
 */
void uv__udp_close(uv_udp_t* handle) {
  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  if (handle->io_watcher.fd != -1) {
    uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }
}
62
63
/* Final stage of closing: cancel every still-pending send request and run
 * their callbacks with UV_ECANCELED, then clear the receive callbacks.
 * Runs after uv__udp_close() has already closed the descriptor.
 */
void uv__udp_finish_close(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  /* Move every unsent request onto the completed queue with a canceled
   * status so its send_cb still fires exactly once. */
  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    req->status = UV_ECANCELED;
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }

  uv__udp_run_completed(handle);

  /* uv__udp_run_completed() adjusts the accounting for each request it
   * retires, so both counters must be back to zero here. */
  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Now tear down the handle. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
  /* but _do not_ touch close_cb */
}
90
91
/* Drain the write_completed_queue: for each retired request, release its
 * buffer copy, update the queued-bytes/requests accounting and invoke its
 * send callback.  The UV_HANDLE_UDP_PROCESSING flag prevents re-entrant
 * draining when a callback issues another send.
 */
static void uv__udp_run_completed(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
    q = QUEUE_HEAD(&handle->write_completed_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop, req);

    /* Undo the accounting done when the request was queued. */
    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    handle->send_queue_count--;

    /* The bufs array was heap-allocated only when it did not fit in the
     * small inline bufsml array. */
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;

    if (req->send_cb == NULL)
      continue;

    /* req->status >= 0 == bytes written
     * req->status < 0 == errno
     */
    if (req->status >= 0)
      req->send_cb(req, 0);
    else
      req->send_cb(req, req->status);
  }

  if (QUEUE_EMPTY(&handle->write_queue)) {
    /* Pending queue and completion queue empty, stop watcher. */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}
134
135
/* Event-loop callback for the handle's fd: dispatch readability to the
 * receive path and writability to the send/flush path.
 */
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* handle;

  handle = container_of(w, uv_udp_t, io_watcher);
  assert(handle->type == UV_UDP);

  if (revents & POLLIN)
    uv__udp_recvmsg(handle);

  if (revents & POLLOUT) {
    uv__udp_sendmsg(handle);
    uv__udp_run_completed(handle);
  }
}
150
151
/* Read incoming datagrams and deliver each one via recv_cb.  For every
 * datagram a fresh buffer is requested from alloc_cb; EAGAIN is reported
 * as a zero-length read with a NULL peer address.
 */
static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage peer;
  struct msghdr h;
  ssize_t nread;
  uv_buf_t buf;
  int flags;
  int count;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User could not supply a buffer; report and bail out. */
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

    memset(&h, 0, sizeof(h));
    memset(&peer, 0, sizeof(peer));
    h.msg_name = &peer;
    h.msg_namelen = sizeof(peer);
    h.msg_iov = (void*) &buf;
    h.msg_iovlen = 1;

    /* Retry if interrupted by a signal. */
    do {
      nread = recvmsg(handle->io_watcher.fd, &h, 0);
    }
    while (nread == -1 && errno == EINTR);

    if (nread == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        /* No more data right now; a zero read with NULL addr signals EOF
         * of this burst, not an error. */
        handle->recv_cb(handle, 0, &buf, NULL, 0);
      else
        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }
    else {
      flags = 0;
      if (h.msg_flags & MSG_TRUNC)
        /* Datagram was larger than the buffer; tell the user it is partial. */
        flags |= UV_UDP_PARTIAL;

      handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
    }
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && count-- > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}
209
210
/* Flush the write_queue: try to send each queued request with sendmsg().
 * Stops early on EAGAIN/ENOBUFS (socket buffer full); finished requests
 * are moved to the completed queue and their callbacks run later from
 * uv__udp_run_completed().
 */
static void uv__udp_sendmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;
  struct msghdr h;
  ssize_t size;

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    assert(q != NULL);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    memset(&h, 0, sizeof h);
    /* AF_UNSPEC marks a connected-mode send: no destination address. */
    if (req->addr.ss_family == AF_UNSPEC) {
      h.msg_name = NULL;
      h.msg_namelen = 0;
    } else {
      h.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        h.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        h.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        h.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h.msg_iov = (struct iovec*) req->bufs;
    h.msg_iovlen = req->nbufs;

    /* Retry if interrupted by a signal. */
    do {
      size = sendmsg(handle->io_watcher.fd, &h, 0);
    } while (size == -1 && errno == EINTR);

    if (size == -1) {
      /* Transient: leave the request queued and wait for POLLOUT. */
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
        break;
    }

    req->status = (size == -1 ? UV__ERR(errno) : size);

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    uv__io_feed(handle->loop, &handle->io_watcher);
  }
}
265
266
267 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
268 * refinements for programs that use multicast.
269 *
270 * Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
271 * are different from the BSDs: it _shares_ the port rather than steal it
272 * from the current listener. While useful, it's not something we can emulate
273 * on other platforms so we don't enable it.
274 *
275 * zOS does not support getsockname with SO_REUSEPORT option when using
276 * AF_UNIX.
277 */
/* Enable address/port reuse on `fd`, choosing SO_REUSEPORT or SO_REUSEADDR
 * per platform (see the comment block above).  Returns 0 or a negative
 * libuv error code.
 */
static int uv__set_reuse(int fd) {
  int yes;
  yes = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  /* zOS: SO_REUSEPORT is not usable on AF_UNIX sockets, so query the
   * socket's family first and fall back to SO_REUSEADDR for AF_UNIX. */
  struct sockaddr_in sockfd;
  unsigned int sockfd_len = sizeof(sockfd);
  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
      return UV__ERR(errno);
  if (sockfd.sin_family == AF_UNIX) {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  } else {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
      return UV__ERR(errno);
  }
#elif defined(SO_REUSEPORT) && !defined(__linux__)
  /* BSDs: SO_REUSEPORT implies SO_REUSEADDR. */
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
    return UV__ERR(errno);
#else
  /* Linux and platforms without SO_REUSEPORT. */
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
    return UV__ERR(errno);
#endif

  return 0;
}
304
305
/* Bind the handle to `addr`, creating the socket lazily if none exists yet.
 * `flags` may contain UV_UDP_IPV6ONLY and/or UV_UDP_REUSEADDR.
 * Returns 0 or a negative libuv error code.
 */
int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int yes;
  int fd;

  /* Check for bad flags. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR))
    return UV_EINVAL;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  /* Create the socket on first bind; a later uv__udp_close() owns it. */
  fd = handle->io_watcher.fd;
  if (fd == -1) {
    err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (err < 0)
      return err;
    fd = err;
    handle->io_watcher.fd = fd;
  }

  if (flags & UV_UDP_REUSEADDR) {
    err = uv__set_reuse(fd);
    if (err)
      return err;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    yes = 1;
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
      err = UV__ERR(errno);
      return err;
    }
#else
    err = UV_ENOTSUP;
    return err;
#endif
  }

  if (bind(fd, addr, addrlen)) {
    err = UV__ERR(errno);
    if (errno == EAFNOSUPPORT)
      /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
       * socket created with AF_INET to an AF_INET6 address or vice versa. */
      err = UV_EINVAL;
    return err;
  }

  /* Remember the family so setsockopt helpers pick the right level. */
  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}
365
366
/* Bind the handle to the wildcard address of `domain` if it does not have
 * a socket yet; a no-op when a socket already exists.  Only AF_INET and
 * AF_INET6 are supported.
 */
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union {
    struct sockaddr_in6 in6;
    struct sockaddr_in in;
    struct sockaddr addr;
  } taddr;
  socklen_t addrlen;

  /* Already have a socket; nothing to defer. */
  if (handle->io_watcher.fd != -1)
    return 0;

  memset(&taddr, 0, sizeof(taddr));

  if (domain == AF_INET) {
    taddr.in.sin_family = AF_INET;
    taddr.in.sin_addr.s_addr = INADDR_ANY;
    addrlen = sizeof(taddr.in);
  } else if (domain == AF_INET6) {
    taddr.in6.sin6_family = AF_INET6;
    taddr.in6.sin6_addr = in6addr_any;
    addrlen = sizeof(taddr.in6);
  } else {
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
}
406
407
/* Associate the handle with a fixed peer address via connect(2), binding
 * the socket first if necessary.  Subsequent sends may then omit the
 * destination address.  Returns 0 or a negative libuv error code.
 */
int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int err;

  err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (err)
    return err;

  /* Retry if interrupted by a signal. */
  do {
    errno = 0;
    err = connect(handle->io_watcher.fd, addr, addrlen);
  } while (err == -1 && errno == EINTR);

  if (err)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
429
430
/* Dissolve the peer association by connecting to an AF_UNSPEC address.
 * Some platforms report EAFNOSUPPORT for this call even though the
 * disassociation succeeded, so that errno is treated as success.
 */
int uv__udp_disconnect(uv_udp_t* handle) {
    int r;
    struct sockaddr addr;

    memset(&addr, 0, sizeof(addr));

    addr.sa_family = AF_UNSPEC;

    /* Retry if interrupted by a signal. */
    do {
      errno = 0;
      r = connect(handle->io_watcher.fd, &addr, sizeof(addr));
    } while (r == -1 && errno == EINTR);

    if (r == -1 && errno != EAFNOSUPPORT)
      return UV__ERR(errno);

    handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
    return 0;
}
450
451
/* Queue an asynchronous send.  The bufs array is copied into the request
 * (heap-allocated when it exceeds the small inline array); an immediate
 * non-blocking send is attempted when nothing else is queued.  `addr` may
 * be NULL for a connected handle.  Returns 0 or a negative error code.
 */
int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int err;
  int empty_queue;

  assert(nbufs > 0);

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  empty_queue = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->addr));
  /* AF_UNSPEC marks a connected-mode send; see uv__udp_sendmsg(). */
  if (addr == NULL)
    req->addr.ss_family = AF_UNSPEC;
  else
    memcpy(&req->addr, addr, addrlen);
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL) {
    uv__req_unregister(handle->loop, req);
    return UV_ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  /* Only try an inline send when the queue was empty and we are not being
   * called re-entrantly from a send callback. */
  if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
     * away. In such cases the `io_watcher` has to be queued for asynchronous
     * write.
     */
    if (!QUEUE_EMPTY(&handle->write_queue))
      uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  } else {
    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  }

  return 0;
}
516
517
/* Synchronous best-effort send.  Fails with UV_EAGAIN if sends are already
 * queued or the socket buffer is full; otherwise returns the number of
 * bytes sent or a negative error code.
 */
int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int err;
  struct msghdr h;
  ssize_t size;

  assert(nbufs > 0);

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  } else {
    /* Without a destination address the socket must be connected. */
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  }

  memset(&h, 0, sizeof h);
  h.msg_name = (struct sockaddr*) addr;
  h.msg_namelen = addrlen;
  h.msg_iov = (struct iovec*) bufs;
  h.msg_iovlen = nbufs;

  /* Retry if interrupted by a signal. */
  do {
    size = sendmsg(handle->io_watcher.fd, &h, 0);
  } while (size == -1 && errno == EINTR);

  if (size == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return UV_EAGAIN;
    else
      return UV__ERR(errno);
  }

  return size;
}
560
561
/* Join or leave an IPv4 multicast group.  `interface_addr` selects the
 * local interface by its dotted-quad address; NULL means any interface.
 * Returns 0 or a negative libuv error code.
 */
static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq mreq;
  int optname;
  int rc;

  memset(&mreq, 0, sizeof(mreq));

  /* Pick the local interface; default to "any". */
  if (interface_addr == NULL) {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  } else {
    rc = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (rc)
      return rc;
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
    /* zOS reports a missing multicast-capable device as ENXIO. */
    if (errno == ENXIO)
      return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}
607
608
/* Join or leave an IPv6 multicast group.  The interface is selected via
 * the scope id parsed from `interface_addr`; NULL means any interface.
 * Returns 0 or a negative libuv error code.
 */
static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  int optname;
  struct ipv6_mreq mreq;
  struct sockaddr_in6 addr6;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    if (uv_ip6_addr(interface_addr, 0, &addr6))
      return UV_EINVAL;
    /* Only the scope id identifies the interface for IPv6 membership. */
    mreq.ipv6mr_interface = addr6.sin6_scope_id;
  } else {
    mreq.ipv6mr_interface = 0;
  }

  mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IPV6_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IPV6_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
  /* zOS reports a missing multicast-capable device as ENXIO. */
  if (errno == ENXIO)
    return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}
654
655
656 #if !defined(__OpenBSD__) && !defined(__NetBSD__)
/* Join or leave an IPv4 source-specific multicast group (only datagrams
 * from `source_addr` are accepted).  Binds the socket lazily first.
 * Returns 0 or a negative libuv error code.
 */
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source mreq;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  /* Pick the local interface; default to "any". */
  if (interface_addr != NULL) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_SOURCE_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_SOURCE_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
700
701
/* Join or leave an IPv6 source-specific multicast group using the
 * protocol-independent group_source_req API.  Binds the socket lazily
 * first.  Returns 0 or a negative libuv error code.
 */
static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req mreq;
  struct sockaddr_in6 addr6;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_ip6_addr(interface_addr, 0, &addr6);
    if (err)
      return err;
    /* Only the scope id identifies the interface for IPv6 membership. */
    mreq.gsr_interface = addr6.sin6_scope_id;
  } else {
    mreq.gsr_interface = 0;
  }

  memcpy(&mreq.gsr_group, multicast_addr, sizeof(mreq.gsr_group));
  memcpy(&mreq.gsr_source, source_addr, sizeof(mreq.gsr_source));

  if (membership == UV_JOIN_GROUP)
    optname = MCAST_JOIN_SOURCE_GROUP;
  else if (membership == UV_LEAVE_GROUP)
    optname = MCAST_LEAVE_SOURCE_GROUP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
747 #endif
748
749
/* Initialize a UDP handle.  The low byte of `flags` optionally carries an
 * address family; AF_UNSPEC defers socket creation until first bind/send.
 * Returns 0 or UV_EINVAL for bad flags/domain.
 */
int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) {
  int domain;
  int err;
  int fd;

  /* Use the lower 8 bits for the domain */
  domain = flags & 0xFF;
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  /* No other flag bits are defined for this function. */
  if (flags & ~0xFF)
    return UV_EINVAL;

  if (domain != AF_UNSPEC) {
    err = uv__socket(domain, SOCK_DGRAM, 0);
    if (err < 0)
      return err;
    fd = err;
  } else {
    /* Socket creation is deferred; see uv__udp_maybe_deferred_bind(). */
    fd = -1;
  }

  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
  handle->send_queue_size = 0;
  handle->send_queue_count = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, fd);
  QUEUE_INIT(&handle->write_queue);
  QUEUE_INIT(&handle->write_completed_queue);

  return 0;
}
783
784
/* Initialize a UDP handle with no fixed address family (lazy socket). */
int uv_udp_init(uv_loop_t* loop, uv_udp_t* handle) {
  return uv_udp_init_ex(loop, handle, AF_UNSPEC);
}
788
789
/* Adopt an existing socket descriptor: make it non-blocking, enable
 * address reuse and attach it to the handle.  Fails with UV_EBUSY when the
 * handle already owns a socket, UV_EEXIST when the fd is already tracked
 * by the loop.
 */
int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int err;

  /* Check for already active socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  err = uv__nonblock(sock, 1);
  if (err)
    return err;

  err = uv__set_reuse(sock);
  if (err)
    return err;

  handle->io_watcher.fd = sock;
  /* Preserve a pre-existing peer association on the adopted socket. */
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}
814
815
/* Join or leave a multicast group given as a textual address.  The string
 * is parsed as IPv4 first, then IPv6; the socket is lazily bound to the
 * matching wildcard address before the membership change.
 */
int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  struct sockaddr_in addr4;
  struct sockaddr_in6 addr6;
  int rc;

  if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
    rc = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (rc)
      return rc;
    return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
  }

  if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
    rc = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
    if (rc)
      return rc;
    return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
  }

  /* Neither a valid IPv4 nor IPv6 literal. */
  return UV_EINVAL;
}
838
839
/* Source-specific multicast join/leave with textual addresses.  Both the
 * group and source addresses must be of the same family; not supported on
 * OpenBSD/NetBSD (returns UV_ENOSYS there).
 */
int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) && !defined(__NetBSD__)
  int err;
  struct sockaddr_storage mcast_addr;
  struct sockaddr_in* mcast_addr4;
  struct sockaddr_in6* mcast_addr6;
  struct sockaddr_storage src_addr;
  struct sockaddr_in* src_addr4;
  struct sockaddr_in6* src_addr6;

  /* Aliases into the storage blobs, reused for both parse attempts. */
  mcast_addr4 = (struct sockaddr_in*)&mcast_addr;
  mcast_addr6 = (struct sockaddr_in6*)&mcast_addr;
  src_addr4 = (struct sockaddr_in*)&src_addr;
  src_addr6 = (struct sockaddr_in6*)&src_addr;

  /* Try IPv4 first; fall back to IPv6 when the group address is not a
   * valid IPv4 literal. */
  err = uv_ip4_addr(multicast_addr, 0, mcast_addr4);
  if (err) {
    err = uv_ip6_addr(multicast_addr, 0, mcast_addr6);
    if (err)
      return err;
    err = uv_ip6_addr(source_addr, 0, src_addr6);
    if (err)
      return err;
    return uv__udp_set_source_membership6(handle,
                                          mcast_addr6,
                                          interface_addr,
                                          src_addr6,
                                          membership);
  }

  err = uv_ip4_addr(source_addr, 0, src_addr4);
  if (err)
    return err;
  return uv__udp_set_source_membership4(handle,
                                        mcast_addr4,
                                        interface_addr,
                                        src_addr4,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}
886
887
/* Apply a socket option, choosing the IPv4 or IPv6 variant (and protocol
 * level) based on the handle's address family.  Returns 0 or a negative
 * libuv error code.
 */
static int uv__setsockopt(uv_udp_t* handle,
                          int option4,
                          int option6,
                          const void* val,
                          size_t size) {
  int level;
  int option;

  if (handle->flags & UV_HANDLE_IPV6) {
    level = IPPROTO_IPV6;
    option = option6;
  } else {
    level = IPPROTO_IP;
    option = option4;
  }

  if (setsockopt(handle->io_watcher.fd, level, option, val, size))
    return UV__ERR(errno);

  return 0;
}
912
/* Like uv__setsockopt() but for options whose argument type varies by
 * platform (char on Solaris/AIX/zOS, unsigned char on OpenBSD, int
 * elsewhere).  `val` must be in [0, 255].
 */
static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char arg;
#elif defined(__OpenBSD__)
  unsigned char arg;
#else
  int arg;
#endif

  /* Reject values that would not fit the narrow per-platform types. */
  if (val < 0 || val > 255)
    return UV_EINVAL;

  arg = val;
  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
}
930
931
/* Enable or disable sending to broadcast addresses (`on` nonzero enables). */
int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  int rc;

  rc = setsockopt(handle->io_watcher.fd,
                  SOL_SOCKET,
                  SO_BROADCAST,
                  &on,
                  sizeof(on));
  if (rc != 0)
    return UV__ERR(errno);

  return 0;
}
943
944
/* Set the unicast TTL (IPv4) / hop limit (IPv6) for outgoing packets.
 * Valid range is 1..255.  Returns 0 or a negative libuv error code.
 */
int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (ttl < 1 || ttl > 255)
    return UV_EINVAL;

#if defined(__MVS__)
  if (!(handle->flags & UV_HANDLE_IPV6))
    return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
#endif

/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
 * so hardcode the size of these options on this platform,
 * and use the general uv__setsockopt_maybe_char call on other platforms.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)

  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));

#else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
       defined(__MVS__)) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);

#endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
        defined(__MVS__) */
}
980
981
/* Set the TTL / hop limit for outgoing multicast packets.
 * Range checking (0..255) is performed by uv__setsockopt_maybe_char().
 */
int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}
1005
1006
/* Enable or disable local loopback of outgoing multicast packets
 * (`on` nonzero enables).
 */
int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
#endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
    defined(__MVS__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}
1030
/* Select the outgoing interface for multicast packets.  For IPv4 the
 * interface is identified by its local address; for IPv6 by the scope id
 * parsed from `interface_addr`.  NULL resets to the default interface.
 */
int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
  struct sockaddr_storage addr_st;
  struct sockaddr_in* addr4;
  struct sockaddr_in6* addr6;

  /* Typed views into the same storage blob. */
  addr4 = (struct sockaddr_in*) &addr_st;
  addr6 = (struct sockaddr_in6*) &addr_st;

  if (!interface_addr) {
    /* No interface given: build a wildcard address of the handle's family. */
    memset(&addr_st, 0, sizeof addr_st);
    if (handle->flags & UV_HANDLE_IPV6) {
      addr_st.ss_family = AF_INET6;
      addr6->sin6_scope_id = 0;
    } else {
      addr_st.ss_family = AF_INET;
      addr4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
    /* nothing, address was parsed */
  } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
    /* nothing, address was parsed */
  } else {
    return UV_EINVAL;
  }

  if (addr_st.ss_family == AF_INET) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   IP_MULTICAST_IF,
                   (void*) &addr4->sin_addr,
                   sizeof(addr4->sin_addr)) == -1) {
      return UV__ERR(errno);
    }
  } else if (addr_st.ss_family == AF_INET6) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   IPV6_MULTICAST_IF,
                   &addr6->sin6_scope_id,
                   sizeof(addr6->sin6_scope_id)) == -1) {
      return UV__ERR(errno);
    }
  } else {
    assert(0 && "unexpected address family");
    abort();
  }

  return 0;
}
1079
/* Fetch the address of the connected peer into `name`/`namelen`. */
int uv_udp_getpeername(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getpeername,
                             name,
                             namelen);
}
1089
/* Fetch the local address the socket is bound to into `name`/`namelen`. */
int uv_udp_getsockname(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getsockname,
                             name,
                             namelen);
}
1099
1100
/* Start receiving datagrams: bind lazily (IPv4 wildcard) if needed, store
 * the callbacks and begin polling for readability.  Both callbacks are
 * mandatory.  Returns 0 or a negative libuv error code.
 */
int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int err;

  if (alloc_cb == NULL || recv_cb == NULL)
    return UV_EINVAL;

  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (err)
    return err;

  handle->alloc_cb = alloc_cb;
  handle->recv_cb = recv_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}
1124
1125
/* Stop receiving: cease polling for readability and clear the callbacks.
 * The handle stays active only while a write is still pending.
 */
int uv__udp_recv_stop(uv_udp_t* handle) {
  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  if (!uv__io_active(&handle->io_watcher, POLLOUT))
    uv__handle_stop(handle);

  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;

  return 0;
}
1137