1 /*-*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <stdlib.h>
22 #include <string.h>
23 
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_mem.h>
26 #include <fluent-bit/flb_sds.h>
27 #include <fluent-bit/flb_error.h>
28 #include <fluent-bit/flb_utils.h>
29 #include <fluent-bit/flb_sds.h>
30 #include <fluent-bit/flb_time.h>
31 #include <fluent-bit/flb_pack.h>
32 #include <fluent-bit/flb_unescape.h>
33 
34 #include <msgpack.h>
35 #include <jsmn/jsmn.h>
36 
37 #define try_to_write_str  flb_utils_write_str
38 
flb_json_tokenise(const char * js,size_t len,struct flb_pack_state * state)39 int flb_json_tokenise(const char *js, size_t len,
40                       struct flb_pack_state *state)
41 {
42     int ret;
43     int new_tokens = 256;
44     size_t old_size;
45     size_t new_size;
46     void *tmp;
47 
48     ret = jsmn_parse(&state->parser, js, len,
49                      state->tokens, state->tokens_size);
50     while (ret == JSMN_ERROR_NOMEM) {
51         /* Get current size of the array in bytes */
52         old_size = state->tokens_size * sizeof(jsmntok_t);
53 
54         /* New size: add capacity for new 256 entries */
55         new_size = old_size + (sizeof(jsmntok_t) * new_tokens);
56 
57         tmp = flb_realloc_z(state->tokens, old_size, new_size);
58         if (!tmp) {
59             flb_errno();
60             return -1;
61         }
62         state->tokens = tmp;
63         state->tokens_size += new_tokens;
64 
65         ret = jsmn_parse(&state->parser, js, len,
66                          state->tokens, state->tokens_size);
67     }
68 
69     if (ret == JSMN_ERROR_INVAL) {
70         return FLB_ERR_JSON_INVAL;
71     }
72 
73     if (ret == JSMN_ERROR_PART) {
74         /* This is a partial JSON message, just stop */
75         flb_trace("[json tokenise] incomplete");
76         return FLB_ERR_JSON_PART;
77     }
78 
79     state->tokens_count += ret;
80     return 0;
81 }
82 
/*
 * Check whether the JSON number stored in buf[0..len) has to be decoded as a
 * floating point value.
 *
 * The token is a slice of a larger buffer and is not NUL-terminated, so only
 * the first 'len' bytes are inspected. A number is considered a float when it
 * carries a decimal point, or an exponent marker ('e' / 'E') followed by a
 * sign or a digit, e.g. "1.5", "1e-5", "2E+3" or "1e5".
 *
 * Returns 1 for a float, 0 otherwise.
 */
