1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/common/task_scheduler.h>
7 
8 #include <aws/common/logging.h>
9 
10 #include <inttypes.h>
11 
/* Size argument handed to aws_priority_queue_init_dynamic() when the scheduler's timed_queue is created.
 * NOTE(review): presumably the initial capacity in items - confirm against the priority queue API. */
static const size_t DEFAULT_QUEUE_SIZE = 7;
13 
/**
 * Prepares a task for use: the whole struct is zeroed, then the callback,
 * its user argument and the logging tag are recorded.
 *
 * @param task      Task to initialize; every field is overwritten.
 * @param fn        Callback invoked by aws_task_run().
 * @param arg       Opaque pointer passed back to fn when the task runs.
 * @param type_tag  String printed via "%s" in this file's log messages.
 */
void aws_task_init(struct aws_task *task, aws_task_fn *fn, void *arg, const char *type_tag) {
    AWS_ZERO_STRUCT(*task);

    task->type_tag = type_tag;
    task->arg = arg;
    task->fn = fn;
}
20 
aws_task_status_to_c_str(enum aws_task_status status)21 const char *aws_task_status_to_c_str(enum aws_task_status status) {
22     switch (status) {
23         case AWS_TASK_STATUS_RUN_READY:
24             return "<Running>";
25 
26         case AWS_TASK_STATUS_CANCELED:
27             return "<Canceled>";
28 
29         default:
30             return "<Unknown>";
31     }
32 }
33 
aws_task_run(struct aws_task * task,enum aws_task_status status)34 void aws_task_run(struct aws_task *task, enum aws_task_status status) {
35     AWS_ASSERT(task->fn);
36     AWS_LOGF_DEBUG(
37         AWS_LS_COMMON_TASK_SCHEDULER,
38         "id=%p: Running %s task with %s status",
39         (void *)task,
40         task->type_tag,
41         aws_task_status_to_c_str(status));
42 
43     task->abi_extension.scheduled = false;
44     task->fn(task, task->arg, status);
45 }
46 
s_compare_timestamps(const void * a,const void * b)47 static int s_compare_timestamps(const void *a, const void *b) {
48     uint64_t a_time = (*(struct aws_task **)a)->timestamp;
49     uint64_t b_time = (*(struct aws_task **)b)->timestamp;
50     return a_time > b_time; /* min-heap */
51 }
52 
/* Forward declaration: s_run_all() is defined near the end of this file, but
 * aws_task_scheduler_clean_up() calls it before that definition appears. */
