1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
#include <limits.h>

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_input_chunk.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_task.h>
#include <fluent-bit/flb_routes_mask.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/stream_processor/flb_sp.h>
#include <chunkio/chunkio.h>
33 
/*
 * Debugging aid: reads a single byte from file descriptor 0 (standard input),
 * which normally blocks until input is available.
 * NOTE(review): not referenced in the visible part of this file and the
 * read() result is ignored; looks like a leftover that could be removed.
 */
#define BLOCK_UNTIL_KEYPRESS() {char temp_keypress_buffer; read(0, &temp_keypress_buffer, 1);}
35 
/*
 * Release scopes understood by flb_input_chunk_release_space():
 *   LOCAL  - only the route to the given output is removed; the chunk is
 *            dropped once no route is left.
 *   GLOBAL - the chunk is dropped regardless of its other routes.
 */
#define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL  0
#define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1
38 
/*
 * Implemented outside this file (presumably by the storage backlog input
 * plugin, given the "sb_" prefix -- confirm). As used in this file:
 * sb_get_releasable_output_queue_space() reports how many bytes could be
 * released from the output's segregated backlog queue, and
 * sb_release_output_queue_space() releases them, returning non-zero on
 * failure.
 */
extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
                                                    size_t                      required_space);

extern int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
                                         size_t                      required_space);

/* Forward declarations of static helpers defined further down */
static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
                                       struct flb_input_chunk *old_ic,
                                       uint64_t o_id);

static int flb_input_chunk_is_task_safe_delete(struct flb_task *task);

