1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/io/channel.h>
7 
8 #include <aws/common/atomics.h>
9 #include <aws/common/clock.h>
10 #include <aws/common/mutex.h>
11 
12 #include <aws/io/event_loop.h>
13 #include <aws/io/logging.h>
14 #include <aws/io/message_pool.h>
15 #include <aws/io/statistics.h>
16 
17 #if _MSC_VER
18 #    pragma warning(disable : 4204) /* non-constant aggregate initializer */
19 #endif
20 
/* Key under which the message pool is stored in (and fetched from) the event loop's
 * local-object storage. Only the variable's address is used; its value is irrelevant. */
static size_t s_message_pool_key = 0; /* Address of variable serves as key in hash table */

enum {
    KB_16 = 16 * 1024, /* 16 * 1024 = 16384 */
};

/* Used in this file as the message pool's application-data message size and, doubled,
 * as the window-update batching threshold when read back-pressure is enabled. */
size_t g_aws_channel_max_fragment_size = KB_16;

/* Initial capacity (in elements) requested for each channel's statistic_list. */
#define INITIAL_STATISTIC_LIST_SIZE 5
30 
/* Lifecycle states of a channel.
 * Declaration order matters: s_shutdown_task() uses `<` against AWS_CHANNEL_SHUTTING_DOWN
 * to decide whether shutdown still has to be started. */
enum aws_channel_state {
    AWS_CHANNEL_SETTING_UP, /* assigned by aws_channel_new() */
    AWS_CHANNEL_ACTIVE, /* assigned by the setup task on success */
    AWS_CHANNEL_SHUTTING_DOWN, /* assigned when the shutdown task begins */
    AWS_CHANNEL_SHUT_DOWN, /* shutdown finished; channel tasks run after this are canceled */
};
37 
/* Event-loop task, plus the data that travels with it, used to deliver the
 * shutdown-completed notification (see its use in s_shutdown_task()). */
struct aws_shutdown_notification_task {
    struct aws_task task;
    int error_code; /* error code the shutdown was requested with */
    struct aws_channel_slot *slot;
    bool shutdown_immediately;
};
44 
/* Channel task plus the arguments of a pending shutdown request.
 * Filled in by s_channel_shutdown() and read back by s_shutdown_task(). */
struct shutdown_task {
    struct aws_channel_task task;
    struct aws_channel *channel;
    int error_code;
    bool shutdown_immediately;
};
51 
/* A channel: a chain of slots (linked through adj_left/adj_right) bound to one event loop. */
struct aws_channel {
    struct aws_allocator *alloc; /* allocator the channel and its slots are allocated from */
    struct aws_event_loop *loop; /* event loop all of the channel's tasks are scheduled on */
    struct aws_channel_slot *first; /* head of the slot chain, NULL while there are no slots */
    struct aws_message_pool *msg_pool; /* assigned by the setup task from event-loop local storage */
    enum aws_channel_state channel_state;
    struct aws_shutdown_notification_task shutdown_notify_task;
    aws_channel_on_shutdown_completed_fn *on_shutdown_completed; /* optional, checked for NULL before use */
    void *shutdown_user_data;
    struct aws_atomic_var refcount; /* number of holds; the channel is freed once it reaches 0 */
    struct aws_task deletion_task; /* frees the channel on the event-loop thread when the last hold
                                    * is released from another thread */

    struct aws_task statistics_task;
    struct aws_crt_statistics_handler *statistics_handler;
    uint64_t statistics_interval_start_time_ms;
    struct aws_array_list statistic_list; /* element size is sizeof(struct aws_crt_statistics_base *) */

    /* Channel tasks scheduled with the event loop from the event-loop thread. */
    struct {
        struct aws_linked_list list;
    } channel_thread_tasks;
    /* Channel tasks handed over from other threads.
     * `list` and `is_channel_shut_down` are accessed while holding `lock` in this file. */
    struct {
        struct aws_mutex lock;
        struct aws_linked_list list;
        struct aws_task scheduling_task; /* runs s_schedule_cross_thread_tasks() to drain `list` */
        struct shutdown_task shutdown_task; /* a non-NULL task_fn means a shutdown is already pending */
        bool is_channel_shut_down;
    } cross_thread_tasks;

    size_t window_update_batch_emit_threshold; /* only set when read back-pressure is enabled */
    struct aws_channel_task window_update_task;
    bool read_back_pressure_enabled;
    bool window_update_in_progress;
};
85 
/* Heap-allocated arguments for the setup task scheduled by aws_channel_new().
 * Freed by s_on_channel_setup_complete() after the callback has been invoked. */
