1 /**
2 * collectd - src/rrdcached.c
3 * Copyright (C) 2008-2013 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at collectd.org>
25 **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/common/common.h"
31 #include "utils/rrdcreate/rrdcreate.h"
32
33 #undef HAVE_CONFIG_H
34 #include <rrd.h>
35 #include <rrd_client.h>
36
37 /*
38 * Private variables
39 */
40 static char *datadir;
41 static char *daemon_address;
42 static bool config_create_files = true;
43 static bool config_collect_stats = true;
44 static rrdcreate_config_t rrdcreate_config = {.stepsize = 0,
45 .heartbeat = 0,
46 .rrarows = 1200,
47 .xff = 0.1,
48 .timespans = NULL,
49 .timespans_num = 0,
50 .consolidation_functions = NULL,
51 .consolidation_functions_num = 0,
52 .async = 0};
53
54 /*
55 * Prototypes.
56 */
57 static int rc_write(const data_set_t *ds, const value_list_t *vl,
58 __attribute__((unused)) user_data_t *ud);
59 static int rc_flush(__attribute__((unused)) cdtime_t timeout,
60 const char *identifier,
61 __attribute__((unused)) user_data_t *ud);
62
value_list_to_string(char * buffer,int buffer_len,const data_set_t * ds,const value_list_t * vl)63 static int value_list_to_string(char *buffer, int buffer_len,
64 const data_set_t *ds, const value_list_t *vl) {
65 assert(strcmp(ds->type, vl->type) == 0);
66
67 memset(buffer, '\0', buffer_len);
68
69 int status =
70 ssnprintf(buffer, buffer_len, "%.6f", CDTIME_T_TO_DOUBLE(vl->time));
71 if ((status < 1) || (status >= buffer_len))
72 return -1;
73 int offset = status;
74
75 for (size_t i = 0; i < ds->ds_num; i++) {
76 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
77 (ds->ds[i].type != DS_TYPE_GAUGE) &&
78 (ds->ds[i].type != DS_TYPE_DERIVE) &&
79 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
80 return -1;
81
82 if (ds->ds[i].type == DS_TYPE_COUNTER) {
83 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
84 (uint64_t)vl->values[i].counter);
85 } else if (ds->ds[i].type == DS_TYPE_GAUGE) {
86 status = ssnprintf(buffer + offset, buffer_len - offset, ":%f",
87 vl->values[i].gauge);
88 } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
89 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
90 vl->values[i].derive);
91 } else /* if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */ {
92 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
93 vl->values[i].absolute);
94 }
95
96 if ((status < 1) || (status >= (buffer_len - offset)))
97 return -1;
98
99 offset += status;
100 } /* for ds->ds_num */
101
102 return 0;
103 } /* int value_list_to_string */
104
/*
 * Builds the RRD file name for a value list: "[<datadir>/]<identifier>.rrd",
 * where the identifier is whatever FORMAT_VL produces for `vl`.
 *
 * Returns 0 on success, ENOMEM if the buffer is too small for the directory
 * prefix or the suffix, or the non-zero status of FORMAT_VL.
 */
static int value_list_to_filename(char *buffer, size_t buffer_size,
                                  value_list_t const *vl) {
  static const char extension[] = ".rrd";
  char *cursor = buffer;
  size_t room = buffer_size;

  if (datadir != NULL) {
    /* Length of the prefix including the '/' appended below. */
    size_t prefix_len = strlen(datadir) + 1;

    if (prefix_len >= room)
      return ENOMEM;

    sstrncpy(cursor, datadir, room);
    cursor[prefix_len - 1] = '/';
    cursor[prefix_len] = '\0';

    cursor += prefix_len;
    room -= prefix_len;
  }

  int status = FORMAT_VL(cursor, room, vl);
  if (status != 0)
    return status;

  /* Skip over the identifier that was just written. */
  size_t ident_len = strlen(cursor);
  assert(ident_len < room);
  cursor += ident_len;
  room -= ident_len;

  if (room <= sizeof(extension))
    return ENOMEM;

  /* sizeof() includes the terminating null byte, so it is copied as well. */
  memcpy(cursor, extension, sizeof(extension));
  return 0;
} /* int value_list_to_filename */
138
/*
 * Reads an integer option via cf_util_get_int() and rejects negative values.
 * Note that, despite the name, zero is accepted.
 *
 * On success the value is stored in `*ret` and 0 is returned. Otherwise
 * `*ret` is left untouched and the status of cf_util_get_int() or EINVAL
 * (negative value) is returned.
 */
