1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include <assert.h>
23 #include <io.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27
28 #include "uv.h"
29 #include "internal.h"
30 #include "handle-inl.h"
31 #include "stream-inl.h"
32 #include "req-inl.h"
33
34
/* A zero-size buffer, passed to ReadFile() when issuing zero-byte reads. */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data */
/* when the local end wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Number of accept requests uv_pipe_bind() allocates when the handle has */
/* not been flagged as a pipe server beforehand. */
static const int default_pending_pipe_instances = 4;
46
/* IPC protocol flags, stored in uv_ipc_frame_header_t.flags. */
/* The frame header is followed by raw data. */
#define UV_IPC_RAW_DATA 0x0001
/* The frame carries the socket info of a duplicated TCP handle. */
#define UV_IPC_TCP_SERVER 0x0002
/* Set in addition to UV_IPC_TCP_SERVER when the TCP handle being sent is */
/* a connection. */
#define UV_IPC_TCP_CONNECTION 0x0004
51
/* IPC frame header. */
typedef struct {
  /* Combination of the UV_IPC_* flags. */
  int flags;
  /* Length of the data buffer being written; only filled in when */
  /* UV_IPC_RAW_DATA is set. */
  uint64_t raw_data_length;
} uv_ipc_frame_header_t;
57
/* IPC frame, which contains an imported TCP socket stream. */
/* The full frame is only written when UV_IPC_TCP_SERVER is set in the */
/* header; otherwise just the header is written. */
typedef struct {
  uv_ipc_frame_header_t header;
  /* Filled in by uv_tcp_duplicate_socket() for the handle being sent. */
  WSAPROTOCOL_INFOW socket_info;
} uv_ipc_frame_uv_stream;
63
/* Forward declarations of the EOF timer helpers used by the pipe code. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer, int status);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
70
71
/*
 * Formats a pipe name derived from `ptr` and the current process id into
 * `name`, a buffer of `size` bytes. The result is always NUL-terminated
 * (and possibly truncated) as long as `size` is non-zero; nothing is written
 * when `size` is zero.
 */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  if (size == 0) {
    return;
  }

  /* DWORD is an unsigned long, hence the cast and %lu. */
  _snprintf(name,
            size,
            "\\\\.\\pipe\\uv\\%p-%lu",
            ptr,
            (unsigned long) GetCurrentProcessId());

  /* _snprintf does not append a terminator when the output fills the */
  /* buffer completely, so always terminate explicitly. */
  name[size - 1] = '\0';
}
75
76
/*
 * Initializes `handle` as a named pipe stream on `loop`. `ipc` is stored on
 * the handle as-is. No OS pipe is attached yet. Always returns 0.
 */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*) handle, UV_NAMED_PIPE);

  /* Generic pipe state: nothing opened, nothing named, nothing in flight. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->non_overlapped_writes_tail = NULL;

  /* IPC state. */
  handle->ipc = ipc;
  handle->ipc_pid = 0;
  handle->remaining_ipc_rawdata_bytes = 0;
  handle->pending_ipc_info.tcp_connection = 0;
  handle->pending_ipc_info.socket_info = NULL;

  uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);

  return 0;
}
94
95
/* Prepares `handle` for use as a connection (as opposed to a server). */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  uv_stream_t* stream = (uv_stream_t*) handle;

  uv_connection_init(stream);

  /* Lets completion code get from the embedded read request back to the */
  /* pipe that owns it. */
  handle->read_req.data = handle;

  /* No EOF timer exists yet. */
  handle->eof_timer = NULL;
}
101
102
open_named_pipe(WCHAR * name,DWORD * duplex_flags)103 static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
104 HANDLE pipeHandle;
105
106 /*
107 * Assume that we have a duplex pipe first, so attempt to
108 * connect with GENERIC_READ | GENERIC_WRITE.
109 */
110 pipeHandle = CreateFileW(name,
111 GENERIC_READ | GENERIC_WRITE,
112 0,
113 NULL,
114 OPEN_EXISTING,
115 FILE_FLAG_OVERLAPPED,
116 NULL);
117 if (pipeHandle != INVALID_HANDLE_VALUE) {
118 *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
119 return pipeHandle;
120 }
121
122 /*
123 * If the pipe is not duplex CreateFileW fails with
124 * ERROR_ACCESS_DENIED. In that case try to connect
125 * as a read-only or write-only.
126 */
127 if (GetLastError() == ERROR_ACCESS_DENIED) {
128 pipeHandle = CreateFileW(name,
129 GENERIC_READ | FILE_WRITE_ATTRIBUTES,
130 0,
131 NULL,
132 OPEN_EXISTING,
133 FILE_FLAG_OVERLAPPED,
134 NULL);
135
136 if (pipeHandle != INVALID_HANDLE_VALUE) {
137 *duplex_flags = UV_HANDLE_READABLE;
138 return pipeHandle;
139 }
140 }
141
142 if (GetLastError() == ERROR_ACCESS_DENIED) {
143 pipeHandle = CreateFileW(name,
144 GENERIC_WRITE | FILE_READ_ATTRIBUTES,
145 0,
146 NULL,
147 OPEN_EXISTING,
148 FILE_FLAG_OVERLAPPED,
149 NULL);
150
151 if (pipeHandle != INVALID_HANDLE_VALUE) {
152 *duplex_flags = UV_HANDLE_WRITABLE;
153 return pipeHandle;
154 }
155 }
156
157 return INVALID_HANDLE_VALUE;
158 }
159
160
/*
 * Creates a single-instance, byte-mode, overlapped named pipe with a
 * generated name and attaches it to `handle` as a connection.
 *
 * `access` is OR-ed into the open mode passed to CreateNamedPipeA. The
 * generated name is written to `name` (a buffer of `nameSize` bytes). The
 * pipe is associated with the loop's completion port.
 *
 * Returns 0 on success, otherwise the Windows error code of the call that
 * failed; in that case no pipe handle is left open.
 */
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
    char* name, size_t nameSize) {
  HANDLE pipeHandle;
  char* seed;
  int err;

  /* The handle's address seeds the name; on a collision the seed is bumped */
  /* and a new name is tried. */
  for (seed = (char*) handle; ; seed++) {
    uv_unique_pipe_name(seed, name, nameSize);

    pipeHandle = CreateNamedPipeA(name,
        access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
        NULL);
    if (pipeHandle != INVALID_HANDLE_VALUE) {
      /* The name was free. */
      break;
    }

    err = GetLastError();
    if (!(err == ERROR_PIPE_BUSY || err == ERROR_ACCESS_DENIED)) {
      /* Not a name collision; nothing was opened, so nothing to close. */
      return err;
    }
  }

  if (CreateIoCompletionPort(pipeHandle,
                             loop->iocp,
                             (ULONG_PTR) handle,
                             0) == NULL) {
    err = GetLastError();
    CloseHandle(pipeHandle);
    return err;
  }

  uv_pipe_connection_init(handle);
  handle->handle = pipeHandle;

  return 0;
}
209
210
/*
 * Attaches the OS pipe `pipeHandle` to `handle`.
 *
 * The pipe is switched to byte/blocking mode. If the handle was opened for
 * synchronous I/O the uv handle is flagged UV_HANDLE_NON_OVERLAPPED_PIPE;
 * otherwise it is associated with the loop's completion port, and flagged
 * UV_HANDLE_EMULATE_IOCP when that association fails. `duplex_flags` is
 * OR-ed into the handle flags.
 *
 * Returns 0 on success. Returns -1 on failure with the reason available
 * through GetLastError(); `handle` is not modified in that case and the
 * caller remains the owner of `pipeHandle`.
 */