struct channel_setup_args {
    struct aws_allocator *alloc; /* allocator this struct was allocated from */
    struct aws_channel *channel;
    aws_channel_on_setup_completed_fn *on_setup_completed;
    void *user_data; /* passed back to on_setup_completed */
    struct aws_task task; /* the setup task itself */
};
93 
s_on_msg_pool_removed(struct aws_event_loop_local_object * object)94 static void s_on_msg_pool_removed(struct aws_event_loop_local_object *object) {
95     struct aws_message_pool *msg_pool = object->object;
96     AWS_LOGF_TRACE(
97         AWS_LS_IO_CHANNEL,
98         "static: message pool %p has been purged "
99         "from the event-loop: likely because of shutdown",
100         (void *)msg_pool);
101     struct aws_allocator *alloc = msg_pool->alloc;
102     aws_message_pool_clean_up(msg_pool);
103     aws_mem_release(alloc, msg_pool);
104     aws_mem_release(alloc, object);
105 }
106 
s_on_channel_setup_complete(struct aws_task * task,void * arg,enum aws_task_status task_status)107 static void s_on_channel_setup_complete(struct aws_task *task, void *arg, enum aws_task_status task_status) {
108 
109     (void)task;
110     struct channel_setup_args *setup_args = arg;
111     struct aws_message_pool *message_pool = NULL;
112     struct aws_event_loop_local_object *local_object = NULL;
113 
114     AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: setup complete, notifying caller.", (void *)setup_args->channel);
115     if (task_status == AWS_TASK_STATUS_RUN_READY) {
116         struct aws_event_loop_local_object stack_obj;
117         AWS_ZERO_STRUCT(stack_obj);
118         local_object = &stack_obj;
119 
120         if (aws_event_loop_fetch_local_object(setup_args->channel->loop, &s_message_pool_key, local_object)) {
121 
122             local_object = aws_mem_calloc(setup_args->alloc, 1, sizeof(struct aws_event_loop_local_object));
123             if (!local_object) {
124                 goto cleanup_setup_args;
125             }
126 
127             message_pool = aws_mem_acquire(setup_args->alloc, sizeof(struct aws_message_pool));
128             if (!message_pool) {
129                 goto cleanup_local_obj;
130             }
131 
132             AWS_LOGF_DEBUG(
133                 AWS_LS_IO_CHANNEL,
134                 "id=%p: no message pool is currently stored in the event-loop "
135                 "local storage, adding %p with max message size %zu, "
136                 "message count 4, with 4 small blocks of 128 bytes.",
137                 (void *)setup_args->channel,
138                 (void *)message_pool,
139                 g_aws_channel_max_fragment_size);
140 
141             struct aws_message_pool_creation_args creation_args = {
142                 .application_data_msg_data_size = g_aws_channel_max_fragment_size,
143                 .application_data_msg_count = 4,
144                 .small_block_msg_count = 4,
145                 .small_block_msg_data_size = 128,
146             };
147 
148             if (aws_message_pool_init(message_pool, setup_args->alloc, &creation_args)) {
149                 goto cleanup_msg_pool_mem;
150             }
151 
152             local_object->key = &s_message_pool_key;
153             local_object->object = message_pool;
154             local_object->on_object_removed = s_on_msg_pool_removed;
155 
156             if (aws_event_loop_put_local_object(setup_args->channel->loop, local_object)) {
157                 goto cleanup_msg_pool;
158             }
159         } else {
160             message_pool = local_object->object;
161             AWS_LOGF_DEBUG(
162                 AWS_LS_IO_CHANNEL,
163                 "id=%p: message pool %p found in event-loop local storage: using it.",
164                 (void *)setup_args->channel,
165                 (void *)message_pool)
166         }
167 
168         setup_args->channel->msg_pool = message_pool;
169         setup_args->channel->channel_state = AWS_CHANNEL_ACTIVE;
170         setup_args->on_setup_completed(setup_args->channel, AWS_OP_SUCCESS, setup_args->user_data);
171         aws_channel_release_hold(setup_args->channel);
172         aws_mem_release(setup_args->alloc, setup_args);
173         return;
174     }
175 
176     goto cleanup_setup_args;
177 
178 cleanup_msg_pool:
179     aws_message_pool_clean_up(message_pool);
180 
181 cleanup_msg_pool_mem:
182     aws_mem_release(setup_args->alloc, message_pool);
183 
184 cleanup_local_obj:
185     aws_mem_release(setup_args->alloc, local_object);
186 
187 cleanup_setup_args:
188     setup_args->on_setup_completed(setup_args->channel, AWS_OP_ERR, setup_args->user_data);
189     aws_channel_release_hold(setup_args->channel);
190     aws_mem_release(setup_args->alloc, setup_args);
191 }
192 
/* Forward declaration: aws_channel_new() installs this as the function of
 * cross_thread_tasks.scheduling_task before its definition appears. */
static void s_schedule_cross_thread_tasks(struct aws_task *task, void *arg, enum aws_task_status status);
194 
s_destroy_partially_constructed_channel(struct aws_channel * channel)195 static void s_destroy_partially_constructed_channel(struct aws_channel *channel) {
196     if (channel == NULL) {
197         return;
198     }
199 
200     aws_array_list_clean_up(&channel->statistic_list);
201 
202     aws_mem_release(channel->alloc, channel);
203 }
204 
aws_channel_new(struct aws_allocator * alloc,const struct aws_channel_options * creation_args)205 struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aws_channel_options *creation_args) {
206     AWS_PRECONDITION(creation_args);
207     AWS_PRECONDITION(creation_args->event_loop);
208     AWS_PRECONDITION(creation_args->on_setup_completed);
209 
210     struct aws_channel *channel = aws_mem_calloc(alloc, 1, sizeof(struct aws_channel));
211     if (!channel) {
212         return NULL;
213     }
214 
215     AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: Beginning creation and setup of new channel.", (void *)channel);
216     channel->alloc = alloc;
217     channel->loop = creation_args->event_loop;
218     channel->on_shutdown_completed = creation_args->on_shutdown_completed;
219     channel->shutdown_user_data = creation_args->shutdown_user_data;
220 
221     if (aws_array_list_init_dynamic(
222             &channel->statistic_list, alloc, INITIAL_STATISTIC_LIST_SIZE, sizeof(struct aws_crt_statistics_base *))) {
223         goto on_error;
224     }
225 
226     /* Start refcount at 2:
227      * 1 for self-reference, released from aws_channel_destroy()
228      * 1 for the setup task, released when task executes */
229     aws_atomic_init_int(&channel->refcount, 2);
230 
231     struct channel_setup_args *setup_args = aws_mem_calloc(alloc, 1, sizeof(struct channel_setup_args));
232     if (!setup_args) {
233         goto on_error;
234     }
235 
236     channel->channel_state = AWS_CHANNEL_SETTING_UP;
237     aws_linked_list_init(&channel->channel_thread_tasks.list);
238     aws_linked_list_init(&channel->cross_thread_tasks.list);
239     channel->cross_thread_tasks.lock = (struct aws_mutex)AWS_MUTEX_INIT;
240 
241     if (creation_args->enable_read_back_pressure) {
242         channel->read_back_pressure_enabled = true;
243         /* we probably only need room for one fragment, but let's avoid potential deadlocks
244          * on things like tls that need extra head-room. */
245         channel->window_update_batch_emit_threshold = g_aws_channel_max_fragment_size * 2;
246     }
247 
248     aws_task_init(
249         &channel->cross_thread_tasks.scheduling_task,
250         s_schedule_cross_thread_tasks,
251         channel,
252         "schedule_cross_thread_tasks");
253 
254     setup_args->alloc = alloc;
255     setup_args->channel = channel;
256     setup_args->on_setup_completed = creation_args->on_setup_completed;
257     setup_args->user_data = creation_args->setup_user_data;
258 
259     aws_task_init(&setup_args->task, s_on_channel_setup_complete, setup_args, "on_channel_setup_complete");
260     aws_event_loop_schedule_task_now(creation_args->event_loop, &setup_args->task);
261 
262     return channel;
263 
264 on_error:
265 
266     s_destroy_partially_constructed_channel(channel);
267 
268     return NULL;
269 }
270 
s_cleanup_slot(struct aws_channel_slot * slot)271 static void s_cleanup_slot(struct aws_channel_slot *slot) {
272     if (slot) {
273         if (slot->handler) {
274             aws_channel_handler_destroy(slot->handler);
275         }
276         aws_mem_release(slot->alloc, slot);
277     }
278 }
279 
/* Drops one hold on the channel (the self-reference set up in aws_channel_new()).
 * The channel memory is only freed once every hold has been released. */
