1 /**
2 * collectd - src/write_prometheus.c
3 * Copyright (C) 2016 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at collectd.org>
25 */
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/avltree/avltree.h"
31 #include "utils/common/common.h"
32 #include "utils_complain.h"
33 #include "utils_time.h"
34
35 #include "prometheus.pb-c.h"
36
37 #include <microhttpd.h>
38
39 #include <netdb.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42
/* Default value of "staleness_delta", i.e. what is used when the
 * "StalenessDelta" option is not configured. May be overridden at build time.
 * NOTE(review): the argument 300 is presumably in seconds -- confirm against
 * TIME_T_TO_CDTIME_T_STATIC. */
#ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA
#define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T_STATIC(300)
#endif

/* Size of the scratch buffer handed to varint(): a 32 bit value is split into
 * groups of 7 bits, which takes at most 5 bytes. */
#define VARINT_UINT32_BYTES 5

/* Content-Type header values sent by http_handler() for the ProtoBuf and the
 * plain text response, respectively. */
#define CONTENT_TYPE_PROTO                                                     \
  "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \
  "encoding=delimited"
#define CONTENT_TYPE_TEXT "text/plain; version=0.0.4"

/* Return type of http_handler(); which type is used depends on the
 * libmicrohttpd version this file is compiled against. */
#if MHD_VERSION >= 0x00097002
#define MHD_RESULT enum MHD_Result
#else
#define MHD_RESULT int
#endif

/* Tree of metric families, keyed by family name. All accesses from the write,
 * missing, shutdown and HTTP formatting code paths hold "metrics_lock". */
static c_avl_tree_t *metrics;
static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;

/* Listening address ("Host" option, NULL if unset), port ("Port" option) and
 * the handle of the running libmicrohttpd daemon. */
static char *httpd_host = NULL;
static unsigned short httpd_port = 9103;
static struct MHD_Daemon *httpd;

/* Metrics with an interval larger than this are exported without a timestamp,
 * see metric_update(). Set by the "StalenessDelta" option. */
static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA;
68
69 /* Unfortunately, protoc-c doesn't export its implementation of varint, so we
70 * need to implement our own. */
varint(uint8_t buffer[static VARINT_UINT32_BYTES],uint32_t value)71 static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES],
72 uint32_t value) {
73 for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) {
74 buffer[i] = (uint8_t)(value & 0x7f);
75 value >>= 7;
76
77 if (value == 0)
78 return i + 1;
79
80 buffer[i] |= 0x80;
81 }
82
83 return 0;
84 }
85
86 /* format_protobuf iterates over all metric families in "metrics" and adds them
87 * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded
88 * size, the so called "delimited" format. */
format_protobuf(ProtobufCBuffer * buffer)89 static void format_protobuf(ProtobufCBuffer *buffer) {
90 pthread_mutex_lock(&metrics_lock);
91
92 char *unused_name;
93 Io__Prometheus__Client__MetricFamily *fam;
94 c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
95 while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
96 /* Prometheus uses a message length prefix to determine where one
97 * MetricFamily ends and the next begins. This delimiter is encoded as a
98 * "varint", which is common in Protobufs. */
99 uint8_t delim[VARINT_UINT32_BYTES] = {0};
100 size_t delim_len = varint(
101 delim,
102 (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam));
103 buffer->append(buffer, delim_len, delim);
104
105 io__prometheus__client__metric_family__pack_to_buffer(fam, buffer);
106 }
107 c_avl_iterator_destroy(iter);
108
109 pthread_mutex_unlock(&metrics_lock);
110 }
111
/* escape_label_value returns "value" in a form that can be placed between
 * double quotes in the Prometheus text format: a newline becomes the two
 * characters backslash + 'n', and double quotes and backslashes are prefixed
 * with a backslash.
 *
 * If "value" contains none of these characters it is returned as-is and
 * "buffer" is left untouched. Otherwise the escaped string is written to
 * "buffer" (at most "buffer_size" bytes including the terminating NUL) and
 * "buffer" is returned.
 *
 * If the escaped string does not fit, the output is truncated at the first
 * character or escape sequence that no longer fits: escape sequences are
 * never split and nothing following the cut-off point is emitted.
 * "buffer_size" must be at least one. */
