1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <assert.h>
23 #include <io.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 
28 #include "uv.h"
29 #include "internal.h"
30 #include "handle-inl.h"
31 #include "stream-inl.h"
32 #include "req-inl.h"
33 
34 
35 /* A zero-size buffer for use by uv_pipe_read */
36 static char uv_zero_[] = "";
37 
38 /* Null uv_buf_t */
39 static const uv_buf_t uv_null_buf_ = { 0, NULL };
40 
41 /* The timeout that the pipe will wait for the remote end to write data */
42 /* when the local ends wants to shut it down. */
43 static const int64_t eof_timeout = 50; /* ms */
44 
45 static const int default_pending_pipe_instances = 4;
46 
47 /* IPC protocol flags. */
48 #define UV_IPC_RAW_DATA       0x0001
49 #define UV_IPC_TCP_SERVER     0x0002
50 #define UV_IPC_TCP_CONNECTION 0x0004
51 
52 /* IPC frame header. */
53 typedef struct {
54   int flags;
55   uint64_t raw_data_length;
56 } uv_ipc_frame_header_t;
57 
58 /* IPC frame, which contains an imported TCP socket stream. */
59 typedef struct {
60   uv_ipc_frame_header_t header;
61   WSAPROTOCOL_INFOW socket_info;
62 } uv_ipc_frame_uv_stream;
63 
64 static void eof_timer_init(uv_pipe_t* pipe);
65 static void eof_timer_start(uv_pipe_t* pipe);
66 static void eof_timer_stop(uv_pipe_t* pipe);
67 static void eof_timer_cb(uv_timer_t* timer, int status);
68 static void eof_timer_destroy(uv_pipe_t* pipe);
69 static void eof_timer_close_cb(uv_handle_t* handle);
70 
71 
/*
 * Writes a pipe name built from `ptr` and the current process id into
 * `name`.
 *
 *   ptr   Value printed with %p; callers vary it to get a different name.
 *   name  Output buffer.
 *   size  Capacity of `name` in bytes.
 *
 * _snprintf does not store a terminating NUL when the formatted text fills
 * the buffer, so the last byte is always terminated explicitly. With a
 * zero-sized buffer nothing is written.
 */
static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
  if (size == 0) {
    return;
  }

  /* %p requires a void*, and the process id is a DWORD (unsigned long). */
  _snprintf(name,
            size,
            "\\\\.\\pipe\\uv\\%p-%lu",
            (void*) ptr,
            (unsigned long) GetCurrentProcessId());
  name[size - 1] = '\0';
}
75 
76 
/*
 * Initializes `handle` as a UV_NAMED_PIPE stream on `loop`.
 *
 *   ipc  Stored in handle->ipc.
 *
 * No OS pipe is attached yet: the handle starts out with
 * INVALID_HANDLE_VALUE and no name. Always returns 0.
 */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv_stream_init(loop, (uv_stream_t*) handle, UV_NAMED_PIPE);

  /* Pipe state. */
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->reqs_pending = 0;
  handle->non_overlapped_writes_tail = NULL;

  /* IPC state. */
  handle->ipc = ipc;
  handle->ipc_pid = 0;
  handle->remaining_ipc_rawdata_bytes = 0;
  handle->pending_ipc_info.tcp_connection = 0;
  handle->pending_ipc_info.socket_info = NULL;

  uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);

  return 0;
}
94 
95 
/*
 * Puts `handle` into connection mode: runs the generic stream connection
 * setup, then clears the eof timer and points the embedded read request
 * back at the handle.
 */
static void uv_pipe_connection_init(uv_pipe_t* handle) {
  uv_connection_init((uv_stream_t*) handle);

  handle->eof_timer = NULL;
  handle->read_req.data = handle;
}
101 
102 
open_named_pipe(WCHAR * name,DWORD * duplex_flags)103 static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
104   HANDLE pipeHandle;
105 
106   /*
107    * Assume that we have a duplex pipe first, so attempt to
108    * connect with GENERIC_READ | GENERIC_WRITE.
109    */
110   pipeHandle = CreateFileW(name,
111                            GENERIC_READ | GENERIC_WRITE,
112                            0,
113                            NULL,
114                            OPEN_EXISTING,
115                            FILE_FLAG_OVERLAPPED,
116                            NULL);
117   if (pipeHandle != INVALID_HANDLE_VALUE) {
118     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
119     return pipeHandle;
120   }
121 
122   /*
123    * If the pipe is not duplex CreateFileW fails with
124    * ERROR_ACCESS_DENIED.  In that case try to connect
125    * as a read-only or write-only.
126    */
127   if (GetLastError() == ERROR_ACCESS_DENIED) {
128     pipeHandle = CreateFileW(name,
129                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
130                              0,
131                              NULL,
132                              OPEN_EXISTING,
133                              FILE_FLAG_OVERLAPPED,
134                              NULL);
135 
136     if (pipeHandle != INVALID_HANDLE_VALUE) {
137       *duplex_flags = UV_HANDLE_READABLE;
138       return pipeHandle;
139     }
140   }
141 
142   if (GetLastError() == ERROR_ACCESS_DENIED) {
143     pipeHandle = CreateFileW(name,
144                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
145                              0,
146                              NULL,
147                              OPEN_EXISTING,
148                              FILE_FLAG_OVERLAPPED,
149                              NULL);
150 
151     if (pipeHandle != INVALID_HANDLE_VALUE) {
152       *duplex_flags = UV_HANDLE_WRITABLE;
153       return pipeHandle;
154     }
155   }
156 
157   return INVALID_HANDLE_VALUE;
158 }
159 
160 
/*
 * Creates a single-instance, overlapped, byte-mode named pipe with a
 * generated name and attaches it to `handle` as a connection.
 *
 *   access    Pipe access flags, OR-ed into the open mode.
 *   name      Receives the generated pipe name.
 *   nameSize  Capacity of `name` in bytes.
 *
 * Names are derived from the handle's address. When creation fails with
 * ERROR_PIPE_BUSY or ERROR_ACCESS_DENIED the name is taken to be in use and
 * the next address value is tried.
 *
 * Returns 0 on success, otherwise the Windows error code of the failing
 * call (not translated to a UV_ error).
 */
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
    char* name, size_t nameSize) {
  const DWORD open_mode =
      access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE;
  const DWORD pipe_mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
  char* name_seed = (char*) handle;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  int err;

  while (pipeHandle == INVALID_HANDLE_VALUE) {
    uv_unique_pipe_name(name_seed, name, nameSize);

    pipeHandle = CreateNamedPipeA(name,
                                  open_mode,
                                  pipe_mode,
                                  1,
                                  65536,
                                  65536,
                                  0,
                                  NULL);

    if (pipeHandle == INVALID_HANDLE_VALUE) {
      err = GetLastError();
      if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
        /* Not a name collision; nothing has been created yet. */
        return err;
      }

      /* Pipe name collision.  Move to the next seed and try again. */
      name_seed++;
    }
  }

  if (CreateIoCompletionPort(pipeHandle,
                             loop->iocp,
                             (ULONG_PTR) handle,
                             0) == NULL) {
    err = GetLastError();
    CloseHandle(pipeHandle);
    return err;
  }

  uv_pipe_connection_init(handle);
  handle->handle = pipeHandle;

  return 0;
}
209 
210 
/*
 * Attaches the OS pipe `pipeHandle` to `handle`.
 *
 * The pipe is switched to byte/blocking mode and then inspected to find
 * out whether it was opened for overlapped I/O:
 *   - synchronous pipes get UV_HANDLE_NON_OVERLAPPED_PIPE;
 *   - overlapped pipes are associated with the loop's completion port, and
 *     if that association fails UV_HANDLE_EMULATE_IOCP is set instead.
 *
 * `duplex_flags` is OR-ed into handle->flags on success.
 *
 * Returns 0 on success. Returns -1 on failure, with the reason available
 * through GetLastError(); `handle` is not modified in that case.
 */
