1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24
25 #include <fluent-bit/flb_info.h>
26 #include <fluent-bit/flb_mem.h>
27 #include <fluent-bit/flb_str.h>
28 #include <fluent-bit/flb_env.h>
29 #include <fluent-bit/flb_coro.h>
30 #include <fluent-bit/flb_output.h>
31 #include <fluent-bit/flb_kv.h>
32 #include <fluent-bit/flb_io.h>
33 #include <fluent-bit/flb_uri.h>
34 #include <fluent-bit/flb_config.h>
35 #include <fluent-bit/flb_macros.h>
36 #include <fluent-bit/flb_utils.h>
37 #include <fluent-bit/flb_plugin_proxy.h>
38 #include <fluent-bit/flb_http_client_debug.h>
39 #include <fluent-bit/flb_output_thread.h>
40 #include <fluent-bit/flb_mp.h>
41 #include <fluent-bit/flb_pack.h>
42
43 FLB_TLS_DEFINE(struct flb_out_coro_params, out_coro_params);
44
flb_output_prepare()45 void flb_output_prepare()
46 {
47 FLB_TLS_INIT(out_coro_params);
48 }
49
/*
 * Validate the output address protocol.
 *
 * 'output' is either a bare name ("stdout") or an address in the form
 * "name://...". The name part of 'output' is compared against 'prot'
 * ignoring case; both must have exactly the same length.
 *
 * Returns 1 when the names match, 0 otherwise.
 */