static char const *escape_label_value(char *buffer, size_t buffer_size,
                                      char const *value) {
  /* shortcut for values that don't need escaping. */
  if (strpbrk(value, "\n\"\\") == NULL)
    return value;

  size_t buffer_len = 0;

  for (char const *c = value; *c != '\0'; c++) {
    size_t needed = ((*c == '\n') || (*c == '"') || (*c == '\\')) ? 2 : 1;

    /* Always keep one byte in reserve for the terminating NUL. Stop at the
     * first element that does not fit; skipping it and carrying on would
     * silently drop characters from the middle of the value. */
    if ((buffer_size - buffer_len) <= needed)
      break;

    if (needed == 2) {
      buffer[buffer_len++] = '\\';
      buffer[buffer_len++] = (*c == '\n') ? 'n' : *c;
    } else {
      buffer[buffer_len++] = *c;
    }
  }

  assert(buffer_len < buffer_size);
  buffer[buffer_len] = 0;
  return buffer;
}
148
149 /* format_labels formats a metric's labels in Prometheus-compatible format. This
150 * format looks like this:
151 *
152 * key0="value0",key1="value1"
153 */
format_labels(char * buffer,size_t buffer_size,Io__Prometheus__Client__Metric const * m)154 static char *format_labels(char *buffer, size_t buffer_size,
155 Io__Prometheus__Client__Metric const *m) {
156 /* our metrics always have at least one and at most three labels. */
157 assert(m->n_label >= 1);
158 assert(m->n_label <= 3);
159
160 #define LABEL_KEY_SIZE DATA_MAX_NAME_LEN
161 #define LABEL_VALUE_SIZE (2 * DATA_MAX_NAME_LEN - 1)
162 #define LABEL_BUFFER_SIZE (LABEL_KEY_SIZE + LABEL_VALUE_SIZE + 4)
163
164 char *labels[3] = {
165 (char[LABEL_BUFFER_SIZE]){0},
166 (char[LABEL_BUFFER_SIZE]){0},
167 (char[LABEL_BUFFER_SIZE]){0},
168 };
169
170 /* N.B.: the label *names* are hard-coded by this plugin and therefore we
171 * know that they are sane. */
172 for (size_t i = 0; i < m->n_label; i++) {
173 char value[LABEL_VALUE_SIZE];
174 ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
175 escape_label_value(value, sizeof(value), m->label[i]->value));
176 }
177
178 strjoin(buffer, buffer_size, labels, m->n_label, ",");
179 return buffer;
180 }
181
182 /* format_protobuf iterates over all metric families in "metrics" and adds them
183 * to a buffer in plain text format. */
format_text(ProtobufCBuffer * buffer)184 static void format_text(ProtobufCBuffer *buffer) {
185 pthread_mutex_lock(&metrics_lock);
186
187 char *unused_name;
188 Io__Prometheus__Client__MetricFamily *fam;
189 c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
190 while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
191 char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
192
193 ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
194 buffer->append(buffer, strlen(line), (uint8_t *)line);
195
196 ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
197 (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
198 ? "gauge"
199 : "counter");
200 buffer->append(buffer, strlen(line), (uint8_t *)line);
201
202 for (size_t i = 0; i < fam->n_metric; i++) {
203 Io__Prometheus__Client__Metric *m = fam->metric[i];
204
205 char labels[1024];
206
207 char timestamp_ms[24] = "";
208 if (m->has_timestamp_ms)
209 ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
210 m->timestamp_ms);
211
212 if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
213 ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
214 format_labels(labels, sizeof(labels), m), m->gauge->value,
215 timestamp_ms);
216 else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
217 ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
218 format_labels(labels, sizeof(labels), m), m->counter->value,
219 timestamp_ms);
220
221 buffer->append(buffer, strlen(line), (uint8_t *)line);
222 }
223 }
224 c_avl_iterator_destroy(iter);
225
226 char server[1024];
227 ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
228 PACKAGE_VERSION, hostname_g);
229 buffer->append(buffer, strlen(server), (uint8_t *)server);
230
231 pthread_mutex_unlock(&metrics_lock);
232 }
233
234 /* http_handler is the callback called by the microhttpd library. It essentially
235 * handles all HTTP request aspects and creates an HTTP response. */
http_handler(void * cls,struct MHD_Connection * connection,const char * url,const char * method,const char * version,const char * upload_data,size_t * upload_data_size,void ** connection_state)236 static MHD_RESULT http_handler(void *cls, struct MHD_Connection *connection,
237 const char *url, const char *method,
238 const char *version, const char *upload_data,
239 size_t *upload_data_size,
240 void **connection_state) {
241 if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) {
242 return MHD_NO;
243 }
244
245 /* On the first call for each connection, return without anything further.
246 * Apparently not everything has been initialized yet or so; the docs are not
247 * very specific on the issue. */
248 if (*connection_state == NULL) {
249 /* set to a random non-NULL pointer. */
250 *connection_state = &(int){42};
251 return MHD_YES;
252 }
253
254 char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND,
255 MHD_HTTP_HEADER_ACCEPT);
256 bool want_proto = (accept != NULL) &&
257 (strstr(accept, "application/vnd.google.protobuf") != NULL);
258
259 uint8_t scratch[4096] = {0};
260 ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch);
261 ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple;
262
263 if (want_proto)
264 format_protobuf(buffer);
265 else
266 format_text(buffer);
267
268 #if defined(MHD_VERSION) && MHD_VERSION >= 0x00090500
269 struct MHD_Response *res = MHD_create_response_from_buffer(
270 simple.len, simple.data, MHD_RESPMEM_MUST_COPY);
271 #else
272 struct MHD_Response *res = MHD_create_response_from_data(
273 simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1);
274 #endif
275 MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE,
276 want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT);
277
278 MHD_RESULT status = MHD_queue_response(connection, MHD_HTTP_OK, res);
279
280 MHD_destroy_response(res);
281 PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple);
282 return status;
283 }
284
285 /*
286 * Functions for manipulating the global state in "metrics". This is organized
287 * in two tiers: the global "metrics" tree holds "metric families", which are
288 * identified by a name (a string). Each metric family has one or more
289 * "metrics", which are identified by a unique set of key-value-pairs. For
290 * example:
291 *
292 * collectd_cpu_total
293 * {cpu="0",type="idle"}
294 * {cpu="0",type="user"}
295 * ...
296 * collectd_memory
297 * {memory="used"}
298 * {memory="free"}
299 * ...
300 * {{{ */
301 /* label_pair_destroy frees the memory used by a label pair. */
label_pair_destroy(Io__Prometheus__Client__LabelPair * msg)302 static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) {
303 if (msg == NULL)
304 return;
305
306 sfree(msg->name);
307 sfree(msg->value);
308
309 sfree(msg);
310 }
311
312 /* label_pair_clone allocates and initializes a new label pair. */
313 static Io__Prometheus__Client__LabelPair *
label_pair_clone(Io__Prometheus__Client__LabelPair const * orig)314 label_pair_clone(Io__Prometheus__Client__LabelPair const *orig) {
315 Io__Prometheus__Client__LabelPair *copy = calloc(1, sizeof(*copy));
316 if (copy == NULL)
317 return NULL;
318 io__prometheus__client__label_pair__init(copy);
319
320 copy->name = strdup(orig->name);
321 copy->value = strdup(orig->value);
322 if ((copy->name == NULL) || (copy->value == NULL)) {
323 label_pair_destroy(copy);
324 return NULL;
325 }
326
327 return copy;
328 }
329
330 /* metric_destroy frees the memory used by a metric. */
metric_destroy(Io__Prometheus__Client__Metric * msg)331 static void metric_destroy(Io__Prometheus__Client__Metric *msg) {
332 if (msg == NULL)
333 return;
334
335 for (size_t i = 0; i < msg->n_label; i++) {
336 label_pair_destroy(msg->label[i]);
337 }
338 sfree(msg->label);
339
340 sfree(msg->gauge);
341 sfree(msg->counter);
342
343 sfree(msg);
344 }
345
346 /* metric_cmp compares two metrics. It's prototype makes it easy to use with
347 * qsort(3) and bsearch(3). */
metric_cmp(void const * a,void const * b)348 static int metric_cmp(void const *a, void const *b) {
349 Io__Prometheus__Client__Metric const *m_a =
350 *((Io__Prometheus__Client__Metric **)a);
351 Io__Prometheus__Client__Metric const *m_b =
352 *((Io__Prometheus__Client__Metric **)b);
353
354 if (m_a->n_label < m_b->n_label)
355 return -1;
356 else if (m_a->n_label > m_b->n_label)
357 return 1;
358
359 /* Prometheus does not care about the order of labels. All labels in this
360 * plugin are created by METRIC_ADD_LABELS(), though, and therefore always
361 * appear in the same order. We take advantage of this and simplify the check
362 * by making sure all labels are the same in each position.
363 *
364 * We also only need to check the label values, because the label names are
365 * the same for all metrics in a metric family.
366 *
367 * 3 labels:
368 * [0] $plugin="$plugin_instance" => $plugin is the same within a family
369 * [1] type="$type_instance" => "type" is a static string
370 * [2] instance="$host" => "instance" is a static string
371 *
372 * 2 labels, variant 1:
373 * [0] $plugin="$plugin_instance" => $plugin is the same within a family
374 * [1] instance="$host" => "instance" is a static string
375 *
376 * 2 labels, variant 2:
377 * [0] $plugin="$type_instance" => $plugin is the same within a family
378 * [1] instance="$host" => "instance" is a static string
379 *
380 * 1 label:
381 * [1] instance="$host" => "instance" is a static string
382 */
383 for (size_t i = 0; i < m_a->n_label; i++) {
384 int status = strcmp(m_a->label[i]->value, m_b->label[i]->value);
385 if (status != 0)
386 return status;
387
388 #if COLLECT_DEBUG
389 assert(strcmp(m_a->label[i]->name, m_b->label[i]->name) == 0);
390 #endif
391 }
392
393 return 0;
394 }
395
/* METRIC_INIT evaluates to a pointer to an unnamed metric that has no labels
 * set but provides room for three label pairs. It is meant to be filled in
 * with METRIC_ADD_LABELS() and used as a lookup key. The object is a compound
 * literal, i.e. it only lives as long as the block the macro is used in. */
