1 /*
2 * Copyright (c) 2020 Andri Yngvason
3 *
4 * Permission to use, copy, modify, and/or distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
9 * REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
10 * AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
11 * INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
12 * LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
13 * OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
14 * PERFORMANCE OF THIS SOFTWARE.
15 */
16
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <fcntl.h>
#include <stdbool.h>
#include <time.h>
#include <signal.h>
#include <pthread.h>
#include <stdatomic.h>

#include "aml.h"
#include "backend.h"
#include "sys/queue.h"
#include "thread-pool.h"
33
34 #define EXPORT __attribute__((visibility("default")))
35
36 #define EVENT_MASK_DEFAULT AML_EVENT_READ
37
38 #ifndef MIN
39 #define MIN(a, b) ((a) < (b) ? (a) : (b))
40 #endif
41
/* Discriminates the concrete type behind a generic struct aml_obj. */
enum aml_obj_type {
	AML_OBJ_UNSPEC = 0,
	AML_OBJ_AML,
	AML_OBJ_HANDLER,
	AML_OBJ_TIMER,
	AML_OBJ_TICKER,
	AML_OBJ_SIGNAL,
	AML_OBJ_WORK,
	AML_OBJ_IDLE,
};

/* Common header embedded as the first member of every aml object type,
 * enabling generic ref-counting, callback dispatch and list membership
 * through pointer casts. */
struct aml_obj {
	enum aml_obj_type type;
	int ref; // reference count; guarded by aml__ref_mutex
	void* userdata;
	aml_free_fn free_fn; // called on userdata when the object is destroyed
	aml_callback_fn cb; // invoked when an event for this object is dispatched
	unsigned long long id; // process-wide unique id (see aml__obj_global_ref)

	void* backend_data; // opaque per-object state owned by the backend

	LIST_ENTRY(aml_obj) link; // membership in an aml instance's obj_list
	LIST_ENTRY(aml_obj) global_link; // membership in the global aml__obj_list
	TAILQ_ENTRY(aml_obj) event_link; // membership in the pending event queue
};

LIST_HEAD(aml_obj_list, aml_obj);
TAILQ_HEAD(aml_obj_queue, aml_obj);
70
/* Watches a file descriptor for I/O readiness events. */
struct aml_handler {
	struct aml_obj obj;

	int fd;
	enum aml_event event_mask; // events the user wants delivered
	atomic_uint revents; // accumulated events; non-zero means already queued

	struct aml* parent; // loop this handler is started on, or NULL
};

/* One-shot timer (AML_OBJ_TIMER) or repeating ticker (AML_OBJ_TICKER). */
struct aml_timer {
	struct aml_obj obj;

	uint32_t timeout; // interval in milliseconds
	uint64_t deadline; // absolute expiry time in milliseconds

	LIST_ENTRY(aml_timer) link;
};

LIST_HEAD(aml_timer_list, aml_timer);

/* Delivers a callback when the given POSIX signal arrives. */
struct aml_signal {
	struct aml_obj obj;

	int signo;
};

/* A job executed on the thread pool; obj.cb runs on the event loop
 * after work_fn has completed. */
struct aml_work {
	struct aml_obj obj;

	aml_callback_fn work_fn; // runs on a worker thread
};

/* Callback invoked on every dispatch cycle while started. */
struct aml_idle {
	struct aml_obj obj;

	LIST_ENTRY(aml_idle) link;
};

LIST_HEAD(aml_idle_list, aml_idle);
111
/* The event loop instance itself (also a ref-counted aml_obj). */
struct aml {
	struct aml_obj obj;

	void* state; // backend-private state, created by backend.new_state()
	struct aml_backend backend;

	/* Self-pipe used to wake up poll when the backend has no native
	 * interrupt mechanism; see aml__init_self_pipe(). */
	int self_pipe_rfd, self_pipe_wfd;

	bool do_exit;

	struct aml_obj_list obj_list; // all currently started objects
	pthread_mutex_t obj_list_mutex;

	struct aml_timer_list timer_list; // started timers and tickers
	pthread_mutex_t timer_list_mutex;

	struct aml_idle_list idle_list;

	struct aml_obj_queue event_queue; // events waiting for dispatch
	pthread_mutex_t event_queue_mutex;

