1 /**************************************************************************/
2 /*                                                                        */
3 /*                                 OCaml                                  */
4 /*                                                                        */
5 /*   Contributed by Sylvain Le Gall for Lexifi                            */
6 /*                                                                        */
7 /*   Copyright 2008 Institut National de Recherche en Informatique et     */
8 /*     en Automatique.                                                    */
9 /*                                                                        */
10 /*   All rights reserved.  This file is distributed under the terms of    */
11 /*   the GNU Lesser General Public License version 2.1, with the          */
12 /*   special exception on linking described in the file LICENSE.          */
13 /*                                                                        */
14 /**************************************************************************/
15 
16 #include <caml/mlvalues.h>
17 #include <caml/alloc.h>
18 #include <caml/memory.h>
19 #include <caml/fail.h>
20 #include <caml/signals.h>
21 #include "winworker.h"
22 #include <stdio.h>
23 #include "windbug.h"
24 #include "winlist.h"
25 
/* This constant defines the maximum number of objects that
 * can be handled by a single SELECTDATA.
 * It takes the following parameters into account:
 * - the limit on the number of objects mostly comes from the limit
 *   of WaitForMultipleObjects
 * - there is always an "hStop" event to watch
 *
 * This leads to picking the following value as the biggest possible
 * value.
 */
#define MAXIMUM_SELECT_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
37 
/* Manage a set of handles.
 * The set does not own its storage: handle_set_init() binds it to an array
 * supplied by the caller.
 */
typedef struct _SELECTHANDLESET {
  LPHANDLE lpHdl;   /* Array of handles, provided to handle_set_init() */
  DWORD    nMax;    /* Number of slots available in lpHdl */
  DWORD    nLast;   /* Number of slots in use, i.e. index of next free slot */
} SELECTHANDLESET;

/* Pointer to a handle set */
typedef SELECTHANDLESET *LPSELECTHANDLESET;
46 
/* Bind the set `hds` to the caller-provided array `lpHdl` of `max` slots,
 * mark the set as empty and fill every slot with INVALID_HANDLE_VALUE.
 */
void handle_set_init (LPSELECTHANDLESET hds, LPHANDLE lpHdl, DWORD max)
{
  DWORD slot;

  hds->nLast = 0;
  hds->nMax  = max;
  hds->lpHdl = lpHdl;

  /* Every slot starts out holding the invalid handle value */
  slot = 0;
  while (slot < hds->nMax)
  {
    hds->lpHdl[slot] = INVALID_HANDLE_VALUE;
    slot++;
  }
}
61 
/* Append `hdl` to the set `hds`.
 * If the set is already full (nLast == nMax) the handle is silently dropped;
 * the debug trace is emitted in both cases.
 */
void handle_set_add (LPSELECTHANDLESET hds, HANDLE hdl)
{
  if (hds->nLast < hds->nMax)
  {
    hds->lpHdl[hds->nLast] = hdl;
    hds->nLast++;
  }

  DEBUG_PRINT("Adding handle %x to set %x", hdl, hds);
}
74 
handle_set_mem(LPSELECTHANDLESET hds,HANDLE hdl)75 BOOL handle_set_mem (LPSELECTHANDLESET hds, HANDLE hdl)
76 {
77   BOOL  res;
78   DWORD i;
79 
80   res = FALSE;
81   for (i = 0; !res && i < hds->nLast; i++)
82   {
83     res = (hds->lpHdl[i] == hdl);
84   }
85 
86   return res;
87 }
88 
/* Invalidate every slot of the set `hds`, then detach it from its storage:
 * afterwards the set has no array, no capacity and no element.
 */
void handle_set_reset (LPSELECTHANDLESET hds)
{
  DWORD slot;

  slot = 0;
  while (slot < hds->nMax)
  {
    hds->lpHdl[slot] = INVALID_HANDLE_VALUE;
    slot++;
  }

  hds->lpHdl = NULL;
  hds->nLast = 0;
  hds->nMax  = 0;
}
101 
102 /* Data structure for handling select */
103 
/* Kind of object behind a file descriptor, as guessed by get_handle_type().
 * SELECT_HANDLE_NONE is the value used when the kind cannot be determined.
 */
typedef enum _SELECTHANDLETYPE {
  SELECT_HANDLE_NONE = 0,
  SELECT_HANDLE_DISK,
  SELECT_HANDLE_CONSOLE,
  SELECT_HANDLE_PIPE,
  SELECT_HANDLE_SOCKET,
} SELECTHANDLETYPE;
111 
/* Condition(s) watched on a descriptor. The values are distinct bits so that
 * several conditions can be OR-ed together in a single socket query (see
 * socket_poll_add).
 */
typedef enum _SELECTMODE {
  SELECT_MODE_NONE = 0,
  SELECT_MODE_READ = 1,
  SELECT_MODE_WRITE = 2,
  SELECT_MODE_EXCEPT = 4,
} SELECTMODE;
118 
/* State of a select job.
 * - SELECT_STATE_NONE: initial state; the console and pipe workers keep
 *   looping while the job is in this state.
 * - SELECT_STATE_INITFAILED: not assigned in the part of the file reviewed
 *   here.
 * - SELECT_STATE_ERROR: set by check_error() together with nError.
 * - SELECT_STATE_SIGNALED: set by the console and pipe workers once a result
 *   has been recorded.
 */
typedef enum _SELECTSTATE {
  SELECT_STATE_NONE = 0,
  SELECT_STATE_INITFAILED,
  SELECT_STATE_ERROR,
  SELECT_STATE_SIGNALED
} SELECTSTATE;
125 
/* Kind of job held by a SELECTDATA; it determines which worker function, if
 * any, services the queries of the job.
 */
typedef enum _SELECTTYPE {
  SELECT_TYPE_NONE = 0,
  SELECT_TYPE_STATIC,       /* Result is known without running anything */
  SELECT_TYPE_CONSOLE_READ, /* Reading data on console */
  SELECT_TYPE_PIPE_READ,    /* Reading data on pipe */
  SELECT_TYPE_SOCKET        /* Classic select */
} SELECTTYPE;
133 
/* Data structure for results: one entry per (descriptor, condition) pair
 * found to be ready.
 */
typedef struct _SELECTRESULT {
  LIST       lst;        /* List header */
  SELECTMODE EMode;      /* Condition that is ready; selects the list that
                            find_handle() walks */
  int        lpOrigIdx;  /* Position of the descriptor in that list */
} SELECTRESULT;