static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
    HANDLE pipeHandle, DWORD duplex_flags) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;

  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    /* If this returns ERROR_INVALID_PARAMETER we probably opened something */
    /* that is not a pipe. */
    if (GetLastError() == ERROR_INVALID_PARAMETER) {
      SetLastError(WSAENOTSOCK);
    }
    return -1;
  }

  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      &io_status,
                                      &mode_info,
                                      sizeof(mode_info),
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
    /* The native call reports through its NTSTATUS return value, but all */
    /* callers read GetLastError() after a failure. Publish the error there */
    /* so they don't pick up a stale, unrelated code. */
    SetLastError(pRtlNtStatusToDosError(nt_status));
    return -1;
  }

  if (mode_info.Mode &
      (FILE_SYNCHRONOUS_IO_ALERT | FILE_SYNCHRONOUS_IO_NONALERT)) {
    /* Non-overlapped pipe. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
  } else {
    /* Overlapped pipe. Try to associate with IOCP. */
    if (CreateIoCompletionPort(pipeHandle,
                               loop->iocp,
                               (ULONG_PTR) handle,
                               0) == NULL) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    }
  }

  handle->handle = pipeHandle;
  handle->flags |= duplex_flags;

  return 0;
}
256
257
pipe_shutdown_thread_proc(void * parameter)258 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
259 uv_loop_t* loop;
260 uv_pipe_t* handle;
261 uv_shutdown_t* req;
262
263 req = (uv_shutdown_t*) parameter;
264 assert(req);
265 handle = (uv_pipe_t*) req->handle;
266 assert(handle);
267 loop = handle->loop;
268 assert(loop);
269
270 FlushFileBuffers(handle->handle);
271
272 /* Post completed */
273 POST_COMPLETION_FOR_REQ(loop, req);
274
275 return 0;
276 }
277
278
/*
 * Performs deferred shutdown and close work for a pipe handle.
 *
 * Two independent stages are handled here:
 *   1. A pending shutdown request on a connection is processed once no
 *      writes are outstanding. It is either cancelled (handle closing),
 *      failed, completed directly, or handed to the thread pool to flush the
 *      pipe first.
 *   2. A closing handle with no pending requests releases its remaining
 *      resources and is marked closed.
 */
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  int err;
  DWORD result;
  uv_shutdown_t* req;
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;

  if ((handle->flags & UV_HANDLE_CONNECTION) &&
      handle->shutdown_req != NULL &&
      handle->write_reqs_pending == 0) {
    req = handle->shutdown_req;

    /* Clear the shutdown_req field so we don't go here again. */
    handle->shutdown_req = NULL;

    if (handle->flags & UV__HANDLE_CLOSING) {
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      /* Already closing. Cancel the shutdown. */
      if (req->cb) {
        req->cb(req, UV_ECANCELED);
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* Try to avoid flushing the pipe buffer in the thread pool. */
    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        &pipe_info,
                                        sizeof pipe_info,
                                        FilePipeLocalInformation);

    if (nt_status != STATUS_SUCCESS) {
      /* Failure */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        /* The NTSTATUS is converted to a Win32 error before translation. */
        err = pRtlNtStatusToDosError(nt_status);
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* NOTE(review): the whole outbound quota being available is taken to */
    /* mean that no written data is still buffered - confirm. */
    if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
      /* Short-circuit, no need to call FlushFileBuffers. */
      uv_insert_pending_req(loop, (uv_req_t*) req);
      return;
    }

    /* Run FlushFileBuffers in the thread pool. */
    result = QueueUserWorkItem(pipe_shutdown_thread_proc,
                               req,
                               WT_EXECUTELONGFUNCTION);
    if (result) {
      /* The work item posts the request back to the loop when done. */
      return;

    } else {
      /* Failure. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = GetLastError();
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }
  }

  /* Final stage of closing: only runs once every request has completed. */
  if (handle->flags & UV__HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));

    if (handle->flags & UV_HANDLE_CONNECTION) {
      /* Drop socket info that was received but never accepted. */
      if (handle->pending_ipc_info.socket_info) {
        free(handle->pending_ipc_info.socket_info);
        handle->pending_ipc_info.socket_info = NULL;
      }

      /* Tear down the wait registration and event used for IOCP emulation. */
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
          UnregisterWait(handle->read_req.wait_handle);
          handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
        }
        if (handle->read_req.event_handle) {
          CloseHandle(handle->read_req.event_handle);
          handle->read_req.event_handle = NULL;
        }
      }
    }

    if (handle->flags & UV_HANDLE_PIPESERVER) {
      /* The accept request array is allocated by uv_pipe_bind(). */
      assert(handle->accept_reqs);
      free(handle->accept_reqs);
      handle->accept_reqs = NULL;
    }

    uv__handle_close(handle);
  }
}
387
388
/*
 * Sets the number of pipe instances (accept requests) a pipe server keeps
 * pending and flags the handle as a pipe server.
 *
 * The call is ignored once the handle is bound: uv_pipe_bind() sizes the
 * accept request array from this count, and listening and cleanup iterate
 * over that array using the same count, so changing it afterwards would make
 * them run past the allocation.
 */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (handle->flags & UV_HANDLE_BOUND) {
    return;
  }

  handle->pending_instances = count;
  handle->flags |= UV_HANDLE_PIPESERVER;
}
393
394
395 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)396 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
397 uv_loop_t* loop = handle->loop;
398 int i, err, nameSize;
399 uv_pipe_accept_t* req;
400
401 if (handle->flags & UV_HANDLE_BOUND) {
402 return UV_EINVAL;
403 }
404
405 if (!name) {
406 return UV_EINVAL;
407 }
408
409 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
410 handle->pending_instances = default_pending_pipe_instances;
411 }
412
413 handle->accept_reqs = (uv_pipe_accept_t*)
414 malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
415 if (!handle->accept_reqs) {
416 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
417 }
418
419 for (i = 0; i < handle->pending_instances; i++) {
420 req = &handle->accept_reqs[i];
421 uv_req_init(loop, (uv_req_t*) req);
422 req->type = UV_ACCEPT;
423 req->data = handle;
424 req->pipeHandle = INVALID_HANDLE_VALUE;
425 req->next_pending = NULL;
426 }
427
428 /* Convert name to UTF16. */
429 nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
430 handle->name = (WCHAR*)malloc(nameSize);
431 if (!handle->name) {
432 uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
433 }
434
435 if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
436 return uv_translate_sys_error(GetLastError());
437 }
438
439 /*
440 * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
441 * If this fails then there's already a pipe server for the given pipe name.
442 */
443 handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
444 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
445 FILE_FLAG_FIRST_PIPE_INSTANCE,
446 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
447 PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
448
449 if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
450 err = GetLastError();
451 if (err == ERROR_ACCESS_DENIED) {
452 err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
453 } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
454 err = WSAEACCES; /* Translates to UV_EACCES. */
455 }
456 goto error;
457 }
458
459 if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
460 err = GetLastError();
461 goto error;
462 }
463
464 handle->pending_accepts = NULL;
465 handle->flags |= UV_HANDLE_PIPESERVER;
466 handle->flags |= UV_HANDLE_BOUND;
467
468 return 0;
469
470 error:
471 if (handle->name) {
472 free(handle->name);
473 handle->name = NULL;
474 }
475
476 if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
477 CloseHandle(handle->accept_reqs[0].pipeHandle);
478 handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
479 }
480
481 return uv_translate_sys_error(err);
482 }
483
484
pipe_connect_thread_proc(void * parameter)485 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
486 uv_loop_t* loop;
487 uv_pipe_t* handle;
488 uv_connect_t* req;
489 HANDLE pipeHandle = INVALID_HANDLE_VALUE;
490 DWORD duplex_flags;
491
492 req = (uv_connect_t*) parameter;
493 assert(req);
494 handle = (uv_pipe_t*) req->handle;
495 assert(handle);
496 loop = handle->loop;
497 assert(loop);
498
499 /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
500 /* We wait for the pipe to become available with WaitNamedPipe. */
501 while (WaitNamedPipeW(handle->name, 30000)) {
502 /* The pipe is now available, try to connect. */
503 pipeHandle = open_named_pipe(handle->name, &duplex_flags);
504 if (pipeHandle != INVALID_HANDLE_VALUE) {
505 break;
506 }
507
508 SwitchToThread();
509 }
510
511 if (pipeHandle != INVALID_HANDLE_VALUE &&
512 !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
513 SET_REQ_SUCCESS(req);
514 } else {
515 SET_REQ_ERROR(req, GetLastError());
516 }
517
518 /* Post completed */
519 POST_COMPLETION_FOR_REQ(loop, req);
520
521 return 0;
522 }
523
524
/*
 * Starts connecting `handle` to the named pipe `name` (UTF-8). The outcome
 * is always delivered through `req`, never as a return value.
 *
 * If the pipe can be opened right away the request is queued as completed.
 * If every instance is busy, the wait is handed to the thread pool. Any
 * other failure queues the request with the error recorded on it.
 */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;
  int nameSize;
  int err;

  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_CONNECT;
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* The Win32 calls below need the name as UTF-16. */
  nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*) malloc(nameSize);
  if (handle->name == NULL) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  }

  if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    if (err != ERROR_PIPE_BUSY) {
      goto error;
    }

    /* All instances are busy: wait for the server to make one available, */
    /* but do the waiting on a pool thread. */
    if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      err = GetLastError();
      goto error;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    return;
  }

  assert(pipeHandle != INVALID_HANDLE_VALUE);

  if (uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
    err = GetLastError();
    goto error;
  }

  /* Connected synchronously; still report through the pending queue. */
  SET_REQ_SUCCESS(req);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;

