1 /*
2 * ovdb_server.c
3 * ovdb read server
4 */
5
6 #include "config.h"
7 #include "clibrary.h"
8 #include "portable/mmap.h"
9 #include "portable/setproctitle.h"
10 #include "portable/socket.h"
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <signal.h>
14 #ifdef HAVE_SYS_SELECT_H
15 # include <sys/select.h>
16 #endif
17 #include <syslog.h>
18
19 #ifdef HAVE_SYS_TIME_H
20 # include <sys/time.h>
21 #endif
22 #include <time.h>
23
24 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
25 # include "portable/socket-unix.h"
26 #endif
27 #include <sys/wait.h>
28
29 #include "inn/fdflag.h"
30 #include "inn/innconf.h"
31 #include "inn/messages.h"
32 #include "inn/libinn.h"
33 #include "inn/paths.h"
34 #include "inn/storage.h"
35 #include "inn/ov.h"
36
37 #include "../storage/ovdb/ovdb.h"
38 #include "../storage/ovdb/ovdb-private.h"
39
40 #ifndef HAVE_BDB
41
/*
 * Stub entry point compiled when HAVE_BDB is not defined: the only thing it
 * does is report, through die(), that Berkeley DB support is missing.
 */
int
main(int argc UNUSED, char **argv UNUSED)
{
    die("Berkeley DB support not compiled");
}
47
48 #else /* HAVE_BDB */
49
50
/* Value loaded into tv_sec for every select() call in this file, i.e. the
   longest time, in seconds, a select() may block. */
#define SELECT_TIMEOUT 15


/* A reader whose lastactive stamp is more than this many seconds behind
   `now` is dropped by the sweep in serverproc().  This will work unless the
   user sets a larger clienttimeout in readers.conf. */
#define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
/*#define CLIENT_TIMEOUT 3600*/
58
59
/* Listening socket.  Created, bound and put into listen mode by main(); the
   children forked by serverproc() use the same descriptor for accept(). */
static int listensock;

/* Values of reader.mode: whether the connection is waiting for input,
   has a reply buffer to send, or has already been closed and only awaits
   removal from the table. */
#define MODE_READ 0
#define MODE_WRITE 1
#define MODE_CLOSED 2
/* Values of reader.state: reading the fixed-size struct rs_cmd header, or
   reading the group name that follows it. */
#define STATE_READCMD 0
#define STATE_READGROUP 1
/* Per-connection state kept in readertab. */
struct reader {
    int fd;              /* connected client socket */
    int mode;            /* MODE_READ, MODE_WRITE or MODE_CLOSED */
    int state;           /* STATE_READCMD or STATE_READGROUP (while reading) */
    int buflen;          /* total number of bytes to read into / write from buf */
    int bufpos;          /* bytes of buf already transferred */
    void *buf;           /* malloc'd command or reply buffer; NULL when idle */
    time_t lastactive;   /* value of `now` at the last read/write attempt */
    void *currentsearch; /* handle from ovdb_opensearch(), or NULL */
};
77
/* Table of client connections owned by one server child.  Entries
   0 .. numreaders-1 are in use; readertablen is the allocated size. */
static struct reader *readertab;
static int readertablen;
static int numreaders;
/* Refreshed from time(NULL) each time select() reports activity in
   serverproc(). */
static time_t now;
/* Pid of the parent process, recorded in main() before the children are
   forked; the children send it SIGUSR1. */
static pid_t parent;
83
/* Bookkeeping for one server child.  The array is allocated by sharemem()
   in main() so that parent and children see the same data. */
struct child {
    pid_t pid;      /* pid of the child, or -1 if the slot has no process */
    int num;        /* number of clients the child reports it is serving */
    time_t started; /* time(NULL) when the child was (re)started */
};
static struct child *children;
/* The array has one extra trailing entry whose `num` field is reused as the
   index of the child currently allowed to accept().  Negative values as
   used in this file: -1 = nobody assigned yet (parentsig() should pick
   one), -2 = parentsig() found no eligible child, so main() accepts and
   immediately closes incoming connections itself. */
