1 /*
2  * ovdb_server.c
3  * ovdb read server
4  */
5 
6 #include "config.h"
7 #include "clibrary.h"
8 #include "portable/mmap.h"
9 #include "portable/setproctitle.h"
10 #include "portable/socket.h"
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <signal.h>
14 #ifdef HAVE_SYS_SELECT_H
15 # include <sys/select.h>
16 #endif
17 #include <syslog.h>
18 
19 #ifdef HAVE_SYS_TIME_H
20 # include <sys/time.h>
21 #endif
22 #include <time.h>
23 
24 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
25 # include "portable/socket-unix.h"
26 #endif
27 #include <sys/wait.h>
28 
29 #include "inn/fdflag.h"
30 #include "inn/innconf.h"
31 #include "inn/messages.h"
32 #include "inn/libinn.h"
33 #include "inn/paths.h"
34 #include "inn/storage.h"
35 #include "inn/ov.h"
36 
37 #include "../storage/ovdb/ovdb.h"
38 #include "../storage/ovdb/ovdb-private.h"
39 
40 #ifndef HAVE_BDB
41 
42 int
main(int argc UNUSED,char ** argv UNUSED)43 main(int argc UNUSED, char **argv UNUSED)
44 {
45     die("Berkeley DB support not compiled");
46 }
47 
48 #else /* HAVE_BDB */
49 
50 
51 #define SELECT_TIMEOUT 15
52 
53 
54 /* This will work unless user sets a larger clienttimeout
55    in readers.conf */
56 #define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
57 /*#define CLIENT_TIMEOUT 3600*/
58 
59 
60 static int listensock;
61 
62 #define MODE_READ   0
63 #define MODE_WRITE  1
64 #define MODE_CLOSED 2
65 #define STATE_READCMD 0
66 #define STATE_READGROUP 1
67 struct reader {
68     int fd;
69     int mode;
70     int state;
71     int buflen;
72     int bufpos;
73     void *buf;
74     time_t lastactive;
75     void *currentsearch;
76 };
77 
78 static struct reader *readertab;
79 static int readertablen;
80 static int numreaders;
81 static time_t now;
82 static pid_t parent;
83 
84 struct child {
85     pid_t pid;
86     int num;
87     time_t started;
88 };
89 static struct child *children;
90 #define wholistens (children[ovdb_conf.numrsprocs].num)
91 
92 static int signalled = 0;
93 static void
sigfunc(int sig UNUSED)94 sigfunc(int sig UNUSED)
95 {
96     signalled = 1;
97 }
98 
99 static int updated = 0;
100 static void
childsig(int sig UNUSED)101 childsig(int sig UNUSED)
102 {
103     updated = 1;
104 }
105 
106 static void
parentsig(int sig UNUSED)107 parentsig(int sig UNUSED)
108 {
109     int i, which, smallest;
110     if(wholistens < 0) {
111 	which = smallest = -1;
112 	for(i = 0; i < ovdb_conf.numrsprocs; i++) {
113 	    if(children[i].pid == -1)
114 		continue;
115 	    if(!ovdb_conf.maxrsconn || children[i].num <= ovdb_conf.maxrsconn) {
116 		if(smallest == -1 || children[i].num < smallest) {
117 		    smallest = children[i].num;
118 		    which = i;
119 		}
120 	    }
121 	}
122 	if(which != -1) {
123 	    wholistens = which;
124 	    kill(children[which].pid, SIGUSR1);
125 	} else {
126 	    wholistens = -2;
127 	}
128 	updated = 1;
129     }
130 }
131 
putpid(const char * path)132 static int putpid(const char *path)
133 {
134     char buf[30];
135     int fd = open(path, O_WRONLY|O_TRUNC|O_CREAT, 0664);
136     if(fd == -1) {
137         syswarn("cannot open %s", path);
138         return -1;
139     }
140     snprintf(buf, sizeof(buf), "%d\n", getpid());
141     if(write(fd, buf, strlen(buf)) < 0) {
142         syswarn("cannot write to %s", path);
143         close(fd);
144         return -1;
145     }
146     close(fd);
147     return 0;
148 }
149 
150 static void
do_groupstats(struct reader * r)151 do_groupstats(struct reader *r)
152 {
153     struct rs_groupstats *reply;
154     char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
155     reply = xmalloc(sizeof(struct rs_groupstats));
156 
157     debug("OVDB: rs: do_groupstats '%s'", group);
158     if(ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count, &reply->flag)) {
159 	reply->status = CMD_GROUPSTATS;
160     	reply->aliaslen = 0;
161     } else {
162 	reply->status = CMD_GROUPSTATS | RPLY_ERROR;
163     }
164     free(r->buf);
165     r->buf = reply;
166     r->buflen = sizeof(struct rs_groupstats);
167     r->bufpos = 0;
168     r->mode = MODE_WRITE;
169 }
170 
171 static void
do_opensrch(struct reader * r)172 do_opensrch(struct reader *r)
173 {
174     struct rs_cmd *cmd = r->buf;
175     struct rs_opensrch *reply;
176     char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
177     reply = xmalloc(sizeof(struct rs_opensrch));
178 
179     debug("OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);
180 
181     if(r->currentsearch != NULL) {
182 	/* can only open one search at a time */
183 	reply->status = CMD_OPENSRCH | RPLY_ERROR;
184     } else {
185 	reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
186 	if(reply->handle == NULL) {
187 	    reply->status = CMD_OPENSRCH | RPLY_ERROR;
188 	} else {
189 	    reply->status = CMD_OPENSRCH;
190 	}
191 	r->currentsearch = reply->handle;
192     }
193     free(r->buf);
194     r->buf = reply;
195     r->buflen = sizeof(struct rs_opensrch);
196     r->bufpos = 0;
197     r->mode = MODE_WRITE;
198 }
199 
200 static void
do_srch(struct reader * r)201 do_srch(struct reader *r)
202 {
203     struct rs_cmd *cmd = r->buf;
204     struct rs_srch *reply;
205     ARTNUM artnum;
206     TOKEN token;
207     time_t arrived;
208     int len;
209     char *data;
210 
211     if(ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
212     	reply = xmalloc(sizeof(struct rs_srch) + len);
213 	reply->status = CMD_SRCH;
214 	reply->artnum = artnum;
215 	reply->token = token;
216 	reply->arrived = arrived;
217 	reply->len = len;
218 	memcpy((char *)reply + sizeof(struct rs_srch), data, len);
219 	r->buflen = sizeof(struct rs_srch) + len;
220     } else {
221     	reply = xmalloc(sizeof(struct rs_srch));
222 	reply->status = CMD_SRCH | RPLY_ERROR;
223 	r->buflen = sizeof(struct rs_srch);
224     }
225     free(r->buf);
226     r->buf = reply;
227     r->bufpos = 0;
228     r->mode = MODE_WRITE;
229 }
230 
231 static void
do_closesrch(struct reader * r)232 do_closesrch(struct reader *r)
233 {
234     struct rs_cmd *cmd = r->buf;
235 
236     ovdb_closesearch(cmd->handle);
237     free(r->buf);
238     r->buf = NULL;
239     r->bufpos = r->buflen = 0;
240     r->mode = MODE_READ;
241     r->currentsearch = NULL;
242 }
243 
244 static void
do_artinfo(struct reader * r)245 do_artinfo(struct reader *r)
246 {
247     struct rs_cmd *cmd = r->buf;
248     struct rs_artinfo *reply;
249     char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
250     TOKEN token;
251 
252     debug("OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);
253     if(ovdb_getartinfo(group, cmd->artlo, &token)) {
254     	reply = xmalloc(sizeof(struct rs_artinfo));
255 	reply->status = CMD_ARTINFO;
256 	reply->token = token;
257 	r->buflen = sizeof(struct rs_artinfo);
258     } else {
259     	reply = xmalloc(sizeof(struct rs_artinfo));
260 	reply->status = CMD_ARTINFO | RPLY_ERROR;
261 	r->buflen = sizeof(struct rs_artinfo);
262     }
263     free(r->buf);
264     r->buf = reply;
265     r->bufpos = 0;
266     r->mode = MODE_WRITE;
267 }
268 
269 
270 static int
process_cmd(struct reader * r)271 process_cmd(struct reader *r)
272 {
273     struct rs_cmd *cmd = r->buf;
274 
275     if(r->state == STATE_READCMD) {
276 	switch(cmd->what) {
277 	case CMD_GROUPSTATS:
278 	case CMD_OPENSRCH:
279 	case CMD_ARTINFO:
280 	    r->state = STATE_READGROUP;
281 	    if(cmd->grouplen == 0) {
282 		/* shouldn't happen... */
283 		r->mode = MODE_CLOSED;
284 		close(r->fd);
285 		free(r->buf);
286 		r->buf = NULL;
287 		return 0;
288 	    }
289 	    r->buflen += cmd->grouplen;
290             r->buf = xrealloc(r->buf, r->buflen);
291 	    return 1;
292 	}
293     }
294 
295     switch(cmd->what) {
296     case CMD_GROUPSTATS:
297 	((char *)r->buf)[r->buflen - 1] = 0;	/* make sure group is null-terminated */
298 	do_groupstats(r);
299 	break;
300     case CMD_OPENSRCH:
301 	((char *)r->buf)[r->buflen - 1] = 0;
302 	do_opensrch(r);
303 	break;
304     case CMD_SRCH:
305 	do_srch(r);
306 	break;
307     case CMD_CLOSESRCH:
308 	do_closesrch(r);
309 	break;
310     case CMD_ARTINFO:
311 	((char *)r->buf)[r->buflen - 1] = 0;
312 	do_artinfo(r);
313 	break;
314     default:
315 	r->mode = MODE_CLOSED;
316 	close(r->fd);
317 	free(r->buf);
318 	r->buf = NULL;
319 	break;
320     }
321 
322     return 0;
323 }
324 
325 static void
handle_read(struct reader * r)326 handle_read(struct reader *r)
327 {
328     int n;
329     r->lastactive = now;
330 
331     if(r->buf == NULL) {
332     	r->state = STATE_READCMD;
333     	r->buf = xmalloc(sizeof(struct rs_cmd));
334 	r->buflen = sizeof(struct rs_cmd);
335 	r->bufpos = 0;
336     }
337 again:
338     n = read(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
339     if(n <= 0) {
340 	if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
341 	    return;
342     	r->mode = MODE_CLOSED;
343 	close(r->fd);
344 	free(r->buf);
345 	r->buf = NULL;
346     }
347     r->bufpos += n;
348 
349     if(r->bufpos >= r->buflen)
350 	if(process_cmd(r))
351 	    goto again;
352 }
353 
354 static void
handle_write(struct reader * r)355 handle_write(struct reader *r)
356 {
357     int n;
358     r->lastactive = now;
359 
360     if(r->buf == NULL)	/* shouldn't happen */
361 	return;
362 
363     n = write(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
364     if(n <= 0) {
365 	if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
366 	    return;
367     	r->mode = MODE_CLOSED;
368 	close(r->fd);
369 	free(r->buf);
370 	r->buf = NULL;
371     }
372     r->bufpos += n;
373 
374     if(r->bufpos >= r->buflen) {
375 	free(r->buf);
376     	r->buf = NULL;
377 	r->bufpos = r->buflen = 0;
378 	r->mode = MODE_READ;
379     }
380 }
381 
382 static void
newclient(int fd)383 newclient(int fd)
384 {
385     struct reader *r;
386     int i;
387 
388     fdflag_nonblocking(fd, 1);
389 
390     if(numreaders >= readertablen) {
391     	readertablen += 50;
392         readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
393         for(i = numreaders; i < readertablen; i++) {
394 	    readertab[i].mode = MODE_CLOSED;
395 	    readertab[i].buf = NULL;
396 	}
397     }
398 
399     r = &(readertab[numreaders]);
400     numreaders++;
401 
402     r->fd = fd;
403     r->mode = MODE_WRITE;
404     r->buflen = sizeof(OVDB_SERVER_BANNER);
405     r->bufpos = 0;
406     r->buf = xstrdup(OVDB_SERVER_BANNER);
407     r->lastactive = now;
408     r->currentsearch = NULL;
409 
410     handle_write(r);
411 }
412 
413 static void
delclient(int which)414 delclient(int which)
415 {
416     int i;
417     struct reader *r = &(readertab[which]);
418 
419     if(r->mode != MODE_CLOSED)
420     	close(r->fd);
421 
422     if(r->buf != NULL) {
423     	free(r->buf);
424     }
425     if(r->currentsearch != NULL) {
426 	ovdb_closesearch(r->currentsearch);
427 	r->currentsearch = NULL;
428     }
429 
430     /* numreaders will get decremented by the calling function */
431     for(i = which; i < numreaders-1; i++)
432     	readertab[i] = readertab[i+1];
433 
434     readertab[i].mode = MODE_CLOSED;
435     readertab[i].buf = NULL;
436 }
437 
438 static pid_t
serverproc(int me)439 serverproc(int me)
440 {
441     fd_set rdset, wrset;
442     int i, ret, count, lastfd, lastnumreaders;
443     socklen_t salen;
444     struct sockaddr_in sa;
445     struct timeval tv;
446     pid_t pid;
447 
448     pid = fork();
449     if (pid != 0)
450 	return pid;
451 
452     if (!ovdb_open(OV_READ|OVDB_SERVER))
453         die("cannot open overview");
454     xsignal_norestart(SIGINT, sigfunc);
455     xsignal_norestart(SIGTERM, sigfunc);
456     xsignal_norestart(SIGHUP, sigfunc);
457     xsignal_norestart(SIGUSR1, childsig);
458     xsignal(SIGPIPE, SIG_IGN);
459 
460     numreaders = lastnumreaders = 0;
461     if(ovdb_conf.maxrsconn) {
462 	readertablen = ovdb_conf.maxrsconn;
463     } else {
464     	readertablen = 50;
465     }
466     readertab = xmalloc(readertablen * sizeof(struct reader));
467     for(i = 0; i < readertablen; i++) {
468 	readertab[i].mode = MODE_CLOSED;
469 	readertab[i].buf = NULL;
470     }
471 
472     setproctitle("0 clients");
473 
474     /* main loop */
475     while(!signalled) {
476 	FD_ZERO(&rdset);
477 	FD_ZERO(&wrset);
478 	lastfd = 0;
479 	if(wholistens == me) {
480 	    if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
481 		FD_SET(listensock, &rdset);
482 		lastfd = listensock;
483                 setproctitle("%d client%s *", numreaders,
484                              numreaders == 1 ? "" : "s");
485 	    } else {
486 		wholistens = -1;
487 		kill(parent, SIGUSR1);
488 	    }
489         }
490 
491 	for(i = 0; i < numreaders; i++) {
492 	    switch(readertab[i].mode) {
493 	    case MODE_READ:
494 	    	FD_SET(readertab[i].fd, &rdset);
495 		break;
496 	    case MODE_WRITE:
497 	    	FD_SET(readertab[i].fd, &wrset);
498 		break;
499 	    default:
500 	    	continue;
501 	    }
502 	    if(readertab[i].fd > lastfd)
503 	    	lastfd = readertab[i].fd;
504 	}
505 	tv.tv_usec = 0;
506 	tv.tv_sec = SELECT_TIMEOUT;
507 	count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);
508 
509 	if(signalled)
510 	    break;
511 	if(count <= 0)
512 	    continue;
513 
514 	now = time(NULL);
515 
516 	if(FD_ISSET(listensock, &rdset)) {
517 	    if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
518 		salen = sizeof(sa);
519 	    	ret = accept(listensock, (struct sockaddr *)&sa, &salen);
520 		if(ret >= 0) {
521 		    newclient(ret);
522 		    wholistens = -1;
523 		    children[me].num = numreaders;
524 		    kill(parent, SIGUSR1);
525 		}
526 	    }
527 	}
528 
529 	for(i = 0; i < numreaders; i++) {
530 	    switch(readertab[i].mode) {
531 	    case MODE_READ:
532 	    	if(FD_ISSET(readertab[i].fd, &rdset))
533 		    handle_read(&(readertab[i]));
534 	        break;
535 	    case MODE_WRITE:
536 	    	if(FD_ISSET(readertab[i].fd, &wrset))
537 		    handle_write(&(readertab[i]));
538 		break;
539 	    }
540 	}
541 
542 	for(i = 0; i < numreaders; i++) {
543 	    if(readertab[i].mode == MODE_CLOSED
544 		  || (time_t) (readertab[i].lastactive + CLIENT_TIMEOUT) < now) {
545 	    	delclient(i);
546 		numreaders--;
547 		i--;
548 	    }
549 	}
550 	if(children[me].num != numreaders) {
551 	    children[me].num = numreaders;
552 	    kill(parent, SIGUSR1);
553         }
554 	if(numreaders != lastnumreaders) {
555 	    lastnumreaders = numreaders;
556             setproctitle("%d client%s", numreaders,
557                          numreaders == 1 ? "" : "s");
558 	}
559     }
560 
561     ovdb_close();
562     exit(0);
563 }
564 
565 static int
reap(void)566 reap(void)
567 {
568     int i, cs;
569     pid_t c;
570 
571     while((c = waitpid(-1, &cs, WNOHANG)) > 0) {
572 	for(i = 0; i < ovdb_conf.numrsprocs; i++) {
573 	    if(c == children[i].pid) {
574 		if(children[i].started + 30 > time(NULL))
575 		    return 1;
576 
577 		children[i].num = 0;
578 
579 		if(wholistens == i)
580 		    wholistens = -1;
581 
582 		if((children[i].pid = serverproc(i)) == -1)
583 		    return 1;
584 
585 		children[i].started = time(NULL);
586 		break;
587 	    }
588 	}
589     }
590     if(wholistens == -1)
591 	parentsig(SIGUSR1);
592     return 0;
593 }
594 
595 #ifndef MAP_ANON
596 #ifdef MAP_ANONYMOUS
597 #define MAP_ANON MAP_ANONYMOUS
598 #endif
599 #endif
600 
601 static void *
sharemem(size_t len)602 sharemem(size_t len)
603 {
604 #ifdef MAP_ANON
605     return mmap(0, len, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
606 #else
607     int fd = open("/dev/zero", O_RDWR, 0);
608     char *ptr = mmap(0, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
609     close(fd);
610     return ptr;
611 #endif
612 }
613 
614 int
main(int argc,char * argv[])615 main(int argc, char *argv[])
616 {
617     int i, ret;
618     socklen_t salen;
619     char *path, *pidfile;
620 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
621     struct sockaddr_un sa;
622 #else
623     struct sockaddr_in sa;
624 #endif
625     struct timeval tv;
626     fd_set rdset;
627 
628     setproctitle_init(argc, argv);
629 
630     openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
631     message_program_name = "ovdb_server";
632 
633     if(argc != 2 || strcmp(argv[1], SPACES))
634         die("should be started by ovdb_init");
635     message_handlers_warn(1, message_log_syslog_err);
636     message_handlers_die(1, message_log_syslog_err);
637 
638     if (!innconf_read(NULL))
639 	exit(1);
640 
641     if(strcmp(innconf->ovmethod, "ovdb"))
642         die("ovmethod not set to ovdb in inn.conf");
643 
644     read_ovdb_conf();
645 
646 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
647     listensock = socket(AF_UNIX, SOCK_STREAM, 0);
648 #else
649     listensock = socket(AF_INET, SOCK_STREAM, 0);
650 #endif
651     if(listensock < 0)
652         sysdie("cannot create socket");
653 
654     fdflag_nonblocking(listensock, 1);
655 
656     memset(&sa, 0, sizeof sa);
657 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
658     sa.sun_family = AF_UNIX;
659     path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
660     strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
661     unlink(sa.sun_path);
662     free(path);
663     ret = bind(listensock, (struct sockaddr *)&sa, SUN_LEN(&sa));
664 #else
665     sa.sin_family = AF_INET;
666     sa.sin_port = htons(OVDB_SERVER_PORT);
667     sa.sin_addr.s_addr = htonl(0x7f000001UL);
668 
669     ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
670 
671     if(ret != 0 && errno == EADDRNOTAVAIL) {
672 	sa.sin_family = AF_INET;
673 	sa.sin_port = htons(OVDB_SERVER_PORT);
674 	sa.sin_addr.s_addr = INADDR_ANY;
675 	ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
676     }
677 #endif
678 
679     if(ret != 0)
680         sysdie("cannot bind socket");
681     if(listen(listensock, MAXLISTEN) < 0)
682         sysdie("cannot listen on socket");
683 
684     pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
685     if(putpid(pidfile))
686         exit(1);
687 
688     xsignal_norestart(SIGINT, sigfunc);
689     xsignal_norestart(SIGTERM, sigfunc);
690     xsignal_norestart(SIGHUP, sigfunc);
691 
692     xsignal_norestart(SIGUSR1, parentsig);
693     xsignal_norestart(SIGCHLD, childsig);
694     parent = getpid();
695 
696     children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs+1));
697 
698     if(children == NULL)
699         sysdie("cannot mmap shared memory");
700     for(i = 0; i < ovdb_conf.numrsprocs+1; i++) {
701 	children[i].pid = -1;
702 	children[i].num = 0;
703     }
704 
705     for(i = 0; i < ovdb_conf.numrsprocs; i++) {
706 	if((children[i].pid = serverproc(i)) == -1) {
707 	    for(i--; i >= 0; i--)
708 		kill(children[i].pid, SIGTERM);
709 	    exit(1);
710 	}
711 	children[i].started = time(NULL);
712 	sleep(1);
713     }
714 
715     while(!signalled) {
716 	if(reap())
717 	    break;
718 
719 	if(wholistens == -2) {
720 	    FD_ZERO(&rdset);
721 	    FD_SET(listensock, &rdset);
722 	    tv.tv_usec = 0;
723 	    tv.tv_sec = SELECT_TIMEOUT;
724 	    ret = select(listensock+1, &rdset, NULL, NULL, &tv);
725 
726 	    if(ret == 1 && wholistens == -2) {
727 		salen = sizeof(sa);
728 		ret = accept(listensock, (struct sockaddr *)&sa, &salen);
729 		if(ret >= 0)
730 		   close(ret);
731 	    }
732 	} else {
733 	    pause();
734 	}
735     }
736 
737     for(i = 0; i < ovdb_conf.numrsprocs; i++)
738 	if(children[i].pid != -1)
739 	    kill(children[i].pid, SIGTERM);
740 
741     while(wait(&ret) > 0)
742 	;
743 
744     unlink(pidfile);
745 
746     exit(0);
747 }
748 
749 
750 #endif /* HAVE_BDB */
751