1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_utils.h>
23 #include <fluent-bit/flb_mem.h>
24 #include <fluent-bit/flb_log.h>
25 #include <fluent-bit/flb_mp.h>
26 #include <fluent-bit/flb_slist.h>
27 #include <fluent-bit/flb_record_accessor.h>
28
29 #include <msgpack.h>
30 #include <mpack/mpack.h>
31
/*
 * Don't do this at home: thin wrappers over the underscore-prefixed
 * _msgpack_store16/_msgpack_store32 macros from msgpack-c (presumably
 * internal to the library, confirm before relying on them elsewhere).
 *
 * The value argument is parenthesized so the cast applies to the whole
 * expression passed by the caller, not just to its first operand.
 */
#define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) (d))
#define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) (d))
35
flb_mp_count(const void * data,size_t bytes)36 int flb_mp_count(const void *data, size_t bytes)
37 {
38 int count = 0;
39 mpack_reader_t reader;
40
41 mpack_reader_init_data(&reader, (const char *) data, bytes);
42 while (mpack_reader_remaining(&reader, NULL) > 0) {
43 count++;
44 mpack_discard(&reader);
45 }
46
47 mpack_reader_destroy(&reader);
48 return count;
49 }
50
flb_mp_validate_chunk(const void * data,size_t bytes,int * out_records,size_t * processed_bytes)51 int flb_mp_validate_chunk(const void *data, size_t bytes,
52 int *out_records, size_t *processed_bytes)
53 {
54 int ret;
55 int count = 0;
56 size_t off = 0;
57 size_t pre_off = 0;
58 size_t ptr_size;
59 unsigned char *ptr;
60 msgpack_object array;
61 msgpack_object ts;
62 msgpack_object record;
63 msgpack_unpacked result;
64
65 msgpack_unpacked_init(&result);
66 while (msgpack_unpack_next(&result, data, bytes, &off) == MSGPACK_UNPACK_SUCCESS) {
67 array = result.data;
68
69 if (array.type != MSGPACK_OBJECT_ARRAY) {
70 /*
71 * Sometimes there is a special case: Chunks might have extra zero
72 * bytes at the end of a record, meaning: no more records. This is not
73 * an error and actually it happens if a previous run of Fluent Bit
74 * was stopped/killed before to adjust the file size.
75 *
76 * Just validate if all bytes are zero, if so, adjust counters
77 * and return zero.
78 */
79 ptr = (unsigned char *) (data);
80 ptr += pre_off;
81 if (ptr[0] != 0) {
82 goto error;
83 }
84
85 ptr_size = bytes - pre_off;
86 ret = memcmp(ptr, ptr + 1, ptr_size - 1);
87 if (ret == 0) {
88 /*
89 * The chunk is valid, just let the caller know the last processed
90 * valid byte.
91 */
92 msgpack_unpacked_destroy(&result);
93 *out_records = count;
94 *processed_bytes = pre_off;
95 return 0;
96 }
97 goto error;
98 }
99
100 if (array.via.array.size != 2) {
101 goto error;
102 }
103
104 ts = array.via.array.ptr[0];
105 record = array.via.array.ptr[1];
106
107 if (ts.type != MSGPACK_OBJECT_POSITIVE_INTEGER &&
108 ts.type != MSGPACK_OBJECT_FLOAT &&
109 ts.type != MSGPACK_OBJECT_EXT) {
110 goto error;
111 }
112
113 if (record.type != MSGPACK_OBJECT_MAP) {
114 goto error;
115 }
116
117 count++;
118 pre_off = off;
119 }
120
121 msgpack_unpacked_destroy(&result);
122 *out_records = count;
123 *processed_bytes = pre_off;
124 return 0;
125
126 error:
127 msgpack_unpacked_destroy(&result);
128 *out_records = count;
129 *processed_bytes = pre_off;
130 return -1;
131 }
132
/*
 * Overwrite, in place, the entry count of an already serialized msgpack
 * map header.
 *
 * buf  : points to the first byte (type tag) of the map header.
 * size : new number of entries.
 *
 * The header keeps its current encoding, only the count is rewritten:
 *
 *   - fixmap (1000xxxx): the count lives in the low nibble of the tag.
 *   - map 16 (0xde)    : a 16-bit big-endian count follows the tag.
 *   - map 32 (0xdf)    : a 32-bit big-endian count follows the tag.
 *
 * Any other leading byte leaves the buffer untouched.
 *
 * The caller must make sure 'size' fits the existing encoding. A value that
 * does not fit is truncated to the available width; in particular it is
 * never allowed to spill into the type bits of a fixmap tag.
 */
