1 /**
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0.
4 */
5
6 #include <aws/event-stream/event_stream.h>
7
8 #include <aws/checksums/crc.h>
9
10 #include <aws/common/encoding.h>
11 #include <aws/io/io.h>
12
13 #include <inttypes.h>
14
15 #define LIB_NAME "libaws-c-event-stream"
16
17 #if _MSC_VER
18 # pragma warning(push)
19 # pragma warning(disable : 4221) /* aggregate initializer using local variable addresses */
20 # pragma warning(disable : 4204) /* non-constant aggregate initializer */
21 # pragma warning(disable : 4306) /* msft doesn't trust us to do pointer arithmetic. */
22 #endif
23
24 static struct aws_error_info s_errors[] = {
25 AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH, "Buffer length mismatch", LIB_NAME),
26 AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_INSUFFICIENT_BUFFER_LEN, "insufficient buffer length", LIB_NAME),
27 AWS_DEFINE_ERROR_INFO(
28 AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
29 "a field for the message was too large",
30 LIB_NAME),
31 AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, "prelude checksum was incorrect", LIB_NAME),
32 AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE, "message checksum was incorrect", LIB_NAME),
33 AWS_DEFINE_ERROR_INFO(
34 AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN,
35 "message headers length was incorrect",
36 LIB_NAME),
37 AWS_DEFINE_ERROR_INFO(
38 AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE,
39 "An unknown header type was encountered",
40 LIB_NAME),
41 AWS_DEFINE_ERROR_INFO(
42 AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE,
43 "message parser encountered an illegal state",
44 LIB_NAME),
45 AWS_DEFINE_ERROR_INFO(
46 AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED,
47 "event stream rpc connection has been closed",
48 LIB_NAME),
49 AWS_DEFINE_ERROR_INFO(
50 AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR,
51 "event stream rpc connection has encountered a protocol error",
52 LIB_NAME),
53 AWS_DEFINE_ERROR_INFO(
54 AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED,
55 "event stream rpc connection stream is closed.",
56 LIB_NAME),
57 AWS_DEFINE_ERROR_INFO(
58 AWS_ERROR_EVENT_STREAM_RPC_STREAM_NOT_ACTIVATED,
59 "event stream rpc stream continuation was not successfully activated before use. Call "
60 "aws_event_stream_rpc_client_continuation_activate()"
61 " before using a stream continuation token.",
62 LIB_NAME),
63 };
64
65 static struct aws_error_info_list s_list = {
66 .error_list = s_errors,
67 .count = sizeof(s_errors) / sizeof(struct aws_error_info),
68 };
69
70 static bool s_event_stream_library_initialized = false;
71
72 static struct aws_log_subject_info s_event_stream_log_subject_infos[] = {
73 DEFINE_LOG_SUBJECT_INFO(
74 AWS_LS_EVENT_STREAM_GENERAL,
75 "event-stream-general",
76 "Subject for aws-c-event-stream logging that defies categorization."),
77 DEFINE_LOG_SUBJECT_INFO(
78 AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
79 "event-stream-channel-handler",
80 "Subject for event-stream channel handler related logging."),
81 DEFINE_LOG_SUBJECT_INFO(
82 AWS_LS_EVENT_STREAM_RPC_SERVER,
83 "event-stream-rpc-server",
84 "Subject for event-stream rpc server."),
85 DEFINE_LOG_SUBJECT_INFO(
86 AWS_LS_EVENT_STREAM_RPC_CLIENT,
87 "event-stream-rpc-client",
88 "Subject for event-stream rpc client."),
89 };
90
91 static struct aws_log_subject_info_list s_event_stream_log_subject_list = {
92 .subject_list = s_event_stream_log_subject_infos,
93 .count = AWS_ARRAY_SIZE(s_event_stream_log_subject_infos),
94 };
95
aws_event_stream_library_init(struct aws_allocator * allocator)96 void aws_event_stream_library_init(struct aws_allocator *allocator) {
97 if (!s_event_stream_library_initialized) {
98 s_event_stream_library_initialized = true;
99 aws_io_library_init(allocator);
100 aws_register_error_info(&s_list);
101 aws_register_log_subject_info_list(&s_event_stream_log_subject_list);
102 }
103 }
104
aws_event_stream_library_clean_up(void)105 void aws_event_stream_library_clean_up(void) {
106 if (s_event_stream_library_initialized) {
107 s_event_stream_library_initialized = false;
108 aws_unregister_error_info(&s_list);
109 aws_io_library_clean_up();
110 }
111 }
112
113 #define TOTAL_LEN_OFFSET 0
114 #define PRELUDE_CRC_OFFSET (sizeof(uint32_t) + sizeof(uint32_t))
115 #define HEADER_LEN_OFFSET sizeof(uint32_t)
116
117 /* Computes the byte length necessary to store the headers represented in the headers list.
118 * returns that length. */
aws_event_stream_compute_headers_required_buffer_len(const struct aws_array_list * headers)119 uint32_t aws_event_stream_compute_headers_required_buffer_len(const struct aws_array_list *headers) {
120 if (!headers || !aws_array_list_length(headers)) {
121 return 0;
122 }
123
124 size_t headers_count = aws_array_list_length(headers);
125 size_t headers_len = 0;
126
127 for (size_t i = 0; i < headers_count; ++i) {
128 struct aws_event_stream_header_value_pair *header = NULL;
129
130 aws_array_list_get_at_ptr(headers, (void **)&header, i);
131
132 headers_len += sizeof(header->header_name_len) + header->header_name_len + 1;
133
134 if (header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING ||
135 header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF) {
136 headers_len += sizeof(header->header_value_len);
137 }
138
139 if (header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_FALSE &&
140 header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_TRUE) {
141 headers_len += header->header_value_len;
142 }
143 }
144
145 return (uint32_t)headers_len;
146 }
147
148 /* adds the headers represented in the headers list to the buffer.
149 returns the new buffer offset for use elsewhere. Assumes buffer length is at least the length of the return value
150 from compute_headers_length() */
aws_event_stream_write_headers_to_buffer(const struct aws_array_list * headers,uint8_t * buffer)151 size_t aws_event_stream_write_headers_to_buffer(const struct aws_array_list *headers, uint8_t *buffer) {
152 if (!headers || !aws_array_list_length(headers)) {
153 return 0;
154 }
155
156 size_t headers_count = aws_array_list_length(headers);
157 uint8_t *buffer_alias = buffer;
158
159 for (size_t i = 0; i < headers_count; ++i) {
160 struct aws_event_stream_header_value_pair *header = NULL;
161
162 aws_array_list_get_at_ptr(headers, (void **)&header, i);
163 *buffer_alias = (uint8_t)header->header_name_len;
164 buffer_alias++;
165 memcpy(buffer_alias, header->header_name, (size_t)header->header_name_len);
166 buffer_alias += header->header_name_len;
167 *buffer_alias = (uint8_t)header->header_value_type;
168 buffer_alias++;
169 switch (header->header_value_type) {
170 case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
171 case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
172 break;
173 case AWS_EVENT_STREAM_HEADER_BYTE:
174 *buffer_alias = header->header_value.static_val[0];
175 buffer_alias++;
176 break;
177 /* additions of integers here assume the endianness conversion has already happened */
178 case AWS_EVENT_STREAM_HEADER_INT16:
179 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint16_t));
180 buffer_alias += sizeof(uint16_t);
181 break;
182 case AWS_EVENT_STREAM_HEADER_INT32:
183 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint32_t));
184 buffer_alias += sizeof(uint32_t);
185 break;
186 case AWS_EVENT_STREAM_HEADER_INT64:
187 case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
188 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint64_t));
189 buffer_alias += sizeof(uint64_t);
190 break;
191 case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
192 case AWS_EVENT_STREAM_HEADER_STRING:
193 aws_write_u16(header->header_value_len, buffer_alias);
194 buffer_alias += sizeof(uint16_t);
195 memcpy(buffer_alias, header->header_value.variable_len_val, header->header_value_len);
196 buffer_alias += header->header_value_len;
197 break;
198 case AWS_EVENT_STREAM_HEADER_UUID:
199 memcpy(buffer_alias, header->header_value.static_val, 16);
200 buffer_alias += header->header_value_len;
201 break;
202 }
203 }
204
205 return buffer_alias - buffer;
206 }
207
aws_event_stream_read_headers_from_buffer(struct aws_array_list * headers,const uint8_t * buffer,size_t headers_len)208 int aws_event_stream_read_headers_from_buffer(
209 struct aws_array_list *headers,
210 const uint8_t *buffer,
211 size_t headers_len) {
212
213 if (AWS_UNLIKELY(headers_len > AWS_EVENT_STREAM_MAX_HEADERS_SIZE)) {
214 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
215 }
216
217 /* iterate the buffer per header. */
218 const uint8_t *buffer_start = buffer;
219 while ((size_t)(buffer - buffer_start) < headers_len) {
220 struct aws_event_stream_header_value_pair header;
221 AWS_ZERO_STRUCT(header);
222
223 /* get the header info from the buffer, make sure to increment buffer offset. */
224 header.header_name_len = *buffer;
225 buffer += sizeof(header.header_name_len);
226 memcpy((void *)header.header_name, buffer, (size_t)header.header_name_len);
227 buffer += header.header_name_len;
228 header.header_value_type = (enum aws_event_stream_header_value_type) * buffer;
229 buffer++;
230
231 switch (header.header_value_type) {
232 case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
233 header.header_value_len = 0;
234 header.header_value.static_val[0] = 0;
235 break;
236 case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
237 header.header_value_len = 0;
238 header.header_value.static_val[0] = 1;
239 break;
240 case AWS_EVENT_STREAM_HEADER_BYTE:
241 header.header_value_len = sizeof(uint8_t);
242 header.header_value.static_val[0] = *buffer;
243 buffer++;
244 break;
245 case AWS_EVENT_STREAM_HEADER_INT16:
246 header.header_value_len = sizeof(uint16_t);
247 memcpy(header.header_value.static_val, buffer, sizeof(uint16_t));
248 buffer += sizeof(uint16_t);
249 break;
250 case AWS_EVENT_STREAM_HEADER_INT32:
251 header.header_value_len = sizeof(uint32_t);
252 memcpy(header.header_value.static_val, buffer, sizeof(uint32_t));
253 buffer += sizeof(uint32_t);
254 break;
255 case AWS_EVENT_STREAM_HEADER_INT64:
256 case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
257 header.header_value_len = sizeof(uint64_t);
258 memcpy(header.header_value.static_val, buffer, sizeof(uint64_t));
259 buffer += sizeof(uint64_t);
260 break;
261 case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
262 case AWS_EVENT_STREAM_HEADER_STRING:
263 header.header_value_len = aws_read_u16(buffer);
264 buffer += sizeof(header.header_value_len);
265 header.header_value.variable_len_val = (uint8_t *)buffer;
266 buffer += header.header_value_len;
267 break;
268 case AWS_EVENT_STREAM_HEADER_UUID:
269 header.header_value_len = 16;
270 memcpy(header.header_value.static_val, buffer, 16);
271 buffer += header.header_value_len;
272 break;
273 }
274
275 if (aws_array_list_push_back(headers, (const void *)&header)) {
276 return AWS_OP_ERR;
277 }
278 }
279
280 return AWS_OP_SUCCESS;
281 }
282
283 /* initialize message with the arguments
284 * the underlying buffer will be allocated and payload will be copied.
285 * see specification, this code should simply add these fields according to that.*/
aws_event_stream_message_init(struct aws_event_stream_message * message,struct aws_allocator * alloc,struct aws_array_list * headers,struct aws_byte_buf * payload)286 int aws_event_stream_message_init(
287 struct aws_event_stream_message *message,
288 struct aws_allocator *alloc,
289 struct aws_array_list *headers,
290 struct aws_byte_buf *payload) {
291
292 size_t payload_len = payload ? payload->len : 0;
293
294 uint32_t headers_length = aws_event_stream_compute_headers_required_buffer_len(headers);
295
296 if (AWS_UNLIKELY(headers_length > AWS_EVENT_STREAM_MAX_HEADERS_SIZE)) {
297 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
298 }
299
300 uint32_t total_length =
301 (uint32_t)(AWS_EVENT_STREAM_PRELUDE_LENGTH + headers_length + payload_len + AWS_EVENT_STREAM_TRAILER_LENGTH);
302
303 if (AWS_UNLIKELY(total_length < headers_length || total_length < payload_len)) {
304 return aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED);
305 }
306
307 if (AWS_UNLIKELY(total_length > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE)) {
308 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
309 }
310
311 message->alloc = alloc;
312 message->message_buffer = aws_mem_acquire(message->alloc, total_length);
313
314 if (message->message_buffer) {
315 message->owns_buffer = 1;
316 aws_write_u32(total_length, message->message_buffer);
317 uint8_t *buffer_offset = message->message_buffer + sizeof(total_length);
318 aws_write_u32(headers_length, buffer_offset);
319 buffer_offset += sizeof(headers_length);
320
321 uint32_t running_crc =
322 aws_checksums_crc32(message->message_buffer, (int)(buffer_offset - message->message_buffer), 0);
323
324 const uint8_t *message_crc_boundary_start = buffer_offset;
325 aws_write_u32(running_crc, buffer_offset);
326 buffer_offset += sizeof(running_crc);
327
328 if (headers_length) {
329 buffer_offset += aws_event_stream_write_headers_to_buffer(headers, buffer_offset);
330 }
331
332 if (payload) {
333 memcpy(buffer_offset, payload->buffer, payload->len);
334 buffer_offset += payload->len;
335 }
336
337 running_crc = aws_checksums_crc32(
338 message_crc_boundary_start, (int)(buffer_offset - message_crc_boundary_start), running_crc);
339 aws_write_u32(running_crc, buffer_offset);
340
341 return AWS_OP_SUCCESS;
342 }
343
344 return aws_raise_error(AWS_ERROR_OOM);
345 }
346
347 /* add buffer to the message (non-owning). Verify buffer crcs and that length fields are reasonable. */
aws_event_stream_message_from_buffer(struct aws_event_stream_message * message,struct aws_allocator * alloc,struct aws_byte_buf * buffer)348 int aws_event_stream_message_from_buffer(
349 struct aws_event_stream_message *message,
350 struct aws_allocator *alloc,
351 struct aws_byte_buf *buffer) {
352 AWS_ASSERT(buffer);
353
354 message->alloc = alloc;
355 message->owns_buffer = 0;
356
357 if (AWS_UNLIKELY(buffer->len < AWS_EVENT_STREAM_PRELUDE_LENGTH + AWS_EVENT_STREAM_TRAILER_LENGTH)) {
358 return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH);
359 }
360
361 uint32_t message_length = aws_read_u32(buffer->buffer + TOTAL_LEN_OFFSET);
362
363 if (AWS_UNLIKELY(message_length != buffer->len)) {
364 return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH);
365 }
366
367 if (AWS_UNLIKELY(message_length > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE)) {
368 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
369 }
370
371 uint32_t running_crc = aws_checksums_crc32(buffer->buffer, (int)PRELUDE_CRC_OFFSET, 0);
372 uint32_t prelude_crc = aws_read_u32(buffer->buffer + PRELUDE_CRC_OFFSET);
373
374 if (running_crc != prelude_crc) {
375 return aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
376 }
377
378 running_crc = aws_checksums_crc32(
379 buffer->buffer + PRELUDE_CRC_OFFSET,
380 (int)(message_length - PRELUDE_CRC_OFFSET - AWS_EVENT_STREAM_TRAILER_LENGTH),
381 running_crc);
382 uint32_t message_crc = aws_read_u32(buffer->buffer + message_length - AWS_EVENT_STREAM_TRAILER_LENGTH);
383
384 if (running_crc != message_crc) {
385 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE);
386 }
387
388 message->message_buffer = buffer->buffer;
389
390 if (aws_event_stream_message_headers_len(message) >
391 message_length - AWS_EVENT_STREAM_PRELUDE_LENGTH - AWS_EVENT_STREAM_TRAILER_LENGTH) {
392 message->message_buffer = 0;
393 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN);
394 }
395
396 return AWS_OP_SUCCESS;
397 }
398
399 /* Verify buffer crcs and that length fields are reasonable. Once that is done, the buffer is copied to the message. */
aws_event_stream_message_from_buffer_copy(struct aws_event_stream_message * message,struct aws_allocator * alloc,const struct aws_byte_buf * buffer)400 int aws_event_stream_message_from_buffer_copy(
401 struct aws_event_stream_message *message,
402 struct aws_allocator *alloc,
403 const struct aws_byte_buf *buffer) {
404 int parse_value = aws_event_stream_message_from_buffer(message, alloc, (struct aws_byte_buf *)buffer);
405
406 if (!parse_value) {
407 message->message_buffer = aws_mem_acquire(alloc, buffer->len);
408
409 if (message->message_buffer) {
410 memcpy(message->message_buffer, buffer->buffer, buffer->len);
411 message->alloc = alloc;
412 message->owns_buffer = 1;
413
414 return AWS_OP_SUCCESS;
415 }
416
417 return aws_raise_error(AWS_ERROR_OOM);
418 }
419
420 return parse_value;
421 }
422
423 /* if buffer is owned, release the memory. */
aws_event_stream_message_clean_up(struct aws_event_stream_message * message)424 void aws_event_stream_message_clean_up(struct aws_event_stream_message *message) {
425 if (message->message_buffer && message->owns_buffer) {
426 aws_mem_release(message->alloc, message->message_buffer);
427 }
428 }
429
aws_event_stream_message_total_length(const struct aws_event_stream_message * message)430 uint32_t aws_event_stream_message_total_length(const struct aws_event_stream_message *message) {
431 return aws_read_u32(message->message_buffer + TOTAL_LEN_OFFSET);
432 }
433
aws_event_stream_message_headers_len(const struct aws_event_stream_message * message)434 uint32_t aws_event_stream_message_headers_len(const struct aws_event_stream_message *message) {
435 return aws_read_u32(message->message_buffer + HEADER_LEN_OFFSET);
436 }
437
aws_event_stream_message_prelude_crc(const struct aws_event_stream_message * message)438 uint32_t aws_event_stream_message_prelude_crc(const struct aws_event_stream_message *message) {
439 return aws_read_u32(message->message_buffer + PRELUDE_CRC_OFFSET);
440 }
441
aws_event_stream_message_headers(const struct aws_event_stream_message * message,struct aws_array_list * headers)442 int aws_event_stream_message_headers(const struct aws_event_stream_message *message, struct aws_array_list *headers) {
443 return aws_event_stream_read_headers_from_buffer(
444 headers,
445 message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH,
446 aws_event_stream_message_headers_len(message));
447 }
448
aws_event_stream_message_payload(const struct aws_event_stream_message * message)449 const uint8_t *aws_event_stream_message_payload(const struct aws_event_stream_message *message) {
450 return message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message);
451 }
452
aws_event_stream_message_payload_len(const struct aws_event_stream_message * message)453 uint32_t aws_event_stream_message_payload_len(const struct aws_event_stream_message *message) {
454 return aws_event_stream_message_total_length(message) -
455 (AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message) +
456 AWS_EVENT_STREAM_TRAILER_LENGTH);
457 }
458
aws_event_stream_message_message_crc(const struct aws_event_stream_message * message)459 uint32_t aws_event_stream_message_message_crc(const struct aws_event_stream_message *message) {
460 return aws_read_u32(
461 message->message_buffer + (aws_event_stream_message_total_length(message) - AWS_EVENT_STREAM_TRAILER_LENGTH));
462 }
463
aws_event_stream_message_buffer(const struct aws_event_stream_message * message)464 const uint8_t *aws_event_stream_message_buffer(const struct aws_event_stream_message *message) {
465 return message->message_buffer;
466 }
467
468 #define DEBUG_STR_PRELUDE_TOTAL_LEN "\"total_length\": "
469 #define DEBUG_STR_PRELUDE_HDRS_LEN "\"headers_length\": "
470 #define DEBUG_STR_PRELUDE_CRC "\"prelude_crc\": "
471 #define DEBUG_STR_MESSAGE_CRC "\"message_crc\": "
472 #define DEBUG_STR_HEADER_NAME "\"name\": "
473 #define DEBUG_STR_HEADER_VALUE "\"value\": "
474 #define DEBUG_STR_HEADER_TYPE "\"type\": "
475
aws_event_stream_message_to_debug_str(FILE * fd,const struct aws_event_stream_message * message)476 int aws_event_stream_message_to_debug_str(FILE *fd, const struct aws_event_stream_message *message) {
477 struct aws_array_list headers;
478 aws_event_stream_headers_list_init(&headers, message->alloc);
479 aws_event_stream_message_headers(message, &headers);
480
481 fprintf(
482 fd,
483 "{\n " DEBUG_STR_PRELUDE_TOTAL_LEN "%d,\n " DEBUG_STR_PRELUDE_HDRS_LEN "%d,\n " DEBUG_STR_PRELUDE_CRC
484 "%d,\n",
485 aws_event_stream_message_total_length(message),
486 aws_event_stream_message_headers_len(message),
487 aws_event_stream_message_prelude_crc(message));
488
489 int count = 0;
490
491 uint16_t headers_count = (uint16_t)aws_array_list_length(&headers);
492
493 fprintf(fd, " \"headers\": [");
494
495 for (uint16_t i = 0; i < headers_count; ++i) {
496 struct aws_event_stream_header_value_pair *header = NULL;
497
498 aws_array_list_get_at_ptr(&headers, (void **)&header, i);
499
500 fprintf(fd, " {\n");
501
502 fprintf(fd, " " DEBUG_STR_HEADER_NAME "\"");
503 fwrite(header->header_name, sizeof(char), (size_t)header->header_name_len, fd);
504 fprintf(fd, "\",\n");
505
506 fprintf(fd, " " DEBUG_STR_HEADER_TYPE "%d,\n", header->header_value_type);
507
508 if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_FALSE) {
509 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "false\n");
510 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE) {
511 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "true\n");
512 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE) {
513 int8_t int_value = (int8_t)header->header_value.static_val[0];
514 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
515 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT16) {
516 int16_t int_value = aws_read_u16(header->header_value.static_val);
517 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
518 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT32) {
519 int32_t int_value = (int32_t)aws_read_u32(header->header_value.static_val);
520 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
521 } else if (
522 header->header_value_type == AWS_EVENT_STREAM_HEADER_INT64 ||
523 header->header_value_type == AWS_EVENT_STREAM_HEADER_TIMESTAMP) {
524 int64_t int_value = (int64_t)aws_read_u64(header->header_value.static_val);
525 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%lld\n", (long long)int_value);
526 } else {
527 size_t buffer_len = 0;
528 aws_base64_compute_encoded_len(header->header_value_len, &buffer_len);
529 char *encoded_buffer = (char *)aws_mem_acquire(message->alloc, buffer_len);
530 if (!encoded_buffer) {
531 return aws_raise_error(AWS_ERROR_OOM);
532 }
533
534 struct aws_byte_buf encode_output = aws_byte_buf_from_array((uint8_t *)encoded_buffer, buffer_len);
535
536 if (header->header_value_type == AWS_EVENT_STREAM_HEADER_UUID) {
537 struct aws_byte_cursor to_encode =
538 aws_byte_cursor_from_array(header->header_value.static_val, header->header_value_len);
539
540 aws_base64_encode(&to_encode, &encode_output);
541 } else {
542 struct aws_byte_cursor to_encode =
543 aws_byte_cursor_from_array(header->header_value.variable_len_val, header->header_value_len);
544 aws_base64_encode(&to_encode, &encode_output);
545 }
546 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "\"%s\"\n", encoded_buffer);
547 aws_mem_release(message->alloc, encoded_buffer);
548 }
549
550 fprintf(fd, " }");
551
552 if (count < headers_count - 1) {
553 fprintf(fd, ",");
554 }
555 fprintf(fd, "\n");
556
557 count++;
558 }
559 aws_event_stream_headers_list_cleanup(&headers);
560 fprintf(fd, " ],\n");
561
562 size_t payload_len = aws_event_stream_message_payload_len(message);
563 const uint8_t *payload = aws_event_stream_message_payload(message);
564 size_t encoded_len = 0;
565 aws_base64_compute_encoded_len(payload_len, &encoded_len);
566 char *encoded_payload = (char *)aws_mem_acquire(message->alloc, encoded_len);
567
568 if (!encoded_payload) {
569 return aws_raise_error(AWS_ERROR_OOM);
570 }
571
572 struct aws_byte_cursor payload_buffer = aws_byte_cursor_from_array(payload, payload_len);
573 struct aws_byte_buf encoded_payload_buffer = aws_byte_buf_from_array((uint8_t *)encoded_payload, encoded_len);
574
575 aws_base64_encode(&payload_buffer, &encoded_payload_buffer);
576 fprintf(fd, " \"payload\": \"%s\",\n", encoded_payload);
577 fprintf(fd, " " DEBUG_STR_MESSAGE_CRC "%d\n}\n", aws_event_stream_message_message_crc(message));
578
579 return AWS_OP_SUCCESS;
580 }
581
aws_event_stream_headers_list_init(struct aws_array_list * headers,struct aws_allocator * allocator)582 int aws_event_stream_headers_list_init(struct aws_array_list *headers, struct aws_allocator *allocator) {
583 AWS_ASSERT(headers);
584 AWS_ASSERT(allocator);
585
586 return aws_array_list_init_dynamic(headers, allocator, 4, sizeof(struct aws_event_stream_header_value_pair));
587 }
588
aws_event_stream_headers_list_cleanup(struct aws_array_list * headers)589 void aws_event_stream_headers_list_cleanup(struct aws_array_list *headers) {
590 if (AWS_UNLIKELY(!headers || !aws_array_list_is_valid(headers))) {
591 return;
592 }
593
594 for (size_t i = 0; i < aws_array_list_length(headers); ++i) {
595 struct aws_event_stream_header_value_pair *header = NULL;
596 aws_array_list_get_at_ptr(headers, (void **)&header, i);
597
598 if (header->value_owned) {
599 aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val);
600 }
601 }
602
603 aws_array_list_clean_up(headers);
604 }
605
s_add_variable_len_header(struct aws_array_list * headers,struct aws_event_stream_header_value_pair * header,const char * name,uint8_t name_len,uint8_t * value,uint16_t value_len,int8_t copy)606 static int s_add_variable_len_header(
607 struct aws_array_list *headers,
608 struct aws_event_stream_header_value_pair *header,
609 const char *name,
610 uint8_t name_len,
611 uint8_t *value,
612 uint16_t value_len,
613 int8_t copy) {
614
615 memcpy((void *)header->header_name, (void *)name, (size_t)name_len);
616
617 if (copy) {
618 header->header_value.variable_len_val = aws_mem_acquire(headers->alloc, value_len);
619 if (!header->header_value.variable_len_val) {
620 return aws_raise_error(AWS_ERROR_OOM);
621 }
622
623 header->value_owned = 1;
624 memcpy((void *)header->header_value.variable_len_val, (void *)value, value_len);
625 } else {
626 header->value_owned = 0;
627 header->header_value.variable_len_val = value;
628 }
629
630 if (aws_array_list_push_back(headers, (void *)header)) {
631 if (header->value_owned) {
632 aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val);
633 }
634 return AWS_OP_ERR;
635 }
636
637 return AWS_OP_SUCCESS;
638 }
639
aws_event_stream_add_string_header(struct aws_array_list * headers,const char * name,uint8_t name_len,const char * value,uint16_t value_len,int8_t copy)640 int aws_event_stream_add_string_header(
641 struct aws_array_list *headers,
642 const char *name,
643 uint8_t name_len,
644 const char *value,
645 uint16_t value_len,
646 int8_t copy) {
647 struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
648 .header_value_len = value_len,
649 .value_owned = copy,
650 .header_value_type = AWS_EVENT_STREAM_HEADER_STRING};
651
652 return s_add_variable_len_header(headers, &header, name, name_len, (uint8_t *)value, value_len, copy);
653 }
654
aws_event_stream_create_string_header(struct aws_byte_cursor name,struct aws_byte_cursor value)655 struct aws_event_stream_header_value_pair aws_event_stream_create_string_header(
656 struct aws_byte_cursor name,
657 struct aws_byte_cursor value) {
658 AWS_PRECONDITION(name.len < INT8_MAX);
659 AWS_PRECONDITION(value.len < INT16_MAX);
660
661 struct aws_event_stream_header_value_pair header = {
662 .header_value_type = AWS_EVENT_STREAM_HEADER_STRING,
663 .header_value.variable_len_val = value.ptr,
664 .header_value_len = (uint16_t)value.len,
665 .header_name_len = (uint8_t)name.len,
666 .value_owned = 0,
667 };
668
669 memcpy(header.header_name, name.ptr, name.len);
670
671 return header;
672 }
673
aws_event_stream_create_int32_header(struct aws_byte_cursor name,int32_t value)674 struct aws_event_stream_header_value_pair aws_event_stream_create_int32_header(
675 struct aws_byte_cursor name,
676 int32_t value) {
677 AWS_PRECONDITION(name.len < INT8_MAX);
678
679 struct aws_event_stream_header_value_pair header = {
680 .header_value_type = AWS_EVENT_STREAM_HEADER_INT32,
681 .header_value_len = (uint16_t)sizeof(int32_t),
682 .header_name_len = (uint8_t)name.len,
683 .value_owned = 0,
684 };
685
686 memcpy(header.header_name, name.ptr, name.len);
687 aws_write_u32((uint32_t)value, header.header_value.static_val);
688
689 return header;
690 }
691
aws_event_stream_add_byte_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int8_t value)692 int aws_event_stream_add_byte_header(struct aws_array_list *headers, const char *name, uint8_t name_len, int8_t value) {
693 struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
694 .header_value_len = 1,
695 .value_owned = 0,
696 .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE,
697 .header_value.static_val[0] = (uint8_t)value};
698
699 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
700
701 return aws_array_list_push_back(headers, (void *)&header);
702 }
703
aws_event_stream_add_bool_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int8_t value)704 int aws_event_stream_add_bool_header(struct aws_array_list *headers, const char *name, uint8_t name_len, int8_t value) {
705 struct aws_event_stream_header_value_pair header = {
706 .header_name_len = name_len,
707 .header_value_len = 0,
708 .value_owned = 0,
709 .header_value_type = value ? AWS_EVENT_STREAM_HEADER_BOOL_TRUE : AWS_EVENT_STREAM_HEADER_BOOL_FALSE,
710 };
711
712 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
713
714 return aws_array_list_push_back(headers, (void *)&header);
715 }
716
aws_event_stream_add_int16_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int16_t value)717 int aws_event_stream_add_int16_header(
718 struct aws_array_list *headers,
719 const char *name,
720 uint8_t name_len,
721 int16_t value) {
722 struct aws_event_stream_header_value_pair header = {
723 .header_name_len = name_len,
724 .header_value_len = sizeof(value),
725 .value_owned = 0,
726 .header_value_type = AWS_EVENT_STREAM_HEADER_INT16,
727 };
728
729 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
730 aws_write_u16((uint16_t)value, header.header_value.static_val);
731
732 return aws_array_list_push_back(headers, (void *)&header);
733 }
734
aws_event_stream_add_int32_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int32_t value)735 int aws_event_stream_add_int32_header(
736 struct aws_array_list *headers,
737 const char *name,
738 uint8_t name_len,
739 int32_t value) {
740 struct aws_event_stream_header_value_pair header = {
741 .header_name_len = name_len,
742 .header_value_len = sizeof(value),
743 .value_owned = 0,
744 .header_value_type = AWS_EVENT_STREAM_HEADER_INT32,
745 };
746
747 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
748 aws_write_u32((uint32_t)value, header.header_value.static_val);
749
750 return aws_array_list_push_back(headers, (void *)&header);
751 }
752
aws_event_stream_add_int64_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int64_t value)753 int aws_event_stream_add_int64_header(
754 struct aws_array_list *headers,
755 const char *name,
756 uint8_t name_len,
757 int64_t value) {
758 struct aws_event_stream_header_value_pair header = {
759 .header_name_len = name_len,
760 .header_value_len = sizeof(value),
761 .value_owned = 0,
762 .header_value_type = AWS_EVENT_STREAM_HEADER_INT64,
763 };
764
765 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
766 aws_write_u64((uint64_t)value, header.header_value.static_val);
767
768 return aws_array_list_push_back(headers, (void *)&header);
769 }
770
aws_event_stream_add_bytebuf_header(struct aws_array_list * headers,const char * name,uint8_t name_len,uint8_t * value,uint16_t value_len,int8_t copy)771 int aws_event_stream_add_bytebuf_header(
772 struct aws_array_list *headers,
773 const char *name,
774 uint8_t name_len,
775 uint8_t *value,
776 uint16_t value_len,
777 int8_t copy) {
778 struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
779 .header_value_len = value_len,
780 .value_owned = copy,
781 .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE_BUF};
782
783 return s_add_variable_len_header(headers, &header, name, name_len, value, value_len, copy);
784 }
785
aws_event_stream_add_timestamp_header(struct aws_array_list * headers,const char * name,uint8_t name_len,int64_t value)786 int aws_event_stream_add_timestamp_header(
787 struct aws_array_list *headers,
788 const char *name,
789 uint8_t name_len,
790 int64_t value) {
791 struct aws_event_stream_header_value_pair header = {
792 .header_name_len = name_len,
793 .header_value_len = sizeof(uint64_t),
794 .value_owned = 0,
795 .header_value_type = AWS_EVENT_STREAM_HEADER_TIMESTAMP,
796 };
797
798 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
799 aws_write_u64((uint64_t)value, header.header_value.static_val);
800
801 return aws_array_list_push_back(headers, (void *)&header);
802 }
803
aws_event_stream_add_uuid_header(struct aws_array_list * headers,const char * name,uint8_t name_len,const uint8_t * value)804 int aws_event_stream_add_uuid_header(
805 struct aws_array_list *headers,
806 const char *name,
807 uint8_t name_len,
808 const uint8_t *value) {
809 struct aws_event_stream_header_value_pair header = {
810 .header_name_len = name_len,
811 .header_value_len = 16,
812 .value_owned = 0,
813 .header_value_type = AWS_EVENT_STREAM_HEADER_UUID,
814 };
815
816 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
817 memcpy((void *)header.header_value.static_val, value, 16);
818
819 return aws_array_list_push_back(headers, (void *)&header);
820 }
821
aws_event_stream_header_name(struct aws_event_stream_header_value_pair * header)822 struct aws_byte_buf aws_event_stream_header_name(struct aws_event_stream_header_value_pair *header) {
823 return aws_byte_buf_from_array((uint8_t *)header->header_name, header->header_name_len);
824 }
825
aws_event_stream_header_value_as_byte(struct aws_event_stream_header_value_pair * header)826 int8_t aws_event_stream_header_value_as_byte(struct aws_event_stream_header_value_pair *header) {
827 return (int8_t)header->header_value.static_val[0];
828 }
829
aws_event_stream_header_value_as_string(struct aws_event_stream_header_value_pair * header)830 struct aws_byte_buf aws_event_stream_header_value_as_string(struct aws_event_stream_header_value_pair *header) {
831 return aws_event_stream_header_value_as_bytebuf(header);
832 }
833
aws_event_stream_header_value_as_bool(struct aws_event_stream_header_value_pair * header)834 int8_t aws_event_stream_header_value_as_bool(struct aws_event_stream_header_value_pair *header) {
835 return header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE ? (int8_t)1 : (int8_t)0;
836 }
837
aws_event_stream_header_value_as_int16(struct aws_event_stream_header_value_pair * header)838 int16_t aws_event_stream_header_value_as_int16(struct aws_event_stream_header_value_pair *header) {
839 return (int16_t)aws_read_u16(header->header_value.static_val);
840 }
841
aws_event_stream_header_value_as_int32(struct aws_event_stream_header_value_pair * header)842 int32_t aws_event_stream_header_value_as_int32(struct aws_event_stream_header_value_pair *header) {
843 return (int32_t)aws_read_u32(header->header_value.static_val);
844 }
845
aws_event_stream_header_value_as_int64(struct aws_event_stream_header_value_pair * header)846 int64_t aws_event_stream_header_value_as_int64(struct aws_event_stream_header_value_pair *header) {
847 return (int64_t)aws_read_u64(header->header_value.static_val);
848 }
849
aws_event_stream_header_value_as_bytebuf(struct aws_event_stream_header_value_pair * header)850 struct aws_byte_buf aws_event_stream_header_value_as_bytebuf(struct aws_event_stream_header_value_pair *header) {
851 return aws_byte_buf_from_array(header->header_value.variable_len_val, header->header_value_len);
852 }
853
aws_event_stream_header_value_as_timestamp(struct aws_event_stream_header_value_pair * header)854 int64_t aws_event_stream_header_value_as_timestamp(struct aws_event_stream_header_value_pair *header) {
855 return aws_event_stream_header_value_as_int64(header);
856 }
857
aws_event_stream_header_value_as_uuid(struct aws_event_stream_header_value_pair * header)858 struct aws_byte_buf aws_event_stream_header_value_as_uuid(struct aws_event_stream_header_value_pair *header) {
859 return aws_byte_buf_from_array(header->header_value.static_val, 16);
860 }
861
aws_event_stream_header_value_length(struct aws_event_stream_header_value_pair * header)862 uint16_t aws_event_stream_header_value_length(struct aws_event_stream_header_value_pair *header) {
863 return header->header_value_len;
864 }
865
866 static struct aws_event_stream_message_prelude s_empty_prelude = {.total_len = 0, .headers_len = 0, .prelude_crc = 0};
867
s_reset_header_state(struct aws_event_stream_streaming_decoder * decoder,uint8_t free_header_data)868 static void s_reset_header_state(struct aws_event_stream_streaming_decoder *decoder, uint8_t free_header_data) {
869
870 if (free_header_data && decoder->current_header.value_owned) {
871 aws_mem_release(decoder->alloc, (void *)decoder->current_header.header_value.variable_len_val);
872 }
873
874 memset((void *)&decoder->current_header, 0, sizeof(struct aws_event_stream_header_value_pair));
875 }
876
877 static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder);
878
879 static int s_headers_state(
880 struct aws_event_stream_streaming_decoder *decoder,
881 const uint8_t *data,
882 size_t len,
883 size_t *processed);
884
s_read_header_value(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)885 static int s_read_header_value(
886 struct aws_event_stream_streaming_decoder *decoder,
887 const uint8_t *data,
888 size_t len,
889 size_t *processed) {
890
891 size_t current_pos = decoder->message_pos;
892
893 size_t length_read = current_pos - decoder->current_header_value_offset;
894 struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;
895
896 if (!length_read) {
897 /* save an allocation, this can only happen if the data we were handed is larger than the length of the header
898 * value. we don't really need to handle offsets in this case. This expects the user is living by the contract
899 * that they cannot act like they own this memory beyond the lifetime of their callback, and they should not
900 * mutate it */
901 if (len >= current_header->header_value_len) {
902 /* this part works regardless of type since the layout of the union will line up. */
903 current_header->header_value.variable_len_val = (uint8_t *)data;
904 current_header->value_owned = 0;
905 decoder->on_header(decoder, &decoder->prelude, &decoder->current_header, decoder->user_context);
906 *processed += current_header->header_value_len;
907 decoder->message_pos += current_header->header_value_len;
908 decoder->running_crc =
909 aws_checksums_crc32(data, (int)current_header->header_value_len, decoder->running_crc);
910
911 s_reset_header_state(decoder, 1);
912 decoder->state = s_headers_state;
913 return AWS_OP_SUCCESS;
914 }
915
916 /* a possible optimization later would be to only allocate this once, and then keep reusing the same buffer. for
917 * subsequent messages.*/
918 if (current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
919 current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING) {
920 current_header->header_value.variable_len_val =
921 aws_mem_acquire(decoder->alloc, decoder->current_header.header_value_len);
922
923 if (!current_header->header_value.variable_len_val) {
924 return aws_raise_error(AWS_ERROR_OOM);
925 }
926
927 current_header->value_owned = 1;
928 }
929 }
930
931 size_t max_read =
932 len >= current_header->header_value_len - length_read ? current_header->header_value_len - length_read : len;
933
934 const uint8_t *header_value_alias = current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
935 current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING
936 ? current_header->header_value.variable_len_val
937 : current_header->header_value.static_val;
938
939 memcpy((void *)(header_value_alias + length_read), data, max_read);
940 decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);
941
942 *processed += max_read;
943 decoder->message_pos += max_read;
944 length_read += max_read;
945
946 if (length_read == current_header->header_value_len) {
947 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
948 s_reset_header_state(decoder, 1);
949 decoder->state = s_headers_state;
950 }
951
952 return AWS_OP_SUCCESS;
953 }
954
s_read_header_value_len(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)955 static int s_read_header_value_len(
956 struct aws_event_stream_streaming_decoder *decoder,
957 const uint8_t *data,
958 size_t len,
959 size_t *processed) {
960 size_t current_pos = decoder->message_pos;
961
962 size_t length_portion_read = current_pos - decoder->current_header_value_offset;
963
964 if (length_portion_read < sizeof(uint16_t)) {
965 size_t max_to_read =
966 len > sizeof(uint16_t) - length_portion_read ? sizeof(uint16_t) - length_portion_read : len;
967 memcpy(decoder->working_buffer + length_portion_read, data, max_to_read);
968 decoder->running_crc = aws_checksums_crc32(data, (int)max_to_read, decoder->running_crc);
969
970 *processed += max_to_read;
971 decoder->message_pos += max_to_read;
972
973 length_portion_read = decoder->message_pos - decoder->current_header_value_offset;
974 }
975
976 if (length_portion_read == sizeof(uint16_t)) {
977 decoder->current_header.header_value_len = aws_read_u16(decoder->working_buffer);
978 decoder->current_header_value_offset = decoder->message_pos;
979 decoder->state = s_read_header_value;
980 }
981
982 return AWS_OP_SUCCESS;
983 }
984
s_read_header_type(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)985 static int s_read_header_type(
986 struct aws_event_stream_streaming_decoder *decoder,
987 const uint8_t *data,
988 size_t len,
989 size_t *processed) {
990 (void)len;
991 uint8_t type = *data;
992 decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc);
993 *processed += 1;
994 decoder->message_pos++;
995 decoder->current_header_value_offset++;
996 struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;
997
998 if (type >= AWS_EVENT_STREAM_HEADER_BOOL_FALSE && type <= AWS_EVENT_STREAM_HEADER_UUID) {
999 current_header->header_value_type = (enum aws_event_stream_header_value_type)type;
1000
1001 switch (type) {
1002 case AWS_EVENT_STREAM_HEADER_STRING:
1003 case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
1004 decoder->state = s_read_header_value_len;
1005 break;
1006 case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
1007 current_header->header_value_len = 0;
1008 current_header->header_value.static_val[0] = 0;
1009 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
1010 s_reset_header_state(decoder, 1);
1011 break;
1012 case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
1013 current_header->header_value_len = 0;
1014 current_header->header_value.static_val[0] = 1;
1015 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
1016 s_reset_header_state(decoder, 1);
1017 break;
1018 case AWS_EVENT_STREAM_HEADER_BYTE:
1019 current_header->header_value_len = 1;
1020 decoder->state = s_read_header_value;
1021 break;
1022 case AWS_EVENT_STREAM_HEADER_INT16:
1023 current_header->header_value_len = sizeof(uint16_t);
1024 decoder->state = s_read_header_value;
1025 break;
1026 case AWS_EVENT_STREAM_HEADER_INT32:
1027 current_header->header_value_len = sizeof(uint32_t);
1028 decoder->state = s_read_header_value;
1029 break;
1030 case AWS_EVENT_STREAM_HEADER_INT64:
1031 case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
1032 current_header->header_value_len = sizeof(uint64_t);
1033 decoder->state = s_read_header_value;
1034 break;
1035 case AWS_EVENT_STREAM_HEADER_UUID:
1036 current_header->header_value_len = 16;
1037 decoder->state = s_read_header_value;
1038 break;
1039 default:
1040 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE);
1041 }
1042
1043 return AWS_OP_SUCCESS;
1044 }
1045
1046 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE);
1047 }
1048
s_read_header_name(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1049 static int s_read_header_name(
1050 struct aws_event_stream_streaming_decoder *decoder,
1051 const uint8_t *data,
1052 size_t len,
1053 size_t *processed) {
1054 size_t current_pos = decoder->message_pos;
1055
1056 size_t length_read = current_pos - decoder->current_header_name_offset;
1057
1058 size_t max_read = len >= decoder->current_header.header_name_len - length_read
1059 ? decoder->current_header.header_name_len - length_read
1060 : len;
1061 memcpy((void *)(decoder->current_header.header_name + length_read), data, max_read);
1062 decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);
1063
1064 *processed += max_read;
1065 decoder->message_pos += max_read;
1066 length_read += max_read;
1067
1068 if (length_read == decoder->current_header.header_name_len) {
1069 decoder->state = s_read_header_type;
1070 decoder->current_header_value_offset = decoder->message_pos;
1071 }
1072
1073 return AWS_OP_SUCCESS;
1074 }
1075
s_read_header_name_len(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1076 static int s_read_header_name_len(
1077 struct aws_event_stream_streaming_decoder *decoder,
1078 const uint8_t *data,
1079 size_t len,
1080 size_t *processed) {
1081 (void)len;
1082 decoder->current_header.header_name_len = *data;
1083 decoder->message_pos++;
1084 decoder->current_header_name_offset++;
1085 *processed += 1;
1086 decoder->state = s_read_header_name;
1087 decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc);
1088
1089 return AWS_OP_SUCCESS;
1090 }
1091
s_start_header(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1092 static int s_start_header(
1093 struct aws_event_stream_streaming_decoder *decoder,
1094 const uint8_t *data,
1095 size_t len,
1096 size_t *processed) /* NOLINT */ {
1097 (void)data;
1098 (void)len;
1099 (void)processed;
1100 decoder->state = s_read_header_name_len;
1101 decoder->current_header_name_offset = decoder->message_pos;
1102
1103 return AWS_OP_SUCCESS;
1104 }
1105
1106 static int s_payload_state(
1107 struct aws_event_stream_streaming_decoder *decoder,
1108 const uint8_t *data,
1109 size_t len,
1110 size_t *processed);
1111
1112 /*Handles the initial state for header parsing.
1113 will oscillate between multiple other states as well.
1114 after all headers have been handled, payload will be set as the next state. */
s_headers_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1115 static int s_headers_state(
1116 struct aws_event_stream_streaming_decoder *decoder,
1117 const uint8_t *data,
1118 size_t len,
1119 size_t *processed) /* NOLINT */ {
1120 (void)data;
1121 (void)len;
1122 (void)processed;
1123
1124 size_t current_pos = decoder->message_pos;
1125
1126 size_t headers_boundary = decoder->prelude.headers_len + AWS_EVENT_STREAM_PRELUDE_LENGTH;
1127
1128 if (current_pos < headers_boundary) {
1129 decoder->state = s_start_header;
1130 return AWS_OP_SUCCESS;
1131 }
1132
1133 if (current_pos == headers_boundary) {
1134 decoder->state = s_payload_state;
1135 return AWS_OP_SUCCESS;
1136 }
1137
1138 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE);
1139 }
1140
1141 /* handles reading the trailer. Once it has been read, it will be compared to the running checksum. If successful,
1142 * the state will be reset. */
s_read_trailer_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1143 static int s_read_trailer_state(
1144 struct aws_event_stream_streaming_decoder *decoder,
1145 const uint8_t *data,
1146 size_t len,
1147 size_t *processed) {
1148
1149 size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos;
1150 size_t segment_length = len > remaining_amount ? remaining_amount : len;
1151 size_t offset = sizeof(uint32_t) - remaining_amount;
1152 memcpy(decoder->working_buffer + offset, data, segment_length);
1153 decoder->message_pos += segment_length;
1154 *processed += segment_length;
1155
1156 if (decoder->message_pos == decoder->prelude.total_len) {
1157 uint32_t message_crc = aws_read_u32(decoder->working_buffer);
1158
1159 if (message_crc == decoder->running_crc) {
1160 s_reset_state(decoder);
1161 } else {
1162 char error_message[70];
1163 snprintf(
1164 error_message,
1165 sizeof(error_message),
1166 "CRC Mismatch. message_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32,
1167 message_crc,
1168 decoder->running_crc);
1169 aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE);
1170 decoder->on_error(
1171 decoder,
1172 &decoder->prelude,
1173 AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE,
1174 error_message,
1175 decoder->user_context);
1176 return AWS_OP_ERR;
1177 }
1178 }
1179
1180 return AWS_OP_SUCCESS;
1181 }
1182
1183 /* handles the reading of the payload up to the final checksum. Sets read_trailer_state as the new state once
1184 * the payload has been processed. */
s_payload_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1185 static int s_payload_state(
1186 struct aws_event_stream_streaming_decoder *decoder,
1187 const uint8_t *data,
1188 size_t len,
1189 size_t *processed) {
1190
1191 if (decoder->message_pos < decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) {
1192 size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos - AWS_EVENT_STREAM_TRAILER_LENGTH;
1193 size_t segment_length = len > remaining_amount ? remaining_amount : len;
1194 int8_t final_segment =
1195 (segment_length + decoder->message_pos) == (decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH);
1196 struct aws_byte_buf payload_buf = aws_byte_buf_from_array(data, segment_length);
1197 decoder->on_payload(decoder, &payload_buf, final_segment, decoder->user_context);
1198 decoder->message_pos += segment_length;
1199 decoder->running_crc = aws_checksums_crc32(data, (int)segment_length, decoder->running_crc);
1200 *processed += segment_length;
1201 }
1202
1203 if (decoder->message_pos == decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) {
1204 decoder->state = s_read_trailer_state;
1205 }
1206
1207 return AWS_OP_SUCCESS;
1208 }
1209
1210 /* Parses the payload and verifies checksums. Sets the next state if successful. */
s_verify_prelude_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1211 static int s_verify_prelude_state(
1212 struct aws_event_stream_streaming_decoder *decoder,
1213 const uint8_t *data,
1214 size_t len,
1215 size_t *processed) /* NOLINT */ {
1216 (void)data;
1217 (void)len;
1218 (void)processed;
1219
1220 decoder->prelude.headers_len = aws_read_u32(decoder->working_buffer + HEADER_LEN_OFFSET);
1221 decoder->prelude.prelude_crc = aws_read_u32(decoder->working_buffer + PRELUDE_CRC_OFFSET);
1222 decoder->prelude.total_len = aws_read_u32(decoder->working_buffer + TOTAL_LEN_OFFSET);
1223
1224 decoder->running_crc = aws_checksums_crc32(decoder->working_buffer, PRELUDE_CRC_OFFSET, 0);
1225
1226 if (AWS_LIKELY(decoder->running_crc == decoder->prelude.prelude_crc)) {
1227
1228 if (AWS_UNLIKELY(
1229 decoder->prelude.headers_len > AWS_EVENT_STREAM_MAX_HEADERS_SIZE ||
1230 decoder->prelude.total_len > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE)) {
1231 aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
1232 char error_message[] = "Maximum message field size exceeded";
1233
1234 decoder->on_error(
1235 decoder,
1236 &decoder->prelude,
1237 AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
1238 error_message,
1239 decoder->user_context);
1240 return AWS_OP_ERR;
1241 }
1242
1243 /* Should only call on_prelude() after passing crc check and limitation check, otherwise call on_prelude() with
1244 * incorrect prelude is error prune. */
1245 decoder->on_prelude(decoder, &decoder->prelude, decoder->user_context);
1246
1247 decoder->running_crc = aws_checksums_crc32(
1248 decoder->working_buffer + PRELUDE_CRC_OFFSET,
1249 (int)sizeof(decoder->prelude.prelude_crc),
1250 decoder->running_crc);
1251 memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer));
1252 decoder->state = decoder->prelude.headers_len > 0 ? s_headers_state : s_payload_state;
1253 } else {
1254 char error_message[70];
1255 snprintf(
1256 error_message,
1257 sizeof(error_message),
1258 "CRC Mismatch. prelude_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32,
1259 decoder->prelude.prelude_crc,
1260 decoder->running_crc);
1261
1262 aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
1263 decoder->on_error(
1264 decoder,
1265 &decoder->prelude,
1266 AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE,
1267 error_message,
1268 decoder->user_context);
1269 return AWS_OP_ERR;
1270 }
1271
1272 return AWS_OP_SUCCESS;
1273 }
1274
1275 /* initial state handles up to the reading of the prelude */
s_start_state(struct aws_event_stream_streaming_decoder * decoder,const uint8_t * data,size_t len,size_t * processed)1276 static int s_start_state(
1277 struct aws_event_stream_streaming_decoder *decoder,
1278 const uint8_t *data,
1279 size_t len,
1280 size_t *processed) {
1281
1282 size_t previous_position = decoder->message_pos;
1283 if (decoder->message_pos < AWS_EVENT_STREAM_PRELUDE_LENGTH) {
1284 if (len >= AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos) {
1285 memcpy(
1286 decoder->working_buffer + decoder->message_pos,
1287 data,
1288 AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos);
1289 decoder->message_pos += AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos;
1290 } else {
1291 memcpy(decoder->working_buffer + decoder->message_pos, data, len);
1292 decoder->message_pos += len;
1293 }
1294
1295 *processed += decoder->message_pos - previous_position;
1296 }
1297
1298 if (decoder->message_pos == AWS_EVENT_STREAM_PRELUDE_LENGTH) {
1299 decoder->state = s_verify_prelude_state;
1300 }
1301
1302 return AWS_OP_SUCCESS;
1303 }
1304
s_reset_state(struct aws_event_stream_streaming_decoder * decoder)1305 static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder) {
1306 decoder->message_pos = 0;
1307 decoder->prelude = s_empty_prelude;
1308 decoder->running_crc = 0;
1309 memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer));
1310 decoder->state = s_start_state;
1311 }
1312
aws_event_stream_streaming_decoder_init(struct aws_event_stream_streaming_decoder * decoder,struct aws_allocator * alloc,aws_event_stream_process_on_payload_segment_fn * on_payload_segment,aws_event_stream_prelude_received_fn * on_prelude,aws_event_stream_header_received_fn * on_header,aws_event_stream_on_error_fn * on_error,void * user_data)1313 void aws_event_stream_streaming_decoder_init(
1314 struct aws_event_stream_streaming_decoder *decoder,
1315 struct aws_allocator *alloc,
1316 aws_event_stream_process_on_payload_segment_fn *on_payload_segment,
1317 aws_event_stream_prelude_received_fn *on_prelude,
1318 aws_event_stream_header_received_fn *on_header,
1319 aws_event_stream_on_error_fn *on_error,
1320 void *user_data) {
1321
1322 s_reset_state(decoder);
1323 decoder->alloc = alloc;
1324 decoder->on_error = on_error;
1325 decoder->on_header = on_header;
1326 decoder->on_payload = on_payload_segment;
1327 decoder->on_prelude = on_prelude;
1328 decoder->user_context = user_data;
1329 }
1330
aws_event_stream_streaming_decoder_clean_up(struct aws_event_stream_streaming_decoder * decoder)1331 void aws_event_stream_streaming_decoder_clean_up(struct aws_event_stream_streaming_decoder *decoder) {
1332 s_reset_state(decoder);
1333 decoder->on_error = 0;
1334 decoder->on_header = 0;
1335 decoder->on_payload = 0;
1336 decoder->on_prelude = 0;
1337 decoder->user_context = 0;
1338 }
1339
1340 /* Simply sends the data to the state machine until all has been processed or an error is returned. */
aws_event_stream_streaming_decoder_pump(struct aws_event_stream_streaming_decoder * decoder,const struct aws_byte_buf * data)1341 int aws_event_stream_streaming_decoder_pump(
1342 struct aws_event_stream_streaming_decoder *decoder,
1343 const struct aws_byte_buf *data) {
1344
1345 size_t processed = 0;
1346 int err_val = 0;
1347 while (!err_val && data->buffer && data->len && processed < data->len) {
1348 err_val = decoder->state(decoder, data->buffer + processed, data->len - processed, &processed);
1349 }
1350
1351 return err_val;
1352 }
1353 #if _MSC_VER
1354 # pragma warning(pop)
1355 #endif
1356