#define METRIC_INIT                                                            \
  &(Io__Prometheus__Client__Metric) {                                          \
    .n_label = 0,                                                              \
    .label = (Io__Prometheus__Client__LabelPair *[]){                          \
        &(Io__Prometheus__Client__LabelPair){.name = NULL},                    \
        &(Io__Prometheus__Client__LabelPair){.name = NULL},                    \
        &(Io__Prometheus__Client__LabelPair){.name = NULL},                    \
    },                                                                         \
  }
412
/* METRIC_ADD_LABELS appends the labels derived from value list "vl" to metric
 * "m", in this fixed order:
 *
 *   - $plugin="$plugin_instance", if the plugin instance is not empty;
 *   - "type"="$type_instance", if the type instance is not empty -- the label
 *     is named after the plugin instead if the plugin instance is empty;
 *   - "instance"="$host", always.
 *
 * The strings are not copied: the labels point into "vl" and at string
 * literals. Both arguments are evaluated multiple times. */
#define METRIC_ADD_LABELS(m, vl)                                               \
  do {                                                                         \
    if ((vl)->plugin_instance[0] != '\0') {                                    \
      (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                   \
      (m)->label[(m)->n_label]->value = (char *)(vl)->plugin_instance;         \
      (m)->n_label++;                                                          \
    }                                                                          \
                                                                               \
    if ((vl)->type_instance[0] != '\0') {                                      \
      (m)->label[(m)->n_label]->name =                                         \
          ((vl)->plugin_instance[0] == '\0') ? (char *)(vl)->plugin : "type";  \
      (m)->label[(m)->n_label]->value = (char *)(vl)->type_instance;           \
      (m)->n_label++;                                                          \
    }                                                                          \
                                                                               \
    (m)->label[(m)->n_label]->name = "instance";                               \
    (m)->label[(m)->n_label]->value = (char *)(vl)->host;                      \
    (m)->n_label++;                                                            \
  } while (0)
433
434 /* metric_clone allocates and initializes a new metric based on orig. */
435 static Io__Prometheus__Client__Metric *
metric_clone(Io__Prometheus__Client__Metric const * orig)436 metric_clone(Io__Prometheus__Client__Metric const *orig) {
437 Io__Prometheus__Client__Metric *copy = calloc(1, sizeof(*copy));
438 if (copy == NULL)
439 return NULL;
440 io__prometheus__client__metric__init(copy);
441
442 copy->n_label = orig->n_label;
443 copy->label = calloc(copy->n_label, sizeof(*copy->label));
444 if (copy->label == NULL) {
445 sfree(copy);
446 return NULL;
447 }
448
449 for (size_t i = 0; i < copy->n_label; i++) {
450 copy->label[i] = label_pair_clone(orig->label[i]);
451 if (copy->label[i] == NULL) {
452 metric_destroy(copy);
453 return NULL;
454 }
455 }
456
457 return copy;
458 }
459
460 /* metric_update stores the new value and timestamp in m. */
metric_update(Io__Prometheus__Client__Metric * m,value_t value,int ds_type,cdtime_t t,cdtime_t interval)461 static int metric_update(Io__Prometheus__Client__Metric *m, value_t value,
462 int ds_type, cdtime_t t, cdtime_t interval) {
463 if (ds_type == DS_TYPE_GAUGE) {
464 sfree(m->counter);
465 if (m->gauge == NULL) {
466 m->gauge = calloc(1, sizeof(*m->gauge));
467 if (m->gauge == NULL)
468 return ENOMEM;
469 io__prometheus__client__gauge__init(m->gauge);
470 }
471
472 m->gauge->value = (double)value.gauge;
473 m->gauge->has_value = 1;
474 } else { /* not gauge */
475 sfree(m->gauge);
476 if (m->counter == NULL) {
477 m->counter = calloc(1, sizeof(*m->counter));
478 if (m->counter == NULL)
479 return ENOMEM;
480 io__prometheus__client__counter__init(m->counter);
481 }
482
483 switch (ds_type) {
484 case DS_TYPE_ABSOLUTE:
485 m->counter->value = (double)value.absolute;
486 break;
487 case DS_TYPE_COUNTER:
488 m->counter->value = (double)value.counter;
489 break;
490 default:
491 m->counter->value = (double)value.derive;
492 break;
493 }
494 m->counter->has_value = 1;
495 }
496
497 /* Prometheus has a globally configured timeout after which metrics are
498 * considered stale. This causes problems when metrics have an interval
499 * exceeding that limit. We emulate the behavior of "pushgateway" and *not*
500 * send a timestamp value – Prometheus will fill in the current time. */
501 if (interval <= staleness_delta) {
502 m->timestamp_ms = CDTIME_T_TO_MS(t);
503 m->has_timestamp_ms = 1;
504 } else {
505 static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC;
506 c_complain(
507 LOG_NOTICE, &long_metric,
508 "write_prometheus plugin: You have metrics with an interval exceeding "
509 "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check "
510 "the collectd.conf(5) manual page to understand what's going on.",
511 CDTIME_T_TO_DOUBLE(staleness_delta));
512
513 m->timestamp_ms = 0;
514 m->has_timestamp_ms = 0;
515 }
516
517 return 0;
518 }
519
520 /* metric_family_add_metric adds m to the metric list of fam. */
metric_family_add_metric(Io__Prometheus__Client__MetricFamily * fam,Io__Prometheus__Client__Metric * m)521 static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam,
522 Io__Prometheus__Client__Metric *m) {
523 Io__Prometheus__Client__Metric **tmp =
524 realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric));
525 if (tmp == NULL)
526 return ENOMEM;
527 fam->metric = tmp;
528
529 fam->metric[fam->n_metric] = m;
530 fam->n_metric++;
531
532 /* Sort the metrics so that lookup is fast. */
533 qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
534
535 return 0;
536 }
537
538 /* metric_family_delete_metric looks up and deletes the metric corresponding to
539 * vl. */
540 static int
metric_family_delete_metric(Io__Prometheus__Client__MetricFamily * fam,value_list_t const * vl)541 metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam,
542 value_list_t const *vl) {
543 Io__Prometheus__Client__Metric *key = METRIC_INIT;
544 METRIC_ADD_LABELS(key, vl);
545
546 size_t i;
547 for (i = 0; i < fam->n_metric; i++) {
548 if (metric_cmp(&key, &fam->metric[i]) == 0)
549 break;
550 }
551
552 if (i >= fam->n_metric)
553 return ENOENT;
554
555 metric_destroy(fam->metric[i]);
556 if ((fam->n_metric - 1) > i)
557 memmove(&fam->metric[i], &fam->metric[i + 1],
558 ((fam->n_metric - 1) - i) * sizeof(fam->metric[i]));
559 fam->n_metric--;
560
561 if (fam->n_metric == 0) {
562 sfree(fam->metric);
563 return 0;
564 }
565
566 Io__Prometheus__Client__Metric **tmp =
567 realloc(fam->metric, fam->n_metric * sizeof(*fam->metric));
568 if (tmp != NULL)
569 fam->metric = tmp;
570
571 return 0;
572 }
573
574 /* metric_family_get_metric looks up the matching metric in a metric family,
575 * allocating it if necessary. */
576 static Io__Prometheus__Client__Metric *
metric_family_get_metric(Io__Prometheus__Client__MetricFamily * fam,value_list_t const * vl)577 metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam,
578 value_list_t const *vl) {
579 Io__Prometheus__Client__Metric *key = METRIC_INIT;
580 METRIC_ADD_LABELS(key, vl);
581
582 /* Metrics are sorted in metric_family_add_metric() so that we can do a binary
583 * search here. */
584 Io__Prometheus__Client__Metric **m = bsearch(
585 &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
586
587 if (m != NULL) {
588 return *m;
589 }
590
591 Io__Prometheus__Client__Metric *new_metric = metric_clone(key);
592 if (new_metric == NULL)
593 return NULL;
594
595 DEBUG("write_prometheus plugin: created new metric in family");
596 int status = metric_family_add_metric(fam, new_metric);
597 if (status != 0) {
598 metric_destroy(new_metric);
599 return NULL;
600 }
601
602 return new_metric;
603 }
604
605 /* metric_family_update looks up the matching metric in a metric family,
606 * allocating it if necessary, and updates the metric to the latest value. */
metric_family_update(Io__Prometheus__Client__MetricFamily * fam,data_set_t const * ds,value_list_t const * vl,size_t ds_index)607 static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam,
608 data_set_t const *ds, value_list_t const *vl,
609 size_t ds_index) {
610 Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl);
611 if (m == NULL)
612 return -1;
613
614 return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time,
615 vl->interval);
616 }
617
618 /* metric_family_destroy frees the memory used by a metric family. */
metric_family_destroy(Io__Prometheus__Client__MetricFamily * msg)619 static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) {
620 if (msg == NULL)
621 return;
622
623 sfree(msg->name);
624 sfree(msg->help);
625
626 for (size_t i = 0; i < msg->n_metric; i++) {
627 metric_destroy(msg->metric[i]);
628 }
629 sfree(msg->metric);
630
631 sfree(msg);
632 }
633
634 /* metric_family_create allocates and initializes a new metric family. */
635 static Io__Prometheus__Client__MetricFamily *
metric_family_create(char * name,data_set_t const * ds,value_list_t const * vl,size_t ds_index)636 metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl,
637 size_t ds_index) {
638 Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg));
639 if (msg == NULL)
640 return NULL;
641 io__prometheus__client__metric_family__init(msg);
642
643 msg->name = name;
644
645 char help[1024];
646 ssnprintf(
647 help, sizeof(help),
648 "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
649 vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
650 ds->ds[ds_index].name);
651 msg->help = strdup(help);
652
653 msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE)
654 ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE
655 : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER;
656 msg->has_type = 1;
657
658 return msg;
659 }
660
661 /* metric_family_name creates a metric family's name from a data source. This is
662 * done in the same way as done by the "collectd_exporter" for best possible
663 * compatibility. In essence, the plugin, type and data source name go in the
664 * metric family name, while hostname, plugin instance and type instance go into
665 * the labels of a metric. */
metric_family_name(data_set_t const * ds,value_list_t const * vl,size_t ds_index)666 static char *metric_family_name(data_set_t const *ds, value_list_t const *vl,
667 size_t ds_index) {
668 char const *fields[5] = {"collectd"};
669 size_t fields_num = 1;
670
671 if (strcmp(vl->plugin, vl->type) != 0) {
672 fields[fields_num] = vl->plugin;
673 fields_num++;
674 }
675 fields[fields_num] = vl->type;
676 fields_num++;
677
678 if (strcmp("value", ds->ds[ds_index].name) != 0) {
679 fields[fields_num] = ds->ds[ds_index].name;
680 fields_num++;
681 }
682
683 /* Prometheus best practices:
684 * cumulative metrics should have a "total" suffix. */
685 if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) ||
686 (ds->ds[ds_index].type == DS_TYPE_DERIVE)) {
687 fields[fields_num] = "total";
688 fields_num++;
689 }
690
691 char name[5 * DATA_MAX_NAME_LEN];
692 strjoin(name, sizeof(name), (char **)fields, fields_num, "_");
693 return strdup(name);
694 }
695
696 /* metric_family_get looks up the matching metric family, allocating it if
697 * necessary. */
698 static Io__Prometheus__Client__MetricFamily *
metric_family_get(data_set_t const * ds,value_list_t const * vl,size_t ds_index,bool allocate)699 metric_family_get(data_set_t const *ds, value_list_t const *vl, size_t ds_index,
700 bool allocate) {
701 char *name = metric_family_name(ds, vl, ds_index);
702 if (name == NULL) {
703 ERROR("write_prometheus plugin: Allocating metric family name failed.");
704 return NULL;
705 }
706
707 Io__Prometheus__Client__MetricFamily *fam = NULL;
708 if (c_avl_get(metrics, name, (void *)&fam) == 0) {
709 sfree(name);
710 assert(fam != NULL);
711 return fam;
712 }
713
714 if (!allocate) {
715 sfree(name);
716 return NULL;
717 }
718
719 fam = metric_family_create(name, ds, vl, ds_index);
720 if (fam == NULL) {
721 ERROR("write_prometheus plugin: Allocating metric family failed.");
722 sfree(name);
723 return NULL;
724 }
725
726 /* If successful, "name" is owned by "fam", i.e. don't free it here. */
727 DEBUG("write_prometheus plugin: metric family \"%s\" has been created.",
728 name);
729 name = NULL;
730
731 int status = c_avl_insert(metrics, fam->name, fam);
732 if (status != 0) {
733 ERROR("write_prometheus plugin: Adding \"%s\" failed.", fam->name);
734 metric_family_destroy(fam);
735 return NULL;
736 }
737
738 return fam;
739 }
740 /* }}} */
741
/* prom_logger is registered with libmicrohttpd as external logger. It formats
 * the message and forwards it to collectd's log at error level. Messages are
 * cut off after 1023 characters. */
