#include "rktio.h"
#include "rktio_private.h"
#ifdef RKTIO_SYSTEM_UNIX
# include <sys/select.h>
# include <unistd.h>
# include <fcntl.h>
# include <errno.h>
# include <math.h>
# ifdef USE_ULIMIT
#  include <ulimit.h>
# endif
#endif
#ifdef HAVE_POLL_SYSCALL
# include <poll.h>
#endif
#include <string.h>
#include <stdlib.h>

/*========================================================================*/
/* Poll variant                                                           */
/*========================================================================*/

#ifdef HAVE_POLL_SYSCALL

# define PFD_EXTRA_SPACE 1

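/* In poll() mode, a poll set is a triple of linked `rktio_poll_set_t`
   records --- read, write, and error sets --- that share one
   `rktio_fd_set_data_t` holding a single `struct pollfd` array. Each
   record's `flags` field says which `events` bits it contributes for
   a registered fd; the error set uses 0, since POLLERR and POLLHUP
   are reported by poll() whether requested or not. */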
struct rktio_poll_set_t {
  struct rktio_fd_set_data_t *data;
  rktio_poll_set_t *w;
  rktio_poll_set_t *e;
  int flags;
};

struct rktio_fd_set_data_t {
  struct pollfd *pfd;
  intptr_t size, count;
  int skip_sleep;
};

static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
{
  struct rktio_fd_set_data_t *data;
  rktio_poll_set_t *r, *w, *e;
  struct pollfd *pfd;

  data = malloc(sizeof(struct rktio_fd_set_data_t));
  r = malloc(sizeof(struct rktio_poll_set_t));
  w = malloc(sizeof(struct rktio_poll_set_t));
  e = malloc(sizeof(struct rktio_poll_set_t));

  r->w = w;
  r->e = e;
  r->data = data;
  w->data = data;
  e->data = data;

  r->flags = POLLIN;
  w->flags = POLLOUT;
  e->flags = 0;

  data->size = 32;
  data->count = 0;
  data->skip_sleep = 0;

  pfd = malloc(sizeof(struct pollfd) * (32 + PFD_EXTRA_SPACE));
  data->pfd = pfd;

  return r;
}

static void free_fdset_arrays(rktio_poll_set_t *fds)
{
  struct rktio_fd_set_data_t *data = fds->data;
  free(fds->w);
  free(fds->e);
  free(fds);
  free(data->pfd);
  free(data);
}

rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
  switch (pos) {
  case 0:
    return fdarray;
  case 1:
    return fdarray->w;
  case 2:
  default:
    return fdarray->e;
  }
}

void rktio_fdzero(rktio_poll_set_t *fd)
{
  fd->data->count = 0;
  fd->data->skip_sleep = 0;
}

static int find_fd_pos(struct rktio_fd_set_data_t *data, intptr_t n)
{
  intptr_t count = data->count;
  intptr_t i;

  /* This linear search probably isn't good enough for hundreds or
     thousands of descriptors, but epoll()/kqueue() mode should handle
     that case, anyway. */
  for (i = 0; i < count; i++) {
    if (data->pfd[i].fd == n) {
      return i;
    }
  }

  return -1;
}

void rktio_fdclr(rktio_poll_set_t *fd, intptr_t n)
{
  struct rktio_fd_set_data_t *data = fd->data;
  intptr_t flag = fd->flags;
  intptr_t pos;

  if (!flag) return;

  pos = find_fd_pos(data, n);
  if (pos >= 0) {
    data->pfd[pos].events -= (data->pfd[pos].events & flag);
  }
}

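/* Registers `n` in the given set, merging into an existing pollfd
   entry when `n` is already present and growing the array (by
   doubling) when it is full. */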
void rktio_fdset(rktio_poll_set_t *fd, intptr_t n)
{
  struct rktio_fd_set_data_t *data = fd->data;
  intptr_t flag = fd->flags;
  intptr_t count, size, pos;
  struct pollfd *pfd;

  if (!flag) return;

  pos = find_fd_pos(data, n);
  if (pos >= 0) {
    data->pfd[pos].events |= flag;
    return;
  }

  count = data->count;
  size = data->size;
  if (count >= size) {
    size = size * 2;
    pfd = malloc(sizeof(struct pollfd) * (size + PFD_EXTRA_SPACE));
    memcpy(pfd, data->pfd, sizeof(struct pollfd) * count);
    free(data->pfd);
    data->pfd = pfd;
    data->size = size;
  }

  data->pfd[count].fd = n;
  data->pfd[count].events = flag;
  count++;
  data->count = count;
}

int rktio_fdisset(rktio_poll_set_t *fd, intptr_t n)
{
  struct rktio_fd_set_data_t *data = fd->data;
  intptr_t flag = fd->flags;
  intptr_t pos;

  if (!flag) flag = (POLLERR | POLLHUP);

  pos = find_fd_pos(data, n);
  if (pos >= 0) {
    if (data->pfd[pos].revents & flag)
      return 1;
    else
      return 0;
  }

  return 0;
}

static int cmp_fd(const void *_a, const void *_b)
{
  struct pollfd *a = (struct pollfd *)_a;
  struct pollfd *b = (struct pollfd *)_b;
  return a->fd - b->fd;
}

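/* Merges `src_fds` into `fds`: both pollfd arrays are sorted by fd,
   then merged so that an fd registered in both sets ends up with the
   union of its `events` bits. */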
void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
{
  struct rktio_fd_set_data_t *data = fds->data;
  struct rktio_fd_set_data_t *src_data = src_fds->data;
  int i, si, c, sc, j, nc;
  struct pollfd *pfds;

  rktio_clean_fd_set(fds);
  rktio_clean_fd_set(src_fds);

  if (src_data->skip_sleep)
    data->skip_sleep = 1;

  c = data->count;
  sc = src_data->count;

  if (!sc)
    return;

  qsort(data->pfd, c, sizeof(struct pollfd), cmp_fd);
  qsort(src_data->pfd, sc, sizeof(struct pollfd), cmp_fd);

  nc = c + sc;
  pfds = malloc(sizeof(struct pollfd) * (nc + PFD_EXTRA_SPACE));
  j = 0;
  for (i = 0, si = 0; (i < c) && (si < sc); ) {
    if (data->pfd[i].fd == src_data->pfd[si].fd) {
      pfds[j].fd = data->pfd[i].fd;
      pfds[j].events = (data->pfd[i].events | src_data->pfd[si].events);
      i++;
      si++;
    } else if (data->pfd[i].fd < src_data->pfd[si].fd) {
      pfds[j].fd = data->pfd[i].fd;
      pfds[j].events = data->pfd[i].events;
      i++;
    } else {
      pfds[j].fd = src_data->pfd[si].fd;
      pfds[j].events = src_data->pfd[si].events;
      si++;
    }
    j++;
  }
  for ( ; i < c; i++, j++) {
    pfds[j].fd = data->pfd[i].fd;
    pfds[j].events = data->pfd[i].events;
  }
  for ( ; si < sc; si++, j++) {
    pfds[j].fd = src_data->pfd[si].fd;
    pfds[j].events = src_data->pfd[si].events;
  }

  if (nc > data->size) {
    free(data->pfd);
    data->pfd = pfds;
    data->size = nc;
  } else {
    memcpy(data->pfd, pfds, j * sizeof(struct pollfd));
    free(pfds);
  }
  data->count = j;
}

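/* Drops entries whose `events` bits have all been cleared (e.g., by
   rktio_fdclr), compacting the pollfd array in place. */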
void rktio_clean_fd_set(rktio_poll_set_t *fds)
{
  struct rktio_fd_set_data_t *data = fds->data;
  intptr_t count = data->count;
  intptr_t i, j = 0;

  for (i = 0; i < count; i++) {
    if (data->pfd[i].events) {
      if (j < i) {
        data->pfd[j].fd = data->pfd[i].fd;
        data->pfd[j].events = data->pfd[i].events;
      }
      j++;
    }
  }

  count = j;
  data->count = count;
}

int rktio_get_fd_limit(rktio_poll_set_t *fds)
{
  return 0;
}

int rktio_get_poll_count(rktio_poll_set_t *fds)
{
  return fds->data->count;
}

struct pollfd *rktio_get_poll_fd_array(rktio_poll_set_t *fds)
{
  return fds->data->pfd;
}

void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
  fds->data->skip_sleep = 1;
}

static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
  return fds->data->skip_sleep;
}

#elif defined(USE_DYNAMIC_FDSET_SIZE)

/*========================================================================*/
/* Variant with run-time determined fd_set length                        */
/*========================================================================*/

struct rktio_poll_set_t {
  fd_set data;
};

# ifdef RKTIO_GROWABLE_FDSET
/* In growable mode, a `rktio_poll_set_t` is an array whose first
   element points to an array of three `fd_set`s. The array also has
   elements (after a "header") that contain 0, 1, and 2; the default
   or "read" set corresponds to pointing at the 0 element, while the
   "exn" set is represented by pointing at the 2 element. The three
   `fd_set`s are resized as needed when encountering a file
   descriptor whose value is larger than the current allocated
   size. */
# define GROWABLE_CONTENT_INDEX 0
# define GROWABLE_SIZE_INDEX 1
# define GROWABLE_RKTIO_INDEX 2
# define GROWABLE_HEADER_SIZE 3
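/* The resulting layout (a sketch; offsets are in `void *` slots):
     p2[0] -> malloc'd block holding the three fd_sets, each followed
              by an intptr_t "max fd" field and an int "no sleep" field
     p2[1]    current per-set size in bytes
     p2[2] -> rktio's max_fd_so_far counter
     p2[3..5] contain 0, 1, and 2; a set is referenced by a pointer
              to one of these slots */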
# else
/* initialized early via rktio_alloc_global_poll_set */
static int dynamic_fd_size;
# endif

# define STORED_ACTUAL_FDSET_LIMIT
# define FDSET_LIMIT(fd, dynamic_fd_size) (*(int *)((char *)fd + dynamic_fd_size))

static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
{
  void *p;

#ifdef RKTIO_GROWABLE_FDSET
  int dynamic_fd_size = 2 * rktio->max_fd_so_far;
  if (dynamic_fd_size == 0)
    dynamic_fd_size = 64;
  /* divide by bits-per-byte: */
  dynamic_fd_size = (dynamic_fd_size + 7) >> 3;
  /* word-align: */
  if (dynamic_fd_size % sizeof(void*))
    dynamic_fd_size += sizeof(void*) - (dynamic_fd_size % sizeof(void*));
#else
  if (!dynamic_fd_size) {
# ifdef USE_ULIMIT
    dynamic_fd_size = ulimit(4, 0);
# else
    dynamic_fd_size = getdtablesize();
# endif
    /* divide by bits-per-byte: */
    dynamic_fd_size = (dynamic_fd_size + 7) >> 3;
    /* word-align: */
    if (dynamic_fd_size % sizeof(void*))
      dynamic_fd_size += sizeof(void*) - (dynamic_fd_size % sizeof(void*));
  }
#endif

  /* Allocate an array with 1 extra intptr_t in each set to hold a
     "max" fd counter, and 1 extra integer used to record "no
     sleeping" */

  p = malloc(3 * (dynamic_fd_size + sizeof(intptr_t) + sizeof(int)));

#ifdef RKTIO_GROWABLE_FDSET
  {
    void **p2 = malloc(6 * sizeof(void*));
    p2[GROWABLE_CONTENT_INDEX] = p;
    p2[GROWABLE_SIZE_INDEX] = (void *)(intptr_t)dynamic_fd_size;
    p2[GROWABLE_RKTIO_INDEX] = &rktio->max_fd_so_far;
    p2[GROWABLE_HEADER_SIZE] = (void *)0;   /* point here for read set */
    p2[GROWABLE_HEADER_SIZE+1] = (void *)1; /* point here for write set */
    p2[GROWABLE_HEADER_SIZE+2] = (void *)2; /* point here for error set */
    p = &p2[GROWABLE_HEADER_SIZE];
  }
#endif

  return p;
}

#ifdef RKTIO_GROWABLE_FDSET
# define GROWABLE_START(fd, index) ((rktio_poll_set_t *)(((void **)fd) - (index) - GROWABLE_HEADER_SIZE))
#endif

#ifdef RKTIO_GROWABLE_FDSET
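/* Ensures that the three fd_sets can represent descriptor `n`,
   doubling the per-set allocation when needed; set bits and each
   set's trailing "max"/"no sleep" fields are carried over. */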
static void maybe_grow(rktio_poll_set_t *fd, int n)
{
  int index = *((intptr_t *)fd);
  fd = GROWABLE_START(fd, index);

  if (((int **)fd)[GROWABLE_RKTIO_INDEX][0] < n)
    ((int **)fd)[GROWABLE_RKTIO_INDEX][0] = n;

  if (((intptr_t *)fd)[GROWABLE_SIZE_INDEX] <= (n >> 3)) {
    void *p2, *p = ((void **)fd)[GROWABLE_CONTENT_INDEX];
    int extra = sizeof(intptr_t) + sizeof(int);
    int old_size = ((intptr_t *)fd)[GROWABLE_SIZE_INDEX];
    int new_size = ((2 * n + 7) >> 3);
    if (new_size % sizeof(void*))
      new_size += sizeof(void*) - (new_size % sizeof(void*));

    p2 = malloc(3 * (new_size + extra));
    /* zero the whole allocation, including each set's trailing
       "max"/"no sleep" fields, before copying: */
    memset(p2, 0, 3 * (new_size + extra));

    /* copy currently set bits: */
    memcpy(p2, p, old_size);
    memcpy((char *)p2 + (new_size + extra), (char *)p + (old_size + extra), old_size);
    memcpy((char *)p2 + 2*(new_size + extra), (char *)p + 2*(old_size + extra), old_size);

    /* copy over each set's size and nosleep fields */
    memcpy((char *)p2 + new_size, (char *)p + old_size, extra);
    memcpy((char *)p2 + new_size + (new_size + extra), (char *)p + old_size + (old_size + extra), extra);
    memcpy((char *)p2 + new_size + 2*(new_size + extra), (char *)p + old_size + 2*(old_size + extra), extra);

    ((void **)fd)[GROWABLE_CONTENT_INDEX] = p2;
    ((intptr_t *)fd)[GROWABLE_SIZE_INDEX] = new_size;

    free(p);
  }
}
#endif

static void free_fdset_arrays(rktio_poll_set_t *fds)
{
#ifdef RKTIO_GROWABLE_FDSET
  fds = GROWABLE_START(fds, 0);
  free(((void **)fds)[GROWABLE_CONTENT_INDEX]);
#endif
  free(fds);
}

rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
#ifdef RKTIO_GROWABLE_FDSET
  return (rktio_poll_set_t *)(((void **)fdarray) + pos);
#else
  return (rktio_poll_set_t *)(((char *)fdarray) + (pos * (dynamic_fd_size + sizeof(intptr_t) + sizeof(int))));
#endif
}

#ifdef RKTIO_GROWABLE_FDSET
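/* Maps one of the three set pointers (which points at a 0/1/2 slot
   in the growable header) to the address of the corresponding fd_set
   within the content block, optionally reporting the current per-set
   size in bytes. */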
rktio_poll_set_t *growable_resolve(rktio_poll_set_t *fd, int *_dynamic_fd_size)
{
  int index = ((intptr_t *)fd)[0];
  int dynamic_fd_size;

  fd = GROWABLE_START(fd, index);
  dynamic_fd_size = ((intptr_t *)fd)[GROWABLE_SIZE_INDEX];
  if (_dynamic_fd_size)
    *_dynamic_fd_size = dynamic_fd_size;

  return (rktio_poll_set_t *)((char *)(((void **)fd)[GROWABLE_CONTENT_INDEX])
                              + index * (dynamic_fd_size + sizeof(intptr_t) + sizeof(int)));
}

void *rktio_resolve_fds(rktio_poll_set_t *fd)
{
  return growable_resolve(fd, NULL);
}
#endif

void rktio_fdzero(rktio_poll_set_t *fd)
{
#ifdef RKTIO_GROWABLE_FDSET
  int dynamic_fd_size;
  fd = growable_resolve(fd, &dynamic_fd_size);
#endif

  memset(fd, 0, dynamic_fd_size + sizeof(intptr_t) + sizeof(int));
}

void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
#ifdef RKTIO_GROWABLE_FDSET
  int dynamic_fd_size;
  fds = growable_resolve(fds, &dynamic_fd_size);
#endif

  *(int *)((char *)fds + dynamic_fd_size + sizeof(intptr_t)) = 1;
}

static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
#ifdef RKTIO_GROWABLE_FDSET
  int dynamic_fd_size;
  fds = growable_resolve(fds, &dynamic_fd_size);
#endif

  return *(int *)((char *)fds + dynamic_fd_size + sizeof(intptr_t));
}

/* Continues below: */
#define USE_PLAIN_FDS_SET_OPS

#elif defined (RKTIO_SYSTEM_WINDOWS)

/*========================================================================*/
/* Windows variant                                                        */
/*========================================================================*/

struct rktio_poll_set_t {
  SOCKET *sockets;

  intptr_t added, alloc, last_alloc;

  intptr_t num_handles, alloc_handles, last_alloc_handles;
  HANDLE *handles;

  int *repost_sema;

  int no_sleep; /* boolean */

  intptr_t wait_event_mask;

  HANDLE *wait_array;

  HANDLE *combined_wait_array;
  intptr_t combined_len;
};

static void reset_wait_array(rktio_poll_set_t *efd);

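/* Resets `count` sets for reuse. Arrays that grew to more than twice
   their previously used size are freed, so a set does not stay large
   forever, and the scratch wait array is resized to match. */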
static void init_fdset_array(rktio_poll_set_t *fdarray, int count)
{
  if (count) {
    int i;
    rktio_poll_set_t *fd;
    for (i = 0; i < count; i++) {
      int reset = 0;
      fd = rktio_get_fdset(fdarray, i);
      fd->added = 0;
      if (fd->alloc > (2 * fd->last_alloc)) {
        fd->alloc = 0;
        if (fd->sockets) free(fd->sockets);
        fd->sockets = NULL;
        reset = 1;
      }
      fd->last_alloc = 0;
      fd->num_handles = 0;
      if (fd->alloc_handles > (2 * fd->last_alloc_handles)) {
        fd->alloc_handles = 0;
        if (fd->handles) free(fd->handles);
        if (fd->repost_sema) free(fd->repost_sema);
        fd->handles = NULL;
        fd->repost_sema = NULL;
        reset = 1;
      }
      fd->last_alloc_handles = 0;
      fd->no_sleep = 0;
      fd->wait_event_mask = 0;
      if (reset)
        reset_wait_array(fdarray);
    }
  }
}

static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
{
  rktio_poll_set_t *fdarray;

  fdarray = calloc(3, sizeof(rktio_poll_set_t));
  init_fdset_array(fdarray, 3);

  return fdarray;
}

static void free_fdset_arrays(rktio_poll_set_t *fds)
{
  int i;

  for (i = 0; i < 3; i++) {
    if (fds[i].handles)
      free(fds[i].handles);
    if (fds[i].repost_sema)
      free(fds[i].repost_sema);
    if (fds[i].wait_array)
      free(fds[i].wait_array);
  }
  free(fds);
}

static void reset_wait_array(rktio_poll_set_t *efd)
{
  /* Allocate an array that may be big enough to hold all events
     when we eventually call WaitForMultipleObjects. One of the three
     arrays will be big enough. */
  int sz = (3 * (efd->alloc + efd->alloc_handles)) + 2;
  HANDLE *wa;
  if (efd->wait_array) free(efd->wait_array);
  wa = calloc(sz, sizeof(HANDLE));
  efd->wait_array = wa;
}

rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
  return fdarray + pos;
}

void rktio_fdzero(rktio_poll_set_t *fd)
{
  init_fdset_array(fd, 1);
}

void rktio_fdclr(rktio_poll_set_t *fd, intptr_t n)
{
  intptr_t i;
  for (i = fd->added; i--; ) {
    if (fd->sockets[i] == n)
      fd->sockets[i] = INVALID_SOCKET;
  }
}

static intptr_t next_size(intptr_t v) { return (v ? (2 * v) : 10); }

void rktio_fdset(rktio_poll_set_t *fd, intptr_t n)
{
  if (fd->added >= fd->last_alloc) {
    intptr_t na;
    na = next_size(fd->last_alloc);
    fd->last_alloc = na;
  }
  if (fd->added >= fd->alloc) {
    SOCKET *naya;
    intptr_t na;
    na = next_size(fd->alloc);
    naya = malloc(na * sizeof(SOCKET));
    memcpy(naya, fd->sockets, fd->alloc * sizeof(SOCKET));
    if (fd->sockets) free(fd->sockets);
    fd->sockets = naya;
    fd->alloc = na;
    reset_wait_array(fd);
  }
  fd->sockets[fd->added++] = (SOCKET)n;
}

int rktio_fdisset(rktio_poll_set_t *fd, intptr_t n)
{
  intptr_t i;
  for (i = fd->added; i--; ) {
    if (fd->sockets[i] == (SOCKET)n)
      return 1;
  }
  return 0;
}

void rktio_merge_fd_sets(rktio_poll_set_t *all_fds, rktio_poll_set_t *src_all_fds)
{
  int j;
  intptr_t i;

  for (j = 0; j < 3; j++) {
    rktio_poll_set_t *fds;
    rktio_poll_set_t *src_fds;
    fds = rktio_get_fdset(all_fds, j);
    src_fds = rktio_get_fdset(src_all_fds, j);
    for (i = src_fds->added; i--; ) {
      if (src_fds->sockets[i] != INVALID_SOCKET)
        rktio_fdset(fds, (intptr_t)src_fds->sockets[i]);
    }
    if (src_fds->no_sleep)
      fds->no_sleep = 1;
    fds->wait_event_mask |= src_fds->wait_event_mask;
  }
}

void rktio_clean_fd_set(rktio_poll_set_t *fds)
{
}

int rktio_get_fd_limit(rktio_poll_set_t *fds)
{
  return 0;
}

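/* Registers a Windows HANDLE to wait on alongside sockets; when
   `repost` is true, the handle is a semaphore whose count must be
   restored (see clean_up_wait) if the wait consumes it. */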
void rktio_poll_set_add_handle(rktio_t *rktio, intptr_t _h, rktio_poll_set_t *fds, int repost)
{
  HANDLE h = (HANDLE)_h;
  rktio_poll_set_t *efd = fds;
  HANDLE *hs;
  intptr_t i, new_i;
  int *rps;

  if (efd->num_handles == efd->last_alloc_handles) {
    i = next_size(efd->last_alloc_handles);
    efd->last_alloc_handles = i;
  }
  if (efd->num_handles == efd->alloc_handles) {
    i = efd->alloc_handles;
    new_i = next_size(i);
    hs = malloc(sizeof(HANDLE) * new_i);
    rps = malloc(sizeof(int) * new_i);
    memcpy(hs, efd->handles, sizeof(HANDLE)*i);
    memcpy(rps, efd->repost_sema, sizeof(int)*i);
    if (efd->handles) free(efd->handles);
    if (efd->repost_sema) free(efd->repost_sema);
    efd->handles = hs;
    efd->repost_sema = rps;
    efd->alloc_handles = new_i;
    reset_wait_array(efd);
  }
  i = efd->num_handles;
  efd->handles[i] = h;
  efd->repost_sema[i] = repost;
  efd->num_handles++;
}

void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
  fds->no_sleep = 1;
}

static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
  return fds->no_sleep;
}

void rktio_poll_set_add_eventmask(rktio_t *rktio, rktio_poll_set_t *fds, int mask)
{
  fds->wait_event_mask |= mask;
}

static void WSAEventSelect_plus_check(SOCKET s, WSAEVENT e, long mask)
{
  fd_set rd[1], wr[1], ex[1];
  struct timeval t = {0, 0};

  WSAEventSelect(s, e, mask);

  /* double-check with select(), because WSAEventSelect only
     handles new activity (I think) */
  FD_ZERO(rd);
  FD_ZERO(wr);
  FD_ZERO(ex);

  if (mask & FD_READ)
    FD_SET(s, rd);
  if (mask & FD_WRITE)
    FD_SET(s, wr);
  if (mask & FD_OOB)
    FD_SET(s, ex);

  if (select(1, rd, wr, ex, &t)) {
    /* already ready */
    WSAEventSelect(s, NULL, 0);
    SetEvent(e);
  }
}

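/* Called twice around a wait. On the first call, it builds a combined
   array of registered handles plus one fresh WSAEVENT per live socket,
   with a network-event mask that reflects which of the three sets
   mention the socket. On the second call --- detectable because
   `combined_wait_array` is set --- it unregisters the event selections
   and closes the per-socket events. */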
void rktio_collapse_win_fd(rktio_poll_set_t *fds)
{
  rktio_poll_set_t *rfd, *wfd, *efd;
  HANDLE *wa, e;
  intptr_t i, p = 0, mask, j;
  SOCKET s;

  rfd = fds;
  wfd = rktio_get_fdset(fds, 1);
  efd = rktio_get_fdset(fds, 2);

  if (rfd->combined_wait_array) {
    /* clean up */
    for (i = rfd->added; i--; ) {
      if (rfd->sockets[i] != INVALID_SOCKET)
        WSAEventSelect(rfd->sockets[i], NULL, 0);
    }
    for (i = wfd->added; i--; ) {
      if (wfd->sockets[i] != INVALID_SOCKET)
        WSAEventSelect(wfd->sockets[i], NULL, 0);
    }
    for (i = efd->added; i--; ) {
      if (efd->sockets[i] != INVALID_SOCKET)
        WSAEventSelect(efd->sockets[i], NULL, 0);
    }
    p = rfd->num_handles;
    for (i = rfd->combined_len; i-- > p; ) {
      WSACloseEvent(rfd->combined_wait_array[i]);
    }
    rfd->combined_wait_array = NULL;
  } else {
    /* merge */
    if (rfd->alloc < wfd->alloc) {
      if (wfd->alloc < efd->alloc)
        wa = efd->wait_array;
      else
        wa = wfd->wait_array;
    } else {
      if (rfd->alloc < efd->alloc)
        wa = efd->wait_array;
      else
        wa = rfd->wait_array;
    }

    rfd->combined_wait_array = wa;

    p = rfd->num_handles;
    for (i = 0; i < p; i++) {
      wa[i] = rfd->handles[i];
    }

    for (i = rfd->added; i--; ) {
      s = rfd->sockets[i];
      if (s != INVALID_SOCKET) {
        mask = FD_READ | FD_ACCEPT | FD_CLOSE;

        for (j = wfd->added; j--; ) {
          if (wfd->sockets[j] == s) {
            mask |= FD_WRITE;
            break;
          }
        }

        for (j = efd->added; j--; ) {
          if (efd->sockets[j] == s) {
            mask |= FD_OOB;
            break;
          }
        }

        e = WSACreateEvent();
        wa[p++] = e;
        WSAEventSelect_plus_check(s, e, mask);
      }
    }

    for (i = wfd->added; i--; ) {
      s = wfd->sockets[i];
      if (s != INVALID_SOCKET) {
        mask = FD_WRITE | FD_CONNECT | FD_CLOSE;

        for (j = rfd->added; j--; ) {
          if (rfd->sockets[j] == s) {
            mask = 0;
            break;
          }
        }

        if (mask) {
          for (j = efd->added; j--; ) {
            if (efd->sockets[j] == s) {
              mask |= FD_OOB;
              break;
            }
          }

          e = WSACreateEvent();
          wa[p++] = e;
          WSAEventSelect_plus_check(s, e, mask);
        }
      }
    }

    for (i = efd->added; i--; ) {
      s = efd->sockets[i];
      if (s != INVALID_SOCKET) {
        mask = FD_OOB | FD_CLOSE;

        for (j = rfd->added; j--; ) {
          if (rfd->sockets[j] == s) {
            mask = 0;
            break;
          }
        }

        if (mask) {
          for (j = wfd->added; j--; ) {
            if (wfd->sockets[j] == s) {
              mask = 0;
              break;
            }
          }

          if (mask) {
            e = WSACreateEvent();
            wa[p++] = e;
            WSAEventSelect_plus_check(s, e, mask);
          }
        }
      }
    }

    rfd->combined_len = p;
  }
}

#else

/*========================================================================*/
/* Plain fd_set variant                                                   */
/*========================================================================*/

struct rktio_poll_set_t {
  fd_set data;
  int nosleep;
};

static rktio_poll_set_t *alloc_fdset_arrays(rktio_t *rktio)
{
  return malloc(3 * sizeof(rktio_poll_set_t));
}

static void free_fdset_arrays(rktio_poll_set_t *fds)
{
  free(fds);
}

rktio_poll_set_t *rktio_get_fdset(rktio_poll_set_t *fdarray, int pos)
{
  return fdarray + pos;
}

void rktio_fdzero(rktio_poll_set_t *fd)
{
  FD_ZERO(&(fd)->data);
  fd->nosleep = 0;
}

void rktio_poll_set_add_nosleep(rktio_t *rktio, rktio_poll_set_t *fds)
{
  fds->nosleep = 1;
}

static int fdset_has_nosleep(rktio_poll_set_t *fds)
{
  return fds->nosleep;
}

#define USE_PLAIN_FDS_SET_OPS

#endif

#ifdef USE_PLAIN_FDS_SET_OPS

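/* These FD_SET-based operations are shared by the dynamically sized
   and plain variants; in growable mode, each call first resolves the
   set pointer to the current fd_set storage (and rktio_fdset may
   grow that storage first). */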
void rktio_fdclr(rktio_poll_set_t *fd, intptr_t n)
{
# ifdef RKTIO_GROWABLE_FDSET
  fd = growable_resolve(fd, NULL);
# endif
  FD_CLR(n, &(fd)->data);
}

void rktio_fdset(rktio_poll_set_t *fd, intptr_t n)
{
# ifdef RKTIO_GROWABLE_FDSET
  int dynamic_fd_size;
  maybe_grow(fd, n);
  fd = growable_resolve(fd, &dynamic_fd_size);
# endif
# ifdef STORED_ACTUAL_FDSET_LIMIT
  {
    int mx;
    mx = FDSET_LIMIT(fd, dynamic_fd_size);
    if (n > mx) {
      FDSET_LIMIT(fd, dynamic_fd_size) = n;
    }
  }
# endif

  FD_SET(n, &(fd)->data);
}

int rktio_fdisset(rktio_poll_set_t *fd, intptr_t n)
{
# ifdef RKTIO_GROWABLE_FDSET
  fd = growable_resolve(fd, NULL);
# endif
  return FD_ISSET(n, &(fd)->data);
}

void rktio_merge_fd_sets(rktio_poll_set_t *fds, rktio_poll_set_t *src_fds)
{
  int i, j;
  rktio_poll_set_t *fds_j, *src_fds_j;
  unsigned char *p, *sp;
# ifdef RKTIO_GROWABLE_FDSET
  int dynamic_fd_size;
  int s_dynamic_fd_size;
# elif defined (STORED_ACTUAL_FDSET_LIMIT)
  int s_dynamic_fd_size = dynamic_fd_size;
# endif

  for (j = 0; j < 3; j++) {
    src_fds_j = rktio_get_fdset(src_fds, j);
# ifdef RKTIO_GROWABLE_FDSET
    src_fds_j = growable_resolve(src_fds_j, &s_dynamic_fd_size);
# endif
    sp = (unsigned char *)src_fds_j;

# ifdef RKTIO_GROWABLE_FDSET
    maybe_grow(fds, FDSET_LIMIT(sp, s_dynamic_fd_size));
# endif

    fds_j = rktio_get_fdset(fds, j);
# ifdef RKTIO_GROWABLE_FDSET
    fds_j = growable_resolve(fds_j, &dynamic_fd_size);
# endif
    p = (unsigned char *)fds_j;

# ifdef STORED_ACTUAL_FDSET_LIMIT
    i = FDSET_LIMIT(p, dynamic_fd_size);
    if (FDSET_LIMIT(sp, s_dynamic_fd_size) > i) {
      i = FDSET_LIMIT(sp, s_dynamic_fd_size);
      FDSET_LIMIT(p, dynamic_fd_size) = i;
    }
    /* `i` is max fd, so add 1 to get count, then convert to bytes (rounding up) */
    i = (i + 1 + 7) >> 3;
# elif defined(USE_DYNAMIC_FDSET_SIZE)
    i = dynamic_fd_size;
# else
    i = sizeof(fd_set);
# endif

    for (; i--; p++, sp++) {
      *p |= *sp;
    }
  }

  if (fdset_has_nosleep(src_fds))
    rktio_poll_set_add_nosleep(NULL, fds);
}

void rktio_clean_fd_set(rktio_poll_set_t *fds)
{
}

int rktio_get_fd_limit(rktio_poll_set_t *fds)
{
  int actual_limit;

# ifdef STORED_ACTUAL_FDSET_LIMIT
  {
    fd_set *rd, *wr, *ex;
# ifdef RKTIO_GROWABLE_FDSET
    int dynamic_fd_size;
    (void)growable_resolve(fds, &dynamic_fd_size);
# endif

    rd = RKTIO_FDS(fds);
    wr = RKTIO_FDS(RKTIO_GET_FDSET(fds, 1));
    ex = RKTIO_FDS(RKTIO_GET_FDSET(fds, 2));

    actual_limit = FDSET_LIMIT(rd, dynamic_fd_size);
    if (FDSET_LIMIT(wr, dynamic_fd_size) > actual_limit)
      actual_limit = FDSET_LIMIT(wr, dynamic_fd_size);
    if (FDSET_LIMIT(ex, dynamic_fd_size) > actual_limit)
      actual_limit = FDSET_LIMIT(ex, dynamic_fd_size);
    actual_limit++;
  }
# elif defined (USE_ULIMIT)
  actual_limit = ulimit(4, 0);
# elif defined(FIXED_FD_LIMIT)
  actual_limit = FIXED_FD_LIMIT;
# else
  actual_limit = getdtablesize();
# endif

  return actual_limit;
}

#endif

#ifndef RKTIO_SYSTEM_WINDOWS
void rktio_poll_set_add_handle(rktio_t *rktio, intptr_t h, rktio_poll_set_t *fds, int repost) { }
void rktio_poll_set_add_eventmask(rktio_t *rktio, rktio_poll_set_t *fds, int mask) { }
void rkio_reset_sleep_backoff(rktio_t *rktio) { }
#endif

/*========================================================================*/
/* Shared internal poll set                                               */
/*========================================================================*/

/* Generalize fd arrays (FD_SET, etc) with a runtime-determined size,
   special hooks for Windows "descriptors" like event queues and
   semaphores, etc. */

void rktio_alloc_global_poll_set(rktio_t *rktio) {
#ifdef USE_FAR_RKTIO_FDCALLS
  rktio->rktio_global_poll_set = alloc_fdset_arrays(rktio);
#endif
}

void rktio_free_global_poll_set(rktio_t *rktio) {
#ifdef USE_FAR_RKTIO_FDCALLS
  free_fdset_arrays(rktio->rktio_global_poll_set);
#endif
}

/*========================================================================*/
/* Create a poll set                                                      */
/*========================================================================*/

/* Internally, poll sets are used with macros like DECL_FDSET(), but
   this is the API for external use. */

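/* A minimal usage sketch (assuming the `rktio_poll_add` binding
   declared in rktio.h for registering a rktio_fd_t):

     rktio_poll_set_t *fds = rktio_make_poll_set(rktio);
     rktio_poll_add(rktio, rfd, fds, RKTIO_POLL_READ);
     rktio_sleep(rktio, 0.0, fds, NULL);  -- blocks until activity or signal
     rktio_poll_set_forget(rktio, fds);
*/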
rktio_poll_set_t *rktio_make_poll_set(rktio_t *rktio)
{
  rktio_poll_set_t *fds = alloc_fdset_arrays(rktio);

  RKTIO_FD_ZERO(fds);
  RKTIO_FD_ZERO(RKTIO_GET_FDSET(fds, 1));
  RKTIO_FD_ZERO(RKTIO_GET_FDSET(fds, 2));

  return fds;
}

void rktio_poll_set_forget(rktio_t *rktio, rktio_poll_set_t *fds)
{
  free_fdset_arrays(fds);
}

/*========================================================================*/
/* Sleeping as a generalized select()                                     */
/*========================================================================*/

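/* On Unix, external-event signaling uses the classic self-pipe
   trick: rktio_signal_received_at() writes a byte to one end of a
   nonblocking pipe, and the read end is included in every blocking
   poll()/select(), so a signal reliably wakes the sleeper. */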
int rktio_initialize_signal(rktio_t *rktio)
{
#ifdef RKTIO_SYSTEM_UNIX
  /* Set up a pipe for signaling external events: */
  int fds[2];
  if (!pipe(fds)) {
    rktio->external_event_fd = fds[0];
    rktio->put_external_event_fd = fds[1];
    fcntl(rktio->external_event_fd, F_SETFL, RKTIO_NONBLOCKING);
    fcntl(rktio->put_external_event_fd, F_SETFL, RKTIO_NONBLOCKING);
    return 1;
  } else {
    set_racket_error(RKTIO_ERROR_INIT_FAILED);
    return 0;
  }
#endif

#ifdef RKTIO_SYSTEM_WINDOWS
  rktio->break_semaphore = (void*)CreateSemaphore(NULL, 0, 1, NULL);
  if (rktio->break_semaphore == INVALID_HANDLE_VALUE) {
    get_windows_error();
    return 0;
  } else
    return 1;
#endif
}

void rktio_free_signal(rktio_t *rktio)
{
#ifdef RKTIO_SYSTEM_UNIX
  rktio_reliably_close(rktio->external_event_fd);
  rktio_reliably_close(rktio->put_external_event_fd);
#endif

#ifdef RKTIO_SYSTEM_WINDOWS
  CloseHandle(rktio->break_semaphore);
#endif
}

rktio_signal_handle_t *rktio_get_signal_handle(rktio_t *rktio)
{
#ifdef RKTIO_SYSTEM_UNIX
  return (rktio_signal_handle_t *)&rktio->put_external_event_fd;
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
  return (rktio_signal_handle_t *)&rktio->break_semaphore;
#endif
}

void rktio_signal_received(rktio_t *rktio)
{
  rktio_signal_received_at(rktio_get_signal_handle(rktio));
}

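/* Intended to be safe to call from an OS signal handler: it uses
   only write(), which is async-signal-safe, and it saves and
   restores errno so the interrupted code is not disturbed. */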
void rktio_signal_received_at(rktio_signal_handle_t *h)
{
#ifdef RKTIO_SYSTEM_UNIX
  int put_ext_event_fd = *(int *)h;
  int saved_errno = errno;
  if (put_ext_event_fd) {
    int v;
    do {
      v = write(put_ext_event_fd, "!", 1);
    } while ((v == -1) && (errno == EINTR));
  }
  errno = saved_errno;
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
  ReleaseSemaphore(*(HANDLE *)h, 1, NULL);
#endif
}

void rktio_flush_signals_received(rktio_t *rktio)
{
#ifdef RKTIO_SYSTEM_UNIX
  /* Clear external event flag */
  if (rktio->external_event_fd) {
    int rc;
    char buf[10];
    do {
      rc = read(rktio->external_event_fd, buf, 10);
    } while ((rc == -1) && errno == EINTR);
  }
#endif
}

void rktio_wait_until_signal_received(rktio_t *rktio)
{
#ifdef RKTIO_SYSTEM_UNIX
  int r;
# ifdef HAVE_POLL_SYSCALL
  struct pollfd pfd[1];
  pfd[0].fd = rktio->external_event_fd;
  pfd[0].events = POLLIN;
  do {
    r = poll(pfd, 1, -1);
  } while ((r == -1) && (errno == EINTR));
# else
  DECL_FDSET(readfds, 1);

  INIT_DECL_RD_FDSET(readfds);

  RKTIO_FD_ZERO(readfds);
  RKTIO_FD_SET(rktio->external_event_fd, readfds);

  do {
    r = select(rktio->external_event_fd + 1, RKTIO_FDS(readfds), NULL, NULL, NULL);
  } while ((r == -1) && (errno == EINTR));
# endif
#endif
#ifdef RKTIO_SYSTEM_WINDOWS
  WaitForSingleObject(rktio->break_semaphore, INFINITE);
#endif

  rktio_flush_signals_received(rktio);
}

/****************** Windows cleanup *****************/

#ifdef RKTIO_SYSTEM_WINDOWS

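/* After MsgWaitForMultipleObjects returns, restore the count of any
   repost-flagged semaphore that the wait consumed, and drain the
   break semaphore so a stale signal can't cut the next sleep short. */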
static void clean_up_wait(rktio_t *rktio,
                          intptr_t result, HANDLE *array,
                          int *rps, int count)
{
  if ((result >= (intptr_t)WAIT_OBJECT_0) && (result < (intptr_t)WAIT_OBJECT_0 + count)) {
    result -= WAIT_OBJECT_0;
    if (rps[result])
      ReleaseSemaphore(array[result], 1, NULL);
  }

  /* Clear out break semaphore */
  WaitForSingleObject(rktio->break_semaphore, 0);
}

void rkio_reset_sleep_backoff(rktio_t *rktio)
{
  rktio->made_progress = 1;
}

static void prepare_windows_sleep(DWORD msec)
{
  /* The default scheduling granularity is usually 16ms, which
     means that a request to sleep 16ms could easily end up being 31ms,
     and a request to sleep 2ms is likely at least 16ms. We can
     request a finer granularity of scheduling, but do that only
     temporarily, because it may slow down the rest of the system. */
  if (msec < 32)
    timeBeginPeriod((msec >> 1) | 1);
}

static void finish_windows_sleep(DWORD msec)
{
  if (msec < 32)
    timeEndPeriod((msec >> 1) | 1);
}

#endif

/******************** Main sleep function *****************/
/* The simple select() stuff is buried in various kinds of complexity. */

/* FIXME: don't forget SIGCHILD_DOESNT_INTERRUPT_SELECT handling in Racket */

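/* Sleeps for about `nsecs` seconds (or indefinitely if `nsecs` is 0),
   returning early when anything registered in `fds` becomes ready,
   when an external signal arrives, or --- if `lt` is given --- when
   its lightweight poll set has activity. */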
void rktio_sleep(rktio_t *rktio, float nsecs, rktio_poll_set_t *fds, rktio_ltps_t *lt)
{
  if (fds && fdset_has_nosleep(fds))
    return;

  if (fds && lt) {
#if defined(HAVE_KQUEUE_SYSCALL) || defined(HAVE_EPOLL_SYSCALL)
    int fd = rktio_ltps_get_fd(lt);
    /* `fd` can be -1, because the file descriptor is created lazily */
    if (fd != -1) {
      RKTIO_FD_SET(fd, fds);
      RKTIO_FD_SET(fd, RKTIO_GET_FDSET(fds, 2));
    }
#else
    rktio_merge_fd_sets(fds, rktio_ltps_get_fd_set(lt));
#endif
  }

  if (!fds) {
    /* Nothing to block on - just sleep for some amount of time. */
#ifdef RKTIO_SYSTEM_UNIX
# ifdef HAVE_POLL_SYSCALL
    int timeout;
    if (nsecs <= 0.0)
      timeout = -1;
    else {
      timeout = (int)(nsecs * 1000.0);
      if (timeout < 0)
        timeout = 0;
    }
    if (rktio->external_event_fd) {
      struct pollfd pfd[1];
      pfd[0].fd = rktio->external_event_fd;
      pfd[0].events = POLLIN;
      poll(pfd, 1, timeout);
    } else {
      poll(NULL, 0, timeout);
    }
# else
    /* Sleep by selecting on the external event fd */
    struct timeval time;
    intptr_t secs = (intptr_t)nsecs;
    intptr_t usecs = (intptr_t)(fmod(nsecs, 1.0) * 1000000);

    if (nsecs && (nsecs > 100000))
      secs = 100000;
    if (usecs < 0)
      usecs = 0;
    if (usecs >= 1000000)
      usecs = 999999;

    time.tv_sec = secs;
    time.tv_usec = usecs;

    if (rktio->external_event_fd) {
      DECL_FDSET(readfds, 1);

      INIT_DECL_RD_FDSET(readfds);

      RKTIO_FD_ZERO(readfds);
      RKTIO_FD_SET(rktio->external_event_fd, readfds);

      select(rktio->external_event_fd + 1, RKTIO_FDS(readfds), NULL, NULL, &time);
    } else {
      select(0, NULL, NULL, NULL, &time);
    }
# endif
#else
# ifdef RKTIO_SYSTEM_WINDOWS
    {
      DWORD msecs = nsecs * 1000;
      if (msecs > 0) {
        prepare_windows_sleep(msecs);
        Sleep(msecs);
        finish_windows_sleep(msecs);
      }
    }
# else
#  ifndef NO_SLEEP
#   ifndef NO_USLEEP
    /* usleep() takes microseconds; `nsecs` is in seconds */
    usleep((unsigned)(nsecs * 1000000));
#   else
    sleep(nsecs);
#   endif
#  endif
# endif
#endif
  } else {
    /* Something to block on.... */

#ifdef HAVE_POLL_SYSCALL

    /******* poll() variant *******/

    {
      struct rktio_fd_set_data_t *data = fds->data;
      intptr_t count = data->count;
      int timeout;

      if (nsecs <= 0.0)
        timeout = -1;
      else if (nsecs > 100000)
        timeout = 100000000;
      else {
        timeout = (int)(nsecs * 1000.0);
        if (timeout < 0)
          timeout = 0;
      }

      if (rktio->external_event_fd) {
        data->pfd[count].fd = rktio->external_event_fd;
        data->pfd[count].events = POLLIN;
        count++;
      }

      poll(data->pfd, count, timeout);
    }
#elif !defined(RKTIO_SYSTEM_WINDOWS)

    /******* select() variant *******/

    {
      int actual_limit;
      fd_set *rd, *wr, *ex;
      struct timeval time;
      intptr_t secs = (intptr_t)nsecs;
      intptr_t usecs = (intptr_t)(fmod(nsecs, 1.0) * 1000000);

      if (nsecs && (nsecs > 100000))
        secs = 100000;
      if (usecs < 0)
        usecs = 0;
      if (usecs >= 1000000)
        usecs = 999999;

      time.tv_sec = secs;
      time.tv_usec = usecs;

      rd = RKTIO_FDS(fds);
      wr = RKTIO_FDS(RKTIO_GET_FDSET(fds, 1));
      ex = RKTIO_FDS(RKTIO_GET_FDSET(fds, 2));

      actual_limit = rktio_get_fd_limit(fds);

      /* Watch for external events, too: */
      if (rktio->external_event_fd) {
        FD_SET(rktio->external_event_fd, rd);
        if (rktio->external_event_fd >= actual_limit)
          actual_limit = rktio->external_event_fd + 1;
      }

      select(actual_limit, rd, wr, ex, nsecs ? &time : NULL);
    }

#else

    /******* Windows variant *******/

    {
      intptr_t result;
      HANDLE *array, just_two_array[2];
      intptr_t count, rcount;
      int *rps;

      rktio_collapse_win_fd(fds); /* merges */

      rcount = fds->num_handles;
      count = fds->combined_len;
      array = fds->combined_wait_array;
      rps = fds->repost_sema;

      /* add break semaphore: */
      if (!count)
        array = just_two_array;
      array[count++] = rktio->break_semaphore;

      /* Extensions may handle events.
         If the event queue is empty (as reported by GetQueueStatus),
         everything's ok.

         Otherwise, we have trouble sleeping until an event is ready. We
         sometimes leave events on the queue because, say, an eventspace is
         not ready. The problem is that MsgWait... only unblocks when a new
         event appears. Since extensions may check the queue using a sequence of
         PeekMessages, it's possible that an event is added during the
         middle of the sequence, but doesn't get handled.

         To avoid this problem, we don't actually sleep indefinitely if an event
         is pending. Instead, we sleep 10 ms, then 20 ms, etc. This exponential
         backoff ensures that we eventually handle a pending event, but we don't
         spin and eat CPU cycles. The back-off is reset whenever a thread makes
         progress. */

      if (fds->wait_event_mask && GetQueueStatus(fds->wait_event_mask)) {
        if (!rktio->made_progress) {
          /* Ok, we've gone around at least once. */
          if (rktio->max_sleep_time < 0x20000000) {
            rktio->max_sleep_time *= 2;
          }
        } else {
          /* Starting back-off mode */
          rktio->made_progress = 0;
          rktio->max_sleep_time = 5;
        }
      } else {
        /* Disable back-off mode */
        rktio->made_progress = 1;
        rktio->max_sleep_time = 0;
      }

      /* Wait for HANDLE-based input: */
      {
        DWORD msec;
        if (nsecs) {
          if (nsecs > 100000)
            msec = 100000000;
          else
            msec = (DWORD)(nsecs * 1000);
          if (rktio->max_sleep_time && (msec > rktio->max_sleep_time))
            msec = rktio->max_sleep_time;
        } else {
          if (rktio->max_sleep_time)
            msec = rktio->max_sleep_time;
          else
            msec = INFINITE;
        }

        prepare_windows_sleep(msec);
        result = MsgWaitForMultipleObjects(count, array, FALSE, msec, fds->wait_event_mask);
        finish_windows_sleep(msec);
      }
      clean_up_wait(rktio, result, array, rps, rcount);
      rktio_collapse_win_fd(fds); /* cleans up */

      return;
    }
#endif
  }

  rktio_flush_signals_received(rktio);
}