1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 #include <fluent-bit.h>
4 #include <fluent-bit/flb_time.h>
5 #include <fluent-bit/flb_parser.h>
6 #include "flb_tests_runtime.h"
7 
/* Serializes every read and write of `output` done by set_output()/get_output(). */
pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Last buffer handed over by the output callback; NULL until one arrives. */
char *output = NULL;
10 
set_output(char * val)11 void set_output(char *val)
12 {
13     pthread_mutex_lock(&result_mutex);
14     output = val;
15     pthread_mutex_unlock(&result_mutex);
16 }
17 
get_output(void)18 char *get_output(void)
19 {
20     char *val;
21 
22     pthread_mutex_lock(&result_mutex);
23     val = output;
24     pthread_mutex_unlock(&result_mutex);
25 
26     return val;
27 }
28 
/*
 * Callback installed on the "lib" output through struct flb_lib_out_cb.
 *
 * data:    buffer delivered by the output; logged with %s and then stored in
 *          the shared result slot.
 * size:    length reported for `data`; a zero size is ignored.
 * cb_data: user pointer from the callback struct (unused here).
 *
 * Always returns 0.
 */
int callback_test(void *data, size_t size, void *cb_data)
{
    /* Nothing was delivered: leave the previously stored result alone. */
    if (size == 0) {
        return 0;
    }

    flb_debug("[test_filter_parser] received message: %s", data);
    set_output(data); /* success */

    return 0;
}
37 
flb_test_filter_parser_extract_fields()38 void flb_test_filter_parser_extract_fields()
39 {
40     int ret;
41     int bytes;
42     char *p, *output, *expected;
43     flb_ctx_t *ctx;
44     int in_ffd;
45     int out_ffd;
46     int filter_ffd;
47     struct flb_parser *parser;
48 
49     struct flb_lib_out_cb cb;
50     cb.cb   = callback_test;
51     cb.data = NULL;
52 
53     ctx = flb_create();
54 
55     /* Configure service */
56     flb_service_set(ctx, "Flush", "1", "Grace" "1", "Log_Level", "debug", NULL);
57 
58     /* Input */
59     in_ffd = flb_input(ctx, (char *) "lib", NULL);
60     TEST_CHECK(in_ffd >= 0);
61     flb_input_set(ctx, in_ffd,
62                   "Tag", "test",
63                   NULL);
64 
65     /* Parser */
66     parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
67                                NULL, NULL, NULL, MK_FALSE, MK_TRUE, NULL, 0,
68                                NULL, ctx->config);
69     TEST_CHECK(parser != NULL);
70 
71     /* Filter */
72     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
73     TEST_CHECK(filter_ffd >= 0);
74     ret = flb_filter_set(ctx, filter_ffd,
75                          "Match", "test",
76                          "Key_Name", "data",
77                          "Parser", "dummy_test",
78                          "Reserve_Data", "On",
79                          "Preserve_Key", "Off",
80                          NULL);
81     TEST_CHECK(ret == 0);
82 
83     /* Output */
84     out_ffd = flb_output(ctx, (char *) "lib", &cb);
85     TEST_CHECK(out_ffd >= 0);
86     flb_output_set(ctx, out_ffd,
87                    "Match", "*",
88                    "format", "json",
89                    NULL);
90 
91     /* Start the engine */
92     ret = flb_start(ctx);
93     TEST_CHECK(ret == 0);
94 
95     /* Ingest data */
96     p = "[1448403340, {\"data\":\"100 0.5 true This is an example\", \"extra\":\"Some more data\"}]";
97     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
98     TEST_CHECK(bytes == strlen(p));
99 
100     flb_time_msleep(1500); /* waiting flush */
101     output = get_output(); /* 1sec passed, data should be flushed */
102     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
103     if (output != NULL) {
104         /* check timestamp */
105         expected = "[1448403340.000000,{";
106         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
107         /* check fields were extracted */
108         expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"";
109         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
110         /* check original field was not preserved */
111         expected = "\"data\":\"100 0.5 true This is an example\"";
112         TEST_CHECK_(strstr(output, expected) == NULL, "Expected output to not contain '%s', got '%s'", expected, output);
113         /* check extra data was preserved */
114         expected = "\"extra\":\"Some more data\"";
115         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to preserve extra field, got '%s'", output);
116         free(output);
117     }
118 
119     flb_stop(ctx);
120     flb_destroy(ctx);
121 }
122 
flb_test_filter_parser_reserve_data_off()123 void flb_test_filter_parser_reserve_data_off()
124 {
125     int ret;
126     int bytes;
127     char *p, *output, *expected;
128     flb_ctx_t *ctx;
129     int in_ffd;
130     int out_ffd;
131     int filter_ffd;
132     struct flb_parser *parser;
133 
134     struct flb_lib_out_cb cb;
135     cb.cb   = callback_test;
136     cb.data = NULL;
137 
138     ctx = flb_create();
139 
140     /* Configure service */
141     flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
142 
143     /* Input */
144     in_ffd = flb_input(ctx, (char *) "lib", NULL);
145     TEST_CHECK(in_ffd >= 0);
146     flb_input_set(ctx, in_ffd,
147                   "Tag", "test",
148                   NULL);
149 
150     /* Parser */
151     parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
152                                NULL, NULL, NULL, MK_FALSE, MK_TRUE, NULL, 0,
153                                NULL, ctx->config);
154     TEST_CHECK(parser != NULL);
155 
156     /* Filter */
157     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
158     TEST_CHECK(filter_ffd >= 0);
159     ret = flb_filter_set(ctx, filter_ffd,
160                          "Match", "test",
161                          "Key_Name", "data",
162                          "Parser", "dummy_test",
163                          "Reserve_Data", "Off",
164                          NULL);
165     TEST_CHECK(ret == 0);
166 
167     /* Output */
168     out_ffd = flb_output(ctx, (char *) "lib", &cb);
169     TEST_CHECK(out_ffd >= 0);
170     flb_output_set(ctx, out_ffd,
171                    "Match", "*",
172                    "format", "json",
173                    NULL);
174 
175     /* Start the engine */
176     ret = flb_start(ctx);
177     TEST_CHECK(ret == 0);
178 
179     /* Ingest data */
180     p = "[1448403340,{\"data\":\"100 0.5 true This is an example\",\"extra\":\"Some more data\"}]";
181     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
182     TEST_CHECK(bytes == strlen(p));
183 
184     flb_time_msleep(1500); /* waiting flush */
185     output = get_output(); /* 1sec passed, data should be flushed */
186     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
187     if (output != NULL) {
188         /* check extra data was not preserved */
189         expected = "\"extra\":\"Some more data\"";
190         TEST_CHECK_(strstr(output, expected) == NULL, "Expected output to not preserve extra field, got '%s'", output);
191         free(output);
192     }
193 
194     flb_stop(ctx);
195     flb_destroy(ctx);
196 }
197 
flb_test_filter_parser_handle_time_key()198 void flb_test_filter_parser_handle_time_key()
199 {
200     int ret;
201     int bytes;
202     char *p, *output, *expected;
203     flb_ctx_t *ctx;
204     int in_ffd;
205     int out_ffd;
206     int filter_ffd;
207     struct flb_parser *parser;
208 
209     struct flb_lib_out_cb cb;
210     cb.cb   = callback_test;
211     cb.data = NULL;
212 
213     ctx = flb_create();
214 
215     /* Configure service */
216     flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
217 
218     /* Input */
219     in_ffd = flb_input(ctx, (char *) "lib", NULL);
220     TEST_CHECK(in_ffd >= 0);
221     flb_input_set(ctx, in_ffd,
222                   "Tag", "test",
223                   NULL);
224 
225     /* Parser */
226     parser = flb_parser_create("timestamp", "regex", "^(?<time>.*)$", "%Y-%m-%dT%H:%M:%S.%L",
227                                "time",
228                                NULL, MK_FALSE, MK_TRUE,
229                                NULL, 0, NULL, ctx->config);
230     TEST_CHECK(parser != NULL);
231 
232     /* Filter */
233     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
234     TEST_CHECK(filter_ffd >= 0);
235     ret = flb_filter_set(ctx, filter_ffd,
236                          "Match", "test",
237                          "Key_Name", "@timestamp",
238                          "Parser", "timestamp",
239                          "Reserve_Data", "On",
240                          NULL);
241     TEST_CHECK(ret == 0);
242 
243     /* Output */
244     out_ffd = flb_output(ctx, (char *) "lib", &cb);
245     TEST_CHECK(out_ffd >= 0);
246     flb_output_set(ctx, out_ffd,
247                    "Match", "*",
248                    "format", "json",
249                    NULL);
250 
251     /* Start the engine */
252     ret = flb_start(ctx);
253     TEST_CHECK(ret == 0);
254 
255     /* Ingest data */
256     p = "[1448403340, {\"@timestamp\":\"2017-11-01T22:25:21.648+00:00\", \"message\":\"This is an example\"}]";
257     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
258     TEST_CHECK(bytes == strlen(p));
259 
260     flb_time_msleep(1500); /* waiting flush */
261     output = get_output(); /* 1sec passed, data should be flushed */
262     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
263     if (output != NULL) {
264         /* check the timestamp field was updated correctly */
265         /* this is in fluent-bits extended timestamp format */
266         expected = "[1509575121.648000,{";
267         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
268         /* check additional field is preserved */
269         expected = "\"message\":\"This is an example\"";
270         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
271         free(output);
272     }
273 
274     flb_stop(ctx);
275     flb_destroy(ctx);
276 }
277 
flb_test_filter_parser_handle_time_key_with_fractional_timestamp()278 void flb_test_filter_parser_handle_time_key_with_fractional_timestamp()
279 {
280     int ret;
281     int bytes;
282     char *p, *output, *expected;
283     flb_ctx_t *ctx;
284     int in_ffd;
285     int out_ffd;
286     int filter_ffd;
287     struct flb_parser *parser;
288 
289     ctx = flb_create();
290 
291     /* Configure service */
292     flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
293 
294     /* Input */
295     in_ffd = flb_input(ctx, (char *) "lib", NULL);
296     TEST_CHECK(in_ffd >= 0);
297     flb_input_set(ctx, in_ffd,
298                   "Tag", "test",
299                   NULL);
300 
301     /* Parser */
302     parser = flb_parser_create("timestamp", "regex", "^(?<time>.*)$", "%s.%L",
303                                "time",
304                                NULL, MK_FALSE, MK_TRUE,
305                                NULL, 0, NULL, ctx->config);
306     TEST_CHECK(parser != NULL);
307 
308     /* Filter */
309     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
310     TEST_CHECK(filter_ffd >= 0);
311     ret = flb_filter_set(ctx, filter_ffd,
312                          "Match", "test",
313                          "Key_Name", "@timestamp",
314                          "Parser", "timestamp",
315                          "Reserve_Data", "On",
316                          NULL);
317     TEST_CHECK(ret == 0);
318 
319     /* Output */
320     out_ffd = flb_output(ctx, (char *) "lib", (void*)callback_test);
321     TEST_CHECK(out_ffd >= 0);
322     flb_output_set(ctx, out_ffd,
323                    "Match", "*",
324                    "format", "json",
325                    NULL);
326 
327     /* Start the engine */
328     ret = flb_start(ctx);
329     TEST_CHECK(ret == 0);
330 
331     /* Ingest data */
332     p = "[1448403340, {\"@timestamp\":\"1509575121.648\", \"message\":\"This is an example\"}]";
333     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
334     TEST_CHECK(bytes == strlen(p));
335 
336     flb_time_msleep(1500); /* waiting flush */
337     output = get_output(); /* 1sec passed, data should be flushed */
338     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
339     if (output != NULL) {
340         /* check the timestamp field was updated correctly */
341         /* this is in fluent-bits extended timestamp format */
342         expected = "[\"\\x59\\xfffffffa\\x49\\xffffffd1\\x26\\xffffff9f\\xffffffb2\\x00\", {";
343         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
344         /* check additional field is preserved */
345         expected = "\"message\":\"This is an example\"";
346         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
347         free(output);
348     }
349 
350     flb_stop(ctx);
351     flb_destroy(ctx);
352 }
353 
flb_test_filter_parser_ignore_malformed_time()354 void flb_test_filter_parser_ignore_malformed_time()
355 {
356     int ret;
357     int bytes;
358     char *p, *output, *expected;
359     flb_ctx_t *ctx;
360     int in_ffd;
361     int out_ffd;
362     int filter_ffd;
363     struct flb_parser *parser;
364 
365     struct flb_lib_out_cb cb;
366     cb.cb   = callback_test;
367     cb.data = NULL;
368 
369     ctx = flb_create();
370 
371     /* Configure service */
372     flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
373 
374     /* Input */
375     in_ffd = flb_input(ctx, (char *) "lib", NULL);
376     TEST_CHECK(in_ffd >= 0);
377     flb_input_set(ctx, in_ffd,
378                   "Tag", "test",
379                   NULL);
380 
381     /* Parser */
382     parser = flb_parser_create("timestamp", "regex",
383                                "^(?<time>.*)$", "%Y-%m-%dT%H:%M:%S.%L", "time",
384                                NULL, FLB_FALSE, MK_TRUE,
385                                NULL, 0, NULL, ctx->config);
386     TEST_CHECK(parser != NULL);
387 
388     /* Filter */
389     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
390     TEST_CHECK(filter_ffd >= 0);
391     ret = flb_filter_set(ctx, filter_ffd,
392                          "Match", "test",
393                          "Key_Name", "@timestamp",
394                          "Parser", "timestamp",
395                          "Reserve_Data", "On",
396                          "Preserve_Key", "On",
397                          NULL);
398     TEST_CHECK(ret == 0);
399 
400     /* Output */
401     out_ffd = flb_output(ctx, (char *) "lib", &cb);
402     TEST_CHECK(out_ffd >= 0);
403     flb_output_set(ctx, out_ffd,
404                    "Match", "*",
405                    "format", "json",
406                    NULL);
407 
408     /* Start the engine */
409     ret = flb_start(ctx);
410     TEST_CHECK(ret == 0);
411 
412     /* Ingest data */
413     p = "[1448403340, {\"@timestamp\":\"2017_$!^-11-01T22:25:21.648\", \"log\":\"An example\"}]";
414     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
415     TEST_CHECK(bytes == strlen(p));
416 
417     flb_time_msleep(1500); /* waiting flush */
418     output = get_output(); /* 1sec passed, data should be flushed */
419     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
420     if (output != NULL) {
421         /* check the timestamp field was ignored and we received everything else */
422         expected = "[1448403340.000000,{\"@timestamp\":\"2017_$!^-11-01T22:25:21.648\",\"log\":\"An example\"}]";
423         TEST_CHECK_(strcmp(output, expected) == 0, "Expected output to be '%s', got '%s'", expected, output);
424         free(output);
425     }
426 
427     flb_stop(ctx);
428     flb_destroy(ctx);
429 }
430 
flb_test_filter_parser_preserve_original_field()431 void flb_test_filter_parser_preserve_original_field()
432 {
433     int ret;
434     int bytes;
435     char *p, *output, *expected;
436     flb_ctx_t *ctx;
437     int in_ffd;
438     int out_ffd;
439     int filter_ffd;
440     struct flb_parser *parser;
441 
442     struct flb_lib_out_cb cb;
443     cb.cb   = callback_test;
444     cb.data = NULL;
445 
446     ctx = flb_create();
447 
448     /* Configure service */
449     flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "debug", NULL);
450 
451     /* Input */
452     in_ffd = flb_input(ctx, (char *) "lib", NULL);
453     TEST_CHECK(in_ffd >= 0);
454     flb_input_set(ctx, in_ffd,
455                   "Tag", "test",
456                   NULL);
457 
458     /* Parser */
459     parser = flb_parser_create("dummy_test", "regex", "^(?<INT>[^ ]+) (?<FLOAT>[^ ]+) (?<BOOL>[^ ]+) (?<STRING>.+)$",
460                                NULL, NULL, NULL, MK_FALSE, MK_TRUE, NULL, 0,
461                                NULL, ctx->config);
462     TEST_CHECK(parser != NULL);
463 
464     /* Filter */
465     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
466     TEST_CHECK(filter_ffd >= 0);
467     ret = flb_filter_set(ctx, filter_ffd,
468                          "Match", "test",
469                          "Key_Name", "data",
470                          "Parser", "dummy_test",
471                          "Reserve_Data", "On",
472                          "Preserve_Key", "On",
473                          NULL);
474     TEST_CHECK(ret == 0);
475 
476     /* Output */
477     out_ffd = flb_output(ctx, (char *) "lib", &cb);
478     TEST_CHECK(out_ffd >= 0);
479     flb_output_set(ctx, out_ffd,
480                    "Match", "*",
481                    "format", "json",
482                    NULL);
483 
484     /* Start the engine */
485     ret = flb_start(ctx);
486     TEST_CHECK(ret == 0);
487 
488     /* Ingest data */
489     p = "[1448403340,{\"data\":\"100 0.5 true This is an example\",\"log\":\"An example\"}]";
490     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
491     TEST_CHECK(bytes == strlen(p));
492 
493     flb_time_msleep(1500); /* waiting flush */
494     output = get_output(); /* 1sec passed, data should be flushed */
495     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
496     if (output != NULL) {
497         /* check original field is preserved */
498         expected = "\"data\":\"100 0.5 true This is an example\"";
499         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
500         /* check fields were extracted */
501         expected = "\"INT\":\"100\",\"FLOAT\":\"0.5\",\"BOOL\":\"true\",\"STRING\":\"This is an example\"";
502         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
503         /* check other fields are preserved */
504         expected = "\"log\":\"An example\"";
505         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain '%s', got '%s'", expected, output);
506         free(output);
507     }
508 
509     flb_stop(ctx);
510     flb_destroy(ctx);
511 }
512 
513 // https://github.com/fluent/fluent-bit/issues/2250
flb_test_filter_parser_first_matched_when_mutilple_parser()514 void flb_test_filter_parser_first_matched_when_mutilple_parser()
515 {
516     int ret;
517     int bytes;
518     char *p, *output, *expected;
519     flb_ctx_t *ctx;
520     int in_ffd;
521     int out_ffd;
522     int filter_ffd;
523     struct flb_parser *parser;
524 
525     struct flb_lib_out_cb cb;
526     cb.cb   = callback_test;
527     cb.data = NULL;
528 
529     ctx = flb_create();
530 
531     /* Configure service */
532     flb_service_set(ctx, "Flush", "1", "Grace" "1", "Log_Level", "debug", NULL);
533 
534     /* Input */
535     in_ffd = flb_input(ctx, (char *) "lib", NULL);
536     TEST_CHECK(in_ffd >= 0);
537     flb_input_set(ctx, in_ffd,
538                   "Tag", "test",
539                   NULL);
540 
541     /* Parser */
542     parser = flb_parser_create("one", "regex", "^(?<one>.+?)$",
543                                NULL, NULL, NULL, MK_FALSE, MK_TRUE,
544                                NULL, 0, NULL, ctx->config);
545     TEST_CHECK(parser != NULL);
546 
547     parser = flb_parser_create("two", "regex", "^(?<two>.+?)$",
548                                NULL, NULL, NULL, MK_FALSE, MK_TRUE,
549                                NULL, 0, NULL, ctx->config);
550     TEST_CHECK(parser != NULL);
551 
552     /* Filter */
553     filter_ffd = flb_filter(ctx, (char *) "parser", NULL);
554     TEST_CHECK(filter_ffd >= 0);
555     ret = flb_filter_set(ctx, filter_ffd,
556                          "Match", "test",
557                          "Key_Name", "data",
558                          "Parser", "one",
559                          "Parser", "two",
560                          "Reserve_Data", "On",
561                          "Preserve_Key", "On",
562                          NULL);
563     TEST_CHECK(ret == 0);
564 
565     /* Output */
566     out_ffd = flb_output(ctx, (char *) "lib", &cb);
567     TEST_CHECK(out_ffd >= 0);
568     flb_output_set(ctx, out_ffd,
569                    "Match", "*",
570                    "format", "json",
571                    NULL);
572 
573     /* Start the engine */
574     ret = flb_start(ctx);
575     TEST_CHECK(ret == 0);
576 
577     /* Ingest data */
578     p = "[1,{\"data\":\"hoge\"}]";
579     bytes = flb_lib_push(ctx, in_ffd, p, strlen(p));
580     TEST_CHECK(bytes == strlen(p));
581 
582     flb_time_msleep(1500); /* waiting flush */
583     output = get_output(); /* 1sec passed, data should be flushed */
584     TEST_CHECK_(output != NULL, "Expected output to not be NULL");
585     if (output != NULL) {
586         /* check extra data was not preserved */
587         expected = "\"one\":\"hoge\",\"data\":\"hoge\"";
588         TEST_CHECK_(strstr(output, expected) != NULL, "Expected output to contain key one , got '%s'", output);
589         free(output);
590     }
591 
592     flb_stop(ctx);
593     flb_destroy(ctx);
594 }
595 
596 
597 TEST_LIST = {
598     {"filter_parser_extract_fields", flb_test_filter_parser_extract_fields },
599     {"filter_parser_reserve_data_off", flb_test_filter_parser_reserve_data_off },
600     {"filter_parser_handle_time_key", flb_test_filter_parser_handle_time_key },
601     {"filter_parser_ignore_malformed_time", flb_test_filter_parser_ignore_malformed_time },
602     {"filter_parser_preserve_original_field", flb_test_filter_parser_preserve_original_field },
603     {"filter_parser_first_matched_when_multiple_parser", flb_test_filter_parser_first_matched_when_mutilple_parser },
604     {NULL, NULL}
605 };
606 
607