1 /*
2     Copyright (c) 2012-2014 Martin Sustrik  All rights reserved.
3     Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
4     Copyright 2016 Garrett D'Amore <garrett@damore.org>
5 
6     Permission is hereby granted, free of charge, to any person obtaining a copy
7     of this software and associated documentation files (the "Software"),
8     to deal in the Software without restriction, including without limitation
9     the rights to use, copy, modify, merge, publish, distribute, sublicense,
10     and/or sell copies of the Software, and to permit persons to whom
11     the Software is furnished to do so, subject to the following conditions:
12 
13     The above copyright notice and this permission notice shall be included
14     in all copies or substantial portions of the Software.
15 
16     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22     IN THE SOFTWARE.
23 */
24 
25 #include "../nn.h"
26 #include "../transport.h"
27 #include "../protocol.h"
28 
29 #include "global.h"
30 #include "sock.h"
31 #include "ep.h"
32 
33 #include "../aio/pool.h"
34 #include "../aio/timer.h"
35 
36 #include "../utils/err.h"
37 #include "../utils/alloc.h"
38 #include "../utils/mutex.h"
39 #include "../utils/condvar.h"
40 #include "../utils/once.h"
41 #include "../utils/list.h"
42 #include "../utils/cont.h"
43 #include "../utils/random.h"
44 #include "../utils/chunk.h"
45 #include "../utils/msg.h"
46 #include "../utils/attr.h"
47 
48 #include "../pubsub.h"
49 #include "../pipeline.h"
50 
51 #include <stddef.h>
52 #include <stdlib.h>
53 #include <string.h>
54 #include <time.h>
55 
56 #if defined NN_HAVE_WINDOWS
57 #include "../utils/win.h"
58 #else
59 #include <unistd.h>
60 #endif
61 
62 /*  Max number of concurrent SP sockets. Configureable at build time */
63 #ifndef NN_MAX_SOCKETS
64 #define NN_MAX_SOCKETS 512
65 #endif
66 
67 /*  To save some space, list of unused socket slots uses uint16_t integers to
68     refer to individual sockets. If there's a need to more that 0x10000 sockets,
69     the type should be changed to uint32_t or int. */
70 CT_ASSERT (NN_MAX_SOCKETS <= 0x10000);
71 
72 #define NN_CTX_FLAG_TERMED 1
73 #define NN_CTX_FLAG_TERMING 2
74 #define NN_CTX_FLAG_TERM (NN_CTX_FLAG_TERMED | NN_CTX_FLAG_TERMING)
75 
76 #define NN_GLOBAL_SRC_STAT_TIMER 1
77 
78 #define NN_GLOBAL_STATE_IDLE           1
79 #define NN_GLOBAL_STATE_ACTIVE         2
80 #define NN_GLOBAL_STATE_STOPPING_TIMER 3
81 
82 /*  We could put these in an external header file, but there really is
83     need to.  We are the only thing that needs them. */
84 extern struct nn_socktype nn_pair_socktype;
85 extern struct nn_socktype nn_xpair_socktype;
86 extern struct nn_socktype nn_pub_socktype;
87 extern struct nn_socktype nn_sub_socktype;
88 extern struct nn_socktype nn_xpub_socktype;
89 extern struct nn_socktype nn_xsub_socktype;
90 extern struct nn_socktype nn_rep_socktype;
91 extern struct nn_socktype nn_req_socktype;
92 extern struct nn_socktype nn_xrep_socktype;
93 extern struct nn_socktype nn_xreq_socktype;
94 extern struct nn_socktype nn_push_socktype;
95 extern struct nn_socktype nn_xpush_socktype;
96 extern struct nn_socktype nn_pull_socktype;
97 extern struct nn_socktype nn_xpull_socktype;
98 extern struct nn_socktype nn_respondent_socktype;
99 extern struct nn_socktype nn_surveyor_socktype;
100 extern struct nn_socktype nn_xrespondent_socktype;
101 extern struct nn_socktype nn_xsurveyor_socktype;
102 extern struct nn_socktype nn_bus_socktype;
103 extern struct nn_socktype nn_xbus_socktype;
104 
105 /*  Array of known socket types. */
106 const struct nn_socktype *nn_socktypes[] = {
107     &nn_pair_socktype,
108     &nn_xpair_socktype,
109     &nn_pub_socktype,
110     &nn_sub_socktype,
111     &nn_xpub_socktype,
112     &nn_xsub_socktype,
113     &nn_rep_socktype,
114     &nn_req_socktype,
115     &nn_xrep_socktype,
116     &nn_xreq_socktype,
117     &nn_push_socktype,
118     &nn_xpush_socktype,
119     &nn_pull_socktype,
120     &nn_xpull_socktype,
121     &nn_respondent_socktype,
122     &nn_surveyor_socktype,
123     &nn_xrespondent_socktype,
124     &nn_xsurveyor_socktype,
125     &nn_bus_socktype,
126     &nn_xbus_socktype,
127     NULL,
128 };
129 
130 /* As with protocols, we could have these in a header file, but we are the
131    only consumer, so just declare them inline. */
132 
133 extern struct nn_transport nn_inproc;
134 extern struct nn_transport nn_ipc;
135 extern struct nn_transport nn_tcp;
136 extern struct nn_transport nn_ws;
137 
138 const struct nn_transport *nn_transports[] = {
139     &nn_inproc,
140     &nn_ipc,
141     &nn_tcp,
142     &nn_ws,
143     NULL,
144 };
145 
146 struct nn_global {
147 
148     /*  The global table of existing sockets. The descriptor representing
149         the socket is the index to this table. This pointer is also used to
150         find out whether context is initialised. If it is NULL, context is
151         uninitialised. */
152     struct nn_sock **socks;
153 
154     /*  Stack of unused file descriptors. */
155     uint16_t *unused;
156 
157     /*  Number of actual open sockets in the socket table. */
158     size_t nsocks;
159 
160     /*  Combination of the flags listed above. */
161     int flags;
162 
163     /*  Pool of worker threads. */
164     struct nn_pool pool;
165 
166     /*  Timer and other machinery for submitting statistics  */
167     int state;
168 
169     int print_errors;
170 
171     int inited;
172     nn_mutex_t lock;
173     nn_condvar_t cond;
174 };
175 
176 /*  Singleton object containing the global state of the library. */
177 static struct nn_global self;
178 static nn_once_t once = NN_ONCE_INITIALIZER;
179 
180 
181 /*  Context creation- and termination-related private functions. */
182 static void nn_global_init (void);
183 static void nn_global_term (void);
184 
185 /*  Private function that unifies nn_bind and nn_connect functionality.
186     It returns the ID of the newly created endpoint. */
187 static int nn_global_create_ep (struct nn_sock *, const char *addr, int bind);
188 
189 /*  Private socket creator which doesn't initialize global state and
190     does no locking by itself */
191 static int nn_global_create_socket (int domain, int protocol);
192 
193 /*  Socket holds. */
194 static int nn_global_hold_socket (struct nn_sock **sockp, int s);
195 static int nn_global_hold_socket_locked (struct nn_sock **sockp, int s);
196 static void nn_global_rele_socket(struct nn_sock *);
197 
nn_errno(void)198 int nn_errno (void)
199 {
200     return nn_err_errno ();
201 }
202 
nn_strerror(int errnum)203 const char *nn_strerror (int errnum)
204 {
205     return nn_err_strerror (errnum);
206 }
207 
nn_global_init(void)208 static void nn_global_init (void)
209 {
210     int i;
211     char *envvar;
212 
213 #if defined NN_HAVE_WINDOWS
214     int rc;
215     WSADATA data;
216 #endif
217     const struct nn_transport *tp;
218 
219     /*  Check whether the library was already initialised. If so, do nothing. */
220     if (self.socks)
221         return;
222 
223     /*  On Windows, initialise the socket library. */
224 #if defined NN_HAVE_WINDOWS
225     rc = WSAStartup (MAKEWORD (2, 2), &data);
226     nn_assert (rc == 0);
227     nn_assert (LOBYTE (data.wVersion) == 2 &&
228         HIBYTE (data.wVersion) == 2);
229 #endif
230 
231     /*  Initialise the memory allocation subsystem. */
232     nn_alloc_init ();
233 
234     /*  Seed the pseudo-random number generator. */
235     nn_random_seed ();
236 
237     /*  Allocate the global table of SP sockets. */
238     self.socks = nn_alloc ((sizeof (struct nn_sock*) * NN_MAX_SOCKETS) +
239         (sizeof (uint16_t) * NN_MAX_SOCKETS), "socket table");
240     alloc_assert (self.socks);
241     for (i = 0; i != NN_MAX_SOCKETS; ++i)
242         self.socks [i] = NULL;
243     self.nsocks = 0;
244     self.flags = 0;
245 
246     /*  Print connection and accepting errors to the stderr  */
247     envvar = getenv("NN_PRINT_ERRORS");
248     /*  any non-empty string is true */
249     self.print_errors = envvar && *envvar;
250 
251     /*  Allocate the stack of unused file descriptors. */
252     self.unused = (uint16_t*) (self.socks + NN_MAX_SOCKETS);
253     alloc_assert (self.unused);
254     for (i = 0; i != NN_MAX_SOCKETS; ++i)
255         self.unused [i] = NN_MAX_SOCKETS - i - 1;
256 
257     /*  Initialize transports if needed. */
258     for (i = 0; (tp = nn_transports[i]) != NULL; i++) {
259         if (tp->init != NULL) {
260             tp->init ();
261         }
262     }
263 
264     /*  Start the worker threads. */
265     nn_pool_init (&self.pool);
266 }
267 
nn_global_term(void)268 static void nn_global_term (void)
269 {
270 #if defined NN_HAVE_WINDOWS
271     int rc;
272 #endif
273     const struct nn_transport *tp;
274     int i;
275 
276     /*  If there are no sockets remaining, uninitialise the global context. */
277     nn_assert (self.socks);
278     if (self.nsocks > 0)
279         return;
280 
281     /*  Shut down the worker threads. */
282     nn_pool_term (&self.pool);
283 
284     /*  Ask all the transport to deallocate their global resources. */
285     for (i = 0; (tp = nn_transports[i]) != NULL; i++) {
286         if (tp->term)
287             tp->term ();
288     }
289 
290     /*  Final deallocation of the nn_global object itself. */
291     nn_free (self.socks);
292 
293     /*  This marks the global state as uninitialised. */
294     self.socks = NULL;
295 
296     /*  Shut down the memory allocation subsystem. */
297     nn_alloc_term ();
298 
299     /*  On Windows, uninitialise the socket library. */
300 #if defined NN_HAVE_WINDOWS
301     rc = WSACleanup ();
302     nn_assert (rc == 0);
303 #endif
304 }
305 
nn_term(void)306 void nn_term (void)
307 {
308     int i;
309 
310     if (!self.inited) {
311         return;
312     }
313 
314     nn_mutex_lock (&self.lock);
315     self.flags |= NN_CTX_FLAG_TERMING;
316     nn_mutex_unlock (&self.lock);
317 
318     /* Make sure we really close resources, this will cause global
319        resources to be freed too when the last socket is closed. */
320     for (i = 0; i < NN_MAX_SOCKETS; i++) {
321         (void) nn_close (i);
322     }
323 
324     nn_mutex_lock (&self.lock);
325     self.flags |= NN_CTX_FLAG_TERMED;
326     self.flags &= ~NN_CTX_FLAG_TERMING;
327     nn_condvar_broadcast(&self.cond);
328     nn_mutex_unlock (&self.lock);
329 }
330 
nn_lib_init(void)331 static void nn_lib_init(void)
332 {
333     /*  This function is executed once to initialize global locks. */
334     nn_mutex_init (&self.lock);
335     nn_condvar_init (&self.cond);
336     self.inited = 1;
337 }
338 
nn_init(void)339 void nn_init (void)
340 {
341     nn_do_once (&once, nn_lib_init);
342 
343     nn_mutex_lock (&self.lock);
344     /*  Wait for any in progress term to complete. */
345     while (self.flags & NN_CTX_FLAG_TERMING) {
346         nn_condvar_wait (&self.cond, &self.lock, -1);
347     }
348     self.flags &= ~NN_CTX_FLAG_TERMED;
349     nn_mutex_unlock (&self.lock);
350 }
351 
nn_allocmsg(size_t size,int type)352 void *nn_allocmsg (size_t size, int type)
353 {
354     int rc;
355     void *result;
356 
357     rc = nn_chunk_alloc (size, type, &result);
358     if (rc == 0)
359         return result;
360     errno = -rc;
361     return NULL;
362 }
363 
nn_reallocmsg(void * msg,size_t size)364 void *nn_reallocmsg (void *msg, size_t size)
365 {
366     int rc;
367 
368     rc = nn_chunk_realloc (size, &msg);
369     if (rc == 0)
370         return msg;
371     errno = -rc;
372     return NULL;
373 }
374 
nn_freemsg(void * msg)375 int nn_freemsg (void *msg)
376 {
377     nn_chunk_free (msg);
378     return 0;
379 }
380 
nn_cmsg_nxthdr_(const struct nn_msghdr * mhdr,const struct nn_cmsghdr * cmsg)381 struct nn_cmsghdr *nn_cmsg_nxthdr_ (const struct nn_msghdr *mhdr,
382     const struct nn_cmsghdr *cmsg)
383 {
384     char *data;
385     size_t sz;
386     struct nn_cmsghdr *next;
387     size_t headsz;
388 
389     /*  Early return if no message is provided. */
390     if (nn_slow (mhdr == NULL))
391         return NULL;
392 
393     /*  Get the actual data. */
394     if (mhdr->msg_controllen == NN_MSG) {
395         data = *((void**) mhdr->msg_control);
396         sz = nn_chunk_size (data);
397     }
398     else {
399         data = (char*) mhdr->msg_control;
400         sz = mhdr->msg_controllen;
401     }
402 
403     /*  Ancillary data allocation was not even large enough for one element. */
404     if (nn_slow (sz < NN_CMSG_SPACE (0)))
405         return NULL;
406 
407     /*  If cmsg is set to NULL we are going to return first property.
408         Otherwise move to the next property. */
409     if (!cmsg)
410         next = (struct nn_cmsghdr*) data;
411     else
412         next = (struct nn_cmsghdr*)
413             (((char*) cmsg) + NN_CMSG_ALIGN_ (cmsg->cmsg_len));
414 
415     /*  If there's no space for next property, treat it as the end
416         of the property list. */
417     headsz = ((char*) next) - data;
418     if (headsz + NN_CMSG_SPACE (0) > sz ||
419           headsz + NN_CMSG_ALIGN_ (next->cmsg_len) > sz)
420         return NULL;
421 
422     /*  Success. */
423     return next;
424 }
425 
nn_global_create_socket(int domain,int protocol)426 int nn_global_create_socket (int domain, int protocol)
427 {
428     int rc;
429     int s;
430     int i;
431     const struct nn_socktype *socktype;
432     struct nn_sock *sock;
433 
434     /* The function is called with lock held */
435 
436     /*  Only AF_SP and AF_SP_RAW domains are supported. */
437     if (domain != AF_SP && domain != AF_SP_RAW) {
438         return -EAFNOSUPPORT;
439     }
440 
441     /*  If socket limit was reached, report error. */
442     if (self.nsocks >= NN_MAX_SOCKETS) {
443         return -EMFILE;
444     }
445 
446     /*  Find an empty socket slot. */
447     s = self.unused [NN_MAX_SOCKETS - self.nsocks - 1];
448 
449     /*  Find the appropriate socket type. */
450     for (i = 0; (socktype = nn_socktypes[i]) != NULL; i++) {
451         if (socktype->domain == domain && socktype->protocol == protocol) {
452 
453             /*  Instantiate the socket. */
454             if ((sock = nn_alloc (sizeof (struct nn_sock), "sock")) == NULL)
455                 return -ENOMEM;
456             rc = nn_sock_init (sock, socktype, s);
457             if (rc < 0) {
458                 nn_free (sock);
459                 return rc;
460             }
461 
462             /*  Adjust the global socket table. */
463             self.socks [s] = sock;
464             ++self.nsocks;
465             return s;
466         }
467     }
468     /*  Specified socket type wasn't found. */
469     return -EINVAL;
470 }
471 
nn_socket(int domain,int protocol)472 int nn_socket (int domain, int protocol)
473 {
474     int rc;
475 
476     nn_do_once (&once, nn_lib_init);
477 
478     nn_mutex_lock (&self.lock);
479 
480     /*  If nn_term() was already called, return ETERM. */
481     if (nn_slow (self.flags & NN_CTX_FLAG_TERM)) {
482         nn_mutex_unlock (&self.lock);
483         errno = ETERM;
484         return -1;
485     }
486 
487     /*  Make sure that global state is initialised. */
488     nn_global_init ();
489 
490     rc = nn_global_create_socket (domain, protocol);
491 
492     if (rc < 0) {
493         nn_global_term ();
494         nn_mutex_unlock (&self.lock);
495         errno = -rc;
496         return -1;
497     }
498 
499     nn_mutex_unlock (&self.lock);
500 
501     return rc;
502 }
503 
nn_close(int s)504 int nn_close (int s)
505 {
506     int rc;
507     struct nn_sock *sock;
508 
509     nn_mutex_lock (&self.lock);
510     rc = nn_global_hold_socket_locked (&sock, s);
511     if (nn_slow (rc < 0)) {
512         nn_mutex_unlock (&self.lock);
513         errno = -rc;
514         return -1;
515     }
516 
517     /*  Start the shutdown process on the socket.  This will cause
518         all other socket users, as well as endpoints, to begin cleaning up.
519         This is done with the lock held to ensure that two instances
520         of nn_close can't access the same socket. */
521     nn_sock_stop (sock);
522 
523     /*  We have to drop both the hold we just acquired, as well as
524         the original hold, in order for nn_sock_term to complete. */
525     nn_sock_rele (sock);
526     nn_sock_rele (sock);
527     nn_mutex_unlock (&self.lock);
528 
529     /*  Now clean up.  The termination routine below will block until
530         all other consumers of the socket have dropped their holds, and
531         all endpoints have cleanly exited. */
532     rc = nn_sock_term (sock);
533     if (nn_slow (rc == -EINTR)) {
534         nn_global_rele_socket (sock);
535         errno = EINTR;
536         return -1;
537     }
538 
539     /*  Remove the socket from the socket table, add it to unused socket
540         table. */
541     nn_mutex_lock (&self.lock);
542     self.socks [s] = NULL;
543     self.unused [NN_MAX_SOCKETS - self.nsocks] = s;
544     --self.nsocks;
545     nn_free (sock);
546 
547     /*  Destroy the global context if there's no socket remaining. */
548     nn_global_term ();
549 
550     nn_mutex_unlock (&self.lock);
551 
552     return 0;
553 }
554 
nn_setsockopt(int s,int level,int option,const void * optval,size_t optvallen)555 int nn_setsockopt (int s, int level, int option, const void *optval,
556     size_t optvallen)
557 {
558     int rc;
559     struct nn_sock *sock;
560 
561     rc = nn_global_hold_socket (&sock, s);
562     if (nn_slow (rc < 0)) {
563         errno = -rc;
564         return -1;
565     }
566 
567     if (nn_slow (!optval && optvallen)) {
568         rc = -EFAULT;
569         goto fail;
570     }
571 
572     rc = nn_sock_setopt (sock, level, option, optval, optvallen);
573     if (nn_slow (rc < 0))
574         goto fail;
575     errnum_assert (rc == 0, -rc);
576     nn_global_rele_socket (sock);
577     return 0;
578 
579 fail:
580     nn_global_rele_socket (sock);
581     errno = -rc;
582     return -1;
583 }
584 
nn_getsockopt(int s,int level,int option,void * optval,size_t * optvallen)585 int nn_getsockopt (int s, int level, int option, void *optval,
586     size_t *optvallen)
587 {
588     int rc;
589     struct nn_sock *sock;
590 
591     rc = nn_global_hold_socket (&sock, s);
592     if (nn_slow (rc < 0)) {
593         errno = -rc;
594         return -1;
595     }
596 
597     if (nn_slow (!optval && optvallen)) {
598         rc = -EFAULT;
599         goto fail;
600     }
601 
602     rc = nn_sock_getopt (sock, level, option, optval, optvallen);
603     if (nn_slow (rc < 0))
604         goto fail;
605     errnum_assert (rc == 0, -rc);
606     nn_global_rele_socket (sock);
607     return 0;
608 
609 fail:
610     nn_global_rele_socket (sock);
611     errno = -rc;
612     return -1;
613 }
614 
nn_bind(int s,const char * addr)615 int nn_bind (int s, const char *addr)
616 {
617     int rc;
618     struct nn_sock *sock;
619 
620     rc = nn_global_hold_socket (&sock, s);
621     if (rc < 0) {
622         errno = -rc;
623         return -1;
624     }
625 
626     rc = nn_global_create_ep (sock, addr, 1);
627     if (nn_slow (rc < 0)) {
628         nn_global_rele_socket (sock);
629         errno = -rc;
630         return -1;
631     }
632 
633     nn_global_rele_socket (sock);
634     return rc;
635 }
636 
nn_connect(int s,const char * addr)637 int nn_connect (int s, const char *addr)
638 {
639     int rc;
640     struct nn_sock *sock;
641 
642     rc = nn_global_hold_socket (&sock, s);
643     if (nn_slow (rc < 0)) {
644         errno = -rc;
645         return -1;
646     }
647 
648     rc = nn_global_create_ep (sock, addr, 0);
649     if (rc < 0) {
650         nn_global_rele_socket (sock);
651         errno = -rc;
652         return -1;
653     }
654 
655     nn_global_rele_socket (sock);
656     return rc;
657 }
658 
nn_shutdown(int s,int how)659 int nn_shutdown (int s, int how)
660 {
661     int rc;
662     struct nn_sock *sock;
663 
664     rc = nn_global_hold_socket (&sock, s);
665     if (nn_slow (rc < 0)) {
666         errno = -rc;
667         return -1;
668     }
669 
670     rc = nn_sock_rm_ep (sock, how);
671     if (nn_slow (rc < 0)) {
672         nn_global_rele_socket (sock);
673         errno = -rc;
674         return -1;
675     }
676     nn_assert (rc == 0);
677 
678     nn_global_rele_socket (sock);
679     return 0;
680 }
681 
nn_send(int s,const void * buf,size_t len,int flags)682 int nn_send (int s, const void *buf, size_t len, int flags)
683 {
684     struct nn_iovec iov;
685     struct nn_msghdr hdr;
686 
687     iov.iov_base = (void*) buf;
688     iov.iov_len = len;
689 
690     hdr.msg_iov = &iov;
691     hdr.msg_iovlen = 1;
692     hdr.msg_control = NULL;
693     hdr.msg_controllen = 0;
694 
695     return nn_sendmsg (s, &hdr, flags);
696 }
697 
nn_recv(int s,void * buf,size_t len,int flags)698 int nn_recv (int s, void *buf, size_t len, int flags)
699 {
700     struct nn_iovec iov;
701     struct nn_msghdr hdr;
702 
703     iov.iov_base = buf;
704     iov.iov_len = len;
705 
706     hdr.msg_iov = &iov;
707     hdr.msg_iovlen = 1;
708     hdr.msg_control = NULL;
709     hdr.msg_controllen = 0;
710 
711     return nn_recvmsg (s, &hdr, flags);
712 }
713 
nn_sendmsg(int s,const struct nn_msghdr * msghdr,int flags)714 int nn_sendmsg (int s, const struct nn_msghdr *msghdr, int flags)
715 {
716     int rc;
717     size_t sz;
718     size_t spsz;
719     int i;
720     struct nn_iovec *iov;
721     struct nn_msg msg;
722     void *chunk;
723     int nnmsg;
724     struct nn_cmsghdr *cmsg;
725     struct nn_sock *sock;
726 
727     rc = nn_global_hold_socket (&sock, s);
728     if (nn_slow (rc < 0)) {
729         errno = -rc;
730         return -1;
731     }
732 
733     if (nn_slow (!msghdr)) {
734         rc = -EINVAL;
735         goto fail;
736     }
737 
738     if (nn_slow (msghdr->msg_iovlen < 0)) {
739         rc = -EMSGSIZE;
740         goto fail;
741     }
742 
743     if (msghdr->msg_iovlen == 1 && msghdr->msg_iov [0].iov_len == NN_MSG) {
744         chunk = *(void**) msghdr->msg_iov [0].iov_base;
745         if (nn_slow (chunk == NULL)) {
746             rc = -EFAULT;
747             goto fail;
748         }
749         sz = nn_chunk_size (chunk);
750         nn_msg_init_chunk (&msg, chunk);
751         nnmsg = 1;
752     }
753     else {
754 
755         /*  Compute the total size of the message. */
756         sz = 0;
757         for (i = 0; i != msghdr->msg_iovlen; ++i) {
758             iov = &msghdr->msg_iov [i];
759             if (nn_slow (iov->iov_len == NN_MSG)) {
760                rc = -EINVAL;
761                goto fail;
762             }
763             if (nn_slow (!iov->iov_base && iov->iov_len)) {
764                 rc = -EFAULT;
765                 goto fail;
766             }
767             if (nn_slow (sz + iov->iov_len < sz)) {
768                 rc = -EINVAL;
769                 goto fail;
770             }
771             sz += iov->iov_len;
772         }
773 
774         /*  Create a message object from the supplied scatter array. */
775         nn_msg_init (&msg, sz);
776         sz = 0;
777         for (i = 0; i != msghdr->msg_iovlen; ++i) {
778             iov = &msghdr->msg_iov [i];
779             memcpy (((uint8_t*) nn_chunkref_data (&msg.body)) + sz,
780                 iov->iov_base, iov->iov_len);
781             sz += iov->iov_len;
782         }
783 
784         nnmsg = 0;
785     }
786 
787     /*  Add ancillary data to the message. */
788     if (msghdr->msg_control) {
789 
790         /*  Copy all headers. */
791         /*  TODO: SP_HDR should not be copied here! */
792         if (msghdr->msg_controllen == NN_MSG) {
793             chunk = *((void**) msghdr->msg_control);
794             nn_chunkref_term (&msg.hdrs);
795             nn_chunkref_init_chunk (&msg.hdrs, chunk);
796         }
797         else {
798             nn_chunkref_term (&msg.hdrs);
799             nn_chunkref_init (&msg.hdrs, msghdr->msg_controllen);
800             memcpy (nn_chunkref_data (&msg.hdrs),
801                 msghdr->msg_control, msghdr->msg_controllen);
802         }
803 
804         /* Search for SP_HDR property. */
805         cmsg = NN_CMSG_FIRSTHDR (msghdr);
806         while (cmsg) {
807             if (cmsg->cmsg_level == PROTO_SP && cmsg->cmsg_type == SP_HDR) {
808                 unsigned char *ptr = NN_CMSG_DATA (cmsg);
809                 size_t clen = cmsg->cmsg_len - NN_CMSG_SPACE (0);
810                 if (clen > sizeof (size_t)) {
811                     spsz = *(size_t *)(void *)ptr;
812                     if (spsz <= (clen - sizeof (size_t))) {
813                         /*  Copy body of SP_HDR property into 'sphdr'. */
814                         nn_chunkref_term (&msg.sphdr);
815                         nn_chunkref_init (&msg.sphdr, spsz);
816                          memcpy (nn_chunkref_data (&msg.sphdr),
817                              ptr + sizeof (size_t), spsz);
818                     }
819                 }
820                 break;
821             }
822             cmsg = NN_CMSG_NXTHDR (msghdr, cmsg);
823         }
824     }
825 
826     /*  Send it further down the stack. */
827     rc = nn_sock_send (sock, &msg, flags);
828     if (nn_slow (rc < 0)) {
829 
830         /*  If we are dealing with user-supplied buffer, detach it from
831             the message object. */
832         if (nnmsg)
833             nn_chunkref_init (&msg.body, 0);
834 
835         nn_msg_term (&msg);
836         goto fail;
837     }
838 
839     /*  Adjust the statistics. */
840     nn_sock_stat_increment (sock, NN_STAT_MESSAGES_SENT, 1);
841     nn_sock_stat_increment (sock, NN_STAT_BYTES_SENT, sz);
842 
843     nn_global_rele_socket (sock);
844 
845     return (int) sz;
846 
847 fail:
848     nn_global_rele_socket (sock);
849 
850     errno = -rc;
851     return -1;
852 }
853 
nn_recvmsg(int s,struct nn_msghdr * msghdr,int flags)854 int nn_recvmsg (int s, struct nn_msghdr *msghdr, int flags)
855 {
856     int rc;
857     struct nn_msg msg;
858     uint8_t *data;
859     size_t sz;
860     int i;
861     struct nn_iovec *iov;
862     void *chunk;
863     size_t hdrssz;
864     void *ctrl;
865     size_t ctrlsz;
866     size_t spsz;
867     size_t sptotalsz;
868     struct nn_cmsghdr *chdr;
869     struct nn_sock *sock;
870 
871     rc = nn_global_hold_socket (&sock, s);
872     if (nn_slow (rc < 0)) {
873         errno = -rc;
874         return -1;
875     }
876 
877     if (nn_slow (!msghdr)) {
878         rc = -EINVAL;
879         goto fail;
880     }
881 
882     if (nn_slow (msghdr->msg_iovlen < 0)) {
883         rc = -EMSGSIZE;
884         goto fail;
885     }
886 
887     /*  Get a message. */
888     rc = nn_sock_recv (sock, &msg, flags);
889     if (nn_slow (rc < 0)) {
890         goto fail;
891     }
892 
893     if (msghdr->msg_iovlen == 1 && msghdr->msg_iov [0].iov_len == NN_MSG) {
894         chunk = nn_chunkref_getchunk (&msg.body);
895         *(void**) (msghdr->msg_iov [0].iov_base) = chunk;
896         sz = nn_chunk_size (chunk);
897     }
898     else {
899 
900         /*  Copy the message content into the supplied gather array. */
901         data = nn_chunkref_data (&msg.body);
902         sz = nn_chunkref_size (&msg.body);
903         for (i = 0; i != msghdr->msg_iovlen; ++i) {
904             iov = &msghdr->msg_iov [i];
905             if (nn_slow (iov->iov_len == NN_MSG)) {
906                 nn_msg_term (&msg);
907                 rc = -EINVAL;
908                 goto fail;
909             }
910             if (iov->iov_len > sz) {
911                 memcpy (iov->iov_base, data, sz);
912                 break;
913             }
914             memcpy (iov->iov_base, data, iov->iov_len);
915             data += iov->iov_len;
916             sz -= iov->iov_len;
917         }
918         sz = nn_chunkref_size (&msg.body);
919     }
920 
921     /*  Retrieve the ancillary data from the message. */
922     if (msghdr->msg_control) {
923 
924         spsz = nn_chunkref_size (&msg.sphdr);
925         sptotalsz = NN_CMSG_SPACE (spsz+sizeof (size_t));
926         ctrlsz = sptotalsz + nn_chunkref_size (&msg.hdrs);
927 
928         if (msghdr->msg_controllen == NN_MSG) {
929 
930             /* Allocate the buffer. */
931             rc = nn_chunk_alloc (ctrlsz, 0, &ctrl);
932             errnum_assert (rc == 0, -rc);
933 
934             /* Set output parameters. */
935             *((void**) msghdr->msg_control) = ctrl;
936         }
937         else {
938 
939             /* Just use the buffer supplied by the user. */
940             ctrl = msghdr->msg_control;
941             ctrlsz = msghdr->msg_controllen;
942         }
943 
944         /* If SP header alone won't fit into the buffer, return no ancillary
945            properties. */
946         if (ctrlsz >= sptotalsz) {
947             char *ptr;
948 
949             /*  Fill in SP_HDR ancillary property. */
950             chdr = (struct nn_cmsghdr*) ctrl;
951             chdr->cmsg_len = sptotalsz;
952             chdr->cmsg_level = PROTO_SP;
953             chdr->cmsg_type = SP_HDR;
954             ptr = (void *)chdr;
955             ptr += sizeof (*chdr);
956             *(size_t *)(void *)ptr = spsz;
957             ptr += sizeof (size_t);
958             memcpy (ptr, nn_chunkref_data (&msg.sphdr), spsz);
959 
960             /*  Fill in as many remaining properties as possible.
961                 Truncate the trailing properties if necessary. */
962             hdrssz = nn_chunkref_size (&msg.hdrs);
963             if (hdrssz > ctrlsz - sptotalsz)
964                 hdrssz = ctrlsz - sptotalsz;
965             memcpy (((char*) ctrl) + sptotalsz,
966                 nn_chunkref_data (&msg.hdrs), hdrssz);
967         }
968     }
969 
970     nn_msg_term (&msg);
971 
972     /*  Adjust the statistics. */
973     nn_sock_stat_increment (sock, NN_STAT_MESSAGES_RECEIVED, 1);
974     nn_sock_stat_increment (sock, NN_STAT_BYTES_RECEIVED, sz);
975 
976     nn_global_rele_socket (sock);
977 
978     return (int) sz;
979 
980 fail:
981     nn_global_rele_socket (sock);
982 
983     errno = -rc;
984     return -1;
985 }
986 
nn_get_statistic(int s,int statistic)987 uint64_t nn_get_statistic (int s, int statistic)
988 {
989     int rc;
990     struct nn_sock *sock;
991     uint64_t val;
992 
993     rc = nn_global_hold_socket (&sock, s);
994     if (nn_slow (rc < 0)) {
995         errno = -rc;
996         return (uint64_t)-1;
997     }
998 
999     switch (statistic) {
1000     case NN_STAT_ESTABLISHED_CONNECTIONS:
1001         val = sock->statistics.established_connections;
1002         break;
1003     case NN_STAT_ACCEPTED_CONNECTIONS:
1004         val = sock->statistics.accepted_connections;
1005         break;
1006     case NN_STAT_DROPPED_CONNECTIONS:
1007         val = sock->statistics.dropped_connections;
1008         break;
1009     case NN_STAT_BROKEN_CONNECTIONS:
1010         val = sock->statistics.broken_connections;
1011         break;
1012     case NN_STAT_CONNECT_ERRORS:
1013         val = sock->statistics.connect_errors;
1014         break;
1015     case NN_STAT_BIND_ERRORS:
1016         val = sock->statistics.bind_errors;
1017         break;
1018     case NN_STAT_ACCEPT_ERRORS:
1019         val = sock->statistics.bind_errors;
1020         break;
1021     case NN_STAT_MESSAGES_SENT:
1022         val = sock->statistics.messages_sent;
1023         break;
1024     case NN_STAT_MESSAGES_RECEIVED:
1025         val = sock->statistics.messages_received;
1026         break;
1027     case NN_STAT_BYTES_SENT:
1028         val = sock->statistics.bytes_sent;
1029         break;
1030     case NN_STAT_BYTES_RECEIVED:
1031         val = sock->statistics.bytes_received;
1032         break;
1033     case NN_STAT_CURRENT_CONNECTIONS:
1034         val = sock->statistics.current_connections;
1035         break;
1036     case NN_STAT_INPROGRESS_CONNECTIONS:
1037         val = sock->statistics.inprogress_connections;
1038         break;
1039     case NN_STAT_CURRENT_SND_PRIORITY:
1040         val = sock->statistics.current_snd_priority;
1041         break;
1042     case NN_STAT_CURRENT_EP_ERRORS:
1043         val = sock->statistics.current_ep_errors;
1044         break;
1045     default:
1046         val = (uint64_t)-1;
1047         errno = EINVAL;
1048         break;
1049     }
1050 
1051     nn_global_rele_socket (sock);
1052     return val;
1053 }
1054 
nn_global_create_ep(struct nn_sock * sock,const char * addr,int bind)1055 static int nn_global_create_ep (struct nn_sock *sock, const char *addr,
1056     int bind)
1057 {
1058     int rc;
1059     const char *proto;
1060     const char *delim;
1061     size_t protosz;
1062     const struct nn_transport *tp;
1063     int i;
1064 
1065     /*  Check whether address is valid. */
1066     if (!addr)
1067         return -EINVAL;
1068     if (strlen (addr) >= NN_SOCKADDR_MAX)
1069         return -ENAMETOOLONG;
1070 
1071     /*  Separate the protocol and the actual address. */
1072     proto = addr;
1073     delim = strchr (addr, ':');
1074     if (!delim)
1075         return -EINVAL;
1076     if (delim [1] != '/' || delim [2] != '/')
1077         return -EINVAL;
1078     protosz = delim - addr;
1079     addr += protosz + 3;
1080 
1081     /*  Find the specified protocol. */
1082     tp = NULL;
1083     for (i = 0; ((tp = nn_transports[i]) != NULL); i++) {
1084         if (strlen (tp->name) == protosz &&
1085               memcmp (tp->name, proto, protosz) == 0)
1086             break;
1087     }
1088 
1089     /*  The protocol specified doesn't match any known protocol. */
1090     if (tp == NULL) {
1091         return -EPROTONOSUPPORT;
1092     }
1093 
1094     /*  Ask the socket to create the endpoint. */
1095     rc = nn_sock_add_ep (sock, tp, bind, addr);
1096     return rc;
1097 }
1098 
nn_global_transport(int id)1099 const struct nn_transport *nn_global_transport (int id)
1100 {
1101     const struct nn_transport *tp;
1102     int i;
1103 
1104     for (i = 0; (tp = nn_transports[i]) != NULL; i++) {
1105         if (tp->id == id)
1106             return tp;
1107     }
1108     return NULL;
1109 }
1110 
nn_global_getpool()1111 struct nn_pool *nn_global_getpool ()
1112 {
1113     return &self.pool;
1114 }
1115 
nn_global_print_errors()1116 int nn_global_print_errors ()
1117 {
1118     return self.print_errors;
1119 }
1120 
1121 /*  Get the socket structure for a socket id.  This must be called under
1122     the global lock (self.lock.)  The socket itself will not be freed
1123     while the hold is active. */
nn_global_hold_socket_locked(struct nn_sock ** sockp,int s)1124 int nn_global_hold_socket_locked(struct nn_sock **sockp, int s)
1125 {
1126     struct nn_sock *sock;
1127 
1128     if (nn_slow (s < 0 || s >= NN_MAX_SOCKETS || self.socks == NULL))
1129         return -EBADF;
1130 
1131     sock = self.socks[s];
1132     if (nn_slow (sock == NULL)) {
1133         return -EBADF;
1134     }
1135 
1136     if (nn_slow (nn_sock_hold (sock) != 0)) {
1137         return -EBADF;
1138     }
1139     *sockp = sock;
1140     return 0;
1141 }
1142 
nn_global_hold_socket(struct nn_sock ** sockp,int s)1143 int nn_global_hold_socket(struct nn_sock **sockp, int s)
1144 {
1145     int rc;
1146     nn_mutex_lock(&self.lock);
1147     rc = nn_global_hold_socket_locked(sockp, s);
1148     nn_mutex_unlock(&self.lock);
1149     return rc;
1150 }
1151 
nn_global_rele_socket(struct nn_sock * sock)1152 void nn_global_rele_socket(struct nn_sock *sock)
1153 {
1154     nn_mutex_lock(&self.lock);
1155     nn_sock_rele(sock);
1156     nn_mutex_unlock(&self.lock);
1157 }
1158