1 /*
2  * ovdb_server.c
3  * ovdb read server
4  */
5 
6 #include "portable/system.h"
7 
8 #include "portable/mmap.h"
9 #include "portable/setproctitle.h"
10 #include "portable/socket.h"
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <signal.h>
14 #ifdef HAVE_SYS_SELECT_H
15 #    include <sys/select.h>
16 #endif
17 #include <syslog.h>
18 
19 #ifdef HAVE_SYS_TIME_H
20 #    include <sys/time.h>
21 #endif
22 #include <time.h>
23 
24 #ifdef HAVE_UNIX_DOMAIN_SOCKETS
25 #    include "portable/socket-unix.h"
26 #endif
27 #include <sys/wait.h>
28 
29 #include "inn/fdflag.h"
30 #include "inn/innconf.h"
31 #include "inn/libinn.h"
32 #include "inn/messages.h"
33 #include "inn/ov.h"
34 #include "inn/paths.h"
35 #include "inn/storage.h"
36 
37 #include "../storage/ovdb/ovdb-private.h"
38 #include "../storage/ovdb/ovdb.h"
39 
40 #ifndef HAVE_BDB
41 
42 int
main(int argc UNUSED,char ** argv UNUSED)43 main(int argc UNUSED, char **argv UNUSED)
44 {
45     die("Berkeley DB support not compiled");
46 }
47 
48 #else /* HAVE_BDB */
49 
50 
51 #    define SELECT_TIMEOUT 15
52 
53 
54 /* This will work unless user sets a larger clienttimeout
55    in readers.conf */
56 #    define CLIENT_TIMEOUT (innconf->clienttimeout + 60)
57 /*#define CLIENT_TIMEOUT 3600*/
58 
59 
60 static int listensock;
61 
62 #    define MODE_READ       0
63 #    define MODE_WRITE      1
64 #    define MODE_CLOSED     2
65 #    define STATE_READCMD   0
66 #    define STATE_READGROUP 1
67 struct reader {
68     int fd;
69     int mode;
70     int state;
71     int buflen;
72     int bufpos;
73     void *buf;
74     time_t lastactive;
75     void *currentsearch;
76 };
77 
78 static struct reader *readertab;
79 static int readertablen;
80 static int numreaders;
81 static time_t now;
82 static pid_t parent;
83 
84 struct child {
85     pid_t pid;
86     int num;
87     time_t started;
88 };
89 static struct child *children;
90 #    define wholistens      (children[ovdb_conf.numrsprocs].num)
91 
92 static int signalled = 0;
93 static void
sigfunc(int sig UNUSED)94 sigfunc(int sig UNUSED)
95 {
96     signalled = 1;
97 }
98 
99 static int updated = 0;
100 static void
childsig(int sig UNUSED)101 childsig(int sig UNUSED)
102 {
103     updated = 1;
104 }
105 
106 static void
parentsig(int sig UNUSED)107 parentsig(int sig UNUSED)
108 {
109     int i, which, smallest;
110     if (wholistens < 0) {
111         which = smallest = -1;
112         for (i = 0; i < ovdb_conf.numrsprocs; i++) {
113             if (children[i].pid == -1)
114                 continue;
115             if (!ovdb_conf.maxrsconn
116                 || children[i].num <= ovdb_conf.maxrsconn) {
117                 if (smallest == -1 || children[i].num < smallest) {
118                     smallest = children[i].num;
119                     which = i;
120                 }
121             }
122         }
123         if (which != -1) {
124             wholistens = which;
125             kill(children[which].pid, SIGUSR1);
126         } else {
127             wholistens = -2;
128         }
129         updated = 1;
130     }
131 }
132 
133 static int
putpid(const char * path)134 putpid(const char *path)
135 {
136     char buf[30];
137     int fd = open(path, O_WRONLY | O_TRUNC | O_CREAT, 0664);
138     if (fd == -1) {
139         syswarn("cannot open %s", path);
140         return -1;
141     }
142     snprintf(buf, sizeof(buf), "%ld\n", (long) getpid());
143     if (write(fd, buf, strlen(buf)) < 0) {
144         syswarn("cannot write to %s", path);
145         close(fd);
146         return -1;
147     }
148     close(fd);
149     return 0;
150 }
151 
152 static void
do_groupstats(struct reader * r)153 do_groupstats(struct reader *r)
154 {
155     struct rs_groupstats *reply;
156     char *group = (char *) (r->buf) + sizeof(struct rs_cmd);
157     reply = xmalloc(sizeof(struct rs_groupstats));
158 
159     debug("OVDB: rs: do_groupstats '%s'", group);
160     if (ovdb_groupstats(group, &reply->lo, &reply->hi, &reply->count,
161                         &reply->flag)) {
162         reply->status = CMD_GROUPSTATS;
163         reply->aliaslen = 0;
164     } else {
165         reply->status = CMD_GROUPSTATS | RPLY_ERROR;
166     }
167     free(r->buf);
168     r->buf = reply;
169     r->buflen = sizeof(struct rs_groupstats);
170     r->bufpos = 0;
171     r->mode = MODE_WRITE;
172 }
173 
174 static void
do_opensrch(struct reader * r)175 do_opensrch(struct reader *r)
176 {
177     struct rs_cmd *cmd = r->buf;
178     struct rs_opensrch *reply;
179     char *group = (char *) (r->buf) + sizeof(struct rs_cmd);
180     reply = xmalloc(sizeof(struct rs_opensrch));
181 
182     debug("OVDB: rs: do_opensrch '%s' %d %d", group, cmd->artlo, cmd->arthi);
183 
184     if (r->currentsearch != NULL) {
185         /* can only open one search at a time */
186         reply->status = CMD_OPENSRCH | RPLY_ERROR;
187     } else {
188         reply->handle = ovdb_opensearch(group, cmd->artlo, cmd->arthi);
189         if (reply->handle == NULL) {
190             reply->status = CMD_OPENSRCH | RPLY_ERROR;
191         } else {
192             reply->status = CMD_OPENSRCH;
193         }
194         r->currentsearch = reply->handle;
195     }
196     free(r->buf);
197     r->buf = reply;
198     r->buflen = sizeof(struct rs_opensrch);
199     r->bufpos = 0;
200     r->mode = MODE_WRITE;
201 }
202 
203 static void
do_srch(struct reader * r)204 do_srch(struct reader *r)
205 {
206     struct rs_cmd *cmd = r->buf;
207     struct rs_srch *reply;
208     ARTNUM artnum;
209     TOKEN token;
210     time_t arrived;
211     int len;
212     char *data;
213 
214     if (ovdb_search(cmd->handle, &artnum, &data, &len, &token, &arrived)) {
215         reply = xmalloc(sizeof(struct rs_srch) + len);
216         reply->status = CMD_SRCH;
217         reply->artnum = artnum;
218         reply->token = token;
219         reply->arrived = arrived;
220         reply->len = len;
221         memcpy((char *) reply + sizeof(struct rs_srch), data, len);
222         r->buflen = sizeof(struct rs_srch) + len;
223     } else {
224         reply = xmalloc(sizeof(struct rs_srch));
225         reply->status = CMD_SRCH | RPLY_ERROR;
226         r->buflen = sizeof(struct rs_srch);
227     }
228     free(r->buf);
229     r->buf = reply;
230     r->bufpos = 0;
231     r->mode = MODE_WRITE;
232 }
233 
234 static void
do_closesrch(struct reader * r)235 do_closesrch(struct reader *r)
236 {
237     struct rs_cmd *cmd = r->buf;
238 
239     ovdb_closesearch(cmd->handle);
240     free(r->buf);
241     r->buf = NULL;
242     r->bufpos = r->buflen = 0;
243     r->mode = MODE_READ;
244     r->currentsearch = NULL;
245 }
246 
247 static void
do_artinfo(struct reader * r)248 do_artinfo(struct reader *r)
249 {
250     struct rs_cmd *cmd = r->buf;
251     struct rs_artinfo *reply;
252     char *group = (char *) (r->buf) + sizeof(struct rs_cmd);
253     TOKEN token;
254 
255     debug("OVDB: rs: do_artinfo: '%s' %d", group, cmd->artlo);
256     if (ovdb_getartinfo(group, cmd->artlo, &token)) {
257         reply = xmalloc(sizeof(struct rs_artinfo));
258         reply->status = CMD_ARTINFO;
259         reply->token = token;
260         r->buflen = sizeof(struct rs_artinfo);
261     } else {
262         reply = xmalloc(sizeof(struct rs_artinfo));
263         reply->status = CMD_ARTINFO | RPLY_ERROR;
264         r->buflen = sizeof(struct rs_artinfo);
265     }
266     free(r->buf);
267     r->buf = reply;
268     r->bufpos = 0;
269     r->mode = MODE_WRITE;
270 }
271 
272 
273 static int
process_cmd(struct reader * r)274 process_cmd(struct reader *r)
275 {
276     struct rs_cmd *cmd = r->buf;
277 
278     if (r->state == STATE_READCMD) {
279         switch (cmd->what) {
280         case CMD_GROUPSTATS:
281         case CMD_OPENSRCH:
282         case CMD_ARTINFO:
283             r->state = STATE_READGROUP;
284             if (cmd->grouplen == 0) {
285                 /* shouldn't happen... */
286                 r->mode = MODE_CLOSED;
287                 close(r->fd);
288                 free(r->buf);
289                 r->buf = NULL;
290                 return 0;
291             }
292             r->buflen += cmd->grouplen;
293             r->buf = xrealloc(r->buf, r->buflen);
294             return 1;
295         }
296     }
297 
298     switch (cmd->what) {
299     case CMD_GROUPSTATS:
300         ((char *) r->buf)[r->buflen - 1] =
301             0; /* make sure group is null-terminated */
302         do_groupstats(r);
303         break;
304     case CMD_OPENSRCH:
305         ((char *) r->buf)[r->buflen - 1] = 0;
306         do_opensrch(r);
307         break;
308     case CMD_SRCH:
309         do_srch(r);
310         break;
311     case CMD_CLOSESRCH:
312         do_closesrch(r);
313         break;
314     case CMD_ARTINFO:
315         ((char *) r->buf)[r->buflen - 1] = 0;
316         do_artinfo(r);
317         break;
318     default:
319         r->mode = MODE_CLOSED;
320         close(r->fd);
321         free(r->buf);
322         r->buf = NULL;
323         break;
324     }
325 
326     return 0;
327 }
328 
329 static void
handle_read(struct reader * r)330 handle_read(struct reader *r)
331 {
332     int n;
333     r->lastactive = now;
334 
335     if (r->buf == NULL) {
336         r->state = STATE_READCMD;
337         r->buf = xmalloc(sizeof(struct rs_cmd));
338         r->buflen = sizeof(struct rs_cmd);
339         r->bufpos = 0;
340     }
341 again:
342     n = read(r->fd, (char *) (r->buf) + r->bufpos, r->buflen - r->bufpos);
343     if (n <= 0) {
344         if (n < 0
345             && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
346             return;
347         r->mode = MODE_CLOSED;
348         close(r->fd);
349         free(r->buf);
350         r->buf = NULL;
351     }
352     r->bufpos += n;
353 
354     if (r->bufpos >= r->buflen)
355         if (process_cmd(r))
356             goto again;
357 }
358 
359 static void
handle_write(struct reader * r)360 handle_write(struct reader *r)
361 {
362     int n;
363     r->lastactive = now;
364 
365     if (r->buf == NULL) /* shouldn't happen */
366         return;
367 
368     n = write(r->fd, (char *) (r->buf) + r->bufpos, r->buflen - r->bufpos);
369     if (n <= 0) {
370         if (n < 0
371             && (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK))
372             return;
373         r->mode = MODE_CLOSED;
374         close(r->fd);
375         free(r->buf);
376         r->buf = NULL;
377     }
378     r->bufpos += n;
379 
380     if (r->bufpos >= r->buflen) {
381         free(r->buf);
382         r->buf = NULL;
383         r->bufpos = r->buflen = 0;
384         r->mode = MODE_READ;
385     }
386 }
387 
388 static void
newclient(int fd)389 newclient(int fd)
390 {
391     struct reader *r;
392     int i;
393 
394     fdflag_nonblocking(fd, 1);
395 
396     if (numreaders >= readertablen) {
397         readertablen += 50;
398         readertab = xrealloc(readertab, readertablen * sizeof(struct reader));
399         for (i = numreaders; i < readertablen; i++) {
400             readertab[i].mode = MODE_CLOSED;
401             readertab[i].buf = NULL;
402         }
403     }
404 
405     r = &(readertab[numreaders]);
406     numreaders++;
407 
408     r->fd = fd;
409     r->mode = MODE_WRITE;
410     r->buflen = sizeof(OVDB_SERVER_BANNER);
411     r->bufpos = 0;
412     r->buf = xstrdup(OVDB_SERVER_BANNER);
413     r->lastactive = now;
414     r->currentsearch = NULL;
415 
416     handle_write(r);
417 }
418 
419 static void
delclient(int which)420 delclient(int which)
421 {
422     int i;
423     struct reader *r = &(readertab[which]);
424 
425     if (r->mode != MODE_CLOSED)
426         close(r->fd);
427 
428     if (r->buf != NULL) {
429         free(r->buf);
430     }
431     if (r->currentsearch != NULL) {
432         ovdb_closesearch(r->currentsearch);
433         r->currentsearch = NULL;
434     }
435 
436     /* numreaders will get decremented by the calling function */
437     for (i = which; i < numreaders - 1; i++)
438         readertab[i] = readertab[i + 1];
439 
440     readertab[i].mode = MODE_CLOSED;
441     readertab[i].buf = NULL;
442 }
443 
444 static pid_t
serverproc(int me)445 serverproc(int me)
446 {
447     fd_set rdset, wrset;
448     int i, ret, count, lastfd, lastnumreaders;
449     socklen_t salen;
450     struct sockaddr_in sa;
451     struct timeval tv;
452     pid_t pid;
453 
454     pid = fork();
455     if (pid != 0)
456         return pid;
457 
458     if (!ovdb_open(OV_READ | OVDB_SERVER))
459         die("cannot open overview");
460     xsignal_norestart(SIGINT, sigfunc);
461     xsignal_norestart(SIGTERM, sigfunc);
462     xsignal_norestart(SIGHUP, sigfunc);
463     xsignal_norestart(SIGUSR1, childsig);
464     xsignal(SIGPIPE, SIG_IGN);
465 
466     numreaders = lastnumreaders = 0;
467     if (ovdb_conf.maxrsconn) {
468         readertablen = ovdb_conf.maxrsconn;
469     } else {
470         readertablen = 50;
471     }
472     readertab = xmalloc(readertablen * sizeof(struct reader));
473     for (i = 0; i < readertablen; i++) {
474         readertab[i].mode = MODE_CLOSED;
475         readertab[i].buf = NULL;
476     }
477 
478     setproctitle("0 clients");
479 
480     /* main loop */
481     while (!signalled) {
482         FD_ZERO(&rdset);
483         FD_ZERO(&wrset);
484         lastfd = 0;
485         if (wholistens == me) {
486             if (!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
487                 FD_SET(listensock, &rdset);
488                 lastfd = listensock;
489                 setproctitle("%d client%s *", numreaders,
490                              numreaders == 1 ? "" : "s");
491             } else {
492                 wholistens = -1;
493                 kill(parent, SIGUSR1);
494             }
495         }
496 
497         for (i = 0; i < numreaders; i++) {
498             switch (readertab[i].mode) {
499             case MODE_READ:
500                 FD_SET(readertab[i].fd, &rdset);
501                 break;
502             case MODE_WRITE:
503                 FD_SET(readertab[i].fd, &wrset);
504                 break;
505             default:
506                 continue;
507             }
508             if (readertab[i].fd > lastfd)
509                 lastfd = readertab[i].fd;
510         }
511         tv.tv_usec = 0;
512         tv.tv_sec = SELECT_TIMEOUT;
513         count = select(lastfd + 1, &rdset, &wrset, NULL, &tv);
514 
515         if (signalled)
516             break;
517         if (count <= 0)
518             continue;
519 
520         now = time(NULL);
521 
522         if (FD_ISSET(listensock, &rdset)) {
523             if (!ovdb_conf.maxrsconn || numreaders < ovdb_conf.maxrsconn) {
524                 salen = sizeof(sa);
525                 ret = accept(listensock, (struct sockaddr *) &sa, &salen);
526                 if (ret >= 0) {
527                     newclient(ret);
528                     wholistens = -1;
529                     children[me].num = numreaders;
530                     kill(parent, SIGUSR1);
531                 }
532             }
533         }
534 
535         for (i = 0; i < numreaders; i++) {
536             switch (readertab[i].mode) {
537             case MODE_READ:
538                 if (FD_ISSET(readertab[i].fd, &rdset))
539                     handle_read(&(readertab[i]));
540                 break;
541             case MODE_WRITE:
542                 if (FD_ISSET(readertab[i].fd, &wrset))
543                     handle_write(&(readertab[i]));
544                 break;
545             }
546         }
547 
548         for (i = 0; i < numreaders; i++) {
549             if (readertab[i].mode == MODE_CLOSED
550                 || (time_t)(readertab[i].lastactive + CLIENT_TIMEOUT) < now) {
551                 delclient(i);
552                 numreaders--;
553                 i--;
554             }
555         }
556         if (children[me].num != numreaders) {
557             children[me].num = numreaders;
558             kill(parent, SIGUSR1);
559         }
560         if (numreaders != lastnumreaders) {
561             lastnumreaders = numreaders;
562             setproctitle("%d client%s", numreaders,
563                          numreaders == 1 ? "" : "s");
564         }
565     }
566 
567     ovdb_close();
568     exit(0);
569 }
570 
571 static int
reap(void)572 reap(void)
573 {
574     int i, cs;
575     pid_t c;
576 
577     while ((c = waitpid(-1, &cs, WNOHANG)) > 0) {
578         for (i = 0; i < ovdb_conf.numrsprocs; i++) {
579             if (c == children[i].pid) {
580                 if (children[i].started + 30 > time(NULL))
581                     return 1;
582 
583                 children[i].num = 0;
584 
585                 if (wholistens == i)
586                     wholistens = -1;
587 
588                 if ((children[i].pid = serverproc(i)) == -1)
589                     return 1;
590 
591                 children[i].started = time(NULL);
592                 break;
593             }
594         }
595     }
596     if (wholistens == -1)
597         parentsig(SIGUSR1);
598     return 0;
599 }
600 
601 #    ifndef MAP_ANON
602 #        ifdef MAP_ANONYMOUS
603 #            define MAP_ANON MAP_ANONYMOUS
604 #        endif
605 #    endif
606 
607 static void *
sharemem(size_t len)608 sharemem(size_t len)
609 {
610 #    ifdef MAP_ANON
611     return mmap(0, len, PROT_READ | PROT_WRITE, MAP_ANON | MAP_SHARED, -1, 0);
612 #    else
613     int fd = open("/dev/zero", O_RDWR, 0);
614     char *ptr = mmap(0, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
615     close(fd);
616     return ptr;
617 #    endif
618 }
619 
620 int
main(int argc,char * argv[])621 main(int argc, char *argv[])
622 {
623     int i, ret;
624     socklen_t salen;
625     char *path, *pidfile;
626 #    ifdef HAVE_UNIX_DOMAIN_SOCKETS
627     struct sockaddr_un sa;
628 #    else
629     struct sockaddr_in sa;
630 #    endif
631     struct timeval tv;
632     fd_set rdset;
633 
634     setproctitle_init(argc, argv);
635 
636     openlog("ovdb_server", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG);
637     message_program_name = "ovdb_server";
638 
639     if (argc != 2 || strcmp(argv[1], SPACES))
640         die("should be started by ovdb_init");
641     message_handlers_warn(1, message_log_syslog_err);
642     message_handlers_die(1, message_log_syslog_err);
643 
644     if (!innconf_read(NULL))
645         exit(1);
646 
647     if (strcmp(innconf->ovmethod, "ovdb"))
648         die("ovmethod not set to ovdb in inn.conf");
649 
650     read_ovdb_conf();
651 
652 #    ifdef HAVE_UNIX_DOMAIN_SOCKETS
653     listensock = socket(AF_UNIX, SOCK_STREAM, 0);
654 #    else
655     listensock = socket(AF_INET, SOCK_STREAM, 0);
656 #    endif
657     if (listensock < 0)
658         sysdie("cannot create socket");
659 
660     fdflag_nonblocking(listensock, 1);
661 
662     memset(&sa, 0, sizeof sa);
663 #    ifdef HAVE_UNIX_DOMAIN_SOCKETS
664     sa.sun_family = AF_UNIX;
665     path = concatpath(innconf->pathrun, OVDB_SERVER_SOCKET);
666     strlcpy(sa.sun_path, path, sizeof(sa.sun_path));
667     unlink(sa.sun_path);
668     free(path);
669     ret = bind(listensock, (struct sockaddr *) &sa, SUN_LEN(&sa));
670 #    else
671     sa.sin_family = AF_INET;
672     sa.sin_port = htons(OVDB_SERVER_PORT);
673     sa.sin_addr.s_addr = htonl(0x7f000001UL);
674 
675     ret = bind(listensock, (struct sockaddr *) &sa, sizeof sa);
676 
677     if (ret != 0 && errno == EADDRNOTAVAIL) {
678         sa.sin_family = AF_INET;
679         sa.sin_port = htons(OVDB_SERVER_PORT);
680         sa.sin_addr.s_addr = INADDR_ANY;
681         ret = bind(listensock, (struct sockaddr *) &sa, sizeof sa);
682     }
683 #    endif
684 
685     if (ret != 0)
686         sysdie("cannot bind socket");
687     if (listen(listensock, innconf->maxlisten) < 0)
688         sysdie("cannot listen on socket");
689 
690     pidfile = concatpath(innconf->pathrun, OVDB_SERVER_PIDFILE);
691     if (putpid(pidfile))
692         exit(1);
693 
694     xsignal_norestart(SIGINT, sigfunc);
695     xsignal_norestart(SIGTERM, sigfunc);
696     xsignal_norestart(SIGHUP, sigfunc);
697 
698     xsignal_norestart(SIGUSR1, parentsig);
699     xsignal_norestart(SIGCHLD, childsig);
700     parent = getpid();
701 
702     children = sharemem(sizeof(struct child) * (ovdb_conf.numrsprocs + 1));
703 
704     if (children == NULL)
705         sysdie("cannot mmap shared memory");
706     for (i = 0; i < ovdb_conf.numrsprocs + 1; i++) {
707         children[i].pid = -1;
708         children[i].num = 0;
709     }
710 
711     for (i = 0; i < ovdb_conf.numrsprocs; i++) {
712         if ((children[i].pid = serverproc(i)) == -1) {
713             for (i--; i >= 0; i--)
714                 kill(children[i].pid, SIGTERM);
715             exit(1);
716         }
717         children[i].started = time(NULL);
718         sleep(1);
719     }
720 
721     while (!signalled) {
722         if (reap())
723             break;
724 
725         if (wholistens == -2) {
726             FD_ZERO(&rdset);
727             FD_SET(listensock, &rdset);
728             tv.tv_usec = 0;
729             tv.tv_sec = SELECT_TIMEOUT;
730             ret = select(listensock + 1, &rdset, NULL, NULL, &tv);
731 
732             if (ret == 1 && wholistens == -2) {
733                 salen = sizeof(sa);
734                 ret = accept(listensock, (struct sockaddr *) &sa, &salen);
735                 if (ret >= 0)
736                     close(ret);
737             }
738         } else {
739             pause();
740         }
741     }
742 
743     for (i = 0; i < ovdb_conf.numrsprocs; i++)
744         if (children[i].pid != -1)
745             kill(children[i].pid, SIGTERM);
746 
747     while (wait(&ret) > 0)
748         ;
749 
750     unlink(pidfile);
751 
752     exit(0);
753 }
754 
755 
756 #endif /* HAVE_BDB */
757