1 /**
2 * collectd - src/plugin.c
3 * Copyright (C) 2005-2014 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at collectd.org>
25 * Sebastian Harl <sh at tokkee.org>
26 **/
27
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30
31 #include "collectd.h"
32
33 #include "configfile.h"
34 #include "filter_chain.h"
35 #include "plugin.h"
36 #include "utils/avltree/avltree.h"
37 #include "utils/common/common.h"
38 #include "utils/heap/heap.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "utils_llist.h"
42 #include "utils_random.h"
43 #include "utils_time.h"
44
45 #ifdef WIN32
46 #define EXPORT __declspec(dllexport)
47 #include <sys/stat.h>
48 #include <unistd.h>
49 #else
50 #define EXPORT
51 #endif
52
53 #if HAVE_PTHREAD_NP_H
54 #include <pthread_np.h> /* for pthread_set_name_np(3) */
55 #endif
56
57 #include <dlfcn.h>
58
59 /*
60 * Private structures
61 */
62 struct callback_func_s {
63 void *cf_callback;
64 user_data_t cf_udata;
65 plugin_ctx_t cf_ctx;
66 };
67 typedef struct callback_func_s callback_func_t;
68
69 #define RF_SIMPLE 0
70 #define RF_COMPLEX 1
71 #define RF_REMOVE 65535
72 struct read_func_s {
73 /* `read_func_t' "inherits" from `callback_func_t'.
74 * The `rf_super' member MUST be the first one in this structure! */
75 #define rf_callback rf_super.cf_callback
76 #define rf_udata rf_super.cf_udata
77 #define rf_ctx rf_super.cf_ctx
78 callback_func_t rf_super;
79 char rf_group[DATA_MAX_NAME_LEN];
80 char *rf_name;
81 int rf_type;
82 cdtime_t rf_interval;
83 cdtime_t rf_effective_interval;
84 cdtime_t rf_next_read;
85 };
86 typedef struct read_func_s read_func_t;
87
88 struct cache_event_func_s {
89 plugin_cache_event_cb callback;
90 char *name;
91 user_data_t user_data;
92 plugin_ctx_t plugin_ctx;
93 };
94 typedef struct cache_event_func_s cache_event_func_t;
95
96 struct write_queue_s;
97 typedef struct write_queue_s write_queue_t;
98 struct write_queue_s {
99 value_list_t *vl;
100 plugin_ctx_t ctx;
101 write_queue_t *next;
102 };
103
104 struct flush_callback_s {
105 char *name;
106 cdtime_t timeout;
107 };
108 typedef struct flush_callback_s flush_callback_t;
109
110 /*
111 * Private variables
112 */
113 static c_avl_tree_t *plugins_loaded;
114
115 static llist_t *list_init;
116 static llist_t *list_write;
117 static llist_t *list_flush;
118 static llist_t *list_missing;
119 static llist_t *list_shutdown;
120 static llist_t *list_log;
121 static llist_t *list_notification;
122
123 static size_t list_cache_event_num;
124 static cache_event_func_t list_cache_event[32];
125
126 static fc_chain_t *pre_cache_chain;
127 static fc_chain_t *post_cache_chain;
128
129 static c_avl_tree_t *data_sets;
130
131 static char *plugindir;
132
133 #ifndef DEFAULT_MAX_READ_INTERVAL
134 #define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
135 #endif
136 static c_heap_t *read_heap;
137 static llist_t *read_list;
138 static int read_loop = 1;
139 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
140 static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
141 static pthread_t *read_threads;
142 static size_t read_threads_num;
143 static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;
144
145 static write_queue_t *write_queue_head;
146 static write_queue_t *write_queue_tail;
147 static long write_queue_length;
148 static bool write_loop = true;
149 static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
150 static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
151 static pthread_t *write_threads;
152 static size_t write_threads_num;
153
154 static pthread_key_t plugin_ctx_key;
155 static bool plugin_ctx_key_initialized;
156
157 static long write_limit_high;
158 static long write_limit_low;
159
160 static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
161 static derive_t stats_values_dropped;
162 static bool record_statistics;
163
164 /*
165 * Static functions
166 */
167 static int plugin_dispatch_values_internal(value_list_t *vl);
168
plugin_get_dir(void)169 static const char *plugin_get_dir(void) {
170 if (plugindir == NULL)
171 return PLUGINDIR;
172 else
173 return plugindir;
174 }
175
plugin_update_internal_statistics(void)176 static int plugin_update_internal_statistics(void) { /* {{{ */
177 gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
178
179 /* Initialize `vl' */
180 value_list_t vl = VALUE_LIST_INIT;
181 sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
182 vl.interval = plugin_get_interval();
183
184 /* Write queue */
185 sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
186
187 /* Write queue : queue length */
188 vl.values = &(value_t){.gauge = copy_write_queue_length};
189 vl.values_len = 1;
190 sstrncpy(vl.type, "queue_length", sizeof(vl.type));
191 vl.type_instance[0] = 0;
192 plugin_dispatch_values(&vl);
193
194 /* Write queue : Values dropped (queue length > low limit) */
195 vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
196 vl.values_len = 1;
197 sstrncpy(vl.type, "derive", sizeof(vl.type));
198 sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
199 plugin_dispatch_values(&vl);
200
201 /* Cache */
202 sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
203
204 /* Cache : Nb entry in cache tree */
205 vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
206 vl.values_len = 1;
207 sstrncpy(vl.type, "cache_size", sizeof(vl.type));
208 vl.type_instance[0] = 0;
209 plugin_dispatch_values(&vl);
210
211 return 0;
212 } /* }}} int plugin_update_internal_statistics */
213
/* Releases the payload of `ud' by calling its `free_func'. Does nothing if
 * `ud' is NULL, has no data or has no free function. The `user_data_t'
 * structure itself is not freed. */
static void free_userdata(user_data_t const *ud) /* {{{ */
{
  if ((ud == NULL) || (ud->data == NULL) || (ud->free_func == NULL))
    return;

  ud->free_func(ud->data);
} /* }}} void free_userdata */
223
/* Frees a callback record: first its user data, then the record itself.
 * Passing NULL is allowed and does nothing. */
static void destroy_callback(callback_func_t *cf) /* {{{ */
{
  if (cf != NULL) {
    free_userdata(&cf->cf_udata);
    sfree(cf);
  }
} /* }}} void destroy_callback */
231
/* Frees the key and the callback record of every entry in `*list', destroys
 * the list and resets `*list' to NULL. Does nothing if `*list' is NULL. */
static void destroy_all_callbacks(llist_t **list) /* {{{ */
{
  if (*list == NULL)
    return;

  /* Only the payload of each entry is released here; the entries themselves
   * stay linked until llist_destroy() below. */
  for (llentry_t *entry = llist_head(*list); entry != NULL;
       entry = entry->next) {
    sfree(entry->key);
    destroy_callback(entry->value);
    entry->value = NULL;
  }

  llist_destroy(*list);
  *list = NULL;
} /* }}} void destroy_all_callbacks */
255
destroy_read_heap(void)256 static void destroy_read_heap(void) /* {{{ */
257 {
258 if (read_heap == NULL)
259 return;
260
261 while (42) {
262 read_func_t *rf;
263
264 rf = c_heap_get_root(read_heap);
265 if (rf == NULL)
266 break;
267 sfree(rf->rf_name);
268 destroy_callback((callback_func_t *)rf);
269 }
270
271 c_heap_destroy(read_heap);
272 read_heap = NULL;
273 } /* }}} void destroy_read_heap */
274
/* Stores `cf' under `name' in `*list', creating the list on first use. If an
 * entry with that name already exists, its callback is replaced (with a
 * warning) and the old callback is destroyed.
 *
 * Takes ownership of `cf': on every failure path `cf' is destroyed.
 * Returns zero on success and -1 on failure. */
static int register_callback(llist_t **list, /* {{{ */
                             const char *name, callback_func_t *cf) {
  if (*list == NULL)
    *list = llist_create();
  if (*list == NULL) {
    ERROR("plugin: register_callback: "
          "llist_create failed.");
    destroy_callback(cf);
    return -1;
  }

  char *key_copy = strdup(name);
  if (key_copy == NULL) {
    ERROR("plugin: register_callback: strdup failed.");
    destroy_callback(cf);
    return -1;
  }

  llentry_t *existing = llist_search(*list, name);
  if (existing != NULL) {
    /* Replace the callback in place; the entry keeps its original key, so
     * the fresh copy of the name is not needed. */
    callback_func_t *replaced = existing->value;
    existing->value = cf;

    P_WARNING("register_callback: "
              "a callback named `%s' already exists - "
              "overwriting the old entry!",
              name);

    destroy_callback(replaced);
    sfree(key_copy);
    return 0;
  }

  llentry_t *fresh = llentry_create(key_copy, cf);
  if (fresh == NULL) {
    ERROR("plugin: register_callback: "
          "llentry_create failed.");
    sfree(key_copy);
    destroy_callback(cf);
    return -1;
  }

  llist_append(*list, fresh);
  return 0;
} /* }}} int register_callback */
322
/* Logs, at INFO level, the names of all callbacks in `*list' as a quoted,
 * comma separated list prefixed with `comment'. An empty list is logged as
 * "[none]". */
static void log_list_callbacks(llist_t **list, /* {{{ */
                               const char *comment) {
  int count = llist_size(*list);
  if (count == 0) {
    INFO("%s: [none]", comment);
    return;
  }

  char **names = calloc(count, sizeof(*names));
  if (names == NULL) {
    ERROR("%s: failed to allocate memory for list of callbacks", comment);
    return;
  }

  /* Collect the keys and add up the space needed for the joined string;
   * the six extra bytes per key leave room for the separator. */
  int total = 0;
  int idx = 0;
  for (llentry_t *entry = llist_head(*list); entry != NULL;
       entry = entry->next) {
    names[idx] = entry->key;
    total += strlen(entry->key) + 6;
    idx++;
  }

  char *joined = malloc(total + 10);
  if (joined != NULL) {
    *joined = '\0';
    strjoin(joined, total, names, count, "', '");
    INFO("%s ['%s']", comment, joined);
    sfree(joined);
  } else {
    ERROR("%s: failed to allocate memory for list of callbacks", comment);
  }

  /* Only the array is freed; the keys still belong to the list. */
  sfree(names);
} /* }}} void log_list_callbacks */
358
/* Allocates a callback record for `callback', attaches a copy of `*ud' (or
 * empty user data if `ud' is NULL) and the current plugin context, and
 * registers it under `name' in `*list'.
 *
 * Ownership of the user data is taken in all cases: if the registration
 * fails for any reason, the user data is released with its `free_func'.
 *
 * Returns zero on success, EINVAL if `name' or `callback' is NULL, ENOMEM if
 * the record cannot be allocated and the result of register_callback()
 * otherwise. */
static int create_register_callback(llist_t **list, /* {{{ */
                                    const char *name, void *callback,
                                    user_data_t const *ud) {

  if (name == NULL || callback == NULL) {
    /* Release the user data here as well, like every other failure path
     * does; otherwise it is leaked when the arguments are invalid. */
    free_userdata(ud);
    return EINVAL;
  }

  callback_func_t *cf = calloc(1, sizeof(*cf));
  if (cf == NULL) {
    free_userdata(ud);
    ERROR("plugin: create_register_callback: calloc failed.");
    return ENOMEM;
  }

  cf->cf_callback = callback;
  if (ud == NULL) {
    cf->cf_udata = (user_data_t){
        .data = NULL,
        .free_func = NULL,
    };
  } else {
    cf->cf_udata = *ud;
  }

  /* Remember the context of the registering plugin. */
  cf->cf_ctx = plugin_get_ctx();

  /* register_callback() destroys `cf' (and its user data) on failure. */
  return register_callback(list, name, cf);
} /* }}} int create_register_callback */
387
/* Removes the entry called `name' from `list' and frees its key, its
 * callback record and the entry itself. Returns zero on success and -1 if
 * `list' is NULL or holds no entry with that name. */