	bool have_thread_pool;
};
135
136 static struct aml* aml__default = NULL;
137
138 static unsigned long long aml__obj_id = 0;
139 static struct aml_obj_list aml__obj_list = LIST_HEAD_INITIALIZER(aml__obj_list);
140
141 // TODO: Properly initialise this?
142 static pthread_mutex_t aml__ref_mutex;
143
144 extern struct aml_backend implementation;
145
146 static struct aml_timer* aml__get_timer_with_earliest_deadline(struct aml* self);
147
/* Library version string, chosen at build time in order of preference. */
#if defined(GIT_VERSION)
EXPORT const char aml_version[] = GIT_VERSION;
#elif defined(PROJECT_VERSION)
EXPORT const char aml_version[] = PROJECT_VERSION;
#else
EXPORT const char aml_version[] = "UNKNOWN";
#endif
155
EXPORT
void aml_set_default(struct aml* aml)
{
	/* Set the process-wide default event loop. Not synchronised; call
	 * before spawning threads that use aml_get_default(). */
	aml__default = aml;
}

EXPORT
struct aml* aml_get_default(void)
{
	return aml__default;
}
167
/* Thin wrappers around the backend vtable. */

static int aml__poll(struct aml* self, int timeout)
{
	return self->backend.poll(self->state, timeout);
}

static int aml__add_fd(struct aml* self, struct aml_handler* handler)
{
	return self->backend.add_fd(self->state, handler);
}

static int aml__del_fd(struct aml* self, struct aml_handler* handler)
{
	return self->backend.del_fd(self->state, handler);
}

static int aml__mod_fd(struct aml* self, struct aml_handler* handler)
{
	// Emulate modification with del + add for backends without mod_fd.
	if (!self->backend.mod_fd) {
		aml__del_fd(self, handler);
		return aml__add_fd(self, handler);
	}

	return self->backend.mod_fd(self->state, handler);
}

static int aml__set_deadline(struct aml* self, uint64_t deadline)
{
	return self->backend.set_deadline(self->state, deadline);
}

static void aml__post_dispatch(struct aml* self)
{
	if (self->backend.post_dispatch)
		self->backend.post_dispatch(self->state);
}
203
/* Set O_NONBLOCK on fd; errors are ignored (best effort). */
static void aml__dont_block(int fd)
{
	fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}

/* Current time in milliseconds on the backend's chosen clock. */
static uint64_t aml__gettime_ms(struct aml* self)
{
	struct timespec ts = { 0 };
	clock_gettime(self->backend.clock, &ts);
	return ts.tv_sec * 1000ULL + ts.tv_nsec / 1000000ULL;
}
215
static void aml__ref_lock(void)
{
	pthread_mutex_lock(&aml__ref_mutex);
}

static void aml__ref_unlock(void)
{
	pthread_mutex_unlock(&aml__ref_mutex);
}

/* Assign a unique id to obj and register it in the global object list
 * so it can later be looked up with aml_try_ref(). */
static void aml__obj_global_ref(struct aml_obj* obj)
{
	aml__ref_lock();
	obj->id = aml__obj_id++;
	LIST_INSERT_HEAD(&aml__obj_list, obj, global_link);
	aml__ref_unlock();
}
233
/* Drain the self-pipe after an interrupt; the wakeup itself is the
 * point, the bytes carry no information. */
static void on_self_pipe_read(void* obj) {
	struct aml* self = aml_get_userdata(obj);
	assert(self);
	assert(self->self_pipe_rfd == aml_get_fd(obj));

	char dummy[256];
	while (read(self->self_pipe_rfd, dummy, sizeof(dummy)) > 0);
}

/* free_fn for the self-pipe handler: closes both pipe ends. */
static void aml__destroy_self_pipe(void* userdata)
{
	struct aml* self = userdata;

	close(self->self_pipe_rfd);
	close(self->self_pipe_wfd);
}
250
/* Create the self-pipe fallback used by aml_interrupt() when the
 * backend lacks a native interrupt. Returns 0 on success, -1 on error. */
static int aml__init_self_pipe(struct aml* self)
{
	if (self->backend.interrupt)
		return 0; // backend can interrupt itself; no pipe needed

	int fds[2];
	if (pipe(fds) < 0)
		return -1;

	// Both ends non-blocking: writes must not stall signal contexts
	// and the read end is drained opportunistically.
	aml__dont_block(fds[0]);
	aml__dont_block(fds[1]);

	self->self_pipe_rfd = fds[0];
	self->self_pipe_wfd = fds[1];

	struct aml_handler* handler =
		aml_handler_new(self->self_pipe_rfd, on_self_pipe_read, self,
				aml__destroy_self_pipe);
	if (!handler)
		goto failure;

	aml_start(self, handler);
	aml_unref(handler); // the loop now holds the only needed reference

	return 0;

failure:
	close(fds[1]);
	close(fds[0]);
	return -1;
}
282
283 EXPORT
aml_interrupt(struct aml * self)284 void aml_interrupt(struct aml* self)
285 {
286 if (self->backend.interrupt) {
287 self->backend.interrupt(self->state);
288 return;
289 }
290
291 char one = 1;
292 write(self->self_pipe_wfd, &one, sizeof(one));
293 }
294
EXPORT
struct aml* aml_new(void)
{
	/* Create a new event loop instance with a single reference.
	 * Returns NULL on allocation or backend initialisation failure. */
	struct aml* self = calloc(1, sizeof(*self));
	if (!self)
		return NULL;

