1
2 #include "../processx.h"
3
4 HANDLE processx__connection_iocp = NULL;
5
processx__get_default_iocp()6 HANDLE processx__get_default_iocp() {
7 if (! processx__connection_iocp) {
8 processx__connection_iocp = CreateIoCompletionPort(
9 /* FileHandle = */ INVALID_HANDLE_VALUE,
10 /* ExistingCompletionPort = */ NULL,
11 /* CompletionKey = */ 0,
12 /* NumberOfConcurrentThreads = */ 0);
13 }
14 return processx__connection_iocp;
15 }
16
17 HANDLE processx__iocp_thread = NULL;
18 HANDLE processx__thread_start = NULL;
19 HANDLE processx__thread_done = NULL;
20 BOOL processx__thread_success;
21 void *processx__thread_data = NULL;
22 DWORD processx__thread_last_error = 0;
23 int processx__thread_cmd = PROCESSX__THREAD_CMD_INIT;
24
25 fd_set processx__readfds,
26 processx__writefds,
27 processx__exceptionfds;
28 SOCKET processx__notify_socket[2] = { 0, 0 };
29 int processx__select = 0;
30
31 struct processx__thread_readfile_data {
32 processx_connection_t *ccon;
33 LPVOID lpBuffer;
34 DWORD nNumberOfBytesToRead;
35 LPDWORD lpNumberOfBytesRead;
36 } processx__thread_readfile_data;
37
38 struct processx__thread_getstatus_data {
39 LPDWORD lpNumberOfBytes;
40 PULONG_PTR lpCompletionKey;
41 LPOVERLAPPED *lpOverlapped;
42 DWORD dwMilliseconds;
43 } processx__thread_getstatus_data;
44
45 ULONG_PTR processx__key_none = 1;
46
processx_i_thread_readfile()47 DWORD processx_i_thread_readfile() {
48
49 processx_connection_t *ccon = processx__thread_readfile_data.ccon;
50
51 if (! ccon->handle.overlapped.hEvent &&
52 (ccon->type == PROCESSX_FILE_TYPE_ASYNCFILE ||
53 ccon->type == PROCESSX_FILE_TYPE_ASYNCPIPE)) {
54 ccon->handle.overlapped.hEvent = CreateEvent(
55 /* lpEventAttributes = */ NULL,
56 /* bManualReset = */ FALSE,
57 /* bInitialState = */ FALSE,
58 /* lpName = */ NULL);
59
60 if (ccon->handle.overlapped.hEvent == NULL) return FALSE;
61
62 HANDLE iocp = processx__get_default_iocp();
63 if (!iocp) return FALSE;
64
65 HANDLE res = CreateIoCompletionPort(
66 /* FileHandle = */ ccon->handle.handle,
67 /* ExistingCompletionPort = */ iocp,
68 /* CompletionKey = */ (ULONG_PTR) ccon,
69 /* NumberOfConcurrentThreads = */ 0);
70
71 if (!res) return FALSE;
72 }
73
74 /* These need to be set to zero for non-file handles */
75 if (ccon->type != PROCESSX_FILE_TYPE_ASYNCFILE) {
76 ccon->handle.overlapped.Offset = 0;
77 ccon->handle.overlapped.OffsetHigh = 0;
78 }
79
80 DWORD res = ReadFile(ccon->handle.handle,
81 processx__thread_readfile_data.lpBuffer,
82 processx__thread_readfile_data.nNumberOfBytesToRead,
83 processx__thread_readfile_data.lpNumberOfBytesRead,
84 &ccon->handle.overlapped);
85 return res;
86 }
87
processx_i_thread_getstatus()88 DWORD processx_i_thread_getstatus() {
89 static const char *ok_buf = "OK";
90 HANDLE iocp = processx__get_default_iocp();
91 if (!iocp) return FALSE;
92
93 DWORD res = GetQueuedCompletionStatus(
94 iocp,
95 processx__thread_getstatus_data.lpNumberOfBytes,
96 processx__thread_getstatus_data.lpCompletionKey,
97 processx__thread_getstatus_data.lpOverlapped,
98 processx__thread_getstatus_data.dwMilliseconds);
99
100 if (processx__select) {
101 /* TODO: error */
102 send(processx__notify_socket[1], ok_buf, 2, 0);
103 }
104
105 return res;
106 }
107
processx__thread_callback(void * data)108 DWORD processx__thread_callback(void *data) {
109 while (1) {
110 WaitForSingleObject(processx__thread_start, INFINITE);
111
112 processx__thread_success = TRUE;
113 processx__thread_last_error = 0;
114
115 switch (processx__thread_cmd) {
116 case PROCESSX__THREAD_CMD_INIT:
117 case PROCESSX__THREAD_CMD_IDLE:
118 break;
119
120 case PROCESSX__THREAD_CMD_READFILE:
121 processx__thread_success = processx_i_thread_readfile();
122 break;
123
124 case PROCESSX__THREAD_CMD_GETSTATUS:
125 processx__thread_success = processx_i_thread_getstatus();
126 break;
127
128 default:
129 /* ???? */
130 processx__thread_success = FALSE;
131 break;
132 }
133
134 if (!processx__thread_success) {
135 processx__thread_last_error = GetLastError();
136 }
137
138 processx__thread_cmd = PROCESSX__THREAD_CMD_IDLE;
139 SetEvent(processx__thread_done);
140 }
141 return 0;
142 }
143
processx__start_thread()144 int processx__start_thread() {
145 if (processx__iocp_thread != NULL) return 0;
146
147 DWORD threadid;
148
149 processx__thread_start = CreateEventA(NULL, FALSE, FALSE, NULL);
150 processx__thread_done = CreateEventA(NULL, FALSE, FALSE, NULL);
151
152 if (processx__thread_start == NULL || processx__thread_done == NULL) {
153 if (processx__thread_start) CloseHandle(processx__thread_start);
154 if (processx__thread_done ) CloseHandle(processx__thread_done);
155 processx__thread_start = processx__thread_done = NULL;
156 R_THROW_SYSTEM_ERROR("Cannot create I/O events");
157 }
158
159 processx__thread_cmd = PROCESSX__THREAD_CMD_INIT;
160
161 processx__iocp_thread = CreateThread(
162 /* lpThreadAttributes = */ NULL,
163 /* dwStackSize = */ 0,
164 /* lpStartAddress = */
165 (LPTHREAD_START_ROUTINE) processx__thread_callback,
166 /* lpParameter = */ 0,
167 /* dwCreationFlags = */ 0,
168 /* lpThreadId = */ &threadid);
169
170 if (processx__iocp_thread == NULL) {
171 CloseHandle(processx__thread_start);
172 CloseHandle(processx__thread_done);
173 processx__thread_start = processx__thread_done = NULL;
174 R_THROW_SYSTEM_ERROR("Cannot start I/O thread");
175 }
176
177 /* Wait for thread to be ready */
178 SetEvent(processx__thread_start);
179 WaitForSingleObject(processx__thread_done, INFINITE);
180
181 return 0;
182 }
183
184 /* ReadFile, but in the bg thread */
185
processx__thread_readfile(processx_connection_t * ccon,LPVOID lpBuffer,DWORD nNumberOfBytesToRead,LPDWORD lpNumberOfBytesRead)186 BOOL processx__thread_readfile(processx_connection_t *ccon,
187 LPVOID lpBuffer,
188 DWORD nNumberOfBytesToRead,
189 LPDWORD lpNumberOfBytesRead) {
190
191 processx__start_thread();
192 processx__thread_cmd = PROCESSX__THREAD_CMD_READFILE;
193
194 processx__thread_readfile_data.ccon = ccon;
195 processx__thread_readfile_data.lpBuffer = lpBuffer;
196 processx__thread_readfile_data.nNumberOfBytesToRead = nNumberOfBytesToRead;
197 processx__thread_readfile_data.lpNumberOfBytesRead = lpNumberOfBytesRead;
198
199 SetEvent(processx__thread_start);
200 WaitForSingleObject(processx__thread_done, INFINITE);
201
202 return processx__thread_success;
203 }
204
205 /* GetQueuedCompletionStatus but in the bg thread */
206
processx__thread_getstatus(LPDWORD lpNumberOfBytes,PULONG_PTR lpCompletionKey,LPOVERLAPPED * lpOverlapped,DWORD dwMilliseconds)207 BOOL processx__thread_getstatus(LPDWORD lpNumberOfBytes,
208 PULONG_PTR lpCompletionKey,
209 LPOVERLAPPED *lpOverlapped,
210 DWORD dwMilliseconds) {
211
212 processx__start_thread();
213 processx__thread_cmd = PROCESSX__THREAD_CMD_GETSTATUS;
214
215 processx__thread_getstatus_data.lpNumberOfBytes = lpNumberOfBytes;
216 processx__thread_getstatus_data.lpCompletionKey = lpCompletionKey;
217 processx__thread_getstatus_data.lpOverlapped = lpOverlapped;
218 processx__thread_getstatus_data.dwMilliseconds = dwMilliseconds;
219
220 SetEvent(processx__thread_start);
221 WaitForSingleObject(processx__thread_done, INFINITE);
222
223 return processx__thread_success;
224 }
225
processx__thread_getstatus_select(LPDWORD lpNumberOfBytes,PULONG_PTR lpCompletionKey,LPOVERLAPPED * lpOverlapped,DWORD dwMilliseconds)226 BOOL processx__thread_getstatus_select(LPDWORD lpNumberOfBytes,
227 PULONG_PTR lpCompletionKey,
228 LPOVERLAPPED *lpOverlapped,
229 DWORD dwMilliseconds) {
230 TIMEVAL timeout;
231 char buf[10];
232 HANDLE iocp = processx__get_default_iocp();
233 int ret;
234
235 processx__start_thread();
236
237 timeout.tv_sec = dwMilliseconds / 1000;
238 timeout.tv_usec = dwMilliseconds % 1000 * 1000;
239
240 processx__thread_cmd = PROCESSX__THREAD_CMD_GETSTATUS;
241
242 processx__select = 1;
243 processx__thread_getstatus_data.lpNumberOfBytes = lpNumberOfBytes;
244 processx__thread_getstatus_data.lpCompletionKey = lpCompletionKey;
245 processx__thread_getstatus_data.lpOverlapped = lpOverlapped;
246 processx__thread_getstatus_data.dwMilliseconds = dwMilliseconds;
247
248 SetEvent(processx__thread_start);
249 ret = select(/* (ignored) */ 0, &processx__readfds, &processx__writefds,
250 &processx__exceptionfds, &timeout);
251 if (FD_ISSET(processx__notify_socket[0], &processx__readfds)) {
252 /* TODO: error */
253 recv(processx__notify_socket[0], buf, 10, 0);
254 } else {
255 /* Wake up the IO thread. */
256 PostQueuedCompletionStatus(iocp, 0, processx__key_none, 0);
257 }
258
259 /* This waits until the IO thread is done */
260 WaitForSingleObject(processx__thread_done, INFINITE);
261
262 return processx__thread_success;
263 }
264
processx__thread_get_last_error()265 DWORD processx__thread_get_last_error() {
266 return processx__thread_last_error;
267 }
268