1 /**
2 * collectd - src/write_sensu.c
3 * Copyright (C) 2015 Fabrice A. Marie
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Fabrice A. Marie <fabrice at kibinlabs.com>
25 */
26
27 #define _GNU_SOURCE
28
#include "collectd.h"

#include "plugin.h"
#include "utils/common/common.h"
#include "utils_cache.h"

#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <netdb.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdlib.h>
41 #define SENSU_HOST "localhost"
42 #define SENSU_PORT "3030"
43
44 #ifdef HAVE_ASPRINTF
45 #define my_asprintf asprintf
46 #define my_vasprintf vasprintf
47 #else
/*
 * asprintf() is only available on Solaris starting with Solaris 10 update 11.
 * For older versions, fall back to the portable asprintf() implementation from
 * https://github.com/littlstar/asprintf.c/blob/master/
 * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
 */
54
/*
 * Portable vasprintf() replacement.
 *
 * Formats `fmt` with `args` into a freshly malloc()ed, NUL-terminated buffer
 * and stores it in `*str`. The caller owns the buffer and must free() it.
 *
 * Returns the number of characters written (not counting the terminating
 * NUL), or -1 if formatting or allocation failed. On failure `*str` is NULL.
 */
static int my_vasprintf(char **str, const char *fmt, va_list args) {
  va_list probe;

  *str = NULL;

  /* First pass, on a copy of the argument list: measure the output. */
  va_copy(probe, args);
  int size = vsnprintf(NULL, 0, fmt, probe);
  va_end(probe);
  if (size < 0)
    return -1;

  /* One extra byte for the terminating NUL. */
  char *buf = malloc((size_t)size + 1);
  if (buf == NULL)
    return -1;

  /* Second pass: bounded write, so the buffer can never be overrun even if
   * the two passes were to disagree about the length. */
  int written = vsnprintf(buf, (size_t)size + 1, fmt, args);
  if ((written < 0) || (written > size)) {
    free(buf);
    return -1;
  }

  *str = buf;
  return written;
}

/*
 * Portable asprintf() replacement; variadic front end of my_vasprintf().
 * Same return value and ownership rules as my_vasprintf().
 */
static int my_asprintf(char **str, const char *fmt, ...) {
  va_list args;

  va_start(args, fmt);
  int size = my_vasprintf(str, fmt, args);
  va_end(args);

  return size;
}
94
95 #endif
96
/* Growable array of heap-allocated strings (see add_str_to_list() and
 * free_str_list()). */
struct str_list {
  int nb_strs; /* number of valid entries in `strs` */
  char **strs; /* the entries; NULL while the list is empty */
};
101
102 struct sensu_host {
103 char *name;
104 char *event_service_prefix;
105 struct str_list metric_handlers;
106 struct str_list notification_handlers;
107 #define F_READY 0x01
108 uint8_t flags;
109 pthread_mutex_t lock;
110 bool notifications;
111 bool metrics;
112 bool store_rates;
113 bool always_append_ds;
114 bool include_source;
115 char *separator;
116 char *node;
117 char *service;
118 int s;
119 struct addrinfo *res;
120 int reference_count;
121 };
122
/* Extra JSON text appended verbatim to every message when non-empty. */
static char *sensu_tags = NULL;
/* Flat list of attribute strings: even indexes hold keys, odd indexes hold
 * the corresponding values. */
static char **sensu_attrs = NULL;
/* Number of strings stored in sensu_attrs. */
static size_t sensu_attrs_num = 0;
126
add_str_to_list(struct str_list * strs,const char * str_to_add)127 static int add_str_to_list(struct str_list *strs,
128 const char *str_to_add) /* {{{ */
129 {
130 char **old_strs_ptr = strs->strs;
131 char *newstr = strdup(str_to_add);
132 if (newstr == NULL) {
133 ERROR("write_sensu plugin: Unable to alloc memory");
134 return -1;
135 }
136 strs->strs = realloc(strs->strs, strs->nb_strs + 1);
137 if (strs->strs == NULL) {
138 strs->strs = old_strs_ptr;
139 free(newstr);
140 ERROR("write_sensu plugin: Unable to alloc memory");
141 return -1;
142 }
143 strs->strs[strs->nb_strs] = newstr;
144 strs->nb_strs++;
145 return 0;
146 }
147 /* }}} int add_str_to_list */
148
free_str_list(struct str_list * strs)149 static void free_str_list(struct str_list *strs) /* {{{ */
150 {
151 for (int i = 0; i < strs->nb_strs; i++)
152 free(strs->strs[i]);
153 free(strs->strs);
154 }
155 /* }}} void free_str_list */
156
/*
 * Opens a TCP connection to the Sensu client described by `host`.
 *
 * The target address is resolved on first use and cached in host->res; the
 * F_READY flag records that the cached result can be reused.
 *
 * Returns 0 with host->s set to the connected socket, or -1 (host->s == -1)
 * if resolving or connecting failed.
 */