static int plugin_unregister(llist_t *list, const char *name) /* {{{ */
{
  llentry_t *entry = (list != NULL) ? llist_search(list, name) : NULL;
  if (entry == NULL)
    return -1;

  /* Unlink first, then release everything the entry owns. */
  llist_remove(list, entry);

  sfree(entry->key);
  destroy_callback(entry->value);
  llentry_destroy(entry);

  return 0;
} /* }}} int plugin_unregister */
408
409 /* plugin_load_file loads the shared object "file" and calls its
410 * "module_register" function. Returns zero on success, non-zero otherwise. */
plugin_load_file(char const * file,bool global)411 static int plugin_load_file(char const *file, bool global) {
412 int flags = RTLD_NOW;
413 if (global)
414 flags |= RTLD_GLOBAL;
415
416 void *dlh = dlopen(file, flags);
417 if (dlh == NULL) {
418 char errbuf[1024] = "";
419
420 snprintf(errbuf, sizeof(errbuf),
421 "dlopen(\"%s\") failed: %s. "
422 "The most common cause for this problem is missing dependencies. "
423 "Use ldd(1) to check the dependencies of the plugin / shared "
424 "object.",
425 file, dlerror());
426
427 /* This error is printed to STDERR unconditionally. If list_log is NULL,
428 * plugin_log() will also print to STDERR. We avoid duplicate output by
429 * checking that the list of log handlers, list_log, is not NULL. */
430 fprintf(stderr, "ERROR: %s\n", errbuf);
431 if (list_log != NULL) {
432 ERROR("%s", errbuf);
433 }
434
435 return ENOENT;
436 }
437
438 void (*reg_handle)(void) = dlsym(dlh, "module_register");
439 if (reg_handle == NULL) {
440 ERROR("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
441 dlerror());
442 dlclose(dlh);
443 return ENOENT;
444 }
445
446 (*reg_handle)();
447 return 0;
448 }
449
plugin_read_thread(void * args)450 static void *plugin_read_thread(void __attribute__((unused)) * args) {
451 while (read_loop != 0) {
452 read_func_t *rf;
453 plugin_ctx_t old_ctx;
454 cdtime_t start;
455 cdtime_t now;
456 cdtime_t elapsed;
457 int status;
458 int rf_type;
459 int rc;
460
461 /* Get the read function that needs to be read next.
462 * We don't need to hold "read_lock" for the heap, but we need
463 * to call c_heap_get_root() and pthread_cond_wait() in the
464 * same protected block. */
465 pthread_mutex_lock(&read_lock);
466 rf = c_heap_get_root(read_heap);
467 if (rf == NULL) {
468 pthread_cond_wait(&read_cond, &read_lock);
469 pthread_mutex_unlock(&read_lock);
470 continue;
471 }
472 pthread_mutex_unlock(&read_lock);
473
474 if (rf->rf_interval == 0) {
475 /* this should not happen, because the interval is set
476 * for each plugin when loading it
477 * XXX: issue a warning? */
478 rf->rf_interval = plugin_get_interval();
479 rf->rf_effective_interval = rf->rf_interval;
480
481 rf->rf_next_read = cdtime();
482 }
483
484 /* sleep until this entry is due,
485 * using pthread_cond_timedwait */
486 pthread_mutex_lock(&read_lock);
487 /* In pthread_cond_timedwait, spurious wakeups are possible
488 * (and really happen, at least on NetBSD with > 1 CPU), thus
489 * we need to re-evaluate the condition every time
490 * pthread_cond_timedwait returns. */
491 rc = 0;
492 while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
493 rc = pthread_cond_timedwait(&read_cond, &read_lock,
494 &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
495 }
496
497 /* Must hold `read_lock' when accessing `rf->rf_type'. */
498 rf_type = rf->rf_type;
499 pthread_mutex_unlock(&read_lock);
500
501 /* Check if we're supposed to stop.. This may have interrupted
502 * the sleep, too. */
503 if (read_loop == 0) {
504 /* Insert `rf' again, so it can be free'd correctly */
505 c_heap_insert(read_heap, rf);
506 break;
507 }
508
509 /* The entry has been marked for deletion. The linked list
510 * entry has already been removed by `plugin_unregister_read'.
511 * All we have to do here is free the `read_func_t' and
512 * continue. */
513 if (rf_type == RF_REMOVE) {
514 DEBUG("plugin_read_thread: Destroying the `%s' "
515 "callback.",
516 rf->rf_name);
517 sfree(rf->rf_name);
518 destroy_callback((callback_func_t *)rf);
519 rf = NULL;
520 continue;
521 }
522
523 DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);
524
525 start = cdtime();
526
527 old_ctx = plugin_set_ctx(rf->rf_ctx);
528
529 if (rf_type == RF_SIMPLE) {
530 int (*callback)(void);
531
532 callback = rf->rf_callback;
533 status = (*callback)();
534 } else {
535 plugin_read_cb callback;
536
537 assert(rf_type == RF_COMPLEX);
538
539 callback = rf->rf_callback;
540 status = (*callback)(&rf->rf_udata);
541 }
542
543 plugin_set_ctx(old_ctx);
544
545 /* If the function signals failure, we will increase the
546 * intervals in which it will be called. */
547 if (status != 0) {
548 rf->rf_effective_interval *= 2;
549 if (rf->rf_effective_interval > max_read_interval)
550 rf->rf_effective_interval = max_read_interval;
551
552 NOTICE("read-function of plugin `%s' failed. "
553 "Will suspend it for %.3f seconds.",
554 rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
555 } else {
556 /* Success: Restore the interval, if it was changed. */
557 rf->rf_effective_interval = rf->rf_interval;
558 }
559
560 /* update the ``next read due'' field */
561 now = cdtime();
562
563 /* calculate the time spent in the read function */
564 elapsed = (now - start);
565
566 if (elapsed > rf->rf_effective_interval)
567 WARNING(
568 "plugin_read_thread: read-function of the `%s' plugin took %.3f "
569 "seconds, which is above its read interval (%.3f seconds). You might "
570 "want to adjust the `Interval' or `ReadThreads' settings.",
571 rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
572 CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
573
574 DEBUG("plugin_read_thread: read-function of the `%s' plugin took "
575 "%.6f seconds.",
576 rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));
577
578 DEBUG("plugin_read_thread: Effective interval of the "
579 "`%s' plugin is %.3f seconds.",
580 rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
581
582 /* Calculate the next (absolute) time at which this function
583 * should be called. */
584 rf->rf_next_read += rf->rf_effective_interval;
585
586 /* Check, if `rf_next_read' is in the past. */
587 if (rf->rf_next_read < now) {
588 /* `rf_next_read' is in the past. Insert `now'
589 * so this value doesn't trail off into the
590 * past too much. */
591 rf->rf_next_read = now;
592 }
593
594 DEBUG("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
595 rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_next_read));
596
597 /* Re-insert this read function into the heap again. */
598 c_heap_insert(read_heap, rf);
599 } /* while (read_loop) */
600
601 pthread_exit(NULL);
602 return (void *)0;
603 } /* void *plugin_read_thread */
604
/* Size of the buffers used for thread names, including the terminating
 * null byte. */
#ifdef PTHREAD_MAX_NAMELEN_NP
#define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
#else
#define THREAD_NAME_MAX 16
#endif

/* Sets the name of thread `tid' to `name', truncated to fit THREAD_NAME_MAX.
 * Does nothing on platforms that offer neither pthread_setname_np() nor
 * pthread_set_name_np(). Failures are logged and otherwise ignored. */
static void set_thread_name(pthread_t tid, char const *name) {
#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)

  /* glibc limits the length of the name and fails if the passed string
   * is too long, so we truncate it here. */
  char truncated[THREAD_NAME_MAX];
  if (strlen(name) >= THREAD_NAME_MAX)
    WARNING("set_thread_name(\"%s\"): name too long", name);
  sstrncpy(truncated, name, sizeof(truncated));

#if defined(HAVE_PTHREAD_SETNAME_NP)
  int status = pthread_setname_np(tid, truncated);
  if (status != 0) {
    ERROR("set_thread_name(\"%s\"): %s", truncated, STRERROR(status));
  }
#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
  pthread_set_name_np(tid, truncated);
#endif

#endif
}
632
start_read_threads(size_t num)633 static void start_read_threads(size_t num) /* {{{ */
634 {
635 if (read_threads != NULL)
636 return;
637
638 read_threads = calloc(num, sizeof(*read_threads));
639 if (read_threads == NULL) {
640 ERROR("plugin: start_read_threads: calloc failed.");
641 return;
642 }
643
644 read_threads_num = 0;
645 for (size_t i = 0; i < num; i++) {
646 int status = pthread_create(read_threads + read_threads_num,
647 /* attr = */ NULL, plugin_read_thread,
648 /* arg = */ NULL);
649 if (status != 0) {
650 ERROR("plugin: start_read_threads: pthread_create failed with status %i "
651 "(%s).",
652 status, STRERROR(status));
653 return;
654 }
655
656 char name[THREAD_NAME_MAX];
657 ssnprintf(name, sizeof(name), "reader#%" PRIu64,
658 (uint64_t)read_threads_num);
659 set_thread_name(read_threads[read_threads_num], name);
660
661 read_threads_num++;
662 } /* for (i) */
663 } /* }}} void start_read_threads */
664
stop_read_threads(void)665 static void stop_read_threads(void) {
666 if (read_threads == NULL)
667 return;
668
669 INFO("collectd: Stopping %" PRIsz " read threads.", read_threads_num);
670
671 pthread_mutex_lock(&read_lock);
672 read_loop = 0;
673 DEBUG("plugin: stop_read_threads: Signalling `read_cond'");
674 pthread_cond_broadcast(&read_cond);
675 pthread_mutex_unlock(&read_lock);
676
677 for (size_t i = 0; i < read_threads_num; i++) {
678 if (pthread_join(read_threads[i], NULL) != 0) {
679 ERROR("plugin: stop_read_threads: pthread_join failed.");
680 }
681 read_threads[i] = (pthread_t)0;
682 }
683 sfree(read_threads);
684 read_threads_num = 0;
685 } /* void stop_read_threads */
686
/* Frees a heap-allocated value list together with its meta data and its
 * values array. Passing NULL is allowed and does nothing. */
static void plugin_value_list_free(value_list_t *vl) /* {{{ */
{
  if (vl != NULL) {
    meta_data_destroy(vl->meta);
    sfree(vl->values);
    sfree(vl);
  }
} /* }}} void plugin_value_list_free */
696
697 static value_list_t *
plugin_value_list_clone(value_list_t const * vl_orig)698 plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
699 {
700 value_list_t *vl;
701
702 if (vl_orig == NULL)
703 return NULL;
704
705 vl = malloc(sizeof(*vl));
706 if (vl == NULL)
707 return NULL;
708 memcpy(vl, vl_orig, sizeof(*vl));
709
710 if (vl->host[0] == 0)
711 sstrncpy(vl->host, hostname_g, sizeof(vl->host));
712
713 vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
714 if (vl->values == NULL) {
715 plugin_value_list_free(vl);
716 return NULL;
717 }
718 memcpy(vl->values, vl_orig->values,
719 vl_orig->values_len * sizeof(*vl->values));
720
721 vl->meta = meta_data_clone(vl->meta);
722 if ((vl_orig->meta != NULL) && (vl->meta == NULL)) {
723 plugin_value_list_free(vl);
724 return NULL;
725 }
726
727 if (vl->time == 0)
728 vl->time = cdtime();
729
730 /* Fill in the interval from the thread context, if it is zero. */
731 if (vl->interval == 0)
732 vl->interval = plugin_get_interval();
733
734 return vl;
735 } /* }}} value_list_t *plugin_value_list_clone */
736
/* Appends a deep copy of `vl' to the write queue and wakes up one write
 * thread. The plugin context of the calling thread is stored alongside the
 * copy. Returns zero on success and ENOMEM if allocating the queue node or
 * cloning the value list fails. */
