1 /*----------------------------------------------------------------------------*/
2 /* Xymon message daemon. */
3 /* */
4 /* This module receives messages from one channel of the Xymon master daemon. */
5 /* These messages are then forwarded to the actual worker process via stdin; */
6 /* the worker process can process the messages without having to care about */
7 /* the tricky details in the xymond/xymond_channel communications. */
8 /* */
9 /* This program is released under the GNU General Public License (GPL), */
10 /* version 2. See the file "COPYING" for details. */
11 /* */
12 /* Copyright (C) 2004-2011 Henrik Storner <henrik@hswn.dk> */
13 /* */
14 /*----------------------------------------------------------------------------*/
15
/* Version-control identification string; not referenced elsewhere in this file. */
static char rcsid[] = "$Id: xymond_channel.c 7678 2015-10-01 14:42:42Z jccleaver $";
17
18 #include <sys/types.h>
19 #include <sys/ipc.h>
20 #include <sys/sem.h>
21 #include <sys/shm.h>
22 #include <sys/time.h>
23
24 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <arpa/inet.h>
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30 #include <errno.h>
31 #include <netdb.h>
32 #include <fcntl.h>
33
34 #include <sys/wait.h>
35 #include <stdlib.h>
36 #include <unistd.h>
37 #include <string.h>
38
39 #include "libxymon.h"
40
41 #include <signal.h>
42
43
44
/*
 * Our in-memory queue of messages received from xymond via IPC. One queue per peer.
 * Entries form a singly-linked FIFO (xymon_peer_t.msghead .. msgtail). Each entry
 * owns its buffer; flushmessage() frees both the buffer and the entry.
 */
typedef struct xymon_msg_t {
	time_t tstamp;		/* When did the message arrive (gettimer() value, used for staleness) */
	char *buf;		/* The message data (start of the allocated buffer, passed to free) */
	char *bufp;		/* Next char to send; advanced after each partial write() */
	int buflen;		/* How many bytes left to send, counted from bufp */
	struct xymon_msg_t *next;	/* Next (newer) message in the queue, or NULL at the tail */
} xymon_msg_t;
53
54
/*
 * Our list of peers we send data to.
 * Peers live in the global "peers" tree, keyed on peername.
 */
