1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #ifndef FLB_INPUT_H
22 #define FLB_INPUT_H
23 
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_input_chunk.h>
26 #include <fluent-bit/flb_coro.h>
27 #include <fluent-bit/flb_config.h>
28 #include <fluent-bit/flb_network.h>
29 #include <fluent-bit/flb_mem.h>
30 #include <fluent-bit/flb_str.h>
31 #include <fluent-bit/flb_bits.h>
32 #include <fluent-bit/flb_pipe.h>
33 #include <fluent-bit/flb_filter.h>
34 #include <fluent-bit/flb_coro.h>
35 #include <fluent-bit/flb_mp.h>
36 #include <fluent-bit/flb_hash.h>
37 
38 #ifdef FLB_HAVE_METRICS
39 #include <fluent-bit/flb_metrics.h>
40 #endif
41 
42 #include <cmetrics/cmetrics.h>
43 #include <monkey/mk_core.h>
44 #include <msgpack.h>
45 
46 #include <inttypes.h>
47 
/* Collector types (see the 'type' field of struct flb_input_collector) */
#define FLB_COLLECT_TIME        1
#define FLB_COLLECT_FD_EVENT    2
#define FLB_COLLECT_FD_SERVER   4

/* Input plugin flag masks (struct flb_input_plugin.flags) */
#define FLB_INPUT_NET          4   /* input address may set host and port   */
#define FLB_INPUT_CORO       128   /* plugin requires a coroutine on callbacks */
#define FLB_INPUT_PRIVATE    256   /* plugin is not published/exposed       */
#define FLB_INPUT_NOTAG      512   /* plugin might not have tags            */

/* Input status (values of mem_buf_status / storage_buf_status) */
#define FLB_INPUT_RUNNING     1
#define FLB_INPUT_PAUSED      0

/* Input plugin event type (values of the 'event_type' fields) */
#define FLB_INPUT_LOGS        0
#define FLB_INPUT_METRICS     1
65 
struct flb_input_instance;

/*
 * Input plugin definition: the fixed data (name, flags, config map and
 * callbacks) of a plugin. Instances created from it keep a reference to it
 * in struct flb_input_instance.p.
 */
struct flb_input_plugin {
    int flags;                /* plugin flags (FLB_INPUT_* masks) */
    int event_type;           /* event type to be generated: FLB_INPUT_LOGS or FLB_INPUT_METRICS */

    /* The Input name */
    char *name;

    /* Plugin Description */
    char *description;

    struct flb_config_map *config_map;

    /* Initialization */
    int (*cb_init)    (struct flb_input_instance *, struct flb_config *, void *);

    /* Pre run */
    int (*cb_pre_run) (struct flb_input_instance *, struct flb_config *, void *);

    /* Collect: Fluent Bit triggers this callback periodically */
    int (*cb_collect) (struct flb_input_instance *, struct flb_config *, void *);

    /*
     * Flush: during a collection each plugin does some buffering. When the
     * Flush timer fires on the Engine, it triggers cb_flush(...) to obtain
     * the plugin buffer data. This data is a MsgPack buffer which will be
     * processed by the Engine and delivered to the target output.
     */

    /* Flush a buffer type (raw data) */
    void *(*cb_flush_buf) (void *, size_t *);

    /* Notify that a flush has completed on the collector (buf + iov) */
    void (*cb_flush_end) (void *);

    /*
     * Callbacks to notify the plugin when it becomes paused (can no longer
     * append data) and when it can resume operations.
     */
    void (*cb_pause) (void *, struct flb_config *);
    void (*cb_resume) (void *, struct flb_config *);

    /*
     * Optional callback that can be used from a parent caller to ingest
     * data into the engine.
     */
    int (*cb_ingest) (void *in_context, void *, size_t);

    /* Exit */
    int (*cb_exit) (void *, struct flb_config *);

    void *instance;

    struct mk_list _head;
};
123 
/*
 * Each initialized plugin must have an instance; the same plugin may be
 * loaded more than once.
 *
 * An instance separates the fixed plugin data (reachable through 'p') from
 * the variable data that is generated when the plugin is invoked.
 */
struct flb_input_instance {
    int event_type;                  /* FLB_INPUT_LOGS, FLB_INPUT_METRICS */

