1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 // Copyright 2019 Devolutions <info@devolutions.net>
5 //
6 // This software is supplied under the terms of the MIT License, a
7 // copy of which should be located in the distribution where this
8 // file was obtained (LICENSE.txt).  A copy of the license may also be
9 // found online at https://opensource.org/licenses/MIT.
10 //
11 
12 #include <stdint.h>
13 #include <string.h>
14 
15 #include <nng/nng.h>
16 
17 #include "core/nng_impl.h"
18 #include "core/tcp.h"
19 
20 typedef struct {
21 	nng_stream_dialer ops;
22 	char *            host;
23 	char *            port;
24 	int               af; // address family
25 	bool              closed;
26 	nng_sockaddr      sa;
27 	nni_tcp_dialer *  d;      // platform dialer implementation
28 	nni_aio *         resaio; // resolver aio
29 	nni_aio *         conaio; // platform connection aio
30 	nni_list          conaios;
31 	nni_mtx           mtx;
32 } tcp_dialer;
33 
34 static void
tcp_dial_cancel(nni_aio * aio,void * arg,int rv)35 tcp_dial_cancel(nni_aio *aio, void *arg, int rv)
36 {
37 	tcp_dialer *d = arg;
38 
39 	nni_mtx_lock(&d->mtx);
40 	if (nni_aio_list_active(aio)) {
41 		nni_aio_list_remove(aio);
42 		nni_aio_finish_error(aio, rv);
43 
44 		if (nni_list_empty(&d->conaios)) {
45 			nni_aio_abort(d->conaio, NNG_ECANCELED);
46 			nni_aio_abort(d->resaio, NNG_ECANCELED);
47 		}
48 	}
49 	nni_mtx_unlock(&d->mtx);
50 }
51 
52 static void
tcp_dial_start_next(tcp_dialer * d)53 tcp_dial_start_next(tcp_dialer *d)
54 {
55 	if (nni_list_empty(&d->conaios)) {
56 		return;
57 	}
58 	nni_resolv_ip(d->host, d->port, d->af, false, &d->sa, d->resaio);
59 }
60 
61 static void
tcp_dial_res_cb(void * arg)62 tcp_dial_res_cb(void *arg)
63 {
64 	tcp_dialer *d = arg;
65 	nni_aio *   aio;
66 	int         rv;
67 
68 	nni_mtx_lock(&d->mtx);
69 	if (d->closed || ((aio = nni_list_first(&d->conaios)) == NULL)) {
70 		// ignore this.
71 		while ((aio = nni_list_first(&d->conaios)) != NULL) {
72 			nni_list_remove(&d->conaios, aio);
73 			nni_aio_finish_error(aio, NNG_ECLOSED);
74 		}
75 		nni_mtx_unlock(&d->mtx);
76 		return;
77 	}
78 
79 	if ((rv = nni_aio_result(d->resaio)) != 0) {
80 		nni_list_remove(&d->conaios, aio);
81 		nni_aio_finish_error(aio, rv);
82 
83 		// try DNS again for next connection...
84 		tcp_dial_start_next(d);
85 
86 	} else {
87 		nni_tcp_dial(d->d, &d->sa, d->conaio);
88 	}
89 
90 	nni_mtx_unlock(&d->mtx);
91 }
92 
93 static void
tcp_dial_con_cb(void * arg)94 tcp_dial_con_cb(void *arg)
95 {
96 	tcp_dialer *d = arg;
97 	nng_aio *   aio;
98 	int         rv;
99 
100 	nni_mtx_lock(&d->mtx);
101 	rv = nni_aio_result(d->conaio);
102 	if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) {
103 		if (rv == 0) {
104 			// Make sure we discard the underlying connection.
105 			nng_stream_free(nni_aio_get_output(d->conaio, 0));
106 			nni_aio_set_output(d->conaio, 0, NULL);
107 		}
108 		nni_mtx_unlock(&d->mtx);
109 		return;
110 	}
111 	nni_list_remove(&d->conaios, aio);
112 	if (rv != 0) {
113 		nni_aio_finish_error(aio, rv);
114 	} else {
115 		nni_aio_set_output(aio, 0, nni_aio_get_output(d->conaio, 0));
116 		nni_aio_finish(aio, 0, 0);
117 	}
118 
119 	tcp_dial_start_next(d);
120 	nni_mtx_unlock(&d->mtx);
121 }
122 
123 static void
tcp_dialer_close(void * arg)124 tcp_dialer_close(void *arg)
125 {
126 	tcp_dialer *d = arg;
127 	nni_aio *   aio;
128 	nni_mtx_lock(&d->mtx);
129 	d->closed = true;
130 	while ((aio = nni_list_first(&d->conaios)) != NULL) {
131 		nni_list_remove(&d->conaios, aio);
132 		nni_aio_finish_error(aio, NNG_ECLOSED);
133 	}
134 	nni_tcp_dialer_close(d->d);
135 	nni_mtx_unlock(&d->mtx);
136 }
137 
138 static void
tcp_dialer_free(void * arg)139 tcp_dialer_free(void *arg)
140 {
141 	tcp_dialer *d = arg;
142 
143 	if (d == NULL) {
144 		return;
145 	}
146 
147 	nni_aio_stop(d->resaio);
148 	nni_aio_stop(d->conaio);
149 	nni_aio_free(d->resaio);
150 	nni_aio_free(d->conaio);
151 
152 	if (d->d != NULL) {
153 		nni_tcp_dialer_close(d->d);
154 		nni_tcp_dialer_fini(d->d);
155 	}
156 	nni_mtx_fini(&d->mtx);
157 	nni_strfree(d->host);
158 	nni_strfree(d->port);
159 	NNI_FREE_STRUCT(d);
160 }
161 
162 static void
tcp_dialer_dial(void * arg,nng_aio * aio)163 tcp_dialer_dial(void *arg, nng_aio *aio)
164 {
165 	tcp_dialer *d = arg;
166 	int         rv;
167 	if (nni_aio_begin(aio) != 0) {
168 		return;
169 	}
170 	nni_mtx_lock(&d->mtx);
171 	if (d->closed) {
172 		nni_mtx_unlock(&d->mtx);
173 		nni_aio_finish_error(aio, NNG_ECLOSED);
174 		return;
175 	}
176 	if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
177 		nni_mtx_unlock(&d->mtx);
178 		nni_aio_finish_error(aio, rv);
179 		return;
180 	}
181 	nni_list_append(&d->conaios, aio);
182 	if (nni_list_first(&d->conaios) == aio) {
183 		tcp_dial_start_next(d);
184 	}
185 	nni_mtx_unlock(&d->mtx);
186 }
187 
188 static int
tcp_dialer_get(void * arg,const char * name,void * buf,size_t * szp,nni_type t)189 tcp_dialer_get(
190     void *arg, const char *name, void *buf, size_t *szp, nni_type t)
191 {
192 	tcp_dialer *d = arg;
193 	return (nni_tcp_dialer_get(d->d, name, buf, szp, t));
194 }
195 
196 static int
tcp_dialer_set(void * arg,const char * name,const void * buf,size_t sz,nni_type t)197 tcp_dialer_set(
198     void *arg, const char *name, const void *buf, size_t sz, nni_type t)
199 {
200 	tcp_dialer *d = arg;
201 	return (nni_tcp_dialer_set(d->d, name, buf, sz, t));
202 }
203 
204 static int
tcp_dialer_alloc(tcp_dialer ** dp)205 tcp_dialer_alloc(tcp_dialer **dp)
206 {
207 	int         rv;
208 	tcp_dialer *d;
209 
210 	if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
211 		return (NNG_ENOMEM);
212 	}
213 
214 	nni_mtx_init(&d->mtx);
215 	nni_aio_list_init(&d->conaios);
216 
217 	if (((rv = nni_aio_alloc(&d->resaio, tcp_dial_res_cb, d)) != 0) ||
218 	    ((rv = nni_aio_alloc(&d->conaio, tcp_dial_con_cb, d)) != 0) ||
219 	    ((rv = nni_tcp_dialer_init(&d->d)) != 0)) {
220 		tcp_dialer_free(d);
221 		return (rv);
222 	}
223 
224 	d->ops.sd_close = tcp_dialer_close;
225 	d->ops.sd_free  = tcp_dialer_free;
226 	d->ops.sd_dial  = tcp_dialer_dial;
227 	d->ops.sd_get   = tcp_dialer_get;
228 	d->ops.sd_set   = tcp_dialer_set;
229 
230 	*dp = d;
231 	return (0);
232 }
233 
234 int
nni_tcp_dialer_alloc(nng_stream_dialer ** dp,const nng_url * url)235 nni_tcp_dialer_alloc(nng_stream_dialer **dp, const nng_url *url)
236 {
237 	tcp_dialer *d;
238 	int         rv;
239 	const char *p;
240 
241 	if ((rv = nni_init()) != 0) {
242 		return (rv);
243 	}
244 
245 	if ((rv = tcp_dialer_alloc(&d)) != 0) {
246 		return (rv);
247 	}
248 
249 	if (((p = url->u_port) == NULL) || (strlen(p) == 0)) {
250 		p = nni_url_default_port(url->u_scheme);
251 	}
252 
253 	if ((strlen(p) == 0) || (strlen(url->u_hostname) == 0)) {
254 		// Dialer needs both a destination hostname and port.
255 		tcp_dialer_free(d);
256 		return (NNG_EADDRINVAL);
257 	}
258 
259 	if (strchr(url->u_scheme, '4') != NULL) {
260 		d->af = NNG_AF_INET;
261 	} else if (strchr(url->u_scheme, '6') != NULL) {
262 		d->af = NNG_AF_INET6;
263 	} else {
264 		d->af = NNG_AF_UNSPEC;
265 	}
266 
267 	if (((d->host = nng_strdup(url->u_hostname)) == NULL) ||
268 	    ((d->port = nng_strdup(p)) == NULL)) {
269 		tcp_dialer_free(d);
270 		return (NNG_ENOMEM);
271 	}
272 
273 	*dp = (void *) d;
274 	return (0);
275 }
276 
277 typedef struct {
278 	nng_stream_listener ops;
279 	nni_tcp_listener *  l;
280 	nng_sockaddr        sa;
281 } tcp_listener;
282 
283 static void
tcp_listener_close(void * arg)284 tcp_listener_close(void *arg)
285 {
286 	tcp_listener *l = arg;
287 	nni_tcp_listener_close(l->l);
288 }
289 
290 static void
tcp_listener_free(void * arg)291 tcp_listener_free(void *arg)
292 {
293 	tcp_listener *l = arg;
294 	nni_tcp_listener_fini(l->l);
295 	NNI_FREE_STRUCT(l);
296 }
297 
298 static int
tcp_listener_listen(void * arg)299 tcp_listener_listen(void *arg)
300 {
301 	tcp_listener *l = arg;
302 	return (nni_tcp_listener_listen(l->l, &l->sa));
303 }
304 
305 static void
tcp_listener_accept(void * arg,nng_aio * aio)306 tcp_listener_accept(void *arg, nng_aio *aio)
307 {
308 	tcp_listener *l = arg;
309 	nni_tcp_listener_accept(l->l, aio);
310 }
311 
312 static int
tcp_listener_get_port(void * arg,void * buf,size_t * szp,nni_type t)313 tcp_listener_get_port(void *arg, void *buf, size_t *szp, nni_type t)
314 {
315 	tcp_listener *l = arg;
316 	int           rv;
317 	nng_sockaddr  sa;
318 	size_t        sz;
319 	int           port;
320 	uint8_t *     paddr;
321 
322 	sz = sizeof(sa);
323 	rv = nni_tcp_listener_get(
324 	    l->l, NNG_OPT_LOCADDR, &sa, &sz, NNI_TYPE_SOCKADDR);
325 	if (rv != 0) {
326 		return (rv);
327 	}
328 
329 	switch (sa.s_family) {
330 	case NNG_AF_INET:
331 		paddr = (void *) &sa.s_in.sa_port;
332 		break;
333 
334 	case NNG_AF_INET6:
335 		paddr = (void *) &sa.s_in6.sa_port;
336 		break;
337 
338 	default:
339 		paddr = NULL;
340 		break;
341 	}
342 
343 	if (paddr == NULL) {
344 		return (NNG_ESTATE);
345 	}
346 
347 	NNI_GET16(paddr, port);
348 	return (nni_copyout_int(port, buf, szp, t));
349 }
350 
351 static int
tcp_listener_get(void * arg,const char * name,void * buf,size_t * szp,nni_type t)352 tcp_listener_get(
353     void *arg, const char *name, void *buf, size_t *szp, nni_type t)
354 {
355 	tcp_listener *l = arg;
356 	if (strcmp(name, NNG_OPT_TCP_BOUND_PORT) == 0) {
357 		return (tcp_listener_get_port(l, buf, szp, t));
358 	}
359 	return (nni_tcp_listener_get(l->l, name, buf, szp, t));
360 }
361 
362 static int
tcp_listener_set(void * arg,const char * name,const void * buf,size_t sz,nni_type t)363 tcp_listener_set(
364     void *arg, const char *name, const void *buf, size_t sz, nni_type t)
365 {
366 	tcp_listener *l = arg;
367 	return (nni_tcp_listener_set(l->l, name, buf, sz, t));
368 }
369 
370 static int
tcp_listener_alloc_addr(nng_stream_listener ** lp,const nng_sockaddr * sa)371 tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)
372 {
373 	tcp_listener *l;
374 	int           rv;
375 
376 	if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
377 		return (NNG_ENOMEM);
378 	}
379 	if ((rv = nni_tcp_listener_init(&l->l)) != 0) {
380 		NNI_FREE_STRUCT(l);
381 		return (rv);
382 	}
383 	l->sa = *sa;
384 
385 	l->ops.sl_free   = tcp_listener_free;
386 	l->ops.sl_close  = tcp_listener_close;
387 	l->ops.sl_listen = tcp_listener_listen;
388 	l->ops.sl_accept = tcp_listener_accept;
389 	l->ops.sl_get    = tcp_listener_get;
390 	l->ops.sl_set    = tcp_listener_set;
391 
392 	*lp = (void *) l;
393 	return (0);
394 }
395 
396 int
nni_tcp_listener_alloc(nng_stream_listener ** lp,const nng_url * url)397 nni_tcp_listener_alloc(nng_stream_listener **lp, const nng_url *url)
398 {
399 	nni_aio *    aio;
400 	int          af;
401 	int          rv;
402 	nng_sockaddr sa;
403 	const char * h;
404 
405 	if ((rv = nni_init()) != 0) {
406 		return (rv);
407 	}
408 	if (strchr(url->u_scheme, '4') != NULL) {
409 		af = NNG_AF_INET;
410 	} else if (strchr(url->u_scheme, '6') != NULL) {
411 		af = NNG_AF_INET6;
412 	} else {
413 		af = NNG_AF_UNSPEC;
414 	}
415 
416 	if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
417 		return (rv);
418 	}
419 
420 	h = url->u_hostname;
421 
422 	// Wildcard special case, which means bind to INADDR_ANY.
423 	if ((h != NULL) && ((strcmp(h, "*") == 0) || (strcmp(h, "") == 0))) {
424 		h = NULL;
425 	}
426 	nni_resolv_ip(h, url->u_port, af, true, &sa, aio);
427 	nni_aio_wait(aio);
428 
429 	if ((rv = nni_aio_result(aio)) != 0) {
430 		nni_aio_free(aio);
431 		return (rv);
432 	}
433 	nni_aio_free(aio);
434 
435 	return (tcp_listener_alloc_addr(lp, &sa));
436 }
437