1 /**
2  * collectd - src/plugin.c
3  * Copyright (C) 2005-2014  Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  *   Sebastian Harl <sh at tokkee.org>
26  **/
27 
28 /* _GNU_SOURCE is needed in Linux to use pthread_setname_np */
29 #define _GNU_SOURCE
30 
31 #include "collectd.h"
32 
33 #include "configfile.h"
34 #include "filter_chain.h"
35 #include "plugin.h"
36 #include "utils/avltree/avltree.h"
37 #include "utils/common/common.h"
38 #include "utils/heap/heap.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "utils_llist.h"
42 #include "utils_random.h"
43 #include "utils_time.h"
44 
45 #ifdef WIN32
46 #define EXPORT __declspec(dllexport)
47 #include <sys/stat.h>
48 #include <unistd.h>
49 #else
50 #define EXPORT
51 #endif
52 
53 #if HAVE_PTHREAD_NP_H
54 #include <pthread_np.h> /* for pthread_set_name_np(3) */
55 #endif
56 
57 #include <dlfcn.h>
58 
59 /*
60  * Private structures
61  */
/* Generic callback record: an untyped callback pointer together with the user
 * data and the plugin context that belong to it. */
struct callback_func_s {
  void *cf_callback;    /* callback pointer; cast to its real type by the user */
  user_data_t cf_udata; /* user data; released by destroy_callback() */
  plugin_ctx_t cf_ctx;  /* plugin context captured when the record is created */
};
typedef struct callback_func_s callback_func_t;
68 
/* Values for `read_func_t.rf_type':
 *  RF_SIMPLE  - the callback is invoked without arguments,
 *  RF_COMPLEX - the callback is invoked with a pointer to the user data,
 *  RF_REMOVE  - the entry is marked for deletion; the read thread frees it
 *               instead of calling it. */
#define RF_SIMPLE 0
#define RF_COMPLEX 1
#define RF_REMOVE 65535
struct read_func_s {
/* `read_func_t' "inherits" from `callback_func_t'.
 * The `rf_super' member MUST be the first one in this structure! */
#define rf_callback rf_super.cf_callback
#define rf_udata rf_super.cf_udata
#define rf_ctx rf_super.cf_ctx
  callback_func_t rf_super;
  char rf_group[DATA_MAX_NAME_LEN]; /* group name; not used in this part of
                                     * the file */
  char *rf_name;                    /* heap-allocated name, freed with sfree() */
  int rf_type;                      /* one of the RF_* constants above */
  cdtime_t rf_interval;             /* regular read interval */
  cdtime_t rf_effective_interval;   /* interval currently in effect; doubled
                                     * after a failed read, reset to
                                     * `rf_interval' after a successful one */
  cdtime_t rf_next_read;            /* absolute time of the next read */
};
typedef struct read_func_s read_func_t;
87 
/* One registered cache-event callback; instances are stored in the
 * `list_cache_event' array.
 * NOTE(review): how `name' is allocated and which context `plugin_ctx'
 * holds is decided by code outside this part of the file - confirm there. */
struct cache_event_func_s {
  plugin_cache_event_cb callback;
  char *name;
  user_data_t user_data;
  plugin_ctx_t plugin_ctx;
};
typedef struct cache_event_func_s cache_event_func_t;
95 
/* Node of the singly linked write queue (see plugin_write_enqueue() and
 * plugin_write_dequeue()). */
struct write_queue_s;
typedef struct write_queue_s write_queue_t;
struct write_queue_s {
  value_list_t *vl;    /* cloned value list, owned by the queue node */
  plugin_ctx_t ctx;    /* plugin context of the thread that enqueued `vl' */
  write_queue_t *next; /* next node, NULL for the tail */
};
103 
/* Name and timeout of a flush callback.
 * NOTE(review): the code using this structure is outside this part of the
 * file; presumably it is the user data of periodic flush callbacks - confirm. */
struct flush_callback_s {
  char *name;
  cdtime_t timeout;
};
typedef struct flush_callback_s flush_callback_t;
109 
110 /*
111  * Private variables
112  */
/* Names of the plugins marked as loaded. Created lazily by
 * plugin_is_loaded() with a case-insensitive comparator; keys are
 * heap-allocated copies of the names, values are always NULL. */
static c_avl_tree_t *plugins_loaded;

/* Callback lists, one per callback kind. A list stays NULL until it is
 * created; plugin_load_file() treats a NULL `list_log' as "no log handler
 * registered". */
static llist_t *list_init;
static llist_t *list_write;
static llist_t *list_flush;
static llist_t *list_missing;
static llist_t *list_shutdown;
static llist_t *list_log;
static llist_t *list_notification;

/* Cache-event callbacks: fixed-size table plus the number of used slots. */
static size_t list_cache_event_num;
static cache_event_func_t list_cache_event[32];

static fc_chain_t *pre_cache_chain;
static fc_chain_t *post_cache_chain;

static c_avl_tree_t *data_sets;

/* Plugin directory set via plugin_set_dir(); NULL selects PLUGINDIR. */
static char *plugindir;

#ifndef DEFAULT_MAX_READ_INTERVAL
#define DEFAULT_MAX_READ_INTERVAL TIME_T_TO_CDTIME_T_STATIC(86400)
#endif
/* State of the read threads. `read_loop' is cleared (under `read_lock') to
 * ask the threads to terminate; `read_cond' is used to wake them up. */
static c_heap_t *read_heap;
static llist_t *read_list;
static int read_loop = 1;
static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *read_threads;
static size_t read_threads_num;
/* Upper bound for the effective interval of a failing read callback. */
static cdtime_t max_read_interval = DEFAULT_MAX_READ_INTERVAL;

/* Write queue (singly linked, head/tail/length) and the write threads.
 * The queue is modified with `write_lock' held; `write_cond' is signalled
 * when an element is appended and broadcast on shutdown. */
static write_queue_t *write_queue_head;
static write_queue_t *write_queue_tail;
static long write_queue_length;
static bool write_loop = true;
static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *write_threads;
static size_t write_threads_num;

static pthread_key_t plugin_ctx_key;
static bool plugin_ctx_key_initialized;

/* NOTE(review): presumably the queue lengths between which values start to
 * be dropped; the code using these limits is outside this part of the file. */
static long write_limit_high;
static long write_limit_low;

/* Internal statistics; reported by plugin_update_internal_statistics(). */
static pthread_mutex_t statistics_lock = PTHREAD_MUTEX_INITIALIZER;
static derive_t stats_values_dropped;
static bool record_statistics;
163 
164 /*
165  * Static functions
166  */
/* Forward declaration: called by plugin_write_thread() for every value list
 * taken from the write queue; defined further down in this file. */
static int plugin_dispatch_values_internal(value_list_t *vl);
168 
plugin_get_dir(void)169 static const char *plugin_get_dir(void) {
170   if (plugindir == NULL)
171     return PLUGINDIR;
172   else
173     return plugindir;
174 }
175 
plugin_update_internal_statistics(void)176 static int plugin_update_internal_statistics(void) { /* {{{ */
177   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
178 
179   /* Initialize `vl' */
180   value_list_t vl = VALUE_LIST_INIT;
181   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
182   vl.interval = plugin_get_interval();
183 
184   /* Write queue */
185   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
186 
187   /* Write queue : queue length */
188   vl.values = &(value_t){.gauge = copy_write_queue_length};
189   vl.values_len = 1;
190   sstrncpy(vl.type, "queue_length", sizeof(vl.type));
191   vl.type_instance[0] = 0;
192   plugin_dispatch_values(&vl);
193 
194   /* Write queue : Values dropped (queue length > low limit) */
195   vl.values = &(value_t){.gauge = (gauge_t)stats_values_dropped};
196   vl.values_len = 1;
197   sstrncpy(vl.type, "derive", sizeof(vl.type));
198   sstrncpy(vl.type_instance, "dropped", sizeof(vl.type_instance));
199   plugin_dispatch_values(&vl);
200 
201   /* Cache */
202   sstrncpy(vl.plugin_instance, "cache", sizeof(vl.plugin_instance));
203 
204   /* Cache : Nb entry in cache tree */
205   vl.values = &(value_t){.gauge = (gauge_t)uc_get_size()};
206   vl.values_len = 1;
207   sstrncpy(vl.type, "cache_size", sizeof(vl.type));
208   vl.type_instance[0] = 0;
209   plugin_dispatch_values(&vl);
210 
211   return 0;
212 } /* }}} int plugin_update_internal_statistics */
213 
/* Releases the payload of `ud' through its own `free_func'. Does nothing if
 * `ud' is NULL, carries no data or has no free function. The user_data_t
 * structure itself is not freed. */
static void free_userdata(user_data_t const *ud) /* {{{ */
{
  if ((ud == NULL) || (ud->data == NULL) || (ud->free_func == NULL))
    return;

  ud->free_func(ud->data);
} /* }}} void free_userdata */
223 
/* Frees a callback record together with the user data attached to it.
 * A NULL pointer is accepted and ignored. */
static void destroy_callback(callback_func_t *cf) /* {{{ */
{
  if (cf != NULL) {
    free_userdata(&cf->cf_udata);
    sfree(cf);
  }
} /* }}} void destroy_callback */
231 
/* Destroys every entry of `*list' (key and callback record), then the list
 * itself, and resets `*list' to NULL. A NULL `*list' is a no-op. */
static void destroy_all_callbacks(llist_t **list) /* {{{ */
{
  if (*list == NULL)
    return;

  llentry_t *following = NULL;
  for (llentry_t *entry = llist_head(*list); entry != NULL;
       entry = following) {
    /* Remember the successor before the entry's contents are released. */
    following = entry->next;

    sfree(entry->key);
    destroy_callback(entry->value);
    entry->value = NULL;
  }

  llist_destroy(*list);
  *list = NULL;
} /* }}} void destroy_all_callbacks */
255 
destroy_read_heap(void)256 static void destroy_read_heap(void) /* {{{ */
257 {
258   if (read_heap == NULL)
259     return;
260 
261   while (42) {
262     read_func_t *rf;
263 
264     rf = c_heap_get_root(read_heap);
265     if (rf == NULL)
266       break;
267     sfree(rf->rf_name);
268     destroy_callback((callback_func_t *)rf);
269   }
270 
271   c_heap_destroy(read_heap);
272   read_heap = NULL;
273 } /* }}} void destroy_read_heap */
274 
/* Stores `cf' in `*list' under `name', creating the list on first use.
 * An existing entry with the same name is overwritten (with a warning) and
 * its old callback record is destroyed.
 *
 * Takes ownership of `cf': on every failure path it is destroyed here.
 * Returns 0 on success and -1 on failure. */
static int register_callback(llist_t **list, /* {{{ */
                             const char *name, callback_func_t *cf) {

  if (*list == NULL) {
    *list = llist_create();
    if (*list == NULL) {
      ERROR("plugin: register_callback: "
            "llist_create failed.");
      destroy_callback(cf);
      return -1;
    }
  }

  /* The copy is only needed for a new entry, but it is made up front so
   * that an allocation failure is reported before the list is touched. */
  char *name_copy = strdup(name);
  if (name_copy == NULL) {
    ERROR("plugin: register_callback: strdup failed.");
    destroy_callback(cf);
    return -1;
  }

  llentry_t *existing = llist_search(*list, name);
  if (existing != NULL) {
    /* Replace the record in place; the entry keeps its original key. */
    callback_func_t *replaced = existing->value;
    existing->value = cf;

    P_WARNING("register_callback: "
              "a callback named `%s' already exists - "
              "overwriting the old entry!",
              name);

    destroy_callback(replaced);
    sfree(name_copy);
    return 0;
  }

  llentry_t *fresh = llentry_create(name_copy, cf);
  if (fresh == NULL) {
    ERROR("plugin: register_callback: "
          "llentry_create failed.");
    sfree(name_copy);
    destroy_callback(cf);
    return -1;
  }

  llist_append(*list, fresh);
  return 0;
} /* }}} int register_callback */
322 
/* Logs (at INFO level) the names of all callbacks stored in `*list',
 * prefixed with `comment'. An empty list is reported as "[none]". */