error:
  if (handle->name != NULL) {
    free(handle->name);
    handle->name = NULL;
  }

  if (pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(pipeHandle);
  }

  /* Queue the request so the failure is reported through the callback. */
  SET_REQ_ERROR(req, err);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
}
603
604
605 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
606 /* with it. */
uv_pipe_cleanup(uv_loop_t * loop,uv_pipe_t * handle)607 void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
608 int i;
609 HANDLE pipeHandle;
610
611 if (handle->name) {
612 free(handle->name);
613 handle->name = NULL;
614 }
615
616 if (handle->flags & UV_HANDLE_PIPESERVER) {
617 for (i = 0; i < handle->pending_instances; i++) {
618 pipeHandle = handle->accept_reqs[i].pipeHandle;
619 if (pipeHandle != INVALID_HANDLE_VALUE) {
620 CloseHandle(pipeHandle);
621 handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
622 }
623 }
624 }
625
626 if (handle->flags & UV_HANDLE_CONNECTION) {
627 handle->flags &= ~UV_HANDLE_WRITABLE;
628 eof_timer_destroy(handle);
629 }
630
631 if ((handle->flags & UV_HANDLE_CONNECTION)
632 && handle->handle != INVALID_HANDLE_VALUE) {
633 CloseHandle(handle->handle);
634 handle->handle = INVALID_HANDLE_VALUE;
635 }
636 }
637
638
/*
 * Starts closing a pipe handle: stops reading and listening, releases the
 * OS resources, and marks the handle as closing. The endgame is requested
 * right away when no requests are pending.
 */
void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
  unsigned int active_modes[2];
  int i;

  /* Reading and listening each took one active count on the handle when */
  /* they were started; give those back. */
  active_modes[0] = UV_HANDLE_READING;
  active_modes[1] = UV_HANDLE_LISTENING;

  for (i = 0; i < 2; i++) {
    if (handle->flags & active_modes[i]) {
      handle->flags &= ~active_modes[i];
      DECREASE_ACTIVE_COUNT(loop, handle);
    }
  }

  uv_pipe_cleanup(loop, handle);

  if (!handle->reqs_pending) {
    uv_want_endgame(loop, (uv_handle_t*) handle);
  }

  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(handle);
}
659
660
/*
 * Arms the accept request `req` of the listening server `handle`.
 *
 * Unless `firstInstance` is set (the first instance is created by
 * uv_pipe_bind), a new pipe instance is created for the request. An
 * overlapped ConnectNamedPipe is then issued on it.
 *
 * Nothing is returned: when a step fails, or a client is already connected,
 * the request is inserted into the pending queue with the matching status.
 * In every path the handle's pending request count is incremented once.
 */