static int sensu_connect(struct sensu_host *host) /* {{{ */
{
  // Resolve the target unless a previous call already did so.
  if ((host->flags & F_READY) == 0) {
    char const *node = (host->node != NULL) ? host->node : SENSU_HOST;
    char const *service = (host->service != NULL) ? host->service : SENSU_PORT;
    struct addrinfo ai_hints = {.ai_family = AF_INET,
                                .ai_flags = AI_ADDRCONFIG,
                                .ai_socktype = SOCK_STREAM};

    host->res = NULL;
    int e = getaddrinfo(node, service, &ai_hints, &(host->res));
    if (e != 0) {
      ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s", node,
            gai_strerror(e));
      return -1;
    }
    DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s", node,
          service);
    host->flags |= F_READY;
  }

  // Try each resolved address in turn until one accepts the connection.
  host->s = -1;
  for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
    int fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
    if (fd == -1)
      continue;

    // Keep close() from lingering for long: at most 3 seconds.
    struct linger so_linger = {.l_onoff = 1, .l_linger = 3};
    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof so_linger) !=
        0)
      WARNING("write_sensu plugin: failed to set socket close() lingering");

    set_sock_opts(fd);

    if (connect(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
      close(fd);
      continue;
    }

    host->s = fd;
    DEBUG("write_sensu plugin: connected");
    break;
  }

  if (host->s < 0) {
    WARNING("write_sensu plugin: Unable to connect to sensu client");
    return -1;
  }
  return 0;
} /* }}} int sensu_connect */
219
sensu_close_socket(struct sensu_host * host)220 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
221 {
222 if (host->s != -1)
223 close(host->s);
224 host->s = -1;
225
226 } /* }}} void sensu_close_socket */
227
build_json_str_list(const char * tag,struct str_list const * list)228 static char *build_json_str_list(const char *tag,
229 struct str_list const *list) /* {{{ */
230 {
231 int res;
232 char *ret_str = NULL;
233 char *temp_str;
234 if (list->nb_strs == 0) {
235 ret_str = malloc(1);
236 if (ret_str == NULL) {
237 ERROR("write_sensu plugin: Unable to alloc memory");
238 return NULL;
239 }
240 ret_str[0] = '\0';
241 }
242
243 res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
244 if (res == -1) {
245 ERROR("write_sensu plugin: Unable to alloc memory");
246 free(ret_str);
247 return NULL;
248 }
249 for (int i = 1; i < list->nb_strs; i++) {
250 res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
251 free(temp_str);
252 if (res == -1) {
253 ERROR("write_sensu plugin: Unable to alloc memory");
254 return NULL;
255 }
256 temp_str = ret_str;
257 }
258 res = my_asprintf(&ret_str, "%s]", temp_str);
259 free(temp_str);
260 if (res == -1) {
261 ERROR("write_sensu plugin: Unable to alloc memory");
262 return NULL;
263 }
264
265 return ret_str;
266 } /* }}} char *build_json_str_list*/
267
/*
 * Builds "<hostname><sep><plugin>[-<plugin_instance>]<sep><type>
 * [-<type_instance>]" in `ret`, a buffer of `ret_len` bytes.
 *
 * NULL or empty instance strings are left out together with their dash.
 *
 * Returns 0 on success. Returns ENOBUFS if the name does not fit; `ret` then
 * holds the NUL-terminated parts that did fit, so a caller that does not
 * check the result never reads an unterminated buffer. If `ret` is NULL or
 * `ret_len` is not positive, ENOBUFS is returned without touching `ret`.
 */
static int sensu_format_name2(char *ret, int ret_len, const char *hostname,
                              const char *plugin, const char *plugin_instance,
                              const char *type, const char *type_instance,
                              const char *separator) {
  assert(hostname != NULL);
  assert(plugin != NULL);
  assert(type != NULL);
  assert(separator != NULL);

  /* A non-positive length must not be turned into a huge size_t. */
  if ((ret == NULL) || (ret_len <= 0))
    return ENOBUFS;

  char *buffer = ret;
  size_t buffer_size = (size_t)ret_len;
  buffer[0] = '\0';

/* Copies `str` to the write position. At least one byte always stays free
 * for the terminator, which is what makes the failure path safe. */
#define APPEND(str)                                                            \
  do {                                                                         \
    size_t l = strlen(str);                                                    \
    if (l >= buffer_size) {                                                    \
      buffer[0] = '\0';                                                        \
      return ENOBUFS;                                                          \
    }                                                                          \
    memcpy(buffer, (str), l);                                                  \
    buffer += l;                                                               \
    buffer_size -= l;                                                          \
  } while (0)

  APPEND(hostname);
  APPEND(separator);
  APPEND(plugin);
  if ((plugin_instance != NULL) && (plugin_instance[0] != 0)) {
    APPEND("-");
    APPEND(plugin_instance);
  }
  APPEND(separator);
  APPEND(type);
  if ((type_instance != NULL) && (type_instance[0] != 0)) {
    APPEND("-");
    APPEND(type_instance);
  }
  assert(buffer_size > 0);
  buffer[0] = '\0';

#undef APPEND
  return 0;
} /* int sensu_format_name2 */
310
/*
 * Overwrites every character of `orig_name` that must not appear in a name
 * with an underscore. Some plugins, ipmi for example, generate such
 * characters in metric names.
 */
