1 /**
2  * collectd - src/connectivity.c
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8  * and/or sell copies of the Software, and to permit persons to whom the
9  * Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20  * DEALINGS IN THE SOFTWARE.
21  *
22  * Authors:
23  *   Red Hat NFVPE
24  *     Andrew Bays <abays at redhat.com>
25  *     Aneesh Puttur <aputtur at redhat.com>
26  **/
27 
28 #include "collectd.h"
29 
30 #include "plugin.h"
31 #include "utils/common/common.h"
32 #include "utils/ignorelist/ignorelist.h"
33 #include "utils_complain.h"
34 
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44 
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48 
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57 
/* Netlink protocol alias.
 * NOTE(review): nl_connect() passes NETLINK_ROUTE directly, so this macro
 * does not appear to be referenced anywhere in this file. */
#define MYPROTO NETLINK_ROUTE

/* Values stored in interface_list_t.status and .prev_status. */
#define LINK_STATE_DOWN 0
#define LINK_STATE_UP 1
#define LINK_STATE_UNKNOWN 2

/* Keys and fixed values of the common event header written by
 * gen_message_payload(). */
#define CONNECTIVITY_DOMAIN_FIELD "domain"
#define CONNECTIVITY_DOMAIN_VALUE "stateChange"
#define CONNECTIVITY_EVENT_ID_FIELD "eventId"
#define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
#define CONNECTIVITY_PRIORITY_FIELD "priority"
#define CONNECTIVITY_PRIORITY_VALUE "high"
#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
#define CONNECTIVITY_SEQUENCE_FIELD "sequence"
#define CONNECTIVITY_SEQUENCE_VALUE "0"
#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
#define CONNECTIVITY_VERSION_FIELD "version"
#define CONNECTIVITY_VERSION_VALUE "1.0"

/* Keys and fixed values of the nested "stateChangeFields" object written by
 * gen_message_payload(). The *_VERSION_VALUE strings are emitted as JSON
 * numbers, not as quoted strings. */
#define CONNECTIVITY_NEW_STATE_FIELD "newState"
#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
#define CONNECTIVITY_OLD_STATE_FIELD "oldState"
#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
  "stateChangeFieldsVersion"
#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
93 
94 /*
95  * Private data types
96  */
97 
98 struct interface_list_s {
99   char *interface;
100 
101   uint32_t status;
102   uint32_t prev_status;
103   uint32_t sent;
104   cdtime_t timestamp;
105 
106   struct interface_list_s *next;
107 };
108 typedef struct interface_list_s interface_list_t;
109 
110 /*
111  * Private variables
112  */
113 
/* Interface filter; created on the first configuration callback and consulted
 * by connectivity_link_state() for every interface name. */
static ignorelist_t *ignorelist = NULL;

/* Head of the interface list; add_interface() pushes new nodes at the front. */
static interface_list_t *interface_list_head = NULL;
/* Cleared as soon as an "Interface" option is configured; only used to print
 * a notice in connectivity_init(). */
static int monitor_all_interfaces = 1;

/* Thread run flags and the netlink error flag are read and written under
 * connectivity_threads_lock. */
static int connectivity_netlink_thread_loop = 0;
/* Set by the netlink thread when read_event() fails; connectivity_read()
 * reacts by restarting that thread. */
static int connectivity_netlink_thread_error = 0;
static pthread_t connectivity_netlink_thread_id;
static int connectivity_dequeue_thread_loop = 0;
static pthread_t connectivity_dequeue_thread_id;
static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
/* Held by connectivity_link_state() and read_interface_status() while they
 * touch the interface list and statuses_to_send. */
static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by read_event() once the socket has been drained and broadcast by
 * the stop_*_thread() functions; waited on in read_interface_status(). */
static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
/* Netlink socket descriptor, -1 while no socket is open. */
static int nl_sock = -1;
/* Incremented once per payload generated by gen_message_payload(). */
static int event_id = 0;
/* Set to 1 when an interface changes state, reset to 0 by
 * send_interface_status(). */
static int statuses_to_send = 0;

