1 /**
2 * collectd - src/connectivity.c
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a
5 * copy of this software and associated documentation files (the "Software"),
6 * to deal in the Software without restriction, including without limitation
7 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8 * and/or sell copies of the Software, and to permit persons to whom the
9 * Software is furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20 * DEALINGS IN THE SOFTWARE.
21 *
22 * Authors:
23 * Red Hat NFVPE
24 * Andrew Bays <abays at redhat.com>
25 * Aneesh Puttur <aputtur at redhat.com>
26 **/
27
28 #include "collectd.h"
29
30 #include "plugin.h"
31 #include "utils/common/common.h"
32 #include "utils/ignorelist/ignorelist.h"
33 #include "utils_complain.h"
34
35 #include <asm/types.h>
36 #include <errno.h>
37 #include <net/if.h>
38 #include <netinet/in.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <string.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44
45 #include <libmnl/libmnl.h>
46 #include <linux/netlink.h>
47 #include <linux/rtnetlink.h>
48
49 #include <yajl/yajl_common.h>
50 #include <yajl/yajl_gen.h>
51 #if HAVE_YAJL_YAJL_VERSION_H
52 #include <yajl/yajl_version.h>
53 #endif
54 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
55 #define HAVE_YAJL_V2 1
56 #endif
57
/* Netlink protocol alias.
 * NOTE(review): nl_connect() passes NETLINK_ROUTE directly; this alias does
 * not appear to be referenced in this file -- confirm before removing. */
#define MYPROTO NETLINK_ROUTE

/* Values stored in interface_list_t.status / .prev_status. */
#define LINK_STATE_DOWN 0
#define LINK_STATE_UP 1
#define LINK_STATE_UNKNOWN 2

/* Keys and fixed values of the common event header emitted by
 * gen_message_payload(). */
#define CONNECTIVITY_DOMAIN_FIELD "domain"
#define CONNECTIVITY_DOMAIN_VALUE "stateChange"
#define CONNECTIVITY_EVENT_ID_FIELD "eventId"
#define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
#define CONNECTIVITY_PRIORITY_FIELD "priority"
#define CONNECTIVITY_PRIORITY_VALUE "high"
#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
#define CONNECTIVITY_SEQUENCE_FIELD "sequence"
#define CONNECTIVITY_SEQUENCE_VALUE "0"
#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
#define CONNECTIVITY_VERSION_FIELD "version"
#define CONNECTIVITY_VERSION_VALUE "1.0"

/* Keys and fixed values of the nested "stateChangeFields" map emitted by
 * gen_message_payload(). */
#define CONNECTIVITY_NEW_STATE_FIELD "newState"
#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
#define CONNECTIVITY_OLD_STATE_FIELD "oldState"
#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
  "stateChangeFieldsVersion"
#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
94 /*
95 * Private data types
96 */
97
98 struct interface_list_s {
99 char *interface;
100
101 uint32_t status;
102 uint32_t prev_status;
103 uint32_t sent;
104 cdtime_t timestamp;
105
106 struct interface_list_s *next;
107 };
108 typedef struct interface_list_s interface_list_t;
109
110 /*
111 * Private variables
112 */
113
/* Interface filter; created lazily by connectivity_config() and consulted in
 * connectivity_link_state(). */
static ignorelist_t *ignorelist = NULL;

/* Head of the interface list; add_interface() prepends new nodes here. */
static interface_list_t *interface_list_head = NULL;
/* Cleared by connectivity_config() once an "Interface" option is seen; only
 * used to print a notice in connectivity_init(). */
static int monitor_all_interfaces = 1;

/* Run flag (> 0 means "keep running") and error flag of the netlink thread;
 * both are read and written under connectivity_threads_lock. */
static int connectivity_netlink_thread_loop = 0;
static int connectivity_netlink_thread_error = 0;
static pthread_t connectivity_netlink_thread_id;
/* Run flag of the dequeue thread, also under connectivity_threads_lock. */
static int connectivity_dequeue_thread_loop = 0;
static pthread_t connectivity_dequeue_thread_id;
static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
/* Held by connectivity_link_state() and read_interface_status() while they
 * touch the interface list and statuses_to_send. */
static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by read_event() once the socket has been drained and broadcast
 * by the stop_*_thread() functions; waited on in read_interface_status(). */