/* Pointer to a result entry */
typedef SELECTRESULT *LPSELECTRESULT;
142 
/* Data structure for query: one descriptor to watch and the condition(s) to
 * watch it for.
 */
typedef struct _SELECTQUERY {
  LIST         lst;        /* List header */
  SELECTMODE   EMode;      /* Condition(s) to watch */
  HANDLE       hFileDescr; /* Handle (or socket, cast to HANDLE) to watch */
  int          lpOrigIdx;  /* Index copied into the results produced for this
                              query */
  unsigned int uFlagsFd; /* Copy of filedescr->flags_fd */
} SELECTQUERY;

/* Pointer to a query entry */
typedef SELECTQUERY *LPSELECTQUERY;
153 
/* One select job: a set of queries of the same type, the results they
 * produced and the state of the worker servicing them. Jobs are chained
 * through `lst` into a singly linked list.
 */
typedef struct _SELECTDATA {
  LIST             lst;            /* Link to the next job */
  SELECTTYPE       EType;          /* Kind of job */
  /* Sockets may generate a result for all three lists from one single
     query object
   */
  SELECTRESULT     aResults[MAXIMUM_SELECT_OBJECTS * 3];
  DWORD            nResultsCount;  /* Number of entries used in aResults */
  /* Data following are dedicated to APC like call, they
     will be initialized if required.
     */
  WORKERFUNC       funcWorker;     /* Polling function, NULL if none */
  SELECTQUERY      aQueries[MAXIMUM_SELECT_OBJECTS];
  DWORD            nQueriesCount;  /* Number of entries used in aQueries */
  SELECTSTATE      EState;         /* Current state of the job */
  DWORD            nError;         /* First error recorded by check_error(),
                                      0 if none */
  LPWORKER         lpWorker;       /* Worker attached to the job, NULL if
                                      none; released by select_data_free() */
} SELECTDATA;

/* Pointer to a select job */
typedef SELECTDATA *LPSELECTDATA;
174 
175 /* Get error status if associated condition is false */
check_error(LPSELECTDATA lpSelectData,BOOL bFailed)176 static BOOL check_error(LPSELECTDATA lpSelectData, BOOL bFailed)
177 {
178   if (bFailed && lpSelectData->nError == 0)
179   {
180     lpSelectData->EState = SELECT_STATE_ERROR;
181     lpSelectData->nError = GetLastError();
182   }
183   return bFailed;
184 }
185 
186 /* Create data associated with a  select operation */
select_data_new(LPSELECTDATA lpSelectData,SELECTTYPE EType)187 LPSELECTDATA select_data_new (LPSELECTDATA lpSelectData, SELECTTYPE EType)
188 {
189   /* Allocate the data structure */
190   LPSELECTDATA res;
191   DWORD        i;
192 
193   res = (LPSELECTDATA)caml_stat_alloc(sizeof(SELECTDATA));
194 
195   /* Init common data */
196   list_init((LPLIST)res);
197   list_next_set((LPLIST)res, (LPLIST)lpSelectData);
198   res->EType         = EType;
199   res->nResultsCount = 0;
200 
201 
202   /* Data following are dedicated to APC like call, they
203      will be initialized if required. For now they are set to
204      invalid values.
205      */
206   res->funcWorker    = NULL;
207   res->nQueriesCount = 0;
208   res->EState        = SELECT_STATE_NONE;
209   res->nError        = 0;
210   res->lpWorker  = NULL;
211 
212   return res;
213 }
214 
215 /* Free select data */
select_data_free(LPSELECTDATA lpSelectData)216 void select_data_free (LPSELECTDATA lpSelectData)
217 {
218   DWORD i;
219 
220   DEBUG_PRINT("Freeing data of %x", lpSelectData);
221 
222   /* Free APC related data, if they exists */
223   if (lpSelectData->lpWorker != NULL)
224   {
225     worker_job_finish(lpSelectData->lpWorker);
226     lpSelectData->lpWorker = NULL;
227   };
228 
229   /* Make sure results/queries cannot be accessed */
230   lpSelectData->nResultsCount = 0;
231   lpSelectData->nQueriesCount = 0;
232 
233   caml_stat_free(lpSelectData);
234 }
235 
236 /* Add a result to select data, return zero if something goes wrong. */
select_data_result_add(LPSELECTDATA lpSelectData,SELECTMODE EMode,int lpOrigIdx)237 DWORD select_data_result_add (LPSELECTDATA lpSelectData, SELECTMODE EMode,
238                               int lpOrigIdx)
239 {
240   DWORD res;
241   DWORD i;
242 
243   res = 0;
244   if (lpSelectData->nResultsCount < MAXIMUM_SELECT_OBJECTS * 3)
245   {
246     i = lpSelectData->nResultsCount;
247     lpSelectData->aResults[i].EMode  = EMode;
248     lpSelectData->aResults[i].lpOrigIdx = lpOrigIdx;
249     lpSelectData->nResultsCount++;
250     res = 1;
251   }
252 
253   return res;
254 }
255 
256 /* Add a query to select data, return zero if something goes wrong */
select_data_query_add(LPSELECTDATA lpSelectData,SELECTMODE EMode,HANDLE hFileDescr,int lpOrigIdx,unsigned int uFlagsFd)257 DWORD select_data_query_add (LPSELECTDATA lpSelectData,
258                              SELECTMODE EMode,
259                              HANDLE hFileDescr,
260                              int lpOrigIdx,
261                              unsigned int uFlagsFd)
262 {
263   DWORD res;
264   DWORD i;
265 
266   res = 0;
267   if (lpSelectData->nQueriesCount < MAXIMUM_SELECT_OBJECTS)
268   {
269     i = lpSelectData->nQueriesCount;
270     lpSelectData->aQueries[i].EMode      = EMode;
271     lpSelectData->aQueries[i].hFileDescr = hFileDescr;
272     lpSelectData->aQueries[i].lpOrigIdx  = lpOrigIdx;
273     lpSelectData->aQueries[i].uFlagsFd   = uFlagsFd;
274     lpSelectData->nQueriesCount++;
275     res = 1;
276   }
277 
278   return res;
279 }
280 
281 /* Search for a job that has available query slots and that match provided type.
282  * If none is found, create a new one. Return the corresponding SELECTDATA, and
283  * update provided SELECTDATA head, if required.
284  */
select_data_job_search(LPSELECTDATA * lppSelectData,SELECTTYPE EType)285 LPSELECTDATA select_data_job_search (LPSELECTDATA *lppSelectData,
286                                      SELECTTYPE EType)
287 {
288   LPSELECTDATA res;
289 
290   res = NULL;
291 
292   /* Search for job */
293   DEBUG_PRINT("Searching an available job for type %d", EType);
294   res = *lppSelectData;
295   while (
296       res != NULL
297       && !(
298         res->EType == EType
299         && res->nQueriesCount < MAXIMUM_SELECT_OBJECTS
300         )
301       )
302   {
303     res = LIST_NEXT(LPSELECTDATA, res);
304   }
305 
306   /* No matching job found, create one */
307   if (res == NULL)
308   {
309     DEBUG_PRINT("No job for type %d found, create one", EType);
310     res = select_data_new(*lppSelectData, EType);
311     *lppSelectData = res;
312   }
313 
314   return res;
315 }
316 
317 /***********************/
318 /*      Console        */
319 /***********************/
320 
/* Worker function monitoring a console for input.
 * `_data` is the SELECTDATA of the job; only its first query is used.
 * The function waits on both `hStop` and the console handle and returns when
 * - `hStop` is signaled,
 * - an error occurs (recorded through check_error), or
 * - a key-down event carrying a non-zero ASCII character is pending, in which
 *   case a result is added and the job becomes SELECT_STATE_SIGNALED.
 * Any other console event is consumed and discarded.
 */
