1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019-2020 The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #ifndef FLB_SP_H
22 #define FLB_SP_H
23 
24 #include <fluent-bit/flb_info.h>
25 #include <fluent-bit/flb_config.h>
26 #include <fluent-bit/flb_sds.h>
27 #include <fluent-bit/flb_time.h>
28 #include <fluent-bit/flb_input.h>
29 #include <monkey/mk_core.h>
30 #include <rbtree.h>
31 
32 /* Aggregate num type */
33 #define FLB_SP_NUM_I64       0
34 #define FLB_SP_NUM_F64       1
35 #define FLB_SP_BOOLEAN       2
36 #define FLB_SP_STRING        3
37 
38 struct aggregate_num {
39     int type;
40     int ops;
41     int64_t i64;
42     double f64;
43     bool boolean;
44     flb_sds_t string;
45 };
46 
47 struct aggregate_data {
48     struct aggregate_num *nums;
49     struct mk_list _head;
50 };
51 
52 struct timeseries_forecast {
53     struct aggregate_num *nums;
54     struct mk_list _head;
55 
56     // future time to forecast
57     double future_time;
58 
59     // time offset (the first time value captured)
60     double offset;
61     double latest_x;
62 
63     double sigma_x;
64     double sigma_y;
65 
66     double sigma_xy;
67     double sigma_x2;
68 };
69 
70 struct aggregate_node {
71     int groupby_keys;
72     int records;
73     int nums_size;
74     struct aggregate_num *nums;
75     struct aggregate_num *groupby_nums;
76 
77     /* Aggregate data */
78     struct aggregate_data **aggregate_data;
79 
80     /* To keep track of the aggregation nodes */
81     struct rb_tree_node _rb_head;
82     struct mk_list _head;
83 };
84 
85 struct flb_sp_window_data {
86     char *buf_data;
87     size_t buf_size;
88     struct mk_list _head;
89 };
90 
91 struct flb_sp_hopping_slot {
92     struct rb_tree aggregate_tree;
93     struct mk_list aggregate_list;
94     int records;
95     struct mk_list _head;
96 };
97 
98 struct flb_sp_task_window {
99     int type;
100 
101     int fd;
102     struct mk_event event;
103     struct mk_event event_hop;
104 
105     struct rb_tree aggregate_tree;
106     struct mk_list aggregate_list;
107 
108     /* Hopping window parameters */
109     /*
110      * first hopping window. Timer event is set to window size for the first,
111      * and will change to the advance_by time thereafter
112      */
113     bool first_hop;
114     int fd_hop;
115     int advance_by;
116     struct mk_list hopping_slot;
117 
118     int records;
119 
120     struct mk_list data;
121 };
122 
123 struct flb_sp_task {
124     flb_sds_t name;          /* task name      */
125     flb_sds_t query;         /* SQL text query */
126 
127     /*
128      * if the command source is an existent stream (input plugin instance), we
129      * map the instance address here to perform a fast lookup once the data
130      * comes in.
131      */
132     void *source_instance;
133 
134     /*
135      * If the command created a new stream, this field keeps a reference to
136      * the initialized stream context.
137      */
138     void *stream;
139 
140     int aggregate_keys;      /* do commands contains aggregate keys? */
141     struct flb_sp *sp;       /* parent context */
142     struct flb_sp_cmd *cmd;  /* (SQL) commands */
143 
144     struct flb_sp_task_window window; /* task window */
145 
146     void *snapshot;          /* snapshot pages for SNAPSHOT sream type */
147 
148     struct mk_list _head;    /* link to parent list flb_sp->tasks */
149 };
150 
151 struct flb_sp {
152     struct mk_list tasks;        /* processor tasks */
153     struct flb_config *config;   /* reference to Fluent Bit context */
154 };
155 
156 struct flb_sp *flb_sp_create(struct flb_config *config);
157 void flb_sp_destroy(struct flb_sp *sp);
158 
159 int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in,
160               const char *tag, int tag_len,
161               const char *buf_data, size_t buf_size);
162 int flb_sp_test_do(struct flb_sp *sp, struct flb_sp_task *task,
163                    const char *tag, int tag_len,
164                    const char *buf_data, size_t buf_size,
165                    char **out_data, size_t *out_size);
166 int flb_sp_test_fd_event(int fd, struct flb_sp_task *task, char **out_data,
167                          size_t *out_size);
168 
169 int flb_sp_snapshot_create(struct flb_sp_task *task);
170 struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
171                                        const char *query);
172 int flb_sp_fd_event(int fd, struct flb_sp *sp);
173 void flb_sp_task_destroy(struct flb_sp_task *task);
174 void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size);
175 void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
176                                    struct aggregate_node *aggregate_node);
177 
178 #endif
179