static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
{
  /* The characters to replace: parentheses, space, both quotes and plus. */
  static const char reserved[] = "() \"'+";

  for (char *c = orig_name; *c != '\0'; c++) {
    if (strchr(reserved, *c) != NULL)
      *c = '_';
  }
} /* }}} void in_place_replace_sensu_name_reserved */
338
sensu_value_to_json(struct sensu_host const * host,data_set_t const * ds,value_list_t const * vl,size_t index,gauge_t const * rates)339 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
340 data_set_t const *ds, value_list_t const *vl,
341 size_t index, gauge_t const *rates) {
342 char name_buffer[5 * DATA_MAX_NAME_LEN];
343 char service_buffer[6 * DATA_MAX_NAME_LEN];
344 char *ret_str;
345 char *temp_str;
346 char *value_str;
347 int res;
348 // First part of the JSON string
349 const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
350
351 char *handlers_str =
352 build_json_str_list("handlers", &(host->metric_handlers));
353 if (handlers_str == NULL) {
354 ERROR("write_sensu plugin: Unable to alloc memory");
355 return NULL;
356 }
357
358 // incorporate the handlers
359 if (strlen(handlers_str) == 0) {
360 free(handlers_str);
361 ret_str = strdup(part1);
362 if (ret_str == NULL) {
363 ERROR("write_sensu plugin: Unable to alloc memory");
364 return NULL;
365 }
366 } else {
367 res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
368 free(handlers_str);
369 if (res == -1) {
370 ERROR("write_sensu plugin: Unable to alloc memory");
371 return NULL;
372 }
373 }
374
375 if (host->include_source) {
376 res = my_asprintf(&temp_str, "%s, \"source\": \"%s\"", ret_str, vl->host);
377 free(ret_str);
378 if (res == -1) {
379 ERROR("write_sensu plugin: Unable to alloc memory");
380 return NULL;
381 }
382 ret_str = temp_str;
383 }
384
385 // incorporate the plugin name information
386 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
387 vl->plugin);
388 free(ret_str);
389 if (res == -1) {
390 ERROR("write_sensu plugin: Unable to alloc memory");
391 return NULL;
392 }
393 ret_str = temp_str;
394
395 // incorporate the plugin type
396 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str,
397 vl->type);
398 free(ret_str);
399 if (res == -1) {
400 ERROR("write_sensu plugin: Unable to alloc memory");
401 return NULL;
402 }
403 ret_str = temp_str;
404
405 // incorporate the plugin instance if any
406 if (vl->plugin_instance[0] != 0) {
407 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
408 ret_str, vl->plugin_instance);
409 free(ret_str);
410 if (res == -1) {
411 ERROR("write_sensu plugin: Unable to alloc memory");
412 return NULL;
413 }
414 ret_str = temp_str;
415 }
416
417 // incorporate the plugin type instance if any
418 if (vl->type_instance[0] != 0) {
419 res =
420 my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
421 ret_str, vl->type_instance);
422 free(ret_str);
423 if (res == -1) {
424 ERROR("write_sensu plugin: Unable to alloc memory");
425 return NULL;
426 }
427 ret_str = temp_str;
428 }
429
430 // incorporate the data source type
431 if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
432 char ds_type[DATA_MAX_NAME_LEN];
433 snprintf(ds_type, sizeof(ds_type), "%s:rate",
434 DS_TYPE_TO_STRING(ds->ds[index].type));
435 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
436 ret_str, ds_type);
437 free(ret_str);
438 if (res == -1) {
439 ERROR("write_sensu plugin: Unable to alloc memory");
440 return NULL;
441 }
442 ret_str = temp_str;
443 } else {
444 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
445 ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
446 free(ret_str);
447 if (res == -1) {
448 ERROR("write_sensu plugin: Unable to alloc memory");
449 return NULL;
450 }
451 ret_str = temp_str;
452 }
453
454 // incorporate the data source name
455 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"",
456 ret_str, ds->ds[index].name);
457 free(ret_str);
458 if (res == -1) {
459 ERROR("write_sensu plugin: Unable to alloc memory");
460 return NULL;
461 }
462 ret_str = temp_str;
463
464 // incorporate the data source index
465 {
466 char ds_index[DATA_MAX_NAME_LEN];
467 snprintf(ds_index, sizeof(ds_index), "%" PRIsz, index);
468 res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s",
469 ret_str, ds_index);
470 free(ret_str);
471 if (res == -1) {
472 ERROR("write_sensu plugin: Unable to alloc memory");
473 return NULL;
474 }
475 ret_str = temp_str;
476 }
477
478 // add key value attributes from config if any
479 for (size_t i = 0; i < sensu_attrs_num; i += 2) {
480 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
481 sensu_attrs[i + 1]);
482 free(ret_str);
483 if (res == -1) {
484 ERROR("write_sensu plugin: Unable to alloc memory");
485 return NULL;
486 }
487 ret_str = temp_str;
488 }
489
490 // incorporate sensu tags from config if any
491 if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
492 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
493 free(ret_str);
494 if (res == -1) {
495 ERROR("write_sensu plugin: Unable to alloc memory");
496 return NULL;
497 }
498 ret_str = temp_str;
499 }
500
501 // calculate the value and set to a string
502 if (ds->ds[index].type == DS_TYPE_GAUGE) {
503 res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
504 if (res == -1) {
505 free(ret_str);
506 ERROR("write_sensu plugin: Unable to alloc memory");
507 return NULL;
508 }
509 } else if (rates != NULL) {
510 res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
511 if (res == -1) {
512 free(ret_str);
513 ERROR("write_sensu plugin: Unable to alloc memory");
514 return NULL;
515 }
516 } else {
517 if (ds->ds[index].type == DS_TYPE_DERIVE) {
518 res = my_asprintf(&value_str, "%" PRIi64, vl->values[index].derive);
519 if (res == -1) {
520 free(ret_str);
521 ERROR("write_sensu plugin: Unable to alloc memory");
522 return NULL;
523 }
524 } else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
525 res = my_asprintf(&value_str, "%" PRIu64, vl->values[index].absolute);
526 if (res == -1) {
527 free(ret_str);
528 ERROR("write_sensu plugin: Unable to alloc memory");
529 return NULL;
530 }
531 } else {
532 res = my_asprintf(&value_str, "%" PRIu64,
533 (uint64_t)vl->values[index].counter);
534 if (res == -1) {
535 free(ret_str);
536 ERROR("write_sensu plugin: Unable to alloc memory");
537 return NULL;
538 }
539 }
540 }
541
542 // Generate the full service name
543 sensu_format_name2(name_buffer, sizeof(name_buffer), vl->host, vl->plugin,
544 vl->plugin_instance, vl->type, vl->type_instance,
545 host->separator);
546 if (host->always_append_ds || (ds->ds_num > 1)) {
547 if (host->event_service_prefix == NULL)
548 snprintf(service_buffer, sizeof(service_buffer), "%s.%s", name_buffer,
549 ds->ds[index].name);
550 else
551 snprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
552 host->event_service_prefix, name_buffer, ds->ds[index].name);
553 } else {
554 if (host->event_service_prefix == NULL)
555 sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
556 else
557 snprintf(service_buffer, sizeof(service_buffer), "%s%s",
558 host->event_service_prefix, name_buffer);
559 }
560
561 // Replace collectd sensor name reserved characters so that time series DB is
562 // happy
563 in_place_replace_sensu_name_reserved(service_buffer);
564
565 // finalize the buffer by setting the output and closing curly bracket
566 res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n", ret_str,
567 service_buffer, value_str,
568 (long long)CDTIME_T_TO_TIME_T(vl->time));
569 free(ret_str);
570 free(value_str);
571 if (res == -1) {
572 ERROR("write_sensu plugin: Unable to alloc memory");
573 return NULL;
574 }
575 ret_str = temp_str;
576
577 DEBUG("write_sensu plugin: Successfully created json for metric: "
578 "host = \"%s\", service = \"%s\"",
579 vl->host, service_buffer);
580 return ret_str;
581 } /* }}} char *sensu_value_to_json */
582
/*
 * Returns a newly allocated copy of `str` in which every non-overlapping
 * occurrence of `old` (searched left to right) is replaced by `new`.
 * An empty `old` matches nothing, so the result is then a plain copy.
 * The caller must free() the result. Returns NULL if memory ran out.
 *
 * Based on the replace_str2() implementation from
 * http://creativeandcritical.net/str-replace-c/
 * copyright (c) Laird Shaw, under public domain.
 */