static inline int is_float(const char *buf, int len)
{
    const char *end = buf + len;
    const char *p;
    char next;

    for (p = buf; p < end; p++) {
        if (*p == '.') {
            return 1;
        }

        /* an exponent needs at least one more byte inside the token */
        if ((*p == 'e' || *p == 'E') && (p + 1) < end) {
            next = *(p + 1);
            if (next == '-' || next == '+' || (next >= '0' && next <= '9')) {
                return 1;
            }
        }
    }

    return 0;
}
100 
101 /* Sanitize incoming JSON string */
pack_string_token(struct flb_pack_state * state,const char * str,int len,msgpack_packer * pck)102 static inline int pack_string_token(struct flb_pack_state *state,
103                                     const char *str, int len,
104                                     msgpack_packer *pck)
105 {
106     int s;
107     int out_len;
108     char *tmp;
109     char *out_buf;
110 
111     if (state->buf_size < len + 1) {
112         s = len + 1;
113         tmp = flb_realloc(state->buf_data, s);
114         if (!tmp) {
115             flb_errno();
116             return -1;
117         }
118         else {
119             state->buf_data = tmp;
120             state->buf_size = s;
121         }
122     }
123     out_buf = state->buf_data;
124 
125     /* Always decode any UTF-8 or special characters */
126     out_len = flb_unescape_string_utf8(str, len, out_buf);
127 
128     /* Pack decoded text */
129     msgpack_pack_str(pck, out_len);
130     msgpack_pack_str_body(pck, out_buf, out_len);
131 
132     return out_len;
133 }
134 
135 /* Receive a tokenized JSON message and convert it to MsgPack */
tokens_to_msgpack(struct flb_pack_state * state,const char * js,int * out_size,int * last_byte,int * out_records)136 static char *tokens_to_msgpack(struct flb_pack_state *state,
137                                const char *js,
138                                int *out_size, int *last_byte,
139                                int *out_records)
140 {
141     int i;
142     int flen;
143     int arr_size;
144     int records = 0;
145     const char *p;
146     char *buf;
147     const jsmntok_t *t;
148     msgpack_packer pck;
149     msgpack_sbuffer sbuf;
150     jsmntok_t *tokens;
151 
152     tokens = state->tokens;
153     arr_size = state->tokens_count;
154 
155     if (arr_size == 0) {
156         return NULL;
157     }
158 
159     /* initialize buffers */
160     msgpack_sbuffer_init(&sbuf);
161     msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
162 
163     for (i = 0; i < arr_size ; i++) {
164         t = &tokens[i];
165 
166         if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
167             break;
168         }
169 
170         if (t->parent == -1) {
171             *last_byte = t->end;
172             records++;
173         }
174 
175         flen = (t->end - t->start);
176         switch (t->type) {
177         case JSMN_OBJECT:
178             msgpack_pack_map(&pck, t->size);
179             break;
180         case JSMN_ARRAY:
181             msgpack_pack_array(&pck, t->size);
182             break;
183         case JSMN_STRING:
184             pack_string_token(state, js + t->start, flen, &pck);
185             break;
186         case JSMN_PRIMITIVE:
187             p = js + t->start;
188             if (*p == 'f') {
189                 msgpack_pack_false(&pck);
190             }
191             else if (*p == 't') {
192                 msgpack_pack_true(&pck);
193             }
194             else if (*p == 'n') {
195                 msgpack_pack_nil(&pck);
196             }
197             else {
198                 if (is_float(p, flen)) {
199                     msgpack_pack_double(&pck, atof(p));
200                 }
201                 else {
202                     msgpack_pack_int64(&pck, atoll(p));
203                 }
204             }
205             break;
206         case JSMN_UNDEFINED:
207             msgpack_sbuffer_destroy(&sbuf);
208             return NULL;
209         }
210     }
211 
212     *out_size = sbuf.size;
213     *out_records = records;
214     buf = sbuf.data;
215 
216     return buf;
217 }
218 
219 /*
220  * It parse a JSON string and convert it to MessagePack format, this packer is
221  * useful when a complete JSON message exists, otherwise it will fail until
222  * the message is complete.
223  *
224  * This routine do not keep a state in the parser, do not use it for big
225  * JSON messages.
226  */
pack_json_to_msgpack(const char * js,size_t len,char ** buffer,size_t * size,int * root_type,int * records)227 static int pack_json_to_msgpack(const char *js, size_t len, char **buffer,
228                                 size_t *size, int *root_type, int *records)
229 {
230     int ret = -1;
231     int n_records;
232     int out;
233     int last;
234     char *buf = NULL;
235     struct flb_pack_state state;
236 
237     ret = flb_pack_state_init(&state);
238     if (ret != 0) {
239         return -1;
240     }
241     ret = flb_json_tokenise(js, len, &state);
242     if (ret != 0) {
243         ret = -1;
244         goto flb_pack_json_end;
245     }
246 
247     if (state.tokens_count == 0) {
248         ret = -1;
249         goto flb_pack_json_end;
250     }
251 
252     buf = tokens_to_msgpack(&state, js, &out, &last, &n_records);
253     if (!buf) {
254         ret = -1;
255         goto flb_pack_json_end;
256     }
257 
258     *root_type = state.tokens[0].type;
259     *size = out;
260     *buffer = buf;
261     *records = n_records;
262     ret = 0;
263 
264  flb_pack_json_end:
265     flb_pack_state_reset(&state);
266     return ret;
267 }
268 
/*
 * Pack serialized JSON message(s) into msgpack. Thin front-end of
 * pack_json_to_msgpack() that discards the number of records.
 */
int flb_pack_json(const char *js, size_t len, char **buffer, size_t *size,
                  int *root_type)
{
    int unused_records;
    int rc;

    rc = pack_json_to_msgpack(js, len, buffer, size, root_type,
                              &unused_records);
    return rc;
}
277 
/*
 * Pack serialized JSON message(s) into msgpack and report through
 * 'out_records' how many messages were converted. Thin front-end of
 * pack_json_to_msgpack().
 */
