1 /*----------------------------------------------------------------------------*/
2 /* Xymon message daemon.                                                      */
3 /*                                                                            */
4 /* This is a small library for xymond worker modules, to read a new message   */
5 /* from the xymond_channel process, and also do the decoding of messages      */
6 /* that are passed on the "meta-data" first line of such a message.           */
7 /*                                                                            */
8 /* Copyright (C) 2004-2011 Henrik Storner <henrik@hswn.dk>                    */
9 /*                                                                            */
10 /* This program is released under the GNU General Public License (GPL),       */
11 /* version 2. See the file "COPYING" for details.                             */
12 /*                                                                            */
13 /*----------------------------------------------------------------------------*/
14 
/* Version-control identification string (Subversion $Id$ keyword expansion). */
static char rcsid[] =
	"$Id: xymond_worker.c 7678 2015-10-01 14:42:42Z jccleaver $";
16 
17 #include "config.h"
18 
19 #include <stdio.h>
20 #include <string.h>
21 #include <stdlib.h>
22 #include <ctype.h>
23 
24 #include <unistd.h>
25 #include <fcntl.h>
26 
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h>         /* Someday I'll move to GNU Autoconf for this ..  . */
32 #endif
33 
34 #include <sys/time.h>
35 #include <time.h>
36 #include <errno.h>
37 #include <sys/resource.h>
38 #include <sys/wait.h>
39 
40 #include "libxymon.h"
41 
42 #include "xymond_worker.h"
43 
44 #include <signal.h>
45 
46 
47 static int running = 1;
48 static int inputfd = STDIN_FILENO;
49 
50 #define EXTRABUFSPACE 4095
51 
52 static char *locatorlocation = NULL;
53 static char *locatorid = NULL;
54 static enum locator_servicetype_t locatorsvc = ST_MAX;
55 static int  locatorweight = 1;
56 static char *locatorextra = NULL;
57 static char *listenipport = NULL;
58 static time_t locatorhb = 0;
59 
60 
netinp_sighandler(int signum)61 static void netinp_sighandler(int signum)
62 {
63 	switch (signum) {
64 	  case SIGINT:
65 	  case SIGTERM:
66 		running = 0;
67 		break;
68 	}
69 }
70 
net_worker_heartbeat(void)71 static void net_worker_heartbeat(void)
72 {
73 	time_t now;
74 
75 	if (!locatorid || (locatorsvc == ST_MAX)) return;
76 
77 	now = gettimer();
78 	if (now > locatorhb) {
79 		locator_serverup(locatorid, locatorsvc);
80 		locatorhb = now + 60;
81 	}
82 }
83 
net_worker_listener(char * ipport)84 static int net_worker_listener(char *ipport)
85 {
86 	/*
87 	 * Setup a listener socket on IP+port. When a connection arrives,
88 	 * pick it up, fork() and let the rest of the input go via the
89 	 * network socket.
90 	 */
91 
92 	char *listenip, *p;
93 	int listenport = 0;
94 	int lsocket = -1;
95 	struct sockaddr_in laddr;
96 	struct sigaction sa;
97 	int opt;
98 
99 	listenip = ipport;
100 	p = strchr(listenip, ':');
101 	if (p) {
102 		*p = '\0';
103 		listenport = atoi(p+1);
104 	}
105 
106 	if (listenport == 0) {
107 		errprintf("Must include PORT number in --listen=IP:PORT option\n");
108 		return -1;
109 	}
110 
111         /* Set up a socket to listen for new connections */
112 	errprintf("Setting up network listener on %s:%d\n", listenip, listenport);
113 	memset(&laddr, 0, sizeof(laddr));
114 	if ((strlen(listenip) == 0) || (strcmp(listenip, "0.0.0.0") == 0)) {
115 		listenip = "0.0.0.0";
116 		laddr.sin_addr.s_addr = INADDR_ANY;
117 	}
118 	else if (inet_aton(listenip, (struct in_addr *) &laddr.sin_addr.s_addr) == 0) {
119 		/* Not an IP */
120 		errprintf("Listener IP must be an IP-address, not hostname\n");
121 		return -1;
122 	}
123 	laddr.sin_port = htons(listenport);
124 	laddr.sin_family = AF_INET;
125 	lsocket = socket(AF_INET, SOCK_STREAM, 0);
126 	if (lsocket == -1) {
127 		errprintf("Cannot create listen socket (%s)\n", strerror(errno));
128 		return -1;
129 	}
130 
131 	opt = 1;
132 	setsockopt(lsocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
133 
134 	if (bind(lsocket, (struct sockaddr *)&laddr, sizeof(laddr)) == -1) {
135 		errprintf("Cannot bind to listen socket (%s)\n", strerror(errno));
136 		return -1;
137 	}
138 	if (listen(lsocket, 5) == -1) {
139 		errprintf("Cannot listen (%s)\n", strerror(errno));
140 		return -1;
141 	}
142 
143 	/* Make listener socket non-blocking, so we can send keep-alive's while waiting for connections */
144 	fcntl(lsocket, F_SETFL, O_NONBLOCK);
145 
146 	/* Catch some signals */
147 	setup_signalhandler("xymond_listener");
148 	memset(&sa, 0, sizeof(sa));
149 	sa.sa_handler = netinp_sighandler;
150 	sigaction(SIGCHLD, &sa, NULL);
151 	sigaction(SIGTERM, &sa, NULL);
152 	sigaction(SIGINT, &sa, NULL);
153 
154 	while (running) {
155 		struct sockaddr_in netaddr;
156 		int addrsz, sock;
157 		int childstat;
158 		pid_t childpid;
159 		fd_set fdread;
160 		struct timeval tmo;
161 		int n;
162 
163 		FD_ZERO(&fdread);
164 		FD_SET(lsocket, &fdread);
165 		tmo.tv_sec = 60; tmo.tv_usec = 0;
166 		n = select(lsocket+1, &fdread, NULL, NULL, &tmo);
167 		if (n == -1) {
168 			if (errno != EINTR) {
169 				errprintf("select() failed while waiting for connection : %s\n", strerror(errno));
170 				running = 0;
171 			}
172 		}
173 		else if (n == 0) {
174 			/* Timeout */
175 			net_worker_heartbeat();
176 		}
177 		else {
178 			/* We have a connection ready */
179 			addrsz = sizeof(netaddr);
180 			sock = accept(lsocket, (struct sockaddr *)&netaddr, &addrsz);
181 
182 			if (sock >= 0) {
183 				/* Got a new connection */
184 
185 				childpid = fork();
186 				if (childpid == 0) {
187 					/* Child takes input from the new socket, and starts working */
188 					close(lsocket);	/* Close the listener socket */
189 					inputfd = sock;
190 					return 0;
191 				}
192 				else if (childpid > 0) {
193 					/* Parent closes the new socket (child has it) */
194 					close(sock);
195 					continue;
196 				}
197 				else {
198 					errprintf("Error forking worker for new connection: %s\n", strerror(errno));
199 					running = 0;
200 					continue;
201 				}
202 			}
203 			else {
204 				/* Error while waiting for accept() to complete */
205 				if (errno != EINTR) {
206 					errprintf("accept() failed: %s\n", strerror(errno));
207 					running = 0;
208 				}
209 			}
210 		}
211 
212 		/* Pickup failed children */
213 		while ((childpid = wait3(&childstat, WNOHANG, NULL)) > 0);
214 	}
215 
216 	/* Close the listener socket */
217 	close(lsocket);
218 
219 	return 1;
220 }
221 
222 
net_worker_option(char * arg)223 int net_worker_option(char *arg)
224 {
225 	int res = 1;
226 
227 	if (argnmatch(arg, "--locator=")) {
228 		char *p = strchr(arg, '=');
229 		locatorlocation = strdup(p+1);
230 	}
231 	else if (argnmatch(arg, "--locatorid=")) {
232 		char *p = strchr(arg, '=');
233 		locatorid = strdup(p+1);
234 	}
235 	else if (argnmatch(arg, "--locatorweight=")) {
236 		char *p = strchr(arg, '=');
237 		locatorweight = atoi(p+1);
238 	}
239 	else if (argnmatch(arg, "--locatorextra=")) {
240 		char *p = strchr(arg, '=');
241 		locatorextra = strdup(p+1);
242 	}
243 	else if (argnmatch(arg, "--listen=")) {
244 		char *p = strchr(arg, '=');
245 		listenipport = strdup(p+1);
246 	}
247 	else {
248 		res = 0;
249 	}
250 
251 	return res;
252 }
253 
254 
net_worker_locatorbased(void)255 int net_worker_locatorbased(void)
256 {
257 	return ((locatorsvc != ST_MAX) && listenipport && locatorlocation);
258 }
259 
/*
 * net_worker_run - start network mode for a worker, if it was configured.
 *
 * svc, sticky - service type and stickiness registered with the locator.
 * updfunc     - optional callback, invoked with the locator ID after registration.
 *
 * With no --listen/--locator/--locatorid options this just returns, and the
 * worker keeps reading its default input (stdin). With all of them, it
 * registers with the locator and runs the network listener. The function
 * only returns in a forked child that has accepted a connection. The
 * listener process itself exits when the listener ends. A partial
 * configuration is a fatal error.
 */
void net_worker_run(enum locator_servicetype_t svc, enum locator_sticky_t sticky, update_fn_t *updfunc)
{
	int delay;

	locatorsvc = svc;

	if (listenipport) {
		char *colon;
		struct in_addr ipcheck;
		int idvalid = 0;

		/* Without an explicit --locatorid we register under the listener address */
		if (locatorid == NULL)
			locatorid = strdup(listenipport);

		/* The ID must be "IP:PORT" with a numeric IP; cut at ':' briefly so inet_aton() sees only the IP */
		colon = strchr(locatorid, ':');
		if (colon != NULL) {
			*colon = '\0';
			idvalid = (inet_aton(locatorid, &ipcheck) != 0);
			*colon = ':';
		}

		if (!idvalid) {
			errprintf("Locator ID must be IP:PORT matching the listener address\n");
			exit(1);
		}
	}

	if (!listenipport || !locatorlocation) {
		/* Network mode is all-or-nothing */
		if (listenipport || locatorlocation || locatorid) {
			errprintf("Must specify all of --listen, --locator and --locatorid\n");
			exit(1);
		}
		return;
	}

	/* Tell the world we're here - retry, doubling the wait each time up to 320 seconds */
	delay = 10;
	while (locator_init(locatorlocation) != 0) {
		errprintf("Locator unavailable, waiting for it to be ready\n");
		sleep(delay);
		if (delay < 240) delay *= 2;
	}

	locator_register_server(locatorid, svc, locatorweight, sticky, locatorextra);
	if (updfunc) (*updfunc)(locatorid);

	/* Launch the network listener and wait for incoming connections */
	switch (net_worker_listener(listenipport)) {
	  case -1:
		/* Error in setup */
		errprintf("Listener setup failed, aborting\n");
		locator_serverdown(locatorid, svc);
		exit(1);

	  case 1:
		/* Listener terminated normally */
		errprintf("xymond_listener listener terminated\n");
		locator_serverdown(locatorid, svc);
		exit(0);

	  default:
		/* 0: we are a forked worker with a connection; returning starts the worker */
		break;
	}
}
325 
326 
get_xymond_message(enum msgchannels_t chnid,char * id,int * seq,struct timespec * timeout)327 unsigned char *get_xymond_message(enum msgchannels_t chnid, char *id, int *seq, struct timespec *timeout)
328 {
329 	static unsigned int seqnum = 0;
330 	static char *idlemsg = NULL;
331 	static char *buf = NULL;
332 	static size_t bufsz = 0;
333 	static size_t maxmsgsize = 0;
334 	static int ioerror = 0;
335 
336 	static char *startpos;	/* Where our unused data starts */
337 	static char *endpos;	/* Where the first message ends */
338 	static char *fillpos;	/* Where our unused data ends (the \0 byte) */
339 
340 	int truncated = 0;
341 	struct timespec cutoff;
342 	int maymove, needmoredata;
343 	char *endsrch;		/* Where in the buffer do we start looking for the end-message marker */
344 	char *result;
345 
346 	/*
347 	 * The way this works is to read data from stdin into a
348 	 * buffer. Each read fetches as much data as possible,
349 	 * i.e. all that is available up to the amount of
350 	 * buffer space we have.
351 	 *
352 	 * When the buffer contains a complete message,
353 	 * we return a pointer to the message.
354 	 *
355 	 * Since a read into the buffer can potentially
356 	 * fetch multiple messages, we need to keep track of
357 	 * the start/end positions of the next message, and
358 	 * where in the buffer new data should be read in.
359 	 * As long as there is a complete message available
360 	 * in the buffer, we just return that message - only
361 	 * when there is no complete message do we read data
362 	 * from stdin.
363 	 *
364 	 * A message is normally NOT copied, we just return
365 	 * a pointer to our input buffer. The only time we
366 	 * need to shuffle data around is if the buffer
367 	 * does not have room left to hold a complete message.
368 	 */
369 
370 	if (buf == NULL) {
371 		/*
372 		 * Initial setup of the buffers.
373 		 * We allocate a buffer large enough for the largest message
374 		 * that can arrive on this channel, and add 4KB extra room.
375 		 * The EXTRABUFSPACE is to allow the memmove() that will be
376 		 * needed occasionally some room to work optimally.
377 		 */
378 		maxmsgsize = 1024*shbufsz(chnid);
379 		bufsz = maxmsgsize + EXTRABUFSPACE;
380 		buf = (char *)malloc(bufsz+1);
381 		*buf = '\0';
382 		startpos = fillpos = buf;
383 		endpos = NULL;
384 
385 		/* idlemsg is used to return the idle message in case of timeouts. */
386 		idlemsg = strdup("@@idle\n");
387 
388 		/* We don't want to block when reading data. */
389 		fcntl(inputfd, F_SETFL, O_NONBLOCK);
390 	}
391 
392 	/*
393 	 * If the start of the next message doesn't begin with "@" then
394 	 * there's something rotten.
395 	 */
396 	if (*startpos && (*startpos != '@')) {
397 		errprintf("Bad data in channel, skipping it\n");
398 		startpos = strstr(startpos, "\n@@");
399 		endpos = (startpos ? strstr(startpos, "\n@@\n") : NULL);
400 		if (startpos && (startpos == endpos)) {
401 			startpos = endpos + 4;
402 			endpos = strstr(startpos, "\n@@\n");
403 		}
404 
405 		if (!startpos) {
406 			/* We're lost - flush the buffer and try to recover */
407 			errprintf("Buffer sync lost, flushing data\n");
408 			*buf = '\0';
409 			startpos = fillpos = buf;
410 			endpos = NULL;
411 		}
412 
413 		seqnum = 0; /* After skipping, we don't know what to expect */
414 	}
415 
416 startagain:
417 	if (ioerror) {
418 		errprintf("get_xymond_message: Returning NULL due to previous i/o error\n");
419 		return NULL;
420 	}
421 
422 	if (timeout) {
423 		/* Calculate when the read should timeout. */
424 		getntimer(&cutoff);
425 		cutoff.tv_sec += timeout->tv_sec;
426 		cutoff.tv_nsec += timeout->tv_nsec;
427 		if (cutoff.tv_nsec > 1000000000) {
428 			cutoff.tv_sec += 1;
429 			cutoff.tv_nsec -= 1000000000;
430 		}
431 	}
432 
433 	/*
434 	 * Start looking for the end-of-message marker at the beginning of
435 	 * the message. The next scans will only look at the new data we've
436 	 * got when reading data in.
437 	 */
438 	endsrch = startpos;
439 
440 	/*
441 	 * See if the current available buffer space is enough to hold a full message.
442 	 * If not, then flag that we may do a memmove() of the buffer data.
443 	 */
444 	maymove = ((startpos + maxmsgsize) >= (buf + bufsz));
445 
446 	/* We only need to read data, if we do not have an end-of-message marker */
447 	needmoredata = (endpos == NULL);
448 	while (needmoredata) {
449 		/* Fill buffer with more data until we get an end-of-message marker */
450 		struct timespec now;
451 		struct timeval selecttmo;
452 		fd_set fdread;
453 		int res;
454 		size_t bufleft = bufsz - (fillpos - buf);
455 		size_t usedbytes = (fillpos - startpos);
456 
457 		dbgprintf("Want msg %d, startpos %ld, fillpos %ld, endpos %ld, usedbytes=%ld, bufleft=%ld\n",
458 			  (seqnum+1), (startpos-buf), (fillpos-buf), (endpos ? (endpos-buf) : -1), usedbytes, bufleft);
459 
460 		if (usedbytes >= maxmsgsize) {
461 			/* Over-size message. Truncate it. */
462 			errprintf("Got over-size message, truncating at %d bytes (max: %d)\n", usedbytes, maxmsgsize);
463 			endpos = startpos + usedbytes - 5;
464 			memcpy(endpos, "\n@@\n", 4);	/* Simulate end-of-message and flush data */
465 			needmoredata = 0;
466 			truncated = 1;
467 		}
468 
469 		if (needmoredata) {
470 			if (maymove && (bufleft < EXTRABUFSPACE)) {
471 				/* Buffer is almost full - move data to accommodate a large message. */
472 				dbgprintf("Moving %d bytes to start of buffer\n", usedbytes);
473 				memmove(buf, startpos, usedbytes);
474 				startpos = buf;
475 				fillpos = startpos + usedbytes;
476 				*fillpos = '\0';
477 				endsrch = (usedbytes >= 4) ? (fillpos - 4) : startpos;
478 				maymove = 0;
479 				bufleft = bufsz - (fillpos - buf);
480 			}
481 
482 			if (timeout) {
483 				/* How long time until the timeout ? */
484 
485 				getntimer(&now);
486 				selecttmo.tv_sec = cutoff.tv_sec - now.tv_sec;
487 				selecttmo.tv_usec = (cutoff.tv_nsec - now.tv_nsec) / 1000;
488 				if (selecttmo.tv_usec < 0) {
489 					selecttmo.tv_sec--;
490 					selecttmo.tv_usec += 1000000;
491 				}
492 			}
493 
494 			FD_ZERO(&fdread);
495 			FD_SET(inputfd, &fdread);
496 
497 			res = select(inputfd+1, &fdread, NULL, NULL, (timeout ? &selecttmo : NULL));
498 
499 			if (res < 0) {
500 				if (errno == EAGAIN) continue;
501 
502 				if (errno == EINTR) {
503 					dbgprintf("get_xymond_message: Interrupted\n");
504 					*seq = 0;
505 					return idlemsg;
506 				}
507 
508 				/* Some error happened */
509 				ioerror = 1;
510 				dbgprintf("get_xymond_message: Returning NULL due to select error %s\n",
511 						strerror(errno));
512 				return NULL;
513 			}
514 			else if (res == 0) {
515 				/*
516 				 * Timeout - return the "idle" message.
517 				 * NB: If select() was not passed a timeout parameter, this cannot trigger
518 				 */
519 				*seq = 0;
520 				return idlemsg;
521 			}
522 			else if (FD_ISSET(inputfd, &fdread)) {
523 				res = read(inputfd, fillpos, bufleft);
524 				if (res < 0) {
525 					if ((errno == EAGAIN) || (errno == EINTR)) continue;
526 
527 					ioerror = 1;
528 					dbgprintf("get_xymond_message: Returning NULL due to read error %s\n",
529 							strerror(errno));
530 					return NULL;
531 				}
532 				else if (res == 0) {
533 					/* read() returns 0 --> End-of-file */
534 					ioerror = 1;
535 					dbgprintf("get_xymond_message: Returning NULL due to EOF\n");
536 					return NULL;
537 				}
538 				else {
539 					/*
540 					 * Got data - null-terminate it, and update fillpos
541 					 */
542 					dbgprintf("Got %d bytes\n", res);
543 
544 					*(fillpos+res) = '\0';
545 					fillpos += res;
546 
547 					/* Did we get an end-of-message marker ? Then we're done. */
548 					endpos = strstr(endsrch, "\n@@\n");
549 					needmoredata = (endpos == NULL);
550 
551 					/*
552 					 * If not done, update endsrch. We need to look at the
553 					 * last 3 bytes of input we got - they could be "\n@@" so
554 					 * all that is missing is the final "\n".
555 					 */
556 					if (needmoredata && (res >= 3)) endsrch = fillpos-3;
557 				}
558 			}
559 		}
560 	}
561 
562 	/* We have a complete message between startpos and endpos */
563 	result = startpos;
564 	*endpos = '\0';
565 	if (truncated) {
566 		startpos = fillpos = buf;
567 		endpos = NULL;
568 	}
569 	else {
570 		startpos = endpos+4; /* +4 because we skip the "\n@@\n" end-marker from the previous message */
571 		endpos = strstr(startpos, "\n@@\n");	/* To see if we already have a full message loaded */
572 		/* fillpos stays where it is */
573 	}
574 
575 	/* Check that it really is a message, and not just some garbled data */
576 	if (strncmp(result, "@@", 2) != 0) {
577 		errprintf("Dropping (more) garbled data\n");
578 		goto startagain;
579 	}
580 
581 	{
582 		/*
583 		 * Get and check the message sequence number.
584 		 * We don't do this for network based workers, since the
585 		 * sequence number is globally generated (by xymond)
586 		 * but a network-based worker may only see some of the
587 		 * messages (those that are not handled by other network-based
588 		 * worker modules).
589 		 */
590 		char *p = result + strcspn(result, "#/|\n");
591 		if (*p == '#') {
592 			*seq = atoi(p+1);
593 
594 			if (debug) {
595 				p = strchr(result, '\n'); if (p) *p = '\0';
596 				dbgprintf("%s: Got message %u %s\n", id, *seq, result);
597 				if (p) *p = '\n';
598 			}
599 
600 			if ((seqnum == 0) || (*seq == (seqnum + 1))) {
601 				/* First message, or the correct sequence # */
602 				seqnum = *seq;
603 			}
604 			else if (*seq == seqnum) {
605 				/* Duplicate message - drop it */
606 				errprintf("%s: Duplicate message %d dropped\n", id, *seq);
607 				goto startagain;
608 			}
609 			else {
610 				/*
611 				 * Out-of-sequence message. Cant do much except accept it.
612 				 * Since xymond_channel filters out some messages, messages
613 				 * may be missing. We really could do without the sequence
614 				 * numbers now, I think.
615 				 */
616 				seqnum = *seq;
617 			}
618 
619 			if (seqnum == 999999) seqnum = 0;
620 		}
621 	}
622 
623 	/* Verify checksum - except for truncated messages, where it won't match since we overwrite bytes with the end-marker */
624 	if (result && !truncated) {
625 		static struct digestctx_t *ctx = NULL;
626 		char *hashstr;
627 
628 		if (!ctx) {
629 			ctx = (digestctx_t *) malloc(sizeof(digestctx_t));
630 			ctx->digestname = strdup("md5");
631 			ctx->digesttype = D_MD5;
632 			ctx->mdctx = (void *)malloc(myMD5_Size());
633 		}
634 
635 		hashstr = result + strcspn(result, ":#/|\n");
636 
637 		if (*hashstr == ':') {
638 			unsigned char md_value[16];
639 			char md_string[2*16+1];
640 			int i;
641 			char *p;
642 
643 			myMD5_Init(ctx->mdctx);
644 			myMD5_Update(ctx->mdctx, result, (hashstr - result));
645 			myMD5_Update(ctx->mdctx, hashstr + 33, strlen(hashstr + 33));
646 			myMD5_Update(ctx->mdctx, "\n@@\n", 4);	/* Stripped earlier */
647 			myMD5_Final(md_value, ctx->mdctx);
648 			for(i = 0, p = md_string; (i < sizeof(md_value)); i++)
649 				p += sprintf(p, "%02x", md_value[i]);
650 			*p = '\0';
651 			if (memcmp(hashstr+1, md_string, 32) != 0) {
652 				p = strchr(result, '\n'); if (p) *(p+1) = '\0';
653 				errprintf("get_xymond_message: Invalid checksum, skipping message '%s'\n", result);
654 				result = NULL;
655 				goto startagain;
656 			}
657 		}
658 	}
659 
660 	dbgprintf("startpos %ld, fillpos %ld, endpos %ld\n",
661 		  (startpos-buf), (fillpos-buf), (endpos ? (endpos-buf) : -1));
662 
663 	return result;
664 }
665 
666