static void prom_logger(__attribute__((unused)) void *arg, char const *fmt,
                        va_list ap) {
  /* {{{ */
  char message[1024];

  vsnprintf(message, sizeof(message), fmt, ap);
  ERROR("write_prometheus plugin: %s", message);
} /* }}} prom_logger */
750
751 #if MHD_VERSION >= 0x00090000
/* prom_open_socket creates a listening TCP socket for the configured host
 * and port, restricted to address family "addrfamily". The addresses returned
 * by getaddrinfo() are tried in order; the first one for which socket(),
 * setsockopt(SO_REUSEADDR), bind() and listen() all succeed is used.
 *
 * Returns the file descriptor of the listening socket, or -1 if name
 * resolution failed or no address could be used. */
static int prom_open_socket(int addrfamily) {
  /* {{{ */
  char service[NI_MAXSERV];
  ssnprintf(service, sizeof(service), "%hu", httpd_port);

  struct addrinfo *res;
  int status = getaddrinfo(httpd_host, service,
                           &(struct addrinfo){
                               .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
                               .ai_family = addrfamily,
                               .ai_socktype = SOCK_STREAM,
                           },
                           &res);
  if (status != 0) {
    return -1;
  }

  int fd = -1;
  for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
    int flags = ai->ai_socktype;
#ifdef SOCK_CLOEXEC
    flags |= SOCK_CLOEXEC;
#endif

    fd = socket(ai->ai_family, flags, 0);
    if (fd == -1)
      continue;

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) != 0) {
      WARNING("write_prometheus plugin: setsockopt(SO_REUSEADDR) failed: %s",
              STRERRNO);
      close(fd);
      fd = -1;
      continue;
    }

    if (bind(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
      close(fd);
      fd = -1;
      continue;
    }

    if (listen(fd, /* backlog = */ 16) != 0) {
      close(fd);
      fd = -1;
      continue;
    }

    /* The socket is up; the address is only looked up for the log message.
     * A failing getnameinfo() is therefore not an error, but its output
     * buffers must not be printed in that case. */
    char str_node[NI_MAXHOST] = "";
    char str_service[NI_MAXSERV] = "";

    int gni_status =
        getnameinfo(ai->ai_addr, ai->ai_addrlen, str_node, sizeof(str_node),
                    str_service, sizeof(str_service),
                    NI_NUMERICHOST | NI_NUMERICSERV);
    if (gni_status == 0)
      INFO("write_prometheus plugin: Listening on [%s]:%s.", str_node,
           str_service);
    else
      INFO("write_prometheus plugin: Listening on port %s.", service);
    break;
  }

  freeaddrinfo(res);

  return fd;
} /* }}} int prom_open_socket */
816
prom_start_daemon()817 static struct MHD_Daemon *prom_start_daemon() {
818 /* {{{ */
819 int fd = prom_open_socket(PF_INET6);
820 if (fd == -1)
821 fd = prom_open_socket(PF_INET);
822 if (fd == -1) {
823 ERROR("write_prometheus plugin: Opening a listening socket for [%s]:%hu "
824 "failed.",
825 (httpd_host != NULL) ? httpd_host : "::", httpd_port);
826 return NULL;
827 }
828
829 unsigned int flags = MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG;
830 #if MHD_VERSION >= 0x00095300
831 flags |= MHD_USE_INTERNAL_POLLING_THREAD;
832 #endif
833
834 struct MHD_Daemon *d = MHD_start_daemon(
835 flags, httpd_port,
836 /* MHD_AcceptPolicyCallback = */ NULL,
837 /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
838 MHD_OPTION_LISTEN_SOCKET, fd, MHD_OPTION_EXTERNAL_LOGGER, prom_logger,
839 NULL, MHD_OPTION_END);
840 if (d == NULL) {
841 ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
842 close(fd);
843 return NULL;
844 }
845
846 return d;
847 } /* }}} struct MHD_Daemon *prom_start_daemon */
848 #else /* if MHD_VERSION < 0x00090000 */
prom_start_daemon()849 static struct MHD_Daemon *prom_start_daemon() {
850 /* {{{ */
851 struct MHD_Daemon *d = MHD_start_daemon(
852 MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG, httpd_port,
853 /* MHD_AcceptPolicyCallback = */ NULL,
854 /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
855 MHD_OPTION_EXTERNAL_LOGGER, prom_logger, NULL, MHD_OPTION_END);
856 if (d == NULL) {
857 ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
858 return NULL;
859 }
860
861 return d;
862 } /* }}} struct MHD_Daemon *prom_start_daemon */
863 #endif
864
865 /*
866 * collectd callbacks
867 */
/* prom_config is the configuration callback. It handles the options "Host",
 * "Port" and "StalenessDelta"; unknown options are ignored with a warning.
 * Returns zero, except when "Host" is used with a libmicrohttpd version that
 * does not support it, in which case -1 is returned. */