void flb_mp_set_map_header_size(char *buf, int size)
{
    unsigned char *p = (unsigned char *) buf;
    uint32_t n = (uint32_t) size;

    if ((p[0] >> 4) == 0x8) {
        /* Keep the 1000 type nibble intact, replace the count nibble */
        p[0] = (unsigned char) (0x80 | (n & 0x0f));
    }
    else if (p[0] == 0xde) {
        p[1] = (unsigned char) ((n >> 8) & 0xff);
        p[2] = (unsigned char) (n & 0xff);
    }
    else if (p[0] == 0xdf) {
        p[1] = (unsigned char) ((n >> 24) & 0xff);
        p[2] = (unsigned char) ((n >> 16) & 0xff);
        p[3] = (unsigned char) ((n >> 8) & 0xff);
        p[4] = (unsigned char) (n & 0xff);
    }
}
152
/*
 * Overwrite, in place, the element count of an already serialized msgpack
 * array header.
 *
 * buf  : points to the first byte (type tag) of the array header.
 * size : new number of elements.
 *
 * The header keeps its current encoding, only the count is rewritten:
 *
 *   - fixarray (1001xxxx): the count lives in the low nibble of the tag.
 *   - array 16 (0xdc)    : a 16-bit big-endian count follows the tag.
 *   - array 32 (0xdd)    : a 32-bit big-endian count follows the tag.
 *
 * Any other leading byte leaves the buffer untouched.
 *
 * The caller must make sure 'size' fits the existing encoding. A value that
 * does not fit is truncated to the available width; in particular it is
 * never allowed to spill into the type bits of a fixarray tag.
 */
void flb_mp_set_array_header_size(char *buf, int size)
{
    unsigned char *p = (unsigned char *) buf;
    uint32_t n = (uint32_t) size;

    if ((p[0] >> 4) == 0x9) {
        /* Keep the 1001 type nibble intact, replace the count nibble */
        p[0] = (unsigned char) (0x90 | (n & 0x0f));
    }
    else if (p[0] == 0xdc) {
        p[1] = (unsigned char) ((n >> 8) & 0xff);
        p[2] = (unsigned char) (n & 0xff);
    }
    else if (p[0] == 0xdd) {
        p[1] = (unsigned char) ((n >> 24) & 0xff);
        p[2] = (unsigned char) ((n >> 16) & 0xff);
        p[3] = (unsigned char) ((n >> 8) & 0xff);
        p[4] = (unsigned char) (n & 0xff);
    }
}
171
/*
 * msgpack-c requires the number of entries of a map (or array) to be set
 * before the entries are packed. For our use case this adds some complexity,
 * forcing developers to count beforehand every entry that might be added.
 *
 * As a workaround, and to avoid recomposing the map over and over, this simple
 * API allows to initialize the header, 'register' new entries (as counters) and
 * finalize; upon finalization the header is adjusted to the proper number of
 * entries.
 *
 * To make things easier, we make sure msgpack-c always writes a 32-bit header
 * (0xdf for maps, 0xdd for arrays; the form used for >= 65536 entries). Yes,
 * every map or array created with this API uses 2 more bytes than the 16-bit
 * form, not a big deal. So whoever uses this API, use it only if you don't
 * know the exact number of entries to add.
 *
 * MANDATORY: make sure to always initialize, register every entry and finalize,
 * otherwise you will get a corrupted or incomplete msgpack buffer.
188 *
189 * Usage example
190 * =============
191 *
192 * struct flb_mp_map_header mh;
193 *
194 * flb_mp_map_header_init(&mh, mp_pck);
195 *
196 * -- First key/value entry --
197 * flb_mp_map_header_append(&mh);
198 * msgpack_pack_str(mp_pck, 4);
199 * msgpack_pack_str_body(mp_pck, "cool", 4);
200 * msgpack_pack_true(mp_pck);
201 *
202 * -- Second key/value entry --
203 * flb_mp_map_header_append(&mh);
204 * msgpack_pack_str(mp_pck, 4);
205 * msgpack_pack_str_body(mp_pck, "slow", 4);
206 * msgpack_pack_false(mp_pck);
207 *
208 * -- Finalize Map --
209 * flb_mp_map_header_end(&mh);
210 */
211
/*
 * Prepare a header context before the placeholder header is packed.
 *
 * mh     : context to initialize.
 * mp_pck : packer whose 'data' member is used as a msgpack_sbuffer.
 * type   : FLB_MP_MAP or FLB_MP_ARRAY; currently not used by this helper.
 *
 * The context records the buffer, resets the entry counter and stores the
 * current buffer size, i.e. the position where the next packed byte (the
 * header tag) will be written.
 */
