1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_utils.h>
23 #include <fluent-bit/flb_mem.h>
24 #include <fluent-bit/flb_log.h>
25 #include <fluent-bit/flb_mp.h>
26 #include <fluent-bit/flb_slist.h>
27 #include <fluent-bit/flb_record_accessor.h>
28 
29 #include <msgpack.h>
30 #include <mpack/mpack.h>
31 
/*
 * don't do this at home: these helpers lean on the underscore-prefixed store
 * macros that come with msgpack-c (presumably internal to the library, TODO
 * confirm) to write a 16/32 bit value into an already serialized buffer.
 *
 * The value argument is parenthesized so the cast applies to the whole
 * expression that the caller passes, not only to its first operand.
 */
#define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) (d))
#define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) (d))
35 
flb_mp_count(const void * data,size_t bytes)36 int flb_mp_count(const void *data, size_t bytes)
37 {
38     int count = 0;
39     mpack_reader_t reader;
40 
41     mpack_reader_init_data(&reader, (const char *) data, bytes);
42     while (mpack_reader_remaining(&reader, NULL) > 0) {
43         count++;
44         mpack_discard(&reader);
45     }
46 
47     mpack_reader_destroy(&reader);
48     return count;
49 }
50 
flb_mp_validate_chunk(const void * data,size_t bytes,int * out_records,size_t * processed_bytes)51 int flb_mp_validate_chunk(const void *data, size_t bytes,
52                           int *out_records, size_t *processed_bytes)
53 {
54     int ret;
55     int count = 0;
56     size_t off = 0;
57     size_t pre_off = 0;
58     size_t ptr_size;
59     unsigned char *ptr;
60     msgpack_object array;
61     msgpack_object ts;
62     msgpack_object record;
63     msgpack_unpacked result;
64 
65     msgpack_unpacked_init(&result);
66     while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
67         array = result.data;
68 
69         if (array.type != MSGPACK_OBJECT_ARRAY) {
70             /*
71              * Sometimes there is a special case: Chunks might have extra zero
72              * bytes at the end of a record, meaning: no more records. This is not
73              * an error and actually it happens if a previous run of Fluent Bit
74              * was stopped/killed before to adjust the file size.
75              *
76              * Just validate if all bytes are zero, if so, adjust counters
77              * and return zero.
78              */
79             ptr = (unsigned char *) (data);
80             ptr += pre_off;
81             if (ptr[0] != 0) {
82                 goto error;
83             }
84 
85             ptr_size = bytes - pre_off;
86             ret = memcmp(ptr, ptr + 1, ptr_size - 1);
87             if (ret == 0) {
88                 /*
89                  * The chunk is valid, just let the caller know the last processed
90                  * valid byte.
91                  */
92                 msgpack_unpacked_destroy(&result);
93                 *out_records = count;
94                 *processed_bytes = pre_off;
95                 return 0;
96             }
97             goto error;
98         }
99 
100         if (array.via.array.size != 2) {
101             goto error;
102         }
103 
104         ts = array.via.array.ptr[0];
105         record = array.via.array.ptr[1];
106 
107         if (ts.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
108             ts.type != MSGPACK_OBJECT_FLOAT &&
109             ts.type != MSGPACK_OBJECT_EXT) {
110             goto error;
111         }
112 
113         if (record.type != MSGPACK_OBJECT_MAP) {
114             goto error;
115         }
116 
117         count++;
118         pre_off = off;
119     }
120 
121     msgpack_unpacked_destroy(&result);
122     *out_records = count;
123     *processed_bytes = pre_off;
124     return 0;
125 
126  error:
127     msgpack_unpacked_destroy(&result);
128     *out_records = count;
129     *processed_bytes = pre_off;
130     return -1;
131 }
132 
/*
 * Adjust the number of entries stored in an already serialized msgpack map
 * header. The header format found in the buffer is kept as is.
 *
 * buf  : points to the first byte (type byte) of the map header.
 * size : new number of entries.
 *
 * The header is never resized, so the new value must fit in the format that
 * is already in place:
 *
 *   fixmap (1000xxxx) : 0 - 15, stored in the low nibble of the type byte
 *   map 16 (0xde)     : 0 - 65535, next 2 bytes, big-endian
 *   map 32 (0xdf)     : any non-negative int, next 4 bytes, big-endian
 *
 * If 'size' is negative, does not fit in the current format, or the first
 * byte is not a map header, the buffer is left untouched. Writing a value
 * that does not fit would corrupt the type byte or store a truncated count.
 */
