1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/common/task_scheduler.h>
7
8 #include <aws/common/logging.h>
9
10 #include <inttypes.h>
11
/* Size argument handed to aws_priority_queue_init_dynamic() when a scheduler's
 * timed_queue is created in aws_task_scheduler_init(). */
static const size_t DEFAULT_QUEUE_SIZE = 7;
13
/*
 * Prepare a task for use.
 *
 * The whole struct is zeroed first, then the callback, its user argument and
 * the type tag (used in log messages) are stored.
 */
void aws_task_init(struct aws_task *task, aws_task_fn *fn, void *arg, const char *type_tag) {
    /* Wipe every field so nothing stale survives a re-init. */
    AWS_ZERO_STRUCT(*task);

    task->type_tag = type_tag;
    task->arg = arg;
    task->fn = fn;
}
20
aws_task_status_to_c_str(enum aws_task_status status)21 const char *aws_task_status_to_c_str(enum aws_task_status status) {
22 switch (status) {
23 case AWS_TASK_STATUS_RUN_READY:
24 return "<Running>";
25
26 case AWS_TASK_STATUS_CANCELED:
27 return "<Canceled>";
28
29 default:
30 return "<Unknown>";
31 }
32 }
33
aws_task_run(struct aws_task * task,enum aws_task_status status)34 void aws_task_run(struct aws_task *task, enum aws_task_status status) {
35 AWS_ASSERT(task->fn);
36 AWS_LOGF_DEBUG(
37 AWS_LS_COMMON_TASK_SCHEDULER,
38 "id=%p: Running %s task with %s status",
39 (void *)task,
40 task->type_tag,
41 aws_task_status_to_c_str(status));
42
43 task->abi_extension.scheduled = false;
44 task->fn(task, task->arg, status);
45 }
46
s_compare_timestamps(const void * a,const void * b)47 static int s_compare_timestamps(const void *a, const void *b) {
48 uint64_t a_time = (*(struct aws_task **)a)->timestamp;
49 uint64_t b_time = (*(struct aws_task **)b)->timestamp;
50 return a_time > b_time; /* min-heap */
51 }
52
/* Forward declaration: defined further down, but needed earlier by
 * aws_task_scheduler_clean_up(), which runs all remaining tasks as canceled. */
