1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 #include <fluent-bit.h>
4 #include <fluent-bit/flb_time.h>
5 #include <fluent-bit/flb_parser.h>
6 #include "flb_tests_runtime.h"
7
/* Serializes access to `output`; locked by set_output() and get_output(). */
pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Last buffer stored through set_output(); starts out as NULL. */
char *output = NULL;
10
/*
 * Stores `val` in the shared `output` slot while holding result_mutex.
 *
 * Any pointer already in the slot is overwritten, not freed.
 */
void set_output(char *val)
{
    pthread_mutex_lock(&result_mutex);
    output = val;
    pthread_mutex_unlock(&result_mutex);
}
17
get_output(void)18 char *get_output(void)
19 {
20 char *val;
21
22 pthread_mutex_lock(&result_mutex);
23 val = output;
24 pthread_mutex_unlock(&result_mutex);
25
26 return val;
27 }
28
/*
 * Output callback the tests register through struct flb_lib_out_cb.
 *
 * data:    buffer delivered to the callback; logged as a C string and then
 *          stored with set_output() so the test body can inspect it.
 * size:    size reported for `data`; nothing is stored when it is 0.
 * cb_data: user pointer from the callback registration (unused here).
 *
 * Always returns 0.
 */
int callback_test(void* data, size_t size, void* cb_data)
{
    (void) cb_data;

    if (size > 0) {
        /* "%s" needs a char pointer argument, not a bare void pointer */
        flb_debug("[test_filter_parser] received message: %s", (char *) data);
        set_output(data); /* success */
    }
    return 0;
}
37
flb_test_filter_parser_extract_fields()38 void flb_test_filter_parser_extract_fields()
39 {
40 int ret;
41 int bytes;
42 char *p, *output, *expected;
43 flb_ctx_t *ctx;
44 int in_ffd;
45 int out_ffd;
46 int filter_ffd;
47 struct flb_parser *parser;
48
49 struct flb_lib_out_cb cb;
50 cb.cb = callback_test;
51 cb.data = NULL;
52
53 ctx = flb_create();
54
55 /* Configure service */
56 flb_service_set(ctx, "Flush", "1", "Grace" "1", "Log_Level", "debug", NULL);
57
58 /* Input */
59 in_ffd = flb_input(ctx, (char *) "lib", NULL);
60 TEST_CHECK(in_ffd >= 0);
61 flb_input_set(ctx, in_ffd,
62 "Tag", "test",
63 NULL);
64
65 /* Parser */
66 parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
67 NULL, NULL, NULL, MK_FALSE, MK_TRUE, NULL, 0,
68 NULL, ctx->config);
69 TEST_CHECK(parser != NULL);
70
71 /* Filter */
72 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
73 TEST_CHECK(filter_ffd >= 0);
74 ret = flb_filter_set(ctx, filter_ffd,
75 "Match", "test",
76 "Key_Name", "data",
77 "Parser", "dummy_test",
78 "Reserve_Data", "On",
79 "Preserve_Key", "Off",
80 NULL);
81 TEST_CHECK(ret == 0);
82
83 /* Output */
84 out_ffd = flb_output(ctx, (char *) "lib", &cb);
85 TEST_CHECK(out_ffd >= 0);
86 flb_output_set(ctx, out_ffd,
87 "Match", "*",
88 "format", "json",
89 NULL);
90
91 /* Start the engine */
92 ret = flb_start(ctx);
93 TEST_CHECK(ret == 0);
94
95 /* Ingest data */
96 p = "[1448403340, {\"data\":\"100 0.5 true This is an example\", \"extra\":\"Some more data\"}]";
97 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
98 TEST_CHECK(bytes == strlen(p));
99
100 flb_time_msleep(1500); /* waiting flush */
101 output = get_output(); /* 1sec passed, data should be flushed */
102 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
103 if (output != NULL) {
104 /* check timestamp */
105 expected = "[1448403340.000000,{";
106 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
107 /* check fields were extracted */
108 expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"";
109 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
110 /* check original field was not preserved */
111 expected = "\"data\":\"100 0.5 true This is an example\"";
112 TEST_CHECK_(strstr(output, expected) == NULL, "Expected output to not contain '%s', got '%s'", expected, output);
113 /* check extra data was preserved */
114 expected = "\"extra\":\"Some more data\"";
115 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to preserve extra field, got '%s'", output);
116 free(output);
117 }
118
119 flb_stop(ctx);
120 flb_destroy(ctx);
121 }
122
flb_test_filter_parser_reserve_data_off()123 void flb_test_filter_parser_reserve_data_off()
124 {
125 int ret;
126 int bytes;
127 char *p, *output, *expected;
128 flb_ctx_t *ctx;
129 int in_ffd;
130 int out_ffd;
131 int filter_ffd;
132 struct flb_parser *parser;
133
134 struct flb_lib_out_cb cb;
135 cb.cb = callback_test;
136 cb.data = NULL;
137
138 ctx = flb_create();
139
140 /* Configure service */
141 flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
142
143 /* Input */
144 in_ffd = flb_input(ctx, (char *) "lib", NULL);
145 TEST_CHECK(in_ffd >= 0);
146 flb_input_set(ctx, in_ffd,
147 "Tag", "test",
148 NULL);
149
150 /* Parser */
151 parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
152 NULL, NULL, NULL, MK_FALSE, MK_TRUE, NULL, 0,
153 NULL, ctx->config);
154 TEST_CHECK(parser != NULL);
155
156 /* Filter */
157 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
158 TEST_CHECK(filter_ffd >= 0);
159 ret = flb_filter_set(ctx, filter_ffd,
160 "Match", "test",
161 "Key_Name", "data",
162 "Parser", "dummy_test",
163 "Reserve_Data", "Off",
164 NULL);
165 TEST_CHECK(ret == 0);
166
167 /* Output */
168 out_ffd = flb_output(ctx, (char *) "lib", &cb);
169 TEST_CHECK(out_ffd >= 0);
170 flb_output_set(ctx, out_ffd,
171 "Match", "*",
172 "format", "json",
173 NULL);
174
175 /* Start the engine */
176 ret = flb_start(ctx);
177 TEST_CHECK(ret == 0);
178
179 /* Ingest data */
180 p = "[1448403340,{\"data\":\"100 0.5 true This is an example\",\"extra\":\"Some more data\"}]";
181 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
182 TEST_CHECK(bytes == strlen(p));
183
184 flb_time_msleep(1500); /* waiting flush */
185 output = get_output(); /* 1sec passed, data should be flushed */
186 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
187 if (output != NULL) {
188 /* check extra data was not preserved */
189 expected = "\"extra\":\"Some more data\"";
190 TEST_CHECK_(strstr(output, expected) == NULL, "Expected output to not preserve extra field, got '%s'", output);
191 free(output);
192 }
193
194 flb_stop(ctx);
195 flb_destroy(ctx);
196 }
197
flb_test_filter_parser_handle_time_key()198 void flb_test_filter_parser_handle_time_key()
199 {
200 int ret;
201 int bytes;
202 char *p, *output, *expected;
203 flb_ctx_t *ctx;
204 int in_ffd;
205 int out_ffd;
206 int filter_ffd;
207 struct flb_parser *parser;
208
209 struct flb_lib_out_cb cb;
210 cb.cb = callback_test;
211 cb.data = NULL;
212
213 ctx = flb_create();
214
215 /* Configure service */
216 flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
217
218 /* Input */
219 in_ffd = flb_input(ctx, (char *) "lib", NULL);
220 TEST_CHECK(in_ffd >= 0);
221 flb_input_set(ctx, in_ffd,
222 "Tag", "test",
223 NULL);
224
225 /* Parser */
226 parser = flb_parser_create("timestamp", "regex", "^(?<time>.*)$", "%Y-%m-%dT%H:%M:%S.%L",
227 "time",
228 NULL, MK_FALSE, MK_TRUE,
229 NULL, 0, NULL, ctx->config);
230 TEST_CHECK(parser != NULL);
231
232 /* Filter */
233 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
234 TEST_CHECK(filter_ffd >= 0);
235 ret = flb_filter_set(ctx, filter_ffd,
236 "Match", "test",
237 "Key_Name", "@timestamp",
238 "Parser", "timestamp",
239 "Reserve_Data", "On",
240 NULL);
241 TEST_CHECK(ret == 0);
242
243 /* Output */
244 out_ffd = flb_output(ctx, (char *) "lib", &cb);
245 TEST_CHECK(out_ffd >= 0);
246 flb_output_set(ctx, out_ffd,
247 "Match", "*",
248 "format", "json",
249 NULL);
250
251 /* Start the engine */
252 ret = flb_start(ctx);
253 TEST_CHECK(ret == 0);
254
255 /* Ingest data */
256 p = "[1448403340, {\"@timestamp\":\"2017-11-01T22:25:21.648+00:00\", \"message\":\"This is an example\"}]";
257 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
258 TEST_CHECK(bytes == strlen(p));
259
260 flb_time_msleep(1500); /* waiting flush */
261 output = get_output(); /* 1sec passed, data should be flushed */
262 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
263 if (output != NULL) {
264 /* check the timestamp field was updated correctly */
265 /* this is in fluent-bits extended timestamp format */
266 expected = "[1509575121.648000,{";
267 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
268 /* check additional field is preserved */
269 expected = "\"message\":\"This is an example\"";
270 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
271 free(output);
272 }
273
274 flb_stop(ctx);
275 flb_destroy(ctx);
276 }
277
flb_test_filter_parser_handle_time_key_with_fractional_timestamp()278 void flb_test_filter_parser_handle_time_key_with_fractional_timestamp()
279 {
280 int ret;
281 int bytes;
282 char *p, *output, *expected;
283 flb_ctx_t *ctx;
284 int in_ffd;
285 int out_ffd;
286 int filter_ffd;
287 struct flb_parser *parser;
288
289 ctx = flb_create();
290
291 /* Configure service */
292 flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
293
294 /* Input */
295 in_ffd = flb_input(ctx, (char *) "lib", NULL);
296 TEST_CHECK(in_ffd >= 0);
297 flb_input_set(ctx, in_ffd,
298 "Tag", "test",
299 NULL);
300
301 /* Parser */
302 parser = flb_parser_create("timestamp", "regex", "^(?<time>.*)$", "%s.%L",
303 "time",
304 NULL, MK_FALSE, MK_TRUE,
305 NULL, 0, NULL, ctx->config);
306 TEST_CHECK(parser != NULL);
307
308 /* Filter */
309 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
310 TEST_CHECK(filter_ffd >= 0);
311 ret = flb_filter_set(ctx, filter_ffd,
312 "Match", "test",
313 "Key_Name", "@timestamp",
314 "Parser", "timestamp",
315 "Reserve_Data", "On",
316 NULL);
317 TEST_CHECK(ret == 0);
318
319 /* Output */
320 out_ffd = flb_output(ctx, (char *) "lib", (void*)callback_test);
321 TEST_CHECK(out_ffd >= 0);
322 flb_output_set(ctx, out_ffd,
323 "Match", "*",
324 "format", "json",
325 NULL);
326
327 /* Start the engine */
328 ret = flb_start(ctx);
329 TEST_CHECK(ret == 0);
330
331 /* Ingest data */
332 p = "[1448403340, {\"@timestamp\":\"1509575121.648\", \"message\":\"This is an example\"}]";
333 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
334 TEST_CHECK(bytes == strlen(p));
335
336 flb_time_msleep(1500); /* waiting flush */
337 output = get_output(); /* 1sec passed, data should be flushed */
338 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
339 if (output != NULL) {
340 /* check the timestamp field was updated correctly */
341 /* this is in fluent-bits extended timestamp format */
342 expected = "[\"\\x59\\xfffffffa\\x49\\xffffffd1\\x26\\xffffff9f\\xffffffb2\\x00\", {";
343 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
344 /* check additional field is preserved */
345 expected = "\"message\":\"This is an example\"";
346 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
347 free(output);
348 }
349
350 flb_stop(ctx);
351 flb_destroy(ctx);
352 }
353
flb_test_filter_parser_ignore_malformed_time()354 void flb_test_filter_parser_ignore_malformed_time()
355 {
356 int ret;
357 int bytes;
358 char *p, *output, *expected;
359 flb_ctx_t *ctx;
360 int in_ffd;
361 int out_ffd;
362 int filter_ffd;
363 struct flb_parser *parser;
364
365 struct flb_lib_out_cb cb;
366 cb.cb = callback_test;
367 cb.data = NULL;
368
369 ctx = flb_create();
370
371 /* Configure service */
372 flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
373
374 /* Input */
375 in_ffd = flb_input(ctx, (char *) "lib", NULL);
376 TEST_CHECK(in_ffd >= 0);
377 flb_input_set(ctx, in_ffd,
378 "Tag", "test",
379 NULL);
380
381 /* Parser */
382 parser = flb_parser_create("timestamp", "regex",
383 "^(?<time>.*)$", "%Y-%m-%dT%H:%M:%S.%L", "time",
384 NULL, FLB_FALSE, MK_TRUE,
385 NULL, 0, NULL, ctx->config);
386 TEST_CHECK(parser != NULL);
387
388 /* Filter */
389 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
390 TEST_CHECK(filter_ffd >= 0);
391 ret = flb_filter_set(ctx, filter_ffd,
392 "Match", "test",
393 "Key_Name", "@timestamp",
394 "Parser", "timestamp",
395 "Reserve_Data", "On",
396 "Preserve_Key", "On",
397 NULL);
398 TEST_CHECK(ret == 0);
399
400 /* Output */
401 out_ffd = flb_output(ctx, (char *) "lib", &cb);
402 TEST_CHECK(out_ffd >= 0);
403 flb_output_set(ctx, out_ffd,
404 "Match", "*",
405 "format", "json",
406 NULL);
407
408 /* Start the engine */
409 ret = flb_start(ctx);
410 TEST_CHECK(ret == 0);
411
412 /* Ingest data */
413 p = "[1448403340, {\"@timestamp\":\"2017_$!^-11-01T22:25:21.648\", \"log\":\"An example\"}]";
414 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
415 TEST_CHECK(bytes == strlen(p));
416
417 flb_time_msleep(1500); /* waiting flush */
418 output = get_output(); /* 1sec passed, data should be flushed */
419 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
420 if (output != NULL) {
421 /* check the timestamp field was ignored and we received everything else */
422 expected = "[1448403340.000000,{\"@timestamp\":\"2017_$!^-11-01T22:25:21.648\",\"log\":\"An example\"}]";
423 TEST_CHECK_(strcmp(output, expected) == 0, "Expected output to be '%s', got '%s'", expected, output);
424 free(output);
425 }
426
427 flb_stop(ctx);
428 flb_destroy(ctx);
429 }
430
flb_test_filter_parser_preserve_original_field()431 void flb_test_filter_parser_preserve_original_field()
432 {
433 int ret;
434 int bytes;
435 char *p, *output, *expected;
436 flb_ctx_t *ctx;
437 int in_ffd;
438 int out_ffd;
439 int filter_ffd;
440 struct flb_parser *parser;
441
442 struct flb_lib_out_cb cb;
443 cb.cb = callback_test;
444 cb.data = NULL;
445
446 ctx = flb_create();
447
448 /* Configure service */
449 flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
450
451 /* Input */
452 in_ffd = flb_input(ctx, (char *) "lib", NULL);
453 TEST_CHECK(in_ffd >= 0);
454 flb_input_set(ctx, in_ffd,
455 "Tag", "test",
456 NULL);
457
458 /* Parser */
459 parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
460 NULL, NULL, NULL, MK_FALSE, MK_TRUE, NULL, 0,
461 NULL, ctx->config);
462 TEST_CHECK(parser != NULL);
463
464 /* Filter */
465 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
466 TEST_CHECK(filter_ffd >= 0);
467 ret = flb_filter_set(ctx, filter_ffd,
468 "Match", "test",
469 "Key_Name", "data",
470 "Parser", "dummy_test",
471 "Reserve_Data", "On",
472 "Preserve_Key", "On",
473 NULL);
474 TEST_CHECK(ret == 0);
475
476 /* Output */
477 out_ffd = flb_output(ctx, (char *) "lib", &cb);
478 TEST_CHECK(out_ffd >= 0);
479 flb_output_set(ctx, out_ffd,
480 "Match", "*",
481 "format", "json",
482 NULL);
483
484 /* Start the engine */
485 ret = flb_start(ctx);
486 TEST_CHECK(ret == 0);
487
488 /* Ingest data */
489 p = "[1448403340,{\"data\":\"100 0.5 true This is an example\",\"log\":\"An example\"}]";
490 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
491 TEST_CHECK(bytes == strlen(p));
492
493 flb_time_msleep(1500); /* waiting flush */
494 output = get_output(); /* 1sec passed, data should be flushed */
495 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
496 if (output != NULL) {
497 /* check original field is preserved */
498 expected = "\"data\":\"100 0.5 true This is an example\"";
499 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
500 /* check fields were extracted */
501 expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"";
502 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
503 /* check other fields are preserved */
504 expected = "\"log\":\"An example\"";
505 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
506 free(output);
507 }
508
509 flb_stop(ctx);
510 flb_destroy(ctx);
511 }
512
513 // https://github.com/fluent/fluent-bit/issues/2250
flb_test_filter_parser_first_matched_when_mutilple_parser()514 void flb_test_filter_parser_first_matched_when_mutilple_parser()
515 {
516 int ret;
517 int bytes;
518 char *p, *output, *expected;
519 flb_ctx_t *ctx;
520 int in_ffd;
521 int out_ffd;
522 int filter_ffd;
523 struct flb_parser *parser;
524
525 struct flb_lib_out_cb cb;
526 cb.cb = callback_test;
527 cb.data = NULL;
528
529 ctx = flb_create();
530
531 /* Configure service */
532 flb_service_set(ctx, "Flush", "1", "Grace" "1", "Log_Level", "debug", NULL);
533
534 /* Input */
535 in_ffd = flb_input(ctx, (char *) "lib", NULL);
536 TEST_CHECK(in_ffd >= 0);
537 flb_input_set(ctx, in_ffd,
538 "Tag", "test",
539 NULL);
540
541 /* Parser */
542 parser = flb_parser_create("one", "regex", "^(?<one>.+?)$",
543 NULL, NULL, NULL, MK_FALSE, MK_TRUE,
544 NULL, 0, NULL, ctx->config);
545 TEST_CHECK(parser != NULL);
546
547 parser = flb_parser_create("two", "regex", "^(?<two>.+?)$",
548 NULL, NULL, NULL, MK_FALSE, MK_TRUE,
549 NULL, 0, NULL, ctx->config);
550 TEST_CHECK(parser != NULL);
551
552 /* Filter */
553 filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
554 TEST_CHECK(filter_ffd >= 0);
555 ret = flb_filter_set(ctx, filter_ffd,
556 "Match", "test",
557 "Key_Name", "data",
558 "Parser", "one",
559 "Parser", "two",
560 "Reserve_Data", "On",
561 "Preserve_Key", "On",
562 NULL);
563 TEST_CHECK(ret == 0);
564
565 /* Output */
566 out_ffd = flb_output(ctx, (char *) "lib", &cb);
567 TEST_CHECK(out_ffd >= 0);
568 flb_output_set(ctx, out_ffd,
569 "Match", "*",
570 "format", "json",
571 NULL);
572
573 /* Start the engine */
574 ret = flb_start(ctx);
575 TEST_CHECK(ret == 0);
576
577 /* Ingest data */
578 p = "[1,{\"data\":\"hoge\"}]";
579 bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
580 TEST_CHECK(bytes == strlen(p));
581
582 flb_time_msleep(1500); /* waiting flush */
583 output = get_output(); /* 1sec passed, data should be flushed */
584 TEST_CHECK_(output != NULL, "Expected output to not be NULL");
585 if (output != NULL) {
586 /* check extra data was not preserved */
587 expected = "\"one\":\"hoge\",\"data\":\"hoge\"";
588 TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain key one , got '%s'", output);
589 free(output);
590 }
591
592 flb_stop(ctx);
593 flb_destroy(ctx);
594 }
595
596
/*
 * Test registry: each entry pairs a test name with the function to run;
 * the {NULL, NULL} entry ends the list.
 *
 * NOTE(review): flb_test_filter_parser_handle_time_key_with_fractional_timestamp
 * is defined in this file but has no entry here - confirm whether that is
 * intentional. The last entry's name says "multiple" while the function
 * identifier is spelled "mutilple".
 */
TEST_LIST = {
    {"filter_parser_extract_fields", flb_test_filter_parser_extract_fields },
    {"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off },
    {"filter_parser_handle_time_key", flb_test_filter_parser_handle_time_key },
    {"filter_parser_ignore_malformed_time", flb_test_filter_parser_ignore_malformed_time },
    {"filter_parser_preserve_original_field", flb_test_filter_parser_preserve_original_field },
    {"filter_parser_first_matched_when_multiple_parser", flb_test_filter_parser_first_matched_when_mutilple_parser },
    {NULL, NULL}
};
606
607