1 /*
2     Copyright (c) 2012-2014 Martin Sustrik  All rights reserved.
3     Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.
4     Copyright 2017 Garrett D'Amore <garrett@damore.org>
5     Copyright 2017 Capitar IT Group BV <info@capitar.com>
6 
7     Permission is hereby granted, free of charge, to any person obtaining a copy
8     of this software and associated documentation files (the "Software"),
9     to deal in the Software without restriction, including without limitation
10     the rights to use, copy, modify, merge, publish, distribute, sublicense,
11     and/or sell copies of the Software, and to permit persons to whom
12     the Software is furnished to do so, subject to the following conditions:
13 
14     The above copyright notice and this permission notice shall be included
15     in all copies or substantial portions of the Software.
16 
17     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23     IN THE SOFTWARE.
24 */
25 
26 #include "../protocol.h"
27 #include "../transport.h"
28 
29 #include "sock.h"
30 #include "global.h"
31 #include "ep.h"
32 
33 #include "../utils/err.h"
34 #include "../utils/cont.h"
35 #include "../utils/clock.h"
36 #include "../utils/fast.h"
37 #include "../utils/alloc.h"
38 #include "../utils/msg.h"
39 
40 #include <limits.h>
41 
42 /*  These bits specify whether individual efds are signalled or not at
43     the moment. Storing this information allows us to avoid redundant signalling
44     and unsignalling of the efd objects. */
45 #define NN_SOCK_FLAG_IN 1
46 #define NN_SOCK_FLAG_OUT 2
47 
48 /*  Possible states of the socket. */
49 #define NN_SOCK_STATE_INIT 1
50 #define NN_SOCK_STATE_ACTIVE 2
51 #define NN_SOCK_STATE_STOPPING_EPS 3
52 #define NN_SOCK_STATE_STOPPING 4
53 #define NN_SOCK_STATE_FINI 5
54 
55 /*  Events sent to the state machine. */
56 #define NN_SOCK_ACTION_STOPPED 1
57 
58 /*  Subordinated source objects. */
59 #define NN_SOCK_SRC_EP 1
60 
61 /*  Private functions. */
62 static struct nn_optset *nn_sock_optset (struct nn_sock *self, int id);
63 static int nn_sock_setopt_inner (struct nn_sock *self, int level,
64     int option, const void *optval, size_t optvallen);
65 static void nn_sock_onleave (struct nn_ctx *self);
66 static void nn_sock_handler (struct nn_fsm *self, int src, int type,
67     void *srcptr);
68 static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
69     void *srcptr);
70 
71 /*  Initialize a socket.  A hold is placed on the initialized socket for
72     the caller as well. */
nn_sock_init(struct nn_sock * self,const struct nn_socktype * socktype,int fd)73 int nn_sock_init (struct nn_sock *self, const struct nn_socktype *socktype,
74     int fd)
75 {
76     int rc;
77     int i;
78 
79     /* Make sure that at least one message direction is supported. */
80     nn_assert (!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND) ||
81         !(socktype->flags & NN_SOCKTYPE_FLAG_NORECV));
82 
83     /*  Create the AIO context for the SP socket. */
84     nn_ctx_init (&self->ctx, nn_global_getpool (), nn_sock_onleave);
85 
86     /*  Initialise the state machine. */
87     nn_fsm_init_root (&self->fsm, nn_sock_handler,
88         nn_sock_shutdown, &self->ctx);
89     self->state = NN_SOCK_STATE_INIT;
90 
91     /*  Open the NN_SNDFD and NN_RCVFD efds. Do so, only if the socket type
92         supports send/recv, as appropriate. */
93     if (socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)
94         memset (&self->sndfd, 0xcd, sizeof (self->sndfd));
95     else {
96         rc = nn_efd_init (&self->sndfd);
97         if (nn_slow (rc < 0))
98             return rc;
99     }
100     if (socktype->flags & NN_SOCKTYPE_FLAG_NORECV)
101         memset (&self->rcvfd, 0xcd, sizeof (self->rcvfd));
102     else {
103         rc = nn_efd_init (&self->rcvfd);
104         if (nn_slow (rc < 0)) {
105             if (!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
106                 nn_efd_term (&self->sndfd);
107             return rc;
108         }
109     }
110     nn_sem_init (&self->termsem);
111     nn_sem_init (&self->relesem);
112     if (nn_slow (rc < 0)) {
113         if (!(socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
114             nn_efd_term (&self->rcvfd);
115         if (!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
116             nn_efd_term (&self->sndfd);
117         return rc;
118     }
119 
120     self->holds = 1;   /*  Callers hold. */
121     self->flags = 0;
122     nn_list_init (&self->eps);
123     nn_list_init (&self->sdeps);
124     self->eid = 1;
125 
126     /*  Default values for NN_SOL_SOCKET options. */
127     self->sndbuf = 128 * 1024;
128     self->rcvbuf = 128 * 1024;
129     self->rcvmaxsize = 1024 * 1024;
130     self->sndtimeo = -1;
131     self->rcvtimeo = -1;
132     self->reconnect_ivl = 100;
133     self->reconnect_ivl_max = 0;
134     self->maxttl = 8;
135     self->ep_template.sndprio = 8;
136     self->ep_template.rcvprio = 8;
137     self->ep_template.ipv4only = 1;
138 
139     /* Clear statistic entries */
140     memset(&self->statistics, 0, sizeof (self->statistics));
141 
142     /*  Should be pretty much enough space for just the number  */
143     sprintf(self->socket_name, "%d", fd);
144 
145     /* Security attribute */
146     self->sec_attr = NULL;
147     self->sec_attr_size = 0;
148     self->inbuffersz = 4096;
149     self->outbuffersz = 4096;
150 
151     /*  The transport-specific options are not initialised immediately,
152         rather, they are allocated later on when needed. */
153     for (i = 0; i != NN_MAX_TRANSPORT; ++i)
154         self->optsets [i] = NULL;
155 
156     /*  Create the specific socket type itself. */
157     rc = socktype->create ((void*) self, &self->sockbase);
158     errnum_assert (rc == 0, -rc);
159     self->socktype = socktype;
160 
161     /*  Launch the state machine. */
162     nn_ctx_enter (&self->ctx);
163     nn_fsm_start (&self->fsm);
164     nn_ctx_leave (&self->ctx);
165 
166     return 0;
167 }
168 
nn_sock_stopped(struct nn_sock * self)169 void nn_sock_stopped (struct nn_sock *self)
170 {
171     /*  TODO: Do the following in a more sane way. */
172     self->fsm.stopped.fsm = &self->fsm;
173     self->fsm.stopped.src = NN_FSM_ACTION;
174     self->fsm.stopped.srcptr = NULL;
175     self->fsm.stopped.type = NN_SOCK_ACTION_STOPPED;
176     nn_ctx_raise (self->fsm.ctx, &self->fsm.stopped);
177 }
178 
179 /*  Stop the socket.  This will prevent new calls from aquiring a
180     hold on the socket, cause endpoints to shut down, and wake any
181     threads waiting to recv or send data. */
nn_sock_stop(struct nn_sock * self)182 void nn_sock_stop (struct nn_sock *self)
183 {
184     nn_ctx_enter (&self->ctx);
185     nn_fsm_stop (&self->fsm);
186     nn_ctx_leave (&self->ctx);
187 }
188 
nn_sock_term(struct nn_sock * self)189 int nn_sock_term (struct nn_sock *self)
190 {
191     int rc;
192     int i;
193 
194     /*  NOTE: nn_sock_stop must have already been called. */
195 
196     /*  Some endpoints may still be alive.  Here we are going to wait
197         till they are all closed.  This loop is not interruptible, because
198         making it so would leave a partially cleaned up socket, and we don't
199         have a way to defer resource deallocation. */
200     for (;;) {
201         rc = nn_sem_wait (&self->termsem);
202         if (nn_slow (rc == -EINTR))
203             continue;
204         errnum_assert (rc == 0, -rc);
205         break;
206     }
207 
208     /*  Also, wait for all holds on the socket to be released.  */
209     for (;;) {
210         rc = nn_sem_wait (&self->relesem);
211         if (nn_slow (rc == -EINTR))
212             continue;
213         errnum_assert (rc == 0, -rc);
214         break;
215     }
216 
217     /*  Threads that posted the semaphore(s) can still have the ctx locked
218         for a short while. By simply entering the context and exiting it
219         immediately we can be sure that any such threads have already
220         exited the context. */
221     nn_ctx_enter (&self->ctx);
222     nn_ctx_leave (&self->ctx);
223 
224     /*  At this point, we can be reasonably certain that no other thread
225         has any references to the socket. */
226 
227     /*  Close the event FDs entirely. */
228     if (!(self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
229         nn_efd_term (&self->rcvfd);
230     }
231     if (!(self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
232         nn_efd_term (&self->sndfd);
233     }
234 
235     nn_fsm_stopped_noevent (&self->fsm);
236     nn_fsm_term (&self->fsm);
237     nn_sem_term (&self->termsem);
238     nn_sem_term (&self->relesem);
239     nn_list_term (&self->sdeps);
240     nn_list_term (&self->eps);
241     nn_ctx_term (&self->ctx);
242 
243     /*  Destroy any optsets associated with the socket. */
244     for (i = 0; i != NN_MAX_TRANSPORT; ++i)
245         if (self->optsets [i])
246             self->optsets [i]->vfptr->destroy (self->optsets [i]);
247 
248     return 0;
249 }
250 
nn_sock_getctx(struct nn_sock * self)251 struct nn_ctx *nn_sock_getctx (struct nn_sock *self)
252 {
253     return &self->ctx;
254 }
255 
nn_sock_ispeer(struct nn_sock * self,int socktype)256 int nn_sock_ispeer (struct nn_sock *self, int socktype)
257 {
258     /*  If the peer implements a different SP protocol it is not a valid peer.
259         Checking it here ensures that even if faulty protocol implementation
260         allows for cross-protocol communication, it will never happen
261         in practice. */
262     if ((self->socktype->protocol & 0xfff0) != (socktype  & 0xfff0))
263         return 0;
264 
265     /*  As long as the peer speaks the same protocol, socket type itself
266         decides which socket types are to be accepted. */
267     return self->socktype->ispeer (socktype);
268 }
269 
nn_sock_setopt(struct nn_sock * self,int level,int option,const void * optval,size_t optvallen)270 int nn_sock_setopt (struct nn_sock *self, int level, int option,
271     const void *optval, size_t optvallen)
272 {
273     int rc;
274 
275     nn_ctx_enter (&self->ctx);
276     rc = nn_sock_setopt_inner (self, level, option, optval, optvallen);
277     nn_ctx_leave (&self->ctx);
278 
279     return rc;
280 }
281 
nn_sock_setopt_inner(struct nn_sock * self,int level,int option,const void * optval,size_t optvallen)282 static int nn_sock_setopt_inner (struct nn_sock *self, int level,
283     int option, const void *optval, size_t optvallen)
284 {
285     struct nn_optset *optset;
286     int val;
287 
288     /*  Protocol-specific socket options. */
289     if (level > NN_SOL_SOCKET) {
290         if (self->sockbase->vfptr->setopt == NULL) {
291             return -ENOPROTOOPT;
292         }
293         return self->sockbase->vfptr->setopt (self->sockbase, level, option,
294             optval, optvallen);
295     }
296 
297     /*  Transport-specific options. */
298     if (level < NN_SOL_SOCKET) {
299         optset = nn_sock_optset (self, level);
300         if (!optset)
301             return -ENOPROTOOPT;
302         return optset->vfptr->setopt (optset, option, optval, optvallen);
303     }
304 
305     nn_assert (level == NN_SOL_SOCKET);
306 
307     /*  Special-casing socket name for now as it's the only string option  */
308     if (option == NN_SOCKET_NAME) {
309         if (optvallen > 63)
310             return -EINVAL;
311         memcpy (self->socket_name, optval, optvallen);
312         self->socket_name [optvallen] = 0;
313         return 0;
314     }
315 
316     /*  At this point we assume that all options are of type int. */
317     if (optvallen != sizeof (int))
318         return -EINVAL;
319     val = *(int*) optval;
320 
321     /*  Generic socket-level options. */
322     switch (option) {
323     case NN_SNDBUF:
324         if (val <= 0)
325             return -EINVAL;
326         self->sndbuf = val;
327         return 0;
328     case NN_RCVBUF:
329         if (val <= 0)
330             return -EINVAL;
331         self->rcvbuf = val;
332         return 0;
333     case NN_RCVMAXSIZE:
334         if (val < -1)
335             return -EINVAL;
336         self->rcvmaxsize = val;
337         return 0;
338     case NN_SNDTIMEO:
339         self->sndtimeo = val;
340         return 0;
341     case NN_RCVTIMEO:
342         self->rcvtimeo = val;
343         return 0;
344     case NN_RECONNECT_IVL:
345         if (val < 0)
346             return -EINVAL;
347         self->reconnect_ivl = val;
348         return 0;
349     case NN_RECONNECT_IVL_MAX:
350         if (val < 0)
351             return -EINVAL;
352         self->reconnect_ivl_max = val;
353         return 0;
354     case NN_SNDPRIO:
355         if (val < 1 || val > 16)
356             return -EINVAL;
357         self->ep_template.sndprio = val;
358         return 0;
359     case NN_RCVPRIO:
360         if (val < 1 || val > 16)
361             return -EINVAL;
362         self->ep_template.rcvprio = val;
363         return 0;
364     case NN_IPV4ONLY:
365         if (val != 0 && val != 1)
366             return -EINVAL;
367         self->ep_template.ipv4only = val;
368         return 0;
369     case NN_MAXTTL:
370         if (val < 1 || val > 255)
371             return -EINVAL;
372         self->maxttl = val;
373         return 0;
374     case NN_LINGER:
375 	/*  Ignored, retained for compatibility. */
376         return 0;
377     }
378 
379     return -ENOPROTOOPT;
380 }
381 
nn_sock_getopt(struct nn_sock * self,int level,int option,void * optval,size_t * optvallen)382 int nn_sock_getopt (struct nn_sock *self, int level, int option,
383     void *optval, size_t *optvallen)
384 {
385     int rc;
386 
387     nn_ctx_enter (&self->ctx);
388     rc = nn_sock_getopt_inner (self, level, option, optval, optvallen);
389     nn_ctx_leave (&self->ctx);
390 
391     return rc;
392 }
393 
nn_sock_getopt_inner(struct nn_sock * self,int level,int option,void * optval,size_t * optvallen)394 int nn_sock_getopt_inner (struct nn_sock *self, int level,
395     int option, void *optval, size_t *optvallen)
396 {
397     struct nn_optset *optset;
398     int intval;
399     nn_fd fd;
400 
401     /*  Protocol-specific socket options. */
402     if (level > NN_SOL_SOCKET) {
403         if (self->sockbase->vfptr->getopt == NULL) {
404             return -ENOPROTOOPT;
405         }
406         return self->sockbase->vfptr->getopt (self->sockbase,
407             level, option, optval, optvallen);
408     }
409 
410     /*  Transport-specific options. */
411     if (level < NN_SOL_SOCKET) {
412         optset = nn_sock_optset (self, level);
413         if (!optset)
414             return -ENOPROTOOPT;
415         return optset->vfptr->getopt (optset, option, optval, optvallen);
416     }
417 
418     nn_assert (level == NN_SOL_SOCKET);
419 
420     /*  Generic socket-level options. */
421     switch (option) {
422     case NN_DOMAIN:
423         intval = self->socktype->domain;
424         break;
425     case NN_PROTOCOL:
426         intval = self->socktype->protocol;
427         break;
428     case NN_LINGER:
429         intval = 0;
430         break;
431     case NN_SNDBUF:
432         intval = self->sndbuf;
433         break;
434     case NN_RCVBUF:
435         intval = self->rcvbuf;
436         break;
437     case NN_RCVMAXSIZE:
438         intval = self->rcvmaxsize;
439         break;
440     case NN_SNDTIMEO:
441         intval = self->sndtimeo;
442         break;
443     case NN_RCVTIMEO:
444         intval = self->rcvtimeo;
445         break;
446     case NN_RECONNECT_IVL:
447         intval = self->reconnect_ivl;
448         break;
449     case NN_RECONNECT_IVL_MAX:
450         intval = self->reconnect_ivl_max;
451         break;
452     case NN_SNDPRIO:
453         intval = self->ep_template.sndprio;
454         break;
455     case NN_RCVPRIO:
456         intval = self->ep_template.rcvprio;
457         break;
458     case NN_IPV4ONLY:
459         intval = self->ep_template.ipv4only;
460         break;
461     case NN_MAXTTL:
462         intval = self->maxttl;
463         break;
464     case NN_SNDFD:
465         if (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)
466             return -ENOPROTOOPT;
467         fd = nn_efd_getfd (&self->sndfd);
468         memcpy (optval, &fd,
469             *optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
470         *optvallen = sizeof (nn_fd);
471         return 0;
472     case NN_RCVFD:
473         if (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)
474             return -ENOPROTOOPT;
475         fd = nn_efd_getfd (&self->rcvfd);
476         memcpy (optval, &fd,
477             *optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
478         *optvallen = sizeof (nn_fd);
479         return 0;
480     case NN_SOCKET_NAME:
481         strncpy (optval, self->socket_name, *optvallen);
482         *optvallen = strlen(self->socket_name);
483         return 0;
484     default:
485         return -ENOPROTOOPT;
486     }
487 
488     memcpy (optval, &intval,
489         *optvallen < sizeof (int) ? *optvallen : sizeof (int));
490     *optvallen = sizeof (int);
491 
492     return 0;
493 }
494 
nn_sock_add_ep(struct nn_sock * self,const struct nn_transport * transport,int bind,const char * addr)495 int nn_sock_add_ep (struct nn_sock *self, const struct nn_transport *transport,
496     int bind, const char *addr)
497 {
498     int rc;
499     struct nn_ep *ep;
500     int eid;
501 
502     nn_ctx_enter (&self->ctx);
503 
504     /*  Instantiate the endpoint. */
505     ep = nn_alloc (sizeof (struct nn_ep), "endpoint");
506     rc = nn_ep_init (ep, NN_SOCK_SRC_EP, self, self->eid, transport,
507         bind, addr);
508     if (nn_slow (rc < 0)) {
509         nn_free (ep);
510         nn_ctx_leave (&self->ctx);
511         return rc;
512     }
513     nn_ep_start (ep);
514 
515     /*  Increase the endpoint ID for the next endpoint. */
516     eid = self->eid;
517     ++self->eid;
518 
519     /*  Add it to the list of active endpoints. */
520     nn_list_insert (&self->eps, &ep->item, nn_list_end (&self->eps));
521 
522     nn_ctx_leave (&self->ctx);
523 
524     return eid;
525 }
526 
nn_sock_rm_ep(struct nn_sock * self,int eid)527 int nn_sock_rm_ep (struct nn_sock *self, int eid)
528 {
529     struct nn_list_item *it;
530     struct nn_ep *ep;
531 
532     nn_ctx_enter (&self->ctx);
533 
534     /*  Find the specified enpoint. */
535     ep = NULL;
536     for (it = nn_list_begin (&self->eps);
537           it != nn_list_end (&self->eps);
538           it = nn_list_next (&self->eps, it)) {
539         ep = nn_cont (it, struct nn_ep, item);
540         if (ep->eid == eid)
541             break;
542         ep = NULL;
543     }
544 
545     /*  The endpoint doesn't exist. */
546     if (!ep) {
547         nn_ctx_leave (&self->ctx);
548         return -EINVAL;
549     }
550 
551     /*  Move the endpoint from the list of active endpoints to the list
552         of shutting down endpoints. */
553     nn_list_erase (&self->eps, &ep->item);
554     nn_list_insert (&self->sdeps, &ep->item, nn_list_end (&self->sdeps));
555 
556     /*  Ask the endpoint to stop. Actual terminatation may be delayed
557         by the transport. */
558     nn_ep_stop (ep);
559 
560     nn_ctx_leave (&self->ctx);
561 
562     return 0;
563 }
564 
nn_sock_send(struct nn_sock * self,struct nn_msg * msg,int flags)565 int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags)
566 {
567     int rc;
568     uint64_t deadline;
569     uint64_t now;
570     int timeout;
571 
572     /*  Some sockets types cannot be used for sending messages. */
573     if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
574         return -ENOTSUP;
575 
576     nn_ctx_enter (&self->ctx);
577 
578     /*  Compute the deadline for SNDTIMEO timer. */
579     if (self->sndtimeo < 0) {
580         deadline = -1;
581         timeout = -1;
582     }
583     else {
584         deadline = nn_clock_ms() + self->sndtimeo;
585         timeout = self->sndtimeo;
586     }
587 
588     while (1) {
589 
590         switch (self->state) {
591         case NN_SOCK_STATE_ACTIVE:
592         case NN_SOCK_STATE_INIT:
593              break;
594 
595         case NN_SOCK_STATE_STOPPING_EPS:
596         case NN_SOCK_STATE_STOPPING:
597         case NN_SOCK_STATE_FINI:
598             /*  Socket closed or closing.  Should we return something
599                 else here; recvmsg(2) for example returns no data in
600                 this case, like read(2).  The use of indexed file
601                 descriptors is further problematic, as an FD can be reused
602                 leading to situations where technically the outstanding
603                 operation should refer to some other socket entirely.  */
604             nn_ctx_leave (&self->ctx);
605             return -EBADF;
606         }
607 
608         /*  Try to send the message in a non-blocking way. */
609         rc = self->sockbase->vfptr->send (self->sockbase, msg);
610         if (nn_fast (rc == 0)) {
611             nn_ctx_leave (&self->ctx);
612             return 0;
613         }
614         nn_assert (rc < 0);
615 
616         /*  Any unexpected error is forwarded to the caller. */
617         if (nn_slow (rc != -EAGAIN)) {
618             nn_ctx_leave (&self->ctx);
619             return rc;
620         }
621 
622         /*  If the message cannot be sent at the moment and the send call
623             is non-blocking, return immediately. */
624         if (nn_fast (flags & NN_DONTWAIT)) {
625             nn_ctx_leave (&self->ctx);
626             return -EAGAIN;
627         }
628 
629         /*  With blocking send, wait while there are new pipes available
630             for sending. */
631         nn_ctx_leave (&self->ctx);
632         rc = nn_efd_wait (&self->sndfd, timeout);
633         if (nn_slow (rc == -ETIMEDOUT))
634             return -ETIMEDOUT;
635         if (nn_slow (rc == -EINTR))
636             return -EINTR;
637         if (nn_slow (rc == -EBADF))
638             return -EBADF;
639         errnum_assert (rc == 0, rc);
640         nn_ctx_enter (&self->ctx);
641         /*
642          *  Double check if pipes are still available for sending
643          */
644         if (!nn_efd_wait (&self->sndfd, 0)) {
645             self->flags |= NN_SOCK_FLAG_OUT;
646         }
647 
648         /*  If needed, re-compute the timeout to reflect the time that have
649             already elapsed. */
650         if (self->sndtimeo >= 0) {
651             now = nn_clock_ms();
652             timeout = (int) (now > deadline ? 0 : deadline - now);
653         }
654     }
655 }
656 
nn_sock_recv(struct nn_sock * self,struct nn_msg * msg,int flags)657 int nn_sock_recv (struct nn_sock *self, struct nn_msg *msg, int flags)
658 {
659     int rc;
660     uint64_t deadline;
661     uint64_t now;
662     int timeout;
663 
664     /*  Some sockets types cannot be used for receiving messages. */
665     if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
666         return -ENOTSUP;
667 
668     nn_ctx_enter (&self->ctx);
669 
670     /*  Compute the deadline for RCVTIMEO timer. */
671     if (self->rcvtimeo < 0) {
672         deadline = -1;
673         timeout = -1;
674     }
675     else {
676         deadline = nn_clock_ms() + self->rcvtimeo;
677         timeout = self->rcvtimeo;
678     }
679 
680     while (1) {
681 
682         switch (self->state) {
683         case NN_SOCK_STATE_ACTIVE:
684         case NN_SOCK_STATE_INIT:
685              break;
686 
687         case NN_SOCK_STATE_STOPPING_EPS:
688         case NN_SOCK_STATE_STOPPING:
689         case NN_SOCK_STATE_FINI:
690             /*  Socket closed or closing.  Should we return something
691                 else here; recvmsg(2) for example returns no data in
692                 this case, like read(2).  The use of indexed file
693                 descriptors is further problematic, as an FD can be reused
694                 leading to situations where technically the outstanding
695                 operation should refer to some other socket entirely.  */
696             nn_ctx_leave (&self->ctx);
697             return -EBADF;
698         }
699 
700         /*  Try to receive the message in a non-blocking way. */
701         rc = self->sockbase->vfptr->recv (self->sockbase, msg);
702         if (nn_fast (rc == 0)) {
703             nn_ctx_leave (&self->ctx);
704             return 0;
705         }
706         nn_assert (rc < 0);
707 
708         /*  Any unexpected error is forwarded to the caller. */
709         if (nn_slow (rc != -EAGAIN)) {
710             nn_ctx_leave (&self->ctx);
711             return rc;
712         }
713 
714         /*  If the message cannot be received at the moment and the recv call
715             is non-blocking, return immediately. */
716         if (nn_fast (flags & NN_DONTWAIT)) {
717             nn_ctx_leave (&self->ctx);
718             return -EAGAIN;
719         }
720 
721         /*  With blocking recv, wait while there are new pipes available
722             for receiving. */
723         nn_ctx_leave (&self->ctx);
724         rc = nn_efd_wait (&self->rcvfd, timeout);
725         if (nn_slow (rc == -ETIMEDOUT))
726             return -ETIMEDOUT;
727         if (nn_slow (rc == -EINTR))
728             return -EINTR;
729         if (nn_slow (rc == -EBADF))
730             return -EBADF;
731         errnum_assert (rc == 0, rc);
732         nn_ctx_enter (&self->ctx);
733         /*
734          *  Double check if pipes are still available for receiving
735          */
736         if (!nn_efd_wait (&self->rcvfd, 0)) {
737             self->flags |= NN_SOCK_FLAG_IN;
738         }
739 
740         /*  If needed, re-compute the timeout to reflect the time that have
741             already elapsed. */
742         if (self->rcvtimeo >= 0) {
743             now = nn_clock_ms();
744             timeout = (int) (now > deadline ? 0 : deadline - now);
745         }
746     }
747 }
748 
nn_sock_add(struct nn_sock * self,struct nn_pipe * pipe)749 int nn_sock_add (struct nn_sock *self, struct nn_pipe *pipe)
750 {
751     int rc;
752 
753     rc = self->sockbase->vfptr->add (self->sockbase, pipe);
754     if (nn_slow (rc >= 0)) {
755         nn_sock_stat_increment (self, NN_STAT_CURRENT_CONNECTIONS, 1);
756     }
757     return rc;
758 }
759 
nn_sock_rm(struct nn_sock * self,struct nn_pipe * pipe)760 void nn_sock_rm (struct nn_sock *self, struct nn_pipe *pipe)
761 {
762     self->sockbase->vfptr->rm (self->sockbase, pipe);
763     nn_sock_stat_increment (self, NN_STAT_CURRENT_CONNECTIONS, -1);
764 }
765 
nn_sock_onleave(struct nn_ctx * self)766 static void nn_sock_onleave (struct nn_ctx *self)
767 {
768     struct nn_sock *sock;
769     int events;
770 
771     sock = nn_cont (self, struct nn_sock, ctx);
772 
773     /*  If nn_close() was already called there's no point in adjusting the
774         snd/rcv file descriptors. */
775     if (nn_slow (sock->state != NN_SOCK_STATE_ACTIVE))
776         return;
777 
778     /*  Check whether socket is readable and/or writable at the moment. */
779     events = sock->sockbase->vfptr->events (sock->sockbase);
780     errnum_assert (events >= 0, -events);
781 
782     /*  Signal/unsignal IN as needed. */
783     if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
784         if (events & NN_SOCKBASE_EVENT_IN) {
785             if (!(sock->flags & NN_SOCK_FLAG_IN)) {
786                 sock->flags |= NN_SOCK_FLAG_IN;
787                 nn_efd_signal (&sock->rcvfd);
788             }
789         }
790         else {
791             if (sock->flags & NN_SOCK_FLAG_IN) {
792                 sock->flags &= ~NN_SOCK_FLAG_IN;
793                 nn_efd_unsignal (&sock->rcvfd);
794             }
795         }
796     }
797 
798     /*  Signal/unsignal OUT as needed. */
799     if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
800         if (events & NN_SOCKBASE_EVENT_OUT) {
801             if (!(sock->flags & NN_SOCK_FLAG_OUT)) {
802                 sock->flags |= NN_SOCK_FLAG_OUT;
803                 nn_efd_signal (&sock->sndfd);
804             }
805         }
806         else {
807             if (sock->flags & NN_SOCK_FLAG_OUT) {
808                 sock->flags &= ~NN_SOCK_FLAG_OUT;
809                 nn_efd_unsignal (&sock->sndfd);
810             }
811         }
812     }
813 }
814 
nn_sock_optset(struct nn_sock * self,int id)815 static struct nn_optset *nn_sock_optset (struct nn_sock *self, int id)
816 {
817     int index;
818     const struct nn_transport *tp;
819 
820     /*  Transport IDs are negative and start from -1. */
821     index = (-id) - 1;
822 
823     /*  Check for invalid indices. */
824     if (nn_slow (index < 0 || index >= NN_MAX_TRANSPORT))
825         return NULL;
826 
827     /*  If the option set already exists return it. */
828     if (nn_fast (self->optsets [index] != NULL))
829         return self->optsets [index];
830 
831     /*  If the option set doesn't exist yet, create it. */
832     tp = nn_global_transport (id);
833     if (nn_slow (!tp))
834         return NULL;
835     if (nn_slow (!tp->optset))
836         return NULL;
837     self->optsets [index] = tp->optset ();
838 
839     return self->optsets [index];
840 }
841 
nn_sock_shutdown(struct nn_fsm * self,int src,int type,void * srcptr)842 static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
843     void *srcptr)
844 {
845     struct nn_sock *sock;
846     struct nn_list_item *it;
847     struct nn_ep *ep;
848 
849     sock = nn_cont (self, struct nn_sock, fsm);
850 
851     if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
852         nn_assert (sock->state == NN_SOCK_STATE_ACTIVE);
853 
854         /*  Close sndfd and rcvfd. This should make any current
855             select/poll using SNDFD and/or RCVFD exit. */
856         if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
857             nn_efd_stop (&sock->rcvfd);
858         }
859         if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
860             nn_efd_stop (&sock->sndfd);
861         }
862 
863         /*  Ask all the associated endpoints to stop. */
864         it = nn_list_begin (&sock->eps);
865         while (it != nn_list_end (&sock->eps)) {
866             ep = nn_cont (it, struct nn_ep, item);
867             it = nn_list_next (&sock->eps, it);
868             nn_list_erase (&sock->eps, &ep->item);
869             nn_list_insert (&sock->sdeps, &ep->item,
870                 nn_list_end (&sock->sdeps));
871             nn_ep_stop (ep);
872 
873         }
874         sock->state = NN_SOCK_STATE_STOPPING_EPS;
875         goto finish2;
876     }
877     if (nn_slow (sock->state == NN_SOCK_STATE_STOPPING_EPS)) {
878 
879         if (!(src == NN_SOCK_SRC_EP && type == NN_EP_STOPPED)) {
880             /*  If we got here waiting for EPs to teardown, but src is
881                 not an EP, then it isn't safe for us to do anything,
882                 because we just need to wait for the EPs to finish
883                 up their thing.  Just bail. */
884             return;
885         }
886         /*  Endpoint is stopped. Now we can safely deallocate it. */
887         ep = (struct nn_ep*) srcptr;
888         nn_list_erase (&sock->sdeps, &ep->item);
889         nn_ep_term (ep);
890         nn_free (ep);
891 
892 finish2:
893         /*  If all the endpoints are deallocated, we can start stopping
894             protocol-specific part of the socket. If there' no stop function
895             we can consider it stopped straight away. */
896         if (!nn_list_empty (&sock->sdeps))
897             return;
898         nn_assert (nn_list_empty (&sock->eps));
899         sock->state = NN_SOCK_STATE_STOPPING;
900         if (!sock->sockbase->vfptr->stop)
901             goto finish1;
902         sock->sockbase->vfptr->stop (sock->sockbase);
903         return;
904     }
905     if (nn_slow (sock->state == NN_SOCK_STATE_STOPPING)) {
906 
907         /*  We get here when the deallocation of the socket was delayed by the
908             specific socket type. */
909         nn_assert (src == NN_FSM_ACTION && type == NN_SOCK_ACTION_STOPPED);
910 
911 finish1:
912         /*  Protocol-specific part of the socket is stopped.
913             We can safely deallocate it. */
914         sock->sockbase->vfptr->destroy (sock->sockbase);
915         sock->state = NN_SOCK_STATE_FINI;
916 
917         /*  Now we can unblock the application thread blocked in
918             the nn_close() call. */
919         nn_sem_post (&sock->termsem);
920 
921         return;
922     }
923 
924     nn_fsm_bad_state(sock->state, src, type);
925 }
926 
nn_sock_handler(struct nn_fsm * self,int src,int type,void * srcptr)927 static void nn_sock_handler (struct nn_fsm *self, int src, int type,
928     void *srcptr)
929 {
930     struct nn_sock *sock;
931     struct nn_ep *ep;
932 
933     sock = nn_cont (self, struct nn_sock, fsm);
934 
935     switch (sock->state) {
936 
937 /******************************************************************************/
938 /*  INIT state.                                                               */
939 /******************************************************************************/
940     case NN_SOCK_STATE_INIT:
941         switch (src) {
942 
943         case NN_FSM_ACTION:
944             switch (type) {
945             case NN_FSM_START:
946                 sock->state = NN_SOCK_STATE_ACTIVE;
947                 return;
948             default:
949                 nn_fsm_bad_action (sock->state, src, type);
950             }
951 
952         default:
953             nn_fsm_bad_source (sock->state, src, type);
954         }
955 
956 /******************************************************************************/
957 /*  ACTIVE state.                                                             */
958 /******************************************************************************/
959     case NN_SOCK_STATE_ACTIVE:
960         switch (src) {
961 
962         case NN_FSM_ACTION:
963             switch (type) {
964             default:
965                 nn_fsm_bad_action (sock->state, src, type);
966             }
967 
968         case NN_SOCK_SRC_EP:
969             switch (type) {
970             case NN_EP_STOPPED:
971 
972                 /*  This happens when an endpoint is closed using
973                     nn_shutdown() function. */
974                 ep = (struct nn_ep*) srcptr;
975                 nn_list_erase (&sock->sdeps, &ep->item);
976                 nn_ep_term (ep);
977                 nn_free (ep);
978                 return;
979 
980             default:
981                 nn_fsm_bad_action (sock->state, src, type);
982             }
983 
984         default:
985 
986             /*  The assumption is that all the other events come from pipes. */
987             switch (type) {
988             case NN_PIPE_IN:
989                 sock->sockbase->vfptr->in (sock->sockbase,
990                     (struct nn_pipe*) srcptr);
991                 return;
992             case NN_PIPE_OUT:
993                 sock->sockbase->vfptr->out (sock->sockbase,
994                     (struct nn_pipe*) srcptr);
995                 return;
996             default:
997                 nn_fsm_bad_action (sock->state, src, type);
998             }
999         }
1000 
1001 /******************************************************************************/
1002 /*  Invalid state.                                                            */
1003 /******************************************************************************/
1004     default:
1005         nn_fsm_bad_state (sock->state, src, type);
1006     }
1007 }
1008 
1009 /******************************************************************************/
1010 /*  State machine actions.                                                    */
1011 /******************************************************************************/
1012 
nn_sock_report_error(struct nn_sock * self,struct nn_ep * ep,int errnum)1013 void nn_sock_report_error (struct nn_sock *self, struct nn_ep *ep, int errnum)
1014 {
1015     if (!nn_global_print_errors())
1016         return;
1017 
1018     if (errnum == 0)
1019         return;
1020 
1021     if (ep) {
1022         fprintf(stderr, "nanomsg: socket.%s[%s]: Error: %s\n",
1023             self->socket_name, nn_ep_getaddr(ep), nn_strerror(errnum));
1024     } else {
1025         fprintf(stderr, "nanomsg: socket.%s: Error: %s\n",
1026             self->socket_name, nn_strerror(errnum));
1027     }
1028 }
1029 
nn_sock_stat_increment(struct nn_sock * self,int name,int64_t increment)1030 void nn_sock_stat_increment (struct nn_sock *self, int name, int64_t increment)
1031 {
1032     switch (name) {
1033         case NN_STAT_ESTABLISHED_CONNECTIONS:
1034             nn_assert (increment > 0);
1035             self->statistics.established_connections += increment;
1036             break;
1037         case NN_STAT_ACCEPTED_CONNECTIONS:
1038             nn_assert (increment > 0);
1039             self->statistics.accepted_connections += increment;
1040             break;
1041         case NN_STAT_DROPPED_CONNECTIONS:
1042             nn_assert (increment > 0);
1043             self->statistics.dropped_connections += increment;
1044             break;
1045         case NN_STAT_BROKEN_CONNECTIONS:
1046             nn_assert (increment > 0);
1047             self->statistics.broken_connections += increment;
1048             break;
1049         case NN_STAT_CONNECT_ERRORS:
1050             nn_assert (increment > 0);
1051             self->statistics.connect_errors += increment;
1052             break;
1053         case NN_STAT_BIND_ERRORS:
1054             nn_assert (increment > 0);
1055             self->statistics.bind_errors += increment;
1056             break;
1057         case NN_STAT_ACCEPT_ERRORS:
1058             nn_assert (increment > 0);
1059             self->statistics.accept_errors += increment;
1060             break;
1061         case NN_STAT_MESSAGES_SENT:
1062             nn_assert (increment > 0);
1063             self->statistics.messages_sent += increment;
1064             break;
1065         case NN_STAT_MESSAGES_RECEIVED:
1066             nn_assert (increment > 0);
1067             self->statistics.messages_received += increment;
1068             break;
1069         case NN_STAT_BYTES_SENT:
1070             nn_assert (increment >= 0);
1071             self->statistics.bytes_sent += increment;
1072             break;
1073         case NN_STAT_BYTES_RECEIVED:
1074             nn_assert (increment >= 0);
1075             self->statistics.bytes_received += increment;
1076             break;
1077 
1078         case NN_STAT_CURRENT_CONNECTIONS:
1079             nn_assert (increment > 0 ||
1080                 self->statistics.current_connections >= -increment);
1081             nn_assert(increment < INT_MAX && increment > -INT_MAX);
1082             self->statistics.current_connections += (int) increment;
1083             break;
1084         case NN_STAT_INPROGRESS_CONNECTIONS:
1085             nn_assert (increment > 0 ||
1086                 self->statistics.inprogress_connections >= -increment);
1087             nn_assert(increment < INT_MAX && increment > -INT_MAX);
1088             self->statistics.inprogress_connections += (int) increment;
1089             break;
1090         case NN_STAT_CURRENT_SND_PRIORITY:
1091             /*  This is an exception, we don't want to increment priority  */
1092             nn_assert((increment > 0 && increment <= 16) || increment == -1);
1093             self->statistics.current_snd_priority = (int) increment;
1094             break;
1095         case NN_STAT_CURRENT_EP_ERRORS:
1096             nn_assert (increment > 0 ||
1097                 self->statistics.current_ep_errors >= -increment);
1098             nn_assert(increment < INT_MAX && increment > -INT_MAX);
1099             self->statistics.current_ep_errors += (int) increment;
1100             break;
1101     }
1102 }
1103 
nn_sock_hold(struct nn_sock * self)1104 int nn_sock_hold (struct nn_sock *self)
1105 {
1106     switch (self->state) {
1107     case NN_SOCK_STATE_ACTIVE:
1108     case NN_SOCK_STATE_INIT:
1109         self->holds++;
1110         return 0;
1111     case NN_SOCK_STATE_STOPPING:
1112     case NN_SOCK_STATE_STOPPING_EPS:
1113     case NN_SOCK_STATE_FINI:
1114     default:
1115         return -EBADF;
1116     }
1117 }
1118 
nn_sock_rele(struct nn_sock * self)1119 void nn_sock_rele (struct nn_sock *self)
1120 {
1121     self->holds--;
1122     if (self->holds == 0) {
1123         nn_sem_post (&self->relesem);
1124     }
1125 }
1126