void aws_channel_destroy(struct aws_channel *channel) {
    AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: destroying channel.", (void *)channel);

    aws_channel_release_hold(channel);
}
285 
s_final_channel_deletion_task(struct aws_task * task,void * arg,enum aws_task_status status)286 static void s_final_channel_deletion_task(struct aws_task *task, void *arg, enum aws_task_status status) {
287     (void)task;
288     (void)status;
289     struct aws_channel *channel = arg;
290 
291     struct aws_channel_slot *current = channel->first;
292 
293     if (!current || !current->handler) {
294         /* Allow channels with no valid slots to skip shutdown process */
295         channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
296     }
297 
298     AWS_ASSERT(channel->channel_state == AWS_CHANNEL_SHUT_DOWN);
299 
300     while (current) {
301         struct aws_channel_slot *tmp = current->adj_right;
302         s_cleanup_slot(current);
303         current = tmp;
304     }
305 
306     aws_array_list_clean_up(&channel->statistic_list);
307 
308     aws_channel_set_statistics_handler(channel, NULL);
309 
310     aws_mem_release(channel->alloc, channel);
311 }
312 
/* Adds one hold to the channel. Asserts that the channel still had at least one hold,
 * i.e. that it is not already on its way to being freed. */
void aws_channel_acquire_hold(struct aws_channel *channel) {
    size_t prev_refcount = aws_atomic_fetch_add(&channel->refcount, 1);
    AWS_ASSERT(prev_refcount != 0);
    (void)prev_refcount; /* silences unused-variable warnings when the assert compiles away */
}
318 
aws_channel_release_hold(struct aws_channel * channel)319 void aws_channel_release_hold(struct aws_channel *channel) {
320     size_t prev_refcount = aws_atomic_fetch_sub(&channel->refcount, 1);
321     AWS_ASSERT(prev_refcount != 0);
322 
323     if (prev_refcount == 1) {
324         /* Refcount is now 0, finish cleaning up channel memory. */
325         if (aws_channel_thread_is_callers_thread(channel)) {
326             s_final_channel_deletion_task(NULL, channel, AWS_TASK_STATUS_RUN_READY);
327         } else {
328             aws_task_init(&channel->deletion_task, s_final_channel_deletion_task, channel, "final_channel_deletion");
329             aws_event_loop_schedule_task_now(channel->loop, &channel->deletion_task);
330         }
331     }
332 }
333 
/* Arguments bundled with a task for a channel shutdown.
 * NOTE(review): not referenced anywhere in the visible part of this file - confirm it is still used. */
struct channel_shutdown_task_args {
    struct aws_channel *channel;
    struct aws_allocator *alloc;
    int error_code;
    struct aws_task task;
};
340 
/* Forward declaration: records a shutdown request and schedules s_shutdown_task() for it. */
static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately);

/* Forward declaration: task function used for channel->shutdown_notify_task. */
static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);
344 
s_shutdown_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)345 static void s_shutdown_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
346 
347     (void)task;
348     (void)status;
349     struct shutdown_task *shutdown_task = arg;
350     struct aws_channel *channel = shutdown_task->channel;
351     int error_code = shutdown_task->error_code;
352     bool shutdown_immediately = shutdown_task->shutdown_immediately;
353     if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
354         AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);
355 
356         struct aws_channel_slot *slot = channel->first;
357         channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;
358 
359         if (slot) {
360             AWS_LOGF_TRACE(
361                 AWS_LS_IO_CHANNEL,
362                 "id=%p: shutting down slot %p (the first one) in the read direction",
363                 (void *)channel,
364                 (void *)slot);
365 
366             aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
367             return;
368         }
369 
370         channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
371         AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);
372 
373         aws_mutex_lock(&channel->cross_thread_tasks.lock);
374         channel->cross_thread_tasks.is_channel_shut_down = true;
375         aws_mutex_unlock(&channel->cross_thread_tasks.lock);
376 
377         if (channel->on_shutdown_completed) {
378             channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
379             channel->shutdown_notify_task.task.arg = channel;
380             channel->shutdown_notify_task.error_code = error_code;
381             aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
382         }
383     }
384 }
385 
s_channel_shutdown(struct aws_channel * channel,int error_code,bool shutdown_immediately)386 static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
387     bool need_to_schedule = true;
388     aws_mutex_lock(&channel->cross_thread_tasks.lock);
389     if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
390         need_to_schedule = false;
391         AWS_LOGF_DEBUG(
392             AWS_LS_IO_CHANNEL, "id=%p: Channel shutdown is already pending, not scheduling another.", (void *)channel);
393 
394     } else {
395         aws_channel_task_init(
396             &channel->cross_thread_tasks.shutdown_task.task,
397             s_shutdown_task,
398             &channel->cross_thread_tasks.shutdown_task,
399             "channel_shutdown");
400         channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
401         channel->cross_thread_tasks.shutdown_task.channel = channel;
402         channel->cross_thread_tasks.shutdown_task.error_code = error_code;
403     }
404 
405     aws_mutex_unlock(&channel->cross_thread_tasks.lock);
406 
407     if (need_to_schedule) {
408         AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel shutdown task is scheduled", (void *)channel);
409         aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
410     }
411 
412     return AWS_OP_SUCCESS;
413 }
414 
/* Requests a shutdown of the channel with the given error code, passing `false` for
 * shutdown_immediately. Returns the result of s_channel_shutdown(). */
int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
    return s_channel_shutdown(channel, error_code, false);
}
418 
aws_channel_acquire_message_from_pool(struct aws_channel * channel,enum aws_io_message_type message_type,size_t size_hint)419 struct aws_io_message *aws_channel_acquire_message_from_pool(
420     struct aws_channel *channel,
421     enum aws_io_message_type message_type,
422     size_t size_hint) {
423 
424     struct aws_io_message *message = aws_message_pool_acquire(channel->msg_pool, message_type, size_hint);
425 
426     if (AWS_LIKELY(message)) {
427         message->owning_channel = channel;
428         AWS_LOGF_TRACE(
429             AWS_LS_IO_CHANNEL,
430             "id=%p: acquired message %p of capacity %zu from pool %p. Requested size was %zu",
431             (void *)channel,
432             (void *)message,
433             message->message_data.capacity,
434             (void *)channel->msg_pool,
435             size_hint);
436     }
437 
438     return message;
439 }
440 
aws_channel_slot_new(struct aws_channel * channel)441 struct aws_channel_slot *aws_channel_slot_new(struct aws_channel *channel) {
442     struct aws_channel_slot *new_slot = aws_mem_calloc(channel->alloc, 1, sizeof(struct aws_channel_slot));
443     if (!new_slot) {
444         return NULL;
445     }
446 
447     AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: creating new slot %p.", (void *)channel, (void *)new_slot);
448     new_slot->alloc = channel->alloc;
449     new_slot->channel = channel;
450 
451     if (!channel->first) {
452         channel->first = new_slot;
453     }
454 
455     return new_slot;
456 }
457 
/* Forwards to aws_event_loop_current_clock_time() on the channel's event loop;
 * the result is written to *time_nanos and that call's return value is returned. */