static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
    HANDLE pipeHandle, DWORD duplex_flags) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_MODE_INFORMATION mode_info;
  DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;

  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
    /* If this returns ERROR_INVALID_PARAMETER we probably opened something */
    /* that is not a pipe. */
    if (GetLastError() == ERROR_INVALID_PARAMETER) {
      SetLastError(WSAENOTSOCK);
    }
    return -1;
  }

  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
  nt_status = pNtQueryInformationFile(pipeHandle,
                                      &io_status,
                                      &mode_info,
                                      sizeof(mode_info),
                                      FileModeInformation);
  if (nt_status != STATUS_SUCCESS) {
    /* The native call reports failure through its NTSTATUS only. Callers */
    /* read GetLastError() after a -1 return, so publish it there instead */
    /* of leaving a stale, unrelated error code behind. */
    SetLastError(pRtlNtStatusToDosError(nt_status));
    return -1;
  }

  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
    /* Non-overlapped pipe. */
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
  } else {
    /* Overlapped pipe.  Try to associate with IOCP. */
    if (CreateIoCompletionPort(pipeHandle,
                               loop->iocp,
                               (ULONG_PTR)handle,
                               0) == NULL) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    }
  }

  handle->handle = pipeHandle;
  handle->flags |= duplex_flags;

  return 0;
}
256 
257 
pipe_shutdown_thread_proc(void * parameter)258 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
259   uv_loop_t* loop;
260   uv_pipe_t* handle;
261   uv_shutdown_t* req;
262 
263   req = (uv_shutdown_t*) parameter;
264   assert(req);
265   handle = (uv_pipe_t*) req->handle;
266   assert(handle);
267   loop = handle->loop;
268   assert(loop);
269 
270   FlushFileBuffers(handle->handle);
271 
272   /* Post completed */
273   POST_COMPLETION_FOR_REQ(loop, req);
274 
275   return 0;
276 }
277 
278 
/*
 * Endgame processing for a pipe handle. Performs one of two jobs per call.
 *
 * 1. Pending shutdown: if the handle is a connection with a shutdown
 *    request and no writes in flight, the request is taken off the handle
 *    and
 *      - cancelled with UV_ECANCELED when the handle is already closing;
 *      - failed when the pipe's local information cannot be queried;
 *      - completed right away when the outbound buffer is empty;
 *      - otherwise handed to a thread-pool thread that flushes the pipe,
 *        and failed if that work item cannot be queued.
 *    The function returns after handling the shutdown request.
 *
 * 2. Close: once the handle is closing and has no pending requests, the
 *    remaining per-handle resources are released and the handle is closed.
 */
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
  IO_STATUS_BLOCK io_status;
  NTSTATUS nt_status;
  uv_shutdown_t* req;
  int err;

  if ((handle->flags & UV_HANDLE_CONNECTION) &&
      handle->shutdown_req != NULL &&
      handle->write_reqs_pending == 0) {
    /* Detach the request so this branch is not entered again for it. */
    req = handle->shutdown_req;
    handle->shutdown_req = NULL;

    if (handle->flags & UV__HANDLE_CLOSING) {
      /* The handle is going away; the shutdown is cancelled. */
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      if (req->cb) {
        req->cb(req, UV_ECANCELED);
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    /* Ask the pipe how full its outbound buffer is; an empty buffer means */
    /* no flush, and therefore no thread-pool round trip, is needed. */
    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        &pipe_info,
                                        sizeof pipe_info,
                                        FilePipeLocalInformation);

    if (nt_status != STATUS_SUCCESS) {
      UNREGISTER_HANDLE_REQ(loop, handle, req);

      handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
      if (req->cb) {
        err = pRtlNtStatusToDosError(nt_status);
        req->cb(req, uv_translate_sys_error(err));
      }

      DECREASE_PENDING_REQ_COUNT(handle);
      return;
    }

    if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
      /* Nothing buffered: complete the request without flushing. */
      uv_insert_pending_req(loop, (uv_req_t*) req);
      return;
    }

    /* Data is still buffered: flush it on a thread-pool thread. */
    if (QueueUserWorkItem(pipe_shutdown_thread_proc,
                          req,
                          WT_EXECUTELONGFUNCTION)) {
      return;
    }

    /* The work item could not be queued. */
    UNREGISTER_HANDLE_REQ(loop, handle, req);

    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
    if (req->cb) {
      err = GetLastError();
      req->cb(req, uv_translate_sys_error(err));
    }

    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  /* The close step only runs for a closing handle with nothing pending. */
  if (!(handle->flags & UV__HANDLE_CLOSING) || handle->reqs_pending != 0) {
    return;
  }

  assert(!(handle->flags & UV_HANDLE_CLOSED));

  if (handle->flags & UV_HANDLE_CONNECTION) {
    /* free(NULL) is a no-op, so no guard is needed here. */
    free(handle->pending_ipc_info.socket_info);
    handle->pending_ipc_info.socket_info = NULL;

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }
  }

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    assert(handle->accept_reqs);
    free(handle->accept_reqs);
    handle->accept_reqs = NULL;
  }

  uv__handle_close(handle);
}
387 
388 
/*
 * Sets the number of pipe instances a server keeps pending and marks the
 * handle as a pipe server.
 *
 * The accept-request array is sized from this value in uv_pipe_bind, and
 * uv_pipe_listen and uv_pipe_cleanup walk that array using the same count.
 * Changing the count after the handle has been bound would make those
 * loops run past the allocation, so the call is ignored once the handle is
 * bound.
 */
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
  if (handle->flags & UV_HANDLE_BOUND) {
    return;
  }

  handle->pending_instances = count;
  handle->flags |= UV_HANDLE_PIPESERVER;
}
393 
394 
395 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)396 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
397   uv_loop_t* loop = handle->loop;
398   int i, err, nameSize;
399   uv_pipe_accept_t* req;
400 
401   if (handle->flags & UV_HANDLE_BOUND) {
402     return UV_EINVAL;
403   }
404 
405   if (!name) {
406     return UV_EINVAL;
407   }
408 
409   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
410     handle->pending_instances = default_pending_pipe_instances;
411   }
412 
413   handle->accept_reqs = (uv_pipe_accept_t*)
414     malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
415   if (!handle->accept_reqs) {
416     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
417   }
418 
419   for (i = 0; i < handle->pending_instances; i++) {
420     req = &handle->accept_reqs[i];
421     uv_req_init(loop, (uv_req_t*) req);
422     req->type = UV_ACCEPT;
423     req->data = handle;
424     req->pipeHandle = INVALID_HANDLE_VALUE;
425     req->next_pending = NULL;
426   }
427 
428   /* Convert name to UTF16. */
429   nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
430   handle->name = (WCHAR*)malloc(nameSize);
431   if (!handle->name) {
432     uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
433   }
434 
435   if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
436     return uv_translate_sys_error(GetLastError());
437   }
438 
439   /*
440    * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
441    * If this fails then there's already a pipe server for the given pipe name.
442    */
443   handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
444       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
445       FILE_FLAG_FIRST_PIPE_INSTANCE,
446       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
447       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
448 
449   if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
450     err = GetLastError();
451     if (err == ERROR_ACCESS_DENIED) {
452       err = WSAEADDRINUSE;  /* Translates to UV_EADDRINUSE. */
453     } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
454       err = WSAEACCES;  /* Translates to UV_EACCES. */
455     }
456     goto error;
457   }
458 
459   if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
460     err = GetLastError();
461     goto error;
462   }
463 
464   handle->pending_accepts = NULL;
465   handle->flags |= UV_HANDLE_PIPESERVER;
466   handle->flags |= UV_HANDLE_BOUND;
467 
468   return 0;
469 
470 error:
471   if (handle->name) {
472     free(handle->name);
473     handle->name = NULL;
474   }
475 
476   if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
477     CloseHandle(handle->accept_reqs[0].pipeHandle);
478     handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
479   }
480 
481   return uv_translate_sys_error(err);
482 }
483 
484 
pipe_connect_thread_proc(void * parameter)485 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
486   uv_loop_t* loop;
487   uv_pipe_t* handle;
488   uv_connect_t* req;
489   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
490   DWORD duplex_flags;
491 
492   req = (uv_connect_t*) parameter;
493   assert(req);
494   handle = (uv_pipe_t*) req->handle;
495   assert(handle);
496   loop = handle->loop;
497   assert(loop);
498 
499   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
500   /* We wait for the pipe to become available with WaitNamedPipe. */
501   while (WaitNamedPipeW(handle->name, 30000)) {
502     /* The pipe is now available, try to connect. */
503     pipeHandle = open_named_pipe(handle->name, &duplex_flags);
504     if (pipeHandle != INVALID_HANDLE_VALUE) {
505       break;
506     }
507 
508     SwitchToThread();
509   }
510 
511   if (pipeHandle != INVALID_HANDLE_VALUE &&
512       !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
513     SET_REQ_SUCCESS(req);
514   } else {
515     SET_REQ_ERROR(req, GetLastError());
516   }
517 
518   /* Post completed */
519   POST_COMPLETION_FOR_REQ(loop, req);
520 
521   return 0;
522 }
523 
524 
/*
 * Starts connecting `handle` to the named pipe `name` (UTF-8).
 *
 * The result is always delivered through `req`; this function returns
 * nothing. Three outcomes are possible:
 *   - the pipe opens immediately: the request is queued as succeeded;
 *   - the pipe is busy: a thread-pool work item waits for an instance and
 *     completes the request later;
 *   - anything else fails: the request is queued with the error.
 * In every case the request is registered with the handle and counted in
 * handle->reqs_pending. Allocation failure is fatal.
 */
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
    const char* name, uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;
  int nameSize;
  int err;

  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_CONNECT;
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Store the pipe name on the handle as UTF-16. */
  nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
  handle->name = (WCHAR*) malloc(nameSize);
  if (handle->name == NULL) {
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  }

  if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
    err = GetLastError();
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);

  if (pipeHandle == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    if (err != ERROR_PIPE_BUSY) {
      goto error;
    }

    /* Wait for the server to make a pipe instance available. */
    if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      err = GetLastError();
      goto error;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    return;
  }

  /* req->handle was set to `handle` above, so it is passed directly. */
  if (uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
    err = GetLastError();
    goto error;
  }

  /* Connected synchronously: report success through the pending queue. */
  SET_REQ_SUCCESS(req);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return;

