1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include <assert.h>
23 #include <io.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27
28 #include "handle-inl.h"
29 #include "internal.h"
30 #include "req-inl.h"
31 #include "stream-inl.h"
32 #include "uv-common.h"
33 #include "uv.h"
34
35 #include <aclapi.h>
36 #include <accctrl.h>
37
38 /* A zero-size buffer for use by uv_pipe_read */
39 static char uv_zero_[] = "";
40
41 /* Null uv_buf_t */
42 static const uv_buf_t uv_null_buf_ = { 0, NULL };
43
44 /* The timeout that the pipe will wait for the remote end to write data when
45 * the local ends wants to shut it down. */
46 static const int64_t eof_timeout = 50; /* ms */
47
48 static const int default_pending_pipe_instances = 4;
49
50 /* Pipe prefix */
51 static char pipe_prefix[] = "\\\\?\\pipe";
52 static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
53
54 /* IPC incoming xfer queue item. */
55 typedef struct {
56 uv__ipc_socket_xfer_type_t xfer_type;
57 uv__ipc_socket_xfer_info_t xfer_info;
58 QUEUE member;
59 } uv__ipc_xfer_queue_item_t;
60
61 /* IPC frame header flags. */
62 /* clang-format off */
63 enum {
64 UV__IPC_FRAME_HAS_DATA = 0x01,
65 UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
66 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
67 /* These are combinations of the flags above. */
68 UV__IPC_FRAME_XFER_FLAGS = 0x06,
69 UV__IPC_FRAME_VALID_FLAGS = 0x07
70 };
71 /* clang-format on */
72
73 /* IPC frame header. */
74 typedef struct {
75 uint32_t flags;
76 uint32_t reserved1; /* Ignored. */
77 uint32_t data_length; /* Must be zero if there is no data. */
78 uint32_t reserved2; /* Must be zero. */
79 } uv__ipc_frame_header_t;
80
81 /* To implement the IPC protocol correctly, these structures must have exactly
82 * the right size. */
83 STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
84 STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85
86 /* Coalesced write request. */
87 typedef struct {
88 uv_write_t req; /* Internal heap-allocated write request. */
89 uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
90 } uv__coalesced_write_t;
91
92
93 static void eof_timer_init(uv_pipe_t* pipe);
94 static void eof_timer_start(uv_pipe_t* pipe);
95 static void eof_timer_stop(uv_pipe_t* pipe);
96 static void eof_timer_cb(uv_timer_t* timer);
97 static void eof_timer_destroy(uv_pipe_t* pipe);
98 static void eof_timer_close_cb(uv_handle_t* handle);
99
100
uv_unique_pipe_name(char * ptr,char * name,size_t size)101 static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
102 snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
103 }
104
105
uv_pipe_init(uv_loop_t * loop,uv_pipe_t * handle,int ipc)106 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
107 uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
108
109 handle->reqs_pending = 0;
110 handle->handle = INVALID_HANDLE_VALUE;
111 handle->name = NULL;
112 handle->pipe.conn.ipc_remote_pid = 0;
113 handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
114 QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
115 handle->pipe.conn.ipc_xfer_queue_length = 0;
116 handle->ipc = ipc;
117 handle->pipe.conn.non_overlapped_writes_tail = NULL;
118
119 return 0;
120 }
121
122
uv_pipe_connection_init(uv_pipe_t * handle)123 static void uv_pipe_connection_init(uv_pipe_t* handle) {
124 uv_connection_init((uv_stream_t*) handle);
125 handle->read_req.data = handle;
126 handle->pipe.conn.eof_timer = NULL;
127 assert(!(handle->flags & UV_HANDLE_PIPESERVER));
128 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
129 handle->pipe.conn.readfile_thread_handle = NULL;
130 InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
131 }
132 }
133
134
open_named_pipe(const WCHAR * name,DWORD * duplex_flags)135 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
136 HANDLE pipeHandle;
137
138 /*
139 * Assume that we have a duplex pipe first, so attempt to
140 * connect with GENERIC_READ | GENERIC_WRITE.
141 */
142 pipeHandle = CreateFileW(name,
143 GENERIC_READ | GENERIC_WRITE,
144 0,
145 NULL,
146 OPEN_EXISTING,
147 FILE_FLAG_OVERLAPPED,
148 NULL);
149 if (pipeHandle != INVALID_HANDLE_VALUE) {
150 *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
151 return pipeHandle;
152 }
153
154 /*
155 * If the pipe is not duplex CreateFileW fails with
156 * ERROR_ACCESS_DENIED. In that case try to connect
157 * as a read-only or write-only.
158 */
159 if (GetLastError() == ERROR_ACCESS_DENIED) {
160 pipeHandle = CreateFileW(name,
161 GENERIC_READ | FILE_WRITE_ATTRIBUTES,
162 0,
163 NULL,
164 OPEN_EXISTING,
165 FILE_FLAG_OVERLAPPED,
166 NULL);
167
168 if (pipeHandle != INVALID_HANDLE_VALUE) {
169 *duplex_flags = UV_HANDLE_READABLE;
170 return pipeHandle;
171 }
172 }
173
174 if (GetLastError() == ERROR_ACCESS_DENIED) {
175 pipeHandle = CreateFileW(name,
176 GENERIC_WRITE | FILE_READ_ATTRIBUTES,
177 0,
178 NULL,
179 OPEN_EXISTING,
180 FILE_FLAG_OVERLAPPED,
181 NULL);
182
183 if (pipeHandle != INVALID_HANDLE_VALUE) {
184 *duplex_flags = UV_HANDLE_WRITABLE;
185 return pipeHandle;
186 }
187 }
188
189 return INVALID_HANDLE_VALUE;
190 }
191
192
close_pipe(uv_pipe_t * pipe)193 static void close_pipe(uv_pipe_t* pipe) {
194 assert(pipe->u.fd == -1 || pipe->u.fd > 2);
195 if (pipe->u.fd == -1)
196 CloseHandle(pipe->handle);
197 else
198 close(pipe->u.fd);
199
200 pipe->u.fd = -1;
201 pipe->handle = INVALID_HANDLE_VALUE;
202 }
203
204
uv_stdio_pipe_server(uv_loop_t * loop,uv_pipe_t * handle,DWORD access,char * name,size_t nameSize)205 int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
206 char* name, size_t nameSize) {
207 HANDLE pipeHandle;
208 int err;
209 char* ptr = (char*)handle;
210
211 for (;;) {
212 uv_unique_pipe_name(ptr, name, nameSize);
213
214 pipeHandle = CreateNamedPipeA(name,
215 access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
216 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
217 NULL);
218
219 if (pipeHandle != INVALID_HANDLE_VALUE) {
220 /* No name collisions. We're done. */
221 break;
222 }
223
224 err = GetLastError();
225 if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
226 goto error;
227 }
228
229 /* Pipe name collision. Increment the pointer and try again. */
230 ptr++;
231 }
232
233 if (CreateIoCompletionPort(pipeHandle,
234 loop->iocp,
235 (ULONG_PTR)handle,
236 0) == NULL) {
237 err = GetLastError();
238 goto error;
239 }
240
241 uv_pipe_connection_init(handle);
242 handle->handle = pipeHandle;
243
244 return 0;
245
246 error:
247 if (pipeHandle != INVALID_HANDLE_VALUE) {
248 CloseHandle(pipeHandle);
249 }
250
251 return err;
252 }
253
254
uv_set_pipe_handle(uv_loop_t * loop,uv_pipe_t * handle,HANDLE pipeHandle,int fd,DWORD duplex_flags)255 static int uv_set_pipe_handle(uv_loop_t* loop,
256 uv_pipe_t* handle,
257 HANDLE pipeHandle,
258 int fd,
259 DWORD duplex_flags) {
260 NTSTATUS nt_status;
261 IO_STATUS_BLOCK io_status;
262 FILE_MODE_INFORMATION mode_info;
263 DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
264 DWORD current_mode = 0;
265 DWORD err = 0;
266
267 if (handle->flags & UV_HANDLE_PIPESERVER)
268 return UV_EINVAL;
269 if (handle->handle != INVALID_HANDLE_VALUE)
270 return UV_EBUSY;
271
272 if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
273 err = GetLastError();
274 if (err == ERROR_ACCESS_DENIED) {
275 /*
276 * SetNamedPipeHandleState can fail if the handle doesn't have either
277 * GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
278 * But if the handle already has the desired wait and blocking modes
279 * we can continue.
280 */
281 if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL,
282 NULL, NULL, 0)) {
283 return -1;
284 } else if (current_mode & PIPE_NOWAIT) {
285 SetLastError(ERROR_ACCESS_DENIED);
286 return -1;
287 }
288 } else {
289 /* If this returns ERROR_INVALID_PARAMETER we probably opened
290 * something that is not a pipe. */
291 if (err == ERROR_INVALID_PARAMETER) {
292 SetLastError(WSAENOTSOCK);
293 }
294 return -1;
295 }
296 }
297
298 /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
299 nt_status = pNtQueryInformationFile(pipeHandle,
300 &io_status,
301 &mode_info,
302 sizeof(mode_info),
303 FileModeInformation);
304 if (nt_status != STATUS_SUCCESS) {
305 return -1;
306 }
307
308 if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
309 mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
310 /* Non-overlapped pipe. */
311 handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
312 } else {
313 /* Overlapped pipe. Try to associate with IOCP. */
314 if (CreateIoCompletionPort(pipeHandle,
315 loop->iocp,
316 (ULONG_PTR) handle,
317 0) == NULL) {
318 handle->flags |= UV_HANDLE_EMULATE_IOCP;
319 }
320 }
321
322 handle->handle = pipeHandle;
323 handle->u.fd = fd;
324 handle->flags |= duplex_flags;
325
326 return 0;
327 }
328
329
pipe_alloc_accept(uv_loop_t * loop,uv_pipe_t * handle,uv_pipe_accept_t * req,BOOL firstInstance)330 static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
331 uv_pipe_accept_t* req, BOOL firstInstance) {
332 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
333
334 req->pipeHandle =
335 CreateNamedPipeW(handle->name,
336 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
337 (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
338 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
339 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
340
341 if (req->pipeHandle == INVALID_HANDLE_VALUE) {
342 return 0;
343 }
344
345 /* Associate it with IOCP so we can get events. */
346 if (CreateIoCompletionPort(req->pipeHandle,
347 loop->iocp,
348 (ULONG_PTR) handle,
349 0) == NULL) {
350 uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
351 }
352
353 /* Stash a handle in the server object for use from places such as
354 * getsockname and chmod. As we transfer ownership of these to client
355 * objects, we'll allocate new ones here. */
356 handle->handle = req->pipeHandle;
357
358 return 1;
359 }
360
361
pipe_shutdown_thread_proc(void * parameter)362 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
363 uv_loop_t* loop;
364 uv_pipe_t* handle;
365 uv_shutdown_t* req;
366
367 req = (uv_shutdown_t*) parameter;
368 assert(req);
369 handle = (uv_pipe_t*) req->handle;
370 assert(handle);
371 loop = handle->loop;
372 assert(loop);
373
374 FlushFileBuffers(handle->handle);
375
376 /* Post completed */
377 POST_COMPLETION_FOR_REQ(loop, req);
378
379 return 0;
380 }
381
382
uv_pipe_endgame(uv_loop_t * loop,uv_pipe_t * handle)383 void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
384 int err;
385 DWORD result;
386 uv_shutdown_t* req;
387 NTSTATUS nt_status;
388 IO_STATUS_BLOCK io_status;
389 FILE_PIPE_LOCAL_INFORMATION pipe_info;
390 uv__ipc_xfer_queue_item_t* xfer_queue_item;
391
392 if ((handle->flags & UV_HANDLE_CONNECTION) &&
393 handle->stream.conn.shutdown_req != NULL &&
394 handle->stream.conn.write_reqs_pending == 0) {
395 req = handle->stream.conn.shutdown_req;
396
397 /* Clear the shutdown_req field so we don't go here again. */
398 handle->stream.conn.shutdown_req = NULL;
399
400 if (handle->flags & UV_HANDLE_CLOSING) {
401 UNREGISTER_HANDLE_REQ(loop, handle, req);
402
403 /* Already closing. Cancel the shutdown. */
404 if (req->cb) {
405 req->cb(req, UV_ECANCELED);
406 }
407
408 DECREASE_PENDING_REQ_COUNT(handle);
409 return;
410 }
411
412 /* Try to avoid flushing the pipe buffer in the thread pool. */
413 nt_status = pNtQueryInformationFile(handle->handle,
414 &io_status,
415 &pipe_info,
416 sizeof pipe_info,
417 FilePipeLocalInformation);
418
419 if (nt_status != STATUS_SUCCESS) {
420 /* Failure */
421 UNREGISTER_HANDLE_REQ(loop, handle, req);
422
423 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
424 if (req->cb) {
425 err = pRtlNtStatusToDosError(nt_status);
426 req->cb(req, uv_translate_sys_error(err));
427 }
428
429 DECREASE_PENDING_REQ_COUNT(handle);
430 return;
431 }
432
433 if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
434 /* Short-circuit, no need to call FlushFileBuffers. */
435 uv_insert_pending_req(loop, (uv_req_t*) req);
436 return;
437 }
438
439 /* Run FlushFileBuffers in the thread pool. */
440 result = QueueUserWorkItem(pipe_shutdown_thread_proc,
441 req,
442 WT_EXECUTELONGFUNCTION);
443 if (result) {
444 return;
445
446 } else {
447 /* Failure. */
448 UNREGISTER_HANDLE_REQ(loop, handle, req);
449
450 handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
451 if (req->cb) {
452 err = GetLastError();
453 req->cb(req, uv_translate_sys_error(err));
454 }
455
456 DECREASE_PENDING_REQ_COUNT(handle);
457 return;
458 }
459 }
460
461 if (handle->flags & UV_HANDLE_CLOSING &&
462 handle->reqs_pending == 0) {
463 assert(!(handle->flags & UV_HANDLE_CLOSED));
464
465 if (handle->flags & UV_HANDLE_CONNECTION) {
466 /* Free pending sockets */
467 while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
468 QUEUE* q;
469 SOCKET socket;
470
471 q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
472 QUEUE_REMOVE(q);
473 xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
474
475 /* Materialize socket and close it */
476 socket = WSASocketW(FROM_PROTOCOL_INFO,
477 FROM_PROTOCOL_INFO,
478 FROM_PROTOCOL_INFO,
479 &xfer_queue_item->xfer_info.socket_info,
480 0,
481 WSA_FLAG_OVERLAPPED);
482 uv__free(xfer_queue_item);
483
484 if (socket != INVALID_SOCKET)
485 closesocket(socket);
486 }
487 handle->pipe.conn.ipc_xfer_queue_length = 0;
488
489 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
490 if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
491 UnregisterWait(handle->read_req.wait_handle);
492 handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
493 }
494 if (handle->read_req.event_handle != NULL) {
495 CloseHandle(handle->read_req.event_handle);
496 handle->read_req.event_handle = NULL;
497 }
498 }
499
500 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
501 DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
502 }
503
504 if (handle->flags & UV_HANDLE_PIPESERVER) {
505 assert(handle->pipe.serv.accept_reqs);
506 uv__free(handle->pipe.serv.accept_reqs);
507 handle->pipe.serv.accept_reqs = NULL;
508 }
509
510 uv__handle_close(handle);
511 }
512 }
513
514
uv_pipe_pending_instances(uv_pipe_t * handle,int count)515 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
516 if (handle->flags & UV_HANDLE_BOUND)
517 return;
518 handle->pipe.serv.pending_instances = count;
519 handle->flags |= UV_HANDLE_PIPESERVER;
520 }
521
522
523 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)524 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
525 uv_loop_t* loop = handle->loop;
526 int i, err, nameSize;
527 uv_pipe_accept_t* req;
528
529 if (handle->flags & UV_HANDLE_BOUND) {
530 return UV_EINVAL;
531 }
532
533 if (!name) {
534 return UV_EINVAL;
535 }
536
537 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
538 handle->pipe.serv.pending_instances = default_pending_pipe_instances;
539 }
540
541 handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
542 uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
543 if (!handle->pipe.serv.accept_reqs) {
544 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
545 }
546
547 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
548 req = &handle->pipe.serv.accept_reqs[i];
549 UV_REQ_INIT(req, UV_ACCEPT);
550 req->data = handle;
551 req->pipeHandle = INVALID_HANDLE_VALUE;
552 req->next_pending = NULL;
553 }
554
555 /* Convert name to UTF16. */
556 nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
557 handle->name = (WCHAR*)uv__malloc(nameSize);
558 if (!handle->name) {
559 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
560 }
561
562 if (!MultiByteToWideChar(CP_UTF8,
563 0,
564 name,
565 -1,
566 handle->name,
567 nameSize / sizeof(WCHAR))) {
568 err = GetLastError();
569 goto error;
570 }
571
572 /*
573 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
574 * If this fails then there's already a pipe server for the given pipe name.
575 */
576 if (!pipe_alloc_accept(loop,
577 handle,
578 &handle->pipe.serv.accept_reqs[0],
579 TRUE)) {
580 err = GetLastError();
581 if (err == ERROR_ACCESS_DENIED) {
582 err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
583 } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
584 err = WSAEACCES; /* Translates to UV_EACCES. */
585 }
586 goto error;
587 }
588
589 handle->pipe.serv.pending_accepts = NULL;
590 handle->flags |= UV_HANDLE_PIPESERVER;
591 handle->flags |= UV_HANDLE_BOUND;
592
593 return 0;
594
595 error:
596 if (handle->name) {
597 uv__free(handle->name);
598 handle->name = NULL;
599 }
600
601 return uv_translate_sys_error(err);
602 }
603
604
pipe_connect_thread_proc(void * parameter)605 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
606 uv_loop_t* loop;
607 uv_pipe_t* handle;
608 uv_connect_t* req;
609 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
610 DWORD duplex_flags;
611
612 req = (uv_connect_t*) parameter;
613 assert(req);
614 handle = (uv_pipe_t*) req->handle;
615 assert(handle);
616 loop = handle->loop;
617 assert(loop);
618
619 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
620 * for the pipe to become available with WaitNamedPipe. */
621 while (WaitNamedPipeW(handle->name, 30000)) {
622 /* The pipe is now available, try to connect. */
623 pipeHandle = open_named_pipe(handle->name, &duplex_flags);
624 if (pipeHandle != INVALID_HANDLE_VALUE) {
625 break;
626 }
627
628 SwitchToThread();
629 }
630
631 if (pipeHandle != INVALID_HANDLE_VALUE &&
632 !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) {
633 SET_REQ_SUCCESS(req);
634 } else {
635 SET_REQ_ERROR(req, GetLastError());
636 }
637
638 /* Post completed */
639 POST_COMPLETION_FOR_REQ(loop, req);
640
641 return 0;
642 }
643
644
uv_pipe_connect(uv_connect_t * req,uv_pipe_t * handle,const char * name,uv_connect_cb cb)645 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
646 const char* name, uv_connect_cb cb) {
647 uv_loop_t* loop = handle->loop;
648 int err, nameSize;
649 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
650 DWORD duplex_flags;
651
652 UV_REQ_INIT(req, UV_CONNECT);
653 req->handle = (uv_stream_t*) handle;
654 req->cb = cb;
655
656 /* Convert name to UTF16. */
657 nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
658 handle->name = (WCHAR*)uv__malloc(nameSize);
659 if (!handle->name) {
660 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
661 }
662
663 if (!MultiByteToWideChar(CP_UTF8,
664 0,
665 name,
666 -1,
667 handle->name,
668 nameSize / sizeof(WCHAR))) {
669 err = GetLastError();
670 goto error;
671 }
672
673 pipeHandle = open_named_pipe(handle->name, &duplex_flags);
674 if (pipeHandle == INVALID_HANDLE_VALUE) {
675 if (GetLastError() == ERROR_PIPE_BUSY) {
676 /* Wait for the server to make a pipe instance available. */
677 if (!QueueUserWorkItem(&pipe_connect_thread_proc,
678 req,
679 WT_EXECUTELONGFUNCTION)) {
680 err = GetLastError();
681 goto error;
682 }
683
684 REGISTER_HANDLE_REQ(loop, handle, req);
685 handle->reqs_pending++;
686
687 return;
688 }
689
690 err = GetLastError();
691 goto error;
692 }
693
694 assert(pipeHandle != INVALID_HANDLE_VALUE);
695
696 if (uv_set_pipe_handle(loop,
697 (uv_pipe_t*) req->handle,
698 pipeHandle,
699 -1,
700 duplex_flags)) {
701 err = GetLastError();
702 goto error;
703 }
704
705 SET_REQ_SUCCESS(req);
706 uv_insert_pending_req(loop, (uv_req_t*) req);
707 handle->reqs_pending++;
708 REGISTER_HANDLE_REQ(loop, handle, req);
709 return;
710
711 error:
712 if (handle->name) {
713 uv__free(handle->name);
714 handle->name = NULL;
715 }
716
717 if (pipeHandle != INVALID_HANDLE_VALUE) {
718 CloseHandle(pipeHandle);
719 }
720
721 /* Make this req pending reporting an error. */
722 SET_REQ_ERROR(req, err);
723 uv_insert_pending_req(loop, (uv_req_t*) req);
724 handle->reqs_pending++;
725 REGISTER_HANDLE_REQ(loop, handle, req);
726 return;
727 }
728
729
uv__pipe_interrupt_read(uv_pipe_t * handle)730 void uv__pipe_interrupt_read(uv_pipe_t* handle) {
731 BOOL r;
732
733 if (!(handle->flags & UV_HANDLE_READ_PENDING))
734 return; /* No pending reads. */
735 if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
736 return; /* Already cancelled. */
737 if (handle->handle == INVALID_HANDLE_VALUE)
738 return; /* Pipe handle closed. */
739
740 if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
741 /* Cancel asynchronous read. */
742 r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
743 assert(r || GetLastError() == ERROR_NOT_FOUND);
744
745 } else {
746 /* Cancel synchronous read (which is happening in the thread pool). */
747 HANDLE thread;
748 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
749
750 EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
751
752 thread = *thread_ptr;
753 if (thread == NULL) {
754 /* The thread pool thread has not yet reached the point of blocking, we
755 * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
756 *thread_ptr = INVALID_HANDLE_VALUE;
757
758 } else {
759 /* Spin until the thread has acknowledged (by setting the thread to
760 * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
761 while (thread != INVALID_HANDLE_VALUE) {
762 r = CancelSynchronousIo(thread);
763 assert(r || GetLastError() == ERROR_NOT_FOUND);
764 SwitchToThread(); /* Yield thread. */
765 thread = *thread_ptr;
766 }
767 }
768
769 LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
770 }
771
772 /* Set flag to indicate that read has been cancelled. */
773 handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
774 }
775
776
uv__pipe_read_stop(uv_pipe_t * handle)777 void uv__pipe_read_stop(uv_pipe_t* handle) {
778 handle->flags &= ~UV_HANDLE_READING;
779 DECREASE_ACTIVE_COUNT(handle->loop, handle);
780
781 uv__pipe_interrupt_read(handle);
782 }
783
784
785 /* Cleans up uv_pipe_t (server or connection) and all resources associated with
786 * it. */
uv_pipe_cleanup(uv_loop_t * loop,uv_pipe_t * handle)787 void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
788 int i;
789 HANDLE pipeHandle;
790
791 uv__pipe_interrupt_read(handle);
792
793 if (handle->name) {
794 uv__free(handle->name);
795 handle->name = NULL;
796 }
797
798 if (handle->flags & UV_HANDLE_PIPESERVER) {
799 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
800 pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
801 if (pipeHandle != INVALID_HANDLE_VALUE) {
802 CloseHandle(pipeHandle);
803 handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
804 }
805 }
806 handle->handle = INVALID_HANDLE_VALUE;
807 }
808
809 if (handle->flags & UV_HANDLE_CONNECTION) {
810 handle->flags &= ~UV_HANDLE_WRITABLE;
811 eof_timer_destroy(handle);
812 }
813
814 if ((handle->flags & UV_HANDLE_CONNECTION)
815 && handle->handle != INVALID_HANDLE_VALUE)
816 close_pipe(handle);
817 }
818
819
uv_pipe_close(uv_loop_t * loop,uv_pipe_t * handle)820 void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
821 if (handle->flags & UV_HANDLE_READING) {
822 handle->flags &= ~UV_HANDLE_READING;
823 DECREASE_ACTIVE_COUNT(loop, handle);
824 }
825
826 if (handle->flags & UV_HANDLE_LISTENING) {
827 handle->flags &= ~UV_HANDLE_LISTENING;
828 DECREASE_ACTIVE_COUNT(loop, handle);
829 }
830
831 uv_pipe_cleanup(loop, handle);
832
833 if (handle->reqs_pending == 0) {
834 uv_want_endgame(loop, (uv_handle_t*) handle);
835 }
836
837 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
838 uv__handle_closing(handle);
839 }
840
841
uv_pipe_queue_accept(uv_loop_t * loop,uv_pipe_t * handle,uv_pipe_accept_t * req,BOOL firstInstance)842 static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
843 uv_pipe_accept_t* req, BOOL firstInstance) {
844 assert(handle->flags & UV_HANDLE_LISTENING);
845
846 if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
847 SET_REQ_ERROR(req, GetLastError());
848 uv_insert_pending_req(loop, (uv_req_t*) req);
849 handle->reqs_pending++;
850 return;
851 }
852
853 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
854
855 /* Prepare the overlapped structure. */
856 memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
857
858 if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
859 GetLastError() != ERROR_IO_PENDING) {
860 if (GetLastError() == ERROR_PIPE_CONNECTED) {
861 SET_REQ_SUCCESS(req);
862 } else {
863 CloseHandle(req->pipeHandle);
864 req->pipeHandle = INVALID_HANDLE_VALUE;
865 /* Make this req pending reporting an error. */
866 SET_REQ_ERROR(req, GetLastError());
867 }
868 uv_insert_pending_req(loop, (uv_req_t*) req);
869 handle->reqs_pending++;
870 return;
871 }
872
873 /* Wait for completion via IOCP */
874 handle->reqs_pending++;
875 }
876
877
uv_pipe_accept(uv_pipe_t * server,uv_stream_t * client)878 int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
879 uv_loop_t* loop = server->loop;
880 uv_pipe_t* pipe_client;
881 uv_pipe_accept_t* req;
882 QUEUE* q;
883 uv__ipc_xfer_queue_item_t* item;
884 int err;
885
886 if (server->ipc) {
887 if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
888 /* No valid pending sockets. */
889 return WSAEWOULDBLOCK;
890 }
891
892 q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
893 QUEUE_REMOVE(q);
894 server->pipe.conn.ipc_xfer_queue_length--;
895 item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
896
897 err = uv__tcp_xfer_import(
898 (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
899 if (err != 0)
900 return err;
901
902 uv__free(item);
903
904 } else {
905 pipe_client = (uv_pipe_t*) client;
906
907 /* Find a connection instance that has been connected, but not yet
908 * accepted. */
909 req = server->pipe.serv.pending_accepts;
910
911 if (!req) {
912 /* No valid connections found, so we error out. */
913 return WSAEWOULDBLOCK;
914 }
915
916 /* Initialize the client handle and copy the pipeHandle to the client */
917 uv_pipe_connection_init(pipe_client);
918 pipe_client->handle = req->pipeHandle;
919 pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
920
921 /* Prepare the req to pick up a new connection */
922 server->pipe.serv.pending_accepts = req->next_pending;
923 req->next_pending = NULL;
924 req->pipeHandle = INVALID_HANDLE_VALUE;
925
926 server->handle = INVALID_HANDLE_VALUE;
927 if (!(server->flags & UV_HANDLE_CLOSING)) {
928 uv_pipe_queue_accept(loop, server, req, FALSE);
929 }
930 }
931
932 return 0;
933 }
934
935
936 /* Starts listening for connections for the given pipe. */
uv_pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)937 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
938 uv_loop_t* loop = handle->loop;
939 int i;
940
941 if (handle->flags & UV_HANDLE_LISTENING) {
942 handle->stream.serv.connection_cb = cb;
943 }
944
945 if (!(handle->flags & UV_HANDLE_BOUND)) {
946 return WSAEINVAL;
947 }
948
949 if (handle->flags & UV_HANDLE_READING) {
950 return WSAEISCONN;
951 }
952
953 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
954 return ERROR_NOT_SUPPORTED;
955 }
956
957 if (handle->ipc) {
958 return WSAEINVAL;
959 }
960
961 handle->flags |= UV_HANDLE_LISTENING;
962 INCREASE_ACTIVE_COUNT(loop, handle);
963 handle->stream.serv.connection_cb = cb;
964
965 /* First pipe handle should have already been created in uv_pipe_bind */
966 assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
967
968 for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
969 uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
970 }
971
972 return 0;
973 }
974
975
uv_pipe_zero_readfile_thread_proc(void * arg)976 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
977 uv_read_t* req = (uv_read_t*) arg;
978 uv_pipe_t* handle = (uv_pipe_t*) req->data;
979 uv_loop_t* loop = handle->loop;
980 volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
981 CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
982 HANDLE thread;
983 DWORD bytes;
984 DWORD err;
985
986 assert(req->type == UV_READ);
987 assert(handle->type == UV_NAMED_PIPE);
988
989 err = 0;
990
991 /* Create a handle to the current thread. */
992 if (!DuplicateHandle(GetCurrentProcess(),
993 GetCurrentThread(),
994 GetCurrentProcess(),
995 &thread,
996 0,
997 FALSE,
998 DUPLICATE_SAME_ACCESS)) {
999 err = GetLastError();
1000 goto out1;
1001 }
1002
1003 /* The lock needs to be held when thread handle is modified. */
1004 EnterCriticalSection(lock);
1005 if (*thread_ptr == INVALID_HANDLE_VALUE) {
1006 /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1007 err = ERROR_OPERATION_ABORTED;
1008 } else {
1009 /* Let main thread know which worker thread is doing the blocking read. */
1010 assert(*thread_ptr == NULL);
1011 *thread_ptr = thread;
1012 }
1013 LeaveCriticalSection(lock);
1014
1015 if (err)
1016 goto out2;
1017
1018 /* Block the thread until data is available on the pipe, or the read is
1019 * cancelled. */
1020 if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1021 err = GetLastError();
1022
1023 /* Let the main thread know the worker is past the point of blocking. */
1024 assert(thread == *thread_ptr);
1025 *thread_ptr = INVALID_HANDLE_VALUE;
1026
1027 /* Briefly acquire the mutex. Since the main thread holds the lock while it
1028 * is spinning trying to cancel this thread's I/O, we will block here until
1029 * it stops doing that. */
1030 EnterCriticalSection(lock);
1031 LeaveCriticalSection(lock);
1032
1033 out2:
1034 /* Close the handle to the current thread. */
1035 CloseHandle(thread);
1036
1037 out1:
1038 /* Set request status and post a completion record to the IOCP. */
1039 if (err)
1040 SET_REQ_ERROR(req, err);
1041 else
1042 SET_REQ_SUCCESS(req);
1043 POST_COMPLETION_FOR_REQ(loop, req);
1044
1045 return 0;
1046 }
1047
1048
uv_pipe_writefile_thread_proc(void * parameter)1049 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1050 int result;
1051 DWORD bytes;
1052 uv_write_t* req = (uv_write_t*) parameter;
1053 uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1054 uv_loop_t* loop = handle->loop;
1055
1056 assert(req != NULL);
1057 assert(req->type == UV_WRITE);
1058 assert(handle->type == UV_NAMED_PIPE);
1059 assert(req->write_buffer.base);
1060
1061 result = WriteFile(handle->handle,
1062 req->write_buffer.base,
1063 req->write_buffer.len,
1064 &bytes,
1065 NULL);
1066
1067 if (!result) {
1068 SET_REQ_ERROR(req, GetLastError());
1069 }
1070
1071 POST_COMPLETION_FOR_REQ(loop, req);
1072 return 0;
1073 }
1074
1075
post_completion_read_wait(void * context,BOOLEAN timed_out)1076 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1077 uv_read_t* req;
1078 uv_tcp_t* handle;
1079
1080 req = (uv_read_t*) context;
1081 assert(req != NULL);
1082 handle = (uv_tcp_t*)req->data;
1083 assert(handle != NULL);
1084 assert(!timed_out);
1085
1086 if (!PostQueuedCompletionStatus(handle->loop->iocp,
1087 req->u.io.overlapped.InternalHigh,
1088 0,
1089 &req->u.io.overlapped)) {
1090 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1091 }
1092 }
1093
1094
post_completion_write_wait(void * context,BOOLEAN timed_out)1095 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1096 uv_write_t* req;
1097 uv_tcp_t* handle;
1098
1099 req = (uv_write_t*) context;
1100 assert(req != NULL);
1101 handle = (uv_tcp_t*)req->handle;
1102 assert(handle != NULL);
1103 assert(!timed_out);
1104
1105 if (!PostQueuedCompletionStatus(handle->loop->iocp,
1106 req->u.io.overlapped.InternalHigh,
1107 0,
1108 &req->u.io.overlapped)) {
1109 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1110 }
1111 }
1112
1113
uv_pipe_queue_read(uv_loop_t * loop,uv_pipe_t * handle)1114 static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
1115 uv_read_t* req;
1116 int result;
1117
1118 assert(handle->flags & UV_HANDLE_READING);
1119 assert(!(handle->flags & UV_HANDLE_READ_PENDING));
1120
1121 assert(handle->handle != INVALID_HANDLE_VALUE);
1122
1123 req = &handle->read_req;
1124
1125 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1126 handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
1127 if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
1128 req,
1129 WT_EXECUTELONGFUNCTION)) {
1130 /* Make this req pending reporting an error. */
1131 SET_REQ_ERROR(req, GetLastError());
1132 goto error;
1133 }
1134 } else {
1135 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1136 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1137 assert(req->event_handle != NULL);
1138 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1139 }
1140
1141 /* Do 0-read */
1142 result = ReadFile(handle->handle,
1143 &uv_zero_,
1144 0,
1145 NULL,
1146 &req->u.io.overlapped);
1147
1148 if (!result && GetLastError() != ERROR_IO_PENDING) {
1149 /* Make this req pending reporting an error. */
1150 SET_REQ_ERROR(req, GetLastError());
1151 goto error;
1152 }
1153
1154 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1155 if (req->wait_handle == INVALID_HANDLE_VALUE) {
1156 if (!RegisterWaitForSingleObject(&req->wait_handle,
1157 req->event_handle, post_completion_read_wait, (void*) req,
1158 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1159 SET_REQ_ERROR(req, GetLastError());
1160 goto error;
1161 }
1162 }
1163 }
1164 }
1165
1166 /* Start the eof timer if there is one */
1167 eof_timer_start(handle);
1168 handle->flags |= UV_HANDLE_READ_PENDING;
1169 handle->reqs_pending++;
1170 return;
1171
1172 error:
1173 uv_insert_pending_req(loop, (uv_req_t*)req);
1174 handle->flags |= UV_HANDLE_READ_PENDING;
1175 handle->reqs_pending++;
1176 }
1177
1178
uv_pipe_read_start(uv_pipe_t * handle,uv_alloc_cb alloc_cb,uv_read_cb read_cb)1179 int uv_pipe_read_start(uv_pipe_t* handle,
1180 uv_alloc_cb alloc_cb,
1181 uv_read_cb read_cb) {
1182 uv_loop_t* loop = handle->loop;
1183
1184 handle->flags |= UV_HANDLE_READING;
1185 INCREASE_ACTIVE_COUNT(loop, handle);
1186 handle->read_cb = read_cb;
1187 handle->alloc_cb = alloc_cb;
1188
1189 /* If reading was stopped and then started again, there could still be a read
1190 * request pending. */
1191 if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
1192 if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
1193 handle->read_req.event_handle == NULL) {
1194 handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
1195 if (handle->read_req.event_handle == NULL) {
1196 uv_fatal_error(GetLastError(), "CreateEvent");
1197 }
1198 }
1199 uv_pipe_queue_read(loop, handle);
1200 }
1201
1202 return 0;
1203 }
1204
1205
uv_insert_non_overlapped_write_req(uv_pipe_t * handle,uv_write_t * req)1206 static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
1207 uv_write_t* req) {
1208 req->next_req = NULL;
1209 if (handle->pipe.conn.non_overlapped_writes_tail) {
1210 req->next_req =
1211 handle->pipe.conn.non_overlapped_writes_tail->next_req;
1212 handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1213 handle->pipe.conn.non_overlapped_writes_tail = req;
1214 } else {
1215 req->next_req = (uv_req_t*)req;
1216 handle->pipe.conn.non_overlapped_writes_tail = req;
1217 }
1218 }
1219
1220
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1221 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1222 uv_write_t* req;
1223
1224 if (handle->pipe.conn.non_overlapped_writes_tail) {
1225 req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1226
1227 if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1228 handle->pipe.conn.non_overlapped_writes_tail = NULL;
1229 } else {
1230 handle->pipe.conn.non_overlapped_writes_tail->next_req =
1231 req->next_req;
1232 }
1233
1234 return req;
1235 } else {
1236 /* queue empty */
1237 return NULL;
1238 }
1239 }
1240
1241
uv_queue_non_overlapped_write(uv_pipe_t * handle)1242 static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
1243 uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1244 if (req) {
1245 if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1246 req,
1247 WT_EXECUTELONGFUNCTION)) {
1248 uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1249 }
1250 }
1251 }
1252
1253
uv__build_coalesced_write_req(uv_write_t * user_req,const uv_buf_t bufs[],size_t nbufs,uv_write_t ** req_out,uv_buf_t * write_buf_out)1254 static int uv__build_coalesced_write_req(uv_write_t* user_req,
1255 const uv_buf_t bufs[],
1256 size_t nbufs,
1257 uv_write_t** req_out,
1258 uv_buf_t* write_buf_out) {
1259 /* Pack into a single heap-allocated buffer:
1260 * (a) a uv_write_t structure where libuv stores the actual state.
1261 * (b) a pointer to the original uv_write_t.
1262 * (c) data from all `bufs` entries.
1263 */
1264 char* heap_buffer;
1265 size_t heap_buffer_length, heap_buffer_offset;
1266 uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
1267 char* data_start; /* (c) */
1268 size_t data_length;
1269 unsigned int i;
1270
1271 /* Compute combined size of all combined buffers from `bufs`. */
1272 data_length = 0;
1273 for (i = 0; i < nbufs; i++)
1274 data_length += bufs[i].len;
1275
1276 /* The total combined size of data buffers should not exceed UINT32_MAX,
1277 * because WriteFile() won't accept buffers larger than that. */
1278 if (data_length > UINT32_MAX)
1279 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1280
1281 /* Compute heap buffer size. */
1282 heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
1283 data_length; /* (c) */
1284
1285 /* Allocate buffer. */
1286 heap_buffer = uv__malloc(heap_buffer_length);
1287 if (heap_buffer == NULL)
1288 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1289
1290 /* Copy uv_write_t information to the buffer. */
1291 coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
1292 coalesced_write_req->req = *user_req; /* copy (a) */
1293 coalesced_write_req->req.coalesced = 1;
1294 coalesced_write_req->user_req = user_req; /* copy (b) */
1295 heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
1296
1297 /* Copy data buffers to the heap buffer. */
1298 data_start = &heap_buffer[heap_buffer_offset];
1299 for (i = 0; i < nbufs; i++) {
1300 memcpy(&heap_buffer[heap_buffer_offset],
1301 bufs[i].base,
1302 bufs[i].len); /* copy (c) */
1303 heap_buffer_offset += bufs[i].len; /* offset (c) */
1304 }
1305 assert(heap_buffer_offset == heap_buffer_length);
1306
1307 /* Set out arguments and return. */
1308 *req_out = &coalesced_write_req->req;
1309 *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
1310 return 0;
1311 }
1312
1313
uv__pipe_write_data(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],size_t nbufs,uv_write_cb cb,int copy_always)1314 static int uv__pipe_write_data(uv_loop_t* loop,
1315 uv_write_t* req,
1316 uv_pipe_t* handle,
1317 const uv_buf_t bufs[],
1318 size_t nbufs,
1319 uv_write_cb cb,
1320 int copy_always) {
1321 int err;
1322 int result;
1323 uv_buf_t write_buf;
1324
1325 assert(handle->handle != INVALID_HANDLE_VALUE);
1326
1327 UV_REQ_INIT(req, UV_WRITE);
1328 req->handle = (uv_stream_t*) handle;
1329 req->send_handle = NULL;
1330 req->cb = cb;
1331 /* Private fields. */
1332 req->coalesced = 0;
1333 req->event_handle = NULL;
1334 req->wait_handle = INVALID_HANDLE_VALUE;
1335
1336 /* Prepare the overlapped structure. */
1337 memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1338 if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
1339 req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1340 if (req->event_handle == NULL) {
1341 uv_fatal_error(GetLastError(), "CreateEvent");
1342 }
1343 req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1344 }
1345 req->write_buffer = uv_null_buf_;
1346
1347 if (nbufs == 0) {
1348 /* Write empty buffer. */
1349 write_buf = uv_null_buf_;
1350 } else if (nbufs == 1 && !copy_always) {
1351 /* Write directly from bufs[0]. */
1352 write_buf = bufs[0];
1353 } else {
1354 /* Coalesce all `bufs` into one big buffer. This also creates a new
1355 * write-request structure that replaces the old one. */
1356 err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
1357 if (err != 0)
1358 return err;
1359 }
1360
1361 if ((handle->flags &
1362 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
1363 (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1364 DWORD bytes;
1365 result =
1366 WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
1367
1368 if (!result) {
1369 err = GetLastError();
1370 return err;
1371 } else {
1372 /* Request completed immediately. */
1373 req->u.io.queued_bytes = 0;
1374 }
1375
1376 REGISTER_HANDLE_REQ(loop, handle, req);
1377 handle->reqs_pending++;
1378 handle->stream.conn.write_reqs_pending++;
1379 POST_COMPLETION_FOR_REQ(loop, req);
1380 return 0;
1381 } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1382 req->write_buffer = write_buf;
1383 uv_insert_non_overlapped_write_req(handle, req);
1384 if (handle->stream.conn.write_reqs_pending == 0) {
1385 uv_queue_non_overlapped_write(handle);
1386 }
1387
1388 /* Request queued by the kernel. */
1389 req->u.io.queued_bytes = write_buf.len;
1390 handle->write_queue_size += req->u.io.queued_bytes;
1391 } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
1392 /* Using overlapped IO, but wait for completion before returning */
1393 result = WriteFile(handle->handle,
1394 write_buf.base,
1395 write_buf.len,
1396 NULL,
1397 &req->u.io.overlapped);
1398
1399 if (!result && GetLastError() != ERROR_IO_PENDING) {
1400 err = GetLastError();
1401 CloseHandle(req->event_handle);
1402 req->event_handle = NULL;
1403 return err;
1404 }
1405
1406 if (result) {
1407 /* Request completed immediately. */
1408 req->u.io.queued_bytes = 0;
1409 } else {
1410 /* Request queued by the kernel. */
1411 req->u.io.queued_bytes = write_buf.len;
1412 handle->write_queue_size += req->u.io.queued_bytes;
1413 if (WaitForSingleObject(req->event_handle, INFINITE) !=
1414 WAIT_OBJECT_0) {
1415 err = GetLastError();
1416 CloseHandle(req->event_handle);
1417 req->event_handle = NULL;
1418 return err;
1419 }
1420 }
1421 CloseHandle(req->event_handle);
1422 req->event_handle = NULL;
1423
1424 REGISTER_HANDLE_REQ(loop, handle, req);
1425 handle->reqs_pending++;
1426 handle->stream.conn.write_reqs_pending++;
1427 return 0;
1428 } else {
1429 result = WriteFile(handle->handle,
1430 write_buf.base,
1431 write_buf.len,
1432 NULL,
1433 &req->u.io.overlapped);
1434
1435 if (!result && GetLastError() != ERROR_IO_PENDING) {
1436 return GetLastError();
1437 }
1438
1439 if (result) {
1440 /* Request completed immediately. */
1441 req->u.io.queued_bytes = 0;
1442 } else {
1443 /* Request queued by the kernel. */
1444 req->u.io.queued_bytes = write_buf.len;
1445 handle->write_queue_size += req->u.io.queued_bytes;
1446 }
1447
1448 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1449 if (!RegisterWaitForSingleObject(&req->wait_handle,
1450 req->event_handle, post_completion_write_wait, (void*) req,
1451 INFINITE, WT_EXECUTEINWAITTHREAD)) {
1452 return GetLastError();
1453 }
1454 }
1455 }
1456
1457 REGISTER_HANDLE_REQ(loop, handle, req);
1458 handle->reqs_pending++;
1459 handle->stream.conn.write_reqs_pending++;
1460
1461 return 0;
1462 }
1463
1464
uv__pipe_get_ipc_remote_pid(uv_pipe_t * handle)1465 static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1466 DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1467
1468 /* If the both ends of the IPC pipe are owned by the same process,
1469 * the remote end pid may not yet be set. If so, do it here.
1470 * TODO: this is weird; it'd probably better to use a handshake. */
1471 if (*pid == 0)
1472 *pid = GetCurrentProcessId();
1473
1474 return *pid;
1475 }
1476
1477
uv__pipe_write_ipc(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t data_bufs[],size_t data_buf_count,uv_stream_t * send_handle,uv_write_cb cb)1478 int uv__pipe_write_ipc(uv_loop_t* loop,
1479 uv_write_t* req,
1480 uv_pipe_t* handle,
1481 const uv_buf_t data_bufs[],
1482 size_t data_buf_count,
1483 uv_stream_t* send_handle,
1484 uv_write_cb cb) {
1485 uv_buf_t stack_bufs[6];
1486 uv_buf_t* bufs;
1487 size_t buf_count, buf_index;
1488 uv__ipc_frame_header_t frame_header;
1489 uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
1490 uv__ipc_socket_xfer_info_t xfer_info;
1491 uint64_t data_length;
1492 size_t i;
1493 int err;
1494
1495 /* Compute the combined size of data buffers. */
1496 data_length = 0;
1497 for (i = 0; i < data_buf_count; i++)
1498 data_length += data_bufs[i].len;
1499 if (data_length > UINT32_MAX)
1500 return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1501
1502 /* Prepare the frame's socket xfer payload. */
1503 if (send_handle != NULL) {
1504 uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
1505
1506 /* Verify that `send_handle` it is indeed a tcp handle. */
1507 if (send_tcp_handle->type != UV_TCP)
1508 return ERROR_NOT_SUPPORTED;
1509
1510 /* Export the tcp handle. */
1511 err = uv__tcp_xfer_export(send_tcp_handle,
1512 uv__pipe_get_ipc_remote_pid(handle),
1513 &xfer_type,
1514 &xfer_info);
1515 if (err != 0)
1516 return err;
1517 }
1518
1519 /* Compute the number of uv_buf_t's required. */
1520 buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
1521 if (send_handle != NULL)
1522 buf_count += 1; /* One extra for the socket xfer information. */
1523
1524 /* Use the on-stack buffer array if it is big enough; otherwise allocate
1525 * space for it on the heap. */
1526 if (buf_count < ARRAY_SIZE(stack_bufs)) {
1527 /* Use on-stack buffer array. */
1528 bufs = stack_bufs;
1529 } else {
1530 /* Use heap-allocated buffer array. */
1531 bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
1532 if (bufs == NULL)
1533 return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1534 }
1535 buf_index = 0;
1536
1537 /* Initialize frame header and add it to the buffers list. */
1538 memset(&frame_header, 0, sizeof frame_header);
1539 bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
1540
1541 if (send_handle != NULL) {
1542 /* Add frame header flags. */
1543 switch (xfer_type) {
1544 case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
1545 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
1546 UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
1547 break;
1548 case UV__IPC_SOCKET_XFER_TCP_SERVER:
1549 frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
1550 break;
1551 default:
1552 assert(0); /* Unreachable. */
1553 }
1554 /* Add xfer info buffer. */
1555 bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
1556 }
1557
1558 if (data_length > 0) {
1559 /* Update frame header. */
1560 frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
1561 frame_header.data_length = (uint32_t) data_length;
1562 /* Add data buffers to buffers list. */
1563 for (i = 0; i < data_buf_count; i++)
1564 bufs[buf_index++] = data_bufs[i];
1565 }
1566
1567 /* Write buffers. We set the `always_copy` flag, so it is not a problem that
1568 * some of the written data lives on the stack. */
1569 err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
1570
1571 /* If we had to heap-allocate the bufs array, free it now. */
1572 if (bufs != stack_bufs) {
1573 uv__free(bufs);
1574 }
1575
1576 return err;
1577 }
1578
1579
uv__pipe_write(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],size_t nbufs,uv_stream_t * send_handle,uv_write_cb cb)1580 int uv__pipe_write(uv_loop_t* loop,
1581 uv_write_t* req,
1582 uv_pipe_t* handle,
1583 const uv_buf_t bufs[],
1584 size_t nbufs,
1585 uv_stream_t* send_handle,
1586 uv_write_cb cb) {
1587 if (handle->ipc) {
1588 /* IPC pipe write: use framing protocol. */
1589 return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
1590 } else {
1591 /* Non-IPC pipe write: put data on the wire directly. */
1592 assert(send_handle == NULL);
1593 return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
1594 }
1595 }
1596
1597
uv_pipe_read_eof(uv_loop_t * loop,uv_pipe_t * handle,uv_buf_t buf)1598 static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1599 uv_buf_t buf) {
1600 /* If there is an eof timer running, we don't need it any more, so discard
1601 * it. */
1602 eof_timer_destroy(handle);
1603
1604 handle->flags &= ~UV_HANDLE_READABLE;
1605 uv_read_stop((uv_stream_t*) handle);
1606
1607 handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
1608 }
1609
1610
uv_pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1611 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1612 uv_buf_t buf) {
1613 /* If there is an eof timer running, we don't need it any more, so discard
1614 * it. */
1615 eof_timer_destroy(handle);
1616
1617 uv_read_stop((uv_stream_t*) handle);
1618
1619 handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1620 }
1621
1622
uv_pipe_read_error_or_eof(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1623 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1624 int error, uv_buf_t buf) {
1625 if (error == ERROR_BROKEN_PIPE) {
1626 uv_pipe_read_eof(loop, handle, buf);
1627 } else {
1628 uv_pipe_read_error(loop, handle, error, buf);
1629 }
1630 }
1631
1632
uv__pipe_queue_ipc_xfer_info(uv_pipe_t * handle,uv__ipc_socket_xfer_type_t xfer_type,uv__ipc_socket_xfer_info_t * xfer_info)1633 static void uv__pipe_queue_ipc_xfer_info(
1634 uv_pipe_t* handle,
1635 uv__ipc_socket_xfer_type_t xfer_type,
1636 uv__ipc_socket_xfer_info_t* xfer_info) {
1637 uv__ipc_xfer_queue_item_t* item;
1638
1639 item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
1640 if (item == NULL)
1641 uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1642
1643 item->xfer_type = xfer_type;
1644 item->xfer_info = *xfer_info;
1645
1646 QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
1647 handle->pipe.conn.ipc_xfer_queue_length++;
1648 }
1649
1650
1651 /* Read an exact number of bytes from a pipe. If an error or end-of-file is
1652 * encountered before the requested number of bytes are read, an error is
1653 * returned. */
uv__pipe_read_exactly(HANDLE h,void * buffer,DWORD count)1654 static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1655 DWORD bytes_read, bytes_read_now;
1656
1657 bytes_read = 0;
1658 while (bytes_read < count) {
1659 if (!ReadFile(h,
1660 (char*) buffer + bytes_read,
1661 count - bytes_read,
1662 &bytes_read_now,
1663 NULL)) {
1664 return GetLastError();
1665 }
1666
1667 bytes_read += bytes_read_now;
1668 }
1669
1670 assert(bytes_read == count);
1671 return 0;
1672 }
1673
1674
uv__pipe_read_data(uv_loop_t * loop,uv_pipe_t * handle,DWORD suggested_bytes,DWORD max_bytes)1675 static DWORD uv__pipe_read_data(uv_loop_t* loop,
1676 uv_pipe_t* handle,
1677 DWORD suggested_bytes,
1678 DWORD max_bytes) {
1679 DWORD bytes_read;
1680 uv_buf_t buf;
1681
1682 /* Ask the user for a buffer to read data into. */
1683 buf = uv_buf_init(NULL, 0);
1684 handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
1685 if (buf.base == NULL || buf.len == 0) {
1686 handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1687 return 0; /* Break out of read loop. */
1688 }
1689
1690 /* Ensure we read at most the smaller of:
1691 * (a) the length of the user-allocated buffer.
1692 * (b) the maximum data length as specified by the `max_bytes` argument.
1693 */
1694 if (max_bytes > buf.len)
1695 max_bytes = buf.len;
1696
1697 /* Read into the user buffer. */
1698 if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
1699 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
1700 return 0; /* Break out of read loop. */
1701 }
1702
1703 /* Call the read callback. */
1704 handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
1705
1706 return bytes_read;
1707 }
1708
1709
uv__pipe_read_ipc(uv_loop_t * loop,uv_pipe_t * handle)1710 static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1711 uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1712 int err;
1713
1714 if (*data_remaining > 0) {
1715 /* Read frame data payload. */
1716 DWORD bytes_read =
1717 uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1718 *data_remaining -= bytes_read;
1719 return bytes_read;
1720
1721 } else {
1722 /* Start of a new IPC frame. */
1723 uv__ipc_frame_header_t frame_header;
1724 uint32_t xfer_flags;
1725 uv__ipc_socket_xfer_type_t xfer_type;
1726 uv__ipc_socket_xfer_info_t xfer_info;
1727
1728 /* Read the IPC frame header. */
1729 err = uv__pipe_read_exactly(
1730 handle->handle, &frame_header, sizeof frame_header);
1731 if (err)
1732 goto error;
1733
1734 /* Validate that flags are valid. */
1735 if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
1736 goto invalid;
1737 /* Validate that reserved2 is zero. */
1738 if (frame_header.reserved2 != 0)
1739 goto invalid;
1740
1741 /* Parse xfer flags. */
1742 xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
1743 if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
1744 /* Socket coming -- determine the type. */
1745 xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
1746 ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
1747 : UV__IPC_SOCKET_XFER_TCP_SERVER;
1748 } else if (xfer_flags == 0) {
1749 /* No socket. */
1750 xfer_type = UV__IPC_SOCKET_XFER_NONE;
1751 } else {
1752 /* Invalid flags. */
1753 goto invalid;
1754 }
1755
1756 /* Parse data frame information. */
1757 if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
1758 *data_remaining = frame_header.data_length;
1759 } else if (frame_header.data_length != 0) {
1760 /* Data length greater than zero but data flag not set -- invalid. */
1761 goto invalid;
1762 }
1763
1764 /* If no socket xfer info follows, return here. Data will be read in a
1765 * subsequent invocation of uv__pipe_read_ipc(). */
1766 if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
1767 return sizeof frame_header; /* Number of bytes read. */
1768
1769 /* Read transferred socket information. */
1770 err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
1771 if (err)
1772 goto error;
1773
1774 /* Store the pending socket info. */
1775 uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
1776
1777 /* Return number of bytes read. */
1778 return sizeof frame_header + sizeof xfer_info;
1779 }
1780
1781 invalid:
1782 /* Invalid frame. */
1783 err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
1784
1785 error:
1786 uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1787 return 0; /* Break out of read loop. */
1788 }
1789
1790
uv_process_pipe_read_req(uv_loop_t * loop,uv_pipe_t * handle,uv_req_t * req)1791 void uv_process_pipe_read_req(uv_loop_t* loop,
1792 uv_pipe_t* handle,
1793 uv_req_t* req) {
1794 assert(handle->type == UV_NAMED_PIPE);
1795
1796 handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
1797 DECREASE_PENDING_REQ_COUNT(handle);
1798 eof_timer_stop(handle);
1799
1800 /* At this point, we're done with bookkeeping. If the user has stopped
1801 * reading the pipe in the meantime, there is nothing left to do, since there
1802 * is no callback that we can call. */
1803 if (!(handle->flags & UV_HANDLE_READING))
1804 return;
1805
1806 if (!REQ_SUCCESS(req)) {
1807 /* An error occurred doing the zero-read. */
1808 DWORD err = GET_REQ_ERROR(req);
1809
1810 /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
1811 * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
1812 * the user; we'll start a new zero-read at the end of this function. */
1813 if (err != ERROR_OPERATION_ABORTED)
1814 uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
1815
1816 } else {
1817 /* The zero-read completed without error, indicating there is data
1818 * available in the kernel buffer. */
1819 DWORD avail;
1820
1821 /* Get the number of bytes available. */
1822 avail = 0;
1823 if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
1824 uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1825
1826 /* Read until we've either read all the bytes available, or the 'reading'
1827 * flag is cleared. */
1828 while (avail > 0 && handle->flags & UV_HANDLE_READING) {
1829 /* Depending on the type of pipe, read either IPC frames or raw data. */
1830 DWORD bytes_read =
1831 handle->ipc ? uv__pipe_read_ipc(loop, handle)
1832 : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
1833
1834 /* If no bytes were read, treat this as an indication that an error
1835 * occurred, and break out of the read loop. */
1836 if (bytes_read == 0)
1837 break;
1838
1839 /* It is possible that more bytes were read than we thought were
1840 * available. To prevent `avail` from underflowing, break out of the loop
1841 * if this is the case. */
1842 if (bytes_read > avail)
1843 break;
1844
1845 /* Recompute the number of bytes available. */
1846 avail -= bytes_read;
1847 }
1848 }
1849
1850 /* Start another zero-read request if necessary. */
1851 if ((handle->flags & UV_HANDLE_READING) &&
1852 !(handle->flags & UV_HANDLE_READ_PENDING)) {
1853 uv_pipe_queue_read(loop, handle);
1854 }
1855 }
1856
1857
uv_process_pipe_write_req(uv_loop_t * loop,uv_pipe_t * handle,uv_write_t * req)1858 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
1859 uv_write_t* req) {
1860 int err;
1861
1862 assert(handle->type == UV_NAMED_PIPE);
1863
1864 assert(handle->write_queue_size >= req->u.io.queued_bytes);
1865 handle->write_queue_size -= req->u.io.queued_bytes;
1866
1867 UNREGISTER_HANDLE_REQ(loop, handle, req);
1868
1869 if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1870 if (req->wait_handle != INVALID_HANDLE_VALUE) {
1871 UnregisterWait(req->wait_handle);
1872 req->wait_handle = INVALID_HANDLE_VALUE;
1873 }
1874 if (req->event_handle) {
1875 CloseHandle(req->event_handle);
1876 req->event_handle = NULL;
1877 }
1878 }
1879
1880 err = GET_REQ_ERROR(req);
1881
1882 /* If this was a coalesced write, extract pointer to the user_provided
1883 * uv_write_t structure so we can pass the expected pointer to the callback,
1884 * then free the heap-allocated write req. */
1885 if (req->coalesced) {
1886 uv__coalesced_write_t* coalesced_write =
1887 container_of(req, uv__coalesced_write_t, req);
1888 req = coalesced_write->user_req;
1889 uv__free(coalesced_write);
1890 }
1891 if (req->cb) {
1892 req->cb(req, uv_translate_sys_error(err));
1893 }
1894
1895 handle->stream.conn.write_reqs_pending--;
1896
1897 if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
1898 handle->pipe.conn.non_overlapped_writes_tail) {
1899 assert(handle->stream.conn.write_reqs_pending > 0);
1900 uv_queue_non_overlapped_write(handle);
1901 }
1902
1903 if (handle->stream.conn.shutdown_req != NULL &&
1904 handle->stream.conn.write_reqs_pending == 0) {
1905 uv_want_endgame(loop, (uv_handle_t*)handle);
1906 }
1907
1908 DECREASE_PENDING_REQ_COUNT(handle);
1909 }
1910
1911
uv_process_pipe_accept_req(uv_loop_t * loop,uv_pipe_t * handle,uv_req_t * raw_req)1912 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
1913 uv_req_t* raw_req) {
1914 uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
1915
1916 assert(handle->type == UV_NAMED_PIPE);
1917
1918 if (handle->flags & UV_HANDLE_CLOSING) {
1919 /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
1920 assert(req->pipeHandle == INVALID_HANDLE_VALUE);
1921 DECREASE_PENDING_REQ_COUNT(handle);
1922 return;
1923 }
1924
1925 if (REQ_SUCCESS(req)) {
1926 assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1927 req->next_pending = handle->pipe.serv.pending_accepts;
1928 handle->pipe.serv.pending_accepts = req;
1929
1930 if (handle->stream.serv.connection_cb) {
1931 handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
1932 }
1933 } else {
1934 if (req->pipeHandle != INVALID_HANDLE_VALUE) {
1935 CloseHandle(req->pipeHandle);
1936 req->pipeHandle = INVALID_HANDLE_VALUE;
1937 }
1938 if (!(handle->flags & UV_HANDLE_CLOSING)) {
1939 uv_pipe_queue_accept(loop, handle, req, FALSE);
1940 }
1941 }
1942
1943 DECREASE_PENDING_REQ_COUNT(handle);
1944 }
1945
1946
uv_process_pipe_connect_req(uv_loop_t * loop,uv_pipe_t * handle,uv_connect_t * req)1947 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
1948 uv_connect_t* req) {
1949 int err;
1950
1951 assert(handle->type == UV_NAMED_PIPE);
1952
1953 UNREGISTER_HANDLE_REQ(loop, handle, req);
1954
1955 if (req->cb) {
1956 err = 0;
1957 if (REQ_SUCCESS(req)) {
1958 uv_pipe_connection_init(handle);
1959 } else {
1960 err = GET_REQ_ERROR(req);
1961 }
1962 req->cb(req, uv_translate_sys_error(err));
1963 }
1964
1965 DECREASE_PENDING_REQ_COUNT(handle);
1966 }
1967
1968
uv_process_pipe_shutdown_req(uv_loop_t * loop,uv_pipe_t * handle,uv_shutdown_t * req)1969 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
1970 uv_shutdown_t* req) {
1971 assert(handle->type == UV_NAMED_PIPE);
1972
1973 UNREGISTER_HANDLE_REQ(loop, handle, req);
1974
1975 if (handle->flags & UV_HANDLE_READABLE) {
1976 /* Initialize and optionally start the eof timer. Only do this if the pipe
1977 * is readable and we haven't seen EOF come in ourselves. */
1978 eof_timer_init(handle);
1979
1980 /* If reading start the timer right now. Otherwise uv_pipe_queue_read will
1981 * start it. */
1982 if (handle->flags & UV_HANDLE_READ_PENDING) {
1983 eof_timer_start(handle);
1984 }
1985
1986 } else {
1987 /* This pipe is not readable. We can just close it to let the other end
1988 * know that we're done writing. */
1989 close_pipe(handle);
1990 }
1991
1992 if (req->cb) {
1993 req->cb(req, 0);
1994 }
1995
1996 DECREASE_PENDING_REQ_COUNT(handle);
1997 }
1998
1999
eof_timer_init(uv_pipe_t * pipe)2000 static void eof_timer_init(uv_pipe_t* pipe) {
2001 int r;
2002
2003 assert(pipe->pipe.conn.eof_timer == NULL);
2004 assert(pipe->flags & UV_HANDLE_CONNECTION);
2005
2006 pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
2007
2008 r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
2009 assert(r == 0); /* timers can't fail */
2010 pipe->pipe.conn.eof_timer->data = pipe;
2011 uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
2012 }
2013
2014
eof_timer_start(uv_pipe_t * pipe)2015 static void eof_timer_start(uv_pipe_t* pipe) {
2016 assert(pipe->flags & UV_HANDLE_CONNECTION);
2017
2018 if (pipe->pipe.conn.eof_timer != NULL) {
2019 uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
2020 }
2021 }
2022
2023
eof_timer_stop(uv_pipe_t * pipe)2024 static void eof_timer_stop(uv_pipe_t* pipe) {
2025 assert(pipe->flags & UV_HANDLE_CONNECTION);
2026
2027 if (pipe->pipe.conn.eof_timer != NULL) {
2028 uv_timer_stop(pipe->pipe.conn.eof_timer);
2029 }
2030 }
2031
2032
eof_timer_cb(uv_timer_t * timer)2033 static void eof_timer_cb(uv_timer_t* timer) {
2034 uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
2035 uv_loop_t* loop = timer->loop;
2036
2037 assert(pipe->type == UV_NAMED_PIPE);
2038
2039 /* This should always be true, since we start the timer only in
2040 * uv_pipe_queue_read after successfully calling ReadFile, or in
2041 * uv_process_pipe_shutdown_req if a read is pending, and we always
2042 * immediately stop the timer in uv_process_pipe_read_req. */
2043 assert(pipe->flags & UV_HANDLE_READ_PENDING);
2044
2045 /* If there are many packets coming off the iocp then the timer callback may
2046 * be called before the read request is coming off the queue. Therefore we
2047 * check here if the read request has completed but will be processed later.
2048 */
2049 if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
2050 HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
2051 return;
2052 }
2053
2054 /* Force both ends off the pipe. */
2055 close_pipe(pipe);
2056
2057 /* Stop reading, so the pending read that is going to fail will not be
2058 * reported to the user. */
2059 uv_read_stop((uv_stream_t*) pipe);
2060
2061 /* Report the eof and update flags. This will get reported even if the user
2062 * stopped reading in the meantime. TODO: is that okay? */
2063 uv_pipe_read_eof(loop, pipe, uv_null_buf_);
2064 }
2065
2066
eof_timer_destroy(uv_pipe_t * pipe)2067 static void eof_timer_destroy(uv_pipe_t* pipe) {
2068 assert(pipe->flags & UV_HANDLE_CONNECTION);
2069
2070 if (pipe->pipe.conn.eof_timer) {
2071 uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
2072 pipe->pipe.conn.eof_timer = NULL;
2073 }
2074 }
2075
2076
eof_timer_close_cb(uv_handle_t * handle)2077 static void eof_timer_close_cb(uv_handle_t* handle) {
2078 assert(handle->type == UV_TIMER);
2079 uv__free(handle);
2080 }
2081
2082
uv_pipe_open(uv_pipe_t * pipe,uv_file file)2083 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
2084 HANDLE os_handle = uv__get_osfhandle(file);
2085 NTSTATUS nt_status;
2086 IO_STATUS_BLOCK io_status;
2087 FILE_ACCESS_INFORMATION access;
2088 DWORD duplex_flags = 0;
2089
2090 if (os_handle == INVALID_HANDLE_VALUE)
2091 return UV_EBADF;
2092
2093 uv__once_init();
2094 /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
2095 * underlying OS handle and forget about the original fd.
2096 * We could also opt to use the original OS handle and just never close it,
2097 * but then there would be no reliable way to cancel pending read operations
2098 * upon close.
2099 */
2100 if (file <= 2) {
2101 if (!DuplicateHandle(INVALID_HANDLE_VALUE,
2102 os_handle,
2103 INVALID_HANDLE_VALUE,
2104 &os_handle,
2105 0,
2106 FALSE,
2107 DUPLICATE_SAME_ACCESS))
2108 return uv_translate_sys_error(GetLastError());
2109 file = -1;
2110 }
2111
2112 /* Determine what kind of permissions we have on this handle.
2113 * Cygwin opens the pipe in message mode, but we can support it,
2114 * just query the access flags and set the stream flags accordingly.
2115 */
2116 nt_status = pNtQueryInformationFile(os_handle,
2117 &io_status,
2118 &access,
2119 sizeof(access),
2120 FileAccessInformation);
2121 if (nt_status != STATUS_SUCCESS)
2122 return UV_EINVAL;
2123
2124 if (pipe->ipc) {
2125 if (!(access.AccessFlags & FILE_WRITE_DATA) ||
2126 !(access.AccessFlags & FILE_READ_DATA)) {
2127 return UV_EINVAL;
2128 }
2129 }
2130
2131 if (access.AccessFlags & FILE_WRITE_DATA)
2132 duplex_flags |= UV_HANDLE_WRITABLE;
2133 if (access.AccessFlags & FILE_READ_DATA)
2134 duplex_flags |= UV_HANDLE_READABLE;
2135
2136 if (os_handle == INVALID_HANDLE_VALUE ||
2137 uv_set_pipe_handle(pipe->loop,
2138 pipe,
2139 os_handle,
2140 file,
2141 duplex_flags) == -1) {
2142 return UV_EINVAL;
2143 }
2144
2145 uv_pipe_connection_init(pipe);
2146
2147 if (pipe->ipc) {
2148 assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
2149 pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
2150 assert(pipe->pipe.conn.ipc_remote_pid != (DWORD) -1);
2151 }
2152 return 0;
2153 }
2154
2155
uv__pipe_getname(const uv_pipe_t * handle,char * buffer,size_t * size)2156 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2157 NTSTATUS nt_status;
2158 IO_STATUS_BLOCK io_status;
2159 FILE_NAME_INFORMATION tmp_name_info;
2160 FILE_NAME_INFORMATION* name_info;
2161 WCHAR* name_buf;
2162 unsigned int addrlen;
2163 unsigned int name_size;
2164 unsigned int name_len;
2165 int err;
2166
2167 uv__once_init();
2168 name_info = NULL;
2169
2170 if (handle->handle == INVALID_HANDLE_VALUE) {
2171 *size = 0;
2172 return UV_EINVAL;
2173 }
2174
2175 /* NtQueryInformationFile will block if another thread is performing a
2176 * blocking operation on the queried handle. If the pipe handle is
2177 * synchronous, there may be a worker thread currently calling ReadFile() on
2178 * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2179 * the read. */
2180 if (handle->flags & UV_HANDLE_CONNECTION &&
2181 handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2182 uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2183 }
2184
2185 nt_status = pNtQueryInformationFile(handle->handle,
2186 &io_status,
2187 &tmp_name_info,
2188 sizeof tmp_name_info,
2189 FileNameInformation);
2190 if (nt_status == STATUS_BUFFER_OVERFLOW) {
2191 name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2192 name_info = uv__malloc(name_size);
2193 if (!name_info) {
2194 *size = 0;
2195 err = UV_ENOMEM;
2196 goto cleanup;
2197 }
2198
2199 nt_status = pNtQueryInformationFile(handle->handle,
2200 &io_status,
2201 name_info,
2202 name_size,
2203 FileNameInformation);
2204 }
2205
2206 if (nt_status != STATUS_SUCCESS) {
2207 *size = 0;
2208 err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2209 goto error;
2210 }
2211
2212 if (!name_info) {
2213 /* the struct on stack was used */
2214 name_buf = tmp_name_info.FileName;
2215 name_len = tmp_name_info.FileNameLength;
2216 } else {
2217 name_buf = name_info->FileName;
2218 name_len = name_info->FileNameLength;
2219 }
2220
2221 if (name_len == 0) {
2222 *size = 0;
2223 err = 0;
2224 goto error;
2225 }
2226
2227 name_len /= sizeof(WCHAR);
2228
2229 /* check how much space we need */
2230 addrlen = WideCharToMultiByte(CP_UTF8,
2231 0,
2232 name_buf,
2233 name_len,
2234 NULL,
2235 0,
2236 NULL,
2237 NULL);
2238 if (!addrlen) {
2239 *size = 0;
2240 err = uv_translate_sys_error(GetLastError());
2241 goto error;
2242 } else if (pipe_prefix_len + addrlen >= *size) {
2243 /* "\\\\.\\pipe" + name */
2244 *size = pipe_prefix_len + addrlen + 1;
2245 err = UV_ENOBUFS;
2246 goto error;
2247 }
2248
2249 memcpy(buffer, pipe_prefix, pipe_prefix_len);
2250 addrlen = WideCharToMultiByte(CP_UTF8,
2251 0,
2252 name_buf,
2253 name_len,
2254 buffer+pipe_prefix_len,
2255 *size-pipe_prefix_len,
2256 NULL,
2257 NULL);
2258 if (!addrlen) {
2259 *size = 0;
2260 err = uv_translate_sys_error(GetLastError());
2261 goto error;
2262 }
2263
2264 addrlen += pipe_prefix_len;
2265 *size = addrlen;
2266 buffer[addrlen] = '\0';
2267
2268 err = 0;
2269
2270 error:
2271 uv__free(name_info);
2272
2273 cleanup:
2274 return err;
2275 }
2276
2277
uv_pipe_pending_count(uv_pipe_t * handle)2278 int uv_pipe_pending_count(uv_pipe_t* handle) {
2279 if (!handle->ipc)
2280 return 0;
2281 return handle->pipe.conn.ipc_xfer_queue_length;
2282 }
2283
2284
uv_pipe_getsockname(const uv_pipe_t * handle,char * buffer,size_t * size)2285 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2286 if (handle->flags & UV_HANDLE_BOUND)
2287 return uv__pipe_getname(handle, buffer, size);
2288
2289 if (handle->flags & UV_HANDLE_CONNECTION ||
2290 handle->handle != INVALID_HANDLE_VALUE) {
2291 *size = 0;
2292 return 0;
2293 }
2294
2295 return UV_EBADF;
2296 }
2297
2298
uv_pipe_getpeername(const uv_pipe_t * handle,char * buffer,size_t * size)2299 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2300 /* emulate unix behaviour */
2301 if (handle->flags & UV_HANDLE_BOUND)
2302 return UV_ENOTCONN;
2303
2304 if (handle->handle != INVALID_HANDLE_VALUE)
2305 return uv__pipe_getname(handle, buffer, size);
2306
2307 return UV_EBADF;
2308 }
2309
2310
uv_pipe_pending_type(uv_pipe_t * handle)2311 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2312 if (!handle->ipc)
2313 return UV_UNKNOWN_HANDLE;
2314 if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2315 return UV_UNKNOWN_HANDLE;
2316 else
2317 return UV_TCP;
2318 }
2319
uv_pipe_chmod(uv_pipe_t * handle,int mode)2320 int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
2321 SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
2322 PACL old_dacl, new_dacl;
2323 PSECURITY_DESCRIPTOR sd;
2324 EXPLICIT_ACCESS ea;
2325 PSID everyone;
2326 int error;
2327
2328 if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
2329 return UV_EBADF;
2330
2331 if (mode != UV_READABLE &&
2332 mode != UV_WRITABLE &&
2333 mode != (UV_WRITABLE | UV_READABLE))
2334 return UV_EINVAL;
2335
2336 if (!AllocateAndInitializeSid(&sid_world,
2337 1,
2338 SECURITY_WORLD_RID,
2339 0, 0, 0, 0, 0, 0, 0,
2340 &everyone)) {
2341 error = GetLastError();
2342 goto done;
2343 }
2344
2345 if (GetSecurityInfo(handle->handle,
2346 SE_KERNEL_OBJECT,
2347 DACL_SECURITY_INFORMATION,
2348 NULL,
2349 NULL,
2350 &old_dacl,
2351 NULL,
2352 &sd)) {
2353 error = GetLastError();
2354 goto clean_sid;
2355 }
2356
2357 memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
2358 if (mode & UV_READABLE)
2359 ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
2360 if (mode & UV_WRITABLE)
2361 ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
2362 ea.grfAccessPermissions |= SYNCHRONIZE;
2363 ea.grfAccessMode = SET_ACCESS;
2364 ea.grfInheritance = NO_INHERITANCE;
2365 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
2366 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
2367 ea.Trustee.ptstrName = (LPTSTR)everyone;
2368
2369 if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
2370 error = GetLastError();
2371 goto clean_sd;
2372 }
2373
2374 if (SetSecurityInfo(handle->handle,
2375 SE_KERNEL_OBJECT,
2376 DACL_SECURITY_INFORMATION,
2377 NULL,
2378 NULL,
2379 new_dacl,
2380 NULL)) {
2381 error = GetLastError();
2382 goto clean_dacl;
2383 }
2384
2385 error = 0;
2386
2387 clean_dacl:
2388 LocalFree((HLOCAL) new_dacl);
2389 clean_sd:
2390 LocalFree((HLOCAL) sd);
2391 clean_sid:
2392 FreeSid(everyone);
2393 done:
2394 return uv_translate_sys_error(error);
2395 }
2396