1 /*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_mem.h>
26 #include <fluent-bit/flb_sds.h>
27 #include <fluent-bit/flb_error.h>
28 #include <fluent-bit/flb_utils.h>
29 #include <fluent-bit/flb_sds.h>
30 #include <fluent-bit/flb_time.h>
31 #include <fluent-bit/flb_pack.h>
32 #include <fluent-bit/flb_unescape.h>
33
34 #include <msgpack.h>
35 #include <jsmn/jsmn.h>
36
/* Alias used by msgpack2json() to emit string and binary payloads */
#define try_to_write_str(buf, off, left, str, len) \
    flb_utils_write_str((buf), (off), (left), (str), (len))
38
flb_json_tokenise(const char * js,size_t len,struct flb_pack_state * state)39 int flb_json_tokenise(const char *js, size_t len,
40 struct flb_pack_state *state)
41 {
42 int ret;
43 int new_tokens = 256;
44 size_t old_size;
45 size_t new_size;
46 void *tmp;
47
48 ret = jsmn_parse(&state->parser, js, len,
49 state->tokens, state->tokens_size);
50 while (ret == JSMN_ERROR_NOMEM) {
51 /* Get current size of the array in bytes */
52 old_size = state->tokens_size * sizeof(jsmntok_t);
53
54 /* New size: add capacity for new 256 entries */
55 new_size = old_size + (sizeof(jsmntok_t) * new_tokens);
56
57 tmp = flb_realloc_z(state->tokens, old_size, new_size);
58 if (!tmp) {
59 flb_errno();
60 return -1;
61 }
62 state->tokens = tmp;
63 state->tokens_size += new_tokens;
64
65 ret = jsmn_parse(&state->parser, js, len,
66 state->tokens, state->tokens_size);
67 }
68
69 if (ret == JSMN_ERROR_INVAL) {
70 return FLB_ERR_JSON_INVAL;
71 }
72
73 if (ret == JSMN_ERROR_PART) {
74 /* This is a partial JSON message, just stop */
75 flb_trace("[json tokenise] incomplete");
76 return FLB_ERR_JSON_PART;
77 }
78
79 state->tokens_count += ret;
80 return 0;
81 }
82
/*
 * Decide whether the JSON number primitive buf[0..len) must be packed as a
 * floating point value: it has a fraction ('.') or an exponent ('e'/'E',
 * with or without a sign).
 *
 * Only the 'len' bytes of the token are inspected. The token is a slice of
 * a larger JSON buffer and is not NUL-terminated, so nothing past its end
 * may be read.
 *
 * Returns 1 for a floating point number, 0 otherwise.
 */
