1 /*
2  * Copyright (c) 2020 Andri Yngvason
3  *
4  * Permission to use, copy, modify, and/or distribute this software for any
5  * purpose with or without fee is hereby granted, provided that the above
6  * copyright notice and this permission notice appear in all copies.
7  *
8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
9  * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
10  * AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
11  * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
12  * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
13  * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
14  * PERFORMANCE OF THIS SOFTWARE.
15  */
16 
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <string.h>
20 #include <assert.h>
21 #include <poll.h>
22 #include <fcntl.h>
23 #include <stdbool.h>
24 #include <time.h>
25 #include <signal.h>
26 #include <pthread.h>
27 #include <stdatomic.h>
28 
29 #include "aml.h"
30 #include "backend.h"
31 #include "sys/queue.h"
32 #include "thread-pool.h"
33 
34 #define EXPORT __attribute__((visibility("default")))
35 
36 #define EVENT_MASK_DEFAULT AML_EVENT_READ
37 
38 #ifndef MIN
39 #define MIN(a, b) ((a) < (b) ? (a) : (b))
40 #endif
41 
42 enum aml_obj_type {
43 	AML_OBJ_UNSPEC = 0,
44 	AML_OBJ_AML,
45 	AML_OBJ_HANDLER,
46 	AML_OBJ_TIMER,
47 	AML_OBJ_TICKER,
48 	AML_OBJ_SIGNAL,
49 	AML_OBJ_WORK,
50 	AML_OBJ_IDLE,
51 };
52 
53 struct aml_obj {
54 	enum aml_obj_type type;
55 	int ref;
56 	void* userdata;
57 	aml_free_fn free_fn;
58 	aml_callback_fn cb;
59 	unsigned long long id;
60 
61 	void* backend_data;
62 
63 	LIST_ENTRY(aml_obj) link;
64 	LIST_ENTRY(aml_obj) global_link;
65 	TAILQ_ENTRY(aml_obj) event_link;
66 };
67 
68 LIST_HEAD(aml_obj_list, aml_obj);
69 TAILQ_HEAD(aml_obj_queue, aml_obj);
70 
71 struct aml_handler {
72 	struct aml_obj obj;
73 
74 	int fd;
75 	enum aml_event event_mask;
76 	atomic_uint revents;
77 
78 	struct aml* parent;
79 };
80 
81 struct aml_timer {
82 	struct aml_obj obj;
83 
84 	uint32_t timeout;
85 	uint64_t deadline;
86 
87 	LIST_ENTRY(aml_timer) link;
88 };
89 
90 LIST_HEAD(aml_timer_list, aml_timer);
91 
92 struct aml_signal {
93 	struct aml_obj obj;
94 
95 	int signo;
96 };
97 
98 struct aml_work {
99 	struct aml_obj obj;
100 
101 	aml_callback_fn work_fn;
102 };
103 
104 struct aml_idle {
105 	struct aml_obj obj;
106 
107 	LIST_ENTRY(aml_idle) link;
108 };
109 
110 LIST_HEAD(aml_idle_list, aml_idle);
111 
112 struct aml {
113 	struct aml_obj obj;
114 
115 	void* state;
116 	struct aml_backend backend;
117 
118 	int self_pipe_rfd, self_pipe_wfd;
119 
120 	bool do_exit;
121 
122 	struct aml_obj_list obj_list;
123 	pthread_mutex_t obj_list_mutex;
124 
125 	struct aml_timer_list timer_list;
126 	pthread_mutex_t timer_list_mutex;
127 
128 	struct aml_idle_list idle_list;
129 
130 	struct aml_obj_queue event_queue;
131 	pthread_mutex_t event_queue_mutex;
132 
133 	bool have_thread_pool;
134 };
135 
136 static struct aml* aml__default = NULL;
137 
138 static unsigned long long aml__obj_id = 0;
139 static struct aml_obj_list aml__obj_list = LIST_HEAD_INITIALIZER(aml__obj_list);
140 
141 // TODO: Properly initialise this?
142 static pthread_mutex_t aml__ref_mutex;
143 
144 extern struct aml_backend implementation;
145 
146 static struct aml_timer* aml__get_timer_with_earliest_deadline(struct aml* self);
147 
148 #if defined(GIT_VERSION)
149 EXPORT const char aml_version[] = GIT_VERSION;
150 #elif defined(PROJECT_VERSION)
151 EXPORT const char aml_version[] = PROJECT_VERSION;
152 #else
153 EXPORT const char aml_version[] = "UNKNOWN";
154 #endif
155 
156 EXPORT
aml_set_default(struct aml * aml)157 void aml_set_default(struct aml* aml)
158 {
159 	aml__default = aml;
160 }
161 
162 EXPORT
aml_get_default(void)163 struct aml* aml_get_default(void)
164 {
165 	return aml__default;
166 }
167 
aml__poll(struct aml * self,int timeout)168 static int aml__poll(struct aml* self, int timeout)
169 {
170 	return self->backend.poll(self->state, timeout);
171 }
172 
aml__add_fd(struct aml * self,struct aml_handler * handler)173 static int aml__add_fd(struct aml* self, struct aml_handler* handler)
174 {
175 	return self->backend.add_fd(self->state, handler);
176 }
177 
aml__del_fd(struct aml * self,struct aml_handler * handler)178 static int aml__del_fd(struct aml* self, struct aml_handler* handler)
179 {
180 	return self->backend.del_fd(self->state, handler);
181 }
182 
aml__mod_fd(struct aml * self,struct aml_handler * handler)183 static int aml__mod_fd(struct aml* self, struct aml_handler* handler)
184 {
185 	if (!self->backend.mod_fd) {
186 		aml__del_fd(self, handler);
187 		return aml__add_fd(self, handler);
188 	}
189 
190 	return self->backend.mod_fd(self->state, handler);
191 }
192 
aml__set_deadline(struct aml * self,uint64_t deadline)193 static int aml__set_deadline(struct aml* self, uint64_t deadline)
194 {
195 	return self->backend.set_deadline(self->state, deadline);
196 }
197 
aml__post_dispatch(struct aml * self)198 static void aml__post_dispatch(struct aml* self)
199 {
200 	if (self->backend.post_dispatch)
201 		self->backend.post_dispatch(self->state);
202 }
203 
aml__dont_block(int fd)204 static void aml__dont_block(int fd)
205 {
206 	fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
207 }
208 
aml__gettime_ms(struct aml * self)209 static uint64_t aml__gettime_ms(struct aml* self)
210 {
211 	struct timespec ts = { 0 };
212 	clock_gettime(self->backend.clock, &ts);
213 	return ts.tv_sec * 1000ULL + ts.tv_nsec / 1000000ULL;
214 }
215 
aml__ref_lock(void)216 static void aml__ref_lock(void)
217 {
218 	pthread_mutex_lock(&aml__ref_mutex);
219 }
220 
aml__ref_unlock(void)221 static void aml__ref_unlock(void)
222 {
223 	pthread_mutex_unlock(&aml__ref_mutex);
224 }
225 
aml__obj_global_ref(struct aml_obj * obj)226 static void aml__obj_global_ref(struct aml_obj* obj)
227 {
228 	aml__ref_lock();
229 	obj->id = aml__obj_id++;
230 	LIST_INSERT_HEAD(&aml__obj_list, obj, global_link);
231 	aml__ref_unlock();
232 }
233 
on_self_pipe_read(void * obj)234 static void on_self_pipe_read(void* obj) {
235 	struct aml* self = aml_get_userdata(obj);
236 	assert(self);
237 	assert(self->self_pipe_rfd == aml_get_fd(obj));
238 
239 	char dummy[256];
240 	while (read(self->self_pipe_rfd, dummy, sizeof(dummy)) > 0);
241 }
242 
aml__destroy_self_pipe(void * userdata)243 static void aml__destroy_self_pipe(void* userdata)
244 {
245 	struct aml* self = userdata;
246 
247 	close(self->self_pipe_rfd);
248 	close(self->self_pipe_wfd);
249 }
250 
aml__init_self_pipe(struct aml * self)251 static int aml__init_self_pipe(struct aml* self)
252 {
253 	if (self->backend.interrupt)
254 		return 0;
255 
256 	int fds[2];
257 	if (pipe(fds) < 0)
258 		return -1;
259 
260 	aml__dont_block(fds[0]);
261 	aml__dont_block(fds[1]);
262 
263 	self->self_pipe_rfd = fds[0];
264 	self->self_pipe_wfd = fds[1];
265 
266 	struct aml_handler* handler =
267 		aml_handler_new(self->self_pipe_rfd, on_self_pipe_read, self,
268 		                aml__destroy_self_pipe);
269 	if (!handler)
270 		goto failure;
271 
272 	aml_start(self, handler);
273 	aml_unref(handler);
274 
275 	return 0;
276 
277 failure:
278 	close(fds[1]);
279 	close(fds[0]);
280 	return -1;
281 }
282 
283 EXPORT
aml_interrupt(struct aml * self)284 void aml_interrupt(struct aml* self)
285 {
286 	if (self->backend.interrupt) {
287 		self->backend.interrupt(self->state);
288 		return;
289 	}
290 
291 	char one = 1;
292 	write(self->self_pipe_wfd, &one, sizeof(one));
293 }
294 
295 EXPORT
aml_new(void)296 struct aml* aml_new(void)
297 {
298 	struct aml* self = calloc(1, sizeof(*self));
299 	if (!self)
300 		return NULL;
301 
302 	self->obj.type = AML_OBJ_AML;
303 	self->obj.ref = 1;
304 
305 	LIST_INIT(&self->obj_list);
306 	LIST_INIT(&self->timer_list);
307 	LIST_INIT(&self->idle_list);
308 	TAILQ_INIT(&self->event_queue);
309 
310 	pthread_mutex_init(&self->event_queue_mutex, NULL);
311 	pthread_mutex_init(&self->obj_list_mutex, NULL);
312 	pthread_mutex_init(&self->timer_list_mutex, NULL);
313 
314 	memcpy(&self->backend, &implementation, sizeof(self->backend));
315 
316 	if (!self->backend.thread_pool_acquire)
317 		self->backend.thread_pool_acquire = thread_pool_acquire_default;
318 	if (!self->backend.thread_pool_release)
319 		self->backend.thread_pool_release = thread_pool_release_default;
320 	if (!self->backend.thread_pool_enqueue)
321 		self->backend.thread_pool_enqueue = thread_pool_enqueue_default;
322 
323 	self->state = self->backend.new_state(self);
324 	if (!self->state)
325 		goto failure;
326 
327 	if (aml__init_self_pipe(self) < 0)
328 		goto pipe_failure;
329 
330 	aml__obj_global_ref(&self->obj);
331 
332 	return self;
333 
334 pipe_failure:
335 	self->backend.del_state(self->state);
336 failure:
337 	free(self);
338 	return NULL;
339 }
340 
get_n_processors(void)341 static int get_n_processors(void)
342 {
343 #ifdef _SC_NPROCESSORS_ONLN
344 	return sysconf(_SC_NPROCESSORS_ONLN);
345 #else
346 	return 4; /* Guess */
347 #endif
348 }
349 
350 EXPORT
aml_require_workers(struct aml * self,int n)351 int aml_require_workers(struct aml* self, int n)
352 {
353 	if (n < 0)
354 		n = get_n_processors();
355 
356 	if (self->backend.thread_pool_acquire(self, n) < 0)
357 		return -1;
358 
359 	self->have_thread_pool = true;
360 	return 0;
361 }
362 
363 EXPORT
aml_handler_new(int fd,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)364 struct aml_handler* aml_handler_new(int fd, aml_callback_fn callback,
365                                     void* userdata, aml_free_fn free_fn)
366 {
367 	struct aml_handler* self = calloc(1, sizeof(*self));
368 	if (!self)
369 		return NULL;
370 
371 	self->obj.type = AML_OBJ_HANDLER;
372 	self->obj.ref = 1;
373 	self->obj.userdata = userdata;
374 	self->obj.free_fn = free_fn;
375 	self->obj.cb = callback;
376 
377 	self->fd = fd;
378 	self->event_mask = EVENT_MASK_DEFAULT;
379 
380 	aml__obj_global_ref(&self->obj);
381 
382 	return self;
383 }
384 
385 EXPORT
aml_timer_new(uint32_t timeout,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)386 struct aml_timer* aml_timer_new(uint32_t timeout, aml_callback_fn callback,
387                                 void* userdata, aml_free_fn free_fn)
388 {
389 	struct aml_timer* self = calloc(1, sizeof(*self));
390 	if (!self)
391 		return NULL;
392 
393 	self->obj.type = AML_OBJ_TIMER;
394 	self->obj.ref = 1;
395 	self->obj.userdata = userdata;
396 	self->obj.free_fn = free_fn;
397 	self->obj.cb = callback;
398 
399 	self->timeout = timeout;
400 
401 	aml__obj_global_ref(&self->obj);
402 
403 	return self;
404 }
405 
406 EXPORT
aml_ticker_new(uint32_t period,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)407 struct aml_ticker* aml_ticker_new(uint32_t period, aml_callback_fn callback,
408                                   void* userdata, aml_free_fn free_fn)
409 {
410 	struct aml_timer* timer =
411 		aml_timer_new(period, callback, userdata, free_fn);
412 	timer->obj.type = AML_OBJ_TICKER;
413 	return (struct aml_ticker*)timer;
414 }
415 
416 EXPORT
aml_signal_new(int signo,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)417 struct aml_signal* aml_signal_new(int signo, aml_callback_fn callback,
418                                   void* userdata, aml_free_fn free_fn)
419 {
420 	struct aml_signal* self = calloc(1, sizeof(*self));
421 	if (!self)
422 		return NULL;
423 
424 	self->obj.type = AML_OBJ_SIGNAL;
425 	self->obj.ref = 1;
426 	self->obj.userdata = userdata;
427 	self->obj.free_fn = free_fn;
428 	self->obj.cb = callback;
429 
430 	self->signo = signo;
431 
432 	aml__obj_global_ref(&self->obj);
433 
434 	return self;
435 }
436 
437 EXPORT
aml_work_new(aml_callback_fn work_fn,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)438 struct aml_work* aml_work_new(aml_callback_fn work_fn, aml_callback_fn callback,
439                               void* userdata, aml_free_fn free_fn)
440 {
441 	struct aml_work* self = calloc(1, sizeof(*self));
442 	if (!self)
443 		return NULL;
444 
445 	self->obj.type = AML_OBJ_WORK;
446 	self->obj.ref = 1;
447 	self->obj.userdata = userdata;
448 	self->obj.free_fn = free_fn;
449 	self->obj.cb = callback;
450 
451 	self->work_fn = work_fn;
452 
453 	aml__obj_global_ref(&self->obj);
454 
455 	return self;
456 }
457 
458 EXPORT
aml_idle_new(aml_callback_fn callback,void * userdata,aml_free_fn free_fn)459 struct aml_idle* aml_idle_new(aml_callback_fn callback, void* userdata,
460                               aml_free_fn free_fn)
461 {
462 	struct aml_idle* self = calloc(1, sizeof(*self));
463 	if (!self)
464 		return NULL;
465 
466 	self->obj.type = AML_OBJ_IDLE;
467 	self->obj.ref = 1;
468 	self->obj.userdata = userdata;
469 	self->obj.free_fn = free_fn;
470 	self->obj.cb = callback;
471 
472 	aml__obj_global_ref(&self->obj);
473 
474 	return self;
475 }
476 
aml__obj_is_started_unlocked(struct aml * self,void * obj)477 static bool aml__obj_is_started_unlocked(struct aml* self, void* obj)
478 {
479 	struct aml_obj* elem;
480 	LIST_FOREACH(elem, &self->obj_list, link)
481 		if (elem == obj)
482 			return true;
483 
484 	return false;
485 }
486 
487 EXPORT
aml_is_started(struct aml * self,void * obj)488 bool aml_is_started(struct aml* self, void* obj)
489 {
490 	pthread_mutex_lock(&self->obj_list_mutex);
491 	bool result = aml__obj_is_started_unlocked(self, obj);
492 	pthread_mutex_unlock(&self->obj_list_mutex);
493 	return result;
494 }
495 
aml__obj_try_add(struct aml * self,void * obj)496 static int aml__obj_try_add(struct aml* self, void* obj)
497 {
498 	int rc = -1;
499 
500 	pthread_mutex_lock(&self->obj_list_mutex);
501 
502 	if (!aml__obj_is_started_unlocked(self, obj)) {
503 		aml_ref(obj);
504 		LIST_INSERT_HEAD(&self->obj_list, (struct aml_obj*)obj, link);
505 		rc = 0;
506 	}
507 
508 	pthread_mutex_unlock(&self->obj_list_mutex);
509 
510 	return rc;
511 }
512 
aml__obj_remove_unlocked(struct aml * self,void * obj)513 static void aml__obj_remove_unlocked(struct aml* self, void* obj)
514 {
515 	LIST_REMOVE((struct aml_obj*)obj, link);
516 	aml_unref(obj);
517 }
518 
aml__obj_remove(struct aml * self,void * obj)519 static void aml__obj_remove(struct aml* self, void* obj)
520 {
521 	pthread_mutex_lock(&self->obj_list_mutex);
522 	aml__obj_remove_unlocked(self, obj);
523 	pthread_mutex_unlock(&self->obj_list_mutex);
524 }
525 
aml__obj_try_remove(struct aml * self,void * obj)526 static int aml__obj_try_remove(struct aml* self, void* obj)
527 {
528 	int rc = -1;
529 
530 	pthread_mutex_lock(&self->obj_list_mutex);
531 
532 	if (aml__obj_is_started_unlocked(self, obj)) {
533 		aml__obj_remove_unlocked(self, obj);
534 		rc = 0;
535 	}
536 
537 	pthread_mutex_unlock(&self->obj_list_mutex);
538 
539 	return rc;
540 }
541 
aml__start_handler(struct aml * self,struct aml_handler * handler)542 static int aml__start_handler(struct aml* self, struct aml_handler* handler)
543 {
544 	if (aml__add_fd(self, handler) < 0)
545 		return -1;
546 
547 	handler->parent = self;
548 
549 	return 0;
550 }
551 
aml__start_timer(struct aml * self,struct aml_timer * timer)552 static int aml__start_timer(struct aml* self, struct aml_timer* timer)
553 {
554 	timer->deadline = aml__gettime_ms(self) + timer->timeout;
555 
556 	pthread_mutex_lock(&self->timer_list_mutex);
557 	LIST_INSERT_HEAD(&self->timer_list, timer, link);
558 	pthread_mutex_unlock(&self->timer_list_mutex);
559 
560 	if (timer->timeout == 0) {
561 		assert(timer->obj.type != AML_OBJ_TICKER);
562 		aml_stop(self, timer);
563 		aml_emit(self, timer, 0);
564 		aml_interrupt(self);
565 		return 0;
566 	}
567 
568 	struct aml_timer* earliest = aml__get_timer_with_earliest_deadline(self);
569 	if (earliest == timer)
570 		aml__set_deadline(self, timer->deadline);
571 
572 	return 0;
573 }
574 
aml__start_signal(struct aml * self,struct aml_signal * sig)575 static int aml__start_signal(struct aml* self, struct aml_signal* sig)
576 {
577 	return self->backend.add_signal(self->state, sig);
578 }
579 
aml__start_work(struct aml * self,struct aml_work * work)580 static int aml__start_work(struct aml* self, struct aml_work* work)
581 {
582 	return self->backend.thread_pool_enqueue(self, work);
583 }
584 
aml__start_idle(struct aml * self,struct aml_idle * idle)585 static int aml__start_idle(struct aml* self, struct aml_idle* idle)
586 {
587 	LIST_INSERT_HEAD(&self->idle_list, idle, link);
588 	return 0;
589 }
590 
aml__start_unchecked(struct aml * self,void * obj)591 static int aml__start_unchecked(struct aml* self, void* obj)
592 {
593 	struct aml_obj* head = obj;
594 
595 	switch (head->type) {
596 	case AML_OBJ_AML: return -1;
597 	case AML_OBJ_HANDLER: return aml__start_handler(self, obj);
598 	case AML_OBJ_TIMER: /* fallthrough */
599 	case AML_OBJ_TICKER: return aml__start_timer(self, obj);
600 	case AML_OBJ_SIGNAL: return aml__start_signal(self, obj);
601 	case AML_OBJ_WORK: return aml__start_work(self, obj);
602 	case AML_OBJ_IDLE: return aml__start_idle(self, obj);
603 	case AML_OBJ_UNSPEC: break;
604 	}
605 
606 	abort();
607 	return -1;
608 }
609 
610 EXPORT
aml_start(struct aml * self,void * obj)611 int aml_start(struct aml* self, void* obj)
612 {
613 	if (aml__obj_try_add(self, obj) < 0)
614 		return -1;
615 
616 	if (aml__start_unchecked(self, obj) == 0)
617 		return 0;
618 
619 	aml__obj_remove(self, obj);
620 	return -1;
621 }
622 
aml__stop_handler(struct aml * self,struct aml_handler * handler)623 static int aml__stop_handler(struct aml* self, struct aml_handler* handler)
624 {
625 	if (aml__del_fd(self, handler) < 0)
626 		return -1;
627 
628 	handler->parent = NULL;
629 
630 	return 0;
631 }
632 
aml__stop_timer(struct aml * self,struct aml_timer * timer)633 static int aml__stop_timer(struct aml* self, struct aml_timer* timer)
634 {
635 	pthread_mutex_lock(&self->timer_list_mutex);
636 	LIST_REMOVE(timer, link);
637 	pthread_mutex_unlock(&self->timer_list_mutex);
638 	return 0;
639 }
640 
aml__stop_signal(struct aml * self,struct aml_signal * sig)641 static int aml__stop_signal(struct aml* self, struct aml_signal* sig)
642 {
643 	return self->backend.del_signal(self->state, sig);
644 }
645 
aml__stop_work(struct aml * self,struct aml_work * work)646 static int aml__stop_work(struct aml* self, struct aml_work* work)
647 {
648 	/* Note: The cb may be executed anyhow */
649 	return 0;
650 }
651 
aml__stop_idle(struct aml * self,struct aml_idle * idle)652 static int aml__stop_idle(struct aml* self, struct aml_idle* idle)
653 {
654 	LIST_REMOVE(idle, link);
655 	return 0;
656 }
657 
aml__stop_unchecked(struct aml * self,void * obj)658 static int aml__stop_unchecked(struct aml* self, void* obj)
659 {
660 	struct aml_obj* head = obj;
661 
662 	switch (head->type) {
663 	case AML_OBJ_AML: return -1;
664 	case AML_OBJ_HANDLER: return aml__stop_handler(self, obj);
665 	case AML_OBJ_TIMER: /* fallthrough */
666 	case AML_OBJ_TICKER: return aml__stop_timer(self, obj);
667 	case AML_OBJ_SIGNAL: return aml__stop_signal(self, obj);
668 	case AML_OBJ_WORK: return aml__stop_work(self, obj);
669 	case AML_OBJ_IDLE: return aml__stop_idle(self, obj);
670 	case AML_OBJ_UNSPEC: break;
671 	}
672 
673 	abort();
674 	return -1;
675 }
676 
677 EXPORT
aml_stop(struct aml * self,void * obj)678 int aml_stop(struct aml* self, void* obj)
679 {
680 	aml_ref(obj);
681 
682 	if (aml__obj_try_remove(self, obj) >= 0)
683 		aml__stop_unchecked(self, obj);
684 
685 	aml_unref(obj);
686 
687 	return 0;
688 }
689 
aml__get_timer_with_earliest_deadline(struct aml * self)690 static struct aml_timer* aml__get_timer_with_earliest_deadline(struct aml* self)
691 {
692 	uint64_t deadline = UINT64_MAX;
693 	struct aml_timer* result = NULL;
694 
695 	struct aml_timer* timer;
696 
697 	pthread_mutex_lock(&self->timer_list_mutex);
698 	LIST_FOREACH(timer, &self->timer_list, link)
699 		if (timer->deadline < deadline) {
700 			deadline = timer->deadline;
701 			result = timer;
702 		}
703 	pthread_mutex_unlock(&self->timer_list_mutex);
704 
705 	return result;
706 }
707 
aml__handle_timeout(struct aml * self,uint64_t now)708 static bool aml__handle_timeout(struct aml* self, uint64_t now)
709 {
710 	struct aml_timer* timer = aml__get_timer_with_earliest_deadline(self);
711 	if (!timer || timer->deadline > now)
712 		return false;
713 
714 	aml_emit(self, timer, 0);
715 
716 	switch (timer->obj.type) {
717 	case AML_OBJ_TIMER:
718 		aml_stop(self, timer);
719 		break;
720 	case AML_OBJ_TICKER:
721 		timer->deadline += timer->timeout;
722 		break;
723 	default:
724 		abort();
725 		break;
726 	}
727 
728 	return true;
729 }
730 
aml__handle_idle(struct aml * self)731 static void aml__handle_idle(struct aml* self)
732 {
733 	struct aml_idle* idle;
734 
735 	LIST_FOREACH(idle, &self->idle_list, link)
736 		if (idle->obj.cb)
737 			idle->obj.cb(idle);
738 }
739 
aml__handle_event(struct aml * self,struct aml_obj * obj)740 static void aml__handle_event(struct aml* self, struct aml_obj* obj)
741 {
742 	/* A reference is kept here in case an object is stopped inside the
743 	 * callback. We want the object to live until we're done with it.
744 	 */
745 	aml_ref(obj);
746 
747 	if (obj->cb)
748 		obj->cb(obj);
749 
750 	if (obj->type == AML_OBJ_HANDLER) {
751 		struct aml_handler* handler = (struct aml_handler*)obj;
752 		handler->revents = 0;
753 
754 		if (self->backend.flags & AML_BACKEND_EDGE_TRIGGERED)
755 			aml__mod_fd(self, handler);
756 	}
757 
758 	aml_unref(obj);
759 }
760 
761 /* Might exit earlier than timeout. It's up to the user to check */
762 EXPORT
aml_poll(struct aml * self,int timeout)763 int aml_poll(struct aml* self, int timeout)
764 {
765 	return aml__poll(self, timeout);
766 }
767 
aml__event_dequeue(struct aml * self)768 static struct aml_obj* aml__event_dequeue(struct aml* self)
769 {
770 	pthread_mutex_lock(&self->event_queue_mutex);
771 	struct aml_obj* obj = TAILQ_FIRST(&self->event_queue);
772 	if (obj)
773 		TAILQ_REMOVE(&self->event_queue, obj, event_link);
774 	pthread_mutex_unlock(&self->event_queue_mutex);
775 	return obj;
776 }
777 
778 EXPORT
aml_dispatch(struct aml * self)779 void aml_dispatch(struct aml* self)
780 {
781 	uint64_t now = aml__gettime_ms(self);
782 	while (aml__handle_timeout(self, now));
783 
784 	struct aml_timer* earliest = aml__get_timer_with_earliest_deadline(self);
785 	if (earliest) {
786 		assert(earliest->deadline > now);
787 		aml__set_deadline(self, earliest->deadline);
788 	}
789 
790 	sigset_t sig_old, sig_new;
791 	sigfillset(&sig_new);
792 
793 	pthread_sigmask(SIG_BLOCK, &sig_new, &sig_old);
794 
795 	struct aml_obj* obj;
796 	while ((obj = aml__event_dequeue(self)) != NULL) {
797 		aml__handle_event(self, obj);
798 		aml_unref(obj);
799 	}
800 
801 	pthread_sigmask(SIG_SETMASK, &sig_old, NULL);
802 
803 	aml__handle_idle(self);
804 	aml__post_dispatch(self);
805 }
806 
807 EXPORT
aml_run(struct aml * self)808 int aml_run(struct aml* self)
809 {
810 	self->do_exit = false;
811 
812 	do {
813 		aml_poll(self, -1);
814 		aml_dispatch(self);
815 	} while (!self->do_exit);
816 
817 	return 0;
818 }
819 
820 EXPORT
aml_exit(struct aml * self)821 void aml_exit(struct aml* self)
822 {
823 	self->do_exit = true;
824 
825 	if (self->backend.exit)
826 		self->backend.exit(self->state);
827 }
828 
829 EXPORT
aml_ref(void * obj)830 int aml_ref(void* obj)
831 {
832 	struct aml_obj* self = obj;
833 
834 	aml__ref_lock();
835 	int ref = self->ref++;
836 	aml__ref_unlock();
837 
838 	return ref;
839 }
840 
aml__free(struct aml * self)841 static void aml__free(struct aml* self)
842 {
843 	while (!LIST_EMPTY(&self->obj_list)) {
844 		struct aml_obj* obj = LIST_FIRST(&self->obj_list);
845 
846 		aml__stop_unchecked(self, obj);
847 		aml__obj_remove_unlocked(self, obj);
848 	}
849 
850 	if (self->have_thread_pool)
851 		self->backend.thread_pool_release(self);
852 
853 	self->backend.del_state(self->state);
854 
855 	while (!TAILQ_EMPTY(&self->event_queue)) {
856 		struct aml_obj* obj = TAILQ_FIRST(&self->event_queue);
857 		TAILQ_REMOVE(&self->event_queue, obj, event_link);
858 		aml_unref(obj);
859 	}
860 
861 	pthread_mutex_destroy(&self->timer_list_mutex);
862 	pthread_mutex_destroy(&self->obj_list_mutex);
863 	pthread_mutex_destroy(&self->event_queue_mutex);
864 
865 	free(self);
866 }
867 
aml__free_handler(struct aml_handler * self)868 static void aml__free_handler(struct aml_handler* self)
869 {
870 	if (self->obj.free_fn)
871 		self->obj.free_fn(self->obj.userdata);
872 
873 	free(self);
874 }
875 
aml__free_timer(struct aml_timer * self)876 static void aml__free_timer(struct aml_timer* self)
877 {
878 	if (self->obj.free_fn)
879 		self->obj.free_fn(self->obj.userdata);
880 
881 	free(self);
882 }
883 
aml__free_signal(struct aml_signal * self)884 static void aml__free_signal(struct aml_signal* self)
885 {
886 	if (self->obj.free_fn)
887 		self->obj.free_fn(self->obj.userdata);
888 
889 	free(self);
890 }
891 
aml__free_work(struct aml_work * self)892 static void aml__free_work(struct aml_work* self)
893 {
894 	if (self->obj.free_fn)
895 		self->obj.free_fn(self->obj.userdata);
896 
897 	free(self);
898 }
899 
aml__free_idle(struct aml_idle * self)900 static void aml__free_idle(struct aml_idle* self)
901 {
902 	if (self->obj.free_fn)
903 		self->obj.free_fn(self->obj.userdata);
904 
905 	free(self);
906 }
907 
908 EXPORT
aml_unref(void * obj)909 int aml_unref(void* obj)
910 {
911 	struct aml_obj* self = obj;
912 
913 	aml__ref_lock();
914 	int ref = --self->ref;
915 	if (ref == 0)
916 		LIST_REMOVE(self, global_link);
917 	aml__ref_unlock();
918 	assert(ref >= 0);
919 	if (ref > 0)
920 		goto done;
921 
922 	switch (self->type) {
923 	case AML_OBJ_AML:
924 		aml__free(obj);
925 		break;
926 	case AML_OBJ_HANDLER:
927 		aml__free_handler(obj);
928 		break;
929 	case AML_OBJ_TIMER:
930 		/* fallthrough */
931 	case AML_OBJ_TICKER:
932 		aml__free_timer(obj);
933 		break;
934 	case AML_OBJ_SIGNAL:
935 		aml__free_signal(obj);
936 		break;
937 	case AML_OBJ_WORK:
938 		aml__free_work(obj);
939 		break;
940 	case AML_OBJ_IDLE:
941 		aml__free_idle(obj);
942 		break;
943 	default:
944 		abort();
945 		break;
946 	}
947 
948 done:
949 	return ref;
950 }
951 
952 EXPORT
aml_get_id(const void * obj)953 unsigned long long aml_get_id(const void* obj)
954 {
955 	const struct aml_obj* aml_obj = obj;
956 	return aml_obj->id;
957 }
958 
959 EXPORT
aml_try_ref(unsigned long long id)960 void* aml_try_ref(unsigned long long id)
961 {
962 	struct aml_obj* obj = NULL;
963 
964 	aml__ref_lock();
965 	LIST_FOREACH(obj, &aml__obj_list, global_link)
966 		if (obj->id == id)
967 			break;
968 
969 	if (obj && obj->id == id)
970 		obj->ref++;
971 	else
972 		obj = NULL;
973 
974 	aml__ref_unlock();
975 	return obj;
976 }
977 
978 EXPORT
aml_get_userdata(const void * obj)979 void* aml_get_userdata(const void* obj)
980 {
981 	const struct aml_obj* aml_obj = obj;
982 	return aml_obj->userdata;
983 }
984 
985 EXPORT
aml_set_userdata(void * obj,void * userdata,aml_free_fn free_fn)986 void aml_set_userdata(void* obj, void* userdata, aml_free_fn free_fn)
987 {
988 	struct aml_obj* aml_obj = obj;
989 	aml_obj->userdata = userdata;
990 	aml_obj->free_fn = free_fn;
991 }
992 
aml_emit(struct aml * self,void * ptr,uint32_t revents)993 void aml_emit(struct aml* self, void* ptr, uint32_t revents)
994 {
995 	struct aml_obj* obj = ptr;
996 
997 	if (obj->type == AML_OBJ_HANDLER) {
998 		struct aml_handler* handler = ptr;
999 		uint32_t old = atomic_fetch_or(&handler->revents, revents);
1000 		if (old != 0)
1001 			return;
1002 	}
1003 
1004 	sigset_t sig_old, sig_new;
1005 	sigfillset(&sig_new);
1006 
1007 	pthread_sigmask(SIG_BLOCK, &sig_new, &sig_old);
1008 	pthread_mutex_lock(&self->event_queue_mutex);
1009 	TAILQ_INSERT_TAIL(&self->event_queue, obj, event_link);
1010 	aml_ref(obj);
1011 	pthread_mutex_unlock(&self->event_queue_mutex);
1012 	pthread_sigmask(SIG_SETMASK, &sig_old, NULL);
1013 }
1014 
1015 EXPORT
aml_get_event_mask(const struct aml_handler * handler)1016 enum aml_event aml_get_event_mask(const struct aml_handler* handler)
1017 {
1018 	return handler->event_mask;
1019 }
1020 
1021 EXPORT
aml_set_event_mask(struct aml_handler * handler,enum aml_event mask)1022 void aml_set_event_mask(struct aml_handler* handler, enum aml_event mask)
1023 {
1024 	handler->event_mask = mask;
1025 
1026 	if (handler->parent && aml_is_started(handler->parent, handler))
1027 		aml__mod_fd(handler->parent, handler);
1028 }
1029 
1030 EXPORT
aml_get_revents(const struct aml_handler * handler)1031 enum aml_event aml_get_revents(const struct aml_handler* handler)
1032 {
1033 	return handler->revents;
1034 }
1035 
1036 EXPORT
aml_get_fd(const void * ptr)1037 int aml_get_fd(const void* ptr)
1038 {
1039 	const struct aml_obj* obj = ptr;
1040 
1041 	switch (obj->type) {
1042 	case AML_OBJ_AML:;
1043 		const struct aml* aml = ptr;
1044 		return aml->backend.get_fd ?
1045 			aml->backend.get_fd(aml->state) : -1;
1046 	case AML_OBJ_HANDLER:
1047 		return ((struct aml_handler*)ptr)->fd;
1048 	default:
1049 		break;
1050 	}
1051 
1052 	return -1;
1053 }
1054 
1055 EXPORT
aml_get_signo(const struct aml_signal * sig)1056 int aml_get_signo(const struct aml_signal* sig)
1057 {
1058 	return sig->signo;
1059 }
1060 
aml_get_work_fn(const struct aml_work * work)1061 aml_callback_fn aml_get_work_fn(const struct aml_work* work)
1062 {
1063 	return work->work_fn;
1064 }
1065 
aml_get_backend_data(const void * ptr)1066 void* aml_get_backend_data(const void* ptr)
1067 {
1068 	const struct aml_obj* obj = ptr;
1069 	return obj->backend_data;
1070 }
1071 
aml_set_backend_data(void * ptr,void * data)1072 void aml_set_backend_data(void* ptr, void* data)
1073 {
1074 	struct aml_obj* obj = ptr;
1075 	obj->backend_data = data;
1076 }
1077 
aml_get_backend_state(const struct aml * self)1078 void* aml_get_backend_state(const struct aml* self)
1079 {
1080 	return self->state;
1081 }
1082 
1083 EXPORT
aml_set_duration(void * ptr,uint32_t duration)1084 void aml_set_duration(void* ptr, uint32_t duration)
1085 {
1086 	struct aml_obj* obj = ptr;
1087 
1088 	switch (obj->type) {
1089 	case AML_OBJ_TIMER: /* fallthrough */
1090 	case AML_OBJ_TICKER:
1091 		((struct aml_timer*)ptr)->timeout = duration;
1092 		return;
1093 	default:
1094 		break;
1095 	}
1096 
1097 	abort();
1098 }
1099