1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_mem.h>
23 #include <fluent-bit/flb_str.h>
24 #include <fluent-bit/flb_log.h>
25 #include <fluent-bit/flb_pack.h>
26 #include <fluent-bit/flb_parser_decoder.h>
27 #include <fluent-bit/flb_unescape.h>
28 #include <fluent-bit/flb_utils.h>
29 #include <msgpack.h>
30 
31 #define TYPE_OUT_STRING  0  /* unstructured text         */
32 #define TYPE_OUT_OBJECT  1  /* structured msgpack object */
33 
34 /* Decode a stringified JSON message */
decode_json(struct flb_parser_dec * dec,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)35 static int decode_json(struct flb_parser_dec *dec,
36                        const char *in_buf, size_t in_size,
37                        char **out_buf, size_t *out_size, int *out_type)
38 {
39     int ret;
40     int root_type;
41     int records;
42     char *buf;
43     const char *p;
44     size_t size;
45     size_t len;
46 
47     p = in_buf;
48     while (*p == ' ') p++;
49 
50     len = in_size - (p - in_buf);
51 
52     /* It must be a map or array */
53     if (p[0] != '{' && p[0] != '[') {
54         return -1;
55     }
56 
57     ret = flb_pack_json_recs(p, len, &buf, &size, &root_type, &records);
58     if (ret != 0) {
59         return -1;
60     }
61 
62     /* We expect to decode only one JSON element */
63     if (records != 1) {
64         flb_free(buf);
65         return -1;
66     }
67 
68     /* Only process a packed JSON object */
69     if (root_type != FLB_PACK_JSON_OBJECT) {
70         flb_free(buf);
71         return -1;
72     }
73 
74     *out_buf = buf;
75     *out_size = size;
76     *out_type = TYPE_OUT_OBJECT;
77 
78     return 0;
79 }
80 
decode_escaped(struct flb_parser_dec * dec,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)81 static int decode_escaped(struct flb_parser_dec *dec,
82                           const char *in_buf, size_t in_size,
83                           char **out_buf, size_t *out_size, int *out_type)
84 {
85     int len;
86 
87     /* Unescape string */
88     len = flb_unescape_string(in_buf, in_size, &dec->buffer);
89     *out_buf = dec->buffer;
90     *out_size = len;
91     *out_type = TYPE_OUT_STRING;
92 
93     return 0;
94 }
95 
decode_escaped_utf8(struct flb_parser_dec * dec,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)96 static int decode_escaped_utf8(struct flb_parser_dec *dec,
97                                const char *in_buf, size_t in_size,
98                                char **out_buf, size_t *out_size, int *out_type)
99 {
100     int len;
101 
102     len = flb_unescape_string_utf8(in_buf, in_size, dec->buffer);
103     *out_buf = dec->buffer;
104     *out_size = len;
105     *out_type = TYPE_OUT_STRING;
106 
107     return 0;
108 }
109 
decode_mysql_quoted(struct flb_parser_dec * dec,char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)110 static int decode_mysql_quoted(struct flb_parser_dec *dec,
111                                char *in_buf, size_t in_size,
112                                char **out_buf, size_t *out_size, int *out_type)
113 {
114     int len;
115     if(in_size < 2) {
116         dec->buffer[0] = in_buf[0];
117         dec->buffer[1] = 0;
118         *out_buf = dec->buffer;
119         *out_size = in_size;
120         *out_type = TYPE_OUT_STRING;
121     }
122     else if(in_buf[0] == '\'' && in_buf[in_size-1] == '\'') {
123         len = flb_mysql_unquote_string(in_buf+1, in_size-2, &dec->buffer);
124         *out_buf = dec->buffer;
125         *out_size = len;
126         *out_type = TYPE_OUT_STRING;
127     }
128     else if(in_buf[0] == '\"' && in_buf[in_size-1] == '\"') {
129         len = flb_mysql_unquote_string(in_buf+1, in_size-2, &dec->buffer);
130         *out_buf = dec->buffer;
131         *out_size = len;
132         *out_type = TYPE_OUT_STRING;
133     }
134     else {
135         memcpy(dec->buffer, in_buf, in_size);
136         dec->buffer[in_size] = 0;
137         *out_buf = dec->buffer;
138         *out_size = in_size;
139         *out_type = TYPE_OUT_STRING;
140     }
141 
142     return 0;
143 }
144 
merge_record_and_extra_keys(const char * in_buf,size_t in_size,const char * extra_buf,size_t extra_size,char ** out_buf,size_t * out_size)145 static int merge_record_and_extra_keys(const char *in_buf, size_t in_size,
146                                        const char *extra_buf, size_t extra_size,
147                                        char **out_buf, size_t *out_size)
148 {
149     int i;
150     int ret;
151     int map_size = 0;
152     size_t in_off = 0;
153     size_t extra_off = 0;
154     msgpack_sbuffer mp_sbuf;
155     msgpack_packer  mp_pck;
156     msgpack_unpacked in_result;
157     msgpack_unpacked extra_result;
158     msgpack_object k;
159     msgpack_object v;
160     msgpack_object map;
161 
162     msgpack_unpacked_init(&in_result);
163     msgpack_unpacked_init(&extra_result);
164 
165     /* Check if the extra buffer have some serialized data */
166     ret = msgpack_unpack_next(&extra_result, extra_buf, extra_size, &extra_off);
167     if (ret != MSGPACK_UNPACK_SUCCESS) {
168         msgpack_unpacked_destroy(&in_result);
169         msgpack_unpacked_destroy(&extra_result);
170         return -1;
171     }
172     msgpack_unpack_next(&in_result, in_buf, in_size, &in_off);
173 
174 
175     msgpack_sbuffer_init(&mp_sbuf);
176     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
177 
178     map_size = in_result.data.via.map.size;
179     map_size += extra_result.data.via.map.size;
180 
181     msgpack_pack_map(&mp_pck, map_size);
182     map = in_result.data;
183     for (i = 0; i < map.via.map.size; i++) {
184         k = map.via.map.ptr[i].key;
185         v = map.via.map.ptr[i].val;
186         msgpack_pack_object(&mp_pck, k);
187         msgpack_pack_object(&mp_pck, v);
188     }
189 
190     map = extra_result.data;
191     for (i = 0; i < map.via.map.size; i++) {
192         k = map.via.map.ptr[i].key;
193         v = map.via.map.ptr[i].val;
194         msgpack_pack_object(&mp_pck, k);
195         msgpack_pack_object(&mp_pck, v);
196     }
197 
198     msgpack_unpacked_destroy(&in_result);
199     msgpack_unpacked_destroy(&extra_result);
200 
201     *out_buf = mp_sbuf.data;
202     *out_size = mp_sbuf.size;
203 
204     return 0;
205 }
206 
207 /*
208  * Given a msgpack map, apply the parser-decoder rules defined and generate
209  * a new msgpack buffer.
210  */
flb_parser_decoder_do(struct mk_list * decoders,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size)211 int flb_parser_decoder_do(struct mk_list *decoders,
212                           const char *in_buf, size_t in_size,
213                           char **out_buf, size_t *out_size)
214 {
215     int i;
216     int ret;
217     int matched;
218     int is_decoded;
219     int is_decoded_as;
220     int in_type;
221     int out_type;
222     int dec_type;
223     int extra_keys = FLB_FALSE;
224     size_t off = 0;
225     char *dec_buf;
226     size_t dec_size;
227     flb_sds_t tmp_sds = NULL;
228     flb_sds_t data_sds = NULL;
229     flb_sds_t in_sds = NULL;
230     flb_sds_t out_sds = NULL;
231     struct mk_list *head;
232     struct mk_list *r_head;
233     struct flb_parser_dec *dec = NULL;
234     struct flb_parser_dec_rule *rule;
235     msgpack_object k;
236     msgpack_object v;
237     msgpack_object map;
238     msgpack_unpacked result;
239     msgpack_sbuffer mp_sbuf;
240     msgpack_packer  mp_pck;
241     /* Contexts to handle extra keys to be appended at the end of the log */
242     msgpack_sbuffer extra_mp_sbuf;
243     msgpack_packer  extra_mp_pck;
244 
245     /* Initialize unpacker */
246     msgpack_unpacked_init(&result);
247     msgpack_unpack_next(&result, in_buf, in_size, &off);
248     map = result.data;
249 
250     if (map.type != MSGPACK_OBJECT_MAP) {
251         msgpack_unpacked_destroy(&result);
252         return -1;
253     }
254 
255     /*
256      * First check if any field in the record matches a decoder rule. It's
257      * better to check this before hand otherwise we need to jump directly
258      * to create a "possible new outgoing buffer".
259      */
260     matched = -1;
261     for (i = 0; i < map.via.map.size; i++) {
262         k = map.via.map.ptr[i].key;
263         if (k.type != MSGPACK_OBJECT_STR) {
264             continue;
265         }
266 
267         /* Try to match this key name with decoder's rule */
268         mk_list_foreach(head, decoders) {
269             dec = mk_list_entry(head, struct flb_parser_dec, _head);
270             if (flb_sds_cmp(dec->key, k.via.str.ptr,
271                             k.via.str.size) == 0) {
272                 /* we have a match, stop the check */
273                 matched = i;
274                 break;
275             }
276             else {
277                 matched = -1;
278             }
279         }
280 
281         if (matched >= 0) {
282             break;
283         }
284     }
285 
286     /* No matches, no need to continue */
287     if (matched == -1) {
288         msgpack_unpacked_destroy(&result);
289         return -1;
290     }
291 
292     /* Create new outgoing buffer */
293     msgpack_sbuffer_init(&mp_sbuf);
294     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
295 
296     /* Register the map (same size) */
297     msgpack_pack_map(&mp_pck, map.via.map.size);
298 
299     /* Compose new outgoing buffer */
300     for (i = 0; i < map.via.map.size; i++) {
301         k = map.via.map.ptr[i].key;
302         v = map.via.map.ptr[i].val;
303 
304         /* Pack right away previous fields in the map */
305         if (i < matched) {
306             msgpack_pack_object(&mp_pck, k);
307             msgpack_pack_object(&mp_pck, v);
308             continue;
309         }
310 
311         /* Process current key names and decoder rules */
312         if (k.type != MSGPACK_OBJECT_STR || v.type != MSGPACK_OBJECT_STR) {
313             msgpack_pack_object(&mp_pck, k);
314             msgpack_pack_object(&mp_pck, v);
315             continue;
316         }
317 
318         /*
319          * Per key, we allow only one successful 'Decode_Field' and one
320          * successful 'Decode_Field_As' rules. Otherwise it may lead
321          * to duplicated entries in the final map.
322          *
323          * is_decoded    => Decode_Field successul ?
324          * is_decoded_as => Decode_Field_As successful ?
325          */
326         is_decoded = FLB_FALSE;
327         is_decoded_as = FLB_FALSE;
328 
329         /* Lookup for decoders associated to the current 'key' */
330         mk_list_foreach(head, decoders) {
331             dec = mk_list_entry(head, struct flb_parser_dec, _head);
332             if (flb_sds_cmp(dec->key, k.via.str.ptr,
333                             k.via.str.size) == 0) {
334                 break;
335             }
336             dec = NULL;
337         }
338 
339         /* No decoder found, pack content */
340         if (!dec) {
341             msgpack_pack_object(&mp_pck, k);
342             msgpack_pack_object(&mp_pck, v);
343             continue;
344         }
345 
346         if (!in_sds) {
347             in_sds  = flb_sds_create_size(v.via.str.size);
348             if (!in_sds) {
349                 break;
350             }
351             out_sds = flb_sds_create_size(v.via.str.size);
352             if (!out_sds) {
353                 break;
354             }
355             data_sds = flb_sds_create_size(v.via.str.size);
356         }
357 
358         /* Copy original content */
359         tmp_sds = flb_sds_copy(data_sds, v.via.str.ptr,
360                                v.via.str.size);
361         if (tmp_sds != data_sds) {
362             data_sds = tmp_sds;
363         }
364 
365         /*
366          * We got a match: 'key name' == 'decoder field name', validate
367          * that we have enough space in our temporary buffer.
368          */
369         if (flb_sds_alloc(dec->buffer) < flb_sds_alloc(data_sds)) {
370             /* Increase buffer size */
371             size_t diff;
372             diff = (flb_sds_alloc(data_sds) - flb_sds_alloc(dec->buffer));
373             tmp_sds = flb_sds_increase(dec->buffer, diff);
374             if (!tmp_sds) {
375                 flb_errno();
376                 break;
377             }
378             if (tmp_sds != dec->buffer) {
379                 dec->buffer = tmp_sds;
380             }
381         }
382 
383         /* Process decoder rules */
384         ret = -1;
385         dec_buf = NULL;
386 
387         /*
388          * If some rule type is FLB_PARSER_DEC_DEFAULT, means that it will
389          * try to register some extra fields as part of the record. For such
390          * case we prepare a temporary buffer to hold these extra keys.
391          *
392          * The content of this buffer is just a serialized number of maps.
393          */
394         if (dec->add_extra_keys == FLB_TRUE) {
395             /* We need to clean up already allocated extra buffers */
396             if (extra_keys == FLB_TRUE) {
397                 msgpack_sbuffer_destroy(&extra_mp_sbuf);
398             }
399             extra_keys = FLB_TRUE;
400             msgpack_sbuffer_init(&extra_mp_sbuf);
401             msgpack_packer_init(&extra_mp_pck, &extra_mp_sbuf,
402                                 msgpack_sbuffer_write);
403         }
404 
405         mk_list_foreach(r_head, &dec->rules) {
406             rule = mk_list_entry(r_head, struct flb_parser_dec_rule, _head);
407 
408             if (rule->type == FLB_PARSER_DEC_DEFAULT &&
409                 rule->action == FLB_PARSER_ACT_DO_NEXT &&
410                 is_decoded == FLB_TRUE) {
411                 continue;
412             }
413 
414             if (is_decoded_as == FLB_TRUE && in_type != TYPE_OUT_STRING) {
415                 continue;
416             }
417 
418             /* Process using defined decoder backend */
419             if (rule->backend == FLB_PARSER_DEC_JSON) {
420                 ret = decode_json(dec, (char *) data_sds, flb_sds_len(data_sds),
421                                   &dec_buf, &dec_size, &dec_type);
422             }
423             else if (rule->backend == FLB_PARSER_DEC_ESCAPED) {
424                 ret = decode_escaped(dec,
425                                      (char *) data_sds, flb_sds_len(data_sds),
426                                      &dec_buf, &dec_size, &dec_type);
427             }
428             else if (rule->backend == FLB_PARSER_DEC_ESCAPED_UTF8) {
429                 ret = decode_escaped_utf8(dec,
430                                      (char *) data_sds, flb_sds_len(data_sds),
431                                      &dec_buf, &dec_size, &dec_type);
432             }
433             else if (rule->backend == FLB_PARSER_DEC_MYSQL_QUOTED) {
434                 ret = decode_mysql_quoted(dec,
435                                           (char *) data_sds, flb_sds_len(data_sds),
436                                           &dec_buf, &dec_size, &dec_type);
437             }
438 
439             /* Check decoder status */
440             if (ret == -1) {
441                 /* Current decoder failed, should we try the next one ? */
442                 if (rule->action == FLB_PARSER_ACT_TRY_NEXT ||
443                     rule->action == FLB_PARSER_ACT_DO_NEXT) {
444                     continue;
445                 }
446 
447                 /* Stop: no more rules should be applied */
448                 break;
449             }
450 
451             /* Internal packing: replace value content in the same key */
452             if (rule->type == FLB_PARSER_DEC_AS) {
453                 tmp_sds = flb_sds_copy(in_sds, dec_buf, dec_size);
454                 if (tmp_sds != in_sds) {
455                     in_sds = tmp_sds;
456                 }
457                 tmp_sds = flb_sds_copy(data_sds, dec_buf, dec_size);
458                 if (tmp_sds != data_sds) {
459                     data_sds = tmp_sds;
460                 }
461                 in_type = dec_type;
462                 is_decoded_as = FLB_TRUE;
463             }
464             else if (rule->type == FLB_PARSER_DEC_DEFAULT) {
465                 tmp_sds = flb_sds_copy(out_sds, dec_buf, dec_size);
466                 if (tmp_sds != out_sds) {
467                     out_sds = tmp_sds;
468                 }
469                 out_type = dec_type;
470                 is_decoded = FLB_TRUE;
471             }
472 
473 
474             if (dec_buf != dec->buffer) {
475                 flb_free(dec_buf);
476             }
477             dec_buf = NULL;
478             dec_size = 0;
479 
480             /* Apply more rules ? */
481             if (rule->action == FLB_PARSER_ACT_DO_NEXT) {
482                 continue;
483             }
484             break;
485         }
486 
487         /* Package the key */
488         msgpack_pack_object(&mp_pck, k);
489 
490         /* We need to place some value for the key in question */
491         if (is_decoded_as == FLB_TRUE) {
492             if (in_type == TYPE_OUT_STRING) {
493                 msgpack_pack_str(&mp_pck, flb_sds_len(in_sds));
494                 msgpack_pack_str_body(&mp_pck,
495                                       in_sds, flb_sds_len(in_sds));
496             }
497             else if (in_type == TYPE_OUT_OBJECT) {
498                 msgpack_sbuffer_write(&mp_sbuf,
499                                       in_sds, flb_sds_len(in_sds));
500             }
501         }
502         else {
503             /* Pack original value */
504             msgpack_pack_object(&mp_pck, v);
505         }
506 
507         /* Package as external keys */
508         if (is_decoded == FLB_TRUE) {
509             if (out_type == TYPE_OUT_STRING) {
510                 flb_error("[parser_decoder] string type is not allowed");
511             }
512             else if (out_type == TYPE_OUT_OBJECT) {
513                 msgpack_sbuffer_write(&extra_mp_sbuf,
514                                       out_sds, flb_sds_len(out_sds));
515             }
516         }
517     }
518 
519     if (in_sds) {
520         flb_sds_destroy(in_sds);
521     }
522     if (out_sds) {
523         flb_sds_destroy(out_sds);
524     }
525     if (data_sds) {
526         flb_sds_destroy(data_sds);
527     }
528 
529     msgpack_unpacked_destroy(&result);
530     *out_buf = mp_sbuf.data;
531     *out_size = mp_sbuf.size;
532 
533     if (extra_keys == FLB_TRUE) {
534         ret = merge_record_and_extra_keys(mp_sbuf.data, mp_sbuf.size,
535                                           extra_mp_sbuf.data, extra_mp_sbuf.size,
536                                           out_buf, out_size);
537         msgpack_sbuffer_destroy(&extra_mp_sbuf);
538         if (ret == 0) {
539             msgpack_sbuffer_destroy(&mp_sbuf);
540             return 0;
541         }
542     }
543 
544     return 0;
545 }
546 
547 /*
548  * Iterate decoders list and lookup for an existing context for 'key_name',
549  * if it does not exists, create and link a new one
550  */
get_decoder_key_context(const char * key_name,int key_len,struct mk_list * list)551 static struct flb_parser_dec *get_decoder_key_context(const char *key_name, int key_len,
552                                                       struct mk_list *list)
553 {
554     struct mk_list *head;
555     struct flb_parser_dec *dec = NULL;
556 
557     mk_list_foreach(head, list) {
558         dec = mk_list_entry(head, struct flb_parser_dec, _head);
559 
560         /* Check if the decoder matches the requested key name */
561         if (flb_sds_cmp(dec->key, key_name, key_len) != 0) {
562             dec = NULL;
563             continue;
564         }
565         else {
566             break;
567         }
568     }
569 
570     if (!dec) {
571         dec = flb_malloc(sizeof(struct flb_parser_dec));
572         if (!dec) {
573             flb_errno();
574             return NULL;
575         }
576 
577         dec->key = flb_sds_create_len(key_name, key_len);
578         if (!dec->key) {
579             flb_errno();
580             flb_free(dec);
581             return NULL;
582         }
583 
584         dec->buffer = flb_sds_create_size(FLB_PARSER_DEC_BUF_SIZE);
585         if (!dec->buffer) {
586             flb_errno();
587             flb_sds_destroy(dec->key);
588             flb_free(dec);
589             return NULL;
590         }
591         dec->add_extra_keys = FLB_FALSE;
592         mk_list_init(&dec->rules);
593         mk_list_add(&dec->_head, list);
594     }
595 
596     return dec;
597 }
598 
flb_parser_decoder_list_create(struct mk_rconf_section * section)599 struct mk_list *flb_parser_decoder_list_create(struct mk_rconf_section *section)
600 {
601     int c = 0;
602     int type;
603     int backend;
604     int size;
605     struct mk_rconf_entry *entry;
606     struct mk_list *head;
607     struct mk_list *list = NULL;
608     struct mk_list *split;
609     struct flb_split_entry *decoder;
610     struct flb_split_entry *field;
611     struct flb_split_entry *action;
612     struct flb_parser_dec *dec;
613     struct flb_parser_dec_rule *dec_rule;
614 
615     /* Global list to be referenced by parent parser definition */
616     list = flb_malloc(sizeof(struct mk_list));
617     if (!list) {
618         flb_errno();
619         return NULL;
620     }
621     mk_list_init(list);
622 
623 
624     mk_list_foreach(head, &section->entries) {
625         entry = mk_list_entry(head, struct mk_rconf_entry, _head);
626 
627         /* Lookup for specific Decode rules */
628         if (strcasecmp(entry->key, "Decode_Field") == 0) {
629             type = FLB_PARSER_DEC_DEFAULT;
630         }
631         else if (strcasecmp(entry->key, "Decode_Field_As") == 0) {
632             type = FLB_PARSER_DEC_AS;
633         }
634         else {
635             continue;
636         }
637 
638         /* Split the value */
639         split = flb_utils_split(entry->val, ' ', 3);
640         if (!split) {
641             flb_error("[parser] invalid number of parameters in decoder");
642             flb_parser_decoder_list_destroy(list);
643             return NULL;
644         }
645 
646         /* We expect at least two values: decoder name and target field */
647         size = mk_list_size(split);
648         if (size < 2) {
649             flb_error("[parser] invalid number of parameters in decoder");
650             flb_utils_split_free(split);
651             flb_parser_decoder_list_destroy(list);
652             return NULL;
653         }
654 
655         /*
656          * Get the rule/entry references:
657          *
658          * decoder: specify the backend that handle decoding (json, escaped..)
659          * field  : the 'key' where decoding should happen
660          * action : optional rules to follow on success or failure
661          */
662         decoder = mk_list_entry_first(split, struct flb_split_entry, _head);
663         field = mk_list_entry_next(&decoder->_head, struct flb_split_entry,
664                                    _head, list);
665         if (size >= 3) {
666             action = mk_list_entry_next(&field->_head, struct flb_split_entry,
667                                         _head, list);
668         }
669         else {
670             action = NULL;
671         }
672 
673         /* Get decoder */
674         if (strcasecmp(decoder->value, "json") == 0) {
675             backend = FLB_PARSER_DEC_JSON;
676         }
677         else if (strcasecmp(decoder->value, "escaped") == 0) {
678             backend = FLB_PARSER_DEC_ESCAPED;
679         }
680         else if (strcasecmp(decoder->value, "escaped_utf8") == 0) {
681             backend = FLB_PARSER_DEC_ESCAPED_UTF8;
682         }
683         else if (strcasecmp(decoder->value, "mysql_quoted") == 0) {
684             backend = FLB_PARSER_DEC_MYSQL_QUOTED;
685         }
686         else {
687             flb_error("[parser] field decoder '%s' unknown", decoder->value);
688             flb_utils_split_free(split);
689             flb_parser_decoder_list_destroy(list);
690             return NULL;
691         }
692 
693         /* Get the parent decoder that will hold the rules defined */
694         dec = get_decoder_key_context(field->value, strlen(field->value), list);
695         if (!dec) {
696             /* Unexpected error */
697             flb_error("[parser] unexpected error, could not get a decoder");
698             flb_utils_split_free(split);
699             flb_parser_decoder_list_destroy(list);
700             return NULL;
701         }
702 
703         /* Create decoder context */
704         dec_rule = flb_calloc(1, sizeof(struct flb_parser_dec_rule));
705         if (!dec_rule) {
706             flb_errno();
707             flb_utils_split_free(split);
708             flb_parser_decoder_list_destroy(list);
709             return NULL;
710         }
711 
712         if (type == FLB_PARSER_DEC_DEFAULT) {
713             dec->add_extra_keys = FLB_TRUE;
714         }
715 
716         dec_rule->type = type;
717         dec_rule->backend = backend;
718         if (action) {
719             if (strcasecmp(action->value, "try_next") == 0) {
720                 dec_rule->action = FLB_PARSER_ACT_TRY_NEXT;
721             }
722             else if (strcasecmp(action->value, "do_next") == 0) {
723                 dec_rule->action = FLB_PARSER_ACT_DO_NEXT;
724             }
725             else {
726                 dec_rule->action = FLB_PARSER_ACT_NONE;
727             }
728         }
729 
730         /* Remove temporary split */
731         flb_utils_split_free(split);
732         mk_list_add(&dec_rule->_head, &dec->rules);
733         c++;
734     }
735 
736     if (c == 0) {
737         flb_free(list);
738         return NULL;
739     }
740 
741     return list;
742 }
743 
flb_parser_decoder_list_destroy(struct mk_list * list)744 int flb_parser_decoder_list_destroy(struct mk_list *list)
745 {
746     int c = 0;
747     struct mk_list *head;
748     struct mk_list *r_head;
749     struct mk_list *tmp;
750     struct mk_list *r_tmp;
751     struct flb_parser_dec *dec;
752     struct flb_parser_dec_rule *dec_rule;
753 
754     mk_list_foreach_safe(head, tmp, list) {
755         dec = mk_list_entry(head, struct flb_parser_dec, _head);
756 
757         /* Destroy rules */
758         mk_list_foreach_safe(r_head, r_tmp, &dec->rules) {
759             dec_rule = mk_list_entry(r_head, struct flb_parser_dec_rule,
760                                      _head);
761             mk_list_del(&dec_rule->_head);
762             flb_free(dec_rule);
763         }
764 
765         mk_list_del(&dec->_head);
766         flb_sds_destroy(dec->key);
767         flb_sds_destroy(dec->buffer);
768         flb_free(dec);
769         c++;
770     }
771 
772     flb_free(list);
773     return c;
774 }
775