static void log_list_callbacks(llist_t **list, /* {{{ */
                               const char *comment) {
  int count = llist_size(*list);
  if (count == 0) {
    INFO("%s: [none]", comment);
    return;
  }

  char **names = calloc(count, sizeof(*names));
  if (names == NULL) {
    ERROR("%s: failed to allocate memory for list of callbacks", comment);
    return;
  }

  /* Collect the keys and add up the space needed for the joined string;
   * six extra bytes per key leave room for the "', '" separator. */
  int total = 0;
  int idx = 0;
  for (llentry_t *entry = llist_head(*list); entry != NULL;
       entry = entry->next) {
    names[idx] = entry->key;
    total += strlen(entry->key) + 6;
    idx++;
  }

  char *joined = malloc(total + 10);
  if (joined != NULL) {
    *joined = '\0';
    strjoin(joined, total, names, count, "', '");
    INFO("%s ['%s']", comment, joined);
    sfree(joined);
  } else {
    ERROR("%s: failed to allocate memory for list of callbacks", comment);
  }

  /* Only the array is freed; the keys still belong to the list entries. */
  sfree(names);
} /* }}} void log_list_callbacks */
358 
/* Allocates a callback record for `callback', attaches a copy of `*ud' (or
 * empty user data if `ud' is NULL) and the current plugin context, and
 * registers it in `*list' under `name'.
 *
 * The user data is consumed on every path: on success it is owned by the
 * new record, on failure it is released through its `free_func'. Callers
 * therefore must not free it themselves after a failed registration.
 *
 * Returns 0 on success, EINVAL if `name' or `callback' is NULL, ENOMEM if
 * the record cannot be allocated, or the (negative) result of
 * register_callback(). */
static int create_register_callback(llist_t **list, /* {{{ */
                                    const char *name, void *callback,
                                    user_data_t const *ud) {

  if (name == NULL || callback == NULL) {
    /* Release the user data here as well, like the other failure paths do;
     * otherwise it would leak. */
    free_userdata(ud);
    return EINVAL;
  }

  callback_func_t *cf = calloc(1, sizeof(*cf));
  if (cf == NULL) {
    free_userdata(ud);
    ERROR("plugin: create_register_callback: calloc failed.");
    return ENOMEM;
  }

  cf->cf_callback = callback;
  if (ud == NULL) {
    cf->cf_udata = (user_data_t){
        .data = NULL,
        .free_func = NULL,
    };
  } else {
    cf->cf_udata = *ud;
  }

  cf->cf_ctx = plugin_get_ctx();

  /* register_callback() destroys `cf' (and with it the user data) if it
   * fails. */
  return register_callback(list, name, cf);
} /* }}} int create_register_callback */
387 
/* Removes the entry called `name' from `list' and frees its key, its
 * callback record and the entry itself. Returns 0 on success and -1 if the
 * list is NULL or holds no such entry. */
static int plugin_unregister(llist_t *list, const char *name) /* {{{ */
{
  llentry_t *entry = (list != NULL) ? llist_search(list, name) : NULL;
  if (entry == NULL)
    return -1;

  llist_remove(list, entry);

  sfree(entry->key);
  destroy_callback(entry->value);

  llentry_destroy(entry);

  return 0;
} /* }}} int plugin_unregister */
408 
409 /* plugin_load_file loads the shared object "file" and calls its
410  * "module_register" function. Returns zero on success, non-zero otherwise. */
plugin_load_file(char const * file,bool global)411 static int plugin_load_file(char const *file, bool global) {
412   int flags = RTLD_NOW;
413   if (global)
414     flags |= RTLD_GLOBAL;
415 
416   void *dlh = dlopen(file, flags);
417   if (dlh == NULL) {
418     char errbuf[1024] = "";
419 
420     snprintf(errbuf, sizeof(errbuf),
421              "dlopen(\"%s\") failed: %s. "
422              "The most common cause for this problem is missing dependencies. "
423              "Use ldd(1) to check the dependencies of the plugin / shared "
424              "object.",
425              file, dlerror());
426 
427     /* This error is printed to STDERR unconditionally. If list_log is NULL,
428      * plugin_log() will also print to STDERR. We avoid duplicate output by
429      * checking that the list of log handlers, list_log, is not NULL. */
430     fprintf(stderr, "ERROR: %s\n", errbuf);
431     if (list_log != NULL) {
432       ERROR("%s", errbuf);
433     }
434 
435     return ENOENT;
436   }
437 
438   void (*reg_handle)(void) = dlsym(dlh, "module_register");
439   if (reg_handle == NULL) {
440     ERROR("Couldn't find symbol \"module_register\" in \"%s\": %s\n", file,
441           dlerror());
442     dlclose(dlh);
443     return ENOENT;
444   }
445 
446   (*reg_handle)();
447   return 0;
448 }
449 
/* Main loop of a read thread.
 *
 * Each iteration takes the root entry off `read_heap', waits on `read_cond'
 * until the entry's `rf_next_read' time has come, calls the read callback
 * with the entry's plugin context installed, computes the next read time
 * and puts the entry back into the heap. Entries marked RF_REMOVE are freed
 * instead of being called. When a callback fails, its effective interval is
 * doubled (limited to `max_read_interval'); a successful call restores the
 * regular interval.
 *
 * The loop ends once `read_loop' is zero. The thread argument is unused and
 * the thread terminates via pthread_exit(NULL). */
static void *plugin_read_thread(void __attribute__((unused)) * args) {
  while (read_loop != 0) {
    read_func_t *rf;
    plugin_ctx_t old_ctx;
    cdtime_t start;
    cdtime_t now;
    cdtime_t elapsed;
    int status;
    int rf_type;
    int rc;

    /* Get the read function that needs to be read next.
     * We don't need to hold "read_lock" for the heap, but we need
     * to call c_heap_get_root() and pthread_cond_wait() in the
     * same protected block. */
    pthread_mutex_lock(&read_lock);
    rf = c_heap_get_root(read_heap);
    if (rf == NULL) {
      /* Heap is empty: sleep until `read_cond' is signalled, then retry. */
      pthread_cond_wait(&read_cond, &read_lock);
      pthread_mutex_unlock(&read_lock);
      continue;
    }
    pthread_mutex_unlock(&read_lock);

    if (rf->rf_interval == 0) {
      /* this should not happen, because the interval is set
       * for each plugin when loading it
       * XXX: issue a warning? */
      rf->rf_interval = plugin_get_interval();
      rf->rf_effective_interval = rf->rf_interval;

      rf->rf_next_read = cdtime();
    }

    /* sleep until this entry is due,
     * using pthread_cond_timedwait */
    pthread_mutex_lock(&read_lock);
    /* In pthread_cond_timedwait, spurious wakeups are possible
     * (and really happen, at least on NetBSD with > 1 CPU), thus
     * we need to re-evaluate the condition every time
     * pthread_cond_timedwait returns. */
    rc = 0;
    while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
      rc = pthread_cond_timedwait(&read_cond, &read_lock,
                                  &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
    }

    /* Must hold `read_lock' when accessing `rf->rf_type'. */
    rf_type = rf->rf_type;
    pthread_mutex_unlock(&read_lock);

    /* Check if we're supposed to stop.. This may have interrupted
     * the sleep, too. */
    if (read_loop == 0) {
      /* Insert `rf' again, so it can be free'd correctly */
      c_heap_insert(read_heap, rf);
      break;
    }

    /* The entry has been marked for deletion. The linked list
     * entry has already been removed by `plugin_unregister_read'.
     * All we have to do here is free the `read_func_t' and
     * continue. */
    if (rf_type == RF_REMOVE) {
      DEBUG("plugin_read_thread: Destroying the `%s' "
            "callback.",
            rf->rf_name);
      sfree(rf->rf_name);
      destroy_callback((callback_func_t *)rf);
      rf = NULL;
      continue;
    }

    DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);

    start = cdtime();

    /* Run the callback with the context of the plugin that registered it;
     * the previous context is restored right after the call. */
    old_ctx = plugin_set_ctx(rf->rf_ctx);

    if (rf_type == RF_SIMPLE) {
      int (*callback)(void);

      callback = rf->rf_callback;
      status = (*callback)();
    } else {
      plugin_read_cb callback;

      assert(rf_type == RF_COMPLEX);

      callback = rf->rf_callback;
      status = (*callback)(&rf->rf_udata);
    }

    plugin_set_ctx(old_ctx);

    /* If the function signals failure, we will increase the
     * intervals in which it will be called. */
    if (status != 0) {
      rf->rf_effective_interval *= 2;
      if (rf->rf_effective_interval > max_read_interval)
        rf->rf_effective_interval = max_read_interval;

      NOTICE("read-function of plugin `%s' failed. "
             "Will suspend it for %.3f seconds.",
             rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));
    } else {
      /* Success: Restore the interval, if it was changed. */
      rf->rf_effective_interval = rf->rf_interval;
    }

    /* update the ``next read due'' field */
    now = cdtime();

    /* calculate the time spent in the read function */
    elapsed = (now - start);

    if (elapsed > rf->rf_effective_interval)
      WARNING(
          "plugin_read_thread: read-function of the `%s' plugin took %.3f "
          "seconds, which is above its read interval (%.3f seconds). You might "
          "want to adjust the `Interval' or `ReadThreads' settings.",
          rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed),
          CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));

    DEBUG("plugin_read_thread: read-function of the `%s' plugin took "
          "%.6f seconds.",
          rf->rf_name, CDTIME_T_TO_DOUBLE(elapsed));

    DEBUG("plugin_read_thread: Effective interval of the "
          "`%s' plugin is %.3f seconds.",
          rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_effective_interval));

    /* Calculate the next (absolute) time at which this function
     * should be called. */
    rf->rf_next_read += rf->rf_effective_interval;

    /* Check, if `rf_next_read' is in the past. */
    if (rf->rf_next_read < now) {
      /* `rf_next_read' is in the past. Insert `now'
       * so this value doesn't trail off into the
       * past too much. */
      rf->rf_next_read = now;
    }

    DEBUG("plugin_read_thread: Next read of the `%s' plugin at %.3f.",
          rf->rf_name, CDTIME_T_TO_DOUBLE(rf->rf_next_read));

    /* Re-insert this read function into the heap again. */
    c_heap_insert(read_heap, rf);
  } /* while (read_loop) */

  pthread_exit(NULL);
  return (void *)0;
} /* void *plugin_read_thread */
604 
/* Size of the buffer used for thread names, including the terminating
 * null byte. */
#ifdef PTHREAD_MAX_NAMELEN_NP
#define THREAD_NAME_MAX PTHREAD_MAX_NAMELEN_NP
#else
#define THREAD_NAME_MAX 16
#endif

/* Gives thread `tid' the name `name', truncated to THREAD_NAME_MAX - 1
 * characters (a warning is logged when truncation happens). Compiles to a
 * no-op if neither pthread_setname_np() nor pthread_set_name_np() is
 * available. */
static void set_thread_name(pthread_t tid, char const *name) {
#if defined(HAVE_PTHREAD_SETNAME_NP) || defined(HAVE_PTHREAD_SET_NAME_NP)

  /* glibc rejects names that are too long instead of shortening them, so
   * the name is truncated before it is handed over. */
  char truncated[THREAD_NAME_MAX];
  if (strlen(name) >= THREAD_NAME_MAX)
    WARNING("set_thread_name(\"%s\"): name too long", name);
  sstrncpy(truncated, name, sizeof(truncated));

#if defined(HAVE_PTHREAD_SETNAME_NP)
  int rc = pthread_setname_np(tid, truncated);
  if (rc != 0) {
    ERROR("set_thread_name(\"%s\"): %s", truncated, STRERROR(rc));
  }
#else /* if defined(HAVE_PTHREAD_SET_NAME_NP) */
  pthread_set_name_np(tid, truncated);
#endif

#endif
}
632 
start_read_threads(size_t num)633 static void start_read_threads(size_t num) /* {{{ */
634 {
635   if (read_threads != NULL)
636     return;
637 
638   read_threads = calloc(num, sizeof(*read_threads));
639   if (read_threads == NULL) {
640     ERROR("plugin: start_read_threads: calloc failed.");
641     return;
642   }
643 
644   read_threads_num = 0;
645   for (size_t i = 0; i < num; i++) {
646     int status = pthread_create(read_threads + read_threads_num,
647                                 /* attr = */ NULL, plugin_read_thread,
648                                 /* arg = */ NULL);
649     if (status != 0) {
650       ERROR("plugin: start_read_threads: pthread_create failed with status %i "
651             "(%s).",
652             status, STRERROR(status));
653       return;
654     }
655 
656     char name[THREAD_NAME_MAX];
657     ssnprintf(name, sizeof(name), "reader#%" PRIu64,
658               (uint64_t)read_threads_num);
659     set_thread_name(read_threads[read_threads_num], name);
660 
661     read_threads_num++;
662   } /* for (i) */
663 } /* }}} void start_read_threads */
664 
stop_read_threads(void)665 static void stop_read_threads(void) {
666   if (read_threads == NULL)
667     return;
668 
669   INFO("collectd: Stopping %" PRIsz " read threads.", read_threads_num);
670 
671   pthread_mutex_lock(&read_lock);
672   read_loop = 0;
673   DEBUG("plugin: stop_read_threads: Signalling `read_cond'");
674   pthread_cond_broadcast(&read_cond);
675   pthread_mutex_unlock(&read_lock);
676 
677   for (size_t i = 0; i < read_threads_num; i++) {
678     if (pthread_join(read_threads[i], NULL) != 0) {
679       ERROR("plugin: stop_read_threads: pthread_join failed.");
680     }
681     read_threads[i] = (pthread_t)0;
682   }
683   sfree(read_threads);
684   read_threads_num = 0;
685 } /* void stop_read_threads */
686 
/* Frees a heap-allocated value list together with its meta data and its
 * values array. A NULL pointer is accepted and ignored. */