static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status);
54 
aws_task_scheduler_init(struct aws_task_scheduler * scheduler,struct aws_allocator * alloc)55 int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_allocator *alloc) {
56     AWS_ASSERT(alloc);
57 
58     AWS_ZERO_STRUCT(*scheduler);
59 
60     if (aws_priority_queue_init_dynamic(
61             &scheduler->timed_queue, alloc, DEFAULT_QUEUE_SIZE, sizeof(struct aws_task *), &s_compare_timestamps)) {
62         return AWS_OP_ERR;
63     };
64 
65     scheduler->alloc = alloc;
66     aws_linked_list_init(&scheduler->timed_list);
67     aws_linked_list_init(&scheduler->asap_list);
68 
69     AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler));
70     return AWS_OP_SUCCESS;
71 }
72 
aws_task_scheduler_clean_up(struct aws_task_scheduler * scheduler)73 void aws_task_scheduler_clean_up(struct aws_task_scheduler *scheduler) {
74     AWS_ASSERT(scheduler);
75 
76     if (aws_task_scheduler_is_valid(scheduler)) {
77         /* Execute all remaining tasks as CANCELED.
78          * Do this in a loop so that tasks scheduled by other tasks are executed */
79         while (aws_task_scheduler_has_tasks(scheduler, NULL)) {
80             s_run_all(scheduler, UINT64_MAX, AWS_TASK_STATUS_CANCELED);
81         }
82     }
83 
84     aws_priority_queue_clean_up(&scheduler->timed_queue);
85     AWS_ZERO_STRUCT(*scheduler);
86 }
87 
aws_task_scheduler_is_valid(const struct aws_task_scheduler * scheduler)88 bool aws_task_scheduler_is_valid(const struct aws_task_scheduler *scheduler) {
89     return scheduler && scheduler->alloc && aws_priority_queue_is_valid(&scheduler->timed_queue) &&
90            aws_linked_list_is_valid(&scheduler->asap_list) && aws_linked_list_is_valid(&scheduler->timed_list);
91 }
92 
aws_task_scheduler_has_tasks(const struct aws_task_scheduler * scheduler,uint64_t * next_task_time)93 bool aws_task_scheduler_has_tasks(const struct aws_task_scheduler *scheduler, uint64_t *next_task_time) {
94     AWS_ASSERT(scheduler);
95 
96     uint64_t timestamp = UINT64_MAX;
97     bool has_tasks = false;
98 
99     if (!aws_linked_list_empty(&scheduler->asap_list)) {
100         timestamp = 0;
101         has_tasks = true;
102 
103     } else {
104         /* Check whether timed_list or timed_queue has the earlier task */
105         if (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
106             struct aws_linked_list_node *node = aws_linked_list_front(&scheduler->timed_list);
107             struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
108             timestamp = task->timestamp;
109             has_tasks = true;
110         }
111 
112         struct aws_task **task_ptrptr = NULL;
113         if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&task_ptrptr) == AWS_OP_SUCCESS) {
114             if ((*task_ptrptr)->timestamp < timestamp) {
115                 timestamp = (*task_ptrptr)->timestamp;
116             }
117             has_tasks = true;
118         }
119     }
120 
121     if (next_task_time) {
122         *next_task_time = timestamp;
123     }
124     return has_tasks;
125 }
126 
aws_task_scheduler_schedule_now(struct aws_task_scheduler * scheduler,struct aws_task * task)127 void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struct aws_task *task) {
128     AWS_ASSERT(scheduler);
129     AWS_ASSERT(task);
130     AWS_ASSERT(task->fn);
131 
132     AWS_LOGF_DEBUG(
133         AWS_LS_COMMON_TASK_SCHEDULER,
134         "id=%p: Scheduling %s task for immediate execution",
135         (void *)task,
136         task->type_tag);
137 
138     task->priority_queue_node.current_index = SIZE_MAX;
139     aws_linked_list_node_reset(&task->node);
140     task->timestamp = 0;
141 
142     aws_linked_list_push_back(&scheduler->asap_list, &task->node);
143     task->abi_extension.scheduled = true;
144 }
145 
aws_task_scheduler_schedule_future(struct aws_task_scheduler * scheduler,struct aws_task * task,uint64_t time_to_run)146 void aws_task_scheduler_schedule_future(
147     struct aws_task_scheduler *scheduler,
148     struct aws_task *task,
149     uint64_t time_to_run) {
150 
151     AWS_ASSERT(scheduler);
152     AWS_ASSERT(task);
153     AWS_ASSERT(task->fn);
154 
155     AWS_LOGF_DEBUG(
156         AWS_LS_COMMON_TASK_SCHEDULER,
157         "id=%p: Scheduling %s task for future execution at time %" PRIu64,
158         (void *)task,
159         task->type_tag,
160         time_to_run);
161 
162     task->timestamp = time_to_run;
163 
164     task->priority_queue_node.current_index = SIZE_MAX;
165     aws_linked_list_node_reset(&task->node);
166     int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node);
167     if (AWS_UNLIKELY(err)) {
168         /* In the (very unlikely) case that we can't push into the timed_queue,
169          * perform a sorted insertion into timed_list. */
170         struct aws_linked_list_node *node_i;
171         for (node_i = aws_linked_list_begin(&scheduler->timed_list);
172              node_i != aws_linked_list_end(&scheduler->timed_list);
173              node_i = aws_linked_list_next(node_i)) {
174 
175             struct aws_task *task_i = AWS_CONTAINER_OF(node_i, struct aws_task, node);
176             if (task_i->timestamp > time_to_run) {
177                 break;
178             }
179         }
180         aws_linked_list_insert_before(node_i, &task->node);
181     }
182     task->abi_extension.scheduled = true;
183 }
184 
aws_task_scheduler_run_all(struct aws_task_scheduler * scheduler,uint64_t current_time)185 void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) {
186     AWS_ASSERT(scheduler);
187 
188     s_run_all(scheduler, current_time, AWS_TASK_STATUS_RUN_READY);
189 }
190 
s_run_all(struct aws_task_scheduler * scheduler,uint64_t current_time,enum aws_task_status status)191 static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status) {
192 
193     /* Move scheduled tasks to running_list before executing.
194      * This gives us the desired behavior that: if executing a task results in another task being scheduled,
195      * that new task is not executed until the next time run() is invoked. */
196     struct aws_linked_list running_list;
197     aws_linked_list_init(&running_list);
198 
199     /* First move everything from asap_list */
200     aws_linked_list_swap_contents(&running_list, &scheduler->asap_list);
201 
202     /* Next move tasks from timed_queue and timed_list, based on whichever's next-task is sooner.
203      * It's very unlikely that any tasks are in timed_list, so once it has no more valid tasks,
204      * break out of this complex loop in favor of a simpler one. */
205     while (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
206 
207         struct aws_linked_list_node *timed_list_node = aws_linked_list_begin(&scheduler->timed_list);
208         struct aws_task *timed_list_task = AWS_CONTAINER_OF(timed_list_node, struct aws_task, node);
209         if (timed_list_task->timestamp > current_time) {
210             /* timed_list is out of valid tasks, break out of complex loop */
211             break;
212         }
213 
214         /* Check if timed_queue has a task which is sooner */
215         struct aws_task **timed_queue_task_ptrptr = NULL;
216         if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
217             if ((*timed_queue_task_ptrptr)->timestamp <= current_time) {
218                 if ((*timed_queue_task_ptrptr)->timestamp < timed_list_task->timestamp) {
219                     /* Take task from timed_queue */
220                     struct aws_task *timed_queue_task;
221                     aws_priority_queue_pop(&scheduler->timed_queue, &timed_queue_task);
222                     aws_linked_list_push_back(&running_list, &timed_queue_task->node);
223                     continue;
224                 }
225             }
226         }
227 
228         /* Take task from timed_list */
229         aws_linked_list_pop_front(&scheduler->timed_list);
230         aws_linked_list_push_back(&running_list, &timed_list_task->node);
231     }
232 
233     /* Simpler loop that moves remaining valid tasks from timed_queue */
234     struct aws_task **timed_queue_task_ptrptr = NULL;
235     while (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
236         if ((*timed_queue_task_ptrptr)->timestamp > current_time) {
237             break;
238         }
239 
240         struct aws_task *next_timed_task;
241         aws_priority_queue_pop(&scheduler->timed_queue, &next_timed_task);
242         aws_linked_list_push_back(&running_list, &next_timed_task->node);
243     }
244 
245     /* Run tasks */
246     while (!aws_linked_list_empty(&running_list)) {
247         struct aws_linked_list_node *task_node = aws_linked_list_pop_front(&running_list);
248         struct aws_task *task = AWS_CONTAINER_OF(task_node, struct aws_task, node);
249         aws_task_run(task, status);
250     }
251 }
252 
aws_task_scheduler_cancel_task(struct aws_task_scheduler * scheduler,struct aws_task * task)253 void aws_task_scheduler_cancel_task(struct aws_task_scheduler *scheduler, struct aws_task *task) {
254     /* attempt the linked lists first since those will be faster access and more likely to occur
255      * anyways.
256      */
257     if (task->node.next) {
258         aws_linked_list_remove(&task->node);
259     } else if (task->abi_extension.scheduled) {
260         aws_priority_queue_remove(&scheduler->timed_queue, &task, &task->priority_queue_node);
261     }
262 
263     /*
264      * No need to log cancellation specially; it will get logged during the run call with the canceled status
265      */
266     aws_task_run(task, AWS_TASK_STATUS_CANCELED);
267 }
268