static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
/* Netlink socket descriptor, -1 while no socket is open. */
static int nl_sock = -1;
/* Counter incremented for every payload built by gen_message_payload(). */
static int event_id = 0;
/* Set to 1 by connectivity_link_state() when a status changed; reset to 0 by
 * send_interface_status(). */
static int statuses_to_send = 0;

/* Configuration keys handled by connectivity_config(). */
static const char *config_keys[] = {"Interface", "IgnoreSelected"};
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
133
134 /*
135 * Private functions
136 */
137
gen_message_payload(int state,int old_state,const char * interface,cdtime_t timestamp,char ** buf)138 static int gen_message_payload(int state, int old_state, const char *interface,
139 cdtime_t timestamp, char **buf) {
140 const unsigned char *buf2;
141 yajl_gen g;
142 char json_str[DATA_MAX_NAME_LEN];
143
144 #if !defined(HAVE_YAJL_V2)
145 yajl_gen_config conf = {0};
146 #endif
147
148 #if HAVE_YAJL_V2
149 size_t len;
150 g = yajl_gen_alloc(NULL);
151 yajl_gen_config(g, yajl_gen_beautify, 0);
152 #else
153 unsigned int len;
154 g = yajl_gen_alloc(&conf, NULL);
155 #endif
156
157 yajl_gen_clear(g);
158
159 // *** BEGIN common event header ***
160
161 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
162 goto err;
163
164 // domain
165 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
166 strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
167 goto err;
168
169 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
170 strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
171 goto err;
172
173 // eventId
174 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
175 strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
176 yajl_gen_status_ok)
177 goto err;
178
179 event_id = event_id + 1;
180 if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
181 goto err;
182 }
183
184 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
185 goto err;
186 }
187
188 // eventName
189 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
190 strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
191 yajl_gen_status_ok)
192 goto err;
193
194 if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
195 (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
196 : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
197 goto err;
198 }
199
200 if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
201 yajl_gen_status_ok) {
202 goto err;
203 }
204
205 // lastEpochMicrosec
206 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
207 strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
208 yajl_gen_status_ok)
209 goto err;
210
211 if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
212 CDTIME_T_TO_US(cdtime())) < 0) {
213 goto err;
214 }
215
216 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
217 goto err;
218 }
219
220 // priority
221 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
222 strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
223 yajl_gen_status_ok)
224 goto err;
225
226 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
227 strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
228 yajl_gen_status_ok)
229 goto err;
230
231 // reportingEntityName
232 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
233 strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
234 yajl_gen_status_ok)
235 goto err;
236
237 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
238 strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
239 yajl_gen_status_ok)
240 goto err;
241
242 // sequence
243 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
244 strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
245 yajl_gen_status_ok)
246 goto err;
247
248 if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
249 strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
250 yajl_gen_status_ok)
251 goto err;
252
253 // sourceName
254 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
255 strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
256 yajl_gen_status_ok)
257 goto err;
258
259 if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
260 yajl_gen_status_ok)
261 goto err;
262
263 // startEpochMicrosec
264 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
265 strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
266 yajl_gen_status_ok)
267 goto err;
268
269 if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
270 CDTIME_T_TO_US(timestamp)) < 0) {
271 goto err;
272 }
273
274 if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
275 goto err;
276 }
277
278 // version
279 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
280 strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
281 goto err;
282
283 if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
284 strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
285 goto err;
286
287 // *** END common event header ***
288
289 // *** BEGIN state change fields ***
290
291 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
292 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
293 yajl_gen_status_ok)
294 goto err;
295
296 if (yajl_gen_map_open(g) != yajl_gen_status_ok)
297 goto err;
298
299 // newState
300 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
301 strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
302 yajl_gen_status_ok)
303 goto err;
304
305 int new_state_len =
306 (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
307 : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
308
309 if (yajl_gen_string(g,
310 (u_char *)(state == 0
311 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
312 : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
313 new_state_len) != yajl_gen_status_ok)
314 goto err;
315
316 // oldState
317 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
318 strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
319 yajl_gen_status_ok)
320 goto err;
321
322 int old_state_len =
323 (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
324 : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
325
326 if (yajl_gen_string(g,
327 (u_char *)(old_state == 0
328 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
329 : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
330 old_state_len) != yajl_gen_status_ok)
331 goto err;
332
333 // stateChangeFieldsVersion
334 if (yajl_gen_string(g,
335 (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
336 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
337 yajl_gen_status_ok)
338 goto err;
339
340 if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
341 strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
342 yajl_gen_status_ok)
343 goto err;
344
345 // stateInterface
346 if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
347 strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
348 yajl_gen_status_ok)
349 goto err;
350
351 if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
352 yajl_gen_status_ok)
353 goto err;
354
355 // close state change and header fields
356 if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
357 yajl_gen_map_close(g) != yajl_gen_status_ok)
358 goto err;
359
360 // *** END state change fields ***
361
362 if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
363 goto err;
364
365 *buf = strdup((char *)buf2);
366
367 if (*buf == NULL) {
368 ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
369 STRERRNO);
370 goto err;
371 }
372
373 yajl_gen_free(g);
374
375 return 0;
376
377 err:
378 yajl_gen_free(g);
379 ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
380 return -1;
381 }
382
add_interface(const char * interface,int status,int prev_status)383 static interface_list_t *add_interface(const char *interface, int status,
384 int prev_status) {
385 interface_list_t *il = calloc(1, sizeof(*il));
386
387 if (il == NULL) {
388 ERROR("connectivity plugin: calloc failed during add_interface: %s",
389 STRERRNO);
390 return NULL;
391 }
392
393 char *interface2 = strdup(interface);
394 if (interface2 == NULL) {
395 sfree(il);
396 ERROR("connectivity plugin: strdup failed during add_interface: %s",
397 STRERRNO);
398 return NULL;
399 }
400
401 il->interface = interface2;
402 il->status = status;
403 il->prev_status = prev_status;
404 il->timestamp = cdtime();
405 il->sent = 0;
406 il->next = interface_list_head;
407 interface_list_head = il;
408
409 DEBUG("connectivity plugin: added interface %s", interface2);
410
411 return il;
412 }
413
connectivity_link_state(struct nlmsghdr * msg)414 static int connectivity_link_state(struct nlmsghdr *msg) {
415 pthread_mutex_lock(&connectivity_data_lock);
416
417 struct nlattr *attr;
418 struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
419
420 /* Scan attribute list for device name. */
421 mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
422 if (mnl_attr_get_type(attr) != IFLA_IFNAME)
423 continue;
424
425 if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
426 ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
427 "mnl_attr_validate "
428 "failed.");
429 pthread_mutex_unlock(&connectivity_data_lock);
430 return MNL_CB_ERROR;
431 }
432
433 const char *dev = mnl_attr_get_str(attr);
434
435 // Check the list of interfaces we should monitor, if we've chosen
436 // a subset. If we don't care about this one, abort.
437 if (ignorelist_match(ignorelist, dev) != 0) {
438 DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
439 "interface: %s",
440 dev);
441 break;
442 }
443
444 interface_list_t *il = NULL;
445
446 for (il = interface_list_head; il != NULL; il = il->next)
447 if (strcmp(dev, il->interface) == 0)
448 break;
449
450 if (il == NULL) {
451 // We haven't encountered this interface yet, so add it to the linked list
452 il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
453
454 if (il == NULL) {
455 ERROR("connectivity plugin: unable to add interface %s during "
456 "connectivity_link_state",
457 dev);
458 return MNL_CB_ERROR;
459 }
460 }
461
462 uint32_t prev_status = il->status;
463 il->status =
464 ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
465 il->timestamp = cdtime();
466
467 // If the new status is different than the previous status,
468 // store the previous status and set sent to zero, and set the
469 // global flag to indicate there are statuses to dispatch
470 if (il->status != prev_status) {
471 il->prev_status = prev_status;
472 il->sent = 0;
473 statuses_to_send = 1;
474 }
475
476 DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
477 (unsigned long long)il->timestamp, dev,
478 ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
479
480 // no need to loop again, we found the interface name attr
481 // (otherwise the first if-statement in the loop would
482 // have moved us on with 'continue')
483 break;
484 }
485
486 pthread_mutex_unlock(&connectivity_data_lock);
487
488 return 0;
489 }
490
msg_handler(struct nlmsghdr * msg)491 static int msg_handler(struct nlmsghdr *msg) {
492 // We are only interested in RTM_NEWLINK messages
493 if (msg->nlmsg_type != RTM_NEWLINK) {
494 return 0;
495 }
496 return connectivity_link_state(msg);
497 }
498
read_event(int (* msg_handler)(struct nlmsghdr *))499 static int read_event(int (*msg_handler)(struct nlmsghdr *)) {
500 int ret = 0;
501 int recv_flags = MSG_DONTWAIT;
502
503 if (nl_sock == -1 || msg_handler == NULL)
504 return EINVAL;
505
506 while (42) {
507 pthread_mutex_lock(&connectivity_threads_lock);
508
509 if (connectivity_netlink_thread_loop <= 0) {
510 pthread_mutex_unlock(&connectivity_threads_lock);
511 return ret;
512 }
513
514 pthread_mutex_unlock(&connectivity_threads_lock);
515
516 char buf[4096];
517 int status = recv(nl_sock, buf, sizeof(buf), recv_flags);
518
519 if (status < 0) {
520
521 // If there were no more messages to drain from the socket,
522 // then signal the dequeue thread and allow it to dispatch
523 // any saved interface status changes. Then continue, but
524 // block and wait for new messages
525 if (errno == EWOULDBLOCK || errno == EAGAIN) {
526 pthread_cond_signal(&connectivity_cond);
527
528 recv_flags = 0;
529 continue;
530 }
531
532 if (errno == EINTR) {
533 // Interrupt, so just continue and try again
534 continue;
535 }
536
537 /* Anything else is an error */
538 ERROR("connectivity plugin: read_event: Error recv: %d", status);
539 return status;
540 }
541
542 // Message received successfully, so we'll stop blocking on the
543 // receive call for now (until we get a "would block" error, which
544 // will be handled above)
545 recv_flags = MSG_DONTWAIT;
546
547 if (status == 0) {
548 DEBUG("connectivity plugin: read_event: EOF");
549 }
550
551 /* We need to handle more than one message per 'recvmsg' */
552 for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
553 NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
554 /* Finish reading */
555 if (h->nlmsg_type == NLMSG_DONE)
556 return ret;
557
558 /* Message is some kind of error */
559 if (h->nlmsg_type == NLMSG_ERROR) {
560 struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h);
561 ERROR("connectivity plugin: read_event: Message is an error: %d",
562 l_err->error);
563 return -1; // Error
564 }
565
566 /* Call message handler */
567 if (msg_handler) {
568 ret = (*msg_handler)(h);
569 if (ret < 0) {
570 ERROR("connectivity plugin: read_event: Message handler error %d",
571 ret);
572 return ret;
573 }
574 } else {
575 ERROR("connectivity plugin: read_event: Error NULL message handler");
576 return -1;
577 }
578 }
579 }
580
581 return ret;
582 }
583
/*
 * Dispatch a notification for a link state change on `interface`.
 *
 * value:     new link state; LINK_STATE_UP yields NOTIF_OKAY, anything else
 *            NOTIF_FAILURE.
 * old_value: previous link state, only used in the JSON payload.
 * timestamp: time of the state change, only used in the JSON payload.
 *
 * The JSON built by gen_message_payload() is attached as the "ves" meta
 * entry. Nothing is dispatched if the payload cannot be generated or
 * attached.
 */
