1 /**
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  * SPDX-License-Identifier: Apache-2.0.
4  */
5 
6 #include <aws/event-stream/event_stream.h>
7 
8 #include <aws/checksums/crc.h>
9 
10 #include <aws/common/encoding.h>
11 #include <aws/io/io.h>
12 
13 #include <inttypes.h>
14 
15 #define LIB_NAME "libaws-c-event-stream"
16 
17 #if _MSC_VER
18 #    pragma warning(push)
19 #    pragma warning(disable : 4221) /* aggregate initializer using local variable addresses */
20 #    pragma warning(disable : 4204) /* non-constant aggregate initializer */
21 #    pragma warning(disable : 4306) /* msft doesn't trust us to do pointer arithmetic. */
22 #endif
23 
/* Table of human-readable descriptions for this library's error codes. It is wrapped by s_list, which
 * aws_event_stream_library_init() hands to aws_register_error_info(). */
static struct aws_error_info s_errors[] = {
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH, "Buffer length mismatch", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_INSUFFICIENT_BUFFER_LEN, "insufficient buffer length", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
        "a field for the message was too large",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, "prelude checksum was incorrect", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE, "message checksum was incorrect", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN,
        "message headers length was incorrect",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE,
        "An unknown header type was encountered",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE,
        "message parser encountered an illegal state",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED,
        "event stream rpc connection has been closed",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR,
        "event stream rpc connection has encountered a protocol error",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED,
        "event stream rpc connection stream is closed.",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_RPC_STREAM_NOT_ACTIVATED,
        "event stream rpc stream continuation was not successfully activated before use. Call "
        "aws_event_stream_rpc_client_continuation_activate()"
        " before using a stream continuation token.",
        LIB_NAME),
};
64 
65 static struct aws_error_info_list s_list = {
66     .error_list = s_errors,
67     .count = sizeof(s_errors) / sizeof(struct aws_error_info),
68 };
69 
/* Set by aws_event_stream_library_init() and cleared by aws_event_stream_library_clean_up(); each of those
 * functions checks it first so that repeated calls do no additional work. */
static bool s_event_stream_library_initialized = false;
71 
/* Log subjects defined by this library: an identifier, a short name, and a description for each. */
static struct aws_log_subject_info s_event_stream_log_subject_infos[] = {
    DEFINE_LOG_SUBJECT_INFO(
        AWS_LS_EVENT_STREAM_GENERAL,
        "event-stream-general",
        "Subject for aws-c-event-stream logging that defies categorization."),
    DEFINE_LOG_SUBJECT_INFO(
        AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
        "event-stream-channel-handler",
        "Subject for event-stream channel handler related logging."),
    DEFINE_LOG_SUBJECT_INFO(
        AWS_LS_EVENT_STREAM_RPC_SERVER,
        "event-stream-rpc-server",
        "Subject for event-stream rpc server."),
    DEFINE_LOG_SUBJECT_INFO(
        AWS_LS_EVENT_STREAM_RPC_CLIENT,
        "event-stream-rpc-client",
        "Subject for event-stream rpc client."),
};

/* The subject table plus its element count; passed to aws_register_log_subject_info_list() by
 * aws_event_stream_library_init(). */