static char *replace_str(const char *str, const char *old, /* {{{ */
                         const char *new) {
  size_t const oldlen = strlen(old);
  size_t const newlen = strlen(new);

  /* First pass: count the matches and find where the unmatched tail starts.
   * An empty pattern is skipped: strstr() would match it at every position
   * without ever advancing. */
  size_t count = 0;
  const char *p = str;
  if (oldlen != 0) {
    for (const char *q; (q = strstr(p, old)) != NULL; p = q + oldlen)
      count++;
  }
  size_t const tail_len = strlen(p);

  /* (p - str) + tail_len is strlen(str), which is at least count * oldlen,
   * so the subtraction cannot wrap. */
  size_t const retlen =
      (size_t)(p - str) + tail_len - count * oldlen + count * newlen;

  char *ret = malloc(retlen + 1);
  if (ret == NULL)
    return NULL;

  /* Second pass: the number of matches is known, so strstr() cannot return
   * NULL here and the final, fruitless search is avoided. */
  char *r = ret;
  p = str;
  for (size_t i = 0; i < count; i++) {
    const char *q = strstr(p, old);
    size_t l = (size_t)(q - p);
    memcpy(r, p, l);
    r += l;
    memcpy(r, new, newlen);
    r += newlen;
    p = q + oldlen;
  }
  /* Copy what follows the last match, including the terminating NUL. */
  memcpy(r, p, tail_len + 1);

  return ret;
} /* }}} char *replace_str */
644
/*
 * Returns a newly allocated copy of `message` that can be placed between the
 * quotes of a JSON string: backslash, double quote, newline, carriage return
 * and tab are replaced by their JSON escape sequences.
 *
 * The caller must free() the result. Returns NULL if memory ran out.
 */
