1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit Demo
4 * ===============
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21
22 #include <fluent-bit/flb_lib.h>
23 #include <fluent-bit/flb_mem.h>
24 #include <fluent-bit/flb_pipe.h>
25 #include <fluent-bit/flb_engine.h>
26 #include <fluent-bit/flb_input.h>
27 #include <fluent-bit/flb_output.h>
28 #include <fluent-bit/flb_filter.h>
29 #include <fluent-bit/flb_utils.h>
30 #include <fluent-bit/flb_time.h>
31 #include <fluent-bit/flb_coro.h>
32 #include <fluent-bit/flb_callback.h>
33 #include <fluent-bit/flb_kv.h>
34 #include <fluent-bit/flb_metrics.h>
35 #include <fluent-bit/tls/flb_tls.h>
36
37 #include <signal.h>
38 #include <stdarg.h>
39
40 #ifdef FLB_HAVE_MTRACE
41 #include <mcheck.h>
42 #endif
43
44 #ifdef FLB_HAVE_AWS_ERROR_REPORTER
45 #include <fluent-bit/aws/flb_aws_error_reporter.h>
46
47 struct flb_aws_error_reporter *error_reporter;
48 #endif
49
/* One-time initialization guard: flb_start() uses it to run flb_init_env() once */
51 static pthread_once_t flb_lib_once = PTHREAD_ONCE_INIT;
52
53 #ifdef FLB_SYSTEM_WINDOWS
flb_socket_init_win32(void)54 static inline int flb_socket_init_win32(void)
55 {
56 WSADATA wsaData;
57 int err;
58
59 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
60 if (err != 0) {
61 fprintf(stderr, "WSAStartup failed with error: %d\n", err);
62 return err;
63 }
64 return 0;
65 }
66 #endif
67
/*
 * Find the input instance whose id equals 'ffd' in the list of inputs held
 * by the context configuration.
 *
 * Returns the first matching instance, or NULL when none matches.
 */
static inline struct flb_input_instance *in_instance_get(flb_ctx_t *ctx,
                                                         int ffd)
{
    struct mk_list *node;
    struct flb_input_instance *candidate;
    struct flb_input_instance *found = NULL;

    mk_list_foreach(node, &ctx->config->inputs) {
        candidate = mk_list_entry(node, struct flb_input_instance, _head);
        if (candidate->id == ffd) {
            found = candidate;
            break;
        }
    }

    return found;
}
83
/*
 * Find the output instance whose id equals 'ffd' in the list of outputs held
 * by the context configuration.
 *
 * Returns the first matching instance, or NULL when none matches.
 */
static inline struct flb_output_instance *out_instance_get(flb_ctx_t *ctx,
                                                           int ffd)
{
    struct mk_list *node;
    struct flb_output_instance *candidate;
    struct flb_output_instance *found = NULL;

    mk_list_foreach(node, &ctx->config->outputs) {
        candidate = mk_list_entry(node, struct flb_output_instance, _head);
        if (candidate->id == ffd) {
            found = candidate;
            break;
        }
    }

    return found;
}
99
/*
 * Find the filter instance whose id equals 'ffd' in the list of filters held
 * by the context configuration.
 *
 * Returns the first matching instance, or NULL when none matches.
 */
static inline struct flb_filter_instance *filter_instance_get(flb_ctx_t *ctx,
                                                              int ffd)
{
    struct mk_list *node;
    struct flb_filter_instance *candidate;
    struct flb_filter_instance *found = NULL;

    mk_list_foreach(node, &ctx->config->filters) {
        candidate = mk_list_entry(node, struct flb_filter_instance, _head);
        if (candidate->id == ffd) {
            found = candidate;
            break;
        }
    }

    return found;
}
115
/*
 * Environment initialization routine. flb_start() runs it through
 * pthread_once(), so the calls below happen a single time.
 */
