/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/thread.h>

#include <aws/io/logging.h>

#include <sys/epoll.h>

#include <errno.h>
#include <limits.h>
#include <unistd.h>

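/* eventfd(2) requires kernel 2.6.22+ and the glibc wrapper that shipped in glibc 2.8;
 * on older systems (or when COMPAT_MODE is forced) we fall back to a pipe for
 * cross-thread wake-ups. */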
#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 8
#    define USE_EFD 1
#else
#    define USE_EFD 0
#endif

#if USE_EFD
#    include <aws/io/io.h>
#    include <sys/eventfd.h>

#else
#    include <aws/io/pipe.h>
#endif

/* EPOLLRDHUP isn't defined on ancient linux distros (breaking the builds).
 * However, if this is a prebuild, we purposely build on an ancient system, but
 * we want the kernel calls to still be the same as a modern build, since that's likely the target of the application
 * calling this code. Just define the flag if it isn't there already. Glibc and the kernel don't really care how the
 * flag gets defined, as long as it gets passed through.
 */
#ifndef EPOLLRDHUP
#    define EPOLLRDHUP 0x2000
#endif

static void s_destroy(struct aws_event_loop *event_loop);
static int s_run(struct aws_event_loop *event_loop);
static int s_stop(struct aws_event_loop *event_loop);
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static void s_main_loop(void *args);

static struct aws_event_loop_vtable s_vtable = {
    .destroy = s_destroy,
    .run = s_run,
    .stop = s_stop,
    .wait_for_stop_completion = s_wait_for_stop_completion,
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
    .cancel_task = s_cancel_task,
    .subscribe_to_io_events = s_subscribe_to_io_events,
    .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
    .free_io_event_resources = s_free_io_event_resources,
    .is_on_callers_thread = s_is_on_callers_thread,
};

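/* Implementation data for the epoll-based event loop. The eventfd/pipe pair
 * (read_task_handle/write_task_handle) wakes the loop thread when another
 * thread queues work onto task_pre_queue. */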
struct epoll_loop {
    struct aws_task_scheduler scheduler;
    struct aws_thread thread_created_on;
    struct aws_thread_options thread_options;
    aws_thread_id_t thread_joined_to;
    struct aws_atomic_var running_thread_id;
    struct aws_io_handle read_task_handle;
    struct aws_io_handle write_task_handle;
    struct aws_mutex task_pre_queue_mutex;
    struct aws_linked_list task_pre_queue;
    struct aws_task stop_task;
    struct aws_atomic_var stop_task_ptr;
    int epoll_fd;
    bool should_process_task_pre_queue;
    bool should_continue;
};

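/* Per-subscription state; a pointer to this struct rides along in epoll_event.data.ptr
 * and comes back to us from epoll_wait(). */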
struct epoll_event_data {
    struct aws_allocator *alloc;
    struct aws_io_handle *handle;
    aws_event_loop_on_event_fn *on_event;
    void *user_data;
    struct aws_task cleanup_task;
    bool is_subscribed; /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */
};

/* default timeout is 100 seconds (DEFAULT_TIMEOUT is in milliseconds, as consumed by epoll_wait) */
enum {
    DEFAULT_TIMEOUT = 100 * 1000,
    MAX_EVENTS = 100,
};

int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);

/* Setup edge triggered epoll with a scheduler. */
struct aws_event_loop *aws_event_loop_new_default_with_options(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {
    AWS_PRECONDITION(options);
    AWS_PRECONDITION(options->clock);

    struct aws_event_loop *loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop));
    if (!loop) {
        return NULL;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered epoll", (void *)loop);
    if (aws_event_loop_init_base(loop, alloc, options->clock)) {
        goto clean_up_loop;
    }

    struct epoll_loop *epoll_loop = aws_mem_calloc(alloc, 1, sizeof(struct epoll_loop));
    if (!epoll_loop) {
        goto clean_up_base_loop;
    }

    if (options->thread_options) {
        epoll_loop->thread_options = *options->thread_options;
    } else {
        epoll_loop->thread_options = *aws_default_thread_options();
    }

    /* initialize the thread id to NULL; it is set when the event-loop thread starts. */
    aws_atomic_init_ptr(&epoll_loop->running_thread_id, NULL);

    aws_linked_list_init(&epoll_loop->task_pre_queue);
    epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
    aws_atomic_init_ptr(&epoll_loop->stop_task_ptr, NULL);

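    /* the size argument to epoll_create() has been ignored since kernel 2.6.8;
     * it just has to be positive. */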
    epoll_loop->epoll_fd = epoll_create(100);
    if (epoll_loop->epoll_fd < 0) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open epoll handle.", (void *)loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up_epoll;
    }

    if (aws_thread_init(&epoll_loop->thread_created_on, alloc)) {
        goto clean_up_epoll;
    }

#if USE_EFD
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Using eventfd for cross-thread notifications.", (void *)loop);
    int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

    if (fd < 0) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open eventfd handle.", (void *)loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up_thread;
    }

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: eventfd descriptor %d.", (void *)loop, fd);
    epoll_loop->write_task_handle = (struct aws_io_handle){.data.fd = fd, .additional_data = NULL};
    epoll_loop->read_task_handle = (struct aws_io_handle){.data.fd = fd, .additional_data = NULL};
#else
    AWS_LOGF_DEBUG(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Eventfd not available, falling back to pipe for cross-thread notification.",
        (void *)loop);

    int pipe_fds[2] = {0};
    /* this pipe is for task scheduling. */
    if (aws_open_nonblocking_posix_pipe(pipe_fds)) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)loop);
        goto clean_up_thread;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: pipe descriptors read %d, write %d.", (void *)loop, pipe_fds[0], pipe_fds[1]);
    epoll_loop->write_task_handle.data.fd = pipe_fds[1];
    epoll_loop->read_task_handle.data.fd = pipe_fds[0];
#endif

    if (aws_task_scheduler_init(&epoll_loop->scheduler, alloc)) {
        goto clean_up_pipe;
    }

    epoll_loop->should_continue = false;

    loop->impl_data = epoll_loop;
    loop->vtable = &s_vtable;

    return loop;

clean_up_pipe:
#if USE_EFD
    close(epoll_loop->write_task_handle.data.fd);
    epoll_loop->write_task_handle.data.fd = -1;
    epoll_loop->read_task_handle.data.fd = -1;
#else
    close(epoll_loop->read_task_handle.data.fd);
    close(epoll_loop->write_task_handle.data.fd);
#endif

clean_up_thread:
    aws_thread_clean_up(&epoll_loop->thread_created_on);

clean_up_epoll:
    if (epoll_loop->epoll_fd >= 0) {
        close(epoll_loop->epoll_fd);
    }

    aws_mem_release(alloc, epoll_loop);

clean_up_base_loop:
    aws_event_loop_clean_up_base(loop);

clean_up_loop:
    aws_mem_release(alloc, loop);

    return NULL;
}
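
/* Example usage (a sketch; names per the aws-c-io/aws-c-common public headers):
 *
 *   struct aws_event_loop_options options = {.clock = aws_high_res_clock_get_ticks};
 *   struct aws_event_loop *loop = aws_event_loop_new_default_with_options(alloc, &options);
 *   aws_event_loop_run(loop);
 *   ...
 *   aws_event_loop_destroy(loop); // stops the loop and joins the thread via s_destroy
 */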

static void s_destroy(struct aws_event_loop *event_loop) {
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)event_loop);

    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* we don't know if stop() has been called by someone else,
     * just call stop() again and wait for event-loop to finish. */
    aws_event_loop_stop(event_loop);
    s_wait_for_stop_completion(event_loop);

    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
    epoll_loop->thread_joined_to = aws_thread_current_thread_id();
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_joined_to);
    aws_task_scheduler_clean_up(&epoll_loop->scheduler);

    while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&epoll_loop->task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
    }

    aws_thread_clean_up(&epoll_loop->thread_created_on);
#if USE_EFD
    close(epoll_loop->write_task_handle.data.fd);
    epoll_loop->write_task_handle.data.fd = -1;
    epoll_loop->read_task_handle.data.fd = -1;
#else
    close(epoll_loop->read_task_handle.data.fd);
    close(epoll_loop->write_task_handle.data.fd);
#endif

    close(epoll_loop->epoll_fd);
    aws_mem_release(event_loop->alloc, epoll_loop);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}

static int s_run(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);

    epoll_loop->should_continue = true;
    aws_thread_increment_unjoined_count();
    if (aws_thread_launch(&epoll_loop->thread_created_on, &s_main_loop, event_loop, &epoll_loop->thread_options)) {
        aws_thread_decrement_unjoined_count();
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
        epoll_loop->should_continue = false;
        return AWS_OP_ERR;
    }

    return AWS_OP_SUCCESS;
}

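/* Stopping is enacted via a task so that should_continue is flipped on the
 * event-loop thread itself; s_main_loop re-checks the flag once per tick. */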
static void s_stop_task(struct aws_task *task, void *args, enum aws_task_status status) {

    (void)task;
    struct aws_event_loop *event_loop = args;
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* now okay to reschedule stop tasks. */
    aws_atomic_store_ptr(&epoll_loop->stop_task_ptr, NULL);
    if (status == AWS_TASK_STATUS_RUN_READY) {
        /*
         * this tells s_main_loop to exit once the current tick completes.
         */
        epoll_loop->should_continue = false;
    }
}

static int s_stop(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    void *expected_ptr = NULL;
    bool update_succeeded =
        aws_atomic_compare_exchange_ptr(&epoll_loop->stop_task_ptr, &expected_ptr, &epoll_loop->stop_task);
    if (!update_succeeded) {
        /* the stop task is already scheduled. */
        return AWS_OP_SUCCESS;
    }
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
    aws_task_init(&epoll_loop->stop_task, s_stop_task, event_loop, "epoll_event_loop_stop");
    s_schedule_task_now(event_loop, &epoll_loop->stop_task);

    return AWS_OP_SUCCESS;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    int result = aws_thread_join(&epoll_loop->thread_created_on);
    aws_thread_decrement_unjoined_count();
    return result;
}

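/* Two scheduling paths: when called on the loop thread, push straight into the
 * scheduler; otherwise append to the pre-queue under the mutex and wake the
 * loop thread through the eventfd/pipe. */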
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* if event loop and the caller are the same thread, just schedule and be done with it. */
    if (s_is_on_callers_thread(event_loop)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            /* zero denotes "now" task */
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
        }
        return;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    task->timestamp = run_at_nanos;
    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);

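    /* eventfd writes must be exactly 8 bytes; writing a nonzero value increments
     * the counter and makes the fd readable (for the pipe fallback the same bytes
     * serve as a plain wake-up signal). */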
    uint64_t counter = 1;

    bool is_first_task = aws_linked_list_empty(&epoll_loop->task_pre_queue);

    aws_linked_list_push_back(&epoll_loop->task_pre_queue, &task->node);

    /* if the list was not empty, we already have a pending read on the pipe/eventfd, no need to write again. */
    if (is_first_task) {
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Waking up event-loop thread", (void *)event_loop);

        /* If the write fails because the buffer is full, we don't actually care because that means there's a pending
         * read on the pipe/eventfd and thus the event loop will end up checking to see if something has been queued.*/
        ssize_t do_not_care = write(epoll_loop->write_task_handle.data.fd, (void *)&counter, sizeof(counter));
        (void)do_not_care;
    }

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */);
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    s_schedule_task_common(event_loop, task, run_at_nanos);
}

static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    aws_task_scheduler_cancel_task(&epoll_loop->scheduler, task);
}

static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle->data.fd);
    struct epoll_event_data *epoll_event_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct epoll_event_data));
    handle->additional_data = epoll_event_data;
    if (!epoll_event_data) {
        return AWS_OP_ERR;
    }

    struct epoll_loop *epoll_loop = event_loop->impl_data;
    epoll_event_data->alloc = event_loop->alloc;
    epoll_event_data->user_data = user_data;
    epoll_event_data->handle = handle;
    epoll_event_data->on_event = on_event;
    epoll_event_data->is_subscribed = true;

    /* everyone is always registered for edge-triggered, hang up, remote hang up, and errors. */
    uint32_t event_mask = EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLERR;

    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        event_mask |= EPOLLIN;
    }

    if (events & AWS_IO_EVENT_TYPE_WRITABLE) {
        event_mask |= EPOLLOUT;
    }

    /* this struct is copied by epoll_ctl, so it can live on the stack. */
    struct epoll_event epoll_event = {
        .data = {.ptr = epoll_event_data},
        .events = event_mask,
    };

    if (epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_ADD, handle->data.fd, &epoll_event)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_EVENT_LOOP, "id=%p: failed to subscribe to events on fd %d", (void *)event_loop, handle->data.fd);
        handle->additional_data = NULL;
        aws_mem_release(event_loop->alloc, epoll_event_data);
        return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
    }

    return AWS_OP_SUCCESS;
}

static void s_free_io_event_resources(void *user_data) {
    struct epoll_event_data *event_data = user_data;
    aws_mem_release(event_data->alloc, (void *)event_data);
}

static void s_unsubscribe_cleanup_task(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)task;
    (void)status;
    struct epoll_event_data *event_data = (struct epoll_event_data *)arg;
    s_free_io_event_resources(event_data);
}

static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    AWS_ASSERT(handle->additional_data);
    struct epoll_event_data *additional_handle_data = handle->additional_data;

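    /* Linux kernels before 2.6.9 require a non-NULL event pointer for
     * EPOLL_CTL_DEL even though its contents are ignored, hence the dummy. */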
    struct epoll_event dummy_event;

    if (AWS_UNLIKELY(epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_DEL, handle->data.fd, &dummy_event /*ignored*/))) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: failed to un-subscribe from events on fd %d",
            (void *)event_loop,
            handle->data.fd);
        return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
    }

    /* We can't clean up yet, because there may be scheduled tasks and more events to process that still
     * reference this data; mark it as unsubscribed and schedule a cleanup task. */
    additional_handle_data->is_subscribed = false;

    aws_task_init(
        &additional_handle_data->cleanup_task,
        s_unsubscribe_cleanup_task,
        additional_handle_data,
        "epoll_event_loop_unsubscribe_cleanup");
    s_schedule_task_now(event_loop, &additional_handle_data->cleanup_task);

    handle->additional_data = NULL;
    return AWS_OP_SUCCESS;
}

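/* running_thread_id is NULL whenever the loop thread isn't running, so this
 * correctly returns false before run() and after the thread exits. */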
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    aws_thread_id_t *thread_id = aws_atomic_load_ptr(&epoll_loop->running_thread_id);
    return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* The pipe/eventfd read handle is subscribed to io events just like any other managed file descriptor.
 * This is the event handler for events on that handle. */
static void s_on_tasks_to_schedule(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)handle;
    (void)user_data;

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread tasks to schedule", (void *)event_loop);
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        epoll_loop->should_process_task_pre_queue = true;
    }
}

static void s_process_task_pre_queue(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    if (!epoll_loop->should_process_task_pre_queue) {
        return;
    }

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);
    epoll_loop->should_process_task_pre_queue = false;

    struct aws_linked_list task_pre_queue;
    aws_linked_list_init(&task_pre_queue);

    uint64_t count_ignore = 0;

    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);

    /* several tasks could theoretically have been written (though this should never happen), make sure we drain the
     * eventfd/pipe. Both handles are non-blocking, so read() returns -1 (EAGAIN) once the fd is empty. */
    while (read(epoll_loop->read_task_handle.data.fd, &count_ignore, sizeof(count_ignore)) > -1) {
    }

    aws_linked_list_swap_contents(&epoll_loop->task_pre_queue, &task_pre_queue);

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);

    while (!aws_linked_list_empty(&task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: task %p pulled to event-loop, scheduling now.",
            (void *)event_loop,
            (void *)task);
        /* Timestamp 0 is used to denote "now" tasks */
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, task->timestamp);
        }
    }
}

static void s_main_loop(void *args) {
    struct aws_event_loop *event_loop = args;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* set thread id to the thread of the event loop */
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_created_on.thread_id);

    int err = s_subscribe_to_io_events(
        event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
    if (err) {
        return;
    }

    int timeout = DEFAULT_TIMEOUT;

    struct epoll_event events[MAX_EVENTS];

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %d, and max events to process per tick %d",
        (void *)event_loop,
        timeout,
        MAX_EVENTS);

    /*
     * until stop is called:
     *
     * call epoll_wait; it returns when a task is scheduled or a file descriptor has activity,
     *
     * process all events,
     *
     * run all scheduled tasks,
     *
     * process queued subscription cleanups.
     */
    while (epoll_loop->should_continue) {

        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: waiting for a maximum of %d ms", (void *)event_loop, timeout);
        int event_count = epoll_wait(epoll_loop->epoll_fd, events, MAX_EVENTS, timeout);
        aws_event_loop_register_tick_start(event_loop);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, event_count);
        for (int i = 0; i < event_count; ++i) {
            struct epoll_event_data *event_data = (struct epoll_event_data *)events[i].data.ptr;

            int event_mask = 0;
            if (events[i].events & EPOLLIN) {
                event_mask |= AWS_IO_EVENT_TYPE_READABLE;
            }

            if (events[i].events & EPOLLOUT) {
                event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
            }

            if (events[i].events & EPOLLRDHUP) {
                event_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
            }

            if (events[i].events & EPOLLHUP) {
                event_mask |= AWS_IO_EVENT_TYPE_CLOSED;
            }

            if (events[i].events & EPOLLERR) {
                event_mask |= AWS_IO_EVENT_TYPE_ERROR;
            }

            if (event_data->is_subscribed) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    event_data->handle->data.fd);
                event_data->on_event(event_loop, event_data->handle, event_mask, event_data->user_data);
            }
        }

        /* run scheduled tasks */
        s_process_task_pre_queue(event_loop);

        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* if clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&epoll_loop->scheduler, now_ns);

        /* set timeout for next epoll_wait() call.
         * if clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        if (event_loop->clock(&now_ns)) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&epoll_loop->scheduler, &next_run_time_ns)) {
            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout = DEFAULT_TIMEOUT;
        } else {
            /* Translate timestamp (in nanoseconds) to timeout (in milliseconds) */
            uint64_t timeout_ns = (next_run_time_ns > now_ns) ? (next_run_time_ns - now_ns) : 0;
            uint64_t timeout_ms64 = aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
            timeout = timeout_ms64 > INT_MAX ? INT_MAX : (int)timeout_ms64;
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu, using timeout of %d.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                timeout);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
    s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
    /* set the thread id back to NULL. It is set again in destroy, before tasks are canceled. */
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, NULL);
}