	self->obj.type = AML_OBJ_AML;
	self->obj.ref = 1;

	LIST_INIT(&self->obj_list);
	LIST_INIT(&self->timer_list);
	LIST_INIT(&self->idle_list);
	TAILQ_INIT(&self->event_queue);

	pthread_mutex_init(&self->event_queue_mutex, NULL);
	pthread_mutex_init(&self->obj_list_mutex, NULL);
	pthread_mutex_init(&self->timer_list_mutex, NULL);

	memcpy(&self->backend, &implementation, sizeof(self->backend));

	// Fill in default thread pool hooks where the backend has none.
	if (!self->backend.thread_pool_acquire)
		self->backend.thread_pool_acquire = thread_pool_acquire_default;
	if (!self->backend.thread_pool_release)
		self->backend.thread_pool_release = thread_pool_release_default;
	if (!self->backend.thread_pool_enqueue)
		self->backend.thread_pool_enqueue = thread_pool_enqueue_default;

	self->state = self->backend.new_state(self);
	if (!self->state)
		goto failure;

	if (aml__init_self_pipe(self) < 0)
		goto pipe_failure;

	aml__obj_global_ref(&self->obj);

	return self;

pipe_failure:
	self->backend.del_state(self->state);
failure:
	free(self);
	return NULL;
}
340
/* Number of online processors, falling back to a guess of 4 when the
 * value is unavailable. sysconf() may return -1 on error or when the
 * limit is indeterminate, so guard against propagating a negative
 * worker count to the thread pool. */