static inline int is_float(const char *buf, int len)
{
    int i;

    for (i = 0; i < len; i++) {
        if (buf[i] == '.' || buf[i] == 'e' || buf[i] == 'E') {
            return 1;
        }
    }

    return 0;
}
100
101 /* Sanitize incoming JSON string */
pack_string_token(struct flb_pack_state * state,const char * str,int len,msgpack_packer * pck)102 static inline int pack_string_token(struct flb_pack_state *state,
103 const char *str, int len,
104 msgpack_packer *pck)
105 {
106 int s;
107 int out_len;
108 char *tmp;
109 char *out_buf;
110
111 if (state->buf_size < len + 1) {
112 s = len + 1;
113 tmp = flb_realloc(state->buf_data, s);
114 if (!tmp) {
115 flb_errno();
116 return -1;
117 }
118 else {
119 state->buf_data = tmp;
120 state->buf_size = s;
121 }
122 }
123 out_buf = state->buf_data;
124
125 /* Always decode any UTF-8 or special characters */
126 out_len = flb_unescape_string_utf8(str, len, out_buf);
127
128 /* Pack decoded text */
129 msgpack_pack_str(pck, out_len);
130 msgpack_pack_str_body(pck, out_buf, out_len);
131
132 return out_len;
133 }
134
135 /* Receive a tokenized JSON message and convert it to MsgPack */
tokens_to_msgpack(struct flb_pack_state * state,const char * js,int * out_size,int * last_byte,int * out_records)136 static char *tokens_to_msgpack(struct flb_pack_state *state,
137 const char *js,
138 int *out_size, int *last_byte,
139 int *out_records)
140 {
141 int i;
142 int flen;
143 int arr_size;
144 int records = 0;
145 const char *p;
146 char *buf;
147 const jsmntok_t *t;
148 msgpack_packer pck;
149 msgpack_sbuffer sbuf;
150 jsmntok_t *tokens;
151
152 tokens = state->tokens;
153 arr_size = state->tokens_count;
154
155 if (arr_size == 0) {
156 return NULL;
157 }
158
159 /* initialize buffers */
160 msgpack_sbuffer_init(&sbuf);
161 msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
162
163 for (i = 0; i < arr_size ; i++) {
164 t = &tokens[i];
165
166 if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
167 break;
168 }
169
170 if (t->parent == -1) {
171 *last_byte = t->end;
172 records++;
173 }
174
175 flen = (t->end - t->start);
176 switch (t->type) {
177 case JSMN_OBJECT:
178 msgpack_pack_map(&pck, t->size);
179 break;
180 case JSMN_ARRAY:
181 msgpack_pack_array(&pck, t->size);
182 break;
183 case JSMN_STRING:
184 pack_string_token(state, js + t->start, flen, &pck);
185 break;
186 case JSMN_PRIMITIVE:
187 p = js + t->start;
188 if (*p == 'f') {
189 msgpack_pack_false(&pck);
190 }
191 else if (*p == 't') {
192 msgpack_pack_true(&pck);
193 }
194 else if (*p == 'n') {
195 msgpack_pack_nil(&pck);
196 }
197 else {
198 if (is_float(p, flen)) {
199 msgpack_pack_double(&pck, atof(p));
200 }
201 else {
202 msgpack_pack_int64(&pck, atoll(p));
203 }
204 }
205 break;
206 case JSMN_UNDEFINED:
207 msgpack_sbuffer_destroy(&sbuf);
208 return NULL;
209 }
210 }
211
212 *out_size = sbuf.size;
213 *out_records = records;
214 buf = sbuf.data;
215
216 return buf;
217 }
218
219 /*
220 * It parse a JSON string and convert it to MessagePack format, this packer is
221 * useful when a complete JSON message exists, otherwise it will fail until
222 * the message is complete.
223 *
224 * This routine do not keep a state in the parser, do not use it for big
225 * JSON messages.
226 */
pack_json_to_msgpack(const char * js,size_t len,char ** buffer,size_t * size,int * root_type,int * records)227 static int pack_json_to_msgpack(const char *js, size_t len, char **buffer,
228 size_t *size, int *root_type, int *records)
229 {
230 int ret = -1;
231 int n_records;
232 int out;
233 int last;
234 char *buf = NULL;
235 struct flb_pack_state state;
236
237 ret = flb_pack_state_init(&state);
238 if (ret != 0) {
239 return -1;
240 }
241 ret = flb_json_tokenise(js, len, &state);
242 if (ret != 0) {
243 ret = -1;
244 goto flb_pack_json_end;
245 }
246
247 if (state.tokens_count == 0) {
248 ret = -1;
249 goto flb_pack_json_end;
250 }
251
252 buf = tokens_to_msgpack(&state, js, &out, &last, &n_records);
253 if (!buf) {
254 ret = -1;
255 goto flb_pack_json_end;
256 }
257
258 *root_type = state.tokens[0].type;
259 *size = out;
260 *buffer = buf;
261 *records = n_records;
262 ret = 0;
263
264 flb_pack_json_end:
265 flb_pack_state_reset(&state);
266 return ret;
267 }
268
/*
 * Convert serialized JSON messages into msgpack. Same as
 * flb_pack_json_recs() but the number of records is not reported.
 * Returns 0 on success, -1 on failure.
 */
int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
                  int *root_type)
{
    int ignored_records;

    return pack_json_to_msgpack(js, len, buffer, size, root_type,
                                &ignored_records);
}
277
/*
 * Convert serialized JSON messages into msgpack and store in *out_records
 * the number of top-level JSON values found.
 * Returns 0 on success, -1 on failure.
 */