int aws_channel_current_clock_time(struct aws_channel *channel, uint64_t *time_nanos) {
    return aws_event_loop_current_clock_time(channel->loop, time_nanos);
}
461 
/* Forwards to aws_event_loop_fetch_local_object() on the channel's event loop and
 * returns its result. */
int aws_channel_fetch_local_object(
    struct aws_channel *channel,
    const void *key,
    struct aws_event_loop_local_object *obj) {

    /* The event-loop API takes a non-const key, hence the cast. */
    return aws_event_loop_fetch_local_object(channel->loop, (void *)key, obj);
}
/* Forwards to aws_event_loop_put_local_object() on the channel's event loop and
 * returns its result. The `key` parameter is ignored here; only `obj` is passed on. */
int aws_channel_put_local_object(
    struct aws_channel *channel,
    const void *key,
    const struct aws_event_loop_local_object *obj) {

    (void)key;
    /* The event-loop API takes a non-const object, hence the cast. */
    return aws_event_loop_put_local_object(channel->loop, (struct aws_event_loop_local_object *)obj);
}
477 
/* Forwards to aws_event_loop_remove_local_object() on the channel's event loop and
 * returns its result. */
int aws_channel_remove_local_object(
    struct aws_channel *channel,
    const void *key,
    struct aws_event_loop_local_object *removed_obj) {

    /* The event-loop API takes a non-const key, hence the cast. */
    return aws_event_loop_remove_local_object(channel->loop, (void *)key, removed_obj);
}
485 
s_channel_task_run(struct aws_task * task,void * arg,enum aws_task_status status)486 static void s_channel_task_run(struct aws_task *task, void *arg, enum aws_task_status status) {
487     struct aws_channel_task *channel_task = AWS_CONTAINER_OF(task, struct aws_channel_task, wrapper_task);
488     struct aws_channel *channel = arg;
489 
490     /* Any task that runs after shutdown completes is considered canceled */
491     if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
492         status = AWS_TASK_STATUS_CANCELED;
493     }
494 
495     aws_linked_list_remove(&channel_task->node);
496     channel_task->task_fn(channel_task, channel_task->arg, status);
497 }
498 
s_schedule_cross_thread_tasks(struct aws_task * task,void * arg,enum aws_task_status status)499 static void s_schedule_cross_thread_tasks(struct aws_task *task, void *arg, enum aws_task_status status) {
500     (void)task;
501     struct aws_channel *channel = arg;
502 
503     struct aws_linked_list cross_thread_task_list;
504     aws_linked_list_init(&cross_thread_task_list);
505 
506     /* Grab contents of cross-thread task list while we have the lock */
507     aws_mutex_lock(&channel->cross_thread_tasks.lock);
508     aws_linked_list_swap_contents(&channel->cross_thread_tasks.list, &cross_thread_task_list);
509     aws_mutex_unlock(&channel->cross_thread_tasks.lock);
510 
511     /* If the channel has shut down since the cross-thread tasks were scheduled, run tasks immediately as canceled */
512     if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
513         status = AWS_TASK_STATUS_CANCELED;
514     }
515 
516     while (!aws_linked_list_empty(&cross_thread_task_list)) {
517         struct aws_linked_list_node *node = aws_linked_list_pop_front(&cross_thread_task_list);
518         struct aws_channel_task *channel_task = AWS_CONTAINER_OF(node, struct aws_channel_task, node);
519 
520         if ((channel_task->wrapper_task.timestamp == 0) || (status == AWS_TASK_STATUS_CANCELED)) {
521             /* Run "now" tasks, and canceled tasks, immediately */
522             channel_task->task_fn(channel_task, channel_task->arg, status);
523         } else {
524             /* "Future" tasks are scheduled with the event-loop. */
525             aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
526             aws_event_loop_schedule_task_future(
527                 channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
528         }
529     }
530 }
531 
/* Zeroes the whole channel task, then stores the user's function, its argument and the
 * type tag. The wrapper task and list node are set up later, when the task is scheduled. */
void aws_channel_task_init(
    struct aws_channel_task *channel_task,
    aws_channel_task_fn *task_fn,
    void *arg,
    const char *type_tag) {
    AWS_ZERO_STRUCT(*channel_task);
    channel_task->task_fn = task_fn;
    channel_task->arg = arg;
    channel_task->type_tag = type_tag;
}
542 
/* Common functionality for scheduling "now" and "future" tasks.
 * For "now" tasks, pass 0 for `run_at_nanos`.
 *
 * On the channel's event-loop thread the task is scheduled directly with the event loop,
 * or run immediately as canceled if the channel has shut down.
 * From any other thread the task is queued on the cross-thread list (scheduling the drain
 * task if the list was empty), or run immediately as canceled, on the calling thread, if
 * the channel has been flagged as shut down. */
static void s_register_pending_task(
    struct aws_channel *channel,
    struct aws_channel_task *channel_task,
    uint64_t run_at_nanos) {

    /* Reset every property on channel task other than user's fn & arg.*/
    aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag);
    /* The timestamp doubles as the "now" (0) vs "future" marker read by s_schedule_cross_thread_tasks(). */
    channel_task->wrapper_task.timestamp = run_at_nanos;
    aws_linked_list_node_reset(&channel_task->node);

    if (aws_channel_thread_is_callers_thread(channel)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_CHANNEL,
            "id=%p: scheduling task with wrapper task id %p.",
            (void *)channel,
            (void *)&channel_task->wrapper_task);

        /* If channel is shut down, run task immediately as canceled */
        if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
            AWS_LOGF_DEBUG(
                AWS_LS_IO_CHANNEL,
                "id=%p: Running %s channel task immediately as canceled due to shut down channel",
                (void *)channel,
                channel_task->type_tag);
            channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
            return;
        }

        /* Track the task on the channel-thread list; s_channel_task_run() unlinks it again. */
        aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
        if (run_at_nanos == 0) {
            aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task);
        } else {
            aws_event_loop_schedule_task_future(
                channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
        }
        return;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_CHANNEL,
        "id=%p: scheduling task with wrapper task id %p from "
        "outside the event-loop thread.",
        (void *)channel,
        (void *)&channel_task->wrapper_task);
    /* Outside event-loop thread... */
    bool should_cancel_task = false;

    /* Begin Critical Section */
    aws_mutex_lock(&channel->cross_thread_tasks.lock);
    if (channel->cross_thread_tasks.is_channel_shut_down) {
        should_cancel_task = true; /* run task outside critical section to avoid deadlock */
    } else {
        bool list_was_empty = aws_linked_list_empty(&channel->cross_thread_tasks.list);
        aws_linked_list_push_back(&channel->cross_thread_tasks.list, &channel_task->node);

        /* Only the push that makes the list non-empty schedules the drain task. */
        if (list_was_empty) {
            aws_event_loop_schedule_task_now(channel->loop, &channel->cross_thread_tasks.scheduling_task);
        }
    }
    aws_mutex_unlock(&channel->cross_thread_tasks.lock);
    /* End Critical Section */

    if (should_cancel_task) {
        channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
    }
}
611 
/* Schedules a channel task as a "now" task (run_at_nanos of 0). */
void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task) {
    s_register_pending_task(channel, task, 0);
}
615 
/* Schedules a channel task to run at `run_at_nanos`.
 * Note that a value of 0 is treated as a "now" task by s_register_pending_task(). */
