1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 // Copyright 2018 Devolutions <info@devolutions.net>
5 //
6 // This software is supplied under the terms of the MIT License, a
7 // copy of which should be located in the distribution where this
8 // file was obtained (LICENSE.txt). A copy of the license may also be
9 // found online at https://opensource.org/licenses/MIT.
10 //
11
12 #include "core/nng_impl.h"
13 #include "sockimpl.h"
14
15 #include <stdio.h>
16
17 // This file contains functions relating to pipes.
18 //
19 // Operations on pipes (to the transport) are generally blocking operations,
20 // performed in the context of the protocol.
21
22 static nni_id_map pipes;
23 static nni_mtx pipes_lk;
24
25 static void pipe_destroy(void *);
26
27 static nni_reap_list pipe_reap_list = {
28 .rl_offset = offsetof(nni_pipe, p_reap),
29 .rl_func = pipe_destroy,
30 };
31
32 int
nni_pipe_sys_init(void)33 nni_pipe_sys_init(void)
34 {
35 nni_mtx_init(&pipes_lk);
36
37 // Pipe IDs needs to have high order bit clear, and we want
38 // them to start at a random value.
39 nni_id_map_init(&pipes, 1, 0x7fffffff, true);
40
41 return (0);
42 }
43
44 void
nni_pipe_sys_fini(void)45 nni_pipe_sys_fini(void)
46 {
47 nni_reap_drain();
48 nni_mtx_fini(&pipes_lk);
49 nni_id_map_fini(&pipes);
50 }
51
52 static void
pipe_destroy(void * arg)53 pipe_destroy(void *arg)
54 {
55 nni_pipe *p = arg;
56 if (p == NULL) {
57 return;
58 }
59
60 nni_pipe_run_cb(p, NNG_PIPE_EV_REM_POST);
61
62 // Make sure any unlocked holders are done with this.
63 // This happens during initialization for example.
64 nni_mtx_lock(&pipes_lk);
65 if (p->p_id != 0) {
66 nni_id_remove(&pipes, p->p_id);
67 }
68 // This wait guarantees that all callers are done with us.
69 while (p->p_ref != 0) {
70 nni_cv_wait(&p->p_cv);
71 }
72 nni_mtx_unlock(&pipes_lk);
73
74 if (p->p_proto_data != NULL) {
75 p->p_proto_ops.pipe_stop(p->p_proto_data);
76 }
77 if ((p->p_tran_data != NULL) && (p->p_tran_ops.p_stop != NULL)) {
78 p->p_tran_ops.p_stop(p->p_tran_data);
79 }
80
81 #ifdef NNG_ENABLE_STATS
82 nni_stat_unregister(&p->st_root);
83 #endif
84 nni_pipe_remove(p);
85
86 if (p->p_proto_data != NULL) {
87 p->p_proto_ops.pipe_fini(p->p_proto_data);
88 }
89 if (p->p_tran_data != NULL) {
90 p->p_tran_ops.p_fini(p->p_tran_data);
91 }
92 nni_cv_fini(&p->p_cv);
93 nni_mtx_fini(&p->p_mtx);
94 nni_free(p, p->p_size);
95 }
96
97 int
nni_pipe_find(nni_pipe ** pp,uint32_t id)98 nni_pipe_find(nni_pipe **pp, uint32_t id)
99 {
100 nni_pipe *p;
101
102 // We don't care if the pipe is "closed". End users only have
103 // access to the pipe in order to obtain properties (which may
104 // be retried during the post-close notification callback) or to
105 // close the pipe.
106 nni_mtx_lock(&pipes_lk);
107 if ((p = nni_id_get(&pipes, id)) != NULL) {
108 p->p_ref++;
109 *pp = p;
110 }
111 nni_mtx_unlock(&pipes_lk);
112 return (p == NULL ? NNG_ENOENT : 0);
113 }
114
115 void
nni_pipe_rele(nni_pipe * p)116 nni_pipe_rele(nni_pipe *p)
117 {
118 nni_mtx_lock(&pipes_lk);
119 p->p_ref--;
120 if (p->p_ref == 0) {
121 nni_cv_wake(&p->p_cv);
122 }
123 nni_mtx_unlock(&pipes_lk);
124 }
125
126 // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
127 uint32_t
nni_pipe_id(nni_pipe * p)128 nni_pipe_id(nni_pipe *p)
129 {
130 return (p->p_id);
131 }
132
133 void
nni_pipe_recv(nni_pipe * p,nni_aio * aio)134 nni_pipe_recv(nni_pipe *p, nni_aio *aio)
135 {
136 p->p_tran_ops.p_recv(p->p_tran_data, aio);
137 }
138
139 void
nni_pipe_send(nni_pipe * p,nni_aio * aio)140 nni_pipe_send(nni_pipe *p, nni_aio *aio)
141 {
142 p->p_tran_ops.p_send(p->p_tran_data, aio);
143 }
144
145 // nni_pipe_close closes the underlying connection. It is expected that
146 // subsequent attempts to receive or send (including any waiting receive) will
147 // simply return NNG_ECLOSED.
148 void
nni_pipe_close(nni_pipe * p)149 nni_pipe_close(nni_pipe *p)
150 {
151 nni_mtx_lock(&p->p_mtx);
152 if (p->p_closed) {
153 // We already did a close.
154 nni_mtx_unlock(&p->p_mtx);
155 return;
156 }
157 p->p_closed = true;
158 nni_mtx_unlock(&p->p_mtx);
159
160 if (p->p_proto_data != NULL) {
161 p->p_proto_ops.pipe_close(p->p_proto_data);
162 }
163
164 // Close the underlying transport.
165 if (p->p_tran_data != NULL) {
166 p->p_tran_ops.p_close(p->p_tran_data);
167 }
168
169 nni_reap(&pipe_reap_list, p);
170 }
171
172 uint16_t
nni_pipe_peer(nni_pipe * p)173 nni_pipe_peer(nni_pipe *p)
174 {
175 return (p->p_tran_ops.p_peer(p->p_tran_data));
176 }
177
178 #ifdef NNG_ENABLE_STATS
179 static void
pipe_stat_init(nni_pipe * p,nni_stat_item * item,const nni_stat_info * info)180 pipe_stat_init(nni_pipe *p, nni_stat_item *item, const nni_stat_info *info)
181 {
182 nni_stat_init(item, info);
183 nni_stat_add(&p->st_root, item);
184 }
185
186 static void
pipe_stats_init(nni_pipe * p)187 pipe_stats_init(nni_pipe *p)
188 {
189 static const nni_stat_info root_info = {
190 .si_name = "pipe",
191 .si_desc = "pipe statistics",
192 .si_type = NNG_STAT_SCOPE,
193 };
194 static const nni_stat_info id_info = {
195 .si_name = "id",
196 .si_desc = "pipe id",
197 .si_type = NNG_STAT_ID,
198 };
199 static const nni_stat_info socket_info = {
200 .si_name = "socket",
201 .si_desc = "socket for pipe",
202 .si_type = NNG_STAT_ID,
203 };
204 static const nni_stat_info rx_msgs_info = {
205 .si_name = "rx_msgs",
206 .si_desc = "messages received",
207 .si_type = NNG_STAT_COUNTER,
208 .si_unit = NNG_UNIT_MESSAGES,
209 .si_atomic = true,
210 };
211 static const nni_stat_info tx_msgs_info = {
212 .si_name = "tx_msgs",
213 .si_desc = "messages sent",
214 .si_type = NNG_STAT_COUNTER,
215 .si_unit = NNG_UNIT_MESSAGES,
216 .si_atomic = true,
217 };
218 static const nni_stat_info rx_bytes_info = {
219 .si_name = "rx_bytes",
220 .si_desc = "bytes received",
221 .si_type = NNG_STAT_COUNTER,
222 .si_unit = NNG_UNIT_BYTES,
223 .si_atomic = true,
224 };
225 static const nni_stat_info tx_bytes_info = {
226 .si_name = "tx_bytes",
227 .si_desc = "bytes sent",
228 .si_type = NNG_STAT_COUNTER,
229 .si_unit = NNG_UNIT_BYTES,
230 .si_atomic = true,
231 };
232
233 nni_stat_init(&p->st_root, &root_info);
234 pipe_stat_init(p, &p->st_id, &id_info);
235 pipe_stat_init(p, &p->st_sock_id, &socket_info);
236 pipe_stat_init(p, &p->st_rx_msgs, &rx_msgs_info);
237 pipe_stat_init(p, &p->st_tx_msgs, &tx_msgs_info);
238 pipe_stat_init(p, &p->st_rx_bytes, &rx_bytes_info);
239 pipe_stat_init(p, &p->st_tx_bytes, &tx_bytes_info);
240
241 nni_stat_set_id(&p->st_root, (int) p->p_id);
242 nni_stat_set_id(&p->st_id, (int) p->p_id);
243 nni_stat_set_id(&p->st_sock_id, (int) nni_sock_id(p->p_sock));
244 }
245 #endif // NNG_ENABLE_STATS
246
247 static int
pipe_create(nni_pipe ** pp,nni_sock * sock,nni_sp_tran * tran,void * tdata)248 pipe_create(nni_pipe **pp, nni_sock *sock, nni_sp_tran *tran, void *tdata)
249 {
250 nni_pipe * p;
251 int rv;
252 void * sdata = nni_sock_proto_data(sock);
253 nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
254 size_t sz;
255
256 sz = NNI_ALIGN_UP(sizeof(*p)) + pops->pipe_size;
257
258 if ((p = nni_zalloc(sz)) == NULL) {
259 // In this case we just toss the pipe...
260 tran->tran_pipe->p_fini(tdata);
261 return (NNG_ENOMEM);
262 }
263
264 p->p_size = sz;
265 p->p_proto_data = p + 1;
266 p->p_tran_ops = *tran->tran_pipe;
267 p->p_tran_data = tdata;
268 p->p_proto_ops = *pops;
269 p->p_sock = sock;
270 p->p_closed = false;
271 p->p_cbs = false;
272 p->p_ref = 0;
273
274 nni_atomic_flag_reset(&p->p_stop);
275 NNI_LIST_NODE_INIT(&p->p_sock_node);
276 NNI_LIST_NODE_INIT(&p->p_ep_node);
277
278 nni_mtx_init(&p->p_mtx);
279 nni_cv_init(&p->p_cv, &pipes_lk);
280
281 nni_mtx_lock(&pipes_lk);
282 if ((rv = nni_id_alloc(&pipes, &p->p_id, p)) == 0) {
283 p->p_ref = 1;
284 }
285 nni_mtx_unlock(&pipes_lk);
286
287 #ifdef NNG_ENABLE_STATS
288 pipe_stats_init(p);
289 #endif
290
291 if ((rv != 0) || ((rv = p->p_tran_ops.p_init(tdata, p)) != 0) ||
292 ((rv = pops->pipe_init(p->p_proto_data, p, sdata)) != 0)) {
293 nni_pipe_close(p);
294 nni_pipe_rele(p);
295 return (rv);
296 }
297
298 *pp = p;
299 return (0);
300 }
301
302 int
nni_pipe_create_dialer(nni_pipe ** pp,nni_dialer * d,void * tdata)303 nni_pipe_create_dialer(nni_pipe **pp, nni_dialer *d, void *tdata)
304 {
305 int rv;
306 nni_sp_tran *tran = d->d_tran;
307 nni_pipe * p;
308
309 if ((rv = pipe_create(&p, d->d_sock, tran, tdata)) != 0) {
310 return (rv);
311 }
312 p->p_dialer = d;
313 #ifdef NNG_ENABLE_STATS
314 static const nni_stat_info dialer_info = {
315 .si_name = "dialer",
316 .si_desc = "dialer for pipe",
317 .si_type = NNG_STAT_ID,
318 };
319 pipe_stat_init(p, &p->st_ep_id, &dialer_info);
320 nni_stat_set_id(&p->st_ep_id, (int) nni_dialer_id(d));
321 #endif
322 *pp = p;
323 return (0);
324 }
325
326 int
nni_pipe_create_listener(nni_pipe ** pp,nni_listener * l,void * tdata)327 nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tdata)
328 {
329 int rv;
330 nni_sp_tran *tran = l->l_tran;
331 nni_pipe * p;
332
333 if ((rv = pipe_create(&p, l->l_sock, tran, tdata)) != 0) {
334 return (rv);
335 }
336 p->p_listener = l;
337 #if NNG_ENABLE_STATS
338 static const nni_stat_info listener_info = {
339 .si_name = "listener",
340 .si_desc = "listener for pipe",
341 .si_type = NNG_STAT_ID,
342 };
343 pipe_stat_init(p, &p->st_ep_id, &listener_info);
344 nni_stat_set_id(&p->st_ep_id, (int) nni_listener_id(l));
345 #endif
346 *pp = p;
347 return (0);
348 }
349
350 int
nni_pipe_getopt(nni_pipe * p,const char * name,void * val,size_t * szp,nni_opt_type t)351 nni_pipe_getopt(
352 nni_pipe *p, const char *name, void *val, size_t *szp, nni_opt_type t)
353 {
354 int rv;
355
356 rv = p->p_tran_ops.p_getopt(p->p_tran_data, name, val, szp, t);
357 if (rv != NNG_ENOTSUP) {
358 return (rv);
359 }
360
361 // Maybe the endpoint knows? The guarantees on pipes ensure that the
362 // pipe will not outlive its creating endpoint.
363 if (p->p_dialer != NULL) {
364 return (nni_dialer_getopt(p->p_dialer, name, val, szp, t));
365 }
366 if (p->p_listener != NULL) {
367 return (nni_listener_getopt(p->p_listener, name, val, szp, t));
368 }
369 return (NNG_ENOTSUP);
370 }
371
372 uint32_t
nni_pipe_sock_id(nni_pipe * p)373 nni_pipe_sock_id(nni_pipe *p)
374 {
375 return (nni_sock_id(p->p_sock));
376 }
377
378 uint32_t
nni_pipe_listener_id(nni_pipe * p)379 nni_pipe_listener_id(nni_pipe *p)
380 {
381 return (p->p_listener ? nni_listener_id(p->p_listener) : 0);
382 }
383
384 uint32_t
nni_pipe_dialer_id(nni_pipe * p)385 nni_pipe_dialer_id(nni_pipe *p)
386 {
387 return (p->p_dialer ? nni_dialer_id(p->p_dialer) : 0);
388 }
389
390 void
nni_pipe_add_stat(nni_pipe * p,nni_stat_item * item)391 nni_pipe_add_stat(nni_pipe *p, nni_stat_item *item)
392 {
393 #ifdef NNG_ENABLE_STATS
394 nni_stat_add(&p->st_root, item);
395 #else
396 NNI_ARG_UNUSED(p);
397 NNI_ARG_UNUSED(item);
398 #endif
399 }
400
401 void
nni_pipe_bump_rx(nni_pipe * p,size_t nbytes)402 nni_pipe_bump_rx(nni_pipe *p, size_t nbytes)
403 {
404 #ifdef NNG_ENABLE_STATS
405 nni_stat_inc(&p->st_rx_bytes, nbytes);
406 nni_stat_inc(&p->st_rx_msgs, 1);
407 #else
408 NNI_ARG_UNUSED(p);
409 NNI_ARG_UNUSED(nbytes);
410 #endif
411 }
412
413 void
nni_pipe_bump_tx(nni_pipe * p,size_t nbytes)414 nni_pipe_bump_tx(nni_pipe *p, size_t nbytes)
415 {
416 #ifdef NNG_ENABLE_STATS
417 nni_stat_inc(&p->st_tx_bytes, nbytes);
418 nni_stat_inc(&p->st_tx_msgs, 1);
419 #else
420 NNI_ARG_UNUSED(p);
421 NNI_ARG_UNUSED(nbytes);
422 #endif
423 }
424
425 void
nni_pipe_bump_error(nni_pipe * p,int err)426 nni_pipe_bump_error(nni_pipe *p, int err)
427 {
428 if (p->p_dialer != NULL) {
429 nni_dialer_bump_error(p->p_dialer, err);
430 } else {
431 nni_listener_bump_error(p->p_listener, err);
432 }
433 }