1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Fluent Bit
4 * ==========
5 * Copyright (C) 2019 The Fluent Bit Authors
6 * Copyright (C) 2015-2018 Treasure Data Inc.
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <fluent-bit/flb_info.h>
22 #include <fluent-bit/flb_log.h>
23 #include <fluent-bit/flb_sds.h>
24 #include <fluent-bit/flb_mem.h>
25 #include <fluent-bit/flb_slist.h>
26 #include <fluent-bit/flb_utils.h>
27 #include <fluent-bit/flb_time.h>
28 #include <fluent-bit/flb_input.h>
29 #include <fluent-bit/flb_pack.h>
30 #include <fluent-bit/flb_router.h>
31 #include <fluent-bit/stream_processor/flb_sp.h>
32 #include <fluent-bit/stream_processor/flb_sp_key.h>
33 #include <fluent-bit/stream_processor/flb_sp_stream.h>
34 #include <fluent-bit/stream_processor/flb_sp_snapshot.h>
35 #include <fluent-bit/stream_processor/flb_sp_parser.h>
36 #include <fluent-bit/stream_processor/flb_sp_func_time.h>
37 #include <fluent-bit/stream_processor/flb_sp_func_record.h>
38 #include <fluent-bit/stream_processor/flb_sp_aggregate_func.h>
39 #include <fluent-bit/stream_processor/flb_sp_window.h>
40 #include <fluent-bit/stream_processor/flb_sp_groupby.h>
41
42 #include <stdlib.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #ifndef _WIN32
46 #include <unistd.h>
47 #endif
48
49 /* don't do this at home */
50 #define pack_uint16(buf, d) _msgpack_store16(buf, (uint16_t) d)
51 #define pack_uint32(buf, d) _msgpack_store32(buf, (uint32_t) d)
52
53 /* String type to numerical conversion */
54 #define FLB_STR_INT 1
55 #define FLB_STR_FLOAT 2
56
57 /* Read and process file system configuration file */
sp_config_file(struct flb_config * config,struct flb_sp * sp,const char * file)58 static int sp_config_file(struct flb_config *config, struct flb_sp *sp,
59 const char *file)
60 {
61 int ret;
62 char *name;
63 char *exec;
64 const char *cfg = NULL;
65 char tmp[PATH_MAX + 1];
66 struct stat st;
67 struct mk_rconf *fconf;
68 struct mk_rconf_section *section;
69 struct mk_list *head;
70 struct flb_sp_task *task;
71
72 #ifndef FLB_HAVE_STATIC_CONF
73 ret = stat(file, &st);
74 if (ret == -1 && errno == ENOENT) {
75 /* Try to resolve the real path (if exists) */
76 if (file[0] == '/') {
77 flb_error("[sp] cannot open configuration file: %s", file);
78 return -1;
79 }
80
81 if (config->conf_path) {
82 snprintf(tmp, PATH_MAX, "%s%s", config->conf_path, file);
83 cfg = tmp;
84 }
85 }
86 else {
87 cfg = file;
88 }
89
90 fconf = mk_rconf_open(cfg);
91 #else
92 fconf = flb_config_static_open(file);
93 #endif
94
95 if (!fconf) {
96 return -1;
97 }
98
99 /* Read all [STREAM_TASK] sections */
100 mk_list_foreach(head, &fconf->sections) {
101 section = mk_list_entry(head, struct mk_rconf_section, _head);
102 if (strcasecmp(section->name, "STREAM_TASK") != 0) {
103 continue;
104 }
105
106 name = NULL;
107 exec = NULL;
108
109 /* Name */
110 name = mk_rconf_section_get_key(section, "Name", MK_RCONF_STR);
111 if (!name) {
112 flb_error("[sp] task 'name' not found in file '%s'", cfg);
113 goto fconf_error;
114 }
115
116 /* Exec */
117 exec = mk_rconf_section_get_key(section, "Exec", MK_RCONF_STR);
118 if (!exec) {
119 flb_error("[sp] task '%s' don't have an 'exec' command", name);
120 goto fconf_error;
121 }
122
123 /* Register the task */
124 task = flb_sp_task_create(sp, name, exec);
125 if (!task) {
126 goto fconf_error;
127 }
128
129 flb_free(name);
130 flb_free(exec);
131 }
132
133 mk_rconf_free(fconf);
134 return 0;
135
136 fconf_error:
137 flb_free(name);
138 flb_free(exec);
139
140 return -1;
141 }
142
sp_task_to_instance(struct flb_sp_task * task,struct flb_sp * sp)143 static int sp_task_to_instance(struct flb_sp_task *task, struct flb_sp *sp)
144 {
145 struct mk_list *head;
146 struct flb_input_instance *in;
147
148 if (task->cmd->source_type != FLB_SP_STREAM) {
149 return -1;
150 }
151
152 mk_list_foreach(head, &sp->config->inputs) {
153 in = mk_list_entry(head, struct flb_input_instance, _head);
154 if (in->alias) {
155 if (strcasecmp(in->alias, task->cmd->source_name) == 0) {
156 task->source_instance = in;
157 return 0;
158 }
159 }
160
161 if (strcasecmp(in->name, task->cmd->source_name) == 0) {
162 task->source_instance = in;
163 return 0;
164 }
165 }
166
167 return -1;
168 }
169
sp_info(struct flb_sp * sp)170 static void sp_info(struct flb_sp *sp)
171 {
172 struct mk_list *head;
173 struct flb_sp_task *task;
174
175 flb_info("[sp] stream processor started");
176
177 mk_list_foreach(head, &sp->tasks) {
178 task = mk_list_entry(head, struct flb_sp_task, _head);
179 flb_info("[sp] registered task: %s", task->name);
180 }
181 }
182
subkeys_compare(struct mk_list * subkeys1,struct mk_list * subkeys2)183 int subkeys_compare(struct mk_list *subkeys1, struct mk_list *subkeys2)
184 {
185 int i;
186 struct flb_slist_entry *entry1;
187 struct flb_slist_entry *entry2;
188
189 if (!subkeys1 && !subkeys2) {
190 return 0;
191 }
192
193 if (!subkeys1 || !subkeys2) {
194 return -1;
195 }
196
197 if (mk_list_size(subkeys1) != mk_list_size(subkeys2)) {
198 return -1;
199 }
200
201 entry1 = mk_list_entry_first(subkeys1, struct flb_slist_entry, _head);
202 entry2 = mk_list_entry_first(subkeys2, struct flb_slist_entry, _head);
203
204 for (i = 0; i < mk_list_size(subkeys1); i++) {
205 if (flb_sds_cmp(entry1->str, entry2->str, flb_sds_len(entry2->str)) != 0) {
206 return -1;
207 }
208
209 entry1 = mk_list_entry_next(&entry1->_head, struct flb_slist_entry,
210 _head, subkeys1);
211 entry2 = mk_list_entry_next(&entry2->_head, struct flb_slist_entry,
212 _head, subkeys2);
213 }
214
215 return 0;
216 }
217
sp_cmd_aggregated_keys(struct flb_sp_cmd * cmd)218 static int sp_cmd_aggregated_keys(struct flb_sp_cmd *cmd)
219 {
220 int aggr = 0;
221 int not_aggr = 0;
222 struct mk_list *head;
223 struct mk_list *head_gb;
224 struct flb_sp_cmd_key *key;
225 struct flb_sp_cmd_gb_key *gb_key;
226
227 mk_list_foreach(head, &cmd->keys) {
228 key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
229 if (key->time_func > 0 || key->record_func > 0) {
230 continue;
231 }
232
233 if (key->aggr_func > 0) {
234 /* AVG, SUM, COUNT or timeseries functions */
235 aggr++;
236 }
237 else {
238 mk_list_foreach(head_gb, &cmd->gb_keys) {
239 gb_key = mk_list_entry(head_gb, struct flb_sp_cmd_gb_key, _head);
240
241 if (!key->name) { /* Key name is a wildcard '*' */
242 break;
243 }
244
245 if (flb_sds_cmp(key->name, gb_key->name,
246 flb_sds_len(gb_key->name)) == 0) {
247 if (subkeys_compare(key->subkeys, gb_key->subkeys) != 0) {
248 continue;
249 }
250
251 not_aggr--;
252
253 /* Map key selector with group-by */
254 key->gb_key = gb_key;
255 break;
256 }
257 }
258
259 not_aggr++;
260 }
261 }
262
263 /*
264 * if some aggregated function is required, not aggregated keys are
265 * not allowed so we return an error (-1).
266 */
267 if (aggr > 0 && not_aggr == 0) {
268 return aggr;
269 }
270 else if (aggr > 0 && not_aggr > 0) {
271 return -1;
272 }
273
274 return 0;
275 }
276
277 /*
278 * Convert a string to a numerical representation:
279 *
280 * - if output number is an integer, 'i' is set and returns FLB_STR_INT
281 * - if output number is a float, 'd' is set and returns FLB_STR_FLOAT
282 * - if no conversion is possible (not a number), returns -1
283 */
string_to_number(const char * str,int len,int64_t * i,double * d)284 static int string_to_number(const char *str, int len, int64_t *i, double *d)
285 {
286 int c;
287 int dots = 0;
288 char *end;
289 int64_t i_out;
290 double d_out;
291
292 /* Detect if this is a floating point number */
293 for (c = 0; c < len; c++) {
294 if (str[c] == '.') {
295 dots++;
296 }
297 }
298
299 if (dots > 1) {
300 return -1;
301 }
302 else if (dots == 1) {
303 /* Floating point number */
304 errno = 0;
305 d_out = strtold(str, &end);
306
307 /* Check for various possible errors */
308 if ((errno == ERANGE || (errno != 0 && d_out == 0))) {
309 return -1;
310 }
311
312 if (end == str) {
313 return -1;
314 }
315
316 *d = d_out;
317 return FLB_STR_FLOAT;
318 }
319 else {
320 /* Integer */
321 errno = 0;
322 i_out = strtoll(str, &end, 10);
323
324 /* Check for various possible errors */
325 if ((errno == ERANGE || (errno != 0 && i_out == 0))) {
326 return -1;
327 }
328
329 if (end == str) {
330 return -1;
331 }
332
333 *i = i_out;
334 return FLB_STR_INT;
335 }
336
337 return -1;
338 }
339
340 /*
341 * Convert a msgpack object value to a number 'if possible'. The conversion
342 * result is either stored on 'i' for 64 bits integers or in 'd' for
343 * float/doubles.
344 *
345 * This function aims to take care of strings representing a value too.
346 */
object_to_number(msgpack_object obj,int64_t * i,double * d)347 static int object_to_number(msgpack_object obj, int64_t *i, double *d)
348 {
349 int ret;
350 int64_t i_out;
351 double d_out;
352 char str_num[20];
353
354 if (obj.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
355 obj.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
356 *i = obj.via.i64;
357 return FLB_STR_INT;
358 }
359 else if (obj.type == MSGPACK_OBJECT_FLOAT32 ||
360 obj.type == MSGPACK_OBJECT_FLOAT) {
361 *d = obj.via.f64;
362 return FLB_STR_FLOAT;
363 }
364 else if (obj.type == MSGPACK_OBJECT_STR) {
365 /* A numeric representation of a string should not exceed 19 chars */
366 if (obj.via.str.size > 19) {
367 return -1;
368 }
369
370 memcpy(str_num, obj.via.str.ptr, obj.via.str.size);
371 str_num[obj.via.str.size] = '\0';
372
373 ret = string_to_number(str_num, obj.via.str.size,
374 &i_out, &d_out);
375 if (ret == FLB_STR_FLOAT) {
376 *d = d_out;
377 return FLB_STR_FLOAT;
378 }
379 else if (ret == FLB_STR_INT) {
380 *i = i_out;
381 return FLB_STR_INT;
382 }
383 }
384
385 return -1;
386 }
387
flb_sp_snapshot_create(struct flb_sp_task * task)388 int flb_sp_snapshot_create(struct flb_sp_task *task)
389 {
390 struct flb_sp_cmd *cmd;
391 struct flb_sp_snapshot *snapshot;
392
393 cmd = task->cmd;
394
395 snapshot = (struct flb_sp_snapshot *) flb_calloc(1, sizeof(struct flb_sp_snapshot));
396 if (!snapshot) {
397 flb_error("[sp] could not create snapshot '%s'", cmd->stream_name);
398 return -1;
399 }
400
401 mk_list_init(&snapshot->pages);
402 snapshot->record_limit = cmd->limit;
403
404 if (flb_sp_cmd_stream_prop_get(cmd, "seconds") != NULL) {
405 snapshot->time_limit = atoi(flb_sp_cmd_stream_prop_get(cmd, "seconds"));
406 }
407
408 if (snapshot->time_limit == 0 && snapshot->record_limit == 0) {
409 flb_error("[sp] could not create snapshot '%s': size is not defined",
410 cmd->stream_name);
411 flb_sp_snapshot_destroy(snapshot);
412 return -1;
413 }
414
415 task->snapshot = snapshot;
416 return 0;
417 }
418
flb_sp_task_create(struct flb_sp * sp,const char * name,const char * query)419 struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
420 const char *query)
421 {
422 int fd;
423 int ret;
424 struct mk_event *event;
425 struct flb_sp_cmd *cmd;
426 struct flb_sp_task *task;
427
428 /*
429 * Parse and validate the incoming exec query and create the 'command'
430 * context (this will be associated to the task in a later step
431 */
432 cmd = flb_sp_cmd_create(query);
433
434 if (!cmd) {
435 flb_error("[sp] invalid query on task '%s': '%s'", name, query);
436 return NULL;
437 }
438
439 /* Check if we got an invalid type due an error/restriction */
440 if (cmd->status == FLB_SP_ERROR) {
441 flb_error("[sp] invalid query on task '%s': '%s'", name, query);
442 flb_sp_cmd_destroy(cmd);
443 return NULL;
444 }
445
446 /* Create the task context */
447 task = flb_calloc(1, sizeof(struct flb_sp_task));
448 if (!task) {
449 flb_errno();
450 flb_sp_cmd_destroy(cmd);
451 return NULL;
452 }
453 task->name = flb_sds_create(name);
454 if (!task->name) {
455 flb_free(task);
456 flb_sp_cmd_destroy(cmd);
457 return NULL;
458 }
459
460 task->query = flb_sds_create(query);
461 if (!task->query) {
462 flb_sds_destroy(task->name);
463 flb_free(task);
464 flb_sp_cmd_destroy(cmd);
465 return NULL;
466 }
467
468 task->sp = sp;
469 task->cmd = cmd;
470 mk_list_add(&task->_head, &sp->tasks);
471
472 /*
473 * Assume no aggregated keys exists, if so, a different strategy is
474 * required to process the records.
475 */
476 task->aggregate_keys = FLB_FALSE;
477
478 mk_list_init(&task->window.data);
479 mk_list_init(&task->window.aggregate_list);
480 rb_tree_new(&task->window.aggregate_tree, flb_sp_groupby_compare);
481
482 mk_list_init(&task->window.hopping_slot);
483
484 /* Check and validate aggregated keys */
485 ret = sp_cmd_aggregated_keys(task->cmd);
486 if (ret == -1) {
487 flb_error("[sp] aggregated query cannot mix not aggregated keys: %s",
488 query);
489 flb_sp_task_destroy(task);
490 return NULL;
491 }
492 else if (ret > 0) {
493 task->aggregate_keys = FLB_TRUE;
494
495 task->window.type = cmd->window.type;
496
497 /* Register a timer event when task contains aggregation rules */
498 if (task->window.type != FLB_SP_WINDOW_DEFAULT) {
499 /* Initialize event loop context */
500 event = &task->window.event;
501 MK_EVENT_ZERO(event);
502
503 /* Run every 'size' seconds */
504 fd = mk_event_timeout_create(sp->config->evl,
505 cmd->window.size, (long) 0,
506 &task->window.event);
507 if (fd == -1) {
508 flb_error("[sp] registration for task %s failed", task->name);
509 flb_free(task);
510 return NULL;
511 }
512 task->window.fd = fd;
513
514 if (task->window.type == FLB_SP_WINDOW_HOPPING) {
515 /* Initialize event loop context */
516 event = &task->window.event_hop;
517 MK_EVENT_ZERO(event);
518
519 /* Run every 'size' seconds */
520 fd = mk_event_timeout_create(sp->config->evl,
521 cmd->window.advance_by, (long) 0,
522 &task->window.event_hop);
523 if (fd == -1) {
524 flb_error("[sp] registration for task %s failed", task->name);
525 flb_free(task);
526 return NULL;
527 }
528 task->window.advance_by = cmd->window.advance_by;
529 task->window.fd_hop = fd;
530 task->window.first_hop = true;
531 }
532 }
533 }
534
535 /* Init snapshot page list */
536 if (cmd->type == FLB_SP_CREATE_SNAPSHOT) {
537 if (flb_sp_snapshot_create(task) == -1) {
538 flb_sp_task_destroy(task);
539 return NULL;
540 }
541 }
542
543 /*
544 * If the task involves a stream creation (CREATE STREAM abc..), create
545 * the stream.
546 */
547 if (cmd->type == FLB_SP_CREATE_STREAM ||
548 cmd->type == FLB_SP_CREATE_SNAPSHOT ||
549 cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
550
551 ret = flb_sp_stream_create(cmd->stream_name, task, sp);
552 if (ret == -1) {
553 flb_error("[sp] could not create stream '%s'", cmd->stream_name);
554 flb_sp_task_destroy(task);
555 return NULL;
556 }
557 }
558
559 /*
560 * Based in the command type, check if the source of data is a known
561 * stream so make a reference on this task for a quick comparisson and
562 * access it when processing data.
563 */
564 sp_task_to_instance(task, sp);
565 return task;
566 }
567
groupby_nums_destroy(struct aggregate_num * groupby_nums,int size)568 void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size)
569 {
570 int i;
571
572 for (i = 0; i < size; i++) {
573 if (groupby_nums[i].type == FLB_SP_STRING) {
574 flb_sds_destroy(groupby_nums[i].string);
575 }
576 }
577
578 flb_free(groupby_nums);
579 }
580
581 /*
582 * Destroy aggregation node context: before to use this function make sure
583 * to unlink from the linked list.
584 */
flb_sp_aggregate_node_destroy(struct flb_sp_cmd * cmd,struct aggregate_node * aggr_node)585 void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
586 struct aggregate_node *aggr_node)
587 {
588 int i;
589 int key_id;
590 struct mk_list *head;
591 struct aggregate_num *num;
592 struct flb_sp_cmd_key *ckey;
593
594 for (i = 0; i < aggr_node->nums_size; i++) {
595 num = &aggr_node->nums[i];
596 if (num->type == FLB_SP_STRING) {
597 flb_sds_destroy(num->string);
598 }
599 }
600
601 groupby_nums_destroy(aggr_node->groupby_nums, aggr_node->groupby_keys);
602
603 key_id = 0;
604 mk_list_foreach(head, &cmd->keys) {
605 ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head);
606
607 if (!ckey->aggr_func) {
608 key_id++;
609 continue;
610 }
611
612 aggregate_func_destroy[ckey->aggr_func - 1](aggr_node, key_id);
613 key_id++;
614 }
615
616 flb_free(aggr_node->nums);
617 flb_free(aggr_node->aggregate_data);
618 flb_free(aggr_node);
619 }
620
flb_sp_window_destroy(struct flb_sp_cmd * cmd,struct flb_sp_task_window * window)621 void flb_sp_window_destroy(struct flb_sp_cmd *cmd,
622 struct flb_sp_task_window *window)
623 {
624 struct flb_sp_window_data *data;
625 struct aggregate_node *aggr_node;
626 struct flb_sp_hopping_slot *hs;
627 struct mk_list *head;
628 struct mk_list *tmp;
629 struct mk_list *head_hs;
630 struct mk_list *tmp_hs;
631
632 mk_list_foreach_safe(head, tmp, &window->data) {
633 data = mk_list_entry(head, struct flb_sp_window_data, _head);
634 flb_free(data->buf_data);
635 mk_list_del(&data->_head);
636 flb_free(data);
637 }
638
639 mk_list_foreach_safe(head, tmp, &window->aggregate_list) {
640 aggr_node = mk_list_entry(head, struct aggregate_node, _head);
641 mk_list_del(&aggr_node->_head);
642 flb_sp_aggregate_node_destroy(cmd, aggr_node);
643 }
644
645 mk_list_foreach_safe(head, tmp, &window->hopping_slot) {
646 hs = mk_list_entry(head, struct flb_sp_hopping_slot, _head);
647 mk_list_foreach_safe(head_hs, tmp_hs, &hs->aggregate_list) {
648 aggr_node = mk_list_entry(head_hs, struct aggregate_node, _head);
649 mk_list_del(&aggr_node->_head);
650 flb_sp_aggregate_node_destroy(cmd, aggr_node);
651 }
652 rb_tree_destroy(&hs->aggregate_tree);
653 flb_free(hs);
654 }
655
656 rb_tree_destroy(&window->aggregate_tree);
657 }
658
flb_sp_task_destroy(struct flb_sp_task * task)659 void flb_sp_task_destroy(struct flb_sp_task *task)
660 {
661 flb_sds_destroy(task->name);
662 flb_sds_destroy(task->query);
663 flb_sp_window_destroy(task->cmd, &task->window);
664 flb_sp_snapshot_destroy(task->snapshot);
665 mk_list_del(&task->_head);
666
667 if (task->stream) {
668 flb_sp_stream_destroy(task->stream, task->sp);
669 }
670
671 flb_sp_cmd_destroy(task->cmd);
672 flb_free(task);
673 }
674
675 /* Create the stream processor context */
flb_sp_create(struct flb_config * config)676 struct flb_sp *flb_sp_create(struct flb_config *config)
677 {
678 int i = 0;
679 int ret;
680 char buf[32];
681 struct mk_list *head;
682 struct flb_sp *sp;
683 struct flb_slist_entry *e;
684 struct flb_sp_task *task;
685
686 /* Allocate context */
687 sp = flb_malloc(sizeof(struct flb_sp));
688 if (!sp) {
689 flb_errno();
690 return NULL;
691 }
692 sp->config = config;
693 mk_list_init(&sp->tasks);
694
695 /* Check for pre-configured Tasks (command line) */
696 mk_list_foreach(head, &config->stream_processor_tasks) {
697 e = mk_list_entry(head, struct flb_slist_entry, _head);
698 snprintf(buf, sizeof(buf) - 1, "flb-console:%i", i);
699 i++;
700 task = flb_sp_task_create(sp, buf, e->str);
701 if (!task) {
702 continue;
703 }
704 }
705
706 /* Lookup configuration file if any */
707 if (config->stream_processor_file) {
708 ret = sp_config_file(config, sp, config->stream_processor_file);
709 if (ret == -1) {
710 flb_error("[sp] could not initialize stream processor");
711 flb_sp_destroy(sp);
712 return NULL;
713 }
714 }
715
716 /* Write sp info to stdout */
717 sp_info(sp);
718
719 return sp;
720 }
721
free_value(struct flb_exp_val * v)722 void free_value(struct flb_exp_val *v)
723 {
724 if (!v) {
725 return;
726 }
727
728 if (v->type == FLB_EXP_STRING) {
729 flb_sds_destroy(v->val.string);
730 }
731
732 flb_free(v);
733 }
734
itof_convert(struct flb_exp_val * val)735 static void itof_convert(struct flb_exp_val *val)
736 {
737 if (val->type != FLB_EXP_INT) {
738 return;
739 }
740
741 val->type = FLB_EXP_FLOAT;
742 val->val.f64 = (double) val->val.i64;
743 }
744
745 /* Convert (string) expression to number */
exp_string_to_number(struct flb_exp_val * val)746 static void exp_string_to_number(struct flb_exp_val *val)
747 {
748 int ret;
749 int len;
750 int64_t i = 0;
751 char *str;
752 double d = 0.0;
753
754 len = flb_sds_len(val->val.string);
755 str = val->val.string;
756
757 ret = string_to_number(str, len, &i, &d);
758 if (ret == -1) {
759 return;
760 }
761
762 /* Assign to proper type */
763 if (ret == FLB_STR_FLOAT) {
764 flb_sds_destroy(val->val.string);
765 val->type = FLB_EXP_FLOAT;
766 val->val.f64 = d;
767 }
768 else if (ret == FLB_STR_INT) {
769 flb_sds_destroy(val->val.string);
770 val->type = FLB_EXP_INT;
771 val->val.i64 = i;
772 }
773 }
774
numerical_comp(struct flb_exp_val * left,struct flb_exp_val * right,struct flb_exp_val * result,int op)775 static void numerical_comp(struct flb_exp_val *left,
776 struct flb_exp_val *right,
777 struct flb_exp_val *result, int op)
778 {
779 result->type = FLB_EXP_BOOL;
780
781 if (left == NULL || right == NULL) {
782 result->val.boolean = false;
783 return;
784 }
785
786 /* Check if left expression value is a number, if so, convert it */
787 if (left->type == FLB_EXP_STRING && right->type != FLB_EXP_STRING) {
788 exp_string_to_number(left);
789 }
790
791 if (left->type == FLB_EXP_INT && right->type == FLB_EXP_FLOAT) {
792 itof_convert(left);
793 }
794 else if (left->type == FLB_EXP_FLOAT && right->type == FLB_EXP_INT) {
795 itof_convert(right);
796 }
797
798 switch (op) {
799 case FLB_EXP_EQ:
800 if (left->type == right->type) {
801 switch(left->type) {
802 case FLB_EXP_NULL:
803 result->val.boolean = true;
804 break;
805 case FLB_EXP_BOOL:
806 result->val.boolean = (left->val.boolean == right->val.boolean);
807 break;
808 case FLB_EXP_INT:
809 result->val.boolean = (left->val.i64 == right->val.i64);
810 break;
811 case FLB_EXP_FLOAT:
812 result->val.boolean = (left->val.f64 == right->val.f64);
813 break;
814 case FLB_EXP_STRING:
815 if (flb_sds_len(left->val.string) !=
816 flb_sds_len(right->val.string)) {
817 result->val.boolean = false;
818 }
819 else if (strncmp(left->val.string, right->val.string,
820 flb_sds_len(left->val.string)) != 0) {
821 result->val.boolean = false;
822 }
823 else {
824 result->val.boolean = true;
825 }
826 break;
827 default:
828 result->val.boolean = false;
829 break;
830 }
831 }
832 else {
833 result->val.boolean = false;
834 }
835 break;
836 case FLB_EXP_LT:
837 if (left->type == right->type) {
838 switch(left->type) {
839 case FLB_EXP_INT:
840 result->val.boolean = (left->val.i64 < right->val.i64);
841 break;
842 case FLB_EXP_FLOAT:
843 result->val.boolean = (left->val.f64 < right->val.f64);
844 break;
845 case FLB_EXP_STRING:
846 if (strncmp(left->val.string, right->val.string,
847 flb_sds_len(left->val.string)) < 0) {
848 result->val.boolean = true;
849 }
850 else {
851 result->val.boolean = false;
852 }
853 break;
854 default:
855 result->val.boolean = false;
856 break;
857 }
858 }
859 else {
860 result->val.boolean = false;
861 }
862 break;
863 case FLB_EXP_LTE:
864 if (left->type == right->type) {
865 switch(left->type) {
866 case FLB_EXP_INT:
867 result->val.boolean = (left->val.i64 <= right->val.i64);
868 break;
869 case FLB_EXP_FLOAT:
870 result->val.boolean = (left->val.f64 <= right->val.f64);
871 break;
872 case FLB_EXP_STRING:
873 if (strncmp(left->val.string, right->val.string,
874 flb_sds_len(left->val.string)) <= 0) {
875 result->val.boolean = true;
876 }
877 else {
878 result->val.boolean = false;
879 }
880 break;
881 default:
882 result->val.boolean = false;
883 break;
884 }
885 }
886 else {
887 result->val.boolean = false;
888 }
889 break;
890 case FLB_EXP_GT:
891 if (left->type == right->type) {
892 switch(left->type) {
893 case FLB_EXP_INT:
894 result->val.boolean = (left->val.i64 > right->val.i64);
895 break;
896 case FLB_EXP_FLOAT:
897 result->val.boolean = (left->val.f64 > right->val.f64);
898 break;
899 case FLB_EXP_STRING:
900 if (strncmp(left->val.string, right->val.string,
901 flb_sds_len(left->val.string)) > 0) {
902 result->val.boolean = true;
903 }
904 else {
905 result->val.boolean = false;
906 }
907 break;
908 default:
909 result->val.boolean = false;
910 break;
911 }
912 }
913 else {
914 result->val.boolean = false;
915 }
916 break;
917 case FLB_EXP_GTE:
918 if (left->type == right->type) {
919 switch(left->type) {
920 case FLB_EXP_INT:
921 result->val.boolean = (left->val.i64 >= right->val.i64);
922 break;
923 case FLB_EXP_FLOAT:
924 result->val.boolean = (left->val.f64 >= right->val.f64);
925 break;
926 case FLB_EXP_STRING:
927 if (strncmp(left->val.string, right->val.string,
928 flb_sds_len(left->val.string)) >= 0) {
929 result->val.boolean = true;
930 }
931 else {
932 result->val.boolean = false;
933 }
934 break;
935 default:
936 result->val.boolean = false;
937 break;
938 }
939 }
940 else {
941 result->val.boolean = false;
942 }
943 break;
944 }
945 }
946
value_to_bool(struct flb_exp_val * val)947 static bool value_to_bool(struct flb_exp_val *val) {
948 bool result = FLB_FALSE;
949
950 switch (val->type) {
951 case FLB_EXP_BOOL:
952 result = val->val.boolean;
953 break;
954 case FLB_EXP_INT:
955 result = val->val.i64 > 0;
956 break;
957 case FLB_EXP_FLOAT:
958 result = val->val.f64 > 0;
959 break;
960 case FLB_EXP_STRING:
961 result = true;
962 break;
963 }
964
965 return result;
966 }
967
968
logical_operation(struct flb_exp_val * left,struct flb_exp_val * right,struct flb_exp_val * result,int op)969 static void logical_operation(struct flb_exp_val *left,
970 struct flb_exp_val *right,
971 struct flb_exp_val *result, int op)
972 {
973 bool lval;
974 bool rval;
975
976 result->type = FLB_EXP_BOOL;
977
978 /* Null is always interpreted as false in a logical operation */
979 lval = left ? value_to_bool(left) : false;
980 rval = right ? value_to_bool(right) : false;
981
982 switch (op) {
983 case FLB_EXP_NOT:
984 result->val.boolean = !lval;
985 break;
986 case FLB_EXP_AND:
987 result->val.boolean = lval & rval;
988 break;
989 case FLB_EXP_OR:
990 result->val.boolean = lval | rval;
991 break;
992 }
993 }
994
reduce_expression(struct flb_exp * expression,const char * tag,int tag_len,struct flb_time * tms,msgpack_object * map)995 static struct flb_exp_val *reduce_expression(struct flb_exp *expression,
996 const char *tag, int tag_len,
997 struct flb_time *tms,
998 msgpack_object *map)
999 {
1000 int operation;
1001 flb_sds_t s;
1002 flb_sds_t tmp_sds = NULL;
1003 struct flb_exp_key *key;
1004 struct flb_sp_value *sval;
1005 struct flb_exp_val *ret, *left, *right;
1006 struct flb_exp_val *result;
1007
1008 if (!expression) {
1009 return NULL;
1010 }
1011
1012 result = flb_calloc(1, sizeof(struct flb_exp_val));
1013 if (!result) {
1014 flb_errno();
1015 return NULL;
1016 }
1017
1018 switch (expression->type) {
1019 case FLB_EXP_NULL:
1020 result->type = expression->type;
1021 break;
1022 case FLB_EXP_BOOL:
1023 result->type = expression->type;
1024 result->val.boolean = ((struct flb_exp_val *) expression)->val.boolean;
1025 break;
1026 case FLB_EXP_INT:
1027 result->type = expression->type;
1028 result->val.i64 = ((struct flb_exp_val *) expression)->val.i64;
1029 break;
1030 case FLB_EXP_FLOAT:
1031 result->type = expression->type;
1032 result->val.f64 = ((struct flb_exp_val *) expression)->val.f64;
1033 break;
1034 case FLB_EXP_STRING:
1035 s = ((struct flb_exp_val *) expression)->val.string;
1036 result->type = expression->type;
1037 result->val.string = flb_sds_create_size(flb_sds_len(s));
1038 tmp_sds = flb_sds_copy(result->val.string, s, flb_sds_len(s));
1039 if (tmp_sds != result->val.string) {
1040 result->val.string = tmp_sds;
1041 }
1042 break;
1043 case FLB_EXP_KEY:
1044 key = (struct flb_exp_key *) expression;
1045 sval = flb_sp_key_to_value(key->name, *map, key->subkeys);
1046 if (sval) {
1047 result->type = sval->type;
1048 result->val = sval->val;
1049 flb_free(sval);
1050 return result;
1051 }
1052 else {
1053 flb_free(result);
1054 return NULL;
1055 }
1056 break;
1057 case FLB_EXP_FUNC:
1058 /* we don't need result */
1059 flb_free(result);
1060 ret = reduce_expression(((struct flb_exp_func *) expression)->param,
1061 tag, tag_len, tms, map);
1062 result = ((struct flb_exp_func *) expression)->cb_func(tag, tag_len,
1063 tms, ret);
1064 free_value(ret);
1065 break;
1066 case FLB_LOGICAL_OP:
1067 left = reduce_expression(expression->left,
1068 tag, tag_len, tms, map);
1069 right = reduce_expression(expression->right,
1070 tag, tag_len, tms, map);
1071
1072 operation = ((struct flb_exp_op *) expression)->operation;
1073
1074 switch (operation) {
1075 case FLB_EXP_PAR:
1076 if (left == NULL) { /* Null is always interpreted as false in a
1077 logical operation */
1078 result->type = FLB_EXP_BOOL;
1079 result->val.boolean = false;
1080 }
1081 else { /* Left and right sides of a logical operation reduce to
1082 boolean values */
1083 result->type = FLB_EXP_BOOL;
1084 result->val.boolean = left->val.boolean;
1085 }
1086 break;
1087 case FLB_EXP_EQ:
1088 case FLB_EXP_LT:
1089 case FLB_EXP_LTE:
1090 case FLB_EXP_GT:
1091 case FLB_EXP_GTE:
1092 numerical_comp(left, right, result, operation);
1093 break;
1094 case FLB_EXP_NOT:
1095 case FLB_EXP_AND:
1096 case FLB_EXP_OR:
1097 logical_operation(left, right, result, operation);
1098 break;
1099 }
1100 free_value(left);
1101 free_value(right);
1102 }
1103 return result;
1104 }
1105
1106
package_results(const char * tag,int tag_len,char ** out_buf,size_t * out_size,struct flb_sp_task * task)1107 static void package_results(const char *tag, int tag_len,
1108 char **out_buf, size_t *out_size,
1109 struct flb_sp_task *task)
1110 {
1111 int i;
1112 int len;
1113 int map_entries;
1114 msgpack_sbuffer mp_sbuf;
1115 msgpack_packer mp_pck;
1116 struct aggregate_num *num;
1117 struct flb_time tm;
1118 struct flb_sp_cmd_key *ckey;
1119 struct flb_sp_cmd *cmd = task->cmd;
1120 struct mk_list *head;
1121 struct aggregate_node *aggr_node;
1122 struct flb_sp_cmd_gb_key *gb_key = NULL;
1123
1124 map_entries = mk_list_size(&cmd->keys);
1125
1126 msgpack_sbuffer_init(&mp_sbuf);
1127 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1128
1129 mk_list_foreach(head, &task->window.aggregate_list) {
1130 aggr_node = mk_list_entry(head, struct aggregate_node, _head);
1131
1132 /* set outgoing array + map and it fixed size */
1133 msgpack_pack_array(&mp_pck, 2);
1134
1135 flb_time_get(&tm);
1136 flb_time_append_to_msgpack(&tm, &mp_pck, 0);
1137 msgpack_pack_map(&mp_pck, map_entries);
1138
1139 /* Packaging results */
1140 ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key, _head);
1141 for (i = 0; i < map_entries; i++) {
1142 num = &aggr_node->nums[i];
1143
1144 /* Check if there is a defined function */
1145 if (ckey->time_func > 0) {
1146 flb_sp_func_time(&mp_pck, ckey);
1147 goto next;
1148 }
1149 else if (ckey->record_func > 0) {
1150 flb_sp_func_record(tag, tag_len, &tm, &mp_pck, ckey);
1151 goto next;
1152 }
1153
1154 /* Pack key */
1155 if (ckey->alias) {
1156 msgpack_pack_str(&mp_pck, flb_sds_len(ckey->alias));
1157 msgpack_pack_str_body(&mp_pck,
1158 ckey->alias,
1159 flb_sds_len(ckey->alias));
1160 }
1161 else {
1162 len = 0;
1163 char *c_name;
1164 if (!ckey->name) {
1165 c_name = "*";
1166 }
1167 else {
1168 c_name = ckey->name;
1169 }
1170
1171 msgpack_pack_str(&mp_pck, len);
1172 msgpack_pack_str_body(&mp_pck, c_name, len);
1173 }
1174
1175 /*
1176 * If a group_by key is mapped as a source of this key,
1177 * change the 'num' reference to obtain the proper information
1178 * for the grouped key value.
1179 */
1180 if (ckey->gb_key != NULL) {
1181 gb_key = ckey->gb_key;
1182 if (aggr_node->groupby_keys > 0) {
1183 num = &aggr_node->groupby_nums[gb_key->id];
1184 }
1185 }
1186
1187 /* Pack value */
1188 switch (ckey->aggr_func) {
1189 case FLB_SP_NOP:
1190 if (num->type == FLB_SP_NUM_I64) {
1191 msgpack_pack_int64(&mp_pck, num->i64);
1192 }
1193 else if (num->type == FLB_SP_NUM_F64) {
1194 msgpack_pack_float(&mp_pck, num->f64);
1195 }
1196 else if (num->type == FLB_SP_STRING) {
1197 msgpack_pack_str(&mp_pck,
1198 flb_sds_len(num->string));
1199 msgpack_pack_str_body(&mp_pck,
1200 num->string,
1201 flb_sds_len(num->string));
1202 }
1203 else if (num->type == FLB_SP_BOOLEAN) {
1204 if (num->boolean) {
1205 msgpack_pack_true(&mp_pck);
1206 }
1207 else {
1208 msgpack_pack_false(&mp_pck);
1209 }
1210 }
1211 break;
1212 default:
1213 aggregate_func_calc[ckey->aggr_func - 1](aggr_node, ckey, &mp_pck, i);
1214 break;
1215 }
1216
1217 next:
1218 ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
1219 _head, &cmd->keys);
1220 }
1221 }
1222
1223 *out_buf = mp_sbuf.data;
1224 *out_size = mp_sbuf.size;
1225 }
1226
sp_process_aggregate_data(struct flb_sp_task * task,msgpack_object map)1227 static struct aggregate_node * sp_process_aggregate_data(struct flb_sp_task *task,
1228 msgpack_object map)
1229 {
1230 int i;
1231 int ret;
1232 int map_size;
1233 int key_id;
1234 int map_entries;
1235 int gb_entries;
1236 int values_found;
1237 int64_t ival;
1238 double dval;
1239 struct flb_sp_value *sval;
1240 struct aggregate_num *gb_nums;
1241 struct aggregate_node *aggr_node;
1242 struct flb_sp_cmd *cmd;
1243 struct flb_sp_cmd_gb_key *gb_key;
1244 struct mk_list *head;
1245 struct rb_tree_node *rb_result;
1246 msgpack_object key;
1247
1248 aggr_node = NULL;
1249 cmd = task->cmd;
1250 map_size = map.via.map.size;
1251 values_found = 0;
1252
1253 /* Number of expected output entries in the map */
1254 map_entries = mk_list_size(&cmd->keys);
1255 gb_entries = mk_list_size(&cmd->gb_keys);
1256
1257 if (gb_entries > 0) {
1258 gb_nums = flb_calloc(1, sizeof(struct aggregate_num) * gb_entries);
1259 if (!gb_nums) {
1260 return NULL;
1261 }
1262
1263 /* extract GROUP BY values */
1264 for (i = 0; i < map_size; i++) { /* extract group-by values */
1265 key = map.via.map.ptr[i].key;
1266
1267 key_id = 0;
1268 mk_list_foreach(head, &cmd->gb_keys) {
1269 gb_key = mk_list_entry(head, struct flb_sp_cmd_gb_key,
1270 _head);
1271 if (flb_sds_cmp(gb_key->name, key.via.str.ptr,
1272 key.via.str.size) != 0) {
1273 key_id++;
1274 continue;
1275 }
1276
1277 sval = flb_sp_key_to_value(gb_key->name, map, gb_key->subkeys);
1278 if (!sval) {
1279 /* If evaluation fails/sub-key doesn't exist */
1280 key_id++;
1281 continue;
1282 }
1283
1284 values_found++;
1285
1286 /* Convert string to number if that is possible */
1287 ret = object_to_number(sval->o, &ival, &dval);
1288 if (ret == -1) {
1289 if (sval->o.type == MSGPACK_OBJECT_STR) {
1290 gb_nums[key_id].type = FLB_SP_STRING;
1291 gb_nums[key_id].string =
1292 flb_sds_create_len(sval->o.via.str.ptr,
1293 sval->o.via.str.size);
1294 }
1295 else if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) {
1296 gb_nums[key_id].type = FLB_SP_NUM_I64;
1297 gb_nums[key_id].i64 = sval->o.via.boolean;
1298 }
1299 }
1300 else if (ret == FLB_STR_INT) {
1301 gb_nums[key_id].type = FLB_SP_NUM_I64;
1302 gb_nums[key_id].i64 = ival;
1303 }
1304 else if (ret == FLB_STR_FLOAT) {
1305 gb_nums[key_id].type = FLB_SP_NUM_F64;
1306 gb_nums[key_id].f64 = dval;
1307 }
1308
1309 key_id++;
1310 flb_sp_key_value_destroy(sval);
1311 }
1312 }
1313
1314 /* if some GROUP BY keys are not found in the record */
1315 if (values_found < gb_entries) {
1316 groupby_nums_destroy(gb_nums, gb_entries);
1317 return NULL;
1318 }
1319
1320 aggr_node = (struct aggregate_node *) flb_calloc(1, sizeof(struct aggregate_node));
1321 if (!aggr_node) {
1322 flb_errno();
1323 groupby_nums_destroy(gb_nums, gb_entries);
1324 return NULL;
1325 }
1326
1327 aggr_node->groupby_keys = gb_entries;
1328 aggr_node->groupby_nums = gb_nums;
1329
1330 rb_tree_find_or_insert(&task->window.aggregate_tree, aggr_node, &aggr_node->_rb_head, &rb_result);
1331 if (&aggr_node->_rb_head != rb_result) {
1332 /* We don't need aggr_node anymore */
1333 flb_sp_aggregate_node_destroy(cmd, aggr_node);
1334
1335 aggr_node = container_of(rb_result, struct aggregate_node, _rb_head);
1336 container_of(rb_result, struct aggregate_node, _rb_head)->records++;
1337 }
1338 else {
1339 aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries);
1340 if (!aggr_node->nums) {
1341 flb_sp_aggregate_node_destroy(cmd, aggr_node);
1342 return NULL;
1343 }
1344 aggr_node->records = 1;
1345 aggr_node->nums_size = map_entries;
1346 aggr_node->aggregate_data = (struct aggregate_data **) flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
1347 mk_list_add(&aggr_node->_head, &task->window.aggregate_list);
1348 }
1349 }
1350 else { /* If query doesn't have GROUP BY */
1351 if (!mk_list_size(&task->window.aggregate_list)) {
1352 aggr_node = flb_calloc(1, sizeof(struct aggregate_node));
1353 if (!aggr_node) {
1354 flb_errno();
1355 return NULL;
1356 }
1357 aggr_node->nums = flb_calloc(1, sizeof(struct aggregate_num) * map_entries);
1358 if (!aggr_node->nums) {
1359 flb_sp_aggregate_node_destroy(cmd, aggr_node);
1360 return NULL;
1361 }
1362
1363 aggr_node->nums_size = map_entries;
1364 aggr_node->records = 1;
1365 aggr_node->aggregate_data = (struct aggregate_data **) flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
1366 mk_list_add(&aggr_node->_head, &task->window.aggregate_list);
1367 }
1368 else {
1369 aggr_node = mk_list_entry_first(&task->window.aggregate_list, struct aggregate_node, _head);
1370 aggr_node->records++;
1371 }
1372 }
1373
1374 return aggr_node;
1375 }
1376
1377 /*
1378 * Process data, task and it defined command involves the call of aggregation
1379 * functions (AVG, SUM, COUNT, MIN, MAX).
1380 */
sp_process_data_aggr(const char * buf_data,size_t buf_size,const char * tag,int tag_len,struct flb_sp_task * task,struct flb_sp * sp)1381 static int sp_process_data_aggr(const char *buf_data, size_t buf_size,
1382 const char *tag, int tag_len,
1383 struct flb_sp_task *task,
1384 struct flb_sp *sp)
1385 {
1386 int i;
1387 int ok;
1388 int ret;
1389 int map_size;
1390 int key_id;
1391 int values_found;
1392 size_t off;
1393 int64_t ival;
1394 double dval;
1395 msgpack_object root;
1396 msgpack_object map;
1397 msgpack_unpacked result;
1398 msgpack_object key;
1399 msgpack_object *obj;
1400 struct aggregate_num *nums = NULL;
1401 struct mk_list *head;
1402 struct flb_time tms;
1403 struct flb_sp_cmd *cmd = task->cmd;
1404 struct flb_sp_cmd_key *ckey;
1405 struct flb_sp_value *sval;
1406 struct flb_exp_val *condition;
1407 struct aggregate_node *aggr_node;
1408
1409 /* Number of expected output entries in the map */
1410 off = 0;
1411
1412 /* vars initialization */
1413 ok = MSGPACK_UNPACK_SUCCESS;
1414 msgpack_unpacked_init(&result);
1415
1416 /* Iterate incoming records */
1417 while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) {
1418 root = result.data;
1419
1420 /* extract timestamp */
1421 flb_time_pop_from_msgpack(&tms, &result, &obj);
1422
1423 /* get the map data and it size (number of items) */
1424 map = root.via.array.ptr[1];
1425 map_size = map.via.map.size;
1426
1427 /* Evaluate condition */
1428 if (cmd->condition) {
1429 condition = reduce_expression(cmd->condition,
1430 tag, tag_len, &tms, &map);
1431 if (!condition) {
1432 continue;
1433 }
1434 else if (!condition->val.boolean) {
1435 flb_free(condition);
1436 continue;
1437 }
1438 else {
1439 flb_free(condition);
1440 }
1441 }
1442
1443 aggr_node = sp_process_aggregate_data(task, map);
1444 if (!aggr_node)
1445 {
1446 continue;
1447 }
1448
1449 task->window.records++;
1450
1451 nums = aggr_node->nums;
1452
1453 values_found = 0;
1454 /* Iterate each map key and see if it matches any command key */
1455 for (i = 0; i < map_size; i++) {
1456 key = map.via.map.ptr[i].key;
1457
1458 if (key.type != MSGPACK_OBJECT_STR) {
1459 continue;
1460 }
1461
1462
1463 /*
1464 * Iterate each command key. Note that since the command key
1465 * can have different aggregation functions to the same key
1466 * we should compare all of them.
1467 */
1468 key_id = 0;
1469 mk_list_foreach(head, &cmd->keys) {
1470 ckey = mk_list_entry(head, struct flb_sp_cmd_key, _head);
1471
1472 if (!ckey->name) {
1473 key_id++;
1474 continue;
1475 }
1476
1477 if (flb_sds_cmp(ckey->name, key.via.str.ptr,
1478 key.via.str.size) != 0) {
1479 key_id++;
1480 continue;
1481 }
1482
1483 /* convert the value if it string */
1484 sval = flb_sp_key_to_value(ckey->name, map, ckey->subkeys);
1485 if (!sval) {
1486 key_id++;
1487 continue;
1488 }
1489
1490 values_found++;
1491
1492 /*
1493 * Convert value to a numeric representation only if key has an
1494 * assigned aggregation function
1495 */
1496 ival = 0;
1497 dval = 0.0;
1498 if (ckey->aggr_func != FLB_SP_NOP) {
1499 ret = object_to_number(sval->o, &ival, &dval);
1500 if (ret == -1) {
1501 /* Value cannot be represented as a number */
1502 key_id++;
1503 flb_sp_key_value_destroy(sval);
1504 continue;
1505 }
1506
1507 /*
1508 * If a floating pointer number exists, we use the same data
1509 * type for the output.
1510 */
1511 if (dval != 0.0 && nums[key_id].type == FLB_SP_NUM_I64) {
1512 nums[key_id].type = FLB_SP_NUM_F64;
1513 nums[key_id].f64 = (double) nums[key_id].i64;
1514 }
1515
1516 aggregate_func_add[ckey->aggr_func - 1](aggr_node, ckey, key_id, &tms, ival, dval);
1517 }
1518 else {
1519 if (sval->o.type == MSGPACK_OBJECT_BOOLEAN) {
1520 nums[key_id].type = FLB_SP_BOOLEAN;
1521 nums[key_id].boolean = sval->o.via.boolean;
1522 }
1523 if (sval->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
1524 sval->o.type == MSGPACK_OBJECT_NEGATIVE_INTEGER) {
1525 nums[key_id].type = FLB_SP_NUM_I64;
1526 nums[key_id].i64 = sval->o.via.i64;
1527 }
1528 else if (sval->o.type == MSGPACK_OBJECT_FLOAT32 ||
1529 sval->o.type == MSGPACK_OBJECT_FLOAT) {
1530 nums[key_id].type = FLB_SP_NUM_F64;
1531 nums[key_id].f64 = sval->o.via.f64;
1532 }
1533 else if (sval->o.type == MSGPACK_OBJECT_STR) {
1534 nums[key_id].type = FLB_SP_STRING;
1535 if (nums[key_id].string == NULL) {
1536 nums[key_id].string =
1537 flb_sds_create_len(sval->o.via.str.ptr,
1538 sval->o.via.str.size);
1539 }
1540 }
1541 }
1542
1543 key_id++;
1544 flb_sp_key_value_destroy(sval);
1545 }
1546 }
1547 }
1548
1549 msgpack_unpacked_destroy(&result);
1550 return task->window.records;
1551 }
1552
1553 /*
1554 * Data processing (no aggregation functions)
1555 */
sp_process_data(const char * tag,int tag_len,const char * buf_data,size_t buf_size,char ** out_buf,size_t * out_size,struct flb_sp_task * task,struct flb_sp * sp)1556 static int sp_process_data(const char *tag, int tag_len,
1557 const char *buf_data, size_t buf_size,
1558 char **out_buf, size_t *out_size,
1559 struct flb_sp_task *task,
1560 struct flb_sp *sp)
1561 {
1562 int i;
1563 int ok;
1564 int ret;
1565 int map_size;
1566 int map_entries;
1567 int records;
1568 uint8_t h;
1569 off_t map_off;
1570 off_t no_data;
1571 size_t off;
1572 size_t off_copy;
1573 size_t snapshot_out_size;
1574 char *tmp;
1575 char *snapshot_out_buffer;
1576 msgpack_object root;
1577 msgpack_object *obj;
1578 msgpack_object key;
1579 msgpack_object val;
1580 msgpack_unpacked result;
1581 msgpack_sbuffer mp_sbuf;
1582 msgpack_packer mp_pck;
1583 msgpack_object map;
1584 struct flb_time tms;
1585 struct mk_list *head;
1586 struct flb_sp_cmd *cmd;
1587 struct flb_sp_cmd_key *cmd_key;
1588 struct flb_exp_val *condition;
1589 struct flb_sp_value *sval;
1590
1591 /* Vars initialization */
1592 off = 0;
1593 off_copy = off;
1594 records = 0;
1595 cmd = task->cmd;
1596 ok = MSGPACK_UNPACK_SUCCESS;
1597 msgpack_unpacked_init(&result);
1598 msgpack_sbuffer_init(&mp_sbuf);
1599 msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
1600
1601 snapshot_out_size = 0;
1602 snapshot_out_buffer = NULL;
1603
1604 /* Iterate incoming records */
1605 while (msgpack_unpack_next(&result, buf_data, buf_size, &off) == ok) {
1606 root = result.data;
1607
1608 /* extract timestamp */
1609 flb_time_pop_from_msgpack(&tms, &result, &obj);
1610
1611 /* Store the buffer if the stream is a snapshot */
1612 if (cmd->type == FLB_SP_CREATE_SNAPSHOT) {
1613 flb_sp_snapshot_update(task, buf_data + off_copy, off - off_copy, &tms);
1614 off_copy = off;
1615 continue;
1616 }
1617
1618 /* get the map data and it size (number of items) */
1619 map = root.via.array.ptr[1];
1620 map_size = map.via.map.size;
1621
1622 /* Evaluate condition */
1623 if (cmd->condition) {
1624 condition = reduce_expression(cmd->condition,
1625 tag, tag_len, &tms, &map);
1626 if (!condition) {
1627 continue;
1628 }
1629 else if (!condition->val.boolean) {
1630 flb_free(condition);
1631 continue;
1632 }
1633 else {
1634 flb_free(condition);
1635 }
1636 }
1637
1638 records++;
1639
1640 /* Flush the snapshot if condition holds */
1641 if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
1642 if (flb_sp_snapshot_flush(sp, task, &snapshot_out_buffer,
1643 &snapshot_out_size) == -1) {
1644 msgpack_unpacked_destroy(&result);
1645 msgpack_sbuffer_destroy(&mp_sbuf);
1646 return -1;
1647 }
1648 continue;
1649 }
1650
1651
1652 /*
1653 * If for some reason the Task keys did not insert any data, we will
1654 * need to discard any changes and reset the buffer position, let's
1655 * keep the memory size for that purpose.
1656 */
1657 no_data = mp_sbuf.size;
1658
1659 /* Pack main array */
1660 msgpack_pack_array(&mp_pck, 2);
1661 msgpack_pack_object(&mp_pck, root.via.array.ptr[0]);
1662
1663 /*
1664 * Save the current size/position of the buffer since this is
1665 * where the Map header will be stored.
1666 */
1667 map_off = mp_sbuf.size;
1668
1669 /*
1670 * In the new record register the same number of items, if due to
1671 * fields selection the number is lower, we perform an adjustment
1672 */
1673 msgpack_pack_map(&mp_pck, map_size);
1674
1675 /* Counter for new entries added to the outgoing map */
1676 map_entries = 0;
1677
1678 /* Iterate key selection */
1679 mk_list_foreach(head, &cmd->keys) {
1680 cmd_key = mk_list_entry(head, struct flb_sp_cmd_key, _head);
1681 if (cmd_key->time_func > 0) {
1682 /* Process time function */
1683 ret = flb_sp_func_time(&mp_pck, cmd_key);
1684 if (ret > 0) {
1685 map_entries += ret;
1686 }
1687 continue;
1688 }
1689 else if (cmd_key->record_func > 0) {
1690 ret = flb_sp_func_record(tag, tag_len, &tms, &mp_pck, cmd_key);
1691 if (ret > 0) {
1692 map_entries += ret;
1693 }
1694 continue;
1695 }
1696
1697 /* Lookup selection key in the incoming map */
1698 for (i = 0; i < map_size; i++) {
1699 key = map.via.map.ptr[i].key;
1700 val = map.via.map.ptr[i].val;
1701
1702 if (key.type != MSGPACK_OBJECT_STR) {
1703 continue;
1704 }
1705
1706 /* Wildcard selection: * */
1707 if (cmd_key->name == NULL) {
1708 msgpack_pack_object(&mp_pck, key);
1709 msgpack_pack_object(&mp_pck, val);
1710 map_entries++;
1711 continue;
1712 }
1713
1714 /* Compare lengths */
1715 if (flb_sds_cmp(cmd_key->name,
1716 key.via.str.ptr, key.via.str.size) != 0) {
1717 continue;
1718 }
1719
1720 /*
1721 * Package key name:
1722 *
1723 * Check if the command ask for an alias 'key AS abc'
1724 */
1725 if (cmd_key->alias) {
1726 msgpack_pack_str(&mp_pck,
1727 flb_sds_len(cmd_key->alias));
1728 msgpack_pack_str_body(&mp_pck,
1729 cmd_key->alias,
1730 flb_sds_len(cmd_key->alias));
1731 }
1732 else {
1733 msgpack_pack_object(&mp_pck, key);
1734 }
1735
1736 /* Package value */
1737 sval = flb_sp_key_to_value(cmd_key->name, map,
1738 cmd_key->subkeys);
1739 if (sval) {
1740 msgpack_pack_object(&mp_pck, sval->o);
1741 flb_sp_key_value_destroy(sval);
1742 }
1743
1744 map_entries++;
1745 }
1746 }
1747
1748 /* Final Map size adjustment */
1749 if (map_entries == 0) {
1750 mp_sbuf.size = no_data;
1751 }
1752 else {
1753 /*
1754 * The fields were packed, now we need to adjust the map size
1755 * to set the proper number of fields appended to the record.
1756 */
1757 tmp = mp_sbuf.data + map_off;
1758 h = tmp[0];
1759 if (h >> 4 == 0x8) {
1760 *tmp = (uint8_t) 0x8 << 4 | ((uint8_t) map_entries);
1761 }
1762 else if (h == 0xde) {
1763 tmp++;
1764 pack_uint16(tmp, map_entries);
1765 }
1766 else if (h == 0xdf) {
1767 tmp++;
1768 pack_uint32(tmp, map_entries);
1769 }
1770 }
1771 }
1772
1773 msgpack_unpacked_destroy(&result);
1774
1775 if (records == 0) {
1776 msgpack_sbuffer_destroy(&mp_sbuf);
1777 return 0;
1778 }
1779
1780 /* Use snapshot out buffer if it is flush stream */
1781 if (cmd->type == FLB_SP_FLUSH_SNAPSHOT) {
1782 if (snapshot_out_size == 0) {
1783 msgpack_sbuffer_destroy(&mp_sbuf);
1784 flb_free(snapshot_out_buffer);
1785 return 0;
1786 }
1787 else {
1788 *out_buf = snapshot_out_buffer;
1789 *out_size = snapshot_out_size;
1790 return records;
1791 }
1792 }
1793
1794 /* set outgoing results */
1795 *out_buf = mp_sbuf.data;
1796 *out_size = mp_sbuf.size;
1797
1798 return records;
1799 }
1800
sp_process_hopping_slot(const char * tag,int tag_len,struct flb_sp_task * task)1801 static int sp_process_hopping_slot(const char *tag, int tag_len,
1802 struct flb_sp_task *task)
1803 {
1804 int i;
1805 int key_id;
1806 int map_entries;
1807 int gb_entries;
1808 struct flb_sp_cmd *cmd = task->cmd;
1809 struct mk_list *head;
1810 struct mk_list *head_hs;
1811 struct aggregate_node *aggr_node;
1812 struct aggregate_node *aggr_node_hs;
1813 struct aggregate_node *aggr_node_prev;
1814 struct flb_sp_hopping_slot *hs;
1815 struct flb_sp_hopping_slot *hs_;
1816 struct rb_tree_node *rb_result;
1817 struct flb_sp_cmd_key *ckey;
1818 rb_result_t result;
1819
1820 map_entries = mk_list_size(&cmd->keys);
1821 gb_entries = mk_list_size(&cmd->gb_keys);
1822
1823 /* Initialize a hoping slot */
1824 hs = flb_calloc(1, sizeof(struct flb_sp_hopping_slot));
1825 if (!hs) {
1826 flb_errno();
1827 return -1;
1828 }
1829
1830 mk_list_init(&hs->aggregate_list);
1831 rb_tree_new(&hs->aggregate_tree, flb_sp_groupby_compare);
1832
1833 /* Loop over aggregation nodes on window */
1834 mk_list_foreach(head, &task->window.aggregate_list) {
1835 /* Window aggregation node */
1836 aggr_node = mk_list_entry(head, struct aggregate_node, _head);
1837
1838 /* Create a hopping slot aggregation node */
1839 aggr_node_hs = flb_calloc(1, sizeof(struct aggregate_node));
1840 if (!aggr_node_hs) {
1841 flb_errno();
1842 flb_free(hs);
1843 return -1;
1844 }
1845
1846 aggr_node_hs->nums = malloc(sizeof(struct aggregate_node) * map_entries);
1847 if (!aggr_node_hs->nums) {
1848 flb_errno();
1849 flb_free(hs);
1850 flb_free(aggr_node_hs);
1851 return -1;
1852 }
1853
1854 memcpy(aggr_node_hs->nums, aggr_node->nums, sizeof(struct aggregate_num) * map_entries);
1855 aggr_node_hs->records = aggr_node->records;
1856
1857 /* Clone aggregate data */
1858 key_id = 0;
1859 mk_list_foreach(head_hs, &cmd->keys) {
1860 ckey = mk_list_entry(head_hs, struct flb_sp_cmd_key, _head);
1861
1862 if (ckey->aggr_func) {
1863 if (!aggr_node_hs->aggregate_data) {
1864 aggr_node_hs->aggregate_data = (struct aggregate_data **)
1865 flb_calloc(1, sizeof(struct aggregate_data *) * map_entries);
1866 if (!aggr_node_hs->aggregate_data) {
1867 flb_errno();
1868 flb_free(hs);
1869 flb_free(aggr_node_hs->nums);
1870 flb_free(aggr_node_hs);
1871 return -1;
1872 }
1873 }
1874
1875 if (aggregate_func_clone[ckey->aggr_func - 1](aggr_node_hs, aggr_node, ckey, key_id) == -1) {
1876 flb_errno();
1877 flb_free(aggr_node_hs->nums);
1878 flb_free(aggr_node_hs->aggregate_data);
1879 flb_free(aggr_node_hs);
1880 flb_free(hs);
1881 return -1;
1882 }
1883 }
1884
1885 key_id++;
1886 }
1887
1888 /* Traverse over previous slots to calculate values/record numbers */
1889 mk_list_foreach(head_hs, &task->window.hopping_slot) {
1890 hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head);
1891 result = rb_tree_find(&hs_->aggregate_tree, aggr_node, &rb_result);
1892 /* If corresponding aggregation node exists in previous hopping slot,
1893 * calculate aggregation values
1894 */
1895 if (result == RB_OK) {
1896 aggr_node_prev = mk_list_entry(rb_result, struct aggregate_node,
1897 _rb_head);
1898 aggr_node_hs->records -= aggr_node_prev->records;
1899
1900 key_id = 0;
1901 ckey = mk_list_entry_first(&cmd->keys, struct flb_sp_cmd_key,
1902 _head);
1903 for (i = 0; i < map_entries; i++) {
1904 if (ckey->aggr_func) {
1905 aggregate_func_remove[ckey->aggr_func - 1](aggr_node_hs, aggr_node_prev, i);
1906 }
1907
1908 ckey = mk_list_entry_next(&ckey->_head, struct flb_sp_cmd_key,
1909 _head, &cmd->keys);
1910 }
1911 }
1912 }
1913
1914 if (aggr_node_hs->records > 0) {
1915 aggr_node_hs->groupby_nums =
1916 flb_calloc(1, sizeof(struct aggregate_node) * gb_entries);
1917 if (gb_entries > 0 && !aggr_node_hs->groupby_nums) {
1918 flb_errno();
1919 flb_free(hs);
1920 flb_free(aggr_node_hs->nums);
1921 flb_free(aggr_node_hs->aggregate_data);
1922 flb_free(aggr_node_hs);
1923 return -1;
1924 }
1925
1926 if (aggr_node_hs->groupby_nums != NULL) {
1927 memcpy(aggr_node_hs->groupby_nums, aggr_node->groupby_nums,
1928 sizeof(struct aggregate_num) * gb_entries);
1929 }
1930
1931 aggr_node_hs->nums_size = aggr_node->nums_size;
1932 aggr_node_hs->groupby_keys = aggr_node->groupby_keys;
1933
1934 rb_tree_insert(&hs->aggregate_tree, aggr_node_hs, &aggr_node_hs->_rb_head);
1935 mk_list_add(&aggr_node_hs->_head, &hs->aggregate_list);
1936 }
1937 else {
1938 flb_free(aggr_node_hs->nums);
1939 flb_free(aggr_node_hs->aggregate_data);
1940 flb_free(aggr_node_hs);
1941 }
1942 }
1943
1944 hs->records = task->window.records;
1945 mk_list_foreach(head_hs, &task->window.hopping_slot) {
1946 hs_ = mk_list_entry(head_hs, struct flb_sp_hopping_slot, _head);
1947 hs->records -= hs_->records;
1948 }
1949
1950 mk_list_add(&hs->_head, &task->window.hopping_slot);
1951
1952 return 0;
1953 }
1954
1955 /*
1956 * Do data processing for internal unit tests, no engine required, set
1957 * results on out_data/out_size variables.
1958 */
flb_sp_test_do(struct flb_sp * sp,struct flb_sp_task * task,const char * tag,int tag_len,const char * buf_data,size_t buf_size,char ** out_data,size_t * out_size)1959 int flb_sp_test_do(struct flb_sp *sp, struct flb_sp_task *task,
1960 const char *tag, int tag_len,
1961 const char *buf_data, size_t buf_size,
1962 char **out_data, size_t *out_size)
1963 {
1964 int ret;
1965 int records;
1966 struct flb_sp_cmd *cmd;
1967
1968 cmd = task->cmd;
1969 if (cmd->source_type == FLB_SP_TAG) {
1970 ret = flb_router_match(tag, tag_len, cmd->source_name, NULL);
1971 if (ret == FLB_FALSE) {
1972 *out_data = NULL;
1973 *out_size = 0;
1974 return 0;
1975 }
1976 }
1977
1978 if (task->aggregate_keys == FLB_TRUE) {
1979 ret = sp_process_data_aggr(buf_data, buf_size,
1980 tag, tag_len,
1981 task, sp);
1982 if (ret == -1) {
1983 flb_error("[sp] error error processing records for '%s'",
1984 task->name);
1985 return -1;
1986 }
1987
1988 if (flb_sp_window_populate(task, buf_data, buf_size) == -1) {
1989 flb_error("[sp] error populating window for '%s'",
1990 task->name);
1991 return -1;
1992 }
1993
1994 if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
1995 package_results(tag, tag_len, out_data, out_size, task);
1996 }
1997
1998 records = task->window.records;
1999 }
2000 else {
2001 ret = sp_process_data(tag, tag_len,
2002 buf_data, buf_size,
2003 out_data, out_size,
2004 task, sp);
2005 if (ret == -1) {
2006 flb_error("[sp] error processing records for '%s'",
2007 task->name);
2008 return -1;
2009 }
2010 records = ret;
2011 }
2012
2013 if (records == 0) {
2014 *out_data = NULL;
2015 *out_size = 0;
2016 return 0;
2017 }
2018
2019 return 0;
2020 }
2021
2022 /* Iterate and find input chunks to process */
flb_sp_do(struct flb_sp * sp,struct flb_input_instance * in,const char * tag,int tag_len,const char * buf_data,size_t buf_size)2023 int flb_sp_do(struct flb_sp *sp, struct flb_input_instance *in,
2024 const char *tag, int tag_len,
2025 const char *buf_data, size_t buf_size)
2026
2027 {
2028 int ret;
2029 size_t out_size;
2030 char *out_buf;
2031 struct mk_list *head;
2032 struct flb_sp_task *task;
2033 struct flb_sp_cmd *cmd;
2034
2035 /* Lookup tasks that match the incoming instance data */
2036 mk_list_foreach(head, &sp->tasks) {
2037 task = mk_list_entry(head, struct flb_sp_task, _head);
2038 cmd = task->cmd;
2039
2040 if (cmd->source_type == FLB_SP_STREAM) {
2041 if (task->source_instance != in) {
2042 continue;
2043 }
2044 }
2045 else if (cmd->source_type == FLB_SP_TAG) {
2046 ret = flb_router_match(tag, tag_len, cmd->source_name, NULL);
2047 if (ret == FLB_FALSE) {
2048 continue;
2049 }
2050 }
2051
2052 /* We found a task that matches the stream rule */
2053 if (task->aggregate_keys == FLB_TRUE) {
2054 ret = sp_process_data_aggr(buf_data, buf_size,
2055 tag, tag_len,
2056 task, sp);
2057
2058 if (ret == -1) {
2059 flb_error("[sp] error processing records for '%s'",
2060 task->name);
2061 continue;
2062 }
2063
2064 if (flb_sp_window_populate(task, buf_data, buf_size) == -1) {
2065 flb_error("[sp] error populating window for '%s'",
2066 task->name);
2067 continue;
2068 }
2069
2070 if (task->window.type == FLB_SP_WINDOW_DEFAULT) {
2071 package_results(tag, tag_len, &out_buf, &out_size, task);
2072 flb_sp_window_prune(task);
2073 }
2074 }
2075 else {
2076 ret = sp_process_data(tag, tag_len,
2077 buf_data, buf_size,
2078 &out_buf, &out_size,
2079 task, sp);
2080
2081 if (ret == -1) {
2082 flb_error("[sp] error processing records for '%s'",
2083 task->name);
2084 continue;
2085 }
2086 }
2087
2088 if (ret == 0) {
2089 /* no records */
2090 continue;
2091 }
2092
2093 /*
2094 * This task involves append data to a stream, which
2095 * means: register the output of the query as data
2096 * generated by an input instance plugin.
2097 */
2098 if (task->aggregate_keys != FLB_TRUE ||
2099 task->window.type == FLB_SP_WINDOW_DEFAULT) {
2100 /*
2101 * Add to stream processing stream if there is no
2102 * aggregation function. Otherwise, write it at timer event
2103 */
2104 if (task->stream) {
2105 flb_sp_stream_append_data(out_buf, out_size, task->stream);
2106 }
2107 else {
2108 flb_pack_print(out_buf, out_size);
2109 flb_free(out_buf);
2110 }
2111 }
2112 }
2113
2114 return -1;
2115 }
2116
flb_sp_fd_event(int fd,struct flb_sp * sp)2117 int flb_sp_fd_event(int fd, struct flb_sp *sp)
2118 {
2119 bool update_timer_event;
2120 char *out_buf;
2121 char *tag = NULL;
2122 int tag_len = 0;
2123 int fd_timeout = 0;
2124 size_t out_size;
2125 struct mk_list *tmp;
2126 struct mk_list *head;
2127 struct flb_sp_task *task;
2128 struct flb_input_instance *in = NULL;
2129
2130 /* Lookup Tasks that matches the incoming event */
2131 mk_list_foreach_safe(head, tmp, &sp->tasks) {
2132 task = mk_list_entry(head, struct flb_sp_task, _head);
2133
2134 if (fd == task->window.fd) {
2135 update_timer_event = task->window.type == FLB_SP_WINDOW_HOPPING &&
2136 task->window.first_hop;
2137
2138 in = task->source_instance;
2139 if (in) {
2140 if (in->tag && in->tag_len > 0) {
2141 tag = in->tag;
2142 tag_len = in->tag_len;
2143 }
2144 else {
2145 tag = in->name;
2146 tag_len = strlen(in->name);
2147 }
2148 }
2149 else {
2150 in = NULL;
2151 }
2152
2153 if (task->window.records > 0) {
2154 /* find input tag from task source */
2155 package_results(tag, tag_len, &out_buf, &out_size, task);
2156 if (task->stream) {
2157 flb_sp_stream_append_data(out_buf, out_size, task->stream);
2158 }
2159 else {
2160 flb_pack_print(out_buf, out_size);
2161 flb_free(out_buf);
2162 }
2163
2164 }
2165
2166 flb_sp_window_prune(task);
2167
2168 flb_utils_timer_consume(fd);
2169
2170 if (update_timer_event && in) {
2171 task->window.first_hop = false;
2172 mk_event_timeout_destroy(in->config->evl, &task->window.event);
2173 mk_event_closesocket(fd);
2174
2175 fd_timeout = mk_event_timeout_create(in->config->evl,
2176 task->window.advance_by, (long) 0,
2177 &task->window.event);
2178 if (fd_timeout == -1) {
2179 flb_error("[sp] registration for task (updating timer event) %s failed", task->name);
2180 return -1;
2181 }
2182 task->window.fd = fd_timeout;
2183 }
2184
2185 break;
2186 }
2187 else if (fd == task->window.fd_hop) {
2188 in = task->source_instance;
2189 if (in) {
2190 if (in->tag && in->tag_len > 0) {
2191 tag = in->tag;
2192 tag_len = in->tag_len;
2193 }
2194 else {
2195 tag = in->name;
2196 tag_len = strlen(in->name);
2197 }
2198 }
2199 sp_process_hopping_slot(tag, tag_len, task);
2200 flb_utils_timer_consume(fd);
2201 }
2202 }
2203 return 0;
2204 }
2205
2206 /* Destroy stream processor context */
flb_sp_destroy(struct flb_sp * sp)2207 void flb_sp_destroy(struct flb_sp *sp)
2208 {
2209 struct mk_list *tmp;
2210 struct mk_list *head;
2211 struct flb_sp_task *task;
2212
2213 /* destroy tasks */
2214 mk_list_foreach_safe(head, tmp, &sp->tasks) {
2215 task = mk_list_entry(head, struct flb_sp_task, _head);
2216 flb_sp_task_destroy(task);
2217 }
2218
2219 flb_free(sp);
2220 }
2221
flb_sp_test_fd_event(int fd,struct flb_sp_task * task,char ** out_data,size_t * out_size)2222 int flb_sp_test_fd_event(int fd, struct flb_sp_task *task, char **out_data,
2223 size_t *out_size)
2224 {
2225 char *tag = NULL;
2226 int tag_len = 0;
2227
2228 if (task->window.type != FLB_SP_WINDOW_DEFAULT) {
2229 if (fd == task->window.fd) {
2230 if (task->window.records > 0) {
2231 /* find input tag from task source */
2232 package_results(tag, tag_len, out_data, out_size, task);
2233 if (task->stream) {
2234 flb_sp_stream_append_data(*out_data, *out_size, task->stream);
2235 }
2236 else {
2237 flb_pack_print(*out_data, *out_size);
2238 }
2239 }
2240
2241 flb_sp_window_prune(task);
2242 }
2243 else if (fd == task->window.fd_hop) {
2244 sp_process_hopping_slot(tag, tag_len, task);
2245 }
2246 }
2247
2248 return 0;
2249 }
2250