1 /* AsyncIO.c
2  *
3  * Integrating Win32 asynchronous I/O with the GHC RTS.
4  *
5  * (c) sof, 2002-2003.
6  */
7 
8 #if !defined(THREADED_RTS)
9 
10 #include "Rts.h"
11 #include "RtsUtils.h"
12 #include <windows.h>
13 #include <stdio.h>
14 #include "Schedule.h"
15 #include "Capability.h"
16 #include "win32/AsyncIO.h"
17 #include "win32/IOManager.h"
18 
/*
 * Overview:
 *
 * Haskell code issues asynchronous I/O requests via the
 * async{Read,Write,DoProc}# primops. These cause addIORequest()
 * (or addDoProcRequest(); likewise addDelayRequest() for delays)
 * to be invoked, which forwards the request to the underlying
 * asynchronous I/O subsystem. Each request is tagged with a unique
 * ID.
 *
 * addIORequest() returns this ID, so that when the blocked Haskell
 * thread is added onto blocked_queue, its TSO is annotated with
 * it. Upon completion of an I/O request, the async I/O handling
 * code makes a back-call to signal its completion: the local
 * onIOComplete() routine. It adds the IO request ID (along with
 * its result data) to a table of completed requests before returning.
 *
 * The table of completed IO requests is read by the thread running
 * the RTS scheduler (see awaitRequests()). It de-queues the Haskell
 * threads corresponding to the request IDs, making them runnable again.
 *
 */
