1 /*
2  * Copyright (c) 2001-2015 Hypertriton, Inc. <http://hypertriton.com/>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
15  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR
18  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
20  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
21  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
22  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
23  * USE OF THIS SOFTWARE EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  */
25 
26 /*
27  * Implementation of AG_Object events / virtual functions, as well as
28  * the generic AG_EventLoop(3) interface.
29  */
30 
31 #include <agar/core/core.h>
32 
33 #include <string.h>
34 #include <stdarg.h>
35 
36 #include <agar/config/have_kqueue.h>
37 #include <agar/config/have_timerfd.h>
38 #include <agar/config/have_select.h>
39 #include <agar/config/ag_debug_core.h>
40 
41 #if defined(HAVE_KQUEUE)
42 # ifdef __NetBSD__
43 #   define _NETBSD_SOURCE
44 # endif
45 # include <sys/types.h>
46 # include <sys/event.h>
47 # include <unistd.h>
48 # include <errno.h>
49 #endif
50 #if defined(HAVE_TIMERFD)
51 # include <sys/timerfd.h>
52 # include <errno.h>
53 #endif
54 #if defined(HAVE_SELECT)
55 # include <sys/types.h>
56 # include <sys/time.h>
57 # include <sys/select.h>
58 # include <unistd.h>
59 # include <errno.h>
60 #endif
61 
/*
 * Event source assigned by AG_InitEventSubsystem() and released by
 * AG_DestroyEventSubsystem(). Without AG_THREADS, this is also the event
 * source returned by AG_GetEventSource().
 */
62 AG_EventSource *agEventSource = NULL;	/* Main thread's event source */
63 #ifdef AG_THREADS
/*
 * Thread key under which AG_GetEventSource() stores the event source of
 * each thread; created with DestroyEventSource() as its destructor.
 */
64 AG_ThreadKey    agEventSourceKey;
65 #endif
66 
67 #ifdef HAVE_KQUEUE
/* Number of entries in the per-source input event buffer. */
68 #define EVBUFSIZE 2
/*
 * kqueue-backed event source. The generic event source is the first
 * member, so CreateEventSource() can return this structure through an
 * AG_EventSource pointer and the sink routines can cast it back.
 */
69 typedef struct ag_event_source_kqueue {
70 	struct ag_event_source _inherit;
71 	int fd;					/* kqueue() fd */
72 	struct kevent *changes;			/* Queued changes */
73 	Uint          nChanges;			/* Entries of changes in use */
74 	Uint        maxChanges;			/* Entries allocated in changes */
75 	struct kevent events[EVBUFSIZE];	/* Input event buffer */
76 } AG_EventSourceKQUEUE;
77 #endif /* HAVE_KQUEUE */
78 
79 /* #define DEBUG_TIMERS */
80 
/*
 * AG_EV_SET(): wrapper around EV_SET(). The NetBSD variant casts the
 * identifier (a) to uintptr_t and the user data (f) to intptr_t; on other
 * platforms all arguments are passed through unchanged.
 */