static int get_n_processors(void)
{
#ifdef _SC_NPROCESSORS_ONLN
	long n = sysconf(_SC_NPROCESSORS_ONLN);
	if (n > 0)
		return (int)n;
#endif
	return 4; /* Guess */
}
349
EXPORT
int aml_require_workers(struct aml* self, int n)
{
	/* Ensure the thread pool has at least n workers; n < 0 requests
	 * one worker per online processor. Returns 0 on success, -1 on
	 * failure. */
	if (n < 0)
		n = get_n_processors();

	if (self->backend.thread_pool_acquire(self, n) < 0)
		return -1;

	self->have_thread_pool = true;
	return 0;
}
362
363 EXPORT
aml_handler_new(int fd,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)364 struct aml_handler* aml_handler_new(int fd, aml_callback_fn callback,
365 void* userdata, aml_free_fn free_fn)
366 {
367 struct aml_handler* self = calloc(1, sizeof(*self));
368 if (!self)
369 return NULL;
370
371 self->obj.type = AML_OBJ_HANDLER;
372 self->obj.ref = 1;
373 self->obj.userdata = userdata;
374 self->obj.free_fn = free_fn;
375 self->obj.cb = callback;
376
377 self->fd = fd;
378 self->event_mask = EVENT_MASK_DEFAULT;
379
380 aml__obj_global_ref(&self->obj);
381
382 return self;
383 }
384
385 EXPORT
aml_timer_new(uint32_t timeout,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)386 struct aml_timer* aml_timer_new(uint32_t timeout, aml_callback_fn callback,
387 void* userdata, aml_free_fn free_fn)
388 {
389 struct aml_timer* self = calloc(1, sizeof(*self));
390 if (!self)
391 return NULL;
392
393 self->obj.type = AML_OBJ_TIMER;
394 self->obj.ref = 1;
395 self->obj.userdata = userdata;
396 self->obj.free_fn = free_fn;
397 self->obj.cb = callback;
398
399 self->timeout = timeout;
400
401 aml__obj_global_ref(&self->obj);
402
403 return self;
404 }
405
406 EXPORT
aml_ticker_new(uint32_t period,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)407 struct aml_ticker* aml_ticker_new(uint32_t period, aml_callback_fn callback,
408 void* userdata, aml_free_fn free_fn)
409 {
410 struct aml_timer* timer =
411 aml_timer_new(period, callback, userdata, free_fn);
412 timer->obj.type = AML_OBJ_TICKER;
413 return (struct aml_ticker*)timer;
414 }
415
416 EXPORT
aml_signal_new(int signo,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)417 struct aml_signal* aml_signal_new(int signo, aml_callback_fn callback,
418 void* userdata, aml_free_fn free_fn)
419 {
420 struct aml_signal* self = calloc(1, sizeof(*self));
421 if (!self)
422 return NULL;
423
424 self->obj.type = AML_OBJ_SIGNAL;
425 self->obj.ref = 1;
426 self->obj.userdata = userdata;
427 self->obj.free_fn = free_fn;
428 self->obj.cb = callback;
429
430 self->signo = signo;
431
432 aml__obj_global_ref(&self->obj);
433
434 return self;
435 }
436
437 EXPORT
aml_work_new(aml_callback_fn work_fn,aml_callback_fn callback,void * userdata,aml_free_fn free_fn)438 struct aml_work* aml_work_new(aml_callback_fn work_fn, aml_callback_fn callback,
439 void* userdata, aml_free_fn free_fn)
440 {
441 struct aml_work* self = calloc(1, sizeof(*self));
442 if (!self)
443 return NULL;
444
445 self->obj.type = AML_OBJ_WORK;
446 self->obj.ref = 1;
447 self->obj.userdata = userdata;
448 self->obj.free_fn = free_fn;
449 self->obj.cb = callback;
450
451 self->work_fn = work_fn;
452
453 aml__obj_global_ref(&self->obj);
454
455 return self;
456 }
457
458 EXPORT
aml_idle_new(aml_callback_fn callback,void * userdata,aml_free_fn free_fn)459 struct aml_idle* aml_idle_new(aml_callback_fn callback, void* userdata,
460 aml_free_fn free_fn)
461 {
462 struct aml_idle* self = calloc(1, sizeof(*self));
463 if (!self)
464 return NULL;
465
466 self->obj.type = AML_OBJ_IDLE;
467 self->obj.ref = 1;
468 self->obj.userdata = userdata;
469 self->obj.free_fn = free_fn;
470 self->obj.cb = callback;
471
472 aml__obj_global_ref(&self->obj);
473
474 return self;
475 }
476
aml__obj_is_started_unlocked(struct aml * self,void * obj)477 static bool aml__obj_is_started_unlocked(struct aml* self, void* obj)
478 {
479 struct aml_obj* elem;
480 LIST_FOREACH(elem, &self->obj_list, link)
481 if (elem == obj)
482 return true;
483
484 return false;
485 }
486
487 EXPORT
aml_is_started(struct aml * self,void * obj)488 bool aml_is_started(struct aml* self, void* obj)
489 {
490 pthread_mutex_lock(&self->obj_list_mutex);
491 bool result = aml__obj_is_started_unlocked(self, obj);
492 pthread_mutex_unlock(&self->obj_list_mutex);
493 return result;
494 }
495
aml__obj_try_add(struct aml * self,void * obj)496 static int aml__obj_try_add(struct aml* self, void* obj)
497 {
498 int rc = -1;
499
500 pthread_mutex_lock(&self->obj_list_mutex);
501
502 if (!aml__obj_is_started_unlocked(self, obj)) {
503 aml_ref(obj);
504 LIST_INSERT_HEAD(&self->obj_list, (struct aml_obj*)obj, link);
505 rc = 0;
506 }
507
508 pthread_mutex_unlock(&self->obj_list_mutex);
509
510 return rc;
511 }
512
/* Unlink obj from the started list and drop the list's reference.
 * Caller must hold obj_list_mutex (or have exclusive access). */
static void aml__obj_remove_unlocked(struct aml* self, void* obj)
{
	LIST_REMOVE((struct aml_obj*)obj, link);
	aml_unref(obj);
}

static void aml__obj_remove(struct aml* self, void* obj)
{
	pthread_mutex_lock(&self->obj_list_mutex);
	aml__obj_remove_unlocked(self, obj);
	pthread_mutex_unlock(&self->obj_list_mutex);
}
525
aml__obj_try_remove(struct aml * self,void * obj)526 static int aml__obj_try_remove(struct aml* self, void* obj)
527 {
528 int rc = -1;
529
530 pthread_mutex_lock(&self->obj_list_mutex);
531
532 if (aml__obj_is_started_unlocked(self, obj)) {
533 aml__obj_remove_unlocked(self, obj);
534 rc = 0;
535 }
536
537 pthread_mutex_unlock(&self->obj_list_mutex);
538
539 return rc;
540 }
541
/* Register the handler's fd with the backend and adopt the handler. */
static int aml__start_handler(struct aml* self, struct aml_handler* handler)
{
	if (aml__add_fd(self, handler) < 0)
		return -1;

	handler->parent = self;

	return 0;
}
551
static int aml__start_timer(struct aml* self, struct aml_timer* timer)
{
	timer->deadline = aml__gettime_ms(self) + timer->timeout;

	pthread_mutex_lock(&self->timer_list_mutex);
	LIST_INSERT_HEAD(&self->timer_list, timer, link);
	pthread_mutex_unlock(&self->timer_list_mutex);

	/* A zero timeout fires immediately: stop the timer and emit the
	 * event now rather than waiting for the backend clock. Tickers
	 * with zero period are disallowed (would spin forever). */
	if (timer->timeout == 0) {
		assert(timer->obj.type != AML_OBJ_TICKER);
		aml_stop(self, timer);
		aml_emit(self, timer, 0);
		aml_interrupt(self);
		return 0;
	}

	// Only update the backend deadline if this timer is now the earliest.
	struct aml_timer* earliest = aml__get_timer_with_earliest_deadline(self);
	if (earliest == timer)
		aml__set_deadline(self, timer->deadline);

	return 0;
}
574
/* Signal delivery is delegated entirely to the backend. */
static int aml__start_signal(struct aml* self, struct aml_signal* sig)
{
	return self->backend.add_signal(self->state, sig);
}