40 
41 typedef struct CompletedReq {
42     unsigned int   reqID;
43     HsInt          len;
44     HsInt          errCode;
45 } CompletedReq;
46 
47 #define MAX_REQUESTS 200
48 
49 static CRITICAL_SECTION queue_lock;
50 static HANDLE           completed_req_event = INVALID_HANDLE_VALUE;
51 static HANDLE           abandon_req_wait = INVALID_HANDLE_VALUE;
52 static HANDLE           wait_handles[2];
53 static CompletedReq     completedTable[MAX_REQUESTS];
54 static int              completed_hw;
55 static HANDLE           completed_table_sema;
56 static int              issued_reqs;
57 
58 static void
onIOComplete(unsigned int reqID,int fd STG_UNUSED,HsInt len,void * buf STG_UNUSED,HsInt errCode)59 onIOComplete(unsigned int reqID,
60              int   fd STG_UNUSED,
61              HsInt len,
62              void* buf STG_UNUSED,
63              HsInt errCode)
64 {
65     DWORD dwRes;
66     /* Deposit result of request in queue/table..when there's room. */
67     dwRes = WaitForSingleObject(completed_table_sema, INFINITE);
68     switch (dwRes) {
69     case WAIT_OBJECT_0:
70         break;
71     default:
72         /* Not likely */
73         fprintf(stderr,
74                 "onIOComplete: failed to grab table semaphore, "
75                 "dropping request 0x%x\n", reqID);
76         fflush(stderr);
77         return;
78     }
79     EnterCriticalSection(&queue_lock);
80     if (completed_hw == MAX_REQUESTS) {
81         /* Shouldn't happen */
82         fprintf(stderr, "onIOComplete: ERROR -- Request table overflow (%d); "
83                         "dropping.\n", reqID);
84         fflush(stderr);
85     } else {
86 #if 0
87         fprintf(stderr, "onCompl: %d %d %d %d %d\n",
88                 reqID, len, errCode, issued_reqs, completed_hw);
89         fflush(stderr);
90 #endif
91         completedTable[completed_hw].reqID   = reqID;
92         completedTable[completed_hw].len     = len;
93         completedTable[completed_hw].errCode = errCode;
94         completed_hw++;
95         issued_reqs--;
96         if (completed_hw == 1) {
97             /* The event is used to wake up the scheduler thread should it
98              * be blocked waiting for requests to complete. The event resets
99              * once that thread has cleared out the request queue/table.
100              */
101             SetEvent(completed_req_event);
102         }
103     }
104     LeaveCriticalSection(&queue_lock);
105 }
106 
107 unsigned int
addIORequest(int fd,bool forWriting,bool isSock,HsInt len,char * buf)108 addIORequest(int   fd,
109              bool  forWriting,
110              bool  isSock,
111              HsInt len,
112              char* buf)
113 {
114     EnterCriticalSection(&queue_lock);
115     issued_reqs++;
116     LeaveCriticalSection(&queue_lock);
117 #if 0
118     fprintf(stderr, "addIOReq: %d %d %d\n", fd, forWriting, len);
119     fflush(stderr);
120 #endif
121     return AddIORequest(fd,forWriting,isSock,len,buf,onIOComplete);
122 }
123 
124 unsigned int
addDelayRequest(HsInt usecs)125 addDelayRequest(HsInt usecs)
126 {
127     EnterCriticalSection(&queue_lock);
128     issued_reqs++;
129     LeaveCriticalSection(&queue_lock);
130 #if 0
131     fprintf(stderr, "addDelayReq: %d\n", usecs); fflush(stderr);
132 #endif
133     return AddDelayRequest(usecs,onIOComplete);
134 }
135 
136 unsigned int
addDoProcRequest(void * proc,void * param)137 addDoProcRequest(void* proc, void* param)
138 {
139     EnterCriticalSection(&queue_lock);
140     issued_reqs++;
141     LeaveCriticalSection(&queue_lock);
142 #if 0
143     fprintf(stderr, "addProcReq: %p %p\n", proc, param); fflush(stderr);
144 #endif
145     return AddProcRequest(proc,param,onIOComplete);
146 }
147 
148 
149 int
startupAsyncIO()150 startupAsyncIO()
151 {
152     if (!StartIOManager()) {
153         return 0;
154     }
155     InitializeCriticalSection(&queue_lock);
156     /* Create a pair of events:
157      *
158      *    - completed_req_event -- signals the deposit of request result;
159      *                             manual reset.
160      *    - abandon_req_wait    -- external OS thread tells current
161      *                             RTS/Scheduler thread to abandon wait
162      *                             for IO request completion.
163      *                             Auto reset.
164      */
165     completed_req_event = CreateEvent (NULL, TRUE,  FALSE, NULL);
166     abandon_req_wait    = CreateEvent (NULL, FALSE, FALSE, NULL);
167     wait_handles[0] = completed_req_event;
168     wait_handles[1] = abandon_req_wait;
169     completed_hw = 0;
170     if ( !(completed_table_sema = CreateSemaphore(NULL, MAX_REQUESTS,
171                                                   MAX_REQUESTS, NULL)) ) {
172         DWORD rc = GetLastError();
173         fprintf(stderr, "startupAsyncIO: CreateSemaphore failed 0x%x\n",
174                 (int)rc);
175         fflush(stderr);
176     }
177 
178     return ( completed_req_event  != INVALID_HANDLE_VALUE &&
179              abandon_req_wait     != INVALID_HANDLE_VALUE &&
180              completed_table_sema != NULL );
181 }
182 
183 void
shutdownAsyncIO(bool wait_threads)184 shutdownAsyncIO(bool wait_threads)
185 {
186     ShutdownIOManager(wait_threads);
187     if (completed_req_event != INVALID_HANDLE_VALUE) {
188         CloseHandle(completed_req_event);
189         completed_req_event = INVALID_HANDLE_VALUE;
190     }
191     if (abandon_req_wait != INVALID_HANDLE_VALUE) {
192         CloseHandle(abandon_req_wait);
193         abandon_req_wait = INVALID_HANDLE_VALUE;
194     }
195     if (completed_table_sema != NULL) {
196         CloseHandle(completed_table_sema);
197         completed_table_sema = NULL;
198     }
199     DeleteCriticalSection(&queue_lock);
200 }
201 
202 /*
203  * Function: awaitRequests(wait)
204  *
205  * Check for the completion of external IO work requests. Worker
206  * threads signal completion of IO requests by depositing them
207  * in a table (completedTable). awaitRequests() matches up
208  * requests in that table with threads on the blocked_queue,
209  * making the threads whose IO requests have completed runnable
210  * again.
211  *
212  * awaitRequests() is called by the scheduler periodically _or_ if
213  * it is out of work, and need to wait for the completion of IO
214  * requests to make further progress. In the latter scenario,
215  * awaitRequests() will simply block waiting for worker threads
216  * to complete if the 'completedTable' is empty.
217  */
218 int
awaitRequests(bool wait)219 awaitRequests(bool wait)
220 {
221 #if !defined(THREADED_RTS)
222   // none of this is actually used in the threaded RTS
223 
224 start:
225 #if 0
226     fprintf(stderr, "awaitRequests(): %d %d %d\n",
227             issued_reqs, completed_hw, wait);
228     fflush(stderr);
229 #endif
230     EnterCriticalSection(&queue_lock);
231     // Nothing immediately available & we won't wait
232     if ((!wait && completed_hw == 0)
233 #if 0
234         // If we just return when wait==false, we'll go into a busy
235         // wait loop, so I disabled this condition --SDM 18/12/2003
236         (issued_reqs == 0 && completed_hw == 0)
237 #endif
238         ) {
239         LeaveCriticalSection(&queue_lock);
240         return 0;
241     }
242     if (completed_hw == 0) {
243         // empty table, drop lock and wait
244         LeaveCriticalSection(&queue_lock);
245         if ( wait && sched_state == SCHED_RUNNING ) {
246             DWORD dwRes = WaitForMultipleObjects(2, wait_handles,
247                                                  FALSE, INFINITE);
248             switch (dwRes) {
249             case WAIT_OBJECT_0:
250                 // a request was completed
251                 break;
252             case WAIT_OBJECT_0 + 1:
253             case WAIT_TIMEOUT:
254                 // timeout (unlikely) or told to abandon waiting
255                 return 0;
256             case WAIT_FAILED: {
257                 DWORD dw = GetLastError();
258                 fprintf(stderr, "awaitRequests: wait failed -- "
259                                 "error code: %lu\n", dw); fflush(stderr);
260                 return 0;
261             }
262             default:
263                 fprintf(stderr, "awaitRequests: unexpected wait return "
264                                 "code %lu\n", dwRes); fflush(stderr);
265                 return 0;
266             }
267         } else {
268             return 0;
269         }
270         goto start;
271     } else {
272         int i;
273         StgTSO *tso, *prev;
274 
275         for (i=0; i < completed_hw; i++) {
276             /* For each of the completed requests, match up their Ids
277              * with those of the threads on the blocked_queue. If the
278              * thread that made the IO request has been subsequently
279              * killed (and removed from blocked_queue), no match will
280              * be found for that request Id.
281              *
282              * i.e., killing a Haskell thread doesn't attempt to cancel
283              * the IO request it is blocked on.
284              *
285              */
286             unsigned int rID = completedTable[i].reqID;
287 
288             prev = NULL;
289             for(tso = blocked_queue_hd; tso != END_TSO_QUEUE;
290                   tso = tso->_link) {
291 
292                 switch(tso->why_blocked) {
293                 case BlockedOnRead:
294                 case BlockedOnWrite:
295                 case BlockedOnDoProc:
296                     if (tso->block_info.async_result->reqID == rID) {
297                         // Found the thread blocked waiting on request;
298                         // stodgily fill
299                         // in its result block.
300                         tso->block_info.async_result->len =
301                           completedTable[i].len;
302                         tso->block_info.async_result->errCode =
303                           completedTable[i].errCode;
304 
305                         // Drop the matched TSO from blocked_queue
306                         if (prev) {
307                             setTSOLink(&MainCapability, prev, tso->_link);
308                         } else {
309                             blocked_queue_hd = tso->_link;
310                         }
311                         if (blocked_queue_tl == tso) {
312                             blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
313                         }
314 
315                         // Terminates the run queue + this inner for-loop.
316                         tso->_link = END_TSO_QUEUE;
317                         tso->why_blocked = NotBlocked;
318                         // save the StgAsyncIOResult in the
319                         // stg_block_async_info stack frame, because
320                         // the block_info field will be overwritten by
321                         // pushOnRunQueue().
322                         tso->stackobj->sp[1] = (W_)tso->block_info.async_result;
323                         pushOnRunQueue(&MainCapability, tso);
324                         break;
325                     }
326                     break;
327                 default:
328                     if (tso->why_blocked != NotBlocked) {
329                         barf("awaitRequests: odd thread state");
330                     }
331                     break;
332                 }
333 
334                 prev = tso;
335             }
336             /* Signal that there's completed table slots available */
337             if ( !ReleaseSemaphore(completed_table_sema, 1, NULL) ) {
338                 DWORD dw = GetLastError();
339                 fprintf(stderr, "awaitRequests: failed to signal semaphore "
340                                 "(error code=0x%x)\n", (int)dw);
341                 fflush(stderr);
342             }
343         }
344         completed_hw = 0;
345         ResetEvent(completed_req_event);
346         LeaveCriticalSection(&queue_lock);
347         return 1;
348     }
349 #endif /* !THREADED_RTS */
350 }
351 
352 /*
353  * Function: abandonRequestWait()
354  *
355  * Wake up a thread that's blocked waiting for new IO requests
356  * to complete (via awaitRequests().)
357  */
358 void
abandonRequestWait(void)359 abandonRequestWait( void )
360 {
361     /* the event is auto-reset, but in case there's no thread
362      * already waiting on the event, we want to return it to
363      * a non-signalled state.
364      *
365      * Careful!  There is no synchronisation between
366      * abandonRequestWait and awaitRequest, which means that
367      * abandonRequestWait might be called just before a thread
368      * goes into a wait, and we miss the abandon signal.  So we
369      * must SetEvent() here rather than PulseEvent() to ensure
370      * that the event isn't lost.  We can re-optimise by resetting
371      * the event somewhere safe if we know the event has been
372      * properly serviced (see resetAbandon() below).  --SDM 18/12/2003
373      */
374     SetEvent(abandon_req_wait);
375 }
376 
377 void
resetAbandonRequestWait(void)378 resetAbandonRequestWait( void )
379 {
380     ResetEvent(abandon_req_wait);
381 }
382 
383 #endif /* !defined(THREADED_RTS) */
384