void aws_channel_schedule_task_future(
    struct aws_channel *channel,
    struct aws_channel_task *task,
    uint64_t run_at_nanos) {

    s_register_pending_task(channel, task, run_at_nanos);
}
623 
/* Returns whatever aws_event_loop_thread_is_callers_thread() reports for the channel's event loop. */
bool aws_channel_thread_is_callers_thread(struct aws_channel *channel) {
    return aws_event_loop_thread_is_callers_thread(channel->loop);
}
627 
s_update_channel_slot_message_overheads(struct aws_channel * channel)628 static void s_update_channel_slot_message_overheads(struct aws_channel *channel) {
629     size_t overhead = 0;
630     struct aws_channel_slot *slot_iter = channel->first;
631     while (slot_iter) {
632         slot_iter->upstream_message_overhead = overhead;
633 
634         if (slot_iter->handler) {
635             overhead += slot_iter->handler->vtable->message_overhead(slot_iter->handler);
636         }
637         slot_iter = slot_iter->adj_right;
638     }
639 }
640 
aws_channel_slot_set_handler(struct aws_channel_slot * slot,struct aws_channel_handler * handler)641 int aws_channel_slot_set_handler(struct aws_channel_slot *slot, struct aws_channel_handler *handler) {
642     slot->handler = handler;
643     slot->handler->slot = slot;
644     s_update_channel_slot_message_overheads(slot->channel);
645 
646     return aws_channel_slot_increment_read_window(slot, slot->handler->vtable->initial_window_size(handler));
647 }
648 
aws_channel_slot_remove(struct aws_channel_slot * slot)649 int aws_channel_slot_remove(struct aws_channel_slot *slot) {
650     if (slot->adj_right) {
651         slot->adj_right->adj_left = slot->adj_left;
652 
653         if (slot == slot->channel->first) {
654             slot->channel->first = slot->adj_right;
655         }
656     }
657 
658     if (slot->adj_left) {
659         slot->adj_left->adj_right = slot->adj_right;
660     }
661 
662     if (slot == slot->channel->first) {
663         slot->channel->first = NULL;
664     }
665 
666     s_update_channel_slot_message_overheads(slot->channel);
667     s_cleanup_slot(slot);
668     return AWS_OP_SUCCESS;
669 }
670 
aws_channel_slot_replace(struct aws_channel_slot * remove,struct aws_channel_slot * new_slot)671 int aws_channel_slot_replace(struct aws_channel_slot *remove, struct aws_channel_slot *new_slot) {
672     new_slot->adj_left = remove->adj_left;
673 
674     if (remove->adj_left) {
675         remove->adj_left->adj_right = new_slot;
676     }
677 
678     new_slot->adj_right = remove->adj_right;
679 
680     if (remove->adj_right) {
681         remove->adj_right->adj_left = new_slot;
682     }
683 
684     if (remove == remove->channel->first) {
685         remove->channel->first = new_slot;
686     }
687 
688     s_update_channel_slot_message_overheads(remove->channel);
689     s_cleanup_slot(remove);
690     return AWS_OP_SUCCESS;
691 }
692 
aws_channel_slot_insert_right(struct aws_channel_slot * slot,struct aws_channel_slot * to_add)693 int aws_channel_slot_insert_right(struct aws_channel_slot *slot, struct aws_channel_slot *to_add) {
694     to_add->adj_right = slot->adj_right;
695 
696     if (slot->adj_right) {
697         slot->adj_right->adj_left = to_add;
698     }
699 
700     slot->adj_right = to_add;
701     to_add->adj_left = slot;
702 
703     return AWS_OP_SUCCESS;
704 }
705 
aws_channel_slot_insert_end(struct aws_channel * channel,struct aws_channel_slot * to_add)706 int aws_channel_slot_insert_end(struct aws_channel *channel, struct aws_channel_slot *to_add) {
707     /* It's actually impossible there's not a first if the user went through the aws_channel_slot_new() function.
708      * But also check that a user didn't call insert_end if it's the first slot in the channel since first would already
709      * have been set. */
710     if (AWS_LIKELY(channel->first && channel->first != to_add)) {
711         struct aws_channel_slot *cur = channel->first;
712         while (cur->adj_right) {
713             cur = cur->adj_right;
714         }
715 
716         return aws_channel_slot_insert_right(cur, to_add);
717     }
718 
719     AWS_ASSERT(0);
720     return AWS_OP_ERR;
721 }
722 
aws_channel_slot_insert_left(struct aws_channel_slot * slot,struct aws_channel_slot * to_add)723 int aws_channel_slot_insert_left(struct aws_channel_slot *slot, struct aws_channel_slot *to_add) {
724     to_add->adj_left = slot->adj_left;
725 
726     if (slot->adj_left) {
727         slot->adj_left->adj_right = to_add;
728     }
729 
730     slot->adj_left = to_add;
731     to_add->adj_right = slot;
732 
733     if (slot == slot->channel->first) {
734         slot->channel->first = to_add;
735     }
736 
737     return AWS_OP_SUCCESS;
738 }
739 
aws_channel_slot_send_message(struct aws_channel_slot * slot,struct aws_io_message * message,enum aws_channel_direction dir)740 int aws_channel_slot_send_message(
741     struct aws_channel_slot *slot,
742     struct aws_io_message *message,
743     enum aws_channel_direction dir) {
744 
745     if (dir == AWS_CHANNEL_DIR_READ) {
746         AWS_ASSERT(slot->adj_right);
747         AWS_ASSERT(slot->adj_right->handler);
748 
749         if (!slot->channel->read_back_pressure_enabled || slot->adj_right->window_size >= message->message_data.len) {
750             AWS_LOGF_TRACE(
751                 AWS_LS_IO_CHANNEL,
752                 "id=%p: sending read message of size %zu, "
753                 "from slot %p to slot %p with handler %p.",
754                 (void *)slot->channel,
755                 message->message_data.len,
756                 (void *)slot,
757                 (void *)slot->adj_right,
758                 (void *)slot->adj_right->handler);
759             slot->adj_right->window_size -= message->message_data.len;
760             return aws_channel_handler_process_read_message(slot->adj_right->handler, slot->adj_right, message);
761         }
762         AWS_LOGF_ERROR(
763             AWS_LS_IO_CHANNEL,
764             "id=%p: sending message of size %zu, "
765             "from slot %p to slot %p with handler %p, but this would exceed the channel's "
766             "read window, this is always a programming error.",
767             (void *)slot->channel,
768             message->message_data.len,
769             (void *)slot,
770             (void *)slot->adj_right,
771             (void *)slot->adj_right->handler);
772         return aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
773     }
774 
775     AWS_ASSERT(slot->adj_left);
776     AWS_ASSERT(slot->adj_left->handler);
777     AWS_LOGF_TRACE(
778         AWS_LS_IO_CHANNEL,
779         "id=%p: sending write message of size %zu, "
780         "from slot %p to slot %p with handler %p.",
781         (void *)slot->channel,
782         message->message_data.len,
783         (void *)slot,
784         (void *)slot->adj_left,
785         (void *)slot->adj_left->handler);
786     return aws_channel_handler_process_write_message(slot->adj_left->handler, slot->adj_left, message);
787 }
788 
aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot * slot)789 struct aws_io_message *aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot *slot) {
790     AWS_PRECONDITION(slot);
791     AWS_PRECONDITION(slot->channel);
792     AWS_PRECONDITION(aws_channel_thread_is_callers_thread(slot->channel));
793 
794     const size_t overhead = aws_channel_slot_upstream_message_overhead(slot);
795     if (overhead >= g_aws_channel_max_fragment_size) {
796         AWS_LOGF_ERROR(
797             AWS_LS_IO_CHANNEL, "id=%p: Upstream overhead exceeds channel's max message size.", (void *)slot->channel);
798         aws_raise_error(AWS_ERROR_INVALID_STATE);
799         return NULL;
800     }
801 
802     const size_t size_hint = g_aws_channel_max_fragment_size - overhead;
803     return aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, size_hint);
804 }
805 
s_window_update_task(struct aws_channel_task * channel_task,void * arg,enum aws_task_status status)806 static void s_window_update_task(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
807     (void)channel_task;
808     struct aws_channel *channel = arg;
809 
810     if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
811         /* get the right-most slot to start the updates. */
812         struct aws_channel_slot *slot = channel->first;
813         while (slot->adj_right) {
814             slot = slot->adj_right;
815         }
816 
817         while (slot->adj_left) {
818             struct aws_channel_slot *upstream_slot = slot->adj_left;
819             if (upstream_slot->handler) {
820                 slot->window_size = aws_add_size_saturating(slot->window_size, slot->current_window_update_batch_size);
821                 size_t update_size = slot->current_window_update_batch_size;
822                 slot->current_window_update_batch_size = 0;
823                 if (aws_channel_handler_increment_read_window(upstream_slot->handler, upstream_slot, update_size)) {
824                     AWS_LOGF_ERROR(
825                         AWS_LS_IO_CHANNEL,
826                         "channel %p: channel update task failed with status %d",
827                         (void *)slot->channel,
828                         aws_last_error());
829                     slot->channel->window_update_in_progress = false;
830                     aws_channel_shutdown(channel, aws_last_error());
831                     return;
832                 }
833             }
834             slot = slot->adj_left;
835         }
836     }
837     channel->window_update_in_progress = false;
838 }
839 
aws_channel_slot_increment_read_window(struct aws_channel_slot * slot,size_t window)840 int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {
841 
842     if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
843         slot->current_window_update_batch_size =
844             aws_add_size_saturating(slot->current_window_update_batch_size, window);
845 
846         if (!slot->channel->window_update_in_progress &&
847             slot->window_size <= slot->channel->window_update_batch_emit_threshold) {
848             slot->channel->window_update_in_progress = true;
849             aws_channel_task_init(
850                 &slot->channel->window_update_task, s_window_update_task, slot->channel, "window update task");
851             aws_channel_schedule_task_now(slot->channel, &slot->channel->window_update_task);
852         }
853     }
854 
855     return AWS_OP_SUCCESS;
856 }
857 
aws_channel_slot_shutdown(struct aws_channel_slot * slot,enum aws_channel_direction dir,int err_code,bool free_scarce_resources_immediately)858 int aws_channel_slot_shutdown(
859     struct aws_channel_slot *slot,
860     enum aws_channel_direction dir,
861     int err_code,
862     bool free_scarce_resources_immediately) {
863     AWS_ASSERT(slot->handler);
864     AWS_LOGF_TRACE(
865         AWS_LS_IO_CHANNEL,
866         "id=%p: shutting down slot %p, with handler %p "
867         "in %s direction with error code %d",
868         (void *)slot->channel,
869         (void *)slot,
870         (void *)slot->handler,
871         (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write",
872         err_code);
873     return aws_channel_handler_shutdown(slot->handler, slot, dir, err_code, free_scarce_resources_immediately);
874 }
875 
/**
 * Event-loop task that finishes channel shutdown: it cancels the tasks still pending on the channel and then invokes
 * the channel's on_shutdown_completed callback with the recorded error code.
 *
 * task is the aws_task that sits at the start of struct aws_shutdown_notification_task, which is why it can be cast
 * back to that type. arg is the channel. status is ignored.
 */
static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)status;

    struct aws_shutdown_notification_task *shutdown_notify = (struct aws_shutdown_notification_task *)task;
    struct aws_channel *channel = arg;
    AWS_ASSERT(channel->channel_state == AWS_CHANNEL_SHUT_DOWN);

    /* Cancel tasks that have been scheduled with the event loop */
    while (!aws_linked_list_empty(&channel->channel_thread_tasks.list)) {
        struct aws_linked_list_node *node = aws_linked_list_front(&channel->channel_thread_tasks.list);
        struct aws_channel_task *channel_task = AWS_CONTAINER_OF(node, struct aws_channel_task, node);
        AWS_LOGF_DEBUG(
            AWS_LS_IO_CHANNEL,
            "id=%p: during shutdown, canceling task %p",
            (void *)channel,
            (void *)&channel_task->wrapper_task);
        /* The task will remove itself from the list when it's canceled, which is what lets this loop terminate */
        aws_event_loop_cancel_task(channel->loop, &channel_task->wrapper_task);
    }

    /* Cancel off-thread tasks, which haven't made it to the event-loop thread yet.
     * The list is only inspected while holding the lock; the cancel itself happens after the lock is released. */
    aws_mutex_lock(&channel->cross_thread_tasks.lock);
    bool cancel_cross_thread_tasks = !aws_linked_list_empty(&channel->cross_thread_tasks.list);
    aws_mutex_unlock(&channel->cross_thread_tasks.lock);

    if (cancel_cross_thread_tasks) {
        aws_event_loop_cancel_task(channel->loop, &channel->cross_thread_tasks.scheduling_task);
    }

    /* Both task lists are expected to be drained at this point */
    AWS_ASSERT(aws_linked_list_empty(&channel->channel_thread_tasks.list));
    AWS_ASSERT(aws_linked_list_empty(&channel->cross_thread_tasks.list));

    /* The callback is invoked without a NULL check here */
    channel->on_shutdown_completed(channel, shutdown_notify->error_code, channel->shutdown_user_data);
}
910 
s_run_shutdown_write_direction(struct aws_task * task,void * arg,enum aws_task_status status)911 static void s_run_shutdown_write_direction(struct aws_task *task, void *arg, enum aws_task_status status) {
912     (void)arg;
913     (void)status;
914 
915     struct aws_shutdown_notification_task *shutdown_notify = (struct aws_shutdown_notification_task *)task;
916     task->fn = NULL;
917     task->arg = NULL;
918     struct aws_channel_slot *slot = shutdown_notify->slot;
919     aws_channel_handler_shutdown(
920         slot->handler, slot, AWS_CHANNEL_DIR_WRITE, shutdown_notify->error_code, shutdown_notify->shutdown_immediately);
921 }
922 
/**
 * Called when the handler in slot has finished shutting down in direction dir; advances the channel's shutdown
 * sequence to the next step.
 *
 * - Channel already AWS_CHANNEL_SHUT_DOWN: nothing to do.
 * - AWS_CHANNEL_DIR_READ: shutdown continues with the handler of the slot to the right. When there is no such
 *   handler, a task is scheduled on the event loop that begins the write-direction shutdown at this slot.
 * - Otherwise (write direction): shutdown continues with the handler of the slot to the left. When there is no such
 *   handler and slot is the channel's first slot, the channel is marked shut down and, if an on_shutdown_completed
 *   callback is set, the completion task is scheduled.
 *
 * Returns the next handler's shutdown result when one is invoked directly, otherwise AWS_OP_SUCCESS.
 */
