1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019-2021 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_input.h>
23 #include <fluent-bit/flb_input_chunk.h>
24 #include <fluent-bit/flb_input_plugin.h>
25 #include <fluent-bit/flb_storage.h>
26 #include <fluent-bit/flb_time.h>
27 #include <fluent-bit/flb_router.h>
28 #include <fluent-bit/flb_task.h>
29 #include <fluent-bit/flb_routes_mask.h>
30 #include <fluent-bit/flb_metrics.h>
31 #include <fluent-bit/stream_processor/flb_sp.h>
32 #include <chunkio/chunkio.h>
33
/*
 * Debugging aid: blocks the calling thread until one byte can be read from
 * file descriptor 0 (standard input). Wrapped in do { } while (0) so that
 * "BLOCK_UNTIL_KEYPRESS();" behaves as a single statement, e.g. inside an
 * unbraced if/else.
 */
#define BLOCK_UNTIL_KEYPRESS()                                        \
    do {                                                              \
        char temp_keypress_buffer;                                    \
        ssize_t temp_keypress_result;                                 \
        temp_keypress_result = read(0, &temp_keypress_buffer, 1);     \
        (void) temp_keypress_result;                                  \
    } while (0)

/* Release scopes understood by flb_input_chunk_release_space() */
#define FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL  0
#define FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL 1
38
/*
 * Helpers defined outside this file (presumably by the storage backlog
 * input plugin, given the "sb_" prefix — TODO confirm). They report and
 * release space held for 'output_plugin' in what
 * flb_input_chunk_release_space_compound() calls the "segregated backlog".
 */
extern ssize_t sb_get_releasable_output_queue_space(struct flb_output_instance *output_plugin,
                                                    size_t required_space);

extern int sb_release_output_queue_space(struct flb_output_instance *output_plugin,
                                         size_t required_space);

/* Forward declarations of static helpers defined later in this file */
static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
                                       struct flb_input_chunk *old_ic,
                                       uint64_t o_id);

static int flb_input_chunk_is_task_safe_delete(struct flb_task *task);

