1 /*----------------------------------------------------------------------------*/
2 /* Xymon message daemon.                                                      */
3 /*                                                                            */
4 /* This module receives messages from one channel of the Xymon master daemon. */
5 /* These messages are then forwarded to the actual worker process via stdin;  */
6 /* the worker process can process the messages without having to care about   */
7 /* the tricky details in the xymond/xymond_channel communications.            */
8 /*                                                                            */
9 /* This program is released under the GNU General Public License (GPL),       */
10 /* version 2. See the file "COPYING" for details.                             */
11 /*                                                                            */
12 /* Copyright (C) 2004-2011 Henrik Storner <henrik@hswn.dk>                    */
13 /*                                                                            */
14 /*----------------------------------------------------------------------------*/
15 
/* Version-control identification string (Subversion $Id$ keyword). */
static char rcsid[] = "$Id: xymond_channel.c 7678 2015-10-01 14:42:42Z jccleaver $";
17 
18 #include <sys/types.h>
19 #include <sys/ipc.h>
20 #include <sys/sem.h>
21 #include <sys/shm.h>
22 #include <sys/time.h>
23 
24 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <arpa/inet.h>
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30 #include <errno.h>
31 #include <netdb.h>
32 #include <fcntl.h>
33 
34 #include <sys/wait.h>
35 #include <stdlib.h>
36 #include <unistd.h>
37 #include <string.h>
38 
39 #include "libxymon.h"
40 
41 #include <signal.h>
42 
43 
44 
/*
 * Our in-memory queue of messages received from xymond via IPC. One queue per peer.
 * Each entry is a node in a singly-linked list. "buf" is the malloc'ed message
 * text and is released by flushmessage(); "bufp"/"buflen" track how much is
 * still unsent, so a partial non-blocking write() can resume where it stopped.
 */
typedef struct xymon_msg_t {
	time_t tstamp;  /* When did the message arrive (gettimer() value); used to expire stale messages */
	char *buf;	/* The message data */
	char *bufp;	/* Next char to send */
	int buflen;	/* How many bytes left to send */
	struct xymon_msg_t *next;	/* Next queued message; NULL at the tail */
} xymon_msg_t;
53 
54 
/*
 * Our list of peers we send data to.
 * A peer is either the local worker child fed through a pipe on its stdin
 * (P_LOCAL), or a remote server reached over TCP (P_NET, used with --locator).
 * Peers live in the "peers" tree keyed by peername: "" for the local worker,
 * "host" or "host:port" for network peers.
 */
