1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Fluent Bit
4  *  ==========
5  *  Copyright (C) 2019      The Fluent Bit Authors
6  *  Copyright (C) 2015-2018 Treasure Data Inc.
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_log.h>
23 #include <fluent-bit/flb_sds.h>
24 #include <fluent-bit/flb_mem.h>
25 #include <fluent-bit/flb_slist.h>
26 #include <fluent-bit/flb_utils.h>
27 #include <fluent-bit/flb_time.h>
28 #include <fluent-bit/flb_input.h>
29 #include <fluent-bit/flb_pack.h>
30 #include <fluent-bit/flb_router.h>
31 #include <fluent-bit/stream_processor/flb_sp.h>
32 #include <fluent-bit/stream_processor/flb_sp_key.h>
33 #include <fluent-bit/stream_processor/flb_sp_stream.h>
34 #include <fluent-bit/stream_processor/flb_sp_snapshot.h>
35 #include <fluent-bit/stream_processor/flb_sp_parser.h>
36 #include <fluent-bit/stream_processor/flb_sp_func_time.h>
37 #include <fluent-bit/stream_processor/flb_sp_func_record.h>
38 #include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
39 #include <fluent-bit/stream_processor/flb_sp_window.h>
40 #include <fluent-bit/stream_processor/flb_sp_groupby.h>
41 
42 #include <stdlib.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #ifndef _WIN32
46 #include <unistd.h>
47 #endif
48 
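/*
 * Shortcuts over msgpack's internal _msgpack_store16/_msgpack_store32
 * helpers, casting the value to the target width first. These rely on
 * underscore-prefixed (non-public) names: don't do this at home.
 */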
50 #define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) d)
51 #define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) d)
52 
53 /* String type to numerical conversion */
54 #define FLB_STR_INT   1
55 #define FLB_STR_FLOAT 2
56 
57 /* Read and process file system configuration file */
sp_config_file(struct flb_config * config,struct flb_sp * sp,const char * file)58 static int sp_config_file(struct flb_config *config, struct flb_sp *sp,
59                           const char *file)
60 {
61     int ret;
62     char *name;
63     char *exec;
64     const char *cfg = NULL;
65     char tmp[PATH_MAX + 1];
66     struct stat st;
67     struct mk_rconf *fconf;
68     struct mk_rconf_section *section;
69     struct mk_list *head;
70     struct flb_sp_task *task;
71 
72 #ifndef FLB_HAVE_STATIC_CONF
73     ret = stat(file, &st);
74     if (ret == -1 && errno == ENOENT) {
75         /* Try to resolve the real path (if exists) */
76         if (file[0] == '/') {
77             flb_error("[sp] cannot open configuration file: %s", file);
78             return -1;
79         }
80 
81         if (config->conf_path) {
82             snprintf(tmp, PATH_MAX, "%s%s", config->conf_path, file);
83             cfg = tmp;
84         }
85     }
86     else {
87         cfg = file;
88     }
89 
90     fconf = mk_rconf_open(cfg);
91 #else
92     fconf = flb_config_static_open(file);
93 #endif
94 
95     if (!fconf) {
96         return -1;
97     }
98 
99     /* Read all [STREAM_TASK] sections */
100     mk_list_foreach(head, &fconf->sections) {
101         section = mk_list_entry(head, struct mk_rconf_section, _head);
102         if (strcasecmp(section->name, "STREAM_TASK") != 0) {
103             continue;
104         }
105 
106         name = NULL;
107         exec = NULL;
108 
109         /* Name */
110         name = mk_rconf_section_get_key(section, "Name", MK_RCONF_STR);
111         if (!name) {
112             flb_error("[sp] task 'name' not found in file '%s'", cfg);
113             goto fconf_error;
114         }
115 
116         /* Exec */
117         exec = mk_rconf_section_get_key(section, "Exec", MK_RCONF_STR);
118         if (!exec) {
119             flb_error("[sp] task '%s' don't have an 'exec' command", name);
120             goto fconf_error;
121         }
122 
123         /* Register the task */
124         task = flb_sp_task_create(sp, name, exec);
125         if (!task) {
126             goto fconf_error;
127         }
128 
129         flb_free(name);
130         flb_free(exec);
131     }
132 
133     mk_rconf_free(fconf);
134     return 0;
135 
136 fconf_error:
137     flb_free(name);
138     flb_free(exec);
139 
140     return -1;
141 }
142 
sp_task_to_instance(struct flb_sp_task * task,struct flb_sp * sp)143 static int sp_task_to_instance(struct flb_sp_task *task, struct flb_sp *sp)
144 {
145     struct mk_list *head;
146     struct flb_input_instance *in;
147 
148     if (task->cmd->source_type != FLB_SP_STREAM) {
149         return -1;
150     }
151 
152     mk_list_foreach(head, &sp->config->inputs) {
153         in = mk_list_entry(head, struct flb_input_instance, _head);
154         if (in->alias) {
155             if (strcasecmp(in->alias, task->cmd->source_name) == 0) {
156                 task->source_instance = in;
157                 return 0;
158             }
159         }
160 
161         if (strcasecmp(in->name, task->cmd->source_name) == 0) {
162             task->source_instance = in;
163             return 0;
164         }
165     }
166 
167     return -1;
168 }
169 
sp_info(struct flb_sp * sp)170 static void sp_info(struct flb_sp *sp)
171 {
172     struct mk_list *head;
173     struct flb_sp_task *task;
174 
175     flb_info("[sp] stream processor started");
176 
177     mk_list_foreach(head, &sp->tasks) {
178         task = mk_list_entry(head, struct flb_sp_task, _head);
179         flb_info("[sp] registered task: %s", task->name);
180     }
181 }
182 
subkeys_compare(struct mk_list * subkeys1,struct mk_list * subkeys2)183 int subkeys_compare(struct mk_list *subkeys1, struct mk_list *subkeys2)
184 {
185     int i;
186     struct flb_slist_entry *entry1;
187     struct flb_slist_entry *entry2;
188 
189     if (!subkeys1 && !subkeys2) {
190         return 0;
191     }
192 
193     if (!subkeys1 || !subkeys2) {
194         return -1;
195     }
196 
197     if (mk_list_size(subkeys1) != mk_list_size(subkeys2)) {
198         return -1;
199     }
200 
201     entry1 = mk_list_entry_first(subkeys1, struct flb_slist_entry, _head);
202     entry2 = mk_list_entry_first(subkeys2, struct flb_slist_entry, _head);
203 
204     for (i = 0; i < mk_list_size(subkeys1); i++) {
205         if (flb_sds_cmp(entry1->str, entry2->str, flb_sds_len(entry2->str)) != 0) {
206             return -1;
207         }
208 
209         entry1 = mk_list_entry_next(&entry1->_head, struct flb_slist_entry,
210                                     _head, subkeys1);
211         entry2 = mk_list_entry_next(&entry2->_head, struct flb_slist_entry,
212                                     _head, subkeys2);
213     }
214 
215     return 0;
216 }
217 
sp_cmd_aggregated_keys(struct flb_sp_cmd * cmd)218 static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd)
219 {
220     int aggr = 0;
221     int not_aggr = 0;
222     struct mk_list *head;
223     struct mk_list *head_gb;
224     struct flb_sp_cmd_key *key;
225     struct flb_sp_cmd_gb_key *gb_key;
226 
227     mk_list_foreach(head, &cmd->keys) {
228         key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
229         if (key->time_func > 0 || key->record_func > 0) {
230             continue;
231         }
232 
233         if (key->aggr_func > 0) {
234             /* AVG, SUM, COUNT or timeseries functions */
235             aggr++;
236         }
237         else {
238             mk_list_foreach(head_gb, &cmd->gb_keys) {
239                 gb_key = mk_list_entry(head_gb, struct flb_sp_cmd_gb_key, _head);
240 
241                 if (!key->name) { /* Key name is a wildcard '*' */
242                     break;
243                 }
244 
245                 if (flb_sds_cmp(key->name, gb_key->name,
246                                 flb_sds_len(gb_key->name)) == 0) {
247                     if (subkeys_compare(key->subkeys, gb_key->subkeys) != 0) {
248                         continue;
249                     }
250 
251                     not_aggr--;
252 
253                     /* Map key selector with group-by */
254                     key->gb_key = gb_key;
255                     break;
256                 }
257             }
258 
259             not_aggr++;
260         }
261     }
262 
263     /*
264      * if some aggregated function is required, not aggregated keys are
265      * not allowed so we return an error (-1).
266      */
267     if (aggr > 0 && not_aggr == 0) {
268         return aggr;
269     }
270     else if (aggr > 0 && not_aggr > 0) {
271         return -1;
272     }
273 
274     return 0;
275 }
276 
277 /*
278  * Convert a string to a numerical representation:
279  *
280  * - if output number is an integer, 'i' is set and returns FLB_STR_INT
281  * - if output number is a float, 'd' is set and returns FLB_STR_FLOAT
282  * - if no conversion is possible (not a number), returns -1
283  */
string_to_number(const char * str,int len,int64_t * i,double * d)284 static int string_to_number(const char *str, int len, int64_t *i, double *d)
285 {
286     int c;
287     int dots = 0;
288     char *end;
289     int64_t i_out;
290     double d_out;
291 
292     /* Detect if this is a floating point number */
293     for (c = 0; c < len; c++) {
294         if (str[c] == '.') {
295             dots++;
296         }
297     }
298 
299     if (dots > 1) {
300         return -1;
301     }
302     else if (dots == 1) {
303         /* Floating point number */
304         errno = 0;
305         d_out = strtold(str, &end);
306 
307         /* Check for various possible errors */
308         if ((errno == ERANGE || (errno != 0 && d_out == 0))) {
309             return -1;
310         }
311 
312         if (end == str) {
313             return -1;
314         }
315 
316         *d = d_out;
317         return FLB_STR_FLOAT;
318     }
319     else {
320         /* Integer */
321         errno = 0;
322         i_out = strtoll(str, &end, 10);
323 
324         /* Check for various possible errors */
325         if ((errno == ERANGE || (errno != 0 && i_out == 0))) {
326             return -1;
327         }
328 
329         if (end == str) {
330             return -1;
331         }
332 
333         *i = i_out;
334         return FLB_STR_INT;
335     }
336 
337     return -1;
338 }
339 
340 /*
341  * Convert a msgpack object value to a number 'if possible'. The conversion
342  * result is either stored on 'i' for 64 bits integers or in 'd' for
343  * float/doubles.
344  *
345  * This function aims to take care of strings representing a value too.
346  */
object_to_number(msgpack_object obj,int64_t * i,double * d)347 static int object_to_number(msgpack_object obj, int64_t *i, double *d)
348 {
349     int ret;
350     int64_t i_out;
351     double d_out;
352     char str_num[20];
353 
354     if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
355         obj.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
356         *i = obj.via.i64;
357         return FLB_STR_INT;
358     }
359     else if (obj.type == MSGPACK_OBJECT_FLOAT32 ||
360              obj.type == MSGPACK_OBJECT_FLOAT) {
361         *d = obj.via.f64;
362         return FLB_STR_FLOAT;
363     }
364     else if (obj.type == MSGPACK_OBJECT_STR) {
365         /* A numeric representation of a string should not exceed 19 chars */
366         if (obj.via.str.size > 19) {
367             return -1;
368         }
369 
370         memcpy(str_num, obj.via.str.ptr, obj.via.str.size);
371         str_num[obj.via.str.size] = '\0';
372 
373         ret = string_to_number(str_num, obj.via.str.size,
374                                &i_out, &d_out);
375         if (ret == FLB_STR_FLOAT) {
376             *d = d_out;
377             return FLB_STR_FLOAT;
378         }
379         else if (ret == FLB_STR_INT) {
380             *i = i_out;
381             return FLB_STR_INT;
382         }
383     }
384 
385     return -1;
386 }
387 
flb_sp_snapshot_create(struct flb_sp_task * task)388 int flb_sp_snapshot_create(struct flb_sp_task *task)
389 {
390     struct flb_sp_cmd *cmd;
391     struct flb_sp_snapshot *snapshot;
392 
393     cmd = task->cmd;
394 
395     snapshot = (struct flb_sp_snapshot *) flb_calloc(1, sizeof(struct flb_sp_snapshot));
396     if (!snapshot) {
397         flb_error("[sp] could not create snapshot '%s'", cmd->stream_name);
398         return -1;
399     }
400 
401     mk_list_init(&snapshot->pages);
402     snapshot->record_limit = cmd->limit;
403 
404     if (flb_sp_cmd_stream_prop_get(cmd, "seconds") != NULL) {
405         snapshot->time_limit = atoi(flb_sp_cmd_stream_prop_get(cmd, "seconds"));
406     }
407 
408     if (snapshot->time_limit == 0 && snapshot->record_limit == 0) {
409         flb_error("[sp] could not create snapshot '%s': size is not defined",
410                   cmd->stream_name);
411         flb_sp_snapshot_destroy(snapshot);
412         return -1;
413     }
414 
415     task->snapshot = snapshot;
416     return 0;
417 }
418 
flb_sp_task_create(struct flb_sp * sp,const char * name,const char * query)419 struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
420                                        const char *query)
421 {
422     int fd;
423     int ret;
424     struct mk_event *event;
425     struct flb_sp_cmd *cmd;
426     struct flb_sp_task *task;
427 
428     /*
429      * Parse and validate the incoming exec query and create the 'command'
430      * context (this will be associated to the task in a later step
431      */
432     cmd = flb_sp_cmd_create(query);
433 
434     if (!cmd) {
435         flb_error("[sp] invalid query on task '%s': '%s'", name, query);
436         return NULL;
437     }
438 
439     /* Check if we got an invalid type due an error/restriction */
440     if (cmd->status == FLB_SP_ERROR) {
441         flb_error("[sp] invalid query on task '%s': '%s'", name, query);
442         flb_sp_cmd_destroy(cmd);
443         return NULL;
444     }
445 
446     /* Create the task context */
447     task = flb_calloc(1, sizeof(struct flb_sp_task));
448     if (!task) {
449         flb_errno();
450         flb_sp_cmd_destroy(cmd);
451         return NULL;
452     }
453     task->name = flb_sds_create(name);
454     if (!task->name) {
455         flb_free(task);
456         flb_sp_cmd_destroy(cmd);
457         return NULL;
458     }
459 
460     task->query = flb_sds_create(query);
461     if (!task->query) {
462         flb_sds_destroy(task->name);
463         flb_free(task);
464         flb_sp_cmd_destroy(cmd);
465         return NULL;
466     }
467 
468     task->sp = sp;
469     task->cmd = cmd;
470     mk_list_add(&task->_head, &sp->tasks);
471 
472     /*
473      * Assume no aggregated keys exists, if so, a different strategy is
474      * required to process the records.
475      */
476     task->aggregate_keys = FLB_FALSE;
477 
478     mk_list_init(&task->window.data);
479     mk_list_init(&task->window.aggregate_list);
480     rb_tree_new(&task->window.aggregate_tree, flb_sp_groupby_compare);
481 
482     mk_list_init(&task->window.hopping_slot);
483 
484     /* Check and validate aggregated keys */
485     ret = sp_cmd_aggregated_keys(task->cmd);
486     if (ret == -1) {
487         flb_error("[sp] aggregated query cannot mix not aggregated keys: %s",
488                   query);
489         flb_sp_task_destroy(task);
490         return NULL;
491     }
492     else if (ret > 0) {
493         task->aggregate_keys = FLB_TRUE;
494 
495         task->window.type = cmd->window.type;
496 
497         /* Register a timer event when task contains aggregation rules */
498         if (task->window.type != FLB_SP_WINDOW_DEFAULT) {
499             /* Initialize event loop context */
500             event = &task->window.event;
501             MK_EVENT_ZERO(event);
502 
503             /* Run every 'size' seconds */
504             fd = mk_event_timeout_create(sp->config->evl,
505                                          cmd->window.size, (long) 0,
506                                          &task->window.event);
507             if (fd == -1) {
508                 flb_error("[sp] registration for task %s failed", task->name);
509                 flb_free(task);
510                 return NULL;
511             }
512             task->window.fd = fd;
513 
514             if (task->window.type == FLB_SP_WINDOW_HOPPING) {
515                 /* Initialize event loop context */
516                 event = &task->window.event_hop;
517                 MK_EVENT_ZERO(event);
518 
519                 /* Run every 'size' seconds */
520                 fd = mk_event_timeout_create(sp->config->evl,
521                                              cmd->window.advance_by, (long) 0,
522                                              &task->window.event_hop);
523                 if (fd == -1) {
524                     flb_error("[sp] registration for task %s failed", task->name);
525                     flb_free(task);
526                     return NULL;
527                 }
528                 task->window.advance_by = cmd->window.advance_by;
529                 task->window.fd_hop = fd;
530                 task->window.first_hop = true;
531             }
532         }
533     }
534 
535     /* Init snapshot page list */
536     if (cmd->type == FLB_SP_CREATE_SNAPSHOT) {
537         if (flb_sp_snapshot_create(task) == -1) {
538             flb_sp_task_destroy(task);
539             return NULL;
540         }
541     }
542 
543     /*
544      * If the task involves a stream creation (CREATE STREAM abc..), create
545      * the stream.
546      */
547     if (cmd->type == FLB_SP_CREATE_STREAM ||
548         cmd->type == FLB_SP_CREATE_SNAPSHOT ||
549         cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
550 
551         ret = flb_sp_stream_create(cmd->stream_name, task, sp);
552         if (ret == -1) {
553             flb_error("[sp] could not create stream '%s'", cmd->stream_name);
554             flb_sp_task_destroy(task);
555             return NULL;
556         }
557     }
558 
559     /*
560      * Based in the command type, check if the source of data is a known
561      * stream so make a reference on this task for a quick comparisson and
562      * access it when processing data.
563      */
564     sp_task_to_instance(task, sp);
565     return task;
566 }
567 
groupby_nums_destroy(struct aggregate_num * groupby_nums,int size)568 void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size)
569 {
570     int i;
571 
572     for (i = 0; i < size; i++) {
573         if (groupby_nums[i].type == FLB_SP_STRING) {
574             flb_sds_destroy(groupby_nums[i].string);
575           }
576     }
577 
578     flb_free(groupby_nums);
579 }
580 
581 /*
582  * Destroy aggregation node context: before to use this function make sure
583  * to unlink from the linked list.
584  */
flb_sp_aggregate_node_destroy(struct flb_sp_cmd * cmd,struct aggregate_node * aggr_node)585 void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
586                               struct aggregate_node *aggr_node)
587 {
588     int i;
589     int key_id;
590     struct mk_list *head;
591     struct aggregate_num *num;
592     struct flb_sp_cmd_key *ckey;
593 
594     for (i = 0; i < aggr_node->nums_size; i++) {
595         num = &aggr_node->nums[i];
596         if (num->type == FLB_SP_STRING) {
597             flb_sds_destroy(num->string);
598         }
599     }
600 
601     groupby_nums_destroy(aggr_node->groupby_nums, aggr_node->groupby_keys);
602 
603     key_id = 0;
604     mk_list_foreach(head, &cmd->keys) {
605         ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head);
606 
607         if (!ckey->aggr_func) {
608             key_id++;
609             continue;
610         }
611 
612         aggregate_func_destroy[ckey->aggr_func - 1](aggr_node, key_id);
613         key_id++;
614     }
615 
616     flb_free(aggr_node->nums);
617     flb_free(aggr_node->aggregate_data);
618     flb_free(aggr_node);
619 }
620 
flb_sp_window_destroy(struct flb_sp_cmd * cmd,struct flb_sp_task_window * window)621 void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
622                            struct flb_sp_task_window *window)
623 {
624     struct flb_sp_window_data *data;
625     struct aggregate_node *aggr_node;
626     struct flb_sp_hopping_slot *hs;
627     struct mk_list *head;
628     struct mk_list *tmp;
629     struct mk_list *head_hs;
630     struct mk_list *tmp_hs;
631 
632     mk_list_foreach_safe(head, tmp, &window->data) {
633         data = mk_list_entry(head, struct flb_sp_window_data, _head);
634         flb_free(data->buf_data);
635         mk_list_del(&data->_head);
636         flb_free(data);
637     }
638 
639     mk_list_foreach_safe(head, tmp, &window->aggregate_list) {
640         aggr_node = mk_list_entry(head, struct aggregate_node, _head);
641         mk_list_del(&aggr_node->_head);
642         flb_sp_aggregate_node_destroy(cmd, aggr_node);
643     }
644 
645     mk_list_foreach_safe(head, tmp, &window->hopping_slot) {
646         hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head);
647         mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) {
648             aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head);
649             mk_list_del(&aggr_node->_head);
650             flb_sp_aggregate_node_destroy(cmd, aggr_node);
651         }
652         rb_tree_destroy(&hs->aggregate_tree);
653         flb_free(hs);
654     }
655 
656     rb_tree_destroy(&window->aggregate_tree);
657 }
658 
flb_sp_task_destroy(struct flb_sp_task * task)659 void flb_sp_task_destroy(struct flb_sp_task *task)
660 {
661     flb_sds_destroy(task->name);
662     flb_sds_destroy(task->query);
663     flb_sp_window_destroy(task->cmd, &task->window);
664     flb_sp_snapshot_destroy(task->snapshot);
665     mk_list_del(&task->_head);
666 
667     if (task->stream) {
668         flb_sp_stream_destroy(task->stream, task->sp);
669     }
670 
671     flb_sp_cmd_destroy(task->cmd);
672     flb_free(task);
673 }
674 
675 /* Create the stream processor context */
flb_sp_create(struct flb_config * config)676 struct flb_sp *flb_sp_create(struct flb_config *config)
677 {
678     int i = 0;
679     int ret;
680     char buf[32];
681     struct mk_list *head;
682     struct flb_sp *sp;
683     struct flb_slist_entry *e;
684     struct flb_sp_task *task;
685 
686     /* Allocate context */
687     sp = flb_malloc(sizeof(struct flb_sp));
688     if (!sp) {
689         flb_errno();
690         return NULL;
691     }
692     sp->config = config;
693     mk_list_init(&sp->tasks);
694 
695     /* Check for pre-configured Tasks (command line) */
696     mk_list_foreach(head, &config->stream_processor_tasks) {
697         e = mk_list_entry(head, struct flb_slist_entry, _head);
698         snprintf(buf, sizeof(buf) - 1, "flb-console:%i", i);
699         i++;
700         task = flb_sp_task_create(sp, buf, e->str);
701         if (!task) {
702             continue;
703         }
704     }
705 
706     /* Lookup configuration file if any */
707     if (config->stream_processor_file) {
708         ret = sp_config_file(config, sp, config->stream_processor_file);
709         if (ret == -1) {
710             flb_error("[sp] could not initialize stream processor");
711             flb_sp_destroy(sp);
712             return NULL;
713         }
714     }
715 
716     /* Write sp info to stdout */
717     sp_info(sp);
718 
719     return sp;
720 }
721 
free_value(struct flb_exp_val * v)722 void free_value(struct flb_exp_val *v)
723 {
724     if (!v) {
725         return;
726     }
727 
728     if (v->type == FLB_EXP_STRING) {
729         flb_sds_destroy(v->val.string);
730     }
731 
732     flb_free(v);
733 }
734 
itof_convert(struct flb_exp_val * val)735 static void itof_convert(struct flb_exp_val *val)
736 {
737     if (val->type != FLB_EXP_INT) {
738         return;
739     }
740 
741     val->type = FLB_EXP_FLOAT;
742     val->val.f64 = (double) val->val.i64;
743 }
744 
745 /* Convert (string) expression to number */
exp_string_to_number(struct flb_exp_val * val)746 static void exp_string_to_number(struct flb_exp_val *val)
747 {
748     int ret;
749     int len;
750     int64_t i = 0;
751     char *str;
752     double d = 0.0;
753 
754     len = flb_sds_len(val->val.string);
755     str = val->val.string;
756 
757     ret = string_to_number(str, len, &i, &d);
758     if (ret == -1) {
759         return;
760     }
761 
762     /* Assign to proper type */
763     if (ret == FLB_STR_FLOAT) {
764         flb_sds_destroy(val->val.string);
765         val->type = FLB_EXP_FLOAT;
766         val->val.f64 = d;
767     }
768     else if (ret == FLB_STR_INT) {
769         flb_sds_destroy(val->val.string);
770         val->type = FLB_EXP_INT;
771         val->val.i64 = i;
772     }
773 }
774 
numerical_comp(struct flb_exp_val * left,struct flb_exp_val * right,struct flb_exp_val * result,int op)775 static void numerical_comp(struct flb_exp_val *left,
776                            struct flb_exp_val *right,
777                            struct flb_exp_val *result, int op)
778 {
779     result->type = FLB_EXP_BOOL;
780 
781     if (left == NULL || right == NULL) {
782         result->val.boolean = false;
783         return;
784     }
785 
786     /* Check if left expression value is a number, if so, convert it */
787     if (left->type == FLB_EXP_STRING && right->type != FLB_EXP_STRING) {
788         exp_string_to_number(left);
789     }
790 
791     if (left->type == FLB_EXP_INT && right->type == FLB_EXP_FLOAT) {
792         itof_convert(left);
793     }
794     else if (left->type == FLB_EXP_FLOAT && right->type == FLB_EXP_INT) {
795         itof_convert(right);
796     }
797 
798     switch (op) {
799     case FLB_EXP_EQ:
800         if (left->type == right->type) {
801             switch(left->type) {
802             case FLB_EXP_NULL:
803                 result->val.boolean = true;
804                 break;
805             case FLB_EXP_BOOL:
806                 result->val.boolean = (left->val.boolean == right->val.boolean);
807                 break;
808             case FLB_EXP_INT:
809                 result->val.boolean = (left->val.i64 == right->val.i64);
810                 break;
811             case FLB_EXP_FLOAT:
812                 result->val.boolean = (left->val.f64 == right->val.f64);
813                 break;
814             case FLB_EXP_STRING:
815                 if (flb_sds_len(left->val.string) !=
816                     flb_sds_len(right->val.string)) {
817                     result->val.boolean = false;
818                 }
819                 else if (strncmp(left->val.string, right->val.string,
820                                  flb_sds_len(left->val.string)) != 0) {
821                     result->val.boolean = false;
822                 }
823                 else {
824                     result->val.boolean = true;
825                 }
826                 break;
827             default:
828                 result->val.boolean = false;
829                 break;
830             }
831         }
832         else {
833             result->val.boolean = false;
834         }
835         break;
836     case FLB_EXP_LT:
837         if (left->type == right->type) {
838             switch(left->type) {
839             case FLB_EXP_INT:
840                 result->val.boolean = (left->val.i64 < right->val.i64);
841                 break;
842             case FLB_EXP_FLOAT:
843                 result->val.boolean = (left->val.f64 < right->val.f64);
844                 break;
845             case FLB_EXP_STRING:
846                 if (strncmp(left->val.string, right->val.string,
847                             flb_sds_len(left->val.string)) < 0) {
848                     result->val.boolean = true;
849                 }
850                 else {
851                     result->val.boolean = false;
852                 }
853                 break;
854             default:
855                 result->val.boolean = false;
856                 break;
857             }
858         }
859         else {
860             result->val.boolean = false;
861         }
862         break;
863     case FLB_EXP_LTE:
864         if (left->type == right->type) {
865             switch(left->type) {
866             case FLB_EXP_INT:
867                 result->val.boolean = (left->val.i64 <= right->val.i64);
868                 break;
869             case FLB_EXP_FLOAT:
870                 result->val.boolean = (left->val.f64 <= right->val.f64);
871                 break;
872             case FLB_EXP_STRING:
873                 if (strncmp(left->val.string, right->val.string,
874                             flb_sds_len(left->val.string)) <= 0) {
875                     result->val.boolean = true;
876                 }
877                 else {
878                     result->val.boolean = false;
879                 }
880                 break;
881             default:
882                 result->val.boolean = false;
883                 break;
884             }
885         }
886         else {
887             result->val.boolean = false;
888         }
889         break;
890     case FLB_EXP_GT:
891         if (left->type == right->type) {
892             switch(left->type) {
893             case FLB_EXP_INT:
894                 result->val.boolean = (left->val.i64 > right->val.i64);
895                 break;
896             case FLB_EXP_FLOAT:
897                 result->val.boolean = (left->val.f64 > right->val.f64);
898                 break;
899             case FLB_EXP_STRING:
900                 if (strncmp(left->val.string, right->val.string,
901                             flb_sds_len(left->val.string)) > 0) {
902                     result->val.boolean = true;
903                 }
904                 else {
905                     result->val.boolean = false;
906                 }
907                 break;
908             default:
909                 result->val.boolean = false;
910                 break;
911             }
912         }
913         else {
914             result->val.boolean = false;
915         }
916         break;
917     case FLB_EXP_GTE:
918         if (left->type == right->type) {
919             switch(left->type) {
920             case FLB_EXP_INT:
921                 result->val.boolean = (left->val.i64 >= right->val.i64);
922                 break;
923             case FLB_EXP_FLOAT:
924                 result->val.boolean = (left->val.f64 >= right->val.f64);
925                 break;
926             case FLB_EXP_STRING:
927                 if (strncmp(left->val.string, right->val.string,
928                             flb_sds_len(left->val.string)) >= 0) {
929                     result->val.boolean = true;
930                 }
931                 else {
932                     result->val.boolean = false;
933                 }
934                 break;
935             default:
936                 result->val.boolean = false;
937                 break;
938             }
939         }
940         else {
941             result->val.boolean = false;
942         }
943         break;
944     }
945 }
946 
value_to_bool(struct flb_exp_val * val)947 static bool value_to_bool(struct flb_exp_val *val) {
948     bool result = FLB_FALSE;
949 
950     switch (val->type) {
951     case FLB_EXP_BOOL:
952         result = val->val.boolean;
953         break;
954     case FLB_EXP_INT:
955         result = val->val.i64 > 0;
956         break;
957     case FLB_EXP_FLOAT:
958         result = val->val.f64 > 0;
959         break;
960     case FLB_EXP_STRING:
961         result = true;
962         break;
963     }
964 
965     return result;
966 }
967 
968 
logical_operation(struct flb_exp_val * left,struct flb_exp_val * right,struct flb_exp_val * result,int op)969 static void logical_operation(struct flb_exp_val *left,
970                               struct flb_exp_val *right,
971                               struct flb_exp_val *result, int op)
972 {
973     bool lval;
974     bool rval;
975 
976     result->type = FLB_EXP_BOOL;
977 
978     /* Null is always interpreted as false in a logical operation */
979     lval = left ? value_to_bool(left) : false;
980     rval = right ? value_to_bool(right) : false;
981 
982     switch (op) {
983     case FLB_EXP_NOT:
984         result->val.boolean = !lval;
985         break;
986     case FLB_EXP_AND:
987         result->val.boolean = lval & rval;
988         break;
989     case FLB_EXP_OR:
990         result->val.boolean = lval | rval;
991         break;
992     }
993 }
994 
/*
 * Recursively reduce an expression tree to a single value, evaluated against
 * one record.
 *
 *  expression: node to evaluate; NULL yields NULL
 *  tag/tag_len: record tag, forwarded to function callbacks
 *  tms: record timestamp, forwarded to function callbacks
 *  map: record body used to resolve key references
 *
 * Returns a newly allocated value, or NULL when the expression is NULL, a
 * referenced key cannot be resolved, or a memory allocation fails. For
 * FLB_EXP_FUNC nodes the return value is whatever the function callback
 * returns. Intermediate operands are released here with free_value(); the
 * returned value is left to the caller.
 */
static struct flb_exp_val *reduce_expression(struct flb_exp *expression,
                                             const char *tag, int tag_len,
                                             struct flb_time *tms,
                                             msgpack_object *map)
{
    int operation;
    flb_sds_t s;
    flb_sds_t tmp_sds = NULL;
    struct flb_exp_key *key;
    struct flb_sp_value *sval;
    struct flb_exp_val *ret, *left, *right;
    struct flb_exp_val *result;

    if (!expression) {
        return NULL;
    }

    result = flb_calloc(1, sizeof(struct flb_exp_val));
    if (!result) {
        flb_errno();
        return NULL;
    }

    switch (expression->type) {
    case FLB_EXP_NULL:
        result->type = expression->type;
        break;
    case FLB_EXP_BOOL:
        result->type = expression->type;
        result->val.boolean = ((struct flb_exp_val *) expression)->val.boolean;
        break;
    case FLB_EXP_INT:
        result->type = expression->type;
        result->val.i64 = ((struct flb_exp_val *) expression)->val.i64;
        break;
    case FLB_EXP_FLOAT:
        result->type = expression->type;
        result->val.f64 = ((struct flb_exp_val *) expression)->val.f64;
        break;
    case FLB_EXP_STRING:
        /* String literals are duplicated so the result owns its own copy */
        s = ((struct flb_exp_val *) expression)->val.string;
        result->val.string = flb_sds_create_size(flb_sds_len(s));
        if (!result->val.string) {
            /* Never hand a NULL buffer to flb_sds_copy() */
            flb_errno();
            flb_free(result);
            return NULL;
        }
        result->type = expression->type;
        tmp_sds = flb_sds_copy(result->val.string, s, flb_sds_len(s));
        if (tmp_sds != result->val.string) {
            result->val.string = tmp_sds;
        }
        break;
    case FLB_EXP_KEY:
        key = (struct flb_exp_key *) expression;
        sval = flb_sp_key_to_value(key->name, *map, key->subkeys);
        if (!sval) {
            flb_free(result);
            return NULL;
        }

        /*
         * The payload is moved into 'result', so only the container
         * returned by the lookup is released here.
         */
        result->type = sval->type;
        result->val = sval->val;
        flb_free(sval);
        return result;
    case FLB_EXP_FUNC:
        /* we don't need result: the callback provides the returned value */
        flb_free(result);
        ret = reduce_expression(((struct flb_exp_func *) expression)->param,
                                tag, tag_len, tms, map);
        result = ((struct flb_exp_func *) expression)->cb_func(tag, tag_len,
                                                               tms, ret);
        free_value(ret);
        break;
    case FLB_LOGICAL_OP:
        left = reduce_expression(expression->left,
                                 tag, tag_len, tms, map);
        right = reduce_expression(expression->right,
                                  tag, tag_len, tms, map);

        operation = ((struct flb_exp_op *) expression)->operation;

        switch (operation) {
        case FLB_EXP_PAR:
            /*
             * Parenthesized expression: NULL is interpreted as false,
             * otherwise the inner expression is taken as a boolean.
             */
            result->type = FLB_EXP_BOOL;
            if (left == NULL) {
                result->val.boolean = false;
            }
            else {
                result->val.boolean = left->val.boolean;
            }
            break;
        case FLB_EXP_EQ:
        case FLB_EXP_LT:
        case FLB_EXP_LTE:
        case FLB_EXP_GT:
        case FLB_EXP_GTE:
            numerical_comp(left, right, result, operation);
            break;
        case FLB_EXP_NOT:
        case FLB_EXP_AND:
        case FLB_EXP_OR:
            logical_operation(left, right, result, operation);
            break;
        }
        free_value(left);
        free_value(right);
        break;
    default:
        /* Unknown node type: return the zero-initialized value */
        break;
    }

    return result;
}
1105 
1106 
package_results(const char * tag,int tag_len,char ** out_buf,size_t * out_size,struct flb_sp_task * task)1107 static void package_results(const char *tag, int tag_len,
1108                             char **out_buf, size_t *out_size,
1109                             struct flb_sp_task *task)
1110 {
1111     int i;
1112     int len;
1113     int map_entries;
1114     msgpack_sbuffer mp_sbuf;
1115     msgpack_packer mp_pck;
1116     struct aggregate_num *num;
1117     struct flb_time tm;
1118     struct flb_sp_cmd_key *ckey;
1119     struct flb_sp_cmd *cmd = task->cmd;
1120     struct mk_list *head;
1121     struct aggregate_node *aggr_node;
1122     struct flb_sp_cmd_gb_key *gb_key = NULL;
1123 
1124     map_entries = mk_list_size(&cmd->keys);
1125 
1126     msgpack_sbuffer_init(&mp_sbuf);
1127     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1128 
1129     mk_list_foreach(head, &task->window.aggregate_list) {
1130         aggr_node = mk_list_entry(head, struct aggregate_node, _head);
1131 
1132         /* set outgoing array + map and it fixed size */
1133         msgpack_pack_array(&mp_pck, 2);
1134 
1135         flb_time_get(&tm);
1136         flb_time_append_to_msgpack(&tm, &mp_pck, 0);
1137         msgpack_pack_map(&mp_pck, map_entries);
1138 
1139         /* Packaging results */
1140         ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key, _head);
1141         for (i = 0; i < map_entries; i++) {
1142             num = &aggr_node->nums[i];
1143 
1144             /* Check if there is a defined function */
1145             if (ckey->time_func > 0) {
1146                 flb_sp_func_time(&mp_pck, ckey);
1147                 goto next;
1148             }
1149             else if (ckey->record_func > 0) {
1150                 flb_sp_func_record(tag, tag_len, &tm, &mp_pck, ckey);
1151                 goto next;
1152             }
1153 
1154             /* Pack key */
1155             if (ckey->alias) {
1156                 msgpack_pack_str(&mp_pck, flb_sds_len(ckey->alias));
1157                 msgpack_pack_str_body(&mp_pck,
1158                                       ckey->alias,
1159                                       flb_sds_len(ckey->alias));
1160             }
1161             else {
1162                 len = 0;
1163                 char *c_name;
1164                 if (!ckey->name) {
1165                     c_name = "*";
1166                 }
1167                 else {
1168                     c_name = ckey->name;
1169                 }
1170 
1171                 msgpack_pack_str(&mp_pck, len);
1172                 msgpack_pack_str_body(&mp_pck, c_name, len);
1173             }
1174 
1175             /*
1176              * If a group_by key is mapped as a source of this key,
1177              * change the 'num' reference to obtain the proper information
1178              * for the grouped key value.
1179              */
1180             if (ckey->gb_key != NULL) {
1181                 gb_key = ckey->gb_key;
1182                 if (aggr_node->groupby_keys > 0) {
1183                     num = &aggr_node->groupby_nums[gb_key->id];
1184                 }
1185             }
1186 
1187             /* Pack value */
1188             switch (ckey->aggr_func) {
1189             case FLB_SP_NOP:
1190                 if (num->type == FLB_SP_NUM_I64) {
1191                     msgpack_pack_int64(&mp_pck, num->i64);
1192                 }
1193                 else if (num->type == FLB_SP_NUM_F64) {
1194                     msgpack_pack_float(&mp_pck, num->f64);
1195                 }
1196                 else if (num->type == FLB_SP_STRING) {
1197                     msgpack_pack_str(&mp_pck,
1198                                      flb_sds_len(num->string));
1199                     msgpack_pack_str_body(&mp_pck,
1200                                           num->string,
1201                                           flb_sds_len(num->string));
1202                 }
1203                 else if (num->type == FLB_SP_BOOLEAN) {
1204                     if (num->boolean) {
1205                         msgpack_pack_true(&mp_pck);
1206                     }
1207                     else {
1208                         msgpack_pack_false(&mp_pck);
1209                     }
1210                 }
1211                 break;
1212             default:
1213                 aggregate_func_calc[ckey->aggr_func - 1](aggr_node, ckey, &mp_pck, i);
1214                 break;
1215             }
1216 
1217 next:
1218             ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
1219                                       _head, &cmd->keys);
1220         }
1221     }
1222 
1223     *out_buf = mp_sbuf.data;
1224     *out_size = mp_sbuf.size;
1225 }
1226 
/*
 * Find or create the aggregation node a record belongs to.
 *
 * With GROUP BY keys, the values of those keys are extracted from the record
 * and used to look up (or insert) a node in the window aggregation tree.
 * Without GROUP BY, a single node kept in the window aggregation list is
 * used for every record.
 *
 * In both cases the node 'records' counter is set to 1 on creation or
 * incremented when the node already exists.
 *
 * Returns the node, or NULL if an allocation fails or the record does not
 * provide a value for every GROUP BY key.
 */
static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *task,
                                                         msgpack_object map)
{
    int i;
    int ret;
    int map_size;
    int key_id;
    int map_entries;
    int gb_entries;
    int values_found;
    int64_t ival;
    double dval;
    struct flb_sp_value *sval;
    struct aggregate_num *gb_nums;
    struct aggregate_node *aggr_node;
    struct flb_sp_cmd *cmd;
    struct flb_sp_cmd_gb_key *gb_key;
    struct mk_list *head;
    struct rb_tree_node *rb_result;
    msgpack_object key;

    aggr_node = NULL;
    cmd = task->cmd;
    map_size = map.via.map.size;
    values_found = 0;

    /* Number of expected output entries in the map */
    map_entries = mk_list_size(&cmd->keys);
    gb_entries = mk_list_size(&cmd->gb_keys);

    if (gb_entries > 0) {
        gb_nums = flb_calloc(1, sizeof(struct aggregate_num) * gb_entries);
        if (!gb_nums) {
            flb_errno();
            return NULL;
        }

        /* extract GROUP BY values */
        for (i = 0; i < map_size; i++) {
            key = map.via.map.ptr[i].key;

            /* Only string keys can be compared against GROUP BY names */
            if (key.type != MSGPACK_OBJECT_STR) {
                continue;
            }

            key_id = 0;
            mk_list_foreach(head, &cmd->gb_keys) {
                gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key,
                                       _head);
                if (flb_sds_cmp(gb_key->name, key.via.str.ptr,
                                key.via.str.size) != 0) {
                    key_id++;
                    continue;
                }

                sval = flb_sp_key_to_value(gb_key->name, map, gb_key->subkeys);
                if (!sval) {
                    /* If evaluation fails/sub-key doesn't exist */
                    key_id++;
                    continue;
                }

                values_found++;

                /* Convert string to number if that is possible */
                ret = object_to_number(sval->o, &ival, &dval);
                if (ret == -1) {
                    if (sval->o.type == MSGPACK_OBJECT_STR) {
                        gb_nums[key_id].type = FLB_SP_STRING;
                        gb_nums[key_id].string =
                            flb_sds_create_len(sval->o.via.str.ptr,
                                               sval->o.via.str.size);
                    }
                    else if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) {
                        /* Booleans are grouped as integers */
                        gb_nums[key_id].type = FLB_SP_NUM_I64;
                        gb_nums[key_id].i64 = sval->o.via.boolean;
                    }
                }
                else if (ret == FLB_STR_INT) {
                    gb_nums[key_id].type = FLB_SP_NUM_I64;
                    gb_nums[key_id].i64 = ival;
                }
                else if (ret == FLB_STR_FLOAT) {
                    gb_nums[key_id].type = FLB_SP_NUM_F64;
                    gb_nums[key_id].f64 = dval;
                }

                key_id++;
                flb_sp_key_value_destroy(sval);
            }
        }

        /* if some GROUP BY keys are not found in the record */
        if (values_found < gb_entries) {
            groupby_nums_destroy(gb_nums, gb_entries);
            return NULL;
        }

        aggr_node = flb_calloc(1, sizeof(struct aggregate_node));
        if (!aggr_node) {
            flb_errno();
            groupby_nums_destroy(gb_nums, gb_entries);
            return NULL;
        }

        aggr_node->groupby_keys = gb_entries;
        aggr_node->groupby_nums = gb_nums;

        rb_tree_find_or_insert(&task->window.aggregate_tree, aggr_node, &aggr_node->_rb_head, &rb_result);
        if (&aggr_node->_rb_head != rb_result) {
            /*
             * A node for this group already exists: drop the temporary one
             * and account the record on the existing node.
             */
            flb_sp_aggregate_node_destroy(cmd, aggr_node);

            aggr_node = container_of(rb_result, struct aggregate_node, _rb_head);
            aggr_node->records++;
        }
        else {
            /* First record of this group: finish the node initialization */
            aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries);
            if (!aggr_node->nums) {
                /*
                 * NOTE(review): at this point the node seems to be already
                 * linked in the aggregation tree; confirm that destroying
                 * it here does not leave a dangling tree entry.
                 */
                flb_sp_aggregate_node_destroy(cmd, aggr_node);
                return NULL;
            }
            aggr_node->records = 1;
            aggr_node->nums_size = map_entries;
            /* NOTE(review): this allocation result is not checked */
            aggr_node->aggregate_data = flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
            mk_list_add(&aggr_node->_head, &task->window.aggregate_list);
        }
    }
    else { /* If query doesn't have GROUP BY */
        if (!mk_list_size(&task->window.aggregate_list)) {
            /* First record in the window: create the single node */
            aggr_node = flb_calloc(1, sizeof(struct aggregate_node));
            if (!aggr_node) {
                flb_errno();
                return NULL;
            }
            aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries);
            if (!aggr_node->nums) {
                flb_sp_aggregate_node_destroy(cmd, aggr_node);
                return NULL;
            }

            aggr_node->nums_size = map_entries;
            aggr_node->records = 1;
            /* NOTE(review): this allocation result is not checked */
            aggr_node->aggregate_data = flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
            mk_list_add(&aggr_node->_head, &task->window.aggregate_list);
        }
        else {
            aggr_node = mk_list_entry_first(&task->window.aggregate_list, struct aggregate_node, _head);
            aggr_node->records++;
        }
    }

    return aggr_node;
}
1376 
1377 /*
1378  * Process data, task and it defined command involves the call of aggregation
1379  * functions (AVG, SUM, COUNT, MIN, MAX).
1380  */
sp_process_data_aggr(const char * buf_data,size_t buf_size,const char * tag,int tag_len,struct flb_sp_task * task,struct flb_sp * sp)1381 static int sp_process_data_aggr(const char *buf_data, size_t buf_size,
1382                                 const char *tag, int tag_len,
1383                                 struct flb_sp_task *task,
1384                                 struct flb_sp *sp)
1385 {
1386     int i;
1387     int ok;
1388     int ret;
1389     int map_size;
1390     int key_id;
1391     int values_found;
1392     size_t off;
1393     int64_t ival;
1394     double dval;
1395     msgpack_object root;
1396     msgpack_object map;
1397     msgpack_unpacked result;
1398     msgpack_object key;
1399     msgpack_object *obj;
1400     struct aggregate_num *nums = NULL;
1401     struct mk_list *head;
1402     struct flb_time tms;
1403     struct flb_sp_cmd *cmd = task->cmd;
1404     struct flb_sp_cmd_key *ckey;
1405     struct flb_sp_value *sval;
1406     struct flb_exp_val *condition;
1407     struct aggregate_node *aggr_node;
1408 
1409     /* Number of expected output entries in the map */
1410     off = 0;
1411 
1412     /* vars initialization */
1413     ok = MSGPACK_UNPACK_SUCCESS;
1414     msgpack_unpacked_init(&result);
1415 
1416     /* Iterate incoming records */
1417     while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) {
1418         root = result.data;
1419 
1420         /* extract timestamp */
1421         flb_time_pop_from_msgpack(&tms, &result, &obj);
1422 
1423         /* get the map data and it size (number of items) */
1424         map   = root.via.array.ptr[1];
1425         map_size = map.via.map.size;
1426 
1427         /* Evaluate condition */
1428         if (cmd->condition) {
1429             condition = reduce_expression(cmd->condition,
1430                                           tag, tag_len, &tms, &map);
1431             if (!condition) {
1432                 continue;
1433             }
1434             else if (!condition->val.boolean) {
1435                 flb_free(condition);
1436                 continue;
1437             }
1438             else {
1439                 flb_free(condition);
1440             }
1441         }
1442 
1443         aggr_node = sp_process_aggregate_data(task, map);
1444         if (!aggr_node)
1445         {
1446             continue;
1447         }
1448 
1449         task->window.records++;
1450 
1451         nums = aggr_node->nums;
1452 
1453         values_found = 0;
1454         /* Iterate each map key and see if it matches any command key */
1455         for (i = 0; i < map_size; i++) {
1456             key = map.via.map.ptr[i].key;
1457 
1458             if (key.type != MSGPACK_OBJECT_STR) {
1459                 continue;
1460             }
1461 
1462 
1463             /*
1464              * Iterate each command key. Note that since the command key
1465              * can have different aggregation functions to the same key
1466              * we should compare all of them.
1467              */
1468             key_id = 0;
1469             mk_list_foreach(head, &cmd->keys) {
1470                 ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head);
1471 
1472                 if (!ckey->name) {
1473                     key_id++;
1474                     continue;
1475                 }
1476 
1477                 if (flb_sds_cmp(ckey->name, key.via.str.ptr,
1478                                 key.via.str.size) != 0) {
1479                     key_id++;
1480                     continue;
1481                 }
1482 
1483                 /* convert the value if it string */
1484                 sval = flb_sp_key_to_value(ckey->name, map, ckey->subkeys);
1485                 if (!sval) {
1486                     key_id++;
1487                     continue;
1488                 }
1489 
1490                 values_found++;
1491 
1492                 /*
1493                  * Convert value to a numeric representation only if key has an
1494                  * assigned aggregation function
1495                  */
1496                 ival = 0;
1497                 dval = 0.0;
1498                 if (ckey->aggr_func != FLB_SP_NOP) {
1499                     ret = object_to_number(sval->o, &ival, &dval);
1500                     if (ret == -1) {
1501                         /* Value cannot be represented as a number */
1502                         key_id++;
1503                         flb_sp_key_value_destroy(sval);
1504                         continue;
1505                     }
1506 
1507                     /*
1508                      * If a floating pointer number exists, we use the same data
1509                      * type for the output.
1510                      */
1511                     if (dval != 0.0 && nums[key_id].type == FLB_SP_NUM_I64) {
1512                         nums[key_id].type = FLB_SP_NUM_F64;
1513                         nums[key_id].f64 = (double) nums[key_id].i64;
1514                     }
1515 
1516                     aggregate_func_add[ckey->aggr_func - 1](aggr_node, ckey, key_id, &tms, ival, dval);
1517                 }
1518                 else {
1519                     if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) {
1520                         nums[key_id].type = FLB_SP_BOOLEAN;
1521                         nums[key_id].boolean = sval->o.via.boolean;
1522                     }
1523                     if (sval->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
1524                         sval->o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
1525                         nums[key_id].type = FLB_SP_NUM_I64;
1526                         nums[key_id].i64 = sval->o.via.i64;
1527                     }
1528                     else if (sval->o.type == MSGPACK_OBJECT_FLOAT32 ||
1529                              sval->o.type == MSGPACK_OBJECT_FLOAT) {
1530                         nums[key_id].type = FLB_SP_NUM_F64;
1531                         nums[key_id].f64 = sval->o.via.f64;
1532                     }
1533                     else if (sval->o.type == MSGPACK_OBJECT_STR) {
1534                         nums[key_id].type = FLB_SP_STRING;
1535                         if (nums[key_id].string == NULL) {
1536                             nums[key_id].string =
1537                                 flb_sds_create_len(sval->o.via.str.ptr,
1538                                                    sval->o.via.str.size);
1539                         }
1540                     }
1541                 }
1542 
1543                 key_id++;
1544                 flb_sp_key_value_destroy(sval);
1545             }
1546         }
1547     }
1548 
1549     msgpack_unpacked_destroy(&result);
1550     return task->window.records;
1551 }
1552 
1553 /*
1554  * Data processing (no aggregation functions)
1555  */
sp_process_data(const char * tag,int tag_len,const char * buf_data,size_t buf_size,char ** out_buf,size_t * out_size,struct flb_sp_task * task,struct flb_sp * sp)1556 static int sp_process_data(const char *tag, int tag_len,
1557                            const char *buf_data, size_t buf_size,
1558                            char **out_buf, size_t *out_size,
1559                            struct flb_sp_task *task,
1560                            struct flb_sp *sp)
1561 {
1562     int i;
1563     int ok;
1564     int ret;
1565     int map_size;
1566     int map_entries;
1567     int records;
1568     uint8_t h;
1569     off_t map_off;
1570     off_t no_data;
1571     size_t off;
1572     size_t off_copy;
1573     size_t snapshot_out_size;
1574     char *tmp;
1575     char *snapshot_out_buffer;
1576     msgpack_object root;
1577     msgpack_object *obj;
1578     msgpack_object key;
1579     msgpack_object val;
1580     msgpack_unpacked result;
1581     msgpack_sbuffer mp_sbuf;
1582     msgpack_packer mp_pck;
1583     msgpack_object map;
1584     struct flb_time tms;
1585     struct mk_list *head;
1586     struct flb_sp_cmd *cmd;
1587     struct flb_sp_cmd_key *cmd_key;
1588     struct flb_exp_val *condition;
1589     struct flb_sp_value *sval;
1590 
1591     /* Vars initialization */
1592     off = 0;
1593     off_copy = off;
1594     records = 0;
1595     cmd = task->cmd;
1596     ok = MSGPACK_UNPACK_SUCCESS;
1597     msgpack_unpacked_init(&result);
1598     msgpack_sbuffer_init(&mp_sbuf);
1599     msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1600 
1601     snapshot_out_size = 0;
1602     snapshot_out_buffer = NULL;
1603 
1604     /* Iterate incoming records */
1605     while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) {
1606         root = result.data;
1607 
1608         /* extract timestamp */
1609         flb_time_pop_from_msgpack(&tms, &result, &obj);
1610 
1611         /* Store the buffer if the stream is a snapshot */
1612         if (cmd->type == FLB_SP_CREATE_SNAPSHOT) {
1613             flb_sp_snapshot_update(task, buf_data + off_copy, off - off_copy, &tms);
1614             off_copy = off;
1615             continue;
1616         }
1617 
1618         /* get the map data and it size (number of items) */
1619         map   = root.via.array.ptr[1];
1620         map_size = map.via.map.size;
1621 
1622         /* Evaluate condition */
1623         if (cmd->condition) {
1624             condition = reduce_expression(cmd->condition,
1625                                           tag, tag_len, &tms, &map);
1626             if (!condition) {
1627                 continue;
1628             }
1629             else if (!condition->val.boolean) {
1630                 flb_free(condition);
1631                 continue;
1632             }
1633             else {
1634                 flb_free(condition);
1635             }
1636         }
1637 
1638         records++;
1639 
1640         /* Flush the snapshot if condition holds */
1641         if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
1642             if (flb_sp_snapshot_flush(sp, task, &snapshot_out_buffer,
1643                                       &snapshot_out_size) == -1) {
1644                 msgpack_unpacked_destroy(&result);
1645                 msgpack_sbuffer_destroy(&mp_sbuf);
1646                 return -1;
1647             }
1648             continue;
1649         }
1650 
1651 
1652         /*
1653          * If for some reason the Task keys did not insert any data, we will
1654          * need to discard any changes and reset the buffer position, let's
1655          * keep the memory size for that purpose.
1656          */
1657         no_data = mp_sbuf.size;
1658 
1659         /* Pack main array */
1660         msgpack_pack_array(&mp_pck, 2);
1661         msgpack_pack_object(&mp_pck, root.via.array.ptr[0]);
1662 
1663         /*
1664          * Save the current size/position of the buffer since this is
1665          * where the Map header will be stored.
1666          */
1667         map_off = mp_sbuf.size;
1668 
1669         /*
1670          * In the new record register the same number of items, if due to
1671          * fields selection the number is lower, we perform an adjustment
1672          */
1673         msgpack_pack_map(&mp_pck, map_size);
1674 
1675         /* Counter for new entries added to the outgoing map */
1676         map_entries = 0;
1677 
1678         /* Iterate key selection */
1679         mk_list_foreach(head, &cmd->keys) {
1680             cmd_key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
1681             if (cmd_key->time_func > 0) {
1682                 /* Process time function */
1683                 ret = flb_sp_func_time(&mp_pck, cmd_key);
1684                 if (ret > 0) {
1685                     map_entries += ret;
1686                 }
1687                 continue;
1688             }
1689             else if (cmd_key->record_func > 0) {
1690                 ret = flb_sp_func_record(tag, tag_len, &tms, &mp_pck, cmd_key);
1691                 if (ret > 0) {
1692                     map_entries += ret;
1693                 }
1694                 continue;
1695             }
1696 
1697             /* Lookup selection key in the incoming map */
1698             for (i = 0; i < map_size; i++) {
1699                 key = map.via.map.ptr[i].key;
1700                 val = map.via.map.ptr[i].val;
1701 
1702                 if (key.type != MSGPACK_OBJECT_STR) {
1703                     continue;
1704                 }
1705 
1706                 /* Wildcard selection: * */
1707                 if (cmd_key->name == NULL) {
1708                     msgpack_pack_object(&mp_pck, key);
1709                     msgpack_pack_object(&mp_pck, val);
1710                     map_entries++;
1711                     continue;
1712                 }
1713 
1714                 /* Compare lengths */
1715                 if (flb_sds_cmp(cmd_key->name,
1716                                 key.via.str.ptr, key.via.str.size) != 0) {
1717                     continue;
1718                 }
1719 
1720                 /*
1721                  * Package key name:
1722                  *
1723                  * Check if the command ask for an alias 'key AS abc'
1724                  */
1725                 if (cmd_key->alias) {
1726                     msgpack_pack_str(&mp_pck,
1727                                      flb_sds_len(cmd_key->alias));
1728                     msgpack_pack_str_body(&mp_pck,
1729                                           cmd_key->alias,
1730                                           flb_sds_len(cmd_key->alias));
1731                 }
1732                 else {
1733                     msgpack_pack_object(&mp_pck, key);
1734                 }
1735 
1736                 /* Package value */
1737                 sval = flb_sp_key_to_value(cmd_key->name, map,
1738                                            cmd_key->subkeys);
1739                 if (sval) {
1740                     msgpack_pack_object(&mp_pck, sval->o);
1741                     flb_sp_key_value_destroy(sval);
1742                 }
1743 
1744                 map_entries++;
1745             }
1746         }
1747 
1748         /* Final Map size adjustment */
1749         if (map_entries == 0) {
1750             mp_sbuf.size = no_data;
1751         }
1752         else {
1753             /*
1754              * The fields were packed, now we need to adjust the map size
1755              * to set the proper number of fields appended to the record.
1756              */
1757             tmp = mp_sbuf.data + map_off;
1758             h = tmp[0];
1759             if (h >> 4 == 0x8) {
1760                 *tmp = (uint8_t) 0x8 << 4 | ((uint8_t) map_entries);
1761             }
1762             else if (h == 0xde) {
1763                 tmp++;
1764                 pack_uint16(tmp, map_entries);
1765             }
1766             else if (h == 0xdf) {
1767                 tmp++;
1768                 pack_uint32(tmp, map_entries);
1769             }
1770         }
1771     }
1772 
1773     msgpack_unpacked_destroy(&result);
1774 
1775     if (records == 0) {
1776         msgpack_sbuffer_destroy(&mp_sbuf);
1777         return 0;
1778     }
1779 
1780     /* Use snapshot out buffer if it is flush stream */
1781     if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
1782         if (snapshot_out_size == 0) {
1783             msgpack_sbuffer_destroy(&mp_sbuf);
1784             flb_free(snapshot_out_buffer);
1785             return 0;
1786         }
1787         else {
1788             *out_buf = snapshot_out_buffer;
1789             *out_size = snapshot_out_size;
1790             return records;
1791         }
1792     }
1793 
1794     /* set outgoing results */
1795     *out_buf = mp_sbuf.data;
1796     *out_size = mp_sbuf.size;
1797 
1798     return records;
1799 }
1800 
sp_process_hopping_slot(const char * tag,int tag_len,struct flb_sp_task * task)1801 static int sp_process_hopping_slot(const char *tag, int tag_len,
1802                                    struct flb_sp_task *task)
1803 {
1804     int i;
1805     int key_id;
1806     int map_entries;
1807     int gb_entries;
1808     struct flb_sp_cmd *cmd = task->cmd;
1809     struct mk_list *head;
1810     struct mk_list *head_hs;
1811     struct aggregate_node *aggr_node;
1812     struct aggregate_node *aggr_node_hs;
1813     struct aggregate_node *aggr_node_prev;
1814     struct flb_sp_hopping_slot *hs;
1815     struct flb_sp_hopping_slot *hs_;
1816     struct rb_tree_node *rb_result;
1817     struct flb_sp_cmd_key *ckey;
1818     rb_result_t result;
1819 
1820     map_entries = mk_list_size(&cmd->keys);
1821     gb_entries = mk_list_size(&cmd->gb_keys);
1822 
1823     /* Initialize a hoping slot */
1824     hs = flb_calloc(1, sizeof(struct flb_sp_hopping_slot));
1825     if (!hs) {
1826         flb_errno();
1827         return -1;
1828     }
1829 
1830     mk_list_init(&hs->aggregate_list);
1831     rb_tree_new(&hs->aggregate_tree, flb_sp_groupby_compare);
1832 
1833     /* Loop over aggregation nodes on window */
1834     mk_list_foreach(head, &task->window.aggregate_list) {
1835         /* Window aggregation node */
1836         aggr_node = mk_list_entry(head, struct aggregate_node, _head);
1837 
1838         /* Create a hopping slot aggregation node */
1839         aggr_node_hs = flb_calloc(1, sizeof(struct aggregate_node));
1840         if (!aggr_node_hs) {
1841             flb_errno();
1842             flb_free(hs);
1843             return -1;
1844         }
1845 
1846         aggr_node_hs->nums = malloc(sizeof(struct aggregate_node) * map_entries);
1847         if (!aggr_node_hs->nums) {
1848             flb_errno();
1849             flb_free(hs);
1850             flb_free(aggr_node_hs);
1851             return -1;
1852         }
1853 
1854         memcpy(aggr_node_hs->nums, aggr_node->nums, sizeof(struct aggregate_num) * map_entries);
1855         aggr_node_hs->records = aggr_node->records;
1856 
1857         /* Clone aggregate data */
1858         key_id = 0;
1859         mk_list_foreach(head_hs, &cmd->keys) {
1860             ckey = mk_list_entry(head_hs, struct flb_sp_cmd_key, _head);
1861 
1862             if (ckey->aggr_func) {
1863                 if (!aggr_node_hs->aggregate_data) {
1864                     aggr_node_hs->aggregate_data = (struct aggregate_data **)
1865                                        flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
1866                     if (!aggr_node_hs->aggregate_data) {
1867                         flb_errno();
1868                         flb_free(hs);
1869                         flb_free(aggr_node_hs->nums);
1870                         flb_free(aggr_node_hs);
1871                         return -1;
1872                     }
1873                 }
1874 
1875                 if (aggregate_func_clone[ckey->aggr_func - 1](aggr_node_hs, aggr_node, ckey, key_id) == -1) {
1876                     flb_errno();
1877                     flb_free(aggr_node_hs->nums);
1878                     flb_free(aggr_node_hs->aggregate_data);
1879                     flb_free(aggr_node_hs);
1880                     flb_free(hs);
1881                     return -1;
1882                 }
1883             }
1884 
1885             key_id++;
1886         }
1887 
1888         /* Traverse over previous slots to calculate values/record numbers */
1889         mk_list_foreach(head_hs, &task->window.hopping_slot) {
1890             hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head);
1891             result = rb_tree_find(&hs_->aggregate_tree, aggr_node, &rb_result);
1892             /* If corresponding aggregation node exists in previous hopping slot,
1893              * calculate aggregation values
1894              */
1895             if (result == RB_OK) {
1896                 aggr_node_prev = mk_list_entry(rb_result, struct aggregate_node,
1897                                                _rb_head);
1898                 aggr_node_hs->records -= aggr_node_prev->records;
1899 
1900                 key_id = 0;
1901                 ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key,
1902                                            _head);
1903                 for (i = 0; i < map_entries; i++) {
1904                     if (ckey->aggr_func) {
1905                         aggregate_func_remove[ckey->aggr_func - 1](aggr_node_hs, aggr_node_prev, i);
1906                     }
1907 
1908                     ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
1909                                               _head, &cmd->keys);
1910                 }
1911             }
1912         }
1913 
1914         if (aggr_node_hs->records > 0) {
1915             aggr_node_hs->groupby_nums =
1916                 flb_calloc(1, sizeof(struct aggregate_node) * gb_entries);
1917             if (gb_entries > 0 && !aggr_node_hs->groupby_nums) {
1918                 flb_errno();
1919                 flb_free(hs);
1920                 flb_free(aggr_node_hs->nums);
1921                 flb_free(aggr_node_hs->aggregate_data);
1922                 flb_free(aggr_node_hs);
1923                 return -1;
1924             }
1925 
1926             if (aggr_node_hs->groupby_nums != NULL) {
1927                 memcpy(aggr_node_hs->groupby_nums, aggr_node->groupby_nums,
1928                        sizeof(struct aggregate_num) * gb_entries);
1929             }
1930 
1931             aggr_node_hs->nums_size = aggr_node->nums_size;
1932             aggr_node_hs->groupby_keys = aggr_node->groupby_keys;
1933 
1934             rb_tree_insert(&hs->aggregate_tree, aggr_node_hs, &aggr_node_hs->_rb_head);
1935             mk_list_add(&aggr_node_hs->_head, &hs->aggregate_list);
1936         }
1937         else {
1938             flb_free(aggr_node_hs->nums);
1939             flb_free(aggr_node_hs->aggregate_data);
1940             flb_free(aggr_node_hs);
1941         }
1942     }
1943 
1944     hs->records = task->window.records;
1945     mk_list_foreach(head_hs, &task->window.hopping_slot) {
1946         hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head);
1947         hs->records -= hs_->records;
1948     }
1949 
1950     mk_list_add(&hs->_head, &task->window.hopping_slot);
1951 
1952     return 0;
1953 }
1954 
1955 /*
1956  * Do data processing for internal unit tests, no engine required, set
1957  * results on out_data/out_size variables.
1958  */
flb_sp_test_do(struct flb_sp * sp,struct flb_sp_task * task,const char * tag,int tag_len,const char * buf_data,size_t buf_size,char ** out_data,size_t * out_size)1959 int flb_sp_test_do(struct flb_sp *sp, struct flb_sp_task *task,
1960                    const char *tag, int tag_len,
1961                    const char *buf_data, size_t buf_size,
1962                    char **out_data, size_t *out_size)
1963 {
1964     int ret;
1965     int records;
1966     struct flb_sp_cmd *cmd;
1967 
1968     cmd = task->cmd;
1969     if (cmd->source_type == FLB_SP_TAG) {
1970         ret = flb_router_match(tag, tag_len, cmd->source_name, NULL);
1971         if (ret == FLB_FALSE) {
1972             *out_data = NULL;
1973             *out_size = 0;
1974             return 0;
1975         }
1976     }
1977 
1978     if (task->aggregate_keys == FLB_TRUE) {
1979         ret = sp_process_data_aggr(buf_data, buf_size,
1980                                    tag, tag_len,
1981                                    task, sp);
1982         if (ret == -1) {
1983             flb_error("[sp] error error processing records for '%s'",
1984                       task->name);
1985             return -1;
1986         }
1987 
1988         if (flb_sp_window_populate(task, buf_data, buf_size) == -1) {
1989             flb_error("[sp] error populating window for '%s'",
1990                       task->name);
1991             return -1;
1992         }
1993 
1994         if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
1995             package_results(tag, tag_len, out_data, out_size, task);
1996         }
1997 
1998         records = task->window.records;
1999     }
2000     else {
2001         ret = sp_process_data(tag, tag_len,
2002                               buf_data, buf_size,
2003                               out_data, out_size,
2004                               task, sp);
2005         if (ret == -1) {
2006             flb_error("[sp] error processing records for '%s'",
2007                       task->name);
2008             return -1;
2009         }
2010         records = ret;
2011     }
2012 
2013     if (records == 0) {
2014         *out_data = NULL;
2015         *out_size = 0;
2016         return 0;
2017     }
2018 
2019     return 0;
2020 }
2021 
2022 /* Iterate and find input chunks to process */
flb_sp_do(struct flb_sp * sp,struct flb_input_instance * in,const char * tag,int tag_len,const char * buf_data,size_t buf_size)2023 int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in,
2024               const char *tag, int tag_len,
2025               const char *buf_data, size_t buf_size)
2026 
2027 {
2028     int ret;
2029     size_t out_size;
2030     char *out_buf;
2031     struct mk_list *head;
2032     struct flb_sp_task *task;
2033     struct flb_sp_cmd *cmd;
2034 
2035     /* Lookup tasks that match the incoming instance data */
2036     mk_list_foreach(head, &sp->tasks) {
2037         task = mk_list_entry(head, struct flb_sp_task, _head);
2038         cmd = task->cmd;
2039 
2040         if (cmd->source_type == FLB_SP_STREAM) {
2041             if (task->source_instance != in) {
2042                 continue;
2043             }
2044         }
2045         else if (cmd->source_type == FLB_SP_TAG) {
2046             ret = flb_router_match(tag, tag_len, cmd->source_name, NULL);
2047             if (ret == FLB_FALSE) {
2048                 continue;
2049             }
2050         }
2051 
2052         /* We found a task that matches the stream rule */
2053         if (task->aggregate_keys == FLB_TRUE) {
2054             ret = sp_process_data_aggr(buf_data, buf_size,
2055                                        tag, tag_len,
2056                                        task, sp);
2057 
2058             if (ret == -1) {
2059                 flb_error("[sp] error processing records for '%s'",
2060                           task->name);
2061                 continue;
2062             }
2063 
2064             if (flb_sp_window_populate(task, buf_data, buf_size) == -1) {
2065                 flb_error("[sp] error populating window for '%s'",
2066                           task->name);
2067                 continue;
2068             }
2069 
2070             if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
2071                 package_results(tag, tag_len, &out_buf, &out_size, task);
2072                 flb_sp_window_prune(task);
2073             }
2074         }
2075         else {
2076             ret = sp_process_data(tag, tag_len,
2077                                   buf_data, buf_size,
2078                                   &out_buf, &out_size,
2079                                   task, sp);
2080 
2081             if (ret == -1) {
2082                 flb_error("[sp] error processing records for '%s'",
2083                           task->name);
2084                 continue;
2085             }
2086         }
2087 
2088         if (ret == 0) {
2089             /* no records */
2090             continue;
2091         }
2092 
2093         /*
2094          * This task involves append data to a stream, which
2095          * means: register the output of the query as data
2096          * generated by an input instance plugin.
2097          */
2098         if (task->aggregate_keys != FLB_TRUE ||
2099             task->window.type == FLB_SP_WINDOW_DEFAULT) {
2100             /*
2101              * Add to stream processing stream if there is no
2102              * aggregation function. Otherwise, write it at timer event
2103              */
2104             if (task->stream) {
2105                 flb_sp_stream_append_data(out_buf, out_size, task->stream);
2106             }
2107             else {
2108                 flb_pack_print(out_buf, out_size);
2109                 flb_free(out_buf);
2110             }
2111         }
2112     }
2113 
2114     return -1;
2115 }
2116 
flb_sp_fd_event(int fd,struct flb_sp * sp)2117 int flb_sp_fd_event(int fd, struct flb_sp *sp)
2118 {
2119     bool update_timer_event;
2120     char *out_buf;
2121     char *tag = NULL;
2122     int tag_len = 0;
2123     int fd_timeout = 0;
2124     size_t out_size;
2125     struct mk_list *tmp;
2126     struct mk_list *head;
2127     struct flb_sp_task *task;
2128     struct flb_input_instance *in = NULL;
2129 
2130     /* Lookup Tasks that matches the incoming event */
2131     mk_list_foreach_safe(head, tmp, &sp->tasks) {
2132         task = mk_list_entry(head, struct flb_sp_task, _head);
2133 
2134         if (fd == task->window.fd) {
2135             update_timer_event = task->window.type == FLB_SP_WINDOW_HOPPING &&
2136                                  task->window.first_hop;
2137 
2138             in = task->source_instance;
2139             if (in) {
2140                 if (in->tag && in->tag_len > 0) {
2141                     tag = in->tag;
2142                     tag_len = in->tag_len;
2143                 }
2144                 else {
2145                     tag = in->name;
2146                     tag_len = strlen(in->name);
2147                 }
2148             }
2149             else {
2150                 in = NULL;
2151             }
2152 
2153             if (task->window.records > 0) {
2154                 /* find input tag from task source */
2155                 package_results(tag, tag_len, &out_buf, &out_size, task);
2156                 if (task->stream) {
2157                     flb_sp_stream_append_data(out_buf, out_size, task->stream);
2158                 }
2159                 else {
2160                     flb_pack_print(out_buf, out_size);
2161                     flb_free(out_buf);
2162                 }
2163 
2164             }
2165 
2166             flb_sp_window_prune(task);
2167 
2168             flb_utils_timer_consume(fd);
2169 
2170             if (update_timer_event && in) {
2171                 task->window.first_hop = false;
2172                 mk_event_timeout_destroy(in->config->evl, &task->window.event);
2173                 mk_event_closesocket(fd);
2174 
2175                 fd_timeout = mk_event_timeout_create(in->config->evl,
2176                                                      task->window.advance_by, (long) 0,
2177                                                      &task->window.event);
2178                 if (fd_timeout == -1) {
2179                     flb_error("[sp] registration for task (updating timer event) %s failed", task->name);
2180                     return -1;
2181                 }
2182                 task->window.fd = fd_timeout;
2183             }
2184 
2185             break;
2186         }
2187         else if (fd == task->window.fd_hop) {
2188             in = task->source_instance;
2189             if (in) {
2190                 if (in->tag && in->tag_len > 0) {
2191                     tag = in->tag;
2192                     tag_len = in->tag_len;
2193                 }
2194                 else {
2195                     tag = in->name;
2196                     tag_len = strlen(in->name);
2197                 }
2198             }
2199             sp_process_hopping_slot(tag, tag_len, task);
2200             flb_utils_timer_consume(fd);
2201         }
2202     }
2203     return 0;
2204 }
2205 
2206 /* Destroy stream processor context */
flb_sp_destroy(struct flb_sp * sp)2207 void flb_sp_destroy(struct flb_sp *sp)
2208 {
2209     struct mk_list *tmp;
2210     struct mk_list *head;
2211     struct flb_sp_task *task;
2212 
2213     /* destroy tasks */
2214     mk_list_foreach_safe(head, tmp, &sp->tasks) {
2215         task = mk_list_entry(head, struct flb_sp_task, _head);
2216         flb_sp_task_destroy(task);
2217     }
2218 
2219     flb_free(sp);
2220 }
2221 
flb_sp_test_fd_event(int fd,struct flb_sp_task * task,char ** out_data,size_t * out_size)2222 int flb_sp_test_fd_event(int fd, struct flb_sp_task *task, char **out_data,
2223                          size_t *out_size)
2224 {
2225     char *tag = NULL;
2226     int tag_len = 0;
2227 
2228     if (task->window.type != FLB_SP_WINDOW_DEFAULT) {
2229         if (fd == task->window.fd) {
2230             if (task->window.records > 0) {
2231                 /* find input tag from task source */
2232                 package_results(tag, tag_len, out_data, out_size, task);
2233                 if (task->stream) {
2234                     flb_sp_stream_append_data(*out_data, *out_size, task->stream);
2235                 }
2236                 else {
2237                     flb_pack_print(*out_data, *out_size);
2238                 }
2239             }
2240 
2241             flb_sp_window_prune(task);
2242         }
2243         else if (fd == task->window.fd_hop) {
2244             sp_process_hopping_slot(tag, tag_len, task);
2245         }
2246     }
2247 
2248     return 0;
2249 }
2250