static int plugin_write_enqueue(value_list_t const *vl) /* {{{ */
{
  write_queue_t *node = malloc(sizeof(*node));
  if (node == NULL)
    return ENOMEM;
  node->next = NULL;

  node->vl = plugin_value_list_clone(vl);
  if (node->vl == NULL) {
    sfree(node);
    return ENOMEM;
  }

  /* Store context of caller (read plugin); otherwise, it would not be
   * available to the write plugins when actually dispatching the
   * value-list later on. */
  node->ctx = plugin_get_ctx();

  pthread_mutex_lock(&write_lock);

  if (write_queue_tail != NULL) {
    write_queue_tail->next = node;
    write_queue_length += 1;
  } else {
    /* Empty queue: the new node is both head and tail. */
    write_queue_head = node;
    write_queue_length = 1;
  }
  write_queue_tail = node;

  pthread_cond_signal(&write_cond);
  pthread_mutex_unlock(&write_lock);

  return 0;
} /* }}} int plugin_write_enqueue */
774
plugin_write_dequeue(void)775 static value_list_t *plugin_write_dequeue(void) /* {{{ */
776 {
777 write_queue_t *q;
778 value_list_t *vl;
779
780 pthread_mutex_lock(&write_lock);
781
782 while (write_loop && (write_queue_head == NULL))
783 pthread_cond_wait(&write_cond, &write_lock);
784
785 if (write_queue_head == NULL) {
786 pthread_mutex_unlock(&write_lock);
787 return NULL;
788 }
789
790 q = write_queue_head;
791 write_queue_head = q->next;
792 write_queue_length -= 1;
793 if (write_queue_head == NULL) {
794 write_queue_tail = NULL;
795 assert(0 == write_queue_length);
796 }
797
798 pthread_mutex_unlock(&write_lock);
799
800 (void)plugin_set_ctx(q->ctx);
801
802 vl = q->vl;
803 sfree(q);
804 return vl;
805 } /* }}} value_list_t *plugin_write_dequeue */
806
plugin_write_thread(void * args)807 static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
808 {
809 while (write_loop) {
810 value_list_t *vl = plugin_write_dequeue();
811 if (vl == NULL)
812 continue;
813
814 plugin_dispatch_values_internal(vl);
815
816 plugin_value_list_free(vl);
817 }
818
819 pthread_exit(NULL);
820 return (void *)0;
821 } /* }}} void *plugin_write_thread */
822
start_write_threads(size_t num)823 static void start_write_threads(size_t num) /* {{{ */
824 {
825 if (write_threads != NULL)
826 return;
827
828 write_threads = calloc(num, sizeof(*write_threads));
829 if (write_threads == NULL) {
830 ERROR("plugin: start_write_threads: calloc failed.");
831 return;
832 }
833
834 write_threads_num = 0;
835 for (size_t i = 0; i < num; i++) {
836 int status = pthread_create(write_threads + write_threads_num,
837 /* attr = */ NULL, plugin_write_thread,
838 /* arg = */ NULL);
839 if (status != 0) {
840 ERROR("plugin: start_write_threads: pthread_create failed with status %i "
841 "(%s).",
842 status, STRERROR(status));
843 return;
844 }
845
846 char name[THREAD_NAME_MAX];
847 ssnprintf(name, sizeof(name), "writer#%" PRIu64,
848 (uint64_t)write_threads_num);
849 set_thread_name(write_threads[write_threads_num], name);
850
851 write_threads_num++;
852 } /* for (i) */
853 } /* }}} void start_write_threads */
854
stop_write_threads(void)855 static void stop_write_threads(void) /* {{{ */
856 {
857 write_queue_t *q;
858 size_t i;
859
860 if (write_threads == NULL)
861 return;
862
863 INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
864
865 pthread_mutex_lock(&write_lock);
866 write_loop = false;
867 DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
868 pthread_cond_broadcast(&write_cond);
869 pthread_mutex_unlock(&write_lock);
870
871 for (i = 0; i < write_threads_num; i++) {
872 if (pthread_join(write_threads[i], NULL) != 0) {
873 ERROR("plugin: stop_write_threads: pthread_join failed.");
874 }
875 write_threads[i] = (pthread_t)0;
876 }
877 sfree(write_threads);
878 write_threads_num = 0;
879
880 pthread_mutex_lock(&write_lock);
881 i = 0;
882 for (q = write_queue_head; q != NULL;) {
883 write_queue_t *q1 = q;
884 plugin_value_list_free(q->vl);
885 q = q->next;
886 sfree(q1);
887 i++;
888 }
889 write_queue_head = NULL;
890 write_queue_tail = NULL;
891 write_queue_length = 0;
892 pthread_mutex_unlock(&write_lock);
893
894 if (i > 0) {
895 WARNING("plugin: %" PRIsz " value list%s left after shutting down "
896 "the write threads.",
897 i, (i == 1) ? " was" : "s were");
898 }
899 } /* }}} void stop_write_threads */
900
901 /*
902 * Public functions
903 */
plugin_set_dir(const char * dir)904 void plugin_set_dir(const char *dir) {
905 sfree(plugindir);
906
907 if (dir == NULL) {
908 plugindir = NULL;
909 return;
910 }
911
912 plugindir = strdup(dir);
913 if (plugindir == NULL)
914 ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
915 }
916
plugin_is_loaded(char const * name)917 bool plugin_is_loaded(char const *name) {
918 if (plugins_loaded == NULL)
919 plugins_loaded =
920 c_avl_create((int (*)(const void *, const void *))strcasecmp);
921 assert(plugins_loaded != NULL);
922
923 int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
924 return status == 0;
925 }
926
plugin_mark_loaded(char const * name)927 static int plugin_mark_loaded(char const *name) {
928 char *name_copy;
929 int status;
930
931 name_copy = strdup(name);
932 if (name_copy == NULL)
933 return ENOMEM;
934
935 status = c_avl_insert(plugins_loaded,
936 /* key = */ name_copy, /* value = */ NULL);
937 return status;
938 }
939
plugin_free_loaded(void)940 static void plugin_free_loaded(void) {
941 void *key;
942 void *value;
943
944 if (plugins_loaded == NULL)
945 return;
946
947 while (c_avl_pick(plugins_loaded, &key, &value) == 0) {
948 sfree(key);
949 assert(value == NULL);
950 }
951
952 c_avl_destroy(plugins_loaded);
953 plugins_loaded = NULL;
954 }
955
956 #define BUFSIZE 512
957 #ifdef WIN32
958 #define SHLIB_SUFFIX ".dll"
959 #else
960 #define SHLIB_SUFFIX ".so"
961 #endif
plugin_load(char const * plugin_name,bool global)962 int plugin_load(char const *plugin_name, bool global) {
963 DIR *dh;
964 const char *dir;
965 char filename[BUFSIZE] = "";
966 char typename[BUFSIZE];
967 int ret;
968 struct stat statbuf;
969 struct dirent *de;
970 int status;
971
972 if (plugin_name == NULL)
973 return EINVAL;
974
975 /* Check if plugin is already loaded and don't do anything in this
976 * case. */
977 if (plugin_is_loaded(plugin_name))
978 return 0;
979
980 dir = plugin_get_dir();
981 ret = 1;
982
983 /*
984 * XXX: Magic at work:
985 *
986 * Some of the language bindings, for example the Python and Perl
987 * plugins, need to be able to export symbols to the scripts they run.
988 * For this to happen, the "Globals" flag needs to be set.
989 * Unfortunately, this technical detail is hard to explain to the
990 * average user and she shouldn't have to worry about this, ideally.
991 * So in order to save everyone's sanity use a different default for a
992 * handful of special plugins. --octo
993 */
994 if ((strcasecmp("perl", plugin_name) == 0) ||
995 (strcasecmp("python", plugin_name) == 0))
996 global = true;
997
998 /* `cpu' should not match `cpufreq'. To solve this we add SHLIB_SUFFIX to the
999 * type when matching the filename */
1000 status = snprintf(typename, sizeof(typename), "%s" SHLIB_SUFFIX, plugin_name);
1001 if ((status < 0) || ((size_t)status >= sizeof(typename))) {
1002 WARNING("plugin_load: Filename too long: \"%s" SHLIB_SUFFIX "\"",
1003 plugin_name);
1004 return -1;
1005 }
1006
1007 if ((dh = opendir(dir)) == NULL) {
1008 ERROR("plugin_load: opendir (%s) failed: %s", dir, STRERRNO);
1009 return -1;
1010 }
1011
1012 while ((de = readdir(dh)) != NULL) {
1013 if (strcasecmp(de->d_name, typename))
1014 continue;
1015
1016 status = snprintf(filename, sizeof(filename), "%s/%s", dir, de->d_name);
1017 if ((status < 0) || ((size_t)status >= sizeof(filename))) {
1018 WARNING("plugin_load: Filename too long: \"%s/%s\"", dir, de->d_name);
1019 continue;
1020 }
1021
1022 if (lstat(filename, &statbuf) == -1) {
1023 WARNING("plugin_load: stat (\"%s\") failed: %s", filename, STRERRNO);
1024 continue;
1025 } else if (!S_ISREG(statbuf.st_mode)) {
1026 /* don't follow symlinks */
1027 WARNING("plugin_load: %s is not a regular file.", filename);
1028 continue;
1029 }
1030
1031 status = plugin_load_file(filename, global);
1032 if (status == 0) {
1033 /* success */
1034 plugin_mark_loaded(plugin_name);
1035 ret = 0;
1036 INFO("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1037 break;
1038 } else {
1039 ERROR("plugin_load: Load plugin \"%s\" failed with "
1040 "status %i.",
1041 plugin_name, status);
1042 }
1043 }
1044
1045 closedir(dh);
1046
1047 if (filename[0] == 0)
1048 ERROR("plugin_load: Could not find plugin \"%s\" in %s", plugin_name, dir);
1049
1050 return ret;
1051 }
1052
1053 /*
1054 * The `register_*' functions follow
1055 */
plugin_register_config(const char * name,int (* callback)(const char * key,const char * val),const char ** keys,int keys_num)1056 EXPORT int plugin_register_config(const char *name,
1057 int (*callback)(const char *key,
1058 const char *val),
1059 const char **keys, int keys_num) {
1060 cf_register(name, callback, keys, keys_num);
1061 return 0;
1062 } /* int plugin_register_config */
1063
plugin_register_complex_config(const char * type,int (* callback)(oconfig_item_t *))1064 EXPORT int plugin_register_complex_config(const char *type,
1065 int (*callback)(oconfig_item_t *)) {
1066 return cf_register_complex(type, callback);
1067 } /* int plugin_register_complex_config */
1068
plugin_register_init(const char * name,int (* callback)(void))1069 EXPORT int plugin_register_init(const char *name, int (*callback)(void)) {
1070 return create_register_callback(&list_init, name, (void *)callback, NULL);
1071 } /* plugin_register_init */
1072
plugin_compare_read_func(const void * arg0,const void * arg1)1073 static int plugin_compare_read_func(const void *arg0, const void *arg1) {
1074 const read_func_t *rf0;
1075 const read_func_t *rf1;
1076
1077 rf0 = arg0;
1078 rf1 = arg1;
1079
1080 if (rf0->rf_next_read < rf1->rf_next_read)
1081 return -1;
1082 else if (rf0->rf_next_read > rf1->rf_next_read)
1083 return 1;
1084 else
1085 return 0;
1086 } /* int plugin_compare_read_func */
1087
1088 /* Add a read function to both, the heap and a linked list. The linked list if
1089 * used to look-up read functions, especially for the remove function. The heap
1090 * is used to determine which plugin to read next. */
plugin_insert_read(read_func_t * rf)1091 static int plugin_insert_read(read_func_t *rf) {
1092 int status;
1093 llentry_t *le;
1094
1095 rf->rf_next_read = cdtime();
1096 rf->rf_effective_interval = rf->rf_interval;
1097
1098 pthread_mutex_lock(&read_lock);
1099
1100 if (read_list == NULL) {
1101 read_list = llist_create();
1102 if (read_list == NULL) {
1103 pthread_mutex_unlock(&read_lock);
1104 ERROR("plugin_insert_read: read_list failed.");
1105 return -1;
1106 }
1107 }
1108
1109 if (read_heap == NULL) {
1110 read_heap = c_heap_create(plugin_compare_read_func);
1111 if (read_heap == NULL) {
1112 pthread_mutex_unlock(&read_lock);
1113 ERROR("plugin_insert_read: c_heap_create failed.");
1114 return -1;
1115 }
1116 }
1117
1118 le = llist_search(read_list, rf->rf_name);
1119 if (le != NULL) {
1120 pthread_mutex_unlock(&read_lock);
1121 P_WARNING("The read function \"%s\" is already registered. "
1122 "Check for duplicates in your configuration!",
1123 rf->rf_name);
1124 return EINVAL;
1125 }
1126
1127 le = llentry_create(rf->rf_name, rf);
1128 if (le == NULL) {
1129 pthread_mutex_unlock(&read_lock);
1130 ERROR("plugin_insert_read: llentry_create failed.");
1131 return -1;
1132 }
1133
1134 status = c_heap_insert(read_heap, rf);
1135 if (status != 0) {
1136 pthread_mutex_unlock(&read_lock);
1137 ERROR("plugin_insert_read: c_heap_insert failed.");
1138 llentry_destroy(le);
1139 return -1;
1140 }
1141
1142 /* This does not fail. */
1143 llist_append(read_list, le);
1144
1145 /* Wake up all the read threads. */
1146 pthread_cond_broadcast(&read_cond);
1147 pthread_mutex_unlock(&read_lock);
1148 return 0;
1149 } /* int plugin_insert_read */
1150
plugin_register_read(const char * name,int (* callback)(void))1151 EXPORT int plugin_register_read(const char *name, int (*callback)(void)) {
1152 read_func_t *rf;
1153 int status;
1154
1155 rf = calloc(1, sizeof(*rf));
1156 if (rf == NULL) {
1157 ERROR("plugin_register_read: calloc failed.");
1158 return ENOMEM;
1159 }
1160
1161 rf->rf_callback = (void *)callback;
1162 rf->rf_udata.data = NULL;
1163 rf->rf_udata.free_func = NULL;
1164 rf->rf_ctx = plugin_get_ctx();
1165 rf->rf_group[0] = '\0';
1166 rf->rf_name = strdup(name);
1167 rf->rf_type = RF_SIMPLE;
1168 rf->rf_interval = plugin_get_interval();
1169 rf->rf_ctx.interval = rf->rf_interval;
1170
1171 status = plugin_insert_read(rf);
1172 if (status != 0) {
1173 sfree(rf->rf_name);
1174 sfree(rf);
1175 }
1176
1177 return status;
1178 } /* int plugin_register_read */
1179
plugin_register_complex_read(const char * group,const char * name,plugin_read_cb callback,cdtime_t interval,user_data_t const * user_data)1180 EXPORT int plugin_register_complex_read(const char *group, const char *name,
1181 plugin_read_cb callback,
1182 cdtime_t interval,
1183 user_data_t const *user_data) {
1184 read_func_t *rf;
1185 int status;
1186
1187 rf = calloc(1, sizeof(*rf));
1188 if (rf == NULL) {
1189 free_userdata(user_data);
1190 ERROR("plugin_register_complex_read: calloc failed.");
1191 return ENOMEM;
1192 }
1193
1194 rf->rf_callback = (void *)callback;
1195 if (group != NULL)
1196 sstrncpy(rf->rf_group, group, sizeof(rf->rf_group));
1197 else
1198 rf->rf_group[0] = '\0';
1199 rf->rf_name = strdup(name);
1200 rf->rf_type = RF_COMPLEX;
1201 rf->rf_interval = (interval != 0) ? interval : plugin_get_interval();
1202
1203 /* Set user data */
1204 if (user_data == NULL) {
1205 rf->rf_udata.data = NULL;
1206 rf->rf_udata.free_func = NULL;
1207 } else {
1208 rf->rf_udata = *user_data;
1209 }
1210
1211 rf->rf_ctx = plugin_get_ctx();
1212 rf->rf_ctx.interval = rf->rf_interval;
1213
1214 status = plugin_insert_read(rf);
1215 if (status != 0) {
1216 free_userdata(&rf->rf_udata);
1217 sfree(rf->rf_name);
1218 sfree(rf);
1219 }
1220
1221 return status;
1222 } /* int plugin_register_complex_read */
1223
plugin_register_write(const char * name,plugin_write_cb callback,user_data_t const * ud)1224 EXPORT int plugin_register_write(const char *name, plugin_write_cb callback,
1225 user_data_t const *ud) {
1226 return create_register_callback(&list_write, name, (void *)callback, ud);
1227 } /* int plugin_register_write */
1228
/* Read callback used for periodic flushing: `ud->data' is a
 * flush_callback_t holding the plugin name and the timeout that are passed
 * on to plugin_flush(). The identifier is always NULL. */
static int plugin_flush_timeout_callback(user_data_t *ud) {
  flush_callback_t const *flush_cb = ud->data;
  int const status = plugin_flush(flush_cb->name, flush_cb->timeout, NULL);

  return status;
} /* static int plugin_flush_callback */
1234
plugin_flush_timeout_callback_free(void * data)1235 static void plugin_flush_timeout_callback_free(void *data) {
1236 flush_callback_t *cb = data;
1237
1238 if (cb == NULL)
1239 return;
1240
1241 sfree(cb->name);
1242 sfree(cb);
1243 } /* static void plugin_flush_callback_free */
1244
/* Builds the name under which the periodic flush read callback of plugin
 * `name' is registered: the prefix "flush/" followed by `name'.
 *
 * Returns a newly allocated string that the caller has to free, or NULL if
 * the allocation fails. */
static char *plugin_flush_callback_name(const char *name) {
  const char *flush_prefix = "flush/";
  size_t const prefix_len = strlen(flush_prefix);
  size_t const name_len = strlen(name);

  /* One extra byte for the terminating null byte. */
  char *result = malloc(name_len + prefix_len + 1);
  if (result == NULL) {
    ERROR("plugin_flush_callback_name: malloc failed.");
    return NULL;
  }

  sstrncpy(result, flush_prefix, prefix_len + 1);
  sstrncpy(result + prefix_len, name, name_len + 1);

  return result;
} /* static char *plugin_flush_callback_name */
1265
plugin_register_flush(const char * name,plugin_flush_cb callback,user_data_t const * ud)1266 EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback,
1267 user_data_t const *ud) {
1268 int status;
1269 plugin_ctx_t ctx = plugin_get_ctx();
1270
1271 status = create_register_callback(&list_flush, name, (void *)callback, ud);
1272 if (status != 0)
1273 return status;
1274
1275 if (ctx.flush_interval != 0) {
1276 char *flush_name;
1277 flush_callback_t *cb;
1278
1279 flush_name = plugin_flush_callback_name(name);
1280 if (flush_name == NULL)
1281 return -1;
1282
1283 cb = malloc(sizeof(*cb));
1284 if (cb == NULL) {
1285 ERROR("plugin_register_flush: malloc failed.");
1286 sfree(flush_name);
1287 return -1;
1288 }
1289
1290 cb->name = strdup(name);
1291 if (cb->name == NULL) {
1292 ERROR("plugin_register_flush: strdup failed.");
1293 sfree(cb);
1294 sfree(flush_name);
1295 return -1;
1296 }
1297 cb->timeout = ctx.flush_timeout;
1298
1299 status = plugin_register_complex_read(
1300 /* group = */ "flush",
1301 /* name = */ flush_name,
1302 /* callback = */ plugin_flush_timeout_callback,
1303 /* interval = */ ctx.flush_interval,
1304 /* user data = */
1305 &(user_data_t){
1306 .data = cb,
1307 .free_func = plugin_flush_timeout_callback_free,
1308 });
1309
1310 sfree(flush_name);
1311 return status;
1312 }
1313
1314 return 0;
1315 } /* int plugin_register_flush */
1316
plugin_register_missing(const char * name,plugin_missing_cb callback,user_data_t const * ud)1317 EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
1318 user_data_t const *ud) {
1319 return create_register_callback(&list_missing, name, (void *)callback, ud);
1320 } /* int plugin_register_missing */
1321
plugin_register_cache_event(const char * name,plugin_cache_event_cb callback,user_data_t const * ud)1322 EXPORT int plugin_register_cache_event(const char *name,
1323 plugin_cache_event_cb callback,
1324 user_data_t const *ud) {
1325
1326 if (name == NULL || callback == NULL)
1327 return EINVAL;
1328
1329 char *name_copy = strdup(name);
1330 if (name_copy == NULL) {
1331 P_ERROR("plugin_register_cache_event: strdup failed.");
1332 free_userdata(ud);
1333 return ENOMEM;
1334 }
1335
1336 if (list_cache_event_num >= 32) {
1337 P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
1338 "to be registered.");
1339 free_userdata(ud);
1340 return ENOMEM;
1341 }
1342
1343 for (size_t i = 0; i < list_cache_event_num; i++) {
1344 cache_event_func_t *cef = &list_cache_event[i];
1345 if (!cef->callback)
1346 continue;
1347
1348 if (strcmp(name, cef->name) == 0) {
1349 P_ERROR("plugin_register_cache_event: a callback named `%s' already "
1350 "registered!",
1351 name);
1352 free_userdata(ud);
1353 return -1;
1354 }
1355 }
1356
1357 user_data_t user_data;
1358 if (ud == NULL) {
1359 user_data = (user_data_t){
1360 .data = NULL,
1361 .free_func = NULL,
1362 };
1363 } else {
1364 user_data = *ud;
1365 }
1366
1367 list_cache_event[list_cache_event_num] =
1368 (cache_event_func_t){.callback = callback,
1369 .name = name_copy,
1370 .user_data = user_data,
1371 .plugin_ctx = plugin_get_ctx()};
1372 list_cache_event_num++;
1373
1374 return 0;
1375 } /* int plugin_register_cache_event */
1376
plugin_register_shutdown(const char * name,int (* callback)(void))1377 EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
1378 return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
1379 } /* int plugin_register_shutdown */
1380
plugin_free_data_sets(void)1381 static void plugin_free_data_sets(void) {
1382 void *key;
1383 void *value;
1384
1385 if (data_sets == NULL)
1386 return;
1387
1388 while (c_avl_pick(data_sets, &key, &value) == 0) {
1389 data_set_t *ds = value;
1390 /* key is a pointer to ds->type */
1391
1392 sfree(ds->ds);
1393 sfree(ds);
1394 }
1395
1396 c_avl_destroy(data_sets);
1397 data_sets = NULL;
1398 } /* void plugin_free_data_sets */
1399
plugin_register_data_set(const data_set_t * ds)1400 EXPORT int plugin_register_data_set(const data_set_t *ds) {
1401 data_set_t *ds_copy;
1402
1403 if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
1404 NOTICE("Replacing DS `%s' with another version.", ds->type);
1405 plugin_unregister_data_set(ds->type);
1406 } else if (data_sets == NULL) {
1407 data_sets = c_avl_create((int (*)(const void *, const void *))strcmp);
1408 if (data_sets == NULL)
1409 return -1;
1410 }
1411
1412 ds_copy = malloc(sizeof(*ds_copy));
1413 if (ds_copy == NULL)
1414 return -1;
1415 memcpy(ds_copy, ds, sizeof(data_set_t));
1416
1417 ds_copy->ds = malloc(sizeof(*ds_copy->ds) * ds->ds_num);
1418 if (ds_copy->ds == NULL) {
1419 sfree(ds_copy);
1420 return -1;
1421 }
1422
1423 for (size_t i = 0; i < ds->ds_num; i++)
1424 memcpy(ds_copy->ds + i, ds->ds + i, sizeof(data_source_t));
1425
1426 return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy);
1427 } /* int plugin_register_data_set */
1428
plugin_register_log(const char * name,plugin_log_cb callback,user_data_t const * ud)1429 EXPORT int plugin_register_log(const char *name, plugin_log_cb callback,
1430 user_data_t const *ud) {
1431 return create_register_callback(&list_log, name, (void *)callback, ud);
1432 } /* int plugin_register_log */
1433
plugin_register_notification(const char * name,plugin_notification_cb callback,user_data_t const * ud)1434 EXPORT int plugin_register_notification(const char *name,
1435 plugin_notification_cb callback,
1436 user_data_t const *ud) {
1437 return create_register_callback(&list_notification, name, (void *)callback,
1438 ud);
1439 } /* int plugin_register_log */
1440
plugin_unregister_config(const char * name)1441 EXPORT int plugin_unregister_config(const char *name) {
1442 cf_unregister(name);
1443 return 0;
1444 } /* int plugin_unregister_config */
1445
plugin_unregister_complex_config(const char * name)1446 EXPORT int plugin_unregister_complex_config(const char *name) {
1447 cf_unregister_complex(name);
1448 return 0;
1449 } /* int plugin_unregister_complex_config */
1450
plugin_unregister_init(const char * name)1451 EXPORT int plugin_unregister_init(const char *name) {
1452 return plugin_unregister(list_init, name);
1453 }
1454
plugin_unregister_read(const char * name)1455 EXPORT int plugin_unregister_read(const char *name) /* {{{ */
1456 {
1457 llentry_t *le;
1458 read_func_t *rf;
1459
1460 if (name == NULL)
1461 return -ENOENT;
1462
1463 pthread_mutex_lock(&read_lock);
1464
1465 if (read_list == NULL) {
1466 pthread_mutex_unlock(&read_lock);
1467 return -ENOENT;
1468 }
1469
1470 le = llist_search(read_list, name);
1471 if (le == NULL) {
1472 pthread_mutex_unlock(&read_lock);
1473 WARNING("plugin_unregister_read: No such read function: %s", name);
1474 return -ENOENT;
1475 }
1476
1477 llist_remove(read_list, le);
1478
1479 rf = le->value;
1480 assert(rf != NULL);
1481 rf->rf_type = RF_REMOVE;
1482
1483 pthread_mutex_unlock(&read_lock);
1484
1485 llentry_destroy(le);
1486
1487 DEBUG("plugin_unregister_read: Marked `%s' for removal.", name);
1488
1489 return 0;
1490 } /* }}} int plugin_unregister_read */
1491
plugin_log_available_writers(void)1492 EXPORT void plugin_log_available_writers(void) {
1493 log_list_callbacks(&list_write, "Available write targets:");
1494 }
1495
/* Search callback for llist_search_custom(): compares the group of the read
 * function stored in `e' with the group name passed as `ud'. Returns the
 * result of strcmp(), i.e. zero if the groups are equal. */