typedef struct xymon_peer_t {
	char *peername;

	/* DOWN: not connected; UP: connected; FAILED: a write/select error occurred */
	enum { P_DOWN, P_UP, P_FAILED } peerstatus;
	xymon_msg_t *msghead, *msgtail;	/* Message queue */

	enum { P_LOCAL, P_NET } peertype;
	int peersocket;				/* File descriptor receiving the data (TCP socket, or write end of the pipe) */
	time_t lastopentime;			/* Last time we attempted to connect to the peer */

	/* For P_NET peers */
	struct sockaddr_in peeraddr;		/* The IP address of the peer */

	/* For P_LOCAL peers */
	char *childcmd;				/* Command and arguments for the child process */
	char **childargs;			/* NULL-terminated argument vector passed to execvp() */
	pid_t childpid;				/* PID of the running worker child */
} xymon_peer_t;
74 
75 void * peers;
76 
77 pid_t deadpid = 0;
78 int childexit;
79 
80 xymond_channel_t *channel = NULL;
81 char *logfn = NULL;
82 int locatorbased = 0;
83 enum locator_servicetype_t locatorservice = ST_MAX;
84 
85 static int running = 1;
86 static int gotalarm = 0;
87 static int pendingcount = 0;
88 static int messagetimeout = 30;
89 
90 /*
91  * chksumsize is the space left in front of the message buffer, to
92  * allow room for a message digest checksum to be added to the
93  * message. Since we use an MD5 hash, this will be 32 bytes
94  * plus a one-char marker.
95  */
96 static int checksumsize = 0;
97 
addnetpeer(char * peername)98 void addnetpeer(char *peername)
99 {
100 	xymon_peer_t *newpeer;
101 	struct in_addr addr;
102 	char *oneip;
103 	int peerport = 0;
104 	char *delim;
105 
106 	dbgprintf("Adding network peer %s\n", peername);
107 
108 	oneip = strdup(peername);
109 
110 	delim = strchr(oneip, ':');
111 	if (delim) {
112 		*delim = '\0';
113 		peerport = atoi(delim+1);
114 	}
115 
116 	if (inet_aton(oneip, &addr) == 0) {
117 		/* peer is not an IP - do DNS lookup */
118 
119 		struct hostent *hent;
120 
121 		hent = gethostbyname(oneip);
122 		if (hent) {
123 			char *realip;
124 
125 			memcpy(&addr, *(hent->h_addr_list), sizeof(struct in_addr));
126 			realip = inet_ntoa(addr);
127 			if (inet_aton(realip, &addr) == 0) {
128 				errprintf("Invalid IP address for %s (%s)\n", oneip, realip);
129 				goto done;
130 			}
131 		}
132 		else {
133 			errprintf("Cannot determine IP address of peer %s\n", oneip);
134 			goto done;
135 		}
136 	}
137 
138 	if (peerport == 0) peerport = atoi(xgetenv("XYMONDPORT"));
139 
140 	newpeer = calloc(1, sizeof(xymon_peer_t));
141 	newpeer->peername = strdup(peername);
142 	newpeer->peerstatus = P_DOWN;
143 	newpeer->peertype = P_NET;
144 	newpeer->peeraddr.sin_family = AF_INET;
145 	newpeer->peeraddr.sin_addr.s_addr = addr.s_addr;
146 	newpeer->peeraddr.sin_port = htons(peerport);
147 
148 	xtreeAdd(peers, newpeer->peername, newpeer);
149 
150 done:
151 	xfree(oneip);
152 }
153 
154 
addlocalpeer(char * childcmd,char ** childargs)155 void addlocalpeer(char *childcmd, char **childargs)
156 {
157 	xymon_peer_t *newpeer;
158 	int i, count;
159 
160 	dbgprintf("Adding local peer using command %s\n", childcmd);
161 
162 	for (count=0; (childargs[count]); count++) ;
163 
164 	newpeer = (xymon_peer_t *)calloc(1, sizeof(xymon_peer_t));
165 	newpeer->peername = strdup("");
166 	newpeer->peerstatus = P_DOWN;
167 	newpeer->peertype = P_LOCAL;
168 	newpeer->childcmd = strdup(childcmd);
169 	newpeer->childargs = (char **)calloc(count+1, sizeof(char *));
170 	for (i=0; (i<count); i++) newpeer->childargs[i] = strdup(childargs[i]);
171 
172 	xtreeAdd(peers, newpeer->peername, newpeer);
173 }
174 
175 
/*
 * Try to (re)establish the connection to a peer.
 *
 * The peer is marked DOWN first, and at most one attempt per 60 seconds is
 * made (tracked in lastopentime).
 * - P_NET: open a TCP connection to peeraddr.
 * - P_LOCAL: create a pipe and start the worker command with fork()+execvp(),
 *   with the read end of the pipe as its stdin. If --log= was given, the child
 *   gets XYMONCHANNEL_LOGFILENAME in its environment.
 * On success the descriptor is made non-blocking and the peer is marked UP.
 * On failure an error is logged, the descriptors created by this attempt are
 * closed, and the peer stays DOWN.
 */