void read_console_poll(HANDLE hStop, void *_data)
{
  HANDLE events[2];
  INPUT_RECORD record;
  DWORD waitRes;
  DWORD n;
  LPSELECTDATA  lpSelectData;
  LPSELECTQUERY lpQuery;

  DEBUG_PRINT("Waiting for data on console");

  waitRes = 0;
  n = 0;
  lpSelectData = (LPSELECTDATA)_data;
  lpQuery = &(lpSelectData->aQueries[0]);

  events[0] = hStop;
  events[1] = lpQuery->hFileDescr;
  while (lpSelectData->EState == SELECT_STATE_NONE)
  {
    waitRes = WaitForMultipleObjects(2, events, FALSE, INFINITE);
    if (waitRes == WAIT_OBJECT_0
        || check_error(lpSelectData, waitRes == WAIT_FAILED))
    {
      /* stop worker event or error */
      break;
    }
    /* console event */
    n = 0;
    if (check_error(lpSelectData, PeekConsoleInput(lpQuery->hFileDescr,
                                                   &record, 1, &n)
                    == 0))
    {
      break;
    }
    if (n == 0)
    {
      /* No record was actually peeked: `record` holds no valid data and a
         ReadConsoleInput here could block without watching hStop. Go back
         to waiting instead. */
      continue;
    }
    /* check for ASCII keypress only */
    if (record.EventType == KEY_EVENT &&
      record.Event.KeyEvent.bKeyDown &&
      record.Event.KeyEvent.uChar.AsciiChar != 0)
    {
      select_data_result_add(lpSelectData, lpQuery->EMode, lpQuery->lpOrigIdx);
      lpSelectData->EState = SELECT_STATE_SIGNALED;
      break;
    }
    else
    {
      /* discard everything else and try again */
      if (check_error(lpSelectData, ReadConsoleInput(lpQuery->hFileDescr,
                                                     &record, 1, &n)
                      == 0))
      {
        break;
      }
    }
  };
}
377 
378 /* Add a function to monitor console input */
read_console_poll_add(LPSELECTDATA lpSelectData,SELECTMODE EMode,HANDLE hFileDescr,int lpOrigIdx,unsigned int uFlagsFd)379 LPSELECTDATA read_console_poll_add (LPSELECTDATA lpSelectData,
380                                     SELECTMODE EMode,
381                                     HANDLE hFileDescr,
382                                     int lpOrigIdx,
383                                     unsigned int uFlagsFd)
384 {
385   LPSELECTDATA res;
386 
387   res = select_data_new(lpSelectData, SELECT_TYPE_CONSOLE_READ);
388   res->funcWorker = read_console_poll;
389   select_data_query_add(res, SELECT_MODE_READ, hFileDescr, lpOrigIdx, uFlagsFd);
390 
391   return res;
392 }
393 
394 /***********************/
395 /*        Pipe         */
396 /***********************/
397 
398 /* Monitor a pipe for input */
read_pipe_poll(HANDLE hStop,void * _data)399 void read_pipe_poll (HANDLE hStop, void *_data)
400 {
401   DWORD         res;
402   DWORD         event;
403   DWORD         n;
404   LPSELECTQUERY iterQuery;
405   LPSELECTDATA  lpSelectData;
406   DWORD         i;
407   DWORD         wait;
408 
409   /* Poll pipe */
410   event = 0;
411   n = 0;
412   lpSelectData = (LPSELECTDATA)_data;
413   wait = 1;
414 
415   DEBUG_PRINT("Checking data pipe");
416   while (lpSelectData->EState == SELECT_STATE_NONE)
417   {
418     for (i = 0; i < lpSelectData->nQueriesCount; i++)
419     {
420       iterQuery = &(lpSelectData->aQueries[i]);
421       res = PeekNamedPipe(
422           iterQuery->hFileDescr,
423           NULL,
424           0,
425           NULL,
426           &n,
427           NULL);
428       if (check_error(lpSelectData,
429             (res == 0) &&
430             (GetLastError() != ERROR_BROKEN_PIPE)))
431       {
432         break;
433       };
434 
435       if ((n > 0) || (res == 0))
436       {
437         lpSelectData->EState = SELECT_STATE_SIGNALED;
438         select_data_result_add(lpSelectData, iterQuery->EMode,
439                                iterQuery->lpOrigIdx);
440       };
441     };
442 
443     /* Alas, nothing except polling seems to work for pipes.
444        Check the state & stop_worker_event every 10 ms
445      */
446     if (lpSelectData->EState == SELECT_STATE_NONE)
447     {
448       event = WaitForSingleObject(hStop, wait);
449 
450       /* Fast start: begin to wait 1, 2, 4, 8 and then 10 ms.
451        * If we are working with the output of a program there is
452        * a chance that one of the 4 first calls succeed.
453        */
454       wait = 2 * wait;
455       if (wait > 10)
456       {
457         wait = 10;
458       };
459       if (event == WAIT_OBJECT_0
460           || check_error(lpSelectData, event == WAIT_FAILED))
461       {
462         break;
463       }
464     }
465   }
466   DEBUG_PRINT("Finish checking data on pipe");
467 }
468 
469 /* Add a function to monitor pipe input */
read_pipe_poll_add(LPSELECTDATA lpSelectData,SELECTMODE EMode,HANDLE hFileDescr,int lpOrigIdx,unsigned int uFlagsFd)470 LPSELECTDATA read_pipe_poll_add (LPSELECTDATA lpSelectData,
471                                  SELECTMODE EMode,
472                                  HANDLE hFileDescr,
473                                  int lpOrigIdx,
474                                  unsigned int uFlagsFd)
475 {
476   LPSELECTDATA res;
477   LPSELECTDATA hd;
478 
479   hd = lpSelectData;
480   /* Polling pipe is a non blocking operation by default. This means that each
481      worker can handle many pipe. We begin to try to find a worker that is
482      polling pipe, but for which there is under the limit of pipe per worker.
483      */
484   DEBUG_PRINT("Searching an available worker handling pipe");
485   res = select_data_job_search(&hd, SELECT_TYPE_PIPE_READ);
486 
487   /* Add a new pipe to poll */
488   res->funcWorker = read_pipe_poll;
489   select_data_query_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
490 
491   return hd;
492 }
493 
494 /***********************/
495 /*       Socket        */
496 /***********************/
497 
498 /* Monitor socket */
socket_poll(HANDLE hStop,void * _data)499 void socket_poll (HANDLE hStop, void *_data)
500 {
501   LPSELECTDATA   lpSelectData;
502   LPSELECTQUERY    iterQuery;
503   HANDLE           aEvents[MAXIMUM_SELECT_OBJECTS];
504   DWORD            nEvents;
505   long             maskEvents;
506   DWORD            i;
507   u_long           iMode;
508   SELECTMODE       mode;
509   WSANETWORKEVENTS events;
510 
511   lpSelectData = (LPSELECTDATA)_data;
512 
513   DEBUG_PRINT("Worker has %d queries to service", lpSelectData->nQueriesCount);
514   for (nEvents = 0; nEvents < lpSelectData->nQueriesCount; nEvents++)
515   {
516     iterQuery = &(lpSelectData->aQueries[nEvents]);
517     aEvents[nEvents] = CreateEvent(NULL, TRUE, FALSE, NULL);
518     maskEvents = 0;
519     mode = iterQuery->EMode;
520     if ((mode & SELECT_MODE_READ) != 0)
521     {
522       DEBUG_PRINT("Polling read for %d", iterQuery->hFileDescr);
523       maskEvents |= FD_READ | FD_ACCEPT | FD_CLOSE;
524     }
525     if ((mode & SELECT_MODE_WRITE) != 0)
526     {
527       DEBUG_PRINT("Polling write for %d", iterQuery->hFileDescr);
528       maskEvents |= FD_WRITE | FD_CONNECT | FD_CLOSE;
529     }
530     if ((mode & SELECT_MODE_EXCEPT) != 0)
531     {
532       DEBUG_PRINT("Polling exceptions for %d", iterQuery->hFileDescr);
533       maskEvents |= FD_OOB;
534     }
535 
536     check_error(lpSelectData,
537         WSAEventSelect(
538           (SOCKET)(iterQuery->hFileDescr),
539           aEvents[nEvents],
540           maskEvents) == SOCKET_ERROR);
541   }
542 
543   /* Add stop event */
544   aEvents[nEvents]  = hStop;
545   nEvents++;
546 
547   if (lpSelectData->nError == 0)
548   {
549     check_error(lpSelectData,
550         WaitForMultipleObjects(
551           nEvents,
552           aEvents,
553           FALSE,
554           INFINITE) == WAIT_FAILED);
555   };
556 
557   if (lpSelectData->nError == 0)
558   {
559     for (i = 0; i < lpSelectData->nQueriesCount; i++)
560     {
561       iterQuery = &(lpSelectData->aQueries[i]);
562       if (WaitForSingleObject(aEvents[i], 0) == WAIT_OBJECT_0)
563       {
564         DEBUG_PRINT("Socket %d has pending events", (i - 1));
565         if (iterQuery != NULL)
566         {
567           /* Find out what kind of events were raised
568            */
569           if (WSAEnumNetworkEvents((SOCKET)(iterQuery->hFileDescr),
570                                    aEvents[i], &events) == 0)
571           {
572             if ((iterQuery->EMode & SELECT_MODE_READ) != 0
573                 && (events.lNetworkEvents & (FD_READ | FD_ACCEPT | FD_CLOSE))
574                    != 0)
575             {
576               select_data_result_add(lpSelectData, SELECT_MODE_READ,
577                                      iterQuery->lpOrigIdx);
578             }
579             if ((iterQuery->EMode & SELECT_MODE_WRITE) != 0
580                 && (events.lNetworkEvents & (FD_WRITE | FD_CONNECT | FD_CLOSE))
581                    != 0)
582             {
583               select_data_result_add(lpSelectData, SELECT_MODE_WRITE,
584                                      iterQuery->lpOrigIdx);
585             }
586             if ((iterQuery->EMode & SELECT_MODE_EXCEPT) != 0
587                 && (events.lNetworkEvents & FD_OOB) != 0)
588             {
589               select_data_result_add(lpSelectData, SELECT_MODE_EXCEPT,
590                                      iterQuery->lpOrigIdx);
591             }
592           }
593         }
594       }
595       /* WSAEventSelect() automatically sets socket to nonblocking mode.
596          Restore the blocking one. */
597       if (iterQuery->uFlagsFd & FLAGS_FD_IS_BLOCKING)
598       {
599         DEBUG_PRINT("Restore a blocking socket");
600         iMode = 0;
601         check_error(lpSelectData,
602           WSAEventSelect((SOCKET)(iterQuery->hFileDescr), aEvents[i], 0) != 0 ||
603           ioctlsocket((SOCKET)(iterQuery->hFileDescr), FIONBIO, &iMode) != 0);
604       }
605       else
606       {
607         check_error(lpSelectData,
608           WSAEventSelect((SOCKET)(iterQuery->hFileDescr), aEvents[i], 0) != 0);
609       };
610 
611       CloseHandle(aEvents[i]);
612       aEvents[i] = INVALID_HANDLE_VALUE;
613     }
614   }
615 }
616 
617 /* Add a function to monitor socket */
socket_poll_add(LPSELECTDATA lpSelectData,SELECTMODE EMode,HANDLE hFileDescr,int lpOrigIdx,unsigned int uFlagsFd)618 LPSELECTDATA socket_poll_add (LPSELECTDATA lpSelectData,
619                               SELECTMODE EMode,
620                               HANDLE hFileDescr,
621                               int lpOrigIdx,
622                               unsigned int uFlagsFd)
623 {
624   LPSELECTDATA res;
625   LPSELECTDATA candidate;
626   long i;
627   LPSELECTQUERY aQueries;
628 
629   res = lpSelectData;
630   candidate = NULL;
631   aQueries = NULL;
632 
633   /* Polling socket can be done mulitple handle at the same time. You just
634      need one worker to use it. Try to find if there is already a worker
635      handling this kind of request.
636      Only one event can be associated with a given socket which means
637      that if a socket is in more than one of the fd_sets then we have
638      to find that particular query and update EMode with the
639      additional flag.
640      */
641   DEBUG_PRINT("Scanning list of worker to find one that already handle socket");
642   /* Search for job */
643   DEBUG_PRINT("Searching for an available job for type %d for descriptor %d",
644               SELECT_TYPE_SOCKET, hFileDescr);
645   while (res != NULL)
646   {
647     if (res->EType == SELECT_TYPE_SOCKET)
648     {
649       i = res->nQueriesCount - 1;
650       aQueries = res->aQueries;
651       while (i >= 0 && aQueries[i].hFileDescr != hFileDescr)
652       {
653         i--;
654       }
655       /* If we didn't find the socket but this worker has available
656          slots, store it
657        */
658       if (i < 0)
659       {
660         if ( res->nQueriesCount < MAXIMUM_SELECT_OBJECTS)
661         {
662           candidate = res;
663         }
664         res = LIST_NEXT(LPSELECTDATA, res);
665       }
666       else
667       {
668         /* Previous socket query located -- we're finished
669          */
670         aQueries = &aQueries[i];
671         break;
672       }
673     }
674     else
675     {
676       res = LIST_NEXT(LPSELECTDATA, res);
677     }
678   }
679 
680   if (res == NULL)
681   {
682     res = candidate;
683 
684     /* No matching job found, create one */
685     if (res == NULL)
686     {
687       DEBUG_PRINT("No job for type %d found, create one", SELECT_TYPE_SOCKET);
688       res = select_data_new(lpSelectData, SELECT_TYPE_SOCKET);
689       res->funcWorker = socket_poll;
690       res->nQueriesCount = 1;
691       aQueries = &res->aQueries[0];
692     }
693     else
694     {
695       aQueries = &(res->aQueries[res->nQueriesCount++]);
696     }
697     aQueries->EMode = EMode;
698     aQueries->hFileDescr = hFileDescr;
699     aQueries->lpOrigIdx = lpOrigIdx;
700     aQueries->uFlagsFd = uFlagsFd;
701     DEBUG_PRINT("Socket %x added", hFileDescr);
702   }
703   else
704   {
705     aQueries->EMode |= EMode;
706     DEBUG_PRINT("Socket %x updated to %d", hFileDescr, aQueries->EMode);
707   }
708 
709   return res;
710 }
711 
712 /***********************/
713 /*       Static        */
714 /***********************/
715 
716 /* Add a static result */
static_poll_add(LPSELECTDATA lpSelectData,SELECTMODE EMode,HANDLE hFileDescr,int lpOrigIdx,unsigned int uFlagsFd)717 LPSELECTDATA static_poll_add (LPSELECTDATA lpSelectData,
718                               SELECTMODE EMode,
719                               HANDLE hFileDescr,
720                               int lpOrigIdx,
721                               unsigned int uFlagsFd)
722 {
723   LPSELECTDATA res;
724   LPSELECTDATA hd;
725 
726   /* Look for an already initialized static element */
727   hd = lpSelectData;
728   res = select_data_job_search(&hd, SELECT_TYPE_STATIC);
729 
730   /* Add a new query/result */
731   select_data_query_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
732   select_data_result_add(res, EMode, lpOrigIdx);
733 
734   return hd;
735 }
736 
737 /********************************/
738 /* Generic select data handling */
739 /********************************/
740 
741 /* Guess handle type */
get_handle_type(value fd)742 static SELECTHANDLETYPE get_handle_type(value fd)
743 {
744   DWORD            mode;
745   SELECTHANDLETYPE res;
746 
747   CAMLparam1(fd);
748 
749   mode = 0;
750   res = SELECT_HANDLE_NONE;
751 
752   if (Descr_kind_val(fd) == KIND_SOCKET)
753   {
754     res = SELECT_HANDLE_SOCKET;
755   }
756   else
757   {
758     switch(GetFileType(Handle_val(fd)))
759     {
760       case FILE_TYPE_DISK:
761         res = SELECT_HANDLE_DISK;
762         break;
763 
764       case FILE_TYPE_CHAR: /* character file or a console */
765         if (GetConsoleMode(Handle_val(fd), &mode) != 0)
766         {
767           res = SELECT_HANDLE_CONSOLE;
768         }
769         else
770         {
771           res = SELECT_HANDLE_NONE;
772         };
773         break;
774 
775       case FILE_TYPE_PIPE: /* a named or an anonymous pipe (socket
776                               already handled) */
777         res = SELECT_HANDLE_PIPE;
778         break;
779     };
780   };
781 
782   CAMLreturnT(SELECTHANDLETYPE, res);
783 }
784 
785 /* Choose what to do with given data */
select_data_dispatch(LPSELECTDATA lpSelectData,SELECTMODE EMode,value fd,int lpOrigIdx)786 LPSELECTDATA select_data_dispatch (LPSELECTDATA lpSelectData, SELECTMODE EMode,
787                                    value fd, int lpOrigIdx)
788 {
789   LPSELECTDATA    res;
790   HANDLE          hFileDescr;
791   struct sockaddr sa;
792   int             sa_len;
793   BOOL            alreadyAdded;
794   unsigned int    uFlagsFd;
795 
796   CAMLparam1(fd);
797 
798   res          = lpSelectData;
799   hFileDescr   = Handle_val(fd);
800   sa_len       = sizeof(sa);
801   alreadyAdded = FALSE;
802   uFlagsFd     = Flags_fd_val(fd);
803 
804   DEBUG_PRINT("Begin dispatching handle %x", hFileDescr);
805 
806   DEBUG_PRINT("Waiting for %d on handle %x", EMode, hFileDescr);
807 
808   /* There is only 2 way to have except mode: transmission of OOB data through
809      a socket TCP/IP and through a strange interaction with a TTY.
810      With windows, we only consider the TCP/IP except condition
811   */
812   switch(get_handle_type(fd))
813   {
814     case SELECT_HANDLE_DISK:
815       DEBUG_PRINT("Handle %x is a disk handle", hFileDescr);
816       /* Disk is always ready in read/write operation */
817       if (EMode == SELECT_MODE_READ || EMode == SELECT_MODE_WRITE)
818       {
819         res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
820       };
821       break;
822 
823     case SELECT_HANDLE_CONSOLE:
824       DEBUG_PRINT("Handle %x is a console handle", hFileDescr);
825       /* Console is always ready in write operation, need to check for read. */
826       if (EMode == SELECT_MODE_READ)
827       {
828         res = read_console_poll_add(res, EMode, hFileDescr, lpOrigIdx,
829                                     uFlagsFd);
830       }
831       else if (EMode == SELECT_MODE_WRITE)
832       {
833         res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
834       };
835       break;
836 
837     case SELECT_HANDLE_PIPE:
838       DEBUG_PRINT("Handle %x is a pipe handle", hFileDescr);
839       /* Console is always ready in write operation, need to check for read. */
840       if (EMode == SELECT_MODE_READ)
841       {
842         DEBUG_PRINT("Need to check availability of data on pipe");
843         res = read_pipe_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
844       }
845       else if (EMode == SELECT_MODE_WRITE)
846       {
847         DEBUG_PRINT("No need to check availability of data on pipe, "
848                     "write operation always possible");
849         res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
850       };
851       break;
852 
853     case SELECT_HANDLE_SOCKET:
854       DEBUG_PRINT("Handle %x is a socket handle", hFileDescr);
855       if (getsockname((SOCKET)hFileDescr, &sa, &sa_len) == SOCKET_ERROR)
856       {
857         if (WSAGetLastError() == WSAEINVAL)
858         {
859           /* Socket is not bound */
860           DEBUG_PRINT("Socket is not connected");
861           if (EMode == SELECT_MODE_WRITE || EMode == SELECT_MODE_READ)
862           {
863             res = static_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
864             alreadyAdded = TRUE;
865           }
866         }
867       }
868       if (!alreadyAdded)
869       {
870         res = socket_poll_add(res, EMode, hFileDescr, lpOrigIdx, uFlagsFd);
871       }
872       break;
873 
874     default:
875       DEBUG_PRINT("Handle %x is unknown", hFileDescr);
876       win32_maperr(ERROR_INVALID_HANDLE);
877       uerror("select", Nothing);
878       break;
879   };
880 
881   DEBUG_PRINT("Finish dispatching handle %x", hFileDescr);
882 
883   CAMLreturnT(LPSELECTDATA, res);
884 }
885 
caml_list_length(value lst)886 static DWORD caml_list_length (value lst)
887 {
888   DWORD res;
889 
890   CAMLparam1 (lst);
891   CAMLlocal1 (l);
892 
893   for (res = 0, l = lst; l != Val_int(0); l = Field(l, 1), res++)
894   { }
895 
896   CAMLreturnT(DWORD, res);
897 }
898 
/* Map a result back to the OCaml descriptor it refers to.
 * The mode of the result selects one of the three lists; the element at
 * position `lpOrigIdx` of that list is returned. Fails with
 * "select.c: original file handle not found" when the list is too short,
 * which includes the case where the mode selects no list at all.
 */
