1 /**
2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2013 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 * Authors:
21 * Florian octo Forster <octo at collectd.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
24 **/
25
26 #include "collectd.h"
27
28 #include "plugin.h"
29 #include "utils/avltree/avltree.h"
30 #include "utils/common/common.h"
31 #include "utils/rrdcreate/rrdcreate.h"
32 #include "utils_random.h"
33
34 #include <rrd.h>
35
36 /*
37 * Private types
38 */
39 typedef struct rrd_cache_s {
40 int values_num;
41 char **values;
42 cdtime_t first_value;
43 cdtime_t last_value;
44 int64_t random_variation;
45 enum { FLAG_NONE = 0x00, FLAG_QUEUED = 0x01, FLAG_FLUSHQ = 0x02 } flags;
46 } rrd_cache_t;
47
48 enum rrd_queue_dir_e { QUEUE_INSERT_FRONT, QUEUE_INSERT_BACK };
49 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
50
51 struct rrd_queue_s {
52 char *filename;
53 struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
56
57 /*
58 * Private variables
59 */
60 static const char *config_keys[] = {
61 "CacheTimeout", "CacheFlush", "CreateFilesAsync", "DataDir",
62 "StepSize", "HeartBeat", "RRARows", "RRATimespan",
63 "XFF", "WritesPerSecond", "RandomTimeout"};
64 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
65
66 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
67 * is zero a default, depending on the `interval' member of the value list is
68 * being used. */
69 static char *datadir;
70 static double write_rate;
71 static rrdcreate_config_t rrdcreate_config = {
72 /* stepsize = */ 0,
73 /* heartbeat = */ 0,
74 /* rrarows = */ 1200,
75 /* xff = */ 0.1,
76
77 /* timespans = */ NULL,
78 /* timespans_num = */ 0,
79
80 /* consolidation_functions = */ NULL,
81 /* consolidation_functions_num = */ 0,
82
83 /* async = */ 0};
84
85 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
86 * ALWAYS lock `cache_lock' first! */
87 static cdtime_t cache_timeout;
88 static cdtime_t cache_flush_timeout;
89 static cdtime_t random_timeout;
90 static cdtime_t cache_flush_last;
91 static c_avl_tree_t *cache;
92 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
93
94 static rrd_queue_t *queue_head;
95 static rrd_queue_t *queue_tail;
96 static rrd_queue_t *flushq_head;
97 static rrd_queue_t *flushq_tail;
98 static pthread_t queue_thread;
99 static int queue_thread_running = 1;
100 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
101 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
102
103 #if !HAVE_THREADSAFE_LIBRRD
104 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
105 #endif
106
107 static int do_shutdown;
108
109 #if HAVE_THREADSAFE_LIBRRD
srrd_update(char * filename,char * template,int argc,const char ** argv)110 static int srrd_update(char *filename, char *template, int argc,
111 const char **argv) {
112 optind = 0; /* bug in librrd? */
113 rrd_clear_error();
114
115 int status = rrd_update_r(filename, template, argc, (void *)argv);
116 if (status != 0) {
117 WARNING("rrdtool plugin: rrd_update_r (%s) failed: %s", filename,
118 rrd_get_error());
119 }
120
121 return status;
122 } /* int srrd_update */
123 /* #endif HAVE_THREADSAFE_LIBRRD */
124
125 #else /* !HAVE_THREADSAFE_LIBRRD */
srrd_update(char * filename,char * template,int argc,const char ** argv)126 static int srrd_update(char *filename, char *template, int argc,
127 const char **argv) {
128 int status;
129
130 int new_argc;
131 char **new_argv;
132
133 assert(template == NULL);
134
135 new_argc = 2 + argc;
136 new_argv = malloc((new_argc + 1) * sizeof(*new_argv));
137 if (new_argv == NULL) {
138 ERROR("rrdtool plugin: malloc failed.");
139 return -1;
140 }
141
142 new_argv[0] = "update";
143 new_argv[1] = filename;
144
145 memcpy(new_argv + 2, argv, argc * sizeof(char *));
146 new_argv[new_argc] = NULL;
147
148 pthread_mutex_lock(&librrd_lock);
149 optind = 0; /* bug in librrd? */
150 rrd_clear_error();
151
152 status = rrd_update(new_argc, new_argv);
153 pthread_mutex_unlock(&librrd_lock);
154
155 if (status != 0) {
156 WARNING("rrdtool plugin: rrd_update_r failed: %s: %s", filename,
157 rrd_get_error());
158 }
159
160 sfree(new_argv);
161
162 return status;
163 } /* int srrd_update */
164 #endif /* !HAVE_THREADSAFE_LIBRRD */
165
value_list_to_string_multiple(char * buffer,int buffer_len,const data_set_t * ds,const value_list_t * vl)166 static int value_list_to_string_multiple(char *buffer, int buffer_len,
167 const data_set_t *ds,
168 const value_list_t *vl) {
169 int offset;
170 int status;
171 time_t tt;
172
173 memset(buffer, '\0', buffer_len);
174
175 tt = CDTIME_T_TO_TIME_T(vl->time);
176 status = ssnprintf(buffer, buffer_len, "%u", (unsigned int)tt);
177 if ((status < 1) || (status >= buffer_len))
178 return -1;
179 offset = status;
180
181 for (size_t i = 0; i < ds->ds_num; i++) {
182 if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
183 (ds->ds[i].type != DS_TYPE_GAUGE) &&
184 (ds->ds[i].type != DS_TYPE_DERIVE) &&
185 (ds->ds[i].type != DS_TYPE_ABSOLUTE))
186 return -1;
187
188 if (ds->ds[i].type == DS_TYPE_COUNTER)
189 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
190 (uint64_t)vl->values[i].counter);
191 else if (ds->ds[i].type == DS_TYPE_GAUGE)
192 status = ssnprintf(buffer + offset, buffer_len - offset, ":" GAUGE_FORMAT,
193 vl->values[i].gauge);
194 else if (ds->ds[i].type == DS_TYPE_DERIVE)
195 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIi64,
196 vl->values[i].derive);
197 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
198 status = ssnprintf(buffer + offset, buffer_len - offset, ":%" PRIu64,
199 vl->values[i].absolute);
200
201 if ((status < 1) || (status >= (buffer_len - offset)))
202 return -1;
203
204 offset += status;
205 } /* for ds->ds_num */
206
207 return 0;
208 } /* int value_list_to_string_multiple */
209
value_list_to_string(char * buffer,int buffer_len,const data_set_t * ds,const value_list_t * vl)210 static int value_list_to_string(char *buffer, int buffer_len,
211 const data_set_t *ds, const value_list_t *vl) {
212 int status;
213 time_t tt;
214
215 if (ds->ds_num != 1)
216 return value_list_to_string_multiple(buffer, buffer_len, ds, vl);
217
218 tt = CDTIME_T_TO_TIME_T(vl->time);
219 switch (ds->ds[0].type) {
220 case DS_TYPE_DERIVE:
221 status = ssnprintf(buffer, buffer_len, "%u:%" PRIi64, (unsigned)tt,
222 vl->values[0].derive);
223 break;
224 case DS_TYPE_GAUGE:
225 status = ssnprintf(buffer, buffer_len, "%u:" GAUGE_FORMAT, (unsigned)tt,
226 vl->values[0].gauge);
227 break;
228 case DS_TYPE_COUNTER:
229 status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
230 (uint64_t)vl->values[0].counter);
231 break;
232 case DS_TYPE_ABSOLUTE:
233 status = ssnprintf(buffer, buffer_len, "%u:%" PRIu64, (unsigned)tt,
234 vl->values[0].absolute);
235 break;
236 default:
237 return EINVAL;
238 }
239
240 if ((status < 1) || (status >= buffer_len))
241 return ENOMEM;
242
243 return 0;
244 } /* int value_list_to_string */
245
value_list_to_filename(char * buffer,size_t buffer_size,value_list_t const * vl)246 static int value_list_to_filename(char *buffer, size_t buffer_size,
247 value_list_t const *vl) {
248 char const suffix[] = ".rrd";
249 int status;
250 size_t len;
251
252 if (datadir != NULL) {
253 size_t datadir_len = strlen(datadir) + 1;
254
255 if (datadir_len >= buffer_size)
256 return ENOMEM;
257
258 sstrncpy(buffer, datadir, buffer_size);
259 buffer[datadir_len - 1] = '/';
260 buffer[datadir_len] = 0;
261
262 buffer += datadir_len;
263 buffer_size -= datadir_len;
264 }
265
266 status = FORMAT_VL(buffer, buffer_size, vl);
267 if (status != 0)
268 return status;
269
270 len = strlen(buffer);
271 assert(len < buffer_size);
272 buffer += len;
273 buffer_size -= len;
274
275 if (buffer_size <= sizeof(suffix))
276 return ENOMEM;
277
278 memcpy(buffer, suffix, sizeof(suffix));
279 return 0;
280 } /* int value_list_to_filename */
281
rrd_queue_thread(void * data)282 static void *rrd_queue_thread(void __attribute__((unused)) * data) {
283 struct timeval tv_next_update;
284 struct timeval tv_now;
285
286 gettimeofday(&tv_next_update, /* timezone = */ NULL);
287
288 while (42) {
289 rrd_queue_t *queue_entry;
290 rrd_cache_t *cache_entry;
291 char **values;
292 int values_num;
293 int status;
294
295 values = NULL;
296 values_num = 0;
297
298 pthread_mutex_lock(&queue_lock);
299 /* Wait for values to arrive */
300 while (42) {
301 struct timespec ts_wait;
302
303 while ((flushq_head == NULL) && (queue_head == NULL) &&
304 (do_shutdown == 0))
305 pthread_cond_wait(&queue_cond, &queue_lock);
306
307 if ((flushq_head == NULL) && (queue_head == NULL))
308 break;
309
310 /* Don't delay if there's something to flush */
311 if (flushq_head != NULL)
312 break;
313
314 /* Don't delay if we're shutting down */
315 if (do_shutdown != 0)
316 break;
317
318 /* Don't delay if no delay was configured. */
319 if (write_rate <= 0.0)
320 break;
321
322 gettimeofday(&tv_now, /* timezone = */ NULL);
323 status = timeval_cmp(tv_next_update, tv_now, NULL);
324 /* We're good to go */
325 if (status <= 0)
326 break;
327
328 /* We're supposed to wait a bit with this update, so we'll
329 * wait for the next addition to the queue or to the end of
330 * the wait period - whichever comes first. */
331 ts_wait.tv_sec = tv_next_update.tv_sec;
332 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
333
334 status = pthread_cond_timedwait(&queue_cond, &queue_lock, &ts_wait);
335 if (status == ETIMEDOUT)
336 break;
337 } /* while (42) */
338
339 /* XXX: If you need to lock both, cache_lock and queue_lock, at
340 * the same time, ALWAYS lock `cache_lock' first! */
341
342 /* We're in the shutdown phase */
343 if ((flushq_head == NULL) && (queue_head == NULL)) {
344 pthread_mutex_unlock(&queue_lock);
345 break;
346 }
347
348 if (flushq_head != NULL) {
349 /* Dequeue the first flush entry */
350 queue_entry = flushq_head;
351 if (flushq_head == flushq_tail)
352 flushq_head = flushq_tail = NULL;
353 else
354 flushq_head = flushq_head->next;
355 } else /* if (queue_head != NULL) */
356 {
357 /* Dequeue the first regular entry */
358 queue_entry = queue_head;
359 if (queue_head == queue_tail)
360 queue_head = queue_tail = NULL;
361 else
362 queue_head = queue_head->next;
363 }
364
365 /* Unlock the queue again */
366 pthread_mutex_unlock(&queue_lock);
367
368 /* We now need the cache lock so the entry isn't updated while
369 * we make a copy of its values */
370 pthread_mutex_lock(&cache_lock);
371
372 status = c_avl_get(cache, queue_entry->filename, (void *)&cache_entry);
373
374 if (status == 0) {
375 values = cache_entry->values;
376 values_num = cache_entry->values_num;
377
378 cache_entry->values = NULL;
379 cache_entry->values_num = 0;
380 cache_entry->flags = FLAG_NONE;
381 }
382
383 pthread_mutex_unlock(&cache_lock);
384
385 if (status != 0) {
386 sfree(queue_entry->filename);
387 sfree(queue_entry);
388 continue;
389 }
390
391 /* Update `tv_next_update' */
392 if (write_rate > 0.0) {
393 gettimeofday(&tv_now, /* timezone = */ NULL);
394 tv_next_update.tv_sec = tv_now.tv_sec;
395 tv_next_update.tv_usec =
396 tv_now.tv_usec + ((suseconds_t)(1000000 * write_rate));
397 while (tv_next_update.tv_usec > 1000000) {
398 tv_next_update.tv_sec++;
399 tv_next_update.tv_usec -= 1000000;
400 }
401 }
402
403 /* Write the values to the RRD-file */
404 srrd_update(queue_entry->filename, NULL, values_num, (const char **)values);
405 DEBUG("rrdtool plugin: queue thread: Wrote %i value%s to %s", values_num,
406 (values_num == 1) ? "" : "s", queue_entry->filename);
407
408 for (int i = 0; i < values_num; i++) {
409 sfree(values[i]);
410 }
411 sfree(values);
412 sfree(queue_entry->filename);
413 sfree(queue_entry);
414 } /* while (42) */
415
416 pthread_exit((void *)0);
417 return (void *)0;
418 } /* void *rrd_queue_thread */
419
rrd_queue_enqueue(const char * filename,rrd_queue_t ** head,rrd_queue_t ** tail)420 static int rrd_queue_enqueue(const char *filename, rrd_queue_t **head,
421 rrd_queue_t **tail) {
422 rrd_queue_t *queue_entry;
423
424 queue_entry = malloc(sizeof(*queue_entry));
425 if (queue_entry == NULL)
426 return -1;
427
428 queue_entry->filename = strdup(filename);
429 if (queue_entry->filename == NULL) {
430 free(queue_entry);
431 return -1;
432 }
433
434 queue_entry->next = NULL;
435
436 pthread_mutex_lock(&queue_lock);
437
438 if (*tail == NULL)
439 *head = queue_entry;
440 else
441 (*tail)->next = queue_entry;
442 *tail = queue_entry;
443
444 pthread_cond_signal(&queue_cond);
445 pthread_mutex_unlock(&queue_lock);
446
447 return 0;
448 } /* int rrd_queue_enqueue */
449
rrd_queue_dequeue(const char * filename,rrd_queue_t ** head,rrd_queue_t ** tail)450 static int rrd_queue_dequeue(const char *filename, rrd_queue_t **head,
451 rrd_queue_t **tail) {
452 rrd_queue_t *this;
453 rrd_queue_t *prev;
454
455 pthread_mutex_lock(&queue_lock);
456
457 prev = NULL;
458 this = *head;
459
460 while (this != NULL) {
461 if (strcmp(this->filename, filename) == 0)
462 break;
463
464 prev = this;
465 this = this->next;
466 }
467
468 if (this == NULL) {
469 pthread_mutex_unlock(&queue_lock);
470 return -1;
471 }
472
473 if (prev == NULL)
474 *head = this->next;
475 else
476 prev->next = this->next;
477
478 if (this->next == NULL)
479 *tail = prev;
480
481 pthread_mutex_unlock(&queue_lock);
482
483 sfree(this->filename);
484 sfree(this);
485
486 return 0;
487 } /* int rrd_queue_dequeue */
488
489 /* XXX: You must hold "cache_lock" when calling this function! */
rrd_cache_flush(cdtime_t timeout)490 static void rrd_cache_flush(cdtime_t timeout) {
491 rrd_cache_t *rc;
492 cdtime_t now;
493
494 char **keys = NULL;
495 int keys_num = 0;
496
497 char *key;
498 c_avl_iterator_t *iter;
499
500 DEBUG("rrdtool plugin: Flushing cache, timeout = %.3f",
501 CDTIME_T_TO_DOUBLE(timeout));
502
503 now = cdtime();
504
505 /* Build a list of entries to be flushed */
506 iter = c_avl_get_iterator(cache);
507 while (c_avl_iterator_next(iter, (void *)&key, (void *)&rc) == 0) {
508 if (rc->flags != FLAG_NONE)
509 continue;
510 /* timeout == 0 => flush everything */
511 else if ((timeout != 0) && ((now - rc->first_value) < timeout))
512 continue;
513 else if (rc->values_num > 0) {
514 int status;
515
516 status = rrd_queue_enqueue(key, &queue_head, &queue_tail);
517 if (status == 0)
518 rc->flags = FLAG_QUEUED;
519 } else /* ancient and no values -> waste of memory */
520 {
521 char **tmp = realloc(keys, (keys_num + 1) * sizeof(char *));
522 if (tmp == NULL) {
523 ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
524 c_avl_iterator_destroy(iter);
525 sfree(keys);
526 return;
527 }
528 keys = tmp;
529 keys[keys_num] = key;
530 keys_num++;
531 }
532 } /* while (c_avl_iterator_next) */
533 c_avl_iterator_destroy(iter);
534
535 for (int i = 0; i < keys_num; i++) {
536 if (c_avl_remove(cache, keys[i], (void *)&key, (void *)&rc) != 0) {
537 DEBUG("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
538 continue;
539 }
540
541 assert(rc->values == NULL);
542 assert(rc->values_num == 0);
543
544 sfree(rc);
545 sfree(key);
546 keys[i] = NULL;
547 } /* for (i = 0..keys_num) */
548
549 sfree(keys);
550
551 cache_flush_last = now;
552 } /* void rrd_cache_flush */
553
rrd_cache_flush_identifier(cdtime_t timeout,const char * identifier)554 static int rrd_cache_flush_identifier(cdtime_t timeout,
555 const char *identifier) {
556 rrd_cache_t *rc;
557 cdtime_t now;
558 int status;
559 char key[2048];
560
561 if (identifier == NULL) {
562 rrd_cache_flush(timeout);
563 return 0;
564 }
565
566 now = cdtime();
567
568 if (datadir == NULL)
569 ssnprintf(key, sizeof(key), "%s.rrd", identifier);
570 else
571 ssnprintf(key, sizeof(key), "%s/%s.rrd", datadir, identifier);
572 key[sizeof(key) - 1] = '\0';
573
574 status = c_avl_get(cache, key, (void *)&rc);
575 if (status != 0) {
576 INFO("rrdtool plugin: rrd_cache_flush_identifier: "
577 "c_avl_get (%s) failed. Does that file really exist?",
578 key);
579 return status;
580 }
581
582 if (rc->flags == FLAG_FLUSHQ) {
583 status = 0;
584 } else if (rc->flags == FLAG_QUEUED) {
585 rrd_queue_dequeue(key, &queue_head, &queue_tail);
586 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
587 if (status == 0)
588 rc->flags = FLAG_FLUSHQ;
589 } else if ((now - rc->first_value) < timeout) {
590 status = 0;
591 } else if (rc->values_num > 0) {
592 status = rrd_queue_enqueue(key, &flushq_head, &flushq_tail);
593 if (status == 0)
594 rc->flags = FLAG_FLUSHQ;
595 }
596
597 return status;
598 } /* int rrd_cache_flush_identifier */
599
rrd_get_random_variation(void)600 static int64_t rrd_get_random_variation(void) {
601 if (random_timeout == 0)
602 return 0;
603
604 return (int64_t)cdrand_range(-random_timeout, random_timeout);
605 } /* int64_t rrd_get_random_variation */
606
rrd_cache_insert(const char * filename,const char * value,cdtime_t value_time)607 static int rrd_cache_insert(const char *filename, const char *value,
608 cdtime_t value_time) {
609 rrd_cache_t *rc = NULL;
610 int new_rc = 0;
611 char **values_new;
612
613 pthread_mutex_lock(&cache_lock);
614
615 /* This shouldn't happen, but it did happen at least once, so we'll be
616 * careful. */
617 if (cache == NULL) {
618 pthread_mutex_unlock(&cache_lock);
619 WARNING("rrdtool plugin: cache == NULL.");
620 return -1;
621 }
622
623 int status = c_avl_get(cache, filename, (void *)&rc);
624 if ((status != 0) || (rc == NULL)) {
625 rc = malloc(sizeof(*rc));
626 if (rc == NULL) {
627 ERROR("rrdtool plugin: malloc failed: %s", STRERRNO);
628 pthread_mutex_unlock(&cache_lock);
629 return -1;
630 }
631 rc->values_num = 0;
632 rc->values = NULL;
633 rc->first_value = 0;
634 rc->last_value = 0;
635 rc->random_variation = rrd_get_random_variation();
636 rc->flags = FLAG_NONE;
637 new_rc = 1;
638 }
639
640 assert(value_time > 0); /* plugin_dispatch() ensures this. */
641 if (rc->last_value >= value_time) {
642 pthread_mutex_unlock(&cache_lock);
643 DEBUG("rrdtool plugin: (rc->last_value = %" PRIu64 ") "
644 ">= (value_time = %" PRIu64 ")",
645 rc->last_value, value_time);
646 return -1;
647 }
648
649 values_new =
650 realloc((void *)rc->values, (rc->values_num + 1) * sizeof(char *));
651 if (values_new == NULL) {
652 void *cache_key = NULL;
653
654 c_avl_remove(cache, filename, &cache_key, NULL);
655 pthread_mutex_unlock(&cache_lock);
656
657 ERROR("rrdtool plugin: realloc failed: %s", STRERRNO);
658
659 sfree(cache_key);
660 sfree(rc->values);
661 sfree(rc);
662 return -1;
663 }
664 rc->values = values_new;
665
666 rc->values[rc->values_num] = strdup(value);
667 if (rc->values[rc->values_num] != NULL)
668 rc->values_num++;
669
670 if (rc->values_num == 1)
671 rc->first_value = value_time;
672 rc->last_value = value_time;
673
674 /* Insert if this is the first value */
675 if (new_rc == 1) {
676 void *cache_key = strdup(filename);
677
678 if (cache_key == NULL) {
679 pthread_mutex_unlock(&cache_lock);
680
681 ERROR("rrdtool plugin: strdup failed: %s", STRERRNO);
682
683 sfree(rc->values[0]);
684 sfree(rc->values);
685 sfree(rc);
686 return -1;
687 }
688
689 c_avl_insert(cache, cache_key, rc);
690 }
691
692 DEBUG("rrdtool plugin: rrd_cache_insert: file = %s; "
693 "values_num = %i; age = %.3f;",
694 filename, rc->values_num,
695 CDTIME_T_TO_DOUBLE(rc->last_value - rc->first_value));
696
697 if ((rc->last_value - rc->first_value) >=
698 (cache_timeout + rc->random_variation)) {
699 /* XXX: If you need to lock both, cache_lock and queue_lock, at
700 * the same time, ALWAYS lock `cache_lock' first! */
701 if (rc->flags == FLAG_NONE) {
702 int status;
703
704 status = rrd_queue_enqueue(filename, &queue_head, &queue_tail);
705 if (status == 0)
706 rc->flags = FLAG_QUEUED;
707
708 rc->random_variation = rrd_get_random_variation();
709 } else {
710 DEBUG("rrdtool plugin: `%s' is already queued.", filename);
711 }
712 }
713
714 if ((cache_timeout > 0) &&
715 ((cdtime() - cache_flush_last) > cache_flush_timeout))
716 rrd_cache_flush(cache_timeout + random_timeout);
717
718 pthread_mutex_unlock(&cache_lock);
719
720 return 0;
721 } /* int rrd_cache_insert */
722
rrd_cache_destroy(void)723 static int rrd_cache_destroy(void) /* {{{ */
724 {
725 void *key = NULL;
726 void *value = NULL;
727
728 int non_empty = 0;
729
730 pthread_mutex_lock(&cache_lock);
731
732 if (cache == NULL) {
733 pthread_mutex_unlock(&cache_lock);
734 return 0;
735 }
736
737 while (c_avl_pick(cache, &key, &value) == 0) {
738 rrd_cache_t *rc;
739
740 sfree(key);
741 key = NULL;
742
743 rc = value;
744 value = NULL;
745
746 if (rc->values_num > 0)
747 non_empty++;
748
749 for (int i = 0; i < rc->values_num; i++)
750 sfree(rc->values[i]);
751 sfree(rc->values);
752 sfree(rc);
753 }
754
755 c_avl_destroy(cache);
756 cache = NULL;
757
758 if (non_empty > 0) {
759 INFO("rrdtool plugin: %i cache %s had values when destroying the cache.",
760 non_empty, (non_empty == 1) ? "entry" : "entries");
761 } else {
762 DEBUG("rrdtool plugin: No values have been lost "
763 "when destroying the cache.");
764 }
765
766 pthread_mutex_unlock(&cache_lock);
767 return 0;
768 } /* }}} int rrd_cache_destroy */
769
rrd_compare_numeric(const void * a_ptr,const void * b_ptr)770 static int rrd_compare_numeric(const void *a_ptr, const void *b_ptr) {
771 int a = *((int *)a_ptr);
772 int b = *((int *)b_ptr);
773
774 if (a < b)
775 return -1;
776 else if (a > b)
777 return 1;
778 else
779 return 0;
780 } /* int rrd_compare_numeric */
781
rrd_write(const data_set_t * ds,const value_list_t * vl,user_data_t * user_data)782 static int rrd_write(const data_set_t *ds, const value_list_t *vl,
783 user_data_t __attribute__((unused)) * user_data) {
784
785 if (do_shutdown)
786 return 0;
787
788 if (0 != strcmp(ds->type, vl->type)) {
789 ERROR("rrdtool plugin: DS type does not match value list type");
790 return -1;
791 }
792
793 char filename[PATH_MAX];
794 if (value_list_to_filename(filename, sizeof(filename), vl) != 0) {
795 ERROR("rrdtool plugin: failed to build filename");
796 return -1;
797 }
798
799 char values[32 * (ds->ds_num + 1)];
800 if (value_list_to_string(values, sizeof(values), ds, vl) != 0) {
801 ERROR("rrdtool plugin: failed to build values string");
802 return -1;
803 }
804
805 struct stat statbuf = {0};
806 if (stat(filename, &statbuf) == -1) {
807 if (errno == ENOENT) {
808 if (cu_rrd_create_file(filename, ds, vl, &rrdcreate_config) != 0) {
809 ERROR("rrdtool plugin: cu_rrd_create_file (%s) failed.", filename);
810 return -1;
811 } else if (rrdcreate_config.async) {
812 return 0;
813 }
814 } else {
815 ERROR("rrdtool plugin: stat(%s) failed: %s", filename, STRERRNO);
816 return -1;
817 }
818 } else if (!S_ISREG(statbuf.st_mode)) {
819 ERROR("rrdtool plugin: stat(%s): Not a regular file!", filename);
820 return -1;
821 }
822
823 return rrd_cache_insert(filename, values, vl->time);
824 } /* int rrd_write */
825
rrd_flush(cdtime_t timeout,const char * identifier,user_data_t * user_data)826 static int rrd_flush(cdtime_t timeout, const char *identifier,
827 __attribute__((unused)) user_data_t *user_data) {
828 pthread_mutex_lock(&cache_lock);
829
830 if (cache == NULL) {
831 pthread_mutex_unlock(&cache_lock);
832 return 0;
833 }
834
835 rrd_cache_flush_identifier(timeout, identifier);
836
837 pthread_mutex_unlock(&cache_lock);
838 return 0;
839 } /* int rrd_flush */
840
rrd_config(const char * key,const char * value)841 static int rrd_config(const char *key, const char *value) {
842 if (strcasecmp("CacheTimeout", key) == 0) {
843 double tmp = atof(value);
844 if (tmp < 0) {
845 fprintf(stderr, "rrdtool: `CacheTimeout' must "
846 "be greater than 0.\n");
847 ERROR("rrdtool: `CacheTimeout' must "
848 "be greater than 0.\n");
849 return 1;
850 }
851 cache_timeout = DOUBLE_TO_CDTIME_T(tmp);
852 } else if (strcasecmp("CacheFlush", key) == 0) {
853 double tmp = atof(value);
854 if (tmp < 0) {
855 fprintf(stderr, "rrdtool: `CacheFlush' must "
856 "be greater than 0.\n");
857 ERROR("rrdtool: `CacheFlush' must "
858 "be greater than 0.\n");
859 return 1;
860 }
861 cache_flush_timeout = DOUBLE_TO_CDTIME_T(tmp);
862 } else if (strcasecmp("DataDir", key) == 0) {
863 char *tmp;
864 size_t len;
865
866 tmp = strdup(value);
867 if (tmp == NULL) {
868 ERROR("rrdtool plugin: strdup failed.");
869 return 1;
870 }
871
872 len = strlen(tmp);
873 while ((len > 0) && (tmp[len - 1] == '/')) {
874 len--;
875 tmp[len] = 0;
876 }
877
878 if (len == 0) {
879 ERROR("rrdtool plugin: Invalid \"DataDir\" option.");
880 sfree(tmp);
881 return 1;
882 }
883
884 if (datadir != NULL) {
885 sfree(datadir);
886 }
887
888 datadir = tmp;
889 } else if (strcasecmp("StepSize", key) == 0) {
890 unsigned long temp = strtoul(value, NULL, 0);
891 if (temp > 0)
892 rrdcreate_config.stepsize = temp;
893 } else if (strcasecmp("HeartBeat", key) == 0) {
894 int temp = atoi(value);
895 if (temp > 0)
896 rrdcreate_config.heartbeat = temp;
897 } else if (strcasecmp("CreateFilesAsync", key) == 0) {
898 if (IS_TRUE(value))
899 rrdcreate_config.async = 1;
900 else
901 rrdcreate_config.async = 0;
902 } else if (strcasecmp("RRARows", key) == 0) {
903 int tmp = atoi(value);
904 if (tmp <= 0) {
905 fprintf(stderr, "rrdtool: `RRARows' must "
906 "be greater than 0.\n");
907 ERROR("rrdtool: `RRARows' must "
908 "be greater than 0.\n");
909 return 1;
910 }
911 rrdcreate_config.rrarows = tmp;
912 } else if (strcasecmp("RRATimespan", key) == 0) {
913 char *saveptr = NULL;
914 char *dummy;
915 char *ptr;
916 char *value_copy;
917 int *tmp_alloc;
918
919 value_copy = strdup(value);
920 if (value_copy == NULL)
921 return 1;
922
923 dummy = value_copy;
924 while ((ptr = strtok_r(dummy, ", \t", &saveptr)) != NULL) {
925 dummy = NULL;
926
927 tmp_alloc = realloc(rrdcreate_config.timespans,
928 sizeof(int) * (rrdcreate_config.timespans_num + 1));
929 if (tmp_alloc == NULL) {
930 fprintf(stderr, "rrdtool: realloc failed.\n");
931 ERROR("rrdtool: realloc failed.\n");
932 free(value_copy);
933 return 1;
934 }
935 rrdcreate_config.timespans = tmp_alloc;
936 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi(ptr);
937 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
938 rrdcreate_config.timespans_num++;
939 } /* while (strtok_r) */
940
941 qsort(/* base = */ rrdcreate_config.timespans,
942 /* nmemb = */ rrdcreate_config.timespans_num,
943 /* size = */ sizeof(rrdcreate_config.timespans[0]),
944 /* compar = */ rrd_compare_numeric);
945
946 free(value_copy);
947 } else if (strcasecmp("XFF", key) == 0) {
948 double tmp = atof(value);
949 if ((tmp < 0.0) || (tmp >= 1.0)) {
950 fprintf(stderr, "rrdtool: `XFF' must "
951 "be in the range 0 to 1 (exclusive).");
952 ERROR("rrdtool: `XFF' must "
953 "be in the range 0 to 1 (exclusive).");
954 return 1;
955 }
956 rrdcreate_config.xff = tmp;
957 } else if (strcasecmp("WritesPerSecond", key) == 0) {
958 double wps = atof(value);
959
960 if (wps < 0.0) {
961 fprintf(stderr, "rrdtool: `WritesPerSecond' must be "
962 "greater than or equal to zero.");
963 return 1;
964 } else if (wps == 0.0) {
965 write_rate = 0.0;
966 } else {
967 write_rate = 1.0 / wps;
968 }
969 } else if (strcasecmp("RandomTimeout", key) == 0) {
970 double tmp;
971
972 tmp = atof(value);
973 if (tmp < 0.0) {
974 fprintf(stderr, "rrdtool: `RandomTimeout' must "
975 "be greater than or equal to zero.\n");
976 ERROR("rrdtool: `RandomTimeout' must "
977 "be greater then or equal to zero.");
978 } else {
979 random_timeout = DOUBLE_TO_CDTIME_T(tmp);
980 }
981 } else {
982 return -1;
983 }
984 return 0;
985 } /* int rrd_config */
986
rrd_shutdown(void)987 static int rrd_shutdown(void) {
988 pthread_mutex_lock(&cache_lock);
989 rrd_cache_flush(0);
990 pthread_mutex_unlock(&cache_lock);
991
992 pthread_mutex_lock(&queue_lock);
993 do_shutdown = 1;
994 pthread_cond_signal(&queue_cond);
995 pthread_mutex_unlock(&queue_lock);
996
997 if ((queue_thread_running != 0) &&
998 ((queue_head != NULL) || (flushq_head != NULL))) {
999 INFO("rrdtool plugin: Shutting down the queue thread. "
1000 "This may take a while.");
1001 } else if (queue_thread_running != 0) {
1002 INFO("rrdtool plugin: Shutting down the queue thread.");
1003 }
1004
1005 /* Wait for all the values to be written to disk before returning. */
1006 if (queue_thread_running != 0) {
1007 pthread_join(queue_thread, NULL);
1008 memset(&queue_thread, 0, sizeof(queue_thread));
1009 queue_thread_running = 0;
1010 DEBUG("rrdtool plugin: queue_thread exited.");
1011 }
1012
1013 rrd_cache_destroy();
1014
1015 return 0;
1016 } /* int rrd_shutdown */
1017
rrd_init(void)1018 static int rrd_init(void) {
1019 static int init_once;
1020
1021 if (init_once != 0)
1022 return 0;
1023 init_once = 1;
1024
1025 if (rrdcreate_config.heartbeat <= 0)
1026 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1027
1028 /* Set the cache up */
1029 pthread_mutex_lock(&cache_lock);
1030
1031 cache = c_avl_create((int (*)(const void *, const void *))strcmp);
1032 if (cache == NULL) {
1033 pthread_mutex_unlock(&cache_lock);
1034 ERROR("rrdtool plugin: c_avl_create failed.");
1035 return -1;
1036 }
1037
1038 cache_flush_last = cdtime();
1039 if (cache_timeout == 0) {
1040 random_timeout = 0;
1041 cache_flush_timeout = 0;
1042 } else if (cache_flush_timeout < cache_timeout) {
1043 INFO("rrdtool plugin: \"CacheFlush %.3f\" is less than \"CacheTimeout "
1044 "%.3f\". Adjusting \"CacheFlush\" to %.3f seconds.",
1045 CDTIME_T_TO_DOUBLE(cache_flush_timeout),
1046 CDTIME_T_TO_DOUBLE(cache_timeout),
1047 CDTIME_T_TO_DOUBLE(cache_timeout * 10));
1048 cache_flush_timeout = 10 * cache_timeout;
1049 }
1050
1051 /* Assure that "cache_timeout + random_variation" is never negative. */
1052 if (random_timeout > cache_timeout) {
1053 INFO("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
1054 CDTIME_T_TO_DOUBLE(cache_timeout));
1055 random_timeout = cache_timeout;
1056 }
1057
1058 pthread_mutex_unlock(&cache_lock);
1059
1060 int status = plugin_thread_create(&queue_thread, rrd_queue_thread,
1061 /* args = */ NULL, "rrdtool queue");
1062 if (status != 0) {
1063 ERROR("rrdtool plugin: Cannot create queue-thread.");
1064 return -1;
1065 }
1066 queue_thread_running = 1;
1067
1068 DEBUG("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1069 " heartbeat = %i; rrarows = %i; xff = %lf;",
1070 (datadir == NULL) ? "(null)" : datadir, rrdcreate_config.stepsize,
1071 rrdcreate_config.heartbeat, rrdcreate_config.rrarows,
1072 rrdcreate_config.xff);
1073
1074 return 0;
1075 } /* int rrd_init */
1076
module_register(void)1077 void module_register(void) {
1078 plugin_register_config("rrdtool", rrd_config, config_keys, config_keys_num);
1079 plugin_register_init("rrdtool", rrd_init);
1080 plugin_register_write("rrdtool", rrd_write, /* user_data = */ NULL);
1081 plugin_register_flush("rrdtool", rrd_flush, /* user_data = */ NULL);
1082 plugin_register_shutdown("rrdtool", rrd_shutdown);
1083 }
1084