1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #ifndef FLB_INPUT_H
22 #define FLB_INPUT_H
23
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_input_chunk.h>
26 #include <fluent-bit/flb_coro.h>
27 #include <fluent-bit/flb_config.h>
28 #include <fluent-bit/flb_network.h>
29 #include <fluent-bit/flb_mem.h>
30 #include <fluent-bit/flb_str.h>
31 #include <fluent-bit/flb_bits.h>
32 #include <fluent-bit/flb_pipe.h>
33 #include <fluent-bit/flb_filter.h>
34 #include <fluent-bit/flb_coro.h>
35 #include <fluent-bit/flb_mp.h>
36 #include <fluent-bit/flb_hash.h>
37
38 #ifdef FLB_HAVE_METRICS
39 #include <fluent-bit/flb_metrics.h>
40 #endif
41
42 #include <cmetrics/cmetrics.h>
43 #include <monkey/mk_core.h>
44 #include <msgpack.h>
45
46 #include <inttypes.h>
47
48 #define FLB_COLLECT_TIME 1
49 #define FLB_COLLECT_FD_EVENT 2
50 #define FLB_COLLECT_FD_SERVER 4
51
52 /* Input plugin flag masks */
53 #define FLB_INPUT_NET 4 /* input address may set host and port */
54 #define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */
55 #define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */
56 #define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */
57
58 /* Input status */
59 #define FLB_INPUT_RUNNING 1
60 #define FLB_INPUT_PAUSED 0
61
62 /* Input plugin event type */
63 #define FLB_INPUT_LOGS 0
64 #define FLB_INPUT_METRICS 1
65
66 struct flb_input_instance;
67
68 struct flb_input_plugin {
69 int flags; /* plugin flags */
70 int event_type; /* event type to be genarated: logs ?, metrics ? */
71
72 /* The Input name */
73 char *name;
74
75 /* Plugin Description */
76 char *description;
77
78 struct flb_config_map *config_map;
79
80 /* Initalization */
81 int (*cb_init) (struct flb_input_instance *, struct flb_config *, void *);
82
83 /* Pre run */
84 int (*cb_pre_run) (struct flb_input_instance *, struct flb_config *, void *);
85
86 /* Collect: every certain amount of time, Fluent Bit trigger this callback */
87 int (*cb_collect) (struct flb_input_instance *, struct flb_config *, void *);
88
89 /*
90 * Flush: each plugin during a collection, it does some buffering,
91 * when the Flush timer takes place on the Engine, it will trigger
92 * the cb_flush(...) to obtain the plugin buffer data. This data is
93 * a MsgPack buffer which will be processed by the Engine and delivered
94 * to the target output.
95 */
96
97 /* Flush a buffer type (raw data) */
98 void *(*cb_flush_buf) (void *, size_t *);
99
100 /* Notify that a flush have completed on the collector (buf + iov) */
101 void (*cb_flush_end) (void *);
102
103 /*
104 * Callbacks to notify the plugin when it becomes paused (cannot longer append
105 * data) and when it can resume operations.
106 */
107 void (*cb_pause) (void *, struct flb_config *);
108 void (*cb_resume) (void *, struct flb_config *);
109
110 /*
111 * Optional callback that can be used from a parent caller to ingest
112 * data into the engine.
113 */
114 int (*cb_ingest) (void *in_context, void *, size_t);
115
116 /* Exit */
117 int (*cb_exit) (void *, struct flb_config *);
118
119 void *instance;
120
121 struct mk_list _head;
122 };
123
124 /*
125 * Each initialized plugin must have an instance, same plugin may be
126 * loaded more than one time.
127 *
128 * An instance try to contain plugin data separating what is fixed data
129 * and the variable one that is generated when the plugin is invoked.
130 */
131 struct flb_input_instance {
132 int event_type; /* FLB_INPUT_LOGS, FLB_INPUT_METRICS */
133
134 /*
135 * The instance flags are derivated from the fixed plugin flags. This
136 * is done to offer some flexibility where a plugin instance per
137 * configuration would like to change some specific behavior.
138 *
139 * e.g By default in_tail plugin supports fixed tag, but if a wildcard
140 * is added to the 'tag', it will instruct to perform dyntag operations
141 * as the tags will be composed used the file name being watched.
142 */
143 int flags;
144
145 int id; /* instance id */
146 int log_level; /* log level for this plugin */
147 flb_pipefd_t channel[2]; /* pipe(2) channel */
148 int threaded; /* bool / Threaded instance ? */
149 char name[32]; /* numbered name (cpu -> cpu.0) */
150 char *alias; /* alias name for the instance */
151 void *context; /* plugin configuration context */
152 struct flb_input_plugin *p; /* original plugin */
153
154 /* Plugin properties */
155 char *tag; /* Input tag for routing */
156 int tag_len;
157
158 /* By default all input instances are 'routable' */
159 int routable;
160
161 /* flag to pause input when storage is full */
162 int storage_pause_on_chunks_overlimit;
163
164 /*
165 * Input network info:
166 *
167 * An input plugin can be specified just using it shortname or using the
168 * complete network address format, e.g:
169 *
170 * $ fluent-bit -i cpu -o plugin://hostname:port/uri
171 *
172 * where:
173 *
174 * plugin = the output plugin shortname
175 * name = IP address or hostname of the target
176 * port = target TCP port
177 * uri = extra information that may be used by the plugin
178 */
179 struct flb_net_host host;
180
181 /* Reference to struct flb_storage_input context */
182 void *storage;
183
184 /* Type of storage: CIO_STORE_FS (filesystem) or CIO_STORE_MEM (memory) */
185 int storage_type;
186
187 /*
188 * Buffers counter: it count the total of memory used by fixed and dynamic
189 * messgage pack buffers used by the input plugin instance.
190 */
191 size_t mem_chunks_size;
192 size_t mp_total_buf_size; /* FIXME: to be deprecated */
193
194 /*
195 * Buffer limit: optional limit set by configuration so this input instance
196 * cannot exceed more than mp_buf_limit (bytes unit).
197 *
198 * As a reference, if an input plugin exceeds the limit, the pause() callback
199 * will be triggered to notirfy the input instance it cannot longer append
200 * more data, on that moment Fluent Bit will avoid to add more records.
201 *
202 * When the buffer size goes down (because data was flushed), a resume()
203 * callback will be triggered, from that moment the plugin can append more
204 * data.
205 */
206 size_t mem_buf_limit;
207
208 /*
209 * Define the buffer status:
210 *
211 * - FLB_INPUT_RUNNING -> can append more data
212 * - FLB_INPUT_PAUSED -> cannot append data
213 */
214 int mem_buf_status;
215
216 /*
217 * Define the buffer status:
218 *
219 * - FLB_INPUT_RUNNING -> can append more data
220 * - FLB_INPUT_PAUSED -> cannot append data
221 */
222 int storage_buf_status;
223
224 /*
225 * Optional data passed to the plugin, this info is useful when
226 * running Fluent Bit in library mode and the target plugin needs
227 * some specific data from it caller.
228 */
229 void *data;
230
231 struct mk_list *config_map; /* configuration map */
232
233 struct mk_list _head; /* link to config->inputs */
234
235 struct mk_list routes_direct; /* direct routes set by API */
236 struct mk_list routes; /* flb_router_path's list */
237 struct mk_list properties; /* properties / configuration */
238 struct mk_list collectors; /* collectors */
239
240 /* Storage Chunks */
241 struct mk_list chunks; /* linked list of all chunks */
242
243 /*
244 * The following list helps to separate the chunks per it
245 * status, it can be 'up' or 'down'.
246 */
247 struct mk_list chunks_up; /* linked list of all chunks up */
248 struct mk_list chunks_down; /* linked list of all chunks down */
249
250 /*
251 * Every co-routine created by the engine when flushing data, it's
252 * linked into this list header.
253 */
254 struct mk_list tasks;
255
256 struct mk_list coros; /* list of input coros */
257
258 #ifdef FLB_HAVE_METRICS
259
260 /* old metrics API */
261 struct flb_metrics *metrics; /* metrics */
262 #endif
263
264 /*
265 * CMetrics
266 * --------
267 */
268 struct cmt *cmt; /* parent context */
269 struct cmt_counter *cmt_bytes; /* metric: input_bytes_total */
270 struct cmt_counter *cmt_records; /* metric: input_records_total */
271
272 /*
273 * Indexes for generated chunks: simple hash tables that keeps the latest
274 * available chunks for writing data operations. This optimize the
275 * lookup for candidates chunks to write data.
276 *
277 * Starting from v1.8 we have separate hash tables for logs and metrics.
278 */
279 struct flb_hash *ht_log_chunks;
280 struct flb_hash *ht_metric_chunks;
281
282 /* Keep a reference to the original context this instance belongs to */
283 struct flb_config *config;
284 };
285
286 struct flb_input_collector {
287 int id; /* collector id */
288 int type; /* collector type */
289 int running; /* is running ? (True/False) */
290
291 /* FLB_COLLECT_FD_EVENT */
292 flb_pipefd_t fd_event; /* fd being watched */
293
294 /* FLB_COLLECT_TIME */
295 flb_pipefd_t fd_timer; /* timer fd */
296 time_t seconds; /* expire time in seconds */
297 long nanoseconds; /* expire nanoseconds */
298
299 /* Callback */
300 int (*cb_collect) (struct flb_input_instance *,
301 struct flb_config *, void *);
302
303 struct mk_event event;
304
305 /* General references */
306 struct flb_input_instance *instance; /* plugin instance */
307 struct mk_list _head; /* link to global collectors */
308 struct mk_list _head_ins; /* link to instance collectors */
309 };
310
311 struct flb_input_coro {
312 int id; /* ID obtained from config->in_table_id */
313 time_t start_time; /* start time */
314 time_t end_time; /* end time */
315 struct flb_config *config; /* FLB context */
316 struct flb_coro *coro; /* Back reference to parent thread */
317 struct mk_list _head; /* link to list on input_instance->coros */
318 };
319
320 /*
321 * Every thread created for an input instance plugin, requires to have an
322 * unique Thread-ID. This function lookup the static table in the context
323 * and return the lowest available ID.
324 */
325 static FLB_INLINE
flb_input_coro_get_id(struct flb_config * config)326 int flb_input_coro_get_id(struct flb_config *config)
327 {
328 unsigned int i;
329
330 for (i = 0; i < (sizeof(config->in_table_id)/sizeof(uint16_t)); i++) {
331 if (config->in_table_id[i] == 0) {
332 config->in_table_id[i] = FLB_TRUE;
333 return i;
334 }
335 }
336
337 return -1;
338 }
339
340 /*
341 * When an input thread ends, it needs to release it ID. This function
342 * just mark the ID as unused.
343 */
344 static FLB_INLINE
flb_input_coro_del_id(int id,struct flb_config * config)345 void flb_input_coro_del_id(int id, struct flb_config *config)
346 {
347 config->in_table_id[id] = FLB_FALSE;
348 }
349
350 static FLB_INLINE
flb_input_coro_destroy_id(int id,struct flb_config * config)351 int flb_input_coro_destroy_id(int id, struct flb_config *config)
352 {
353 struct mk_list *tmp;
354 struct mk_list *head;
355 struct mk_list *head_th;
356 struct flb_input_coro *in_coro;
357 struct flb_input_instance *i_ins;
358
359 /* Iterate input-instances to find the thread */
360 mk_list_foreach(head, &config->inputs) {
361 i_ins = mk_list_entry(head, struct flb_input_instance, _head);
362 mk_list_foreach_safe(head_th, tmp, &i_ins->coros) {
363 in_coro = mk_list_entry(head_th, struct flb_input_coro, _head);
364 if (in_coro->id != id) {
365 continue;
366 }
367
368 mk_list_del(&in_coro->_head);
369 flb_input_coro_del_id(id, config);
370 flb_coro_destroy(in_coro->coro);
371 flb_debug("[input] destroy input_thread id=%i", id);
372 return 0;
373 }
374 }
375
376 return -1;
377 }
378
379 static FLB_INLINE
flb_input_coro_create(struct flb_input_instance * ins,struct flb_config * config)380 struct flb_coro *flb_input_coro_create(struct flb_input_instance *ins,
381 struct flb_config *config)
382 {
383 int id;
384 struct flb_coro *coro;
385 struct flb_input_coro *in_coro;
386
387 /* Try to obtain an id */
388 id = flb_input_coro_get_id(config);
389 if (id == -1) {
390 return NULL;
391 }
392
393 /* Setup thread specific data */
394 in_coro = (struct flb_input_coro *) flb_malloc(sizeof(struct flb_input_coro));
395 if (!in_coro) {
396 flb_errno();
397 return NULL;
398 }
399
400 coro = flb_coro_create(in_coro);
401 if (!coro) {
402 flb_free(in_coro);
403 return NULL;
404 }
405
406 in_coro->id = id;
407 in_coro->start_time = time(NULL);
408 in_coro->coro = coro;
409 in_coro->config = config;
410 mk_list_add(&in_coro->_head, &ins->coros);
411
412 return coro;
413 }
414
415 struct flb_libco_in_params {
416 struct flb_config *config;
417 struct flb_input_collector *coll;
418 struct flb_coro *coro;
419 };
420
421 extern struct flb_libco_in_params libco_in_param;
422
input_params_set(struct flb_coro * coro,struct flb_input_collector * coll,struct flb_config * config,void * context)423 static FLB_INLINE void input_params_set(struct flb_coro *coro,
424 struct flb_input_collector *coll,
425 struct flb_config *config,
426 void *context)
427 {
428 /* Set callback parameters */
429 libco_in_param.coll = coll;
430 libco_in_param.config = config;
431 libco_in_param.coro = coro;
432 co_switch(coro->callee);
433 }
434
input_pre_cb_collect(void)435 static FLB_INLINE void input_pre_cb_collect(void)
436 {
437 struct flb_input_collector *coll = libco_in_param.coll;
438 struct flb_config *config = libco_in_param.config;
439 struct flb_coro *coro = libco_in_param.coro;
440
441 co_switch(coro->caller);
442 coll->cb_collect(coll->instance, config, coll->instance->context);
443 }
444
445 static FLB_INLINE
flb_input_coro_collect(struct flb_input_collector * coll,struct flb_config * config)446 struct flb_coro *flb_input_coro_collect(struct flb_input_collector *coll,
447 struct flb_config *config)
448 {
449 size_t stack_size;
450 struct flb_coro *coro;
451
452 coro = flb_input_coro_create(coll->instance, config);
453 if (!coro) {
454 return NULL;
455 }
456
457 coro->caller = co_active();
458 coro->callee = co_create(config->coro_stack_size,
459 input_pre_cb_collect, &stack_size);
460
461 #ifdef FLB_HAVE_VALGRIND
462 coro->valgrind_stack_id = VALGRIND_STACK_REGISTER(coro->callee,
463 ((char *)coro->callee) + stack_size);
464 #endif
465
466 /* Set parameters */
467 input_params_set(coro, coll, config, coll->instance->context);
468 return coro;
469 }
470
471 /*
472 * This function is used by the output plugins to return. It's mandatory
473 * as it will take care to signal the event loop letting know the flush
474 * callback has done.
475 *
476 * The signal emmited indicate the 'Task' number that have finished plus
477 * a return value. The return value is either FLB_OK, FLB_RETRY or FLB_ERROR.
478 *
479 * If the caller have requested a FLB_RETRY, it will be issued depending of the
480 * number of retries, if it have exceed the 'retry_limit' option, a FLB_ERROR
481 * will be returned instead.
482 */
flb_input_return(struct flb_coro * coro)483 static inline void flb_input_return(struct flb_coro *coro) {
484 int n;
485 uint64_t val;
486 struct flb_input_coro *in_coro;
487
488 in_coro = (struct flb_input_coro *) FLB_CORO_DATA(coro);
489
490 /*
491 * To compose the signal event the relevant info is:
492 *
493 * - Unique Task events id: 2 in this case
494 * - Return value: FLB_OK (0) or FLB_ERROR (1)
495 * - Task ID
496 *
497 * We put together the return value with the task_id on the 32 bits at right
498 */
499 val = FLB_BITS_U64_SET(3 /* FLB_ENGINE_IN_COROREAD */, in_coro->id);
500 n = flb_pipe_w(in_coro->config->ch_manager[1], (void *) &val, sizeof(val));
501 if (n == -1) {
502 flb_errno();
503 }
504 }
505
flb_input_buf_paused(struct flb_input_instance * i)506 static inline int flb_input_buf_paused(struct flb_input_instance *i)
507 {
508 if (i->mem_buf_status == FLB_INPUT_PAUSED) {
509 return FLB_TRUE;
510 }
511 if (i->storage_buf_status == FLB_INPUT_PAUSED) {
512 return FLB_TRUE;
513 }
514
515 return FLB_FALSE;
516 }
517
flb_input_config_map_set(struct flb_input_instance * ins,void * context)518 static inline int flb_input_config_map_set(struct flb_input_instance *ins,
519 void *context)
520 {
521 return flb_config_map_set(&ins->properties, ins->config_map, context);
522 }
523
524 int flb_input_register_all(struct flb_config *config);
525 struct flb_input_instance *flb_input_new(struct flb_config *config,
526 const char *input, void *data,
527 int public_only);
528 int flb_input_set_property(struct flb_input_instance *ins,
529 const char *k, const char *v);
530 const char *flb_input_get_property(const char *key,
531 struct flb_input_instance *ins);
532
533 int flb_input_check(struct flb_config *config);
534 void flb_input_set_context(struct flb_input_instance *ins, void *context);
535 int flb_input_channel_init(struct flb_input_instance *ins);
536
537 int flb_input_collector_start(int coll_id, struct flb_input_instance *ins);
538 int flb_input_collectors_start(struct flb_config *config);
539 int flb_input_collector_pause(int coll_id, struct flb_input_instance *ins);
540 int flb_input_collector_resume(int coll_id, struct flb_input_instance *ins);
541 int flb_input_collector_delete(int coll_id, struct flb_input_instance *ins);
542 int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config);
543 int flb_input_set_collector_time(struct flb_input_instance *ins,
544 int (*cb_collect) (struct flb_input_instance *,
545 struct flb_config *, void *),
546 time_t seconds,
547 long nanoseconds,
548 struct flb_config *config);
549 int flb_input_set_collector_event(struct flb_input_instance *ins,
550 int (*cb_collect) (struct flb_input_instance *,
551 struct flb_config *, void *),
552 flb_pipefd_t fd,
553 struct flb_config *config);
554 int flb_input_set_collector_socket(struct flb_input_instance *ins,
555 int (*cb_new_connection) (struct flb_input_instance *,
556 struct flb_config *,
557 void*),
558 flb_pipefd_t fd,
559 struct flb_config *config);
560 int flb_input_collector_running(int coll_id, struct flb_input_instance *ins);
561 int flb_input_instance_init(struct flb_input_instance *ins,
562 struct flb_config *config);
563 void flb_input_instance_exit(struct flb_input_instance *ins,
564 struct flb_config *config);
565 void flb_input_instance_destroy(struct flb_input_instance *ins);
566
567 int flb_input_init_all(struct flb_config *config);
568 void flb_input_pre_run_all(struct flb_config *config);
569 void flb_input_exit_all(struct flb_config *config);
570
571 void *flb_input_flush(struct flb_input_instance *ins, size_t *size);
572 int flb_input_pause_all(struct flb_config *config);
573 const char *flb_input_name(struct flb_input_instance *ins);
574 int flb_input_name_exists(const char *name, struct flb_config *config);
575
576 void flb_input_net_default_listener(const char *listen, int port,
577 struct flb_input_instance *ins);
578
579 int flb_input_event_type_is_metric(struct flb_input_instance *ins);
580 int flb_input_event_type_is_log(struct flb_input_instance *ins);
581
582 #endif
583