static void plugin_value_list_free(value_list_t *vl) /* {{{ */
{
  if (vl != NULL) {
    meta_data_destroy(vl->meta);
    sfree(vl->values);
    sfree(vl);
  }
} /* }}} void plugin_value_list_free */
696 
697 static value_list_t *
plugin_value_list_clone(value_list_t const * vl_orig)698 plugin_value_list_clone(value_list_t const *vl_orig) /* {{{ */
699 {
700   value_list_t *vl;
701 
702   if (vl_orig == NULL)
703     return NULL;
704 
705   vl = malloc(sizeof(*vl));
706   if (vl == NULL)
707     return NULL;
708   memcpy(vl, vl_orig, sizeof(*vl));
709 
710   if (vl->host[0] == 0)
711     sstrncpy(vl->host, hostname_g, sizeof(vl->host));
712 
713   vl->values = calloc(vl_orig->values_len, sizeof(*vl->values));
714   if (vl->values == NULL) {
715     plugin_value_list_free(vl);
716     return NULL;
717   }
718   memcpy(vl->values, vl_orig->values,
719          vl_orig->values_len * sizeof(*vl->values));
720 
721   vl->meta = meta_data_clone(vl->meta);
722   if ((vl_orig->meta != NULL) && (vl->meta == NULL)) {
723     plugin_value_list_free(vl);
724     return NULL;
725   }
726 
727   if (vl->time == 0)
728     vl->time = cdtime();
729 
730   /* Fill in the interval from the thread context, if it is zero. */
731   if (vl->interval == 0)
732     vl->interval = plugin_get_interval();
733 
734   return vl;
735 } /* }}} value_list_t *plugin_value_list_clone */
736 
/* Appends a deep copy of `vl' to the write queue and wakes one write thread.
 * The plugin context of the caller is stored with the copy. Returns 0 on
 * success and ENOMEM if the queue node or the copy cannot be allocated. */
static int plugin_write_enqueue(value_list_t const *vl) /* {{{ */
{
  write_queue_t *node = malloc(sizeof(*node));
  if (node == NULL)
    return ENOMEM;
  node->next = NULL;

  node->vl = plugin_value_list_clone(vl);
  if (node->vl == NULL) {
    sfree(node);
    return ENOMEM;
  }

  /* Remember the context of the caller (a read plugin); the write plugins
   * need it later, when the value list is actually dispatched. */
  node->ctx = plugin_get_ctx();

  pthread_mutex_lock(&write_lock);

  if (write_queue_tail != NULL) {
    write_queue_tail->next = node;
    write_queue_length += 1;
  } else {
    /* Queue was empty: the new node is head and tail at once. */
    write_queue_head = node;
    write_queue_length = 1;
  }
  write_queue_tail = node;

  pthread_cond_signal(&write_cond);
  pthread_mutex_unlock(&write_lock);

  return 0;
} /* }}} int plugin_write_enqueue */
774 
plugin_write_dequeue(void)775 static value_list_t *plugin_write_dequeue(void) /* {{{ */
776 {
777   write_queue_t *q;
778   value_list_t *vl;
779 
780   pthread_mutex_lock(&write_lock);
781 
782   while (write_loop && (write_queue_head == NULL))
783     pthread_cond_wait(&write_cond, &write_lock);
784 
785   if (write_queue_head == NULL) {
786     pthread_mutex_unlock(&write_lock);
787     return NULL;
788   }
789 
790   q = write_queue_head;
791   write_queue_head = q->next;
792   write_queue_length -= 1;
793   if (write_queue_head == NULL) {
794     write_queue_tail = NULL;
795     assert(0 == write_queue_length);
796   }
797 
798   pthread_mutex_unlock(&write_lock);
799 
800   (void)plugin_set_ctx(q->ctx);
801 
802   vl = q->vl;
803   sfree(q);
804   return vl;
805 } /* }}} value_list_t *plugin_write_dequeue */
806 
plugin_write_thread(void * args)807 static void *plugin_write_thread(void __attribute__((unused)) * args) /* {{{ */
808 {
809   while (write_loop) {
810     value_list_t *vl = plugin_write_dequeue();
811     if (vl == NULL)
812       continue;
813 
814     plugin_dispatch_values_internal(vl);
815 
816     plugin_value_list_free(vl);
817   }
818 
819   pthread_exit(NULL);
820   return (void *)0;
821 } /* }}} void *plugin_write_thread */
822 
start_write_threads(size_t num)823 static void start_write_threads(size_t num) /* {{{ */
824 {
825   if (write_threads != NULL)
826     return;
827 
828   write_threads = calloc(num, sizeof(*write_threads));
829   if (write_threads == NULL) {
830     ERROR("plugin: start_write_threads: calloc failed.");
831     return;
832   }
833 
834   write_threads_num = 0;
835   for (size_t i = 0; i < num; i++) {
836     int status = pthread_create(write_threads + write_threads_num,
837                                 /* attr = */ NULL, plugin_write_thread,
838                                 /* arg = */ NULL);
839     if (status != 0) {
840       ERROR("plugin: start_write_threads: pthread_create failed with status %i "
841             "(%s).",
842             status, STRERROR(status));
843       return;
844     }
845 
846     char name[THREAD_NAME_MAX];
847     ssnprintf(name, sizeof(name), "writer#%" PRIu64,
848               (uint64_t)write_threads_num);
849     set_thread_name(write_threads[write_threads_num], name);
850 
851     write_threads_num++;
852   } /* for (i) */
853 } /* }}} void start_write_threads */
854 
stop_write_threads(void)855 static void stop_write_threads(void) /* {{{ */
856 {
857   write_queue_t *q;
858   size_t i;
859 
860   if (write_threads == NULL)
861     return;
862 
863   INFO("collectd: Stopping %" PRIsz " write threads.", write_threads_num);
864 
865   pthread_mutex_lock(&write_lock);
866   write_loop = false;
867   DEBUG("plugin: stop_write_threads: Signalling `write_cond'");
868   pthread_cond_broadcast(&write_cond);
869   pthread_mutex_unlock(&write_lock);
870 
871   for (i = 0; i < write_threads_num; i++) {
872     if (pthread_join(write_threads[i], NULL) != 0) {
873       ERROR("plugin: stop_write_threads: pthread_join failed.");
874     }
875     write_threads[i] = (pthread_t)0;
876   }
877   sfree(write_threads);
878   write_threads_num = 0;
879 
880   pthread_mutex_lock(&write_lock);
881   i = 0;
882   for (q = write_queue_head; q != NULL;) {
883     write_queue_t *q1 = q;
884     plugin_value_list_free(q->vl);
885     q = q->next;
886     sfree(q1);
887     i++;
888   }
889   write_queue_head = NULL;
890   write_queue_tail = NULL;
891   write_queue_length = 0;
892   pthread_mutex_unlock(&write_lock);
893 
894   if (i > 0) {
895     WARNING("plugin: %" PRIsz " value list%s left after shutting down "
896             "the write threads.",
897             i, (i == 1) ? " was" : "s were");
898   }
899 } /* }}} void stop_write_threads */
900 
901 /*
902  * Public functions
903  */
plugin_set_dir(const char * dir)904 void plugin_set_dir(const char *dir) {
905   sfree(plugindir);
906 
907   if (dir == NULL) {
908     plugindir = NULL;
909     return;
910   }
911 
912   plugindir = strdup(dir);
913   if (plugindir == NULL)
914     ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
915 }
916 
plugin_is_loaded(char const * name)917 bool plugin_is_loaded(char const *name) {
918   if (plugins_loaded == NULL)
919     plugins_loaded =
920         c_avl_create((int (*)(const void *, const void *))strcasecmp);
921   assert(plugins_loaded != NULL);
922 
923   int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
924   return status == 0;
925 }
926 
plugin_mark_loaded(char const * name)927 static int plugin_mark_loaded(char const *name) {
928   char *name_copy;
929   int status;
930 
931   name_copy = strdup(name);
932   if (name_copy == NULL)
933     return ENOMEM;
934 
935   status = c_avl_insert(plugins_loaded,
936                         /* key = */ name_copy, /* value = */ NULL);
937   return status;
938 }
939 
plugin_free_loaded(void)940 static void plugin_free_loaded(void) {
941   void *key;
942   void *value;
943 
944   if (plugins_loaded == NULL)
945     return;
946 
947   while (c_avl_pick(plugins_loaded, &key, &value) == 0) {
948     sfree(key);
949     assert(value == NULL);
950   }
951 
952   c_avl_destroy(plugins_loaded);
953   plugins_loaded = NULL;
954 }
955 
956 #define BUFSIZE 512
957 #ifdef WIN32
958 #define SHLIB_SUFFIX ".dll"
959 #else
960 #define SHLIB_SUFFIX ".so"
961 #endif
plugin_load(char const * plugin_name,bool global)962 int plugin_load(char const *plugin_name, bool global) {
963   DIR *dh;
964   const char *dir;
965   char filename[BUFSIZE] = "";
966   char typename[BUFSIZE];
967   int ret;
968   struct stat statbuf;
969   struct dirent *de;
970   int status;
971 
972   if (plugin_name == NULL)
973     return EINVAL;
974 
975   /* Check if plugin is already loaded and don't do anything in this
976    * case. */
977   if (plugin_is_loaded(plugin_name))
978     return 0;
979 
980   dir = plugin_get_dir();
981   ret = 1;
982 
983   /*
984    * XXX: Magic at work:
985    *
986    * Some of the language bindings, for example the Python and Perl
987    * plugins, need to be able to export symbols to the scripts they run.
988    * For this to happen, the "Globals" flag needs to be set.
989    * Unfortunately, this technical detail is hard to explain to the
990    * average user and she shouldn't have to worry about this, ideally.
991    * So in order to save everyone's sanity use a different default for a
992    * handful of special plugins. --octo
993    */
994   if ((strcasecmp("perl", plugin_name) == 0) ||
995       (strcasecmp("python", plugin_name) == 0))
996     global = true;
997 
998   /* `cpu' should not match `cpufreq'. To solve this we add SHLIB_SUFFIX to the
999    * type when matching the filename */
1000   status = snprintf(typename, sizeof(typename), "%s" SHLIB_SUFFIX, plugin_name);
1001   if ((status < 0) || ((size_t)status >= sizeof(typename))) {
1002     WARNING("plugin_load: Filename too long: \"%s" SHLIB_SUFFIX "\"",
1003             plugin_name);
1004     return -1;
1005   }
1006 
1007   if ((dh = opendir(dir)) == NULL) {
1008     ERROR("plugin_load: opendir (%s) failed: %s", dir, STRERRNO);
1009     return -1;
1010   }
1011 
1012   while ((de = readdir(dh)) != NULL) {
1013     if (strcasecmp(de->d_name, typename))
1014       continue;
1015 
1016     status = snprintf(filename, sizeof(filename), "%s/%s", dir, de->d_name);
1017     if ((status < 0) || ((size_t)status >= sizeof(filename))) {
1018       WARNING("plugin_load: Filename too long: \"%s/%s\"", dir, de->d_name);
1019       continue;
1020     }
1021 
1022     if (lstat(filename, &statbuf) == -1) {
1023       WARNING("plugin_load: stat (\"%s\") failed: %s", filename, STRERRNO);
1024       continue;
1025     } else if (!S_ISREG(statbuf.st_mode)) {
1026       /* don't follow symlinks */
1027       WARNING("plugin_load: %s is not a regular file.", filename);
1028       continue;
1029     }
1030 
1031     status = plugin_load_file(filename, global);
1032     if (status == 0) {
1033       /* success */
1034       plugin_mark_loaded(plugin_name);
1035       ret = 0;
1036       INFO("plugin_load: plugin \"%s\" successfully loaded.", plugin_name);
1037       break;
1038     } else {
1039       ERROR("plugin_load: Load plugin \"%s\" failed with "
1040             "status %i.",
1041             plugin_name, status);
1042     }
1043   }
1044 
1045   closedir(dh);
1046 
1047   if (filename[0] == 0)
1048     ERROR("plugin_load: Could not find plugin \"%s\" in %s", plugin_name, dir);
1049 
1050   return ret;
1051 }
1052 
1053 /*
1054  * The `register_*' functions follow
1055  */
plugin_register_config(const char * name,int (* callback)(const char * key,const char * val),const char ** keys,int keys_num)1056 EXPORT int plugin_register_config(const char *name,
1057                                   int (*callback)(const char *key,
1058                                                   const char *val),
1059                                   const char **keys, int keys_num) {
1060   cf_register(name, callback, keys, keys_num);
1061   return 0;
1062 } /* int plugin_register_config */
1063 
plugin_register_complex_config(const char * type,int (* callback)(oconfig_item_t *))1064 EXPORT int plugin_register_complex_config(const char *type,
1065                                           int (*callback)(oconfig_item_t *)) {
1066   return cf_register_complex(type, callback);
1067 } /* int plugin_register_complex_config */
1068 
plugin_register_init(const char * name,int (* callback)(void))1069 EXPORT int plugin_register_init(const char *name, int (*callback)(void)) {
1070   return create_register_callback(&list_init, name, (void *)callback, NULL);
1071 } /* plugin_register_init */
1072 
plugin_compare_read_func(const void * arg0,const void * arg1)1073 static int plugin_compare_read_func(const void *arg0, const void *arg1) {
1074   const read_func_t *rf0;
1075   const read_func_t *rf1;
1076 
1077   rf0 = arg0;
1078   rf1 = arg1;
1079 
1080   if (rf0->rf_next_read < rf1->rf_next_read)
1081     return -1;
1082   else if (rf0->rf_next_read > rf1->rf_next_read)
1083     return 1;
1084   else
1085     return 0;
1086 } /* int plugin_compare_read_func */
1087 
1088 /* Add a read function to both, the heap and a linked list. The linked list if
1089  * used to look-up read functions, especially for the remove function. The heap
1090  * is used to determine which plugin to read next. */
plugin_insert_read(read_func_t * rf)1091 static int plugin_insert_read(read_func_t *rf) {
1092   int status;
1093   llentry_t *le;
1094 
1095   rf->rf_next_read = cdtime();
1096   rf->rf_effective_interval = rf->rf_interval;
1097 
1098   pthread_mutex_lock(&read_lock);
1099 
1100   if (read_list == NULL) {
1101     read_list = llist_create();
1102     if (read_list == NULL) {
1103       pthread_mutex_unlock(&read_lock);
1104       ERROR("plugin_insert_read: read_list failed.");
1105       return -1;
1106     }
1107   }
1108 
1109   if (read_heap == NULL) {
1110     read_heap = c_heap_create(plugin_compare_read_func);
1111     if (read_heap == NULL) {
1112       pthread_mutex_unlock(&read_lock);
1113       ERROR("plugin_insert_read: c_heap_create failed.");
1114       return -1;
1115     }
1116   }
1117 
1118   le = llist_search(read_list, rf->rf_name);
1119   if (le != NULL) {
1120     pthread_mutex_unlock(&read_lock);
1121     P_WARNING("The read function \"%s\" is already registered. "
1122               "Check for duplicates in your configuration!",
1123               rf->rf_name);
1124     return EINVAL;
1125   }
1126 
1127   le = llentry_create(rf->rf_name, rf);
1128   if (le == NULL) {
1129     pthread_mutex_unlock(&read_lock);
1130     ERROR("plugin_insert_read: llentry_create failed.");
1131     return -1;
1132   }
1133 
1134   status = c_heap_insert(read_heap, rf);
1135   if (status != 0) {
1136     pthread_mutex_unlock(&read_lock);
1137     ERROR("plugin_insert_read: c_heap_insert failed.");
1138     llentry_destroy(le);
1139     return -1;
1140   }
1141 
1142   /* This does not fail. */
1143   llist_append(read_list, le);
1144 
1145   /* Wake up all the read threads. */
1146   pthread_cond_broadcast(&read_cond);
1147   pthread_mutex_unlock(&read_lock);
1148   return 0;
1149 } /* int plugin_insert_read */
1150 
plugin_register_read(const char * name,int (* callback)(void))1151 EXPORT int plugin_register_read(const char *name, int (*callback)(void)) {
1152   read_func_t *rf;
1153   int status;
1154 
1155   rf = calloc(1, sizeof(*rf));
1156   if (rf == NULL) {
1157     ERROR("plugin_register_read: calloc failed.");
1158     return ENOMEM;
1159   }
1160 
1161   rf->rf_callback = (void *)callback;
1162   rf->rf_udata.data = NULL;
1163   rf->rf_udata.free_func = NULL;
1164   rf->rf_ctx = plugin_get_ctx();
1165   rf->rf_group[0] = '\0';
1166   rf->rf_name = strdup(name);
1167   rf->rf_type = RF_SIMPLE;
1168   rf->rf_interval = plugin_get_interval();
1169   rf->rf_ctx.interval = rf->rf_interval;
1170 
1171   status = plugin_insert_read(rf);
1172   if (status != 0) {
1173     sfree(rf->rf_name);
1174     sfree(rf);
1175   }
1176 
1177   return status;
1178 } /* int plugin_register_read */
1179 
plugin_register_complex_read(const char * group,const char * name,plugin_read_cb callback,cdtime_t interval,user_data_t const * user_data)1180 EXPORT int plugin_register_complex_read(const char *group, const char *name,
1181                                         plugin_read_cb callback,
1182                                         cdtime_t interval,
1183                                         user_data_t const *user_data) {
1184   read_func_t *rf;
1185   int status;
1186 
1187   rf = calloc(1, sizeof(*rf));
1188   if (rf == NULL) {
1189     free_userdata(user_data);
1190     ERROR("plugin_register_complex_read: calloc failed.");
1191     return ENOMEM;
1192   }
1193 
1194   rf->rf_callback = (void *)callback;
1195   if (group != NULL)
1196     sstrncpy(rf->rf_group, group, sizeof(rf->rf_group));
1197   else
1198     rf->rf_group[0] = '\0';
1199   rf->rf_name = strdup(name);
1200   rf->rf_type = RF_COMPLEX;
1201   rf->rf_interval = (interval != 0) ? interval : plugin_get_interval();
1202 
1203   /* Set user data */
1204   if (user_data == NULL) {
1205     rf->rf_udata.data = NULL;
1206     rf->rf_udata.free_func = NULL;
1207   } else {
1208     rf->rf_udata = *user_data;
1209   }
1210 
1211   rf->rf_ctx = plugin_get_ctx();
1212   rf->rf_ctx.interval = rf->rf_interval;
1213 
1214   status = plugin_insert_read(rf);
1215   if (status != 0) {
1216     free_userdata(&rf->rf_udata);
1217     sfree(rf->rf_name);
1218     sfree(rf);
1219   }
1220 
1221   return status;
1222 } /* int plugin_register_complex_read */
1223 
plugin_register_write(const char * name,plugin_write_cb callback,user_data_t const * ud)1224 EXPORT int plugin_register_write(const char *name, plugin_write_cb callback,
1225                                  user_data_t const *ud) {
1226   return create_register_callback(&list_write, name, (void *)callback, ud);
1227 } /* int plugin_register_write */
1228 
/* Read callback used for periodic flushing: `ud->data' holds a
 * flush_callback_t whose name and timeout are passed to plugin_flush(). */