static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
{
  read_func_t const *rf = e->value;
  char const *wanted_group = ud;

  return strcmp(rf->rf_group, wanted_group);
} /* }}} int compare_read_func_group */
1503
plugin_unregister_read_group(const char * group)1504 EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */
1505 {
1506 llentry_t *le;
1507 read_func_t *rf;
1508
1509 int found = 0;
1510
1511 if (group == NULL)
1512 return -ENOENT;
1513
1514 pthread_mutex_lock(&read_lock);
1515
1516 if (read_list == NULL) {
1517 pthread_mutex_unlock(&read_lock);
1518 return -ENOENT;
1519 }
1520
1521 while (42) {
1522 le = llist_search_custom(read_list, compare_read_func_group, (void *)group);
1523
1524 if (le == NULL)
1525 break;
1526
1527 ++found;
1528
1529 llist_remove(read_list, le);
1530
1531 rf = le->value;
1532 assert(rf != NULL);
1533 rf->rf_type = RF_REMOVE;
1534
1535 llentry_destroy(le);
1536
1537 DEBUG("plugin_unregister_read_group: "
1538 "Marked `%s' (group `%s') for removal.",
1539 rf->rf_name, group);
1540 }
1541
1542 pthread_mutex_unlock(&read_lock);
1543
1544 if (found == 0) {
1545 WARNING("plugin_unregister_read_group: No such "
1546 "group of read function: %s",
1547 group);
1548 return -ENOENT;
1549 }
1550
1551 return 0;
1552 } /* }}} int plugin_unregister_read_group */
1553
plugin_unregister_write(const char * name)1554 EXPORT int plugin_unregister_write(const char *name) {
1555 return plugin_unregister(list_write, name);
1556 }
1557
plugin_unregister_flush(const char * name)1558 EXPORT int plugin_unregister_flush(const char *name) {
1559 plugin_ctx_t ctx = plugin_get_ctx();
1560
1561 if (ctx.flush_interval != 0) {
1562 char *flush_name;
1563
1564 flush_name = plugin_flush_callback_name(name);
1565 if (flush_name != NULL) {
1566 plugin_unregister_read(flush_name);
1567 sfree(flush_name);
1568 }
1569 }
1570
1571 return plugin_unregister(list_flush, name);
1572 }
1573
plugin_unregister_missing(const char * name)1574 EXPORT int plugin_unregister_missing(const char *name) {
1575 return plugin_unregister(list_missing, name);
1576 }
1577
plugin_unregister_cache_event(const char * name)1578 EXPORT int plugin_unregister_cache_event(const char *name) {
1579 for (size_t i = 0; i < list_cache_event_num; i++) {
1580 cache_event_func_t *cef = &list_cache_event[i];
1581 if (!cef->callback)
1582 continue;
1583 if (strcmp(name, cef->name) == 0) {
1584 /* Mark callback as inactive, so mask in cache entries remains actual */
1585 cef->callback = NULL;
1586 sfree(cef->name);
1587 free_userdata(&cef->user_data);
1588 }
1589 }
1590 return 0;
1591 }
1592
destroy_cache_event_callbacks()1593 static void destroy_cache_event_callbacks() {
1594 for (size_t i = 0; i < list_cache_event_num; i++) {
1595 cache_event_func_t *cef = &list_cache_event[i];
1596 if (!cef->callback)
1597 continue;
1598 cef->callback = NULL;
1599 sfree(cef->name);
1600 free_userdata(&cef->user_data);
1601 }
1602 }
1603
plugin_unregister_shutdown(const char * name)1604 EXPORT int plugin_unregister_shutdown(const char *name) {
1605 return plugin_unregister(list_shutdown, name);
1606 }
1607
plugin_unregister_data_set(const char * name)1608 EXPORT int plugin_unregister_data_set(const char *name) {
1609 data_set_t *ds;
1610
1611 if (data_sets == NULL)
1612 return -1;
1613
1614 if (c_avl_remove(data_sets, name, NULL, (void *)&ds) != 0)
1615 return -1;
1616
1617 sfree(ds->ds);
1618 sfree(ds);
1619
1620 return 0;
1621 } /* int plugin_unregister_data_set */
1622
plugin_unregister_log(const char * name)1623 EXPORT int plugin_unregister_log(const char *name) {
1624 return plugin_unregister(list_log, name);
1625 }
1626
plugin_unregister_notification(const char * name)1627 EXPORT int plugin_unregister_notification(const char *name) {
1628 return plugin_unregister(list_notification, name);
1629 }
1630
plugin_init_all(void)1631 EXPORT int plugin_init_all(void) {
1632 char const *chain_name;
1633 llentry_t *le;
1634 int status;
1635 int ret = 0;
1636
1637 /* Init the value cache */
1638 uc_init();
1639
1640 if (IS_TRUE(global_option_get("CollectInternalStats"))) {
1641 record_statistics = true;
1642 plugin_register_read("collectd", plugin_update_internal_statistics);
1643 }
1644
1645 chain_name = global_option_get("PreCacheChain");
1646 pre_cache_chain = fc_chain_get_by_name(chain_name);
1647
1648 chain_name = global_option_get("PostCacheChain");
1649 post_cache_chain = fc_chain_get_by_name(chain_name);
1650
1651 write_limit_high = global_option_get_long("WriteQueueLimitHigh",
1652 /* default = */ 0);
1653 if (write_limit_high < 0) {
1654 ERROR("WriteQueueLimitHigh must be positive or zero.");
1655 write_limit_high = 0;
1656 }
1657
1658 write_limit_low =
1659 global_option_get_long("WriteQueueLimitLow",
1660 /* default = */ write_limit_high / 2);
1661 if (write_limit_low < 0) {
1662 ERROR("WriteQueueLimitLow must be positive or zero.");
1663 write_limit_low = write_limit_high / 2;
1664 } else if (write_limit_low > write_limit_high) {
1665 ERROR("WriteQueueLimitLow must not be larger than "
1666 "WriteQueueLimitHigh.");
1667 write_limit_low = write_limit_high;
1668 }
1669
1670 write_threads_num = global_option_get_long("WriteThreads",
1671 /* default = */ 5);
1672 if (write_threads_num < 1) {
1673 ERROR("WriteThreads must be positive.");
1674 write_threads_num = 5;
1675 }
1676
1677 if ((list_init == NULL) && (read_heap == NULL))
1678 return ret;
1679
1680 /* Calling all init callbacks before checking if read callbacks
1681 * are available allows the init callbacks to register the read
1682 * callback. */
1683 le = llist_head(list_init);
1684 while (le != NULL) {
1685 callback_func_t *cf;
1686 plugin_init_cb callback;
1687 plugin_ctx_t old_ctx;
1688
1689 cf = le->value;
1690 old_ctx = plugin_set_ctx(cf->cf_ctx);
1691 callback = cf->cf_callback;
1692 status = (*callback)();
1693 plugin_set_ctx(old_ctx);
1694
1695 if (status != 0) {
1696 ERROR("Initialization of plugin `%s' "
1697 "failed with status %i. "
1698 "Plugin will be unloaded.",
1699 le->key, status);
1700 /* Plugins that register read callbacks from the init
1701 * callback should take care of appropriate error
1702 * handling themselves. */
1703 /* FIXME: Unload _all_ functions */
1704 plugin_unregister_read(le->key);
1705 ret = -1;
1706 }
1707
1708 le = le->next;
1709 }
1710
1711 start_write_threads((size_t)write_threads_num);
1712
1713 max_read_interval =
1714 global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
1715
1716 /* Start read-threads */
1717 if (read_heap != NULL) {
1718 const char *rt;
1719 int num;
1720
1721 rt = global_option_get("ReadThreads");
1722 num = atoi(rt);
1723 if (num != -1)
1724 start_read_threads((num > 0) ? ((size_t)num) : 5);
1725 }
1726 return ret;
1727 } /* void plugin_init_all */
1728
1729 /* TODO: Rename this function. */
plugin_read_all(void)1730 EXPORT void plugin_read_all(void) {
1731 uc_check_timeout();
1732
1733 return;
1734 } /* void plugin_read_all */
1735
1736 /* Read function called when the `-T' command line argument is given. */
plugin_read_all_once(void)1737 EXPORT int plugin_read_all_once(void) {
1738 int status;
1739 int return_status = 0;
1740
1741 if (read_heap == NULL) {
1742 NOTICE("No read-functions are registered.");
1743 return 0;
1744 }
1745
1746 while (42) {
1747 read_func_t *rf;
1748 plugin_ctx_t old_ctx;
1749
1750 rf = c_heap_get_root(read_heap);
1751 if (rf == NULL)
1752 break;
1753
1754 old_ctx = plugin_set_ctx(rf->rf_ctx);
1755
1756 if (rf->rf_type == RF_SIMPLE) {
1757 int (*callback)(void);
1758
1759 callback = rf->rf_callback;
1760 status = (*callback)();
1761 } else {
1762 plugin_read_cb callback;
1763
1764 callback = rf->rf_callback;
1765 status = (*callback)(&rf->rf_udata);
1766 }
1767
1768 plugin_set_ctx(old_ctx);
1769
1770 if (status != 0) {
1771 NOTICE("read-function of plugin `%s' failed.", rf->rf_name);
1772 return_status = -1;
1773 }
1774
1775 sfree(rf->rf_name);
1776 destroy_callback((void *)rf);
1777 }
1778
1779 return return_status;
1780 } /* int plugin_read_all_once */
1781
plugin_write(const char * plugin,const data_set_t * ds,const value_list_t * vl)1782 EXPORT int plugin_write(const char *plugin, /* {{{ */
1783 const data_set_t *ds, const value_list_t *vl) {
1784 llentry_t *le;
1785 int status;
1786
1787 if (vl == NULL)
1788 return EINVAL;
1789
1790 if (list_write == NULL)
1791 return ENOENT;
1792
1793 if (ds == NULL) {
1794 ds = plugin_get_ds(vl->type);
1795 if (ds == NULL) {
1796 ERROR("plugin_write: Unable to lookup type `%s'.", vl->type);
1797 return ENOENT;
1798 }
1799 }
1800
1801 if (plugin == NULL) {
1802 int success = 0;
1803 int failure = 0;
1804
1805 le = llist_head(list_write);
1806 while (le != NULL) {
1807 callback_func_t *cf = le->value;
1808 plugin_write_cb callback;
1809
1810 /* Keep the read plugin's interval and flush information but update the
1811 * plugin name. */
1812 plugin_ctx_t old_ctx = plugin_get_ctx();
1813 plugin_ctx_t ctx = old_ctx;
1814 ctx.name = cf->cf_ctx.name;
1815 plugin_set_ctx(ctx);
1816
1817 DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1818 callback = cf->cf_callback;
1819 status = (*callback)(ds, vl, &cf->cf_udata);
1820 if (status != 0)
1821 failure++;
1822 else
1823 success++;
1824
1825 plugin_set_ctx(old_ctx);
1826 le = le->next;
1827 }
1828
1829 if ((success == 0) && (failure != 0))
1830 status = -1;
1831 else
1832 status = 0;
1833 } else /* plugin != NULL */
1834 {
1835 callback_func_t *cf;
1836 plugin_write_cb callback;
1837
1838 le = llist_head(list_write);
1839 while (le != NULL) {
1840 if (strcasecmp(plugin, le->key) == 0)
1841 break;
1842
1843 le = le->next;
1844 }
1845
1846 if (le == NULL)
1847 return ENOENT;
1848
1849 cf = le->value;
1850
1851 /* do not switch plugin context; rather keep the context (interval)
1852 * information of the calling read plugin */
1853
1854 DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1855 callback = cf->cf_callback;
1856 status = (*callback)(ds, vl, &cf->cf_udata);
1857 }
1858
1859 return status;
1860 } /* }}} int plugin_write */
1861
plugin_flush(const char * plugin,cdtime_t timeout,const char * identifier)1862 EXPORT int plugin_flush(const char *plugin, cdtime_t timeout,
1863 const char *identifier) {
1864 llentry_t *le;
1865
1866 if (list_flush == NULL)
1867 return 0;
1868
1869 le = llist_head(list_flush);
1870 while (le != NULL) {
1871 callback_func_t *cf;
1872 plugin_flush_cb callback;
1873 plugin_ctx_t old_ctx;
1874
1875 if ((plugin != NULL) && (strcmp(plugin, le->key) != 0)) {
1876 le = le->next;
1877 continue;
1878 }
1879
1880 cf = le->value;
1881 old_ctx = plugin_set_ctx(cf->cf_ctx);
1882 callback = cf->cf_callback;
1883
1884 (*callback)(timeout, identifier, &cf->cf_udata);
1885
1886 plugin_set_ctx(old_ctx);
1887
1888 le = le->next;
1889 }
1890 return 0;
1891 } /* int plugin_flush */
1892
plugin_shutdown_all(void)1893 EXPORT int plugin_shutdown_all(void) {
1894 llentry_t *le;
1895 int ret = 0; // Assume success.
1896
1897 destroy_all_callbacks(&list_init);
1898
1899 stop_read_threads();
1900
1901 pthread_mutex_lock(&read_lock);
1902 llist_destroy(read_list);
1903 read_list = NULL;
1904 pthread_mutex_unlock(&read_lock);
1905
1906 destroy_read_heap();
1907
1908 /* blocks until all write threads have shut down. */
1909 stop_write_threads();
1910
1911 /* ask all plugins to write out the state they kept. */
1912 plugin_flush(/* plugin = */ NULL,
1913 /* timeout = */ 0,
1914 /* identifier = */ NULL);
1915
1916 le = NULL;
1917 if (list_shutdown != NULL)
1918 le = llist_head(list_shutdown);
1919
1920 while (le != NULL) {
1921 callback_func_t *cf;
1922 plugin_shutdown_cb callback;
1923 plugin_ctx_t old_ctx;
1924
1925 cf = le->value;
1926 old_ctx = plugin_set_ctx(cf->cf_ctx);
1927 callback = cf->cf_callback;
1928
1929 /* Advance the pointer before calling the callback allows
1930 * shutdown functions to unregister themselves. If done the
1931 * other way around the memory `le' points to will be freed
1932 * after callback returns. */
1933 le = le->next;
1934
1935 if ((*callback)() != 0)
1936 ret = -1;
1937
1938 plugin_set_ctx(old_ctx);
1939 }
1940
1941 /* Write plugins which use the `user_data' pointer usually need the
1942 * same data available to the flush callback. If this is the case, set
1943 * the free_function to NULL when registering the flush callback and to
1944 * the real free function when registering the write callback. This way
1945 * the data isn't freed twice. */
1946 destroy_all_callbacks(&list_flush);
1947 destroy_all_callbacks(&list_missing);
1948 destroy_cache_event_callbacks();
1949 destroy_all_callbacks(&list_write);
1950
1951 destroy_all_callbacks(&list_notification);
1952 destroy_all_callbacks(&list_shutdown);
1953 destroy_all_callbacks(&list_log);
1954
1955 plugin_free_loaded();
1956 plugin_free_data_sets();
1957 return ret;
1958 } /* void plugin_shutdown_all */
1959
plugin_dispatch_missing(const value_list_t * vl)1960 EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
1961 {
1962 if (list_missing == NULL)
1963 return 0;
1964
1965 llentry_t *le = llist_head(list_missing);
1966 while (le != NULL) {
1967 callback_func_t *cf = le->value;
1968 plugin_ctx_t old_ctx = plugin_set_ctx(cf->cf_ctx);
1969 plugin_missing_cb callback = cf->cf_callback;
1970
1971 int status = (*callback)(vl, &cf->cf_udata);
1972 plugin_set_ctx(old_ctx);
1973 if (status != 0) {
1974 if (status < 0) {
1975 ERROR("plugin_dispatch_missing: Callback function \"%s\" "
1976 "failed with status %i.",
1977 le->key, status);
1978 return status;
1979 } else {
1980 return 0;
1981 }
1982 }
1983
1984 le = le->next;
1985 }
1986 return 0;
1987 } /* int }}} plugin_dispatch_missing */
1988
plugin_dispatch_cache_event(enum cache_event_type_e event_type,unsigned long callbacks_mask,const char * name,const value_list_t * vl)1989 void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
1990 unsigned long callbacks_mask, const char *name,
1991 const value_list_t *vl) {
1992 switch (event_type) {
1993 case CE_VALUE_NEW:
1994 callbacks_mask = 0;
1995 for (size_t i = 0; i < list_cache_event_num; i++) {
1996 cache_event_func_t *cef = &list_cache_event[i];
1997 plugin_cache_event_cb callback = cef->callback;
1998
1999 if (!callback)
2000 continue;
2001
2002 cache_event_t event = (cache_event_t){.type = event_type,
2003 .value_list = vl,
2004 .value_list_name = name,
2005 .ret = 0};
2006
2007 plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2008 int status = (*callback)(&event, &cef->user_data);
2009 plugin_set_ctx(old_ctx);
2010
2011 if (status != 0) {
2012 ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2013 "%i for event NEW.",
2014 cef->name, status);
2015 } else {
2016 if (event.ret) {
2017 DEBUG(
2018 "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
2019 cef->name, name);
2020 callbacks_mask |= (1 << (i));
2021 } else {
2022 DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
2023 cef->name, name);
2024 }
2025 }
2026 }
2027
2028 if (callbacks_mask)
2029 uc_set_callbacks_mask(name, callbacks_mask);
2030
2031 break;
2032 case CE_VALUE_UPDATE:
2033 case CE_VALUE_EXPIRED:
2034 for (size_t i = 0; i < list_cache_event_num; i++) {
2035 cache_event_func_t *cef = &list_cache_event[i];
2036 plugin_cache_event_cb callback = cef->callback;
2037
2038 if (!callback)
2039 continue;
2040
2041 if (callbacks_mask && (1 << (i)) == 0)
2042 continue;
2043
2044 cache_event_t event = (cache_event_t){.type = event_type,
2045 .value_list = vl,
2046 .value_list_name = name,
2047 .ret = 0};
2048
2049 plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2050 int status = (*callback)(&event, &cef->user_data);
2051 plugin_set_ctx(old_ctx);
2052
2053 if (status != 0) {
2054 ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2055 "%i for event %s.",
2056 cef->name, status,
2057 ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
2058 }
2059 }
2060 break;
2061 }
2062 return;
2063 }
2064
/* Processes one value list: validates it, looks up its data set, escapes
 * slashes in the identifier fields, runs the pre-cache chain, updates the
 * value cache and finally runs the post-cache chain (or the default action
 * if no post-cache chain is configured).
 *
 * `vl' must be non-NULL with host, time and interval already set; this is
 * asserted. Returns zero on success, also when the pre-cache chain stops
 * processing, and -1 if the value list is invalid, no data sets are
 * registered, the type is unknown or (in non-debug builds) the number of
 * values does not match the data set. */
