1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit Demo
4  *  ===============
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 
22 #include <fluent-bit/flb_lib.h>
23 #include <fluent-bit/flb_mem.h>
24 #include <fluent-bit/flb_pipe.h>
25 #include <fluent-bit/flb_engine.h>
26 #include <fluent-bit/flb_input.h>
27 #include <fluent-bit/flb_output.h>
28 #include <fluent-bit/flb_filter.h>
29 #include <fluent-bit/flb_utils.h>
30 #include <fluent-bit/flb_time.h>
31 #include <fluent-bit/flb_coro.h>
32 #include <fluent-bit/flb_callback.h>
33 #include <fluent-bit/flb_kv.h>
34 #include <fluent-bit/flb_metrics.h>
35 #include <fluent-bit/tls/flb_tls.h>
36 
37 #include <signal.h>
38 #include <stdarg.h>
39 
40 #ifdef FLB_HAVE_MTRACE
41 #include <mcheck.h>
42 #endif
43 
44 #ifdef FLB_HAVE_AWS_ERROR_REPORTER
45 #include <fluent-bit/aws/flb_aws_error_reporter.h>
46 
47 struct flb_aws_error_reporter *error_reporter;
48 #endif
49 
50 /* thread initializator */
51 static pthread_once_t flb_lib_once = PTHREAD_ONCE_INIT;
52 
53 #ifdef FLB_SYSTEM_WINDOWS
flb_socket_init_win32(void)54 static inline int flb_socket_init_win32(void)
55 {
56     WSADATA wsaData;
57     int err;
58 
59     err = WSAStartup(MAKEWORD(2, 2), &wsaData);
60     if (err != 0) {
61         fprintf(stderr, "WSAStartup failed with error: %d\n", err);
62         return err;
63     }
64     return 0;
65 }
66 #endif
67 
in_instance_get(flb_ctx_t * ctx,int ffd)68 static inline struct flb_input_instance *in_instance_get(flb_ctx_t *ctx,
69                                                          int ffd)
70 {
71     struct mk_list *head;
72     struct flb_input_instance *i_ins;
73 
74     mk_list_foreach(head, &ctx->config->inputs) {
75         i_ins = mk_list_entry(head, struct flb_input_instance, _head);
76         if (i_ins->id == ffd) {
77             return i_ins;
78         }
79     }
80 
81     return NULL;
82 }
83 
out_instance_get(flb_ctx_t * ctx,int ffd)84 static inline struct flb_output_instance *out_instance_get(flb_ctx_t *ctx,
85                                                            int ffd)
86 {
87     struct mk_list *head;
88     struct flb_output_instance *o_ins;
89 
90     mk_list_foreach(head, &ctx->config->outputs) {
91         o_ins = mk_list_entry(head, struct flb_output_instance, _head);
92         if (o_ins->id == ffd) {
93             return o_ins;
94         }
95     }
96 
97     return NULL;
98 }
99 
filter_instance_get(flb_ctx_t * ctx,int ffd)100 static inline struct flb_filter_instance *filter_instance_get(flb_ctx_t *ctx,
101                                                               int ffd)
102 {
103     struct mk_list *head;
104     struct flb_filter_instance *f_ins;
105 
106     mk_list_foreach(head, &ctx->config->filters) {
107         f_ins = mk_list_entry(head, struct flb_filter_instance, _head);
108         if (f_ins->id == ffd) {
109             return f_ins;
110         }
111     }
112 
113     return NULL;
114 }
115 
flb_init_env()116 void flb_init_env()
117 {
118     flb_tls_init();
119     flb_coro_init();
120     flb_upstream_init();
121     flb_output_prepare();
122 
123     /* libraries */
124     cmt_initialize();
125 }
126 
flb_create()127 flb_ctx_t *flb_create()
128 {
129     int ret;
130     flb_ctx_t *ctx;
131     struct flb_config *config;
132 
133 #ifdef FLB_HAVE_MTRACE
134     /* Start tracing malloc and free */
135     mtrace();
136 #endif
137 
138 #ifdef FLB_SYSTEM_WINDOWS
139     /* Ensure we initialized Windows Sockets */
140     if (flb_socket_init_win32()) {
141         return NULL;
142     }
143 #endif
144 
145     ctx = flb_calloc(1, sizeof(flb_ctx_t));
146     if (!ctx) {
147         perror("malloc");
148         return NULL;
149     }
150 
151     config = flb_config_init();
152     if (!config) {
153         flb_free(ctx);
154         return NULL;
155     }
156     ctx->config = config;
157     ctx->status = FLB_LIB_NONE;
158 
159     /*
160      * Initialize our pipe to send data to our worker, used
161      * by 'lib' input plugin.
162      */
163     ret = flb_pipe_create(config->ch_data);
164     if (ret == -1) {
165         perror("pipe");
166         flb_config_exit(ctx->config);
167         flb_free(ctx);
168         return NULL;
169     }
170 
171     /* Create the event loop to receive notifications */
172     ctx->event_loop = mk_event_loop_create(256);
173     if (!ctx->event_loop) {
174         flb_config_exit(ctx->config);
175         flb_free(ctx);
176         return NULL;
177     }
178     config->ch_evl = ctx->event_loop;
179 
180     /* Prepare the notification channels */
181     ctx->event_channel = flb_calloc(1, sizeof(struct mk_event));
182     if (!ctx->event_channel) {
183         perror("calloc");
184         flb_config_exit(ctx->config);
185         flb_free(ctx);
186         return NULL;
187     }
188 
189     MK_EVENT_ZERO(ctx->event_channel);
190 
191     ret = mk_event_channel_create(config->ch_evl,
192                                   &config->ch_notif[0],
193                                   &config->ch_notif[1],
194                                   ctx->event_channel);
195     if (ret != 0) {
196         flb_error("[lib] could not create notification channels");
197         flb_config_exit(ctx->config);
198         flb_destroy(ctx);
199         return NULL;
200     }
201 
202     #ifdef FLB_HAVE_AWS_ERROR_REPORTER
203     if (is_error_reporting_enabled()) {
204         error_reporter = flb_aws_error_reporter_create();
205     }
206     #endif
207 
208     return ctx;
209 }
210 
211 /* Release resources associated to the library context */
flb_destroy(flb_ctx_t * ctx)212 void flb_destroy(flb_ctx_t *ctx)
213 {
214     if (!ctx) {
215         return;
216     }
217 
218     if (ctx->event_channel) {
219         mk_event_del(ctx->event_loop, ctx->event_channel);
220         flb_free(ctx->event_channel);
221     }
222 
223     /* Remove resources from the event loop */
224     mk_event_loop_destroy(ctx->event_loop);
225 
226     /* cfg->is_running is set to false when flb_engine_shutdown has been invoked (event loop) */
227     if(ctx->config) {
228         if (ctx->config->is_running == FLB_TRUE) {
229             flb_engine_shutdown(ctx->config);
230         }
231         flb_config_exit(ctx->config);
232     }
233 
234     #ifdef FLB_HAVE_AWS_ERROR_REPORTER
235     if (is_error_reporting_enabled()) {
236         flb_aws_error_reporter_destroy(error_reporter);
237     }
238     #endif
239 
240     flb_free(ctx);
241     ctx = NULL;
242 
243 #ifdef FLB_HAVE_MTRACE
244     /* Stop tracing malloc and free */
245     muntrace();
246 #endif
247 }
248 
249 /* Defines a new input instance */
flb_input(flb_ctx_t * ctx,const char * input,void * data)250 int flb_input(flb_ctx_t *ctx, const char *input, void *data)
251 {
252     struct flb_input_instance *i_ins;
253 
254     i_ins = flb_input_new(ctx->config, input, data, FLB_TRUE);
255     if (!i_ins) {
256         return -1;
257     }
258 
259     return i_ins->id;
260 }
261 
262 /* Defines a new output instance */
flb_output(flb_ctx_t * ctx,const char * output,struct flb_lib_out_cb * cb)263 int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb)
264 {
265     struct flb_output_instance *o_ins;
266 
267     o_ins = flb_output_new(ctx->config, output, cb, FLB_TRUE);
268     if (!o_ins) {
269         return -1;
270     }
271 
272     return o_ins->id;
273 }
274 
275 /* Defines a new filter instance */
flb_filter(flb_ctx_t * ctx,const char * filter,void * data)276 int flb_filter(flb_ctx_t *ctx, const char *filter, void *data)
277 {
278     struct flb_filter_instance *f_ins;
279 
280     f_ins = flb_filter_new(ctx->config, filter, data);
281     if (!f_ins) {
282         return -1;
283     }
284 
285     return f_ins->id;
286 }
287 
288 /* Set an input interface property */
flb_input_set(flb_ctx_t * ctx,int ffd,...)289 int flb_input_set(flb_ctx_t *ctx, int ffd, ...)
290 {
291     int ret;
292     char *key;
293     char *value;
294     va_list va;
295     struct flb_input_instance *i_ins;
296 
297     i_ins = in_instance_get(ctx, ffd);
298     if (!i_ins) {
299         return -1;
300     }
301 
302     va_start(va, ffd);
303     while ((key = va_arg(va, char *))) {
304         value = va_arg(va, char *);
305         if (!value) {
306             /* Wrong parameter */
307             va_end(va);
308             return -1;
309         }
310         ret = flb_input_set_property(i_ins, key, value);
311         if (ret != 0) {
312             va_end(va);
313             return -1;
314         }
315     }
316 
317     va_end(va);
318     return 0;
319 }
320 
flb_config_map_property_check(char * plugin_name,struct mk_list * config_map,char * key,char * val)321 static inline int flb_config_map_property_check(char *plugin_name, struct mk_list *config_map, char *key, char *val)
322 {
323     struct flb_kv *kv;
324     struct mk_list properties;
325     int r;
326 
327     mk_list_init(&properties);
328 
329     kv = flb_kv_item_create(&properties, (char *) key, (char *) val);
330     if (!kv) {
331         return FLB_LIB_ERROR;
332     }
333 
334     r = flb_config_map_properties_check(plugin_name, &properties, config_map);
335     flb_kv_item_destroy(kv);
336     return r;
337 }
338 
339 /* Check if a given k, v is a valid config directive for the given output plugin */
flb_output_property_check(flb_ctx_t * ctx,int ffd,char * key,char * val)340 int flb_output_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
341 {
342     struct flb_output_instance *o_ins;
343     struct mk_list *config_map;
344     struct flb_output_plugin *p;
345     int r;
346 
347     o_ins = out_instance_get(ctx, ffd);
348     if (!o_ins) {
349       return FLB_LIB_ERROR;
350     }
351 
352     p = o_ins->p;
353     if (!p->config_map) {
354         return FLB_LIB_NO_CONFIG_MAP;
355     }
356 
357     config_map = flb_config_map_create(ctx->config, p->config_map);
358     if (!config_map) {
359         return FLB_LIB_ERROR;
360     }
361 
362     r = flb_config_map_property_check(p->name, config_map, key, val);
363     flb_config_map_destroy(config_map);
364     return r;
365 }
366 
367 /* Check if a given k, v is a valid config directive for the given input plugin */
flb_input_property_check(flb_ctx_t * ctx,int ffd,char * key,char * val)368 int flb_input_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
369 {
370     struct flb_input_instance *i_ins;
371     struct flb_input_plugin *p;
372     struct mk_list *config_map;
373     int r;
374 
375     i_ins = in_instance_get(ctx, ffd);
376     if (!i_ins) {
377       return FLB_LIB_ERROR;
378     }
379 
380     p = i_ins->p;
381     if (!p->config_map) {
382         return FLB_LIB_NO_CONFIG_MAP;
383     }
384 
385     config_map = flb_config_map_create(ctx->config, p->config_map);
386     if (!config_map) {
387         return FLB_LIB_ERROR;
388     }
389 
390     r = flb_config_map_property_check(p->name, config_map, key, val);
391     flb_config_map_destroy(config_map);
392     return r;
393 }
394 
395 /* Check if a given k, v is a valid config directive for the given filter plugin */
flb_filter_property_check(flb_ctx_t * ctx,int ffd,char * key,char * val)396 int flb_filter_property_check(flb_ctx_t *ctx, int ffd, char *key, char *val)
397 {
398     struct flb_filter_instance *f_ins;
399     struct flb_filter_plugin *p;
400     struct mk_list *config_map;
401     int r;
402 
403     f_ins = filter_instance_get(ctx, ffd);
404     if (!f_ins) {
405       return FLB_LIB_ERROR;
406     }
407 
408     p = f_ins->p;
409     if (!p->config_map) {
410         return FLB_LIB_NO_CONFIG_MAP;
411     }
412 
413     config_map = flb_config_map_create(ctx->config, p->config_map);
414     if (!config_map) {
415         return FLB_LIB_ERROR;
416     }
417 
418     r = flb_config_map_property_check(p->name, config_map, key, val);
419     flb_config_map_destroy(config_map);
420     return r;
421 }
422 
423 /* Set an output interface property */
flb_output_set(flb_ctx_t * ctx,int ffd,...)424 int flb_output_set(flb_ctx_t *ctx, int ffd, ...)
425 {
426     int ret;
427     char *key;
428     char *value;
429     va_list va;
430     struct flb_output_instance *o_ins;
431 
432     o_ins = out_instance_get(ctx, ffd);
433     if (!o_ins) {
434         return -1;
435     }
436 
437     va_start(va, ffd);
438     while ((key = va_arg(va, char *))) {
439         value = va_arg(va, char *);
440         if (!value) {
441             /* Wrong parameter */
442             va_end(va);
443             return -1;
444         }
445 
446         ret = flb_output_set_property(o_ins, key, value);
447         if (ret != 0) {
448             va_end(va);
449             return -1;
450         }
451     }
452 
453     va_end(va);
454     return 0;
455 }
456 
flb_output_set_callback(flb_ctx_t * ctx,int ffd,char * name,void (* cb)(char *,void *,void *))457 int flb_output_set_callback(flb_ctx_t *ctx, int ffd, char *name,
458                             void (*cb)(char *, void *, void *))
459 {
460     struct flb_output_instance *o_ins;
461 
462     o_ins = out_instance_get(ctx, ffd);
463     if (!o_ins) {
464         return -1;
465     }
466 
467     return flb_callback_set(o_ins->callback, name, cb);
468 }
469 
flb_output_set_test(flb_ctx_t * ctx,int ffd,char * test_name,void (* out_callback)(void *,int,int,void *,size_t,void *),void * out_callback_data,void * test_ctx)470 int flb_output_set_test(flb_ctx_t *ctx, int ffd, char *test_name,
471                         void (*out_callback) (void *, int, int, void *, size_t, void *),
472                         void *out_callback_data,
473                         void *test_ctx)
474 {
475     struct flb_output_instance *o_ins;
476 
477     o_ins = out_instance_get(ctx, ffd);
478     if (!o_ins) {
479         return -1;
480     }
481 
482     /*
483      * Enabling a test, set the output instance in 'test' mode, so no real
484      * flush callback is invoked, only the desired implemented test.
485      */
486 
487     /* Formatter test */
488     if (strcmp(test_name, "formatter") == 0) {
489         o_ins->test_mode = FLB_TRUE;
490         o_ins->test_formatter.rt_ctx = ctx;
491         o_ins->test_formatter.rt_ffd = ffd;
492         o_ins->test_formatter.rt_out_callback = out_callback;
493         o_ins->test_formatter.rt_data = out_callback_data;
494         o_ins->test_formatter.flush_ctx = test_ctx;
495     }
496     else {
497         return -1;
498     }
499 
500     return 0;
501 }
502 
503 /* Set an filter interface property */
flb_filter_set(flb_ctx_t * ctx,int ffd,...)504 int flb_filter_set(flb_ctx_t *ctx, int ffd, ...)
505 {
506     int ret;
507     char *key;
508     char *value;
509     va_list va;
510     struct flb_filter_instance *f_ins;
511 
512     f_ins = filter_instance_get(ctx, ffd);
513     if (!f_ins) {
514         return -1;
515     }
516 
517     va_start(va, ffd);
518     while ((key = va_arg(va, char *))) {
519         value = va_arg(va, char *);
520         if (!value) {
521             /* Wrong parameter */
522             va_end(va);
523             return -1;
524         }
525 
526         ret = flb_filter_set_property(f_ins, key, value);
527         if (ret != 0) {
528             va_end(va);
529             return -1;
530         }
531     }
532 
533     va_end(va);
534     return 0;
535 }
536 
537 /* Set a service property */
flb_service_set(flb_ctx_t * ctx,...)538 int flb_service_set(flb_ctx_t *ctx, ...)
539 {
540     int ret;
541     char *key;
542     char *value;
543     va_list va;
544 
545     va_start(va, ctx);
546 
547     while ((key = va_arg(va, char *))) {
548         value = va_arg(va, char *);
549         if (!value) {
550             /* Wrong parameter */
551             va_end(va);
552             return -1;
553         }
554 
555         ret = flb_config_set_property(ctx->config, key, value);
556         if (ret != 0) {
557             va_end(va);
558             return -1;
559         }
560     }
561 
562     va_end(va);
563     return 0;
564 }
565 
566 /* Load a configuration file that may be used by the input or output plugin */
flb_lib_config_file(struct flb_lib_ctx * ctx,const char * path)567 int flb_lib_config_file(struct flb_lib_ctx *ctx, const char *path)
568 {
569     if (access(path, R_OK) != 0) {
570         perror("access");
571         return -1;
572     }
573 
574     ctx->config->file = mk_rconf_open(path);
575     if (!ctx->config->file) {
576         fprintf(stderr, "Error reading configuration file: %s\n", path);
577         return -1;
578     }
579 
580     return 0;
581 }
582 
583 /* This is a wrapper to release a buffer which comes from out_lib_flush() */
flb_lib_free(void * data)584 int flb_lib_free(void* data)
585 {
586     if (data == NULL) {
587         return -1;
588     }
589     flb_free(data);
590     return 0;
591 }
592 
593 
594 /* Push some data into the Engine */
flb_lib_push(flb_ctx_t * ctx,int ffd,const void * data,size_t len)595 int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len)
596 {
597     int ret;
598     struct flb_input_instance *i_ins;
599 
600 
601     if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
602         flb_error("[lib] cannot push data, engine is not running");
603         return -1;
604     }
605 
606     i_ins = in_instance_get(ctx, ffd);
607     if (!i_ins) {
608         return -1;
609     }
610 
611     ret = flb_pipe_w(i_ins->channel[1], data, len);
612     if (ret == -1) {
613         flb_errno();
614         return -1;
615     }
616     return ret;
617 }
618 
flb_lib_worker(void * data)619 static void flb_lib_worker(void *data)
620 {
621     int ret;
622     flb_ctx_t *ctx = data;
623     struct flb_config *config;
624 
625     config = ctx->config;
626     mk_utils_worker_rename("flb-pipeline");
627     ret = flb_engine_start(config);
628     if (ret == -1) {
629         flb_engine_failed(config);
630         flb_engine_shutdown(config);
631     }
632     ctx->status = FLB_LIB_NONE;
633 }
634 
635 /* Return the current time to be used by lib callers */
flb_time_now()636 double flb_time_now()
637 {
638     struct flb_time t;
639 
640     flb_time_get(&t);
641     return flb_time_to_double(&t);
642 }
643 
644 /* Start the engine */
flb_start(flb_ctx_t * ctx)645 int flb_start(flb_ctx_t *ctx)
646 {
647     int fd;
648     int bytes;
649     int ret;
650     uint64_t val;
651     pthread_t tid;
652     struct mk_event *event;
653     struct flb_config *config;
654 
655     pthread_once(&flb_lib_once, flb_init_env);
656 
657     config = ctx->config;
658     ret = mk_utils_worker_spawn(flb_lib_worker, ctx, &tid);
659     if (ret == -1) {
660         return -1;
661     }
662     config->worker = tid;
663 
664     /* Wait for the started signal so we can return to the caller */
665     mk_event_wait(config->ch_evl);
666     mk_event_foreach(event, config->ch_evl) {
667         fd = event->fd;
668         bytes = flb_pipe_r(fd, &val, sizeof(uint64_t));
669         if (bytes <= 0) {
670             pthread_cancel(tid);
671             pthread_join(tid, NULL);
672             ctx->status = FLB_LIB_ERROR;
673             return -1;
674         }
675 
676         if (val == FLB_ENGINE_STARTED) {
677             flb_debug("[lib] backend started");
678             ctx->status = FLB_LIB_OK;
679             break;
680         }
681         else if (val == FLB_ENGINE_FAILED) {
682             flb_error("[lib] backend failed");
683             pthread_join(tid, NULL);
684             ctx->status = FLB_LIB_ERROR;
685             return -1;
686         }
687     }
688 
689     return 0;
690 }
691 
flb_loop(flb_ctx_t * ctx)692 int flb_loop(flb_ctx_t *ctx)
693 {
694     while (ctx->status == FLB_LIB_OK) {
695         sleep(1);
696     }
697     return 0;
698 }
699 
700 /* Stop the engine */
flb_stop(flb_ctx_t * ctx)701 int flb_stop(flb_ctx_t *ctx)
702 {
703     int ret;
704     pthread_t tid;
705 
706     if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) {
707         return 0;
708     }
709 
710     if (!ctx->config) {
711         return 0;
712     }
713 
714     if (ctx->config->file) {
715         mk_rconf_free(ctx->config->file);
716     }
717 
718     flb_debug("[lib] sending STOP signal to the engine");
719 
720     tid = ctx->config->worker;
721     flb_engine_exit(ctx->config);
722     ret = pthread_join(tid, NULL);
723     flb_debug("[lib] Fluent Bit engine stopped");
724 
725     return ret;
726 }
727