/*
    Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
    Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
    Copyright 2016 Garrett D'Amore <garrett@damore.org>

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom
    the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included
    in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.
*/

#include "../nn.h"
#include "../transport.h"
#include "../protocol.h"

#include "global.h"
#include "sock.h"
#include "ep.h"

#include "../aio/pool.h"
#include "../aio/timer.h"

#include "../utils/err.h"
#include "../utils/alloc.h"
#include "../utils/mutex.h"
#include "../utils/condvar.h"
#include "../utils/once.h"
#include "../utils/list.h"
#include "../utils/cont.h"
#include "../utils/random.h"
#include "../utils/chunk.h"
#include "../utils/msg.h"
#include "../utils/attr.h"

#include "../pubsub.h"
#include "../pipeline.h"

#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#if defined NN_HAVE_WINDOWS
#include "../utils/win.h"
#else
#include <unistd.h>
#endif
/* Max number of concurrent SP sockets. Configurable at build time. */
#ifndef NN_MAX_SOCKETS
#define NN_MAX_SOCKETS 512
#endif

/* To save some space, the list of unused socket slots uses uint16_t integers
   to refer to individual sockets. If there is ever a need for more than
   0x10000 sockets, the type should be changed to uint32_t or int. */
CT_ASSERT (NN_MAX_SOCKETS <= 0x10000);

#define NN_CTX_FLAG_TERMED 1
#define NN_CTX_FLAG_TERMING 2
#define NN_CTX_FLAG_TERM (NN_CTX_FLAG_TERMED | NN_CTX_FLAG_TERMING)

#define NN_GLOBAL_SRC_STAT_TIMER 1

#define NN_GLOBAL_STATE_IDLE 1
#define NN_GLOBAL_STATE_ACTIVE 2
#define NN_GLOBAL_STATE_STOPPING_TIMER 3

/* We could put these in an external header file, but there really is no
   need to. We are the only thing that needs them. */
extern struct nn_socktype nn_pair_socktype;
extern struct nn_socktype nn_xpair_socktype;
extern struct nn_socktype nn_pub_socktype;
extern struct nn_socktype nn_sub_socktype;
extern struct nn_socktype nn_xpub_socktype;
extern struct nn_socktype nn_xsub_socktype;
extern struct nn_socktype nn_rep_socktype;
extern struct nn_socktype nn_req_socktype;
extern struct nn_socktype nn_xrep_socktype;
extern struct nn_socktype nn_xreq_socktype;
extern struct nn_socktype nn_push_socktype;
extern struct nn_socktype nn_xpush_socktype;
extern struct nn_socktype nn_pull_socktype;
extern struct nn_socktype nn_xpull_socktype;
extern struct nn_socktype nn_respondent_socktype;
extern struct nn_socktype nn_surveyor_socktype;
extern struct nn_socktype nn_xrespondent_socktype;
extern struct nn_socktype nn_xsurveyor_socktype;
extern struct nn_socktype nn_bus_socktype;
extern struct nn_socktype nn_xbus_socktype;

/* Array of known socket types. */
const struct nn_socktype *nn_socktypes[] = {
    &nn_pair_socktype,
    &nn_xpair_socktype,
    &nn_pub_socktype,
    &nn_sub_socktype,
    &nn_xpub_socktype,
    &nn_xsub_socktype,
    &nn_rep_socktype,
    &nn_req_socktype,
    &nn_xrep_socktype,
    &nn_xreq_socktype,
    &nn_push_socktype,
    &nn_xpush_socktype,
    &nn_pull_socktype,
    &nn_xpull_socktype,
    &nn_respondent_socktype,
    &nn_surveyor_socktype,
    &nn_xrespondent_socktype,
    &nn_xsurveyor_socktype,
    &nn_bus_socktype,
    &nn_xbus_socktype,
    NULL,
};

/* As with protocols, we could have these in a header file, but we are the
   only consumer, so just declare them inline. */

extern struct nn_transport nn_inproc;
extern struct nn_transport nn_ipc;
extern struct nn_transport nn_tcp;
extern struct nn_transport nn_ws;

const struct nn_transport *nn_transports[] = {
    &nn_inproc,
    &nn_ipc,
    &nn_tcp,
    &nn_ws,
    NULL,
};

struct nn_global {

    /* The global table of existing sockets. The descriptor representing
       the socket is the index into this table. This pointer is also used to
       find out whether the context is initialised. If it is NULL, the
       context is uninitialised. */
    struct nn_sock **socks;

    /* Stack of unused file descriptors. */
    uint16_t *unused;

    /* Number of actual open sockets in the socket table. */
    size_t nsocks;

    /* Combination of the flags listed above. */
    int flags;

    /* Pool of worker threads. */
    struct nn_pool pool;

    /* Timer and other machinery for submitting statistics. */
    int state;

    int print_errors;

    int inited;
    nn_mutex_t lock;
    nn_condvar_t cond;
};

/* Singleton object containing the global state of the library. */
static struct nn_global self;
static nn_once_t once = NN_ONCE_INITIALIZER;


/* Context creation- and termination-related private functions. */
static void nn_global_init (void);
static void nn_global_term (void);

/* Private function that unifies nn_bind and nn_connect functionality.
   It returns the ID of the newly created endpoint. */
static int nn_global_create_ep (struct nn_sock *, const char *addr, int bind);

/* Private socket creator which doesn't initialize global state and
   does no locking by itself. */
static int nn_global_create_socket (int domain, int protocol);

/* Socket holds. */
static int nn_global_hold_socket (struct nn_sock **sockp, int s);
static int nn_global_hold_socket_locked (struct nn_sock **sockp, int s);
static void nn_global_rele_socket (struct nn_sock *);
int nn_errno (void)
{
    return nn_err_errno ();
}

const char *nn_strerror (int errnum)
{
    return nn_err_strerror (errnum);
}

static void nn_global_init (void)
{
    int i;
    char *envvar;

#if defined NN_HAVE_WINDOWS
    int rc;
    WSADATA data;
#endif
    const struct nn_transport *tp;

    /* Check whether the library was already initialised. If so, do nothing. */
    if (self.socks)
        return;

    /* On Windows, initialise the socket library. */
#if defined NN_HAVE_WINDOWS
    rc = WSAStartup (MAKEWORD (2, 2), &data);
    nn_assert (rc == 0);
    nn_assert (LOBYTE (data.wVersion) == 2 &&
        HIBYTE (data.wVersion) == 2);
#endif

    /* Initialise the memory allocation subsystem. */
    nn_alloc_init ();

    /* Seed the pseudo-random number generator. */
    nn_random_seed ();

    /* Allocate the global table of SP sockets. */
    self.socks = nn_alloc ((sizeof (struct nn_sock*) * NN_MAX_SOCKETS) +
        (sizeof (uint16_t) * NN_MAX_SOCKETS), "socket table");
    alloc_assert (self.socks);
    for (i = 0; i != NN_MAX_SOCKETS; ++i)
        self.socks [i] = NULL;
    self.nsocks = 0;
    self.flags = 0;

    /* Print connection and accept errors to stderr. */
    envvar = getenv ("NN_PRINT_ERRORS");
    /* Any non-empty string is treated as true. */
    self.print_errors = envvar && *envvar;

    /* Allocate the stack of unused file descriptors. */
    self.unused = (uint16_t*) (self.socks + NN_MAX_SOCKETS);
    alloc_assert (self.unused);
    for (i = 0; i != NN_MAX_SOCKETS; ++i)
        self.unused [i] = NN_MAX_SOCKETS - i - 1;
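
    /* At this point unused [NN_MAX_SOCKETS - 1] holds 0 and unused [0] holds
       NN_MAX_SOCKETS - 1. nn_global_create_socket pops from the top of this
       stack (index NN_MAX_SOCKETS - self.nsocks - 1), so descriptors are
       handed out in ascending order starting from 0. */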

    /* Initialize transports if needed. */
    for (i = 0; (tp = nn_transports[i]) != NULL; i++) {
        if (tp->init != NULL) {
            tp->init ();
        }
    }

    /* Start the worker threads. */
    nn_pool_init (&self.pool);
}

static void nn_global_term (void)
{
#if defined NN_HAVE_WINDOWS
    int rc;
#endif
    const struct nn_transport *tp;
    int i;

    /* If there are no sockets remaining, uninitialise the global context. */
    nn_assert (self.socks);
    if (self.nsocks > 0)
        return;

    /* Shut down the worker threads. */
    nn_pool_term (&self.pool);

    /* Ask all the transports to deallocate their global resources. */
    for (i = 0; (tp = nn_transports[i]) != NULL; i++) {
        if (tp->term)
            tp->term ();
    }

    /* Final deallocation of the nn_global object itself. */
    nn_free (self.socks);

    /* This marks the global state as uninitialised. */
    self.socks = NULL;

    /* Shut down the memory allocation subsystem. */
    nn_alloc_term ();

    /* On Windows, uninitialise the socket library. */
#if defined NN_HAVE_WINDOWS
    rc = WSACleanup ();
    nn_assert (rc == 0);
#endif
}

void nn_term (void)
{
    int i;

    if (!self.inited) {
        return;
    }

    nn_mutex_lock (&self.lock);
    self.flags |= NN_CTX_FLAG_TERMING;
    nn_mutex_unlock (&self.lock);

    /* Make sure we really close resources; this will cause global
       resources to be freed too when the last socket is closed. */
    for (i = 0; i < NN_MAX_SOCKETS; i++) {
        (void) nn_close (i);
    }

    nn_mutex_lock (&self.lock);
    self.flags |= NN_CTX_FLAG_TERMED;
    self.flags &= ~NN_CTX_FLAG_TERMING;
    nn_condvar_broadcast (&self.cond);
    nn_mutex_unlock (&self.lock);
}
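
/* Illustrative only (grounded in nn_socket below): once nn_term has run,
   NN_CTX_FLAG_TERMED stays set, so attempts to create new sockets fail with
   ETERM until nn_init clears the flag again. A minimal sketch:

       nn_term ();
       assert (nn_socket (AF_SP, NN_PAIR) == -1 && nn_errno () == ETERM);
*/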

static void nn_lib_init (void)
{
    /* This function is executed once to initialize global locks. */
    nn_mutex_init (&self.lock);
    nn_condvar_init (&self.cond);
    self.inited = 1;
}

void nn_init (void)
{
    nn_do_once (&once, nn_lib_init);

    nn_mutex_lock (&self.lock);
    /* Wait for any in-progress term to complete. */
    while (self.flags & NN_CTX_FLAG_TERMING) {
        nn_condvar_wait (&self.cond, &self.lock, -1);
    }
    self.flags &= ~NN_CTX_FLAG_TERMED;
    nn_mutex_unlock (&self.lock);
}

void *nn_allocmsg (size_t size, int type)
{
    int rc;
    void *result;

    rc = nn_chunk_alloc (size, type, &result);
    if (rc == 0)
        return result;
    errno = -rc;
    return NULL;
}

void *nn_reallocmsg (void *msg, size_t size)
{
    int rc;

    rc = nn_chunk_realloc (size, &msg);
    if (rc == 0)
        return msg;
    errno = -rc;
    return NULL;
}

int nn_freemsg (void *msg)
{
    nn_chunk_free (msg);
    return 0;
}
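
/* Illustrative only: the zero-copy pattern these helpers enable, combined
   with the NN_MSG size constant handled in nn_sendmsg below. Assumes a
   valid socket 's':

       void *buf = nn_allocmsg (3, 0);
       memcpy (buf, "foo", 3);
       if (nn_send (s, &buf, NN_MSG, 0) < 0)
           nn_freemsg (buf);

   Per the failure path in nn_sendmsg below, the caller keeps ownership of
   the chunk on error; on success ownership passes down the stack and the
   chunk must not be freed or reused. */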

struct nn_cmsghdr *nn_cmsg_nxthdr_ (const struct nn_msghdr *mhdr,
    const struct nn_cmsghdr *cmsg)
{
    char *data;
    size_t sz;
    struct nn_cmsghdr *next;
    size_t headsz;

    /* Early return if no message is provided. */
    if (nn_slow (mhdr == NULL))
        return NULL;

    /* Get the actual data. */
    if (mhdr->msg_controllen == NN_MSG) {
        data = *((void**) mhdr->msg_control);
        sz = nn_chunk_size (data);
    }
    else {
        data = (char*) mhdr->msg_control;
        sz = mhdr->msg_controllen;
    }

    /* Ancillary data allocation was not even large enough for one element. */
    if (nn_slow (sz < NN_CMSG_SPACE (0)))
        return NULL;

    /* If cmsg is set to NULL we are going to return the first property.
       Otherwise move to the next property. */
    if (!cmsg)
        next = (struct nn_cmsghdr*) data;
    else
        next = (struct nn_cmsghdr*)
            (((char*) cmsg) + NN_CMSG_ALIGN_ (cmsg->cmsg_len));

    /* If there's no space for the next property, treat it as the end
       of the property list. */
    headsz = ((char*) next) - data;
    if (headsz + NN_CMSG_SPACE (0) > sz ||
        headsz + NN_CMSG_ALIGN_ (next->cmsg_len) > sz)
        return NULL;

    /* Success. */
    return next;
}
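
/* Illustrative only: applications normally reach this function through the
   NN_CMSG_FIRSTHDR and NN_CMSG_NXTHDR macros. A minimal sketch of walking
   the ancillary data of a 'struct nn_msghdr mh' filled in by nn_recvmsg
   (process () is a hypothetical handler):

       struct nn_cmsghdr *hdr;
       for (hdr = NN_CMSG_FIRSTHDR (&mh); hdr != NULL;
             hdr = NN_CMSG_NXTHDR (&mh, hdr)) {
           if (hdr->cmsg_level == PROTO_SP && hdr->cmsg_type == SP_HDR)
               process (NN_CMSG_DATA (hdr), hdr->cmsg_len);
       }
*/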

int nn_global_create_socket (int domain, int protocol)
{
    int rc;
    int s;
    int i;
    const struct nn_socktype *socktype;
    struct nn_sock *sock;

    /* This function is called with the global lock held. */

    /* Only AF_SP and AF_SP_RAW domains are supported. */
    if (domain != AF_SP && domain != AF_SP_RAW) {
        return -EAFNOSUPPORT;
    }

    /* If the socket limit was reached, report an error. */
    if (self.nsocks >= NN_MAX_SOCKETS) {
        return -EMFILE;
    }

    /* Find an empty socket slot. */
    s = self.unused [NN_MAX_SOCKETS - self.nsocks - 1];

    /* Find the appropriate socket type. */
    for (i = 0; (socktype = nn_socktypes[i]) != NULL; i++) {
        if (socktype->domain == domain && socktype->protocol == protocol) {

            /* Instantiate the socket. */
            if ((sock = nn_alloc (sizeof (struct nn_sock), "sock")) == NULL)
                return -ENOMEM;
            rc = nn_sock_init (sock, socktype, s);
            if (rc < 0) {
                nn_free (sock);
                return rc;
            }

            /* Adjust the global socket table. */
            self.socks [s] = sock;
            ++self.nsocks;
            return s;
        }
    }
    /* The specified socket type wasn't found. */
    return -EINVAL;
}

int nn_socket (int domain, int protocol)
{
    int rc;

    nn_do_once (&once, nn_lib_init);

    nn_mutex_lock (&self.lock);

    /* If nn_term() was already called, return ETERM. */
    if (nn_slow (self.flags & NN_CTX_FLAG_TERM)) {
        nn_mutex_unlock (&self.lock);
        errno = ETERM;
        return -1;
    }

    /* Make sure that global state is initialised. */
    nn_global_init ();

    rc = nn_global_create_socket (domain, protocol);

    if (rc < 0) {
        nn_global_term ();
        nn_mutex_unlock (&self.lock);
        errno = -rc;
        return -1;
    }

    nn_mutex_unlock (&self.lock);

    return rc;
}
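
/* Illustrative only: the usual creation/teardown pattern from application
   code, assuming the PAIR protocol:

       int s = nn_socket (AF_SP, NN_PAIR);
       if (s < 0)
           fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
       ...
       nn_close (s);
*/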

int nn_close (int s)
{
    int rc;
    struct nn_sock *sock;

    nn_mutex_lock (&self.lock);
    rc = nn_global_hold_socket_locked (&sock, s);
    if (nn_slow (rc < 0)) {
        nn_mutex_unlock (&self.lock);
        errno = -rc;
        return -1;
    }

    /* Start the shutdown process on the socket. This will cause
       all other socket users, as well as endpoints, to begin cleaning up.
       This is done with the lock held to ensure that two instances
       of nn_close can't access the same socket. */
    nn_sock_stop (sock);

    /* We have to drop both the hold we just acquired, as well as
       the original hold, in order for nn_sock_term to complete. */
    nn_sock_rele (sock);
    nn_sock_rele (sock);
    nn_mutex_unlock (&self.lock);

    /* Now clean up. The termination routine below will block until
       all other consumers of the socket have dropped their holds, and
       all endpoints have cleanly exited. */
    rc = nn_sock_term (sock);
    if (nn_slow (rc == -EINTR)) {
        nn_global_rele_socket (sock);
        errno = EINTR;
        return -1;
    }

    /* Remove the socket from the socket table and push its descriptor back
       onto the stack of unused descriptors. */
    nn_mutex_lock (&self.lock);
    self.socks [s] = NULL;
    self.unused [NN_MAX_SOCKETS - self.nsocks] = s;
    --self.nsocks;
    nn_free (sock);

    /* Destroy the global context if there's no socket remaining. */
    nn_global_term ();

    nn_mutex_unlock (&self.lock);

    return 0;
}

int nn_setsockopt (int s, int level, int option, const void *optval,
    size_t optvallen)
{
    int rc;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return -1;
    }

    if (nn_slow (!optval && optvallen)) {
        rc = -EFAULT;
        goto fail;
    }

    rc = nn_sock_setopt (sock, level, option, optval, optvallen);
    if (nn_slow (rc < 0))
        goto fail;
    errnum_assert (rc == 0, -rc);
    nn_global_rele_socket (sock);
    return 0;

fail:
    nn_global_rele_socket (sock);
    errno = -rc;
    return -1;
}

int nn_getsockopt (int s, int level, int option, void *optval,
    size_t *optvallen)
{
    int rc;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return -1;
    }

    if (nn_slow (!optval && optvallen)) {
        rc = -EFAULT;
        goto fail;
    }

    rc = nn_sock_getopt (sock, level, option, optval, optvallen);
    if (nn_slow (rc < 0))
        goto fail;
    errnum_assert (rc == 0, -rc);
    nn_global_rele_socket (sock);
    return 0;

fail:
    nn_global_rele_socket (sock);
    errno = -rc;
    return -1;
}

int nn_bind (int s, const char *addr)
{
    int rc;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (rc < 0) {
        errno = -rc;
        return -1;
    }

    rc = nn_global_create_ep (sock, addr, 1);
    if (nn_slow (rc < 0)) {
        nn_global_rele_socket (sock);
        errno = -rc;
        return -1;
    }

    nn_global_rele_socket (sock);
    return rc;
}

int nn_connect (int s, const char *addr)
{
    int rc;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return -1;
    }

    rc = nn_global_create_ep (sock, addr, 0);
    if (rc < 0) {
        nn_global_rele_socket (sock);
        errno = -rc;
        return -1;
    }

    nn_global_rele_socket (sock);
    return rc;
}
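
/* Illustrative only: both calls take a "scheme://address" string (parsed in
   nn_global_create_ep below) and return a non-negative endpoint ID that can
   later be passed to nn_shutdown:

       int eid = nn_bind (s, "tcp://127.0.0.1:5555");
       ...
       nn_shutdown (s, eid);

   Other schemes registered in nn_transports above include "inproc://name",
   "ipc:///tmp/test.ipc" and "ws://127.0.0.1:5556". */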

int nn_shutdown (int s, int how)
{
    int rc;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return -1;
    }

    rc = nn_sock_rm_ep (sock, how);
    if (nn_slow (rc < 0)) {
        nn_global_rele_socket (sock);
        errno = -rc;
        return -1;
    }
    nn_assert (rc == 0);

    nn_global_rele_socket (sock);
    return 0;
}

int nn_send (int s, const void *buf, size_t len, int flags)
{
    struct nn_iovec iov;
    struct nn_msghdr hdr;

    iov.iov_base = (void*) buf;
    iov.iov_len = len;

    hdr.msg_iov = &iov;
    hdr.msg_iovlen = 1;
    hdr.msg_control = NULL;
    hdr.msg_controllen = 0;

    return nn_sendmsg (s, &hdr, flags);
}

int nn_recv (int s, void *buf, size_t len, int flags)
{
    struct nn_iovec iov;
    struct nn_msghdr hdr;

    iov.iov_base = buf;
    iov.iov_len = len;

    hdr.msg_iov = &iov;
    hdr.msg_iovlen = 1;
    hdr.msg_control = NULL;
    hdr.msg_controllen = 0;

    return nn_recvmsg (s, &hdr, flags);
}
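
/* Illustrative only: nn_send and nn_recv are thin convenience wrappers that
   build a single-element iovec and delegate to nn_sendmsg/nn_recvmsg. A
   minimal non-blocking receive sketch, assuming a connected socket 's' and
   the documented NN_DONTWAIT/EAGAIN semantics:

       char buf [256];
       int n = nn_recv (s, buf, sizeof (buf), NN_DONTWAIT);
       if (n < 0 && nn_errno () == EAGAIN)
           ;   // nothing to read right now
*/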

int nn_sendmsg (int s, const struct nn_msghdr *msghdr, int flags)
{
    int rc;
    size_t sz;
    size_t spsz;
    int i;
    struct nn_iovec *iov;
    struct nn_msg msg;
    void *chunk;
    int nnmsg;
    struct nn_cmsghdr *cmsg;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return -1;
    }

    if (nn_slow (!msghdr)) {
        rc = -EINVAL;
        goto fail;
    }

    if (nn_slow (msghdr->msg_iovlen < 0)) {
        rc = -EMSGSIZE;
        goto fail;
    }

    if (msghdr->msg_iovlen == 1 && msghdr->msg_iov [0].iov_len == NN_MSG) {
        chunk = *(void**) msghdr->msg_iov [0].iov_base;
        if (nn_slow (chunk == NULL)) {
            rc = -EFAULT;
            goto fail;
        }
        sz = nn_chunk_size (chunk);
        nn_msg_init_chunk (&msg, chunk);
        nnmsg = 1;
    }
    else {

        /* Compute the total size of the message. */
        sz = 0;
        for (i = 0; i != msghdr->msg_iovlen; ++i) {
            iov = &msghdr->msg_iov [i];
            if (nn_slow (iov->iov_len == NN_MSG)) {
                rc = -EINVAL;
                goto fail;
            }
            if (nn_slow (!iov->iov_base && iov->iov_len)) {
                rc = -EFAULT;
                goto fail;
            }
            if (nn_slow (sz + iov->iov_len < sz)) {
                rc = -EINVAL;
                goto fail;
            }
            sz += iov->iov_len;
        }

        /* Create a message object from the supplied scatter array. */
        nn_msg_init (&msg, sz);
        sz = 0;
        for (i = 0; i != msghdr->msg_iovlen; ++i) {
            iov = &msghdr->msg_iov [i];
            memcpy (((uint8_t*) nn_chunkref_data (&msg.body)) + sz,
                iov->iov_base, iov->iov_len);
            sz += iov->iov_len;
        }

        nnmsg = 0;
    }

    /* Add ancillary data to the message. */
    if (msghdr->msg_control) {

        /* Copy all headers. */
        /* TODO: SP_HDR should not be copied here! */
        if (msghdr->msg_controllen == NN_MSG) {
            chunk = *((void**) msghdr->msg_control);
            nn_chunkref_term (&msg.hdrs);
            nn_chunkref_init_chunk (&msg.hdrs, chunk);
        }
        else {
            nn_chunkref_term (&msg.hdrs);
            nn_chunkref_init (&msg.hdrs, msghdr->msg_controllen);
            memcpy (nn_chunkref_data (&msg.hdrs),
                msghdr->msg_control, msghdr->msg_controllen);
        }

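        /* Layout note (derived from the loop below): the data of an SP_HDR
           control message is expected to begin with a size_t giving the
           length of the raw SP protocol header, immediately followed by
           that many header bytes. Payloads no larger than sizeof (size_t)
           are ignored. */
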
        /* Search for the SP_HDR property. */
        cmsg = NN_CMSG_FIRSTHDR (msghdr);
        while (cmsg) {
            if (cmsg->cmsg_level == PROTO_SP && cmsg->cmsg_type == SP_HDR) {
                unsigned char *ptr = NN_CMSG_DATA (cmsg);
                size_t clen = cmsg->cmsg_len - NN_CMSG_SPACE (0);
                if (clen > sizeof (size_t)) {
                    spsz = *(size_t *)(void *)ptr;
                    if (spsz <= (clen - sizeof (size_t))) {
                        /* Copy the body of the SP_HDR property into 'sphdr'. */
                        nn_chunkref_term (&msg.sphdr);
                        nn_chunkref_init (&msg.sphdr, spsz);
                        memcpy (nn_chunkref_data (&msg.sphdr),
                            ptr + sizeof (size_t), spsz);
                    }
                }
                break;
            }
            cmsg = NN_CMSG_NXTHDR (msghdr, cmsg);
        }
    }

    /* Send it further down the stack. */
    rc = nn_sock_send (sock, &msg, flags);
    if (nn_slow (rc < 0)) {

        /* If we are dealing with a user-supplied buffer, detach it from
           the message object so that it is not freed along with it. */
        if (nnmsg)
            nn_chunkref_init (&msg.body, 0);

        nn_msg_term (&msg);
        goto fail;
    }

    /* Adjust the statistics. */
    nn_sock_stat_increment (sock, NN_STAT_MESSAGES_SENT, 1);
    nn_sock_stat_increment (sock, NN_STAT_BYTES_SENT, sz);

    nn_global_rele_socket (sock);

    return (int) sz;

fail:
    nn_global_rele_socket (sock);

    errno = -rc;
    return -1;
}

int nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags)
{
    int rc;
    struct nn_msg msg;
    uint8_t *data;
    size_t sz;
    int i;
    struct nn_iovec *iov;
    void *chunk;
    size_t hdrssz;
    void *ctrl;
    size_t ctrlsz;
    size_t spsz;
    size_t sptotalsz;
    struct nn_cmsghdr *chdr;
    struct nn_sock *sock;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return -1;
    }

    if (nn_slow (!msghdr)) {
        rc = -EINVAL;
        goto fail;
    }

    if (nn_slow (msghdr->msg_iovlen < 0)) {
        rc = -EMSGSIZE;
        goto fail;
    }

    /* Get a message. */
    rc = nn_sock_recv (sock, &msg, flags);
    if (nn_slow (rc < 0)) {
        goto fail;
    }

    if (msghdr->msg_iovlen == 1 && msghdr->msg_iov [0].iov_len == NN_MSG) {
        chunk = nn_chunkref_getchunk (&msg.body);
        *(void**) (msghdr->msg_iov [0].iov_base) = chunk;
        sz = nn_chunk_size (chunk);
    }
    else {

        /* Copy the message content into the supplied gather array. */
        data = nn_chunkref_data (&msg.body);
        sz = nn_chunkref_size (&msg.body);
        for (i = 0; i != msghdr->msg_iovlen; ++i) {
            iov = &msghdr->msg_iov [i];
            if (nn_slow (iov->iov_len == NN_MSG)) {
                nn_msg_term (&msg);
                rc = -EINVAL;
                goto fail;
            }
            if (iov->iov_len > sz) {
                memcpy (iov->iov_base, data, sz);
                break;
            }
            memcpy (iov->iov_base, data, iov->iov_len);
            data += iov->iov_len;
            sz -= iov->iov_len;
        }
        sz = nn_chunkref_size (&msg.body);
    }

    /* Retrieve the ancillary data from the message. */
    if (msghdr->msg_control) {

        spsz = nn_chunkref_size (&msg.sphdr);
        sptotalsz = NN_CMSG_SPACE (spsz + sizeof (size_t));
        ctrlsz = sptotalsz + nn_chunkref_size (&msg.hdrs);

        if (msghdr->msg_controllen == NN_MSG) {

            /* Allocate the buffer. */
            rc = nn_chunk_alloc (ctrlsz, 0, &ctrl);
            errnum_assert (rc == 0, -rc);

            /* Set output parameters. */
            *((void**) msghdr->msg_control) = ctrl;
        }
        else {

            /* Just use the buffer supplied by the user. */
            ctrl = msghdr->msg_control;
            ctrlsz = msghdr->msg_controllen;
        }
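
        /* Layout note (mirrors nn_sendmsg above): the control buffer is
           filled with a single SP_HDR property first, whose data is a
           size_t length followed by the raw SP protocol header, and then
           with as many of the remaining message headers as still fit. */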

        /* If even the SP header alone won't fit into the buffer, return no
           ancillary properties at all. */
        if (ctrlsz >= sptotalsz) {
            char *ptr;

            /* Fill in the SP_HDR ancillary property. */
            chdr = (struct nn_cmsghdr*) ctrl;
            chdr->cmsg_len = sptotalsz;
            chdr->cmsg_level = PROTO_SP;
            chdr->cmsg_type = SP_HDR;
            ptr = (void *) chdr;
            ptr += sizeof (*chdr);
            *(size_t *)(void *)ptr = spsz;
            ptr += sizeof (size_t);
            memcpy (ptr, nn_chunkref_data (&msg.sphdr), spsz);

            /* Fill in as many remaining properties as possible.
               Truncate the trailing properties if necessary. */
            hdrssz = nn_chunkref_size (&msg.hdrs);
            if (hdrssz > ctrlsz - sptotalsz)
                hdrssz = ctrlsz - sptotalsz;
            memcpy (((char*) ctrl) + sptotalsz,
                nn_chunkref_data (&msg.hdrs), hdrssz);
        }
    }

    nn_msg_term (&msg);

    /* Adjust the statistics. */
    nn_sock_stat_increment (sock, NN_STAT_MESSAGES_RECEIVED, 1);
    nn_sock_stat_increment (sock, NN_STAT_BYTES_RECEIVED, sz);

    nn_global_rele_socket (sock);

    return (int) sz;

fail:
    nn_global_rele_socket (sock);

    errno = -rc;
    return -1;
}

uint64_t nn_get_statistic (int s, int statistic)
{
    int rc;
    struct nn_sock *sock;
    uint64_t val;

    rc = nn_global_hold_socket (&sock, s);
    if (nn_slow (rc < 0)) {
        errno = -rc;
        return (uint64_t) -1;
    }

    switch (statistic) {
    case NN_STAT_ESTABLISHED_CONNECTIONS:
        val = sock->statistics.established_connections;
        break;
    case NN_STAT_ACCEPTED_CONNECTIONS:
        val = sock->statistics.accepted_connections;
        break;
    case NN_STAT_DROPPED_CONNECTIONS:
        val = sock->statistics.dropped_connections;
        break;
    case NN_STAT_BROKEN_CONNECTIONS:
        val = sock->statistics.broken_connections;
        break;
    case NN_STAT_CONNECT_ERRORS:
        val = sock->statistics.connect_errors;
        break;
    case NN_STAT_BIND_ERRORS:
        val = sock->statistics.bind_errors;
        break;
    case NN_STAT_ACCEPT_ERRORS:
        val = sock->statistics.accept_errors;
        break;
    case NN_STAT_MESSAGES_SENT:
        val = sock->statistics.messages_sent;
        break;
    case NN_STAT_MESSAGES_RECEIVED:
        val = sock->statistics.messages_received;
        break;
    case NN_STAT_BYTES_SENT:
        val = sock->statistics.bytes_sent;
        break;
    case NN_STAT_BYTES_RECEIVED:
        val = sock->statistics.bytes_received;
        break;
    case NN_STAT_CURRENT_CONNECTIONS:
        val = sock->statistics.current_connections;
        break;
    case NN_STAT_INPROGRESS_CONNECTIONS:
        val = sock->statistics.inprogress_connections;
        break;
    case NN_STAT_CURRENT_SND_PRIORITY:
        val = sock->statistics.current_snd_priority;
        break;
    case NN_STAT_CURRENT_EP_ERRORS:
        val = sock->statistics.current_ep_errors;
        break;
    default:
        val = (uint64_t) -1;
        errno = EINVAL;
        break;
    }

    nn_global_rele_socket (sock);
    return val;
}

static int nn_global_create_ep (struct nn_sock *sock, const char *addr,
    int bind)
{
    int rc;
    const char *proto;
    const char *delim;
    size_t protosz;
    const struct nn_transport *tp;
    int i;

    /* Check whether the address is valid. */
    if (!addr)
        return -EINVAL;
    if (strlen (addr) >= NN_SOCKADDR_MAX)
        return -ENAMETOOLONG;

    /* Separate the protocol from the actual address. */
    proto = addr;
    delim = strchr (addr, ':');
    if (!delim)
        return -EINVAL;
    if (delim [1] != '/' || delim [2] != '/')
        return -EINVAL;
    protosz = delim - addr;
    addr += protosz + 3;

    /* Find the specified protocol. */
    tp = NULL;
    for (i = 0; ((tp = nn_transports[i]) != NULL); i++) {
        if (strlen (tp->name) == protosz &&
            memcmp (tp->name, proto, protosz) == 0)
            break;
    }

    /* The specified protocol doesn't match any known transport. */
    if (tp == NULL) {
        return -EPROTONOSUPPORT;
    }

    /* Ask the socket to create the endpoint. */
    rc = nn_sock_add_ep (sock, tp, bind, addr);
    return rc;
}
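
/* Illustrative only: for an address such as "tcp://127.0.0.1:5555" the code
   above selects the transport named "tcp" and hands "127.0.0.1:5555" to it;
   the "://" separator is mandatory, so a string like "tcp:127.0.0.1:5555"
   is rejected with EINVAL. */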

const struct nn_transport *nn_global_transport (int id)
{
    const struct nn_transport *tp;
    int i;

    for (i = 0; (tp = nn_transports[i]) != NULL; i++) {
        if (tp->id == id)
            return tp;
    }
    return NULL;
}

struct nn_pool *nn_global_getpool ()
{
    return &self.pool;
}

int nn_global_print_errors ()
{
    return self.print_errors;
}

/* Get the socket structure for a socket id. This must be called with
   the global lock (self.lock) held. The socket itself will not be freed
   while the hold is active. */
int nn_global_hold_socket_locked (struct nn_sock **sockp, int s)
{
    struct nn_sock *sock;

    if (nn_slow (s < 0 || s >= NN_MAX_SOCKETS || self.socks == NULL))
        return -EBADF;

    sock = self.socks [s];
    if (nn_slow (sock == NULL)) {
        return -EBADF;
    }

    if (nn_slow (nn_sock_hold (sock) != 0)) {
        return -EBADF;
    }
    *sockp = sock;
    return 0;
}

int nn_global_hold_socket (struct nn_sock **sockp, int s)
{
    int rc;
    nn_mutex_lock (&self.lock);
    rc = nn_global_hold_socket_locked (sockp, s);
    nn_mutex_unlock (&self.lock);
    return rc;
}

void nn_global_rele_socket (struct nn_sock *sock)
{
    nn_mutex_lock (&self.lock);
    nn_sock_rele (sock);
    nn_mutex_unlock (&self.lock);
}