void openconnection(xymon_peer_t *peer)
{
	int n;
	int pfd[2];
	pid_t childpid;
	time_t now;
	int flags;

	peer->peerstatus = P_DOWN;

	now = gettimer();
	if (now < (peer->lastopentime + 60)) return;	/* Will only attempt one open per minute */

	dbgprintf("Connecting to peer %s:%d\n", inet_ntoa(peer->peeraddr.sin_addr), ntohs(peer->peeraddr.sin_port));

	peer->lastopentime = now;
	switch (peer->peertype) {
	  case P_NET:
		/* Get a socket, and connect to the peer */
		peer->peersocket = socket(PF_INET, SOCK_STREAM, 0);
		if (peer->peersocket == -1) {
			errprintf("Cannot get socket: %s\n", strerror(errno));
			return;
		}

		n = connect(peer->peersocket, (struct sockaddr *)&peer->peeraddr, sizeof(peer->peeraddr));
		if (n == -1) {
			errprintf("Cannot connect to peer %s:%d : %s\n",
				inet_ntoa(peer->peeraddr.sin_addr), ntohs(peer->peeraddr.sin_port),
				strerror(errno));
			/* Release the descriptor; the next attempt creates a fresh socket */
			close(peer->peersocket);
			peer->peersocket = -1;
			return;
		}
		break;

	  case P_LOCAL:
		/* Create a pipe to the child handler program, and run it */
		n = pipe(pfd);
		if (n == -1) {
			errprintf("Could not get a pipe: %s\n", strerror(errno));
			return;
		}

		childpid = fork();
		if (childpid == -1) {
			errprintf("Could not fork channel handler: %s\n", strerror(errno));
			close(pfd[0]); close(pfd[1]);
			return;
		}
		else if (childpid == 0) {
			/* The channel handler child */
			if (logfn) {
				char *logfnenv = (char *)malloc(strlen(logfn) + 30);
				if (logfnenv) {
					sprintf(logfnenv, "XYMONCHANNEL_LOGFILENAME=%s", logfn);
					putenv(logfnenv);	/* putenv() keeps the pointer - must not be freed */
				}
			}

			dbgprintf("Child '%s' started (PID %d), about to fork\n", peer->childcmd, (int)getpid());

			n = dup2(pfd[0], STDIN_FILENO);
			if (n == -1) {
				errprintf("Could not attach pipe to stdin of %s: %s\n",
					peer->childcmd, strerror(errno));
				fflush(stderr);
				_exit(1);
			}
			/* Don't close our new stdin if pipe() happened to hand out fd 0 */
			if (pfd[0] != STDIN_FILENO) close(pfd[0]);
			close(pfd[1]);
			execvp(peer->childcmd, peer->childargs);

			/* We should never go here */
			errprintf("exec() failed for child command %s: %s\n",
				peer->childcmd, strerror(errno));
			/* _exit(): don't flush stdio buffers inherited from the parent a second time */
			fflush(stderr);
			_exit(1);
		}

		/* Parent process continues */
		close(pfd[0]);
		peer->peersocket = pfd[1];
		peer->childpid = childpid;
		break;
	}

	/* Add O_NONBLOCK while keeping any other status flags already set */
	flags = fcntl(peer->peersocket, F_GETFL);
	if (flags == -1) flags = 0;
	fcntl(peer->peersocket, F_SETFL, flags | O_NONBLOCK);
	peer->peerstatus = P_UP;
	dbgprintf("Peer is UP\n");
}
253 
254 
255 
/*
 * Remove the message at the head of a peer's outbound queue and release it.
 * The caller must make sure the queue is not empty. msgtail is cleared when
 * the queue becomes empty, and the global pending-message count is decremented.
 */
void flushmessage(xymon_peer_t *peer)
{
	xymon_msg_t *head = peer->msghead;
	xymon_msg_t *successor = head->next;

	peer->msghead = successor;
	if (successor == NULL) {
		/* That was the last message - the queue is now empty */
		peer->msgtail = NULL;
	}

	xfree(head->buf);
	xfree(head);
	pendingcount--;
}
269 
/*
 * Append one message to a single peer's outbound queue.
 * Ownership of inbuf passes to the queue; flushmessage() frees it once the
 * message has been sent or discarded. A peer marked FAILED is reset to DOWN so
 * the next send pass attempts to reconnect. While the peer is not UP, any older
 * queued messages are discarded, so at most one message waits for a connection.
 */
