/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2007-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#define WANT_NONBLOCKING

#include "sys.h"
#include "erl_alloc.h"
#include "erl_poll.h"
#include "erl_time.h"
#include "erl_msacc.h"

/*
 * Some debug macros
 */

/*#define HARDDEBUG */
/*#define HARDTRACE */
#ifdef HARDDEBUG
#ifdef HARDTRACE
#define HARDTRACEF(X) my_debug_printf##X
#else
#define HARDTRACEF(X)
#endif

#define HARDDEBUGF(X) my_debug_printf##X
static void my_debug_printf(char *fmt, ...)
{
    char buffer[1024];
    va_list args;

    va_start(args, fmt);
    erts_vsnprintf(buffer,1024,fmt,args);
    va_end(args);
    erts_printf("%s\r\n",buffer);
}
#else
#define HARDTRACEF(X)
#define HARDDEBUGF(X)
#endif

#ifdef DEBUG
#define NoMansLandFill 0xFD	/* fill no-man's land with this */
#define DeadLandFill   0xDD	/* fill free objects with this */
#define CleanLandFill  0xCD	/* fill new objects with this */

static void consistency_check(struct _Waiter* w);
static void* debug_alloc(ErtsAlcType_t, Uint);
static void* debug_realloc(ErtsAlcType_t, void *, Uint, Uint);

#  define SEL_ALLOC	debug_alloc
#  define SEL_REALLOC	debug_realloc
#  define SEL_FREE	erts_free

static void *debug_alloc(ErtsAlcType_t type, Uint size)
{
    void* p = erts_alloc(type, size);
    memset(p, CleanLandFill, size);
    return p;
}

static void *debug_realloc(ErtsAlcType_t type, void *ptr, Uint prev_size,
			   Uint size)
{
    void *p;
    size_t fill_size;
    void *fill_ptr;

    if (prev_size > size) {
	size_t fill_size = (size_t) (prev_size - size);
	void *fill_ptr = (void *) (((char *) ptr) + size);
	memset(fill_ptr, NoMansLandFill, fill_size);
    }

    p = erts_realloc(type, ptr, size);

    if (size > prev_size) {
	size_t fill_size = (size_t) (size - prev_size);
	void *fill_ptr = (void *) (((char *) p) + prev_size);
	memset(fill_ptr, CleanLandFill, fill_size);
    }

    return p;
}
#else
#  define SEL_ALLOC	erts_alloc
#  define SEL_REALLOC	realloc_wrap
#  define SEL_FREE	erts_free

static ERTS_INLINE void *
realloc_wrap(ErtsAlcType_t t, void *p, Uint ps, Uint s)
{
    return erts_realloc(t, p, s);
}
#endif


#ifdef HARD_POLL_DEBUG
#define OP_SELECT 1
#define OP_DESELECT 2
#define OP_FIRED 3
#define OP_READ_BEGIN 4
#define OP_READ_DONE 5
#define OP_WRITE_BEGIN 6
#define OP_WRITE_DONE 7
#define OP_REPORTED 8
#define OP_DIED 9
#define OP_ASYNC_INIT 10
#define OP_ASYNC_IMMED 11
#define OP_FD_MOVED 12

static struct {
    int op;
    ErtsSysFdType active;
    int xdata;
} debug_save_ops[1024];

static int num_debug_save_ops = 0;

static ErtsSysFdType active_debug_fd;
static int active_debug_fd_set = 0;

static erts_mtx_t save_ops_mtx;

static void poll_debug_init(void)
{
    erts_mtx_init(&save_ops_mtx, "save_ops_lock", NIL,
        ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DEBUG);
}

void poll_debug_set_active_fd(ErtsSysFdType fd)
{
    erts_mtx_lock(&save_ops_mtx);
    active_debug_fd_set = 1;
    active_debug_fd = fd;
    erts_mtx_unlock(&save_ops_mtx);
}

static void do_save_op(ErtsSysFdType fd, int op, int xdata)
{
    erts_mtx_lock(&save_ops_mtx);
    if (fd == active_debug_fd && num_debug_save_ops < 1024) {
	int x = num_debug_save_ops++;
	debug_save_ops[x].op = op;
	debug_save_ops[x].active = fd;
	debug_save_ops[x].xdata = xdata;
    }
    erts_mtx_unlock(&save_ops_mtx);
}

void poll_debug_moved(ErtsSysFdType fd, int s1, int s2)
{
    do_save_op(fd,OP_FD_MOVED,s1 | (s2 << 16));
}

void poll_debug_select(ErtsSysFdType fd, int mode)
{
    do_save_op(fd,OP_SELECT,mode);
}

void poll_debug_deselect(ErtsSysFdType fd)
{
    do_save_op(fd,OP_DESELECT,0);
}

void poll_debug_fired(ErtsSysFdType fd)
{
    do_save_op(fd,OP_FIRED,0);
}

void poll_debug_read_begin(ErtsSysFdType fd)
{
    do_save_op(fd,OP_READ_BEGIN,0);
}

void poll_debug_read_done(ErtsSysFdType fd, int bytes)
{
    do_save_op(fd,OP_READ_DONE,bytes);
}

void poll_debug_async_initialized(ErtsSysFdType fd)
{
    do_save_op(fd,OP_ASYNC_INIT,0);
}

void poll_debug_async_immediate(ErtsSysFdType fd, int bytes)
{
    do_save_op(fd,OP_ASYNC_IMMED,bytes);
}

void poll_debug_write_begin(ErtsSysFdType fd)
{
    do_save_op(fd,OP_WRITE_BEGIN,0);
}

void poll_debug_write_done(ErtsSysFdType fd, int bytes)
{
    do_save_op(fd,OP_WRITE_DONE,bytes);
}

void poll_debug_reported(ErtsSysFdType fd, int mode)
{
    do_save_op(fd,OP_REPORTED,mode);
}

void poll_debug_died(ErtsSysFdType fd)
{
    do_save_op(fd,OP_DIED,0);
}

#endif /* HARD_POLL_DEBUG */

/*
 * End of debug macros
 */



/*
 * Handles that we poll, but that are actually signalled from outside
 * this module
 */

extern HANDLE erts_service_event;
extern HANDLE erts_sys_break_event;


/*
 * The structure we hold for each event (i.e. fd)
 */
typedef struct _EventData {
    HANDLE event;		/* For convenience. */
    ErtsPollEvents mode;	/* The current select mode. */
    struct _EventData *next;	/* Next in free or delete lists. */
} EventData;

/*
 * The structure to represent a waiter thread
 */
typedef struct _Waiter {
    HANDLE events[MAXIMUM_WAIT_OBJECTS];     /* The events. */
    EventData* evdata[MAXIMUM_WAIT_OBJECTS]; /* Pointers to associated data. */
    int active_events;		/* Number of events to wait for */
    int total_events;		/* Total number of events in the arrays. */
    int highwater;              /* Events processed up to here */
    EventData evdata_heap[MAXIMUM_WAIT_OBJECTS]; /* Pre-allocated EventDatas */
    EventData* first_free_evdata; /* Pointer to first free EventData object. */
    HANDLE go_ahead;		/* The waiter may continue. (Auto-reset) */
    void *xdata;                /* Used as thread parameter */
    erts_tid_t this;            /* Thread "handle" of this waiter */
    erts_mtx_t mtx;             /* Mutex for updating/reading the pollset; the
				   currently used set requires thread stopping
				   to be updated */
} Waiter;
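
/*
 * Layout of the events[]/evdata[] arrays (a sketch, inferred from the
 * code below): slot 0 is the waiter's own control event (signalled by
 * STOP_WAITER/START_WAITER), slots [1, active_events) are handles the
 * thread is currently waiting on, slots [active_events, highwater) are
 * handles that have fired but not yet been reported by erts_poll_wait(),
 * and slots [highwater, total_events) hold the remaining, temporarily
 * parked handles.
 */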

/*
 * The structure for a pollset. There can currently be only one...
 */
struct erts_pollset {
    Waiter** waiter;
    int allocated_waiters;  /* Size of waiter array */
    int num_waiters;	    /* Number of waiter threads. */
    HANDLE event_io_ready;     /* To be used when waiting for I/O */
    /* These are used to wait for workers to enter standby */
    volatile int standby_wait_counter; /* Number of threads to wait for */
    CRITICAL_SECTION standby_crit;     /* CS to guard the counter */
    HANDLE standby_wait_event;         /* Event signalled when counter == 0 */
    erts_atomic32_t wakeup_state;
    erts_mtx_t mtx;
};


#define ERTS_POLLSET_LOCK(PS) \
  erts_mtx_lock(&(PS)->mtx)
#define ERTS_POLLSET_UNLOCK(PS) \
  erts_mtx_unlock(&(PS)->mtx)


/*
 * Communication with sys_interrupt
 */

extern erts_atomic32_t erts_break_requested;
#define ERTS_SET_BREAK_REQUESTED \
  erts_atomic32_set_nob(&erts_break_requested, (erts_aint32_t) 1)
#define ERTS_UNSET_BREAK_REQUESTED \
  erts_atomic32_set_nob(&erts_break_requested, (erts_aint32_t) 0)

static erts_mtx_t break_waiter_lock;
static HANDLE break_happened_event;
static erts_atomic32_t break_waiter_state;
#define BREAK_WAITER_GOT_BREAK 1
#define BREAK_WAITER_GOT_HALT 2


/*
 * Forward declarations
 */

static void *threaded_waiter(void *param);
static void *break_waiter(void *param);

/*
 * Synchronization macros and functions
 */
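/*
 * STOP_WAITER(S) parks waiter threads by signalling each waiter's
 * control event (events[0]) and then blocking in wait_standby() until
 * every stopped thread has called signal_standby(); START_WAITER(S)
 * lets them continue by setting their auto-reset go_ahead events.
 */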
#define START_WAITER(PS, w) \
    SetEvent((w)->go_ahead)

#define STOP_WAITER(PS,w) \
do { \
    setup_standby_wait((PS),1); \
    SetEvent((w)->events[0]); \
    wait_standby(PS); \
} while(0)

#define START_WAITERS(PS) \
do { \
    int i; \
    for (i = 0; i < (PS)->num_waiters; i++) { \
	SetEvent((PS)->waiter[i]->go_ahead); \
    } \
 } while(0)

#define STOP_WAITERS(PS) \
do { \
    int i; \
    setup_standby_wait((PS),(PS)->num_waiters); \
    for (i = 0; i < (PS)->num_waiters; i++) { \
	SetEvent((PS)->waiter[i]->events[0]); \
    } \
    wait_standby(PS); \
 } while(0)

#define ERTS_POLL_NOT_WOKEN		((erts_aint32_t) 0)
#define ERTS_POLL_WOKEN_IO_READY	((erts_aint32_t) 1)
#define ERTS_POLL_WOKEN_INTR		((erts_aint32_t) 2)
#define ERTS_POLL_WOKEN_TIMEDOUT	((erts_aint32_t) 3)
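
/*
 * wakeup_state transitions (as implemented below): each wait starts in
 * ERTS_POLL_NOT_WOKEN; waiter threads move it to WOKEN_IO_READY via
 * notify_io_ready(), interrupts move it to WOKEN_INTR unless I/O is
 * already pending, and erts_poll_wait() itself records WOKEN_TIMEDOUT
 * on a plain timeout. In wake_poller(), whoever performs the first
 * transition away from NOT_WOKEN also signals event_io_ready.
 */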

static ERTS_INLINE int
is_io_ready(ErtsPollSet *ps)
{
    return erts_atomic32_read_nob(&ps->wakeup_state) == ERTS_POLL_WOKEN_IO_READY;
}

static ERTS_INLINE void
woke_up(ErtsPollSet *ps, int waketype)
{
    if (erts_atomic32_read_nob(&ps->wakeup_state) == ERTS_POLL_NOT_WOKEN)
	erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
				  waketype,
				  ERTS_POLL_NOT_WOKEN);
#ifdef DEBUG
    {
	erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
	switch (wakeup_state) {
	case ERTS_POLL_WOKEN_IO_READY:
	case ERTS_POLL_WOKEN_INTR:
	case ERTS_POLL_WOKEN_TIMEDOUT:
	    break;
	default:
	    ASSERT(0);
	    break;
	}
    }
#endif
}

static ERTS_INLINE int
wakeup_cause(ErtsPollSet *ps)
{
    int res;
    erts_aint32_t wakeup_state = erts_atomic32_read_acqb(&ps->wakeup_state);
    switch (wakeup_state) {
    case ERTS_POLL_WOKEN_IO_READY:
	res = 0;
	break;
    case ERTS_POLL_WOKEN_INTR:
	res = EINTR;
	break;
    case ERTS_POLL_WOKEN_TIMEDOUT:
	res = ETIMEDOUT;
	break;
    default:
	res = 0;
	erts_exit(ERTS_ABORT_EXIT,
		 "%s:%d: Internal error: Invalid wakeup_state=%d\n",
		 __FILE__, __LINE__, (int) wakeup_state);
    }
    return res;
}

static ERTS_INLINE void
wake_poller(ErtsPollSet *ps, int io_ready)
{
    erts_aint32_t wakeup_state;
    if (io_ready) {
        wakeup_state = erts_atomic32_xchg_relb(&ps->wakeup_state,
                                               ERTS_POLL_WOKEN_IO_READY);
    }
    else {
	ERTS_THR_MEMORY_BARRIER;
	wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
	while (wakeup_state != ERTS_POLL_WOKEN_IO_READY
	       && wakeup_state != ERTS_POLL_WOKEN_INTR) {
	    erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
							  ERTS_POLL_WOKEN_INTR,
							  wakeup_state);
	    if (act == wakeup_state) {
		wakeup_state = act;
		break;
	    }
	    wakeup_state = act;
	}
    }
    if (wakeup_state == ERTS_POLL_NOT_WOKEN) {
	/*
	 * Since we don't know the internals of SetEvent() we issue
	 * a memory barrier as a safety precaution ensuring that
	 * the store we just made to wakeup_state won't be reordered
	 * with loads in SetEvent().
	 */
	ERTS_THR_MEMORY_BARRIER;
	SetEvent(ps->event_io_ready);
    }
}

static ERTS_INLINE void
reset_io_ready(ErtsPollSet *ps)
{
    erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
}

static ERTS_INLINE void
restore_io_ready(ErtsPollSet *ps)
{
    erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY);
}

/*
 * notify_io_ready() is used by waiter threads to notify the poller
 * thread that I/O is ready.
 */
static ERTS_INLINE void
notify_io_ready(ErtsPollSet *ps)
{
    wake_poller(ps, 1);
}

static ERTS_INLINE void
reset_interrupt(ErtsPollSet *ps)
{
    /* We need to keep io-ready if set */
    erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
    while (wakeup_state != ERTS_POLL_WOKEN_IO_READY
	   && wakeup_state != ERTS_POLL_NOT_WOKEN) {
	erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
						      ERTS_POLL_NOT_WOKEN,
						      wakeup_state);
	if (wakeup_state == act)
	    break;
	wakeup_state = act;
    }
    ERTS_THR_MEMORY_BARRIER;
}

static ERTS_INLINE void
set_interrupt(ErtsPollSet *ps)
{
    wake_poller(ps, 0);
}

static void setup_standby_wait(ErtsPollSet *ps, int num_threads)
{
    EnterCriticalSection(&(ps->standby_crit));
    ps->standby_wait_counter = num_threads;
    ResetEvent(ps->standby_wait_event);
    LeaveCriticalSection(&(ps->standby_crit));
}

static void signal_standby(ErtsPollSet *ps)
{
    EnterCriticalSection(&(ps->standby_crit));
    --(ps->standby_wait_counter);
    if (ps->standby_wait_counter < 0) {
	LeaveCriticalSection(&(ps->standby_crit));
	erts_exit(ERTS_ERROR_EXIT,"Standby signalled by more threads than expected");
    }
    if (!(ps->standby_wait_counter)) {
	SetEvent(ps->standby_wait_event);
    }
    LeaveCriticalSection(&(ps->standby_crit));
}

static void wait_standby(ErtsPollSet *ps)
{
    WaitForSingleObject(ps->standby_wait_event,INFINITE);
}

static void remove_event_from_set(Waiter *w, int j)
{
    w->evdata[j]->event = INVALID_HANDLE_VALUE;
    w->evdata[j]->mode = 0;
    w->evdata[j]->next = w->first_free_evdata;
    w->first_free_evdata = w->evdata[j];

    /*
     * If the event is active, we will overwrite it
     * with the last active event and make the hole
     * the first non-active event.
     */

    if (j < w->active_events) {
	w->active_events--;
	w->highwater--;
	w->total_events--;
	w->events[j] = w->events[w->active_events];
	w->evdata[j] = w->evdata[w->active_events];
	w->events[w->active_events] = w->events[w->highwater];
	w->evdata[w->active_events] = w->evdata[w->highwater];
	w->events[w->highwater] =  w->events[w->total_events];
	w->evdata[w->highwater] =  w->evdata[w->total_events];
    } else if (j < w->highwater) {
	w->highwater--;
	w->total_events--;
	w->events[j] = w->events[w->highwater];
	w->evdata[j] = w->evdata[w->highwater];
	w->events[w->highwater] =  w->events[w->total_events];
	w->evdata[w->highwater] =  w->evdata[w->total_events];
    } else {
	w->total_events--;
	w->events[j] = w->events[w->total_events];
	w->evdata[j] = w->evdata[w->total_events];
    }

#ifdef DEBUG
    w->events[w->total_events] = (HANDLE) CleanLandFill;
    w->evdata[w->total_events] = (EventData *) CleanLandFill;
    consistency_check(w);
#endif
}

/*
 * Thread handling
 */

#ifdef DEBUG
static void consistency_check(Waiter* w)
{
    int i;

    ASSERT(w->active_events <= w->total_events);
    ASSERT(w->evdata[0] == NULL);

    for (i = 1; i < w->total_events; i++) {
	ASSERT(w->events[i] == w->evdata[i]->event);
	ASSERT(w->evdata[i]->mode != 0);
    }
}

#endif

static void new_waiter(ErtsPollSet *ps)
{
    register Waiter* w;
    DWORD tid;			/* Id for thread. */
    erts_tid_t thread;
    int i;
    int tres;

    if (ps->num_waiters == ps->allocated_waiters) {
	Uint old_size = sizeof(Waiter *)*ps->allocated_waiters;
	ps->allocated_waiters += 64;
	ps->waiter = SEL_REALLOC(ERTS_ALC_T_WAITER_OBJ,
				 (void *) ps->waiter,
				 old_size,
				 sizeof(Waiter *) * (ps->allocated_waiters));
    }

    w = (Waiter *) SEL_ALLOC(ERTS_ALC_T_WAITER_OBJ, sizeof(Waiter));
    ps->waiter[ps->num_waiters] = w;

    w->events[0] = CreateAutoEvent(FALSE);
    w->evdata[0] = NULL;	/* Should never be used. */
    w->active_events = 1;
    w->highwater = 1;
    w->total_events = 1;
    erts_mtx_init(&w->mtx, "pollwaiter", NIL, ERTS_LOCK_FLAGS_CATEGORY_IO);


    /*
     * Form the free list of EventData objects.
     */

    w->evdata_heap[0].next = 0;	/* Last in free list. */
    for (i = 1; i < MAXIMUM_WAIT_OBJECTS; i++) {
	w->evdata_heap[i].next = w->evdata_heap+i-1;
    }
    w->first_free_evdata = w->evdata_heap+MAXIMUM_WAIT_OBJECTS-1;

    /*
     * Create the other events.
     */

    w->go_ahead = CreateAutoEvent(FALSE);

    /*
     * Create the thread.
     */
    w->xdata = ps;
    erts_thr_create(&thread, &threaded_waiter, w, NULL);
    w->this = thread;

    /*
     * Finally, done.
     */

    (ps->num_waiters)++;
}

static void *break_waiter(void *param)
{
    HANDLE harr[2];
    int i = 0;
    harr[i++] = erts_sys_break_event;
    if (erts_service_event != NULL) {
	harr[i++] = erts_service_event;
    }

    for(;;) {
	switch (WaitForMultipleObjects(i,harr,FALSE,INFINITE)) {
	case WAIT_OBJECT_0:
	    ResetEvent(harr[0]);
	    erts_mtx_lock(&break_waiter_lock);
	    erts_atomic32_set_nob(&break_waiter_state,BREAK_WAITER_GOT_BREAK);
	    ERTS_THR_MEMORY_BARRIER;
	    SetEvent(break_happened_event);
	    erts_mtx_unlock(&break_waiter_lock);
	    break;
	case (WAIT_OBJECT_0+1):
	    ResetEvent(harr[1]);
	    erts_mtx_lock(&break_waiter_lock);
	    erts_atomic32_set_nob(&break_waiter_state,BREAK_WAITER_GOT_HALT);
	    ERTS_THR_MEMORY_BARRIER;
	    SetEvent(break_happened_event);
	    erts_mtx_unlock(&break_waiter_lock);
	    break;
	default:
	    erts_exit(ERTS_ERROR_EXIT,"Unexpected event in break_waiter");
	}
    }
}
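
/*
 * Each waiter thread loops here: it blocks on its go_ahead event until
 * the main thread starts it, then blocks in WaitForMultipleObjects() on
 * events[0..active_events). If events[0] (the control event) fires, the
 * thread parks itself via signal_standby(); if any other handle fires,
 * it is swapped out of the active region and the poller is notified
 * through notify_io_ready().
 */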

static void *threaded_waiter(void *param)
{
    register Waiter* w = (Waiter *) param;
    ErtsPollSet *ps = (ErtsPollSet*) w->xdata;
#ifdef HARD_POLL_DEBUG2
    HANDLE oold_fired[64];
    int num_oold_fired;
    HANDLE old_fired[64];
    int num_old_fired = 0;
    HANDLE fired[64];
    int num_fired = 0;
    HANDLE errors[1024];
    int num_errors = 0;
    HANDLE save_events[64];
    int save_active_events;
    int save_total_events;
    int save_highwater;
#endif

 again:
    WaitForSingleObject(w->go_ahead, INFINITE);
    /* Atomic enough when just checking, skip lock */
    if (w->total_events == 0) {
	return NULL;
    }
    if (w->active_events == 0) {
	goto again;
    }
    ASSERT(w->evdata[0] == NULL);
#ifdef HARD_POLL_DEBUG2
    num_oold_fired = num_old_fired;
    memcpy(oold_fired,old_fired,num_old_fired*sizeof(HANDLE));
    num_old_fired = num_fired;
    memcpy(old_fired,fired,num_fired*sizeof(HANDLE));
    num_fired = 0;
#endif
    for (;;) {
	int i;
	int j;
#ifdef HARD_POLL_DEBUG2
	erts_mtx_lock(&w->mtx);
	memcpy(save_events,w->events,w->active_events*sizeof(HANDLE));
	save_active_events = w->active_events;
	save_total_events = w->total_events;
	save_highwater = w->highwater;
	erts_mtx_unlock(&w->mtx);
#endif
	i = WaitForMultipleObjects(w->active_events, w->events, FALSE, INFINITE);
	switch (i) {
	case WAIT_FAILED:
	    DEBUGF(("Wait failed: %s\n", last_error()));
	    erts_mtx_lock(&w->mtx);
	    /* Don't wait for our own signal event */
	    for (j = 1; j < w->active_events; j++) {
		int tmp;
		if ((tmp = WaitForSingleObject(w->events[j], 0))
		    == WAIT_FAILED) {
		    DEBUGF(("Invalid handle: i = %d, handle = 0x%0x\n",
			    j, w->events[j]));
#ifdef HARD_POLL_DEBUG2
		    if (num_errors < 1024)
			errors[num_errors++] = w->events[j];
#endif
#ifdef HARD_POLL_DEBUG
		    poll_debug_died(w->events[j]);
#endif
		    remove_event_from_set(w,j);
#ifdef DEBUG
		    consistency_check(w);
#endif
		} else if (tmp == WAIT_OBJECT_0) {
		    i = WAIT_OBJECT_0 + j;
		    goto event_happened;
		}
	    }
	    erts_mtx_unlock(&w->mtx);
	    break;
	case WAIT_OBJECT_0:
	    signal_standby(ps);
	    goto again;
#ifdef DEBUG
	case WAIT_TIMEOUT:
	    ASSERT(0);
#endif
	default:
	    erts_mtx_lock(&w->mtx);
#ifdef HARD_POLL_DEBUG2
	    {
		int x = memcmp(save_events,w->events,w->active_events*sizeof(HANDLE));
		ASSERT(x == 0 && save_active_events == w->active_events);
	    }
#endif
event_happened:
#ifdef DEBUG
	    consistency_check(w);
#endif
	    ASSERT(WAIT_OBJECT_0 < i && i < WAIT_OBJECT_0+w->active_events);
	    notify_io_ready(ps);

	    /*
	     * The main thread won't start working on our arrays until we're
	     * stopped, so we can work in peace even though the main thread
	     * is running.
	     */
	    ASSERT(i >= WAIT_OBJECT_0+1);
	    i -= WAIT_OBJECT_0;
	    ASSERT(i >= 1);
	    HARDDEBUGF(("i = %d, a,h,t = %d,%d,%d",i,
			w->active_events, w->highwater, w->total_events));
	    w->active_events--;
#ifdef HARD_POLL_DEBUG2
	    fired[num_fired++] = w->events[i];
#endif
#ifdef HARD_POLL_DEBUG
	    poll_debug_fired(w->events[i]);
#endif
	    if (i < w->active_events) {
		HANDLE te = w->events[i];
		EventData* tp = w->evdata[i];
		w->events[i] = w->events[w->active_events];
		w->evdata[i] = w->evdata[w->active_events];
		w->events[w->active_events] = te;
		w->evdata[w->active_events] = tp;
	    }
	    HARDDEBUGF(("i = %d, a,h,t = %d,%d,%d",i,
			w->active_events, w->highwater, w->total_events));
#ifdef DEBUG
	    consistency_check(w);
#endif
	    erts_mtx_unlock(&w->mtx);
	    break;
	}
    }
}

/*
 * The actual adding and removing from pollset utilities
 */

static int set_driver_select(ErtsPollSet *ps, HANDLE event, ErtsPollEvents mode)
{
    int i;
    int best_waiter = -1;	/* The waiter with lowest number of events. */
    int lowest = MAXIMUM_WAIT_OBJECTS; /* Lowest number of events
					* in any waiter.
					*/
    EventData* ev;
    Waiter* w;

    /*
     * Find the waiter which is least busy.
     */

#ifdef HARD_POLL_DEBUG
    poll_debug_select(event, mode);
#endif

    /* total_events can no longer be read without the lock; it is changed by the waiter */
    for (i = 0; i < ps->num_waiters; i++) {
	erts_mtx_lock(&(ps->waiter[i]->mtx));
	if (ps->waiter[i]->total_events < lowest) {
	    lowest = ps->waiter[i]->total_events;
	    best_waiter = i;
	}
	erts_mtx_unlock(&(ps->waiter[i]->mtx));
    }

    /*
     * Stop the selected waiter, or start a new waiter if all were busy.
     */

    if (best_waiter >= 0) {
	w = ps->waiter[best_waiter];
	STOP_WAITER(ps,w);
	erts_mtx_lock(&w->mtx);
    } else {
	new_waiter(ps);
	w = ps->waiter[(ps->num_waiters)-1];
	erts_mtx_lock(&w->mtx);
    }

#ifdef DEBUG
    consistency_check(w);
#endif

    /*
     * Allocate and initialize an EventData structure.
     */

    ev = w->first_free_evdata;
    w->first_free_evdata = ev->next;
    ev->event = event;
    ev->mode = mode;
    ev->next = NULL;

    /*
     * At this point, the selected waiter (newly-created or not) is
     * standing by.  Put the new event into the active part of the array.
     */

    if (w->active_events < w->total_events) {
	/*
	 * Move the first event beyond the active part of the array to
	 * the very end to make room for the new event.
	 */

#ifdef HARD_POLL_DEBUG
	poll_debug_moved(w->events[w->highwater],w->highwater,w->total_events);
#endif
	w->events[w->total_events] = w->events[w->highwater];
	w->evdata[w->total_events] = w->evdata[w->highwater];
#ifdef HARD_POLL_DEBUG
	poll_debug_moved(w->events[w->active_events],w->active_events,w->highwater);
#endif
	w->events[w->highwater] = w->events[w->active_events];
	w->evdata[w->highwater] = w->evdata[w->active_events];

    }
    w->events[w->active_events] = event;
    w->evdata[w->active_events] = ev;
    w->active_events++;
    w->highwater++;
    w->total_events++;

#ifdef DEBUG
    consistency_check(w);
#endif
    erts_mtx_unlock(&w->mtx);
    START_WAITER(ps,w);
    HARDDEBUGF(("%d: add select %d %d %d %d", event, best_waiter,
		w->active_events,w->highwater,w->total_events));
    return mode;
}


static int cancel_driver_select(ErtsPollSet *ps, HANDLE event)
{
    int i;

    ASSERT(event != INVALID_HANDLE_VALUE);
 restart:
    for (i = 0; i < ps->num_waiters; i++) {
	Waiter* w = ps->waiter[i];
	int j;

	erts_mtx_lock(&w->mtx);
#ifdef DEBUG
	consistency_check(w);
#endif
	for (j = 0; j < w->total_events; j++) {
	    if (w->events[j] == event) {
		int stopped = 0;
		/*
		 * Free the event's EventData structure.
		 */

		if (j < w->active_events) {
		    HARDDEBUGF(("Stopped in remove select"));
		    stopped = 1;
		    erts_mtx_unlock(&w->mtx);
		    STOP_WAITER(ps,w);
		    erts_mtx_lock(&w->mtx);
		    if ( j >= w->active_events || w->events[j] != event) {
			/* things happened while unlocked */
			START_WAITER(ps,w);
			erts_mtx_unlock(&w->mtx);
			goto restart;
		    }
		}
#ifdef HARD_POLL_DEBUG
		poll_debug_deselect(w->events[j]);
#endif
		remove_event_from_set(w, j);
		if (stopped) {
		    START_WAITER(ps,w);
		}
		HARDDEBUGF(("removed select %d,%d %d %d %d",i,j,
			    w->active_events,w->highwater,w->total_events));
		break;
	    }
	}
	erts_mtx_unlock(&w->mtx);
    }
    return 0;
}

/*
 * Interface functions
 */

void  erts_poll_interrupt(ErtsPollSet *ps, int set /* bool */)
{
    HARDTRACEF(("In erts_poll_interrupt(%p, %d)",ps,set));
    if (!set)
	reset_interrupt(ps);
    else
	set_interrupt(ps);
    HARDTRACEF(("Out erts_poll_interrupt(%p, %d)",ps,set));
}


/*
 * Windows is special: there is actually only one event type, and
 * the only difference between ERTS_POLL_EV_IN and ERTS_POLL_EV_OUT
 * is which driver callback will eventually be called.
 */
static ErtsPollEvents do_poll_control(ErtsPollSet *ps,
                                      ErtsSysFdType fd,
                                      ErtsPollOp op,
                                      ErtsPollEvents pe)
{
    HANDLE event = (HANDLE) fd;
    ErtsPollEvents mode;
    ErtsPollEvents result;
    ASSERT(event != INVALID_HANDLE_VALUE);

    if (op != ERTS_POLL_OP_DEL) {
	if (pe & ERTS_POLL_EV_IN || !(pe & ERTS_POLL_EV_OUT )) {
	    mode = ERTS_POLL_EV_IN;
	} else {
	    mode = ERTS_POLL_EV_OUT; /* ready output only in this case */
	}
	result = set_driver_select(ps, event, mode);
    } else {
	result = cancel_driver_select(ps, event);
    }
    return result;
}

ErtsPollEvents erts_poll_control(ErtsPollSet *ps,
				 ErtsSysFdType fd,
                                 ErtsPollOp op,
				 ErtsPollEvents pe,
				 int* do_wake) /* In: Wake up polling thread */
				               /* Out: Poller is woken */
{
    ErtsPollEvents result;
    HARDTRACEF(("In erts_poll_control(0x%08X, %s, %s)",
                (unsigned long) fd, op2str(op), ev2str(pe)));
    ERTS_POLLSET_LOCK(ps);
    result=do_poll_control(ps, fd, op, pe);
    ERTS_POLLSET_UNLOCK(ps);
    *do_wake = 0; /* Never any need to wake polling threads on Windows */
    HARDTRACEF(("Out erts_poll_control -> %u",(unsigned) result));
    return result;
}

int erts_poll_wait(ErtsPollSet *ps,
		   ErtsPollResFd pr[],
		   int *len,
                   ErtsThrPrgrData *tpd,
                   Sint64 timeout_in)
{
    int no_fds;
    DWORD timeout = timeout_in == -1 ? INFINITE : timeout_in;
    EventData* ev;
    int res = 0;
    int num = 0;
    int n;
    int i;
    int break_state;

    HARDTRACEF(("In erts_poll_wait"));
    ERTS_POLLSET_LOCK(ps);

    no_fds = *len;

#ifdef ERTS_POLL_MAX_RES
    if (no_fds >= ERTS_POLL_MAX_RES)
	no_fds = ERTS_POLL_MAX_RES;
#endif

    ResetEvent(ps->event_io_ready);
    /*
     * Since we don't know the internals of ResetEvent() we issue
     * a memory barrier as a safety precaution ensuring that
     * the load of wakeup_state won't be reordered with stores made
     * by ResetEvent().
     */
    ERTS_THR_MEMORY_BARRIER;
    if (erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN)
       timeout = (DWORD) 0;

    if (!erts_atomic32_read_nob(&break_waiter_state)) {
	HANDLE harr[2] = {ps->event_io_ready, break_happened_event};
	int num_h = 2, handle;
        ERTS_MSACC_PUSH_STATE();

	HARDDEBUGF(("Start waiting %d [%d]",num_h, (int) timeout));
	ERTS_POLLSET_UNLOCK(ps);
	erts_thr_progress_prepare_wait(tpd);
        ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP);
	handle = WaitForMultipleObjects(num_h, harr, FALSE, timeout);
	erts_thr_progress_finalize_wait(tpd);
        ERTS_MSACC_POP_STATE();
	ERTS_POLLSET_LOCK(ps);
	HARDDEBUGF(("Stop waiting %d [%d]",num_h, (int) timeout));
        if (handle == WAIT_TIMEOUT)
            woke_up(ps, ERTS_POLL_WOKEN_TIMEDOUT);
    }

    ERTS_UNSET_BREAK_REQUESTED;
    if(erts_atomic32_read_nob(&break_waiter_state)) {
	erts_mtx_lock(&break_waiter_lock);
	break_state = erts_atomic32_read_nob(&break_waiter_state);
	erts_atomic32_set_nob(&break_waiter_state,0);
	ResetEvent(break_happened_event);
	erts_mtx_unlock(&break_waiter_lock);
	switch (break_state) {
	case  BREAK_WAITER_GOT_BREAK:
            woke_up(ps, ERTS_POLL_WOKEN_INTR);
	    ERTS_SET_BREAK_REQUESTED;
            /* Wake the aux thread so that it handles the break */
            erts_aux_thread_poke();
	    break;
	case  BREAK_WAITER_GOT_HALT:
	    erts_exit(0,"");
	    break;
	default:
	    break;
	}
    }

    res = wakeup_cause(ps);
    if (res != 0) {
	HARDDEBUGF(("%s!", res == EINTR ? "EINTR" : "ETIMEDOUT"));
	goto done;
    }

    reset_io_ready(ps);

    n = ps->num_waiters;

    for (i = 0; i < n; i++) {
	Waiter* w = ps->waiter[i];
	int j;
	int first;
	int last;
	erts_mtx_lock(&w->mtx);
#ifdef DEBUG
	consistency_check(w);
#endif

	first = w->active_events;
	last = w->highwater;
	w->highwater = w->active_events;

	for (j = last-1; j >= first; --j) {
	    if (num >= no_fds) {
		w->highwater=j+1;
		erts_mtx_unlock(&w->mtx);
		/* This might mean we still have data to report,
		   restore flag indicating I/O ready! */
		restore_io_ready(ps);
		HARDDEBUGF(("Too many FDs to report!"));
		goto done;
	    }
	    HARDDEBUGF(("Report %d,%d",i,j));
	    ERTS_POLL_RES_SET_FD(&pr[num], w->events[j]);
	    ERTS_POLL_RES_SET_EVTS(&pr[num], w->evdata[j]->mode);
            remove_event_from_set(w, j);
#ifdef HARD_POLL_DEBUG
	    poll_debug_reported(w->events[j],w->highwater | (j << 16));
	    poll_debug_reported(w->events[j],first | (last << 16));
#endif
	    ++num;
	}

        w->total_events = w->highwater = w->active_events;

#ifdef DEBUG
	consistency_check(w);
#endif
	erts_mtx_unlock(&w->mtx);
    }
 done:
    *len = num;
    ERTS_POLLSET_UNLOCK(ps);
    HARDTRACEF(("Out erts_poll_wait"));
    return res;

}

int erts_poll_max_fds(void)
{
    int res = sys_max_files();
    HARDTRACEF(("In/Out erts_poll_max_fds -> %d",res));
    return res;
}

void erts_poll_info(ErtsPollSet *ps,
		    ErtsPollInfo *pip)
{
    Uint size = 0;
    Uint num_events = 0;
    int i;

    HARDTRACEF(("In erts_poll_info"));
    ERTS_POLLSET_LOCK(ps);

    size += sizeof(struct erts_pollset);
    size += sizeof(Waiter *) * ps->allocated_waiters;
    for (i = 0; i < ps->num_waiters; ++i) {
	Waiter *w = ps->waiter[i];
	if (w != NULL) {
	    size += sizeof(Waiter);
	    erts_mtx_lock(&w->mtx);
	    size += sizeof(EventData) * w->total_events;
	    num_events += (w->total_events - 1); /* First event is internal */
	    erts_mtx_unlock(&w->mtx);
	}
    }

    pip->primary = "WaitForMultipleObjects";

    pip->kernel_poll = NULL;

    pip->memory_size = size;

    pip->poll_set_size = num_events;

    pip->lazy_updates = 0;

    pip->pending_updates = 0;

    pip->batch_updates = 0;

    pip->concurrent_updates = 0;

    pip->is_fallback = 0;
    ERTS_POLLSET_UNLOCK(ps);

    pip->max_fds = erts_poll_max_fds();
    HARDTRACEF(("Out erts_poll_info"));

}

ErtsPollSet *erts_poll_create_pollset(int no)
{
    ErtsPollSet *ps = SEL_ALLOC(ERTS_ALC_T_POLLSET,
			       sizeof(struct erts_pollset));
    HARDTRACEF(("In erts_poll_create_pollset"));

    ps->num_waiters = 0;
    ps->allocated_waiters = 64;
    ps->waiter = SEL_ALLOC(ERTS_ALC_T_WAITER_OBJ,
			   sizeof(Waiter *)*ps->allocated_waiters);
    InitializeCriticalSection(&(ps->standby_crit));
    ps->standby_wait_counter = 0;
    ps->event_io_ready = CreateManualEvent(FALSE);
    ps->standby_wait_event = CreateManualEvent(FALSE);

    erts_atomic32_init_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
    erts_mtx_init(&ps->mtx, "pollset", NIL, ERTS_LOCK_FLAGS_CATEGORY_IO);

    HARDTRACEF(("Out erts_poll_create_pollset"));
    return ps;
}

void erts_poll_destroy_pollset(ErtsPollSet *ps)
{
    int i;
    HARDTRACEF(("In erts_poll_destroy_pollset"));
    ERTS_POLLSET_LOCK(ps);
    STOP_WAITERS(ps);
    for (i=0;i<ps->num_waiters;++i) {
	Waiter *w = ps->waiter[i];
	void *dummy;
	erts_tid_t t = w->this;
	/* Assume we're alone, no locking here... */
	w->active_events = w->total_events = w->highwater = 0;
	START_WAITER(ps,w);
	erts_thr_join(t,&dummy);
	CloseHandle(w->go_ahead);
	CloseHandle(w->events[0]);
	erts_mtx_destroy(&w->mtx);
	SEL_FREE(ERTS_ALC_T_WAITER_OBJ, (void *) w);
    }
    SEL_FREE(ERTS_ALC_T_WAITER_OBJ,ps->waiter);
    CloseHandle(ps->event_io_ready);
    CloseHandle(ps->standby_wait_event);
    ERTS_POLLSET_UNLOCK(ps);
    erts_mtx_destroy(&ps->mtx);
    SEL_FREE(ERTS_ALC_T_POLLSET, (void *) ps);
    HARDTRACEF(("Out erts_poll_destroy_pollset"));
}

/*
 * Actually mostly initializes the friend module sys_interrupt...
 */
void  erts_poll_init(int *concurrent_updates)
{

#ifdef HARD_POLL_DEBUG
    poll_debug_init();
#endif

    if (concurrent_updates)
        *concurrent_updates = 0;

    HARDTRACEF(("In erts_poll_init"));
    erts_sys_break_event = CreateManualEvent(FALSE);

    erts_mtx_init(&break_waiter_lock, "break_waiter_lock", NIL,
        ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO);
    break_happened_event = CreateManualEvent(FALSE);
    erts_atomic32_init_nob(&break_waiter_state, 0);

    HARDTRACEF(("Out erts_poll_init"));
}

void erts_poll_late_init(void)
{
    erts_tid_t thread;
    erts_thr_create(&thread, &break_waiter, NULL, NULL);
    ERTS_UNSET_BREAK_REQUESTED;
}

/*
 * Non-Windows-friendly interface, not used since fd's are not
 * contiguous on Windows.
 */
void  erts_poll_get_selected_events(ErtsPollSet *ps,
				    ErtsPollEvents ev[],
				    int len)
{
    int i;
    HARDTRACEF(("In erts_poll_get_selected_events"));
    for (i = 0; i < len; ++i)
	ev[i] = ERTS_POLL_EV_NONE;
    HARDTRACEF(("Out erts_poll_get_selected_events"));
}