error:
  /* free(NULL) is a no-op, so the name needs no guard. */
  free(handle->name);
  handle->name = NULL;

  if (pipeHandle != INVALID_HANDLE_VALUE) {
    CloseHandle(pipeHandle);
  }

  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv_insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
}
603 
604 
605 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
606 /* with it. */
uv_pipe_cleanup(uv_loop_t * loop,uv_pipe_t * handle)607 void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
608   int i;
609   HANDLE pipeHandle;
610 
611   if (handle->name) {
612     free(handle->name);
613     handle->name = NULL;
614   }
615 
616   if (handle->flags & UV_HANDLE_PIPESERVER) {
617     for (i = 0; i < handle->pending_instances; i++) {
618       pipeHandle = handle->accept_reqs[i].pipeHandle;
619       if (pipeHandle != INVALID_HANDLE_VALUE) {
620         CloseHandle(pipeHandle);
621         handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
622       }
623     }
624   }
625 
626   if (handle->flags & UV_HANDLE_CONNECTION) {
627     handle->flags &= ~UV_HANDLE_WRITABLE;
628     eof_timer_destroy(handle);
629   }
630 
631   if ((handle->flags & UV_HANDLE_CONNECTION)
632       && handle->handle != INVALID_HANDLE_VALUE) {
633     CloseHandle(handle->handle);
634     handle->handle = INVALID_HANDLE_VALUE;
635   }
636 }
637 
638 
/*
 * Begins closing a pipe handle: stops reading and listening, releases the
 * handle's OS resources, schedules the endgame when nothing is pending and
 * finally marks the handle as closing.
 */
void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
  /* Drop the active count taken when reading was started. */
  if (handle->flags & UV_HANDLE_READING) {
    handle->flags &= ~UV_HANDLE_READING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  /* Drop the active count taken when listening was started. */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, handle);
  }

  uv_pipe_cleanup(loop, handle);

  /* With requests still pending the endgame is not queued from here. */
  if (!handle->reqs_pending) {
    uv_want_endgame(loop, (uv_handle_t*) handle);
  }

  handle->flags &= ~UV_HANDLE_READABLE;
  handle->flags &= ~UV_HANDLE_WRITABLE;
  uv__handle_closing(handle);
}
659 
660 
/*
 * Arms one accept request on a listening pipe server.
 *
 *   req            Accept request to arm.
 *   firstInstance  TRUE when req already owns the pipe instance created by
 *                  uv_pipe_bind; otherwise a new instance is created here.
 *
 * The request is always counted in handle->reqs_pending. If it fails, or
 * completes immediately because a client is already connected, it is put
 * on the loop's pending queue; otherwise it stays pending on the overlapped
 * ConnectNamedPipe call.
 *
 * Error codes are captured before CloseHandle is called, because a
 * successful CloseHandle is not guaranteed to leave the last error intact.
 */
