1 /**
2 * collectd - src/statsd.c
3 * Copyright (C) 2013 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at collectd.org>
25 */
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/avltree/avltree.h"
31 #include "utils/common/common.h"
32 #include "utils/latency/latency.h"
33
34 #include <netdb.h>
35 #include <poll.h>
36 #include <sys/types.h>
37
/* AIX doesn't have MSG_DONTWAIT; fall back to MSG_NONBLOCK there. */
#if !defined(MSG_DONTWAIT)
#define MSG_DONTWAIT MSG_NONBLOCK
#endif

/* Node handed to getaddrinfo() when the "Host" option is not configured. */
#if !defined(STATSD_DEFAULT_NODE)
#define STATSD_DEFAULT_NODE NULL
#endif

/* Service handed to getaddrinfo() when the "Port" option is not configured. */
#if !defined(STATSD_DEFAULT_SERVICE)
#define STATSD_DEFAULT_SERVICE "8125"
#endif
50
51 enum metric_type_e { STATSD_COUNTER, STATSD_TIMER, STATSD_GAUGE, STATSD_SET };
52 typedef enum metric_type_e metric_type_t;
53
54 struct statsd_metric_s {
55 metric_type_t type;
56 double value;
57 derive_t counter;
58 latency_counter_t *latency;
59 c_avl_tree_t *set;
60 unsigned long updates_num;
61 };
62 typedef struct statsd_metric_s statsd_metric_t;
63
64 static c_avl_tree_t *metrics_tree;
65 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_t network_thread;
68 static bool network_thread_running;
69 static bool network_thread_shutdown;
70
71 static char *conf_node;
72 static char *conf_service;
73
74 static bool conf_delete_counters;
75 static bool conf_delete_timers;
76 static bool conf_delete_gauges;
77 static bool conf_delete_sets;
78
79 static double *conf_timer_percentile;
80 static size_t conf_timer_percentile_num;
81
82 static bool conf_counter_sum;
83 static bool conf_timer_lower;
84 static bool conf_timer_upper;
85 static bool conf_timer_sum;
86 static bool conf_timer_count;
87
88 /* Must hold metrics_lock when calling this function. */
statsd_metric_lookup_unsafe(char const * name,metric_type_t type)89 static statsd_metric_t *statsd_metric_lookup_unsafe(char const *name, /* {{{ */
90 metric_type_t type) {
91 char key[DATA_MAX_NAME_LEN + 2];
92 char *key_copy;
93 statsd_metric_t *metric;
94 int status;
95
96 switch (type) {
97 case STATSD_COUNTER:
98 key[0] = 'c';
99 break;
100 case STATSD_TIMER:
101 key[0] = 't';
102 break;
103 case STATSD_GAUGE:
104 key[0] = 'g';
105 break;
106 case STATSD_SET:
107 key[0] = 's';
108 break;
109 default:
110 return NULL;
111 }
112
113 key[1] = ':';
114 sstrncpy(&key[2], name, sizeof(key) - 2);
115
116 status = c_avl_get(metrics_tree, key, (void *)&metric);
117 if (status == 0)
118 return metric;
119
120 key_copy = strdup(key);
121 if (key_copy == NULL) {
122 ERROR("statsd plugin: strdup failed.");
123 return NULL;
124 }
125
126 metric = calloc(1, sizeof(*metric));
127 if (metric == NULL) {
128 ERROR("statsd plugin: calloc failed.");
129 sfree(key_copy);
130 return NULL;
131 }
132
133 metric->type = type;
134 metric->latency = NULL;
135 metric->set = NULL;
136
137 status = c_avl_insert(metrics_tree, key_copy, metric);
138 if (status != 0) {
139 ERROR("statsd plugin: c_avl_insert failed.");
140 sfree(key_copy);
141 sfree(metric);
142 return NULL;
143 }
144
145 return metric;
146 } /* }}} statsd_metric_lookup_unsafe */
147
/* Overwrites the value of metric "name" (creating it if needed) and counts
 * the update. Takes metrics_lock itself. Returns 0 on success, -1 if the
 * metric could not be looked up or created. */