    /*
     * The instance flags are derived from the fixed plugin flags. This
     * offers some flexibility where a plugin instance, per configuration,
     * would like to change some specific behavior.
     *
     * e.g. by default the in_tail plugin supports a fixed tag, but if a
     * wildcard is added to the 'tag', it will instruct it to perform dyntag
     * operations, as the tags will be composed using the name of the file
     * being watched.
     */
    int flags;

    int id;                              /* instance id                  */
    int log_level;                       /* log level for this plugin    */
    flb_pipefd_t channel[2];             /* pipe(2) channel              */
    int threaded;                        /* bool / Threaded instance ?   */
    char name[32];                       /* numbered name (cpu -> cpu.0) */
    char *alias;                         /* alias name for the instance  */
    void *context;                       /* plugin configuration context */
    struct flb_input_plugin *p;          /* original plugin              */

    /* Plugin properties */
    char *tag;                           /* Input tag for routing        */
    int tag_len;

    /* By default all input instances are 'routable' */
    int routable;

    /* flag to pause input when storage is full */
    int storage_pause_on_chunks_overlimit;

    /*
     * Input network info:
     *
     * An input plugin can be specified using just its short name or using
     * the complete network address format, e.g:
     *
     *  $ fluent-bit -i plugin://hostname:port/uri -o ...
     *
     * where:
     *
     *   plugin   = the input plugin short name
     *   hostname = IP address or hostname
     *   port     = TCP port
     *   uri      = extra information that may be used by the plugin
     *
     * NOTE(review): this only applies to plugins flagged FLB_INPUT_NET
     * ("input address may set host and port") — confirm.
     */
    struct flb_net_host host;

    /* Reference to struct flb_storage_input context */
    void *storage;

    /* Type of storage: CIO_STORE_FS (filesystem) or CIO_STORE_MEM (memory) */
    int storage_type;

    /*
     * Buffer counters: total memory used by the fixed and dynamic
     * MessagePack buffers of this input plugin instance.
     */
    size_t mem_chunks_size;
    size_t mp_total_buf_size; /* FIXME: to be deprecated */

    /*
     * Buffer limit: optional limit set by configuration so this input
     * instance cannot use more than mem_buf_limit (bytes unit).
     *
     * As a reference, if an input plugin exceeds the limit, the pause()
     * callback will be triggered to notify the input instance it can no
     * longer append data; from that moment Fluent Bit will avoid adding more
     * records.
     *
     * When the buffer size goes down (because data was flushed), a resume()
     * callback will be triggered; from that moment the plugin can append
     * more data.
     */
    size_t mem_buf_limit;

    /*
     * Memory buffer status:
     *
     * - FLB_INPUT_RUNNING -> can append more data
     * - FLB_INPUT_PAUSED  -> cannot append data
     */
    int mem_buf_status;

    /*
     * Storage buffer status:
     *
     * - FLB_INPUT_RUNNING -> can append more data
     * - FLB_INPUT_PAUSED  -> cannot append data
     */
    int storage_buf_status;

    /*
     * Optional data passed to the plugin. This is useful when running
     * Fluent Bit in library mode and the target plugin needs some specific
     * data from its caller.
     */
    void *data;

    struct mk_list *config_map;          /* configuration map        */

    struct mk_list _head;                /* link to config->inputs     */

    struct mk_list routes_direct;        /* direct routes set by API   */
    struct mk_list routes;               /* flb_router_path's list     */
    struct mk_list properties;           /* properties / configuration */
    struct mk_list collectors;           /* collectors                 */

    /* Storage Chunks */
    struct mk_list chunks;               /* linked list of all chunks  */

    /*
     * The following lists separate the chunks by their status, which can
     * be 'up' or 'down'.
     */
    struct mk_list chunks_up;            /* linked list of all chunks up */
    struct mk_list chunks_down;          /* linked list of all chunks down */

    /*
     * Every co-routine/task created by the engine when flushing data is
     * linked into this list head.
     */
    struct mk_list tasks;

    struct mk_list coros;                /* list of input coros         */

#ifdef FLB_HAVE_METRICS

    /* old metrics API */
    struct flb_metrics *metrics;         /* metrics                    */
#endif

    /*
     * CMetrics
     * --------
     */
    struct cmt *cmt;                     /* parent context              */
    struct cmt_counter *cmt_bytes;       /* metric: input_bytes_total   */
    struct cmt_counter *cmt_records;     /* metric: input_records_total */