static int plugin_flush_timeout_callback(user_data_t *ud) {
  const flush_callback_t *flush_cb = ud->data;

  return plugin_flush(flush_cb->name, flush_cb->timeout,
                      /* identifier = */ NULL);
} /* static int plugin_flush_timeout_callback */
1234 
plugin_flush_timeout_callback_free(void * data)1235 static void plugin_flush_timeout_callback_free(void *data) {
1236   flush_callback_t *cb = data;
1237 
1238   if (cb == NULL)
1239     return;
1240 
1241   sfree(cb->name);
1242   sfree(cb);
1243 } /* static void plugin_flush_callback_free */
1244 
/* Builds the name of the read callback that periodically flushes the plugin
 * `name': the string "flush/" followed by `name'. The result is allocated
 * with malloc() and has to be freed by the caller. Returns NULL if the
 * allocation fails. */
static char *plugin_flush_callback_name(const char *name) {
  static const char prefix[] = "flush/";
  const size_t prefix_len = sizeof(prefix) - 1;
  const size_t name_len = strlen(name);

  char *result = malloc(name_len + prefix_len + 1);
  if (result == NULL) {
    ERROR("plugin_flush_callback_name: malloc failed.");
    return NULL;
  }

  memcpy(result, prefix, prefix_len);
  /* Copies the terminating null byte of `name' as well. */
  memcpy(result + prefix_len, name, name_len + 1);

  return result;
} /* static char *plugin_flush_callback_name */
1265 
plugin_register_flush(const char * name,plugin_flush_cb callback,user_data_t const * ud)1266 EXPORT int plugin_register_flush(const char *name, plugin_flush_cb callback,
1267                                  user_data_t const *ud) {
1268   int status;
1269   plugin_ctx_t ctx = plugin_get_ctx();
1270 
1271   status = create_register_callback(&list_flush, name, (void *)callback, ud);
1272   if (status != 0)
1273     return status;
1274 
1275   if (ctx.flush_interval != 0) {
1276     char *flush_name;
1277     flush_callback_t *cb;
1278 
1279     flush_name = plugin_flush_callback_name(name);
1280     if (flush_name == NULL)
1281       return -1;
1282 
1283     cb = malloc(sizeof(*cb));
1284     if (cb == NULL) {
1285       ERROR("plugin_register_flush: malloc failed.");
1286       sfree(flush_name);
1287       return -1;
1288     }
1289 
1290     cb->name = strdup(name);
1291     if (cb->name == NULL) {
1292       ERROR("plugin_register_flush: strdup failed.");
1293       sfree(cb);
1294       sfree(flush_name);
1295       return -1;
1296     }
1297     cb->timeout = ctx.flush_timeout;
1298 
1299     status = plugin_register_complex_read(
1300         /* group     = */ "flush",
1301         /* name      = */ flush_name,
1302         /* callback  = */ plugin_flush_timeout_callback,
1303         /* interval  = */ ctx.flush_interval,
1304         /* user data = */
1305         &(user_data_t){
1306             .data = cb,
1307             .free_func = plugin_flush_timeout_callback_free,
1308         });
1309 
1310     sfree(flush_name);
1311     return status;
1312   }
1313 
1314   return 0;
1315 } /* int plugin_register_flush */
1316 
plugin_register_missing(const char * name,plugin_missing_cb callback,user_data_t const * ud)1317 EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
1318                                    user_data_t const *ud) {
1319   return create_register_callback(&list_missing, name, (void *)callback, ud);
1320 } /* int plugin_register_missing */
1321 
plugin_register_cache_event(const char * name,plugin_cache_event_cb callback,user_data_t const * ud)1322 EXPORT int plugin_register_cache_event(const char *name,
1323                                        plugin_cache_event_cb callback,
1324                                        user_data_t const *ud) {
1325 
1326   if (name == NULL || callback == NULL)
1327     return EINVAL;
1328 
1329   char *name_copy = strdup(name);
1330   if (name_copy == NULL) {
1331     P_ERROR("plugin_register_cache_event: strdup failed.");
1332     free_userdata(ud);
1333     return ENOMEM;
1334   }
1335 
1336   if (list_cache_event_num >= 32) {
1337     P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
1338             "to be registered.");
1339     free_userdata(ud);
1340     return ENOMEM;
1341   }
1342 
1343   for (size_t i = 0; i < list_cache_event_num; i++) {
1344     cache_event_func_t *cef = &list_cache_event[i];
1345     if (!cef->callback)
1346       continue;
1347 
1348     if (strcmp(name, cef->name) == 0) {
1349       P_ERROR("plugin_register_cache_event: a callback named `%s' already "
1350               "registered!",
1351               name);
1352       free_userdata(ud);
1353       return -1;
1354     }
1355   }
1356 
1357   user_data_t user_data;
1358   if (ud == NULL) {
1359     user_data = (user_data_t){
1360         .data = NULL,
1361         .free_func = NULL,
1362     };
1363   } else {
1364     user_data = *ud;
1365   }
1366 
1367   list_cache_event[list_cache_event_num] =
1368       (cache_event_func_t){.callback = callback,
1369                            .name = name_copy,
1370                            .user_data = user_data,
1371                            .plugin_ctx = plugin_get_ctx()};
1372   list_cache_event_num++;
1373 
1374   return 0;
1375 } /* int plugin_register_cache_event */
1376 
plugin_register_shutdown(const char * name,int (* callback)(void))1377 EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
1378   return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
1379 } /* int plugin_register_shutdown */
1380 
plugin_free_data_sets(void)1381 static void plugin_free_data_sets(void) {
1382   void *key;
1383   void *value;
1384 
1385   if (data_sets == NULL)
1386     return;
1387 
1388   while (c_avl_pick(data_sets, &key, &value) == 0) {
1389     data_set_t *ds = value;
1390     /* key is a pointer to ds->type */
1391 
1392     sfree(ds->ds);
1393     sfree(ds);
1394   }
1395 
1396   c_avl_destroy(data_sets);
1397   data_sets = NULL;
1398 } /* void plugin_free_data_sets */
1399 
plugin_register_data_set(const data_set_t * ds)1400 EXPORT int plugin_register_data_set(const data_set_t *ds) {
1401   data_set_t *ds_copy;
1402 
1403   if ((data_sets != NULL) && (c_avl_get(data_sets, ds->type, NULL) == 0)) {
1404     NOTICE("Replacing DS `%s' with another version.", ds->type);
1405     plugin_unregister_data_set(ds->type);
1406   } else if (data_sets == NULL) {
1407     data_sets = c_avl_create((int (*)(const void *, const void *))strcmp);
1408     if (data_sets == NULL)
1409       return -1;
1410   }
1411 
1412   ds_copy = malloc(sizeof(*ds_copy));
1413   if (ds_copy == NULL)
1414     return -1;
1415   memcpy(ds_copy, ds, sizeof(data_set_t));
1416 
1417   ds_copy->ds = malloc(sizeof(*ds_copy->ds) * ds->ds_num);
1418   if (ds_copy->ds == NULL) {
1419     sfree(ds_copy);
1420     return -1;
1421   }
1422 
1423   for (size_t i = 0; i < ds->ds_num; i++)
1424     memcpy(ds_copy->ds + i, ds->ds + i, sizeof(data_source_t));
1425 
1426   return c_avl_insert(data_sets, (void *)ds_copy->type, (void *)ds_copy);
1427 } /* int plugin_register_data_set */
1428 
plugin_register_log(const char * name,plugin_log_cb callback,user_data_t const * ud)1429 EXPORT int plugin_register_log(const char *name, plugin_log_cb callback,
1430                                user_data_t const *ud) {
1431   return create_register_callback(&list_log, name, (void *)callback, ud);
1432 } /* int plugin_register_log */
1433 
plugin_register_notification(const char * name,plugin_notification_cb callback,user_data_t const * ud)1434 EXPORT int plugin_register_notification(const char *name,
1435                                         plugin_notification_cb callback,
1436                                         user_data_t const *ud) {
1437   return create_register_callback(&list_notification, name, (void *)callback,
1438                                   ud);
1439 } /* int plugin_register_log */
1440 
plugin_unregister_config(const char * name)1441 EXPORT int plugin_unregister_config(const char *name) {
1442   cf_unregister(name);
1443   return 0;
1444 } /* int plugin_unregister_config */
1445 
plugin_unregister_complex_config(const char * name)1446 EXPORT int plugin_unregister_complex_config(const char *name) {
1447   cf_unregister_complex(name);
1448   return 0;
1449 } /* int plugin_unregister_complex_config */
1450 
plugin_unregister_init(const char * name)1451 EXPORT int plugin_unregister_init(const char *name) {
1452   return plugin_unregister(list_init, name);
1453 }
1454 
plugin_unregister_read(const char * name)1455 EXPORT int plugin_unregister_read(const char *name) /* {{{ */
1456 {
1457   llentry_t *le;
1458   read_func_t *rf;
1459 
1460   if (name == NULL)
1461     return -ENOENT;
1462 
1463   pthread_mutex_lock(&read_lock);
1464 
1465   if (read_list == NULL) {
1466     pthread_mutex_unlock(&read_lock);
1467     return -ENOENT;
1468   }
1469 
1470   le = llist_search(read_list, name);
1471   if (le == NULL) {
1472     pthread_mutex_unlock(&read_lock);
1473     WARNING("plugin_unregister_read: No such read function: %s", name);
1474     return -ENOENT;
1475   }
1476 
1477   llist_remove(read_list, le);
1478 
1479   rf = le->value;
1480   assert(rf != NULL);
1481   rf->rf_type = RF_REMOVE;
1482 
1483   pthread_mutex_unlock(&read_lock);
1484 
1485   llentry_destroy(le);
1486 
1487   DEBUG("plugin_unregister_read: Marked `%s' for removal.", name);
1488 
1489   return 0;
1490 } /* }}} int plugin_unregister_read */
1491 
plugin_log_available_writers(void)1492 EXPORT void plugin_log_available_writers(void) {
1493   log_list_callbacks(&list_write, "Available write targets:");
1494 }
1495 
/* Search callback for llist_search_custom(): compares the group of the read
 * function stored in `e' with the group name passed in `ud'. Returns the
 * result of strcmp(), i.e. 0 if the groups are equal. */