void flb_init_env()
{
    /* Core subsystems: TLS, coroutines, upstreams and output preparation */
    flb_tls_init();
    flb_coro_init();
    flb_upstream_init();
    flb_output_prepare();

    /* Bundled libraries */
    cmt_initialize();
}
126
flb_create()127 flb_ctx_t *flb_create()
128 {
129 int ret;
130 flb_ctx_t *ctx;
131 struct flb_config *config;
132
133 #ifdef FLB_HAVE_MTRACE
134 /* Start tracing malloc and free */
135 mtrace();
136 #endif
137
138 #ifdef FLB_SYSTEM_WINDOWS
139 /* Ensure we initialized Windows Sockets */
140 if (flb_socket_init_win32()) {
141 return NULL;
142 }
143 #endif
144
145 ctx = flb_calloc(1, sizeof(flb_ctx_t));
146 if (!ctx) {
147 perror("malloc");
148 return NULL;
149 }
150
151 config = flb_config_init();
152 if (!config) {
153 flb_free(ctx);
154 return NULL;
155 }
156 ctx->config = config;
157 ctx->status = FLB_LIB_NONE;
158
159 /*
160 * Initialize our pipe to send data to our worker, used
161 * by 'lib' input plugin.
162 */
163 ret = flb_pipe_create(config->ch_data);
164 if (ret == -1) {
165 perror("pipe");
166 flb_config_exit(ctx->config);
167 flb_free(ctx);
168 return NULL;
169 }
170
171 /* Create the event loop to receive notifications */
172 ctx->event_loop = mk_event_loop_create(256);
173 if (!ctx->event_loop) {
174 flb_config_exit(ctx->config);
175 flb_free(ctx);
176 return NULL;
177 }
178 config->ch_evl = ctx->event_loop;
179
180 /* Prepare the notification channels */
181 ctx->event_channel = flb_calloc(1, sizeof(struct mk_event));
182 if (!ctx->event_channel) {
183 perror("calloc");
184 flb_config_exit(ctx->config);
185 flb_free(ctx);
186 return NULL;
187 }
188
189 MK_EVENT_ZERO(ctx->event_channel);
190
191 ret = mk_event_channel_create(config->ch_evl,
192 &config->ch_notif[0],
193 &config->ch_notif[1],
194 ctx->event_channel);
195 if (ret != 0) {
196 flb_error("[lib] could not create notification channels");
197 flb_config_exit(ctx->config);
198 flb_destroy(ctx);
199 return NULL;
200 }
201
202 #ifdef FLB_HAVE_AWS_ERROR_REPORTER
203 if (is_error_reporting_enabled()) {
204 error_reporter = flb_aws_error_reporter_create();
205 }
206 #endif
207
208 return ctx;
209 }
210
211 /* Release resources associated to the library context */
flb_destroy(flb_ctx_t * ctx)212 void flb_destroy(flb_ctx_t *ctx)
213 {
214 if (!ctx) {
215 return;
216 }
217
218 if (ctx->event_channel) {
219 mk_event_del(ctx->event_loop, ctx->event_channel);
220 flb_free(ctx->event_channel);
221 }
222
223 /* Remove resources from the event loop */
224 mk_event_loop_destroy(ctx->event_loop);
225
226 /* cfg->is_running is set to false when flb_engine_shutdown has been invoked (event loop) */
227 if(ctx->config) {
228 if (ctx->config->is_running == FLB_TRUE) {
229 flb_engine_shutdown(ctx->config);
230 }
231 flb_config_exit(ctx->config);
232 }
233
234 #ifdef FLB_HAVE_AWS_ERROR_REPORTER
235 if (is_error_reporting_enabled()) {
236 flb_aws_error_reporter_destroy(error_reporter);
237 }
238 #endif
239
240 flb_free(ctx);
241 ctx = NULL;
242
243 #ifdef FLB_HAVE_MTRACE
244 /* Stop tracing malloc and free */
245 muntrace();
246 #endif
247 }
248
249 /* Defines a new input instance */
flb_input(flb_ctx_t * ctx,const char * input,void * data)250 int flb_input(flb_ctx_t *ctx, const char *input, void *data)
251 {
252 struct flb_input_instance *i_ins;
253
254 i_ins = flb_input_new(ctx->config, input, data, FLB_TRUE);
255 if (!i_ins) {
256 return -1;
257 }
258
259 return i_ins->id;
260 }
261
262 /* Defines a new output instance */
flb_output(flb_ctx_t * ctx,const char * output,struct flb_lib_out_cb * cb)263 int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb)
264 {
265 struct flb_output_instance *o_ins;
266
267 o_ins = flb_output_new(ctx->config, output, cb, FLB_TRUE);
268 if (!o_ins) {
269 return -1;
270 }
271
272 return o_ins->id;
273 }
274
275 /* Defines a new filter instance */
flb_filter(flb_ctx_t * ctx,const char * filter,void * data)276 int flb_filter(flb_ctx_t *ctx, const char *filter, void *data)
277 {
278 struct flb_filter_instance *f_ins;
279
280 f_ins = flb_filter_new(ctx->config, filter, data);
281 if (!f_ins) {
282 return -1;
283 }
284
285 return f_ins->id;
286 }
287
288 /* Set an input interface property */
flb_input_set(flb_ctx_t * ctx,int ffd,...)289 int flb_input_set(flb_ctx_t *ctx, int ffd, ...)
290 {
291 int ret;
292 char *key;
293 char *value;
294 va_list va;
295 struct flb_input_instance *i_ins;
296
297 i_ins = in_instance_get(ctx, ffd);
298 if (!i_ins) {
299 return -1;
300 }
301
302 va_start(va, ffd);
303 while ((key = va_arg(va, char *))) {
304 value = va_arg(va, char *);
305 if (!value) {
306 /* Wrong parameter */
307 va_end(va);
308 return -1;
309 }
310 ret = flb_input_set_property(i_ins, key, value);
311 if (ret != 0) {
312 va_end(va);
313 return -1;
314 }
315 }
316
317 va_end(va);
318 return 0;
319 }
320
flb_config_map_property_check(char * plugin_name,struct mk_list * config_map,char * key,char * val)321 static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val)
322 {
323 struct flb_kv *kv;
324 struct mk_list properties;
325 int r;
326
327 mk_list_init(&properties);
328
329 kv = flb_kv_item_create(&properties, (char *) key, (char *) val);
330 if (!kv) {
331 return FLB_LIB_ERROR;
332 }
333
334 r = flb_config_map_properties_check(plugin_name, &properties, config_map);
335 flb_kv_item_destroy(kv);
336 return r;
337 }
338
339 /* Check if a given k, v is a valid config directive for the given output plugin */
flb_output_property_check(flb_ctx_t * ctx,int ffd,char * key,char * val)340 int flb_output_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
341 {
342 struct flb_output_instance *o_ins;
343 struct mk_list *config_map;
344 struct flb_output_plugin *p;
345 int r;
346
347 o_ins = out_instance_get(ctx, ffd);
348 if (!o_ins) {
349 return FLB_LIB_ERROR;
350 }
351
352 p = o_ins->p;
353 if (!p->config_map) {
354 return FLB_LIB_NO_CONFIG_MAP;
355 }
356
357 config_map = flb_config_map_create(ctx->config, p->config_map);
358 if (!config_map) {
359 return FLB_LIB_ERROR;
360 }
361
362 r = flb_config_map_property_check(p->name, config_map, key, val);
363 flb_config_map_destroy(config_map);
364 return r;
365 }
366
367 /* Check if a given k, v is a valid config directive for the given input plugin */
flb_input_property_check(flb_ctx_t * ctx,int ffd,char * key,char * val)368 int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
369 {
370 struct flb_input_instance *i_ins;
371 struct flb_input_plugin *p;
372 struct mk_list *config_map;
373 int r;
374
375 i_ins = in_instance_get(ctx, ffd);
376 if (!i_ins) {
377 return FLB_LIB_ERROR;
378 }
379
380 p = i_ins->p;
381 if (!p->config_map) {
382 return FLB_LIB_NO_CONFIG_MAP;
383 }
384
385 config_map = flb_config_map_create(ctx->config, p->config_map);
386 if (!config_map) {
387 return FLB_LIB_ERROR;
388 }
389
390 r = flb_config_map_property_check(p->name, config_map, key, val);
391 flb_config_map_destroy(config_map);
392 return r;
393 }
394
395 /* Check if a given k, v is a valid config directive for the given filter plugin */
flb_filter_property_check(flb_ctx_t * ctx,int ffd,char * key,char * val)396 int flb_filter_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
397 {
398 struct flb_filter_instance *f_ins;
399 struct flb_filter_plugin *p;
400 struct mk_list *config_map;
401 int r;
402
403 f_ins = filter_instance_get(ctx, ffd);
404 if (!f_ins) {
405 return FLB_LIB_ERROR;
406 }
407
408 p = f_ins->p;
409 if (!p->config_map) {
410 return FLB_LIB_NO_CONFIG_MAP;
411 }
412
413 config_map = flb_config_map_create(ctx->config, p->config_map);
414 if (!config_map) {
415 return FLB_LIB_ERROR;
416 }
417
418 r = flb_config_map_property_check(p->name, config_map, key, val);
419 flb_config_map_destroy(config_map);
420 return r;
421 }
422
423 /* Set an output interface property */
flb_output_set(flb_ctx_t * ctx,int ffd,...)424 int flb_output_set(flb_ctx_t *ctx, int ffd, ...)
425 {
426 int ret;
427 char *key;
428 char *value;
429 va_list va;
430 struct flb_output_instance *o_ins;
431
432 o_ins = out_instance_get(ctx, ffd);
433 if (!o_ins) {
434 return -1;
435 }
436
437 va_start(va, ffd);
438 while ((key = va_arg(va, char *))) {
439 value = va_arg(va, char *);
440 if (!value) {
441 /* Wrong parameter */
442 va_end(va);
443 return -1;
444 }
445
446 ret = flb_output_set_property(o_ins, key, value);
447 if (ret != 0) {
448 va_end(va);
449 return -1;
450 }
451 }
452
453 va_end(va);
454 return 0;
455 }
456
/*
 * Register the callback 'cb' under 'name' on the output instance 'ffd'.
 *
 * Returns -1 if the instance does not exist, otherwise the value returned
 * by flb_callback_set().
 */
