1 /*
2 Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
3 Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
4 Copyright 2017 Garrett D'Amore <garrett@damore.org>
5 Copyright 2017 Capitar IT Group BV <info@capitar.com>
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"),
9 to deal in the Software without restriction, including without limitation
10 the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 and/or sell copies of the Software, and to permit persons to whom
12 the Software is furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included
15 in all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23 IN THE SOFTWARE.
24 */
25
26 #include "../protocol.h"
27 #include "../transport.h"
28
29 #include "sock.h"
30 #include "global.h"
31 #include "ep.h"
32
33 #include "../utils/err.h"
34 #include "../utils/cont.h"
35 #include "../utils/clock.h"
36 #include "../utils/fast.h"
37 #include "../utils/alloc.h"
38 #include "../utils/msg.h"
39
40 #include <limits.h>
41
/*  These bits specify whether individual efds are signalled or not at
    the moment. Storing this information allows us to avoid redundant signalling
    and unsignalling of the efd objects. NN_SOCK_FLAG_IN mirrors the state of
    the rcvfd efd, NN_SOCK_FLAG_OUT mirrors the state of the sndfd efd. */
#define NN_SOCK_FLAG_IN 1
#define NN_SOCK_FLAG_OUT 2

/*  Possible states of the socket.
    INIT:         set by nn_sock_init () before the state machine is started.
    ACTIVE:       entered when the state machine receives NN_FSM_START.
    STOPPING_EPS: the socket is being stopped and waits for its endpoints.
    STOPPING:     endpoints are gone; the protocol-specific part is stopping.
    FINI:         the protocol-specific part has been destroyed. */
#define NN_SOCK_STATE_INIT 1
#define NN_SOCK_STATE_ACTIVE 2
#define NN_SOCK_STATE_STOPPING_EPS 3
#define NN_SOCK_STATE_STOPPING 4
#define NN_SOCK_STATE_FINI 5

/*  Events sent to the state machine. NN_SOCK_ACTION_STOPPED is raised by
    nn_sock_stopped () and consumed by the shutdown handler. */
#define NN_SOCK_ACTION_STOPPED 1

/*  Subordinated source objects. NN_SOCK_SRC_EP identifies events coming
    from endpoints owned by the socket. */
#define NN_SOCK_SRC_EP 1

/*  Private functions. */

/*  Returns (creating it on first use) the option set of the transport 'id'. */
static struct nn_optset *nn_sock_optset (struct nn_sock *self, int id);
/*  Sets an option; the caller is expected to have entered the context. */
static int nn_sock_setopt_inner (struct nn_sock *self, int level,
    int option, const void *optval, size_t optvallen);
/*  Callback registered with the socket's context in nn_sock_init (). */
static void nn_sock_onleave (struct nn_ctx *self);
/*  State machine handler used while the socket is not shutting down. */
static void nn_sock_handler (struct nn_fsm *self, int src, int type,
    void *srcptr);
/*  State machine handler driving the shutdown sequence. */
static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
    void *srcptr);