static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status);
54
aws_task_scheduler_init(struct aws_task_scheduler * scheduler,struct aws_allocator * alloc)55 int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_allocator *alloc) {
56 AWS_ASSERT(alloc);
57
58 AWS_ZERO_STRUCT(*scheduler);
59
60 if (aws_priority_queue_init_dynamic(
61 &scheduler->timed_queue, alloc, DEFAULT_QUEUE_SIZE, sizeof(struct aws_task *), &s_compare_timestamps)) {
62 return AWS_OP_ERR;
63 };
64
65 scheduler->alloc = alloc;
66 aws_linked_list_init(&scheduler->timed_list);
67 aws_linked_list_init(&scheduler->asap_list);
68
69 AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler));
70 return AWS_OP_SUCCESS;
71 }
72
aws_task_scheduler_clean_up(struct aws_task_scheduler * scheduler)73 void aws_task_scheduler_clean_up(struct aws_task_scheduler *scheduler) {
74 AWS_ASSERT(scheduler);
75
76 if (aws_task_scheduler_is_valid(scheduler)) {
77 /* Execute all remaining tasks as CANCELED.
78 * Do this in a loop so that tasks scheduled by other tasks are executed */
79 while (aws_task_scheduler_has_tasks(scheduler, NULL)) {
80 s_run_all(scheduler, UINT64_MAX, AWS_TASK_STATUS_CANCELED);
81 }
82 }
83
84 aws_priority_queue_clean_up(&scheduler->timed_queue);
85 AWS_ZERO_STRUCT(*scheduler);
86 }
87
aws_task_scheduler_is_valid(const struct aws_task_scheduler * scheduler)88 bool aws_task_scheduler_is_valid(const struct aws_task_scheduler *scheduler) {
89 return scheduler && scheduler->alloc && aws_priority_queue_is_valid(&scheduler->timed_queue) &&
90 aws_linked_list_is_valid(&scheduler->asap_list) && aws_linked_list_is_valid(&scheduler->timed_list);
91 }
92
aws_task_scheduler_has_tasks(const struct aws_task_scheduler * scheduler,uint64_t * next_task_time)93 bool aws_task_scheduler_has_tasks(const struct aws_task_scheduler *scheduler, uint64_t *next_task_time) {
94 AWS_ASSERT(scheduler);
95
96 uint64_t timestamp = UINT64_MAX;
97 bool has_tasks = false;
98
99 if (!aws_linked_list_empty(&scheduler->asap_list)) {
100 timestamp = 0;
101 has_tasks = true;
102
103 } else {
104 /* Check whether timed_list or timed_queue has the earlier task */
105 if (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
106 struct aws_linked_list_node *node = aws_linked_list_front(&scheduler->timed_list);
107 struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
108 timestamp = task->timestamp;
109 has_tasks = true;
110 }
111
112 struct aws_task **task_ptrptr = NULL;
113 if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&task_ptrptr) == AWS_OP_SUCCESS) {
114 if ((*task_ptrptr)->timestamp < timestamp) {
115 timestamp = (*task_ptrptr)->timestamp;
116 }
117 has_tasks = true;
118 }
119 }
120
121 if (next_task_time) {
122 *next_task_time = timestamp;
123 }
124 return has_tasks;
125 }
126
aws_task_scheduler_schedule_now(struct aws_task_scheduler * scheduler,struct aws_task * task)127 void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struct aws_task *task) {
128 AWS_ASSERT(scheduler);
129 AWS_ASSERT(task);
130 AWS_ASSERT(task->fn);
131
132 AWS_LOGF_DEBUG(
133 AWS_LS_COMMON_TASK_SCHEDULER,
134 "id=%p: Scheduling %s task for immediate execution",
135 (void *)task,
136 task->type_tag);
137
138 task->priority_queue_node.current_index = SIZE_MAX;
139 aws_linked_list_node_reset(&task->node);
140 task->timestamp = 0;
141
142 aws_linked_list_push_back(&scheduler->asap_list, &task->node);
143 task->abi_extension.scheduled = true;
144 }
145
aws_task_scheduler_schedule_future(struct aws_task_scheduler * scheduler,struct aws_task * task,uint64_t time_to_run)146 void aws_task_scheduler_schedule_future(
147 struct aws_task_scheduler *scheduler,
148 struct aws_task *task,
149 uint64_t time_to_run) {
150
151 AWS_ASSERT(scheduler);
152 AWS_ASSERT(task);
153 AWS_ASSERT(task->fn);
154
155 AWS_LOGF_DEBUG(
156 AWS_LS_COMMON_TASK_SCHEDULER,
157 "id=%p: Scheduling %s task for future execution at time %" PRIu64,
158 (void *)task,
159 task->type_tag,
160 time_to_run);
161
162 task->timestamp = time_to_run;
163
164 task->priority_queue_node.current_index = SIZE_MAX;
165 aws_linked_list_node_reset(&task->node);
166 int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node);
167 if (AWS_UNLIKELY(err)) {
168 /* In the (very unlikely) case that we can't push into the timed_queue,
169 * perform a sorted insertion into timed_list. */
170 struct aws_linked_list_node *node_i;
171 for (node_i = aws_linked_list_begin(&scheduler->timed_list);
172 node_i != aws_linked_list_end(&scheduler->timed_list);
173 node_i = aws_linked_list_next(node_i)) {
174
175 struct aws_task *task_i = AWS_CONTAINER_OF(node_i, struct aws_task, node);
176 if (task_i->timestamp > time_to_run) {
177 break;
178 }
179 }
180 aws_linked_list_insert_before(node_i, &task->node);
181 }
182 task->abi_extension.scheduled = true;
183 }
184
aws_task_scheduler_run_all(struct aws_task_scheduler * scheduler,uint64_t current_time)185 void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) {
186 AWS_ASSERT(scheduler);
187
188 s_run_all(scheduler, current_time, AWS_TASK_STATUS_RUN_READY);
189 }
190
s_run_all(struct aws_task_scheduler * scheduler,uint64_t current_time,enum aws_task_status status)191 static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status) {
192
193 /* Move scheduled tasks to running_list before executing.
194 * This gives us the desired behavior that: if executing a task results in another task being scheduled,
195 * that new task is not executed until the next time run() is invoked. */
196 struct aws_linked_list running_list;
197 aws_linked_list_init(&running_list);
198
199 /* First move everything from asap_list */
200 aws_linked_list_swap_contents(&running_list, &scheduler->asap_list);
201
202 /* Next move tasks from timed_queue and timed_list, based on whichever's next-task is sooner.
203 * It's very unlikely that any tasks are in timed_list, so once it has no more valid tasks,
204 * break out of this complex loop in favor of a simpler one. */
205 while (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
206
207 struct aws_linked_list_node *timed_list_node = aws_linked_list_begin(&scheduler->timed_list);
208 struct aws_task *timed_list_task = AWS_CONTAINER_OF(timed_list_node, struct aws_task, node);
209 if (timed_list_task->timestamp > current_time) {
210 /* timed_list is out of valid tasks, break out of complex loop */
211 break;
212 }
213
214 /* Check if timed_queue has a task which is sooner */
215 struct aws_task **timed_queue_task_ptrptr = NULL;
216 if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
217 if ((*timed_queue_task_ptrptr)->timestamp <= current_time) {
218 if ((*timed_queue_task_ptrptr)->timestamp < timed_list_task->timestamp) {
219 /* Take task from timed_queue */
220 struct aws_task *timed_queue_task;
221 aws_priority_queue_pop(&scheduler->timed_queue, &timed_queue_task);
222 aws_linked_list_push_back(&running_list, &timed_queue_task->node);
223 continue;
224 }
225 }
226 }
227
228 /* Take task from timed_list */
229 aws_linked_list_pop_front(&scheduler->timed_list);
230 aws_linked_list_push_back(&running_list, &timed_list_task->node);
231 }
232
233 /* Simpler loop that moves remaining valid tasks from timed_queue */
234 struct aws_task **timed_queue_task_ptrptr = NULL;
235 while (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
236 if ((*timed_queue_task_ptrptr)->timestamp > current_time) {
237 break;
238 }
239
240 struct aws_task *next_timed_task;
241 aws_priority_queue_pop(&scheduler->timed_queue, &next_timed_task);
242 aws_linked_list_push_back(&running_list, &next_timed_task->node);
243 }
244
245 /* Run tasks */
246 while (!aws_linked_list_empty(&running_list)) {
247 struct aws_linked_list_node *task_node = aws_linked_list_pop_front(&running_list);
248 struct aws_task *task = AWS_CONTAINER_OF(task_node, struct aws_task, node);
249 aws_task_run(task, status);
250 }
251 }
252
aws_task_scheduler_cancel_task(struct aws_task_scheduler * scheduler,struct aws_task * task)253 void aws_task_scheduler_cancel_task(struct aws_task_scheduler *scheduler, struct aws_task *task) {
254 /* attempt the linked lists first since those will be faster access and more likely to occur
255 * anyways.
256 */
257 if (task->node.next) {
258 aws_linked_list_remove(&task->node);
259 } else if (task->abi_extension.scheduled) {
260 aws_priority_queue_remove(&scheduler->timed_queue, &task, &task->priority_queue_node);
261 }
262
263 /*
264 * No need to log cancellation specially; it will get logged during the run call with the canceled status
265 */
266 aws_task_run(task, AWS_TASK_STATUS_CANCELED);
267 }
268