1 /**
2 * collectd - src/aggregation.c
3 * Copyright (C) 2012 Florian Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian Forster <octo at collectd.org>
25 **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/common/common.h"
31 #include "utils/lookup/vl_lookup.h"
32 #include "utils/metadata/meta_data.h"
33 #include "utils_cache.h" /* for uc_get_rate() */
34 #include "utils_subst.h"
35
36 #define AGG_MATCHES_ALL(str) (strcmp("/.*/", str) == 0)
37 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
38
39 struct aggregation_s /* {{{ */
40 {
41 lookup_identifier_t ident;
42 unsigned int group_by;
43
44 unsigned int regex_fields;
45
46 char *set_host;
47 char *set_plugin;
48 char *set_plugin_instance;
49 char *set_type_instance;
50
51 bool calc_num;
52 bool calc_sum;
53 bool calc_average;
54 bool calc_min;
55 bool calc_max;
56 bool calc_stddev;
57 }; /* }}} */
58 typedef struct aggregation_s aggregation_t;
59
60 struct agg_instance_s;
61 typedef struct agg_instance_s agg_instance_t;
62 struct agg_instance_s /* {{{ */
63 {
64 pthread_mutex_t lock;
65 lookup_identifier_t ident;
66
67 int ds_type;
68
69 derive_t num;
70 gauge_t sum;
71 gauge_t squares_sum;
72
73 gauge_t min;
74 gauge_t max;
75
76 rate_to_value_state_t *state_num;
77 rate_to_value_state_t *state_sum;
78 rate_to_value_state_t *state_average;
79 rate_to_value_state_t *state_min;
80 rate_to_value_state_t *state_max;
81 rate_to_value_state_t *state_stddev;
82
83 agg_instance_t *next;
84 }; /* }}} */
85
86 static lookup_t *lookup;
87
88 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
89 static agg_instance_t *agg_instance_list_head;
90
agg_is_regex(char const * str)91 static bool agg_is_regex(char const *str) /* {{{ */
92 {
93 if (str == NULL)
94 return false;
95
96 size_t len = strlen(str);
97 if (len < 3)
98 return false;
99
100 if ((str[0] == '/') && (str[len - 1] == '/'))
101 return true;
102 else
103 return false;
104 } /* }}} bool agg_is_regex */
105
agg_destroy(aggregation_t * agg)106 static void agg_destroy(aggregation_t *agg) /* {{{ */
107 {
108 sfree(agg);
109 } /* }}} void agg_destroy */
110
111 /* Frees all dynamically allocated memory within the instance. */
agg_instance_destroy(agg_instance_t * inst)112 static void agg_instance_destroy(agg_instance_t *inst) /* {{{ */
113 {
114 if (inst == NULL)
115 return;
116
117 /* Remove this instance from the global list of instances. */
118 pthread_mutex_lock(&agg_instance_list_lock);
119 if (agg_instance_list_head == inst)
120 agg_instance_list_head = inst->next;
121 else if (agg_instance_list_head != NULL) {
122 agg_instance_t *prev = agg_instance_list_head;
123 while ((prev != NULL) && (prev->next != inst))
124 prev = prev->next;
125 if (prev != NULL)
126 prev->next = inst->next;
127 }
128 pthread_mutex_unlock(&agg_instance_list_lock);
129
130 sfree(inst->state_num);
131 sfree(inst->state_sum);
132 sfree(inst->state_average);
133 sfree(inst->state_min);
134 sfree(inst->state_max);
135 sfree(inst->state_stddev);
136
137 memset(inst, 0, sizeof(*inst));
138 inst->ds_type = -1;
139 inst->min = NAN;
140 inst->max = NAN;
141 } /* }}} void agg_instance_destroy */
142
agg_instance_create_name(agg_instance_t * inst,value_list_t const * vl,aggregation_t const * agg)143 static int agg_instance_create_name(agg_instance_t *inst, /* {{{ */
144 value_list_t const *vl,
145 aggregation_t const *agg) {
146 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) \
147 do { \
148 if (agg->set_##field != NULL) \
149 sstrncpy(buffer, agg->set_##field, buffer_size); \
150 else if ((agg->regex_fields & group_mask) && (agg->group_by & group_mask)) \
151 sstrncpy(buffer, vl->field, buffer_size); \
152 else if ((agg->regex_fields & group_mask) && \
153 (AGG_MATCHES_ALL(agg->ident.field))) \
154 sstrncpy(buffer, all_value, buffer_size); \
155 else \
156 sstrncpy(buffer, agg->ident.field, buffer_size); \
157 } while (0)
158
159 /* Host */
160 COPY_FIELD(inst->ident.host, sizeof(inst->ident.host), host, LU_GROUP_BY_HOST,
161 "global");
162
163 /* Plugin */
164 if (agg->set_plugin != NULL)
165 sstrncpy(inst->ident.plugin, agg->set_plugin, sizeof(inst->ident.plugin));
166 else
167 sstrncpy(inst->ident.plugin, "aggregation", sizeof(inst->ident.plugin));
168
169 /* Plugin instance */
170 if (agg->set_plugin_instance != NULL)
171 sstrncpy(inst->ident.plugin_instance, agg->set_plugin_instance,
172 sizeof(inst->ident.plugin_instance));
173 else {
174 char tmp_plugin[DATA_MAX_NAME_LEN];
175 char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
176
177 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
178 (agg->group_by & LU_GROUP_BY_PLUGIN))
179 sstrncpy(tmp_plugin, vl->plugin, sizeof(tmp_plugin));
180 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
181 (AGG_MATCHES_ALL(agg->ident.plugin)))
182 sstrncpy(tmp_plugin, "", sizeof(tmp_plugin));
183 else
184 sstrncpy(tmp_plugin, agg->ident.plugin, sizeof(tmp_plugin));
185
186 if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
187 (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
188 sstrncpy(tmp_plugin_instance, vl->plugin_instance,
189 sizeof(tmp_plugin_instance));
190 else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
191 (AGG_MATCHES_ALL(agg->ident.plugin_instance)))
192 sstrncpy(tmp_plugin_instance, "", sizeof(tmp_plugin_instance));
193 else
194 sstrncpy(tmp_plugin_instance, agg->ident.plugin_instance,
195 sizeof(tmp_plugin_instance));
196
197 // Both tmp_plugin and tmp_plugin_instance are empty.
198 if ((strcmp("", tmp_plugin) == 0) && (strcmp("", tmp_plugin_instance) == 0))
199 sstrncpy(inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
200 sizeof(inst->ident.plugin_instance));
201 // tmp_plugin is non-empty, and tmp_plugin_instance is empty.
202 else if (strcmp("", tmp_plugin_instance) == 0)
203 ssnprintf(inst->ident.plugin_instance,
204 sizeof(inst->ident.plugin_instance), "%s-%s", tmp_plugin,
205 AGG_FUNC_PLACEHOLDER);
206 // tmp_plugin is empty, and tmp_plugin_instance is non-empty.
207 else if (strcmp("", tmp_plugin) == 0)
208 ssnprintf(inst->ident.plugin_instance,
209 sizeof(inst->ident.plugin_instance), "%s-%s",
210 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
211 // Both tmp_plugin and tmp_plugin_instance are non-empty.
212 else
213 ssnprintf(inst->ident.plugin_instance,
214 sizeof(inst->ident.plugin_instance), "%s-%s-%s", tmp_plugin,
215 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
216 }
217
218 /* Type */
219 sstrncpy(inst->ident.type, agg->ident.type, sizeof(inst->ident.type));
220
221 /* Type instance */
222 COPY_FIELD(inst->ident.type_instance, sizeof(inst->ident.type_instance),
223 type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
224
225 #undef COPY_FIELD
226
227 return 0;
228 } /* }}} int agg_instance_create_name */
229
230 /* Create a new aggregation instance. */
agg_instance_create(data_set_t const * ds,value_list_t const * vl,aggregation_t * agg)231 static agg_instance_t *agg_instance_create(data_set_t const *ds, /* {{{ */
232 value_list_t const *vl,
233 aggregation_t *agg) {
234 DEBUG("aggregation plugin: Creating new instance.");
235
236 agg_instance_t *inst = calloc(1, sizeof(*inst));
237 if (inst == NULL) {
238 ERROR("aggregation plugin: calloc() failed.");
239 return NULL;
240 }
241 pthread_mutex_init(&inst->lock, /* attr = */ NULL);
242
243 inst->ds_type = ds->ds[0].type;
244
245 agg_instance_create_name(inst, vl, agg);
246
247 inst->min = NAN;
248 inst->max = NAN;
249
250 #define INIT_STATE(field) \
251 do { \
252 inst->state_##field = NULL; \
253 if (agg->calc_##field) { \
254 inst->state_##field = calloc(1, sizeof(*inst->state_##field)); \
255 if (inst->state_##field == NULL) { \
256 agg_instance_destroy(inst); \
257 free(inst); \
258 ERROR("aggregation plugin: calloc() failed."); \
259 return NULL; \
260 } \
261 } \
262 } while (0)
263
264 INIT_STATE(num);
265 INIT_STATE(sum);
266 INIT_STATE(average);
267 INIT_STATE(min);
268 INIT_STATE(max);
269 INIT_STATE(stddev);
270
271 #undef INIT_STATE
272
273 pthread_mutex_lock(&agg_instance_list_lock);
274 inst->next = agg_instance_list_head;
275 agg_instance_list_head = inst;
276 pthread_mutex_unlock(&agg_instance_list_lock);
277
278 return inst;
279 } /* }}} agg_instance_t *agg_instance_create */
280
281 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
282 * the rate of the value list is available. Value lists with more than one data
283 * source are not supported and will return an error. Returns zero on success
284 * and non-zero otherwise. */
agg_instance_update(agg_instance_t * inst,data_set_t const * ds,value_list_t const * vl)285 static int agg_instance_update(agg_instance_t *inst, /* {{{ */
286 data_set_t const *ds, value_list_t const *vl) {
287 if (ds->ds_num != 1) {
288 ERROR("aggregation plugin: The \"%s\" type (data set) has more than one "
289 "data source. This is currently not supported by this plugin. "
290 "Sorry.",
291 ds->type);
292 return EINVAL;
293 }
294
295 gauge_t *rate = uc_get_rate(ds, vl);
296 if (rate == NULL) {
297 char ident[6 * DATA_MAX_NAME_LEN];
298 FORMAT_VL(ident, sizeof(ident), vl);
299 ERROR("aggregation plugin: Unable to read the current rate of \"%s\".",
300 ident);
301 return ENOENT;
302 }
303
304 if (isnan(rate[0])) {
305 sfree(rate);
306 return 0;
307 }
308
309 pthread_mutex_lock(&inst->lock);
310
311 inst->num++;
312 inst->sum += rate[0];
313 inst->squares_sum += (rate[0] * rate[0]);
314
315 if (isnan(inst->min) || (inst->min > rate[0]))
316 inst->min = rate[0];
317 if (isnan(inst->max) || (inst->max < rate[0]))
318 inst->max = rate[0];
319
320 pthread_mutex_unlock(&inst->lock);
321
322 sfree(rate);
323 return 0;
324 } /* }}} int agg_instance_update */
325
agg_instance_read_func(agg_instance_t * inst,char const * func,gauge_t rate,rate_to_value_state_t * state,value_list_t * vl,char const * pi_prefix,cdtime_t t)326 static int agg_instance_read_func(agg_instance_t *inst, /* {{{ */
327 char const *func, gauge_t rate,
328 rate_to_value_state_t *state,
329 value_list_t *vl, char const *pi_prefix,
330 cdtime_t t) {
331 if (pi_prefix[0] != 0)
332 subst_string(vl->plugin_instance, sizeof(vl->plugin_instance), pi_prefix,
333 AGG_FUNC_PLACEHOLDER, func);
334 else
335 sstrncpy(vl->plugin_instance, func, sizeof(vl->plugin_instance));
336
337 value_t v;
338
339 int status = rate_to_value(&v, rate, state, inst->ds_type, t);
340 if (status != 0) {
341 /* If this is the first iteration and rate_to_value() was asked to return a
342 * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
343 * gracefully. */
344 if (status == EAGAIN)
345 return 0;
346
347 WARNING("aggregation plugin: rate_to_value failed with status %i.", status);
348 return -1;
349 }
350
351 vl->values = &v;
352 vl->values_len = 1;
353
354 plugin_dispatch_values(vl);
355
356 vl->values = NULL;
357 vl->values_len = 0;
358
359 return 0;
360 } /* }}} int agg_instance_read_func */
361
agg_instance_read(agg_instance_t * inst,cdtime_t t)362 static int agg_instance_read(agg_instance_t *inst, cdtime_t t) /* {{{ */
363 {
364 value_list_t vl = VALUE_LIST_INIT;
365
366 /* Pre-set all the fields in the value list that will not change per
367 * aggregation type (sum, average, ...). The struct will be re-used and must
368 * therefore be dispatched using the "secure" function. */
369
370 vl.time = t;
371 vl.interval = 0;
372
373 vl.meta = meta_data_create();
374 if (vl.meta == NULL) {
375 ERROR("aggregation plugin: meta_data_create failed.");
376 return -1;
377 }
378 meta_data_add_boolean(vl.meta, "aggregation:created", 1);
379
380 sstrncpy(vl.host, inst->ident.host, sizeof(vl.host));
381 sstrncpy(vl.plugin, inst->ident.plugin, sizeof(vl.plugin));
382 sstrncpy(vl.type, inst->ident.type, sizeof(vl.type));
383 sstrncpy(vl.type_instance, inst->ident.type_instance,
384 sizeof(vl.type_instance));
385
386 #define READ_FUNC(func, rate) \
387 do { \
388 if (inst->state_##func != NULL) { \
389 agg_instance_read_func(inst, #func, rate, inst->state_##func, &vl, \
390 inst->ident.plugin_instance, t); \
391 } \
392 } while (0)
393
394 pthread_mutex_lock(&inst->lock);
395
396 READ_FUNC(num, (gauge_t)inst->num);
397
398 /* All other aggregations are only defined when there have been any values
399 * at all. */
400 if (inst->num > 0) {
401 READ_FUNC(sum, inst->sum);
402 READ_FUNC(average, (inst->sum / ((gauge_t)inst->num)));
403 READ_FUNC(min, inst->min);
404 READ_FUNC(max, inst->max);
405 READ_FUNC(stddev, sqrt((((gauge_t)inst->num) * inst->squares_sum) -
406 (inst->sum * inst->sum)) /
407 ((gauge_t)inst->num));
408 }
409
410 /* Reset internal state. */
411 inst->num = 0;
412 inst->sum = 0.0;
413 inst->squares_sum = 0.0;
414 inst->min = NAN;
415 inst->max = NAN;
416
417 pthread_mutex_unlock(&inst->lock);
418
419 meta_data_destroy(vl.meta);
420 vl.meta = NULL;
421
422 return 0;
423 } /* }}} int agg_instance_read */
424
425 /* lookup_class_callback_t for utils_vl_lookup */
agg_lookup_class_callback(data_set_t const * ds,value_list_t const * vl,void * user_class)426 static void *agg_lookup_class_callback(/* {{{ */
427 data_set_t const *ds,
428 value_list_t const *vl,
429 void *user_class) {
430 return agg_instance_create(ds, vl, (aggregation_t *)user_class);
431 } /* }}} void *agg_class_callback */
432
433 /* lookup_obj_callback_t for utils_vl_lookup */
agg_lookup_obj_callback(data_set_t const * ds,value_list_t const * vl,void * user_class,void * user_obj)434 static int agg_lookup_obj_callback(data_set_t const *ds, /* {{{ */
435 value_list_t const *vl,
436 __attribute__((unused)) void *user_class,
437 void *user_obj) {
438 return agg_instance_update((agg_instance_t *)user_obj, ds, vl);
439 } /* }}} int agg_lookup_obj_callback */
440
441 /* lookup_free_class_callback_t for utils_vl_lookup */
agg_lookup_free_class_callback(void * user_class)442 static void agg_lookup_free_class_callback(void *user_class) /* {{{ */
443 {
444 agg_destroy((aggregation_t *)user_class);
445 } /* }}} void agg_lookup_free_class_callback */
446
447 /* lookup_free_obj_callback_t for utils_vl_lookup */
agg_lookup_free_obj_callback(void * user_obj)448 static void agg_lookup_free_obj_callback(void *user_obj) /* {{{ */
449 {
450 agg_instance_destroy((agg_instance_t *)user_obj);
451 } /* }}} void agg_lookup_free_obj_callback */
452
453 /*
454 * <Plugin "aggregation">
455 * <Aggregation>
456 * Plugin "cpu"
457 * Type "cpu"
458 *
459 * GroupBy Host
460 * GroupBy TypeInstance
461 *
462 * CalculateNum true
463 * CalculateSum true
464 * CalculateAverage true
465 * CalculateMinimum true
466 * CalculateMaximum true
467 * CalculateStddev true
468 * </Aggregation>
469 * </Plugin>
470 */
agg_config_handle_group_by(oconfig_item_t const * ci,aggregation_t * agg)471 static int agg_config_handle_group_by(oconfig_item_t const *ci, /* {{{ */
472 aggregation_t *agg) {
473 for (int i = 0; i < ci->values_num; i++) {
474 if (ci->values[i].type != OCONFIG_TYPE_STRING) {
475 ERROR("aggregation plugin: Argument %i of the \"GroupBy\" option "
476 "is not a string.",
477 i + 1);
478 continue;
479 }
480
481 const char *value = ci->values[i].value.string;
482
483 if (strcasecmp("Host", value) == 0)
484 agg->group_by |= LU_GROUP_BY_HOST;
485 else if (strcasecmp("Plugin", value) == 0)
486 agg->group_by |= LU_GROUP_BY_PLUGIN;
487 else if (strcasecmp("PluginInstance", value) == 0)
488 agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
489 else if (strcasecmp("TypeInstance", value) == 0)
490 agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
491 else if (strcasecmp("Type", value) == 0)
492 ERROR("aggregation plugin: Grouping by type is not supported.");
493 else
494 WARNING("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
495 "option is invalid and will be ignored.",
496 value);
497 } /* for (ci->values) */
498
499 return 0;
500 } /* }}} int agg_config_handle_group_by */
501
agg_config_aggregation(oconfig_item_t * ci)502 static int agg_config_aggregation(oconfig_item_t *ci) /* {{{ */
503 {
504 aggregation_t *agg = calloc(1, sizeof(*agg));
505 if (agg == NULL) {
506 ERROR("aggregation plugin: calloc failed.");
507 return -1;
508 }
509
510 sstrncpy(agg->ident.host, "/.*/", sizeof(agg->ident.host));
511 sstrncpy(agg->ident.plugin, "/.*/", sizeof(agg->ident.plugin));
512 sstrncpy(agg->ident.plugin_instance, "/.*/",
513 sizeof(agg->ident.plugin_instance));
514 sstrncpy(agg->ident.type, "/.*/", sizeof(agg->ident.type));
515 sstrncpy(agg->ident.type_instance, "/.*/", sizeof(agg->ident.type_instance));
516
517 for (int i = 0; i < ci->children_num; i++) {
518 oconfig_item_t *child = ci->children + i;
519 int status = 0;
520
521 if (strcasecmp("Host", child->key) == 0)
522 status = cf_util_get_string_buffer(child, agg->ident.host,
523 sizeof(agg->ident.host));
524 else if (strcasecmp("Plugin", child->key) == 0)
525 status = cf_util_get_string_buffer(child, agg->ident.plugin,
526 sizeof(agg->ident.plugin));
527 else if (strcasecmp("PluginInstance", child->key) == 0)
528 status = cf_util_get_string_buffer(child, agg->ident.plugin_instance,
529 sizeof(agg->ident.plugin_instance));
530 else if (strcasecmp("Type", child->key) == 0)
531 status = cf_util_get_string_buffer(child, agg->ident.type,
532 sizeof(agg->ident.type));
533 else if (strcasecmp("TypeInstance", child->key) == 0)
534 status = cf_util_get_string_buffer(child, agg->ident.type_instance,
535 sizeof(agg->ident.type_instance));
536 else if (strcasecmp("SetHost", child->key) == 0)
537 status = cf_util_get_string(child, &agg->set_host);
538 else if (strcasecmp("SetPlugin", child->key) == 0)
539 status = cf_util_get_string(child, &agg->set_plugin);
540 else if (strcasecmp("SetPluginInstance", child->key) == 0)
541 status = cf_util_get_string(child, &agg->set_plugin_instance);
542 else if (strcasecmp("SetTypeInstance", child->key) == 0)
543 status = cf_util_get_string(child, &agg->set_type_instance);
544 else if (strcasecmp("GroupBy", child->key) == 0)
545 status = agg_config_handle_group_by(child, agg);
546 else if (strcasecmp("CalculateNum", child->key) == 0)
547 status = cf_util_get_boolean(child, &agg->calc_num);
548 else if (strcasecmp("CalculateSum", child->key) == 0)
549 status = cf_util_get_boolean(child, &agg->calc_sum);
550 else if (strcasecmp("CalculateAverage", child->key) == 0)
551 status = cf_util_get_boolean(child, &agg->calc_average);
552 else if (strcasecmp("CalculateMinimum", child->key) == 0)
553 status = cf_util_get_boolean(child, &agg->calc_min);
554 else if (strcasecmp("CalculateMaximum", child->key) == 0)
555 status = cf_util_get_boolean(child, &agg->calc_max);
556 else if (strcasecmp("CalculateStddev", child->key) == 0)
557 status = cf_util_get_boolean(child, &agg->calc_stddev);
558 else
559 WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
560 "<Aggregation /> blocks and will be ignored.",
561 child->key);
562
563 if (status != 0) {
564 sfree(agg);
565 return status;
566 }
567 } /* for (int i = 0; i < ci->children_num; i++) */
568
569 if (agg_is_regex(agg->ident.host))
570 agg->regex_fields |= LU_GROUP_BY_HOST;
571 if (agg_is_regex(agg->ident.plugin))
572 agg->regex_fields |= LU_GROUP_BY_PLUGIN;
573 if (agg_is_regex(agg->ident.plugin_instance))
574 agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
575 if (agg_is_regex(agg->ident.type_instance))
576 agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
577
578 /* Sanity checking */
579 bool is_valid = true;
580 if (strcmp("/.*/", agg->ident.type) == 0) /* {{{ */
581 {
582 ERROR("aggregation plugin: It appears you did not specify the required "
583 "\"Type\" option in this aggregation. "
584 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
585 "Type \"%s\", TypeInstance \"%s\")",
586 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
587 agg->ident.type, agg->ident.type_instance);
588 is_valid = false;
589 } else if (strchr(agg->ident.type, '/') != NULL) {
590 ERROR("aggregation plugin: The \"Type\" may not contain the '/' "
591 "character. Especially, it may not be a regex. The current "
592 "value is \"%s\".",
593 agg->ident.type);
594 is_valid = false;
595 } /* }}} */
596
597 /* Check that there is at least one regex field without a grouping. {{{ */
598 if ((agg->regex_fields & ~agg->group_by) == 0) {
599 ERROR("aggregation plugin: An aggregation must contain at least one "
600 "wildcard. This is achieved by leaving at least one of the \"Host\", "
601 "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
602 "or using a regular expression and not grouping by that field. "
603 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
604 "Type \"%s\", TypeInstance \"%s\")",
605 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
606 agg->ident.type, agg->ident.type_instance);
607 is_valid = false;
608 } /* }}} */
609
610 /* Check that all grouping fields are regular expressions. {{{ */
611 if (agg->group_by & ~agg->regex_fields) {
612 ERROR("aggregation plugin: Only wildcard fields (fields for which a "
613 "regular expression is configured or which are left blank) can be "
614 "specified in the \"GroupBy\" option. "
615 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
616 "Type \"%s\", TypeInstance \"%s\")",
617 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
618 agg->ident.type, agg->ident.type_instance);
619 is_valid = false;
620 } /* }}} */
621
622 if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
623 && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) {
624 ERROR("aggregation plugin: No aggregation function has been specified. "
625 "Without this, I don't know what I should be calculating. "
626 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
627 "Type \"%s\", TypeInstance \"%s\")",
628 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
629 agg->ident.type, agg->ident.type_instance);
630 is_valid = false;
631 } /* }}} */
632
633 if (!is_valid) { /* {{{ */
634 sfree(agg);
635 return -1;
636 } /* }}} */
637
638 int status = lookup_add(lookup, &agg->ident, agg->group_by, agg);
639 if (status != 0) {
640 ERROR("aggregation plugin: lookup_add failed with status %i.", status);
641 sfree(agg);
642 return -1;
643 }
644
645 DEBUG("aggregation plugin: Successfully added aggregation: "
646 "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
647 "Type \"%s\", TypeInstance \"%s\")",
648 agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
649 agg->ident.type, agg->ident.type_instance);
650 return 0;
651 } /* }}} int agg_config_aggregation */
652
agg_config(oconfig_item_t * ci)653 static int agg_config(oconfig_item_t *ci) /* {{{ */
654 {
655 pthread_mutex_lock(&agg_instance_list_lock);
656
657 if (lookup == NULL) {
658 lookup = lookup_create(agg_lookup_class_callback, agg_lookup_obj_callback,
659 agg_lookup_free_class_callback,
660 agg_lookup_free_obj_callback);
661 if (lookup == NULL) {
662 pthread_mutex_unlock(&agg_instance_list_lock);
663 ERROR("aggregation plugin: lookup_create failed.");
664 return -1;
665 }
666 }
667
668 for (int i = 0; i < ci->children_num; i++) {
669 oconfig_item_t *child = ci->children + i;
670
671 if (strcasecmp("Aggregation", child->key) == 0)
672 agg_config_aggregation(child);
673 else
674 WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
675 "<Plugin aggregation /> blocks and will be ignored.",
676 child->key);
677 }
678
679 pthread_mutex_unlock(&agg_instance_list_lock);
680
681 return 0;
682 } /* }}} int agg_config */
683
agg_read(void)684 static int agg_read(void) /* {{{ */
685 {
686 cdtime_t t = cdtime();
687 int success = 0;
688
689 pthread_mutex_lock(&agg_instance_list_lock);
690
691 /* agg_instance_list_head only holds data, after the "write" callback has
692 * been called with a matching value list at least once. So on startup,
693 * there's a race between the aggregations read() and write() callback. If
694 * the read() callback is called first, agg_instance_list_head is NULL and
695 * "success" may be zero. This is expected and should not result in an error.
696 * Therefore we need to handle this case separately. */
697 if (agg_instance_list_head == NULL) {
698 pthread_mutex_unlock(&agg_instance_list_lock);
699 return 0;
700 }
701
702 for (agg_instance_t *this = agg_instance_list_head; this != NULL;
703 this = this->next) {
704 int status = agg_instance_read(this, t);
705 if (status != 0)
706 WARNING("aggregation plugin: Reading an aggregation instance "
707 "failed with status %i.",
708 status);
709 else
710 success++;
711 }
712
713 pthread_mutex_unlock(&agg_instance_list_lock);
714
715 return (success > 0) ? 0 : -1;
716 } /* }}} int agg_read */
717
agg_write(data_set_t const * ds,value_list_t const * vl,user_data_t * user_data)718 static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
719 __attribute__((unused)) user_data_t *user_data) {
720 bool created_by_aggregation = false;
721 /* Ignore values that were created by the aggregation plugin to avoid weird
722 * effects. */
723 (void)meta_data_get_boolean(vl->meta, "aggregation:created",
724 &created_by_aggregation);
725 if (created_by_aggregation)
726 return 0;
727
728 int status;
729
730 if (lookup == NULL)
731 status = ENOENT;
732 else {
733 status = lookup_search(lookup, ds, vl);
734 if (status > 0)
735 status = 0;
736 }
737
738 return status;
739 } /* }}} int agg_write */
740
module_register(void)741 void module_register(void) {
742 plugin_register_complex_config("aggregation", agg_config);
743 plugin_register_read("aggregation", agg_read);
744 plugin_register_write("aggregation", agg_write, /* user_data = */ NULL);
745 }
746