static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);
52
flb_input_chunk_get_releasable_space(struct flb_input_chunk * new_input_chunk,struct flb_input_instance * input_plugin,struct flb_output_instance * output_plugin,size_t required_space)53 static ssize_t flb_input_chunk_get_releasable_space(
54 struct flb_input_chunk *new_input_chunk,
55 struct flb_input_instance *input_plugin,
56 struct flb_output_instance *output_plugin,
57 size_t required_space)
58 {
59 struct mk_list *input_chunk_iterator;
60 ssize_t releasable_space;
61 struct flb_input_chunk *old_input_chunk;
62
63 releasable_space = 0;
64
65 mk_list_foreach(input_chunk_iterator, &input_plugin->chunks) {
66 old_input_chunk = mk_list_entry(input_chunk_iterator, struct flb_input_chunk, _head);
67
68 if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask, output_plugin->id)) {
69 continue;
70 }
71
72 if (flb_input_chunk_safe_delete(new_input_chunk, old_input_chunk,
73 output_plugin->id) == FLB_FALSE ||
74 flb_input_chunk_is_task_safe_delete(old_input_chunk->task) == FLB_FALSE) {
75 continue;
76 }
77
78 releasable_space += flb_input_chunk_get_real_size(old_input_chunk);
79
80 if (releasable_space >= required_space) {
81 break;
82 }
83 }
84
85 return releasable_space;
86 }
87
flb_input_chunk_release_space(struct flb_input_chunk * new_input_chunk,struct flb_input_instance * input_plugin,struct flb_output_instance * output_plugin,ssize_t required_space,int release_scope)88 static int flb_input_chunk_release_space(
89 struct flb_input_chunk *new_input_chunk,
90 struct flb_input_instance *input_plugin,
91 struct flb_output_instance *output_plugin,
92 ssize_t required_space,
93 int release_scope)
94 {
95 struct mk_list *input_chunk_iterator_tmp;
96 struct mk_list *input_chunk_iterator;
97 int chunk_destroy_flag;
98 struct flb_input_chunk *old_input_chunk;
99 ssize_t released_space;
100 int chunk_released;
101 ssize_t chunk_size;
102
103 released_space = 0;
104
105 mk_list_foreach_safe(input_chunk_iterator, input_chunk_iterator_tmp,
106 &input_plugin->chunks) {
107 old_input_chunk = mk_list_entry(input_chunk_iterator,
108 struct flb_input_chunk, _head);
109
110 if (!flb_routes_mask_get_bit(old_input_chunk->routes_mask,
111 output_plugin->id)) {
112 continue;
113 }
114
115 if (flb_input_chunk_safe_delete(new_input_chunk,
116 old_input_chunk,
117 output_plugin->id) == FLB_FALSE ||
118 flb_input_chunk_is_task_safe_delete(old_input_chunk->task) == FLB_FALSE) {
119 continue;
120 }
121
122 chunk_size = flb_input_chunk_get_real_size(old_input_chunk);
123 chunk_released = FLB_FALSE;
124 chunk_destroy_flag = FLB_FALSE;
125
126 if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL) {
127 flb_routes_mask_clear_bit(old_input_chunk->routes_mask,
128 output_plugin->id);
129
130 output_plugin->fs_chunks_size -= chunk_size;
131
132 chunk_destroy_flag = flb_routes_mask_is_empty(
133 old_input_chunk->routes_mask);
134
135 chunk_released = FLB_TRUE;
136 }
137 else if (release_scope == FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL) {
138 chunk_destroy_flag = FLB_TRUE;
139 }
140
141 if (chunk_destroy_flag) {
142 if (old_input_chunk->task != NULL) {
143 /*
144 * If the chunk is referenced by a task and task has no active route,
145 * we need to destroy the task as well.
146 */
147 if (old_input_chunk->task->users == 0) {
148 flb_debug("[task] drop task_id %d with no active route from input plugin %s",
149 old_input_chunk->task->id, new_input_chunk->in->name);
150 flb_task_destroy(old_input_chunk->task, FLB_TRUE);
151
152 chunk_released = FLB_TRUE;
153 }
154 }
155 else {
156 flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s",
157 flb_input_chunk_get_name(old_input_chunk), new_input_chunk->in->name);
158
159 flb_input_chunk_destroy(old_input_chunk, FLB_TRUE);
160
161 chunk_released = FLB_TRUE;
162 }
163 }
164
165 if (chunk_released) {
166 released_space += chunk_size;
167 }
168
169 if (released_space >= required_space) {
170 break;
171 }
172 }
173
174 if (released_space < required_space) {
175 return -2;
176 }
177
178 return 0;
179 }
180
generate_chunk_name(struct flb_input_instance * in,char * out_buf,int buf_size)181 static void generate_chunk_name(struct flb_input_instance *in,
182 char *out_buf, int buf_size)
183 {
184 struct flb_time tm;
185 (void) in;
186
187 flb_time_get(&tm);
188 snprintf(out_buf, buf_size - 1,
189 "%i-%lu.%4lu.flb",
190 getpid(),
191 tm.tm.tv_sec, tm.tm.tv_nsec);
192 }
193
flb_input_chunk_get_size(struct flb_input_chunk * ic)194 ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)
195 {
196 return cio_chunk_get_content_size(ic->chunk);
197 }
198
199 /*
200 * When chunk is set to DOWN from memory, data_size is set to 0 and
201 * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size
202 * is used to track the size of chunks in filesystem so we need to call
203 * cio_chunk_get_real_size to return the original size in the file system
204 */
flb_input_chunk_get_real_size(struct flb_input_chunk * ic)205 static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic)
206 {
207 ssize_t meta_size;
208 ssize_t size;
209
210 size = cio_chunk_get_real_size(ic->chunk);
211
212 if (size != 0) {
213 return size;
214 }
215
216 // Real size is not synced to chunk yet
217 size = flb_input_chunk_get_size(ic);
218 if (size == 0) {
219 flb_debug("[input chunk] no data in the chunk %s",
220 flb_input_chunk_get_name(ic));
221 return -1;
222 }
223
224 meta_size = cio_meta_size(ic->chunk);
225 size += meta_size
226 /* See https://github.com/edsiper/chunkio#file-layout for more details */
227 + 2 /* HEADER BYTES */
228 + 4 /* CRC32 */
229 + 16 /* PADDING */
230 + 2; /* METADATA LENGTH BYTES */
231
232 return size;
233 }
234
flb_input_chunk_write(void * data,const char * buf,size_t len)235 int flb_input_chunk_write(void *data, const char *buf, size_t len)
236 {
237 int ret;
238 struct flb_input_chunk *ic;
239
240 ic = (struct flb_input_chunk *) data;
241
242 ret = cio_chunk_write(ic->chunk, buf, len);
243 #ifdef FLB_HAVE_METRICS
244 if (ret == CIO_OK) {
245 ic->added_records = flb_mp_count(buf, len);
246 ic->total_records += ic->added_records;
247 }
248 #endif
249
250 return ret;
251 }
252
flb_input_chunk_write_at(void * data,off_t offset,const char * buf,size_t len)253 int flb_input_chunk_write_at(void *data, off_t offset,
254 const char *buf, size_t len)
255 {
256 int ret;
257 struct flb_input_chunk *ic;
258
259 ic = (struct flb_input_chunk *) data;
260
261 ret = cio_chunk_write_at(ic->chunk, offset, buf, len);
262 return ret;
263 }
264
265 /*
266 * For input_chunk referenced by an outgoing task, we need to check
267 * whether the chunk is in the middle of output flush callback
268 */
flb_input_chunk_is_task_safe_delete(struct flb_task * task)269 static int flb_input_chunk_is_task_safe_delete(struct flb_task *task)
270 {
271 if (!task) {
272 return FLB_TRUE;
273 }
274
275 if (task->users != 0) {
276 return FLB_FALSE;
277 }
278
279 return FLB_TRUE;
280 }
281
flb_input_chunk_safe_delete(struct flb_input_chunk * ic,struct flb_input_chunk * old_ic,uint64_t o_id)282 static int flb_input_chunk_safe_delete(struct flb_input_chunk *ic,
283 struct flb_input_chunk *old_ic,
284 uint64_t o_id)
285 {
286 /* The chunk we want to drop should not be the incoming chunk */
287 if (ic == old_ic) {
288 return FLB_FALSE;
289 }
290
291 /*
292 * Even if chunks from same input plugin have same routes_mask when created,
293 * the routes_mask could be modified when new chunks is ingested. Therefore,
294 * we still need to do the validation on the routes_mask with o_id.
295 */
296 if (flb_routes_mask_get_bit(old_ic->routes_mask, o_id) == 0) {
297 return FLB_FALSE;
298 }
299
300 return FLB_TRUE;
301 }
302
flb_input_chunk_release_space_compound(struct flb_input_chunk * new_input_chunk,struct flb_output_instance * output_plugin,size_t * local_release_requirement,int release_local_space)303 int flb_input_chunk_release_space_compound(
304 struct flb_input_chunk *new_input_chunk,
305 struct flb_output_instance *output_plugin,
306 size_t *local_release_requirement,
307 int release_local_space)
308 {
309 ssize_t segregated_backlog_releasable_space;
310 ssize_t active_backlog_releasable_space;
311 ssize_t active_plugin_releasable_space;
312 ssize_t required_space_remainder;
313 struct flb_input_instance *storage_backlog_instance;
314 int result;
315
316 storage_backlog_instance = output_plugin->config->storage_input_plugin;
317
318 *local_release_requirement = flb_input_chunk_get_real_size(new_input_chunk);
319 required_space_remainder = (ssize_t) *local_release_requirement;
320
321 segregated_backlog_releasable_space = 0;
322 active_backlog_releasable_space = 0;
323 active_plugin_releasable_space = 0;
324
325 active_backlog_releasable_space = flb_input_chunk_get_releasable_space(
326 new_input_chunk,
327 storage_backlog_instance,
328 output_plugin,
329 required_space_remainder);
330
331 required_space_remainder -= active_backlog_releasable_space;
332
333 if (required_space_remainder > 0) {
334 segregated_backlog_releasable_space = sb_get_releasable_output_queue_space(
335 output_plugin,
336 required_space_remainder);
337
338 required_space_remainder -= segregated_backlog_releasable_space;
339 }
340
341 if (required_space_remainder > 0) {
342 active_plugin_releasable_space = flb_input_chunk_get_releasable_space(
343 new_input_chunk,
344 new_input_chunk->in,
345 output_plugin,
346 required_space_remainder);
347
348 required_space_remainder -= active_plugin_releasable_space;
349 }
350
351 /* When we get here required_space_remainder could be negative but it's not a problem
352 * this happens when the weight of the removed chunk is higher than the remainder of
353 * the required space and it's not something that can nor should be prevented.
354 */
355
356 if (required_space_remainder > 0) {
357 return -2;
358 }
359
360 required_space_remainder = (ssize_t) *local_release_requirement;
361
362 if (required_space_remainder > 0 && active_backlog_releasable_space > 0) {
363 result = flb_input_chunk_release_space(new_input_chunk,
364 storage_backlog_instance,
365 output_plugin,
366 active_backlog_releasable_space,
367 FLB_INPUT_CHUNK_RELEASE_SCOPE_GLOBAL);
368
369 if (result) {
370 return -3;
371 }
372
373 required_space_remainder -= active_backlog_releasable_space;
374 }
375
376 if (required_space_remainder > 0 && segregated_backlog_releasable_space > 0) {
377 result = sb_release_output_queue_space(
378 output_plugin,
379 segregated_backlog_releasable_space);
380
381 if (result) {
382 *local_release_requirement = (size_t) required_space_remainder;
383
384 return -4;
385 }
386
387 required_space_remainder -= segregated_backlog_releasable_space;
388 }
389
390 if (release_local_space) {
391 if (required_space_remainder > 0 && active_plugin_releasable_space > 0) {
392 result = flb_input_chunk_release_space(new_input_chunk,
393 new_input_chunk->in,
394 output_plugin,
395 active_plugin_releasable_space,
396 FLB_INPUT_CHUNK_RELEASE_SCOPE_LOCAL);
397
398 if (result) {
399 printf("FAILED\n");
400 return -5;
401 }
402
403 required_space_remainder -= active_plugin_releasable_space;
404 }
405 }
406
407 if (required_space_remainder < 0) {
408 required_space_remainder = 0;
409 }
410
411 *local_release_requirement = (size_t) required_space_remainder;
412
413 return 0;
414 }
415
416 /*
417 * Returns how many chunks needs to be dropped in order to get enough space to
418 * buffer the incoming data (with size chunk_size)
419 */
flb_intput_chunk_count_dropped_chunks(struct flb_input_chunk * ic,struct flb_output_instance * o_ins,size_t chunk_size)420 int flb_intput_chunk_count_dropped_chunks(struct flb_input_chunk *ic,
421 struct flb_output_instance *o_ins,
422 size_t chunk_size)
423 {
424 int count = 0;
425 int enough_space = FLB_FALSE;
426 ssize_t bytes_remained;
427 struct mk_list *head;
428 struct flb_input_chunk *old_ic;
429
430 bytes_remained = o_ins->total_limit_size -
431 o_ins->fs_chunks_size -
432 o_ins->fs_backlog_chunks_size;
433
434 mk_list_foreach(head, &ic->in->chunks) {
435 old_ic = mk_list_entry(head, struct flb_input_chunk, _head);
436
437 if (flb_input_chunk_safe_delete(ic, old_ic, o_ins->id) == FLB_FALSE ||
438 flb_input_chunk_is_task_safe_delete(old_ic->task) == FLB_FALSE) {
439 continue;
440 }
441
442 bytes_remained += flb_input_chunk_get_real_size(old_ic);
443 count++;
444 if (bytes_remained >= (ssize_t) chunk_size) {
445 enough_space = FLB_TRUE;
446 break;
447 }
448 }
449
450 /*
451 * flb_intput_chunk_count_dropped_chunks(3) will only be called if the chunk will
452 * be flushing to the output instance passed in and the instance will reach its
453 * limit after appending the new data. This function will try to count how many
454 * chunks need to be dropped in order to place the incoing chunk.
455 *
456 * Return '0' means that we cannot find a slot to ingest the incoming data.
457 */
458 if (enough_space == FLB_FALSE) {
459 return 0;
460 }
461
462 return count;
463 }
464
465 /*
466 * Find a slot in the output instance to append the new data with size chunk_size, it
467 * will drop the the oldest chunks when the limitation on local disk is reached.
468 */
flb_input_chunk_find_space_new_data(struct flb_input_chunk * ic,size_t chunk_size,int overlimit)469 int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic,
470 size_t chunk_size, int overlimit)
471 {
472 int count;
473 int result;
474 ssize_t bytes;
475 ssize_t old_ic_bytes;
476 struct mk_list *tmp;
477 struct mk_list *head;
478 struct mk_list *head_chunk;
479 struct flb_output_instance *o_ins;
480 struct flb_input_chunk *old_ic;
481 size_t local_release_requirement;
482
483 /*
484 * For each output instances that will be over the limit after adding the new chunk,
485 * we have to determine how many chunks needs to be removed. We will adjust the
486 * routes_mask to only route to the output plugin that have enough space after
487 * deleting some chunks fome the queue.
488 */
489 mk_list_foreach(head, &ic->in->config->outputs) {
490 count = 0;
491 o_ins = mk_list_entry(head, struct flb_output_instance, _head);
492
493 if ((o_ins->total_limit_size == -1) || ((1 << o_ins->id) & overlimit) == 0 ||
494 (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
495 continue;
496 }
497
498 local_release_requirement = 0;
499
500 result = flb_input_chunk_release_space_compound(
501 ic, o_ins,
502 &local_release_requirement,
503 FLB_FALSE);
504
505 if (!result && local_release_requirement == 0) {
506 /* If this function returned 0 it means the space requirement was
507 * satisfied solely by releasing chunks in either storage_backlog
508 * state (segregated or in queue)
509 */
510 continue;
511 }
512
513 /* flb_input_chunk_find_space_new_data_backlog may fail to meet the space
514 * requirements but it always sets local_release_requirement to the right amount
515 */
516
517 count = flb_intput_chunk_count_dropped_chunks(ic, o_ins, local_release_requirement);
518
519 if (count == 0) {
520 /*
521 * The worst scenerio is that we cannot find a space by dropping some
522 * old chunks for the incoming chunk. We need to adjust the routes_mask
523 * of the incoming chunk to not flush to that output instance.
524 */
525 flb_error("[input chunk] no enough space in filesystem to buffer "
526 "chunk %s in plugin %s", flb_input_chunk_get_name(ic), o_ins->name);
527
528 flb_routes_mask_clear_bit(ic->routes_mask, o_ins->id);
529 if (flb_routes_mask_is_empty(ic->routes_mask)) {
530 bytes = flb_input_chunk_get_size(ic);
531 if (bytes != 0) {
532 /*
533 * Skip newly created chunk as newly created chunk
534 * hasn't updated the fs_chunks_size yet.
535 */
536 bytes = flb_input_chunk_get_real_size(ic);
537 o_ins->fs_chunks_size -= bytes;
538 flb_debug("[input chunk] chunk %s has no output route, "
539 "remove %ld bytes from fs_chunks_size",
540 flb_input_chunk_get_name(ic), bytes);
541 }
542 }
543
544 continue;
545 }
546
547 /*
548 * Here we need to drop some chunks from the beginning of chunks list.
549 * Since chunks are stored in a double linked list (mk_list), we are
550 * able to iterate the list from the beginning and check if the current
551 * chunk is able to be removed.
552 */
553 mk_list_foreach_safe(head_chunk, tmp, &ic->in->chunks) {
554 old_ic = mk_list_entry(head_chunk, struct flb_input_chunk, _head);
555
556 if (flb_input_chunk_safe_delete(ic, old_ic, o_ins->id) == FLB_FALSE ||
557 flb_input_chunk_is_task_safe_delete(old_ic->task) == FLB_FALSE) {
558 continue;
559 }
560
561 old_ic_bytes = flb_input_chunk_get_real_size(old_ic);
562
563 /* drop chunk by adjusting the routes_mask */
564 flb_routes_mask_clear_bit(old_ic->routes_mask, o_ins->id);
565 o_ins->fs_chunks_size -= old_ic_bytes;
566
567 flb_debug("[input chunk] remove route of chunk %s with size %ld bytes to output plugin %s "
568 "to place the incoming data with size %ld bytes", flb_input_chunk_get_name(old_ic),
569 old_ic_bytes, o_ins->name, chunk_size);
570
571 if (flb_routes_mask_is_empty(old_ic->routes_mask)) {
572 if (old_ic->task != NULL) {
573 /*
574 * If the chunk is referenced by a task and task has no active route,
575 * we need to destroy the task as well.
576 */
577 if (old_ic->task->users == 0) {
578 flb_debug("[task] drop task_id %d with no active route from input plugin %s",
579 old_ic->task->id, ic->in->name);
580 flb_task_destroy(old_ic->task, FLB_TRUE);
581 }
582 }
583 else {
584 flb_debug("[input chunk] drop chunk %s with no output route from input plugin %s",
585 flb_input_chunk_get_name(old_ic), ic->in->name);
586 flb_input_chunk_destroy(old_ic, FLB_TRUE);
587 }
588 }
589
590 count--;
591 if (count == 0) {
592 /* we have dropped enough chunks to place the incoming chunks */
593 break;
594 }
595 }
596 }
597
598 if (count != 0) {
599 flb_error("[input chunk] fail to drop enough chunks in order to place new data");
600 }
601
602 return 0;
603 }
604
605 /*
606 * Returns a non-zero result if any output instances will reach the limit
607 * after buffering the new data
608 */
flb_input_chunk_has_overlimit_routes(struct flb_input_chunk * ic,size_t chunk_size)609 int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic,
610 size_t chunk_size)
611 {
612 int overlimit = 0;
613 struct mk_list *head;
614 struct flb_output_instance *o_ins;
615
616 mk_list_foreach(head, &ic->in->config->outputs) {
617 o_ins = mk_list_entry(head, struct flb_output_instance, _head);
618
619 if ((o_ins->total_limit_size == -1) ||
620 (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) == 0)) {
621 continue;
622 }
623
624 flb_debug("[input chunk] chunk %s required %ld bytes and %ld bytes left "
625 "in plugin %s", flb_input_chunk_get_name(ic), chunk_size,
626 o_ins->total_limit_size -
627 o_ins->fs_backlog_chunks_size -
628 o_ins->fs_chunks_size,
629 o_ins->name);
630
631 if ((o_ins->fs_chunks_size +
632 o_ins->fs_backlog_chunks_size +
633 chunk_size) > o_ins->total_limit_size) {
634 overlimit |= (1 << o_ins->id);
635 }
636 }
637
638 return overlimit;
639 }
640
641 /* Find a slot for the incoming data to buffer it in local file system
642 * returns 0 if none of the routes can be written to
643 */
flb_input_chunk_place_new_chunk(struct flb_input_chunk * ic,size_t chunk_size)644 int flb_input_chunk_place_new_chunk(struct flb_input_chunk *ic, size_t chunk_size)
645 {
646 int overlimit;
647 overlimit = flb_input_chunk_has_overlimit_routes(ic, chunk_size);
648 if (overlimit != 0) {
649 flb_input_chunk_find_space_new_data(ic, chunk_size, overlimit);
650 }
651
652 return !flb_routes_mask_is_empty(ic->routes_mask);
653 }
654
/*
 * Create an input chunk context for an existing Chunk I/O chunk ('chunk').
 * The context is flagged fs_backlog (presumably the chunk was recovered from
 * the file system backlog — confirm against callers).
 *
 * Steps:
 *  - validate the msgpack records in the chunk content; on a partial failure
 *    the chunk is truncated to its valid records when the failure starts
 *    after byte 32, otherwise it is discarded;
 *  - trim trailing bytes after the last valid record;
 *  - update input metrics (FLB_HAVE_METRICS);
 *  - compute the routes mask from the tag stored in the chunk metadata, link
 *    the context into in->chunks and account its real size on the outputs.
 *
 * Returns the new context, or NULL on error. On error only the context is
 * freed; 'chunk' itself is not closed here.
 */
struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
                                            void *chunk)
{
    int records = 0;
    int tag_len;
    int has_routes;
    int ret;
    uint64_t ts;
    char *buf_data;
    size_t buf_size;
    size_t offset;
    ssize_t bytes;
    const char *tag_buf;
    struct flb_input_chunk *ic;

    /* Create context for the input instance */
    ic = flb_calloc(1, sizeof(struct flb_input_chunk));
    if (!ic) {
        flb_errno();
        return NULL;
    }

    ic->busy = FLB_FALSE;
    ic->fs_backlog = FLB_TRUE;
    ic->chunk = chunk;
    ic->in = in;
    msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);

    ret = cio_chunk_get_content(ic->chunk, &buf_data, &buf_size);
    if (ret != CIO_OK) {
        flb_error("[input chunk] error retrieving content for metrics");
        flb_free(ic);
        return NULL;
    }

    /* Validate records in the chunk; 'offset' receives the end of valid data */
    ret = flb_mp_validate_chunk(buf_data, buf_size, &records, &offset);
    if (ret == -1) {
        /* If there are valid records, truncate the chunk size */
        if (records <= 0) {
            flb_plg_error(in,
                          "chunk validation failed, data might be corrupted. "
                          "No valid records found, the chunk will be discarded.");
            flb_free(ic);
            return NULL;
        }
        /*
         * NOTE(review): 'offset' is a size_t printed with %lu below; %zu
         * would match its type on every platform.
         */
        if (records > 0 && offset > 32) {
            flb_plg_warn(in,
                         "chunk validation failed, data might be corrupted. "
                         "Found %d valid records, failed content starts "
                         "right after byte %lu. Recovering valid records.",
                         records, offset);

            /* truncate the chunk to recover valid records */
            cio_chunk_write_at(chunk, offset, NULL, 0);
        }
        else {
            flb_plg_error(in,
                          "chunk validation failed, data might be corrupted. "
                          "Found %d valid records, failed content starts "
                          "right after byte %lu. Cannot recover chunk,",
                          records, offset);
            flb_free(ic);
            return NULL;
        }
    }

    if (records == 0) {
        flb_plg_error(in,
                      "chunk validation failed, data might be corrupted. "
                      "No valid records found, the chunk will be discarded.");
        flb_free(ic);
        return NULL;
    }

    /*
     * If the content is valid and the chunk has extra padding zeros, just
     * perform an adjustment.
     */
    bytes = cio_chunk_get_content_size(chunk);
    if (bytes == -1) {
        flb_free(ic);
        return NULL;
    }
    if (offset < bytes) {
        cio_chunk_write_at(chunk, offset, NULL, 0);
    }

    /*
     * Update metrics.
     * NOTE(review): buf_size is the content size read before any truncation
     * above, so the bytes metric may include discarded data — confirm intent.
     */