int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name,
                            void (*cb)(char *, void *, void *))
{
    struct flb_output_instance *ins = out_instance_get(ctx, ffd);

    if (ins == NULL) {
        return -1;
    }

    return flb_callback_set(ins->callback, name, cb);
}
469
/*
 * Enable a named test on the output instance 'ffd'.
 *
 * Enabling a test sets the output instance in 'test' mode, so no real flush
 * callback is invoked, only the desired implemented test. The only test name
 * recognized here is "formatter".
 *
 * Returns 0 on success, -1 if the instance does not exist or the test name
 * is not recognized.
 */
int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
                        void (*out_callback) (void *, int, int, void *, size_t, void *),
                        void *out_callback_data,
                        void *test_ctx)
{
    struct flb_output_instance *ins;

    ins = out_instance_get(ctx, ffd);
    if (ins == NULL) {
        return -1;
    }

    /* Anything other than the formatter test is rejected */
    if (strcmp(test_name, "formatter") != 0) {
        return -1;
    }

    /* Formatter test: remember the runtime context and the user callback */
    ins->test_mode = FLB_TRUE;
    ins->test_formatter.rt_ctx = ctx;
    ins->test_formatter.rt_ffd = ffd;
    ins->test_formatter.rt_out_callback = out_callback;
    ins->test_formatter.rt_data = out_callback_data;
    ins->test_formatter.flush_ctx = test_ctx;

    return 0;
}
502
503 /* Set an filter interface property */
flb_filter_set(flb_ctx_t * ctx,int ffd,...)504 int flb_filter_set(flb_ctx_t *ctx, int ffd, ...)
505 {
506 int ret;
507 char *key;
508 char *value;
509 va_list va;
510 struct flb_filter_instance *f_ins;
511
512 f_ins = filter_instance_get(ctx, ffd);
513 if (!f_ins) {
514 return -1;
515 }
516
517 va_start(va, ffd);
518 while ((key = va_arg(va, char *))) {
519 value = va_arg(va, char *);
520 if (!value) {
521 /* Wrong parameter */
522 va_end(va);
523 return -1;
524 }
525
526 ret = flb_filter_set_property(f_ins, key, value);
527 if (ret != 0) {
528 va_end(va);
529 return -1;
530 }
531 }
532
533 va_end(va);
534 return 0;
535 }
536
537 /* Set a service property */
flb_service_set(flb_ctx_t * ctx,...)538 int flb_service_set(flb_ctx_t *ctx, ...)
539 {
540 int ret;
541 char *key;
542 char *value;
543 va_list va;
544
545 va_start(va, ctx);
546
547 while ((key = va_arg(va, char *))) {
548 value = va_arg(va, char *);
549 if (!value) {
550 /* Wrong parameter */
551 va_end(va);
552 return -1;
553 }
554
555 ret = flb_config_set_property(ctx->config, key, value);
556 if (ret != 0) {
557 va_end(va);
558 return -1;
559 }
560 }
561
562 va_end(va);
563 return 0;
564 }
565
566 /* Load a configuration file that may be used by the input or output plugin */
flb_lib_config_file(struct flb_lib_ctx * ctx,const char * path)567 int flb_lib_config_file(struct flb_lib_ctx *ctx, const char *path)
568 {
569 if (access(path, R_OK) != 0) {
570 perror("access");
571 return -1;
572 }
573
574 ctx->config->file = mk_rconf_open(path);
575 if (!ctx->config->file) {
576 fprintf(stderr, "Error reading configuration file: %s\n", path);
577 return -1;
578 }
579
580 return 0;
581 }
582
/*
 * Wrapper to release a buffer which comes from out_lib_flush().
 *
 * Returns 0 after freeing 'data', or -1 when 'data' is NULL.
 */