void flb_mp_set_map_header_size(char *buf, int size)
{
    uint32_t count;
    unsigned char *hdr = (unsigned char *) buf;

    if (size < 0) {
        return;
    }
    count = (uint32_t) size;

    if ((hdr[0] >> 4) == 0x8) { /* 1000xxxx */
        if (count <= 0x0f) {
            hdr[0] = (unsigned char) (0x80 | count);
        }
    }
    else if (hdr[0] == 0xde) {
        if (count <= 0xffff) {
            hdr[1] = (unsigned char) (count >> 8);
            hdr[2] = (unsigned char) (count & 0xff);
        }
    }
    else if (hdr[0] == 0xdf) {
        hdr[1] = (unsigned char) (count >> 24);
        hdr[2] = (unsigned char) ((count >> 16) & 0xff);
        hdr[3] = (unsigned char) ((count >> 8) & 0xff);
        hdr[4] = (unsigned char) (count & 0xff);
    }
}
152 
/*
 * Adjust the number of elements stored in an already serialized msgpack
 * array header. The header format found in the buffer is kept as is.
 *
 * buf  : points to the first byte (type byte) of the array header.
 * size : new number of elements.
 *
 * The header is never resized, so the new value must fit in the format that
 * is already in place:
 *
 *   fixarray (1001xxxx) : 0 - 15, stored in the low nibble of the type byte
 *   array 16 (0xdc)     : 0 - 65535, next 2 bytes, big-endian
 *   array 32 (0xdd)     : any non-negative int, next 4 bytes, big-endian
 *
 * If 'size' is negative, does not fit in the current format, or the first
 * byte is not an array header, the buffer is left untouched. Writing a value
 * that does not fit would corrupt the type byte or store a truncated count.
 */
void flb_mp_set_array_header_size(char *buf, int size)
{
    uint32_t count;
    unsigned char *hdr = (unsigned char *) buf;

    if (size < 0) {
        return;
    }
    count = (uint32_t) size;

    if ((hdr[0] >> 4) == 0x9) { /* 1001xxxx */
        if (count <= 0x0f) {
            hdr[0] = (unsigned char) (0x90 | count);
        }
    }
    else if (hdr[0] == 0xdc) {
        if (count <= 0xffff) {
            hdr[1] = (unsigned char) (count >> 8);
            hdr[2] = (unsigned char) (count & 0xff);
        }
    }
    else if (hdr[0] == 0xdd) {
        hdr[1] = (unsigned char) (count >> 24);
        hdr[2] = (unsigned char) ((count >> 16) & 0xff);
        hdr[3] = (unsigned char) ((count >> 8) & 0xff);
        hdr[4] = (unsigned char) (count & 0xff);
    }
}
171 
172 /*
173  * msgpack-c requires to set the number of the entries in a map beforehand. For our
174  * use case this adds some complexity, having developers to count all possible
175  * entries that might be added.
176  *
 * As a workaround, and to avoid recomposing the map over and over, this simple
 * API allows to initialize the map (or array) header, 'register' new entries
 * (as counters) and finalize it: upon finalization the header is adjusted to
 * the real number of entries.
180  *
 * To make things easier, we make sure msgpack-c always registers a header of
 * 32 bits (identified by 0xdf for maps and 0xdd for arrays, the format used when
 * the number of entries is >= 65536). Yes, every map or array using this API
 * will use a few more bytes than strictly needed (2 more than a 16 bit header),
 * not a big deal. So whoever uses this API, use it only if you don't know the
 * exact number of entries to add.
185  *
186  * MANDATORY: make sure to always initialize, register every entry and finalize,
187  * otherwise you will get a corrupted or incomplete msgpack buffer.
188  *
189  * Usage example
190  * =============
191  *
192  *  struct flb_mp_map_header mh;
193  *
194  *  flb_mp_map_header_init(&mh, mp_pck);
195  *
196  *  -- First key/value entry --
197  *  flb_mp_map_header_append(&mh);
198  *  msgpack_pack_str(mp_pck, 4);
199  *  msgpack_pack_str_body(mp_pck, "cool", 4);
200  *  msgpack_pack_true(mp_pck);
201  *
202  *  -- Second key/value entry --
203  *  flb_mp_map_header_append(&mh);
204  *  msgpack_pack_str(mp_pck, 4);
205  *  msgpack_pack_str_body(mp_pck, "slow", 4);
206  *  msgpack_pack_false(mp_pck);
207  *
208  *  -- Finalize Map --
209  *  flb_mp_map_header_end(&mh);
210  */
211 
/*
 * Prepare a header context: it must be called before the placeholder header
 * is packed, since it records the current end of the buffer as the position
 * of that header.
 *
 * mh     : header context to initialize.
 * mp_pck : packer in use, its 'data' field is handled as a msgpack_sbuffer.
 * type   : FLB_MP_MAP or FLB_MP_ARRAY, not used by this function for now.
 */