typedef struct xymon_peer_t {
	char *peername;			/* Tree key: "host[:port]" for P_NET peers, "" for the local worker */

	/*
	 * P_DOWN:   not connected; the send loop calls openconnection() (rate-limited).
	 * P_UP:     connected, peersocket is valid.
	 * P_FAILED: a write failed; left alone until a new message for it arrives,
	 *           which resets it to P_DOWN.
	 */
	enum { P_DOWN, P_UP, P_FAILED } peerstatus;
	xymon_msg_t *msghead, *msgtail;	/* Message queue (head = oldest, sent first) */

	enum { P_LOCAL, P_NET } peertype;
	int peersocket;			/* File descriptor receiving the data: TCP socket (P_NET) or pipe write end (P_LOCAL) */
	time_t lastopentime;		/* Last time we attempted to connect to the peer */

	/* For P_NET peers */
	struct sockaddr_in peeraddr;	/* The IP address of the peer */

	/* For P_LOCAL peers */
	char *childcmd;			/* Command and arguments for the child process */
	char **childargs;		/* NULL-terminated argv passed to execvp() */
	pid_t childpid;			/* PID of the running worker child */
} xymon_peer_t;
74
75 void * peers;
76
77 pid_t deadpid = 0;
78 int childexit;
79
80 xymond_channel_t *channel = NULL;
81 char *logfn = NULL;
82 int locatorbased = 0;
83 enum locator_servicetype_t locatorservice = ST_MAX;
84
85 static int running = 1;
86 static int gotalarm = 0;
87 static int pendingcount = 0;
88 static int messagetimeout = 30;
89
90 /*
91 * chksumsize is the space left in front of the message buffer, to
92 * allow room for a message digest checksum to be added to the
93 * message. Since we use an MD5 hash, this will be 32 bytes
94 * plus a one-char marker.
95 */
96 static int checksumsize = 0;
97
addnetpeer(char * peername)98 void addnetpeer(char *peername)
99 {
100 xymon_peer_t *newpeer;
101 struct in_addr addr;
102 char *oneip;
103 int peerport = 0;
104 char *delim;
105
106 dbgprintf("Adding network peer %s\n", peername);
107
108 oneip = strdup(peername);
109
110 delim = strchr(oneip, ':');
111 if (delim) {
112 *delim = '\0';
113 peerport = atoi(delim+1);
114 }
115
116 if (inet_aton(oneip, &addr) == 0) {
117 /* peer is not an IP - do DNS lookup */
118
119 struct hostent *hent;
120
121 hent = gethostbyname(oneip);
122 if (hent) {
123 char *realip;
124
125 memcpy(&addr, *(hent->h_addr_list), sizeof(struct in_addr));
126 realip = inet_ntoa(addr);
127 if (inet_aton(realip, &addr) == 0) {
128 errprintf("Invalid IP address for %s (%s)\n", oneip, realip);
129 goto done;
130 }
131 }
132 else {
133 errprintf("Cannot determine IP address of peer %s\n", oneip);
134 goto done;
135 }
136 }
137
138 if (peerport == 0) peerport = atoi(xgetenv("XYMONDPORT"));
139
140 newpeer = calloc(1, sizeof(xymon_peer_t));
141 newpeer->peername = strdup(peername);
142 newpeer->peerstatus = P_DOWN;
143 newpeer->peertype = P_NET;
144 newpeer->peeraddr.sin_family = AF_INET;
145 newpeer->peeraddr.sin_addr.s_addr = addr.s_addr;
146 newpeer->peeraddr.sin_port = htons(peerport);
147
148 xtreeAdd(peers, newpeer->peername, newpeer);
149
150 done:
151 xfree(oneip);
152 }
153
154
addlocalpeer(char * childcmd,char ** childargs)155 void addlocalpeer(char *childcmd, char **childargs)
156 {
157 xymon_peer_t *newpeer;
158 int i, count;
159
160 dbgprintf("Adding local peer using command %s\n", childcmd);
161
162 for (count=0; (childargs[count]); count++) ;
163
164 newpeer = (xymon_peer_t *)calloc(1, sizeof(xymon_peer_t));
165 newpeer->peername = strdup("");
166 newpeer->peerstatus = P_DOWN;
167 newpeer->peertype = P_LOCAL;
168 newpeer->childcmd = strdup(childcmd);
169 newpeer->childargs = (char **)calloc(count+1, sizeof(char *));
170 for (i=0; (i<count); i++) newpeer->childargs[i] = strdup(childargs[i]);
171
172 xtreeAdd(peers, newpeer->peername, newpeer);
173 }
174
175
/*
 * Try to (re)establish the data path to a peer.
 *
 * P_NET peers:   open a TCP connection to peer->peeraddr.
 * P_LOCAL peers: create a pipe, fork, and exec the worker command with
 *                the pipe's read end as its stdin; we keep the write end.
 *
 * On success the descriptor is stored in peer->peersocket, switched to
 * non-blocking mode, and the peer is marked P_UP. On failure the peer
 * stays P_DOWN and every descriptor created by the attempt is closed.
 * At most one attempt per peer is made every 60 seconds (lastopentime).
 */
