1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/io/channel.h>
7
8 #include <aws/common/atomics.h>
9 #include <aws/common/clock.h>
10 #include <aws/common/mutex.h>
11
12 #include <aws/io/event_loop.h>
13 #include <aws/io/logging.h>
14 #include <aws/io/message_pool.h>
15 #include <aws/io/statistics.h>
16
17 #if _MSC_VER
18 # pragma warning(disable : 4204) /* non-constant aggregate initializer */
19 #endif
20
21 static size_t s_message_pool_key = 0; /* Address of variable serves as key in hash table */
22
23 enum {
24 KB_16 = 16 * 1024,
25 };
26
27 size_t g_aws_channel_max_fragment_size = KB_16;
28
29 #define INITIAL_STATISTIC_LIST_SIZE 5
30
31 enum aws_channel_state {
32 AWS_CHANNEL_SETTING_UP,
33 AWS_CHANNEL_ACTIVE,
34 AWS_CHANNEL_SHUTTING_DOWN,
35 AWS_CHANNEL_SHUT_DOWN,
36 };
37
38 struct aws_shutdown_notification_task {
39 struct aws_task task;
40 int error_code;
41 struct aws_channel_slot *slot;
42 bool shutdown_immediately;
43 };
44
45 struct shutdown_task {
46 struct aws_channel_task task;
47 struct aws_channel *channel;
48 int error_code;
49 bool shutdown_immediately;
50 };
51
52 struct aws_channel {
53 struct aws_allocator *alloc;
54 struct aws_event_loop *loop;
55 struct aws_channel_slot *first;
56 struct aws_message_pool *msg_pool;
57 enum aws_channel_state channel_state;
58 struct aws_shutdown_notification_task shutdown_notify_task;
59 aws_channel_on_shutdown_completed_fn *on_shutdown_completed;
60 void *shutdown_user_data;
61 struct aws_atomic_var refcount;
62 struct aws_task deletion_task;
63
64 struct aws_task statistics_task;
65 struct aws_crt_statistics_handler *statistics_handler;
66 uint64_t statistics_interval_start_time_ms;
67 struct aws_array_list statistic_list;
68
69 struct {
70 struct aws_linked_list list;
71 } channel_thread_tasks;
72 struct {
73 struct aws_mutex lock;
74 struct aws_linked_list list;
75 struct aws_task scheduling_task;
76 struct shutdown_task shutdown_task;
77 bool is_channel_shut_down;
78 } cross_thread_tasks;
79
80 size_t window_update_batch_emit_threshold;
81 struct aws_channel_task window_update_task;
82 bool read_back_pressure_enabled;
83 bool window_update_in_progress;
84 };
85
86 struct channel_setup_args {
87 struct aws_allocator *alloc;
88 struct aws_channel *channel;
89 aws_channel_on_setup_completed_fn *on_setup_completed;
90 void *user_data;
91 struct aws_task task;
92 };
93
s_on_msg_pool_removed(struct aws_event_loop_local_object * object)94 static void s_on_msg_pool_removed(struct aws_event_loop_local_object *object) {
95 struct aws_message_pool *msg_pool = object->object;
96 AWS_LOGF_TRACE(
97 AWS_LS_IO_CHANNEL,
98 "static: message pool %p has been purged "
99 "from the event-loop: likely because of shutdown",
100 (void *)msg_pool);
101 struct aws_allocator *alloc = msg_pool->alloc;
102 aws_message_pool_clean_up(msg_pool);
103 aws_mem_release(alloc, msg_pool);
104 aws_mem_release(alloc, object);
105 }
106
s_on_channel_setup_complete(struct aws_task * task,void * arg,enum aws_task_status task_status)107 static void s_on_channel_setup_complete(struct aws_task *task, void *arg, enum aws_task_status task_status) {
108
109 (void)task;
110 struct channel_setup_args *setup_args = arg;
111 struct aws_message_pool *message_pool = NULL;
112 struct aws_event_loop_local_object *local_object = NULL;
113
114 AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: setup complete, notifying caller.", (void *)setup_args->channel);
115 if (task_status == AWS_TASK_STATUS_RUN_READY) {
116 struct aws_event_loop_local_object stack_obj;
117 AWS_ZERO_STRUCT(stack_obj);
118 local_object = &stack_obj;
119
120 if (aws_event_loop_fetch_local_object(setup_args->channel->loop, &s_message_pool_key, local_object)) {
121
122 local_object = aws_mem_calloc(setup_args->alloc, 1, sizeof(struct aws_event_loop_local_object));
123 if (!local_object) {
124 goto cleanup_setup_args;
125 }
126
127 message_pool = aws_mem_acquire(setup_args->alloc, sizeof(struct aws_message_pool));
128 if (!message_pool) {
129 goto cleanup_local_obj;
130 }
131
132 AWS_LOGF_DEBUG(
133 AWS_LS_IO_CHANNEL,
134 "id=%p: no message pool is currently stored in the event-loop "
135 "local storage, adding %p with max message size %zu, "
136 "message count 4, with 4 small blocks of 128 bytes.",
137 (void *)setup_args->channel,
138 (void *)message_pool,
139 g_aws_channel_max_fragment_size);
140
141 struct aws_message_pool_creation_args creation_args = {
142 .application_data_msg_data_size = g_aws_channel_max_fragment_size,
143 .application_data_msg_count = 4,
144 .small_block_msg_count = 4,
145 .small_block_msg_data_size = 128,
146 };
147
148 if (aws_message_pool_init(message_pool, setup_args->alloc, &creation_args)) {
149 goto cleanup_msg_pool_mem;
150 }
151
152 local_object->key = &s_message_pool_key;
153 local_object->object = message_pool;
154 local_object->on_object_removed = s_on_msg_pool_removed;
155
156 if (aws_event_loop_put_local_object(setup_args->channel->loop, local_object)) {
157 goto cleanup_msg_pool;
158 }
159 } else {
160 message_pool = local_object->object;
161 AWS_LOGF_DEBUG(
162 AWS_LS_IO_CHANNEL,
163 "id=%p: message pool %p found in event-loop local storage: using it.",
164 (void *)setup_args->channel,
165 (void *)message_pool)
166 }
167
168 setup_args->channel->msg_pool = message_pool;
169 setup_args->channel->channel_state = AWS_CHANNEL_ACTIVE;
170 setup_args->on_setup_completed(setup_args->channel, AWS_OP_SUCCESS, setup_args->user_data);
171 aws_channel_release_hold(setup_args->channel);
172 aws_mem_release(setup_args->alloc, setup_args);
173 return;
174 }
175
176 goto cleanup_setup_args;
177
178 cleanup_msg_pool:
179 aws_message_pool_clean_up(message_pool);
180
181 cleanup_msg_pool_mem:
182 aws_mem_release(setup_args->alloc, message_pool);
183
184 cleanup_local_obj:
185 aws_mem_release(setup_args->alloc, local_object);
186
187 cleanup_setup_args:
188 setup_args->on_setup_completed(setup_args->channel, AWS_OP_ERR, setup_args->user_data);
189 aws_channel_release_hold(setup_args->channel);
190 aws_mem_release(setup_args->alloc, setup_args);
191 }
192
193 static void s_schedule_cross_thread_tasks(struct aws_task *task, void *arg, enum aws_task_status status);
194
s_destroy_partially_constructed_channel(struct aws_channel * channel)195 static void s_destroy_partially_constructed_channel(struct aws_channel *channel) {
196 if (channel == NULL) {
197 return;
198 }
199
200 aws_array_list_clean_up(&channel->statistic_list);
201
202 aws_mem_release(channel->alloc, channel);
203 }
204
aws_channel_new(struct aws_allocator * alloc,const struct aws_channel_options * creation_args)205 struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aws_channel_options *creation_args) {
206 AWS_PRECONDITION(creation_args);
207 AWS_PRECONDITION(creation_args->event_loop);
208 AWS_PRECONDITION(creation_args->on_setup_completed);
209
210 struct aws_channel *channel = aws_mem_calloc(alloc, 1, sizeof(struct aws_channel));
211 if (!channel) {
212 return NULL;
213 }
214
215 AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: Beginning creation and setup of new channel.", (void *)channel);
216 channel->alloc = alloc;
217 channel->loop = creation_args->event_loop;
218 channel->on_shutdown_completed = creation_args->on_shutdown_completed;
219 channel->shutdown_user_data = creation_args->shutdown_user_data;
220
221 if (aws_array_list_init_dynamic(
222 &channel->statistic_list, alloc, INITIAL_STATISTIC_LIST_SIZE, sizeof(struct aws_crt_statistics_base *))) {
223 goto on_error;
224 }
225
226 /* Start refcount at 2:
227 * 1 for self-reference, released from aws_channel_destroy()
228 * 1 for the setup task, released when task executes */
229 aws_atomic_init_int(&channel->refcount, 2);
230
231 struct channel_setup_args *setup_args = aws_mem_calloc(alloc, 1, sizeof(struct channel_setup_args));
232 if (!setup_args) {
233 goto on_error;
234 }
235
236 channel->channel_state = AWS_CHANNEL_SETTING_UP;
237 aws_linked_list_init(&channel->channel_thread_tasks.list);
238 aws_linked_list_init(&channel->cross_thread_tasks.list);
239 channel->cross_thread_tasks.lock = (struct aws_mutex)AWS_MUTEX_INIT;
240
241 if (creation_args->enable_read_back_pressure) {
242 channel->read_back_pressure_enabled = true;
243 /* we probably only need room for one fragment, but let's avoid potential deadlocks
244 * on things like tls that need extra head-room. */
245 channel->window_update_batch_emit_threshold = g_aws_channel_max_fragment_size * 2;
246 }
247
248 aws_task_init(
249 &channel->cross_thread_tasks.scheduling_task,
250 s_schedule_cross_thread_tasks,
251 channel,
252 "schedule_cross_thread_tasks");
253
254 setup_args->alloc = alloc;
255 setup_args->channel = channel;
256 setup_args->on_setup_completed = creation_args->on_setup_completed;
257 setup_args->user_data = creation_args->setup_user_data;
258
259 aws_task_init(&setup_args->task, s_on_channel_setup_complete, setup_args, "on_channel_setup_complete");
260 aws_event_loop_schedule_task_now(creation_args->event_loop, &setup_args->task);
261
262 return channel;
263
264 on_error:
265
266 s_destroy_partially_constructed_channel(channel);
267
268 return NULL;
269 }
270
s_cleanup_slot(struct aws_channel_slot * slot)271 static void s_cleanup_slot(struct aws_channel_slot *slot) {
272 if (slot) {
273 if (slot->handler) {
274 aws_channel_handler_destroy(slot->handler);
275 }
276 aws_mem_release(slot->alloc, slot);
277 }
278 }
279
aws_channel_destroy(struct aws_channel * channel)280 void aws_channel_destroy(struct aws_channel *channel) {
281 AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: destroying channel.", (void *)channel);
282
283 aws_channel_release_hold(channel);
284 }
285
s_final_channel_deletion_task(struct aws_task * task,void * arg,enum aws_task_status status)286 static void s_final_channel_deletion_task(struct aws_task *task, void *arg, enum aws_task_status status) {
287 (void)task;
288 (void)status;
289 struct aws_channel *channel = arg;
290
291 struct aws_channel_slot *current = channel->first;
292
293 if (!current || !current->handler) {
294 /* Allow channels with no valid slots to skip shutdown process */
295 channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
296 }
297
298 AWS_ASSERT(channel->channel_state == AWS_CHANNEL_SHUT_DOWN);
299
300 while (current) {
301 struct aws_channel_slot *tmp = current->adj_right;
302 s_cleanup_slot(current);
303 current = tmp;
304 }
305
306 aws_array_list_clean_up(&channel->statistic_list);
307
308 aws_channel_set_statistics_handler(channel, NULL);
309
310 aws_mem_release(channel->alloc, channel);
311 }
312
aws_channel_acquire_hold(struct aws_channel * channel)313 void aws_channel_acquire_hold(struct aws_channel *channel) {
314 size_t prev_refcount = aws_atomic_fetch_add(&channel->refcount, 1);
315 AWS_ASSERT(prev_refcount != 0);
316 (void)prev_refcount;
317 }
318
aws_channel_release_hold(struct aws_channel * channel)319 void aws_channel_release_hold(struct aws_channel *channel) {
320 size_t prev_refcount = aws_atomic_fetch_sub(&channel->refcount, 1);
321 AWS_ASSERT(prev_refcount != 0);
322
323 if (prev_refcount == 1) {
324 /* Refcount is now 0, finish cleaning up channel memory. */
325 if (aws_channel_thread_is_callers_thread(channel)) {
326 s_final_channel_deletion_task(NULL, channel, AWS_TASK_STATUS_RUN_READY);
327 } else {
328 aws_task_init(&channel->deletion_task, s_final_channel_deletion_task, channel, "final_channel_deletion");
329 aws_event_loop_schedule_task_now(channel->loop, &channel->deletion_task);
330 }
331 }
332 }
333
334 struct channel_shutdown_task_args {
335 struct aws_channel *channel;
336 struct aws_allocator *alloc;
337 int error_code;
338 struct aws_task task;
339 };
340
341 static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately);
342
343 static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);
344
s_shutdown_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)345 static void s_shutdown_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
346
347 (void)task;
348 (void)status;
349 struct shutdown_task *shutdown_task = arg;
350 struct aws_channel *channel = shutdown_task->channel;
351 int error_code = shutdown_task->error_code;
352 bool shutdown_immediately = shutdown_task->shutdown_immediately;
353 if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
354 AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);
355
356 struct aws_channel_slot *slot = channel->first;
357 channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;
358
359 if (slot) {
360 AWS_LOGF_TRACE(
361 AWS_LS_IO_CHANNEL,
362 "id=%p: shutting down slot %p (the first one) in the read direction",
363 (void *)channel,
364 (void *)slot);
365
366 aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
367 return;
368 }
369
370 channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
371 AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);
372
373 aws_mutex_lock(&channel->cross_thread_tasks.lock);
374 channel->cross_thread_tasks.is_channel_shut_down = true;
375 aws_mutex_unlock(&channel->cross_thread_tasks.lock);
376
377 if (channel->on_shutdown_completed) {
378 channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
379 channel->shutdown_notify_task.task.arg = channel;
380 channel->shutdown_notify_task.error_code = error_code;
381 aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
382 }
383 }
384 }
385
s_channel_shutdown(struct aws_channel * channel,int error_code,bool shutdown_immediately)386 static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
387 bool need_to_schedule = true;
388 aws_mutex_lock(&channel->cross_thread_tasks.lock);
389 if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
390 need_to_schedule = false;
391 AWS_LOGF_DEBUG(
392 AWS_LS_IO_CHANNEL, "id=%p: Channel shutdown is already pending, not scheduling another.", (void *)channel);
393
394 } else {
395 aws_channel_task_init(
396 &channel->cross_thread_tasks.shutdown_task.task,
397 s_shutdown_task,
398 &channel->cross_thread_tasks.shutdown_task,
399 "channel_shutdown");
400 channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
401 channel->cross_thread_tasks.shutdown_task.channel = channel;
402 channel->cross_thread_tasks.shutdown_task.error_code = error_code;
403 }
404
405 aws_mutex_unlock(&channel->cross_thread_tasks.lock);
406
407 if (need_to_schedule) {
408 AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel shutdown task is scheduled", (void *)channel);
409 aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
410 }
411
412 return AWS_OP_SUCCESS;
413 }
414
aws_channel_shutdown(struct aws_channel * channel,int error_code)415 int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
416 return s_channel_shutdown(channel, error_code, false);
417 }
418
aws_channel_acquire_message_from_pool(struct aws_channel * channel,enum aws_io_message_type message_type,size_t size_hint)419 struct aws_io_message *aws_channel_acquire_message_from_pool(
420 struct aws_channel *channel,
421 enum aws_io_message_type message_type,
422 size_t size_hint) {
423
424 struct aws_io_message *message = aws_message_pool_acquire(channel->msg_pool, message_type, size_hint);
425
426 if (AWS_LIKELY(message)) {
427 message->owning_channel = channel;
428 AWS_LOGF_TRACE(
429 AWS_LS_IO_CHANNEL,
430 "id=%p: acquired message %p of capacity %zu from pool %p. Requested size was %zu",
431 (void *)channel,
432 (void *)message,
433 message->message_data.capacity,
434 (void *)channel->msg_pool,
435 size_hint);
436 }
437
438 return message;
439 }
440
aws_channel_slot_new(struct aws_channel * channel)441 struct aws_channel_slot *aws_channel_slot_new(struct aws_channel *channel) {
442 struct aws_channel_slot *new_slot = aws_mem_calloc(channel->alloc, 1, sizeof(struct aws_channel_slot));
443 if (!new_slot) {
444 return NULL;
445 }
446
447 AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: creating new slot %p.", (void *)channel, (void *)new_slot);
448 new_slot->alloc = channel->alloc;
449 new_slot->channel = channel;
450
451 if (!channel->first) {
452 channel->first = new_slot;
453 }
454
455 return new_slot;
456 }
457
aws_channel_current_clock_time(struct aws_channel * channel,uint64_t * time_nanos)458 int aws_channel_current_clock_time(struct aws_channel *channel, uint64_t *time_nanos) {
459 return aws_event_loop_current_clock_time(channel->loop, time_nanos);
460 }
461
aws_channel_fetch_local_object(struct aws_channel * channel,const void * key,struct aws_event_loop_local_object * obj)462 int aws_channel_fetch_local_object(
463 struct aws_channel *channel,
464 const void *key,
465 struct aws_event_loop_local_object *obj) {
466
467 return aws_event_loop_fetch_local_object(channel->loop, (void *)key, obj);
468 }
aws_channel_put_local_object(struct aws_channel * channel,const void * key,const struct aws_event_loop_local_object * obj)469 int aws_channel_put_local_object(
470 struct aws_channel *channel,
471 const void *key,
472 const struct aws_event_loop_local_object *obj) {
473
474 (void)key;
475 return aws_event_loop_put_local_object(channel->loop, (struct aws_event_loop_local_object *)obj);
476 }
477
aws_channel_remove_local_object(struct aws_channel * channel,const void * key,struct aws_event_loop_local_object * removed_obj)478 int aws_channel_remove_local_object(
479 struct aws_channel *channel,
480 const void *key,
481 struct aws_event_loop_local_object *removed_obj) {
482
483 return aws_event_loop_remove_local_object(channel->loop, (void *)key, removed_obj);
484 }
485
s_channel_task_run(struct aws_task * task,void * arg,enum aws_task_status status)486 static void s_channel_task_run(struct aws_task *task, void *arg, enum aws_task_status status) {
487 struct aws_channel_task *channel_task = AWS_CONTAINER_OF(task, struct aws_channel_task, wrapper_task);
488 struct aws_channel *channel = arg;
489
490 /* Any task that runs after shutdown completes is considered canceled */
491 if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
492 status = AWS_TASK_STATUS_CANCELED;
493 }
494
495 aws_linked_list_remove(&channel_task->node);
496 channel_task->task_fn(channel_task, channel_task->arg, status);
497 }
498
s_schedule_cross_thread_tasks(struct aws_task * task,void * arg,enum aws_task_status status)499 static void s_schedule_cross_thread_tasks(struct aws_task *task, void *arg, enum aws_task_status status) {
500 (void)task;
501 struct aws_channel *channel = arg;
502
503 struct aws_linked_list cross_thread_task_list;
504 aws_linked_list_init(&cross_thread_task_list);
505
506 /* Grab contents of cross-thread task list while we have the lock */
507 aws_mutex_lock(&channel->cross_thread_tasks.lock);
508 aws_linked_list_swap_contents(&channel->cross_thread_tasks.list, &cross_thread_task_list);
509 aws_mutex_unlock(&channel->cross_thread_tasks.lock);
510
511 /* If the channel has shut down since the cross-thread tasks were scheduled, run tasks immediately as canceled */
512 if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
513 status = AWS_TASK_STATUS_CANCELED;
514 }
515
516 while (!aws_linked_list_empty(&cross_thread_task_list)) {
517 struct aws_linked_list_node *node = aws_linked_list_pop_front(&cross_thread_task_list);
518 struct aws_channel_task *channel_task = AWS_CONTAINER_OF(node, struct aws_channel_task, node);
519
520 if ((channel_task->wrapper_task.timestamp == 0) || (status == AWS_TASK_STATUS_CANCELED)) {
521 /* Run "now" tasks, and canceled tasks, immediately */
522 channel_task->task_fn(channel_task, channel_task->arg, status);
523 } else {
524 /* "Future" tasks are scheduled with the event-loop. */
525 aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
526 aws_event_loop_schedule_task_future(
527 channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
528 }
529 }
530 }
531
aws_channel_task_init(struct aws_channel_task * channel_task,aws_channel_task_fn * task_fn,void * arg,const char * type_tag)532 void aws_channel_task_init(
533 struct aws_channel_task *channel_task,
534 aws_channel_task_fn *task_fn,
535 void *arg,
536 const char *type_tag) {
537 AWS_ZERO_STRUCT(*channel_task);
538 channel_task->task_fn = task_fn;
539 channel_task->arg = arg;
540 channel_task->type_tag = type_tag;
541 }
542
543 /* Common functionality for scheduling "now" and "future" tasks.
544 * For "now" tasks, pass 0 for `run_at_nanos` */
s_register_pending_task(struct aws_channel * channel,struct aws_channel_task * channel_task,uint64_t run_at_nanos)545 static void s_register_pending_task(
546 struct aws_channel *channel,
547 struct aws_channel_task *channel_task,
548 uint64_t run_at_nanos) {
549
550 /* Reset every property on channel task other than user's fn & arg.*/
551 aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag);
552 channel_task->wrapper_task.timestamp = run_at_nanos;
553 aws_linked_list_node_reset(&channel_task->node);
554
555 if (aws_channel_thread_is_callers_thread(channel)) {
556 AWS_LOGF_TRACE(
557 AWS_LS_IO_CHANNEL,
558 "id=%p: scheduling task with wrapper task id %p.",
559 (void *)channel,
560 (void *)&channel_task->wrapper_task);
561
562 /* If channel is shut down, run task immediately as canceled */
563 if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
564 AWS_LOGF_DEBUG(
565 AWS_LS_IO_CHANNEL,
566 "id=%p: Running %s channel task immediately as canceled due to shut down channel",
567 (void *)channel,
568 channel_task->type_tag);
569 channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
570 return;
571 }
572
573 aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
574 if (run_at_nanos == 0) {
575 aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task);
576 } else {
577 aws_event_loop_schedule_task_future(
578 channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
579 }
580 return;
581 }
582
583 AWS_LOGF_TRACE(
584 AWS_LS_IO_CHANNEL,
585 "id=%p: scheduling task with wrapper task id %p from "
586 "outside the event-loop thread.",
587 (void *)channel,
588 (void *)&channel_task->wrapper_task);
589 /* Outside event-loop thread... */
590 bool should_cancel_task = false;
591
592 /* Begin Critical Section */
593 aws_mutex_lock(&channel->cross_thread_tasks.lock);
594 if (channel->cross_thread_tasks.is_channel_shut_down) {
595 should_cancel_task = true; /* run task outside critical section to avoid deadlock */
596 } else {
597 bool list_was_empty = aws_linked_list_empty(&channel->cross_thread_tasks.list);
598 aws_linked_list_push_back(&channel->cross_thread_tasks.list, &channel_task->node);
599
600 if (list_was_empty) {
601 aws_event_loop_schedule_task_now(channel->loop, &channel->cross_thread_tasks.scheduling_task);
602 }
603 }
604 aws_mutex_unlock(&channel->cross_thread_tasks.lock);
605 /* End Critical Section */
606
607 if (should_cancel_task) {
608 channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
609 }
610 }
611
aws_channel_schedule_task_now(struct aws_channel * channel,struct aws_channel_task * task)612 void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task) {
613 s_register_pending_task(channel, task, 0);
614 }
615
aws_channel_schedule_task_future(struct aws_channel * channel,struct aws_channel_task * task,uint64_t run_at_nanos)616 void aws_channel_schedule_task_future(
617 struct aws_channel *channel,
618 struct aws_channel_task *task,
619 uint64_t run_at_nanos) {
620
621 s_register_pending_task(channel, task, run_at_nanos);
622 }
623
aws_channel_thread_is_callers_thread(struct aws_channel * channel)624 bool aws_channel_thread_is_callers_thread(struct aws_channel *channel) {
625 return aws_event_loop_thread_is_callers_thread(channel->loop);
626 }
627
s_update_channel_slot_message_overheads(struct aws_channel * channel)628 static void s_update_channel_slot_message_overheads(struct aws_channel *channel) {
629 size_t overhead = 0;
630 struct aws_channel_slot *slot_iter = channel->first;
631 while (slot_iter) {
632 slot_iter->upstream_message_overhead = overhead;
633
634 if (slot_iter->handler) {
635 overhead += slot_iter->handler->vtable->message_overhead(slot_iter->handler);
636 }
637 slot_iter = slot_iter->adj_right;
638 }
639 }
640
aws_channel_slot_set_handler(struct aws_channel_slot * slot,struct aws_channel_handler * handler)641 int aws_channel_slot_set_handler(struct aws_channel_slot *slot, struct aws_channel_handler *handler) {
642 slot->handler = handler;
643 slot->handler->slot = slot;
644 s_update_channel_slot_message_overheads(slot->channel);
645
646 return aws_channel_slot_increment_read_window(slot, slot->handler->vtable->initial_window_size(handler));
647 }
648
aws_channel_slot_remove(struct aws_channel_slot * slot)649 int aws_channel_slot_remove(struct aws_channel_slot *slot) {
650 if (slot->adj_right) {
651 slot->adj_right->adj_left = slot->adj_left;
652
653 if (slot == slot->channel->first) {
654 slot->channel->first = slot->adj_right;
655 }
656 }
657
658 if (slot->adj_left) {
659 slot->adj_left->adj_right = slot->adj_right;
660 }
661
662 if (slot == slot->channel->first) {
663 slot->channel->first = NULL;
664 }
665
666 s_update_channel_slot_message_overheads(slot->channel);
667 s_cleanup_slot(slot);
668 return AWS_OP_SUCCESS;
669 }
670
aws_channel_slot_replace(struct aws_channel_slot * remove,struct aws_channel_slot * new_slot)671 int aws_channel_slot_replace(struct aws_channel_slot *remove, struct aws_channel_slot *new_slot) {
672 new_slot->adj_left = remove->adj_left;
673
674 if (remove->adj_left) {
675 remove->adj_left->adj_right = new_slot;
676 }
677
678 new_slot->adj_right = remove->adj_right;
679
680 if (remove->adj_right) {
681 remove->adj_right->adj_left = new_slot;
682 }
683
684 if (remove == remove->channel->first) {
685 remove->channel->first = new_slot;
686 }
687
688 s_update_channel_slot_message_overheads(remove->channel);
689 s_cleanup_slot(remove);
690 return AWS_OP_SUCCESS;
691 }
692
aws_channel_slot_insert_right(struct aws_channel_slot * slot,struct aws_channel_slot * to_add)693 int aws_channel_slot_insert_right(struct aws_channel_slot *slot, struct aws_channel_slot *to_add) {
694 to_add->adj_right = slot->adj_right;
695
696 if (slot->adj_right) {
697 slot->adj_right->adj_left = to_add;
698 }
699
700 slot->adj_right = to_add;
701 to_add->adj_left = slot;
702
703 return AWS_OP_SUCCESS;
704 }
705
aws_channel_slot_insert_end(struct aws_channel * channel,struct aws_channel_slot * to_add)706 int aws_channel_slot_insert_end(struct aws_channel *channel, struct aws_channel_slot *to_add) {
707 /* It's actually impossible there's not a first if the user went through the aws_channel_slot_new() function.
708 * But also check that a user didn't call insert_end if it's the first slot in the channel since first would already
709 * have been set. */
710 if (AWS_LIKELY(channel->first && channel->first != to_add)) {
711 struct aws_channel_slot *cur = channel->first;
712 while (cur->adj_right) {
713 cur = cur->adj_right;
714 }
715
716 return aws_channel_slot_insert_right(cur, to_add);
717 }
718
719 AWS_ASSERT(0);
720 return AWS_OP_ERR;
721 }
722
aws_channel_slot_insert_left(struct aws_channel_slot * slot,struct aws_channel_slot * to_add)723 int aws_channel_slot_insert_left(struct aws_channel_slot *slot, struct aws_channel_slot *to_add) {
724 to_add->adj_left = slot->adj_left;
725
726 if (slot->adj_left) {
727 slot->adj_left->adj_right = to_add;
728 }
729
730 slot->adj_left = to_add;
731 to_add->adj_right = slot;
732
733 if (slot == slot->channel->first) {
734 slot->channel->first = to_add;
735 }
736
737 return AWS_OP_SUCCESS;
738 }
739
aws_channel_slot_send_message(struct aws_channel_slot * slot,struct aws_io_message * message,enum aws_channel_direction dir)740 int aws_channel_slot_send_message(
741 struct aws_channel_slot *slot,
742 struct aws_io_message *message,
743 enum aws_channel_direction dir) {
744
745 if (dir == AWS_CHANNEL_DIR_READ) {
746 AWS_ASSERT(slot->adj_right);
747 AWS_ASSERT(slot->adj_right->handler);
748
749 if (!slot->channel->read_back_pressure_enabled || slot->adj_right->window_size >= message->message_data.len) {
750 AWS_LOGF_TRACE(
751 AWS_LS_IO_CHANNEL,
752 "id=%p: sending read message of size %zu, "
753 "from slot %p to slot %p with handler %p.",
754 (void *)slot->channel,
755 message->message_data.len,
756 (void *)slot,
757 (void *)slot->adj_right,
758 (void *)slot->adj_right->handler);
759 slot->adj_right->window_size -= message->message_data.len;
760 return aws_channel_handler_process_read_message(slot->adj_right->handler, slot->adj_right, message);
761 }
762 AWS_LOGF_ERROR(
763 AWS_LS_IO_CHANNEL,
764 "id=%p: sending message of size %zu, "
765 "from slot %p to slot %p with handler %p, but this would exceed the channel's "
766 "read window, this is always a programming error.",
767 (void *)slot->channel,
768 message->message_data.len,
769 (void *)slot,
770 (void *)slot->adj_right,
771 (void *)slot->adj_right->handler);
772 return aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
773 }
774
775 AWS_ASSERT(slot->adj_left);
776 AWS_ASSERT(slot->adj_left->handler);
777 AWS_LOGF_TRACE(
778 AWS_LS_IO_CHANNEL,
779 "id=%p: sending write message of size %zu, "
780 "from slot %p to slot %p with handler %p.",
781 (void *)slot->channel,
782 message->message_data.len,
783 (void *)slot,
784 (void *)slot->adj_left,
785 (void *)slot->adj_left->handler);
786 return aws_channel_handler_process_write_message(slot->adj_left->handler, slot->adj_left, message);
787 }
788
aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot * slot)789 struct aws_io_message *aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot *slot) {
790 AWS_PRECONDITION(slot);
791 AWS_PRECONDITION(slot->channel);
792 AWS_PRECONDITION(aws_channel_thread_is_callers_thread(slot->channel));
793
794 const size_t overhead = aws_channel_slot_upstream_message_overhead(slot);
795 if (overhead >= g_aws_channel_max_fragment_size) {
796 AWS_LOGF_ERROR(
797 AWS_LS_IO_CHANNEL, "id=%p: Upstream overhead exceeds channel's max message size.", (void *)slot->channel);
798 aws_raise_error(AWS_ERROR_INVALID_STATE);
799 return NULL;
800 }
801
802 const size_t size_hint = g_aws_channel_max_fragment_size - overhead;
803 return aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, size_hint);
804 }
805
s_window_update_task(struct aws_channel_task * channel_task,void * arg,enum aws_task_status status)806 static void s_window_update_task(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
807 (void)channel_task;
808 struct aws_channel *channel = arg;
809
810 if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
811 /* get the right-most slot to start the updates. */
812 struct aws_channel_slot *slot = channel->first;
813 while (slot->adj_right) {
814 slot = slot->adj_right;
815 }
816
817 while (slot->adj_left) {
818 struct aws_channel_slot *upstream_slot = slot->adj_left;
819 if (upstream_slot->handler) {
820 slot->window_size = aws_add_size_saturating(slot->window_size, slot->current_window_update_batch_size);
821 size_t update_size = slot->current_window_update_batch_size;
822 slot->current_window_update_batch_size = 0;
823 if (aws_channel_handler_increment_read_window(upstream_slot->handler, upstream_slot, update_size)) {
824 AWS_LOGF_ERROR(
825 AWS_LS_IO_CHANNEL,
826 "channel %p: channel update task failed with status %d",
827 (void *)slot->channel,
828 aws_last_error());
829 slot->channel->window_update_in_progress = false;
830 aws_channel_shutdown(channel, aws_last_error());
831 return;
832 }
833 }
834 slot = slot->adj_left;
835 }
836 }
837 channel->window_update_in_progress = false;
838 }
839
aws_channel_slot_increment_read_window(struct aws_channel_slot * slot,size_t window)840 int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {
841
842 if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
843 slot->current_window_update_batch_size =
844 aws_add_size_saturating(slot->current_window_update_batch_size, window);
845
846 if (!slot->channel->window_update_in_progress &&
847 slot->window_size <= slot->channel->window_update_batch_emit_threshold) {
848 slot->channel->window_update_in_progress = true;
849 aws_channel_task_init(
850 &slot->channel->window_update_task, s_window_update_task, slot->channel, "window update task");
851 aws_channel_schedule_task_now(slot->channel, &slot->channel->window_update_task);
852 }
853 }
854
855 return AWS_OP_SUCCESS;
856 }
857
aws_channel_slot_shutdown(struct aws_channel_slot * slot,enum aws_channel_direction dir,int err_code,bool free_scarce_resources_immediately)858 int aws_channel_slot_shutdown(
859 struct aws_channel_slot *slot,
860 enum aws_channel_direction dir,
861 int err_code,
862 bool free_scarce_resources_immediately) {
863 AWS_ASSERT(slot->handler);
864 AWS_LOGF_TRACE(
865 AWS_LS_IO_CHANNEL,
866 "id=%p: shutting down slot %p, with handler %p "
867 "in %s direction with error code %d",
868 (void *)slot->channel,
869 (void *)slot,
870 (void *)slot->handler,
871 (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write",
872 err_code);
873 return aws_channel_handler_shutdown(slot->handler, slot, dir, err_code, free_scarce_resources_immediately);
874 }
875
s_on_shutdown_completion_task(struct aws_task * task,void * arg,enum aws_task_status status)876 static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status) {
877 (void)status;
878
879 struct aws_shutdown_notification_task *shutdown_notify = (struct aws_shutdown_notification_task *)task;
880 struct aws_channel *channel = arg;
881 AWS_ASSERT(channel->channel_state == AWS_CHANNEL_SHUT_DOWN);
882
883 /* Cancel tasks that have been scheduled with the event loop */
884 while (!aws_linked_list_empty(&channel->channel_thread_tasks.list)) {
885 struct aws_linked_list_node *node = aws_linked_list_front(&channel->channel_thread_tasks.list);
886 struct aws_channel_task *channel_task = AWS_CONTAINER_OF(node, struct aws_channel_task, node);
887 AWS_LOGF_DEBUG(
888 AWS_LS_IO_CHANNEL,
889 "id=%p: during shutdown, canceling task %p",
890 (void *)channel,
891 (void *)&channel_task->wrapper_task);
892 /* The task will remove itself from the list when it's canceled */
893 aws_event_loop_cancel_task(channel->loop, &channel_task->wrapper_task);
894 }
895
896 /* Cancel off-thread tasks, which haven't made it to the event-loop thread yet */
897 aws_mutex_lock(&channel->cross_thread_tasks.lock);
898 bool cancel_cross_thread_tasks = !aws_linked_list_empty(&channel->cross_thread_tasks.list);
899 aws_mutex_unlock(&channel->cross_thread_tasks.lock);
900
901 if (cancel_cross_thread_tasks) {
902 aws_event_loop_cancel_task(channel->loop, &channel->cross_thread_tasks.scheduling_task);
903 }
904
905 AWS_ASSERT(aws_linked_list_empty(&channel->channel_thread_tasks.list));
906 AWS_ASSERT(aws_linked_list_empty(&channel->cross_thread_tasks.list));
907
908 channel->on_shutdown_completed(channel, shutdown_notify->error_code, channel->shutdown_user_data);
909 }
910
s_run_shutdown_write_direction(struct aws_task * task,void * arg,enum aws_task_status status)911 static void s_run_shutdown_write_direction(struct aws_task *task, void *arg, enum aws_task_status status) {
912 (void)arg;
913 (void)status;
914
915 struct aws_shutdown_notification_task *shutdown_notify = (struct aws_shutdown_notification_task *)task;
916 task->fn = NULL;
917 task->arg = NULL;
918 struct aws_channel_slot *slot = shutdown_notify->slot;
919 aws_channel_handler_shutdown(
920 slot->handler, slot, AWS_CHANNEL_DIR_WRITE, shutdown_notify->error_code, shutdown_notify->shutdown_immediately);
921 }
922
aws_channel_slot_on_handler_shutdown_complete(struct aws_channel_slot * slot,enum aws_channel_direction dir,int err_code,bool free_scarce_resources_immediately)923 int aws_channel_slot_on_handler_shutdown_complete(
924 struct aws_channel_slot *slot,
925 enum aws_channel_direction dir,
926 int err_code,
927 bool free_scarce_resources_immediately) {
928
929 AWS_LOGF_DEBUG(
930 AWS_LS_IO_CHANNEL,
931 "id=%p: handler %p shutdown in %s dir completed.",
932 (void *)slot->channel,
933 (void *)slot->handler,
934 (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write");
935
936 if (slot->channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
937 return AWS_OP_SUCCESS;
938 }
939
940 if (dir == AWS_CHANNEL_DIR_READ) {
941 if (slot->adj_right && slot->adj_right->handler) {
942 return aws_channel_handler_shutdown(
943 slot->adj_right->handler, slot->adj_right, dir, err_code, free_scarce_resources_immediately);
944 }
945
946 /* break the shutdown sequence so we don't have handlers having to deal with their memory disappearing out from
947 * under them during a shutdown process. */
948 slot->channel->shutdown_notify_task.slot = slot;
949 slot->channel->shutdown_notify_task.shutdown_immediately = free_scarce_resources_immediately;
950 slot->channel->shutdown_notify_task.error_code = err_code;
951 slot->channel->shutdown_notify_task.task.fn = s_run_shutdown_write_direction;
952 slot->channel->shutdown_notify_task.task.arg = NULL;
953
954 aws_event_loop_schedule_task_now(slot->channel->loop, &slot->channel->shutdown_notify_task.task);
955 return AWS_OP_SUCCESS;
956 }
957
958 if (slot->adj_left && slot->adj_left->handler) {
959 return aws_channel_handler_shutdown(
960 slot->adj_left->handler, slot->adj_left, dir, err_code, free_scarce_resources_immediately);
961 }
962
963 if (slot->channel->first == slot) {
964 slot->channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
965 aws_mutex_lock(&slot->channel->cross_thread_tasks.lock);
966 slot->channel->cross_thread_tasks.is_channel_shut_down = true;
967 aws_mutex_unlock(&slot->channel->cross_thread_tasks.lock);
968
969 if (slot->channel->on_shutdown_completed) {
970 slot->channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
971 slot->channel->shutdown_notify_task.task.arg = slot->channel;
972 slot->channel->shutdown_notify_task.error_code = err_code;
973 aws_event_loop_schedule_task_now(slot->channel->loop, &slot->channel->shutdown_notify_task.task);
974 }
975 }
976
977 return AWS_OP_SUCCESS;
978 }
979
aws_channel_slot_downstream_read_window(struct aws_channel_slot * slot)980 size_t aws_channel_slot_downstream_read_window(struct aws_channel_slot *slot) {
981 AWS_ASSERT(slot->adj_right);
982 return slot->channel->read_back_pressure_enabled ? slot->adj_right->window_size : SIZE_MAX;
983 }
984
aws_channel_slot_upstream_message_overhead(struct aws_channel_slot * slot)985 size_t aws_channel_slot_upstream_message_overhead(struct aws_channel_slot *slot) {
986 return slot->upstream_message_overhead;
987 }
988
aws_channel_handler_destroy(struct aws_channel_handler * handler)989 void aws_channel_handler_destroy(struct aws_channel_handler *handler) {
990 AWS_ASSERT(handler->vtable && handler->vtable->destroy);
991 handler->vtable->destroy(handler);
992 }
993
aws_channel_handler_process_read_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)994 int aws_channel_handler_process_read_message(
995 struct aws_channel_handler *handler,
996 struct aws_channel_slot *slot,
997 struct aws_io_message *message) {
998
999 AWS_ASSERT(handler->vtable && handler->vtable->process_read_message);
1000 return handler->vtable->process_read_message(handler, slot, message);
1001 }
1002
aws_channel_handler_process_write_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)1003 int aws_channel_handler_process_write_message(
1004 struct aws_channel_handler *handler,
1005 struct aws_channel_slot *slot,
1006 struct aws_io_message *message) {
1007
1008 AWS_ASSERT(handler->vtable && handler->vtable->process_write_message);
1009 return handler->vtable->process_write_message(handler, slot, message);
1010 }
1011
aws_channel_handler_increment_read_window(struct aws_channel_handler * handler,struct aws_channel_slot * slot,size_t size)1012 int aws_channel_handler_increment_read_window(
1013 struct aws_channel_handler *handler,
1014 struct aws_channel_slot *slot,
1015 size_t size) {
1016
1017 AWS_ASSERT(handler->vtable && handler->vtable->increment_read_window);
1018
1019 return handler->vtable->increment_read_window(handler, slot, size);
1020 }
1021
aws_channel_handler_shutdown(struct aws_channel_handler * handler,struct aws_channel_slot * slot,enum aws_channel_direction dir,int error_code,bool free_scarce_resources_immediately)1022 int aws_channel_handler_shutdown(
1023 struct aws_channel_handler *handler,
1024 struct aws_channel_slot *slot,
1025 enum aws_channel_direction dir,
1026 int error_code,
1027 bool free_scarce_resources_immediately) {
1028
1029 AWS_ASSERT(handler->vtable && handler->vtable->shutdown);
1030 return handler->vtable->shutdown(handler, slot, dir, error_code, free_scarce_resources_immediately);
1031 }
1032
aws_channel_handler_initial_window_size(struct aws_channel_handler * handler)1033 size_t aws_channel_handler_initial_window_size(struct aws_channel_handler *handler) {
1034 AWS_ASSERT(handler->vtable && handler->vtable->initial_window_size);
1035 return handler->vtable->initial_window_size(handler);
1036 }
1037
aws_channel_get_first_slot(struct aws_channel * channel)1038 struct aws_channel_slot *aws_channel_get_first_slot(struct aws_channel *channel) {
1039 return channel->first;
1040 }
1041
s_reset_statistics(struct aws_channel * channel)1042 static void s_reset_statistics(struct aws_channel *channel) {
1043 AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(channel));
1044
1045 struct aws_channel_slot *current_slot = channel->first;
1046 while (current_slot) {
1047 struct aws_channel_handler *handler = current_slot->handler;
1048 if (handler != NULL && handler->vtable->reset_statistics != NULL) {
1049 handler->vtable->reset_statistics(handler);
1050 }
1051 current_slot = current_slot->adj_right;
1052 }
1053 }
1054
s_channel_gather_statistics_task(struct aws_task * task,void * arg,enum aws_task_status status)1055 static void s_channel_gather_statistics_task(struct aws_task *task, void *arg, enum aws_task_status status) {
1056 if (status != AWS_TASK_STATUS_RUN_READY) {
1057 return;
1058 }
1059
1060 struct aws_channel *channel = arg;
1061 if (channel->statistics_handler == NULL) {
1062 return;
1063 }
1064
1065 if (channel->channel_state == AWS_CHANNEL_SHUTTING_DOWN || channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
1066 return;
1067 }
1068
1069 uint64_t now_ns = 0;
1070 if (aws_channel_current_clock_time(channel, &now_ns)) {
1071 return;
1072 }
1073
1074 uint64_t now_ms = aws_timestamp_convert(now_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
1075
1076 struct aws_array_list *statistics_list = &channel->statistic_list;
1077 aws_array_list_clear(statistics_list);
1078
1079 struct aws_channel_slot *current_slot = channel->first;
1080 while (current_slot) {
1081 struct aws_channel_handler *handler = current_slot->handler;
1082 if (handler != NULL && handler->vtable->gather_statistics != NULL) {
1083 handler->vtable->gather_statistics(handler, statistics_list);
1084 }
1085 current_slot = current_slot->adj_right;
1086 }
1087
1088 struct aws_crt_statistics_sample_interval sample_interval = {
1089 .begin_time_ms = channel->statistics_interval_start_time_ms, .end_time_ms = now_ms};
1090
1091 aws_crt_statistics_handler_process_statistics(
1092 channel->statistics_handler, &sample_interval, statistics_list, channel);
1093
1094 s_reset_statistics(channel);
1095
1096 uint64_t reschedule_interval_ns = aws_timestamp_convert(
1097 aws_crt_statistics_handler_get_report_interval_ms(channel->statistics_handler),
1098 AWS_TIMESTAMP_MILLIS,
1099 AWS_TIMESTAMP_NANOS,
1100 NULL);
1101
1102 aws_event_loop_schedule_task_future(channel->loop, task, now_ns + reschedule_interval_ns);
1103
1104 channel->statistics_interval_start_time_ms = now_ms;
1105 }
1106
aws_channel_set_statistics_handler(struct aws_channel * channel,struct aws_crt_statistics_handler * handler)1107 int aws_channel_set_statistics_handler(struct aws_channel *channel, struct aws_crt_statistics_handler *handler) {
1108 AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(channel));
1109
1110 if (channel->statistics_handler) {
1111 aws_crt_statistics_handler_destroy(channel->statistics_handler);
1112 aws_event_loop_cancel_task(channel->loop, &channel->statistics_task);
1113 channel->statistics_handler = NULL;
1114 }
1115
1116 if (handler != NULL) {
1117 aws_task_init(&channel->statistics_task, s_channel_gather_statistics_task, channel, "gather_statistics");
1118
1119 uint64_t now_ns = 0;
1120 if (aws_channel_current_clock_time(channel, &now_ns)) {
1121 return AWS_OP_ERR;
1122 }
1123
1124 uint64_t report_time_ns = now_ns + aws_timestamp_convert(
1125 aws_crt_statistics_handler_get_report_interval_ms(handler),
1126 AWS_TIMESTAMP_MILLIS,
1127 AWS_TIMESTAMP_NANOS,
1128 NULL);
1129
1130 channel->statistics_interval_start_time_ms =
1131 aws_timestamp_convert(now_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
1132 s_reset_statistics(channel);
1133
1134 aws_event_loop_schedule_task_future(channel->loop, &channel->statistics_task, report_time_ns);
1135 }
1136
1137 channel->statistics_handler = handler;
1138
1139 return AWS_OP_SUCCESS;
1140 }
1141
aws_channel_get_event_loop(struct aws_channel * channel)1142 struct aws_event_loop *aws_channel_get_event_loop(struct aws_channel *channel) {
1143 return channel->loop;
1144 }
1145
aws_channel_trigger_read(struct aws_channel * channel)1146 int aws_channel_trigger_read(struct aws_channel *channel) {
1147 if (channel == NULL) {
1148 return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
1149 }
1150
1151 if (!aws_channel_thread_is_callers_thread(channel)) {
1152 return aws_raise_error(AWS_ERROR_INVALID_STATE);
1153 }
1154
1155 struct aws_channel_slot *slot = channel->first;
1156 if (slot == NULL) {
1157 return aws_raise_error(AWS_ERROR_INVALID_STATE);
1158 }
1159
1160 struct aws_channel_handler *handler = slot->handler;
1161 if (handler == NULL) {
1162 return aws_raise_error(AWS_ERROR_INVALID_STATE);
1163 }
1164
1165 if (handler->vtable->trigger_read != NULL) {
1166 handler->vtable->trigger_read(handler);
1167 }
1168
1169 return AWS_OP_SUCCESS;
1170 }
1171