1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 
25 #include <fluent-bit/flb_info.h>
26 #include <fluent-bit/flb_mem.h>
27 #include <fluent-bit/flb_str.h>
28 #include <fluent-bit/flb_env.h>
29 #include <fluent-bit/flb_coro.h>
30 #include <fluent-bit/flb_output.h>
31 #include <fluent-bit/flb_kv.h>
32 #include <fluent-bit/flb_io.h>
33 #include <fluent-bit/flb_uri.h>
34 #include <fluent-bit/flb_config.h>
35 #include <fluent-bit/flb_macros.h>
36 #include <fluent-bit/flb_utils.h>
37 #include <fluent-bit/flb_plugin_proxy.h>
38 #include <fluent-bit/flb_http_client_debug.h>
39 #include <fluent-bit/flb_output_thread.h>
40 #include <fluent-bit/flb_mp.h>
41 #include <fluent-bit/flb_pack.h>
42 
43 FLB_TLS_DEFINE(struct flb_out_coro_params, out_coro_params);
44 
flb_output_prepare()45 void flb_output_prepare()
46 {
47     FLB_TLS_INIT(out_coro_params);
48 }
49 
50 /* Validate the the output address protocol */
check_protocol(const char * prot,const char * output)51 static int check_protocol(const char *prot, const char *output)
52 {
53     int len;
54     char *p;
55 
56     p = strstr(output, "://");
57     if (p && p != output) {
58         len = p - output;
59     }
60     else {
61         len = strlen(output);
62     }
63 
64     if (strlen(prot) != len) {
65         return 0;
66     }
67 
68     /* Output plugin match */
69     if (strncasecmp(prot, output, len) == 0) {
70         return 1;
71     }
72 
73     return 0;
74 }
75 
76 
77 /* Invoke pre-run call for the output plugin */
flb_output_pre_run(struct flb_config * config)78 void flb_output_pre_run(struct flb_config *config)
79 {
80     struct mk_list *head;
81     struct flb_output_instance *ins;
82     struct flb_output_plugin *p;
83 
84     mk_list_foreach(head, &config->outputs) {
85         ins = mk_list_entry(head, struct flb_output_instance, _head);
86         p = ins->p;
87         if (p->cb_pre_run) {
88             p->cb_pre_run(ins->context, config);
89         }
90     }
91 }
92 
flb_output_free_properties(struct flb_output_instance * ins)93 static void flb_output_free_properties(struct flb_output_instance *ins)
94 {
95 
96     flb_kv_release(&ins->properties);
97     flb_kv_release(&ins->net_properties);
98 
99 #ifdef FLB_HAVE_TLS
100     if (ins->tls_vhost) {
101         flb_sds_destroy(ins->tls_vhost);
102     }
103     if (ins->tls_ca_path) {
104         flb_sds_destroy(ins->tls_ca_path);
105     }
106     if (ins->tls_ca_file) {
107         flb_sds_destroy(ins->tls_ca_file);
108     }
109     if (ins->tls_crt_file) {
110         flb_sds_destroy(ins->tls_crt_file);
111     }
112     if (ins->tls_key_file) {
113         flb_sds_destroy(ins->tls_key_file);
114     }
115     if (ins->tls_key_passwd) {
116         flb_sds_destroy(ins->tls_key_passwd);
117     }
118 #endif
119 }
120 
flb_output_coro_prepare_destroy(struct flb_output_coro * out_coro)121 void flb_output_coro_prepare_destroy(struct flb_output_coro *out_coro)
122 {
123     struct flb_output_instance *ins = out_coro->o_ins;
124     struct flb_out_thread_instance *th_ins;
125 
126     /* Move output coroutine context from active list to the destroy one */
127     if (flb_output_is_threaded(ins) == FLB_TRUE) {
128         th_ins = flb_output_thread_instance_get();
129         pthread_mutex_lock(&th_ins->coro_mutex);
130         mk_list_del(&out_coro->_head);
131         mk_list_add(&out_coro->_head, &th_ins->coros_destroy);
132         pthread_mutex_unlock(&th_ins->coro_mutex);
133     }
134     else {
135         mk_list_del(&out_coro->_head);
136         mk_list_add(&out_coro->_head, &ins->coros_destroy);
137     }
138 }
139 
flb_output_coro_id_get(struct flb_output_instance * ins)140 int flb_output_coro_id_get(struct flb_output_instance *ins)
141 {
142     int id;
143     int max = (2 << 13) - 1; /* max for 14 bits */
144     struct flb_out_thread_instance *th_ins;
145 
146     if (flb_output_is_threaded(ins) == FLB_TRUE) {
147         th_ins = flb_output_thread_instance_get();
148         id = th_ins->coro_id;
149         th_ins->coro_id++;
150 
151         /* reset once it reach the maximum allowed */
152         if (th_ins->coro_id > max) {
153             th_ins->coro_id = 0;
154         }
155     }
156     else {
157         id = ins->coro_id;
158         ins->coro_id++;
159 
160         /* reset once it reach the maximum allowed */
161         if (ins->coro_id > max) {
162             ins->coro_id = 0;
163         }
164     }
165 
166     return id;
167 }
168 
flb_output_coro_add(struct flb_output_instance * ins,struct flb_coro * coro)169 void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
170 {
171     struct flb_output_coro *out_coro;
172 
173     out_coro = (struct flb_output_coro *) FLB_CORO_DATA(coro);
174     mk_list_add(&out_coro->_head, &ins->coros);
175 }
176 
177 /*
178  * Flush a task through the output plugin, either using a worker thread + coroutine
179  * or a simple co-routine in the current thread.
180  */
flb_output_task_flush(struct flb_task * task,struct flb_output_instance * out_ins,struct flb_config * config)181 int flb_output_task_flush(struct flb_task *task,
182                           struct flb_output_instance *out_ins,
183                           struct flb_config *config)
184 {
185     int ret;
186     struct flb_output_coro *out_coro;
187 
188     if (flb_output_is_threaded(out_ins) == FLB_TRUE) {
189         flb_task_users_inc(task);
190 
191         /* Dispatch the task to the thread pool */
192         ret = flb_output_thread_pool_flush(task, out_ins, config);
193         if (ret == -1) {
194             flb_task_users_dec(task, FLB_FALSE);
195         }
196     }
197     else {
198         /* Direct co-routine handling */
199         out_coro = flb_output_coro_create(task,
200                                           task->i_ins,
201                                           out_ins,
202                                           config,
203                                           task->buf, task->size,
204                                           task->tag,
205                                           task->tag_len);
206         if (!out_coro) {
207             return -1;
208         }
209 
210         flb_task_users_inc(task);
211         flb_coro_resume(out_coro->coro);
212     }
213 
214     return 0;
215 }
216 
flb_output_instance_destroy(struct flb_output_instance * ins)217 int flb_output_instance_destroy(struct flb_output_instance *ins)
218 {
219     if (ins->alias) {
220         flb_sds_destroy(ins->alias);
221     }
222 
223     /* Remove URI context */
224     if (ins->host.uri) {
225         flb_uri_destroy(ins->host.uri);
226     }
227 
228     flb_sds_destroy(ins->host.name);
229     flb_sds_destroy(ins->host.address);
230     flb_sds_destroy(ins->host.listen);
231     flb_sds_destroy(ins->match);
232 
233 #ifdef FLB_HAVE_REGEX
234         if (ins->match_regex) {
235             flb_regex_destroy(ins->match_regex);
236         }
237 #endif
238 
239 #ifdef FLB_HAVE_TLS
240     if (ins->use_tls == FLB_TRUE) {
241         if (ins->tls) {
242             flb_tls_destroy(ins->tls);
243         }
244     }
245 
246     if (ins->tls_config_map) {
247         flb_config_map_destroy(ins->tls_config_map);
248     }
249 #endif
250 
251     /* Remove metrics */
252 #ifdef FLB_HAVE_METRICS
253     if (ins->cmt) {
254         cmt_destroy(ins->cmt);
255     }
256 
257     if (ins->metrics) {
258         flb_metrics_destroy(ins->metrics);
259     }
260 #endif
261 
262     /* destroy callback context */
263     if (ins->callback) {
264         flb_callback_destroy(ins->callback);
265     }
266 
267     /* destroy config map */
268     if (ins->config_map) {
269         flb_config_map_destroy(ins->config_map);
270     }
271 
272     if (ins->net_config_map) {
273         flb_config_map_destroy(ins->net_config_map);
274     }
275 
276     if (ins->ch_events[0] > 0) {
277         mk_event_closesocket(ins->ch_events[0]);
278     }
279 
280     if (ins->ch_events[1] > 0) {
281         mk_event_closesocket(ins->ch_events[1]);
282     }
283 
284     /* release properties */
285     flb_output_free_properties(ins);
286 
287     mk_list_del(&ins->_head);
288     flb_free(ins);
289 
290     return 0;
291 }
292 
293 /* Invoke exit call for the output plugin */
flb_output_exit(struct flb_config * config)294 void flb_output_exit(struct flb_config *config)
295 {
296     struct mk_list *tmp;
297     struct mk_list *head;
298     struct flb_output_instance *ins;
299     struct flb_output_plugin *p;
300     void *params;
301 
302     mk_list_foreach_safe(head, tmp, &config->outputs) {
303         ins = mk_list_entry(head, struct flb_output_instance, _head);
304         p = ins->p;
305 
306         /* Stop any worker thread */
307         if (flb_output_is_threaded(ins) == FLB_TRUE) {
308             flb_output_thread_pool_destroy(ins);
309         }
310 
311         /* Check a exit callback */
312         if (p->cb_exit) {
313             if(!p->proxy) {
314                 p->cb_exit(ins->context, config);
315             }
316             else {
317                 p->cb_exit(p, ins->context);
318             }
319         }
320         flb_output_instance_destroy(ins);
321     }
322 
323     params = FLB_TLS_GET(out_coro_params);
324     if (params) {
325         flb_free(params);
326     }
327 }
328 
instance_id(struct flb_config * config)329 static inline int instance_id(struct flb_config *config)
330 {
331     struct flb_output_instance *ins;
332 
333     if (mk_list_size(&config->outputs) == 0) {
334         return 0;
335     }
336 
337     ins = mk_list_entry_last(&config->outputs, struct flb_output_instance,
338                              _head);
339     return (ins->id + 1);
340 }
341 
flb_output_get_instance(struct flb_config * config,int out_id)342 struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
343                                                     int out_id)
344 {
345     struct mk_list *head;
346     struct flb_output_instance *ins;
347 
348     mk_list_foreach(head, &config->outputs) {
349         ins = mk_list_entry(head, struct flb_output_instance, _head);
350         if (ins->id == out_id) {
351             break;
352         }
353         ins = NULL;
354     }
355 
356     if (!ins) {
357         return NULL;
358     }
359 
360     return ins;
361 }
362 
363 /*
364  * Invoked everytime a flush callback has finished (returned). This function
365  * is called from the event loop.
366  */
flb_output_flush_finished(struct flb_config * config,int out_id)367 int flb_output_flush_finished(struct flb_config *config, int out_id)
368 {
369     struct mk_list *tmp;
370     struct mk_list *head;
371     struct mk_list *list;
372     struct flb_output_instance *ins;
373     struct flb_output_coro *out_coro;
374     struct flb_out_thread_instance *th_ins;
375 
376     ins = flb_output_get_instance(config, out_id);
377     if (!ins) {
378         return -1;
379     }
380 
381     if (flb_output_is_threaded(ins) == FLB_TRUE) {
382         th_ins = flb_output_thread_instance_get();
383         list = &th_ins->coros_destroy;
384     }
385     else {
386         list = &ins->coros_destroy;
387     }
388 
389     /* Look for output coroutines that needs to be destroyed */
390     mk_list_foreach_safe(head, tmp, list) {
391         out_coro = mk_list_entry(head, struct flb_output_coro, _head);
392         flb_output_coro_destroy(out_coro);
393     }
394 
395     return 0;
396 }
397 
398 
399 /*
400  * It validate an output type given the string, it return the
401  * proper type and if valid, populate the global config.
402  */
flb_output_new(struct flb_config * config,const char * output,void * data,int public_only)403 struct flb_output_instance *flb_output_new(struct flb_config *config,
404                                            const char *output, void *data,
405                                            int public_only)
406 {
407     int ret = -1;
408     int flags = 0;
409     struct mk_list *head;
410     struct flb_output_plugin *plugin;
411     struct flb_output_instance *instance = NULL;
412 
413     if (!output) {
414         return NULL;
415     }
416 
417     mk_list_foreach(head, &config->out_plugins) {
418         plugin = mk_list_entry(head, struct flb_output_plugin, _head);
419         if (!check_protocol(plugin->name, output)) {
420             plugin = NULL;
421             continue;
422         }
423 
424         if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) {
425             return NULL;
426         }
427         break;
428     }
429 
430     if (!plugin) {
431         return NULL;
432     }
433 
434     /* Create and load instance */
435     instance = flb_calloc(1, sizeof(struct flb_output_instance));
436     if (!instance) {
437         flb_errno();
438         return NULL;
439     }
440 
441     /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */
442     if (plugin->event_type == 0) {
443         instance->event_type = FLB_OUTPUT_LOGS;
444     }
445     else {
446         instance->event_type = plugin->event_type;
447     }
448     instance->config = config;
449     instance->log_level = -1;
450     instance->test_mode = FLB_FALSE;
451     instance->is_threaded = FLB_FALSE;
452 
453 
454     /* Retrieve an instance id for the output instance */
455     instance->id = instance_id(config);
456 
457     /* format name (with instance id) */
458     snprintf(instance->name, sizeof(instance->name) - 1,
459              "%s.%i", plugin->name, instance->id);
460     instance->p = plugin;
461     instance->callback = flb_callback_create(instance->name);
462     if (!instance->callback) {
463         flb_free(instance);
464         return NULL;
465     }
466 
467     if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) {
468         instance->context = NULL;
469     }
470     else {
471         struct flb_plugin_proxy_context *ctx;
472 
473         ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
474         if (!ctx) {
475             flb_errno();
476             flb_free(instance);
477             return NULL;
478         }
479 
480         ctx->proxy = plugin->proxy;
481 
482         instance->context = ctx;
483     }
484 
485     instance->alias       = NULL;
486     instance->flags       = instance->p->flags;
487     instance->data        = data;
488     instance->match       = NULL;
489 #ifdef FLB_HAVE_REGEX
490     instance->match_regex = NULL;
491 #endif
492     instance->retry_limit = 1;
493     instance->host.name   = NULL;
494     instance->host.address = NULL;
495     instance->net_config_map = NULL;
496 
497     /* Storage */
498     instance->total_limit_size = -1;
499 
500     /* Parent plugin flags */
501     flags = instance->flags;
502     if (flags & FLB_IO_TCP) {
503         instance->use_tls = FLB_FALSE;
504     }
505     else if (flags & FLB_IO_TLS) {
506         instance->use_tls = FLB_TRUE;
507     }
508     else if (flags & FLB_IO_OPT_TLS) {
509         /* TLS must be enabled manually in the config */
510         instance->use_tls = FLB_FALSE;
511         instance->flags |= FLB_IO_TLS;
512     }
513 
514 #ifdef FLB_HAVE_TLS
515     instance->tls                   = NULL;
516     instance->tls_debug             = -1;
517     instance->tls_verify            = FLB_TRUE;
518     instance->tls_vhost             = NULL;
519     instance->tls_ca_path           = NULL;
520     instance->tls_ca_file           = NULL;
521     instance->tls_crt_file          = NULL;
522     instance->tls_key_file          = NULL;
523     instance->tls_key_passwd        = NULL;
524 #endif
525 
526     if (plugin->flags & FLB_OUTPUT_NET) {
527         ret = flb_net_host_set(plugin->name, &instance->host, output);
528         if (ret != 0) {
529             flb_free(instance);
530             return NULL;
531         }
532     }
533 
534     flb_kv_init(&instance->properties);
535     flb_kv_init(&instance->net_properties);
536     mk_list_init(&instance->upstreams);
537     mk_list_init(&instance->coros);
538     mk_list_init(&instance->coros_destroy);
539 
540     mk_list_add(&instance->_head, &config->outputs);
541 
542     /* Tests */
543     instance->test_formatter.callback = plugin->test_formatter.callback;
544 
545     return instance;
546 }
547 
prop_key_check(const char * key,const char * kv,int k_len)548 static inline int prop_key_check(const char *key, const char *kv, int k_len)
549 {
550     int len;
551 
552     len = strlen(key);
553     if (strncasecmp(key, kv, k_len) == 0 && len == k_len) {
554         return 0;
555     }
556 
557     return -1;
558 }
559 
560 /* Override a configuration property for the given input_instance plugin */
flb_output_set_property(struct flb_output_instance * ins,const char * k,const char * v)561 int flb_output_set_property(struct flb_output_instance *ins,
562                             const char *k, const char *v)
563 {
564     int len;
565     int ret;
566     ssize_t limit;
567     flb_sds_t tmp;
568     struct flb_kv *kv;
569     struct flb_config *config = ins->config;
570 
571     len = strlen(k);
572     tmp = flb_env_var_translate(config->env, v);
573     if (tmp) {
574         if (strlen(tmp) == 0) {
575             flb_sds_destroy(tmp);
576             tmp = NULL;
577         }
578     }
579 
580     /* Check if the key is a known/shared property */
581     if (prop_key_check("match", k, len) == 0) {
582         ins->match = tmp;
583     }
584 #ifdef FLB_HAVE_REGEX
585     else if (prop_key_check("match_regex", k, len) == 0 && tmp) {
586         ins->match_regex = flb_regex_create(tmp);
587         flb_sds_destroy(tmp);
588     }
589 #endif
590     else if (prop_key_check("alias", k, len) == 0 && tmp) {
591         ins->alias = tmp;
592     }
593     else if (prop_key_check("log_level", k, len) == 0 && tmp) {
594         ret = flb_log_get_level_str(tmp);
595         flb_sds_destroy(tmp);
596         if (ret == -1) {
597             return -1;
598         }
599         ins->log_level = ret;
600     }
601     else if (prop_key_check("host", k, len) == 0) {
602         ins->host.name = tmp;
603     }
604     else if (prop_key_check("port", k, len) == 0) {
605         if (tmp) {
606             ins->host.port = atoi(tmp);
607             flb_sds_destroy(tmp);
608         }
609         else {
610             ins->host.port = 0;
611         }
612     }
613     else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
614         ins->host.ipv6 = flb_utils_bool(tmp);
615         flb_sds_destroy(tmp);
616     }
617     else if (prop_key_check("retry_limit", k, len) == 0) {
618         if (tmp) {
619             if (strcasecmp(tmp, "no_limits") == 0 ||
620                 strcasecmp(tmp, "false") == 0 ||
621                 strcasecmp(tmp, "off") == 0) {
622                 /* No limits for retries */
623                 ins->retry_limit = FLB_OUT_RETRY_UNLIMITED;
624             }
625             else if (strcasecmp(tmp, "no_retries") == 0) {
626                 ins->retry_limit = FLB_OUT_RETRY_NONE;
627             }
628             else {
629                 ins->retry_limit = atoi(tmp);
630                 if (ins->retry_limit <= 0) {
631                     flb_warn("[config] invalid retry_limit. set default.");
632                     /* set default when input is invalid number */
633                     ins->retry_limit = 1;
634                 }
635             }
636             flb_sds_destroy(tmp);
637         }
638         else {
639             ins->retry_limit = 1;
640         }
641     }
642     else if (strncasecmp("net.", k, 4) == 0 && tmp) {
643         kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
644         if (!kv) {
645             if (tmp) {
646                 flb_sds_destroy(tmp);
647             }
648             return -1;
649         }
650         kv->val = tmp;
651     }
652 #ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
653     else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) {
654         ret = flb_http_client_debug_property_is_valid((char *) k, tmp);
655         if (ret == FLB_TRUE) {
656             kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
657             if (!kv) {
658                 if (tmp) {
659                     flb_sds_destroy(tmp);
660                 }
661                 return -1;
662             }
663             kv->val = tmp;
664         }
665         else {
666             flb_error("[config] invalid property '%s' on instance '%s'",
667                       k, flb_output_name(ins));
668             flb_sds_destroy(tmp);
669         }
670     }
671 #endif
672 #ifdef FLB_HAVE_TLS
673     else if (prop_key_check("tls", k, len) == 0 && tmp) {
674         if (strcasecmp(tmp, "true") == 0 || strcasecmp(tmp, "on") == 0) {
675             if ((ins->flags & FLB_IO_TLS) == 0) {
676                 flb_error("[config] %s don't support TLS", ins->name);
677                 flb_sds_destroy(tmp);
678                 return -1;
679             }
680 
681             ins->use_tls = FLB_TRUE;
682         }
683         else {
684             ins->use_tls = FLB_FALSE;
685         }
686         flb_sds_destroy(tmp);
687     }
688     else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
689         if (strcasecmp(tmp, "true") == 0 || strcasecmp(tmp, "on") == 0) {
690             ins->tls_verify = FLB_TRUE;
691         }
692         else {
693             ins->tls_verify = FLB_FALSE;
694         }
695         flb_sds_destroy(tmp);
696     }
697     else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
698         ins->tls_debug = atoi(tmp);
699         flb_sds_destroy(tmp);
700     }
701     else if (prop_key_check("tls.vhost", k, len) == 0) {
702         ins->tls_vhost = tmp;
703     }
704     else if (prop_key_check("tls.ca_path", k, len) == 0) {
705         ins->tls_ca_path = tmp;
706     }
707     else if (prop_key_check("tls.ca_file", k, len) == 0) {
708         ins->tls_ca_file = tmp;
709     }
710     else if (prop_key_check("tls.crt_file", k, len) == 0) {
711         ins->tls_crt_file = tmp;
712     }
713     else if (prop_key_check("tls.key_file", k, len) == 0) {
714         ins->tls_key_file = tmp;
715     }
716     else if (prop_key_check("tls.key_passwd", k, len) == 0) {
717         ins->tls_key_passwd = tmp;
718     }
719 #endif
720     else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
721         if (strcasecmp(tmp, "off") == 0 ||
722             flb_utils_bool(tmp) == FLB_FALSE) {
723             /* no limit for filesystem storage */
724             limit = -1;
725             flb_info("[config] unlimited filesystem buffer for %s plugin",
726                      ins->name);
727         }
728         else {
729             limit = flb_utils_size_to_bytes(tmp);
730             if (limit == -1) {
731                 flb_sds_destroy(tmp);
732                 return -1;
733             }
734 
735             if (limit == 0) {
736                 limit = -1;
737             }
738         }
739 
740         flb_sds_destroy(tmp);
741         ins->total_limit_size = (size_t) limit;
742     }
743     else if (prop_key_check("workers", k, len) == 0 && tmp) {
744         /* Set the number of workers */
745         ins->tp_workers = atoi(tmp);
746         flb_sds_destroy(tmp);
747     }
748     else {
749         /*
750          * Create the property, we don't pass the value since we will
751          * map it directly to avoid an extra memory allocation.
752          */
753         kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
754         if (!kv) {
755             if (tmp) {
756                 flb_sds_destroy(tmp);
757             }
758             return -1;
759         }
760         kv->val = tmp;
761     }
762 
763     return 0;
764 }
765 
766 /* Configure a default hostname and TCP port if they are not set */
flb_output_net_default(const char * host,const int port,struct flb_output_instance * ins)767 void flb_output_net_default(const char *host, const int port,
768                             struct flb_output_instance *ins)
769 {
770     /* Set default network configuration */
771     if (!ins->host.name) {
772         ins->host.name = flb_sds_create(host);
773     }
774     if (ins->host.port == 0) {
775         ins->host.port = port;
776     }
777 }
778 
779 /* Add thread pool for output plugin if configured with workers */
flb_output_enable_multi_threading(struct flb_output_instance * ins,struct flb_config * config)780 int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config)
781 {
782     /* Multi-threading enabled ? (through 'workers' property) */
783     if (ins->tp_workers > 0) {
784         if(flb_output_thread_pool_create(config, ins) != 0) {
785             flb_output_instance_destroy(ins);
786             return -1;
787         }
788         flb_output_thread_pool_start(ins);
789     }
790 
791     return 0;
792 }
793 
794 /* Return an instance name or alias */
flb_output_name(struct flb_output_instance * ins)795 const char *flb_output_name(struct flb_output_instance *ins)
796 {
797     if (ins->alias) {
798         return ins->alias;
799     }
800 
801     return ins->name;
802 }
803 
flb_output_get_property(const char * key,struct flb_output_instance * ins)804 const char *flb_output_get_property(const char *key, struct flb_output_instance *ins)
805 {
806     return flb_config_prop_get(key, &ins->properties);
807 }
808 
809 /* Trigger the output plugins setup callbacks to prepare them. */
flb_output_init_all(struct flb_config * config)810 int flb_output_init_all(struct flb_config *config)
811 {
812     int ret;
813 #ifdef FLB_HAVE_METRICS
814     char *name;
815 #endif
816     struct mk_list *tmp;
817     struct mk_list *head;
818     struct mk_list *config_map;
819     struct flb_output_instance *ins;
820     struct flb_output_plugin *p;
821 
822     /* Retrieve the plugin reference */
823     mk_list_foreach_safe(head, tmp, &config->outputs) {
824         ins = mk_list_entry(head, struct flb_output_instance, _head);
825         if (ins->log_level == -1) {
826             ins->log_level = config->log->level;
827         }
828         p = ins->p;
829 
830         /* Output Events Channel */
831         ret = mk_event_channel_create(config->evl,
832                                       &ins->ch_events[0],
833                                       &ins->ch_events[1],
834                                       ins);
835         if (ret != 0) {
836             flb_error("could not create events channels for '%s'",
837                       flb_output_name(ins));
838             flb_output_instance_destroy(ins);
839             return -1;
840         }
841         flb_debug("[%s:%s] created event channels: read=%i write=%i",
842                   ins->p->name, flb_output_name(ins),
843                   ins->ch_events[0], ins->ch_events[1]);
844 
845         /*
846          * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
847          * default, we need to overwrite this value so we can do a clean check
848          * into the Engine when the event is triggered.
849          */
850         ins->event.type = FLB_ENGINE_EV_OUTPUT;
851 
852         /* Metrics */
853 #ifdef FLB_HAVE_METRICS
854         /* Get name or alias for the instance */
855         name = (char *) flb_output_name(ins);
856 
857         /* CMetrics */
858         ins->cmt = cmt_create();
859         if (!ins->cmt) {
860             flb_error("[output] could not create cmetrics context");
861             return -1;
862         }
863 
864         /* Register generic output plugin metrics */
865         ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit",
866                                                    "output", "proc_records_total",
867                                                    "Number of processed output records.",
868                                                    1, (char *[]) {"name"});
869 
870         ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit",
871                                                  "output", "proc_bytes_total",
872                                                  "Number of processed output bytes.",
873                                                  1, (char *[]) {"name"});
874 
875         ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit",
876                                              "output", "errors_total",
877                                              "Number of output errors.",
878                                              1, (char *[]) {"name"});
879 
880         ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit",
881                                              "output", "retries_total",
882                                              "Number of output retries.",
883                                              1, (char *[]) {"name"});
884 
885         ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit",
886                                              "output", "retries_failed_total",
887                                              "Number of abandoned batches because "
888                                              "the maximum number of re-tries was "
889                                              "reached.",
890                                              1, (char *[]) {"name"});
891 
892         ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit",
893                                              "output", "dropped_records_total",
894                                              "Number of dropped records.",
895                                              1, (char *[]) {"name"});
896 
897         ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit",
898                                              "output", "retried_records_total",
899                                              "Number of retried records.",
900                                              1, (char *[]) {"name"});
901 
902         /* old API */
903         ins->metrics = flb_metrics_create(name);
904         if (ins->metrics) {
905             flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS,
906                             "proc_records", ins->metrics);
907             flb_metrics_add(FLB_METRIC_OUT_OK_BYTES,
908                             "proc_bytes", ins->metrics);
909             flb_metrics_add(FLB_METRIC_OUT_ERROR,
910                             "errors", ins->metrics);
911             flb_metrics_add(FLB_METRIC_OUT_RETRY,
912                             "retries", ins->metrics);
913             flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED,
914                         "retries_failed", ins->metrics);
915             flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS,
916                         "dropped_records", ins->metrics);
917             flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS,
918                         "retried_records", ins->metrics);
919         }
920 #endif
921 
922 #ifdef FLB_HAVE_PROXY_GO
923         /* Proxy plugins have their own initialization */
924         if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
925             ret = flb_plugin_proxy_init(p->proxy, ins, config);
926             if (ret == -1) {
927                 flb_output_instance_destroy(ins);
928                 return -1;
929             }
930 
931             /* Multi-threading enabled if configured */
932             ret = flb_output_enable_multi_threading(ins, config);
933             if (ret == -1) {
934                 flb_error("[output] could not start thread pool for '%s' plugin",
935                           p->name);
936                 return -1;
937             }
938 
939             continue;
940         }
941 #endif
942 
943 #ifdef FLB_HAVE_TLS
944         if (ins->use_tls == FLB_TRUE) {
945             ins->tls = flb_tls_create(ins->tls_verify,
946                                       ins->tls_debug,
947                                       ins->tls_vhost,
948                                       ins->tls_ca_path,
949                                       ins->tls_ca_file,
950                                       ins->tls_crt_file,
951                                       ins->tls_key_file,
952                                       ins->tls_key_passwd);
953             if (!ins->tls) {
954                 flb_error("[output %s] error initializing TLS context",
955                           ins->name);
956                 flb_output_instance_destroy(ins);
957                 return -1;
958             }
959         }
960 #endif
961         /*
962          * Before to call the initialization callback, make sure that the received
963          * configuration parameters are valid if the plugin is registering a config map.
964          */
965         if (p->config_map) {
966             /*
967              * Create a dynamic version of the configmap that will be used by the specific
968              * instance in question.
969              */
970             config_map = flb_config_map_create(config, p->config_map);
971             if (!config_map) {
972                 flb_error("[output] error loading config map for '%s' plugin",
973                           p->name);
974                 flb_output_instance_destroy(ins);
975                 return -1;
976             }
977             ins->config_map = config_map;
978 
979             /* Validate incoming properties against config map */
980             ret = flb_config_map_properties_check(ins->p->name,
981                                                   &ins->properties, ins->config_map);
982             if (ret == -1) {
983                 if (config->program_name) {
984                     flb_helper("try the command: %s -o %s -h\n",
985                                config->program_name, ins->p->name);
986                 }
987                 flb_output_instance_destroy(ins);
988                 return -1;
989             }
990         }
991 
992         /* Init network defaults */
993         flb_net_setup_init(&ins->net_setup);
994 
995         /* Get Upstream net_setup configmap */
996         ins->net_config_map = flb_upstream_get_config_map(config);
997         if (!ins->net_config_map) {
998             flb_output_instance_destroy(ins);
999             return -1;
1000         }
1001 
1002 #ifdef FLB_HAVE_TLS
1003         struct flb_config_map *m;
1004 
1005         /* TLS config map (just for 'help' formatting purposes) */
1006         ins->tls_config_map = flb_tls_get_config_map(config);
1007 
1008         /* Override first configmap value based on it plugin flag */
1009         m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
1010         if (p->flags & FLB_IO_TLS) {
1011             m->value.val.boolean = FLB_TRUE;
1012         }
1013         else {
1014             m->value.val.boolean = FLB_FALSE;
1015         }
1016 #endif
1017         /*
1018          * Validate 'net.*' properties: if the plugin use the Upstream interface,
1019          * it might receive some networking settings.
1020          */
1021         if (mk_list_size(&ins->net_properties) > 0) {
1022             ret = flb_config_map_properties_check(ins->p->name,
1023                                                   &ins->net_properties,
1024                                                   ins->net_config_map);
1025             if (ret == -1) {
1026                 if (config->program_name) {
1027                     flb_helper("try the command: %s -o %s -h\n",
1028                                config->program_name, ins->p->name);
1029                 }
1030                 flb_output_instance_destroy(ins);
1031                 return -1;
1032             }
1033         }
1034 
1035         /* Initialize plugin through it 'init callback' */
1036         ret = p->cb_init(ins, config, ins->data);
1037         if (ret == -1) {
1038             flb_error("[output] failed to initialize '%s' plugin",
1039                       p->name);
1040             flb_output_instance_destroy(ins);
1041             return -1;
1042         }
1043 
1044         /* Multi-threading enabled if configured */
1045         ret = flb_output_enable_multi_threading(ins, config);
1046         if (ret == -1) {
1047             flb_error("[output] could not start thread pool for '%s' plugin",
1048                       p->name);
1049             return -1;
1050         }
1051     }
1052 
1053     return 0;
1054 }
1055 
1056 /* Assign an Configuration context to an Output */
flb_output_set_context(struct flb_output_instance * ins,void * context)1057 void flb_output_set_context(struct flb_output_instance *ins, void *context)
1058 {
1059     ins->context = context;
1060 }
1061 
1062 /* Check that at least one Output is enabled */
flb_output_check(struct flb_config * config)1063 int flb_output_check(struct flb_config *config)
1064 {
1065     if (mk_list_is_empty(&config->outputs) == 0) {
1066         return -1;
1067     }
1068     return 0;
1069 }
1070 
1071 /*
1072  * Output plugins might have enabled certain features that have not been passed
1073  * directly to the upstream context. In order to avoid let plugins validate specific
1074  * variables from the instance context like tls, tls.x, keepalive, etc, we populate
1075  * them directly through this function.
1076  */
flb_output_upstream_set(struct flb_upstream * u,struct flb_output_instance * ins)1077 int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins)
1078 {
1079     int flags = 0;
1080 
1081     if (!u) {
1082         return -1;
1083     }
1084 
1085     /* TLS */
1086 #ifdef FLB_HAVE_TLS
1087     if (ins->use_tls == FLB_TRUE) {
1088         flags |= FLB_IO_TLS;
1089     }
1090     else {
1091         flags |= FLB_IO_TCP;
1092     }
1093 #else
1094     flags |= FLB_IO_TCP;
1095 #endif
1096 
1097     /* IPv6 */
1098     if (ins->host.ipv6 == FLB_TRUE) {
1099         flags |= FLB_IO_IPV6;
1100     }
1101 
1102     /* Set flags */
1103     u->flags |= flags;
1104 
1105     /*
1106      * If the output plugin flush callbacks will run in multiple threads, enable
1107      * the thread safe mode for the Upstream context.
1108      */
1109     if (ins->tp_workers > 0) {
1110         flb_upstream_thread_safe(u);
1111         mk_list_add(&u->_head, &ins->upstreams);
1112     }
1113 
1114     /* Set networking options 'net.*' received through instance properties */
1115     memcpy(&u->net, &ins->net_setup, sizeof(struct flb_net_setup));
1116     return 0;
1117 }
1118 
flb_output_upstream_ha_set(void * ha,struct flb_output_instance * ins)1119 int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins)
1120 {
1121     struct mk_list *head;
1122     struct flb_upstream_node *node;
1123     struct flb_upstream_ha *upstream_ha = ha;
1124 
1125     mk_list_foreach(head, &upstream_ha->nodes) {
1126         node = mk_list_entry(head, struct flb_upstream_node, _head);
1127         flb_output_upstream_set(node->u, ins);
1128     }
1129 
1130     return 0;
1131 }
1132 
1133 /*
1134  * Helper function to set HTTP callbacks using the output instance 'callback'
1135  * context.
1136  */
flb_output_set_http_debug_callbacks(struct flb_output_instance * ins)1137 int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins)
1138 {
1139 #ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
1140     return flb_http_client_debug_setup(ins->callback, &ins->properties);
1141 #else
1142     return 0;
1143 #endif
1144 }
1145