1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/http/private/websocket_impl.h>
7
8 #include <aws/common/atomics.h>
9 #include <aws/common/device_random.h>
10 #include <aws/common/encoding.h>
11 #include <aws/common/mutex.h>
12 #include <aws/http/private/websocket_decoder.h>
13 #include <aws/http/private/websocket_encoder.h>
14 #include <aws/http/request_response.h>
15 #include <aws/io/channel.h>
16 #include <aws/io/logging.h>
17
18 #include <inttypes.h>
19
20 #if _MSC_VER
21 # pragma warning(disable : 4204) /* non-constant aggregate initializer */
22 #endif
23
24 /* TODO: echo payload of peer CLOSE */
25
26 /* TODO: Can we be sure socket will always mark aws_io_messages as complete? */
27
28 /* TODO: If something goes wrong during normal shutdown, do I change the error_code? */
29
30 /* TODO: Delayed payload works by sending 0-size io_msgs down pipe and trying again when they're complete.
31 * Do something more efficient? */
32
33 /* TODO: don't fire send completion until data written to socket .. which also means delaying on_shutdown cb */
34
35 /* TODO: stop using the HTTP_PARSE error, give websocket its own error */
36
/* One frame waiting to be sent.
 * s_send_frame() allocates it, copies the caller's options into `def`, and links it
 * into an outgoing_frame_list through `node`. */
37 struct outgoing_frame {
/* Copy of the send options supplied by the caller. */
38 struct aws_websocket_send_frame_options def;
/* Intrusive list node; AWS_CONTAINER_OF(node, struct outgoing_frame, node) recovers the frame. */
39 struct aws_linked_list_node node;
40 };
41
/* State for one websocket channel handler.
 * Allocated and initialized by aws_websocket_handler_new(), freed by s_handler_destroy().
 * Fields are grouped by who may touch them: `thread_data` is for the channel thread only,
 * `synced_data` may be touched from any thread while `synced_data.lock` is held. */
42 struct aws_websocket {
/* Allocator the websocket was created with; also used to allocate outgoing frames. */
43 struct aws_allocator *alloc;
/* Handler installed into channel_slot; its `impl` points back at this struct. */
44 struct aws_channel_handler channel_handler;
/* Slot created for this handler and inserted at the end of the channel. */
45 struct aws_channel_slot *channel_slot;
/* Copied from aws_websocket_handler_options at creation. */
46 size_t initial_window_size;
47 bool manual_window_update;
48
/* User callbacks and the user_data passed to them, copied from aws_websocket_handler_options. */
49 void *user_data;
50 aws_websocket_on_incoming_frame_begin_fn *on_incoming_frame_begin;
51 aws_websocket_on_incoming_frame_payload_fn *on_incoming_frame_payload;
52 aws_websocket_on_incoming_frame_complete_fn *on_incoming_frame_complete;
53
/* Channel tasks, each initialized once in aws_websocket_handler_new() with this websocket as its argument. */
54 struct aws_channel_task move_synced_data_to_thread_task;
55 struct aws_channel_task shutdown_channel_task;
56 struct aws_channel_task increment_read_window_task;
57 struct aws_channel_task waiting_on_payload_stream_task;
58 struct aws_channel_task close_timeout_task;
/* Copied from options. When false, outgoing frames are masked with a random key (RFC-6455 Section 5.3). */
59 bool is_server;
60
61 /* Data that should only be accessed from the websocket's channel thread. */
62 struct {
/* Encoder, the prioritized queue of frames waiting to be encoded,
 * and the frame currently being encoded (NULL when between frames). */
63 struct aws_websocket_encoder encoder;
64 struct aws_linked_list outgoing_frame_list;
65 struct outgoing_frame *current_outgoing_frame;
66
67 struct aws_websocket_decoder decoder;
68 struct aws_websocket_incoming_frame *current_incoming_frame;
69 struct aws_websocket_incoming_frame incoming_frame_storage;
70
71 /* If current incoming frame is CONTINUATION, this is the data type it is a continuation of. */
72 enum aws_websocket_opcode continuation_of_opcode;
73
74 /* Amount to increment window after a channel message has been processed. */
75 size_t incoming_message_window_update;
76
77 /* Cached slot to right */
78 struct aws_channel_slot *last_known_right_slot;
79
80 /* True when no more frames will be read, due to:
81 * - a CLOSE frame was received
82 * - decoder error
83 * - channel shutdown in read-dir */
84 bool is_reading_stopped;
85
86 /* True when no more frames will be written, due to:
87 * - a CLOSE frame was sent
88 * - encoder error
89 * - channel shutdown in write-dir */
90 bool is_writing_stopped;
91
92 /* During normal shutdown websocket ensures that a CLOSE frame is sent */
93 bool is_shutting_down_and_waiting_for_close_frame_to_be_written;
94 int channel_shutdown_error_code;
95 bool channel_shutdown_free_scarce_resources_immediately;
96
97 /* Wait until each aws_io_message is completely written to
98 * the socket before sending the next aws_io_message */
99 bool is_waiting_for_write_completion;
100
101 /* If, while writing out data from a payload stream, we experience "read would block",
102 * schedule a task to try again in the near-future. */
103 bool is_waiting_on_payload_stream_task;
104
105 /* True if this websocket is being used as a dumb mid-channel handler.
106 * The websocket will no longer respond to its public API or invoke callbacks. */
107 bool is_midchannel_handler;
108 } thread_data;
109
110 /* Data that may be touched from any thread (lock must be held). */
111 struct {
112 struct aws_mutex lock;
113
/* Frames enqueued by s_send_frame(); s_move_synced_data_to_thread_task() drains them
 * into thread_data.outgoing_frame_list on the channel thread. */
114 struct aws_linked_list outgoing_frame_list;
115
116 /* If non-zero, then increment_read_window_task is scheduled */
117 size_t window_increment_size;
118
119 /* Error-code returned by aws_websocket_send_frame() when is_writing_stopped is true */
120 int send_frame_error_code;
121
122 /* Use a task to issue a channel shutdown. */
123 int shutdown_channel_task_error_code;
124 bool is_shutdown_channel_task_scheduled;
125
/* Set when s_send_frame() schedules move_synced_data_to_thread_task, cleared when that task runs,
 * so the task is scheduled at most once at a time. */
126 bool is_move_synced_data_to_thread_task_scheduled;
127
128 /* Mirrors variable from thread_data */
129 bool is_midchannel_handler;
130
131 /* Whether aws_websocket_release() has been called */
132 bool is_released;
133 } synced_data;
134 };
135
/* Forward declarations for the file-local functions defined later in this file. */

/* Channel-handler callbacks; these are the entries of s_channel_handler_vtable. */
136 static int s_handler_process_read_message(
137 struct aws_channel_handler *handler,
138 struct aws_channel_slot *slot,
139 struct aws_io_message *message);
140
141 static int s_handler_process_write_message(
142 struct aws_channel_handler *handler,
143 struct aws_channel_slot *slot,
144 struct aws_io_message *message);
145
146 static int s_handler_increment_read_window(
147 struct aws_channel_handler *handler,
148 struct aws_channel_slot *slot,
149 size_t size);
150
151 static int s_handler_shutdown(
152 struct aws_channel_handler *handler,
153 struct aws_channel_slot *slot,
154 enum aws_channel_direction dir,
155 int error_code,
156 bool free_scarce_resources_immediately);
157
158 static size_t s_handler_initial_window_size(struct aws_channel_handler *handler);
159 static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
160 static void s_handler_destroy(struct aws_channel_handler *handler);
161
/* Callback handed to aws_websocket_encoder_init(). */
162 static int s_encoder_stream_outgoing_payload(struct aws_byte_buf *out_buf, void *user_data);
163
/* s_decoder_on_frame and s_decoder_on_payload are handed to aws_websocket_decoder_init(). */
164 static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data);
165 static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data);
166 static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);
167 static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);
168
/* Internal helpers, channel-task bodies and shutdown plumbing. */
169 static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code);
170 static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result);
171 static void s_finish_shutdown(struct aws_websocket *websocket);
172 static void s_io_message_write_completed(
173 struct aws_channel *channel,
174 struct aws_io_message *message,
175 int err_code,
176 void *user_data);
177 static int s_send_frame(
178 struct aws_websocket *websocket,
179 const struct aws_websocket_send_frame_options *options,
180 bool from_public_api);
181 static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data);
182 static void s_midchannel_send_complete(struct aws_websocket *websocket, int error_code, void *user_data);
183 static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
184 static void s_increment_read_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
185 static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
186 static void s_waiting_on_payload_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
187 static void s_close_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
188 static void s_schedule_channel_shutdown(struct aws_websocket *websocket, int error_code);
189 static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code);
190 static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code);
191 static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code);
192 static void s_try_write_outgoing_frames(struct aws_websocket *websocket);
193
/* Channel-handler vtable shared by every websocket.
 * aws_websocket_handler_new() points websocket->channel_handler.vtable at this table. */