void openconnection(xymon_peer_t *peer)
{
	int n;
	int pfd[2];
	pid_t childpid;
	time_t now;
	int flags;

	peer->peerstatus = P_DOWN;

	now = gettimer();
	if (now < (peer->lastopentime + 60)) return;	/* Will only attempt one open per minute */

	dbgprintf("Connecting to peer %s:%d\n", inet_ntoa(peer->peeraddr.sin_addr), ntohs(peer->peeraddr.sin_port));

	peer->lastopentime = now;
	switch (peer->peertype) {
	  case P_NET:
		/* Get a socket, and connect to the peer */
		peer->peersocket = socket(PF_INET, SOCK_STREAM, 0);
		if (peer->peersocket == -1) {
			errprintf("Cannot get socket: %s\n", strerror(errno));
			return;
		}

		n = connect(peer->peersocket, (struct sockaddr *)&peer->peeraddr, sizeof(peer->peeraddr));
		if (n == -1) {
			int connerr = errno;

			errprintf("Cannot connect to peer %s:%d : %s\n",
				  inet_ntoa(peer->peeraddr.sin_addr), ntohs(peer->peeraddr.sin_port),
				  strerror(connerr));
			/* Peer stays P_DOWN; release the socket so periodic retries don't leak descriptors */
			close(peer->peersocket);
			peer->peersocket = -1;
			return;
		}
		break;

	  case P_LOCAL:
		/* Create a pipe to the child handler program, and run it */
		n = pipe(pfd);
		if (n == -1) {
			errprintf("Could not get a pipe: %s\n", strerror(errno));
			return;
		}

		/* Flush stdio first, so buffered output is not written a second time by the child */
		fflush(stdout);
		fflush(stderr);

		childpid = fork();
		if (childpid == -1) {
			errprintf("Could not fork channel handler: %s\n", strerror(errno));
			close(pfd[0]);
			close(pfd[1]);
			return;
		}
		else if (childpid == 0) {
			/* The channel handler child */
			if (logfn) {
				size_t envlen = strlen(logfn) + 30;
				char *logfnenv = (char *)malloc(envlen);

				/* putenv() keeps a pointer to this buffer, so it must not be freed */
				if (logfnenv) {
					snprintf(logfnenv, envlen, "XYMONCHANNEL_LOGFILENAME=%s", logfn);
					putenv(logfnenv);
				}
			}

			dbgprintf("Child '%s' started (PID %d), about to fork\n", peer->childcmd, (int)getpid());

			/* The worker reads its messages from stdin */
			if (pfd[0] != STDIN_FILENO) {
				if (dup2(pfd[0], STDIN_FILENO) == -1) {
					errprintf("Cannot redirect stdin for child command %s: %s\n",
						  peer->childcmd, strerror(errno));
					exit(1);
				}
				close(pfd[0]);
			}
			close(pfd[1]);
			execvp(peer->childcmd, peer->childargs);

			/* We should never go here */
			errprintf("exec() failed for child command %s: %s\n",
				  peer->childcmd, strerror(errno));
			exit(1);
		}

		/* Parent process continues */
		close(pfd[0]);
		peer->peersocket = pfd[1];
		peer->childpid = childpid;
		break;
	}

	/* Writes must never block the main loop; add O_NONBLOCK while keeping existing flags */
	flags = fcntl(peer->peersocket, F_GETFL, 0);
	if ((flags == -1) || (fcntl(peer->peersocket, F_SETFL, flags | O_NONBLOCK) == -1)) {
		errprintf("Cannot make peer descriptor non-blocking: %s\n", strerror(errno));
	}
	peer->peerstatus = P_UP;
	dbgprintf("Peer is UP\n");
}
253
254
255
/*
 * Remove the oldest message (the queue head) from a peer's queue and
 * release it together with its buffer. The queue must not be empty.
 * msgtail is cleared when the queue becomes empty, and the global
 * pendingcount is decremented.
 */
void flushmessage(xymon_peer_t *peer)
{
	xymon_msg_t *oldest = peer->msghead;

	peer->msghead = oldest->next;
	if (!peer->msghead) {
		/* That was the last one - queue is now empty */
		peer->msgtail = NULL;
	}

	xfree(oldest->buf);
	xfree(oldest);
	pendingcount--;
}
269
/*
 * Append one message to the tail of a peer's outbound queue.
 *
 * Ownership of inbuf passes to this function: it ends up in the queue and
 * is freed by flushmessage(), or it is freed here if the queue entry
 * cannot be allocated. inlen is the number of bytes to send from inbuf.
 *
 * A peer that is not P_UP may hold only ONE message (the newest); any
 * older messages queued for it are discarded first. A P_FAILED peer is
 * reset to P_DOWN so the send loop will try to reconnect.
 */