static inline void mp_header_type_init(struct flb_mp_map_header *mh,
                                       msgpack_packer *mp_pck,
                                       int type)
{
    msgpack_sbuffer *sbuf = (msgpack_sbuffer *) mp_pck->data;

    (void) type;

    /* Position of the header that is about to be written */
    mh->offset = sbuf->size;

    /* No entries registered yet */
    mh->entries = 0;

    /* Buffer that will be patched on finalization */
    mh->data = mp_pck->data;
}
229
/*
 * Start a map whose number of entries is not known yet.
 *
 * Returns the value returned by msgpack_pack_map().
 */
int flb_mp_map_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    /* Remember where the map header is going to be written */
    mp_header_type_init(mh, mp_pck, FLB_MP_MAP);

    /*
     * Write a placeholder header with 65536 entries: that count does not fit
     * in 16 bits, so msgpack-c has to emit the 32-bit form (0xdf). Reference:
     *
     * - https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family
     */
    ret = msgpack_pack_map(mp_pck, 65536);

    return ret;
}
243
/*
 * Start an array whose number of elements is not known yet.
 *
 * Returns the value returned by msgpack_pack_array().
 */
int flb_mp_array_header_init(struct flb_mp_map_header *mh, msgpack_packer *mp_pck)
{
    int ret;

    /* Remember where the array header is going to be written */
    mp_header_type_init(mh, mp_pck, FLB_MP_ARRAY);

    /*
     * Write a placeholder header with 65536 elements: that count does not
     * fit in 16 bits, so msgpack-c has to emit the 32-bit form (0xdd).
     * Reference:
     *
     * - https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
     */
    ret = msgpack_pack_array(mp_pck, 65536);

    return ret;
}
257
258
flb_mp_map_header_append(struct flb_mp_map_header * mh)259 int flb_mp_map_header_append(struct flb_mp_map_header *mh)
260 {
261 mh->entries++;
262 return mh->entries;
263 }
264
flb_mp_array_header_append(struct flb_mp_map_header * mh)265 int flb_mp_array_header_append(struct flb_mp_map_header *mh)
266 {
267 mh->entries++;
268 return mh->entries;
269 }
270
flb_mp_map_header_end(struct flb_mp_map_header * mh)271 void flb_mp_map_header_end(struct flb_mp_map_header *mh)
272 {
273 char *ptr;
274 msgpack_sbuffer *mp_sbuf;
275
276 mp_sbuf = mh->data;
277 ptr = (char *) mp_sbuf->data + mh->offset;
278 flb_mp_set_map_header_size(ptr, mh->entries);
279 }
280
flb_mp_array_header_end(struct flb_mp_map_header * mh)281 void flb_mp_array_header_end(struct flb_mp_map_header *mh)
282 {
283 char *ptr;
284 msgpack_sbuffer *mp_sbuf;
285
286 mp_sbuf = mh->data;
287 ptr = (char *) mp_sbuf->data + mh->offset;
288 flb_mp_set_array_header_size(ptr, mh->entries);
289 }
290
291 /*
292 * Create an 'mp accessor' context: this context allows to create a list of
293 * record accessor patterns based on a 'slist' context, where every slist string
294 * buffer represents a key accessor.
295 */
flb_mp_accessor_create(struct mk_list * slist_patterns)296 struct flb_mp_accessor *flb_mp_accessor_create(struct mk_list *slist_patterns)
297 {
298 size_t size;
299 struct mk_list *head;
300 struct flb_slist_entry *entry;
301 struct flb_record_accessor *ra;
302 struct flb_mp_accessor *mpa;
303
304 /* Allocate context */
305 mpa = flb_calloc(1, sizeof(struct flb_mp_accessor));
306 if (!mpa) {
307 flb_errno();
308 return NULL;
309 }
310 mk_list_init(&mpa->ra_list);
311
312 mk_list_foreach(head, slist_patterns) {
313 entry = mk_list_entry(head, struct flb_slist_entry, _head);
314
315 /* Create the record accessor context */
316 ra = flb_ra_create(entry->str, FLB_TRUE);
317 if (!ra) {
318 flb_error("[mp accessor] could not create entry for pattern '%s'",
319 entry->str);
320 flb_mp_accessor_destroy(mpa);
321 return NULL;
322 }
323 mk_list_add(&ra->_head, &mpa->ra_list);
324 }
325
326 if (mk_list_size(&mpa->ra_list) == 0) {
327 return mpa;
328 }
329
330 size = sizeof(struct flb_mp_accessor_match) * mk_list_size(&mpa->ra_list);
331 mpa->matches_size = size;
332 mpa->matches = flb_calloc(1, size);
333 if (!mpa->matches) {
334 flb_errno();
335 flb_mp_accessor_destroy(mpa);
336 return NULL;
337 }
338
339 return mpa;
340 }
341
/*
 * Look for a registered match whose starting key is exactly the given
 * object (pointer comparison).
 *
 * Returns the index of the match slot, or -1 if no active match starts at
 * 'key'.
 */