static int plugin_dispatch_values_internal(value_list_t *vl) {
  static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;

  assert(vl != NULL);

  /* These fields are initialized by plugin_value_list_clone() if needed: */
  assert(vl->host[0] != 0);
  assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
  assert(vl->interval != 0);

  if (vl->type[0] == 0 || vl->values == NULL || vl->values_len < 1) {
    ERROR("plugin_dispatch_values: Invalid value list "
          "from plugin %s.",
          vl->plugin);
    return -1;
  }

  /* Free meta data only if the calling function didn't specify any. In
   * this case matches and targets may add some and the calling function
   * may not expect (and therefore free) that data. */
  bool const free_meta_data = (vl->meta == NULL);

  if (list_write == NULL)
    c_complain_once(LOG_WARNING, &no_write_complaint,
                    "plugin_dispatch_values: No write callback has been "
                    "registered. Please load at least one output plugin, "
                    "if you want the collected data to be stored.");

  if (data_sets == NULL) {
    ERROR("plugin_dispatch_values: No data sets registered. "
          "Could the types database be read? Check "
          "your `TypesDB' setting!");
    return -1;
  }

  data_set_t *ds = NULL;
  if (c_avl_get(data_sets, vl->type, (void *)&ds) != 0) {
    char ident[6 * DATA_MAX_NAME_LEN];

    FORMAT_VL(ident, sizeof(ident), vl);
    INFO("plugin_dispatch_values: Dataset not found: %s "
         "(from \"%s\"), check your types.db!",
         vl->type, ident);
    return -1;
  }

  DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
        "host = %s; "
        "plugin = %s; plugin_instance = %s; "
        "type = %s; type_instance = %s;",
        CDTIME_T_TO_DOUBLE(vl->time), CDTIME_T_TO_DOUBLE(vl->interval),
        vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);

  /* Sanity checks on the data set: fatal in debug builds, logged
   * otherwise. */