int flb_lib_free(void* data)
{
    if (data != NULL) {
        flb_free(data);
        return 0;
    }

    return -1;
}
592
593
594 /* Push some data into the Engine */
flb_lib_push(flb_ctx_t * ctx,int ffd,const void * data,size_t len)595 int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len)
596 {
597 int ret;
598 struct flb_input_instance *i_ins;
599
600
601 if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
602 flb_error("[lib] cannot push data, engine is not running");
603 return -1;
604 }
605
606 i_ins = in_instance_get(ctx, ffd);
607 if (!i_ins) {
608 return -1;
609 }
610
611 ret = flb_pipe_w(i_ins->channel[1], data, len);
612 if (ret == -1) {
613 flb_errno();
614 return -1;
615 }
616 return ret;
617 }
618
flb_lib_worker(void * data)619 static void flb_lib_worker(void *data)
620 {
621 int ret;
622 flb_ctx_t *ctx = data;
623 struct flb_config *config;
624
625 config = ctx->config;
626 mk_utils_worker_rename("flb-pipeline");
627 ret = flb_engine_start(config);
628 if (ret == -1) {
629 flb_engine_failed(config);
630 flb_engine_shutdown(config);
631 }
632 ctx->status = FLB_LIB_NONE;
633 }
634
635 /* Return the current time to be used by lib callers */
flb_time_now()636 double flb_time_now()
637 {
638 struct flb_time t;
639
640 flb_time_get(&t);
641 return flb_time_to_double(&t);
642 }
643
644 /* Start the engine */
flb_start(flb_ctx_t * ctx)645 int flb_start(flb_ctx_t *ctx)
646 {
647 int fd;
648 int bytes;
649 int ret;
650 uint64_t val;
651 pthread_t tid;
652 struct mk_event *event;
653 struct flb_config *config;
654
655 pthread_once(&flb_lib_once, flb_init_env);
656
657 config = ctx->config;
658 ret = mk_utils_worker_spawn(flb_lib_worker, ctx, &tid);
659 if (ret == -1) {
660 return -1;
661 }
662 config->worker = tid;
663
664 /* Wait for the started signal so we can return to the caller */
665 mk_event_wait(config->ch_evl);
666 mk_event_foreach(event, config->ch_evl) {
667 fd = event->fd;
668 bytes = flb_pipe_r(fd, &val, sizeof(uint64_t));
669 if (bytes <= 0) {
670 pthread_cancel(tid);
671 pthread_join(tid, NULL);
672 ctx->status = FLB_LIB_ERROR;
673 return -1;
674 }
675
676 if (val == FLB_ENGINE_STARTED) {
677 flb_debug("[lib] backend started");
678 ctx->status = FLB_LIB_OK;
679 break;
680 }
681 else if (val == FLB_ENGINE_FAILED) {
682 flb_error("[lib] backend failed");
683 pthread_join(tid, NULL);
684 ctx->status = FLB_LIB_ERROR;
685 return -1;
686 }
687 }
688
689 return 0;
690 }
691
/*
 * Block the caller while the context status is FLB_LIB_OK, checking it
 * between one-second sleeps. Always returns 0.
 */
int flb_loop(flb_ctx_t *ctx)
{
    for (;;) {
        if (ctx->status != FLB_LIB_OK) {
            break;
        }
        sleep(1);
    }

    return 0;
}
699
700 /* Stop the engine */
flb_stop(flb_ctx_t * ctx)701 int flb_stop(flb_ctx_t *ctx)
702 {
703 int ret;
704 pthread_t tid;
705
706 if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
707 return 0;
708 }
709
710 if (!ctx->config) {
711 return 0;
712 }
713
714 if (ctx->config->file) {
715 mk_rconf_free(ctx->config->file);
716 }
717
718 flb_debug("[lib] sending STOP signal to the engine");
719
720 tid = ctx->config->worker;
721 flb_engine_exit(ctx->config);
722 ret = pthread_join(tid, NULL);
723 flb_debug("[lib] Fluent Bit engine stopped");
724
725 return ret;
726 }
727