1 //
2 // Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt).  A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10 
11 #include "core/nng_impl.h"
12 #include "sockimpl.h"
13 
14 #include <stdio.h>
15 #include <string.h>
16 
17 // Socket implementation.
18 
// Global state for the socket subsystem; set up by nni_sock_sys_init.
static nni_list   sock_list; // open sockets, linked through s_node
static nni_id_map sock_ids;  // socket id -> nni_sock lookup
static nni_mtx    sock_lk;   // global lock (guards the above, s_ref, c_ref, s_ctxs)
static nni_id_map ctx_ids;   // context id map
static bool       inited;    // true between nni_sock_sys_init and nni_sock_sys_fini
24 
// nni_ctx is a context belonging to a socket.
struct nni_ctx {
	nni_list_node     c_node;   // linkage on the socket's s_ctxs list
	nni_sock *        c_sock;   // presumably the owning socket -- confirm
	nni_proto_ctx_ops c_ops;
	void *            c_data;
	size_t            c_size;
	bool              c_closed; // set when the owning socket shuts down
	unsigned          c_ref; // protected by global lock
	uint32_t          c_id;     // key in the global ctx_ids map
	nng_duration      c_sndtimeo;
	nng_duration      c_rcvtimeo;
};
37 
// nni_sockopt is a saved copy of an option that neither the socket nor
// the protocol handled; saved options are replayed onto each listener
// and dialer as it is added to the socket.
typedef struct nni_sockopt {
	nni_list_node node; // linkage on the socket's s_options list
	char *        name; // option name (heap copy, released by nni_free_opt)
	nni_type      typ;  // type tag supplied when the option was set
	size_t        sz;   // size of data in bytes
	void *        data; // option value (heap copy, released by nni_free_opt)
} nni_sockopt;
45 
// nni_sock_pipe_cb is one slot of the per-socket pipe callback table
// (s_pipe_cbs, which has NNG_PIPE_EV_NUM entries).
typedef struct nni_sock_pipe_cb {
	nng_pipe_cb cb_fn;  // callback function
	void *      cb_arg; // presumably the argument handed to cb_fn -- confirm
} nni_sock_pipe_cb;
50 
// nni_socket is the framework's per-socket state.  The protocol's
// private data is allocated in the same memory block and is reached
// through s_data.
struct nni_socket {
	nni_list_node s_node;     // linkage on the global sock_list
	nni_mtx       s_mx;       // per-socket lock
	nni_cv        s_cv;       // condition variable bound to s_mx
	nni_cv        s_close_cv; // condition variable bound to the global sock_lk

	uint32_t s_id;
	uint32_t s_flags;
	unsigned s_ref;  // protected by global lock
	void *   s_data; // Protocol private
	size_t   s_size; // size handed to nni_free when the socket is destroyed

	nni_msgq *s_uwq; // Upper write queue
	nni_msgq *s_urq; // Upper read queue

	nni_proto_id s_self_id;
	nni_proto_id s_peer_id;

	nni_proto_pipe_ops s_pipe_ops;
	nni_proto_sock_ops s_sock_ops;
	nni_proto_ctx_ops  s_ctx_ops;

	// options
	nni_duration s_sndtimeo;  // send timeout
	nni_duration s_rcvtimeo;  // receive timeout
	nni_duration s_reconn;    // reconnect time
	nni_duration s_reconnmax; // max reconnect time
	size_t       s_rcvmaxsz;  // max receive size
	nni_list     s_options;   // opts not handled by sock/proto
	char         s_name[64];  // socket name (legacy compat)

	nni_list s_listeners; // active listeners
	nni_list s_dialers;   // active dialers
	nni_list s_pipes;     // active pipes
	nni_list s_ctxs;      // active contexts (protected by global sock_lk)

	bool s_closing; // Socket is closing
	bool s_closed;  // Socket closed, protected by global lock
	bool s_ctxwait; // Waiting for contexts to close.

	nni_mtx          s_pipe_cbs_mtx;
	nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];

#ifdef NNG_ENABLE_STATS
	nni_stat_item st_root;      // socket scope
	nni_stat_item st_id;        // socket id
	nni_stat_item st_name;      // socket name
	nni_stat_item st_protocol;  // socket protocol
	nni_stat_item st_dialers;   // number of dialers
	nni_stat_item st_listeners; // number of listeners
	nni_stat_item st_pipes;     // number of pipes
	nni_stat_item st_rx_bytes;  // number of bytes received
	nni_stat_item st_tx_bytes;  // number of bytes sent
	nni_stat_item st_rx_msgs;   // number of msgs received
	nni_stat_item st_tx_msgs;   // number of msgs sent
	nni_stat_item st_rejects;   // pipes rejected
#endif
};
109 
// Forward declaration; nni_sock_shutdown uses this to destroy contexts
// that have no outstanding references.
static void nni_ctx_destroy(nni_ctx *);

// Endpoint shutdown helpers; nni_sock_shutdown calls these while
// holding the socket's s_mx lock.
static void dialer_shutdown_locked(nni_dialer *);
static void listener_shutdown_locked(nni_listener *);