static void addmessage_onepeer(xymon_peer_t *peer, char *inbuf, int inlen)
{
	xymon_msg_t *msg = (xymon_msg_t *)calloc(1, sizeof(xymon_msg_t));

	msg->tstamp = gettimer();
	msg->buf = inbuf;
	msg->bufp = inbuf;
	msg->buflen = inlen;

	/* The locator believes this peer is alive, so give it another chance */
	if (peer->peerstatus == P_FAILED) peer->peerstatus = P_DOWN;

	if (peer->peerstatus != P_UP) {
		errprintf("Peer not up, flushing message queue\n");
		while (peer->msghead != NULL) flushmessage(peer);
	}

	if (peer->msghead != NULL) {
		peer->msgtail->next = msg;
	}
	else {
		peer->msghead = msg;
	}
	peer->msgtail = msg;

	pendingcount++;
}
302 
addmessage(char * inbuf)303 int addmessage(char *inbuf)
304 {
305 	xtreePos_t phandle;
306 	xymon_peer_t *peer;
307 	int bcastmsg = 0;
308 	int inlen = strlen(inbuf);
309 
310 	if (locatorbased) {
311 		char *hostname, *hostend, *peerlocation;
312 
313 		/* xymond sends us messages with the KEY in the first field, between a '/' and a '|' */
314 		hostname = inbuf + strcspn(inbuf, "/|\r\n");
315 		if (*hostname != '/') {
316 			errprintf("No key field in message, dropping it\n");
317 			return -1; /* Malformed input */
318 		}
319 		hostname++;
320 		bcastmsg = (*hostname == '*');
321 		if (!bcastmsg) {
322 			/* Lookup which server handles this host */
323 			hostend = hostname + strcspn(hostname, "|\r\n");
324 			if (*hostend != '|') {
325 				errprintf("No delimiter found in input, dropping it\n");
326 				return -1; /* Malformed input */
327 			}
328 			*hostend = '\0';
329 			peerlocation = locator_query(hostname, locatorservice, NULL);
330 
331 			/*
332 			 * If we get no response, or an empty response,
333 			 * then there is no server capable of handling this
334 			 * request.
335 			 */
336 			if (!peerlocation || (*peerlocation == '\0')) {
337 				errprintf("No response from locator for %s/%s, dropping it\n",
338 					  servicetype_names[locatorservice], hostname);
339 				return -1;
340 			}
341 
342 			*hostend = '|';
343 			phandle = xtreeFind(peers, peerlocation);
344 			if (phandle == xtreeEnd(peers)) {
345 				/* New peer - register it */
346 				addnetpeer(peerlocation);
347 				phandle = xtreeFind(peers, peerlocation);
348 			}
349 		}
350 	}
351 	else {
352 		phandle = xtreeFind(peers, "");
353 	}
354 
355 	if (bcastmsg) {
356 		for (phandle = xtreeFirst(peers); (phandle != xtreeEnd(peers)); phandle = xtreeNext(peers, phandle)) {
357 			peer = (xymon_peer_t *)xtreeData(peers, phandle);
358 
359 			addmessage_onepeer(peer, inbuf, inlen);
360 		}
361 	}
362 	else {
363 		if (phandle == xtreeEnd(peers)) {
364 			errprintf("No peer found to handle message, dropping it\n");
365 			return -1;
366 		}
367 		peer = (xymon_peer_t *)xtreeData(peers, phandle);
368 
369 		addmessage_onepeer(peer, inbuf, inlen);
370 	}
371 
372 	return 0;
373 }
374 
/*
 * Tear down the connection to a peer that is currently UP, and mark it DOWN.
 * For a network peer the socket is shut down and closed. For the local worker
 * the pipe is closed and the child (if any) is sent SIGTERM. All messages
 * still queued for the peer are discarded. Peers that are not UP are left
 * untouched.
 */