/* Hand the work item to the thread pool immediately. */
static int aml__start_work(struct aml* self, struct aml_work* work)
{
	return self->backend.thread_pool_enqueue(self, work);
}

static int aml__start_idle(struct aml* self, struct aml_idle* idle)
{
	LIST_INSERT_HEAD(&self->idle_list, idle, link);
	return 0;
}
590
/* Dispatch to the type-specific start routine. obj must already be in
 * the started-object list. Aborts on a corrupt type tag. */
static int aml__start_unchecked(struct aml* self, void* obj)
{
	struct aml_obj* head = obj;

	switch (head->type) {
	case AML_OBJ_AML: return -1; // a loop cannot be started on a loop
	case AML_OBJ_HANDLER: return aml__start_handler(self, obj);
	case AML_OBJ_TIMER: /* fallthrough */
	case AML_OBJ_TICKER: return aml__start_timer(self, obj);
	case AML_OBJ_SIGNAL: return aml__start_signal(self, obj);
	case AML_OBJ_WORK: return aml__start_work(self, obj);
	case AML_OBJ_IDLE: return aml__start_idle(self, obj);
	case AML_OBJ_UNSPEC: break;
	}

	abort();
	return -1;
}
609
610 EXPORT
aml_start(struct aml * self,void * obj)611 int aml_start(struct aml* self, void* obj)
612 {
613 if (aml__obj_try_add(self, obj) < 0)
614 return -1;
615
616 if (aml__start_unchecked(self, obj) == 0)
617 return 0;
618
619 aml__obj_remove(self, obj);
620 return -1;
621 }
622
/* Unregister the fd from the backend and orphan the handler. */
static int aml__stop_handler(struct aml* self, struct aml_handler* handler)
{
	if (aml__del_fd(self, handler) < 0)
		return -1;

	handler->parent = NULL;

	return 0;
}

static int aml__stop_timer(struct aml* self, struct aml_timer* timer)
{
	pthread_mutex_lock(&self->timer_list_mutex);
	LIST_REMOVE(timer, link);
	pthread_mutex_unlock(&self->timer_list_mutex);
	return 0;
}

static int aml__stop_signal(struct aml* self, struct aml_signal* sig)
{
	return self->backend.del_signal(self->state, sig);
}

/* Work already handed to the pool cannot be recalled. */
static int aml__stop_work(struct aml* self, struct aml_work* work)
{
	/* Note: The cb may be executed anyhow */
	return 0;
}

static int aml__stop_idle(struct aml* self, struct aml_idle* idle)
{
	LIST_REMOVE(idle, link);
	return 0;
}
657
/* Dispatch to the type-specific stop routine; mirror image of
 * aml__start_unchecked(). Aborts on a corrupt type tag. */
