1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/io/event_loop.h>
7
8 #include <aws/io/logging.h>
9
10 #include <aws/common/atomics.h>
11 #include <aws/common/clock.h>
12 #include <aws/common/mutex.h>
13 #include <aws/common/task_scheduler.h>
14 #include <aws/common/thread.h>
15
16 #if defined(__FreeBSD__) || defined(__NetBSD__)
17 # define __BSD_VISIBLE 1
18 # include <sys/types.h>
19 #endif
20
21 #include <sys/event.h>
22
23 #include <aws/io/io.h>
24 #include <limits.h>
25 #include <unistd.h>
26
27 static void s_destroy(struct aws_event_loop *event_loop);
28 static int s_run(struct aws_event_loop *event_loop);
29 static int s_stop(struct aws_event_loop *event_loop);
30 static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
31 static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
32 static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
33 static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
34 static int s_subscribe_to_io_events(
35 struct aws_event_loop *event_loop,
36 struct aws_io_handle *handle,
37 int events,
38 aws_event_loop_on_event_fn *on_event,
39 void *user_data);
40 static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
41 static void s_free_io_event_resources(void *user_data);
42 static bool s_is_event_thread(struct aws_event_loop *event_loop);
43
44 static void s_event_thread_main(void *user_data);
45
46 int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);
47
48 enum event_thread_state {
49 EVENT_THREAD_STATE_READY_TO_RUN,
50 EVENT_THREAD_STATE_RUNNING,
51 EVENT_THREAD_STATE_STOPPING,
52 };
53
54 enum pipe_fd_index {
55 READ_FD,
56 WRITE_FD,
57 };
58
59 struct kqueue_loop {
60 /* thread_created_on is the handle to the event loop thread. */
61 struct aws_thread thread_created_on;
62 /* thread_joined_to is used by the thread destroying the event loop. */
63 aws_thread_id_t thread_joined_to;
64 /* running_thread_id is NULL if the event loop thread is stopped or points-to the thread_id of the thread running
65 * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
66 * run/stop) and reads (e.g., is_event_loop_thread).
67 * An aws_thread_id_t variable itself cannot be atomic because it is an opaque type that is platform-dependent. */
68 struct aws_atomic_var running_thread_id;
69 int kq_fd; /* kqueue file descriptor */
70
71 /* Pipe for signaling to event-thread that cross_thread_data has changed. */
72 int cross_thread_signal_pipe[2];
73
74 /* cross_thread_data holds things that must be communicated across threads.
75 * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data.
76 * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */
77 struct {
78 struct aws_mutex mutex;
79 bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */
80 struct aws_linked_list tasks_to_schedule;
81 enum event_thread_state state;
82 } cross_thread_data;
83
84 /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */
85 struct {
86 struct aws_task_scheduler scheduler;
87
88 int connected_handle_count;
89
90 /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate
91 * on them later */
92 enum event_thread_state state;
93 } thread_data;
94
95 struct aws_thread_options thread_options;
96 };
97
98 /* Data attached to aws_io_handle while the handle is subscribed to io events */
99 struct handle_data {
100 struct aws_io_handle *owner;
101 struct aws_event_loop *event_loop;
102 aws_event_loop_on_event_fn *on_event;
103 void *on_event_user_data;
104
105 int events_subscribed; /* aws_io_event_types this handle should be subscribed to */
106 int events_this_loop; /* aws_io_event_types received during current loop of the event-thread */
107
108 enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state;
109
110 struct aws_task subscribe_task;
111 struct aws_task cleanup_task;
112 };
113
114 enum {
115 DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */
116 MAX_EVENTS = 100, /* Max kevents to process per loop of the event-thread */
117 };
118
119 struct aws_event_loop_vtable s_kqueue_vtable = {
120 .destroy = s_destroy,
121 .run = s_run,
122 .stop = s_stop,
123 .wait_for_stop_completion = s_wait_for_stop_completion,
124 .schedule_task_now = s_schedule_task_now,
125 .schedule_task_future = s_schedule_task_future,
126 .subscribe_to_io_events = s_subscribe_to_io_events,
127 .cancel_task = s_cancel_task,
128 .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
129 .free_io_event_resources = s_free_io_event_resources,
130 .is_on_callers_thread = s_is_event_thread,
131 };
132
aws_event_loop_new_default_with_options(struct aws_allocator * alloc,const struct aws_event_loop_options * options)133 struct aws_event_loop *aws_event_loop_new_default_with_options(
134 struct aws_allocator *alloc,
135 const struct aws_event_loop_options *options) {
136 AWS_ASSERT(alloc);
137 AWS_ASSERT(clock);
138 AWS_ASSERT(options);
139 AWS_ASSERT(options->clock);
140
141 bool clean_up_event_loop_mem = false;
142 bool clean_up_event_loop_base = false;
143 bool clean_up_impl_mem = false;
144 bool clean_up_thread = false;
145 bool clean_up_kqueue = false;
146 bool clean_up_signal_pipe = false;
147 bool clean_up_signal_kevent = false;
148 bool clean_up_mutex = false;
149
150 struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop));
151 if (!event_loop) {
152 return NULL;
153 }
154
155 AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop);
156 clean_up_event_loop_mem = true;
157
158 int err = aws_event_loop_init_base(event_loop, alloc, options->clock);
159 if (err) {
160 goto clean_up;
161 }
162 clean_up_event_loop_base = true;
163
164 struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
165 if (!impl) {
166 goto clean_up;
167 }
168
169 if (options->thread_options) {
170 impl->thread_options = *options->thread_options;
171 } else {
172 impl->thread_options = *aws_default_thread_options();
173 }
174
175 /* intialize thread id to NULL. It will be set when the event loop thread starts. */
176 aws_atomic_init_ptr(&impl->running_thread_id, NULL);
177 clean_up_impl_mem = true;
178
179 err = aws_thread_init(&impl->thread_created_on, alloc);
180 if (err) {
181 goto clean_up;
182 }
183 clean_up_thread = true;
184
185 impl->kq_fd = kqueue();
186 if (impl->kq_fd == -1) {
187 AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop);
188 aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
189 goto clean_up;
190 }
191 clean_up_kqueue = true;
192
193 err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe);
194 if (err) {
195 AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop);
196 goto clean_up;
197 }
198 AWS_LOGF_TRACE(
199 AWS_LS_IO_EVENT_LOOP,
200 "id=%p: pipe descriptors read %d, write %d.",
201 (void *)event_loop,
202 impl->cross_thread_signal_pipe[READ_FD],
203 impl->cross_thread_signal_pipe[WRITE_FD]);
204 clean_up_signal_pipe = true;
205
206 /* Set up kevent to handle activity on the cross_thread_signal_pipe */
207 struct kevent thread_signal_kevent;
208 EV_SET(
209 &thread_signal_kevent,
210 impl->cross_thread_signal_pipe[READ_FD],
211 EVFILT_READ /*filter*/,
212 EV_ADD | EV_CLEAR /*flags*/,
213 0 /*fflags*/,
214 0 /*data*/,
215 NULL /*udata*/);
216
217 int res = kevent(
218 impl->kq_fd,
219 &thread_signal_kevent /*changelist*/,
220 1 /*nchanges*/,
221 NULL /*eventlist*/,
222 0 /*nevents*/,
223 NULL /*timeout*/);
224
225 if (res == -1) {
226 AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop);
227 aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
228 goto clean_up;
229 }
230 clean_up_signal_kevent = true;
231
232 err = aws_mutex_init(&impl->cross_thread_data.mutex);
233 if (err) {
234 goto clean_up;
235 }
236 clean_up_mutex = true;
237
238 impl->cross_thread_data.thread_signaled = false;
239
240 aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule);
241
242 impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
243
244 err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc);
245 if (err) {
246 goto clean_up;
247 }
248
249 impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
250
251 event_loop->impl_data = impl;
252
253 event_loop->vtable = &s_kqueue_vtable;
254
255 /* success */
256 return event_loop;
257
258 clean_up:
259 if (clean_up_mutex) {
260 aws_mutex_clean_up(&impl->cross_thread_data.mutex);
261 }
262 if (clean_up_signal_kevent) {
263 thread_signal_kevent.flags = EV_DELETE;
264 kevent(
265 impl->kq_fd,
266 &thread_signal_kevent /*changelist*/,
267 1 /*nchanges*/,
268 NULL /*eventlist*/,
269 0 /*nevents*/,
270 NULL /*timeout*/);
271 }
272 if (clean_up_signal_pipe) {
273 close(impl->cross_thread_signal_pipe[READ_FD]);
274 close(impl->cross_thread_signal_pipe[WRITE_FD]);
275 }
276 if (clean_up_kqueue) {
277 close(impl->kq_fd);
278 }
279 if (clean_up_thread) {
280 aws_thread_clean_up(&impl->thread_created_on);
281 }
282 if (clean_up_impl_mem) {
283 aws_mem_release(alloc, impl);
284 }
285 if (clean_up_event_loop_base) {
286 aws_event_loop_clean_up_base(event_loop);
287 }
288 if (clean_up_event_loop_mem) {
289 aws_mem_release(alloc, event_loop);
290 }
291 return NULL;
292 }
293
s_destroy(struct aws_event_loop * event_loop)294 static void s_destroy(struct aws_event_loop *event_loop) {
295 AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop);
296 struct kqueue_loop *impl = event_loop->impl_data;
297
298 /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */
299 s_stop(event_loop);
300 int err = s_wait_for_stop_completion(event_loop);
301 if (err) {
302 AWS_LOGF_WARN(
303 AWS_LS_IO_EVENT_LOOP,
304 "id=%p: failed to destroy event-thread, resources have been leaked",
305 (void *)event_loop);
306 AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL);
307 return;
308 }
309 /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
310 impl->thread_joined_to = aws_thread_current_thread_id();
311 aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);
312
313 /* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop.
314 * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
315
316 aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled*/
317
318 while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) {
319 struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule);
320 struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
321 task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
322 }
323
324 /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */
325 AWS_ASSERT(impl->thread_data.connected_handle_count == 0);
326
327 /* Clean up everything else */
328 aws_mutex_clean_up(&impl->cross_thread_data.mutex);
329
330 struct kevent thread_signal_kevent;
331 EV_SET(
332 &thread_signal_kevent,
333 impl->cross_thread_signal_pipe[READ_FD],
334 EVFILT_READ /*filter*/,
335 EV_DELETE /*flags*/,
336 0 /*fflags*/,
337 0 /*data*/,
338 NULL /*udata*/);
339
340 kevent(
341 impl->kq_fd,
342 &thread_signal_kevent /*changelist*/,
343 1 /*nchanges*/,
344 NULL /*eventlist*/,
345 0 /*nevents*/,
346 NULL /*timeout*/);
347
348 close(impl->cross_thread_signal_pipe[READ_FD]);
349 close(impl->cross_thread_signal_pipe[WRITE_FD]);
350 close(impl->kq_fd);
351 aws_thread_clean_up(&impl->thread_created_on);
352 aws_mem_release(event_loop->alloc, impl);
353 aws_event_loop_clean_up_base(event_loop);
354 aws_mem_release(event_loop->alloc, event_loop);
355 }
356
s_run(struct aws_event_loop * event_loop)357 static int s_run(struct aws_event_loop *event_loop) {
358 struct kqueue_loop *impl = event_loop->impl_data;
359
360 AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop);
361 /* to re-run, call stop() and wait_for_stop_completion() */
362 AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
363 AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
364
365 /* Since thread isn't running it's ok to touch thread_data,
366 * and it's ok to touch cross_thread_data without locking the mutex */
367 impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;
368
369 aws_thread_increment_unjoined_count();
370 int err =
371 aws_thread_launch(&impl->thread_created_on, s_event_thread_main, (void *)event_loop, &impl->thread_options);
372
373 if (err) {
374 aws_thread_decrement_unjoined_count();
375 AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
376 goto clean_up;
377 }
378
379 return AWS_OP_SUCCESS;
380
381 clean_up:
382 impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
383 return AWS_OP_ERR;
384 }
385
386 /* This function can't fail, we're relying on the thread responding to critical messages (ex: stop thread) */
signal_cross_thread_data_changed(struct aws_event_loop * event_loop)387 void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) {
388 struct kqueue_loop *impl = event_loop->impl_data;
389
390 AWS_LOGF_TRACE(
391 AWS_LS_IO_EVENT_LOOP,
392 "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.",
393 (void *)event_loop);
394 /* Doesn't actually matter what we write, any activity on pipe signals that cross_thread_data has changed,
395 * If the pipe is full and the write fails, that's fine, the event-thread will get the signal from some previous
396 * write */
397 uint32_t write_whatever = 0xC0FFEE;
398 write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever));
399 }
400
s_stop(struct aws_event_loop * event_loop)401 static int s_stop(struct aws_event_loop *event_loop) {
402 struct kqueue_loop *impl = event_loop->impl_data;
403
404 bool signal_thread = false;
405
406 { /* Begin critical section */
407 aws_mutex_lock(&impl->cross_thread_data.mutex);
408 if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) {
409 impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING;
410 signal_thread = !impl->cross_thread_data.thread_signaled;
411 impl->cross_thread_data.thread_signaled = true;
412 }
413 aws_mutex_unlock(&impl->cross_thread_data.mutex);
414 } /* End critical section */
415
416 if (signal_thread) {
417 signal_cross_thread_data_changed(event_loop);
418 }
419
420 return AWS_OP_SUCCESS;
421 }
422
s_wait_for_stop_completion(struct aws_event_loop * event_loop)423 static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
424 struct kqueue_loop *impl = event_loop->impl_data;
425
426 #ifdef DEBUG_BUILD
427 aws_mutex_lock(&impl->cross_thread_data.mutex);
428 /* call stop() before wait_for_stop_completion() or you'll wait forever */
429 AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING);
430 aws_mutex_unlock(&impl->cross_thread_data.mutex);
431 #endif
432
433 int err = aws_thread_join(&impl->thread_created_on);
434 aws_thread_decrement_unjoined_count();
435 if (err) {
436 return AWS_OP_ERR;
437 }
438
439 /* Since thread is no longer running it's ok to touch thread_data,
440 * and it's ok to touch cross_thread_data without locking the mutex */
441 impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
442 impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
443
444 return AWS_OP_SUCCESS;
445 }
446
447 /* Common functionality for "now" and "future" task scheduling.
448 * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
s_schedule_task_common(struct aws_event_loop * event_loop,struct aws_task * task,uint64_t run_at_nanos)449 static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
450 AWS_ASSERT(task);
451 struct kqueue_loop *impl = event_loop->impl_data;
452
453 /* If we're on the event-thread, just schedule it directly */
454 if (s_is_event_thread(event_loop)) {
455 AWS_LOGF_TRACE(
456 AWS_LS_IO_EVENT_LOOP,
457 "id=%p: scheduling task %p in-thread for timestamp %llu",
458 (void *)event_loop,
459 (void *)task,
460 (unsigned long long)run_at_nanos);
461 if (run_at_nanos == 0) {
462 aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
463 } else {
464 aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
465 }
466 return;
467 }
468
469 /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
470 AWS_LOGF_TRACE(
471 AWS_LS_IO_EVENT_LOOP,
472 "id=%p: scheduling task %p cross-thread for timestamp %llu",
473 (void *)event_loop,
474 (void *)task,
475 (unsigned long long)run_at_nanos);
476 task->timestamp = run_at_nanos;
477 bool should_signal_thread = false;
478
479 /* Begin critical section */
480 aws_mutex_lock(&impl->cross_thread_data.mutex);
481 aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node);
482
483 /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */
484 if (!impl->cross_thread_data.thread_signaled) {
485 should_signal_thread = true;
486 impl->cross_thread_data.thread_signaled = true;
487 }
488
489 aws_mutex_unlock(&impl->cross_thread_data.mutex);
490 /* End critical section */
491
492 if (should_signal_thread) {
493 signal_cross_thread_data_changed(event_loop);
494 }
495 }
496
s_schedule_task_now(struct aws_event_loop * event_loop,struct aws_task * task)497 static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
498 s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
499 }
500
s_schedule_task_future(struct aws_event_loop * event_loop,struct aws_task * task,uint64_t run_at_nanos)501 static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
502 s_schedule_task_common(event_loop, task, run_at_nanos);
503 }
504
s_cancel_task(struct aws_event_loop * event_loop,struct aws_task * task)505 static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
506 struct kqueue_loop *kqueue_loop = event_loop->impl_data;
507 AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
508 aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task);
509 }
510
511 /* Scheduled task that connects aws_io_handle with the kqueue */
s_subscribe_task(struct aws_task * task,void * user_data,enum aws_task_status status)512 static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
513 (void)task;
514 struct handle_data *handle_data = user_data;
515 struct aws_event_loop *event_loop = handle_data->event_loop;
516 struct kqueue_loop *impl = handle_data->event_loop->impl_data;
517
518 impl->thread_data.connected_handle_count++;
519
520 /* if task was cancelled, nothing to do */
521 if (status == AWS_TASK_STATUS_CANCELED) {
522 return;
523 }
524 AWS_LOGF_TRACE(
525 AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd);
526
527 /* If handle was unsubscribed before this task could execute, nothing to do */
528 if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) {
529 return;
530 }
531
532 AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING);
533
534 /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents.
535 * If we're adding two separate kevents, but one of those fails, we need to remove the other kevent.
536 * Therefore we use the EV_RECEIPT flag. This causes kevent() to tell whether each EV_ADD succeeded,
537 * rather than the usual behavior of telling us about recent events. */
538 struct kevent changelist[2];
539 AWS_ZERO_ARRAY(changelist);
540
541 int changelist_size = 0;
542
543 if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
544 EV_SET(
545 &changelist[changelist_size++],
546 handle_data->owner->data.fd,
547 EVFILT_READ /*filter*/,
548 EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
549 0 /*fflags*/,
550 0 /*data*/,
551 handle_data /*udata*/);
552 }
553 if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
554 EV_SET(
555 &changelist[changelist_size++],
556 handle_data->owner->data.fd,
557 EVFILT_WRITE /*filter*/,
558 EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
559 0 /*fflags*/,
560 0 /*data*/,
561 handle_data /*udata*/);
562 }
563
564 int num_events = kevent(
565 impl->kq_fd,
566 changelist /*changelist*/,
567 changelist_size /*nchanges*/,
568 changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/,
569 changelist_size /*nevents*/,
570 NULL /*timeout*/);
571 if (num_events == -1) {
572 goto subscribe_failed;
573 }
574
575 /* Look through results to see if any failed */
576 for (int i = 0; i < num_events; ++i) {
577 /* Every result should be flagged as error, that's just how EV_RECEIPT works */
578 AWS_ASSERT(changelist[i].flags & EV_ERROR);
579
580 /* If a real error occurred, .data contains the error code */
581 if (changelist[i].data != 0) {
582 goto subscribe_failed;
583 }
584 }
585
586 /* Success */
587 handle_data->state = HANDLE_STATE_SUBSCRIBED;
588 return;
589
590 subscribe_failed:
591 AWS_LOGF_ERROR(
592 AWS_LS_IO_EVENT_LOOP,
593 "id=%p: failed to subscribe to events on fd %d",
594 (void *)event_loop,
595 handle_data->owner->data.fd);
596 /* Remove any related kevents that succeeded */
597 for (int i = 0; i < num_events; ++i) {
598 if (changelist[i].data == 0) {
599 changelist[i].flags = EV_DELETE;
600 kevent(
601 impl->kq_fd,
602 &changelist[i] /*changelist*/,
603 1 /*nchanges*/,
604 NULL /*eventlist*/,
605 0 /*nevents*/,
606 NULL /*timeout*/);
607 }
608 }
609
610 /* We can't return an error code because this was a scheduled task.
611 * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */
612 handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data);
613 }
614
s_subscribe_to_io_events(struct aws_event_loop * event_loop,struct aws_io_handle * handle,int events,aws_event_loop_on_event_fn * on_event,void * user_data)615 static int s_subscribe_to_io_events(
616 struct aws_event_loop *event_loop,
617 struct aws_io_handle *handle,
618 int events,
619 aws_event_loop_on_event_fn *on_event,
620 void *user_data) {
621
622 AWS_ASSERT(event_loop);
623 AWS_ASSERT(handle->data.fd != -1);
624 AWS_ASSERT(handle->additional_data == NULL);
625 AWS_ASSERT(on_event);
626 /* Must subscribe for read, write, or both */
627 AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE));
628
629 struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data));
630 if (!handle_data) {
631 return AWS_OP_ERR;
632 }
633
634 handle_data->owner = handle;
635 handle_data->event_loop = event_loop;
636 handle_data->on_event = on_event;
637 handle_data->on_event_user_data = user_data;
638 handle_data->events_subscribed = events;
639 handle_data->state = HANDLE_STATE_SUBSCRIBING;
640
641 handle->additional_data = handle_data;
642
643 /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why...
644 *
645 * kqueue requires separate registrations for read and write events.
646 * If the user wants to know about both read and write, we need register once for read and once for write.
647 * If the first registration succeeds, but the second registration fails, we need to delete the first registration.
648 * If this all happened outside the event-thread, the successful registration's events could begin processing
649 * in the brief window of time before the registration is deleted. */
650
651 aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe");
652 s_schedule_task_now(event_loop, &handle_data->subscribe_task);
653
654 return AWS_OP_SUCCESS;
655 }
656
s_free_io_event_resources(void * user_data)657 static void s_free_io_event_resources(void *user_data) {
658 struct handle_data *handle_data = user_data;
659 struct kqueue_loop *impl = handle_data->event_loop->impl_data;
660
661 impl->thread_data.connected_handle_count--;
662
663 aws_mem_release(handle_data->event_loop->alloc, handle_data);
664 }
665
s_clean_up_handle_data_task(struct aws_task * task,void * user_data,enum aws_task_status status)666 static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
667 (void)task;
668 (void)status;
669
670 struct handle_data *handle_data = user_data;
671 s_free_io_event_resources(handle_data);
672 }
673
s_unsubscribe_from_io_events(struct aws_event_loop * event_loop,struct aws_io_handle * handle)674 static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
675 AWS_LOGF_TRACE(
676 AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
677 AWS_ASSERT(handle->additional_data);
678 struct handle_data *handle_data = handle->additional_data;
679 struct kqueue_loop *impl = event_loop->impl_data;
680
681 AWS_ASSERT(event_loop == handle_data->event_loop);
682
683 /* If the handle was successfully subscribed to kqueue, then remove it. */
684 if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
685 struct kevent changelist[2];
686 int changelist_size = 0;
687
688 if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
689 EV_SET(
690 &changelist[changelist_size++],
691 handle_data->owner->data.fd,
692 EVFILT_READ /*filter*/,
693 EV_DELETE /*flags*/,
694 0 /*fflags*/,
695 0 /*data*/,
696 handle_data /*udata*/);
697 }
698 if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
699 EV_SET(
700 &changelist[changelist_size++],
701 handle_data->owner->data.fd,
702 EVFILT_WRITE /*filter*/,
703 EV_DELETE /*flags*/,
704 0 /*fflags*/,
705 0 /*data*/,
706 handle_data /*udata*/);
707 }
708
709 kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/);
710 }
711
712 /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario:
713 * - While processing a batch of events, some callback unsubscribes another aws_io_handle.
714 * - One of the other events in this batch belongs to that other aws_io_handle.
715 * - If the handle_data were already deleted, there would be an access invalid memory. */
716
717 aws_task_init(
718 &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data");
719 aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task);
720
721 handle_data->state = HANDLE_STATE_UNSUBSCRIBED;
722 handle->additional_data = NULL;
723
724 return AWS_OP_SUCCESS;
725 }
726
s_is_event_thread(struct aws_event_loop * event_loop)727 static bool s_is_event_thread(struct aws_event_loop *event_loop) {
728 struct kqueue_loop *impl = event_loop->impl_data;
729
730 aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
731 return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
732 }
733
734 /* Called from thread.
735 * Takes tasks from tasks_to_schedule and adds them to the scheduler. */
s_process_tasks_to_schedule(struct aws_event_loop * event_loop,struct aws_linked_list * tasks_to_schedule)736 static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) {
737 struct kqueue_loop *impl = event_loop->impl_data;
738 AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);
739
740 while (!aws_linked_list_empty(tasks_to_schedule)) {
741 struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule);
742 struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
743
744 AWS_LOGF_TRACE(
745 AWS_LS_IO_EVENT_LOOP,
746 "id=%p: task %p pulled to event-loop, scheduling now.",
747 (void *)event_loop,
748 (void *)task);
749 /* Timestamp 0 is used to denote "now" tasks */
750 if (task->timestamp == 0) {
751 aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
752 } else {
753 aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp);
754 }
755 }
756 }
757
s_process_cross_thread_data(struct aws_event_loop * event_loop)758 static void s_process_cross_thread_data(struct aws_event_loop *event_loop) {
759 struct kqueue_loop *impl = event_loop->impl_data;
760
761 AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop);
762 /* If there are tasks to schedule, grab them all out of synced_data.tasks_to_schedule.
763 * We'll process them later, so that we minimize time spent holding the mutex. */
764 struct aws_linked_list tasks_to_schedule;
765 aws_linked_list_init(&tasks_to_schedule);
766
767 { /* Begin critical section */
768 aws_mutex_lock(&impl->cross_thread_data.mutex);
769 impl->cross_thread_data.thread_signaled = false;
770
771 bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) &&
772 (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING);
773 if (AWS_UNLIKELY(initiate_stop)) {
774 impl->thread_data.state = EVENT_THREAD_STATE_STOPPING;
775 }
776
777 aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule);
778
779 aws_mutex_unlock(&impl->cross_thread_data.mutex);
780 } /* End critical section */
781
782 s_process_tasks_to_schedule(event_loop, &tasks_to_schedule);
783 }
784
s_aws_event_flags_from_kevent(struct kevent * kevent)785 static int s_aws_event_flags_from_kevent(struct kevent *kevent) {
786 int event_flags = 0;
787
788 if (kevent->flags & EV_ERROR) {
789 event_flags |= AWS_IO_EVENT_TYPE_ERROR;
790 } else if (kevent->filter == EVFILT_READ) {
791 if (kevent->data != 0) {
792 event_flags |= AWS_IO_EVENT_TYPE_READABLE;
793 }
794
795 if (kevent->flags & EV_EOF) {
796 event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
797 }
798 } else if (kevent->filter == EVFILT_WRITE) {
799 if (kevent->data != 0) {
800 event_flags |= AWS_IO_EVENT_TYPE_WRITABLE;
801 }
802
803 if (kevent->flags & EV_EOF) {
804 event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
805 }
806 }
807
808 return event_flags;
809 }
810
s_event_thread_main(void * user_data)811 static void s_event_thread_main(void *user_data) {
812 struct aws_event_loop *event_loop = user_data;
813 AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
814 struct kqueue_loop *impl = event_loop->impl_data;
815
816 /* set thread id to the event-loop's thread. */
817 aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);
818
819 AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
820 impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
821
822 struct kevent kevents[MAX_EVENTS];
823
824 /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
825 * If both the read and write kevents fire in the same loop of the event-thread,
826 * combine the event-flags and deliver them in a single callback.
827 * This makes the kqueue_event_loop behave more like the other platform implementations. */
828 struct handle_data *io_handle_events[MAX_EVENTS];
829
830 struct timespec timeout = {
831 .tv_sec = DEFAULT_TIMEOUT_SEC,
832 .tv_nsec = 0,
833 };
834
835 AWS_LOGF_INFO(
836 AWS_LS_IO_EVENT_LOOP,
837 "id=%p: default timeout %ds, and max events to process per tick %d",
838 (void *)event_loop,
839 DEFAULT_TIMEOUT_SEC,
840 MAX_EVENTS);
841
842 while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
843 int num_io_handle_events = 0;
844 bool should_process_cross_thread_data = false;
845
846 AWS_LOGF_TRACE(
847 AWS_LS_IO_EVENT_LOOP,
848 "id=%p: waiting for a maximum of %ds %lluns",
849 (void *)event_loop,
850 (int)timeout.tv_sec,
851 (unsigned long long)timeout.tv_nsec);
852
853 /* Process kqueue events */
854 int num_kevents = kevent(
855 impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout);
856
857 aws_event_loop_register_tick_start(event_loop);
858 AWS_LOGF_TRACE(
859 AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
860 if (num_kevents == -1) {
861 /* Raise an error, in case this is interesting to anyone monitoring,
862 * and continue on with this loop. We can't process events,
863 * but we can still process scheduled tasks */
864 aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
865
866 /* Force the cross_thread_data to be processed.
867 * There might be valuable info in there, like the message to stop the thread.
868 * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
869 should_process_cross_thread_data = true;
870 }
871
872 for (int i = 0; i < num_kevents; ++i) {
873 struct kevent *kevent = &kevents[i];
874
875 /* Was this event to signal that cross_thread_data has changed? */
876 if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
877 should_process_cross_thread_data = true;
878
879 /* Drain whatever data was written to the signaling pipe */
880 uint32_t read_whatever;
881 while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
882 }
883
884 continue;
885 }
886
887 /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
888 int event_flags = s_aws_event_flags_from_kevent(kevent);
889 if (event_flags == 0) {
890 continue;
891 }
892
893 /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
894 struct handle_data *handle_data = kevent->udata;
895 if (handle_data->events_this_loop == 0) {
896 io_handle_events[num_io_handle_events++] = handle_data;
897 }
898 handle_data->events_this_loop |= event_flags;
899 }
900
901 /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
902 for (int i = 0; i < num_io_handle_events; ++i) {
903 struct handle_data *handle_data = io_handle_events[i];
904
905 if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
906 AWS_LOGF_TRACE(
907 AWS_LS_IO_EVENT_LOOP,
908 "id=%p: activity on fd %d, invoking handler.",
909 (void *)event_loop,
910 handle_data->owner->data.fd);
911 handle_data->on_event(
912 event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
913 }
914
915 handle_data->events_this_loop = 0;
916 }
917
918 /* Process cross_thread_data */
919 if (should_process_cross_thread_data) {
920 s_process_cross_thread_data(event_loop);
921 }
922
923 /* Run scheduled tasks */
924 uint64_t now_ns = 0;
925 event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
926 will not be run. That's ok, we'll handle them next time around. */
927 AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
928 aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);
929
930 /* Set timeout for next kevent() call.
931 * If clock fails, or scheduler has no tasks, use default timeout */
932 bool use_default_timeout = false;
933
934 int err = event_loop->clock(&now_ns);
935 if (err) {
936 use_default_timeout = true;
937 }
938
939 uint64_t next_run_time_ns;
940 if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {
941
942 use_default_timeout = true;
943 }
944
945 if (use_default_timeout) {
946 AWS_LOGF_TRACE(
947 AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
948 timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
949 timeout.tv_nsec = 0;
950 } else {
951 /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
952 uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;
953
954 uint64_t timeout_remainder_ns = 0;
955 uint64_t timeout_sec =
956 aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);
957
958 if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
959 timeout_sec = LONG_MAX;
960 timeout_remainder_ns = 0;
961 }
962
963 AWS_LOGF_TRACE(
964 AWS_LS_IO_EVENT_LOOP,
965 "id=%p: detected more scheduled tasks with the next occurring at "
966 "%llu using timeout of %ds %lluns.",
967 (void *)event_loop,
968 (unsigned long long)timeout_ns,
969 (int)timeout_sec,
970 (unsigned long long)timeout_remainder_ns);
971 timeout.tv_sec = (time_t)(timeout_sec);
972 timeout.tv_nsec = (long)(timeout_remainder_ns);
973 }
974
975 aws_event_loop_register_tick_end(event_loop);
976 }
977
978 AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
979 /* reset to NULL. This should be updated again during destroy before tasks are canceled. */
980 aws_atomic_store_ptr(&impl->running_thread_id, NULL);
981 }
982