static struct aws_log_subject_info_list s_event_stream_log_subject_list = {
    .subject_list = s_event_stream_log_subject_infos,
    .count = AWS_ARRAY_SIZE(s_event_stream_log_subject_infos),
};
95 
aws_event_stream_library_init(struct aws_allocator * allocator)96 void aws_event_stream_library_init(struct aws_allocator *allocator) {
97     if (!s_event_stream_library_initialized) {
98         s_event_stream_library_initialized = true;
99         aws_io_library_init(allocator);
100         aws_register_error_info(&s_list);
101         aws_register_log_subject_info_list(&s_event_stream_log_subject_list);
102     }
103 }
104 
aws_event_stream_library_clean_up(void)105 void aws_event_stream_library_clean_up(void) {
106     if (s_event_stream_library_initialized) {
107         s_event_stream_library_initialized = false;
108         aws_unregister_error_info(&s_list);
109         aws_io_library_clean_up();
110     }
111 }
112 
/* Byte offsets of the prelude fields, measured from the start of a message buffer. */
/* total message length field (read by aws_event_stream_message_total_length()) */
#define TOTAL_LEN_OFFSET 0
/* prelude checksum field; it follows two uint32_t-sized fields */
#define PRELUDE_CRC_OFFSET (sizeof(uint32_t) + sizeof(uint32_t))
/* headers length field (read by aws_event_stream_message_headers_len()) */
#define HEADER_LEN_OFFSET sizeof(uint32_t)
116 
117 /* Computes the byte length necessary to store the headers represented in the headers list.
118  * returns that length. */
aws_event_stream_compute_headers_required_buffer_len(const struct aws_array_list * headers)119 uint32_t aws_event_stream_compute_headers_required_buffer_len(const struct aws_array_list *headers) {
120     if (!headers || !aws_array_list_length(headers)) {
121         return 0;
122     }
123 
124     size_t headers_count = aws_array_list_length(headers);
125     size_t headers_len = 0;
126 
127     for (size_t i = 0; i < headers_count; ++i) {
128         struct aws_event_stream_header_value_pair *header = NULL;
129 
130         aws_array_list_get_at_ptr(headers, (void **)&header, i);
131 
132         headers_len += sizeof(header->header_name_len) + header->header_name_len + 1;
133 
134         if (header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING ||
135             header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF) {
136             headers_len += sizeof(header->header_value_len);
137         }
138 
139         if (header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_FALSE &&
140             header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_TRUE) {
141             headers_len += header->header_value_len;
142         }
143     }
144 
145     return (uint32_t)headers_len;
146 }
147 
148 /* adds the headers represented in the headers list to the buffer.
149  returns the new buffer offset for use elsewhere. Assumes buffer length is at least the length of the return value
150  from compute_headers_length() */
aws_event_stream_write_headers_to_buffer(const struct aws_array_list * headers,uint8_t * buffer)151 size_t aws_event_stream_write_headers_to_buffer(const struct aws_array_list *headers, uint8_t *buffer) {
152     if (!headers || !aws_array_list_length(headers)) {
153         return 0;
154     }
155 
156     size_t headers_count = aws_array_list_length(headers);
157     uint8_t *buffer_alias = buffer;
158 
159     for (size_t i = 0; i < headers_count; ++i) {
160         struct aws_event_stream_header_value_pair *header = NULL;
161 
162         aws_array_list_get_at_ptr(headers, (void **)&header, i);
163         *buffer_alias = (uint8_t)header->header_name_len;
164         buffer_alias++;
165         memcpy(buffer_alias, header->header_name, (size_t)header->header_name_len);
166         buffer_alias += header->header_name_len;
167         *buffer_alias = (uint8_t)header->header_value_type;
168         buffer_alias++;
169         switch (header->header_value_type) {
170             case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
171             case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
172                 break;
173             case AWS_EVENT_STREAM_HEADER_BYTE:
174                 *buffer_alias = header->header_value.static_val[0];
175                 buffer_alias++;
176                 break;
177                 /* additions of integers here assume the endianness conversion has already happened */
178             case AWS_EVENT_STREAM_HEADER_INT16:
179                 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint16_t));
180                 buffer_alias += sizeof(uint16_t);
181                 break;
182             case AWS_EVENT_STREAM_HEADER_INT32:
183                 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint32_t));
184                 buffer_alias += sizeof(uint32_t);
185                 break;
186             case AWS_EVENT_STREAM_HEADER_INT64:
187             case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
188                 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint64_t));
189                 buffer_alias += sizeof(uint64_t);
190                 break;
191             case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
192             case AWS_EVENT_STREAM_HEADER_STRING:
193                 aws_write_u16(header->header_value_len, buffer_alias);
194                 buffer_alias += sizeof(uint16_t);
195                 memcpy(buffer_alias, header->header_value.variable_len_val, header->header_value_len);
196                 buffer_alias += header->header_value_len;
197                 break;
198             case AWS_EVENT_STREAM_HEADER_UUID:
199                 memcpy(buffer_alias, header->header_value.static_val, 16);
200                 buffer_alias += header->header_value_len;
201                 break;
202         }
203     }
204 
205     return buffer_alias - buffer;
206 }
207 
aws_event_stream_read_headers_from_buffer(struct aws_array_list * headers,const uint8_t * buffer,size_t headers_len)208 int aws_event_stream_read_headers_from_buffer(
209     struct aws_array_list *headers,
210     const uint8_t *buffer,
211     size_t headers_len) {
212 
213     if (AWS_UNLIKELY(headers_len > AWS_EVENT_STREAM_MAX_HEADERS_SIZE)) {
214         return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
215     }
216 
217     /* iterate the buffer per header. */
218     const uint8_t *buffer_start = buffer;
219     while ((size_t)(buffer - buffer_start) < headers_len) {
220         struct aws_event_stream_header_value_pair header;
221         AWS_ZERO_STRUCT(header);
222 
223         /* get the header info from the buffer, make sure to increment buffer offset. */
224         header.header_name_len = *buffer;
225         buffer += sizeof(header.header_name_len);
226         memcpy((void *)header.header_name, buffer, (size_t)header.header_name_len);
227         buffer += header.header_name_len;
228         header.header_value_type = (enum aws_event_stream_header_value_type) * buffer;
229         buffer++;
230 
231         switch (header.header_value_type) {
232             case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
233                 header.header_value_len = 0;
234                 header.header_value.static_val[0] = 0;
235                 break;
236             case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
237                 header.header_value_len = 0;
238                 header.header_value.static_val[0] = 1;
239                 break;
240             case AWS_EVENT_STREAM_HEADER_BYTE:
241                 header.header_value_len = sizeof(uint8_t);
242                 header.header_value.static_val[0] = *buffer;
243                 buffer++;
244                 break;
245             case AWS_EVENT_STREAM_HEADER_INT16:
246                 header.header_value_len = sizeof(uint16_t);
247                 memcpy(header.header_value.static_val, buffer, sizeof(uint16_t));
248                 buffer += sizeof(uint16_t);
249                 break;
250             case AWS_EVENT_STREAM_HEADER_INT32:
251                 header.header_value_len = sizeof(uint32_t);
252                 memcpy(header.header_value.static_val, buffer, sizeof(uint32_t));
253                 buffer += sizeof(uint32_t);
254                 break;
255             case AWS_EVENT_STREAM_HEADER_INT64:
256             case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
257                 header.header_value_len = sizeof(uint64_t);
258                 memcpy(header.header_value.static_val, buffer, sizeof(uint64_t));
259                 buffer += sizeof(uint64_t);
260                 break;
261             case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
262             case AWS_EVENT_STREAM_HEADER_STRING:
263                 header.header_value_len = aws_read_u16(buffer);
264                 buffer += sizeof(header.header_value_len);
265                 header.header_value.variable_len_val = (uint8_t *)buffer;
266                 buffer += header.header_value_len;
267                 break;
268             case AWS_EVENT_STREAM_HEADER_UUID:
269                 header.header_value_len = 16;
270                 memcpy(header.header_value.static_val, buffer, 16);
271                 buffer += header.header_value_len;
272                 break;
273         }
274 
275         if (aws_array_list_push_back(headers, (const void *)&header)) {
276             return AWS_OP_ERR;
277         }
278     }
279 
280     return AWS_OP_SUCCESS;
281 }
282 
283 /* initialize message with the arguments
284  * the underlying buffer will be allocated and payload will be copied.
285  * see specification, this code should simply add these fields according to that.*/
aws_event_stream_message_init(struct aws_event_stream_message * message,struct aws_allocator * alloc,struct aws_array_list * headers,struct aws_byte_buf * payload)286 int aws_event_stream_message_init(
287     struct aws_event_stream_message *message,
288     struct aws_allocator *alloc,
289     struct aws_array_list *headers,
290     struct aws_byte_buf *payload) {
291 
292     size_t payload_len = payload ? payload->len : 0;
293 
294     uint32_t headers_length = aws_event_stream_compute_headers_required_buffer_len(headers);
295 
296     if (AWS_UNLIKELY(headers_length > AWS_EVENT_STREAM_MAX_HEADERS_SIZE)) {
297         return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
298     }
299 
300     uint32_t total_length =
301         (uint32_t)(AWS_EVENT_STREAM_PRELUDE_LENGTH + headers_length + payload_len + AWS_EVENT_STREAM_TRAILER_LENGTH);
302 
303     if (AWS_UNLIKELY(total_length < headers_length || total_length < payload_len)) {
304         return aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED);
305     }
306 
307     if (AWS_UNLIKELY(total_length > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE)) {
308         return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
309     }
310 
311     message->alloc = alloc;
312     message->message_buffer = aws_mem_acquire(message->alloc, total_length);
313 
314     if (message->message_buffer) {
315         message->owns_buffer = 1;
316         aws_write_u32(total_length, message->message_buffer);
317         uint8_t *buffer_offset = message->message_buffer + sizeof(total_length);
318         aws_write_u32(headers_length, buffer_offset);
319         buffer_offset += sizeof(headers_length);
320 
321         uint32_t running_crc =
322             aws_checksums_crc32(message->message_buffer, (int)(buffer_offset - message->message_buffer), 0);
323 
324         const uint8_t *message_crc_boundary_start = buffer_offset;
325         aws_write_u32(running_crc, buffer_offset);
326         buffer_offset += sizeof(running_crc);
327 
328         if (headers_length) {
329             buffer_offset += aws_event_stream_write_headers_to_buffer(headers, buffer_offset);
330         }
331 
332         if (payload) {
333             memcpy(buffer_offset, payload->buffer, payload->len);
334             buffer_offset += payload->len;
335         }
336 
337         running_crc = aws_checksums_crc32(
338             message_crc_boundary_start, (int)(buffer_offset - message_crc_boundary_start), running_crc);
339         aws_write_u32(running_crc, buffer_offset);
340 
341         return AWS_OP_SUCCESS;
342     }
343 
344     return aws_raise_error(AWS_ERROR_OOM);
345 }
346 
347 /* add buffer to the message (non-owning). Verify buffer crcs and that length fields are reasonable. */
aws_event_stream_message_from_buffer(struct aws_event_stream_message * message,struct aws_allocator * alloc,struct aws_byte_buf * buffer)348 int aws_event_stream_message_from_buffer(
349     struct aws_event_stream_message *message,
350     struct aws_allocator *alloc,
351     struct aws_byte_buf *buffer) {
352     AWS_ASSERT(buffer);
353 
354     message->alloc = alloc;
355     message->owns_buffer = 0;
356 
357     if (AWS_UNLIKELY(buffer->len < AWS_EVENT_STREAM_PRELUDE_LENGTH + AWS_EVENT_STREAM_TRAILER_LENGTH)) {
358         return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH);
359     }
360 
361     uint32_t message_length = aws_read_u32(buffer->buffer + TOTAL_LEN_OFFSET);
362 
363     if (AWS_UNLIKELY(message_length != buffer->len)) {
364         return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH);
365     }
366 
367     if (AWS_UNLIKELY(message_length > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE)) {
368         return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
369     }
370 
371     uint32_t running_crc = aws_checksums_crc32(buffer->buffer, (int)PRELUDE_CRC_OFFSET, 0);
372     uint32_t prelude_crc = aws_read_u32(buffer->buffer + PRELUDE_CRC_OFFSET);
373 
374     if (running_crc != prelude_crc) {
375         return aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
376     }
377 
378     running_crc = aws_checksums_crc32(
379         buffer->buffer + PRELUDE_CRC_OFFSET,
380         (int)(message_length - PRELUDE_CRC_OFFSET - AWS_EVENT_STREAM_TRAILER_LENGTH),
381         running_crc);
382     uint32_t message_crc = aws_read_u32(buffer->buffer + message_length - AWS_EVENT_STREAM_TRAILER_LENGTH);
383 
384     if (running_crc != message_crc) {
385         return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE);
386     }
387 
388     message->message_buffer = buffer->buffer;
389 
390     if (aws_event_stream_message_headers_len(message) >
391         message_length - AWS_EVENT_STREAM_PRELUDE_LENGTH - AWS_EVENT_STREAM_TRAILER_LENGTH) {
392         message->message_buffer = 0;
393         return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN);
394     }
395 
396     return AWS_OP_SUCCESS;
397 }
398 
399 /* Verify buffer crcs and that length fields are reasonable. Once that is done, the buffer is copied to the message. */
aws_event_stream_message_from_buffer_copy(struct aws_event_stream_message * message,struct aws_allocator * alloc,const struct aws_byte_buf * buffer)400 int aws_event_stream_message_from_buffer_copy(
401     struct aws_event_stream_message *message,
402     struct aws_allocator *alloc,
403     const struct aws_byte_buf *buffer) {
404     int parse_value = aws_event_stream_message_from_buffer(message, alloc, (struct aws_byte_buf *)buffer);
405 
406     if (!parse_value) {
407         message->message_buffer = aws_mem_acquire(alloc, buffer->len);
408 
409         if (message->message_buffer) {
410             memcpy(message->message_buffer, buffer->buffer, buffer->len);
411             message->alloc = alloc;
412             message->owns_buffer = 1;
413 
414             return AWS_OP_SUCCESS;
415         }
416 
417         return aws_raise_error(AWS_ERROR_OOM);
418     }
419 
420     return parse_value;
421 }
422 
423 /* if buffer is owned, release the memory. */
aws_event_stream_message_clean_up(struct aws_event_stream_message * message)424 void aws_event_stream_message_clean_up(struct aws_event_stream_message *message) {
425     if (message->message_buffer && message->owns_buffer) {
426         aws_mem_release(message->alloc, message->message_buffer);
427     }
428 }
429 
aws_event_stream_message_total_length(const struct aws_event_stream_message * message)430 uint32_t aws_event_stream_message_total_length(const struct aws_event_stream_message *message) {
431     return aws_read_u32(message->message_buffer + TOTAL_LEN_OFFSET);
432 }
433 
aws_event_stream_message_headers_len(const struct aws_event_stream_message * message)434 uint32_t aws_event_stream_message_headers_len(const struct aws_event_stream_message *message) {
435     return aws_read_u32(message->message_buffer + HEADER_LEN_OFFSET);
436 }
437 
aws_event_stream_message_prelude_crc(const struct aws_event_stream_message * message)438 uint32_t aws_event_stream_message_prelude_crc(const struct aws_event_stream_message *message) {
439     return aws_read_u32(message->message_buffer + PRELUDE_CRC_OFFSET);
440 }
441 
aws_event_stream_message_headers(const struct aws_event_stream_message * message,struct aws_array_list * headers)442 int aws_event_stream_message_headers(const struct aws_event_stream_message *message, struct aws_array_list *headers) {
443     return aws_event_stream_read_headers_from_buffer(
444         headers,
445         message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH,
446         aws_event_stream_message_headers_len(message));
447 }
448 
aws_event_stream_message_payload(const struct aws_event_stream_message * message)449 const uint8_t *aws_event_stream_message_payload(const struct aws_event_stream_message *message) {
450     return message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message);
451 }
452 
aws_event_stream_message_payload_len(const struct aws_event_stream_message * message)453 uint32_t aws_event_stream_message_payload_len(const struct aws_event_stream_message *message) {
454     return aws_event_stream_message_total_length(message) -
455            (AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message) +
456             AWS_EVENT_STREAM_TRAILER_LENGTH);
457 }
458 
aws_event_stream_message_message_crc(const struct aws_event_stream_message * message)459 uint32_t aws_event_stream_message_message_crc(const struct aws_event_stream_message *message) {
460     return aws_read_u32(
461         message->message_buffer + (aws_event_stream_message_total_length(message) - AWS_EVENT_STREAM_TRAILER_LENGTH));
462 }
463 
aws_event_stream_message_buffer(const struct aws_event_stream_message * message)464 const uint8_t *aws_event_stream_message_buffer(const struct aws_event_stream_message *message) {
465     return message->message_buffer;
466 }
467 
468 #define DEBUG_STR_PRELUDE_TOTAL_LEN "\"total_length\": "
469 #define DEBUG_STR_PRELUDE_HDRS_LEN "\"headers_length\": "
470 #define DEBUG_STR_PRELUDE_CRC "\"prelude_crc\": "
471 #define DEBUG_STR_MESSAGE_CRC "\"message_crc\": "
472 #define DEBUG_STR_HEADER_NAME "\"name\": "
473 #define DEBUG_STR_HEADER_VALUE "\"value\": "
474 #define DEBUG_STR_HEADER_TYPE "\"type\": "
475 
aws_event_stream_message_to_debug_str(FILE * fd,const struct aws_event_stream_message * message)476 int aws_event_stream_message_to_debug_str(FILE *fd, const struct aws_event_stream_message *message) {
477     struct aws_array_list headers;
478     aws_event_stream_headers_list_init(&headers, message->alloc);
479     aws_event_stream_message_headers(message, &headers);
480 
481     fprintf(
482         fd,
483         "{\n  " DEBUG_STR_PRELUDE_TOTAL_LEN "%d,\n  " DEBUG_STR_PRELUDE_HDRS_LEN "%d,\n  " DEBUG_STR_PRELUDE_CRC
484         "%d,\n",
485         aws_event_stream_message_total_length(message),
486         aws_event_stream_message_headers_len(message),
487         aws_event_stream_message_prelude_crc(message));
488 
489     int count = 0;
490 
491     uint16_t headers_count = (uint16_t)aws_array_list_length(&headers);
492 
493     fprintf(fd, "  \"headers\": [");
494 
495     for (uint16_t i = 0; i < headers_count; ++i) {
496         struct aws_event_stream_header_value_pair *header = NULL;
497 
498         aws_array_list_get_at_ptr(&headers, (void **)&header, i);
499 
500         fprintf(fd, "    {\n");
501 
502         fprintf(fd, "      " DEBUG_STR_HEADER_NAME "\"");
503         fwrite(header->header_name, sizeof(char), (size_t)header->header_name_len, fd);
504         fprintf(fd, "\",\n");
505 
506         fprintf(fd, "      " DEBUG_STR_HEADER_TYPE "%d,\n", header->header_value_type);
507 
508         if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_FALSE) {
509             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "false\n");
510         } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE) {
511             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "true\n");
512         } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE) {
513             int8_t int_value = (int8_t)header->header_value.static_val[0];
514             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
515         } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT16) {
516             int16_t int_value = aws_read_u16(header->header_value.static_val);
517             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
518         } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT32) {
519             int32_t int_value = (int32_t)aws_read_u32(header->header_value.static_val);
520             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
521         } else if (
522             header->header_value_type == AWS_EVENT_STREAM_HEADER_INT64 ||
523             header->header_value_type == AWS_EVENT_STREAM_HEADER_TIMESTAMP) {
524             int64_t int_value = (int64_t)aws_read_u64(header->header_value.static_val);
525             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "%lld\n", (long long)int_value);
526         } else {
527             size_t buffer_len = 0;
528             aws_base64_compute_encoded_len(header->header_value_len, &buffer_len);
529             char *encoded_buffer = (char *)aws_mem_acquire(message->alloc, buffer_len);
530             if (!encoded_buffer) {
531                 return aws_raise_error(AWS_ERROR_OOM);
532             }
533 
534             struct aws_byte_buf encode_output = aws_byte_buf_from_array((uint8_t *)encoded_buffer, buffer_len);
535 
536             if (header->header_value_type == AWS_EVENT_STREAM_HEADER_UUID) {
537                 struct aws_byte_cursor to_encode =
538                     aws_byte_cursor_from_array(header->header_value.static_val, header->header_value_len);
539 
540                 aws_base64_encode(&to_encode, &encode_output);
541             } else {
542                 struct aws_byte_cursor to_encode =
543                     aws_byte_cursor_from_array(header->header_value.variable_len_val, header->header_value_len);
544                 aws_base64_encode(&to_encode, &encode_output);
545             }
546             fprintf(fd, "      " DEBUG_STR_HEADER_VALUE "\"%s\"\n", encoded_buffer);
547             aws_mem_release(message->alloc, encoded_buffer);
548         }
549 
550         fprintf(fd, "    }");
551 
552         if (count < headers_count - 1) {
553             fprintf(fd, ",");
554         }
555         fprintf(fd, "\n");
556 
557         count++;
558     }
559     aws_event_stream_headers_list_cleanup(&headers);
560     fprintf(fd, "  ],\n");
561 
562     size_t payload_len = aws_event_stream_message_payload_len(message);
563     const uint8_t *payload = aws_event_stream_message_payload(message);
564     size_t encoded_len = 0;
565     aws_base64_compute_encoded_len(payload_len, &encoded_len);
566     char *encoded_payload = (char *)aws_mem_acquire(message->alloc, encoded_len);
567 
568     if (!encoded_payload) {
569         return aws_raise_error(AWS_ERROR_OOM);
570     }
571 
572     struct aws_byte_cursor payload_buffer = aws_byte_cursor_from_array(payload, payload_len);
573     struct aws_byte_buf encoded_payload_buffer = aws_byte_buf_from_array((uint8_t *)encoded_payload, encoded_len);
574 
575     aws_base64_encode(&payload_buffer, &encoded_payload_buffer);
576     fprintf(fd, "  \"payload\": \"%s\",\n", encoded_payload);
577     fprintf(fd, "  " DEBUG_STR_MESSAGE_CRC "%d\n}\n", aws_event_stream_message_message_crc(message));
578 
579     return AWS_OP_SUCCESS;
580 }
581 
aws_event_stream_headers_list_init(struct aws_array_list * headers,struct aws_allocator * allocator)582 int aws_event_stream_headers_list_init(struct aws_array_list *headers, struct aws_allocator *allocator) {
583     AWS_ASSERT(headers);
584     AWS_ASSERT(allocator);
585 
586     return aws_array_list_init_dynamic(headers, allocator, 4, sizeof(struct aws_event_stream_header_value_pair));
587 }
588 
aws_event_stream_headers_list_cleanup(struct aws_array_list * headers)589 void aws_event_stream_headers_list_cleanup(struct aws_array_list *headers) {
590     if (AWS_UNLIKELY(!headers || !aws_array_list_is_valid(headers))) {
591         return;
592     }
593 
594     for (size_t i = 0; i < aws_array_list_length(headers); ++i) {
595         struct aws_event_stream_header_value_pair *header = NULL;
596         aws_array_list_get_at_ptr(headers, (void **)&header, i);
597 
598         if (header->value_owned) {
599             aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val);
600         }
601     }
602 
603     aws_array_list_clean_up(headers);
604 }
605 
s_add_variable_len_header(struct aws_array_list * headers,struct aws_event_stream_header_value_pair * header,const char * name,uint8_t name_len,uint8_t * value,uint16_t value_len,int8_t copy)606 static int s_add_variable_len_header(
607     struct aws_array_list *headers,
608     struct aws_event_stream_header_value_pair *header,
609     const char *name,
610     uint8_t name_len,
611     uint8_t *value,
612     uint16_t value_len,
613     int8_t copy) {
614 
615     memcpy((void *)header->header_name, (void *)name, (size_t)name_len);
616 
617     if (copy) {
618         header->header_value.variable_len_val = aws_mem_acquire(headers->alloc, value_len);
619         if (!header->header_value.variable_len_val) {
620             return aws_raise_error(AWS_ERROR_OOM);
621         }
622 
623         header->value_owned = 1;
624         memcpy((void *)header->header_value.variable_len_val, (void *)value, value_len);
625     } else {
626         header->value_owned = 0;
627         header->header_value.variable_len_val = value;
628     }
629 
630     if (aws_array_list_push_back(headers, (void *)header)) {
631         if (header->value_owned) {
632             aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val);
633         }
634         return AWS_OP_ERR;
635     }
636 
637     return AWS_OP_SUCCESS;
638 }
639 
aws_event_stream_add_string_header(struct aws_array_list * headers,const char * name,uint8_t name_len,const char * value,uint16_t value_len,int8_t copy)640 int aws_event_stream_add_string_header(
641     struct aws_array_list *headers,
642     const char *name,
643     uint8_t name_len,
644     const char *value,
645     uint16_t value_len,
646     int8_t copy) {
647     struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
648                                                         .header_value_len = value_len,
649                                                         .value_owned = copy,
650                                                         .header_value_type = AWS_EVENT_STREAM_HEADER_STRING};
651 
652     return s_add_variable_len_header(headers, &header, name, name_len, (uint8_t *)value, value_len, copy);
653 }
654 
aws_event_stream_create_string_header(struct aws_byte_cursor name,struct aws_byte_cursor value)655 struct aws_event_stream_header_value_pair aws_event_stream_create_string_header(
656     struct aws_byte_cursor name,
657     struct aws_byte_cursor value) {
658     AWS_PRECONDITION(name.len < INT8_MAX);
659     AWS_PRECONDITION(value.len < INT16_MAX);
660 
661     struct aws_event_stream_header_value_pair header = {
662         .header_value_type = AWS_EVENT_STREAM_HEADER_STRING,
663         .header_value.variable_len_val = value.ptr,
664         .header_value_len = (uint16_t)value.len,
665         .header_name_len = (uint8_t)name.len,
666         .value_owned = 0,
667     };
668 
669     memcpy(header.header_name, name.ptr, name.len);
670 
671     return header;
672 }
673 
aws_event_stream_create_int32_header(struct aws_byte_cursor name,int32_t value)674 struct aws_event_stream_header_value_pair aws_event_stream_create_int32_header(
675     struct aws_byte_cursor name,
676     int32_t value) {
677     AWS_PRECONDITION(name.len < INT8_MAX);
678 
679     struct aws_event_stream_header_value_pair header = {
680         .header_value_type = AWS_EVENT_STREAM_HEADER_INT32,
681         .header_value_len = (uint16_t)sizeof(int32_t),
682         .header_name_len = (uint8_t)name.len,
683         .value_owned = 0,
684     };
685 
686     memcpy(header.header_name, name.ptr, name.len);
687     aws_write_u32((uint32_t)value, header.header_value.static_val);
688 
689     return header;
690 }
691 
aws_event_stream_add_byte_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int8_t value)692 int aws_event_stream_add_byte_header(struct aws_array_list *headers, const char *name, uint8_t name_len, int8_t value) {
693     struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
694                                                         .header_value_len = 1,
695                                                         .value_owned = 0,
696                                                         .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE,
697                                                         .header_value.static_val[0] = (uint8_t)value};
698 
699     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
700 
701     return aws_array_list_push_back(headers, (void *)&header);
702 }
703 
aws_event_stream_add_bool_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int8_t value)704 int aws_event_stream_add_bool_header(struct aws_array_list *headers, const char *name, uint8_t name_len, int8_t value) {
705     struct aws_event_stream_header_value_pair header = {
706         .header_name_len = name_len,
707         .header_value_len = 0,
708         .value_owned = 0,
709         .header_value_type = value ? AWS_EVENT_STREAM_HEADER_BOOL_TRUE : AWS_EVENT_STREAM_HEADER_BOOL_FALSE,
710     };
711 
712     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
713 
714     return aws_array_list_push_back(headers, (void *)&header);
715 }
716 
aws_event_stream_add_int16_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int16_t value)717 int aws_event_stream_add_int16_header(
718     struct aws_array_list *headers,
719     const char *name,
720     uint8_t name_len,
721     int16_t value) {
722     struct aws_event_stream_header_value_pair header = {
723         .header_name_len = name_len,
724         .header_value_len = sizeof(value),
725         .value_owned = 0,
726         .header_value_type = AWS_EVENT_STREAM_HEADER_INT16,
727     };
728 
729     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
730     aws_write_u16((uint16_t)value, header.header_value.static_val);
731 
732     return aws_array_list_push_back(headers, (void *)&header);
733 }
734 
aws_event_stream_add_int32_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int32_t value)735 int aws_event_stream_add_int32_header(
736     struct aws_array_list *headers,
737     const char *name,
738     uint8_t name_len,
739     int32_t value) {
740     struct aws_event_stream_header_value_pair header = {
741         .header_name_len = name_len,
742         .header_value_len = sizeof(value),
743         .value_owned = 0,
744         .header_value_type = AWS_EVENT_STREAM_HEADER_INT32,
745     };
746 
747     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
748     aws_write_u32((uint32_t)value, header.header_value.static_val);
749 
750     return aws_array_list_push_back(headers, (void *)&header);
751 }
752 
aws_event_stream_add_int64_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int64_t value)753 int aws_event_stream_add_int64_header(
754     struct aws_array_list *headers,
755     const char *name,
756     uint8_t name_len,
757     int64_t value) {
758     struct aws_event_stream_header_value_pair header = {
759         .header_name_len = name_len,
760         .header_value_len = sizeof(value),
761         .value_owned = 0,
762         .header_value_type = AWS_EVENT_STREAM_HEADER_INT64,
763     };
764 
765     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
766     aws_write_u64((uint64_t)value, header.header_value.static_val);
767 
768     return aws_array_list_push_back(headers, (void *)&header);
769 }
770 
aws_event_stream_add_bytebuf_header(struct aws_array_list * headers,const char * name,uint8_t name_len,uint8_t * value,uint16_t value_len,int8_t copy)771 int aws_event_stream_add_bytebuf_header(
772     struct aws_array_list *headers,
773     const char *name,
774     uint8_t name_len,
775     uint8_t *value,
776     uint16_t value_len,
777     int8_t copy) {
778     struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
779                                                         .header_value_len = value_len,
780                                                         .value_owned = copy,
781                                                         .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE_BUF};
782 
783     return s_add_variable_len_header(headers, &header, name, name_len, value, value_len, copy);
784 }
785 
aws_event_stream_add_timestamp_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int64_t value)786 int aws_event_stream_add_timestamp_header(
787     struct aws_array_list *headers,
788     const char *name,
789     uint8_t name_len,
790     int64_t value) {
791     struct aws_event_stream_header_value_pair header = {
792         .header_name_len = name_len,
793         .header_value_len = sizeof(uint64_t),
794         .value_owned = 0,
795         .header_value_type = AWS_EVENT_STREAM_HEADER_TIMESTAMP,
796     };
797 
798     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
799     aws_write_u64((uint64_t)value, header.header_value.static_val);
800 
801     return aws_array_list_push_back(headers, (void *)&header);
802 }
803 
aws_event_stream_add_uuid_header(struct aws_array_list * headers,const char * name,uint8_t name_len,const uint8_t * value)804 int aws_event_stream_add_uuid_header(
805     struct aws_array_list *headers,
806     const char *name,
807     uint8_t name_len,
808     const uint8_t *value) {
809     struct aws_event_stream_header_value_pair header = {
810         .header_name_len = name_len,
811         .header_value_len = 16,
812         .value_owned = 0,
813         .header_value_type = AWS_EVENT_STREAM_HEADER_UUID,
814     };
815 
816     memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
817     memcpy((void *)header.header_value.static_val, value, 16);
818 
819     return aws_array_list_push_back(headers, (void *)&header);
820 }
821 
aws_event_stream_header_name(struct aws_event_stream_header_value_pair * header)822 struct aws_byte_buf aws_event_stream_header_name(struct aws_event_stream_header_value_pair *header) {
823     return aws_byte_buf_from_array((uint8_t *)header->header_name, header->header_name_len);
824 }
825 
aws_event_stream_header_value_as_byte(struct aws_event_stream_header_value_pair * header)826 int8_t aws_event_stream_header_value_as_byte(struct aws_event_stream_header_value_pair *header) {
827     return (int8_t)header->header_value.static_val[0];
828 }
829 
aws_event_stream_header_value_as_string(struct aws_event_stream_header_value_pair * header)830 struct aws_byte_buf aws_event_stream_header_value_as_string(struct aws_event_stream_header_value_pair *header) {
831     return aws_event_stream_header_value_as_bytebuf(header);
832 }
833 
aws_event_stream_header_value_as_bool(struct aws_event_stream_header_value_pair * header)834 int8_t aws_event_stream_header_value_as_bool(struct aws_event_stream_header_value_pair *header) {
835     return header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE ? (int8_t)1 : (int8_t)0;
836 }
837 
aws_event_stream_header_value_as_int16(struct aws_event_stream_header_value_pair * header)838 int16_t aws_event_stream_header_value_as_int16(struct aws_event_stream_header_value_pair *header) {
839     return (int16_t)aws_read_u16(header->header_value.static_val);
840 }
841 
aws_event_stream_header_value_as_int32(struct aws_event_stream_header_value_pair * header)842 int32_t aws_event_stream_header_value_as_int32(struct aws_event_stream_header_value_pair *header) {
843     return (int32_t)aws_read_u32(header->header_value.static_val);
844 }
845 
aws_event_stream_header_value_as_int64(struct aws_event_stream_header_value_pair * header)846 int64_t aws_event_stream_header_value_as_int64(struct aws_event_stream_header_value_pair *header) {
847     return (int64_t)aws_read_u64(header->header_value.static_val);
848 }
849 
aws_event_stream_header_value_as_bytebuf(struct aws_event_stream_header_value_pair * header)850 struct aws_byte_buf aws_event_stream_header_value_as_bytebuf(struct aws_event_stream_header_value_pair *header) {
851     return aws_byte_buf_from_array(header->header_value.variable_len_val, header->header_value_len);
852 }
853 
aws_event_stream_header_value_as_timestamp(struct aws_event_stream_header_value_pair * header)854 int64_t aws_event_stream_header_value_as_timestamp(struct aws_event_stream_header_value_pair *header) {
855     return aws_event_stream_header_value_as_int64(header);
856 }
857 
aws_event_stream_header_value_as_uuid(struct aws_event_stream_header_value_pair * header)858 struct aws_byte_buf aws_event_stream_header_value_as_uuid(struct aws_event_stream_header_value_pair *header) {
859     return aws_byte_buf_from_array(header->header_value.static_val, 16);
860 }
861 
aws_event_stream_header_value_length(struct aws_event_stream_header_value_pair * header)862 uint16_t aws_event_stream_header_value_length(struct aws_event_stream_header_value_pair *header) {
863     return header->header_value_len;
864 }
865 
/* All-zero prelude; s_reset_state() copies it into the decoder to clear the prelude of the previous message. */
static struct aws_event_stream_message_prelude s_empty_prelude = {.total_len = 0, .headers_len = 0, .prelude_crc = 0};
867 
s_reset_header_state(struct aws_event_stream_streaming_decoder * decoder,uint8_t free_header_data)868 static void s_reset_header_state(struct aws_event_stream_streaming_decoder *decoder, uint8_t free_header_data) {
869 
870     if (free_header_data && decoder->current_header.value_owned) {
871         aws_mem_release(decoder->alloc, (void *)decoder->current_header.header_value.variable_len_val);
872     }
873 
874     memset((void *)&decoder->current_header, 0, sizeof(struct aws_event_stream_header_value_pair));
875 }
876 
/* Forward declaration: s_reset_state() is defined after the state functions, but the trailer state calls it. */
static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder);

