1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 2 3 /* Fluent Bit 4 * ========== 5 * Copyright (C) 2019-2020 The Fluent Bit Authors 6 * Copyright (C) 2015-2018 Treasure Data Inc. 7 * 8 * Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 21 #ifndef FLB_SP_H 22 #define FLB_SP_H 23 24 #include <fluent-bit/flb_info.h> 25 #include <fluent-bit/flb_config.h> 26 #include <fluent-bit/flb_sds.h> 27 #include <fluent-bit/flb_time.h> 28 #include <fluent-bit/flb_input.h> 29 #include <monkey/mk_core.h> 30 #include <rbtree.h> 31 32 /* Aggregate num type */ 33 #define FLB_SP_NUM_I64 0 34 #define FLB_SP_NUM_F64 1 35 #define FLB_SP_BOOLEAN 2 36 #define FLB_SP_STRING 3 37 38 struct aggregate_num { 39 int type; 40 int ops; 41 int64_t i64; 42 double f64; 43 bool boolean; 44 flb_sds_t string; 45 }; 46 47 struct aggregate_data { 48 struct aggregate_num *nums; 49 struct mk_list _head; 50 }; 51 52 struct timeseries_forecast { 53 struct aggregate_num *nums; 54 struct mk_list _head; 55 56 // future time to forecast 57 double future_time; 58 59 // time offset (the first time value captured) 60 double offset; 61 double latest_x; 62 63 double sigma_x; 64 double sigma_y; 65 66 double sigma_xy; 67 double sigma_x2; 68 }; 69 70 struct aggregate_node { 71 int groupby_keys; 72 int records; 73 int nums_size; 74 struct aggregate_num *nums; 75 struct aggregate_num *groupby_nums; 76 77 /* Aggregate data */ 78 struct aggregate_data **aggregate_data; 79 80 /* To keep track of the aggregation nodes */ 81 struct rb_tree_node _rb_head; 82 struct mk_list _head; 83 }; 84 85 struct flb_sp_window_data { 86 char *buf_data; 87 size_t buf_size; 88 struct mk_list _head; 89 }; 90 91 struct flb_sp_hopping_slot { 92 struct rb_tree aggregate_tree; 93 struct mk_list aggregate_list; 94 int records; 95 struct mk_list _head; 96 }; 97 98 struct flb_sp_task_window { 99 int type; 100 101 int fd; 102 struct mk_event event; 103 struct mk_event event_hop; 104 105 struct rb_tree aggregate_tree; 106 struct mk_list aggregate_list; 107 108 /* Hopping window parameters */ 109 /* 110 * first hopping window. Timer event is set to window size for the first, 111 * and will change to the advance_by time thereafter 112 */ 113 bool first_hop; 114 int fd_hop; 115 int advance_by; 116 struct mk_list hopping_slot; 117 118 int records; 119 120 struct mk_list data; 121 }; 122 123 struct flb_sp_task { 124 flb_sds_t name; /* task name */ 125 flb_sds_t query; /* SQL text query */ 126 127 /* 128 * if the command source is an existent stream (input plugin instance), we 129 * map the instance address here to perform a fast lookup once the data 130 * comes in. 131 */ 132 void *source_instance; 133 134 /* 135 * If the command created a new stream, this field keeps a reference to 136 * the initialized stream context. 137 */ 138 void *stream; 139 140 int aggregate_keys; /* do commands contains aggregate keys? */ 141 struct flb_sp *sp; /* parent context */ 142 struct flb_sp_cmd *cmd; /* (SQL) commands */ 143 144 struct flb_sp_task_window window; /* task window */ 145 146 void *snapshot; /* snapshot pages for SNAPSHOT sream type */ 147 148 struct mk_list _head; /* link to parent list flb_sp->tasks */ 149 }; 150 151 struct flb_sp { 152 struct mk_list tasks; /* processor tasks */ 153 struct flb_config *config; /* reference to Fluent Bit context */ 154 }; 155 156 struct flb_sp *flb_sp_create(struct flb_config *config); 157 void flb_sp_destroy(struct flb_sp *sp); 158 159 int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in, 160 const char *tag, int tag_len, 161 const char *buf_data, size_t buf_size); 162 int flb_sp_test_do(struct flb_sp *sp, struct flb_sp_task *task, 163 const char *tag, int tag_len, 164 const char *buf_data, size_t buf_size, 165 char **out_data, size_t *out_size); 166 int flb_sp_test_fd_event(int fd, struct flb_sp_task *task, char **out_data, 167 size_t *out_size); 168 169 int flb_sp_snapshot_create(struct flb_sp_task *task); 170 struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name, 171 const char *query); 172 int flb_sp_fd_event(int fd, struct flb_sp *sp); 173 void flb_sp_task_destroy(struct flb_sp_task *task); 174 void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size); 175 void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd, 176 struct aggregate_node *aggregate_node); 177 178 #endif 179