1 /*
2 * ovdb_server.c
3 * ovdb read server
4 */
5
6 #include "portable/system.h"
7
8 #include "portable/mmap.h"
9 #include "portable/setproctitle.h"
10 #include "portable/socket.h"
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <signal.h>
14 #ifdef HAVE_SYS_SELECT_H
15 # include <sys/select.h>
16 #endif
17 #include <syslog.h>
18
19 #ifdef HAVE_SYS_TIME_H
20 # include <sys/time.h>
21 #endif
22 #include <time.h>
23
24 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
25 # include "portable/socket-unix.h"
26 #endif
27 #include <sys/wait.h>
28
29 #include "inn/fdflag.h"
30 #include "inn/innconf.h"
31 #include "inn/libinn.h"
32 #include "inn/messages.h"
33 #include "inn/ov.h"
34 #include "inn/paths.h"
35 #include "inn/storage.h"
36
37 #include "../storage/ovdb/ovdb-private.h"
38 #include "../storage/ovdb/ovdb.h"
39
40 #ifndef HAVE_BDB
41
42 int
main(int argc UNUSED,char ** argv UNUSED)43 main(int argc UNUSED, char **argv UNUSED)
44 {
45 die("Berkeley DB support not compiled");
46 }
47
48 #else /* HAVE_BDB */
49
50
51 # define SELECT_TIMEOUT 15
52
53
54 /* This will work unless user sets a larger clienttimeout
55 in readers.conf */
56 # define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
57 /*#define CLIENT_TIMEOUT 3600*/
58
59
60 static int listensock;
61
62 # define MODE_READ 0
63 # define MODE_WRITE 1
64 # define MODE_CLOSED 2
65 # define STATE_READCMD 0
66 # define STATE_READGROUP 1
67 struct reader {
68 int fd;
69 int mode;
70 int state;
71 int buflen;
72 int bufpos;
73 void *buf;
74 time_t lastactive;
75 void *currentsearch;
76 };
77
78 static struct reader *readertab;
79 static int readertablen;
80 static int numreaders;
81 static time_t now;
82 static pid_t parent;
83
84 struct child {
85 pid_t pid;
86 int num;
87 time_t started;
88 };
89 static struct child *children;
90 # define wholistens (children[ovdb_conf.numrsprocs].num)
91
92 static int signalled = 0;
93 static void
sigfunc(int sig UNUSED)94 sigfunc(int sig UNUSED)
95 {
96 signalled = 1;
97 }
98
99 static int updated = 0;
100 static void
childsig(int sig UNUSED)101 childsig(int sig UNUSED)
102 {
103 updated = 1;
104 }
105
106 static void
parentsig(int sig UNUSED)107 parentsig(int sig UNUSED)
108 {
109 int i, which, smallest;
110 if (wholistens < 0) {
111 which = smallest = -1;
112 for (i = 0; i < ovdb_conf.numrsprocs; i++) {
113 if (children[i].pid == -1)
114 continue;
115 if (!ovdb_conf.maxrsconn
116 || children[i].num <= ovdb_conf.maxrsconn) {
117 if (smallest == -1 || children[i].num < smallest) {
118 smallest = children[i].num;
119 which = i;
120 }
121 }
122 }
123 if (which != -1) {
124 wholistens = which;
125 kill(children[which].pid, SIGUSR1);
126 } else {
127 wholistens = -2;
128 }
129 updated = 1;
130 }
131 }
132
133 static int
putpid(const char * path)134 putpid(const char *path)
135 {
136 char buf[30];
137 int fd = open(path, O_WRONLY | O_TRUNC | O_CREAT, 0664);
138 if (fd == -1) {
139 syswarn("cannot open %s", path);
140 return -1;
141 }
142 snprintf(buf, sizeof(buf), "%ld\n", (long) getpid());
143 if (write(fd, buf, strlen(buf)) < 0) {
144 syswarn("cannot write to %s", path);
145 close(fd);
146 return -1;
147 }
148 close(fd);
149 return 0;
150 }
151
152 static void
do_groupstats(struct reader * r)153 do_groupstats(struct reader *r)
154 {
155 struct rs_groupstats *reply;
156 char *group = (char *) (r->buf) + sizeof(struct rs_cmd);
157 reply = xmalloc(sizeof(struct rs_groupstats));
158
159 debug("OVDB: rs: do_groupstats '%s'", group);
160 if (ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count,
161 &reply->flag)) {
162 reply->status = CMD_GROUPSTATS;
163 reply->aliaslen = 0;
164 } else {
165 reply->status = CMD_GROUPSTATS | RPLY_ERROR;
166 }
167 free(r->buf);
168 r->buf = reply;
169 r->buflen = sizeof(struct rs_groupstats);
170 r->bufpos = 0;
171 r->mode = MODE_WRITE;
172 }
173
174 static void
do_opensrch(struct reader * r)175 do_opensrch(struct reader *r)
176 {
177 struct rs_cmd *cmd = r->buf;
178 struct rs_opensrch *reply;
179 char *group = (char *) (r->buf) + sizeof(struct rs_cmd);
180 reply = xmalloc(sizeof(struct rs_opensrch));
181
182 debug("OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);
183
184 if (r->currentsearch != NULL) {
185 /* can only open one search at a time */
186 reply->status = CMD_OPENSRCH | RPLY_ERROR;
187 } else {
188 reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
189 if (reply->handle == NULL) {
190 reply->status = CMD_OPENSRCH | RPLY_ERROR;
191 } else {
192 reply->status = CMD_OPENSRCH;
193 }
194 r->currentsearch = reply->handle;
195 }
196 free(r->buf);
197 r->buf = reply;
198 r->buflen = sizeof(struct rs_opensrch);
199 r->bufpos = 0;
200 r->mode = MODE_WRITE;
201 }
202
203 static void
do_srch(struct reader * r)204 do_srch(struct reader *r)
205 {
206 struct rs_cmd *cmd = r->buf;
207 struct rs_srch *reply;
208 ARTNUM artnum;
209 TOKEN token;
210 time_t arrived;
211 int len;
212 char *data;
213
214 if (ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
215 reply = xmalloc(sizeof(struct rs_srch) + len);
216 reply->status = CMD_SRCH;
217 reply->artnum = artnum;
218 reply->token = token;
219 reply->arrived = arrived;
220 reply->len = len;
221 memcpy((char *) reply + sizeof(struct rs_srch), data, len);
222 r->buflen = sizeof(struct rs_srch) + len;
223 } else {
224 reply = xmalloc(sizeof(struct rs_srch));
225 reply->status = CMD_SRCH | RPLY_ERROR;
226 r->buflen = sizeof(struct rs_srch);
227 }
228 free(r->buf);
229 r->buf = reply;
230 r->bufpos = 0;
231 r->mode = MODE_WRITE;
232 }
233
234 static void
do_closesrch(struct reader * r)235 do_closesrch(struct reader *r)
236 {
237 struct rs_cmd *cmd = r->buf;
238
239 ovdb_closesearch(cmd->handle);
240 free(r->buf);
241 r->buf = NULL;
242 r->bufpos = r->buflen = 0;
243 r->mode = MODE_READ;
244 r->currentsearch = NULL;
245 }
246
247 static void
do_artinfo(struct reader * r)248 do_artinfo(struct reader *r)
249 {
250 struct rs_cmd *cmd = r->buf;
251 struct rs_artinfo *reply;
252 char *group = (char *) (r->buf) + sizeof(struct rs_cmd);
253 TOKEN token;
254
255 debug("OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);
256 if (ovdb_getartinfo(group, cmd->artlo, &token)) {
257 reply = xmalloc(sizeof(struct rs_artinfo));
258 reply->status = CMD_ARTINFO;
259 reply->token = token;
260 r->buflen = sizeof(struct rs_artinfo);
261 } else {
262 reply = xmalloc(sizeof(struct rs_artinfo));
263 reply->status = CMD_ARTINFO | RPLY_ERROR;
264 r->buflen = sizeof(struct rs_artinfo);
265 }
266 free(r->buf);
267 r->buf = reply;
268 r->bufpos = 0;
269 r->mode = MODE_WRITE;
270 }
271
272
273 static int
process_cmd(struct reader * r)274 process_cmd(struct reader *r)
275 {
276 struct rs_cmd *cmd = r->buf;
277
278 if (r->state == STATE_READCMD) {
279 switch (cmd->what) {
280 case CMD_GROUPSTATS:
281 case CMD_OPENSRCH:
282 case CMD_ARTINFO:
283 r->state = STATE_READGROUP;
284 if (cmd->grouplen == 0) {
285 /* shouldn't happen... */
286 r->mode = MODE_CLOSED;
287 close(r->fd);
288 free(r->buf);
289 r->buf = NULL;
290 return 0;
291 }
292 r->buflen += cmd->grouplen;
293 r->buf = xrealloc(r->buf, r->buflen);
294 return 1;
295 }
296 }
297
298 switch (cmd->what) {
299 case CMD_GROUPSTATS:
300 ((char *) r->buf)[r->buflen - 1] =
301 0; /* make sure group is null-terminated */
302 do_groupstats(r);
303 break;
304 case CMD_OPENSRCH:
305 ((char *) r->buf)[r->buflen - 1] = 0;
306 do_opensrch(r);
307 break;
308 case CMD_SRCH:
309 do_srch(r);
310 break;
311 case CMD_CLOSESRCH:
312 do_closesrch(r);
313 break;
314 case CMD_ARTINFO:
315 ((char *) r->buf)[r->buflen - 1] = 0;
316 do_artinfo(r);
317 break;
318 default:
319 r->mode = MODE_CLOSED;
320 close(r->fd);
321 free(r->buf);
322 r->buf = NULL;
323 break;
324 }
325
326 return 0;
327 }
328
329 static void
handle_read(struct reader * r)330 handle_read(struct reader *r)
331 {
332 int n;
333 r->lastactive = now;
334
335 if (r->buf == NULL) {
336 r->state = STATE_READCMD;
337 r->buf = xmalloc(sizeof(struct rs_cmd));
338 r->buflen = sizeof(struct rs_cmd);
339 r->bufpos = 0;
340 }
341 again:
342 n = read(r->fd, (char *) (r->buf) + r->bufpos, r->buflen - r->bufpos);
343 if (n <= 0) {
344 if (n < 0
345 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
346 return;
347 r->mode = MODE_CLOSED;
348 close(r->fd);
349 free(r->buf);
350 r->buf = NULL;
351 }
352 r->bufpos += n;
353
354 if (r->bufpos >= r->buflen)
355 if (process_cmd(r))
356 goto again;
357 }
358
359 static void
handle_write(struct reader * r)360 handle_write(struct reader *r)
361 {
362 int n;
363 r->lastactive = now;
364
365 if (r->buf == NULL) /* shouldn't happen */
366 return;
367
368 n = write(r->fd, (char *) (r->buf) + r->bufpos, r->buflen - r->bufpos);
369 if (n <= 0) {
370 if (n < 0
371 && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
372 return;
373 r->mode = MODE_CLOSED;
374 close(r->fd);
375 free(r->buf);
376 r->buf = NULL;
377 }
378 r->bufpos += n;
379
380 if (r->bufpos >= r->buflen) {
381 free(r->buf);
382 r->buf = NULL;
383 r->bufpos = r->buflen = 0;
384 r->mode = MODE_READ;
385 }
386 }
387
388 static void
newclient(int fd)389 newclient(int fd)
390 {
391 struct reader *r;
392 int i;
393
394 fdflag_nonblocking(fd, 1);
395
396 if (numreaders >= readertablen) {
397 readertablen += 50;
398 readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
399 for (i = numreaders; i < readertablen; i++) {
400 readertab[i].mode = MODE_CLOSED;
401 readertab[i].buf = NULL;
402 }
403 }
404
405 r = &(readertab[numreaders]);
406 numreaders++;
407
408 r->fd = fd;
409 r->mode = MODE_WRITE;
410 r->buflen = sizeof(OVDB_SERVER_BANNER);
411 r->bufpos = 0;
412 r->buf = xstrdup(OVDB_SERVER_BANNER);
413 r->lastactive = now;
414 r->currentsearch = NULL;
415
416 handle_write(r);
417 }
418
419 static void
delclient(int which)420 delclient(int which)
421 {
422 int i;
423 struct reader *r = &(readertab[which]);
424
425 if (r->mode != MODE_CLOSED)
426 close(r->fd);
427
428 if (r->buf != NULL) {
429 free(r->buf);
430 }
431 if (r->currentsearch != NULL) {
432 ovdb_closesearch(r->currentsearch);
433 r->currentsearch = NULL;
434 }
435
436 /* numreaders will get decremented by the calling function */
437 for (i = which; i < numreaders - 1; i++)
438 readertab[i] = readertab[i + 1];
439
440 readertab[i].mode = MODE_CLOSED;
441 readertab[i].buf = NULL;
442 }
443
444 static pid_t
serverproc(int me)445 serverproc(int me)
446 {
447 fd_set rdset, wrset;
448 int i, ret, count, lastfd, lastnumreaders;
449 socklen_t salen;
450 struct sockaddr_in sa;
451 struct timeval tv;
452 pid_t pid;
453
454 pid = fork();
455 if (pid != 0)
456 return pid;
457
458 if (!ovdb_open(OV_READ | OVDB_SERVER))
459 die("cannot open overview");
460 xsignal_norestart(SIGINT, sigfunc);
461 xsignal_norestart(SIGTERM, sigfunc);
462 xsignal_norestart(SIGHUP, sigfunc);
463 xsignal_norestart(SIGUSR1, childsig);
464 xsignal(SIGPIPE, SIG_IGN);
465
466 numreaders = lastnumreaders = 0;
467 if (ovdb_conf.maxrsconn) {
468 readertablen = ovdb_conf.maxrsconn;
469 } else {
470 readertablen = 50;
471 }
472 readertab = xmalloc(readertablen * sizeof(struct reader));
473 for (i = 0; i < readertablen; i++) {
474 readertab[i].mode = MODE_CLOSED;
475 readertab[i].buf = NULL;
476 }
477
478 setproctitle("0 clients");
479
480 /* main loop */
481 while (!signalled) {
482 FD_ZERO(&rdset);
483 FD_ZERO(&wrset);
484 lastfd = 0;
485 if (wholistens == me) {
486 if (!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
487 FD_SET(listensock, &rdset);
488 lastfd = listensock;
489 setproctitle("%d client%s *", numreaders,
490 numreaders == 1 ? "" : "s");
491 } else {
492 wholistens = -1;
493 kill(parent, SIGUSR1);
494 }
495 }
496
497 for (i = 0; i < numreaders; i++) {
498 switch (readertab[i].mode) {
499 case MODE_READ:
500 FD_SET(readertab[i].fd, &rdset);
501 break;
502 case MODE_WRITE:
503 FD_SET(readertab[i].fd, &wrset);
504 break;
505 default:
506 continue;
507 }
508 if (readertab[i].fd > lastfd)
509 lastfd = readertab[i].fd;
510 }
511 tv.tv_usec = 0;
512 tv.tv_sec = SELECT_TIMEOUT;
513 count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);
514
515 if (signalled)
516 break;
517 if (count <= 0)
518 continue;
519
520 now = time(NULL);
521
522 if (FD_ISSET(listensock, &rdset)) {
523 if (!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
524 salen = sizeof(sa);
525 ret = accept(listensock, (struct sockaddr *) &sa, &salen);
526 if (ret >= 0) {
527 newclient(ret);
528 wholistens = -1;
529 children[me].num = numreaders;
530 kill(parent, SIGUSR1);
531 }
532 }
533 }
534
535 for (i = 0; i < numreaders; i++) {
536 switch (readertab[i].mode) {
537 case MODE_READ:
538 if (FD_ISSET(readertab[i].fd, &rdset))
539 handle_read(&(readertab[i]));
540 break;
541 case MODE_WRITE:
542 if (FD_ISSET(readertab[i].fd, &wrset))
543 handle_write(&(readertab[i]));
544 break;
545 }
546 }
547
548 for (i = 0; i < numreaders; i++) {
549 if (readertab[i].mode == MODE_CLOSED
550 || (time_t)(readertab[i].lastactive + CLIENT_TIMEOUT) < now) {
551 delclient(i);
552 numreaders--;
553 i--;
554 }
555 }
556 if (children[me].num != numreaders) {
557 children[me].num = numreaders;
558 kill(parent, SIGUSR1);
559 }
560 if (numreaders != lastnumreaders) {
561 lastnumreaders = numreaders;
562 setproctitle("%d client%s", numreaders,
563 numreaders == 1 ? "" : "s");
564 }
565 }
566
567 ovdb_close();
568 exit(0);
569 }
570
571 static int
reap(void)572 reap(void)
573 {
574 int i, cs;
575 pid_t c;
576
577 while ((c = waitpid(-1, &cs, WNOHANG)) > 0) {
578 for (i = 0; i < ovdb_conf.numrsprocs; i++) {
579 if (c == children[i].pid) {
580 if (children[i].started + 30 > time(NULL))
581 return 1;
582
583 children[i].num = 0;
584
585 if (wholistens == i)
586 wholistens = -1;
587
588 if ((children[i].pid = serverproc(i)) == -1)
589 return 1;
590
591 children[i].started = time(NULL);
592 break;
593 }
594 }
595 }
596 if (wholistens == -1)
597 parentsig(SIGUSR1);
598 return 0;
599 }
600
601 # ifndef MAP_ANON
602 # ifdef MAP_ANONYMOUS
603 # define MAP_ANON MAP_ANONYMOUS
604 # endif
605 # endif
606
607 static void *
sharemem(size_t len)608 sharemem(size_t len)
609 {
610 # ifdef MAP_ANON
611 return mmap(0, len, PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
612 # else
613 int fd = open("/dev/zero", O_RDWR, 0);
614 char *ptr = mmap(0, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
615 close(fd);
616 return ptr;
617 # endif
618 }
619
620 int
main(int argc,char * argv[])621 main(int argc, char *argv[])
622 {
623 int i, ret;
624 socklen_t salen;
625 char *path, *pidfile;
626 # ifdef HAVE_UNIX_DOMAIN_SOCKETS
627 struct sockaddr_un sa;
628 # else
629 struct sockaddr_in sa;
630 # endif
631 struct timeval tv;
632 fd_set rdset;
633
634 setproctitle_init(argc, argv);
635
636 openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
637 message_program_name = "ovdb_server";
638
639 if (argc != 2 || strcmp(argv[1], SPACES))
640 die("should be started by ovdb_init");
641 message_handlers_warn(1, message_log_syslog_err);
642 message_handlers_die(1, message_log_syslog_err);
643
644 if (!innconf_read(NULL))
645 exit(1);
646
647 if (strcmp(innconf->ovmethod, "ovdb"))
648 die("ovmethod not set to ovdb in inn.conf");
649
650 read_ovdb_conf();
651
652 # ifdef HAVE_UNIX_DOMAIN_SOCKETS
653 listensock = socket(AF_UNIX, SOCK_STREAM, 0);
654 # else
655 listensock = socket(AF_INET, SOCK_STREAM, 0);
656 # endif
657 if (listensock < 0)
658 sysdie("cannot create socket");
659
660 fdflag_nonblocking(listensock, 1);
661
662 memset(&sa, 0, sizeof sa);
663 # ifdef HAVE_UNIX_DOMAIN_SOCKETS
664 sa.sun_family = AF_UNIX;
665 path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
666 strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
667 unlink(sa.sun_path);
668 free(path);
669 ret = bind(listensock, (struct sockaddr *) &sa, SUN_LEN(&sa));
670 # else
671 sa.sin_family = AF_INET;
672 sa.sin_port = htons(OVDB_SERVER_PORT);
673 sa.sin_addr.s_addr = htonl(0x7f000001UL);
674
675 ret = bind(listensock, (struct sockaddr *) &sa, sizeof sa);
676
677 if (ret != 0 && errno == EADDRNOTAVAIL) {
678 sa.sin_family = AF_INET;
679 sa.sin_port = htons(OVDB_SERVER_PORT);
680 sa.sin_addr.s_addr = INADDR_ANY;
681 ret = bind(listensock, (struct sockaddr *) &sa, sizeof sa);
682 }
683 # endif
684
685 if (ret != 0)
686 sysdie("cannot bind socket");
687 if (listen(listensock, innconf->maxlisten) < 0)
688 sysdie("cannot listen on socket");
689
690 pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
691 if (putpid(pidfile))
692 exit(1);
693
694 xsignal_norestart(SIGINT, sigfunc);
695 xsignal_norestart(SIGTERM, sigfunc);
696 xsignal_norestart(SIGHUP, sigfunc);
697
698 xsignal_norestart(SIGUSR1, parentsig);
699 xsignal_norestart(SIGCHLD, childsig);
700 parent = getpid();
701
702 children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs + 1));
703
704 if (children == NULL)
705 sysdie("cannot mmap shared memory");
706 for (i = 0; i < ovdb_conf.numrsprocs + 1; i++) {
707 children[i].pid = -1;
708 children[i].num = 0;
709 }
710
711 for (i = 0; i < ovdb_conf.numrsprocs; i++) {
712 if ((children[i].pid = serverproc(i)) == -1) {
713 for (i--; i >= 0; i--)
714 kill(children[i].pid, SIGTERM);
715 exit(1);
716 }
717 children[i].started = time(NULL);
718 sleep(1);
719 }
720
721 while (!signalled) {
722 if (reap())
723 break;
724
725 if (wholistens == -2) {
726 FD_ZERO(&rdset);
727 FD_SET(listensock, &rdset);
728 tv.tv_usec = 0;
729 tv.tv_sec = SELECT_TIMEOUT;
730 ret = select(listensock + 1, &rdset, NULL, NULL, &tv);
731
732 if (ret == 1 && wholistens == -2) {
733 salen = sizeof(sa);
734 ret = accept(listensock, (struct sockaddr *) &sa, &salen);
735 if (ret >= 0)
736 close(ret);
737 }
738 } else {
739 pause();
740 }
741 }
742
743 for (i = 0; i < ovdb_conf.numrsprocs; i++)
744 if (children[i].pid != -1)
745 kill(children[i].pid, SIGTERM);
746
747 while (wait(&ret) > 0)
748 ;
749
750 unlink(pidfile);
751
752 exit(0);
753 }
754
755
756 #endif /* HAVE_BDB */
757