/* Option names accepted by connectivity_config(). */
static const char *config_keys[] = {"Interface", "IgnoreSelected"};
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
133 
134 /*
135  * Private functions
136  */
137 
gen_message_payload(int state,int old_state,const char * interface,cdtime_t timestamp,char ** buf)138 static int gen_message_payload(int state, int old_state, const char *interface,
139                                cdtime_t timestamp, char **buf) {
140   const unsigned char *buf2;
141   yajl_gen g;
142   char json_str[DATA_MAX_NAME_LEN];
143 
144 #if !defined(HAVE_YAJL_V2)
145   yajl_gen_config conf = {0};
146 #endif
147 
148 #if HAVE_YAJL_V2
149   size_t len;
150   g = yajl_gen_alloc(NULL);
151   yajl_gen_config(g, yajl_gen_beautify, 0);
152 #else
153   unsigned int len;
154   g = yajl_gen_alloc(&conf, NULL);
155 #endif
156 
157   yajl_gen_clear(g);
158 
159   // *** BEGIN common event header ***
160 
161   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
162     goto err;
163 
164   // domain
165   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
166                       strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
167     goto err;
168 
169   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
170                       strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
171     goto err;
172 
173   // eventId
174   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
175                       strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
176       yajl_gen_status_ok)
177     goto err;
178 
179   event_id = event_id + 1;
180   if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
181     goto err;
182   }
183 
184   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
185     goto err;
186   }
187 
188   // eventName
189   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
190                       strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
191       yajl_gen_status_ok)
192     goto err;
193 
194   if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
195                (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
196                            : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
197     goto err;
198   }
199 
200   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
201       yajl_gen_status_ok) {
202     goto err;
203   }
204 
205   // lastEpochMicrosec
206   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
207                       strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
208       yajl_gen_status_ok)
209     goto err;
210 
211   if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
212                CDTIME_T_TO_US(cdtime())) < 0) {
213     goto err;
214   }
215 
216   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
217     goto err;
218   }
219 
220   // priority
221   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
222                       strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
223       yajl_gen_status_ok)
224     goto err;
225 
226   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
227                       strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
228       yajl_gen_status_ok)
229     goto err;
230 
231   // reportingEntityName
232   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
233                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
234       yajl_gen_status_ok)
235     goto err;
236 
237   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
238                       strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
239       yajl_gen_status_ok)
240     goto err;
241 
242   // sequence
243   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
244                       strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
245       yajl_gen_status_ok)
246     goto err;
247 
248   if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
249                       strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
250       yajl_gen_status_ok)
251     goto err;
252 
253   // sourceName
254   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
255                       strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
256       yajl_gen_status_ok)
257     goto err;
258 
259   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
260       yajl_gen_status_ok)
261     goto err;
262 
263   // startEpochMicrosec
264   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
265                       strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
266       yajl_gen_status_ok)
267     goto err;
268 
269   if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
270                CDTIME_T_TO_US(timestamp)) < 0) {
271     goto err;
272   }
273 
274   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
275     goto err;
276   }
277 
278   // version
279   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
280                       strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
281     goto err;
282 
283   if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
284                       strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
285     goto err;
286 
287   // *** END common event header ***
288 
289   // *** BEGIN state change fields ***
290 
291   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
292                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
293       yajl_gen_status_ok)
294     goto err;
295 
296   if (yajl_gen_map_open(g) != yajl_gen_status_ok)
297     goto err;
298 
299   // newState
300   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
301                       strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
302       yajl_gen_status_ok)
303     goto err;
304 
305   int new_state_len =
306       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
307                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
308 
309   if (yajl_gen_string(g,
310                       (u_char *)(state == 0
311                                      ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
312                                      : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
313                       new_state_len) != yajl_gen_status_ok)
314     goto err;
315 
316   // oldState
317   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
318                       strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
319       yajl_gen_status_ok)
320     goto err;
321 
322   int old_state_len =
323       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
324                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
325 
326   if (yajl_gen_string(g,
327                       (u_char *)(old_state == 0
328                                      ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
329                                      : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
330                       old_state_len) != yajl_gen_status_ok)
331     goto err;
332 
333   // stateChangeFieldsVersion
334   if (yajl_gen_string(g,
335                       (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
336                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
337       yajl_gen_status_ok)
338     goto err;
339 
340   if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
341                       strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
342       yajl_gen_status_ok)
343     goto err;
344 
345   // stateInterface
346   if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
347                       strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
348       yajl_gen_status_ok)
349     goto err;
350 
351   if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
352       yajl_gen_status_ok)
353     goto err;
354 
355   // close state change and header fields
356   if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
357       yajl_gen_map_close(g) != yajl_gen_status_ok)
358     goto err;
359 
360   // *** END state change fields ***
361 
362   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
363     goto err;
364 
365   *buf = strdup((char *)buf2);
366 
367   if (*buf == NULL) {
368     ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
369           STRERRNO);
370     goto err;
371   }
372 
373   yajl_gen_free(g);
374 
375   return 0;
376 
377 err:
378   yajl_gen_free(g);
379   ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
380   return -1;
381 }
382 
add_interface(const char * interface,int status,int prev_status)383 static interface_list_t *add_interface(const char *interface, int status,
384                                        int prev_status) {
385   interface_list_t *il = calloc(1, sizeof(*il));
386 
387   if (il == NULL) {
388     ERROR("connectivity plugin: calloc failed during add_interface: %s",
389           STRERRNO);
390     return NULL;
391   }
392 
393   char *interface2 = strdup(interface);
394   if (interface2 == NULL) {
395     sfree(il);
396     ERROR("connectivity plugin: strdup failed during add_interface: %s",
397           STRERRNO);
398     return NULL;
399   }
400 
401   il->interface = interface2;
402   il->status = status;
403   il->prev_status = prev_status;
404   il->timestamp = cdtime();
405   il->sent = 0;
406   il->next = interface_list_head;
407   interface_list_head = il;
408 
409   DEBUG("connectivity plugin: added interface %s", interface2);
410 
411   return il;
412 }
413 
connectivity_link_state(struct nlmsghdr * msg)414 static int connectivity_link_state(struct nlmsghdr *msg) {
415   pthread_mutex_lock(&connectivity_data_lock);
416 
417   struct nlattr *attr;
418   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
419 
420   /* Scan attribute list for device name. */
421   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
422     if (mnl_attr_get_type(attr) != IFLA_IFNAME)
423       continue;
424 
425     if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
426       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
427             "mnl_attr_validate "
428             "failed.");
429       pthread_mutex_unlock(&connectivity_data_lock);
430       return MNL_CB_ERROR;
431     }
432 
433     const char *dev = mnl_attr_get_str(attr);
434 
435     // Check the list of interfaces we should monitor, if we've chosen
436     // a subset.  If we don't care about this one, abort.
437     if (ignorelist_match(ignorelist, dev) != 0) {
438       DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
439             "interface: %s",
440             dev);
441       break;
442     }
443 
444     interface_list_t *il = NULL;
445 
446     for (il = interface_list_head; il != NULL; il = il->next)
447       if (strcmp(dev, il->interface) == 0)
448         break;
449 
450     if (il == NULL) {
451       // We haven't encountered this interface yet, so add it to the linked list
452       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
453 
454       if (il == NULL) {
455         ERROR("connectivity plugin: unable to add interface %s during "
456               "connectivity_link_state",
457               dev);
458         return MNL_CB_ERROR;
459       }
460     }
461 
462     uint32_t prev_status = il->status;
463     il->status =
464         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
465     il->timestamp = cdtime();
466 
467     // If the new status is different than the previous status,
468     // store the previous status and set sent to zero, and set the
469     // global flag to indicate there are statuses to dispatch
470     if (il->status != prev_status) {
471       il->prev_status = prev_status;
472       il->sent = 0;
473       statuses_to_send = 1;
474     }
475 
476     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
477           (unsigned long long)il->timestamp, dev,
478           ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
479 
480     // no need to loop again, we found the interface name attr
481     // (otherwise the first if-statement in the loop would
482     // have moved us on with 'continue')
483     break;
484   }
485 
486   pthread_mutex_unlock(&connectivity_data_lock);
487 
488   return 0;
489 }
490 
/*
 * Netlink message callback handed to read_event(). RTM_NEWLINK messages are
 * forwarded to connectivity_link_state(); every other message type is
 * accepted without processing and yields 0.
 */