void shutdownconnection(xymon_peer_t *peer)
{
	if (peer->peerstatus == P_UP) {
		peer->peerstatus = P_DOWN;

		if (peer->peertype == P_LOCAL) {
			close(peer->peersocket);
			peer->peersocket = -1;
			if (peer->childpid > 0) kill(peer->childpid, SIGTERM);
			peer->childpid = 0;
		}
		else if (peer->peertype == P_NET) {
			shutdown(peer->peersocket, SHUT_RDWR);
			close(peer->peersocket);
			peer->peersocket = -1;
		}

		/* Anything still queued is discarded */
		while (peer->msghead != NULL) flushmessage(peer);
		peer->msghead = NULL;
		peer->msgtail = NULL;
	}
}
400 
401 
sig_handler(int signum)402 void sig_handler(int signum)
403 {
404 	switch (signum) {
405 	  case SIGTERM:
406 	  case SIGINT:
407 		/* Shutting down. */
408 		running = 0;
409 		break;
410 
411 	  case SIGCHLD:
412 		/* Our worker child died. Avoid zombies. */
413 		deadpid = wait(&childexit);
414 		break;
415 
416 	  case SIGALRM:
417 		gotalarm = 1;
418 		break;
419 	}
420 }
421 
422 
main(int argc,char * argv[])423 int main(int argc, char *argv[])
424 {
425 	int daemonize = 0;
426 	char *pidfile = NULL;
427 	char *envarea = NULL;
428 	int cnid = -1;
429 	pcre *msgfilter = NULL;
430 	pcre *stdfilter = NULL;
431 
432 	int argi;
433 	struct sigaction sa;
434 	xtreePos_t handle;
435 
436 
437 	/* Don't save the error buffer */
438 	save_errbuf = 0;
439 
440 	/* Create the peer container */
441 	peers = xtreeNew(strcasecmp);
442 
443 	for (argi=1; (argi < argc); argi++) {
444 		if (argnmatch(argv[argi], "--debug")) {
445 			debug = 1;
446 		}
447 		else if (argnmatch(argv[argi], "--channel=")) {
448 			char *cn = strchr(argv[argi], '=') + 1;
449 
450 			for (cnid = C_STATUS; (channelnames[cnid] && strcmp(channelnames[cnid], cn)); cnid++) ;
451 			if (channelnames[cnid] == NULL) cnid = -1;
452 		}
453 		else if (argnmatch(argv[argi], "--msgtimeout")) {
454 			char *p = strchr(argv[argi], '=');
455 			messagetimeout = atoi(p+1);
456 		}
457 		else if (argnmatch(argv[argi], "--daemon")) {
458 			daemonize = 1;
459 		}
460 		else if (argnmatch(argv[argi], "--no-daemon")) {
461 			daemonize = 0;
462 		}
463 		else if (argnmatch(argv[argi], "--pidfile=")) {
464 			char *p = strchr(argv[argi], '=');
465 			pidfile = strdup(p+1);
466 		}
467 		else if (argnmatch(argv[argi], "--log=")) {
468 			char *p = strchr(argv[argi], '=');
469 			logfn = strdup(p+1);
470 		}
471 		else if (argnmatch(argv[argi], "--env=")) {
472 			char *p = strchr(argv[argi], '=');
473 			loadenv(p+1, envarea);
474 		}
475 		else if (argnmatch(argv[argi], "--area=")) {
476 			char *p = strchr(argv[argi], '=');
477 			envarea = strdup(p+1);
478 		}
479 		else if (argnmatch(argv[argi], "--locator=")) {
480 			char *p = strchr(argv[argi], '=');
481 			locator_init(p+1);
482 			locatorbased = 1;
483 		}
484 		else if (argnmatch(argv[argi], "--service=")) {
485 			char *p = strchr(argv[argi], '=');
486 			locatorservice = get_servicetype(p+1);
487 		}
488 		else if (argnmatch(argv[argi], "--filter=")) {
489 			char *p = strchr(argv[argi], '=');
490 			msgfilter = compileregex(p+1);
491 			if (!msgfilter) {
492 				errprintf("Invalid filter (bad expression): %s\n", p+1);
493 			}
494 			else {
495 				stdfilter = firstlineregex("^@@(logrotate|shutdown|drophost|droptest|renamehost|renametest)");
496 			}
497 		}
498 		else if (argnmatch(argv[argi], "--md5")) {
499 			checksumsize = 33;
500 		}
501 		else if (argnmatch(argv[argi], "--no-md5")) {
502 			checksumsize = 0;
503 		}
504 		else {
505 			char *childcmd;
506 			char **childargs;
507 			int i = 0;
508 
509 			childcmd = argv[argi];
510 			childargs = (char **) calloc((1 + argc - argi), sizeof(char *));
511 			while (argi < argc) { childargs[i++] = argv[argi++]; }
512 			addlocalpeer(childcmd, childargs);
513 		}
514 	}
515 
516 	/* Sanity checks */
517 	if (cnid == -1) {
518 		errprintf("No channel/unknown channel specified\n");
519 		return 1;
520 	}
521 	if (locatorbased && (locatorservice == ST_MAX)) {
522 		errprintf("Must specify --service when using locator\n");
523 		return 1;
524 	}
525 	if (!locatorbased && (xtreeFirst(peers) == xtreeEnd(peers))) {
526 		errprintf("Must specify command for local worker\n");
527 		return 1;
528 	}
529 
530 	/* Do cache responses to avoid doing too many lookups */
531 	if (locatorbased) locator_prepcache(locatorservice, 0);
532 
533 	/* Go daemon */
534 	if (daemonize) {
535 		/* Become a daemon */
536 		pid_t daemonpid = fork();
537 		if (daemonpid < 0) {
538 			/* Fork failed */
539 			errprintf("Could not fork child\n");
540 			exit(1);
541 		}
542 		else if (daemonpid > 0) {
543 			/* Parent creates PID file and exits */
544 			FILE *fd = NULL;
545 			if (pidfile) fd = fopen(pidfile, "w");
546 			if (fd) {
547 				fprintf(fd, "%d\n", (int)daemonpid);
548 				fclose(fd);
549 			}
550 			exit(0);
551 		}
552 		/* Child (daemon) continues here */
553 		setsid();
554 	}
555 
556 	/* Catch signals */
557 	setup_signalhandler("xymond_channel");
558 	memset(&sa, 0, sizeof(sa));
559 	sa.sa_handler = sig_handler;
560 	sigaction(SIGINT, &sa, NULL);
561 	sigaction(SIGTERM, &sa, NULL);
562 	sigaction(SIGCHLD, &sa, NULL);
563 	signal(SIGALRM, SIG_IGN);
564 
565 	/* Switch stdout/stderr to the logfile, if one was specified */
566 	reopen_file("/dev/null", "r", stdin);	/* xymond_channel's stdin is not used */
567 	if (logfn) {
568 		reopen_file(logfn, "a", stdout);
569 		reopen_file(logfn, "a", stderr);
570 	}
571 
572 	/* Attach to the channel */
573 	channel = setup_channel(cnid, CHAN_CLIENT);
574 	if (channel == NULL) {
575 		errprintf("Channel not available\n");
576 		running = 0;
577 	}
578 
579 	while (running) {
580 		/*
581 		 * Wait for GOCLIENT to go up.
582 		 *
583 		 * Note that we use IPC_NOWAIT if there are messages in the
584 		 * queue, because then we just want to pick up a message if
585 		 * there is one, and if not we want to continue pushing the
586 		 * queued data to the worker.
587 		 */
588 		struct sembuf s;
589 		int n;
590 
591 		if (deadpid != 0) {
592 			char *cause = "Unknown";
593 			int ecode = -1;
594 
595 			if (WIFEXITED(childexit)) { cause = "Exit status"; ecode = WEXITSTATUS(childexit); }
596 			else if (WIFSIGNALED(childexit)) { cause = "Signal"; ecode = WTERMSIG(childexit); }
597 			errprintf("Child process %d died: %s %d\n", deadpid, cause, ecode);
598 			deadpid = 0;
599 		}
600 
601 		s.sem_num = GOCLIENT; s.sem_op  = -1; s.sem_flg = ((pendingcount > 0) ? IPC_NOWAIT : 0);
602 		n = semop(channel->semid, &s, 1);
603 
604 		if (n == 0) {
605 			/*
606 			 * GOCLIENT went high, and so we got alerted about a new
607 			 * message arriving. Copy the message to our own buffer queue.
608 			 */
609 			char *inbuf = NULL;
610 			int msgsz = 0;
611 
612 			if (!msgfilter || matchregex(channel->channelbuf, msgfilter) || matchregex(channel->channelbuf, stdfilter)) {
613 				msgsz = strlen(channel->channelbuf);
614 				inbuf = (char *)malloc(msgsz + checksumsize + 1);
615 				memcpy(inbuf+checksumsize, channel->channelbuf, msgsz+1); /* Include \0 */
616 			}
617 
618 			/*
619 			 * Now we have safely stored the new message in our buffer.
620 			 * Wait until any other clients on the same channel have picked up
621 			 * this message (GOCLIENT reaches 0).
622 			 *
623 			 * We wrap this into an alarm handler, because it can occasionally
624 			 * fail, causing the whole system to lock up. We don't want that....
625 			 * We'll set the alarm to trigger after 1 second. Experience shows
626 			 * that we'll either succeed in a few milliseconds, or fail completely
627 			 * and wait the full alarm-timer duration.
628 			 */
629 			gotalarm = 0; signal(SIGALRM, sig_handler); alarm(2);
630 			do {
631 				s.sem_num = GOCLIENT; s.sem_op  = 0; s.sem_flg = 0;
632 				n = semop(channel->semid, &s, 1);
633 			} while ((n == -1) && (errno == EAGAIN) && running && (!gotalarm));
634 			signal(SIGALRM, SIG_IGN);
635 
636 			if (gotalarm) {
637 				errprintf("Gave up waiting for GOCLIENT to go low.\n");
638 			}
639 
640 			/*
641 			 * Let master know we got it by downing BOARDBUSY.
642 			 * This should not block, since BOARDBUSY is upped
643 			 * by the master just before he ups GOCLIENT.
644 			 */
645 			do {
646 				s.sem_num = BOARDBUSY; s.sem_op  = -1; s.sem_flg = IPC_NOWAIT;
647 				n = semop(channel->semid, &s, 1);
648 			} while ((n == -1) && (errno == EINTR));
649 			if (n == -1) {
650 				errprintf("Tried to down BOARDBUSY: %s\n", strerror(errno));
651 			}
652 
653 			if (inbuf) {
654 				/*
655 				 * See if they want us to rotate logs. We pass this on to
656 				 * the worker module as well, but must handle our own logfile.
657 				 */
658 				if (strncmp(inbuf+checksumsize, "@@logrotate", 11) == 0) {
659 					reopen_file(logfn, "a", stdout);
660 					reopen_file(logfn, "a", stderr);
661 				}
662 
663 				if (checksumsize > 0) {
664 					char *sep1 = inbuf + checksumsize + strcspn(inbuf+checksumsize, "#|\n");
665 
666 					if (*sep1 == '#') {
667 						/*
668 						 * Add md5 hash of the message. I.e. transform the header line from
669 						 *   "@@%s#%u/%s|%d.%06d| channelmarker, seq, hostname, tstamp.tv_sec, tstamp.tv_usec
670 						 * to
671 						 *   "@@%s:%s#%u/%s|%d.%06d| channelmarker, hashstr, seq, hostname, tstamp.tv_sec, tstamp.tv_usec
672 						 */
673 						char *hashstr = md5hash(inbuf+checksumsize);
674 						int hlen = sep1 - (inbuf + checksumsize);
675 
676 						memmove(inbuf, inbuf+checksumsize, hlen);
677 						*(inbuf + hlen) = ':';
678 						memcpy(inbuf+hlen+1, hashstr, strlen(hashstr));
679 					}
680 					else {
681 						/* No sequence number (control message). Skip checksum for these */
682 						memmove(inbuf, inbuf+checksumsize, msgsz+1);
683 					}
684 				}
685 
686 
687 				/*
688 				 * Put the new message on our outbound queue.
689 				 */
690 				if (addmessage(inbuf) != 0) {
691 					/* Failed to queue message, free the buffer */
692 					xfree(inbuf);
693 				}
694 			}
695 		}
696 		else {
697 			if (errno != EAGAIN) {
698 				dbgprintf("Semaphore wait aborted: %s\n", strerror(errno));
699 				continue;
700 			}
701 		}
702 
703 		/*
704 		 * We've picked up messages from the master. Now we
705 		 * must push them to the worker process. Since there
706 		 * is no way to hang off both a semaphore and select(),
707 		 * we'll just push as much data as possible into the
708 		 * pipe. If we get to a point where we would block,
709 		 * then wait a teeny bit of time and restart the
710 		 * whole loop with checking for new messages from the
711 		 * master etc.
712 		 *
713 		 * In theory, this could become an almost busy-wait loop.
714 		 * In practice, however, the queue will be empty most
715 		 * of the time because we'll just shove the data to the
716 		 * worker child.
717 		 */
718 		for (handle = xtreeFirst(peers); (handle != xtreeEnd(peers)); handle = xtreeNext(peers, handle)) {
719 			int canwrite = 1, hasfailed = 0;
720 			xymon_peer_t *pwalk;
721 			time_t msgtimeout = gettimer() - messagetimeout;
722 			int flushcount = 0;
723 
724 			pwalk = (xymon_peer_t *) xtreeData(peers, handle);
725 			if (pwalk->msghead == NULL) continue; /* Ignore peers with nothing queued */
726 
727 			switch (pwalk->peerstatus) {
728 			  case P_UP:
729 				canwrite = 1;
730 				break;
731 
732 			  case P_DOWN:
733 				openconnection(pwalk);
734 				canwrite = (pwalk->peerstatus == P_UP);
735 				break;
736 
737 			  case P_FAILED:
738 				canwrite = 0;
739 				break;
740 			}
741 
742 			/* See if we have stale messages queued */
743 			while (pwalk->msghead && (pwalk->msghead->tstamp < msgtimeout)) {
744 				flushmessage(pwalk);
745 				flushcount++;
746 			}
747 
748 			if (flushcount) {
749 				errprintf("Flushed %d stale messages for %s:%d\n",
750 					  flushcount,
751 				  	  inet_ntoa(pwalk->peeraddr.sin_addr),
752 					  ntohs(pwalk->peeraddr.sin_port));
753 			}
754 
755 			while (pwalk->msghead && canwrite) {
756 				fd_set fdwrite;
757 				struct timeval tmo;
758 
759 				/* Check that this peer is ready for writing. */
760 				FD_ZERO(&fdwrite); FD_SET(pwalk->peersocket, &fdwrite);
761 				tmo.tv_sec = 0; tmo.tv_usec = 2000;
762 				n = select(pwalk->peersocket+1, NULL, &fdwrite, NULL, &tmo);
763 				if (n == -1) {
764 					errprintf("select() failed: %s\n", strerror(errno));
765 					canwrite = 0;
766 					hasfailed = 1;
767 					continue;
768 				}
769 				else if ((n == 0) || (!FD_ISSET(pwalk->peersocket, &fdwrite))) {
770 					canwrite = 0;
771 					continue;
772 				}
773 
774 				n = write(pwalk->peersocket, pwalk->msghead->bufp, pwalk->msghead->buflen);
775 				if (n >= 0) {
776 					pwalk->msghead->bufp += n;
777 					pwalk->msghead->buflen -= n;
778 					if (pwalk->msghead->buflen == 0) flushmessage(pwalk);
779 				}
780 				else if (errno == EAGAIN) {
781 					/*
782 					 * Write would block ... stop for now.
783 					 */
784 					canwrite = 0;
785 				}
786 				else {
787 					hasfailed = 1;
788 				}
789 
790 				if (hasfailed) {
791 					/* Write failed, or message grew stale */
792 					errprintf("Peer at %s:%d failed: %s\n",
793 						  inet_ntoa(pwalk->peeraddr.sin_addr), ntohs(pwalk->peeraddr.sin_port),
794 						  strerror(errno));
795 					canwrite = 0;
796 					shutdownconnection(pwalk);
797 					if (pwalk->peertype == P_NET) locator_serverdown(pwalk->peername, locatorservice);
798 					pwalk->peerstatus = P_FAILED;
799 				}
800 			}
801 		}
802 	}
803 
804 	/* Detach from channels */
805 	close_channel(channel, CHAN_CLIENT);
806 
807 	/* Close peer connections */
808 	for (handle = xtreeFirst(peers); (handle != xtreeEnd(peers)); handle = xtreeNext(peers, handle)) {
809 		xymon_peer_t *pwalk = (xymon_peer_t *) xtreeData(peers, handle);
810 		shutdownconnection(pwalk);
811 	}
812 
813 	/* Remove the PID file */
814 	if (pidfile) unlink(pidfile);
815 
816 	return 0;
817 }
818 
819