1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/common/thread_scheduler.h>
7 
8 #include <aws/common/clock.h>
9 #include <aws/common/condition_variable.h>
10 #include <aws/common/task_scheduler.h>
11 #include <aws/testing/aws_test_harness.h>
12 
13 struct executed_task_data {
14     struct aws_task *task;
15     void *arg;
16     enum aws_task_status status;
17 };
18 
19 static struct executed_task_data s_executed_tasks[16];
20 static struct aws_mutex s_test_mutex = AWS_MUTEX_INIT;
21 static struct aws_condition_variable s_test_c_var = AWS_CONDITION_VARIABLE_INIT;
22 
23 static size_t s_executed_tasks_n;
24 
25 /* Updates tl_executed_tasks and tl_executed_task_n when function is executed */
s_task_n_fn(struct aws_task * task,void * arg,enum aws_task_status status)26 static void s_task_n_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
27     AWS_LOGF_INFO(AWS_LS_COMMON_GENERAL, "Invoking task");
28     aws_mutex_lock(&s_test_mutex);
29     AWS_LOGF_INFO(AWS_LS_COMMON_GENERAL, "Mutex Acquired");
30     if (s_executed_tasks_n > AWS_ARRAY_SIZE(s_executed_tasks)) {
31         AWS_ASSERT(0);
32     }
33 
34     struct executed_task_data *data = &s_executed_tasks[s_executed_tasks_n++];
35     data->task = task;
36     data->arg = arg;
37     data->status = status;
38     aws_mutex_unlock(&s_test_mutex);
39     AWS_LOGF_INFO(AWS_LS_COMMON_GENERAL, "Mutex Released, notifying");
40 
41     aws_condition_variable_notify_one(&s_test_c_var);
42 }
43 
s_scheduled_tasks_ran_predicate(void * arg)44 static bool s_scheduled_tasks_ran_predicate(void *arg) {
45     size_t *waiting_for = arg;
46 
47     return *waiting_for == s_executed_tasks_n;
48 }
49 
s_test_scheduler_ordering(struct aws_allocator * allocator,void * ctx)50 static int s_test_scheduler_ordering(struct aws_allocator *allocator, void *ctx) {
51     (void)ctx;
52 
53     aws_common_library_init(allocator);
54     s_executed_tasks_n = 0;
55 
56     struct aws_thread_scheduler *thread_scheduler = aws_thread_scheduler_new(allocator, NULL);
57     ASSERT_NOT_NULL(thread_scheduler);
58 
59     struct aws_task task2;
60     aws_task_init(&task2, s_task_n_fn, (void *)2, "scheduler_ordering_1");
61 
62     /* schedule 250 ms in the future. */
63     uint64_t task2_timestamp = 0;
64     aws_high_res_clock_get_ticks(&task2_timestamp);
65     task2_timestamp += 250000000;
66     aws_thread_scheduler_schedule_future(thread_scheduler, &task2, task2_timestamp);
67 
68     struct aws_task task1;
69     aws_task_init(&task1, s_task_n_fn, (void *)1, "scheduler_ordering_2");
70 
71     /* schedule now. */
72     aws_thread_scheduler_schedule_now(thread_scheduler, &task1);
73 
74     struct aws_task task3;
75     aws_task_init(&task3, s_task_n_fn, (void *)3, "scheduler_ordering_3");
76 
77     /* schedule 500 ms in the future. */
78     uint64_t task3_timestamp = 0;
79     aws_high_res_clock_get_ticks(&task3_timestamp);
80     task3_timestamp += 500000000;
81     aws_thread_scheduler_schedule_future(thread_scheduler, &task3, task3_timestamp);
82     ASSERT_SUCCESS(aws_mutex_lock(&s_test_mutex));
83     size_t expected_runs = 2;
84     ASSERT_SUCCESS(aws_condition_variable_wait_pred(
85         &s_test_c_var, &s_test_mutex, s_scheduled_tasks_ran_predicate, &expected_runs));
86 
87     ASSERT_UINT_EQUALS(2, s_executed_tasks_n);
88 
89     struct executed_task_data *task_data = &s_executed_tasks[0];
90     ASSERT_PTR_EQUALS(&task1, task_data->task);
91     ASSERT_PTR_EQUALS(task1.arg, task_data->arg);
92     ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task_data->status);
93 
94     task_data = &s_executed_tasks[1];
95     ASSERT_PTR_EQUALS(&task2, task_data->task);
96     ASSERT_PTR_EQUALS(task2.arg, task_data->arg);
97     ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task_data->status);
98 
99     expected_runs = 3;
100     ASSERT_SUCCESS(aws_condition_variable_wait_pred(
101         &s_test_c_var, &s_test_mutex, s_scheduled_tasks_ran_predicate, &expected_runs));
102     ASSERT_SUCCESS(aws_mutex_unlock(&s_test_mutex));
103 
104     /* run task 3 */
105     ASSERT_UINT_EQUALS(3, s_executed_tasks_n);
106 
107     task_data = &s_executed_tasks[2];
108     ASSERT_PTR_EQUALS(&task3, task_data->task);
109     ASSERT_PTR_EQUALS(task3.arg, task_data->arg);
110     ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task_data->status);
111 
112     aws_thread_scheduler_release(thread_scheduler);
113     aws_common_library_clean_up();
114     return 0;
115 }
116 
AWS_TEST_CASE(test_thread_scheduler_ordering,s_test_scheduler_ordering)117 AWS_TEST_CASE(test_thread_scheduler_ordering, s_test_scheduler_ordering)
118 
119 static int s_test_scheduler_happy_path_cancellation(struct aws_allocator *allocator, void *ctx) {
120     (void)ctx;
121     aws_common_library_init(allocator);
122     s_executed_tasks_n = 0;
123 
124     struct aws_thread_scheduler *thread_scheduler = aws_thread_scheduler_new(allocator, NULL);
125     ASSERT_NOT_NULL(thread_scheduler);
126 
127     struct aws_task task2;
128     aws_task_init(&task2, s_task_n_fn, (void *)2, "scheduler_ordering_1");
129 
130     /* schedule 250 ms in the future. */
131     uint64_t task2_timestamp = 0;
132     aws_high_res_clock_get_ticks(&task2_timestamp);
133     task2_timestamp += 250000000;
134     aws_thread_scheduler_schedule_future(thread_scheduler, &task2, task2_timestamp);
135 
136     struct aws_task task1;
137     aws_task_init(&task1, s_task_n_fn, (void *)1, "scheduler_ordering_2");
138 
139     /* schedule now. */
140     aws_thread_scheduler_schedule_now(thread_scheduler, &task1);
141 
142     struct aws_task task3;
143     aws_task_init(&task3, s_task_n_fn, (void *)3, "scheduler_ordering_3");
144 
145     /* schedule 500 ms in the future. */
146     uint64_t task3_timestamp = 0;
147     aws_high_res_clock_get_ticks(&task3_timestamp);
148     task3_timestamp += 500000000;
149     aws_thread_scheduler_schedule_future(thread_scheduler, &task3, task3_timestamp);
150     ASSERT_SUCCESS(aws_mutex_lock(&s_test_mutex));
151     size_t expected_runs = 2;
152     ASSERT_SUCCESS(aws_condition_variable_wait_pred(
153         &s_test_c_var, &s_test_mutex, s_scheduled_tasks_ran_predicate, &expected_runs));
154 
155     ASSERT_UINT_EQUALS(2, s_executed_tasks_n);
156 
157     struct executed_task_data *task_data = &s_executed_tasks[0];
158     ASSERT_PTR_EQUALS(&task1, task_data->task);
159     ASSERT_PTR_EQUALS(task1.arg, task_data->arg);
160     ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task_data->status);
161 
162     task_data = &s_executed_tasks[1];
163     ASSERT_PTR_EQUALS(&task2, task_data->task);
164     ASSERT_PTR_EQUALS(task2.arg, task_data->arg);
165     ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task_data->status);
166 
167     aws_thread_scheduler_cancel_task(thread_scheduler, &task3);
168     expected_runs = 3;
169     ASSERT_SUCCESS(aws_condition_variable_wait_pred(
170         &s_test_c_var, &s_test_mutex, s_scheduled_tasks_ran_predicate, &expected_runs));
171     ASSERT_SUCCESS(aws_mutex_unlock(&s_test_mutex));
172 
173     /* run task 3 */
174     ASSERT_UINT_EQUALS(3, s_executed_tasks_n);
175 
176     task_data = &s_executed_tasks[2];
177     ASSERT_PTR_EQUALS(&task3, task_data->task);
178     ASSERT_PTR_EQUALS(task3.arg, task_data->arg);
179     ASSERT_INT_EQUALS(AWS_TASK_STATUS_CANCELED, task_data->status);
180 
181     aws_thread_scheduler_release(thread_scheduler);
182     aws_common_library_clean_up();
183     return 0;
184 }
185 
186 AWS_TEST_CASE(test_thread_scheduler_happy_path_cancellation, s_test_scheduler_happy_path_cancellation)
187 
188 static struct aws_task s_cancel_task;
189 
s_schedule_and_cancel_task(struct aws_task * task,void * arg,enum aws_task_status status)190 static void s_schedule_and_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status) {
191     struct aws_thread_scheduler *scheduler = arg;
192 
193     aws_task_init(&s_cancel_task, s_task_n_fn, (void *)2, "scheduler_ordering_2");
194     aws_thread_scheduler_schedule_now(scheduler, &s_cancel_task);
195     aws_thread_scheduler_cancel_task(scheduler, &s_cancel_task);
196     s_task_n_fn(task, arg, status);
197 }
198 
199 /* schedule a task. Inside that task schedule and then immediately cancel it. This will exercise the pending to be
200  * scheduled code path. */
s_test_scheduler_cancellation_for_pending_scheduled_task(struct aws_allocator * allocator,void * ctx)201 static int s_test_scheduler_cancellation_for_pending_scheduled_task(struct aws_allocator *allocator, void *ctx) {
202     (void)ctx;
203     aws_common_library_init(allocator);
204     s_executed_tasks_n = 0;
205 
206     struct aws_thread_scheduler *thread_scheduler = aws_thread_scheduler_new(allocator, NULL);
207     ASSERT_NOT_NULL(thread_scheduler);
208 
209     struct aws_task task1;
210     aws_task_init(&task1, s_schedule_and_cancel_task, thread_scheduler, "scheduler_ordering_1");
211     aws_thread_scheduler_schedule_now(thread_scheduler, &task1);
212 
213     ASSERT_SUCCESS(aws_mutex_lock(&s_test_mutex));
214     size_t expected_runs = 2;
215     ASSERT_SUCCESS(aws_condition_variable_wait_pred(
216         &s_test_c_var, &s_test_mutex, s_scheduled_tasks_ran_predicate, &expected_runs));
217 
218     ASSERT_SUCCESS(aws_mutex_unlock(&s_test_mutex));
219 
220     ASSERT_UINT_EQUALS(2, s_executed_tasks_n);
221 
222     struct executed_task_data *task_data = &s_executed_tasks[0];
223     ASSERT_PTR_EQUALS(&task1, task_data->task);
224     ASSERT_PTR_EQUALS(task1.arg, task_data->arg);
225     ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task_data->status);
226 
227     task_data = &s_executed_tasks[1];
228     ASSERT_PTR_EQUALS(&s_cancel_task, task_data->task);
229     ASSERT_INT_EQUALS(AWS_TASK_STATUS_CANCELED, task_data->status);
230 
231     aws_thread_scheduler_release(thread_scheduler);
232     aws_common_library_clean_up();
233     return 0;
234 }
235 
236 AWS_TEST_CASE(
237     test_scheduler_cancellation_for_pending_scheduled_task,
238     s_test_scheduler_cancellation_for_pending_scheduled_task)
239