1 #include "rktio.h"
2 #include "rktio_private.h"
3 #ifdef RKTIO_SYSTEM_UNIX
4 # include <sys/select.h>
5 # include <unistd.h>
6 # include <fcntl.h>
7 # include <errno.h>
8 # include <math.h>
9 # ifdef USE_ULIMIT
10 #  include <ulimit.h>
11 # endif
12 #endif
13 #ifdef HAVE_POLL_SYSCALL
14 # include <poll.h>
15 #endif
16 #include <string.h>
17 #include <stdlib.h>
18 
19 /*========================================================================*/
20 /* Poll variant                                                           */
21 /*========================================================================*/
22 
23 #ifdef HAVE_POLL_SYSCALL
24 
25 # define PFD_EXTRA_SPACE 1
26 
27 struct rktio_poll_set_t {
28   struct rktio_fd_set_data_t *data;
29   rktio_poll_set_t *w;
30   rktio_poll_set_t *e;
31   int flags;
32 };
33 
34 struct rktio_fd_set_data_t {
35   struct pollfd *pfd;
36   intptr_t size, count;
37   int skip_sleep;
38 };
39 
alloc_fdset_arrays(rktio_t * rktio)40 static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
41 {
42   struct rktio_fd_set_data_t *data;
43   rktio_poll_set_t *r, *w, *e;
44   struct pollfd *pfd;
45 
46   data = malloc(sizeof(struct rktio_fd_set_data_t));
47   r = malloc(sizeof(struct rktio_poll_set_t));
48   w = malloc(sizeof(struct rktio_poll_set_t));
49   e = malloc(sizeof(struct rktio_poll_set_t));
50 
51   r->w = w;
52   r->e = e;
53   r->data = data;
54   w->data = data;
55   e->data = data;
56 
57   r->flags = POLLIN;
58   w->flags = POLLOUT;
59   e->flags = 0;
60 
61   data->size = 32;
62   data->count = 0;
63   data->skip_sleep = 0;
64 
65   pfd = malloc(sizeof(struct pollfd) * (32 + PFD_EXTRA_SPACE));
66   data->pfd = pfd;
67 
68   return r;
69 }
70 
free_fdset_arrays(rktio_poll_set_t * fds)71 static void free_fdset_arrays(rktio_poll_set_t *fds)
72 {
73   struct rktio_fd_set_data_t *data = fds->data;
74   free(fds->w);
75   free(fds->e);
76   free(fds);
77   free(data->pfd);
78   free(data);
79 }
80 
rktio_get_fdset(rktio_poll_set_t * fdarray,int pos)81 rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
82 {
83   switch (pos) {
84   case 0:
85     return fdarray;
86   case 1:
87     return fdarray->w;
88   case 2:
89   default:
90     return fdarray->e;
91   }
92 }
93 
rktio_fdzero(rktio_poll_set_t * fd)94 void rktio_fdzero(rktio_poll_set_t *fd)
95 {
96   fd->data->count = 0;
97   fd->data->skip_sleep = 0;
98 }
99 
find_fd_pos(struct rktio_fd_set_data_t * data,intptr_t n)100 static int find_fd_pos(struct rktio_fd_set_data_t *data, intptr_t n)
101 {
102   intptr_t count = data->count;
103   intptr_t i;
104 
105   /* This linear search probably isn't good enough for hundreds or
106      thousands of descriptors, but epoll()/kqueue() mode should handle
107      that case, anyway. */
108   for (i = 0; i < count; i++) {
109     if (data->pfd[i].fd == n) {
110       return i;
111     }
112   }
113 
114   return -1;
115 }
116 
rktio_fdclr(rktio_poll_set_t * fd,intptr_t n)117 void rktio_fdclr(rktio_poll_set_t *fd, intptr_t n)
118 {
119   struct rktio_fd_set_data_t *data = fd->data;
120   intptr_t flag = fd->flags;
121   intptr_t pos;
122 
123   if (!flag) return;
124 
125   pos = find_fd_pos(data, n);
126   if (pos >= 0) {
127     data->pfd[pos].events -= (data->pfd[pos].events & flag);
128   }
129 }
130 
rktio_fdset(rktio_poll_set_t * fd,intptr_t n)131 void rktio_fdset(rktio_poll_set_t *fd, intptr_t n)
132 {
133   struct rktio_fd_set_data_t *data = fd->data;
134   intptr_t flag = fd->flags;
135   intptr_t count, size, pos;
136   struct pollfd *pfd;
137 
138   if (!flag) return;
139 
140   pos = find_fd_pos(data, n);
141   if (pos >= 0) {
142     data->pfd[pos].events |= flag;
143     return;
144   }
145 
146   count = data->count;
147   size = data->size;
148   if (count >= size) {
149     size = size * 2;
150     pfd = malloc(sizeof(struct pollfd) * (size + PFD_EXTRA_SPACE));
151     memcpy(pfd, data->pfd, sizeof(struct pollfd) * count);
152     free(data->pfd);
153     data->pfd = pfd;
154     data->size = size;
155   }
156 
157   data->pfd[count].fd = n;
158   data->pfd[count].events = flag;
159   count++;
160   data->count = count;
161 }
162 
rktio_fdisset(rktio_poll_set_t * fd,intptr_t n)163 int rktio_fdisset(rktio_poll_set_t *fd, intptr_t n)
164 {
165   struct rktio_fd_set_data_t *data = fd->data;
166   intptr_t flag = fd->flags;
167   intptr_t pos;
168 
169   if (!flag) flag = (POLLERR | POLLHUP);
170 
171   pos = find_fd_pos(data, n);
172   if (pos >= 0) {
173     if (data->pfd[pos].revents & flag)
174       return 1;
175     else
176       return 0;
177   }
178 
179   return 0;
180 }
181 
cmp_fd(const void * _a,const void * _b)182 static int cmp_fd(const void *_a, const void *_b)
183 {
184   struct pollfd *a = (struct pollfd *)_a;
185   struct pollfd *b = (struct pollfd *)_b;
186   return a->fd - b->fd;
187 }
188 
rktio_merge_fd_sets(rktio_poll_set_t * fds,rktio_poll_set_t * src_fds)189 void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
190 {
191   struct rktio_fd_set_data_t *data = fds->data;
192   struct rktio_fd_set_data_t *src_data = src_fds->data;
193   int i, si, c, sc, j, nc;
194   struct pollfd *pfds;
195 
196   rktio_clean_fd_set(fds);
197   rktio_clean_fd_set(src_fds);
198 
199   if (src_data->skip_sleep)
200     data->skip_sleep = 1;
201 
202   c = data->count;
203   sc = src_data->count;
204 
205   if (!sc)
206     return;
207 
208   qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd);
209   qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd);
210 
211   nc = c + sc;
212   pfds = malloc(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE));
213   j = 0;
214   for (i = 0, si = 0; (i < c) && (si < sc); ) {
215     if (data->pfd[i].fd == src_data->pfd[si].fd) {
216       pfds[j].fd = data->pfd[i].fd;
217       pfds[j].events = (data->pfd[i].events | src_data->pfd[si].events);
218       i++;
219       si++;
220     } else if (data->pfd[i].fd < src_data->pfd[si].fd) {
221       pfds[j].fd = data->pfd[i].fd;
222       pfds[j].events = data->pfd[i].events;
223       i++;
224     } else {
225       pfds[j].fd = src_data->pfd[si].fd;
226       pfds[j].events = src_data->pfd[si].events;
227       si++;
228     }
229     j++;
230   }
231   for ( ; i < c; i++, j++) {
232     pfds[j].fd = data->pfd[i].fd;
233     pfds[j].events = data->pfd[i].events;
234   }
235   for ( ; si < sc; si++, j++) {
236     pfds[j].fd = src_data->pfd[si].fd;
237     pfds[j].events = src_data->pfd[si].events;
238   }
239 
240   if (nc > data->size) {
241     free(data->pfd);
242     data->pfd = pfds;
243     data->size = nc;
244   } else {
245     memcpy(data->pfd, pfds, j * sizeof(struct pollfd));
246     free(pfds);
247   }
248   data->count = j;
249 }
250 
rktio_clean_fd_set(rktio_poll_set_t * fds)251 void rktio_clean_fd_set(rktio_poll_set_t *fds)
252 {
253   struct rktio_fd_set_data_t *data = fds->data;
254   intptr_t count = data->count;
255   intptr_t i, j = 0;
256 
257   for (i = 0; i < count; i++) {
258     if (data->pfd[i].events) {
259       if (j < i) {
260         data->pfd[j].fd = data->pfd[i].fd;
261         data->pfd[j].events = data->pfd[i].events;
262       }
263       j++;
264     }
265   }
266 
267   count = j;
268   data->count = count;
269 }
270 
rktio_get_fd_limit(rktio_poll_set_t * fds)271 int rktio_get_fd_limit(rktio_poll_set_t *fds)
272 {
273   return 0;
274 }
275 
rktio_get_poll_count(rktio_poll_set_t * fds)276 int rktio_get_poll_count(rktio_poll_set_t *fds)
277 {
278   return fds->data->count;
279 }
280 
rktio_get_poll_fd_array(rktio_poll_set_t * fds)281 struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds)
282 {
283   return fds->data->pfd;
284 }
285 
rktio_poll_set_add_nosleep(rktio_t * rktio,rktio_poll_set_t * fds)286 void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
287 {
288   fds->data->skip_sleep = 1;
289 }
290 
fdset_has_nosleep(rktio_poll_set_t * fds)291 static int fdset_has_nosleep(rktio_poll_set_t *fds)
292 {
293   return fds->data->skip_sleep;
294 }
295 
296 #elif defined(USE_DYNAMIC_FDSET_SIZE)
297 
298 /*========================================================================*/
299 /* Variant with run-time determined fd_set length                         */
300 /*========================================================================*/
301 
302 struct rktio_poll_set_t {
303   fd_set data;
304 };
305 
306 # ifdef RKTIO_GROWABLE_FDSET
307 /* In growable mode, a `rktio_poll_set_t` is an array whose first
308    element points to an array of three `fd_set`s. The array also has
309    elements (after a "header") that contain 0, 1, and 2, and the
310    default or "read" array corresponds to pointing at the 0 element,
311    while the "exn" array is represented by poinitng at the 2 index.
312    The three `fd_set`s are resized as needed when encountering a file
313    descriptor whose value is larger than the current allocated
314    size. */
315 # define GROWABLE_CONTENT_INDEX 0
316 # define GROWABLE_SIZE_INDEX 1
317 # define GROWABLE_RKTIO_INDEX 2
318 # define GROWABLE_HEADER_SIZE 3
319 # else
320 /* initialized early via rktio_alloc_global_poll_set */
321 static int dynamic_fd_size;
322 # endif
323 
324 # define STORED_ACTUAL_FDSET_LIMIT
325 # define FDSET_LIMIT(fd, dynamic_fd_size) (*(int *)((char *)fd + dynamic_fd_size))
326 
alloc_fdset_arrays(rktio_t * rktio)327 static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
328 {
329   void *p;
330 
331 #ifdef RKTIO_GROWABLE_FDSET
332   int dynamic_fd_size = 2 * rktio->max_fd_so_far;
333   if (dynamic_fd_size == 0)
334     dynamic_fd_size = 64;
335   /* divide by bits-per-byte: */
336   dynamic_fd_size = (dynamic_fd_size + 7) >> 3;
337   /* word-align: */
338   if (dynamic_fd_size % sizeof(void*))
339     dynamic_fd_size += sizeof(void*) - (dynamic_fd_size % sizeof(void*));
340 #else
341   if (!dynamic_fd_size) {
342 # ifdef USE_ULIMIT
343     dynamic_fd_size = ulimit(4, 0);
344 # else
345     dynamic_fd_size = getdtablesize();
346 # endif
347     /* divide by bits-per-byte: */
348     dynamic_fd_size = (dynamic_fd_size + 7) >> 3;
349     /* word-align: */
350     if (dynamic_fd_size % sizeof(void*))
351       dynamic_fd_size += sizeof(void*) - (dynamic_fd_size % sizeof(void*));
352   }
353 #endif
354 
355   /* Allocate an array with 1 extra intptr_t in each set to hold a
356      "max" fd counter, and 1 extra integer used to record "no
357      sleeping" */
358 
359   p = malloc(3 * (dynamic_fd_size + sizeof(intptr_t) + sizeof(int)));
360 
361 #ifdef RKTIO_GROWABLE_FDSET
362   {
363     void **p2 = malloc(6 * sizeof(void*));
364     p2[GROWABLE_CONTENT_INDEX] = p;
365     p2[GROWABLE_SIZE_INDEX] = (void *)(intptr_t)dynamic_fd_size;
366     p2[GROWABLE_RKTIO_INDEX] = &rktio->max_fd_so_far;
367     p2[GROWABLE_HEADER_SIZE] = (void *)0; /* point here for read set */
368     p2[GROWABLE_HEADER_SIZE+1] = (void *)1; /* point here for write set */
369     p2[GROWABLE_HEADER_SIZE+2] = (void *)2; /* point here for error set */
370     p = &p2[GROWABLE_HEADER_SIZE];
371   }
372 #endif
373 
374   return p;
375 }
376 
377 #ifdef RKTIO_GROWABLE_FDSET
378 # define GROWABLE_START(fd, index) ((rktio_poll_set_t *)(((void **)fd) - (index) - GROWABLE_HEADER_SIZE))
379 #endif
380 
381 #ifdef RKTIO_GROWABLE_FDSET
maybe_grow(rktio_poll_set_t * fd,int n)382 static void maybe_grow(rktio_poll_set_t *fd, int n)
383 {
384   int index = *((intptr_t *)fd);
385   fd = GROWABLE_START(fd, index);
386 
387   if (((int **)fd)[GROWABLE_RKTIO_INDEX][0] < n)
388     ((int **)fd)[GROWABLE_RKTIO_INDEX][0] = n;
389 
390   if (((intptr_t *)fd)[GROWABLE_SIZE_INDEX] <= (n >> 3)) {
391     void *p2, *p = ((void **)fd)[GROWABLE_CONTENT_INDEX];
392     int extra = sizeof(intptr_t) + sizeof(int);
393     int old_size = ((intptr_t *)fd)[GROWABLE_SIZE_INDEX];
394     int new_size = ((2 * n + 7) >> 3);
395     if (new_size % sizeof(void*))
396       new_size += sizeof(void*) - (new_size % sizeof(void*));
397 
398     p2 = malloc(3 * (new_size + extra));
399     memset(p2, 0, 3*new_size);
400 
401     /* copy currently set bits: */
402     memcpy(p2, p, old_size);
403     memcpy((char *)p2 + (new_size + extra), (char *)p + (old_size + extra), old_size);
404     memcpy((char *)p2 + 2*(new_size + extra), (char *)p + 2*(old_size + extra), old_size);
405 
406     /* copy over size and nosleep */
407     memcpy((char *)p2 + new_size, (char *)p + old_size, extra);
408 
409     ((void **)fd)[GROWABLE_CONTENT_INDEX] = p2;
410     ((intptr_t *)fd)[GROWABLE_SIZE_INDEX] = new_size;
411 
412     free(p);
413   }
414 }
415 #endif
416 
free_fdset_arrays(rktio_poll_set_t * fds)417 static void free_fdset_arrays(rktio_poll_set_t *fds)
418 {
419 #ifdef RKTIO_GROWABLE_FDSET
420   fds = GROWABLE_START(fds, 0);
421   free(((void **)fds)[GROWABLE_CONTENT_INDEX]);
422 #endif
423   free(fds);
424 }
425 
rktio_get_fdset(rktio_poll_set_t * fdarray,int pos)426 rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
427 {
428 #ifdef RKTIO_GROWABLE_FDSET
429   return (rktio_poll_set_t *)(((void **)fdarray) + pos);
430 #else
431   return (rktio_poll_set_t *)(((char *)fdarray) + (pos * (dynamic_fd_size + sizeof(intptr_t) + sizeof(int))));
432 #endif
433 }
434 
435 #ifdef RKTIO_GROWABLE_FDSET
growable_resolve(rktio_poll_set_t * fd,int * _dynamic_fd_size)436 rktio_poll_set_t *growable_resolve(rktio_poll_set_t *fd, int *_dynamic_fd_size)
437 {
438   int index = ((intptr_t *)fd)[0];
439   int dynamic_fd_size;
440 
441   fd = GROWABLE_START(fd, index);
442   dynamic_fd_size = ((intptr_t *)fd)[GROWABLE_SIZE_INDEX];
443   if (_dynamic_fd_size)
444     *_dynamic_fd_size = dynamic_fd_size;
445 
446   return (rktio_poll_set_t *)((char *)(((void **)fd)[GROWABLE_CONTENT_INDEX])
447                               + index * (dynamic_fd_size  + sizeof(intptr_t) + sizeof(int)));
448 }
449 
rktio_resolve_fds(rktio_poll_set_t * fd)450 void *rktio_resolve_fds(rktio_poll_set_t *fd)
451 {
452   return growable_resolve(fd, NULL);
453 }
454 #endif
455 
rktio_fdzero(rktio_poll_set_t * fd)456 void rktio_fdzero(rktio_poll_set_t *fd)
457 {
458 #ifdef RKTIO_GROWABLE_FDSET
459   int dynamic_fd_size;
460   fd = growable_resolve(fd, &dynamic_fd_size);
461 #endif
462 
463   memset(fd, 0, dynamic_fd_size + sizeof(intptr_t) + sizeof(int));
464 }
465 
rktio_poll_set_add_nosleep(rktio_t * rktio,rktio_poll_set_t * fds)466 void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
467 {
468 #ifdef RKTIO_GROWABLE_FDSET
469   int dynamic_fd_size;
470   fds = growable_resolve(fds, &dynamic_fd_size);
471 #endif
472 
473   *(int *)((char *)fds + dynamic_fd_size + sizeof(intptr_t)) = 1;
474 }
475 
fdset_has_nosleep(rktio_poll_set_t * fds)476 static int fdset_has_nosleep(rktio_poll_set_t *fds)
477 {
478 #ifdef RKTIO_GROWABLE_FDSET
479   int dynamic_fd_size;
480   fds = growable_resolve(fds, &dynamic_fd_size);
481 #endif
482 
483   return *(int *)((char *)fds + dynamic_fd_size + sizeof(intptr_t));
484 }
485 
486 /* Continues below: */
487 #define USE_PLAIN_FDS_SET_OPS
488 
489 #elif defined (RKTIO_SYSTEM_WINDOWS)
490 
491 /*========================================================================*/
492 /* Windows variant                                                        */
493 /*========================================================================*/
494 
495 struct  rktio_poll_set_t {
496   SOCKET *sockets;
497 
498   intptr_t added, alloc, last_alloc;
499 
500   intptr_t num_handles, alloc_handles, last_alloc_handles;
501   HANDLE *handles;
502 
503   int *repost_sema;
504 
505   int no_sleep; /* boolean */
506 
507   intptr_t wait_event_mask;
508 
509   HANDLE *wait_array;
510 
511   HANDLE *combined_wait_array;
512   intptr_t combined_len;
513 };
514 
515 static void reset_wait_array(rktio_poll_set_t *efd);
516 
init_fdset_array(rktio_poll_set_t * fdarray,int count)517 static void init_fdset_array(rktio_poll_set_t *fdarray, int count)
518 {
519   if (count) {
520     int i;
521     rktio_poll_set_t *fd;
522     for (i = 0; i < count; i++) {
523       int reset = 0;
524       fd = rktio_get_fdset(fdarray, i);
525       fd->added = 0;
526       if (fd->alloc > (2 * fd->last_alloc)) {
527 	fd->alloc = 0;
528         if (fd->sockets) free(fd->sockets);
529 	fd->sockets = NULL;
530 	reset = 1;
531       }
532       fd->last_alloc = 0;
533       fd->num_handles = 0;
534       if (fd->alloc_handles > (2 * fd->last_alloc_handles)) {
535 	fd->alloc_handles = 0;
536         if (fd->handles) free(fd->handles);
537         if (fd->repost_sema) free(fd->repost_sema);
538 	fd->handles = NULL;
539 	fd->repost_sema = NULL;
540 	reset = 1;
541       }
542       fd->last_alloc_handles = 0;
543       fd->no_sleep = 0;
544       fd->wait_event_mask = 0;
545       if (reset)
546 	reset_wait_array(fdarray);
547     }
548   }
549 }
550 
alloc_fdset_arrays(rktio_t * rktio)551 static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
552 {
553   rktio_poll_set_t *fdarray;
554 
555   fdarray = calloc(3, sizeof(rktio_poll_set_t));
556   init_fdset_array(fdarray, 3);
557 
558   return fdarray;
559 }
560 
free_fdset_arrays(rktio_poll_set_t * fds)561 static void free_fdset_arrays(rktio_poll_set_t *fds)
562 {
563   int i;
564 
565   for (i = 0; i < 3; i++) {
566     if (fds[i].handles)
567       free(fds[i].handles);
568     if (fds[i].repost_sema)
569       free(fds[i].repost_sema);
570     if (fds[i].wait_array)
571       free(fds[i].wait_array);
572   }
573   free(fds);
574 }
575 
reset_wait_array(rktio_poll_set_t * efd)576 static void reset_wait_array(rktio_poll_set_t *efd)
577 {
578   /* Allocate an array that may be big enough to hold all events
579      when we eventually call WaitForMultipleObjects. One of the three
580      arrays will be big enough. */
581   int sz = (3 * (efd->alloc + efd->alloc_handles)) + 2;
582   HANDLE *wa;
583   if (efd->wait_array) free(efd->wait_array);
584   wa = calloc(sz, sizeof(HANDLE));
585   efd->wait_array = wa;
586 }
587 
rktio_get_fdset(rktio_poll_set_t * fdarray,int pos)588 rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
589 {
590   return fdarray + pos;
591 }
592 
rktio_fdzero(rktio_poll_set_t * fd)593 void rktio_fdzero(rktio_poll_set_t *fd)
594 {
595   init_fdset_array(fd, 1);
596 }
597 
rktio_fdclr(rktio_poll_set_t * fd,intptr_t n)598 void rktio_fdclr(rktio_poll_set_t *fd, intptr_t n)
599 {
600   intptr_t i;
601   for (i = fd->added; i--; ) {
602     if (fd->sockets[i] == n)
603       fd->sockets[i] = INVALID_SOCKET;
604   }
605 }
606 
next_size(intptr_t v)607 static intptr_t next_size(intptr_t v) { return (v ? (2 * v) : 10); }
608 
rktio_fdset(rktio_poll_set_t * fd,intptr_t n)609 void rktio_fdset(rktio_poll_set_t *fd, intptr_t n)
610 {
611   if (fd->added >= fd->last_alloc) {
612     intptr_t na;
613     na = next_size(fd->last_alloc);
614     fd->last_alloc = na;
615   }
616   if (fd->added >= fd->alloc) {
617     SOCKET *naya;
618     intptr_t na;
619     na = next_size(fd->alloc);
620     naya = malloc(na * sizeof(SOCKET));
621     memcpy(naya, fd->sockets, fd->alloc * sizeof(SOCKET));
622     if (fd->sockets) free(fd->sockets);
623     fd->sockets = naya;
624     fd->alloc = na;
625     reset_wait_array(fd);
626   }
627   fd->sockets[fd->added++] = (SOCKET)n;
628 }
629 
rktio_fdisset(rktio_poll_set_t * fd,intptr_t n)630 int rktio_fdisset(rktio_poll_set_t *fd, intptr_t n)
631 {
632   intptr_t i;
633   for (i = fd->added; i--; ) {
634     if (fd->sockets[i] == (SOCKET)n)
635       return 1;
636   }
637   return 0;
638 }
639 
rktio_merge_fd_sets(rktio_poll_set_t * all_fds,rktio_poll_set_t * src_all_fds)640 void rktio_merge_fd_sets(rktio_poll_set_t *all_fds, rktio_poll_set_t *src_all_fds)
641 {
642   int j;
643   intptr_t i;
644 
645   for (j = 0; j < 3; j++) {
646     rktio_poll_set_t *fds;
647     rktio_poll_set_t *src_fds;
648     fds = rktio_get_fdset(all_fds, j);
649     src_fds = rktio_get_fdset(src_all_fds, j);
650     for (i = src_fds->added; i--; ) {
651       if (src_fds->sockets[i] != INVALID_SOCKET)
652 	rktio_fdset(fds, (intptr_t)src_fds->sockets[i]);
653     }
654     if (src_fds->no_sleep)
655       fds->no_sleep = 1;
656     fds->wait_event_mask |= src_fds->wait_event_mask;
657   }
658 }
659 
rktio_clean_fd_set(rktio_poll_set_t * fds)660 void rktio_clean_fd_set(rktio_poll_set_t *fds)
661 {
662 }
663 
rktio_get_fd_limit(rktio_poll_set_t * fds)664 int rktio_get_fd_limit(rktio_poll_set_t *fds)
665 {
666   return 0;
667 }
668 
rktio_poll_set_add_handle(rktio_t * rktio,intptr_t _h,rktio_poll_set_t * fds,int repost)669 void rktio_poll_set_add_handle(rktio_t *rktio, intptr_t _h, rktio_poll_set_t *fds, int repost)
670 {
671   HANDLE h = (HANDLE)_h;
672   rktio_poll_set_t *efd = fds;
673   HANDLE *hs;
674   intptr_t i, new_i;
675   int *rps;
676 
677   if (efd->num_handles == efd->last_alloc_handles) {
678     i = next_size(efd->last_alloc_handles);
679     efd->last_alloc_handles = 1;
680   }
681   if (efd->num_handles == efd->alloc_handles) {
682     i = efd->alloc_handles;
683     new_i = next_size(i);
684     hs = malloc(sizeof(HANDLE) * new_i);
685     rps = malloc(sizeof(int) * new_i);
686     memcpy(hs, efd->handles, sizeof(HANDLE)*i);
687     memcpy(rps, efd->repost_sema, sizeof(int)*i);
688     if (efd->handles) free(efd->handles);
689     if (efd->repost_sema) free(efd->repost_sema);
690     efd->handles = hs;
691     efd->repost_sema = rps;
692     efd->alloc_handles = new_i;
693     reset_wait_array(efd);
694   }
695   i = efd->num_handles;
696   efd->handles[i] = h;
697   efd->repost_sema[i] = repost;
698   efd->num_handles++;
699 }
700 
rktio_poll_set_add_nosleep(rktio_t * rktio,rktio_poll_set_t * fds)701 void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
702 {
703   fds->no_sleep = 1;
704 }
705 
fdset_has_nosleep(rktio_poll_set_t * fds)706 static int fdset_has_nosleep(rktio_poll_set_t *fds)
707 {
708   return fds->no_sleep;
709 }
710 
rktio_poll_set_add_eventmask(rktio_t * rktio,rktio_poll_set_t * fds,int mask)711 void rktio_poll_set_add_eventmask(rktio_t *rktio, rktio_poll_set_t *fds, int mask)
712 {
713   fds->wait_event_mask |= mask;
714 }
715 
WSAEventSelect_plus_check(SOCKET s,WSAEVENT e,long mask)716 static void WSAEventSelect_plus_check(SOCKET s, WSAEVENT e, long mask)
717 {
718   fd_set rd[1], wr[1], ex[1];
719   struct timeval t = {0, 0};
720 
721   WSAEventSelect(s, e, mask);
722 
723   /* double-check with select(), because WSAEventSelect only
724      handles new activity (I think) */
725   FD_ZERO(rd);
726   FD_ZERO(wr);
727   FD_ZERO(ex);
728 
729   if (mask & FD_READ)
730     FD_SET(s, rd);
731   if (mask & FD_WRITE)
732     FD_SET(s, wr);
733   if (mask & FD_OOB)
734     FD_SET(s, ex);
735 
736   if (select(1, rd, wr, ex, &t)) {
737     /* already ready */
738     WSAEventSelect(s, NULL, 0);
739     SetEvent(e);
740   }
741 }
742 
rktio_collapse_win_fd(rktio_poll_set_t * fds)743 void rktio_collapse_win_fd(rktio_poll_set_t *fds)
744 {
745   rktio_poll_set_t *rfd, *wfd, *efd;
746   HANDLE *wa, e;
747   intptr_t i, p = 0, mask, j;
748   SOCKET s;
749 
750   rfd = fds;
751   wfd = rktio_get_fdset(fds, 1);
752   efd = rktio_get_fdset(fds, 2);
753 
754   if (rfd->combined_wait_array) {
755     /* clean up */
756     for (i = rfd->added; i--; ) {
757       if (rfd->sockets[i] != INVALID_SOCKET)
758 	WSAEventSelect(rfd->sockets[i], NULL, 0);
759     }
760     for (i = wfd->added; i--; ) {
761       if (wfd->sockets[i] != INVALID_SOCKET)
762 	WSAEventSelect(wfd->sockets[i], NULL, 0);
763     }
764     for (i = efd->added; i--; ) {
765       if (efd->sockets[i] != INVALID_SOCKET)
766 	WSAEventSelect(efd->sockets[i], NULL, 0);
767     }
768     p = rfd->num_handles;
769     for (i = rfd->combined_len; i-- > p; ) {
770       WSACloseEvent(rfd->combined_wait_array[i]);
771     }
772     rfd->combined_wait_array = NULL;
773   } else {
774     /* merge */
775     if (rfd->alloc < wfd->alloc) {
776       if (wfd->alloc < efd->alloc)
777 	wa = efd->wait_array;
778       else
779 	wa = wfd->wait_array;
780     } else {
781       if (rfd->alloc < efd->alloc)
782 	wa = efd->wait_array;
783       else
784 	wa = rfd->wait_array;
785     }
786 
787     rfd->combined_wait_array = wa;
788 
789     p = rfd->num_handles;
790     for (i = 0; i < p; i++) {
791       wa[i] = rfd->handles[i];
792     }
793 
794     for (i = rfd->added; i--; ) {
795       s = rfd->sockets[i];
796       if (s != INVALID_SOCKET) {
797 	mask = FD_READ | FD_ACCEPT | FD_CLOSE;
798 
799 	for (j = wfd->added; j--; ) {
800 	  if (wfd->sockets[j] == s) {
801 	    mask |= FD_WRITE;
802 	    break;
803 	  }
804 	}
805 
806 	for (j = efd->added; j--; ) {
807 	  if (efd->sockets[j] == s) {
808 	    mask |= FD_OOB;
809 	    break;
810 	  }
811 	}
812 
813 	e = WSACreateEvent();
814 	wa[p++] = e;
815 	WSAEventSelect_plus_check(s, e, mask);
816       }
817     }
818 
819     for (i = wfd->added; i--; ) {
820       s = wfd->sockets[i];
821       if (s != INVALID_SOCKET) {
822 	mask = FD_WRITE | FD_CONNECT | FD_CLOSE;
823 
824 	for (j = rfd->added; j--; ) {
825 	  if (rfd->sockets[j] == s) {
826 	    mask = 0;
827 	    break;
828 	  }
829 	}
830 
831 	if (mask) {
832 	  for (j = efd->added; j--; ) {
833 	    if (efd->sockets[j] == s) {
834 	      mask |= FD_OOB;
835 	      break;
836 	    }
837 	  }
838 
839 	  e = WSACreateEvent();
840 	  wa[p++] = e;
841 	  WSAEventSelect_plus_check(s, e, mask);
842 	}
843       }
844     }
845 
846     for (i = efd->added; i--; ) {
847       s = efd->sockets[i];
848       if (s != INVALID_SOCKET) {
849 	mask = FD_OOB | FD_CLOSE;
850 
851 	for (j = rfd->added; j--; ) {
852 	  if (rfd->sockets[j] == s) {
853 	    mask = 0;
854 	    break;
855 	  }
856 	}
857 
858 	if (mask) {
859 	  for (j = wfd->added; j--; ) {
860 	    if (wfd->sockets[j] == s) {
861 	      mask = 0;
862 	      break;
863 	    }
864 	  }
865 
866 	  if (mask) {
867 	    e = WSACreateEvent();
868 	    wa[p++] = e;
869 	    WSAEventSelect_plus_check(s, e, mask);
870 	  }
871 	}
872       }
873     }
874 
875     rfd->combined_len = p;
876   }
877 }
878 
879 #else
880 
881 /*========================================================================*/
882 /* Plain fd_set variant                                                   */
883 /*========================================================================*/
884 
alloc_fdset_arrays(rktio_t * rktio)885 static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
886 {
887   return malloc(3 * sizeof(rktio_poll_set_t));
888 }
889 
free_fdset_arrays(rktio_poll_set_t * fds)890 static void free_fdset_arrays(rktio_poll_set_t *fds)
891 {
892   free(fds);
893 }
894 
rktio_get_fdset(rktio_poll_set_t * fdarray,int pos)895 rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
896 {
897   return fdarray + pos;
898 }
899 
rktio_fdzero(rktio_poll_set_t * fd)900 void rktio_fdzero(rktio_poll_set_t *fd)
901 {
902   FD_ZERO(&(fd)->data);
903   fd->nosleep = 0;
904 }
905 
rktio_poll_set_add_nosleep(rktio_t * rktio,rktio_poll_set_t * fds)906 void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
907 {
908   fds->nosleep = 1;
909 }
910 
fdset_has_nosleep(rktio_poll_set_t * fds)911 static int fdset_has_nosleep(rktio_poll_set_t *fds)
912 {
913   return fds->nosleep;
914 }
915 
916 #define USE_PLAIN_FDS_SET_OPS
917 
918 #endif
919 
920 #ifdef USE_PLAIN_FDS_SET_OPS
921 
rktio_fdclr(rktio_poll_set_t * fd,intptr_t n)922 void rktio_fdclr(rktio_poll_set_t *fd, intptr_t n)
923 {
924 # ifdef RKTIO_GROWABLE_FDSET
925   fd = growable_resolve(fd, NULL);
926 # endif
927   FD_CLR(n, &(fd)->data);
928 }
929 
rktio_fdset(rktio_poll_set_t * fd,intptr_t n)930 void rktio_fdset(rktio_poll_set_t *fd, intptr_t n)
931 {
932 # ifdef RKTIO_GROWABLE_FDSET
933   int dynamic_fd_size;
934   maybe_grow(fd, n);
935   fd = growable_resolve(fd, &dynamic_fd_size);
936 # endif
937 # ifdef STORED_ACTUAL_FDSET_LIMIT
938   {
939     int mx;
940     mx = FDSET_LIMIT(fd, dynamic_fd_size);
941     if (n > mx) {
942       FDSET_LIMIT(fd, dynamic_fd_size) = n;
943     }
944   }
945 # endif
946 
947   FD_SET(n, &(fd)->data);
948 }
949 
rktio_fdisset(rktio_poll_set_t * fd,intptr_t n)950 int rktio_fdisset(rktio_poll_set_t *fd, intptr_t n)
951 {
952 # ifdef RKTIO_GROWABLE_FDSET
953   fd = growable_resolve(fd, NULL);
954 # endif
955   return FD_ISSET(n, &(fd)->data);
956 }
957 
rktio_merge_fd_sets(rktio_poll_set_t * fds,rktio_poll_set_t * src_fds)958 void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
959 {
960   int i, j;
961   rktio_poll_set_t *fds_j, *src_fds_j;
962   unsigned char *p, *sp;
963 # ifdef RKTIO_GROWABLE_FDSET
964   int dynamic_fd_size;
965   int s_dynamic_fd_size;
966 # elif defined (STORED_ACTUAL_FDSET_LIMIT)
967   int s_dynamic_fd_size = dynamic_fd_size;
968 # endif
969 
970   for (j = 0; j < 3; j++) {
971     src_fds_j = rktio_get_fdset(src_fds, j);
972 # ifdef RKTIO_GROWABLE_FDSET
973     src_fds_j = growable_resolve(src_fds_j, &s_dynamic_fd_size);
974 # endif
975     sp = (unsigned char *)src_fds_j;
976 
977 # ifdef RKTIO_GROWABLE_FDSET
978     maybe_grow(fds, FDSET_LIMIT(sp, s_dynamic_fd_size));
979 # endif
980 
981     fds_j = rktio_get_fdset(fds, j);
982 # ifdef RKTIO_GROWABLE_FDSET
983     fds_j = growable_resolve(fds_j, &dynamic_fd_size);
984 # endif
985     p = (unsigned char *)fds_j;
986 
987 # ifdef STORED_ACTUAL_FDSET_LIMIT
988     i = FDSET_LIMIT(p, dynamic_fd_size);
989     if (FDSET_LIMIT(sp, s_dynamic_fd_size) > i) {
990       i = FDSET_LIMIT(sp, s_dynamic_fd_size);
991       FDSET_LIMIT(p, dynamic_fd_size) = i;
992     }
993     /* `i` is max fd, so add 1 to get count, then convert to bytes (rounding up) */
994     i = (i + 1 + 7) >> 3;
995 # elif defined(USE_DYNAMIC_FDSET_SIZE)
996     i = dynamic_fd_size;
997 # else
998     i = sizeof(fd_set);
999 # endif
1000 
1001     for (; i--; p++, sp++) {
1002       *p |= *sp;
1003     }
1004   }
1005 
1006   if (fdset_has_nosleep(src_fds))
1007     rktio_poll_set_add_nosleep(NULL, fds);
1008 }
1009 
rktio_clean_fd_set(rktio_poll_set_t * fds)1010 void rktio_clean_fd_set(rktio_poll_set_t *fds)
1011 {
1012 }
1013 
rktio_get_fd_limit(rktio_poll_set_t * fds)1014 int rktio_get_fd_limit(rktio_poll_set_t *fds)
1015 {
1016   int actual_limit;
1017 
1018 # ifdef STORED_ACTUAL_FDSET_LIMIT
1019   {
1020     fd_set *rd, *wr, *ex;
1021 #  ifdef RKTIO_GROWABLE_FDSET
1022     int dynamic_fd_size;
1023     (void)growable_resolve(fds, &dynamic_fd_size);
1024 #  endif
1025 
1026     rd = RKTIO_FDS(fds);
1027     wr = RKTIO_FDS(RKTIO_GET_FDSET(fds, 1));
1028     ex = RKTIO_FDS(RKTIO_GET_FDSET(fds, 2));
1029 
1030     actual_limit = FDSET_LIMIT(rd, dynamic_fd_size);
1031     if (FDSET_LIMIT(wr, dynamic_fd_size) > actual_limit)
1032       actual_limit = FDSET_LIMIT(wr, dynamic_fd_size);
1033     if (FDSET_LIMIT(ex, dynamic_fd_size) > actual_limit)
1034       actual_limit = FDSET_LIMIT(ex, dynamic_fd_size);
1035     actual_limit++;
1036   }
1037 # elif defined (USE_ULIMIT)
1038   actual_limit = ulimit(4, 0);
1039 #elif defined(FIXED_FD_LIMIT)
1040   actual_limit = FIXED_FD_LIMIT;
1041 #else
1042   actual_limit = getdtablesize();
1043 # endif
1044 
1045   return actual_limit;
1046 }
1047 
1048 #endif
1049 
1050 #ifndef RKTIO_SYSTEM_WINDOWS
rktio_poll_set_add_handle(rktio_t * rktio,intptr_t h,rktio_poll_set_t * fds,int repost)1051 void rktio_poll_set_add_handle(rktio_t *rktio, intptr_t h, rktio_poll_set_t *fds, int repost) { }
rktio_poll_set_add_eventmask(rktio_t * rktio,rktio_poll_set_t * fds,int mask)1052 void rktio_poll_set_add_eventmask(rktio_t *rktio, rktio_poll_set_t *fds, int mask) { }
rkio_reset_sleep_backoff(rktio_t * rktio)1053 void rkio_reset_sleep_backoff(rktio_t *rktio) { }
1054 #endif
1055 
1056 /*========================================================================*/
1057 /* Shared internal poll set                                               */
1058 /*========================================================================*/
1059 
1060 /* Generalize fd arrays (FD_SET, etc) with a runtime-determined size,
1061    special hooks for Windows "descriptors" like even queues and
1062    semaphores, etc. */
1063 
rktio_alloc_global_poll_set(rktio_t * rktio)1064 void rktio_alloc_global_poll_set(rktio_t *rktio) {
1065 #ifdef USE_FAR_RKTIO_FDCALLS
1066   rktio->rktio_global_poll_set = alloc_fdset_arrays(rktio);
1067 #endif
1068 }
1069 
rktio_free_global_poll_set(rktio_t * rktio)1070 void rktio_free_global_poll_set(rktio_t *rktio) {
1071 #ifdef USE_FAR_RKTIO_FDCALLS
1072   free_fdset_arrays(rktio->rktio_global_poll_set);
1073 #endif
1074 }
1075 
1076 /*========================================================================*/
1077 /* Create a poll set                                                      */
1078 /*========================================================================*/
1079 
1080 /* Internally, poll sets are used with macros like DECL_FDSET(), but this
1081    is the API for external use. */
1082 
rktio_make_poll_set(rktio_t * rktio)1083 rktio_poll_set_t *rktio_make_poll_set(rktio_t *rktio)
1084 {
1085   rktio_poll_set_t *fds = alloc_fdset_arrays(rktio);
1086 
1087   RKTIO_FD_ZERO(fds);
1088   RKTIO_FD_ZERO(RKTIO_GET_FDSET(fds, 1));
1089   RKTIO_FD_ZERO(RKTIO_GET_FDSET(fds, 2));
1090 
1091   return fds;
1092 }
1093 
rktio_poll_set_forget(rktio_t * rktio,rktio_poll_set_t * fds)1094 void rktio_poll_set_forget(rktio_t *rktio, rktio_poll_set_t *fds)
1095 {
1096   free_fdset_arrays(fds);
1097 }
1098 
1099 /*========================================================================*/
1100 /* Sleeping as a generalized select()                                     */
1101 /*========================================================================*/
1102 
rktio_initialize_signal(rktio_t * rktio)1103 int rktio_initialize_signal(rktio_t *rktio)
1104 {
1105 #ifdef RKTIO_SYSTEM_UNIX
1106   /* Set up a pipe for signaling external events: */
1107   int fds[2];
1108   if (!pipe(fds)) {
1109     rktio->external_event_fd = fds[0];
1110     rktio->put_external_event_fd = fds[1];
1111     fcntl(rktio->external_event_fd, F_SETFL, RKTIO_NONBLOCKING);
1112     fcntl(rktio->put_external_event_fd, F_SETFL, RKTIO_NONBLOCKING);
1113     return 1;
1114   } else {
1115     set_racket_error(RKTIO_ERROR_INIT_FAILED);
1116     return 0;
1117   }
1118 #endif
1119 
1120 #ifdef RKTIO_SYSTEM_WINDOWS
1121   rktio->break_semaphore = (void*)CreateSemaphore(NULL, 0, 1, NULL);
1122   if (rktio->break_semaphore == INVALID_HANDLE_VALUE) {
1123     get_windows_error();
1124     return 0;
1125   } else
1126     return 1;
1127 #endif
1128 }
1129 
rktio_free_signal(rktio_t * rktio)1130 void rktio_free_signal(rktio_t *rktio)
1131 {
1132 #ifdef RKTIO_SYSTEM_UNIX
1133   rktio_reliably_close(rktio->external_event_fd);
1134   rktio_reliably_close(rktio->put_external_event_fd);
1135 #endif
1136 
1137 #ifdef RKTIO_SYSTEM_WINDOWS
1138   CloseHandle(rktio->break_semaphore);
1139 #endif
1140 }
1141 
rktio_get_signal_handle(rktio_t * rktio)1142 rktio_signal_handle_t *rktio_get_signal_handle(rktio_t *rktio)
1143 {
1144 #ifdef RKTIO_SYSTEM_UNIX
1145   return (rktio_signal_handle_t *)&rktio->put_external_event_fd;
1146 #endif
1147 #ifdef RKTIO_SYSTEM_WINDOWS
1148   return (rktio_signal_handle_t *)&rktio->break_semaphore;
1149 #endif
1150 }
1151 
rktio_signal_received(rktio_t * rktio)1152 void rktio_signal_received(rktio_t *rktio)
1153 {
1154   rktio_signal_received_at(rktio_get_signal_handle(rktio));
1155 }
1156 
rktio_signal_received_at(rktio_signal_handle_t * h)1157 void rktio_signal_received_at(rktio_signal_handle_t *h)
1158 {
1159 #ifdef RKTIO_SYSTEM_UNIX
1160   int put_ext_event_fd = *(int *)h;
1161   int saved_errno = errno;
1162   if (put_ext_event_fd) {
1163     int v;
1164     do {
1165       v = write(put_ext_event_fd, "!", 1);
1166     } while ((v == -1) && (errno == EINTR));
1167   }
1168   errno = saved_errno;
1169 #endif
1170 #ifdef RKTIO_SYSTEM_WINDOWS
1171   ReleaseSemaphore(*(HANDLE *)h, 1, NULL);
1172 #endif
1173 }
1174 
rktio_flush_signals_received(rktio_t * rktio)1175 void rktio_flush_signals_received(rktio_t *rktio)
1176 {
1177 #ifdef RKTIO_SYSTEM_UNIX
1178   /* Clear external event flag */
1179   if (rktio->external_event_fd) {
1180     int rc;
1181     char buf[10];
1182     do {
1183       rc = read(rktio->external_event_fd, buf, 10);
1184     } while ((rc == -1) && errno == EINTR);
1185   }
1186 #endif
1187 }
1188 
rktio_wait_until_signal_received(rktio_t * rktio)1189 void rktio_wait_until_signal_received(rktio_t *rktio)
1190 {
1191 #ifdef RKTIO_SYSTEM_UNIX
1192   int r;
1193 # ifdef HAVE_POLL_SYSCALL
1194   struct pollfd pfd[1];
1195   pfd[0].fd = rktio->external_event_fd;
1196   pfd[0].events = POLLIN;
1197   do {
1198     r = poll(pfd, 1, -1);
1199   } while ((r == -1) && (errno == EINTR));
1200 # else
1201   DECL_FDSET(readfds, 1);
1202 
1203   INIT_DECL_RD_FDSET(readfds);
1204 
1205   RKTIO_FD_ZERO(readfds);
1206   RKTIO_FD_SET(rktio->external_event_fd, readfds);
1207 
1208   do {
1209     r = select(rktio->external_event_fd + 1, RKTIO_FDS(readfds), NULL, NULL, NULL);
1210   } while ((r == -1) && (errno == EINTR));
1211 # endif
1212 #endif
1213 #ifdef RKTIO_SYSTEM_WINDOWS
1214   WaitForSingleObject(rktio->break_semaphore, INFINITE);
1215 #endif
1216 
1217   rktio_flush_signals_received(rktio);
1218 }
1219 
1220 /****************** Windows cleanup  *****************/
1221 
1222 #ifdef RKTIO_SYSTEM_WINDOWS
1223 
clean_up_wait(rktio_t * rktio,intptr_t result,HANDLE * array,int * rps,int count)1224 static void clean_up_wait(rktio_t *rktio,
1225                           intptr_t result, HANDLE *array,
1226 			  int *rps, int count)
1227 {
1228   if ((result >= (intptr_t)WAIT_OBJECT_0) && (result < (intptr_t)WAIT_OBJECT_0 + count)) {
1229     result -= WAIT_OBJECT_0;
1230     if (rps[result])
1231       ReleaseSemaphore(array[result], 1, NULL);
1232   }
1233 
1234   /* Clear out break semaphore */
1235   WaitForSingleObject(rktio->break_semaphore, 0);
1236 }
1237 
rkio_reset_sleep_backoff(rktio_t * rktio)1238 void rkio_reset_sleep_backoff(rktio_t *rktio)
1239 {
1240   rktio->made_progress = 1;
1241 }
1242 
prepare_windows_sleep(DWORD msec)1243 static void prepare_windows_sleep(DWORD msec)
1244 {
1245   /* The default scheduling granilaity is usually 16ms, which
1246      means that a request to sleep 16ms could easily end up being 31ms,
1247      and a request to sleep 2ms is likely at least 16ms. We can
1248      request a finer granularity of scheduling, but do that only
1249      temporarily, because it may slow down the rest of the system. */
1250   if (msec < 32)
1251     timeBeginPeriod((msec >> 2) | 1);
1252 }
1253 
finish_windows_sleep(DWORD msec)1254 static void finish_windows_sleep(DWORD msec)
1255 {
1256   if (msec < 32)
1257     timeEndPeriod((msec >> 1) | 1);
1258 }
1259 
1260 #endif
1261 
1262 /******************** Main sleep function  *****************/
1263 /* The simple select() stuff is buried in various kinds of complexity. */
1264 
1265 /* FIXME: don't forget SIGCHILD_DOESNT_INTERRUPT_SELECT handling in Racket */
1266 
rktio_sleep(rktio_t * rktio,float nsecs,rktio_poll_set_t * fds,rktio_ltps_t * lt)1267 void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt)
1268 {
1269   if (fds && fdset_has_nosleep(fds))
1270     return;
1271 
1272   if (fds && lt) {
1273 #if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
1274     int fd = rktio_ltps_get_fd(lt);
1275     /* `fd` can be -1, because the file descriptor is created lazily */
1276     if (fd != -1) {
1277       RKTIO_FD_SET(fd, fds);
1278       RKTIO_FD_SET(fd, RKTIO_GET_FDSET(fds, 2));
1279     }
1280 #else
1281     rktio_merge_fd_sets(fds, rktio_ltps_get_fd_set(lt));
1282 #endif
1283   }
1284 
1285   if (!fds) {
1286     /* Nothing to block on - just sleep for some amount of time. */
1287 #ifdef RKTIO_SYSTEM_UNIX
1288 # ifdef HAVE_POLL_SYSCALL
1289     int timeout;
1290     if (nsecs <= 0.0)
1291       timeout = -1;
1292     else {
1293       timeout = (int)(nsecs * 1000.0);
1294       if (timeout < 0)
1295         timeout = 0;
1296     }
1297     if (rktio->external_event_fd) {
1298       struct pollfd pfd[1];
1299       pfd[0].fd = rktio->external_event_fd;
1300       pfd[0].events = POLLIN;
1301       poll(pfd, 1, timeout);
1302     } else {
1303       poll(NULL, 0, timeout);
1304     }
1305 # else
1306     /* Sleep by selecting on the external event fd */
1307     struct timeval time;
1308     intptr_t secs = (intptr_t)nsecs;
1309     intptr_t usecs = (intptr_t)(fmod(nsecs, 1.0) * 1000000);
1310 
1311     if (nsecs && (nsecs > 100000))
1312       secs = 100000;
1313     if (usecs < 0)
1314       usecs = 0;
1315     if (usecs >= 1000000)
1316       usecs = 999999;
1317 
1318     time.tv_sec = secs;
1319     time.tv_usec = usecs;
1320 
1321     if (rktio->external_event_fd) {
1322       DECL_FDSET(readfds, 1);
1323 
1324       INIT_DECL_RD_FDSET(readfds);
1325 
1326       RKTIO_FD_ZERO(readfds);
1327       RKTIO_FD_SET(rktio->external_event_fd, readfds);
1328 
1329       select(rktio->external_event_fd + 1, RKTIO_FDS(readfds), NULL, NULL, &time);
1330     } else {
1331       select(0, NULL, NULL, NULL, &time);
1332     }
1333 # endif
1334 #else
1335 # ifdef RKTIO_SYSTEM_WINDOWS
1336     {
1337       DWORD msecs = nsecs * 1000;
1338       if (msecs > 0) {
1339 	prepare_windows_sleep(msecs);
1340 	Sleep(msecs);
1341 	finish_windows_sleep(msecs);
1342       }
1343     }
1344 # else
1345 #  ifndef NO_SLEEP
1346 #   ifndef NO_USLEEP
1347     usleep((unsigned)(nsecs * 1000));
1348 #    else
1349     sleep(nsecs);
1350 #   endif
1351 #  endif
1352 # endif
1353 #endif
1354   } else {
1355     /* Something to block on.... */
1356 
1357 #ifdef HAVE_POLL_SYSCALL
1358 
1359     /******* poll() variant *******/
1360 
1361     {
1362       struct rktio_fd_set_data_t *data = fds->data;
1363       intptr_t count = data->count;
1364       int timeout;
1365 
1366       if (nsecs <= 0.0)
1367         timeout = -1;
1368       else if (nsecs > 100000)
1369         timeout = 100000000;
1370       else {
1371         timeout = (int)(nsecs * 1000.0);
1372         if (timeout < 0)
1373           timeout = 0;
1374       }
1375 
1376       if (rktio->external_event_fd) {
1377         data->pfd[count].fd = rktio->external_event_fd;
1378         data->pfd[count].events = POLLIN;
1379         count++;
1380       }
1381 
1382       poll(data->pfd, count, timeout);
1383     }
1384 #elif !defined(RKTIO_SYSTEM_WINDOWS)
1385 
1386     /******* select() variant *******/
1387 
1388     {
1389       int actual_limit;
1390       fd_set *rd, *wr, *ex;
1391       struct timeval time;
1392       intptr_t secs = (intptr_t)nsecs;
1393       intptr_t usecs = (intptr_t)(fmod(nsecs, 1.0) * 1000000);
1394 
1395       if (nsecs && (nsecs > 100000))
1396 	secs = 100000;
1397       if (usecs < 0)
1398 	usecs = 0;
1399       if (usecs >= 1000000)
1400 	usecs = 999999;
1401 
1402       time.tv_sec = secs;
1403       time.tv_usec = usecs;
1404 
1405       rd = RKTIO_FDS(fds);
1406       wr = RKTIO_FDS(RKTIO_GET_FDSET(fds, 1));
1407       ex = RKTIO_FDS(RKTIO_GET_FDSET(fds, 2));
1408 
1409       actual_limit = rktio_get_fd_limit(fds);
1410 
1411       /* Watch for external events, too: */
1412       if (rktio->external_event_fd) {
1413         FD_SET(rktio->external_event_fd, rd);
1414         if (rktio->external_event_fd >= actual_limit)
1415           actual_limit = rktio->external_event_fd + 1;
1416       }
1417 
1418       select(actual_limit, rd, wr, ex, nsecs ? &time : NULL);
1419     }
1420 
1421 #else
1422 
1423     /******* Windows variant *******/
1424 
1425     {
1426       intptr_t result;
1427       HANDLE *array, just_two_array[2];
1428       intptr_t count, rcount;
1429       int *rps;
1430 
1431       rktio_collapse_win_fd(fds); /* merges */
1432 
1433       rcount = fds->num_handles;
1434       count = fds->combined_len;
1435       array = fds->combined_wait_array;
1436       rps = fds->repost_sema;
1437 
1438       /* add break semaphore: */
1439       if (!count)
1440 	array = just_two_array;
1441       array[count++] = rktio->break_semaphore;
1442 
1443       /* Extensions may handle events.
1444 	 If the event queue is empty (as reported by GetQueueStatus),
1445 	 everything's ok.
1446 
1447 	 Otherwise, we have trouble sleeping until an event is ready. We
1448 	 sometimes leave events on th queue because, say, an eventspace is
1449 	 not ready. The problem is that MsgWait... only unblocks when a new
1450 	 event appears. Since extensions may check the queue using a sequence of
1451 	 PeekMessages, it's possible that an event is added during the
1452 	 middle of the sequence, but doesn't get handled.
1453 
1454 	 To avoid this problem, we don't actually sleep indefinitely if an event
1455 	 is pending. Instead, we slep 10 ms, then 20 ms, etc. This exponential
1456 	 backoff ensures that we eventually handle a pending event, but we don't
1457 	 spin and eat CPU cycles. The back-off is reset whenever a thread makes
1458 	 progress. */
1459 
1460       if (fds->wait_event_mask && GetQueueStatus(fds->wait_event_mask)) {
1461 	if (!rktio->made_progress) {
1462 	  /* Ok, we've gone around at least once. */
1463 	  if (rktio->max_sleep_time < 0x20000000) {
1464 	    rktio->max_sleep_time *= 2;
1465 	  }
1466 	} else {
1467 	  /* Starting back-off mode */
1468 	  rktio->made_progress = 0;
1469 	  rktio->max_sleep_time = 5;
1470 	}
1471       } else {
1472 	/* Disable back-off mode */
1473 	rktio->made_progress = 1;
1474 	rktio->max_sleep_time = 0;
1475       }
1476 
1477       /* Wait for HANDLE-based input: */
1478       {
1479 	DWORD msec;
1480 	if (nsecs) {
1481 	  if (nsecs > 100000)
1482 	    msec = 100000000;
1483 	  else
1484 	    msec = (DWORD)(nsecs * 1000);
1485 	  if (rktio->max_sleep_time && (msec > rktio->max_sleep_time))
1486 	    msec = rktio->max_sleep_time;
1487 	} else {
1488 	  if (rktio->max_sleep_time)
1489 	    msec = rktio->max_sleep_time;
1490 	  else
1491 	    msec = INFINITE;
1492 	}
1493 
1494 	prepare_windows_sleep(msec);
1495 	result = MsgWaitForMultipleObjects(count, array, FALSE, msec, fds->wait_event_mask);
1496 	finish_windows_sleep(msec);
1497       }
1498       clean_up_wait(rktio, result, array, rps, rcount);
1499       rktio_collapse_win_fd(fds); /* cleans up */
1500 
1501       return;
1502     }
1503 #endif
1504   }
1505 
1506   rktio_flush_signals_received(rktio);
1507 }
1508