#ifdef FLB_HAVE_METRICS
    ic->total_records = records;
    if (ic->total_records > 0) {
        /* timestamp */
        ts = cmt_time_now();

        /* fluentbit_input_records_total */
        cmt_counter_add(in->cmt_records, ts, ic->total_records,
                        1, (char *[]) {(char *) flb_input_name(in)});

        /* fluentbit_input_bytes_total */
        cmt_counter_add(in->cmt_bytes, ts, buf_size,
                        1, (char *[]) {(char *) flb_input_name(in)});

        /* OLD metrics */
        flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->total_records, in->metrics);
        flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
    }
#endif

    /* Get the tag reference (chunk metadata) */
    ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
    if (ret == -1) {
        flb_error("[input chunk] error retrieving tag of input chunk");
        flb_free(ic);
        return NULL;
    }

    /* On-disk size, used below to account the chunk on the outputs */
    bytes = flb_input_chunk_get_real_size(ic);
    if (bytes < 0) {
        flb_warn("[input chunk] could not retrieve chunk real size");
        flb_free(ic);
        return NULL;
    }

    has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag_buf, tag_len, in);
    if (has_routes == 0) {
        flb_warn("[input chunk] no matching route for backoff log chunk %s",
                 flb_input_chunk_get_name(ic));
    }

    mk_list_add(&ic->_head, &in->chunks);

    flb_input_chunk_update_output_instances(ic, bytes);

    return ic;
}
792
flb_input_chunk_create(struct flb_input_instance * in,const char * tag,int tag_len)793 struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in,
794 const char *tag, int tag_len)
795 {
796 int ret;
797 int err;
798 int set_down = FLB_FALSE;
799 int has_routes;
800 char name[64];
801 struct cio_chunk *chunk;
802 struct flb_storage_input *storage;
803 struct flb_input_chunk *ic;
804
805 storage = in->storage;
806
807 /* chunk name */
808 generate_chunk_name(in, name, sizeof(name) - 1);
809
810 /* open/create target chunk file */
811 chunk = cio_chunk_open(storage->cio, storage->stream, name,
812 CIO_OPEN, FLB_INPUT_CHUNK_SIZE, &err);
813 if (!chunk) {
814 flb_error("[input chunk] could not create chunk file: %s:%s",
815 storage->stream->name, name);
816 return NULL;
817 }
818 /*
819 * If the returned chunk at open is 'down', just put it up, write the
820 * content and set it down again.
821 */
822 ret = cio_chunk_is_up(chunk);
823 if (ret == CIO_FALSE) {
824 ret = cio_chunk_up_force(chunk);
825 if (ret == -1) {
826 cio_chunk_close(chunk, CIO_TRUE);
827 return NULL;
828 }
829 set_down = FLB_TRUE;
830 }
831
832 /* write metadata (tag) */
833 if (tag_len > 65535) {
834 /* truncate length */
835 tag_len = 65535;
836 }
837
838 /* Write tag into metadata section */
839 ret = cio_meta_write(chunk, (char *) tag, tag_len);
840 if (ret == -1) {
841 flb_error("[input chunk] could not write metadata");
842 cio_chunk_close(chunk, CIO_TRUE);
843 return NULL;
844 }
845
846 /* Create context for the input instance */
847 ic = flb_calloc(1, sizeof(struct flb_input_chunk));
848 if (!ic) {
849 flb_errno();
850 cio_chunk_close(chunk, CIO_TRUE);
851 return NULL;
852 }
853
854 /*
855 * Check chunk content type to be created: depending of the value set by
856 * the input plugin, this can be FLB_INPUT_LOGS or FLB_INPUT_METRICS
857 */
858 ic->event_type = in->event_type;
859
860 ic->busy = FLB_FALSE;
861 ic->chunk = chunk;
862 ic->fs_backlog = FLB_FALSE;
863 ic->in = in;
864 ic->stream_off = 0;
865 ic->task = NULL;
866 #ifdef FLB_HAVE_METRICS
867 ic->total_records = 0;
868 #endif
869
870 /* Calculate the routes_mask for the input chunk */
871 has_routes = flb_routes_mask_set_by_tag(ic->routes_mask, tag, tag_len, in);
872 if (has_routes == 0) {
873 flb_trace("[input chunk] no matching route for input chunk '%s' with tag '%s'",
874 flb_input_chunk_get_name(ic), tag);
875 }
876
877 msgpack_packer_init(&ic->mp_pck, ic, flb_input_chunk_write);
878 mk_list_add(&ic->_head, &in->chunks);
879
880 if (set_down == FLB_TRUE) {
881 cio_chunk_down(chunk);
882 }
883
884 if (flb_input_event_type_is_log(in)) {
885 flb_hash_add(in->ht_log_chunks, tag, tag_len, ic, 0);
886 }
887 else if (flb_input_event_type_is_metric(in)) {
888 flb_hash_add(in->ht_metric_chunks, tag, tag_len, ic, 0);
889 }
890
891 return ic;
892 }
893
flb_input_chunk_destroy(struct flb_input_chunk * ic,int del)894 int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
895 {
896 int tag_len;
897 int ret;
898 ssize_t bytes;
899 const char *tag_buf = NULL;
900 struct mk_list *head;
901 struct flb_output_instance *o_ins;
902
903 if (flb_input_chunk_is_up(ic) == FLB_FALSE) {
904 flb_input_chunk_set_up(ic);
905 }
906
907 mk_list_foreach(head, &ic->in->config->outputs) {
908 o_ins = mk_list_entry(head, struct flb_output_instance, _head);
909
910 if (o_ins->total_limit_size == -1) {
911 continue;
912 }
913
914 bytes = flb_input_chunk_get_real_size(ic);
915 if (bytes == -1) {
916 // no data in the chunk
917 continue;
918 }
919
920 if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
921 o_ins->fs_chunks_size -= bytes;
922 flb_debug("[input chunk] remove chunk %s with %ld bytes from plugin %s, "
923 "the updated fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
924 bytes, o_ins->name, o_ins->fs_chunks_size);
925 }
926 }
927
928 /*
929 * When a chunk is going to be destroyed, this can be in a down state,
930 * since the next step is to retrieve the Tag we need to have the
931 * content up.
932 */
933 ret = flb_input_chunk_is_up(ic);
934 if (ret == FLB_FALSE) {
935 ret = cio_chunk_up_force(ic->chunk);
936 if (ret == -1) {
937 flb_error("[input chunk] cannot load chunk: %s",
938 flb_input_chunk_get_name(ic));
939 }
940 }
941
942 /* Retrieve Tag */
943 ret = flb_input_chunk_get_tag(ic, &tag_buf, &tag_len);
944 if (ret == -1) {
945 flb_trace("[input chunk] could not retrieve chunk tag: %s",
946 flb_input_chunk_get_name(ic));
947 }
948
949 if (del == CIO_TRUE && tag_buf) {
950 /*
951 * "TRY" to delete any reference to this chunk ('ic') from the hash
952 * table. Note that maybe the value is not longer available in the
953 * entries if it was replaced: note that we always keep the last
954 * chunk for a specific Tag.
955 */
956 if (ic->event_type == FLB_INPUT_LOGS) {
957 flb_hash_del_ptr(ic->in->ht_log_chunks,
958 tag_buf, tag_len, (void *) ic);
959 }
960 else if (ic->event_type == FLB_INPUT_METRICS) {
961 flb_hash_del_ptr(ic->in->ht_metric_chunks,
962 tag_buf, tag_len, (void *) ic);
963 }
964 }
965
966 cio_chunk_close(ic->chunk, del);
967 mk_list_del(&ic->_head);
968 flb_free(ic);
969
970 return 0;
971 }
972
973 /* Return or create an available chunk to write data */
input_chunk_get(struct flb_input_instance * in,const char * tag,int tag_len,size_t chunk_size,int * set_down)974 static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
975 const char *tag, int tag_len,
976 size_t chunk_size, int *set_down)
977 {
978 int id = -1;
979 int ret;
980 int new_chunk = FLB_FALSE;
981 size_t out_size;
982 struct flb_input_chunk *ic = NULL;
983
984 if (in->event_type == FLB_INPUT_LOGS) {
985 id = flb_hash_get(in->ht_log_chunks, tag, tag_len,
986 (void *) &ic, &out_size);
987 }
988 else if (in->event_type == FLB_INPUT_METRICS) {
989 id = flb_hash_get(in->ht_metric_chunks, tag, tag_len,
990 (void *) &ic, &out_size);
991 }
992
993 if (id >= 0) {
994 if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) {
995 ic = NULL;
996 }
997 else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
998 ret = cio_chunk_up_force(ic->chunk);
999 if (ret == -1) {
1000 ic = NULL;
1001 }
1002 *set_down = FLB_TRUE;
1003 }
1004 }
1005
1006 /* No chunk was found, we need to create a new one */
1007 if (!ic) {
1008 ic = flb_input_chunk_create(in, (char *) tag, tag_len);
1009 new_chunk = FLB_TRUE;
1010 if (!ic) {
1011 return NULL;
1012 }
1013 }
1014
1015 /*
1016 * If buffering this block of data will exceed one of the limit among all output instances
1017 * that the chunk will flush to, we need to modify the routes_mask of the oldest chunks
1018 * (based in creation time) to get enough space for the incoming chunk.
1019 */
1020 if (!flb_routes_mask_is_empty(ic->routes_mask)
1021 && flb_input_chunk_place_new_chunk(ic, chunk_size) == 0) {
1022 /*
1023 * If the chunk is not newly created, the chunk might already have logs inside.
1024 * We cannot delete (reused) chunks here.
1025 * If the routes_mask is cleared after trying to append new data, we destroy
1026 * the chunk.
1027 */
1028 if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) {
1029 flb_input_chunk_destroy(ic, FLB_TRUE);
1030 }
1031
1032 return NULL;
1033 }
1034
1035 return ic;
1036 }
1037
flb_input_chunk_is_mem_overlimit(struct flb_input_instance * i)1038 static inline int flb_input_chunk_is_mem_overlimit(struct flb_input_instance *i)
1039 {
1040 if (i->mem_buf_limit <= 0) {
1041 return FLB_FALSE;
1042 }
1043
1044 if (i->mem_chunks_size >= i->mem_buf_limit) {
1045 return FLB_TRUE;
1046 }
1047
1048 return FLB_FALSE;
1049 }
1050
flb_input_chunk_is_storage_overlimit(struct flb_input_instance * i)1051 static inline int flb_input_chunk_is_storage_overlimit(struct flb_input_instance *i)
1052 {
1053 struct flb_storage_input *storage = (struct flb_storage_input *)i->storage;
1054
1055
1056 if (storage->type == CIO_STORE_FS) {
1057 if (i->storage_pause_on_chunks_overlimit == FLB_TRUE) {
1058 if (storage->cio->total_chunks >= storage->cio->max_chunks_up) {
1059 return FLB_TRUE;
1060 }
1061 }
1062 }
1063
1064 return FLB_FALSE;
1065 }
1066
1067 /*
1068 * Check all chunks associated to the input instance and summarize
1069 * the number of bytes in use.
1070 */
flb_input_chunk_total_size(struct flb_input_instance * in)1071 size_t flb_input_chunk_total_size(struct flb_input_instance *in)
1072 {
1073 size_t total = 0;
1074 struct flb_storage_input *storage;
1075
1076 storage = (struct flb_storage_input *) in->storage;
1077 total = cio_stream_size_chunks_up(storage->stream);
1078 return total;
1079 }
1080
1081 /*
1082 * Count and update the number of bytes being used by the instance. Also
1083 * check if the instance is paused, if so, check if it can be resumed if
1084 * is not longer over the limits.
1085 *
1086 * It always returns the number of bytes in use.
1087 */
flb_input_chunk_set_limits(struct flb_input_instance * in)1088 size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
1089 {
1090 size_t total;
1091
1092 /* Gather total number of enqueued bytes */
1093 total = flb_input_chunk_total_size(in);
1094 /* Register the total into the context variable */
1095 in->mem_chunks_size = total;
1096
1097 /*
1098 * After the adjustments, validate if the plugin is overlimit or paused
1099 * and perform further adjustments.
1100 */
1101 if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE &&
1102 in->config->is_running == FLB_TRUE &&
1103 in->config->is_ingestion_active == FLB_TRUE &&
1104 in->mem_buf_status == FLB_INPUT_PAUSED) {
1105 in->mem_buf_status = FLB_INPUT_RUNNING;
1106 if (in->p->cb_resume) {
1107 in->p->cb_resume(in->context, in->config);
1108 flb_info("[input] %s resume (mem buf overlimit)",
1109 in->name);
1110 }
1111 }
1112 if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE &&
1113 in->config->is_running == FLB_TRUE &&
1114 in->config->is_ingestion_active == FLB_TRUE &&
1115 in->storage_buf_status == FLB_INPUT_PAUSED) {
1116 in->storage_buf_status = FLB_INPUT_RUNNING;
1117 if (in->p->cb_resume) {
1118 in->p->cb_resume(in->context, in->config);
1119 flb_info("[input] %s resume (storage buf overlimit %d/%d)",
1120 in->name,
1121 ((struct flb_storage_input *)in->storage)->cio->total_chunks,
1122 ((struct flb_storage_input *)in->storage)->cio->max_chunks_up);
1123 }
1124 }
1125
1126 return total;
1127 }
1128
1129 /*
1130 * If the number of bytes in use by the chunks are over the imposed limit
1131 * by configuration, pause the instance.
1132 */
flb_input_chunk_protect(struct flb_input_instance * i)1133 static inline int flb_input_chunk_protect(struct flb_input_instance *i)
1134 {
1135 struct flb_storage_input *storage = i->storage;
1136
1137 if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
1138 flb_warn("[input] %s paused (storage buf overlimit %d/%d)",
1139 i->name,
1140 storage->cio->total_chunks,
1141 storage->cio->max_chunks_up);
1142
1143 if (!flb_input_buf_paused(i)) {
1144 if (i->p->cb_pause) {
1145 i->p->cb_pause(i->context, i->config);
1146 }
1147 }
1148 i->mem_buf_status = FLB_INPUT_PAUSED;
1149 return FLB_TRUE;
1150 }
1151
1152 if (storage->type == CIO_STORE_FS) {
1153 return FLB_FALSE;
1154 }
1155
1156 if (flb_input_chunk_is_mem_overlimit(i) == FLB_TRUE) {
1157 flb_warn("[input] %s paused (mem buf overlimit)",
1158 i->name);
1159 if (!flb_input_buf_paused(i)) {
1160 if (i->p->cb_pause) {
1161 i->p->cb_pause(i->context, i->config);
1162 }
1163 }
1164 i->storage_buf_status = FLB_INPUT_PAUSED;
1165 return FLB_TRUE;
1166 }
1167
1168 return FLB_FALSE;
1169 }
1170
1171 /*
1172 * Validate if the chunk coming from the input plugin based on config and
1173 * resources usage must be 'up' or 'down' (applicable for filesystem storage
1174 * type).
1175 *
1176 * FIXME: can we find a better name for this function ?
1177 */
flb_input_chunk_set_up_down(struct flb_input_chunk * ic)1178 int flb_input_chunk_set_up_down(struct flb_input_chunk *ic)
1179 {
1180 size_t total;
1181 struct flb_input_instance *in;
1182
1183 in = ic->in;
1184
1185 /* Gather total number of enqueued bytes */
1186 total = flb_input_chunk_total_size(in);
1187
1188 /* Register the total into the context variable */
1189 in->mem_chunks_size = total;
1190
1191 if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE) {
1192 if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
1193 cio_chunk_down(ic->chunk);
1194
1195 /* Adjust new counters */
1196 total = flb_input_chunk_total_size(ic->in);
1197 in->mem_chunks_size = total;
1198
1199 return FLB_FALSE;
1200 }
1201 }
1202
1203 return FLB_TRUE;
1204 }
1205
flb_input_chunk_is_up(struct flb_input_chunk * ic)1206 int flb_input_chunk_is_up(struct flb_input_chunk *ic)
1207 {
1208 return cio_chunk_is_up(ic->chunk);
1209
1210 }
1211
flb_input_chunk_down(struct flb_input_chunk * ic)1212 int flb_input_chunk_down(struct flb_input_chunk *ic)
1213 {
1214 if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
1215 return cio_chunk_down(ic->chunk);
1216 }
1217
1218 return 0;
1219 }
1220
flb_input_chunk_set_up(struct flb_input_chunk * ic)1221 int flb_input_chunk_set_up(struct flb_input_chunk *ic)
1222 {
1223 if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
1224 return cio_chunk_up(ic->chunk);
1225 }
1226
1227 return 0;
1228 }
1229
1230 /* Append a RAW MessagPack buffer to the input instance */
flb_input_chunk_append_raw(struct flb_input_instance * in,const char * tag,size_t tag_len,const void * buf,size_t buf_size)1231 int flb_input_chunk_append_raw(struct flb_input_instance *in,
1232 const char *tag, size_t tag_len,
1233 const void *buf, size_t buf_size)
1234 {
1235 int ret;
1236 int set_down = FLB_FALSE;
1237 int min;
1238 int meta_size;
1239 int new_chunk = FLB_FALSE;
1240 uint64_t ts;
1241 size_t diff;
1242 size_t size;
1243 size_t pre_size;
1244 struct flb_input_chunk *ic;
1245 struct flb_storage_input *si;
1246
1247 /* Check if the input plugin has been paused */
1248 if (flb_input_buf_paused(in) == FLB_TRUE) {
1249 flb_debug("[input chunk] %s is paused, cannot append records",
1250 in->name);
1251 return -1;
1252 }
1253
1254 if (buf_size == 0) {
1255 flb_debug("[input chunk] skip ingesting data with 0 bytes");
1256 return -1;
1257 }
1258
1259 /*
1260 * Some callers might not set a custom tag, on that case just inherit
1261 * the fixed instance tag or instance name.
1262 */
1263 if (!tag) {
1264 if (in->tag && in->tag_len > 0) {
1265 tag = in->tag;
1266 tag_len = in->tag_len;
1267 }
1268 else {
1269 tag = in->name;
1270 tag_len = strlen(in->name);
1271 }
1272 }
1273
1274 /*
1275 * Get a target input chunk, can be one with remaining space available
1276 * or a new one.
1277 */
1278 ic = input_chunk_get(in, tag, tag_len, buf_size, &set_down);
1279 if (!ic) {
1280 flb_error("[input chunk] no available chunk");
1281 return -1;
1282 }
1283
1284 /* newly created chunk */
1285 if (flb_input_chunk_get_size(ic) == 0) {
1286 new_chunk = FLB_TRUE;
1287 }
1288
1289 /* We got the chunk, validate if is 'up' or 'down' */
1290 ret = flb_input_chunk_is_up(ic);
1291 if (ret == FLB_FALSE) {
1292 ret = cio_chunk_up_force(ic->chunk);
1293 if (ret == -1) {
1294 flb_error("[input chunk] cannot retrieve temporary chunk");
1295 return -1;
1296 }
1297 set_down = FLB_TRUE;
1298 }
1299
1300 /*
1301 * Previous size from the chunk, used to calculate the difference
1302 * after filtering
1303 */
1304 pre_size = cio_chunk_get_content_size(ic->chunk);
1305
1306 /* Write the new data */
1307 ret = flb_input_chunk_write(ic, buf, buf_size);
1308 if (ret == -1) {
1309 flb_error("[input chunk] error writing data from %s instance",
1310 in->name);
1311 cio_chunk_tx_rollback(ic->chunk);
1312 return -1;
1313 }
1314
1315 /* Update 'input' metrics */
1316 #ifdef FLB_HAVE_METRICS
1317 if (ic->total_records > 0) {
1318 /* timestamp */
1319 ts = cmt_time_now();
1320
1321 /* fluentbit_input_records_total */
1322 cmt_counter_add(in->cmt_records, ts, ic->added_records,
1323 1, (char *[]) {(char *) flb_input_name(in)});
1324
1325 /* fluentbit_input_bytes_total */
1326 cmt_counter_add(in->cmt_bytes, ts, buf_size,
1327 1, (char *[]) {(char *) flb_input_name(in)});
1328
1329 /* OLD api */
1330 flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
1331 flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
1332 }
1333 #endif
1334
1335 /* Apply filters */
1336 if (in->event_type == FLB_INPUT_LOGS) {
1337 flb_filter_do(ic,
1338 buf, buf_size,
1339 tag, tag_len, in->config);
1340 }
1341
1342 /* Get chunk size */
1343 size = cio_chunk_get_content_size(ic->chunk);
1344
1345 /* calculate the 'real' new bytes being added after the filtering phase */
1346 diff = llabs(size - pre_size);
1347
1348 /*
1349 * Update output instance bytes counters, note that bytes counter should
1350 * always count the chunk size in the file system. Therefore, it should
1351 * add the extra bytes for the metadata.
1352 */
1353 meta_size = cio_meta_size(ic->chunk);
1354 if (new_chunk == FLB_TRUE) {
1355 diff += meta_size
1356 /* See https://github.com/edsiper/chunkio#file-layout for more details */
1357 + 2 /* HEADER BYTES */
1358 + 4 /* CRC32 */
1359 + 16 /* PADDING */
1360 + 2; /* METADATA LENGTH BYTES */
1361 }
1362
1363 /*
1364 * There is a case that rewrite_tag will modify the tag and keep rule is set
1365 * to drop the original record. The original record will still go through the
1366 * flb_input_chunk_update_output_instances(2) to update the fs_chunks_size by
1367 * metadata bytes (consisted by metadata bytes of the file chunk). This condition
1368 * sets the diff to 0 in order to not update the fs_chunks_size.
1369 */
1370 if (flb_input_chunk_get_size(ic) == 0) {
1371 diff = 0;
1372 }
1373
1374 if (diff != 0) {
1375 flb_input_chunk_update_output_instances(ic, diff);
1376 }
1377
1378 /* Lock buffers where size > 2MB */
1379 if (size > FLB_INPUT_CHUNK_FS_MAX_SIZE) {
1380 cio_chunk_lock(ic->chunk);
1381 }
1382
1383 /* Make sure the data was not filtered out and the buffer size is zero */
1384 if (size == 0) {
1385 flb_input_chunk_destroy(ic, FLB_TRUE);
1386 flb_input_chunk_set_limits(in);
1387 return 0;
1388 }
1389 #ifdef FLB_HAVE_STREAM_PROCESSOR
1390 else if (in->config->stream_processor_ctx &&
1391 ic->event_type == FLB_INPUT_LOGS) {
1392 char *c_data;
1393 size_t c_size;
1394
1395 /* Retrieve chunk (filtered) output content */
1396 cio_chunk_get_content(ic->chunk, &c_data, &c_size);
1397
1398 /* Invoke stream processor */
1399 flb_sp_do(in->config->stream_processor_ctx,
1400 in,
1401 tag, tag_len,
1402 c_data + ic->stream_off, c_size - ic->stream_off);
1403 ic->stream_off += (c_size - ic->stream_off);
1404 }
1405 #endif
1406
1407 if (set_down == FLB_TRUE) {
1408 cio_chunk_down(ic->chunk);
1409 }
1410
1411 /*
1412 * If the instance is not routable, there is no need to keep the
1413 * content in the storage engine, just get rid of it.
1414 */
1415 if (in->routable == FLB_FALSE) {
1416 flb_input_chunk_destroy(ic, FLB_TRUE);
1417 return 0;
1418 }
1419
1420 /* Update memory counters and adjust limits if any */
1421 flb_input_chunk_set_limits(in);
1422
1423 /*
1424 * Check if we are overlimit and validate if is there any filesystem
1425 * storage type asociated to this input instance, if so, unload the
1426 * chunk content from memory to respect imposed limits.
1427 *
1428 * Calling cio_chunk_down() the memory map associated and the file
1429 * descriptor will be released. At any later time, it must be bring up
1430 * for I/O operations.
1431 */
1432 si = (struct flb_storage_input *) in->storage;
1433 if (flb_input_chunk_is_mem_overlimit(in) == FLB_TRUE &&
1434 si->type == CIO_STORE_FS) {
1435 if (cio_chunk_is_up(ic->chunk) == CIO_TRUE) {
1436 /*
1437 * If we are already over limit, a sub-sequent data ingestion
1438 * might need a Chunk to write data in. As an optimization we
1439 * will put this Chunk down ONLY IF it has less than 1% of
1440 * it capacity as available space, otherwise keep it 'up' so
1441 * it available space can be used.
1442 */
1443 size = cio_chunk_get_content_size(ic->chunk);
1444
1445 /* Do we have less than 1% available ? */
1446 min = (FLB_INPUT_CHUNK_FS_MAX_SIZE * 0.01);
1447 if (FLB_INPUT_CHUNK_FS_MAX_SIZE - size < min) {
1448 cio_chunk_down(ic->chunk);
1449 }
1450 }
1451 }
1452
1453 flb_input_chunk_protect(in);
1454
1455 return 0;
1456 }
1457
1458 /* Retrieve a raw buffer from a dyntag node */
flb_input_chunk_flush(struct flb_input_chunk * ic,size_t * size)1459 const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size)
1460 {
1461 int ret;
1462 char *buf = NULL;
1463
1464 if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) {
1465 ret = cio_chunk_up(ic->chunk);
1466 if (ret == -1) {
1467 return NULL;
1468 }
1469 }
1470
1471 /*
1472 * msgpack-c internal use a raw buffer for it operations, since we
1473 * already appended data we just can take out the references to avoid
1474 * a new memory allocation and skip a copy operation.
1475 */
1476 ret = cio_chunk_get_content(ic->chunk, &buf, size);
1477 if (ret == -1) {
1478 flb_error("[input chunk] error retrieving chunk content");
1479 return NULL;
1480 }
1481
1482 if (!buf) {
1483 *size = 0;
1484 return NULL;
1485 }
1486
1487 /* Set it busy as it likely it's a reference for an outgoing task */
1488 ic->busy = FLB_TRUE;
1489
1490 /* Lock the internal chunk */
1491 cio_chunk_lock(ic->chunk);
1492
1493 return buf;
1494 }
1495
flb_input_chunk_release_lock(struct flb_input_chunk * ic)1496 int flb_input_chunk_release_lock(struct flb_input_chunk *ic)
1497 {
1498 if (ic->busy == FLB_FALSE) {
1499 return -1;
1500 }
1501
1502 ic->busy = FLB_FALSE;
1503 return 0;
1504 }
1505
flb_input_chunk_get_name(struct flb_input_chunk * ic)1506 flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic)
1507 {
1508 struct cio_chunk *ch;
1509
1510 ch = (struct cio_chunk *) ic->chunk;
1511 return ch->name;
1512 }
1513
flb_input_chunk_get_tag(struct flb_input_chunk * ic,const char ** tag_buf,int * tag_len)1514 int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
1515 const char **tag_buf, int *tag_len)
1516 {
1517 int len;
1518 int ret;
1519 char *buf;
1520
1521 ret = cio_meta_read(ic->chunk, &buf, &len);
1522 if (ret == -1) {
1523 *tag_len = -1;
1524 *tag_buf = NULL;
1525 return -1;
1526 }
1527
1528 *tag_len = len;
1529 *tag_buf = buf;
1530
1531 return ret;
1532 }
1533
1534 /*
1535 * Iterates all output instances that the chunk will be flushing to and summarize
1536 * the total number of bytes in use after ingesting the new data.
1537 */
flb_input_chunk_update_output_instances(struct flb_input_chunk * ic,size_t chunk_size)1538 void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
1539 size_t chunk_size)
1540 {
1541 struct mk_list *head;
1542 struct flb_output_instance *o_ins;
1543
1544 /* for each output plugin, we update the fs_chunks_size */
1545 mk_list_foreach(head, &ic->in->config->outputs) {
1546 o_ins = mk_list_entry(head, struct flb_output_instance, _head);
1547 if (o_ins->total_limit_size == -1) {
1548 continue;
1549 }
1550
1551 if (flb_routes_mask_get_bit(ic->routes_mask, o_ins->id) != 0) {
1552 /*
1553 * if there is match on any index of 1's in the binary, it indicates
1554 * that the input chunk will flush to this output instance
1555 */
1556 o_ins->fs_chunks_size += chunk_size;
1557
1558 flb_debug("[input chunk] chunk %s update plugin %s fs_chunks_size by %ld bytes, "
1559 "the current fs_chunks_size is %ld bytes", flb_input_chunk_get_name(ic),
1560 o_ins->name, chunk_size, o_ins->fs_chunks_size);
1561 }
1562 }
1563 }
1564