static int compare_read_func_group(llentry_t *e, void *ud) /* {{{ */
{
  const read_func_t *rf = e->value;
  const char *wanted_group = ud;

  return strcmp(rf->rf_group, wanted_group);
} /* }}} int compare_read_func_group */
1503 
plugin_unregister_read_group(const char * group)1504 EXPORT int plugin_unregister_read_group(const char *group) /* {{{ */
1505 {
1506   llentry_t *le;
1507   read_func_t *rf;
1508 
1509   int found = 0;
1510 
1511   if (group == NULL)
1512     return -ENOENT;
1513 
1514   pthread_mutex_lock(&read_lock);
1515 
1516   if (read_list == NULL) {
1517     pthread_mutex_unlock(&read_lock);
1518     return -ENOENT;
1519   }
1520 
1521   while (42) {
1522     le = llist_search_custom(read_list, compare_read_func_group, (void *)group);
1523 
1524     if (le == NULL)
1525       break;
1526 
1527     ++found;
1528 
1529     llist_remove(read_list, le);
1530 
1531     rf = le->value;
1532     assert(rf != NULL);
1533     rf->rf_type = RF_REMOVE;
1534 
1535     llentry_destroy(le);
1536 
1537     DEBUG("plugin_unregister_read_group: "
1538           "Marked `%s' (group `%s') for removal.",
1539           rf->rf_name, group);
1540   }
1541 
1542   pthread_mutex_unlock(&read_lock);
1543 
1544   if (found == 0) {
1545     WARNING("plugin_unregister_read_group: No such "
1546             "group of read function: %s",
1547             group);
1548     return -ENOENT;
1549   }
1550 
1551   return 0;
1552 } /* }}} int plugin_unregister_read_group */
1553 
plugin_unregister_write(const char * name)1554 EXPORT int plugin_unregister_write(const char *name) {
1555   return plugin_unregister(list_write, name);
1556 }
1557 
plugin_unregister_flush(const char * name)1558 EXPORT int plugin_unregister_flush(const char *name) {
1559   plugin_ctx_t ctx = plugin_get_ctx();
1560 
1561   if (ctx.flush_interval != 0) {
1562     char *flush_name;
1563 
1564     flush_name = plugin_flush_callback_name(name);
1565     if (flush_name != NULL) {
1566       plugin_unregister_read(flush_name);
1567       sfree(flush_name);
1568     }
1569   }
1570 
1571   return plugin_unregister(list_flush, name);
1572 }
1573 
plugin_unregister_missing(const char * name)1574 EXPORT int plugin_unregister_missing(const char *name) {
1575   return plugin_unregister(list_missing, name);
1576 }
1577 
plugin_unregister_cache_event(const char * name)1578 EXPORT int plugin_unregister_cache_event(const char *name) {
1579   for (size_t i = 0; i < list_cache_event_num; i++) {
1580     cache_event_func_t *cef = &list_cache_event[i];
1581     if (!cef->callback)
1582       continue;
1583     if (strcmp(name, cef->name) == 0) {
1584       /* Mark callback as inactive, so mask in cache entries remains actual */
1585       cef->callback = NULL;
1586       sfree(cef->name);
1587       free_userdata(&cef->user_data);
1588     }
1589   }
1590   return 0;
1591 }
1592 
destroy_cache_event_callbacks()1593 static void destroy_cache_event_callbacks() {
1594   for (size_t i = 0; i < list_cache_event_num; i++) {
1595     cache_event_func_t *cef = &list_cache_event[i];
1596     if (!cef->callback)
1597       continue;
1598     cef->callback = NULL;
1599     sfree(cef->name);
1600     free_userdata(&cef->user_data);
1601   }
1602 }
1603 
plugin_unregister_shutdown(const char * name)1604 EXPORT int plugin_unregister_shutdown(const char *name) {
1605   return plugin_unregister(list_shutdown, name);
1606 }
1607 
plugin_unregister_data_set(const char * name)1608 EXPORT int plugin_unregister_data_set(const char *name) {
1609   data_set_t *ds;
1610 
1611   if (data_sets == NULL)
1612     return -1;
1613 
1614   if (c_avl_remove(data_sets, name, NULL, (void *)&ds) != 0)
1615     return -1;
1616 
1617   sfree(ds->ds);
1618   sfree(ds);
1619 
1620   return 0;
1621 } /* int plugin_unregister_data_set */
1622 
plugin_unregister_log(const char * name)1623 EXPORT int plugin_unregister_log(const char *name) {
1624   return plugin_unregister(list_log, name);
1625 }
1626 
plugin_unregister_notification(const char * name)1627 EXPORT int plugin_unregister_notification(const char *name) {
1628   return plugin_unregister(list_notification, name);
1629 }
1630 
plugin_init_all(void)1631 EXPORT int plugin_init_all(void) {
1632   char const *chain_name;
1633   llentry_t *le;
1634   int status;
1635   int ret = 0;
1636 
1637   /* Init the value cache */
1638   uc_init();
1639 
1640   if (IS_TRUE(global_option_get("CollectInternalStats"))) {
1641     record_statistics = true;
1642     plugin_register_read("collectd", plugin_update_internal_statistics);
1643   }
1644 
1645   chain_name = global_option_get("PreCacheChain");
1646   pre_cache_chain = fc_chain_get_by_name(chain_name);
1647 
1648   chain_name = global_option_get("PostCacheChain");
1649   post_cache_chain = fc_chain_get_by_name(chain_name);
1650 
1651   write_limit_high = global_option_get_long("WriteQueueLimitHigh",
1652                                             /* default = */ 0);
1653   if (write_limit_high < 0) {
1654     ERROR("WriteQueueLimitHigh must be positive or zero.");
1655     write_limit_high = 0;
1656   }
1657 
1658   write_limit_low =
1659       global_option_get_long("WriteQueueLimitLow",
1660                              /* default = */ write_limit_high / 2);
1661   if (write_limit_low < 0) {
1662     ERROR("WriteQueueLimitLow must be positive or zero.");
1663     write_limit_low = write_limit_high / 2;
1664   } else if (write_limit_low > write_limit_high) {
1665     ERROR("WriteQueueLimitLow must not be larger than "
1666           "WriteQueueLimitHigh.");
1667     write_limit_low = write_limit_high;
1668   }
1669 
1670   write_threads_num = global_option_get_long("WriteThreads",
1671                                              /* default = */ 5);
1672   if (write_threads_num < 1) {
1673     ERROR("WriteThreads must be positive.");
1674     write_threads_num = 5;
1675   }
1676 
1677   if ((list_init == NULL) && (read_heap == NULL))
1678     return ret;
1679 
1680   /* Calling all init callbacks before checking if read callbacks
1681    * are available allows the init callbacks to register the read
1682    * callback. */
1683   le = llist_head(list_init);
1684   while (le != NULL) {
1685     callback_func_t *cf;
1686     plugin_init_cb callback;
1687     plugin_ctx_t old_ctx;
1688 
1689     cf = le->value;
1690     old_ctx = plugin_set_ctx(cf->cf_ctx);
1691     callback = cf->cf_callback;
1692     status = (*callback)();
1693     plugin_set_ctx(old_ctx);
1694 
1695     if (status != 0) {
1696       ERROR("Initialization of plugin `%s' "
1697             "failed with status %i. "
1698             "Plugin will be unloaded.",
1699             le->key, status);
1700       /* Plugins that register read callbacks from the init
1701        * callback should take care of appropriate error
1702        * handling themselves. */
1703       /* FIXME: Unload _all_ functions */
1704       plugin_unregister_read(le->key);
1705       ret = -1;
1706     }
1707 
1708     le = le->next;
1709   }
1710 
1711   start_write_threads((size_t)write_threads_num);
1712 
1713   max_read_interval =
1714       global_option_get_time("MaxReadInterval", DEFAULT_MAX_READ_INTERVAL);
1715 
1716   /* Start read-threads */
1717   if (read_heap != NULL) {
1718     const char *rt;
1719     int num;
1720 
1721     rt = global_option_get("ReadThreads");
1722     num = atoi(rt);
1723     if (num != -1)
1724       start_read_threads((num > 0) ? ((size_t)num) : 5);
1725   }
1726   return ret;
1727 } /* void plugin_init_all */
1728 
1729 /* TODO: Rename this function. */
plugin_read_all(void)1730 EXPORT void plugin_read_all(void) {
1731   uc_check_timeout();
1732 
1733   return;
1734 } /* void plugin_read_all */
1735 
1736 /* Read function called when the `-T' command line argument is given. */
plugin_read_all_once(void)1737 EXPORT int plugin_read_all_once(void) {
1738   int status;
1739   int return_status = 0;
1740 
1741   if (read_heap == NULL) {
1742     NOTICE("No read-functions are registered.");
1743     return 0;
1744   }
1745 
1746   while (42) {
1747     read_func_t *rf;
1748     plugin_ctx_t old_ctx;
1749 
1750     rf = c_heap_get_root(read_heap);
1751     if (rf == NULL)
1752       break;
1753 
1754     old_ctx = plugin_set_ctx(rf->rf_ctx);
1755 
1756     if (rf->rf_type == RF_SIMPLE) {
1757       int (*callback)(void);
1758 
1759       callback = rf->rf_callback;
1760       status = (*callback)();
1761     } else {
1762       plugin_read_cb callback;
1763 
1764       callback = rf->rf_callback;
1765       status = (*callback)(&rf->rf_udata);
1766     }
1767 
1768     plugin_set_ctx(old_ctx);
1769 
1770     if (status != 0) {
1771       NOTICE("read-function of plugin `%s' failed.", rf->rf_name);
1772       return_status = -1;
1773     }
1774 
1775     sfree(rf->rf_name);
1776     destroy_callback((void *)rf);
1777   }
1778 
1779   return return_status;
1780 } /* int plugin_read_all_once */
1781 
plugin_write(const char * plugin,const data_set_t * ds,const value_list_t * vl)1782 EXPORT int plugin_write(const char *plugin, /* {{{ */
1783                         const data_set_t *ds, const value_list_t *vl) {
1784   llentry_t *le;
1785   int status;
1786 
1787   if (vl == NULL)
1788     return EINVAL;
1789 
1790   if (list_write == NULL)
1791     return ENOENT;
1792 
1793   if (ds == NULL) {
1794     ds = plugin_get_ds(vl->type);
1795     if (ds == NULL) {
1796       ERROR("plugin_write: Unable to lookup type `%s'.", vl->type);
1797       return ENOENT;
1798     }
1799   }
1800 
1801   if (plugin == NULL) {
1802     int success = 0;
1803     int failure = 0;
1804 
1805     le = llist_head(list_write);
1806     while (le != NULL) {
1807       callback_func_t *cf = le->value;
1808       plugin_write_cb callback;
1809 
1810       /* Keep the read plugin's interval and flush information but update the
1811        * plugin name. */
1812       plugin_ctx_t old_ctx = plugin_get_ctx();
1813       plugin_ctx_t ctx = old_ctx;
1814       ctx.name = cf->cf_ctx.name;
1815       plugin_set_ctx(ctx);
1816 
1817       DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1818       callback = cf->cf_callback;
1819       status = (*callback)(ds, vl, &cf->cf_udata);
1820       if (status != 0)
1821         failure++;
1822       else
1823         success++;
1824 
1825       plugin_set_ctx(old_ctx);
1826       le = le->next;
1827     }
1828 
1829     if ((success == 0) && (failure != 0))
1830       status = -1;
1831     else
1832       status = 0;
1833   } else /* plugin != NULL */
1834   {
1835     callback_func_t *cf;
1836     plugin_write_cb callback;
1837 
1838     le = llist_head(list_write);
1839     while (le != NULL) {
1840       if (strcasecmp(plugin, le->key) == 0)
1841         break;
1842 
1843       le = le->next;
1844     }
1845 
1846     if (le == NULL)
1847       return ENOENT;
1848 
1849     cf = le->value;
1850 
1851     /* do not switch plugin context; rather keep the context (interval)
1852      * information of the calling read plugin */
1853 
1854     DEBUG("plugin: plugin_write: Writing values via %s.", le->key);
1855     callback = cf->cf_callback;
1856     status = (*callback)(ds, vl, &cf->cf_udata);
1857   }
1858 
1859   return status;
1860 } /* }}} int plugin_write */
1861 
plugin_flush(const char * plugin,cdtime_t timeout,const char * identifier)1862 EXPORT int plugin_flush(const char *plugin, cdtime_t timeout,
1863                         const char *identifier) {
1864   llentry_t *le;
1865 
1866   if (list_flush == NULL)
1867     return 0;
1868 
1869   le = llist_head(list_flush);
1870   while (le != NULL) {
1871     callback_func_t *cf;
1872     plugin_flush_cb callback;
1873     plugin_ctx_t old_ctx;
1874 
1875     if ((plugin != NULL) && (strcmp(plugin, le->key) != 0)) {
1876       le = le->next;
1877       continue;
1878     }
1879 
1880     cf = le->value;
1881     old_ctx = plugin_set_ctx(cf->cf_ctx);
1882     callback = cf->cf_callback;
1883 
1884     (*callback)(timeout, identifier, &cf->cf_udata);
1885 
1886     plugin_set_ctx(old_ctx);
1887 
1888     le = le->next;
1889   }
1890   return 0;
1891 } /* int plugin_flush */
1892 
plugin_shutdown_all(void)1893 EXPORT int plugin_shutdown_all(void) {
1894   llentry_t *le;
1895   int ret = 0; // Assume success.
1896 
1897   destroy_all_callbacks(&list_init);
1898 
1899   stop_read_threads();
1900 
1901   pthread_mutex_lock(&read_lock);
1902   llist_destroy(read_list);
1903   read_list = NULL;
1904   pthread_mutex_unlock(&read_lock);
1905 
1906   destroy_read_heap();
1907 
1908   /* blocks until all write threads have shut down. */
1909   stop_write_threads();
1910 
1911   /* ask all plugins to write out the state they kept. */
1912   plugin_flush(/* plugin = */ NULL,
1913                /* timeout = */ 0,
1914                /* identifier = */ NULL);
1915 
1916   le = NULL;
1917   if (list_shutdown != NULL)
1918     le = llist_head(list_shutdown);
1919 
1920   while (le != NULL) {
1921     callback_func_t *cf;
1922     plugin_shutdown_cb callback;
1923     plugin_ctx_t old_ctx;
1924 
1925     cf = le->value;
1926     old_ctx = plugin_set_ctx(cf->cf_ctx);
1927     callback = cf->cf_callback;
1928 
1929     /* Advance the pointer before calling the callback allows
1930      * shutdown functions to unregister themselves. If done the
1931      * other way around the memory `le' points to will be freed
1932      * after callback returns. */
1933     le = le->next;
1934 
1935     if ((*callback)() != 0)
1936       ret = -1;
1937 
1938     plugin_set_ctx(old_ctx);
1939   }
1940 
1941   /* Write plugins which use the `user_data' pointer usually need the
1942    * same data available to the flush callback. If this is the case, set
1943    * the free_function to NULL when registering the flush callback and to
1944    * the real free function when registering the write callback. This way
1945    * the data isn't freed twice. */
1946   destroy_all_callbacks(&list_flush);
1947   destroy_all_callbacks(&list_missing);
1948   destroy_cache_event_callbacks();
1949   destroy_all_callbacks(&list_write);
1950 
1951   destroy_all_callbacks(&list_notification);
1952   destroy_all_callbacks(&list_shutdown);
1953   destroy_all_callbacks(&list_log);
1954 
1955   plugin_free_loaded();
1956   plugin_free_data_sets();
1957   return ret;
1958 } /* void plugin_shutdown_all */
1959 
plugin_dispatch_missing(const value_list_t * vl)1960 EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
1961 {
1962   if (list_missing == NULL)
1963     return 0;
1964 
1965   llentry_t *le = llist_head(list_missing);
1966   while (le != NULL) {
1967     callback_func_t *cf = le->value;
1968     plugin_ctx_t old_ctx = plugin_set_ctx(cf->cf_ctx);
1969     plugin_missing_cb callback = cf->cf_callback;
1970 
1971     int status = (*callback)(vl, &cf->cf_udata);
1972     plugin_set_ctx(old_ctx);
1973     if (status != 0) {
1974       if (status < 0) {
1975         ERROR("plugin_dispatch_missing: Callback function \"%s\" "
1976               "failed with status %i.",
1977               le->key, status);
1978         return status;
1979       } else {
1980         return 0;
1981       }
1982     }
1983 
1984     le = le->next;
1985   }
1986   return 0;
1987 } /* int }}} plugin_dispatch_missing */
1988 
plugin_dispatch_cache_event(enum cache_event_type_e event_type,unsigned long callbacks_mask,const char * name,const value_list_t * vl)1989 void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
1990                                  unsigned long callbacks_mask, const char *name,
1991                                  const value_list_t *vl) {
1992   switch (event_type) {
1993   case CE_VALUE_NEW:
1994     callbacks_mask = 0;
1995     for (size_t i = 0; i < list_cache_event_num; i++) {
1996       cache_event_func_t *cef = &list_cache_event[i];
1997       plugin_cache_event_cb callback = cef->callback;
1998 
1999       if (!callback)
2000         continue;
2001 
2002       cache_event_t event = (cache_event_t){.type = event_type,
2003                                             .value_list = vl,
2004                                             .value_list_name = name,
2005                                             .ret = 0};
2006 
2007       plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2008       int status = (*callback)(&event, &cef->user_data);
2009       plugin_set_ctx(old_ctx);
2010 
2011       if (status != 0) {
2012         ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2013               "%i for event NEW.",
2014               cef->name, status);
2015       } else {
2016         if (event.ret) {
2017           DEBUG(
2018               "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
2019               cef->name, name);
2020           callbacks_mask |= (1 << (i));
2021         } else {
2022           DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
2023                 cef->name, name);
2024         }
2025       }
2026     }
2027 
2028     if (callbacks_mask)
2029       uc_set_callbacks_mask(name, callbacks_mask);
2030 
2031     break;
2032   case CE_VALUE_UPDATE:
2033   case CE_VALUE_EXPIRED:
2034     for (size_t i = 0; i < list_cache_event_num; i++) {
2035       cache_event_func_t *cef = &list_cache_event[i];
2036       plugin_cache_event_cb callback = cef->callback;
2037 
2038       if (!callback)
2039         continue;
2040 
2041       if (callbacks_mask && (1 << (i)) == 0)
2042         continue;
2043 
2044       cache_event_t event = (cache_event_t){.type = event_type,
2045                                             .value_list = vl,
2046                                             .value_list_name = name,
2047                                             .ret = 0};
2048 
2049       plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
2050       int status = (*callback)(&event, &cef->user_data);
2051       plugin_set_ctx(old_ctx);
2052 
2053       if (status != 0) {
2054         ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
2055               "%i for event %s.",
2056               cef->name, status,
2057               ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
2058       }
2059     }
2060     break;
2061   }
2062   return;
2063 }
2064 
/*
 * Validates a value list, runs it through the pre-cache chain, updates the
 * value cache and finally runs the post-cache chain (or the default action
 * when no post-cache chain is configured).
 *
 * Returns -1 when the value list is invalid, no data sets are registered, the
 * type is unknown, or (in non-debug builds) the number of values does not
 * match the data set. Returns 0 otherwise, including when the pre-cache chain
 * stops processing with FC_TARGET_STOP.
 */
