1 #ifndef AWS_IO_CHANNEL_H 2 #define AWS_IO_CHANNEL_H 3 /** 4 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 5 * SPDX-License-Identifier: Apache-2.0. 6 */ 7 8 #include <aws/io/io.h> 9 10 #include <aws/common/statistics.h> 11 #include <aws/common/task_scheduler.h> 12 13 enum aws_channel_direction { 14 AWS_CHANNEL_DIR_READ, 15 AWS_CHANNEL_DIR_WRITE, 16 }; 17 18 struct aws_channel; 19 struct aws_channel_slot; 20 struct aws_channel_handler; 21 struct aws_event_loop; 22 struct aws_event_loop_local_object; 23 24 typedef void(aws_channel_on_setup_completed_fn)(struct aws_channel *channel, int error_code, void *user_data); 25 26 /* Callback called when a channel is completely shutdown. error_code refers to the reason the channel was closed. */ 27 typedef void(aws_channel_on_shutdown_completed_fn)(struct aws_channel *channel, int error_code, void *user_data); 28 29 struct aws_channel_slot { 30 struct aws_allocator *alloc; 31 struct aws_channel *channel; 32 struct aws_channel_slot *adj_left; 33 struct aws_channel_slot *adj_right; 34 struct aws_channel_handler *handler; 35 size_t window_size; 36 size_t upstream_message_overhead; 37 size_t current_window_update_batch_size; 38 }; 39 40 struct aws_channel_task; 41 typedef void(aws_channel_task_fn)(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status); 42 43 struct aws_channel_task { 44 struct aws_task wrapper_task; 45 aws_channel_task_fn *task_fn; 46 void *arg; 47 const char *type_tag; 48 struct aws_linked_list_node node; 49 }; 50 51 struct aws_channel_handler_vtable { 52 /** 53 * Called by the channel when a message is available for processing in the read direction. It is your 54 * responsibility to call aws_mem_release(message->allocator, message); on message when you are finished with it. 55 * 56 * Also keep in mind that your slot's internal window has been decremented. You'll want to call 57 * aws_channel_slot_increment_read_window() at some point in the future if you want to keep receiving data. 58 */ 59 int (*process_read_message)( 60 struct aws_channel_handler *handler, 61 struct aws_channel_slot *slot, 62 struct aws_io_message *message); 63 /** 64 * Called by the channel when a message is available for processing in the write direction. It is your 65 * responsibility to call aws_mem_release(message->allocator, message); on message when you are finished with it. 66 */ 67 int (*process_write_message)( 68 struct aws_channel_handler *handler, 69 struct aws_channel_slot *slot, 70 struct aws_io_message *message); 71 /** 72 * Called by the channel when a downstream handler has issued a window increment. You'll want to update your 73 * internal state and likely propagate a window increment message of your own by calling 74 * 'aws_channel_slot_increment_read_window()' 75 */ 76 int (*increment_read_window)(struct aws_channel_handler *handler, struct aws_channel_slot *slot, size_t size); 77 78 /** 79 * The channel calls shutdown on all handlers twice, once to shut down reading, and once to shut down writing. 80 * Shutdown always begins with the left-most handler, and proceeds to the right with dir set to 81 * AWS_CHANNEL_DIR_READ. Then shutdown is called on handlers from right to left with dir set to 82 * AWS_CHANNEL_DIR_WRITE. 83 * 84 * The shutdown process does not need to complete immediately and may rely on scheduled tasks. 85 * The handler must call aws_channel_slot_on_handler_shutdown_complete() when it is finished, 86 * which propagates shutdown to the next handler. If 'free_scarce_resources_immediately' is true, 87 * then resources vulnerable to denial-of-service attacks (such as sockets and file handles) 88 * must be closed immediately before the shutdown() call returns. 89 */ 90 int (*shutdown)( 91 struct aws_channel_handler *handler, 92 struct aws_channel_slot *slot, 93 enum aws_channel_direction dir, 94 int error_code, 95 bool free_scarce_resources_immediately); 96 97 /** 98 * Called by the channel when the handler is added to a slot, to get the initial window size. 99 */ 100 size_t (*initial_window_size)(struct aws_channel_handler *handler); 101 102 /** Called by the channel anytime a handler is added or removed, provides a hint for downstream 103 * handlers to avoid message fragmentation due to message overhead. */ 104 size_t (*message_overhead)(struct aws_channel_handler *handler); 105 106 /** 107 * Clean up any resources and deallocate yourself. The shutdown process will already be completed before this 108 * function is called. 109 */ 110 void (*destroy)(struct aws_channel_handler *handler); 111 112 /** 113 * Directs the channel handler to reset all of the internal statistics it tracks about itself. 114 */ 115 void (*reset_statistics)(struct aws_channel_handler *handler); 116 117 /** 118 * Adds a pointer to the handler's internal statistics (if they exist) to a list of statistics structures 119 * associated with the channel's handler chain. 120 */ 121 void (*gather_statistics)(struct aws_channel_handler *handler, struct aws_array_list *stats_list); 122 123 /* 124 * If this handler represents a source of data (like the socket_handler), then this will trigger a read 125 * from the data source. 126 */ 127 void (*trigger_read)(struct aws_channel_handler *handler); 128 }; 129 130 struct aws_channel_handler { 131 struct aws_channel_handler_vtable *vtable; 132 struct aws_allocator *alloc; 133 struct aws_channel_slot *slot; 134 void *impl; 135 }; 136 137 /** 138 * Args for creating a new channel. 139 * event_loop to use for IO and tasks. on_setup_completed will be invoked when 140 * the setup process is finished It will be executed in the event loop's thread. 141 * on_shutdown_completed will be executed upon channel shutdown. 142 * 143 * enable_read_back_pressure toggles whether or not back pressure will be applied in the channel. 144 * Leave this option off unless you're using something like reactive-streams, since it is a slight throughput 145 * penalty. 146 * 147 * Unless otherwise 148 * specified all functions for channels and channel slots must be executed within that channel's event-loop's thread. 149 **/ 150 struct aws_channel_options { 151 struct aws_event_loop *event_loop; 152 aws_channel_on_setup_completed_fn *on_setup_completed; 153 aws_channel_on_shutdown_completed_fn *on_shutdown_completed; 154 void *setup_user_data; 155 void *shutdown_user_data; 156 bool enable_read_back_pressure; 157 }; 158 159 AWS_EXTERN_C_BEGIN 160 161 extern AWS_IO_API size_t g_aws_channel_max_fragment_size; 162 163 /** 164 * Initializes channel_task for use. 165 */ 166 AWS_IO_API 167 void aws_channel_task_init( 168 struct aws_channel_task *channel_task, 169 aws_channel_task_fn *task_fn, 170 void *arg, 171 const char *type_tag); 172 173 /** 174 * Allocates new channel, Unless otherwise specified all functions for channels and channel slots must be executed 175 * within that channel's event-loop's thread. channel_options are copied. 176 */ 177 AWS_IO_API 178 struct aws_channel *aws_channel_new(struct aws_allocator *allocator, const struct aws_channel_options *creation_args); 179 180 /** 181 * Mark the channel, along with all slots and handlers, for destruction. 182 * Must be called after shutdown has completed. 183 * Can be called from any thread assuming 'aws_channel_shutdown()' has completed. 184 * Note that memory will not be freed until all users which acquired holds on the channel via 185 * aws_channel_acquire_hold(), release them via aws_channel_release_hold(). 186 */ 187 AWS_IO_API 188 void aws_channel_destroy(struct aws_channel *channel); 189 190 /** 191 * Initiates shutdown of the channel. Shutdown will begin with the left-most slot. Each handler will invoke 192 * 'aws_channel_slot_on_handler_shutdown_complete' once they've finished their shutdown process for the read direction. 193 * Once the right-most slot has shutdown in the read direction, the process will start shutting down starting on the 194 * right-most slot. Once the left-most slot has shutdown in the write direction, 'callbacks->shutdown_completed' will be 195 * invoked in the event loop's thread. 196 * 197 * This function can be called from any thread. 198 */ 199 AWS_IO_API 200 int aws_channel_shutdown(struct aws_channel *channel, int error_code); 201 202 /** 203 * Prevent a channel's memory from being freed. 204 * Any number of users may acquire a hold to prevent a channel and its handlers from being unexpectedly freed. 205 * Any user which acquires a hold must release it via aws_channel_release_hold(). 206 * Memory will be freed once all holds are released and aws_channel_destroy() has been called. 207 */ 208 AWS_IO_API 209 void aws_channel_acquire_hold(struct aws_channel *channel); 210 211 /** 212 * Release a hold on the channel's memory, allowing it to be freed. 213 * This may be called before or after aws_channel_destroy(). 214 */ 215 AWS_IO_API 216 void aws_channel_release_hold(struct aws_channel *channel); 217 218 /** 219 * Allocates and initializes a new slot for use with the channel. If this is the first slot in the channel, it will 220 * automatically be added to the channel as the first slot. For all subsequent calls on a given channel, the slot will 221 * need to be added to the channel via. the aws_channel_slot_insert_right(), aws_channel_slot_insert_end(), and 222 * aws_channel_slot_insert_left() APIs. 223 */ 224 AWS_IO_API 225 struct aws_channel_slot *aws_channel_slot_new(struct aws_channel *channel); 226 227 /** 228 * Fetches the event loop the channel is a part of. 229 */ 230 AWS_IO_API 231 struct aws_event_loop *aws_channel_get_event_loop(struct aws_channel *channel); 232 233 /** 234 * Fetches the current timestamp from the event-loop's clock, in nanoseconds. 235 */ 236 AWS_IO_API 237 int aws_channel_current_clock_time(struct aws_channel *channel, uint64_t *time_nanos); 238 239 /** 240 * Retrieves an object by key from the event loop's local storage. 241 */ 242 AWS_IO_API 243 int aws_channel_fetch_local_object( 244 struct aws_channel *channel, 245 const void *key, 246 struct aws_event_loop_local_object *obj); 247 248 /** 249 * Stores an object by key in the event loop's local storage. 250 */ 251 AWS_IO_API 252 int aws_channel_put_local_object( 253 struct aws_channel *channel, 254 const void *key, 255 const struct aws_event_loop_local_object *obj); 256 257 /** 258 * Removes an object by key from the event loop's local storage. 259 */ 260 AWS_IO_API 261 int aws_channel_remove_local_object( 262 struct aws_channel *channel, 263 const void *key, 264 struct aws_event_loop_local_object *removed_obj); 265 266 /** 267 * Acquires a message from the event loop's message pool. size_hint is merely a hint, it may be smaller than you 268 * requested and you are responsible for checking the bounds of it. If the returned message is not large enough, you 269 * must send multiple messages. 270 */ 271 AWS_IO_API 272 struct aws_io_message *aws_channel_acquire_message_from_pool( 273 struct aws_channel *channel, 274 enum aws_io_message_type message_type, 275 size_t size_hint); 276 277 /** 278 * Schedules a task to run on the event loop as soon as possible. 279 * This is the ideal way to move a task into the correct thread. It's also handy for context switches. 280 * This function is safe to call from any thread. 281 * 282 * The task should not be cleaned up or modified until its function is executed. 283 */ 284 AWS_IO_API 285 void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task); 286 287 /** 288 * Schedules a task to run on the event loop at the specified time. 289 * This is the ideal way to move a task into the correct thread. It's also handy for context switches. 290 * Use aws_channel_current_clock_time() to get the current time in nanoseconds. 291 * This function is safe to call from any thread. 292 * 293 * The task should not be cleaned up or modified until its function is executed. 294 */ 295 AWS_IO_API 296 void aws_channel_schedule_task_future( 297 struct aws_channel *channel, 298 struct aws_channel_task *task, 299 uint64_t run_at_nanos); 300 301 /** 302 * Instrument a channel with a statistics handler. While instrumented with a statistics handler, the channel 303 * will periodically report per-channel-handler-specific statistics about handler performance and state. 304 * 305 * Assigning a statistics handler to a channel is a transfer of ownership -- the channel will clean up 306 * the handler appropriately. Statistics handlers may be changed dynamically (for example, the upgrade 307 * from a vanilla http channel to a websocket channel), but this function may only be called from the 308 * event loop thread that the channel is a part of. 309 * 310 * The first possible hook to set a statistics handler is the channel's creation callback. 311 */ 312 AWS_IO_API 313 int aws_channel_set_statistics_handler(struct aws_channel *channel, struct aws_crt_statistics_handler *handler); 314 315 /** 316 * Returns true if the caller is on the event loop's thread. If false, you likely need to use 317 * aws_channel_schedule_task(). This function is safe to call from any thread. 318 */ 319 AWS_IO_API 320 bool aws_channel_thread_is_callers_thread(struct aws_channel *channel); 321 322 /** 323 * Sets the handler for a slot, the slot will also call get_current_window_size() and propagate a window update 324 * upstream. 325 */ 326 AWS_IO_API 327 int aws_channel_slot_set_handler(struct aws_channel_slot *slot, struct aws_channel_handler *handler); 328 329 /** 330 * Removes slot from the channel and deallocates the slot and its handler. 331 */ 332 AWS_IO_API 333 int aws_channel_slot_remove(struct aws_channel_slot *slot); 334 335 /** 336 * Replaces remove with new_slot. Deallocates remove and its handler. 337 */ 338 AWS_IO_API 339 int aws_channel_slot_replace(struct aws_channel_slot *remove, struct aws_channel_slot *new_slot); 340 341 /** 342 * inserts 'to_add' to the position immediately to the right of slot. Note that the first call to 343 * aws_channel_slot_new() adds it to the channel implicitly. 344 */ 345 AWS_IO_API 346 int aws_channel_slot_insert_right(struct aws_channel_slot *slot, struct aws_channel_slot *to_add); 347 348 /** 349 * Inserts to 'to_add' the end of the channel. Note that the first call to 350 * aws_channel_slot_new() adds it to the channel implicitly. 351 */ 352 AWS_IO_API 353 int aws_channel_slot_insert_end(struct aws_channel *channel, struct aws_channel_slot *to_add); 354 355 /** 356 * inserts 'to_add' to the position immediately to the left of slot. Note that the first call to 357 * aws_channel_slot_new() adds it to the channel implicitly. 358 */ 359 AWS_IO_API 360 int aws_channel_slot_insert_left(struct aws_channel_slot *slot, struct aws_channel_slot *to_add); 361 362 /** 363 * Sends a message to the adjacent slot in the channel based on dir. Also does window size checking. 364 * 365 * NOTE: if this function returns an error code, it is the caller's responsibility to release message 366 * back to the pool. If this function returns AWS_OP_SUCCESS, the recipient of the message has taken 367 * ownership of the message. So, for example, don't release a message to the pool and then return an error. 368 * If you encounter an error condition in this case, shutdown the channel with the appropriate error code. 369 */ 370 AWS_IO_API 371 int aws_channel_slot_send_message( 372 struct aws_channel_slot *slot, 373 struct aws_io_message *message, 374 enum aws_channel_direction dir); 375 376 /** 377 * Convenience function that invokes aws_channel_acquire_message_from_pool(), 378 * asking for the largest reasonable DATA message that can be sent in the write direction, 379 * with upstream overhead accounted for. 380 */ 381 AWS_IO_API 382 struct aws_io_message *aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot *slot); 383 384 /** 385 * Issues a window update notification upstream (to the left.) 386 */ 387 AWS_IO_API 388 int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window); 389 390 /** 391 * Called by handlers once they have finished their shutdown in the 'dir' direction. Propagates the shutdown process 392 * to the next handler in the channel. 393 */ 394 AWS_IO_API 395 int aws_channel_slot_on_handler_shutdown_complete( 396 struct aws_channel_slot *slot, 397 enum aws_channel_direction dir, 398 int err_code, 399 bool free_scarce_resources_immediately); 400 401 /** 402 * Initiates shutdown on slot. callbacks->on_shutdown_completed will be called 403 * once the shutdown process is completed. 404 */ 405 AWS_IO_API 406 int aws_channel_slot_shutdown( 407 struct aws_channel_slot *slot, 408 enum aws_channel_direction dir, 409 int err_code, 410 bool free_scarce_resources_immediately); 411 412 /** 413 * Fetches the downstream read window. This gives you the information necessary to honor the read window. If you call 414 * send_message() and it exceeds this window, the message will be rejected. 415 */ 416 AWS_IO_API 417 size_t aws_channel_slot_downstream_read_window(struct aws_channel_slot *slot); 418 419 /** Fetches the current overhead of upstream handlers. This provides a hint to avoid fragmentation if you care. */ 420 AWS_IO_API 421 size_t aws_channel_slot_upstream_message_overhead(struct aws_channel_slot *slot); 422 423 /** 424 * Calls destroy on handler's vtable 425 */ 426 AWS_IO_API 427 void aws_channel_handler_destroy(struct aws_channel_handler *handler); 428 429 /** 430 * Calls process_read_message on handler's vtable 431 */ 432 AWS_IO_API 433 int aws_channel_handler_process_read_message( 434 struct aws_channel_handler *handler, 435 struct aws_channel_slot *slot, 436 struct aws_io_message *message); 437 438 /** 439 * Calls process_write_message on handler's vtable. 440 */ 441 AWS_IO_API 442 int aws_channel_handler_process_write_message( 443 struct aws_channel_handler *handler, 444 struct aws_channel_slot *slot, 445 struct aws_io_message *message); 446 447 /** 448 * Calls on_window_update on handler's vtable. 449 */ 450 AWS_IO_API 451 int aws_channel_handler_increment_read_window( 452 struct aws_channel_handler *handler, 453 struct aws_channel_slot *slot, 454 size_t size); 455 456 /** 457 * calls shutdown_direction on handler's vtable. 458 */ 459 AWS_IO_API 460 int aws_channel_handler_shutdown( 461 struct aws_channel_handler *handler, 462 struct aws_channel_slot *slot, 463 enum aws_channel_direction dir, 464 int error_code, 465 bool free_scarce_resources_immediately); 466 467 /** 468 * Calls initial_window_size on handler's vtable. 469 */ 470 AWS_IO_API 471 size_t aws_channel_handler_initial_window_size(struct aws_channel_handler *handler); 472 473 AWS_IO_API 474 struct aws_channel_slot *aws_channel_get_first_slot(struct aws_channel *channel); 475 476 /** 477 * A way for external processes to force a read by the data-source channel handler. Necessary in certain cases, like 478 * when a server channel finishes setting up its initial handlers, a read may have already been triggered on the 479 * socket (the client's CLIENT_HELLO tls payload, for example) and absent further data/notifications, this data 480 * would never get processed. 481 */ 482 AWS_IO_API 483 int aws_channel_trigger_read(struct aws_channel *channel); 484 485 AWS_EXTERN_C_END 486 487 #endif /* AWS_IO_CHANNEL_H */ 488