1 /**
2  * collectd - src/write_prometheus.c
3  * Copyright (C) 2016       Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy
6  * of this software and associated documentation files (the "Software"), to deal
7  * in the Software without restriction, including without limitation the rights
8  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9  * copies of the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21  * SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  */
26 
27 #include "collectd.h"
28 
29 #include "plugin.h"
30 #include "utils/avltree/avltree.h"
31 #include "utils/common/common.h"
32 #include "utils_complain.h"
33 #include "utils_time.h"
34 
35 #include "prometheus.pb-c.h"
36 
37 #include <microhttpd.h>
38 
39 #include <netdb.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42 
/* Default for the "StalenessDelta" setting: metrics whose interval exceeds
 * this value are exported without a timestamp (see metric_update()). The
 * #ifndef allows overriding the default at compile time. */
#ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA
#define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T_STATIC(300)
#endif

/* Maximum number of bytes needed to varint-encode a uint32_t: each byte
 * carries 7 payload bits, and 5 * 7 = 35 >= 32. */
#define VARINT_UINT32_BYTES 5

/* Content-Type values of the HTTP response. http_handler() selects the
 * length-delimited protobuf format when the Accept header asks for it and
 * the text exposition format otherwise. */
#define CONTENT_TYPE_PROTO                                                     \
  "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \
  "encoding=delimited"
#define CONTENT_TYPE_TEXT "text/plain; version=0.0.4"

/* Newer libmicrohttpd versions use "enum MHD_Result" as the return type of
 * request handlers and MHD_queue_response(); older versions use int. */
#if MHD_VERSION >= 0x00097002
#define MHD_RESULT enum MHD_Result
#else
#define MHD_RESULT int
#endif

/* Metric families keyed by family name. format_protobuf() and format_text()
 * read it while holding metrics_lock; other accesses presumably take the same
 * lock (TODO confirm for code outside this view). */
static c_avl_tree_t *metrics;
static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;

/* Address and port to listen on. A NULL host makes prom_open_socket() pass
 * NULL to getaddrinfo() with AI_PASSIVE, i.e. the wildcard address. */
static char *httpd_host = NULL;
static unsigned short httpd_port = 9103;
/* The running libmicrohttpd daemon; presumably NULL while not started. */
static struct MHD_Daemon *httpd;