static int rc_config_get_int_positive(oconfig_item_t const *ci, int *ret) {
  int parsed = 0;
  int status = cf_util_get_int(ci, &parsed);

  if (status == 0) {
    if (parsed < 0)
      status = EINVAL;
    else
      *ret = parsed;
  }

  return status;
} /* int rc_config_get_int_positive */
151
/*
 * Reads the "XFF" option: exactly one numeric argument in the range
 * [0.0, 1.0).
 *
 * Stores the value in `*ret` and returns 0 on success. Logs an error and
 * returns EINVAL if the argument count, its type or its range is wrong.
 */
static int rc_config_get_xff(oconfig_item_t const *ci, double *ret) {
  if ((ci->values_num == 1) && (ci->values[0].type == OCONFIG_TYPE_NUMBER)) {
    double xff = ci->values[0].value.number;

    if ((xff >= 0.0) && (xff < 1.0)) {
      *ret = xff;
      return 0;
    }
  }

  /* Wrong arity, wrong type and out-of-range values share one message. */
  ERROR("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
        "in the range [0.0, 1.0)",
        ci->key);
  return EINVAL;
} /* int rc_config_get_xff */
171
rc_config_add_timespan(int timespan)172 static int rc_config_add_timespan(int timespan) {
173 if (timespan <= 0)
174 return EINVAL;
175
176 int *tmp = realloc(rrdcreate_config.timespans,
177 sizeof(*rrdcreate_config.timespans) *
178 (rrdcreate_config.timespans_num + 1));
179 if (tmp == NULL)
180 return ENOMEM;
181 rrdcreate_config.timespans = tmp;
182
183 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan;
184 rrdcreate_config.timespans_num++;
185
186 return 0;
187 } /* int rc_config_add_timespan */
188
/*
 * Configuration callback: walks all child options of the plugin block and
 * stores their values in the file-level settings. Unknown options and
 * options that fail to parse only produce a warning; the function always
 * returns 0.
 *
 * The write and flush callbacks are registered at the end, but only if a
 * daemon address is set.
 */