static inline void mp_header_type_init(struct flb_mp_map_header *mh,
                                       msgpack_packer *mp_pck,
                                       int type)
{
    msgpack_sbuffer *sbuf = (msgpack_sbuffer *) mp_pck->data;

    (void) type;

    /* The header to be packed next starts at the first unused byte */
    mh->offset = sbuf->size;

    /* No entries registered yet */
    mh->entries = 0;

    /* Keep a reference to the buffer so the header can be patched later */
    mh->data = mp_pck->data;
}
229 
/*
 * Start a map whose number of entries is not known yet.
 *
 * A map header with size = 65536 is packed as a placeholder: that value
 * forces the underlying msgpack-c to use the 32 bit format (0xdf), so any
 * final number of entries can be written back in place later. Reference:
 *
 * - https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family
 *
 * Returns the value returned by msgpack_pack_map().
 */
int flb_mp_map_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    /* Register where the header is going to be written */
    mp_header_type_init(mh, mp_pck, FLB_MP_MAP);

    ret = msgpack_pack_map(mp_pck, 65536);
    return ret;
}
243 
/*
 * Start an array whose number of elements is not known yet.
 *
 * An array header with size = 65536 is packed as a placeholder: that value
 * forces the underlying msgpack-c to use the 32 bit format (0xdd), so any
 * final number of elements can be written back in place later. Reference:
 *
 * - https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
 *
 * Returns the value returned by msgpack_pack_array().
 */
int flb_mp_array_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    /* Register where the header is going to be written */
    mp_header_type_init(mh, mp_pck, FLB_MP_ARRAY);

    ret = msgpack_pack_array(mp_pck, 65536);
    return ret;
}
257 
258 
flb_mp_map_header_append(struct flb_mp_map_header * mh)259 int flb_mp_map_header_append(struct flb_mp_map_header *mh)
260 {
261     mh->entries++;
262     return mh->entries;
263 }
264 
flb_mp_array_header_append(struct flb_mp_map_header * mh)265 int flb_mp_array_header_append(struct flb_mp_map_header *mh)
266 {
267     mh->entries++;
268     return mh->entries;
269 }
270 
flb_mp_map_header_end(struct flb_mp_map_header * mh)271 void flb_mp_map_header_end(struct flb_mp_map_header *mh)
272 {
273     char *ptr;
274     msgpack_sbuffer *mp_sbuf;
275 
276     mp_sbuf = mh->data;
277     ptr = (char *) mp_sbuf->data + mh->offset;
278     flb_mp_set_map_header_size(ptr, mh->entries);
279 }
280 
flb_mp_array_header_end(struct flb_mp_map_header * mh)281 void flb_mp_array_header_end(struct flb_mp_map_header *mh)
282 {
283     char *ptr;
284     msgpack_sbuffer *mp_sbuf;
285 
286     mp_sbuf = mh->data;
287     ptr = (char *) mp_sbuf->data + mh->offset;
288     flb_mp_set_array_header_size(ptr, mh->entries);
289 }
290 
291 /*
292  * Create an 'mp accessor' context: this context allows to create a list of
293  * record accessor patterns based on a 'slist' context, where every slist string
294  * buffer represents a key accessor.
295  */
flb_mp_accessor_create(struct mk_list * slist_patterns)296 struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
297 {
298     size_t size;
299     struct mk_list *head;
300     struct flb_slist_entry *entry;
301     struct flb_record_accessor *ra;
302     struct flb_mp_accessor *mpa;
303 
304     /* Allocate context */
305     mpa = flb_calloc(1, sizeof(struct flb_mp_accessor));
306     if (!mpa) {
307         flb_errno();
308         return NULL;
309     }
310     mk_list_init(&mpa->ra_list);
311 
312     mk_list_foreach(head, slist_patterns) {
313         entry = mk_list_entry(head, struct flb_slist_entry, _head);
314 
315         /* Create the record accessor context */
316         ra = flb_ra_create(entry->str, FLB_TRUE);
317         if (!ra) {
318             flb_error("[mp accessor] could not create entry for pattern '%s'",
319                       entry->str);
320             flb_mp_accessor_destroy(mpa);
321             return NULL;
322         }
323         mk_list_add(&ra->_head, &mpa->ra_list);
324     }
325 
326     if (mk_list_size(&mpa->ra_list) == 0) {
327         return mpa;
328     }
329 
330     size = sizeof(struct flb_mp_accessor_match) * mk_list_size(&mpa->ra_list);
331     mpa->matches_size = size;
332     mpa->matches = flb_calloc(1, size);
333     if (!mpa->matches) {
334         flb_errno();
335         flb_mp_accessor_destroy(mpa);
336         return NULL;
337     }
338 
339     return mpa;
340 }
341 
/*
 * Look in the matches table for an active match whose starting key is the
 * given key object. The comparison is done by pointer, not by content.
 *
 * Returns the index of the first slot that matches, or -1 if there is none.
 */
