1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 // Copyright 2019 Devolutions <info@devolutions.net>
5 //
6 // This software is supplied under the terms of the MIT License, a
7 // copy of which should be located in the distribution where this
8 // file was obtained (LICENSE.txt). A copy of the license may also be
9 // found online at https://opensource.org/licenses/MIT.
10 //
11
12 #include <stdint.h>
13 #include <string.h>
14
15 #include <nng/nng.h>
16
17 #include "core/nng_impl.h"
18 #include "core/tcp.h"
19
20 typedef struct {
21 nng_stream_dialer ops;
22 char * host;
23 char * port;
24 int af; // address family
25 bool closed;
26 nng_sockaddr sa;
27 nni_tcp_dialer * d; // platform dialer implementation
28 nni_aio * resaio; // resolver aio
29 nni_aio * conaio; // platform connection aio
30 nni_list conaios;
31 nni_mtx mtx;
32 } tcp_dialer;
33
34 static void
tcp_dial_cancel(nni_aio * aio,void * arg,int rv)35 tcp_dial_cancel(nni_aio *aio, void *arg, int rv)
36 {
37 tcp_dialer *d = arg;
38
39 nni_mtx_lock(&d->mtx);
40 if (nni_aio_list_active(aio)) {
41 nni_aio_list_remove(aio);
42 nni_aio_finish_error(aio, rv);
43
44 if (nni_list_empty(&d->conaios)) {
45 nni_aio_abort(d->conaio, NNG_ECANCELED);
46 nni_aio_abort(d->resaio, NNG_ECANCELED);
47 }
48 }
49 nni_mtx_unlock(&d->mtx);
50 }
51
52 static void
tcp_dial_start_next(tcp_dialer * d)53 tcp_dial_start_next(tcp_dialer *d)
54 {
55 if (nni_list_empty(&d->conaios)) {
56 return;
57 }
58 nni_resolv_ip(d->host, d->port, d->af, false, &d->sa, d->resaio);
59 }
60
61 static void
tcp_dial_res_cb(void * arg)62 tcp_dial_res_cb(void *arg)
63 {
64 tcp_dialer *d = arg;
65 nni_aio * aio;
66 int rv;
67
68 nni_mtx_lock(&d->mtx);
69 if (d->closed || ((aio = nni_list_first(&d->conaios)) == NULL)) {
70 // ignore this.
71 while ((aio = nni_list_first(&d->conaios)) != NULL) {
72 nni_list_remove(&d->conaios, aio);
73 nni_aio_finish_error(aio, NNG_ECLOSED);
74 }
75 nni_mtx_unlock(&d->mtx);
76 return;
77 }
78
79 if ((rv = nni_aio_result(d->resaio)) != 0) {
80 nni_list_remove(&d->conaios, aio);
81 nni_aio_finish_error(aio, rv);
82
83 // try DNS again for next connection...
84 tcp_dial_start_next(d);
85
86 } else {
87 nni_tcp_dial(d->d, &d->sa, d->conaio);
88 }
89
90 nni_mtx_unlock(&d->mtx);
91 }
92
93 static void
tcp_dial_con_cb(void * arg)94 tcp_dial_con_cb(void *arg)
95 {
96 tcp_dialer *d = arg;
97 nng_aio * aio;
98 int rv;
99
100 nni_mtx_lock(&d->mtx);
101 rv = nni_aio_result(d->conaio);
102 if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) {
103 if (rv == 0) {
104 // Make sure we discard the underlying connection.
105 nng_stream_free(nni_aio_get_output(d->conaio, 0));
106 nni_aio_set_output(d->conaio, 0, NULL);
107 }
108 nni_mtx_unlock(&d->mtx);
109 return;
110 }
111 nni_list_remove(&d->conaios, aio);
112 if (rv != 0) {
113 nni_aio_finish_error(aio, rv);
114 } else {
115 nni_aio_set_output(aio, 0, nni_aio_get_output(d->conaio, 0));
116 nni_aio_finish(aio, 0, 0);
117 }
118
119 tcp_dial_start_next(d);
120 nni_mtx_unlock(&d->mtx);
121 }
122
123 static void
tcp_dialer_close(void * arg)124 tcp_dialer_close(void *arg)
125 {
126 tcp_dialer *d = arg;
127 nni_aio * aio;
128 nni_mtx_lock(&d->mtx);
129 d->closed = true;
130 while ((aio = nni_list_first(&d->conaios)) != NULL) {
131 nni_list_remove(&d->conaios, aio);
132 nni_aio_finish_error(aio, NNG_ECLOSED);
133 }
134 nni_tcp_dialer_close(d->d);
135 nni_mtx_unlock(&d->mtx);
136 }
137
138 static void
tcp_dialer_free(void * arg)139 tcp_dialer_free(void *arg)
140 {
141 tcp_dialer *d = arg;
142
143 if (d == NULL) {
144 return;
145 }
146
147 nni_aio_stop(d->resaio);
148 nni_aio_stop(d->conaio);
149 nni_aio_free(d->resaio);
150 nni_aio_free(d->conaio);
151
152 if (d->d != NULL) {
153 nni_tcp_dialer_close(d->d);
154 nni_tcp_dialer_fini(d->d);
155 }
156 nni_mtx_fini(&d->mtx);
157 nni_strfree(d->host);
158 nni_strfree(d->port);
159 NNI_FREE_STRUCT(d);
160 }
161
162 static void
tcp_dialer_dial(void * arg,nng_aio * aio)163 tcp_dialer_dial(void *arg, nng_aio *aio)
164 {
165 tcp_dialer *d = arg;
166 int rv;
167 if (nni_aio_begin(aio) != 0) {
168 return;
169 }
170 nni_mtx_lock(&d->mtx);
171 if (d->closed) {
172 nni_mtx_unlock(&d->mtx);
173 nni_aio_finish_error(aio, NNG_ECLOSED);
174 return;
175 }
176 if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
177 nni_mtx_unlock(&d->mtx);
178 nni_aio_finish_error(aio, rv);
179 return;
180 }
181 nni_list_append(&d->conaios, aio);
182 if (nni_list_first(&d->conaios) == aio) {
183 tcp_dial_start_next(d);
184 }
185 nni_mtx_unlock(&d->mtx);
186 }
187
188 static int
tcp_dialer_get(void * arg,const char * name,void * buf,size_t * szp,nni_type t)189 tcp_dialer_get(
190 void *arg, const char *name, void *buf, size_t *szp, nni_type t)
191 {
192 tcp_dialer *d = arg;
193 return (nni_tcp_dialer_get(d->d, name, buf, szp, t));
194 }
195
196 static int
tcp_dialer_set(void * arg,const char * name,const void * buf,size_t sz,nni_type t)197 tcp_dialer_set(
198 void *arg, const char *name, const void *buf, size_t sz, nni_type t)
199 {
200 tcp_dialer *d = arg;
201 return (nni_tcp_dialer_set(d->d, name, buf, sz, t));
202 }
203
204 static int
tcp_dialer_alloc(tcp_dialer ** dp)205 tcp_dialer_alloc(tcp_dialer **dp)
206 {
207 int rv;
208 tcp_dialer *d;
209
210 if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
211 return (NNG_ENOMEM);
212 }
213
214 nni_mtx_init(&d->mtx);
215 nni_aio_list_init(&d->conaios);
216
217 if (((rv = nni_aio_alloc(&d->resaio, tcp_dial_res_cb, d)) != 0) ||
218 ((rv = nni_aio_alloc(&d->conaio, tcp_dial_con_cb, d)) != 0) ||
219 ((rv = nni_tcp_dialer_init(&d->d)) != 0)) {
220 tcp_dialer_free(d);
221 return (rv);
222 }
223
224 d->ops.sd_close = tcp_dialer_close;
225 d->ops.sd_free = tcp_dialer_free;
226 d->ops.sd_dial = tcp_dialer_dial;
227 d->ops.sd_get = tcp_dialer_get;
228 d->ops.sd_set = tcp_dialer_set;
229
230 *dp = d;
231 return (0);
232 }
233
234 int
nni_tcp_dialer_alloc(nng_stream_dialer ** dp,const nng_url * url)235 nni_tcp_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
236 {
237 tcp_dialer *d;
238 int rv;
239 const char *p;
240
241 if ((rv = nni_init()) != 0) {
242 return (rv);
243 }
244
245 if ((rv = tcp_dialer_alloc(&d)) != 0) {
246 return (rv);
247 }
248
249 if (((p = url->u_port) == NULL) || (strlen(p) == 0)) {
250 p = nni_url_default_port(url->u_scheme);
251 }
252
253 if ((strlen(p) == 0) || (strlen(url->u_hostname) == 0)) {
254 // Dialer needs both a destination hostname and port.
255 tcp_dialer_free(d);
256 return (NNG_EADDRINVAL);
257 }
258
259 if (strchr(url->u_scheme, '4') != NULL) {
260 d->af = NNG_AF_INET;
261 } else if (strchr(url->u_scheme, '6') != NULL) {
262 d->af = NNG_AF_INET6;
263 } else {
264 d->af = NNG_AF_UNSPEC;
265 }
266
267 if (((d->host = nng_strdup(url->u_hostname)) == NULL) ||
268 ((d->port = nng_strdup(p)) == NULL)) {
269 tcp_dialer_free(d);
270 return (NNG_ENOMEM);
271 }
272
273 *dp = (void *) d;
274 return (0);
275 }
276
277 typedef struct {
278 nng_stream_listener ops;
279 nni_tcp_listener * l;
280 nng_sockaddr sa;
281 } tcp_listener;
282
283 static void
tcp_listener_close(void * arg)284 tcp_listener_close(void *arg)
285 {
286 tcp_listener *l = arg;
287 nni_tcp_listener_close(l->l);
288 }
289
290 static void
tcp_listener_free(void * arg)291 tcp_listener_free(void *arg)
292 {
293 tcp_listener *l = arg;
294 nni_tcp_listener_fini(l->l);
295 NNI_FREE_STRUCT(l);
296 }
297
298 static int
tcp_listener_listen(void * arg)299 tcp_listener_listen(void *arg)
300 {
301 tcp_listener *l = arg;
302 return (nni_tcp_listener_listen(l->l, &l->sa));
303 }
304
305 static void
tcp_listener_accept(void * arg,nng_aio * aio)306 tcp_listener_accept(void *arg, nng_aio *aio)
307 {
308 tcp_listener *l = arg;
309 nni_tcp_listener_accept(l->l, aio);
310 }
311
312 static int
tcp_listener_get_port(void * arg,void * buf,size_t * szp,nni_type t)313 tcp_listener_get_port(void *arg, void *buf, size_t *szp, nni_type t)
314 {
315 tcp_listener *l = arg;
316 int rv;
317 nng_sockaddr sa;
318 size_t sz;
319 int port;
320 uint8_t * paddr;
321
322 sz = sizeof(sa);
323 rv = nni_tcp_listener_get(
324 l->l, NNG_OPT_LOCADDR, &sa, &sz, NNI_TYPE_SOCKADDR);
325 if (rv != 0) {
326 return (rv);
327 }
328
329 switch (sa.s_family) {
330 case NNG_AF_INET:
331 paddr = (void *) &sa.s_in.sa_port;
332 break;
333
334 case NNG_AF_INET6:
335 paddr = (void *) &sa.s_in6.sa_port;
336 break;
337
338 default:
339 paddr = NULL;
340 break;
341 }
342
343 if (paddr == NULL) {
344 return (NNG_ESTATE);
345 }
346
347 NNI_GET16(paddr, port);
348 return (nni_copyout_int(port, buf, szp, t));
349 }
350
351 static int
tcp_listener_get(void * arg,const char * name,void * buf,size_t * szp,nni_type t)352 tcp_listener_get(
353 void *arg, const char *name, void *buf, size_t *szp, nni_type t)
354 {
355 tcp_listener *l = arg;
356 if (strcmp(name, NNG_OPT_TCP_BOUND_PORT) == 0) {
357 return (tcp_listener_get_port(l, buf, szp, t));
358 }
359 return (nni_tcp_listener_get(l->l, name, buf, szp, t));
360 }
361
362 static int
tcp_listener_set(void * arg,const char * name,const void * buf,size_t sz,nni_type t)363 tcp_listener_set(
364 void *arg, const char *name, const void *buf, size_t sz, nni_type t)
365 {
366 tcp_listener *l = arg;
367 return (nni_tcp_listener_set(l->l, name, buf, sz, t));
368 }
369
370 static int
tcp_listener_alloc_addr(nng_stream_listener ** lp,const nng_sockaddr * sa)371 tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)
372 {
373 tcp_listener *l;
374 int rv;
375
376 if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
377 return (NNG_ENOMEM);
378 }
379 if ((rv = nni_tcp_listener_init(&l->l)) != 0) {
380 NNI_FREE_STRUCT(l);
381 return (rv);
382 }
383 l->sa = *sa;
384
385 l->ops.sl_free = tcp_listener_free;
386 l->ops.sl_close = tcp_listener_close;
387 l->ops.sl_listen = tcp_listener_listen;
388 l->ops.sl_accept = tcp_listener_accept;
389 l->ops.sl_get = tcp_listener_get;
390 l->ops.sl_set = tcp_listener_set;
391
392 *lp = (void *) l;
393 return (0);
394 }
395
396 int
nni_tcp_listener_alloc(nng_stream_listener ** lp,const nng_url * url)397 nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url)
398 {
399 nni_aio * aio;
400 int af;
401 int rv;
402 nng_sockaddr sa;
403 const char * h;
404
405 if ((rv = nni_init()) != 0) {
406 return (rv);
407 }
408 if (strchr(url->u_scheme, '4') != NULL) {
409 af = NNG_AF_INET;
410 } else if (strchr(url->u_scheme, '6') != NULL) {
411 af = NNG_AF_INET6;
412 } else {
413 af = NNG_AF_UNSPEC;
414 }
415
416 if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
417 return (rv);
418 }
419
420 h = url->u_hostname;
421
422 // Wildcard special case, which means bind to INADDR_ANY.
423 if ((h != NULL) && ((strcmp(h, "*") == 0) || (strcmp(h, "") == 0))) {
424 h = NULL;
425 }
426 nni_resolv_ip(h, url->u_port, af, true, &sa, aio);
427 nni_aio_wait(aio);
428
429 if ((rv = nni_aio_result(aio)) != 0) {
430 nni_aio_free(aio);
431 return (rv);
432 }
433 nni_aio_free(aio);
434
435 return (tcp_listener_alloc_addr(lp, &sa));
436 }
437