1 /*----------------------------------------------------------------------------*/
2 /* Xymon message proxy. */
3 /* */
4 /* Copyright (C) 2004-2011 Henrik Storner <henrik@hswn.dk> */
5 /* */
6 /* This program is released under the GNU General Public License (GPL), */
7 /* version 2. See the file "COPYING" for details. */
8 /* */
9 /*----------------------------------------------------------------------------*/
10
/* Revision-control "$Id$" keyword string identifying this source file's revision. */
static char rcsid[] = "$Id: xymonproxy.c 7678 2015-10-01 14:42:42Z jccleaver $";
12
13 #include "config.h"
14
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h> /* Someday I'll move to GNU Autoconf for this ... */
23 #endif
24 #include <errno.h>
25 #include <sys/resource.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <stdio.h>
31 #include <netdb.h>
32 #include <ctype.h>
33 #include <signal.h>
34 #include <time.h>
35
36 #include "version.h"
37 #include "libxymon.h"
38
/*
 * The phases a proxied message (conn_t) moves through.
 *
 * The numeric values index both statename[] and msgs_timeout_from[], so
 * P_CLEANUP must stay the last (highest) member.
 */
enum phase_t {
	P_IDLE = 0,		/* Slot is unused */

	P_REQ_READING = 1,	/* Reading request data */
	P_REQ_READY = 2,	/* Done reading request from client */

	P_REQ_COMBINING = 3,	/* Status message waiting to be merged into a combo */

	P_REQ_CONNECTING = 4,	/* Connecting to server */
	P_REQ_SENDING = 5,	/* Sending request data */
	P_REQ_DONE = 6,		/* Done sending request data to server */

	P_RESP_READING = 7,	/* Reading the response from the server */
	P_RESP_READY = 8,	/* Done reading the response from the server */
	P_RESP_SENDING = 9,	/* Sending the response to the client */
	P_RESP_DONE = 10,	/* Done sending the response to the client */
	P_CLEANUP = 11		/* Release sockets and reset the slot */
};

/* Human-readable name of each phase, in the same order as enum phase_t. */
char *statename[P_CLEANUP+1] = {
	"idle",				/* P_IDLE */
	"reading from client",		/* P_REQ_READING */
	"request from client OK",	/* P_REQ_READY */
	"request combining",		/* P_REQ_COMBINING */
	"connecting to server",		/* P_REQ_CONNECTING */
	"sending to server",		/* P_REQ_SENDING */
	"request sent",			/* P_REQ_DONE */
	"reading from server",		/* P_RESP_READING */
	"response from server OK",	/* P_RESP_READY */
	"sending to client",		/* P_RESP_SENDING */
	"response sent",		/* P_RESP_DONE */
	"cleanup"			/* P_CLEANUP */
};
72
73 typedef struct conn_t {
74 enum phase_t state;
75 int csocket;
76 struct sockaddr_in caddr;
77 struct in_addr *clientip, *serverip;
78 int snum;
79 int ssocket;
80 int conntries, sendtries;
81 int connectpending;
82 time_t conntime;
83 int madetocombo;
84 struct timespec arrival;
85 struct timespec timelimit;
86 unsigned char *buf, *bufp, *bufpsave;
87 unsigned int bufsize, buflen, buflensave;
88 struct conn_t *next;
89 } conn_t;
90
/* Compile-time tunables */

/* Server and retry limits */
#define MAX_SERVERS		3		/* Size of the table of Xymon server addresses */
#define CONNECT_TRIES		3		/* Connect attempts made against a server */
#define CONNECT_INTERVAL	8		/* Seconds to wait between two connect attempts */
#define SEND_TRIES		2		/* Attempts at sending one message */

/* Buffer sizing */
#define BUFSZ_READ		2048		/* Free bytes required in a buffer before read()'ing into it */
#define BUFSZ_INC		8192		/* Growth step used when a buffer is too small */

/* Socket budget */
#define MAX_OPEN_SOCKS		256

/* Combo-message handling */
#define MINIMUM_FOR_COMBO	2048		/* Merging is only attempted into messages shorter than this */
#define MAXIMUM_FOR_COMBO	32768		/* Upper bound on the size of a combined message */
#define COMBO_DELAY		250000000	/* Nanoseconds a message may wait to be combined */
101
102 int keeprunning = 1;
103 time_t laststatus = 0;
104 char *logfile = NULL;
105 int logdetails = 0;
106 unsigned long msgs_timeout_from[P_CLEANUP+1] = { 0, };
107
108
sigmisc_handler(int signum)109 void sigmisc_handler(int signum)
110 {
111 switch (signum) {
112 case SIGTERM:
113 errprintf("Caught TERM signal, terminating\n");
114 keeprunning = 0;
115 break;
116
117 case SIGHUP:
118 if (logfile) {
119 reopen_file(logfile, "a", stdout);
120 reopen_file(logfile, "a", stderr);
121 errprintf("Caught SIGHUP, reopening logfile\n");
122 }
123 break;
124
125 case SIGUSR1:
126 /* Toggle logging of details */
127 logdetails = (1 - logdetails);
128 errprintf("Log details is %sabled\n", (logdetails ? "en" : "dis"));
129 break;
130 }
131 }
132
/*
 * Has the deadline passed?
 *
 * Returns 1 when "now" is at or after "limit", 0 when it is still before it.
 */
int overdue(struct timespec *now, struct timespec *limit)
{
	/* Whole seconds decide whenever they differ */
	if (now->tv_sec != limit->tv_sec) {
		return (now->tv_sec > limit->tv_sec);
	}

	/* Same second - the nanoseconds decide */
	return (now->tv_nsec >= limit->tv_nsec);
}
139
/*
 * Read whatever is available on sockfd and append it to the buffer of conn.
 *
 * sockfd         - socket to read from
 * addr           - peer address, used in error messages only
 * conn           - connection whose buffer receives the data
 * completedstate - state to enter when the peer signals EOF
 *
 * The buffer is grown by BUFSZ_INC when fewer than BUFSZ_READ bytes (plus a
 * terminating NUL) are free, and is kept NUL-terminated after every read.
 *
 * Returns 0 when data was read, EOF was seen, or the read should simply be
 * retried later (EINTR/EAGAIN). Returns -1 on a read error or when the
 * buffer cannot be grown; the failure is then counted in msgs_timeout_from[]
 * and the connection is moved to P_CLEANUP.
 */
