1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include <assert.h>
23 #include <io.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27
28 #include "uv.h"
29 #include "internal.h"
30 #include "handle-inl.h"
31 #include "stream-inl.h"
32 #include "req-inl.h"
33
34 #include <aclapi.h>
35 #include <accctrl.h>
36
typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;

/* One pending IPC item (an imported TCP socket) queued on a pipe connection
 * until uv_pipe_accept() picks it up. */
struct uv__ipc_queue_item_s {
  /*
   * NOTE: It is important for socket_info_ex to be the first field,
   * because we will be assigning it to the pending_ipc_info.socket_info
   */
  uv__ipc_socket_info_ex socket_info_ex;
  QUEUE member;        /* Links this item into pending_ipc_info.queue. */
  int tcp_connection;  /* Non-zero if the socket is a connected TCP stream. */
};
48
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout (in ms) that the pipe will wait for the remote end to write
 * data when the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Number of accept requests pre-created for a pipe server when the user did
 * not call uv_pipe_pending_instances() explicitly. */
static const int default_pending_pipe_instances = 4;

/* Pipe prefix */
static char pipe_prefix[] = "\\\\?\\pipe";
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;

/* IPC protocol flags. */
#define UV_IPC_RAW_DATA 0x0001
#define UV_IPC_TCP_SERVER 0x0002
#define UV_IPC_TCP_CONNECTION 0x0004

/* IPC frame header. */
typedef struct {
  int flags;                 /* Combination of the UV_IPC_* flags above. */
  uint64_t raw_data_length;  /* Payload length when UV_IPC_RAW_DATA is set. */
} uv_ipc_frame_header_t;

/* IPC frame, which contains an imported TCP socket stream. */
typedef struct {
  uv_ipc_frame_header_t header;
  uv__ipc_socket_info_ex socket_info_ex;
} uv_ipc_frame_uv_stream;

/* Forward declarations for the read-EOF timer helpers defined below. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
88
89
/* Generate a pipe name that is unique to this process by combining the
 * current process id with a caller-supplied pointer value. */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
}
93
94
/* Initialize a uv_pipe_t handle. `ipc` selects whether the pipe will be used
 * to pass handles (libuv IPC protocol) or plain bytes. Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);

  handle->reqs_pending = 0;
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->pipe.conn.ipc_pid = 0;
  handle->pipe.conn.remaining_ipc_rawdata_bytes = 0;
  /* No pending imported sockets yet. */
  QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue);
  handle->pipe.conn.pending_ipc_info.queue_len = 0;
  handle->ipc = ipc;
  handle->pipe.conn.non_overlapped_writes_tail = NULL;
  handle->pipe.conn.readfile_thread = NULL;

  /* The IPC header write req is reused; mark it unused for now. */
  UV_REQ_INIT(&handle->pipe.conn.ipc_header_write_req, UV_UNKNOWN_REQ);

  return 0;
}
113
114
/* Promote a pipe handle to a connected stream. Must not be called on a
 * handle that is acting as a pipe server. */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  uv_connection_init((uv_stream_t*) handle);
  handle->read_req.data = handle;
  handle->pipe.conn.eof_timer = NULL;
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
  /* Non-overlapped pipes are read on worker threads with a blocking
   * ReadFile; when CancelSynchronousIo is available (Vista+), set up the
   * mutex that lets other threads interrupt that read. */
  if (pCancelSynchronousIo &&
      handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    uv_mutex_init(&handle->pipe.conn.readfile_mutex);
    handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE;
  }
}
126
127
/* Open the client end of a named pipe.
 *
 * Tries duplex access first, then read-only, then write-only, downgrading
 * only when the server rejects the previous access mode with
 * ERROR_ACCESS_DENIED. On success returns the handle and stores the
 * readable/writable flags in *duplex_flags; on failure returns
 * INVALID_HANDLE_VALUE with the error available via GetLastError(). */
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
  HANDLE pipeHandle;

  /*
   * Assume that we have a duplex pipe first, so attempt to
   * connect with GENERIC_READ | GENERIC_WRITE.
   */
  pipeHandle = CreateFileW(name,
                           GENERIC_READ | GENERIC_WRITE,
                           0,
                           NULL,
                           OPEN_EXISTING,
                           FILE_FLAG_OVERLAPPED,
                           NULL);
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    return pipeHandle;
  }

  /*
   * If the pipe is not duplex CreateFileW fails with
   * ERROR_ACCESS_DENIED. In that case try to connect
   * as a read-only or write-only.
   */
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    /* FILE_WRITE_ATTRIBUTES is requested so SetNamedPipeHandleState can
     * still be used on the read-only handle later. */
    pipeHandle = CreateFileW(name,
                             GENERIC_READ | FILE_WRITE_ATTRIBUTES,
                             0,
                             NULL,
                             OPEN_EXISTING,
                             FILE_FLAG_OVERLAPPED,
                             NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_READABLE;
      return pipeHandle;
    }
  }

  /* Note: this re-checks GetLastError() from the most recent CreateFileW
   * call, so the write-only attempt runs after either of the previous
   * attempts failed with ERROR_ACCESS_DENIED. */
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    pipeHandle = CreateFileW(name,
                             GENERIC_WRITE | FILE_READ_ATTRIBUTES,
                             0,
                             NULL,
                             OPEN_EXISTING,
                             FILE_FLAG_OVERLAPPED,
                             NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_WRITABLE;
      return pipeHandle;
    }
  }

  return INVALID_HANDLE_VALUE;
}
184
185
/* Release the OS resource behind a pipe connection. When the handle was
 * wrapped around a CRT file descriptor (u.fd != -1), closing the fd closes
 * the underlying handle too; otherwise the raw HANDLE is closed directly.
 * Afterwards both fields are reset to their "no resource" values. */
static void close_pipe(uv_pipe_t* pipe) {
  assert(pipe->u.fd == -1 || pipe->u.fd > 2);

  if (pipe->u.fd != -1)
    close(pipe->u.fd);
  else
    CloseHandle(pipe->handle);

  pipe->u.fd = -1;
  pipe->handle = INVALID_HANDLE_VALUE;
}
196
197
/* Create a named-pipe server instance for use as a child process stdio
 * endpoint. Generates unique names until CreateNamedPipeA succeeds, then
 * associates the pipe with the loop's IOCP. On success the generated name is
 * left in `name` (so the child can connect to it) and 0 is returned; on
 * failure a Windows error code is returned. */
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
    char* name, size_t nameSize) {
  HANDLE pipeHandle;
  int err;
  char* ptr = (char*)handle;

  for (;;) {
    uv_unique_pipe_name(ptr, name, nameSize);

    pipeHandle = CreateNamedPipeA(name,
      access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
      NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      /* No name collisions. We're done. */
      break;
    }

    err = GetLastError();
    if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
      goto error;
    }

    /* Pipe name collision. Increment the pointer and try again. */
    ptr++;
  }

  if (CreateIoCompletionPort(pipeHandle,
                             loop->iocp,
                             (ULONG_PTR)handle,
                             0) == NULL) {
    err = GetLastError();
    goto error;
  }

  uv_pipe_connection_init(handle);
  handle->handle = pipeHandle;

  return 0;

 error:
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(pipeHandle);
  }

  return err;
}
246
247
/* Attach an OS pipe handle to a uv_pipe_t: force byte/wait mode, detect
 * whether the handle was opened overlapped, and register it with the loop's
 * IOCP (or fall back to IOCP emulation).
 *
 * Returns 0 on success, UV_EBUSY if the handle already has a pipe attached,
 * or -1 with the Windows error available via GetLastError().
 *
 * Fix: the GetNamedPipeHandleState argument had been mangled by an HTML
 * entity ("&curren" -> U+00A4) during a round-trip; restored to
 * `&current_mode`. */