static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  DWORD err;

  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance) {
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);

    req->pipeHandle = CreateNamedPipeW(handle->name,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);

    if (req->pipeHandle == INVALID_HANDLE_VALUE) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }

    if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
      err = GetLastError();
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      SET_REQ_ERROR(req, err);
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->overlapped), 0, sizeof(req->overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped)) {
    err = GetLastError();

    if (err != ERROR_IO_PENDING) {
      if (err == ERROR_PIPE_CONNECTED) {
        /* A client connected before ConnectNamedPipe was called. */
        SET_REQ_SUCCESS(req);
      } else {
        CloseHandle(req->pipeHandle);
        req->pipeHandle = INVALID_HANDLE_VALUE;
        /* Make this req pending reporting an error. */
        SET_REQ_ERROR(req, err);
      }
      uv_insert_pending_req(loop, (uv_req_t*) req);
      handle->reqs_pending++;
      return;
    }
  }

  /* The connect is in flight. */
  handle->reqs_pending++;
}
712 
713 
/*
 * Accepts a pending connection from `server` into `client`.
 *
 * For an IPC pipe, the pending socket info is imported into `client`
 * (treated as a uv_tcp_t) and the import result is returned. For a regular
 * pipe server, the first pending accept request hands its pipe instance to
 * `client`, and the request is re-armed unless the server is closing.
 *
 * Returns 0 on success and WSAEWOULDBLOCK when nothing is pending.
 */
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_accept_t* req;
  uv_pipe_t* pipe_client;

  if (server->ipc) {
    if (server->pending_ipc_info.socket_info == NULL) {
      /* No valid pending sockets. */
      return WSAEWOULDBLOCK;
    }

    return uv_tcp_import((uv_tcp_t*) client,
                         server->pending_ipc_info.socket_info,
                         server->pending_ipc_info.tcp_connection);
  }

  /* Take the first instance that is connected but not yet accepted. */
  req = server->pending_accepts;
  if (req == NULL) {
    /* No valid connections found, so we error out. */
    return WSAEWOULDBLOCK;
  }

  /* Move the connected pipe instance over to the client handle. */
  pipe_client = (uv_pipe_t*) client;
  uv_pipe_connection_init(pipe_client);
  pipe_client->handle = req->pipeHandle;
  pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

  /* Unlink the request and reset it so it can pick up a new connection. */
  server->pending_accepts = req->next_pending;
  req->next_pending = NULL;
  req->pipeHandle = INVALID_HANDLE_VALUE;

  if (!(server->flags & UV__HANDLE_CLOSING)) {
    uv_pipe_queue_accept(loop, server, req, FALSE);
  }

  return 0;
}
756 
757 
758 /* Starts listening for connections for the given pipe. */
uv_pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)759 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
760   uv_loop_t* loop = handle->loop;
761   int i;
762 
763   if (handle->flags & UV_HANDLE_LISTENING) {
764     handle->connection_cb = cb;
765   }
766 
767   if (!(handle->flags & UV_HANDLE_BOUND)) {
768     return WSAEINVAL;
769   }
770 
771   if (handle->flags & UV_HANDLE_READING) {
772     return WSAEISCONN;
773   }
774 
775   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
776     return ERROR_NOT_SUPPORTED;
777   }
778 
779   handle->flags |= UV_HANDLE_LISTENING;
780   INCREASE_ACTIVE_COUNT(loop, handle);
781   handle->connection_cb = cb;
782 
783   /* First pipe handle should have already been created in uv_pipe_bind */
784   assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
785 
786   for (i = 0; i < handle->pending_instances; i++) {
787     uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
788   }
789 
790   return 0;
791 }
792 
793 
uv_pipe_zero_readfile_thread_proc(void * parameter)794 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
795   int result;
796   DWORD bytes;
797   uv_read_t* req = (uv_read_t*) parameter;
798   uv_pipe_t* handle = (uv_pipe_t*) req->data;
799   uv_loop_t* loop = handle->loop;
800 
801   assert(req != NULL);
802   assert(req->type == UV_READ);
803   assert(handle->type == UV_NAMED_PIPE);
804 
805   result = ReadFile(handle->handle,
806                     &uv_zero_,
807                     0,
808                     &bytes,
809                     NULL);
810 
811   if (!result) {
812     SET_REQ_ERROR(req, GetLastError());
813   }
814 
815   POST_COMPLETION_FOR_REQ(loop, req);
816   return 0;
817 }
818 
819 
uv_pipe_writefile_thread_proc(void * parameter)820 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
821   int result;
822   DWORD bytes;
823   uv_write_t* req = (uv_write_t*) parameter;
824   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
825   uv_loop_t* loop = handle->loop;
826 
827   assert(req != NULL);
828   assert(req->type == UV_WRITE);
829   assert(handle->type == UV_NAMED_PIPE);
830   assert(req->write_buffer.base);
831 
832   result = WriteFile(handle->handle,
833                      req->write_buffer.base,
834                      req->write_buffer.len,
835                      &bytes,
836                      NULL);
837 
838   if (!result) {
839     SET_REQ_ERROR(req, GetLastError());
840   }
841 
842   POST_COMPLETION_FOR_REQ(loop, req);
843   return 0;
844 }
845 
846 
post_completion_read_wait(void * context,BOOLEAN timed_out)847 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
848   uv_read_t* req;
849   uv_tcp_t* handle;
850 
851   req = (uv_read_t*) context;
852   assert(req != NULL);
853   handle = (uv_tcp_t*)req->data;
854   assert(handle != NULL);
855   assert(!timed_out);
856 
857   if (!PostQueuedCompletionStatus(handle->loop->iocp,
858                                   req->overlapped.InternalHigh,
859                                   0,
860                                   &req->overlapped)) {
861     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
862   }
863 }
864 
865 
post_completion_write_wait(void * context,BOOLEAN timed_out)866 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
867   uv_write_t* req;
868   uv_tcp_t* handle;
869 
870   req = (uv_write_t*) context;
871   assert(req != NULL);
872   handle = (uv_tcp_t*)req->handle;
873   assert(handle != NULL);
874   assert(!timed_out);
875 
876   if (!PostQueuedCompletionStatus(handle->loop->iocp,
877                                   req->overlapped.InternalHigh,
878                                   0,
879                                   &req->overlapped)) {
880     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
881   }
882 }
883 
884 
/*
 * Queues a zero-length read on a pipe that is in reading mode and has no
 * read pending.
 *
 *   - Non-overlapped pipes: the blocking read runs on a thread-pool thread.
 *   - Overlapped pipes: an overlapped zero-length ReadFile is issued. When
 *     IOCP is emulated, completion is signalled through an event that a
 *     registered wait forwards to the loop.
 *
 * On return the handle always has UV_HANDLE_READ_PENDING set and one more
 * pending request. If queueing failed, the request carries the error and
 * has been put on the loop's pending queue.
 */
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  uv_read_t* req;
  int result;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  assert(handle->handle != INVALID_HANDLE_VALUE);

  req = &handle->read_req;

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }
  } else {
    memset(&req->overlapped, 0, sizeof(req->overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* The event must exist before it is stored in the OVERLAPPED; */
      /* otherwise the first read would be issued with a bogus event. */
      if (!req->event_handle) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (!req->event_handle) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      }
      /* The low-order bit of hEvent is set on purpose; per the Win32 */
      /* documentation it keeps the completion from being queued to a */
      /* completion port. */
      req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    }

    /* Do 0-read */
    result = ReadFile(handle->handle,
                      &uv_zero_,
                      0,
                      NULL,
                      &req->overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (req->wait_handle == INVALID_HANDLE_VALUE) {
        /* Wait on the real event handle, not the tagged copy stored in */
        /* the OVERLAPPED. */
        if (!RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_completion_read_wait, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
          SET_REQ_ERROR(req, GetLastError());
          goto error;
        }
      }
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

error:
  uv_insert_pending_req(loop, (uv_req_t*)req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
952 
953 
/*
 * Shared implementation of uv_pipe_read_start and uv_pipe_read2_start.
 * Stores the callbacks, marks the handle as reading, raises its active
 * count and queues a read unless one is still pending. Always returns 0.
 */
static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb, uv_read2_cb read2_cb) {
  uv_loop_t* loop = handle->loop;

  handle->alloc_cb = alloc_cb;
  handle->read_cb = read_cb;
  handle->read2_cb = read2_cb;

  handle->flags |= UV_HANDLE_READING;
  INCREASE_ACTIVE_COUNT(loop, handle);

  /* If reading was stopped and then started again, a read request may */
  /* still be pending; it is reused rather than queueing another one. */
  if (handle->flags & UV_HANDLE_READ_PENDING) {
    return 0;
  }

  uv_pipe_queue_read(loop, handle);
  return 0;
}
971 
972 
/*
 * Starts reading from a pipe; data is delivered through read_cb and no
 * read2 callback is installed. Returns whatever uv_pipe_read_start_impl()
 * returns.
 */
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  uv_read2_cb no_read2_cb = NULL;

  return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, no_read2_cb);
}
977 
978 
/*
 * Starts reading from a pipe; data is delivered through the read2-style
 * callback and no plain read callback is installed. Returns whatever
 * uv_pipe_read_start_impl() returns.
 */
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
    uv_read2_cb read_cb) {
  uv_read_cb no_plain_cb = NULL;

  return uv_pipe_read_start_impl(handle, alloc_cb, no_plain_cb, read_cb);
}
983 
984 
/*
 * Appends req to the handle's queue of non-overlapped writes. The queue is
 * a circular singly-linked list addressed through its tail, so
 * tail->next_req is the oldest entry.
 */
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
    uv_write_t* req) {
  uv_write_t* tail = handle->non_overlapped_writes_tail;

  if (tail == NULL) {
    /* Empty queue: the only element links to itself. */
    req->next_req = (uv_req_t*) req;
  } else {
    /* Splice in behind the current tail, keeping the link to the head. */
    req->next_req = tail->next_req;
    tail->next_req = (uv_req_t*) req;
  }

  handle->non_overlapped_writes_tail = req;
}
998 
999 
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1000 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1001   uv_write_t* req;
1002 
1003   if (handle->non_overlapped_writes_tail) {
1004     req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
1005 
1006     if (req == handle->non_overlapped_writes_tail) {
1007       handle->non_overlapped_writes_tail = NULL;
1008     } else {
1009       handle->non_overlapped_writes_tail->next_req =
1010         req->next_req;
1011     }
1012 
1013     return req;
1014   } else {
1015     /* queue empty */
1016     return NULL;
1017   }
1018 }
1019 
1020 
/*
 * Takes the next queued non-overlapped write (if any) and hands it to
 * uv_pipe_writefile_thread_proc via QueueUserWorkItem. Failing to queue the
 * work item is treated as fatal.
 */
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
  uv_write_t* next_write;

  next_write = uv_remove_non_overlapped_write_req(handle);
  if (next_write == NULL) {
    return;
  }

  if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
                         next_write,
                         WT_EXECUTELONGFUNCTION)) {
    uv_fatal_error(GetLastError(), "QueueUserWorkItem");
  }
}
1031 
1032 
/*
 * Shared implementation of uv_pipe_write() and uv_pipe_write2().
 *
 * Writes bufs[0] to the pipe. On IPC pipes an IPC frame header is written
 * first (synchronously, because the frame lives on this function's stack);
 * when send_handle is given, the duplicated socket info for that TCP handle
 * is part of the frame.
 *
 * Parameters:
 *   loop        - loop the request is registered with.
 *   req         - caller-owned write request; initialized here.
 *   handle      - pipe to write to; handle->handle must be valid.
 *   bufs/nbufs  - exactly one buffer, or zero buffers when send_handle is
 *                 non-NULL.
 *   send_handle - optional TCP handle (bound or connected) to share over an
 *                 IPC pipe; NULL otherwise.
 *   cb          - completion callback stored in req.
 *
 * Returns 0 on success. On failure returns a nonzero, untranslated error:
 * ERROR_NOT_SUPPORTED for unsupported arguments, the GetLastError() value of
 * the failing Win32 call, or the result of uv_tcp_duplicate_socket().
 */