static void addmessage_onepeer(xymon_peer_t *peer, char *inbuf, int inlen)
{
	xymon_msg_t *newmsg;

	newmsg = (xymon_msg_t *) calloc(1, sizeof(xymon_msg_t));
	if (newmsg == NULL) {
		/* We own inbuf now; drop the message rather than leak it or crash */
		errprintf("Out of memory, dropping message\n");
		xfree(inbuf);
		return;
	}
	newmsg->tstamp = gettimer();
	newmsg->buf = newmsg->bufp = inbuf;
	newmsg->buflen = inlen;

	/*
	 * If we've flagged the peer as FAILED, then change status to DOWN so
	 * we will attempt to reconnect to the peer. The locator believes it is
	 * up and running, so it probably is ...
	 */
	if (peer->peerstatus == P_FAILED) peer->peerstatus = P_DOWN;

	/* If the peer is down, we will only permit ONE message in the queue. */
	if ((peer->peerstatus != P_UP) && peer->msghead) {
		/* Only complain when something is actually being thrown away */
		errprintf("Peer not up, flushing message queue\n");
		while (peer->msghead) flushmessage(peer);
	}

	if (peer->msghead == NULL) {
		peer->msghead = peer->msgtail = newmsg;
	}
	else {
		peer->msgtail->next = newmsg;
		peer->msgtail = newmsg;
	}

	pendingcount++;
}
302
addmessage(char * inbuf)303 int addmessage(char *inbuf)
304 {
305 xtreePos_t phandle;
306 xymon_peer_t *peer;
307 int bcastmsg = 0;
308 int inlen = strlen(inbuf);
309
310 if (locatorbased) {
311 char *hostname, *hostend, *peerlocation;
312
313 /* xymond sends us messages with the KEY in the first field, between a '/' and a '|' */
314 hostname = inbuf + strcspn(inbuf, "/|\r\n");
315 if (*hostname != '/') {
316 errprintf("No key field in message, dropping it\n");
317 return -1; /* Malformed input */
318 }
319 hostname++;
320 bcastmsg = (*hostname == '*');
321 if (!bcastmsg) {
322 /* Lookup which server handles this host */
323 hostend = hostname + strcspn(hostname, "|\r\n");
324 if (*hostend != '|') {
325 errprintf("No delimiter found in input, dropping it\n");
326 return -1; /* Malformed input */
327 }
328 *hostend = '\0';
329 peerlocation = locator_query(hostname, locatorservice, NULL);
330
331 /*
332 * If we get no response, or an empty response,
333 * then there is no server capable of handling this
334 * request.
335 */
336 if (!peerlocation || (*peerlocation == '\0')) {
337 errprintf("No response from locator for %s/%s, dropping it\n",
338 servicetype_names[locatorservice], hostname);
339 return -1;
340 }
341
342 *hostend = '|';
343 phandle = xtreeFind(peers, peerlocation);
344 if (phandle == xtreeEnd(peers)) {
345 /* New peer - register it */
346 addnetpeer(peerlocation);
347 phandle = xtreeFind(peers, peerlocation);
348 }
349 }
350 }
351 else {
352 phandle = xtreeFind(peers, "");
353 }
354
355 if (bcastmsg) {
356 for (phandle = xtreeFirst(peers); (phandle != xtreeEnd(peers)); phandle = xtreeNext(peers, phandle)) {
357 peer = (xymon_peer_t *)xtreeData(peers, phandle);
358
359 addmessage_onepeer(peer, inbuf, inlen);
360 }
361 }
362 else {
363 if (phandle == xtreeEnd(peers)) {
364 errprintf("No peer found to handle message, dropping it\n");
365 return -1;
366 }
367 peer = (xymon_peer_t *)xtreeData(peers, phandle);
368
369 addmessage_onepeer(peer, inbuf, inlen);
370 }
371
372 return 0;
373 }
374
/*
 * Tear down the data path of a P_UP peer and mark it P_DOWN.
 * Network peers get a shutdown() before their socket is closed. For the
 * local worker the pipe is closed and the child is sent SIGTERM. Every
 * message still queued for the peer is discarded. Peers that are not
 * P_UP are left untouched.
 */