/* Metrics with an interval above this are exported without a timestamp. */
static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA;
68 
69 /* Unfortunately, protoc-c doesn't export its implementation of varint, so we
70  * need to implement our own. */
varint(uint8_t buffer[static VARINT_UINT32_BYTES],uint32_t value)71 static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES],
72                      uint32_t value) {
73   for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) {
74     buffer[i] = (uint8_t)(value & 0x7f);
75     value >>= 7;
76 
77     if (value == 0)
78       return i + 1;
79 
80     buffer[i] |= 0x80;
81   }
82 
83   return 0;
84 }
85 
86 /* format_protobuf iterates over all metric families in "metrics" and adds them
87  * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded
88  * size, the so called "delimited" format. */
format_protobuf(ProtobufCBuffer * buffer)89 static void format_protobuf(ProtobufCBuffer *buffer) {
90   pthread_mutex_lock(&metrics_lock);
91 
92   char *unused_name;
93   Io__Prometheus__Client__MetricFamily *fam;
94   c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
95   while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
96     /* Prometheus uses a message length prefix to determine where one
97      * MetricFamily ends and the next begins. This delimiter is encoded as a
98      * "varint", which is common in Protobufs. */
99     uint8_t delim[VARINT_UINT32_BYTES] = {0};
100     size_t delim_len = varint(
101         delim,
102         (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam));
103     buffer->append(buffer, delim_len, delim);
104 
105     io__prometheus__client__metric_family__pack_to_buffer(fam, buffer);
106   }
107   c_avl_iterator_destroy(iter);
108 
109   pthread_mutex_unlock(&metrics_lock);
110 }
111 
escape_label_value(char * buffer,size_t buffer_size,char const * value)112 static char const *escape_label_value(char *buffer, size_t buffer_size,
113                                       char const *value) {
114   /* shortcut for values that don't need escaping. */
115   if (strpbrk(value, "\n\"\\") == NULL)
116     return value;
117 
118   size_t value_len = strlen(value);
119   size_t buffer_len = 0;
120 
121   for (size_t i = 0; i < value_len; i++) {
122     switch (value[i]) {
123     case '\n':
124     case '"':
125     case '\\':
126       if ((buffer_size - buffer_len) < 3) {
127         break;
128       }
129       buffer[buffer_len] = '\\';
130       buffer[buffer_len + 1] = (value[i] == '\n') ? 'n' : value[i];
131       buffer_len += 2;
132       break;
133 
134     default:
135       if ((buffer_size - buffer_len) < 2) {
136         break;
137       }
138       buffer[buffer_len] = value[i];
139       buffer_len++;
140       break;
141     }
142   }
143 
144   assert(buffer_len < buffer_size);
145   buffer[buffer_len] = 0;
146   return buffer;
147 }
148 
149 /* format_labels formats a metric's labels in Prometheus-compatible format. This
150  * format looks like this:
151  *
152  *   key0="value0",key1="value1"
153  */
format_labels(char * buffer,size_t buffer_size,Io__Prometheus__Client__Metric const * m)154 static char *format_labels(char *buffer, size_t buffer_size,
155                            Io__Prometheus__Client__Metric const *m) {
156   /* our metrics always have at least one and at most three labels. */
157   assert(m->n_label >= 1);
158   assert(m->n_label <= 3);
159 
160 #define LABEL_KEY_SIZE DATA_MAX_NAME_LEN
161 #define LABEL_VALUE_SIZE (2 * DATA_MAX_NAME_LEN - 1)
162 #define LABEL_BUFFER_SIZE (LABEL_KEY_SIZE + LABEL_VALUE_SIZE + 4)
163 
164   char *labels[3] = {
165       (char[LABEL_BUFFER_SIZE]){0},
166       (char[LABEL_BUFFER_SIZE]){0},
167       (char[LABEL_BUFFER_SIZE]){0},
168   };
169 
170   /* N.B.: the label *names* are hard-coded by this plugin and therefore we
171    * know that they are sane. */
172   for (size_t i = 0; i < m->n_label; i++) {
173     char value[LABEL_VALUE_SIZE];
174     ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
175               escape_label_value(value, sizeof(value), m->label[i]->value));
176   }
177 
178   strjoin(buffer, buffer_size, labels, m->n_label, ",");
179   return buffer;
180 }
181 
182 /* format_protobuf iterates over all metric families in "metrics" and adds them
183  * to a buffer in plain text format. */
format_text(ProtobufCBuffer * buffer)184 static void format_text(ProtobufCBuffer *buffer) {
185   pthread_mutex_lock(&metrics_lock);
186 
187   char *unused_name;
188   Io__Prometheus__Client__MetricFamily *fam;
189   c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
190   while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
191     char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
192 
193     ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
194     buffer->append(buffer, strlen(line), (uint8_t *)line);
195 
196     ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
197               (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
198                   ? "gauge"
199                   : "counter");
200     buffer->append(buffer, strlen(line), (uint8_t *)line);
201 
202     for (size_t i = 0; i < fam->n_metric; i++) {
203       Io__Prometheus__Client__Metric *m = fam->metric[i];
204 
205       char labels[1024];
206 
207       char timestamp_ms[24] = "";
208       if (m->has_timestamp_ms)
209         ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
210                   m->timestamp_ms);
211 
212       if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
213         ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
214                   format_labels(labels, sizeof(labels), m), m->gauge->value,
215                   timestamp_ms);
216       else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
217         ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
218                   format_labels(labels, sizeof(labels), m), m->counter->value,
219                   timestamp_ms);
220 
221       buffer->append(buffer, strlen(line), (uint8_t *)line);
222     }
223   }
224   c_avl_iterator_destroy(iter);
225 
226   char server[1024];
227   ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
228             PACKAGE_VERSION, hostname_g);
229   buffer->append(buffer, strlen(server), (uint8_t *)server);
230 
231   pthread_mutex_unlock(&metrics_lock);
232 }
233 
234 /* http_handler is the callback called by the microhttpd library. It essentially
235  * handles all HTTP request aspects and creates an HTTP response. */
http_handler(void * cls,struct MHD_Connection * connection,const char * url,const char * method,const char * version,const char * upload_data,size_t * upload_data_size,void ** connection_state)236 static MHD_RESULT http_handler(void *cls, struct MHD_Connection *connection,
237                                const char *url, const char *method,
238                                const char *version, const char *upload_data,
239                                size_t *upload_data_size,
240                                void **connection_state) {
241   if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) {
242     return MHD_NO;
243   }
244 
245   /* On the first call for each connection, return without anything further.
246    * Apparently not everything has been initialized yet or so; the docs are not
247    * very specific on the issue. */
248   if (*connection_state == NULL) {
249     /* set to a random non-NULL pointer. */
250     *connection_state = &(int){42};
251     return MHD_YES;
252   }
253 
254   char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND,
255                                                    MHD_HTTP_HEADER_ACCEPT);
256   bool want_proto = (accept != NULL) &&
257                     (strstr(accept, "application/vnd.google.protobuf") != NULL);
258 
259   uint8_t scratch[4096] = {0};
260   ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch);
261   ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple;
262 
263   if (want_proto)
264     format_protobuf(buffer);
265   else
266     format_text(buffer);
267 
268 #if defined(MHD_VERSION) && MHD_VERSION >= 0x00090500
269   struct MHD_Response *res = MHD_create_response_from_buffer(
270       simple.len, simple.data, MHD_RESPMEM_MUST_COPY);
271 #else
272   struct MHD_Response *res = MHD_create_response_from_data(
273       simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1);
274 #endif
275   MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE,
276                           want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT);
277 
278   MHD_RESULT status = MHD_queue_response(connection, MHD_HTTP_OK, res);
279 
280   MHD_destroy_response(res);
281   PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple);
282   return status;
283 }
284 
285 /*
286  * Functions for manipulating the global state in "metrics". This is organized
287  * in two tiers: the global "metrics" tree holds "metric families", which are
288  * identified by a name (a string). Each metric family has one or more
289  * "metrics", which are identified by a unique set of key-value-pairs. For
290  * example:
291  *
292  * collectd_cpu_total
293  *   {cpu="0",type="idle"}
294  *   {cpu="0",type="user"}
295  *   ...
296  * collectd_memory
297  *   {memory="used"}
298  *   {memory="free"}
299  *   ...
300  * {{{ */
301 /* label_pair_destroy frees the memory used by a label pair. */
label_pair_destroy(Io__Prometheus__Client__LabelPair * msg)302 static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) {
303   if (msg == NULL)
304     return;
305 
306   sfree(msg->name);
307   sfree(msg->value);
308 
309   sfree(msg);
310 }
311 
312 /* label_pair_clone allocates and initializes a new label pair. */
313 static Io__Prometheus__Client__LabelPair *
label_pair_clone(Io__Prometheus__Client__LabelPair const * orig)314 label_pair_clone(Io__Prometheus__Client__LabelPair const *orig) {
315   Io__Prometheus__Client__LabelPair *copy = calloc(1, sizeof(*copy));
316   if (copy == NULL)
317     return NULL;
318   io__prometheus__client__label_pair__init(copy);
319 
320   copy->name = strdup(orig->name);
321   copy->value = strdup(orig->value);
322   if ((copy->name == NULL) || (copy->value == NULL)) {
323     label_pair_destroy(copy);
324     return NULL;
325   }
326 
327   return copy;
328 }
329 
330 /* metric_destroy frees the memory used by a metric. */
metric_destroy(Io__Prometheus__Client__Metric * msg)331 static void metric_destroy(Io__Prometheus__Client__Metric *msg) {
332   if (msg == NULL)
333     return;
334 
335   for (size_t i = 0; i < msg->n_label; i++) {
336     label_pair_destroy(msg->label[i]);
337   }
338   sfree(msg->label);
339 
340   sfree(msg->gauge);
341   sfree(msg->counter);
342 
343   sfree(msg);
344 }
345 
346 /* metric_cmp compares two metrics. It's prototype makes it easy to use with
347  * qsort(3) and bsearch(3). */
metric_cmp(void const * a,void const * b)348 static int metric_cmp(void const *a, void const *b) {
349   Io__Prometheus__Client__Metric const *m_a =
350       *((Io__Prometheus__Client__Metric **)a);
351   Io__Prometheus__Client__Metric const *m_b =
352       *((Io__Prometheus__Client__Metric **)b);
353 
354   if (m_a->n_label < m_b->n_label)
355     return -1;
356   else if (m_a->n_label > m_b->n_label)
357     return 1;
358 
359   /* Prometheus does not care about the order of labels. All labels in this
360    * plugin are created by METRIC_ADD_LABELS(), though, and therefore always
361    * appear in the same order. We take advantage of this and simplify the check
362    * by making sure all labels are the same in each position.
363    *
364    * We also only need to check the label values, because the label names are
365    * the same for all metrics in a metric family.
366    *
367    * 3 labels:
368    * [0] $plugin="$plugin_instance" => $plugin is the same within a family
369    * [1] type="$type_instance"      => "type" is a static string
370    * [2] instance="$host"           => "instance" is a static string
371    *
372    * 2 labels, variant 1:
373    * [0] $plugin="$plugin_instance" => $plugin is the same within a family
374    * [1] instance="$host"           => "instance" is a static string
375    *
376    * 2 labels, variant 2:
377    * [0] $plugin="$type_instance"   => $plugin is the same within a family
378    * [1] instance="$host"           => "instance" is a static string
379    *
380    * 1 label:
381    * [1] instance="$host"           => "instance" is a static string
382    */
383   for (size_t i = 0; i < m_a->n_label; i++) {
384     int status = strcmp(m_a->label[i]->value, m_b->label[i]->value);
385     if (status != 0)
386       return status;
387 
388 #if COLLECT_DEBUG
389     assert(strcmp(m_a->label[i]->name, m_b->label[i]->name) == 0);
390 #endif
391   }
392 
393   return 0;
394 }
395 
/* METRIC_INIT evaluates to a pointer to a compound-literal metric with three
 * label-pair slots (all fields zero except .name = NULL) and n_label == 0.
 * It is used to build a temporary lookup key (see
 * metric_family_delete_metric() and metric_family_get_metric()). Used inside
 * a function, the compound literals only live until the enclosing block ends,
 * so the key must not be stored anywhere. metric_clone() makes a heap copy
 * when a persistent metric is needed. The protobuf-c init functions are not
 * called on these objects, so they are only suitable for comparisons. */