static char *replace_json_reserved(const char *message) /* {{{ */
{
  /* The backslash has to come first; otherwise the backslashes introduced
   * by the later escapes would be escaped a second time. */
  static const struct {
    const char *raw;
    const char *escaped;
  } escapes[] = {
      {"\\", "\\\\"}, {"\"", "\\\""}, {"\n", "\\n"},
      {"\r", "\\r"},  {"\t", "\\t"},
  };

  char *msg = NULL;
  for (size_t i = 0; i < sizeof(escapes) / sizeof(escapes[0]); i++) {
    char *next = replace_str((msg != NULL) ? msg : message, escapes[i].raw,
                             escapes[i].escaped);
    free(msg);
    if (next == NULL) {
      ERROR("write_sensu plugin: Unable to alloc memory");
      return NULL;
    }
    msg = next;
  }
  return msg;
} /* }}} char *replace_json_reserved */
666
sensu_notification_to_json(struct sensu_host * host,notification_t const * n)667 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
668 notification_t const *n) {
669 char service_buffer[6 * DATA_MAX_NAME_LEN];
670 char const *severity;
671 char *ret_str;
672 char *temp_str;
673 int status;
674 size_t i;
675 int res;
676 // add the severity/status
677 switch (n->severity) {
678 case NOTIF_OKAY:
679 severity = "OK";
680 status = 0;
681 break;
682 case NOTIF_WARNING:
683 severity = "WARNING";
684 status = 1;
685 break;
686 case NOTIF_FAILURE:
687 severity = "CRITICAL";
688 status = 2;
689 break;
690 default:
691 severity = "UNKNOWN";
692 status = 3;
693 }
694 res = my_asprintf(&temp_str, "{\"status\": %d", status);
695 if (res == -1) {
696 ERROR("write_sensu plugin: Unable to alloc memory");
697 return NULL;
698 }
699 ret_str = temp_str;
700
701 // incorporate the timestamp
702 res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str,
703 (long long)CDTIME_T_TO_TIME_T(n->time));
704 free(ret_str);
705 if (res == -1) {
706 ERROR("write_sensu plugin: Unable to alloc memory");
707 return NULL;
708 }
709 ret_str = temp_str;
710
711 if (host->include_source) {
712 res = my_asprintf(&temp_str, "%s, \"source\": \"%s\"", ret_str, n->host);
713 free(ret_str);
714 if (res == -1) {
715 ERROR("write_sensu plugin: Unable to alloc memory");
716 return NULL;
717 }
718 ret_str = temp_str;
719 }
720
721 char *handlers_str =
722 build_json_str_list("handlers", &(host->notification_handlers));
723 if (handlers_str == NULL) {
724 ERROR("write_sensu plugin: Unable to alloc memory");
725 free(ret_str);
726 return NULL;
727 }
728 // incorporate the handlers
729 if (strlen(handlers_str) != 0) {
730 res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
731 free(ret_str);
732 free(handlers_str);
733 if (res == -1) {
734 ERROR("write_sensu plugin: Unable to alloc memory");
735 return NULL;
736 }
737 ret_str = temp_str;
738 } else {
739 free(handlers_str);
740 }
741
742 // incorporate the plugin name information if any
743 if (n->plugin[0] != 0) {
744 res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
745 n->plugin);
746 free(ret_str);
747 if (res == -1) {
748 ERROR("write_sensu plugin: Unable to alloc memory");
749 return NULL;
750 }
751 ret_str = temp_str;
752 }
753
754 // incorporate the plugin type if any
755 if (n->type[0] != 0) {
756 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"",
757 ret_str, n->type);
758 free(ret_str);
759 if (res == -1) {
760 ERROR("write_sensu plugin: Unable to alloc memory");
761 return NULL;
762 }
763 ret_str = temp_str;
764 }
765
766 // incorporate the plugin instance if any
767 if (n->plugin_instance[0] != 0) {
768 res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
769 ret_str, n->plugin_instance);
770 free(ret_str);
771 if (res == -1) {
772 ERROR("write_sensu plugin: Unable to alloc memory");
773 return NULL;
774 }
775 ret_str = temp_str;
776 }
777
778 // incorporate the plugin type instance if any
779 if (n->type_instance[0] != 0) {
780 res =
781 my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
782 ret_str, n->type_instance);
783 free(ret_str);
784 if (res == -1) {
785 ERROR("write_sensu plugin: Unable to alloc memory");
786 return NULL;
787 }
788 ret_str = temp_str;
789 }
790
791 // add key value attributes from config if any
792 for (i = 0; i < sensu_attrs_num; i += 2) {
793 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
794 sensu_attrs[i + 1]);
795 free(ret_str);
796 if (res == -1) {
797 ERROR("write_sensu plugin: Unable to alloc memory");
798 return NULL;
799 }
800 ret_str = temp_str;
801 }
802
803 // incorporate sensu tags from config if any
804 if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
805 res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
806 free(ret_str);
807 if (res == -1) {
808 ERROR("write_sensu plugin: Unable to alloc memory");
809 return NULL;
810 }
811 ret_str = temp_str;
812 }
813
814 // incorporate the service name
815 sensu_format_name2(service_buffer, sizeof(service_buffer),
816 /* host */ "", n->plugin, n->plugin_instance, n->type,
817 n->type_instance, host->separator);
818 // replace sensu event name chars that are considered illegal
819 in_place_replace_sensu_name_reserved(service_buffer);
820 res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str,
821 &service_buffer[1]);
822 free(ret_str);
823 if (res == -1) {
824 ERROR("write_sensu plugin: Unable to alloc memory");
825 return NULL;
826 }
827 ret_str = temp_str;
828
829 // incorporate the check output
830 if (n->message[0] != 0) {
831 char *msg = replace_json_reserved(n->message);
832 if (msg == NULL) {
833 ERROR("write_sensu plugin: Unable to alloc memory");
834 free(ret_str);
835 return NULL;
836 }
837 res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str,
838 severity, msg);
839 free(ret_str);
840 free(msg);
841 if (res == -1) {
842 ERROR("write_sensu plugin: Unable to alloc memory");
843 return NULL;
844 }
845 ret_str = temp_str;
846 }
847
848 // Pull in values from threshold and add extra attributes
849 for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
850 if (strcasecmp("CurrentValue", meta->name) == 0 &&
851 meta->type == NM_TYPE_DOUBLE) {
852 res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str,
853 meta->nm_value.nm_double);
854 free(ret_str);
855 if (res == -1) {
856 ERROR("write_sensu plugin: Unable to alloc memory");
857 return NULL;
858 }
859 ret_str = temp_str;
860 }
861 if (meta->type == NM_TYPE_STRING) {
862 res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name,
863 meta->nm_value.nm_string);
864 free(ret_str);
865 if (res == -1) {
866 ERROR("write_sensu plugin: Unable to alloc memory");
867 return NULL;
868 }
869 ret_str = temp_str;
870 }
871 }
872
873 // close the curly bracket
874 res = my_asprintf(&temp_str, "%s}\n", ret_str);
875 free(ret_str);
876 if (res == -1) {
877 ERROR("write_sensu plugin: Unable to alloc memory");
878 return NULL;
879 }
880 ret_str = temp_str;
881
882 DEBUG("write_sensu plugin: Successfully created JSON for notification: "
883 "host = \"%s\", service = \"%s\", state = \"%s\"",
884 n->host, service_buffer, severity);
885 return ret_str;
886 } /* }}} char *sensu_notification_to_json */
887
sensu_send_msg(struct sensu_host * host,const char * msg)888 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
889 {
890 int status = 0;
891 size_t buffer_len;
892
893 status = sensu_connect(host);
894 if (status != 0)
895 return status;
896
897 buffer_len = strlen(msg);
898
899 status = (int)swrite(host->s, msg, buffer_len);
900 sensu_close_socket(host);
901
902 if (status != 0) {
903 ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
904 (host->node != NULL) ? host->node : SENSU_HOST,
905 (host->service != NULL) ? host->service : SENSU_PORT, STRERRNO);
906 return -1;
907 }
908
909 return 0;
910 } /* }}} int sensu_send_msg */
911
sensu_send(struct sensu_host * host,char const * msg)912 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
913 {
914 int status = 0;
915
916 status = sensu_send_msg(host, msg);
917 if (status != 0) {
918 host->flags &= ~F_READY;
919 if (host->res != NULL) {
920 freeaddrinfo(host->res);
921 host->res = NULL;
922 }
923 return status;
924 }
925
926 return 0;
927 } /* }}} int sensu_send */
928
/*
 * Write callback: sends every value of `vl` to the host stored in `ud` as a
 * separate Sensu metric event. The host lock is held for the whole call.
 *
 * Returns 0 on success. Returns -1 if the rates could not be obtained or a
 * message could not be built, or the status of sensu_send() if sending
 * failed. Processing stops at the first failure.
 */
