1 /*----------------------------------------------------------------------------*/
2 /* Xymon message daemon. */
3 /* */
4 /* This is a small library for xymond worker modules, to read a new message */
5 /* from the xymond_channel process, and also do the decoding of messages */
6 /* that are passed on the "meta-data" first line of such a message. */
7 /* */
8 /* Copyright (C) 2004-2011 Henrik Storner <henrik@hswn.dk> */
9 /* */
10 /* This program is released under the GNU General Public License (GPL), */
11 /* version 2. See the file "COPYING" for details. */
12 /* */
13 /*----------------------------------------------------------------------------*/
14
15 static char rcsid[] = "$Id: xymond_worker.c 7678 2015-10-01 14:42:42Z jccleaver $";
16
17 #include "config.h"
18
19 #include <stdio.h>
20 #include <string.h>
21 #include <stdlib.h>
22 #include <ctype.h>
23
24 #include <unistd.h>
25 #include <fcntl.h>
26
27 #include <sys/types.h>
28 #include <sys/socket.h>
29
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h> /* Someday I'll move to GNU Autoconf for this .. . */
32 #endif
33
34 #include <sys/time.h>
35 #include <time.h>
36 #include <errno.h>
37 #include <sys/resource.h>
38 #include <sys/wait.h>
39
40 #include "libxymon.h"
41
42 #include "xymond_worker.h"
43
44 #include <signal.h>
45
46
47 static int running = 1;
48 static int inputfd = STDIN_FILENO;
49
50 #define EXTRABUFSPACE 4095
51
52 static char *locatorlocation = NULL;
53 static char *locatorid = NULL;
54 static enum locator_servicetype_t locatorsvc = ST_MAX;
55 static int locatorweight = 1;
56 static char *locatorextra = NULL;
57 static char *listenipport = NULL;
58 static time_t locatorhb = 0;
59
60
netinp_sighandler(int signum)61 static void netinp_sighandler(int signum)
62 {
63 switch (signum) {
64 case SIGINT:
65 case SIGTERM:
66 running = 0;
67 break;
68 }
69 }
70
net_worker_heartbeat(void)71 static void net_worker_heartbeat(void)
72 {
73 time_t now;
74
75 if (!locatorid || (locatorsvc == ST_MAX)) return;
76
77 now = gettimer();
78 if (now > locatorhb) {
79 locator_serverup(locatorid, locatorsvc);
80 locatorhb = now + 60;
81 }
82 }
83
net_worker_listener(char * ipport)84 static int net_worker_listener(char *ipport)
85 {
86 /*
87 * Setup a listener socket on IP+port. When a connection arrives,
88 * pick it up, fork() and let the rest of the input go via the
89 * network socket.
90 */
91
92 char *listenip, *p;
93 int listenport = 0;
94 int lsocket = -1;
95 struct sockaddr_in laddr;
96 struct sigaction sa;
97 int opt;
98
99 listenip = ipport;
100 p = strchr(listenip, ':');
101 if (p) {
102 *p = '\0';
103 listenport = atoi(p+1);
104 }
105
106 if (listenport == 0) {
107 errprintf("Must include PORT number in --listen=IP:PORT option\n");
108 return -1;
109 }
110
111 /* Set up a socket to listen for new connections */
112 errprintf("Setting up network listener on %s:%d\n", listenip, listenport);
113 memset(&laddr, 0, sizeof(laddr));
114 if ((strlen(listenip) == 0) || (strcmp(listenip, "0.0.0.0") == 0)) {
115 listenip = "0.0.0.0";
116 laddr.sin_addr.s_addr = INADDR_ANY;
117 }
118 else if (inet_aton(listenip, (struct in_addr *) &laddr.sin_addr.s_addr) == 0) {
119 /* Not an IP */
120 errprintf("Listener IP must be an IP-address, not hostname\n");
121 return -1;
122 }
123 laddr.sin_port = htons(listenport);
124 laddr.sin_family = AF_INET;
125 lsocket = socket(AF_INET, SOCK_STREAM, 0);
126 if (lsocket == -1) {
127 errprintf("Cannot create listen socket (%s)\n", strerror(errno));
128 return -1;
129 }
130
131 opt = 1;
132 setsockopt(lsocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
133
134 if (bind(lsocket, (struct sockaddr *)&laddr, sizeof(laddr)) == -1) {
135 errprintf("Cannot bind to listen socket (%s)\n", strerror(errno));
136 return -1;
137 }
138 if (listen(lsocket, 5) == -1) {
139 errprintf("Cannot listen (%s)\n", strerror(errno));
140 return -1;
141 }
142
143 /* Make listener socket non-blocking, so we can send keep-alive's while waiting for connections */
144 fcntl(lsocket, F_SETFL, O_NONBLOCK);
145
146 /* Catch some signals */
147 setup_signalhandler("xymond_listener");
148 memset(&sa, 0, sizeof(sa));
149 sa.sa_handler = netinp_sighandler;
150 sigaction(SIGCHLD, &sa, NULL);
151 sigaction(SIGTERM, &sa, NULL);
152 sigaction(SIGINT, &sa, NULL);
153
154 while (running) {
155 struct sockaddr_in netaddr;
156 int addrsz, sock;
157 int childstat;
158 pid_t childpid;
159 fd_set fdread;
160 struct timeval tmo;
161 int n;
162
163 FD_ZERO(&fdread);
164 FD_SET(lsocket, &fdread);
165 tmo.tv_sec = 60; tmo.tv_usec = 0;
166 n = select(lsocket+1, &fdread, NULL, NULL, &tmo);
167 if (n == -1) {
168 if (errno != EINTR) {
169 errprintf("select() failed while waiting for connection : %s\n", strerror(errno));
170 running = 0;
171 }
172 }
173 else if (n == 0) {
174 /* Timeout */
175 net_worker_heartbeat();
176 }
177 else {
178 /* We have a connection ready */
179 addrsz = sizeof(netaddr);
180 sock = accept(lsocket, (struct sockaddr *)&netaddr, &addrsz);
181
182 if (sock >= 0) {
183 /* Got a new connection */
184
185 childpid = fork();
186 if (childpid == 0) {
187 /* Child takes input from the new socket, and starts working */
188 close(lsocket); /* Close the listener socket */
189 inputfd = sock;
190 return 0;
191 }
192 else if (childpid > 0) {
193 /* Parent closes the new socket (child has it) */
194 close(sock);
195 continue;
196 }
197 else {
198 errprintf("Error forking worker for new connection: %s\n", strerror(errno));
199 running = 0;
200 continue;
201 }
202 }
203 else {
204 /* Error while waiting for accept() to complete */
205 if (errno != EINTR) {
206 errprintf("accept() failed: %s\n", strerror(errno));
207 running = 0;
208 }
209 }
210 }
211
212 /* Pickup failed children */
213 while ((childpid = wait3(&childstat, WNOHANG, NULL)) > 0);
214 }
215
216 /* Close the listener socket */
217 close(lsocket);
218
219 return 1;
220 }
221
222
net_worker_option(char * arg)223 int net_worker_option(char *arg)
224 {
225 int res = 1;
226
227 if (argnmatch(arg, "--locator=")) {
228 char *p = strchr(arg, '=');
229 locatorlocation = strdup(p+1);
230 }
231 else if (argnmatch(arg, "--locatorid=")) {
232 char *p = strchr(arg, '=');
233 locatorid = strdup(p+1);
234 }
235 else if (argnmatch(arg, "--locatorweight=")) {
236 char *p = strchr(arg, '=');
237 locatorweight = atoi(p+1);
238 }
239 else if (argnmatch(arg, "--locatorextra=")) {
240 char *p = strchr(arg, '=');
241 locatorextra = strdup(p+1);
242 }
243 else if (argnmatch(arg, "--listen=")) {
244 char *p = strchr(arg, '=');
245 listenipport = strdup(p+1);
246 }
247 else {
248 res = 0;
249 }
250
251 return res;
252 }
253
254
net_worker_locatorbased(void)255 int net_worker_locatorbased(void)
256 {
257 return ((locatorsvc != ST_MAX) && listenipport && locatorlocation);
258 }
259
/*
 * Start network-worker mode, if it was requested on the command line.
 *
 * svc     : service type; stored for later heartbeats and passed to the locator.
 * sticky  : passed through to locator_register_server().
 * updfunc : optional callback, invoked with the locator ID after registration.
 *
 * Without any of the network options this returns immediately. With both
 * --listen and --locator it registers with the locator and runs the
 * listener; it then only returns in a forked child that has a connection
 * to serve. Setup errors and listener termination exit the process.
 */
void net_worker_run(enum locator_servicetype_t svc, enum locator_sticky_t sticky, update_fn_t *updfunc)
{
	int backoff;

	locatorsvc = svc;

	if (listenipport) {
		struct in_addr scratch;
		char *colon;

		/* The locator ID defaults to the listener address */
		if (locatorid == NULL) locatorid = strdup(listenipport);

		colon = strchr(locatorid, ':');
		if (!colon) {
			errprintf("Locator ID must be IP:PORT matching the listener address\n");
			exit(1);
		}

		/* Temporarily cut the string at the colon to validate the IP part */
		*colon = '\0';
		if (inet_aton(locatorid, &scratch) == 0) {
			errprintf("Locator ID must be IP:PORT matching the listener address\n");
			exit(1);
		}
		*colon = ':';
	}

	if (!(listenipport && locatorlocation)) {
		/* Not a network worker - but a partial set of options is an error */
		if (listenipport || locatorlocation || locatorid) {
			errprintf("Must specify all of --listen, --locator and --locatorid\n");
			exit(1);
		}
		return;
	}

	/* Tell the world we're here; retry with a growing delay until the locator answers */
	backoff = 10;
	while (locator_init(locatorlocation) != 0) {
		errprintf("Locator unavailable, waiting for it to be ready\n");
		sleep(backoff);
		if (backoff < 240) backoff *= 2;
	}

	locator_register_server(locatorid, svc, locatorweight, sticky, locatorextra);
	if (updfunc) updfunc(locatorid);

	/*
	 * Launch the network listener and wait for incoming connections.
	 * Its return value is:
	 *   -1 : Error in setup. Abort.
	 *    0 : New connection arrived, and this is now a forked worker process. Continue.
	 *    1 : Listener terminates. Exit normally.
	 */
	switch (net_worker_listener(listenipport)) {
	  case -1:
		errprintf("Listener setup failed, aborting\n");
		locator_serverdown(locatorid, svc);
		exit(1);

	  case 1:
		errprintf("xymond_listener listener terminated\n");
		locator_serverdown(locatorid, svc);
		exit(0);

	  default:
		/* Worker process started. Return from here causes worker to start. */
		break;
	}
}
325
326
get_xymond_message(enum msgchannels_t chnid,char * id,int * seq,struct timespec * timeout)327 unsigned char *get_xymond_message(enum msgchannels_t chnid, char *id, int *seq, struct timespec *timeout)
328 {
329 static unsigned int seqnum = 0;
330 static char *idlemsg = NULL;
331 static char *buf = NULL;
332 static size_t bufsz = 0;
333 static size_t maxmsgsize = 0;
334 static int ioerror = 0;
335
336 static char *startpos; /* Where our unused data starts */
337 static char *endpos; /* Where the first message ends */
338 static char *fillpos; /* Where our unused data ends (the \0 byte) */
339
340 int truncated = 0;
341 struct timespec cutoff;
342 int maymove, needmoredata;
343 char *endsrch; /* Where in the buffer do we start looking for the end-message marker */
344 char *result;
345
346 /*
347 * The way this works is to read data from stdin into a
348 * buffer. Each read fetches as much data as possible,
349 * i.e. all that is available up to the amount of
350 * buffer space we have.
351 *
352 * When the buffer contains a complete message,
353 * we return a pointer to the message.
354 *
355 * Since a read into the buffer can potentially
356 * fetch multiple messages, we need to keep track of
357 * the start/end positions of the next message, and
358 * where in the buffer new data should be read in.
359 * As long as there is a complete message available
360 * in the buffer, we just return that message - only
361 * when there is no complete message do we read data
362 * from stdin.
363 *
364 * A message is normally NOT copied, we just return
365 * a pointer to our input buffer. The only time we
366 * need to shuffle data around is if the buffer
367 * does not have room left to hold a complete message.
368 */
369
370 if (buf == NULL) {
371 /*
372 * Initial setup of the buffers.
373 * We allocate a buffer large enough for the largest message
374 * that can arrive on this channel, and add 4KB extra room.
375 * The EXTRABUFSPACE is to allow the memmove() that will be
376 * needed occasionally some room to work optimally.
377 */
378 maxmsgsize = 1024*shbufsz(chnid);
379 bufsz = maxmsgsize + EXTRABUFSPACE;
380 buf = (char *)malloc(bufsz+1);
381 *buf = '\0';
382 startpos = fillpos = buf;
383 endpos = NULL;
384
385 /* idlemsg is used to return the idle message in case of timeouts. */
386 idlemsg = strdup("@@idle\n");
387
388 /* We don't want to block when reading data. */
389 fcntl(inputfd, F_SETFL, O_NONBLOCK);
390 }
391
392 /*
393 * If the start of the next message doesn't begin with "@" then
394 * there's something rotten.
395 */
396 if (*startpos && (*startpos != '@')) {
397 errprintf("Bad data in channel, skipping it\n");
398 startpos = strstr(startpos, "\n@@");
399 endpos = (startpos ? strstr(startpos, "\n@@\n") : NULL);
400 if (startpos && (startpos == endpos)) {
401 startpos = endpos + 4;
402 endpos = strstr(startpos, "\n@@\n");
403 }
404
405 if (!startpos) {
406 /* We're lost - flush the buffer and try to recover */
407 errprintf("Buffer sync lost, flushing data\n");
408 *buf = '\0';
409 startpos = fillpos = buf;
410 endpos = NULL;
411 }
412
413 seqnum = 0; /* After skipping, we don't know what to expect */
414 }
415
416 startagain:
417 if (ioerror) {
418 errprintf("get_xymond_message: Returning NULL due to previous i/o error\n");
419 return NULL;
420 }
421
422 if (timeout) {
423 /* Calculate when the read should timeout. */
424 getntimer(&cutoff);
425 cutoff.tv_sec += timeout->tv_sec;
426 cutoff.tv_nsec += timeout->tv_nsec;
427 if (cutoff.tv_nsec > 1000000000) {
428 cutoff.tv_sec += 1;
429 cutoff.tv_nsec -= 1000000000;
430 }
431 }
432
433 /*
434 * Start looking for the end-of-message marker at the beginning of
435 * the message. The next scans will only look at the new data we've
436 * got when reading data in.
437 */
438 endsrch = startpos;
439
440 /*
441 * See if the current available buffer space is enough to hold a full message.
442 * If not, then flag that we may do a memmove() of the buffer data.
443 */
444 maymove = ((startpos + maxmsgsize) >= (buf + bufsz));
445
446 /* We only need to read data, if we do not have an end-of-message marker */
447 needmoredata = (endpos == NULL);
448 while (needmoredata) {
449 /* Fill buffer with more data until we get an end-of-message marker */
450 struct timespec now;
451 struct timeval selecttmo;
452 fd_set fdread;
453 int res;
454 size_t bufleft = bufsz - (fillpos - buf);
455 size_t usedbytes = (fillpos - startpos);
456
457 dbgprintf("Want msg %d, startpos %ld, fillpos %ld, endpos %ld, usedbytes=%ld, bufleft=%ld\n",
458 (seqnum+1), (startpos-buf), (fillpos-buf), (endpos ? (endpos-buf) : -1), usedbytes, bufleft);
459
460 if (usedbytes >= maxmsgsize) {
461 /* Over-size message. Truncate it. */
462 errprintf("Got over-size message, truncating at %d bytes (max: %d)\n", usedbytes, maxmsgsize);
463 endpos = startpos + usedbytes - 5;
464 memcpy(endpos, "\n@@\n", 4); /* Simulate end-of-message and flush data */
465 needmoredata = 0;
466 truncated = 1;
467 }
468
469 if (needmoredata) {
470 if (maymove && (bufleft < EXTRABUFSPACE)) {
471 /* Buffer is almost full - move data to accommodate a large message. */
472 dbgprintf("Moving %d bytes to start of buffer\n", usedbytes);
473 memmove(buf, startpos, usedbytes);
474 startpos = buf;
475 fillpos = startpos + usedbytes;
476 *fillpos = '\0';
477 endsrch = (usedbytes >= 4) ? (fillpos - 4) : startpos;
478 maymove = 0;
479 bufleft = bufsz - (fillpos - buf);
480 }
481
482 if (timeout) {
483 /* How long time until the timeout ? */
484
485 getntimer(&now);
486 selecttmo.tv_sec = cutoff.tv_sec - now.tv_sec;
487 selecttmo.tv_usec = (cutoff.tv_nsec - now.tv_nsec) / 1000;
488 if (selecttmo.tv_usec < 0) {
489 selecttmo.tv_sec--;
490 selecttmo.tv_usec += 1000000;
491 }
492 }
493
494 FD_ZERO(&fdread);
495 FD_SET(inputfd, &fdread);
496
497 res = select(inputfd+1, &fdread, NULL, NULL, (timeout ? &selecttmo : NULL));
498
499 if (res < 0) {
500 if (errno == EAGAIN) continue;
501
502 if (errno == EINTR) {
503 dbgprintf("get_xymond_message: Interrupted\n");
504 *seq = 0;
505 return idlemsg;
506 }
507
508 /* Some error happened */
509 ioerror = 1;
510 dbgprintf("get_xymond_message: Returning NULL due to select error %s\n",
511 strerror(errno));
512 return NULL;
513 }
514 else if (res == 0) {
515 /*
516 * Timeout - return the "idle" message.
517 * NB: If select() was not passed a timeout parameter, this cannot trigger
518 */
519 *seq = 0;
520 return idlemsg;
521 }
522 else if (FD_ISSET(inputfd, &fdread)) {
523 res = read(inputfd, fillpos, bufleft);
524 if (res < 0) {
525 if ((errno == EAGAIN) || (errno == EINTR)) continue;
526
527 ioerror = 1;
528 dbgprintf("get_xymond_message: Returning NULL due to read error %s\n",
529 strerror(errno));
530 return NULL;
531 }
532 else if (res == 0) {
533 /* read() returns 0 --> End-of-file */
534 ioerror = 1;
535 dbgprintf("get_xymond_message: Returning NULL due to EOF\n");
536 return NULL;
537 }
538 else {
539 /*
540 * Got data - null-terminate it, and update fillpos
541 */
542 dbgprintf("Got %d bytes\n", res);
543
544 *(fillpos+res) = '\0';
545 fillpos += res;
546
547 /* Did we get an end-of-message marker ? Then we're done. */
548 endpos = strstr(endsrch, "\n@@\n");
549 needmoredata = (endpos == NULL);
550
551 /*
552 * If not done, update endsrch. We need to look at the
553 * last 3 bytes of input we got - they could be "\n@@" so
554 * all that is missing is the final "\n".
555 */
556 if (needmoredata && (res >= 3)) endsrch = fillpos-3;
557 }
558 }
559 }
560 }
561
562 /* We have a complete message between startpos and endpos */
563 result = startpos;
564 *endpos = '\0';
565 if (truncated) {
566 startpos = fillpos = buf;
567 endpos = NULL;
568 }
569 else {
570 startpos = endpos+4; /* +4 because we skip the "\n@@\n" end-marker from the previous message */
571 endpos = strstr(startpos, "\n@@\n"); /* To see if we already have a full message loaded */
572 /* fillpos stays where it is */
573 }
574
575 /* Check that it really is a message, and not just some garbled data */
576 if (strncmp(result, "@@", 2) != 0) {
577 errprintf("Dropping (more) garbled data\n");
578 goto startagain;
579 }
580
581 {
582 /*
583 * Get and check the message sequence number.
584 * We don't do this for network based workers, since the
585 * sequence number is globally generated (by xymond)
586 * but a network-based worker may only see some of the
587 * messages (those that are not handled by other network-based
588 * worker modules).
589 */
590 char *p = result + strcspn(result, "#/|\n");
591 if (*p == '#') {
592 *seq = atoi(p+1);
593
594 if (debug) {
595 p = strchr(result, '\n'); if (p) *p = '\0';
596 dbgprintf("%s: Got message %u %s\n", id, *seq, result);
597 if (p) *p = '\n';
598 }
599
600 if ((seqnum == 0) || (*seq == (seqnum + 1))) {
601 /* First message, or the correct sequence # */
602 seqnum = *seq;
603 }
604 else if (*seq == seqnum) {
605 /* Duplicate message - drop it */
606 errprintf("%s: Duplicate message %d dropped\n", id, *seq);
607 goto startagain;
608 }
609 else {
610 /*
611 * Out-of-sequence message. Cant do much except accept it.
612 * Since xymond_channel filters out some messages, messages
613 * may be missing. We really could do without the sequence
614 * numbers now, I think.
615 */
616 seqnum = *seq;
617 }
618
619 if (seqnum == 999999) seqnum = 0;
620 }
621 }
622
623 /* Verify checksum - except for truncated messages, where it won't match since we overwrite bytes with the end-marker */
624 if (result && !truncated) {
625 static struct digestctx_t *ctx = NULL;
626 char *hashstr;
627
628 if (!ctx) {
629 ctx = (digestctx_t *) malloc(sizeof(digestctx_t));
630 ctx->digestname = strdup("md5");
631 ctx->digesttype = D_MD5;
632 ctx->mdctx = (void *)malloc(myMD5_Size());
633 }
634
635 hashstr = result + strcspn(result, ":#/|\n");
636
637 if (*hashstr == ':') {
638 unsigned char md_value[16];
639 char md_string[2*16+1];
640 int i;
641 char *p;
642
643 myMD5_Init(ctx->mdctx);
644 myMD5_Update(ctx->mdctx, result, (hashstr - result));
645 myMD5_Update(ctx->mdctx, hashstr + 33, strlen(hashstr + 33));
646 myMD5_Update(ctx->mdctx, "\n@@\n", 4); /* Stripped earlier */
647 myMD5_Final(md_value, ctx->mdctx);
648 for(i = 0, p = md_string; (i < sizeof(md_value)); i++)
649 p += sprintf(p, "%02x", md_value[i]);
650 *p = '\0';
651 if (memcmp(hashstr+1, md_string, 32) != 0) {
652 p = strchr(result, '\n'); if (p) *(p+1) = '\0';
653 errprintf("get_xymond_message: Invalid checksum, skipping message '%s'\n", result);
654 result = NULL;
655 goto startagain;
656 }
657 }
658 }
659
660 dbgprintf("startpos %ld, fillpos %ld, endpos %ld\n",
661 (startpos-buf), (fillpos-buf), (endpos ? (endpos-buf) : -1));
662
663 return result;
664 }
665
666