int aws_channel_slot_on_handler_shutdown_complete(
    struct aws_channel_slot *slot,
    enum aws_channel_direction dir,
    int err_code,
    bool free_scarce_resources_immediately) {

    AWS_LOGF_DEBUG(
        AWS_LS_IO_CHANNEL,
        "id=%p: handler %p shutdown in %s dir completed.",
        (void *)slot->channel,
        (void *)slot->handler,
        (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write");

    /* Shutdown already finished; ignore late completions */
    if (slot->channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
        return AWS_OP_SUCCESS;
    }

    if (dir == AWS_CHANNEL_DIR_READ) {
        /* Read-direction shutdown moves left-to-right through the slots */
        if (slot->adj_right && slot->adj_right->handler) {
            return aws_channel_handler_shutdown(
                slot->adj_right->handler, slot->adj_right, dir, err_code, free_scarce_resources_immediately);
        }

        /* break the shutdown sequence so we don't have handlers having to deal with their memory disappearing out from
         * under them during a shutdown process. */
        slot->channel->shutdown_notify_task.slot = slot;
        slot->channel->shutdown_notify_task.shutdown_immediately = free_scarce_resources_immediately;
        slot->channel->shutdown_notify_task.error_code = err_code;
        slot->channel->shutdown_notify_task.task.fn = s_run_shutdown_write_direction;
        slot->channel->shutdown_notify_task.task.arg = NULL;

        aws_event_loop_schedule_task_now(slot->channel->loop, &slot->channel->shutdown_notify_task.task);
        return AWS_OP_SUCCESS;
    }

    /* Write-direction shutdown moves right-to-left through the slots */
    if (slot->adj_left && slot->adj_left->handler) {
        return aws_channel_handler_shutdown(
            slot->adj_left->handler, slot->adj_left, dir, err_code, free_scarce_resources_immediately);
    }

    if (slot->channel->first == slot) {
        slot->channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
        /* The cross-thread flag is written under the cross-thread task lock */
        aws_mutex_lock(&slot->channel->cross_thread_tasks.lock);
        slot->channel->cross_thread_tasks.is_channel_shut_down = true;
        aws_mutex_unlock(&slot->channel->cross_thread_tasks.lock);

        /* The same embedded notification task is reused here, now pointed at the completion callback */
        if (slot->channel->on_shutdown_completed) {
            slot->channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
            slot->channel->shutdown_notify_task.task.arg = slot->channel;
            slot->channel->shutdown_notify_task.error_code = err_code;
            aws_event_loop_schedule_task_now(slot->channel->loop, &slot->channel->shutdown_notify_task.task);
        }
    }

    return AWS_OP_SUCCESS;
}
979 
aws_channel_slot_downstream_read_window(struct aws_channel_slot * slot)980 size_t aws_channel_slot_downstream_read_window(struct aws_channel_slot *slot) {
981     AWS_ASSERT(slot->adj_right);
982     return slot->channel->read_back_pressure_enabled ? slot->adj_right->window_size : SIZE_MAX;
983 }
984 
aws_channel_slot_upstream_message_overhead(struct aws_channel_slot * slot)985 size_t aws_channel_slot_upstream_message_overhead(struct aws_channel_slot *slot) {
986     return slot->upstream_message_overhead;
987 }
988 
aws_channel_handler_destroy(struct aws_channel_handler * handler)989 void aws_channel_handler_destroy(struct aws_channel_handler *handler) {
990     AWS_ASSERT(handler->vtable && handler->vtable->destroy);
991     handler->vtable->destroy(handler);
992 }
993 
aws_channel_handler_process_read_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)994 int aws_channel_handler_process_read_message(
995     struct aws_channel_handler *handler,
996     struct aws_channel_slot *slot,
997     struct aws_io_message *message) {
998 
999     AWS_ASSERT(handler->vtable && handler->vtable->process_read_message);
1000     return handler->vtable->process_read_message(handler, slot, message);
1001 }
1002 
aws_channel_handler_process_write_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)1003 int aws_channel_handler_process_write_message(
1004     struct aws_channel_handler *handler,
1005     struct aws_channel_slot *slot,
1006     struct aws_io_message *message) {
1007 
1008     AWS_ASSERT(handler->vtable && handler->vtable->process_write_message);
1009     return handler->vtable->process_write_message(handler, slot, message);
1010 }
1011 
aws_channel_handler_increment_read_window(struct aws_channel_handler * handler,struct aws_channel_slot * slot,size_t size)1012 int aws_channel_handler_increment_read_window(
1013     struct aws_channel_handler *handler,
1014     struct aws_channel_slot *slot,
1015     size_t size) {
1016 
1017     AWS_ASSERT(handler->vtable && handler->vtable->increment_read_window);
1018 
1019     return handler->vtable->increment_read_window(handler, slot, size);
1020 }
1021 
aws_channel_handler_shutdown(struct aws_channel_handler * handler,struct aws_channel_slot * slot,enum aws_channel_direction dir,int error_code,bool free_scarce_resources_immediately)1022 int aws_channel_handler_shutdown(
1023     struct aws_channel_handler *handler,
1024     struct aws_channel_slot *slot,
1025     enum aws_channel_direction dir,
1026     int error_code,
1027     bool free_scarce_resources_immediately) {
1028 
1029     AWS_ASSERT(handler->vtable && handler->vtable->shutdown);
1030     return handler->vtable->shutdown(handler, slot, dir, error_code, free_scarce_resources_immediately);
1031 }
1032 
aws_channel_handler_initial_window_size(struct aws_channel_handler * handler)1033 size_t aws_channel_handler_initial_window_size(struct aws_channel_handler *handler) {
1034     AWS_ASSERT(handler->vtable && handler->vtable->initial_window_size);
1035     return handler->vtable->initial_window_size(handler);
1036 }
1037 
aws_channel_get_first_slot(struct aws_channel * channel)1038 struct aws_channel_slot *aws_channel_get_first_slot(struct aws_channel *channel) {
1039     return channel->first;
1040 }
1041 
s_reset_statistics(struct aws_channel * channel)1042 static void s_reset_statistics(struct aws_channel *channel) {
1043     AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(channel));
1044 
1045     struct aws_channel_slot *current_slot = channel->first;
1046     while (current_slot) {
1047         struct aws_channel_handler *handler = current_slot->handler;
1048         if (handler != NULL && handler->vtable->reset_statistics != NULL) {
1049             handler->vtable->reset_statistics(handler);
1050         }
1051         current_slot = current_slot->adj_right;
1052     }
1053 }
1054 
s_channel_gather_statistics_task(struct aws_task * task,void * arg,enum aws_task_status status)1055 static void s_channel_gather_statistics_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1056     if (status != AWS_TASK_STATUS_RUN_READY) {
1057         return;
1058     }
1059 
1060     struct aws_channel *channel = arg;
1061     if (channel->statistics_handler == NULL) {
1062         return;
1063     }
1064 
1065     if (channel->channel_state == AWS_CHANNEL_SHUTTING_DOWN || channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
1066         return;
1067     }
1068 
1069     uint64_t now_ns = 0;
1070     if (aws_channel_current_clock_time(channel, &now_ns)) {
1071         return;
1072     }
1073 
1074     uint64_t now_ms = aws_timestamp_convert(now_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
1075 
1076     struct aws_array_list *statistics_list = &channel->statistic_list;
1077     aws_array_list_clear(statistics_list);
1078 
1079     struct aws_channel_slot *current_slot = channel->first;
1080     while (current_slot) {
1081         struct aws_channel_handler *handler = current_slot->handler;
1082         if (handler != NULL && handler->vtable->gather_statistics != NULL) {
1083             handler->vtable->gather_statistics(handler, statistics_list);
1084         }
1085         current_slot = current_slot->adj_right;
1086     }
1087 
1088     struct aws_crt_statistics_sample_interval sample_interval = {
1089         .begin_time_ms = channel->statistics_interval_start_time_ms, .end_time_ms = now_ms};
1090 
1091     aws_crt_statistics_handler_process_statistics(
1092         channel->statistics_handler, &sample_interval, statistics_list, channel);
1093 
1094     s_reset_statistics(channel);
1095 
1096     uint64_t reschedule_interval_ns = aws_timestamp_convert(
1097         aws_crt_statistics_handler_get_report_interval_ms(channel->statistics_handler),
1098         AWS_TIMESTAMP_MILLIS,
1099         AWS_TIMESTAMP_NANOS,
1100         NULL);
1101 
1102     aws_event_loop_schedule_task_future(channel->loop, task, now_ns + reschedule_interval_ns);
1103 
1104     channel->statistics_interval_start_time_ms = now_ms;
1105 }
1106 
aws_channel_set_statistics_handler(struct aws_channel * channel,struct aws_crt_statistics_handler * handler)1107 int aws_channel_set_statistics_handler(struct aws_channel *channel, struct aws_crt_statistics_handler *handler) {
1108     AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(channel));
1109 
1110     if (channel->statistics_handler) {
1111         aws_crt_statistics_handler_destroy(channel->statistics_handler);
1112         aws_event_loop_cancel_task(channel->loop, &channel->statistics_task);
1113         channel->statistics_handler = NULL;
1114     }
1115 
1116     if (handler != NULL) {
1117         aws_task_init(&channel->statistics_task, s_channel_gather_statistics_task, channel, "gather_statistics");
1118 
1119         uint64_t now_ns = 0;
1120         if (aws_channel_current_clock_time(channel, &now_ns)) {
1121             return AWS_OP_ERR;
1122         }
1123 
1124         uint64_t report_time_ns = now_ns + aws_timestamp_convert(
1125                                                aws_crt_statistics_handler_get_report_interval_ms(handler),
1126                                                AWS_TIMESTAMP_MILLIS,
1127                                                AWS_TIMESTAMP_NANOS,
1128                                                NULL);
1129 
1130         channel->statistics_interval_start_time_ms =
1131             aws_timestamp_convert(now_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
1132         s_reset_statistics(channel);
1133 
1134         aws_event_loop_schedule_task_future(channel->loop, &channel->statistics_task, report_time_ns);
1135     }
1136 
1137     channel->statistics_handler = handler;
1138 
1139     return AWS_OP_SUCCESS;
1140 }
1141 
aws_channel_get_event_loop(struct aws_channel * channel)1142 struct aws_event_loop *aws_channel_get_event_loop(struct aws_channel *channel) {
1143     return channel->loop;
1144 }
1145 
aws_channel_trigger_read(struct aws_channel * channel)1146 int aws_channel_trigger_read(struct aws_channel *channel) {
1147     if (channel == NULL) {
1148         return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
1149     }
1150 
1151     if (!aws_channel_thread_is_callers_thread(channel)) {
1152         return aws_raise_error(AWS_ERROR_INVALID_STATE);
1153     }
1154 
1155     struct aws_channel_slot *slot = channel->first;
1156     if (slot == NULL) {
1157         return aws_raise_error(AWS_ERROR_INVALID_STATE);
1158     }
1159 
1160     struct aws_channel_handler *handler = slot->handler;
1161     if (handler == NULL) {
1162         return aws_raise_error(AWS_ERROR_INVALID_STATE);
1163     }
1164 
1165     if (handler->vtable->trigger_read != NULL) {
1166         handler->vtable->trigger_read(handler);
1167     }
1168 
1169     return AWS_OP_SUCCESS;
1170 }
1171