static int aml__stop_unchecked(struct aml* self, void* obj)
{
	struct aml_obj* head = obj;

	switch (head->type) {
	case AML_OBJ_AML: return -1;
	case AML_OBJ_HANDLER: return aml__stop_handler(self, obj);
	case AML_OBJ_TIMER: /* fallthrough */
	case AML_OBJ_TICKER: return aml__stop_timer(self, obj);
	case AML_OBJ_SIGNAL: return aml__stop_signal(self, obj);
	case AML_OBJ_WORK: return aml__stop_work(self, obj);
	case AML_OBJ_IDLE: return aml__stop_idle(self, obj);
	case AML_OBJ_UNSPEC: break;
	}

	abort();
	return -1;
}
676
EXPORT
int aml_stop(struct aml* self, void* obj)
{
	/* Deactivate obj. A temporary reference keeps the object alive
	 * while the type-specific stop routine runs, since removal drops
	 * the list's reference. Stopping a non-started object is a no-op.
	 * Always returns 0. */
	aml_ref(obj);

	if (aml__obj_try_remove(self, obj) >= 0)
		aml__stop_unchecked(self, obj);

	aml_unref(obj);

	return 0;
}
689
aml__get_timer_with_earliest_deadline(struct aml * self)690 static struct aml_timer* aml__get_timer_with_earliest_deadline(struct aml* self)
691 {
692 uint64_t deadline = UINT64_MAX;
693 struct aml_timer* result = NULL;
694
695 struct aml_timer* timer;
696
697 pthread_mutex_lock(&self->timer_list_mutex);
698 LIST_FOREACH(timer, &self->timer_list, link)
699 if (timer->deadline < deadline) {
700 deadline = timer->deadline;
701 result = timer;
702 }
703 pthread_mutex_unlock(&self->timer_list_mutex);
704
705 return result;
706 }
707
/* Fire the earliest expired timer, if any. One-shot timers are stopped
 * after firing; tickers are re-armed one period ahead. Returns true if
 * an event was emitted so the caller can loop until nothing is due. */
static bool aml__handle_timeout(struct aml* self, uint64_t now)
{
	struct aml_timer* timer = aml__get_timer_with_earliest_deadline(self);
	if (!timer || timer->deadline > now)
		return false;

	aml_emit(self, timer, 0);

	switch (timer->obj.type) {
	case AML_OBJ_TIMER:
		aml_stop(self, timer);
		break;
	case AML_OBJ_TICKER:
		// Advance by the period, not from "now", to avoid drift.
		timer->deadline += timer->timeout;
		break;
	default:
		abort();
		break;
	}

	return true;
}
730
/* Run all idle callbacks once.
 * NOTE(review): callbacks run while iterating the list; a callback
 * that stops and releases another idle object could unlink a node
 * mid-iteration — confirm callers never do this, or switch to a
 * removal-safe iteration scheme. */
static void aml__handle_idle(struct aml* self)
{
	struct aml_idle* idle;

	LIST_FOREACH(idle, &self->idle_list, link)
		if (idle->obj.cb)
			idle->obj.cb(idle);
}
739
/* Invoke an object's callback for a dequeued event. */
static void aml__handle_event(struct aml* self, struct aml_obj* obj)
{
	/* A reference is kept here in case an object is stopped inside the
	 * callback. We want the object to live until we're done with it.
	 */
	aml_ref(obj);

	if (obj->cb)
		obj->cb(obj);

	if (obj->type == AML_OBJ_HANDLER) {
		struct aml_handler* handler = (struct aml_handler*)obj;
		// Clearing revents lets aml_emit() queue the handler again.
		handler->revents = 0;

		// Edge-triggered backends need the fd re-armed after delivery.
		if (self->backend.flags & AML_BACKEND_EDGE_TRIGGERED)
			aml__mod_fd(self, handler);
	}

	aml_unref(obj);
}
760
/* Might exit earlier than timeout. It's up to the user to check */
EXPORT
int aml_poll(struct aml* self, int timeout)
{
	/* Wait up to timeout ms (-1 = forever) for events via the backend. */
	return aml__poll(self, timeout);
}
767
aml__event_dequeue(struct aml * self)768 static struct aml_obj* aml__event_dequeue(struct aml* self)
769 {
770 pthread_mutex_lock(&self->event_queue_mutex);
771 struct aml_obj* obj = TAILQ_FIRST(&self->event_queue);
772 if (obj)
773 TAILQ_REMOVE(&self->event_queue, obj, event_link);
774 pthread_mutex_unlock(&self->event_queue_mutex);
775 return obj;
776 }
777
EXPORT
void aml_dispatch(struct aml* self)
{
	/* Process everything that is currently due: expired timers, queued
	 * events and idle callbacks. All signals are blocked while the
	 * event queue is drained so signal handlers (which may call
	 * aml_emit) cannot race with queue manipulation. */
	uint64_t now = aml__gettime_ms(self);
	while (aml__handle_timeout(self, now));

	// Re-arm the backend with the next timer deadline, if any remain.
	struct aml_timer* earliest = aml__get_timer_with_earliest_deadline(self);
	if (earliest) {
		assert(earliest->deadline > now);
		aml__set_deadline(self, earliest->deadline);
	}

	sigset_t sig_old, sig_new;
	sigfillset(&sig_new);

	pthread_sigmask(SIG_BLOCK, &sig_new, &sig_old);

	struct aml_obj* obj;
	while ((obj = aml__event_dequeue(self)) != NULL) {
		aml__handle_event(self, obj);
		aml_unref(obj); // drop the reference the queue held
	}

	pthread_sigmask(SIG_SETMASK, &sig_old, NULL);

	aml__handle_idle(self);
	aml__post_dispatch(self);
}
806
807 EXPORT
aml_run(struct aml * self)808 int aml_run(struct aml* self)
809 {
810 self->do_exit = false;
811
812 do {
813 aml_poll(self, -1);
814 aml_dispatch(self);
815 } while (!self->do_exit);
816
817 return 0;
818 }
819
EXPORT
void aml_exit(struct aml* self)
{
	/* Ask aml_run() to return after the current iteration, waking the
	 * backend if it provides an explicit exit hook. */
	self->do_exit = true;

	if (self->backend.exit)
		self->backend.exit(self->state);
}
828
EXPORT
int aml_ref(void* obj)
{
	/* Increment the reference count. Returns the count as it was
	 * *before* the increment. */
	struct aml_obj* self = obj;

	aml__ref_lock();
	int ref = self->ref++;
	aml__ref_unlock();

	return ref;
}
840
/* Tear down an aml instance: stop every started object, release the
 * thread pool and backend state, drop undispatched events, then free. */
