1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 2 3 /* Fluent Bit 4 * ========== 5 * Copyright (C) 2019-2021 The Fluent Bit Authors 6 * Copyright (C) 2015-2018 Treasure Data Inc. 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FLB_INPUT_CHUNK_H 22 #define FLB_INPUT_CHUNK_H 23 24 #include <fluent-bit/flb_info.h> 25 #include <fluent-bit/flb_sds.h> 26 #include <fluent-bit/flb_routes_mask.h> 27 #include <monkey/mk_core.h> 28 #include <msgpack.h> 29 30 #define FLB_INPUT_CHUNK_LOG 0 31 #define FLB_INPUT_CHUNK_METRIC 1 32 33 /* 34 * This variable defines a 'hint' size for new Chunks created, this 35 * value is passed to Chunk I/O. 36 */ 37 #define FLB_INPUT_CHUNK_SIZE 262144 /* 256KB (hint) */ 38 39 /* 40 * Defines a maximum size for a Chunk in the file system: note that despite 41 * this is considered a limit, a Chunk size might get greater than this. 42 */ 43 #define FLB_INPUT_CHUNK_FS_MAX_SIZE 2048000 /* 2MB */ 44 45 struct flb_input_chunk { 46 int event_type; /* chunk type: logs or metrics */ 47 int busy; /* buffer is being flushed */ 48 int fs_backlog; /* chunk originated from fs backlog */ 49 int sp_done; /* sp already processed this chunk */ 50 #ifdef FLB_HAVE_METRICS 51 int total_records; /* total records in the chunk */ 52 int added_records; /* recently added records */ 53 #endif 54 void *chunk; /* context of struct cio_chunk */ 55 off_t stream_off; /* stream offset */ 56 msgpack_packer mp_pck; /* msgpack packer */ 57 struct flb_input_instance *in; /* reference to parent input instance */ 58 struct flb_task *task; /* reference to the outgoing task */ 59 uint64_t routes_mask 60 [FLB_ROUTES_MASK_ELEMENTS]; /* track the output plugins the chunk routes to */ 61 struct mk_list _head; 62 }; 63 64 struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, 65 const char *tag, int tag_len); 66 int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del); 67 void flb_input_chunk_destroy_all(struct flb_input_instance *in); 68 int flb_input_chunk_write(void *data, const char *buf, size_t len); 69 int flb_input_chunk_write_at(void *data, off_t offset, 70 const char *buf, size_t len); 71 int flb_input_chunk_append_obj(struct flb_input_instance *in, 72 const char *tag, int tag_len, 73 msgpack_object data); 74 int flb_input_chunk_append_raw(struct flb_input_instance *in, 75 const char *tag, size_t tag_len, 76 const void *buf, size_t buf_size); 77 const void *flb_input_chunk_flush(struct flb_input_chunk *ic, size_t *size); 78 int flb_input_chunk_release_lock(struct flb_input_chunk *ic); 79 flb_sds_t flb_input_chunk_get_name(struct flb_input_chunk *ic); 80 int flb_input_chunk_get_tag(struct flb_input_chunk *ic, 81 const char **tag_buf, int *tag_len); 82 ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic); 83 size_t flb_input_chunk_set_limits(struct flb_input_instance *in); 84 size_t flb_input_chunk_total_size(struct flb_input_instance *in); 85 struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in, 86 void *chunk); 87 int flb_input_chunk_set_up_down(struct flb_input_chunk *ic); 88 int flb_input_chunk_set_up(struct flb_input_chunk *ic); 89 int flb_input_chunk_down(struct flb_input_chunk *ic); 90 int flb_input_chunk_is_up(struct flb_input_chunk *ic); 91 void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, 92 size_t chunk_size); 93 94 #endif 95