1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <stdio.h>
22 #include <stdlib.h>
23
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_config.h>
26 #include <fluent-bit/flb_input.h>
27 #include <fluent-bit/flb_input_chunk.h>
28 #include <fluent-bit/flb_output.h>
29 #include <fluent-bit/flb_router.h>
30 #include <fluent-bit/flb_task.h>
31 #include <fluent-bit/flb_mem.h>
32 #include <fluent-bit/flb_str.h>
33 #include <fluent-bit/flb_scheduler.h>
34
35 /*
36 * Every task created must have an unique ID, this function lookup the
37 * lowest number available in the tasks_map.
38 *
39 * This 'id' is used by the task interface to communicate with the engine event
40 * loop about some action.
41 */
42
map_get_task_id(struct flb_config * config)43 static inline int map_get_task_id(struct flb_config *config)
44 {
45 int i;
46 int map_size = (sizeof(config->tasks_map) / sizeof(struct flb_task_map));
47
48 for (i = 0; i < map_size; i++) {
49 if (config->tasks_map[i].task == NULL) {
50 return i;
51 }
52 }
53
54 return -1;
55 }
56
map_set_task_id(int id,struct flb_task * task,struct flb_config * config)57 static inline void map_set_task_id(int id, struct flb_task *task,
58 struct flb_config *config)
59 {
60 config->tasks_map[id].task = task;
61
62 }
63
map_free_task_id(int id,struct flb_config * config)64 static inline void map_free_task_id(int id, struct flb_config *config)
65 {
66 config->tasks_map[id].task = NULL;
67 }
68
flb_task_retry_destroy(struct flb_task_retry * retry)69 void flb_task_retry_destroy(struct flb_task_retry *retry)
70 {
71 int ret;
72
73 /* Make sure to invalidate any request from the scheduler */
74 ret = flb_sched_request_invalidate(retry->parent->config, retry);
75 if (ret == 0) {
76 flb_debug("[retry] task retry=%p, invalidated from the scheduler",
77 retry);
78 }
79
80 mk_list_del(&retry->_head);
81 flb_free(retry);
82 }
83
84 /*
85 * For an existing task 'retry', re-schedule it. One of the use case of this function
86 * is when the engine dispatcher fails to bring the chunk up due to Chunk I/O
87 * configuration restrictions, the task needs to be re-scheduled.
88 */
flb_task_retry_reschedule(struct flb_task_retry * retry,struct flb_config * config)89 int flb_task_retry_reschedule(struct flb_task_retry *retry, struct flb_config *config)
90 {
91 int seconds;
92 struct flb_task *task;
93
94 task = retry->parent;
95 seconds = flb_sched_request_create(config, retry, retry->attempts);
96 if (seconds == -1) {
97 /*
98 * This is the worse case scenario: 'cannot re-schedule a retry'. If the Chunk
99 * resides only in memory, it will be lost. */
100 flb_warn("[task] retry for task %i could not be re-scheduled", task->id);
101 flb_task_retry_destroy(retry);
102 if (task->users == 0 && mk_list_size(&task->retries) == 0) {
103 flb_task_destroy(task, FLB_TRUE);
104 }
105 return -1;
106 }
107 else {
108 flb_info("[task] re-schedule retry=%p %i in the next %i seconds",
109 retry, task->id, seconds);
110 }
111
112 return 0;
113 }
114
flb_task_retry_create(struct flb_task * task,struct flb_output_instance * ins)115 struct flb_task_retry *flb_task_retry_create(struct flb_task *task,
116 struct flb_output_instance *ins)
117 {
118 struct mk_list *head;
119 struct mk_list *tmp;
120 struct flb_task_retry *retry = NULL;
121
122 /* First discover if is there any previous retry context in the task */
123 mk_list_foreach_safe(head, tmp, &task->retries) {
124 retry = mk_list_entry(head, struct flb_task_retry, _head);
125 if (retry->o_ins == ins) {
126 if (retry->attempts >= ins->retry_limit && ins->retry_limit >= 0) {
127 flb_debug("[task] task_id=%i reached retry-attempts limit %i/%i",
128 task->id, retry->attempts, ins->retry_limit);
129 flb_task_retry_destroy(retry);
130 return NULL;
131 }
132 break;
133 }
134 retry = NULL;
135 }
136
137 if (!retry) {
138 /* Create a new re-try instance */
139 retry = flb_malloc(sizeof(struct flb_task_retry));
140 if (!retry) {
141 flb_errno();
142 return NULL;
143 }
144
145 retry->attempts = 1;
146 retry->o_ins = ins;
147 retry->parent = task;
148 mk_list_add(&retry->_head, &task->retries);
149
150 flb_debug("[retry] new retry created for task_id=%i attempts=%i",
151 task->id, retry->attempts);
152 }
153 else {
154 retry->attempts++;
155 flb_debug("[retry] re-using retry for task_id=%i attempts=%i",
156 task->id, retry->attempts);
157 }
158
159 /*
160 * This 'retry' was issued by an output plugin, from an Engine perspective
161 * we need to determinate if the source input plugin have some memory
162 * restrictions and if the Storage type is 'filesystem' we need to put
163 * the file content down.
164 */
165 flb_input_chunk_set_up_down(task->ic);
166
167 /*
168 * Besides limits adjusted above, a retry that's going to only one place
169 * must be down.
170 */
171 if (mk_list_size(&task->routes) == 1) {
172 flb_input_chunk_down(task->ic);
173 }
174
175 return retry;
176 }
177
178 /*
179 * Return FLB_TRUE or FLB_FALSE if the chunk pointed by the task was
180 * created on this running instance or it comes from a chunk in the
181 * filesystem from a previous run.
182 */
flb_task_from_fs_storage(struct flb_task * task)183 int flb_task_from_fs_storage(struct flb_task *task)
184 {
185 struct flb_input_chunk *ic;
186
187 ic = (struct flb_input_chunk *) task->ic;
188 return ic->fs_backlog;
189 }
190
flb_task_retry_count(struct flb_task * task,void * data)191 int flb_task_retry_count(struct flb_task *task, void *data)
192 {
193 struct mk_list *head;
194 struct flb_task_retry *retry;
195 struct flb_output_instance *o_ins;
196 struct flb_output_coro *out_coro;
197
198 out_coro = (struct flb_output_coro *) FLB_CORO_DATA(data);
199 o_ins = out_coro->o_ins;
200
201 /* Delete 'retries' only associated with the output instance */
202 mk_list_foreach(head, &task->retries) {
203 retry = mk_list_entry(head, struct flb_task_retry, _head);
204 if (retry->o_ins == o_ins) {
205 return retry->attempts;
206 }
207 }
208
209 return -1;
210 }
211
212 /* Check if a 'retry' context exists for a specific task, if so, cleanup */
flb_task_retry_clean(struct flb_task * task,struct flb_output_instance * ins)213 int flb_task_retry_clean(struct flb_task *task, struct flb_output_instance *ins)
214 {
215 struct mk_list *tmp;
216 struct mk_list *head;
217 struct flb_task_retry *retry;
218
219 /* Delete 'retries' only associated with the output instance */
220 mk_list_foreach_safe(head, tmp, &task->retries) {
221 retry = mk_list_entry(head, struct flb_task_retry, _head);
222 if (retry->o_ins == ins) {
223 flb_task_retry_destroy(retry);
224 return 0;
225 }
226 }
227
228 return -1;
229 }
230
231 /* Allocate an initialize a basic Task structure */
task_alloc(struct flb_config * config)232 static struct flb_task *task_alloc(struct flb_config *config)
233 {
234 int task_id;
235 struct flb_task *task;
236
237 /* Allocate the new task */
238 task = (struct flb_task *) flb_calloc(1, sizeof(struct flb_task));
239 if (!task) {
240 flb_errno();
241 return NULL;
242 }
243
244 /* Get ID and set back 'task' reference */
245 task_id = map_get_task_id(config);
246 if (task_id == -1) {
247 flb_free(task);
248 return NULL;
249 }
250 map_set_task_id(task_id, task, config);
251
252 flb_trace("[task %p] created (id=%i)", task, task_id);
253
254 /* Initialize minimum variables */
255 task->id = task_id;
256 task->config = config;
257 task->status = FLB_TASK_NEW;
258 task->users = 0;
259 mk_list_init(&task->routes);
260 mk_list_init(&task->retries);
261
262 return task;
263 }
264
265 /* Return the number of tasks with 'running status' */
flb_task_running_count(struct flb_config * config)266 int flb_task_running_count(struct flb_config *config)
267 {
268 int count = 0;
269 struct mk_list *head;
270 struct mk_list *t_head;
271 struct flb_task *task;
272 struct flb_input_instance *ins;
273
274 mk_list_foreach(head, &config->inputs) {
275 ins = mk_list_entry(head, struct flb_input_instance, _head);
276 mk_list_foreach(t_head, &ins->tasks) {
277 task = mk_list_entry(t_head, struct flb_task, _head);
278 if (task->users > 0) {
279 count++;
280 }
281 }
282 }
283
284 return count;
285 }
286
flb_task_running_print(struct flb_config * config)287 int flb_task_running_print(struct flb_config *config)
288 {
289 int count = 0;
290 flb_sds_t tmp;
291 flb_sds_t routes;
292 struct mk_list *head;
293 struct mk_list *t_head;
294 struct mk_list *r_head;
295 struct flb_task *task;
296 struct flb_task_route *route;
297 struct flb_input_instance *ins;
298
299 routes = flb_sds_create_size(256);
300 if (!routes) {
301 flb_error("[task] cannot allocate space to report pending tasks");
302 return -1;
303 }
304
305 mk_list_foreach(head, &config->inputs) {
306 ins = mk_list_entry(head, struct flb_input_instance, _head);
307 count = mk_list_size(&ins->tasks);
308 flb_info("[task] %s/%s has %i pending task(s):",
309 ins->p->name, flb_input_name(ins), count);
310 mk_list_foreach(t_head, &ins->tasks) {
311 task = mk_list_entry(t_head, struct flb_task, _head);
312
313 mk_list_foreach(r_head, &task->routes) {
314 route = mk_list_entry(r_head, struct flb_task_route, _head);
315 tmp = flb_sds_printf(&routes, "%s/%s ",
316 route->out->p->name,
317 flb_output_name(route->out));
318 if (!tmp) {
319 flb_sds_destroy(routes);
320 flb_error("[task] cannot print report for pending tasks");
321 return -1;
322 }
323 routes = tmp;
324 }
325
326 flb_info("[task] task_id=%i still running on route(s): %s",
327 task->id, routes);
328 flb_sds_len_set(routes, 0);
329 }
330 }
331 flb_sds_destroy(routes);
332 return 0;
333 }
334
335 /* Create an engine task to handle the output plugin flushing work */
flb_task_create(uint64_t ref_id,const char * buf,size_t size,struct flb_input_instance * i_ins,struct flb_input_chunk * ic,const char * tag_buf,int tag_len,struct flb_config * config,int * err)336 struct flb_task *flb_task_create(uint64_t ref_id,
337 const char *buf,
338 size_t size,
339 struct flb_input_instance *i_ins,
340 struct flb_input_chunk *ic,
341 const char *tag_buf, int tag_len,
342 struct flb_config *config,
343 int *err)
344 {
345 int count = 0;
346 struct flb_task *task;
347 struct flb_task_route *route;
348 struct flb_router_path *route_path;
349 struct flb_output_instance *o_ins;
350 struct flb_input_chunk *task_ic;
351 struct mk_list *i_head;
352 struct mk_list *o_head;
353
354 /* No error status */
355 *err = FLB_FALSE;
356
357 /* allocate task */
358 task = task_alloc(config);
359 if (!task) {
360 *err = FLB_TRUE;
361 return NULL;
362 }
363
364 /* create a copy of the tag */
365 task->tag = flb_malloc(tag_len + 1);
366 if (!task->tag) {
367 flb_errno();
368 flb_free(task);
369 *err = FLB_TRUE;
370 return NULL;
371 }
372 memcpy(task->tag, tag_buf, tag_len);
373 task->tag[tag_len] = '\0';
374 task->tag_len = tag_len;
375
376 task_ic = (struct flb_input_chunk *) ic;
377 task_ic->task = task;
378
379 /* Keep track of origins */
380 task->ref_id = ref_id;
381 task->buf = buf;
382 task->size = size;
383 task->i_ins = i_ins;
384 task->ic = ic;
385 mk_list_add(&task->_head, &i_ins->tasks);
386
387 #ifdef FLB_HAVE_METRICS
388 task->records = ((struct flb_input_chunk *) ic)->total_records;
389 #endif
390
391 /* Direct connects betweek input <> outputs (API based) */
392 if (mk_list_size(&i_ins->routes_direct) > 0) {
393 mk_list_foreach(i_head, &i_ins->routes_direct) {
394 route_path = mk_list_entry(i_head, struct flb_router_path, _head);
395 o_ins = route_path->ins;
396
397 route = flb_malloc(sizeof(struct flb_task_route));
398 if (!route) {
399 flb_errno();
400 task->buf = NULL;
401 flb_task_destroy(task, FLB_TRUE);
402 return NULL;
403 }
404
405 route->out = o_ins;
406 mk_list_add(&route->_head, &task->routes);
407 }
408 flb_debug("[task] created direct task=%p id=%i OK", task, task->id);
409 return task;
410 }
411
412 /* Find matching routes for the incoming task */
413 mk_list_foreach(o_head, &config->outputs) {
414 o_ins = mk_list_entry(o_head,
415 struct flb_output_instance, _head);
416
417 /* skip output plugins that don't handle proper event types */
418 if (!flb_router_match_type(ic->event_type, o_ins)) {
419 continue;
420 }
421
422 if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id) != 0) {
423 route = flb_malloc(sizeof(struct flb_task_route));
424 if (!route) {
425 flb_errno();
426 continue;
427 }
428
429 route->out = o_ins;
430 mk_list_add(&route->_head, &task->routes);
431 count++;
432 }
433 }
434
435 /* no destinations ?, useless task. */
436 if (count == 0) {
437 flb_debug("[task] created task=%p id=%i without routes, dropping.",
438 task, task->id);
439 task->buf = NULL;
440 flb_task_destroy(task, FLB_TRUE);
441 return NULL;
442 }
443
444 flb_debug("[task] created task=%p id=%i OK", task, task->id);
445 return task;
446 }
447
flb_task_destroy(struct flb_task * task,int del)448 void flb_task_destroy(struct flb_task *task, int del)
449 {
450 struct mk_list *tmp;
451 struct mk_list *head;
452 struct flb_task_route *route;
453 struct flb_task_retry *retry;
454
455 flb_debug("[task] destroy task=%p (task_id=%i)", task, task->id);
456
457 /* Release task_id */
458 map_free_task_id(task->id, task->config);
459
460 /* Remove routes */
461 mk_list_foreach_safe(head, tmp, &task->routes) {
462 route = mk_list_entry(head, struct flb_task_route, _head);
463 mk_list_del(&route->_head);
464 flb_free(route);
465 }
466
467 /* Unlink and release task */
468 mk_list_del(&task->_head);
469
470 /* destroy chunk */
471 flb_input_chunk_destroy(task->ic, del);
472
473 /* Remove 'retries' */
474 mk_list_foreach_safe(head, tmp, &task->retries) {
475 retry = mk_list_entry(head, struct flb_task_retry, _head);
476 flb_task_retry_destroy(retry);
477 }
478
479 flb_input_chunk_set_limits(task->i_ins);
480 flb_free(task->tag);
481 flb_free(task);
482 }
483