int flb_pack_json_recs(const char *js, size_t len, char **buffer, size_t *size,
                       int *root_type, int *out_records)
{
    int rc;

    rc = pack_json_to_msgpack(js, len, buffer, size, root_type, out_records);
    return rc;
}
287 
288 /* Initialize a JSON packer state */
flb_pack_state_init(struct flb_pack_state * s)289 int flb_pack_state_init(struct flb_pack_state *s)
290 {
291     int tokens = 256;
292     size_t size = 256;
293 
294     jsmn_init(&s->parser);
295 
296     size = sizeof(jsmntok_t) * tokens;
297     s->tokens = flb_calloc(1, size);
298     if (!s->tokens) {
299         flb_errno();
300         return -1;
301     }
302     s->tokens_size   = tokens;
303     s->tokens_count  = 0;
304     s->last_byte     = 0;
305     s->multiple      = FLB_FALSE;
306 
307     s->buf_data = flb_malloc(size);
308     if (!s->buf_data) {
309         flb_errno();
310         flb_free(s->tokens);
311         return -1;
312     }
313     s->buf_size = size;
314     s->buf_len = 0;
315 
316     return 0;
317 }
318 
flb_pack_state_reset(struct flb_pack_state * s)319 void flb_pack_state_reset(struct flb_pack_state *s)
320 {
321     flb_free(s->tokens);
322     s->tokens_size  = 0;
323     s->tokens_count = 0;
324     s->last_byte    = 0;
325     s->buf_size     = 0;
326     flb_free(s->buf_data);
327 }
328 
329 
330 /*
331  * It parse a JSON string and convert it to MessagePack format. The main
332  * difference of this function and the previous flb_pack_json() is that it
333  * keeps a parser and tokens state, allowing to process big messages and
334  * resume the parsing process instead of start from zero.
335  */
flb_pack_json_state(const char * js,size_t len,char ** buffer,int * size,struct flb_pack_state * state)336 int flb_pack_json_state(const char *js, size_t len,
337                         char **buffer, int *size,
338                         struct flb_pack_state *state)
339 {
340     int ret;
341     int out;
342     int delim = 0;
343     int last =  0;
344     int records;
345     char *buf;
346     jsmntok_t *t;
347 
348     ret = flb_json_tokenise(js, len, state);
349     state->multiple = FLB_TRUE;
350     if (ret == FLB_ERR_JSON_PART && state->multiple == FLB_TRUE) {
351         /*
352          * If the caller enabled 'multiple' flag, it means that the incoming
353          * JSON message may have multiple messages concatenated and likely
354          * the last one is only incomplete.
355          *
356          * The following routine aims to determinate how many JSON messages
357          * are OK in the array of tokens, if any, process them and adjust
358          * the JSMN context/buffers.
359          */
360         int i;
361         int found = 0;
362 
363         for (i = 1; i < state->tokens_size; i++) {
364             t = &state->tokens[i];
365 
366             if (t->start < (state->tokens[i - 1]).start) {
367                 break;
368             }
369 
370             if (t->parent == -1 && (t->end != 0)) {
371                 found++;
372                 delim = i;
373             }
374 
375         }
376 
377         if (found > 0) {
378             state->tokens_count += delim;
379         }
380         else {
381             return ret;
382         }
383     }
384     else if (ret != 0) {
385         return ret;
386     }
387 
388     if (state->tokens_count == 0) {
389         state->last_byte = last;
390         return FLB_ERR_JSON_INVAL;
391     }
392 
393     buf = tokens_to_msgpack(state, js, &out, &last, &records);
394     if (!buf) {
395         return -1;
396     }
397 
398     *size = out;
399     *buffer = buf;
400     state->last_byte = last;
401 
402     return 0;
403 }
404 
/*
 * Print one unpacked object to stdout as a Fluent Bit record, i.e. an array
 * whose first entry is a timestamp (positive integer, float or ext type):
 *
 *     [cnt] [seconds.nanoseconds, <object returned by the time decoder>]
 *
 * Returns 0 when the record was printed and -1 when 'result' does not look
 * like a Fluent Bit record (nothing is printed in that case).
 */