static int rc_config(oconfig_item_t *ci) {
  for (int i = 0; i < ci->children_num; i++) {
    oconfig_item_t const *option = &ci->children[i];
    const char *name = option->key;
    int status = 0;

    if (strcasecmp("DataDir", name) == 0) {
      status = cf_util_get_string(option, &datadir);
      if (status == 0) {
        int len = strlen(datadir);

        /* Strip trailing slashes; value_list_to_filename and rc_flush add
         * the separator themselves. */
        while ((len > 0) && (datadir[len - 1] == '/'))
          datadir[--len] = 0;

        /* Nothing left (empty string or only slashes): drop the setting. */
        if (len <= 0)
          sfree(datadir);
      }
    } else if (strcasecmp("DaemonAddress", name) == 0) {
      status = cf_util_get_string(option, &daemon_address);
    } else if (strcasecmp("CreateFiles", name) == 0) {
      status = cf_util_get_boolean(option, &config_create_files);
    } else if (strcasecmp("CreateFilesAsync", name) == 0) {
      status = cf_util_get_boolean(option, &rrdcreate_config.async);
    } else if (strcasecmp("CollectStatistics", name) == 0) {
      status = cf_util_get_boolean(option, &config_collect_stats);
    } else if (strcasecmp("StepSize", name) == 0) {
      int stepsize = -1;

      status = rc_config_get_int_positive(option, &stepsize);
      if (status == 0)
        rrdcreate_config.stepsize = (unsigned long)stepsize;
    } else if (strcasecmp("HeartBeat", name) == 0) {
      status = rc_config_get_int_positive(option, &rrdcreate_config.heartbeat);
    } else if (strcasecmp("RRARows", name) == 0) {
      status = rc_config_get_int_positive(option, &rrdcreate_config.rrarows);
    } else if (strcasecmp("RRATimespan", name) == 0) {
      int timespan = -1;

      status = rc_config_get_int_positive(option, &timespan);
      if (status == 0)
        status = rc_config_add_timespan(timespan);
    } else if (strcasecmp("XFF", name) == 0) {
      status = rc_config_get_xff(option, &rrdcreate_config.xff);
    } else {
      WARNING("rrdcached plugin: Ignoring invalid option %s.", name);
      continue;
    }

    if (status != 0)
      WARNING("rrdcached plugin: Handling the \"%s\" option failed.", name);
  }

  if (daemon_address != NULL) {
    plugin_register_write("rrdcached", rc_write, /* user_data = */ NULL);
    plugin_register_flush("rrdcached", rc_flush, /* user_data = */ NULL);
  }
  return 0;
} /* int rc_config */
248
try_reconnect(void)249 static int try_reconnect(void) {
250 rrdc_disconnect();
251
252 rrd_clear_error();
253 int status = rrdc_connect(daemon_address);
254 if (status != 0) {
255 ERROR("rrdcached plugin: Failed to reconnect to RRDCacheD "
256 "at %s: %s (status=%d)",
257 daemon_address, rrd_get_error(), status);
258 return -1;
259 }
260
261 INFO("rrdcached plugin: Successfully reconnected to RRDCacheD "
262 "at %s",
263 daemon_address);
264 return 0;
265 } /* int try_reconnect */
266
rc_read(void)267 static int rc_read(void) {
268
269 value_list_t vl = VALUE_LIST_INIT;
270 vl.values = &(value_t){.gauge = NAN};
271 vl.values_len = 1;
272
273 if (daemon_address == NULL)
274 return -1;
275
276 if (!config_collect_stats)
277 return -1;
278
279 if ((strncmp("unix:", daemon_address, strlen("unix:")) != 0) &&
280 (daemon_address[0] != '/'))
281 sstrncpy(vl.host, daemon_address, sizeof(vl.host));
282 sstrncpy(vl.plugin, "rrdcached", sizeof(vl.plugin));
283
284 rrd_clear_error();
285 int status = rrdc_connect(daemon_address);
286 if (status != 0) {
287 ERROR("rrdcached plugin: Failed to connect to RRDCacheD "
288 "at %s: %s (status=%d)",
289 daemon_address, rrd_get_error(), status);
290 return -1;
291 }
292
293 rrdc_stats_t *head;
294 bool retried = false;
295
296 while (42) {
297 /* The RRD client lib does not provide any means for checking a
298 * connection, hence we'll have to retry upon failed operations. */
299 head = NULL;
300 rrd_clear_error();
301 status = rrdc_stats_get(&head);
302 if (status == 0)
303 break;
304
305 if (!retried) {
306 retried = true;
307 if (try_reconnect() == 0)
308 continue;
309 /* else: report the error and fail */
310 }
311
312 ERROR("rrdcached plugin: rrdc_stats_get failed: %s (status=%i).",
313 rrd_get_error(), status);
314 return -1;
315 }
316
317 for (rrdc_stats_t *ptr = head; ptr != NULL; ptr = ptr->next) {
318 if (ptr->type == RRDC_STATS_TYPE_GAUGE)
319 vl.values[0].gauge = (gauge_t)ptr->value.gauge;
320 else if (ptr->type == RRDC_STATS_TYPE_COUNTER)
321 vl.values[0].counter = (counter_t)ptr->value.counter;
322 else
323 continue;
324
325 if (strcasecmp("QueueLength", ptr->name) == 0) {
326 sstrncpy(vl.type, "queue_length", sizeof(vl.type));
327 sstrncpy(vl.type_instance, "", sizeof(vl.type_instance));
328 } else if (strcasecmp("UpdatesWritten", ptr->name) == 0) {
329 sstrncpy(vl.type, "operations", sizeof(vl.type));
330 sstrncpy(vl.type_instance, "write-updates", sizeof(vl.type_instance));
331 } else if (strcasecmp("DataSetsWritten", ptr->name) == 0) {
332 sstrncpy(vl.type, "operations", sizeof(vl.type));
333 sstrncpy(vl.type_instance, "write-data_sets", sizeof(vl.type_instance));
334 } else if (strcasecmp("TreeNodesNumber", ptr->name) == 0) {
335 sstrncpy(vl.type, "gauge", sizeof(vl.type));
336 sstrncpy(vl.type_instance, "tree_nodes", sizeof(vl.type_instance));
337 } else if (strcasecmp("TreeDepth", ptr->name) == 0) {
338 sstrncpy(vl.type, "gauge", sizeof(vl.type));
339 sstrncpy(vl.type_instance, "tree_depth", sizeof(vl.type_instance));
340 } else if (strcasecmp("FlushesReceived", ptr->name) == 0) {
341 sstrncpy(vl.type, "operations", sizeof(vl.type));
342 sstrncpy(vl.type_instance, "receive-flush", sizeof(vl.type_instance));
343 } else if (strcasecmp("JournalBytes", ptr->name) == 0) {
344 sstrncpy(vl.type, "counter", sizeof(vl.type));
345 sstrncpy(vl.type_instance, "journal-bytes", sizeof(vl.type_instance));
346 } else if (strcasecmp("JournalRotate", ptr->name) == 0) {
347 sstrncpy(vl.type, "counter", sizeof(vl.type));
348 sstrncpy(vl.type_instance, "journal-rotates", sizeof(vl.type_instance));
349 } else if (strcasecmp("UpdatesReceived", ptr->name) == 0) {
350 sstrncpy(vl.type, "operations", sizeof(vl.type));
351 sstrncpy(vl.type_instance, "receive-update", sizeof(vl.type_instance));
352 } else {
353 DEBUG("rrdcached plugin: rc_read: Unknown statistic `%s'.", ptr->name);
354 continue;
355 }
356
357 plugin_dispatch_values(&vl);
358 } /* for (ptr = head; ptr != NULL; ptr = ptr->next) */
359
360 rrdc_stats_free(head);
361
362 return 0;
363 } /* int rc_read */
364
rc_init(void)365 static int rc_init(void) {
366 if (config_collect_stats)
367 plugin_register_read("rrdcached", rc_read);
368
369 return 0;
370 } /* int rc_init */
371
/*
 * Write callback: sends the values of one value list to RRDCacheD as an
 * update of the matching RRD file.
 *
 * If file creation is enabled and the file does not exist yet, it is
 * created first; with asynchronous creation enabled the function then
 * returns without sending the update. A failed update is repeated once
 * after reconnecting to the daemon.
 *
 * Returns 0 on success and -1 on any failure. When no daemon address is
 * set, the write callback additionally unregisters itself.
 */
