1 //
2 // Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt). A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10
11 #include "core/nng_impl.h"
12 #include "sockimpl.h"
13
14 #include <stdio.h>
15 #include <string.h>
16
17 // Socket implementation.
18
19 static nni_list sock_list;
20 static nni_id_map sock_ids;
21 static nni_mtx sock_lk;
22 static nni_id_map ctx_ids;
23 static bool inited;
24
25 struct nni_ctx {
26 nni_list_node c_node;
27 nni_sock * c_sock;
28 nni_proto_ctx_ops c_ops;
29 void * c_data;
30 size_t c_size;
31 bool c_closed;
32 unsigned c_ref; // protected by global lock
33 uint32_t c_id;
34 nng_duration c_sndtimeo;
35 nng_duration c_rcvtimeo;
36 };
37
38 typedef struct nni_sockopt {
39 nni_list_node node;
40 char * name;
41 nni_type typ;
42 size_t sz;
43 void * data;
44 } nni_sockopt;
45
46 typedef struct nni_sock_pipe_cb {
47 nng_pipe_cb cb_fn;
48 void * cb_arg;
49 } nni_sock_pipe_cb;
50
51 struct nni_socket {
52 nni_list_node s_node;
53 nni_mtx s_mx;
54 nni_cv s_cv;
55 nni_cv s_close_cv;
56
57 uint32_t s_id;
58 uint32_t s_flags;
59 unsigned s_ref; // protected by global lock
60 void * s_data; // Protocol private
61 size_t s_size;
62
63 nni_msgq *s_uwq; // Upper write queue
64 nni_msgq *s_urq; // Upper read queue
65
66 nni_proto_id s_self_id;
67 nni_proto_id s_peer_id;
68
69 nni_proto_pipe_ops s_pipe_ops;
70 nni_proto_sock_ops s_sock_ops;
71 nni_proto_ctx_ops s_ctx_ops;
72
73 // options
74 nni_duration s_sndtimeo; // send timeout
75 nni_duration s_rcvtimeo; // receive timeout
76 nni_duration s_reconn; // reconnect time
77 nni_duration s_reconnmax; // max reconnect time
78 size_t s_rcvmaxsz; // max receive size
79 nni_list s_options; // opts not handled by sock/proto
80 char s_name[64]; // socket name (legacy compat)
81
82 nni_list s_listeners; // active listeners
83 nni_list s_dialers; // active dialers
84 nni_list s_pipes; // active pipes
85 nni_list s_ctxs; // active contexts (protected by global sock_lk)
86
87 bool s_closing; // Socket is closing
88 bool s_closed; // Socket closed, protected by global lock
89 bool s_ctxwait; // Waiting for contexts to close.
90
91 nni_mtx s_pipe_cbs_mtx;
92 nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
93
94 #ifdef NNG_ENABLE_STATS
95 nni_stat_item st_root; // socket scope
96 nni_stat_item st_id; // socket id
97 nni_stat_item st_name; // socket name
98 nni_stat_item st_protocol; // socket protocol
99 nni_stat_item st_dialers; // number of dialers
100 nni_stat_item st_listeners; // number of listeners
101 nni_stat_item st_pipes; // number of pipes
102 nni_stat_item st_rx_bytes; // number of bytes received
103 nni_stat_item st_tx_bytes; // number of bytes received
104 nni_stat_item st_rx_msgs; // number of msgs received
105 nni_stat_item st_tx_msgs; // number of msgs sent
106 nni_stat_item st_rejects; // pipes rejected
107 #endif
108 };
109
110 static void nni_ctx_destroy(nni_ctx *);
111
112 static void dialer_shutdown_locked(nni_dialer *);
113 static void listener_shutdown_locked(nni_listener *);
114
115 #define SOCK(s) ((nni_sock *) (s))
116
117 static int
sock_get_fd(void * s,unsigned flag,int * fdp)118 sock_get_fd(void *s, unsigned flag, int *fdp)
119 {
120 int rv;
121 nni_pollable *p;
122
123 if ((flag & nni_sock_flags(SOCK(s))) == 0) {
124 return (NNG_ENOTSUP);
125 }
126
127 if (flag == NNI_PROTO_FLAG_SND) {
128 rv = nni_msgq_get_sendable(SOCK(s)->s_uwq, &p);
129 } else {
130 rv = nni_msgq_get_recvable(SOCK(s)->s_urq, &p);
131 }
132
133 if (rv == 0) {
134 rv = nni_pollable_getfd(p, fdp);
135 }
136
137 return (rv);
138 }
139
140 static int
sock_get_sendfd(void * s,void * buf,size_t * szp,nni_type t)141 sock_get_sendfd(void *s, void *buf, size_t *szp, nni_type t)
142 {
143 int fd;
144 int rv;
145
146 if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_SND, &fd)) != 0) {
147 return (rv);
148 }
149 return (nni_copyout_int(fd, buf, szp, t));
150 }
151
152 static int
sock_get_recvfd(void * s,void * buf,size_t * szp,nni_type t)153 sock_get_recvfd(void *s, void *buf, size_t *szp, nni_type t)
154 {
155 int fd;
156 int rv;
157
158 if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_RCV, &fd)) != 0) {
159 return (rv);
160 }
161 return (nni_copyout_int(fd, buf, szp, t));
162 }
163
164 static int
sock_get_raw(void * s,void * buf,size_t * szp,nni_type t)165 sock_get_raw(void *s, void *buf, size_t *szp, nni_type t)
166 {
167 bool raw = ((nni_sock_flags(SOCK(s)) & NNI_PROTO_FLAG_RAW) != 0);
168 return (nni_copyout_bool(raw, buf, szp, t));
169 }
170
171 static int
sock_set_recvtimeo(void * s,const void * buf,size_t sz,nni_type t)172 sock_set_recvtimeo(void *s, const void *buf, size_t sz, nni_type t)
173 {
174 return (nni_copyin_ms(&SOCK(s)->s_rcvtimeo, buf, sz, t));
175 }
176
177 static int
sock_get_recvtimeo(void * s,void * buf,size_t * szp,nni_type t)178 sock_get_recvtimeo(void *s, void *buf, size_t *szp, nni_type t)
179 {
180 return (nni_copyout_ms(SOCK(s)->s_rcvtimeo, buf, szp, t));
181 }
182
183 static int
sock_set_sendtimeo(void * s,const void * buf,size_t sz,nni_type t)184 sock_set_sendtimeo(void *s, const void *buf, size_t sz, nni_type t)
185 {
186 return (nni_copyin_ms(&SOCK(s)->s_sndtimeo, buf, sz, t));
187 }
188
189 static int
sock_get_sendtimeo(void * s,void * buf,size_t * szp,nni_type t)190 sock_get_sendtimeo(void *s, void *buf, size_t *szp, nni_type t)
191 {
192 return (nni_copyout_ms(SOCK(s)->s_sndtimeo, buf, szp, t));
193 }
194
195 static int
sock_set_recvbuf(void * s,const void * buf,size_t sz,nni_type t)196 sock_set_recvbuf(void *s, const void *buf, size_t sz, nni_type t)
197 {
198 int len;
199 int rv;
200
201 if ((rv = nni_copyin_int(&len, buf, sz, 0, 8192, t)) != 0) {
202 return (rv);
203 }
204 return (nni_msgq_resize(SOCK(s)->s_urq, len));
205 }
206
207 static int
sock_get_recvbuf(void * s,void * buf,size_t * szp,nni_type t)208 sock_get_recvbuf(void *s, void *buf, size_t *szp, nni_type t)
209 {
210 int len = nni_msgq_cap(SOCK(s)->s_urq);
211
212 return (nni_copyout_int(len, buf, szp, t));
213 }
214
215 static int
sock_set_sendbuf(void * s,const void * buf,size_t sz,nni_type t)216 sock_set_sendbuf(void *s, const void *buf, size_t sz, nni_type t)
217 {
218 int len;
219 int rv;
220
221 if ((rv = nni_copyin_int(&len, buf, sz, 0, 8192, t)) != 0) {
222 return (rv);
223 }
224 return (nni_msgq_resize(SOCK(s)->s_uwq, len));
225 }
226
227 static int
sock_get_sendbuf(void * s,void * buf,size_t * szp,nni_type t)228 sock_get_sendbuf(void *s, void *buf, size_t *szp, nni_type t)
229 {
230 int len = nni_msgq_cap(SOCK(s)->s_uwq);
231
232 return (nni_copyout_int(len, buf, szp, t));
233 }
234
235 static int
sock_get_sockname(void * s,void * buf,size_t * szp,nni_type t)236 sock_get_sockname(void *s, void *buf, size_t *szp, nni_type t)
237 {
238 return (nni_copyout_str(SOCK(s)->s_name, buf, szp, t));
239 }
240
241 static int
sock_set_sockname(void * s,const void * buf,size_t sz,nni_type t)242 sock_set_sockname(void *s, const void *buf, size_t sz, nni_type t)
243 {
244 return (nni_copyin_str(
245 SOCK(s)->s_name, buf, sizeof(SOCK(s)->s_name), sz, t));
246 }
247
248 static int
sock_get_proto(void * s,void * buf,size_t * szp,nni_type t)249 sock_get_proto(void *s, void *buf, size_t *szp, nni_type t)
250 {
251 return (nni_copyout_int(nni_sock_proto_id(SOCK(s)), buf, szp, t));
252 }
253
254 static int
sock_get_peer(void * s,void * buf,size_t * szp,nni_type t)255 sock_get_peer(void *s, void *buf, size_t *szp, nni_type t)
256 {
257 return (nni_copyout_int(nni_sock_peer_id(SOCK(s)), buf, szp, t));
258 }
259
260 static int
sock_get_protoname(void * s,void * buf,size_t * szp,nni_type t)261 sock_get_protoname(void *s, void *buf, size_t *szp, nni_type t)
262 {
263 return (nni_copyout_str(nni_sock_proto_name(SOCK(s)), buf, szp, t));
264 }
265
266 static int
sock_get_peername(void * s,void * buf,size_t * szp,nni_type t)267 sock_get_peername(void *s, void *buf, size_t *szp, nni_type t)
268 {
269 return (nni_copyout_str(nni_sock_peer_name(SOCK(s)), buf, szp, t));
270 }
271
272 static const nni_option sock_options[] = {
273 {
274 .o_name = NNG_OPT_RECVTIMEO,
275 .o_get = sock_get_recvtimeo,
276 .o_set = sock_set_recvtimeo,
277 },
278 {
279 .o_name = NNG_OPT_SENDTIMEO,
280 .o_get = sock_get_sendtimeo,
281 .o_set = sock_set_sendtimeo,
282 },
283 {
284 .o_name = NNG_OPT_RECVFD,
285 .o_get = sock_get_recvfd,
286 },
287 {
288 .o_name = NNG_OPT_SENDFD,
289 .o_get = sock_get_sendfd,
290 },
291 {
292 .o_name = NNG_OPT_RECVBUF,
293 .o_get = sock_get_recvbuf,
294 .o_set = sock_set_recvbuf,
295 },
296 {
297 .o_name = NNG_OPT_SENDBUF,
298 .o_get = sock_get_sendbuf,
299 .o_set = sock_set_sendbuf,
300 },
301 {
302 .o_name = NNG_OPT_SOCKNAME,
303 .o_get = sock_get_sockname,
304 .o_set = sock_set_sockname,
305 },
306 {
307 .o_name = NNG_OPT_RAW,
308 .o_get = sock_get_raw,
309 },
310 {
311 .o_name = NNG_OPT_PROTO,
312 .o_get = sock_get_proto,
313 },
314 {
315 .o_name = NNG_OPT_PEER,
316 .o_get = sock_get_peer,
317 },
318 {
319 .o_name = NNG_OPT_PROTONAME,
320 .o_get = sock_get_protoname,
321 },
322 {
323 .o_name = NNG_OPT_PEERNAME,
324 .o_get = sock_get_peername,
325 },
326 // terminate list
327 {
328 .o_name = NULL,
329 },
330 };
331
332 static void
nni_free_opt(nni_sockopt * opt)333 nni_free_opt(nni_sockopt *opt)
334 {
335 nni_strfree(opt->name);
336 nni_free(opt->data, opt->sz);
337 NNI_FREE_STRUCT(opt);
338 }
339
340 uint32_t
nni_sock_id(nni_sock * s)341 nni_sock_id(nni_sock *s)
342 {
343 return (s->s_id);
344 }
345
346 // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
347 // the upper read and write queues.
348 nni_msgq *
nni_sock_sendq(nni_sock * s)349 nni_sock_sendq(nni_sock *s)
350 {
351 return (s->s_uwq);
352 }
353
354 nni_msgq *
nni_sock_recvq(nni_sock * s)355 nni_sock_recvq(nni_sock *s)
356 {
357 return (s->s_urq);
358 }
359
360 int
nni_sock_find(nni_sock ** sockp,uint32_t id)361 nni_sock_find(nni_sock **sockp, uint32_t id)
362 {
363 int rv;
364 nni_sock *s;
365
366 if ((rv = nni_init()) != 0) {
367 return (rv);
368 }
369 nni_mtx_lock(&sock_lk);
370 if ((s = nni_id_get(&sock_ids, id)) != NULL) {
371 if (s->s_closed) {
372 rv = NNG_ECLOSED;
373 } else {
374 s->s_ref++;
375 *sockp = s;
376 }
377 } else {
378 rv = NNG_ECLOSED;
379 }
380 nni_mtx_unlock(&sock_lk);
381
382 return (rv);
383 }
384
385 void
nni_sock_rele(nni_sock * s)386 nni_sock_rele(nni_sock *s)
387 {
388 nni_mtx_lock(&sock_lk);
389 s->s_ref--;
390 if (s->s_closed && (s->s_ref < 2)) {
391 nni_cv_wake(&s->s_close_cv);
392 }
393 nni_mtx_unlock(&sock_lk);
394 }
395
396 #ifdef NNG_ENABLE_STATS
397 static void
sock_stat_init(nni_sock * s,nni_stat_item * item,const nni_stat_info * info)398 sock_stat_init(nni_sock *s, nni_stat_item *item, const nni_stat_info *info)
399 {
400 nni_stat_init(item, info);
401 nni_stat_add(&s->st_root, item);
402 }
403
404 static void
sock_stats_init(nni_sock * s)405 sock_stats_init(nni_sock *s)
406 {
407 static const nni_stat_info root_info = {
408 .si_name = "socket",
409 .si_desc = "socket statistics",
410 .si_type = NNG_STAT_SCOPE,
411 };
412 static const nni_stat_info id_info = {
413 .si_name = "id",
414 .si_desc = "socket identifier",
415 .si_type = NNG_STAT_ID,
416 };
417 static const nni_stat_info name_info = {
418 .si_name = "name",
419 .si_desc = "socket name",
420 .si_type = NNG_STAT_STRING,
421 .si_alloc = true,
422 };
423 static const nni_stat_info protocol_info = {
424 .si_name = "protocol",
425 .si_desc = "socket protocol",
426 .si_type = NNG_STAT_STRING,
427 };
428 static const nni_stat_info dialers_info = {
429 .si_name = "dialers",
430 .si_desc = "open dialers",
431 .si_type = NNG_STAT_LEVEL,
432 .si_atomic = true,
433 };
434 static const nni_stat_info listeners_info = {
435 .si_name = "listeners",
436 .si_desc = "open listeners",
437 .si_type = NNG_STAT_LEVEL,
438 .si_atomic = true,
439 };
440 static const nni_stat_info pipes_info = {
441 .si_name = "pipes",
442 .si_desc = "open pipes",
443 .si_type = NNG_STAT_LEVEL,
444 .si_atomic = true,
445 };
446 static const nni_stat_info reject_info = {
447 .si_name = "reject",
448 .si_desc = "rejected pipes",
449 .si_type = NNG_STAT_COUNTER,
450 .si_atomic = true,
451 };
452 static const nni_stat_info tx_msgs_info = {
453 .si_name = "tx_msgs",
454 .si_desc = "sent messages",
455 .si_type = NNG_STAT_COUNTER,
456 .si_unit = NNG_UNIT_MESSAGES,
457 .si_atomic = true,
458 };
459 static const nni_stat_info rx_msgs_info = {
460 .si_name = "rx_msgs",
461 .si_desc = "received messages",
462 .si_type = NNG_STAT_COUNTER,
463 .si_unit = NNG_UNIT_MESSAGES,
464 .si_atomic = true,
465 };
466 static const nni_stat_info tx_bytes_info = {
467 .si_name = "tx_bytes",
468 .si_desc = "sent bytes",
469 .si_type = NNG_STAT_COUNTER,
470 .si_unit = NNG_UNIT_BYTES,
471 .si_atomic = true,
472 };
473 static const nni_stat_info rx_bytes_info = {
474 .si_name = "rx_bytes",
475 .si_desc = "received messages",
476 .si_type = NNG_STAT_COUNTER,
477 .si_unit = NNG_UNIT_BYTES,
478 .si_atomic = true,
479 };
480
481 // To make collection cheap and atomic for the socket,
482 // we just use a single lock for the entire chain.
483
484 nni_stat_init(&s->st_root, &root_info);
485 sock_stat_init(s, &s->st_id, &id_info);
486 sock_stat_init(s, &s->st_name, &name_info);
487 sock_stat_init(s, &s->st_protocol, &protocol_info);
488 sock_stat_init(s, &s->st_dialers, &dialers_info);
489 sock_stat_init(s, &s->st_listeners, &listeners_info);
490 sock_stat_init(s, &s->st_pipes, &pipes_info);
491 sock_stat_init(s, &s->st_rejects, &reject_info);
492 sock_stat_init(s, &s->st_tx_msgs, &tx_msgs_info);
493 sock_stat_init(s, &s->st_rx_msgs, &rx_msgs_info);
494 sock_stat_init(s, &s->st_tx_bytes, &tx_bytes_info);
495 sock_stat_init(s, &s->st_rx_bytes, &rx_bytes_info);
496
497 nni_stat_set_id(&s->st_id, (int) s->s_id);
498 nni_stat_set_string(&s->st_name, s->s_name);
499 nni_stat_set_string(&s->st_protocol, nni_sock_proto_name(s));
500 }
501 #endif
502
503 static void
sock_destroy(nni_sock * s)504 sock_destroy(nni_sock *s)
505 {
506 nni_sockopt *sopt;
507
508 #ifdef NNG_ENABLE_STATS
509 nni_stat_unregister(&s->st_root);
510 #endif
511
512 // The protocol needs to clean up its state.
513 if (s->s_data != NULL) {
514 s->s_sock_ops.sock_fini(s->s_data);
515 }
516
517 nni_mtx_lock(&s->s_mx);
518 while ((sopt = nni_list_first(&s->s_options)) != NULL) {
519 nni_list_remove(&s->s_options, sopt);
520 nni_free_opt(sopt);
521 }
522 nni_mtx_unlock(&s->s_mx);
523
524 nni_msgq_fini(s->s_urq);
525 nni_msgq_fini(s->s_uwq);
526 nni_cv_fini(&s->s_close_cv);
527 nni_cv_fini(&s->s_cv);
528 nni_mtx_fini(&s->s_mx);
529 nni_mtx_fini(&s->s_pipe_cbs_mtx);
530 nni_free(s, s->s_size);
531 }
532
533 static int
nni_sock_create(nni_sock ** sp,const nni_proto * proto)534 nni_sock_create(nni_sock **sp, const nni_proto *proto)
535 {
536 int rv;
537 nni_sock *s;
538 bool on;
539 size_t sz;
540
541 sz = NNI_ALIGN_UP(sizeof(*s)) + proto->proto_sock_ops->sock_size;
542 if ((s = nni_zalloc(sz)) == NULL) {
543 return (NNG_ENOMEM);
544 }
545 s->s_data = s + 1;
546 s->s_sndtimeo = -1;
547 s->s_rcvtimeo = -1;
548 s->s_reconn = NNI_SECOND;
549 s->s_reconnmax = 0;
550 s->s_rcvmaxsz = 0; // unlimited by default
551 s->s_id = 0;
552 s->s_ref = 0;
553 s->s_self_id = proto->proto_self;
554 s->s_peer_id = proto->proto_peer;
555 s->s_flags = proto->proto_flags;
556 s->s_sock_ops = *proto->proto_sock_ops;
557 s->s_pipe_ops = *proto->proto_pipe_ops;
558 s->s_closed = false;
559 s->s_closing = false;
560
561 if (proto->proto_ctx_ops != NULL) {
562 s->s_ctx_ops = *proto->proto_ctx_ops;
563 }
564
565 NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
566 NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
567
568 NNI_LIST_NODE_INIT(&s->s_node);
569 NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
570 NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
571 NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);
572 NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);
573 NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);
574 nni_mtx_init(&s->s_mx);
575 nni_mtx_init(&s->s_pipe_cbs_mtx);
576 nni_cv_init(&s->s_cv, &s->s_mx);
577 nni_cv_init(&s->s_close_cv, &sock_lk);
578
579 #ifdef NNG_ENABLE_STATS
580 sock_stats_init(s);
581 #endif
582
583 if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
584 ((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
585 ((rv = s->s_sock_ops.sock_init(s->s_data, s)) != 0) ||
586 ((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo,
587 sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
588 ((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo,
589 sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
590 ((rv = nni_sock_setopt(s, NNG_OPT_RECONNMINT, &s->s_reconn,
591 sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
592 ((rv = nni_sock_setopt(s, NNG_OPT_RECONNMAXT, &s->s_reconnmax,
593 sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
594 ((rv = nni_sock_setopt(s, NNG_OPT_RECVMAXSZ, &s->s_rcvmaxsz,
595 sizeof(size_t), NNI_TYPE_SIZE)) != 0)) {
596 sock_destroy(s);
597 return (rv);
598 }
599
600 // These we *attempt* to call so that we are likely to have initial
601 // values loaded. They should not fail, but if they do we don't
602 // worry about it.
603 on = true;
604 (void) nni_sock_setopt(
605 s, NNG_OPT_TCP_NODELAY, &on, sizeof(on), NNI_TYPE_BOOL);
606 on = false;
607 (void) nni_sock_setopt(
608 s, NNG_OPT_TCP_KEEPALIVE, &on, sizeof(on), NNI_TYPE_BOOL);
609
610 *sp = s;
611 return (rv);
612 }
613
614 int
nni_sock_sys_init(void)615 nni_sock_sys_init(void)
616 {
617 NNI_LIST_INIT(&sock_list, nni_sock, s_node);
618 nni_mtx_init(&sock_lk);
619
620 nni_id_map_init(&sock_ids, 1, 0x7fffffff, false);
621 nni_id_map_init(&ctx_ids, 1, 0x7fffffff, false);
622 inited = true;
623 return (0);
624 }
625
626 void
nni_sock_sys_fini(void)627 nni_sock_sys_fini(void)
628 {
629 nni_id_map_fini(&sock_ids);
630 nni_id_map_fini(&ctx_ids);
631 nni_mtx_fini(&sock_lk);
632 inited = false;
633 }
634
635 int
nni_sock_open(nni_sock ** sockp,const nni_proto * proto)636 nni_sock_open(nni_sock **sockp, const nni_proto *proto)
637 {
638 nni_sock *s = NULL;
639 int rv;
640
641 if (proto->proto_version != NNI_PROTOCOL_VERSION) {
642 // unsupported protocol version
643 return (NNG_ENOTSUP);
644 }
645
646 if (((rv = nni_init()) != 0) ||
647 ((rv = nni_sock_create(&s, proto)) != 0)) {
648 return (rv);
649 }
650
651 nni_mtx_lock(&sock_lk);
652 if (nni_id_alloc(&sock_ids, &s->s_id, s) != 0) {
653 sock_destroy(s);
654 } else {
655 nni_list_append(&sock_list, s);
656 s->s_sock_ops.sock_open(s->s_data);
657 *sockp = s;
658 }
659 nni_mtx_unlock(&sock_lk);
660
661 // Set the socket name.
662 (void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
663
664 #ifdef NNG_ENABLE_STATS
665 // Set up basic stat values.
666 nni_stat_set_id(&s->st_id, (int) s->s_id);
667
668 // Add our stats chain.
669 nni_stat_register(&s->st_root);
670 #endif
671
672 return (0);
673 }
674
675 // nni_sock_shutdown shuts down the socket; after this point no
676 // further access to the socket will function, and any threads blocked
677 // in entry points will be woken (and the functions they are blocked
678 // in will return NNG_ECLOSED.)
679 int
nni_sock_shutdown(nni_sock * sock)680 nni_sock_shutdown(nni_sock *sock)
681 {
682 nni_pipe * pipe;
683 nni_dialer * d;
684 nni_listener *l;
685 nni_ctx * ctx;
686 nni_ctx * nctx;
687
688 nni_mtx_lock(&sock->s_mx);
689 if (sock->s_closing) {
690 nni_mtx_unlock(&sock->s_mx);
691 return (NNG_ECLOSED);
692 }
693 // Mark us closing, so no more EPs or changes can occur.
694 sock->s_closing = true;
695
696 // Close the EPs. This prevents new connections from forming
697 // but but allows existing ones to drain.
698 NNI_LIST_FOREACH (&sock->s_listeners, l) {
699 listener_shutdown_locked(l);
700 }
701 NNI_LIST_FOREACH (&sock->s_dialers, d) {
702 dialer_shutdown_locked(d);
703 }
704
705 nni_mtx_unlock(&sock->s_mx);
706
707 // We now mark any owned contexts as closing.
708 // XXX: Add context draining support here!
709 nni_mtx_lock(&sock_lk);
710 nctx = nni_list_first(&sock->s_ctxs);
711 while ((ctx = nctx) != NULL) {
712 nctx = nni_list_next(&sock->s_ctxs, ctx);
713 ctx->c_closed = true;
714 if (ctx->c_ref == 0) {
715 // No open operations. So close it.
716 nni_id_remove(&ctx_ids, ctx->c_id);
717 nni_list_remove(&sock->s_ctxs, ctx);
718 nni_ctx_destroy(ctx);
719 }
720 // If still has a reference count, then wait for last
721 // reference to close before nuking it.
722 }
723 nni_mtx_unlock(&sock_lk);
724
725 // Generally, unless the protocol is blocked trying to perform
726 // writes (e.g. a slow reader on the other side), it should be
727 // trying to shut things down. We wait to give it
728 // a chance to do so gracefully.
729
730 nni_mtx_lock(&sock_lk);
731 while (!nni_list_empty(&sock->s_ctxs)) {
732 sock->s_ctxwait = true;
733 nni_cv_wait(&sock->s_close_cv);
734 }
735 nni_mtx_unlock(&sock_lk);
736
737 nni_mtx_lock(&sock->s_mx);
738
739 // At this point, we've done everything we politely can to
740 // give the protocol a chance to flush its write side. Now
741 // its time to be a little more insistent.
742
743 // Close the upper queues immediately. This can happen
744 // safely while we hold the lock.
745 nni_msgq_close(sock->s_urq);
746 nni_msgq_close(sock->s_uwq);
747
748 // Go through the dialers and listeners, attempting to close them.
749 // We might already have a close in progress, in which case
750 // we skip past it; it will be removed from another thread.
751 NNI_LIST_FOREACH (&sock->s_listeners, l) {
752 if (nni_listener_hold(l) == 0) {
753 nni_listener_close_rele(l);
754 }
755 }
756 NNI_LIST_FOREACH (&sock->s_dialers, d) {
757 if (nni_dialer_hold(d) == 0) {
758 nni_dialer_close_rele(d);
759 }
760 }
761
762 // For each pipe, arrange for it to teardown hard. We would
763 // expect there not to be any here. However, it is possible for
764 // a pipe to have been added by an endpoint due to racing conditions
765 // in the shutdown. Therefore it is important that we shutdown pipes
766 // *last*.
767 NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
768 nni_pipe_close(pipe);
769 }
770
771 // We have to wait for *both* endpoints and pipes to be
772 // removed.
773 while ((!nni_list_empty(&sock->s_pipes)) ||
774 (!nni_list_empty(&sock->s_listeners)) ||
775 (!nni_list_empty(&sock->s_dialers))) {
776 nni_cv_wait(&sock->s_cv);
777 }
778
779 sock->s_sock_ops.sock_close(sock->s_data);
780
781 nni_cv_wake(&sock->s_cv);
782
783 NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL);
784
785 nni_mtx_unlock(&sock->s_mx);
786
787 // At this point, there are no threads blocked inside of us
788 // that are referencing socket state. User code should call
789 // nng_close to release the last resources.
790 return (0);
791 }
792
793 // nni_sock_close shuts down the socket, then releases any resources
794 // associated with it. It is a programmer error to reference the
795 // socket after this function is called, as the pointer may reference
796 // invalid memory or other objects.
797 void
nni_sock_close(nni_sock * s)798 nni_sock_close(nni_sock *s)
799 {
800 // Shutdown everything if not already done. This operation
801 // is idempotent.
802 nni_sock_shutdown(s);
803
804 nni_mtx_lock(&sock_lk);
805 if (s->s_closed) {
806 // Some other thread called close. All we need to do
807 // is drop our reference count.
808 nni_mtx_unlock(&sock_lk);
809 nni_sock_rele(s);
810 return;
811 }
812 s->s_closed = true;
813 nni_id_remove(&sock_ids, s->s_id);
814
815 // We might have been removed from the list already, e.g. by
816 // nni_sock_closeall. This is idempotent.
817 nni_list_node_remove(&s->s_node);
818
819 // Wait for all other references to drop. Note that we
820 // have a reference already (from our caller).
821 s->s_ctxwait = true;
822 while ((s->s_ref > 1) || (!nni_list_empty(&s->s_ctxs))) {
823 nni_cv_wait(&s->s_close_cv);
824 }
825 nni_mtx_unlock(&sock_lk);
826
827 // Because we already shut everything down before, we should not
828 // have any child objects.
829 nni_mtx_lock(&s->s_mx);
830 NNI_ASSERT(nni_list_empty(&s->s_dialers));
831 NNI_ASSERT(nni_list_empty(&s->s_listeners));
832 NNI_ASSERT(nni_list_empty(&s->s_pipes));
833 nni_mtx_unlock(&s->s_mx);
834
835 sock_destroy(s);
836 }
837
838 void
nni_sock_closeall(void)839 nni_sock_closeall(void)
840 {
841 nni_sock *s;
842
843 if (!inited) {
844 return;
845 }
846 for (;;) {
847 nni_mtx_lock(&sock_lk);
848 if ((s = nni_list_first(&sock_list)) == NULL) {
849 nni_mtx_unlock(&sock_lk);
850 return;
851 }
852 // Bump the reference count. The close call below
853 // will drop it.
854 s->s_ref++;
855 nni_list_node_remove(&s->s_node);
856 nni_mtx_unlock(&sock_lk);
857 nni_sock_close(s);
858 }
859 }
860
861 void
nni_sock_send(nni_sock * sock,nni_aio * aio)862 nni_sock_send(nni_sock *sock, nni_aio *aio)
863 {
864 nni_aio_normalize_timeout(aio, sock->s_sndtimeo);
865 sock->s_sock_ops.sock_send(sock->s_data, aio);
866 }
867
868 void
nni_sock_recv(nni_sock * sock,nni_aio * aio)869 nni_sock_recv(nni_sock *sock, nni_aio *aio)
870 {
871 nni_aio_normalize_timeout(aio, sock->s_rcvtimeo);
872 sock->s_sock_ops.sock_recv(sock->s_data, aio);
873 }
874
875 // nni_sock_proto_id returns the socket's 16-bit protocol number.
876 uint16_t
nni_sock_proto_id(nni_sock * sock)877 nni_sock_proto_id(nni_sock *sock)
878 {
879 return (sock->s_self_id.p_id);
880 }
881
882 // nni_sock_peer_id returns the socket peer's 16-bit protocol number.
883 uint16_t
nni_sock_peer_id(nni_sock * sock)884 nni_sock_peer_id(nni_sock *sock)
885 {
886 return (sock->s_peer_id.p_id);
887 }
888
889 const char *
nni_sock_proto_name(nni_sock * sock)890 nni_sock_proto_name(nni_sock *sock)
891 {
892 return (sock->s_self_id.p_name);
893 }
894
895 const char *
nni_sock_peer_name(nni_sock * sock)896 nni_sock_peer_name(nni_sock *sock)
897 {
898 return (sock->s_peer_id.p_name);
899 }
900
901 struct nni_proto_pipe_ops *
nni_sock_proto_pipe_ops(nni_sock * sock)902 nni_sock_proto_pipe_ops(nni_sock *sock)
903 {
904 return (&sock->s_pipe_ops);
905 }
906
907 void *
nni_sock_proto_data(nni_sock * sock)908 nni_sock_proto_data(nni_sock *sock)
909 {
910 return (sock->s_data);
911 }
912
913 int
nni_sock_add_listener(nni_sock * s,nni_listener * l)914 nni_sock_add_listener(nni_sock *s, nni_listener *l)
915 {
916 nni_sockopt *sopt;
917
918 nni_mtx_lock(&s->s_mx);
919 if (s->s_closing) {
920 nni_mtx_unlock(&s->s_mx);
921 return (NNG_ECLOSED);
922 }
923
924 NNI_LIST_FOREACH (&s->s_options, sopt) {
925 int rv;
926 rv = nni_listener_setopt(
927 l, sopt->name, sopt->data, sopt->sz, sopt->typ);
928 if ((rv != 0) && (rv != NNG_ENOTSUP)) {
929 nni_mtx_unlock(&s->s_mx);
930 return (rv);
931 }
932 }
933
934 nni_list_append(&s->s_listeners, l);
935
936 #ifdef NNG_ENABLE_STATS
937 nni_stat_inc(&s->st_listeners, 1);
938 #endif
939
940 nni_mtx_unlock(&s->s_mx);
941 return (0);
942 }
943
944 int
nni_sock_add_dialer(nni_sock * s,nni_dialer * d)945 nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
946 {
947 nni_sockopt *sopt;
948
949 nni_mtx_lock(&s->s_mx);
950 if (s->s_closing) {
951 nni_mtx_unlock(&s->s_mx);
952 return (NNG_ECLOSED);
953 }
954
955 NNI_LIST_FOREACH (&s->s_options, sopt) {
956 int rv;
957 rv = nni_dialer_setopt(
958 d, sopt->name, sopt->data, sopt->sz, sopt->typ);
959 if ((rv != 0) && (rv != NNG_ENOTSUP)) {
960 nni_mtx_unlock(&s->s_mx);
961 return (rv);
962 }
963 }
964
965 nni_list_append(&s->s_dialers, d);
966
967 #ifdef NNG_ENABLE_STATS
968 nni_stat_inc(&s->st_dialers, 1);
969 #endif
970
971 nni_mtx_unlock(&s->s_mx);
972 return (0);
973 }
974
975 int
nni_sock_setopt(nni_sock * s,const char * name,const void * v,size_t sz,nni_type t)976 nni_sock_setopt(
977 nni_sock *s, const char *name, const void *v, size_t sz, nni_type t)
978 {
979 int rv;
980 nni_sockopt *optv;
981 nni_sockopt *oldv = NULL;
982
983 nni_mtx_lock(&s->s_mx);
984 if (s->s_closing) {
985 nni_mtx_unlock(&s->s_mx);
986 return (NNG_ECLOSED);
987 }
988
989 // Protocol options. The protocol can override options that
990 // the socket framework would otherwise supply, like buffer
991 // sizes.
992 rv = nni_setopt(s->s_sock_ops.sock_options, name, s->s_data, v, sz, t);
993 if (rv != NNG_ENOTSUP) {
994 nni_mtx_unlock(&s->s_mx);
995 return (rv);
996 }
997
998 // Some options do not go down to transports. Handle them directly.
999 rv = nni_setopt(sock_options, name, s, v, sz, t);
1000 if (rv != NNG_ENOTSUP) {
1001 nni_mtx_unlock(&s->s_mx);
1002 return (rv);
1003 }
1004 nni_mtx_unlock(&s->s_mx);
1005
1006 // Validation of generic and transport options.
1007 // NOTE: Setting transport options via socket is deprecated.
1008 // These options should be set on the endpoint to which they apply.
1009 if ((strcmp(name, NNG_OPT_RECONNMINT) == 0) ||
1010 (strcmp(name, NNG_OPT_RECONNMAXT) == 0)) {
1011 if ((rv = nni_copyin_ms(NULL, v, sz, t)) != 0) {
1012 return (rv);
1013 }
1014
1015 } else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
1016 if ((rv = nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)) !=
1017 0) {
1018 return (rv);
1019 }
1020
1021 #if !defined(NNG_ELIDE_DEPRECATED)
1022 // TCP options, set via socket is deprecated.
1023 } else if ((strcmp(name, NNG_OPT_TCP_KEEPALIVE) == 0) ||
1024 (strcmp(name, NNG_OPT_TCP_NODELAY)) == 0) {
1025 if ((rv = nni_copyin_bool(NULL, v, sz, t)) != 0) {
1026 return (rv);
1027 }
1028 #endif
1029
1030 #if defined(NNG_SUPP_TLS) && !defined(NNG_ELIDE_DEPRECATED)
1031 // TLS options may not be supported if TLS is not
1032 // compiled in. Supporting all these is deprecated.
1033 } else if (strcmp(name, NNG_OPT_TLS_CONFIG) == 0) {
1034 if ((rv = nni_copyin_ptr(NULL, v, sz, t)) != 0) {
1035 return (rv);
1036 }
1037 } else if ((strcmp(name, NNG_OPT_TLS_SERVER_NAME) == 0) ||
1038 (strcmp(name, NNG_OPT_TLS_CA_FILE) == 0) ||
1039 (strcmp(name, NNG_OPT_TLS_CERT_KEY_FILE) == 0)) {
1040 if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
1041 return (NNG_EBADTYPE);
1042 }
1043 if (nni_strnlen(v, sz) >= sz) {
1044 return (NNG_EINVAL);
1045 }
1046 } else if ((strcmp(name, NNG_OPT_TLS_AUTH_MODE) == 0)) {
1047 // 0, 1, or 2 (none, optional, required)
1048 if ((rv = nni_copyin_int(NULL, v, sz, 0, 2, t)) != 0) {
1049 return (rv);
1050 }
1051 #endif
1052
1053 #if defined(NNG_PLATFORM_POSIX) && !defined(NNG_SUPPRESS_DEPRECATED)
1054 } else if (strcmp(name, NNG_OPT_IPC_PERMISSIONS) == 0) {
1055 // UNIX mode bits are 0777, but allow set id and sticky bits
1056 if ((rv = nni_copyin_int(NULL, v, sz, 0, 07777, t)) != 0) {
1057 return (rv);
1058 }
1059 #endif
1060
1061 #if defined(NNG_PLATFORM_WINDOWS) && !defined(NNG_SUPPRESS_DEPRECATED)
1062 } else if (strcmp(name, NNG_OPT_IPC_SECURITY_DESCRIPTOR) == 0) {
1063 if ((rv = nni_copyin_ptr(NULL, v, sz, t)) == 0) {
1064 return (rv);
1065 }
1066 #endif
1067 }
1068
1069 // Prepare a copy of the socket option.
1070 if ((optv = NNI_ALLOC_STRUCT(optv)) == NULL) {
1071 return (NNG_ENOMEM);
1072 }
1073 if ((optv->data = nni_alloc(sz)) == NULL) {
1074 NNI_FREE_STRUCT(optv);
1075 return (NNG_ENOMEM);
1076 }
1077 if ((optv->name = nni_strdup(name)) == NULL) {
1078 nni_free(optv->data, sz);
1079 NNI_FREE_STRUCT(optv);
1080 return (NNG_ENOMEM);
1081 }
1082 memcpy(optv->data, v, sz);
1083 optv->sz = sz;
1084 optv->typ = t;
1085 NNI_LIST_NODE_INIT(&optv->node);
1086
1087 nni_mtx_lock(&s->s_mx);
1088 NNI_LIST_FOREACH (&s->s_options, oldv) {
1089 if (strcmp(oldv->name, name) == 0) {
1090 if ((oldv->sz != sz) ||
1091 (memcmp(oldv->data, v, sz) != 0)) {
1092 break;
1093 }
1094
1095 // The values are the same. This is a no-op.
1096 nni_mtx_unlock(&s->s_mx);
1097 nni_free_opt(optv);
1098 return (0);
1099 }
1100 }
1101
1102 #ifndef NNG_ELIDE_DEPRCATED
1103 nni_dialer * d;
1104 nni_listener *l;
1105
1106 // Apply the options. Failure to set any option on any
1107 // transport (other than ENOTSUP) stops the operation
1108 // altogether. Its important that transport wide checks
1109 // properly pre-validate.
1110 NNI_LIST_FOREACH (&s->s_listeners, l) {
1111 int x;
1112 x = nni_listener_setopt(l, optv->name, optv->data, sz, t);
1113 if (x != NNG_ENOTSUP) {
1114 if ((rv = x) != 0) {
1115 nni_mtx_unlock(&s->s_mx);
1116 nni_free_opt(optv);
1117 return (rv);
1118 }
1119 }
1120 }
1121 NNI_LIST_FOREACH (&s->s_dialers, d) {
1122 int x;
1123 x = nni_dialer_setopt(d, optv->name, optv->data, sz, t);
1124 if (x != NNG_ENOTSUP) {
1125 if ((rv = x) != 0) {
1126 nni_mtx_unlock(&s->s_mx);
1127 nni_free_opt(optv);
1128 return (rv);
1129 }
1130 }
1131 }
1132 #endif
1133
1134 if (rv == 0) {
1135 // Remove and toss the old value; we are using a new one.
1136 if (oldv != NULL) {
1137 nni_list_remove(&s->s_options, oldv);
1138 nni_free_opt(oldv);
1139 }
1140
1141 // Insert our new value. This permits it to be
1142 // compared against later, and for new endpoints to
1143 // automatically receive these values,
1144 nni_list_append(&s->s_options, optv);
1145 } else {
1146 nni_free_opt(optv);
1147 }
1148
1149 nni_mtx_unlock(&s->s_mx);
1150 return (rv);
1151 }
1152
1153 int
nni_sock_getopt(nni_sock * s,const char * name,void * val,size_t * szp,nni_type t)1154 nni_sock_getopt(
1155 nni_sock *s, const char *name, void *val, size_t *szp, nni_type t)
1156 {
1157 int rv;
1158 nni_sockopt *sopt;
1159
1160 nni_mtx_lock(&s->s_mx);
1161 if (s->s_closing) {
1162 nni_mtx_unlock(&s->s_mx);
1163 return (NNG_ECLOSED);
1164 }
1165
1166 // Protocol specific options. The protocol can override
1167 // options like the send buffer or notification descriptors
1168 // this way.
1169 rv = nni_getopt(
1170 s->s_sock_ops.sock_options, name, s->s_data, val, szp, t);
1171 if (rv != NNG_ENOTSUP) {
1172 nni_mtx_unlock(&s->s_mx);
1173 return (rv);
1174 }
1175
1176 // Socket generic options.
1177 rv = nni_getopt(sock_options, name, s, val, szp, t);
1178 if (rv != NNG_ENOTSUP) {
1179 nni_mtx_unlock(&s->s_mx);
1180 return (rv);
1181 }
1182
1183 NNI_LIST_FOREACH (&s->s_options, sopt) {
1184 if (strcmp(sopt->name, name) == 0) {
1185 size_t sz = sopt->sz;
1186
1187 if ((sopt->typ != NNI_TYPE_OPAQUE) &&
1188 (t != sopt->typ)) {
1189
1190 if (t != NNI_TYPE_OPAQUE) {
1191 nni_mtx_unlock(&s->s_mx);
1192 return (NNG_EBADTYPE);
1193 }
1194 if (*szp != sopt->sz) {
1195 nni_mtx_unlock(&s->s_mx);
1196 return (NNG_EINVAL);
1197 }
1198 }
1199
1200 if (szp != NULL) {
1201 if (sopt->sz > *szp) {
1202 sz = *szp;
1203 }
1204 *szp = sopt->sz;
1205 }
1206 memcpy(val, sopt->data, sz);
1207 rv = 0;
1208 break;
1209 }
1210 }
1211
1212 nni_mtx_unlock(&s->s_mx);
1213 return (rv);
1214 }
1215
1216 uint32_t
nni_sock_flags(nni_sock * sock)1217 nni_sock_flags(nni_sock *sock)
1218 {
1219 return (sock->s_flags);
1220 }
1221
1222 void
nni_sock_set_pipe_cb(nni_sock * s,int ev,nng_pipe_cb cb,void * arg)1223 nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
1224 {
1225 if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
1226 nni_mtx_lock(&s->s_pipe_cbs_mtx);
1227 s->s_pipe_cbs[ev].cb_fn = cb;
1228 s->s_pipe_cbs[ev].cb_arg = arg;
1229 nni_mtx_unlock(&s->s_pipe_cbs_mtx);
1230 }
1231 }
1232
1233 int
nni_ctx_find(nni_ctx ** cp,uint32_t id,bool closing)1234 nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing)
1235 {
1236 int rv;
1237 nni_ctx *ctx;
1238
1239 if ((rv = nni_init()) != 0) {
1240 return (rv);
1241 }
1242 nni_mtx_lock(&sock_lk);
1243 if ((ctx = nni_id_get(&ctx_ids, id)) != NULL) {
1244 // We refuse a reference if either the socket is
1245 // closed, or the context is closed. (If the socket
1246 // is closed, and we are only getting the reference so
1247 // we can close it, then we still allow. In the case
1248 // the only valid operation will be to close the
1249 // socket.)
1250 if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
1251 rv = NNG_ECLOSED;
1252 } else {
1253 ctx->c_ref++;
1254 *cp = ctx;
1255 }
1256 } else {
1257 rv = NNG_ECLOSED;
1258 }
1259 nni_mtx_unlock(&sock_lk);
1260
1261 return (rv);
1262 }
1263
1264 static void
nni_ctx_destroy(nni_ctx * ctx)1265 nni_ctx_destroy(nni_ctx *ctx)
1266 {
1267 if (ctx->c_data != NULL) {
1268 ctx->c_ops.ctx_fini(ctx->c_data);
1269 }
1270
1271 // Let the socket go, our hold on it is done.
1272 nni_free(ctx, ctx->c_size);
1273 }
1274
1275 void
nni_ctx_rele(nni_ctx * ctx)1276 nni_ctx_rele(nni_ctx *ctx)
1277 {
1278 nni_sock *sock = ctx->c_sock;
1279 nni_mtx_lock(&sock_lk);
1280 ctx->c_ref--;
1281 if ((ctx->c_ref > 0) || (!ctx->c_closed)) {
1282 // Either still have an active reference, or not
1283 // actually closing yet.
1284 nni_mtx_unlock(&sock_lk);
1285 return;
1286 }
1287
1288 // Remove us from the hash, so we can't be found any more.
1289 // This allows our ID to be reused later, although the system
1290 // tries to avoid ID reuse.
1291 nni_id_remove(&ctx_ids, ctx->c_id);
1292 nni_list_remove(&sock->s_ctxs, ctx);
1293 if (sock->s_closed || sock->s_ctxwait) {
1294 nni_cv_wake(&sock->s_close_cv);
1295 }
1296 nni_mtx_unlock(&sock_lk);
1297
1298 nni_ctx_destroy(ctx);
1299 }
1300
1301 int
nni_ctx_open(nni_ctx ** ctxp,nni_sock * sock)1302 nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
1303 {
1304 nni_ctx *ctx;
1305 int rv;
1306 size_t sz;
1307
1308 if (sock->s_ctx_ops.ctx_init == NULL) {
1309 return (NNG_ENOTSUP);
1310 }
1311
1312 sz = NNI_ALIGN_UP(sizeof(*ctx)) + sock->s_ctx_ops.ctx_size;
1313 if ((ctx = nni_zalloc(sz)) == NULL) {
1314 return (NNG_ENOMEM);
1315 }
1316 ctx->c_size = sz;
1317 ctx->c_data = ctx + 1;
1318 ctx->c_closed = false;
1319 ctx->c_ref = 1; // Caller implicitly gets a reference.
1320 ctx->c_sock = sock;
1321 ctx->c_ops = sock->s_ctx_ops;
1322 ctx->c_rcvtimeo = sock->s_rcvtimeo;
1323 ctx->c_sndtimeo = sock->s_sndtimeo;
1324
1325 nni_mtx_lock(&sock_lk);
1326 if (sock->s_closed) {
1327 nni_mtx_unlock(&sock_lk);
1328 nni_free(ctx, ctx->c_size);
1329 return (NNG_ECLOSED);
1330 }
1331 if ((rv = nni_id_alloc(&ctx_ids, &ctx->c_id, ctx)) != 0) {
1332 nni_mtx_unlock(&sock_lk);
1333 nni_free(ctx, ctx->c_size);
1334 return (rv);
1335 }
1336
1337 if ((rv = sock->s_ctx_ops.ctx_init(ctx->c_data, sock->s_data)) != 0) {
1338 nni_id_remove(&ctx_ids, ctx->c_id);
1339 nni_mtx_unlock(&sock_lk);
1340 nni_free(ctx, ctx->c_size);
1341 return (rv);
1342 }
1343
1344 nni_list_append(&sock->s_ctxs, ctx);
1345 nni_mtx_unlock(&sock_lk);
1346
1347 // Paranoia, fixing a possible race in close. Don't let us
1348 // give back a context if the socket is being shutdown (it
1349 // might not have reached the "closed" state yet.)
1350 nni_mtx_lock(&sock->s_mx);
1351 if (sock->s_closing) {
1352 nni_mtx_unlock(&sock->s_mx);
1353 nni_ctx_rele(ctx);
1354 return (NNG_ECLOSED);
1355 }
1356 nni_mtx_unlock(&sock->s_mx);
1357 *ctxp = ctx;
1358
1359 return (0);
1360 }
1361
1362 void
nni_ctx_close(nni_ctx * ctx)1363 nni_ctx_close(nni_ctx *ctx)
1364 {
1365 nni_mtx_lock(&sock_lk);
1366 ctx->c_closed = true;
1367 nni_mtx_unlock(&sock_lk);
1368
1369 nni_ctx_rele(ctx);
1370 }
1371
1372 uint32_t
nni_ctx_id(nni_ctx * ctx)1373 nni_ctx_id(nni_ctx *ctx)
1374 {
1375 return (ctx->c_id);
1376 }
1377
1378 void
nni_ctx_send(nni_ctx * ctx,nni_aio * aio)1379 nni_ctx_send(nni_ctx *ctx, nni_aio *aio)
1380 {
1381 nni_aio_normalize_timeout(aio, ctx->c_sndtimeo);
1382 ctx->c_ops.ctx_send(ctx->c_data, aio);
1383 }
1384
1385 void
nni_ctx_recv(nni_ctx * ctx,nni_aio * aio)1386 nni_ctx_recv(nni_ctx *ctx, nni_aio *aio)
1387 {
1388 nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo);
1389 ctx->c_ops.ctx_recv(ctx->c_data, aio);
1390 }
1391
1392 int
nni_ctx_getopt(nni_ctx * ctx,const char * opt,void * v,size_t * szp,nni_type t)1393 nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, nni_type t)
1394 {
1395 nni_sock * sock = ctx->c_sock;
1396 nni_option *o;
1397 int rv = NNG_ENOTSUP;
1398
1399 nni_mtx_lock(&sock->s_mx);
1400 if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
1401 rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, t);
1402 } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
1403 rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, t);
1404 } else if (ctx->c_ops.ctx_options != NULL) {
1405 for (o = ctx->c_ops.ctx_options; o->o_name != NULL; o++) {
1406 if (strcmp(opt, o->o_name) != 0) {
1407 continue;
1408 }
1409 if (o->o_get == NULL) {
1410 rv = NNG_EWRITEONLY;
1411 break;
1412 }
1413 rv = o->o_get(ctx->c_data, v, szp, t);
1414 break;
1415 }
1416 }
1417 nni_mtx_unlock(&sock->s_mx);
1418 return (rv);
1419 }
1420
1421 int
nni_ctx_setopt(nni_ctx * ctx,const char * opt,const void * v,size_t sz,nni_type t)1422 nni_ctx_setopt(
1423 nni_ctx *ctx, const char *opt, const void *v, size_t sz, nni_type t)
1424 {
1425 nni_sock * sock = ctx->c_sock;
1426 nni_option *o;
1427 int rv = NNG_ENOTSUP;
1428
1429 nni_mtx_lock(&sock->s_mx);
1430 if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
1431 rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, t);
1432 } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
1433 rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, t);
1434 } else if (ctx->c_ops.ctx_options != NULL) {
1435 for (o = ctx->c_ops.ctx_options; o->o_name != NULL; o++) {
1436 if (strcmp(opt, o->o_name) != 0) {
1437 continue;
1438 }
1439 if (o->o_set == NULL) {
1440 rv = NNG_EREADONLY;
1441 break;
1442 }
1443 rv = o->o_set(ctx->c_data, v, sz, t);
1444 break;
1445 }
1446 }
1447
1448 nni_mtx_unlock(&sock->s_mx);
1449 return (rv);
1450 }
1451
1452 static void
dialer_timer_start_locked(nni_dialer * d)1453 dialer_timer_start_locked(nni_dialer *d)
1454 {
1455 nni_duration back_off;
1456 nni_sock * sock = d->d_sock;
1457
1458 if (d->d_closing || sock->s_closed) {
1459 return;
1460 }
1461 back_off = d->d_currtime;
1462 if (d->d_maxrtime > 0) {
1463 d->d_currtime *= 2;
1464 if (d->d_currtime > d->d_maxrtime) {
1465 d->d_currtime = d->d_maxrtime;
1466 }
1467 }
1468
1469 // To minimize damage from storms, etc., we select a back-off
1470 // value randomly, in the range of [0, back_off-1]; this is
1471 // pretty similar to 802 style back-off, except that we have a
1472 // nearly uniform time period instead of discrete slot times.
1473 // This algorithm may lead to slight biases because we don't
1474 // have a statistically perfect distribution with the modulo of
1475 // the random number, but this really doesn't matter.
1476 nni_sleep_aio(
1477 back_off ? (int) nni_random() % back_off : 0, &d->d_tmo_aio);
1478 }
1479
1480 void
nni_dialer_timer_start(nni_dialer * d)1481 nni_dialer_timer_start(nni_dialer *d)
1482 {
1483 nni_sock *s = d->d_sock;
1484 nni_mtx_lock(&s->s_mx);
1485 dialer_timer_start_locked(d);
1486 nni_mtx_unlock(&s->s_mx);
1487 }
1488
1489 void
nni_dialer_add_pipe(nni_dialer * d,void * tpipe)1490 nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
1491 {
1492 nni_sock *s = d->d_sock;
1493 nni_pipe *p;
1494
1495 nni_mtx_lock(&s->s_mx);
1496
1497 if (s->s_closed || d->d_closing) {
1498 d->d_tran->tran_pipe->p_fini(tpipe);
1499 nni_mtx_unlock(&s->s_mx);
1500 return;
1501 }
1502 if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
1503 nni_mtx_unlock(&s->s_mx);
1504 return;
1505 }
1506
1507 nni_list_append(&d->d_pipes, p);
1508 nni_list_append(&s->s_pipes, p);
1509 d->d_pipe = p;
1510 d->d_currtime = d->d_inirtime;
1511 nni_mtx_unlock(&s->s_mx);
1512 #ifdef NNG_ENABLE_STATS
1513 nni_stat_inc(&s->st_pipes, 1);
1514 nni_stat_inc(&d->st_pipes, 1);
1515 #endif
1516
1517 nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
1518
1519 nni_mtx_lock(&s->s_mx);
1520 if (p->p_closed) {
1521 nni_mtx_unlock(&s->s_mx);
1522 #ifdef NNG_ENABLE_STATS
1523 nni_stat_inc(&d->st_reject, 1);
1524 nni_stat_inc(&s->st_rejects, 1);
1525 #endif
1526 nni_pipe_rele(p);
1527 return;
1528 }
1529 if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
1530 nni_mtx_unlock(&s->s_mx);
1531 #ifdef NNG_ENABLE_STATS
1532 nni_stat_inc(&d->st_reject, 1);
1533 nni_stat_inc(&s->st_rejects, 1);
1534 #endif
1535 nni_pipe_close(p);
1536 nni_pipe_rele(p);
1537 return;
1538 }
1539 nni_mtx_unlock(&s->s_mx);
1540 #ifdef NNG_ENABLE_STATS
1541 nni_stat_register(&p->st_root);
1542 #endif
1543 nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
1544 nni_pipe_rele(p);
1545 }
1546
1547 static void
dialer_shutdown_impl(nni_dialer * d)1548 dialer_shutdown_impl(nni_dialer *d)
1549 {
1550 nni_pipe *p;
1551
1552 // Abort any remaining in-flight operations.
1553 nni_aio_close(&d->d_con_aio);
1554 nni_aio_close(&d->d_tmo_aio);
1555
1556 // Stop the underlying transport.
1557 d->d_ops.d_close(d->d_data);
1558
1559 NNI_LIST_FOREACH (&d->d_pipes, p) {
1560 nni_pipe_close(p);
1561 }
1562 }
1563
1564 static void
dialer_shutdown_locked(nni_dialer * d)1565 dialer_shutdown_locked(nni_dialer *d)
1566 {
1567 if (!d->d_closing) {
1568 d->d_closing = true;
1569 dialer_shutdown_impl(d);
1570 }
1571 }
1572
1573 void
nni_dialer_shutdown(nni_dialer * d)1574 nni_dialer_shutdown(nni_dialer *d)
1575 {
1576 nni_sock *s = d->d_sock;
1577 nni_mtx_lock(&s->s_mx);
1578 dialer_shutdown_locked(d);
1579 nni_mtx_unlock(&s->s_mx);
1580 }
1581
1582 static void dialer_reap(void *);
1583
1584 static nni_reap_list dialer_reap_list = {
1585 .rl_offset = offsetof(nni_dialer, d_reap),
1586 .rl_func = dialer_reap,
1587 };
1588
1589 static void
dialer_reap(void * arg)1590 dialer_reap(void *arg)
1591 {
1592 nni_dialer *d = arg;
1593 nni_sock * s = d->d_sock;
1594
1595 nni_aio_stop(&d->d_tmo_aio);
1596 nni_aio_stop(&d->d_con_aio);
1597
1598 #ifdef NNG_ENABLE_STATS
1599 nni_stat_unregister(&d->st_root);
1600 #endif
1601
1602 nni_mtx_lock(&s->s_mx);
1603 if (!nni_list_empty(&d->d_pipes)) {
1604 nni_pipe *p;
1605 // This should already have been done, but be certain!
1606 NNI_LIST_FOREACH (&d->d_pipes, p) {
1607 nni_pipe_close(p);
1608 }
1609 nni_mtx_unlock(&s->s_mx);
1610 // Go back to the end of reap list.
1611 nni_dialer_reap(d);
1612 return;
1613 }
1614
1615 nni_list_remove(&s->s_dialers, d);
1616 if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
1617 nni_cv_wake(&s->s_cv);
1618 }
1619
1620 nni_mtx_unlock(&s->s_mx);
1621
1622 nni_dialer_destroy(d);
1623 }
1624
1625 void
nni_dialer_reap(nni_dialer * d)1626 nni_dialer_reap(nni_dialer *d)
1627 {
1628 nni_reap(&dialer_reap_list, d);
1629 }
1630
1631 void
nni_listener_add_pipe(nni_listener * l,void * tpipe)1632 nni_listener_add_pipe(nni_listener *l, void *tpipe)
1633 {
1634 nni_sock *s = l->l_sock;
1635 nni_pipe *p;
1636
1637 nni_mtx_lock(&s->s_mx);
1638 if (s->s_closed || l->l_closing) {
1639 l->l_tran->tran_pipe->p_fini(tpipe);
1640 nni_mtx_unlock(&s->s_mx);
1641 return;
1642 }
1643
1644 if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
1645 nni_mtx_unlock(&s->s_mx);
1646 return;
1647 }
1648
1649 nni_list_append(&l->l_pipes, p);
1650 nni_list_append(&s->s_pipes, p);
1651 nni_mtx_unlock(&s->s_mx);
1652 #ifdef NNG_ENABLE_STATS
1653 nni_stat_inc(&l->st_pipes, 1);
1654 nni_stat_inc(&s->st_pipes, 1);
1655 #endif
1656
1657 nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
1658
1659 nni_mtx_lock(&s->s_mx);
1660 if (p->p_closed) {
1661 nni_mtx_unlock(&s->s_mx);
1662 #ifdef NNG_ENABLE_STATS
1663 nni_stat_inc(&l->st_reject, 1);
1664 nni_stat_inc(&s->st_rejects, 1);
1665 #endif
1666 nni_pipe_rele(p);
1667 return;
1668 }
1669 if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
1670 nni_mtx_unlock(&s->s_mx);
1671 #ifdef NNG_ENABLE_STATS
1672 nni_stat_inc(&l->st_reject, 1);
1673 nni_stat_inc(&s->st_rejects, 1);
1674 #endif
1675 nni_pipe_close(p);
1676 nni_pipe_rele(p);
1677 return;
1678 }
1679 nni_mtx_unlock(&s->s_mx);
1680 #ifdef NNG_ENABLE_STATS
1681 nni_stat_register(&p->st_root);
1682 #endif
1683 nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
1684 nni_pipe_rele(p);
1685 }
1686
1687 static void
listener_shutdown_impl(nni_listener * l)1688 listener_shutdown_impl(nni_listener *l)
1689 {
1690 nni_pipe *p;
1691
1692 // Abort any remaining in-flight accepts.
1693 nni_aio_close(&l->l_acc_aio);
1694 nni_aio_close(&l->l_tmo_aio);
1695
1696 // Stop the underlying transport.
1697 l->l_ops.l_close(l->l_data);
1698
1699 NNI_LIST_FOREACH (&l->l_pipes, p) {
1700 nni_pipe_close(p);
1701 }
1702 }
1703
1704 static void
listener_shutdown_locked(nni_listener * l)1705 listener_shutdown_locked(nni_listener *l)
1706 {
1707 if (!l->l_closing) {
1708 l->l_closing = true;
1709 listener_shutdown_impl(l);
1710 }
1711 }
1712
1713 void
nni_listener_shutdown(nni_listener * l)1714 nni_listener_shutdown(nni_listener *l)
1715 {
1716 nni_sock *s = l->l_sock;
1717
1718 nni_mtx_lock(&s->s_mx);
1719 listener_shutdown_locked(l);
1720 nni_mtx_unlock(&s->s_mx);
1721 }
1722
1723 static void listener_reap(void *);
1724
1725 static nni_reap_list listener_reap_list = {
1726 .rl_offset = offsetof(nni_listener, l_reap),
1727 .rl_func = listener_reap,
1728 };
1729
1730 static void
listener_reap(void * arg)1731 listener_reap(void *arg)
1732 {
1733 nni_listener *l = arg;
1734 nni_sock * s = l->l_sock;
1735
1736 nni_aio_stop(&l->l_tmo_aio);
1737 nni_aio_stop(&l->l_acc_aio);
1738
1739 #ifdef NNG_ENABLE_STATS
1740 nni_stat_unregister(&l->st_root);
1741 #endif
1742
1743 nni_mtx_lock(&s->s_mx);
1744 if (!nni_list_empty(&l->l_pipes)) {
1745 nni_pipe *p;
1746 // This should already have been done, but be certain!
1747 NNI_LIST_FOREACH (&l->l_pipes, p) {
1748 nni_pipe_close(p);
1749 }
1750 nni_mtx_unlock(&s->s_mx);
1751 // Go back to the end of reap list.
1752 nni_reap(&listener_reap_list, l);
1753 return;
1754 }
1755
1756 nni_list_remove(&s->s_listeners, l);
1757 if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
1758 nni_cv_wake(&s->s_cv);
1759 }
1760
1761 nni_mtx_unlock(&s->s_mx);
1762
1763 nni_listener_destroy(l);
1764 }
1765
1766 void
nni_listener_reap(nni_listener * l)1767 nni_listener_reap(nni_listener *l)
1768 {
1769 nni_reap(&listener_reap_list, l);
1770 }
1771
1772 void
nni_pipe_run_cb(nni_pipe * p,nng_pipe_ev ev)1773 nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
1774 {
1775 nni_sock * s = p->p_sock;
1776 nng_pipe_cb cb;
1777 void * arg;
1778
1779 nni_mtx_lock(&s->s_pipe_cbs_mtx);
1780 if (!p->p_cbs) {
1781 if (ev == NNG_PIPE_EV_ADD_PRE) {
1782 // First event, after this we want all other events.
1783 p->p_cbs = true;
1784 } else {
1785 nni_mtx_unlock(&s->s_pipe_cbs_mtx);
1786 return;
1787 }
1788 }
1789 cb = s->s_pipe_cbs[ev].cb_fn;
1790 arg = s->s_pipe_cbs[ev].cb_arg;
1791 nni_mtx_unlock(&s->s_pipe_cbs_mtx);
1792
1793 if (cb != NULL) {
1794 nng_pipe pid;
1795 pid.id = p->p_id;
1796 cb(pid, ev, arg);
1797 }
1798 }
1799
1800 void
nni_pipe_remove(nni_pipe * p)1801 nni_pipe_remove(nni_pipe *p)
1802 {
1803 nni_sock * s = p->p_sock;
1804 nni_dialer *d = p->p_dialer;
1805
1806 nni_mtx_lock(&s->s_mx);
1807 #ifdef NNG_ENABLE_STATS
1808 if (nni_list_node_active(&p->p_sock_node)) {
1809 nni_stat_dec(&s->st_pipes, 1);
1810 }
1811 if (p->p_listener != NULL) {
1812 nni_stat_dec(&p->p_listener->st_pipes, 1);
1813 }
1814 if (p->p_dialer != NULL) {
1815 nni_stat_dec(&p->p_dialer->st_pipes, 1);
1816 }
1817 #endif
1818 nni_list_node_remove(&p->p_sock_node);
1819 nni_list_node_remove(&p->p_ep_node);
1820 p->p_listener = NULL;
1821 p->p_dialer = NULL;
1822 if ((d != NULL) && (d->d_pipe == p)) {
1823 d->d_pipe = NULL;
1824 dialer_timer_start_locked(d); // Kick the timer to redial.
1825 }
1826 if (s->s_closing) {
1827 nni_cv_wake(&s->s_cv);
1828 }
1829 nni_mtx_unlock(&s->s_mx);
1830 }
1831
1832 void
nni_sock_add_stat(nni_sock * s,nni_stat_item * stat)1833 nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
1834 {
1835 #ifdef NNG_ENABLE_STATS
1836 nni_stat_add(&s->st_root, stat);
1837 #else
1838 NNI_ARG_UNUSED(s);
1839 NNI_ARG_UNUSED(stat);
1840 #endif
1841 }
1842
1843 void
nni_sock_bump_tx(nni_sock * s,uint64_t sz)1844 nni_sock_bump_tx(nni_sock *s, uint64_t sz)
1845 {
1846 #ifdef NNG_ENABLE_STATS
1847 nni_stat_inc(&s->st_tx_msgs, 1);
1848 nni_stat_inc(&s->st_tx_bytes, sz);
1849 #else
1850 NNI_ARG_UNUSED(s);
1851 NNI_ARG_UNUSED(sz);
1852 #endif
1853 }
1854
1855 void
nni_sock_bump_rx(nni_sock * s,uint64_t sz)1856 nni_sock_bump_rx(nni_sock *s, uint64_t sz)
1857 {
1858 #ifdef NNG_ENABLE_STATS
1859 nni_stat_inc(&s->st_rx_msgs, 1);
1860 nni_stat_inc(&s->st_rx_bytes, sz);
1861 #else
1862 NNI_ARG_UNUSED(s);
1863 NNI_ARG_UNUSED(sz);
1864 #endif
1865 }
1866