#if COLLECT_DEBUG
  assert(0 == strcmp(ds->type, vl->type));
#else
  if (0 != strcmp(ds->type, vl->type))
    WARNING(
        "plugin_dispatch_values: <%s/%s-%s> (ds->type = %s) != (vl->type = %s)",
        vl->host, vl->plugin, vl->plugin_instance, ds->type, vl->type);
#endif

#if COLLECT_DEBUG
  assert(ds->ds_num == vl->values_len);
#else
  if (ds->ds_num != vl->values_len) {
    ERROR("plugin_dispatch_values: <%s/%s-%s/%s-%s> ds->type = %s: "
          "(ds->ds_num = %" PRIsz ") != "
          "(vl->values_len = %" PRIsz ")",
          vl->host, vl->plugin, vl->plugin_instance, vl->type,
          vl->type_instance, ds->type, ds->ds_num, vl->values_len);
    return -1;
  }
#endif

  escape_slashes(vl->host, sizeof(vl->host));
  escape_slashes(vl->plugin, sizeof(vl->plugin));
  escape_slashes(vl->plugin_instance, sizeof(vl->plugin_instance));
  escape_slashes(vl->type, sizeof(vl->type));
  escape_slashes(vl->type_instance, sizeof(vl->type_instance));

  if (pre_cache_chain != NULL) {
    int const chain_status = fc_process_chain(ds, vl, pre_cache_chain);
    if (chain_status < 0) {
      WARNING("plugin_dispatch_values: Running the "
              "pre-cache chain failed with "
              "status %i (%#x).",
              chain_status, chain_status);
    } else if (chain_status == FC_TARGET_STOP) {
      return 0;
    }
  }

  /* Update the value cache */
  uc_update(ds, vl);

  if (post_cache_chain == NULL) {
    fc_default_action(ds, vl);
  } else {
    int const chain_status = fc_process_chain(ds, vl, post_cache_chain);
    if (chain_status < 0) {
      WARNING("plugin_dispatch_values: Running the "
              "post-cache chain failed with "
              "status %i (%#x).",
              chain_status, chain_status);
    }
  }

  /* Release meta data that was attached while processing, see above. */
  if (free_meta_data && (vl->meta != NULL)) {
    meta_data_destroy(vl->meta);
    vl->meta = NULL;
  }

  return 0;
} /* int plugin_dispatch_values_internal */
2182
get_drop_probability(void)2183 static double get_drop_probability(void) /* {{{ */
2184 {
2185 long pos;
2186 long size;
2187 long wql;
2188
2189 pthread_mutex_lock(&write_lock);
2190 wql = write_queue_length;
2191 pthread_mutex_unlock(&write_lock);
2192
2193 if (wql < write_limit_low)
2194 return 0.0;
2195 if (wql >= write_limit_high)
2196 return 1.0;
2197
2198 pos = 1 + wql - write_limit_low;
2199 size = 1 + write_limit_high - write_limit_low;
2200
2201 return (double)pos / (double)size;
2202 } /* }}} double get_drop_probability */
2203
check_drop_value(void)2204 static bool check_drop_value(void) /* {{{ */
2205 {
2206 static cdtime_t last_message_time;
2207 static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2208
2209 double p;
2210 double q;
2211 int status;
2212
2213 if (write_limit_high == 0)
2214 return false;
2215
2216 p = get_drop_probability();
2217 if (p == 0.0)
2218 return false;
2219
2220 status = pthread_mutex_trylock(&last_message_lock);
2221 if (status == 0) {
2222 cdtime_t now;
2223
2224 now = cdtime();
2225 if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
2226 last_message_time = now;
2227 ERROR("plugin_dispatch_values: Low water mark "
2228 "reached. Dropping %.0f%% of metrics.",
2229 100.0 * p);
2230 }
2231 pthread_mutex_unlock(&last_message_lock);
2232 }
2233
2234 if (p == 1.0)
2235 return true;
2236
2237 q = cdrand_d();
2238 if (q > p)
2239 return true;
2240 else
2241 return false;
2242 } /* }}} bool check_drop_value */
2243
plugin_dispatch_values(value_list_t const * vl)2244 EXPORT int plugin_dispatch_values(value_list_t const *vl) {
2245 int status;
2246
2247 if (check_drop_value()) {
2248 if (record_statistics) {
2249 pthread_mutex_lock(&statistics_lock);
2250 stats_values_dropped++;
2251 pthread_mutex_unlock(&statistics_lock);
2252 }
2253 return 0;
2254 }
2255
2256 status = plugin_write_enqueue(vl);
2257 if (status != 0) {
2258 ERROR("plugin_dispatch_values: plugin_write_enqueue failed with status %i "
2259 "(%s).",
2260 status, STRERROR(status));
2261 return status;
2262 }
2263
2264 return 0;
2265 }
2266
2267 __attribute__((sentinel)) int
plugin_dispatch_multivalue(value_list_t const * template,bool store_percentage,int store_type,...)2268 plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
2269 bool store_percentage, int store_type, ...) {
2270 value_list_t *vl;
2271 int failed = 0;
2272 gauge_t sum = 0.0;
2273 va_list ap;
2274
2275 if (check_drop_value()) {
2276 if (record_statistics) {
2277 pthread_mutex_lock(&statistics_lock);
2278 stats_values_dropped++;
2279 pthread_mutex_unlock(&statistics_lock);
2280 }
2281 return 0;
2282 }
2283
2284 assert(template->values_len == 1);
2285
2286 /* Calculate sum for Gauge to calculate percent if needed */
2287 if (DS_TYPE_GAUGE == store_type) {
2288 va_start(ap, store_type);
2289 while (42) {
2290 char const *name;
2291 gauge_t value;
2292
2293 name = va_arg(ap, char const *);
2294 if (name == NULL)
2295 break;
2296
2297 value = va_arg(ap, gauge_t);
2298 if (!isnan(value))
2299 sum += value;
2300 }
2301 va_end(ap);
2302 }
2303
2304 vl = plugin_value_list_clone(template);
2305 /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2306 if (store_percentage)
2307 sstrncpy(vl->type, "percent", sizeof(vl->type));
2308
2309 va_start(ap, store_type);
2310 while (42) {
2311 char const *name;
2312 int status;
2313
2314 /* Set the type instance. */
2315 name = va_arg(ap, char const *);
2316 if (name == NULL)
2317 break;
2318 sstrncpy(vl->type_instance, name, sizeof(vl->type_instance));
2319
2320 /* Set the value. */
2321 switch (store_type) {
2322 case DS_TYPE_GAUGE:
2323 vl->values[0].gauge = va_arg(ap, gauge_t);
2324 if (store_percentage)
2325 vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2326 break;
2327 case DS_TYPE_ABSOLUTE:
2328 vl->values[0].absolute = va_arg(ap, absolute_t);
2329 break;
2330 case DS_TYPE_COUNTER:
2331 vl->values[0].counter = va_arg(ap, counter_t);
2332 break;
2333 case DS_TYPE_DERIVE:
2334 vl->values[0].derive = va_arg(ap, derive_t);
2335 break;
2336 default:
2337 ERROR("plugin_dispatch_multivalue: given store_type is incorrect.");
2338 failed++;
2339 }
2340
2341 status = plugin_write_enqueue(vl);
2342 if (status != 0)
2343 failed++;
2344 }
2345 va_end(ap);
2346
2347 plugin_value_list_free(vl);
2348 return failed;
2349 } /* }}} int plugin_dispatch_multivalue */
2350
plugin_dispatch_notification(const notification_t * notif)2351 EXPORT int plugin_dispatch_notification(const notification_t *notif) {
2352 llentry_t *le;
2353 /* Possible TODO: Add flap detection here */
2354
2355 DEBUG("plugin_dispatch_notification: severity = %i; message = %s; "
2356 "time = %.3f; host = %s;",
2357 notif->severity, notif->message, CDTIME_T_TO_DOUBLE(notif->time),
2358 notif->host);
2359
2360 /* Nobody cares for notifications */
2361 if (list_notification == NULL)
2362 return -1;
2363
2364 le = llist_head(list_notification);
2365 while (le != NULL) {
2366 callback_func_t *cf;
2367 plugin_notification_cb callback;
2368 int status;
2369
2370 /* do not switch plugin context; rather keep the context
2371 * (interval) information of the calling plugin */
2372
2373 cf = le->value;
2374 callback = cf->cf_callback;
2375 status = (*callback)(notif, &cf->cf_udata);
2376 if (status != 0) {
2377 WARNING("plugin_dispatch_notification: Notification "
2378 "callback %s returned %i.",
2379 le->key, status);
2380 }
2381
2382 le = le->next;
2383 }
2384
2385 return 0;
2386 } /* int plugin_dispatch_notification */
2387
plugin_log(int level,const char * format,...)2388 EXPORT void plugin_log(int level, const char *format, ...) {
2389 char msg[1024];
2390 va_list ap;
2391 llentry_t *le;
2392
2393 #if !COLLECT_DEBUG
2394 if (level >= LOG_DEBUG)
2395 return;
2396 #endif
2397
2398 va_start(ap, format);
2399 vsnprintf(msg, sizeof(msg), format, ap);
2400 msg[sizeof(msg) - 1] = '\0';
2401 va_end(ap);
2402
2403 if (list_log == NULL) {
2404 fprintf(stderr, "%s\n", msg);
2405 return;
2406 }
2407
2408 le = llist_head(list_log);
2409 while (le != NULL) {
2410 callback_func_t *cf;
2411 plugin_log_cb callback;
2412
2413 cf = le->value;
2414 callback = cf->cf_callback;
2415
2416 /* do not switch plugin context; rather keep the context
2417 * (interval) information of the calling plugin */
2418
2419 (*callback)(level, msg, &cf->cf_udata);
2420
2421 le = le->next;
2422 }
2423 } /* void plugin_log */
2424
daemon_log(int level,const char * format,...)2425 void daemon_log(int level, const char *format, ...) {
2426 char msg[1024] = ""; // Size inherits from plugin_log()
2427
2428 char const *name = plugin_get_ctx().name;
2429 if (name == NULL)
2430 name = "UNKNOWN";
2431
2432 va_list ap;
2433 va_start(ap, format);
2434 vsnprintf(msg, sizeof(msg), format, ap);
2435 va_end(ap);
2436
2437 plugin_log(level, "%s plugin: %s", name, msg);
2438 } /* void daemon_log */
2439
/* Translates a severity name into a syslog level.
 *
 * The comparison ignores case. "emerg", "alert", "crit" and "err" all map to
 * LOG_ERR; "warning", "notice" and "info" map to their LOG_* counterparts.
 * "debug" is only recognized when COLLECT_DEBUG is enabled.
 *
 * Returns the log level, or -1 if the name is not recognized. */