194 static struct aws_channel_handler_vtable s_channel_handler_vtable = {
195 .process_read_message = s_handler_process_read_message,
196 .process_write_message = s_handler_process_write_message,
197 .increment_read_window = s_handler_increment_read_window,
198 .shutdown = s_handler_shutdown,
199 .initial_window_size = s_handler_initial_window_size,
200 .message_overhead = s_handler_message_overhead,
201 .destroy = s_handler_destroy,
202 };
203
aws_websocket_opcode_str(uint8_t opcode)204 const char *aws_websocket_opcode_str(uint8_t opcode) {
205 switch (opcode) {
206 case AWS_WEBSOCKET_OPCODE_CONTINUATION:
207 return "continuation";
208 case AWS_WEBSOCKET_OPCODE_TEXT:
209 return "text";
210 case AWS_WEBSOCKET_OPCODE_BINARY:
211 return "binary";
212 case AWS_WEBSOCKET_OPCODE_CLOSE:
213 return "close";
214 case AWS_WEBSOCKET_OPCODE_PING:
215 return "ping";
216 case AWS_WEBSOCKET_OPCODE_PONG:
217 return "pong";
218 default:
219 return "";
220 }
221 }
222
/**
 * Return true if the opcode denotes a data frame rather than a control frame.
 * RFC-6455 Section 5.6: the most significant bit of the (4 bit) opcode is 0 for data frames.
 */
bool aws_websocket_is_data_frame(uint8_t opcode) {
    const uint8_t control_frame_bit = 0x08;
    return (opcode & control_frame_bit) == 0;
}
227
s_lock_synced_data(struct aws_websocket * websocket)228 static void s_lock_synced_data(struct aws_websocket *websocket) {
229 int err = aws_mutex_lock(&websocket->synced_data.lock);
230 AWS_ASSERT(!err);
231 (void)err;
232 }
233
s_unlock_synced_data(struct aws_websocket * websocket)234 static void s_unlock_synced_data(struct aws_websocket *websocket) {
235 int err = aws_mutex_unlock(&websocket->synced_data.lock);
236 AWS_ASSERT(!err);
237 (void)err;
238 }
239
aws_websocket_handler_new(const struct aws_websocket_handler_options * options)240 struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handler_options *options) {
241 /* TODO: validate options */
242
243 struct aws_channel_slot *slot = NULL;
244 struct aws_websocket *websocket = NULL;
245 int err;
246
247 slot = aws_channel_slot_new(options->channel);
248 if (!slot) {
249 goto error;
250 }
251
252 err = aws_channel_slot_insert_end(options->channel, slot);
253 if (err) {
254 goto error;
255 }
256
257 websocket = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_websocket));
258 if (!websocket) {
259 goto error;
260 }
261
262 websocket->alloc = options->allocator;
263 websocket->channel_handler.vtable = &s_channel_handler_vtable;
264 websocket->channel_handler.alloc = options->allocator;
265 websocket->channel_handler.impl = websocket;
266
267 websocket->channel_slot = slot;
268
269 websocket->initial_window_size = options->initial_window_size;
270 websocket->manual_window_update = options->manual_window_update;
271
272 websocket->user_data = options->user_data;
273 websocket->on_incoming_frame_begin = options->on_incoming_frame_begin;
274 websocket->on_incoming_frame_payload = options->on_incoming_frame_payload;
275 websocket->on_incoming_frame_complete = options->on_incoming_frame_complete;
276
277 websocket->is_server = options->is_server;
278
279 aws_channel_task_init(
280 &websocket->move_synced_data_to_thread_task,
281 s_move_synced_data_to_thread_task,
282 websocket,
283 "websocket_move_synced_data_to_thread");
284 aws_channel_task_init(
285 &websocket->shutdown_channel_task, s_shutdown_channel_task, websocket, "websocket_shutdown_channel");
286 aws_channel_task_init(
287 &websocket->increment_read_window_task,
288 s_increment_read_window_task,
289 websocket,
290 "websocket_increment_read_window");
291 aws_channel_task_init(
292 &websocket->waiting_on_payload_stream_task,
293 s_waiting_on_payload_stream_task,
294 websocket,
295 "websocket_waiting_on_payload_stream");
296 aws_channel_task_init(&websocket->close_timeout_task, s_close_timeout_task, websocket, "websocket_close_timeout");
297
298 aws_linked_list_init(&websocket->thread_data.outgoing_frame_list);
299
300 aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket);
301
302 aws_websocket_decoder_init(&websocket->thread_data.decoder, s_decoder_on_frame, s_decoder_on_payload, websocket);
303
304 aws_linked_list_init(&websocket->synced_data.outgoing_frame_list);
305
306 err = aws_mutex_init(&websocket->synced_data.lock);
307 if (err) {
308 AWS_LOGF_ERROR(
309 AWS_LS_HTTP_WEBSOCKET,
310 "static: Failed to initialize mutex, error %d (%s).",
311 aws_last_error(),
312 aws_error_name(aws_last_error()));
313
314 goto error;
315 }
316
317 err = aws_channel_slot_set_handler(slot, &websocket->channel_handler);
318 if (err) {
319 goto error;
320 }
321
322 /* Ensure websocket (and the rest of the channel) can't be destroyed until aws_websocket_release() is called */
323 aws_channel_acquire_hold(options->channel);
324
325 return websocket;
326
327 error:
328 if (slot) {
329 if (websocket && !slot->handler) {
330 websocket->channel_handler.vtable->destroy(&websocket->channel_handler);
331 }
332 aws_channel_slot_remove(slot);
333 }
334 return NULL;
335 }
336
s_handler_destroy(struct aws_channel_handler * handler)337 static void s_handler_destroy(struct aws_channel_handler *handler) {
338 struct aws_websocket *websocket = handler->impl;
339 AWS_ASSERT(!websocket->thread_data.current_outgoing_frame);
340 AWS_ASSERT(!websocket->thread_data.current_incoming_frame);
341
342 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Destroying websocket.", (void *)websocket);
343
344 aws_mutex_clean_up(&websocket->synced_data.lock);
345 aws_mem_release(websocket->alloc, websocket);
346 }
347
aws_websocket_release(struct aws_websocket * websocket)348 void aws_websocket_release(struct aws_websocket *websocket) {
349 AWS_ASSERT(websocket);
350 AWS_ASSERT(websocket->channel_slot);
351
352 bool was_already_released;
353
354 /* BEGIN CRITICAL SECTION */
355 s_lock_synced_data(websocket);
356 if (websocket->synced_data.is_released) {
357 was_already_released = true;
358 } else {
359 was_already_released = false;
360 websocket->synced_data.is_released = true;
361 }
362 s_unlock_synced_data(websocket);
363 /* END CRITICAL SECTION */
364
365 if (was_already_released) {
366 AWS_LOGF_TRACE(
367 AWS_LS_HTTP_WEBSOCKET, "id=%p: Ignoring multiple calls to websocket release.", (void *)websocket);
368 return;
369 }
370
371 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket released, shut down if necessary.", (void *)websocket);
372
373 /* Channel might already be shut down, but make sure */
374 s_schedule_channel_shutdown(websocket, AWS_ERROR_SUCCESS);
375
376 /* Channel won't destroy its slots/handlers until its refcount reaches 0 */
377 aws_channel_release_hold(websocket->channel_slot->channel);
378 }
379
aws_websocket_get_channel(const struct aws_websocket * websocket)380 struct aws_channel *aws_websocket_get_channel(const struct aws_websocket *websocket) {
381 return websocket->channel_slot->channel;
382 }
383
aws_websocket_convert_to_midchannel_handler(struct aws_websocket * websocket)384 int aws_websocket_convert_to_midchannel_handler(struct aws_websocket *websocket) {
385 if (!aws_channel_thread_is_callers_thread(websocket->channel_slot->channel)) {
386 AWS_LOGF_ERROR(
387 AWS_LS_HTTP_WEBSOCKET, "id=%p: Cannot convert to midchannel handler on this thread.", (void *)websocket);
388 return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
389 }
390
391 if (websocket->thread_data.is_midchannel_handler) {
392 AWS_LOGF_ERROR(
393 AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket has already converted to midchannel handler.", (void *)websocket);
394 return aws_raise_error(AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER);
395 }
396
397 if (websocket->thread_data.is_reading_stopped || websocket->thread_data.is_writing_stopped) {
398 AWS_LOGF_ERROR(
399 AWS_LS_HTTP_WEBSOCKET,
400 "id=%p: Cannot convert websocket to midchannel handler because it is closed or closing.",
401 (void *)websocket);
402 return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
403 }
404
405 if (websocket->thread_data.current_incoming_frame) {
406 AWS_LOGF_ERROR(
407 AWS_LS_HTTP_WEBSOCKET,
408 "id=%p: Cannot convert to midchannel handler in the middle of an incoming frame.",
409 (void *)websocket);
410 return aws_raise_error(AWS_ERROR_INVALID_STATE);
411 }
412
413 bool was_released = false;
414
415 /* BEGIN CRITICAL SECTION */
416 s_lock_synced_data(websocket);
417 if (websocket->synced_data.is_released) {
418 was_released = true;
419 } else {
420 websocket->synced_data.is_midchannel_handler = true;
421 }
422 s_unlock_synced_data(websocket);
423 /* END CRITICAL SECTION */
424
425 if (was_released) {
426 AWS_LOGF_ERROR(
427 AWS_LS_HTTP_WEBSOCKET,
428 "id=%p: Cannot convert websocket to midchannel handler because it was already released.",
429 (void *)websocket);
430 return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
431 }
432
433 websocket->thread_data.is_midchannel_handler = true;
434
435 return AWS_OP_SUCCESS;
436 }
437
438 /* Insert frame into list, sorting by priority, then by age (high-priority and older frames towards the front) */
s_enqueue_prioritized_frame(struct aws_linked_list * list,struct outgoing_frame * to_add)439 static void s_enqueue_prioritized_frame(struct aws_linked_list *list, struct outgoing_frame *to_add) {
440 /* Iterate in reverse so that common case (a bunch of low-priority frames) is O(1) */
441 struct aws_linked_list_node *rev_iter = aws_linked_list_rbegin(list);
442 const struct aws_linked_list_node *rev_end = aws_linked_list_rend(list);
443 while (rev_iter != rev_end) {
444 struct outgoing_frame *frame_i = AWS_CONTAINER_OF(rev_iter, struct outgoing_frame, node);
445 if (to_add->def.high_priority == frame_i->def.high_priority) {
446 break;
447 }
448 rev_iter = aws_linked_list_prev(rev_iter);
449 }
450
451 aws_linked_list_insert_after(rev_iter, &to_add->node);
452 }
453
s_send_frame(struct aws_websocket * websocket,const struct aws_websocket_send_frame_options * options,bool from_public_api)454 static int s_send_frame(
455 struct aws_websocket *websocket,
456 const struct aws_websocket_send_frame_options *options,
457 bool from_public_api) {
458
459 AWS_ASSERT(websocket);
460 AWS_ASSERT(options);
461
462 /* Check for bad input. Log about non-obvious errors. */
463 if (options->high_priority && aws_websocket_is_data_frame(options->opcode)) {
464 AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Data frames cannot be sent as high-priority.", (void *)websocket);
465 return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
466 }
467 if (options->payload_length > 0 && !options->stream_outgoing_payload) {
468 AWS_LOGF_ERROR(
469 AWS_LS_HTTP_WEBSOCKET,
470 "id=%p: Invalid frame options, payload streaming function required when payload length is non-zero.",
471 (void *)websocket);
472 return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
473 }
474
475 struct outgoing_frame *frame = aws_mem_calloc(websocket->alloc, 1, sizeof(struct outgoing_frame));
476 if (!frame) {
477 return AWS_OP_ERR;
478 }
479
480 frame->def = *options;
481
482 /* Enqueue frame, unless no further sending is allowed. */
483 int send_error = 0;
484 bool should_schedule_task = false;
485
486 /* BEGIN CRITICAL SECTION */
487 s_lock_synced_data(websocket);
488
489 if (websocket->synced_data.is_midchannel_handler && from_public_api) {
490 send_error = AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER;
491 } else if (websocket->synced_data.send_frame_error_code) {
492 send_error = websocket->synced_data.send_frame_error_code;
493 } else {
494 aws_linked_list_push_back(&websocket->synced_data.outgoing_frame_list, &frame->node);
495 if (!websocket->synced_data.is_move_synced_data_to_thread_task_scheduled) {
496 websocket->synced_data.is_move_synced_data_to_thread_task_scheduled = true;
497 should_schedule_task = true;
498 }
499 }
500
501 s_unlock_synced_data(websocket);
502 /* END CRITICAL SECTION */
503
504 if (send_error) {
505 AWS_LOGF_ERROR(
506 AWS_LS_HTTP_WEBSOCKET,
507 "id=%p: Cannot send frame, error %d (%s).",
508 (void *)websocket,
509 send_error,
510 aws_error_name(send_error));
511
512 aws_mem_release(websocket->alloc, frame);
513 return aws_raise_error(send_error);
514 }
515
516 AWS_LOGF_DEBUG(
517 AWS_LS_HTTP_WEBSOCKET,
518 "id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s priority=%s",
519 (void *)websocket,
520 options->opcode,
521 aws_websocket_opcode_str(options->opcode),
522 options->payload_length,
523 options->fin ? "T" : "F",
524 options->high_priority ? "high" : "normal");
525
526 if (should_schedule_task) {
527 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling synced data task.", (void *)websocket);
528 aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->move_synced_data_to_thread_task);
529 }
530
531 return AWS_OP_SUCCESS;
532 }
533
/**
 * Public entry point for sending a frame.
 * Forwards to s_send_frame() flagged as a public-API call; returns whatever s_send_frame() returns.
 */