    /*
     * Indexes for generated chunks: simple hash tables that keep the latest
     * available chunks for write operations. This optimizes the lookup of
     * candidate chunks to write data to.
     *
     * Starting from v1.8 we have separate hash tables for logs and metrics.
     */
    struct flb_hash *ht_log_chunks;
    struct flb_hash *ht_metric_chunks;

    /* Keep a reference to the original context this instance belongs to */
    struct flb_config *config;
};
285 
/*
 * A collector attached to an input instance: it is either timer based
 * (FLB_COLLECT_TIME) or driven by a file descriptor (FLB_COLLECT_FD_*), and
 * invokes cb_collect when it fires.
 */
struct flb_input_collector {
    int id;                              /* collector id               */
    int type;                            /* collector type (FLB_COLLECT_*) */
    int running;                         /* is running ? (True/False)  */

    /* FLB_COLLECT_FD_EVENT */
    flb_pipefd_t fd_event;               /* fd being watched           */

    /* FLB_COLLECT_TIME */
    flb_pipefd_t fd_timer;               /* timer fd                   */
    time_t seconds;                      /* expire time in seconds     */
    long nanoseconds;                    /* expire nanoseconds         */

    /* Callback */
    int (*cb_collect) (struct flb_input_instance *,
                       struct flb_config *, void *);

    struct mk_event event;

    /* General references */
    struct flb_input_instance *instance; /* plugin instance             */
    struct mk_list _head;                /* link to global collectors   */
    struct mk_list _head_ins;            /* link to instance collectors */
};
310 
/*
 * Per-coroutine context for input collectors. It is allocated by
 * flb_input_coro_create(), stored as the coroutine data and linked into
 * the owning instance's 'coros' list.
 */
struct flb_input_coro {
    int id;                      /* ID obtained from config->in_table_id */
    time_t start_time;           /* start time  */
    time_t end_time;             /* end time    */
    struct flb_config *config;   /* FLB context */
    struct flb_coro *coro;       /* Back reference to the coroutine owning this context */
    struct mk_list _head;        /* link to list on input_instance->coros */
};
319 
320 /*
321  * Every thread created for an input instance plugin, requires to have an
322  * unique Thread-ID. This function lookup the static table in the context
323  * and return the lowest available ID.
324  */
325 static FLB_INLINE
flb_input_coro_get_id(struct flb_config * config)326 int flb_input_coro_get_id(struct flb_config *config)
327 {
328     unsigned int i;
329 
330     for (i = 0; i < (sizeof(config->in_table_id)/sizeof(uint16_t)); i++) {
331         if (config->in_table_id[i] == 0) {
332             config->in_table_id[i] = FLB_TRUE;
333             return i;
334         }
335     }
336 
337     return -1;
338 }
339 
340 /*
341  * When an input thread ends, it needs to release it ID. This function
342  * just mark the ID as unused.
343  */
344 static FLB_INLINE
flb_input_coro_del_id(int id,struct flb_config * config)345 void flb_input_coro_del_id(int id, struct flb_config *config)
346 {
347     config->in_table_id[id] = FLB_FALSE;
348 }
349 
350 static FLB_INLINE
flb_input_coro_destroy_id(int id,struct flb_config * config)351 int flb_input_coro_destroy_id(int id, struct flb_config *config)
352 {
353     struct mk_list *tmp;
354     struct mk_list *head;
355     struct mk_list *head_th;
356     struct flb_input_coro *in_coro;
357     struct flb_input_instance *i_ins;
358 
359     /* Iterate input-instances to find the thread */
360     mk_list_foreach(head, &config->inputs) {
361         i_ins = mk_list_entry(head, struct flb_input_instance, _head);
362         mk_list_foreach_safe(head_th, tmp, &i_ins->coros) {
363             in_coro = mk_list_entry(head_th, struct flb_input_coro, _head);
364             if (in_coro->id != id) {
365                 continue;
366             }
367 
368             mk_list_del(&in_coro->_head);
369             flb_input_coro_del_id(id, config);
370             flb_coro_destroy(in_coro->coro);
371             flb_debug("[input] destroy input_thread id=%i", id);
372             return 0;
373         }
374     }
375 
376     return -1;
377 }
378 
379 static FLB_INLINE
flb_input_coro_create(struct flb_input_instance * ins,struct flb_config * config)380 struct flb_coro *flb_input_coro_create(struct flb_input_instance *ins,
381                                        struct flb_config *config)
382 {
383     int id;
384     struct flb_coro *coro;
385     struct flb_input_coro *in_coro;
386 
387     /* Try to obtain an id */
388     id = flb_input_coro_get_id(config);
389     if (id == -1) {
390         return NULL;
391     }
392 
393     /* Setup thread specific data */
394     in_coro = (struct flb_input_coro *) flb_malloc(sizeof(struct flb_input_coro));
395     if (!in_coro) {
396         flb_errno();
397         return NULL;
398     }
399 
400     coro = flb_coro_create(in_coro);
401     if (!coro) {
402         flb_free(in_coro);
403         return NULL;
404     }
405 
406     in_coro->id         = id;
407     in_coro->start_time = time(NULL);
408     in_coro->coro       = coro;
409     in_coro->config     = config;
410     mk_list_add(&in_coro->_head, &ins->coros);
411 
412     return coro;
413 }
414 
/*
 * Parameter block used to hand collector arguments to a freshly created
 * coroutine: input_params_set() fills it right before switching into the
 * coroutine, and input_pre_cb_collect() copies the fields into locals.
 */