int parse_log_severity(const char *severity) {
  static const struct {
    const char *name;
    int level;
  } known_levels[] = {
      {"emerg", LOG_ERR},
      {"alert", LOG_ERR},
      {"crit", LOG_ERR},
      {"err", LOG_ERR},
      {"warning", LOG_WARNING},
      {"notice", LOG_NOTICE},
      {"info", LOG_INFO},
#if COLLECT_DEBUG
      {"debug", LOG_DEBUG},
#endif /* COLLECT_DEBUG */
  };
  size_t const known_levels_num =
      sizeof(known_levels) / sizeof(known_levels[0]);

  for (size_t i = 0; i < known_levels_num; i++) {
    if (strcasecmp(severity, known_levels[i].name) == 0)
      return known_levels[i].level;
  }

  return -1;
} /* int parse_log_severity */
2460
parse_notif_severity(const char * severity)2461 EXPORT int parse_notif_severity(const char *severity) {
2462 int notif_severity = -1;
2463
2464 if (strcasecmp(severity, "FAILURE") == 0)
2465 notif_severity = NOTIF_FAILURE;
2466 else if (strcmp(severity, "OKAY") == 0)
2467 notif_severity = NOTIF_OKAY;
2468 else if ((strcmp(severity, "WARNING") == 0) ||
2469 (strcmp(severity, "WARN") == 0))
2470 notif_severity = NOTIF_WARNING;
2471
2472 return notif_severity;
2473 } /* int parse_notif_severity */
2474
plugin_get_ds(const char * name)2475 EXPORT const data_set_t *plugin_get_ds(const char *name) {
2476 data_set_t *ds;
2477
2478 if (data_sets == NULL) {
2479 P_ERROR("plugin_get_ds: No data sets are defined yet.");
2480 return NULL;
2481 }
2482
2483 if (c_avl_get(data_sets, name, (void *)&ds) != 0) {
2484 DEBUG("No such dataset registered: %s", name);
2485 return NULL;
2486 }
2487
2488 return ds;
2489 } /* data_set_t *plugin_get_ds */
2490
/* Appends a new meta data entry to the end of a notification's meta list.
 *
 * n      Notification to extend.
 * name   Name of the entry; copied into the entry.
 * type   Selects how `value` is interpreted.
 * value  Pointer to the value: a NUL-terminated string for NM_TYPE_STRING
 *        (duplicated with strdup), otherwise a pointer to an int64_t,
 *        uint64_t, double or bool matching `type`.
 *
 * Returns 0 on success and -1 if an argument is NULL, memory could not be
 * allocated, or `type` is unknown. */