static int uv_set_pipe_handle(uv_loop_t* loop,
                              uv_pipe_t* handle,
                              HANDLE pipeHandle,
                              int fd,
                              DWORD duplex_flags) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
  DWORD current_mode = 0;
  DWORD err = 0;

  /* A connection handle may only be attached once. */
  if (!(handle->flags & UV_HANDLE_PIPESERVER) &&
      handle->handle != INVALID_HANDLE_VALUE)
    return UV_EBUSY;

  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      /*
       * SetNamedPipeHandleState can fail if the handle doesn't have either
       * GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
       * But if the handle already has the desired wait and blocking modes
       * we can continue.
       */
      if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
                                   NULL, NULL, 0)) {
        return -1;
      } else if (current_mode & PIPE_NOWAIT) {
        /* Non-blocking mode is unusable here; report as access denied. */
        SetLastError(ERROR_ACCESS_DENIED);
        return -1;
      }
    } else {
      /* If this returns ERROR_INVALID_PARAMETER we probably opened
       * something that is not a pipe. */
      if (err == ERROR_INVALID_PARAMETER) {
        SetLastError(WSAENOTSOCK);
      }
      return -1;
    }
  }

  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      &io_status,
                                      &mode_info,
                                      sizeof(mode_info),
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
    return -1;
  }

  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    /* Non-overlapped pipe. Reads/writes will be done on worker threads. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
  } else {
    /* Overlapped pipe. Try to associate with IOCP. */
    if (CreateIoCompletionPort(pipeHandle,
                               loop->iocp,
                               (ULONG_PTR)handle,
                               0) == NULL) {
      /* Association failed (e.g. handle already bound to another port);
       * fall back to event-based IOCP emulation. */
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    }
  }

  handle->handle = pipeHandle;
  handle->u.fd = fd;
  handle->flags |= duplex_flags;

  return 0;
}
320
321
/* Thread-pool worker for pipe shutdown: blocks in FlushFileBuffers until the
 * peer has read all written data, then posts the shutdown req back to the
 * loop's completion port. */
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
  uv_loop_t* loop;
  uv_pipe_t* handle;
  uv_shutdown_t* req;

  req = (uv_shutdown_t*) parameter;
  assert(req);
  handle = (uv_pipe_t*) req->handle;
  assert(handle);
  loop = handle->loop;
  assert(loop);

  /* May block for a long time if the reader is slow. */
  FlushFileBuffers(handle->handle);

  /* Post completed */
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
341
342
/* Finish deferred work on a pipe handle: complete a pending shutdown once
 * all writes have drained, and, when the handle is closing and no requests
 * remain, free queued IPC sockets and accept requests and close the handle.
 * Called from the loop whenever the handle is scheduled for endgame. */
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  int err;
  DWORD result;
  uv_shutdown_t* req;
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  uv__ipc_queue_item_t* item;

  /* Tear down the cancelable-read machinery first; no reads can be
   * canceled past this point. */
  if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
    handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE;
    uv_mutex_destroy(&handle->pipe.conn.readfile_mutex);
  }

  if ((handle->flags & UV_HANDLE_CONNECTION) &&
      handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {
    req = handle->stream.conn.shutdown_req;

    /* Clear the shutdown_req field so we don't go here again. */
    handle->stream.conn.shutdown_req = NULL;

    if (handle->flags & UV__HANDLE_CLOSING) {
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      /* Already closing. Cancel the shutdown. */
      if (req->cb) {
        req->cb(req, UV_ECANCELED);
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* Try to avoid flushing the pipe buffer in the thread pool. */
    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        &pipe_info,
                                        sizeof pipe_info,
                                        FilePipeLocalInformation);

    if (nt_status != STATUS_SUCCESS) {
      /* Failure */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = pRtlNtStatusToDosError(nt_status);
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
      /* Short-circuit, no need to call FlushFileBuffers:
       * all written data has already been consumed by the reader. */
      uv_insert_pending_req(loop, (uv_req_t*) req);
      return;
    }

    /* Run FlushFileBuffers in the thread pool. */
    result = QueueUserWorkItem(pipe_shutdown_thread_proc,
                               req,
                               WT_EXECUTELONGFUNCTION);
    if (result) {
      /* The worker thread will post the completion. */
      return;

    } else {
      /* Failure. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = GetLastError();
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }
  }

  if (handle->flags & UV__HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));

    if (handle->flags & UV_HANDLE_CONNECTION) {
      /* Free pending sockets */
      while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) {
        QUEUE* q;
        SOCKET socket;

        q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue);
        QUEUE_REMOVE(q);
        item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);

        /* Materialize socket and close it */
        socket = WSASocketW(FROM_PROTOCOL_INFO,
                            FROM_PROTOCOL_INFO,
                            FROM_PROTOCOL_INFO,
                            &item->socket_info_ex.socket_info,
                            0,
                            WSA_FLAG_OVERLAPPED);
        uv__free(item);

        if (socket != INVALID_SOCKET)
          closesocket(socket);
      }
      handle->pipe.conn.pending_ipc_info.queue_len = 0;

      /* Release the event/wait resources used by IOCP emulation. */
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
          UnregisterWait(handle->read_req.wait_handle);
          handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
        }
        if (handle->read_req.event_handle) {
          CloseHandle(handle->read_req.event_handle);
          handle->read_req.event_handle = NULL;
        }
      }
    }

    if (handle->flags & UV_HANDLE_PIPESERVER) {
      assert(handle->pipe.serv.accept_reqs);
      uv__free(handle->pipe.serv.accept_reqs);
      handle->pipe.serv.accept_reqs = NULL;
    }

    uv__handle_close(handle);
  }
}
475
476
/* Set the number of pre-created server pipe instances. Silently ignored once
 * the handle is bound, since the accept requests are already allocated. */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (handle->flags & UV_HANDLE_BOUND)
    return;
  handle->pipe.serv.pending_instances = count;
  handle->flags |= UV_HANDLE_PIPESERVER;
}
483
484
485 /* Creates a pipe server. */
/* Creates a pipe server: allocates the accept requests, converts the name to
 * UTF-16 and creates the first pipe instance. Returns 0 on success or a
 * translated libuv error (UV_EADDRINUSE when another server owns the name,
 * UV_EACCES for an invalid name). */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
  uv_loop_t* loop = handle->loop;
  int i, err, nameSize;
  uv_pipe_accept_t* req;

  if (handle->flags & UV_HANDLE_BOUND) {
    return UV_EINVAL;
  }

  if (!name) {
    return UV_EINVAL;
  }

  /* If uv_pipe_pending_instances() was never called, use the default. */
  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    handle->pipe.serv.pending_instances = default_pending_pipe_instances;
  }

  handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
    uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
  if (!handle->pipe.serv.accept_reqs) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
    req = &handle->pipe.serv.accept_reqs[i];
    UV_REQ_INIT(req, UV_ACCEPT);
    req->data = handle;
    req->pipeHandle = INVALID_HANDLE_VALUE;
    req->next_pending = NULL;
  }

  /* Convert name to UTF16. */
  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*)uv__malloc(nameSize);
  if (!handle->name) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  if (!MultiByteToWideChar(CP_UTF8,
                           0,
                           name,
                           -1,
                           handle->name,
                           nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  /*
   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
   * If this fails then there's already a pipe server for the given pipe name.
   */
  handle->pipe.serv.accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
      PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
      FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
      PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);

  if (handle->pipe.serv.accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
      err = WSAEACCES; /* Translates to UV_EACCES. */
    }
    goto error;
  }

  if (uv_set_pipe_handle(loop,
                         handle,
                         handle->pipe.serv.accept_reqs[0].pipeHandle,
                         -1,
                         0)) {
    err = GetLastError();
    goto error;
  }

  handle->pipe.serv.pending_accepts = NULL;
  handle->flags |= UV_HANDLE_PIPESERVER;
  handle->flags |= UV_HANDLE_BOUND;

  return 0;

 error:
  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  /* Safe even when the failure happened before CreateNamedPipeW: the loop
   * above pre-set every pipeHandle to INVALID_HANDLE_VALUE. */
  if (handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(handle->pipe.serv.accept_reqs[0].pipeHandle);
    handle->pipe.serv.accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
  }

  return uv_translate_sys_error(err);
}
582
583
/* Thread-pool worker used when the initial connect hit ERROR_PIPE_BUSY:
 * waits for a free server instance, connects, attaches the handle, then
 * posts the connect req (success or error) back to the loop. */
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
  uv_loop_t* loop;
  uv_pipe_t* handle;
  uv_connect_t* req;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;

  req = (uv_connect_t*) parameter;
  assert(req);
  handle = (uv_pipe_t*) req->handle;
  assert(handle);
  loop = handle->loop;
  assert(loop);

  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
  /* We wait for the pipe to become available with WaitNamedPipe. */
  while (WaitNamedPipeW(handle->name, 30000)) {
    /* The pipe is now available, try to connect. */
    pipeHandle = open_named_pipe(handle->name, &duplex_flags);
    if (pipeHandle != INVALID_HANDLE_VALUE) {
      break;
    }

    /* Another client grabbed the instance first; yield and retry. */
    SwitchToThread();
  }

  if (pipeHandle != INVALID_HANDLE_VALUE &&
      !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) {
    SET_REQ_SUCCESS(req);
  } else {
    SET_REQ_ERROR(req, GetLastError());
  }

  /* Post completed */
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
622
623
/* Connect a pipe handle to a named pipe server. Never fails synchronously:
 * on any error the req is queued as pending with the error recorded, so `cb`
 * is always invoked from the event loop. A busy server (ERROR_PIPE_BUSY) is
 * retried from a thread-pool worker. */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  int err, nameSize;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Convert name to UTF16. */
  nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*)uv__malloc(nameSize);
  if (!handle->name) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
  }

  if (!MultiByteToWideChar(CP_UTF8,
                           0,
                           name,
                           -1,
                           handle->name,
                           nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    if (GetLastError() == ERROR_PIPE_BUSY) {
      /* Wait for the server to make a pipe instance available. */
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                             req,
                             WT_EXECUTELONGFUNCTION)) {
        err = GetLastError();
        goto error;
      }

      REGISTER_HANDLE_REQ(loop, handle, req);
      handle->reqs_pending++;

      return;
    }

    err = GetLastError();
    goto error;
  }

  assert(pipeHandle != INVALID_HANDLE_VALUE);

  if (uv_set_pipe_handle(loop,
                         (uv_pipe_t*) req->handle,
                         pipeHandle,
                         -1,
                         duplex_flags)) {
    err = GetLastError();
    goto error;
  }

  SET_REQ_SUCCESS(req);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;

 error:
  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  if (pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(pipeHandle);
  }

  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;
}
707
708
/* Interrupt an in-flight blocking ReadFile on the pipe's reader thread and
 * keep it parked: the readfile mutex is taken here and stays held until
 * uv__pipe_unpause_read() releases it. Callers must pair the two calls. */