int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
                       int *root_type, int *out_records)
{
    int ret;

    ret = pack_json_to_msgpack(js, len, buffer, size, root_type, out_records);
    return ret;
}
287
288 /* Initialize a JSON packer state */
flb_pack_state_init(struct flb_pack_state * s)289 int flb_pack_state_init(struct flb_pack_state *s)
290 {
291 int tokens = 256;
292 size_t size = 256;
293
294 jsmn_init(&s->parser);
295
296 size = sizeof(jsmntok_t) * tokens;
297 s->tokens = flb_calloc(1, size);
298 if (!s->tokens) {
299 flb_errno();
300 return -1;
301 }
302 s->tokens_size = tokens;
303 s->tokens_count = 0;
304 s->last_byte = 0;
305 s->multiple = FLB_FALSE;
306
307 s->buf_data = flb_malloc(size);
308 if (!s->buf_data) {
309 flb_errno();
310 flb_free(s->tokens);
311 return -1;
312 }
313 s->buf_size = size;
314 s->buf_len = 0;
315
316 return 0;
317 }
318
flb_pack_state_reset(struct flb_pack_state * s)319 void flb_pack_state_reset(struct flb_pack_state *s)
320 {
321 flb_free(s->tokens);
322 s->tokens_size = 0;
323 s->tokens_count = 0;
324 s->last_byte = 0;
325 s->buf_size = 0;
326 flb_free(s->buf_data);
327 }
328
329
330 /*
331 * It parse a JSON string and convert it to MessagePack format. The main
332 * difference of this function and the previous flb_pack_json() is that it
333 * keeps a parser and tokens state, allowing to process big messages and
334 * resume the parsing process instead of start from zero.
335 */
flb_pack_json_state(const char * js,size_t len,char ** buffer,int * size,struct flb_pack_state * state)336 int flb_pack_json_state(const char *js, size_t len,
337 char **buffer, int *size,
338 struct flb_pack_state *state)
339 {
340 int ret;
341 int out;
342 int delim = 0;
343 int last = 0;
344 int records;
345 char *buf;
346 jsmntok_t *t;
347
348 ret = flb_json_tokenise(js, len, state);
349 state->multiple = FLB_TRUE;
350 if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) {
351 /*
352 * If the caller enabled 'multiple' flag, it means that the incoming
353 * JSON message may have multiple messages concatenated and likely
354 * the last one is only incomplete.
355 *
356 * The following routine aims to determinate how many JSON messages
357 * are OK in the array of tokens, if any, process them and adjust
358 * the JSMN context/buffers.
359 */
360 int i;
361 int found = 0;
362
363 for (i = 1; i < state->tokens_size; i++) {
364 t = &state->tokens[i];
365
366 if (t->start < (state->tokens[i - 1]).start) {
367 break;
368 }
369
370 if (t->parent == -1 && (t->end != 0)) {
371 found++;
372 delim = i;
373 }
374
375 }
376
377 if (found > 0) {
378 state->tokens_count += delim;
379 }
380 else {
381 return ret;
382 }
383 }
384 else if (ret != 0) {
385 return ret;
386 }
387
388 if (state->tokens_count == 0) {
389 state->last_byte = last;
390 return FLB_ERR_JSON_INVAL;
391 }
392
393 buf = tokens_to_msgpack(state, js, &out, &last, &records);
394 if (!buf) {
395 return -1;
396 }
397
398 *size = out;
399 *buffer = buf;
400 state->last_byte = last;
401
402 return 0;
403 }
404
/*
 * Print 'result' to stdout as "[cnt] [sec.nsec, record]" when it looks like
 * a Fluent Bit record: a two-element array whose first element is a
 * timestamp (positive integer, float or ext).
 *
 * Returns 0 when the record was printed, -1 (nothing printed) otherwise.
 */
