1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 //
4 // This software is supplied under the terms of the MIT License, a
5 // copy of which should be located in the distribution where this
6 // file was obtained (LICENSE.txt).  A copy of the license may also be
7 // found online at https://opensource.org/licenses/MIT.
8 //
9 
10 // This provides an abstraction for byte streams, allowing polymorphic
11 // use of them in rather flexible contexts.
12 
13 #include <string.h>
14 
15 #include "core/nng_impl.h"
16 #include <nng/supplemental/tls/tls.h>
17 
18 #include "core/tcp.h"
19 #include "supplemental/tls/tls_api.h"
20 #include "supplemental/websocket/websocket.h"
21 
22 static struct {
23 	const char *scheme;
24 	int (*dialer_alloc)(nng_stream_dialer **, const nng_url *);
25 	int (*listener_alloc)(nng_stream_listener **, const nng_url *);
26 
27 } stream_drivers[] = {
28 	{
29 	    .scheme         = "ipc",
30 	    .dialer_alloc   = nni_ipc_dialer_alloc,
31 	    .listener_alloc = nni_ipc_listener_alloc,
32 	},
33 #ifdef NNG_PLATFORM_POSIX
34 	{
35 	    .scheme         = "unix",
36 	    .dialer_alloc   = nni_ipc_dialer_alloc,
37 	    .listener_alloc = nni_ipc_listener_alloc,
38 	},
39 #endif
40 #ifdef NNG_HAVE_ABSTRACT_SOCKETS
41 	{
42 	    .scheme         = "abstract",
43 	    .dialer_alloc   = nni_ipc_dialer_alloc,
44 	    .listener_alloc = nni_ipc_listener_alloc,
45 	},
46 #endif
47 	{
48 	    .scheme         = "tcp",
49 	    .dialer_alloc   = nni_tcp_dialer_alloc,
50 	    .listener_alloc = nni_tcp_listener_alloc,
51 	},
52 	{
53 	    .scheme         = "tcp4",
54 	    .dialer_alloc   = nni_tcp_dialer_alloc,
55 	    .listener_alloc = nni_tcp_listener_alloc,
56 	},
57 	{
58 	    .scheme         = "tcp6",
59 	    .dialer_alloc   = nni_tcp_dialer_alloc,
60 	    .listener_alloc = nni_tcp_listener_alloc,
61 	},
62 	{
63 	    .scheme         = "tls+tcp",
64 	    .dialer_alloc   = nni_tls_dialer_alloc,
65 	    .listener_alloc = nni_tls_listener_alloc,
66 	},
67 	{
68 	    .scheme         = "tls+tcp4",
69 	    .dialer_alloc   = nni_tls_dialer_alloc,
70 	    .listener_alloc = nni_tls_listener_alloc,
71 	},
72 	{
73 	    .scheme         = "tls+tcp6",
74 	    .dialer_alloc   = nni_tls_dialer_alloc,
75 	    .listener_alloc = nni_tls_listener_alloc,
76 	},
77 	{
78 	    .scheme         = "ws",
79 	    .dialer_alloc   = nni_ws_dialer_alloc,
80 	    .listener_alloc = nni_ws_listener_alloc,
81 	},
82 	{
83 	    .scheme         = "ws4",
84 	    .dialer_alloc   = nni_ws_dialer_alloc,
85 	    .listener_alloc = nni_ws_listener_alloc,
86 	},
87 	{
88 	    .scheme         = "ws6",
89 	    .dialer_alloc   = nni_ws_dialer_alloc,
90 	    .listener_alloc = nni_ws_listener_alloc,
91 	},
92 	{
93 	    .scheme         = "wss",
94 	    .dialer_alloc   = nni_ws_dialer_alloc,
95 	    .listener_alloc = nni_ws_listener_alloc,
96 	},
97 	{
98 	    .scheme = NULL,
99 	},
100 };
101 
102 void
nng_stream_close(nng_stream * s)103 nng_stream_close(nng_stream *s)
104 {
105 	s->s_close(s);
106 }
107 
108 void
nng_stream_free(nng_stream * s)109 nng_stream_free(nng_stream *s)
110 {
111 	if (s != NULL) {
112 		s->s_free(s);
113 	}
114 }
115 
116 void
nng_stream_send(nng_stream * s,nng_aio * aio)117 nng_stream_send(nng_stream *s, nng_aio *aio)
118 {
119 	s->s_send(s, aio);
120 }
121 
122 void
nng_stream_recv(nng_stream * s,nng_aio * aio)123 nng_stream_recv(nng_stream *s, nng_aio *aio)
124 {
125 	s->s_recv(s, aio);
126 }
127 
128 int
nni_stream_get(nng_stream * s,const char * nm,void * data,size_t * szp,nni_type t)129 nni_stream_get(
130     nng_stream *s, const char *nm, void *data, size_t *szp, nni_type t)
131 {
132 	return (s->s_get(s, nm, data, szp, t));
133 }
134 
135 int
nni_stream_set(nng_stream * s,const char * nm,const void * data,size_t sz,nni_type t)136 nni_stream_set(
137     nng_stream *s, const char *nm, const void *data, size_t sz, nni_type t)
138 {
139 	return (s->s_set(s, nm, data, sz, t));
140 }
141 
142 void
nng_stream_dialer_close(nng_stream_dialer * d)143 nng_stream_dialer_close(nng_stream_dialer *d)
144 {
145 	d->sd_close(d);
146 }
147 
148 void
nng_stream_dialer_free(nng_stream_dialer * d)149 nng_stream_dialer_free(nng_stream_dialer *d)
150 {
151 	if (d != NULL) {
152 		d->sd_free(d);
153 	}
154 }
155 
156 void
nng_stream_dialer_dial(nng_stream_dialer * d,nng_aio * aio)157 nng_stream_dialer_dial(nng_stream_dialer *d, nng_aio *aio)
158 {
159 	d->sd_dial(d, aio);
160 }
161 
162 int
nng_stream_dialer_alloc_url(nng_stream_dialer ** dp,const nng_url * url)163 nng_stream_dialer_alloc_url(nng_stream_dialer **dp, const nng_url *url)
164 {
165 	int rv;
166 
167 	if ((rv = nni_init()) != 0) {
168 		return (rv);
169 	}
170 
171 	for (int i = 0; stream_drivers[i].scheme != NULL; i++) {
172 		if (strcmp(stream_drivers[i].scheme, url->u_scheme) == 0) {
173 			return (stream_drivers[i].dialer_alloc(dp, url));
174 		}
175 	}
176 	return (NNG_ENOTSUP);
177 }
178 
179 int
nng_stream_dialer_alloc(nng_stream_dialer ** dp,const char * uri)180 nng_stream_dialer_alloc(nng_stream_dialer **dp, const char *uri)
181 {
182 	nng_url *url;
183 	int      rv;
184 
185 	if ((rv = nni_init()) != 0) {
186 		return (rv);
187 	}
188 	if ((rv = nng_url_parse(&url, uri)) != 0) {
189 		return (rv);
190 	}
191 	rv = nng_stream_dialer_alloc_url(dp, url);
192 	nng_url_free(url);
193 	return (rv);
194 }
195 
196 int
nni_stream_dialer_get(nng_stream_dialer * d,const char * nm,void * data,size_t * szp,nni_type t)197 nni_stream_dialer_get(
198     nng_stream_dialer *d, const char *nm, void *data, size_t *szp, nni_type t)
199 {
200 	return (d->sd_get(d, nm, data, szp, t));
201 }
202 
203 int
nni_stream_dialer_set(nng_stream_dialer * d,const char * nm,const void * data,size_t sz,nni_type t)204 nni_stream_dialer_set(nng_stream_dialer *d, const char *nm, const void *data,
205     size_t sz, nni_type t)
206 {
207 	return (d->sd_set(d, nm, data, sz, t));
208 }
209 
210 void
nng_stream_listener_close(nng_stream_listener * l)211 nng_stream_listener_close(nng_stream_listener *l)
212 {
213 	l->sl_close(l);
214 }
215 void
nng_stream_listener_free(nng_stream_listener * l)216 nng_stream_listener_free(nng_stream_listener *l)
217 {
218 	if (l != NULL) {
219 		l->sl_free(l);
220 	}
221 }
222 int
nng_stream_listener_listen(nng_stream_listener * l)223 nng_stream_listener_listen(nng_stream_listener *l)
224 {
225 	return (l->sl_listen(l));
226 }
227 
228 void
nng_stream_listener_accept(nng_stream_listener * l,nng_aio * aio)229 nng_stream_listener_accept(nng_stream_listener *l, nng_aio *aio)
230 {
231 	l->sl_accept(l, aio);
232 }
233 
234 int
nni_stream_listener_get(nng_stream_listener * l,const char * nm,void * data,size_t * szp,nni_type t)235 nni_stream_listener_get(nng_stream_listener *l, const char *nm, void *data,
236     size_t *szp, nni_type t)
237 {
238 	return (l->sl_get(l, nm, data, szp, t));
239 }
240 
241 int
nni_stream_listener_set(nng_stream_listener * l,const char * nm,const void * data,size_t sz,nni_type t)242 nni_stream_listener_set(nng_stream_listener *l, const char *nm,
243     const void *data, size_t sz, nni_type t)
244 {
245 	return (l->sl_set(l, nm, data, sz, t));
246 }
247 
248 int
nng_stream_listener_alloc_url(nng_stream_listener ** lp,const nng_url * url)249 nng_stream_listener_alloc_url(nng_stream_listener **lp, const nng_url *url)
250 {
251 	int rv;
252 
253 	if ((rv = nni_init()) != 0) {
254 		return (rv);
255 	}
256 
257 	for (int i = 0; stream_drivers[i].scheme != NULL; i++) {
258 		if (strcmp(stream_drivers[i].scheme, url->u_scheme) == 0) {
259 			return (stream_drivers[i].listener_alloc(lp, url));
260 		}
261 	}
262 	return (NNG_ENOTSUP);
263 }
264 
265 int
nng_stream_listener_alloc(nng_stream_listener ** lp,const char * uri)266 nng_stream_listener_alloc(nng_stream_listener **lp, const char *uri)
267 {
268 	nng_url *url;
269 	int      rv;
270 
271 	if ((rv = nni_init()) != 0) {
272 		return (rv);
273 	}
274 
275 	if ((rv = nng_url_parse(&url, uri)) != 0) {
276 		return (rv);
277 	}
278 	rv = nng_stream_listener_alloc_url(lp, url);
279 	nng_url_free(url);
280 	return (rv);
281 }
282 
283 // Public stream options.
284 
285 int
nng_stream_get(nng_stream * s,const char * n,void * v,size_t * szp)286 nng_stream_get(nng_stream *s, const char *n, void *v, size_t *szp)
287 {
288 	return (nni_stream_get(s, n, v, szp, NNI_TYPE_OPAQUE));
289 }
290 
291 int
nng_stream_get_int(nng_stream * s,const char * n,int * v)292 nng_stream_get_int(nng_stream *s, const char *n, int *v)
293 {
294 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_INT32));
295 }
296 
297 int
nng_stream_get_bool(nng_stream * s,const char * n,bool * v)298 nng_stream_get_bool(nng_stream *s, const char *n, bool *v)
299 {
300 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_BOOL));
301 }
302 
303 int
nng_stream_get_size(nng_stream * s,const char * n,size_t * v)304 nng_stream_get_size(nng_stream *s, const char *n, size_t *v)
305 {
306 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_SIZE));
307 }
308 
309 int
nng_stream_get_uint64(nng_stream * s,const char * n,uint64_t * v)310 nng_stream_get_uint64(nng_stream *s, const char *n, uint64_t *v)
311 {
312 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_UINT64));
313 }
314 
315 int
nng_stream_get_string(nng_stream * s,const char * n,char ** v)316 nng_stream_get_string(nng_stream *s, const char *n, char **v)
317 {
318 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_STRING));
319 }
320 
321 int
nng_stream_get_ptr(nng_stream * s,const char * n,void ** v)322 nng_stream_get_ptr(nng_stream *s, const char *n, void **v)
323 {
324 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_POINTER));
325 }
326 
327 int
nng_stream_get_ms(nng_stream * s,const char * n,nng_duration * v)328 nng_stream_get_ms(nng_stream *s, const char *n, nng_duration *v)
329 {
330 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_DURATION));
331 }
332 
333 int
nng_stream_get_addr(nng_stream * s,const char * n,nng_sockaddr * v)334 nng_stream_get_addr(nng_stream *s, const char *n, nng_sockaddr *v)
335 {
336 	return (nni_stream_get(s, n, v, NULL, NNI_TYPE_SOCKADDR));
337 }
338 
339 int
nng_stream_dialer_get(nng_stream_dialer * d,const char * n,void * v,size_t * szp)340 nng_stream_dialer_get(
341     nng_stream_dialer *d, const char *n, void *v, size_t *szp)
342 {
343 	return (nni_stream_dialer_get(d, n, v, szp, NNI_TYPE_OPAQUE));
344 }
345 
346 int
nng_stream_dialer_get_int(nng_stream_dialer * d,const char * n,int * v)347 nng_stream_dialer_get_int(nng_stream_dialer *d, const char *n, int *v)
348 {
349 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_INT32));
350 }
351 
352 int
nng_stream_dialer_get_bool(nng_stream_dialer * d,const char * n,bool * v)353 nng_stream_dialer_get_bool(nng_stream_dialer *d, const char *n, bool *v)
354 {
355 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_BOOL));
356 }
357 
358 int
nng_stream_dialer_get_size(nng_stream_dialer * d,const char * n,size_t * v)359 nng_stream_dialer_get_size(nng_stream_dialer *d, const char *n, size_t *v)
360 {
361 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_SIZE));
362 }
363 
364 int
nng_stream_dialer_get_uint64(nng_stream_dialer * d,const char * n,uint64_t * v)365 nng_stream_dialer_get_uint64(nng_stream_dialer *d, const char *n, uint64_t *v)
366 {
367 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_UINT64));
368 }
369 
370 int
nng_stream_dialer_get_string(nng_stream_dialer * d,const char * n,char ** v)371 nng_stream_dialer_get_string(nng_stream_dialer *d, const char *n, char **v)
372 {
373 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_STRING));
374 }
375 
376 int
nng_stream_dialer_get_ptr(nng_stream_dialer * d,const char * n,void ** v)377 nng_stream_dialer_get_ptr(nng_stream_dialer *d, const char *n, void **v)
378 {
379 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_POINTER));
380 }
381 
382 int
nng_stream_dialer_get_ms(nng_stream_dialer * d,const char * n,nng_duration * v)383 nng_stream_dialer_get_ms(nng_stream_dialer *d, const char *n, nng_duration *v)
384 {
385 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_DURATION));
386 }
387 
388 int
nng_stream_dialer_get_addr(nng_stream_dialer * d,const char * n,nng_sockaddr * v)389 nng_stream_dialer_get_addr(
390     nng_stream_dialer *d, const char *n, nng_sockaddr *v)
391 {
392 	return (nni_stream_dialer_get(d, n, v, NULL, NNI_TYPE_SOCKADDR));
393 }
394 
395 int
nng_stream_listener_get(nng_stream_listener * l,const char * n,void * v,size_t * szp)396 nng_stream_listener_get(
397     nng_stream_listener *l, const char *n, void *v, size_t *szp)
398 {
399 	return (nni_stream_listener_get(l, n, v, szp, NNI_TYPE_OPAQUE));
400 }
401 
402 int
nng_stream_listener_get_int(nng_stream_listener * l,const char * n,int * v)403 nng_stream_listener_get_int(nng_stream_listener *l, const char *n, int *v)
404 {
405 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_INT32));
406 }
407 
408 int
nng_stream_listener_get_bool(nng_stream_listener * l,const char * n,bool * v)409 nng_stream_listener_get_bool(nng_stream_listener *l, const char *n, bool *v)
410 {
411 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_BOOL));
412 }
413 
414 int
nng_stream_listener_get_size(nng_stream_listener * l,const char * n,size_t * v)415 nng_stream_listener_get_size(nng_stream_listener *l, const char *n, size_t *v)
416 {
417 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_SIZE));
418 }
419 
420 int
nng_stream_listener_get_uint64(nng_stream_listener * l,const char * n,uint64_t * v)421 nng_stream_listener_get_uint64(
422     nng_stream_listener *l, const char *n, uint64_t *v)
423 {
424 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_UINT64));
425 }
426 
427 int
nng_stream_listener_get_string(nng_stream_listener * l,const char * n,char ** v)428 nng_stream_listener_get_string(nng_stream_listener *l, const char *n, char **v)
429 {
430 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_STRING));
431 }
432 
433 int
nng_stream_listener_get_ptr(nng_stream_listener * l,const char * n,void ** v)434 nng_stream_listener_get_ptr(nng_stream_listener *l, const char *n, void **v)
435 {
436 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_POINTER));
437 }
438 
439 int
nng_stream_listener_get_ms(nng_stream_listener * l,const char * n,nng_duration * v)440 nng_stream_listener_get_ms(
441     nng_stream_listener *l, const char *n, nng_duration *v)
442 {
443 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_DURATION));
444 }
445 
446 int
nng_stream_listener_get_addr(nng_stream_listener * l,const char * n,nng_sockaddr * v)447 nng_stream_listener_get_addr(
448     nng_stream_listener *l, const char *n, nng_sockaddr *v)
449 {
450 	return (nni_stream_listener_get(l, n, v, NULL, NNI_TYPE_SOCKADDR));
451 }
452 
453 int
nng_stream_set(nng_stream * s,const char * n,const void * v,size_t sz)454 nng_stream_set(nng_stream *s, const char *n, const void *v, size_t sz)
455 {
456 	return (nni_stream_set(s, n, v, sz, NNI_TYPE_OPAQUE));
457 }
458 
459 int
nng_stream_set_int(nng_stream * s,const char * n,int v)460 nng_stream_set_int(nng_stream *s, const char *n, int v)
461 {
462 	return (nni_stream_set(s, n, &v, sizeof(v), NNI_TYPE_INT32));
463 }
464 
465 int
nng_stream_set_bool(nng_stream * s,const char * n,bool v)466 nng_stream_set_bool(nng_stream *s, const char *n, bool v)
467 {
468 	return (nni_stream_set(s, n, &v, sizeof(v), NNI_TYPE_BOOL));
469 }
470 
471 int
nng_stream_set_size(nng_stream * s,const char * n,size_t v)472 nng_stream_set_size(nng_stream *s, const char *n, size_t v)
473 {
474 	return (nni_stream_set(s, n, &v, sizeof(v), NNI_TYPE_SIZE));
475 }
476 
477 int
nng_stream_set_uint64(nng_stream * s,const char * n,uint64_t v)478 nng_stream_set_uint64(nng_stream *s, const char *n, uint64_t v)
479 {
480 	return (nni_stream_set(s, n, &v, sizeof(v), NNI_TYPE_UINT64));
481 }
482 
483 int
nng_stream_set_ms(nng_stream * s,const char * n,nng_duration v)484 nng_stream_set_ms(nng_stream *s, const char *n, nng_duration v)
485 {
486 	return (nni_stream_set(s, n, &v, sizeof(v), NNI_TYPE_DURATION));
487 }
488 
489 int
nng_stream_set_ptr(nng_stream * s,const char * n,void * v)490 nng_stream_set_ptr(nng_stream *s, const char *n, void *v)
491 {
492 	return (nni_stream_set(s, n, &v, sizeof(v), NNI_TYPE_POINTER));
493 }
494 
495 int
nng_stream_set_string(nng_stream * s,const char * n,const char * v)496 nng_stream_set_string(nng_stream *s, const char *n, const char *v)
497 {
498 	return (nni_stream_set(
499 	    s, n, v, v == NULL ? 0 : strlen(v) + 1, NNI_TYPE_STRING));
500 }
501 
502 int
nng_stream_set_addr(nng_stream * s,const char * n,const nng_sockaddr * v)503 nng_stream_set_addr(nng_stream *s, const char *n, const nng_sockaddr *v)
504 {
505 	return (nni_stream_set(s, n, v, sizeof(*v), NNI_TYPE_SOCKADDR));
506 }
507 
508 int
nng_stream_dialer_set(nng_stream_dialer * d,const char * n,const void * v,size_t sz)509 nng_stream_dialer_set(
510     nng_stream_dialer *d, const char *n, const void *v, size_t sz)
511 {
512 	return (nni_stream_dialer_set(d, n, v, sz, NNI_TYPE_OPAQUE));
513 }
514 
515 int
nng_stream_dialer_set_int(nng_stream_dialer * d,const char * n,int v)516 nng_stream_dialer_set_int(nng_stream_dialer *d, const char *n, int v)
517 {
518 	return (nni_stream_dialer_set(d, n, &v, sizeof(v), NNI_TYPE_INT32));
519 }
520 
521 int
nng_stream_dialer_set_bool(nng_stream_dialer * d,const char * n,bool v)522 nng_stream_dialer_set_bool(nng_stream_dialer *d, const char *n, bool v)
523 {
524 	return (nni_stream_dialer_set(d, n, &v, sizeof(v), NNI_TYPE_BOOL));
525 }
526 
527 int
nng_stream_dialer_set_size(nng_stream_dialer * d,const char * n,size_t v)528 nng_stream_dialer_set_size(nng_stream_dialer *d, const char *n, size_t v)
529 {
530 	return (nni_stream_dialer_set(d, n, &v, sizeof(v), NNI_TYPE_SIZE));
531 }
532 
533 int
nng_stream_dialer_set_uint64(nng_stream_dialer * d,const char * n,uint64_t v)534 nng_stream_dialer_set_uint64(nng_stream_dialer *d, const char *n, uint64_t v)
535 {
536 	return (nni_stream_dialer_set(d, n, &v, sizeof(v), NNI_TYPE_UINT64));
537 }
538 
539 int
nng_stream_dialer_set_ms(nng_stream_dialer * d,const char * n,nng_duration v)540 nng_stream_dialer_set_ms(nng_stream_dialer *d, const char *n, nng_duration v)
541 {
542 	return (nni_stream_dialer_set(d, n, &v, sizeof(v), NNI_TYPE_DURATION));
543 }
544 
545 int
nng_stream_dialer_set_ptr(nng_stream_dialer * d,const char * n,void * v)546 nng_stream_dialer_set_ptr(nng_stream_dialer *d, const char *n, void *v)
547 {
548 	return (nni_stream_dialer_set(d, n, &v, sizeof(v), NNI_TYPE_POINTER));
549 }
550 
551 int
nng_stream_dialer_set_string(nng_stream_dialer * d,const char * n,const char * v)552 nng_stream_dialer_set_string(
553     nng_stream_dialer *d, const char *n, const char *v)
554 {
555 	return (nni_stream_dialer_set(
556 	    d, n, v, v == NULL ? 0 : strlen(v) + 1, NNI_TYPE_STRING));
557 }
558 
559 int
nng_stream_dialer_set_addr(nng_stream_dialer * d,const char * n,const nng_sockaddr * v)560 nng_stream_dialer_set_addr(
561     nng_stream_dialer *d, const char *n, const nng_sockaddr *v)
562 {
563 	return (nni_stream_dialer_set(d, n, v, sizeof(*v), NNI_TYPE_SOCKADDR));
564 }
565 
566 int
nng_stream_listener_set(nng_stream_listener * l,const char * n,const void * v,size_t sz)567 nng_stream_listener_set(
568     nng_stream_listener *l, const char *n, const void *v, size_t sz)
569 {
570 	return (nni_stream_listener_set(l, n, v, sz, NNI_TYPE_OPAQUE));
571 }
572 
573 int
nng_stream_listener_set_int(nng_stream_listener * l,const char * n,int v)574 nng_stream_listener_set_int(nng_stream_listener *l, const char *n, int v)
575 {
576 	return (nni_stream_listener_set(l, n, &v, sizeof(v), NNI_TYPE_INT32));
577 }
578 
579 int
nng_stream_listener_set_bool(nng_stream_listener * l,const char * n,bool v)580 nng_stream_listener_set_bool(nng_stream_listener *l, const char *n, bool v)
581 {
582 	return (nni_stream_listener_set(l, n, &v, sizeof(v), NNI_TYPE_BOOL));
583 }
584 
585 int
nng_stream_listener_set_size(nng_stream_listener * l,const char * n,size_t v)586 nng_stream_listener_set_size(nng_stream_listener *l, const char *n, size_t v)
587 {
588 	return (nni_stream_listener_set(l, n, &v, sizeof(v), NNI_TYPE_SIZE));
589 }
590 
591 int
nng_stream_listener_set_uint64(nng_stream_listener * l,const char * n,uint64_t v)592 nng_stream_listener_set_uint64(
593     nng_stream_listener *l, const char *n, uint64_t v)
594 {
595 	return (nni_stream_listener_set(l, n, &v, sizeof(v), NNI_TYPE_UINT64));
596 }
597 
598 int
nng_stream_listener_set_ms(nng_stream_listener * l,const char * n,nng_duration v)599 nng_stream_listener_set_ms(
600     nng_stream_listener *l, const char *n, nng_duration v)
601 {
602 	return (
603 	    nni_stream_listener_set(l, n, &v, sizeof(v), NNI_TYPE_DURATION));
604 }
605 
606 int
nng_stream_listener_set_ptr(nng_stream_listener * l,const char * n,void * v)607 nng_stream_listener_set_ptr(nng_stream_listener *l, const char *n, void *v)
608 {
609 	return (
610 	    nni_stream_listener_set(l, n, &v, sizeof(v), NNI_TYPE_POINTER));
611 }
612 
613 int
nng_stream_listener_set_string(nng_stream_listener * l,const char * n,const char * v)614 nng_stream_listener_set_string(
615     nng_stream_listener *l, const char *n, const char *v)
616 {
617 	return (nni_stream_listener_set(
618 	    l, n, v, v == NULL ? 0 : strlen(v) + 1, NNI_TYPE_STRING));
619 }
620 
621 int
nng_stream_listener_set_addr(nng_stream_listener * l,const char * n,const nng_sockaddr * v)622 nng_stream_listener_set_addr(
623     nng_stream_listener *l, const char *n, const nng_sockaddr *v)
624 {
625 	return (
626 	    nni_stream_listener_set(l, n, v, sizeof(*v), NNI_TYPE_SOCKADDR));
627 }
628