1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/http/private/websocket_impl.h>
7 
8 #include <aws/common/atomics.h>
9 #include <aws/common/device_random.h>
10 #include <aws/common/encoding.h>
11 #include <aws/common/mutex.h>
12 #include <aws/http/private/websocket_decoder.h>
13 #include <aws/http/private/websocket_encoder.h>
14 #include <aws/http/request_response.h>
15 #include <aws/io/channel.h>
16 #include <aws/io/logging.h>
17 
18 #include <inttypes.h>
19 
20 #if _MSC_VER
21 #    pragma warning(disable : 4204) /* non-constant aggregate initializer */
22 #endif
23 
24 /* TODO: echo payload of peer CLOSE */
25 
26 /* TODO: Can we be sure socket will always mark aws_io_messages as complete? */
27 
28 /* TODO: If something goes wrong during normal shutdown, do I change the error_code? */
29 
30 /* TODO: Delayed payload works by sending 0-size io_msgs down pipe and trying again when they're compele.
31  *       Do something more efficient? */
32 
33 /* TODO: don't fire send completion until data written to socket .. which also means delaying on_shutdown cb */
34 
35 /* TODO: stop using the HTTP_PARSE error, give websocket its own error */
36 
37 struct outgoing_frame {
38     struct aws_websocket_send_frame_options def;
39     struct aws_linked_list_node node;
40 };
41 
42 struct aws_websocket {
43     struct aws_allocator *alloc;
44     struct aws_channel_handler channel_handler;
45     struct aws_channel_slot *channel_slot;
46     size_t initial_window_size;
47     bool manual_window_update;
48 
49     void *user_data;
50     aws_websocket_on_incoming_frame_begin_fn *on_incoming_frame_begin;
51     aws_websocket_on_incoming_frame_payload_fn *on_incoming_frame_payload;
52     aws_websocket_on_incoming_frame_complete_fn *on_incoming_frame_complete;
53 
54     struct aws_channel_task move_synced_data_to_thread_task;
55     struct aws_channel_task shutdown_channel_task;
56     struct aws_channel_task increment_read_window_task;
57     struct aws_channel_task waiting_on_payload_stream_task;
58     struct aws_channel_task close_timeout_task;
59     bool is_server;
60 
61     /* Data that should only be accessed from the websocket's channel thread. */
62     struct {
63         struct aws_websocket_encoder encoder;
64         struct aws_linked_list outgoing_frame_list;
65         struct outgoing_frame *current_outgoing_frame;
66 
67         struct aws_websocket_decoder decoder;
68         struct aws_websocket_incoming_frame *current_incoming_frame;
69         struct aws_websocket_incoming_frame incoming_frame_storage;
70 
71         /* If current incoming frame is CONTINUATION, this is the data type it is a continuation of. */
72         enum aws_websocket_opcode continuation_of_opcode;
73 
74         /* Amount to increment window after a channel message has been processed. */
75         size_t incoming_message_window_update;
76 
77         /* Cached slot to right */
78         struct aws_channel_slot *last_known_right_slot;
79 
80         /* True when no more frames will be read, due to:
81          * - a CLOSE frame was received
82          * - decoder error
83          * - channel shutdown in read-dir */
84         bool is_reading_stopped;
85 
86         /* True when no more frames will be written, due to:
87          * - a CLOSE frame was sent
88          * - encoder error
89          * - channel shutdown in write-dir */
90         bool is_writing_stopped;
91 
92         /* During normal shutdown websocket ensures that a CLOSE frame is sent */
93         bool is_shutting_down_and_waiting_for_close_frame_to_be_written;
94         int channel_shutdown_error_code;
95         bool channel_shutdown_free_scarce_resources_immediately;
96 
97         /* Wait until each aws_io_message is completely written to
98          * the socket before sending the next aws_io_message */
99         bool is_waiting_for_write_completion;
100 
101         /* If, while writing out data from a payload stream, we experience "read would block",
102          * schedule a task to try again in the near-future. */
103         bool is_waiting_on_payload_stream_task;
104 
105         /* True if this websocket is being used as a dumb mid-channel handler.
106          * The websocket will no longer respond to its public API or invoke callbacks. */
107         bool is_midchannel_handler;
108     } thread_data;
109 
110     /* Data that may be touched from any thread (lock must be held). */
111     struct {
112         struct aws_mutex lock;
113 
114         struct aws_linked_list outgoing_frame_list;
115 
116         /* If non-zero, then increment_read_window_task is scheduled */
117         size_t window_increment_size;
118 
119         /* Error-code returned by aws_websocket_send_frame() when is_writing_stopped is true */
120         int send_frame_error_code;
121 
122         /* Use a task to issue a channel shutdown. */
123         int shutdown_channel_task_error_code;
124         bool is_shutdown_channel_task_scheduled;
125 
126         bool is_move_synced_data_to_thread_task_scheduled;
127 
128         /* Mirrors variable from thread_data */
129         bool is_midchannel_handler;
130 
131         /* Whether aws_websocket_release() has been called */
132         bool is_released;
133     } synced_data;
134 };
135 
136 static int s_handler_process_read_message(
137     struct aws_channel_handler *handler,
138     struct aws_channel_slot *slot,
139     struct aws_io_message *message);
140 
141 static int s_handler_process_write_message(
142     struct aws_channel_handler *handler,
143     struct aws_channel_slot *slot,
144     struct aws_io_message *message);
145 
146 static int s_handler_increment_read_window(
147     struct aws_channel_handler *handler,
148     struct aws_channel_slot *slot,
149     size_t size);
150 
151 static int s_handler_shutdown(
152     struct aws_channel_handler *handler,
153     struct aws_channel_slot *slot,
154     enum aws_channel_direction dir,
155     int error_code,
156     bool free_scarce_resources_immediately);
157 
158 static size_t s_handler_initial_window_size(struct aws_channel_handler *handler);
159 static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
160 static void s_handler_destroy(struct aws_channel_handler *handler);
161 
162 static int s_encoder_stream_outgoing_payload(struct aws_byte_buf *out_buf, void *user_data);
163 
164 static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data);
165 static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data);
166 static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);
167 static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);
168 
169 static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code);
170 static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result);
171 static void s_finish_shutdown(struct aws_websocket *websocket);
172 static void s_io_message_write_completed(
173     struct aws_channel *channel,
174     struct aws_io_message *message,
175     int err_code,
176     void *user_data);
177 static int s_send_frame(
178     struct aws_websocket *websocket,
179     const struct aws_websocket_send_frame_options *options,
180     bool from_public_api);
181 static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data);
182 static void s_midchannel_send_complete(struct aws_websocket *websocket, int error_code, void *user_data);
183 static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
184 static void s_increment_read_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
185 static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
186 static void s_waiting_on_payload_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
187 static void s_close_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
188 static void s_schedule_channel_shutdown(struct aws_websocket *websocket, int error_code);
189 static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code);
190 static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code);
191 static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code);
192 static void s_try_write_outgoing_frames(struct aws_websocket *websocket);
193 
194 static struct aws_channel_handler_vtable s_channel_handler_vtable = {
195     .process_read_message = s_handler_process_read_message,
196     .process_write_message = s_handler_process_write_message,
197     .increment_read_window = s_handler_increment_read_window,
198     .shutdown = s_handler_shutdown,
199     .initial_window_size = s_handler_initial_window_size,
200     .message_overhead = s_handler_message_overhead,
201     .destroy = s_handler_destroy,
202 };
203 
aws_websocket_opcode_str(uint8_t opcode)204 const char *aws_websocket_opcode_str(uint8_t opcode) {
205     switch (opcode) {
206         case AWS_WEBSOCKET_OPCODE_CONTINUATION:
207             return "continuation";
208         case AWS_WEBSOCKET_OPCODE_TEXT:
209             return "text";
210         case AWS_WEBSOCKET_OPCODE_BINARY:
211             return "binary";
212         case AWS_WEBSOCKET_OPCODE_CLOSE:
213             return "close";
214         case AWS_WEBSOCKET_OPCODE_PING:
215             return "ping";
216         case AWS_WEBSOCKET_OPCODE_PONG:
217             return "pong";
218         default:
219             return "";
220     }
221 }
222 
aws_websocket_is_data_frame(uint8_t opcode)223 bool aws_websocket_is_data_frame(uint8_t opcode) {
224     /* RFC-6455 Section 5.6: Most significant bit of (4 bit) data frame opcode is 0 */
225     return !(opcode & 0x08);
226 }
227 
s_lock_synced_data(struct aws_websocket * websocket)228 static void s_lock_synced_data(struct aws_websocket *websocket) {
229     int err = aws_mutex_lock(&websocket->synced_data.lock);
230     AWS_ASSERT(!err);
231     (void)err;
232 }
233 
s_unlock_synced_data(struct aws_websocket * websocket)234 static void s_unlock_synced_data(struct aws_websocket *websocket) {
235     int err = aws_mutex_unlock(&websocket->synced_data.lock);
236     AWS_ASSERT(!err);
237     (void)err;
238 }
239 
aws_websocket_handler_new(const struct aws_websocket_handler_options * options)240 struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handler_options *options) {
241     /* TODO: validate options */
242 
243     struct aws_channel_slot *slot = NULL;
244     struct aws_websocket *websocket = NULL;
245     int err;
246 
247     slot = aws_channel_slot_new(options->channel);
248     if (!slot) {
249         goto error;
250     }
251 
252     err = aws_channel_slot_insert_end(options->channel, slot);
253     if (err) {
254         goto error;
255     }
256 
257     websocket = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_websocket));
258     if (!websocket) {
259         goto error;
260     }
261 
262     websocket->alloc = options->allocator;
263     websocket->channel_handler.vtable = &s_channel_handler_vtable;
264     websocket->channel_handler.alloc = options->allocator;
265     websocket->channel_handler.impl = websocket;
266 
267     websocket->channel_slot = slot;
268 
269     websocket->initial_window_size = options->initial_window_size;
270     websocket->manual_window_update = options->manual_window_update;
271 
272     websocket->user_data = options->user_data;
273     websocket->on_incoming_frame_begin = options->on_incoming_frame_begin;
274     websocket->on_incoming_frame_payload = options->on_incoming_frame_payload;
275     websocket->on_incoming_frame_complete = options->on_incoming_frame_complete;
276 
277     websocket->is_server = options->is_server;
278 
279     aws_channel_task_init(
280         &websocket->move_synced_data_to_thread_task,
281         s_move_synced_data_to_thread_task,
282         websocket,
283         "websocket_move_synced_data_to_thread");
284     aws_channel_task_init(
285         &websocket->shutdown_channel_task, s_shutdown_channel_task, websocket, "websocket_shutdown_channel");
286     aws_channel_task_init(
287         &websocket->increment_read_window_task,
288         s_increment_read_window_task,
289         websocket,
290         "websocket_increment_read_window");
291     aws_channel_task_init(
292         &websocket->waiting_on_payload_stream_task,
293         s_waiting_on_payload_stream_task,
294         websocket,
295         "websocket_waiting_on_payload_stream");
296     aws_channel_task_init(&websocket->close_timeout_task, s_close_timeout_task, websocket, "websocket_close_timeout");
297 
298     aws_linked_list_init(&websocket->thread_data.outgoing_frame_list);
299 
300     aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket);
301 
302     aws_websocket_decoder_init(&websocket->thread_data.decoder, s_decoder_on_frame, s_decoder_on_payload, websocket);
303 
304     aws_linked_list_init(&websocket->synced_data.outgoing_frame_list);
305 
306     err = aws_mutex_init(&websocket->synced_data.lock);
307     if (err) {
308         AWS_LOGF_ERROR(
309             AWS_LS_HTTP_WEBSOCKET,
310             "static: Failed to initialize mutex, error %d (%s).",
311             aws_last_error(),
312             aws_error_name(aws_last_error()));
313 
314         goto error;
315     }
316 
317     err = aws_channel_slot_set_handler(slot, &websocket->channel_handler);
318     if (err) {
319         goto error;
320     }
321 
322     /* Ensure websocket (and the rest of the channel) can't be destroyed until aws_websocket_release() is called */
323     aws_channel_acquire_hold(options->channel);
324 
325     return websocket;
326 
327 error:
328     if (slot) {
329         if (websocket && !slot->handler) {
330             websocket->channel_handler.vtable->destroy(&websocket->channel_handler);
331         }
332         aws_channel_slot_remove(slot);
333     }
334     return NULL;
335 }
336 
s_handler_destroy(struct aws_channel_handler * handler)337 static void s_handler_destroy(struct aws_channel_handler *handler) {
338     struct aws_websocket *websocket = handler->impl;
339     AWS_ASSERT(!websocket->thread_data.current_outgoing_frame);
340     AWS_ASSERT(!websocket->thread_data.current_incoming_frame);
341 
342     AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Destroying websocket.", (void *)websocket);
343 
344     aws_mutex_clean_up(&websocket->synced_data.lock);
345     aws_mem_release(websocket->alloc, websocket);
346 }
347 
aws_websocket_release(struct aws_websocket * websocket)348 void aws_websocket_release(struct aws_websocket *websocket) {
349     AWS_ASSERT(websocket);
350     AWS_ASSERT(websocket->channel_slot);
351 
352     bool was_already_released;
353 
354     /* BEGIN CRITICAL SECTION */
355     s_lock_synced_data(websocket);
356     if (websocket->synced_data.is_released) {
357         was_already_released = true;
358     } else {
359         was_already_released = false;
360         websocket->synced_data.is_released = true;
361     }
362     s_unlock_synced_data(websocket);
363     /* END CRITICAL SECTION */
364 
365     if (was_already_released) {
366         AWS_LOGF_TRACE(
367             AWS_LS_HTTP_WEBSOCKET, "id=%p: Ignoring multiple calls to websocket release.", (void *)websocket);
368         return;
369     }
370 
371     AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket released, shut down if necessary.", (void *)websocket);
372 
373     /* Channel might already be shut down, but make sure */
374     s_schedule_channel_shutdown(websocket, AWS_ERROR_SUCCESS);
375 
376     /* Channel won't destroy its slots/handlers until its refcount reaches 0 */
377     aws_channel_release_hold(websocket->channel_slot->channel);
378 }
379 
aws_websocket_get_channel(const struct aws_websocket * websocket)380 struct aws_channel *aws_websocket_get_channel(const struct aws_websocket *websocket) {
381     return websocket->channel_slot->channel;
382 }
383 
aws_websocket_convert_to_midchannel_handler(struct aws_websocket * websocket)384 int aws_websocket_convert_to_midchannel_handler(struct aws_websocket *websocket) {
385     if (!aws_channel_thread_is_callers_thread(websocket->channel_slot->channel)) {
386         AWS_LOGF_ERROR(
387             AWS_LS_HTTP_WEBSOCKET, "id=%p: Cannot convert to midchannel handler on this thread.", (void *)websocket);
388         return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
389     }
390 
391     if (websocket->thread_data.is_midchannel_handler) {
392         AWS_LOGF_ERROR(
393             AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket has already converted to midchannel handler.", (void *)websocket);
394         return aws_raise_error(AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER);
395     }
396 
397     if (websocket->thread_data.is_reading_stopped || websocket->thread_data.is_writing_stopped) {
398         AWS_LOGF_ERROR(
399             AWS_LS_HTTP_WEBSOCKET,
400             "id=%p: Cannot convert websocket to midchannel handler because it is closed or closing.",
401             (void *)websocket);
402         return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
403     }
404 
405     if (websocket->thread_data.current_incoming_frame) {
406         AWS_LOGF_ERROR(
407             AWS_LS_HTTP_WEBSOCKET,
408             "id=%p: Cannot convert to midchannel handler in the middle of an incoming frame.",
409             (void *)websocket);
410         return aws_raise_error(AWS_ERROR_INVALID_STATE);
411     }
412 
413     bool was_released = false;
414 
415     /* BEGIN CRITICAL SECTION */
416     s_lock_synced_data(websocket);
417     if (websocket->synced_data.is_released) {
418         was_released = true;
419     } else {
420         websocket->synced_data.is_midchannel_handler = true;
421     }
422     s_unlock_synced_data(websocket);
423     /* END CRITICAL SECTION */
424 
425     if (was_released) {
426         AWS_LOGF_ERROR(
427             AWS_LS_HTTP_WEBSOCKET,
428             "id=%p: Cannot convert websocket to midchannel handler because it was already released.",
429             (void *)websocket);
430         return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
431     }
432 
433     websocket->thread_data.is_midchannel_handler = true;
434 
435     return AWS_OP_SUCCESS;
436 }
437 
438 /* Insert frame into list, sorting by priority, then by age (high-priority and older frames towards the front) */
s_enqueue_prioritized_frame(struct aws_linked_list * list,struct outgoing_frame * to_add)439 static void s_enqueue_prioritized_frame(struct aws_linked_list *list, struct outgoing_frame *to_add) {
440     /* Iterate in reverse so that common case (a bunch of low-priority frames) is O(1) */
441     struct aws_linked_list_node *rev_iter = aws_linked_list_rbegin(list);
442     const struct aws_linked_list_node *rev_end = aws_linked_list_rend(list);
443     while (rev_iter != rev_end) {
444         struct outgoing_frame *frame_i = AWS_CONTAINER_OF(rev_iter, struct outgoing_frame, node);
445         if (to_add->def.high_priority == frame_i->def.high_priority) {
446             break;
447         }
448         rev_iter = aws_linked_list_prev(rev_iter);
449     }
450 
451     aws_linked_list_insert_after(rev_iter, &to_add->node);
452 }
453 
s_send_frame(struct aws_websocket * websocket,const struct aws_websocket_send_frame_options * options,bool from_public_api)454 static int s_send_frame(
455     struct aws_websocket *websocket,
456     const struct aws_websocket_send_frame_options *options,
457     bool from_public_api) {
458 
459     AWS_ASSERT(websocket);
460     AWS_ASSERT(options);
461 
462     /* Check for bad input. Log about non-obvious errors. */
463     if (options->high_priority && aws_websocket_is_data_frame(options->opcode)) {
464         AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Data frames cannot be sent as high-priority.", (void *)websocket);
465         return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
466     }
467     if (options->payload_length > 0 && !options->stream_outgoing_payload) {
468         AWS_LOGF_ERROR(
469             AWS_LS_HTTP_WEBSOCKET,
470             "id=%p: Invalid frame options, payload streaming function required when payload length is non-zero.",
471             (void *)websocket);
472         return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
473     }
474 
475     struct outgoing_frame *frame = aws_mem_calloc(websocket->alloc, 1, sizeof(struct outgoing_frame));
476     if (!frame) {
477         return AWS_OP_ERR;
478     }
479 
480     frame->def = *options;
481 
482     /* Enqueue frame, unless no further sending is allowed. */
483     int send_error = 0;
484     bool should_schedule_task = false;
485 
486     /* BEGIN CRITICAL SECTION */
487     s_lock_synced_data(websocket);
488 
489     if (websocket->synced_data.is_midchannel_handler && from_public_api) {
490         send_error = AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER;
491     } else if (websocket->synced_data.send_frame_error_code) {
492         send_error = websocket->synced_data.send_frame_error_code;
493     } else {
494         aws_linked_list_push_back(&websocket->synced_data.outgoing_frame_list, &frame->node);
495         if (!websocket->synced_data.is_move_synced_data_to_thread_task_scheduled) {
496             websocket->synced_data.is_move_synced_data_to_thread_task_scheduled = true;
497             should_schedule_task = true;
498         }
499     }
500 
501     s_unlock_synced_data(websocket);
502     /* END CRITICAL SECTION */
503 
504     if (send_error) {
505         AWS_LOGF_ERROR(
506             AWS_LS_HTTP_WEBSOCKET,
507             "id=%p: Cannot send frame, error %d (%s).",
508             (void *)websocket,
509             send_error,
510             aws_error_name(send_error));
511 
512         aws_mem_release(websocket->alloc, frame);
513         return aws_raise_error(send_error);
514     }
515 
516     AWS_LOGF_DEBUG(
517         AWS_LS_HTTP_WEBSOCKET,
518         "id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s priority=%s",
519         (void *)websocket,
520         options->opcode,
521         aws_websocket_opcode_str(options->opcode),
522         options->payload_length,
523         options->fin ? "T" : "F",
524         options->high_priority ? "high" : "normal");
525 
526     if (should_schedule_task) {
527         AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling synced data task.", (void *)websocket);
528         aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->move_synced_data_to_thread_task);
529     }
530 
531     return AWS_OP_SUCCESS;
532 }
533 
aws_websocket_send_frame(struct aws_websocket * websocket,const struct aws_websocket_send_frame_options * options)534 int aws_websocket_send_frame(struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options) {
535     return s_send_frame(websocket, options, true);
536 }
537 
s_move_synced_data_to_thread_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)538 static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
539     (void)task;
540     if (status != AWS_TASK_STATUS_RUN_READY) {
541         return;
542     }
543 
544     struct aws_websocket *websocket = arg;
545     struct aws_linked_list tmp_list;
546     aws_linked_list_init(&tmp_list);
547 
548     /* BEGIN CRITICAL SECTION */
549     s_lock_synced_data(websocket);
550 
551     aws_linked_list_swap_contents(&websocket->synced_data.outgoing_frame_list, &tmp_list);
552 
553     websocket->synced_data.is_move_synced_data_to_thread_task_scheduled = false;
554 
555     s_unlock_synced_data(websocket);
556     /* END CRITICAL SECTION */
557 
558     if (!aws_linked_list_empty(&tmp_list)) {
559         do {
560             struct aws_linked_list_node *node = aws_linked_list_pop_front(&tmp_list);
561             struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
562             s_enqueue_prioritized_frame(&websocket->thread_data.outgoing_frame_list, frame);
563         } while (!aws_linked_list_empty(&tmp_list));
564 
565         s_try_write_outgoing_frames(websocket);
566     }
567 }
568 
s_try_write_outgoing_frames(struct aws_websocket * websocket)569 static void s_try_write_outgoing_frames(struct aws_websocket *websocket) {
570     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
571     int err;
572 
573     /* Check whether we should be writing data */
574     if (!websocket->thread_data.current_outgoing_frame &&
575         aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
576 
577         AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No data to write at this time.", (void *)websocket);
578         return;
579     }
580 
581     if (websocket->thread_data.is_waiting_for_write_completion) {
582         AWS_LOGF_TRACE(
583             AWS_LS_HTTP_WEBSOCKET,
584             "id=%p: Waiting until outstanding aws_io_message is written to socket before sending more data.",
585             (void *)websocket);
586         return;
587     }
588 
589     if (websocket->thread_data.is_writing_stopped) {
590         AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket is no longer sending data.", (void *)websocket);
591         return;
592     }
593 
594     /* Acquire aws_io_message */
595     struct aws_io_message *io_msg = aws_channel_slot_acquire_max_message_for_write(websocket->channel_slot);
596     if (!io_msg) {
597         AWS_LOGF_ERROR(
598             AWS_LS_HTTP_WEBSOCKET,
599             "id=%p: Failed acquire message from pool, error %d (%s).",
600             (void *)websocket,
601             aws_last_error(),
602             aws_error_name(aws_last_error()));
603         goto error;
604     }
605 
606     io_msg->user_data = websocket;
607     io_msg->on_completion = s_io_message_write_completed;
608 
609     /* Loop through frames, writing their data into the io_msg */
610     bool wrote_close_frame = false;
611     while (!websocket->thread_data.is_writing_stopped) {
612         if (websocket->thread_data.current_outgoing_frame) {
613             AWS_LOGF_TRACE(
614                 AWS_LS_HTTP_WEBSOCKET,
615                 "id=%p: Resuming write of frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
616                 (void *)websocket,
617                 (void *)websocket->thread_data.current_outgoing_frame,
618                 websocket->thread_data.current_outgoing_frame->def.opcode,
619                 aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
620                 websocket->thread_data.current_outgoing_frame->def.payload_length);
621 
622         } else {
623             /* We're not in the middle of encoding a frame, so pop off the next one to encode. */
624             if (aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
625                 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No more frames to write.", (void *)websocket);
626                 break;
627             }
628 
629             struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
630             websocket->thread_data.current_outgoing_frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
631 
632             struct aws_websocket_frame frame = {
633                 .fin = websocket->thread_data.current_outgoing_frame->def.fin,
634                 .opcode = websocket->thread_data.current_outgoing_frame->def.opcode,
635                 .payload_length = websocket->thread_data.current_outgoing_frame->def.payload_length,
636             };
637 
638             /* RFC-6455 Section 5.3 Client-to-Server Masking
639              * Clients must mask payload with key derived from an unpredictable source of entropy. */
640             if (!websocket->is_server) {
641                 frame.masked = true;
642                 /* TODO: faster source of random (but still seeded by device_random) */
643                 struct aws_byte_buf masking_key_buf = aws_byte_buf_from_empty_array(frame.masking_key, 4);
644                 err = aws_device_random_buffer(&masking_key_buf);
645                 if (err) {
646                     AWS_LOGF_ERROR(
647                         AWS_LS_HTTP_WEBSOCKET,
648                         "id=%p: Failed to derive masking key, error %d (%s).",
649                         (void *)websocket,
650                         aws_last_error(),
651                         aws_error_name(aws_last_error()));
652                     goto error;
653                 }
654             }
655 
656             err = aws_websocket_encoder_start_frame(&websocket->thread_data.encoder, &frame);
657             if (err) {
658                 AWS_LOGF_ERROR(
659                     AWS_LS_HTTP_WEBSOCKET,
660                     "id=%p: Failed to start frame encoding, error %d (%s).",
661                     (void *)websocket,
662                     aws_last_error(),
663                     aws_error_name(aws_last_error()));
664                 goto error;
665             }
666 
667             AWS_LOGF_TRACE(
668                 AWS_LS_HTTP_WEBSOCKET,
669                 "id=%p: Start writing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
670                 (void *)websocket,
671                 (void *)websocket->thread_data.current_outgoing_frame,
672                 websocket->thread_data.current_outgoing_frame->def.opcode,
673                 aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
674                 websocket->thread_data.current_outgoing_frame->def.payload_length);
675         }
676 
677         err = aws_websocket_encoder_process(&websocket->thread_data.encoder, &io_msg->message_data);
678         if (err) {
679             AWS_LOGF_ERROR(
680                 AWS_LS_HTTP_WEBSOCKET,
681                 "id=%p: Frame encoding failed with error %d (%s).",
682                 (void *)websocket,
683                 aws_last_error(),
684                 aws_error_name(aws_last_error()));
685             goto error;
686         }
687 
688         if (aws_websocket_encoder_is_frame_in_progress(&websocket->thread_data.encoder)) {
689             AWS_LOGF_TRACE(
690                 AWS_LS_HTTP_WEBSOCKET,
691                 "id=%p: Outgoing frame still in progress, but no more data can be written at this time.",
692                 (void *)websocket);
693             break;
694         }
695 
696         if (websocket->thread_data.current_outgoing_frame->def.opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
697             wrote_close_frame = true;
698         }
699 
700         s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_SUCCESS);
701         websocket->thread_data.current_outgoing_frame = NULL;
702 
703         if (wrote_close_frame) {
704             break;
705         }
706     }
707 
708     /* If payload stream didn't have any bytes available to read right now, then the aws_io_message might be empty.
709      * If this is the case schedule a task to try again in the future. */
710     if (io_msg->message_data.len == 0) {
711         AWS_LOGF_TRACE(
712             AWS_LS_HTTP_WEBSOCKET,
713             "id=%p: Reading from payload stream would block, will try again later.",
714             (void *)websocket);
715 
716         if (!websocket->thread_data.is_waiting_on_payload_stream_task) {
717             websocket->thread_data.is_waiting_on_payload_stream_task = true;
718 
719             /* Future Optimization Idea: Minimize work while we wait. Use some kind of backoff for the retry timing,
720              * or have some way for stream to notify when more data is available. */
721             aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->waiting_on_payload_stream_task);
722         }
723 
724         aws_mem_release(io_msg->allocator, io_msg);
725         return;
726     }
727 
728     /* Prepare to send aws_io_message up the channel.
729      * Note that the write-completion callback may fire before send_message() returns */
730 
731     /* If CLOSE frame was written, that's the last data we'll write */
732     if (wrote_close_frame) {
733         s_stop_writing(websocket, AWS_ERROR_HTTP_WEBSOCKET_CLOSE_FRAME_SENT);
734     }
735 
736     AWS_LOGF_TRACE(
737         AWS_LS_HTTP_WEBSOCKET,
738         "id=%p: Sending aws_io_message of size %zu in write direction.",
739         (void *)websocket,
740         io_msg->message_data.len);
741 
742     websocket->thread_data.is_waiting_for_write_completion = true;
743     err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_WRITE);
744     if (err) {
745         websocket->thread_data.is_waiting_for_write_completion = false;
746         AWS_LOGF_ERROR(
747             AWS_LS_HTTP_WEBSOCKET,
748             "id=%p: Failed to send message in write direction, error %d (%s).",
749             (void *)websocket,
750             aws_last_error(),
751             aws_error_name(aws_last_error()));
752         goto error;
753     }
754 
755     /* Finish shutdown if we were waiting for the CLOSE frame to be written */
756     if (wrote_close_frame && websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
757         AWS_LOGF_TRACE(
758             AWS_LS_HTTP_WEBSOCKET, "id=%p: CLOSE frame sent, finishing handler shutdown sequence.", (void *)websocket);
759 
760         s_finish_shutdown(websocket);
761     }
762 
763     return;
764 
765 error:
766     if (io_msg) {
767         aws_mem_release(io_msg->allocator, io_msg);
768     }
769 
770     s_shutdown_due_to_write_err(websocket, aws_last_error());
771 }
772 
773 /* Encoder's outgoing_payload callback invokes current frame's callback */
s_encoder_stream_outgoing_payload(struct aws_byte_buf * out_buf,void * user_data)774 static int s_encoder_stream_outgoing_payload(struct aws_byte_buf *out_buf, void *user_data) {
775     struct aws_websocket *websocket = user_data;
776     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
777     AWS_ASSERT(websocket->thread_data.current_outgoing_frame);
778 
779     struct outgoing_frame *current_frame = websocket->thread_data.current_outgoing_frame;
780     AWS_ASSERT(current_frame->def.stream_outgoing_payload);
781 
782     bool callback_result = current_frame->def.stream_outgoing_payload(websocket, out_buf, current_frame->def.user_data);
783     if (!callback_result) {
784         AWS_LOGF_ERROR(
785             AWS_LS_HTTP_WEBSOCKET, "id=%p: Outgoing payload callback has reported a failure.", (void *)websocket);
786         return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
787     }
788 
789     return AWS_OP_SUCCESS;
790 }
791 
s_waiting_on_payload_stream_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)792 static void s_waiting_on_payload_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
793     (void)task;
794     if (status != AWS_TASK_STATUS_RUN_READY) {
795         /* If channel has shut down, don't need to resume sending payload */
796         return;
797     }
798 
799     struct aws_websocket *websocket = arg;
800     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
801 
802     AWS_LOGF_TRACE(
803         AWS_LS_HTTP_WEBSOCKET, "id=%p: Done waiting for payload stream, sending more data...", (void *)websocket);
804 
805     websocket->thread_data.is_waiting_on_payload_stream_task = false;
806     s_try_write_outgoing_frames(websocket);
807 }
808 
s_io_message_write_completed(struct aws_channel * channel,struct aws_io_message * message,int err_code,void * user_data)809 static void s_io_message_write_completed(
810     struct aws_channel *channel,
811     struct aws_io_message *message,
812     int err_code,
813     void *user_data) {
814 
815     (void)channel;
816     (void)message;
817     struct aws_websocket *websocket = user_data;
818     AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));
819 
820     if (err_code == AWS_ERROR_SUCCESS) {
821         AWS_LOGF_TRACE(
822             AWS_LS_HTTP_WEBSOCKET, "id=%p: aws_io_message written to socket, sending more data...", (void *)websocket);
823 
824         websocket->thread_data.is_waiting_for_write_completion = false;
825         s_try_write_outgoing_frames(websocket);
826     } else {
827         AWS_LOGF_TRACE(
828             AWS_LS_HTTP_WEBSOCKET,
829             "id=%p: aws_io_message did not finish writing to socket, error %d (%s).",
830             (void *)websocket,
831             err_code,
832             aws_error_name(err_code));
833 
834         s_shutdown_due_to_write_err(websocket, err_code);
835     }
836 }
837 
s_handler_process_write_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)838 static int s_handler_process_write_message(
839     struct aws_channel_handler *handler,
840     struct aws_channel_slot *slot,
841     struct aws_io_message *message) {
842 
843     (void)slot;
844     struct aws_websocket *websocket = handler->impl;
845     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
846 
847     /* For each aws_io_message headed in the write direction, send a BINARY frame,
848      * where the frame's payload is the data from this aws_io_message. */
849     struct aws_websocket_send_frame_options options = {
850         .payload_length = message->message_data.len,
851         .user_data = message,
852         .stream_outgoing_payload = s_midchannel_send_payload,
853         .on_complete = s_midchannel_send_complete,
854         .opcode = AWS_WEBSOCKET_OPCODE_BINARY,
855         .fin = true,
856     };
857 
858     /* Use copy_mark to track progress as the data is streamed out */
859     message->copy_mark = 0;
860 
861     int err = s_send_frame(websocket, &options, false);
862     if (err) {
863         /* TODO: mqtt handler needs to clean up messsages that fail to send. */
864         return AWS_OP_ERR;
865     }
866 
867     return AWS_OP_SUCCESS;
868 }
869 
870 /* Callback for writing data from downstream aws_io_messages into payload of BINARY frames headed upstream */
s_midchannel_send_payload(struct aws_websocket * websocket,struct aws_byte_buf * out_buf,void * user_data)871 static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data) {
872     (void)websocket;
873     struct aws_io_message *io_msg = user_data;
874 
875     /* copy_mark is used to track progress */
876     size_t src_available = io_msg->message_data.len - io_msg->copy_mark;
877     size_t dst_available = out_buf->capacity - out_buf->len;
878     size_t sending = dst_available < src_available ? dst_available : src_available;
879 
880     bool success = aws_byte_buf_write(out_buf, io_msg->message_data.buffer + io_msg->copy_mark, sending);
881 
882     io_msg->copy_mark += sending;
883     return success;
884 }
885 
886 /* Callback when data from downstream aws_io_messages, finishes being sent as a BINARY frame upstream. */
s_midchannel_send_complete(struct aws_websocket * websocket,int error_code,void * user_data)887 static void s_midchannel_send_complete(struct aws_websocket *websocket, int error_code, void *user_data) {
888     (void)websocket;
889     struct aws_io_message *io_msg = user_data;
890 
891     if (io_msg->on_completion) {
892         io_msg->on_completion(io_msg->owning_channel, io_msg, error_code, io_msg->user_data);
893     }
894 
895     aws_mem_release(io_msg->allocator, io_msg);
896 }
897 
s_destroy_outgoing_frame(struct aws_websocket * websocket,struct outgoing_frame * frame,int error_code)898 static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code) {
899     AWS_LOGF_TRACE(
900         AWS_LS_HTTP_WEBSOCKET,
901         "id=%p: Completed outgoing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 " with error_code %d (%s).",
902         (void *)websocket,
903         (void *)frame,
904         frame->def.opcode,
905         aws_websocket_opcode_str(frame->def.opcode),
906         frame->def.payload_length,
907         error_code,
908         aws_error_name(error_code));
909 
910     if (frame->def.on_complete) {
911         frame->def.on_complete(websocket, error_code, frame->def.user_data);
912     }
913 
914     aws_mem_release(websocket->alloc, frame);
915 }
916 
s_stop_writing(struct aws_websocket * websocket,int send_frame_error_code)917 static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code) {
918     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
919     AWS_ASSERT(send_frame_error_code != AWS_ERROR_SUCCESS);
920 
921     if (websocket->thread_data.is_writing_stopped) {
922         return;
923     }
924 
925     AWS_LOGF_TRACE(
926         AWS_LS_HTTP_WEBSOCKET,
927         "id=%p: Websocket will send no more data, future attempts to send will get error %d (%s).",
928         (void *)websocket,
929         send_frame_error_code,
930         aws_error_name(send_frame_error_code));
931 
932     /* BEGIN CRITICAL SECTION */
933     s_lock_synced_data(websocket);
934 
935     websocket->synced_data.send_frame_error_code = send_frame_error_code;
936 
937     s_unlock_synced_data(websocket);
938     /* END CRITICAL SECTION */
939 
940     websocket->thread_data.is_writing_stopped = true;
941 }
942 
s_shutdown_due_to_write_err(struct aws_websocket * websocket,int error_code)943 static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code) {
944     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
945 
946     /* No more writing allowed (it's ok to call this redundantly). */
947     s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
948 
949     /* If there's a current outgoing frame, complete it with the specific error code.
950      * Any other pending frames will complete with the generic CONNECTION_CLOSED error. */
951     if (websocket->thread_data.current_outgoing_frame) {
952         s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, error_code);
953         websocket->thread_data.current_outgoing_frame = NULL;
954     }
955 
956     /* If we're in the final stages of shutdown, ensure shutdown completes.
957      * Otherwise tell the channel to shutdown (it's ok to shutdown the channel redundantly). */
958     if (websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
959         s_finish_shutdown(websocket);
960     } else {
961         AWS_LOGF_ERROR(
962             AWS_LS_HTTP_WEBSOCKET,
963             "id=%p: Closing websocket due to failure during write, error %d (%s).",
964             (void *)websocket,
965             error_code,
966             aws_error_name(error_code));
967         s_schedule_channel_shutdown(websocket, error_code);
968     }
969 }
970 
s_shutdown_due_to_read_err(struct aws_websocket * websocket,int error_code)971 static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code) {
972     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
973 
974     AWS_LOGF_ERROR(
975         AWS_LS_HTTP_WEBSOCKET,
976         "id=%p: Closing websocket due to failure during read, error %d (%s).",
977         (void *)websocket,
978         error_code,
979         aws_error_name(error_code));
980 
981     websocket->thread_data.is_reading_stopped = true;
982 
983     /* If there's a current incoming frame, complete it with the specific error code. */
984     if (websocket->thread_data.current_incoming_frame) {
985         s_complete_incoming_frame(websocket, error_code, NULL);
986     }
987 
988     /* Tell channel to shutdown (it's ok to call this redundantly) */
989     s_schedule_channel_shutdown(websocket, error_code);
990 }
991 
s_shutdown_channel_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)992 static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
993     (void)task;
994 
995     if (status != AWS_TASK_STATUS_RUN_READY) {
996         return;
997     }
998 
999     struct aws_websocket *websocket = arg;
1000     int error_code;
1001 
1002     /* BEGIN CRITICAL SECTION */
1003     s_lock_synced_data(websocket);
1004 
1005     error_code = websocket->synced_data.shutdown_channel_task_error_code;
1006 
1007     s_unlock_synced_data(websocket);
1008     /* END CRITICAL SECTION */
1009 
1010     aws_channel_shutdown(websocket->channel_slot->channel, error_code);
1011 }
1012 
1013 /* Tell the channel to shut down. It is safe to call this multiple times.
1014  * The call to aws_channel_shutdown() is delayed so that a user invoking aws_websocket_close doesn't
1015  * have completion callbacks firing before the function call even returns */
s_schedule_channel_shutdown(struct aws_websocket * websocket,int error_code)1016 static void s_schedule_channel_shutdown(struct aws_websocket *websocket, int error_code) {
1017     bool schedule_shutdown = false;
1018 
1019     /* BEGIN CRITICAL SECTION */
1020     s_lock_synced_data(websocket);
1021 
1022     if (!websocket->synced_data.is_shutdown_channel_task_scheduled) {
1023         schedule_shutdown = true;
1024         websocket->synced_data.is_shutdown_channel_task_scheduled = true;
1025         websocket->synced_data.shutdown_channel_task_error_code = error_code;
1026     }
1027 
1028     s_unlock_synced_data(websocket);
1029     /* END CRITICAL SECTION */
1030 
1031     if (schedule_shutdown) {
1032         aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->shutdown_channel_task);
1033     }
1034 }
1035 
aws_websocket_close(struct aws_websocket * websocket,bool free_scarce_resources_immediately)1036 void aws_websocket_close(struct aws_websocket *websocket, bool free_scarce_resources_immediately) {
1037     bool is_midchannel_handler;
1038 
1039     /* BEGIN CRITICAL SECTION */
1040     s_lock_synced_data(websocket);
1041     is_midchannel_handler = websocket->synced_data.is_midchannel_handler;
1042     s_unlock_synced_data(websocket);
1043     /* END CRITICAL SECTION */
1044 
1045     if (is_midchannel_handler) {
1046         AWS_LOGF_ERROR(
1047             AWS_LS_HTTP_WEBSOCKET,
1048             "id=%p: Ignoring close call, websocket has converted to midchannel handler.",
1049             (void *)websocket);
1050         return;
1051     }
1052 
1053     /* TODO: aws_channel_shutdown() should let users specify error_code and "immediate" as separate parameters.
1054      * Currently, any non-zero error_code results in "immediate" shutdown */
1055     int error_code = AWS_ERROR_SUCCESS;
1056     if (free_scarce_resources_immediately) {
1057         error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
1058     }
1059 
1060     s_schedule_channel_shutdown(websocket, error_code);
1061 }
1062 
s_handler_shutdown(struct aws_channel_handler * handler,struct aws_channel_slot * slot,enum aws_channel_direction dir,int error_code,bool free_scarce_resources_immediately)1063 static int s_handler_shutdown(
1064     struct aws_channel_handler *handler,
1065     struct aws_channel_slot *slot,
1066     enum aws_channel_direction dir,
1067     int error_code,
1068     bool free_scarce_resources_immediately) {
1069 
1070     AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
1071     struct aws_websocket *websocket = handler->impl;
1072     int err;
1073 
1074     AWS_LOGF_DEBUG(
1075         AWS_LS_HTTP_WEBSOCKET,
1076         "id=%p: Websocket handler shutting down dir=%s error_code=%d immediate=%d.",
1077         (void *)websocket,
1078         dir == AWS_CHANNEL_DIR_READ ? "READ" : "WRITE",
1079         error_code,
1080         free_scarce_resources_immediately);
1081 
1082     if (dir == AWS_CHANNEL_DIR_READ) {
1083         /* Shutdown in the read direction is immediate and simple. */
1084         websocket->thread_data.is_reading_stopped = true;
1085         aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
1086 
1087     } else {
1088         websocket->thread_data.channel_shutdown_error_code = error_code;
1089         websocket->thread_data.channel_shutdown_free_scarce_resources_immediately = free_scarce_resources_immediately;
1090         websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = true;
1091 
1092         if (websocket->thread_data.channel_shutdown_free_scarce_resources_immediately ||
1093             websocket->thread_data.is_writing_stopped) {
1094 
1095             AWS_LOGF_TRACE(
1096                 AWS_LS_HTTP_WEBSOCKET,
1097                 "id=%p: Finishing handler shutdown immediately, without ensuring a CLOSE frame was sent.",
1098                 (void *)websocket);
1099 
1100             s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1101             s_finish_shutdown(websocket);
1102         } else {
1103             /* Attempt to queue a CLOSE frame, then wait for it to send before finishing shutdown. */
1104             struct aws_websocket_send_frame_options close_frame = {
1105                 .opcode = AWS_WEBSOCKET_OPCODE_CLOSE,
1106                 .fin = true,
1107             };
1108             err = s_send_frame(websocket, &close_frame, false);
1109             if (err) {
1110                 AWS_LOGF_WARN(
1111                     AWS_LS_HTTP_WEBSOCKET,
1112                     "id=%p: Failed to send CLOSE frame, error %d (%s).",
1113                     (void *)websocket,
1114                     aws_last_error(),
1115                     aws_error_name(aws_last_error()));
1116 
1117                 s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1118                 s_finish_shutdown(websocket);
1119             } else {
1120                 AWS_LOGF_TRACE(
1121                     AWS_LS_HTTP_WEBSOCKET,
1122                     "id=%p: Outgoing CLOSE frame queued, handler will finish shutdown once it's sent.",
1123                     (void *)websocket);
1124                 /* schedule a task to run after 1 sec. If the CLOSE still not sent at that time, we should just cancel
1125                  * sending it and shutdown the channel. */
1126                 uint64_t schedule_time = 0;
1127                 aws_channel_current_clock_time(websocket->channel_slot->channel, &schedule_time);
1128                 schedule_time += AWS_WEBSOCKET_CLOSE_TIMEOUT;
1129                 AWS_LOGF_TRACE(
1130                     AWS_LS_HTTP_WEBSOCKET,
1131                     "id=%p: websocket_close_timeout task will be run at timestamp %" PRIu64,
1132                     (void *)websocket,
1133                     schedule_time);
1134                 aws_channel_schedule_task_future(
1135                     websocket->channel_slot->channel, &websocket->close_timeout_task, schedule_time);
1136             }
1137         }
1138     }
1139 
1140     return AWS_OP_SUCCESS;
1141 }
1142 
s_close_timeout_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)1143 static void s_close_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
1144     (void)task;
1145     if (status != AWS_TASK_STATUS_RUN_READY) {
1146         /* If channel has shut down, don't need to resume sending payload */
1147         return;
1148     }
1149 
1150     struct aws_websocket *websocket = arg;
1151     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1152 
1153     if (!websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
1154         /* Not waiting for write to complete, which means the CLOSE frame has sent, just do nothing */
1155         return;
1156     }
1157 
1158     AWS_LOGF_WARN(
1159         AWS_LS_HTTP_WEBSOCKET,
1160         "id=%p: Failed to send CLOSE frame, timeout happened, shutdown the channel",
1161         (void *)websocket);
1162 
1163     s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1164     s_finish_shutdown(websocket);
1165 }
1166 
s_finish_shutdown(struct aws_websocket * websocket)1167 static void s_finish_shutdown(struct aws_websocket *websocket) {
1168     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1169     AWS_ASSERT(websocket->thread_data.is_writing_stopped);
1170     AWS_ASSERT(websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written);
1171 
1172     AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Finishing websocket handler shutdown.", (void *)websocket);
1173 
1174     websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = false;
1175 
1176     /* Cancel all incomplete frames */
1177     if (websocket->thread_data.current_incoming_frame) {
1178         s_complete_incoming_frame(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED, NULL);
1179     }
1180 
1181     if (websocket->thread_data.current_outgoing_frame) {
1182         s_destroy_outgoing_frame(
1183             websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1184         websocket->thread_data.current_outgoing_frame = NULL;
1185     }
1186 
1187     /* BEGIN CRITICAL SECTION */
1188     s_lock_synced_data(websocket);
1189 
1190     while (!aws_linked_list_empty(&websocket->synced_data.outgoing_frame_list)) {
1191         /* Move frames from synced_data to thread_data, then cancel them together outside critical section */
1192         struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->synced_data.outgoing_frame_list);
1193         aws_linked_list_push_back(&websocket->thread_data.outgoing_frame_list, node);
1194     }
1195 
1196     s_unlock_synced_data(websocket);
1197     /* END CRITICAL SECTION */
1198 
1199     while (!aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
1200         struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
1201         struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
1202         s_destroy_outgoing_frame(websocket, frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1203     }
1204 
1205     aws_channel_slot_on_handler_shutdown_complete(
1206         websocket->channel_slot,
1207         AWS_CHANNEL_DIR_WRITE,
1208         websocket->thread_data.channel_shutdown_error_code,
1209         websocket->thread_data.channel_shutdown_free_scarce_resources_immediately);
1210 }
1211 
s_handler_process_read_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)1212 static int s_handler_process_read_message(
1213     struct aws_channel_handler *handler,
1214     struct aws_channel_slot *slot,
1215     struct aws_io_message *message) {
1216 
1217     AWS_ASSERT(message);
1218     AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
1219     struct aws_websocket *websocket = handler->impl;
1220     struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&message->message_data);
1221     int err;
1222 
1223     websocket->thread_data.incoming_message_window_update = message->message_data.len;
1224 
1225     AWS_LOGF_TRACE(
1226         AWS_LS_HTTP_WEBSOCKET,
1227         "id=%p: Begin processing incoming message of size %zu.",
1228         (void *)websocket,
1229         message->message_data.len);
1230 
1231     while (cursor.len) {
1232         if (websocket->thread_data.is_reading_stopped) {
1233             goto clean_up;
1234         }
1235 
1236         bool frame_complete;
1237         err = aws_websocket_decoder_process(&websocket->thread_data.decoder, &cursor, &frame_complete);
1238         if (err) {
1239             AWS_LOGF_ERROR(
1240                 AWS_LS_HTTP_WEBSOCKET,
1241                 "id=%p: Failed processing incoming message, error %d (%s). Closing connection.",
1242                 (void *)websocket,
1243                 aws_last_error(),
1244                 aws_error_name(aws_last_error()));
1245 
1246             goto error;
1247         }
1248 
1249         if (frame_complete) {
1250             bool callback_result;
1251             s_complete_incoming_frame(websocket, AWS_ERROR_SUCCESS, &callback_result);
1252             if (!callback_result) {
1253                 AWS_LOGF_ERROR(
1254                     AWS_LS_HTTP_WEBSOCKET,
1255                     "id=%p: Incoming frame completion callback has reported a failure. Closing connection",
1256                     (void *)websocket);
1257 
1258                 aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
1259                 goto error;
1260             }
1261         }
1262     }
1263 
1264     if (websocket->thread_data.incoming_message_window_update > 0) {
1265         err = aws_channel_slot_increment_read_window(slot, websocket->thread_data.incoming_message_window_update);
1266         if (err) {
1267             AWS_LOGF_ERROR(
1268                 AWS_LS_HTTP_WEBSOCKET,
1269                 "id=%p: Failed to increment read window after message processing, error %d (%s). Closing "
1270                 "connection.",
1271                 (void *)websocket,
1272                 aws_last_error(),
1273                 aws_error_name(aws_last_error()));
1274             goto error;
1275         }
1276     }
1277 
1278     goto clean_up;
1279 
1280 error:
1281     s_shutdown_due_to_read_err(websocket, aws_last_error());
1282 
1283 clean_up:
1284     if (cursor.len > 0) {
1285         AWS_LOGF_TRACE(
1286             AWS_LS_HTTP_WEBSOCKET,
1287             "id=%p: Done processing incoming message, final %zu bytes ignored.",
1288             (void *)websocket,
1289             cursor.len);
1290     } else {
1291         AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Done processing incoming message.", (void *)websocket);
1292     }
1293     aws_mem_release(message->allocator, message);
1294     return AWS_OP_SUCCESS;
1295 }
1296 
s_decoder_on_frame(const struct aws_websocket_frame * frame,void * user_data)1297 static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data) {
1298     struct aws_websocket *websocket = user_data;
1299     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1300     AWS_ASSERT(!websocket->thread_data.current_incoming_frame);
1301     AWS_ASSERT(!websocket->thread_data.is_reading_stopped);
1302 
1303     websocket->thread_data.current_incoming_frame = &websocket->thread_data.incoming_frame_storage;
1304 
1305     websocket->thread_data.current_incoming_frame->payload_length = frame->payload_length;
1306     websocket->thread_data.current_incoming_frame->opcode = frame->opcode;
1307     websocket->thread_data.current_incoming_frame->fin = frame->fin;
1308     websocket->thread_data.current_incoming_frame->rsv[0] = frame->rsv[0];
1309     websocket->thread_data.current_incoming_frame->rsv[1] = frame->rsv[1];
1310     websocket->thread_data.current_incoming_frame->rsv[2] = frame->rsv[2];
1311 
1312     /* If CONTINUATION frames are expected, remember which type of data is being continued.
1313      * RFC-6455 Section 5.4 Fragmentation */
1314     if (aws_websocket_is_data_frame(frame->opcode)) {
1315         if (frame->opcode != AWS_WEBSOCKET_OPCODE_CONTINUATION) {
1316             if (frame->fin) {
1317                 websocket->thread_data.continuation_of_opcode = 0;
1318             } else {
1319                 websocket->thread_data.continuation_of_opcode = frame->opcode;
1320             }
1321         }
1322     }
1323 
1324     /* Invoke user cb */
1325     bool callback_result = true;
1326     if (websocket->on_incoming_frame_begin && !websocket->thread_data.is_midchannel_handler) {
1327         callback_result = websocket->on_incoming_frame_begin(
1328             websocket, websocket->thread_data.current_incoming_frame, websocket->user_data);
1329     }
1330 
1331     if (!callback_result) {
1332         AWS_LOGF_ERROR(
1333             AWS_LS_HTTP_WEBSOCKET, "id=%p: Incoming frame callback has reported a failure.", (void *)websocket);
1334         return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
1335     }
1336 
1337     return AWS_OP_SUCCESS;
1338 }
1339 
s_decoder_on_payload(struct aws_byte_cursor data,void * user_data)1340 static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data) {
1341     struct aws_websocket *websocket = user_data;
1342     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1343     AWS_ASSERT(websocket->thread_data.current_incoming_frame);
1344     AWS_ASSERT(!websocket->thread_data.is_reading_stopped);
1345 
1346     if (websocket->thread_data.is_midchannel_handler) {
1347         return s_decoder_on_midchannel_payload(websocket, data);
1348     }
1349 
1350     return s_decoder_on_user_payload(websocket, data);
1351 }
1352 
1353 /* Invoke user cb */
s_decoder_on_user_payload(struct aws_websocket * websocket,struct aws_byte_cursor data)1354 static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws_byte_cursor data) {
1355     if (!websocket->on_incoming_frame_payload) {
1356         return AWS_OP_SUCCESS;
1357     }
1358 
1359     if (!websocket->on_incoming_frame_payload(
1360             websocket, websocket->thread_data.current_incoming_frame, data, websocket->user_data)) {
1361 
1362         AWS_LOGF_ERROR(
1363             AWS_LS_HTTP_WEBSOCKET, "id=%p: Incoming payload callback has reported a failure.", (void *)websocket);
1364         return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
1365     }
1366 
1367     /* If user reduced window_update_size, reduce how much the websocket will update its window */
1368     if (websocket->manual_window_update) {
1369         size_t reduce = data.len;
1370         websocket->thread_data.incoming_message_window_update -= data.len;
1371         AWS_LOGF_DEBUG(
1372             AWS_LS_HTTP_WEBSOCKET,
1373             "id=%p: Incoming payload callback changed window update size, window will shrink by %zu.",
1374             (void *)websocket,
1375             reduce);
1376     }
1377 
1378     return AWS_OP_SUCCESS;
1379 }
1380 
1381 /* Pass data to channel handler on the right */
s_decoder_on_midchannel_payload(struct aws_websocket * websocket,struct aws_byte_cursor data)1382 static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data) {
1383     struct aws_io_message *io_msg = NULL;
1384 
1385     /* Only pass data to next handler if it's from a BINARY frame (or the CONTINUATION of a BINARY frame) */
1386     bool is_binary_data = websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_BINARY ||
1387                           (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CONTINUATION &&
1388                            websocket->thread_data.continuation_of_opcode == AWS_WEBSOCKET_OPCODE_BINARY);
1389     if (!is_binary_data) {
1390         return AWS_OP_SUCCESS;
1391     }
1392 
1393     AWS_ASSERT(websocket->channel_slot->adj_right); /* Expected another slot in the read direction */
1394 
1395     /* Note that current implementation of websocket handler does not buffer data travelling in the "read" direction,
1396      * so the downstream read window needs to be large enough to immediately receive incoming data. */
1397     if (aws_channel_slot_downstream_read_window(websocket->channel_slot) < data.len) {
1398         AWS_LOGF_ERROR(
1399             AWS_LS_HTTP_WEBSOCKET,
1400             "id=%p: Cannot send entire message without exceeding read window.",
1401             (void *)websocket);
1402         aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
1403         goto error;
1404     }
1405 
1406     io_msg = aws_channel_acquire_message_from_pool(
1407         websocket->channel_slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, data.len);
1408     if (!io_msg) {
1409         AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Failed to acquire message.", (void *)websocket);
1410         goto error;
1411     }
1412 
1413     if (io_msg->message_data.capacity < data.len) {
1414         /* Probably can't happen. Data is coming an aws_io_message, should be able to acquire another just as big */
1415         AWS_LOGF_ERROR(
1416             AWS_LS_HTTP_WEBSOCKET, "id=%p: Failed to acquire sufficiently large message.", (void *)websocket);
1417         aws_raise_error(AWS_ERROR_UNKNOWN);
1418         goto error;
1419     }
1420 
1421     if (!aws_byte_buf_write_from_whole_cursor(&io_msg->message_data, data)) {
1422         AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Unexpected error while copying data.", (void *)websocket);
1423         aws_raise_error(AWS_ERROR_UNKNOWN);
1424         goto error;
1425     }
1426 
1427     int err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_READ);
1428     if (err) {
1429         AWS_LOGF_ERROR(
1430             AWS_LS_HTTP_WEBSOCKET,
1431             "id=%p: Failed to send read message, error %d (%s).",
1432             (void *)websocket,
1433             aws_last_error(),
1434             aws_error_name(aws_last_error()));
1435         goto error;
1436     }
1437 
1438     /* Reduce amount by which websocket will update its read window */
1439     AWS_ASSERT(websocket->thread_data.incoming_message_window_update >= data.len);
1440     websocket->thread_data.incoming_message_window_update -= data.len;
1441 
1442     return AWS_OP_SUCCESS;
1443 
1444 error:
1445     if (io_msg) {
1446         aws_mem_release(io_msg->allocator, io_msg);
1447     }
1448     return AWS_OP_ERR;
1449 }
1450 
s_complete_incoming_frame(struct aws_websocket * websocket,int error_code,bool * out_callback_result)1451 static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result) {
1452     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1453     AWS_ASSERT(websocket->thread_data.current_incoming_frame);
1454 
1455     if (error_code == AWS_OP_SUCCESS) {
1456         /* If this was a CLOSE frame, don't read any more data. */
1457         if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
1458             AWS_LOGF_DEBUG(
1459                 AWS_LS_HTTP_WEBSOCKET,
1460                 "id=%p: Close frame received, any further data received will be ignored.",
1461                 (void *)websocket);
1462             websocket->thread_data.is_reading_stopped = true;
1463 
1464             /* TODO: auto-close if there's a channel-handler to the right */
1465         }
1466 
1467         /* TODO: auto-respond to PING with PONG */
1468     }
1469 
1470     /* Invoke user cb */
1471     bool callback_result = true;
1472     if (websocket->on_incoming_frame_complete && !websocket->thread_data.is_midchannel_handler) {
1473         callback_result = websocket->on_incoming_frame_complete(
1474             websocket, websocket->thread_data.current_incoming_frame, error_code, websocket->user_data);
1475     }
1476 
1477     if (out_callback_result) {
1478         *out_callback_result = callback_result;
1479     }
1480 
1481     websocket->thread_data.current_incoming_frame = NULL;
1482 }
1483 
s_handler_initial_window_size(struct aws_channel_handler * handler)1484 static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
1485     struct aws_websocket *websocket = handler->impl;
1486     return websocket->initial_window_size;
1487 }
1488 
s_handler_message_overhead(struct aws_channel_handler * handler)1489 static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
1490     (void)handler;
1491     return AWS_WEBSOCKET_MAX_FRAME_OVERHEAD;
1492 }
1493 
s_handler_increment_read_window(struct aws_channel_handler * handler,struct aws_channel_slot * slot,size_t size)1494 static int s_handler_increment_read_window(
1495     struct aws_channel_handler *handler,
1496     struct aws_channel_slot *slot,
1497     size_t size) {
1498 
1499     struct aws_websocket *websocket = handler->impl;
1500     AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
1501     AWS_ASSERT(websocket->thread_data.is_midchannel_handler);
1502 
1503     /* NOTE: This is pretty hacky and should change if it ever causes issues.
1504      *
1505      * Currently, all read messages are processed the moment they're received.
1506      * If the downstream read window is open enough to accept this data, we can send it right along.
1507      * BUT if the downstream window were too small, we'd need to buffer the data and wait until
1508      * the downstream window opened again to finish sending.
1509      *
1510      * To avoid that complexity, we go to pains here to ensure that the websocket's window exactly
1511      * matches the window to the right, allowing us to avoid buffering in the read direction.
1512      */
1513     size_t increment = size;
1514     if (websocket->thread_data.last_known_right_slot != slot->adj_right) {
1515         if (size < slot->window_size) {
1516             AWS_LOGF_ERROR(
1517                 AWS_LS_HTTP_WEBSOCKET,
1518                 "id=%p: The websocket does not support downstream handlers with a smaller window.",
1519                 (void *)websocket);
1520             aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
1521             goto error;
1522         }
1523 
1524         /* New handler to the right, make sure websocket's window matches its window. */
1525         websocket->thread_data.last_known_right_slot = slot->adj_right;
1526         increment = size - slot->window_size;
1527     }
1528 
1529     if (increment != 0) {
1530         int err = aws_channel_slot_increment_read_window(slot, increment);
1531         if (err) {
1532             goto error;
1533         }
1534     }
1535 
1536     return AWS_OP_SUCCESS;
1537 
1538 error:
1539     websocket->thread_data.is_reading_stopped = true;
1540     /* Shutting down channel because I know that no one ever checks these errors */
1541     s_shutdown_due_to_read_err(websocket, aws_last_error());
1542     return AWS_OP_ERR;
1543 }
1544 
s_increment_read_window_action(struct aws_websocket * websocket,size_t size)1545 static void s_increment_read_window_action(struct aws_websocket *websocket, size_t size) {
1546     AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1547 
1548     int err = aws_channel_slot_increment_read_window(websocket->channel_slot, size);
1549     if (err) {
1550         AWS_LOGF_ERROR(
1551             AWS_LS_HTTP_WEBSOCKET,
1552             "id=%p: Failed to increment read window, error %d (%s). Closing websocket.",
1553             (void *)websocket,
1554             aws_last_error(),
1555             aws_error_name(aws_last_error()));
1556 
1557         s_schedule_channel_shutdown(websocket, aws_last_error());
1558     }
1559 }
1560 
s_increment_read_window_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)1561 static void s_increment_read_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
1562     (void)task;
1563 
1564     if (status != AWS_TASK_STATUS_RUN_READY) {
1565         return;
1566     }
1567 
1568     struct aws_websocket *websocket = arg;
1569     size_t size;
1570 
1571     /* BEGIN CRITICAL SECTION */
1572     s_lock_synced_data(websocket);
1573 
1574     size = websocket->synced_data.window_increment_size;
1575     websocket->synced_data.window_increment_size = 0;
1576 
1577     s_unlock_synced_data(websocket);
1578     /* END CRITICAL SECTION */
1579 
1580     AWS_LOGF_TRACE(
1581         AWS_LS_HTTP_WEBSOCKET, "id=%p: Running task to increment read window by %zu.", (void *)websocket, size);
1582 
1583     s_increment_read_window_action(websocket, size);
1584 }
1585 
aws_websocket_increment_read_window(struct aws_websocket * websocket,size_t size)1586 void aws_websocket_increment_read_window(struct aws_websocket *websocket, size_t size) {
1587     if (size == 0) {
1588         AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Ignoring window increment of size 0.", (void *)websocket);
1589         return;
1590     }
1591 
1592     /* Schedule a task to do the increment.
1593      * If task is already scheduled, just increase size to be incremented */
1594     bool is_midchannel_handler = false;
1595     bool should_schedule_task = false;
1596 
1597     /* BEGIN CRITICAL SECTION */
1598     s_lock_synced_data(websocket);
1599 
1600     if (websocket->synced_data.is_midchannel_handler) {
1601         is_midchannel_handler = true;
1602     } else if (websocket->synced_data.window_increment_size == 0) {
1603         should_schedule_task = true;
1604         websocket->synced_data.window_increment_size = size;
1605     } else {
1606         websocket->synced_data.window_increment_size =
1607             aws_add_size_saturating(websocket->synced_data.window_increment_size, size);
1608     }
1609 
1610     s_unlock_synced_data(websocket);
1611     /* END CRITICAL SECTION */
1612 
1613     if (is_midchannel_handler) {
1614         AWS_LOGF_TRACE(
1615             AWS_LS_HTTP_WEBSOCKET,
1616             "id=%p: Ignoring window increment call, websocket has converted to midchannel handler.",
1617             (void *)websocket);
1618     } else if (should_schedule_task) {
1619         AWS_LOGF_TRACE(
1620             AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling task to increment read window by %zu.", (void *)websocket, size);
1621         aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->increment_read_window_task);
1622     } else {
1623         AWS_LOGF_TRACE(
1624             AWS_LS_HTTP_WEBSOCKET,
1625             "id=%p: Task to increment read window already scheduled, increasing scheduled size by %zu.",
1626             (void *)websocket,
1627             size);
1628     }
1629 }
1630 
aws_websocket_random_handshake_key(struct aws_byte_buf * dst)1631 int aws_websocket_random_handshake_key(struct aws_byte_buf *dst) {
1632     /* RFC-6455 Section 4.1.
1633      * Derive random 16-byte value, base64-encoded, for the Sec-WebSocket-Key header */
1634     uint8_t key_random_storage[16] = {0};
1635     struct aws_byte_buf key_random_buf = aws_byte_buf_from_empty_array(key_random_storage, sizeof(key_random_storage));
1636     int err = aws_device_random_buffer(&key_random_buf);
1637     if (err) {
1638         return AWS_OP_ERR;
1639     }
1640 
1641     struct aws_byte_cursor key_random_cur = aws_byte_cursor_from_buf(&key_random_buf);
1642     err = aws_base64_encode(&key_random_cur, dst);
1643     if (err) {
1644         return AWS_OP_ERR;
1645     }
1646 
1647     return AWS_OP_SUCCESS;
1648 }
1649 
aws_http_message_new_websocket_handshake_request(struct aws_allocator * allocator,struct aws_byte_cursor path,struct aws_byte_cursor host)1650 struct aws_http_message *aws_http_message_new_websocket_handshake_request(
1651     struct aws_allocator *allocator,
1652     struct aws_byte_cursor path,
1653     struct aws_byte_cursor host) {
1654 
1655     AWS_PRECONDITION(allocator);
1656     AWS_PRECONDITION(aws_byte_cursor_is_valid(&path))
1657     AWS_PRECONDITION(aws_byte_cursor_is_valid(&host))
1658 
1659     struct aws_http_message *request = aws_http_message_new_request(allocator);
1660     if (!request) {
1661         goto error;
1662     }
1663 
1664     int err = aws_http_message_set_request_method(request, aws_http_method_get);
1665     if (err) {
1666         goto error;
1667     }
1668 
1669     err = aws_http_message_set_request_path(request, path);
1670     if (err) {
1671         goto error;
1672     }
1673 
1674     uint8_t key_storage[AWS_WEBSOCKET_MAX_HANDSHAKE_KEY_LENGTH];
1675     struct aws_byte_buf key_buf = aws_byte_buf_from_empty_array(key_storage, sizeof(key_storage));
1676     err = aws_websocket_random_handshake_key(&key_buf);
1677     if (err) {
1678         goto error;
1679     }
1680 
1681     struct aws_http_header required_headers[] = {
1682         {
1683             .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
1684             .value = host,
1685         },
1686         {
1687             .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Upgrade"),
1688             .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("websocket"),
1689         },
1690         {
1691             .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Connection"),
1692             .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Upgrade"),
1693         },
1694         {
1695             .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Sec-WebSocket-Key"),
1696             .value = aws_byte_cursor_from_buf(&key_buf),
1697         },
1698         {
1699             .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Sec-WebSocket-Version"),
1700             .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("13"),
1701         },
1702     };
1703 
1704     for (size_t i = 0; i < AWS_ARRAY_SIZE(required_headers); ++i) {
1705         err = aws_http_message_add_header(request, required_headers[i]);
1706         if (err) {
1707             goto error;
1708         }
1709     }
1710 
1711     return request;
1712 
1713 error:
1714     aws_http_message_destroy(request);
1715     return NULL;
1716 }
1717