1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 // Copyright 2019 Devolutions <info@devolutions.net>
5 //
6 // This software is supplied under the terms of the MIT License, a
7 // copy of which should be located in the distribution where this
8 // file was obtained (LICENSE.txt).  A copy of the license may also be
9 // found online at https://opensource.org/licenses/MIT.
10 //
11 
12 #include "core/nng_impl.h"
13 
14 #include "win_ipc.h"
15 
16 #include <stdio.h>
17 
18 #define CONN(c) ((ipc_conn *) (c))
19 
20 typedef struct ipc_conn {
21 	nng_stream    stream;
22 	HANDLE        f;
23 	nni_win_io    recv_io;
24 	nni_win_io    send_io;
25 	nni_win_io    conn_io;
26 	nni_list      recv_aios;
27 	nni_list      send_aios;
28 	nni_aio *     conn_aio;
29 	nng_sockaddr  sa;
30 	bool          dialer;
31 	int           recv_rv;
32 	int           send_rv;
33 	int           conn_rv;
34 	bool          closed;
35 	nni_mtx       mtx;
36 	nni_cv        cv;
37 	nni_reap_node reap;
38 } ipc_conn;
39 
40 static void
ipc_recv_start(ipc_conn * c)41 ipc_recv_start(ipc_conn *c)
42 {
43 	nni_aio *aio;
44 	unsigned idx;
45 	unsigned naiov;
46 	nni_iov *aiov;
47 	void *   buf;
48 	DWORD    len;
49 	int      rv;
50 
51 	if (c->closed) {
52 		while ((aio = nni_list_first(&c->recv_aios)) != NULL) {
53 			nni_list_remove(&c->recv_aios, aio);
54 			nni_aio_finish_error(aio, NNG_ECLOSED);
55 		}
56 		nni_cv_wake(&c->cv);
57 	}
58 again:
59 	if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
60 		return;
61 	}
62 
63 	nni_aio_get_iov(aio, &naiov, &aiov);
64 
65 	idx = 0;
66 	while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
67 		idx++;
68 	}
69 	NNI_ASSERT(idx < naiov);
70 	// Now start a transfer.  We assume that only one send can be
71 	// outstanding on a pipe at a time.  This is important to avoid
72 	// scrambling the data anyway.  Note that Windows named pipes do
73 	// not appear to support scatter/gather, so we have to process
74 	// each element in turn.
75 	buf = aiov[idx].iov_buf;
76 	len = (DWORD) aiov[idx].iov_len;
77 	NNI_ASSERT(buf != NULL);
78 	NNI_ASSERT(len != 0);
79 
80 	// We limit ourselves to writing 16MB at a time.  Named Pipes
81 	// on Windows have limits of between 31 and 64MB.
82 	if (len > 0x1000000) {
83 		len = 0x1000000;
84 	}
85 
86 	if ((!ReadFile(c->f, buf, len, NULL, &c->recv_io.olpd)) &&
87 	    ((rv = GetLastError()) != ERROR_IO_PENDING)) {
88 		// Synchronous failure.
89 		nni_aio_list_remove(aio);
90 		nni_aio_finish_error(aio, nni_win_error(rv));
91 		goto again;
92 	}
93 }
94 
95 static void
ipc_recv_cb(nni_win_io * io,int rv,size_t num)96 ipc_recv_cb(nni_win_io *io, int rv, size_t num)
97 {
98 	nni_aio * aio;
99 	ipc_conn *c = io->ptr;
100 	nni_mtx_lock(&c->mtx);
101 	if ((aio = nni_list_first(&c->recv_aios)) == NULL) {
102 		// Should indicate that it was closed.
103 		nni_mtx_unlock(&c->mtx);
104 		return;
105 	}
106 	if (c->recv_rv != 0) {
107 		rv         = c->recv_rv;
108 		c->recv_rv = 0;
109 	}
110 	nni_aio_list_remove(aio);
111 	ipc_recv_start(c);
112 	if (c->closed) {
113 		nni_cv_wake(&c->cv);
114 	}
115 	nni_mtx_unlock(&c->mtx);
116 
117 	if ((rv == 0) && (num == 0)) {
118 		// A zero byte receive is a remote close from the peer.
119 		rv = NNG_ECONNSHUT;
120 	}
121 	nni_aio_finish_sync(aio, rv, num);
122 }
123 static void
ipc_recv_cancel(nni_aio * aio,void * arg,int rv)124 ipc_recv_cancel(nni_aio *aio, void *arg, int rv)
125 {
126 	ipc_conn *c = arg;
127 	nni_mtx_lock(&c->mtx);
128 	if (aio == nni_list_first(&c->recv_aios)) {
129 		c->recv_rv = rv;
130 		CancelIoEx(c->f, &c->recv_io.olpd);
131 	} else if (nni_aio_list_active(aio)) {
132 		nni_aio_list_remove(aio);
133 		nni_aio_finish_error(aio, rv);
134 		nni_cv_wake(&c->cv);
135 	}
136 	nni_mtx_unlock(&c->mtx);
137 }
138 
139 static void
ipc_recv(void * arg,nni_aio * aio)140 ipc_recv(void *arg, nni_aio *aio)
141 {
142 	ipc_conn *c = arg;
143 	int       rv;
144 
145 	if (nni_aio_begin(aio) != 0) {
146 		return;
147 	}
148 	nni_mtx_lock(&c->mtx);
149 	if (c->closed) {
150 		nni_mtx_unlock(&c->mtx);
151 		nni_aio_finish_error(aio, NNG_ECLOSED);
152 		return;
153 	}
154 	if ((rv = nni_aio_schedule(aio, ipc_recv_cancel, c)) != 0) {
155 		nni_mtx_unlock(&c->mtx);
156 		nni_aio_finish_error(aio, rv);
157 		return;
158 	}
159 	nni_list_append(&c->recv_aios, aio);
160 	if (aio == nni_list_first(&c->recv_aios)) {
161 		ipc_recv_start(c);
162 	}
163 	nni_mtx_unlock(&c->mtx);
164 }
165 
166 static void
ipc_send_start(ipc_conn * c)167 ipc_send_start(ipc_conn *c)
168 {
169 	nni_aio *aio;
170 	unsigned idx;
171 	unsigned naiov;
172 	nni_iov *aiov;
173 	void *   buf;
174 	DWORD    len;
175 	int      rv;
176 
177 	if (c->closed) {
178 		while ((aio = nni_list_first(&c->send_aios)) != NULL) {
179 			nni_list_remove(&c->send_aios, aio);
180 			nni_aio_finish_error(aio, NNG_ECLOSED);
181 		}
182 		nni_cv_wake(&c->cv);
183 	}
184 again:
185 	if ((aio = nni_list_first(&c->send_aios)) == NULL) {
186 		return;
187 	}
188 
189 	nni_aio_get_iov(aio, &naiov, &aiov);
190 
191 	idx = 0;
192 	while ((idx < naiov) && (aiov[idx].iov_len == 0)) {
193 		idx++;
194 	}
195 	NNI_ASSERT(idx < naiov);
196 	// Now start a transfer.  We assume that only one send can be
197 	// outstanding on a pipe at a time.  This is important to avoid
198 	// scrambling the data anyway.  Note that Windows named pipes do
199 	// not appear to support scatter/gather, so we have to process
200 	// each element in turn.
201 	buf = aiov[idx].iov_buf;
202 	len = (DWORD) aiov[idx].iov_len;
203 	NNI_ASSERT(buf != NULL);
204 	NNI_ASSERT(len != 0);
205 
206 	// We limit ourselves to writing 16MB at a time.  Named Pipes
207 	// on Windows have limits of between 31 and 64MB.
208 	if (len > 0x1000000) {
209 		len = 0x1000000;
210 	}
211 
212 	if ((!WriteFile(c->f, buf, len, NULL, &c->send_io.olpd)) &&
213 	    ((rv = GetLastError()) != ERROR_IO_PENDING)) {
214 		// Synchronous failure.
215 		nni_aio_list_remove(aio);
216 		nni_aio_finish_error(aio, nni_win_error(rv));
217 		goto again;
218 	}
219 }
220 
221 static void
ipc_send_cb(nni_win_io * io,int rv,size_t num)222 ipc_send_cb(nni_win_io *io, int rv, size_t num)
223 {
224 	nni_aio * aio;
225 	ipc_conn *c = io->ptr;
226 	nni_mtx_lock(&c->mtx);
227 	if ((aio = nni_list_first(&c->send_aios)) == NULL) {
228 		// Should indicate that it was closed.
229 		nni_mtx_unlock(&c->mtx);
230 		return;
231 	}
232 	if (c->send_rv != 0) {
233 		rv         = c->send_rv;
234 		c->send_rv = 0;
235 	}
236 	nni_aio_list_remove(aio);
237 	ipc_send_start(c);
238 	if (c->closed) {
239 		nni_cv_wake(&c->cv);
240 	}
241 	nni_mtx_unlock(&c->mtx);
242 
243 	nni_aio_finish_sync(aio, rv, num);
244 }
245 
246 static void
ipc_send_cancel(nni_aio * aio,void * arg,int rv)247 ipc_send_cancel(nni_aio *aio, void *arg, int rv)
248 {
249 	ipc_conn *c = arg;
250 	nni_mtx_lock(&c->mtx);
251 	if (aio == nni_list_first(&c->send_aios)) {
252 		c->send_rv = rv;
253 		CancelIoEx(c->f, &c->send_io.olpd);
254 	} else if (nni_aio_list_active(aio)) {
255 		nni_aio_list_remove(aio);
256 		nni_aio_finish_error(aio, rv);
257 		nni_cv_wake(&c->cv);
258 	}
259 	nni_mtx_unlock(&c->mtx);
260 }
261 
262 static void
ipc_send(void * arg,nni_aio * aio)263 ipc_send(void *arg, nni_aio *aio)
264 {
265 	ipc_conn *c = arg;
266 	int       rv;
267 
268 	if (nni_aio_begin(aio) != 0) {
269 		return;
270 	}
271 	nni_mtx_lock(&c->mtx);
272 	if (c->closed) {
273 		nni_mtx_unlock(&c->mtx);
274 		nni_aio_finish_error(aio, NNG_ECLOSED);
275 		return;
276 	}
277 	if ((rv = nni_aio_schedule(aio, ipc_send_cancel, c)) != 0) {
278 		nni_mtx_unlock(&c->mtx);
279 		nni_aio_finish_error(aio, rv);
280 		return;
281 	}
282 	nni_list_append(&c->send_aios, aio);
283 	if (aio == nni_list_first(&c->send_aios)) {
284 		ipc_send_start(c);
285 	}
286 	nni_mtx_unlock(&c->mtx);
287 }
288 
289 static void
ipc_close(void * arg)290 ipc_close(void *arg)
291 {
292 	ipc_conn *c = arg;
293 	nni_mtx_lock(&c->mtx);
294 	if (!c->closed) {
295 		c->closed = true;
296 		if (!nni_list_empty(&c->recv_aios)) {
297 			CancelIoEx(c->f, &c->recv_io.olpd);
298 		}
299 		if (!nni_list_empty(&c->send_aios)) {
300 			CancelIoEx(c->f, &c->send_io.olpd);
301 		}
302 
303 		if (c->f != INVALID_HANDLE_VALUE) {
304 			// NB: closing the pipe is dangerous at this point.
305 			DisconnectNamedPipe(c->f);
306 		}
307 	}
308 	nni_mtx_unlock(&c->mtx);
309 }
310 
311 static void
ipc_conn_reap(void * arg)312 ipc_conn_reap(void *arg)
313 {
314 	ipc_conn *c = arg;
315 
316 	nni_mtx_lock(&c->mtx);
317 	while ((!nni_list_empty(&c->recv_aios)) ||
318 	    (!nni_list_empty(&c->send_aios))) {
319 		nni_cv_wait(&c->cv);
320 	}
321 	nni_mtx_unlock(&c->mtx);
322 
323 	nni_win_io_fini(&c->recv_io);
324 	nni_win_io_fini(&c->send_io);
325 	nni_win_io_fini(&c->conn_io);
326 
327 	if (c->f != INVALID_HANDLE_VALUE) {
328 		CloseHandle(c->f);
329 	}
330 	nni_cv_fini(&c->cv);
331 	nni_mtx_fini(&c->mtx);
332 	NNI_FREE_STRUCT(c);
333 }
334 
335 static nni_reap_list ipc_reap_list = {
336 	.rl_offset = offsetof(ipc_conn, reap),
337 	.rl_func   = ipc_conn_reap,
338 };
339 
340 static void
ipc_free(void * arg)341 ipc_free(void *arg)
342 {
343 	ipc_conn *c = arg;
344 	ipc_close(c);
345 
346 	nni_reap(&ipc_reap_list, c);
347 }
348 
349 static int
ipc_conn_get_addr(void * c,void * buf,size_t * szp,nni_opt_type t)350 ipc_conn_get_addr(void *c, void *buf, size_t *szp, nni_opt_type t)
351 {
352 	return (nni_copyout_sockaddr(&(CONN(c))->sa, buf, szp, t));
353 }
354 
355 static int
ipc_conn_get_peer_pid(void * c,void * buf,size_t * szp,nni_opt_type t)356 ipc_conn_get_peer_pid(void *c, void *buf, size_t *szp, nni_opt_type t)
357 {
358 	ULONG id;
359 
360 	if (CONN(c)->dialer) {
361 		if (!GetNamedPipeServerProcessId(CONN(c)->f, &id)) {
362 			return (nni_win_error(GetLastError()));
363 		}
364 	} else {
365 		if (!GetNamedPipeClientProcessId(CONN(c)->f, &id)) {
366 			return (nni_win_error(GetLastError()));
367 		}
368 	}
369 	return (nni_copyout_u64(id, buf, szp, t));
370 }
371 
372 static const nni_option ipc_conn_options[] = {
373 	{
374 	    .o_name = NNG_OPT_LOCADDR,
375 	    .o_get  = ipc_conn_get_addr,
376 	},
377 	{
378 	    .o_name = NNG_OPT_REMADDR,
379 	    .o_get  = ipc_conn_get_addr,
380 	},
381 	{
382 	    .o_name = NNG_OPT_IPC_PEER_PID,
383 	    .o_get  = ipc_conn_get_peer_pid,
384 	},
385 	{
386 	    .o_name = NULL, // terminator
387 	},
388 };
389 
390 static int
ipc_set(void * arg,const char * nm,const void * val,size_t sz,nni_opt_type t)391 ipc_set(void *arg, const char *nm, const void *val, size_t sz, nni_opt_type t)
392 {
393 	ipc_conn *c = arg;
394 	return (nni_setopt(ipc_conn_options, nm, c, val, sz, t));
395 }
396 
397 static int
ipc_get(void * arg,const char * nm,void * val,size_t * szp,nni_opt_type t)398 ipc_get(void *arg, const char *nm, void *val, size_t *szp, nni_opt_type t)
399 {
400 	ipc_conn *c = arg;
401 	return (nni_getopt(ipc_conn_options, nm, c, val, szp, t));
402 }
403 
404 int
nni_win_ipc_init(nng_stream ** connp,HANDLE p,const nng_sockaddr * sa,bool dialer)405 nni_win_ipc_init(
406     nng_stream **connp, HANDLE p, const nng_sockaddr *sa, bool dialer)
407 {
408 	ipc_conn *c;
409 	int       rv;
410 
411 	if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
412 		return (NNG_ENOMEM);
413 	}
414 	c->f = INVALID_HANDLE_VALUE;
415 	nni_mtx_init(&c->mtx);
416 	nni_cv_init(&c->cv, &c->mtx);
417 	nni_aio_list_init(&c->recv_aios);
418 	nni_aio_list_init(&c->send_aios);
419 	c->dialer         = dialer;
420 	c->sa             = *sa;
421 	c->stream.s_free  = ipc_free;
422 	c->stream.s_close = ipc_close;
423 	c->stream.s_send  = ipc_send;
424 	c->stream.s_recv  = ipc_recv;
425 	c->stream.s_get   = ipc_get;
426 	c->stream.s_set   = ipc_set;
427 
428 	if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
429 	    ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
430 		ipc_free(c);
431 		return (rv);
432 	}
433 
434 	c->f   = p;
435 	*connp = (void *) c;
436 	return (0);
437 }
438