1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 // Copyright 2018 Devolutions <info@devolutions.net>
5 //
6 // This software is supplied under the terms of the MIT License, a
7 // copy of which should be located in the distribution where this
8 // file was obtained (LICENSE.txt).  A copy of the license may also be
9 // found online at https://opensource.org/licenses/MIT.
10 //
11 
12 #include "core/nng_impl.h"
13 #include "sockimpl.h"
14 
15 #include <stdio.h>
16 
17 // This file contains functions relating to pipes.
18 //
19 // Operations on pipes (to the transport) are generally blocking operations,
20 // performed in the context of the protocol.
21 
22 static nni_id_map pipes;
23 static nni_mtx    pipes_lk;
24 
25 static void pipe_destroy(void *);
26 
27 static nni_reap_list pipe_reap_list = {
28 	.rl_offset = offsetof(nni_pipe, p_reap),
29 	.rl_func   = pipe_destroy,
30 };
31 
32 int
nni_pipe_sys_init(void)33 nni_pipe_sys_init(void)
34 {
35 	nni_mtx_init(&pipes_lk);
36 
37 	// Pipe IDs needs to have high order bit clear, and we want
38 	// them to start at a random value.
39 	nni_id_map_init(&pipes, 1, 0x7fffffff, true);
40 
41 	return (0);
42 }
43 
44 void
nni_pipe_sys_fini(void)45 nni_pipe_sys_fini(void)
46 {
47 	nni_reap_drain();
48 	nni_mtx_fini(&pipes_lk);
49 	nni_id_map_fini(&pipes);
50 }
51 
52 static void
pipe_destroy(void * arg)53 pipe_destroy(void *arg)
54 {
55 	nni_pipe *p = arg;
56 	if (p == NULL) {
57 		return;
58 	}
59 
60 	nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
61 
62 	// Make sure any unlocked holders are done with this.
63 	// This happens during initialization for example.
64 	nni_mtx_lock(&pipes_lk);
65 	if (p->p_id != 0) {
66 		nni_id_remove(&pipes, p->p_id);
67 	}
68 	// This wait guarantees that all callers are done with us.
69 	while (p->p_ref != 0) {
70 		nni_cv_wait(&p->p_cv);
71 	}
72 	nni_mtx_unlock(&pipes_lk);
73 
74 	if (p->p_proto_data != NULL) {
75 		p->p_proto_ops.pipe_stop(p->p_proto_data);
76 	}
77 	if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
78 		p->p_tran_ops.p_stop(p->p_tran_data);
79 	}
80 
81 #ifdef NNG_ENABLE_STATS
82 	nni_stat_unregister(&p->st_root);
83 #endif
84 	nni_pipe_remove(p);
85 
86 	if (p->p_proto_data != NULL) {
87 		p->p_proto_ops.pipe_fini(p->p_proto_data);
88 	}
89 	if (p->p_tran_data != NULL) {
90 		p->p_tran_ops.p_fini(p->p_tran_data);
91 	}
92 	nni_cv_fini(&p->p_cv);
93 	nni_mtx_fini(&p->p_mtx);
94 	nni_free(p, p->p_size);
95 }
96 
97 int
nni_pipe_find(nni_pipe ** pp,uint32_t id)98 nni_pipe_find(nni_pipe **pp, uint32_t id)
99 {
100 	nni_pipe *p;
101 
102 	// We don't care if the pipe is "closed".  End users only have
103 	// access to the pipe in order to obtain properties (which may
104 	// be retried during the post-close notification callback) or to
105 	// close the pipe.
106 	nni_mtx_lock(&pipes_lk);
107 	if ((p = nni_id_get(&pipes, id)) != NULL) {
108 		p->p_ref++;
109 		*pp = p;
110 	}
111 	nni_mtx_unlock(&pipes_lk);
112 	return (p == NULL ? NNG_ENOENT : 0);
113 }
114 
115 void
nni_pipe_rele(nni_pipe * p)116 nni_pipe_rele(nni_pipe *p)
117 {
118 	nni_mtx_lock(&pipes_lk);
119 	p->p_ref--;
120 	if (p->p_ref == 0) {
121 		nni_cv_wake(&p->p_cv);
122 	}
123 	nni_mtx_unlock(&pipes_lk);
124 }
125 
126 // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
127 uint32_t
nni_pipe_id(nni_pipe * p)128 nni_pipe_id(nni_pipe *p)
129 {
130 	return (p->p_id);
131 }
132 
133 void
nni_pipe_recv(nni_pipe * p,nni_aio * aio)134 nni_pipe_recv(nni_pipe *p, nni_aio *aio)
135 {
136 	p->p_tran_ops.p_recv(p->p_tran_data, aio);
137 }
138 
139 void
nni_pipe_send(nni_pipe * p,nni_aio * aio)140 nni_pipe_send(nni_pipe *p, nni_aio *aio)
141 {
142 	p->p_tran_ops.p_send(p->p_tran_data, aio);
143 }
144 
145 // nni_pipe_close closes the underlying connection.  It is expected that
146 // subsequent attempts to receive or send (including any waiting receive) will
147 // simply return NNG_ECLOSED.
148 void
nni_pipe_close(nni_pipe * p)149 nni_pipe_close(nni_pipe *p)
150 {
151 	nni_mtx_lock(&p->p_mtx);
152 	if (p->p_closed) {
153 		// We already did a close.
154 		nni_mtx_unlock(&p->p_mtx);
155 		return;
156 	}
157 	p->p_closed = true;
158 	nni_mtx_unlock(&p->p_mtx);
159 
160 	if (p->p_proto_data != NULL) {
161 		p->p_proto_ops.pipe_close(p->p_proto_data);
162 	}
163 
164 	// Close the underlying transport.
165 	if (p->p_tran_data != NULL) {
166 		p->p_tran_ops.p_close(p->p_tran_data);
167 	}
168 
169 	nni_reap(&pipe_reap_list, p);
170 }
171 
172 uint16_t
nni_pipe_peer(nni_pipe * p)173 nni_pipe_peer(nni_pipe *p)
174 {
175 	return (p->p_tran_ops.p_peer(p->p_tran_data));
176 }
177 
178 #ifdef NNG_ENABLE_STATS
179 static void
pipe_stat_init(nni_pipe * p,nni_stat_item * item,const nni_stat_info * info)180 pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
181 {
182 	nni_stat_init(item, info);
183 	nni_stat_add(&p->st_root, item);
184 }
185 
186 static void
pipe_stats_init(nni_pipe * p)187 pipe_stats_init(nni_pipe *p)
188 {
189 	static const nni_stat_info root_info = {
190 		.si_name = "pipe",
191 		.si_desc = "pipe statistics",
192 		.si_type = NNG_STAT_SCOPE,
193 	};
194 	static const nni_stat_info id_info = {
195 		.si_name = "id",
196 		.si_desc = "pipe id",
197 		.si_type = NNG_STAT_ID,
198 	};
199 	static const nni_stat_info socket_info = {
200 		.si_name = "socket",
201 		.si_desc = "socket for pipe",
202 		.si_type = NNG_STAT_ID,
203 	};
204 	static const nni_stat_info rx_msgs_info = {
205 		.si_name   = "rx_msgs",
206 		.si_desc   = "messages received",
207 		.si_type   = NNG_STAT_COUNTER,
208 		.si_unit   = NNG_UNIT_MESSAGES,
209 		.si_atomic = true,
210 	};
211 	static const nni_stat_info tx_msgs_info = {
212 		.si_name   = "tx_msgs",
213 		.si_desc   = "messages sent",
214 		.si_type   = NNG_STAT_COUNTER,
215 		.si_unit   = NNG_UNIT_MESSAGES,
216 		.si_atomic = true,
217 	};
218 	static const nni_stat_info rx_bytes_info = {
219 		.si_name   = "rx_bytes",
220 		.si_desc   = "bytes received",
221 		.si_type   = NNG_STAT_COUNTER,
222 		.si_unit   = NNG_UNIT_BYTES,
223 		.si_atomic = true,
224 	};
225 	static const nni_stat_info tx_bytes_info = {
226 		.si_name   = "tx_bytes",
227 		.si_desc   = "bytes sent",
228 		.si_type   = NNG_STAT_COUNTER,
229 		.si_unit   = NNG_UNIT_BYTES,
230 		.si_atomic = true,
231 	};
232 
233 	nni_stat_init(&p->st_root, &root_info);
234 	pipe_stat_init(p, &p->st_id, &id_info);
235 	pipe_stat_init(p, &p->st_sock_id, &socket_info);
236 	pipe_stat_init(p, &p->st_rx_msgs, &rx_msgs_info);
237 	pipe_stat_init(p, &p->st_tx_msgs, &tx_msgs_info);
238 	pipe_stat_init(p, &p->st_rx_bytes, &rx_bytes_info);
239 	pipe_stat_init(p, &p->st_tx_bytes, &tx_bytes_info);
240 
241 	nni_stat_set_id(&p->st_root, (int) p->p_id);
242 	nni_stat_set_id(&p->st_id, (int) p->p_id);
243 	nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock));
244 }
245 #endif // NNG_ENABLE_STATS
246 
247 static int
pipe_create(nni_pipe ** pp,nni_sock * sock,nni_sp_tran * tran,void * tdata)248 pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tdata)
249 {
250 	nni_pipe *          p;
251 	int                 rv;
252 	void *              sdata = nni_sock_proto_data(sock);
253 	nni_proto_pipe_ops *pops  = nni_sock_proto_pipe_ops(sock);
254 	size_t              sz;
255 
256 	sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
257 
258 	if ((p = nni_zalloc(sz)) == NULL) {
259 		// In this case we just toss the pipe...
260 		tran->tran_pipe->p_fini(tdata);
261 		return (NNG_ENOMEM);
262 	}
263 
264 	p->p_size       = sz;
265 	p->p_proto_data = p + 1;
266 	p->p_tran_ops   = *tran->tran_pipe;
267 	p->p_tran_data  = tdata;
268 	p->p_proto_ops  = *pops;
269 	p->p_sock       = sock;
270 	p->p_closed     = false;
271 	p->p_cbs        = false;
272 	p->p_ref        = 0;
273 
274 	nni_atomic_flag_reset(&p->p_stop);
275 	NNI_LIST_NODE_INIT(&p->p_sock_node);
276 	NNI_LIST_NODE_INIT(&p->p_ep_node);
277 
278 	nni_mtx_init(&p->p_mtx);
279 	nni_cv_init(&p->p_cv, &pipes_lk);
280 
281 	nni_mtx_lock(&pipes_lk);
282 	if ((rv = nni_id_alloc(&pipes, &p->p_id, p)) == 0) {
283 		p->p_ref = 1;
284 	}
285 	nni_mtx_unlock(&pipes_lk);
286 
287 #ifdef NNG_ENABLE_STATS
288 	pipe_stats_init(p);
289 #endif
290 
291 	if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tdata, p)) != 0) ||
292 	    ((rv = pops->pipe_init(p->p_proto_data, p, sdata)) != 0)) {
293 		nni_pipe_close(p);
294 		nni_pipe_rele(p);
295 		return (rv);
296 	}
297 
298 	*pp = p;
299 	return (0);
300 }
301 
302 int
nni_pipe_create_dialer(nni_pipe ** pp,nni_dialer * d,void * tdata)303 nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
304 {
305 	int          rv;
306 	nni_sp_tran *tran = d->d_tran;
307 	nni_pipe *   p;
308 
309 	if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) {
310 		return (rv);
311 	}
312 	p->p_dialer = d;
313 #ifdef NNG_ENABLE_STATS
314 	static const nni_stat_info dialer_info = {
315 		.si_name = "dialer",
316 		.si_desc = "dialer for pipe",
317 		.si_type = NNG_STAT_ID,
318 	};
319 	pipe_stat_init(p, &p->st_ep_id, &dialer_info);
320 	nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
321 #endif
322 	*pp = p;
323 	return (0);
324 }
325 
326 int
nni_pipe_create_listener(nni_pipe ** pp,nni_listener * l,void * tdata)327 nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
328 {
329 	int          rv;
330 	nni_sp_tran *tran = l->l_tran;
331 	nni_pipe *   p;
332 
333 	if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) {
334 		return (rv);
335 	}
336 	p->p_listener = l;
337 #if NNG_ENABLE_STATS
338 	static const nni_stat_info listener_info = {
339 		.si_name = "listener",
340 		.si_desc = "listener for pipe",
341 		.si_type = NNG_STAT_ID,
342 	};
343 	pipe_stat_init(p, &p->st_ep_id, &listener_info);
344 	nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
345 #endif
346 	*pp = p;
347 	return (0);
348 }
349 
350 int
nni_pipe_getopt(nni_pipe * p,const char * name,void * val,size_t * szp,nni_opt_type t)351 nni_pipe_getopt(
352     nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
353 {
354 	int rv;
355 
356 	rv = p->p_tran_ops.p_getopt(p->p_tran_data, name, val, szp, t);
357 	if (rv != NNG_ENOTSUP) {
358 		return (rv);
359 	}
360 
361 	// Maybe the endpoint knows? The guarantees on pipes ensure that the
362 	// pipe will not outlive its creating endpoint.
363 	if (p->p_dialer != NULL) {
364 		return (nni_dialer_getopt(p->p_dialer, name, val, szp, t));
365 	}
366 	if (p->p_listener != NULL) {
367 		return (nni_listener_getopt(p->p_listener, name, val, szp, t));
368 	}
369 	return (NNG_ENOTSUP);
370 }
371 
372 uint32_t
nni_pipe_sock_id(nni_pipe * p)373 nni_pipe_sock_id(nni_pipe *p)
374 {
375 	return (nni_sock_id(p->p_sock));
376 }
377 
378 uint32_t
nni_pipe_listener_id(nni_pipe * p)379 nni_pipe_listener_id(nni_pipe *p)
380 {
381 	return (p->p_listener ? nni_listener_id(p->p_listener) : 0);
382 }
383 
384 uint32_t
nni_pipe_dialer_id(nni_pipe * p)385 nni_pipe_dialer_id(nni_pipe *p)
386 {
387 	return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
388 }
389 
390 void
nni_pipe_add_stat(nni_pipe * p,nni_stat_item * item)391 nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
392 {
393 #ifdef NNG_ENABLE_STATS
394 	nni_stat_add(&p->st_root, item);
395 #else
396 	NNI_ARG_UNUSED(p);
397 	NNI_ARG_UNUSED(item);
398 #endif
399 }
400 
401 void
nni_pipe_bump_rx(nni_pipe * p,size_t nbytes)402 nni_pipe_bump_rx(nni_pipe *p, size_t nbytes)
403 {
404 #ifdef NNG_ENABLE_STATS
405 	nni_stat_inc(&p->st_rx_bytes, nbytes);
406 	nni_stat_inc(&p->st_rx_msgs, 1);
407 #else
408 	NNI_ARG_UNUSED(p);
409 	NNI_ARG_UNUSED(nbytes);
410 #endif
411 }
412 
413 void
nni_pipe_bump_tx(nni_pipe * p,size_t nbytes)414 nni_pipe_bump_tx(nni_pipe *p, size_t nbytes)
415 {
416 #ifdef NNG_ENABLE_STATS
417 	nni_stat_inc(&p->st_tx_bytes, nbytes);
418 	nni_stat_inc(&p->st_tx_msgs, 1);
419 #else
420 	NNI_ARG_UNUSED(p);
421 	NNI_ARG_UNUSED(nbytes);
422 #endif
423 }
424 
425 void
nni_pipe_bump_error(nni_pipe * p,int err)426 nni_pipe_bump_error(nni_pipe *p, int err)
427 {
428 	if (p->p_dialer != NULL) {
429 		nni_dialer_bump_error(p->p_dialer, err);
430 	} else {
431 		nni_listener_bump_error(p->p_listener, err);
432 	}
433 }