static void aml__free(struct aml* self)
{
	while (!LIST_EMPTY(&self->obj_list)) {
		struct aml_obj* obj = LIST_FIRST(&self->obj_list);

		aml__stop_unchecked(self, obj);
		aml__obj_remove_unlocked(self, obj);
	}

	if (self->have_thread_pool)
		self->backend.thread_pool_release(self);

	self->backend.del_state(self->state);

	// Drop references held by events that were never dispatched.
	while (!TAILQ_EMPTY(&self->event_queue)) {
		struct aml_obj* obj = TAILQ_FIRST(&self->event_queue);
		TAILQ_REMOVE(&self->event_queue, obj, event_link);
		aml_unref(obj);
	}

	pthread_mutex_destroy(&self->timer_list_mutex);
	pthread_mutex_destroy(&self->obj_list_mutex);
	pthread_mutex_destroy(&self->event_queue_mutex);

	free(self);
}
867
/* Type-specific destructors: run the user's free_fn on userdata (if
 * set), then release the object itself. Called from aml_unref() when
 * the reference count reaches zero. */

static void aml__free_handler(struct aml_handler* self)
{
	if (self->obj.free_fn)
		self->obj.free_fn(self->obj.userdata);

	free(self);
}

static void aml__free_timer(struct aml_timer* self)
{
	if (self->obj.free_fn)
		self->obj.free_fn(self->obj.userdata);

	free(self);
}

static void aml__free_signal(struct aml_signal* self)
{
	if (self->obj.free_fn)
		self->obj.free_fn(self->obj.userdata);

	free(self);
}

static void aml__free_work(struct aml_work* self)
{
	if (self->obj.free_fn)
		self->obj.free_fn(self->obj.userdata);

	free(self);
}

static void aml__free_idle(struct aml_idle* self)
{
	if (self->obj.free_fn)
		self->obj.free_fn(self->obj.userdata);

	free(self);
}
907
EXPORT
int aml_unref(void* obj)
{
	/* Decrement the reference count; destroy the object when it
	 * reaches zero. Returns the new count. */
	struct aml_obj* self = obj;

	aml__ref_lock();
	int ref = --self->ref;
	if (ref == 0)
		LIST_REMOVE(self, global_link); // no longer findable by id
	aml__ref_unlock();
	assert(ref >= 0);
	if (ref > 0)
		goto done;

	// ref == 0: dispatch to the type-specific destructor.
	switch (self->type) {
	case AML_OBJ_AML:
		aml__free(obj);
		break;
	case AML_OBJ_HANDLER:
		aml__free_handler(obj);
		break;
	case AML_OBJ_TIMER:
		/* fallthrough */
	case AML_OBJ_TICKER:
		aml__free_timer(obj);
		break;
	case AML_OBJ_SIGNAL:
		aml__free_signal(obj);
		break;
	case AML_OBJ_WORK:
		aml__free_work(obj);
		break;
	case AML_OBJ_IDLE:
		aml__free_idle(obj);
		break;
	default:
		abort();
		break;
	}

done:
	return ref;
}
951
EXPORT
unsigned long long aml_get_id(const void* obj)
{
	/* Process-wide unique id assigned at creation; see aml_try_ref(). */
	const struct aml_obj* aml_obj = obj;
	return aml_obj->id;
}
958
959 EXPORT
aml_try_ref(unsigned long long id)960 void* aml_try_ref(unsigned long long id)
961 {
962 struct aml_obj* obj = NULL;
963
964 aml__ref_lock();
965 LIST_FOREACH(obj, &aml__obj_list, global_link)
966 if (obj->id == id)
967 break;
968
969 if (obj && obj->id == id)
970 obj->ref++;
971 else
972 obj = NULL;
973
974 aml__ref_unlock();
975 return obj;
976 }
977
EXPORT
void* aml_get_userdata(const void* obj)
{
	const struct aml_obj* aml_obj = obj;
	return aml_obj->userdata;
}