70
71 /* Initialize a socket. A hold is placed on the initialized socket for
72 the caller as well. */
nn_sock_init(struct nn_sock * self,const struct nn_socktype * socktype,int fd)73 int nn_sock_init (struct nn_sock *self, const struct nn_socktype *socktype,
74 int fd)
75 {
76 int rc;
77 int i;
78
79 /* Make sure that at least one message direction is supported. */
80 nn_assert (!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND) ||
81 !(socktype->flags & NN_SOCKTYPE_FLAG_NORECV));
82
83 /* Create the AIO context for the SP socket. */
84 nn_ctx_init (&self->ctx, nn_global_getpool (), nn_sock_onleave);
85
86 /* Initialise the state machine. */
87 nn_fsm_init_root (&self->fsm, nn_sock_handler,
88 nn_sock_shutdown, &self->ctx);
89 self->state = NN_SOCK_STATE_INIT;
90
91 /* Open the NN_SNDFD and NN_RCVFD efds. Do so, only if the socket type
92 supports send/recv, as appropriate. */
93 if (socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)
94 memset (&self->sndfd, 0xcd, sizeof (self->sndfd));
95 else {
96 rc = nn_efd_init (&self->sndfd);
97 if (nn_slow (rc < 0))
98 return rc;
99 }
100 if (socktype->flags & NN_SOCKTYPE_FLAG_NORECV)
101 memset (&self->rcvfd, 0xcd, sizeof (self->rcvfd));
102 else {
103 rc = nn_efd_init (&self->rcvfd);
104 if (nn_slow (rc < 0)) {
105 if (!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
106 nn_efd_term (&self->sndfd);
107 return rc;
108 }
109 }
110 nn_sem_init (&self->termsem);
111 nn_sem_init (&self->relesem);
112 if (nn_slow (rc < 0)) {
113 if (!(socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
114 nn_efd_term (&self->rcvfd);
115 if (!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
116 nn_efd_term (&self->sndfd);
117 return rc;
118 }
119
120 self->holds = 1; /* Callers hold. */
121 self->flags = 0;
122 nn_list_init (&self->eps);
123 nn_list_init (&self->sdeps);
124 self->eid = 1;
125
126 /* Default values for NN_SOL_SOCKET options. */
127 self->sndbuf = 128 * 1024;
128 self->rcvbuf = 128 * 1024;
129 self->rcvmaxsize = 1024 * 1024;
130 self->sndtimeo = -1;
131 self->rcvtimeo = -1;
132 self->reconnect_ivl = 100;
133 self->reconnect_ivl_max = 0;
134 self->maxttl = 8;
135 self->ep_template.sndprio = 8;
136 self->ep_template.rcvprio = 8;
137 self->ep_template.ipv4only = 1;
138
139 /* Clear statistic entries */
140 memset(&self->statistics, 0, sizeof (self->statistics));
141
142 /* Should be pretty much enough space for just the number */
143 sprintf(self->socket_name, "%d", fd);
144
145 /* Security attribute */
146 self->sec_attr = NULL;
147 self->sec_attr_size = 0;
148 self->inbuffersz = 4096;
149 self->outbuffersz = 4096;
150
151 /* The transport-specific options are not initialised immediately,
152 rather, they are allocated later on when needed. */
153 for (i = 0; i != NN_MAX_TRANSPORT; ++i)
154 self->optsets [i] = NULL;
155
156 /* Create the specific socket type itself. */
157 rc = socktype->create ((void*) self, &self->sockbase);
158 errnum_assert (rc == 0, -rc);
159 self->socktype = socktype;
160
161 /* Launch the state machine. */
162 nn_ctx_enter (&self->ctx);
163 nn_fsm_start (&self->fsm);
164 nn_ctx_leave (&self->ctx);
165
166 return 0;
167 }
168
nn_sock_stopped(struct nn_sock * self)169 void nn_sock_stopped (struct nn_sock *self)
170 {
171 /* TODO: Do the following in a more sane way. */
172 self->fsm.stopped.fsm = &self->fsm;
173 self->fsm.stopped.src = NN_FSM_ACTION;
174 self->fsm.stopped.srcptr = NULL;
175 self->fsm.stopped.type = NN_SOCK_ACTION_STOPPED;
176 nn_ctx_raise (self->fsm.ctx, &self->fsm.stopped);
177 }
178
179 /* Stop the socket. This will prevent new calls from aquiring a
180 hold on the socket, cause endpoints to shut down, and wake any
181 threads waiting to recv or send data. */
nn_sock_stop(struct nn_sock * self)182 void nn_sock_stop (struct nn_sock *self)
183 {
184 nn_ctx_enter (&self->ctx);
185 nn_fsm_stop (&self->fsm);
186 nn_ctx_leave (&self->ctx);
187 }
188
nn_sock_term(struct nn_sock * self)189 int nn_sock_term (struct nn_sock *self)
190 {
191 int rc;
192 int i;
193
194 /* NOTE: nn_sock_stop must have already been called. */
195
196 /* Some endpoints may still be alive. Here we are going to wait
197 till they are all closed. This loop is not interruptible, because
198 making it so would leave a partially cleaned up socket, and we don't
199 have a way to defer resource deallocation. */
200 for (;;) {
201 rc = nn_sem_wait (&self->termsem);
202 if (nn_slow (rc == -EINTR))
203 continue;
204 errnum_assert (rc == 0, -rc);
205 break;
206 }
207
208 /* Also, wait for all holds on the socket to be released. */
209 for (;;) {
210 rc = nn_sem_wait (&self->relesem);
211 if (nn_slow (rc == -EINTR))
212 continue;
213 errnum_assert (rc == 0, -rc);
214 break;
215 }
216
217 /* Threads that posted the semaphore(s) can still have the ctx locked
218 for a short while. By simply entering the context and exiting it
219 immediately we can be sure that any such threads have already
220 exited the context. */
221 nn_ctx_enter (&self->ctx);
222 nn_ctx_leave (&self->ctx);
223
224 /* At this point, we can be reasonably certain that no other thread
225 has any references to the socket. */
226
227 /* Close the event FDs entirely. */
228 if (!(self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
229 nn_efd_term (&self->rcvfd);
230 }
231 if (!(self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
232 nn_efd_term (&self->sndfd);
233 }
234
235 nn_fsm_stopped_noevent (&self->fsm);
236 nn_fsm_term (&self->fsm);
237 nn_sem_term (&self->termsem);
238 nn_sem_term (&self->relesem);
239 nn_list_term (&self->sdeps);
240 nn_list_term (&self->eps);
241 nn_ctx_term (&self->ctx);
242
243 /* Destroy any optsets associated with the socket. */
244 for (i = 0; i != NN_MAX_TRANSPORT; ++i)
245 if (self->optsets [i])
246 self->optsets [i]->vfptr->destroy (self->optsets [i]);
247
248 return 0;
249 }
250
nn_sock_getctx(struct nn_sock * self)251 struct nn_ctx *nn_sock_getctx (struct nn_sock *self)
252 {
253 return &self->ctx;
254 }
255
nn_sock_ispeer(struct nn_sock * self,int socktype)256 int nn_sock_ispeer (struct nn_sock *self, int socktype)
257 {
258 /* If the peer implements a different SP protocol it is not a valid peer.
259 Checking it here ensures that even if faulty protocol implementation
260 allows for cross-protocol communication, it will never happen
261 in practice. */
262 if ((self->socktype->protocol & 0xfff0) != (socktype & 0xfff0))
263 return 0;
264
265 /* As long as the peer speaks the same protocol, socket type itself
266 decides which socket types are to be accepted. */
267 return self->socktype->ispeer (socktype);
268 }
269
nn_sock_setopt(struct nn_sock * self,int level,int option,const void * optval,size_t optvallen)270 int nn_sock_setopt (struct nn_sock *self, int level, int option,
271 const void *optval, size_t optvallen)
272 {
273 int rc;
274
275 nn_ctx_enter (&self->ctx);
276 rc = nn_sock_setopt_inner (self, level, option, optval, optvallen);
277 nn_ctx_leave (&self->ctx);
278
279 return rc;
280 }
281
nn_sock_setopt_inner(struct nn_sock * self,int level,int option,const void * optval,size_t optvallen)282 static int nn_sock_setopt_inner (struct nn_sock *self, int level,
283 int option, const void *optval, size_t optvallen)
284 {
285 struct nn_optset *optset;
286 int val;
287
288 /* Protocol-specific socket options. */
289 if (level > NN_SOL_SOCKET) {
290 if (self->sockbase->vfptr->setopt == NULL) {
291 return -ENOPROTOOPT;
292 }
293 return self->sockbase->vfptr->setopt (self->sockbase, level, option,
294 optval, optvallen);
295 }
296
297 /* Transport-specific options. */
298 if (level < NN_SOL_SOCKET) {
299 optset = nn_sock_optset (self, level);
300 if (!optset)
301 return -ENOPROTOOPT;
302 return optset->vfptr->setopt (optset, option, optval, optvallen);
303 }
304
305 nn_assert (level == NN_SOL_SOCKET);
306
307 /* Special-casing socket name for now as it's the only string option */
308 if (option == NN_SOCKET_NAME) {
309 if (optvallen > 63)
310 return -EINVAL;
311 memcpy (self->socket_name, optval, optvallen);
312 self->socket_name [optvallen] = 0;
313 return 0;
314 }
315
316 /* At this point we assume that all options are of type int. */
317 if (optvallen != sizeof (int))
318 return -EINVAL;
319 val = *(int*) optval;
320
321 /* Generic socket-level options. */
322 switch (option) {
323 case NN_SNDBUF:
324 if (val <= 0)
325 return -EINVAL;
326 self->sndbuf = val;
327 return 0;
328 case NN_RCVBUF:
329 if (val <= 0)
330 return -EINVAL;
331 self->rcvbuf = val;
332 return 0;
333 case NN_RCVMAXSIZE:
334 if (val < -1)
335 return -EINVAL;
336 self->rcvmaxsize = val;
337 return 0;
338 case NN_SNDTIMEO:
339 self->sndtimeo = val;
340 return 0;
341 case NN_RCVTIMEO:
342 self->rcvtimeo = val;
343 return 0;
344 case NN_RECONNECT_IVL:
345 if (val < 0)
346 return -EINVAL;
347 self->reconnect_ivl = val;
348 return 0;
349 case NN_RECONNECT_IVL_MAX:
350 if (val < 0)
351 return -EINVAL;
352 self->reconnect_ivl_max = val;
353 return 0;
354 case NN_SNDPRIO:
355 if (val < 1 || val > 16)
356 return -EINVAL;
357 self->ep_template.sndprio = val;
358 return 0;
359 case NN_RCVPRIO:
360 if (val < 1 || val > 16)
361 return -EINVAL;
362 self->ep_template.rcvprio = val;
363 return 0;
364 case NN_IPV4ONLY:
365 if (val != 0 && val != 1)
366 return -EINVAL;
367 self->ep_template.ipv4only = val;
368 return 0;
369 case NN_MAXTTL:
370 if (val < 1 || val > 255)
371 return -EINVAL;
372 self->maxttl = val;
373 return 0;
374 case NN_LINGER:
375 /* Ignored, retained for compatibility. */
376 return 0;
377 }
378
379 return -ENOPROTOOPT;
380 }
381
nn_sock_getopt(struct nn_sock * self,int level,int option,void * optval,size_t * optvallen)382 int nn_sock_getopt (struct nn_sock *self, int level, int option,
383 void *optval, size_t *optvallen)
384 {
385 int rc;
386
387 nn_ctx_enter (&self->ctx);
388 rc = nn_sock_getopt_inner (self, level, option, optval, optvallen);
389 nn_ctx_leave (&self->ctx);
390
391 return rc;
392 }
393
nn_sock_getopt_inner(struct nn_sock * self,int level,int option,void * optval,size_t * optvallen)394 int nn_sock_getopt_inner (struct nn_sock *self, int level,
395 int option, void *optval, size_t *optvallen)
396 {
397 struct nn_optset *optset;
398 int intval;
399 nn_fd fd;
400
401 /* Protocol-specific socket options. */
402 if (level > NN_SOL_SOCKET) {
403 if (self->sockbase->vfptr->getopt == NULL) {
404 return -ENOPROTOOPT;
405 }
406 return self->sockbase->vfptr->getopt (self->sockbase,
407 level, option, optval, optvallen);
408 }
409
410 /* Transport-specific options. */
411 if (level < NN_SOL_SOCKET) {
412 optset = nn_sock_optset (self, level);
413 if (!optset)
414 return -ENOPROTOOPT;
415 return optset->vfptr->getopt (optset, option, optval, optvallen);
416 }
417
418 nn_assert (level == NN_SOL_SOCKET);
419
420 /* Generic socket-level options. */
421 switch (option) {
422 case NN_DOMAIN:
423 intval = self->socktype->domain;
424 break;
425 case NN_PROTOCOL:
426 intval = self->socktype->protocol;
427 break;
428 case NN_LINGER:
429 intval = 0;
430 break;
431 case NN_SNDBUF:
432 intval = self->sndbuf;
433 break;
434 case NN_RCVBUF:
435 intval = self->rcvbuf;
436 break;
437 case NN_RCVMAXSIZE:
438 intval = self->rcvmaxsize;
439 break;
440 case NN_SNDTIMEO:
441 intval = self->sndtimeo;
442 break;
443 case NN_RCVTIMEO:
444 intval = self->rcvtimeo;
445 break;
446 case NN_RECONNECT_IVL:
447 intval = self->reconnect_ivl;
448 break;
449 case NN_RECONNECT_IVL_MAX:
450 intval = self->reconnect_ivl_max;
451 break;
452 case NN_SNDPRIO:
453 intval = self->ep_template.sndprio;
454 break;
455 case NN_RCVPRIO:
456 intval = self->ep_template.rcvprio;
457 break;
458 case NN_IPV4ONLY:
459 intval = self->ep_template.ipv4only;
460 break;
461 case NN_MAXTTL:
462 intval = self->maxttl;
463 break;
464 case NN_SNDFD:
465 if (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)
466 return -ENOPROTOOPT;
467 fd = nn_efd_getfd (&self->sndfd);
468 memcpy (optval, &fd,
469 *optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
470 *optvallen = sizeof (nn_fd);
471 return 0;
472 case NN_RCVFD:
473 if (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)
474 return -ENOPROTOOPT;
475 fd = nn_efd_getfd (&self->rcvfd);
476 memcpy (optval, &fd,
477 *optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
478 *optvallen = sizeof (nn_fd);
479 return 0;
480 case NN_SOCKET_NAME:
481 strncpy (optval, self->socket_name, *optvallen);
482 *optvallen = strlen(self->socket_name);
483 return 0;
484 default:
485 return -ENOPROTOOPT;
486 }
487
488 memcpy (optval, &intval,
489 *optvallen < sizeof (int) ? *optvallen : sizeof (int));
490 *optvallen = sizeof (int);
491
492 return 0;
493 }
494
nn_sock_add_ep(struct nn_sock * self,const struct nn_transport * transport,int bind,const char * addr)495 int nn_sock_add_ep (struct nn_sock *self, const struct nn_transport *transport,
496 int bind, const char *addr)
497 {
498 int rc;
499 struct nn_ep *ep;
500 int eid;
501
502 nn_ctx_enter (&self->ctx);
503
504 /* Instantiate the endpoint. */
505 ep = nn_alloc (sizeof (struct nn_ep), "endpoint");
506 rc = nn_ep_init (ep, NN_SOCK_SRC_EP, self, self->eid, transport,
507 bind, addr);
508 if (nn_slow (rc < 0)) {
509 nn_free (ep);
510 nn_ctx_leave (&self->ctx);
511 return rc;
512 }
513 nn_ep_start (ep);
514
515 /* Increase the endpoint ID for the next endpoint. */
516 eid = self->eid;
517 ++self->eid;
518
519 /* Add it to the list of active endpoints. */
520 nn_list_insert (&self->eps, &ep->item, nn_list_end (&self->eps));
521
522 nn_ctx_leave (&self->ctx);
523
524 return eid;
525 }
526
nn_sock_rm_ep(struct nn_sock * self,int eid)527 int nn_sock_rm_ep (struct nn_sock *self, int eid)
528 {
529 struct nn_list_item *it;
530 struct nn_ep *ep;
531
532 nn_ctx_enter (&self->ctx);
533
534 /* Find the specified enpoint. */
535 ep = NULL;
536 for (it = nn_list_begin (&self->eps);
537 it != nn_list_end (&self->eps);
538 it = nn_list_next (&self->eps, it)) {
539 ep = nn_cont (it, struct nn_ep, item);
540 if (ep->eid == eid)
541 break;
542 ep = NULL;
543 }
544
545 /* The endpoint doesn't exist. */
546 if (!ep) {
547 nn_ctx_leave (&self->ctx);
548 return -EINVAL;
549 }
550
551 /* Move the endpoint from the list of active endpoints to the list
552 of shutting down endpoints. */
553 nn_list_erase (&self->eps, &ep->item);
554 nn_list_insert (&self->sdeps, &ep->item, nn_list_end (&self->sdeps));
555
556 /* Ask the endpoint to stop. Actual terminatation may be delayed
557 by the transport. */
558 nn_ep_stop (ep);
559
560 nn_ctx_leave (&self->ctx);
561
562 return 0;
563 }
564
nn_sock_send(struct nn_sock * self,struct nn_msg * msg,int flags)565 int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags)
566 {
567 int rc;
568 uint64_t deadline;
569 uint64_t now;
570 int timeout;
571
572 /* Some sockets types cannot be used for sending messages. */
573 if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
574 return -ENOTSUP;
575
576 nn_ctx_enter (&self->ctx);
577
578 /* Compute the deadline for SNDTIMEO timer. */
579 if (self->sndtimeo < 0) {
580 deadline = -1;
581 timeout = -1;
582 }
583 else {
584 deadline = nn_clock_ms() + self->sndtimeo;
585 timeout = self->sndtimeo;
586 }
587
588 while (1) {
589
590 switch (self->state) {
591 case NN_SOCK_STATE_ACTIVE:
592 case NN_SOCK_STATE_INIT:
593 break;
594
595 case NN_SOCK_STATE_STOPPING_EPS:
596 case NN_SOCK_STATE_STOPPING:
597 case NN_SOCK_STATE_FINI:
598 /* Socket closed or closing. Should we return something
599 else here; recvmsg(2) for example returns no data in
600 this case, like read(2). The use of indexed file
601 descriptors is further problematic, as an FD can be reused
602 leading to situations where technically the outstanding
603 operation should refer to some other socket entirely. */
604 nn_ctx_leave (&self->ctx);
605 return -EBADF;
606 }
607
608 /* Try to send the message in a non-blocking way. */
609 rc = self->sockbase->vfptr->send (self->sockbase, msg);
610 if (nn_fast (rc == 0)) {
611 nn_ctx_leave (&self->ctx);
612 return 0;
613 }
614 nn_assert (rc < 0);
615
616 /* Any unexpected error is forwarded to the caller. */
617 if (nn_slow (rc != -EAGAIN)) {
618 nn_ctx_leave (&self->ctx);
619 return rc;
620 }
621
622 /* If the message cannot be sent at the moment and the send call
623 is non-blocking, return immediately. */
624 if (nn_fast (flags & NN_DONTWAIT)) {
625 nn_ctx_leave (&self->ctx);
626 return -EAGAIN;
627 }
628
629 /* With blocking send, wait while there are new pipes available
630 for sending. */
631 nn_ctx_leave (&self->ctx);
632 rc = nn_efd_wait (&self->sndfd, timeout);
633 if (nn_slow (rc == -ETIMEDOUT))
634 return -ETIMEDOUT;
635 if (nn_slow (rc == -EINTR))
636 return -EINTR;
637 if (nn_slow (rc == -EBADF))
638 return -EBADF;
639 errnum_assert (rc == 0, rc);
640 nn_ctx_enter (&self->ctx);
641 /*
642 * Double check if pipes are still available for sending
643 */
644 if (!nn_efd_wait (&self->sndfd, 0)) {
645 self->flags |= NN_SOCK_FLAG_OUT;
646 }
647
648 /* If needed, re-compute the timeout to reflect the time that have
649 already elapsed. */
650 if (self->sndtimeo >= 0) {
651 now = nn_clock_ms();
652 timeout = (int) (now > deadline ? 0 : deadline - now);
653 }
654 }
655 }
656
nn_sock_recv(struct nn_sock * self,struct nn_msg * msg,int flags)657 int nn_sock_recv (struct nn_sock *self, struct nn_msg *msg, int flags)
658 {
659 int rc;
660 uint64_t deadline;
661 uint64_t now;
662 int timeout;
663
664 /* Some sockets types cannot be used for receiving messages. */
665 if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
666 return -ENOTSUP;
667
668 nn_ctx_enter (&self->ctx);
669
670 /* Compute the deadline for RCVTIMEO timer. */
671 if (self->rcvtimeo < 0) {
672 deadline = -1;
673 timeout = -1;
674 }
675 else {
676 deadline = nn_clock_ms() + self->rcvtimeo;
677 timeout = self->rcvtimeo;
678 }
679
680 while (1) {
681
682 switch (self->state) {
683 case NN_SOCK_STATE_ACTIVE:
684 case NN_SOCK_STATE_INIT:
685 break;
686
687 case NN_SOCK_STATE_STOPPING_EPS:
688 case NN_SOCK_STATE_STOPPING:
689 case NN_SOCK_STATE_FINI:
690 /* Socket closed or closing. Should we return something
691 else here; recvmsg(2) for example returns no data in
692 this case, like read(2). The use of indexed file
693 descriptors is further problematic, as an FD can be reused
694 leading to situations where technically the outstanding
695 operation should refer to some other socket entirely. */
696 nn_ctx_leave (&self->ctx);
697 return -EBADF;
698 }
699
700 /* Try to receive the message in a non-blocking way. */
701 rc = self->sockbase->vfptr->recv (self->sockbase, msg);
702 if (nn_fast (rc == 0)) {
703 nn_ctx_leave (&self->ctx);
704 return 0;
705 }
706 nn_assert (rc < 0);
707
708 /* Any unexpected error is forwarded to the caller. */
709 if (nn_slow (rc != -EAGAIN)) {
710 nn_ctx_leave (&self->ctx);
711 return rc;
712 }
713
714 /* If the message cannot be received at the moment and the recv call
715 is non-blocking, return immediately. */
716 if (nn_fast (flags & NN_DONTWAIT)) {
717 nn_ctx_leave (&self->ctx);
718 return -EAGAIN;
719 }
720
721 /* With blocking recv, wait while there are new pipes available
722 for receiving. */
723 nn_ctx_leave (&self->ctx);
724 rc = nn_efd_wait (&self->rcvfd, timeout);
725 if (nn_slow (rc == -ETIMEDOUT))
726 return -ETIMEDOUT;
727 if (nn_slow (rc == -EINTR))
728 return -EINTR;
729 if (nn_slow (rc == -EBADF))
730 return -EBADF;
731 errnum_assert (rc == 0, rc);
732 nn_ctx_enter (&self->ctx);
733 /*
734 * Double check if pipes are still available for receiving
735 */
736 if (!nn_efd_wait (&self->rcvfd, 0)) {
737 self->flags |= NN_SOCK_FLAG_IN;
738 }
739
740 /* If needed, re-compute the timeout to reflect the time that have
741 already elapsed. */
742 if (self->rcvtimeo >= 0) {
743 now = nn_clock_ms();
744 timeout = (int) (now > deadline ? 0 : deadline - now);
745 }
746 }
747 }
748
nn_sock_add(struct nn_sock * self,struct nn_pipe * pipe)749 int nn_sock_add (struct nn_sock *self, struct nn_pipe *pipe)
750 {
751 int rc;
752
753 rc = self->sockbase->vfptr->add (self->sockbase, pipe);
754 if (nn_slow (rc >= 0)) {
755 nn_sock_stat_increment (self, NN_STAT_CURRENT_CONNECTIONS, 1);
756 }
757 return rc;
758 }
759
nn_sock_rm(struct nn_sock * self,struct nn_pipe * pipe)760 void nn_sock_rm (struct nn_sock *self, struct nn_pipe *pipe)
761 {
762 self->sockbase->vfptr->rm (self->sockbase, pipe);
763 nn_sock_stat_increment (self, NN_STAT_CURRENT_CONNECTIONS, -1);
764 }
765
nn_sock_onleave(struct nn_ctx * self)766 static void nn_sock_onleave (struct nn_ctx *self)
767 {
768 struct nn_sock *sock;
769 int events;
770
771 sock = nn_cont (self, struct nn_sock, ctx);
772
773 /* If nn_close() was already called there's no point in adjusting the
774 snd/rcv file descriptors. */
775 if (nn_slow (sock->state != NN_SOCK_STATE_ACTIVE))
776 return;
777
778 /* Check whether socket is readable and/or writable at the moment. */
779 events = sock->sockbase->vfptr->events (sock->sockbase);
780 errnum_assert (events >= 0, -events);
781
782 /* Signal/unsignal IN as needed. */
783 if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
784 if (events & NN_SOCKBASE_EVENT_IN) {
785 if (!(sock->flags & NN_SOCK_FLAG_IN)) {
786 sock->flags |= NN_SOCK_FLAG_IN;
787 nn_efd_signal (&sock->rcvfd);
788 }
789 }
790 else {
791 if (sock->flags & NN_SOCK_FLAG_IN) {
792 sock->flags &= ~NN_SOCK_FLAG_IN;
793 nn_efd_unsignal (&sock->rcvfd);
794 }
795 }
796 }
797
798 /* Signal/unsignal OUT as needed. */
799 if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
800 if (events & NN_SOCKBASE_EVENT_OUT) {
801 if (!(sock->flags & NN_SOCK_FLAG_OUT)) {
802 sock->flags |= NN_SOCK_FLAG_OUT;
803 nn_efd_signal (&sock->sndfd);
804 }
805 }
806 else {
807 if (sock->flags & NN_SOCK_FLAG_OUT) {
808 sock->flags &= ~NN_SOCK_FLAG_OUT;
809 nn_efd_unsignal (&sock->sndfd);
810 }
811 }
812 }
813 }
814
nn_sock_optset(struct nn_sock * self,int id)815 static struct nn_optset *nn_sock_optset (struct nn_sock *self, int id)
816 {
817 int index;
818 const struct nn_transport *tp;
819
820 /* Transport IDs are negative and start from -1. */
821 index = (-id) - 1;
822
823 /* Check for invalid indices. */
824 if (nn_slow (index < 0 || index >= NN_MAX_TRANSPORT))
825 return NULL;
826
827 /* If the option set already exists return it. */
828 if (nn_fast (self->optsets [index] != NULL))
829 return self->optsets [index];
830
831 /* If the option set doesn't exist yet, create it. */
832 tp = nn_global_transport (id);
833 if (nn_slow (!tp))
834 return NULL;
835 if (nn_slow (!tp->optset))
836 return NULL;
837 self->optsets [index] = tp->optset ();
838
839 return self->optsets [index];
840 }
841
nn_sock_shutdown(struct nn_fsm * self,int src,int type,void * srcptr)842 static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
843 void *srcptr)
844 {
845 struct nn_sock *sock;
846 struct nn_list_item *it;
847 struct nn_ep *ep;
848
849 sock = nn_cont (self, struct nn_sock, fsm);
850
851 if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
852 nn_assert (sock->state == NN_SOCK_STATE_ACTIVE);
853
854 /* Close sndfd and rcvfd. This should make any current
855 select/poll using SNDFD and/or RCVFD exit. */
856 if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
857 nn_efd_stop (&sock->rcvfd);
858 }
859 if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
860 nn_efd_stop (&sock->sndfd);
861 }
862
863 /* Ask all the associated endpoints to stop. */
864 it = nn_list_begin (&sock->eps);
865 while (it != nn_list_end (&sock->eps)) {
866 ep = nn_cont (it, struct nn_ep, item);
867 it = nn_list_next (&sock->eps, it);
868 nn_list_erase (&sock->eps, &ep->item);
869 nn_list_insert (&sock->sdeps, &ep->item,
870 nn_list_end (&sock->sdeps));
871 nn_ep_stop (ep);
872
873 }
874 sock->state = NN_SOCK_STATE_STOPPING_EPS;
875 goto finish2;
876 }
877 if (nn_slow (sock->state == NN_SOCK_STATE_STOPPING_EPS)) {
878
879 if (!(src == NN_SOCK_SRC_EP && type == NN_EP_STOPPED)) {
880 /* If we got here waiting for EPs to teardown, but src is
881 not an EP, then it isn't safe for us to do anything,
882 because we just need to wait for the EPs to finish
883 up their thing. Just bail. */
884 return;
885 }
886 /* Endpoint is stopped. Now we can safely deallocate it. */
887 ep = (struct nn_ep*) srcptr;
888 nn_list_erase (&sock->sdeps, &ep->item);
889 nn_ep_term (ep);
890 nn_free (ep);
891
892 finish2:
893 /* If all the endpoints are deallocated, we can start stopping
894 protocol-specific part of the socket. If there' no stop function
895 we can consider it stopped straight away. */
896 if (!nn_list_empty (&sock->sdeps))
897 return;
898 nn_assert (nn_list_empty (&sock->eps));
899 sock->state = NN_SOCK_STATE_STOPPING;
900 if (!sock->sockbase->vfptr->stop)
901 goto finish1;
902 sock->sockbase->vfptr->stop (sock->sockbase);
903 return;
904 }
905 if (nn_slow (sock->state == NN_SOCK_STATE_STOPPING)) {
906
907 /* We get here when the deallocation of the socket was delayed by the
908 specific socket type. */
909 nn_assert (src == NN_FSM_ACTION && type == NN_SOCK_ACTION_STOPPED);
910
911 finish1:
912 /* Protocol-specific part of the socket is stopped.
913 We can safely deallocate it. */
914 sock->sockbase->vfptr->destroy (sock->sockbase);
915 sock->state = NN_SOCK_STATE_FINI;
916
917 /* Now we can unblock the application thread blocked in
918 the nn_close() call. */
919 nn_sem_post (&sock->termsem);
920
921 return;
922 }
923
924 nn_fsm_bad_state(sock->state, src, type);
925 }
926
nn_sock_handler(struct nn_fsm * self,int src,int type,void * srcptr)927 static void nn_sock_handler (struct nn_fsm *self, int src, int type,
928 void *srcptr)
929 {
930 struct nn_sock *sock;
931 struct nn_ep *ep;
932
933 sock = nn_cont (self, struct nn_sock, fsm);
934
935 switch (sock->state) {
936
937 /******************************************************************************/
938 /* INIT state. */
939 /******************************************************************************/
940 case NN_SOCK_STATE_INIT:
941 switch (src) {
942
943 case NN_FSM_ACTION:
944 switch (type) {
945 case NN_FSM_START:
946 sock->state = NN_SOCK_STATE_ACTIVE;
947 return;
948 default:
949 nn_fsm_bad_action (sock->state, src, type);
950 }
951
952 default:
953 nn_fsm_bad_source (sock->state, src, type);
954 }
955
956 /******************************************************************************/
957 /* ACTIVE state. */
958 /******************************************************************************/
959 case NN_SOCK_STATE_ACTIVE:
960 switch (src) {
961
962 case NN_FSM_ACTION:
963 switch (type) {
964 default:
965 nn_fsm_bad_action (sock->state, src, type);
966 }
967
968 case NN_SOCK_SRC_EP:
969 switch (type) {
970 case NN_EP_STOPPED:
971
972 /* This happens when an endpoint is closed using
973 nn_shutdown() function. */
974 ep = (struct nn_ep*) srcptr;
975 nn_list_erase (&sock->sdeps, &ep->item);
976 nn_ep_term (ep);
977 nn_free (ep);
978 return;
979
980 default:
981 nn_fsm_bad_action (sock->state, src, type);
982 }
983
984 default:
985
986 /* The assumption is that all the other events come from pipes. */
987 switch (type) {
988 case NN_PIPE_IN:
989 sock->sockbase->vfptr->in (sock->sockbase,
990 (struct nn_pipe*) srcptr);
991 return;
992 case NN_PIPE_OUT:
993 sock->sockbase->vfptr->out (sock->sockbase,
994 (struct nn_pipe*) srcptr);
995 return;
996 default:
997 nn_fsm_bad_action (sock->state, src, type);
998 }
999 }
1000
1001 /******************************************************************************/
1002 /* Invalid state. */
1003 /******************************************************************************/
1004 default:
1005 nn_fsm_bad_state (sock->state, src, type);
1006 }
1007 }
1008
1009 /******************************************************************************/
1010 /* State machine actions. */
1011 /******************************************************************************/
1012
nn_sock_report_error(struct nn_sock * self,struct nn_ep * ep,int errnum)1013 void nn_sock_report_error (struct nn_sock *self, struct nn_ep *ep, int errnum)
1014 {
1015 if (!nn_global_print_errors())
1016 return;
1017
1018 if (errnum == 0)
1019 return;
1020
1021 if (ep) {
1022 fprintf(stderr, "nanomsg: socket.%s[%s]: Error: %s\n",
1023 self->socket_name, nn_ep_getaddr(ep), nn_strerror(errnum));
1024 } else {
1025 fprintf(stderr, "nanomsg: socket.%s: Error: %s\n",
1026 self->socket_name, nn_strerror(errnum));
1027 }
1028 }
1029
nn_sock_stat_increment(struct nn_sock * self,int name,int64_t increment)1030 void nn_sock_stat_increment (struct nn_sock *self, int name, int64_t increment)
1031 {
1032 switch (name) {
1033 case NN_STAT_ESTABLISHED_CONNECTIONS:
1034 nn_assert (increment > 0);
1035 self->statistics.established_connections += increment;
1036 break;
1037 case NN_STAT_ACCEPTED_CONNECTIONS:
1038 nn_assert (increment > 0);
1039 self->statistics.accepted_connections += increment;
1040 break;
1041 case NN_STAT_DROPPED_CONNECTIONS:
1042 nn_assert (increment > 0);
1043 self->statistics.dropped_connections += increment;
1044 break;
1045 case NN_STAT_BROKEN_CONNECTIONS:
1046 nn_assert (increment > 0);
1047 self->statistics.broken_connections += increment;
1048 break;
1049 case NN_STAT_CONNECT_ERRORS:
1050 nn_assert (increment > 0);
1051 self->statistics.connect_errors += increment;
1052 break;
1053 case NN_STAT_BIND_ERRORS:
1054 nn_assert (increment > 0);
1055 self->statistics.bind_errors += increment;
1056 break;
1057 case NN_STAT_ACCEPT_ERRORS:
1058 nn_assert (increment > 0);
1059 self->statistics.accept_errors += increment;
1060 break;
1061 case NN_STAT_MESSAGES_SENT:
1062 nn_assert (increment > 0);
1063 self->statistics.messages_sent += increment;
1064 break;
1065 case NN_STAT_MESSAGES_RECEIVED:
1066 nn_assert (increment > 0);
1067 self->statistics.messages_received += increment;
1068 break;
1069 case NN_STAT_BYTES_SENT:
1070 nn_assert (increment >= 0);
1071 self->statistics.bytes_sent += increment;
1072 break;
1073 case NN_STAT_BYTES_RECEIVED:
1074 nn_assert (increment >= 0);
1075 self->statistics.bytes_received += increment;
1076 break;
1077
1078 case NN_STAT_CURRENT_CONNECTIONS:
1079 nn_assert (increment > 0 ||
1080 self->statistics.current_connections >= -increment);
1081 nn_assert(increment < INT_MAX && increment > -INT_MAX);
1082 self->statistics.current_connections += (int) increment;
1083 break;
1084 case NN_STAT_INPROGRESS_CONNECTIONS:
1085 nn_assert (increment > 0 ||
1086 self->statistics.inprogress_connections >= -increment);
1087 nn_assert(increment < INT_MAX && increment > -INT_MAX);
1088 self->statistics.inprogress_connections += (int) increment;
1089 break;
1090 case NN_STAT_CURRENT_SND_PRIORITY:
1091 /* This is an exception, we don't want to increment priority */
1092 nn_assert((increment > 0 && increment <= 16) || increment == -1);
1093 self->statistics.current_snd_priority = (int) increment;
1094 break;
1095 case NN_STAT_CURRENT_EP_ERRORS:
1096 nn_assert (increment > 0 ||
1097 self->statistics.current_ep_errors >= -increment);
1098 nn_assert(increment < INT_MAX && increment > -INT_MAX);
1099 self->statistics.current_ep_errors += (int) increment;
1100 break;
1101 }
1102 }
1103
nn_sock_hold(struct nn_sock * self)1104 int nn_sock_hold (struct nn_sock *self)
1105 {
1106 switch (self->state) {
1107 case NN_SOCK_STATE_ACTIVE:
1108 case NN_SOCK_STATE_INIT:
1109 self->holds++;
1110 return 0;
1111 case NN_SOCK_STATE_STOPPING:
1112 case NN_SOCK_STATE_STOPPING_EPS:
1113 case NN_SOCK_STATE_FINI:
1114 default:
1115 return -EBADF;
1116 }
1117 }
1118
nn_sock_rele(struct nn_sock * self)1119 void nn_sock_rele (struct nn_sock *self)
1120 {
1121 self->holds--;
1122 if (self->holds == 0) {
1123 nn_sem_post (&self->relesem);
1124 }
1125 }
1126