static int plugin_dispatch_values_internal(value_list_t *vl) {
  static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;

  assert(vl != NULL);

  /* These fields are initialized by plugin_value_list_clone() if needed: */
  assert(vl->host[0] != 0);
  assert(vl->time != 0); /* The time is determined at _enqueue_ time. */
  assert(vl->interval != 0);

  if ((vl->type[0] == 0) || (vl->values == NULL) || (vl->values_len < 1)) {
    ERROR("plugin_dispatch_values: Invalid value list "
          "from plugin %s.",
          vl->plugin);
    return -1;
  }

  /* Meta data is released at the end only if the caller supplied none:
   * matches and targets may attach some, and a caller that passed no meta
   * data may not expect (and therefore not free) it. */
  const bool release_meta = (vl->meta == NULL);

  if (list_write == NULL)
    c_complain_once(LOG_WARNING, &no_write_complaint,
                    "plugin_dispatch_values: No write callback has been "
                    "registered. Please load at least one output plugin, "
                    "if you want the collected data to be stored.");

  if (data_sets == NULL) {
    ERROR("plugin_dispatch_values: No data sets registered. "
          "Could the types database be read? Check "
          "your `TypesDB' setting!");
    return -1;
  }

  data_set_t *ds = NULL;
  if (c_avl_get(data_sets, vl->type, (void *)&ds) != 0) {
    char ident[6 * DATA_MAX_NAME_LEN];

    FORMAT_VL(ident, sizeof(ident), vl);
    INFO("plugin_dispatch_values: Dataset not found: %s "
         "(from \"%s\"), check your types.db!",
         vl->type, ident);
    return -1;
  }

  DEBUG("plugin_dispatch_values: time = %.3f; interval = %.3f; "
        "host = %s; "
        "plugin = %s; plugin_instance = %s; "
        "type = %s; type_instance = %s;",
        CDTIME_T_TO_DOUBLE(vl->time), CDTIME_T_TO_DOUBLE(vl->interval),
        vl->host, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);

  /* Sanity checks: hard assertions in debug builds, log messages otherwise. */
#if COLLECT_DEBUG
  assert(0 == strcmp(ds->type, vl->type));
#else
  if (0 != strcmp(ds->type, vl->type))
    WARNING(
        "plugin_dispatch_values: <%s/%s-%s> (ds->type = %s) != (vl->type = %s)",
        vl->host, vl->plugin, vl->plugin_instance, ds->type, vl->type);
#endif

#if COLLECT_DEBUG
  assert(ds->ds_num == vl->values_len);
#else
  if (ds->ds_num != vl->values_len) {
    ERROR("plugin_dispatch_values: <%s/%s-%s/%s-%s> ds->type = %s: "
          "(ds->ds_num = %" PRIsz ") != "
          "(vl->values_len = %" PRIsz ")",
          vl->host, vl->plugin, vl->plugin_instance, vl->type,
          vl->type_instance, ds->type, ds->ds_num, vl->values_len);
    return -1;
  }
#endif

  escape_slashes(vl->host, sizeof(vl->host));
  escape_slashes(vl->plugin, sizeof(vl->plugin));
  escape_slashes(vl->plugin_instance, sizeof(vl->plugin_instance));
  escape_slashes(vl->type, sizeof(vl->type));
  escape_slashes(vl->type_instance, sizeof(vl->type_instance));

  if (pre_cache_chain != NULL) {
    int chain_status = fc_process_chain(ds, vl, pre_cache_chain);
    if (chain_status < 0) {
      WARNING("plugin_dispatch_values: Running the "
              "pre-cache chain failed with "
              "status %i (%#x).",
              chain_status, chain_status);
    } else if (chain_status == FC_TARGET_STOP) {
      return 0;
    }
  }

  /* Update the value cache */
  uc_update(ds, vl);

  if (post_cache_chain == NULL) {
    fc_default_action(ds, vl);
  } else {
    int chain_status = fc_process_chain(ds, vl, post_cache_chain);
    if (chain_status < 0) {
      WARNING("plugin_dispatch_values: Running the "
              "post-cache chain failed with "
              "status %i (%#x).",
              chain_status, chain_status);
    }
  }

  if (release_meta && (vl->meta != NULL)) {
    meta_data_destroy(vl->meta);
    vl->meta = NULL;
  }

  return 0;
} /* int plugin_dispatch_values_internal */
2182 
get_drop_probability(void)2183 static double get_drop_probability(void) /* {{{ */
2184 {
2185   long pos;
2186   long size;
2187   long wql;
2188 
2189   pthread_mutex_lock(&write_lock);
2190   wql = write_queue_length;
2191   pthread_mutex_unlock(&write_lock);
2192 
2193   if (wql < write_limit_low)
2194     return 0.0;
2195   if (wql >= write_limit_high)
2196     return 1.0;
2197 
2198   pos = 1 + wql - write_limit_low;
2199   size = 1 + write_limit_high - write_limit_low;
2200 
2201   return (double)pos / (double)size;
2202 } /* }}} double get_drop_probability */
2203 
check_drop_value(void)2204 static bool check_drop_value(void) /* {{{ */
2205 {
2206   static cdtime_t last_message_time;
2207   static pthread_mutex_t last_message_lock = PTHREAD_MUTEX_INITIALIZER;
2208 
2209   double p;
2210   double q;
2211   int status;
2212 
2213   if (write_limit_high == 0)
2214     return false;
2215 
2216   p = get_drop_probability();
2217   if (p == 0.0)
2218     return false;
2219 
2220   status = pthread_mutex_trylock(&last_message_lock);
2221   if (status == 0) {
2222     cdtime_t now;
2223 
2224     now = cdtime();
2225     if ((now - last_message_time) > TIME_T_TO_CDTIME_T(1)) {
2226       last_message_time = now;
2227       ERROR("plugin_dispatch_values: Low water mark "
2228             "reached. Dropping %.0f%% of metrics.",
2229             100.0 * p);
2230     }
2231     pthread_mutex_unlock(&last_message_lock);
2232   }
2233 
2234   if (p == 1.0)
2235     return true;
2236 
2237   q = cdrand_d();
2238   if (q > p)
2239     return true;
2240   else
2241     return false;
2242 } /* }}} bool check_drop_value */
2243 
plugin_dispatch_values(value_list_t const * vl)2244 EXPORT int plugin_dispatch_values(value_list_t const *vl) {
2245   int status;
2246 
2247   if (check_drop_value()) {
2248     if (record_statistics) {
2249       pthread_mutex_lock(&statistics_lock);
2250       stats_values_dropped++;
2251       pthread_mutex_unlock(&statistics_lock);
2252     }
2253     return 0;
2254   }
2255 
2256   status = plugin_write_enqueue(vl);
2257   if (status != 0) {
2258     ERROR("plugin_dispatch_values: plugin_write_enqueue failed with status %i "
2259           "(%s).",
2260           status, STRERROR(status));
2261     return status;
2262   }
2263 
2264   return 0;
2265 }
2266 
2267 __attribute__((sentinel)) int
plugin_dispatch_multivalue(value_list_t const * template,bool store_percentage,int store_type,...)2268 plugin_dispatch_multivalue(value_list_t const *template, /* {{{ */
2269                            bool store_percentage, int store_type, ...) {
2270   value_list_t *vl;
2271   int failed = 0;
2272   gauge_t sum = 0.0;
2273   va_list ap;
2274 
2275   if (check_drop_value()) {
2276     if (record_statistics) {
2277       pthread_mutex_lock(&statistics_lock);
2278       stats_values_dropped++;
2279       pthread_mutex_unlock(&statistics_lock);
2280     }
2281     return 0;
2282   }
2283 
2284   assert(template->values_len == 1);
2285 
2286   /* Calculate sum for Gauge to calculate percent if needed */
2287   if (DS_TYPE_GAUGE == store_type) {
2288     va_start(ap, store_type);
2289     while (42) {
2290       char const *name;
2291       gauge_t value;
2292 
2293       name = va_arg(ap, char const *);
2294       if (name == NULL)
2295         break;
2296 
2297       value = va_arg(ap, gauge_t);
2298       if (!isnan(value))
2299         sum += value;
2300     }
2301     va_end(ap);
2302   }
2303 
2304   vl = plugin_value_list_clone(template);
2305   /* plugin_value_list_clone makes sure vl->time is set to non-zero. */
2306   if (store_percentage)
2307     sstrncpy(vl->type, "percent", sizeof(vl->type));
2308 
2309   va_start(ap, store_type);
2310   while (42) {
2311     char const *name;
2312     int status;
2313 
2314     /* Set the type instance. */
2315     name = va_arg(ap, char const *);
2316     if (name == NULL)
2317       break;
2318     sstrncpy(vl->type_instance, name, sizeof(vl->type_instance));
2319 
2320     /* Set the value. */
2321     switch (store_type) {
2322     case DS_TYPE_GAUGE:
2323       vl->values[0].gauge = va_arg(ap, gauge_t);
2324       if (store_percentage)
2325         vl->values[0].gauge *= sum ? (100.0 / sum) : NAN;
2326       break;
2327     case DS_TYPE_ABSOLUTE:
2328       vl->values[0].absolute = va_arg(ap, absolute_t);
2329       break;
2330     case DS_TYPE_COUNTER:
2331       vl->values[0].counter = va_arg(ap, counter_t);
2332       break;
2333     case DS_TYPE_DERIVE:
2334       vl->values[0].derive = va_arg(ap, derive_t);
2335       break;
2336     default:
2337       ERROR("plugin_dispatch_multivalue: given store_type is incorrect.");
2338       failed++;
2339     }
2340 
2341     status = plugin_write_enqueue(vl);
2342     if (status != 0)
2343       failed++;
2344   }
2345   va_end(ap);
2346 
2347   plugin_value_list_free(vl);
2348   return failed;
2349 } /* }}} int plugin_dispatch_multivalue */
2350 
plugin_dispatch_notification(const notification_t * notif)2351 EXPORT int plugin_dispatch_notification(const notification_t *notif) {
2352   llentry_t *le;
2353   /* Possible TODO: Add flap detection here */
2354 
2355   DEBUG("plugin_dispatch_notification: severity = %i; message = %s; "
2356         "time = %.3f; host = %s;",
2357         notif->severity, notif->message, CDTIME_T_TO_DOUBLE(notif->time),
2358         notif->host);
2359 
2360   /* Nobody cares for notifications */
2361   if (list_notification == NULL)
2362     return -1;
2363 
2364   le = llist_head(list_notification);
2365   while (le != NULL) {
2366     callback_func_t *cf;
2367     plugin_notification_cb callback;
2368     int status;
2369 
2370     /* do not switch plugin context; rather keep the context
2371      * (interval) information of the calling plugin */
2372 
2373     cf = le->value;
2374     callback = cf->cf_callback;
2375     status = (*callback)(notif, &cf->cf_udata);
2376     if (status != 0) {
2377       WARNING("plugin_dispatch_notification: Notification "
2378               "callback %s returned %i.",
2379               le->key, status);
2380     }
2381 
2382     le = le->next;
2383   }
2384 
2385   return 0;
2386 } /* int plugin_dispatch_notification */
2387 
plugin_log(int level,const char * format,...)2388 EXPORT void plugin_log(int level, const char *format, ...) {
2389   char msg[1024];
2390   va_list ap;
2391   llentry_t *le;
2392 
2393 #if !COLLECT_DEBUG
2394   if (level >= LOG_DEBUG)
2395     return;
2396 #endif
2397 
2398   va_start(ap, format);
2399   vsnprintf(msg, sizeof(msg), format, ap);
2400   msg[sizeof(msg) - 1] = '\0';
2401   va_end(ap);
2402 
2403   if (list_log == NULL) {
2404     fprintf(stderr, "%s\n", msg);
2405     return;
2406   }
2407 
2408   le = llist_head(list_log);
2409   while (le != NULL) {
2410     callback_func_t *cf;
2411     plugin_log_cb callback;
2412 
2413     cf = le->value;
2414     callback = cf->cf_callback;
2415 
2416     /* do not switch plugin context; rather keep the context
2417      * (interval) information of the calling plugin */
2418 
2419     (*callback)(level, msg, &cf->cf_udata);
2420 
2421     le = le->next;
2422   }
2423 } /* void plugin_log */
2424 
daemon_log(int level,const char * format,...)2425 void daemon_log(int level, const char *format, ...) {
2426   char msg[1024] = ""; // Size inherits from plugin_log()
2427 
2428   char const *name = plugin_get_ctx().name;
2429   if (name == NULL)
2430     name = "UNKNOWN";
2431 
2432   va_list ap;
2433   va_start(ap, format);
2434   vsnprintf(msg, sizeof(msg), format, ap);
2435   va_end(ap);
2436 
2437   plugin_log(level, "%s plugin: %s", name, msg);
2438 } /* void daemon_log */
2439 
/*
 * Translates a severity name (matched case-insensitively) into a LOG_*
 * level. "emerg", "alert", "crit" and "err" all map to LOG_ERR; "debug" is
 * only recognized in COLLECT_DEBUG builds.
 *
 * Returns the log level, or -1 if the name is not recognized.
 */