/* Forward declaration: the header-value state hands control back to s_headers_state once a header is complete. */
static int s_headers_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);
884 
s_read_header_value(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)885 static int s_read_header_value(
886     struct aws_event_stream_streaming_decoder *decoder,
887     const uint8_t *data,
888     size_t len,
889     size_t *processed) {
890 
891     size_t current_pos = decoder->message_pos;
892 
893     size_t length_read = current_pos - decoder->current_header_value_offset;
894     struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;
895 
896     if (!length_read) {
897         /* save an allocation, this can only happen if the data we were handed is larger than the length of the header
898          * value. we don't really need to handle offsets in this case. This expects the user is living by the contract
899          * that they cannot act like they own this memory beyond the lifetime of their callback, and they should not
900          * mutate it */
901         if (len >= current_header->header_value_len) {
902             /* this part works regardless of type since the layout of the union will line up. */
903             current_header->header_value.variable_len_val = (uint8_t *)data;
904             current_header->value_owned = 0;
905             decoder->on_header(decoder, &decoder->prelude, &decoder->current_header, decoder->user_context);
906             *processed += current_header->header_value_len;
907             decoder->message_pos += current_header->header_value_len;
908             decoder->running_crc =
909                 aws_checksums_crc32(data, (int)current_header->header_value_len, decoder->running_crc);
910 
911             s_reset_header_state(decoder, 1);
912             decoder->state = s_headers_state;
913             return AWS_OP_SUCCESS;
914         }
915 
916         /* a possible optimization later would be to only allocate this once, and then keep reusing the same buffer. for
917          * subsequent messages.*/
918         if (current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
919             current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING) {
920             current_header->header_value.variable_len_val =
921                 aws_mem_acquire(decoder->alloc, decoder->current_header.header_value_len);
922 
923             if (!current_header->header_value.variable_len_val) {
924                 return aws_raise_error(AWS_ERROR_OOM);
925             }
926 
927             current_header->value_owned = 1;
928         }
929     }
930 
931     size_t max_read =
932         len >= current_header->header_value_len - length_read ? current_header->header_value_len - length_read : len;
933 
934     const uint8_t *header_value_alias = current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
935                                                 current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING
936                                             ? current_header->header_value.variable_len_val
937                                             : current_header->header_value.static_val;
938 
939     memcpy((void *)(header_value_alias + length_read), data, max_read);
940     decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);
941 
942     *processed += max_read;
943     decoder->message_pos += max_read;
944     length_read += max_read;
945 
946     if (length_read == current_header->header_value_len) {
947         decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
948         s_reset_header_state(decoder, 1);
949         decoder->state = s_headers_state;
950     }
951 
952     return AWS_OP_SUCCESS;
953 }
954 
s_read_header_value_len(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)955 static int s_read_header_value_len(
956     struct aws_event_stream_streaming_decoder *decoder,
957     const uint8_t *data,
958     size_t len,
959     size_t *processed) {
960     size_t current_pos = decoder->message_pos;
961 
962     size_t length_portion_read = current_pos - decoder->current_header_value_offset;
963 
964     if (length_portion_read < sizeof(uint16_t)) {
965         size_t max_to_read =
966             len > sizeof(uint16_t) - length_portion_read ? sizeof(uint16_t) - length_portion_read : len;
967         memcpy(decoder->working_buffer + length_portion_read, data, max_to_read);
968         decoder->running_crc = aws_checksums_crc32(data, (int)max_to_read, decoder->running_crc);
969 
970         *processed += max_to_read;
971         decoder->message_pos += max_to_read;
972 
973         length_portion_read = decoder->message_pos - decoder->current_header_value_offset;
974     }
975 
976     if (length_portion_read == sizeof(uint16_t)) {
977         decoder->current_header.header_value_len = aws_read_u16(decoder->working_buffer);
978         decoder->current_header_value_offset = decoder->message_pos;
979         decoder->state = s_read_header_value;
980     }
981 
982     return AWS_OP_SUCCESS;
983 }
984 
s_read_header_type(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)985 static int s_read_header_type(
986     struct aws_event_stream_streaming_decoder *decoder,
987     const uint8_t *data,
988     size_t len,
989     size_t *processed) {
990     (void)len;
991     uint8_t type = *data;
992     decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc);
993     *processed += 1;
994     decoder->message_pos++;
995     decoder->current_header_value_offset++;
996     struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;
997 
998     if (type >= AWS_EVENT_STREAM_HEADER_BOOL_FALSE && type <= AWS_EVENT_STREAM_HEADER_UUID) {
999         current_header->header_value_type = (enum aws_event_stream_header_value_type)type;
1000 
1001         switch (type) {
1002             case AWS_EVENT_STREAM_HEADER_STRING:
1003             case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
1004                 decoder->state = s_read_header_value_len;
1005                 break;
1006             case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
1007                 current_header->header_value_len = 0;
1008                 current_header->header_value.static_val[0] = 0;
1009                 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
1010                 s_reset_header_state(decoder, 1);
1011                 break;
1012             case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
1013                 current_header->header_value_len = 0;
1014                 current_header->header_value.static_val[0] = 1;
1015                 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
1016                 s_reset_header_state(decoder, 1);
1017                 break;
1018             case AWS_EVENT_STREAM_HEADER_BYTE:
1019                 current_header->header_value_len = 1;
1020                 decoder->state = s_read_header_value;
1021                 break;
1022             case AWS_EVENT_STREAM_HEADER_INT16:
1023                 current_header->header_value_len = sizeof(uint16_t);
1024                 decoder->state = s_read_header_value;
1025                 break;
1026             case AWS_EVENT_STREAM_HEADER_INT32:
1027                 current_header->header_value_len = sizeof(uint32_t);
1028                 decoder->state = s_read_header_value;
1029                 break;
1030             case AWS_EVENT_STREAM_HEADER_INT64:
1031             case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
1032                 current_header->header_value_len = sizeof(uint64_t);
1033                 decoder->state = s_read_header_value;
1034                 break;
1035             case AWS_EVENT_STREAM_HEADER_UUID:
1036                 current_header->header_value_len = 16;
1037                 decoder->state = s_read_header_value;
1038                 break;
1039             default:
1040                 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE);
1041         }
1042 
1043         return AWS_OP_SUCCESS;
1044     }
1045 
1046     return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE);
1047 }
1048 
s_read_header_name(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1049 static int s_read_header_name(
1050     struct aws_event_stream_streaming_decoder *decoder,
1051     const uint8_t *data,
1052     size_t len,
1053     size_t *processed) {
1054     size_t current_pos = decoder->message_pos;
1055 
1056     size_t length_read = current_pos - decoder->current_header_name_offset;
1057 
1058     size_t max_read = len >= decoder->current_header.header_name_len - length_read
1059                           ? decoder->current_header.header_name_len - length_read
1060                           : len;
1061     memcpy((void *)(decoder->current_header.header_name + length_read), data, max_read);
1062     decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);
1063 
1064     *processed += max_read;
1065     decoder->message_pos += max_read;
1066     length_read += max_read;
1067 
1068     if (length_read == decoder->current_header.header_name_len) {
1069         decoder->state = s_read_header_type;
1070         decoder->current_header_value_offset = decoder->message_pos;
1071     }
1072 
1073     return AWS_OP_SUCCESS;
1074 }
1075 
s_read_header_name_len(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1076 static int s_read_header_name_len(
1077     struct aws_event_stream_streaming_decoder *decoder,
1078     const uint8_t *data,
1079     size_t len,
1080     size_t *processed) {
1081     (void)len;
1082     decoder->current_header.header_name_len = *data;
1083     decoder->message_pos++;
1084     decoder->current_header_name_offset++;
1085     *processed += 1;
1086     decoder->state = s_read_header_name;
1087     decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc);
1088 
1089     return AWS_OP_SUCCESS;
1090 }
1091 
s_start_header(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1092 static int s_start_header(
1093     struct aws_event_stream_streaming_decoder *decoder,
1094     const uint8_t *data,
1095     size_t len,
1096     size_t *processed) /* NOLINT */ {
1097     (void)data;
1098     (void)len;
1099     (void)processed;
1100     decoder->state = s_read_header_name_len;
1101     decoder->current_header_name_offset = decoder->message_pos;
1102 
1103     return AWS_OP_SUCCESS;
1104 }
1105 
/* Forward declaration: s_headers_state switches to s_payload_state once all header bytes have been consumed. */
static int s_payload_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);
1111 
1112 /*Handles the initial state for header parsing.
1113   will oscillate between multiple other states as well.
1114   after all headers have been handled, payload will be set as the next state. */
s_headers_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1115 static int s_headers_state(
1116     struct aws_event_stream_streaming_decoder *decoder,
1117     const uint8_t *data,
1118     size_t len,
1119     size_t *processed) /* NOLINT */ {
1120     (void)data;
1121     (void)len;
1122     (void)processed;
1123 
1124     size_t current_pos = decoder->message_pos;
1125 
1126     size_t headers_boundary = decoder->prelude.headers_len + AWS_EVENT_STREAM_PRELUDE_LENGTH;
1127 
1128     if (current_pos < headers_boundary) {
1129         decoder->state = s_start_header;
1130         return AWS_OP_SUCCESS;
1131     }
1132 
1133     if (current_pos == headers_boundary) {
1134         decoder->state = s_payload_state;
1135         return AWS_OP_SUCCESS;
1136     }
1137 
1138     return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE);
1139 }
1140 
1141 /* handles reading the trailer. Once it has been read, it will be compared to the running checksum. If successful,
1142  * the state will be reset. */
s_read_trailer_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1143 static int s_read_trailer_state(
1144     struct aws_event_stream_streaming_decoder *decoder,
1145     const uint8_t *data,
1146     size_t len,
1147     size_t *processed) {
1148 
1149     size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos;
1150     size_t segment_length = len > remaining_amount ? remaining_amount : len;
1151     size_t offset = sizeof(uint32_t) - remaining_amount;
1152     memcpy(decoder->working_buffer + offset, data, segment_length);
1153     decoder->message_pos += segment_length;
1154     *processed += segment_length;
1155 
1156     if (decoder->message_pos == decoder->prelude.total_len) {
1157         uint32_t message_crc = aws_read_u32(decoder->working_buffer);
1158 
1159         if (message_crc == decoder->running_crc) {
1160             s_reset_state(decoder);
1161         } else {
1162             char error_message[70];
1163             snprintf(
1164                 error_message,
1165                 sizeof(error_message),
1166                 "CRC Mismatch. message_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32,
1167                 message_crc,
1168                 decoder->running_crc);
1169             aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE);
1170             decoder->on_error(
1171                 decoder,
1172                 &decoder->prelude,
1173                 AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE,
1174                 error_message,
1175                 decoder->user_context);
1176             return AWS_OP_ERR;
1177         }
1178     }
1179 
1180     return AWS_OP_SUCCESS;
1181 }
1182 
1183 /* handles the reading of the payload up to the final checksum. Sets read_trailer_state as the new state once
1184  * the payload has been processed. */
s_payload_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1185 static int s_payload_state(
1186     struct aws_event_stream_streaming_decoder *decoder,
1187     const uint8_t *data,
1188     size_t len,
1189     size_t *processed) {
1190 
1191     if (decoder->message_pos < decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) {
1192         size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos - AWS_EVENT_STREAM_TRAILER_LENGTH;
1193         size_t segment_length = len > remaining_amount ? remaining_amount : len;
1194         int8_t final_segment =
1195             (segment_length + decoder->message_pos) == (decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH);
1196         struct aws_byte_buf payload_buf = aws_byte_buf_from_array(data, segment_length);
1197         decoder->on_payload(decoder, &payload_buf, final_segment, decoder->user_context);
1198         decoder->message_pos += segment_length;
1199         decoder->running_crc = aws_checksums_crc32(data, (int)segment_length, decoder->running_crc);
1200         *processed += segment_length;
1201     }
1202 
1203     if (decoder->message_pos == decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) {
1204         decoder->state = s_read_trailer_state;
1205     }
1206 
1207     return AWS_OP_SUCCESS;
1208 }
1209 
1210 /* Parses the payload and verifies checksums. Sets the next state if successful. */
s_verify_prelude_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1211 static int s_verify_prelude_state(
1212     struct aws_event_stream_streaming_decoder *decoder,
1213     const uint8_t *data,
1214     size_t len,
1215     size_t *processed) /* NOLINT */ {
1216     (void)data;
1217     (void)len;
1218     (void)processed;
1219 
1220     decoder->prelude.headers_len = aws_read_u32(decoder->working_buffer + HEADER_LEN_OFFSET);
1221     decoder->prelude.prelude_crc = aws_read_u32(decoder->working_buffer + PRELUDE_CRC_OFFSET);
1222     decoder->prelude.total_len = aws_read_u32(decoder->working_buffer + TOTAL_LEN_OFFSET);
1223 
1224     decoder->running_crc = aws_checksums_crc32(decoder->working_buffer, PRELUDE_CRC_OFFSET, 0);
1225 
1226     if (AWS_LIKELY(decoder->running_crc == decoder->prelude.prelude_crc)) {
1227 
1228         if (AWS_UNLIKELY(
1229                 decoder->prelude.headers_len > AWS_EVENT_STREAM_MAX_HEADERS_SIZE ||
1230                 decoder->prelude.total_len > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE)) {
1231             aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
1232             char error_message[] = "Maximum message field size exceeded";
1233 
1234             decoder->on_error(
1235                 decoder,
1236                 &decoder->prelude,
1237                 AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
1238                 error_message,
1239                 decoder->user_context);
1240             return AWS_OP_ERR;
1241         }
1242 
1243         /* Should only call on_prelude() after passing crc check and limitation check, otherwise call on_prelude() with
1244          * incorrect prelude is error prune. */
1245         decoder->on_prelude(decoder, &decoder->prelude, decoder->user_context);
1246 
1247         decoder->running_crc = aws_checksums_crc32(
1248             decoder->working_buffer + PRELUDE_CRC_OFFSET,
1249             (int)sizeof(decoder->prelude.prelude_crc),
1250             decoder->running_crc);
1251         memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer));
1252         decoder->state = decoder->prelude.headers_len > 0 ? s_headers_state : s_payload_state;
1253     } else {
1254         char error_message[70];
1255         snprintf(
1256             error_message,
1257             sizeof(error_message),
1258             "CRC Mismatch. prelude_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32,
1259             decoder->prelude.prelude_crc,
1260             decoder->running_crc);
1261 
1262         aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
1263         decoder->on_error(
1264             decoder,
1265             &decoder->prelude,
1266             AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE,
1267             error_message,
1268             decoder->user_context);
1269         return AWS_OP_ERR;
1270     }
1271 
1272     return AWS_OP_SUCCESS;
1273 }
1274 
1275 /* initial state handles up to the reading of the prelude */
s_start_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1276 static int s_start_state(
1277     struct aws_event_stream_streaming_decoder *decoder,
1278     const uint8_t *data,
1279     size_t len,
1280     size_t *processed) {
1281 
1282     size_t previous_position = decoder->message_pos;
1283     if (decoder->message_pos < AWS_EVENT_STREAM_PRELUDE_LENGTH) {
1284         if (len >= AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos) {
1285             memcpy(
1286                 decoder->working_buffer + decoder->message_pos,
1287                 data,
1288                 AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos);
1289             decoder->message_pos += AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos;
1290         } else {
1291             memcpy(decoder->working_buffer + decoder->message_pos, data, len);
1292             decoder->message_pos += len;
1293         }
1294 
1295         *processed += decoder->message_pos - previous_position;
1296     }
1297 
1298     if (decoder->message_pos == AWS_EVENT_STREAM_PRELUDE_LENGTH) {
1299         decoder->state = s_verify_prelude_state;
1300     }
1301 
1302     return AWS_OP_SUCCESS;
1303 }
1304 
s_reset_state(struct aws_event_stream_streaming_decoder * decoder)1305 static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder) {
1306     decoder->message_pos = 0;
1307     decoder->prelude = s_empty_prelude;
1308     decoder->running_crc = 0;
1309     memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer));
1310     decoder->state = s_start_state;
1311 }
1312 
/*
 * Initializes a streaming decoder.
 *
 * The decoder is put into its start state, its in-progress header bookkeeping is zeroed, and the
 * allocator, callbacks and user data are stored for later use:
 *   - alloc:              used for header values that have to be buffered across chunks
 *   - on_payload_segment: invoked for each payload segment
 *   - on_prelude:         invoked once a prelude has been verified
 *   - on_header:          invoked for each completed header
 *   - on_error:           invoked on checksum / size failures
 *   - user_data:          handed back to every callback as the user context
 */