struct flb_libco_in_params {
    struct flb_config *config;
    struct flb_input_collector *coll;
    struct flb_coro *coro;
};

/* Single shared instance; defined outside this header. */
extern struct flb_libco_in_params libco_in_param;
422 
input_params_set(struct flb_coro * coro,struct flb_input_collector * coll,struct flb_config * config,void * context)423 static FLB_INLINE void input_params_set(struct flb_coro *coro,
424                              struct flb_input_collector *coll,
425                              struct flb_config *config,
426                              void *context)
427 {
428     /* Set callback parameters */
429     libco_in_param.coll    = coll;
430     libco_in_param.config  = config;
431     libco_in_param.coro    = coro;
432     co_switch(coro->callee);
433 }
434 
/*
 * Coroutine entry point for input collectors (passed to co_create() by
 * flb_input_coro_collect()).
 *
 * The arguments arrive through the shared libco_in_param block, so they are
 * copied into locals *before* switching back to the caller: once the caller
 * runs again, libco_in_param may be overwritten by a later
 * input_params_set() call. When this coroutine is next resumed, the
 * collector callback runs with the saved values.
 *
 * NOTE(review): nothing after cb_collect() switches back to the caller, so
 * control would fall off the end of this entry point; libco entry points
 * must not return — confirm the callback (or code it calls) never lets this
 * happen.
 */
static FLB_INLINE void input_pre_cb_collect(void)
{
    /* Snapshot the shared parameters while they are still valid */
    struct flb_input_collector *coll = libco_in_param.coll;
    struct flb_config *config = libco_in_param.config;
    struct flb_coro *coro     = libco_in_param.coro;

    /* Return control to input_params_set(); resume here later */
    co_switch(coro->caller);
    coll->cb_collect(coll->instance, config, coll->instance->context);
}
444 
445 static FLB_INLINE
flb_input_coro_collect(struct flb_input_collector * coll,struct flb_config * config)446 struct flb_coro *flb_input_coro_collect(struct flb_input_collector *coll,
447                                           struct flb_config *config)
448 {
449     size_t stack_size;
450     struct flb_coro *coro;
451 
452     coro = flb_input_coro_create(coll->instance, config);
453     if (!coro) {
454         return NULL;
455     }
456 
457     coro->caller = co_active();
458     coro->callee = co_create(config->coro_stack_size,
459                              input_pre_cb_collect, &stack_size);
460 
461 #ifdef FLB_HAVE_VALGRIND
462     coro->valgrind_stack_id = VALGRIND_STACK_REGISTER(coro->callee,
463                                                       ((char *)coro->callee) + stack_size);
464 #endif
465 
466     /* Set parameters */
467     input_params_set(coro, coll, config, coll->instance->context);
468     return coro;
469 }
470 
471 /*
472  * This function is used by the output plugins to return. It's mandatory
473  * as it will take care to signal the event loop letting know the flush
474  * callback has done.
475  *
476  * The signal emmited indicate the 'Task' number that have finished plus
477  * a return value. The return value is either FLB_OK, FLB_RETRY or FLB_ERROR.
478  *
479  * If the caller have requested a FLB_RETRY, it will be issued depending of the
480  * number of retries, if it have exceed the 'retry_limit' option, a FLB_ERROR
481  * will be returned instead.
482  */
flb_input_return(struct flb_coro * coro)483 static inline void flb_input_return(struct flb_coro *coro) {
484     int n;
485     uint64_t val;
486     struct flb_input_coro *in_coro;
487 
488     in_coro = (struct flb_input_coro *) FLB_CORO_DATA(coro);
489 
490     /*
491      * To compose the signal event the relevant info is:
492      *
493      * - Unique Task events id: 2 in this case
494      * - Return value: FLB_OK (0) or FLB_ERROR (1)
495      * - Task ID
496      *
497      * We put together the return value with the task_id on the 32 bits at right
498      */
499     val = FLB_BITS_U64_SET(3 /* FLB_ENGINE_IN_COROREAD */, in_coro->id);
500     n = flb_pipe_w(in_coro->config->ch_manager[1], (void *) &val, sizeof(val));
501     if (n == -1) {
502         flb_errno();
503     }
504 }
505 
flb_input_buf_paused(struct flb_input_instance * i)506 static inline int flb_input_buf_paused(struct flb_input_instance *i)
507 {
508     if (i->mem_buf_status == FLB_INPUT_PAUSED) {
509         return FLB_TRUE;
510     }
511     if (i->storage_buf_status == FLB_INPUT_PAUSED) {
512         return FLB_TRUE;
513     }
514 
515     return FLB_FALSE;
516 }
517 
flb_input_config_map_set(struct flb_input_instance * ins,void * context)518 static inline int flb_input_config_map_set(struct flb_input_instance *ins,
519                                            void *context)
520 {
521     return flb_config_map_set(&ins->properties, ins->config_map, context);
522 }
523 
/* Registration, instance creation and properties */
int flb_input_register_all(struct flb_config *config);
struct flb_input_instance *flb_input_new(struct flb_config *config,
                                         const char *input, void *data,
                                         int public_only);
