1 /**
2  * collectd - src/write_influxdb_udp.c
3  * Copyright (C) 2007-2009  Florian octo Forster
4  * Copyright (C) 2009       Aman Gupta
5  * Copyright (C) 2019       Carlos Peon Costa
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at collectd.org>
22  *   Aman Gupta <aman at tmm1.net>
23  *   Carlos Peon Costa <carlospeon at gmail.com>
24  **/
25 
26 #include "collectd.h"
27 
28 #include "plugin.h"
29 #include "utils/common/common.h"
30 #include "utils_cache.h"
31 #include "utils_complain.h"
32 
33 #if HAVE_NETDB_H
34 #include <netdb.h>
35 #endif
36 #if HAVE_NETINET_IN_H
37 #include <netinet/in.h>
38 #endif
39 #if HAVE_ARPA_INET_H
40 #include <arpa/inet.h>
41 #endif
42 #if HAVE_POLL_H
43 #include <poll.h>
44 #endif
45 #if HAVE_NET_IF_H
46 #include <net/if.h>
47 #endif
48 
49 struct sockent_client {
50   int fd;
51   struct sockaddr_storage *addr;
52   socklen_t addrlen;
53   cdtime_t next_resolve_reconnect;
54   cdtime_t resolve_interval;
55   struct sockaddr_storage *bind_addr;
56 };
57 
58 typedef struct sockent {
59   char *node;
60   char *service;
61   int interface;
62   struct sockent_client client;
63 } sockent_t;
64 
65 #define NET_DEFAULT_PACKET_SIZE 1452
66 #define NET_DEFAULT_PORT "8089"
67 
68 /*
69  * Private variables
70  */
71 
72 static int wifxudp_config_ttl;
73 static size_t wifxudp_config_packet_size = NET_DEFAULT_PACKET_SIZE;
74 static bool wifxudp_config_store_rates;
75 
76 static sockent_t *sending_socket;
77 
78 /* Buffer in which to-be-sent network packets are constructed. */
79 static char *send_buffer;
80 static char *send_buffer_ptr;
81 static int send_buffer_fill;
82 static cdtime_t send_buffer_last_update;
83 static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
84 
85 static int listen_loop;
86 
set_ttl(const sockent_t * se,const struct addrinfo * ai)87 static int set_ttl(const sockent_t *se, const struct addrinfo *ai) {
88 
89   if ((wifxudp_config_ttl < 1) || (wifxudp_config_ttl > 255))
90     return -1;
91 
92   if (ai->ai_family == AF_INET) {
93     struct sockaddr_in *addr = (struct sockaddr_in *)ai->ai_addr;
94     int optname;
95 
96     if (IN_MULTICAST(ntohl(addr->sin_addr.s_addr)))
97       optname = IP_MULTICAST_TTL;
98     else
99       optname = IP_TTL;
100 
101     if (setsockopt(se->client.fd, IPPROTO_IP, optname, &wifxudp_config_ttl,
102                    sizeof(wifxudp_config_ttl)) != 0) {
103       ERROR("write_influxdb_udp plugin: setsockopt (ipv4-ttl): %s", STRERRNO);
104       return -1;
105     }
106   } else if (ai->ai_family == AF_INET6) {
107     /* Useful example:
108      * http://gsyc.escet.urjc.es/~eva/IPv6-web/examples/mcast.html */
109     struct sockaddr_in6 *addr = (struct sockaddr_in6 *)ai->ai_addr;
110     int optname;
111 
112     if (IN6_IS_ADDR_MULTICAST(&addr->sin6_addr))
113       optname = IPV6_MULTICAST_HOPS;
114     else
115       optname = IPV6_UNICAST_HOPS;
116 
117     if (setsockopt(se->client.fd, IPPROTO_IPV6, optname, &wifxudp_config_ttl,
118                    sizeof(wifxudp_config_ttl)) != 0) {
119       ERROR("write_influxdb_udp plugin: setsockopt(ipv6-ttl): %s", STRERRNO);
120       return -1;
121     }
122   }
123 
124   return 0;
125 } /* int set_ttl */
126 
bind_socket_to_addr(sockent_t * se,const struct addrinfo * ai)127 static int bind_socket_to_addr(sockent_t *se, const struct addrinfo *ai) {
128 
129   if (se->client.bind_addr == NULL)
130     return 0;
131 
132   char pbuffer[64];
133 
134   if (ai->ai_family == AF_INET) {
135     struct sockaddr_in *addr = (struct sockaddr_in *)(se->client.bind_addr);
136     inet_ntop(AF_INET, &(addr->sin_addr), pbuffer, 64);
137 
138     if (bind(se->client.fd, (struct sockaddr *)addr, sizeof(*addr)) == -1)
139       return -1;
140   } else if (ai->ai_family == AF_INET6) {
141     struct sockaddr_in6 *addr = (struct sockaddr_in6 *)(se->client.bind_addr);
142     inet_ntop(AF_INET6, &(addr->sin6_addr), pbuffer, 64);
143 
144     if (bind(se->client.fd, (struct sockaddr *)addr, sizeof(*addr)) == -1)
145       return -1;
146   }
147 
148   return 0;
149 } /* int bind_socket_to_addr */
150 
sockent_create()151 static sockent_t *sockent_create() {
152   sockent_t *se = calloc(1, sizeof(*se));
153   if (se == NULL)
154     return NULL;
155 
156   se->node = NULL;
157   se->service = NULL;
158   se->interface = 0;
159 
160   se->client.fd = -1;
161   se->client.addr = NULL;
162   se->client.bind_addr = NULL;
163   se->client.resolve_interval = 0;
164   se->client.next_resolve_reconnect = 0;
165 
166   return se;
167 } /* sockent_t *sockent_create */
168 
sockent_client_disconnect(sockent_t * se)169 static int sockent_client_disconnect(sockent_t *se) {
170 
171   if (se == NULL)
172     return EINVAL;
173 
174   struct sockent_client *client = &se->client;
175   if (client->fd >= 0) /* connected */
176   {
177     close(client->fd);
178     client->fd = -1;
179   }
180 
181   sfree(client->addr);
182   client->addrlen = 0;
183 
184   return 0;
185 } /* int sockent_client_disconnect */
186 
sockent_client_connect(sockent_t * se)187 static int sockent_client_connect(sockent_t *se) {
188   struct addrinfo *ai_list;
189   static c_complain_t complaint = C_COMPLAIN_INIT_STATIC;
190   bool reconnect = false;
191 
192   if (se == NULL)
193     return EINVAL;
194 
195   struct sockent_client *client = &se->client;
196 
197   cdtime_t now = cdtime();
198   if (client->resolve_interval != 0 && client->next_resolve_reconnect < now) {
199     DEBUG("write_influxdb_udp plugin: "
200           "Reconnecting socket, resolve_interval = %lf, "
201           "next_resolve_reconnect = %lf",
202           CDTIME_T_TO_DOUBLE(client->resolve_interval),
203           CDTIME_T_TO_DOUBLE(client->next_resolve_reconnect));
204     reconnect = true;
205   }
206 
207   if (client->fd >= 0 && !reconnect) /* already connected and not stale*/
208     return 0;
209 
210   struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
211                               .ai_flags = AI_ADDRCONFIG,
212                               .ai_protocol = IPPROTO_UDP,
213                               .ai_socktype = SOCK_DGRAM};
214 
215   int status = getaddrinfo(
216       se->node, (se->service != NULL) ? se->service : NET_DEFAULT_PORT,
217       &ai_hints, &ai_list);
218   if (status != 0) {
219     c_complain(LOG_ERR, &complaint,
220                "write_influxdb_udp plugin: getaddrinfo (%s, %s) failed: %s",
221                (se->node == NULL) ? "(null)" : se->node,
222                (se->service == NULL) ? "(null)" : se->service,
223                gai_strerror(status));
224     return -1;
225   } else {
226     c_release(LOG_NOTICE, &complaint,
227               "write_influxdb_udp plugin: Successfully resolved \"%s\".",
228               se->node);
229   }
230 
231   for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
232        ai_ptr = ai_ptr->ai_next) {
233     if (client->fd >= 0) /* when we reconnect */
234       sockent_client_disconnect(se);
235 
236     client->fd =
237         socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
238     if (client->fd < 0) {
239       ERROR("write_influxdb_udp plugin: socket(2) failed: %s", STRERRNO);
240       continue;
241     }
242 
243     client->addr = calloc(1, sizeof(*client->addr));
244     if (client->addr == NULL) {
245       ERROR("write_influxdb_udp plugin: calloc failed.");
246       close(client->fd);
247       client->fd = -1;
248       continue;
249     }
250 
251     assert(sizeof(*client->addr) >= ai_ptr->ai_addrlen);
252     memcpy(client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
253     client->addrlen = ai_ptr->ai_addrlen;
254 
255     set_ttl(se, ai_ptr);
256     bind_socket_to_addr(se, ai_ptr);
257 
258     /* We don't open more than one write-socket per
259      * node/service pair.. */
260     break;
261   }
262 
263   freeaddrinfo(ai_list);
264   if (client->fd < 0)
265     return -1;
266 
267   if (client->resolve_interval > 0)
268     client->next_resolve_reconnect = now + client->resolve_interval;
269   return 0;
270 } /* int sockent_client_connect */
271 
free_sockent_client(struct sockent_client * sec)272 static void free_sockent_client(struct sockent_client *sec) {
273   if (sec->fd >= 0) {
274     close(sec->fd);
275     sec->fd = -1;
276   }
277   sfree(sec->addr);
278   sfree(sec->bind_addr);
279 } /* void free_sockent_client */
280 
sockent_destroy(sockent_t * se)281 static void sockent_destroy(sockent_t *se) {
282 
283   if (se != NULL) {
284     sfree(se->node);
285     sfree(se->service);
286 
287     free_sockent_client(&se->client);
288 
289     sfree(se);
290   }
291 } /* void sockent_destroy */
292 
write_influxdb_udp_init_buffer(void)293 static void write_influxdb_udp_init_buffer(void) {
294   memset(send_buffer, 0, wifxudp_config_packet_size);
295   send_buffer_ptr = send_buffer;
296   send_buffer_fill = 0;
297   send_buffer_last_update = 0;
298 } /* write_influxdb_udp_init_buffer */
299 
write_influxdb_udp_send_buffer(const char * buffer,size_t buffer_size)300 static void write_influxdb_udp_send_buffer(const char *buffer,
301                                            size_t buffer_size) {
302   while (42) {
303     int status = sockent_client_connect(sending_socket);
304     if (status != 0)
305       return;
306 
307     status =
308         sendto(sending_socket->client.fd, buffer, buffer_size,
309                /* flags = */ 0, (struct sockaddr *)sending_socket->client.addr,
310                sending_socket->client.addrlen);
311     if (status < 0) {
312       if ((errno == EINTR) || (errno == EAGAIN))
313         continue;
314 
315       ERROR("write_influxdb_udp plugin: "
316             "sendto failed: %s. Closing sending socket.",
317             STRERRNO);
318       sockent_client_disconnect(sending_socket);
319       return;
320     }
321 
322     break;
323   } /* while (42) */
324 } /* void write_influxdb_udp_send_buffer */
325 
flush_buffer(void)326 static void flush_buffer(void) {
327   write_influxdb_udp_send_buffer(send_buffer, (size_t)send_buffer_fill);
328   write_influxdb_udp_init_buffer();
329 }
330 
wifxudp_escape_string(char * buffer,size_t buffer_size,const char * string)331 static int wifxudp_escape_string(char *buffer, size_t buffer_size,
332                                  const char *string) {
333 
334   if ((buffer == NULL) || (string == NULL))
335     return -EINVAL;
336 
337   if (buffer_size < 3)
338     return -ENOMEM;
339 
340   int dst_pos = 0;
341 
342 #define BUFFER_ADD(c)                                                          \
343   do {                                                                         \
344     if (dst_pos >= (buffer_size - 1)) {                                        \
345       buffer[buffer_size - 1] = '\0';                                          \
346       return -ENOMEM;                                                          \
347     }                                                                          \
348     buffer[dst_pos] = (c);                                                     \
349     dst_pos++;                                                                 \
350   } while (0)
351 
352   /* Escape special characters */
353   for (int src_pos = 0; string[src_pos] != 0; src_pos++) {
354     if ((string[src_pos] == '\\') || (string[src_pos] == ' ') ||
355         (string[src_pos] == ',') || (string[src_pos] == '=') ||
356         (string[src_pos] == '"')) {
357       BUFFER_ADD('\\');
358       BUFFER_ADD(string[src_pos]);
359     } else
360       BUFFER_ADD(string[src_pos]);
361   } /* for */
362   buffer[dst_pos] = 0;
363 
364 #undef BUFFER_ADD
365 
366   return dst_pos;
367 } /* int wifxudp_escape_string */
368 
write_influxdb_point(char * buffer,int buffer_len,const data_set_t * ds,const value_list_t * vl)369 static int write_influxdb_point(char *buffer, int buffer_len,
370                                 const data_set_t *ds, const value_list_t *vl) {
371   int status;
372   int offset = 0;
373   gauge_t *rates = NULL;
374   bool have_values = false;
375 
376   assert(0 == strcmp(ds->type, vl->type));
377 
378 #define BUFFER_ADD_ESCAPE(...)                                                 \
379   do {                                                                         \
380     status = wifxudp_escape_string(buffer + offset, buffer_len - offset,       \
381                                    __VA_ARGS__);                               \
382     if (status < 0)                                                            \
383       return -1;                                                               \
384     offset += status;                                                          \
385   } while (0)
386 
387 #define BUFFER_ADD(...)                                                        \
388   do {                                                                         \
389     status = snprintf(buffer + offset, buffer_len - offset, __VA_ARGS__);      \
390     if ((status < 0) || (status >= (buffer_len - offset))) {                   \
391       sfree(rates);                                                            \
392       return -1;                                                               \
393     }                                                                          \
394     offset += status;                                                          \
395   } while (0)
396 
397   BUFFER_ADD_ESCAPE(vl->plugin);
398   BUFFER_ADD(",host=");
399   BUFFER_ADD_ESCAPE(vl->host);
400   if (strcmp(vl->plugin_instance, "") != 0) {
401     BUFFER_ADD(",instance=");
402     BUFFER_ADD_ESCAPE(vl->plugin_instance);
403   }
404   if (strcmp(vl->type, "") != 0) {
405     BUFFER_ADD(",type=");
406     BUFFER_ADD_ESCAPE(vl->type);
407   }
408   if (strcmp(vl->type_instance, "") != 0) {
409     BUFFER_ADD(",type_instance=");
410     BUFFER_ADD_ESCAPE(vl->type_instance);
411   }
412 
413   BUFFER_ADD(" ");
414   for (size_t i = 0; i < ds->ds_num; i++) {
415     if ((ds->ds[i].type != DS_TYPE_COUNTER) &&
416         (ds->ds[i].type != DS_TYPE_GAUGE) &&
417         (ds->ds[i].type != DS_TYPE_DERIVE) &&
418         (ds->ds[i].type != DS_TYPE_ABSOLUTE)) {
419       sfree(rates);
420       return -1;
421     }
422 
423     if (ds->ds[i].type == DS_TYPE_GAUGE) {
424       if (isnan(vl->values[i].gauge))
425         continue;
426       if (have_values)
427         BUFFER_ADD(",");
428       BUFFER_ADD("%s=%lf", ds->ds[i].name, vl->values[i].gauge);
429       have_values = true;
430     } else if (wifxudp_config_store_rates) {
431       if (rates == NULL)
432         rates = uc_get_rate(ds, vl);
433       if (rates == NULL) {
434         WARNING("write_influxdb_udp plugin: "
435                 "uc_get_rate failed.");
436         return -1;
437       }
438       if (isnan(rates[i]))
439         continue;
440       if (have_values)
441         BUFFER_ADD(",");
442       BUFFER_ADD("%s=%lf", ds->ds[i].name, rates[i]);
443       have_values = true;
444     } else if (ds->ds[i].type == DS_TYPE_COUNTER) {
445       if (have_values)
446         BUFFER_ADD(",");
447       BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name,
448                  (uint64_t)vl->values[i].counter);
449       have_values = true;
450     } else if (ds->ds[i].type == DS_TYPE_DERIVE) {
451       if (have_values)
452         BUFFER_ADD(",");
453       BUFFER_ADD("%s=%" PRIi64 "i", ds->ds[i].name, vl->values[i].derive);
454       have_values = true;
455     } else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) {
456       if (have_values)
457         BUFFER_ADD(",");
458       BUFFER_ADD("%s=%" PRIu64 "i", ds->ds[i].name, vl->values[i].absolute);
459       have_values = true;
460     }
461 
462   } /* for ds->ds_num */
463   sfree(rates);
464 
465   if (!have_values)
466     return 0;
467 
468   BUFFER_ADD(" %" PRIu64 "\n", CDTIME_T_TO_MS(vl->time));
469 
470 #undef BUFFER_ADD_ESCAPE
471 #undef BUFFER_ADD
472 
473   return offset;
474 } /* int write_influxdb_point */
475 
476 static int
write_influxdb_udp_write(const data_set_t * ds,const value_list_t * vl,user_data_t * user_data)477 write_influxdb_udp_write(const data_set_t *ds, const value_list_t *vl,
478                          user_data_t __attribute__((unused)) * user_data) {
479 
480   /* listen_loop is set to non-zero in the shutdown callback, which is
481    * guaranteed to be called *after* all the write threads have been shut
482    * down. */
483   assert(listen_loop == 0);
484 
485   pthread_mutex_lock(&send_buffer_lock);
486 
487   int status = write_influxdb_point(
488       send_buffer_ptr, wifxudp_config_packet_size - send_buffer_fill, ds, vl);
489 
490   if (status < 0) {
491     flush_buffer();
492     status = write_influxdb_point(
493         send_buffer_ptr, wifxudp_config_packet_size - send_buffer_fill, ds, vl);
494   }
495   if (status < 0) {
496     ERROR("write_influxdb_udp plugin: write_influxdb_udp_write failed.");
497     pthread_mutex_unlock(&send_buffer_lock);
498     return -1;
499   }
500   if (status == 0) {
501     /* no real values to send (nan) */
502     pthread_mutex_unlock(&send_buffer_lock);
503     return 0;
504   }
505 
506   send_buffer_fill += status;
507   send_buffer_ptr += status;
508   send_buffer_last_update = cdtime();
509 
510   if (wifxudp_config_packet_size - send_buffer_fill < 120)
511     /* No room for a new point of average size in buffer,
512        the probability of fail for the new point is bigger than
513        the probability of success */
514     flush_buffer();
515 
516   pthread_mutex_unlock(&send_buffer_lock);
517   return 0;
518 } /* int write_influxdb_udp_write */
519 
wifxudp_config_set_ttl(const oconfig_item_t * ci)520 static int wifxudp_config_set_ttl(const oconfig_item_t *ci) {
521   int tmp = 0;
522 
523   if (cf_util_get_int(ci, &tmp) != 0)
524     return -1;
525   else if ((tmp > 0) && (tmp <= 255))
526     wifxudp_config_ttl = tmp;
527   else {
528     WARNING("write_influxdb_udp plugin: "
529             "The `TimeToLive' must be between 1 and 255.");
530     return -1;
531   }
532 
533   return 0;
534 } /* int wifxudp_config_set_ttl */
535 
wifxudp_config_set_buffer_size(const oconfig_item_t * ci)536 static int wifxudp_config_set_buffer_size(const oconfig_item_t *ci) {
537   int tmp = 0;
538 
539   if (cf_util_get_int(ci, &tmp) != 0)
540     return -1;
541   else if ((tmp >= 1024) && (tmp <= 65535))
542     wifxudp_config_packet_size = tmp;
543   else {
544     WARNING("write_influxdb_udp plugin: "
545             "The `MaxPacketSize' must be between 1024 and 65535.");
546     return -1;
547   }
548 
549   return 0;
550 } /* int wifxudp_config_set_buffer_size */
551 
wifxudp_config_set_server(const oconfig_item_t * ci)552 static int wifxudp_config_set_server(const oconfig_item_t *ci) {
553   if ((ci->values_num < 1) || (ci->values_num > 2) ||
554       (ci->values[0].type != OCONFIG_TYPE_STRING) ||
555       ((ci->values_num > 1) && (ci->values[1].type != OCONFIG_TYPE_STRING))) {
556     ERROR("write_influxdb_udp plugin: The `%s' config option needs "
557           "one or two string arguments.",
558           ci->key);
559     return -1;
560   }
561 
562   sending_socket = sockent_create();
563   if (sending_socket == NULL) {
564     ERROR("write_influxdb_udp plugin: sockent_create failed.");
565     return -1;
566   }
567 
568   sending_socket->node = strdup(ci->values[0].value.string);
569   if (ci->values_num >= 2)
570     sending_socket->service = strdup(ci->values[1].value.string);
571 
572   return 0;
573 } /* int wifxudp_config_set_server */
574 
write_influxdb_udp_config(oconfig_item_t * ci)575 static int write_influxdb_udp_config(oconfig_item_t *ci) {
576   for (int i = 0; i < ci->children_num; i++) {
577     oconfig_item_t *child = ci->children + i;
578 
579     if (strcasecmp("Server", child->key) == 0)
580       wifxudp_config_set_server(child);
581     else if (strcasecmp("TimeToLive", child->key) == 0) {
582       wifxudp_config_set_ttl(child);
583     } else if (strcasecmp("MaxPacketSize", child->key) == 0)
584       wifxudp_config_set_buffer_size(child);
585     else if (strcasecmp("StoreRates", child->key) == 0)
586       cf_util_get_boolean(child, &wifxudp_config_store_rates);
587     else {
588       WARNING("write_influxdb_udp plugin: "
589               "Option `%s' is not allowed here.",
590               child->key);
591     }
592   }
593 
594   return 0;
595 } /* int write_influxdb_udp_config */
596 
write_influxdb_udp_shutdown(void)597 static int write_influxdb_udp_shutdown(void) {
598   if (send_buffer_fill > 0)
599     flush_buffer();
600 
601   sfree(send_buffer);
602 
603   if (sending_socket != NULL) {
604     sockent_client_disconnect(sending_socket);
605     sockent_destroy(sending_socket);
606   }
607 
608   plugin_unregister_config("write_influxdb_udp");
609   plugin_unregister_init("write_influxdb_udp");
610   plugin_unregister_write("write_influxdb_udp");
611   plugin_unregister_shutdown("write_influxdb_udp");
612 
613   return 0;
614 } /* int write_influxdb_udp_shutdown */
615 
write_influxdb_udp_init(void)616 static int write_influxdb_udp_init(void) {
617   static bool have_init;
618 
619   /* Check if we were already initialized. If so, just return - there's
620    * nothing more to do (for now, that is). */
621   if (have_init)
622     return 0;
623   have_init = true;
624 
625   plugin_register_shutdown("write_influxdb_udp", write_influxdb_udp_shutdown);
626 
627   send_buffer = malloc(wifxudp_config_packet_size);
628   if (send_buffer == NULL) {
629     ERROR("write_influxdb_udp plugin: malloc failed.");
630     return -1;
631   }
632   write_influxdb_udp_init_buffer();
633 
634   /* setup socket(s) and so on */
635   if (sending_socket != NULL) {
636     plugin_register_write("write_influxdb_udp", write_influxdb_udp_write,
637                           /* user_data = */ NULL);
638   }
639 
640   return 0;
641 } /* int write_influxdb_udp_init */
642 
write_influxdb_udp_flush(cdtime_t timeout,const char * identifier,user_data_t * user_data)643 static int write_influxdb_udp_flush(cdtime_t timeout,
644                                     __attribute__((unused))
645                                     const char *identifier,
646                                     __attribute__((unused))
647                                     user_data_t *user_data) {
648   pthread_mutex_lock(&send_buffer_lock);
649 
650   if (send_buffer_fill > 0) {
651     if (timeout > 0) {
652       cdtime_t now = cdtime();
653       if ((send_buffer_last_update + timeout) > now) {
654         pthread_mutex_unlock(&send_buffer_lock);
655         return 0;
656       }
657     }
658     flush_buffer();
659   }
660   pthread_mutex_unlock(&send_buffer_lock);
661 
662   return 0;
663 } /* int write_influxdb_udp_flush */
664 
module_register(void)665 void module_register(void) {
666   plugin_register_complex_config("write_influxdb_udp",
667                                  write_influxdb_udp_config);
668   plugin_register_init("write_influxdb_udp", write_influxdb_udp_init);
669   plugin_register_flush("write_influxdb_udp", write_influxdb_udp_flush, NULL);
670 } /* void module_register */
671