/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

/* clang-format off */
#include "preamble.h"

/* Not in the preamble because we specialize the contract for reading from a
uint64_t variable (as used by `s_schedule_task_common`). */
ssize_t write(int fd, void *bytes, size_t nbytes)
    _(requires valid_fd(fd))
    _(requires nbytes == sizeof(uint64_t))
    _(requires \thread_local((uint64_t *)bytes))
;
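
/* For illustration only: the contract above specializes POSIX write() for the
 * eventfd wakeup pattern, where the payload is always an 8-byte counter. A
 * minimal, unverified sketch of that pattern follows (assuming Linux eventfd;
 * compiled out so it does not affect the proof):
 */
#if 0
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

static void wakeup_sketch(void) {
    int efd = eventfd(0 /* initial counter */, EFD_NONBLOCK);
    if (efd < 0) {
        return;
    }
    uint64_t counter = 1;
    /* Adds 1 to the eventfd's internal counter; a waiting reader becomes runnable. */
    (void)write(efd, &counter, sizeof(counter));
    uint64_t observed = 0;
    /* A read returns the accumulated counter and resets it to zero. */
    (void)read(efd, &observed, sizeof(observed));
    close(efd);
}
#endif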

static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos
    _(ghost \claim(c_event_loop))
    _(ghost \claim(c_mutex))
)
    _(always c_event_loop, event_loop->\closed)
    _(requires \wrapped(c_mutex) && \claims_object(c_mutex, &(epoll_loop_of(event_loop)->task_pre_queue_mutex)))
    _(requires \thread_local(task))
    _(requires \wrapped(task))
    _(requires task->fn->\valid)
    _(writes task)
    _(updates &epoll_loop_of(event_loop)->scheduler)
{
    _(assert \always_by_claim(c_event_loop, event_loop))
    _(assert \active_claim(c_event_loop))
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    _(assert \inv(epoll_loop))
    _(assert epoll_loop->\closed)
    _(assert epoll_loop->write_task_handle.\closed)
    _(assert \inv(&epoll_loop->write_task_handle))
    _(assert valid_fd(epoll_loop->write_task_handle.data.fd))

    /* If the event loop and the caller are on the same thread, just schedule the task and be done with it. */
    if (s_is_on_callers_thread(event_loop _(ghost c_event_loop))) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            /* zero denotes a "run now" task */
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
        }
        return;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    _(unwrap task)
    task->timestamp = run_at_nanos;
    _(wrap task)
    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex _(ghost c_mutex));
    _(assert epoll_loop->task_pre_queue.\owner == \me)

    uint64_t counter = 1;

    bool is_first_task = aws_linked_list_empty(&epoll_loop->task_pre_queue);

    aws_linked_list_push_back(&epoll_loop->task_pre_queue, &task->node _(ghost task));

    /* If the list was not empty, there is already a pending read on the pipe/eventfd, so there is no need to write
     * again (see the reader-side sketch after this function). */
    if (is_first_task) {
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Waking up event-loop thread", (void *)event_loop);

        /* If the write fails because the buffer is full, we don't actually care, because that means there is a pending
         * read on the pipe/eventfd and thus the event loop will end up checking whether something has been queued. */
        _(assert \active_claim(c_event_loop)) /*< implies write_task_handle.data.fd is a valid fd
                                                 ==> event_loop->\closed
                                                 ==> epoll_loop->\closed ==> \inv(epoll_loop)
                                                 ==> epoll_loop->write_task_handle.\closed ==> \inv(&epoll_loop->write_task_handle) */
        ssize_t do_not_care = write(_(by_claim c_event_loop) epoll_loop->write_task_handle.data.fd, (void *)&counter, sizeof(counter));
        (void)do_not_care;
    }

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex _(ghost c_mutex));
}
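
/* For illustration only: an unverified sketch of the reader side that the
 * comment above relies on. In the unannotated aws-c-io epoll event loop a
 * similar routine runs on the event-loop thread once the eventfd becomes
 * readable; the name and details below are assumptions, and the block is
 * compiled out so it does not affect the proof:
 */
#if 0
static void s_drain_pre_queue_sketch(struct epoll_loop *epoll_loop) {
    /* One read consumes the whole accumulated counter, which is why a single
     * pending write suffices no matter how many tasks were queued meanwhile. */
    uint64_t ignored = 0;
    read(epoll_loop->write_task_handle.data.fd, &ignored, sizeof(ignored));

    /* Move the queued tasks into the scheduler under the pre-queue mutex. */
    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);
    while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&epoll_loop->task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, task->timestamp);
        }
    }
    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);
}
#endif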

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task
    _(ghost \claim(c_event_loop))
    _(ghost \claim(c_mutex))
) {
    s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */ _(ghost c_event_loop) _(ghost c_mutex));
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos
    _(ghost \claim(c_event_loop))
    _(ghost \claim(c_mutex))
) {
    s_schedule_task_common(event_loop, task, run_at_nanos _(ghost c_event_loop) _(ghost c_mutex));
}
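
/* For illustration only: assuming the `_(...)` annotation macro compiles away
 * in a regular (non-VCC) build, as is conventional for VCC-annotated sources,
 * the ghost parameters vanish and these wrappers match the plain aws-c-io
 * vtable signatures. A hedged sketch of how they would be installed (field
 * names assumed from aws-c-io's aws_event_loop_vtable; compiled out):
 */
#if 0
static struct aws_event_loop_vtable s_vtable_sketch = {
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
};
#endif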
/* clang-format on */