static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
                                          msgpack_object *key)
{
    int idx;
    int total = mk_list_size(&mpa->ra_list);
    struct flb_mp_accessor_match *slot;

    for (idx = 0; idx < total; idx++) {
        slot = &mpa->matches[idx];
        if (slot->matched != FLB_FALSE && slot->start_key == key) {
            return idx;
        }
    }

    return -1;
}
363 
/*
 * Repack a key/value pair step by step, leaving out the object registered as
 * 'match->key'. Maps and arrays are walked recursively, any other value type
 * is packed as is.
 *
 * match  : match that holds the key object to leave out.
 * mp_pck : destination packer.
 * key    : key of the pair, or NULL when 'val' is an array element.
 * val    : value to pack.
 *
 * Returns FLB_FALSE if 'key' or 'val' is the object to leave out (nothing is
 * packed), otherwise FLB_TRUE.
 */
static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
                                    msgpack_packer *mp_pck,
                                    msgpack_object *key,
                                    msgpack_object *val)
{
    uint32_t idx;
    struct flb_mp_map_header hdr;

    /* This is the entry to remove: skip it */
    if (match->key == key || match->key == val) {
        return FLB_FALSE;
    }

    if (key != NULL) {
        msgpack_pack_object(mp_pck, *key);
    }

    switch (val->type) {
    case MSGPACK_OBJECT_MAP:
        /* The final number of entries is only known after the walk */
        flb_mp_map_header_init(&hdr, mp_pck);
        for (idx = 0; idx < val->via.map.size; idx++) {
            if (accessor_sub_pack(match, mp_pck,
                                  &val->via.map.ptr[idx].key,
                                  &val->via.map.ptr[idx].val) == FLB_TRUE) {
                flb_mp_map_header_append(&hdr);
            }
        }
        flb_mp_map_header_end(&hdr);
        break;
    case MSGPACK_OBJECT_ARRAY:
        /* Array elements have no key */
        flb_mp_array_header_init(&hdr, mp_pck);
        for (idx = 0; idx < val->via.array.size; idx++) {
            if (accessor_sub_pack(match, mp_pck, NULL,
                                  &val->via.array.ptr[idx]) == FLB_TRUE) {
                flb_mp_array_header_append(&hdr);
            }
        }
        flb_mp_array_header_end(&hdr);
        break;
    default:
        msgpack_pack_object(mp_pck, *val);
        break;
    }

    return FLB_TRUE;
}
413 
414 /*
415  * Remove keys or nested keys from a map. It compose the final result in a
416  * new buffer. On error, it returns -1, if the map was modified it returns FLB_TRUE,
417  * if no modification was required it returns FLB_FALSE.
418  */
flb_mp_accessor_keys_remove(struct flb_mp_accessor * mpa,msgpack_object * map,void ** out_buf,size_t * out_size)419 int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
420                                 msgpack_object *map,
421                                 void **out_buf, size_t *out_size)
422 {
423     int i;
424     int ret;
425     int rule_id = 0;
426     int matches = 0;
427     msgpack_object *key;
428     msgpack_object *val;
429     msgpack_object *s_key;
430     msgpack_object *o_key;
431     msgpack_object *o_val;
432     struct mk_list *head;
433     struct flb_record_accessor *ra;
434     struct flb_mp_accessor_match *match;
435     struct flb_mp_map_header mh;
436     msgpack_sbuffer mp_sbuf;
437     msgpack_packer mp_pck;
438 
439     if (map->via.map.size == 0) {
440         return FLB_FALSE;
441     }
442 
443     /* Reset matches cache */
444     memset(mpa->matches, '\0', mpa->matches_size);
445 
446     mk_list_foreach(head, &mpa->ra_list) {
447         ra = mk_list_entry(head, struct flb_record_accessor, _head);
448 
449         /* Apply the record accessor rule against the map */
450         ret = flb_ra_get_kv_pair(ra, *map, &s_key, &o_key, &o_val);
451         if (ret == 0) {
452             /* There is a match, register in the matches table */
453             match = &mpa->matches[rule_id];
454             match->matched = FLB_TRUE;
455             match->start_key = s_key;        /* Initial key path that matched */
456             match->key = o_key;              /* Final key that matched */
457             match->val = o_val;              /* Final value */
458             match->ra = ra;                  /* Record accessor context */
459             matches++;
460         }
461         rule_id++;
462     }
463 
464     /* If no matches, no modifications were made */
465     if (matches == 0) {
466         return FLB_FALSE;
467     }
468 
469     /* Some rules matched, compose a new outgoing buffer */
470     msgpack_sbuffer_init(&mp_sbuf);
471     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
472 
473     /* Initialize map */
474     flb_mp_map_header_init(&mh, &mp_pck);
475 
476     for (i = 0; i < map->via.map.size; i++) {
477         key = &map->via.map.ptr[i].key;
478         val = &map->via.map.ptr[i].val;
479 
480         /*
481          * For every entry on the path, check if we should do a step-by-step
482          * repackaging or just pack the whole object.
483          *
484          * Just check: does this 'key' exists on any path of the record
485          * accessor patterns ?
486          *
487          * Find if the active key in the map, matches an accessor rule, if
488          * if match we get the match id as return value, otherwise -1.
489          */
490         ret = accessor_key_find_match(mpa, key);
491         if (ret == -1) {
492             /* No matches, it's ok to pack the kv pair */
493             flb_mp_map_header_append(&mh);
494             msgpack_pack_object(&mp_pck, *key);
495             msgpack_pack_object(&mp_pck, *val);
496         }
497         else {
498             /* The key has a match. Now we do a step-by-step packaging */
499             match = &mpa->matches[ret];
500             ret = accessor_sub_pack(match, &mp_pck, key, val);
501             if (ret == FLB_TRUE) {
502                 flb_mp_map_header_append(&mh);
503             }
504         }
505     }
506     flb_mp_map_header_end(&mh);
507 
508     *out_buf = mp_sbuf.data;
509     *out_size = mp_sbuf.size;
510 
511     return FLB_TRUE;
512 }
513 
flb_mp_accessor_destroy(struct flb_mp_accessor * mpa)514 void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)
515 {
516     struct mk_list *tmp;
517     struct mk_list *head;
518     struct flb_record_accessor *ra;
519 
520     if (!mpa) {
521         return;
522     }
523 
524     mk_list_foreach_safe(head, tmp, &mpa->ra_list) {
525         ra = mk_list_entry(head, struct flb_record_accessor, _head);
526         mk_list_del(&ra->_head);
527         flb_ra_destroy(ra);
528     }
529 
530     if (mpa->matches) {
531         flb_free(mpa->matches);
532     }
533     flb_free(mpa);
534 }
535