1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt). A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10
11 #define TEST_NO_MAIN
12
13 #include <nng/nng.h>
14 #include <nng/supplemental/util/platform.h>
15
// stream_xfr_t carries the state of one buffer transfer over a stream.
// The transfer is driven by repeatedly submitting lower_aio until rem
// reaches zero (or an error occurs), at which point upper_aio is finished.
typedef struct {
	uint8_t *   base;      // next byte of the caller's buffer to transfer
	size_t      rem;       // bytes still left to transfer
	nng_iov     iov;       // NOTE(review): not referenced by the code visible here
	nng_aio *   upper_aio; // finished once the whole transfer completes or fails
	nng_aio *   lower_aio; // submitted to the stream for each partial operation
	nng_stream *s;         // stream the data moves over
	void (*submit)(nng_stream *, nng_aio *); // operation used to submit lower_aio
} stream_xfr_t;
25
26 static void
stream_xfr_free(stream_xfr_t * x)27 stream_xfr_free(stream_xfr_t *x)
28 {
29 if (x == NULL) {
30 return;
31 }
32 if (x->upper_aio != NULL) {
33 nng_aio_free(x->upper_aio);
34 }
35 if (x->lower_aio != NULL) {
36 nng_aio_free(x->lower_aio);
37 }
38 nng_free(x, sizeof(*x));
39 }
40
41 static void
stream_xfr_start(stream_xfr_t * x)42 stream_xfr_start(stream_xfr_t *x)
43 {
44 nng_iov iov;
45 iov.iov_buf = x->base;
46 iov.iov_len = x->rem;
47
48 nng_aio_set_iov(x->lower_aio, 1, &iov);
49 x->submit(x->s, x->lower_aio);
50 }
51
52 static void
stream_xfr_cb(void * arg)53 stream_xfr_cb(void *arg)
54 {
55 stream_xfr_t *x = arg;
56 int rv;
57 size_t n;
58
59 rv = nng_aio_result(x->lower_aio);
60 if (rv != 0) {
61 nng_aio_finish(x->upper_aio, rv);
62 return;
63 }
64 n = nng_aio_count(x->lower_aio);
65
66 x->rem -= n;
67 x->base += n;
68
69 if (x->rem == 0) {
70 nng_aio_finish(x->upper_aio, 0);
71 return;
72 }
73
74 stream_xfr_start(x);
75 }
76
77 static stream_xfr_t *
stream_xfr_alloc(nng_stream * s,void (* submit)(nng_stream *,nng_aio *),void * buf,size_t size)78 stream_xfr_alloc(nng_stream *s, void (*submit)(nng_stream *, nng_aio *),
79 void *buf, size_t size)
80 {
81 stream_xfr_t *x;
82
83 if ((x = nng_alloc(size)) == NULL) {
84 return (NULL);
85 }
86 if (nng_aio_alloc(&x->upper_aio, NULL, NULL) != 0) {
87 stream_xfr_free(x);
88 return (NULL);
89 }
90 if (nng_aio_alloc(&x->lower_aio, stream_xfr_cb, x) != 0) {
91 stream_xfr_free(x);
92 return (NULL);
93 }
94
95 // Upper should not take more than 30 seconds, lower not more than 5.
96 nng_aio_set_timeout(x->upper_aio, 30000);
97 nng_aio_set_timeout(x->lower_aio, 5000);
98
99 nng_aio_begin(x->upper_aio);
100
101 x->s = s;
102 x->rem = size;
103 x->base = buf;
104 x->submit = submit;
105
106 return (x);
107 }
108
109 int
nuts_stream_wait(stream_xfr_t * x)110 nuts_stream_wait(stream_xfr_t *x)
111 {
112 int rv;
113 if (x == NULL) {
114 return (NNG_ENOMEM);
115 }
116 nng_aio_wait(x->upper_aio);
117 rv = nng_aio_result(x->upper_aio);
118 stream_xfr_free(x);
119 return (rv);
120 }
121
122 void *
nuts_stream_recv_start(nng_stream * s,void * buf,size_t size)123 nuts_stream_recv_start(nng_stream *s, void *buf, size_t size)
124 {
125 stream_xfr_t *x;
126
127 x = stream_xfr_alloc(s, nng_stream_recv, buf, size);
128 if (x == NULL) {
129 return (x);
130 }
131 stream_xfr_start(x);
132 return (x);
133 }
134
135 void *
nuts_stream_send_start(nng_stream * s,void * buf,size_t size)136 nuts_stream_send_start(nng_stream *s, void *buf, size_t size)
137 {
138 stream_xfr_t *x;
139
140 x = stream_xfr_alloc(s, nng_stream_send, buf, size);
141 if (x == NULL) {
142 return (x);
143 }
144 stream_xfr_start(x);
145 return (x);
146 }
147