1 /* AsyncIO.c
2 *
3 * Integrating Win32 asynchronous I/O with the GHC RTS.
4 *
5 * (c) sof, 2002-2003.
6 */
7
8 #if !defined(THREADED_RTS)
9
10 #include "Rts.h"
11 #include "RtsUtils.h"
12 #include <windows.h>
13 #include <stdio.h>
14 #include "Schedule.h"
15 #include "Capability.h"
16 #include "win32/AsyncIO.h"
17 #include "win32/IOManager.h"
18
19 /*
20 * Overview:
21 *
 * Haskell code issues asynchronous I/O requests via the
23 * async{Read,Write,DoOp}# primops. These cause addIORequest()
24 * to be invoked, which forwards the request to the underlying
25 * asynchronous I/O subsystem. Each request is tagged with a unique
26 * ID.
27 *
28 * addIORequest() returns this ID, so that when the blocked CH
29 * thread is added onto blocked_queue, its TSO is annotated with
30 * it. Upon completion of an I/O request, the async I/O handling
31 * code makes a back-call to signal its completion; the local
32 * onIOComplete() routine. It adds the IO request ID (along with
33 * its result data) to a queue of completed requests before returning.
34 *
 * The queue of completed IO requests is read by the thread operating
36 * the RTS scheduler. It de-queues the CH threads corresponding
37 * to the request IDs, making them runnable again.
38 *
39 */
40
41 typedef struct CompletedReq {
42 unsigned int reqID;
43 HsInt len;
44 HsInt errCode;
45 } CompletedReq;
46
47 #define MAX_REQUESTS 200
48
49 static CRITICAL_SECTION queue_lock;
50 static HANDLE completed_req_event = INVALID_HANDLE_VALUE;
51 static HANDLE abandon_req_wait = INVALID_HANDLE_VALUE;
52 static HANDLE wait_handles[2];
53 static CompletedReq completedTable[MAX_REQUESTS];
54 static int completed_hw;
55 static HANDLE completed_table_sema;
56 static int issued_reqs;
57
58 static void
onIOComplete(unsigned int reqID,int fd STG_UNUSED,HsInt len,void * buf STG_UNUSED,HsInt errCode)59 onIOComplete(unsigned int reqID,
60 int fd STG_UNUSED,
61 HsInt len,
62 void* buf STG_UNUSED,
63 HsInt errCode)
64 {
65 DWORD dwRes;
66 /* Deposit result of request in queue/table..when there's room. */
67 dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
68 switch (dwRes) {
69 case WAIT_OBJECT_0:
70 break;
71 default:
72 /* Not likely */
73 fprintf(stderr,
74 "onIOComplete: failed to grab table semaphore, "
75 "dropping request 0x%x\n", reqID);
76 fflush(stderr);
77 return;
78 }
79 EnterCriticalSection(&queue_lock);
80 if (completed_hw == MAX_REQUESTS) {
81 /* Shouldn't happen */
82 fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); "
83 "dropping.\n", reqID);
84 fflush(stderr);
85 } else {
86 #if 0
87 fprintf(stderr, "onCompl: %d %d %d %d %d\n",
88 reqID, len, errCode, issued_reqs, completed_hw);
89 fflush(stderr);
90 #endif
91 completedTable[completed_hw].reqID = reqID;
92 completedTable[completed_hw].len = len;
93 completedTable[completed_hw].errCode = errCode;
94 completed_hw++;
95 issued_reqs--;
96 if (completed_hw == 1) {
97 /* The event is used to wake up the scheduler thread should it
98 * be blocked waiting for requests to complete. The event resets
99 * once that thread has cleared out the request queue/table.
100 */
101 SetEvent(completed_req_event);
102 }
103 }
104 LeaveCriticalSection(&queue_lock);
105 }
106
107 unsigned int
addIORequest(int fd,bool forWriting,bool isSock,HsInt len,char * buf)108 addIORequest(int fd,
109 bool forWriting,
110 bool isSock,
111 HsInt len,
112 char* buf)
113 {
114 EnterCriticalSection(&queue_lock);
115 issued_reqs++;
116 LeaveCriticalSection(&queue_lock);
117 #if 0
118 fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len);
119 fflush(stderr);
120 #endif
121 return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
122 }
123
124 unsigned int
addDelayRequest(HsInt usecs)125 addDelayRequest(HsInt usecs)
126 {
127 EnterCriticalSection(&queue_lock);
128 issued_reqs++;
129 LeaveCriticalSection(&queue_lock);
130 #if 0
131 fprintf(stderr, "addDelayReq: %d\n", usecs); fflush(stderr);
132 #endif
133 return AddDelayRequest(usecs,onIOComplete);
134 }
135
136 unsigned int
addDoProcRequest(void * proc,void * param)137 addDoProcRequest(void* proc, void* param)
138 {
139 EnterCriticalSection(&queue_lock);
140 issued_reqs++;
141 LeaveCriticalSection(&queue_lock);
142 #if 0
143 fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
144 #endif
145 return AddProcRequest(proc,param,onIOComplete);
146 }
147
148
149 int
startupAsyncIO()150 startupAsyncIO()
151 {
152 if (!StartIOManager()) {
153 return 0;
154 }
155 InitializeCriticalSection(&queue_lock);
156 /* Create a pair of events:
157 *
158 * - completed_req_event -- signals the deposit of request result;
159 * manual reset.
160 * - abandon_req_wait -- external OS thread tells current
161 * RTS/Scheduler thread to abandon wait
162 * for IO request completion.
163 * Auto reset.
164 */
165 completed_req_event = CreateEvent (NULL, TRUE, FALSE, NULL);
166 abandon_req_wait = CreateEvent (NULL, FALSE, FALSE, NULL);
167 wait_handles[0] = completed_req_event;
168 wait_handles[1] = abandon_req_wait;
169 completed_hw = 0;
170 if ( !(completed_table_sema = CreateSemaphore(NULL, MAX_REQUESTS,
171 MAX_REQUESTS, NULL)) ) {
172 DWORD rc = GetLastError();
173 fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n",
174 (int)rc);
175 fflush(stderr);
176 }
177
178 return ( completed_req_event != INVALID_HANDLE_VALUE &&
179 abandon_req_wait != INVALID_HANDLE_VALUE &&
180 completed_table_sema != NULL );
181 }
182
183 void
shutdownAsyncIO(bool wait_threads)184 shutdownAsyncIO(bool wait_threads)
185 {
186 ShutdownIOManager(wait_threads);
187 if (completed_req_event != INVALID_HANDLE_VALUE) {
188 CloseHandle(completed_req_event);
189 completed_req_event = INVALID_HANDLE_VALUE;
190 }
191 if (abandon_req_wait != INVALID_HANDLE_VALUE) {
192 CloseHandle(abandon_req_wait);
193 abandon_req_wait = INVALID_HANDLE_VALUE;
194 }
195 if (completed_table_sema != NULL) {
196 CloseHandle(completed_table_sema);
197 completed_table_sema = NULL;
198 }
199 DeleteCriticalSection(&queue_lock);
200 }
201
202 /*
203 * Function: awaitRequests(wait)
204 *
205 * Check for the completion of external IO work requests. Worker
206 * threads signal completion of IO requests by depositing them
207 * in a table (completedTable). awaitRequests() matches up
208 * requests in that table with threads on the blocked_queue,
209 * making the threads whose IO requests have completed runnable
210 * again.
211 *
212 * awaitRequests() is called by the scheduler periodically _or_ if
213 * it is out of work, and need to wait for the completion of IO
214 * requests to make further progress. In the latter scenario,
215 * awaitRequests() will simply block waiting for worker threads
216 * to complete if the 'completedTable' is empty.
217 */
218 int
awaitRequests(bool wait)219 awaitRequests(bool wait)
220 {
221 #if !defined(THREADED_RTS)
222 // none of this is actually used in the threaded RTS
223
224 start:
225 #if 0
226 fprintf(stderr, "awaitRequests(): %d %d %d\n",
227 issued_reqs, completed_hw, wait);
228 fflush(stderr);
229 #endif
230 EnterCriticalSection(&queue_lock);
231 // Nothing immediately available & we won't wait
232 if ((!wait && completed_hw == 0)
233 #if 0
234 // If we just return when wait==false, we'll go into a busy
235 // wait loop, so I disabled this condition --SDM 18/12/2003
236 (issued_reqs == 0 && completed_hw == 0)
237 #endif
238 ) {
239 LeaveCriticalSection(&queue_lock);
240 return 0;
241 }
242 if (completed_hw == 0) {
243 // empty table, drop lock and wait
244 LeaveCriticalSection(&queue_lock);
245 if ( wait && sched_state == SCHED_RUNNING ) {
246 DWORD dwRes = WaitForMultipleObjects(2, wait_handles,
247 FALSE, INFINITE);
248 switch (dwRes) {
249 case WAIT_OBJECT_0:
250 // a request was completed
251 break;
252 case WAIT_OBJECT_0 + 1:
253 case WAIT_TIMEOUT:
254 // timeout (unlikely) or told to abandon waiting
255 return 0;
256 case WAIT_FAILED: {
257 DWORD dw = GetLastError();
258 fprintf(stderr, "awaitRequests: wait failed -- "
259 "error code: %lu\n", dw); fflush(stderr);
260 return 0;
261 }
262 default:
263 fprintf(stderr, "awaitRequests: unexpected wait return "
264 "code %lu\n", dwRes); fflush(stderr);
265 return 0;
266 }
267 } else {
268 return 0;
269 }
270 goto start;
271 } else {
272 int i;
273 StgTSO *tso, *prev;
274
275 for (i=0; i < completed_hw; i++) {
276 /* For each of the completed requests, match up their Ids
277 * with those of the threads on the blocked_queue. If the
278 * thread that made the IO request has been subsequently
279 * killed (and removed from blocked_queue), no match will
280 * be found for that request Id.
281 *
282 * i.e., killing a Haskell thread doesn't attempt to cancel
283 * the IO request it is blocked on.
284 *
285 */
286 unsigned int rID = completedTable[i].reqID;
287
288 prev = NULL;
289 for(tso = blocked_queue_hd; tso != END_TSO_QUEUE;
290 tso = tso->_link) {
291
292 switch(tso->why_blocked) {
293 case BlockedOnRead:
294 case BlockedOnWrite:
295 case BlockedOnDoProc:
296 if (tso->block_info.async_result->reqID == rID) {
297 // Found the thread blocked waiting on request;
298 // stodgily fill
299 // in its result block.
300 tso->block_info.async_result->len =
301 completedTable[i].len;
302 tso->block_info.async_result->errCode =
303 completedTable[i].errCode;
304
305 // Drop the matched TSO from blocked_queue
306 if (prev) {
307 setTSOLink(&MainCapability, prev, tso->_link);
308 } else {
309 blocked_queue_hd = tso->_link;
310 }
311 if (blocked_queue_tl == tso) {
312 blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
313 }
314
315 // Terminates the run queue + this inner for-loop.
316 tso->_link = END_TSO_QUEUE;
317 tso->why_blocked = NotBlocked;
318 // save the StgAsyncIOResult in the
319 // stg_block_async_info stack frame, because
320 // the block_info field will be overwritten by
321 // pushOnRunQueue().
322 tso->stackobj->sp[1] = (W_)tso->block_info.async_result;
323 pushOnRunQueue(&MainCapability, tso);
324 break;
325 }
326 break;
327 default:
328 if (tso->why_blocked != NotBlocked) {
329 barf("awaitRequests: odd thread state");
330 }
331 break;
332 }
333
334 prev = tso;
335 }
336 /* Signal that there's completed table slots available */
337 if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
338 DWORD dw = GetLastError();
339 fprintf(stderr, "awaitRequests: failed to signal semaphore "
340 "(error code=0x%x)\n", (int)dw);
341 fflush(stderr);
342 }
343 }
344 completed_hw = 0;
345 ResetEvent(completed_req_event);
346 LeaveCriticalSection(&queue_lock);
347 return 1;
348 }
349 #endif /* !THREADED_RTS */
350 }
351
352 /*
353 * Function: abandonRequestWait()
354 *
355 * Wake up a thread that's blocked waiting for new IO requests
356 * to complete (via awaitRequests().)
357 */
358 void
abandonRequestWait(void)359 abandonRequestWait( void )
360 {
361 /* the event is auto-reset, but in case there's no thread
362 * already waiting on the event, we want to return it to
363 * a non-signalled state.
364 *
365 * Careful! There is no synchronisation between
366 * abandonRequestWait and awaitRequest, which means that
367 * abandonRequestWait might be called just before a thread
368 * goes into a wait, and we miss the abandon signal. So we
369 * must SetEvent() here rather than PulseEvent() to ensure
370 * that the event isn't lost. We can re-optimise by resetting
371 * the event somewhere safe if we know the event has been
372 * properly serviced (see resetAbandon() below). --SDM 18/12/2003
373 */
374 SetEvent(abandon_req_wait);
375 }
376
377 void
resetAbandonRequestWait(void)378 resetAbandonRequestWait( void )
379 {
380 ResetEvent(abandon_req_wait);
381 }
382
383 #endif /* !defined(THREADED_RTS) */
384