int parse_log_severity(const char *severity) {
  static const struct {
    const char *name;
    int level;
  } known_levels[] = {
      {"emerg", LOG_ERR},     {"alert", LOG_ERR},
      {"crit", LOG_ERR},      {"err", LOG_ERR},
      {"warning", LOG_WARNING}, {"notice", LOG_NOTICE},
      {"info", LOG_INFO},
#if COLLECT_DEBUG
      {"debug", LOG_DEBUG},
#endif /* COLLECT_DEBUG */
  };

  for (size_t i = 0; i < sizeof(known_levels) / sizeof(known_levels[0]); i++) {
    if (strcasecmp(severity, known_levels[i].name) == 0)
      return known_levels[i].level;
  }

  return -1;
} /* int parse_log_severity */
2460 
parse_notif_severity(const char * severity)2461 EXPORT int parse_notif_severity(const char *severity) {
2462   int notif_severity = -1;
2463 
2464   if (strcasecmp(severity, "FAILURE") == 0)
2465     notif_severity = NOTIF_FAILURE;
2466   else if (strcmp(severity, "OKAY") == 0)
2467     notif_severity = NOTIF_OKAY;
2468   else if ((strcmp(severity, "WARNING") == 0) ||
2469            (strcmp(severity, "WARN") == 0))
2470     notif_severity = NOTIF_WARNING;
2471 
2472   return notif_severity;
2473 } /* int parse_notif_severity */
2474 
plugin_get_ds(const char * name)2475 EXPORT const data_set_t *plugin_get_ds(const char *name) {
2476   data_set_t *ds;
2477 
2478   if (data_sets == NULL) {
2479     P_ERROR("plugin_get_ds: No data sets are defined yet.");
2480     return NULL;
2481   }
2482 
2483   if (c_avl_get(data_sets, name, (void *)&ds) != 0) {
2484     DEBUG("No such dataset registered: %s", name);
2485     return NULL;
2486   }
2487 
2488   return ds;
2489 } /* data_set_t *plugin_get_ds */
2490 
/*
 * Allocates a new meta data entry and appends it to the end of `n->meta'.
 *
 * `value' points to the payload and is interpreted according to `type':
 * a NUL-terminated string (duplicated with strdup()), an int64_t, a
 * uint64_t, a double or a bool.
 *
 * Returns 0 on success and -1 if a pointer argument is NULL, an allocation
 * fails, or `type' is unknown. Nothing is appended on failure.
 */