static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD err;

  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance) {
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);

    req->pipeHandle = CreateNamedPipeW(handle->name,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);

    if (req->pipeHandle == INVALID_HANDLE_VALUE) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }

    if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
      /* Save the error before cleaning up: CloseHandle may overwrite it. */
      err = GetLastError();
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      SET_REQ_ERROR(req, err);
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->overlapped), 0, sizeof(req->overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped)) {
    err = GetLastError();

    if (err != ERROR_IO_PENDING) {
      if (err == ERROR_PIPE_CONNECTED) {
        /* A client connected before ConnectNamedPipe was called. */
        SET_REQ_SUCCESS(req);
      } else {
        CloseHandle(req->pipeHandle);
        req->pipeHandle = INVALID_HANDLE_VALUE;
        /* Make this req pending reporting an error. Use the saved code, */
        /* not whatever CloseHandle left behind. */
        SET_REQ_ERROR(req, err);
      }
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  /* The connect is in flight (or completed through the overlapped path). */
  handle->reqs_pending++;
}
712
713
/*
 * Accepts a pending connection from `server` into `client`.
 *
 * For an IPC pipe, `client` is treated as a TCP handle and the pending
 * socket info is imported into it; the result of that import is returned.
 * Otherwise `client` is treated as a pipe handle: it takes over the OS pipe
 * of the oldest pending accept request, and that request is re-armed unless
 * the server is closing.
 *
 * Returns 0 on success and WSAEWOULDBLOCK when nothing is pending.
 */
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_accept_t* req;
  uv_pipe_t* pipe_client;

  if (server->ipc) {
    if (server->pending_ipc_info.socket_info == NULL) {
      /* No socket has been received that could be accepted. */
      return WSAEWOULDBLOCK;
    }

    return uv_tcp_import((uv_tcp_t*) client,
                         server->pending_ipc_info.socket_info,
                         server->pending_ipc_info.tcp_connection);
  }

  /* Take the first instance that is connected but not yet accepted. */
  req = server->pending_accepts;
  if (req == NULL) {
    return WSAEWOULDBLOCK;
  }

  /* Move the OS pipe over to the client handle. */
  pipe_client = (uv_pipe_t*) client;
  uv_pipe_connection_init(pipe_client);
  pipe_client->handle = req->pipeHandle;
  pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

  /* Unlink the request and reset it so it can pick up a new connection. */
  server->pending_accepts = req->next_pending;
  req->next_pending = NULL;
  req->pipeHandle = INVALID_HANDLE_VALUE;

  if (!(server->flags & UV__HANDLE_CLOSING)) {
    uv_pipe_queue_accept(loop, server, req, FALSE);
  }

  return 0;
}
756
757
758 /* Starts listening for connections for the given pipe. */
uv_pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)759 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
760 uv_loop_t* loop = handle->loop;
761 int i;
762
763 if (handle->flags & UV_HANDLE_LISTENING) {
764 handle->connection_cb = cb;
765 }
766
767 if (!(handle->flags & UV_HANDLE_BOUND)) {
768 return WSAEINVAL;
769 }
770
771 if (handle->flags & UV_HANDLE_READING) {
772 return WSAEISCONN;
773 }
774
775 if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
776 return ERROR_NOT_SUPPORTED;
777 }
778
779 handle->flags |= UV_HANDLE_LISTENING;
780 INCREASE_ACTIVE_COUNT(loop, handle);
781 handle->connection_cb = cb;
782
783 /* First pipe handle should have already been created in uv_pipe_bind */
784 assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
785
786 for (i = 0; i < handle->pending_instances; i++) {
787 uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
788 }
789
790 return 0;
791 }
792
793
uv_pipe_zero_readfile_thread_proc(void * parameter)794 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
795 int result;
796 DWORD bytes;
797 uv_read_t* req = (uv_read_t*) parameter;
798 uv_pipe_t* handle = (uv_pipe_t*) req->data;
799 uv_loop_t* loop = handle->loop;
800
801 assert(req != NULL);
802 assert(req->type == UV_READ);
803 assert(handle->type == UV_NAMED_PIPE);
804
805 result = ReadFile(handle->handle,
806 &uv_zero_,
807 0,
808 &bytes,
809 NULL);
810
811 if (!result) {
812 SET_REQ_ERROR(req, GetLastError());
813 }
814
815 POST_COMPLETION_FOR_REQ(loop, req);
816 return 0;
817 }
818
819
uv_pipe_writefile_thread_proc(void * parameter)820 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
821 int result;
822 DWORD bytes;
823 uv_write_t* req = (uv_write_t*) parameter;
824 uv_pipe_t* handle = (uv_pipe_t*) req->handle;
825 uv_loop_t* loop = handle->loop;
826
827 assert(req != NULL);
828 assert(req->type == UV_WRITE);
829 assert(handle->type == UV_NAMED_PIPE);
830 assert(req->write_buffer.base);
831
832 result = WriteFile(handle->handle,
833 req->write_buffer.base,
834 req->write_buffer.len,
835 &bytes,
836 NULL);
837
838 if (!result) {
839 SET_REQ_ERROR(req, GetLastError());
840 }
841
842 POST_COMPLETION_FOR_REQ(loop, req);
843 return 0;
844 }
845
846
post_completion_read_wait(void * context,BOOLEAN timed_out)847 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
848 uv_read_t* req;
849 uv_tcp_t* handle;
850
851 req = (uv_read_t*) context;
852 assert(req != NULL);
853 handle = (uv_tcp_t*)req->data;
854 assert(handle != NULL);
855 assert(!timed_out);
856
857 if (!PostQueuedCompletionStatus(handle->loop->iocp,
858 req->overlapped.InternalHigh,
859 0,
860 &req->overlapped)) {
861 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
862 }
863 }
864
865
post_completion_write_wait(void * context,BOOLEAN timed_out)866 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
867 uv_write_t* req;
868 uv_tcp_t* handle;
869
870 req = (uv_write_t*) context;
871 assert(req != NULL);
872 handle = (uv_tcp_t*)req->handle;
873 assert(handle != NULL);
874 assert(!timed_out);
875
876 if (!PostQueuedCompletionStatus(handle->loop->iocp,
877 req->overlapped.InternalHigh,
878 0,
879 &req->overlapped)) {
880 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
881 }
882 }
883
884
/*
 * Queues a zero-byte read on `handle` so the loop gets notified when data
 * can be read.
 *
 * Non-overlapped pipes do the read on a pool thread. Overlapped pipes issue
 * an overlapped ReadFile; when completion port notifications are emulated,
 * completion is signalled through an event that a registered wait forwards
 * to the loop.
 *
 * Nothing is returned: on failure the read request is inserted into the
 * pending queue with the error recorded on it. In every path the handle is
 * flagged UV_HANDLE_READ_PENDING and its pending request count is
 * incremented.
 */
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* req;
  int result;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  assert(handle->handle != INVALID_HANDLE_VALUE);

  req = &handle->read_req;

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }
  } else {
    memset(&req->overlapped, 0, sizeof(req->overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* The event must exist before it is put into the overlapped */
      /* structure; otherwise the first read is issued with a bogus event */
      /* and the wait below is registered on a handle that doesn't exist. */
      if (!req->event_handle) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (!req->event_handle) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      }

      /* Setting the low-order bit of hEvent keeps the completion from */
      /* being queued to a completion port. */
      req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    }

    /* Do 0-read */
    result = ReadFile(handle->handle,
                      &uv_zero_,
                      0,
                      NULL,
                      &req->overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* The wait is registered once and reused for later reads. It waits */
      /* on the event itself rather than on the tagged hEvent value. */
      if (req->wait_handle == INVALID_HANDLE_VALUE) {
        if (!RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_completion_read_wait, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
          SET_REQ_ERROR(req, GetLastError());
          goto error;
        }
      }
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

error:
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
952
953
/*
 * Shared implementation of uv_pipe_read_start() and uv_pipe_read2_start().
 * Stores the callbacks, marks the handle as reading and active, and queues a
 * read unless one is still pending. Always returns 0.
 */
static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb, uv_read2_cb read2_cb) {
  uv_loop_t* loop = handle->loop;

  handle->alloc_cb = alloc_cb;
  handle->read_cb = read_cb;
  handle->read2_cb = read2_cb;

  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);

  /* A read request from before reading was stopped may still be in */
  /* flight; in that case it is reused instead of queuing a second one. */
  if (handle->flags & UV_HANDLE_READ_PENDING) {
    return 0;
  }

  uv_pipe_queue_read(loop, handle);
  return 0;
}
971
972
/* Starts reading from `handle`, delivering data through `read_cb`. */
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  uv_read2_cb no_read2_cb = NULL;

  return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, no_read2_cb);
}
977
978
/* Starts reading from `handle`, delivering data through the read2-style */
/* callback `read_cb`. */
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read2_cb read_cb) {
  uv_read_cb no_read_cb = NULL;

  return uv_pipe_read_start_impl(handle, alloc_cb, no_read_cb, read_cb);
}
983
984
/*
 * Appends `req` to the handle's queue of non-overlapped writes.
 *
 * The queue is a circular singly linked list addressed through its tail:
 * the tail's next_req is the head (the oldest request).
 */
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
    uv_write_t* req) {
  uv_write_t* tail = handle->non_overlapped_writes_tail;

  if (tail == NULL) {
    /* Only element: it is its own successor. */
    req->next_req = (uv_req_t*) req;
  } else {
    /* Link in between the current tail and the head. */
    req->next_req = tail->next_req;
    tail->next_req = (uv_req_t*) req;
  }

  handle->non_overlapped_writes_tail = req;
}
998
999
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1000 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1001 uv_write_t* req;
1002
1003 if (handle->non_overlapped_writes_tail) {
1004 req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
1005
1006 if (req == handle->non_overlapped_writes_tail) {
1007 handle->non_overlapped_writes_tail = NULL;
1008 } else {
1009 handle->non_overlapped_writes_tail->next_req =
1010 req->next_req;
1011 }
1012
1013 return req;
1014 } else {
1015 /* queue empty */
1016 return NULL;
1017 }
1018 }
1019
1020
/*
 * Takes the oldest queued non-overlapped write, if any, and hands it to the
 * thread pool. Failing to queue the work item is treated as fatal.
 */
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* next_write = uv_remove_non_overlapped_write_req(handle);

  if (next_write == NULL) {
    return;
  }

  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         next_write,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
  }
}
1031
1032
/*
 * Common implementation behind uv_pipe_write() and uv_pipe_write2().
 *
 * Writes a single buffer to `handle` and/or, on an IPC pipe, sends
 * `send_handle` to the peer using the IPC framing protocol.
 *
 * Accepted argument combinations:
 *   - nbufs == 1 (with or without send_handle), or
 *   - nbufs == 0 together with a non-NULL send_handle.
 * `send_handle`, when given, must be a UV_TCP handle that is bound or
 * connected.
 *
 * Returns 0 when the request has been accepted, otherwise a system error
 * code (ERROR_NOT_SUPPORTED, a GetLastError() value, or whatever
 * uv_tcp_duplicate_socket() reported). No path returns a translated
 * libuv error code.
 */