static int pack_print_fluent_record(size_t cnt, msgpack_unpacked result)
{
    msgpack_object o;
    msgpack_object *obj;
    msgpack_object root;
    struct flb_time tms;

    root = result.data;
    if (root.type != MSGPACK_OBJECT_ARRAY) {
        return -1;
    }

    /*
     * A record is a [timestamp, body] pair: entry 0 is read right below and
     * the body is presumably what flb_time_pop_from_msgpack() hands back
     * through 'obj'. Shorter arrays cannot be records and must not be
     * indexed.
     */
    if (root.via.array.size < 2) {
        return -1;
    }

    /* decode expected timestamp only (integer, float or ext) */
    o = root.via.array.ptr[0];
    if (o.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
        o.type != MSGPACK_OBJECT_FLOAT &&
        o.type != MSGPACK_OBJECT_EXT) {
        return -1;
    }

    /* This is a Fluent Bit record, just do the proper unpacking/printing */
    flb_time_pop_from_msgpack(&tms, &result, &obj);

    /* %zu matches size_t; the cast makes the nanoseconds match %lu */
    fprintf(stdout, "[%zu] [%"PRIu32".%09lu, ", cnt,
            (uint32_t) tms.tm.tv_sec, (unsigned long) tms.tm.tv_nsec);
    msgpack_object_print(stdout, *obj);
    fprintf(stdout, "]\n");

    return 0;
}
435 
flb_pack_print(const char * data,size_t bytes)436 void flb_pack_print(const char *data, size_t bytes)
437 {
438     int ret;
439     msgpack_unpacked result;
440     size_t off = 0, cnt = 0;
441 
442     msgpack_unpacked_init(&result);
443     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
444         /* Check if we are processing an internal Fluent Bit record */
445         ret = pack_print_fluent_record(cnt, result);
446         if (ret == 0) {
447             continue;
448         }
449 
450         printf("[%zd] ", cnt++);
451         msgpack_object_print(stdout, result.data);
452         printf("\n");
453     }
454     msgpack_unpacked_destroy(&result);
455 }
456 
457 
try_to_write(char * buf,int * off,size_t left,const char * str,size_t str_len)458 static inline int try_to_write(char *buf, int *off, size_t left,
459                                const char *str, size_t str_len)
460 {
461     if (str_len <= 0){
462         str_len = strlen(str);
463     }
464     if (left <= *off+str_len) {
465         return FLB_FALSE;
466     }
467     memcpy(buf+*off, str, str_len);
468     *off += str_len;
469     return FLB_TRUE;
470 }
471 
472 
473 /*
474  * Check if a key exists in the map using the 'offset' as an index to define
475  * which element needs to start looking from
476  */
key_exists_in_map(msgpack_object key,msgpack_object map,int offset)477 static inline int key_exists_in_map(msgpack_object key, msgpack_object map, int offset)
478 {
479     int i;
480     msgpack_object p;
481 
482     if (key.type != MSGPACK_OBJECT_STR) {
483         return FLB_FALSE;
484     }
485 
486     for (i = offset; i < map.via.map.size; i++) {
487         p = map.via.map.ptr[i].key;
488         if (p.type != MSGPACK_OBJECT_STR) {
489             continue;
490         }
491 
492         if (key.via.str.size != p.via.str.size) {
493             continue;
494         }
495 
496         if (memcmp(key.via.str.ptr, p.via.str.ptr, p.via.str.size) == 0) {
497             return FLB_TRUE;
498         }
499     }
500 
501     return FLB_FALSE;
502 }
503 
msgpack2json(char * buf,int * off,size_t left,const msgpack_object * o)504 static int msgpack2json(char *buf, int *off, size_t left,
505                         const msgpack_object *o)
506 {
507     int i;
508     int dup;
509     int ret = FLB_FALSE;
510     int loop;
511     int packed;
512 
513     switch(o->type) {
514     case MSGPACK_OBJECT_NIL:
515         ret = try_to_write(buf, off, left, "null", 4);
516         break;
517 
518     case MSGPACK_OBJECT_BOOLEAN:
519         ret = try_to_write(buf, off, left,
520                            (o->via.boolean ? "true":"false"),0);
521 
522         break;
523 
524     case MSGPACK_OBJECT_POSITIVE_INTEGER:
525         {
526             char temp[32] = {0};
527             i = snprintf(temp, sizeof(temp)-1, "%"PRIu64, o->via.u64);
528             ret = try_to_write(buf, off, left, temp, i);
529         }
530         break;
531 
532     case MSGPACK_OBJECT_NEGATIVE_INTEGER:
533         {
534             char temp[32] = {0};
535             i = snprintf(temp, sizeof(temp)-1, "%"PRId64, o->via.i64);
536             ret = try_to_write(buf, off, left, temp, i);
537         }
538         break;
539     case MSGPACK_OBJECT_FLOAT32:
540     case MSGPACK_OBJECT_FLOAT64:
541         {
542             char temp[512] = {0};
543             if (o->via.f64 == (double)(long long int)o->via.f64) {
544                 i = snprintf(temp, sizeof(temp)-1, "%.1f", o->via.f64);
545             }
546             else {
547                 i = snprintf(temp, sizeof(temp)-1, "%.16g", o->via.f64);
548             }
549             ret = try_to_write(buf, off, left, temp, i);
550         }
551         break;
552 
553     case MSGPACK_OBJECT_STR:
554         if (try_to_write(buf, off, left, "\"", 1) &&
555             (o->via.str.size > 0 ?
556              try_to_write_str(buf, off, left, o->via.str.ptr, o->via.str.size)
557              : 1/* nothing to do */) &&
558             try_to_write(buf, off, left, "\"", 1)) {
559             ret = FLB_TRUE;
560         }
561         break;
562 
563     case MSGPACK_OBJECT_BIN:
564         if (try_to_write(buf, off, left, "\"", 1) &&
565             (o->via.bin.size > 0 ?
566              try_to_write_str(buf, off, left, o->via.bin.ptr, o->via.bin.size)
567               : 1 /* nothing to do */) &&
568             try_to_write(buf, off, left, "\"", 1)) {
569             ret = FLB_TRUE;
570         }
571         break;
572 
573     case MSGPACK_OBJECT_EXT:
574         if (!try_to_write(buf, off, left, "\"", 1)) {
575             goto msg2json_end;
576         }
577         /* ext body. fortmat is similar to printf(1) */
578         {
579             char temp[32] = {0};
580             int  len;
581             loop = o->via.ext.size;
582             for(i=0; i<loop; i++) {
583                 len = snprintf(temp, sizeof(temp)-1, "\\x%02x", (char)o->via.ext.ptr[i]);
584                 if (!try_to_write(buf, off, left, temp, len)) {
585                     goto msg2json_end;
586                 }
587             }
588         }
589         if (!try_to_write(buf, off, left, "\"", 1)) {
590             goto msg2json_end;
591         }
592         ret = FLB_TRUE;
593         break;
594 
595     case MSGPACK_OBJECT_ARRAY:
596         loop = o->via.array.size;
597 
598         if (!try_to_write(buf, off, left, "[", 1)) {
599             goto msg2json_end;
600         }
601         if (loop != 0) {
602             msgpack_object* p = o->via.array.ptr;
603             if (!msgpack2json(buf, off, left, p)) {
604                 goto msg2json_end;
605             }
606             for (i=1; i<loop; i++) {
607                 if (!try_to_write(buf, off, left, ",", 1) ||
608                     !msgpack2json(buf, off, left, p+i)) {
609                     goto msg2json_end;
610                 }
611             }
612         }
613 
614         ret = try_to_write(buf, off, left, "]", 1);
615         break;
616 
617     case MSGPACK_OBJECT_MAP:
618         loop = o->via.map.size;
619         if (!try_to_write(buf, off, left, "{", 1)) {
620             goto msg2json_end;
621         }
622         if (loop != 0) {
623             msgpack_object k;
624             msgpack_object_kv *p = o->via.map.ptr;
625 
626             packed = 0;
627             dup = FLB_FALSE;
628 
629             k = o->via.map.ptr[0].key;
630             for (i = 0; i < loop; i++) {
631                 k = o->via.map.ptr[i].key;
632                 dup = key_exists_in_map(k, *o, i + 1);
633                 if (dup == FLB_TRUE) {
634                     continue;
635                 }
636 
637                 if (packed > 0) {
638                     if (!try_to_write(buf, off, left, ",", 1)) {
639                         goto msg2json_end;
640                     }
641                 }
642 
643                 if (
644                     !msgpack2json(buf, off, left, &(p+i)->key) ||
645                     !try_to_write(buf, off, left, ":", 1)  ||
646                     !msgpack2json(buf, off, left, &(p+i)->val) ) {
647                     goto msg2json_end;
648                 }
649                 packed++;
650             }
651         }
652 
653         ret = try_to_write(buf, off, left, "}", 1);
654         break;
655 
656     default:
657         flb_warn("[%s] unknown msgpack type %i", __FUNCTION__, o->type);
658     }
659 
660  msg2json_end:
661     return ret;
662 }
663 
664 /**
665  *  convert msgpack to JSON string.
666  *  This API is similar to snprintf.
667  *
668  *  @param  json_str  The buffer to fill JSON string.
669  *  @param  json_size The size of json_str.
670  *  @param  data      The msgpack_unpacked data.
671  *  @return success   ? a number characters filled : negative value
672  */
flb_msgpack_to_json(char * json_str,size_t json_size,const msgpack_object * obj)673 int flb_msgpack_to_json(char *json_str, size_t json_size,
674                         const msgpack_object *obj)
675 {
676     int ret = -1;
677     int off = 0;
678 
679     if (json_str == NULL || obj == NULL) {
680         return -1;
681     }
682 
683     ret = msgpack2json(json_str, &off, json_size - 1, obj);
684     json_str[off] = '\0';
685     return ret ? off: ret;
686 }
687 
flb_msgpack_raw_to_json_sds(const void * in_buf,size_t in_size)688 flb_sds_t flb_msgpack_raw_to_json_sds(const void *in_buf, size_t in_size)
689 {
690     int ret;
691     size_t off = 0;
692     size_t out_size;
693     msgpack_unpacked result;
694     msgpack_object *root;
695     flb_sds_t out_buf;
696     flb_sds_t tmp_buf;
697 
698     out_size = in_size * 3 / 2;
699     out_buf = flb_sds_create_size(out_size);
700     if (!out_buf) {
701         flb_errno();
702         return NULL;
703     }
704 
705     msgpack_unpacked_init(&result);
706     ret = msgpack_unpack_next(&result, in_buf, in_size, &off);
707     if (ret != MSGPACK_UNPACK_SUCCESS) {
708         flb_sds_destroy(out_buf);
709         msgpack_unpacked_destroy(&result);
710         return NULL;
711     }
712 
713     root = &result.data;
714     while (1) {
715         ret = flb_msgpack_to_json(out_buf, out_size, root);
716         if (ret <= 0) {
717             tmp_buf = flb_sds_increase(out_buf, 256);
718             if (tmp_buf) {
719                 out_buf = tmp_buf;
720                 out_size += 256;
721             }
722             else {
723                 flb_errno();
724                 flb_sds_destroy(out_buf);
725                 msgpack_unpacked_destroy(&result);
726                 return NULL;
727             }
728         }
729         else {
730             break;
731         }
732     }
733 
734     msgpack_unpacked_destroy(&result);
735     flb_sds_len_set(out_buf, ret);
736 
737     return out_buf;
738 }
739 
740 /*
741  * Given a 'format' string type, return it integer representation. This
742  * is used by output plugins that uses pack functions to convert
743  * msgpack records to JSON.
744  */
flb_pack_to_json_format_type(const char * str)745 int flb_pack_to_json_format_type(const char *str)
746 {
747     if (strcasecmp(str, "msgpack") == 0) {
748         return FLB_PACK_JSON_FORMAT_NONE;
749     }
750     else if (strcasecmp(str, "json") == 0) {
751         return FLB_PACK_JSON_FORMAT_JSON;
752     }
753     else if (strcasecmp(str, "json_stream") == 0) {
754         return FLB_PACK_JSON_FORMAT_STREAM;
755     }
756     else if (strcasecmp(str, "json_lines") == 0) {
757         return FLB_PACK_JSON_FORMAT_LINES;
758     }
759 
760     return -1;
761 }
762 
763 /* Given a 'date string type', return it integer representation */
flb_pack_to_json_date_type(const char * str)764 int flb_pack_to_json_date_type(const char *str)
765 {
766     if (strcasecmp(str, "double") == 0) {
767         return FLB_PACK_JSON_DATE_DOUBLE;
768     }
769     else if (strcasecmp(str, "iso8601") == 0) {
770         return FLB_PACK_JSON_DATE_ISO8601;
771     }
772     else if (strcasecmp(str, "epoch") == 0) {
773         return FLB_PACK_JSON_DATE_EPOCH;
774     }
775 
776     return -1;
777 }
778 
779 
/*
 * Convert a buffer of msgpack records into JSON text held in a new SDS
 * string.
 *
 * Every top-level object of 'data' is expected to be a two-entry array
 * [timestamp, map]; objects with another shape are skipped. Each map is
 * re-packed and, when 'date_key' is not NULL, gets an extra leading entry
 * keyed by 'date_key' whose value depends on 'date_format':
 *
 *   FLB_PACK_JSON_DATE_DOUBLE   timestamp as a double
 *   FLB_PACK_JSON_DATE_ISO8601  formatted string, microsecond precision
 *   FLB_PACK_JSON_DATE_EPOCH    seconds as an unsigned integer
 *
 * The layout of the output is selected by 'json_format':
 *
 *   FLB_PACK_JSON_FORMAT_JSON    one JSON array holding all the maps
 *   FLB_PACK_JSON_FORMAT_LINES   one JSON map per line ('\n' after each)
 *   FLB_PACK_JSON_FORMAT_STREAM  JSON maps concatenated with no separator
 *
 * Returns the SDS string, or NULL when there are no records, on allocation
 * or conversion failure, when the result is empty, or when 'json_format' is
 * none of the three values above (no output buffer is ever created then).
 */
flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes,
                                          int json_format, int date_format,
                                          flb_sds_t date_key)
{
    int i;
    int len;
    int ok = MSGPACK_UNPACK_SUCCESS;
    int records = 0;
    int map_size;
    size_t off = 0;
    char time_formatted[32];
    size_t s;
    flb_sds_t out_tmp;
    flb_sds_t out_js;
    flb_sds_t out_buf = NULL;
    msgpack_unpacked result;
    msgpack_object root;
    msgpack_object map;
    msgpack_sbuffer tmp_sbuf;
    msgpack_packer tmp_pck;
    msgpack_object *obj;
    msgpack_object *k;
    msgpack_object *v;
    struct tm tm;
    struct flb_time tms;

    /* Count the records first: the JSON array header needs the total */
    records = flb_mp_count(data, bytes);
    if (records <= 0) {
        return NULL;
    }

    /* For json lines and streams mode we need a pre-allocated buffer */
    if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
        json_format == FLB_PACK_JSON_FORMAT_STREAM) {
        out_buf = flb_sds_create_size(bytes + bytes / 4);
        if (!out_buf) {
            flb_errno();
            return NULL;
        }
    }

    /* Create temporary msgpack buffer */
    msgpack_sbuffer_init(&tmp_sbuf);
    msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

    /*
     * If the format is the original msgpack style of one big array,
     * register the array header, otherwise it is not necessary. FYI,
     * original format:
     *
     * [
     *   [timestamp, map],
     *   [timestamp, map],
     *   [T, M]...
     * ]
     *
     * NOTE(review): the header announces 'records' entries, but the loop
     * below skips objects that are not [timestamp, map]; if flb_mp_count()
     * counts those too, the array ends up shorter than announced - confirm.
     */
    if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
        msgpack_pack_array(&tmp_pck, records);
    }

    msgpack_unpacked_init(&result);
    while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
        /* Each array must have two entries: time and record */
        root = result.data;
        if (root.type != MSGPACK_OBJECT_ARRAY) {
            continue;
        }
        if (root.via.array.size != 2) {
            continue;
        }

        /* Unpack time */
        flb_time_pop_from_msgpack(&tms, &result, &obj);

        /* Get the record/map */
        map = root.via.array.ptr[1];
        if (map.type != MSGPACK_OBJECT_MAP) {
            continue;
        }
        map_size = map.via.map.size;

        /* One extra slot in the new map when the date key is injected */
        if (date_key != NULL) {
            msgpack_pack_map(&tmp_pck, map_size + 1);
        }
        else {
            msgpack_pack_map(&tmp_pck, map_size);
        }

        if (date_key != NULL) {
            /* Append date key */
            msgpack_pack_str(&tmp_pck, flb_sds_len(date_key));
            msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key));

            /*
             * Append date value.
             *
             * NOTE(review): there is no default case, so an unknown
             * 'date_format' leaves the date key packed without a value.
             */
            switch (date_format) {
            case FLB_PACK_JSON_DATE_DOUBLE:
                msgpack_pack_double(&tmp_pck, flb_time_to_double(&tms));
                break;
            case FLB_PACK_JSON_DATE_ISO8601:
                /* Format the time, use microsecond precision not nanoseconds */
                gmtime_r(&tms.tm.tv_sec, &tm);
                s = strftime(time_formatted, sizeof(time_formatted) - 1,
                             FLB_PACK_JSON_DATE_ISO8601_FMT, &tm);

                /* fractional part and 'Z' go right after the strftime() text */
                len = snprintf(time_formatted + s,
                               sizeof(time_formatted) - 1 - s,
                               ".%06" PRIu64 "Z",
                               (uint64_t) tms.tm.tv_nsec / 1000);
                s += len;
                msgpack_pack_str(&tmp_pck, s);
                msgpack_pack_str_body(&tmp_pck, time_formatted, s);
                break;
            case FLB_PACK_JSON_DATE_EPOCH:
                msgpack_pack_uint64(&tmp_pck, (long long unsigned)(tms.tm.tv_sec));
                break;
            }
        }

        /* Append remaining keys/values */
        for (i = 0; i < map_size; i++) {
            k = &map.via.map.ptr[i].key;
            v = &map.via.map.ptr[i].val;
            msgpack_pack_object(&tmp_pck, *k);
            msgpack_pack_object(&tmp_pck, *v);
        }

        /*
         * If the format is the original msgpack style, just continue since
         * we don't care about separator or JSON conversion at this point:
         * the whole array is converted once, after the loop.
         */
        if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
            continue;
        }

        /*
         * Here we handle two types of records concatenation:
         *
         * FLB_PACK_JSON_FORMAT_LINES: add a line break (\n) after each record
         *
         *
         *     {'ts':abc,'k1':1}
         *     {'ts':abc,'k1':2}
         *     {N}
         *
         * FLB_PACK_JSON_FORMAT_STREAM: no separators, e.g:
         *
         *     {'ts':abc,'k1':1}{'ts':abc,'k1':2}{N}
         */
        if (json_format == FLB_PACK_JSON_FORMAT_LINES ||
            json_format == FLB_PACK_JSON_FORMAT_STREAM) {

            /* Encode current record into JSON in a temporary variable */
            out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
            if (!out_js) {
                flb_sds_destroy(out_buf);
                msgpack_sbuffer_destroy(&tmp_sbuf);
                msgpack_unpacked_destroy(&result);
                return NULL;
            }

            /*
             * One map record has been converted, now append it to the
             * outgoing out_buf sds variable.
             */
            out_tmp = flb_sds_cat(out_buf, out_js, flb_sds_len(out_js));
            if (!out_tmp) {
                flb_sds_destroy(out_js);
                flb_sds_destroy(out_buf);
                msgpack_sbuffer_destroy(&tmp_sbuf);
                msgpack_unpacked_destroy(&result);
                return NULL;
            }

            /* Release temporary json sds buffer */
            flb_sds_destroy(out_js);

            /* If a realloc happened, check the returned address */
            if (out_tmp != out_buf) {
                out_buf = out_tmp;
            }

            /* Append the line break only for json lines mode */
            if (json_format == FLB_PACK_JSON_FORMAT_LINES) {
                out_tmp = flb_sds_cat(out_buf, "\n", 1);
                if (!out_tmp) {
                    flb_sds_destroy(out_buf);
                    msgpack_sbuffer_destroy(&tmp_sbuf);
                    msgpack_unpacked_destroy(&result);
                    return NULL;
                }
                if (out_tmp != out_buf) {
                    out_buf = out_tmp;
                }
            }

            /* Reuse the temporary msgpack buffer for the next record */
            msgpack_sbuffer_clear(&tmp_sbuf);
        }
    }

    /* Release the unpacker */
    msgpack_unpacked_destroy(&result);

    /* Format to JSON */
    if (json_format == FLB_PACK_JSON_FORMAT_JSON) {
        out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size);
        msgpack_sbuffer_destroy(&tmp_sbuf);

        if (!out_buf) {
            return NULL;
        }
    }
    else {
        msgpack_sbuffer_destroy(&tmp_sbuf);
    }

    /* An empty result is reported as a failure */
    if (out_buf && flb_sds_len(out_buf) == 0) {
        flb_sds_destroy(out_buf);
        return NULL;
    }

    return out_buf;
}
1001 
1002 /**
1003  *  convert msgpack to JSON string.
1004  *  This API is similar to snprintf.
1005  *  @param  size     Estimated length of json str.
1006  *  @param  data     The msgpack_unpacked data.
1007  *  @return success  ? allocated json str ptr : NULL
1008  */
flb_msgpack_to_json_str(size_t size,const msgpack_object * obj)1009 char *flb_msgpack_to_json_str(size_t size, const msgpack_object *obj)
1010 {
1011     int ret;
1012     char *buf = NULL;
1013     char *tmp;
1014 
1015     if (obj == NULL) {
1016         return NULL;
1017     }
1018 
1019     if (size <= 0) {
1020         size = 128;
1021     }
1022 
1023     buf = flb_malloc(size);
1024     if (!buf) {
1025         flb_errno();
1026         return NULL;
1027     }
1028 
1029     while (1) {
1030         ret = flb_msgpack_to_json(buf, size, obj);
1031         if (ret <= 0) {
1032             /* buffer is small. retry.*/
1033             size += 128;
1034             tmp = flb_realloc(buf, size);
1035             if (tmp) {
1036                 buf = tmp;
1037             }
1038             else {
1039                 flb_free(buf);
1040                 flb_errno();
1041                 return NULL;
1042             }
1043         }
1044         else {
1045             break;
1046         }
1047     }
1048 
1049     return buf;
1050 }
1051 
/*
 * Read the current time with flb_time_get() and append it to the given
 * msgpack packer through flb_time_append_to_msgpack(), passing 0 as its
 * last argument.
 *
 * Returns the value reported by flb_time_append_to_msgpack().
 */
