1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <stdio.h>
22 #include <stdlib.h>
23 
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_config.h>
26 #include <fluent-bit/flb_input.h>
27 #include <fluent-bit/flb_input_chunk.h>
28 #include <fluent-bit/flb_output.h>
29 #include <fluent-bit/flb_router.h>
30 #include <fluent-bit/flb_task.h>
31 #include <fluent-bit/flb_mem.h>
32 #include <fluent-bit/flb_str.h>
33 #include <fluent-bit/flb_scheduler.h>
34 
35 /*
36  * Every task created must have an unique ID, this function lookup the
37  * lowest number available in the tasks_map.
38  *
39  * This 'id' is used by the task interface to communicate with the engine event
40  * loop about some action.
41  */
42 
map_get_task_id(struct flb_config * config)43 static inline int map_get_task_id(struct flb_config *config)
44 {
45     int i;
46     int map_size = (sizeof(config->tasks_map) / sizeof(struct flb_task_map));
47 
48     for (i = 0; i < map_size; i++) {
49         if (config->tasks_map[i].task == NULL) {
50             return i;
51         }
52     }
53 
54     return -1;
55 }
56 
map_set_task_id(int id,struct flb_task * task,struct flb_config * config)57 static inline void map_set_task_id(int id, struct flb_task *task,
58                                    struct flb_config *config)
59 {
60     config->tasks_map[id].task = task;
61 
62 }
63 
map_free_task_id(int id,struct flb_config * config)64 static inline void map_free_task_id(int id, struct flb_config *config)
65 {
66     config->tasks_map[id].task = NULL;
67 }
68 
flb_task_retry_destroy(struct flb_task_retry * retry)69 void flb_task_retry_destroy(struct flb_task_retry *retry)
70 {
71     int ret;
72 
73     /* Make sure to invalidate any request from the scheduler */
74     ret = flb_sched_request_invalidate(retry->parent->config, retry);
75     if (ret == 0) {
76         flb_debug("[retry] task retry=%p, invalidated from the scheduler",
77                   retry);
78     }
79 
80     mk_list_del(&retry->_head);
81     flb_free(retry);
82 }
83 
84 /*
85  * For an existing task 'retry', re-schedule it. One of the use case of this function
86  * is when the engine dispatcher fails to bring the chunk up due to Chunk I/O
87  * configuration restrictions, the task needs to be re-scheduled.
88  */
flb_task_retry_reschedule(struct flb_task_retry * retry,struct flb_config * config)89 int flb_task_retry_reschedule(struct flb_task_retry *retry, struct flb_config *config)
90 {
91     int seconds;
92     struct flb_task *task;
93 
94     task = retry->parent;
95     seconds = flb_sched_request_create(config, retry, retry->attempts);
96     if (seconds == -1) {
97         /*
98          * This is the worse case scenario: 'cannot re-schedule a retry'. If the Chunk
99          * resides only in memory, it will be lost.  */
100         flb_warn("[task] retry for task %i could not be re-scheduled", task->id);
101         flb_task_retry_destroy(retry);
102         if (task->users == 0 && mk_list_size(&task->retries) == 0) {
103             flb_task_destroy(task, FLB_TRUE);
104         }
105         return -1;
106     }
107     else {
108         flb_info("[task] re-schedule retry=%p %i in the next %i seconds",
109                   retry, task->id, seconds);
110     }
111 
112     return 0;
113 }
114 
flb_task_retry_create(struct flb_task * task,struct flb_output_instance * ins)115 struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
116                                              struct flb_output_instance *ins)
117 {
118     struct mk_list *head;
119     struct mk_list *tmp;
120     struct flb_task_retry *retry = NULL;
121 
122     /* First discover if is there any previous retry context in the task */
123     mk_list_foreach_safe(head, tmp, &task->retries) {
124         retry = mk_list_entry(head, struct flb_task_retry, _head);
125         if (retry->o_ins == ins) {
126             if (retry->attempts >= ins->retry_limit && ins->retry_limit >= 0) {
127                 flb_debug("[task] task_id=%i reached retry-attempts limit %i/%i",
128                           task->id, retry->attempts, ins->retry_limit);
129                 flb_task_retry_destroy(retry);
130                 return NULL;
131             }
132             break;
133         }
134         retry = NULL;
135     }
136 
137     if (!retry) {
138         /* Create a new re-try instance */
139         retry = flb_malloc(sizeof(struct flb_task_retry));
140         if (!retry) {
141             flb_errno();
142             return NULL;
143         }
144 
145         retry->attempts = 1;
146         retry->o_ins   = ins;
147         retry->parent  = task;
148         mk_list_add(&retry->_head, &task->retries);
149 
150         flb_debug("[retry] new retry created for task_id=%i attempts=%i",
151                   task->id, retry->attempts);
152     }
153     else {
154         retry->attempts++;
155         flb_debug("[retry] re-using retry for task_id=%i attempts=%i",
156                   task->id, retry->attempts);
157     }
158 
159     /*
160      * This 'retry' was issued by an output plugin, from an Engine perspective
161      * we need to determinate if the source input plugin have some memory
162      * restrictions and if the Storage type is 'filesystem' we need to put
163      * the file content down.
164      */
165     flb_input_chunk_set_up_down(task->ic);
166 
167     /*
168      * Besides limits adjusted above, a retry that's going to only one place
169      * must be down.
170      */
171     if (mk_list_size(&task->routes) == 1) {
172         flb_input_chunk_down(task->ic);
173     }
174 
175     return retry;
176 }
177 
178 /*
179  * Return FLB_TRUE or FLB_FALSE if the chunk pointed by the task was
180  * created on this running instance or it comes from a chunk in the
181  * filesystem from a previous run.
182  */
flb_task_from_fs_storage(struct flb_task * task)183 int flb_task_from_fs_storage(struct flb_task *task)
184 {
185     struct flb_input_chunk *ic;
186 
187     ic = (struct flb_input_chunk *) task->ic;
188     return ic->fs_backlog;
189 }
190 
flb_task_retry_count(struct flb_task * task,void * data)191 int flb_task_retry_count(struct flb_task *task, void *data)
192 {
193     struct mk_list *head;
194     struct flb_task_retry *retry;
195     struct flb_output_instance *o_ins;
196     struct flb_output_coro *out_coro;
197 
198     out_coro = (struct flb_output_coro *) FLB_CORO_DATA(data);
199     o_ins = out_coro->o_ins;
200 
201     /* Delete 'retries' only associated with the output instance */
202     mk_list_foreach(head, &task->retries) {
203         retry = mk_list_entry(head, struct flb_task_retry, _head);
204         if (retry->o_ins == o_ins) {
205             return retry->attempts;
206         }
207     }
208 
209     return -1;
210 }
211 
212 /* Check if a 'retry' context exists for a specific task, if so, cleanup */
flb_task_retry_clean(struct flb_task * task,struct flb_output_instance * ins)213 int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins)
214 {
215     struct mk_list *tmp;
216     struct mk_list *head;
217     struct flb_task_retry *retry;
218 
219     /* Delete 'retries' only associated with the output instance */
220     mk_list_foreach_safe(head, tmp, &task->retries) {
221         retry = mk_list_entry(head, struct flb_task_retry, _head);
222         if (retry->o_ins == ins) {
223             flb_task_retry_destroy(retry);
224             return 0;
225         }
226     }
227 
228     return -1;
229 }
230 
231 /* Allocate an initialize a basic Task structure */
task_alloc(struct flb_config * config)232 static struct flb_task *task_alloc(struct flb_config *config)
233 {
234     int task_id;
235     struct flb_task *task;
236 
237     /* Allocate the new task */
238     task = (struct flb_task *) flb_calloc(1, sizeof(struct flb_task));
239     if (!task) {
240         flb_errno();
241         return NULL;
242     }
243 
244     /* Get ID and set back 'task' reference */
245     task_id = map_get_task_id(config);
246     if (task_id == -1) {
247         flb_free(task);
248         return NULL;
249     }
250     map_set_task_id(task_id, task, config);
251 
252     flb_trace("[task %p] created (id=%i)", task, task_id);
253 
254     /* Initialize minimum variables */
255     task->id        = task_id;
256     task->config    = config;
257     task->status    = FLB_TASK_NEW;
258     task->users     = 0;
259     mk_list_init(&task->routes);
260     mk_list_init(&task->retries);
261 
262     return task;
263 }
264 
265 /* Return the number of tasks with 'running status' */
flb_task_running_count(struct flb_config * config)266 int flb_task_running_count(struct flb_config *config)
267 {
268     int count = 0;
269     struct mk_list *head;
270     struct mk_list *t_head;
271     struct flb_task *task;
272     struct flb_input_instance *ins;
273 
274     mk_list_foreach(head, &config->inputs) {
275         ins = mk_list_entry(head, struct flb_input_instance, _head);
276         mk_list_foreach(t_head, &ins->tasks) {
277             task = mk_list_entry(t_head, struct flb_task, _head);
278             if (task->users > 0) {
279                 count++;
280             }
281         }
282     }
283 
284     return count;
285 }
286 
flb_task_running_print(struct flb_config * config)287 int flb_task_running_print(struct flb_config *config)
288 {
289     int count = 0;
290     flb_sds_t tmp;
291     flb_sds_t routes;
292     struct mk_list *head;
293     struct mk_list *t_head;
294     struct mk_list *r_head;
295     struct flb_task *task;
296     struct flb_task_route *route;
297     struct flb_input_instance *ins;
298 
299     routes = flb_sds_create_size(256);
300     if (!routes) {
301         flb_error("[task] cannot allocate space to report pending tasks");
302         return -1;
303     }
304 
305     mk_list_foreach(head, &config->inputs) {
306         ins = mk_list_entry(head, struct flb_input_instance, _head);
307         count = mk_list_size(&ins->tasks);
308         flb_info("[task] %s/%s has %i pending task(s):",
309                  ins->p->name, flb_input_name(ins), count);
310         mk_list_foreach(t_head, &ins->tasks) {
311             task = mk_list_entry(t_head, struct flb_task, _head);
312 
313             mk_list_foreach(r_head, &task->routes) {
314                 route = mk_list_entry(r_head, struct flb_task_route, _head);
315                 tmp = flb_sds_printf(&routes, "%s/%s ",
316                                      route->out->p->name,
317                                      flb_output_name(route->out));
318                 if (!tmp) {
319                     flb_sds_destroy(routes);
320                     flb_error("[task] cannot print report for pending tasks");
321                     return -1;
322                 }
323                 routes = tmp;
324             }
325 
326             flb_info("[task]   task_id=%i still running on route(s): %s",
327                      task->id, routes);
328             flb_sds_len_set(routes, 0);
329         }
330     }
331     flb_sds_destroy(routes);
332     return 0;
333 }
334 
335 /* Create an engine task to handle the output plugin flushing work */
flb_task_create(uint64_t ref_id,const char * buf,size_t size,struct flb_input_instance * i_ins,struct flb_input_chunk * ic,const char * tag_buf,int tag_len,struct flb_config * config,int * err)336 struct flb_task *flb_task_create(uint64_t ref_id,
337                                  const char *buf,
338                                  size_t size,
339                                  struct flb_input_instance *i_ins,
340                                  struct flb_input_chunk *ic,
341                                  const char *tag_buf, int tag_len,
342                                  struct flb_config *config,
343                                  int *err)
344 {
345     int count = 0;
346     struct flb_task *task;
347     struct flb_task_route *route;
348     struct flb_router_path *route_path;
349     struct flb_output_instance *o_ins;
350     struct flb_input_chunk *task_ic;
351     struct mk_list *i_head;
352     struct mk_list *o_head;
353 
354     /* No error status */
355     *err = FLB_FALSE;
356 
357     /* allocate task */
358     task = task_alloc(config);
359     if (!task) {
360         *err = FLB_TRUE;
361         return NULL;
362     }
363 
364     /* create a copy of the tag */
365     task->tag = flb_malloc(tag_len + 1);
366     if (!task->tag) {
367         flb_errno();
368         flb_free(task);
369         *err = FLB_TRUE;
370         return NULL;
371     }
372     memcpy(task->tag, tag_buf, tag_len);
373     task->tag[tag_len] = '\0';
374     task->tag_len = tag_len;
375 
376     task_ic = (struct flb_input_chunk *) ic;
377     task_ic->task = task;
378 
379     /* Keep track of origins */
380     task->ref_id = ref_id;
381     task->buf    = buf;
382     task->size   = size;
383     task->i_ins  = i_ins;
384     task->ic     = ic;
385     mk_list_add(&task->_head, &i_ins->tasks);
386 
387 #ifdef FLB_HAVE_METRICS
388     task->records = ((struct flb_input_chunk *) ic)->total_records;
389 #endif
390 
391     /* Direct connects betweek input <> outputs (API based) */
392     if (mk_list_size(&i_ins->routes_direct) > 0) {
393         mk_list_foreach(i_head, &i_ins->routes_direct) {
394             route_path = mk_list_entry(i_head, struct flb_router_path, _head);
395             o_ins = route_path->ins;
396 
397             route = flb_malloc(sizeof(struct flb_task_route));
398             if (!route) {
399                 flb_errno();
400                 task->buf = NULL;
401                 flb_task_destroy(task, FLB_TRUE);
402                 return NULL;
403             }
404 
405             route->out = o_ins;
406             mk_list_add(&route->_head, &task->routes);
407         }
408         flb_debug("[task] created direct task=%p id=%i OK", task, task->id);
409         return task;
410     }
411 
412     /* Find matching routes for the incoming task */
413     mk_list_foreach(o_head, &config->outputs) {
414         o_ins = mk_list_entry(o_head,
415                               struct flb_output_instance, _head);
416 
417         /* skip output plugins that don't handle proper event types */
418         if (!flb_router_match_type(ic->event_type, o_ins)) {
419             continue;
420         }
421 
422         if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id) != 0) {
423             route = flb_malloc(sizeof(struct flb_task_route));
424             if (!route) {
425                 flb_errno();
426                 continue;
427             }
428 
429             route->out = o_ins;
430             mk_list_add(&route->_head, &task->routes);
431             count++;
432         }
433     }
434 
435     /* no destinations ?, useless task. */
436     if (count == 0) {
437         flb_debug("[task] created task=%p id=%i without routes, dropping.",
438                   task, task->id);
439         task->buf = NULL;
440         flb_task_destroy(task, FLB_TRUE);
441         return NULL;
442     }
443 
444     flb_debug("[task] created task=%p id=%i OK", task, task->id);
445     return task;
446 }
447 
flb_task_destroy(struct flb_task * task,int del)448 void flb_task_destroy(struct flb_task *task, int del)
449 {
450     struct mk_list *tmp;
451     struct mk_list *head;
452     struct flb_task_route *route;
453     struct flb_task_retry *retry;
454 
455     flb_debug("[task] destroy task=%p (task_id=%i)", task, task->id);
456 
457     /* Release task_id */
458     map_free_task_id(task->id, task->config);
459 
460     /* Remove routes */
461     mk_list_foreach_safe(head, tmp, &task->routes) {
462         route = mk_list_entry(head, struct flb_task_route, _head);
463         mk_list_del(&route->_head);
464         flb_free(route);
465     }
466 
467     /* Unlink and release task */
468     mk_list_del(&task->_head);
469 
470     /* destroy chunk */
471     flb_input_chunk_destroy(task->ic, del);
472 
473     /* Remove 'retries' */
474     mk_list_foreach_safe(head, tmp, &task->retries) {
475         retry = mk_list_entry(head, struct flb_task_retry, _head);
476         flb_task_retry_destroy(retry);
477     }
478 
479     flb_input_chunk_set_limits(task->i_ins);
480     flb_free(task->tag);
481     flb_free(task);
482 }
483