1 /*----------------------------------------------------------------------------*/
2 /* Xymon message proxy.                                                       */
3 /*                                                                            */
4 /* Copyright (C) 2004-2011 Henrik Storner <henrik@hswn.dk>                    */
5 /*                                                                            */
6 /* This program is released under the GNU General Public License (GPL),       */
7 /* version 2. See the file "COPYING" for details.                             */
8 /*                                                                            */
9 /*----------------------------------------------------------------------------*/
10 
/* Version-control ($Id$) identification string for this source file. */
static char rcsid[] = "$Id: xymonproxy.c 7678 2015-10-01 14:42:42Z jccleaver $";
12 
13 #include "config.h"
14 
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>         /* Someday I'll move to GNU Autoconf for this ... */
23 #endif
24 #include <errno.h>
25 #include <sys/resource.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <stdio.h>
31 #include <netdb.h>
32 #include <ctype.h>
33 #include <signal.h>
34 #include <time.h>
35 
36 #include "version.h"
37 #include "libxymon.h"
38 
/*
 * Processing phases of a proxied message.  A request is read from the
 * client, "status" messages may wait to be merged into a combo message,
 * the request is forwarded to the Xymon server(s), and any response is
 * relayed back to the client.
 *
 * statename[] below is indexed by these values - keep both lists in the
 * same order.
 */
enum phase_t {
	P_IDLE = 0,		/* Nothing to do for this entry */

	/* Client -> proxy */
	P_REQ_READING,		/* Reading request data */
	P_REQ_READY, 		/* Done reading request from client */

	P_REQ_COMBINING,	/* Status message waiting to be merged into a combo */

	/* Proxy -> server */
	P_REQ_CONNECTING,	/* Connecting to server */
	P_REQ_SENDING, 		/* Sending request data */
	P_REQ_DONE,		/* Done sending request data to server */

	/* Server -> proxy -> client */
	P_RESP_READING,		/* Reading the server response */
	P_RESP_READY,		/* Server response has been read */
	P_RESP_SENDING,		/* Relaying the response to the client */
	P_RESP_DONE,		/* Response has been sent to the client */

	P_CLEANUP		/* Close sockets and reset the entry */
};

/*
 * Human-readable name of each phase, indexed by enum phase_t.  Used in
 * debug output and in the timeout section of the status report.
 */
char *statename[P_CLEANUP+1] = {
	/* P_IDLE           */ "idle",
	/* P_REQ_READING    */ "reading from client",
	/* P_REQ_READY      */ "request from client OK",
	/* P_REQ_COMBINING  */ "request combining",
	/* P_REQ_CONNECTING */ "connecting to server",
	/* P_REQ_SENDING    */ "sending to server",
	/* P_REQ_DONE       */ "request sent",
	/* P_RESP_READING   */ "reading from server",
	/* P_RESP_READY     */ "response from server OK",
	/* P_RESP_SENDING   */ "sending to client",
	/* P_RESP_DONE      */ "response sent",
	/* P_CLEANUP        */ "cleanup"
};
72 
73 typedef struct conn_t {
74 	enum phase_t state;
75 	int csocket;
76 	struct sockaddr_in caddr;
77 	struct in_addr *clientip, *serverip;
78 	int snum;
79 	int ssocket;
80 	int conntries, sendtries;
81 	int connectpending;
82 	time_t conntime;
83 	int madetocombo;
84 	struct timespec arrival;
85 	struct timespec timelimit;
86 	unsigned char *buf, *bufp, *bufpsave;
87 	unsigned int bufsize, buflen, buflensave;
88 	struct conn_t *next;
89 } conn_t;
90 
/*
 * Server and connection limits.
 * NOTE(review): MAX_SERVERS sizes the server address table in main(), but
 * the --server option parsing does not check against it - confirm/fix.
 */
#define MAX_SERVERS 3		/* Size of the Xymon server address table */
#define CONNECT_TRIES 3		/* How many connect-attempts against the server */
#define CONNECT_INTERVAL 8	/* Seconds between each connection attempt */
#define SEND_TRIES 2		/* How many times to try sending a message */
#define MAX_OPEN_SOCKS 256	/* Stop polling the listen socket at this many open sockets */

/* Buffer management (see do_read) */
#define BUFSZ_READ 2048		/* Minimum #bytes that must be free when read'ing into a buffer */
#define BUFSZ_INC  8192		/* How much to grow the buffer when it is too small */

/* Merging of "status" messages into "combo" messages */
#define MINIMUM_FOR_COMBO 2048	/* A message stops waiting for merges once it is this large */
#define MAXIMUM_FOR_COMBO 32768 /* Max. size of a combined message */
#define COMBO_DELAY 250000000	/* Max. time a message waits for merges (in nanoseconds) */
101 
/*
 * keeprunning and logdetails are written from the asynchronous signal
 * handler (sigmisc_handler).  ISO C only guarantees well-defined behavior
 * for such objects when they are "volatile sig_atomic_t"; a plain int may
 * be cached in a register by the interrupted code.
 */
volatile sig_atomic_t keeprunning = 1;	/* Cleared on SIGTERM to request shutdown */
time_t laststatus = 0;			/* When the last proxy status report was generated */
char *logfile = NULL;			/* If set, stdout/stderr go here; reopened on SIGHUP */
volatile sig_atomic_t logdetails = 0;	/* Log every request; toggled by SIGUSR1 */
106 unsigned long msgs_timeout_from[P_CLEANUP+1] = { 0, };
107 
108 
sigmisc_handler(int signum)109 void sigmisc_handler(int signum)
110 {
111 	switch (signum) {
112 	  case SIGTERM:
113 		errprintf("Caught TERM signal, terminating\n");
114 		keeprunning = 0;
115 		break;
116 
117 	  case SIGHUP:
118 		if (logfile) {
119 			reopen_file(logfile, "a", stdout);
120 			reopen_file(logfile, "a", stderr);
121 			errprintf("Caught SIGHUP, reopening logfile\n");
122 		}
123 		break;
124 
125 	  case SIGUSR1:
126 		/* Toggle logging of details */
127 		logdetails = (1 - logdetails);
128 		errprintf("Log details is %sabled\n", (logdetails ? "en" : "dis"));
129 		break;
130 	}
131 }
132 
/*
 * Has the deadline "limit" been reached at time "now"?
 * Returns 1 when now >= limit, 0 otherwise.  Seconds are compared first;
 * nanoseconds only decide when the seconds are equal.
 */