81 #ifdef __NetBSD__
82 # define AG_EV_SET(kevp,a,b,c,d,e,f) EV_SET((kevp),(uintptr_t)(a),(b),(c),(d),(e),(intptr_t)(f))
83 #else
84 # define AG_EV_SET(kevp,a,b,c,d,e,f) EV_SET((kevp),(a),(b),(c),(d),(e),(f))
85 #endif
86 
87 /* Initialize a pointer argument. */
88 static __inline__ void
InitPointerArg(AG_Variable * V,void * p)89 InitPointerArg(AG_Variable *V, void *p)
90 {
91 	V->name[0] = '\0';
92 	V->type = AG_VARIABLE_POINTER;
93 	V->fn.fnVoid = NULL;
94 	V->mutex = NULL;
95 	V->data.p = p;
96 }
97 
98 static __inline__ void
InitEvent(AG_Event * ev,AG_Object * ob)99 InitEvent(AG_Event *ev, AG_Object *ob)
100 {
101 	ev->flags = 0;
102 	ev->argc = 1;
103 	ev->argc0 = 1;
104 	ev->fn.fnVoid = NULL;
105 	InitPointerArg(&ev->argv[0], ob);
106 }
107 
108 /* Initialize an AG_Event structure. */
109 void
AG_EventInit(AG_Event * ev)110 AG_EventInit(AG_Event *ev)
111 {
112 	InitEvent(ev, NULL);
113 }
114 
115 /* Initialize an AG_Event structure with the specified arguments. */
116 void
AG_EventArgs(AG_Event * ev,const char * fmt,...)117 AG_EventArgs(AG_Event *ev, const char *fmt, ...)
118 {
119 	InitEvent(ev, NULL);
120 	AG_EVENT_GET_ARGS(ev, fmt);
121 	ev->argc0 = ev->argc;
122 }
123 
124 /*
125  * Configure an event handler routine for a given event.
126  * If a handler routine already exists, replace it.
127  */
128 AG_Event *
AG_SetEvent(void * p,const char * name,AG_EventFn fn,const char * fmt,...)129 AG_SetEvent(void *p, const char *name, AG_EventFn fn, const char *fmt, ...)
130 {
131 	AG_Object *ob = p;
132 	AG_Event *ev;
133 
134 	AG_ObjectLock(ob);
135 
136 	if (name != NULL) {
137 		TAILQ_FOREACH(ev, &ob->events, events)
138 			if (strcmp(ev->name, name) == 0)
139 				break;
140 	} else {
141 		ev = NULL;
142 	}
143 	if (ev == NULL) {
144 		ev = Malloc(sizeof(AG_Event));
145 		InitEvent(ev, ob);
146 		if (name != NULL) {
147 			Strlcpy(ev->name, name, sizeof(ev->name));
148 		} else {
149 			ev->name[0] = '\0';
150 		}
151 		TAILQ_INSERT_TAIL(&ob->events, ev, events);
152 	} else {
153 		ev->argc = 1;
154 		ev->argc0 = 1;
155 	}
156 	InitPointerArg(&ev->argv[0], ob);
157 	ev->fn.fnVoid = fn;
158 	AG_EVENT_GET_ARGS(ev, fmt);
159 	ev->argc0 = ev->argc;
160 
161 	AG_ObjectUnlock(ob);
162 	return (ev);
163 }
164 
165 /*
166  * Configure an event handler routine for a given event.
167  * If a handler routine already exists, don't replace it.
168  */
169 AG_Event *
AG_AddEvent(void * p,const char * name,AG_EventFn fn,const char * fmt,...)170 AG_AddEvent(void *p, const char *name, AG_EventFn fn, const char *fmt, ...)
171 {
172 	AG_Object *ob = p;
173 	AG_Event *ev, *evOther;
174 
175 	AG_ObjectLock(ob);
176 
177 	ev = Malloc(sizeof(AG_Event));
178 	InitEvent(ev, ob);
179 
180 	if (name != NULL) {
181 		TAILQ_FOREACH(evOther, &ob->events, events) {
182 			if (strcmp(evOther->name, name) == 0)
183 				break;
184 		}
185 		if (evOther != NULL) {
186 			ev->flags = evOther->flags;
187 		}
188 		Strlcpy(ev->name, name, sizeof(ev->name));
189 	} else {
190 		ev->name[0] = '\0';
191 	}
192 
193 	ev->fn.fnVoid = fn;
194 	AG_EVENT_GET_ARGS(ev, fmt);
195 	ev->argc0 = ev->argc;
196 
197 	TAILQ_INSERT_TAIL(&ob->events, ev, events);
198 	AG_ObjectUnlock(ob);
199 	return (ev);
200 }
201 
202 /*
203  * Anonymous Function Constructors
204  */
205 #undef  AG_SET_TYPED_FN
206 #define AG_SET_TYPED_FN(memb)				\
207 	AG_Object *ob = p;				\
208 	AG_Event *ev;					\
209 							\
210 	ev = Malloc(sizeof(AG_Event));			\
211 	InitEvent(ev, ob);				\
212 	ev->name[0] = '\0';				\
213 	ev->fn.memb = fn;				\
214 	InitPointerArg(&ev->argv[0], ob);		\
215 	AG_EVENT_GET_ARGS(ev, fmt);			\
216 							\
217 	AG_ObjectLock(ob);				\
218 	TAILQ_INSERT_TAIL(&ob->events, ev, events);	\
219 	ev->argc0 = ev->argc;				\
220 	AG_ObjectUnlock(ob);				\
221 	return (AG_Function *)ev
222 
AG_SetVoidFn(void * p,AG_VoidFn fn,const char * fmt,...)223 AG_Function *AG_SetVoidFn(void *p, AG_VoidFn fn, const char *fmt, ...)		{ AG_SET_TYPED_FN(fnVoid);	}
AG_SetIntFn(void * p,AG_IntFn fn,const char * fmt,...)224 AG_Function *AG_SetIntFn(void *p, AG_IntFn fn, const char *fmt, ...)		{ AG_SET_TYPED_FN(fnInt);	}
AG_SetUint8Fn(void * p,AG_Uint8Fn fn,const char * fmt,...)225 AG_Function *AG_SetUint8Fn(void *p, AG_Uint8Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnUint8);	}
AG_SetSint8Fn(void * p,AG_Sint8Fn fn,const char * fmt,...)226 AG_Function *AG_SetSint8Fn(void *p, AG_Sint8Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnSint8);	}
AG_SetUint16Fn(void * p,AG_Uint16Fn fn,const char * fmt,...)227 AG_Function *AG_SetUint16Fn(void *p, AG_Uint16Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnUint16);	}
AG_SetSint16Fn(void * p,AG_Sint16Fn fn,const char * fmt,...)228 AG_Function *AG_SetSint16Fn(void *p, AG_Sint16Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnSint16);	}
AG_SetUint32Fn(void * p,AG_Uint32Fn fn,const char * fmt,...)229 AG_Function *AG_SetUint32Fn(void *p, AG_Uint32Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnUint32);	}
AG_SetSint32Fn(void * p,AG_Sint32Fn fn,const char * fmt,...)230 AG_Function *AG_SetSint32Fn(void *p, AG_Sint32Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnSint32);	}
231 #ifdef AG_HAVE_64BIT
AG_SetUint64Fn(void * p,AG_Uint64Fn fn,const char * fmt,...)232 AG_Function *AG_SetUint64Fn(void *p, AG_Uint64Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnUint64);	}
AG_SetSint64Fn(void * p,AG_Sint64Fn fn,const char * fmt,...)233 AG_Function *AG_SetSint64Fn(void *p, AG_Sint64Fn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnSint64);	}
234 #endif
AG_SetFloatFn(void * p,AG_FloatFn fn,const char * fmt,...)235 AG_Function *AG_SetFloatFn(void *p, AG_FloatFn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnFloat);	}
AG_SetDoubleFn(void * p,AG_DoubleFn fn,const char * fmt,...)236 AG_Function *AG_SetDoubleFn(void *p, AG_DoubleFn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnDouble);	}
237 #ifdef AG_HAVE_LONG_DOUBLE
AG_SetLongDoubleFn(void * p,AG_LongDoubleFn fn,const char * fmt,...)238 AG_Function *AG_SetLongDoubleFn(void *p, AG_LongDoubleFn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnLongDouble); }
239 #endif
AG_SetStringFn(void * p,AG_StringFn fn,const char * fmt,...)240 AG_Function *AG_SetStringFn(void *p, AG_StringFn fn, const char *fmt, ...)		{ AG_SET_TYPED_FN(fnString);	}
AG_SetPointerFn(void * p,AG_PointerFn fn,const char * fmt,...)241 AG_Function *AG_SetPointerFn(void *p, AG_PointerFn fn, const char *fmt, ...)		{ AG_SET_TYPED_FN(fnPointer);	}
AG_SetConstPointerFn(void * p,AG_ConstPointerFn fn,const char * fmt,...)242 AG_Function *AG_SetConstPointerFn(void *p, AG_ConstPointerFn fn, const char *fmt, ...)	{ AG_SET_TYPED_FN(fnConstPointer); }
AG_SetTextFn(void * p,AG_TextFn fn,const char * fmt,...)243 AG_Function *AG_SetTextFn(void *p, AG_TextFn fn, const char *fmt, ...)			{ AG_SET_TYPED_FN(fnText);	}
244 
245 #undef AG_SET_TYPED_FN
246 
247 /* Delete an event handler by name. */
248 void
AG_UnsetEvent(void * p,const char * name)249 AG_UnsetEvent(void *p, const char *name)
250 {
251 	AG_Object *ob = p;
252 	AG_Event *ev;
253 
254 	AG_ObjectLock(ob);
255 	TAILQ_FOREACH(ev, &ob->events, events) {
256 		if (strcmp(name, ev->name) == 0)
257 			break;
258 	}
259 	if (ev == NULL) {
260 		goto out;
261 	}
262 	TAILQ_REMOVE(&ob->events, ev, events);
263 	free(ev);
264 out:
265 	AG_ObjectUnlock(ob);
266 }
267 
268 /* Look up an AG_Event by name. */
269 AG_Event *
AG_FindEventHandler(void * p,const char * name)270 AG_FindEventHandler(void *p, const char *name)
271 {
272 	AG_Object *ob = p;
273 	AG_Event *ev;
274 
275 	AG_ObjectLock(ob);
276 	TAILQ_FOREACH(ev, &ob->events, events) {
277 		if (strcmp(name, ev->name) == 0)
278 			break;
279 	}
280 	AG_ObjectUnlock(ob);
281 	return (ev);
282 }
283 
284 /* Forward an event to an object's descendents. */
285 static void
PropagateEvent(AG_Object * sndr,AG_Object * rcvr,AG_Event * ev)286 PropagateEvent(AG_Object *sndr, AG_Object *rcvr, AG_Event *ev)
287 {
288 	AG_Object *chld;
289 
290 	OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
291 		PropagateEvent(rcvr, chld, ev);
292 	}
293 	AG_ForwardEvent(sndr, rcvr, ev);
294 }
295 
296 /* Timeout callback for scheduled events. */
297 static Uint32
EventTimeout(AG_Timer * to,AG_Event * event)298 EventTimeout(AG_Timer *to, AG_Event *event)
299 {
300 	AG_Object *ob = AG_SELF();
301 	AG_Object *obSender = AG_PTR(1);
302 	char *eventName = AG_STRING(2);
303 	AG_Event *ev;
304 
305 #ifdef AG_DEBUG_CORE
306 	if (agDebugLvl >= 2)
307 		Debug(ob, "Event <%s> timeout (%u ticks)\n", eventName, (Uint)to->ival);
308 #endif
309 	TAILQ_FOREACH(ev, &ob->events, events) {
310 		if (strcmp(eventName, ev->name) == 0)
311 			break;
312 	}
313 	if (ev == NULL) {
314 		return (0);
315 	}
316 	InitPointerArg(&ev->argv[ev->argc], obSender);
317 
318 	/* Propagate event to children. */
319 	if (ev->flags & AG_EVENT_PROPAGATE) {
320 		AG_Object *child;
321 #ifdef AG_DEBUG_CORE
322 		if (agDebugLvl >= 2)
323 			Debug(ob, "Propagate <%s> (timeout)\n", ev->name);
324 #endif
325 		AG_LockVFS(ob);
326 		OBJECT_FOREACH_CHILD(child, ob, ag_object) {
327 			PropagateEvent(ob, child, ev);
328 		}
329 		AG_UnlockVFS(ob);
330 	}
331 
332 	/* Invoke the event handler routine. */
333 	if (ev->fn.fnVoid != NULL) {
334 		ev->fn.fnVoid(ev);
335 	}
336 	return (0);
337 }
338 
339 
340 #ifdef AG_THREADS
341 /* Invoke an event handler routine asynchronously. */
342 static void *
EventThread(void * p)343 EventThread(void *p)
344 {
345 	AG_Event *eev = p;
346 	AG_Object *rcvr = eev->argv[0].data.p;
347 	AG_Object *chld;
348 
349 	if (eev->flags & AG_EVENT_PROPAGATE) {
350 #ifdef AG_DEBUG_CORE
351 		if (agDebugLvl >= 2)
352 			Debug(rcvr, "Propagate <%s> (async)\n", eev->name);
353 #endif
354 		AG_LockVFS(rcvr);
355 		OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
356 			PropagateEvent(rcvr, chld, eev);
357 		}
358 		AG_UnlockVFS(rcvr);
359 	}
360 #ifdef AG_DEBUG_CORE
361 	if (agDebugLvl >= 2)
362 		Debug(rcvr, "BEGIN event thread for <%s>\n", eev->name);
363 #endif
364 	if (eev->fn.fnVoid != NULL) {
365 		eev->fn.fnVoid(eev);
366 	}
367 #ifdef AG_DEBUG_CORE
368 	if (agDebugLvl >= 2)
369 		Debug(rcvr, "CLOSE event thread for <%s>\n", eev->name);
370 #endif
371 	free(eev);
372 	return (NULL);
373 }
374 #endif /* AG_THREADS */
375 
376 void
AG_InitEventQ(AG_EventQ * eq)377 AG_InitEventQ(AG_EventQ *eq)
378 {
379 	eq->nEvents = 0;
380 	eq->events = NULL;
381 }
382 
383 void
AG_FreeEventQ(AG_EventQ * eq)384 AG_FreeEventQ(AG_EventQ *eq)
385 {
386 	Free(eq->events);
387 	eq->nEvents = 0;
388 	eq->events = NULL;
389 }
390 
391 /* Add a new entry to an event queue. */
392 void
AG_QueueEvent(AG_EventQ * eq,const char * evname,const char * fmt,...)393 AG_QueueEvent(AG_EventQ *eq, const char *evname, const char *fmt, ...)
394 {
395 	AG_Event *ev;
396 
397 	eq->events = Realloc(eq->events, (eq->nEvents+1)*sizeof(AG_Event));
398 	ev = &eq->events[eq->nEvents++];
399 	InitEvent(ev, NULL);
400 	AG_EVENT_GET_ARGS(ev, fmt);
401 	ev->argc0 = ev->argc;
402 }
403 
404 /*
405  * Raise the specified event. Configured event handler routines may be
406  * called immediately, but they may also get called from a separate
407  * thread, or queued for later execution.
408  *
409  * The argument vector passed to the event handler function contains
410  * the AG_SetEvent() arguments, and any arguments specified here are
411  * appended to that list.
412  */
413 void
AG_PostEvent(void * sp,void * rp,const char * evname,const char * fmt,...)414 AG_PostEvent(void *sp, void *rp, const char *evname, const char *fmt, ...)
415 {
416 	AG_Object *sndr = sp;
417 	AG_Object *rcvr = rp;
418 	AG_Event *ev;
419 	AG_Object *chld;
420 	int propagated = 0;
421 
422 #ifdef AG_DEBUG_CORE
423 	if (agDebugLvl >= 2)
424 		Debug(rcvr, "Event <%s> posted from %s\n", evname, sndr ? sndr->name : "NULL");
425 #endif
426 	AG_ObjectLock(rcvr);
427 	TAILQ_FOREACH(ev, &rcvr->events, events) {
428 		if (strcmp(evname, ev->name) != 0)
429 			continue;
430 #ifdef AG_THREADS
431 		if (ev->flags & AG_EVENT_ASYNC) {
432 			AG_Thread th;
433 			AG_Event *evNew;
434 
435 			evNew = Malloc(sizeof(AG_Event));
436 			memcpy(evNew, ev, sizeof(AG_Event));
437 			AG_EVENT_GET_ARGS(evNew, fmt);
438 			InitPointerArg(&evNew->argv[evNew->argc], sndr);
439 			if (evNew->flags & AG_EVENT_PROPAGATE) { propagated = 1; }
440 			if (propagated) {
441 				evNew->flags &= ~(AG_EVENT_PROPAGATE);
442 			}
443 			AG_ThreadCreate(&th, EventThread, evNew);
444 		} else
445 #endif /* AG_THREADS */
446 		{
447 			AG_Event tmpev;
448 
449 			memcpy(&tmpev, ev, sizeof(AG_Event));
450 			AG_EVENT_GET_ARGS(&tmpev, fmt);
451 			InitPointerArg(&tmpev.argv[tmpev.argc], sndr);
452 			if ((tmpev.flags & AG_EVENT_PROPAGATE) && !propagated) {
453 #ifdef AG_DEBUG_CORE
454 				if (agDebugLvl >= 2)
455 					Debug(rcvr, "Propagate <%s>\n", evname);
456 #endif
457 				AG_LockVFS(rcvr);
458 				OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
459 					PropagateEvent(rcvr, chld, &tmpev);
460 				}
461 				AG_UnlockVFS(rcvr);
462 				propagated = 1;
463 			}
464 			if (tmpev.fn.fnVoid != NULL)
465 				tmpev.fn.fnVoid(&tmpev);
466 		}
467 	}
468 	AG_ObjectUnlock(rcvr);
469 }
470 
471 /*
472  * Variant of AG_PostEvent() which accepts an AG_Event argument instead
473  * of looking up the event handler by name.
474  */
475 void
AG_PostEventByPtr(void * sp,void * rp,AG_Event * ev,const char * fmt,...)476 AG_PostEventByPtr(void *sp, void *rp, AG_Event *ev, const char *fmt, ...)
477 {
478 	AG_Object *sndr = sp;
479 	AG_Object *rcvr = rp;
480 	AG_Object *chld;
481 	int propagated = 0;
482 
483 #ifdef AG_DEBUG_CORE
484 	if (agDebugLvl >= 2)
485 		Debug(rcvr, "Event %p posted from %s\n", ev, sndr ? sndr->name : "NULL");
486 #endif
487 	AG_ObjectLock(rcvr);
488 #ifdef AG_THREADS
489 	if (ev->flags & AG_EVENT_ASYNC) {
490 		AG_Thread th;
491 		AG_Event *evNew;
492 
493 		evNew = Malloc(sizeof(AG_Event));
494 		memcpy(evNew, ev, sizeof(AG_Event));
495 		AG_EVENT_GET_ARGS(evNew, fmt);
496 		InitPointerArg(&evNew->argv[evNew->argc], sndr);
497 		if (evNew->flags & AG_EVENT_PROPAGATE) { propagated = 1; }
498 		if (propagated) {
499 			evNew->flags &= ~(AG_EVENT_PROPAGATE);
500 		}
501 		AG_ThreadCreate(&th, EventThread, evNew);
502 	} else
503 #endif /* AG_THREADS */
504 	{
505 		AG_Event evTmp;
506 
507 		memcpy(&evTmp, ev, sizeof(AG_Event));
508 		AG_EVENT_GET_ARGS(&evTmp, fmt);
509 		InitPointerArg(&evTmp.argv[evTmp.argc], sndr);
510 		if ((evTmp.flags & AG_EVENT_PROPAGATE) && !propagated) {
511 #ifdef AG_DEBUG_CORE
512 			if (agDebugLvl >= 2)
513 				Debug(rcvr, "Propagate event %p (post)\n", ev);
514 #endif
515 			AG_LockVFS(rcvr);
516 			OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
517 				PropagateEvent(rcvr, chld, &evTmp);
518 			}
519 			AG_UnlockVFS(rcvr);
520 			propagated = 1;
521 		}
522 		if (evTmp.fn.fnVoid != NULL)
523 			evTmp.fn.fnVoid(&evTmp);
524 	}
525 	AG_ObjectUnlock(rcvr);
526 }
527 
528 /*
529  * Schedule the execution of the named event in the given number
530  * of AG_Time(3) ticks.
531  *
532  * The argument vector passed to the event handler function contains
533  * the AG_SetEvent() arguments, and any arguments specified here are
534  * appended to that list.
535  */
536 int
AG_SchedEvent(void * pSndr,void * pRcvr,Uint32 ticks,const char * evname,const char * fmt,...)537 AG_SchedEvent(void *pSndr, void *pRcvr, Uint32 ticks, const char *evname,
538     const char *fmt, ...)
539 {
540 	AG_Object *sndr = pSndr;
541 	AG_Object *rcvr = pRcvr;
542 	AG_Event *ev;
543 	AG_Timer *to;
544 
545 	if ((to = TryMalloc(sizeof(AG_Timer))) == NULL) {
546 		return (-1);
547 	}
548 	AG_InitTimer(to, evname, AG_TIMER_AUTO_FREE);
549 
550 	AG_LockTiming();
551 	AG_ObjectLock(rcvr);
552 
553 	if (AG_AddTimer(rcvr, to, ticks,
554 	    EventTimeout, "%p,%s", sndr, evname) == -1) {
555 		free(to);
556 		goto fail;
557 	}
558 	ev = &to->fnEvent;
559 	AG_EventInit(ev);
560 	ev->argv[0].data.p = rcvr;
561 	AG_EVENT_GET_ARGS(ev, fmt);
562 	ev->argc0 = ev->argc;
563 
564 	AG_UnlockTiming();
565 	AG_ObjectUnlock(rcvr);
566 	return (0);
567 fail:
568 	AG_UnlockTiming();
569 	AG_ObjectUnlock(rcvr);
570 	return (-1);
571 }
572 
573 /*
574  * Forward an event, without modifying the original event structure, except
575  * for the sender and receiver pointers.
576  */
577 void
AG_ForwardEvent(void * pSndr,void * pRcvr,AG_Event * event)578 AG_ForwardEvent(void *pSndr, void *pRcvr, AG_Event *event)
579 {
580 	AG_Object *sndr = pSndr;
581 	AG_Object *rcvr = pRcvr;
582 	AG_Object *chld;
583 	AG_Event *ev;
584 
585 #ifdef AG_DEBUG_CORE
586 	if (agDebugLvl >= 2)
587 		Debug(rcvr, "Event <%s> forwarded from %s\n", event->name, sndr ? sndr->name : "NULL");
588 #endif
589 	AG_ObjectLock(rcvr);
590 	TAILQ_FOREACH(ev, &rcvr->events, events) {
591 		if (strcmp(event->name, ev->name) != 0)
592 			continue;
593 #ifdef AG_THREADS
594 		if (ev->flags & AG_EVENT_ASYNC) {
595 			AG_Thread th;
596 			AG_Event *evNew;
597 
598 			evNew = Malloc(sizeof(AG_Event));
599 			memcpy(evNew, ev, sizeof(AG_Event));
600 			InitPointerArg(&evNew->argv[0], rcvr);
601 			InitPointerArg(&evNew->argv[evNew->argc], sndr);
602 			AG_ThreadCreate(&th, EventThread, evNew);
603 		} else
604 #endif /* AG_THREADS */
605 		{
606 			AG_Event tmpev;
607 
608 			memcpy(&tmpev, event, sizeof(AG_Event));
609 			InitPointerArg(&tmpev.argv[0], rcvr);
610 			InitPointerArg(&tmpev.argv[tmpev.argc], sndr);
611 
612 			if (ev->flags & AG_EVENT_PROPAGATE) {
613 #ifdef AG_DEBUG_CORE
614 				if (agDebugLvl >= 2)
615 					Debug(rcvr, "Propagate <%s> (forward)\n", event->name);
616 #endif
617 				AG_LockVFS(rcvr);
618 				OBJECT_FOREACH_CHILD(chld, rcvr, ag_object) {
619 					PropagateEvent(rcvr, chld, ev);
620 				}
621 				AG_UnlockVFS(rcvr);
622 			}
623 			/* XXX AG_EVENT_ASYNC.. */
624 			if (ev->fn.fnVoid != NULL)
625 				ev->fn.fnVoid(&tmpev);
626 		}
627 	}
628 	AG_ObjectUnlock(rcvr);
629 }
630 
631 #ifdef HAVE_KQUEUE
632 static __inline__ int
GrowKqChangelist(AG_EventSourceKQUEUE * kq,Uint n)633 GrowKqChangelist(AG_EventSourceKQUEUE *kq, Uint n)
634 {
635 	struct kevent *changesNew;
636 
637 	if (n <= kq->maxChanges) {
638 		return (0);
639 	}
640 	if ((changesNew = TryRealloc(kq->changes, n*sizeof(struct kevent)))
641 	    == NULL) {
642 		return (-1);
643 	}
644 	kq->changes = changesNew;
645 	kq->maxChanges = n;
646 	return (0);
647 }
648 #endif /* HAVE_KQUEUE */
649 
650 /* Create a new event source. */
651 static AG_EventSource *
CreateEventSource(void)652 CreateEventSource(void)
653 {
654 #ifdef HAVE_KQUEUE
655 	AG_EventSourceKQUEUE *kq = TryMalloc(sizeof(AG_EventSourceKQUEUE));
656 	AG_EventSource *src = (AG_EventSource *)kq;
657 #else
658 	AG_EventSource *src = TryMalloc(sizeof(AG_EventSource));
659 #endif
660 	if (src == NULL) {
661 		return (NULL);
662 	}
663 	src->flags = 0;
664 	src->addTimerFn = NULL;
665 	src->delTimerFn = NULL;
666 	src->breakReq = 0;
667 	src->returnCode = 0;
668 	TAILQ_INIT(&src->prologues);
669 	TAILQ_INIT(&src->epilogues);
670 	TAILQ_INIT(&src->spinners);
671 	TAILQ_INIT(&src->sinks);
672 	memset(src->caps, 0, sizeof(src->caps));
673 
674 #if defined(HAVE_KQUEUE)
675 	if ((kq->fd = kqueue()) == -1) {
676 		AG_SetError("kqueue: %s", AG_Strerror(errno));
677 		return (NULL);
678 	}
679 	kq->changes = NULL;
680 	kq->nChanges = 0;
681 	kq->maxChanges = 0;
682 	memset(kq->events, 0, EVBUFSIZE*sizeof(struct kevent));
683 	src->sinkFn = AG_EventSinkKQUEUE;
684 	src->addTimerFn = AG_AddTimerKQUEUE;
685 	src->delTimerFn = AG_DelTimerKQUEUE;
686 	src->caps[AG_SINK_TIMER] = 1;		/* Provides timers internally */
687 	src->caps[AG_SINK_READ] = 1;
688 	src->caps[AG_SINK_WRITE] = 1;
689 	src->caps[AG_SINK_FSEVENT] = 1;
690 	src->caps[AG_SINK_PROCEVENT] = 1;
691 	GrowKqChangelist(kq, 64);		/* Preallocate */
692 #elif defined(HAVE_TIMERFD)
693 	src->sinkFn = AG_EventSinkTIMERFD;
694 	src->addTimerFn = AG_AddTimerTIMERFD;
695 	src->delTimerFn = AG_DelTimerTIMERFD;
696 	src->caps[AG_SINK_TIMER] = 1;		/* Provides timers internally */
697 	src->caps[AG_SINK_READ] = 1;
698 	src->caps[AG_SINK_WRITE] = 1;
699 #elif defined(HAVE_SELECT) && !defined(AG_THREADS)
700 	src->sinkFn = AG_EventSinkTIMEDSELECT;
701 	src->caps[AG_SINK_READ] = 1;
702 	src->caps[AG_SINK_WRITE] = 1;
703 #elif defined(HAVE_SELECT) && defined(AG_THREADS)
704 	src->sinkFn = AG_EventSinkSELECT;
705 	src->caps[AG_SINK_READ] = 1;
706 	src->caps[AG_SINK_WRITE] = 1;
707 #else
708 	src->sinkFn = AG_EventSinkSPINNER;
709 #endif
710 	if (agSoftTimers) {			/* Force soft timers */
711 		src->addTimerFn = NULL;
712 		src->delTimerFn = NULL;
713 		src->caps[AG_SINK_TIMER] = 0;
714 	}
715 	return (src);
716 }
717 
718 static void
DestroyEventSource(void * pEventSource)719 DestroyEventSource(void *pEventSource)
720 {
721 	AG_EventSource *src = pEventSource;
722 	AG_EventSink *es, *esNext;
723 
724 	if (agEventSource == NULL)
725 		return;
726 
727 #ifdef HAVE_KQUEUE
728 	{
729 		AG_EventSourceKQUEUE *kq = pEventSource;
730 
731 		if (kq->fd != -1) {
732 			close(kq->fd);
733 		}
734 		Free(kq->changes);
735 	}
736 #endif
737 	for (es = TAILQ_FIRST(&src->prologues); es != TAILQ_END(&src->prologues); es = esNext) {
738 		esNext = TAILQ_NEXT(es, sinks);
739 		free(es);
740 	}
741 	for (es = TAILQ_FIRST(&src->epilogues); es != TAILQ_END(&src->epilogues); es = esNext) {
742 		esNext = TAILQ_NEXT(es, sinks);
743 		free(es);
744 	}
745 	for (es = TAILQ_FIRST(&src->spinners); es != TAILQ_END(&src->spinners); es = esNext) {
746 		esNext = TAILQ_NEXT(es, sinks);
747 		free(es);
748 	}
749 	for (es = TAILQ_FIRST(&src->sinks); es != TAILQ_END(&src->sinks); es = esNext) {
750 		esNext = TAILQ_NEXT(es, sinks);
751 		free(es);
752 	}
753 	free(src);
754 }
755 
756 /* Return the calling thread's effective event source. */
757 AG_EventSource *
AG_GetEventSource(void)758 AG_GetEventSource(void)
759 {
760 	AG_EventSource *src;
761 
762 #ifdef AG_THREADS
763 	if ((src = AG_ThreadKeyGet(agEventSourceKey)) != NULL && src != NULL)
764 		return (src);
765 #else
766 	if (agEventSource != NULL)
767 		return (agEventSource);
768 #endif
769 	if ((src = CreateEventSource()) == NULL)
770 		AG_FatalError(NULL);
771 #ifdef AG_THREADS
772 	AG_ThreadKeySet(agEventSourceKey, src);
773 #else
774 	agEventSource = src;
775 #endif
776 	return (src);
777 }
778 
779 int
AG_InitEventSubsystem(Uint flags)780 AG_InitEventSubsystem(Uint flags)
781 {
782 	/* Initialize the main thread's event source. */
783 	agEventSource = NULL;
784 #ifdef AG_THREADS
785 	if (AG_ThreadKeyTryCreate(&agEventSourceKey, DestroyEventSource) == -1)
786 		return (-1);
787 #endif
788 	if ((agEventSource = AG_GetEventSource()) == NULL) {
789 		return (-1);
790 	}
791 	return (0);
792 }
793 
794 void
AG_DestroyEventSubsystem(void)795 AG_DestroyEventSubsystem(void)
796 {
797 	if (agEventSource != NULL) {
798 		DestroyEventSource(agEventSource);
799 		agEventSource = NULL;
800 	}
801 }
802 
803 #ifdef HAVE_KQUEUE
804 /*
805  * Routines for translating between AG_EventSink and kqueue types.
806  */
807 static __inline__ enum ag_event_sink_type
GetSinkType(int filter)808 GetSinkType(int filter)
809 {
810 	switch (filter) {
811 	case EVFILT_TIMER:	return (AG_SINK_TIMER);
812 	case EVFILT_READ:	return (AG_SINK_READ);
813 	case EVFILT_WRITE:	return (AG_SINK_WRITE);
814 	case EVFILT_VNODE:	return (AG_SINK_FSEVENT);
815 	case EVFILT_PROC:	return (AG_SINK_PROCEVENT);
816 	default:		return (AG_SINK_NONE);
817 	}
818 }
819 static Uint
GetKqFilterFlags(Uint flags)820 GetKqFilterFlags(Uint flags)
821 {
822 	Uint fflags = 0;
823 	if (flags & AG_FSEVENT_DELETE) { fflags |= NOTE_DELETE; }
824 	if (flags & AG_FSEVENT_WRITE)  { fflags |= NOTE_WRITE;  }
825 	if (flags & AG_FSEVENT_EXTEND) { fflags |= NOTE_EXTEND; }
826 	if (flags & AG_FSEVENT_ATTRIB) { fflags |= NOTE_ATTRIB; }
827 	if (flags & AG_FSEVENT_LINK)   { fflags |= NOTE_LINK;   }
828 	if (flags & AG_FSEVENT_RENAME) { fflags |= NOTE_RENAME; }
829 	if (flags & AG_FSEVENT_REVOKE) { fflags |= NOTE_REVOKE; }
830 	if (flags & AG_PROCEVENT_EXIT) { fflags |= NOTE_EXIT; }
831 	if (flags & AG_PROCEVENT_FORK) { fflags |= NOTE_FORK; }
832 	if (flags & AG_PROCEVENT_EXEC) { fflags |= NOTE_EXEC; }
833 	return (fflags);
834 }
835 static Uint
GetSinkFlags(Uint fflags)836 GetSinkFlags(Uint fflags)
837 {
838 	Uint flags = 0;
839 	if (fflags & NOTE_DELETE) { fflags |= AG_FSEVENT_DELETE; }
840 	if (fflags & NOTE_WRITE)  { fflags |= AG_FSEVENT_WRITE;  }
841 	if (fflags & NOTE_EXTEND) { fflags |= AG_FSEVENT_EXTEND; }
842 	if (fflags & NOTE_ATTRIB) { fflags |= AG_FSEVENT_ATTRIB; }
843 	if (fflags & NOTE_LINK)   { fflags |= AG_FSEVENT_LINK;   }
844 	if (fflags & NOTE_RENAME) { fflags |= AG_FSEVENT_RENAME; }
845 	if (fflags & NOTE_REVOKE) { fflags |= AG_FSEVENT_REVOKE; }
846 	if (fflags & NOTE_EXIT) { fflags |= AG_PROCEVENT_EXIT; }
847 	if (fflags & NOTE_FORK) { fflags |= AG_PROCEVENT_FORK; }
848 	if (fflags & NOTE_EXEC) { fflags |= AG_PROCEVENT_EXEC; }
849 	return (flags);
850 }
851 #endif /* HAVE_KQUEUE */
852 
853 /*
854  * Add/remove an event processing prologue. The function will be invoked
855  * only once at the beginning of AG_EventLoop().
856  */
857 AG_EventSink *
AG_AddEventPrologue(AG_EventSinkFn fn,const char * fnArgs,...)858 AG_AddEventPrologue(AG_EventSinkFn fn, const char *fnArgs, ...)
859 {
860 	AG_EventSource *src = AG_GetEventSource();
861 	AG_EventSink *es;
862 
863 	if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
864 		return (NULL);
865 	}
866 	es->type = AG_SINK_PROLOGUE;
867 	es->fn = fn;
868 	InitEvent(&es->fnArgs, NULL);
869 	AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
870 	es->fnArgs.argc0 = es->fnArgs.argc;
871 	TAILQ_INSERT_TAIL(&src->prologues, es, sinks);
872 	return (es);
873 }
874 void
AG_DelEventPrologue(AG_EventSink * es)875 AG_DelEventPrologue(AG_EventSink *es)
876 {
877 	AG_EventSource *src = AG_GetEventSource();
878 
879 #ifdef AG_DEBUG
880 	if (es->type != AG_SINK_PROLOGUE)
881 		AG_FatalError("AG_DelEventPrologue");
882 #endif
883 	TAILQ_REMOVE(&src->prologues, es, sinks);
884 	free(es);
885 }
886 
887 /*
888  * Add/remove an event sink epilogue. The function will be invoked
889  * after all incoming / queued events have been processed.
890  */
891 AG_EventSink *
AG_AddEventEpilogue(AG_EventSinkFn fn,const char * fnArgs,...)892 AG_AddEventEpilogue(AG_EventSinkFn fn, const char *fnArgs, ...)
893 {
894 	AG_EventSource *src = AG_GetEventSource();
895 	AG_EventSink *es;
896 
897 	if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
898 		return (NULL);
899 	}
900 	es->type = AG_SINK_EPILOGUE;
901 	es->fn = fn;
902 	InitEvent(&es->fnArgs, NULL);
903 	AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
904 	es->fnArgs.argc0 = es->fnArgs.argc;
905 	TAILQ_INSERT_TAIL(&src->epilogues, es, sinks);
906 	return (es);
907 }
908 void
AG_DelEventEpilogue(AG_EventSink * es)909 AG_DelEventEpilogue(AG_EventSink *es)
910 {
911 	AG_EventSource *src = AG_GetEventSource();
912 
913 #ifdef AG_DEBUG
914 	if (es->type != AG_SINK_EPILOGUE)
915 		AG_FatalError("AG_DelEventEpilogue");
916 #endif
917 	TAILQ_REMOVE(&src->epilogues, es, sinks);
918 	free(es);
919 }
920 
921 /*
922  * Add/remove a spinning event sink. If at least one spinning sink exists,
923  * AG_EventLoop() will invoke it repeatedly and force non-blocking operation
924  * of subsequent polling methods.
925  */
926 AG_EventSink *
AG_AddEventSpinner(AG_EventSinkFn fn,const char * fnArgs,...)927 AG_AddEventSpinner(AG_EventSinkFn fn, const char *fnArgs, ...)
928 {
929 	AG_EventSource *src = AG_GetEventSource();
930 	AG_EventSink *es;
931 
932 	if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
933 		return (NULL);
934 	}
935 	es->type = AG_SINK_SPINNER;
936 	es->fn = fn;
937 	InitEvent(&es->fnArgs, NULL);
938 	AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
939 	es->fnArgs.argc0 = es->fnArgs.argc;
940 	TAILQ_INSERT_TAIL(&src->spinners, es, sinks);
941 	return (es);
942 }
943 void
AG_DelEventSpinner(AG_EventSink * es)944 AG_DelEventSpinner(AG_EventSink *es)
945 {
946 	AG_EventSource *src = AG_GetEventSource();
947 
948 #ifdef AG_DEBUG
949 	if (es->type != AG_SINK_SPINNER)
950 		AG_FatalError("AG_DelEventSpinner");
951 #endif
952 	TAILQ_REMOVE(&src->spinners, es, sinks);
953 	free(es);
954 }
955 
956 /*
957  * Add/remove a low-level event sink. The function will be called
958  * whenever the specified event occurs.
959  */
960 AG_EventSink *
AG_AddEventSink(enum ag_event_sink_type type,int ident,Uint flags,AG_EventSinkFn fn,const char * fnArgs,...)961 AG_AddEventSink(enum ag_event_sink_type type, int ident, Uint flags,
962     AG_EventSinkFn fn, const char *fnArgs, ...)
963 {
964 	AG_EventSource *src = AG_GetEventSource();
965 	AG_EventSink *es;
966 #ifdef HAVE_KQUEUE
967 	AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)src;
968 	struct kevent *kev;
969 #endif
970 	if (type >= AG_SINK_LAST || !src->caps[type]) {
971 		AG_SetError("Unsupported event type: %u", (Uint)type);
972 		return (NULL);
973 	}
974 	if ((es = TryMalloc(sizeof(AG_EventSink))) == NULL) {
975 		return (NULL);
976 	}
977 	es->type = type;
978 	es->ident = ident;
979 	es->flags = flags;
980 
981 #ifdef HAVE_KQUEUE
982 	if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
983 		free(es);
984 		return (NULL);
985 	}
986 	kev = &kq->changes[kq->nChanges++];
987 	switch (type) {
988 	case AG_SINK_READ:
989 		AG_EV_SET(kev, ident, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, es);
990 		break;
991 	case AG_SINK_WRITE:
992 		AG_EV_SET(kev, ident, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, es);
993 		break;
994 	case AG_SINK_FSEVENT:
995 		AG_EV_SET(kev, ident, EVFILT_VNODE, EV_ADD|EV_ENABLE,
996 		    GetKqFilterFlags(flags), 0, es);
997 		break;
998 	case AG_SINK_PROCEVENT:
999 		AG_EV_SET(kev, ident, EVFILT_PROC, EV_ADD|EV_ENABLE,
1000 		    GetKqFilterFlags(flags), 0, es);
1001 		break;
1002 	default:
1003 		kq->nChanges--;
1004 		break;
1005 	}
1006 #endif /* HAVE_KQUEUE */
1007 
1008 	es->fn = fn;
1009 	InitEvent(&es->fnArgs, NULL);
1010 	AG_EVENT_GET_ARGS(&es->fnArgs, fnArgs);
1011 	es->fnArgs.argc0 = es->fnArgs.argc;
1012 	TAILQ_INSERT_TAIL(&src->sinks, es, sinks);
1013 	return (es);
1014 }
1015 void
AG_DelEventSink(AG_EventSink * es)1016 AG_DelEventSink(AG_EventSink *es)
1017 {
1018 	AG_EventSource *src = AG_GetEventSource();
1019 #ifdef HAVE_KQUEUE
1020 	AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)src;
1021 	struct kevent *kev;
1022 
1023 	if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1024 		AG_FatalError(NULL);
1025 	}
1026 	kev = &kq->changes[kq->nChanges++];
1027 
1028 	switch (es->type) {
1029 	case AG_SINK_READ:
1030 		AG_EV_SET(kev, es->ident, EVFILT_READ, EV_DELETE, 0, 0, NULL);
1031 		break;
1032 	case AG_SINK_WRITE:
1033 		AG_EV_SET(kev, es->ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
1034 		break;
1035 	case AG_SINK_FSEVENT:
1036 		AG_EV_SET(kev, es->ident, EVFILT_VNODE, EV_DELETE,
1037 		    GetKqFilterFlags(es->flags), 0, NULL);
1038 		break;
1039 	case AG_SINK_PROCEVENT:
1040 		AG_EV_SET(kev, es->ident, EVFILT_PROC, EV_DELETE,
1041 		    GetKqFilterFlags(es->flags), 0, NULL);
1042 		break;
1043 	default:
1044 		kq->nChanges--;
1045 		break;
1046 	}
1047 #endif /* HAVE_KQUEUE */
1048 
1049 	TAILQ_REMOVE(&src->sinks, es, sinks);
1050 	free(es);
1051 }
1052 void
AG_DelEventSinksByIdent(enum ag_event_sink_type type,int ident,Uint flags)1053 AG_DelEventSinksByIdent(enum ag_event_sink_type type, int ident, Uint flags)
1054 {
1055 	AG_EventSource *src = AG_GetEventSource();
1056 	AG_EventSink *es, *esNext;
1057 
1058 	for (es = TAILQ_FIRST(&src->sinks); es != TAILQ_END(&src->sinks); es = esNext) {
1059 		esNext = TAILQ_NEXT(es, sinks);
1060 		if (es->type == type &&
1061 		    es->ident == ident &&
1062 		    es->flags == flags)
1063 			AG_DelEventSink(es);
1064 	}
1065 }
1066 
1067 #ifdef HAVE_KQUEUE
1068 /*
1069  * Standard event sink using kqueue(2), commonly found on modern BSD
1070  * derived operating systems.
1071  */
1072 int
AG_EventSinkKQUEUE(void)1073 AG_EventSinkKQUEUE(void)
1074 {
1075 	AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)agEventSource;
1076 	int rv, i;
1077 	struct timespec timeo, *pTimeo;
1078 
1079 restart:
1080 	if (!TAILQ_EMPTY(&agEventSource->spinners)) {
1081 		timeo.tv_sec = 0;
1082 		timeo.tv_nsec = 0L;
1083 		pTimeo = &timeo;
1084 	} else {
1085 		pTimeo = NULL;
1086 	}
1087 #ifdef DEBUG_TIMERS
1088 	for (i = 0; i < kq->nChanges; i++) {
1089 		struct kevent *chg = &kq->changes[i];
1090 		Verbose("changes[%d]: f=%d i=%u f=0x%x ff=0x%x u=%p\n",
1091 		    i, (int)chg->filter,
1092 		    (Uint)chg->ident, chg->flags, chg->fflags,
1093 		    chg->udata);
1094 	}
1095 #endif
1096 	rv = kevent(kq->fd, kq->changes, kq->nChanges, kq->events, EVBUFSIZE,
1097 	    pTimeo);
1098 	if (rv < 0) {
1099 		if (errno == EINTR) {
1100 			goto restart;
1101 		}
1102 		AG_SetError("kevent(): %s", AG_Strerror(errno));
1103 		return (-1);
1104 	}
1105 	kq->nChanges = 0;
1106 
1107 	/* 1. Process timer expirations. */
1108 	AG_LockTiming();
1109 	for (i = 0; i < rv; i++) {
1110 		struct kevent *kev = &kq->events[i];
1111 		enum ag_event_sink_type esType = GetSinkType(kev->filter);
1112 		Uint32 rvt;
1113 		AG_Timer *to;
1114 		AG_Object *ob;
1115 
1116 		if (kev->flags & EV_ERROR) {
1117 			Verbose("kevent (%ld,%d): %s\n", kev->ident, kev->filter,
1118 			    AG_Strerror((int)kev->data));
1119 			continue;
1120 		}
1121 		if (esType != AG_SINK_TIMER ||
1122 		    (to = (AG_Timer *)kev->udata) == NULL) {
1123 			continue;
1124 		}
1125 		rvt = to->fn(to, &to->fnEvent);
1126 		if (rvt > 0) {				/* Restart timer */
1127 			struct kevent *kev;
1128 #ifdef DEBUG_TIMERS
1129 			Verbose("TIMER[%d] resetting t=+%u\n", to->id, (Uint)rvt);
1130 #endif
1131 			if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1132 				AG_UnlockTiming();
1133 				return (-1);
1134 			}
1135 			kev = &kq->changes[kq->nChanges++];
1136 			AG_EV_SET(kev, to->id, EVFILT_TIMER,
1137 			    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, (int)rvt, to);
1138 			to->ival = rvt;
1139 		} else {				/* Expire */
1140 #ifdef DEBUG_TIMERS
1141 			Verbose("TIMER[%d] expired\n", to->id);
1142 #endif
1143 			if ((ob = to->obj) == NULL) {
1144 				continue;
1145 			}
1146 			TAILQ_REMOVE(&ob->timers, to, timers);
1147 			if (TAILQ_EMPTY(&ob->timers)) {
1148 				TAILQ_REMOVE(&agTimerObjQ, ob, tobjs);
1149 			}
1150 			if (to->flags & AG_TIMER_AUTO_FREE) {
1151 				free(to);
1152 			} else {
1153 				to->ival = 0;
1154 				to->id = -1;
1155 				to->obj = NULL;
1156 			}
1157 			agTimerCount--;
1158 		}
1159 	}
1160 	AG_UnlockTiming();
1161 
1162 	/* 2. Process I/O and other events. */
1163 	for (i = 0; i < rv; i++) {
1164 		struct kevent *kev = &kq->events[i];
1165 		enum ag_event_sink_type esType = GetSinkType(kev->filter);
1166 		AG_EventSink *es;
1167 
1168 		switch (esType) {
1169 		case AG_SINK_READ:
1170 		case AG_SINK_WRITE:
1171 			es = (AG_EventSink *)kev->udata;
1172 			es->fn(es, &es->fnArgs);
1173 			break;
1174 		case AG_SINK_FSEVENT:
1175 		case AG_SINK_PROCEVENT:
1176 			es = (AG_EventSink *)kev->udata;
1177 			es->flagsMatched = GetSinkFlags(kev->fflags);
1178 			es->fn(es, &es->fnArgs);
1179 			break;
1180 		default:
1181 			break;
1182 		}
1183 	}
1184 	return (0);
1185 }
1186 
1187 /*
1188  * Add/remove a kqueue(2) based timer.
1189  */
1190 static int
GenerateTimerID(AG_Timer * to)1191 GenerateTimerID(AG_Timer *to)
1192 {
1193 	AG_Object *obOther;
1194 	AG_Timer *toOther;
1195 	int id;
1196 
1197 gen_id:
1198 #ifdef AG_DEBUG
1199 	if (agTimerCount+1 >= (AG_INT_MAX-1))
1200 		AG_FatalError("agTimerCount");
1201 #endif
1202 	id = (int)++agTimerCount;			/* XXX */
1203 	TAILQ_FOREACH(obOther, &agTimerObjQ, tobjs) {
1204 		TAILQ_FOREACH(toOther, &obOther->timers, timers) {
1205 			if (toOther == to) { continue; }
1206 			if (toOther->id == id) {
1207 				id++;
1208 				goto gen_id;
1209 			}
1210 		}
1211 	}
1212 	return (id);
1213 }
1214 int
AG_AddTimerKQUEUE(AG_Timer * to,Uint32 ival,int newTimer)1215 AG_AddTimerKQUEUE(AG_Timer *to, Uint32 ival, int newTimer)
1216 {
1217 	AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)agEventSource;
1218 
1219 	/* Create a kernel-based timer with kqueue. */
1220 	if (newTimer) {
1221 		to->id = GenerateTimerID(to);
1222 	}
1223 	if (newTimer || to->ival != ival) {
1224 		struct kevent *kev;
1225 #ifdef DEBUG_TIMERS
1226 		Verbose("kevent: creating timer ID=%d ival=%d\n", to->id, (int)ival);
1227 #endif
1228 		if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1229 			return (-1);
1230 		}
1231 		kev = &kq->changes[kq->nChanges++];
1232 		AG_EV_SET(kev, to->id, EVFILT_TIMER,
1233 		    EV_ADD|EV_ENABLE|EV_ONESHOT, 0, (int)ival, to);
1234 		to->ival = ival;
1235 	}
1236 	return (0);
1237 }
1238 void
AG_DelTimerKQUEUE(AG_Timer * to)1239 AG_DelTimerKQUEUE(AG_Timer *to)
1240 {
1241 	AG_EventSourceKQUEUE *kq = (AG_EventSourceKQUEUE *)agEventSource;
1242 	struct kevent *kev;
1243 
1244 	if (GrowKqChangelist(kq, kq->nChanges+1) == -1) {
1245 		AG_FatalError(NULL);
1246 	}
1247 	kev = &kq->changes[kq->nChanges++];
1248 	AG_EV_SET(kev, to->id, EVFILT_TIMER, EV_DELETE,
1249 	    0, 0, NULL);
1250 	agTimerCount--;
1251 }
1252 
1253 #endif /* HAVE_KQUEUE */
1254 
1255 #ifdef HAVE_TIMERFD
1256 /*
1257  * Standard event sink using select(2) and fd-based timers,
1258  * usually available on Linux.
1259  */
1260 int
AG_EventSinkTIMERFD(void)1261 AG_EventSinkTIMERFD(void)
1262 {
1263 	fd_set rdFds, wrFds;
1264 	int nFds, rv;
1265 	AG_EventSink *es;
1266 	AG_Object *ob, *obNext;
1267 	AG_Timer *to, *toNext;
1268 	struct timeval timeo, *pTimeo;
1269 
1270 restart:
1271 	nFds = 0;
1272 	FD_ZERO(&rdFds);
1273 	FD_ZERO(&wrFds);
1274 	TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1275 		switch (es->type) {
1276 		case AG_SINK_READ:
1277 			FD_SET(es->ident, &rdFds);
1278 			if (es->ident > nFds) { nFds = es->ident; }
1279 			break;
1280 		case AG_SINK_WRITE:
1281 			FD_SET(es->ident, &wrFds);
1282 			if (es->ident > nFds) { nFds = es->ident; }
1283 			break;
1284 		}
1285 	}
1286 	TAILQ_FOREACH(ob, &agTimerObjQ, tobjs) {
1287 		TAILQ_FOREACH(to, &ob->timers, timers) {
1288 			FD_SET(to->id, &rdFds);
1289 			if (to->id > nFds) { nFds = to->id; }
1290 		}
1291 	}
1292 	if (!TAILQ_EMPTY(&agEventSource->spinners)) {
1293 		timeo.tv_sec = 0;
1294 		timeo.tv_usec = 0;
1295 		pTimeo = &timeo;
1296 	} else {
1297 		pTimeo = NULL;
1298 	}
1299 	rv = select(nFds+1, &rdFds, &wrFds, NULL, pTimeo);
1300 	if (rv == -1) {
1301 		if (errno == EINTR) {
1302 			goto restart;
1303 		}
1304 		AG_SetError("select: %s", AG_Strerror(errno));
1305 		return (-1);
1306 	}
1307 
1308 	AG_LockTiming();
1309 
1310 	/* 1. Process timer expirations. */
1311 	for (ob = TAILQ_FIRST(&agTimerObjQ);
1312 	     ob != TAILQ_END(&agTimerObjQ);
1313 	     ob = obNext) {
1314 		obNext = TAILQ_NEXT(ob, tobjs);
1315 		AG_ObjectLock(ob);
1316 		for (to = TAILQ_FIRST(&ob->timers);
1317 		     to != TAILQ_END(&ob->timers);
1318 		     to = toNext) {
1319 			struct itimerspec its;
1320 			Uint32 rvt;
1321 
1322 			toNext = TAILQ_NEXT(to, timers);
1323 			if (!FD_ISSET(to->id, &rdFds)) {
1324 				continue;
1325 			}
1326 			rvt = to->fn(to, &to->fnEvent);
1327 			if (rvt > 0) {
1328 				its.it_value.tv_sec = rvt/1000;
1329 				its.it_value.tv_nsec = (rvt % 1000)*1000000L;
1330 				its.it_interval.tv_sec = 0;
1331 				its.it_interval.tv_nsec = 0L;
1332 				if (timerfd_settime(to->id, 0, &its, NULL) == -1) {
1333 					Verbose("timerfd_settime: %s\n", AG_Strerror(errno));
1334 					FD_CLR(to->id, &rdFds);
1335 					AG_DelTimer(ob, to);
1336 				}
1337 			} else {
1338 				FD_CLR(to->id, &rdFds);
1339 				AG_DelTimer(ob, to);
1340 			}
1341 		}
1342 		AG_ObjectUnlock(ob);
1343 	}
1344 
1345 	/* 2. Process I/O events. */
1346 	TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1347 		switch (es->type) {
1348 		case AG_SINK_READ:
1349 			if (FD_ISSET(es->ident, &rdFds)) {
1350 				es->fn(es, &es->fnArgs);
1351 			}
1352 			break;
1353 		case AG_SINK_WRITE:
1354 			if (FD_ISSET(es->ident, &wrFds)) {
1355 				es->fn(es, &es->fnArgs);
1356 			}
1357 			break;
1358 		}
1359 	}
1360 
1361 	AG_UnlockTiming();
1362 	return (0);
1363 }
1364 
1365 /*
1366  * Add/remove a fd-based timer.
1367  */
1368 int
AG_AddTimerTIMERFD(AG_Timer * to,Uint32 ival,int newTimer)1369 AG_AddTimerTIMERFD(AG_Timer *to, Uint32 ival, int newTimer)
1370 {
1371 	struct itimerspec its;
1372 
1373 	if (newTimer) {
1374 		/* Create a timerfd. Store the file descriptor as ID. */
1375 		if ((to->id = timerfd_create(CLOCK_MONOTONIC, 0)) == -1) {
1376 			AG_SetError("timerfd_create: %s", AG_Strerror(errno));
1377 			return (-1);
1378 		}
1379 	}
1380 	its.it_value.tv_sec = ival/1000;
1381 	its.it_value.tv_nsec = (ival % 1000)*1000000L;
1382 	its.it_interval.tv_sec = 0;
1383 	its.it_interval.tv_nsec = 0L;
1384 	if (timerfd_settime(to->id, 0, &its, NULL) == -1) {
1385 		close(to->id);
1386 		AG_SetError("timerfd_settime: %s", AG_Strerror(errno));
1387 		return (-1);
1388 	}
1389 	to->ival = ival;
1390 	return (0);
1391 }
1392 void
AG_DelTimerTIMERFD(AG_Timer * to)1393 AG_DelTimerTIMERFD(AG_Timer *to)
1394 {
1395 #ifdef AG_DEBUG
1396 	if (to->id == -1) { AG_FatalError("timerfd inconsistency"); }
1397 #endif
1398 	close(to->id);
1399 }
1400 #endif /* HAVE_TIMERFD */
1401 
1402 #if defined(HAVE_SELECT) && !defined(AG_THREADS)
1403 /*
1404  * Standard event sink using select(2) with timers implemented using the
1405  * select() timeout. Only available in single-threaded builds, since timers
1406  * cannot be added, restarted or removed from different threads with this
1407  * method.
1408  */
1409 int
AG_EventSinkTIMEDSELECT(void)1410 AG_EventSinkTIMEDSELECT(void)
1411 {
1412 	fd_set rdFds, wrFds;
1413 	int i, nFds, rv;
1414 	AG_EventSink *es;
1415 	AG_Object *ob, *obNext;
1416 	AG_Timer *to, *toNext;
1417 	struct timeval timeo, *pTimeo;
1418 	Uint32 t, tSoonest;
1419 
1420 restart:
1421 	nFds = 0;
1422 	FD_ZERO(&rdFds);
1423 	FD_ZERO(&wrFds);
1424 	TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1425 		switch (es->type) {
1426 		case AG_SINK_READ:
1427 			FD_SET(es->ident, &rdFds);
1428 			if (es->ident > nFds) { nFds = es->ident; }
1429 			break;
1430 		case AG_SINK_WRITE:
1431 			FD_SET(es->ident, &wrFds);
1432 			if (es->ident > nFds) { nFds = es->ident; }
1433 			break;
1434 		}
1435 	}
1436 
1437 	if (!TAILQ_EMPTY(&agEventSource->spinners)) {
1438 		timeo.tv_sec = 0;
1439 		timeo.tv_usec = 0;
1440 	} else {
1441 		AG_LockTiming();
1442 		t = AG_GetTicks();
1443 		tSoonest = 0xfffffffe;
1444 		TAILQ_FOREACH(ob, &agTimerObjQ, tobjs) {
1445 			TAILQ_FOREACH(to, &ob->timers, timers) {
1446 				if ((to->tSched - t) < tSoonest)
1447 					tSoonest = (to->tSched - t);
1448 			}
1449 		}
1450 		timeo.tv_sec = tSoonest/1000;
1451 		timeo.tv_usec = (tSoonest % 1000)*1000;
1452 		AG_UnlockTiming();
1453 	}
1454 	rv = select(nFds+1, &rdFds, &wrFds, NULL, &timeo);
1455 	if (rv == -1) {
1456 		if (errno == EINTR) {
1457 			goto restart;
1458 		}
1459 		AG_SetError("select: %s", AG_Strerror(errno));
1460 		return (-1);
1461 	}
1462 
1463 	AG_LockTiming();
1464 	/* 1. Process timer expirations. */
1465 	AG_ProcessTimeouts(t);
1466 	if (rv > 0) {
1467 		/* 2. Process I/O events */
1468 		TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1469 			switch (es->type) {
1470 			case AG_SINK_READ:
1471 				if (FD_ISSET(es->ident, &rdFds)) {
1472 					es->fn(es, &es->fnArgs);
1473 				}
1474 				break;
1475 			case AG_SINK_WRITE:
1476 				if (FD_ISSET(es->ident, &wrFds)) {
1477 					es->fn(es, &es->fnArgs);
1478 				}
1479 				break;
1480 			}
1481 		}
1482 	}
1483 	AG_UnlockTiming();
1484 	return (0);
1485 }
1486 #endif /* HAVE_SELECT and !AG_THREADS */
1487 
1488 #if defined(HAVE_SELECT) && defined(AG_THREADS)
1489 /*
1490  * Standard event sink using non-blocking select(2) with timers implemented
1491  * with a delay loop. This is inefficient, but on some platforms, it is the
1492  * only thread-safe option.
1493  */
1494 int
AG_EventSinkSELECT(void)1495 AG_EventSinkSELECT(void)
1496 {
1497 	fd_set rdFds, wrFds;
1498 	int nFds, rv;
1499 	AG_EventSink *es;
1500 	struct timeval timeo;
1501 
1502 	nFds = 0;
1503 	FD_ZERO(&rdFds);
1504 	FD_ZERO(&wrFds);
1505 
1506 	TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1507 		switch (es->type) {
1508 		case AG_SINK_READ:
1509 			FD_SET(es->ident, &rdFds);
1510 			if (es->ident > nFds) { nFds = es->ident; }
1511 			break;
1512 		case AG_SINK_WRITE:
1513 			FD_SET(es->ident, &wrFds);
1514 			if (es->ident > nFds) { nFds = es->ident; }
1515 			break;
1516 		default:
1517 			break;
1518 		}
1519 	}
1520 
1521 restart:
1522 	timeo.tv_sec = 0;
1523 	timeo.tv_usec = 0;
1524 	rv = select(nFds+1, &rdFds, &wrFds, NULL, &timeo);
1525 	if (rv == -1) {
1526 		if (errno == EINTR) {
1527 			goto restart;
1528 		}
1529 		AG_SetError("select: %s", AG_Strerror(errno));
1530 		return (-1);
1531 	}
1532 
1533 	AG_LockTiming();
1534 	/* 1. Process timer expirations. */
1535 	AG_ProcessTimeouts(AG_GetTicks());
1536 	if (rv > 0) {
1537 		/* 2. Process I/O events. */
1538 		TAILQ_FOREACH(es, &agEventSource->sinks, sinks) {
1539 			switch (es->type) {
1540 			case AG_SINK_READ:
1541 				if (FD_ISSET(es->ident, &rdFds)) {
1542 					es->fn(es, &es->fnArgs);
1543 				}
1544 				break;
1545 			case AG_SINK_WRITE:
1546 				if (FD_ISSET(es->ident, &wrFds)) {
1547 					es->fn(es, &es->fnArgs);
1548 				}
1549 				break;
1550 			}
1551 		}
1552 	}
1553 	AG_UnlockTiming();
1554 
1555 	if (TAILQ_EMPTY(&agEventSource->spinners)) {
1556 		AG_Delay(1);
1557 	}
1558 	return (0);
1559 }
1560 #endif /* HAVE_SELECT and AG_THREADS */
1561 
1562 /*
1563  * Fallback "spinning" event sink using a delay loop. This is inefficient,
1564  * but is the only option on some platforms.
1565  */
1566 int
AG_EventSinkSPINNER(void)1567 AG_EventSinkSPINNER(void)
1568 {
1569 	AG_ProcessTimeouts(AG_GetTicks());
1570 	AG_Delay(1);
1571 	return (0);
1572 }
1573 
1574 /*
1575  * Standard event loop routine. We loop over the event source and invoke
1576  * the related event sinks. AG_EventLoop() may be used outside of the
1577  * main thread (in which case, a new event source will be created).
1578  */
1579 int
AG_EventLoop(void)1580 AG_EventLoop(void)
1581 {
1582 	AG_EventSource *src = AG_GetEventSource();
1583 	AG_EventSink *es;
1584 
1585 	TAILQ_FOREACH(es, &src->prologues, sinks) {
1586 		es->fn(es, &es->fnArgs);
1587 	}
1588 	for (;;) {
1589 		TAILQ_FOREACH(es, &src->spinners, sinks) {
1590 			es->fn(es, &es->fnArgs);
1591 		}
1592 		if (src->sinkFn() == -1) {
1593 			return (1);
1594 		}
1595 		TAILQ_FOREACH(es, &src->epilogues, sinks) {
1596 			es->fn(es, &es->fnArgs);
1597 		}
1598 		if (src->breakReq)
1599 			break;
1600 	}
1601 	return (src->returnCode);
1602 }
1603 
1604 /* Request that we break out of AG_EventLoop(). */
1605 void
AG_Terminate(int retCode)1606 AG_Terminate(int retCode)
1607 {
1608 	AG_EventSource *src = AG_GetEventSource();
1609 
1610 	src->breakReq = 1;
1611 	src->returnCode = retCode;
1612 }
1613 void
AG_TerminateEv(AG_Event * ev)1614 AG_TerminateEv(AG_Event *ev)
1615 {
1616 	AG_EventSource *src = AG_GetEventSource();
1617 
1618 	if (ev->argc > 1 &&
1619 	    ev->argv[1].type == AG_VARIABLE_INT) {
1620 		src->returnCode = ev->argv[1].data.i;
1621 	} else {
1622 		src->returnCode = 0;
1623 	}
1624 	src->breakReq = 1;
1625 }
1626