static int uv_pipe_write_impl(uv_loop_t* loop,
                              uv_write_t* req,
                              uv_pipe_t* handle,
                              const uv_buf_t bufs[],
                              unsigned int nbufs,
                              uv_stream_t* send_handle,
                              uv_write_cb cb) {
  int err;
  int result;
  uv_tcp_t* tcp_send_handle;
  uv_write_t* ipc_header_req;
  uv_ipc_frame_uv_stream ipc_frame;

  if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
    return ERROR_NOT_SUPPORTED;
  }

  /* Only TCP handles are supported for sharing. */
  if (send_handle && ((send_handle->type != UV_TCP) ||
      (!(send_handle->flags & UV_HANDLE_BOUND) &&
       !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
    return ERROR_NOT_SUPPORTED;
  }

  assert(handle->handle != INVALID_HANDLE_VALUE);

  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_WRITE;
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  req->ipc_header = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;
  memset(&req->overlapped, 0, sizeof(req->overlapped));

  if (handle->ipc) {
    assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    ipc_frame.header.flags = 0;

    /* Use the IPC framing protocol. */
    if (send_handle) {
      tcp_send_handle = (uv_tcp_t*)send_handle;

      err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
          &ipc_frame.socket_info);
      if (err) {
        return err;
      }
      ipc_frame.header.flags |= UV_IPC_TCP_SERVER;

      if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
        ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
      }
    }

    if (nbufs == 1) {
      ipc_frame.header.flags |= UV_IPC_RAW_DATA;
      ipc_frame.header.raw_data_length = bufs[0].len;
    }

    /*
     * Use the provided req if we're only doing a single write.
     * If we're doing multiple writes, use ipc_header_write_req to do
     * the first write, and then use the provided req for the second write.
     */
    if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
      ipc_header_req = req;
    } else {
      /*
       * Try to use the preallocated write req if it's available.
       * Otherwise allocate a new one.
       */
      if (handle->ipc_header_write_req.type != UV_WRITE) {
        ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
      } else {
        ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
        if (!ipc_header_req) {
          uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
        }
      }

      uv_req_init(loop, (uv_req_t*) ipc_header_req);
      ipc_header_req->type = UV_WRITE;
      ipc_header_req->handle = (uv_stream_t*) handle;
      ipc_header_req->cb = NULL;
      ipc_header_req->ipc_header = 1;
    }

    /* Write the header or the whole frame. */
    memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));

    /* Using overlapped IO, but wait for completion before returning.
       This write is blocking because ipc_frame is on stack. */
    ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!ipc_header_req->overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }

    /* NOTE(review): the two error returns below do not release a separately */
    /* obtained ipc_header_req (malloc'ed or the preallocated one, which */
    /* stays marked UV_WRITE) - looks like a leak; confirm before changing. */
    result = WriteFile(handle->handle,
                        &ipc_frame,
                        ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
                          sizeof(ipc_frame) : sizeof(ipc_frame.header),
                        NULL,
                        &ipc_header_req->overlapped);
    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(ipc_header_req->overlapped.hEvent);
      return err;
    }

    if (!result) {
      /* Request not completed immediately. Wait for it.*/
      if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(ipc_header_req->overlapped.hEvent);
        return err;
      }
    }
    ipc_header_req->queued_bytes = 0;
    CloseHandle(ipc_header_req->overlapped.hEvent);
    ipc_header_req->overlapped.hEvent = NULL;

    REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;

    /* If we don't have any raw data to write - we're done. */
    if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
      return 0;
    }
  }

  if ((handle->flags &
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Blocking write on a non-overlapped pipe: plain synchronous WriteFile. */
    DWORD bytes;
    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       &bytes,
                       NULL);

    if (!result) {
      /* Report the actual failure; `err` has not been assigned on this */
      /* path. */
      return GetLastError();
    } else {
      /* Request completed immediately. */
      req->queued_bytes = 0;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* Non-overlapped pipe: queue the write for a worker thread. Only start */
    /* a worker when no other write is in flight. */
    req->write_buffer = bufs[0];
    uv_insert_non_overlapped_write_req(handle, req);
    if (handle->write_reqs_pending == 0) {
      uv_queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->queued_bytes = uv_count_bufs(bufs, nbufs);
    handle->write_queue_size += req->queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Using overlapped IO, but wait for completion before returning */
    req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
    if (!req->overlapped.hEvent) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }

    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       NULL,
                       &req->overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(req->overlapped.hEvent);
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->queued_bytes = 0;
    } else {
      /* Request queued by the kernel. Wait on this request's own event: */
      /* ipc_header_req is unset for non-IPC pipes and its event is already */
      /* closed for IPC pipes. */
      if (WaitForSingleObject(req->overlapped.hEvent, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(req->overlapped.hEvent);
        /* Return the raw system error, like every other failure path here. */
        return err;
      }
    }
    CloseHandle(req->overlapped.hEvent);

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else {
    /* Regular overlapped write. */
    result = WriteFile(handle->handle,
                       bufs[0].base,
                       bufs[0].len,
                       NULL,
                       &req->overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      return GetLastError();
    }

    if (result) {
      /* Request completed immediately. */
      req->queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->queued_bytes = uv_count_bufs(bufs, nbufs);
      handle->write_queue_size += req->queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* NOTE(review): req->overlapped.hEvent is still zero from the memset */
      /* above, while the event created here is stored in event_handle; */
      /* the wait below may be registered on the wrong handle - confirm. */
      req->event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (!req->event_handle) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->overlapped.hEvent, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->write_reqs_pending++;

  return 0;
}
1275 
1276 
/*
 * Writes to a pipe without sending a handle along. Thin wrapper that
 * forwards to uv_pipe_write_impl() with a NULL send_handle and returns its
 * result.
 */
int uv_pipe_write(uv_loop_t* loop,
                  uv_write_t* req,
                  uv_pipe_t* handle,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_write_cb cb) {
  uv_stream_t* no_send_handle = NULL;

  return uv_pipe_write_impl(loop,
                            req,
                            handle,
                            bufs,
                            nbufs,
                            no_send_handle,
                            cb);
}
1285 
1286 
/*
 * Writes to an IPC pipe, optionally passing send_handle to the other side.
 * Returns WSAEINVAL when the pipe is not an IPC pipe; otherwise returns the
 * result of uv_pipe_write_impl().
 */
int uv_pipe_write2(uv_loop_t* loop,
                   uv_write_t* req,
                   uv_pipe_t* handle,
                   const uv_buf_t bufs[],
                   unsigned int nbufs,
                   uv_stream_t* send_handle,
                   uv_write_cb cb) {
  if (handle->ipc) {
    return uv_pipe_write_impl(loop,
                              req,
                              handle,
                              bufs,
                              nbufs,
                              send_handle,
                              cb);
  }

  /* Handle passing is only possible over IPC pipes. */
  return WSAEINVAL;
}
1300 
1301 
/*
 * Reports end-of-file to the user: discards the eof timer, marks the pipe
 * as no longer readable, stops reading and invokes the read callback
 * (read2_cb if set, read_cb otherwise) with UV_EOF and the null buffer.
 * The loop and buf arguments are not used.
 */
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
    uv_buf_t buf) {
  /* EOF has arrived, so a running eof timer has nothing left to wait for. */
  eof_timer_destroy(handle);

  handle->flags &= ~UV_HANDLE_READABLE;
  uv_read_stop((uv_stream_t*) handle);

  if (!handle->read2_cb) {
    handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
    return;
  }

  handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE);
}
1317 
1318 
uv_pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1319 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1320     uv_buf_t buf) {
1321   /* If there is an eof timer running, we don't need it any more, */
1322   /* so discard it. */
1323   eof_timer_destroy(handle);
1324 
1325   uv_read_stop((uv_stream_t*) handle);
1326 
1327   if (handle->read2_cb) {
1328     handle->read2_cb(handle,
1329                      uv_translate_sys_error(error),
1330                      &buf,
1331                      UV_UNKNOWN_HANDLE);
1332   } else {
1333     handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1334   }
1335 }
1336 
1337 
/*
 * Dispatches a failed read: ERROR_BROKEN_PIPE is reported as EOF, any other
 * system error is reported as a read error.
 */
