1 /*
2 * Copyright (c) 2001-2015 Hypertriton, Inc. <http://hypertriton.com/>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
15 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR
18 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
20 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
21 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
22 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
23 * USE OF THIS SOFTWARE EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26 /*
27 * Implementation of AG_Object events / virtual functions, as well as
28 * the generic AG_EventLoop(3) interface.
29 */
30
31 #include <agar/core/core.h>
32
33 #include <string.h>
34 #include <stdarg.h>
35
36 #include <agar/config/have_kqueue.h>
37 #include <agar/config/have_timerfd.h>
38 #include <agar/config/have_select.h>
39 #include <agar/config/ag_debug_core.h>
40
41 #if defined(HAVE_KQUEUE)
42 # ifdef __NetBSD__
43 # define _NETBSD_SOURCE
44 # endif
45 # include <sys/types.h>
46 # include <sys/event.h>
47 # include <unistd.h>
48 # include <errno.h>
49 #endif
50 #if defined(HAVE_TIMERFD)
51 # include <sys/timerfd.h>
52 # include <errno.h>
53 #endif
54 #if defined(HAVE_SELECT)
55 # include <sys/types.h>
56 # include <sys/time.h>
57 # include <sys/select.h>
58 # include <unistd.h>
59 # include <errno.h>
60 #endif
61
62 AG_EventSource *agEventSource = NULL; /* Event source (thread-local) */
63 #ifdef AG_THREADS
64 AG_ThreadKey agEventSourceKey;
65 #endif
66
67 #ifdef HAVE_KQUEUE
68 #define EVBUFSIZE 2
69 typedef struct ag_event_source_kqueue {
70 struct ag_event_source _inherit;
71 int fd; /* kqueue() fd */
72 struct kevent *changes; /* Queued changes */
73 Uint nChanges;
74 Uint maxChanges;
75 struct kevent events[EVBUFSIZE]; /* Input event buffer */
76 } AG_EventSourceKQUEUE;
77 #endif /* HAVE_KQUEUE */
78
79 /* #define DEBUG_TIMERS */
80
81 #ifdef __NetBSD__
82 # define AG_EV_SET(kevp,a,b,c,d,e,f) EV_SET((kevp),(uintptr_t)(a),(b),(c),(d),(e),(intptr_t)(f))
83 #else
84 # define AG_EV_SET(kevp,a,b,c,d,e,f) EV_SET((kevp),(a),(b),(c),(d),(e),(f))
85 #endif
86
87 /* Initialize a pointer argument. */
88 static __inline__ void
InitPointerArg(AG_Variable * V,void * p)89 InitPointerArg(AG_Variable *V, void *p)
90 {
91 V->name[0] = '\0';
92 V->type = AG_VARIABLE_POINTER;
93 V->fn.fnVoid = NULL;
94 V->mutex = NULL;
95 V->data.p = p;
96 }
97
98 static __inline__ void
InitEvent(AG_Event * ev,AG_Object * ob)99 InitEvent(AG_Event *ev, AG_Object *ob)
100 {
101 ev->flags = 0;
102 ev->argc = 1;
103 ev->argc0 = 1;
104 ev->fn.fnVoid = NULL;
105 InitPointerArg(&ev->argv[0], ob);
106 }
107
108 /* Initialize an AG_Event structure. */
109 void
AG_EventInit(AG_Event * ev)110 AG_EventInit(AG_Event *ev)
111 {
112 InitEvent(ev, NULL);
113 }
114
115 /* Initialize an AG_Event structure with the specified arguments. */
116 void
AG_EventArgs(AG_Event * ev,const char * fmt,...)117 AG_EventArgs(AG_Event *ev, const char *fmt, ...)
118 {
119 InitEvent(ev, NULL);
120 AG_EVENT_GET_ARGS(ev, fmt);
121 ev->argc0 = ev->argc;
122 }
123
124 /*
125 * Configure an event handler routine for a given event.
126 * If a handler routine already exists, replace it.
127 */
128 AG_Event *
AG_SetEvent(void * p,const char * name,AG_EventFn fn,const char * fmt,...)129 AG_SetEvent(void *p, const char *name, AG_EventFn fn, const char *fmt, ...)
130 {
131 AG_Object *ob = p;
132 AG_Event *ev;
133
134 AG_ObjectLock(ob);
135
136 if (name != NULL) {
137 TAILQ_FOREACH(ev, &ob->events, events)
138 if (strcmp(ev->name, name) == 0)
139 break;
140 } else {
141 ev = NULL;
142 }
143 if (ev == NULL) {
144 ev = Malloc(sizeof(AG_Event));
145 InitEvent(ev, ob);
146 if (name != NULL) {
147 Strlcpy(ev->name, name, sizeof(ev->name));
148 } else {
149 ev->name[0] = '\0';
150 }
151 TAILQ_INSERT_TAIL(&ob->events, ev, events);
152 } else {
153 ev->argc = 1;
154 ev->argc0 = 1;
155 }
156 InitPointerArg(&ev->argv[0], ob);
157 ev->fn.fnVoid = fn;
158 AG_EVENT_GET_ARGS(ev, fmt);
159 ev->argc0 = ev->argc;
160
161 AG_ObjectUnlock(ob);
162 return (ev);
163 }
164
165 /*
166 * Configure an event handler routine for a given event.
167 * If a handler routine already exists, don't replace it.
168 */
169 AG_Event *
AG_AddEvent(void * p,const char * name,AG_EventFn fn,const char * fmt,...)170 AG_AddEvent(void *p, const char *name, AG_EventFn fn, const char *fmt, ...)
171 {
172 AG_Object *ob = p;
173 AG_Event *ev, *evOther;
174
175 AG_ObjectLock(ob);
176
177 ev = Malloc(sizeof(AG_Event));
178 InitEvent(ev, ob);
179
180 if (name != NULL) {
181 TAILQ_FOREACH(evOther, &ob->events, events) {
182 if (strcmp(evOther->name, name) == 0)
183 break;
184 }
185 if (evOther != NULL) {
186 ev->flags = evOther->flags;
187 }
188 Strlcpy(ev->name, name, sizeof(ev->name));
189 } else {
190 ev->name[0] = '\0';
191 }
192
193 ev->fn.fnVoid = fn;
194 AG_EVENT_GET_ARGS(ev, fmt);
195 ev->argc0 = ev->argc;
196
197 TAILQ_INSERT_TAIL(&ob->events, ev, events);
198 AG_ObjectUnlock(ob);
199 return (ev);
200 }
201
202 /*
203 * Anonymous Function Constructors
204 */
205 #undef AG_SET_TYPED_FN
206 #define AG_SET_TYPED_FN(memb) \
207 AG_Object *ob = p; \
208 AG_Event *ev; \
209 \
210 ev = Malloc(sizeof(AG_Event)); \
211 InitEvent(ev, ob); \
212 ev->name[0] = '\0'; \
213 ev->fn.memb = fn; \
214 InitPointerArg(&ev->argv[0], ob); \
215 AG_EVENT_GET_ARGS(ev, fmt); \
216 \
217 AG_ObjectLock(ob); \
218 TAILQ_INSERT_TAIL(&ob->events, ev, events); \
219 ev->argc0 = ev->argc; \
220 AG_ObjectUnlock(ob); \
221 return (AG_Function *)ev
222
AG_SetVoidFn(void * p,AG_VoidFn fn,const char * fmt,...)223 AG_Function *AG_SetVoidFn(void *p, AG_VoidFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnVoid); }
AG_SetIntFn(void * p,AG_IntFn fn,const char * fmt,...)224 AG_Function *AG_SetIntFn(void *p, AG_IntFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnInt); }
AG_SetUint8Fn(void * p,AG_Uint8Fn fn,const char * fmt,...)225 AG_Function *AG_SetUint8Fn(void *p, AG_Uint8Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnUint8); }
AG_SetSint8Fn(void * p,AG_Sint8Fn fn,const char * fmt,...)226 AG_Function *AG_SetSint8Fn(void *p, AG_Sint8Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnSint8); }
AG_SetUint16Fn(void * p,AG_Uint16Fn fn,const char * fmt,...)227 AG_Function *AG_SetUint16Fn(void *p, AG_Uint16Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnUint16); }
AG_SetSint16Fn(void * p,AG_Sint16Fn fn,const char * fmt,...)228 AG_Function *AG_SetSint16Fn(void *p, AG_Sint16Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnSint16); }
AG_SetUint32Fn(void * p,AG_Uint32Fn fn,const char * fmt,...)229 AG_Function *AG_SetUint32Fn(void *p, AG_Uint32Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnUint32); }
AG_SetSint32Fn(void * p,AG_Sint32Fn fn,const char * fmt,...)230 AG_Function *AG_SetSint32Fn(void *p, AG_Sint32Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnSint32); }
231 #ifdef AG_HAVE_64BIT
AG_SetUint64Fn(void * p,AG_Uint64Fn fn,const char * fmt,...)232 AG_Function *AG_SetUint64Fn(void *p, AG_Uint64Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnUint64); }
AG_SetSint64Fn(void * p,AG_Sint64Fn fn,const char * fmt,...)233 AG_Function *AG_SetSint64Fn(void *p, AG_Sint64Fn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnSint64); }
234 #endif
AG_SetFloatFn(void * p,AG_FloatFn fn,const char * fmt,...)235 AG_Function *AG_SetFloatFn(void *p, AG_FloatFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnFloat); }
AG_SetDoubleFn(void * p,AG_DoubleFn fn,const char * fmt,...)236 AG_Function *AG_SetDoubleFn(void *p, AG_DoubleFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnDouble); }
237 #ifdef AG_HAVE_LONG_DOUBLE
AG_SetLongDoubleFn(void * p,AG_LongDoubleFn fn,const char * fmt,...)238 AG_Function *AG_SetLongDoubleFn(void *p, AG_LongDoubleFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnLongDouble); }
239 #endif
AG_SetStringFn(void * p,AG_StringFn fn,const char * fmt,...)240 AG_Function *AG_SetStringFn(void *p, AG_StringFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnString); }
AG_SetPointerFn(void * p,AG_PointerFn fn,const char * fmt,...)241 AG_Function *AG_SetPointerFn(void *p, AG_PointerFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnPointer); }
AG_SetConstPointerFn(void * p,AG_ConstPointerFn fn,const char * fmt,...)242 AG_Function *AG_SetConstPointerFn(void *p, AG_ConstPointerFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnConstPointer); }
AG_SetTextFn(void * p,AG_TextFn fn,const char * fmt,...)243 AG_Function *AG_SetTextFn(void *p, AG_TextFn fn, const char *fmt, ...) { AG_SET_TYPED_FN(fnText); }
244
245 #undef AG_SET_TYPED_FN
246
247 /* Delete an event handler by name. */
248 void
AG_UnsetEvent(void * p,const char * name)249 AG_UnsetEvent(void *p, const char *name)
250 {
251 AG_Object *ob = p;
252 AG_Event *ev;
253
254 AG_ObjectLock(ob);
255 TAILQ_FOREACH(ev, &ob->events, events) {
256 if (strcmp(name, ev->name) == 0)
257 break;
258 }
259 if (ev == NULL) {
260 goto out;
261 }
262 TAILQ_REMOVE(&ob->events, ev, events);
263 free(ev);
264 out:
265 AG_ObjectUnlock(ob);
266 }
267
268 /* Look up an AG_Event by name. */
269 AG_Event *
AG_FindEventHandler(void * p,const char * name)270 AG_FindEventHandler(void *p, const char *name)
271 {
272 AG_Object *ob = p;
273 AG_Event *ev;
274
275 AG_ObjectLock(ob);
276 TAILQ_FOREACH(ev, &ob->events, events) {
277 if (strcmp(name, ev->name) == 0)
278 break;
279 }
280 AG_ObjectUnlock(ob);
281 return (ev);
282 }
283
284 /* Forward an event to an object's descendents. */
285 static void
PropagateEvent(AG_Object * sndr,AG_Object * rcvr,AG_Event * ev)286 PropagateEvent(AG_Object *sndr, AG_Object *rcvr, AG_Event *ev)
287 {
288 AG_Object *chld;
289
290 OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
291 PropagateEvent(rcvr, chld, ev);
292 }
293 AG_ForwardEvent(sndr, rcvr, ev);
294 }
295
296 /* Timeout callback for scheduled events. */
297 static Uint32
EventTimeout(AG_Timer * to,AG_Event * event)298 EventTimeout(AG_Timer *to, AG_Event *event)
299 {
300 AG_Object *ob = AG_SELF();
301 AG_Object *obSender = AG_PTR(1);
302 char *eventName = AG_STRING(2);
303 AG_Event *ev;
304
305 #ifdef AG_DEBUG_CORE
306 if (agDebugLvl >= 2)
307 Debug(ob, "Event <%s> timeout (%u ticks)\n", eventName, (Uint)to->ival);
308 #endif
309 TAILQ_FOREACH(ev, &ob->events, events) {
310 if (strcmp(eventName, ev->name) == 0)
311 break;
312 }
313 if (ev == NULL) {
314 return (0);
315 }
316 InitPointerArg(&ev->argv[ev->argc], obSender);
317
318 /* Propagate event to children. */
319 if (ev->flags & AG_EVENT_PROPAGATE) {
320 AG_Object *child;
321 #ifdef AG_DEBUG_CORE
322 if (agDebugLvl >= 2)
323 Debug(ob, "Propagate <%s> (timeout)\n", ev->name);
324 #endif
325 AG_LockVFS(ob);
326 OBJECT_FOREACH_CHILD(child, ob, ag_object) {
327 PropagateEvent(ob, child, ev);
328 }
329 AG_UnlockVFS(ob);
330 }
331
332 /* Invoke the event handler routine. */
333 if (ev->fn.fnVoid != NULL) {
334 ev->fn.fnVoid(ev);
335 }
336 return (0);
337 }
338
339
340 #ifdef AG_THREADS
341 /* Invoke an event handler routine asynchronously. */
342 static void *
EventThread(void * p)343 EventThread(void *p)
344 {
345 AG_Event *eev = p;
346 AG_Object *rcvr = eev->argv[0].data.p;
347 AG_Object *chld;
348
349 if (eev->flags & AG_EVENT_PROPAGATE) {
350 #ifdef AG_DEBUG_CORE
351 if (agDebugLvl >= 2)
352 Debug(rcvr, "Propagate <%s> (async)\n", eev->name);
353 #endif
354 AG_LockVFS(rcvr);
355 OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
356 PropagateEvent(rcvr, chld, eev);
357 }
358 AG_UnlockVFS(rcvr);
359 }
360 #ifdef AG_DEBUG_CORE
361 if (agDebugLvl >= 2)
362 Debug(rcvr, "BEGIN event thread for <%s>\n", eev->name);
363 #endif
364 if (eev->fn.fnVoid != NULL) {
365 eev->fn.fnVoid(eev);
366 }
367 #ifdef AG_DEBUG_CORE
368 if (agDebugLvl >= 2)
369 Debug(rcvr, "CLOSE event thread for <%s>\n", eev->name);
370 #endif
371 free(eev);
372 return (NULL);
373 }
374 #endif /* AG_THREADS */
375
376 void
AG_InitEventQ(AG_EventQ * eq)377 AG_InitEventQ(AG_EventQ *eq)
378 {
379 eq->nEvents = 0;
380 eq->events = NULL;
381 }
382
383 void
AG_FreeEventQ(AG_EventQ * eq)384 AG_FreeEventQ(AG_EventQ *eq)
385 {
386 Free(eq->events);
387 eq->nEvents = 0;
388 eq->events = NULL;
389 }
390
391 /* Add a new entry to an event queue. */
392 void
AG_QueueEvent(AG_EventQ * eq,const char * evname,const char * fmt,...)393 AG_QueueEvent(AG_EventQ *eq, const char *evname, const char *fmt, ...)
394 {
395 AG_Event *ev;
396
397 eq->events = Realloc(eq->events, (eq->nEvents+1)*sizeof(AG_Event));
398 ev = &eq->events[eq->nEvents++];
399 InitEvent(ev, NULL);
400 AG_EVENT_GET_ARGS(ev, fmt);
401 ev->argc0 = ev->argc;
402 }
403
404 /*
405 * Raise the specified event. Configured event handler routines may be
406 * called immediately, but they may also get called from a separate
407 * thread, or queued for later execution.
408 *
409 * The argument vector passed to the event handler function contains
410 * the AG_SetEvent() arguments, and any arguments specified here are
411 * appended to that list.
412 */
413 void
AG_PostEvent(void * sp,void * rp,const char * evname,const char * fmt,...)414 AG_PostEvent(void *sp, void *rp, const char *evname, const char *fmt, ...)
415 {
416 AG_Object *sndr = sp;
417 AG_Object *rcvr = rp;
418 AG_Event *ev;
419 AG_Object *chld;
420 int propagated = 0;
421
422 #ifdef AG_DEBUG_CORE
423 if (agDebugLvl >= 2)
424 Debug(rcvr, "Event <%s> posted from %s\n", evname, sndr ? sndr->name : "NULL");
425 #endif
426 AG_ObjectLock(rcvr);
427 TAILQ_FOREACH(ev, &rcvr->events, events) {
428 if (strcmp(evname, ev->name) != 0)
429 continue;
430 #ifdef AG_THREADS
431 if (ev->flags & AG_EVENT_ASYNC) {
432 AG_Thread th;
433 AG_Event *evNew;
434
435 evNew = Malloc(sizeof(AG_Event));
436 memcpy(evNew, ev, sizeof(AG_Event));
437 AG_EVENT_GET_ARGS(evNew, fmt);
438 InitPointerArg(&evNew->argv[evNew->argc], sndr);
439 if (evNew->flags & AG_EVENT_PROPAGATE) { propagated = 1; }
440 if (propagated) {
441 evNew->flags &= ~(AG_EVENT_PROPAGATE);
442 }
443 AG_ThreadCreate(&th, EventThread, evNew);
444 } else
445 #endif /* AG_THREADS */
446 {
447 AG_Event tmpev;
448
449 memcpy(&tmpev, ev, sizeof(AG_Event));
450 AG_EVENT_GET_ARGS(&tmpev, fmt);
451 InitPointerArg(&tmpev.argv[tmpev.argc], sndr);
452 if ((tmpev.flags & AG_EVENT_PROPAGATE) && !propagated) {
453 #ifdef AG_DEBUG_CORE
454 if (agDebugLvl >= 2)
455 Debug(rcvr, "Propagate <%s>\n", evname);
456 #endif
457 AG_LockVFS(rcvr);
458 OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
459 PropagateEvent(rcvr, chld, &tmpev);
460 }
461 AG_UnlockVFS(rcvr);
462 propagated = 1;
463 }
464 if (tmpev.fn.fnVoid != NULL)
465 tmpev.fn.fnVoid(&tmpev);
466 }
467 }
468 AG_ObjectUnlock(rcvr);
469 }
470
471 /*
472 * Variant of AG_PostEvent() which accepts an AG_Event argument instead
473 * of looking up the event handler by name.
474 */
475 void
AG_PostEventByPtr(void * sp,void * rp,AG_Event * ev,const char * fmt,...)476 AG_PostEventByPtr(void *sp, void *rp, AG_Event *ev, const char *fmt, ...)
477 {
478 AG_Object *sndr = sp;
479 AG_Object *rcvr = rp;
480 AG_Object *chld;
481 int propagated = 0;
482
483 #ifdef AG_DEBUG_CORE
484 if (agDebugLvl >= 2)
485 Debug(rcvr, "Event %p posted from %s\n", ev, sndr ? sndr->name : "NULL");
486 #endif
487 AG_ObjectLock(rcvr);
488 #ifdef AG_THREADS
489 if (ev->flags & AG_EVENT_ASYNC) {
490 AG_Thread th;
491 AG_Event *evNew;
492
493 evNew = Malloc(sizeof(AG_Event));
494 memcpy(evNew, ev, sizeof(AG_Event));
495 AG_EVENT_GET_ARGS(evNew, fmt);
496 InitPointerArg(&evNew->argv[evNew->argc], sndr);
497 if (evNew->flags & AG_EVENT_PROPAGATE) { propagated = 1; }
498 if (propagated) {
499 evNew->flags &= ~(AG_EVENT_PROPAGATE);
500 }
501 AG_ThreadCreate(&th, EventThread, evNew);
502 } else
503 #endif /* AG_THREADS */
504 {
505 AG_Event evTmp;
506
507 memcpy(&evTmp, ev, sizeof(AG_Event));
508 AG_EVENT_GET_ARGS(&evTmp, fmt);
509 InitPointerArg(&evTmp.argv[evTmp.argc], sndr);
510 if ((evTmp.flags & AG_EVENT_PROPAGATE) && !propagated) {
511 #ifdef AG_DEBUG_CORE
512 if (agDebugLvl >= 2)
513 Debug(rcvr, "Propagate event %p (post)\n", ev);
514 #endif
515 AG_LockVFS(rcvr);
516 OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
517 PropagateEvent(rcvr, chld, &evTmp);
518 }
519 AG_UnlockVFS(rcvr);
520 propagated = 1;
521 }
522 if (evTmp.fn.fnVoid != NULL)
523 evTmp.fn.fnVoid(&evTmp);
524 }
525 AG_ObjectUnlock(rcvr);
526 }
527
528 /*
529 * Schedule the execution of the named event in the given number
530 * of AG_Time(3) ticks.
531 *
532 * The argument vector passed to the event handler function contains
533 * the AG_SetEvent() arguments, and any arguments specified here are
534 * appended to that list.
535 */
536 int
AG_SchedEvent(void * pSndr,void * pRcvr,Uint32 ticks,const char * evname,const char * fmt,...)537 AG_SchedEvent(void *pSndr, void *pRcvr, Uint32 ticks, const char *evname,
538 const char *fmt, ...)
539 {
540 AG_Object *sndr = pSndr;
541 AG_Object *rcvr = pRcvr;
542 AG_Event *ev;
543 AG_Timer *to;
544
545 if ((to = TryMalloc(sizeof(AG_Timer))) == NULL) {
546 return (-1);
547 }
548 AG_InitTimer(to, evname, AG_TIMER_AUTO_FREE);
549
550 AG_LockTiming();
551 AG_ObjectLock(rcvr);
552
553 if (AG_AddTimer(rcvr, to, ticks,
554 EventTimeout, "%p,%s", sndr, evname) == -1) {
555 free(to);
556 goto fail;
557 }
558 ev = &to->fnEvent;
559 AG_EventInit(ev);
560 ev->argv[0].data.p = rcvr;
561 AG_EVENT_GET_ARGS(ev, fmt);
562 ev->argc0 = ev->argc;
563
564 AG_UnlockTiming();
565 AG_ObjectUnlock(rcvr);
566 return (0);
567 fail:
568 AG_UnlockTiming();
569 AG_ObjectUnlock(rcvr);
570 return (-1);
571 }
572
573 /*
574 * Forward an event, without modifying the original event structure, except
575 * for the sender and receiver pointers.
576 */
577 void
AG_ForwardEvent(void * pSndr,void * pRcvr,AG_Event * event)578 AG_ForwardEvent(void *pSndr, void *pRcvr, AG_Event *event)
579 {
580 AG_Object *sndr = pSndr;
581 AG_Object *rcvr = pRcvr;
582 AG_Object *chld;
583 AG_Event *ev;
584
585 #ifdef AG_DEBUG_CORE
586 if (agDebugLvl >= 2)
587 Debug(rcvr, "Event <%s> forwarded from %s\n", event->name, sndr ? sndr->name : "NULL");
588 #endif
589 AG_ObjectLock(rcvr);
590 TAILQ_FOREACH(ev, &rcvr->events, events) {
591 if (strcmp(event->name, ev->name) != 0)
592 continue;
593 #ifdef AG_THREADS
594 if (ev->flags & AG_EVENT_ASYNC) {
595 AG_Thread th;
596 AG_Event *evNew;
597
598 evNew = Malloc(sizeof(AG_Event));
599 memcpy(evNew, ev, sizeof(AG_Event));
600 InitPointerArg(&evNew->argv[0], rcvr);
601 InitPointerArg(&evNew->argv[evNew->argc], sndr);
602 AG_ThreadCreate(&th, EventThread, evNew);
603 } else
604 #endif /* AG_THREADS */
605 {
606 AG_Event tmpev;
607
608 memcpy(&tmpev, event, sizeof(AG_Event));
609 InitPointerArg(&tmpev.argv[0], rcvr);
610 InitPointerArg(&tmpev.argv[tmpev.argc], sndr);
611
612 if (ev->flags & AG_EVENT_PROPAGATE) {
613 #ifdef AG_DEBUG_CORE
614 if (agDebugLvl >= 2)
615 Debug(rcvr, "Propagate <%s> (forward)\n", event->name);
616 #endif
617 AG_LockVFS(rcvr);
618 OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
619 PropagateEvent(rcvr, chld, ev);
620 }
621 AG_UnlockVFS(rcvr);
622 }
623 /* XXX AG_EVENT_ASYNC.. */
624 if (ev->fn.fnVoid != NULL)
625 ev->fn.fnVoid(&tmpev);
626 }
627 }
628 AG_ObjectUnlock(rcvr);
629 }
630
631 #ifdef HAVE_KQUEUE
632 static __inline__ int
GrowKqChangelist(AG_EventSourceKQUEUE * kq,Uint n)633 GrowKqChangelist(AG_EventSourceKQUEUE *kq, Uint n)
634 {
635 struct kevent *changesNew;
636
637 if (n <= kq->maxChanges) {
638 return (0);
639 }
640 if ((changesNew = TryRealloc(kq->changes, n*sizeof(struct kevent)))
641 == NULL) {
642 return (-1);
643 }
644 kq->changes = changesNew;
645 kq->maxChanges = n;
646 return (0);
647 }
648 #endif /* HAVE_KQUEUE */
649
650 /* Create a new event source. */
651 static AG_EventSource *
CreateEventSource(void)652 CreateEventSource(void)
653 {
654 #ifdef HAVE_KQUEUE
655 AG_EventSourceKQUEUE *kq = TryMalloc(sizeof(AG_EventSourceKQUEUE));
656 AG_EventSource *src = (AG_EventSource *)kq;
657 #else
658 AG_EventSource *src = TryMalloc(sizeof(AG_EventSource));
659 #endif
660 if (src == NULL) {
661 return (NULL);
662 }
663 src->flags = 0;
664 src->addTimerFn = NULL;
665 src->delTimerFn = NULL;
666 src->breakReq = 0;
667 src->returnCode = 0;
668 TAILQ_INIT(&src->prologues);
669 TAILQ_INIT(&src->epilogues);
670 TAILQ_INIT(&src->spinners);
671 TAILQ_INIT(&src->sinks);
672 memset(src->caps, 0, sizeof(src->caps));
673
674 #if defined(HAVE_KQUEUE)
675 if ((kq->fd = kqueue()) == -1) {
676 AG_SetError("kqueue: %s", AG_Strerror(errno));
677 return (NULL);
678 }
679 kq->changes = NULL;
680 kq->nChanges = 0;
681 kq->maxChanges = 0;
682 memset(kq->events, 0, EVBUFSIZE*sizeof(struct kevent));
683 src->sinkFn = AG_EventSinkKQUEUE;
684 src->addTimerFn = AG_AddTimerKQUEUE;
685 src->delTimerFn = AG_DelTimerKQUEUE;
686 src->caps[AG_SINK_TIMER] = 1; /* Provides timers internally */
687 src->caps[AG_SINK_READ] = 1;
688 src->caps[AG_SINK_WRITE] = 1;
689 src->caps[AG_SINK_FSEVENT] = 1;
690 src->caps[AG_SINK_PROCEVENT] = 1;
691 GrowKqChangelist(kq, 64); /* Preallocate */
692 #elif defined(HAVE_TIMERFD)
693 src->sinkFn = AG_EventSinkTIMERFD;
694 src->addTimerFn = AG_AddTimerTIMERFD;
695 src->delTimerFn = AG_DelTimerTIMERFD;
696 src->caps[AG_SINK_TIMER] = 1; /* Provides timers internally */
697 src->caps[AG_SINK_READ] = 1;
698 src->caps[AG_SINK_WRITE] = 1;
699 #elif defined(HAVE_SELECT) && !defined(AG_THREADS)
700 src->sinkFn = AG_EventSinkTIMEDSELECT;
701 src->caps[AG_SINK_READ] = 1;
702 src->caps[AG_SINK_WRITE] = 1;
703 #elif defined(HAVE_SELECT) && defined(AG_THREADS)
704 src->sinkFn = AG_EventSinkSELECT;
705 src->caps[AG_SINK_READ] = 1;
706 src->caps[AG_SINK_WRITE] = 1;
707 #else
708 src->sinkFn = AG_EventSinkSPINNER;
709 #endif
710 if (agSoftTimers) { /* Force soft timers */
711 src->addTimerFn = NULL;
712 src->delTimerFn = NULL;
713 src->caps[AG_SINK_TIMER] = 0;
714 }
715 return (src);
716 }
717
718 static void
DestroyEventSource(void * pEventSource)719 DestroyEventSource(void *pEventSource)
720 {
721 AG_EventSource *src = pEventSource;
722 AG_EventSink *es, *esNext;
723
724 if (agEventSource == NULL)
725 return;
726
727 #ifdef HAVE_KQUEUE
728 {
729 AG_EventSourceKQUEUE *kq = pEventSource;
730
731 if (kq->fd != -1) {
732 close(kq->fd);
733 }
734 Free(kq->changes);
735 }
736 #endif
737 for (es = TAILQ_FIRST(&src->prologues); es != TAILQ_END(&src->prologues); es = esNext) {
738 esNext = TAILQ_NEXT(es, sinks);
739 free(es);
740 }
741 for (es = TAILQ_FIRST(&src->epilogues); es != TAILQ_END(&src->epilogues); es = esNext) {
742 esNext = TAILQ_NEXT(es, sinks);
743 free(es);
744 }
745 for (es = TAILQ_FIRST(&src->spinners); es != TAILQ_END(&src->spinners); es = esNext) {
746 esNext = TAILQ_NEXT(es, sinks);
747 free(es);
748 }
749 for (es = TAILQ_FIRST(&src->sinks); es != TAILQ_END(&src->sinks); es = esNext) {
750 esNext = TAILQ_NEXT(es, sinks);
751 free(es);
752 }
753 free(src);
754 }
755
756 /* Return the calling thread's effective event source. */
757 AG_EventSource *
AG_GetEventSource(void)758 AG_GetEventSource(void)
759 {
760 AG_EventSource *src;
761
762 #ifdef AG_THREADS
763 if ((src = AG_ThreadKeyGet(agEventSourceKey)) != NULL && src != NULL)
764 return (src);
765 #else
766 if (agEventSource != NULL)
767 return (agEventSource);
768 #endif
769 if ((src = CreateEventSource()) == NULL)
770 AG_FatalError(NULL);
771 #ifdef AG_THREADS
772 AG_ThreadKeySet(agEventSourceKey, src);
773 #else
774 agEventSource = src;
775 #endif
776 return (src);
777 }
778
779 int
AG_InitEventSubsystem(Uint flags)780 AG_InitEventSubsystem(Uint flags)
781 {
782 /* Initialize the main thread's event source. */
783 agEventSource = NULL;
784 #ifdef AG_THREADS
785 if (AG_ThreadKeyTryCreate(&agEventSourceKey, DestroyEventSource) == -1)
786 return (-1);
787 #endif
788 if ((agEventSource = AG_GetEventSource()) == NULL) {
789 return (-1);
790 }
791 return (0);
792 }
793
794 void
AG_DestroyEventSubsystem(void)795 AG_DestroyEventSubsystem(void)
796 {
797 if (agEventSource != NULL) {
798 DestroyEventSource(agEventSource);
799 agEventSource = NULL;
800 }
801 }
802
803 #ifdef HAVE_KQUEUE
804 /*
805 * Routines for translating between AG_EventSink and kqueue types.
806 */
807 static __inline__ enum ag_event_sink_type
GetSinkType(int filter)808 GetSinkType(int filter)
809 {
810 switch (filter) {
811 case EVFILT_TIMER: return (AG_SINK_TIMER);
812 case EVFILT_READ: return (AG_SINK_READ);
813 case EVFILT_WRITE: return (AG_SINK_WRITE);
814 case EVFILT_VNODE: return (AG_SINK_FSEVENT);
815 case EVFILT_PROC: return (AG_SINK_PROCEVENT);
816 default: return (AG_SINK_NONE);
817 }
818 }
819 static Uint
GetKqFilterFlags(Uint flags)820 GetKqFilterFlags(Uint flags)
821 {
822 Uint fflags = 0;
823 if (flags & AG_FSEVENT_DELETE) { fflags |= NOTE_DELETE; }
824 if (flags & AG_FSEVENT_WRITE) { fflags |= NOTE_WRITE; }
825 if (flags & AG_FSEVENT_EXTEND) { fflags |= NOTE_EXTEND; }
826 if (flags & AG_FSEVENT_ATTRIB) { fflags |= NOTE_ATTRIB; }
827 if (flags & AG_FSEVENT_LINK) { fflags |= NOTE_LINK; }
828 if (flags & AG_FSEVENT_RENAME) { fflags |= NOTE_RENAME; }
829 if (flags & AG_FSEVENT_REVOKE) { fflags |= NOTE_REVOKE; }
830 if (flags & AG_PROCEVENT_EXIT) { fflags |= NOTE_EXIT; }
831 if (flags & AG_PROCEVENT_FORK) { fflags |= NOTE_FORK; }
832 if (flags & AG_PROCEVENT_EXEC) { fflags |= NOTE_EXEC; }
833 return (fflags);
834 }
835 static Uint
GetSinkFlags(Uint fflags)836 GetSinkFlags(Uint fflags)
837 {
838 Uint flags = 0;
839 if (fflags & NOTE_DELETE) { fflags |= AG_FSEVENT_DELETE; }
840 if (fflags & NOTE_WRITE) { fflags |= AG_FSEVENT_WRITE; }
841 if (fflags & NOTE_EXTEND) { fflags |= AG_FSEVENT_EXTEND; }
842 if (fflags & NOTE_ATTRIB) { fflags |= AG_FSEVENT_ATTRIB; }
843 if (fflags & NOTE_LINK) { fflags |= AG_FSEVENT_LINK; }
844 if (fflags & NOTE_RENAME) { fflags |= AG_FSEVENT_RENAME; }
845 if (fflags & NOTE_REVOKE) { fflags |= AG_FSEVENT_REVOKE; }
846 if (fflags & NOTE_EXIT) { fflags |= AG_PROCEVENT_EXIT; }
847 if (fflags & NOTE_FORK) { fflags |= AG_PROCEVENT_FORK; }
848 if (fflags & NOTE_EXEC) { fflags |= AG_PROCEVENT_EXEC; }
849 return (flags);
850 }
851 #endif /* HAVE_KQUEUE */
852
853 /*
854 * Add/remove an event processing prologue. The function will be invoked
855 * only once at the beginning of AG_EventLoop().
856 */
857 AG_EventSink *
AG_AddEventPrologue(AG_EventSinkFn fn,const char * fnArgs,...)858 AG_AddEventPrologue(AG_EventSinkFn fn, const char *fnArgs, ...)
859 {
860 AG_EventSource *src = AG_GetEventSource();
861 AG_EventSink *es;
862
863 if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
864 return (NULL);
865 }
866 es->type = AG_SINK_PROLOGUE;
867 es->fn = fn;
868 InitEvent(&es->fnArgs, NULL);
869 AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
870 es->fnArgs.argc0 = es->fnArgs.argc;
871 TAILQ_INSERT_TAIL(&src->prologues, es, sinks);
872 return (es);
873 }
874 void
AG_DelEventPrologue(AG_EventSink * es)875 AG_DelEventPrologue(AG_EventSink *es)
876 {
877 AG_EventSource *src = AG_GetEventSource();
878
879 #ifdef AG_DEBUG
880 if (es->type != AG_SINK_PROLOGUE)
881 AG_FatalError("AG_DelEventPrologue");
882 #endif
883 TAILQ_REMOVE(&src->prologues, es, sinks);
884 free(es);
885 }
886
887 /*
888 * Add/remove an event sink epilogue. The function will be invoked
889 * after all incoming / queued events have been processed.
890 */
891 AG_EventSink *
AG_AddEventEpilogue(AG_EventSinkFn fn,const char * fnArgs,...)892 AG_AddEventEpilogue(AG_EventSinkFn fn, const char *fnArgs, ...)
893 {
894 AG_EventSource *src = AG_GetEventSource();
895 AG_EventSink *es;
896
897 if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
898 return (NULL);
899 }
900 es->type = AG_SINK_EPILOGUE;
901 es->fn = fn;
902 InitEvent(&es->fnArgs, NULL);
903 AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
904 es->fnArgs.argc0 = es->fnArgs.argc;
905 TAILQ_INSERT_TAIL(&src->epilogues, es, sinks);
906 return (es);
907 }
908 void
AG_DelEventEpilogue(AG_EventSink * es)909 AG_DelEventEpilogue(AG_EventSink *es)
910 {
911 AG_EventSource *src = AG_GetEventSource();
912
913 #ifdef AG_DEBUG
914 if (es->type != AG_SINK_EPILOGUE)
915 AG_FatalError("AG_DelEventEpilogue");
916 #endif
917 TAILQ_REMOVE(&src->epilogues, es, sinks);
918 free(es);
919 }
920
921 /*
922 * Add/remove a spinning event sink. If at least one spinning sink exists,
923 * AG_EventLoop() will invoke it repeatedly and force non-blocking operation
924 * of subsequent polling methods.
925 */
926 AG_EventSink *
AG_AddEventSpinner(AG_EventSinkFn fn,const char * fnArgs,...)927 AG_AddEventSpinner(AG_EventSinkFn fn, const char *fnArgs, ...)
928 {
929 AG_EventSource *src = AG_GetEventSource();
930 AG_EventSink *es;
931
932 if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
933 return (NULL);
934 }
935 es->type = AG_SINK_SPINNER;
936 es->fn = fn;
937 InitEvent(&es->fnArgs, NULL);
938 AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
939 es->fnArgs.argc0 = es->fnArgs.argc;
940 TAILQ_INSERT_TAIL(&src->spinners, es, sinks);
941 return (es);
942 }
943 void
AG_DelEventSpinner(AG_EventSink * es)944 AG_DelEventSpinner(AG_EventSink *es)
945 {
946 AG_EventSource *src = AG_GetEventSource();
947
948 #ifdef AG_DEBUG
949 if (es->type != AG_SINK_SPINNER)
950 AG_FatalError("AG_DelEventSpinner");
951 #endif
952 TAILQ_REMOVE(&src->spinners, es, sinks);
953 free(es);
954 }
955
956 /*
957 * Add/remove a low-level event sink. The function will be called
958 * whenever the specified event occurs.
959 */
960 AG_EventSink *
AG_AddEventSink(enum ag_event_sink_type type,int ident,Uint flags,AG_EventSinkFn fn,const char * fnArgs,...)961 AG_AddEventSink(enum ag_event_sink_type type, int ident, Uint flags,
962 AG_EventSinkFn fn, const char *fnArgs, ...)
963 {
964 AG_EventSource *src = AG_GetEventSource();
965 AG_EventSink *es;
966 #ifdef HAVE_KQUEUE
967 AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)src;
968 struct kevent *kev;
969 #endif
970 if (type >= AG_SINK_LAST || !src->caps[type]) {
971 AG_SetError("Unsupported event type: %u", (Uint)type);
972 return (NULL);
973 }
974 if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
975 return (NULL);
976 }
977 es->type = type;
978 es->ident = ident;
979 es->flags = flags;
980
981 #ifdef HAVE_KQUEUE
982 if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
983 free(es);
984 return (NULL);
985 }
986 kev = &kq->changes[kq->nChanges++];
987 switch (type) {
988 case AG_SINK_READ:
989 AG_EV_SET(kev, ident, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, es);
990 break;
991 case AG_SINK_WRITE:
992 AG_EV_SET(kev, ident, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, es);
993 break;
994 case AG_SINK_FSEVENT:
995 AG_EV_SET(kev, ident, EVFILT_VNODE, EV_ADD|EV_ENABLE,
996 GetKqFilterFlags(flags), 0, es);
997 break;
998 case AG_SINK_PROCEVENT:
999 AG_EV_SET(kev, ident, EVFILT_PROC, EV_ADD|EV_ENABLE,
1000 GetKqFilterFlags(flags), 0, es);
1001 break;
1002 default:
1003 kq->nChanges--;
1004 break;
1005 }
1006 #endif /* HAVE_KQUEUE */
1007
1008 es->fn = fn;
1009 InitEvent(&es->fnArgs, NULL);
1010 AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
1011 es->fnArgs.argc0 = es->fnArgs.argc;
1012 TAILQ_INSERT_TAIL(&src->sinks, es, sinks);
1013 return (es);
1014 }
1015 void
AG_DelEventSink(AG_EventSink * es)1016 AG_DelEventSink(AG_EventSink *es)
1017 {
1018 AG_EventSource *src = AG_GetEventSource();
1019 #ifdef HAVE_KQUEUE
1020 AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)src;
1021 struct kevent *kev;
1022
1023 if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1024 AG_FatalError(NULL);
1025 }
1026 kev = &kq->changes[kq->nChanges++];
1027
1028 switch (es->type) {
1029 case AG_SINK_READ:
1030 AG_EV_SET(kev, es->ident, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1031 break;
1032 case AG_SINK_WRITE:
1033 AG_EV_SET(kev, es->ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
1034 break;
1035 case AG_SINK_FSEVENT:
1036 AG_EV_SET(kev, es->ident, EVFILT_VNODE, EV_DELETE,
1037 GetKqFilterFlags(es->flags), 0, NULL);
1038 break;
1039 case AG_SINK_PROCEVENT:
1040 AG_EV_SET(kev, es->ident, EVFILT_PROC, EV_DELETE,
1041 GetKqFilterFlags(es->flags), 0, NULL);
1042 break;
1043 default:
1044 kq->nChanges--;
1045 break;
1046 }
1047 #endif /* HAVE_KQUEUE */
1048
1049 TAILQ_REMOVE(&src->sinks, es, sinks);
1050 free(es);
1051 }
1052 void
AG_DelEventSinksByIdent(enum ag_event_sink_type type,int ident,Uint flags)1053 AG_DelEventSinksByIdent(enum ag_event_sink_type type, int ident, Uint flags)
1054 {
1055 AG_EventSource *src = AG_GetEventSource();
1056 AG_EventSink *es, *esNext;
1057
1058 for (es = TAILQ_FIRST(&src->sinks); es != TAILQ_END(&src->sinks); es = esNext) {
1059 esNext = TAILQ_NEXT(es, sinks);
1060 if (es->type == type &&
1061 es->ident == ident &&
1062 es->flags == flags)
1063 AG_DelEventSink(es);
1064 }
1065 }
1066
1067 #ifdef HAVE_KQUEUE
1068 /*
1069 * Standard event sink using kqueue(2), commonly found on modern BSD
1070 * derived operating systems.
1071 */
1072 int
AG_EventSinkKQUEUE(void)1073 AG_EventSinkKQUEUE(void)
1074 {
1075 AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)agEventSource;
1076 int rv, i;
1077 struct timespec timeo, *pTimeo;
1078
1079 restart:
1080 if (!TAILQ_EMPTY(&agEventSource->spinners)) {
1081 timeo.tv_sec = 0;
1082 timeo.tv_nsec = 0L;
1083 pTimeo = &timeo;
1084 } else {
1085 pTimeo = NULL;
1086 }
1087 #ifdef DEBUG_TIMERS
1088 for (i = 0; i < kq->nChanges; i++) {
1089 struct kevent *chg = &kq->changes[i];
1090 Verbose("changes[%d]: f=%d i=%u f=0x%x ff=0x%x u=%p\n",
1091 i, (int)chg->filter,
1092 (Uint)chg->ident, chg->flags, chg->fflags,
1093 chg->udata);
1094 }
1095 #endif
1096 rv = kevent(kq->fd, kq->changes, kq->nChanges, kq->events, EVBUFSIZE,
1097 pTimeo);
1098 if (rv < 0) {
1099 if (errno == EINTR) {
1100 goto restart;
1101 }
1102 AG_SetError("kevent(): %s", AG_Strerror(errno));
1103 return (-1);
1104 }
1105 kq->nChanges = 0;
1106
1107 /* 1. Process timer expirations. */
1108 AG_LockTiming();
1109 for (i = 0; i < rv; i++) {
1110 struct kevent *kev = &kq->events[i];
1111 enum ag_event_sink_type esType = GetSinkType(kev->filter);
1112 Uint32 rvt;
1113 AG_Timer *to;
1114 AG_Object *ob;
1115
1116 if (kev->flags & EV_ERROR) {
1117 Verbose("kevent (%ld,%d): %s\n", kev->ident, kev->filter,
1118 AG_Strerror((int)kev->data));
1119 continue;
1120 }
1121 if (esType != AG_SINK_TIMER ||
1122 (to = (AG_Timer *)kev->udata) == NULL) {
1123 continue;
1124 }
1125 rvt = to->fn(to, &to->fnEvent);
1126 if (rvt > 0) { /* Restart timer */
1127 struct kevent *kev;
1128 #ifdef DEBUG_TIMERS
1129 Verbose("TIMER[%d] resetting t=+%u\n", to->id, (Uint)rvt);
1130 #endif
1131 if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1132 AG_UnlockTiming();
1133 return (-1);
1134 }
1135 kev = &kq->changes[kq->nChanges++];
1136 AG_EV_SET(kev, to->id, EVFILT_TIMER,
1137 EV_ADD|EV_ENABLE|EV_ONESHOT, 0, (int)rvt, to);
1138 to->ival = rvt;
1139 } else { /* Expire */
1140 #ifdef DEBUG_TIMERS
1141 Verbose("TIMER[%d] expired\n", to->id);
1142 #endif
1143 if ((ob = to->obj) == NULL) {
1144 continue;
1145 }
1146 TAILQ_REMOVE(&ob->timers, to, timers);
1147 if (TAILQ_EMPTY(&ob->timers)) {
1148 TAILQ_REMOVE(&agTimerObjQ, ob, tobjs);
1149 }
1150 if (to->flags & AG_TIMER_AUTO_FREE) {
1151 free(to);
1152 } else {
1153 to->ival = 0;
1154 to->id = -1;
1155 to->obj = NULL;
1156 }
1157 agTimerCount--;
1158 }
1159 }
1160 AG_UnlockTiming();
1161
1162 /* 2. Process I/O and other events. */
1163 for (i = 0; i < rv; i++) {
1164 struct kevent *kev = &kq->events[i];
1165 enum ag_event_sink_type esType = GetSinkType(kev->filter);
1166 AG_EventSink *es;
1167
1168 switch (esType) {
1169 case AG_SINK_READ:
1170 case AG_SINK_WRITE:
1171 es = (AG_EventSink *)kev->udata;
1172 es->fn(es, &es->fnArgs);
1173 break;
1174 case AG_SINK_FSEVENT:
1175 case AG_SINK_PROCEVENT:
1176 es = (AG_EventSink *)kev->udata;
1177 es->flagsMatched = GetSinkFlags(kev->fflags);
1178 es->fn(es, &es->fnArgs);
1179 break;
1180 default:
1181 break;
1182 }
1183 }
1184 return (0);
1185 }
1186
1187 /*
1188 * Add/remove a kqueue(2) based timer.
1189 */
1190 static int
GenerateTimerID(AG_Timer * to)1191 GenerateTimerID(AG_Timer *to)
1192 {
1193 AG_Object *obOther;
1194 AG_Timer *toOther;
1195 int id;
1196
1197 gen_id:
1198 #ifdef AG_DEBUG
1199 if (agTimerCount+1 >= (AG_INT_MAX-1))
1200 AG_FatalError("agTimerCount");
1201 #endif
1202 id = (int)++agTimerCount; /* XXX */
1203 TAILQ_FOREACH(obOther, &agTimerObjQ, tobjs) {
1204 TAILQ_FOREACH(toOther, &obOther->timers, timers) {
1205 if (toOther == to) { continue; }
1206 if (toOther->id == id) {
1207 id++;
1208 goto gen_id;
1209 }
1210 }
1211 }
1212 return (id);
1213 }
1214 int
AG_AddTimerKQUEUE(AG_Timer * to,Uint32 ival,int newTimer)1215 AG_AddTimerKQUEUE(AG_Timer *to, Uint32 ival, int newTimer)
1216 {
1217 AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)agEventSource;
1218
1219 /* Create a kernel-based timer with kqueue. */
1220 if (newTimer) {
1221 to->id = GenerateTimerID(to);
1222 }
1223 if (newTimer || to->ival != ival) {
1224 struct kevent *kev;
1225 #ifdef DEBUG_TIMERS
1226 Verbose("kevent: creating timer ID=%d ival=%d\n", to->id, (int)ival);
1227 #endif
1228 if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1229 return (-1);
1230 }
1231 kev = &kq->changes[kq->nChanges++];
1232 AG_EV_SET(kev, to->id, EVFILT_TIMER,
1233 EV_ADD|EV_ENABLE|EV_ONESHOT, 0, (int)ival, to);
1234 to->ival = ival;
1235 }
1236 return (0);
1237 }
1238 void
AG_DelTimerKQUEUE(AG_Timer * to)1239 AG_DelTimerKQUEUE(AG_Timer *to)
1240 {
1241 AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)agEventSource;
1242 struct kevent *kev;
1243
1244 if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1245 AG_FatalError(NULL);
1246 }
1247 kev = &kq->changes[kq->nChanges++];
1248 AG_EV_SET(kev, to->id, EVFILT_TIMER, EV_DELETE,
1249 0, 0, NULL);
1250 agTimerCount--;
1251 }
1252
1253 #endif /* HAVE_KQUEUE */
1254
1255 #ifdef HAVE_TIMERFD
1256 /*
1257 * Standard event sink using select(2) and fd-based timers,
1258 * usually available on Linux.
1259 */
1260 int
AG_EventSinkTIMERFD(void)1261 AG_EventSinkTIMERFD(void)
1262 {
1263 fd_set rdFds, wrFds;
1264 int nFds, rv;
1265 AG_EventSink *es;
1266 AG_Object *ob, *obNext;
1267 AG_Timer *to, *toNext;
1268 struct timeval timeo, *pTimeo;
1269
1270 restart:
1271 nFds = 0;
1272 FD_ZERO(&rdFds);
1273 FD_ZERO(&wrFds);
1274 TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1275 switch (es->type) {
1276 case AG_SINK_READ:
1277 FD_SET(es->ident, &rdFds);
1278 if (es->ident > nFds) { nFds = es->ident; }
1279 break;
1280 case AG_SINK_WRITE:
1281 FD_SET(es->ident, &wrFds);
1282 if (es->ident > nFds) { nFds = es->ident; }
1283 break;
1284 }
1285 }
1286 TAILQ_FOREACH(ob, &agTimerObjQ, tobjs) {
1287 TAILQ_FOREACH(to, &ob->timers, timers) {
1288 FD_SET(to->id, &rdFds);
1289 if (to->id > nFds) { nFds = to->id; }
1290 }
1291 }
1292 if (!TAILQ_EMPTY(&agEventSource->spinners)) {
1293 timeo.tv_sec = 0;
1294 timeo.tv_usec = 0;
1295 pTimeo = &timeo;
1296 } else {
1297 pTimeo = NULL;
1298 }
1299 rv = select(nFds+1, &rdFds, &wrFds, NULL, pTimeo);
1300 if (rv == -1) {
1301 if (errno == EINTR) {
1302 goto restart;
1303 }
1304 AG_SetError("select: %s", AG_Strerror(errno));
1305 return (-1);
1306 }
1307
1308 AG_LockTiming();
1309
1310 /* 1. Process timer expirations. */
1311 for (ob = TAILQ_FIRST(&agTimerObjQ);
1312 ob != TAILQ_END(&agTimerObjQ);
1313 ob = obNext) {
1314 obNext = TAILQ_NEXT(ob, tobjs);
1315 AG_ObjectLock(ob);
1316 for (to = TAILQ_FIRST(&ob->timers);
1317 to != TAILQ_END(&ob->timers);
1318 to = toNext) {
1319 struct itimerspec its;
1320 Uint32 rvt;
1321
1322 toNext = TAILQ_NEXT(to, timers);
1323 if (!FD_ISSET(to->id, &rdFds)) {
1324 continue;
1325 }
1326 rvt = to->fn(to, &to->fnEvent);
1327 if (rvt > 0) {
1328 its.it_value.tv_sec = rvt/1000;
1329 its.it_value.tv_nsec = (rvt % 1000)*1000000L;
1330 its.it_interval.tv_sec = 0;
1331 its.it_interval.tv_nsec = 0L;
1332 if (timerfd_settime(to->id, 0, &its, NULL) == -1) {
1333 Verbose("timerfd_settime: %s\n", AG_Strerror(errno));
1334 FD_CLR(to->id, &rdFds);
1335 AG_DelTimer(ob, to);
1336 }
1337 } else {
1338 FD_CLR(to->id, &rdFds);
1339 AG_DelTimer(ob, to);
1340 }
1341 }
1342 AG_ObjectUnlock(ob);
1343 }
1344
1345 /* 2. Process I/O events. */
1346 TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1347 switch (es->type) {
1348 case AG_SINK_READ:
1349 if (FD_ISSET(es->ident, &rdFds)) {
1350 es->fn(es, &es->fnArgs);
1351 }
1352 break;
1353 case AG_SINK_WRITE:
1354 if (FD_ISSET(es->ident, &wrFds)) {
1355 es->fn(es, &es->fnArgs);
1356 }
1357 break;
1358 }
1359 }
1360
1361 AG_UnlockTiming();
1362 return (0);
1363 }
1364
1365 /*
1366 * Add/remove a fd-based timer.
1367 */
1368 int
AG_AddTimerTIMERFD(AG_Timer * to,Uint32 ival,int newTimer)1369 AG_AddTimerTIMERFD(AG_Timer *to, Uint32 ival, int newTimer)
1370 {
1371 struct itimerspec its;
1372
1373 if (newTimer) {
1374 /* Create a timerfd. Store the file descriptor as ID. */
1375 if ((to->id = timerfd_create(CLOCK_MONOTONIC, 0)) == -1) {
1376 AG_SetError("timerfd_create: %s", AG_Strerror(errno));
1377 return (-1);
1378 }
1379 }
1380 its.it_value.tv_sec = ival/1000;
1381 its.it_value.tv_nsec = (ival % 1000)*1000000L;
1382 its.it_interval.tv_sec = 0;
1383 its.it_interval.tv_nsec = 0L;
1384 if (timerfd_settime(to->id, 0, &its, NULL) == -1) {
1385 close(to->id);
1386 AG_SetError("timerfd_settime: %s", AG_Strerror(errno));
1387 return (-1);
1388 }
1389 to->ival = ival;
1390 return (0);
1391 }
1392 void
AG_DelTimerTIMERFD(AG_Timer * to)1393 AG_DelTimerTIMERFD(AG_Timer *to)
1394 {
1395 #ifdef AG_DEBUG
1396 if (to->id == -1) { AG_FatalError("timerfd inconsistency"); }
1397 #endif
1398 close(to->id);
1399 }
1400 #endif /* HAVE_TIMERFD */
1401
1402 #if defined(HAVE_SELECT) && !defined(AG_THREADS)
1403 /*
1404 * Standard event sink using select(2) with timers implemented using the
1405 * select() timeout. Only available in single-threaded builds, since timers
1406 * cannot be added, restarted or removed from different threads with this
1407 * method.
1408 */
1409 int
AG_EventSinkTIMEDSELECT(void)1410 AG_EventSinkTIMEDSELECT(void)
1411 {
1412 fd_set rdFds, wrFds;
1413 int i, nFds, rv;
1414 AG_EventSink *es;
1415 AG_Object *ob, *obNext;
1416 AG_Timer *to, *toNext;
1417 struct timeval timeo, *pTimeo;
1418 Uint32 t, tSoonest;
1419
1420 restart:
1421 nFds = 0;
1422 FD_ZERO(&rdFds);
1423 FD_ZERO(&wrFds);
1424 TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1425 switch (es->type) {
1426 case AG_SINK_READ:
1427 FD_SET(es->ident, &rdFds);
1428 if (es->ident > nFds) { nFds = es->ident; }
1429 break;
1430 case AG_SINK_WRITE:
1431 FD_SET(es->ident, &wrFds);
1432 if (es->ident > nFds) { nFds = es->ident; }
1433 break;
1434 }
1435 }
1436
1437 if (!TAILQ_EMPTY(&agEventSource->spinners)) {
1438 timeo.tv_sec = 0;
1439 timeo.tv_usec = 0;
1440 } else {
1441 AG_LockTiming();
1442 t = AG_GetTicks();
1443 tSoonest = 0xfffffffe;
1444 TAILQ_FOREACH(ob, &agTimerObjQ, tobjs) {
1445 TAILQ_FOREACH(to, &ob->timers, timers) {
1446 if ((to->tSched - t) < tSoonest)
1447 tSoonest = (to->tSched - t);
1448 }
1449 }
1450 timeo.tv_sec = tSoonest/1000;
1451 timeo.tv_usec = (tSoonest % 1000)*1000;
1452 AG_UnlockTiming();
1453 }
1454 rv = select(nFds+1, &rdFds, &wrFds, NULL, &timeo);
1455 if (rv == -1) {
1456 if (errno == EINTR) {
1457 goto restart;
1458 }
1459 AG_SetError("select: %s", AG_Strerror(errno));
1460 return (-1);
1461 }
1462
1463 AG_LockTiming();
1464 /* 1. Process timer expirations. */
1465 AG_ProcessTimeouts(t);
1466 if (rv > 0) {
1467 /* 2. Process I/O events */
1468 TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1469 switch (es->type) {
1470 case AG_SINK_READ:
1471 if (FD_ISSET(es->ident, &rdFds)) {
1472 es->fn(es, &es->fnArgs);
1473 }
1474 break;
1475 case AG_SINK_WRITE:
1476 if (FD_ISSET(es->ident, &wrFds)) {
1477 es->fn(es, &es->fnArgs);
1478 }
1479 break;
1480 }
1481 }
1482 }
1483 AG_UnlockTiming();
1484 return (0);
1485 }
1486 #endif /* HAVE_SELECT and !AG_THREADS */
1487
1488 #if defined(HAVE_SELECT) && defined(AG_THREADS)
1489 /*
1490 * Standard event sink using non-blocking select(2) with timers implemented
1491 * with a delay loop. This is inefficient, but on some platforms, it is the
1492 * only thread-safe option.
1493 */
1494 int
AG_EventSinkSELECT(void)1495 AG_EventSinkSELECT(void)
1496 {
1497 fd_set rdFds, wrFds;
1498 int nFds, rv;
1499 AG_EventSink *es;
1500 struct timeval timeo;
1501
1502 nFds = 0;
1503 FD_ZERO(&rdFds);
1504 FD_ZERO(&wrFds);
1505
1506 TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1507 switch (es->type) {
1508 case AG_SINK_READ:
1509 FD_SET(es->ident, &rdFds);
1510 if (es->ident > nFds) { nFds = es->ident; }
1511 break;
1512 case AG_SINK_WRITE:
1513 FD_SET(es->ident, &wrFds);
1514 if (es->ident > nFds) { nFds = es->ident; }
1515 break;
1516 default:
1517 break;
1518 }
1519 }
1520
1521 restart:
1522 timeo.tv_sec = 0;
1523 timeo.tv_usec = 0;
1524 rv = select(nFds+1, &rdFds, &wrFds, NULL, &timeo);
1525 if (rv == -1) {
1526 if (errno == EINTR) {
1527 goto restart;
1528 }
1529 AG_SetError("select: %s", AG_Strerror(errno));
1530 return (-1);
1531 }
1532
1533 AG_LockTiming();
1534 /* 1. Process timer expirations. */
1535 AG_ProcessTimeouts(AG_GetTicks());
1536 if (rv > 0) {
1537 /* 2. Process I/O events. */
1538 TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1539 switch (es->type) {
1540 case AG_SINK_READ:
1541 if (FD_ISSET(es->ident, &rdFds)) {
1542 es->fn(es, &es->fnArgs);
1543 }
1544 break;
1545 case AG_SINK_WRITE:
1546 if (FD_ISSET(es->ident, &wrFds)) {
1547 es->fn(es, &es->fnArgs);
1548 }
1549 break;
1550 }
1551 }
1552 }
1553 AG_UnlockTiming();
1554
1555 if (TAILQ_EMPTY(&agEventSource->spinners)) {
1556 AG_Delay(1);
1557 }
1558 return (0);
1559 }
1560 #endif /* HAVE_SELECT and AG_THREADS */
1561
1562 /*
1563 * Fallback "spinning" event sink using a delay loop. This is inefficient,
1564 * but is the only option on some platforms.
1565 */
1566 int
AG_EventSinkSPINNER(void)1567 AG_EventSinkSPINNER(void)
1568 {
1569 AG_ProcessTimeouts(AG_GetTicks());
1570 AG_Delay(1);
1571 return (0);
1572 }
1573
1574 /*
1575 * Standard event loop routine. We loop over the event source and invoke
1576 * the related event sinks. AG_EventLoop() may be used outside of the
1577 * main thread (in which case, a new event source will be created).
1578 */
1579 int
AG_EventLoop(void)1580 AG_EventLoop(void)
1581 {
1582 AG_EventSource *src = AG_GetEventSource();
1583 AG_EventSink *es;
1584
1585 TAILQ_FOREACH(es, &src->prologues, sinks) {
1586 es->fn(es, &es->fnArgs);
1587 }
1588 for (;;) {
1589 TAILQ_FOREACH(es, &src->spinners, sinks) {
1590 es->fn(es, &es->fnArgs);
1591 }
1592 if (src->sinkFn() == -1) {
1593 return (1);
1594 }
1595 TAILQ_FOREACH(es, &src->epilogues, sinks) {
1596 es->fn(es, &es->fnArgs);
1597 }
1598 if (src->breakReq)
1599 break;
1600 }
1601 return (src->returnCode);
1602 }
1603
1604 /* Request that we break out of AG_EventLoop(). */
1605 void
AG_Terminate(int retCode)1606 AG_Terminate(int retCode)
1607 {
1608 AG_EventSource *src = AG_GetEventSource();
1609
1610 src->breakReq = 1;
1611 src->returnCode = retCode;
1612 }
1613 void
AG_TerminateEv(AG_Event * ev)1614 AG_TerminateEv(AG_Event *ev)
1615 {
1616 AG_EventSource *src = AG_GetEventSource();
1617
1618 if (ev->argc > 1 &&
1619 ev->argv[1].type == AG_VARIABLE_INT) {
1620 src->returnCode = ev->argv[1].data.i;
1621 } else {
1622 src->returnCode = 0;
1623 }
1624 src->breakReq = 1;
1625 }
1626