static int msg_handler(struct nlmsghdr *msg) {
  return (msg->nlmsg_type == RTM_NEWLINK) ? connectivity_link_state(msg) : 0;
}
498 
read_event(int (* msg_handler)(struct nlmsghdr *))499 static int read_event(int (*msg_handler)(struct nlmsghdr *)) {
500   int ret = 0;
501   int recv_flags = MSG_DONTWAIT;
502 
503   if (nl_sock == -1 || msg_handler == NULL)
504     return EINVAL;
505 
506   while (42) {
507     pthread_mutex_lock(&connectivity_threads_lock);
508 
509     if (connectivity_netlink_thread_loop <= 0) {
510       pthread_mutex_unlock(&connectivity_threads_lock);
511       return ret;
512     }
513 
514     pthread_mutex_unlock(&connectivity_threads_lock);
515 
516     char buf[4096];
517     int status = recv(nl_sock, buf, sizeof(buf), recv_flags);
518 
519     if (status < 0) {
520 
521       // If there were no more messages to drain from the socket,
522       // then signal the dequeue thread and allow it to dispatch
523       // any saved interface status changes.  Then continue, but
524       // block and wait for new messages
525       if (errno == EWOULDBLOCK || errno == EAGAIN) {
526         pthread_cond_signal(&connectivity_cond);
527 
528         recv_flags = 0;
529         continue;
530       }
531 
532       if (errno == EINTR) {
533         // Interrupt, so just continue and try again
534         continue;
535       }
536 
537       /* Anything else is an error */
538       ERROR("connectivity plugin: read_event: Error recv: %d", status);
539       return status;
540     }
541 
542     // Message received successfully, so we'll stop blocking on the
543     // receive call for now (until we get a "would block" error, which
544     // will be handled above)
545     recv_flags = MSG_DONTWAIT;
546 
547     if (status == 0) {
548       DEBUG("connectivity plugin: read_event: EOF");
549     }
550 
551     /* We need to handle more than one message per 'recvmsg' */
552     for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
553          NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
554       /* Finish reading */
555       if (h->nlmsg_type == NLMSG_DONE)
556         return ret;
557 
558       /* Message is some kind of error */
559       if (h->nlmsg_type == NLMSG_ERROR) {
560         struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h);
561         ERROR("connectivity plugin: read_event: Message is an error: %d",
562               l_err->error);
563         return -1; // Error
564       }
565 
566       /* Call message handler */
567       if (msg_handler) {
568         ret = (*msg_handler)(h);
569         if (ret < 0) {
570           ERROR("connectivity plugin: read_event: Message handler error %d",
571                 ret);
572           return ret;
573         }
574       } else {
575         ERROR("connectivity plugin: read_event: Error NULL message handler");
576         return -1;
577       }
578     }
579   }
580 
581   return ret;
582 }
583 
/*
 * Builds and dispatches a collectd notification for one link state change.
 *
 * interface  interface name; becomes the notification's plugin instance
 * value      new link state (LINK_STATE_UP yields NOTIF_OKAY, anything else
 *            NOTIF_FAILURE)
 * old_value  previous link state, only used inside the JSON payload
 * timestamp  time at which the change was observed
 *
 * The JSON document produced by gen_message_payload() is attached as the
 * "ves" metadata string. Nothing is dispatched when the document cannot be
 * generated or attached.
 */