void aws_event_stream_streaming_decoder_init(
    struct aws_event_stream_streaming_decoder *decoder,
    struct aws_allocator *alloc,
    aws_event_stream_process_on_payload_segment_fn *on_payload_segment,
    aws_event_stream_prelude_received_fn *on_prelude,
    aws_event_stream_header_received_fn *on_header,
    aws_event_stream_on_error_fn *on_error,
    void *user_data) {

    s_reset_state(decoder);

    /* s_reset_state() does not touch the header bookkeeping. It must start out zeroed because
     * s_reset_header_state() consults current_header.value_owned to decide whether to free the value,
     * and the caller's decoder storage is not guaranteed to be zero-initialized. */
    memset(&decoder->current_header, 0, sizeof(decoder->current_header));
    decoder->current_header_name_offset = 0;
    decoder->current_header_value_offset = 0;

    decoder->alloc = alloc;
    decoder->on_error = on_error;
    decoder->on_header = on_header;
    decoder->on_payload = on_payload_segment;
    decoder->on_prelude = on_prelude;
    decoder->user_context = user_data;
}
1330 
aws_event_stream_streaming_decoder_clean_up(struct aws_event_stream_streaming_decoder * decoder)1331 void aws_event_stream_streaming_decoder_clean_up(struct aws_event_stream_streaming_decoder *decoder) {
1332     s_reset_state(decoder);
1333     decoder->on_error = 0;
1334     decoder->on_header = 0;
1335     decoder->on_payload = 0;
1336     decoder->on_prelude = 0;
1337     decoder->user_context = 0;
1338 }
1339 
1340 /* Simply sends the data to the state machine until all has been processed or an error is returned. */
aws_event_stream_streaming_decoder_pump(struct aws_event_stream_streaming_decoder * decoder,const struct aws_byte_buf * data)1341 int aws_event_stream_streaming_decoder_pump(
1342     struct aws_event_stream_streaming_decoder *decoder,
1343     const struct aws_byte_buf *data) {
1344 
1345     size_t processed = 0;
1346     int err_val = 0;
1347     while (!err_val && data->buffer && data->len && processed < data->len) {
1348         err_val = decoder->state(decoder, data->buffer + processed, data->len - processed, &processed);
1349     }
1350 
1351     return err_val;
1352 }
/* Restore the MSVC warning settings that were pushed (and selectively disabled) at the top of this file. */
#if _MSC_VER
#    pragma warning(pop)
#endif
1356