int flb_pack_time_now(msgpack_packer *pck)
{
    struct flb_time now;

    flb_time_get(&now);

    return flb_time_append_to_msgpack(&now, pck, 0);
}
1062 
/*
 * Build a new serialized msgpack map made of every key/value pair found in
 * an existing serialized map followed by the extra pairs given in 'kv_arr'.
 *
 * Parameters:
 *   map_data    serialized msgpack data; its first object must be a map.
 *   map_size    size in bytes of 'map_data'.
 *   kv_arr      array of 'kv_arr_len' pointers to the key/value pairs to
 *               append. May be NULL only when 'kv_arr_len' is zero; every
 *               entry is dereferenced, so none of them may be NULL.
 *   kv_arr_len  number of entries in 'kv_arr', must not be negative.
 *   out_buf     receives a buffer allocated with flb_malloc() that holds
 *               the new map; the caller owns it.
 *   out_size    receives the size in bytes of '*out_buf'.
 *
 * Returns 0 on success. Returns -1 on invalid arguments, if 'map_data'
 * cannot be unpacked, if its first object is not a map, or if the output
 * buffer cannot be allocated. '*out_size' is only written on success;
 * '*out_buf' is set to NULL when the output allocation fails and is left
 * untouched by every other failure.
 */
int flb_msgpack_expand_map(char *map_data, size_t map_size,
                           msgpack_object_kv **kv_arr, int kv_arr_len,
                           char **out_buf, int *out_size)
{
    msgpack_sbuffer sbuf;
    msgpack_packer  pck;
    msgpack_unpacked result;
    size_t off = 0;
    char *ret_buf;
    int map_num;
    int ret;
    int i;
    int len;

    if (map_data == NULL || out_buf == NULL || out_size == NULL) {
        return -1;
    }

    /*
     * A negative count would make the map header announce fewer entries
     * than the number of pairs packed below (malformed output), and a
     * NULL array with a positive count would be dereferenced.
     */
    if (kv_arr_len < 0 || (kv_arr_len > 0 && kv_arr == NULL)) {
        return -1;
    }

    msgpack_unpacked_init(&result);
    ret = msgpack_unpack_next(&result, map_data, map_size, &off);
    if (ret != MSGPACK_UNPACK_SUCCESS) {
        msgpack_unpacked_destroy(&result);
        return -1;
    }
    if (result.data.type != MSGPACK_OBJECT_MAP) {
        msgpack_unpacked_destroy(&result);
        return -1;
    }

    len = result.data.via.map.size;
    map_num = kv_arr_len + len;

    msgpack_sbuffer_init(&sbuf);
    msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
    msgpack_pack_map(&pck, map_num);

    /* Original entries first, keeping their order */
    for (i = 0; i < len; i++) {
        msgpack_pack_object(&pck, result.data.via.map.ptr[i].key);
        msgpack_pack_object(&pck, result.data.via.map.ptr[i].val);
    }

    /* Then the extra entries supplied by the caller */
    for (i = 0; i < kv_arr_len; i++) {
        msgpack_pack_object(&pck, kv_arr[i]->key);
        msgpack_pack_object(&pck, kv_arr[i]->val);
    }
    msgpack_unpacked_destroy(&result);

    /* Hand the caller a private copy so the sbuffer can be released here */
    ret_buf = flb_malloc(sbuf.size);
    if (ret_buf == NULL) {
        flb_errno();
        msgpack_sbuffer_destroy(&sbuf);
        *out_buf = NULL;
        return -1;
    }
    memcpy(ret_buf, sbuf.data, sbuf.size);

    *out_buf  = ret_buf;
    *out_size = sbuf.size;
    msgpack_sbuffer_destroy(&sbuf);

    return 0;
}
1121