1 #ifndef AWS_IO_CHANNEL_H
2 #define AWS_IO_CHANNEL_H
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
7 
8 #include <aws/io/io.h>
9 
10 #include <aws/common/statistics.h>
11 #include <aws/common/task_scheduler.h>
12 
/**
 * Direction of travel through a channel. Used when sending messages between slots and when shutting
 * down handlers (handlers are shut down once for reading and once for writing).
 */
enum aws_channel_direction {
    AWS_CHANNEL_DIR_READ,
    AWS_CHANNEL_DIR_WRITE,
};
17 
/* Forward declarations for types that the declarations below refer to by pointer. */
struct aws_channel;
struct aws_channel_slot;
struct aws_channel_handler;
struct aws_event_loop;
struct aws_event_loop_local_object;
23 
/**
 * Callback invoked when a channel's setup process is finished. It is executed in the event loop's thread.
 * NOTE(review): error_code presumably reports the outcome of setup; confirm against the implementation.
 */
typedef void(aws_channel_on_setup_completed_fn)(struct aws_channel *channel, int error_code, void *user_data);
25 
/**
 * Callback invoked when a channel has completely shut down. error_code is the reason the channel was closed.
 */
typedef void(aws_channel_on_shutdown_completed_fn)(struct aws_channel *channel, int error_code, void *user_data);
28 
/**
 * A position in a channel. Each slot holds one handler and is linked to the slots adjacent to it.
 */
struct aws_channel_slot {
    struct aws_allocator *alloc;
    /* Channel this slot belongs to. */
    struct aws_channel *channel;
    /* Neighbouring slots; window updates travel to the left (upstream). */
    struct aws_channel_slot *adj_left;
    struct aws_channel_slot *adj_right;
    /* Handler installed via aws_channel_slot_set_handler(). */
    struct aws_channel_handler *handler;
    /* NOTE(review): presumably the slot's current read window; confirm against the implementation. */
    size_t window_size;
    /* Overhead of upstream handlers; see aws_channel_slot_upstream_message_overhead(). */
    size_t upstream_message_overhead;
    /* NOTE(review): presumably window increments accumulated but not yet propagated; confirm. */
    size_t current_window_update_batch_size;
};
39 
struct aws_channel_task;

/**
 * Function run by a channel task. Receives the task itself, the user argument the task was initialized
 * with, and the task status.
 */
typedef void(aws_channel_task_fn)(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status);
42 
43 struct aws_channel_task {
44     struct aws_task wrapper_task;
45     aws_channel_task_fn *task_fn;
46     void *arg;
47     const char *type_tag;
48     struct aws_linked_list_node node;
49 };
50 
/**
 * Table of operations a channel handler implements. The channel invokes these on the handler installed
 * in each slot.
 */
struct aws_channel_handler_vtable {
    /**
     * Called by the channel when a message is available for processing in the read direction. It is your
     * responsibility to call aws_mem_release(message->allocator, message); on message when you are finished with it.
     *
     * Also keep in mind that your slot's internal window has been decremented. You'll want to call
     * aws_channel_slot_increment_read_window() at some point in the future if you want to keep receiving data.
     */
    int (*process_read_message)(
        struct aws_channel_handler *handler,
        struct aws_channel_slot *slot,
        struct aws_io_message *message);

    /**
     * Called by the channel when a message is available for processing in the write direction. It is your
     * responsibility to call aws_mem_release(message->allocator, message); on message when you are finished with it.
     */
    int (*process_write_message)(
        struct aws_channel_handler *handler,
        struct aws_channel_slot *slot,
        struct aws_io_message *message);

    /**
     * Called by the channel when a downstream handler has issued a window increment. You'll want to update your
     * internal state and likely propagate a window increment message of your own by calling
     * aws_channel_slot_increment_read_window().
     */
    int (*increment_read_window)(struct aws_channel_handler *handler, struct aws_channel_slot *slot, size_t size);