static int statsd_metric_set(char const *name, double value, /* {{{ */
                             metric_type_t type) {
  int result = -1;

  pthread_mutex_lock(&metrics_lock);

  statsd_metric_t *m = statsd_metric_lookup_unsafe(name, type);
  if (m != NULL) {
    m->value = value;
    m->updates_num++;
    result = 0;
  }

  pthread_mutex_unlock(&metrics_lock);

  return result;
} /* }}} int statsd_metric_set */
167
/* Adds "delta" to the value of metric "name" (creating it if needed) and
 * counts the update. Takes metrics_lock itself. Returns 0 on success, -1 if
 * the metric could not be looked up or created. */
static int statsd_metric_add(char const *name, double delta, /* {{{ */
                             metric_type_t type) {
  int result = -1;

  pthread_mutex_lock(&metrics_lock);

  statsd_metric_t *m = statsd_metric_lookup_unsafe(name, type);
  if (m != NULL) {
    m->value += delta;
    m->updates_num++;
    result = 0;
  }

  pthread_mutex_unlock(&metrics_lock);

  return result;
} /* }}} int statsd_metric_add */
187
/* Releases a metric together with its latency counter and its set of
 * members. Passing NULL is a no-op. */
static void statsd_metric_free(statsd_metric_t *metric) /* {{{ */
{
  if (metric == NULL)
    return;

  if (metric->latency != NULL) {
    latency_counter_destroy(metric->latency);
    metric->latency = NULL;
  }

  if (metric->set != NULL) {
    void *member;
    void *payload;

    /* Drain the tree; only the keys own memory, values are always NULL
     * (see statsd_handle_set()). */
    while (c_avl_pick(metric->set, &member, &payload) == 0) {
      sfree(member);
      assert(payload == NULL);
    }

    c_avl_destroy(metric->set);
    metric->set = NULL;
  }

  sfree(metric);
} /* }}} void statsd_metric_free */
213
/* Parses "str" as a floating point number with strtod() and stores it in
 * ret_value->gauge. Returns 0 if the whole string was consumed, -1 if nothing
 * could be converted or if characters remain after the number. Note that
 * ret_value->gauge is written even when -1 is returned. */