#define wholistens (children[ovdb_conf.numrsprocs].num)
91
92 static int signalled = 0;
93 static void
sigfunc(int sig UNUSED)94 sigfunc(int sig UNUSED)
95 {
96 signalled = 1;
97 }
98
99 static int updated = 0;
100 static void
childsig(int sig UNUSED)101 childsig(int sig UNUSED)
102 {
103 updated = 1;
104 }
105
106 static void
parentsig(int sig UNUSED)107 parentsig(int sig UNUSED)
108 {
109 int i, which, smallest;
110 if(wholistens < 0) {
111 which = smallest = -1;
112 for(i = 0; i < ovdb_conf.numrsprocs; i++) {
113 if(children[i].pid == -1)
114 continue;
115 if(!ovdb_conf.maxrsconn || children[i].num <= ovdb_conf.maxrsconn) {
116 if(smallest == -1 || children[i].num < smallest) {
117 smallest = children[i].num;
118 which = i;
119 }
120 }
121 }
122 if(which != -1) {
123 wholistens = which;
124 kill(children[which].pid, SIGUSR1);
125 } else {
126 wholistens = -2;
127 }
128 updated = 1;
129 }
130 }
131
/*
 * Write the pid of the calling process, followed by a newline, to the file
 * `path`, creating or truncating it (mode 0664).
 *
 * Returns 0 on success.  On failure to open or write the file a warning is
 * logged with syswarn() and -1 is returned.
 */