static int sensu_write(const data_set_t *ds, /* {{{ */
                       const value_list_t *vl, user_data_t *ud) {
  struct sensu_host *host = ud->data;
  gauge_t *rates = NULL;
  int status = 0;

  pthread_mutex_lock(&host->lock);

  if (host->store_rates) {
    rates = uc_get_rate(ds, vl);
    if (rates == NULL) {
      ERROR("write_sensu plugin: uc_get_rate failed.");
      status = -1;
    }
  }

  for (size_t i = 0; (status == 0) && (i < vl->values_len); i++) {
    char *msg = sensu_value_to_json(host, ds, vl, i, rates);
    if (msg == NULL) {
      status = -1;
      break;
    }

    status = sensu_send(host, msg);
    free(msg);
    if (status != 0)
      ERROR("write_sensu plugin: sensu_send failed with status %i", status);
  }

  /* Single exit: the rates and the lock are released on every path. */
  sfree(rates);
  pthread_mutex_unlock(&host->lock);
  return status;
} /* }}} int sensu_write */
968
/*
 * Notification callback: renders `n` as JSON and sends it to the host stored
 * in `ud`. The host lock is held for the whole call.
 *
 * Returns 0 on success, -1 if the message could not be built, otherwise the
 * status of sensu_send().
 */