static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
    int error, uv_buf_t buf) {
  if (error != ERROR_BROKEN_PIPE) {
    uv_pipe_read_error(loop, handle, error, buf);
    return;
  }

  uv_pipe_read_eof(loop, handle, buf);
}
1346 
1347 
/*
 * Processes a completed pipe read request (the 0-read).
 *
 * Clears UV_HANDLE_READ_PENDING and stops the eof timer. If the request
 * failed and the handle is still reading, the error (or EOF) is reported to
 * the user. Otherwise data is read synchronously for as long as the handle is
 * reading, PeekNamedPipe reports available bytes and each read completely
 * fills the user's buffer. On IPC pipes the frame header, and the socket info
 * when present, are consumed before the raw data is handed to the user.
 * After a successful request another read is queued if the handle is still
 * reading and none is pending. The pending request count is decreased at the
 * end in all cases.
 */
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* req) {
  DWORD bytes, avail;
  uv_buf_t buf;
  uv_ipc_frame_uv_stream ipc_frame;

  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~UV_HANDLE_READ_PENDING;
  eof_timer_stop(handle);

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the 0-read. */
    if (handle->flags & UV_HANDLE_READING) {
      uv_pipe_read_error_or_eof(loop,
                                handle,
                                GET_REQ_ERROR(req),
                                uv_null_buf_);
    }
  } else {
    /* Do non-blocking reads until the buffer is empty */
    while (handle->flags & UV_HANDLE_READING) {
      if (!PeekNamedPipe(handle->handle,
                          NULL,
                          0,
                          NULL,
                          &avail,
                          NULL)) {
        uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
        break;
      }

      if (avail == 0) {
        /* There is nothing to read after all. */
        break;
      }

      if (handle->ipc) {
        /* Use the IPC framing protocol to read the incoming data. */
        if (handle->remaining_ipc_rawdata_bytes == 0) {
          /* We're reading a new frame.  First, read the header. */
          assert(avail >= sizeof(ipc_frame.header));

          if (!ReadFile(handle->handle,
                        &ipc_frame.header,
                        sizeof(ipc_frame.header),
                        &bytes,
                        NULL)) {
            uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
              uv_null_buf_);
            break;
          }

          assert(bytes == sizeof(ipc_frame.header));
          assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
            UV_IPC_TCP_CONNECTION));

          if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
            assert(avail - sizeof(ipc_frame.header) >=
              sizeof(ipc_frame.socket_info));

            /* Read the TCP socket info. */
            if (!ReadFile(handle->handle,
                          &ipc_frame.socket_info,
                          sizeof(ipc_frame) - sizeof(ipc_frame.header),
                          &bytes,
                          NULL)) {
              uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
                uv_null_buf_);
              break;
            }

            assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));

            /* Store the pending socket info. */
            assert(!handle->pending_ipc_info.socket_info);
            handle->pending_ipc_info.socket_info =
              (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
            if (!handle->pending_ipc_info.socket_info) {
              uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
            }

            *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
            handle->pending_ipc_info.tcp_connection =
              ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
          }

          if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
            /* Remember the payload size and loop around to read it. */
            handle->remaining_ipc_rawdata_bytes =
              ipc_frame.header.raw_data_length;
            continue;
          }
        } else {
          /* Never read past the end of the current frame's payload. */
          avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
        }
      }

      handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
      if (buf.len == 0) {
        /* The user could not provide a buffer; report it and stop draining. */
        if (handle->read2_cb) {
          handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
        } else if (handle->read_cb) {
          handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        }
        break;
      }
      assert(buf.base != NULL);

      if (ReadFile(handle->handle,
                   buf.base,
                   buf.len,
                   &bytes,
                   NULL)) {
        /* Successful read */
        if (handle->ipc) {
          assert(handle->remaining_ipc_rawdata_bytes >= bytes);
          handle->remaining_ipc_rawdata_bytes =
            handle->remaining_ipc_rawdata_bytes - bytes;
          if (handle->read2_cb) {
            handle->read2_cb(handle, bytes, &buf,
              handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
          } else if (handle->read_cb) {
            handle->read_cb((uv_stream_t*)handle, bytes, &buf);
          }

          /* The socket info is released once the callback has returned. */
          if (handle->pending_ipc_info.socket_info) {
            free(handle->pending_ipc_info.socket_info);
            handle->pending_ipc_info.socket_info = NULL;
          }
        } else {
          handle->read_cb((uv_stream_t*)handle, bytes, &buf);
        }

        /* Read again only if bytes == buf.len. A short read means the pipe */
        /* has been drained; `<=` here would always be true and end the */
        /* loop after a single read. */
        if (bytes < buf.len) {
          break;
        }
      } else {
        uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
        break;
      }
    }

    /* Post another 0-read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv_pipe_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1500 
1501 
/*
 * Processes a completed pipe write request: updates the write queue size,
 * unregisters the request, releases the IOCP-emulation wait/event handles,
 * then either recycles/frees an internal IPC header request or invokes the
 * user's callback with the translated request error. Afterwards the next
 * queued non-overlapped write is started and, when a shutdown is waiting and
 * no writes remain, the handle's endgame is requested.
 */
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  assert(handle->write_queue_size >= req->queued_bytes);
  handle->write_queue_size -= req->queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (!req->ipc_header) {
    /* A user-visible write: report its outcome. */
    if (req->cb != NULL) {
      err = GET_REQ_ERROR(req);
      req->cb(req, uv_translate_sys_error(err));
    }
  } else if (req == &handle->ipc_header_write_req) {
    /* The preallocated header request becomes available again. */
    req->type = UV_UNKNOWN_REQ;
  } else {
    /* A heap-allocated header request is no longer needed. */
    free(req);
  }

  handle->write_reqs_pending--;

  if ((handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) &&
      handle->non_overlapped_writes_tail != NULL) {
    assert(handle->write_reqs_pending > 0);
    uv_queue_non_overlapped_write(handle);
  }

  if (handle->shutdown_req != NULL && handle->write_reqs_pending == 0) {
    uv_want_endgame(loop, (uv_handle_t*) handle);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1552 
1553 
/*
 * Processes a completed pipe accept request. On success the request is
 * pushed onto the handle's list of pending accepts and the connection
 * callback (if any) is invoked with status 0. On failure the request's pipe
 * handle is closed and the accept is queued again. Nothing is done besides
 * the bookkeeping when the handle is closing.
 */
void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  uv_pipe_accept_t* req;

  req = (uv_pipe_accept_t*) raw_req;

  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV__HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (!REQ_SUCCESS(req)) {
    /* The accept failed: drop the pipe instance and try again. */
    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    if (!(handle->flags & UV__HANDLE_CLOSING)) {
      uv_pipe_queue_accept(loop, handle, req, FALSE);
    }
  } else {
    assert(req->pipeHandle != INVALID_HANDLE_VALUE);

    /* Prepend to the list of connections waiting to be accepted. */
    req->next_pending = handle->pending_accepts;
    handle->pending_accepts = req;

    if (handle->connection_cb) {
      handle->connection_cb((uv_stream_t*) handle, 0);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1587 
1588 
/*
 * Processes a completed pipe connect request. When the request has a
 * callback, a successful connect initializes the pipe as a connection and
 * the callback is invoked with the translated result (0 on success).
 */
void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_connect_t* req) {
  int err = 0;

  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (req->cb) {
    if (!REQ_SUCCESS(req)) {
      err = GET_REQ_ERROR(req);
    } else {
      uv_pipe_connection_init(handle);
    }

    req->cb(req, uv_translate_sys_error(err));
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1609 
1610 
/*
 * Processes a pipe shutdown request. A pipe that is not readable is closed
 * right away; a readable pipe gets an eof timer instead, which is started
 * immediately when a read is already pending. The shutdown callback (if any)
 * is then invoked with status 0.
 */
void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_shutdown_t* req) {
  assert(handle->type == UV_NAMED_PIPE);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (!(handle->flags & UV_HANDLE_READABLE)) {
    /* This pipe is not readable. We can just close it to let the other end */
    /* know that we're done writing. */
    CloseHandle(handle->handle);
    handle->handle = INVALID_HANDLE_VALUE;
  } else {
    /* The pipe is readable and we haven't seen EOF come in ourselves, so */
    /* set up the eof timer. */
    eof_timer_init(handle);

    /* With a read already pending the timer has to start now; otherwise */
    /* uv_pipe_queue_read will start it. */
    if (handle->flags & UV_HANDLE_READ_PENDING) {
      eof_timer_start(handle);
    }
  }

  if (req->cb) {
    req->cb(req, 0);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1641 
1642 
/*
 * Allocates and initializes the pipe's eof timer. The pipe must be a
 * connection and must not already have an eof timer. The timer stores the
 * pipe in its data field and is unref'ed. Running out of memory is treated
 * as fatal, as elsewhere in this file.
 */
static void eof_timer_init(uv_pipe_t* pipe) {
  int r;

  assert(pipe->eof_timer == NULL);
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
  if (pipe->eof_timer == NULL) {
    /* Don't hand a NULL timer to uv_timer_init(). */
    uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
  }

  r = uv_timer_init(pipe->loop, pipe->eof_timer);
  assert(r == 0); /* timers can't fail */
  (void) r; /* only read by the assert above */
  pipe->eof_timer->data = pipe;
  uv_unref((uv_handle_t*) pipe->eof_timer);
}
1656 
1657 
/*
 * Arms the pipe's eof timer, if it has one, as a one-shot timer firing
 * eof_timer_cb after eof_timeout milliseconds.
 */
static void eof_timer_start(uv_pipe_t* pipe) {
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->eof_timer == NULL) {
    return;
  }

  uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
}
1665 
1666 
/* Stops the pipe's eof timer, if it has one. */
static void eof_timer_stop(uv_pipe_t* pipe) {
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->eof_timer == NULL) {
    return;
  }

  uv_timer_stop(pipe->eof_timer);
}
1674 
1675 
/*
 * Fires when the remote end did not deliver data or EOF in time. Unless the
 * pending read has already completed (and is merely waiting to be
 * processed), the pipe handle is closed, reading is stopped and EOF is
 * reported to the user.
 */
static void eof_timer_cb(uv_timer_t* timer, int status) {
  uv_pipe_t* pipe;
  uv_loop_t* loop;

  pipe = (uv_pipe_t*) timer->data;
  loop = timer->loop;

  assert(status == 0); /* timers can't fail */
  assert(pipe->type == UV_NAMED_PIPE);

  /* A read must be pending here: the timer is only started in */
  /* uv_pipe_queue_read after ReadFile was called successfully, or in */
  /* uv_process_pipe_shutdown_req when a read is pending, and */
  /* uv_process_pipe_read_req stops it right away. */
  assert(pipe->flags & UV_HANDLE_READ_PENDING);

  /* With many packets coming off the iocp, this callback can run before */
  /* the read request has been taken off the queue. If the read has in fact */
  /* completed, leave it to be processed later. */
  if (pipe->flags & UV_HANDLE_READ_PENDING) {
    if (HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
      return;
    }
  }

  /* Force both ends off the pipe. */
  CloseHandle(pipe->handle);
  pipe->handle = INVALID_HANDLE_VALUE;

  /* Stop reading so that the pending read, which is now going to fail, */
  /* is not reported to the user. */
  uv_read_stop((uv_stream_t*) pipe);

  /* Report the eof and update flags. This will get reported even if the */
  /* user stopped reading in the meantime. TODO: is that okay? */
  uv_pipe_read_eof(loop, pipe, uv_null_buf_);
}
1711 
1712 
/*
 * Closes the pipe's eof timer, if it has one, and clears the pipe's
 * reference to it. The timer's memory is released by eof_timer_close_cb
 * once the close completes. The pipe must be a connection.
 */
static void eof_timer_destroy(uv_pipe_t* pipe) {
  /* Bitwise test, as in eof_timer_start/eof_timer_stop; a logical && would */
  /* only check that some flag is set. */
  assert(pipe->flags & UV_HANDLE_CONNECTION);

  if (pipe->eof_timer) {
    uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
    pipe->eof_timer = NULL;
  }
}
1721 
1722 
/*
 * Close callback passed to uv_close() by eof_timer_destroy: frees the
 * handle, which must be a timer.
 */
static void eof_timer_close_cb(uv_handle_t* handle) {
  void* timer_memory = handle;

  assert(handle->type == UV_TIMER);
  free(timer_memory);
}
1727 
1728 
/*
 * Opens an existing file descriptor as a pipe. The descriptor's OS handle is
 * attached to the pipe, which is then initialized as a readable and writable
 * connection. For IPC pipes the parent process id is recorded as well.
 * Returns 0 on success, or UV_EINVAL when the descriptor has no valid OS
 * handle or uv_set_pipe_handle() fails.
 */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle;

  os_handle = uv__get_osfhandle(file);
  if (os_handle == INVALID_HANDLE_VALUE) {
    return UV_EINVAL;
  }

  if (uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
    return UV_EINVAL;
  }

  uv_pipe_connection_init(pipe);
  pipe->handle = os_handle;
  pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

  if (!pipe->ipc) {
    return 0;
  }

  /* IPC pipes also record the parent's process id. */
  assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
  pipe->ipc_pid = uv_parent_pid();
  assert(pipe->ipc_pid != -1);

  return 0;
}
1748