1 
2 #include "../processx.h"
3 
4 HANDLE processx__connection_iocp = NULL;
5 
processx__get_default_iocp()6 HANDLE processx__get_default_iocp() {
7   if (! processx__connection_iocp) {
8     processx__connection_iocp = CreateIoCompletionPort(
9     /* FileHandle = */                 INVALID_HANDLE_VALUE,
10     /* ExistingCompletionPort = */     NULL,
11     /* CompletionKey = */              0,
12     /* NumberOfConcurrentThreads =  */ 0);
13   }
14   return processx__connection_iocp;
15 }
16 
17 HANDLE processx__iocp_thread = NULL;
18 HANDLE processx__thread_start = NULL;
19 HANDLE processx__thread_done = NULL;
20 BOOL processx__thread_success;
21 void *processx__thread_data = NULL;
22 DWORD processx__thread_last_error = 0;
23 int processx__thread_cmd = PROCESSX__THREAD_CMD_INIT;
24 
25 fd_set processx__readfds,
26   processx__writefds,
27   processx__exceptionfds;
28 SOCKET processx__notify_socket[2] = { 0, 0 };
29 int processx__select = 0;
30 
31 struct processx__thread_readfile_data {
32   processx_connection_t *ccon;
33   LPVOID lpBuffer;
34   DWORD nNumberOfBytesToRead;
35   LPDWORD lpNumberOfBytesRead;
36 } processx__thread_readfile_data;
37 
38 struct processx__thread_getstatus_data {
39   LPDWORD lpNumberOfBytes;
40   PULONG_PTR lpCompletionKey;
41   LPOVERLAPPED *lpOverlapped;
42   DWORD dwMilliseconds;
43 } processx__thread_getstatus_data;
44 
45 ULONG_PTR processx__key_none = 1;
46 
processx_i_thread_readfile()47 DWORD processx_i_thread_readfile() {
48 
49   processx_connection_t *ccon = processx__thread_readfile_data.ccon;
50 
51   if (! ccon->handle.overlapped.hEvent &&
52       (ccon->type == PROCESSX_FILE_TYPE_ASYNCFILE ||
53        ccon->type == PROCESSX_FILE_TYPE_ASYNCPIPE)) {
54     ccon->handle.overlapped.hEvent = CreateEvent(
55       /* lpEventAttributes = */ NULL,
56       /* bManualReset = */      FALSE,
57       /* bInitialState = */     FALSE,
58       /* lpName = */            NULL);
59 
60     if (ccon->handle.overlapped.hEvent == NULL) return FALSE;
61 
62     HANDLE iocp = processx__get_default_iocp();
63     if (!iocp) return FALSE;
64 
65     HANDLE res = CreateIoCompletionPort(
66       /* FileHandle =  */                ccon->handle.handle,
67       /* ExistingCompletionPort = */     iocp,
68       /* CompletionKey = */              (ULONG_PTR) ccon,
69       /* NumberOfConcurrentThreads = */  0);
70 
71     if (!res) return FALSE;
72   }
73 
74   /* These need to be set to zero for non-file handles */
75   if (ccon->type != PROCESSX_FILE_TYPE_ASYNCFILE) {
76     ccon->handle.overlapped.Offset = 0;
77     ccon->handle.overlapped.OffsetHigh = 0;
78   }
79 
80   DWORD res = ReadFile(ccon->handle.handle,
81 		       processx__thread_readfile_data.lpBuffer,
82 		       processx__thread_readfile_data.nNumberOfBytesToRead,
83 		       processx__thread_readfile_data.lpNumberOfBytesRead,
84 		       &ccon->handle.overlapped);
85   return res;
86 }
87 
processx_i_thread_getstatus()88 DWORD processx_i_thread_getstatus() {
89   static const char *ok_buf = "OK";
90   HANDLE iocp = processx__get_default_iocp();
91   if (!iocp) return FALSE;
92 
93   DWORD res = GetQueuedCompletionStatus(
94     iocp,
95     processx__thread_getstatus_data.lpNumberOfBytes,
96     processx__thread_getstatus_data.lpCompletionKey,
97     processx__thread_getstatus_data.lpOverlapped,
98     processx__thread_getstatus_data.dwMilliseconds);
99 
100   if (processx__select) {
101     /* TODO: error */
102     send(processx__notify_socket[1], ok_buf, 2, 0);
103   }
104 
105   return res;
106 }
107 
processx__thread_callback(void * data)108 DWORD processx__thread_callback(void *data) {
109   while (1) {
110     WaitForSingleObject(processx__thread_start, INFINITE);
111 
112     processx__thread_success = TRUE;
113     processx__thread_last_error = 0;
114 
115     switch (processx__thread_cmd) {
116     case PROCESSX__THREAD_CMD_INIT:
117     case PROCESSX__THREAD_CMD_IDLE:
118       break;
119 
120     case PROCESSX__THREAD_CMD_READFILE:
121       processx__thread_success = processx_i_thread_readfile();
122       break;
123 
124     case PROCESSX__THREAD_CMD_GETSTATUS:
125       processx__thread_success = processx_i_thread_getstatus();
126       break;
127 
128     default:
129       /* ???? */
130       processx__thread_success = FALSE;
131       break;
132     }
133 
134     if (!processx__thread_success) {
135       processx__thread_last_error = GetLastError();
136     }
137 
138     processx__thread_cmd = PROCESSX__THREAD_CMD_IDLE;
139     SetEvent(processx__thread_done);
140   }
141   return 0;
142 }
143 
processx__start_thread()144 int processx__start_thread() {
145   if (processx__iocp_thread != NULL) return 0;
146 
147   DWORD threadid;
148 
149   processx__thread_start = CreateEventA(NULL, FALSE, FALSE, NULL);
150   processx__thread_done  = CreateEventA(NULL, FALSE, FALSE, NULL);
151 
152   if (processx__thread_start == NULL || processx__thread_done == NULL) {
153     if (processx__thread_start) CloseHandle(processx__thread_start);
154     if (processx__thread_done ) CloseHandle(processx__thread_done);
155     processx__thread_start = processx__thread_done = NULL;
156     R_THROW_SYSTEM_ERROR("Cannot create I/O events");
157   }
158 
159   processx__thread_cmd = PROCESSX__THREAD_CMD_INIT;
160 
161   processx__iocp_thread = CreateThread(
162     /* lpThreadAttributes = */ NULL,
163     /* dwStackSize = */        0,
164     /* lpStartAddress = */
165       (LPTHREAD_START_ROUTINE) processx__thread_callback,
166     /* lpParameter = */        0,
167     /* dwCreationFlags = */    0,
168     /* lpThreadId = */         &threadid);
169 
170   if (processx__iocp_thread == NULL) {
171     CloseHandle(processx__thread_start);
172     CloseHandle(processx__thread_done);
173     processx__thread_start = processx__thread_done = NULL;
174     R_THROW_SYSTEM_ERROR("Cannot start I/O thread");
175   }
176 
177   /* Wait for thread to be ready */
178   SetEvent(processx__thread_start);
179   WaitForSingleObject(processx__thread_done, INFINITE);
180 
181   return 0;
182 }
183 
184 /* ReadFile, but in the bg thread */
185 
processx__thread_readfile(processx_connection_t * ccon,LPVOID lpBuffer,DWORD nNumberOfBytesToRead,LPDWORD lpNumberOfBytesRead)186 BOOL processx__thread_readfile(processx_connection_t *ccon,
187 			       LPVOID lpBuffer,
188 			       DWORD nNumberOfBytesToRead,
189 			       LPDWORD lpNumberOfBytesRead) {
190 
191   processx__start_thread();
192   processx__thread_cmd = PROCESSX__THREAD_CMD_READFILE;
193 
194   processx__thread_readfile_data.ccon = ccon;
195   processx__thread_readfile_data.lpBuffer = lpBuffer;
196   processx__thread_readfile_data.nNumberOfBytesToRead = nNumberOfBytesToRead;
197   processx__thread_readfile_data.lpNumberOfBytesRead = lpNumberOfBytesRead;
198 
199   SetEvent(processx__thread_start);
200   WaitForSingleObject(processx__thread_done, INFINITE);
201 
202   return processx__thread_success;
203 }
204 
205 /* GetQueuedCompletionStatus but in the bg thread */
206 
processx__thread_getstatus(LPDWORD lpNumberOfBytes,PULONG_PTR lpCompletionKey,LPOVERLAPPED * lpOverlapped,DWORD dwMilliseconds)207 BOOL  processx__thread_getstatus(LPDWORD lpNumberOfBytes,
208 				 PULONG_PTR lpCompletionKey,
209 				 LPOVERLAPPED *lpOverlapped,
210 				 DWORD dwMilliseconds) {
211 
212   processx__start_thread();
213   processx__thread_cmd = PROCESSX__THREAD_CMD_GETSTATUS;
214 
215   processx__thread_getstatus_data.lpNumberOfBytes = lpNumberOfBytes;
216   processx__thread_getstatus_data.lpCompletionKey = lpCompletionKey;
217   processx__thread_getstatus_data.lpOverlapped = lpOverlapped;
218   processx__thread_getstatus_data.dwMilliseconds = dwMilliseconds;
219 
220   SetEvent(processx__thread_start);
221   WaitForSingleObject(processx__thread_done, INFINITE);
222 
223   return processx__thread_success;
224 }
225 
processx__thread_getstatus_select(LPDWORD lpNumberOfBytes,PULONG_PTR lpCompletionKey,LPOVERLAPPED * lpOverlapped,DWORD dwMilliseconds)226 BOOL processx__thread_getstatus_select(LPDWORD lpNumberOfBytes,
227 				       PULONG_PTR lpCompletionKey,
228 				       LPOVERLAPPED *lpOverlapped,
229 				       DWORD dwMilliseconds) {
230   TIMEVAL timeout;
231   char buf[10];
232   HANDLE iocp = processx__get_default_iocp();
233   int ret;
234 
235   processx__start_thread();
236 
237   timeout.tv_sec = dwMilliseconds / 1000;
238   timeout.tv_usec = dwMilliseconds % 1000 * 1000;
239 
240   processx__thread_cmd = PROCESSX__THREAD_CMD_GETSTATUS;
241 
242   processx__select = 1;
243   processx__thread_getstatus_data.lpNumberOfBytes = lpNumberOfBytes;
244   processx__thread_getstatus_data.lpCompletionKey = lpCompletionKey;
245   processx__thread_getstatus_data.lpOverlapped = lpOverlapped;
246   processx__thread_getstatus_data.dwMilliseconds = dwMilliseconds;
247 
248   SetEvent(processx__thread_start);
249   ret = select(/* (ignored) */ 0, &processx__readfds, &processx__writefds,
250 	 &processx__exceptionfds, &timeout);
251   if (FD_ISSET(processx__notify_socket[0], &processx__readfds)) {
252     /* TODO: error */
253     recv(processx__notify_socket[0], buf, 10, 0);
254   } else {
255     /* Wake up the IO thread. */
256     PostQueuedCompletionStatus(iocp, 0, processx__key_none, 0);
257   }
258 
259   /* This waits until the IO thread is done */
260   WaitForSingleObject(processx__thread_done, INFINITE);
261 
262   return processx__thread_success;
263 }
264 
processx__thread_get_last_error()265 DWORD processx__thread_get_last_error() {
266   return processx__thread_last_error;
267 }
268