static int sensu_notification(const notification_t *n,
                              user_data_t *ud) /* {{{ */
{
  struct sensu_host *host = ud->data;
  int status = -1;

  pthread_mutex_lock(&host->lock);

  char *msg = sensu_notification_to_json(host, n);
  if (msg != NULL) {
    status = sensu_send(host, msg);
    free(msg);
    if (status != 0)
      ERROR("write_sensu plugin: sensu_send failed with status %i", status);
  }

  pthread_mutex_unlock(&host->lock);
  return status;
} /* }}} int sensu_notification */
992
sensu_free(void * p)993 static void sensu_free(void *p) /* {{{ */
994 {
995 struct sensu_host *host = p;
996
997 if (host == NULL)
998 return;
999
1000 pthread_mutex_lock(&host->lock);
1001
1002 host->reference_count--;
1003 if (host->reference_count > 0) {
1004 pthread_mutex_unlock(&host->lock);
1005 return;
1006 }
1007
1008 sensu_close_socket(host);
1009 if (host->res != NULL) {
1010 freeaddrinfo(host->res);
1011 host->res = NULL;
1012 }
1013 sfree(host->service);
1014 sfree(host->event_service_prefix);
1015 sfree(host->name);
1016 sfree(host->node);
1017 sfree(host->separator);
1018 free_str_list(&(host->metric_handlers));
1019 free_str_list(&(host->notification_handlers));
1020
1021 pthread_mutex_unlock(&host->lock);
1022 pthread_mutex_destroy(&host->lock);
1023
1024 sfree(host);
1025 } /* }}} void sensu_free */
1026
sensu_config_node(oconfig_item_t * ci)1027 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
1028 {
1029 struct sensu_host *host = NULL;
1030 int status = 0;
1031 oconfig_item_t *child;
1032 char callback_name[DATA_MAX_NAME_LEN];
1033
1034 if ((host = calloc(1, sizeof(*host))) == NULL) {
1035 ERROR("write_sensu plugin: calloc failed.");
1036 return ENOMEM;
1037 }
1038 pthread_mutex_init(&host->lock, NULL);
1039 host->reference_count = 1;
1040 host->node = NULL;
1041 host->service = NULL;
1042 host->notifications = false;
1043 host->metrics = false;
1044 host->store_rates = true;
1045 host->always_append_ds = false;
1046 host->include_source = false;
1047 host->metric_handlers.nb_strs = 0;
1048 host->metric_handlers.strs = NULL;
1049 host->notification_handlers.nb_strs = 0;
1050 host->notification_handlers.strs = NULL;
1051 host->separator = strdup("/");
1052 if (host->separator == NULL) {
1053 ERROR("write_sensu plugin: Unable to alloc memory");
1054 sensu_free(host);
1055 return -1;
1056 }
1057
1058 status = cf_util_get_string(ci, &host->name);
1059 if (status != 0) {
1060 WARNING("write_sensu plugin: Required host name is missing.");
1061 sensu_free(host);
1062 return -1;
1063 }
1064
1065 for (int i = 0; i < ci->children_num; i++) {
1066 child = &ci->children[i];
1067 status = 0;
1068
1069 if (strcasecmp("Host", child->key) == 0) {
1070 status = cf_util_get_string(child, &host->node);
1071 if (status != 0)
1072 break;
1073 } else if (strcasecmp("Notifications", child->key) == 0) {
1074 status = cf_util_get_boolean(child, &host->notifications);
1075 if (status != 0)
1076 break;
1077 } else if (strcasecmp("Metrics", child->key) == 0) {
1078 status = cf_util_get_boolean(child, &host->metrics);
1079 if (status != 0)
1080 break;
1081 } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1082 status = cf_util_get_string(child, &host->event_service_prefix);
1083 if (status != 0)
1084 break;
1085 } else if (strcasecmp("Separator", child->key) == 0) {
1086 status = cf_util_get_string(child, &host->separator);
1087 if (status != 0)
1088 break;
1089 } else if (strcasecmp("MetricHandler", child->key) == 0) {
1090 char *temp_str = NULL;
1091 status = cf_util_get_string(child, &temp_str);
1092 if (status != 0)
1093 break;
1094 status = add_str_to_list(&(host->metric_handlers), temp_str);
1095 free(temp_str);
1096 if (status != 0)
1097 break;
1098 } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1099 char *temp_str = NULL;
1100 status = cf_util_get_string(child, &temp_str);
1101 if (status != 0)
1102 break;
1103 status = add_str_to_list(&(host->notification_handlers), temp_str);
1104 free(temp_str);
1105 if (status != 0)
1106 break;
1107 } else if (strcasecmp("Port", child->key) == 0) {
1108 status = cf_util_get_service(child, &host->service);
1109 if (status != 0)
1110 break;
1111 } else if (strcasecmp("StoreRates", child->key) == 0) {
1112 status = cf_util_get_boolean(child, &host->store_rates);
1113 if (status != 0)
1114 break;
1115 } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1116 status = cf_util_get_boolean(child, &host->always_append_ds);
1117 if (status != 0)
1118 break;
1119 } else if (strcasecmp("IncludeSource", child->key) == 0) {
1120 status = cf_util_get_boolean(child, &host->include_source);
1121 if (status != 0)
1122 break;
1123 } else {
1124 WARNING("write_sensu plugin: ignoring unknown config "
1125 "option: \"%s\"",
1126 child->key);
1127 }
1128 }
1129 if (status != 0) {
1130 sensu_free(host);
1131 return status;
1132 }
1133
1134 if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1135 sensu_free(host);
1136 WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. "
1137 "Giving up.");
1138 return -1;
1139 }
1140
1141 if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1142 sensu_free(host);
1143 WARNING("write_sensu plugin: notifications enabled but no "
1144 "NotificationHandler defined. Giving up.");
1145 return -1;
1146 }
1147
1148 if ((host->notification_handlers.nb_strs > 0) &&
1149 (host->notifications == false)) {
1150 WARNING("write_sensu plugin: NotificationHandler given so forcing "
1151 "notifications to be enabled");
1152 host->notifications = 1;
1153 }
1154
1155 if ((host->metric_handlers.nb_strs > 0) && (host->metrics == false)) {
1156 WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be "
1157 "enabled");
1158 host->metrics = true;
1159 }
1160
1161 if (!(host->notifications || host->metrics)) {
1162 WARNING("write_sensu plugin: neither metrics nor notifications enabled. "
1163 "Giving up.");
1164 sensu_free(host);
1165 return -1;
1166 }
1167
1168 snprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1169
1170 user_data_t ud = {.data = host, .free_func = sensu_free};
1171
1172 pthread_mutex_lock(&host->lock);
1173
1174 if (host->metrics) {
1175 status = plugin_register_write(callback_name, sensu_write, &ud);
1176 if (status != 0)
1177 WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1178 "failed with status %i.",
1179 callback_name, status);
1180 else /* success */
1181 host->reference_count++;
1182 }
1183
1184 if (host->notifications) {
1185 status =
1186 plugin_register_notification(callback_name, sensu_notification, &ud);
1187 if (status != 0)
1188 WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1189 "failed with status %i.",
1190 callback_name, status);
1191 else
1192 host->reference_count++;
1193 }
1194
1195 if (host->reference_count <= 1) {
1196 /* Both callbacks failed => free memory.
1197 * We need to unlock here, because sensu_free() will lock.
1198 * This is not a race condition, because we're the only one
1199 * holding a reference. */
1200 pthread_mutex_unlock(&host->lock);
1201 sensu_free(host);
1202 return -1;
1203 }
1204
1205 host->reference_count--;
1206 pthread_mutex_unlock(&host->lock);
1207
1208 return status;
1209 } /* }}} int sensu_config_node */
1210
/*
 * Top-level configuration callback of the write_sensu plugin.
 *
 * Recognised options (matched case-insensitively):
 *   Node       - configured via sensu_config_node(); a node that fails to
 *                configure does not stop processing of the other options.
 *   Attribute  - a key/value pair of strings appended to sensu_attrs.
 *   Tag        - a string collected and rendered into sensu_tags.
 * Anything else is reported with a warning and ignored.
 *
 * Returns 0 on success, or -1 if the JSON tag list cannot be built.
 */