static void connectivity_dispatch_notification(const char *interface,
                                               gauge_t value, gauge_t old_value,
                                               cdtime_t timestamp) {

  notification_t n = {
      .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
      .time = cdtime(),
      .plugin = "connectivity",
      .type = "gauge",
      .type_instance = "interface_status",
  };

  sstrncpy(n.host, hostname_g, sizeof(n.host));
  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));

  char *buf = NULL;

  // Stop here when no payload could be produced: buf is still NULL in that
  // case and must not be attached as metadata.
  if (gen_message_payload((int)value, (int)old_value, interface, timestamp,
                          &buf) != 0) {
    ERROR("connectivity plugin: unable to generate VES payload for "
          "interface %s",
          interface);
    return;
  }

  int status = plugin_notification_meta_add_string(&n, "ves", buf);

  if (status < 0) {
    ERROR("connectivity plugin: unable to set notification VES metadata: %s",
          STRERRNO);
    sfree(buf);
    return;
  }

  DEBUG("connectivity plugin: notification VES metadata: %s",
        n.meta->nm_value.nm_string);

  DEBUG("connectivity plugin: dispatching state %d for interface %s",
        (int)value, interface);

  plugin_dispatch_notification(&n);
  plugin_notification_meta_free(n.meta);

  // strdup'd in gen_message_payload
  sfree(buf);
}
625 
626 // NOTE: Caller MUST hold connectivity_data_lock when calling this function
send_interface_status()627 static void send_interface_status() {
628   for (interface_list_t *il = interface_list_head; il != NULL;
629        il = il->next) /* {{{ */
630   {
631     uint32_t status = il->status;
632     uint32_t prev_status = il->prev_status;
633     uint32_t sent = il->sent;
634 
635     if (status != prev_status && sent == 0) {
636       connectivity_dispatch_notification(il->interface, status, prev_status,
637                                          il->timestamp);
638       il->sent = 1;
639     }
640   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
641 
642   statuses_to_send = 0;
643 }
644 
read_interface_status()645 static void read_interface_status() /* {{{ */
646 {
647   pthread_mutex_lock(&connectivity_data_lock);
648 
649   // If we don't have any interface statuses to dispatch,
650   // then we wait until signalled
651   if (!statuses_to_send)
652     pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
653 
654   send_interface_status();
655 
656   pthread_mutex_unlock(&connectivity_data_lock);
657 } /* }}} int *read_interface_status */
658 
connectivity_netlink_thread(void * arg)659 static void *connectivity_netlink_thread(void *arg) /* {{{ */
660 {
661   pthread_mutex_lock(&connectivity_threads_lock);
662 
663   while (connectivity_netlink_thread_loop > 0) {
664     pthread_mutex_unlock(&connectivity_threads_lock);
665 
666     int status = read_event(msg_handler);
667 
668     pthread_mutex_lock(&connectivity_threads_lock);
669 
670     if (status < 0) {
671       connectivity_netlink_thread_error = 1;
672       break;
673     }
674   } /* while (connectivity_netlink_thread_loop > 0) */
675 
676   pthread_mutex_unlock(&connectivity_threads_lock);
677 
678   return (void *)0;
679 } /* }}} void *connectivity_netlink_thread */
680 
connectivity_dequeue_thread(void * arg)681 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
682 {
683   pthread_mutex_lock(&connectivity_threads_lock);
684 
685   while (connectivity_dequeue_thread_loop > 0) {
686     pthread_mutex_unlock(&connectivity_threads_lock);
687 
688     read_interface_status();
689 
690     pthread_mutex_lock(&connectivity_threads_lock);
691   } /* while (connectivity_dequeue_thread_loop > 0) */
692 
693   pthread_mutex_unlock(&connectivity_threads_lock);
694 
695   return ((void *)0);
696 } /* }}} void *connectivity_dequeue_thread */
697 
nl_connect()698 static int nl_connect() {
699   struct sockaddr_nl sa_nl = {
700       .nl_family = AF_NETLINK,
701       .nl_groups = RTMGRP_LINK,
702       .nl_pid = getpid(),
703   };
704 
705   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
706   if (nl_sock == -1) {
707     ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
708     return -1;
709   }
710 
711   int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
712   if (rc == -1) {
713     ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
714     close(nl_sock);
715     nl_sock = -1;
716     return -1;
717   }
718 
719   return 0;
720 }
721 
start_netlink_thread(void)722 static int start_netlink_thread(void) /* {{{ */
723 {
724   pthread_mutex_lock(&connectivity_threads_lock);
725 
726   if (connectivity_netlink_thread_loop != 0) {
727     pthread_mutex_unlock(&connectivity_threads_lock);
728     return 0;
729   }
730 
731   connectivity_netlink_thread_loop = 1;
732   connectivity_netlink_thread_error = 0;
733 
734   int status;
735 
736   if (nl_sock == -1) {
737     status = nl_connect();
738 
739     if (status != 0) {
740       pthread_mutex_unlock(&connectivity_threads_lock);
741       return status;
742     }
743   }
744 
745   status = plugin_thread_create(&connectivity_netlink_thread_id,
746                                 connectivity_netlink_thread,
747                                 /* arg = */ (void *)0, "connectivity");
748   if (status != 0) {
749     connectivity_netlink_thread_loop = 0;
750     ERROR("connectivity plugin: Starting thread failed.");
751     pthread_mutex_unlock(&connectivity_threads_lock);
752 
753     int status2 = close(nl_sock);
754 
755     if (status2 != 0) {
756       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
757             status2, STRERRNO);
758     }
759 
760     nl_sock = -1;
761 
762     return -1;
763   }
764 
765   pthread_mutex_unlock(&connectivity_threads_lock);
766 
767   return status;
768 }
769 
start_dequeue_thread(void)770 static int start_dequeue_thread(void) /* {{{ */
771 {
772   pthread_mutex_lock(&connectivity_threads_lock);
773 
774   if (connectivity_dequeue_thread_loop != 0) {
775     pthread_mutex_unlock(&connectivity_threads_lock);
776     return 0;
777   }
778 
779   connectivity_dequeue_thread_loop = 1;
780 
781   int status = plugin_thread_create(&connectivity_dequeue_thread_id,
782                                     connectivity_dequeue_thread,
783                                     /* arg = */ (void *)0, "connectivity");
784   if (status != 0) {
785     connectivity_dequeue_thread_loop = 0;
786     ERROR("connectivity plugin: Starting dequeue thread failed.");
787     pthread_mutex_unlock(&connectivity_threads_lock);
788     return -1;
789   }
790 
791   pthread_mutex_unlock(&connectivity_threads_lock);
792 
793   return status;
794 } /* }}} int start_dequeue_thread */
795 
/*
 * Starts both worker threads. Both starts are always attempted; the netlink
 * thread's error takes precedence in the returned status.
 */
