1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt).  A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10 
11 #define TEST_NO_MAIN
12 
13 #include <nng/nng.h>
14 #include <nng/supplemental/util/platform.h>
15 
16 typedef struct {
17 	uint8_t *   base;
18 	size_t      rem;
19 	nng_iov     iov;
20 	nng_aio *   upper_aio;
21 	nng_aio *   lower_aio;
22 	nng_stream *s;
23 	void (*submit)(nng_stream *, nng_aio *);
24 } stream_xfr_t;
25 
26 static void
stream_xfr_free(stream_xfr_t * x)27 stream_xfr_free(stream_xfr_t *x)
28 {
29 	if (x == NULL) {
30 		return;
31 	}
32 	if (x->upper_aio != NULL) {
33 		nng_aio_free(x->upper_aio);
34 	}
35 	if (x->lower_aio != NULL) {
36 		nng_aio_free(x->lower_aio);
37 	}
38 	nng_free(x, sizeof(*x));
39 }
40 
41 static void
stream_xfr_start(stream_xfr_t * x)42 stream_xfr_start(stream_xfr_t *x)
43 {
44 	nng_iov iov;
45 	iov.iov_buf = x->base;
46 	iov.iov_len = x->rem;
47 
48 	nng_aio_set_iov(x->lower_aio, 1, &iov);
49 	x->submit(x->s, x->lower_aio);
50 }
51 
52 static void
stream_xfr_cb(void * arg)53 stream_xfr_cb(void *arg)
54 {
55 	stream_xfr_t *x = arg;
56 	int           rv;
57 	size_t        n;
58 
59 	rv = nng_aio_result(x->lower_aio);
60 	if (rv != 0) {
61 		nng_aio_finish(x->upper_aio, rv);
62 		return;
63 	}
64 	n = nng_aio_count(x->lower_aio);
65 
66 	x->rem -= n;
67 	x->base += n;
68 
69 	if (x->rem == 0) {
70 		nng_aio_finish(x->upper_aio, 0);
71 		return;
72 	}
73 
74 	stream_xfr_start(x);
75 }
76 
77 static stream_xfr_t *
stream_xfr_alloc(nng_stream * s,void (* submit)(nng_stream *,nng_aio *),void * buf,size_t size)78 stream_xfr_alloc(nng_stream *s, void (*submit)(nng_stream *, nng_aio *),
79     void *buf, size_t size)
80 {
81 	stream_xfr_t *x;
82 
83 	if ((x = nng_alloc(size)) == NULL) {
84 		return (NULL);
85 	}
86 	if (nng_aio_alloc(&x->upper_aio, NULL, NULL) != 0) {
87 		stream_xfr_free(x);
88 		return (NULL);
89 	}
90 	if (nng_aio_alloc(&x->lower_aio, stream_xfr_cb, x) != 0) {
91 		stream_xfr_free(x);
92 		return (NULL);
93 	}
94 
95 	// Upper should not take more than 30 seconds, lower not more than 5.
96 	nng_aio_set_timeout(x->upper_aio, 30000);
97 	nng_aio_set_timeout(x->lower_aio, 5000);
98 
99 	nng_aio_begin(x->upper_aio);
100 
101 	x->s      = s;
102 	x->rem    = size;
103 	x->base   = buf;
104 	x->submit = submit;
105 
106 	return (x);
107 }
108 
109 int
nuts_stream_wait(stream_xfr_t * x)110 nuts_stream_wait(stream_xfr_t *x)
111 {
112 	int rv;
113 	if (x == NULL) {
114 		return (NNG_ENOMEM);
115 	}
116 	nng_aio_wait(x->upper_aio);
117 	rv = nng_aio_result(x->upper_aio);
118 	stream_xfr_free(x);
119 	return (rv);
120 }
121 
122 void *
nuts_stream_recv_start(nng_stream * s,void * buf,size_t size)123 nuts_stream_recv_start(nng_stream *s, void *buf, size_t size)
124 {
125 	stream_xfr_t *x;
126 
127 	x = stream_xfr_alloc(s, nng_stream_recv, buf, size);
128 	if (x == NULL) {
129 		return (x);
130 	}
131 	stream_xfr_start(x);
132 	return (x);
133 }
134 
135 void *
nuts_stream_send_start(nng_stream * s,void * buf,size_t size)136 nuts_stream_send_start(nng_stream *s, void *buf, size_t size)
137 {
138 	stream_xfr_t *x;
139 
140 	x = stream_xfr_alloc(s, nng_stream_send, buf, size);
141 	if (x == NULL) {
142 		return (x);
143 	}
144 	stream_xfr_start(x);
145 	return (x);
146 }
147