1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2021 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #ifndef FLB_INPUT_CHUNK_H
22 #define FLB_INPUT_CHUNK_H
23 
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_sds.h>
26 #include <fluent-bit/flb_routes_mask.h>
27 #include <monkey/mk_core.h>
28 #include <msgpack.h>
29 
/* Event types a chunk can carry: logs or metrics */
#define FLB_INPUT_CHUNK_LOG            0
#define FLB_INPUT_CHUNK_METRIC         1

/*
 * 'Hint' size used for newly created Chunks; this value is passed
 * to Chunk I/O.
 */
#define FLB_INPUT_CHUNK_SIZE           262144  /* 256KB (hint) */

/*
 * Maximum size for a Chunk in the file system. Although this is treated
 * as a limit, the size of a Chunk may still grow beyond this value.
 */
#define FLB_INPUT_CHUNK_FS_MAX_SIZE   2048000  /* ~2MB (2,048,000 bytes) */
44 
45 struct flb_input_chunk {
46     int event_type;                 /* chunk type: logs or metrics */
47     int busy;                       /* buffer is being flushed  */
48     int fs_backlog;                 /* chunk originated from fs backlog */
49     int sp_done;                    /* sp already processed this chunk */
50 #ifdef FLB_HAVE_METRICS
51     int total_records;              /* total records in the chunk */
52     int added_records;              /* recently added records */
53 #endif
54     void *chunk;                    /* context of struct cio_chunk */
55     off_t stream_off;               /* stream offset */
56     msgpack_packer mp_pck;          /* msgpack packer */
57     struct flb_input_instance *in;  /* reference to parent input instance */
58     struct flb_task *task;          /* reference to the outgoing task */
59     uint64_t routes_mask
60         [FLB_ROUTES_MASK_ELEMENTS]; /* track the output plugins the chunk routes to */
61     struct mk_list _head;
62 };
63 
64 struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in,
65                                                const char *tag, int tag_len);
66 int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del);
67 void flb_input_chunk_destroy_all(struct flb_input_instance *in);
68 int flb_input_chunk_write(void *data, const char *buf, size_t len);
69 int flb_input_chunk_write_at(void *data, off_t offset,
70                              const char *buf, size_t len);
71 int flb_input_chunk_append_obj(struct flb_input_instance *in,
72                                const char *tag, int tag_len,
73                                msgpack_object data);
74 int flb_input_chunk_append_raw(struct flb_input_instance *in,
75                                const char *tag, size_t tag_len,
76                                const void *buf, size_t buf_size);
77 const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size);
78 int flb_input_chunk_release_lock(struct flb_input_chunk *ic);
79 flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic);
80 int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
81                             const char **tag_buf, int *tag_len);
82 ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic);
83 size_t flb_input_chunk_set_limits(struct flb_input_instance *in);
84 size_t flb_input_chunk_total_size(struct flb_input_instance *in);
85 struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
86                                             void *chunk);
87 int flb_input_chunk_set_up_down(struct flb_input_chunk *ic);
88 int flb_input_chunk_set_up(struct flb_input_chunk *ic);
89 int flb_input_chunk_down(struct flb_input_chunk *ic);
90 int flb_input_chunk_is_up(struct flb_input_chunk *ic);
91 void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
92                                              size_t chunk_size);
93 
94 #endif
95