static void connectivity_dispatch_notification(const char *interface,
                                               gauge_t value, gauge_t old_value,
                                               cdtime_t timestamp) {

  notification_t n = {
      .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
      .time = cdtime(),
      .plugin = "connectivity",
      .type = "gauge",
      .type_instance = "interface_status",
  };

  sstrncpy(n.host, hostname_g, sizeof(n.host));
  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));

  char *buf = NULL;

  // Without a payload there is nothing meaningful to attach; stop here
  // instead of handing a NULL string to the meta-data API.
  if (gen_message_payload(value, old_value, interface, timestamp, &buf) != 0) {
    sfree(buf);
    ERROR("connectivity plugin: unable to generate VES payload for interface "
          "%s; notification not dispatched",
          interface);
    return;
  }

  int status = plugin_notification_meta_add_string(&n, "ves", buf);

  if (status < 0) {
    sfree(buf);
    ERROR("connectivity plugin: unable to set notification VES metadata: %s",
          STRERRNO);
    return;
  }

  DEBUG("connectivity plugin: notification VES metadata: %s",
        n.meta->nm_value.nm_string);

  DEBUG("connectivity plugin: dispatching state %d for interface %s",
        (int)value, interface);

  plugin_dispatch_notification(&n);
  plugin_notification_meta_free(n.meta);

  // strdup'd in gen_message_payload
  sfree(buf);
}
625
626 // NOTE: Caller MUST hold connectivity_data_lock when calling this function
send_interface_status()627 static void send_interface_status() {
628 for (interface_list_t *il = interface_list_head; il != NULL;
629 il = il->next) /* {{{ */
630 {
631 uint32_t status = il->status;
632 uint32_t prev_status = il->prev_status;
633 uint32_t sent = il->sent;
634
635 if (status != prev_status && sent == 0) {
636 connectivity_dispatch_notification(il->interface, status, prev_status,
637 il->timestamp);
638 il->sent = 1;
639 }
640 } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
641
642 statuses_to_send = 0;
643 }
644
read_interface_status()645 static void read_interface_status() /* {{{ */
646 {
647 pthread_mutex_lock(&connectivity_data_lock);
648
649 // If we don't have any interface statuses to dispatch,
650 // then we wait until signalled
651 if (!statuses_to_send)
652 pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
653
654 send_interface_status();
655
656 pthread_mutex_unlock(&connectivity_data_lock);
657 } /* }}} int *read_interface_status */
658
connectivity_netlink_thread(void * arg)659 static void *connectivity_netlink_thread(void *arg) /* {{{ */
660 {
661 pthread_mutex_lock(&connectivity_threads_lock);
662
663 while (connectivity_netlink_thread_loop > 0) {
664 pthread_mutex_unlock(&connectivity_threads_lock);
665
666 int status = read_event(msg_handler);
667
668 pthread_mutex_lock(&connectivity_threads_lock);
669
670 if (status < 0) {
671 connectivity_netlink_thread_error = 1;
672 break;
673 }
674 } /* while (connectivity_netlink_thread_loop > 0) */
675
676 pthread_mutex_unlock(&connectivity_threads_lock);
677
678 return (void *)0;
679 } /* }}} void *connectivity_netlink_thread */
680
connectivity_dequeue_thread(void * arg)681 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
682 {
683 pthread_mutex_lock(&connectivity_threads_lock);
684
685 while (connectivity_dequeue_thread_loop > 0) {
686 pthread_mutex_unlock(&connectivity_threads_lock);
687
688 read_interface_status();
689
690 pthread_mutex_lock(&connectivity_threads_lock);
691 } /* while (connectivity_dequeue_thread_loop > 0) */
692
693 pthread_mutex_unlock(&connectivity_threads_lock);
694
695 return ((void *)0);
696 } /* }}} void *connectivity_dequeue_thread */
697
nl_connect()698 static int nl_connect() {
699 struct sockaddr_nl sa_nl = {
700 .nl_family = AF_NETLINK,
701 .nl_groups = RTMGRP_LINK,
702 .nl_pid = getpid(),
703 };
704
705 nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
706 if (nl_sock == -1) {
707 ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
708 return -1;
709 }
710
711 int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
712 if (rc == -1) {
713 ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
714 close(nl_sock);
715 nl_sock = -1;
716 return -1;
717 }
718
719 return 0;
720 }
721
start_netlink_thread(void)722 static int start_netlink_thread(void) /* {{{ */
723 {
724 pthread_mutex_lock(&connectivity_threads_lock);
725
726 if (connectivity_netlink_thread_loop != 0) {
727 pthread_mutex_unlock(&connectivity_threads_lock);
728 return 0;
729 }
730
731 connectivity_netlink_thread_loop = 1;
732 connectivity_netlink_thread_error = 0;
733
734 int status;
735
736 if (nl_sock == -1) {
737 status = nl_connect();
738
739 if (status != 0) {
740 pthread_mutex_unlock(&connectivity_threads_lock);
741 return status;
742 }
743 }
744
745 status = plugin_thread_create(&connectivity_netlink_thread_id,
746 connectivity_netlink_thread,
747 /* arg = */ (void *)0, "connectivity");
748 if (status != 0) {
749 connectivity_netlink_thread_loop = 0;
750 ERROR("connectivity plugin: Starting thread failed.");
751 pthread_mutex_unlock(&connectivity_threads_lock);
752
753 int status2 = close(nl_sock);
754
755 if (status2 != 0) {
756 ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
757 status2, STRERRNO);
758 }
759
760 nl_sock = -1;
761
762 return -1;
763 }
764
765 pthread_mutex_unlock(&connectivity_threads_lock);
766
767 return status;
768 }
769
start_dequeue_thread(void)770 static int start_dequeue_thread(void) /* {{{ */
771 {
772 pthread_mutex_lock(&connectivity_threads_lock);
773
774 if (connectivity_dequeue_thread_loop != 0) {
775 pthread_mutex_unlock(&connectivity_threads_lock);
776 return 0;
777 }
778
779 connectivity_dequeue_thread_loop = 1;
780
781 int status = plugin_thread_create(&connectivity_dequeue_thread_id,
782 connectivity_dequeue_thread,
783 /* arg = */ (void *)0, "connectivity");
784 if (status != 0) {
785 connectivity_dequeue_thread_loop = 0;
786 ERROR("connectivity plugin: Starting dequeue thread failed.");
787 pthread_mutex_unlock(&connectivity_threads_lock);
788 return -1;
789 }
790
791 pthread_mutex_unlock(&connectivity_threads_lock);
792
793 return status;
794 } /* }}} int start_dequeue_thread */
795
/*
 * Start both worker threads. Both start functions are always called; the
 * netlink thread's status takes precedence in the return value, the dequeue
 * thread's status is reported only if the netlink thread started cleanly.
 */