static int start_threads(void) /* {{{ */
{
  int netlink_status = start_netlink_thread();
  int dequeue_status = start_dequeue_thread();

  return (netlink_status != 0) ? netlink_status : dequeue_status;
} /* }}} int start_threads */
806 
stop_netlink_thread(int shutdown)807 static int stop_netlink_thread(int shutdown) /* {{{ */
808 {
809   int socket_status;
810 
811   if (nl_sock != -1) {
812     socket_status = close(nl_sock);
813     if (socket_status != 0) {
814       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
815             socket_status, STRERRNO);
816     }
817 
818     nl_sock = -1;
819   } else
820     socket_status = 0;
821 
822   pthread_mutex_lock(&connectivity_threads_lock);
823 
824   if (connectivity_netlink_thread_loop == 0) {
825     pthread_mutex_unlock(&connectivity_threads_lock);
826     // Thread has already been terminated, nothing more to attempt
827     return socket_status;
828   }
829 
830   // Set thread termination status
831   connectivity_netlink_thread_loop = 0;
832   pthread_mutex_unlock(&connectivity_threads_lock);
833 
834   // Let threads waiting on access to the interface list know to move
835   // on such that they'll see the thread's termination status
836   pthread_cond_broadcast(&connectivity_cond);
837 
838   int thread_status;
839 
840   if (shutdown == 1) {
841     // Since the thread is blocking, calling pthread_join
842     // doesn't actually succeed in stopping it.  It will stick around
843     // until a NETLINK message is received on the socket (at which
844     // it will realize that "connectivity_netlink_thread_loop" is 0 and will
845     // break out of the read loop and be allowed to die).  This is
846     // fine when the process isn't supposed to be exiting, but in
847     // the case of a process shutdown, we don't want to have an
848     // idle thread hanging around.  Calling pthread_cancel here in
849     // the case of a shutdown is just assures that the thread is
850     // gone and that the process has been fully terminated.
851 
852     DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
853 
854     thread_status = pthread_cancel(connectivity_netlink_thread_id);
855 
856     if (thread_status != 0 && thread_status != ESRCH) {
857       ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
858             thread_status);
859       thread_status = -1;
860     } else
861       thread_status = 0;
862   } else {
863     thread_status =
864         pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
865     if (thread_status != 0 && thread_status != ESRCH) {
866       ERROR("connectivity plugin: Stopping netlink thread failed: %d",
867             thread_status);
868       thread_status = -1;
869     } else
870       thread_status = 0;
871   }
872 
873   pthread_mutex_lock(&connectivity_threads_lock);
874   memset(&connectivity_netlink_thread_id, 0,
875          sizeof(connectivity_netlink_thread_id));
876   connectivity_netlink_thread_error = 0;
877   pthread_mutex_unlock(&connectivity_threads_lock);
878 
879   DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
880 
881   if (socket_status != 0)
882     return socket_status;
883   else
884     return thread_status;
885 }
886 
stop_dequeue_thread()887 static int stop_dequeue_thread() /* {{{ */
888 {
889   pthread_mutex_lock(&connectivity_threads_lock);
890 
891   if (connectivity_dequeue_thread_loop == 0) {
892     pthread_mutex_unlock(&connectivity_threads_lock);
893     return -1;
894   }
895 
896   // Set thread termination status
897   connectivity_dequeue_thread_loop = 0;
898   pthread_mutex_unlock(&connectivity_threads_lock);
899 
900   // Let threads waiting on access to the interface list know to move
901   // on such that they'll see the threads termination status
902   pthread_cond_broadcast(&connectivity_cond);
903 
904   // Calling pthread_cancel here just assures that the thread is
905   // gone and that the process has been fully terminated.
906 
907   DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
908 
909   int status = pthread_cancel(connectivity_dequeue_thread_id);
910 
911   if (status != 0 && status != ESRCH) {
912     ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
913     status = -1;
914   } else
915     status = 0;
916 
917   pthread_mutex_lock(&connectivity_threads_lock);
918   memset(&connectivity_dequeue_thread_id, 0,
919          sizeof(connectivity_dequeue_thread_id));
920   pthread_mutex_unlock(&connectivity_threads_lock);
921 
922   DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
923 
924   return status;
925 } /* }}} int stop_dequeue_thread */
926 
/*
 * Stops both worker threads for process shutdown. Both stops are always
 * attempted; the netlink thread's error takes precedence in the returned
 * status.
 */