int flb_input_set_property(struct flb_input_instance *ins,
                           const char *k, const char *v);
const char *flb_input_get_property(const char *key,
                                   struct flb_input_instance *ins);

/* Checks, context and channel setup */
int flb_input_check(struct flb_config *config);
void flb_input_set_context(struct flb_input_instance *ins, void *context);
int flb_input_channel_init(struct flb_input_instance *ins);

/* Collectors */
int flb_input_collector_start(int coll_id, struct flb_input_instance *ins);
int flb_input_collectors_start(struct flb_config *config);
int flb_input_collector_pause(int coll_id, struct flb_input_instance *ins);
int flb_input_collector_resume(int coll_id, struct flb_input_instance *ins);
int flb_input_collector_delete(int coll_id, struct flb_input_instance *ins);
int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config);
int flb_input_set_collector_time(struct flb_input_instance *ins,
                                 int (*cb_collect) (struct flb_input_instance *,
                                                    struct flb_config *, void *),
                                 time_t seconds,
                                 long   nanoseconds,
                                 struct flb_config *config);
int flb_input_set_collector_event(struct flb_input_instance *ins,
                                  int (*cb_collect) (struct flb_input_instance *,
                                                     struct flb_config *, void *),
                                  flb_pipefd_t fd,
                                  struct flb_config *config);
int flb_input_set_collector_socket(struct flb_input_instance *ins,
                                   int (*cb_new_connection) (struct flb_input_instance *,
                                                             struct flb_config *,
                                                             void*),
                                   flb_pipefd_t fd,
                                   struct flb_config *config);
int flb_input_collector_running(int coll_id, struct flb_input_instance *ins);

/* Instance lifecycle */
int flb_input_instance_init(struct flb_input_instance *ins,
                            struct flb_config *config);
void flb_input_instance_exit(struct flb_input_instance *ins,
                             struct flb_config *config);
void flb_input_instance_destroy(struct flb_input_instance *ins);

/* Init / pre-run / exit over the configuration */
int flb_input_init_all(struct flb_config *config);
void flb_input_pre_run_all(struct flb_config *config);
void flb_input_exit_all(struct flb_config *config);

/* Miscellaneous helpers */
void *flb_input_flush(struct flb_input_instance *ins, size_t *size);
int flb_input_pause_all(struct flb_config *config);
const char *flb_input_name(struct flb_input_instance *ins);
int flb_input_name_exists(const char *name, struct flb_config *config);

/* Network listener defaults */
void flb_input_net_default_listener(const char *listen, int port,
                                    struct flb_input_instance *ins);

/* Event type queries */
int flb_input_event_type_is_metric(struct flb_input_instance *ins);
int flb_input_event_type_is_log(struct flb_input_instance *ins);
581 
582 #endif
583