1 /*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16 /* clang-format off */
17 #include "preamble.h"
18
19 /* not in preamble because we specialize for reading-from a uint64_t var (as
20 used by `s_schedule_task_common`) */
/*
 * Contract-only declaration of write() (no body is given here).
 * The annotations below require that:
 *   - fd satisfies the valid_fd predicate,
 *   - exactly sizeof(uint64_t) bytes are requested, and
 *   - bytes, viewed as a uint64_t *, is thread-local to the caller.
 */
21 ssize_t write(int fd, void *bytes, size_t nbytes)
22 _(requires valid_fd(fd))
23 _(requires nbytes == sizeof(uint64_t))
24 _(requires \thread_local((uint64_t *)bytes))
25 ;
26
/*
 * Schedule `task` on `event_loop`, either directly or through the pre-queue.
 *
 * Parameters:
 *   event_loop    - loop whose impl_data is the struct epoll_loop used below.
 *   task          - task to schedule; the contract requires it to be
 *                   thread-local and wrapped, with task->fn valid.
 *   run_at_nanos  - requested timestamp; 0 denotes a "now" task.
 *   c_event_loop  - ghost claim under which event_loop is always closed.
 *   c_mutex       - ghost claim on the epoll loop's task_pre_queue_mutex,
 *                   passed to the lock/unlock calls.
 *
 * Behaviour:
 *   - If s_is_on_callers_thread() reports true, the task is handed straight to
 *     epoll_loop->scheduler (schedule_now when run_at_nanos == 0, otherwise
 *     schedule_future) and the function returns.
 *   - Otherwise task->timestamp is set to run_at_nanos, task_pre_queue_mutex is
 *     taken, and the task's node is appended to task_pre_queue. If the queue
 *     was empty before the append, a uint64_t with value 1 is written to
 *     write_task_handle's fd (logged as waking the event-loop thread). The
 *     result of that write is deliberately ignored. The mutex is then released.
 *
 * Returns nothing.
 */
s_schedule_task_common(struct aws_event_loop * event_loop,struct aws_task * task,uint64_t run_at_nanos _ (ghost\\claim (c_event_loop))_ (ghost\\claim (c_mutex)))27 static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos
28 _(ghost \claim(c_event_loop))
29 _(ghost \claim(c_mutex))
30 )
31 _(always c_event_loop, event_loop->\closed)
32 _(requires \wrapped(c_mutex) && \claims_object(c_mutex, &(epoll_loop_of(event_loop)->task_pre_queue_mutex)))
33 _(requires \thread_local(task))
34 _(requires \wrapped(task))
35 _(requires task->fn->\valid)
36 _(writes task)
37 _(updates &epoll_loop_of(event_loop)->scheduler)
38 {
/* Ghost assertions: starting from the event-loop claim, step down to the
 * epoll_loop and its write_task_handle, ending with valid_fd on the handle's fd. */
39 _(assert \always_by_claim(c_event_loop, event_loop))
40 _(assert \active_claim(c_event_loop))
41 struct epoll_loop *epoll_loop = event_loop->impl_data;
42 _(assert \inv(epoll_loop))
43 _(assert epoll_loop->\closed)
44 _(assert epoll_loop->write_task_handle.\closed)
45 _(assert \inv(&epoll_loop->write_task_handle))
46 _(assert valid_fd(epoll_loop->write_task_handle.data.fd))
47
48 /* if event loop and the caller are the same thread, just schedule and be done with it. */
49 if (s_is_on_callers_thread(event_loop _(ghost c_event_loop))) {
50 AWS_LOGF_TRACE(
51 AWS_LS_IO_EVENT_LOOP,
52 "id=%p: scheduling task %p in-thread for timestamp %llu",
53 (void *)event_loop,
54 (void *)task,
55 (unsigned long long)run_at_nanos);
56 if (run_at_nanos == 0) {
57 /* zero denotes "now" task */
58 aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
59 } else {
60 aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
61 }
62 return;
63 }
64
65 AWS_LOGF_TRACE(
66 AWS_LS_IO_EVENT_LOOP,
67 "id=%p: Scheduling task %p cross-thread for timestamp %llu",
68 (void *)event_loop,
69 (void *)task,
70 (unsigned long long)run_at_nanos);
/* Record the requested run time on the task itself; the ghost unwrap/wrap
 * annotations bracket the write to task->timestamp. */
71 _(unwrap task)
72 task->timestamp = run_at_nanos;
73 _(wrap task)
/* Everything from here to the unlock at the end runs with task_pre_queue_mutex held. */
74 aws_mutex_lock(&epoll_loop->task_pre_queue_mutex _(ghost c_mutex));
75 _(assert epoll_loop->task_pre_queue.\owner == \me)
76
/* Value handed to write() below when the pre-queue was empty. */
77 uint64_t counter = 1;
78
/* Sample emptiness before the push so we know whether this task is the first one queued. */
79 bool is_first_task = aws_linked_list_empty(&epoll_loop->task_pre_queue);
80
81 aws_linked_list_push_back(&epoll_loop->task_pre_queue, &task->node _(ghost task));
82
83 /* if the list was not empty, we already have a pending read on the pipe/eventfd, no need to write again. */
84 if (is_first_task) {
85 AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Waking up event-loop thread", (void *)event_loop);
86
87 /* If the write fails because the buffer is full, we don't actually care because that means there's a pending
88 * read on the pipe/eventfd and thus the event loop will end up checking to see if something has been queued.*/
89 _(assert \active_claim(c_event_loop)) /*< implies write_task_handle.data.fd is a valid fd
90 ==> event_loop->\closed
91 ==> epoll_loop->\closed ==> \inv(epoll_loop)
92 ==> epoll_loop->write_task_handle.\closed ==> \inv(&epoll_loop->write_task_handle) */
93 ssize_t do_not_care = write(_(by_claim c_event_loop) epoll_loop->write_task_handle.data.fd, (void *)&counter, sizeof(counter));
/* Explicitly discard the result; see the comment above the write for why failure is acceptable. */
94 (void)do_not_care;
95 }
96
97 aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex _(ghost c_mutex));
98 }
99
/*
 * Schedule `task` on `event_loop` as a "now" task.
 * Forwards to s_schedule_task_common with run_at_nanos fixed to 0, passing the
 * two ghost claims (c_event_loop, c_mutex) through unchanged.
 */
s_schedule_task_now(struct aws_event_loop * event_loop,struct aws_task * task _ (ghost\\claim (c_event_loop))_ (ghost\\claim (c_mutex)))100 static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task
101 _(ghost \claim(c_event_loop))
102 _(ghost \claim(c_mutex))
103 ) {
104 s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */ _(ghost c_event_loop) _(ghost c_mutex));
105 }
106
/*
 * Schedule `task` on `event_loop` for the timestamp `run_at_nanos`.
 * Forwards all arguments, including the two ghost claims (c_event_loop,
 * c_mutex), to s_schedule_task_common unchanged.
 */
s_schedule_task_future(struct aws_event_loop * event_loop,struct aws_task * task,uint64_t run_at_nanos _ (ghost\\claim (c_event_loop))_ (ghost\\claim (c_mutex)))107 static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos
108 _(ghost \claim(c_event_loop))
109 _(ghost \claim(c_mutex))
110 ) {
111 s_schedule_task_common(event_loop, task, run_at_nanos _(ghost c_event_loop) _(ghost c_mutex));
112 }
113 /* clang-format on */
114