static int
putpid(const char *path)
{
    char buf[30];
    int fd;

    fd = open(path, O_WRONLY|O_TRUNC|O_CREAT, 0664);
    if (fd == -1) {
        syswarn("cannot open %s", path);
        return -1;
    }

    /* pid_t is not guaranteed to be int; convert explicitly so that the
       argument always matches the conversion specifier. */
    snprintf(buf, sizeof(buf), "%ld\n", (long) getpid());
    if (write(fd, buf, strlen(buf)) < 0) {
        syswarn("cannot write to %s", path);
        close(fd);
        return -1;
    }
    close(fd);
    return 0;
}
149
150 static void
do_groupstats(struct reader * r)151 do_groupstats(struct reader *r)
152 {
153 struct rs_groupstats *reply;
154 char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
155 reply = xmalloc(sizeof(struct rs_groupstats));
156
157 debug("OVDB: rs: do_groupstats '%s'", group);
158 if(ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count, &reply->flag)) {
159 reply->status = CMD_GROUPSTATS;
160 reply->aliaslen = 0;
161 } else {
162 reply->status = CMD_GROUPSTATS | RPLY_ERROR;
163 }
164 free(r->buf);
165 r->buf = reply;
166 r->buflen = sizeof(struct rs_groupstats);
167 r->bufpos = 0;
168 r->mode = MODE_WRITE;
169 }
170
171 static void
do_opensrch(struct reader * r)172 do_opensrch(struct reader *r)
173 {
174 struct rs_cmd *cmd = r->buf;
175 struct rs_opensrch *reply;
176 char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
177 reply = xmalloc(sizeof(struct rs_opensrch));
178
179 debug("OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);
180
181 if(r->currentsearch != NULL) {
182 /* can only open one search at a time */
183 reply->status = CMD_OPENSRCH | RPLY_ERROR;
184 } else {
185 reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
186 if(reply->handle == NULL) {
187 reply->status = CMD_OPENSRCH | RPLY_ERROR;
188 } else {
189 reply->status = CMD_OPENSRCH;
190 }
191 r->currentsearch = reply->handle;
192 }
193 free(r->buf);
194 r->buf = reply;
195 r->buflen = sizeof(struct rs_opensrch);
196 r->bufpos = 0;
197 r->mode = MODE_WRITE;
198 }
199
200 static void
do_srch(struct reader * r)201 do_srch(struct reader *r)
202 {
203 struct rs_cmd *cmd = r->buf;
204 struct rs_srch *reply;
205 ARTNUM artnum;
206 TOKEN token;
207 time_t arrived;
208 int len;
209 char *data;
210
211 if(ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
212 reply = xmalloc(sizeof(struct rs_srch) + len);
213 reply->status = CMD_SRCH;
214 reply->artnum = artnum;
215 reply->token = token;
216 reply->arrived = arrived;
217 reply->len = len;
218 memcpy((char *)reply + sizeof(struct rs_srch), data, len);
219 r->buflen = sizeof(struct rs_srch) + len;
220 } else {
221 reply = xmalloc(sizeof(struct rs_srch));
222 reply->status = CMD_SRCH | RPLY_ERROR;
223 r->buflen = sizeof(struct rs_srch);
224 }
225 free(r->buf);
226 r->buf = reply;
227 r->bufpos = 0;
228 r->mode = MODE_WRITE;
229 }
230
231 static void
do_closesrch(struct reader * r)232 do_closesrch(struct reader *r)
233 {
234 struct rs_cmd *cmd = r->buf;
235
236 ovdb_closesearch(cmd->handle);
237 free(r->buf);
238 r->buf = NULL;
239 r->bufpos = r->buflen = 0;
240 r->mode = MODE_READ;
241 r->currentsearch = NULL;
242 }
243
244 static void
do_artinfo(struct reader * r)245 do_artinfo(struct reader *r)
246 {
247 struct rs_cmd *cmd = r->buf;
248 struct rs_artinfo *reply;
249 char *group = (char *)(r->buf) + sizeof(struct rs_cmd);
250 TOKEN token;
251
252 debug("OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);
253 if(ovdb_getartinfo(group, cmd->artlo, &token)) {
254 reply = xmalloc(sizeof(struct rs_artinfo));
255 reply->status = CMD_ARTINFO;
256 reply->token = token;
257 r->buflen = sizeof(struct rs_artinfo);
258 } else {
259 reply = xmalloc(sizeof(struct rs_artinfo));
260 reply->status = CMD_ARTINFO | RPLY_ERROR;
261 r->buflen = sizeof(struct rs_artinfo);
262 }
263 free(r->buf);
264 r->buf = reply;
265 r->bufpos = 0;
266 r->mode = MODE_WRITE;
267 }
268
269
270 static int
process_cmd(struct reader * r)271 process_cmd(struct reader *r)
272 {
273 struct rs_cmd *cmd = r->buf;
274
275 if(r->state == STATE_READCMD) {
276 switch(cmd->what) {
277 case CMD_GROUPSTATS:
278 case CMD_OPENSRCH:
279 case CMD_ARTINFO:
280 r->state = STATE_READGROUP;
281 if(cmd->grouplen == 0) {
282 /* shouldn't happen... */
283 r->mode = MODE_CLOSED;
284 close(r->fd);
285 free(r->buf);
286 r->buf = NULL;
287 return 0;
288 }
289 r->buflen += cmd->grouplen;
290 r->buf = xrealloc(r->buf, r->buflen);
291 return 1;
292 }
293 }
294
295 switch(cmd->what) {
296 case CMD_GROUPSTATS:
297 ((char *)r->buf)[r->buflen - 1] = 0; /* make sure group is null-terminated */
298 do_groupstats(r);
299 break;
300 case CMD_OPENSRCH:
301 ((char *)r->buf)[r->buflen - 1] = 0;
302 do_opensrch(r);
303 break;
304 case CMD_SRCH:
305 do_srch(r);
306 break;
307 case CMD_CLOSESRCH:
308 do_closesrch(r);
309 break;
310 case CMD_ARTINFO:
311 ((char *)r->buf)[r->buflen - 1] = 0;
312 do_artinfo(r);
313 break;
314 default:
315 r->mode = MODE_CLOSED;
316 close(r->fd);
317 free(r->buf);
318 r->buf = NULL;
319 break;
320 }
321
322 return 0;
323 }
324
325 static void
handle_read(struct reader * r)326 handle_read(struct reader *r)
327 {
328 int n;
329 r->lastactive = now;
330
331 if(r->buf == NULL) {
332 r->state = STATE_READCMD;
333 r->buf = xmalloc(sizeof(struct rs_cmd));
334 r->buflen = sizeof(struct rs_cmd);
335 r->bufpos = 0;
336 }
337 again:
338 n = read(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
339 if(n <= 0) {
340 if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
341 return;
342 r->mode = MODE_CLOSED;
343 close(r->fd);
344 free(r->buf);
345 r->buf = NULL;
346 }
347 r->bufpos += n;
348
349 if(r->bufpos >= r->buflen)
350 if(process_cmd(r))
351 goto again;
352 }
353
354 static void
handle_write(struct reader * r)355 handle_write(struct reader *r)
356 {
357 int n;
358 r->lastactive = now;
359
360 if(r->buf == NULL) /* shouldn't happen */
361 return;
362
363 n = write(r->fd, (char *)(r->buf) + r->bufpos, r->buflen - r->bufpos);
364 if(n <= 0) {
365 if(n < 0 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
366 return;
367 r->mode = MODE_CLOSED;
368 close(r->fd);
369 free(r->buf);
370 r->buf = NULL;
371 }
372 r->bufpos += n;
373
374 if(r->bufpos >= r->buflen) {
375 free(r->buf);
376 r->buf = NULL;
377 r->bufpos = r->buflen = 0;
378 r->mode = MODE_READ;
379 }
380 }
381
382 static void
newclient(int fd)383 newclient(int fd)
384 {
385 struct reader *r;
386 int i;
387
388 fdflag_nonblocking(fd, 1);
389
390 if(numreaders >= readertablen) {
391 readertablen += 50;
392 readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
393 for(i = numreaders; i < readertablen; i++) {
394 readertab[i].mode = MODE_CLOSED;
395 readertab[i].buf = NULL;
396 }
397 }
398
399 r = &(readertab[numreaders]);
400 numreaders++;
401
402 r->fd = fd;
403 r->mode = MODE_WRITE;
404 r->buflen = sizeof(OVDB_SERVER_BANNER);
405 r->bufpos = 0;
406 r->buf = xstrdup(OVDB_SERVER_BANNER);
407 r->lastactive = now;
408 r->currentsearch = NULL;
409
410 handle_write(r);
411 }
412
413 static void
delclient(int which)414 delclient(int which)
415 {
416 int i;
417 struct reader *r = &(readertab[which]);
418
419 if(r->mode != MODE_CLOSED)
420 close(r->fd);
421
422 if(r->buf != NULL) {
423 free(r->buf);
424 }
425 if(r->currentsearch != NULL) {
426 ovdb_closesearch(r->currentsearch);
427 r->currentsearch = NULL;
428 }
429
430 /* numreaders will get decremented by the calling function */
431 for(i = which; i < numreaders-1; i++)
432 readertab[i] = readertab[i+1];
433
434 readertab[i].mode = MODE_CLOSED;
435 readertab[i].buf = NULL;
436 }
437
/*
 * Fork one read-server child.
 *
 * In the parent this returns the result of fork() (the child's pid, or -1
 * if fork failed).  The child never returns: it opens the overview
 * database, serves clients until a shutdown signal sets `signalled`, then
 * closes the database and exits.
 *
 * `me` is the child's index in the shared `children` array.
 */
static pid_t
serverproc(int me)
{
    fd_set rdset, wrset;
    int i, ret, count, lastfd, lastnumreaders;
    socklen_t salen;
    struct sockaddr_in sa;
    struct timeval tv;
    pid_t pid;

    pid = fork();
    if (pid != 0)
        return pid;

    /* --- everything below runs in the child only --- */

    if (!ovdb_open(OV_READ|OVDB_SERVER))
        die("cannot open overview");
    xsignal_norestart(SIGINT, sigfunc);
    xsignal_norestart(SIGTERM, sigfunc);
    xsignal_norestart(SIGHUP, sigfunc);
    /* SIGUSR1 from the parent means "you are the listener now" */
    xsignal_norestart(SIGUSR1, childsig);
    xsignal(SIGPIPE, SIG_IGN);

    numreaders = lastnumreaders = 0;
    if(ovdb_conf.maxrsconn) {
        readertablen = ovdb_conf.maxrsconn;
    } else {
        readertablen = 50;
    }
    readertab = xmalloc(readertablen * sizeof(struct reader));
    for(i = 0; i < readertablen; i++) {
        readertab[i].mode = MODE_CLOSED;
        readertab[i].buf = NULL;
    }

    setproctitle("0 clients");

    /* main loop */
    while(!signalled) {
        FD_ZERO(&rdset);
        FD_ZERO(&wrset);
        lastfd = 0;
        /* Only the child named in wholistens watches the listening socket.
           If it is already full it hands the role back (-1) and asks the
           parent to pick somebody else. */
        if(wholistens == me) {
            if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
                FD_SET(listensock, &rdset);
                lastfd = listensock;
                setproctitle("%d client%s *", numreaders,
                             numreaders == 1 ? "" : "s");
            } else {
                wholistens = -1;
                kill(parent, SIGUSR1);
            }
        }

        /* watch each client for the direction it is waiting on */
        for(i = 0; i < numreaders; i++) {
            switch(readertab[i].mode) {
            case MODE_READ:
                FD_SET(readertab[i].fd, &rdset);
                break;
            case MODE_WRITE:
                FD_SET(readertab[i].fd, &wrset);
                break;
            default:
                continue;
            }
            if(readertab[i].fd > lastfd)
                lastfd = readertab[i].fd;
        }
        tv.tv_usec = 0;
        tv.tv_sec = SELECT_TIMEOUT;
        count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);

        if(signalled)
            break;
        /* NOTE(review): a timeout (count == 0) also takes this branch, so
           `now` is not refreshed and the idle-client sweep further down
           only runs after some descriptor became ready. */
        if(count <= 0)
            continue;

        now = time(NULL);

        if(FD_ISSET(listensock, &rdset)) {
            if(!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
                salen = sizeof(sa);
                ret = accept(listensock, (struct sockaddr *)&sa, &salen);
                if(ret >= 0) {
                    newclient(ret);
                    /* one connection per turn: give up the listener role,
                       publish the new client count and let the parent
                       choose the next listener */
                    wholistens = -1;
                    children[me].num = numreaders;
                    kill(parent, SIGUSR1);
                }
            }
        }

        /* service every client whose descriptor is ready */
        for(i = 0; i < numreaders; i++) {
            switch(readertab[i].mode) {
            case MODE_READ:
                if(FD_ISSET(readertab[i].fd, &rdset))
                    handle_read(&(readertab[i]));
                break;
            case MODE_WRITE:
                if(FD_ISSET(readertab[i].fd, &wrset))
                    handle_write(&(readertab[i]));
                break;
            }
        }

        /* remove closed and timed-out clients; delclient() shifts the
           following entries down, so the same index is examined again */
        for(i = 0; i < numreaders; i++) {
            if(readertab[i].mode == MODE_CLOSED
               || (time_t) (readertab[i].lastactive + CLIENT_TIMEOUT) < now) {
                delclient(i);
                numreaders--;
                i--;
            }
        }
        /* tell the parent when the published client count changed */
        if(children[me].num != numreaders) {
            children[me].num = numreaders;
            kill(parent, SIGUSR1);
        }
        if(numreaders != lastnumreaders) {
            lastnumreaders = numreaders;
            setproctitle("%d client%s", numreaders,
                         numreaders == 1 ? "" : "s");
        }
    }

    ovdb_close();
    exit(0);
}
564
565 static int
reap(void)566 reap(void)
567 {
568 int i, cs;
569 pid_t c;
570
571 while((c = waitpid(-1, &cs, WNOHANG)) > 0) {
572 for(i = 0; i < ovdb_conf.numrsprocs; i++) {
573 if(c == children[i].pid) {
574 if(children[i].started + 30 > time(NULL))
575 return 1;
576
577 children[i].num = 0;
578
579 if(wholistens == i)
580 wholistens = -1;
581
582 if((children[i].pid = serverproc(i)) == -1)
583 return 1;
584
585 children[i].started = time(NULL);
586 break;
587 }
588 }
589 }
590 if(wholistens == -1)
591 parentsig(SIGUSR1);
592 return 0;
593 }
594
/* Some systems only provide the MAP_ANONYMOUS spelling. */
#if !defined(MAP_ANON) && defined(MAP_ANONYMOUS)
# define MAP_ANON MAP_ANONYMOUS
#endif

/*
 * Allocate `len` bytes of readable and writable memory with MAP_SHARED, so
 * that processes forked afterwards operate on the same pages.  Anonymous
 * mappings are used when available, a mapping of /dev/zero otherwise.
 *
 * Returns a pointer to the mapping, or NULL if it could not be created.
 * mmap() signals failure with MAP_FAILED rather than NULL; that result is
 * translated here so that callers can simply test for NULL.
 */
static void *
sharemem(size_t len)
{
    void *ptr;
#ifdef MAP_ANON
    ptr = mmap(0, len, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
#else
    int fd = open("/dev/zero", O_RDWR, 0);

    if (fd < 0)
        return NULL;
    ptr = mmap(0, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    /* the mapping stays valid after the descriptor is closed */
    close(fd);
#endif
    if (ptr == MAP_FAILED)
        return NULL;
    return ptr;
}
613
614 int
main(int argc,char * argv[])615 main(int argc, char *argv[])
616 {
617 int i, ret;
618 socklen_t salen;
619 char *path, *pidfile;
620 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
621 struct sockaddr_un sa;
622 #else
623 struct sockaddr_in sa;
624 #endif
625 struct timeval tv;
626 fd_set rdset;
627
628 setproctitle_init(argc, argv);
629
630 openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
631 message_program_name = "ovdb_server";
632
633 if(argc != 2 || strcmp(argv[1], SPACES))
634 die("should be started by ovdb_init");
635 message_handlers_warn(1, message_log_syslog_err);
636 message_handlers_die(1, message_log_syslog_err);
637
638 if (!innconf_read(NULL))
639 exit(1);
640
641 if(strcmp(innconf->ovmethod, "ovdb"))
642 die("ovmethod not set to ovdb in inn.conf");
643
644 read_ovdb_conf();
645
646 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
647 listensock = socket(AF_UNIX, SOCK_STREAM, 0);
648 #else
649 listensock = socket(AF_INET, SOCK_STREAM, 0);
650 #endif
651 if(listensock < 0)
652 sysdie("cannot create socket");
653
654 fdflag_nonblocking(listensock, 1);
655
656 memset(&sa, 0, sizeof sa);
657 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
658 sa.sun_family = AF_UNIX;
659 path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
660 strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
661 unlink(sa.sun_path);
662 free(path);
663 ret = bind(listensock, (struct sockaddr *)&sa, SUN_LEN(&sa));
664 #else
665 sa.sin_family = AF_INET;
666 sa.sin_port = htons(OVDB_SERVER_PORT);
667 sa.sin_addr.s_addr = htonl(0x7f000001UL);
668
669 ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
670
671 if(ret != 0 && errno == EADDRNOTAVAIL) {
672 sa.sin_family = AF_INET;
673 sa.sin_port = htons(OVDB_SERVER_PORT);
674 sa.sin_addr.s_addr = INADDR_ANY;
675 ret = bind(listensock, (struct sockaddr *)&sa, sizeof sa);
676 }
677 #endif
678
679 if(ret != 0)
680 sysdie("cannot bind socket");
681 if(listen(listensock, MAXLISTEN) < 0)
682 sysdie("cannot listen on socket");
683
684 pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
685 if(putpid(pidfile))
686 exit(1);
687
688 xsignal_norestart(SIGINT, sigfunc);
689 xsignal_norestart(SIGTERM, sigfunc);
690 xsignal_norestart(SIGHUP, sigfunc);
691
692 xsignal_norestart(SIGUSR1, parentsig);
693 xsignal_norestart(SIGCHLD, childsig);
694 parent = getpid();
695
696 children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs+1));
697
698 if(children == NULL)
699 sysdie("cannot mmap shared memory");
700 for(i = 0; i < ovdb_conf.numrsprocs+1; i++) {
701 children[i].pid = -1;
702 children[i].num = 0;
703 }
704
705 for(i = 0; i < ovdb_conf.numrsprocs; i++) {
706 if((children[i].pid = serverproc(i)) == -1) {
707 for(i--; i >= 0; i--)
708 kill(children[i].pid, SIGTERM);
709 exit(1);
710 }
711 children[i].started = time(NULL);
712 sleep(1);
713 }
714
715 while(!signalled) {
716 if(reap())
717 break;
718
719 if(wholistens == -2) {
720 FD_ZERO(&rdset);
721 FD_SET(listensock, &rdset);
722 tv.tv_usec = 0;
723 tv.tv_sec = SELECT_TIMEOUT;
724 ret = select(listensock+1, &rdset, NULL, NULL, &tv);
725
726 if(ret == 1 && wholistens == -2) {
727 salen = sizeof(sa);
728 ret = accept(listensock, (struct sockaddr *)&sa, &salen);
729 if(ret >= 0)
730 close(ret);
731 }
732 } else {
733 pause();
734 }
735 }
736
737 for(i = 0; i < ovdb_conf.numrsprocs; i++)
738 if(children[i].pid != -1)
739 kill(children[i].pid, SIGTERM);
740
741 while(wait(&ret) > 0)
742 ;
743
744 unlink(pidfile);
745
746 exit(0);
747 }
748
749
750 #endif /* HAVE_BDB */
751