void uv__pipe_pause_read(uv_pipe_t* handle) {
  if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
    /* Pause the ReadFile task briefly, to work
       around the Windows kernel bug that causes
       any access to a NamedPipe to deadlock if
       any process has called ReadFile */
    HANDLE h;
    uv_mutex_lock(&handle->pipe.conn.readfile_mutex);
    h = handle->pipe.conn.readfile_thread;
    while (h) {
      /* spinlock: we expect this to finish quickly,
         or we are probably about to deadlock anyways
         (in the kernel), so it doesn't matter */
      pCancelSynchronousIo(h);
      SwitchToThread(); /* yield thread control briefly */
      h = handle->pipe.conn.readfile_thread;
    }
  }
}
728
729
/* Counterpart of uv__pipe_pause_read(): releases the readfile mutex so the
 * reader thread may resume its blocking ReadFile. */
void uv__pipe_unpause_read(uv_pipe_t* handle) {
  if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
    uv_mutex_unlock(&handle->pipe.conn.readfile_mutex);
  }
}
735
736
/* Stop reading on the pipe: clear the READING flag, then bounce the reader
 * thread through a pause/unpause cycle so a blocking ReadFile observes the
 * cleared flag and exits.
 *
 * Fix: dropped the redundant `(uv_pipe_t*)` casts — `handle` already has
 * exactly that type. */
void uv__pipe_stop_read(uv_pipe_t* handle) {
  handle->flags &= ~UV_HANDLE_READING;
  uv__pipe_pause_read(handle);
  uv__pipe_unpause_read(handle);
}
742
743
744 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
745 /* with it. */
/* Cleans up uv_pipe_t (server or connection) and all resources associated */
/* with it: stops reading, frees the name, closes server instances, destroys */
/* the eof timer and closes the connection handle. */
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
  int i;
  HANDLE pipeHandle;

  uv__pipe_stop_read(handle);

  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    /* Close every pre-created server pipe instance. */
    for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
      pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
      if (pipeHandle != INVALID_HANDLE_VALUE) {
        CloseHandle(pipeHandle);
        handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
      }
    }
    handle->handle = INVALID_HANDLE_VALUE;
  }

  if (handle->flags & UV_HANDLE_CONNECTION) {
    handle->flags &= ~UV_HANDLE_WRITABLE;
    eof_timer_destroy(handle);
  }

  if ((handle->flags & UV_HANDLE_CONNECTION)
      && handle->handle != INVALID_HANDLE_VALUE)
    close_pipe(handle);
}
777
778
/* Begin closing a pipe handle: drop its read/listen activity, release its
 * resources, and schedule endgame once no requests remain pending. */
void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
  if (handle->flags & UV_HANDLE_READING) {
    handle->flags &= ~UV_HANDLE_READING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  uv_pipe_cleanup(loop, handle);

  /* If requests are still in flight, endgame runs later when the last one
   * completes. */
  if (handle->reqs_pending == 0) {
    uv_want_endgame(loop, (uv_handle_t*) handle);
  }

  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(handle);
}
799
800
/* Arm one accept request: create a new server pipe instance (except for the
 * very first one, created in uv_pipe_bind) and start an overlapped
 * ConnectNamedPipe on it. Any failure is reported by queuing the req as
 * pending with an error code. */
