1 /**
2  * collectd - src/write_sensu.c
3  * Copyright (C) 2015 Fabrice A. Marie
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Fabrice A. Marie <fabrice at kibinlabs.com>
25  */
26 
27 #define _GNU_SOURCE
28 
29 #include "collectd.h"
30 
31 #include "plugin.h"
32 #include "utils/common/common.h"
33 #include "utils_cache.h"
34 #include <arpa/inet.h>
35 #include <errno.h>
36 #include <inttypes.h>
37 #include <netdb.h>
38 #include <stddef.h>
39 
40 #include <stdlib.h>
41 #define SENSU_HOST "localhost"
42 #define SENSU_PORT "3030"
43 
44 #ifdef HAVE_ASPRINTF
45 #define my_asprintf asprintf
46 #define my_vasprintf vasprintf
47 #else
48 /*
49  * asprintf() is available from Solaris 10 update 11.
50  * For older versions, use asprintf() portable implementation from
51  * https://github.com/littlstar/asprintf.c/blob/master/
52  * copyright (c) 2014 joseph werle <joseph.werle@gmail.com> under MIT license.
53  */
54 
my_vasprintf(char ** str,const char * fmt,va_list args)55 static int my_vasprintf(char **str, const char *fmt, va_list args) {
56   int size = 0;
57   va_list tmpa;
58   // copy
59   va_copy(tmpa, args);
60   // apply variadic arguments to
61   // sprintf with format to get size
62   size = vsnprintf(NULL, size, fmt, tmpa);
63   // toss args
64   va_end(tmpa);
65   // return -1 to be compliant if
66   // size is less than 0
67   if (size < 0) {
68     return -1;
69   }
70   // alloc with size plus 1 for `\0'
71   *str = (char *)malloc(size + 1);
72   // return -1 to be compliant
73   // if pointer is `NULL'
74   if (NULL == *str) {
75     return -1;
76   }
77   // format string with original
78   // variadic arguments and set new size
79   size = vsprintf(*str, fmt, args);
80   return size;
81 }
82 
my_asprintf(char ** str,const char * fmt,...)83 static int my_asprintf(char **str, const char *fmt, ...) {
84   int size = 0;
85   va_list args;
86   // init variadic argumens
87   va_start(args, fmt);
88   // format and get size
89   size = my_vasprintf(str, fmt, args);
90   // toss args
91   va_end(args);
92   return size;
93 }
94 
95 #endif
96 
97 struct str_list {
98   int nb_strs;
99   char **strs;
100 };
101 
102 struct sensu_host {
103   char *name;
104   char *event_service_prefix;
105   struct str_list metric_handlers;
106   struct str_list notification_handlers;
107 #define F_READY 0x01
108   uint8_t flags;
109   pthread_mutex_t lock;
110   bool notifications;
111   bool metrics;
112   bool store_rates;
113   bool always_append_ds;
114   bool include_source;
115   char *separator;
116   char *node;
117   char *service;
118   int s;
119   struct addrinfo *res;
120   int reference_count;
121 };
122 
123 static char *sensu_tags;
124 static char **sensu_attrs;
125 static size_t sensu_attrs_num;
126 
add_str_to_list(struct str_list * strs,const char * str_to_add)127 static int add_str_to_list(struct str_list *strs,
128                            const char *str_to_add) /* {{{ */
129 {
130   char **old_strs_ptr = strs->strs;
131   char *newstr = strdup(str_to_add);
132   if (newstr == NULL) {
133     ERROR("write_sensu plugin: Unable to alloc memory");
134     return -1;
135   }
136   strs->strs = realloc(strs->strs, strs->nb_strs + 1);
137   if (strs->strs == NULL) {
138     strs->strs = old_strs_ptr;
139     free(newstr);
140     ERROR("write_sensu plugin: Unable to alloc memory");
141     return -1;
142   }
143   strs->strs[strs->nb_strs] = newstr;
144   strs->nb_strs++;
145   return 0;
146 }
147 /* }}} int add_str_to_list */
148 
free_str_list(struct str_list * strs)149 static void free_str_list(struct str_list *strs) /* {{{ */
150 {
151   for (int i = 0; i < strs->nb_strs; i++)
152     free(strs->strs[i]);
153   free(strs->strs);
154 }
155 /* }}} void free_str_list */
156 
sensu_connect(struct sensu_host * host)157 static int sensu_connect(struct sensu_host *host) /* {{{ */
158 {
159   int e;
160   char const *node;
161   char const *service;
162 
163   // Resolve the target if we haven't done already
164   if (!(host->flags & F_READY)) {
165     memset(&service, 0, sizeof(service));
166     host->res = NULL;
167 
168     node = (host->node != NULL) ? host->node : SENSU_HOST;
169     service = (host->service != NULL) ? host->service : SENSU_PORT;
170 
171     struct addrinfo ai_hints = {.ai_family = AF_INET,
172                                 .ai_flags = AI_ADDRCONFIG,
173                                 .ai_socktype = SOCK_STREAM};
174 
175     if ((e = getaddrinfo(node, service, &ai_hints, &(host->res))) != 0) {
176       ERROR("write_sensu plugin: Unable to resolve host \"%s\": %s", node,
177             gai_strerror(e));
178       return -1;
179     }
180     DEBUG("write_sensu plugin: successfully resolved host/port: %s/%s", node,
181           service);
182     host->flags |= F_READY;
183   }
184 
185   struct linger so_linger;
186   host->s = -1;
187   for (struct addrinfo *ai = host->res; ai != NULL; ai = ai->ai_next) {
188     // create the socket
189     if ((host->s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) ==
190         -1) {
191       continue;
192     }
193 
194     // Set very low close() lingering
195     so_linger.l_onoff = 1;
196     so_linger.l_linger = 3;
197     if (setsockopt(host->s, SOL_SOCKET, SO_LINGER, &so_linger,
198                    sizeof so_linger) != 0)
199       WARNING("write_sensu plugin: failed to set socket close() lingering");
200 
201     set_sock_opts(host->s);
202 
203     // connect the socket
204     if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
205       close(host->s);
206       host->s = -1;
207       continue;
208     }
209     DEBUG("write_sensu plugin: connected");
210     break;
211   }
212 
213   if (host->s < 0) {
214     WARNING("write_sensu plugin: Unable to connect to sensu client");
215     return -1;
216   }
217   return 0;
218 } /* }}} int sensu_connect */
219 
sensu_close_socket(struct sensu_host * host)220 static void sensu_close_socket(struct sensu_host *host) /* {{{ */
221 {
222   if (host->s != -1)
223     close(host->s);
224   host->s = -1;
225 
226 } /* }}} void sensu_close_socket */
227 
build_json_str_list(const char * tag,struct str_list const * list)228 static char *build_json_str_list(const char *tag,
229                                  struct str_list const *list) /* {{{ */
230 {
231   int res;
232   char *ret_str = NULL;
233   char *temp_str;
234   if (list->nb_strs == 0) {
235     ret_str = malloc(1);
236     if (ret_str == NULL) {
237       ERROR("write_sensu plugin: Unable to alloc memory");
238       return NULL;
239     }
240     ret_str[0] = '\0';
241   }
242 
243   res = my_asprintf(&temp_str, "\"%s\": [\"%s\"", tag, list->strs[0]);
244   if (res == -1) {
245     ERROR("write_sensu plugin: Unable to alloc memory");
246     free(ret_str);
247     return NULL;
248   }
249   for (int i = 1; i < list->nb_strs; i++) {
250     res = my_asprintf(&ret_str, "%s, \"%s\"", temp_str, list->strs[i]);
251     free(temp_str);
252     if (res == -1) {
253       ERROR("write_sensu plugin: Unable to alloc memory");
254       return NULL;
255     }
256     temp_str = ret_str;
257   }
258   res = my_asprintf(&ret_str, "%s]", temp_str);
259   free(temp_str);
260   if (res == -1) {
261     ERROR("write_sensu plugin: Unable to alloc memory");
262     return NULL;
263   }
264 
265   return ret_str;
266 } /* }}} char *build_json_str_list*/
267 
sensu_format_name2(char * ret,int ret_len,const char * hostname,const char * plugin,const char * plugin_instance,const char * type,const char * type_instance,const char * separator)268 static int sensu_format_name2(char *ret, int ret_len, const char *hostname,
269                               const char *plugin, const char *plugin_instance,
270                               const char *type, const char *type_instance,
271                               const char *separator) {
272   char *buffer;
273   size_t buffer_size;
274 
275   buffer = ret;
276   buffer_size = (size_t)ret_len;
277 
278 #define APPEND(str)                                                            \
279   do {                                                                         \
280     size_t l = strlen(str);                                                    \
281     if (l >= buffer_size)                                                      \
282       return ENOBUFS;                                                          \
283     memcpy(buffer, (str), l);                                                  \
284     buffer += l;                                                               \
285     buffer_size -= l;                                                          \
286   } while (0)
287 
288   assert(plugin != NULL);
289   assert(type != NULL);
290 
291   APPEND(hostname);
292   APPEND(separator);
293   APPEND(plugin);
294   if ((plugin_instance != NULL) && (plugin_instance[0] != 0)) {
295     APPEND("-");
296     APPEND(plugin_instance);
297   }
298   APPEND(separator);
299   APPEND(type);
300   if ((type_instance != NULL) && (type_instance[0] != 0)) {
301     APPEND("-");
302     APPEND(type_instance);
303   }
304   assert(buffer_size > 0);
305   buffer[0] = 0;
306 
307 #undef APPEND
308   return 0;
309 } /* int sensu_format_name2 */
310 
in_place_replace_sensu_name_reserved(char * orig_name)311 static void in_place_replace_sensu_name_reserved(char *orig_name) /* {{{ */
312 {
313   size_t len = strlen(orig_name);
314   for (size_t i = 0; i < len; i++) {
315     // some plugins like ipmi generate special characters in metric name
316     switch (orig_name[i]) {
317     case '(':
318       orig_name[i] = '_';
319       break;
320     case ')':
321       orig_name[i] = '_';
322       break;
323     case ' ':
324       orig_name[i] = '_';
325       break;
326     case '"':
327       orig_name[i] = '_';
328       break;
329     case '\'':
330       orig_name[i] = '_';
331       break;
332     case '+':
333       orig_name[i] = '_';
334       break;
335     }
336   }
337 } /* }}} char *replace_sensu_name_reserved */
338 
sensu_value_to_json(struct sensu_host const * host,data_set_t const * ds,value_list_t const * vl,size_t index,gauge_t const * rates)339 static char *sensu_value_to_json(struct sensu_host const *host, /* {{{ */
340                                  data_set_t const *ds, value_list_t const *vl,
341                                  size_t index, gauge_t const *rates) {
342   char name_buffer[5 * DATA_MAX_NAME_LEN];
343   char service_buffer[6 * DATA_MAX_NAME_LEN];
344   char *ret_str;
345   char *temp_str;
346   char *value_str;
347   int res;
348   // First part of the JSON string
349   const char *part1 = "{\"name\": \"collectd\", \"type\": \"metric\"";
350 
351   char *handlers_str =
352       build_json_str_list("handlers", &(host->metric_handlers));
353   if (handlers_str == NULL) {
354     ERROR("write_sensu plugin: Unable to alloc memory");
355     return NULL;
356   }
357 
358   // incorporate the handlers
359   if (strlen(handlers_str) == 0) {
360     free(handlers_str);
361     ret_str = strdup(part1);
362     if (ret_str == NULL) {
363       ERROR("write_sensu plugin: Unable to alloc memory");
364       return NULL;
365     }
366   } else {
367     res = my_asprintf(&ret_str, "%s, %s", part1, handlers_str);
368     free(handlers_str);
369     if (res == -1) {
370       ERROR("write_sensu plugin: Unable to alloc memory");
371       return NULL;
372     }
373   }
374 
375   if (host->include_source) {
376     res = my_asprintf(&temp_str, "%s, \"source\": \"%s\"", ret_str, vl->host);
377     free(ret_str);
378     if (res == -1) {
379       ERROR("write_sensu plugin: Unable to alloc memory");
380       return NULL;
381     }
382     ret_str = temp_str;
383   }
384 
385   // incorporate the plugin name information
386   res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
387                     vl->plugin);
388   free(ret_str);
389   if (res == -1) {
390     ERROR("write_sensu plugin: Unable to alloc memory");
391     return NULL;
392   }
393   ret_str = temp_str;
394 
395   // incorporate the plugin type
396   res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"", ret_str,
397                     vl->type);
398   free(ret_str);
399   if (res == -1) {
400     ERROR("write_sensu plugin: Unable to alloc memory");
401     return NULL;
402   }
403   ret_str = temp_str;
404 
405   // incorporate the plugin instance if any
406   if (vl->plugin_instance[0] != 0) {
407     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
408                       ret_str, vl->plugin_instance);
409     free(ret_str);
410     if (res == -1) {
411       ERROR("write_sensu plugin: Unable to alloc memory");
412       return NULL;
413     }
414     ret_str = temp_str;
415   }
416 
417   // incorporate the plugin type instance if any
418   if (vl->type_instance[0] != 0) {
419     res =
420         my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
421                     ret_str, vl->type_instance);
422     free(ret_str);
423     if (res == -1) {
424       ERROR("write_sensu plugin: Unable to alloc memory");
425       return NULL;
426     }
427     ret_str = temp_str;
428   }
429 
430   // incorporate the data source type
431   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
432     char ds_type[DATA_MAX_NAME_LEN];
433     snprintf(ds_type, sizeof(ds_type), "%s:rate",
434              DS_TYPE_TO_STRING(ds->ds[index].type));
435     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
436                       ret_str, ds_type);
437     free(ret_str);
438     if (res == -1) {
439       ERROR("write_sensu plugin: Unable to alloc memory");
440       return NULL;
441     }
442     ret_str = temp_str;
443   } else {
444     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_type\": \"%s\"",
445                       ret_str, DS_TYPE_TO_STRING(ds->ds[index].type));
446     free(ret_str);
447     if (res == -1) {
448       ERROR("write_sensu plugin: Unable to alloc memory");
449       return NULL;
450     }
451     ret_str = temp_str;
452   }
453 
454   // incorporate the data source name
455   res = my_asprintf(&temp_str, "%s, \"collectd_data_source_name\": \"%s\"",
456                     ret_str, ds->ds[index].name);
457   free(ret_str);
458   if (res == -1) {
459     ERROR("write_sensu plugin: Unable to alloc memory");
460     return NULL;
461   }
462   ret_str = temp_str;
463 
464   // incorporate the data source index
465   {
466     char ds_index[DATA_MAX_NAME_LEN];
467     snprintf(ds_index, sizeof(ds_index), "%" PRIsz, index);
468     res = my_asprintf(&temp_str, "%s, \"collectd_data_source_index\": %s",
469                       ret_str, ds_index);
470     free(ret_str);
471     if (res == -1) {
472       ERROR("write_sensu plugin: Unable to alloc memory");
473       return NULL;
474     }
475     ret_str = temp_str;
476   }
477 
478   // add key value attributes from config if any
479   for (size_t i = 0; i < sensu_attrs_num; i += 2) {
480     res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
481                       sensu_attrs[i + 1]);
482     free(ret_str);
483     if (res == -1) {
484       ERROR("write_sensu plugin: Unable to alloc memory");
485       return NULL;
486     }
487     ret_str = temp_str;
488   }
489 
490   // incorporate sensu tags from config if any
491   if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
492     res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
493     free(ret_str);
494     if (res == -1) {
495       ERROR("write_sensu plugin: Unable to alloc memory");
496       return NULL;
497     }
498     ret_str = temp_str;
499   }
500 
501   // calculate the value and set to a string
502   if (ds->ds[index].type == DS_TYPE_GAUGE) {
503     res = my_asprintf(&value_str, GAUGE_FORMAT, vl->values[index].gauge);
504     if (res == -1) {
505       free(ret_str);
506       ERROR("write_sensu plugin: Unable to alloc memory");
507       return NULL;
508     }
509   } else if (rates != NULL) {
510     res = my_asprintf(&value_str, GAUGE_FORMAT, rates[index]);
511     if (res == -1) {
512       free(ret_str);
513       ERROR("write_sensu plugin: Unable to alloc memory");
514       return NULL;
515     }
516   } else {
517     if (ds->ds[index].type == DS_TYPE_DERIVE) {
518       res = my_asprintf(&value_str, "%" PRIi64, vl->values[index].derive);
519       if (res == -1) {
520         free(ret_str);
521         ERROR("write_sensu plugin: Unable to alloc memory");
522         return NULL;
523       }
524     } else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) {
525       res = my_asprintf(&value_str, "%" PRIu64, vl->values[index].absolute);
526       if (res == -1) {
527         free(ret_str);
528         ERROR("write_sensu plugin: Unable to alloc memory");
529         return NULL;
530       }
531     } else {
532       res = my_asprintf(&value_str, "%" PRIu64,
533                         (uint64_t)vl->values[index].counter);
534       if (res == -1) {
535         free(ret_str);
536         ERROR("write_sensu plugin: Unable to alloc memory");
537         return NULL;
538       }
539     }
540   }
541 
542   // Generate the full service name
543   sensu_format_name2(name_buffer, sizeof(name_buffer), vl->host, vl->plugin,
544                      vl->plugin_instance, vl->type, vl->type_instance,
545                      host->separator);
546   if (host->always_append_ds || (ds->ds_num > 1)) {
547     if (host->event_service_prefix == NULL)
548       snprintf(service_buffer, sizeof(service_buffer), "%s.%s", name_buffer,
549                ds->ds[index].name);
550     else
551       snprintf(service_buffer, sizeof(service_buffer), "%s%s.%s",
552                host->event_service_prefix, name_buffer, ds->ds[index].name);
553   } else {
554     if (host->event_service_prefix == NULL)
555       sstrncpy(service_buffer, name_buffer, sizeof(service_buffer));
556     else
557       snprintf(service_buffer, sizeof(service_buffer), "%s%s",
558                host->event_service_prefix, name_buffer);
559   }
560 
561   // Replace collectd sensor name reserved characters so that time series DB is
562   // happy
563   in_place_replace_sensu_name_reserved(service_buffer);
564 
565   // finalize the buffer by setting the output and closing curly bracket
566   res = my_asprintf(&temp_str, "%s, \"output\": \"%s %s %lld\"}\n", ret_str,
567                     service_buffer, value_str,
568                     (long long)CDTIME_T_TO_TIME_T(vl->time));
569   free(ret_str);
570   free(value_str);
571   if (res == -1) {
572     ERROR("write_sensu plugin: Unable to alloc memory");
573     return NULL;
574   }
575   ret_str = temp_str;
576 
577   DEBUG("write_sensu plugin: Successfully created json for metric: "
578         "host = \"%s\", service = \"%s\"",
579         vl->host, service_buffer);
580   return ret_str;
581 } /* }}} char *sensu_value_to_json */
582 
583 /*
584  * Uses replace_str2() implementation from
585  * http://creativeandcritical.net/str-replace-c/
586  * copyright (c) Laird Shaw, under public domain.
587  */
replace_str(const char * str,const char * old,const char * new)588 static char *replace_str(const char *str, const char *old, /* {{{ */
589                          const char *new) {
590   char *ret, *r;
591   const char *p, *q;
592   size_t oldlen = strlen(old);
593   size_t count = strlen(new);
594   size_t retlen;
595   size_t newlen = count;
596   int samesize = (oldlen == newlen);
597 
598   if (!samesize) {
599     for (count = 0, p = str; (q = strstr(p, old)) != NULL; p = q + oldlen)
600       count++;
601     /* This is undefined if p - str > PTRDIFF_MAX */
602     retlen = p - str + strlen(p) + count * (newlen - oldlen);
603   } else
604     retlen = strlen(str);
605 
606   ret = calloc(1, retlen + 1);
607   if (ret == NULL)
608     return NULL;
609   // added to original: not optimized, but keeps valgrind happy.
610 
611   r = ret;
612   p = str;
613   while (1) {
614     /* If the old and new strings are different lengths - in other
615      * words we have already iterated through with strstr above,
616      * and thus we know how many times we need to call it - then we
617      * can avoid the final (potentially lengthy) call to strstr,
618      * which we already know is going to return NULL, by
619      * decrementing and checking count.
620      */
621     if (!samesize && !count--)
622       break;
623     /* Otherwise i.e. when the old and new strings are the same
624      * length, and we don't know how many times to call strstr,
625      * we must check for a NULL return here (we check it in any
626      * event, to avoid further conditions, and because there's
627      * no harm done with the check even when the old and new
628      * strings are different lengths).
629      */
630     if ((q = strstr(p, old)) == NULL)
631       break;
632     /* This is undefined if q - p > PTRDIFF_MAX */
633     ptrdiff_t l = q - p;
634     memcpy(r, p, l);
635     r += l;
636     memcpy(r, new, newlen);
637     r += newlen;
638     p = q + oldlen;
639   }
640   sstrncpy(r, p, retlen + 1);
641 
642   return ret;
643 } /* }}} char *replace_str */
644 
replace_json_reserved(const char * message)645 static char *replace_json_reserved(const char *message) /* {{{ */
646 {
647   char *msg = replace_str(message, "\\", "\\\\");
648   if (msg == NULL) {
649     ERROR("write_sensu plugin: Unable to alloc memory");
650     return NULL;
651   }
652   char *tmp = replace_str(msg, "\"", "\\\"");
653   free(msg);
654   if (tmp == NULL) {
655     ERROR("write_sensu plugin: Unable to alloc memory");
656     return NULL;
657   }
658   msg = replace_str(tmp, "\n", "\\\n");
659   free(tmp);
660   if (msg == NULL) {
661     ERROR("write_sensu plugin: Unable to alloc memory");
662     return NULL;
663   }
664   return msg;
665 } /* }}} char *replace_json_reserved */
666 
sensu_notification_to_json(struct sensu_host * host,notification_t const * n)667 static char *sensu_notification_to_json(struct sensu_host *host, /* {{{ */
668                                         notification_t const *n) {
669   char service_buffer[6 * DATA_MAX_NAME_LEN];
670   char const *severity;
671   char *ret_str;
672   char *temp_str;
673   int status;
674   size_t i;
675   int res;
676   // add the severity/status
677   switch (n->severity) {
678   case NOTIF_OKAY:
679     severity = "OK";
680     status = 0;
681     break;
682   case NOTIF_WARNING:
683     severity = "WARNING";
684     status = 1;
685     break;
686   case NOTIF_FAILURE:
687     severity = "CRITICAL";
688     status = 2;
689     break;
690   default:
691     severity = "UNKNOWN";
692     status = 3;
693   }
694   res = my_asprintf(&temp_str, "{\"status\": %d", status);
695   if (res == -1) {
696     ERROR("write_sensu plugin: Unable to alloc memory");
697     return NULL;
698   }
699   ret_str = temp_str;
700 
701   // incorporate the timestamp
702   res = my_asprintf(&temp_str, "%s, \"timestamp\": %lld", ret_str,
703                     (long long)CDTIME_T_TO_TIME_T(n->time));
704   free(ret_str);
705   if (res == -1) {
706     ERROR("write_sensu plugin: Unable to alloc memory");
707     return NULL;
708   }
709   ret_str = temp_str;
710 
711   if (host->include_source) {
712     res = my_asprintf(&temp_str, "%s, \"source\": \"%s\"", ret_str, n->host);
713     free(ret_str);
714     if (res == -1) {
715       ERROR("write_sensu plugin: Unable to alloc memory");
716       return NULL;
717     }
718     ret_str = temp_str;
719   }
720 
721   char *handlers_str =
722       build_json_str_list("handlers", &(host->notification_handlers));
723   if (handlers_str == NULL) {
724     ERROR("write_sensu plugin: Unable to alloc memory");
725     free(ret_str);
726     return NULL;
727   }
728   // incorporate the handlers
729   if (strlen(handlers_str) != 0) {
730     res = my_asprintf(&temp_str, "%s, %s", ret_str, handlers_str);
731     free(ret_str);
732     free(handlers_str);
733     if (res == -1) {
734       ERROR("write_sensu plugin: Unable to alloc memory");
735       return NULL;
736     }
737     ret_str = temp_str;
738   } else {
739     free(handlers_str);
740   }
741 
742   // incorporate the plugin name information if any
743   if (n->plugin[0] != 0) {
744     res = my_asprintf(&temp_str, "%s, \"collectd_plugin\": \"%s\"", ret_str,
745                       n->plugin);
746     free(ret_str);
747     if (res == -1) {
748       ERROR("write_sensu plugin: Unable to alloc memory");
749       return NULL;
750     }
751     ret_str = temp_str;
752   }
753 
754   // incorporate the plugin type if any
755   if (n->type[0] != 0) {
756     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_type\": \"%s\"",
757                       ret_str, n->type);
758     free(ret_str);
759     if (res == -1) {
760       ERROR("write_sensu plugin: Unable to alloc memory");
761       return NULL;
762     }
763     ret_str = temp_str;
764   }
765 
766   // incorporate the plugin instance if any
767   if (n->plugin_instance[0] != 0) {
768     res = my_asprintf(&temp_str, "%s, \"collectd_plugin_instance\": \"%s\"",
769                       ret_str, n->plugin_instance);
770     free(ret_str);
771     if (res == -1) {
772       ERROR("write_sensu plugin: Unable to alloc memory");
773       return NULL;
774     }
775     ret_str = temp_str;
776   }
777 
778   // incorporate the plugin type instance if any
779   if (n->type_instance[0] != 0) {
780     res =
781         my_asprintf(&temp_str, "%s, \"collectd_plugin_type_instance\": \"%s\"",
782                     ret_str, n->type_instance);
783     free(ret_str);
784     if (res == -1) {
785       ERROR("write_sensu plugin: Unable to alloc memory");
786       return NULL;
787     }
788     ret_str = temp_str;
789   }
790 
791   // add key value attributes from config if any
792   for (i = 0; i < sensu_attrs_num; i += 2) {
793     res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, sensu_attrs[i],
794                       sensu_attrs[i + 1]);
795     free(ret_str);
796     if (res == -1) {
797       ERROR("write_sensu plugin: Unable to alloc memory");
798       return NULL;
799     }
800     ret_str = temp_str;
801   }
802 
803   // incorporate sensu tags from config if any
804   if ((sensu_tags != NULL) && (strlen(sensu_tags) != 0)) {
805     res = my_asprintf(&temp_str, "%s, %s", ret_str, sensu_tags);
806     free(ret_str);
807     if (res == -1) {
808       ERROR("write_sensu plugin: Unable to alloc memory");
809       return NULL;
810     }
811     ret_str = temp_str;
812   }
813 
814   // incorporate the service name
815   sensu_format_name2(service_buffer, sizeof(service_buffer),
816                      /* host */ "", n->plugin, n->plugin_instance, n->type,
817                      n->type_instance, host->separator);
818   // replace sensu event name chars that are considered illegal
819   in_place_replace_sensu_name_reserved(service_buffer);
820   res = my_asprintf(&temp_str, "%s, \"name\": \"%s\"", ret_str,
821                     &service_buffer[1]);
822   free(ret_str);
823   if (res == -1) {
824     ERROR("write_sensu plugin: Unable to alloc memory");
825     return NULL;
826   }
827   ret_str = temp_str;
828 
829   // incorporate the check output
830   if (n->message[0] != 0) {
831     char *msg = replace_json_reserved(n->message);
832     if (msg == NULL) {
833       ERROR("write_sensu plugin: Unable to alloc memory");
834       free(ret_str);
835       return NULL;
836     }
837     res = my_asprintf(&temp_str, "%s, \"output\": \"%s - %s\"", ret_str,
838                       severity, msg);
839     free(ret_str);
840     free(msg);
841     if (res == -1) {
842       ERROR("write_sensu plugin: Unable to alloc memory");
843       return NULL;
844     }
845     ret_str = temp_str;
846   }
847 
848   // Pull in values from threshold and add extra attributes
849   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
850     if (strcasecmp("CurrentValue", meta->name) == 0 &&
851         meta->type == NM_TYPE_DOUBLE) {
852       res = my_asprintf(&temp_str, "%s, \"current_value\": \"%.8f\"", ret_str,
853                         meta->nm_value.nm_double);
854       free(ret_str);
855       if (res == -1) {
856         ERROR("write_sensu plugin: Unable to alloc memory");
857         return NULL;
858       }
859       ret_str = temp_str;
860     }
861     if (meta->type == NM_TYPE_STRING) {
862       res = my_asprintf(&temp_str, "%s, \"%s\": \"%s\"", ret_str, meta->name,
863                         meta->nm_value.nm_string);
864       free(ret_str);
865       if (res == -1) {
866         ERROR("write_sensu plugin: Unable to alloc memory");
867         return NULL;
868       }
869       ret_str = temp_str;
870     }
871   }
872 
873   // close the curly bracket
874   res = my_asprintf(&temp_str, "%s}\n", ret_str);
875   free(ret_str);
876   if (res == -1) {
877     ERROR("write_sensu plugin: Unable to alloc memory");
878     return NULL;
879   }
880   ret_str = temp_str;
881 
882   DEBUG("write_sensu plugin: Successfully created JSON for notification: "
883         "host = \"%s\", service = \"%s\", state = \"%s\"",
884         n->host, service_buffer, severity);
885   return ret_str;
886 } /* }}} char *sensu_notification_to_json */
887 
sensu_send_msg(struct sensu_host * host,const char * msg)888 static int sensu_send_msg(struct sensu_host *host, const char *msg) /* {{{ */
889 {
890   int status = 0;
891   size_t buffer_len;
892 
893   status = sensu_connect(host);
894   if (status != 0)
895     return status;
896 
897   buffer_len = strlen(msg);
898 
899   status = (int)swrite(host->s, msg, buffer_len);
900   sensu_close_socket(host);
901 
902   if (status != 0) {
903     ERROR("write_sensu plugin: Sending to Sensu at %s:%s failed: %s",
904           (host->node != NULL) ? host->node : SENSU_HOST,
905           (host->service != NULL) ? host->service : SENSU_PORT, STRERRNO);
906     return -1;
907   }
908 
909   return 0;
910 } /* }}} int sensu_send_msg */
911 
sensu_send(struct sensu_host * host,char const * msg)912 static int sensu_send(struct sensu_host *host, char const *msg) /* {{{ */
913 {
914   int status = 0;
915 
916   status = sensu_send_msg(host, msg);
917   if (status != 0) {
918     host->flags &= ~F_READY;
919     if (host->res != NULL) {
920       freeaddrinfo(host->res);
921       host->res = NULL;
922     }
923     return status;
924   }
925 
926   return 0;
927 } /* }}} int sensu_send */
928 
sensu_write(const data_set_t * ds,const value_list_t * vl,user_data_t * ud)929 static int sensu_write(const data_set_t *ds, /* {{{ */
930                        const value_list_t *vl, user_data_t *ud) {
931   int status = 0;
932   int statuses[vl->values_len];
933   struct sensu_host *host = ud->data;
934   gauge_t *rates = NULL;
935   char *msg;
936 
937   pthread_mutex_lock(&host->lock);
938   memset(statuses, 0, vl->values_len * sizeof(*statuses));
939 
940   if (host->store_rates) {
941     rates = uc_get_rate(ds, vl);
942     if (rates == NULL) {
943       ERROR("write_sensu plugin: uc_get_rate failed.");
944       pthread_mutex_unlock(&host->lock);
945       return -1;
946     }
947   }
948   for (size_t i = 0; i < vl->values_len; i++) {
949     msg = sensu_value_to_json(host, ds, vl, (int)i, rates);
950     if (msg == NULL) {
951       sfree(rates);
952       pthread_mutex_unlock(&host->lock);
953       return -1;
954     }
955     status = sensu_send(host, msg);
956     free(msg);
957     if (status != 0) {
958       ERROR("write_sensu plugin: sensu_send failed with status %i", status);
959       pthread_mutex_unlock(&host->lock);
960       sfree(rates);
961       return status;
962     }
963   }
964   sfree(rates);
965   pthread_mutex_unlock(&host->lock);
966   return status;
967 } /* }}} int sensu_write */
968 
sensu_notification(const notification_t * n,user_data_t * ud)969 static int sensu_notification(const notification_t *n,
970                               user_data_t *ud) /* {{{ */
971 {
972   int status;
973   struct sensu_host *host = ud->data;
974   char *msg;
975 
976   pthread_mutex_lock(&host->lock);
977 
978   msg = sensu_notification_to_json(host, n);
979   if (msg == NULL) {
980     pthread_mutex_unlock(&host->lock);
981     return -1;
982   }
983 
984   status = sensu_send(host, msg);
985   free(msg);
986   if (status != 0)
987     ERROR("write_sensu plugin: sensu_send failed with status %i", status);
988   pthread_mutex_unlock(&host->lock);
989 
990   return status;
991 } /* }}} int sensu_notification */
992 
sensu_free(void * p)993 static void sensu_free(void *p) /* {{{ */
994 {
995   struct sensu_host *host = p;
996 
997   if (host == NULL)
998     return;
999 
1000   pthread_mutex_lock(&host->lock);
1001 
1002   host->reference_count--;
1003   if (host->reference_count > 0) {
1004     pthread_mutex_unlock(&host->lock);
1005     return;
1006   }
1007 
1008   sensu_close_socket(host);
1009   if (host->res != NULL) {
1010     freeaddrinfo(host->res);
1011     host->res = NULL;
1012   }
1013   sfree(host->service);
1014   sfree(host->event_service_prefix);
1015   sfree(host->name);
1016   sfree(host->node);
1017   sfree(host->separator);
1018   free_str_list(&(host->metric_handlers));
1019   free_str_list(&(host->notification_handlers));
1020 
1021   pthread_mutex_unlock(&host->lock);
1022   pthread_mutex_destroy(&host->lock);
1023 
1024   sfree(host);
1025 } /* }}} void sensu_free */
1026 
sensu_config_node(oconfig_item_t * ci)1027 static int sensu_config_node(oconfig_item_t *ci) /* {{{ */
1028 {
1029   struct sensu_host *host = NULL;
1030   int status = 0;
1031   oconfig_item_t *child;
1032   char callback_name[DATA_MAX_NAME_LEN];
1033 
1034   if ((host = calloc(1, sizeof(*host))) == NULL) {
1035     ERROR("write_sensu plugin: calloc failed.");
1036     return ENOMEM;
1037   }
1038   pthread_mutex_init(&host->lock, NULL);
1039   host->reference_count = 1;
1040   host->node = NULL;
1041   host->service = NULL;
1042   host->notifications = false;
1043   host->metrics = false;
1044   host->store_rates = true;
1045   host->always_append_ds = false;
1046   host->include_source = false;
1047   host->metric_handlers.nb_strs = 0;
1048   host->metric_handlers.strs = NULL;
1049   host->notification_handlers.nb_strs = 0;
1050   host->notification_handlers.strs = NULL;
1051   host->separator = strdup("/");
1052   if (host->separator == NULL) {
1053     ERROR("write_sensu plugin: Unable to alloc memory");
1054     sensu_free(host);
1055     return -1;
1056   }
1057 
1058   status = cf_util_get_string(ci, &host->name);
1059   if (status != 0) {
1060     WARNING("write_sensu plugin: Required host name is missing.");
1061     sensu_free(host);
1062     return -1;
1063   }
1064 
1065   for (int i = 0; i < ci->children_num; i++) {
1066     child = &ci->children[i];
1067     status = 0;
1068 
1069     if (strcasecmp("Host", child->key) == 0) {
1070       status = cf_util_get_string(child, &host->node);
1071       if (status != 0)
1072         break;
1073     } else if (strcasecmp("Notifications", child->key) == 0) {
1074       status = cf_util_get_boolean(child, &host->notifications);
1075       if (status != 0)
1076         break;
1077     } else if (strcasecmp("Metrics", child->key) == 0) {
1078       status = cf_util_get_boolean(child, &host->metrics);
1079       if (status != 0)
1080         break;
1081     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
1082       status = cf_util_get_string(child, &host->event_service_prefix);
1083       if (status != 0)
1084         break;
1085     } else if (strcasecmp("Separator", child->key) == 0) {
1086       status = cf_util_get_string(child, &host->separator);
1087       if (status != 0)
1088         break;
1089     } else if (strcasecmp("MetricHandler", child->key) == 0) {
1090       char *temp_str = NULL;
1091       status = cf_util_get_string(child, &temp_str);
1092       if (status != 0)
1093         break;
1094       status = add_str_to_list(&(host->metric_handlers), temp_str);
1095       free(temp_str);
1096       if (status != 0)
1097         break;
1098     } else if (strcasecmp("NotificationHandler", child->key) == 0) {
1099       char *temp_str = NULL;
1100       status = cf_util_get_string(child, &temp_str);
1101       if (status != 0)
1102         break;
1103       status = add_str_to_list(&(host->notification_handlers), temp_str);
1104       free(temp_str);
1105       if (status != 0)
1106         break;
1107     } else if (strcasecmp("Port", child->key) == 0) {
1108       status = cf_util_get_service(child, &host->service);
1109       if (status != 0)
1110         break;
1111     } else if (strcasecmp("StoreRates", child->key) == 0) {
1112       status = cf_util_get_boolean(child, &host->store_rates);
1113       if (status != 0)
1114         break;
1115     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
1116       status = cf_util_get_boolean(child, &host->always_append_ds);
1117       if (status != 0)
1118         break;
1119     } else if (strcasecmp("IncludeSource", child->key) == 0) {
1120       status = cf_util_get_boolean(child, &host->include_source);
1121       if (status != 0)
1122         break;
1123     } else {
1124       WARNING("write_sensu plugin: ignoring unknown config "
1125               "option: \"%s\"",
1126               child->key);
1127     }
1128   }
1129   if (status != 0) {
1130     sensu_free(host);
1131     return status;
1132   }
1133 
1134   if (host->metrics && (host->metric_handlers.nb_strs == 0)) {
1135     sensu_free(host);
1136     WARNING("write_sensu plugin: metrics enabled but no MetricHandler defined. "
1137             "Giving up.");
1138     return -1;
1139   }
1140 
1141   if (host->notifications && (host->notification_handlers.nb_strs == 0)) {
1142     sensu_free(host);
1143     WARNING("write_sensu plugin: notifications enabled but no "
1144             "NotificationHandler defined. Giving up.");
1145     return -1;
1146   }
1147 
1148   if ((host->notification_handlers.nb_strs > 0) &&
1149       (host->notifications == false)) {
1150     WARNING("write_sensu plugin: NotificationHandler given so forcing "
1151             "notifications to be enabled");
1152     host->notifications = 1;
1153   }
1154 
1155   if ((host->metric_handlers.nb_strs > 0) && (host->metrics == false)) {
1156     WARNING("write_sensu plugin: MetricHandler given so forcing metrics to be "
1157             "enabled");
1158     host->metrics = true;
1159   }
1160 
1161   if (!(host->notifications || host->metrics)) {
1162     WARNING("write_sensu plugin: neither metrics nor notifications enabled. "
1163             "Giving up.");
1164     sensu_free(host);
1165     return -1;
1166   }
1167 
1168   snprintf(callback_name, sizeof(callback_name), "write_sensu/%s", host->name);
1169 
1170   user_data_t ud = {.data = host, .free_func = sensu_free};
1171 
1172   pthread_mutex_lock(&host->lock);
1173 
1174   if (host->metrics) {
1175     status = plugin_register_write(callback_name, sensu_write, &ud);
1176     if (status != 0)
1177       WARNING("write_sensu plugin: plugin_register_write (\"%s\") "
1178               "failed with status %i.",
1179               callback_name, status);
1180     else /* success */
1181       host->reference_count++;
1182   }
1183 
1184   if (host->notifications) {
1185     status =
1186         plugin_register_notification(callback_name, sensu_notification, &ud);
1187     if (status != 0)
1188       WARNING("write_sensu plugin: plugin_register_notification (\"%s\") "
1189               "failed with status %i.",
1190               callback_name, status);
1191     else
1192       host->reference_count++;
1193   }
1194 
1195   if (host->reference_count <= 1) {
1196     /* Both callbacks failed => free memory.
1197      * We need to unlock here, because sensu_free() will lock.
1198      * This is not a race condition, because we're the only one
1199      * holding a reference. */
1200     pthread_mutex_unlock(&host->lock);
1201     sensu_free(host);
1202     return -1;
1203   }
1204 
1205   host->reference_count--;
1206   pthread_mutex_unlock(&host->lock);
1207 
1208   return status;
1209 } /* }}} int sensu_config_node */
1210 
sensu_config(oconfig_item_t * ci)1211 static int sensu_config(oconfig_item_t *ci) /* {{{ */
1212 {
1213   oconfig_item_t *child;
1214   int status;
1215   struct str_list sensu_tags_arr;
1216 
1217   sensu_tags_arr.nb_strs = 0;
1218   sensu_tags_arr.strs = NULL;
1219 
1220   for (int i = 0; i < ci->children_num; i++) {
1221     child = &ci->children[i];
1222 
1223     if (strcasecmp("Node", child->key) == 0) {
1224       sensu_config_node(child);
1225     } else if (strcasecmp(child->key, "attribute") == 0) {
1226       if (child->values_num != 2) {
1227         WARNING("sensu attributes need both a key and a value.");
1228         continue;
1229       }
1230       if (child->values[0].type != OCONFIG_TYPE_STRING ||
1231           child->values[1].type != OCONFIG_TYPE_STRING) {
1232         WARNING("sensu attribute needs string arguments.");
1233         continue;
1234       }
1235 
1236       strarray_add(&sensu_attrs, &sensu_attrs_num,
1237                    child->values[0].value.string);
1238       strarray_add(&sensu_attrs, &sensu_attrs_num,
1239                    child->values[1].value.string);
1240 
1241       DEBUG("write_sensu plugin: New attribute: %s => %s",
1242             child->values[0].value.string, child->values[1].value.string);
1243     } else if (strcasecmp(child->key, "tag") == 0) {
1244       char *tmp = NULL;
1245       status = cf_util_get_string(child, &tmp);
1246       if (status != 0)
1247         continue;
1248 
1249       status = add_str_to_list(&sensu_tags_arr, tmp);
1250       DEBUG("write_sensu plugin: Got tag: %s", tmp);
1251       sfree(tmp);
1252       if (status != 0)
1253         continue;
1254     } else {
1255       WARNING("write_sensu plugin: Ignoring unknown "
1256               "configuration option \"%s\" at top level.",
1257               child->key);
1258     }
1259   }
1260   if (sensu_tags_arr.nb_strs > 0) {
1261     sfree(sensu_tags);
1262     sensu_tags = build_json_str_list("tags", &sensu_tags_arr);
1263     free_str_list(&sensu_tags_arr);
1264     if (sensu_tags == NULL) {
1265       ERROR("write_sensu plugin: Unable to alloc memory");
1266       return -1;
1267     }
1268   }
1269   return 0;
1270 } /* }}} int sensu_config */
1271 
module_register(void)1272 void module_register(void) {
1273   plugin_register_complex_config("write_sensu", sensu_config);
1274 }
1275