static int check_protocol(const char *prot, const char *output)
{
    size_t len;
    const char *sep;

    /*
     * A "://" found at the very beginning does not delimit a name, in that
     * case the whole string is used for the comparison.
     */
    sep = strstr(output, "://");
    if (sep != NULL && sep != output) {
        len = (size_t) (sep - output);
    }
    else {
        len = strlen(output);
    }

    if (strlen(prot) != len) {
        return 0;
    }

    /* Output plugin match */
    return strncasecmp(prot, output, len) == 0;
}
75
76
77 /* Invoke pre-run call for the output plugin */
flb_output_pre_run(struct flb_config * config)78 void flb_output_pre_run(struct flb_config *config)
79 {
80 struct mk_list *head;
81 struct flb_output_instance *ins;
82 struct flb_output_plugin *p;
83
84 mk_list_foreach(head, &config->outputs) {
85 ins = mk_list_entry(head, struct flb_output_instance, _head);
86 p = ins->p;
87 if (p->cb_pre_run) {
88 p->cb_pre_run(ins->context, config);
89 }
90 }
91 }
92
flb_output_free_properties(struct flb_output_instance * ins)93 static void flb_output_free_properties(struct flb_output_instance *ins)
94 {
95
96 flb_kv_release(&ins->properties);
97 flb_kv_release(&ins->net_properties);
98
99 #ifdef FLB_HAVE_TLS
100 if (ins->tls_vhost) {
101 flb_sds_destroy(ins->tls_vhost);
102 }
103 if (ins->tls_ca_path) {
104 flb_sds_destroy(ins->tls_ca_path);
105 }
106 if (ins->tls_ca_file) {
107 flb_sds_destroy(ins->tls_ca_file);
108 }
109 if (ins->tls_crt_file) {
110 flb_sds_destroy(ins->tls_crt_file);
111 }
112 if (ins->tls_key_file) {
113 flb_sds_destroy(ins->tls_key_file);
114 }
115 if (ins->tls_key_passwd) {
116 flb_sds_destroy(ins->tls_key_passwd);
117 }
118 #endif
119 }
120
flb_output_coro_prepare_destroy(struct flb_output_coro * out_coro)121 void flb_output_coro_prepare_destroy(struct flb_output_coro *out_coro)
122 {
123 struct flb_output_instance *ins = out_coro->o_ins;
124 struct flb_out_thread_instance *th_ins;
125
126 /* Move output coroutine context from active list to the destroy one */
127 if (flb_output_is_threaded(ins) == FLB_TRUE) {
128 th_ins = flb_output_thread_instance_get();
129 pthread_mutex_lock(&th_ins->coro_mutex);
130 mk_list_del(&out_coro->_head);
131 mk_list_add(&out_coro->_head, &th_ins->coros_destroy);
132 pthread_mutex_unlock(&th_ins->coro_mutex);
133 }
134 else {
135 mk_list_del(&out_coro->_head);
136 mk_list_add(&out_coro->_head, &ins->coros_destroy);
137 }
138 }
139
flb_output_coro_id_get(struct flb_output_instance * ins)140 int flb_output_coro_id_get(struct flb_output_instance *ins)
141 {
142 int id;
143 int max = (2 << 13) - 1; /* max for 14 bits */
144 struct flb_out_thread_instance *th_ins;
145
146 if (flb_output_is_threaded(ins) == FLB_TRUE) {
147 th_ins = flb_output_thread_instance_get();
148 id = th_ins->coro_id;
149 th_ins->coro_id++;
150
151 /* reset once it reach the maximum allowed */
152 if (th_ins->coro_id > max) {
153 th_ins->coro_id = 0;
154 }
155 }
156 else {
157 id = ins->coro_id;
158 ins->coro_id++;
159
160 /* reset once it reach the maximum allowed */
161 if (ins->coro_id > max) {
162 ins->coro_id = 0;
163 }
164 }
165
166 return id;
167 }
168
flb_output_coro_add(struct flb_output_instance * ins,struct flb_coro * coro)169 void flb_output_coro_add(struct flb_output_instance *ins, struct flb_coro *coro)
170 {
171 struct flb_output_coro *out_coro;
172
173 out_coro = (struct flb_output_coro *) FLB_CORO_DATA(coro);
174 mk_list_add(&out_coro->_head, &ins->coros);
175 }
176
177 /*
178 * Flush a task through the output plugin, either using a worker thread + coroutine
179 * or a simple co-routine in the current thread.
180 */
flb_output_task_flush(struct flb_task * task,struct flb_output_instance * out_ins,struct flb_config * config)181 int flb_output_task_flush(struct flb_task *task,
182 struct flb_output_instance *out_ins,
183 struct flb_config *config)
184 {
185 int ret;
186 struct flb_output_coro *out_coro;
187
188 if (flb_output_is_threaded(out_ins) == FLB_TRUE) {
189 flb_task_users_inc(task);
190
191 /* Dispatch the task to the thread pool */
192 ret = flb_output_thread_pool_flush(task, out_ins, config);
193 if (ret == -1) {
194 flb_task_users_dec(task, FLB_FALSE);
195 }
196 }
197 else {
198 /* Direct co-routine handling */
199 out_coro = flb_output_coro_create(task,
200 task->i_ins,
201 out_ins,
202 config,
203 task->buf, task->size,
204 task->tag,
205 task->tag_len);
206 if (!out_coro) {
207 return -1;
208 }
209
210 flb_task_users_inc(task);
211 flb_coro_resume(out_coro->coro);
212 }
213
214 return 0;
215 }
216
flb_output_instance_destroy(struct flb_output_instance * ins)217 int flb_output_instance_destroy(struct flb_output_instance *ins)
218 {
219 if (ins->alias) {
220 flb_sds_destroy(ins->alias);
221 }
222
223 /* Remove URI context */
224 if (ins->host.uri) {
225 flb_uri_destroy(ins->host.uri);
226 }
227
228 flb_sds_destroy(ins->host.name);
229 flb_sds_destroy(ins->host.address);
230 flb_sds_destroy(ins->host.listen);
231 flb_sds_destroy(ins->match);
232
233 #ifdef FLB_HAVE_REGEX
234 if (ins->match_regex) {
235 flb_regex_destroy(ins->match_regex);
236 }
237 #endif
238
239 #ifdef FLB_HAVE_TLS
240 if (ins->use_tls == FLB_TRUE) {
241 if (ins->tls) {
242 flb_tls_destroy(ins->tls);
243 }
244 }
245
246 if (ins->tls_config_map) {
247 flb_config_map_destroy(ins->tls_config_map);
248 }
249 #endif
250
251 /* Remove metrics */
252 #ifdef FLB_HAVE_METRICS
253 if (ins->cmt) {
254 cmt_destroy(ins->cmt);
255 }
256
257 if (ins->metrics) {
258 flb_metrics_destroy(ins->metrics);
259 }
260 #endif
261
262 /* destroy callback context */
263 if (ins->callback) {
264 flb_callback_destroy(ins->callback);
265 }
266
267 /* destroy config map */
268 if (ins->config_map) {
269 flb_config_map_destroy(ins->config_map);
270 }
271
272 if (ins->net_config_map) {
273 flb_config_map_destroy(ins->net_config_map);
274 }
275
276 if (ins->ch_events[0] > 0) {
277 mk_event_closesocket(ins->ch_events[0]);
278 }
279
280 if (ins->ch_events[1] > 0) {
281 mk_event_closesocket(ins->ch_events[1]);
282 }
283
284 /* release properties */
285 flb_output_free_properties(ins);
286
287 mk_list_del(&ins->_head);
288 flb_free(ins);
289
290 return 0;
291 }
292
293 /* Invoke exit call for the output plugin */
flb_output_exit(struct flb_config * config)294 void flb_output_exit(struct flb_config *config)
295 {
296 struct mk_list *tmp;
297 struct mk_list *head;
298 struct flb_output_instance *ins;
299 struct flb_output_plugin *p;
300 void *params;
301
302 mk_list_foreach_safe(head, tmp, &config->outputs) {
303 ins = mk_list_entry(head, struct flb_output_instance, _head);
304 p = ins->p;
305
306 /* Stop any worker thread */
307 if (flb_output_is_threaded(ins) == FLB_TRUE) {
308 flb_output_thread_pool_destroy(ins);
309 }
310
311 /* Check a exit callback */
312 if (p->cb_exit) {
313 if(!p->proxy) {
314 p->cb_exit(ins->context, config);
315 }
316 else {
317 p->cb_exit(p, ins->context);
318 }
319 }
320 flb_output_instance_destroy(ins);
321 }
322
323 params = FLB_TLS_GET(out_coro_params);
324 if (params) {
325 flb_free(params);
326 }
327 }
328
instance_id(struct flb_config * config)329 static inline int instance_id(struct flb_config *config)
330 {
331 struct flb_output_instance *ins;
332
333 if (mk_list_size(&config->outputs) == 0) {
334 return 0;
335 }
336
337 ins = mk_list_entry_last(&config->outputs, struct flb_output_instance,
338 _head);
339 return (ins->id + 1);
340 }
341
flb_output_get_instance(struct flb_config * config,int out_id)342 struct flb_output_instance *flb_output_get_instance(struct flb_config *config,
343 int out_id)
344 {
345 struct mk_list *head;
346 struct flb_output_instance *ins;
347
348 mk_list_foreach(head, &config->outputs) {
349 ins = mk_list_entry(head, struct flb_output_instance, _head);
350 if (ins->id == out_id) {
351 break;
352 }
353 ins = NULL;
354 }
355
356 if (!ins) {
357 return NULL;
358 }
359
360 return ins;
361 }
362
363 /*
364 * Invoked everytime a flush callback has finished (returned). This function
365 * is called from the event loop.
366 */
flb_output_flush_finished(struct flb_config * config,int out_id)367 int flb_output_flush_finished(struct flb_config *config, int out_id)
368 {
369 struct mk_list *tmp;
370 struct mk_list *head;
371 struct mk_list *list;
372 struct flb_output_instance *ins;
373 struct flb_output_coro *out_coro;
374 struct flb_out_thread_instance *th_ins;
375
376 ins = flb_output_get_instance(config, out_id);
377 if (!ins) {
378 return -1;
379 }
380
381 if (flb_output_is_threaded(ins) == FLB_TRUE) {
382 th_ins = flb_output_thread_instance_get();
383 list = &th_ins->coros_destroy;
384 }
385 else {
386 list = &ins->coros_destroy;
387 }
388
389 /* Look for output coroutines that needs to be destroyed */
390 mk_list_foreach_safe(head, tmp, list) {
391 out_coro = mk_list_entry(head, struct flb_output_coro, _head);
392 flb_output_coro_destroy(out_coro);
393 }
394
395 return 0;
396 }
397
398
399 /*
400 * It validate an output type given the string, it return the
401 * proper type and if valid, populate the global config.
402 */
flb_output_new(struct flb_config * config,const char * output,void * data,int public_only)403 struct flb_output_instance *flb_output_new(struct flb_config *config,
404 const char *output, void *data,
405 int public_only)
406 {
407 int ret = -1;
408 int flags = 0;
409 struct mk_list *head;
410 struct flb_output_plugin *plugin;
411 struct flb_output_instance *instance = NULL;
412
413 if (!output) {
414 return NULL;
415 }
416
417 mk_list_foreach(head, &config->out_plugins) {
418 plugin = mk_list_entry(head, struct flb_output_plugin, _head);
419 if (!check_protocol(plugin->name, output)) {
420 plugin = NULL;
421 continue;
422 }
423
424 if (public_only && plugin->flags & FLB_OUTPUT_PRIVATE) {
425 return NULL;
426 }
427 break;
428 }
429
430 if (!plugin) {
431 return NULL;
432 }
433
434 /* Create and load instance */
435 instance = flb_calloc(1, sizeof(struct flb_output_instance));
436 if (!instance) {
437 flb_errno();
438 return NULL;
439 }
440
441 /* Initialize event type, if not set, default to FLB_OUTPUT_LOGS */
442 if (plugin->event_type == 0) {
443 instance->event_type = FLB_OUTPUT_LOGS;
444 }
445 else {
446 instance->event_type = plugin->event_type;
447 }
448 instance->config = config;
449 instance->log_level = -1;
450 instance->test_mode = FLB_FALSE;
451 instance->is_threaded = FLB_FALSE;
452
453
454 /* Retrieve an instance id for the output instance */
455 instance->id = instance_id(config);
456
457 /* format name (with instance id) */
458 snprintf(instance->name, sizeof(instance->name) - 1,
459 "%s.%i", plugin->name, instance->id);
460 instance->p = plugin;
461 instance->callback = flb_callback_create(instance->name);
462 if (!instance->callback) {
463 flb_free(instance);
464 return NULL;
465 }
466
467 if (plugin->type == FLB_OUTPUT_PLUGIN_CORE) {
468 instance->context = NULL;
469 }
470 else {
471 struct flb_plugin_proxy_context *ctx;
472
473 ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
474 if (!ctx) {
475 flb_errno();
476 flb_free(instance);
477 return NULL;
478 }
479
480 ctx->proxy = plugin->proxy;
481
482 instance->context = ctx;
483 }
484
485 instance->alias = NULL;
486 instance->flags = instance->p->flags;
487 instance->data = data;
488 instance->match = NULL;
489 #ifdef FLB_HAVE_REGEX
490 instance->match_regex = NULL;
491 #endif
492 instance->retry_limit = 1;
493 instance->host.name = NULL;
494 instance->host.address = NULL;
495 instance->net_config_map = NULL;
496
497 /* Storage */
498 instance->total_limit_size = -1;
499
500 /* Parent plugin flags */
501 flags = instance->flags;
502 if (flags & FLB_IO_TCP) {
503 instance->use_tls = FLB_FALSE;
504 }
505 else if (flags & FLB_IO_TLS) {
506 instance->use_tls = FLB_TRUE;
507 }
508 else if (flags & FLB_IO_OPT_TLS) {
509 /* TLS must be enabled manually in the config */
510 instance->use_tls = FLB_FALSE;
511 instance->flags |= FLB_IO_TLS;
512 }
513
514 #ifdef FLB_HAVE_TLS
515 instance->tls = NULL;
516 instance->tls_debug = -1;
517 instance->tls_verify = FLB_TRUE;
518 instance->tls_vhost = NULL;
519 instance->tls_ca_path = NULL;
520 instance->tls_ca_file = NULL;
521 instance->tls_crt_file = NULL;
522 instance->tls_key_file = NULL;
523 instance->tls_key_passwd = NULL;
524 #endif
525
526 if (plugin->flags & FLB_OUTPUT_NET) {
527 ret = flb_net_host_set(plugin->name, &instance->host, output);
528 if (ret != 0) {
529 flb_free(instance);
530 return NULL;
531 }
532 }
533
534 flb_kv_init(&instance->properties);
535 flb_kv_init(&instance->net_properties);
536 mk_list_init(&instance->upstreams);
537 mk_list_init(&instance->coros);
538 mk_list_init(&instance->coros_destroy);
539
540 mk_list_add(&instance->_head, &config->outputs);
541
542 /* Tests */
543 instance->test_formatter.callback = plugin->test_formatter.callback;
544
545 return instance;
546 }
547
/*
 * Case-insensitive comparison of a known property name ('key') against a
 * user supplied key ('kv') of length 'k_len'.
 *
 * Returns 0 when both have the same length and the same content (ignoring
 * case), -1 otherwise.
 */