static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance) {
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);

    req->pipeHandle = CreateNamedPipeW(handle->name,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);

    if (req->pipeHandle == INVALID_HANDLE_VALUE) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }

    if (uv_set_pipe_handle(loop, handle, req->pipeHandle, -1, 0)) {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
      GetLastError() != ERROR_IO_PENDING) {
    if (GetLastError() == ERROR_PIPE_CONNECTED) {
      /* A client connected between creation and ConnectNamedPipe; that
       * still counts as a successful accept. */
      SET_REQ_SUCCESS(req);
    } else {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
    }
    uv_insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  /* Overlapped connect is in flight; completion arrives via the IOCP. */
  handle->reqs_pending++;
}
852
853
/* Accept either a pending IPC socket (when `server->ipc` is set — `client`
 * is then a uv_tcp_t) or a pending pipe connection. Returns 0 on success or
 * WSAEWOULDBLOCK when nothing is pending. */
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* req;
  QUEUE* q;
  uv__ipc_queue_item_t* item;
  int err;

  if (server->ipc) {
    if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) {
      /* No valid pending sockets. */
      return WSAEWOULDBLOCK;
    }

    /* Pop the oldest queued socket and import it into `client`. */
    q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue);
    QUEUE_REMOVE(q);
    server->pipe.conn.pending_ipc_info.queue_len--;
    item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);

    err = uv_tcp_import((uv_tcp_t*)client,
                        &item->socket_info_ex,
                        item->tcp_connection);
    if (err != 0)
      return err;

    uv__free(item);

  } else {
    pipe_client = (uv_pipe_t*)client;

    /* Find a connection instance that has been connected, but not yet */
    /* accepted. */
    req = server->pipe.serv.pending_accepts;

    if (!req) {
      /* No valid connections found, so we error out. */
      return WSAEWOULDBLOCK;
    }

    /* Initialize the client handle and copy the pipeHandle to the client */
    uv_pipe_connection_init(pipe_client);
    pipe_client->handle = req->pipeHandle;
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

    /* Prepare the req to pick up a new connection */
    server->pipe.serv.pending_accepts = req->next_pending;
    req->next_pending = NULL;
    req->pipeHandle = INVALID_HANDLE_VALUE;

    if (!(server->flags & UV__HANDLE_CLOSING)) {
      uv_pipe_queue_accept(loop, server, req, FALSE);
    }
  }

  return 0;
}
910
911
912 /* Starts listening for connections for the given pipe. */
/* Starts listening for connections for the given pipe. The handle must have
 * been bound first. Returns 0 or a Windows/Winsock error code. */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
  uv_loop_t* loop = handle->loop;
  int i;

  /* NOTE(review): when already listening this updates the callback but does
   * not return early, so the checks and accept-queuing below run again —
   * looks intentional for callback replacement, but confirm. */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->stream.serv.connection_cb = cb;
  }

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    return WSAEINVAL;
  }

  if (handle->flags & UV_HANDLE_READING) {
    return WSAEISCONN;
  }

  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    return ERROR_NOT_SUPPORTED;
  }

  handle->flags |= UV_HANDLE_LISTENING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->stream.serv.connection_cb = cb;

  /* First pipe handle should have already been created in uv_pipe_bind */
  assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);

  /* Arm every pre-allocated accept request. `backlog` is not used: the
   * number of instances was fixed at bind time. */
  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
    uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
  }

  return 0;
}
946
947
/* Thread-pool worker that performs a blocking zero-byte ReadFile on a
 * non-overlapped pipe. The zero-byte read returns when data becomes
 * available without consuming it. While blocked, the thread publishes a
 * duplicated handle to itself in `readfile_thread` so uv__pipe_pause_read()
 * can interrupt it with CancelSynchronousIo; an interrupted read is retried
 * (unless reading was stopped). Posts the read req to the loop when done. */
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
  int result;
  DWORD bytes;
  uv_read_t* req = (uv_read_t*) parameter;
  uv_pipe_t* handle = (uv_pipe_t*) req->data;
  uv_loop_t* loop = handle->loop;
  HANDLE hThread = NULL;
  DWORD err;
  uv_mutex_t *m = &handle->pipe.conn.readfile_mutex;

  assert(req != NULL);
  assert(req->type == UV_READ);
  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
    uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */
    /* Pseudo thread handles can't be used cross-thread, so duplicate a real
     * handle to this thread for CancelSynchronousIo. */
    if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
                        GetCurrentProcess(), &hThread,
                        0, FALSE, DUPLICATE_SAME_ACCESS)) {
      handle->pipe.conn.readfile_thread = hThread;
    } else {
      hThread = NULL;
    }
    uv_mutex_unlock(m);
  }