static int statsd_parse_value(char const *str, value_t *ret_value) /* {{{ */
{
  char *end = NULL;

  ret_value->gauge = (gauge_t)strtod(str, &end);

  /* No conversion was performed at all. */
  if (end == str)
    return -1;

  /* Trailing characters after the number. */
  if ((end != NULL) && (*end != 0))
    return -1;

  return 0;
} /* }}} int statsd_parse_value */
224
statsd_handle_counter(char const * name,char const * value_str,char const * extra)225 static int statsd_handle_counter(char const *name, /* {{{ */
226 char const *value_str, char const *extra) {
227 value_t value;
228 value_t scale;
229 int status;
230
231 if ((extra != NULL) && (extra[0] != '@'))
232 return -1;
233
234 scale.gauge = 1.0;
235 if (extra != NULL) {
236 status = statsd_parse_value(extra + 1, &scale);
237 if (status != 0)
238 return status;
239
240 if (!isfinite(scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
241 return -1;
242 }
243
244 value.gauge = 1.0;
245 status = statsd_parse_value(value_str, &value);
246 if (status != 0)
247 return status;
248
249 /* Changes to the counter are added to (statsd_metric_t*)->value. ->counter is
250 * only updated in statsd_metric_submit_unsafe(). */
251 return statsd_metric_add(name, (double)(value.gauge / scale.gauge),
252 STATSD_COUNTER);
253 } /* }}} int statsd_handle_counter */
254
statsd_handle_gauge(char const * name,char const * value_str)255 static int statsd_handle_gauge(char const *name, /* {{{ */
256 char const *value_str) {
257 value_t value;
258 int status;
259
260 value.gauge = 0;
261 status = statsd_parse_value(value_str, &value);
262 if (status != 0)
263 return status;
264
265 if ((value_str[0] == '+') || (value_str[0] == '-'))
266 return statsd_metric_add(name, (double)value.gauge, STATSD_GAUGE);
267 else
268 return statsd_metric_set(name, (double)value.gauge, STATSD_GAUGE);
269 } /* }}} int statsd_handle_gauge */
270
statsd_handle_timer(char const * name,char const * value_str,char const * extra)271 static int statsd_handle_timer(char const *name, /* {{{ */
272 char const *value_str, char const *extra) {
273 statsd_metric_t *metric;
274 value_t value_ms;
275 value_t scale;
276 cdtime_t value;
277 int status;
278
279 if ((extra != NULL) && (extra[0] != '@'))
280 return -1;
281
282 scale.gauge = 1.0;
283 if (extra != NULL) {
284 status = statsd_parse_value(extra + 1, &scale);
285 if (status != 0)
286 return status;
287
288 if (!isfinite(scale.gauge) || (scale.gauge <= 0.0) || (scale.gauge > 1.0))
289 return -1;
290 }
291
292 value_ms.derive = 0;
293 status = statsd_parse_value(value_str, &value_ms);
294 if (status != 0)
295 return status;
296
297 value = MS_TO_CDTIME_T(value_ms.gauge / scale.gauge);
298
299 pthread_mutex_lock(&metrics_lock);
300
301 metric = statsd_metric_lookup_unsafe(name, STATSD_TIMER);
302 if (metric == NULL) {
303 pthread_mutex_unlock(&metrics_lock);
304 return -1;
305 }
306
307 if (metric->latency == NULL)
308 metric->latency = latency_counter_create();
309 if (metric->latency == NULL) {
310 pthread_mutex_unlock(&metrics_lock);
311 return -1;
312 }
313
314 latency_counter_add(metric->latency, value);
315 metric->updates_num++;
316
317 pthread_mutex_unlock(&metrics_lock);
318 return 0;
319 } /* }}} int statsd_handle_timer */
320
statsd_handle_set(char const * name,char const * set_key_orig)321 static int statsd_handle_set(char const *name, /* {{{ */
322 char const *set_key_orig) {
323 statsd_metric_t *metric = NULL;
324 char *set_key;
325 int status;
326
327 pthread_mutex_lock(&metrics_lock);
328
329 metric = statsd_metric_lookup_unsafe(name, STATSD_SET);
330 if (metric == NULL) {
331 pthread_mutex_unlock(&metrics_lock);
332 return -1;
333 }
334
335 /* Make sure metric->set exists. */
336 if (metric->set == NULL)
337 metric->set = c_avl_create((int (*)(const void *, const void *))strcmp);
338
339 if (metric->set == NULL) {
340 pthread_mutex_unlock(&metrics_lock);
341 ERROR("statsd plugin: c_avl_create failed.");
342 return -1;
343 }
344
345 set_key = strdup(set_key_orig);
346 if (set_key == NULL) {
347 pthread_mutex_unlock(&metrics_lock);
348 ERROR("statsd plugin: strdup failed.");
349 return -1;
350 }
351
352 status = c_avl_insert(metric->set, set_key, /* value = */ NULL);
353 if (status < 0) {
354 pthread_mutex_unlock(&metrics_lock);
355 ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
356 set_key, status);
357 sfree(set_key);
358 return -1;
359 } else if (status > 0) /* key already exists */
360 {
361 sfree(set_key);
362 }
363
364 metric->updates_num++;
365
366 pthread_mutex_unlock(&metrics_lock);
367 return 0;
368 } /* }}} int statsd_handle_set */
369
/* Parses one line of the form "<name>:<value>|<type>[|<extra>]" and passes
 * it to the handler for its type ("c", "ms", "g" or "s"). The buffer is
 * modified in place: the separators are overwritten with NUL bytes.
 * Returns 0 on success and non-zero for malformed or unsupported lines. */
static int statsd_parse_line(char *buffer) /* {{{ */
{
  char *name = buffer;

  /* Split off everything after the first '|'. */
  char *type = strchr(name, '|');
  if (type == NULL)
    return -1;
  *type++ = 0;

  /* The value follows the *last* ':' of what is left, so the name itself may
   * contain colons. */
  char *value = strrchr(name, ':');
  if (value == NULL)
    return -1;
  *value++ = 0;

  /* An optional second '|' introduces the extra field. */
  char *extra = strchr(type, '|');
  if (extra != NULL)
    *extra++ = 0;

  if (strcmp("c", type) == 0)
    return statsd_handle_counter(name, value, extra);
  if (strcmp("ms", type) == 0)
    return statsd_handle_timer(name, value, extra);

  /* extra is only valid for counters and timers */
  if (extra != NULL)
    return -1;

  if (strcmp("g", type) == 0)
    return statsd_handle_gauge(name, value);
  if (strcmp("s", type) == 0)
    return statsd_handle_set(name, value);

  return -1;
} /* }}} int statsd_parse_line */
411
/* Splits a NUL-terminated buffer at newline characters and parses every
 * non-empty line. The buffer is modified in place. Lines that fail to parse
 * are logged (shortened to fit a 64 byte copy) and otherwise skipped. */
static void statsd_parse_buffer(char *buffer) /* {{{ */
{
  char *next;

  for (char *line = buffer; line != NULL; line = next) {
    next = strchr(line, '\n');
    if (next != NULL) {
      *next = 0;
      next++;
    }

    /* Skip empty lines. */
    if (*line == 0)
      continue;

    /* statsd_parse_line() cuts the line apart, so keep a copy for logging. */
    char orig[64];
    sstrncpy(orig, line, sizeof(orig));

    if (statsd_parse_line(line) != 0)
      ERROR("statsd plugin: Unable to parse line: \"%s\"", orig);
  }
} /* }}} void statsd_parse_buffer */
439
statsd_network_read(int fd)440 static void statsd_network_read(int fd) /* {{{ */
441 {
442 char buffer[4096];
443 size_t buffer_size;
444 ssize_t status;
445
446 status = recv(fd, buffer, sizeof(buffer), /* flags = */ MSG_DONTWAIT);
447 if (status < 0) {
448
449 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
450 return;
451
452 ERROR("statsd plugin: recv(2) failed: %s", STRERRNO);
453 return;
454 }
455
456 buffer_size = (size_t)status;
457 if (buffer_size >= sizeof(buffer))
458 buffer_size = sizeof(buffer) - 1;
459 buffer[buffer_size] = 0;
460
461 statsd_parse_buffer(buffer);
462 } /* }}} void statsd_network_read */
463
/* Creates and binds one datagram socket for every address getaddrinfo()
 * returns for the configured (or default) node and service.
 *
 * ret_fds     - receives a heap-allocated array of pollfd entries, one per
 *               bound socket, set up for POLLIN | POLLPRI. The caller owns
 *               the array and the descriptors.
 * ret_fds_num - receives the number of entries in *ret_fds.
 *
 * Addresses for which socket(), setsockopt(), bind() or realloc() fail are
 * logged and skipped. Returns 0 if at least one socket was bound, the
 * getaddrinfo() error code if name resolution failed, and ENOENT if no
 * socket could be bound. The output parameters are only written on success. */
static int statsd_network_init(struct pollfd **ret_fds, /* {{{ */
                               size_t *ret_fds_num) {
  struct pollfd *fds = NULL;
  size_t fds_num = 0;

  struct addrinfo *ai_list;
  int status;

  char const *node = (conf_node != NULL) ? conf_node : STATSD_DEFAULT_NODE;
  char const *service =
      (conf_service != NULL) ? conf_service : STATSD_DEFAULT_SERVICE;
  /* node may be NULL (that is the default); never pass NULL to a "%s"
   * conversion. */
  char const *node_str = (node != NULL) ? node : "::";

  struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
                              .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
                              .ai_socktype = SOCK_DGRAM};

  status = getaddrinfo(node, service, &ai_hints, &ai_list);
  if (status != 0) {
    ERROR("statsd plugin: getaddrinfo (\"%s\", \"%s\") failed: %s", node_str,
          service, gai_strerror(status));
    return status;
  }

  for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
       ai_ptr = ai_ptr->ai_next) {
    int fd;
    struct pollfd *tmp;

    char str_node[NI_MAXHOST];
    char str_service[NI_MAXSERV];

    fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (fd < 0) {
      ERROR("statsd plugin: socket(2) failed: %s", STRERRNO);
      continue;
    }

    /* allow multiple sockets to use the same PORT number */
    int yes = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
      ERROR("statsd plugin: setsockopt (reuseaddr): %s", STRERRNO);
      close(fd);
      continue;
    }

    /* The textual address is only used for log messages. If it cannot be
     * produced, the buffers must still hold valid strings. */
    if (getnameinfo(ai_ptr->ai_addr, ai_ptr->ai_addrlen, str_node,
                    sizeof(str_node), str_service, sizeof(str_service),
                    NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
      sstrncpy(str_node, "<unknown>", sizeof(str_node));
      sstrncpy(str_service, "<unknown>", sizeof(str_service));
    }
    DEBUG("statsd plugin: Trying to bind to [%s]:%s ...", str_node,
          str_service);

    status = bind(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0) {
      ERROR("statsd plugin: bind(2) to [%s]:%s failed: %s", str_node,
            str_service, STRERRNO);
      close(fd);
      continue;
    }

    /* Grow the array by one entry; on failure the old array stays valid. */
    tmp = realloc(fds, sizeof(*fds) * (fds_num + 1));
    if (tmp == NULL) {
      ERROR("statsd plugin: realloc failed.");
      close(fd);
      continue;
    }
    fds = tmp;
    tmp = fds + fds_num;
    fds_num++;

    memset(tmp, 0, sizeof(*tmp));
    tmp->fd = fd;
    tmp->events = POLLIN | POLLPRI;
    INFO("statsd plugin: Listening on [%s]:%s.", str_node, str_service);
  }

  freeaddrinfo(ai_list);

  if (fds_num == 0) {
    ERROR("statsd plugin: Unable to create listening socket for [%s]:%s.",
          node_str, service);
    return ENOENT;
  }

  *ret_fds = fds;
  *ret_fds_num = fds_num;
  return 0;
} /* }}} int statsd_network_init */
551
statsd_network_thread(void * args)552 static void *statsd_network_thread(void *args) /* {{{ */
553 {
554 struct pollfd *fds = NULL;
555 size_t fds_num = 0;
556 int status;
557
558 status = statsd_network_init(&fds, &fds_num);
559 if (status != 0) {
560 ERROR("statsd plugin: Unable to open listening sockets.");
561 pthread_exit((void *)0);
562 }
563
564 while (!network_thread_shutdown) {
565 status = poll(fds, (nfds_t)fds_num, /* timeout = */ -1);
566 if (status < 0) {
567
568 if ((errno == EINTR) || (errno == EAGAIN))
569 continue;
570
571 ERROR("statsd plugin: poll(2) failed: %s", STRERRNO);
572 break;
573 }
574
575 for (size_t i = 0; i < fds_num; i++) {
576 if ((fds[i].revents & (POLLIN | POLLPRI)) == 0)
577 continue;
578
579 statsd_network_read(fds[i].fd);
580 fds[i].revents = 0;
581 }
582 } /* while (!network_thread_shutdown) */
583
584 /* Clean up */
585 for (size_t i = 0; i < fds_num; i++)
586 close(fds[i].fd);
587 sfree(fds);
588
589 return (void *)0;
590 } /* }}} void *statsd_network_thread */
591
/* Handles one "TimerPercentile" option: reads a number from "ci" and appends
 * it to conf_timer_percentile. Returns 0 on success, the status of
 * cf_util_get_double() if the value cannot be read, ERANGE if it is <= 0 or
 * >= 100, and ENOMEM if the list cannot be grown. */