static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);
52 
flb_input_chunk_get_releasable_space(struct flb_input_chunk * new_input_chunk,struct flb_input_instance * input_plugin,struct flb_output_instance * output_plugin,size_t required_space)53 static ssize_t flb_input_chunk_get_releasable_space(
54                                     struct flb_input_chunk     *new_input_chunk,
55                                     struct flb_input_instance  *input_plugin,
56                                     struct flb_output_instance *output_plugin,
57                                     size_t                      required_space)
58 {
59     struct mk_list         *input_chunk_iterator;
60     ssize_t                 releasable_space;
61     struct flb_input_chunk *old_input_chunk;
62 
63     releasable_space = 0;
64 
65     mk_list_foreach(input_chunk_iterator, &input_plugin->chunks) {
66         old_input_chunk = mk_list_entry(input_chunk_iterator, struct flb_input_chunk, _head);
67 
68         if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, output_plugin->id)) {
69             continue;
70         }
71 
72         if (flb_input_chunk_safe_delete(new_input_chunk, old_input_chunk,
73                                         output_plugin->id) == FLB_FALSE ||
74             flb_input_chunk_is_task_safe_delete(old_input_chunk->task) == FLB_FALSE) {
75             continue;
76         }
77 
78         releasable_space += flb_input_chunk_get_real_size(old_input_chunk);
79 
80         if (releasable_space >= required_space) {
81             break;
82         }
83     }
84 
85     return releasable_space;
86 }
87 
flb_input_chunk_release_space(struct flb_input_chunk * new_input_chunk,struct flb_input_instance * input_plugin,struct flb_output_instance * output_plugin,ssize_t required_space,int release_scope)88 static int flb_input_chunk_release_space(
89                     struct flb_input_chunk     *new_input_chunk,
90                     struct flb_input_instance  *input_plugin,
91                     struct flb_output_instance *output_plugin,
92                     ssize_t                     required_space,
93                     int                         release_scope)
94 {
95     struct mk_list         *input_chunk_iterator_tmp;
96     struct mk_list         *input_chunk_iterator;
97     int                     chunk_destroy_flag;
98     struct flb_input_chunk *old_input_chunk;
99     ssize_t                 released_space;
100     int                     chunk_released;
101     ssize_t                 chunk_size;
102 
103     released_space = 0;
104 
105     mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp,
106                          &input_plugin->chunks) {
107         old_input_chunk = mk_list_entry(input_chunk_iterator,
108                                              struct flb_input_chunk, _head);
109 
110         if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask,
111                                      output_plugin->id)) {
112             continue;
113         }
114 
115         if (flb_input_chunk_safe_delete(new_input_chunk,
116                                         old_input_chunk,
117                                         output_plugin->id) == FLB_FALSE ||
118             flb_input_chunk_is_task_safe_delete(old_input_chunk->task) == FLB_FALSE) {
119             continue;
120         }
121 
122         chunk_size = flb_input_chunk_get_real_size(old_input_chunk);
123         chunk_released = FLB_FALSE;
124         chunk_destroy_flag = FLB_FALSE;
125 
126         if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) {
127             flb_routes_mask_clear_bit(old_input_chunk->routes_mask,
128                                       output_plugin->id);
129 
130             output_plugin->fs_chunks_size -= chunk_size;
131 
132             chunk_destroy_flag = flb_routes_mask_is_empty(
133                                                 old_input_chunk->routes_mask);
134 
135             chunk_released = FLB_TRUE;
136         }
137         else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) {
138             chunk_destroy_flag = FLB_TRUE;
139         }
140 
141         if (chunk_destroy_flag) {
142             if (old_input_chunk->task != NULL) {
143                 /*
144                  * If the chunk is referenced by a task and task has no active route,
145                  * we need to destroy the task as well.
146                  */
147                 if (old_input_chunk->task->users == 0) {
148                     flb_debug("[task] drop task_id %d with no active route from input plugin %s",
149                               old_input_chunk->task->id, new_input_chunk->in->name);
150                     flb_task_destroy(old_input_chunk->task, FLB_TRUE);
151 
152                     chunk_released = FLB_TRUE;
153                 }
154             }
155             else {
156                 flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s",
157                           flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name);
158 
159                 flb_input_chunk_destroy(old_input_chunk, FLB_TRUE);
160 
161                 chunk_released = FLB_TRUE;
162             }
163         }
164 
165         if (chunk_released) {
166             released_space += chunk_size;
167         }
168 
169         if (released_space >= required_space) {
170             break;
171         }
172     }
173 
174     if (released_space < required_space) {
175         return -2;
176     }
177 
178     return 0;
179 }
180 
generate_chunk_name(struct flb_input_instance * in,char * out_buf,int buf_size)181 static void generate_chunk_name(struct flb_input_instance *in,
182                                 char *out_buf, int buf_size)
183 {
184     struct flb_time tm;
185     (void) in;
186 
187     flb_time_get(&tm);
188     snprintf(out_buf, buf_size - 1,
189              "%i-%lu.%4lu.flb",
190              getpid(),
191              tm.tm.tv_sec, tm.tm.tv_nsec);
192 }
193 
flb_input_chunk_get_size(struct flb_input_chunk * ic)194 ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)
195 {
196     return cio_chunk_get_content_size(ic->chunk);
197 }
198 
199 /*
200  * When chunk is set to DOWN from memory, data_size is set to 0 and
201  * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size
202  * is used to track the size of chunks in filesystem so we need to call
203  * cio_chunk_get_real_size to return the original size in the file system
204  */
flb_input_chunk_get_real_size(struct flb_input_chunk * ic)205 static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic)
206 {
207     ssize_t meta_size;
208     ssize_t size;
209 
210     size = cio_chunk_get_real_size(ic->chunk);
211 
212     if (size != 0) {
213         return size;
214     }
215 
216     // Real size is not synced to chunk yet
217     size = flb_input_chunk_get_size(ic);
218     if (size == 0) {
219         flb_debug("[input chunk] no data in the chunk %s",
220                   flb_input_chunk_get_name(ic));
221         return -1;
222     }
223 
224     meta_size = cio_meta_size(ic->chunk);
225     size += meta_size
226         /* See https://github.com/edsiper/chunkio#file-layout for more details */
227          + 2    /* HEADER BYTES */
228          + 4    /* CRC32 */
229          + 16   /* PADDING */
230          + 2;   /* METADATA LENGTH BYTES */
231 
232     return size;
233 }
234 
flb_input_chunk_write(void * data,const char * buf,size_t len)235 int flb_input_chunk_write(void *data, const char *buf, size_t len)
236 {
237     int ret;
238     struct flb_input_chunk *ic;
239 
240     ic = (struct flb_input_chunk *) data;
241 
242     ret = cio_chunk_write(ic->chunk, buf, len);
243 #ifdef FLB_HAVE_METRICS
244     if (ret == CIO_OK) {
245         ic->added_records = flb_mp_count(buf, len);
246         ic->total_records += ic->added_records;
247     }
248 #endif
249 
250     return ret;
251 }
252 
flb_input_chunk_write_at(void * data,off_t offset,const char * buf,size_t len)253 int flb_input_chunk_write_at(void *data, off_t offset,
254                              const char *buf, size_t len)
255 {
256     int ret;
257     struct flb_input_chunk *ic;
258 
259     ic = (struct flb_input_chunk *) data;
260 
261     ret = cio_chunk_write_at(ic->chunk, offset, buf, len);
262     return ret;
263 }
264 
265 /*
266  * For input_chunk referenced by an outgoing task, we need to check
267  * whether the chunk is in the middle of output flush callback
268  */
flb_input_chunk_is_task_safe_delete(struct flb_task * task)269 static int flb_input_chunk_is_task_safe_delete(struct flb_task *task)
270 {
271     if (!task) {
272         return FLB_TRUE;
273     }
274 
275     if (task->users != 0) {
276         return FLB_FALSE;
277     }
278 
279     return FLB_TRUE;
280 }
281 
flb_input_chunk_safe_delete(struct flb_input_chunk * ic,struct flb_input_chunk * old_ic,uint64_t o_id)282 static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
283                                        struct flb_input_chunk *old_ic,
284                                        uint64_t o_id)
285 {
286     /* The chunk we want to drop should not be the incoming chunk */
287     if (ic == old_ic) {
288         return FLB_FALSE;
289     }
290 
291     /*
292      * Even if chunks from same input plugin have same routes_mask when created,
293      * the routes_mask could be modified when new chunks is ingested. Therefore,
294      * we still need to do the validation on the routes_mask with o_id.
295      */
296     if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id) == 0) {
297         return FLB_FALSE;
298     }
299 
300     return FLB_TRUE;
301 }
302 
flb_input_chunk_release_space_compound(struct flb_input_chunk * new_input_chunk,struct flb_output_instance * output_plugin,size_t * local_release_requirement,int release_local_space)303 int flb_input_chunk_release_space_compound(
304                         struct flb_input_chunk *new_input_chunk,
305                         struct flb_output_instance *output_plugin,
306                         size_t *local_release_requirement,
307                         int release_local_space)
308 {
309     ssize_t                    segregated_backlog_releasable_space;
310     ssize_t                    active_backlog_releasable_space;
311     ssize_t                    active_plugin_releasable_space;
312     ssize_t                    required_space_remainder;
313     struct flb_input_instance *storage_backlog_instance;
314     int                        result;
315 
316     storage_backlog_instance = output_plugin->config->storage_input_plugin;
317 
318     *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk);
319     required_space_remainder = (ssize_t) *local_release_requirement;
320 
321     segregated_backlog_releasable_space = 0;
322     active_backlog_releasable_space = 0;
323     active_plugin_releasable_space = 0;
324 
325     active_backlog_releasable_space = flb_input_chunk_get_releasable_space(
326                                                         new_input_chunk,
327                                                         storage_backlog_instance,
328                                                         output_plugin,
329                                                         required_space_remainder);
330 
331     required_space_remainder -= active_backlog_releasable_space;
332 
333     if (required_space_remainder > 0) {
334         segregated_backlog_releasable_space = sb_get_releasable_output_queue_space(
335                                                             output_plugin,
336                                                             required_space_remainder);
337 
338         required_space_remainder -= segregated_backlog_releasable_space;
339     }
340 
341     if (required_space_remainder > 0) {
342         active_plugin_releasable_space = flb_input_chunk_get_releasable_space(
343                                                     new_input_chunk,
344                                                     new_input_chunk->in,
345                                                     output_plugin,
346                                                     required_space_remainder);
347 
348         required_space_remainder -= active_plugin_releasable_space;
349     }
350 
351     /* When we get here required_space_remainder could be negative but it's not a problem
352      * this happens when the weight of the removed chunk is higher than the remainder of
353      * the required space and it's not something that can nor should be prevented.
354      */
355 
356     if (required_space_remainder > 0) {
357         return -2;
358     }
359 
360     required_space_remainder = (ssize_t) *local_release_requirement;
361 
362     if (required_space_remainder > 0 && active_backlog_releasable_space > 0) {
363         result = flb_input_chunk_release_space(new_input_chunk,
364                                                storage_backlog_instance,
365                                                output_plugin,
366                                                active_backlog_releasable_space,
367                                                FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL);
368 
369         if (result) {
370             return -3;
371         }
372 
373         required_space_remainder -= active_backlog_releasable_space;
374     }
375 
376     if (required_space_remainder > 0 && segregated_backlog_releasable_space > 0) {
377         result = sb_release_output_queue_space(
378                                             output_plugin,
379                                             segregated_backlog_releasable_space);
380 
381         if (result) {
382             *local_release_requirement = (size_t) required_space_remainder;
383 
384             return -4;
385         }
386 
387         required_space_remainder -= segregated_backlog_releasable_space;
388     }
389 
390     if (release_local_space) {
391         if (required_space_remainder > 0 && active_plugin_releasable_space > 0) {
392             result = flb_input_chunk_release_space(new_input_chunk,
393                                                    new_input_chunk->in,
394                                                    output_plugin,
395                                                    active_plugin_releasable_space,
396                                                    FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL);
397 
398             if (result) {
399                 printf("FAILED\n");
400                 return -5;
401             }
402 
403             required_space_remainder -= active_plugin_releasable_space;
404         }
405     }
406 
407     if (required_space_remainder < 0) {
408         required_space_remainder = 0;
409     }
410 
411     *local_release_requirement = (size_t) required_space_remainder;
412 
413     return 0;
414 }
415 
416 /*
417  * Returns how many chunks needs to be dropped in order to get enough space to
418  * buffer the incoming data (with size chunk_size)
419  */
flb_intput_chunk_count_dropped_chunks(struct flb_input_chunk * ic,struct flb_output_instance * o_ins,size_t chunk_size)420 int flb_intput_chunk_count_dropped_chunks(struct flb_input_chunk *ic,
421                                           struct flb_output_instance *o_ins,
422                                           size_t chunk_size)
423 {
424     int count = 0;
425     int enough_space = FLB_FALSE;
426     ssize_t bytes_remained;
427     struct mk_list *head;
428     struct flb_input_chunk *old_ic;
429 
430     bytes_remained = o_ins->total_limit_size -
431                      o_ins->fs_chunks_size -
432                      o_ins->fs_backlog_chunks_size;
433 
434     mk_list_foreach(head, &ic->in->chunks) {
435         old_ic = mk_list_entry(head, struct flb_input_chunk, _head);
436 
437         if (flb_input_chunk_safe_delete(ic, old_ic, o_ins->id) == FLB_FALSE ||
438             flb_input_chunk_is_task_safe_delete(old_ic->task) == FLB_FALSE) {
439             continue;
440         }
441 
442         bytes_remained += flb_input_chunk_get_real_size(old_ic);
443         count++;
444         if (bytes_remained >= (ssize_t) chunk_size) {
445             enough_space = FLB_TRUE;
446             break;
447         }
448     }
449 
450     /*
451      * flb_intput_chunk_count_dropped_chunks(3) will only be called if the chunk will
452      * be flushing to the output instance passed in and the instance will reach its
453      * limit after appending the new data. This function will try to count how many
454      * chunks need to be dropped in order to place the incoing chunk.
455      *
456      * Return '0' means that we cannot find a slot to ingest the incoming data.
457      */
458     if (enough_space == FLB_FALSE) {
459         return 0;
460     }
461 
462     return count;
463 }
464 
465 /*
466  * Find a slot in the output instance to append the new data with size chunk_size, it
467  * will drop the the oldest chunks when the limitation on local disk is reached.
468  */
flb_input_chunk_find_space_new_data(struct flb_input_chunk * ic,size_t chunk_size,int overlimit)469 int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic,
470                                         size_t chunk_size, int overlimit)
471 {
472     int count;
473     int result;
474     ssize_t bytes;
475     ssize_t old_ic_bytes;
476     struct mk_list *tmp;
477     struct mk_list *head;
478     struct mk_list *head_chunk;
479     struct flb_output_instance *o_ins;
480     struct flb_input_chunk *old_ic;
481     size_t local_release_requirement;
482 
483     /*
484      * For each output instances that will be over the limit after adding the new chunk,
485      * we have to determine how many chunks needs to be removed. We will adjust the
486      * routes_mask to only route to the output plugin that have enough space after
487      * deleting some chunks fome the queue.
488      */
489     mk_list_foreach(head, &ic->in->config->outputs) {
490         count = 0;
491         o_ins = mk_list_entry(head, struct flb_output_instance, _head);
492 
493         if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 ||
494            (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
495             continue;
496         }
497 
498         local_release_requirement = 0;
499 
500         result = flb_input_chunk_release_space_compound(
501                                             ic, o_ins,
502                                             &local_release_requirement,
503                                             FLB_FALSE);
504 
505         if (!result && local_release_requirement == 0) {
506             /* If this function returned 0 it means the space requirement was
507              * satisfied solely by releasing chunks in either storage_backlog
508              * state (segregated or in queue)
509              */
510             continue;
511         }
512 
513         /* flb_input_chunk_find_space_new_data_backlog may fail to meet the space
514          * requirements but it always sets local_release_requirement to the right amount
515          */
516 
517         count = flb_intput_chunk_count_dropped_chunks(ic, o_ins, local_release_requirement);
518 
519         if (count == 0) {
520             /*
521              * The worst scenerio is that we cannot find a space by dropping some
522              * old chunks for the incoming chunk. We need to adjust the routes_mask
523              * of the incoming chunk to not flush to that output instance.
524              */
525             flb_error("[input chunk] no enough space in filesystem to buffer "
526                       "chunk %s in plugin %s", flb_input_chunk_get_name(ic), o_ins->name);
527 
528             flb_routes_mask_clear_bit(ic->routes_mask, o_ins->id);
529             if (flb_routes_mask_is_empty(ic->routes_mask)) {
530                 bytes = flb_input_chunk_get_size(ic);
531                 if (bytes != 0) {
532                     /*
533                      * Skip newly created chunk as newly created chunk
534                      * hasn't updated the fs_chunks_size yet.
535                      */
536                     bytes = flb_input_chunk_get_real_size(ic);
537                     o_ins->fs_chunks_size -= bytes;
538                     flb_debug("[input chunk] chunk %s has no output route, "
539                               "remove %ld bytes from fs_chunks_size",
540                               flb_input_chunk_get_name(ic), bytes);
541                 }
542             }
543 
544             continue;
545         }
546 
547         /*
548          * Here we need to drop some chunks from the beginning of chunks list.
549          * Since chunks are stored in a double linked list (mk_list), we are
550          * able to iterate the list from the beginning and check if the current
551          * chunk is able to be removed.
552          */
553         mk_list_foreach_safe(head_chunk, tmp, &ic->in->chunks) {
554             old_ic = mk_list_entry(head_chunk, struct flb_input_chunk, _head);
555 
556             if (flb_input_chunk_safe_delete(ic, old_ic, o_ins->id) == FLB_FALSE ||
557                 flb_input_chunk_is_task_safe_delete(old_ic->task) == FLB_FALSE) {
558                 continue;
559             }
560 
561             old_ic_bytes = flb_input_chunk_get_real_size(old_ic);
562 
563             /* drop chunk by adjusting the routes_mask */
564             flb_routes_mask_clear_bit(old_ic->routes_mask, o_ins->id);
565             o_ins->fs_chunks_size -= old_ic_bytes;
566 
567             flb_debug("[input chunk] remove route of chunk %s with size %ld bytes to output plugin %s "
568                       "to place the incoming data with size %ld bytes", flb_input_chunk_get_name(old_ic),
569                       old_ic_bytes, o_ins->name, chunk_size);
570 
571             if (flb_routes_mask_is_empty(old_ic->routes_mask)) {
572                 if (old_ic->task != NULL) {
573                     /*
574                      * If the chunk is referenced by a task and task has no active route,
575                      * we need to destroy the task as well.
576                      */
577                     if (old_ic->task->users == 0) {
578                         flb_debug("[task] drop task_id %d with no active route from input plugin %s",
579                                   old_ic->task->id, ic->in->name);
580                         flb_task_destroy(old_ic->task, FLB_TRUE);
581                     }
582                 }
583                 else {
584                     flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s",
585                               flb_input_chunk_get_name(old_ic), ic->in->name);
586                     flb_input_chunk_destroy(old_ic, FLB_TRUE);
587                 }
588             }
589 
590             count--;
591             if (count == 0) {
592                 /* we have dropped enough chunks to place the incoming chunks */
593                 break;
594             }
595         }
596     }
597 
598     if (count != 0) {
599         flb_error("[input chunk] fail to drop enough chunks in order to place new data");
600     }
601 
602     return 0;
603 }
604 
605 /*
606  * Returns a non-zero result if any output instances will reach the limit
607  * after buffering the new data
608  */
flb_input_chunk_has_overlimit_routes(struct flb_input_chunk * ic,size_t chunk_size)609 int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic,
610                                          size_t chunk_size)
611 {
612     int overlimit = 0;
613     struct mk_list *head;
614     struct flb_output_instance *o_ins;
615 
616     mk_list_foreach(head, &ic->in->config->outputs) {
617         o_ins = mk_list_entry(head, struct flb_output_instance, _head);
618 
619         if ((o_ins->total_limit_size == -1) ||
620             (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
621             continue;
622         }
623 
624         flb_debug("[input chunk] chunk %s required %ld bytes and %ld bytes left "
625                   "in plugin %s", flb_input_chunk_get_name(ic), chunk_size,
626                   o_ins->total_limit_size -
627                   o_ins->fs_backlog_chunks_size -
628                   o_ins->fs_chunks_size,
629                   o_ins->name);
630 
631         if ((o_ins->fs_chunks_size +
632              o_ins->fs_backlog_chunks_size +
633              chunk_size) > o_ins->total_limit_size) {
634             overlimit |= (1 << o_ins->id);
635         }
636     }
637 
638     return overlimit;
639 }
640 
641 /* Find a slot for the incoming data to buffer it in local file system
642  * returns 0 if none of the routes can be written to
643  */
flb_input_chunk_place_new_chunk(struct flb_input_chunk * ic,size_t chunk_size)644 int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_size)
645 {
646 	int overlimit;
647     overlimit = flb_input_chunk_has_overlimit_routes(ic, chunk_size);
648     if (overlimit != 0) {
649         flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit);
650     }
651 
652     return !flb_routes_mask_is_empty(ic->routes_mask);
653 }
654 
655 /* Create an input chunk using a Chunk I/O */
flb_input_chunk_map(struct flb_input_instance * in,void * chunk)656 struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
657                                             void *chunk)
658 {
659     int records = 0;
660     int tag_len;
661     int has_routes;
662     int ret;
663     uint64_t ts;
664     char *buf_data;
665     size_t buf_size;
666     size_t offset;
667     ssize_t bytes;
668     const char *tag_buf;
669     struct flb_input_chunk *ic;
670 
671     /* Create context for the input instance */
672     ic = flb_calloc(1, sizeof(struct flb_input_chunk));
673     if (!ic) {
674         flb_errno();
675         return NULL;
676     }
677 
678     ic->busy = FLB_FALSE;
679     ic->fs_backlog = FLB_TRUE;
680     ic->chunk = chunk;
681     ic->in = in;
682     msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
683 
684     ret = cio_chunk_get_content(ic->chunk, &buf_data, &buf_size);
685     if (ret != CIO_OK) {
686         flb_error("[input chunk] error retrieving content for metrics");
687         flb_free(ic);
688         return NULL;
689     }
690 
691     /* Validate records in the chunk */
692     ret = flb_mp_validate_chunk(buf_data, buf_size, &records, &offset);
693     if (ret == -1) {
694         /* If there are valid records, truncate the chunk size */
695         if (records <= 0) {
696             flb_plg_error(in,
697                           "chunk validation failed, data might be corrupted. "
698                           "No valid records found, the chunk will be discarded.");
699             flb_free(ic);
700             return NULL;
701         }
702         if (records > 0 && offset > 32) {
703             flb_plg_warn(in,
704                          "chunk validation failed, data might be corrupted. "
705                          "Found %d valid records, failed content starts "
706                          "right after byte %lu. Recovering valid records.",
707                          records, offset);
708 
709             /* truncate the chunk to recover valid records */
710             cio_chunk_write_at(chunk, offset, NULL, 0);
711         }
712         else {
713             flb_plg_error(in,
714                           "chunk validation failed, data might be corrupted. "
715                           "Found %d valid records, failed content starts "
716                           "right after byte %lu. Cannot recover chunk,",
717                           records, offset);
718             flb_free(ic);
719             return NULL;
720         }
721     }
722 
723     if (records == 0) {
724         flb_plg_error(in,
725                       "chunk validation failed, data might be corrupted. "
726                       "No valid records found, the chunk will be discarded.");
727         flb_free(ic);
728         return NULL;
729     }
730 
731     /*
732      * If the content is valid and the chunk has extra padding zeros, just
733      * perform an adjustment.
734      */
735     bytes = cio_chunk_get_content_size(chunk);
736     if (bytes == -1) {
737         flb_free(ic);
738         return NULL;
739     }
740     if (offset < bytes) {
741         cio_chunk_write_at(chunk, offset, NULL, 0);
742     }
743 
744     /* Updat metrics */
745 #ifdef FLB_HAVE_METRICS
746     ic->total_records = records;
747     if (ic->total_records > 0) {
748         /* timestamp */
749         ts = cmt_time_now();
750 
751         /* fluentbit_input_records_total */
752         cmt_counter_add(in->cmt_records, ts, ic->total_records,
753                         1, (char *[]) {(char *) flb_input_name(in)});
754 
755         /* fluentbit_input_bytes_total */
756         cmt_counter_add(in->cmt_bytes, ts, buf_size,
757                         1, (char *[]) {(char *) flb_input_name(in)});
758 
759         /* OLD metrics */
760         flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics);
761         flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
762     }
763 #endif
764 
765     /* Get the the tag reference (chunk metadata) */
766     ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
767     if (ret == -1) {
768         flb_error("[input chunk] error retrieving tag of input chunk");
769         flb_free(ic);
770         return NULL;
771     }
772 
773     bytes = flb_input_chunk_get_real_size(ic);
774     if (bytes < 0) {
775         flb_warn("[input chunk] could not retrieve chunk real size");
776         flb_free(ic);
777         return NULL;
778     }
779 
780     has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in);
781     if (has_routes == 0) {
782         flb_warn("[input chunk] no matching route for backoff log chunk %s",
783                  flb_input_chunk_get_name(ic));
784     }
785 
786     mk_list_add(&ic->_head, &in->chunks);
787 
788     flb_input_chunk_update_output_instances(ic, bytes);
789 
790     return ic;
791 }
792 
flb_input_chunk_create(struct flb_input_instance * in,const char * tag,int tag_len)793 struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in,
794                                                const char *tag, int tag_len)
795 {
796     int ret;
797     int err;
798     int set_down = FLB_FALSE;
799     int has_routes;
800     char name[64];
801     struct cio_chunk *chunk;
802     struct flb_storage_input *storage;
803     struct flb_input_chunk *ic;
804 
805     storage = in->storage;
806 
807     /* chunk name */
808     generate_chunk_name(in, name, sizeof(name) - 1);
809 
810     /* open/create target chunk file */
811     chunk = cio_chunk_open(storage->cio, storage->stream, name,
812                            CIO_OPEN, FLB_INPUT_CHUNK_SIZE, &err);
813     if (!chunk) {
814         flb_error("[input chunk] could not create chunk file: %s:%s",
815                   storage->stream->name, name);
816         return NULL;
817     }
818     /*
819      * If the returned chunk at open is 'down', just put it up, write the
820      * content and set it down again.
821      */
822     ret = cio_chunk_is_up(chunk);
823     if (ret == CIO_FALSE) {
824         ret = cio_chunk_up_force(chunk);
825         if (ret == -1) {
826             cio_chunk_close(chunk, CIO_TRUE);
827             return NULL;
828         }
829         set_down = FLB_TRUE;
830     }
831 
832     /* write metadata (tag) */
833     if (tag_len > 65535) {
834         /* truncate length */
835         tag_len = 65535;
836     }
837 
838     /* Write tag into metadata section */
839     ret = cio_meta_write(chunk, (char *) tag, tag_len);
840     if (ret == -1) {
841         flb_error("[input chunk] could not write metadata");
842         cio_chunk_close(chunk, CIO_TRUE);
843         return NULL;
844     }
845 
846     /* Create context for the input instance */
847     ic = flb_calloc(1, sizeof(struct flb_input_chunk));
848     if (!ic) {
849         flb_errno();
850         cio_chunk_close(chunk, CIO_TRUE);
851         return NULL;
852     }
853 
854     /*
855      * Check chunk content type to be created: depending of the value set by
856      * the input plugin, this can be FLB_INPUT_LOGS or FLB_INPUT_METRICS
857      */
858     ic->event_type = in->event_type;
859 
860     ic->busy = FLB_FALSE;
861     ic->chunk = chunk;
862     ic->fs_backlog = FLB_FALSE;
863     ic->in = in;
864     ic->stream_off = 0;
865     ic->task = NULL;
866 #ifdef FLB_HAVE_METRICS
867     ic->total_records = 0;
868 #endif
869 
870     /* Calculate the routes_mask for the input chunk */
871     has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in);
872     if (has_routes == 0) {
873         flb_trace("[input chunk] no matching route for input chunk '%s' with tag '%s'",
874                   flb_input_chunk_get_name(ic), tag);
875     }
876 
877     msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
878     mk_list_add(&ic->_head, &in->chunks);
879 
880     if (set_down == FLB_TRUE) {
881         cio_chunk_down(chunk);
882     }
883 
884     if (flb_input_event_type_is_log(in)) {
885         flb_hash_add(in->ht_log_chunks, tag, tag_len, ic, 0);
886     }
887     else if (flb_input_event_type_is_metric(in)) {
888         flb_hash_add(in->ht_metric_chunks, tag, tag_len, ic, 0);
889     }
890 
891     return ic;
892 }
893 
flb_input_chunk_destroy(struct flb_input_chunk * ic,int del)894 int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
895 {
896     int tag_len;
897     int ret;
898     ssize_t bytes;
899     const char *tag_buf = NULL;
900     struct mk_list *head;
901     struct flb_output_instance *o_ins;
902 
903     if (flb_input_chunk_is_up(ic) == FLB_FALSE) {
904         flb_input_chunk_set_up(ic);
905     }
906 
907     mk_list_foreach(head, &ic->in->config->outputs) {
908         o_ins = mk_list_entry(head, struct flb_output_instance, _head);
909 
910         if (o_ins->total_limit_size == -1) {
911             continue;
912         }
913 
914         bytes = flb_input_chunk_get_real_size(ic);
915         if (bytes == -1) {
916             // no data in the chunk
917             continue;
918         }
919 
920         if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
921             o_ins->fs_chunks_size -= bytes;
922             flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, "
923                       "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
924                       bytes, o_ins->name, o_ins->fs_chunks_size);
925         }
926     }
927 
928     /*
929      * When a chunk is going to be destroyed, this can be in a down state,
930      * since the next step is to retrieve the Tag we need to have the
931      * content up.
932      */
933     ret = flb_input_chunk_is_up(ic);
934     if (ret == FLB_FALSE) {
935         ret = cio_chunk_up_force(ic->chunk);
936         if (ret == -1) {
937             flb_error("[input chunk] cannot load chunk: %s",
938                       flb_input_chunk_get_name(ic));
939         }
940     }
941 
942     /* Retrieve Tag */
943     ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
944     if (ret == -1) {
945         flb_trace("[input chunk] could not retrieve chunk tag: %s",
946                   flb_input_chunk_get_name(ic));
947     }
948 
949     if (del == CIO_TRUE && tag_buf) {
950         /*
951          * "TRY" to delete any reference to this chunk ('ic') from the hash
952          * table. Note that maybe the value is not longer available in the
953          * entries if it was replaced: note that we always keep the last
954          * chunk for a specific Tag.
955          */
956         if (ic->event_type == FLB_INPUT_LOGS) {
957             flb_hash_del_ptr(ic->in->ht_log_chunks,
958                              tag_buf, tag_len, (void *) ic);
959         }
960         else if (ic->event_type == FLB_INPUT_METRICS) {
961             flb_hash_del_ptr(ic->in->ht_metric_chunks,
962                              tag_buf, tag_len, (void *) ic);
963         }
964     }
965 
966     cio_chunk_close(ic->chunk, del);
967     mk_list_del(&ic->_head);
968     flb_free(ic);
969 
970     return 0;
971 }
972 
973 /* Return or create an available chunk to write data */
input_chunk_get(struct flb_input_instance * in,const char * tag,int tag_len,size_t chunk_size,int * set_down)974 static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
975                                                const char *tag, int tag_len,
976                                                size_t chunk_size, int *set_down)
977 {
978     int id = -1;
979     int ret;
980     int new_chunk = FLB_FALSE;
981     size_t out_size;
982     struct flb_input_chunk *ic = NULL;
983 
984     if (in->event_type == FLB_INPUT_LOGS) {
985         id = flb_hash_get(in->ht_log_chunks, tag, tag_len,
986                           (void *) &ic, &out_size);
987     }
988     else if (in->event_type == FLB_INPUT_METRICS) {
989         id = flb_hash_get(in->ht_metric_chunks, tag, tag_len,
990                           (void *) &ic, &out_size);
991     }
992 
993     if (id >= 0) {
994         if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
995             ic = NULL;
996         }
997         else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
998             ret = cio_chunk_up_force(ic->chunk);
999             if (ret == -1) {
1000                 ic = NULL;
1001             }
1002             *set_down = FLB_TRUE;
1003         }
1004     }
1005 
1006     /* No chunk was found, we need to create a new one */
1007     if (!ic) {
1008         ic = flb_input_chunk_create(in, (char *) tag, tag_len);
1009         new_chunk = FLB_TRUE;
1010         if (!ic) {
1011             return NULL;
1012         }
1013     }
1014 
1015     /*
1016      * If buffering this block of data will exceed one of the limit among all output instances
1017      * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks
1018      * (based in creation time) to get enough space for the incoming chunk.
1019      */
1020     if (!flb_routes_mask_is_empty(ic->routes_mask)
1021         && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) {
1022         /*
1023          * If the chunk is not newly created, the chunk might already have logs inside.
1024          * We cannot delete (reused) chunks here.
1025          * If the routes_mask is cleared after trying to append new data, we destroy
1026          * the chunk.
1027          */
1028         if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) {
1029             flb_input_chunk_destroy(ic, FLB_TRUE);
1030         }
1031 
1032         return NULL;
1033     }
1034 
1035     return ic;
1036 }
1037 
flb_input_chunk_is_mem_overlimit(struct flb_input_instance * i)1038 static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i)
1039 {
1040     if (i->mem_buf_limit <= 0) {
1041         return FLB_FALSE;
1042     }
1043 
1044     if (i->mem_chunks_size >= i->mem_buf_limit) {
1045         return FLB_TRUE;
1046     }
1047 
1048     return FLB_FALSE;
1049 }
1050 
flb_input_chunk_is_storage_overlimit(struct flb_input_instance * i)1051 static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i)
1052 {
1053     struct flb_storage_input *storage = (struct flb_storage_input *)i->storage;
1054 
1055 
1056     if (storage->type == CIO_STORE_FS) {
1057         if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) {
1058             if (storage->cio->total_chunks >= storage->cio->max_chunks_up) {
1059                 return FLB_TRUE;
1060             }
1061         }
1062     }
1063 
1064     return FLB_FALSE;
1065 }
1066 
1067 /*
1068  * Check all chunks associated to the input instance and summarize
1069  * the number of bytes in use.
1070  */
flb_input_chunk_total_size(struct flb_input_instance * in)1071 size_t flb_input_chunk_total_size(struct flb_input_instance *in)
1072 {
1073     size_t total = 0;
1074     struct flb_storage_input *storage;
1075 
1076     storage = (struct flb_storage_input *) in->storage;
1077     total = cio_stream_size_chunks_up(storage->stream);
1078     return total;
1079 }
1080 
1081 /*
1082  * Count and update the number of bytes being used by the instance. Also
1083  * check if the instance is paused, if so, check if it can be resumed if
1084  * is not longer over the limits.
1085  *
1086  * It always returns the number of bytes in use.
1087  */
flb_input_chunk_set_limits(struct flb_input_instance * in)1088 size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
1089 {
1090     size_t total;
1091 
1092     /* Gather total number of enqueued bytes */
1093     total = flb_input_chunk_total_size(in);
1094     /* Register the total into the context variable */
1095     in->mem_chunks_size = total;
1096 
1097     /*
1098      * After the adjustments, validate if the plugin is overlimit or paused
1099      * and perform further adjustments.
1100      */
1101     if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE &&
1102         in->config->is_running == FLB_TRUE &&
1103         in->config->is_ingestion_active == FLB_TRUE &&
1104         in->mem_buf_status == FLB_INPUT_PAUSED) {
1105         in->mem_buf_status = FLB_INPUT_RUNNING;
1106         if (in->p->cb_resume) {
1107             in->p->cb_resume(in->context, in->config);
1108             flb_info("[input] %s resume (mem buf overlimit)",
1109                       in->name);
1110         }
1111     }
1112     if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE &&
1113         in->config->is_running == FLB_TRUE &&
1114         in->config->is_ingestion_active == FLB_TRUE &&
1115         in->storage_buf_status == FLB_INPUT_PAUSED) {
1116         in->storage_buf_status = FLB_INPUT_RUNNING;
1117         if (in->p->cb_resume) {
1118             in->p->cb_resume(in->context, in->config);
1119             flb_info("[input] %s resume (storage buf overlimit %d/%d)",
1120                       in->name,
1121                       ((struct flb_storage_input *)in->storage)->cio->total_chunks,
1122                       ((struct flb_storage_input *)in->storage)->cio->max_chunks_up);
1123         }
1124     }
1125 
1126     return total;
1127 }
1128 
1129 /*
1130  * If the number of bytes in use by the chunks are over the imposed limit
1131  * by configuration, pause the instance.
1132  */
flb_input_chunk_protect(struct flb_input_instance * i)1133 static inline int flb_input_chunk_protect(struct flb_input_instance *i)
1134 {
1135     struct flb_storage_input *storage = i->storage;
1136 
1137     if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
1138         flb_warn("[input] %s paused (storage buf overlimit %d/%d)",
1139                  i->name,
1140                  storage->cio->total_chunks,
1141                  storage->cio->max_chunks_up);
1142 
1143         if (!flb_input_buf_paused(i)) {
1144             if (i->p->cb_pause) {
1145                 i->p->cb_pause(i->context, i->config);
1146             }
1147         }
1148         i->mem_buf_status = FLB_INPUT_PAUSED;
1149         return FLB_TRUE;
1150     }
1151 
1152     if (storage->type == CIO_STORE_FS) {
1153         return FLB_FALSE;
1154     }
1155 
1156     if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) {
1157         flb_warn("[input] %s paused (mem buf overlimit)",
1158                  i->name);
1159         if (!flb_input_buf_paused(i)) {
1160             if (i->p->cb_pause) {
1161                 i->p->cb_pause(i->context, i->config);
1162             }
1163         }
1164         i->storage_buf_status = FLB_INPUT_PAUSED;
1165         return FLB_TRUE;
1166     }
1167 
1168     return FLB_FALSE;
1169 }
1170 
1171 /*
1172  * Validate if the chunk coming from the input plugin based on config and
1173  * resources usage must be 'up' or 'down' (applicable for filesystem storage
1174  * type).
1175  *
1176  * FIXME: can we find a better name for this function ?
1177  */
flb_input_chunk_set_up_down(struct flb_input_chunk * ic)1178 int flb_input_chunk_set_up_down(struct flb_input_chunk *ic)
1179 {
1180     size_t total;
1181     struct flb_input_instance *in;
1182 
1183     in = ic->in;
1184 
1185     /* Gather total number of enqueued bytes */
1186     total = flb_input_chunk_total_size(in);
1187 
1188     /* Register the total into the context variable */
1189     in->mem_chunks_size = total;
1190 
1191     if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE) {
1192         if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
1193             cio_chunk_down(ic->chunk);
1194 
1195             /* Adjust new counters */
1196             total = flb_input_chunk_total_size(ic->in);
1197             in->mem_chunks_size = total;
1198 
1199             return FLB_FALSE;
1200         }
1201     }
1202 
1203     return FLB_TRUE;
1204 }
1205 
flb_input_chunk_is_up(struct flb_input_chunk * ic)1206 int flb_input_chunk_is_up(struct flb_input_chunk *ic)
1207 {
1208     return cio_chunk_is_up(ic->chunk);
1209 
1210 }
1211 
flb_input_chunk_down(struct flb_input_chunk * ic)1212 int flb_input_chunk_down(struct flb_input_chunk *ic)
1213 {
1214     if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
1215         return cio_chunk_down(ic->chunk);
1216     }
1217 
1218     return 0;
1219 }
1220 
flb_input_chunk_set_up(struct flb_input_chunk * ic)1221 int flb_input_chunk_set_up(struct flb_input_chunk *ic)
1222 {
1223     if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
1224         return cio_chunk_up(ic->chunk);
1225     }
1226 
1227     return 0;
1228 }
1229 
1230 /* Append a RAW MessagPack buffer to the input instance */
flb_input_chunk_append_raw(struct flb_input_instance * in,const char * tag,size_t tag_len,const void * buf,size_t buf_size)1231 int flb_input_chunk_append_raw(struct flb_input_instance *in,
1232                                const char *tag, size_t tag_len,
1233                                const void *buf, size_t buf_size)
1234 {
1235     int ret;
1236     int set_down = FLB_FALSE;
1237     int min;
1238     int meta_size;
1239     int new_chunk = FLB_FALSE;
1240     uint64_t ts;
1241     size_t diff;
1242     size_t size;
1243     size_t pre_size;
1244     struct flb_input_chunk *ic;
1245     struct flb_storage_input *si;
1246 
1247     /* Check if the input plugin has been paused */
1248     if (flb_input_buf_paused(in) == FLB_TRUE) {
1249         flb_debug("[input chunk] %s is paused, cannot append records",
1250                   in->name);
1251         return -1;
1252     }
1253 
1254     if (buf_size == 0) {
1255         flb_debug("[input chunk] skip ingesting data with 0 bytes");
1256         return -1;
1257     }
1258 
1259     /*
1260      * Some callers might not set a custom tag, on that case just inherit
1261      * the fixed instance tag or instance name.
1262      */
1263     if (!tag) {
1264         if (in->tag && in->tag_len > 0) {
1265             tag = in->tag;
1266             tag_len = in->tag_len;
1267         }
1268         else {
1269             tag = in->name;
1270             tag_len = strlen(in->name);
1271         }
1272     }
1273 
1274     /*
1275      * Get a target input chunk, can be one with remaining space available
1276      * or a new one.
1277      */
1278     ic = input_chunk_get(in, tag, tag_len, buf_size, &set_down);
1279     if (!ic) {
1280         flb_error("[input chunk] no available chunk");
1281         return -1;
1282     }
1283 
1284     /* newly created chunk */
1285     if (flb_input_chunk_get_size(ic) == 0) {
1286         new_chunk = FLB_TRUE;
1287     }
1288 
1289     /* We got the chunk, validate if is 'up' or 'down' */
1290     ret = flb_input_chunk_is_up(ic);
1291     if (ret == FLB_FALSE) {
1292         ret = cio_chunk_up_force(ic->chunk);
1293         if (ret == -1) {
1294             flb_error("[input chunk] cannot retrieve temporary chunk");
1295             return -1;
1296         }
1297         set_down = FLB_TRUE;
1298     }
1299 
1300     /*
1301      * Previous size from the chunk, used to calculate the difference
1302      * after filtering
1303      */
1304     pre_size = cio_chunk_get_content_size(ic->chunk);
1305 
1306     /* Write the new data */
1307     ret = flb_input_chunk_write(ic, buf, buf_size);
1308     if (ret == -1) {
1309         flb_error("[input chunk] error writing data from %s instance",
1310                   in->name);
1311         cio_chunk_tx_rollback(ic->chunk);
1312         return -1;
1313     }
1314 
1315     /* Update 'input' metrics */
1316 #ifdef FLB_HAVE_METRICS
1317     if (ic->total_records > 0) {
1318         /* timestamp */
1319         ts = cmt_time_now();
1320 
1321         /* fluentbit_input_records_total */
1322         cmt_counter_add(in->cmt_records, ts, ic->added_records,
1323                         1, (char *[]) {(char *) flb_input_name(in)});
1324 
1325         /* fluentbit_input_bytes_total */
1326         cmt_counter_add(in->cmt_bytes, ts, buf_size,
1327                         1, (char *[]) {(char *) flb_input_name(in)});
1328 
1329         /* OLD api */
1330         flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
1331         flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
1332     }
1333 #endif
1334 
1335     /* Apply filters */
1336     if (in->event_type == FLB_INPUT_LOGS) {
1337         flb_filter_do(ic,
1338                       buf, buf_size,
1339                       tag, tag_len, in->config);
1340     }
1341 
1342     /* Get chunk size */
1343     size = cio_chunk_get_content_size(ic->chunk);
1344 
1345     /* calculate the 'real' new bytes being added after the filtering phase */
1346     diff = llabs(size - pre_size);
1347 
1348     /*
1349      * Update output instance bytes counters, note that bytes counter should
1350      * always count the chunk size in the file system. Therefore, it should
1351      * add the extra bytes for the metadata.
1352      */
1353     meta_size = cio_meta_size(ic->chunk);
1354     if (new_chunk == FLB_TRUE) {
1355         diff += meta_size
1356             /* See https://github.com/edsiper/chunkio#file-layout for more details */
1357             + 2    /* HEADER BYTES */
1358             + 4    /* CRC32 */
1359             + 16   /* PADDING */
1360             + 2;   /* METADATA LENGTH BYTES */
1361     }
1362 
1363     /*
1364      * There is a case that rewrite_tag will modify the tag and keep rule is set
1365      * to drop the original record. The original record will still go through the
1366      * flb_input_chunk_update_output_instances(2) to update the fs_chunks_size by
1367      * metadata bytes (consisted by metadata bytes of the file chunk). This condition
1368      * sets the diff to 0 in order to not update the fs_chunks_size.
1369      */
1370     if (flb_input_chunk_get_size(ic) == 0) {
1371         diff = 0;
1372     }
1373 
1374     if (diff != 0) {
1375         flb_input_chunk_update_output_instances(ic, diff);
1376     }
1377 
1378     /* Lock buffers where size > 2MB */
1379     if (size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
1380         cio_chunk_lock(ic->chunk);
1381     }
1382 
1383     /* Make sure the data was not filtered out and the buffer size is zero */
1384     if (size == 0) {
1385         flb_input_chunk_destroy(ic, FLB_TRUE);
1386         flb_input_chunk_set_limits(in);
1387         return 0;
1388     }
1389 #ifdef FLB_HAVE_STREAM_PROCESSOR
1390     else if (in->config->stream_processor_ctx &&
1391              ic->event_type == FLB_INPUT_LOGS) {
1392         char *c_data;
1393         size_t c_size;
1394 
1395         /* Retrieve chunk (filtered) output content */
1396         cio_chunk_get_content(ic->chunk, &c_data, &c_size);
1397 
1398         /* Invoke stream processor */
1399         flb_sp_do(in->config->stream_processor_ctx,
1400                   in,
1401                   tag, tag_len,
1402                   c_data + ic->stream_off, c_size - ic->stream_off);
1403         ic->stream_off += (c_size - ic->stream_off);
1404     }
1405 #endif
1406 
1407     if (set_down == FLB_TRUE) {
1408         cio_chunk_down(ic->chunk);
1409     }
1410 
1411     /*
1412      * If the instance is not routable, there is no need to keep the
1413      * content in the storage engine, just get rid of it.
1414      */
1415     if (in->routable == FLB_FALSE) {
1416         flb_input_chunk_destroy(ic, FLB_TRUE);
1417         return 0;
1418     }
1419 
1420     /* Update memory counters and adjust limits if any */
1421     flb_input_chunk_set_limits(in);
1422 
1423     /*
1424      * Check if we are overlimit and validate if is there any filesystem
1425      * storage type asociated to this input instance, if so, unload the
1426      * chunk content from memory to respect imposed limits.
1427      *
1428      * Calling cio_chunk_down() the memory map associated and the file
1429      * descriptor will be released. At any later time, it must be bring up
1430      * for I/O operations.
1431      */
1432     si = (struct flb_storage_input *) in->storage;
1433     if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE &&
1434         si->type == CIO_STORE_FS) {
1435         if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
1436             /*
1437              * If we are already over limit, a sub-sequent data ingestion
1438              * might need a Chunk to write data in. As an optimization we
1439              * will put this Chunk down ONLY IF it has less than 1% of
1440              * it capacity as available space, otherwise keep it 'up' so
1441              * it available space can be used.
1442              */
1443             size = cio_chunk_get_content_size(ic->chunk);
1444 
1445             /* Do we have less than 1% available ? */
1446             min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
1447             if (FLB_INPUT_CHUNK_FS_MAX_SIZE - size < min) {
1448                 cio_chunk_down(ic->chunk);
1449             }
1450         }
1451     }
1452 
1453     flb_input_chunk_protect(in);
1454 
1455     return 0;
1456 }
1457 
1458 /* Retrieve a raw buffer from a dyntag node */
flb_input_chunk_flush(struct flb_input_chunk * ic,size_t * size)1459 const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size)
1460 {
1461     int ret;
1462     char *buf = NULL;
1463 
1464     if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
1465         ret = cio_chunk_up(ic->chunk);
1466         if (ret == -1) {
1467             return NULL;
1468         }
1469     }
1470 
1471     /*
1472      * msgpack-c internal use a raw buffer for it operations, since we
1473      * already appended data we just can take out the references to avoid
1474      * a new memory allocation and skip a copy operation.
1475      */
1476     ret = cio_chunk_get_content(ic->chunk, &buf, size);
1477     if (ret == -1) {
1478         flb_error("[input chunk] error retrieving chunk content");
1479         return NULL;
1480     }
1481 
1482     if (!buf) {
1483         *size = 0;
1484         return NULL;
1485     }
1486 
1487     /* Set it busy as it likely it's a reference for an outgoing task */
1488     ic->busy = FLB_TRUE;
1489 
1490     /* Lock the internal chunk */
1491     cio_chunk_lock(ic->chunk);
1492 
1493     return buf;
1494 }
1495 
flb_input_chunk_release_lock(struct flb_input_chunk * ic)1496 int flb_input_chunk_release_lock(struct flb_input_chunk *ic)
1497 {
1498     if (ic->busy == FLB_FALSE) {
1499         return -1;
1500     }
1501 
1502     ic->busy = FLB_FALSE;
1503     return 0;
1504 }
1505 
flb_input_chunk_get_name(struct flb_input_chunk * ic)1506 flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic)
1507 {
1508     struct cio_chunk *ch;
1509 
1510     ch = (struct cio_chunk *) ic->chunk;
1511     return ch->name;
1512 }
1513 
flb_input_chunk_get_tag(struct flb_input_chunk * ic,const char ** tag_buf,int * tag_len)1514 int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
1515                             const char **tag_buf, int *tag_len)
1516 {
1517     int len;
1518     int ret;
1519     char *buf;
1520 
1521     ret = cio_meta_read(ic->chunk, &buf, &len);
1522     if (ret == -1) {
1523         *tag_len = -1;
1524         *tag_buf = NULL;
1525         return -1;
1526     }
1527 
1528     *tag_len = len;
1529     *tag_buf = buf;
1530 
1531     return ret;
1532 }
1533 
1534 /*
1535  * Iterates all output instances that the chunk will be flushing to and summarize
1536  * the total number of bytes in use after ingesting the new data.
1537  */
flb_input_chunk_update_output_instances(struct flb_input_chunk * ic,size_t chunk_size)1538 void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
1539                                              size_t chunk_size)
1540 {
1541     struct mk_list *head;
1542     struct flb_output_instance *o_ins;
1543 
1544     /* for each output plugin, we update the fs_chunks_size */
1545     mk_list_foreach(head, &ic->in->config->outputs) {
1546         o_ins = mk_list_entry(head, struct flb_output_instance, _head);
1547         if (o_ins->total_limit_size == -1) {
1548             continue;
1549         }
1550 
1551         if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
1552             /*
1553              * if there is match on any index of 1's in the binary, it indicates
1554              * that the input chunk will flush to this output instance
1555              */
1556             o_ins->fs_chunks_size += chunk_size;
1557 
1558             flb_debug("[input chunk] chunk %s update plugin %s fs_chunks_size by %ld bytes, "
1559                       "the current fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
1560                       o_ins->name, chunk_size, o_ins->fs_chunks_size);
1561         }
1562     }
1563 }
1564