/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/thread.h>

#include <aws/io/logging.h>

#include <sys/epoll.h>

#include <errno.h>
#include <limits.h>
#include <unistd.h>

#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 8
#    define USE_EFD 1
#else
#    define USE_EFD 0
#endif

#if USE_EFD
#    include <aws/io/io.h>
#    include <sys/eventfd.h>

#else
#    include <aws/io/pipe.h>
#endif

/* This isn't defined on ancient linux distros (breaking the builds).
 * However, if this is a prebuild, we purposely build on an ancient system, but
 * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
 * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
 * gets passed as long as it does.
 */
#ifndef EPOLLRDHUP
#    define EPOLLRDHUP 0x2000
#endif

static void s_destroy(struct aws_event_loop *event_loop);
static int s_run(struct aws_event_loop *event_loop);
static int s_stop(struct aws_event_loop *event_loop);
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static void s_main_loop(void *args);

static struct aws_event_loop_vtable s_vtable = {
    .destroy = s_destroy,
    .run = s_run,
    .stop = s_stop,
    .wait_for_stop_completion = s_wait_for_stop_completion,
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
    .cancel_task = s_cancel_task,
    .subscribe_to_io_events = s_subscribe_to_io_events,
    .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
    .free_io_event_resources = s_free_io_event_resources,
    .is_on_callers_thread = s_is_on_callers_thread,
};

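/* Implementation data for an epoll-based aws_event_loop; stored in the loop's impl_data
 * and released in s_destroy(). */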
struct epoll_loop {
    struct aws_task_scheduler scheduler;
    struct aws_thread thread_created_on;
    struct aws_thread_options thread_options;
    aws_thread_id_t thread_joined_to;
    struct aws_atomic_var running_thread_id;
    struct aws_io_handle read_task_handle;
    struct aws_io_handle write_task_handle;
    struct aws_mutex task_pre_queue_mutex;
    struct aws_linked_list task_pre_queue;
    struct aws_task stop_task;
    struct aws_atomic_var stop_task_ptr;
    int epoll_fd;
    bool should_process_task_pre_queue;
    bool should_continue;
};

struct epoll_event_data {
    struct aws_allocator *alloc;
    struct aws_io_handle *handle;
    aws_event_loop_on_event_fn *on_event;
    void *user_data;
    struct aws_task cleanup_task;
    bool is_subscribed; /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */
};

/* default timeout is 100 seconds (the value is in milliseconds, the unit epoll_wait expects) */
enum {
    DEFAULT_TIMEOUT = 100 * 1000,
    MAX_EVENTS = 100,
};

int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);

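/* A minimal usage sketch for this loop (hypothetical caller code; `allocator` stands in for any
 * struct aws_allocator *):
 *
 *     struct aws_event_loop_options options = {
 *         .clock = aws_high_res_clock_get_ticks,
 *         .thread_options = NULL,
 *     };
 *     struct aws_event_loop *loop = aws_event_loop_new_default_with_options(allocator, &options);
 *     aws_event_loop_run(loop);
 *     ...schedule tasks / subscribe handles...
 *     aws_event_loop_destroy(loop);
 */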
/* Setup edge triggered epoll with a scheduler. */
struct aws_event_loop *aws_event_loop_new_default_with_options(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {
    AWS_PRECONDITION(options);
    AWS_PRECONDITION(options->clock);

    struct aws_event_loop *loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop));
    if (!loop) {
        return NULL;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered epoll", (void *)loop);
    if (aws_event_loop_init_base(loop, alloc, options->clock)) {
        goto clean_up_loop;
    }

    struct epoll_loop *epoll_loop = aws_mem_calloc(alloc, 1, sizeof(struct epoll_loop));
    if (!epoll_loop) {
        goto cleanup_base_loop;
    }

    if (options->thread_options) {
        epoll_loop->thread_options = *options->thread_options;
    } else {
        epoll_loop->thread_options = *aws_default_thread_options();
    }

    /* initialize thread id to NULL; it will be updated when the event-loop thread starts. */
    aws_atomic_init_ptr(&epoll_loop->running_thread_id, NULL);

    aws_linked_list_init(&epoll_loop->task_pre_queue);
    epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
    aws_atomic_init_ptr(&epoll_loop->stop_task_ptr, NULL);

    epoll_loop->epoll_fd = epoll_create(100);
    if (epoll_loop->epoll_fd < 0) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open epoll handle.", (void *)loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up_epoll;
    }

    if (aws_thread_init(&epoll_loop->thread_created_on, alloc)) {
        goto clean_up_epoll;
    }

#if USE_EFD
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Using eventfd for cross-thread notifications.", (void *)loop);
    int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

    if (fd < 0) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open eventfd handle.", (void *)loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up_thread;
    }

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: eventfd descriptor %d.", (void *)loop, fd);
    epoll_loop->write_task_handle = (struct aws_io_handle){.data.fd = fd, .additional_data = NULL};
    epoll_loop->read_task_handle = (struct aws_io_handle){.data.fd = fd, .additional_data = NULL};
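    /* with eventfd, a single descriptor serves as both the "read" and "write" ends of the
     * cross-thread notification; the pipe fallback below uses two separate descriptors. */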
#else
    AWS_LOGF_DEBUG(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Eventfd not available, falling back to pipe for cross-thread notification.",
        (void *)loop);

    int pipe_fds[2] = {0};
    /* this pipe is for task scheduling. */
    if (aws_open_nonblocking_posix_pipe(pipe_fds)) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)loop);
        goto clean_up_thread;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: pipe descriptors read %d, write %d.", (void *)loop, pipe_fds[0], pipe_fds[1]);
    epoll_loop->write_task_handle.data.fd = pipe_fds[1];
    epoll_loop->read_task_handle.data.fd = pipe_fds[0];
#endif

    if (aws_task_scheduler_init(&epoll_loop->scheduler, alloc)) {
        goto clean_up_pipe;
    }

    epoll_loop->should_continue = false;

    loop->impl_data = epoll_loop;
    loop->vtable = &s_vtable;

    return loop;

clean_up_pipe:
#if USE_EFD
    close(epoll_loop->write_task_handle.data.fd);
    epoll_loop->write_task_handle.data.fd = -1;
    epoll_loop->read_task_handle.data.fd = -1;
#else
    close(epoll_loop->read_task_handle.data.fd);
    close(epoll_loop->write_task_handle.data.fd);
#endif

clean_up_thread:
    aws_thread_clean_up(&epoll_loop->thread_created_on);

clean_up_epoll:
    if (epoll_loop->epoll_fd >= 0) {
        close(epoll_loop->epoll_fd);
    }

    aws_mem_release(alloc, epoll_loop);

cleanup_base_loop:
    aws_event_loop_clean_up_base(loop);

clean_up_loop:
    aws_mem_release(alloc, loop);

    return NULL;
}

static void s_destroy(struct aws_event_loop *event_loop) {
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)event_loop);

    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* we don't know if stop() has been called by someone else,
     * just call stop() again and wait for event-loop to finish. */
    aws_event_loop_stop(event_loop);
    s_wait_for_stop_completion(event_loop);

    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
    epoll_loop->thread_joined_to = aws_thread_current_thread_id();
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_joined_to);
    aws_task_scheduler_clean_up(&epoll_loop->scheduler);

    while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&epoll_loop->task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
    }

    aws_thread_clean_up(&epoll_loop->thread_created_on);
#if USE_EFD
    close(epoll_loop->write_task_handle.data.fd);
    epoll_loop->write_task_handle.data.fd = -1;
    epoll_loop->read_task_handle.data.fd = -1;
#else
    close(epoll_loop->read_task_handle.data.fd);
    close(epoll_loop->write_task_handle.data.fd);
#endif

    close(epoll_loop->epoll_fd);
    aws_mem_release(event_loop->alloc, epoll_loop);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}

static int s_run(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);

    epoll_loop->should_continue = true;
    aws_thread_increment_unjoined_count();
    if (aws_thread_launch(&epoll_loop->thread_created_on, &s_main_loop, event_loop, &epoll_loop->thread_options)) {
        aws_thread_decrement_unjoined_count();
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
        epoll_loop->should_continue = false;
        return AWS_OP_ERR;
    }

    return AWS_OP_SUCCESS;
}

static void s_stop_task(struct aws_task *task, void *args, enum aws_task_status status) {

    (void)task;
    struct aws_event_loop *event_loop = args;
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* now okay to reschedule stop tasks. */
    aws_atomic_store_ptr(&epoll_loop->stop_task_ptr, NULL);
    if (status == AWS_TASK_STATUS_RUN_READY) {
        /*
         * this allows the event loop to invoke the callback once the event loop has completed.
         */
        epoll_loop->should_continue = false;
    }
}

static int s_stop(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    void *expected_ptr = NULL;
    bool update_succeeded =
        aws_atomic_compare_exchange_ptr(&epoll_loop->stop_task_ptr, &expected_ptr, &epoll_loop->stop_task);
    if (!update_succeeded) {
        /* the stop task is already scheduled. */
        return AWS_OP_SUCCESS;
    }
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
    aws_task_init(&epoll_loop->stop_task, s_stop_task, event_loop, "epoll_event_loop_stop");
    s_schedule_task_now(event_loop, &epoll_loop->stop_task);

    return AWS_OP_SUCCESS;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    int result = aws_thread_join(&epoll_loop->thread_created_on);
    aws_thread_decrement_unjoined_count();
    return result;
}

static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* if event loop and the caller are the same thread, just schedule and be done with it. */
    if (s_is_on_callers_thread(event_loop)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            /* zero denotes "now" task */
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
        }
        return;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    task->timestamp = run_at_nanos;
    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);

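    /* an eventfd write must be exactly 8 bytes; the value itself is never inspected, the write
     * only serves to wake the epoll_wait() call on the event-loop thread. */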
    uint64_t counter = 1;

    bool is_first_task = aws_linked_list_empty(&epoll_loop->task_pre_queue);

    aws_linked_list_push_back(&epoll_loop->task_pre_queue, &task->node);

    /* if the list was not empty, we already have a pending read on the pipe/eventfd, no need to write again. */
    if (is_first_task) {
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Waking up event-loop thread", (void *)event_loop);

        /* If the write fails because the buffer is full, we don't actually care because that means there's a pending
         * read on the pipe/eventfd and thus the event loop will end up checking to see if something has been queued.*/
        ssize_t do_not_care = write(epoll_loop->write_task_handle.data.fd, (void *)&counter, sizeof(counter));
        (void)do_not_care;
    }

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */);
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    s_schedule_task_common(event_loop, task, run_at_nanos);
}

static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    aws_task_scheduler_cancel_task(&epoll_loop->scheduler, task);
}

static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle->data.fd);
    struct epoll_event_data *epoll_event_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct epoll_event_data));
    handle->additional_data = epoll_event_data;
    if (!epoll_event_data) {
        return AWS_OP_ERR;
    }

    struct epoll_loop *epoll_loop = event_loop->impl_data;
    epoll_event_data->alloc = event_loop->alloc;
    epoll_event_data->user_data = user_data;
    epoll_event_data->handle = handle;
    epoll_event_data->on_event = on_event;
    epoll_event_data->is_subscribed = true;

    /* everyone is always registered for edge-triggered, hang up, remote hang up, errors. */
    uint32_t event_mask = EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLERR;
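    /* note: with EPOLLET, readiness is reported only on transitions, so the on_event handler is
     * expected to read/write until EAGAIN before returning to epoll_wait. */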

    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        event_mask |= EPOLLIN;
    }

    if (events & AWS_IO_EVENT_TYPE_WRITABLE) {
        event_mask |= EPOLLOUT;
    }

    /* this guy is copied by epoll_ctl */
    struct epoll_event epoll_event = {
        .data = {.ptr = epoll_event_data},
        .events = event_mask,
    };

    if (epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_ADD, handle->data.fd, &epoll_event)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_EVENT_LOOP, "id=%p: failed to subscribe to events on fd %d", (void *)event_loop, handle->data.fd);
        handle->additional_data = NULL;
        aws_mem_release(event_loop->alloc, epoll_event_data);
        return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
    }

    return AWS_OP_SUCCESS;
}

static void s_free_io_event_resources(void *user_data) {
    struct epoll_event_data *event_data = user_data;
    aws_mem_release(event_data->alloc, (void *)event_data);
}

static void s_unsubscribe_cleanup_task(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)task;
    (void)status;
    struct epoll_event_data *event_data = (struct epoll_event_data *)arg;
    s_free_io_event_resources(event_data);
}

static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    AWS_ASSERT(handle->additional_data);
    struct epoll_event_data *additional_handle_data = handle->additional_data;

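    /* the event argument is ignored for EPOLL_CTL_DEL, but kernels prior to 2.6.9 require a
     * non-NULL pointer, hence the dummy. */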
    struct epoll_event dummy_event;

    if (AWS_UNLIKELY(epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_DEL, handle->data.fd, &dummy_event /*ignored*/))) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: failed to un-subscribe from events on fd %d",
            (void *)event_loop,
            handle->data.fd);
        return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
    }

    /* We can't clean up yet, because we have scheduled tasks and more events to process,
     * mark it as unsubscribed and schedule a cleanup task. */
    additional_handle_data->is_subscribed = false;

    aws_task_init(
        &additional_handle_data->cleanup_task,
        s_unsubscribe_cleanup_task,
        additional_handle_data,
        "epoll_event_loop_unsubscribe_cleanup");
    s_schedule_task_now(event_loop, &additional_handle_data->cleanup_task);

    handle->additional_data = NULL;
    return AWS_OP_SUCCESS;
}

static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    aws_thread_id_t *thread_id = aws_atomic_load_ptr(&epoll_loop->running_thread_id);
    return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* We treat the pipe fd with a subscription to io events just like any other managed file descriptor.
 * This is the event handler for events on that pipe.*/
static void s_on_tasks_to_schedule(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)handle;
    (void)user_data;

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread tasks to schedule", (void *)event_loop);
    struct epoll_loop *epoll_loop = event_loop->impl_data;
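    /* only flag the work here; the pre-queue itself is drained by s_process_task_pre_queue(),
     * which the main loop runs after handling I/O events for this tick. */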
    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        epoll_loop->should_process_task_pre_queue = true;
    }
}

static void s_process_task_pre_queue(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    if (!epoll_loop->should_process_task_pre_queue) {
        return;
    }

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);
    epoll_loop->should_process_task_pre_queue = false;

    struct aws_linked_list task_pre_queue;
    aws_linked_list_init(&task_pre_queue);

    uint64_t count_ignore = 0;

    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);

    /* several tasks could theoretically have been written (though this should never happen), make sure we drain the
     * eventfd/pipe. */
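    /* both the eventfd and the pipe are opened non-blocking, so this loop terminates with EAGAIN
     * once the descriptor is drained. */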
    while (read(epoll_loop->read_task_handle.data.fd, &count_ignore, sizeof(count_ignore)) > -1) {
    }

    aws_linked_list_swap_contents(&epoll_loop->task_pre_queue, &task_pre_queue);

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);

    while (!aws_linked_list_empty(&task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: task %p pulled to event-loop, scheduling now.",
            (void *)event_loop,
            (void *)task);
        /* Timestamp 0 is used to denote "now" tasks */
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, task->timestamp);
        }
    }
}

static void s_main_loop(void *args) {
    struct aws_event_loop *event_loop = args;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* set thread id to the thread of the event loop */
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_created_on.thread_id);

    int err = s_subscribe_to_io_events(
        event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
    if (err) {
        return;
    }

    int timeout = DEFAULT_TIMEOUT;

    struct epoll_event events[MAX_EVENTS];

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %d, and max events to process per tick %d",
        (void *)event_loop,
        timeout,
        MAX_EVENTS);

    /*
     * until stop is called,
     * call epoll_wait, if a task is scheduled, or a file descriptor has activity, it will
     * return.
     *
     * process all events,
     *
     * run all scheduled tasks.
     *
     * process queued subscription cleanups.
     */
    while (epoll_loop->should_continue) {

        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: waiting for a maximum of %d ms", (void *)event_loop, timeout);
        int event_count = epoll_wait(epoll_loop->epoll_fd, events, MAX_EVENTS, timeout);
        aws_event_loop_register_tick_start(event_loop);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, event_count);
        for (int i = 0; i < event_count; ++i) {
            struct epoll_event_data *event_data = (struct epoll_event_data *)events[i].data.ptr;

            int event_mask = 0;
            if (events[i].events & EPOLLIN) {
                event_mask |= AWS_IO_EVENT_TYPE_READABLE;
            }

            if (events[i].events & EPOLLOUT) {
                event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
            }

            if (events[i].events & EPOLLRDHUP) {
                event_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
            }

            if (events[i].events & EPOLLHUP) {
                event_mask |= AWS_IO_EVENT_TYPE_CLOSED;
            }

            if (events[i].events & EPOLLERR) {
                event_mask |= AWS_IO_EVENT_TYPE_ERROR;
            }

            if (event_data->is_subscribed) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    event_data->handle->data.fd);
                event_data->on_event(event_loop, event_data->handle, event_mask, event_data->user_data);
            }
        }

        /* run scheduled tasks */
        s_process_task_pre_queue(event_loop);

        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* if clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&epoll_loop->scheduler, now_ns);

        /* set timeout for next epoll_wait() call.
         * if clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        if (event_loop->clock(&now_ns)) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&epoll_loop->scheduler, &next_run_time_ns)) {
            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout = DEFAULT_TIMEOUT;
        } else {
            /* Translate timestamp (in nanoseconds) to timeout (in milliseconds) */
            uint64_t timeout_ns = (next_run_time_ns > now_ns) ? (next_run_time_ns - now_ns) : 0;
            uint64_t timeout_ms64 = aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
            timeout = timeout_ms64 > INT_MAX ? INT_MAX : (int)timeout_ms64;
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu, using timeout of %d.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                timeout);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
    s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
    /* set thread id back to NULL. This should be updated again in destroy, before tasks are canceled. */
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, NULL);
}