//
// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt).  A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//
11
12 #include "core/nng_impl.h"
13
14 #include "win_ipc.h"
15
16 #include <stdio.h>
17
18 #define CONN(c) ((ipc_conn *) (c))
19
20 typedef struct ipc_conn {
21 nng_stream stream;
22 HANDLE f;
23 nni_win_io recv_io;
24 nni_win_io send_io;
25 nni_win_io conn_io;
26 nni_list recv_aios;
27 nni_list send_aios;
28 nni_aio * conn_aio;
29 nng_sockaddr sa;
30 bool dialer;
31 int recv_rv;
32 int send_rv;
33 int conn_rv;
34 bool closed;
35 nni_mtx mtx;
36 nni_cv cv;
37 nni_reap_node reap;
38 } ipc_conn;
39
40 static void
ipc_recv_start(ipc_conn * c)41 ipc_recv_start(ipc_conn *c)
42 {
43 nni_aio *aio;
44 unsigned idx;
45 unsigned naiov;
46 nni_iov *aiov;
47 void * buf;
48 DWORD len;
49 int rv;
50
51 if (c->closed) {
52 while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
53 nni_list_remove(&c->recv_aios, aio);
54 nni_aio_finish_error(aio, NNG_ECLOSED);
55 }
56 nni_cv_wake(&c->cv);
57 }
58 again:
59 if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
60 return;
61 }
62
63 nni_aio_get_iov(aio, &naiov, &aiov);
64
65 idx = 0;
66 while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
67 idx++;
68 }
69 NNI_ASSERT(idx < naiov);
70 // Now start a transfer. We assume that only one send can be
71 // outstanding on a pipe at a time. This is important to avoid
72 // scrambling the data anyway. Note that Windows named pipes do
73 // not appear to support scatter/gather, so we have to process
74 // each element in turn.
75 buf = aiov[idx].iov_buf;
76 len = (DWORD) aiov[idx].iov_len;
77 NNI_ASSERT(buf != NULL);
78 NNI_ASSERT(len != 0);
79
80 // We limit ourselves to writing 16MB at a time. Named Pipes
81 // on Windows have limits of between 31 and 64MB.
82 if (len > 0x1000000) {
83 len = 0x1000000;
84 }
85
86 if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
87 ((rv = GetLastError()) != ERROR_IO_PENDING)) {
88 // Synchronous failure.
89 nni_aio_list_remove(aio);
90 nni_aio_finish_error(aio, nni_win_error(rv));
91 goto again;
92 }
93 }
94
95 static void
ipc_recv_cb(nni_win_io * io,int rv,size_t num)96 ipc_recv_cb(nni_win_io *io, int rv, size_t num)
97 {
98 nni_aio * aio;
99 ipc_conn *c = io->ptr;
100 nni_mtx_lock(&c->mtx);
101 if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
102 // Should indicate that it was closed.
103 nni_mtx_unlock(&c->mtx);
104 return;
105 }
106 if (c->recv_rv != 0) {
107 rv = c->recv_rv;
108 c->recv_rv = 0;
109 }
110 nni_aio_list_remove(aio);
111 ipc_recv_start(c);
112 if (c->closed) {
113 nni_cv_wake(&c->cv);
114 }
115 nni_mtx_unlock(&c->mtx);
116
117 if ((rv == 0) && (num == 0)) {
118 // A zero byte receive is a remote close from the peer.
119 rv = NNG_ECONNSHUT;
120 }
121 nni_aio_finish_sync(aio, rv, num);
122 }
123 static void
ipc_recv_cancel(nni_aio * aio,void * arg,int rv)124 ipc_recv_cancel(nni_aio *aio, void *arg, int rv)
125 {
126 ipc_conn *c = arg;
127 nni_mtx_lock(&c->mtx);
128 if (aio == nni_list_first(&c->recv_aios)) {
129 c->recv_rv = rv;
130 CancelIoEx(c->f, &c->recv_io.olpd);
131 } else if (nni_aio_list_active(aio)) {
132 nni_aio_list_remove(aio);
133 nni_aio_finish_error(aio, rv);
134 nni_cv_wake(&c->cv);
135 }
136 nni_mtx_unlock(&c->mtx);
137 }
138
139 static void
ipc_recv(void * arg,nni_aio * aio)140 ipc_recv(void *arg, nni_aio *aio)
141 {
142 ipc_conn *c = arg;
143 int rv;
144
145 if (nni_aio_begin(aio) != 0) {
146 return;
147 }
148 nni_mtx_lock(&c->mtx);
149 if (c->closed) {
150 nni_mtx_unlock(&c->mtx);
151 nni_aio_finish_error(aio, NNG_ECLOSED);
152 return;
153 }
154 if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
155 nni_mtx_unlock(&c->mtx);
156 nni_aio_finish_error(aio, rv);
157 return;
158 }
159 nni_list_append(&c->recv_aios, aio);
160 if (aio == nni_list_first(&c->recv_aios)) {
161 ipc_recv_start(c);
162 }
163 nni_mtx_unlock(&c->mtx);
164 }
165
166 static void
ipc_send_start(ipc_conn * c)167 ipc_send_start(ipc_conn *c)
168 {
169 nni_aio *aio;
170 unsigned idx;
171 unsigned naiov;
172 nni_iov *aiov;
173 void * buf;
174 DWORD len;
175 int rv;
176
177 if (c->closed) {
178 while ((aio = nni_list_first(&c->send_aios)) != NULL) {
179 nni_list_remove(&c->send_aios, aio);
180 nni_aio_finish_error(aio, NNG_ECLOSED);
181 }
182 nni_cv_wake(&c->cv);
183 }
184 again:
185 if ((aio = nni_list_first(&c->send_aios)) == NULL) {
186 return;
187 }
188
189 nni_aio_get_iov(aio, &naiov, &aiov);
190
191 idx = 0;
192 while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
193 idx++;
194 }
195 NNI_ASSERT(idx < naiov);
196 // Now start a transfer. We assume that only one send can be
197 // outstanding on a pipe at a time. This is important to avoid
198 // scrambling the data anyway. Note that Windows named pipes do
199 // not appear to support scatter/gather, so we have to process
200 // each element in turn.
201 buf = aiov[idx].iov_buf;
202 len = (DWORD) aiov[idx].iov_len;
203 NNI_ASSERT(buf != NULL);
204 NNI_ASSERT(len != 0);
205
206 // We limit ourselves to writing 16MB at a time. Named Pipes
207 // on Windows have limits of between 31 and 64MB.
208 if (len > 0x1000000) {
209 len = 0x1000000;
210 }
211
212 if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
213 ((rv = GetLastError()) != ERROR_IO_PENDING)) {
214 // Synchronous failure.
215 nni_aio_list_remove(aio);
216 nni_aio_finish_error(aio, nni_win_error(rv));
217 goto again;
218 }
219 }
220
221 static void
ipc_send_cb(nni_win_io * io,int rv,size_t num)222 ipc_send_cb(nni_win_io *io, int rv, size_t num)
223 {
224 nni_aio * aio;
225 ipc_conn *c = io->ptr;
226 nni_mtx_lock(&c->mtx);
227 if ((aio = nni_list_first(&c->send_aios)) == NULL) {
228 // Should indicate that it was closed.
229 nni_mtx_unlock(&c->mtx);
230 return;
231 }
232 if (c->send_rv != 0) {
233 rv = c->send_rv;
234 c->send_rv = 0;
235 }
236 nni_aio_list_remove(aio);
237 ipc_send_start(c);
238 if (c->closed) {
239 nni_cv_wake(&c->cv);
240 }
241 nni_mtx_unlock(&c->mtx);
242
243 nni_aio_finish_sync(aio, rv, num);
244 }
245
246 static void
ipc_send_cancel(nni_aio * aio,void * arg,int rv)247 ipc_send_cancel(nni_aio *aio, void *arg, int rv)
248 {
249 ipc_conn *c = arg;
250 nni_mtx_lock(&c->mtx);
251 if (aio == nni_list_first(&c->send_aios)) {
252 c->send_rv = rv;
253 CancelIoEx(c->f, &c->send_io.olpd);
254 } else if (nni_aio_list_active(aio)) {
255 nni_aio_list_remove(aio);
256 nni_aio_finish_error(aio, rv);
257 nni_cv_wake(&c->cv);
258 }
259 nni_mtx_unlock(&c->mtx);
260 }
261
262 static void
ipc_send(void * arg,nni_aio * aio)263 ipc_send(void *arg, nni_aio *aio)
264 {
265 ipc_conn *c = arg;
266 int rv;
267
268 if (nni_aio_begin(aio) != 0) {
269 return;
270 }
271 nni_mtx_lock(&c->mtx);
272 if (c->closed) {
273 nni_mtx_unlock(&c->mtx);
274 nni_aio_finish_error(aio, NNG_ECLOSED);
275 return;
276 }
277 if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) {
278 nni_mtx_unlock(&c->mtx);
279 nni_aio_finish_error(aio, rv);
280 return;
281 }
282 nni_list_append(&c->send_aios, aio);
283 if (aio == nni_list_first(&c->send_aios)) {
284 ipc_send_start(c);
285 }
286 nni_mtx_unlock(&c->mtx);
287 }
288
289 static void
ipc_close(void * arg)290 ipc_close(void *arg)
291 {
292 ipc_conn *c = arg;
293 nni_mtx_lock(&c->mtx);
294 if (!c->closed) {
295 c->closed = true;
296 if (!nni_list_empty(&c->recv_aios)) {
297 CancelIoEx(c->f, &c->recv_io.olpd);
298 }
299 if (!nni_list_empty(&c->send_aios)) {
300 CancelIoEx(c->f, &c->send_io.olpd);
301 }
302
303 if (c->f != INVALID_HANDLE_VALUE) {
304 // NB: closing the pipe is dangerous at this point.
305 DisconnectNamedPipe(c->f);
306 }
307 }
308 nni_mtx_unlock(&c->mtx);
309 }
310
311 static void
ipc_conn_reap(void * arg)312 ipc_conn_reap(void *arg)
313 {
314 ipc_conn *c = arg;
315
316 nni_mtx_lock(&c->mtx);
317 while ((!nni_list_empty(&c->recv_aios)) ||
318 (!nni_list_empty(&c->send_aios))) {
319 nni_cv_wait(&c->cv);
320 }
321 nni_mtx_unlock(&c->mtx);
322
323 nni_win_io_fini(&c->recv_io);
324 nni_win_io_fini(&c->send_io);
325 nni_win_io_fini(&c->conn_io);
326
327 if (c->f != INVALID_HANDLE_VALUE) {
328 CloseHandle(c->f);
329 }
330 nni_cv_fini(&c->cv);
331 nni_mtx_fini(&c->mtx);
332 NNI_FREE_STRUCT(c);
333 }
334
335 static nni_reap_list ipc_reap_list = {
336 .rl_offset = offsetof(ipc_conn, reap),
337 .rl_func = ipc_conn_reap,
338 };
339
340 static void
ipc_free(void * arg)341 ipc_free(void *arg)
342 {
343 ipc_conn *c = arg;
344 ipc_close(c);
345
346 nni_reap(&ipc_reap_list, c);
347 }
348
349 static int
ipc_conn_get_addr(void * c,void * buf,size_t * szp,nni_opt_type t)350 ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
351 {
352 return (nni_copyout_sockaddr(&(CONN(c))->sa, buf, szp, t));
353 }
354
355 static int
ipc_conn_get_peer_pid(void * c,void * buf,size_t * szp,nni_opt_type t)356 ipc_conn_get_peer_pid(void *c, void *buf, size_t *szp, nni_opt_type t)
357 {
358 ULONG id;
359
360 if (CONN(c)->dialer) {
361 if (!GetNamedPipeServerProcessId(CONN(c)->f, &id)) {
362 return (nni_win_error(GetLastError()));
363 }
364 } else {
365 if (!GetNamedPipeClientProcessId(CONN(c)->f, &id)) {
366 return (nni_win_error(GetLastError()));
367 }
368 }
369 return (nni_copyout_u64(id, buf, szp, t));
370 }
371
372 static const nni_option ipc_conn_options[] = {
373 {
374 .o_name = NNG_OPT_LOCADDR,
375 .o_get = ipc_conn_get_addr,
376 },
377 {
378 .o_name = NNG_OPT_REMADDR,
379 .o_get = ipc_conn_get_addr,
380 },
381 {
382 .o_name = NNG_OPT_IPC_PEER_PID,
383 .o_get = ipc_conn_get_peer_pid,
384 },
385 {
386 .o_name = NULL, // terminator
387 },
388 };
389
390 static int
ipc_set(void * arg,const char * nm,const void * val,size_t sz,nni_opt_type t)391 ipc_set(void *arg, const char *nm, const void *val, size_t sz, nni_opt_type t)
392 {
393 ipc_conn *c = arg;
394 return (nni_setopt(ipc_conn_options, nm, c, val, sz, t));
395 }
396
397 static int
ipc_get(void * arg,const char * nm,void * val,size_t * szp,nni_opt_type t)398 ipc_get(void *arg, const char *nm, void *val, size_t *szp, nni_opt_type t)
399 {
400 ipc_conn *c = arg;
401 return (nni_getopt(ipc_conn_options, nm, c, val, szp, t));
402 }
403
404 int
nni_win_ipc_init(nng_stream ** connp,HANDLE p,const nng_sockaddr * sa,bool dialer)405 nni_win_ipc_init(
406 nng_stream **connp, HANDLE p, const nng_sockaddr *sa, bool dialer)
407 {
408 ipc_conn *c;
409 int rv;
410
411 if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
412 return (NNG_ENOMEM);
413 }
414 c->f = INVALID_HANDLE_VALUE;
415 nni_mtx_init(&c->mtx);
416 nni_cv_init(&c->cv, &c->mtx);
417 nni_aio_list_init(&c->recv_aios);
418 nni_aio_list_init(&c->send_aios);
419 c->dialer = dialer;
420 c->sa = *sa;
421 c->stream.s_free = ipc_free;
422 c->stream.s_close = ipc_close;
423 c->stream.s_send = ipc_send;
424 c->stream.s_recv = ipc_recv;
425 c->stream.s_get = ipc_get;
426 c->stream.s_set = ipc_set;
427
428 if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
429 ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
430 ipc_free(c);
431 return (rv);
432 }
433
434 c->f = p;
435 *connp = (void *) c;
436 return (0);
437 }
438