static int statsd_config_timer_percentile(oconfig_item_t *ci) /* {{{ */
{
  double percent = NAN;

  int status = cf_util_get_double(ci, &percent);
  if (status != 0)
    return status;

  if ((percent <= 0.0) || (percent >= 100)) {
    ERROR("statsd plugin: The value for \"%s\" must be between 0 and 100, "
          "exclusively.",
          ci->key);
    return ERANGE;
  }

  /* Grow the list by one slot; the old list stays valid if this fails. */
  size_t new_num = conf_timer_percentile_num + 1;
  double *grown =
      realloc(conf_timer_percentile, sizeof(*conf_timer_percentile) * new_num);
  if (grown == NULL) {
    ERROR("statsd plugin: realloc failed.");
    return ENOMEM;
  }

  grown[conf_timer_percentile_num] = percent;
  conf_timer_percentile = grown;
  conf_timer_percentile_num = new_num;

  return 0;
} /* }}} int statsd_config_timer_percentile */
621
/* Configuration callback: applies every child option of "ci". Option names
 * are matched case-insensitively. Unknown options are logged; the result of
 * the individual value parsers is not checked. Always returns 0. */
static int statsd_config(oconfig_item_t *ci) /* {{{ */
{
  /* Options that are plain booleans, with the flag each of them sets. */
  static const struct {
    char const *key;
    bool *flag;
  } bool_options[] = {
      {"DeleteCounters", &conf_delete_counters},
      {"DeleteTimers", &conf_delete_timers},
      {"DeleteGauges", &conf_delete_gauges},
      {"DeleteSets", &conf_delete_sets},
      {"CounterSum", &conf_counter_sum},
      {"TimerLower", &conf_timer_lower},
      {"TimerUpper", &conf_timer_upper},
      {"TimerSum", &conf_timer_sum},
      {"TimerCount", &conf_timer_count},
  };
  size_t const bool_options_num = sizeof(bool_options) / sizeof(bool_options[0]);

  for (int i = 0; i < ci->children_num; i++) {
    oconfig_item_t *child = ci->children + i;

    if (strcasecmp("Host", child->key) == 0) {
      cf_util_get_string(child, &conf_node);
      continue;
    }
    if (strcasecmp("Port", child->key) == 0) {
      cf_util_get_service(child, &conf_service);
      continue;
    }
    if (strcasecmp("TimerPercentile", child->key) == 0) {
      statsd_config_timer_percentile(child);
      continue;
    }

    size_t j;
    for (j = 0; j < bool_options_num; j++) {
      if (strcasecmp(bool_options[j].key, child->key) == 0) {
        cf_util_get_boolean(child, bool_options[j].flag);
        break;
      }
    }

    /* The loop ran to completion: no known option matched. */
    if (j == bool_options_num)
      ERROR("statsd plugin: The \"%s\" config option is not valid.",
            child->key);
  }

  return 0;
} /* }}} int statsd_config */
658
statsd_init(void)659 static int statsd_init(void) /* {{{ */
660 {
661 pthread_mutex_lock(&metrics_lock);
662 if (metrics_tree == NULL)
663 metrics_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
664
665 if (!network_thread_running) {
666 int status;
667
668 status = pthread_create(&network_thread,
669 /* attr = */ NULL, statsd_network_thread,
670 /* args = */ NULL);
671 if (status != 0) {
672 pthread_mutex_unlock(&metrics_lock);
673 ERROR("statsd plugin: pthread_create failed: %s", STRERRNO);
674 return status;
675 }
676 }
677 network_thread_running = true;
678
679 pthread_mutex_unlock(&metrics_lock);
680
681 return 0;
682 } /* }}} int statsd_init */
683
684 /* Must hold metrics_lock when calling this function. */
statsd_metric_clear_set_unsafe(statsd_metric_t * metric)685 static int statsd_metric_clear_set_unsafe(statsd_metric_t *metric) /* {{{ */
686 {
687 void *key;
688 void *value;
689
690 if ((metric == NULL) || (metric->type != STATSD_SET))
691 return EINVAL;
692
693 if (metric->set == NULL)
694 return 0;
695
696 while (c_avl_pick(metric->set, &key, &value) == 0) {
697 sfree(key);
698 sfree(value);
699 }
700
701 return 0;
702 } /* }}} int statsd_metric_clear_set_unsafe */
703
704 /* Must hold metrics_lock when calling this function. */
statsd_metric_submit_unsafe(char const * name,statsd_metric_t * metric)705 static int statsd_metric_submit_unsafe(char const *name,
706 statsd_metric_t *metric) /* {{{ */
707 {
708 value_list_t vl = VALUE_LIST_INIT;
709
710 vl.values = &(value_t){.gauge = NAN};
711 vl.values_len = 1;
712 sstrncpy(vl.plugin, "statsd", sizeof(vl.plugin));
713
714 if (metric->type == STATSD_GAUGE)
715 sstrncpy(vl.type, "gauge", sizeof(vl.type));
716 else if (metric->type == STATSD_TIMER)
717 sstrncpy(vl.type, "latency", sizeof(vl.type));
718 else if (metric->type == STATSD_SET)
719 sstrncpy(vl.type, "objects", sizeof(vl.type));
720 else /* if (metric->type == STATSD_COUNTER) */
721 sstrncpy(vl.type, "derive", sizeof(vl.type));
722
723 sstrncpy(vl.type_instance, name, sizeof(vl.type_instance));
724
725 if (metric->type == STATSD_GAUGE)
726 vl.values[0].gauge = (gauge_t)metric->value;
727 else if (metric->type == STATSD_TIMER) {
728 bool have_events = (metric->updates_num > 0);
729
730 /* Make sure all timer metrics share the *same* timestamp. */
731 vl.time = cdtime();
732
733 snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-average", name);
734 vl.values[0].gauge =
735 have_events
736 ? CDTIME_T_TO_DOUBLE(latency_counter_get_average(metric->latency))
737 : NAN;
738 plugin_dispatch_values(&vl);
739
740 if (conf_timer_lower) {
741 snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-lower", name);
742 vl.values[0].gauge =
743 have_events
744 ? CDTIME_T_TO_DOUBLE(latency_counter_get_min(metric->latency))
745 : NAN;
746 plugin_dispatch_values(&vl);
747 }
748
749 if (conf_timer_upper) {
750 snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-upper", name);
751 vl.values[0].gauge =
752 have_events
753 ? CDTIME_T_TO_DOUBLE(latency_counter_get_max(metric->latency))
754 : NAN;
755 plugin_dispatch_values(&vl);
756 }
757
758 if (conf_timer_sum) {
759 snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-sum", name);
760 vl.values[0].gauge =
761 have_events
762 ? CDTIME_T_TO_DOUBLE(latency_counter_get_sum(metric->latency))
763 : NAN;
764 plugin_dispatch_values(&vl);
765 }
766
767 for (size_t i = 0; i < conf_timer_percentile_num; i++) {
768 snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-percentile-%.0f",
769 name, conf_timer_percentile[i]);
770 vl.values[0].gauge =
771 have_events ? CDTIME_T_TO_DOUBLE(latency_counter_get_percentile(
772 metric->latency, conf_timer_percentile[i]))
773 : NAN;
774 plugin_dispatch_values(&vl);
775 }
776
777 /* Keep this at the end, since vl.type is set to "gauge" here. The
778 * vl.type's above are implicitly set to "latency". */
779 if (conf_timer_count) {
780 sstrncpy(vl.type, "gauge", sizeof(vl.type));
781 snprintf(vl.type_instance, sizeof(vl.type_instance), "%s-count", name);
782 vl.values[0].gauge = latency_counter_get_num(metric->latency);
783 plugin_dispatch_values(&vl);
784 }
785
786 latency_counter_reset(metric->latency);
787 return 0;
788 } else if (metric->type == STATSD_SET) {
789 if (metric->set == NULL)
790 vl.values[0].gauge = 0.0;
791 else
792 vl.values[0].gauge = (gauge_t)c_avl_size(metric->set);
793 } else { /* STATSD_COUNTER */
794 gauge_t delta = nearbyint(metric->value);
795
796 /* Etsy's statsd writes counters as two metrics: a rate and the change since
797 * the last write. Since collectd does not reset its DERIVE metrics to zero,
798 * this makes little sense, but we're dispatching a "count" metric here
799 * anyway - if requested by the user - for compatibility reasons. */
800 if (conf_counter_sum) {
801 sstrncpy(vl.type, "count", sizeof(vl.type));
802 vl.values[0].gauge = delta;
803 plugin_dispatch_values(&vl);
804
805 /* restore vl.type */
806 sstrncpy(vl.type, "derive", sizeof(vl.type));
807 }
808
809 /* Rather than resetting value to zero, subtract delta so we correctly keep
810 * track of residuals. */
811 metric->value -= delta;
812 metric->counter += (derive_t)delta;
813
814 vl.values[0].derive = metric->counter;
815 }
816
817 return plugin_dispatch_values(&vl);
818 } /* }}} int statsd_metric_submit_unsafe */
819
statsd_read(void)820 static int statsd_read(void) /* {{{ */
821 {
822 c_avl_iterator_t *iter;
823 char *name;
824 statsd_metric_t *metric;
825
826 char **to_be_deleted = NULL;
827 size_t to_be_deleted_num = 0;
828
829 pthread_mutex_lock(&metrics_lock);
830
831 if (metrics_tree == NULL) {
832 pthread_mutex_unlock(&metrics_lock);
833 return 0;
834 }
835
836 iter = c_avl_get_iterator(metrics_tree);
837 while (c_avl_iterator_next(iter, (void *)&name, (void *)&metric) == 0) {
838 if ((metric->updates_num == 0) &&
839 ((conf_delete_counters && (metric->type == STATSD_COUNTER)) ||
840 (conf_delete_timers && (metric->type == STATSD_TIMER)) ||
841 (conf_delete_gauges && (metric->type == STATSD_GAUGE)) ||
842 (conf_delete_sets && (metric->type == STATSD_SET)))) {
843 DEBUG("statsd plugin: Deleting metric \"%s\".", name);
844 strarray_add(&to_be_deleted, &to_be_deleted_num, name);
845 continue;
846 }
847
848 /* Names have a prefix, e.g. "c:", which determines the (statsd) type.
849 * Remove this here. */
850 statsd_metric_submit_unsafe(name + 2, metric);
851
852 /* Reset the metric. */
853 metric->updates_num = 0;
854 if (metric->type == STATSD_SET)
855 statsd_metric_clear_set_unsafe(metric);
856 }
857 c_avl_iterator_destroy(iter);
858
859 for (size_t i = 0; i < to_be_deleted_num; i++) {
860 int status;
861
862 status = c_avl_remove(metrics_tree, to_be_deleted[i], (void *)&name,
863 (void *)&metric);
864 if (status != 0) {
865 ERROR("stats plugin: c_avl_remove (\"%s\") failed with status %i.",
866 to_be_deleted[i], status);
867 continue;
868 }
869
870 sfree(name);
871 statsd_metric_free(metric);
872 }
873
874 pthread_mutex_unlock(&metrics_lock);
875
876 strarray_free(to_be_deleted, to_be_deleted_num);
877
878 return 0;
879 } /* }}} int statsd_read */
880
statsd_shutdown(void)881 static int statsd_shutdown(void) /* {{{ */
882 {
883 void *key;
884 void *value;
885
886 if (network_thread_running) {
887 network_thread_shutdown = true;
888 pthread_kill(network_thread, SIGTERM);
889 pthread_join(network_thread, /* retval = */ NULL);
890 }
891 network_thread_running = false;
892
893 pthread_mutex_lock(&metrics_lock);
894
895 while (c_avl_pick(metrics_tree, &key, &value) == 0) {
896 sfree(key);
897 statsd_metric_free(value);
898 }
899 c_avl_destroy(metrics_tree);
900 metrics_tree = NULL;
901
902 sfree(conf_node);
903 sfree(conf_service);
904
905 pthread_mutex_unlock(&metrics_lock);
906
907 return 0;
908 } /* }}} int statsd_shutdown */
909
module_register(void)910 void module_register(void) {
911 plugin_register_complex_config("statsd", statsd_config);
912 plugin_register_init("statsd", statsd_init);
913 plugin_register_read("statsd", statsd_read);
914 plugin_register_shutdown("statsd", statsd_shutdown);
915 }
916