static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result)
{
    msgpack_object o;
    msgpack_object *obj;
    msgpack_object root;
    struct flb_time tms;

    root = result.data;

    /*
     * A record is exactly [timestamp, map]. Checking the size also keeps
     * ptr[0] here and ptr[1] (read by flb_time_pop_from_msgpack) in bounds.
     */
    if (root.type != MSGPACK_OBJECT_ARRAY || root.via.array.size != 2) {
        return -1;
    }

    /* decode expected timestamp only (integer, float or ext) */
    o = root.via.array.ptr[0];
    if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
        o.type != MSGPACK_OBJECT_FLOAT &&
        o.type != MSGPACK_OBJECT_EXT) {
        return -1;
    }

    /* Fall back to the generic printer if the timestamp cannot be decoded */
    if (flb_time_pop_from_msgpack(&tms, &result, &obj) < 0) {
        return -1;
    }

    fprintf(stdout, "[%zu] [%"PRIu32".%09lu, ", cnt,
            (uint32_t) tms.tm.tv_sec, (unsigned long) tms.tm.tv_nsec);
    msgpack_object_print(stdout, *obj);
    fprintf(stdout, "]\n");

    return 0;
}
435
flb_pack_print(const char * data,size_t bytes)436 void flb_pack_print(const char *data, size_t bytes)
437 {
438 int ret;
439 msgpack_unpacked result;
440 size_t off = 0, cnt = 0;
441
442 msgpack_unpacked_init(&result);
443 while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
444 /* Check if we are processing an internal Fluent Bit record */
445 ret = pack_print_fluent_record(cnt, result);
446 if (ret == 0) {
447 continue;
448 }
449
450 printf("[%zd] ", cnt++);
451 msgpack_object_print(stdout, result.data);
452 printf("\n");
453 }
454 msgpack_unpacked_destroy(&result);
455 }
456
457
try_to_write(char * buf,int * off,size_t left,const char * str,size_t str_len)458 static inline int try_to_write(char *buf, int *off, size_t left,
459 const char *str, size_t str_len)
460 {
461 if (str_len <= 0){
462 str_len = strlen(str);
463 }
464 if (left <= *off+str_len) {
465 return FLB_FALSE;
466 }
467 memcpy(buf+*off, str, str_len);
468 *off += str_len;
469 return FLB_TRUE;
470 }
471
472
473 /*
474 * Check if a key exists in the map using the 'offset' as an index to define
475 * which element needs to start looking from
476 */
key_exists_in_map(msgpack_object key,msgpack_object map,int offset)477 static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset)
478 {
479 int i;
480 msgpack_object p;
481
482 if (key.type != MSGPACK_OBJECT_STR) {
483 return FLB_FALSE;
484 }
485
486 for (i = offset; i < map.via.map.size; i++) {
487 p = map.via.map.ptr[i].key;
488 if (p.type != MSGPACK_OBJECT_STR) {
489 continue;
490 }
491
492 if (key.via.str.size != p.via.str.size) {
493 continue;
494 }
495
496 if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) {
497 return FLB_TRUE;
498 }
499 }
500
501 return FLB_FALSE;
502 }
503
msgpack2json(char * buf,int * off,size_t left,const msgpack_object * o)504 static int msgpack2json(char *buf, int *off, size_t left,
505 const msgpack_object *o)
506 {
507 int i;
508 int dup;
509 int ret = FLB_FALSE;
510 int loop;
511 int packed;
512
513 switch(o->type) {
514 case MSGPACK_OBJECT_NIL:
515 ret = try_to_write(buf, off, left, "null", 4);
516 break;
517
518 case MSGPACK_OBJECT_BOOLEAN:
519 ret = try_to_write(buf, off, left,
520 (o->via.boolean ? "true":"false"),0);
521
522 break;
523
524 case MSGPACK_OBJECT_POSITIVE_INTEGER:
525 {
526 char temp[32] = {0};
527 i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64);
528 ret = try_to_write(buf, off, left, temp, i);
529 }
530 break;
531
532 case MSGPACK_OBJECT_NEGATIVE_INTEGER:
533 {
534 char temp[32] = {0};
535 i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64);
536 ret = try_to_write(buf, off, left, temp, i);
537 }
538 break;
539 case MSGPACK_OBJECT_FLOAT32:
540 case MSGPACK_OBJECT_FLOAT64:
541 {
542 char temp[512] = {0};
543 if (o->via.f64 == (double)(long long int)o->via.f64) {
544 i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64);
545 }
546 else {
547 i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64);
548 }
549 ret = try_to_write(buf, off, left, temp, i);
550 }
551 break;
552
553 case MSGPACK_OBJECT_STR:
554 if (try_to_write(buf, off, left, "\"", 1) &&
555 (o->via.str.size > 0 ?
556 try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size)
557 : 1/* nothing to do */) &&
558 try_to_write(buf, off, left, "\"", 1)) {
559 ret = FLB_TRUE;
560 }
561 break;
562
563 case MSGPACK_OBJECT_BIN:
564 if (try_to_write(buf, off, left, "\"", 1) &&
565 (o->via.bin.size > 0 ?
566 try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size)
567 : 1 /* nothing to do */) &&
568 try_to_write(buf, off, left, "\"", 1)) {
569 ret = FLB_TRUE;
570 }
571 break;
572
573 case MSGPACK_OBJECT_EXT:
574 if (!try_to_write(buf, off, left, "\"", 1)) {
575 goto msg2json_end;
576 }
577 /* ext body. fortmat is similar to printf(1) */
578 {
579 char temp[32] = {0};
580 int len;
581 loop = o->via.ext.size;
582 for(i=0; i<loop; i++) {
583 len = snprintf(temp, sizeof(temp)-1, "\\x%02x", (char)o->via.ext.ptr[i]);
584 if (!try_to_write(buf, off, left, temp, len)) {
585 goto msg2json_end;
586 }
587 }
588 }
589 if (!try_to_write(buf, off, left, "\"", 1)) {
590 goto msg2json_end;
591 }
592 ret = FLB_TRUE;
593 break;
594
595 case MSGPACK_OBJECT_ARRAY:
596 loop = o->via.array.size;
597
598 if (!try_to_write(buf, off, left, "[", 1)) {
599 goto msg2json_end;
600 }
601 if (loop != 0) {
602 msgpack_object* p = o->via.array.ptr;
603 if (!msgpack2json(buf, off, left, p)) {
604 goto msg2json_end;
605 }
606 for (i=1; i<loop; i++) {
607 if (!try_to_write(buf, off, left, ",", 1) ||
608 !msgpack2json(buf, off, left, p+i)) {
609 goto msg2json_end;
610 }
611 }
612 }
613
614 ret = try_to_write(buf, off, left, "]", 1);
615 break;
616
617 case MSGPACK_OBJECT_MAP:
618 loop = o->via.map.size;
619 if (!try_to_write(buf, off, left, "{", 1)) {
620 goto msg2json_end;
621 }
622 if (loop != 0) {
623 msgpack_object k;
624 msgpack_object_kv *p = o->via.map.ptr;
625
626 packed = 0;
627 dup = FLB_FALSE;
628
629 k = o->via.map.ptr[0].key;
630 for (i = 0; i < loop; i++) {
631 k = o->via.map.ptr[i].key;
632 dup = key_exists_in_map(k, *o, i + 1);
633 if (dup == FLB_TRUE) {
634 continue;
635 }
636
637 if (packed > 0) {
638 if (!try_to_write(buf, off, left, ",", 1)) {
639 goto msg2json_end;
640 }
641 }
642
643 if (
644 !msgpack2json(buf, off, left, &(p+i)->key) ||
645 !try_to_write(buf, off, left, ":", 1) ||
646 !msgpack2json(buf, off, left, &(p+i)->val) ) {
647 goto msg2json_end;
648 }
649 packed++;
650 }
651 }
652
653 ret = try_to_write(buf, off, left, "}", 1);
654 break;
655
656 default:
657 flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type);
658 }
659
660 msg2json_end:
661 return ret;
662 }
663
664 /**
665 * convert msgpack to JSON string.
666 * This API is similar to snprintf.
667 *
668 * @param json_str The buffer to fill JSON string.
669 * @param json_size The size of json_str.
670 * @param data The msgpack_unpacked data.
671 * @return success ? a number characters filled : negative value
672 */
flb_msgpack_to_json(char * json_str,size_t json_size,const msgpack_object * obj)673 int flb_msgpack_to_json(char *json_str, size_t json_size,
674 const msgpack_object *obj)
675 {
676 int ret = -1;
677 int off = 0;
678
679 if (json_str == NULL || obj == NULL) {
680 return -1;
681 }
682
683 ret = msgpack2json(json_str, &off, json_size - 1, obj);
684 json_str[off] = '\0';
685 return ret ? off: ret;
686 }
687
flb_msgpack_raw_to_json_sds(const void * in_buf,size_t in_size)688 flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size)
689 {
690 int ret;
691 size_t off = 0;
692 size_t out_size;
693 msgpack_unpacked result;
694 msgpack_object *root;
695 flb_sds_t out_buf;
696 flb_sds_t tmp_buf;
697
698 out_size = in_size * 3 / 2;
699 out_buf = flb_sds_create_size(out_size);
700 if (!out_buf) {
701 flb_errno();
702 return NULL;
703 }
704
705 msgpack_unpacked_init(&result);
706 ret = msgpack_unpack_next(&result, in_buf, in_size, &off);
707 if (ret != MSGPACK_UNPACK_SUCCESS) {
708 flb_sds_destroy(out_buf);
709 msgpack_unpacked_destroy(&result);
710 return NULL;
711 }
712
713 root = &result.data;
714 while (1) {
715 ret = flb_msgpack_to_json(out_buf, out_size, root);
716 if (ret <= 0) {
717 tmp_buf = flb_sds_increase(out_buf, 256);
718 if (tmp_buf) {
719 out_buf = tmp_buf;
720 out_size += 256;
721 }
722 else {
723 flb_errno();
724 flb_sds_destroy(out_buf);
725 msgpack_unpacked_destroy(&result);
726 return NULL;
727 }
728 }
729 else {
730 break;
731 }
732 }
733
734 msgpack_unpacked_destroy(&result);
735 flb_sds_len_set(out_buf, ret);
736
737 return out_buf;
738 }
739
740 /*
741 * Given a 'format' string type, return it integer representation. This
742 * is used by output plugins that uses pack functions to convert
743 * msgpack records to JSON.
744 */
flb_pack_to_json_format_type(const char * str)745 int flb_pack_to_json_format_type(const char *str)
746 {
747 if (strcasecmp(str, "msgpack") == 0) {
748 return FLB_PACK_JSON_FORMAT_NONE;
749 }
750 else if (strcasecmp(str, "json") == 0) {
751 return FLB_PACK_JSON_FORMAT_JSON;
752 }
753 else if (strcasecmp(str, "json_stream") == 0) {
754 return FLB_PACK_JSON_FORMAT_STREAM;
755 }
756 else if (strcasecmp(str, "json_lines") == 0) {
757 return FLB_PACK_JSON_FORMAT_LINES;
758 }
759
760 return -1;
761 }
762
763 /* Given a 'date string type', return it integer representation */
flb_pack_to_json_date_type(const char * str)764 int flb_pack_to_json_date_type(const char *str)
765 {
766 if (strcasecmp(str, "double") == 0) {
767 return FLB_PACK_JSON_DATE_DOUBLE;
768 }
769 else if (strcasecmp(str, "iso8601") == 0) {
770 return FLB_PACK_JSON_DATE_ISO8601;
771 }
772 else if (strcasecmp(str, "epoch") == 0) {
773 return FLB_PACK_JSON_DATE_EPOCH;
774 }
775
776 return -1;
777 }
778
779
/*
 * Convert a buffer of Fluent Bit records ([timestamp, map] arrays) to JSON.
 *
 * json_format selects the layout of the output:
 *   FLB_PACK_JSON_FORMAT_JSON:   one JSON array holding every record map
 *   FLB_PACK_JSON_FORMAT_LINES:  one JSON map per line, '\n' terminated
 *   FLB_PACK_JSON_FORMAT_STREAM: JSON maps concatenated without separator
 * Any other value produces no output (NULL).
 *
 * When date_key is not NULL, each map gets an extra first entry named
 * date_key holding the record timestamp, formatted per date_format
 * (FLB_PACK_JSON_DATE_DOUBLE, _ISO8601 or _EPOCH; any other value stores
 * null so the map stays well formed).
 *
 * Entries that are not a two-element array whose second element is a map
 * are skipped. Returns a new sds string, or NULL on error or when no record
 * could be converted.
 */
flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
                                          int json_format, int date_format,
                                          flb_sds_t date_key)
{
    int i;
    int len;
    int ret;
    int ok = MSGPACK_UNPACK_SUCCESS;
    int records = 0;
    int packed = 0;
    int map_size;
    size_t off = 0;
    size_t header_size = 0;
    char time_formatted[32];
    size_t s;
    flb_sds_t out_tmp;
    flb_sds_t out_js;
    flb_sds_t out_buf = NULL;
    msgpack_unpacked result;
    msgpack_object root;
    msgpack_object map;
    msgpack_sbuffer tmp_sbuf;
    msgpack_sbuffer fix_sbuf;
    msgpack_packer tmp_pck;
    msgpack_packer fix_pck;
    msgpack_object *obj;
    msgpack_object *k;
    msgpack_object *v;
    struct tm tm;
    struct flb_time tms;

    /* Count the top-level entries of the original buffer */
    records = flb_mp_count(data, bytes);
    if (records <= 0) {
        return NULL;
    }

    /* For json lines and streams mode we need a pre-allocated buffer */
    if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
        json_format == FLB_PACK_JSON_FORMAT_STREAM) {
        out_buf = flb_sds_create_size(bytes + bytes / 4);
        if (!out_buf) {
            flb_errno();
            return NULL;
        }
    }

    /* Create temporary msgpack buffer */
    msgpack_sbuffer_init(&tmp_sbuf);
    msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

    /*
     * The JSON format wraps every record map in one big array:
     *
     * [
     *   {map},
     *   {map},
     *   ...
     * ]
     *
     * The header optimistically uses the entry count; header_size remembers
     * its encoded length so it can be rewritten if entries get skipped.
     */
    if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
        msgpack_pack_array(&tmp_pck, records);
        header_size = tmp_sbuf.size;
    }

    msgpack_unpacked_init(&result);
    while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
        /* Each array must have two entries: time and record */
        root = result.data;
        if (root.type != MSGPACK_OBJECT_ARRAY) {
            continue;
        }
        if (root.via.array.size != 2) {
            continue;
        }

        /* Get the record/map */
        map = root.via.array.ptr[1];
        if (map.type != MSGPACK_OBJECT_MAP) {
            continue;
        }
        map_size = map.via.map.size;

        /*
         * Unpack time. tms is zeroed first so it never holds uninitialized
         * data in case flb_time_pop_from_msgpack() fails without filling it.
         */
        memset(&tms, 0, sizeof(tms));
        flb_time_pop_from_msgpack(&tms, &result, &obj);

        if (date_key != NULL) {
            msgpack_pack_map(&tmp_pck, map_size + 1);
        }
        else {
            msgpack_pack_map(&tmp_pck, map_size);
        }

        if (date_key != NULL) {
            /* Append date key */
            msgpack_pack_str(&tmp_pck, flb_sds_len(date_key));
            msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key));

            /* Append date value */
            switch (date_format) {
            case FLB_PACK_JSON_DATE_DOUBLE:
                msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
                break;
            case FLB_PACK_JSON_DATE_ISO8601:
                /* Format the time, use microsecond precision not nanoseconds */
                gmtime_r(&tms.tm.tv_sec, &tm);
                s = strftime(time_formatted, sizeof(time_formatted) - 1,
                             FLB_PACK_JSON_DATE_ISO8601_FMT, &tm);

                len = snprintf(time_formatted + s,
                               sizeof(time_formatted) - 1 - s,
                               ".%06" PRIu64 "Z",
                               (uint64_t) tms.tm.tv_nsec / 1000);
                s += len;
                msgpack_pack_str(&tmp_pck, s);
                msgpack_pack_str_body(&tmp_pck, time_formatted, s);
                break;
            case FLB_PACK_JSON_DATE_EPOCH:
                msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec));
                break;
            default:
                /*
                 * Unknown date format: the map header already counts the
                 * date entry, so a value must be packed to keep it valid.
                 */
                msgpack_pack_nil(&tmp_pck);
                break;
            }
        }

        /* Append remaining keys/values */
        for (i = 0; i < map_size; i++) {
            k = &map.via.map.ptr[i].key;
            v = &map.via.map.ptr[i].val;
            msgpack_pack_object(&tmp_pck, *k);
            msgpack_pack_object(&tmp_pck, *v);
        }
        packed++;

        /*
         * In JSON format all maps accumulate in tmp_sbuf and are converted
         * once after the loop, so there is no separator to add here.
         */
        if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
            continue;
        }

        /*
         * Here we handle two types of records concatenation:
         *
         * FLB_PACK_JSON_FORMAT_LINES: add a line break (\n) after each record
         *
         *     {'ts':abc,'k1':1}
         *     {'ts':abc,'k1':2}
         *     {N}
         *
         * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g:
         *
         *     {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N}
         */
        if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
            json_format == FLB_PACK_JSON_FORMAT_STREAM) {

            /* Encode current record into JSON in a temporary variable */
            out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
            if (!out_js) {
                flb_sds_destroy(out_buf);
                msgpack_sbuffer_destroy(&tmp_sbuf);
                msgpack_unpacked_destroy(&result);
                return NULL;
            }

            /*
             * One map record has been converted, now append it to the
             * outgoing out_buf sds variable.
             */
            out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js));
            if (!out_tmp) {
                flb_sds_destroy(out_js);
                flb_sds_destroy(out_buf);
                msgpack_sbuffer_destroy(&tmp_sbuf);
                msgpack_unpacked_destroy(&result);
                return NULL;
            }

            /* Release temporary json sds buffer */
            flb_sds_destroy(out_js);

            /* If a realloc happened, check the returned address */
            if (out_tmp != out_buf) {
                out_buf = out_tmp;
            }

            /* Append the line break only for json lines mode */
            if (json_format == FLB_PACK_JSON_FORMAT_LINES) {
                out_tmp = flb_sds_cat(out_buf, "\n", 1);
                if (!out_tmp) {
                    flb_sds_destroy(out_buf);
                    msgpack_sbuffer_destroy(&tmp_sbuf);
                    msgpack_unpacked_destroy(&result);
                    return NULL;
                }
                if (out_tmp != out_buf) {
                    out_buf = out_tmp;
                }
            }
            msgpack_sbuffer_clear(&tmp_sbuf);
        }
    }

    /* Release the unpacker */
    msgpack_unpacked_destroy(&result);

    /* Format to JSON */
    if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
        if (packed == 0) {
            msgpack_sbuffer_destroy(&tmp_sbuf);
            return NULL;
        }

        /*
         * Some entries were skipped: the array header still announces
         * 'records' elements, which would make the buffer look truncated.
         * Rebuild it with the real element count.
         */
        if (packed != records) {
            msgpack_sbuffer_init(&fix_sbuf);
            msgpack_packer_init(&fix_pck, &fix_sbuf, msgpack_sbuffer_write);
            msgpack_pack_array(&fix_pck, packed);
            ret = msgpack_sbuffer_write(&fix_sbuf,
                                        tmp_sbuf.data + header_size,
                                        tmp_sbuf.size - header_size);
            msgpack_sbuffer_destroy(&tmp_sbuf);
            if (ret != 0) {
                msgpack_sbuffer_destroy(&fix_sbuf);
                return NULL;
            }
            tmp_sbuf = fix_sbuf;
        }

        out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
        msgpack_sbuffer_destroy(&tmp_sbuf);

        if (!out_buf) {
            return NULL;
        }
    }
    else {
        msgpack_sbuffer_destroy(&tmp_sbuf);
    }

    if (out_buf && flb_sds_len(out_buf) == 0) {
        flb_sds_destroy(out_buf);
        return NULL;
    }

    return out_buf;
}
1001
1002 /**
1003 * convert msgpack to JSON string.
1004 * This API is similar to snprintf.
1005 * @param size Estimated length of json str.
1006 * @param data The msgpack_unpacked data.
1007 * @return success ? allocated json str ptr : NULL
1008 */
flb_msgpack_to_json_str(size_t size,const msgpack_object * obj)1009 char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj)
1010 {
1011 int ret;
1012 char *buf = NULL;
1013 char *tmp;
1014
1015 if (obj == NULL) {
1016 return NULL;
1017 }
1018
1019 if (size <= 0) {
1020 size = 128;
1021 }
1022
1023 buf = flb_malloc(size);
1024 if (!buf) {
1025 flb_errno();
1026 return NULL;
1027 }
1028
1029 while (1) {
1030 ret = flb_msgpack_to_json(buf, size, obj);
1031 if (ret <= 0) {
1032 /* buffer is small. retry.*/
1033 size += 128;
1034 tmp = flb_realloc(buf, size);
1035 if (tmp) {
1036 buf = tmp;
1037 }
1038 else {
1039 flb_free(buf);
1040 flb_errno();
1041 return NULL;
1042 }
1043 }
1044 else {
1045 break;
1046 }
1047 }
1048
1049 return buf;
1050 }
1051
/*
 * Append the current time to 'pck' using flb_time_append_to_msgpack()
 * (its format argument is 0) and return that function's result.
 */