static value find_handle(LPSELECTRESULT iterResult, value readfds,
                         value writefds, value exceptfds)
{
  CAMLparam3(readfds, writefds, exceptfds);
  CAMLlocal2(found, cell);
  int remaining;

  /* Pick the list matching the mode of the result */
  if (iterResult->EMode == SELECT_MODE_READ)
  {
    cell = readfds;
  }
  else if (iterResult->EMode == SELECT_MODE_WRITE)
  {
    cell = writefds;
  }
  else if (iterResult->EMode == SELECT_MODE_EXCEPT)
  {
    cell = exceptfds;
  }
  else if (iterResult->EMode == SELECT_MODE_NONE)
  {
    CAMLassert(0);
  }

  /* Skip lpOrigIdx cells, stopping early at the end of the list */
  remaining = iterResult->lpOrigIdx;
  while (cell != Val_unit && remaining > 0)
  {
    cell = Field(cell, 1);
    remaining--;
  }

  if (cell == Val_unit)
    caml_failwith ("select.c: original file handle not found");

  found = Field(cell, 0);

  CAMLreturn( found );
}
933 
/* Larger of `a` and `b`. This is a function-like macro: the selected
   argument is evaluated twice, so avoid arguments with side effects. */
#define MAX(a, b) ((a) > (b) ? (a) : (b))
935 
936 /* Convert fdlist to an fd_set if all the handles in fdlist are
937  * sockets and return 1.  Returns 0 if a non-socket value is
938  * encountered, or if there are more than FD_SETSIZE sockets.
939  */
fdlist_to_fdset(value fdlist,fd_set * fdset)940 static int fdlist_to_fdset(value fdlist, fd_set *fdset)
941 {
942   value l, c;
943   int n = 0;
944   FD_ZERO(fdset);
945   for (l = fdlist; l != Val_int(0); l = Field(l, 1)) {
946     if (++n > FD_SETSIZE) {
947       DEBUG_PRINT("More than FD_SETSIZE sockets");
948       return 0;
949     }
950     c = Field(l, 0);
951     if (Descr_kind_val(c) == KIND_SOCKET) {
952       FD_SET(Socket_val(c), fdset);
953     } else {
954       DEBUG_PRINT("Non socket value encountered");
955       return 0;
956     }
957   }
958   return 1;
959 }
960 
/* Build the OCaml list of the elements of `fdlist` whose socket is set in
 * `fdset`. Matching elements are prepended one by one, so the result is in
 * the reverse order of `fdlist`.
 */