void shutdownconnection(xymon_peer_t *peer)
{
	if (peer->peerstatus == P_UP) {
		peer->peerstatus = P_DOWN;

		if (peer->peertype == P_NET) {
			shutdown(peer->peersocket, SHUT_RDWR);
		}
		close(peer->peersocket);
		peer->peersocket = -1;

		if (peer->peertype == P_LOCAL) {
			if (peer->childpid > 0) kill(peer->childpid, SIGTERM);
			peer->childpid = 0;
		}

		/* Any messages queued are discarded */
		while (peer->msghead != NULL) flushmessage(peer);
		peer->msghead = NULL;
		peer->msgtail = NULL;
	}
}
400
401
sig_handler(int signum)402 void sig_handler(int signum)
403 {
404 switch (signum) {
405 case SIGTERM:
406 case SIGINT:
407 /* Shutting down. */
408 running = 0;
409 break;
410
411 case SIGCHLD:
412 /* Our worker child died. Avoid zombies. */
413 deadpid = wait(&childexit);
414 break;
415
416 case SIGALRM:
417 gotalarm = 1;
418 break;
419 }
420 }
421
422
main(int argc,char * argv[])423 int main(int argc, char *argv[])
424 {
425 int daemonize = 0;
426 char *pidfile = NULL;
427 char *envarea = NULL;
428 int cnid = -1;
429 pcre *msgfilter = NULL;
430 pcre *stdfilter = NULL;
431
432 int argi;
433 struct sigaction sa;
434 xtreePos_t handle;
435
436
437 /* Don't save the error buffer */
438 save_errbuf = 0;
439
440 /* Create the peer container */
441 peers = xtreeNew(strcasecmp);
442
443 for (argi=1; (argi < argc); argi++) {
444 if (argnmatch(argv[argi], "--debug")) {
445 debug = 1;
446 }
447 else if (argnmatch(argv[argi], "--channel=")) {
448 char *cn = strchr(argv[argi], '=') + 1;
449
450 for (cnid = C_STATUS; (channelnames[cnid] && strcmp(channelnames[cnid], cn)); cnid++) ;
451 if (channelnames[cnid] == NULL) cnid = -1;
452 }
453 else if (argnmatch(argv[argi], "--msgtimeout")) {
454 char *p = strchr(argv[argi], '=');
455 messagetimeout = atoi(p+1);
456 }
457 else if (argnmatch(argv[argi], "--daemon")) {
458 daemonize = 1;
459 }
460 else if (argnmatch(argv[argi], "--no-daemon")) {
461 daemonize = 0;
462 }
463 else if (argnmatch(argv[argi], "--pidfile=")) {
464 char *p = strchr(argv[argi], '=');
465 pidfile = strdup(p+1);
466 }
467 else if (argnmatch(argv[argi], "--log=")) {
468 char *p = strchr(argv[argi], '=');
469 logfn = strdup(p+1);
470 }
471 else if (argnmatch(argv[argi], "--env=")) {
472 char *p = strchr(argv[argi], '=');
473 loadenv(p+1, envarea);
474 }
475 else if (argnmatch(argv[argi], "--area=")) {
476 char *p = strchr(argv[argi], '=');
477 envarea = strdup(p+1);
478 }
479 else if (argnmatch(argv[argi], "--locator=")) {
480 char *p = strchr(argv[argi], '=');
481 locator_init(p+1);
482 locatorbased = 1;
483 }
484 else if (argnmatch(argv[argi], "--service=")) {
485 char *p = strchr(argv[argi], '=');
486 locatorservice = get_servicetype(p+1);
487 }
488 else if (argnmatch(argv[argi], "--filter=")) {
489 char *p = strchr(argv[argi], '=');
490 msgfilter = compileregex(p+1);
491 if (!msgfilter) {
492 errprintf("Invalid filter (bad expression): %s\n", p+1);
493 }
494 else {
495 stdfilter = firstlineregex("^@@(logrotate|shutdown|drophost|droptest|renamehost|renametest)");
496 }
497 }
498 else if (argnmatch(argv[argi], "--md5")) {
499 checksumsize = 33;
500 }
501 else if (argnmatch(argv[argi], "--no-md5")) {
502 checksumsize = 0;
503 }
504 else {
505 char *childcmd;
506 char **childargs;
507 int i = 0;
508
509 childcmd = argv[argi];
510 childargs = (char **) calloc((1 + argc - argi), sizeof(char *));
511 while (argi < argc) { childargs[i++] = argv[argi++]; }
512 addlocalpeer(childcmd, childargs);
513 }
514 }
515
516 /* Sanity checks */
517 if (cnid == -1) {
518 errprintf("No channel/unknown channel specified\n");
519 return 1;
520 }
521 if (locatorbased && (locatorservice == ST_MAX)) {
522 errprintf("Must specify --service when using locator\n");
523 return 1;
524 }
525 if (!locatorbased && (xtreeFirst(peers) == xtreeEnd(peers))) {
526 errprintf("Must specify command for local worker\n");
527 return 1;
528 }
529
530 /* Do cache responses to avoid doing too many lookups */
531 if (locatorbased) locator_prepcache(locatorservice, 0);
532
533 /* Go daemon */
534 if (daemonize) {
535 /* Become a daemon */
536 pid_t daemonpid = fork();
537 if (daemonpid < 0) {
538 /* Fork failed */
539 errprintf("Could not fork child\n");
540 exit(1);
541 }
542 else if (daemonpid > 0) {
543 /* Parent creates PID file and exits */
544 FILE *fd = NULL;
545 if (pidfile) fd = fopen(pidfile, "w");
546 if (fd) {
547 fprintf(fd, "%d\n", (int)daemonpid);
548 fclose(fd);
549 }
550 exit(0);
551 }
552 /* Child (daemon) continues here */
553 setsid();
554 }
555
556 /* Catch signals */
557 setup_signalhandler("xymond_channel");
558 memset(&sa, 0, sizeof(sa));
559 sa.sa_handler = sig_handler;
560 sigaction(SIGINT, &sa, NULL);
561 sigaction(SIGTERM, &sa, NULL);
562 sigaction(SIGCHLD, &sa, NULL);
563 signal(SIGALRM, SIG_IGN);
564
565 /* Switch stdout/stderr to the logfile, if one was specified */
566 reopen_file("/dev/null", "r", stdin); /* xymond_channel's stdin is not used */
567 if (logfn) {
568 reopen_file(logfn, "a", stdout);
569 reopen_file(logfn, "a", stderr);
570 }
571
572 /* Attach to the channel */
573 channel = setup_channel(cnid, CHAN_CLIENT);
574 if (channel == NULL) {
575 errprintf("Channel not available\n");
576 running = 0;
577 }
578
579 while (running) {
580 /*
581 * Wait for GOCLIENT to go up.
582 *
583 * Note that we use IPC_NOWAIT if there are messages in the
584 * queue, because then we just want to pick up a message if
585 * there is one, and if not we want to continue pushing the
586 * queued data to the worker.
587 */
588 struct sembuf s;
589 int n;
590
591 if (deadpid != 0) {
592 char *cause = "Unknown";
593 int ecode = -1;
594
595 if (WIFEXITED(childexit)) { cause = "Exit status"; ecode = WEXITSTATUS(childexit); }
596 else if (WIFSIGNALED(childexit)) { cause = "Signal"; ecode = WTERMSIG(childexit); }
597 errprintf("Child process %d died: %s %d\n", deadpid, cause, ecode);
598 deadpid = 0;
599 }
600
601 s.sem_num = GOCLIENT; s.sem_op = -1; s.sem_flg = ((pendingcount > 0) ? IPC_NOWAIT : 0);
602 n = semop(channel->semid, &s, 1);
603
604 if (n == 0) {
605 /*
606 * GOCLIENT went high, and so we got alerted about a new
607 * message arriving. Copy the message to our own buffer queue.
608 */
609 char *inbuf = NULL;
610 int msgsz = 0;
611
612 if (!msgfilter || matchregex(channel->channelbuf, msgfilter) || matchregex(channel->channelbuf, stdfilter)) {
613 msgsz = strlen(channel->channelbuf);
614 inbuf = (char *)malloc(msgsz + checksumsize + 1);
615 memcpy(inbuf+checksumsize, channel->channelbuf, msgsz+1); /* Include \0 */
616 }
617
618 /*
619 * Now we have safely stored the new message in our buffer.
620 * Wait until any other clients on the same channel have picked up
621 * this message (GOCLIENT reaches 0).
622 *
623 * We wrap this into an alarm handler, because it can occasionally
624 * fail, causing the whole system to lock up. We don't want that....
625 * We'll set the alarm to trigger after 1 second. Experience shows
626 * that we'll either succeed in a few milliseconds, or fail completely
627 * and wait the full alarm-timer duration.
628 */
629 gotalarm = 0; signal(SIGALRM, sig_handler); alarm(2);
630 do {
631 s.sem_num = GOCLIENT; s.sem_op = 0; s.sem_flg = 0;
632 n = semop(channel->semid, &s, 1);
633 } while ((n == -1) && (errno == EAGAIN) && running && (!gotalarm));
634 signal(SIGALRM, SIG_IGN);
635
636 if (gotalarm) {
637 errprintf("Gave up waiting for GOCLIENT to go low.\n");
638 }
639
640 /*
641 * Let master know we got it by downing BOARDBUSY.
642 * This should not block, since BOARDBUSY is upped
643 * by the master just before he ups GOCLIENT.
644 */
645 do {
646 s.sem_num = BOARDBUSY; s.sem_op = -1; s.sem_flg = IPC_NOWAIT;
647 n = semop(channel->semid, &s, 1);
648 } while ((n == -1) && (errno == EINTR));
649 if (n == -1) {
650 errprintf("Tried to down BOARDBUSY: %s\n", strerror(errno));
651 }
652
653 if (inbuf) {
654 /*
655 * See if they want us to rotate logs. We pass this on to
656 * the worker module as well, but must handle our own logfile.
657 */
658 if (strncmp(inbuf+checksumsize, "@@logrotate", 11) == 0) {
659 reopen_file(logfn, "a", stdout);
660 reopen_file(logfn, "a", stderr);
661 }
662
663 if (checksumsize > 0) {
664 char *sep1 = inbuf + checksumsize + strcspn(inbuf+checksumsize, "#|\n");
665
666 if (*sep1 == '#') {
667 /*
668 * Add md5 hash of the message. I.e. transform the header line from
669 * "@@%s#%u/%s|%d.%06d| channelmarker, seq, hostname, tstamp.tv_sec, tstamp.tv_usec
670 * to
671 * "@@%s:%s#%u/%s|%d.%06d| channelmarker, hashstr, seq, hostname, tstamp.tv_sec, tstamp.tv_usec
672 */
673 char *hashstr = md5hash(inbuf+checksumsize);
674 int hlen = sep1 - (inbuf + checksumsize);
675
676 memmove(inbuf, inbuf+checksumsize, hlen);
677 *(inbuf + hlen) = ':';
678 memcpy(inbuf+hlen+1, hashstr, strlen(hashstr));
679 }
680 else {
681 /* No sequence number (control message). Skip checksum for these */
682 memmove(inbuf, inbuf+checksumsize, msgsz+1);
683 }
684 }
685
686
687 /*
688 * Put the new message on our outbound queue.
689 */
690 if (addmessage(inbuf) != 0) {
691 /* Failed to queue message, free the buffer */
692 xfree(inbuf);
693 }
694 }
695 }
696 else {
697 if (errno != EAGAIN) {
698 dbgprintf("Semaphore wait aborted: %s\n", strerror(errno));
699 continue;
700 }
701 }
702
703 /*
704 * We've picked up messages from the master. Now we
705 * must push them to the worker process. Since there
706 * is no way to hang off both a semaphore and select(),
707 * we'll just push as much data as possible into the
708 * pipe. If we get to a point where we would block,
709 * then wait a teeny bit of time and restart the
710 * whole loop with checking for new messages from the
711 * master etc.
712 *
713 * In theory, this could become an almost busy-wait loop.
714 * In practice, however, the queue will be empty most
715 * of the time because we'll just shove the data to the
716 * worker child.
717 */
718 for (handle = xtreeFirst(peers); (handle != xtreeEnd(peers)); handle = xtreeNext(peers, handle)) {
719 int canwrite = 1, hasfailed = 0;
720 xymon_peer_t *pwalk;
721 time_t msgtimeout = gettimer() - messagetimeout;
722 int flushcount = 0;
723
724 pwalk = (xymon_peer_t *) xtreeData(peers, handle);
725 if (pwalk->msghead == NULL) continue; /* Ignore peers with nothing queued */
726
727 switch (pwalk->peerstatus) {
728 case P_UP:
729 canwrite = 1;
730 break;
731
732 case P_DOWN:
733 openconnection(pwalk);
734 canwrite = (pwalk->peerstatus == P_UP);
735 break;
736
737 case P_FAILED:
738 canwrite = 0;
739 break;
740 }
741
742 /* See if we have stale messages queued */
743 while (pwalk->msghead && (pwalk->msghead->tstamp < msgtimeout)) {
744 flushmessage(pwalk);
745 flushcount++;
746 }
747
748 if (flushcount) {
749 errprintf("Flushed %d stale messages for %s:%d\n",
750 flushcount,
751 inet_ntoa(pwalk->peeraddr.sin_addr),
752 ntohs(pwalk->peeraddr.sin_port));
753 }
754
755 while (pwalk->msghead && canwrite) {
756 fd_set fdwrite;
757 struct timeval tmo;
758
759 /* Check that this peer is ready for writing. */
760 FD_ZERO(&fdwrite); FD_SET(pwalk->peersocket, &fdwrite);
761 tmo.tv_sec = 0; tmo.tv_usec = 2000;
762 n = select(pwalk->peersocket+1, NULL, &fdwrite, NULL, &tmo);
763 if (n == -1) {
764 errprintf("select() failed: %s\n", strerror(errno));
765 canwrite = 0;
766 hasfailed = 1;
767 continue;
768 }
769 else if ((n == 0) || (!FD_ISSET(pwalk->peersocket, &fdwrite))) {
770 canwrite = 0;
771 continue;
772 }
773
774 n = write(pwalk->peersocket, pwalk->msghead->bufp, pwalk->msghead->buflen);
775 if (n >= 0) {
776 pwalk->msghead->bufp += n;
777 pwalk->msghead->buflen -= n;
778 if (pwalk->msghead->buflen == 0) flushmessage(pwalk);
779 }
780 else if (errno == EAGAIN) {
781 /*
782 * Write would block ... stop for now.
783 */
784 canwrite = 0;
785 }
786 else {
787 hasfailed = 1;
788 }
789
790 if (hasfailed) {
791 /* Write failed, or message grew stale */
792 errprintf("Peer at %s:%d failed: %s\n",
793 inet_ntoa(pwalk->peeraddr.sin_addr), ntohs(pwalk->peeraddr.sin_port),
794 strerror(errno));
795 canwrite = 0;
796 shutdownconnection(pwalk);
797 if (pwalk->peertype == P_NET) locator_serverdown(pwalk->peername, locatorservice);
798 pwalk->peerstatus = P_FAILED;
799 }
800 }
801 }
802 }
803
804 /* Detach from channels */
805 close_channel(channel, CHAN_CLIENT);
806
807 /* Close peer connections */
808 for (handle = xtreeFirst(peers); (handle != xtreeEnd(peers)); handle = xtreeNext(peers, handle)) {
809 xymon_peer_t *pwalk = (xymon_peer_t *) xtreeData(peers, handle);
810 shutdownconnection(pwalk);
811 }
812
813 /* Remove the PID file */
814 if (pidfile) unlink(pidfile);
815
816 return 0;
817 }
818
819