// SOCK converts the opaque (void *) argument received by the option
// handlers below back into a socket pointer.
#define SOCK(s) ((nni_sock *) (s))
116 
117 static int
sock_get_fd(void * s,unsigned flag,int * fdp)118 sock_get_fd(void *s, unsigned flag, int *fdp)
119 {
120 	int           rv;
121 	nni_pollable *p;
122 
123 	if ((flag & nni_sock_flags(SOCK(s))) == 0) {
124 		return (NNG_ENOTSUP);
125 	}
126 
127 	if (flag == NNI_PROTO_FLAG_SND) {
128 		rv = nni_msgq_get_sendable(SOCK(s)->s_uwq, &p);
129 	} else {
130 		rv = nni_msgq_get_recvable(SOCK(s)->s_urq, &p);
131 	}
132 
133 	if (rv == 0) {
134 		rv = nni_pollable_getfd(p, fdp);
135 	}
136 
137 	return (rv);
138 }
139 
140 static int
sock_get_sendfd(void * s,void * buf,size_t * szp,nni_type t)141 sock_get_sendfd(void *s, void *buf, size_t *szp, nni_type t)
142 {
143 	int fd;
144 	int rv;
145 
146 	if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_SND, &fd)) != 0) {
147 		return (rv);
148 	}
149 	return (nni_copyout_int(fd, buf, szp, t));
150 }
151 
152 static int
sock_get_recvfd(void * s,void * buf,size_t * szp,nni_type t)153 sock_get_recvfd(void *s, void *buf, size_t *szp, nni_type t)
154 {
155 	int fd;
156 	int rv;
157 
158 	if ((rv = sock_get_fd(SOCK(s), NNI_PROTO_FLAG_RCV, &fd)) != 0) {
159 		return (rv);
160 	}
161 	return (nni_copyout_int(fd, buf, szp, t));
162 }
163 
164 static int
sock_get_raw(void * s,void * buf,size_t * szp,nni_type t)165 sock_get_raw(void *s, void *buf, size_t *szp, nni_type t)
166 {
167 	bool raw = ((nni_sock_flags(SOCK(s)) & NNI_PROTO_FLAG_RAW) != 0);
168 	return (nni_copyout_bool(raw, buf, szp, t));
169 }
170 
171 static int
sock_set_recvtimeo(void * s,const void * buf,size_t sz,nni_type t)172 sock_set_recvtimeo(void *s, const void *buf, size_t sz, nni_type t)
173 {
174 	return (nni_copyin_ms(&SOCK(s)->s_rcvtimeo, buf, sz, t));
175 }
176 
177 static int
sock_get_recvtimeo(void * s,void * buf,size_t * szp,nni_type t)178 sock_get_recvtimeo(void *s, void *buf, size_t *szp, nni_type t)
179 {
180 	return (nni_copyout_ms(SOCK(s)->s_rcvtimeo, buf, szp, t));
181 }
182 
183 static int
sock_set_sendtimeo(void * s,const void * buf,size_t sz,nni_type t)184 sock_set_sendtimeo(void *s, const void *buf, size_t sz, nni_type t)
185 {
186 	return (nni_copyin_ms(&SOCK(s)->s_sndtimeo, buf, sz, t));
187 }
188 
189 static int
sock_get_sendtimeo(void * s,void * buf,size_t * szp,nni_type t)190 sock_get_sendtimeo(void *s, void *buf, size_t *szp, nni_type t)
191 {
192 	return (nni_copyout_ms(SOCK(s)->s_sndtimeo, buf, szp, t));
193 }
194 
195 static int
sock_set_recvbuf(void * s,const void * buf,size_t sz,nni_type t)196 sock_set_recvbuf(void *s, const void *buf, size_t sz, nni_type t)
197 {
198 	int len;
199 	int rv;
200 
201 	if ((rv = nni_copyin_int(&len, buf, sz, 0, 8192, t)) != 0) {
202 		return (rv);
203 	}
204 	return (nni_msgq_resize(SOCK(s)->s_urq, len));
205 }
206 
207 static int
sock_get_recvbuf(void * s,void * buf,size_t * szp,nni_type t)208 sock_get_recvbuf(void *s, void *buf, size_t *szp, nni_type t)
209 {
210 	int len = nni_msgq_cap(SOCK(s)->s_urq);
211 
212 	return (nni_copyout_int(len, buf, szp, t));
213 }
214 
215 static int
sock_set_sendbuf(void * s,const void * buf,size_t sz,nni_type t)216 sock_set_sendbuf(void *s, const void *buf, size_t sz, nni_type t)
217 {
218 	int len;
219 	int rv;
220 
221 	if ((rv = nni_copyin_int(&len, buf, sz, 0, 8192, t)) != 0) {
222 		return (rv);
223 	}
224 	return (nni_msgq_resize(SOCK(s)->s_uwq, len));
225 }
226 
227 static int
sock_get_sendbuf(void * s,void * buf,size_t * szp,nni_type t)228 sock_get_sendbuf(void *s, void *buf, size_t *szp, nni_type t)
229 {
230 	int len = nni_msgq_cap(SOCK(s)->s_uwq);
231 
232 	return (nni_copyout_int(len, buf, szp, t));
233 }
234 
235 static int
sock_get_sockname(void * s,void * buf,size_t * szp,nni_type t)236 sock_get_sockname(void *s, void *buf, size_t *szp, nni_type t)
237 {
238 	return (nni_copyout_str(SOCK(s)->s_name, buf, szp, t));
239 }
240 
241 static int
sock_set_sockname(void * s,const void * buf,size_t sz,nni_type t)242 sock_set_sockname(void *s, const void *buf, size_t sz, nni_type t)
243 {
244 	return (nni_copyin_str(
245 	    SOCK(s)->s_name, buf, sizeof(SOCK(s)->s_name), sz, t));
246 }
247 
248 static int
sock_get_proto(void * s,void * buf,size_t * szp,nni_type t)249 sock_get_proto(void *s, void *buf, size_t *szp, nni_type t)
250 {
251 	return (nni_copyout_int(nni_sock_proto_id(SOCK(s)), buf, szp, t));
252 }
253 
254 static int
sock_get_peer(void * s,void * buf,size_t * szp,nni_type t)255 sock_get_peer(void *s, void *buf, size_t *szp, nni_type t)
256 {
257 	return (nni_copyout_int(nni_sock_peer_id(SOCK(s)), buf, szp, t));
258 }
259 
260 static int
sock_get_protoname(void * s,void * buf,size_t * szp,nni_type t)261 sock_get_protoname(void *s, void *buf, size_t *szp, nni_type t)
262 {
263 	return (nni_copyout_str(nni_sock_proto_name(SOCK(s)), buf, szp, t));
264 }
265 
266 static int
sock_get_peername(void * s,void * buf,size_t * szp,nni_type t)267 sock_get_peername(void *s, void *buf, size_t *szp, nni_type t)
268 {
269 	return (nni_copyout_str(nni_sock_peer_name(SOCK(s)), buf, szp, t));
270 }
271 
// sock_options lists the options the socket framework handles itself.
// nni_sock_setopt consults this table (via nni_setopt) after the
// protocol's own option table.  Entries that supply only .o_get have
// no setter here.  The table is terminated by an entry whose .o_name
// is NULL.
static const nni_option sock_options[] = {
	{
	    .o_name = NNG_OPT_RECVTIMEO,
	    .o_get  = sock_get_recvtimeo,
	    .o_set  = sock_set_recvtimeo,
	},
	{
	    .o_name = NNG_OPT_SENDTIMEO,
	    .o_get  = sock_get_sendtimeo,
	    .o_set  = sock_set_sendtimeo,
	},
	{
	    .o_name = NNG_OPT_RECVFD,
	    .o_get  = sock_get_recvfd,
	},
	{
	    .o_name = NNG_OPT_SENDFD,
	    .o_get  = sock_get_sendfd,
	},
	{
	    .o_name = NNG_OPT_RECVBUF,
	    .o_get  = sock_get_recvbuf,
	    .o_set  = sock_set_recvbuf,
	},
	{
	    .o_name = NNG_OPT_SENDBUF,
	    .o_get  = sock_get_sendbuf,
	    .o_set  = sock_set_sendbuf,
	},
	{
	    .o_name = NNG_OPT_SOCKNAME,
	    .o_get  = sock_get_sockname,
	    .o_set  = sock_set_sockname,
	},
	{
	    .o_name = NNG_OPT_RAW,
	    .o_get  = sock_get_raw,
	},
	{
	    .o_name = NNG_OPT_PROTO,
	    .o_get  = sock_get_proto,
	},
	{
	    .o_name = NNG_OPT_PEER,
	    .o_get  = sock_get_peer,
	},
	{
	    .o_name = NNG_OPT_PROTONAME,
	    .o_get  = sock_get_protoname,
	},
	{
	    .o_name = NNG_OPT_PEERNAME,
	    .o_get  = sock_get_peername,
	},
	// terminate list
	{
	    .o_name = NULL,
	},
};
331 
332 static void
nni_free_opt(nni_sockopt * opt)333 nni_free_opt(nni_sockopt *opt)
334 {
335 	nni_strfree(opt->name);
336 	nni_free(opt->data, opt->sz);
337 	NNI_FREE_STRUCT(opt);
338 }
339 
340 uint32_t
nni_sock_id(nni_sock * s)341 nni_sock_id(nni_sock *s)
342 {
343 	return (s->s_id);
344 }
345 
346 // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
347 // the upper read and write queues.
348 nni_msgq *
nni_sock_sendq(nni_sock * s)349 nni_sock_sendq(nni_sock *s)
350 {
351 	return (s->s_uwq);
352 }
353 
354 nni_msgq *
nni_sock_recvq(nni_sock * s)355 nni_sock_recvq(nni_sock *s)
356 {
357 	return (s->s_urq);
358 }
359 
360 int
nni_sock_find(nni_sock ** sockp,uint32_t id)361 nni_sock_find(nni_sock **sockp, uint32_t id)
362 {
363 	int       rv;
364 	nni_sock *s;
365 
366 	if ((rv = nni_init()) != 0) {
367 		return (rv);
368 	}
369 	nni_mtx_lock(&sock_lk);
370 	if ((s = nni_id_get(&sock_ids, id)) != NULL) {
371 		if (s->s_closed) {
372 			rv = NNG_ECLOSED;
373 		} else {
374 			s->s_ref++;
375 			*sockp = s;
376 		}
377 	} else {
378 		rv = NNG_ECLOSED;
379 	}
380 	nni_mtx_unlock(&sock_lk);
381 
382 	return (rv);
383 }
384 
385 void
nni_sock_rele(nni_sock * s)386 nni_sock_rele(nni_sock *s)
387 {
388 	nni_mtx_lock(&sock_lk);
389 	s->s_ref--;
390 	if (s->s_closed && (s->s_ref < 2)) {
391 		nni_cv_wake(&s->s_close_cv);
392 	}
393 	nni_mtx_unlock(&sock_lk);
394 }
395 
396 #ifdef NNG_ENABLE_STATS
397 static void
sock_stat_init(nni_sock * s,nni_stat_item * item,const nni_stat_info * info)398 sock_stat_init(nni_sock *s, nni_stat_item *item, const nni_stat_info *info)
399 {
400 	nni_stat_init(item, info);
401 	nni_stat_add(&s->st_root, item);
402 }
403 
404 static void
sock_stats_init(nni_sock * s)405 sock_stats_init(nni_sock *s)
406 {
407 	static const nni_stat_info root_info = {
408 		.si_name = "socket",
409 		.si_desc = "socket statistics",
410 		.si_type = NNG_STAT_SCOPE,
411 	};
412 	static const nni_stat_info id_info = {
413 		.si_name = "id",
414 		.si_desc = "socket identifier",
415 		.si_type = NNG_STAT_ID,
416 	};
417 	static const nni_stat_info name_info = {
418 		.si_name  = "name",
419 		.si_desc  = "socket name",
420 		.si_type  = NNG_STAT_STRING,
421 		.si_alloc = true,
422 	};
423 	static const nni_stat_info protocol_info = {
424 		.si_name = "protocol",
425 		.si_desc = "socket protocol",
426 		.si_type = NNG_STAT_STRING,
427 	};
428 	static const nni_stat_info dialers_info = {
429 		.si_name   = "dialers",
430 		.si_desc   = "open dialers",
431 		.si_type   = NNG_STAT_LEVEL,
432 		.si_atomic = true,
433 	};
434 	static const nni_stat_info listeners_info = {
435 		.si_name   = "listeners",
436 		.si_desc   = "open listeners",
437 		.si_type   = NNG_STAT_LEVEL,
438 		.si_atomic = true,
439 	};
440 	static const nni_stat_info pipes_info = {
441 		.si_name   = "pipes",
442 		.si_desc   = "open pipes",
443 		.si_type   = NNG_STAT_LEVEL,
444 		.si_atomic = true,
445 	};
446 	static const nni_stat_info reject_info = {
447 		.si_name   = "reject",
448 		.si_desc   = "rejected pipes",
449 		.si_type   = NNG_STAT_COUNTER,
450 		.si_atomic = true,
451 	};
452 	static const nni_stat_info tx_msgs_info = {
453 		.si_name   = "tx_msgs",
454 		.si_desc   = "sent messages",
455 		.si_type   = NNG_STAT_COUNTER,
456 		.si_unit   = NNG_UNIT_MESSAGES,
457 		.si_atomic = true,
458 	};
459 	static const nni_stat_info rx_msgs_info = {
460 		.si_name   = "rx_msgs",
461 		.si_desc   = "received messages",
462 		.si_type   = NNG_STAT_COUNTER,
463 		.si_unit   = NNG_UNIT_MESSAGES,
464 		.si_atomic = true,
465 	};
466 	static const nni_stat_info tx_bytes_info = {
467 		.si_name   = "tx_bytes",
468 		.si_desc   = "sent bytes",
469 		.si_type   = NNG_STAT_COUNTER,
470 		.si_unit   = NNG_UNIT_BYTES,
471 		.si_atomic = true,
472 	};
473 	static const nni_stat_info rx_bytes_info = {
474 		.si_name   = "rx_bytes",
475 		.si_desc   = "received messages",
476 		.si_type   = NNG_STAT_COUNTER,
477 		.si_unit   = NNG_UNIT_BYTES,
478 		.si_atomic = true,
479 	};
480 
481 	// To make collection cheap and atomic for the socket,
482 	// we just use a single lock for the entire chain.
483 
484 	nni_stat_init(&s->st_root, &root_info);
485 	sock_stat_init(s, &s->st_id, &id_info);
486 	sock_stat_init(s, &s->st_name, &name_info);
487 	sock_stat_init(s, &s->st_protocol, &protocol_info);
488 	sock_stat_init(s, &s->st_dialers, &dialers_info);
489 	sock_stat_init(s, &s->st_listeners, &listeners_info);
490 	sock_stat_init(s, &s->st_pipes, &pipes_info);
491 	sock_stat_init(s, &s->st_rejects, &reject_info);
492 	sock_stat_init(s, &s->st_tx_msgs, &tx_msgs_info);
493 	sock_stat_init(s, &s->st_rx_msgs, &rx_msgs_info);
494 	sock_stat_init(s, &s->st_tx_bytes, &tx_bytes_info);
495 	sock_stat_init(s, &s->st_rx_bytes, &rx_bytes_info);
496 
497 	nni_stat_set_id(&s->st_id, (int) s->s_id);
498 	nni_stat_set_string(&s->st_name, s->s_name);
499 	nni_stat_set_string(&s->st_protocol, nni_sock_proto_name(s));
500 }
501 #endif
502 
503 static void
sock_destroy(nni_sock * s)504 sock_destroy(nni_sock *s)
505 {
506 	nni_sockopt *sopt;
507 
508 #ifdef NNG_ENABLE_STATS
509 	nni_stat_unregister(&s->st_root);
510 #endif
511 
512 	// The protocol needs to clean up its state.
513 	if (s->s_data != NULL) {
514 		s->s_sock_ops.sock_fini(s->s_data);
515 	}
516 
517 	nni_mtx_lock(&s->s_mx);
518 	while ((sopt = nni_list_first(&s->s_options)) != NULL) {
519 		nni_list_remove(&s->s_options, sopt);
520 		nni_free_opt(sopt);
521 	}
522 	nni_mtx_unlock(&s->s_mx);
523 
524 	nni_msgq_fini(s->s_urq);
525 	nni_msgq_fini(s->s_uwq);
526 	nni_cv_fini(&s->s_close_cv);
527 	nni_cv_fini(&s->s_cv);
528 	nni_mtx_fini(&s->s_mx);
529 	nni_mtx_fini(&s->s_pipe_cbs_mtx);
530 	nni_free(s, s->s_size);
531 }
532 
533 static int
nni_sock_create(nni_sock ** sp,const nni_proto * proto)534 nni_sock_create(nni_sock **sp, const nni_proto *proto)
535 {
536 	int       rv;
537 	nni_sock *s;
538 	bool      on;
539 	size_t    sz;
540 
541 	sz = NNI_ALIGN_UP(sizeof(*s)) + proto->proto_sock_ops->sock_size;
542 	if ((s = nni_zalloc(sz)) == NULL) {
543 		return (NNG_ENOMEM);
544 	}
545 	s->s_data      = s + 1;
546 	s->s_sndtimeo  = -1;
547 	s->s_rcvtimeo  = -1;
548 	s->s_reconn    = NNI_SECOND;
549 	s->s_reconnmax = 0;
550 	s->s_rcvmaxsz  = 0; // unlimited by default
551 	s->s_id        = 0;
552 	s->s_ref       = 0;
553 	s->s_self_id   = proto->proto_self;
554 	s->s_peer_id   = proto->proto_peer;
555 	s->s_flags     = proto->proto_flags;
556 	s->s_sock_ops  = *proto->proto_sock_ops;
557 	s->s_pipe_ops  = *proto->proto_pipe_ops;
558 	s->s_closed    = false;
559 	s->s_closing   = false;
560 
561 	if (proto->proto_ctx_ops != NULL) {
562 		s->s_ctx_ops = *proto->proto_ctx_ops;
563 	}
564 
565 	NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
566 	NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
567 
568 	NNI_LIST_NODE_INIT(&s->s_node);
569 	NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
570 	NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
571 	NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);
572 	NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);
573 	NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);
574 	nni_mtx_init(&s->s_mx);
575 	nni_mtx_init(&s->s_pipe_cbs_mtx);
576 	nni_cv_init(&s->s_cv, &s->s_mx);
577 	nni_cv_init(&s->s_close_cv, &sock_lk);
578 
579 #ifdef NNG_ENABLE_STATS
580 	sock_stats_init(s);
581 #endif
582 
583 	if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
584 	    ((rv = nni_msgq_init(&s->s_urq, 1)) != 0) ||
585 	    ((rv = s->s_sock_ops.sock_init(s->s_data, s)) != 0) ||
586 	    ((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo,
587 	          sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
588 	    ((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo,
589 	          sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
590 	    ((rv = nni_sock_setopt(s, NNG_OPT_RECONNMINT, &s->s_reconn,
591 	          sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
592 	    ((rv = nni_sock_setopt(s, NNG_OPT_RECONNMAXT, &s->s_reconnmax,
593 	          sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) ||
594 	    ((rv = nni_sock_setopt(s, NNG_OPT_RECVMAXSZ, &s->s_rcvmaxsz,
595 	          sizeof(size_t), NNI_TYPE_SIZE)) != 0)) {
596 		sock_destroy(s);
597 		return (rv);
598 	}
599 
600 	// These we *attempt* to call so that we are likely to have initial
601 	// values loaded.  They should not fail, but if they do we don't
602 	// worry about it.
603 	on = true;
604 	(void) nni_sock_setopt(
605 	    s, NNG_OPT_TCP_NODELAY, &on, sizeof(on), NNI_TYPE_BOOL);
606 	on = false;
607 	(void) nni_sock_setopt(
608 	    s, NNG_OPT_TCP_KEEPALIVE, &on, sizeof(on), NNI_TYPE_BOOL);
609 
610 	*sp = s;
611 	return (rv);
612 }
613 
614 int
nni_sock_sys_init(void)615 nni_sock_sys_init(void)
616 {
617 	NNI_LIST_INIT(&sock_list, nni_sock, s_node);
618 	nni_mtx_init(&sock_lk);
619 
620 	nni_id_map_init(&sock_ids, 1, 0x7fffffff, false);
621 	nni_id_map_init(&ctx_ids, 1, 0x7fffffff, false);
622 	inited = true;
623 	return (0);
624 }
625 
626 void
nni_sock_sys_fini(void)627 nni_sock_sys_fini(void)
628 {
629 	nni_id_map_fini(&sock_ids);
630 	nni_id_map_fini(&ctx_ids);
631 	nni_mtx_fini(&sock_lk);
632 	inited = false;
633 }
634 
635 int
nni_sock_open(nni_sock ** sockp,const nni_proto * proto)636 nni_sock_open(nni_sock **sockp, const nni_proto *proto)
637 {
638 	nni_sock *s = NULL;
639 	int       rv;
640 
641 	if (proto->proto_version != NNI_PROTOCOL_VERSION) {
642 		// unsupported protocol version
643 		return (NNG_ENOTSUP);
644 	}
645 
646 	if (((rv = nni_init()) != 0) ||
647 	    ((rv = nni_sock_create(&s, proto)) != 0)) {
648 		return (rv);
649 	}
650 
651 	nni_mtx_lock(&sock_lk);
652 	if (nni_id_alloc(&sock_ids, &s->s_id, s) != 0) {
653 		sock_destroy(s);
654 	} else {
655 		nni_list_append(&sock_list, s);
656 		s->s_sock_ops.sock_open(s->s_data);
657 		*sockp = s;
658 	}
659 	nni_mtx_unlock(&sock_lk);
660 
661 	// Set the socket name.
662 	(void) snprintf(s->s_name, sizeof(s->s_name), "%u", s->s_id);
663 
664 #ifdef NNG_ENABLE_STATS
665 	// Set up basic stat values.
666 	nni_stat_set_id(&s->st_id, (int) s->s_id);
667 
668 	// Add our stats chain.
669 	nni_stat_register(&s->st_root);
670 #endif
671 
672 	return (0);
673 }
674 
// nni_sock_shutdown shuts down the socket; after this point no
// further access to the socket will function, and any threads blocked
// in entry points will be woken (and the functions they are blocked
// in will return NNG_ECLOSED.)
//
// Returns NNG_ECLOSED if the socket was already marked closing,
// otherwise 0 once listeners, dialers, pipes and contexts are all gone
// and the protocol's sock_close hook has run.  This function blocks
// while waiting for those objects to go away.
int
nni_sock_shutdown(nni_sock *sock)
{
	nni_pipe *    pipe;
	nni_dialer *  d;
	nni_listener *l;
	nni_ctx *     ctx;
	nni_ctx *     nctx;

	nni_mtx_lock(&sock->s_mx);
	if (sock->s_closing) {
		nni_mtx_unlock(&sock->s_mx);
		return (NNG_ECLOSED);
	}
	// Mark us closing, so no more EPs or changes can occur.
	sock->s_closing = true;

	// Close the EPs. This prevents new connections from forming
	// but allows existing ones to drain.
	NNI_LIST_FOREACH (&sock->s_listeners, l) {
		listener_shutdown_locked(l);
	}
	NNI_LIST_FOREACH (&sock->s_dialers, d) {
		dialer_shutdown_locked(d);
	}

	nni_mtx_unlock(&sock->s_mx);

	// We now mark any owned contexts as closing.
	// XXX: Add context draining support here!
	nni_mtx_lock(&sock_lk);
	// The next pointer is fetched before the current context may be
	// removed from the list and destroyed.
	nctx = nni_list_first(&sock->s_ctxs);
	while ((ctx = nctx) != NULL) {
		nctx          = nni_list_next(&sock->s_ctxs, ctx);
		ctx->c_closed = true;
		if (ctx->c_ref == 0) {
			// No open operations.  So close it.
			nni_id_remove(&ctx_ids, ctx->c_id);
			nni_list_remove(&sock->s_ctxs, ctx);
			nni_ctx_destroy(ctx);
		}
		// If still has a reference count, then wait for last
		// reference to close before nuking it.
	}
	nni_mtx_unlock(&sock_lk);

	// Generally, unless the protocol is blocked trying to perform
	// writes (e.g. a slow reader on the other side), it should be
	// trying to shut things down.  We wait to give it
	// a chance to do so gracefully.

	// Wait (on the global lock) until every remaining context has
	// been removed from the socket.
	nni_mtx_lock(&sock_lk);
	while (!nni_list_empty(&sock->s_ctxs)) {
		sock->s_ctxwait = true;
		nni_cv_wait(&sock->s_close_cv);
	}
	nni_mtx_unlock(&sock_lk);

	nni_mtx_lock(&sock->s_mx);

	// At this point, we've done everything we politely can to
	// give the protocol a chance to flush its write side.  Now
	// it's time to be a little more insistent.

	// Close the upper queues immediately.  This can happen
	// safely while we hold the lock.
	nni_msgq_close(sock->s_urq);
	nni_msgq_close(sock->s_uwq);

	// Go through the dialers and listeners, attempting to close them.
	// We might already have a close in progress, in which case
	// we skip past it; it will be removed from another thread.
	NNI_LIST_FOREACH (&sock->s_listeners, l) {
		if (nni_listener_hold(l) == 0) {
			nni_listener_close_rele(l);
		}
	}
	NNI_LIST_FOREACH (&sock->s_dialers, d) {
		if (nni_dialer_hold(d) == 0) {
			nni_dialer_close_rele(d);
		}
	}

	// For each pipe, arrange for it to teardown hard.  We would
	// expect there not to be any here.  However, it is possible for
	// a pipe to have been added by an endpoint due to racing conditions
	// in the shutdown.  Therefore it is important that we shutdown pipes
	// *last*.
	NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
		nni_pipe_close(pipe);
	}

	// We have to wait for *both* endpoints and pipes to be
	// removed.
	while ((!nni_list_empty(&sock->s_pipes)) ||
	    (!nni_list_empty(&sock->s_listeners)) ||
	    (!nni_list_empty(&sock->s_dialers))) {
		nni_cv_wait(&sock->s_cv);
	}

	sock->s_sock_ops.sock_close(sock->s_data);

	nni_cv_wake(&sock->s_cv);

	NNI_ASSERT(nni_list_first(&sock->s_pipes) == NULL);

	nni_mtx_unlock(&sock->s_mx);

	// At this point, there are no threads blocked inside of us
	// that are referencing socket state.  User code should call
	// nng_close to release the last resources.
	return (0);
}
792 
// nni_sock_close shuts down the socket, then releases any resources
// associated with it.  It is a programmer error to reference the
// socket after this function is called, as the pointer may reference
// invalid memory or other objects.
//
// The caller is expected to hold one reference on s.  If another
// thread has already closed the socket, that reference is simply
// dropped and this function returns.  Otherwise this function blocks
// until all other references and contexts are gone and then destroys
// the socket.
void
nni_sock_close(nni_sock *s)
{
	// Shutdown everything if not already done.  This operation
	// is idempotent.
	nni_sock_shutdown(s);

	nni_mtx_lock(&sock_lk);
	if (s->s_closed) {
		// Some other thread called close.  All we need to do
		// is drop our reference count.
		// (nni_sock_rele takes sock_lk itself, so unlock first.)
		nni_mtx_unlock(&sock_lk);
		nni_sock_rele(s);
		return;
	}
	s->s_closed = true;
	// Drop the id so nni_sock_find can no longer return this socket.
	nni_id_remove(&sock_ids, s->s_id);

	// We might have been removed from the list already, e.g. by
	// nni_sock_closeall.  This is idempotent.
	nni_list_node_remove(&s->s_node);

	// Wait for all other references to drop.  Note that we
	// have a reference already (from our caller).
	s->s_ctxwait = true;
	while ((s->s_ref > 1) || (!nni_list_empty(&s->s_ctxs))) {
		nni_cv_wait(&s->s_close_cv);
	}
	nni_mtx_unlock(&sock_lk);

	// Because we already shut everything down before, we should not
	// have any child objects.
	nni_mtx_lock(&s->s_mx);
	NNI_ASSERT(nni_list_empty(&s->s_dialers));
	NNI_ASSERT(nni_list_empty(&s->s_listeners));
	NNI_ASSERT(nni_list_empty(&s->s_pipes));
	nni_mtx_unlock(&s->s_mx);

	sock_destroy(s);
}
837 
838 void
nni_sock_closeall(void)839 nni_sock_closeall(void)
840 {
841 	nni_sock *s;
842 
843 	if (!inited) {
844 		return;
845 	}
846 	for (;;) {
847 		nni_mtx_lock(&sock_lk);
848 		if ((s = nni_list_first(&sock_list)) == NULL) {
849 			nni_mtx_unlock(&sock_lk);
850 			return;
851 		}
852 		// Bump the reference count.  The close call below
853 		// will drop it.
854 		s->s_ref++;
855 		nni_list_node_remove(&s->s_node);
856 		nni_mtx_unlock(&sock_lk);
857 		nni_sock_close(s);
858 	}
859 }
860 
861 void
nni_sock_send(nni_sock * sock,nni_aio * aio)862 nni_sock_send(nni_sock *sock, nni_aio *aio)
863 {
864 	nni_aio_normalize_timeout(aio, sock->s_sndtimeo);
865 	sock->s_sock_ops.sock_send(sock->s_data, aio);
866 }
867 
868 void
nni_sock_recv(nni_sock * sock,nni_aio * aio)869 nni_sock_recv(nni_sock *sock, nni_aio *aio)
870 {
871 	nni_aio_normalize_timeout(aio, sock->s_rcvtimeo);
872 	sock->s_sock_ops.sock_recv(sock->s_data, aio);
873 }
874 
875 // nni_sock_proto_id returns the socket's 16-bit protocol number.
876 uint16_t
nni_sock_proto_id(nni_sock * sock)877 nni_sock_proto_id(nni_sock *sock)
878 {
879 	return (sock->s_self_id.p_id);
880 }
881 
882 // nni_sock_peer_id returns the socket peer's 16-bit protocol number.
883 uint16_t
nni_sock_peer_id(nni_sock * sock)884 nni_sock_peer_id(nni_sock *sock)
885 {
886 	return (sock->s_peer_id.p_id);
887 }
888 
889 const char *
nni_sock_proto_name(nni_sock * sock)890 nni_sock_proto_name(nni_sock *sock)
891 {
892 	return (sock->s_self_id.p_name);
893 }
894 
895 const char *
nni_sock_peer_name(nni_sock * sock)896 nni_sock_peer_name(nni_sock *sock)
897 {
898 	return (sock->s_peer_id.p_name);
899 }
900 
901 struct nni_proto_pipe_ops *
nni_sock_proto_pipe_ops(nni_sock * sock)902 nni_sock_proto_pipe_ops(nni_sock *sock)
903 {
904 	return (&sock->s_pipe_ops);
905 }
906 
907 void *
nni_sock_proto_data(nni_sock * sock)908 nni_sock_proto_data(nni_sock *sock)
909 {
910 	return (sock->s_data);
911 }
912 
913 int
nni_sock_add_listener(nni_sock * s,nni_listener * l)914 nni_sock_add_listener(nni_sock *s, nni_listener *l)
915 {
916 	nni_sockopt *sopt;
917 
918 	nni_mtx_lock(&s->s_mx);
919 	if (s->s_closing) {
920 		nni_mtx_unlock(&s->s_mx);
921 		return (NNG_ECLOSED);
922 	}
923 
924 	NNI_LIST_FOREACH (&s->s_options, sopt) {
925 		int rv;
926 		rv = nni_listener_setopt(
927 		    l, sopt->name, sopt->data, sopt->sz, sopt->typ);
928 		if ((rv != 0) && (rv != NNG_ENOTSUP)) {
929 			nni_mtx_unlock(&s->s_mx);
930 			return (rv);
931 		}
932 	}
933 
934 	nni_list_append(&s->s_listeners, l);
935 
936 #ifdef NNG_ENABLE_STATS
937 	nni_stat_inc(&s->st_listeners, 1);
938 #endif
939 
940 	nni_mtx_unlock(&s->s_mx);
941 	return (0);
942 }
943 
944 int
nni_sock_add_dialer(nni_sock * s,nni_dialer * d)945 nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
946 {
947 	nni_sockopt *sopt;
948 
949 	nni_mtx_lock(&s->s_mx);
950 	if (s->s_closing) {
951 		nni_mtx_unlock(&s->s_mx);
952 		return (NNG_ECLOSED);
953 	}
954 
955 	NNI_LIST_FOREACH (&s->s_options, sopt) {
956 		int rv;
957 		rv = nni_dialer_setopt(
958 		    d, sopt->name, sopt->data, sopt->sz, sopt->typ);
959 		if ((rv != 0) && (rv != NNG_ENOTSUP)) {
960 			nni_mtx_unlock(&s->s_mx);
961 			return (rv);
962 		}
963 	}
964 
965 	nni_list_append(&s->s_dialers, d);
966 
967 #ifdef NNG_ENABLE_STATS
968 	nni_stat_inc(&s->st_dialers, 1);
969 #endif
970 
971 	nni_mtx_unlock(&s->s_mx);
972 	return (0);
973 }
974 
975 int
nni_sock_setopt(nni_sock * s,const char * name,const void * v,size_t sz,nni_type t)976 nni_sock_setopt(
977     nni_sock *s, const char *name, const void *v, size_t sz, nni_type t)
978 {
979 	int          rv;
980 	nni_sockopt *optv;
981 	nni_sockopt *oldv = NULL;
982 
983 	nni_mtx_lock(&s->s_mx);
984 	if (s->s_closing) {
985 		nni_mtx_unlock(&s->s_mx);
986 		return (NNG_ECLOSED);
987 	}
988 
989 	// Protocol options.  The protocol can override options that
990 	// the socket framework would otherwise supply, like buffer
991 	// sizes.
992 	rv = nni_setopt(s->s_sock_ops.sock_options, name, s->s_data, v, sz, t);
993 	if (rv != NNG_ENOTSUP) {
994 		nni_mtx_unlock(&s->s_mx);
995 		return (rv);
996 	}
997 
998 	// Some options do not go down to transports.  Handle them directly.
999 	rv = nni_setopt(sock_options, name, s, v, sz, t);
1000 	if (rv != NNG_ENOTSUP) {
1001 		nni_mtx_unlock(&s->s_mx);
1002 		return (rv);
1003 	}
1004 	nni_mtx_unlock(&s->s_mx);
1005 
1006 	// Validation of generic and transport options.
1007 	// NOTE: Setting transport options via socket is deprecated.
1008 	// These options should be set on the endpoint to which they apply.
1009 	if ((strcmp(name, NNG_OPT_RECONNMINT) == 0) ||
1010 	    (strcmp(name, NNG_OPT_RECONNMAXT) == 0)) {
1011 		if ((rv = nni_copyin_ms(NULL, v, sz, t)) != 0) {
1012 			return (rv);
1013 		}
1014 
1015 	} else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
1016 		if ((rv = nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)) !=
1017 		    0) {
1018 			return (rv);
1019 		}
1020 
1021 #if !defined(NNG_ELIDE_DEPRECATED)
1022 		// TCP options, set via socket is deprecated.
1023 	} else if ((strcmp(name, NNG_OPT_TCP_KEEPALIVE) == 0) ||
1024 	    (strcmp(name, NNG_OPT_TCP_NODELAY)) == 0) {
1025 		if ((rv = nni_copyin_bool(NULL, v, sz, t)) != 0) {
1026 			return (rv);
1027 		}
1028 #endif
1029 
1030 #if defined(NNG_SUPP_TLS) && !defined(NNG_ELIDE_DEPRECATED)
1031 		// TLS options may not be supported if TLS is not
1032 		// compiled in.  Supporting all these is deprecated.
1033 	} else if (strcmp(name, NNG_OPT_TLS_CONFIG) == 0) {
1034 		if ((rv = nni_copyin_ptr(NULL, v, sz, t)) != 0) {
1035 			return (rv);
1036 		}
1037 	} else if ((strcmp(name, NNG_OPT_TLS_SERVER_NAME) == 0) ||
1038 	    (strcmp(name, NNG_OPT_TLS_CA_FILE) == 0) ||
1039 	    (strcmp(name, NNG_OPT_TLS_CERT_KEY_FILE) == 0)) {
1040 		if ((t != NNI_TYPE_OPAQUE) && (t != NNI_TYPE_STRING)) {
1041 			return (NNG_EBADTYPE);
1042 		}
1043 		if (nni_strnlen(v, sz) >= sz) {
1044 			return (NNG_EINVAL);
1045 		}
1046 	} else if ((strcmp(name, NNG_OPT_TLS_AUTH_MODE) == 0)) {
1047 		// 0, 1, or 2 (none, optional, required)
1048 		if ((rv = nni_copyin_int(NULL, v, sz, 0, 2, t)) != 0) {
1049 			return (rv);
1050 		}
1051 #endif
1052 
1053 #if defined(NNG_PLATFORM_POSIX) && !defined(NNG_SUPPRESS_DEPRECATED)
1054 	} else if (strcmp(name, NNG_OPT_IPC_PERMISSIONS) == 0) {
1055 		// UNIX mode bits are 0777, but allow set id and sticky bits
1056 		if ((rv = nni_copyin_int(NULL, v, sz, 0, 07777, t)) != 0) {
1057 			return (rv);
1058 		}
1059 #endif
1060 
1061 #if defined(NNG_PLATFORM_WINDOWS) && !defined(NNG_SUPPRESS_DEPRECATED)
1062 	} else if (strcmp(name, NNG_OPT_IPC_SECURITY_DESCRIPTOR) == 0) {
1063 		if ((rv = nni_copyin_ptr(NULL, v, sz, t)) == 0) {
1064 			return (rv);
1065 		}
1066 #endif
1067 	}
1068 
1069 	// Prepare a copy of the socket option.
1070 	if ((optv = NNI_ALLOC_STRUCT(optv)) == NULL) {
1071 		return (NNG_ENOMEM);
1072 	}
1073 	if ((optv->data = nni_alloc(sz)) == NULL) {
1074 		NNI_FREE_STRUCT(optv);
1075 		return (NNG_ENOMEM);
1076 	}
1077 	if ((optv->name = nni_strdup(name)) == NULL) {
1078 		nni_free(optv->data, sz);
1079 		NNI_FREE_STRUCT(optv);
1080 		return (NNG_ENOMEM);
1081 	}
1082 	memcpy(optv->data, v, sz);
1083 	optv->sz  = sz;
1084 	optv->typ = t;
1085 	NNI_LIST_NODE_INIT(&optv->node);
1086 
1087 	nni_mtx_lock(&s->s_mx);
1088 	NNI_LIST_FOREACH (&s->s_options, oldv) {
1089 		if (strcmp(oldv->name, name) == 0) {
1090 			if ((oldv->sz != sz) ||
1091 			    (memcmp(oldv->data, v, sz) != 0)) {
1092 				break;
1093 			}
1094 
1095 			// The values are the same.  This is a no-op.
1096 			nni_mtx_unlock(&s->s_mx);
1097 			nni_free_opt(optv);
1098 			return (0);
1099 		}
1100 	}
1101 
1102 #ifndef NNG_ELIDE_DEPRCATED
1103 	nni_dialer *  d;
1104 	nni_listener *l;
1105 
1106 	// Apply the options.  Failure to set any option on any
1107 	// transport (other than ENOTSUP) stops the operation
1108 	// altogether.  Its important that transport wide checks
1109 	// properly pre-validate.
1110 	NNI_LIST_FOREACH (&s->s_listeners, l) {
1111 		int x;
1112 		x = nni_listener_setopt(l, optv->name, optv->data, sz, t);
1113 		if (x != NNG_ENOTSUP) {
1114 			if ((rv = x) != 0) {
1115 				nni_mtx_unlock(&s->s_mx);
1116 				nni_free_opt(optv);
1117 				return (rv);
1118 			}
1119 		}
1120 	}
1121 	NNI_LIST_FOREACH (&s->s_dialers, d) {
1122 		int x;
1123 		x = nni_dialer_setopt(d, optv->name, optv->data, sz, t);
1124 		if (x != NNG_ENOTSUP) {
1125 			if ((rv = x) != 0) {
1126 				nni_mtx_unlock(&s->s_mx);
1127 				nni_free_opt(optv);
1128 				return (rv);
1129 			}
1130 		}
1131 	}
1132 #endif
1133 
1134 	if (rv == 0) {
1135 		// Remove and toss the old value; we are using a new one.
1136 		if (oldv != NULL) {
1137 			nni_list_remove(&s->s_options, oldv);
1138 			nni_free_opt(oldv);
1139 		}
1140 
1141 		// Insert our new value.  This permits it to be
1142 		// compared against later, and for new endpoints to
1143 		// automatically receive these values,
1144 		nni_list_append(&s->s_options, optv);
1145 	} else {
1146 		nni_free_opt(optv);
1147 	}
1148 
1149 	nni_mtx_unlock(&s->s_mx);
1150 	return (rv);
1151 }
1152 
1153 int
nni_sock_getopt(nni_sock * s,const char * name,void * val,size_t * szp,nni_type t)1154 nni_sock_getopt(
1155     nni_sock *s, const char *name, void *val, size_t *szp, nni_type t)
1156 {
1157 	int          rv;
1158 	nni_sockopt *sopt;
1159 
1160 	nni_mtx_lock(&s->s_mx);
1161 	if (s->s_closing) {
1162 		nni_mtx_unlock(&s->s_mx);
1163 		return (NNG_ECLOSED);
1164 	}
1165 
1166 	// Protocol specific options.  The protocol can override
1167 	// options like the send buffer or notification descriptors
1168 	// this way.
1169 	rv = nni_getopt(
1170 	    s->s_sock_ops.sock_options, name, s->s_data, val, szp, t);
1171 	if (rv != NNG_ENOTSUP) {
1172 		nni_mtx_unlock(&s->s_mx);
1173 		return (rv);
1174 	}
1175 
1176 	// Socket generic options.
1177 	rv = nni_getopt(sock_options, name, s, val, szp, t);
1178 	if (rv != NNG_ENOTSUP) {
1179 		nni_mtx_unlock(&s->s_mx);
1180 		return (rv);
1181 	}
1182 
1183 	NNI_LIST_FOREACH (&s->s_options, sopt) {
1184 		if (strcmp(sopt->name, name) == 0) {
1185 			size_t sz = sopt->sz;
1186 
1187 			if ((sopt->typ != NNI_TYPE_OPAQUE) &&
1188 			    (t != sopt->typ)) {
1189 
1190 				if (t != NNI_TYPE_OPAQUE) {
1191 					nni_mtx_unlock(&s->s_mx);
1192 					return (NNG_EBADTYPE);
1193 				}
1194 				if (*szp != sopt->sz) {
1195 					nni_mtx_unlock(&s->s_mx);
1196 					return (NNG_EINVAL);
1197 				}
1198 			}
1199 
1200 			if (szp != NULL) {
1201 				if (sopt->sz > *szp) {
1202 					sz = *szp;
1203 				}
1204 				*szp = sopt->sz;
1205 			}
1206 			memcpy(val, sopt->data, sz);
1207 			rv = 0;
1208 			break;
1209 		}
1210 	}
1211 
1212 	nni_mtx_unlock(&s->s_mx);
1213 	return (rv);
1214 }
1215 
1216 uint32_t
nni_sock_flags(nni_sock * sock)1217 nni_sock_flags(nni_sock *sock)
1218 {
1219 	return (sock->s_flags);
1220 }
1221 
1222 void
nni_sock_set_pipe_cb(nni_sock * s,int ev,nng_pipe_cb cb,void * arg)1223 nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
1224 {
1225 	if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
1226 		nni_mtx_lock(&s->s_pipe_cbs_mtx);
1227 		s->s_pipe_cbs[ev].cb_fn  = cb;
1228 		s->s_pipe_cbs[ev].cb_arg = arg;
1229 		nni_mtx_unlock(&s->s_pipe_cbs_mtx);
1230 	}
1231 }
1232 
1233 int
nni_ctx_find(nni_ctx ** cp,uint32_t id,bool closing)1234 nni_ctx_find(nni_ctx **cp, uint32_t id, bool closing)
1235 {
1236 	int      rv;
1237 	nni_ctx *ctx;
1238 
1239 	if ((rv = nni_init()) != 0) {
1240 		return (rv);
1241 	}
1242 	nni_mtx_lock(&sock_lk);
1243 	if ((ctx = nni_id_get(&ctx_ids, id)) != NULL) {
1244 		// We refuse a reference if either the socket is
1245 		// closed, or the context is closed.  (If the socket
1246 		// is closed, and we are only getting the reference so
1247 		// we can close it, then we still allow.  In the case
1248 		// the only valid operation will be to close the
1249 		// socket.)
1250 		if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
1251 			rv = NNG_ECLOSED;
1252 		} else {
1253 			ctx->c_ref++;
1254 			*cp = ctx;
1255 		}
1256 	} else {
1257 		rv = NNG_ECLOSED;
1258 	}
1259 	nni_mtx_unlock(&sock_lk);
1260 
1261 	return (rv);
1262 }
1263 
1264 static void
nni_ctx_destroy(nni_ctx * ctx)1265 nni_ctx_destroy(nni_ctx *ctx)
1266 {
1267 	if (ctx->c_data != NULL) {
1268 		ctx->c_ops.ctx_fini(ctx->c_data);
1269 	}
1270 
1271 	// Let the socket go, our hold on it is done.
1272 	nni_free(ctx, ctx->c_size);
1273 }
1274 
1275 void
nni_ctx_rele(nni_ctx * ctx)1276 nni_ctx_rele(nni_ctx *ctx)
1277 {
1278 	nni_sock *sock = ctx->c_sock;
1279 	nni_mtx_lock(&sock_lk);
1280 	ctx->c_ref--;
1281 	if ((ctx->c_ref > 0) || (!ctx->c_closed)) {
1282 		// Either still have an active reference, or not
1283 		// actually closing yet.
1284 		nni_mtx_unlock(&sock_lk);
1285 		return;
1286 	}
1287 
1288 	// Remove us from the hash, so we can't be found any more.
1289 	// This allows our ID to be reused later, although the system
1290 	// tries to avoid ID reuse.
1291 	nni_id_remove(&ctx_ids, ctx->c_id);
1292 	nni_list_remove(&sock->s_ctxs, ctx);
1293 	if (sock->s_closed || sock->s_ctxwait) {
1294 		nni_cv_wake(&sock->s_close_cv);
1295 	}
1296 	nni_mtx_unlock(&sock_lk);
1297 
1298 	nni_ctx_destroy(ctx);
1299 }
1300 
1301 int
nni_ctx_open(nni_ctx ** ctxp,nni_sock * sock)1302 nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
1303 {
1304 	nni_ctx *ctx;
1305 	int      rv;
1306 	size_t   sz;
1307 
1308 	if (sock->s_ctx_ops.ctx_init == NULL) {
1309 		return (NNG_ENOTSUP);
1310 	}
1311 
1312 	sz = NNI_ALIGN_UP(sizeof(*ctx)) + sock->s_ctx_ops.ctx_size;
1313 	if ((ctx = nni_zalloc(sz)) == NULL) {
1314 		return (NNG_ENOMEM);
1315 	}
1316 	ctx->c_size     = sz;
1317 	ctx->c_data     = ctx + 1;
1318 	ctx->c_closed   = false;
1319 	ctx->c_ref      = 1; // Caller implicitly gets a reference.
1320 	ctx->c_sock     = sock;
1321 	ctx->c_ops      = sock->s_ctx_ops;
1322 	ctx->c_rcvtimeo = sock->s_rcvtimeo;
1323 	ctx->c_sndtimeo = sock->s_sndtimeo;
1324 
1325 	nni_mtx_lock(&sock_lk);
1326 	if (sock->s_closed) {
1327 		nni_mtx_unlock(&sock_lk);
1328 		nni_free(ctx, ctx->c_size);
1329 		return (NNG_ECLOSED);
1330 	}
1331 	if ((rv = nni_id_alloc(&ctx_ids, &ctx->c_id, ctx)) != 0) {
1332 		nni_mtx_unlock(&sock_lk);
1333 		nni_free(ctx, ctx->c_size);
1334 		return (rv);
1335 	}
1336 
1337 	if ((rv = sock->s_ctx_ops.ctx_init(ctx->c_data, sock->s_data)) != 0) {
1338 		nni_id_remove(&ctx_ids, ctx->c_id);
1339 		nni_mtx_unlock(&sock_lk);
1340 		nni_free(ctx, ctx->c_size);
1341 		return (rv);
1342 	}
1343 
1344 	nni_list_append(&sock->s_ctxs, ctx);
1345 	nni_mtx_unlock(&sock_lk);
1346 
1347 	// Paranoia, fixing a possible race in close.  Don't let us
1348 	// give back a context if the socket is being shutdown (it
1349 	// might not have reached the "closed" state yet.)
1350 	nni_mtx_lock(&sock->s_mx);
1351 	if (sock->s_closing) {
1352 		nni_mtx_unlock(&sock->s_mx);
1353 		nni_ctx_rele(ctx);
1354 		return (NNG_ECLOSED);
1355 	}
1356 	nni_mtx_unlock(&sock->s_mx);
1357 	*ctxp = ctx;
1358 
1359 	return (0);
1360 }
1361 
1362 void
nni_ctx_close(nni_ctx * ctx)1363 nni_ctx_close(nni_ctx *ctx)
1364 {
1365 	nni_mtx_lock(&sock_lk);
1366 	ctx->c_closed = true;
1367 	nni_mtx_unlock(&sock_lk);
1368 
1369 	nni_ctx_rele(ctx);
1370 }
1371 
1372 uint32_t
nni_ctx_id(nni_ctx * ctx)1373 nni_ctx_id(nni_ctx *ctx)
1374 {
1375 	return (ctx->c_id);
1376 }
1377 
1378 void
nni_ctx_send(nni_ctx * ctx,nni_aio * aio)1379 nni_ctx_send(nni_ctx *ctx, nni_aio *aio)
1380 {
1381 	nni_aio_normalize_timeout(aio, ctx->c_sndtimeo);
1382 	ctx->c_ops.ctx_send(ctx->c_data, aio);
1383 }
1384 
1385 void
nni_ctx_recv(nni_ctx * ctx,nni_aio * aio)1386 nni_ctx_recv(nni_ctx *ctx, nni_aio *aio)
1387 {
1388 	nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo);
1389 	ctx->c_ops.ctx_recv(ctx->c_data, aio);
1390 }
1391 
1392 int
nni_ctx_getopt(nni_ctx * ctx,const char * opt,void * v,size_t * szp,nni_type t)1393 nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, nni_type t)
1394 {
1395 	nni_sock *  sock = ctx->c_sock;
1396 	nni_option *o;
1397 	int         rv = NNG_ENOTSUP;
1398 
1399 	nni_mtx_lock(&sock->s_mx);
1400 	if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
1401 		rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, t);
1402 	} else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
1403 		rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, t);
1404 	} else if (ctx->c_ops.ctx_options != NULL) {
1405 		for (o = ctx->c_ops.ctx_options; o->o_name != NULL; o++) {
1406 			if (strcmp(opt, o->o_name) != 0) {
1407 				continue;
1408 			}
1409 			if (o->o_get == NULL) {
1410 				rv = NNG_EWRITEONLY;
1411 				break;
1412 			}
1413 			rv = o->o_get(ctx->c_data, v, szp, t);
1414 			break;
1415 		}
1416 	}
1417 	nni_mtx_unlock(&sock->s_mx);
1418 	return (rv);
1419 }
1420 
1421 int
nni_ctx_setopt(nni_ctx * ctx,const char * opt,const void * v,size_t sz,nni_type t)1422 nni_ctx_setopt(
1423     nni_ctx *ctx, const char *opt, const void *v, size_t sz, nni_type t)
1424 {
1425 	nni_sock *  sock = ctx->c_sock;
1426 	nni_option *o;
1427 	int         rv = NNG_ENOTSUP;
1428 
1429 	nni_mtx_lock(&sock->s_mx);
1430 	if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
1431 		rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, t);
1432 	} else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
1433 		rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, t);
1434 	} else if (ctx->c_ops.ctx_options != NULL) {
1435 		for (o = ctx->c_ops.ctx_options; o->o_name != NULL; o++) {
1436 			if (strcmp(opt, o->o_name) != 0) {
1437 				continue;
1438 			}
1439 			if (o->o_set == NULL) {
1440 				rv = NNG_EREADONLY;
1441 				break;
1442 			}
1443 			rv = o->o_set(ctx->c_data, v, sz, t);
1444 			break;
1445 		}
1446 	}
1447 
1448 	nni_mtx_unlock(&sock->s_mx);
1449 	return (rv);
1450 }
1451 
1452 static void
dialer_timer_start_locked(nni_dialer * d)1453 dialer_timer_start_locked(nni_dialer *d)
1454 {
1455 	nni_duration back_off;
1456 	nni_sock *   sock = d->d_sock;
1457 
1458 	if (d->d_closing || sock->s_closed) {
1459 		return;
1460 	}
1461 	back_off = d->d_currtime;
1462 	if (d->d_maxrtime > 0) {
1463 		d->d_currtime *= 2;
1464 		if (d->d_currtime > d->d_maxrtime) {
1465 			d->d_currtime = d->d_maxrtime;
1466 		}
1467 	}
1468 
1469 	// To minimize damage from storms, etc., we select a back-off
1470 	// value randomly, in the range of [0, back_off-1]; this is
1471 	// pretty similar to 802 style back-off, except that we have a
1472 	// nearly uniform time period instead of discrete slot times.
1473 	// This algorithm may lead to slight biases because we don't
1474 	// have a statistically perfect distribution with the modulo of
1475 	// the random number, but this really doesn't matter.
1476 	nni_sleep_aio(
1477 	    back_off ? (int) nni_random() % back_off : 0, &d->d_tmo_aio);
1478 }
1479 
1480 void
nni_dialer_timer_start(nni_dialer * d)1481 nni_dialer_timer_start(nni_dialer *d)
1482 {
1483 	nni_sock *s = d->d_sock;
1484 	nni_mtx_lock(&s->s_mx);
1485 	dialer_timer_start_locked(d);
1486 	nni_mtx_unlock(&s->s_mx);
1487 }
1488 
1489 void
nni_dialer_add_pipe(nni_dialer * d,void * tpipe)1490 nni_dialer_add_pipe(nni_dialer *d, void *tpipe)
1491 {
1492 	nni_sock *s = d->d_sock;
1493 	nni_pipe *p;
1494 
1495 	nni_mtx_lock(&s->s_mx);
1496 
1497 	if (s->s_closed || d->d_closing) {
1498 		d->d_tran->tran_pipe->p_fini(tpipe);
1499 		nni_mtx_unlock(&s->s_mx);
1500 		return;
1501 	}
1502 	if (nni_pipe_create_dialer(&p, d, tpipe) != 0) {
1503 		nni_mtx_unlock(&s->s_mx);
1504 		return;
1505 	}
1506 
1507 	nni_list_append(&d->d_pipes, p);
1508 	nni_list_append(&s->s_pipes, p);
1509 	d->d_pipe     = p;
1510 	d->d_currtime = d->d_inirtime;
1511 	nni_mtx_unlock(&s->s_mx);
1512 #ifdef NNG_ENABLE_STATS
1513 	nni_stat_inc(&s->st_pipes, 1);
1514 	nni_stat_inc(&d->st_pipes, 1);
1515 #endif
1516 
1517 	nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
1518 
1519 	nni_mtx_lock(&s->s_mx);
1520 	if (p->p_closed) {
1521 		nni_mtx_unlock(&s->s_mx);
1522 #ifdef NNG_ENABLE_STATS
1523 		nni_stat_inc(&d->st_reject, 1);
1524 		nni_stat_inc(&s->st_rejects, 1);
1525 #endif
1526 		nni_pipe_rele(p);
1527 		return;
1528 	}
1529 	if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
1530 		nni_mtx_unlock(&s->s_mx);
1531 #ifdef NNG_ENABLE_STATS
1532 		nni_stat_inc(&d->st_reject, 1);
1533 		nni_stat_inc(&s->st_rejects, 1);
1534 #endif
1535 		nni_pipe_close(p);
1536 		nni_pipe_rele(p);
1537 		return;
1538 	}
1539 	nni_mtx_unlock(&s->s_mx);
1540 #ifdef NNG_ENABLE_STATS
1541 	nni_stat_register(&p->st_root);
1542 #endif
1543 	nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
1544 	nni_pipe_rele(p);
1545 }
1546 
1547 static void
dialer_shutdown_impl(nni_dialer * d)1548 dialer_shutdown_impl(nni_dialer *d)
1549 {
1550 	nni_pipe *p;
1551 
1552 	// Abort any remaining in-flight operations.
1553 	nni_aio_close(&d->d_con_aio);
1554 	nni_aio_close(&d->d_tmo_aio);
1555 
1556 	// Stop the underlying transport.
1557 	d->d_ops.d_close(d->d_data);
1558 
1559 	NNI_LIST_FOREACH (&d->d_pipes, p) {
1560 		nni_pipe_close(p);
1561 	}
1562 }
1563 
1564 static void
dialer_shutdown_locked(nni_dialer * d)1565 dialer_shutdown_locked(nni_dialer *d)
1566 {
1567 	if (!d->d_closing) {
1568 		d->d_closing = true;
1569 		dialer_shutdown_impl(d);
1570 	}
1571 }
1572 
1573 void
nni_dialer_shutdown(nni_dialer * d)1574 nni_dialer_shutdown(nni_dialer *d)
1575 {
1576 	nni_sock *s = d->d_sock;
1577 	nni_mtx_lock(&s->s_mx);
1578 	dialer_shutdown_locked(d);
1579 	nni_mtx_unlock(&s->s_mx);
1580 }
1581 
1582 static void dialer_reap(void *);
1583 
1584 static nni_reap_list dialer_reap_list = {
1585 	.rl_offset = offsetof(nni_dialer, d_reap),
1586 	.rl_func   = dialer_reap,
1587 };
1588 
1589 static void
dialer_reap(void * arg)1590 dialer_reap(void *arg)
1591 {
1592 	nni_dialer *d = arg;
1593 	nni_sock *  s = d->d_sock;
1594 
1595 	nni_aio_stop(&d->d_tmo_aio);
1596 	nni_aio_stop(&d->d_con_aio);
1597 
1598 #ifdef NNG_ENABLE_STATS
1599 	nni_stat_unregister(&d->st_root);
1600 #endif
1601 
1602 	nni_mtx_lock(&s->s_mx);
1603 	if (!nni_list_empty(&d->d_pipes)) {
1604 		nni_pipe *p;
1605 		// This should already have been done, but be certain!
1606 		NNI_LIST_FOREACH (&d->d_pipes, p) {
1607 			nni_pipe_close(p);
1608 		}
1609 		nni_mtx_unlock(&s->s_mx);
1610 		// Go back to the end of reap list.
1611 		nni_dialer_reap(d);
1612 		return;
1613 	}
1614 
1615 	nni_list_remove(&s->s_dialers, d);
1616 	if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
1617 		nni_cv_wake(&s->s_cv);
1618 	}
1619 
1620 	nni_mtx_unlock(&s->s_mx);
1621 
1622 	nni_dialer_destroy(d);
1623 }
1624 
1625 void
nni_dialer_reap(nni_dialer * d)1626 nni_dialer_reap(nni_dialer *d)
1627 {
1628 	nni_reap(&dialer_reap_list, d);
1629 }
1630 
1631 void
nni_listener_add_pipe(nni_listener * l,void * tpipe)1632 nni_listener_add_pipe(nni_listener *l, void *tpipe)
1633 {
1634 	nni_sock *s = l->l_sock;
1635 	nni_pipe *p;
1636 
1637 	nni_mtx_lock(&s->s_mx);
1638 	if (s->s_closed || l->l_closing) {
1639 		l->l_tran->tran_pipe->p_fini(tpipe);
1640 		nni_mtx_unlock(&s->s_mx);
1641 		return;
1642 	}
1643 
1644 	if (nni_pipe_create_listener(&p, l, tpipe) != 0) {
1645 		nni_mtx_unlock(&s->s_mx);
1646 		return;
1647 	}
1648 
1649 	nni_list_append(&l->l_pipes, p);
1650 	nni_list_append(&s->s_pipes, p);
1651 	nni_mtx_unlock(&s->s_mx);
1652 #ifdef NNG_ENABLE_STATS
1653 	nni_stat_inc(&l->st_pipes, 1);
1654 	nni_stat_inc(&s->st_pipes, 1);
1655 #endif
1656 
1657 	nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_PRE);
1658 
1659 	nni_mtx_lock(&s->s_mx);
1660 	if (p->p_closed) {
1661 		nni_mtx_unlock(&s->s_mx);
1662 #ifdef NNG_ENABLE_STATS
1663 		nni_stat_inc(&l->st_reject, 1);
1664 		nni_stat_inc(&s->st_rejects, 1);
1665 #endif
1666 		nni_pipe_rele(p);
1667 		return;
1668 	}
1669 	if (p->p_proto_ops.pipe_start(p->p_proto_data) != 0) {
1670 		nni_mtx_unlock(&s->s_mx);
1671 #ifdef NNG_ENABLE_STATS
1672 		nni_stat_inc(&l->st_reject, 1);
1673 		nni_stat_inc(&s->st_rejects, 1);
1674 #endif
1675 		nni_pipe_close(p);
1676 		nni_pipe_rele(p);
1677 		return;
1678 	}
1679 	nni_mtx_unlock(&s->s_mx);
1680 #ifdef NNG_ENABLE_STATS
1681 	nni_stat_register(&p->st_root);
1682 #endif
1683 	nni_pipe_run_cb(p, NNG_PIPE_EV_ADD_POST);
1684 	nni_pipe_rele(p);
1685 }
1686 
1687 static void
listener_shutdown_impl(nni_listener * l)1688 listener_shutdown_impl(nni_listener *l)
1689 {
1690 	nni_pipe *p;
1691 
1692 	// Abort any remaining in-flight accepts.
1693 	nni_aio_close(&l->l_acc_aio);
1694 	nni_aio_close(&l->l_tmo_aio);
1695 
1696 	// Stop the underlying transport.
1697 	l->l_ops.l_close(l->l_data);
1698 
1699 	NNI_LIST_FOREACH (&l->l_pipes, p) {
1700 		nni_pipe_close(p);
1701 	}
1702 }
1703 
1704 static void
listener_shutdown_locked(nni_listener * l)1705 listener_shutdown_locked(nni_listener *l)
1706 {
1707 	if (!l->l_closing) {
1708 		l->l_closing = true;
1709 		listener_shutdown_impl(l);
1710 	}
1711 }
1712 
1713 void
nni_listener_shutdown(nni_listener * l)1714 nni_listener_shutdown(nni_listener *l)
1715 {
1716 	nni_sock *s = l->l_sock;
1717 
1718 	nni_mtx_lock(&s->s_mx);
1719 	listener_shutdown_locked(l);
1720 	nni_mtx_unlock(&s->s_mx);
1721 }
1722 
1723 static void listener_reap(void *);
1724 
1725 static nni_reap_list listener_reap_list = {
1726 	.rl_offset = offsetof(nni_listener, l_reap),
1727 	.rl_func   = listener_reap,
1728 };
1729 
1730 static void
listener_reap(void * arg)1731 listener_reap(void *arg)
1732 {
1733 	nni_listener *l = arg;
1734 	nni_sock *    s = l->l_sock;
1735 
1736 	nni_aio_stop(&l->l_tmo_aio);
1737 	nni_aio_stop(&l->l_acc_aio);
1738 
1739 #ifdef NNG_ENABLE_STATS
1740 	nni_stat_unregister(&l->st_root);
1741 #endif
1742 
1743 	nni_mtx_lock(&s->s_mx);
1744 	if (!nni_list_empty(&l->l_pipes)) {
1745 		nni_pipe *p;
1746 		// This should already have been done, but be certain!
1747 		NNI_LIST_FOREACH (&l->l_pipes, p) {
1748 			nni_pipe_close(p);
1749 		}
1750 		nni_mtx_unlock(&s->s_mx);
1751 		// Go back to the end of reap list.
1752 		nni_reap(&listener_reap_list, l);
1753 		return;
1754 	}
1755 
1756 	nni_list_remove(&s->s_listeners, l);
1757 	if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
1758 		nni_cv_wake(&s->s_cv);
1759 	}
1760 
1761 	nni_mtx_unlock(&s->s_mx);
1762 
1763 	nni_listener_destroy(l);
1764 }
1765 
1766 void
nni_listener_reap(nni_listener * l)1767 nni_listener_reap(nni_listener *l)
1768 {
1769 	nni_reap(&listener_reap_list, l);
1770 }
1771 
1772 void
nni_pipe_run_cb(nni_pipe * p,nng_pipe_ev ev)1773 nni_pipe_run_cb(nni_pipe *p, nng_pipe_ev ev)
1774 {
1775 	nni_sock *  s = p->p_sock;
1776 	nng_pipe_cb cb;
1777 	void *      arg;
1778 
1779 	nni_mtx_lock(&s->s_pipe_cbs_mtx);
1780 	if (!p->p_cbs) {
1781 		if (ev == NNG_PIPE_EV_ADD_PRE) {
1782 			// First event, after this we want all other events.
1783 			p->p_cbs = true;
1784 		} else {
1785 			nni_mtx_unlock(&s->s_pipe_cbs_mtx);
1786 			return;
1787 		}
1788 	}
1789 	cb  = s->s_pipe_cbs[ev].cb_fn;
1790 	arg = s->s_pipe_cbs[ev].cb_arg;
1791 	nni_mtx_unlock(&s->s_pipe_cbs_mtx);
1792 
1793 	if (cb != NULL) {
1794 		nng_pipe pid;
1795 		pid.id = p->p_id;
1796 		cb(pid, ev, arg);
1797 	}
1798 }
1799 
1800 void
nni_pipe_remove(nni_pipe * p)1801 nni_pipe_remove(nni_pipe *p)
1802 {
1803 	nni_sock *  s = p->p_sock;
1804 	nni_dialer *d = p->p_dialer;
1805 
1806 	nni_mtx_lock(&s->s_mx);
1807 #ifdef NNG_ENABLE_STATS
1808 	if (nni_list_node_active(&p->p_sock_node)) {
1809 		nni_stat_dec(&s->st_pipes, 1);
1810 	}
1811 	if (p->p_listener != NULL) {
1812 		nni_stat_dec(&p->p_listener->st_pipes, 1);
1813 	}
1814 	if (p->p_dialer != NULL) {
1815 		nni_stat_dec(&p->p_dialer->st_pipes, 1);
1816 	}
1817 #endif
1818 	nni_list_node_remove(&p->p_sock_node);
1819 	nni_list_node_remove(&p->p_ep_node);
1820 	p->p_listener = NULL;
1821 	p->p_dialer   = NULL;
1822 	if ((d != NULL) && (d->d_pipe == p)) {
1823 		d->d_pipe = NULL;
1824 		dialer_timer_start_locked(d); // Kick the timer to redial.
1825 	}
1826 	if (s->s_closing) {
1827 		nni_cv_wake(&s->s_cv);
1828 	}
1829 	nni_mtx_unlock(&s->s_mx);
1830 }
1831 
1832 void
nni_sock_add_stat(nni_sock * s,nni_stat_item * stat)1833 nni_sock_add_stat(nni_sock *s, nni_stat_item *stat)
1834 {
1835 #ifdef NNG_ENABLE_STATS
1836 	nni_stat_add(&s->st_root, stat);
1837 #else
1838 	NNI_ARG_UNUSED(s);
1839 	NNI_ARG_UNUSED(stat);
1840 #endif
1841 }
1842 
1843 void
nni_sock_bump_tx(nni_sock * s,uint64_t sz)1844 nni_sock_bump_tx(nni_sock *s, uint64_t sz)
1845 {
1846 #ifdef NNG_ENABLE_STATS
1847 	nni_stat_inc(&s->st_tx_msgs, 1);
1848 	nni_stat_inc(&s->st_tx_bytes, sz);
1849 #else
1850 	NNI_ARG_UNUSED(s);
1851 	NNI_ARG_UNUSED(sz);
1852 #endif
1853 }
1854 
1855 void
nni_sock_bump_rx(nni_sock * s,uint64_t sz)1856 nni_sock_bump_rx(nni_sock *s, uint64_t sz)
1857 {
1858 #ifdef NNG_ENABLE_STATS
1859 	nni_stat_inc(&s->st_rx_msgs, 1);
1860 	nni_stat_inc(&s->st_rx_bytes, sz);
1861 #else
1862 	NNI_ARG_UNUSED(s);
1863 	NNI_ARG_UNUSED(sz);
1864 #endif
1865 }
1866