#define METRIC_INIT                                                            \
  &(Io__Prometheus__Client__Metric) {                                          \
    .label =                                                                   \
        (Io__Prometheus__Client__LabelPair *[]){                               \
            &(Io__Prometheus__Client__LabelPair){                              \
                .name = NULL,                                                  \
            },                                                                 \
            &(Io__Prometheus__Client__LabelPair){                              \
                .name = NULL,                                                  \
            },                                                                 \
            &(Io__Prometheus__Client__LabelPair){                              \
                .name = NULL,                                                  \
            },                                                                 \
        },                                                                     \
    .n_label = 0,                                                              \
  }

/* METRIC_ADD_LABELS fills the label slots of "m" (e.g. from METRIC_INIT) from
 * the value list "vl", in this order:
 *   - $plugin="$plugin_instance"  if plugin_instance is non-empty;
 *   - type="$type_instance"       if type_instance is non-empty, or
 *     $plugin="$type_instance"    instead when plugin_instance is empty;
 *   - instance="$host"            always.
 * Names and values point into "vl" or to string literals; nothing is
 * allocated, so "m" must not outlive "vl". At most three slots are written,
 * matching the three provided by METRIC_INIT. */
#define METRIC_ADD_LABELS(m, vl)                                               \
  do {                                                                         \
    if (strlen((vl)->plugin_instance) != 0) {                                  \
      (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                   \
      (m)->label[(m)->n_label]->value = (char *)(vl)->plugin_instance;         \
      (m)->n_label++;                                                          \
    }                                                                          \
                                                                               \
    if (strlen((vl)->type_instance) != 0) {                                    \
      (m)->label[(m)->n_label]->name = "type";                                 \
      if (strlen((vl)->plugin_instance) == 0)                                  \
        (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                 \
      (m)->label[(m)->n_label]->value = (char *)(vl)->type_instance;           \
      (m)->n_label++;                                                          \
    }                                                                          \
                                                                               \
    (m)->label[(m)->n_label]->name = "instance";                               \
    (m)->label[(m)->n_label]->value = (char *)(vl)->host;                      \
    (m)->n_label++;                                                            \
  } while (0)
433 
434 /* metric_clone allocates and initializes a new metric based on orig. */
435 static Io__Prometheus__Client__Metric *
metric_clone(Io__Prometheus__Client__Metric const * orig)436 metric_clone(Io__Prometheus__Client__Metric const *orig) {
437   Io__Prometheus__Client__Metric *copy = calloc(1, sizeof(*copy));
438   if (copy == NULL)
439     return NULL;
440   io__prometheus__client__metric__init(copy);
441 
442   copy->n_label = orig->n_label;
443   copy->label = calloc(copy->n_label, sizeof(*copy->label));
444   if (copy->label == NULL) {
445     sfree(copy);
446     return NULL;
447   }
448 
449   for (size_t i = 0; i < copy->n_label; i++) {
450     copy->label[i] = label_pair_clone(orig->label[i]);
451     if (copy->label[i] == NULL) {
452       metric_destroy(copy);
453       return NULL;
454     }
455   }
456 
457   return copy;
458 }
459 
460 /* metric_update stores the new value and timestamp in m. */
metric_update(Io__Prometheus__Client__Metric * m,value_t value,int ds_type,cdtime_t t,cdtime_t interval)461 static int metric_update(Io__Prometheus__Client__Metric *m, value_t value,
462                          int ds_type, cdtime_t t, cdtime_t interval) {
463   if (ds_type == DS_TYPE_GAUGE) {
464     sfree(m->counter);
465     if (m->gauge == NULL) {
466       m->gauge = calloc(1, sizeof(*m->gauge));
467       if (m->gauge == NULL)
468         return ENOMEM;
469       io__prometheus__client__gauge__init(m->gauge);
470     }
471 
472     m->gauge->value = (double)value.gauge;
473     m->gauge->has_value = 1;
474   } else { /* not gauge */
475     sfree(m->gauge);
476     if (m->counter == NULL) {
477       m->counter = calloc(1, sizeof(*m->counter));
478       if (m->counter == NULL)
479         return ENOMEM;
480       io__prometheus__client__counter__init(m->counter);
481     }
482 
483     switch (ds_type) {
484     case DS_TYPE_ABSOLUTE:
485       m->counter->value = (double)value.absolute;
486       break;
487     case DS_TYPE_COUNTER:
488       m->counter->value = (double)value.counter;
489       break;
490     default:
491       m->counter->value = (double)value.derive;
492       break;
493     }
494     m->counter->has_value = 1;
495   }
496 
497   /* Prometheus has a globally configured timeout after which metrics are
498    * considered stale. This causes problems when metrics have an interval
499    * exceeding that limit. We emulate the behavior of "pushgateway" and *not*
500    * send a timestamp value – Prometheus will fill in the current time. */
501   if (interval <= staleness_delta) {
502     m->timestamp_ms = CDTIME_T_TO_MS(t);
503     m->has_timestamp_ms = 1;
504   } else {
505     static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC;
506     c_complain(
507         LOG_NOTICE, &long_metric,
508         "write_prometheus plugin: You have metrics with an interval exceeding "
509         "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check "
510         "the collectd.conf(5) manual page to understand what's going on.",
511         CDTIME_T_TO_DOUBLE(staleness_delta));
512 
513     m->timestamp_ms = 0;
514     m->has_timestamp_ms = 0;
515   }
516 
517   return 0;
518 }
519 
520 /* metric_family_add_metric adds m to the metric list of fam. */
metric_family_add_metric(Io__Prometheus__Client__MetricFamily * fam,Io__Prometheus__Client__Metric * m)521 static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam,
522                                     Io__Prometheus__Client__Metric *m) {
523   Io__Prometheus__Client__Metric **tmp =
524       realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric));
525   if (tmp == NULL)
526     return ENOMEM;
527   fam->metric = tmp;
528 
529   fam->metric[fam->n_metric] = m;
530   fam->n_metric++;
531 
532   /* Sort the metrics so that lookup is fast. */
533   qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
534 
535   return 0;
536 }
537 
538 /* metric_family_delete_metric looks up and deletes the metric corresponding to
539  * vl. */
540 static int
metric_family_delete_metric(Io__Prometheus__Client__MetricFamily * fam,value_list_t const * vl)541 metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam,
542                             value_list_t const *vl) {
543   Io__Prometheus__Client__Metric *key = METRIC_INIT;
544   METRIC_ADD_LABELS(key, vl);
545 
546   size_t i;
547   for (i = 0; i < fam->n_metric; i++) {
548     if (metric_cmp(&key, &fam->metric[i]) == 0)
549       break;
550   }
551 
552   if (i >= fam->n_metric)
553     return ENOENT;
554 
555   metric_destroy(fam->metric[i]);
556   if ((fam->n_metric - 1) > i)
557     memmove(&fam->metric[i], &fam->metric[i + 1],
558             ((fam->n_metric - 1) - i) * sizeof(fam->metric[i]));
559   fam->n_metric--;
560 
561   if (fam->n_metric == 0) {
562     sfree(fam->metric);
563     return 0;
564   }
565 
566   Io__Prometheus__Client__Metric **tmp =
567       realloc(fam->metric, fam->n_metric * sizeof(*fam->metric));
568   if (tmp != NULL)
569     fam->metric = tmp;
570 
571   return 0;
572 }
573 
574 /* metric_family_get_metric looks up the matching metric in a metric family,
575  * allocating it if necessary. */
576 static Io__Prometheus__Client__Metric *
metric_family_get_metric(Io__Prometheus__Client__MetricFamily * fam,value_list_t const * vl)577 metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam,
578                          value_list_t const *vl) {
579   Io__Prometheus__Client__Metric *key = METRIC_INIT;
580   METRIC_ADD_LABELS(key, vl);
581 
582   /* Metrics are sorted in metric_family_add_metric() so that we can do a binary
583    * search here. */
584   Io__Prometheus__Client__Metric **m = bsearch(
585       &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
586 
587   if (m != NULL) {
588     return *m;
589   }
590 
591   Io__Prometheus__Client__Metric *new_metric = metric_clone(key);
592   if (new_metric == NULL)
593     return NULL;
594 
595   DEBUG("write_prometheus plugin: created new metric in family");
596   int status = metric_family_add_metric(fam, new_metric);
597   if (status != 0) {
598     metric_destroy(new_metric);
599     return NULL;
600   }
601 
602   return new_metric;
603 }
604 
605 /* metric_family_update looks up the matching metric in a metric family,
606  * allocating it if necessary, and updates the metric to the latest value. */
metric_family_update(Io__Prometheus__Client__MetricFamily * fam,data_set_t const * ds,value_list_t const * vl,size_t ds_index)607 static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam,
608                                 data_set_t const *ds, value_list_t const *vl,
609                                 size_t ds_index) {
610   Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl);
611   if (m == NULL)
612     return -1;
613 
614   return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time,
615                        vl->interval);
616 }
617 
618 /* metric_family_destroy frees the memory used by a metric family. */
metric_family_destroy(Io__Prometheus__Client__MetricFamily * msg)619 static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) {
620   if (msg == NULL)
621     return;
622 
623   sfree(msg->name);
624   sfree(msg->help);
625 
626   for (size_t i = 0; i < msg->n_metric; i++) {
627     metric_destroy(msg->metric[i]);
628   }
629   sfree(msg->metric);
630 
631   sfree(msg);
632 }
633 
634 /* metric_family_create allocates and initializes a new metric family. */
635 static Io__Prometheus__Client__MetricFamily *
metric_family_create(char * name,data_set_t const * ds,value_list_t const * vl,size_t ds_index)636 metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl,
637                      size_t ds_index) {
638   Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg));
639   if (msg == NULL)
640     return NULL;
641   io__prometheus__client__metric_family__init(msg);
642 
643   msg->name = name;
644 
645   char help[1024];
646   ssnprintf(
647       help, sizeof(help),
648       "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
649       vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
650       ds->ds[ds_index].name);
651   msg->help = strdup(help);
652 
653   msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE)
654                   ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE
655                   : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER;
656   msg->has_type = 1;
657 
658   return msg;
659 }
660 
661 /* metric_family_name creates a metric family's name from a data source. This is
662  * done in the same way as done by the "collectd_exporter" for best possible
663  * compatibility. In essence, the plugin, type and data source name go in the
664  * metric family name, while hostname, plugin instance and type instance go into
665  * the labels of a metric. */
metric_family_name(data_set_t const * ds,value_list_t const * vl,size_t ds_index)666 static char *metric_family_name(data_set_t const *ds, value_list_t const *vl,
667                                 size_t ds_index) {
668   char const *fields[5] = {"collectd"};
669   size_t fields_num = 1;
670 
671   if (strcmp(vl->plugin, vl->type) != 0) {
672     fields[fields_num] = vl->plugin;
673     fields_num++;
674   }
675   fields[fields_num] = vl->type;
676   fields_num++;
677 
678   if (strcmp("value", ds->ds[ds_index].name) != 0) {
679     fields[fields_num] = ds->ds[ds_index].name;
680     fields_num++;
681   }
682 
683   /* Prometheus best practices:
684    * cumulative metrics should have a "total" suffix. */
685   if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) ||
686       (ds->ds[ds_index].type == DS_TYPE_DERIVE)) {
687     fields[fields_num] = "total";
688     fields_num++;
689   }
690 
691   char name[5 * DATA_MAX_NAME_LEN];
692   strjoin(name, sizeof(name), (char **)fields, fields_num, "_");
693   return strdup(name);
694 }
695 
696 /* metric_family_get looks up the matching metric family, allocating it if
697  * necessary. */
698 static Io__Prometheus__Client__MetricFamily *
metric_family_get(data_set_t const * ds,value_list_t const * vl,size_t ds_index,bool allocate)699 metric_family_get(data_set_t const *ds, value_list_t const *vl, size_t ds_index,
700                   bool allocate) {
701   char *name = metric_family_name(ds, vl, ds_index);
702   if (name == NULL) {
703     ERROR("write_prometheus plugin: Allocating metric family name failed.");
704     return NULL;
705   }
706 
707   Io__Prometheus__Client__MetricFamily *fam = NULL;
708   if (c_avl_get(metrics, name, (void *)&fam) == 0) {
709     sfree(name);
710     assert(fam != NULL);
711     return fam;
712   }
713 
714   if (!allocate) {
715     sfree(name);
716     return NULL;
717   }
718 
719   fam = metric_family_create(name, ds, vl, ds_index);
720   if (fam == NULL) {
721     ERROR("write_prometheus plugin: Allocating metric family failed.");
722     sfree(name);
723     return NULL;
724   }
725 
726   /* If successful, "name" is owned by "fam", i.e. don't free it here. */
727   DEBUG("write_prometheus plugin: metric family \"%s\" has been created.",
728         name);
729   name = NULL;
730 
731   int status = c_avl_insert(metrics, fam->name, fam);
732   if (status != 0) {
733     ERROR("write_prometheus plugin: Adding \"%s\" failed.", fam->name);
734     metric_family_destroy(fam);
735     return NULL;
736   }
737 
738   return fam;
739 }
740 /* }}} */
741 
prom_logger(void * arg,char const * fmt,va_list ap)742 static void prom_logger(__attribute__((unused)) void *arg, char const *fmt,
743                         va_list ap) {
744   /* {{{ */
745   char errbuf[1024];
746   vsnprintf(errbuf, sizeof(errbuf), fmt, ap);
747 
748   ERROR("write_prometheus plugin: %s", errbuf);
749 } /* }}} prom_logger */
750 
751 #if MHD_VERSION >= 0x00090000
prom_open_socket(int addrfamily)752 static int prom_open_socket(int addrfamily) {
753   /* {{{ */
754   char service[NI_MAXSERV];
755   ssnprintf(service, sizeof(service), "%hu", httpd_port);
756 
757   struct addrinfo *res;
758   int status = getaddrinfo(httpd_host, service,
759                            &(struct addrinfo){
760                                .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
761                                .ai_family = addrfamily,
762                                .ai_socktype = SOCK_STREAM,
763                            },
764                            &res);
765   if (status != 0) {
766     return -1;
767   }
768 
769   int fd = -1;
770   for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
771     int flags = ai->ai_socktype;
772 #ifdef SOCK_CLOEXEC
773     flags |= SOCK_CLOEXEC;
774 #endif
775 
776     fd = socket(ai->ai_family, flags, 0);
777     if (fd == -1)
778       continue;
779 
780     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) != 0) {
781       WARNING("write_prometheus plugin: setsockopt(SO_REUSEADDR) failed: %s",
782               STRERRNO);
783       close(fd);
784       fd = -1;
785       continue;
786     }
787 
788     if (bind(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
789       close(fd);
790       fd = -1;
791       continue;
792     }
793 
794     if (listen(fd, /* backlog = */ 16) != 0) {
795       close(fd);
796       fd = -1;
797       continue;
798     }
799 
800     char str_node[NI_MAXHOST];
801     char str_service[NI_MAXSERV];
802 
803     getnameinfo(ai->ai_addr, ai->ai_addrlen, str_node, sizeof(str_node),
804                 str_service, sizeof(str_service),
805                 NI_NUMERICHOST | NI_NUMERICSERV);
806 
807     INFO("write_prometheus plugin: Listening on [%s]:%s.", str_node,
808          str_service);
809     break;
810   }
811 
812   freeaddrinfo(res);
813 
814   return fd;
815 } /* }}} int prom_open_socket */
816 
prom_start_daemon()817 static struct MHD_Daemon *prom_start_daemon() {
818   /* {{{ */
819   int fd = prom_open_socket(PF_INET6);
820   if (fd == -1)
821     fd = prom_open_socket(PF_INET);
822   if (fd == -1) {
823     ERROR("write_prometheus plugin: Opening a listening socket for [%s]:%hu "
824           "failed.",
825           (httpd_host != NULL) ? httpd_host : "::", httpd_port);
826     return NULL;
827   }
828 
829   unsigned int flags = MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG;
830 #if MHD_VERSION >= 0x00095300
831   flags |= MHD_USE_INTERNAL_POLLING_THREAD;
832 #endif
833 
834   struct MHD_Daemon *d = MHD_start_daemon(
835       flags, httpd_port,
836       /* MHD_AcceptPolicyCallback = */ NULL,
837       /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
838       MHD_OPTION_LISTEN_SOCKET, fd, MHD_OPTION_EXTERNAL_LOGGER, prom_logger,
839       NULL, MHD_OPTION_END);
840   if (d == NULL) {
841     ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
842     close(fd);
843     return NULL;
844   }
845 
846   return d;
847 } /* }}} struct MHD_Daemon *prom_start_daemon */
848 #else /* if MHD_VERSION < 0x00090000 */
prom_start_daemon()849 static struct MHD_Daemon *prom_start_daemon() {
850   /* {{{ */
851   struct MHD_Daemon *d = MHD_start_daemon(
852       MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG, httpd_port,
853       /* MHD_AcceptPolicyCallback = */ NULL,
854       /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
855       MHD_OPTION_EXTERNAL_LOGGER, prom_logger, NULL, MHD_OPTION_END);
856   if (d == NULL) {
857     ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
858     return NULL;
859   }
860 
861   return d;
862 } /* }}} struct MHD_Daemon *prom_start_daemon */
863 #endif
864 
865 /*
866  * collectd callbacks
867  */
prom_config(oconfig_item_t * ci)868 static int prom_config(oconfig_item_t *ci) {
869   for (int i = 0; i < ci->children_num; i++) {
870     oconfig_item_t *child = ci->children + i;
871 
872     if (strcasecmp("Host", child->key) == 0) {
873 #if MHD_VERSION >= 0x00090000
874       cf_util_get_string(child, &httpd_host);
875 #else
876       ERROR("write_prometheus plugin: Option `Host' not supported. Please "
877             "upgrade libmicrohttpd to at least 0.9.0");
878       return -1;
879 #endif
880     } else if (strcasecmp("Port", child->key) == 0) {
881       int status = cf_util_get_port_number(child);
882       if (status > 0)
883         httpd_port = (unsigned short)status;
884     } else if (strcasecmp("StalenessDelta", child->key) == 0) {
885       cf_util_get_cdtime(child, &staleness_delta);
886     } else {
887       WARNING("write_prometheus plugin: Ignoring unknown configuration option "
888               "\"%s\".",
889               child->key);
890     }
891   }
892 
893   return 0;
894 }
895 
prom_init()896 static int prom_init() {
897   if (metrics == NULL) {
898     metrics = c_avl_create((void *)strcmp);
899     if (metrics == NULL) {
900       ERROR("write_prometheus plugin: c_avl_create() failed.");
901       return -1;
902     }
903   }
904 
905   if (httpd == NULL) {
906     httpd = prom_start_daemon();
907     if (httpd == NULL) {
908       return -1;
909     }
910     DEBUG("write_prometheus plugin: Successfully started microhttpd %s",
911           MHD_get_version());
912   }
913 
914   return 0;
915 }
916 
prom_write(data_set_t const * ds,value_list_t const * vl,user_data_t * ud)917 static int prom_write(data_set_t const *ds, value_list_t const *vl,
918                       __attribute__((unused)) user_data_t *ud) {
919   pthread_mutex_lock(&metrics_lock);
920 
921   for (size_t i = 0; i < ds->ds_num; i++) {
922     Io__Prometheus__Client__MetricFamily *fam =
923         metric_family_get(ds, vl, i, /* allocate = */ true);
924     if (fam == NULL)
925       continue;
926 
927     int status = metric_family_update(fam, ds, vl, i);
928     if (status != 0) {
929       ERROR("write_prometheus plugin: Updating metric \"%s\" failed with "
930             "status %d",
931             fam->name, status);
932       continue;
933     }
934   }
935 
936   pthread_mutex_unlock(&metrics_lock);
937   return 0;
938 }
939 
prom_missing(value_list_t const * vl,user_data_t * ud)940 static int prom_missing(value_list_t const *vl,
941                         __attribute__((unused)) user_data_t *ud) {
942   data_set_t const *ds = plugin_get_ds(vl->type);
943   if (ds == NULL)
944     return ENOENT;
945 
946   pthread_mutex_lock(&metrics_lock);
947 
948   for (size_t i = 0; i < ds->ds_num; i++) {
949     Io__Prometheus__Client__MetricFamily *fam =
950         metric_family_get(ds, vl, i, /* allocate = */ false);
951     if (fam == NULL)
952       continue;
953 
954     int status = metric_family_delete_metric(fam, vl);
955     if (status != 0) {
956       ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" "
957             "failed with status %d",
958             fam->name, status);
959 
960       continue;
961     }
962 
963     if (fam->n_metric == 0) {
964       int status = c_avl_remove(metrics, fam->name, NULL, NULL);
965       if (status != 0) {
966         ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed "
967               "with status %d",
968               fam->name, status);
969         continue;
970       }
971       metric_family_destroy(fam);
972     }
973   }
974 
975   pthread_mutex_unlock(&metrics_lock);
976   return 0;
977 }
978 
prom_shutdown()979 static int prom_shutdown() {
980   if (httpd != NULL) {
981     MHD_stop_daemon(httpd);
982     httpd = NULL;
983   }
984 
985   pthread_mutex_lock(&metrics_lock);
986   if (metrics != NULL) {
987     char *name;
988     Io__Prometheus__Client__MetricFamily *fam;
989     while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) {
990       assert(name == fam->name);
991       name = NULL;
992 
993       metric_family_destroy(fam);
994     }
995     c_avl_destroy(metrics);
996     metrics = NULL;
997   }
998   pthread_mutex_unlock(&metrics_lock);
999 
1000   sfree(httpd_host);
1001 
1002   return 0;
1003 }
1004 
module_register()1005 void module_register() {
1006   plugin_register_complex_config("write_prometheus", prom_config);
1007   plugin_register_init("write_prometheus", prom_init);
1008   plugin_register_write("write_prometheus", prom_write,
1009                         /* user data = */ NULL);
1010   plugin_register_missing("write_prometheus", prom_missing,
1011                           /* user data = */ NULL);
1012   plugin_register_shutdown("write_prometheus", prom_shutdown);
1013 }
1014