static inline int prop_key_check(const char *key, const char *kv, int k_len)
{
    int key_len = strlen(key);

    /* Different lengths can never be an exact match */
    if (key_len != k_len) {
        return -1;
    }

    return (strncasecmp(key, kv, k_len) == 0) ? 0 : -1;
}
559
560 /* Override a configuration property for the given input_instance plugin */
flb_output_set_property(struct flb_output_instance * ins,const char * k,const char * v)561 int flb_output_set_property(struct flb_output_instance *ins,
562 const char *k, const char *v)
563 {
564 int len;
565 int ret;
566 ssize_t limit;
567 flb_sds_t tmp;
568 struct flb_kv *kv;
569 struct flb_config *config = ins->config;
570
571 len = strlen(k);
572 tmp = flb_env_var_translate(config->env, v);
573 if (tmp) {
574 if (strlen(tmp) == 0) {
575 flb_sds_destroy(tmp);
576 tmp = NULL;
577 }
578 }
579
580 /* Check if the key is a known/shared property */
581 if (prop_key_check("match", k, len) == 0) {
582 ins->match = tmp;
583 }
584 #ifdef FLB_HAVE_REGEX
585 else if (prop_key_check("match_regex", k, len) == 0 && tmp) {
586 ins->match_regex = flb_regex_create(tmp);
587 flb_sds_destroy(tmp);
588 }
589 #endif
590 else if (prop_key_check("alias", k, len) == 0 && tmp) {
591 ins->alias = tmp;
592 }
593 else if (prop_key_check("log_level", k, len) == 0 && tmp) {
594 ret = flb_log_get_level_str(tmp);
595 flb_sds_destroy(tmp);
596 if (ret == -1) {
597 return -1;
598 }
599 ins->log_level = ret;
600 }
601 else if (prop_key_check("host", k, len) == 0) {
602 ins->host.name = tmp;
603 }
604 else if (prop_key_check("port", k, len) == 0) {
605 if (tmp) {
606 ins->host.port = atoi(tmp);
607 flb_sds_destroy(tmp);
608 }
609 else {
610 ins->host.port = 0;
611 }
612 }
613 else if (prop_key_check("ipv6", k, len) == 0 && tmp) {
614 ins->host.ipv6 = flb_utils_bool(tmp);
615 flb_sds_destroy(tmp);
616 }
617 else if (prop_key_check("retry_limit", k, len) == 0) {
618 if (tmp) {
619 if (strcasecmp(tmp, "no_limits") == 0 ||
620 strcasecmp(tmp, "false") == 0 ||
621 strcasecmp(tmp, "off") == 0) {
622 /* No limits for retries */
623 ins->retry_limit = FLB_OUT_RETRY_UNLIMITED;
624 }
625 else if (strcasecmp(tmp, "no_retries") == 0) {
626 ins->retry_limit = FLB_OUT_RETRY_NONE;
627 }
628 else {
629 ins->retry_limit = atoi(tmp);
630 if (ins->retry_limit <= 0) {
631 flb_warn("[config] invalid retry_limit. set default.");
632 /* set default when input is invalid number */
633 ins->retry_limit = 1;
634 }
635 }
636 flb_sds_destroy(tmp);
637 }
638 else {
639 ins->retry_limit = 1;
640 }
641 }
642 else if (strncasecmp("net.", k, 4) == 0 && tmp) {
643 kv = flb_kv_item_create(&ins->net_properties, (char *) k, NULL);
644 if (!kv) {
645 if (tmp) {
646 flb_sds_destroy(tmp);
647 }
648 return -1;
649 }
650 kv->val = tmp;
651 }
652 #ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
653 else if (strncasecmp("_debug.http.", k, 12) == 0 && tmp) {
654 ret = flb_http_client_debug_property_is_valid((char *) k, tmp);
655 if (ret == FLB_TRUE) {
656 kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
657 if (!kv) {
658 if (tmp) {
659 flb_sds_destroy(tmp);
660 }
661 return -1;
662 }
663 kv->val = tmp;
664 }
665 else {
666 flb_error("[config] invalid property '%s' on instance '%s'",
667 k, flb_output_name(ins));
668 flb_sds_destroy(tmp);
669 }
670 }
671 #endif
672 #ifdef FLB_HAVE_TLS
673 else if (prop_key_check("tls", k, len) == 0 && tmp) {
674 if (strcasecmp(tmp, "true") == 0 || strcasecmp(tmp, "on") == 0) {
675 if ((ins->flags & FLB_IO_TLS) == 0) {
676 flb_error("[config] %s don't support TLS", ins->name);
677 flb_sds_destroy(tmp);
678 return -1;
679 }
680
681 ins->use_tls = FLB_TRUE;
682 }
683 else {
684 ins->use_tls = FLB_FALSE;
685 }
686 flb_sds_destroy(tmp);
687 }
688 else if (prop_key_check("tls.verify", k, len) == 0 && tmp) {
689 if (strcasecmp(tmp, "true") == 0 || strcasecmp(tmp, "on") == 0) {
690 ins->tls_verify = FLB_TRUE;
691 }
692 else {
693 ins->tls_verify = FLB_FALSE;
694 }
695 flb_sds_destroy(tmp);
696 }
697 else if (prop_key_check("tls.debug", k, len) == 0 && tmp) {
698 ins->tls_debug = atoi(tmp);
699 flb_sds_destroy(tmp);
700 }
701 else if (prop_key_check("tls.vhost", k, len) == 0) {
702 ins->tls_vhost = tmp;
703 }
704 else if (prop_key_check("tls.ca_path", k, len) == 0) {
705 ins->tls_ca_path = tmp;
706 }
707 else if (prop_key_check("tls.ca_file", k, len) == 0) {
708 ins->tls_ca_file = tmp;
709 }
710 else if (prop_key_check("tls.crt_file", k, len) == 0) {
711 ins->tls_crt_file = tmp;
712 }
713 else if (prop_key_check("tls.key_file", k, len) == 0) {
714 ins->tls_key_file = tmp;
715 }
716 else if (prop_key_check("tls.key_passwd", k, len) == 0) {
717 ins->tls_key_passwd = tmp;
718 }
719 #endif
720 else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
721 if (strcasecmp(tmp, "off") == 0 ||
722 flb_utils_bool(tmp) == FLB_FALSE) {
723 /* no limit for filesystem storage */
724 limit = -1;
725 flb_info("[config] unlimited filesystem buffer for %s plugin",
726 ins->name);
727 }
728 else {
729 limit = flb_utils_size_to_bytes(tmp);
730 if (limit == -1) {
731 flb_sds_destroy(tmp);
732 return -1;
733 }
734
735 if (limit == 0) {
736 limit = -1;
737 }
738 }
739
740 flb_sds_destroy(tmp);
741 ins->total_limit_size = (size_t) limit;
742 }
743 else if (prop_key_check("workers", k, len) == 0 && tmp) {
744 /* Set the number of workers */
745 ins->tp_workers = atoi(tmp);
746 flb_sds_destroy(tmp);
747 }
748 else {
749 /*
750 * Create the property, we don't pass the value since we will
751 * map it directly to avoid an extra memory allocation.
752 */
753 kv = flb_kv_item_create(&ins->properties, (char *) k, NULL);
754 if (!kv) {
755 if (tmp) {
756 flb_sds_destroy(tmp);
757 }
758 return -1;
759 }
760 kv->val = tmp;
761 }
762
763 return 0;
764 }
765
766 /* Configure a default hostname and TCP port if they are not set */
flb_output_net_default(const char * host,const int port,struct flb_output_instance * ins)767 void flb_output_net_default(const char *host, const int port,
768 struct flb_output_instance *ins)
769 {
770 /* Set default network configuration */
771 if (!ins->host.name) {
772 ins->host.name = flb_sds_create(host);
773 }
774 if (ins->host.port == 0) {
775 ins->host.port = port;
776 }
777 }
778
779 /* Add thread pool for output plugin if configured with workers */
flb_output_enable_multi_threading(struct flb_output_instance * ins,struct flb_config * config)780 int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config)
781 {
782 /* Multi-threading enabled ? (through 'workers' property) */
783 if (ins->tp_workers > 0) {
784 if(flb_output_thread_pool_create(config, ins) != 0) {
785 flb_output_instance_destroy(ins);
786 return -1;
787 }
788 flb_output_thread_pool_start(ins);
789 }
790
791 return 0;
792 }
793
794 /* Return an instance name or alias */
flb_output_name(struct flb_output_instance * ins)795 const char *flb_output_name(struct flb_output_instance *ins)
796 {
797 if (ins->alias) {
798 return ins->alias;
799 }
800
801 return ins->name;
802 }
803
flb_output_get_property(const char * key,struct flb_output_instance * ins)804 const char *flb_output_get_property(const char *key, struct flb_output_instance *ins)
805 {
806 return flb_config_prop_get(key, &ins->properties);
807 }
808
809 /* Trigger the output plugins setup callbacks to prepare them. */
flb_output_init_all(struct flb_config * config)810 int flb_output_init_all(struct flb_config *config)
811 {
812 int ret;
813 #ifdef FLB_HAVE_METRICS
814 char *name;
815 #endif
816 struct mk_list *tmp;
817 struct mk_list *head;
818 struct mk_list *config_map;
819 struct flb_output_instance *ins;
820 struct flb_output_plugin *p;
821
822 /* Retrieve the plugin reference */
823 mk_list_foreach_safe(head, tmp, &config->outputs) {
824 ins = mk_list_entry(head, struct flb_output_instance, _head);
825 if (ins->log_level == -1) {
826 ins->log_level = config->log->level;
827 }
828 p = ins->p;
829
830 /* Output Events Channel */
831 ret = mk_event_channel_create(config->evl,
832 &ins->ch_events[0],
833 &ins->ch_events[1],
834 ins);
835 if (ret != 0) {
836 flb_error("could not create events channels for '%s'",
837 flb_output_name(ins));
838 flb_output_instance_destroy(ins);
839 return -1;
840 }
841 flb_debug("[%s:%s] created event channels: read=%i write=%i",
842 ins->p->name, flb_output_name(ins),
843 ins->ch_events[0], ins->ch_events[1]);
844
845 /*
846 * Note: mk_event_channel_create() sets a type = MK_EVENT_NOTIFICATION by
847 * default, we need to overwrite this value so we can do a clean check
848 * into the Engine when the event is triggered.
849 */
850 ins->event.type = FLB_ENGINE_EV_OUTPUT;
851
852 /* Metrics */
853 #ifdef FLB_HAVE_METRICS
854 /* Get name or alias for the instance */
855 name = (char *) flb_output_name(ins);
856
857 /* CMetrics */
858 ins->cmt = cmt_create();
859 if (!ins->cmt) {
860 flb_error("[output] could not create cmetrics context");
861 return -1;
862 }
863
864 /* Register generic output plugin metrics */
865 ins->cmt_proc_records = cmt_counter_create(ins->cmt, "fluentbit",
866 "output", "proc_records_total",
867 "Number of processed output records.",
868 1, (char *[]) {"name"});
869
870 ins->cmt_proc_bytes = cmt_counter_create(ins->cmt, "fluentbit",
871 "output", "proc_bytes_total",
872 "Number of processed output bytes.",
873 1, (char *[]) {"name"});
874
875 ins->cmt_errors = cmt_counter_create(ins->cmt, "fluentbit",
876 "output", "errors_total",
877 "Number of output errors.",
878 1, (char *[]) {"name"});
879
880 ins->cmt_retries = cmt_counter_create(ins->cmt, "fluentbit",
881 "output", "retries_total",
882 "Number of output retries.",
883 1, (char *[]) {"name"});
884
885 ins->cmt_retries_failed = cmt_counter_create(ins->cmt, "fluentbit",
886 "output", "retries_failed_total",
887 "Number of abandoned batches because "
888 "the maximum number of re-tries was "
889 "reached.",
890 1, (char *[]) {"name"});
891
892 ins->cmt_dropped_records = cmt_counter_create(ins->cmt, "fluentbit",
893 "output", "dropped_records_total",
894 "Number of dropped records.",
895 1, (char *[]) {"name"});
896
897 ins->cmt_retried_records = cmt_counter_create(ins->cmt, "fluentbit",
898 "output", "retried_records_total",
899 "Number of retried records.",
900 1, (char *[]) {"name"});
901
902 /* old API */
903 ins->metrics = flb_metrics_create(name);
904 if (ins->metrics) {
905 flb_metrics_add(FLB_METRIC_OUT_OK_RECORDS,
906 "proc_records", ins->metrics);
907 flb_metrics_add(FLB_METRIC_OUT_OK_BYTES,
908 "proc_bytes", ins->metrics);
909 flb_metrics_add(FLB_METRIC_OUT_ERROR,
910 "errors", ins->metrics);
911 flb_metrics_add(FLB_METRIC_OUT_RETRY,
912 "retries", ins->metrics);
913 flb_metrics_add(FLB_METRIC_OUT_RETRY_FAILED,
914 "retries_failed", ins->metrics);
915 flb_metrics_add(FLB_METRIC_OUT_DROPPED_RECORDS,
916 "dropped_records", ins->metrics);
917 flb_metrics_add(FLB_METRIC_OUT_RETRIED_RECORDS,
918 "retried_records", ins->metrics);
919 }
920 #endif
921
922 #ifdef FLB_HAVE_PROXY_GO
923 /* Proxy plugins have their own initialization */
924 if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
925 ret = flb_plugin_proxy_init(p->proxy, ins, config);
926 if (ret == -1) {
927 flb_output_instance_destroy(ins);
928 return -1;
929 }
930
931 /* Multi-threading enabled if configured */
932 ret = flb_output_enable_multi_threading(ins, config);
933 if (ret == -1) {
934 flb_error("[output] could not start thread pool for '%s' plugin",
935 p->name);
936 return -1;
937 }
938
939 continue;
940 }
941 #endif
942
943 #ifdef FLB_HAVE_TLS
944 if (ins->use_tls == FLB_TRUE) {
945 ins->tls = flb_tls_create(ins->tls_verify,
946 ins->tls_debug,
947 ins->tls_vhost,
948 ins->tls_ca_path,
949 ins->tls_ca_file,
950 ins->tls_crt_file,
951 ins->tls_key_file,
952 ins->tls_key_passwd);
953 if (!ins->tls) {
954 flb_error("[output %s] error initializing TLS context",
955 ins->name);
956 flb_output_instance_destroy(ins);
957 return -1;
958 }
959 }
960 #endif
961 /*
962 * Before to call the initialization callback, make sure that the received
963 * configuration parameters are valid if the plugin is registering a config map.
964 */
965 if (p->config_map) {
966 /*
967 * Create a dynamic version of the configmap that will be used by the specific
968 * instance in question.
969 */
970 config_map = flb_config_map_create(config, p->config_map);
971 if (!config_map) {
972 flb_error("[output] error loading config map for '%s' plugin",
973 p->name);
974 flb_output_instance_destroy(ins);
975 return -1;
976 }
977 ins->config_map = config_map;
978
979 /* Validate incoming properties against config map */
980 ret = flb_config_map_properties_check(ins->p->name,
981 &ins->properties, ins->config_map);
982 if (ret == -1) {
983 if (config->program_name) {
984 flb_helper("try the command: %s -o %s -h\n",
985 config->program_name, ins->p->name);
986 }
987 flb_output_instance_destroy(ins);
988 return -1;
989 }
990 }
991
992 /* Init network defaults */
993 flb_net_setup_init(&ins->net_setup);
994
995 /* Get Upstream net_setup configmap */
996 ins->net_config_map = flb_upstream_get_config_map(config);
997 if (!ins->net_config_map) {
998 flb_output_instance_destroy(ins);
999 return -1;
1000 }
1001
1002 #ifdef FLB_HAVE_TLS
1003 struct flb_config_map *m;
1004
1005 /* TLS config map (just for 'help' formatting purposes) */
1006 ins->tls_config_map = flb_tls_get_config_map(config);
1007
1008 /* Override first configmap value based on it plugin flag */
1009 m = mk_list_entry_first(ins->tls_config_map, struct flb_config_map, _head);
1010 if (p->flags & FLB_IO_TLS) {
1011 m->value.val.boolean = FLB_TRUE;
1012 }
1013 else {
1014 m->value.val.boolean = FLB_FALSE;
1015 }
1016 #endif
1017 /*
1018 * Validate 'net.*' properties: if the plugin use the Upstream interface,
1019 * it might receive some networking settings.
1020 */
1021 if (mk_list_size(&ins->net_properties) > 0) {
1022 ret = flb_config_map_properties_check(ins->p->name,
1023 &ins->net_properties,
1024 ins->net_config_map);
1025 if (ret == -1) {
1026 if (config->program_name) {
1027 flb_helper("try the command: %s -o %s -h\n",
1028 config->program_name, ins->p->name);
1029 }
1030 flb_output_instance_destroy(ins);
1031 return -1;
1032 }
1033 }
1034
1035 /* Initialize plugin through it 'init callback' */
1036 ret = p->cb_init(ins, config, ins->data);
1037 if (ret == -1) {
1038 flb_error("[output] failed to initialize '%s' plugin",
1039 p->name);
1040 flb_output_instance_destroy(ins);
1041 return -1;
1042 }
1043
1044 /* Multi-threading enabled if configured */
1045 ret = flb_output_enable_multi_threading(ins, config);
1046 if (ret == -1) {
1047 flb_error("[output] could not start thread pool for '%s' plugin",
1048 p->name);
1049 return -1;
1050 }
1051 }
1052
1053 return 0;
1054 }
1055
1056 /* Assign an Configuration context to an Output */
flb_output_set_context(struct flb_output_instance * ins,void * context)1057 void flb_output_set_context(struct flb_output_instance *ins, void *context)
1058 {
1059 ins->context = context;
1060 }
1061
1062 /* Check that at least one Output is enabled */
flb_output_check(struct flb_config * config)1063 int flb_output_check(struct flb_config *config)
1064 {
1065 if (mk_list_is_empty(&config->outputs) == 0) {
1066 return -1;
1067 }
1068 return 0;
1069 }
1070
1071 /*
1072 * Output plugins might have enabled certain features that have not been passed
1073 * directly to the upstream context. In order to avoid let plugins validate specific
1074 * variables from the instance context like tls, tls.x, keepalive, etc, we populate
1075 * them directly through this function.
1076 */
flb_output_upstream_set(struct flb_upstream * u,struct flb_output_instance * ins)1077 int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins)
1078 {
1079 int flags = 0;
1080
1081 if (!u) {
1082 return -1;
1083 }
1084
1085 /* TLS */
1086 #ifdef FLB_HAVE_TLS
1087 if (ins->use_tls == FLB_TRUE) {
1088 flags |= FLB_IO_TLS;
1089 }
1090 else {
1091 flags |= FLB_IO_TCP;
1092 }
1093 #else
1094 flags |= FLB_IO_TCP;
1095 #endif
1096
1097 /* IPv6 */
1098 if (ins->host.ipv6 == FLB_TRUE) {
1099 flags |= FLB_IO_IPV6;
1100 }
1101
1102 /* Set flags */
1103 u->flags |= flags;
1104
1105 /*
1106 * If the output plugin flush callbacks will run in multiple threads, enable
1107 * the thread safe mode for the Upstream context.
1108 */
1109 if (ins->tp_workers > 0) {
1110 flb_upstream_thread_safe(u);
1111 mk_list_add(&u->_head, &ins->upstreams);
1112 }
1113
1114 /* Set networking options 'net.*' received through instance properties */
1115 memcpy(&u->net, &ins->net_setup, sizeof(struct flb_net_setup));
1116 return 0;
1117 }
1118
flb_output_upstream_ha_set(void * ha,struct flb_output_instance * ins)1119 int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins)
1120 {
1121 struct mk_list *head;
1122 struct flb_upstream_node *node;
1123 struct flb_upstream_ha *upstream_ha = ha;
1124
1125 mk_list_foreach(head, &upstream_ha->nodes) {
1126 node = mk_list_entry(head, struct flb_upstream_node, _head);
1127 flb_output_upstream_set(node->u, ins);
1128 }
1129
1130 return 0;
1131 }
1132
/*
 * Set the HTTP client debug callbacks using the 'callback' context and the
 * properties of the output instance. When the build lacks
 * FLB_HAVE_HTTP_CLIENT_DEBUG nothing is done and 0 is returned.
 */
int flb_output_set_http_debug_callbacks(struct flb_output_instance *ins)
{
    int ret = 0;

#ifdef FLB_HAVE_HTTP_CLIENT_DEBUG
    ret = flb_http_client_debug_setup(ins->callback, &ins->properties);
#endif

    return ret;
}
1145