int aws_websocket_send_frame(struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options) {
    const bool from_public_api = true;
    return s_send_frame(websocket, options, from_public_api);
}
537
s_move_synced_data_to_thread_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)538 static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
539 (void)task;
540 if (status != AWS_TASK_STATUS_RUN_READY) {
541 return;
542 }
543
544 struct aws_websocket *websocket = arg;
545 struct aws_linked_list tmp_list;
546 aws_linked_list_init(&tmp_list);
547
548 /* BEGIN CRITICAL SECTION */
549 s_lock_synced_data(websocket);
550
551 aws_linked_list_swap_contents(&websocket->synced_data.outgoing_frame_list, &tmp_list);
552
553 websocket->synced_data.is_move_synced_data_to_thread_task_scheduled = false;
554
555 s_unlock_synced_data(websocket);
556 /* END CRITICAL SECTION */
557
558 if (!aws_linked_list_empty(&tmp_list)) {
559 do {
560 struct aws_linked_list_node *node = aws_linked_list_pop_front(&tmp_list);
561 struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
562 s_enqueue_prioritized_frame(&websocket->thread_data.outgoing_frame_list, frame);
563 } while (!aws_linked_list_empty(&tmp_list));
564
565 s_try_write_outgoing_frames(websocket);
566 }
567 }
568
s_try_write_outgoing_frames(struct aws_websocket * websocket)569 static void s_try_write_outgoing_frames(struct aws_websocket *websocket) {
570 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
571 int err;
572
573 /* Check whether we should be writing data */
574 if (!websocket->thread_data.current_outgoing_frame &&
575 aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
576
577 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No data to write at this time.", (void *)websocket);
578 return;
579 }
580
581 if (websocket->thread_data.is_waiting_for_write_completion) {
582 AWS_LOGF_TRACE(
583 AWS_LS_HTTP_WEBSOCKET,
584 "id=%p: Waiting until outstanding aws_io_message is written to socket before sending more data.",
585 (void *)websocket);
586 return;
587 }
588
589 if (websocket->thread_data.is_writing_stopped) {
590 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket is no longer sending data.", (void *)websocket);
591 return;
592 }
593
594 /* Acquire aws_io_message */
595 struct aws_io_message *io_msg = aws_channel_slot_acquire_max_message_for_write(websocket->channel_slot);
596 if (!io_msg) {
597 AWS_LOGF_ERROR(
598 AWS_LS_HTTP_WEBSOCKET,
599 "id=%p: Failed acquire message from pool, error %d (%s).",
600 (void *)websocket,
601 aws_last_error(),
602 aws_error_name(aws_last_error()));
603 goto error;
604 }
605
606 io_msg->user_data = websocket;
607 io_msg->on_completion = s_io_message_write_completed;
608
609 /* Loop through frames, writing their data into the io_msg */
610 bool wrote_close_frame = false;
611 while (!websocket->thread_data.is_writing_stopped) {
612 if (websocket->thread_data.current_outgoing_frame) {
613 AWS_LOGF_TRACE(
614 AWS_LS_HTTP_WEBSOCKET,
615 "id=%p: Resuming write of frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
616 (void *)websocket,
617 (void *)websocket->thread_data.current_outgoing_frame,
618 websocket->thread_data.current_outgoing_frame->def.opcode,
619 aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
620 websocket->thread_data.current_outgoing_frame->def.payload_length);
621
622 } else {
623 /* We're not in the middle of encoding a frame, so pop off the next one to encode. */
624 if (aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
625 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No more frames to write.", (void *)websocket);
626 break;
627 }
628
629 struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
630 websocket->thread_data.current_outgoing_frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
631
632 struct aws_websocket_frame frame = {
633 .fin = websocket->thread_data.current_outgoing_frame->def.fin,
634 .opcode = websocket->thread_data.current_outgoing_frame->def.opcode,
635 .payload_length = websocket->thread_data.current_outgoing_frame->def.payload_length,
636 };
637
638 /* RFC-6455 Section 5.3 Client-to-Server Masking
639 * Clients must mask payload with key derived from an unpredictable source of entropy. */
640 if (!websocket->is_server) {
641 frame.masked = true;
642 /* TODO: faster source of random (but still seeded by device_random) */
643 struct aws_byte_buf masking_key_buf = aws_byte_buf_from_empty_array(frame.masking_key, 4);
644 err = aws_device_random_buffer(&masking_key_buf);
645 if (err) {
646 AWS_LOGF_ERROR(
647 AWS_LS_HTTP_WEBSOCKET,
648 "id=%p: Failed to derive masking key, error %d (%s).",
649 (void *)websocket,
650 aws_last_error(),
651 aws_error_name(aws_last_error()));
652 goto error;
653 }
654 }
655
656 err = aws_websocket_encoder_start_frame(&websocket->thread_data.encoder, &frame);
657 if (err) {
658 AWS_LOGF_ERROR(
659 AWS_LS_HTTP_WEBSOCKET,
660 "id=%p: Failed to start frame encoding, error %d (%s).",
661 (void *)websocket,
662 aws_last_error(),
663 aws_error_name(aws_last_error()));
664 goto error;
665 }
666
667 AWS_LOGF_TRACE(
668 AWS_LS_HTTP_WEBSOCKET,
669 "id=%p: Start writing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
670 (void *)websocket,
671 (void *)websocket->thread_data.current_outgoing_frame,
672 websocket->thread_data.current_outgoing_frame->def.opcode,
673 aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
674 websocket->thread_data.current_outgoing_frame->def.payload_length);
675 }
676
677 err = aws_websocket_encoder_process(&websocket->thread_data.encoder, &io_msg->message_data);
678 if (err) {
679 AWS_LOGF_ERROR(
680 AWS_LS_HTTP_WEBSOCKET,
681 "id=%p: Frame encoding failed with error %d (%s).",
682 (void *)websocket,
683 aws_last_error(),
684 aws_error_name(aws_last_error()));
685 goto error;
686 }
687
688 if (aws_websocket_encoder_is_frame_in_progress(&websocket->thread_data.encoder)) {
689 AWS_LOGF_TRACE(
690 AWS_LS_HTTP_WEBSOCKET,
691 "id=%p: Outgoing frame still in progress, but no more data can be written at this time.",
692 (void *)websocket);
693 break;
694 }
695
696 if (websocket->thread_data.current_outgoing_frame->def.opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
697 wrote_close_frame = true;
698 }
699
700 s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_SUCCESS);
701 websocket->thread_data.current_outgoing_frame = NULL;
702
703 if (wrote_close_frame) {
704 break;
705 }
706 }
707
708 /* If payload stream didn't have any bytes available to read right now, then the aws_io_message might be empty.
709 * If this is the case schedule a task to try again in the future. */
710 if (io_msg->message_data.len == 0) {
711 AWS_LOGF_TRACE(
712 AWS_LS_HTTP_WEBSOCKET,
713 "id=%p: Reading from payload stream would block, will try again later.",
714 (void *)websocket);
715
716 if (!websocket->thread_data.is_waiting_on_payload_stream_task) {
717 websocket->thread_data.is_waiting_on_payload_stream_task = true;
718
719 /* Future Optimization Idea: Minimize work while we wait. Use some kind of backoff for the retry timing,
720 * or have some way for stream to notify when more data is available. */
721 aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->waiting_on_payload_stream_task);
722 }
723
724 aws_mem_release(io_msg->allocator, io_msg);
725 return;
726 }
727
728 /* Prepare to send aws_io_message up the channel.
729 * Note that the write-completion callback may fire before send_message() returns */
730
731 /* If CLOSE frame was written, that's the last data we'll write */
732 if (wrote_close_frame) {
733 s_stop_writing(websocket, AWS_ERROR_HTTP_WEBSOCKET_CLOSE_FRAME_SENT);
734 }
735
736 AWS_LOGF_TRACE(
737 AWS_LS_HTTP_WEBSOCKET,
738 "id=%p: Sending aws_io_message of size %zu in write direction.",
739 (void *)websocket,
740 io_msg->message_data.len);
741
742 websocket->thread_data.is_waiting_for_write_completion = true;
743 err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_WRITE);
744 if (err) {
745 websocket->thread_data.is_waiting_for_write_completion = false;
746 AWS_LOGF_ERROR(
747 AWS_LS_HTTP_WEBSOCKET,
748 "id=%p: Failed to send message in write direction, error %d (%s).",
749 (void *)websocket,
750 aws_last_error(),
751 aws_error_name(aws_last_error()));
752 goto error;
753 }
754
755 /* Finish shutdown if we were waiting for the CLOSE frame to be written */
756 if (wrote_close_frame && websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
757 AWS_LOGF_TRACE(
758 AWS_LS_HTTP_WEBSOCKET, "id=%p: CLOSE frame sent, finishing handler shutdown sequence.", (void *)websocket);
759
760 s_finish_shutdown(websocket);
761 }
762
763 return;
764
765 error:
766 if (io_msg) {
767 aws_mem_release(io_msg->allocator, io_msg);
768 }
769
770 s_shutdown_due_to_write_err(websocket, aws_last_error());
771 }
772
773 /* Encoder's outgoing_payload callback invokes current frame's callback */
s_encoder_stream_outgoing_payload(struct aws_byte_buf * out_buf,void * user_data)774 static int s_encoder_stream_outgoing_payload(struct aws_byte_buf *out_buf, void *user_data) {
775 struct aws_websocket *websocket = user_data;
776 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
777 AWS_ASSERT(websocket->thread_data.current_outgoing_frame);
778
779 struct outgoing_frame *current_frame = websocket->thread_data.current_outgoing_frame;
780 AWS_ASSERT(current_frame->def.stream_outgoing_payload);
781
782 bool callback_result = current_frame->def.stream_outgoing_payload(websocket, out_buf, current_frame->def.user_data);
783 if (!callback_result) {
784 AWS_LOGF_ERROR(
785 AWS_LS_HTTP_WEBSOCKET, "id=%p: Outgoing payload callback has reported a failure.", (void *)websocket);
786 return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
787 }
788
789 return AWS_OP_SUCCESS;
790 }
791
s_waiting_on_payload_stream_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)792 static void s_waiting_on_payload_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
793 (void)task;
794 if (status != AWS_TASK_STATUS_RUN_READY) {
795 /* If channel has shut down, don't need to resume sending payload */
796 return;
797 }
798
799 struct aws_websocket *websocket = arg;
800 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
801
802 AWS_LOGF_TRACE(
803 AWS_LS_HTTP_WEBSOCKET, "id=%p: Done waiting for payload stream, sending more data...", (void *)websocket);
804
805 websocket->thread_data.is_waiting_on_payload_stream_task = false;
806 s_try_write_outgoing_frames(websocket);
807 }
808
s_io_message_write_completed(struct aws_channel * channel,struct aws_io_message * message,int err_code,void * user_data)809 static void s_io_message_write_completed(
810 struct aws_channel *channel,
811 struct aws_io_message *message,
812 int err_code,
813 void *user_data) {
814
815 (void)channel;
816 (void)message;
817 struct aws_websocket *websocket = user_data;
818 AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));
819
820 if (err_code == AWS_ERROR_SUCCESS) {
821 AWS_LOGF_TRACE(
822 AWS_LS_HTTP_WEBSOCKET, "id=%p: aws_io_message written to socket, sending more data...", (void *)websocket);
823
824 websocket->thread_data.is_waiting_for_write_completion = false;
825 s_try_write_outgoing_frames(websocket);
826 } else {
827 AWS_LOGF_TRACE(
828 AWS_LS_HTTP_WEBSOCKET,
829 "id=%p: aws_io_message did not finish writing to socket, error %d (%s).",
830 (void *)websocket,
831 err_code,
832 aws_error_name(err_code));
833
834 s_shutdown_due_to_write_err(websocket, err_code);
835 }
836 }
837
s_handler_process_write_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)838 static int s_handler_process_write_message(
839 struct aws_channel_handler *handler,
840 struct aws_channel_slot *slot,
841 struct aws_io_message *message) {
842
843 (void)slot;
844 struct aws_websocket *websocket = handler->impl;
845 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
846
847 /* For each aws_io_message headed in the write direction, send a BINARY frame,
848 * where the frame's payload is the data from this aws_io_message. */
849 struct aws_websocket_send_frame_options options = {
850 .payload_length = message->message_data.len,
851 .user_data = message,
852 .stream_outgoing_payload = s_midchannel_send_payload,
853 .on_complete = s_midchannel_send_complete,
854 .opcode = AWS_WEBSOCKET_OPCODE_BINARY,
855 .fin = true,
856 };
857
858 /* Use copy_mark to track progress as the data is streamed out */
859 message->copy_mark = 0;
860
861 int err = s_send_frame(websocket, &options, false);
862 if (err) {
863 /* TODO: mqtt handler needs to clean up messsages that fail to send. */
864 return AWS_OP_ERR;
865 }
866
867 return AWS_OP_SUCCESS;
868 }
869
870 /* Callback for writing data from downstream aws_io_messages into payload of BINARY frames headed upstream */
s_midchannel_send_payload(struct aws_websocket * websocket,struct aws_byte_buf * out_buf,void * user_data)871 static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data) {
872 (void)websocket;
873 struct aws_io_message *io_msg = user_data;
874
875 /* copy_mark is used to track progress */
876 size_t src_available = io_msg->message_data.len - io_msg->copy_mark;
877 size_t dst_available = out_buf->capacity - out_buf->len;
878 size_t sending = dst_available < src_available ? dst_available : src_available;
879
880 bool success = aws_byte_buf_write(out_buf, io_msg->message_data.buffer + io_msg->copy_mark, sending);
881
882 io_msg->copy_mark += sending;
883 return success;
884 }
885
886 /* Callback when data from downstream aws_io_messages, finishes being sent as a BINARY frame upstream. */
s_midchannel_send_complete(struct aws_websocket * websocket,int error_code,void * user_data)887 static void s_midchannel_send_complete(struct aws_websocket *websocket, int error_code, void *user_data) {
888 (void)websocket;
889 struct aws_io_message *io_msg = user_data;
890
891 if (io_msg->on_completion) {
892 io_msg->on_completion(io_msg->owning_channel, io_msg, error_code, io_msg->user_data);
893 }
894
895 aws_mem_release(io_msg->allocator, io_msg);
896 }
897
s_destroy_outgoing_frame(struct aws_websocket * websocket,struct outgoing_frame * frame,int error_code)898 static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code) {
899 AWS_LOGF_TRACE(
900 AWS_LS_HTTP_WEBSOCKET,
901 "id=%p: Completed outgoing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 " with error_code %d (%s).",
902 (void *)websocket,
903 (void *)frame,
904 frame->def.opcode,
905 aws_websocket_opcode_str(frame->def.opcode),
906 frame->def.payload_length,
907 error_code,
908 aws_error_name(error_code));
909
910 if (frame->def.on_complete) {
911 frame->def.on_complete(websocket, error_code, frame->def.user_data);
912 }
913
914 aws_mem_release(websocket->alloc, frame);
915 }
916
s_stop_writing(struct aws_websocket * websocket,int send_frame_error_code)917 static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code) {
918 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
919 AWS_ASSERT(send_frame_error_code != AWS_ERROR_SUCCESS);
920
921 if (websocket->thread_data.is_writing_stopped) {
922 return;
923 }
924
925 AWS_LOGF_TRACE(
926 AWS_LS_HTTP_WEBSOCKET,
927 "id=%p: Websocket will send no more data, future attempts to send will get error %d (%s).",
928 (void *)websocket,
929 send_frame_error_code,
930 aws_error_name(send_frame_error_code));
931
932 /* BEGIN CRITICAL SECTION */
933 s_lock_synced_data(websocket);
934
935 websocket->synced_data.send_frame_error_code = send_frame_error_code;
936
937 s_unlock_synced_data(websocket);
938 /* END CRITICAL SECTION */
939
940 websocket->thread_data.is_writing_stopped = true;
941 }
942
s_shutdown_due_to_write_err(struct aws_websocket * websocket,int error_code)943 static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code) {
944 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
945
946 /* No more writing allowed (it's ok to call this redundantly). */
947 s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
948
949 /* If there's a current outgoing frame, complete it with the specific error code.
950 * Any other pending frames will complete with the generic CONNECTION_CLOSED error. */
951 if (websocket->thread_data.current_outgoing_frame) {
952 s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, error_code);
953 websocket->thread_data.current_outgoing_frame = NULL;
954 }
955
956 /* If we're in the final stages of shutdown, ensure shutdown completes.
957 * Otherwise tell the channel to shutdown (it's ok to shutdown the channel redundantly). */
958 if (websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
959 s_finish_shutdown(websocket);
960 } else {
961 AWS_LOGF_ERROR(
962 AWS_LS_HTTP_WEBSOCKET,
963 "id=%p: Closing websocket due to failure during write, error %d (%s).",
964 (void *)websocket,
965 error_code,
966 aws_error_name(error_code));
967 s_schedule_channel_shutdown(websocket, error_code);
968 }
969 }
970
s_shutdown_due_to_read_err(struct aws_websocket * websocket,int error_code)971 static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code) {
972 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
973
974 AWS_LOGF_ERROR(
975 AWS_LS_HTTP_WEBSOCKET,
976 "id=%p: Closing websocket due to failure during read, error %d (%s).",
977 (void *)websocket,
978 error_code,
979 aws_error_name(error_code));
980
981 websocket->thread_data.is_reading_stopped = true;
982
983 /* If there's a current incoming frame, complete it with the specific error code. */
984 if (websocket->thread_data.current_incoming_frame) {
985 s_complete_incoming_frame(websocket, error_code, NULL);
986 }
987
988 /* Tell channel to shutdown (it's ok to call this redundantly) */
989 s_schedule_channel_shutdown(websocket, error_code);
990 }
991
s_shutdown_channel_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)992 static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
993 (void)task;
994
995 if (status != AWS_TASK_STATUS_RUN_READY) {
996 return;
997 }
998
999 struct aws_websocket *websocket = arg;
1000 int error_code;
1001
1002 /* BEGIN CRITICAL SECTION */
1003 s_lock_synced_data(websocket);
1004
1005 error_code = websocket->synced_data.shutdown_channel_task_error_code;
1006
1007 s_unlock_synced_data(websocket);
1008 /* END CRITICAL SECTION */
1009
1010 aws_channel_shutdown(websocket->channel_slot->channel, error_code);
1011 }
1012
1013 /* Tell the channel to shut down. It is safe to call this multiple times.
1014 * The call to aws_channel_shutdown() is delayed so that a user invoking aws_websocket_close doesn't
1015 * have completion callbacks firing before the function call even returns */
s_schedule_channel_shutdown(struct aws_websocket * websocket,int error_code)1016 static void s_schedule_channel_shutdown(struct aws_websocket *websocket, int error_code) {
1017 bool schedule_shutdown = false;
1018
1019 /* BEGIN CRITICAL SECTION */
1020 s_lock_synced_data(websocket);
1021
1022 if (!websocket->synced_data.is_shutdown_channel_task_scheduled) {
1023 schedule_shutdown = true;
1024 websocket->synced_data.is_shutdown_channel_task_scheduled = true;
1025 websocket->synced_data.shutdown_channel_task_error_code = error_code;
1026 }
1027
1028 s_unlock_synced_data(websocket);
1029 /* END CRITICAL SECTION */
1030
1031 if (schedule_shutdown) {
1032 aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->shutdown_channel_task);
1033 }
1034 }
1035
aws_websocket_close(struct aws_websocket * websocket,bool free_scarce_resources_immediately)1036 void aws_websocket_close(struct aws_websocket *websocket, bool free_scarce_resources_immediately) {
1037 bool is_midchannel_handler;
1038
1039 /* BEGIN CRITICAL SECTION */
1040 s_lock_synced_data(websocket);
1041 is_midchannel_handler = websocket->synced_data.is_midchannel_handler;
1042 s_unlock_synced_data(websocket);
1043 /* END CRITICAL SECTION */
1044
1045 if (is_midchannel_handler) {
1046 AWS_LOGF_ERROR(
1047 AWS_LS_HTTP_WEBSOCKET,
1048 "id=%p: Ignoring close call, websocket has converted to midchannel handler.",
1049 (void *)websocket);
1050 return;
1051 }
1052
1053 /* TODO: aws_channel_shutdown() should let users specify error_code and "immediate" as separate parameters.
1054 * Currently, any non-zero error_code results in "immediate" shutdown */
1055 int error_code = AWS_ERROR_SUCCESS;
1056 if (free_scarce_resources_immediately) {
1057 error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
1058 }
1059
1060 s_schedule_channel_shutdown(websocket, error_code);
1061 }
1062
s_handler_shutdown(struct aws_channel_handler * handler,struct aws_channel_slot * slot,enum aws_channel_direction dir,int error_code,bool free_scarce_resources_immediately)1063 static int s_handler_shutdown(
1064 struct aws_channel_handler *handler,
1065 struct aws_channel_slot *slot,
1066 enum aws_channel_direction dir,
1067 int error_code,
1068 bool free_scarce_resources_immediately) {
1069
1070 AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
1071 struct aws_websocket *websocket = handler->impl;
1072 int err;
1073
1074 AWS_LOGF_DEBUG(
1075 AWS_LS_HTTP_WEBSOCKET,
1076 "id=%p: Websocket handler shutting down dir=%s error_code=%d immediate=%d.",
1077 (void *)websocket,
1078 dir == AWS_CHANNEL_DIR_READ ? "READ" : "WRITE",
1079 error_code,
1080 free_scarce_resources_immediately);
1081
1082 if (dir == AWS_CHANNEL_DIR_READ) {
1083 /* Shutdown in the read direction is immediate and simple. */
1084 websocket->thread_data.is_reading_stopped = true;
1085 aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
1086
1087 } else {
1088 websocket->thread_data.channel_shutdown_error_code = error_code;
1089 websocket->thread_data.channel_shutdown_free_scarce_resources_immediately = free_scarce_resources_immediately;
1090 websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = true;
1091
1092 if (websocket->thread_data.channel_shutdown_free_scarce_resources_immediately ||
1093 websocket->thread_data.is_writing_stopped) {
1094
1095 AWS_LOGF_TRACE(
1096 AWS_LS_HTTP_WEBSOCKET,
1097 "id=%p: Finishing handler shutdown immediately, without ensuring a CLOSE frame was sent.",
1098 (void *)websocket);
1099
1100 s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1101 s_finish_shutdown(websocket);
1102 } else {
1103 /* Attempt to queue a CLOSE frame, then wait for it to send before finishing shutdown. */
1104 struct aws_websocket_send_frame_options close_frame = {
1105 .opcode = AWS_WEBSOCKET_OPCODE_CLOSE,
1106 .fin = true,
1107 };
1108 err = s_send_frame(websocket, &close_frame, false);
1109 if (err) {
1110 AWS_LOGF_WARN(
1111 AWS_LS_HTTP_WEBSOCKET,
1112 "id=%p: Failed to send CLOSE frame, error %d (%s).",
1113 (void *)websocket,
1114 aws_last_error(),
1115 aws_error_name(aws_last_error()));
1116
1117 s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1118 s_finish_shutdown(websocket);
1119 } else {
1120 AWS_LOGF_TRACE(
1121 AWS_LS_HTTP_WEBSOCKET,
1122 "id=%p: Outgoing CLOSE frame queued, handler will finish shutdown once it's sent.",
1123 (void *)websocket);
1124 /* schedule a task to run after 1 sec. If the CLOSE still not sent at that time, we should just cancel
1125 * sending it and shutdown the channel. */
1126 uint64_t schedule_time = 0;
1127 aws_channel_current_clock_time(websocket->channel_slot->channel, &schedule_time);
1128 schedule_time += AWS_WEBSOCKET_CLOSE_TIMEOUT;
1129 AWS_LOGF_TRACE(
1130 AWS_LS_HTTP_WEBSOCKET,
1131 "id=%p: websocket_close_timeout task will be run at timestamp %" PRIu64,
1132 (void *)websocket,
1133 schedule_time);
1134 aws_channel_schedule_task_future(
1135 websocket->channel_slot->channel, &websocket->close_timeout_task, schedule_time);
1136 }
1137 }
1138 }
1139
1140 return AWS_OP_SUCCESS;
1141 }
1142
s_close_timeout_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)1143 static void s_close_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
1144 (void)task;
1145 if (status != AWS_TASK_STATUS_RUN_READY) {
1146 /* If channel has shut down, don't need to resume sending payload */
1147 return;
1148 }
1149
1150 struct aws_websocket *websocket = arg;
1151 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1152
1153 if (!websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
1154 /* Not waiting for write to complete, which means the CLOSE frame has sent, just do nothing */
1155 return;
1156 }
1157
1158 AWS_LOGF_WARN(
1159 AWS_LS_HTTP_WEBSOCKET,
1160 "id=%p: Failed to send CLOSE frame, timeout happened, shutdown the channel",
1161 (void *)websocket);
1162
1163 s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1164 s_finish_shutdown(websocket);
1165 }
1166
s_finish_shutdown(struct aws_websocket * websocket)1167 static void s_finish_shutdown(struct aws_websocket *websocket) {
1168 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1169 AWS_ASSERT(websocket->thread_data.is_writing_stopped);
1170 AWS_ASSERT(websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written);
1171
1172 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Finishing websocket handler shutdown.", (void *)websocket);
1173
1174 websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = false;
1175
1176 /* Cancel all incomplete frames */
1177 if (websocket->thread_data.current_incoming_frame) {
1178 s_complete_incoming_frame(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED, NULL);
1179 }
1180
1181 if (websocket->thread_data.current_outgoing_frame) {
1182 s_destroy_outgoing_frame(
1183 websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1184 websocket->thread_data.current_outgoing_frame = NULL;
1185 }
1186
1187 /* BEGIN CRITICAL SECTION */
1188 s_lock_synced_data(websocket);
1189
1190 while (!aws_linked_list_empty(&websocket->synced_data.outgoing_frame_list)) {
1191 /* Move frames from synced_data to thread_data, then cancel them together outside critical section */
1192 struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->synced_data.outgoing_frame_list);
1193 aws_linked_list_push_back(&websocket->thread_data.outgoing_frame_list, node);
1194 }
1195
1196 s_unlock_synced_data(websocket);
1197 /* END CRITICAL SECTION */
1198
1199 while (!aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
1200 struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
1201 struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
1202 s_destroy_outgoing_frame(websocket, frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
1203 }
1204
1205 aws_channel_slot_on_handler_shutdown_complete(
1206 websocket->channel_slot,
1207 AWS_CHANNEL_DIR_WRITE,
1208 websocket->thread_data.channel_shutdown_error_code,
1209 websocket->thread_data.channel_shutdown_free_scarce_resources_immediately);
1210 }
1211
s_handler_process_read_message(struct aws_channel_handler * handler,struct aws_channel_slot * slot,struct aws_io_message * message)1212 static int s_handler_process_read_message(
1213 struct aws_channel_handler *handler,
1214 struct aws_channel_slot *slot,
1215 struct aws_io_message *message) {
1216
1217 AWS_ASSERT(message);
1218 AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
1219 struct aws_websocket *websocket = handler->impl;
1220 struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&message->message_data);
1221 int err;
1222
1223 websocket->thread_data.incoming_message_window_update = message->message_data.len;
1224
1225 AWS_LOGF_TRACE(
1226 AWS_LS_HTTP_WEBSOCKET,
1227 "id=%p: Begin processing incoming message of size %zu.",
1228 (void *)websocket,
1229 message->message_data.len);
1230
1231 while (cursor.len) {
1232 if (websocket->thread_data.is_reading_stopped) {
1233 goto clean_up;
1234 }
1235
1236 bool frame_complete;
1237 err = aws_websocket_decoder_process(&websocket->thread_data.decoder, &cursor, &frame_complete);
1238 if (err) {
1239 AWS_LOGF_ERROR(
1240 AWS_LS_HTTP_WEBSOCKET,
1241 "id=%p: Failed processing incoming message, error %d (%s). Closing connection.",
1242 (void *)websocket,
1243 aws_last_error(),
1244 aws_error_name(aws_last_error()));
1245
1246 goto error;
1247 }
1248
1249 if (frame_complete) {
1250 bool callback_result;
1251 s_complete_incoming_frame(websocket, AWS_ERROR_SUCCESS, &callback_result);
1252 if (!callback_result) {
1253 AWS_LOGF_ERROR(
1254 AWS_LS_HTTP_WEBSOCKET,
1255 "id=%p: Incoming frame completion callback has reported a failure. Closing connection",
1256 (void *)websocket);
1257
1258 aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
1259 goto error;
1260 }
1261 }
1262 }
1263
1264 if (websocket->thread_data.incoming_message_window_update > 0) {
1265 err = aws_channel_slot_increment_read_window(slot, websocket->thread_data.incoming_message_window_update);
1266 if (err) {
1267 AWS_LOGF_ERROR(
1268 AWS_LS_HTTP_WEBSOCKET,
1269 "id=%p: Failed to increment read window after message processing, error %d (%s). Closing "
1270 "connection.",
1271 (void *)websocket,
1272 aws_last_error(),
1273 aws_error_name(aws_last_error()));
1274 goto error;
1275 }
1276 }
1277
1278 goto clean_up;
1279
1280 error:
1281 s_shutdown_due_to_read_err(websocket, aws_last_error());
1282
1283 clean_up:
1284 if (cursor.len > 0) {
1285 AWS_LOGF_TRACE(
1286 AWS_LS_HTTP_WEBSOCKET,
1287 "id=%p: Done processing incoming message, final %zu bytes ignored.",
1288 (void *)websocket,
1289 cursor.len);
1290 } else {
1291 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Done processing incoming message.", (void *)websocket);
1292 }
1293 aws_mem_release(message->allocator, message);
1294 return AWS_OP_SUCCESS;
1295 }
1296
s_decoder_on_frame(const struct aws_websocket_frame * frame,void * user_data)1297 static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data) {
1298 struct aws_websocket *websocket = user_data;
1299 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1300 AWS_ASSERT(!websocket->thread_data.current_incoming_frame);
1301 AWS_ASSERT(!websocket->thread_data.is_reading_stopped);
1302
1303 websocket->thread_data.current_incoming_frame = &websocket->thread_data.incoming_frame_storage;
1304
1305 websocket->thread_data.current_incoming_frame->payload_length = frame->payload_length;
1306 websocket->thread_data.current_incoming_frame->opcode = frame->opcode;
1307 websocket->thread_data.current_incoming_frame->fin = frame->fin;
1308 websocket->thread_data.current_incoming_frame->rsv[0] = frame->rsv[0];
1309 websocket->thread_data.current_incoming_frame->rsv[1] = frame->rsv[1];
1310 websocket->thread_data.current_incoming_frame->rsv[2] = frame->rsv[2];
1311
1312 /* If CONTINUATION frames are expected, remember which type of data is being continued.
1313 * RFC-6455 Section 5.4 Fragmentation */
1314 if (aws_websocket_is_data_frame(frame->opcode)) {
1315 if (frame->opcode != AWS_WEBSOCKET_OPCODE_CONTINUATION) {
1316 if (frame->fin) {
1317 websocket->thread_data.continuation_of_opcode = 0;
1318 } else {
1319 websocket->thread_data.continuation_of_opcode = frame->opcode;
1320 }
1321 }
1322 }
1323
1324 /* Invoke user cb */
1325 bool callback_result = true;
1326 if (websocket->on_incoming_frame_begin && !websocket->thread_data.is_midchannel_handler) {
1327 callback_result = websocket->on_incoming_frame_begin(
1328 websocket, websocket->thread_data.current_incoming_frame, websocket->user_data);
1329 }
1330
1331 if (!callback_result) {
1332 AWS_LOGF_ERROR(
1333 AWS_LS_HTTP_WEBSOCKET, "id=%p: Incoming frame callback has reported a failure.", (void *)websocket);
1334 return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
1335 }
1336
1337 return AWS_OP_SUCCESS;
1338 }
1339
s_decoder_on_payload(struct aws_byte_cursor data,void * user_data)1340 static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data) {
1341 struct aws_websocket *websocket = user_data;
1342 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1343 AWS_ASSERT(websocket->thread_data.current_incoming_frame);
1344 AWS_ASSERT(!websocket->thread_data.is_reading_stopped);
1345
1346 if (websocket->thread_data.is_midchannel_handler) {
1347 return s_decoder_on_midchannel_payload(websocket, data);
1348 }
1349
1350 return s_decoder_on_user_payload(websocket, data);
1351 }
1352
1353 /* Invoke user cb */
s_decoder_on_user_payload(struct aws_websocket * websocket,struct aws_byte_cursor data)1354 static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws_byte_cursor data) {
1355 if (!websocket->on_incoming_frame_payload) {
1356 return AWS_OP_SUCCESS;
1357 }
1358
1359 if (!websocket->on_incoming_frame_payload(
1360 websocket, websocket->thread_data.current_incoming_frame, data, websocket->user_data)) {
1361
1362 AWS_LOGF_ERROR(
1363 AWS_LS_HTTP_WEBSOCKET, "id=%p: Incoming payload callback has reported a failure.", (void *)websocket);
1364 return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
1365 }
1366
1367 /* If user reduced window_update_size, reduce how much the websocket will update its window */
1368 if (websocket->manual_window_update) {
1369 size_t reduce = data.len;
1370 websocket->thread_data.incoming_message_window_update -= data.len;
1371 AWS_LOGF_DEBUG(
1372 AWS_LS_HTTP_WEBSOCKET,
1373 "id=%p: Incoming payload callback changed window update size, window will shrink by %zu.",
1374 (void *)websocket,
1375 reduce);
1376 }
1377
1378 return AWS_OP_SUCCESS;
1379 }
1380
1381 /* Pass data to channel handler on the right */
s_decoder_on_midchannel_payload(struct aws_websocket * websocket,struct aws_byte_cursor data)1382 static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data) {
1383 struct aws_io_message *io_msg = NULL;
1384
1385 /* Only pass data to next handler if it's from a BINARY frame (or the CONTINUATION of a BINARY frame) */
1386 bool is_binary_data = websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_BINARY ||
1387 (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CONTINUATION &&
1388 websocket->thread_data.continuation_of_opcode == AWS_WEBSOCKET_OPCODE_BINARY);
1389 if (!is_binary_data) {
1390 return AWS_OP_SUCCESS;
1391 }
1392
1393 AWS_ASSERT(websocket->channel_slot->adj_right); /* Expected another slot in the read direction */
1394
1395 /* Note that current implementation of websocket handler does not buffer data travelling in the "read" direction,
1396 * so the downstream read window needs to be large enough to immediately receive incoming data. */
1397 if (aws_channel_slot_downstream_read_window(websocket->channel_slot) < data.len) {
1398 AWS_LOGF_ERROR(
1399 AWS_LS_HTTP_WEBSOCKET,
1400 "id=%p: Cannot send entire message without exceeding read window.",
1401 (void *)websocket);
1402 aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
1403 goto error;
1404 }
1405
1406 io_msg = aws_channel_acquire_message_from_pool(
1407 websocket->channel_slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, data.len);
1408 if (!io_msg) {
1409 AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Failed to acquire message.", (void *)websocket);
1410 goto error;
1411 }
1412
1413 if (io_msg->message_data.capacity < data.len) {
1414 /* Probably can't happen. Data is coming an aws_io_message, should be able to acquire another just as big */
1415 AWS_LOGF_ERROR(
1416 AWS_LS_HTTP_WEBSOCKET, "id=%p: Failed to acquire sufficiently large message.", (void *)websocket);
1417 aws_raise_error(AWS_ERROR_UNKNOWN);
1418 goto error;
1419 }
1420
1421 if (!aws_byte_buf_write_from_whole_cursor(&io_msg->message_data, data)) {
1422 AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Unexpected error while copying data.", (void *)websocket);
1423 aws_raise_error(AWS_ERROR_UNKNOWN);
1424 goto error;
1425 }
1426
1427 int err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_READ);
1428 if (err) {
1429 AWS_LOGF_ERROR(
1430 AWS_LS_HTTP_WEBSOCKET,
1431 "id=%p: Failed to send read message, error %d (%s).",
1432 (void *)websocket,
1433 aws_last_error(),
1434 aws_error_name(aws_last_error()));
1435 goto error;
1436 }
1437
1438 /* Reduce amount by which websocket will update its read window */
1439 AWS_ASSERT(websocket->thread_data.incoming_message_window_update >= data.len);
1440 websocket->thread_data.incoming_message_window_update -= data.len;
1441
1442 return AWS_OP_SUCCESS;
1443
1444 error:
1445 if (io_msg) {
1446 aws_mem_release(io_msg->allocator, io_msg);
1447 }
1448 return AWS_OP_ERR;
1449 }
1450
s_complete_incoming_frame(struct aws_websocket * websocket,int error_code,bool * out_callback_result)1451 static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result) {
1452 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1453 AWS_ASSERT(websocket->thread_data.current_incoming_frame);
1454
1455 if (error_code == AWS_OP_SUCCESS) {
1456 /* If this was a CLOSE frame, don't read any more data. */
1457 if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
1458 AWS_LOGF_DEBUG(
1459 AWS_LS_HTTP_WEBSOCKET,
1460 "id=%p: Close frame received, any further data received will be ignored.",
1461 (void *)websocket);
1462 websocket->thread_data.is_reading_stopped = true;
1463
1464 /* TODO: auto-close if there's a channel-handler to the right */
1465 }
1466
1467 /* TODO: auto-respond to PING with PONG */
1468 }
1469
1470 /* Invoke user cb */
1471 bool callback_result = true;
1472 if (websocket->on_incoming_frame_complete && !websocket->thread_data.is_midchannel_handler) {
1473 callback_result = websocket->on_incoming_frame_complete(
1474 websocket, websocket->thread_data.current_incoming_frame, error_code, websocket->user_data);
1475 }
1476
1477 if (out_callback_result) {
1478 *out_callback_result = callback_result;
1479 }
1480
1481 websocket->thread_data.current_incoming_frame = NULL;
1482 }
1483
s_handler_initial_window_size(struct aws_channel_handler * handler)1484 static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
1485 struct aws_websocket *websocket = handler->impl;
1486 return websocket->initial_window_size;
1487 }
1488
s_handler_message_overhead(struct aws_channel_handler * handler)1489 static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
1490 (void)handler;
1491 return AWS_WEBSOCKET_MAX_FRAME_OVERHEAD;
1492 }
1493
s_handler_increment_read_window(struct aws_channel_handler * handler,struct aws_channel_slot * slot,size_t size)1494 static int s_handler_increment_read_window(
1495 struct aws_channel_handler *handler,
1496 struct aws_channel_slot *slot,
1497 size_t size) {
1498
1499 struct aws_websocket *websocket = handler->impl;
1500 AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
1501 AWS_ASSERT(websocket->thread_data.is_midchannel_handler);
1502
1503 /* NOTE: This is pretty hacky and should change if it ever causes issues.
1504 *
1505 * Currently, all read messages are processed the moment they're received.
1506 * If the downstream read window is open enough to accept this data, we can send it right along.
1507 * BUT if the downstream window were too small, we'd need to buffer the data and wait until
1508 * the downstream window opened again to finish sending.
1509 *
1510 * To avoid that complexity, we go to pains here to ensure that the websocket's window exactly
1511 * matches the window to the right, allowing us to avoid buffering in the read direction.
1512 */
1513 size_t increment = size;
1514 if (websocket->thread_data.last_known_right_slot != slot->adj_right) {
1515 if (size < slot->window_size) {
1516 AWS_LOGF_ERROR(
1517 AWS_LS_HTTP_WEBSOCKET,
1518 "id=%p: The websocket does not support downstream handlers with a smaller window.",
1519 (void *)websocket);
1520 aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
1521 goto error;
1522 }
1523
1524 /* New handler to the right, make sure websocket's window matches its window. */
1525 websocket->thread_data.last_known_right_slot = slot->adj_right;
1526 increment = size - slot->window_size;
1527 }
1528
1529 if (increment != 0) {
1530 int err = aws_channel_slot_increment_read_window(slot, increment);
1531 if (err) {
1532 goto error;
1533 }
1534 }
1535
1536 return AWS_OP_SUCCESS;
1537
1538 error:
1539 websocket->thread_data.is_reading_stopped = true;
1540 /* Shutting down channel because I know that no one ever checks these errors */
1541 s_shutdown_due_to_read_err(websocket, aws_last_error());
1542 return AWS_OP_ERR;
1543 }
1544
s_increment_read_window_action(struct aws_websocket * websocket,size_t size)1545 static void s_increment_read_window_action(struct aws_websocket *websocket, size_t size) {
1546 AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
1547
1548 int err = aws_channel_slot_increment_read_window(websocket->channel_slot, size);
1549 if (err) {
1550 AWS_LOGF_ERROR(
1551 AWS_LS_HTTP_WEBSOCKET,
1552 "id=%p: Failed to increment read window, error %d (%s). Closing websocket.",
1553 (void *)websocket,
1554 aws_last_error(),
1555 aws_error_name(aws_last_error()));
1556
1557 s_schedule_channel_shutdown(websocket, aws_last_error());
1558 }
1559 }
1560
s_increment_read_window_task(struct aws_channel_task * task,void * arg,enum aws_task_status status)1561 static void s_increment_read_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
1562 (void)task;
1563
1564 if (status != AWS_TASK_STATUS_RUN_READY) {
1565 return;
1566 }
1567
1568 struct aws_websocket *websocket = arg;
1569 size_t size;
1570
1571 /* BEGIN CRITICAL SECTION */
1572 s_lock_synced_data(websocket);
1573
1574 size = websocket->synced_data.window_increment_size;
1575 websocket->synced_data.window_increment_size = 0;
1576
1577 s_unlock_synced_data(websocket);
1578 /* END CRITICAL SECTION */
1579
1580 AWS_LOGF_TRACE(
1581 AWS_LS_HTTP_WEBSOCKET, "id=%p: Running task to increment read window by %zu.", (void *)websocket, size);
1582
1583 s_increment_read_window_action(websocket, size);
1584 }
1585
aws_websocket_increment_read_window(struct aws_websocket * websocket,size_t size)1586 void aws_websocket_increment_read_window(struct aws_websocket *websocket, size_t size) {
1587 if (size == 0) {
1588 AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Ignoring window increment of size 0.", (void *)websocket);
1589 return;
1590 }
1591
1592 /* Schedule a task to do the increment.
1593 * If task is already scheduled, just increase size to be incremented */
1594 bool is_midchannel_handler = false;
1595 bool should_schedule_task = false;
1596
1597 /* BEGIN CRITICAL SECTION */
1598 s_lock_synced_data(websocket);
1599
1600 if (websocket->synced_data.is_midchannel_handler) {
1601 is_midchannel_handler = true;
1602 } else if (websocket->synced_data.window_increment_size == 0) {
1603 should_schedule_task = true;
1604 websocket->synced_data.window_increment_size = size;
1605 } else {
1606 websocket->synced_data.window_increment_size =
1607 aws_add_size_saturating(websocket->synced_data.window_increment_size, size);
1608 }
1609
1610 s_unlock_synced_data(websocket);
1611 /* END CRITICAL SECTION */
1612
1613 if (is_midchannel_handler) {
1614 AWS_LOGF_TRACE(
1615 AWS_LS_HTTP_WEBSOCKET,
1616 "id=%p: Ignoring window increment call, websocket has converted to midchannel handler.",
1617 (void *)websocket);
1618 } else if (should_schedule_task) {
1619 AWS_LOGF_TRACE(
1620 AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling task to increment read window by %zu.", (void *)websocket, size);
1621 aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->increment_read_window_task);
1622 } else {
1623 AWS_LOGF_TRACE(
1624 AWS_LS_HTTP_WEBSOCKET,
1625 "id=%p: Task to increment read window already scheduled, increasing scheduled size by %zu.",
1626 (void *)websocket,
1627 size);
1628 }
1629 }
1630
aws_websocket_random_handshake_key(struct aws_byte_buf * dst)1631 int aws_websocket_random_handshake_key(struct aws_byte_buf *dst) {
1632 /* RFC-6455 Section 4.1.
1633 * Derive random 16-byte value, base64-encoded, for the Sec-WebSocket-Key header */
1634 uint8_t key_random_storage[16] = {0};
1635 struct aws_byte_buf key_random_buf = aws_byte_buf_from_empty_array(key_random_storage, sizeof(key_random_storage));
1636 int err = aws_device_random_buffer(&key_random_buf);
1637 if (err) {
1638 return AWS_OP_ERR;
1639 }
1640
1641 struct aws_byte_cursor key_random_cur = aws_byte_cursor_from_buf(&key_random_buf);
1642 err = aws_base64_encode(&key_random_cur, dst);
1643 if (err) {
1644 return AWS_OP_ERR;
1645 }
1646
1647 return AWS_OP_SUCCESS;
1648 }
1649
aws_http_message_new_websocket_handshake_request(struct aws_allocator * allocator,struct aws_byte_cursor path,struct aws_byte_cursor host)1650 struct aws_http_message *aws_http_message_new_websocket_handshake_request(
1651 struct aws_allocator *allocator,
1652 struct aws_byte_cursor path,
1653 struct aws_byte_cursor host) {
1654
1655 AWS_PRECONDITION(allocator);
1656 AWS_PRECONDITION(aws_byte_cursor_is_valid(&path))
1657 AWS_PRECONDITION(aws_byte_cursor_is_valid(&host))
1658
1659 struct aws_http_message *request = aws_http_message_new_request(allocator);
1660 if (!request) {
1661 goto error;
1662 }
1663
1664 int err = aws_http_message_set_request_method(request, aws_http_method_get);
1665 if (err) {
1666 goto error;
1667 }
1668
1669 err = aws_http_message_set_request_path(request, path);
1670 if (err) {
1671 goto error;
1672 }
1673
1674 uint8_t key_storage[AWS_WEBSOCKET_MAX_HANDSHAKE_KEY_LENGTH];
1675 struct aws_byte_buf key_buf = aws_byte_buf_from_empty_array(key_storage, sizeof(key_storage));
1676 err = aws_websocket_random_handshake_key(&key_buf);
1677 if (err) {
1678 goto error;
1679 }
1680
1681 struct aws_http_header required_headers[] = {
1682 {
1683 .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
1684 .value = host,
1685 },
1686 {
1687 .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Upgrade"),
1688 .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("websocket"),
1689 },
1690 {
1691 .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Connection"),
1692 .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Upgrade"),
1693 },
1694 {
1695 .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Sec-WebSocket-Key"),
1696 .value = aws_byte_cursor_from_buf(&key_buf),
1697 },
1698 {
1699 .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Sec-WebSocket-Version"),
1700 .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("13"),
1701 },
1702 };
1703
1704 for (size_t i = 0; i < AWS_ARRAY_SIZE(required_headers); ++i) {
1705 err = aws_http_message_add_header(request, required_headers[i]);
1706 if (err) {
1707 goto error;
1708 }
1709 }
1710
1711 return request;
1712
1713 error:
1714 aws_http_message_destroy(request);
1715 return NULL;
1716 }
1717