1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <assert.h>
23 #include <io.h>
24 #include <string.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 
28 #include "uv.h"
29 #include "internal.h"
30 #include "handle-inl.h"
31 #include "stream-inl.h"
32 #include "req-inl.h"
33 
34 #include <aclapi.h>
35 #include <accctrl.h>
36 
37 typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;
38 
39 struct uv__ipc_queue_item_s {
40   /*
41    * NOTE: It is important for socket_info_ex to be the first field,
42    * because we will we assigning it to the pending_ipc_info.socket_info
43    */
44   uv__ipc_socket_info_ex socket_info_ex;
45   QUEUE member;
46   int tcp_connection;
47 };
48 
49 /* A zero-size buffer for use by uv_pipe_read */
50 static char uv_zero_[] = "";
51 
52 /* Null uv_buf_t */
53 static const uv_buf_t uv_null_buf_ = { 0, NULL };
54 
55 /* The timeout that the pipe will wait for the remote end to write data */
56 /* when the local ends wants to shut it down. */
57 static const int64_t eof_timeout = 50; /* ms */
58 
59 static const int default_pending_pipe_instances = 4;
60 
61 /* Pipe prefix */
62 static char pipe_prefix[] = "\\\\?\\pipe";
63 static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
64 
65 /* IPC protocol flags. */
66 #define UV_IPC_RAW_DATA       0x0001
67 #define UV_IPC_TCP_SERVER     0x0002
68 #define UV_IPC_TCP_CONNECTION 0x0004
69 
70 /* IPC frame header. */
71 typedef struct {
72   int flags;
73   uint64_t raw_data_length;
74 } uv_ipc_frame_header_t;
75 
76 /* IPC frame, which contains an imported TCP socket stream. */
77 typedef struct {
78   uv_ipc_frame_header_t header;
79   uv__ipc_socket_info_ex socket_info_ex;
80 } uv_ipc_frame_uv_stream;
81 
82 static void eof_timer_init(uv_pipe_t* pipe);
83 static void eof_timer_start(uv_pipe_t* pipe);
84 static void eof_timer_stop(uv_pipe_t* pipe);
85 static void eof_timer_cb(uv_timer_t* timer);
86 static void eof_timer_destroy(uv_pipe_t* pipe);
87 static void eof_timer_close_cb(uv_handle_t* handle);
88 
89 
uv_unique_pipe_name(char * ptr,char * name,size_t size)90 static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
91   snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
92 }
93 
94 
uv_pipe_init(uv_loop_t * loop,uv_pipe_t * handle,int ipc)95 int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
96   uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
97 
98   handle->reqs_pending = 0;
99   handle->handle = INVALID_HANDLE_VALUE;
100   handle->name = NULL;
101   handle->pipe.conn.ipc_pid = 0;
102   handle->pipe.conn.remaining_ipc_rawdata_bytes = 0;
103   QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue);
104   handle->pipe.conn.pending_ipc_info.queue_len = 0;
105   handle->ipc = ipc;
106   handle->pipe.conn.non_overlapped_writes_tail = NULL;
107   handle->pipe.conn.readfile_thread = NULL;
108 
109   UV_REQ_INIT(&handle->pipe.conn.ipc_header_write_req, UV_UNKNOWN_REQ);
110 
111   return 0;
112 }
113 
114 
uv_pipe_connection_init(uv_pipe_t * handle)115 static void uv_pipe_connection_init(uv_pipe_t* handle) {
116   uv_connection_init((uv_stream_t*) handle);
117   handle->read_req.data = handle;
118   handle->pipe.conn.eof_timer = NULL;
119   assert(!(handle->flags & UV_HANDLE_PIPESERVER));
120   if (pCancelSynchronousIo &&
121       handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
122       uv_mutex_init(&handle->pipe.conn.readfile_mutex);
123       handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE;
124   }
125 }
126 
127 
open_named_pipe(const WCHAR * name,DWORD * duplex_flags)128 static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
129   HANDLE pipeHandle;
130 
131   /*
132    * Assume that we have a duplex pipe first, so attempt to
133    * connect with GENERIC_READ | GENERIC_WRITE.
134    */
135   pipeHandle = CreateFileW(name,
136                            GENERIC_READ | GENERIC_WRITE,
137                            0,
138                            NULL,
139                            OPEN_EXISTING,
140                            FILE_FLAG_OVERLAPPED,
141                            NULL);
142   if (pipeHandle != INVALID_HANDLE_VALUE) {
143     *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
144     return pipeHandle;
145   }
146 
147   /*
148    * If the pipe is not duplex CreateFileW fails with
149    * ERROR_ACCESS_DENIED.  In that case try to connect
150    * as a read-only or write-only.
151    */
152   if (GetLastError() == ERROR_ACCESS_DENIED) {
153     pipeHandle = CreateFileW(name,
154                              GENERIC_READ | FILE_WRITE_ATTRIBUTES,
155                              0,
156                              NULL,
157                              OPEN_EXISTING,
158                              FILE_FLAG_OVERLAPPED,
159                              NULL);
160 
161     if (pipeHandle != INVALID_HANDLE_VALUE) {
162       *duplex_flags = UV_HANDLE_READABLE;
163       return pipeHandle;
164     }
165   }
166 
167   if (GetLastError() == ERROR_ACCESS_DENIED) {
168     pipeHandle = CreateFileW(name,
169                              GENERIC_WRITE | FILE_READ_ATTRIBUTES,
170                              0,
171                              NULL,
172                              OPEN_EXISTING,
173                              FILE_FLAG_OVERLAPPED,
174                              NULL);
175 
176     if (pipeHandle != INVALID_HANDLE_VALUE) {
177       *duplex_flags = UV_HANDLE_WRITABLE;
178       return pipeHandle;
179     }
180   }
181 
182   return INVALID_HANDLE_VALUE;
183 }
184 
185 
close_pipe(uv_pipe_t * pipe)186 static void close_pipe(uv_pipe_t* pipe) {
187   assert(pipe->u.fd == -1 || pipe->u.fd > 2);
188   if (pipe->u.fd == -1)
189     CloseHandle(pipe->handle);
190   else
191     close(pipe->u.fd);
192 
193   pipe->u.fd = -1;
194   pipe->handle = INVALID_HANDLE_VALUE;
195 }
196 
197 
uv_stdio_pipe_server(uv_loop_t * loop,uv_pipe_t * handle,DWORD access,char * name,size_t nameSize)198 int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
199     char* name, size_t nameSize) {
200   HANDLE pipeHandle;
201   int err;
202   char* ptr = (char*)handle;
203 
204   for (;;) {
205     uv_unique_pipe_name(ptr, name, nameSize);
206 
207     pipeHandle = CreateNamedPipeA(name,
208       access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
209       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
210       NULL);
211 
212     if (pipeHandle != INVALID_HANDLE_VALUE) {
213       /* No name collisions.  We're done. */
214       break;
215     }
216 
217     err = GetLastError();
218     if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
219       goto error;
220     }
221 
222     /* Pipe name collision.  Increment the pointer and try again. */
223     ptr++;
224   }
225 
226   if (CreateIoCompletionPort(pipeHandle,
227                              loop->iocp,
228                              (ULONG_PTR)handle,
229                              0) == NULL) {
230     err = GetLastError();
231     goto error;
232   }
233 
234   uv_pipe_connection_init(handle);
235   handle->handle = pipeHandle;
236 
237   return 0;
238 
239  error:
240   if (pipeHandle != INVALID_HANDLE_VALUE) {
241     CloseHandle(pipeHandle);
242   }
243 
244   return err;
245 }
246 
247 
uv_set_pipe_handle(uv_loop_t * loop,uv_pipe_t * handle,HANDLE pipeHandle,int fd,DWORD duplex_flags)248 static int uv_set_pipe_handle(uv_loop_t* loop,
249                               uv_pipe_t* handle,
250                               HANDLE pipeHandle,
251                               int fd,
252                               DWORD duplex_flags) {
253   NTSTATUS nt_status;
254   IO_STATUS_BLOCK io_status;
255   FILE_MODE_INFORMATION mode_info;
256   DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
257   DWORD current_mode = 0;
258   DWORD err = 0;
259 
260   if (!(handle->flags & UV_HANDLE_PIPESERVER) &&
261       handle->handle != INVALID_HANDLE_VALUE)
262     return UV_EBUSY;
263 
264   if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
265     err = GetLastError();
266     if (err == ERROR_ACCESS_DENIED) {
267       /*
268        * SetNamedPipeHandleState can fail if the handle doesn't have either
269        * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
270        * But if the handle already has the desired wait and blocking modes
271        * we can continue.
272        */
273       if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
274                                    NULL, NULL, 0)) {
275         return -1;
276       } else if (current_mode & PIPE_NOWAIT) {
277         SetLastError(ERROR_ACCESS_DENIED);
278         return -1;
279       }
280     } else {
281       /* If this returns ERROR_INVALID_PARAMETER we probably opened
282        * something that is not a pipe. */
283       if (err == ERROR_INVALID_PARAMETER) {
284         SetLastError(WSAENOTSOCK);
285       }
286       return -1;
287     }
288   }
289 
290   /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
291   nt_status = pNtQueryInformationFile(pipeHandle,
292                                       &io_status,
293                                       &mode_info,
294                                       sizeof(mode_info),
295                                       FileModeInformation);
296   if (nt_status != STATUS_SUCCESS) {
297     return -1;
298   }
299 
300   if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
301       mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
302     /* Non-overlapped pipe. */
303     handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
304   } else {
305     /* Overlapped pipe.  Try to associate with IOCP. */
306     if (CreateIoCompletionPort(pipeHandle,
307                                loop->iocp,
308                                (ULONG_PTR)handle,
309                                0) == NULL) {
310       handle->flags |= UV_HANDLE_EMULATE_IOCP;
311     }
312   }
313 
314   handle->handle = pipeHandle;
315   handle->u.fd = fd;
316   handle->flags |= duplex_flags;
317 
318   return 0;
319 }
320 
321 
pipe_shutdown_thread_proc(void * parameter)322 static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
323   uv_loop_t* loop;
324   uv_pipe_t* handle;
325   uv_shutdown_t* req;
326 
327   req = (uv_shutdown_t*) parameter;
328   assert(req);
329   handle = (uv_pipe_t*) req->handle;
330   assert(handle);
331   loop = handle->loop;
332   assert(loop);
333 
334   FlushFileBuffers(handle->handle);
335 
336   /* Post completed */
337   POST_COMPLETION_FOR_REQ(loop, req);
338 
339   return 0;
340 }
341 
342 
uv_pipe_endgame(uv_loop_t * loop,uv_pipe_t * handle)343 void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
344   int err;
345   DWORD result;
346   uv_shutdown_t* req;
347   NTSTATUS nt_status;
348   IO_STATUS_BLOCK io_status;
349   FILE_PIPE_LOCAL_INFORMATION pipe_info;
350   uv__ipc_queue_item_t* item;
351 
352   if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
353     handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE;
354     uv_mutex_destroy(&handle->pipe.conn.readfile_mutex);
355   }
356 
357   if ((handle->flags & UV_HANDLE_CONNECTION) &&
358       handle->stream.conn.shutdown_req != NULL &&
359       handle->stream.conn.write_reqs_pending == 0) {
360     req = handle->stream.conn.shutdown_req;
361 
362     /* Clear the shutdown_req field so we don't go here again. */
363     handle->stream.conn.shutdown_req = NULL;
364 
365     if (handle->flags & UV__HANDLE_CLOSING) {
366       UNREGISTER_HANDLE_REQ(loop, handle, req);
367 
368       /* Already closing. Cancel the shutdown. */
369       if (req->cb) {
370         req->cb(req, UV_ECANCELED);
371       }
372 
373       DECREASE_PENDING_REQ_COUNT(handle);
374       return;
375     }
376 
377     /* Try to avoid flushing the pipe buffer in the thread pool. */
378     nt_status = pNtQueryInformationFile(handle->handle,
379                                         &io_status,
380                                         &pipe_info,
381                                         sizeof pipe_info,
382                                         FilePipeLocalInformation);
383 
384     if (nt_status != STATUS_SUCCESS) {
385       /* Failure */
386       UNREGISTER_HANDLE_REQ(loop, handle, req);
387 
388       handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
389       if (req->cb) {
390         err = pRtlNtStatusToDosError(nt_status);
391         req->cb(req, uv_translate_sys_error(err));
392       }
393 
394       DECREASE_PENDING_REQ_COUNT(handle);
395       return;
396     }
397 
398     if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
399       /* Short-circuit, no need to call FlushFileBuffers. */
400       uv_insert_pending_req(loop, (uv_req_t*) req);
401       return;
402     }
403 
404     /* Run FlushFileBuffers in the thread pool. */
405     result = QueueUserWorkItem(pipe_shutdown_thread_proc,
406                                req,
407                                WT_EXECUTELONGFUNCTION);
408     if (result) {
409       return;
410 
411     } else {
412       /* Failure. */
413       UNREGISTER_HANDLE_REQ(loop, handle, req);
414 
415       handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
416       if (req->cb) {
417         err = GetLastError();
418         req->cb(req, uv_translate_sys_error(err));
419       }
420 
421       DECREASE_PENDING_REQ_COUNT(handle);
422       return;
423     }
424   }
425 
426   if (handle->flags & UV__HANDLE_CLOSING &&
427       handle->reqs_pending == 0) {
428     assert(!(handle->flags & UV_HANDLE_CLOSED));
429 
430     if (handle->flags & UV_HANDLE_CONNECTION) {
431       /* Free pending sockets */
432       while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) {
433         QUEUE* q;
434         SOCKET socket;
435 
436         q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue);
437         QUEUE_REMOVE(q);
438         item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
439 
440         /* Materialize socket and close it */
441         socket = WSASocketW(FROM_PROTOCOL_INFO,
442                             FROM_PROTOCOL_INFO,
443                             FROM_PROTOCOL_INFO,
444                             &item->socket_info_ex.socket_info,
445                             0,
446                             WSA_FLAG_OVERLAPPED);
447         uv__free(item);
448 
449         if (socket != INVALID_SOCKET)
450           closesocket(socket);
451       }
452       handle->pipe.conn.pending_ipc_info.queue_len = 0;
453 
454       if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
455         if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
456           UnregisterWait(handle->read_req.wait_handle);
457           handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
458         }
459         if (handle->read_req.event_handle) {
460           CloseHandle(handle->read_req.event_handle);
461           handle->read_req.event_handle = NULL;
462         }
463       }
464     }
465 
466     if (handle->flags & UV_HANDLE_PIPESERVER) {
467       assert(handle->pipe.serv.accept_reqs);
468       uv__free(handle->pipe.serv.accept_reqs);
469       handle->pipe.serv.accept_reqs = NULL;
470     }
471 
472     uv__handle_close(handle);
473   }
474 }
475 
476 
uv_pipe_pending_instances(uv_pipe_t * handle,int count)477 void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
478   if (handle->flags & UV_HANDLE_BOUND)
479     return;
480   handle->pipe.serv.pending_instances = count;
481   handle->flags |= UV_HANDLE_PIPESERVER;
482 }
483 
484 
485 /* Creates a pipe server. */
uv_pipe_bind(uv_pipe_t * handle,const char * name)486 int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
487   uv_loop_t* loop = handle->loop;
488   int i, err, nameSize;
489   uv_pipe_accept_t* req;
490 
491   if (handle->flags & UV_HANDLE_BOUND) {
492     return UV_EINVAL;
493   }
494 
495   if (!name) {
496     return UV_EINVAL;
497   }
498 
499   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
500     handle->pipe.serv.pending_instances = default_pending_pipe_instances;
501   }
502 
503   handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
504     uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
505   if (!handle->pipe.serv.accept_reqs) {
506     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
507   }
508 
509   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
510     req = &handle->pipe.serv.accept_reqs[i];
511     UV_REQ_INIT(req, UV_ACCEPT);
512     req->data = handle;
513     req->pipeHandle = INVALID_HANDLE_VALUE;
514     req->next_pending = NULL;
515   }
516 
517   /* Convert name to UTF16. */
518   nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
519   handle->name = (WCHAR*)uv__malloc(nameSize);
520   if (!handle->name) {
521     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
522   }
523 
524   if (!MultiByteToWideChar(CP_UTF8,
525                            0,
526                            name,
527                            -1,
528                            handle->name,
529                            nameSize / sizeof(WCHAR))) {
530     err = GetLastError();
531     goto error;
532   }
533 
534   /*
535    * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
536    * If this fails then there's already a pipe server for the given pipe name.
537    */
538   handle->pipe.serv.accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
539       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
540       FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
541       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
542       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
543 
544   if (handle->pipe.serv.accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
545     err = GetLastError();
546     if (err == ERROR_ACCESS_DENIED) {
547       err = WSAEADDRINUSE;  /* Translates to UV_EADDRINUSE. */
548     } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
549       err = WSAEACCES;  /* Translates to UV_EACCES. */
550     }
551     goto error;
552   }
553 
554   if (uv_set_pipe_handle(loop,
555                          handle,
556                          handle->pipe.serv.accept_reqs[0].pipeHandle,
557                          -1,
558                          0)) {
559     err = GetLastError();
560     goto error;
561   }
562 
563   handle->pipe.serv.pending_accepts = NULL;
564   handle->flags |= UV_HANDLE_PIPESERVER;
565   handle->flags |= UV_HANDLE_BOUND;
566 
567   return 0;
568 
569 error:
570   if (handle->name) {
571     uv__free(handle->name);
572     handle->name = NULL;
573   }
574 
575   if (handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
576     CloseHandle(handle->pipe.serv.accept_reqs[0].pipeHandle);
577     handle->pipe.serv.accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
578   }
579 
580   return uv_translate_sys_error(err);
581 }
582 
583 
pipe_connect_thread_proc(void * parameter)584 static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
585   uv_loop_t* loop;
586   uv_pipe_t* handle;
587   uv_connect_t* req;
588   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
589   DWORD duplex_flags;
590 
591   req = (uv_connect_t*) parameter;
592   assert(req);
593   handle = (uv_pipe_t*) req->handle;
594   assert(handle);
595   loop = handle->loop;
596   assert(loop);
597 
598   /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
599   /* We wait for the pipe to become available with WaitNamedPipe. */
600   while (WaitNamedPipeW(handle->name, 30000)) {
601     /* The pipe is now available, try to connect. */
602     pipeHandle = open_named_pipe(handle->name, &duplex_flags);
603     if (pipeHandle != INVALID_HANDLE_VALUE) {
604       break;
605     }
606 
607     SwitchToThread();
608   }
609 
610   if (pipeHandle != INVALID_HANDLE_VALUE &&
611       !uv_set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags)) {
612     SET_REQ_SUCCESS(req);
613   } else {
614     SET_REQ_ERROR(req, GetLastError());
615   }
616 
617   /* Post completed */
618   POST_COMPLETION_FOR_REQ(loop, req);
619 
620   return 0;
621 }
622 
623 
uv_pipe_connect(uv_connect_t * req,uv_pipe_t * handle,const char * name,uv_connect_cb cb)624 void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
625     const char* name, uv_connect_cb cb) {
626   uv_loop_t* loop = handle->loop;
627   int err, nameSize;
628   HANDLE pipeHandle = INVALID_HANDLE_VALUE;
629   DWORD duplex_flags;
630 
631   UV_REQ_INIT(req, UV_CONNECT);
632   req->handle = (uv_stream_t*) handle;
633   req->cb = cb;
634 
635   /* Convert name to UTF16. */
636   nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
637   handle->name = (WCHAR*)uv__malloc(nameSize);
638   if (!handle->name) {
639     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
640   }
641 
642   if (!MultiByteToWideChar(CP_UTF8,
643                            0,
644                            name,
645                            -1,
646                            handle->name,
647                            nameSize / sizeof(WCHAR))) {
648     err = GetLastError();
649     goto error;
650   }
651 
652   pipeHandle = open_named_pipe(handle->name, &duplex_flags);
653   if (pipeHandle == INVALID_HANDLE_VALUE) {
654     if (GetLastError() == ERROR_PIPE_BUSY) {
655       /* Wait for the server to make a pipe instance available. */
656       if (!QueueUserWorkItem(&pipe_connect_thread_proc,
657                              req,
658                              WT_EXECUTELONGFUNCTION)) {
659         err = GetLastError();
660         goto error;
661       }
662 
663       REGISTER_HANDLE_REQ(loop, handle, req);
664       handle->reqs_pending++;
665 
666       return;
667     }
668 
669     err = GetLastError();
670     goto error;
671   }
672 
673   assert(pipeHandle != INVALID_HANDLE_VALUE);
674 
675   if (uv_set_pipe_handle(loop,
676                          (uv_pipe_t*) req->handle,
677                          pipeHandle,
678                          -1,
679                          duplex_flags)) {
680     err = GetLastError();
681     goto error;
682   }
683 
684   SET_REQ_SUCCESS(req);
685   uv_insert_pending_req(loop, (uv_req_t*) req);
686   handle->reqs_pending++;
687   REGISTER_HANDLE_REQ(loop, handle, req);
688   return;
689 
690 error:
691   if (handle->name) {
692     uv__free(handle->name);
693     handle->name = NULL;
694   }
695 
696   if (pipeHandle != INVALID_HANDLE_VALUE) {
697     CloseHandle(pipeHandle);
698   }
699 
700   /* Make this req pending reporting an error. */
701   SET_REQ_ERROR(req, err);
702   uv_insert_pending_req(loop, (uv_req_t*) req);
703   handle->reqs_pending++;
704   REGISTER_HANDLE_REQ(loop, handle, req);
705   return;
706 }
707 
708 
uv__pipe_pause_read(uv_pipe_t * handle)709 void uv__pipe_pause_read(uv_pipe_t* handle) {
710   if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
711       /* Pause the ReadFile task briefly, to work
712          around the Windows kernel bug that causes
713          any access to a NamedPipe to deadlock if
714          any process has called ReadFile */
715       HANDLE h;
716       uv_mutex_lock(&handle->pipe.conn.readfile_mutex);
717       h = handle->pipe.conn.readfile_thread;
718       while (h) {
719         /* spinlock: we expect this to finish quickly,
720            or we are probably about to deadlock anyways
721            (in the kernel), so it doesn't matter */
722         pCancelSynchronousIo(h);
723         SwitchToThread(); /* yield thread control briefly */
724         h = handle->pipe.conn.readfile_thread;
725       }
726   }
727 }
728 
729 
uv__pipe_unpause_read(uv_pipe_t * handle)730 void uv__pipe_unpause_read(uv_pipe_t* handle) {
731   if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
732     uv_mutex_unlock(&handle->pipe.conn.readfile_mutex);
733   }
734 }
735 
736 
uv__pipe_stop_read(uv_pipe_t * handle)737 void uv__pipe_stop_read(uv_pipe_t* handle) {
738   handle->flags &= ~UV_HANDLE_READING;
739   uv__pipe_pause_read((uv_pipe_t*)handle);
740   uv__pipe_unpause_read((uv_pipe_t*)handle);
741 }
742 
743 
744 /* Cleans up uv_pipe_t (server or connection) and all resources associated */
745 /* with it. */
uv_pipe_cleanup(uv_loop_t * loop,uv_pipe_t * handle)746 void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
747   int i;
748   HANDLE pipeHandle;
749 
750   uv__pipe_stop_read(handle);
751 
752   if (handle->name) {
753     uv__free(handle->name);
754     handle->name = NULL;
755   }
756 
757   if (handle->flags & UV_HANDLE_PIPESERVER) {
758     for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
759       pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
760       if (pipeHandle != INVALID_HANDLE_VALUE) {
761         CloseHandle(pipeHandle);
762         handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
763       }
764     }
765     handle->handle = INVALID_HANDLE_VALUE;
766   }
767 
768   if (handle->flags & UV_HANDLE_CONNECTION) {
769     handle->flags &= ~UV_HANDLE_WRITABLE;
770     eof_timer_destroy(handle);
771   }
772 
773   if ((handle->flags & UV_HANDLE_CONNECTION)
774       && handle->handle != INVALID_HANDLE_VALUE)
775     close_pipe(handle);
776 }
777 
778 
uv_pipe_close(uv_loop_t * loop,uv_pipe_t * handle)779 void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
780   if (handle->flags & UV_HANDLE_READING) {
781     handle->flags &= ~UV_HANDLE_READING;
782     DECREASE_ACTIVE_COUNT(loop, handle);
783   }
784 
785   if (handle->flags & UV_HANDLE_LISTENING) {
786     handle->flags &= ~UV_HANDLE_LISTENING;
787     DECREASE_ACTIVE_COUNT(loop, handle);
788   }
789 
790   uv_pipe_cleanup(loop, handle);
791 
792   if (handle->reqs_pending == 0) {
793     uv_want_endgame(loop, (uv_handle_t*) handle);
794   }
795 
796   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
797   uv__handle_closing(handle);
798 }
799 
800 
uv_pipe_queue_accept(uv_loop_t * loop,uv_pipe_t * handle,uv_pipe_accept_t * req,BOOL firstInstance)801 static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
802     uv_pipe_accept_t* req, BOOL firstInstance) {
803   assert(handle->flags & UV_HANDLE_LISTENING);
804 
805   if (!firstInstance) {
806     assert(req->pipeHandle == INVALID_HANDLE_VALUE);
807 
808     req->pipeHandle = CreateNamedPipeW(handle->name,
809         PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC,
810         PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
811         PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
812 
813     if (req->pipeHandle == INVALID_HANDLE_VALUE) {
814       SET_REQ_ERROR(req, GetLastError());
815       uv_insert_pending_req(loop, (uv_req_t*) req);
816       handle->reqs_pending++;
817       return;
818     }
819 
820     if (uv_set_pipe_handle(loop, handle, req->pipeHandle, -1, 0)) {
821       CloseHandle(req->pipeHandle);
822       req->pipeHandle = INVALID_HANDLE_VALUE;
823       SET_REQ_ERROR(req, GetLastError());
824       uv_insert_pending_req(loop, (uv_req_t*) req);
825       handle->reqs_pending++;
826       return;
827     }
828   }
829 
830   assert(req->pipeHandle != INVALID_HANDLE_VALUE);
831 
832   /* Prepare the overlapped structure. */
833   memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
834 
835   if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
836       GetLastError() != ERROR_IO_PENDING) {
837     if (GetLastError() == ERROR_PIPE_CONNECTED) {
838       SET_REQ_SUCCESS(req);
839     } else {
840       CloseHandle(req->pipeHandle);
841       req->pipeHandle = INVALID_HANDLE_VALUE;
842       /* Make this req pending reporting an error. */
843       SET_REQ_ERROR(req, GetLastError());
844     }
845     uv_insert_pending_req(loop, (uv_req_t*) req);
846     handle->reqs_pending++;
847     return;
848   }
849 
850   handle->reqs_pending++;
851 }
852 
853 
uv_pipe_accept(uv_pipe_t * server,uv_stream_t * client)854 int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
855   uv_loop_t* loop = server->loop;
856   uv_pipe_t* pipe_client;
857   uv_pipe_accept_t* req;
858   QUEUE* q;
859   uv__ipc_queue_item_t* item;
860   int err;
861 
862   if (server->ipc) {
863     if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) {
864       /* No valid pending sockets. */
865       return WSAEWOULDBLOCK;
866     }
867 
868     q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue);
869     QUEUE_REMOVE(q);
870     server->pipe.conn.pending_ipc_info.queue_len--;
871     item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
872 
873     err = uv_tcp_import((uv_tcp_t*)client,
874                         &item->socket_info_ex,
875                         item->tcp_connection);
876     if (err != 0)
877       return err;
878 
879     uv__free(item);
880 
881   } else {
882     pipe_client = (uv_pipe_t*)client;
883 
884     /* Find a connection instance that has been connected, but not yet */
885     /* accepted. */
886     req = server->pipe.serv.pending_accepts;
887 
888     if (!req) {
889       /* No valid connections found, so we error out. */
890       return WSAEWOULDBLOCK;
891     }
892 
893     /* Initialize the client handle and copy the pipeHandle to the client */
894     uv_pipe_connection_init(pipe_client);
895     pipe_client->handle = req->pipeHandle;
896     pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
897 
898     /* Prepare the req to pick up a new connection */
899     server->pipe.serv.pending_accepts = req->next_pending;
900     req->next_pending = NULL;
901     req->pipeHandle = INVALID_HANDLE_VALUE;
902 
903     if (!(server->flags & UV__HANDLE_CLOSING)) {
904       uv_pipe_queue_accept(loop, server, req, FALSE);
905     }
906   }
907 
908   return 0;
909 }
910 
911 
912 /* Starts listening for connections for the given pipe. */
uv_pipe_listen(uv_pipe_t * handle,int backlog,uv_connection_cb cb)913 int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
914   uv_loop_t* loop = handle->loop;
915   int i;
916 
917   if (handle->flags & UV_HANDLE_LISTENING) {
918     handle->stream.serv.connection_cb = cb;
919   }
920 
921   if (!(handle->flags & UV_HANDLE_BOUND)) {
922     return WSAEINVAL;
923   }
924 
925   if (handle->flags & UV_HANDLE_READING) {
926     return WSAEISCONN;
927   }
928 
929   if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
930     return ERROR_NOT_SUPPORTED;
931   }
932 
933   handle->flags |= UV_HANDLE_LISTENING;
934   INCREASE_ACTIVE_COUNT(loop, handle);
935   handle->stream.serv.connection_cb = cb;
936 
937   /* First pipe handle should have already been created in uv_pipe_bind */
938   assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
939 
940   for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
941     uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
942   }
943 
944   return 0;
945 }
946 
947 
uv_pipe_zero_readfile_thread_proc(void * parameter)948 static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
949   int result;
950   DWORD bytes;
951   uv_read_t* req = (uv_read_t*) parameter;
952   uv_pipe_t* handle = (uv_pipe_t*) req->data;
953   uv_loop_t* loop = handle->loop;
954   HANDLE hThread = NULL;
955   DWORD err;
956   uv_mutex_t *m = &handle->pipe.conn.readfile_mutex;
957 
958   assert(req != NULL);
959   assert(req->type == UV_READ);
960   assert(handle->type == UV_NAMED_PIPE);
961 
962   if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
963     uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */
964     if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
965                         GetCurrentProcess(), &hThread,
966                         0, FALSE, DUPLICATE_SAME_ACCESS)) {
967       handle->pipe.conn.readfile_thread = hThread;
968     } else {
969       hThread = NULL;
970     }
971     uv_mutex_unlock(m);
972   }
973 restart_readfile:
974   if (handle->flags & UV_HANDLE_READING) {
975     result = ReadFile(handle->handle,
976                       &uv_zero_,
977                       0,
978                       &bytes,
979                       NULL);
980     if (!result) {
981       err = GetLastError();
982       if (err == ERROR_OPERATION_ABORTED &&
983           handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
984         if (handle->flags & UV_HANDLE_READING) {
985           /* just a brief break to do something else */
986           handle->pipe.conn.readfile_thread = NULL;
987           /* resume after it is finished */
988           uv_mutex_lock(m);
989           handle->pipe.conn.readfile_thread = hThread;
990           uv_mutex_unlock(m);
991           goto restart_readfile;
992         } else {
993           result = 1; /* successfully stopped reading */
994         }
995       }
996     }
997   } else {
998     result = 1; /* successfully aborted read before it even started */
999   }
1000   if (hThread) {
1001     assert(hThread == handle->pipe.conn.readfile_thread);
1002     /* mutex does not control clearing readfile_thread */
1003     handle->pipe.conn.readfile_thread = NULL;
1004     uv_mutex_lock(m);
1005     /* only when we hold the mutex lock is it safe to
1006        open or close the handle */
1007     CloseHandle(hThread);
1008     uv_mutex_unlock(m);
1009   }
1010 
1011   if (!result) {
1012     SET_REQ_ERROR(req, err);
1013   }
1014 
1015   POST_COMPLETION_FOR_REQ(loop, req);
1016   return 0;
1017 }
1018 
1019 
uv_pipe_writefile_thread_proc(void * parameter)1020 static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1021   int result;
1022   DWORD bytes;
1023   uv_write_t* req = (uv_write_t*) parameter;
1024   uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1025   uv_loop_t* loop = handle->loop;
1026 
1027   assert(req != NULL);
1028   assert(req->type == UV_WRITE);
1029   assert(handle->type == UV_NAMED_PIPE);
1030   assert(req->write_buffer.base);
1031 
1032   result = WriteFile(handle->handle,
1033                      req->write_buffer.base,
1034                      req->write_buffer.len,
1035                      &bytes,
1036                      NULL);
1037 
1038   if (!result) {
1039     SET_REQ_ERROR(req, GetLastError());
1040   }
1041 
1042   POST_COMPLETION_FOR_REQ(loop, req);
1043   return 0;
1044 }
1045 
1046 
post_completion_read_wait(void * context,BOOLEAN timed_out)1047 static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1048   uv_read_t* req;
1049   uv_tcp_t* handle;
1050 
1051   req = (uv_read_t*) context;
1052   assert(req != NULL);
1053   handle = (uv_tcp_t*)req->data;
1054   assert(handle != NULL);
1055   assert(!timed_out);
1056 
1057   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1058                                   req->u.io.overlapped.InternalHigh,
1059                                   0,
1060                                   &req->u.io.overlapped)) {
1061     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1062   }
1063 }
1064 
1065 
post_completion_write_wait(void * context,BOOLEAN timed_out)1066 static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1067   uv_write_t* req;
1068   uv_tcp_t* handle;
1069 
1070   req = (uv_write_t*) context;
1071   assert(req != NULL);
1072   handle = (uv_tcp_t*)req->handle;
1073   assert(handle != NULL);
1074   assert(!timed_out);
1075 
1076   if (!PostQueuedCompletionStatus(handle->loop->iocp,
1077                                   req->u.io.overlapped.InternalHigh,
1078                                   0,
1079                                   &req->u.io.overlapped)) {
1080     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1081   }
1082 }
1083 
1084 
uv_pipe_queue_read(uv_loop_t * loop,uv_pipe_t * handle)1085 static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
1086   uv_read_t* req;
1087   int result;
1088 
1089   assert(handle->flags & UV_HANDLE_READING);
1090   assert(!(handle->flags & UV_HANDLE_READ_PENDING));
1091 
1092   assert(handle->handle != INVALID_HANDLE_VALUE);
1093 
1094   req = &handle->read_req;
1095 
1096   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1097     if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
1098                            req,
1099                            WT_EXECUTELONGFUNCTION)) {
1100       /* Make this req pending reporting an error. */
1101       SET_REQ_ERROR(req, GetLastError());
1102       goto error;
1103     }
1104   } else {
1105     memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1106     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1107       req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1108     }
1109 
1110     /* Do 0-read */
1111     result = ReadFile(handle->handle,
1112                       &uv_zero_,
1113                       0,
1114                       NULL,
1115                       &req->u.io.overlapped);
1116 
1117     if (!result && GetLastError() != ERROR_IO_PENDING) {
1118       /* Make this req pending reporting an error. */
1119       SET_REQ_ERROR(req, GetLastError());
1120       goto error;
1121     }
1122 
1123     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1124       if (!req->event_handle) {
1125         req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1126         if (!req->event_handle) {
1127           uv_fatal_error(GetLastError(), "CreateEvent");
1128         }
1129       }
1130       if (req->wait_handle == INVALID_HANDLE_VALUE) {
1131         if (!RegisterWaitForSingleObject(&req->wait_handle,
1132             req->u.io.overlapped.hEvent, post_completion_read_wait, (void*) req,
1133             INFINITE, WT_EXECUTEINWAITTHREAD)) {
1134           SET_REQ_ERROR(req, GetLastError());
1135           goto error;
1136         }
1137       }
1138     }
1139   }
1140 
1141   /* Start the eof timer if there is one */
1142   eof_timer_start(handle);
1143   handle->flags |= UV_HANDLE_READ_PENDING;
1144   handle->reqs_pending++;
1145   return;
1146 
1147 error:
1148   uv_insert_pending_req(loop, (uv_req_t*)req);
1149   handle->flags |= UV_HANDLE_READ_PENDING;
1150   handle->reqs_pending++;
1151 }
1152 
1153 
uv_pipe_read_start(uv_pipe_t * handle,uv_alloc_cb alloc_cb,uv_read_cb read_cb)1154 int uv_pipe_read_start(uv_pipe_t* handle,
1155                        uv_alloc_cb alloc_cb,
1156                        uv_read_cb read_cb) {
1157   uv_loop_t* loop = handle->loop;
1158 
1159   handle->flags |= UV_HANDLE_READING;
1160   INCREASE_ACTIVE_COUNT(loop, handle);
1161   handle->read_cb = read_cb;
1162   handle->alloc_cb = alloc_cb;
1163 
1164   /* If reading was stopped and then started again, there could still be a */
1165   /* read request pending. */
1166   if (!(handle->flags & UV_HANDLE_READ_PENDING))
1167     uv_pipe_queue_read(loop, handle);
1168 
1169   return 0;
1170 }
1171 
1172 
uv_insert_non_overlapped_write_req(uv_pipe_t * handle,uv_write_t * req)1173 static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
1174     uv_write_t* req) {
1175   req->next_req = NULL;
1176   if (handle->pipe.conn.non_overlapped_writes_tail) {
1177     req->next_req =
1178       handle->pipe.conn.non_overlapped_writes_tail->next_req;
1179     handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1180     handle->pipe.conn.non_overlapped_writes_tail = req;
1181   } else {
1182     req->next_req = (uv_req_t*)req;
1183     handle->pipe.conn.non_overlapped_writes_tail = req;
1184   }
1185 }
1186 
1187 
uv_remove_non_overlapped_write_req(uv_pipe_t * handle)1188 static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1189   uv_write_t* req;
1190 
1191   if (handle->pipe.conn.non_overlapped_writes_tail) {
1192     req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1193 
1194     if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1195       handle->pipe.conn.non_overlapped_writes_tail = NULL;
1196     } else {
1197       handle->pipe.conn.non_overlapped_writes_tail->next_req =
1198         req->next_req;
1199     }
1200 
1201     return req;
1202   } else {
1203     /* queue empty */
1204     return NULL;
1205   }
1206 }
1207 
1208 
uv_queue_non_overlapped_write(uv_pipe_t * handle)1209 static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
1210   uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1211   if (req) {
1212     if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1213                            req,
1214                            WT_EXECUTELONGFUNCTION)) {
1215       uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1216     }
1217   }
1218 }
1219 
1220 
uv_pipe_write_impl(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],unsigned int nbufs,uv_stream_t * send_handle,uv_write_cb cb)1221 static int uv_pipe_write_impl(uv_loop_t* loop,
1222                               uv_write_t* req,
1223                               uv_pipe_t* handle,
1224                               const uv_buf_t bufs[],
1225                               unsigned int nbufs,
1226                               uv_stream_t* send_handle,
1227                               uv_write_cb cb) {
1228   int err;
1229   int result;
1230   uv_tcp_t* tcp_send_handle;
1231   uv_write_t* ipc_header_req = NULL;
1232   uv_ipc_frame_uv_stream ipc_frame;
1233 
1234   if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
1235     return ERROR_NOT_SUPPORTED;
1236   }
1237 
1238   /* Only TCP handles are supported for sharing. */
1239   if (send_handle && ((send_handle->type != UV_TCP) ||
1240       (!(send_handle->flags & UV_HANDLE_BOUND) &&
1241        !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
1242     return ERROR_NOT_SUPPORTED;
1243   }
1244 
1245   assert(handle->handle != INVALID_HANDLE_VALUE);
1246 
1247   UV_REQ_INIT(req, UV_WRITE);
1248   req->handle = (uv_stream_t*) handle;
1249   req->cb = cb;
1250   req->ipc_header = 0;
1251   req->event_handle = NULL;
1252   req->wait_handle = INVALID_HANDLE_VALUE;
1253   memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1254 
1255   if (handle->ipc) {
1256     assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1257     ipc_frame.header.flags = 0;
1258 
1259     /* Use the IPC framing protocol. */
1260     if (send_handle) {
1261       tcp_send_handle = (uv_tcp_t*)send_handle;
1262 
1263       if (handle->pipe.conn.ipc_pid == 0) {
1264           handle->pipe.conn.ipc_pid = uv_current_pid();
1265       }
1266 
1267       err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid,
1268           &ipc_frame.socket_info_ex.socket_info);
1269       if (err) {
1270         return err;
1271       }
1272 
1273       ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error;
1274 
1275       ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
1276 
1277       if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
1278         ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
1279       }
1280     }
1281 
1282     if (nbufs == 1) {
1283       ipc_frame.header.flags |= UV_IPC_RAW_DATA;
1284       ipc_frame.header.raw_data_length = bufs[0].len;
1285     }
1286 
1287     /*
1288      * Use the provided req if we're only doing a single write.
1289      * If we're doing multiple writes, use ipc_header_write_req to do
1290      * the first write, and then use the provided req for the second write.
1291      */
1292     if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1293       ipc_header_req = req;
1294     } else {
1295       /*
1296        * Try to use the preallocated write req if it's available.
1297        * Otherwise allocate a new one.
1298        */
1299       if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) {
1300         ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req;
1301       } else {
1302         ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t));
1303         if (!ipc_header_req) {
1304           uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1305         }
1306       }
1307 
1308       UV_REQ_INIT(ipc_header_req, UV_WRITE);
1309       ipc_header_req->handle = (uv_stream_t*) handle;
1310       ipc_header_req->cb = NULL;
1311       ipc_header_req->ipc_header = 1;
1312     }
1313 
1314     /* Write the header or the whole frame. */
1315     memset(&ipc_header_req->u.io.overlapped, 0,
1316            sizeof(ipc_header_req->u.io.overlapped));
1317 
1318     /* Using overlapped IO, but wait for completion before returning.
1319        This write is blocking because ipc_frame is on stack. */
1320     ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
1321     if (!ipc_header_req->u.io.overlapped.hEvent) {
1322       uv_fatal_error(GetLastError(), "CreateEvent");
1323     }
1324 
1325     result = WriteFile(handle->handle,
1326                         &ipc_frame,
1327                         ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
1328                           sizeof(ipc_frame) : sizeof(ipc_frame.header),
1329                         NULL,
1330                         &ipc_header_req->u.io.overlapped);
1331     if (!result && GetLastError() != ERROR_IO_PENDING) {
1332       err = GetLastError();
1333       CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
1334       return err;
1335     }
1336 
1337     if (!result) {
1338       /* Request not completed immediately. Wait for it.*/
1339       if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) !=
1340           WAIT_OBJECT_0) {
1341         err = GetLastError();
1342         CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
1343         return err;
1344       }
1345     }
1346     ipc_header_req->u.io.queued_bytes = 0;
1347     CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
1348     ipc_header_req->u.io.overlapped.hEvent = NULL;
1349 
1350     REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
1351     handle->reqs_pending++;
1352     handle->stream.conn.write_reqs_pending++;
1353 
1354     /* If we don't have any raw data to write - we're done. */
1355     if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
1356       return 0;
1357     }
1358   }
1359 
1360   if ((handle->flags &
1361       (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
1362       (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1363     DWORD bytes;
1364     result = WriteFile(handle->handle,
1365                        bufs[0].base,
1366                        bufs[0].len,
1367                        &bytes,
1368                        NULL);
1369 
1370     if (!result) {
1371       err = GetLastError();
1372       return err;
1373     } else {
1374       /* Request completed immediately. */
1375       req->u.io.queued_bytes = 0;
1376     }
1377 
1378     REGISTER_HANDLE_REQ(loop, handle, req);
1379     handle->reqs_pending++;
1380     handle->stream.conn.write_reqs_pending++;
1381     POST_COMPLETION_FOR_REQ(loop, req);
1382     return 0;
1383   } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1384     req->write_buffer = bufs[0];
1385     uv_insert_non_overlapped_write_req(handle, req);
1386     if (handle->stream.conn.write_reqs_pending == 0) {
1387       uv_queue_non_overlapped_write(handle);
1388     }
1389 
1390     /* Request queued by the kernel. */
1391     req->u.io.queued_bytes = bufs[0].len;
1392     handle->write_queue_size += req->u.io.queued_bytes;
1393   } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
1394     /* Using overlapped IO, but wait for completion before returning */
1395     req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
1396     if (!req->u.io.overlapped.hEvent) {
1397       uv_fatal_error(GetLastError(), "CreateEvent");
1398     }
1399 
1400     result = WriteFile(handle->handle,
1401                        bufs[0].base,
1402                        bufs[0].len,
1403                        NULL,
1404                        &req->u.io.overlapped);
1405 
1406     if (!result && GetLastError() != ERROR_IO_PENDING) {
1407       err = GetLastError();
1408       CloseHandle(req->u.io.overlapped.hEvent);
1409       return err;
1410     }
1411 
1412     if (result) {
1413       /* Request completed immediately. */
1414       req->u.io.queued_bytes = 0;
1415     } else {
1416       /* Request queued by the kernel. */
1417       req->u.io.queued_bytes = bufs[0].len;
1418       handle->write_queue_size += req->u.io.queued_bytes;
1419       if (WaitForSingleObject(req->u.io.overlapped.hEvent, INFINITE) !=
1420           WAIT_OBJECT_0) {
1421         err = GetLastError();
1422         CloseHandle(req->u.io.overlapped.hEvent);
1423         return uv_translate_sys_error(err);
1424       }
1425     }
1426     CloseHandle(req->u.io.overlapped.hEvent);
1427 
1428     REGISTER_HANDLE_REQ(loop, handle, req);
1429     handle->reqs_pending++;
1430     handle->stream.conn.write_reqs_pending++;
1431     return 0;
1432   } else {
1433     result = WriteFile(handle->handle,
1434                        bufs[0].base,
1435                        bufs[0].len,
1436                        NULL,
1437                        &req->u.io.overlapped);
1438 
1439     if (!result && GetLastError() != ERROR_IO_PENDING) {
1440       return GetLastError();
1441     }
1442 
1443     if (result) {
1444       /* Request completed immediately. */
1445       req->u.io.queued_bytes = 0;
1446     } else {
1447       /* Request queued by the kernel. */
1448       req->u.io.queued_bytes = bufs[0].len;
1449       handle->write_queue_size += req->u.io.queued_bytes;
1450     }
1451 
1452     if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1453       req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1454       if (!req->event_handle) {
1455         uv_fatal_error(GetLastError(), "CreateEvent");
1456       }
1457       if (!RegisterWaitForSingleObject(&req->wait_handle,
1458           req->u.io.overlapped.hEvent, post_completion_write_wait, (void*) req,
1459           INFINITE, WT_EXECUTEINWAITTHREAD)) {
1460         return GetLastError();
1461       }
1462     }
1463   }
1464 
1465   REGISTER_HANDLE_REQ(loop, handle, req);
1466   handle->reqs_pending++;
1467   handle->stream.conn.write_reqs_pending++;
1468 
1469   return 0;
1470 }
1471 
1472 
uv_pipe_write(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],unsigned int nbufs,uv_write_cb cb)1473 int uv_pipe_write(uv_loop_t* loop,
1474                   uv_write_t* req,
1475                   uv_pipe_t* handle,
1476                   const uv_buf_t bufs[],
1477                   unsigned int nbufs,
1478                   uv_write_cb cb) {
1479   return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
1480 }
1481 
1482 
uv_pipe_write2(uv_loop_t * loop,uv_write_t * req,uv_pipe_t * handle,const uv_buf_t bufs[],unsigned int nbufs,uv_stream_t * send_handle,uv_write_cb cb)1483 int uv_pipe_write2(uv_loop_t* loop,
1484                    uv_write_t* req,
1485                    uv_pipe_t* handle,
1486                    const uv_buf_t bufs[],
1487                    unsigned int nbufs,
1488                    uv_stream_t* send_handle,
1489                    uv_write_cb cb) {
1490   if (!handle->ipc) {
1491     return WSAEINVAL;
1492   }
1493 
1494   return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
1495 }
1496 
1497 
uv_pipe_read_eof(uv_loop_t * loop,uv_pipe_t * handle,uv_buf_t buf)1498 static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1499     uv_buf_t buf) {
1500   /* If there is an eof timer running, we don't need it any more, */
1501   /* so discard it. */
1502   eof_timer_destroy(handle);
1503 
1504   handle->flags &= ~UV_HANDLE_READABLE;
1505   uv_read_stop((uv_stream_t*) handle);
1506 
1507   handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
1508 }
1509 
1510 
uv_pipe_read_error(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1511 static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1512     uv_buf_t buf) {
1513   /* If there is an eof timer running, we don't need it any more, */
1514   /* so discard it. */
1515   eof_timer_destroy(handle);
1516 
1517   uv_read_stop((uv_stream_t*) handle);
1518 
1519   handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1520 }
1521 
1522 
uv_pipe_read_error_or_eof(uv_loop_t * loop,uv_pipe_t * handle,int error,uv_buf_t buf)1523 static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1524     int error, uv_buf_t buf) {
1525   if (error == ERROR_OPERATION_ABORTED) {
1526     /* do nothing (equivalent to EINTR) */
1527   }
1528   else if (error == ERROR_BROKEN_PIPE) {
1529     uv_pipe_read_eof(loop, handle, buf);
1530   } else {
1531     uv_pipe_read_error(loop, handle, error, buf);
1532   }
1533 }
1534 
1535 
uv__pipe_insert_pending_socket(uv_pipe_t * handle,uv__ipc_socket_info_ex * info,int tcp_connection)1536 void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
1537                                     uv__ipc_socket_info_ex* info,
1538                                     int tcp_connection) {
1539   uv__ipc_queue_item_t* item;
1540 
1541   item = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*item));
1542   if (item == NULL)
1543     uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1544 
1545   memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex));
1546   item->tcp_connection = tcp_connection;
1547   QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &item->member);
1548   handle->pipe.conn.pending_ipc_info.queue_len++;
1549 }
1550 
1551 
uv_process_pipe_read_req(uv_loop_t * loop,uv_pipe_t * handle,uv_req_t * req)1552 void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
1553     uv_req_t* req) {
1554   DWORD bytes, avail;
1555   uv_buf_t buf;
1556   uv_ipc_frame_uv_stream ipc_frame;
1557 
1558   assert(handle->type == UV_NAMED_PIPE);
1559 
1560   handle->flags &= ~UV_HANDLE_READ_PENDING;
1561   eof_timer_stop(handle);
1562 
1563   if (!REQ_SUCCESS(req)) {
1564     /* An error occurred doing the 0-read. */
1565     if (handle->flags & UV_HANDLE_READING) {
1566       uv_pipe_read_error_or_eof(loop,
1567                                 handle,
1568                                 GET_REQ_ERROR(req),
1569                                 uv_null_buf_);
1570     }
1571   } else {
1572     /* Do non-blocking reads until the buffer is empty */
1573     while (handle->flags & UV_HANDLE_READING) {
1574       if (!PeekNamedPipe(handle->handle,
1575                           NULL,
1576                           0,
1577                           NULL,
1578                           &avail,
1579                           NULL)) {
1580         uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
1581         break;
1582       }
1583 
1584       if (avail == 0) {
1585         /* There is nothing to read after all. */
1586         break;
1587       }
1588 
1589       if (handle->ipc) {
1590         /* Use the IPC framing protocol to read the incoming data. */
1591         if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) {
1592           /* We're reading a new frame.  First, read the header. */
1593           assert(avail >= sizeof(ipc_frame.header));
1594 
1595           if (!ReadFile(handle->handle,
1596                         &ipc_frame.header,
1597                         sizeof(ipc_frame.header),
1598                         &bytes,
1599                         NULL)) {
1600             uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1601               uv_null_buf_);
1602             break;
1603           }
1604 
1605           assert(bytes == sizeof(ipc_frame.header));
1606           assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
1607             UV_IPC_TCP_CONNECTION));
1608 
1609           if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
1610             assert(avail - sizeof(ipc_frame.header) >=
1611               sizeof(ipc_frame.socket_info_ex));
1612 
1613             /* Read the TCP socket info. */
1614             if (!ReadFile(handle->handle,
1615                           &ipc_frame.socket_info_ex,
1616                           sizeof(ipc_frame) - sizeof(ipc_frame.header),
1617                           &bytes,
1618                           NULL)) {
1619               uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
1620                 uv_null_buf_);
1621               break;
1622             }
1623 
1624             assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
1625 
1626             /* Store the pending socket info. */
1627             uv__pipe_insert_pending_socket(
1628                 handle,
1629                 &ipc_frame.socket_info_ex,
1630                 ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
1631           }
1632 
1633           if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
1634             handle->pipe.conn.remaining_ipc_rawdata_bytes =
1635               ipc_frame.header.raw_data_length;
1636             continue;
1637           }
1638         } else {
1639           avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes);
1640         }
1641       }
1642 
1643       buf = uv_buf_init(NULL, 0);
1644       handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
1645       if (buf.base == NULL || buf.len == 0) {
1646         handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1647         break;
1648       }
1649       assert(buf.base != NULL);
1650 
1651       if (ReadFile(handle->handle,
1652                    buf.base,
1653                    min(buf.len, avail),
1654                    &bytes,
1655                    NULL)) {
1656         /* Successful read */
1657         if (handle->ipc) {
1658           assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes);
1659           handle->pipe.conn.remaining_ipc_rawdata_bytes =
1660             handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes;
1661         }
1662         handle->read_cb((uv_stream_t*)handle, bytes, &buf);
1663 
1664         /* Read again only if bytes == buf.len */
1665         if (bytes <= buf.len) {
1666           break;
1667         }
1668       } else {
1669         uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
1670         break;
1671       }
1672     }
1673 
1674     /* Post another 0-read if still reading and not closing. */
1675     if ((handle->flags & UV_HANDLE_READING) &&
1676         !(handle->flags & UV_HANDLE_READ_PENDING)) {
1677       uv_pipe_queue_read(loop, handle);
1678     }
1679   }
1680 
1681   DECREASE_PENDING_REQ_COUNT(handle);
1682 }
1683 
1684 
uv_process_pipe_write_req(uv_loop_t * loop,uv_pipe_t * handle,uv_write_t * req)1685 void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
1686     uv_write_t* req) {
1687   int err;
1688 
1689   assert(handle->type == UV_NAMED_PIPE);
1690 
1691   assert(handle->write_queue_size >= req->u.io.queued_bytes);
1692   handle->write_queue_size -= req->u.io.queued_bytes;
1693 
1694   UNREGISTER_HANDLE_REQ(loop, handle, req);
1695 
1696   if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1697     if (req->wait_handle != INVALID_HANDLE_VALUE) {
1698       UnregisterWait(req->wait_handle);
1699       req->wait_handle = INVALID_HANDLE_VALUE;
1700     }
1701     if (req->event_handle) {
1702       CloseHandle(req->event_handle);
1703       req->event_handle = NULL;
1704     }
1705   }
1706 
1707   if (req->ipc_header) {
1708     if (req == &handle->pipe.conn.ipc_header_write_req) {
1709       req->type = UV_UNKNOWN_REQ;
1710     } else {
1711       uv__free(req);
1712     }
1713   } else {
1714     if (req->cb) {
1715       err = GET_REQ_ERROR(req);
1716       req->cb(req, uv_translate_sys_error(err));
1717     }
1718   }
1719 
1720   handle->stream.conn.write_reqs_pending--;
1721 
1722   if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
1723       handle->pipe.conn.non_overlapped_writes_tail) {
1724     assert(handle->stream.conn.write_reqs_pending > 0);
1725     uv_queue_non_overlapped_write(handle);
1726   }
1727 
1728   if (handle->stream.conn.shutdown_req != NULL &&
1729       handle->stream.conn.write_reqs_pending == 0) {
1730     uv_want_endgame(loop, (uv_handle_t*)handle);
1731   }
1732 
1733   DECREASE_PENDING_REQ_COUNT(handle);
1734 }
1735 
1736 
uv_process_pipe_accept_req(uv_loop_t * loop,uv_pipe_t * handle,uv_req_t * raw_req)1737 void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
1738     uv_req_t* raw_req) {
1739   uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
1740 
1741   assert(handle->type == UV_NAMED_PIPE);
1742 
1743   if (handle->flags & UV__HANDLE_CLOSING) {
1744     /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
1745     assert(req->pipeHandle == INVALID_HANDLE_VALUE);
1746     DECREASE_PENDING_REQ_COUNT(handle);
1747     return;
1748   }
1749 
1750   if (REQ_SUCCESS(req)) {
1751     assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1752     req->next_pending = handle->pipe.serv.pending_accepts;
1753     handle->pipe.serv.pending_accepts = req;
1754 
1755     if (handle->stream.serv.connection_cb) {
1756       handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
1757     }
1758   } else {
1759     if (req->pipeHandle != INVALID_HANDLE_VALUE) {
1760       CloseHandle(req->pipeHandle);
1761       req->pipeHandle = INVALID_HANDLE_VALUE;
1762     }
1763     if (!(handle->flags & UV__HANDLE_CLOSING)) {
1764       uv_pipe_queue_accept(loop, handle, req, FALSE);
1765     }
1766   }
1767 
1768   DECREASE_PENDING_REQ_COUNT(handle);
1769 }
1770 
1771 
uv_process_pipe_connect_req(uv_loop_t * loop,uv_pipe_t * handle,uv_connect_t * req)1772 void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
1773     uv_connect_t* req) {
1774   int err;
1775 
1776   assert(handle->type == UV_NAMED_PIPE);
1777 
1778   UNREGISTER_HANDLE_REQ(loop, handle, req);
1779 
1780   if (req->cb) {
1781     err = 0;
1782     if (REQ_SUCCESS(req)) {
1783       uv_pipe_connection_init(handle);
1784     } else {
1785       err = GET_REQ_ERROR(req);
1786     }
1787     req->cb(req, uv_translate_sys_error(err));
1788   }
1789 
1790   DECREASE_PENDING_REQ_COUNT(handle);
1791 }
1792 
1793 
uv_process_pipe_shutdown_req(uv_loop_t * loop,uv_pipe_t * handle,uv_shutdown_t * req)1794 void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
1795     uv_shutdown_t* req) {
1796   assert(handle->type == UV_NAMED_PIPE);
1797 
1798   UNREGISTER_HANDLE_REQ(loop, handle, req);
1799 
1800   if (handle->flags & UV_HANDLE_READABLE) {
1801     /* Initialize and optionally start the eof timer. Only do this if the */
1802     /* pipe is readable and we haven't seen EOF come in ourselves. */
1803     eof_timer_init(handle);
1804 
1805     /* If reading start the timer right now. */
1806     /* Otherwise uv_pipe_queue_read will start it. */
1807     if (handle->flags & UV_HANDLE_READ_PENDING) {
1808       eof_timer_start(handle);
1809     }
1810 
1811   } else {
1812     /* This pipe is not readable. We can just close it to let the other end */
1813     /* know that we're done writing. */
1814     close_pipe(handle);
1815   }
1816 
1817   if (req->cb) {
1818     req->cb(req, 0);
1819   }
1820 
1821   DECREASE_PENDING_REQ_COUNT(handle);
1822 }
1823 
1824 
eof_timer_init(uv_pipe_t * pipe)1825 static void eof_timer_init(uv_pipe_t* pipe) {
1826   int r;
1827 
1828   assert(pipe->pipe.conn.eof_timer == NULL);
1829   assert(pipe->flags & UV_HANDLE_CONNECTION);
1830 
1831   pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
1832 
1833   r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
1834   assert(r == 0); /* timers can't fail */
1835   pipe->pipe.conn.eof_timer->data = pipe;
1836   uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
1837 }
1838 
1839 
eof_timer_start(uv_pipe_t * pipe)1840 static void eof_timer_start(uv_pipe_t* pipe) {
1841   assert(pipe->flags & UV_HANDLE_CONNECTION);
1842 
1843   if (pipe->pipe.conn.eof_timer != NULL) {
1844     uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
1845   }
1846 }
1847 
1848 
eof_timer_stop(uv_pipe_t * pipe)1849 static void eof_timer_stop(uv_pipe_t* pipe) {
1850   assert(pipe->flags & UV_HANDLE_CONNECTION);
1851 
1852   if (pipe->pipe.conn.eof_timer != NULL) {
1853     uv_timer_stop(pipe->pipe.conn.eof_timer);
1854   }
1855 }
1856 
1857 
eof_timer_cb(uv_timer_t * timer)1858 static void eof_timer_cb(uv_timer_t* timer) {
1859   uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
1860   uv_loop_t* loop = timer->loop;
1861 
1862   assert(pipe->type == UV_NAMED_PIPE);
1863 
1864   /* This should always be true, since we start the timer only */
1865   /* in uv_pipe_queue_read after successfully calling ReadFile, */
1866   /* or in uv_process_pipe_shutdown_req if a read is pending, */
1867   /* and we always immediately stop the timer in */
1868   /* uv_process_pipe_read_req. */
1869   assert(pipe->flags & UV_HANDLE_READ_PENDING);
1870 
1871   /* If there are many packets coming off the iocp then the timer callback */
1872   /* may be called before the read request is coming off the queue. */
1873   /* Therefore we check here if the read request has completed but will */
1874   /* be processed later. */
1875   if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
1876       HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
1877     return;
1878   }
1879 
1880   /* Force both ends off the pipe. */
1881   close_pipe(pipe);
1882 
1883   /* Stop reading, so the pending read that is going to fail will */
1884   /* not be reported to the user. */
1885   uv_read_stop((uv_stream_t*) pipe);
1886 
1887   /* Report the eof and update flags. This will get reported even if the */
1888   /* user stopped reading in the meantime. TODO: is that okay? */
1889   uv_pipe_read_eof(loop, pipe, uv_null_buf_);
1890 }
1891 
1892 
eof_timer_destroy(uv_pipe_t * pipe)1893 static void eof_timer_destroy(uv_pipe_t* pipe) {
1894   assert(pipe->flags & UV_HANDLE_CONNECTION);
1895 
1896   if (pipe->pipe.conn.eof_timer) {
1897     uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
1898     pipe->pipe.conn.eof_timer = NULL;
1899   }
1900 }
1901 
1902 
eof_timer_close_cb(uv_handle_t * handle)1903 static void eof_timer_close_cb(uv_handle_t* handle) {
1904   assert(handle->type == UV_TIMER);
1905   uv__free(handle);
1906 }
1907 
1908 
uv_pipe_open(uv_pipe_t * pipe,uv_file file)1909 int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
1910   HANDLE os_handle = uv__get_osfhandle(file);
1911   NTSTATUS nt_status;
1912   IO_STATUS_BLOCK io_status;
1913   FILE_ACCESS_INFORMATION access;
1914   DWORD duplex_flags = 0;
1915 
1916   if (os_handle == INVALID_HANDLE_VALUE)
1917     return UV_EBADF;
1918 
1919   uv__once_init();
1920   /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
1921    * underlying OS handle and forget about the original fd.
1922    * We could also opt to use the original OS handle and just never close it,
1923    * but then there would be no reliable way to cancel pending read operations
1924    * upon close.
1925    */
1926   if (file <= 2) {
1927     if (!DuplicateHandle(INVALID_HANDLE_VALUE,
1928                          os_handle,
1929                          INVALID_HANDLE_VALUE,
1930                          &os_handle,
1931                          0,
1932                          FALSE,
1933                          DUPLICATE_SAME_ACCESS))
1934       return uv_translate_sys_error(GetLastError());
1935     file = -1;
1936   }
1937 
1938   /* Determine what kind of permissions we have on this handle.
1939    * Cygwin opens the pipe in message mode, but we can support it,
1940    * just query the access flags and set the stream flags accordingly.
1941    */
1942   nt_status = pNtQueryInformationFile(os_handle,
1943                                       &io_status,
1944                                       &access,
1945                                       sizeof(access),
1946                                       FileAccessInformation);
1947   if (nt_status != STATUS_SUCCESS)
1948     return UV_EINVAL;
1949 
1950   if (pipe->ipc) {
1951     if (!(access.AccessFlags & FILE_WRITE_DATA) ||
1952         !(access.AccessFlags & FILE_READ_DATA)) {
1953       return UV_EINVAL;
1954     }
1955   }
1956 
1957   if (access.AccessFlags & FILE_WRITE_DATA)
1958     duplex_flags |= UV_HANDLE_WRITABLE;
1959   if (access.AccessFlags & FILE_READ_DATA)
1960     duplex_flags |= UV_HANDLE_READABLE;
1961 
1962   if (os_handle == INVALID_HANDLE_VALUE ||
1963       uv_set_pipe_handle(pipe->loop,
1964                          pipe,
1965                          os_handle,
1966                          file,
1967                          duplex_flags) == -1) {
1968     return UV_EINVAL;
1969   }
1970 
1971   uv_pipe_connection_init(pipe);
1972 
1973   if (pipe->ipc) {
1974     assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
1975     pipe->pipe.conn.ipc_pid = uv_os_getppid();
1976     assert(pipe->pipe.conn.ipc_pid != -1);
1977   }
1978   return 0;
1979 }
1980 
1981 
uv__pipe_getname(const uv_pipe_t * handle,char * buffer,size_t * size)1982 static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
1983   NTSTATUS nt_status;
1984   IO_STATUS_BLOCK io_status;
1985   FILE_NAME_INFORMATION tmp_name_info;
1986   FILE_NAME_INFORMATION* name_info;
1987   WCHAR* name_buf;
1988   unsigned int addrlen;
1989   unsigned int name_size;
1990   unsigned int name_len;
1991   int err;
1992 
1993   uv__once_init();
1994   name_info = NULL;
1995 
1996   if (handle->handle == INVALID_HANDLE_VALUE) {
1997     *size = 0;
1998     return UV_EINVAL;
1999   }
2000 
2001   uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */
2002 
2003   nt_status = pNtQueryInformationFile(handle->handle,
2004                                       &io_status,
2005                                       &tmp_name_info,
2006                                       sizeof tmp_name_info,
2007                                       FileNameInformation);
2008   if (nt_status == STATUS_BUFFER_OVERFLOW) {
2009     name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2010     name_info = uv__malloc(name_size);
2011     if (!name_info) {
2012       *size = 0;
2013       err = UV_ENOMEM;
2014       goto cleanup;
2015     }
2016 
2017     nt_status = pNtQueryInformationFile(handle->handle,
2018                                         &io_status,
2019                                         name_info,
2020                                         name_size,
2021                                         FileNameInformation);
2022   }
2023 
2024   if (nt_status != STATUS_SUCCESS) {
2025     *size = 0;
2026     err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2027     goto error;
2028   }
2029 
2030   if (!name_info) {
2031     /* the struct on stack was used */
2032     name_buf = tmp_name_info.FileName;
2033     name_len = tmp_name_info.FileNameLength;
2034   } else {
2035     name_buf = name_info->FileName;
2036     name_len = name_info->FileNameLength;
2037   }
2038 
2039   if (name_len == 0) {
2040     *size = 0;
2041     err = 0;
2042     goto error;
2043   }
2044 
2045   name_len /= sizeof(WCHAR);
2046 
2047   /* check how much space we need */
2048   addrlen = WideCharToMultiByte(CP_UTF8,
2049                                 0,
2050                                 name_buf,
2051                                 name_len,
2052                                 NULL,
2053                                 0,
2054                                 NULL,
2055                                 NULL);
2056   if (!addrlen) {
2057     *size = 0;
2058     err = uv_translate_sys_error(GetLastError());
2059     goto error;
2060   } else if (pipe_prefix_len + addrlen >= *size) {
2061     /* "\\\\.\\pipe" + name */
2062     *size = pipe_prefix_len + addrlen + 1;
2063     err = UV_ENOBUFS;
2064     goto error;
2065   }
2066 
2067   memcpy(buffer, pipe_prefix, pipe_prefix_len);
2068   addrlen = WideCharToMultiByte(CP_UTF8,
2069                                 0,
2070                                 name_buf,
2071                                 name_len,
2072                                 buffer+pipe_prefix_len,
2073                                 *size-pipe_prefix_len,
2074                                 NULL,
2075                                 NULL);
2076   if (!addrlen) {
2077     *size = 0;
2078     err = uv_translate_sys_error(GetLastError());
2079     goto error;
2080   }
2081 
2082   addrlen += pipe_prefix_len;
2083   *size = addrlen;
2084   buffer[addrlen] = '\0';
2085 
2086   err = 0;
2087 
2088 error:
2089   uv__free(name_info);
2090 
2091 cleanup:
2092   uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */
2093   return err;
2094 }
2095 
2096 
uv_pipe_pending_count(uv_pipe_t * handle)2097 int uv_pipe_pending_count(uv_pipe_t* handle) {
2098   if (!handle->ipc)
2099     return 0;
2100   return handle->pipe.conn.pending_ipc_info.queue_len;
2101 }
2102 
2103 
uv_pipe_getsockname(const uv_pipe_t * handle,char * buffer,size_t * size)2104 int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2105   if (handle->flags & UV_HANDLE_BOUND)
2106     return uv__pipe_getname(handle, buffer, size);
2107 
2108   if (handle->flags & UV_HANDLE_CONNECTION ||
2109       handle->handle != INVALID_HANDLE_VALUE) {
2110     *size = 0;
2111     return 0;
2112   }
2113 
2114   return UV_EBADF;
2115 }
2116 
2117 
uv_pipe_getpeername(const uv_pipe_t * handle,char * buffer,size_t * size)2118 int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2119   /* emulate unix behaviour */
2120   if (handle->flags & UV_HANDLE_BOUND)
2121     return UV_ENOTCONN;
2122 
2123   if (handle->handle != INVALID_HANDLE_VALUE)
2124     return uv__pipe_getname(handle, buffer, size);
2125 
2126   return UV_EBADF;
2127 }
2128 
2129 
uv_pipe_pending_type(uv_pipe_t * handle)2130 uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2131   if (!handle->ipc)
2132     return UV_UNKNOWN_HANDLE;
2133   if (handle->pipe.conn.pending_ipc_info.queue_len == 0)
2134     return UV_UNKNOWN_HANDLE;
2135   else
2136     return UV_TCP;
2137 }
2138 
uv_pipe_chmod(uv_pipe_t * handle,int mode)2139 int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
2140   SID_IDENTIFIER_AUTHORITY sid_world = SECURITY_WORLD_SID_AUTHORITY;
2141   PACL old_dacl, new_dacl;
2142   PSECURITY_DESCRIPTOR sd;
2143   EXPLICIT_ACCESS ea;
2144   PSID everyone;
2145   int error;
2146 
2147   if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
2148     return UV_EBADF;
2149 
2150   if (mode != UV_READABLE &&
2151       mode != UV_WRITABLE &&
2152       mode != (UV_WRITABLE | UV_READABLE))
2153     return UV_EINVAL;
2154 
2155   if (!AllocateAndInitializeSid(&sid_world,
2156                                 1,
2157                                 SECURITY_WORLD_RID,
2158                                 0, 0, 0, 0, 0, 0, 0,
2159                                 &everyone)) {
2160     error = GetLastError();
2161     goto done;
2162   }
2163 
2164   if (GetSecurityInfo(handle->handle,
2165                       SE_KERNEL_OBJECT,
2166                       DACL_SECURITY_INFORMATION,
2167                       NULL,
2168                       NULL,
2169                       &old_dacl,
2170                       NULL,
2171                       &sd)) {
2172     error = GetLastError();
2173     goto clean_sid;
2174   }
2175 
2176   memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
2177   if (mode & UV_READABLE)
2178     ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
2179   if (mode & UV_WRITABLE)
2180     ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
2181   ea.grfAccessPermissions |= SYNCHRONIZE;
2182   ea.grfAccessMode = SET_ACCESS;
2183   ea.grfInheritance = NO_INHERITANCE;
2184   ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
2185   ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
2186   ea.Trustee.ptstrName = (LPTSTR)everyone;
2187 
2188   if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
2189     error = GetLastError();
2190     goto clean_sd;
2191   }
2192 
2193   if (SetSecurityInfo(handle->handle,
2194                       SE_KERNEL_OBJECT,
2195                       DACL_SECURITY_INFORMATION,
2196                       NULL,
2197                       NULL,
2198                       new_dacl,
2199                       NULL)) {
2200     error = GetLastError();
2201     goto clean_dacl;
2202   }
2203 
2204   error = 0;
2205 
2206 clean_dacl:
2207   LocalFree((HLOCAL) new_dacl);
2208 clean_sd:
2209   LocalFree((HLOCAL) sd);
2210 clean_sid:
2211   FreeSid(everyone);
2212 done:
2213   return uv_translate_sys_error(error);
2214 }
2215