static int start_threads(void) /* {{{ */
{
  const int netlink_rc = start_netlink_thread();
  const int dequeue_rc = start_dequeue_thread();

  return (netlink_rc != 0) ? netlink_rc : dequeue_rc;
} /* }}} int start_threads */
806
stop_netlink_thread(int shutdown)807 static int stop_netlink_thread(int shutdown) /* {{{ */
808 {
809 int socket_status;
810
811 if (nl_sock != -1) {
812 socket_status = close(nl_sock);
813 if (socket_status != 0) {
814 ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
815 socket_status, STRERRNO);
816 }
817
818 nl_sock = -1;
819 } else
820 socket_status = 0;
821
822 pthread_mutex_lock(&connectivity_threads_lock);
823
824 if (connectivity_netlink_thread_loop == 0) {
825 pthread_mutex_unlock(&connectivity_threads_lock);
826 // Thread has already been terminated, nothing more to attempt
827 return socket_status;
828 }
829
830 // Set thread termination status
831 connectivity_netlink_thread_loop = 0;
832 pthread_mutex_unlock(&connectivity_threads_lock);
833
834 // Let threads waiting on access to the interface list know to move
835 // on such that they'll see the thread's termination status
836 pthread_cond_broadcast(&connectivity_cond);
837
838 int thread_status;
839
840 if (shutdown == 1) {
841 // Since the thread is blocking, calling pthread_join
842 // doesn't actually succeed in stopping it. It will stick around
843 // until a NETLINK message is received on the socket (at which
844 // it will realize that "connectivity_netlink_thread_loop" is 0 and will
845 // break out of the read loop and be allowed to die). This is
846 // fine when the process isn't supposed to be exiting, but in
847 // the case of a process shutdown, we don't want to have an
848 // idle thread hanging around. Calling pthread_cancel here in
849 // the case of a shutdown is just assures that the thread is
850 // gone and that the process has been fully terminated.
851
852 DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
853
854 thread_status = pthread_cancel(connectivity_netlink_thread_id);
855
856 if (thread_status != 0 && thread_status != ESRCH) {
857 ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
858 thread_status);
859 thread_status = -1;
860 } else
861 thread_status = 0;
862 } else {
863 thread_status =
864 pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
865 if (thread_status != 0 && thread_status != ESRCH) {
866 ERROR("connectivity plugin: Stopping netlink thread failed: %d",
867 thread_status);
868 thread_status = -1;
869 } else
870 thread_status = 0;
871 }
872
873 pthread_mutex_lock(&connectivity_threads_lock);
874 memset(&connectivity_netlink_thread_id, 0,
875 sizeof(connectivity_netlink_thread_id));
876 connectivity_netlink_thread_error = 0;
877 pthread_mutex_unlock(&connectivity_threads_lock);
878
879 DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
880
881 if (socket_status != 0)
882 return socket_status;
883 else
884 return thread_status;
885 }
886
stop_dequeue_thread()887 static int stop_dequeue_thread() /* {{{ */
888 {
889 pthread_mutex_lock(&connectivity_threads_lock);
890
891 if (connectivity_dequeue_thread_loop == 0) {
892 pthread_mutex_unlock(&connectivity_threads_lock);
893 return -1;
894 }
895
896 // Set thread termination status
897 connectivity_dequeue_thread_loop = 0;
898 pthread_mutex_unlock(&connectivity_threads_lock);
899
900 // Let threads waiting on access to the interface list know to move
901 // on such that they'll see the threads termination status
902 pthread_cond_broadcast(&connectivity_cond);
903
904 // Calling pthread_cancel here just assures that the thread is
905 // gone and that the process has been fully terminated.
906
907 DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
908
909 int status = pthread_cancel(connectivity_dequeue_thread_id);
910
911 if (status != 0 && status != ESRCH) {
912 ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
913 status = -1;
914 } else
915 status = 0;
916
917 pthread_mutex_lock(&connectivity_threads_lock);
918 memset(&connectivity_dequeue_thread_id, 0,
919 sizeof(connectivity_dequeue_thread_id));
920 pthread_mutex_unlock(&connectivity_threads_lock);
921
922 DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
923
924 return status;
925 } /* }}} int stop_dequeue_thread */
926
/*
 * Stop both worker threads for process shutdown. Both stop functions are
 * always called; the netlink thread's status takes precedence in the return
 * value.
 */