static inline int accessor_key_find_match(struct flb_mp_accessor *mpa,
                                          msgpack_object *key)
{
    int idx;
    int total = mk_list_size(&mpa->ra_list);
    struct flb_mp_accessor_match *slot;

    for (idx = 0; idx < total; idx++) {
        slot = &mpa->matches[idx];

        /* Only slots flagged as matched are candidates */
        if (slot->matched != FLB_FALSE && slot->start_key == key) {
            return idx;
        }
    }

    return -1;
}
363
/*
 * Re-pack a key/value pair (or a bare value when 'key' is NULL, as done for
 * array elements), skipping the object referenced by match->key.
 *
 * Maps and arrays are walked recursively; since the number of surviving
 * children is not known in advance, their headers are written through the
 * flb_mp_*_header_init/append/end helpers.
 *
 * Returns FLB_FALSE when the pair is the one to remove (nothing is packed),
 * FLB_TRUE when it was packed.
 */
static inline int accessor_sub_pack(struct flb_mp_accessor_match *match,
                                    msgpack_packer *mp_pck,
                                    msgpack_object *key,
                                    msgpack_object *val)
{
    int i;
    msgpack_object_kv *kv;
    struct flb_mp_map_header mh;

    /* This is the entry selected for removal */
    if (match->key == key || match->key == val) {
        return FLB_FALSE;
    }

    if (key != NULL) {
        msgpack_pack_object(mp_pck, *key);
    }

    switch (val->type) {
    case MSGPACK_OBJECT_MAP:
        flb_mp_map_header_init(&mh, mp_pck);
        for (i = 0; i < val->via.map.size; i++) {
            kv = &val->via.map.ptr[i];
            if (accessor_sub_pack(match, mp_pck,
                                  &kv->key, &kv->val) == FLB_TRUE) {
                flb_mp_map_header_append(&mh);
            }
        }
        flb_mp_map_header_end(&mh);
        break;
    case MSGPACK_OBJECT_ARRAY:
        flb_mp_array_header_init(&mh, mp_pck);
        for (i = 0; i < val->via.array.size; i++) {
            if (accessor_sub_pack(match, mp_pck, NULL,
                                  &val->via.array.ptr[i]) == FLB_TRUE) {
                flb_mp_array_header_append(&mh);
            }
        }
        flb_mp_array_header_end(&mh);
        break;
    default:
        /* Scalar or any other type: pack as is */
        msgpack_pack_object(mp_pck, *val);
        break;
    }

    return FLB_TRUE;
}
413
414 /*
415 * Remove keys or nested keys from a map. It compose the final result in a
416 * new buffer. On error, it returns -1, if the map was modified it returns FLB_TRUE,
417 * if no modification was required it returns FLB_FALSE.
418 */
flb_mp_accessor_keys_remove(struct flb_mp_accessor * mpa,msgpack_object * map,void ** out_buf,size_t * out_size)419 int flb_mp_accessor_keys_remove(struct flb_mp_accessor *mpa,
420 msgpack_object *map,
421 void **out_buf, size_t *out_size)
422 {
423 int i;
424 int ret;
425 int rule_id = 0;
426 int matches = 0;
427 msgpack_object *key;
428 msgpack_object *val;
429 msgpack_object *s_key;
430 msgpack_object *o_key;
431 msgpack_object *o_val;
432 struct mk_list *head;
433 struct flb_record_accessor *ra;
434 struct flb_mp_accessor_match *match;
435 struct flb_mp_map_header mh;
436 msgpack_sbuffer mp_sbuf;
437 msgpack_packer mp_pck;
438
439 if (map->via.map.size == 0) {
440 return FLB_FALSE;
441 }
442
443 /* Reset matches cache */
444 memset(mpa->matches, '\0', mpa->matches_size);
445
446 mk_list_foreach(head, &mpa->ra_list) {
447 ra = mk_list_entry(head, struct flb_record_accessor, _head);
448
449 /* Apply the record accessor rule against the map */
450 ret = flb_ra_get_kv_pair(ra, *map, &s_key, &o_key, &o_val);
451 if (ret == 0) {
452 /* There is a match, register in the matches table */
453 match = &mpa->matches[rule_id];
454 match->matched = FLB_TRUE;
455 match->start_key = s_key; /* Initial key path that matched */
456 match->key = o_key; /* Final key that matched */
457 match->val = o_val; /* Final value */
458 match->ra = ra; /* Record accessor context */
459 matches++;
460 }
461 rule_id++;
462 }
463
464 /* If no matches, no modifications were made */
465 if (matches == 0) {
466 return FLB_FALSE;
467 }
468
469 /* Some rules matched, compose a new outgoing buffer */
470 msgpack_sbuffer_init(&mp_sbuf);
471 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
472
473 /* Initialize map */
474 flb_mp_map_header_init(&mh, &mp_pck);
475
476 for (i = 0; i < map->via.map.size; i++) {
477 key = &map->via.map.ptr[i].key;
478 val = &map->via.map.ptr[i].val;
479
480 /*
481 * For every entry on the path, check if we should do a step-by-step
482 * repackaging or just pack the whole object.
483 *
484 * Just check: does this 'key' exists on any path of the record
485 * accessor patterns ?
486 *
487 * Find if the active key in the map, matches an accessor rule, if
488 * if match we get the match id as return value, otherwise -1.
489 */
490 ret = accessor_key_find_match(mpa, key);
491 if (ret == -1) {
492 /* No matches, it's ok to pack the kv pair */
493 flb_mp_map_header_append(&mh);
494 msgpack_pack_object(&mp_pck, *key);
495 msgpack_pack_object(&mp_pck, *val);
496 }
497 else {
498 /* The key has a match. Now we do a step-by-step packaging */
499 match = &mpa->matches[ret];
500 ret = accessor_sub_pack(match, &mp_pck, key, val);
501 if (ret == FLB_TRUE) {
502 flb_mp_map_header_append(&mh);
503 }
504 }
505 }
506 flb_mp_map_header_end(&mh);
507
508 *out_buf = mp_sbuf.data;
509 *out_size = mp_sbuf.size;
510
511 return FLB_TRUE;
512 }
513
flb_mp_accessor_destroy(struct flb_mp_accessor * mpa)514 void flb_mp_accessor_destroy(struct flb_mp_accessor *mpa)
515 {
516 struct mk_list *tmp;
517 struct mk_list *head;
518 struct flb_record_accessor *ra;
519
520 if (!mpa) {
521 return;
522 }
523
524 mk_list_foreach_safe(head, tmp, &mpa->ra_list) {
525 ra = mk_list_entry(head, struct flb_record_accessor, _head);
526 mk_list_del(&ra->_head);
527 flb_ra_destroy(ra);
528 }
529
530 if (mpa->matches) {
531 flb_free(mpa->matches);
532 }
533 flb_free(mpa);
534 }
535