static int uv_pipe_write_impl(uv_loop_t* loop,
                              uv_write_t* req,
                              uv_pipe_t* handle,
                              const uv_buf_t bufs[],
                              unsigned int nbufs,
                              uv_stream_t* send_handle,
                              uv_write_cb cb) {
  int err;
  int result;
  uv_tcp_t* tcp_send_handle;
  uv_write_t* ipc_header_req;
  uv_ipc_frame_uv_stream ipc_frame;

  if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
    return ERROR_NOT_SUPPORTED;
  }

  /* Only TCP handles are supported for sharing. */
  if (send_handle && ((send_handle->type != UV_TCP) ||
      (!(send_handle->flags & UV_HANDLE_BOUND) &&
       !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
    return ERROR_NOT_SUPPORTED;
  }

  assert(handle->handle != INVALID_HANDLE_VALUE);

  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_WRITE;
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  req->ipc_header = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;
  memset(&req->overlapped, 0, sizeof(req->overlapped));

  if (handle->ipc) {
    assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    ipc_frame.header.flags = 0;

    /* Use the IPC framing protocol. */
    if (send_handle) {
      tcp_send_handle = (uv_tcp_t*) send_handle;

      err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
          &ipc_frame.socket_info);
      if (err) {
        return err;
      }
      ipc_frame.header.flags |= UV_IPC_TCP_SERVER;

      if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
        ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
      }
    }

    if (nbufs == 1) {
      ipc_frame.header.flags |= UV_IPC_RAW_DATA;
      ipc_frame.header.raw_data_length = bufs[0].len;
    }

    /*
     * Use the provided req if we're only doing a single write.
     * If we're doing multiple writes, use ipc_header_write_req to do
     * the first write, and then use the provided req for the second write.
     */
    if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
      ipc_header_req = req;
    } else {
      /*
       * Try to use the preallocated write req if it's available.
       * Otherwise allocate a new one.
       */
      if (handle->ipc_header_write_req.type != UV_WRITE) {
        ipc_header_req = (uv_write_t*) &handle->ipc_header_write_req;
      } else {
        ipc_header_req = (uv_write_t*) malloc(sizeof(uv_write_t));
        if (!ipc_header_req) {
          uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
        }
      }

      uv_req_init(loop, (uv_req_t*) ipc_header_req);
      ipc_header_req->type = UV_WRITE;
      ipc_header_req->handle = (uv_stream_t*) handle;
      ipc_header_req->cb = NULL;
      ipc_header_req->ipc_header = 1;
    }

    /* Write the header or the whole frame. */
    memset(&ipc_header_req->overlapped, 0,
           sizeof(ipc_header_req->overlapped));

    /* Using overlapped IO, but wait for completion before returning.
       This write is blocking because ipc_frame is on stack. */
    ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!ipc_header_req->overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }

    /* The socket info is only transmitted when a handle is being sent. */
    result = WriteFile(handle->handle,
                       &ipc_frame,
                       ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
                         sizeof(ipc_frame) : sizeof(ipc_frame.header),
                       NULL,
                       &ipc_header_req->overlapped);
    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(ipc_header_req->overlapped.hEvent);
      return err;
    }

    if (!result) {
      /* Request not completed immediately. Wait for it.*/
      if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(ipc_header_req->overlapped.hEvent);
        return err;
      }
    }
    ipc_header_req->queued_bytes = 0;
    CloseHandle(ipc_header_req->overlapped.hEvent);
    ipc_header_req->overlapped.hEvent = NULL;

    REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;

    /* If we don't have any raw data to write - we're done. */
    if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
      return 0;
    }
  }

  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Synchronous write on a non-overlapped pipe. */
    DWORD bytes;
    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       &bytes,
                       NULL);

    if (!result) {
      /* `err` has not been assigned on this path; report the failure of */
      /* this WriteFile call itself. */
      return GetLastError();
    } else {
      /* Request completed immediately. */
      req->queued_bytes = 0;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    req->write_buffer = bufs[0];
    uv_insert_non_overlapped_write_req(handle, req);
    if (handle->write_reqs_pending == 0) {
      uv_queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->queued_bytes = uv_count_bufs(bufs, nbufs);
    handle->write_queue_size += req->queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!req->overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }

    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       NULL,
                       &req->overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(req->overlapped.hEvent);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->queued_bytes = 0;
    } else {
      /* Request queued by the kernel. Wait on the event that belongs to */
      /* this write (req), not on the IPC header request: that pointer is */
      /* unset for non-IPC pipes and its event is already closed otherwise. */
      if (WaitForSingleObject(req->overlapped.hEvent, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(req->overlapped.hEvent);
        /* Return the raw system error, like every other failure path. */
        return err;
      }
    }
    CloseHandle(req->overlapped.hEvent);

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else {
    /* Regular overlapped write. */
    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       NULL,
                       &req->overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      return GetLastError();
    }

    if (result) {
      /* Request completed immediately. */
      req->queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->queued_bytes = uv_count_bufs(bufs, nbufs);
      handle->write_queue_size += req->queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* NOTE(review): the wait below is registered on */
      /* req->overlapped.hEvent, which was zeroed above and is never */
      /* assigned on this path, while the freshly created */
      /* req->event_handle is not referenced by the WriteFile call. */
      /* Confirm which event is meant to signal completion. */
      req->event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (!req->event_handle) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->overlapped.hEvent, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->write_reqs_pending++;

  return 0;
}
1275
1276
/*
 * Writes `bufs` to the pipe without passing a handle along.
 * Simply forwards to uv_pipe_write_impl() with a NULL send handle and
 * returns its result unchanged.
 */
int uv_pipe_write(uv_loop_t* loop,
                  uv_write_t* req,
                  uv_pipe_t* handle,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_write_cb cb) {
  int rc;

  rc = uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
  return rc;
}
1285
1286
/*
 * Writes `bufs` to an IPC pipe, optionally sending `send_handle` with it.
 * Fails with WSAEINVAL when the pipe was not set up for IPC; otherwise the
 * result of uv_pipe_write_impl() is returned unchanged.
 */
int uv_pipe_write2(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   unsigned int nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  if (handle->ipc) {
    return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle,
                              cb);
  }

  /* Handle passing only makes sense on IPC pipes. */
  return WSAEINVAL;
}
1300
1301
/*
 * Reports end-of-file on `handle` to the user.
 * The pipe is marked as no longer readable, reading is stopped, and UV_EOF
 * is delivered through read2_cb when one is set, otherwise through read_cb.
 * `loop` and `buf` are not used; the callback always receives the null
 * buffer.
 */
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  /* A running eof timer has served its purpose once EOF is reported. */
  eof_timer_destroy(handle);

  handle->flags &= ~UV_HANDLE_READABLE;
  uv_read_stop((uv_stream_t*) handle);

  if (!handle->read2_cb) {
    handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
    return;
  }

  handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE);
}
1317
1318
uv_pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1319 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1320 uv_buf_t buf) {
1321 /* If there is an eof timer running, we don't need it any more, */
1322 /* so discard it. */
1323 eof_timer_destroy(handle);
1324
1325 uv_read_stop((uv_stream_t*) handle);
1326
1327 if (handle->read2_cb) {
1328 handle->read2_cb(handle,
1329 uv_translate_sys_error(error),
1330 &buf,
1331 UV_UNKNOWN_HANDLE);
1332 } else {
1333 handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1334 }
1335 }
1336
1337
uv_pipe_read_error_or_eof(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1338 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1339 int error, uv_buf_t buf) {
1340 if (error == ERROR_BROKEN_PIPE) {
1341 uv_pipe_read_eof(loop, handle, buf);
1342 } else {
1343 uv_pipe_read_error(loop, handle, error, buf);
1344 }
1345 }
1346
1347
/*
 * Handles completion of the zero-byte read request posted on a pipe.
 *
 * On failure the error (or EOF) is reported if the user is still reading.
 * On success the pipe is drained with synchronous reads for as long as
 * the user keeps reading, data is available and each read fills the
 * buffer supplied by alloc_cb. For IPC pipes the frame header (and the
 * socket info, if present) is consumed first and only the raw payload is
 * handed to the user. A new zero-byte read is queued afterwards when the
 * handle is still reading and no read is pending.
 */
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* req) {
  DWORD bytes, avail;
  uv_buf_t buf;
  uv_ipc_frame_uv_stream ipc_frame;

  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~UV_HANDLE_READ_PENDING;
  eof_timer_stop(handle);

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the 0-read. */
    if (handle->flags & UV_HANDLE_READING) {
      uv_pipe_read_error_or_eof(loop,
                                handle,
                                GET_REQ_ERROR(req),
                                uv_null_buf_);
    }
  } else {
    /* Do non-blocking reads until the buffer is empty */
    while (handle->flags & UV_HANDLE_READING) {
      if (!PeekNamedPipe(handle->handle,
                         NULL,
                         0,
                         NULL,
                         &avail,
                         NULL)) {
        uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
        break;
      }

      if (avail == 0) {
        /* There is nothing to read after all. */
        break;
      }

      if (handle->ipc) {
        /* Use the IPC framing protocol to read the incoming data. */
        if (handle->remaining_ipc_rawdata_bytes == 0) {
          /* We're reading a new frame.  First, read the header. */
          assert(avail >= sizeof(ipc_frame.header));

          if (!ReadFile(handle->handle,
                        &ipc_frame.header,
                        sizeof(ipc_frame.header),
                        &bytes,
                        NULL)) {
            uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
              uv_null_buf_);
            break;
          }

          assert(bytes == sizeof(ipc_frame.header));
          assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
            UV_IPC_TCP_CONNECTION));

          if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
            assert(avail - sizeof(ipc_frame.header) >=
              sizeof(ipc_frame.socket_info));

            /* Read the TCP socket info. */
            if (!ReadFile(handle->handle,
                          &ipc_frame.socket_info,
                          sizeof(ipc_frame) - sizeof(ipc_frame.header),
                          &bytes,
                          NULL)) {
              uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
                uv_null_buf_);
              break;
            }

            assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));

            /* Store the pending socket info. */
            assert(!handle->pending_ipc_info.socket_info);
            handle->pending_ipc_info.socket_info =
              (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
            if (!handle->pending_ipc_info.socket_info) {
              uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
            }

            *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
            handle->pending_ipc_info.tcp_connection =
              ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
          }

          if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
            /* Go around again to pick up the payload of this frame. */
            handle->remaining_ipc_rawdata_bytes =
              ipc_frame.header.raw_data_length;
            continue;
          }
        } else {
          /* Never suggest more than what is left of the current frame. */
          avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
        }
      }

      handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
      if (buf.len == 0) {
        /* The user did not provide a buffer. */
        if (handle->read2_cb) {
          handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
        } else if (handle->read_cb) {
          handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        }
        break;
      }
      assert(buf.base != NULL);

      if (ReadFile(handle->handle,
                   buf.base,
                   buf.len,
                   &bytes,
                   NULL)) {
        /* Successful read */
        if (handle->ipc) {
          assert(handle->remaining_ipc_rawdata_bytes >= bytes);
          handle->remaining_ipc_rawdata_bytes =
            handle->remaining_ipc_rawdata_bytes - bytes;
          if (handle->read2_cb) {
            handle->read2_cb(handle, bytes, &buf,
              handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
          } else if (handle->read_cb) {
            handle->read_cb((uv_stream_t*)handle, bytes, &buf);
          }

          /* The socket info is dropped once the callback has returned. */
          if (handle->pending_ipc_info.socket_info) {
            free(handle->pending_ipc_info.socket_info);
            handle->pending_ipc_info.socket_info = NULL;
          }
        } else {
          handle->read_cb((uv_stream_t*)handle, bytes, &buf);
        }

        /* Read again only if bytes == buf.len. A read never returns more */
        /* than buf.len bytes, so a short read is the only exit condition; */
        /* testing with <= would leave the loop after every read. */
        if (bytes < buf.len) {
          break;
        }
      } else {
        uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
        break;
      }
    }

    /* Post another 0-read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv_pipe_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1500
1501
/*
 * Handles completion of a pipe write request.
 *
 * Updates the write queue accounting, releases the wait/event handles
 * used for IOCP emulation, and then either recycles/frees an internal IPC
 * header request or invokes the user's write callback with the translated
 * request status. Finally it kicks the next queued non-overlapped write
 * and schedules the endgame when a shutdown is waiting for the last write.
 */
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  assert(handle->type == UV_NAMED_PIPE);
  assert(handle->write_queue_size >= req->queued_bytes);

  handle->write_queue_size -= req->queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (!req->ipc_header) {
    /* A user-visible write: report its outcome. */
    if (req->cb) {
      req->cb(req, uv_translate_sys_error(GET_REQ_ERROR(req)));
    }
  } else if (req == &handle->ipc_header_write_req) {
    /* The embedded header request becomes available for reuse. */
    req->type = UV_UNKNOWN_REQ;
  } else {
    /* A heap-allocated header request. */
    free(req);
  }

  handle->write_reqs_pending--;

  if ((handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) &&
      handle->non_overlapped_writes_tail) {
    assert(handle->write_reqs_pending > 0);
    uv_queue_non_overlapped_write(handle);
  }

  if (handle->write_reqs_pending == 0 && handle->shutdown_req != NULL) {
    uv_want_endgame(loop, (uv_handle_t*) handle);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1552
1553
/*
 * Handles completion of a pipe accept request.
 *
 * A successful request is pushed onto the handle's list of pending accepts
 * and the connection callback (if any) is invoked with status 0. A failed
 * request has its pipe handle closed and is queued again unless the
 * server handle is closing.
 */
void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  uv_pipe_accept_t* accept_req;

  accept_req = (uv_pipe_accept_t*) raw_req;

  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV__HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
    assert(accept_req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (!REQ_SUCCESS(accept_req)) {
    /* The accept failed: drop this instance and try again. */
    if (accept_req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(accept_req->pipeHandle);
      accept_req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    if (!(handle->flags & UV__HANDLE_CLOSING)) {
      uv_pipe_queue_accept(loop, handle, accept_req, FALSE);
    }
  } else {
    assert(accept_req->pipeHandle != INVALID_HANDLE_VALUE);

    /* Prepend to the list of connections waiting to be accepted. */
    accept_req->next_pending = handle->pending_accepts;
    handle->pending_accepts = accept_req;

    if (handle->connection_cb) {
      handle->connection_cb((uv_stream_t*) handle, 0);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1587
1588
/*
 * Handles completion of a pipe connect request.
 *
 * When the request has a callback, a successful connect initializes the
 * handle as a connection and the callback receives 0; a failed connect
 * passes the translated request error instead. Without a callback nothing
 * but the request bookkeeping is done.
 */
void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_connect_t* req) {
  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (req->cb) {
    int sys_err = 0;

    if (!REQ_SUCCESS(req)) {
      sys_err = GET_REQ_ERROR(req);
    } else {
      uv_pipe_connection_init(handle);
    }

    req->cb(req, uv_translate_sys_error(sys_err));
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1609
1610
/*
 * Handles a pipe shutdown request.
 *
 * A pipe that is not readable is closed right away. A readable pipe gets
 * an eof timer instead, which is started immediately when a read is
 * already pending. The shutdown callback, if any, is always invoked with
 * status 0.
 */
void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_shutdown_t* req) {
  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (!(handle->flags & UV_HANDLE_READABLE)) {
    /* This pipe is not readable. We can just close it to let the other */
    /* end know that we're done writing. */
    CloseHandle(handle->handle);
    handle->handle = INVALID_HANDLE_VALUE;
  } else {
    /* The pipe is readable and we haven't seen EOF come in ourselves, */
    /* so set up the eof timer. */
    eof_timer_init(handle);

    /* Start it now if a read is pending; otherwise uv_pipe_queue_read */
    /* will start it. */
    if (handle->flags & UV_HANDLE_READ_PENDING) {
      eof_timer_start(handle);
    }
  }

  if (req->cb) {
    req->cb(req, 0);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1641
1642
/*
 * Allocates and initializes the eof timer of a connected pipe.
 * The timer must not exist yet. It stores a back pointer to the pipe in
 * its `data` field and is unreferenced right after initialization.
 * Allocation failure is fatal.
 */
static void eof_timer_init(uv_pipe_t* pipe) {
  int r;

  assert(pipe->eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
  if (pipe->eof_timer == NULL) {
    /* Do not hand a NULL timer to uv_timer_init(). */
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  }

  r = uv_timer_init(pipe->loop, pipe->eof_timer);
  assert(r == 0); /* timers can't fail */
  (void) r; /* only inspected by the assert above */
  pipe->eof_timer->data = pipe;
  uv_unref((uv_handle_t*) pipe->eof_timer);
}
1656
1657
/*
 * Arms the pipe's eof timer as a one-shot timer of `eof_timeout`
 * milliseconds. Does nothing when the pipe has no eof timer.
 */
static void eof_timer_start(uv_pipe_t* pipe) {
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->eof_timer == NULL) {
    return;
  }

  uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
}
1665
1666
/*
 * Stops the pipe's eof timer without destroying it.
 * Does nothing when the pipe has no eof timer.
 */
static void eof_timer_stop(uv_pipe_t* pipe) {
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->eof_timer == NULL) {
    return;
  }

  uv_timer_stop(pipe->eof_timer);
}
1674
1675
/*
 * Fires when the eof timer expires.
 * Unless the pending read has already completed (and merely awaits
 * processing), the pipe handle is closed, reading is stopped and EOF is
 * reported to the user.
 */
static void eof_timer_cb(uv_timer_t* timer, int status) {
  uv_pipe_t* pipe;
  uv_loop_t* loop;

  pipe = (uv_pipe_t*) timer->data;
  loop = timer->loop;

  assert(status == 0); /* timers can't fail */
  assert(pipe->type == UV_NAMED_PIPE);

  /* A read must be pending here: the timer is only started in */
  /* uv_pipe_queue_read after a successful ReadFile call, or in */
  /* uv_process_pipe_shutdown_req while a read is pending, and */
  /* uv_process_pipe_read_req stops it straight away. */
  assert(pipe->flags & UV_HANDLE_READ_PENDING);

  /* With many packets coming off the iocp, this callback may run before */
  /* the completed read request has been dequeued. In that case leave */
  /* the pipe alone; the read will be processed later. */
  if (pipe->flags & UV_HANDLE_READ_PENDING) {
    if (HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
      return;
    }
  }

  /* Force both ends off the pipe. */
  CloseHandle(pipe->handle);
  pipe->handle = INVALID_HANDLE_VALUE;

  /* Stop reading, so the pending read that is going to fail will */
  /* not be reported to the user. */
  uv_read_stop((uv_stream_t*) pipe);

  /* Report the eof and update flags. This will get reported even if the */
  /* user stopped reading in the meantime. TODO: is that okay? */
  uv_pipe_read_eof(loop, pipe, uv_null_buf_);
}
1711
1712
/*
 * Closes the pipe's eof timer, if it has one, and clears the pointer.
 * The timer memory is released by eof_timer_close_cb once the close
 * completes.
 */
static void eof_timer_destroy(uv_pipe_t* pipe) {
  /* Bitwise test of the connection flag, as in the other eof_timer_* */
  /* helpers; a logical && would hold for any non-zero flags value. */
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->eof_timer) {
    uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
    pipe->eof_timer = NULL;
  }
}
1721
1722
/*
 * Close callback for the eof timer: releases the timer's memory.
 * `handle` must be a timer handle.
 */
static void eof_timer_close_cb(uv_handle_t* handle) {
  void* timer_mem;

  assert(handle->type == UV_TIMER);

  timer_mem = handle;
  free(timer_mem);
}
1727
1728
/*
 * Wraps an existing file descriptor in a pipe handle.
 *
 * Returns UV_EINVAL when `file` has no valid OS handle or the handle
 * cannot be attached to the pipe; returns 0 otherwise. On success the pipe
 * is initialized as a connection and flagged readable and writable. For
 * IPC pipes the parent process id is recorded as the IPC peer.
 */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle;

  os_handle = uv__get_osfhandle(file);
  if (os_handle == INVALID_HANDLE_VALUE) {
    return UV_EINVAL;
  }

  if (uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
    return UV_EINVAL;
  }

  uv_pipe_connection_init(pipe);
  pipe->handle = os_handle;
  pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

  if (!pipe->ipc) {
    return 0;
  }

  /* IPC pipes must use overlapped I/O and talk to the parent process. */
  assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
  pipe->ipc_pid = uv_parent_pid();
  assert(pipe->ipc_pid != -1);

  return 0;
}
1748