/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/io/logging.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/thread.h>

#if defined(__FreeBSD__) || defined(__NetBSD__)
#    define __BSD_VISIBLE 1
#    include <sys/types.h>
#endif

#include <sys/event.h>

#include <aws/io/io.h>
#include <limits.h>
#include <unistd.h>

static void s_destroy(struct aws_event_loop *event_loop);
static int s_run(struct aws_event_loop *event_loop);
static int s_stop(struct aws_event_loop *event_loop);
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static bool s_is_event_thread(struct aws_event_loop *event_loop);

static void s_event_thread_main(void *user_data);

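/* Forward declaration; this helper is implemented elsewhere in this library's POSIX code and is declared here
 * for local use rather than pulled in from a public header. */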
int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);

enum event_thread_state {
    EVENT_THREAD_STATE_READY_TO_RUN,
    EVENT_THREAD_STATE_RUNNING,
    EVENT_THREAD_STATE_STOPPING,
};

enum pipe_fd_index {
    READ_FD,
    WRITE_FD,
};

struct kqueue_loop {
    /* thread_created_on is the handle to the event loop thread. */
    struct aws_thread thread_created_on;
    /* thread_joined_to is used by the thread destroying the event loop. */
    aws_thread_id_t thread_joined_to;
    /* running_thread_id is NULL if the event loop thread is stopped, or points to the thread_id of the thread running
     * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
     * run/stop) and reads (e.g., is_event_loop_thread).
     * An aws_thread_id_t variable itself cannot be atomic because it is an opaque, platform-dependent type. */
    struct aws_atomic_var running_thread_id;
    int kq_fd; /* kqueue file descriptor */

    /* Pipe for signaling to event-thread that cross_thread_data has changed. */
    int cross_thread_signal_pipe[2];

    /* cross_thread_data holds things that must be communicated across threads.
     * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data.
     * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */
    struct {
        struct aws_mutex mutex;
        bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */
        struct aws_linked_list tasks_to_schedule;
        enum event_thread_state state;
    } cross_thread_data;

    /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */
    struct {
        struct aws_task_scheduler scheduler;

        int connected_handle_count;

        /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate
         * on them later */
        enum event_thread_state state;
    } thread_data;

    struct aws_thread_options thread_options;
};

/* Data attached to aws_io_handle while the handle is subscribed to io events */
struct handle_data {
    struct aws_io_handle *owner;
    struct aws_event_loop *event_loop;
    aws_event_loop_on_event_fn *on_event;
    void *on_event_user_data;

    int events_subscribed; /* aws_io_event_types this handle should be subscribed to */
    int events_this_loop;  /* aws_io_event_types received during current loop of the event-thread */

    enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state;

    struct aws_task subscribe_task;
    struct aws_task cleanup_task;
};

enum {
    DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */
    MAX_EVENTS = 100,          /* Max kevents to process per loop of the event-thread */
};

struct aws_event_loop_vtable s_kqueue_vtable = {
    .destroy = s_destroy,
    .run = s_run,
    .stop = s_stop,
    .wait_for_stop_completion = s_wait_for_stop_completion,
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
    .subscribe_to_io_events = s_subscribe_to_io_events,
    .cancel_task = s_cancel_task,
    .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
    .free_io_event_resources = s_free_io_event_resources,
    .is_on_callers_thread = s_is_event_thread,
};

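/* A minimal usage sketch, assuming the public aws_event_loop API from <aws/io/event_loop.h> and the aws-c-common
 * clock (names outside this file are referenced for illustration only):
 *
 *     struct aws_event_loop_options options = {.clock = aws_high_res_clock_get_ticks};
 *     struct aws_event_loop *loop = aws_event_loop_new_default_with_options(alloc, &options);
 *     aws_event_loop_run(loop);
 *     ...
 *     aws_event_loop_destroy(loop); (invokes s_destroy through the vtable above)
 */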
struct aws_event_loop *aws_event_loop_new_default_with_options(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {
    AWS_ASSERT(alloc);
    AWS_ASSERT(options);
    AWS_ASSERT(options->clock);

    bool clean_up_event_loop_mem = false;
    bool clean_up_event_loop_base = false;
    bool clean_up_impl_mem = false;
    bool clean_up_thread = false;
    bool clean_up_kqueue = false;
    bool clean_up_signal_pipe = false;
    bool clean_up_signal_kevent = false;
    bool clean_up_mutex = false;

    struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop));
    if (!event_loop) {
        return NULL;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop);
    clean_up_event_loop_mem = true;

    int err = aws_event_loop_init_base(event_loop, alloc, options->clock);
    if (err) {
        goto clean_up;
    }
    clean_up_event_loop_base = true;

    struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
    if (!impl) {
        goto clean_up;
    }

    if (options->thread_options) {
        impl->thread_options = *options->thread_options;
    } else {
        impl->thread_options = *aws_default_thread_options();
    }

    /* initialize thread id to NULL. It will be set when the event loop thread starts. */
    aws_atomic_init_ptr(&impl->running_thread_id, NULL);
    clean_up_impl_mem = true;

    err = aws_thread_init(&impl->thread_created_on, alloc);
    if (err) {
        goto clean_up;
    }
    clean_up_thread = true;

    impl->kq_fd = kqueue();
    if (impl->kq_fd == -1) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up;
    }
    clean_up_kqueue = true;

    err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe);
    if (err) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop);
        goto clean_up;
    }
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: pipe descriptors read %d, write %d.",
        (void *)event_loop,
        impl->cross_thread_signal_pipe[READ_FD],
        impl->cross_thread_signal_pipe[WRITE_FD]);
    clean_up_signal_pipe = true;

    /* Set up kevent to handle activity on the cross_thread_signal_pipe */
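    /* EV_CLEAR gives edge-triggered behavior: the kevent re-fires only when new data is written to the pipe,
     * which pairs with the event-thread draining the pipe each time it wakes. */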
    struct kevent thread_signal_kevent;
    EV_SET(
        &thread_signal_kevent,
        impl->cross_thread_signal_pipe[READ_FD],
        EVFILT_READ /*filter*/,
        EV_ADD | EV_CLEAR /*flags*/,
        0 /*fflags*/,
        0 /*data*/,
        NULL /*udata*/);

    int res = kevent(
        impl->kq_fd,
        &thread_signal_kevent /*changelist*/,
        1 /*nchanges*/,
        NULL /*eventlist*/,
        0 /*nevents*/,
        NULL /*timeout*/);

    if (res == -1) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up;
    }
    clean_up_signal_kevent = true;

    err = aws_mutex_init(&impl->cross_thread_data.mutex);
    if (err) {
        goto clean_up;
    }
    clean_up_mutex = true;

    impl->cross_thread_data.thread_signaled = false;

    aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule);

    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc);
    if (err) {
        goto clean_up;
    }

    impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    event_loop->impl_data = impl;

    event_loop->vtable = &s_kqueue_vtable;

    /* success */
    return event_loop;

clean_up:
    if (clean_up_mutex) {
        aws_mutex_clean_up(&impl->cross_thread_data.mutex);
    }
    if (clean_up_signal_kevent) {
        thread_signal_kevent.flags = EV_DELETE;
        kevent(
            impl->kq_fd,
            &thread_signal_kevent /*changelist*/,
            1 /*nchanges*/,
            NULL /*eventlist*/,
            0 /*nevents*/,
            NULL /*timeout*/);
    }
    if (clean_up_signal_pipe) {
        close(impl->cross_thread_signal_pipe[READ_FD]);
        close(impl->cross_thread_signal_pipe[WRITE_FD]);
    }
    if (clean_up_kqueue) {
        close(impl->kq_fd);
    }
    if (clean_up_thread) {
        aws_thread_clean_up(&impl->thread_created_on);
    }
    if (clean_up_impl_mem) {
        aws_mem_release(alloc, impl);
    }
    if (clean_up_event_loop_base) {
        aws_event_loop_clean_up_base(event_loop);
    }
    if (clean_up_event_loop_mem) {
        aws_mem_release(alloc, event_loop);
    }
    return NULL;
}

static void s_destroy(struct aws_event_loop *event_loop) {
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */
    s_stop(event_loop);
    int err = s_wait_for_stop_completion(event_loop);
    if (err) {
        AWS_LOGF_WARN(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: failed to destroy event-thread, resources have been leaked",
            (void *)event_loop);
        AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL);
        return;
    }
    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
    impl->thread_joined_to = aws_thread_current_thread_id();
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);

    /* Clean up task-related stuff first. It's possible that a cancelled task adds further tasks to this event_loop.
     * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last. */

    aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled */

    while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
    }

    /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */
    AWS_ASSERT(impl->thread_data.connected_handle_count == 0);

    /* Clean up everything else */
    aws_mutex_clean_up(&impl->cross_thread_data.mutex);

    struct kevent thread_signal_kevent;
    EV_SET(
        &thread_signal_kevent,
        impl->cross_thread_signal_pipe[READ_FD],
        EVFILT_READ /*filter*/,
        EV_DELETE /*flags*/,
        0 /*fflags*/,
        0 /*data*/,
        NULL /*udata*/);

    kevent(
        impl->kq_fd,
        &thread_signal_kevent /*changelist*/,
        1 /*nchanges*/,
        NULL /*eventlist*/,
        0 /*nevents*/,
        NULL /*timeout*/);

    close(impl->cross_thread_signal_pipe[READ_FD]);
    close(impl->cross_thread_signal_pipe[WRITE_FD]);
    close(impl->kq_fd);
    aws_thread_clean_up(&impl->thread_created_on);
    aws_mem_release(event_loop->alloc, impl);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}

static int s_run(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop);
    /* to re-run, call stop() and wait_for_stop_completion() */
    AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);

    /* Since thread isn't running it's ok to touch thread_data,
     * and it's ok to touch cross_thread_data without locking the mutex */
    impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;

    aws_thread_increment_unjoined_count();
    int err =
        aws_thread_launch(&impl->thread_created_on, s_event_thread_main, (void *)event_loop, &impl->thread_options);

    if (err) {
        aws_thread_decrement_unjoined_count();
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
        goto clean_up;
    }

    return AWS_OP_SUCCESS;

clean_up:
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
    return AWS_OP_ERR;
}

/* This function can't fail; we rely on the thread responding to critical messages (e.g., stop thread). */
void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.",
        (void *)event_loop);
    /* It doesn't actually matter what we write; any activity on the pipe signals that cross_thread_data has changed.
     * If the pipe is full and the write fails, that's fine: the event-thread will get the signal from some previous
     * write. */
    uint32_t write_whatever = 0xC0FFEE;
    write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever));
}

static int s_stop(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    bool signal_thread = false;

    { /* Begin critical section */
        aws_mutex_lock(&impl->cross_thread_data.mutex);
        if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) {
            impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING;
            signal_thread = !impl->cross_thread_data.thread_signaled;
            impl->cross_thread_data.thread_signaled = true;
        }
        aws_mutex_unlock(&impl->cross_thread_data.mutex);
    } /* End critical section */

    if (signal_thread) {
        signal_cross_thread_data_changed(event_loop);
    }

    return AWS_OP_SUCCESS;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

#ifdef DEBUG_BUILD
    aws_mutex_lock(&impl->cross_thread_data.mutex);
    /* call stop() before wait_for_stop_completion() or you'll wait forever */
    AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING);
    aws_mutex_unlock(&impl->cross_thread_data.mutex);
#endif

    int err = aws_thread_join(&impl->thread_created_on);
    aws_thread_decrement_unjoined_count();
    if (err) {
        return AWS_OP_ERR;
    }

    /* Since thread is no longer running it's ok to touch thread_data,
     * and it's ok to touch cross_thread_data without locking the mutex */
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
    impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    return AWS_OP_SUCCESS;
}

/* Common functionality for "now" and "future" task scheduling.
 * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    AWS_ASSERT(task);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* If we're on the event-thread, just schedule it directly */
    if (s_is_event_thread(event_loop)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
        }
        return;
    }

    /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    task->timestamp = run_at_nanos;
    bool should_signal_thread = false;

    /* Begin critical section */
    aws_mutex_lock(&impl->cross_thread_data.mutex);
    aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node);

    /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */
    if (!impl->cross_thread_data.thread_signaled) {
        should_signal_thread = true;
        impl->cross_thread_data.thread_signaled = true;
    }

    aws_mutex_unlock(&impl->cross_thread_data.mutex);
    /* End critical section */

    if (should_signal_thread) {
        signal_cross_thread_data_changed(event_loop);
    }
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    s_schedule_task_common(event_loop, task, run_at_nanos);
}

static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    struct kqueue_loop *kqueue_loop = event_loop->impl_data;
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
    aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task);
}

/* Scheduled task that connects aws_io_handle with the kqueue */
static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)task;
    struct handle_data *handle_data = user_data;
    struct aws_event_loop *event_loop = handle_data->event_loop;
    struct kqueue_loop *impl = handle_data->event_loop->impl_data;

    impl->thread_data.connected_handle_count++;

    /* if task was cancelled, nothing to do */
    if (status == AWS_TASK_STATUS_CANCELED) {
        return;
    }
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd);

    /* If handle was unsubscribed before this task could execute, nothing to do */
    if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) {
        return;
    }

    AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING);

    /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents.
     * If we're adding two separate kevents, but one of those fails, we need to remove the other kevent.
     * Therefore we use the EV_RECEIPT flag. This causes kevent() to report whether each EV_ADD succeeded,
     * rather than the usual behavior of telling us about recent events. */
    struct kevent changelist[2];
    AWS_ZERO_ARRAY(changelist);

    int changelist_size = 0;

    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
        EV_SET(
            &changelist[changelist_size++],
            handle_data->owner->data.fd,
            EVFILT_READ /*filter*/,
            EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
            0 /*fflags*/,
            0 /*data*/,
            handle_data /*udata*/);
    }
    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
        EV_SET(
            &changelist[changelist_size++],
            handle_data->owner->data.fd,
            EVFILT_WRITE /*filter*/,
            EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
            0 /*fflags*/,
            0 /*data*/,
            handle_data /*udata*/);
    }

    int num_events = kevent(
        impl->kq_fd,
        changelist /*changelist*/,
        changelist_size /*nchanges*/,
        changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/,
        changelist_size /*nevents*/,
        NULL /*timeout*/);
    if (num_events == -1) {
        goto subscribe_failed;
    }

    /* Look through results to see if any failed */
    for (int i = 0; i < num_events; ++i) {
        /* Every result should be flagged as an error; that's just how EV_RECEIPT works */
        AWS_ASSERT(changelist[i].flags & EV_ERROR);

        /* If a real error occurred, .data contains the error code */
        if (changelist[i].data != 0) {
            goto subscribe_failed;
        }
    }

    /* Success */
    handle_data->state = HANDLE_STATE_SUBSCRIBED;
    return;

subscribe_failed:
    AWS_LOGF_ERROR(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: failed to subscribe to events on fd %d",
        (void *)event_loop,
        handle_data->owner->data.fd);
    /* Remove any related kevents that succeeded */
    for (int i = 0; i < num_events; ++i) {
        if (changelist[i].data == 0) {
            changelist[i].flags = EV_DELETE;
            kevent(
                impl->kq_fd,
                &changelist[i] /*changelist*/,
                1 /*nchanges*/,
                NULL /*eventlist*/,
                0 /*nevents*/,
                NULL /*timeout*/);
        }
    }

    /* We can't return an error code because this was a scheduled task.
     * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */
    handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data);
}

static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_ASSERT(event_loop);
    AWS_ASSERT(handle->data.fd != -1);
    AWS_ASSERT(handle->additional_data == NULL);
    AWS_ASSERT(on_event);
    /* Must subscribe for read, write, or both */
    AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE));

    struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data));
    if (!handle_data) {
        return AWS_OP_ERR;
    }

    handle_data->owner = handle;
    handle_data->event_loop = event_loop;
    handle_data->on_event = on_event;
    handle_data->on_event_user_data = user_data;
    handle_data->events_subscribed = events;
    handle_data->state = HANDLE_STATE_SUBSCRIBING;

    handle->additional_data = handle_data;

    /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why...
     *
     * kqueue requires separate registrations for read and write events.
     * If the user wants to know about both read and write, we need to register once for read and once for write.
     * If the first registration succeeds, but the second registration fails, we need to delete the first registration.
     * If this all happened outside the event-thread, the successful registration's events could begin processing
     * in the brief window of time before the registration is deleted. */

    aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe");
    s_schedule_task_now(event_loop, &handle_data->subscribe_task);

    return AWS_OP_SUCCESS;
}

static void s_free_io_event_resources(void *user_data) {
    struct handle_data *handle_data = user_data;
    struct kqueue_loop *impl = handle_data->event_loop->impl_data;

    impl->thread_data.connected_handle_count--;

    aws_mem_release(handle_data->event_loop->alloc, handle_data);
}

static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)task;
    (void)status;

    struct handle_data *handle_data = user_data;
    s_free_io_event_resources(handle_data);
}

static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
    AWS_ASSERT(handle->additional_data);
    struct handle_data *handle_data = handle->additional_data;
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_ASSERT(event_loop == handle_data->event_loop);

    /* If the handle was successfully subscribed to kqueue, then remove it. */
    if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
        struct kevent changelist[2];
        int changelist_size = 0;

        if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
            EV_SET(
                &changelist[changelist_size++],
                handle_data->owner->data.fd,
                EVFILT_READ /*filter*/,
                EV_DELETE /*flags*/,
                0 /*fflags*/,
                0 /*data*/,
                handle_data /*udata*/);
        }
        if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
            EV_SET(
                &changelist[changelist_size++],
                handle_data->owner->data.fd,
                EVFILT_WRITE /*filter*/,
                EV_DELETE /*flags*/,
                0 /*fflags*/,
                0 /*data*/,
                handle_data /*udata*/);
        }

        kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/);
    }

    /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario:
     * - While processing a batch of events, some callback unsubscribes another aws_io_handle.
     * - One of the other events in this batch belongs to that other aws_io_handle.
     * - If the handle_data were already deleted, we would access invalid memory. */

    aws_task_init(
        &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data");
    aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task);

    handle_data->state = HANDLE_STATE_UNSUBSCRIBED;
    handle->additional_data = NULL;

    return AWS_OP_SUCCESS;
}

static bool s_is_event_thread(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
    return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* Called from thread.
 * Takes tasks from tasks_to_schedule and adds them to the scheduler. */
static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) {
    struct kqueue_loop *impl = event_loop->impl_data;
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);

    while (!aws_linked_list_empty(tasks_to_schedule)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: task %p pulled to event-loop, scheduling now.",
            (void *)event_loop,
            (void *)task);
        /* Timestamp 0 is used to denote "now" tasks */
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp);
        }
    }
}

static void s_process_cross_thread_data(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop);
    /* If there are tasks to schedule, grab them all out of cross_thread_data.tasks_to_schedule.
     * We'll process them later, so that we minimize time spent holding the mutex. */
    struct aws_linked_list tasks_to_schedule;
    aws_linked_list_init(&tasks_to_schedule);

    { /* Begin critical section */
        aws_mutex_lock(&impl->cross_thread_data.mutex);
        impl->cross_thread_data.thread_signaled = false;

        bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) &&
                             (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING);
        if (AWS_UNLIKELY(initiate_stop)) {
            impl->thread_data.state = EVENT_THREAD_STATE_STOPPING;
        }

        aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule);

        aws_mutex_unlock(&impl->cross_thread_data.mutex);
    } /* End critical section */

    s_process_tasks_to_schedule(event_loop, &tasks_to_schedule);
}

static int s_aws_event_flags_from_kevent(struct kevent *kevent) {
    int event_flags = 0;

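    /* For EVFILT_READ, kevent.data holds the number of bytes available to read; for EVFILT_WRITE it holds the
     * space remaining in the write buffer, so a nonzero value means the handle is actionable. */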
    if (kevent->flags & EV_ERROR) {
        event_flags |= AWS_IO_EVENT_TYPE_ERROR;
    } else if (kevent->filter == EVFILT_READ) {
        if (kevent->data != 0) {
            event_flags |= AWS_IO_EVENT_TYPE_READABLE;
        }

        if (kevent->flags & EV_EOF) {
            event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
        }
    } else if (kevent->filter == EVFILT_WRITE) {
        if (kevent->data != 0) {
            event_flags |= AWS_IO_EVENT_TYPE_WRITABLE;
        }

        if (kevent->flags & EV_EOF) {
            event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
        }
    }

    return event_flags;
}

static void s_event_thread_main(void *user_data) {
    struct aws_event_loop *event_loop = user_data;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* set thread id to the event-loop's thread. */
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

    struct kevent kevents[MAX_EVENTS];

    /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
     * If both the read and write kevents fire in the same loop of the event-thread,
     * combine the event-flags and deliver them in a single callback.
     * This makes the kqueue_event_loop behave more like the other platform implementations. */
    struct handle_data *io_handle_events[MAX_EVENTS];

    struct timespec timeout = {
        .tv_sec = DEFAULT_TIMEOUT_SEC,
        .tv_nsec = 0,
    };

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %ds, and max events to process per tick %d",
        (void *)event_loop,
        DEFAULT_TIMEOUT_SEC,
        MAX_EVENTS);

    while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
        int num_io_handle_events = 0;
        bool should_process_cross_thread_data = false;

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: waiting for a maximum of %ds %lluns",
            (void *)event_loop,
            (int)timeout.tv_sec,
            (unsigned long long)timeout.tv_nsec);

        /* Process kqueue events */
        int num_kevents = kevent(
            impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout);

        aws_event_loop_register_tick_start(event_loop);
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
        if (num_kevents == -1) {
            /* Raise an error, in case this is interesting to anyone monitoring,
             * and continue on with this loop. We can't process events,
             * but we can still process scheduled tasks */
            aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);

            /* Force the cross_thread_data to be processed.
             * There might be valuable info in there, like the message to stop the thread.
             * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
            should_process_cross_thread_data = true;
        }

        for (int i = 0; i < num_kevents; ++i) {
            struct kevent *kevent = &kevents[i];

            /* Was this event to signal that cross_thread_data has changed? */
            if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
                should_process_cross_thread_data = true;

                /* Drain whatever data was written to the signaling pipe */
                uint32_t read_whatever;
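                /* The pipe is non-blocking, so read() returns -1 (EAGAIN) once the pipe is empty and the loop ends. */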
                while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
                }

                continue;
            }

            /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
            int event_flags = s_aws_event_flags_from_kevent(kevent);
            if (event_flags == 0) {
                continue;
            }

            /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
            struct handle_data *handle_data = kevent->udata;
            if (handle_data->events_this_loop == 0) {
                io_handle_events[num_io_handle_events++] = handle_data;
            }
            handle_data->events_this_loop |= event_flags;
        }

        /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
        for (int i = 0; i < num_io_handle_events; ++i) {
            struct handle_data *handle_data = io_handle_events[i];

            if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    handle_data->owner->data.fd);
                handle_data->on_event(
                    event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
            }

            handle_data->events_this_loop = 0;
        }

        /* Process cross_thread_data */
        if (should_process_cross_thread_data) {
            s_process_cross_thread_data(event_loop);
        }

        /* Run scheduled tasks */
        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);

        /* Set timeout for next kevent() call.
         * If clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        int err = event_loop->clock(&now_ns);
        if (err) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {

            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
            timeout.tv_nsec = 0;
        } else {
            /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
            uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;

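            /* aws_timestamp_convert() returns the whole seconds; the sub-second leftover (still in nanoseconds)
             * is written to the remainder out-parameter. */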
            uint64_t timeout_remainder_ns = 0;
            uint64_t timeout_sec =
                aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);

            if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
                timeout_sec = LONG_MAX;
                timeout_remainder_ns = 0;
            }

            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu using timeout of %ds %lluns.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                (int)timeout_sec,
                (unsigned long long)timeout_remainder_ns);
            timeout.tv_sec = (time_t)(timeout_sec);
            timeout.tv_nsec = (long)(timeout_remainder_ns);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
    /* reset to NULL. This should be updated again during destroy before tasks are canceled. */
    aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}