static int sensu_config(oconfig_item_t *ci) /* {{{ */
{
  struct str_list sensu_tags_arr;

  sensu_tags_arr.nb_strs = 0;
  sensu_tags_arr.strs = NULL;

  for (int i = 0; i < ci->children_num; i++) {
    oconfig_item_t *child = &ci->children[i];

    if (strcasecmp("Node", child->key) == 0) {
      /* Failures are logged by sensu_config_node() itself; the result is
       * not propagated so that the remaining options are still handled. */
      (void)sensu_config_node(child);
    } else if (strcasecmp(child->key, "attribute") == 0) {
      if (child->values_num != 2) {
        WARNING("sensu attributes need both a key and a value.");
        continue;
      }
      if (child->values[0].type != OCONFIG_TYPE_STRING ||
          child->values[1].type != OCONFIG_TYPE_STRING) {
        WARNING("sensu attribute needs string arguments.");
        continue;
      }

      const char *key = child->values[0].value.string;
      const char *value = child->values[1].value.string;

      if (strarray_add(&sensu_attrs, &sensu_attrs_num, key) != 0) {
        ERROR("write_sensu plugin: Unable to alloc memory");
        continue;
      }
      if (strarray_add(&sensu_attrs, &sensu_attrs_num, value) != 0) {
        /* The key was appended by the successful call above; remove it again
         * so the array never holds a key without its value. */
        sensu_attrs_num--;
        sfree(sensu_attrs[sensu_attrs_num]);
        ERROR("write_sensu plugin: Unable to alloc memory");
        continue;
      }

      DEBUG("write_sensu plugin: New attribute: %s => %s", key, value);
    } else if (strcasecmp(child->key, "tag") == 0) {
      char *tmp = NULL;

      if (cf_util_get_string(child, &tmp) != 0)
        continue;

      if (add_str_to_list(&sensu_tags_arr, tmp) != 0) {
        WARNING("write_sensu plugin: Unable to store tag \"%s\"; ignoring it.",
                tmp);
      } else {
        DEBUG("write_sensu plugin: Got tag: %s", tmp);
      }
      sfree(tmp);
    } else {
      WARNING("write_sensu plugin: Ignoring unknown "
              "configuration option \"%s\" at top level.",
              child->key);
    }
  }

  if (sensu_tags_arr.nb_strs > 0) {
    /* Replace any previously built tag string with the new one. */
    sfree(sensu_tags);
    sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
    free_str_list(&sensu_tags_arr);
    if (sensu_tags == NULL) {
      ERROR("write_sensu plugin: Unable to alloc memory");
      return -1;
    }
  }

  return 0;
} /* }}} int sensu_config */
1271
module_register(void)1272 void module_register(void) {
1273 plugin_register_complex_config("write_sensu", sensu_config);
1274 }
1275