int flb_pack_time_now(msgpack_packer *pck)
{
    struct flb_time now;

    flb_time_get(&now);
    return flb_time_append_to_msgpack(&now, pck, 0);
}
1062
/*
 * Build a new msgpack map holding every entry of the map serialized in
 * map_data[0..map_size), followed by the kv_arr_len entries of kv_arr.
 *
 * On success returns 0 and stores in *out_buf a buffer allocated with
 * flb_malloc() (owned by the caller) and its length in *out_size. Returns
 * -1 if map_data is NULL, does not start with a map, or memory runs out.
 */
int flb_msgpack_expand_map(char *map_data, size_t map_size,
                           msgpack_object_kv **kv_arr, int kv_arr_len,
                           char** out_buf, int* out_size)
{
    int idx;
    int orig_len;
    size_t off = 0;
    msgpack_object src;
    msgpack_unpacked result;
    msgpack_sbuffer sbuf;
    msgpack_packer pck;

    if (map_data == NULL) {
        return -1;
    }

    msgpack_unpacked_init(&result);
    if (msgpack_unpack_next(&result, map_data, map_size, &off) !=
        MSGPACK_UNPACK_SUCCESS ||
        result.data.type != MSGPACK_OBJECT_MAP) {
        msgpack_unpacked_destroy(&result);
        return -1;
    }

    src = result.data;
    orig_len = src.via.map.size;

    msgpack_sbuffer_init(&sbuf);
    msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
    msgpack_pack_map(&pck, kv_arr_len + orig_len);

    /* Original entries first, then the extra ones */
    for (idx = 0; idx < orig_len; idx++) {
        msgpack_pack_object(&pck, src.via.map.ptr[idx].key);
        msgpack_pack_object(&pck, src.via.map.ptr[idx].val);
    }
    for (idx = 0; idx < kv_arr_len; idx++) {
        msgpack_pack_object(&pck, kv_arr[idx]->key);
        msgpack_pack_object(&pck, kv_arr[idx]->val);
    }
    msgpack_unpacked_destroy(&result);

    *out_size = sbuf.size;
    *out_buf = flb_malloc(sbuf.size);
    if (*out_buf == NULL) {
        flb_errno();
        msgpack_sbuffer_destroy(&sbuf);
        return -1;
    }
    memcpy(*out_buf, sbuf.data, sbuf.size);
    msgpack_sbuffer_destroy(&sbuf);

    return 0;
}
1121