static int plugin_notification_meta_add(notification_t *n, const char *name,
                                        enum notification_meta_type_e type,
                                        const void *value) {
  if ((n == NULL) || (name == NULL) || (value == NULL)) {
    ERROR("plugin_notification_meta_add: A pointer is NULL!");
    return -1;
  }

  notification_meta_t *entry = calloc(1, sizeof(*entry));
  if (entry == NULL) {
    ERROR("plugin_notification_meta_add: calloc failed.");
    return -1;
  }

  sstrncpy(entry->name, name, sizeof(entry->name));
  entry->type = type;

  switch (type) {
  case NM_TYPE_STRING:
    entry->nm_value.nm_string = strdup((const char *)value);
    if (entry->nm_value.nm_string == NULL) {
      ERROR("plugin_notification_meta_add: strdup failed.");
      sfree(entry);
      return -1;
    }
    break;
  case NM_TYPE_SIGNED_INT:
    entry->nm_value.nm_signed_int = *((const int64_t *)value);
    break;
  case NM_TYPE_UNSIGNED_INT:
    entry->nm_value.nm_unsigned_int = *((const uint64_t *)value);
    break;
  case NM_TYPE_DOUBLE:
    entry->nm_value.nm_double = *((const double *)value);
    break;
  case NM_TYPE_BOOLEAN:
    entry->nm_value.nm_boolean = *((const bool *)value);
    break;
  default:
    ERROR("plugin_notification_meta_add: Unknown type: %i", type);
    sfree(entry);
    return -1;
  } /* switch (type) */

  entry->next = NULL;

  /* Walk to the link that terminates the list (n->meta itself when the list
   * is empty) and hook the new entry in there. */
  notification_meta_t **link = &n->meta;
  while (*link != NULL)
    link = &(*link)->next;
  *link = entry;

  return 0;
} /* int plugin_notification_meta_add */
2556
/* Appends a string meta data entry to the notification. The string is copied
 * by plugin_notification_meta_add(). Returns 0 on success, -1 on failure. */
int plugin_notification_meta_add_string(notification_t *n, const char *name,
                                        const char *value) {
  int status = plugin_notification_meta_add(n, name, NM_TYPE_STRING, value);
  return status;
}
2561
/* Appends a signed integer meta data entry to the notification.
 * Returns 0 on success, -1 on failure. */
int plugin_notification_meta_add_signed_int(notification_t *n, const char *name,
                                            int64_t value) {
  /* The generic helper takes the value by address and copies it. */
  int status =
      plugin_notification_meta_add(n, name, NM_TYPE_SIGNED_INT, &value);
  return status;
}
2566
/* Appends an unsigned integer meta data entry to the notification.
 * Returns 0 on success, -1 on failure. */
int plugin_notification_meta_add_unsigned_int(notification_t *n,
                                              const char *name,
                                              uint64_t value) {
  /* The generic helper takes the value by address and copies it. */
  int status =
      plugin_notification_meta_add(n, name, NM_TYPE_UNSIGNED_INT, &value);
  return status;
}
2572
/* Appends a floating point meta data entry to the notification.
 * Returns 0 on success, -1 on failure. */
int plugin_notification_meta_add_double(notification_t *n, const char *name,
                                        double value) {
  /* The generic helper takes the value by address and copies it. */
  int status = plugin_notification_meta_add(n, name, NM_TYPE_DOUBLE, &value);
  return status;
}
2577
/* Appends a boolean meta data entry to the notification.
 * Returns 0 on success, -1 on failure. */
int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
                                         bool value) {
  /* The generic helper takes the value by address and copies it. */
  int status = plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, &value);
  return status;
}
2582
/* Appends a copy of every meta data entry of `src` to the meta list of `dst`.
 *
 * `dst` and `src` must be distinct, non-NULL notifications that do not share
 * a meta list (enforced with assert()). Entries of an unknown type are
 * skipped.
 *
 * Copying is best effort: a failure to add one entry does not stop the
 * remaining entries from being copied.
 *
 * Returns 0 if all entries were copied and -1 if at least one entry could not
 * be added. Previously the result of the add calls was ignored and 0 was
 * returned unconditionally. */
int plugin_notification_meta_copy(notification_t *dst,
                                  const notification_t *src) {
  assert(dst != NULL);
  assert(src != NULL);
  assert(dst != src);
  assert((src->meta == NULL) || (src->meta != dst->meta));

  int result = 0;

  for (notification_meta_t *meta = src->meta; meta != NULL; meta = meta->next) {
    int status = 0;

    switch (meta->type) {
    case NM_TYPE_STRING:
      status = plugin_notification_meta_add_string(dst, meta->name,
                                                   meta->nm_value.nm_string);
      break;
    case NM_TYPE_SIGNED_INT:
      status = plugin_notification_meta_add_signed_int(
          dst, meta->name, meta->nm_value.nm_signed_int);
      break;
    case NM_TYPE_UNSIGNED_INT:
      status = plugin_notification_meta_add_unsigned_int(
          dst, meta->name, meta->nm_value.nm_unsigned_int);
      break;
    case NM_TYPE_DOUBLE:
      status = plugin_notification_meta_add_double(dst, meta->name,
                                                   meta->nm_value.nm_double);
      break;
    case NM_TYPE_BOOLEAN:
      status = plugin_notification_meta_add_boolean(dst, meta->name,
                                                    meta->nm_value.nm_boolean);
      break;
    default:
      /* Unknown types are silently skipped, as before. */
      break;
    }

    if (status != 0)
      result = -1;
  }

  return result;
} /* int plugin_notification_meta_copy */
2610
/* Releases a complete list of notification meta data entries, including the
 * strings owned by NM_TYPE_STRING entries.
 *
 * Returns 0 on success and -1 (after logging an error) if `n` is NULL. */
int plugin_notification_meta_free(notification_meta_t *n) {
  if (n == NULL) {
    ERROR("plugin_notification_meta_free: n == NULL!");
    return -1;
  }

  while (n != NULL) {
    /* Remember the successor before the current entry is released. */
    notification_meta_t *rest = n->next;

    if (n->type == NM_TYPE_STRING) {
      /* Go through a non-const temporary, because nm_string is declared
       * const and cannot be handed to sfree() directly. */
      void *owned_string = (void *)n->nm_value.nm_string;

      sfree(owned_string);
      n->nm_value.nm_string = NULL;
    }
    sfree(n);

    n = rest;
  }

  return 0;
} /* int plugin_notification_meta_free */
2639
/* Destructor registered for plugin_ctx_key by plugin_init_ctx(); releases a
 * plugin context allocated by plugin_ctx_create(). */
static void plugin_ctx_destructor(void *ctx) {
  free(ctx);
} /* void plugin_ctx_destructor */
2643
/* Template context: copied into every freshly allocated plugin context by
 * plugin_ctx_create() and returned by plugin_get_ctx() / plugin_set_ctx() as
 * a fallback when no context could be created. All members that are not
 * listed in the initializer are zero-initialized.
 * NOTE(review): the inline label below presumes that `interval` is the first
 * member of plugin_ctx_t -- confirm against the struct definition. */
static plugin_ctx_t ctx_init = {/* interval = */ 0};
2645
plugin_ctx_create(void)2646 static plugin_ctx_t *plugin_ctx_create(void) {
2647 plugin_ctx_t *ctx;
2648
2649 ctx = malloc(sizeof(*ctx));
2650 if (ctx == NULL) {
2651 ERROR("Failed to allocate plugin context: %s", STRERRNO);
2652 return NULL;
2653 }
2654
2655 *ctx = ctx_init;
2656 assert(plugin_ctx_key_initialized);
2657 pthread_setspecific(plugin_ctx_key, ctx);
2658 DEBUG("Created new plugin context.");
2659 return ctx;
2660 } /* int plugin_ctx_create */
2661
plugin_init_ctx(void)2662 EXPORT void plugin_init_ctx(void) {
2663 pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
2664 plugin_ctx_key_initialized = true;
2665 } /* void plugin_init_ctx */
2666
plugin_get_ctx(void)2667 EXPORT plugin_ctx_t plugin_get_ctx(void) {
2668 plugin_ctx_t *ctx;
2669
2670 assert(plugin_ctx_key_initialized);
2671 ctx = pthread_getspecific(plugin_ctx_key);
2672
2673 if (ctx == NULL) {
2674 ctx = plugin_ctx_create();
2675 /* this must no happen -- exit() instead? */
2676 if (ctx == NULL)
2677 return ctx_init;
2678 }
2679
2680 return *ctx;
2681 } /* plugin_ctx_t plugin_get_ctx */
2682
plugin_set_ctx(plugin_ctx_t ctx)2683 EXPORT plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
2684 plugin_ctx_t *c;
2685 plugin_ctx_t old;
2686
2687 assert(plugin_ctx_key_initialized);
2688 c = pthread_getspecific(plugin_ctx_key);
2689
2690 if (c == NULL) {
2691 c = plugin_ctx_create();
2692 /* this must no happen -- exit() instead? */
2693 if (c == NULL)
2694 return ctx_init;
2695 }
2696
2697 old = *c;
2698 *c = ctx;
2699
2700 return old;
2701 } /* void plugin_set_ctx */
2702
plugin_get_interval(void)2703 EXPORT cdtime_t plugin_get_interval(void) {
2704 cdtime_t interval;
2705
2706 interval = plugin_get_ctx().interval;
2707 if (interval > 0)
2708 return interval;
2709
2710 P_ERROR("plugin_get_interval: Unable to determine Interval from context.");
2711
2712 return cf_get_default_interval();
2713 } /* cdtime_t plugin_get_interval */
2714
/* Hand-over record passed from plugin_thread_create() to
 * plugin_thread_start(). It is allocated by the creating thread and released
 * by plugin_thread_start() in the new thread. */
typedef struct {
  /* Plugin context of the creating thread; installed in the new thread. */
  plugin_ctx_t ctx;
  /* Thread function requested by the caller of plugin_thread_create(). */
  void *(*start_routine)(void *);
  /* Argument passed to start_routine. */
  void *arg;
} plugin_thread_t;
2720
plugin_thread_start(void * arg)2721 static void *plugin_thread_start(void *arg) {
2722 plugin_thread_t *plugin_thread = arg;
2723
2724 void *(*start_routine)(void *) = plugin_thread->start_routine;
2725 void *plugin_arg = plugin_thread->arg;
2726
2727 plugin_set_ctx(plugin_thread->ctx);
2728
2729 sfree(plugin_thread);
2730
2731 return start_routine(plugin_arg);
2732 } /* void *plugin_thread_start */
2733
plugin_thread_create(pthread_t * thread,void * (* start_routine)(void *),void * arg,char const * name)2734 int plugin_thread_create(pthread_t *thread, void *(*start_routine)(void *),
2735 void *arg, char const *name) {
2736 plugin_thread_t *plugin_thread;
2737
2738 plugin_thread = malloc(sizeof(*plugin_thread));
2739 if (plugin_thread == NULL)
2740 return ENOMEM;
2741
2742 plugin_thread->ctx = plugin_get_ctx();
2743 plugin_thread->start_routine = start_routine;
2744 plugin_thread->arg = arg;
2745
2746 int ret = pthread_create(thread, NULL, plugin_thread_start, plugin_thread);
2747 if (ret != 0) {
2748 sfree(plugin_thread);
2749 return ret;
2750 }
2751
2752 if (name != NULL)
2753 set_thread_name(*thread, name);
2754
2755 return 0;
2756 } /* int plugin_thread_create */
2757