int overdue(struct timespec *now, struct timespec *limit)
{
	if (now->tv_sec != limit->tv_sec)
		return (now->tv_sec > limit->tv_sec);

	return (now->tv_nsec >= limit->tv_nsec);
}
139 
/*
 * Read the data available on "sockfd" and append it to conn's buffer.
 *
 * Before reading, the buffer is grown by BUFSZ_INC if fewer than
 * BUFSZ_READ bytes (plus one for a terminating NUL) are free.  After a
 * successful read the buffer contents are NUL-terminated.
 *
 * Parameters:
 *   sockfd         - socket to read from
 *   addr           - peer address, only used in error messages
 *   conn           - connection whose buffer receives the data
 *   completedstate - state conn moves to when the peer signals EOF
 *
 * Returns 0 when data was read, when EOF was seen, or on a transient
 * condition (EINTR, EAGAIN/EWOULDBLOCK).  In the transient case conn's
 * state is left unchanged, so the socket is polled again later.
 * Returns -1 on a read error or when the buffer cannot be grown.  conn is
 * then put in P_CLEANUP, and the failure is counted in msgs_timeout_from[]
 * under the state conn was in.
 */
static int do_read(int sockfd, struct in_addr *addr, conn_t *conn, enum phase_t completedstate)
{
	ssize_t n;

	if ((conn->buflen + BUFSZ_READ + 1) > conn->bufsize) {
		unsigned int newsize = conn->bufsize + BUFSZ_INC;
		unsigned char *newbuf = realloc(conn->buf, newsize);

		if (newbuf == NULL) {
			/*
			 * Out of memory. conn->buf and conn->bufsize are still the
			 * old, valid values, so the cleanup code can safely reset them.
			 */
			errprintf("Out of memory growing buffer for %s\n", inet_ntoa(*addr));
			msgs_timeout_from[conn->state]++;
			conn->state = P_CLEANUP;
			return -1;
		}

		conn->buf = newbuf;
		conn->bufsize = newsize;
		conn->bufp = conn->buf + conn->buflen;
	}

	n = read(sockfd, conn->bufp, (conn->bufsize - conn->buflen - 1));
	if (n == -1) {
		if ((errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
			/* Nothing read this time - not an error, try again later */
			return 0;
		}

		/* Error - abort */
		errprintf("READ error from %s: %s\n", inet_ntoa(*addr), strerror(errno));
		msgs_timeout_from[conn->state]++;
		conn->state = P_CLEANUP;
		return -1;
	}
	else if (n == 0) {
		/* EOF - request is complete */
		conn->state = completedstate;
	}
	else {
		conn->buflen += n;
		conn->bufp += n;
		*conn->bufp = '\0';
	}

	return 0;
}
170 
/*
 * Write as much of conn's pending data (conn->bufp, conn->buflen bytes)
 * to "sockfd" as the socket accepts.
 *
 * Partial writes are allowed: bufp/buflen are advanced past what was
 * sent.  Once the buffer is drained, conn moves to "completedstate".
 *
 * Parameters:
 *   sockfd         - socket to write to
 *   addr           - peer address, only used in error messages
 *   conn           - connection whose buffer is being sent
 *   completedstate - state conn moves to when all data has been written
 *
 * Returns 0 when data was written, or on a transient condition (EINTR,
 * EAGAIN/EWOULDBLOCK).  In the transient case nothing changes, so the
 * socket is polled again later.
 * Returns -1 on a write error.  conn is then put in P_CLEANUP, and the
 * failure is counted in msgs_timeout_from[] under the state conn was in.
 */
static int do_write(int sockfd, struct in_addr *addr, conn_t *conn, enum phase_t completedstate)
{
	ssize_t n;

	n = write(sockfd, conn->bufp, conn->buflen);
	if (n == -1) {
		if ((errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
			/* Interrupted, or the socket had no room after all - try again later */
			return 0;
		}

		/* Error - abort */
		errprintf("WRITE error to %s: %s\n", inet_ntoa(*addr), strerror(errno));
		msgs_timeout_from[conn->state]++;
		conn->state = P_CLEANUP;
		return -1;
	}

	/* Skip past whatever was sent; the rest goes out on a later call */
	conn->buflen -= n;
	conn->bufp += n;
	if (conn->buflen == 0) {
		conn->state = completedstate;
	}

	return 0;
}
193 
/*
 * Log a one-line summary of the request in conn->buf: the client IP,
 * followed by the first line of the message cut after its leading
 * alphabetic keyword, any following whitespace and the next
 * whitespace-delimited token.
 *
 * The first 6 bytes of conn->buf (the "combo\n" prefix the proxy puts in
 * front of every request) are skipped.  If the message itself starts with
 * "combo\n", that is skipped too, so the first embedded message is shown.
 * The buffer is modified temporarily while logging and restored before
 * returning.
 */
void do_log(conn_t *conn)
{
	char *msgstart = conn->buf + 6;
	char *lineend, *wordend;
	char saved;

	if (strncmp(msgstart, "combo\n", 6) == 0) msgstart += 6;

	/* Temporarily cut the message off at the end of its first line */
	lineend = strchr(msgstart, '\n');
	if (lineend != NULL) *lineend = '\0';

	/* Skip keyword, separating whitespace, and the following token */
	wordend = msgstart;
	while (*wordend && isalpha((unsigned char) *wordend)) wordend++;
	while (*wordend && isspace((unsigned char) *wordend)) wordend++;
	while (*wordend && !isspace((unsigned char) *wordend)) wordend++;

	saved = *wordend;
	*wordend = '\0';
	errprintf("%s : %s\n", inet_ntoa(*conn->clientip), msgstart);
	*wordend = saved;

	/* Undo the first-line cut */
	if (lineend != NULL) *lineend = '\n';
}
212 
main(int argc,char * argv[])213 int main(int argc, char *argv[])
214 {
215 	int daemonize = 1;
216 	int timeout = 10;
217 	int listenq = 512;
218 	char *pidfile = "/var/run/xymonproxy.pid";
219 	char *proxyname = NULL;
220 	char *proxynamesvc = "xymonproxy";
221 
222 	int sockcount = 0;
223 	int lsocket;
224 	struct sockaddr_in laddr;
225 	struct sockaddr_in xymonserveraddr[MAX_SERVERS];
226 	int xymonservercount = 0;
227 	int opt;
228 	conn_t *chead = NULL;
229 	struct sigaction sa;
230 	int selectfailures = 0;
231 
232 	/* Statistics info */
233 	time_t startuptime = gettimer();
234 	unsigned long msgs_total = 0;
235 	unsigned long msgs_total_last = 0;
236 	unsigned long msgs_combined = 0;
237 	unsigned long msgs_merged = 0;
238 	unsigned long msgs_delivered = 0;
239 	unsigned long msgs_status = 0;
240 	unsigned long msgs_combo = 0;
241 	unsigned long msgs_other = 0;
242 	unsigned long msgs_recovered = 0;
243 	struct timespec timeinqueue = { 0, 0 };
244 
245 	/* Don'T save the output from errprintf() */
246 	save_errbuf = 0;
247 
248 	memset(&laddr, 0, sizeof(laddr));
249 	inet_aton("0.0.0.0", (struct in_addr *) &laddr.sin_addr.s_addr);
250 	laddr.sin_port = htons(1984);
251 	laddr.sin_family = AF_INET;
252 
253 	for (opt=1; (opt < argc); opt++) {
254 		if (argnmatch(argv[opt], "--listen=")) {
255 			char *locaddr, *p;
256 			int locport;
257 
258 			locaddr = strchr(argv[opt], '=')+1;
259 			p = strchr(locaddr, ':');
260 			if (p) { locport = atoi(p+1); *p = '\0'; } else locport = 1984;
261 
262 			memset(&laddr, 0, sizeof(laddr));
263 			laddr.sin_port = htons(locport);
264 			laddr.sin_family = AF_INET;
265 			if (inet_aton(locaddr, (struct in_addr *) &laddr.sin_addr.s_addr) == 0) {
266 				errprintf("Invalid listen address %s\n", locaddr);
267 				return 1;
268 			}
269 			if (p) *p = ':';
270 		}
271 		else if (argnmatch(argv[opt], "--server=") || argnmatch(argv[opt], "--bbdisplay=")) {
272 			char *ips, *ip1;
273 			int port1;
274 
275 			ips = strdup(strchr(argv[opt], '=')+1);
276 
277 			ip1 = strtok(ips, ",");
278 			while (ip1) {
279 				char *p;
280 				p = strchr(ip1, ':');
281 				if (p) { port1 = atoi(p+1); *p = '\0'; } else port1 = 1984;
282 
283 				memset(&xymonserveraddr[xymonservercount], 0, sizeof(xymonserveraddr[xymonservercount]));
284 				xymonserveraddr[xymonservercount].sin_port = htons(port1);
285 				xymonserveraddr[xymonservercount].sin_family = AF_INET;
286 				if (inet_aton(ip1, (struct in_addr *) &xymonserveraddr[xymonservercount].sin_addr.s_addr) == 0) {
287 					errprintf("Invalid remote address %s\n", ip1);
288 				}
289 				else {
290 					xymonservercount++;
291 				}
292 				if (p) *p = ':';
293 				ip1 = strtok(NULL, ",");
294 			}
295 			xfree(ips);
296 		}
297 		else if (argnmatch(argv[opt], "--timeout=")) {
298 			char *p = strchr(argv[opt], '=');
299 			timeout = atoi(p+1);
300 		}
301 		else if (argnmatch(argv[opt], "--lqueue=")) {
302 			char *p = strchr(argv[opt], '=');
303 			listenq = atoi(p+1);
304 		}
305 		else if (strcmp(argv[opt], "--daemon") == 0) {
306 			daemonize = 1;
307 		}
308 		else if (strcmp(argv[opt], "--no-daemon") == 0) {
309 			daemonize = 0;
310 		}
311 		else if (argnmatch(argv[opt], "--pidfile=")) {
312 			char *p = strchr(argv[opt], '=');
313 			pidfile = strdup(p+1);
314 		}
315 		else if (argnmatch(argv[opt], "--logfile=")) {
316 			char *p = strchr(argv[opt], '=');
317 			logfile = strdup(p+1);
318 		}
319 		else if (strcmp(argv[opt], "--log-details") == 0) {
320 			logdetails = 1;
321 		}
322 		else if (argnmatch(argv[opt], "--report=")) {
323 			char *p1 = strchr(argv[opt], '=')+1;
324 
325 			if (strchr(p1, '.') == NULL) {
326 				if (xgetenv("MACHINE") == NULL) {
327 					errprintf("Environment variable MACHINE is undefined\n");
328 					return 1;
329 				}
330 
331 				proxyname = strdup(xgetenv("MACHINE"));
332 				proxyname = (char *)realloc(proxyname, strlen(proxyname) + strlen(p1) + 1);
333 				strcat(proxyname, ".");
334 				strcat(proxyname, p1);
335 				proxynamesvc = strdup(p1);
336 			}
337 			else {
338 				proxyname = strdup(p1);
339 				proxynamesvc = strchr(proxyname, '.')+1;
340 			}
341 		}
342 		else if (strcmp(argv[opt], "--debug") == 0) {
343 			debug = 1;
344 		}
345 		else if (strcmp(argv[opt], "--version") == 0) {
346 			printf("xymonproxy version %s\n", VERSION);
347 			return 0;
348 		}
349 		else if (strcmp(argv[opt], "--help") == 0) {
350 			printf("xymonproxy version %s\n", VERSION);
351 			printf("\nOptions:\n");
352 			printf("\t--listen=IP[:port]          : Listen address and portnumber\n");
353 			printf("\t--server=IP[:port]          : Xymon server address and portnumber\n");
354 			printf("\t--report=[HOST.]SERVICE     : Sends a status message about proxy activity\n");
355 			printf("\t--timeout=N                 : Communications timeout (seconds)\n");
356 			printf("\t--lqueue=N                  : Listen-queue size\n");
357 			printf("\t--daemon                    : Run as a daemon\n");
358 			printf("\t--no-daemon                 : Do not run as a daemon\n");
359 			printf("\t--pidfile=FILENAME          : Save process-ID of daemon to FILENAME\n");
360 			printf("\t--logfile=FILENAME          : Log to FILENAME instead of stderr\n");
361 			printf("\t--debug                     : Enable debugging output\n");
362 			printf("\n");
363 			return 0;
364 		}
365 	}
366 
367 	if (xymonservercount == 0) {
368 		errprintf("No Xymon server address given - aborting\n");
369 		return 1;
370 	}
371 
372 	/* Set up a socket to listen for new connections */
373 	lsocket = socket(AF_INET, SOCK_STREAM, 0);
374 	if (lsocket == -1) {
375 		errprintf("Cannot create listen socket (%s)\n", strerror(errno));
376 		return 1;
377 	}
378 	opt = 1;
379 	setsockopt(lsocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
380 	fcntl(lsocket, F_SETFL, O_NONBLOCK);
381 	if (bind(lsocket, (struct sockaddr *)&laddr, sizeof(laddr)) == -1) {
382 		errprintf("Cannot bind to listen socket (%s)\n", strerror(errno));
383 		return 1;
384 	}
385 
386 	if (listen(lsocket, listenq) == -1) {
387 		errprintf("Cannot listen (%s)\n", strerror(errno));
388 		return 1;
389 	}
390 
391 	/* Redirect logging to the logfile, if requested */
392 	if (logfile) {
393 		reopen_file(logfile, "a", stdout);
394 		reopen_file(logfile, "a", stderr);
395 	}
396 
397 	errprintf("xymonproxy version %s starting\n", VERSION);
398 	errprintf("Listening on %s:%d\n", inet_ntoa(laddr.sin_addr), ntohs(laddr.sin_port));
399 	{
400 		int i;
401 		char *p;
402 		char srvrs[500];
403 
404 		for (i=0, srvrs[0] = '\0', p=srvrs; (i<xymonservercount); i++) {
405 			p += sprintf(p, "%s:%d ", inet_ntoa(xymonserveraddr[i].sin_addr), ntohs(xymonserveraddr[i].sin_port));
406 		}
407 		errprintf("Sending to Xymon server(s) %s\n", srvrs);
408 	}
409 
410 	if (daemonize) {
411 		pid_t childpid;
412 
413 		reopen_file("/dev/null", "r", stdin);
414 
415 		/* Become a daemon */
416 		childpid = fork();
417 		if (childpid < 0) {
418 			/* Fork failed */
419 			errprintf("Could not fork\n");
420 			exit(1);
421 		}
422 		else if (childpid > 0) {
423 			/* Parent - save PID and exit */
424 			FILE *fd = fopen(pidfile, "w");
425 			if (fd) {
426 				fprintf(fd, "%d\n", (int)childpid);
427 				fclose(fd);
428 			}
429 			exit(0);
430 		}
431 		/* Child (daemon) continues here */
432 		setsid();
433 	}
434 
435 	setup_signalhandler(proxynamesvc);
436 	memset(&sa, 0, sizeof(sa));
437 	sa.sa_handler = sigmisc_handler;
438 	sigaction(SIGHUP, &sa, NULL);
439 	sigaction(SIGTERM, &sa, NULL);
440 	sigaction(SIGUSR1, &sa, NULL);
441 
442 	do {
443 		fd_set fdread, fdwrite;
444 		int maxfd;
445 		struct timespec tmo;
446 		struct timeval selecttmo;
447 		int n, idx;
448 		conn_t *cwalk, *ctmp;
449 		time_t ctime;
450 		time_t now;
451 		int combining = 0;
452 
453 		/* See if it is time for a status report */
454 		if (proxyname && ((now = gettimer()) >= (laststatus+300))) {
455 			conn_t *stentry;
456 			int ccount = 0;
457 			unsigned long bufspace = 0;
458 			unsigned long avgtime;	/* In millisecs */
459 			char runtime_s[30];
460 			unsigned long runt = (unsigned long) (now-startuptime);
461 			char *p;
462 			unsigned long msgs_sent = msgs_total - msgs_total_last;
463 
464 			/* Setup a conn_t struct for the status message */
465 			stentry = (conn_t *)calloc(1, sizeof(conn_t));
466 			stentry->state = P_REQ_READY;
467 			stentry->csocket = stentry->ssocket = -1;
468 			stentry->clientip = &stentry->caddr.sin_addr;
469 			getntimer(&stentry->arrival);
470 			stentry->timelimit.tv_sec = stentry->arrival.tv_sec + timeout;
471 			stentry->timelimit.tv_nsec = stentry->arrival.tv_nsec;
472 			stentry->bufsize = BUFSZ_INC;
473 			stentry->buf = (char *)malloc(stentry->bufsize);
474 			stentry->next = chead;
475 			chead = stentry;
476 
477 			sprintf(runtime_s, "%lu days, %02lu:%02lu:%02lu",
478 				(runt/86400), ((runt % 86400) / 3600),
479 				((runt % 3600) / 60), (runt % 60));
480 
481 			init_timestamp();
482 
483 			for (cwalk = chead; (cwalk); cwalk = cwalk->next) {
484 				ccount++;
485 				bufspace += cwalk->bufsize;
486 			}
487 
488 			if (msgs_sent == 0) {
489 				avgtime = 0;
490 			}
491 			else {
492 				avgtime = (timeinqueue.tv_sec*1000 + timeinqueue.tv_nsec/1000) / msgs_sent;
493 			}
494 
495 			p = stentry->buf;
496 			p += sprintf(p, "combo\nstatus+11 %s green %s - xymon proxy up: %s\n\nxymonproxy for Xymon version %s\n\nProxy statistics\n\nIncoming messages        : %10lu (%lu msgs/second)\nOutbound messages        : %10lu\n\nIncoming message distribution\n- Combo messages         : %10lu\n- Status messages        : %10lu\n  Messages merged        : %10lu\n  Resulting combos       : %10lu\n- Other messages         : %10lu\n\nProxy resources\n- Connection table size  : %10d\n- Buffer space           : %10lu kByte\n",
497 				proxyname, timestamp, runtime_s, VERSION,
498 				msgs_total, (msgs_total - msgs_total_last) / (now - laststatus),
499 				msgs_delivered,
500 				msgs_combo,
501 				msgs_status, msgs_merged, msgs_combined,
502 				msgs_other,
503 				ccount, bufspace / 1024);
504 			p += sprintf(p, "\nTimeout/failure details\n");
505 			p += sprintf(p, "- %-22s : %10lu\n", statename[P_REQ_READING], msgs_timeout_from[P_REQ_READING]);
506 			p += sprintf(p, "- %-22s : %10lu\n", statename[P_REQ_CONNECTING], msgs_timeout_from[P_REQ_CONNECTING]);
507 			p += sprintf(p, "- %-22s : %10lu\n", statename[P_REQ_SENDING], msgs_timeout_from[P_REQ_SENDING]);
508 			p += sprintf(p, "- %-22s : %10lu\n", "recovered", msgs_recovered);
509 			p += sprintf(p, "- %-22s : %10lu\n", statename[P_RESP_READING], msgs_timeout_from[P_RESP_READING]);
510 			p += sprintf(p, "- %-22s : %10lu\n", statename[P_RESP_SENDING], msgs_timeout_from[P_RESP_SENDING]);
511 			p += sprintf(p, "\n%-24s : %10lu.%03lu\n", "Average queue time",
512 					(avgtime / 1000), (avgtime % 1000));
513 
514 			/* Clear the summary collection totals */
515 			laststatus = now;
516 			msgs_total_last = msgs_total;
517 			timeinqueue.tv_sec = timeinqueue.tv_nsec = 0;
518 
519 			stentry->buflen = strlen(stentry->buf);
520 			stentry->bufp = stentry->buf + stentry->buflen;
521 			stentry->state = P_REQ_READY;
522 		}
523 
524 		FD_ZERO(&fdread);
525 		FD_ZERO(&fdwrite);
526 		maxfd = -1;
527 		combining = 0;
528 
529 		for (cwalk = chead, idx=0; (cwalk); cwalk = cwalk->next, idx++) {
530 			dbgprintf("state %d: %s\n", idx, statename[cwalk->state]);
531 
532 			/* First, handle any state transitions and setup the FD sets for select() */
533 			switch (cwalk->state) {
534 			  case P_REQ_READING:
535 				FD_SET(cwalk->csocket, &fdread);
536 				if (cwalk->csocket > maxfd) maxfd = cwalk->csocket;
537 				break;
538 
539 			  case P_REQ_READY:
540 				if (cwalk->buflen <= 6) {
541 					/* Got an empty request - just drop it */
542 					dbgprintf("Dropping empty request from %s\n", inet_ntoa(*cwalk->clientip));
543 					cwalk->state = P_CLEANUP;
544 					break;
545 				}
546 
547 				if (logdetails) do_log(cwalk);
548 				cwalk->conntries = CONNECT_TRIES;
549 				cwalk->sendtries = SEND_TRIES;
550 				cwalk->conntime = 0;
551 
552 				/*
553 				 * We now want to find out what kind of message we've got.
554 				 * If it's NOT a "status" message, just pass it along.
555 				 * For "status" messages, we want to try and merge many small
556 				 * messages into a "combo" message - so send those off the the
557 				 * P_REQ_COMBINING state for a while.
558 				 * If we are not going to send back a response to the client, we
559 				 * also close the client socket since it is no longer needed.
560 				 * Note that since we started out as optimists and put a "combo\n"
561 				 * at the front of the buffer, we need to skip that when looking at
562 				 * what type of message it is. Hence the "cwalk->buf+6".
563 				 */
564 				if (strncmp(cwalk->buf+6, "client", 6) == 0) {
565 					/*
566 					 * "client" messages go to all Xymon servers, but
567 					 * we will only pass back the response from one of them
568 					 * (the last one).
569 					 */
570 					shutdown(cwalk->csocket, SHUT_RD);
571 					msgs_other++;
572 					cwalk->snum = xymonservercount;
573 
574 					if ((cwalk->buflen + 40 ) < cwalk->bufsize) {
575 						int n = sprintf(cwalk->bufp,
576 								"\n[proxy]\nClientIP:%s\n",
577 								inet_ntoa(*cwalk->clientip));
578 						cwalk->bufp += n;
579 						cwalk->buflen += n;
580 					}
581 				}
582 				else if ((strncmp(cwalk->buf+6, "query", 5) == 0)  ||
583 				         (strncmp(cwalk->buf+6, "config", 6) == 0) ||
584 				         (strncmp(cwalk->buf+6, "ping", 4) == 0) ||
585 				         (strncmp(cwalk->buf+6, "download", 8) == 0)) {
586 					/*
587 					 * These requests get a response back, but send no data.
588 					 * Send these to the last of the Xymon servers only.
589 					 */
590 					shutdown(cwalk->csocket, SHUT_RD);
591 					msgs_other++;
592 					cwalk->snum = 1;
593 				}
594 				else {
595 					/* It's a request that doesn't take a response. */
596 					if (cwalk->csocket >= 0) {
597 						shutdown(cwalk->csocket, SHUT_RDWR);
598 						close(cwalk->csocket); sockcount--;
599 						cwalk->csocket = -1;
600 					}
601 					cwalk->snum = xymonservercount;
602 
603 					if (strncmp(cwalk->buf+6, "status", 6) == 0) {
604 						msgs_status++;
605 						getntimer(&cwalk->timelimit);
606 						cwalk->timelimit.tv_nsec += COMBO_DELAY;
607 						if (cwalk->timelimit.tv_nsec >= 1000000000) {
608 							cwalk->timelimit.tv_sec++;
609 							cwalk->timelimit.tv_nsec -= 1000000000;
610 						}
611 
612 						/*
613 						 * Some clients (bbnt) send a trailing \0, so we cannot
614 						 * rely on buflen being what we want it to be.
615 						 */
616 						cwalk->buflen = strlen(cwalk->buf);
617 						cwalk->bufp = cwalk->buf + cwalk->buflen;
618 
619 						if ((cwalk->buflen + 50 ) < cwalk->bufsize) {
620 							int n = sprintf(cwalk->bufp,
621 									"\nStatus message received from %s\n",
622 									inet_ntoa(*cwalk->clientip));
623 							cwalk->bufp += n;
624 							cwalk->buflen += n;
625 						}
626 
627 						cwalk->state = P_REQ_COMBINING;
628 						break;
629 					}
630 					else if (strncmp(cwalk->buf+6, "combo\n", 6) == 0) {
631 						char *currmsg, *nextmsg;
632 
633 						msgs_combo++;
634 
635 						/*
636 						 * Some clients (bbnt) send a trailing \0, so we cannot
637 						 * rely on buflen being what we want it to be.
638 						 */
639 						cwalk->buflen = strlen(cwalk->buf);
640 						cwalk->bufp = cwalk->buf + cwalk->buflen;
641 
642 						getntimer(&cwalk->timelimit);
643 						cwalk->timelimit.tv_nsec += COMBO_DELAY;
644 						if (cwalk->timelimit.tv_nsec >= 1000000000) {
645 							cwalk->timelimit.tv_sec++;
646 							cwalk->timelimit.tv_nsec -= 1000000000;
647 						}
648 
649 						currmsg = cwalk->buf+12; /* Skip pre-def. "combo\n" and message "combo\n" */
650 						do {
651 							nextmsg = strstr(currmsg, "\n\nstatus");
652 							if (nextmsg) { *(nextmsg+1) = '\0'; nextmsg += 2; }
653 
654 							/* Create a duplicate conn_t record for all embedded messages */
655 							ctmp = (conn_t *)malloc(sizeof(conn_t));
656 							memcpy(ctmp, cwalk, sizeof(conn_t));
657 							ctmp->bufsize = BUFSZ_INC*(((6 + strlen(currmsg) + 50) / BUFSZ_INC) + 1);
658 							ctmp->buf = (char *)malloc(ctmp->bufsize);
659 							ctmp->buflen = sprintf(ctmp->buf,
660 								"combo\n%s\nStatus message received from %s\n",
661 								currmsg, inet_ntoa(*cwalk->clientip));
662 							ctmp->bufp = ctmp->buf + ctmp->buflen;
663 							ctmp->state = P_REQ_COMBINING;
664 							ctmp->next = chead;
665 							chead = ctmp;
666 
667 							currmsg = nextmsg;
668 						} while (currmsg);
669 
670 						/* We don't do anymore with this conn_t */
671 						cwalk->state = P_CLEANUP;
672 						break;
673 					}
674 					else if (strncmp(cwalk->buf+6, "page", 4) == 0) {
675 						/* xymond has no use for page requests */
676 						cwalk->state = P_CLEANUP;
677 						break;
678 					}
679 					else {
680 						msgs_other++;
681 					}
682 				}
683 
684 				/*
685 				 * This wont be made into a combo message, so skip the "combo\n"
686 				 * and go off to send the message to the server.
687 				 */
688 				cwalk->bufp = cwalk->buf+6;
689 				cwalk->buflen -= 6;
690 				cwalk->bufpsave = cwalk->bufp;
691 				cwalk->buflensave = cwalk->buflen;
692 				cwalk->state = P_REQ_CONNECTING;
693 				/* Fall through for non-status messages */
694 
695 			  case P_REQ_CONNECTING:
696 				/* Need to restore the bufp and buflen, as we may get here many times for one message */
697 				cwalk->bufp = cwalk->bufpsave;
698 				cwalk->buflen = cwalk->buflensave;
699 
700 				ctime = gettimer();
701 				if (ctime < (cwalk->conntime + CONNECT_INTERVAL)) {
702 					dbgprintf("Delaying retry of connection\n");
703 					break;
704 				}
705 
706 				cwalk->conntries--;
707 				cwalk->conntime = ctime;
708 				if (cwalk->conntries < 0) {
709 					errprintf("Server not responding, message lost\n");
710 					cwalk->state = P_REQ_DONE;	/* Not CLENAUP - might be more servers */
711 					msgs_timeout_from[P_REQ_CONNECTING]++;
712 					break;
713 				}
714 
715 				cwalk->ssocket = socket(AF_INET, SOCK_STREAM, 0);
716 				if (cwalk->ssocket == -1) {
717 					dbgprintf("Could not get a socket - will try again\n");
718 					break; /* Retry the next time around */
719 				}
720 				sockcount++;
721 				fcntl(cwalk->ssocket, F_SETFL, O_NONBLOCK);
722 
723 				{
724 					int idx = (xymonservercount - cwalk->snum);
725 					n = connect(cwalk->ssocket, (struct sockaddr *)&xymonserveraddr[idx], sizeof(xymonserveraddr[idx]));
726 					cwalk->serverip = &xymonserveraddr[idx].sin_addr;
727 					dbgprintf("Connecting to Xymon server at %s\n", inet_ntoa(*cwalk->serverip));
728 				}
729 
730 				if ((n == 0) || ((n == -1) && (errno == EINPROGRESS))) {
731 					cwalk->state = P_REQ_SENDING;
732 					cwalk->connectpending = 1;
733 					getntimer(&cwalk->timelimit);
734 					cwalk->timelimit.tv_sec += timeout;
735 					/* Fallthrough */
736 				}
737 				else {
738 					/* Could not connect! Invoke retries */
739 					dbgprintf("Connect to server failed: %s\n", strerror(errno));
740 					close(cwalk->ssocket); sockcount--;
741 					cwalk->ssocket = -1;
742 					break;
743 				}
744 				/* No "break" here! */
745 
746 			  case P_REQ_SENDING:
747 				FD_SET(cwalk->ssocket, &fdwrite);
748 				if (cwalk->ssocket > maxfd) maxfd = cwalk->ssocket;
749 				break;
750 
751 			  case P_REQ_DONE:
752 				/* Request has been sent to the server - we're done writing data */
753 				shutdown(cwalk->ssocket, SHUT_WR);
754 				cwalk->snum--;
755 				if (cwalk->snum) {
756 					/* More servers to do */
757 					close(cwalk->ssocket); cwalk->ssocket = -1; sockcount--;
758 					cwalk->conntries = CONNECT_TRIES;
759 					cwalk->sendtries = SEND_TRIES;
760 					cwalk->conntime = 0;
761 					cwalk->state = P_REQ_CONNECTING;
762 					break;
763 				}
764 				else {
765 					/* Have sent to all servers, grab the response from the last one. */
766 					cwalk->bufp = cwalk->buf; cwalk->buflen = 0;
767 					memset(cwalk->buf, 0, cwalk->bufsize);
768 				}
769 
770 				msgs_delivered++;
771 
772 				if (cwalk->sendtries < SEND_TRIES) {
773 					errprintf("Recovered from write error after %d retries\n",
774 						  (SEND_TRIES - cwalk->sendtries));
775 					msgs_recovered++;
776 				}
777 
778 				if (cwalk->arrival.tv_sec > 0) {
779 					struct timespec departure;
780 
781 					getntimer(&departure);
782 					timeinqueue.tv_sec += (departure.tv_sec - cwalk->arrival.tv_sec);
783 					if (departure.tv_nsec >= cwalk->arrival.tv_nsec) {
784 						timeinqueue.tv_nsec += (departure.tv_nsec - cwalk->arrival.tv_nsec);
785 					}
786 					else {
787 						timeinqueue.tv_sec--;
788 						timeinqueue.tv_nsec += (1000000000 + departure.tv_nsec - cwalk->arrival.tv_nsec);
789 					}
790 
791 					if (timeinqueue.tv_nsec > 1000000000) {
792 						timeinqueue.tv_sec++;
793 						timeinqueue.tv_nsec -= 1000000000;
794 					}
795 				}
796 				else {
797 					errprintf("Odd ... this message was not timestamped\n");
798 				}
799 
800 				if (cwalk->csocket < 0) {
801 					cwalk->state = P_CLEANUP;
802 					break;
803 				}
804 				else {
805 					cwalk->state = P_RESP_READING;
806 					getntimer(&cwalk->timelimit);
807 					cwalk->timelimit.tv_sec += timeout;
808 				}
809 				/* Fallthrough */
810 
811 			  case P_RESP_READING:
812 				FD_SET(cwalk->ssocket, &fdread);
813 				if (cwalk->ssocket > maxfd) maxfd = cwalk->ssocket;
814 				break;
815 
816 			  case P_RESP_READY:
817 				shutdown(cwalk->ssocket, SHUT_RD);
818 				close(cwalk->ssocket); sockcount--;
819 				cwalk->ssocket = -1;
820 				cwalk->bufp = cwalk->buf;
821 				cwalk->state = P_RESP_SENDING;
822 				getntimer(&cwalk->timelimit);
823 				cwalk->timelimit.tv_sec += timeout;
824 				/* Fall through */
825 
826 			  case P_RESP_SENDING:
827 				if (cwalk->buflen && (cwalk->csocket >= 0)) {
828 					FD_SET(cwalk->csocket, &fdwrite);
829 					if (cwalk->csocket > maxfd) maxfd = cwalk->csocket;
830 					break;
831 				}
832 				else {
833 					cwalk->state = P_RESP_DONE;
834 				}
835 				/* Fall through */
836 
837 			  case P_RESP_DONE:
838 				if (cwalk->csocket >= 0) {
839 					shutdown(cwalk->csocket, SHUT_WR);
840 					close(cwalk->csocket); sockcount--;
841 				}
842 				cwalk->csocket = -1;
843 				cwalk->state = P_CLEANUP;
844 				/* Fall through */
845 
846 			  case P_CLEANUP:
847 				if (cwalk->csocket >= 0) {
848 					close(cwalk->csocket); sockcount--;
849 					cwalk->csocket = -1;
850 				}
851 				if (cwalk->ssocket >= 0) {
852 					close(cwalk->ssocket); sockcount--;
853 					cwalk->ssocket = -1;
854 				}
855 				cwalk->arrival.tv_sec = cwalk->arrival.tv_nsec = 0;
856 				cwalk->bufp = cwalk->buf;
857 				cwalk->buflen = 0;
858 				memset(cwalk->buf, 0, cwalk->bufsize);
859 				memset(&cwalk->caddr, 0, sizeof(cwalk->caddr));
860 				cwalk->madetocombo = 0;
861 				cwalk->state = P_IDLE;
862 				break;
863 
864 			  case P_IDLE:
865 				break;
866 
867 			  case P_REQ_COMBINING:
868 				/* See if we can combine some "status" messages into a "combo" */
869 				combining++;
870 				getntimer(&tmo);
871 				if ((cwalk->buflen < MINIMUM_FOR_COMBO) && !overdue(&tmo, &cwalk->timelimit)) {
872 					conn_t *cextra;
873 
874 					/* Are there any other messages in P_COMBINING state ? */
875 					cextra = cwalk->next;
876 					while (cextra && (cextra->state != P_REQ_COMBINING)) cextra = cextra->next;
877 
878 					if (cextra) {
879 						/*
880 						 * Yep. It might be worthwhile to go for a combo.
881 						 */
882 						while (cextra && (cwalk->buflen < (MAXIMUM_FOR_COMBO-20))) {
883 							if (strncmp(cextra->buf+6, "status", 6) == 0) {
884 								int newsize;
885 
886 								/*
887 								 * Size of the new message - if the cextra one
888 								 * is merged - is the cwalk buffer, plus the
889 								 * two newlines separating messages in combo's,
890 								 * plus the cextra buffer except the leading
891 								 * "combo\n" of 6 bytes.
892 								 */
893 								newsize = cwalk->buflen + 2 + (cextra->buflen - 6);
894 
895 								if ((newsize < cwalk->bufsize) && (newsize < MAXIMUM_FOR_COMBO)) {
896 									/*
897 									 * There's room for it. Add it to the
898 									 * cwalk buffer, but without the leading
899 									 * "combo\n" (we already have one of those).
900 									 */
901 									cwalk->madetocombo++;
902 									strcat(cwalk->buf, "\n\n");
903 									strcat(cwalk->buf, cextra->buf+6);
904 									cwalk->buflen = newsize;
905 									cextra->state = P_CLEANUP;
906 									dbgprintf("Merged combo\n");
907 									msgs_merged++;
908 								}
909 							}
910 
911 							/* Go to the next connection in the right state */
912 							do {
913 								cextra = cextra->next;
914 							} while (cextra && (cextra->state != P_REQ_COMBINING));
915 						}
916 					}
917 				}
918 				else {
919 					combining--;
920 					cwalk->state = P_REQ_CONNECTING;
921 
922 					if (cwalk->madetocombo) {
923 						/*
924 						 * Point the outgoing buffer pointer to the full
925 						 * message, including the "combo\n"
926 						 */
927 						cwalk->bufp = cwalk->buf;
928 						cwalk->madetocombo++;
929 						msgs_merged++; /* Count the proginal message also */
930 						msgs_combined++;
931 						dbgprintf("Now going to send combo from %d messages\n%s\n",
932 							cwalk->madetocombo, cwalk->buf);
933 					}
934 					else {
935 						/*
936 						 * Skip sending the "combo\n" at start of buffer.
937 						 */
938 						cwalk->bufp = cwalk->buf+6;
939 						cwalk->buflen -= 6;
940 						dbgprintf("No messages to combine - sending unchanged\n");
941 					}
942 				}
943 
944 				cwalk->bufpsave = cwalk->bufp;
945 				cwalk->buflensave = cwalk->buflen;
946 				break;
947 
948 			  default:
949 				break;
950 			}
951 		}
952 
953 		/* Add the listen-socket to the select() list, but only if we have room */
954 		if (sockcount < MAX_OPEN_SOCKS) {
955 			FD_SET(lsocket, &fdread);
956 			if (lsocket > maxfd) maxfd = lsocket;
957 		}
958 		else {
959 			static time_t lastlog = 0;
960 			if ((now = gettimer()) < (lastlog+30)) {
961 				lastlog = now;
962 				errprintf("Squelching incoming connections, sockcount=%d\n", sockcount);
963 			}
964 		}
965 
966 		if (combining) {
967 			selecttmo.tv_sec = 0; selecttmo.tv_usec = COMBO_DELAY / 1000;
968 		}
969 		else {
970 			selecttmo.tv_sec = 1; selecttmo.tv_usec = 0;
971 		}
972 
973 		n = select(maxfd+1, &fdread, &fdwrite, NULL, &selecttmo);
974 
975 		if (n < 0) {
976 			errprintf("select() failed: %s\n", strerror(errno));
977 			if (++selectfailures > 5) {
978 				errprintf("Too many select failures, aborting\n");
979 				exit(1);
980 			}
981 		}
982 		else if (n == 0) {
983 			/* Timeout */
984 			if (selectfailures > 0) selectfailures--;
985 
986 			getntimer(&tmo);
987 			for (cwalk = chead; (cwalk); cwalk = cwalk->next) {
988 				switch (cwalk->state) {
989 				  case P_REQ_READING:
990 				  case P_REQ_SENDING:
991 				  case P_RESP_READING:
992 				  case P_RESP_SENDING:
993 					if (overdue(&tmo, &cwalk->timelimit)) {
994 						cwalk->state = P_CLEANUP;
995 						msgs_timeout_from[cwalk->state]++;
996 					}
997 					break;
998 
999 				  default:
1000 					break;
1001 				}
1002 			}
1003 		}
1004 		else {
1005 			if (selectfailures > 0) selectfailures--;
1006 
1007 			for (cwalk = chead; (cwalk); cwalk = cwalk->next) {
1008 				switch (cwalk->state) {
1009 				  case P_REQ_READING:
1010 					if (FD_ISSET(cwalk->csocket, &fdread)) {
1011 						do_read(cwalk->csocket, cwalk->clientip, cwalk, P_REQ_READY);
1012 					}
1013 					break;
1014 
1015 				  case P_REQ_SENDING:
1016 					if (FD_ISSET(cwalk->ssocket, &fdwrite)) {
1017 						if (cwalk->connectpending) {
1018 							int connres, connressize;
1019 
1020 							/* First time ready for write - check connect status */
1021 							cwalk->connectpending = 0;
1022 							connressize = sizeof(connres);
1023 							n = getsockopt(cwalk->ssocket, SOL_SOCKET, SO_ERROR, &connres, &connressize);
1024 							if (connres != 0) {
1025 								/* Connect failed! Invoke retries. */
1026 								dbgprintf("Connect to server failed: %s - retrying\n",
1027 									strerror(errno));
1028 								close(cwalk->ssocket); sockcount--;
1029 								cwalk->ssocket = -1;
1030 								cwalk->state = P_REQ_CONNECTING;
1031 								break;
1032 							}
1033 						}
1034 
1035 						if ( (do_write(cwalk->ssocket, cwalk->serverip, cwalk, P_REQ_DONE) == -1) &&
1036 						     (cwalk->sendtries > 0) ) {
1037 							/*
1038 							 * Got a "write" error after connecting.
1039 							 * Try saving the situation by retrying the send later.
1040 							 */
1041 							dbgprintf("Attempting recovery from write error\n");
1042 							close(cwalk->ssocket); sockcount--; cwalk->ssocket = -1;
1043 							cwalk->sendtries--;
1044 							cwalk->state = P_REQ_CONNECTING;
1045 							cwalk->conntries = CONNECT_TRIES;
1046 							cwalk->conntime = gettimer();
1047 						}
1048 					}
1049 					break;
1050 
1051 				  case P_RESP_READING:
1052 					if (FD_ISSET(cwalk->ssocket, &fdread)) {
1053 						do_read(cwalk->ssocket, cwalk->serverip, cwalk, P_RESP_READY);
1054 					}
1055 					break;
1056 
1057 				  case P_RESP_SENDING:
1058 					if (FD_ISSET(cwalk->csocket, &fdwrite)) {
1059 						do_write(cwalk->csocket, cwalk->clientip, cwalk, P_RESP_DONE);
1060 					}
1061 					break;
1062 
1063 				  default:
1064 					break;
1065 				}
1066 			}
1067 
1068 			if (FD_ISSET(lsocket, &fdread)) {
1069 				/* New incoming connection */
1070 				conn_t *newconn;
1071 				int caddrsize;
1072 
1073 				dbgprintf("New connection\n");
1074 				for (cwalk = chead; (cwalk && (cwalk->state != P_IDLE)); cwalk = cwalk->next);
1075 				if (cwalk) {
1076 					newconn = cwalk;
1077 				}
1078 				else {
1079 					newconn = malloc(sizeof(conn_t));
1080 					newconn->next = chead;
1081 					chead = newconn;
1082 					newconn->bufsize = BUFSZ_INC;
1083 					newconn->buf = newconn->bufp = malloc(newconn->bufsize);
1084 				}
1085 
1086 				newconn->connectpending = 0;
1087 				newconn->madetocombo = 0;
1088 				newconn->snum = 0;
1089 				newconn->ssocket = -1;
1090 				newconn->serverip = NULL;
1091 				newconn->conntries = 0;
1092 				newconn->sendtries = 0;
1093 				newconn->timelimit.tv_sec = newconn->timelimit.tv_nsec = 0;
1094 
1095 				/*
1096 				 * Why this ? Because we like to merge small status messages
1097 				 * into larger combo messages. So put a "combo\n" at the start
1098 				 * of the buffer, and then don't send it if we decide it won't
1099 				 * be a combo-message after all.
1100 				 */
1101 				strcpy(newconn->buf, "combo\n");
1102 				newconn->buflen = 6;
1103 				newconn->bufp = newconn->buf+6;
1104 
1105 				caddrsize = sizeof(newconn->caddr);
1106 				newconn->csocket = accept(lsocket, (struct sockaddr *)&newconn->caddr, &caddrsize);
1107 				if (newconn->csocket == -1) {
1108 					/* accept() failure. Yes, it does happen! */
1109 					dbgprintf("accept failure, ignoring connection (%s), sockcount=%d\n",
1110 						strerror(errno), sockcount);
1111 					newconn->state = P_IDLE;
1112 				}
1113 				else {
1114 					msgs_total++;
1115 					newconn->clientip = &newconn->caddr.sin_addr;
1116 					sockcount++;
1117 					fcntl(newconn->csocket, F_SETFL, O_NONBLOCK);
1118 					newconn->state = P_REQ_READING;
1119 					getntimer(&newconn->arrival);
1120 					newconn->timelimit.tv_sec = newconn->arrival.tv_sec + timeout;
1121 					newconn->timelimit.tv_nsec = newconn->arrival.tv_nsec;
1122 				}
1123 			}
1124 		}
1125 
1126 		/* Clean up unused conn_t's */
1127 		{
1128 			conn_t *tmp, *khead;
1129 
1130 			khead = NULL; cwalk = chead;
1131 			while (cwalk) {
1132 				if ((cwalk == chead) && (cwalk->state == P_IDLE)) {
1133 					/* head of chain is dead */
1134 					tmp = chead;
1135 					chead = chead->next;
1136 					tmp->next = khead;
1137 					khead = tmp;
1138 
1139 					cwalk = chead;
1140 				}
1141 				else if (cwalk->next && (cwalk->next->state == P_IDLE)) {
1142 					tmp = cwalk->next;
1143 					cwalk->next = tmp->next;
1144 					tmp->next = khead;
1145 					khead = tmp;
1146 
1147 					/* cwalk is unchanged */
1148 				}
1149 				else {
1150 					cwalk = cwalk->next;
1151 				}
1152 			}
1153 
1154 			/* khead now holds a list of P_IDLE conn_t structs */
1155 			while (khead) {
1156 				tmp = khead;
1157 				khead = khead->next;
1158 
1159 				if (tmp->buf) xfree(tmp->buf);
1160 				xfree(tmp);
1161 			}
1162 		}
1163 	} while (keeprunning);
1164 
1165 	if (pidfile) unlink(pidfile);
1166 	return 0;
1167 }
1168 
1169