static value fdset_to_fdlist(value fdlist, fd_set *fdset)
{
  value res = Val_int(0);
  Begin_roots2(fdlist, res)
    for (/*nothing*/; fdlist != Val_int(0); fdlist = Field(fdlist, 1)) {
      if (FD_ISSET(Socket_val(Field(fdlist, 0)), fdset)) {
        value newres = caml_alloc_small(2, 0);
        /* The descriptor is read from the rooted list after the allocation:
           a copy taken before caml_alloc_small would not be registered with
           the GC and could be stale if the allocation triggered a
           collection. */
        Field(newres, 0) = Field(fdlist, 0);
        Field(newres, 1) = res;
        res = newres;
      }
    }
  End_roots();
  return res;
}
977 
unix_select(value readfds,value writefds,value exceptfds,value timeout)978 CAMLprim value unix_select(value readfds, value writefds, value exceptfds,
979                            value timeout)
980 {
981   /* Event associated to handle */
982   DWORD   nEventsCount;
983   DWORD   nEventsMax;
984   HANDLE *lpEventsDone;
985 
986   /* Data for all handles */
987   LPSELECTDATA lpSelectData;
988   LPSELECTDATA iterSelectData;
989 
990   /* Iterator for results */
991   LPSELECTRESULT iterResult;
992 
993   /* Iterator */
994   DWORD i;
995 
996   /* Error status */
997   DWORD err;
998 
999   /* Time to wait */
1000   DWORD milliseconds;
1001 
1002   /* Is there static select data */
1003   BOOL  hasStaticData = FALSE;
1004 
1005   /* Wait return */
1006   DWORD waitRet;
1007 
1008   /* Set of handle */
1009   SELECTHANDLESET hds;
1010   DWORD           hdsMax;
1011   LPHANDLE        hdsData;
1012 
1013   /* Length of each list */
1014   DWORD readfds_len;
1015   DWORD writefds_len;
1016   DWORD exceptfds_len;
1017 
1018   CAMLparam4 (readfds, writefds, exceptfds, timeout);
1019   CAMLlocal5 (read_list, write_list, except_list, res, l);
1020   CAMLlocal1 (fd);
1021 
1022   fd_set read, write, except;
1023   double tm;
1024   struct timeval tv;
1025   struct timeval * tvp;
1026 
1027   DEBUG_PRINT("in select");
1028 
1029   err = 0;
1030   tm = Double_val(timeout);
1031   if (readfds == Val_int(0)
1032       && writefds == Val_int(0)
1033       && exceptfds == Val_int(0)) {
1034     DEBUG_PRINT("nothing to do");
1035     if ( tm > 0.0 ) {
1036       caml_enter_blocking_section();
1037       Sleep( (int)(tm * 1000));
1038       caml_leave_blocking_section();
1039     }
1040     read_list = write_list = except_list = Val_int(0);
1041   } else {
1042     if (fdlist_to_fdset(readfds, &read)
1043         && fdlist_to_fdset(writefds, &write)
1044         && fdlist_to_fdset(exceptfds, &except)) {
1045       DEBUG_PRINT("only sockets to select on, using classic select");
1046       if (tm < 0.0) {
1047         tvp = (struct timeval *) NULL;
1048       } else {
1049         tv.tv_sec = (int) tm;
1050         tv.tv_usec = (int) (1e6 * (tm - (int) tm));
1051         tvp = &tv;
1052       }
1053       caml_enter_blocking_section();
1054       if (select(FD_SETSIZE, &read, &write, &except, tvp) == -1) {
1055         err = WSAGetLastError();
1056         DEBUG_PRINT("Error %ld occurred", err);
1057       }
1058       caml_leave_blocking_section();
1059       if (err) {
1060         DEBUG_PRINT("Error %ld occurred", err);
1061         win32_maperr(err);
1062         uerror("select", Nothing);
1063       }
1064       read_list = fdset_to_fdlist(readfds, &read);
1065       write_list = fdset_to_fdlist(writefds, &write);
1066       except_list = fdset_to_fdlist(exceptfds, &except);
1067     } else {
1068       nEventsCount   = 0;
1069       nEventsMax     = 0;
1070       lpEventsDone   = NULL;
1071       lpSelectData   = NULL;
1072       iterSelectData = NULL;
1073       iterResult     = NULL;
1074       hasStaticData  = 0;
1075       waitRet        = 0;
1076       readfds_len    = caml_list_length(readfds);
1077       writefds_len   = caml_list_length(writefds);
1078       exceptfds_len  = caml_list_length(exceptfds);
1079       hdsMax         = MAX(readfds_len, MAX(writefds_len, exceptfds_len));
1080 
1081       hdsData = (HANDLE *)caml_stat_alloc(sizeof(HANDLE) * hdsMax);
1082 
1083       if (tm >= 0.0)
1084         {
1085           milliseconds = 1000 * tm;
1086           DEBUG_PRINT("Will wait %d ms", milliseconds);
1087         }
1088       else
1089         {
1090           milliseconds = INFINITE;
1091         }
1092 
1093 
1094       /* Create list of select data, based on the different list of fd
1095          to watch */
1096       DEBUG_PRINT("Dispatch read fd");
1097       handle_set_init(&hds, hdsData, hdsMax);
1098       i=0;
1099       for (l = readfds; l != Val_int(0); l = Field(l, 1))
1100         {
1101           fd = Field(l, 0);
1102           if (!handle_set_mem(&hds, Handle_val(fd)))
1103             {
1104               handle_set_add(&hds, Handle_val(fd));
1105               lpSelectData = select_data_dispatch(lpSelectData,
1106                                                   SELECT_MODE_READ, fd, i++);
1107             }
1108           else
1109             {
1110               DEBUG_PRINT("Discarding handle %x which is already monitor "
1111                           "for read", Handle_val(fd));
1112             }
1113         }
1114       handle_set_reset(&hds);
1115 
1116       DEBUG_PRINT("Dispatch write fd");
1117       handle_set_init(&hds, hdsData, hdsMax);
1118       i=0;
1119       for (l = writefds; l != Val_int(0); l = Field(l, 1))
1120         {
1121           fd = Field(l, 0);
1122           if (!handle_set_mem(&hds, Handle_val(fd)))
1123             {
1124               handle_set_add(&hds, Handle_val(fd));
1125               lpSelectData = select_data_dispatch(lpSelectData,
1126                                                   SELECT_MODE_WRITE, fd, i++);
1127             }
1128           else
1129             {
1130               DEBUG_PRINT("Discarding handle %x which is already monitor "
1131                           "for write", Handle_val(fd));
1132             }
1133         }
1134       handle_set_reset(&hds);
1135 
1136       DEBUG_PRINT("Dispatch exceptional fd");
1137       handle_set_init(&hds, hdsData, hdsMax);
1138       i=0;
1139       for (l = exceptfds; l != Val_int(0); l = Field(l, 1))
1140         {
1141           fd = Field(l, 0);
1142           if (!handle_set_mem(&hds, Handle_val(fd)))
1143             {
1144               handle_set_add(&hds, Handle_val(fd));
1145               lpSelectData = select_data_dispatch(lpSelectData,
1146                                                   SELECT_MODE_EXCEPT, fd, i++);
1147             }
1148           else
1149             {
1150               DEBUG_PRINT("Discarding handle %x which is already monitor "
1151                           "for exceptional", Handle_val(fd));
1152             }
1153         }
1154       handle_set_reset(&hds);
1155 
1156       /* Building the list of handle to wait for */
1157       DEBUG_PRINT("Building events done array");
1158       nEventsMax   = list_length((LPLIST)lpSelectData);
1159       nEventsCount = 0;
1160       lpEventsDone = (HANDLE *)caml_stat_alloc(sizeof(HANDLE) * nEventsMax);
1161 
1162       iterSelectData = lpSelectData;
1163       while (iterSelectData != NULL)
1164         {
1165           /* Check if it is static data. If this is the case, launch everything
1166            * but don't wait for events. It helps to test if there are events on
1167            * any other fd (which are not static), knowing that there is at least
1168            * one result (the static data).
1169            */
1170           if (iterSelectData->EType == SELECT_TYPE_STATIC)
1171             {
1172               hasStaticData = TRUE;
1173             };
1174 
1175           /* Execute APC */
1176           if (iterSelectData->funcWorker != NULL)
1177             {
1178               iterSelectData->lpWorker =
1179                 worker_job_submit(
1180                                   iterSelectData->funcWorker,
1181                                   (void *)iterSelectData);
1182               DEBUG_PRINT("Job submitted to worker %x",
1183                           iterSelectData->lpWorker);
1184               lpEventsDone[nEventsCount]
1185                 = worker_job_event_done(iterSelectData->lpWorker);
1186               nEventsCount++;
1187             };
1188           iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1189         };
1190 
1191       DEBUG_PRINT("Need to watch %d workers", nEventsCount);
1192 
1193       /* Processing select itself */
1194       caml_enter_blocking_section();
1195       /* There are worker started, waiting to be monitored */
1196       if (nEventsCount > 0)
1197         {
1198           /* Waiting for event */
1199           if (err == 0 && !hasStaticData)
1200             {
1201               DEBUG_PRINT("Waiting for one select worker to be done");
1202               switch (WaitForMultipleObjects(nEventsCount, lpEventsDone, FALSE,
1203                                              milliseconds))
1204                 {
1205                 case WAIT_FAILED:
1206                   err = GetLastError();
1207                   break;
1208 
1209                 case WAIT_TIMEOUT:
1210                   DEBUG_PRINT("Select timeout");
1211                   break;
1212 
1213                 default:
1214                   DEBUG_PRINT("One worker is done");
1215                   break;
1216                 };
1217             }
1218 
1219           /* Ordering stop to every worker */
1220           DEBUG_PRINT("Sending stop signal to every select workers");
1221           iterSelectData = lpSelectData;
1222           while (iterSelectData != NULL)
1223             {
1224               if (iterSelectData->lpWorker != NULL)
1225                 {
1226                   worker_job_stop(iterSelectData->lpWorker);
1227                 };
1228               iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1229             };
1230 
1231           DEBUG_PRINT("Waiting for every select worker to be done");
1232           switch (WaitForMultipleObjects(nEventsCount, lpEventsDone, TRUE,
1233                                          INFINITE))
1234             {
1235             case WAIT_FAILED:
1236               err = GetLastError();
1237               break;
1238 
1239             default:
1240               DEBUG_PRINT("Every worker is done");
1241               break;
1242             }
1243         }
1244       /* Nothing to monitor but some time to wait. */
1245       else if (!hasStaticData)
1246         {
1247           Sleep(milliseconds);
1248         }
1249       caml_leave_blocking_section();
1250 
1251       DEBUG_PRINT("Error status: %d (0 is ok)", err);
1252       /* Build results */
1253       if (err == 0)
1254         {
1255           DEBUG_PRINT("Building result");
1256           read_list = Val_unit;
1257           write_list = Val_unit;
1258           except_list = Val_unit;
1259 
1260           iterSelectData = lpSelectData;
1261           while (iterSelectData != NULL)
1262             {
1263               for (i = 0; i < iterSelectData->nResultsCount; i++)
1264                 {
1265                   iterResult = &(iterSelectData->aResults[i]);
1266                   l = caml_alloc_small(2, 0);
1267                   Store_field(l, 0, find_handle(iterResult, readfds, writefds,
1268                                                 exceptfds));
1269                   switch (iterResult->EMode)
1270                     {
1271                     case SELECT_MODE_READ:
1272                       Store_field(l, 1, read_list);
1273                       read_list = l;
1274                       break;
1275                     case SELECT_MODE_WRITE:
1276                       Store_field(l, 1, write_list);
1277                       write_list = l;
1278                       break;
1279                     case SELECT_MODE_EXCEPT:
1280                       Store_field(l, 1, except_list);
1281                       except_list = l;
1282                       break;
1283                     case SELECT_MODE_NONE:
1284                       CAMLassert(0);
1285                     }
1286                 }
1287               /* We try to only process the first error, bypass other errors */
1288               if (err == 0 && iterSelectData->EState == SELECT_STATE_ERROR)
1289                 {
1290                   err = iterSelectData->nError;
1291                 }
1292               iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1293             }
1294         }
1295 
1296       /* Free resources */
1297       DEBUG_PRINT("Free selectdata resources");
1298       iterSelectData = lpSelectData;
1299       while (iterSelectData != NULL)
1300         {
1301           lpSelectData = iterSelectData;
1302           iterSelectData = LIST_NEXT(LPSELECTDATA, iterSelectData);
1303           select_data_free(lpSelectData);
1304         }
1305       lpSelectData = NULL;
1306 
1307       /* Free allocated events/handle set array */
1308       DEBUG_PRINT("Free local allocated resources");
1309       caml_stat_free(lpEventsDone);
1310       caml_stat_free(hdsData);
1311 
1312       DEBUG_PRINT("Raise error if required");
1313       if (err != 0)
1314         {
1315           win32_maperr(err);
1316           uerror("select", Nothing);
1317         }
1318     }
1319   }
1320 
1321   DEBUG_PRINT("Build final result");
1322   res = caml_alloc_small(3, 0);
1323   Store_field(res, 0, read_list);
1324   Store_field(res, 1, write_list);
1325   Store_field(res, 2, except_list);
1326 
1327   DEBUG_PRINT("out select");
1328 
1329   CAMLreturn(res);
1330 }
1331