static int rc_write(const data_set_t *ds, const value_list_t *vl,
                    user_data_t __attribute__((unused)) * user_data) {
  if (daemon_address == NULL) {
    ERROR("rrdcached plugin: daemon_address == NULL.");
    plugin_unregister_write("rrdcached");
    return -1;
  }

  if (strcmp(ds->type, vl->type) != 0) {
    ERROR("rrdcached plugin: DS type does not match value list type");
    return -1;
  }

  char filename[PATH_MAX];
  if (value_list_to_filename(filename, sizeof(filename), vl) != 0) {
    ERROR("rrdcached plugin: value_list_to_filename failed.");
    return -1;
  }

  char values[512];
  if (value_list_to_string(values, sizeof(values), ds, vl) != 0) {
    ERROR("rrdcached plugin: value_list_to_string failed.");
    return -1;
  }

  if (config_create_files) {
    struct stat file_info;

    if (stat(filename, &file_info) != 0) {
      /* Only a missing file is expected here; anything else is an error. */
      if (errno != ENOENT) {
        ERROR("rrdcached plugin: stat (%s) failed: %s", filename, STRERRNO);
        return -1;
      }

      if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
        ERROR("rrdcached plugin: cu_rrd_create_file (%s) failed.", filename);
        return -1;
      }

      /* With asynchronous creation this update is not sent. */
      if (rrdcreate_config.async)
        return 0;
    }
  }

  rrd_clear_error();
  int status = rrdc_connect(daemon_address);
  if (status != 0) {
    ERROR("rrdcached plugin: Failed to connect to RRDCacheD "
          "at %s: %s (status=%d)",
          daemon_address, rrd_get_error(), status);
    return -1;
  }

  /* rrdc_update() takes an array of update strings; we pass exactly one. */
  char *update_args[2] = {values, NULL};

  /* The RRD client lib does not provide any means for checking a
   * connection, so a failed update is repeated once after reconnecting. */
  for (bool may_retry = true;;) {
    rrd_clear_error();
    status = rrdc_update(filename, /* values_num = */ 1, (void *)update_args);
    if (status == 0)
      break;

    if (may_retry) {
      may_retry = false;
      if (try_reconnect() == 0)
        continue;
      /* else: report the error and fail */
    }

    ERROR("rrdcached plugin: rrdc_update (%s, [%s], 1) failed: %s (status=%i)",
          filename, update_args[0], rrd_get_error(), status);
    return -1;
  }

  return 0;
} /* int rc_write */
455
/*
 * Flush callback: asks RRDCacheD to flush the RRD file that belongs to
 * `identifier`, i.e. "[<datadir>/]<identifier>.rrd". The timeout and the
 * user data are not used.
 *
 * Returns 0 on success, EINVAL if `identifier` is NULL and -1 if the file
 * name does not fit into the buffer or the daemon cannot be reached or
 * reports an error. A failed flush is repeated once after reconnecting.
 */