    /**
     * The channel calls shutdown on all handlers twice, once to shut down reading, and once to shut down writing.
     * Shutdown always begins with the left-most handler, and proceeds to the right with dir set to
     * AWS_CHANNEL_DIR_READ. Then shutdown is called on handlers from right to left with dir set to
     * AWS_CHANNEL_DIR_WRITE.
     *
     * The shutdown process does not need to complete immediately and may rely on scheduled tasks.
     * The handler must call aws_channel_slot_on_handler_shutdown_complete() when it is finished,
     * which propagates shutdown to the next handler. If 'free_scarce_resources_immediately' is true,
     * then resources vulnerable to denial-of-service attacks (such as sockets and file handles)
     * must be closed immediately, before the shutdown() call returns.
     */
    int (*shutdown)(
        struct aws_channel_handler *handler,
        struct aws_channel_slot *slot,
        enum aws_channel_direction dir,
        int error_code,
        bool free_scarce_resources_immediately);

    /**
     * Called by the channel when the handler is added to a slot, to get the initial window size.
     */
    size_t (*initial_window_size)(struct aws_channel_handler *handler);

    /**
     * Called by the channel anytime a handler is added or removed. Provides a hint for downstream
     * handlers to avoid message fragmentation due to message overhead.
     */
    size_t (*message_overhead)(struct aws_channel_handler *handler);

    /**
     * Clean up any resources and deallocate yourself. The shutdown process will already be completed before this
     * function is called.
     */
    void (*destroy)(struct aws_channel_handler *handler);

    /**
     * Directs the channel handler to reset all of the internal statistics it tracks about itself.
     */
    void (*reset_statistics)(struct aws_channel_handler *handler);

    /**
     * Adds a pointer to the handler's internal statistics (if they exist) to a list of statistics structures
     * associated with the channel's handler chain.
     */
    void (*gather_statistics)(struct aws_channel_handler *handler, struct aws_array_list *stats_list);

    /**
     * If this handler represents a source of data (like the socket_handler), then this will trigger a read
     * from the data source.
     */
    void (*trigger_read)(struct aws_channel_handler *handler);
};
129 
/**
 * A channel handler: a vtable of operations plus implementation-specific state.
 */