static int do_read(int sockfd, struct in_addr *addr, conn_t *conn, enum phase_t completedstate)
{
	ssize_t n;

	if ((conn->buflen + BUFSZ_READ + 1) > conn->bufsize) {
		unsigned int newsize = conn->bufsize + BUFSZ_INC;
		unsigned char *newbuf = realloc(conn->buf, newsize);

		if (newbuf == NULL) {
			/* conn->buf is still valid here and is reset by the normal cleanup */
			errprintf("Out of memory reading from %s\n", inet_ntoa(*addr));
			msgs_timeout_from[conn->state]++;
			conn->state = P_CLEANUP;
			return -1;
		}

		conn->buf = newbuf;
		conn->bufsize = newsize;
		/* realloc() may have moved the buffer, so recompute the position */
		conn->bufp = conn->buf + conn->buflen;
	}

	/* Always leave one byte free for the terminating NUL */
	n = read(sockfd, conn->bufp, (conn->bufsize - conn->buflen - 1));
	if (n == -1) {
		if ((errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
			/* Nothing to read right now after all - try again on the next select() */
			return 0;
		}

		/* Error - abort */
		errprintf("READ error from %s: %s\n", inet_ntoa(*addr), strerror(errno));
		msgs_timeout_from[conn->state]++;
		conn->state = P_CLEANUP;
		return -1;
	}
	else if (n == 0) {
		/* EOF - request is complete */
		conn->state = completedstate;
	}
	else {
		conn->buflen += n;
		conn->bufp += n;
		*conn->bufp = '\0';
	}

	return 0;
}
170
/*
 * Write as much as possible of the pending data of conn to sockfd.
 *
 * sockfd         - socket to write to
 * addr           - peer address, used in error messages only
 * conn           - connection holding the data (bufp/buflen = what is left)
 * completedstate - state to enter once all data has been written
 *
 * Returns 0 when data was written or the write should simply be retried
 * later (EINTR/EAGAIN). Returns -1 on a write error; the failure is then
 * counted in msgs_timeout_from[] and the connection is moved to P_CLEANUP.
 */
static int do_write(int sockfd, struct in_addr *addr, conn_t *conn, enum phase_t completedstate)
{
	ssize_t n;

	n = write(sockfd, conn->bufp, conn->buflen);
	if (n == -1) {
		if ((errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
			/* Cannot write right now after all - try again on the next select() */
			return 0;
		}

		/* Error - abort */
		errprintf("WRITE error to %s: %s\n", inet_ntoa(*addr), strerror(errno));
		msgs_timeout_from[conn->state]++;
		conn->state = P_CLEANUP;
		return -1;
	}

	/* A short write just moves the position; the rest goes out on a later call */
	conn->buflen -= n;
	conn->bufp += n;
	if (conn->buflen == 0) {
		conn->state = completedstate;
	}

	return 0;
}
193
/*
 * Log a one-line summary of a request: the client IP followed by the first
 * two words of the first line of the message. The 6-byte "combo\n" marker
 * that is put at the start of every buffer is skipped, as is a "combo\n"
 * sent by the client itself.
 *
 * The buffer is cut with temporary NUL bytes while logging and is restored
 * before returning.
 */
void do_log(conn_t *conn)
{
	char *rq = (char *)conn->buf + 6;
	char *eol;
	char *delim;
	char savechar;

	if (strncmp(rq, "combo\n", 6) == 0) {
		rq += 6;
	}

	/* Only look at the first line */
	eol = strchr(rq, '\n');
	if (eol) {
		*eol = '\0';
	}

	/* Step over the command word, the whitespace after it, and the next word */
	delim = rq;
	while (*delim && isalpha((unsigned char) *delim)) {
		delim++;
	}
	while (*delim && isspace((unsigned char) *delim)) {
		delim++;
	}
	while (*delim && !isspace((unsigned char) *delim)) {
		delim++;
	}

	savechar = *delim;
	*delim = '\0';
	errprintf("%s : %s\n", inet_ntoa(*conn->clientip), rq);

	/* Put the buffer back the way it was */
	*delim = savechar;
	if (eol) {
		*eol = '\n';
	}
}
212
main(int argc,char * argv[])213 int main(int argc, char *argv[])
214 {
215 int daemonize = 1;
216 int timeout = 10;
217 int listenq = 512;
218 char *pidfile = "/var/run/xymonproxy.pid";
219 char *proxyname = NULL;
220 char *proxynamesvc = "xymonproxy";
221
222 int sockcount = 0;
223 int lsocket;
224 struct sockaddr_in laddr;
225 struct sockaddr_in xymonserveraddr[MAX_SERVERS];
226 int xymonservercount = 0;
227 int opt;
228 conn_t *chead = NULL;
229 struct sigaction sa;
230 int selectfailures = 0;
231
232 /* Statistics info */
233 time_t startuptime = gettimer();
234 unsigned long msgs_total = 0;
235 unsigned long msgs_total_last = 0;
236 unsigned long msgs_combined = 0;
237 unsigned long msgs_merged = 0;
238 unsigned long msgs_delivered = 0;
239 unsigned long msgs_status = 0;
240 unsigned long msgs_combo = 0;
241 unsigned long msgs_other = 0;
242 unsigned long msgs_recovered = 0;
243 struct timespec timeinqueue = { 0, 0 };
244
245 /* Don'T save the output from errprintf() */
246 save_errbuf = 0;
247
248 memset(&laddr, 0, sizeof(laddr));
249 inet_aton("0.0.0.0", (struct in_addr *) &laddr.sin_addr.s_addr);
250 laddr.sin_port = htons(1984);
251 laddr.sin_family = AF_INET;
252
253 for (opt=1; (opt < argc); opt++) {
254 if (argnmatch(argv[opt], "--listen=")) {
255 char *locaddr, *p;
256 int locport;
257
258 locaddr = strchr(argv[opt], '=')+1;
259 p = strchr(locaddr, ':');
260 if (p) { locport = atoi(p+1); *p = '\0'; } else locport = 1984;
261
262 memset(&laddr, 0, sizeof(laddr));
263 laddr.sin_port = htons(locport);
264 laddr.sin_family = AF_INET;
265 if (inet_aton(locaddr, (struct in_addr *) &laddr.sin_addr.s_addr) == 0) {
266 errprintf("Invalid listen address %s\n", locaddr);
267 return 1;
268 }
269 if (p) *p = ':';
270 }
271 else if (argnmatch(argv[opt], "--server=") || argnmatch(argv[opt], "--bbdisplay=")) {
272 char *ips, *ip1;
273 int port1;
274
275 ips = strdup(strchr(argv[opt], '=')+1);
276
277 ip1 = strtok(ips, ",");
278 while (ip1) {
279 char *p;
280 p = strchr(ip1, ':');
281 if (p) { port1 = atoi(p+1); *p = '\0'; } else port1 = 1984;
282
283 memset(&xymonserveraddr[xymonservercount], 0, sizeof(xymonserveraddr[xymonservercount]));
284 xymonserveraddr[xymonservercount].sin_port = htons(port1);
285 xymonserveraddr[xymonservercount].sin_family = AF_INET;
286 if (inet_aton(ip1, (struct in_addr *) &xymonserveraddr[xymonservercount].sin_addr.s_addr) == 0) {
287 errprintf("Invalid remote address %s\n", ip1);
288 }
289 else {
290 xymonservercount++;
291 }
292 if (p) *p = ':';
293 ip1 = strtok(NULL, ",");
294 }
295 xfree(ips);
296 }
297 else if (argnmatch(argv[opt], "--timeout=")) {
298 char *p = strchr(argv[opt], '=');
299 timeout = atoi(p+1);
300 }
301 else if (argnmatch(argv[opt], "--lqueue=")) {
302 char *p = strchr(argv[opt], '=');
303 listenq = atoi(p+1);
304 }
305 else if (strcmp(argv[opt], "--daemon") == 0) {
306 daemonize = 1;
307 }
308 else if (strcmp(argv[opt], "--no-daemon") == 0) {
309 daemonize = 0;
310 }
311 else if (argnmatch(argv[opt], "--pidfile=")) {
312 char *p = strchr(argv[opt], '=');
313 pidfile = strdup(p+1);
314 }
315 else if (argnmatch(argv[opt], "--logfile=")) {
316 char *p = strchr(argv[opt], '=');
317 logfile = strdup(p+1);
318 }
319 else if (strcmp(argv[opt], "--log-details") == 0) {
320 logdetails = 1;
321 }
322 else if (argnmatch(argv[opt], "--report=")) {
323 char *p1 = strchr(argv[opt], '=')+1;
324
325 if (strchr(p1, '.') == NULL) {
326 if (xgetenv("MACHINE") == NULL) {
327 errprintf("Environment variable MACHINE is undefined\n");
328 return 1;
329 }
330
331 proxyname = strdup(xgetenv("MACHINE"));
332 proxyname = (char *)realloc(proxyname, strlen(proxyname) + strlen(p1) + 1);
333 strcat(proxyname, ".");
334 strcat(proxyname, p1);
335 proxynamesvc = strdup(p1);
336 }
337 else {
338 proxyname = strdup(p1);
339 proxynamesvc = strchr(proxyname, '.')+1;
340 }
341 }
342 else if (strcmp(argv[opt], "--debug") == 0) {
343 debug = 1;
344 }
345 else if (strcmp(argv[opt], "--version") == 0) {
346 printf("xymonproxy version %s\n", VERSION);
347 return 0;
348 }
349 else if (strcmp(argv[opt], "--help") == 0) {
350 printf("xymonproxy version %s\n", VERSION);
351 printf("\nOptions:\n");
352 printf("\t--listen=IP[:port] : Listen address and portnumber\n");
353 printf("\t--server=IP[:port] : Xymon server address and portnumber\n");
354 printf("\t--report=[HOST.]SERVICE : Sends a status message about proxy activity\n");
355 printf("\t--timeout=N : Communications timeout (seconds)\n");
356 printf("\t--lqueue=N : Listen-queue size\n");
357 printf("\t--daemon : Run as a daemon\n");
358 printf("\t--no-daemon : Do not run as a daemon\n");
359 printf("\t--pidfile=FILENAME : Save process-ID of daemon to FILENAME\n");
360 printf("\t--logfile=FILENAME : Log to FILENAME instead of stderr\n");
361 printf("\t--debug : Enable debugging output\n");
362 printf("\n");
363 return 0;
364 }
365 }
366
367 if (xymonservercount == 0) {
368 errprintf("No Xymon server address given - aborting\n");
369 return 1;
370 }
371
372 /* Set up a socket to listen for new connections */
373 lsocket = socket(AF_INET, SOCK_STREAM, 0);
374 if (lsocket == -1) {
375 errprintf("Cannot create listen socket (%s)\n", strerror(errno));
376 return 1;
377 }
378 opt = 1;
379 setsockopt(lsocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
380 fcntl(lsocket, F_SETFL, O_NONBLOCK);
381 if (bind(lsocket, (struct sockaddr *)&laddr, sizeof(laddr)) == -1) {
382 errprintf("Cannot bind to listen socket (%s)\n", strerror(errno));
383 return 1;
384 }
385
386 if (listen(lsocket, listenq) == -1) {
387 errprintf("Cannot listen (%s)\n", strerror(errno));
388 return 1;
389 }
390
391 /* Redirect logging to the logfile, if requested */
392 if (logfile) {
393 reopen_file(logfile, "a", stdout);
394 reopen_file(logfile, "a", stderr);
395 }
396
397 errprintf("xymonproxy version %s starting\n", VERSION);
398 errprintf("Listening on %s:%d\n", inet_ntoa(laddr.sin_addr), ntohs(laddr.sin_port));
399 {
400 int i;
401 char *p;
402 char srvrs[500];
403
404 for (i=0, srvrs[0] = '\0', p=srvrs; (i<xymonservercount); i++) {
405 p += sprintf(p, "%s:%d ", inet_ntoa(xymonserveraddr[i].sin_addr), ntohs(xymonserveraddr[i].sin_port));
406 }
407 errprintf("Sending to Xymon server(s) %s\n", srvrs);
408 }
409
410 if (daemonize) {
411 pid_t childpid;
412
413 reopen_file("/dev/null", "r", stdin);
414
415 /* Become a daemon */
416 childpid = fork();
417 if (childpid < 0) {
418 /* Fork failed */
419 errprintf("Could not fork\n");
420 exit(1);
421 }
422 else if (childpid > 0) {
423 /* Parent - save PID and exit */
424 FILE *fd = fopen(pidfile, "w");
425 if (fd) {
426 fprintf(fd, "%d\n", (int)childpid);
427 fclose(fd);
428 }
429 exit(0);
430 }
431 /* Child (daemon) continues here */
432 setsid();
433 }
434
435 setup_signalhandler(proxynamesvc);
436 memset(&sa, 0, sizeof(sa));
437 sa.sa_handler = sigmisc_handler;
438 sigaction(SIGHUP, &sa, NULL);
439 sigaction(SIGTERM, &sa, NULL);
440 sigaction(SIGUSR1, &sa, NULL);
441
442 do {
443 fd_set fdread, fdwrite;
444 int maxfd;
445 struct timespec tmo;
446 struct timeval selecttmo;
447 int n, idx;
448 conn_t *cwalk, *ctmp;
449 time_t ctime;
450 time_t now;
451 int combining = 0;
452
453 /* See if it is time for a status report */
454 if (proxyname && ((now = gettimer()) >= (laststatus+300))) {
455 conn_t *stentry;
456 int ccount = 0;
457 unsigned long bufspace = 0;
458 unsigned long avgtime; /* In millisecs */
459 char runtime_s[30];
460 unsigned long runt = (unsigned long) (now-startuptime);
461 char *p;
462 unsigned long msgs_sent = msgs_total - msgs_total_last;
463
464 /* Setup a conn_t struct for the status message */
465 stentry = (conn_t *)calloc(1, sizeof(conn_t));
466 stentry->state = P_REQ_READY;
467 stentry->csocket = stentry->ssocket = -1;
468 stentry->clientip = &stentry->caddr.sin_addr;
469 getntimer(&stentry->arrival);
470 stentry->timelimit.tv_sec = stentry->arrival.tv_sec + timeout;
471 stentry->timelimit.tv_nsec = stentry->arrival.tv_nsec;
472 stentry->bufsize = BUFSZ_INC;
473 stentry->buf = (char *)malloc(stentry->bufsize);
474 stentry->next = chead;
475 chead = stentry;
476
477 sprintf(runtime_s, "%lu days, %02lu:%02lu:%02lu",
478 (runt/86400), ((runt % 86400) / 3600),
479 ((runt % 3600) / 60), (runt % 60));
480
481 init_timestamp();
482
483 for (cwalk = chead; (cwalk); cwalk = cwalk->next) {
484 ccount++;
485 bufspace += cwalk->bufsize;
486 }
487
488 if (msgs_sent == 0) {
489 avgtime = 0;
490 }
491 else {
492 avgtime = (timeinqueue.tv_sec*1000 + timeinqueue.tv_nsec/1000) / msgs_sent;
493 }
494
495 p = stentry->buf;
496 p += sprintf(p, "combo\nstatus+11 %s green %s - xymon proxy up: %s\n\nxymonproxy for Xymon version %s\n\nProxy statistics\n\nIncoming messages : %10lu (%lu msgs/second)\nOutbound messages : %10lu\n\nIncoming message distribution\n- Combo messages : %10lu\n- Status messages : %10lu\n Messages merged : %10lu\n Resulting combos : %10lu\n- Other messages : %10lu\n\nProxy resources\n- Connection table size : %10d\n- Buffer space : %10lu kByte\n",
497 proxyname, timestamp, runtime_s, VERSION,
498 msgs_total, (msgs_total - msgs_total_last) / (now - laststatus),
499 msgs_delivered,
500 msgs_combo,
501 msgs_status, msgs_merged, msgs_combined,
502 msgs_other,
503 ccount, bufspace / 1024);
504 p += sprintf(p, "\nTimeout/failure details\n");
505 p += sprintf(p, "- %-22s : %10lu\n", statename[P_REQ_READING], msgs_timeout_from[P_REQ_READING]);
506 p += sprintf(p, "- %-22s : %10lu\n", statename[P_REQ_CONNECTING], msgs_timeout_from[P_REQ_CONNECTING]);
507 p += sprintf(p, "- %-22s : %10lu\n", statename[P_REQ_SENDING], msgs_timeout_from[P_REQ_SENDING]);
508 p += sprintf(p, "- %-22s : %10lu\n", "recovered", msgs_recovered);
509 p += sprintf(p, "- %-22s : %10lu\n", statename[P_RESP_READING], msgs_timeout_from[P_RESP_READING]);
510 p += sprintf(p, "- %-22s : %10lu\n", statename[P_RESP_SENDING], msgs_timeout_from[P_RESP_SENDING]);
511 p += sprintf(p, "\n%-24s : %10lu.%03lu\n", "Average queue time",
512 (avgtime / 1000), (avgtime % 1000));
513
514 /* Clear the summary collection totals */
515 laststatus = now;
516 msgs_total_last = msgs_total;
517 timeinqueue.tv_sec = timeinqueue.tv_nsec = 0;
518
519 stentry->buflen = strlen(stentry->buf);
520 stentry->bufp = stentry->buf + stentry->buflen;
521 stentry->state = P_REQ_READY;
522 }
523
524 FD_ZERO(&fdread);
525 FD_ZERO(&fdwrite);
526 maxfd = -1;
527 combining = 0;
528
529 for (cwalk = chead, idx=0; (cwalk); cwalk = cwalk->next, idx++) {
530 dbgprintf("state %d: %s\n", idx, statename[cwalk->state]);
531
532 /* First, handle any state transitions and setup the FD sets for select() */
533 switch (cwalk->state) {
534 case P_REQ_READING:
535 FD_SET(cwalk->csocket, &fdread);
536 if (cwalk->csocket > maxfd) maxfd = cwalk->csocket;
537 break;
538
539 case P_REQ_READY:
540 if (cwalk->buflen <= 6) {
541 /* Got an empty request - just drop it */
542 dbgprintf("Dropping empty request from %s\n", inet_ntoa(*cwalk->clientip));
543 cwalk->state = P_CLEANUP;
544 break;
545 }
546
547 if (logdetails) do_log(cwalk);
548 cwalk->conntries = CONNECT_TRIES;
549 cwalk->sendtries = SEND_TRIES;
550 cwalk->conntime = 0;
551
552 /*
553 * We now want to find out what kind of message we've got.
554 * If it's NOT a "status" message, just pass it along.
555 * For "status" messages, we want to try and merge many small
556 * messages into a "combo" message - so send those off the the
557 * P_REQ_COMBINING state for a while.
558 * If we are not going to send back a response to the client, we
559 * also close the client socket since it is no longer needed.
560 * Note that since we started out as optimists and put a "combo\n"
561 * at the front of the buffer, we need to skip that when looking at
562 * what type of message it is. Hence the "cwalk->buf+6".
563 */
564 if (strncmp(cwalk->buf+6, "client", 6) == 0) {
565 /*
566 * "client" messages go to all Xymon servers, but
567 * we will only pass back the response from one of them
568 * (the last one).
569 */
570 shutdown(cwalk->csocket, SHUT_RD);
571 msgs_other++;
572 cwalk->snum = xymonservercount;
573
574 if ((cwalk->buflen + 40 ) < cwalk->bufsize) {
575 int n = sprintf(cwalk->bufp,
576 "\n[proxy]\nClientIP:%s\n",
577 inet_ntoa(*cwalk->clientip));
578 cwalk->bufp += n;
579 cwalk->buflen += n;
580 }
581 }
582 else if ((strncmp(cwalk->buf+6, "query", 5) == 0) ||
583 (strncmp(cwalk->buf+6, "config", 6) == 0) ||
584 (strncmp(cwalk->buf+6, "ping", 4) == 0) ||
585 (strncmp(cwalk->buf+6, "download", 8) == 0)) {
586 /*
587 * These requests get a response back, but send no data.
588 * Send these to the last of the Xymon servers only.
589 */
590 shutdown(cwalk->csocket, SHUT_RD);
591 msgs_other++;
592 cwalk->snum = 1;
593 }
594 else {
595 /* It's a request that doesn't take a response. */
596 if (cwalk->csocket >= 0) {
597 shutdown(cwalk->csocket, SHUT_RDWR);
598 close(cwalk->csocket); sockcount--;
599 cwalk->csocket = -1;
600 }
601 cwalk->snum = xymonservercount;
602
603 if (strncmp(cwalk->buf+6, "status", 6) == 0) {
604 msgs_status++;
605 getntimer(&cwalk->timelimit);
606 cwalk->timelimit.tv_nsec += COMBO_DELAY;
607 if (cwalk->timelimit.tv_nsec >= 1000000000) {
608 cwalk->timelimit.tv_sec++;
609 cwalk->timelimit.tv_nsec -= 1000000000;
610 }
611
612 /*
613 * Some clients (bbnt) send a trailing \0, so we cannot
614 * rely on buflen being what we want it to be.
615 */
616 cwalk->buflen = strlen(cwalk->buf);
617 cwalk->bufp = cwalk->buf + cwalk->buflen;
618
619 if ((cwalk->buflen + 50 ) < cwalk->bufsize) {
620 int n = sprintf(cwalk->bufp,
621 "\nStatus message received from %s\n",
622 inet_ntoa(*cwalk->clientip));
623 cwalk->bufp += n;
624 cwalk->buflen += n;
625 }
626
627 cwalk->state = P_REQ_COMBINING;
628 break;
629 }
630 else if (strncmp(cwalk->buf+6, "combo\n", 6) == 0) {
631 char *currmsg, *nextmsg;
632
633 msgs_combo++;
634
635 /*
636 * Some clients (bbnt) send a trailing \0, so we cannot
637 * rely on buflen being what we want it to be.
638 */
639 cwalk->buflen = strlen(cwalk->buf);
640 cwalk->bufp = cwalk->buf + cwalk->buflen;
641
642 getntimer(&cwalk->timelimit);
643 cwalk->timelimit.tv_nsec += COMBO_DELAY;
644 if (cwalk->timelimit.tv_nsec >= 1000000000) {
645 cwalk->timelimit.tv_sec++;
646 cwalk->timelimit.tv_nsec -= 1000000000;
647 }
648
649 currmsg = cwalk->buf+12; /* Skip pre-def. "combo\n" and message "combo\n" */
650 do {
651 nextmsg = strstr(currmsg, "\n\nstatus");
652 if (nextmsg) { *(nextmsg+1) = '\0'; nextmsg += 2; }
653
654 /* Create a duplicate conn_t record for all embedded messages */
655 ctmp = (conn_t *)malloc(sizeof(conn_t));
656 memcpy(ctmp, cwalk, sizeof(conn_t));
657 ctmp->bufsize = BUFSZ_INC*(((6 + strlen(currmsg) + 50) / BUFSZ_INC) + 1);
658 ctmp->buf = (char *)malloc(ctmp->bufsize);
659 ctmp->buflen = sprintf(ctmp->buf,
660 "combo\n%s\nStatus message received from %s\n",
661 currmsg, inet_ntoa(*cwalk->clientip));
662 ctmp->bufp = ctmp->buf + ctmp->buflen;
663 ctmp->state = P_REQ_COMBINING;
664 ctmp->next = chead;
665 chead = ctmp;
666
667 currmsg = nextmsg;
668 } while (currmsg);
669
670 /* We don't do anymore with this conn_t */
671 cwalk->state = P_CLEANUP;
672 break;
673 }
674 else if (strncmp(cwalk->buf+6, "page", 4) == 0) {
675 /* xymond has no use for page requests */
676 cwalk->state = P_CLEANUP;
677 break;
678 }
679 else {
680 msgs_other++;
681 }
682 }
683
684 /*
685 * This wont be made into a combo message, so skip the "combo\n"
686 * and go off to send the message to the server.
687 */
688 cwalk->bufp = cwalk->buf+6;
689 cwalk->buflen -= 6;
690 cwalk->bufpsave = cwalk->bufp;
691 cwalk->buflensave = cwalk->buflen;
692 cwalk->state = P_REQ_CONNECTING;
693 /* Fall through for non-status messages */
694
695 case P_REQ_CONNECTING:
696 /* Need to restore the bufp and buflen, as we may get here many times for one message */
697 cwalk->bufp = cwalk->bufpsave;
698 cwalk->buflen = cwalk->buflensave;
699
700 ctime = gettimer();
701 if (ctime < (cwalk->conntime + CONNECT_INTERVAL)) {
702 dbgprintf("Delaying retry of connection\n");
703 break;
704 }
705
706 cwalk->conntries--;
707 cwalk->conntime = ctime;
708 if (cwalk->conntries < 0) {
709 errprintf("Server not responding, message lost\n");
710 cwalk->state = P_REQ_DONE; /* Not CLENAUP - might be more servers */
711 msgs_timeout_from[P_REQ_CONNECTING]++;
712 break;
713 }
714
715 cwalk->ssocket = socket(AF_INET, SOCK_STREAM, 0);
716 if (cwalk->ssocket == -1) {
717 dbgprintf("Could not get a socket - will try again\n");
718 break; /* Retry the next time around */
719 }
720 sockcount++;
721 fcntl(cwalk->ssocket, F_SETFL, O_NONBLOCK);
722
723 {
724 int idx = (xymonservercount - cwalk->snum);
725 n = connect(cwalk->ssocket, (struct sockaddr *)&xymonserveraddr[idx], sizeof(xymonserveraddr[idx]));
726 cwalk->serverip = &xymonserveraddr[idx].sin_addr;
727 dbgprintf("Connecting to Xymon server at %s\n", inet_ntoa(*cwalk->serverip));
728 }
729
730 if ((n == 0) || ((n == -1) && (errno == EINPROGRESS))) {
731 cwalk->state = P_REQ_SENDING;
732 cwalk->connectpending = 1;
733 getntimer(&cwalk->timelimit);
734 cwalk->timelimit.tv_sec += timeout;
735 /* Fallthrough */
736 }
737 else {
738 /* Could not connect! Invoke retries */
739 dbgprintf("Connect to server failed: %s\n", strerror(errno));
740 close(cwalk->ssocket); sockcount--;
741 cwalk->ssocket = -1;
742 break;
743 }
744 /* No "break" here! */
745
746 case P_REQ_SENDING:
747 FD_SET(cwalk->ssocket, &fdwrite);
748 if (cwalk->ssocket > maxfd) maxfd = cwalk->ssocket;
749 break;
750
751 case P_REQ_DONE:
752 /* Request has been sent to the server - we're done writing data */
753 shutdown(cwalk->ssocket, SHUT_WR);
754 cwalk->snum--;
755 if (cwalk->snum) {
756 /* More servers to do */
757 close(cwalk->ssocket); cwalk->ssocket = -1; sockcount--;
758 cwalk->conntries = CONNECT_TRIES;
759 cwalk->sendtries = SEND_TRIES;
760 cwalk->conntime = 0;
761 cwalk->state = P_REQ_CONNECTING;
762 break;
763 }
764 else {
765 /* Have sent to all servers, grab the response from the last one. */
766 cwalk->bufp = cwalk->buf; cwalk->buflen = 0;
767 memset(cwalk->buf, 0, cwalk->bufsize);
768 }
769
770 msgs_delivered++;
771
772 if (cwalk->sendtries < SEND_TRIES) {
773 errprintf("Recovered from write error after %d retries\n",
774 (SEND_TRIES - cwalk->sendtries));
775 msgs_recovered++;
776 }
777
778 if (cwalk->arrival.tv_sec > 0) {
779 struct timespec departure;
780
781 getntimer(&departure);
782 timeinqueue.tv_sec += (departure.tv_sec - cwalk->arrival.tv_sec);
783 if (departure.tv_nsec >= cwalk->arrival.tv_nsec) {
784 timeinqueue.tv_nsec += (departure.tv_nsec - cwalk->arrival.tv_nsec);
785 }
786 else {
787 timeinqueue.tv_sec--;
788 timeinqueue.tv_nsec += (1000000000 + departure.tv_nsec - cwalk->arrival.tv_nsec);
789 }
790
791 if (timeinqueue.tv_nsec > 1000000000) {
792 timeinqueue.tv_sec++;
793 timeinqueue.tv_nsec -= 1000000000;
794 }
795 }
796 else {
797 errprintf("Odd ... this message was not timestamped\n");
798 }
799
800 if (cwalk->csocket < 0) {
801 cwalk->state = P_CLEANUP;
802 break;
803 }
804 else {
805 cwalk->state = P_RESP_READING;
806 getntimer(&cwalk->timelimit);
807 cwalk->timelimit.tv_sec += timeout;
808 }
809 /* Fallthrough */
810
811 case P_RESP_READING:
812 FD_SET(cwalk->ssocket, &fdread);
813 if (cwalk->ssocket > maxfd) maxfd = cwalk->ssocket;
814 break;
815
816 case P_RESP_READY:
817 shutdown(cwalk->ssocket, SHUT_RD);
818 close(cwalk->ssocket); sockcount--;
819 cwalk->ssocket = -1;
820 cwalk->bufp = cwalk->buf;
821 cwalk->state = P_RESP_SENDING;
822 getntimer(&cwalk->timelimit);
823 cwalk->timelimit.tv_sec += timeout;
824 /* Fall through */
825
826 case P_RESP_SENDING:
827 if (cwalk->buflen && (cwalk->csocket >= 0)) {
828 FD_SET(cwalk->csocket, &fdwrite);
829 if (cwalk->csocket > maxfd) maxfd = cwalk->csocket;
830 break;
831 }
832 else {
833 cwalk->state = P_RESP_DONE;
834 }
835 /* Fall through */
836
837 case P_RESP_DONE:
838 if (cwalk->csocket >= 0) {
839 shutdown(cwalk->csocket, SHUT_WR);
840 close(cwalk->csocket); sockcount--;
841 }
842 cwalk->csocket = -1;
843 cwalk->state = P_CLEANUP;
844 /* Fall through */
845
846 case P_CLEANUP:
847 if (cwalk->csocket >= 0) {
848 close(cwalk->csocket); sockcount--;
849 cwalk->csocket = -1;
850 }
851 if (cwalk->ssocket >= 0) {
852 close(cwalk->ssocket); sockcount--;
853 cwalk->ssocket = -1;
854 }
855 cwalk->arrival.tv_sec = cwalk->arrival.tv_nsec = 0;
856 cwalk->bufp = cwalk->buf;
857 cwalk->buflen = 0;
858 memset(cwalk->buf, 0, cwalk->bufsize);
859 memset(&cwalk->caddr, 0, sizeof(cwalk->caddr));
860 cwalk->madetocombo = 0;
861 cwalk->state = P_IDLE;
862 break;
863
864 case P_IDLE:
865 break;
866
867 case P_REQ_COMBINING:
868 /* See if we can combine some "status" messages into a "combo" */
869 combining++;
870 getntimer(&tmo);
871 if ((cwalk->buflen < MINIMUM_FOR_COMBO) && !overdue(&tmo, &cwalk->timelimit)) {
872 conn_t *cextra;
873
874 /* Are there any other messages in P_COMBINING state ? */
875 cextra = cwalk->next;
876 while (cextra && (cextra->state != P_REQ_COMBINING)) cextra = cextra->next;
877
878 if (cextra) {
879 /*
880 * Yep. It might be worthwhile to go for a combo.
881 */
882 while (cextra && (cwalk->buflen < (MAXIMUM_FOR_COMBO-20))) {
883 if (strncmp(cextra->buf+6, "status", 6) == 0) {
884 int newsize;
885
886 /*
887 * Size of the new message - if the cextra one
888 * is merged - is the cwalk buffer, plus the
889 * two newlines separating messages in combo's,
890 * plus the cextra buffer except the leading
891 * "combo\n" of 6 bytes.
892 */
893 newsize = cwalk->buflen + 2 + (cextra->buflen - 6);
894
895 if ((newsize < cwalk->bufsize) && (newsize < MAXIMUM_FOR_COMBO)) {
896 /*
897 * There's room for it. Add it to the
898 * cwalk buffer, but without the leading
899 * "combo\n" (we already have one of those).
900 */
901 cwalk->madetocombo++;
902 strcat(cwalk->buf, "\n\n");
903 strcat(cwalk->buf, cextra->buf+6);
904 cwalk->buflen = newsize;
905 cextra->state = P_CLEANUP;
906 dbgprintf("Merged combo\n");
907 msgs_merged++;
908 }
909 }
910
911 /* Go to the next connection in the right state */
912 do {
913 cextra = cextra->next;
914 } while (cextra && (cextra->state != P_REQ_COMBINING));
915 }
916 }
917 }
918 else {
919 combining--;
920 cwalk->state = P_REQ_CONNECTING;
921
922 if (cwalk->madetocombo) {
923 /*
924 * Point the outgoing buffer pointer to the full
925 * message, including the "combo\n"
926 */
927 cwalk->bufp = cwalk->buf;
928 cwalk->madetocombo++;
929 msgs_merged++; /* Count the proginal message also */
930 msgs_combined++;
931 dbgprintf("Now going to send combo from %d messages\n%s\n",
932 cwalk->madetocombo, cwalk->buf);
933 }
934 else {
935 /*
936 * Skip sending the "combo\n" at start of buffer.
937 */
938 cwalk->bufp = cwalk->buf+6;
939 cwalk->buflen -= 6;
940 dbgprintf("No messages to combine - sending unchanged\n");
941 }
942 }
943
944 cwalk->bufpsave = cwalk->bufp;
945 cwalk->buflensave = cwalk->buflen;
946 break;
947
948 default:
949 break;
950 }
951 }
952
953 /* Add the listen-socket to the select() list, but only if we have room */
954 if (sockcount < MAX_OPEN_SOCKS) {
955 FD_SET(lsocket, &fdread);
956 if (lsocket > maxfd) maxfd = lsocket;
957 }
958 else {
959 static time_t lastlog = 0;
960 if ((now = gettimer()) < (lastlog+30)) {
961 lastlog = now;
962 errprintf("Squelching incoming connections, sockcount=%d\n", sockcount);
963 }
964 }
965
966 if (combining) {
967 selecttmo.tv_sec = 0; selecttmo.tv_usec = COMBO_DELAY / 1000;
968 }
969 else {
970 selecttmo.tv_sec = 1; selecttmo.tv_usec = 0;
971 }
972
973 n = select(maxfd+1, &fdread, &fdwrite, NULL, &selecttmo);
974
975 if (n < 0) {
976 errprintf("select() failed: %s\n", strerror(errno));
977 if (++selectfailures > 5) {
978 errprintf("Too many select failures, aborting\n");
979 exit(1);
980 }
981 }
982 else if (n == 0) {
983 /* Timeout */
984 if (selectfailures > 0) selectfailures--;
985
986 getntimer(&tmo);
987 for (cwalk = chead; (cwalk); cwalk = cwalk->next) {
988 switch (cwalk->state) {
989 case P_REQ_READING:
990 case P_REQ_SENDING:
991 case P_RESP_READING:
992 case P_RESP_SENDING:
993 if (overdue(&tmo, &cwalk->timelimit)) {
994 cwalk->state = P_CLEANUP;
995 msgs_timeout_from[cwalk->state]++;
996 }
997 break;
998
999 default:
1000 break;
1001 }
1002 }
1003 }
1004 else {
1005 if (selectfailures > 0) selectfailures--;
1006
1007 for (cwalk = chead; (cwalk); cwalk = cwalk->next) {
1008 switch (cwalk->state) {
1009 case P_REQ_READING:
1010 if (FD_ISSET(cwalk->csocket, &fdread)) {
1011 do_read(cwalk->csocket, cwalk->clientip, cwalk, P_REQ_READY);
1012 }
1013 break;
1014
1015 case P_REQ_SENDING:
1016 if (FD_ISSET(cwalk->ssocket, &fdwrite)) {
1017 if (cwalk->connectpending) {
1018 int connres, connressize;
1019
1020 /* First time ready for write - check connect status */
1021 cwalk->connectpending = 0;
1022 connressize = sizeof(connres);
1023 n = getsockopt(cwalk->ssocket, SOL_SOCKET, SO_ERROR, &connres, &connressize);
1024 if (connres != 0) {
1025 /* Connect failed! Invoke retries. */
1026 dbgprintf("Connect to server failed: %s - retrying\n",
1027 strerror(errno));
1028 close(cwalk->ssocket); sockcount--;
1029 cwalk->ssocket = -1;
1030 cwalk->state = P_REQ_CONNECTING;
1031 break;
1032 }
1033 }
1034
1035 if ( (do_write(cwalk->ssocket, cwalk->serverip, cwalk, P_REQ_DONE) == -1) &&
1036 (cwalk->sendtries > 0) ) {
1037 /*
1038 * Got a "write" error after connecting.
1039 * Try saving the situation by retrying the send later.
1040 */
1041 dbgprintf("Attempting recovery from write error\n");
1042 close(cwalk->ssocket); sockcount--; cwalk->ssocket = -1;
1043 cwalk->sendtries--;
1044 cwalk->state = P_REQ_CONNECTING;
1045 cwalk->conntries = CONNECT_TRIES;
1046 cwalk->conntime = gettimer();
1047 }
1048 }
1049 break;
1050
1051 case P_RESP_READING:
1052 if (FD_ISSET(cwalk->ssocket, &fdread)) {
1053 do_read(cwalk->ssocket, cwalk->serverip, cwalk, P_RESP_READY);
1054 }
1055 break;
1056
1057 case P_RESP_SENDING:
1058 if (FD_ISSET(cwalk->csocket, &fdwrite)) {
1059 do_write(cwalk->csocket, cwalk->clientip, cwalk, P_RESP_DONE);
1060 }
1061 break;
1062
1063 default:
1064 break;
1065 }
1066 }
1067
1068 if (FD_ISSET(lsocket, &fdread)) {
1069 /* New incoming connection */
1070 conn_t *newconn;
1071 int caddrsize;
1072
1073 dbgprintf("New connection\n");
1074 for (cwalk = chead; (cwalk && (cwalk->state != P_IDLE)); cwalk = cwalk->next);
1075 if (cwalk) {
1076 newconn = cwalk;
1077 }
1078 else {
1079 newconn = malloc(sizeof(conn_t));
1080 newconn->next = chead;
1081 chead = newconn;
1082 newconn->bufsize = BUFSZ_INC;
1083 newconn->buf = newconn->bufp = malloc(newconn->bufsize);
1084 }
1085
1086 newconn->connectpending = 0;
1087 newconn->madetocombo = 0;
1088 newconn->snum = 0;
1089 newconn->ssocket = -1;
1090 newconn->serverip = NULL;
1091 newconn->conntries = 0;
1092 newconn->sendtries = 0;
1093 newconn->timelimit.tv_sec = newconn->timelimit.tv_nsec = 0;
1094
1095 /*
1096 * Why this ? Because we like to merge small status messages
1097 * into larger combo messages. So put a "combo\n" at the start
1098 * of the buffer, and then don't send it if we decide it won't
1099 * be a combo-message after all.
1100 */
1101 strcpy(newconn->buf, "combo\n");
1102 newconn->buflen = 6;
1103 newconn->bufp = newconn->buf+6;
1104
1105 caddrsize = sizeof(newconn->caddr);
1106 newconn->csocket = accept(lsocket, (struct sockaddr *)&newconn->caddr, &caddrsize);
1107 if (newconn->csocket == -1) {
1108 /* accept() failure. Yes, it does happen! */
1109 dbgprintf("accept failure, ignoring connection (%s), sockcount=%d\n",
1110 strerror(errno), sockcount);
1111 newconn->state = P_IDLE;
1112 }
1113 else {
1114 msgs_total++;
1115 newconn->clientip = &newconn->caddr.sin_addr;
1116 sockcount++;
1117 fcntl(newconn->csocket, F_SETFL, O_NONBLOCK);
1118 newconn->state = P_REQ_READING;
1119 getntimer(&newconn->arrival);
1120 newconn->timelimit.tv_sec = newconn->arrival.tv_sec + timeout;
1121 newconn->timelimit.tv_nsec = newconn->arrival.tv_nsec;
1122 }
1123 }
1124 }
1125
1126 /* Clean up unused conn_t's */
1127 {
1128 conn_t *tmp, *khead;
1129
1130 khead = NULL; cwalk = chead;
1131 while (cwalk) {
1132 if ((cwalk == chead) && (cwalk->state == P_IDLE)) {
1133 /* head of chain is dead */
1134 tmp = chead;
1135 chead = chead->next;
1136 tmp->next = khead;
1137 khead = tmp;
1138
1139 cwalk = chead;
1140 }
1141 else if (cwalk->next && (cwalk->next->state == P_IDLE)) {
1142 tmp = cwalk->next;
1143 cwalk->next = tmp->next;
1144 tmp->next = khead;
1145 khead = tmp;
1146
1147 /* cwalk is unchanged */
1148 }
1149 else {
1150 cwalk = cwalk->next;
1151 }
1152 }
1153
1154 /* khead now holds a list of P_IDLE conn_t structs */
1155 while (khead) {
1156 tmp = khead;
1157 khead = khead->next;
1158
1159 if (tmp->buf) xfree(tmp->buf);
1160 xfree(tmp);
1161 }
1162 }
1163 } while (keeprunning);
1164
1165 if (pidfile) unlink(pidfile);
1166 return 0;
1167 }
1168
1169