static int rc_flush(__attribute__((unused)) cdtime_t timeout, /* {{{ */
                    const char *identifier,
                    __attribute__((unused)) user_data_t *ud) {
  if (identifier == NULL)
    return EINVAL;

  char filename[PATH_MAX + 1];
  int len;

  if (datadir != NULL)
    len = ssnprintf(filename, sizeof(filename), "%s/%s.rrd", datadir,
                    identifier);
  else
    len = ssnprintf(filename, sizeof(filename), "%s.rrd", identifier);

  /* Refuse to flush a truncated (and therefore wrong) file name. The check
   * is the same one value_list_to_string applies to ssnprintf results. */
  if ((len < 1) || ((size_t)len >= sizeof(filename))) {
    ERROR("rrdcached plugin: rc_flush: File name for identifier \"%s\" "
          "is too long.",
          identifier);
    return -1;
  }

  rrd_clear_error();
  int status = rrdc_connect(daemon_address);
  if (status != 0) {
    ERROR("rrdcached plugin: Failed to connect to RRDCacheD "
          "at %s: %s (status=%d)",
          daemon_address, rrd_get_error(), status);
    return -1;
  }

  bool retried = false;

  while (42) {
    /* The RRD client lib does not provide any means for checking a
     * connection, hence we'll have to retry upon failed operations. */
    rrd_clear_error();
    status = rrdc_flush(filename);
    if (status == 0)
      break;

    if (!retried) {
      retried = true;
      if (try_reconnect() == 0)
        continue;
      /* else: report the error and fail */
    }

    ERROR("rrdcached plugin: rrdc_flush (%s) failed: %s (status=%i).", filename,
          rrd_get_error(), status);
    return -1;
  }
  DEBUG("rrdcached plugin: rrdc_flush (%s): Success.", filename);

  return 0;
} /* }}} int rc_flush */
503
/*
 * Shutdown callback: closes the connection to RRDCacheD. Always returns 0.
 */
static int rc_shutdown(void) {
  rrdc_disconnect();

  return 0;
} /* int rc_shutdown */
508
module_register(void)509 void module_register(void) {
510 plugin_register_complex_config("rrdcached", rc_config);
511 plugin_register_init("rrdcached", rc_init);
512 plugin_register_shutdown("rrdcached", rc_shutdown);
513 } /* void module_register */
514