static int prom_config(oconfig_item_t *ci) {
  for (int i = 0; i < ci->children_num; i++) {
    oconfig_item_t *child = ci->children + i;
    char const *key = child->key;

    if (strcasecmp("Host", key) == 0) {
#if MHD_VERSION >= 0x00090000
      cf_util_get_string(child, &httpd_host);
      continue;
#else
      ERROR("write_prometheus plugin: Option `Host' not supported. Please "
            "upgrade libmicrohttpd to at least 0.9.0");
      return -1;
#endif
    }

    if (strcasecmp("Port", key) == 0) {
      /* only positive results are accepted as port number. */
      int port = cf_util_get_port_number(child);
      if (port > 0)
        httpd_port = (unsigned short)port;
      continue;
    }

    if (strcasecmp("StalenessDelta", key) == 0) {
      cf_util_get_cdtime(child, &staleness_delta);
      continue;
    }

    WARNING("write_prometheus plugin: Ignoring unknown configuration option "
            "\"%s\".",
            child->key);
  }

  return 0;
}
895
prom_init()896 static int prom_init() {
897 if (metrics == NULL) {
898 metrics = c_avl_create((void *)strcmp);
899 if (metrics == NULL) {
900 ERROR("write_prometheus plugin: c_avl_create() failed.");
901 return -1;
902 }
903 }
904
905 if (httpd == NULL) {
906 httpd = prom_start_daemon();
907 if (httpd == NULL) {
908 return -1;
909 }
910 DEBUG("write_prometheus plugin: Successfully started microhttpd %s",
911 MHD_get_version());
912 }
913
914 return 0;
915 }
916
/* prom_write is the write callback. For every data source of "vl" it looks
 * up (or creates) the matching metric family and stores the new value in it.
 * Failures for individual data sources are logged or skipped; the function
 * always returns zero. The metrics lock is held while the tree is updated. */