static int plugin_notification_meta_add(notification_t *n, const char *name,
                                        enum notification_meta_type_e type,
                                        const void *value) {
  if ((n == NULL) || (name == NULL) || (value == NULL)) {
    ERROR("plugin_notification_meta_add: A pointer is NULL!");
    return -1;
  }

  notification_meta_t *entry = calloc(1, sizeof(*entry));
  if (entry == NULL) {
    ERROR("plugin_notification_meta_add: calloc failed.");
    return -1;
  }

  sstrncpy(entry->name, name, sizeof(entry->name));
  entry->type = type;

  switch (type) {
  case NM_TYPE_STRING:
    entry->nm_value.nm_string = strdup((const char *)value);
    if (entry->nm_value.nm_string == NULL) {
      ERROR("plugin_notification_meta_add: strdup failed.");
      sfree(entry);
      return -1;
    }
    break;
  case NM_TYPE_SIGNED_INT:
    entry->nm_value.nm_signed_int = *(const int64_t *)value;
    break;
  case NM_TYPE_UNSIGNED_INT:
    entry->nm_value.nm_unsigned_int = *(const uint64_t *)value;
    break;
  case NM_TYPE_DOUBLE:
    entry->nm_value.nm_double = *(const double *)value;
    break;
  case NM_TYPE_BOOLEAN:
    entry->nm_value.nm_boolean = *(const bool *)value;
    break;
  default:
    ERROR("plugin_notification_meta_add: Unknown type: %i", type);
    sfree(entry);
    return -1;
  } /* switch (type) */

  entry->next = NULL;

  /* Walk to the link that terminates the list (either `n->meta' itself or
   * the `next' member of the last entry) and hook the new entry in there. */
  notification_meta_t **link = &n->meta;
  while (*link != NULL)
    link = &(*link)->next;
  *link = entry;

  return 0;
} /* int plugin_notification_meta_add */
2556 
/* Appends a string meta data entry to `n'; forwards the status of
 * plugin_notification_meta_add(). */
int plugin_notification_meta_add_string(notification_t *n, const char *name,
                                        const char *value) {
  const void *payload = value;

  return plugin_notification_meta_add(n, name, NM_TYPE_STRING, payload);
}
2561 
/* Appends a signed integer meta data entry to `n'; forwards the status of
 * plugin_notification_meta_add(). */
int plugin_notification_meta_add_signed_int(notification_t *n, const char *name,
                                            int64_t value) {
  const void *payload = &value;

  return plugin_notification_meta_add(n, name, NM_TYPE_SIGNED_INT, payload);
}
2566 
/* Appends an unsigned integer meta data entry to `n'; forwards the status of
 * plugin_notification_meta_add(). */
int plugin_notification_meta_add_unsigned_int(notification_t *n,
                                              const char *name,
                                              uint64_t value) {
  const void *payload = &value;

  return plugin_notification_meta_add(n, name, NM_TYPE_UNSIGNED_INT, payload);
}
2572 
/* Appends a double meta data entry to `n'; forwards the status of
 * plugin_notification_meta_add(). */
int plugin_notification_meta_add_double(notification_t *n, const char *name,
                                        double value) {
  const void *payload = &value;

  return plugin_notification_meta_add(n, name, NM_TYPE_DOUBLE, payload);
}
2577 
/* Appends a boolean meta data entry to `n'; forwards the status of
 * plugin_notification_meta_add(). */
int plugin_notification_meta_add_boolean(notification_t *n, const char *name,
                                         bool value) {
  const void *payload = &value;

  return plugin_notification_meta_add(n, name, NM_TYPE_BOOLEAN, payload);
}
2582 
/*
 * Appends a copy of every meta data entry of `src' to `dst'.
 *
 * `dst' and `src' must be distinct, non-NULL notifications that do not share
 * a meta data list. Entries with an unknown type are skipped.
 *
 * The copy is best effort: a failure to add one entry does not prevent the
 * remaining entries from being copied.
 *
 * Returns 0 if every entry was copied and -1 if at least one entry could not
 * be added. (Previously failures were ignored and 0 was always returned.)
 */
int plugin_notification_meta_copy(notification_t *dst,
                                  const notification_t *src) {
  int ret = 0;

  assert(dst != NULL);
  assert(src != NULL);
  assert(dst != src);
  assert((src->meta == NULL) || (src->meta != dst->meta));

  for (const notification_meta_t *meta = src->meta; meta != NULL;
       meta = meta->next) {
    int status = 0;

    switch (meta->type) {
    case NM_TYPE_STRING:
      status = plugin_notification_meta_add_string(dst, meta->name,
                                                   meta->nm_value.nm_string);
      break;
    case NM_TYPE_SIGNED_INT:
      status = plugin_notification_meta_add_signed_int(
          dst, meta->name, meta->nm_value.nm_signed_int);
      break;
    case NM_TYPE_UNSIGNED_INT:
      status = plugin_notification_meta_add_unsigned_int(
          dst, meta->name, meta->nm_value.nm_unsigned_int);
      break;
    case NM_TYPE_DOUBLE:
      status = plugin_notification_meta_add_double(dst, meta->name,
                                                   meta->nm_value.nm_double);
      break;
    case NM_TYPE_BOOLEAN:
      status = plugin_notification_meta_add_boolean(dst, meta->name,
                                                    meta->nm_value.nm_boolean);
      break;
    default:
      /* Unknown types are silently skipped. */
      break;
    }

    if (status != 0)
      ret = -1;
  }

  return ret;
} /* int plugin_notification_meta_copy */
2610 
/*
 * Frees the meta data list starting at `n', including the strings owned by
 * NM_TYPE_STRING entries.
 *
 * Returns 0 on success and -1 (after logging an error) if `n' is NULL.
 */
int plugin_notification_meta_free(notification_meta_t *n) {
  if (n == NULL) {
    ERROR("plugin_notification_meta_free: n == NULL!");
    return -1;
  }

  notification_meta_t *cur = n;
  do {
    /* Remember the successor before the current entry is released. */
    notification_meta_t *successor = cur->next;

    if (cur->type == NM_TYPE_STRING) {
      /* Go through a non-const temporary because nm_string is declared
       * const. */
      void *owned_string = (void *)cur->nm_value.nm_string;

      sfree(owned_string);
      cur->nm_value.nm_string = NULL;
    }
    sfree(cur);

    cur = successor;
  } while (cur != NULL);

  return 0;
} /* int plugin_notification_meta_free */
2639 
/* Releases a plugin context. Registered as the destructor of
 * `plugin_ctx_key' by plugin_init_ctx(); `ctx' is a context allocated by
 * plugin_ctx_create(). */
static void plugin_ctx_destructor(void *ctx) {
  sfree(ctx);
} /* void plugin_ctx_destructor */
2643 
/* Default plugin context: copied into every newly created per-thread context
 * and returned by plugin_get_ctx() / plugin_set_ctx() when no context can be
 * created. All members are zero-initialized.
 * NOTE(review): the inline label below names `interval', but the member order
 * of plugin_ctx_t is not visible here -- confirm that the label is accurate. */
static plugin_ctx_t ctx_init = {/* interval = */ 0};
2645 
plugin_ctx_create(void)2646 static plugin_ctx_t *plugin_ctx_create(void) {
2647   plugin_ctx_t *ctx;
2648 
2649   ctx = malloc(sizeof(*ctx));
2650   if (ctx == NULL) {
2651     ERROR("Failed to allocate plugin context: %s", STRERRNO);
2652     return NULL;
2653   }
2654 
2655   *ctx = ctx_init;
2656   assert(plugin_ctx_key_initialized);
2657   pthread_setspecific(plugin_ctx_key, ctx);
2658   DEBUG("Created new plugin context.");
2659   return ctx;
2660 } /* int plugin_ctx_create */
2661 
plugin_init_ctx(void)2662 EXPORT void plugin_init_ctx(void) {
2663   pthread_key_create(&plugin_ctx_key, plugin_ctx_destructor);
2664   plugin_ctx_key_initialized = true;
2665 } /* void plugin_init_ctx */
2666 
plugin_get_ctx(void)2667 EXPORT plugin_ctx_t plugin_get_ctx(void) {
2668   plugin_ctx_t *ctx;
2669 
2670   assert(plugin_ctx_key_initialized);
2671   ctx = pthread_getspecific(plugin_ctx_key);
2672 
2673   if (ctx == NULL) {
2674     ctx = plugin_ctx_create();
2675     /* this must no happen -- exit() instead? */
2676     if (ctx == NULL)
2677       return ctx_init;
2678   }
2679 
2680   return *ctx;
2681 } /* plugin_ctx_t plugin_get_ctx */
2682 
plugin_set_ctx(plugin_ctx_t ctx)2683 EXPORT plugin_ctx_t plugin_set_ctx(plugin_ctx_t ctx) {
2684   plugin_ctx_t *c;
2685   plugin_ctx_t old;
2686 
2687   assert(plugin_ctx_key_initialized);
2688   c = pthread_getspecific(plugin_ctx_key);
2689 
2690   if (c == NULL) {
2691     c = plugin_ctx_create();
2692     /* this must no happen -- exit() instead? */
2693     if (c == NULL)
2694       return ctx_init;
2695   }
2696 
2697   old = *c;
2698   *c = ctx;
2699 
2700   return old;
2701 } /* void plugin_set_ctx */
2702 
plugin_get_interval(void)2703 EXPORT cdtime_t plugin_get_interval(void) {
2704   cdtime_t interval;
2705 
2706   interval = plugin_get_ctx().interval;
2707   if (interval > 0)
2708     return interval;
2709 
2710   P_ERROR("plugin_get_interval: Unable to determine Interval from context.");
2711 
2712   return cf_get_default_interval();
2713 } /* cdtime_t plugin_get_interval */
2714 
/* Start-up data handed from plugin_thread_create() to plugin_thread_start().
 * It is heap-allocated by the former and freed by the latter. */
typedef struct {
  plugin_ctx_t ctx;               /* context of the thread that created us */
  void *(*start_routine)(void *); /* thread function supplied by the caller */
  void *arg;                      /* argument passed on to start_routine */
} plugin_thread_t;
2720 
plugin_thread_start(void * arg)2721 static void *plugin_thread_start(void *arg) {
2722   plugin_thread_t *plugin_thread = arg;
2723 
2724   void *(*start_routine)(void *) = plugin_thread->start_routine;
2725   void *plugin_arg = plugin_thread->arg;
2726 
2727   plugin_set_ctx(plugin_thread->ctx);
2728 
2729   sfree(plugin_thread);
2730 
2731   return start_routine(plugin_arg);
2732 } /* void *plugin_thread_start */
2733 
plugin_thread_create(pthread_t * thread,void * (* start_routine)(void *),void * arg,char const * name)2734 int plugin_thread_create(pthread_t *thread, void *(*start_routine)(void *),
2735                          void *arg, char const *name) {
2736   plugin_thread_t *plugin_thread;
2737 
2738   plugin_thread = malloc(sizeof(*plugin_thread));
2739   if (plugin_thread == NULL)
2740     return ENOMEM;
2741 
2742   plugin_thread->ctx = plugin_get_ctx();
2743   plugin_thread->start_routine = start_routine;
2744   plugin_thread->arg = arg;
2745 
2746   int ret = pthread_create(thread, NULL, plugin_thread_start, plugin_thread);
2747   if (ret != 0) {
2748     sfree(plugin_thread);
2749     return ret;
2750   }
2751 
2752   if (name != NULL)
2753     set_thread_name(*thread, name);
2754 
2755   return 0;
2756 } /* int plugin_thread_create */
2757