restart_readfile:
  if (handle->flags & UV_HANDLE_READING) {
    /* Blocking zero-byte read: returns when data is available to read. */
    result = ReadFile(handle->handle,
                      &uv_zero_,
                      0,
                      &bytes,
                      NULL);
    if (!result) {
      err = GetLastError();
      if (err == ERROR_OPERATION_ABORTED &&
          handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
        if (handle->flags & UV_HANDLE_READING) {
          /* just a brief break to do something else */
          handle->pipe.conn.readfile_thread = NULL;
          /* resume after it is finished */
          uv_mutex_lock(m);
          handle->pipe.conn.readfile_thread = hThread;
          uv_mutex_unlock(m);
          goto restart_readfile;
        } else {
          result = 1; /* successfully stopped reading */
        }
      }
    }
  } else {
    result = 1; /* successfully aborted read before it even started */
  }
  if (hThread) {
    assert(hThread == handle->pipe.conn.readfile_thread);
    /* mutex does not control clearing readfile_thread */
    handle->pipe.conn.readfile_thread = NULL;
    uv_mutex_lock(m);
    /* only when we hold the mutex lock is it safe to
       open or close the handle */
    CloseHandle(hThread);
    uv_mutex_unlock(m);
  }

  if (!result) {
    SET_REQ_ERROR(req, err);
  }

  POST_COMPLETION_FOR_REQ(loop, req);
  return 0;
}
1018
1019
/* Thread-pool worker that performs a blocking WriteFile for writes to a
 * non-overlapped pipe, then posts the write req back to the loop's
 * completion port. */
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
  int result;
  DWORD bytes;
  uv_write_t* req = (uv_write_t*) parameter;
  uv_pipe_t* handle = (uv_pipe_t*) req->handle;
  uv_loop_t* loop = handle->loop;

  assert(req != NULL);
  assert(req->type == UV_WRITE);
  assert(handle->type == UV_NAMED_PIPE);
  assert(req->write_buffer.base);

  /* Blocks until the data is written (or the pipe breaks). */
  result = WriteFile(handle->handle,
                     req->write_buffer.base,
                     req->write_buffer.len,
                     &bytes,
                     NULL);

  if (!result) {
    SET_REQ_ERROR(req, GetLastError());
  }

  POST_COMPLETION_FOR_REQ(loop, req);
  return 0;
}
1045
1046
/* RegisterWaitForSingleObject callback for IOCP-emulated reads: when the
 * read's event is signaled, forward the completed overlapped packet to the
 * loop's IOCP so it is processed like a native completion. */
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
  uv_read_t* req;
  uv_tcp_t* handle;

  req = (uv_read_t*) context;
  assert(req != NULL);
  handle = (uv_tcp_t*)req->data;
  assert(handle != NULL);
  assert(!timed_out);  /* The wait was registered with INFINITE timeout. */

  if (!PostQueuedCompletionStatus(handle->loop->iocp,
                                  req->u.io.overlapped.InternalHigh,
                                  0,
                                  &req->u.io.overlapped)) {
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
  }
}
1064
1065
/* RegisterWaitForSingleObject callback for IOCP-emulated writes: mirrors
 * post_completion_read_wait but reaches the handle via req->handle. */
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
  uv_write_t* req;
  uv_tcp_t* handle;

  req = (uv_write_t*) context;
  assert(req != NULL);
  handle = (uv_tcp_t*)req->handle;
  assert(handle != NULL);
  assert(!timed_out);  /* The wait was registered with INFINITE timeout. */

  if (!PostQueuedCompletionStatus(handle->loop->iocp,
                                  req->u.io.overlapped.InternalHigh,
                                  0,
                                  &req->u.io.overlapped)) {
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
  }
}
1083
1084
/* Start a single zero-byte read on the pipe. For non-overlapped pipes the
 * read runs as a blocking call on a thread-pool worker; otherwise an
 * overlapped ReadFile is issued, with a registered wait on an event when the
 * handle could not be bound to the IOCP (emulation mode). Errors are
 * delivered by queuing the read req as pending. */
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* req;
  int result;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  assert(handle->handle != INVALID_HANDLE_VALUE);

  req = &handle->read_req;

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* Blocking path: hand the zero-read to a worker thread. */
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }
  } else {
    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* Tagging the event with the low bit tells the kernel not to post a
       * completion packet for this operation; the registered wait below
       * posts it instead.
       * NOTE(review): on the very first read this uses req->event_handle
       * before the CreateEvent call further down — presumably relying on
       * its initial value; confirm against uv_req initialization. */
      req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    }

    /* Do 0-read */
    result = ReadFile(handle->handle,
                      &uv_zero_,
                      0,
                      NULL,
                      &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (!req->event_handle) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (!req->event_handle) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      }
      if (req->wait_handle == INVALID_HANDLE_VALUE) {
        if (!RegisterWaitForSingleObject(&req->wait_handle,
            req->u.io.overlapped.hEvent, post_completion_read_wait, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
          SET_REQ_ERROR(req, GetLastError());
          goto error;
        }
      }
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

 error:
  uv_insert_pending_req(loop, (uv_req_t*)req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
1152
1153
int uv_pipe_read_start(uv_pipe_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_read_cb read_cb) {
  /* Begin reading on the pipe: store the user's alloc/read callbacks, mark
   * the handle as reading/active, and queue the first 0-read if one is not
   * already in flight. Always returns 0. */
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;

  /* If reading was stopped and then started again, there could still be a */
  /* read request pending. */
  if (!(handle->flags & UV_HANDLE_READ_PENDING))
    uv_pipe_queue_read(loop, handle);

  return 0;
}
1171
1172
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
                                               uv_write_t* req) {
  /* Append a write request to the handle's circular singly-linked list of
   * queued non-overlapped writes. `non_overlapped_writes_tail` points at the
   * last element; tail->next_req is the head.
   *
   * The original code contained a dead store (`req->next_req = NULL;`
   * immediately overwritten in both branches) and duplicated the tail
   * assignment; both are cleaned up here, behavior unchanged. */
  if (handle->pipe.conn.non_overlapped_writes_tail) {
    /* Non-empty list: link the new req in front of the current head. */
    req->next_req = handle->pipe.conn.non_overlapped_writes_tail->next_req;
    handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*) req;
  } else {
    /* Empty list: the single element points at itself. */
    req->next_req = (uv_req_t*) req;
  }
  /* The new request is always the new tail. */
  handle->pipe.conn.non_overlapped_writes_tail = req;
}
1186
1187
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1188 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1189 uv_write_t* req;
1190
1191 if (handle->pipe.conn.non_overlapped_writes_tail) {
1192 req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1193
1194 if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1195 handle->pipe.conn.non_overlapped_writes_tail = NULL;
1196 } else {
1197 handle->pipe.conn.non_overlapped_writes_tail->next_req =
1198 req->next_req;
1199 }
1200
1201 return req;
1202 } else {
1203 /* queue empty */
1204 return NULL;
1205 }
1206 }
1207
1208
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
  /* Dequeue the next pending write and hand it to a thread-pool worker that
   * performs the blocking WriteFile. No-op when nothing is queued. */
  uv_write_t* next = uv_remove_non_overlapped_write_req(handle);

  if (next == NULL)
    return;

  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         next,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
  }
}
1219
1220
static int uv_pipe_write_impl(uv_loop_t* loop,
                              uv_write_t* req,
                              uv_pipe_t* handle,
                              const uv_buf_t bufs[],
                              unsigned int nbufs,
                              uv_stream_t* send_handle,
                              uv_write_cb cb) {
  /* Start a write on the pipe. Returns 0 on success or a *Windows system
   * error code* on failure; callers are responsible for translating it with
   * uv_translate_sys_error. Four IO strategies are used depending on handle
   * flags: blocking non-overlapped, queued non-overlapped (thread pool),
   * blocking overlapped, and regular overlapped. When `send_handle` is
   * given, the data is framed with the IPC protocol so a TCP socket can be
   * passed to the peer process.
   *
   * Fix vs. the previous revision: the blocking-overlapped wait-failure
   * path returned uv_translate_sys_error(err) while every other error path
   * returns the raw system error code; since the caller translates the
   * return value, that caused a double translation. It now returns `err`. */
  int err;
  int result;
  uv_tcp_t* tcp_send_handle;
  uv_write_t* ipc_header_req = NULL;
  uv_ipc_frame_uv_stream ipc_frame;

  /* Only single-buffer writes (or a bare handle send with no data) are
   * supported. */
  if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
    return ERROR_NOT_SUPPORTED;
  }

  /* Only TCP handles are supported for sharing. */
  if (send_handle && ((send_handle->type != UV_TCP) ||
      (!(send_handle->flags & UV_HANDLE_BOUND) &&
       !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
    return ERROR_NOT_SUPPORTED;
  }

  assert(handle->handle != INVALID_HANDLE_VALUE);

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  req->ipc_header = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  if (handle->ipc) {
    assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    ipc_frame.header.flags = 0;

    /* Use the IPC framing protocol. */
    if (send_handle) {
      tcp_send_handle = (uv_tcp_t*)send_handle;

      if (handle->pipe.conn.ipc_pid == 0) {
        handle->pipe.conn.ipc_pid = uv_current_pid();
      }

      /* Duplicate the socket into the peer process so it can be adopted on
       * the other side of the pipe. */
      err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid,
          &ipc_frame.socket_info_ex.socket_info);
      if (err) {
        return err;
      }

      ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error;

      ipc_frame.header.flags |= UV_IPC_TCP_SERVER;

      if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
        ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
      }
    }

    if (nbufs == 1) {
      ipc_frame.header.flags |= UV_IPC_RAW_DATA;
      ipc_frame.header.raw_data_length = bufs[0].len;
    }

    /*
     * Use the provided req if we're only doing a single write.
     * If we're doing multiple writes, use ipc_header_write_req to do
     * the first write, and then use the provided req for the second write.
     */
    if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
      ipc_header_req = req;
    } else {
      /*
       * Try to use the preallocated write req if it's available.
       * Otherwise allocate a new one.
       */
      if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) {
        ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req;
      } else {
        ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t));
        if (!ipc_header_req) {
          uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
        }
      }

      UV_REQ_INIT(ipc_header_req, UV_WRITE);
      ipc_header_req->handle = (uv_stream_t*) handle;
      ipc_header_req->cb = NULL;
      ipc_header_req->ipc_header = 1;
    }

    /* Write the header or the whole frame. */
    memset(&ipc_header_req->u.io.overlapped, 0,
           sizeof(ipc_header_req->u.io.overlapped));

    /* Using overlapped IO, but wait for completion before returning.
       This write is blocking because ipc_frame is on stack. */
    ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!ipc_header_req->u.io.overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }

    /* The full frame (header + socket info) is only sent when a handle is
     * being passed; otherwise just the header goes out. */
    result = WriteFile(handle->handle,
                       &ipc_frame,
                       ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
                         sizeof(ipc_frame) : sizeof(ipc_frame.header),
                       NULL,
                       &ipc_header_req->u.io.overlapped);
    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
      return err;
    }

    if (!result) {
      /* Request not completed immediately. Wait for it.*/
      if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
        return err;
      }
    }
    ipc_header_req->u.io.queued_bytes = 0;
    CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
    ipc_header_req->u.io.overlapped.hEvent = NULL;

    REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;

    /* If we don't have any raw data to write - we're done. */
    if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
      return 0;
    }
  }

  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Blocking, non-overlapped: a plain synchronous WriteFile, with the
     * completion posted manually so the callback still runs on the loop. */
    DWORD bytes;
    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       &bytes,
                       NULL);

    if (!result) {
      err = GetLastError();
      return err;
    } else {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* Non-blocking, non-overlapped: serialize through the write queue; a
     * thread-pool worker performs the actual blocking write. */
    req->write_buffer = bufs[0];
    uv_insert_non_overlapped_write_req(handle, req);
    if (handle->stream.conn.write_reqs_pending == 0) {
      uv_queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->u.io.queued_bytes = bufs[0].len;
    handle->write_queue_size += req->u.io.queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!req->u.io.overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }

    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(req->u.io.overlapped.hEvent);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = bufs[0].len;
      handle->write_queue_size += req->u.io.queued_bytes;
      if (WaitForSingleObject(req->u.io.overlapped.hEvent, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(req->u.io.overlapped.hEvent);
        /* Return the raw system error code, like every other error path in
         * this function; the caller translates it. (Previously this was
         * uv_translate_sys_error(err), causing a double translation.) */
        return err;
      }
    }
    CloseHandle(req->u.io.overlapped.hEvent);

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    return 0;
  } else {
    /* Regular overlapped write; completion arrives via the IOCP (or via the
     * registered wait when IOCP behavior is emulated). */
    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      return GetLastError();
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = bufs[0].len;
      handle->write_queue_size += req->u.io.queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      req->event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (!req->event_handle) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->u.io.overlapped.hEvent, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;

  return 0;
}
1471
1472
int uv_pipe_write(uv_loop_t* loop,
                  uv_write_t* req,
                  uv_pipe_t* handle,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_write_cb cb) {
  /* Plain pipe write: no handle is sent along (send_handle == NULL).
   * Returns 0 or a Windows system error code. */
  return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
}
1481
1482
int uv_pipe_write2(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   unsigned int nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  /* Write with an accompanying handle. Sending a handle is only meaningful
   * on an IPC pipe, so reject the call otherwise. */
  if (handle->ipc)
    return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);

  return WSAEINVAL;
}
1496
1497
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  /* The remote end closed the pipe: tear down read state and report UV_EOF
   * to the user. `loop` is unused but kept for signature symmetry with the
   * other read-completion helpers. */
  /* If there is an eof timer running, we don't need it any more, */
  /* so discard it. */
  eof_timer_destroy(handle);

  /* The pipe will never become readable again. */
  handle->flags &= ~UV_HANDLE_READABLE;
  uv_read_stop((uv_stream_t*) handle);

  handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
}
1509
1510
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
    uv_buf_t buf) {
  /* A read failed with Windows system error `error`: stop reading and report
   * the translated UV_* error to the user. `loop` is unused but kept for
   * signature symmetry with the other read-completion helpers. */
  /* If there is an eof timer running, we don't need it any more, */
  /* so discard it. */
  eof_timer_destroy(handle);

  uv_read_stop((uv_stream_t*) handle);

  handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
}
1521
1522
uv_pipe_read_error_or_eof(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1523 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1524 int error, uv_buf_t buf) {
1525 if (error == ERROR_OPERATION_ABORTED) {
1526 /* do nothing (equivalent to EINTR) */
1527 }
1528 else if (error == ERROR_BROKEN_PIPE) {
1529 uv_pipe_read_eof(loop, handle, buf);
1530 } else {
1531 uv_pipe_read_error(loop, handle, error, buf);
1532 }
1533 }
1534
1535
void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
                                    uv__ipc_socket_info_ex* info,
                                    int tcp_connection) {
  /* Record a socket received over the IPC pipe so it can later be accepted
   * by the user. Aborts the process on allocation failure. */
  uv__ipc_queue_item_t* queued;

  queued = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*queued));
  if (queued == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  queued->socket_info_ex = *info;  /* struct copy, equivalent to memcpy */
  queued->tcp_connection = tcp_connection;
  QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &queued->member);
  handle->pipe.conn.pending_ipc_info.queue_len++;
}
1550
1551
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* req) {
  /* Completion handler for the 0-read issued by uv_pipe_queue_read. On
   * success, drains the pipe with non-blocking reads (decoding IPC frames
   * when applicable), delivering data through alloc_cb/read_cb; afterwards
   * re-arms the next 0-read if the handle is still reading. */
  DWORD bytes, avail;
  uv_buf_t buf;
  uv_ipc_frame_uv_stream ipc_frame;

  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~UV_HANDLE_READ_PENDING;
  eof_timer_stop(handle);

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the 0-read. */
    if (handle->flags & UV_HANDLE_READING) {
      uv_pipe_read_error_or_eof(loop,
                                handle,
                                GET_REQ_ERROR(req),
                                uv_null_buf_);
    }
  } else {
    /* Do non-blocking reads until the buffer is empty */
    while (handle->flags & UV_HANDLE_READING) {
      /* Peek first so we only read data that is already available. */
      if (!PeekNamedPipe(handle->handle,
                          NULL,
                          0,
                          NULL,
                          &avail,
                          NULL)) {
        uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
        break;
      }

      if (avail == 0) {
        /* There is nothing to read after all. */
        break;
      }

      if (handle->ipc) {
        /* Use the IPC framing protocol to read the incoming data. */
        if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) {
          /* We're reading a new frame. First, read the header. */
          assert(avail >= sizeof(ipc_frame.header));

          if (!ReadFile(handle->handle,
                        &ipc_frame.header,
                        sizeof(ipc_frame.header),
                        &bytes,
                        NULL)) {
            uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
              uv_null_buf_);
            break;
          }

          assert(bytes == sizeof(ipc_frame.header));
          assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
            UV_IPC_TCP_CONNECTION));

          if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
            assert(avail - sizeof(ipc_frame.header) >=
              sizeof(ipc_frame.socket_info_ex));

            /* Read the TCP socket info. */
            if (!ReadFile(handle->handle,
                          &ipc_frame.socket_info_ex,
                          sizeof(ipc_frame) - sizeof(ipc_frame.header),
                          &bytes,
                          NULL)) {
              uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
                uv_null_buf_);
              break;
            }

            assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));

            /* Store the pending socket info. */
            uv__pipe_insert_pending_socket(
                handle,
                &ipc_frame.socket_info_ex,
                ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
          }

          if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
            /* The frame carries raw data after the header; remember how much
             * and go read it on the next loop iteration. */
            handle->pipe.conn.remaining_ipc_rawdata_bytes =
              ipc_frame.header.raw_data_length;
            continue;
          }
        } else {
          /* Never read past the current frame's remaining raw data. */
          avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes);
        }
      }

      buf = uv_buf_init(NULL, 0);
      handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
      if (buf.base == NULL || buf.len == 0) {
        /* The user refused to provide a buffer. */
        handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        break;
      }
      assert(buf.base != NULL);

      if (ReadFile(handle->handle,
                   buf.base,
                   min(buf.len, avail),
                   &bytes,
                   NULL)) {
        /* Successful read */
        if (handle->ipc) {
          assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes);
          handle->pipe.conn.remaining_ipc_rawdata_bytes =
            handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes;
        }
        handle->read_cb((uv_stream_t*)handle, bytes, &buf);

        /* Read again only if bytes == buf.len */
        /* NOTE(review): at most min(buf.len, avail) bytes were requested, so
         * `bytes <= buf.len` is always true and this always breaks, which
         * contradicts the comment above (it suggests `bytes < buf.len`).
         * Behavior is still correct because a fresh 0-read is queued below —
         * confirm intent before "fixing". */
        if (bytes <= buf.len) {
          break;
        }
      } else {
        uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
        break;
      }
    }

    /* Post another 0-read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv_pipe_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1683
1684
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  /* Completion handler for a pipe write request: updates write-queue
   * accounting, releases IOCP-emulation resources, invokes the user callback
   * (or recycles/frees internal IPC header reqs), and kicks off the next
   * queued non-overlapped write or a pending shutdown when appropriate. */
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    /* Tear down the registered wait and event used to emulate IOCP. */
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (req->ipc_header) {
    /* Internal IPC frame-header write: there is no user callback. Reset the
     * preallocated req for reuse, or free a dynamically allocated one. */
    if (req == &handle->pipe.conn.ipc_header_write_req) {
      req->type = UV_UNKNOWN_REQ;
    } else {
      uv__free(req);
    }
  } else {
    if (req->cb) {
      err = GET_REQ_ERROR(req);
      req->cb(req, uv_translate_sys_error(err));
    }
  }

  handle->stream.conn.write_reqs_pending--;

  /* Non-overlapped writes are serialized; start the next one, if any. */
  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
      handle->pipe.conn.non_overlapped_writes_tail) {
    assert(handle->stream.conn.write_reqs_pending > 0);
    uv_queue_non_overlapped_write(handle);
  }

  /* A pending shutdown can proceed once all writes have drained. */
  if (handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {
    uv_want_endgame(loop, (uv_handle_t*)handle);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1735
1736
void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  /* Completion handler for a server-side accept request. On success the
   * connected pipe handle is prepended to the pending-accepts list and the
   * connection callback fires; on failure the request is re-queued. */
  uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;

  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV__HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (REQ_SUCCESS(req)) {
    assert(req->pipeHandle != INVALID_HANDLE_VALUE);
    /* Prepend the accepted pipe to the pending-accepts list. */
    req->next_pending = handle->pipe.serv.pending_accepts;
    handle->pipe.serv.pending_accepts = req;

    if (handle->stream.serv.connection_cb) {
      handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
    }
  } else {
    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    /* NOTE(review): this condition is always true here — the closing case
     * already returned above. Harmless, but redundant. */
    if (!(handle->flags & UV__HANDLE_CLOSING)) {
      uv_pipe_queue_accept(loop, handle, req, FALSE);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1770
1771
void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_connect_t* req) {
  /* Completion handler for a client-side connect request: on success the
   * handle is switched into connection mode; either way the user callback
   * receives the translated result. */
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (req->cb != NULL) {
    if (REQ_SUCCESS(req)) {
      uv_pipe_connection_init(handle);
      err = 0;
    } else {
      err = GET_REQ_ERROR(req);
    }
    req->cb(req, uv_translate_sys_error(err));
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1792
1793
void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_shutdown_t* req) {
  /* Completion handler for a shutdown request. A readable pipe is kept open
   * so the remote end's EOF can still be observed (guarded by the eof
   * timer); a non-readable pipe can simply be closed. */
  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_READABLE) {
    /* Initialize and optionally start the eof timer. Only do this if the */
    /* pipe is readable and we haven't seen EOF come in ourselves. */
    eof_timer_init(handle);

    /* If reading start the timer right now. */
    /* Otherwise uv_pipe_queue_read will start it. */
    if (handle->flags & UV_HANDLE_READ_PENDING) {
      eof_timer_start(handle);
    }

  } else {
    /* This pipe is not readable. We can just close it to let the other end */
    /* know that we're done writing. */
    close_pipe(handle);
  }

  if (req->cb) {
    req->cb(req, 0);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1823
1824
static void eof_timer_init(uv_pipe_t* pipe) {
  /* Allocate and initialize the per-connection eof timer (unref'd so it does
   * not keep the loop alive). Must only be called once per connection.
   *
   * Fix: the uv__malloc result was previously used without a NULL check,
   * dereferencing NULL on OOM; now aborts via uv_fatal_error, consistent
   * with the other allocation sites in this file. */
  int r;

  assert(pipe->pipe.conn.eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  pipe->pipe.conn.eof_timer =
      (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
  if (pipe->pipe.conn.eof_timer == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
  assert(r == 0); /* timers can't fail */
  pipe->pipe.conn.eof_timer->data = pipe;
  uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
}
1838
1839
static void eof_timer_start(uv_pipe_t* pipe) {
  /* (Re)start the eof timer, whose callback force-closes the pipe after
   * `eof_timeout` if the remote end never closes. No-op when no timer was
   * initialized for this connection. */
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->pipe.conn.eof_timer != NULL) {
    uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
  }
}
1847
1848
static void eof_timer_stop(uv_pipe_t* pipe) {
  /* Pause the eof timer (e.g. while a read completion is being handled).
   * No-op when no timer was initialized for this connection. */
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->pipe.conn.eof_timer != NULL) {
    uv_timer_stop(pipe->pipe.conn.eof_timer);
  }
}
1856
1857
static void eof_timer_cb(uv_timer_t* timer) {
  /* Fires when the remote end has not closed the pipe within `eof_timeout`:
   * force-close the pipe, stop reading, and report EOF to the user. */
  uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
  uv_loop_t* loop = timer->loop;

  assert(pipe->type == UV_NAMED_PIPE);

  /* This should always be true, since we start the timer only */
  /* in uv_pipe_queue_read after successfully calling ReadFile, */
  /* or in uv_process_pipe_shutdown_req if a read is pending, */
  /* and we always immediately stop the timer in */
  /* uv_process_pipe_read_req. */
  assert(pipe->flags & UV_HANDLE_READ_PENDING);

  /* If there are many packets coming off the iocp then the timer callback */
  /* may be called before the read request is coming off the queue. */
  /* Therefore we check here if the read request has completed but will */
  /* be processed later. */
  if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
      HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
    return;
  }

  /* Force both ends off the pipe. */
  close_pipe(pipe);

  /* Stop reading, so the pending read that is going to fail will */
  /* not be reported to the user. */
  uv_read_stop((uv_stream_t*) pipe);

  /* Report the eof and update flags. This will get reported even if the */
  /* user stopped reading in the meantime. TODO: is that okay? */
  uv_pipe_read_eof(loop, pipe, uv_null_buf_);
}
1891
1892
static void eof_timer_destroy(uv_pipe_t* pipe) {
  /* Close and forget the eof timer; the timer struct itself is freed by
   * eof_timer_close_cb once the close completes. No-op without a timer. */
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->pipe.conn.eof_timer == NULL)
    return;

  uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
  pipe->pipe.conn.eof_timer = NULL;
}
1901
1902
static void eof_timer_close_cb(uv_handle_t* handle) {
  /* Free the heap-allocated eof timer once its close has completed. */
  assert(handle->type == UV_TIMER);
  uv__free(handle);
}
1907
1908
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  /* Associate an existing file descriptor with this uv_pipe_t and put the
   * handle into connection mode. Returns 0, or UV_EBADF / UV_EINVAL /
   * a translated system error on failure. */
  HANDLE os_handle = uv__get_osfhandle(file);
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_ACCESS_INFORMATION access;
  DWORD duplex_flags = 0;

  if (os_handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  uv__once_init();
  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   * underlying OS handle and forget about the original fd.
   * We could also opt to use the original OS handle and just never close it,
   * but then there would be no reliable way to cancel pending read operations
   * upon close.
   */
  if (file <= 2) {
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
                         os_handle,
                         INVALID_HANDLE_VALUE,
                         &os_handle,
                         0,
                         FALSE,
                         DUPLICATE_SAME_ACCESS))
      return uv_translate_sys_error(GetLastError());
    /* -1 marks the fd as "not owned"; only the duplicated handle is used. */
    file = -1;
  }

  /* Determine what kind of permissions we have on this handle.
   * Cygwin opens the pipe in message mode, but we can support it,
   * just query the access flags and set the stream flags accordingly.
   */
  nt_status = pNtQueryInformationFile(os_handle,
                                      &io_status,
                                      &access,
                                      sizeof(access),
                                      FileAccessInformation);
  if (nt_status != STATUS_SUCCESS)
    return UV_EINVAL;

  /* An IPC pipe must be both readable and writable. */
  if (pipe->ipc) {
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
        !(access.AccessFlags & FILE_READ_DATA)) {
      return UV_EINVAL;
    }
  }

  if (access.AccessFlags & FILE_WRITE_DATA)
    duplex_flags |= UV_HANDLE_WRITABLE;
  if (access.AccessFlags & FILE_READ_DATA)
    duplex_flags |= UV_HANDLE_READABLE;

  /* NOTE(review): if uv_set_pipe_handle fails here, a handle duplicated
   * above (file <= 2 case) is not closed — possible handle leak; confirm. */
  if (os_handle == INVALID_HANDLE_VALUE ||
      uv_set_pipe_handle(pipe->loop,
                         pipe,
                         os_handle,
                         file,
                         duplex_flags) == -1) {
    return UV_EINVAL;
  }

  uv_pipe_connection_init(pipe);

  if (pipe->ipc) {
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    /* IPC handles are inherited from the parent; target socket duplication
     * at the parent's pid. */
    pipe->pipe.conn.ipc_pid = uv_os_getppid();
    assert(pipe->pipe.conn.ipc_pid != -1);
  }
  return 0;
}
1980
1981
static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
  /* Query the pipe's NT name, convert it to UTF-8 and return it prefixed
   * with "\\\\.\\pipe". On success *size is the string length (excluding the
   * terminating NUL); on UV_ENOBUFS *size is set to the required buffer
   * size. Reads are paused around the NtQueryInformationFile calls. */
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_NAME_INFORMATION tmp_name_info;
  FILE_NAME_INFORMATION* name_info;
  WCHAR* name_buf;
  unsigned int addrlen;
  unsigned int name_size;
  unsigned int name_len;
  int err;

  uv__once_init();
  name_info = NULL;

  if (handle->handle == INVALID_HANDLE_VALUE) {
    *size = 0;
    return UV_EINVAL;
  }

  uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */

  /* First try with a stack buffer; fall back to a heap allocation sized from
   * the reported name length when the name doesn't fit. */
  nt_status = pNtQueryInformationFile(handle->handle,
                                      &io_status,
                                      &tmp_name_info,
                                      sizeof tmp_name_info,
                                      FileNameInformation);
  if (nt_status == STATUS_BUFFER_OVERFLOW) {
    name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
    name_info = uv__malloc(name_size);
    if (!name_info) {
      *size = 0;
      err = UV_ENOMEM;
      goto cleanup;
    }

    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        name_info,
                                        name_size,
                                        FileNameInformation);
  }

  if (nt_status != STATUS_SUCCESS) {
    *size = 0;
    err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
    goto error;
  }

  if (!name_info) {
    /* the struct on stack was used */
    name_buf = tmp_name_info.FileName;
    name_len = tmp_name_info.FileNameLength;
  } else {
    name_buf = name_info->FileName;
    name_len = name_info->FileNameLength;
  }

  if (name_len == 0) {
    *size = 0;
    err = 0;
    goto error;
  }

  /* FileNameLength is in bytes; convert to a WCHAR count. */
  name_len /= sizeof(WCHAR);

  /* check how much space we need */
  addrlen = WideCharToMultiByte(CP_UTF8,
                                0,
                                name_buf,
                                name_len,
                                NULL,
                                0,
                                NULL,
                                NULL);
  if (!addrlen) {
    *size = 0;
    err = uv_translate_sys_error(GetLastError());
    goto error;
  } else if (pipe_prefix_len + addrlen >= *size) {
    /* "\\\\.\\pipe" + name */
    *size = pipe_prefix_len + addrlen + 1;
    err = UV_ENOBUFS;
    goto error;
  }

  memcpy(buffer, pipe_prefix, pipe_prefix_len);
  addrlen = WideCharToMultiByte(CP_UTF8,
                                0,
                                name_buf,
                                name_len,
                                buffer+pipe_prefix_len,
                                *size-pipe_prefix_len,
                                NULL,
                                NULL);
  if (!addrlen) {
    *size = 0;
    err = uv_translate_sys_error(GetLastError());
    goto error;
  }

  addrlen += pipe_prefix_len;
  *size = addrlen;
  buffer[addrlen] = '\0';

  err = 0;