static int prom_write(data_set_t const *ds, value_list_t const *vl,
                      __attribute__((unused)) user_data_t *ud) {
  pthread_mutex_lock(&metrics_lock);

  for (size_t ds_index = 0; ds_index < ds->ds_num; ds_index++) {
    Io__Prometheus__Client__MetricFamily *fam =
        metric_family_get(ds, vl, ds_index, /* allocate = */ true);
    if (fam == NULL)
      continue;

    int status = metric_family_update(fam, ds, vl, ds_index);
    if (status != 0)
      ERROR("write_prometheus plugin: Updating metric \"%s\" failed with "
            "status %d",
            fam->name, status);
  }

  pthread_mutex_unlock(&metrics_lock);
  return 0;
}
939
/* prom_missing is the "missing" callback. For every data source of the type
 * of "vl" it deletes the corresponding metric from its metric family; a
 * family that has no metrics left is removed from "metrics" and destroyed.
 * Returns ENOENT if the data set of the type is unknown and zero otherwise;
 * failures for individual data sources are only logged. */
static int prom_missing(value_list_t const *vl,
                        __attribute__((unused)) user_data_t *ud) {
  data_set_t const *ds = plugin_get_ds(vl->type);
  if (ds == NULL)
    return ENOENT;

  pthread_mutex_lock(&metrics_lock);

  for (size_t ds_index = 0; ds_index < ds->ds_num; ds_index++) {
    Io__Prometheus__Client__MetricFamily *fam =
        metric_family_get(ds, vl, ds_index, /* allocate = */ false);
    if (fam == NULL)
      continue;

    int status = metric_family_delete_metric(fam, vl);
    if (status != 0) {
      ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" "
            "failed with status %d",
            fam->name, status);
      continue;
    }

    /* families that still have metrics stay around. */
    if (fam->n_metric != 0)
      continue;

    status = c_avl_remove(metrics, fam->name, NULL, NULL);
    if (status != 0) {
      ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed "
            "with status %d",
            fam->name, status);
      continue;
    }
    metric_family_destroy(fam);
  }

  pthread_mutex_unlock(&metrics_lock);
  return 0;
}
978
prom_shutdown()979 static int prom_shutdown() {
980 if (httpd != NULL) {
981 MHD_stop_daemon(httpd);
982 httpd = NULL;
983 }
984
985 pthread_mutex_lock(&metrics_lock);
986 if (metrics != NULL) {
987 char *name;
988 Io__Prometheus__Client__MetricFamily *fam;
989 while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) {
990 assert(name == fam->name);
991 name = NULL;
992
993 metric_family_destroy(fam);
994 }
995 c_avl_destroy(metrics);
996 metrics = NULL;
997 }
998 pthread_mutex_unlock(&metrics_lock);
999
1000 sfree(httpd_host);
1001
1002 return 0;
1003 }
1004
module_register()1005 void module_register() {
1006 plugin_register_complex_config("write_prometheus", prom_config);
1007 plugin_register_init("write_prometheus", prom_init);
1008 plugin_register_write("write_prometheus", prom_write,
1009 /* user data = */ NULL);
1010 plugin_register_missing("write_prometheus", prom_missing,
1011 /* user data = */ NULL);
1012 plugin_register_shutdown("write_prometheus", prom_shutdown);
1013 }
1014