static int stop_threads() /* {{{ */
{
  const int netlink_rc = stop_netlink_thread(1);
  const int dequeue_rc = stop_dequeue_thread();

  return (netlink_rc != 0) ? netlink_rc : dequeue_rc;
} /* }}} int stop_threads */
937
connectivity_init(void)938 static int connectivity_init(void) /* {{{ */
939 {
940 if (monitor_all_interfaces) {
941 NOTICE("connectivity plugin: No interfaces have been selected, so all will "
942 "be monitored");
943 }
944
945 return start_threads();
946 } /* }}} int connectivity_init */
947
connectivity_config(const char * key,const char * value)948 static int connectivity_config(const char *key, const char *value) /* {{{ */
949 {
950 if (ignorelist == NULL) {
951 ignorelist = ignorelist_create(/* invert = */ 1);
952
953 if (ignorelist == NULL)
954 return -1;
955 }
956
957 if (strcasecmp(key, "Interface") == 0) {
958 ignorelist_add(ignorelist, value);
959 monitor_all_interfaces = 0;
960 } else if (strcasecmp(key, "IgnoreSelected") == 0) {
961 int invert = 1;
962 if (IS_TRUE(value))
963 invert = 0;
964 ignorelist_set_invert(ignorelist, invert);
965 } else {
966 return -1;
967 }
968
969 return 0;
970 } /* }}} int connectivity_config */
971
connectivity_read(void)972 static int connectivity_read(void) /* {{{ */
973 {
974 pthread_mutex_lock(&connectivity_threads_lock);
975
976 if (connectivity_netlink_thread_error != 0) {
977
978 pthread_mutex_unlock(&connectivity_threads_lock);
979
980 ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
981 "it.");
982
983 stop_netlink_thread(0);
984
985 for (interface_list_t *il = interface_list_head; il != NULL;
986 il = il->next) {
987 il->status = LINK_STATE_UNKNOWN;
988 il->prev_status = LINK_STATE_UNKNOWN;
989 il->sent = 0;
990 }
991
992 start_netlink_thread();
993
994 return -1;
995 } /* if (connectivity_netlink_thread_error != 0) */
996
997 pthread_mutex_unlock(&connectivity_threads_lock);
998
999 return 0;
1000 } /* }}} int connectivity_read */
1001
connectivity_shutdown(void)1002 static int connectivity_shutdown(void) /* {{{ */
1003 {
1004 DEBUG("connectivity plugin: Shutting down thread.");
1005
1006 int status = stop_threads();
1007
1008 interface_list_t *il = interface_list_head;
1009 while (il != NULL) {
1010 interface_list_t *il_next;
1011
1012 il_next = il->next;
1013
1014 sfree(il->interface);
1015 sfree(il);
1016
1017 il = il_next;
1018 }
1019
1020 ignorelist_free(ignorelist);
1021
1022 return status;
1023 } /* }}} int connectivity_shutdown */
1024
module_register(void)1025 void module_register(void) {
1026 plugin_register_config("connectivity", connectivity_config, config_keys,
1027 config_keys_num);
1028 plugin_register_init("connectivity", connectivity_init);
1029 plugin_register_read("connectivity", connectivity_read);
1030 plugin_register_shutdown("connectivity", connectivity_shutdown);
1031 } /* void module_register */
1032