struct aws_channel_handler {
    /* Operations the channel invokes on this handler. */
    struct aws_channel_handler_vtable *vtable;
    struct aws_allocator *alloc;
    /* NOTE(review): presumably the slot this handler is installed in; confirm against the implementation. */
    struct aws_channel_slot *slot;
    /* Opaque, implementation-defined handler state. */
    void *impl;
};
136 
137 /**
138  * Args for creating a new channel.
139  *  event_loop to use for IO and tasks. on_setup_completed will be invoked when
140  *  the setup process is finished It will be executed in the event loop's thread.
141  *  on_shutdown_completed will be executed upon channel shutdown.
142  *
143  *  enable_read_back_pressure toggles whether or not back pressure will be applied in the channel.
144  *  Leave this option off unless you're using something like reactive-streams, since it is a slight throughput
145  *  penalty.
146  *
147  *  Unless otherwise
148  *  specified all functions for channels and channel slots must be executed within that channel's event-loop's thread.
149  **/
150 struct aws_channel_options {
151     struct aws_event_loop *event_loop;
152     aws_channel_on_setup_completed_fn *on_setup_completed;
153     aws_channel_on_shutdown_completed_fn *on_shutdown_completed;
154     void *setup_user_data;
155     void *shutdown_user_data;
156     bool enable_read_back_pressure;
157 };
158 
159 AWS_EXTERN_C_BEGIN
160 
161 extern AWS_IO_API size_t g_aws_channel_max_fragment_size;
162 
163 /**
164  * Initializes channel_task for use.
165  */
166 AWS_IO_API
167 void aws_channel_task_init(
168     struct aws_channel_task *channel_task,
169     aws_channel_task_fn *task_fn,
170     void *arg,
171     const char *type_tag);
172 
173 /**
174  * Allocates new channel, Unless otherwise specified all functions for channels and channel slots must be executed
175  * within that channel's event-loop's thread. channel_options are copied.
176  */
177 AWS_IO_API
178 struct aws_channel *aws_channel_new(struct aws_allocator *allocator, const struct aws_channel_options *creation_args);
179 
180 /**
181  * Mark the channel, along with all slots and handlers, for destruction.
182  * Must be called after shutdown has completed.
183  * Can be called from any thread assuming 'aws_channel_shutdown()' has completed.
184  * Note that memory will not be freed until all users which acquired holds on the channel via
185  * aws_channel_acquire_hold(), release them via aws_channel_release_hold().
186  */
187 AWS_IO_API
188 void aws_channel_destroy(struct aws_channel *channel);
189 
190 /**
191  * Initiates shutdown of the channel. Shutdown will begin with the left-most slot. Each handler will invoke
192  * 'aws_channel_slot_on_handler_shutdown_complete' once they've finished their shutdown process for the read direction.
193  * Once the right-most slot has shutdown in the read direction, the process will start shutting down starting on the
194  * right-most slot. Once the left-most slot has shutdown in the write direction, 'callbacks->shutdown_completed' will be
195  * invoked in the event loop's thread.
196  *
197  * This function can be called from any thread.
198  */
199 AWS_IO_API
200 int aws_channel_shutdown(struct aws_channel *channel, int error_code);
201 
202 /**
203  * Prevent a channel's memory from being freed.
204  * Any number of users may acquire a hold to prevent a channel and its handlers from being unexpectedly freed.
205  * Any user which acquires a hold must release it via aws_channel_release_hold().
206  * Memory will be freed once all holds are released and aws_channel_destroy() has been called.
207  */
208 AWS_IO_API
209 void aws_channel_acquire_hold(struct aws_channel *channel);
210 
211 /**
212  * Release a hold on the channel's memory, allowing it to be freed.
213  * This may be called before or after aws_channel_destroy().
214  */
215 AWS_IO_API
216 void aws_channel_release_hold(struct aws_channel *channel);
217 
218 /**
219  * Allocates and initializes a new slot for use with the channel. If this is the first slot in the channel, it will
220  * automatically be added to the channel as the first slot. For all subsequent calls on a given channel, the slot will
221  * need to be added to the channel via. the aws_channel_slot_insert_right(), aws_channel_slot_insert_end(), and
222  * aws_channel_slot_insert_left() APIs.
223  */
224 AWS_IO_API
225 struct aws_channel_slot *aws_channel_slot_new(struct aws_channel *channel);
226 
227 /**
228  * Fetches the event loop the channel is a part of.
229  */
230 AWS_IO_API
231 struct aws_event_loop *aws_channel_get_event_loop(struct aws_channel *channel);
232 
233 /**
234  * Fetches the current timestamp from the event-loop's clock, in nanoseconds.
235  */
236 AWS_IO_API
237 int aws_channel_current_clock_time(struct aws_channel *channel, uint64_t *time_nanos);
238 
239 /**
240  * Retrieves an object by key from the event loop's local storage.
241  */
242 AWS_IO_API
243 int aws_channel_fetch_local_object(
244     struct aws_channel *channel,
245     const void *key,
246     struct aws_event_loop_local_object *obj);
247 
248 /**
249  * Stores an object by key in the event loop's local storage.
250  */
251 AWS_IO_API
252 int aws_channel_put_local_object(
253     struct aws_channel *channel,
254     const void *key,
255     const struct aws_event_loop_local_object *obj);
256 
257 /**
258  * Removes an object by key from the event loop's local storage.
259  */
260 AWS_IO_API
261 int aws_channel_remove_local_object(
262     struct aws_channel *channel,
263     const void *key,
264     struct aws_event_loop_local_object *removed_obj);
265 
266 /**
267  * Acquires a message from the event loop's message pool. size_hint is merely a hint, it may be smaller than you
268  * requested and you are responsible for checking the bounds of it. If the returned message is not large enough, you
269  * must send multiple messages.
270  */
271 AWS_IO_API
272 struct aws_io_message *aws_channel_acquire_message_from_pool(
273     struct aws_channel *channel,
274     enum aws_io_message_type message_type,
275     size_t size_hint);
276 
277 /**
278  * Schedules a task to run on the event loop as soon as possible.
279  * This is the ideal way to move a task into the correct thread. It's also handy for context switches.
280  * This function is safe to call from any thread.
281  *
282  * The task should not be cleaned up or modified until its function is executed.
283  */
284 AWS_IO_API
285 void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task);
286 
287 /**
288  * Schedules a task to run on the event loop at the specified time.
289  * This is the ideal way to move a task into the correct thread. It's also handy for context switches.
290  * Use aws_channel_current_clock_time() to get the current time in nanoseconds.
291  * This function is safe to call from any thread.
292  *
293  * The task should not be cleaned up or modified until its function is executed.
294  */
295 AWS_IO_API
296 void aws_channel_schedule_task_future(
297     struct aws_channel *channel,
298     struct aws_channel_task *task,
299     uint64_t run_at_nanos);
300 
301 /**
302  * Instrument a channel with a statistics handler.  While instrumented with a statistics handler, the channel
303  * will periodically report per-channel-handler-specific statistics about handler performance and state.
304  *
305  * Assigning a statistics handler to a channel is a transfer of ownership -- the channel will clean up
306  * the handler appropriately.  Statistics handlers may be changed dynamically (for example, the upgrade
307  * from a vanilla http channel to a websocket channel), but this function may only be called from the
308  * event loop thread that the channel is a part of.
309  *
310  * The first possible hook to set a statistics handler is the channel's creation callback.
311  */
312 AWS_IO_API
313 int aws_channel_set_statistics_handler(struct aws_channel *channel, struct aws_crt_statistics_handler *handler);
314 
315 /**
316  * Returns true if the caller is on the event loop's thread. If false, you likely need to use
317  * aws_channel_schedule_task(). This function is safe to call from any thread.
318  */
319 AWS_IO_API
320 bool aws_channel_thread_is_callers_thread(struct aws_channel *channel);
321 
322 /**
323  * Sets the handler for a slot, the slot will also call get_current_window_size() and propagate a window update
324  * upstream.
325  */
326 AWS_IO_API
327 int aws_channel_slot_set_handler(struct aws_channel_slot *slot, struct aws_channel_handler *handler);
328 
329 /**
330  * Removes slot from the channel and deallocates the slot and its handler.
331  */
332 AWS_IO_API
333 int aws_channel_slot_remove(struct aws_channel_slot *slot);
334 
335 /**
336  * Replaces remove with new_slot. Deallocates remove and its handler.
337  */
338 AWS_IO_API
339 int aws_channel_slot_replace(struct aws_channel_slot *remove, struct aws_channel_slot *new_slot);
340 
341 /**
342  * inserts 'to_add' to the position immediately to the right of slot. Note that the first call to
343  * aws_channel_slot_new() adds it to the channel implicitly.
344  */
345 AWS_IO_API
346 int aws_channel_slot_insert_right(struct aws_channel_slot *slot, struct aws_channel_slot *to_add);
347 
348 /**
349  * Inserts to 'to_add' the end of the channel. Note that the first call to
350  * aws_channel_slot_new() adds it to the channel implicitly.
351  */
352 AWS_IO_API
353 int aws_channel_slot_insert_end(struct aws_channel *channel, struct aws_channel_slot *to_add);
354 
355 /**
356  * inserts 'to_add' to the position immediately to the left of slot. Note that the first call to
357  * aws_channel_slot_new() adds it to the channel implicitly.
358  */
359 AWS_IO_API
360 int aws_channel_slot_insert_left(struct aws_channel_slot *slot, struct aws_channel_slot *to_add);
361 
362 /**
363  * Sends a message to the adjacent slot in the channel based on dir. Also does window size checking.
364  *
365  * NOTE: if this function returns an error code, it is the caller's responsibility to release message
366  * back to the pool. If this function returns AWS_OP_SUCCESS, the recipient of the message has taken
367  * ownership of the message. So, for example, don't release a message to the pool and then return an error.
368  * If you encounter an error condition in this case, shutdown the channel with the appropriate error code.
369  */
370 AWS_IO_API
371 int aws_channel_slot_send_message(
372     struct aws_channel_slot *slot,
373     struct aws_io_message *message,
374     enum aws_channel_direction dir);
375 
376 /**
377  * Convenience function that invokes aws_channel_acquire_message_from_pool(),
378  * asking for the largest reasonable DATA message that can be sent in the write direction,
379  * with upstream overhead accounted for.
380  */
381 AWS_IO_API
382 struct aws_io_message *aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot *slot);
383 
384 /**
385  * Issues a window update notification upstream (to the left.)
386  */
387 AWS_IO_API
388 int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window);
389 
390 /**
391  * Called by handlers once they have finished their shutdown in the 'dir' direction. Propagates the shutdown process
392  * to the next handler in the channel.
393  */
394 AWS_IO_API
395 int aws_channel_slot_on_handler_shutdown_complete(
396     struct aws_channel_slot *slot,
397     enum aws_channel_direction dir,
398     int err_code,
399     bool free_scarce_resources_immediately);
400 
401 /**
402  * Initiates shutdown on slot. callbacks->on_shutdown_completed will be called
403  * once the shutdown process is completed.
404  */
405 AWS_IO_API
406 int aws_channel_slot_shutdown(
407     struct aws_channel_slot *slot,
408     enum aws_channel_direction dir,
409     int err_code,
410     bool free_scarce_resources_immediately);
411 
412 /**
413  * Fetches the downstream read window. This gives you the information necessary to honor the read window. If you call
414  * send_message() and it exceeds this window, the message will be rejected.
415  */
416 AWS_IO_API
417 size_t aws_channel_slot_downstream_read_window(struct aws_channel_slot *slot);
418 
419 /** Fetches the current overhead of upstream handlers. This provides a hint to avoid fragmentation if you care. */
420 AWS_IO_API
421 size_t aws_channel_slot_upstream_message_overhead(struct aws_channel_slot *slot);
422 
423 /**
424  * Calls destroy on handler's vtable
425  */
426 AWS_IO_API
427 void aws_channel_handler_destroy(struct aws_channel_handler *handler);
428 
429 /**
430  * Calls process_read_message on handler's vtable
431  */
432 AWS_IO_API
433 int aws_channel_handler_process_read_message(
434     struct aws_channel_handler *handler,
435     struct aws_channel_slot *slot,
436     struct aws_io_message *message);
437 
438 /**
439  * Calls process_write_message on handler's vtable.
440  */
441 AWS_IO_API
442 int aws_channel_handler_process_write_message(
443     struct aws_channel_handler *handler,
444     struct aws_channel_slot *slot,
445     struct aws_io_message *message);
446 
447 /**
448  * Calls on_window_update on handler's vtable.
449  */
450 AWS_IO_API
451 int aws_channel_handler_increment_read_window(
452     struct aws_channel_handler *handler,
453     struct aws_channel_slot *slot,
454     size_t size);
455 
456 /**
457  * calls shutdown_direction on handler's vtable.
458  */
459 AWS_IO_API
460 int aws_channel_handler_shutdown(
461     struct aws_channel_handler *handler,
462     struct aws_channel_slot *slot,
463     enum aws_channel_direction dir,
464     int error_code,
465     bool free_scarce_resources_immediately);
466 
467 /**
468  * Calls initial_window_size on handler's vtable.
469  */
470 AWS_IO_API
471 size_t aws_channel_handler_initial_window_size(struct aws_channel_handler *handler);
472 
473 AWS_IO_API
474 struct aws_channel_slot *aws_channel_get_first_slot(struct aws_channel *channel);
475 
476 /**
477  * A way for external processes to force a read by the data-source channel handler.  Necessary in certain cases, like
478  * when a server channel finishes setting up its initial handlers, a read may have already been triggered on the
479  * socket (the client's CLIENT_HELLO tls payload, for example) and absent further data/notifications, this data
480  * would never get processed.
481  */
482 AWS_IO_API
483 int aws_channel_trigger_read(struct aws_channel *channel);
484 
485 AWS_EXTERN_C_END
486 
487 #endif /* AWS_IO_CHANNEL_H */
488