1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_mem.h>
23 #include <fluent-bit/flb_str.h>
24 #include <fluent-bit/flb_log.h>
25 #include <fluent-bit/flb_pack.h>
26 #include <fluent-bit/flb_parser_decoder.h>
27 #include <fluent-bit/flb_unescape.h>
28 #include <fluent-bit/flb_utils.h>
29 #include <msgpack.h>
30
31 #define TYPE_OUT_STRING 0 /* unstructured text */
32 #define TYPE_OUT_OBJECT 1 /* structured msgpack object */
33
34 /* Decode a stringified JSON message */
decode_json(struct flb_parser_dec * dec,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)35 static int decode_json(struct flb_parser_dec *dec,
36 const char *in_buf, size_t in_size,
37 char **out_buf, size_t *out_size, int *out_type)
38 {
39 int ret;
40 int root_type;
41 int records;
42 char *buf;
43 const char *p;
44 size_t size;
45 size_t len;
46
47 p = in_buf;
48 while (*p == ' ') p++;
49
50 len = in_size - (p - in_buf);
51
52 /* It must be a map or array */
53 if (p[0] != '{' && p[0] != '[') {
54 return -1;
55 }
56
57 ret = flb_pack_json_recs(p, len, &buf, &size, &root_type, &records);
58 if (ret != 0) {
59 return -1;
60 }
61
62 /* We expect to decode only one JSON element */
63 if (records != 1) {
64 flb_free(buf);
65 return -1;
66 }
67
68 /* Only process a packed JSON object */
69 if (root_type != FLB_PACK_JSON_OBJECT) {
70 flb_free(buf);
71 return -1;
72 }
73
74 *out_buf = buf;
75 *out_size = size;
76 *out_type = TYPE_OUT_OBJECT;
77
78 return 0;
79 }
80
decode_escaped(struct flb_parser_dec * dec,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)81 static int decode_escaped(struct flb_parser_dec *dec,
82 const char *in_buf, size_t in_size,
83 char **out_buf, size_t *out_size, int *out_type)
84 {
85 int len;
86
87 /* Unescape string */
88 len = flb_unescape_string(in_buf, in_size, &dec->buffer);
89 *out_buf = dec->buffer;
90 *out_size = len;
91 *out_type = TYPE_OUT_STRING;
92
93 return 0;
94 }
95
decode_escaped_utf8(struct flb_parser_dec * dec,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)96 static int decode_escaped_utf8(struct flb_parser_dec *dec,
97 const char *in_buf, size_t in_size,
98 char **out_buf, size_t *out_size, int *out_type)
99 {
100 int len;
101
102 len = flb_unescape_string_utf8(in_buf, in_size, dec->buffer);
103 *out_buf = dec->buffer;
104 *out_size = len;
105 *out_type = TYPE_OUT_STRING;
106
107 return 0;
108 }
109
decode_mysql_quoted(struct flb_parser_dec * dec,char * in_buf,size_t in_size,char ** out_buf,size_t * out_size,int * out_type)110 static int decode_mysql_quoted(struct flb_parser_dec *dec,
111 char *in_buf, size_t in_size,
112 char **out_buf, size_t *out_size, int *out_type)
113 {
114 int len;
115 if(in_size < 2) {
116 dec->buffer[0] = in_buf[0];
117 dec->buffer[1] = 0;
118 *out_buf = dec->buffer;
119 *out_size = in_size;
120 *out_type = TYPE_OUT_STRING;
121 }
122 else if(in_buf[0] == '\'' && in_buf[in_size-1] == '\'') {
123 len = flb_mysql_unquote_string(in_buf+1, in_size-2, &dec->buffer);
124 *out_buf = dec->buffer;
125 *out_size = len;
126 *out_type = TYPE_OUT_STRING;
127 }
128 else if(in_buf[0] == '\"' && in_buf[in_size-1] == '\"') {
129 len = flb_mysql_unquote_string(in_buf+1, in_size-2, &dec->buffer);
130 *out_buf = dec->buffer;
131 *out_size = len;
132 *out_type = TYPE_OUT_STRING;
133 }
134 else {
135 memcpy(dec->buffer, in_buf, in_size);
136 dec->buffer[in_size] = 0;
137 *out_buf = dec->buffer;
138 *out_size = in_size;
139 *out_type = TYPE_OUT_STRING;
140 }
141
142 return 0;
143 }
144
merge_record_and_extra_keys(const char * in_buf,size_t in_size,const char * extra_buf,size_t extra_size,char ** out_buf,size_t * out_size)145 static int merge_record_and_extra_keys(const char *in_buf, size_t in_size,
146 const char *extra_buf, size_t extra_size,
147 char **out_buf, size_t *out_size)
148 {
149 int i;
150 int ret;
151 int map_size = 0;
152 size_t in_off = 0;
153 size_t extra_off = 0;
154 msgpack_sbuffer mp_sbuf;
155 msgpack_packer mp_pck;
156 msgpack_unpacked in_result;
157 msgpack_unpacked extra_result;
158 msgpack_object k;
159 msgpack_object v;
160 msgpack_object map;
161
162 msgpack_unpacked_init(&in_result);
163 msgpack_unpacked_init(&extra_result);
164
165 /* Check if the extra buffer have some serialized data */
166 ret = msgpack_unpack_next(&extra_result, extra_buf, extra_size, &extra_off);
167 if (ret != MSGPACK_UNPACK_SUCCESS) {
168 msgpack_unpacked_destroy(&in_result);
169 msgpack_unpacked_destroy(&extra_result);
170 return -1;
171 }
172 msgpack_unpack_next(&in_result, in_buf, in_size, &in_off);
173
174
175 msgpack_sbuffer_init(&mp_sbuf);
176 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
177
178 map_size = in_result.data.via.map.size;
179 map_size += extra_result.data.via.map.size;
180
181 msgpack_pack_map(&mp_pck, map_size);
182 map = in_result.data;
183 for (i = 0; i < map.via.map.size; i++) {
184 k = map.via.map.ptr[i].key;
185 v = map.via.map.ptr[i].val;
186 msgpack_pack_object(&mp_pck, k);
187 msgpack_pack_object(&mp_pck, v);
188 }
189
190 map = extra_result.data;
191 for (i = 0; i < map.via.map.size; i++) {
192 k = map.via.map.ptr[i].key;
193 v = map.via.map.ptr[i].val;
194 msgpack_pack_object(&mp_pck, k);
195 msgpack_pack_object(&mp_pck, v);
196 }
197
198 msgpack_unpacked_destroy(&in_result);
199 msgpack_unpacked_destroy(&extra_result);
200
201 *out_buf = mp_sbuf.data;
202 *out_size = mp_sbuf.size;
203
204 return 0;
205 }
206
207 /*
208 * Given a msgpack map, apply the parser-decoder rules defined and generate
209 * a new msgpack buffer.
210 */
flb_parser_decoder_do(struct mk_list * decoders,const char * in_buf,size_t in_size,char ** out_buf,size_t * out_size)211 int flb_parser_decoder_do(struct mk_list *decoders,
212 const char *in_buf, size_t in_size,
213 char **out_buf, size_t *out_size)
214 {
215 int i;
216 int ret;
217 int matched;
218 int is_decoded;
219 int is_decoded_as;
220 int in_type;
221 int out_type;
222 int dec_type;
223 int extra_keys = FLB_FALSE;
224 size_t off = 0;
225 char *dec_buf;
226 size_t dec_size;
227 flb_sds_t tmp_sds = NULL;
228 flb_sds_t data_sds = NULL;
229 flb_sds_t in_sds = NULL;
230 flb_sds_t out_sds = NULL;
231 struct mk_list *head;
232 struct mk_list *r_head;
233 struct flb_parser_dec *dec = NULL;
234 struct flb_parser_dec_rule *rule;
235 msgpack_object k;
236 msgpack_object v;
237 msgpack_object map;
238 msgpack_unpacked result;
239 msgpack_sbuffer mp_sbuf;
240 msgpack_packer mp_pck;
241 /* Contexts to handle extra keys to be appended at the end of the log */
242 msgpack_sbuffer extra_mp_sbuf;
243 msgpack_packer extra_mp_pck;
244
245 /* Initialize unpacker */
246 msgpack_unpacked_init(&result);
247 msgpack_unpack_next(&result, in_buf, in_size, &off);
248 map = result.data;
249
250 if (map.type != MSGPACK_OBJECT_MAP) {
251 msgpack_unpacked_destroy(&result);
252 return -1;
253 }
254
255 /*
256 * First check if any field in the record matches a decoder rule. It's
257 * better to check this before hand otherwise we need to jump directly
258 * to create a "possible new outgoing buffer".
259 */
260 matched = -1;
261 for (i = 0; i < map.via.map.size; i++) {
262 k = map.via.map.ptr[i].key;
263 if (k.type != MSGPACK_OBJECT_STR) {
264 continue;
265 }
266
267 /* Try to match this key name with decoder's rule */
268 mk_list_foreach(head, decoders) {
269 dec = mk_list_entry(head, struct flb_parser_dec, _head);
270 if (flb_sds_cmp(dec->key, k.via.str.ptr,
271 k.via.str.size) == 0) {
272 /* we have a match, stop the check */
273 matched = i;
274 break;
275 }
276 else {
277 matched = -1;
278 }
279 }
280
281 if (matched >= 0) {
282 break;
283 }
284 }
285
286 /* No matches, no need to continue */
287 if (matched == -1) {
288 msgpack_unpacked_destroy(&result);
289 return -1;
290 }
291
292 /* Create new outgoing buffer */
293 msgpack_sbuffer_init(&mp_sbuf);
294 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
295
296 /* Register the map (same size) */
297 msgpack_pack_map(&mp_pck, map.via.map.size);
298
299 /* Compose new outgoing buffer */
300 for (i = 0; i < map.via.map.size; i++) {
301 k = map.via.map.ptr[i].key;
302 v = map.via.map.ptr[i].val;
303
304 /* Pack right away previous fields in the map */
305 if (i < matched) {
306 msgpack_pack_object(&mp_pck, k);
307 msgpack_pack_object(&mp_pck, v);
308 continue;
309 }
310
311 /* Process current key names and decoder rules */
312 if (k.type != MSGPACK_OBJECT_STR || v.type != MSGPACK_OBJECT_STR) {
313 msgpack_pack_object(&mp_pck, k);
314 msgpack_pack_object(&mp_pck, v);
315 continue;
316 }
317
318 /*
319 * Per key, we allow only one successful 'Decode_Field' and one
320 * successful 'Decode_Field_As' rules. Otherwise it may lead
321 * to duplicated entries in the final map.
322 *
323 * is_decoded => Decode_Field successul ?
324 * is_decoded_as => Decode_Field_As successful ?
325 */
326 is_decoded = FLB_FALSE;
327 is_decoded_as = FLB_FALSE;
328
329 /* Lookup for decoders associated to the current 'key' */
330 mk_list_foreach(head, decoders) {
331 dec = mk_list_entry(head, struct flb_parser_dec, _head);
332 if (flb_sds_cmp(dec->key, k.via.str.ptr,
333 k.via.str.size) == 0) {
334 break;
335 }
336 dec = NULL;
337 }
338
339 /* No decoder found, pack content */
340 if (!dec) {
341 msgpack_pack_object(&mp_pck, k);
342 msgpack_pack_object(&mp_pck, v);
343 continue;
344 }
345
346 if (!in_sds) {
347 in_sds = flb_sds_create_size(v.via.str.size);
348 if (!in_sds) {
349 break;
350 }
351 out_sds = flb_sds_create_size(v.via.str.size);
352 if (!out_sds) {
353 break;
354 }
355 data_sds = flb_sds_create_size(v.via.str.size);
356 }
357
358 /* Copy original content */
359 tmp_sds = flb_sds_copy(data_sds, v.via.str.ptr,
360 v.via.str.size);
361 if (tmp_sds != data_sds) {
362 data_sds = tmp_sds;
363 }
364
365 /*
366 * We got a match: 'key name' == 'decoder field name', validate
367 * that we have enough space in our temporary buffer.
368 */
369 if (flb_sds_alloc(dec->buffer) < flb_sds_alloc(data_sds)) {
370 /* Increase buffer size */
371 size_t diff;
372 diff = (flb_sds_alloc(data_sds) - flb_sds_alloc(dec->buffer));
373 tmp_sds = flb_sds_increase(dec->buffer, diff);
374 if (!tmp_sds) {
375 flb_errno();
376 break;
377 }
378 if (tmp_sds != dec->buffer) {
379 dec->buffer = tmp_sds;
380 }
381 }
382
383 /* Process decoder rules */
384 ret = -1;
385 dec_buf = NULL;
386
387 /*
388 * If some rule type is FLB_PARSER_DEC_DEFAULT, means that it will
389 * try to register some extra fields as part of the record. For such
390 * case we prepare a temporary buffer to hold these extra keys.
391 *
392 * The content of this buffer is just a serialized number of maps.
393 */
394 if (dec->add_extra_keys == FLB_TRUE) {
395 /* We need to clean up already allocated extra buffers */
396 if (extra_keys == FLB_TRUE) {
397 msgpack_sbuffer_destroy(&extra_mp_sbuf);
398 }
399 extra_keys = FLB_TRUE;
400 msgpack_sbuffer_init(&extra_mp_sbuf);
401 msgpack_packer_init(&extra_mp_pck, &extra_mp_sbuf,
402 msgpack_sbuffer_write);
403 }
404
405 mk_list_foreach(r_head, &dec->rules) {
406 rule = mk_list_entry(r_head, struct flb_parser_dec_rule, _head);
407
408 if (rule->type == FLB_PARSER_DEC_DEFAULT &&
409 rule->action == FLB_PARSER_ACT_DO_NEXT &&
410 is_decoded == FLB_TRUE) {
411 continue;
412 }
413
414 if (is_decoded_as == FLB_TRUE && in_type != TYPE_OUT_STRING) {
415 continue;
416 }
417
418 /* Process using defined decoder backend */
419 if (rule->backend == FLB_PARSER_DEC_JSON) {
420 ret = decode_json(dec, (char *) data_sds, flb_sds_len(data_sds),
421 &dec_buf, &dec_size, &dec_type);
422 }
423 else if (rule->backend == FLB_PARSER_DEC_ESCAPED) {
424 ret = decode_escaped(dec,
425 (char *) data_sds, flb_sds_len(data_sds),
426 &dec_buf, &dec_size, &dec_type);
427 }
428 else if (rule->backend == FLB_PARSER_DEC_ESCAPED_UTF8) {
429 ret = decode_escaped_utf8(dec,
430 (char *) data_sds, flb_sds_len(data_sds),
431 &dec_buf, &dec_size, &dec_type);
432 }
433 else if (rule->backend == FLB_PARSER_DEC_MYSQL_QUOTED) {
434 ret = decode_mysql_quoted(dec,
435 (char *) data_sds, flb_sds_len(data_sds),
436 &dec_buf, &dec_size, &dec_type);
437 }
438
439 /* Check decoder status */
440 if (ret == -1) {
441 /* Current decoder failed, should we try the next one ? */
442 if (rule->action == FLB_PARSER_ACT_TRY_NEXT ||
443 rule->action == FLB_PARSER_ACT_DO_NEXT) {
444 continue;
445 }
446
447 /* Stop: no more rules should be applied */
448 break;
449 }
450
451 /* Internal packing: replace value content in the same key */
452 if (rule->type == FLB_PARSER_DEC_AS) {
453 tmp_sds = flb_sds_copy(in_sds, dec_buf, dec_size);
454 if (tmp_sds != in_sds) {
455 in_sds = tmp_sds;
456 }
457 tmp_sds = flb_sds_copy(data_sds, dec_buf, dec_size);
458 if (tmp_sds != data_sds) {
459 data_sds = tmp_sds;
460 }
461 in_type = dec_type;
462 is_decoded_as = FLB_TRUE;
463 }
464 else if (rule->type == FLB_PARSER_DEC_DEFAULT) {
465 tmp_sds = flb_sds_copy(out_sds, dec_buf, dec_size);
466 if (tmp_sds != out_sds) {
467 out_sds = tmp_sds;
468 }
469 out_type = dec_type;
470 is_decoded = FLB_TRUE;
471 }
472
473
474 if (dec_buf != dec->buffer) {
475 flb_free(dec_buf);
476 }
477 dec_buf = NULL;
478 dec_size = 0;
479
480 /* Apply more rules ? */
481 if (rule->action == FLB_PARSER_ACT_DO_NEXT) {
482 continue;
483 }
484 break;
485 }
486
487 /* Package the key */
488 msgpack_pack_object(&mp_pck, k);
489
490 /* We need to place some value for the key in question */
491 if (is_decoded_as == FLB_TRUE) {
492 if (in_type == TYPE_OUT_STRING) {
493 msgpack_pack_str(&mp_pck, flb_sds_len(in_sds));
494 msgpack_pack_str_body(&mp_pck,
495 in_sds, flb_sds_len(in_sds));
496 }
497 else if (in_type == TYPE_OUT_OBJECT) {
498 msgpack_sbuffer_write(&mp_sbuf,
499 in_sds, flb_sds_len(in_sds));
500 }
501 }
502 else {
503 /* Pack original value */
504 msgpack_pack_object(&mp_pck, v);
505 }
506
507 /* Package as external keys */
508 if (is_decoded == FLB_TRUE) {
509 if (out_type == TYPE_OUT_STRING) {
510 flb_error("[parser_decoder] string type is not allowed");
511 }
512 else if (out_type == TYPE_OUT_OBJECT) {
513 msgpack_sbuffer_write(&extra_mp_sbuf,
514 out_sds, flb_sds_len(out_sds));
515 }
516 }
517 }
518
519 if (in_sds) {
520 flb_sds_destroy(in_sds);
521 }
522 if (out_sds) {
523 flb_sds_destroy(out_sds);
524 }
525 if (data_sds) {
526 flb_sds_destroy(data_sds);
527 }
528
529 msgpack_unpacked_destroy(&result);
530 *out_buf = mp_sbuf.data;
531 *out_size = mp_sbuf.size;
532
533 if (extra_keys == FLB_TRUE) {
534 ret = merge_record_and_extra_keys(mp_sbuf.data, mp_sbuf.size,
535 extra_mp_sbuf.data, extra_mp_sbuf.size,
536 out_buf, out_size);
537 msgpack_sbuffer_destroy(&extra_mp_sbuf);
538 if (ret == 0) {
539 msgpack_sbuffer_destroy(&mp_sbuf);
540 return 0;
541 }
542 }
543
544 return 0;
545 }
546
547 /*
548 * Iterate decoders list and lookup for an existing context for 'key_name',
549 * if it does not exists, create and link a new one
550 */
get_decoder_key_context(const char * key_name,int key_len,struct mk_list * list)551 static struct flb_parser_dec *get_decoder_key_context(const char *key_name, int key_len,
552 struct mk_list *list)
553 {
554 struct mk_list *head;
555 struct flb_parser_dec *dec = NULL;
556
557 mk_list_foreach(head, list) {
558 dec = mk_list_entry(head, struct flb_parser_dec, _head);
559
560 /* Check if the decoder matches the requested key name */
561 if (flb_sds_cmp(dec->key, key_name, key_len) != 0) {
562 dec = NULL;
563 continue;
564 }
565 else {
566 break;
567 }
568 }
569
570 if (!dec) {
571 dec = flb_malloc(sizeof(struct flb_parser_dec));
572 if (!dec) {
573 flb_errno();
574 return NULL;
575 }
576
577 dec->key = flb_sds_create_len(key_name, key_len);
578 if (!dec->key) {
579 flb_errno();
580 flb_free(dec);
581 return NULL;
582 }
583
584 dec->buffer = flb_sds_create_size(FLB_PARSER_DEC_BUF_SIZE);
585 if (!dec->buffer) {
586 flb_errno();
587 flb_sds_destroy(dec->key);
588 flb_free(dec);
589 return NULL;
590 }
591 dec->add_extra_keys = FLB_FALSE;
592 mk_list_init(&dec->rules);
593 mk_list_add(&dec->_head, list);
594 }
595
596 return dec;
597 }
598
flb_parser_decoder_list_create(struct mk_rconf_section * section)599 struct mk_list *flb_parser_decoder_list_create(struct mk_rconf_section *section)
600 {
601 int c = 0;
602 int type;
603 int backend;
604 int size;
605 struct mk_rconf_entry *entry;
606 struct mk_list *head;
607 struct mk_list *list = NULL;
608 struct mk_list *split;
609 struct flb_split_entry *decoder;
610 struct flb_split_entry *field;
611 struct flb_split_entry *action;
612 struct flb_parser_dec *dec;
613 struct flb_parser_dec_rule *dec_rule;
614
615 /* Global list to be referenced by parent parser definition */
616 list = flb_malloc(sizeof(struct mk_list));
617 if (!list) {
618 flb_errno();
619 return NULL;
620 }
621 mk_list_init(list);
622
623
624 mk_list_foreach(head, §ion->entries) {
625 entry = mk_list_entry(head, struct mk_rconf_entry, _head);
626
627 /* Lookup for specific Decode rules */
628 if (strcasecmp(entry->key, "Decode_Field") == 0) {
629 type = FLB_PARSER_DEC_DEFAULT;
630 }
631 else if (strcasecmp(entry->key, "Decode_Field_As") == 0) {
632 type = FLB_PARSER_DEC_AS;
633 }
634 else {
635 continue;
636 }
637
638 /* Split the value */
639 split = flb_utils_split(entry->val, ' ', 3);
640 if (!split) {
641 flb_error("[parser] invalid number of parameters in decoder");
642 flb_parser_decoder_list_destroy(list);
643 return NULL;
644 }
645
646 /* We expect at least two values: decoder name and target field */
647 size = mk_list_size(split);
648 if (size < 2) {
649 flb_error("[parser] invalid number of parameters in decoder");
650 flb_utils_split_free(split);
651 flb_parser_decoder_list_destroy(list);
652 return NULL;
653 }
654
655 /*
656 * Get the rule/entry references:
657 *
658 * decoder: specify the backend that handle decoding (json, escaped..)
659 * field : the 'key' where decoding should happen
660 * action : optional rules to follow on success or failure
661 */
662 decoder = mk_list_entry_first(split, struct flb_split_entry, _head);
663 field = mk_list_entry_next(&decoder->_head, struct flb_split_entry,
664 _head, list);
665 if (size >= 3) {
666 action = mk_list_entry_next(&field->_head, struct flb_split_entry,
667 _head, list);
668 }
669 else {
670 action = NULL;
671 }
672
673 /* Get decoder */
674 if (strcasecmp(decoder->value, "json") == 0) {
675 backend = FLB_PARSER_DEC_JSON;
676 }
677 else if (strcasecmp(decoder->value, "escaped") == 0) {
678 backend = FLB_PARSER_DEC_ESCAPED;
679 }
680 else if (strcasecmp(decoder->value, "escaped_utf8") == 0) {
681 backend = FLB_PARSER_DEC_ESCAPED_UTF8;
682 }
683 else if (strcasecmp(decoder->value, "mysql_quoted") == 0) {
684 backend = FLB_PARSER_DEC_MYSQL_QUOTED;
685 }
686 else {
687 flb_error("[parser] field decoder '%s' unknown", decoder->value);
688 flb_utils_split_free(split);
689 flb_parser_decoder_list_destroy(list);
690 return NULL;
691 }
692
693 /* Get the parent decoder that will hold the rules defined */
694 dec = get_decoder_key_context(field->value, strlen(field->value), list);
695 if (!dec) {
696 /* Unexpected error */
697 flb_error("[parser] unexpected error, could not get a decoder");
698 flb_utils_split_free(split);
699 flb_parser_decoder_list_destroy(list);
700 return NULL;
701 }
702
703 /* Create decoder context */
704 dec_rule = flb_calloc(1, sizeof(struct flb_parser_dec_rule));
705 if (!dec_rule) {
706 flb_errno();
707 flb_utils_split_free(split);
708 flb_parser_decoder_list_destroy(list);
709 return NULL;
710 }
711
712 if (type == FLB_PARSER_DEC_DEFAULT) {
713 dec->add_extra_keys = FLB_TRUE;
714 }
715
716 dec_rule->type = type;
717 dec_rule->backend = backend;
718 if (action) {
719 if (strcasecmp(action->value, "try_next") == 0) {
720 dec_rule->action = FLB_PARSER_ACT_TRY_NEXT;
721 }
722 else if (strcasecmp(action->value, "do_next") == 0) {
723 dec_rule->action = FLB_PARSER_ACT_DO_NEXT;
724 }
725 else {
726 dec_rule->action = FLB_PARSER_ACT_NONE;
727 }
728 }
729
730 /* Remove temporary split */
731 flb_utils_split_free(split);
732 mk_list_add(&dec_rule->_head, &dec->rules);
733 c++;
734 }
735
736 if (c == 0) {
737 flb_free(list);
738 return NULL;
739 }
740
741 return list;
742 }
743
flb_parser_decoder_list_destroy(struct mk_list * list)744 int flb_parser_decoder_list_destroy(struct mk_list *list)
745 {
746 int c = 0;
747 struct mk_list *head;
748 struct mk_list *r_head;
749 struct mk_list *tmp;
750 struct mk_list *r_tmp;
751 struct flb_parser_dec *dec;
752 struct flb_parser_dec_rule *dec_rule;
753
754 mk_list_foreach_safe(head, tmp, list) {
755 dec = mk_list_entry(head, struct flb_parser_dec, _head);
756
757 /* Destroy rules */
758 mk_list_foreach_safe(r_head, r_tmp, &dec->rules) {
759 dec_rule = mk_list_entry(r_head, struct flb_parser_dec_rule,
760 _head);
761 mk_list_del(&dec_rule->_head);
762 flb_free(dec_rule);
763 }
764
765 mk_list_del(&dec->_head);
766 flb_sds_destroy(dec->key);
767 flb_sds_destroy(dec->buffer);
768 flb_free(dec);
769 c++;
770 }
771
772 flb_free(list);
773 return c;
774 }
775