error:
  /* uv__free(NULL) is safe when the stack buffer was sufficient. */
  uv__free(name_info);

cleanup:
  uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */
  return err;
}
2095
2096
int uv_pipe_pending_count(uv_pipe_t* handle) {
  /* Only IPC pipes can have pending handles queued for acceptance. */
  return handle->ipc ? handle->pipe.conn.pending_ipc_info.queue_len : 0;
}
2102
2103
int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
  /* A bound (server) pipe has a queryable name. */
  if (handle->flags & UV_HANDLE_BOUND)
    return uv__pipe_getname(handle, buffer, size);

  /* Connected, or at least backed by a valid OS handle: report an empty
   * name rather than an error. */
  if ((handle->flags & UV_HANDLE_CONNECTION) ||
      handle->handle != INVALID_HANDLE_VALUE) {
    *size = 0;
    return 0;
  }

  return UV_EBADF;
}
2116
2117
int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
  /* emulate unix behaviour: a listening (bound) pipe has no peer */
  if (handle->flags & UV_HANDLE_BOUND)
    return UV_ENOTCONN;

  if (handle->handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  return uv__pipe_getname(handle, buffer, size);
}
2128
2129
uv_pipe_pending_type(uv_pipe_t * handle)2130 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2131 if (!handle->ipc)
2132 return UV_UNKNOWN_HANDLE;
2133 if (handle->pipe.conn.pending_ipc_info.queue_len == 0)
2134 return UV_UNKNOWN_HANDLE;
2135 else
2136 return UV_TCP;
2137 }
2138
/* Grant or restrict Everyone's read/write access on the pipe's kernel
 * object by rewriting its DACL. `mode` must be UV_READABLE, UV_WRITABLE,
 * or their bitwise OR. Returns 0 on success or a translated libuv error.
 */
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
  SID_IDENTIFIER_AUTHORITY sid_world = SECURITY_WORLD_SID_AUTHORITY;
  PACL old_dacl, new_dacl;
  PSECURITY_DESCRIPTOR sd;
  EXPLICIT_ACCESS ea;
  PSID everyone;
  DWORD error;

  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  if (mode != UV_READABLE &&
      mode != UV_WRITABLE &&
      mode != (UV_WRITABLE | UV_READABLE))
    return UV_EINVAL;

  /* Build a SID for the well-known "Everyone" (World) group. */
  if (!AllocateAndInitializeSid(&sid_world,
                                1,
                                SECURITY_WORLD_RID,
                                0, 0, 0, 0, 0, 0, 0,
                                &everyone)) {
    error = GetLastError();
    goto done;
  }

  /* GetSecurityInfo returns a WinError code directly and is not documented
   * to set the thread's last-error value, so use the return value instead
   * of GetLastError(). */
  error = GetSecurityInfo(handle->handle,
                          SE_KERNEL_OBJECT,
                          DACL_SECURITY_INFORMATION,
                          NULL,
                          NULL,
                          &old_dacl,
                          NULL,
                          &sd);
  if (error != ERROR_SUCCESS)
    goto clean_sid;

  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
  if (mode & UV_READABLE)
    /* Readers also need FILE_WRITE_ATTRIBUTES for pipe mode changes. */
    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
  if (mode & UV_WRITABLE)
    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
  ea.grfAccessPermissions |= SYNCHRONIZE;
  ea.grfAccessMode = SET_ACCESS;
  ea.grfInheritance = NO_INHERITANCE;
  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
  ea.Trustee.ptstrName = (LPTSTR)everyone;

  /* SetEntriesInAcl likewise reports failure through its return value. */
  error = SetEntriesInAcl(1, &ea, old_dacl, &new_dacl);
  if (error != ERROR_SUCCESS)
    goto clean_sd;

  /* As does SetSecurityInfo. */
  error = SetSecurityInfo(handle->handle,
                          SE_KERNEL_OBJECT,
                          DACL_SECURITY_INFORMATION,
                          NULL,
                          NULL,
                          new_dacl,
                          NULL);
  if (error != ERROR_SUCCESS)
    goto clean_dacl;

  error = ERROR_SUCCESS;
  /* Fall through the cleanup labels; uv_translate_sys_error(0) is 0. */

 clean_dacl:
  LocalFree((HLOCAL) new_dacl);
 clean_sd:
  /* The descriptor returned by GetSecurityInfo must be LocalFree'd. */
  LocalFree((HLOCAL) sd);
 clean_sid:
  FreeSid(everyone);
 done:
  return uv_translate_sys_error(error);
}
2215