EXPORT
void aml_set_userdata(void* obj, void* userdata, aml_free_fn free_fn)
{
	/* Replace userdata and its destructor (free_fn may be NULL). The
	 * previous userdata is not freed here. */
	struct aml_obj* aml_obj = obj;
	aml_obj->userdata = userdata;
	aml_obj->free_fn = free_fn;
}
992
/* Queue an event on obj for delivery by aml_dispatch(). For handlers,
 * revents accumulate atomically and the object is queued only on the
 * zero -> non-zero transition, so it appears at most once in the
 * queue. All signals are blocked around queue manipulation because
 * this may be called from signal context. */
void aml_emit(struct aml* self, void* ptr, uint32_t revents)
{
	struct aml_obj* obj = ptr;

	if (obj->type == AML_OBJ_HANDLER) {
		struct aml_handler* handler = ptr;
		uint32_t old = atomic_fetch_or(&handler->revents, revents);
		if (old != 0)
			return; // already queued; events merged into revents
	}

	sigset_t sig_old, sig_new;
	sigfillset(&sig_new);

	pthread_sigmask(SIG_BLOCK, &sig_new, &sig_old);
	pthread_mutex_lock(&self->event_queue_mutex);
	TAILQ_INSERT_TAIL(&self->event_queue, obj, event_link);
	aml_ref(obj); // the queue holds a reference until dispatch
	pthread_mutex_unlock(&self->event_queue_mutex);
	pthread_sigmask(SIG_SETMASK, &sig_old, NULL);
}
1014
EXPORT
enum aml_event aml_get_event_mask(const struct aml_handler* handler)
{
	return handler->event_mask;
}

EXPORT
void aml_set_event_mask(struct aml_handler* handler, enum aml_event mask)
{
	/* Update the watched events, re-arming with the backend if the
	 * handler is currently started on a loop. */
	handler->event_mask = mask;

	if (handler->parent && aml_is_started(handler->parent, handler))
		aml__mod_fd(handler->parent, handler);
}

EXPORT
enum aml_event aml_get_revents(const struct aml_handler* handler)
{
	/* Events accumulated since the handler was last dispatched. */
	return handler->revents;
}
1035
1036 EXPORT
aml_get_fd(const void * ptr)1037 int aml_get_fd(const void* ptr)
1038 {
1039 const struct aml_obj* obj = ptr;
1040
1041 switch (obj->type) {
1042 case AML_OBJ_AML:;
1043 const struct aml* aml = ptr;
1044 return aml->backend.get_fd ?
1045 aml->backend.get_fd(aml->state) : -1;
1046 case AML_OBJ_HANDLER:
1047 return ((struct aml_handler*)ptr)->fd;
1048 default:
1049 break;
1050 }
1051
1052 return -1;
1053 }
1054
EXPORT
int aml_get_signo(const struct aml_signal* sig)
{
	return sig->signo;
}

/* Used by backends/thread pool to fetch the function to run off-loop
 * (not exported). */
aml_callback_fn aml_get_work_fn(const struct aml_work* work)
{
	return work->work_fn;
}
1065
/* Backend-private per-object and per-loop state accessors
 * (deliberately not exported). */

void* aml_get_backend_data(const void* ptr)
{
	const struct aml_obj* obj = ptr;
	return obj->backend_data;
}

void aml_set_backend_data(void* ptr, void* data)
{
	struct aml_obj* obj = ptr;
	obj->backend_data = data;
}

void* aml_get_backend_state(const struct aml* self)
{
	return self->state;
}
1082
EXPORT
void aml_set_duration(void* ptr, uint32_t duration)
{
	/* Change a timer's or ticker's interval; aborts for other types.
	 * NOTE(review): this does not reschedule an already-started timer;
	 * the new duration takes effect the next time a deadline is
	 * computed — confirm callers expect this. */
	struct aml_obj* obj = ptr;

	switch (obj->type) {
	case AML_OBJ_TIMER: /* fallthrough */
	case AML_OBJ_TICKER:
		((struct aml_timer*)ptr)->timeout = duration;
		return;
	default:
		break;
	}

	abort();
}
1099