static int stop_threads() /* {{{ */
{
  int netlink_status = stop_netlink_thread(1);
  int dequeue_status = stop_dequeue_thread();

  return (netlink_status != 0) ? netlink_status : dequeue_status;
} /* }}} int stop_threads */
937 
connectivity_init(void)938 static int connectivity_init(void) /* {{{ */
939 {
940   if (monitor_all_interfaces) {
941     NOTICE("connectivity plugin: No interfaces have been selected, so all will "
942            "be monitored");
943   }
944 
945   return start_threads();
946 } /* }}} int connectivity_init */
947 
connectivity_config(const char * key,const char * value)948 static int connectivity_config(const char *key, const char *value) /* {{{ */
949 {
950   if (ignorelist == NULL) {
951     ignorelist = ignorelist_create(/* invert = */ 1);
952 
953     if (ignorelist == NULL)
954       return -1;
955   }
956 
957   if (strcasecmp(key, "Interface") == 0) {
958     ignorelist_add(ignorelist, value);
959     monitor_all_interfaces = 0;
960   } else if (strcasecmp(key, "IgnoreSelected") == 0) {
961     int invert = 1;
962     if (IS_TRUE(value))
963       invert = 0;
964     ignorelist_set_invert(ignorelist, invert);
965   } else {
966     return -1;
967   }
968 
969   return 0;
970 } /* }}} int connectivity_config */
971 
connectivity_read(void)972 static int connectivity_read(void) /* {{{ */
973 {
974   pthread_mutex_lock(&connectivity_threads_lock);
975 
976   if (connectivity_netlink_thread_error != 0) {
977 
978     pthread_mutex_unlock(&connectivity_threads_lock);
979 
980     ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
981           "it.");
982 
983     stop_netlink_thread(0);
984 
985     for (interface_list_t *il = interface_list_head; il != NULL;
986          il = il->next) {
987       il->status = LINK_STATE_UNKNOWN;
988       il->prev_status = LINK_STATE_UNKNOWN;
989       il->sent = 0;
990     }
991 
992     start_netlink_thread();
993 
994     return -1;
995   } /* if (connectivity_netlink_thread_error != 0) */
996 
997   pthread_mutex_unlock(&connectivity_threads_lock);
998 
999   return 0;
1000 } /* }}} int connectivity_read */
1001 
connectivity_shutdown(void)1002 static int connectivity_shutdown(void) /* {{{ */
1003 {
1004   DEBUG("connectivity plugin: Shutting down thread.");
1005 
1006   int status = stop_threads();
1007 
1008   interface_list_t *il = interface_list_head;
1009   while (il != NULL) {
1010     interface_list_t *il_next;
1011 
1012     il_next = il->next;
1013 
1014     sfree(il->interface);
1015     sfree(il);
1016 
1017     il = il_next;
1018   }
1019 
1020   ignorelist_free(ignorelist);
1021 
1022   return status;
1023 } /* }}} int connectivity_shutdown */
1024 
module_register(void)1025 void module_register(void) {
1026   plugin_register_config("connectivity", connectivity_config, config_keys,
1027                          config_keys_num);
1028   plugin_register_init("connectivity", connectivity_init);
1029   plugin_register_read("connectivity", connectivity_read);
1030   plugin_register_shutdown("connectivity", connectivity_shutdown);
1031 } /* void module_register */
1032