1 /*
2  * libmavis_external.c
3  * (C)2001-2015 by Marc Huber <Marc.Huber@web.de>
4  * All rights reserved.
5  *
6  * $Id: libmavis_external.c,v 1.68 2015/03/14 06:11:27 marc Exp $
7  */
8 
9 #define __MAVIS_external__
10 #define MAVIS_name "external"
11 
12 #include <stdio.h>
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include <sys/time.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <signal.h>
19 #include <time.h>
20 #include <dlfcn.h>
21 #include <sys/time.h>
22 #include <unistd.h>
23 #include <pwd.h>
24 #include <grp.h>
25 #include <errno.h>
26 
27 #include "misc/memops.h"
28 #include "debug.h"
29 #include "log.h"
30 #include "misc/strops.h"
31 #include "misc/crc32.h"
32 #include "misc/rb.h"
33 #include "misc/io.h"
34 #include "misc/io_sched.h"
35 #include "misc/io_child.h"
36 #include "misc/ostype.h"
37 
38 static const char rcsid[] __attribute__ ((used)) = "$Id: libmavis_external.c,v 1.68 2015/03/14 06:11:27 marc Exp $";
39 
40 #define REAPMAX 30		/* terminated child history table size */
41 #define REAPINT 30		/* terminated child interval (seconds) */
42 
/*
 * Module-private state of the "external" backend.  The macro is defined
 * before "mavis.h" is included and every member is accessed as mcx-><member>
 * in this file, so mavis.h presumably expands it inside mavis_ctx
 * (TODO confirm against mavis.h).
 *
 *   io_context_local     I/O context created by mavis_init_in() when the
 *                        caller supplied none; destroyed in mavis_drop_in()
 *   io_context_parent    I/O context supplied by the caller (see mavis_new())
 *   path, argv, argc     program to execute and its argument vector
 *   child_cur            children currently running (fork_child/child_died)
 *   child_min/child_max  children started at init time / size of cx[]
 *   cx, cx_stat          per-slot child context and statistics
 *   usage                queries currently handed to children
 *   counter              sequence number assigned to backlog entries
 *   backlog_fifo/_serial/_app_ctx
 *                        queued queries, indexed by age, serial and app_ctx
 *   outgoing             finished queries waiting to be collected
 *   junkcontexts         contexts of terminated children, released from
 *                        child_closed_stderr()
 *   lastdump, startup_time
 *                        timestamps used for the statistics output
 *   backlog_cur/_max/_max_p
 *                        current and peak backlog sizes
 *   envcount, env        environment handed to execve()
 *   uid, gid, home       identity and working directory for the children
 *   reapcur, reaphist    ring buffer used to throttle respawning
 *
 * ncx is not referenced anywhere in this file.
 */
#define MAVIS_CTX_PRIVATE			\
  struct io_context *io_context_local;		\
  struct io_context *io_context_parent;		\
  char *path;					\
  char **argv;					\
  int argc;					\
  int child_cur;				\
  int child_min;				\
  int child_max;				\
  int ncx;					\
  struct context **cx;				\
  struct context_stat *cx_stat;			\
  int usage;					\
  u_int counter;				\
  rb_tree_t *backlog_fifo;			\
  rb_tree_t *backlog_serial;			\
  rb_tree_t *backlog_app_ctx;			\
  rb_tree_t *outgoing;				\
  rb_tree_t *junkcontexts;			\
  time_t lastdump;				\
  u_long backlog_cur;				\
  u_long backlog_max;				\
  u_long backlog_max_p;				\
  int envcount;					\
  char **env;					\
  uid_t uid;					\
  gid_t gid;					\
  char *home;					\
  int reapcur;					\
  time_t reaphist[REAPMAX];			\
  time_t startup_time;
74 
75 #include "mavis.h"
76 
/*
 * Per-child state: one instance per forked helper process, stored in
 * mcx->cx[index] while the child is alive.
 */
struct context {
    mavis_ctx *mcx;		/* owning module context */
    pid_t pid;			/* process id of the child */
    /* NOTE(review): 66536 looks like a typo for 65536 -- harmless, but confirm */
    char b_in[66536];		/* reply data read from the child (fd_in) */
    char b_out[66536];		/* serialized query written to the child (fd_out) */
    char b_err[8192];		/* not yet newline-terminated stderr output (fd_err) */
    size_t b_in_len;		/* bytes currently held in b_in */
    size_t b_out_len;		/* bytes to be sent from b_out */
    size_t b_err_len;		/* bytes currently held in b_err */
    size_t b_in_off;		/* reset in start_query(), otherwise unused here */
    size_t b_out_off;		/* bytes of b_out already written */
    size_t b_err_off;		/* scratch offset used by read_err_from_child() */
    int fd_in;			/* parent's read end of the child's stdout pipe */
    int fd_out;			/* parent's write end of the child's stdin pipe */
    int fd_err;			/* parent's read end of the child's stderr pipe */
    u_int in_use:1;		/* set while a query is being processed */
    u_int canceled:1;		/* set by mavis_cancel_in() for the running query */
    av_ctx *ac;			/* query currently assigned to this child */
    int index;			/* slot in mcx->cx[], -1 once moved to junkcontexts */
    int result;			/* not referenced in this file */
    unsigned long long counter;	/* queries handed to this child; reset in child_died() */
};
/*
 * Per-slot statistics, kept in mcx->cx_stat[].  The *_p members are reset
 * each time the LOGSTATS request is handled in mavis_send_in(); the others
 * keep accumulating.
 */
struct context_stat {
    unsigned long startup;		/* children forked for this slot */
    unsigned long startup_p;		/* same, since the last statistics dump */
    unsigned long long counter;		/* queries dispatched to this slot */
    unsigned long long counter_p;	/* same, since the last statistics dump */
};
105 
106 static int fork_child(mavis_ctx *, int);
107 
/*
 * A query that is either waiting in the backlog trees or finished and
 * waiting in mcx->outgoing.  Released via free_payload().
 */
struct query {
    mavis_ctx *mcx;		/* owning module context (set for backlog entries) */
    av_ctx *ac;			/* attribute-value data of the query or result */
    av_ctx *ac_bak;		/* mcx->ac_bak as saved when the query was queued */
    time_t when;		/* time the query was queued (FIFO ordering) */
    u_long counter;		/* sequence number, tie-breaker for FIFO ordering */
    u_int serial_crc;		/* CRC32 of the query's serial attribute */
    u_int canceled:1;		/* copied from the child context once the reply arrived */
    int result;			/* result code parsed from the child's reply */
};
118 
compare_fifo(const void * v1,const void * v2)119 static int compare_fifo(const void *v1, const void *v2)
120 {
121     if (((struct query *) v1)->when < ((struct query *) v2)->when)
122 	return -1;
123     if (((struct query *) v1)->when > ((struct query *) v2)->when)
124 	return +1;
125     if (((struct query *) v1)->counter < ((struct query *) v2)->counter)
126 	return -1;
127     if (((struct query *) v1)->counter > ((struct query *) v2)->counter)
128 	return +1;
129     if (((struct query *) v1)->serial_crc < ((struct query *) v2)->serial_crc)
130 	return -1;
131     if (((struct query *) v1)->serial_crc > ((struct query *) v2)->serial_crc)
132 	return +1;
133     if (((struct query *) v1)->ac->app_ctx < ((struct query *) v2)->ac->app_ctx)
134 	return -1;
135     if (((struct query *) v1)->ac->app_ctx > ((struct query *) v2)->ac->app_ctx)
136 	return +1;
137     return 0;
138 }
139 
compare_serial(const void * v1,const void * v2)140 static int compare_serial(const void *v1, const void *v2)
141 {
142     if (((struct query *) v1)->serial_crc < ((struct query *) v2)->serial_crc)
143 	return -1;
144     if (((struct query *) v1)->serial_crc > ((struct query *) v2)->serial_crc)
145 	return +1;
146     return strcmp(((struct query *) v1)->ac->arr[AV_A_SERIAL], ((struct query *) v2)->ac->arr[AV_A_SERIAL]);
147 }
148 
compare_app_ctx(const void * v1,const void * v2)149 static int compare_app_ctx(const void *v1, const void *v2)
150 {
151     if (((struct query *) v1)->ac->app_ctx < ((struct query *) v2)->ac->app_ctx)
152 	return -1;
153     if (((struct query *) v1)->ac->app_ctx > ((struct query *) v2)->ac->app_ctx)
154 	return +1;
155     return 0;
156 }
157 
/*
 * Ordering function for the junkcontexts tree: the payload pointers
 * themselves serve as keys.  Returns -1, 0 or +1.
 */
static int compare_ctx(const void *v1, const void *v2)
{
    if (v1 == v2)
	return 0;

    return (v1 < v2) ? -1 : +1;
}
166 
free_payload(void * p)167 static void free_payload(void *p)
168 {
169     av_free(((struct query *) p)->ac);
170     av_free(((struct query *) p)->ac_bak);
171     free(p);
172 }
173 
free_context(void * c)174 static void free_context(void *c)
175 {
176     struct context *ctx = (struct context *) c;
177 
178     if (ctx->fd_err > -1)
179 	io_close(ctx->mcx->io, ctx->fd_err);
180     if (ctx->fd_in > -1)
181 	io_close(ctx->mcx->io, ctx->fd_in);
182     if (ctx->fd_out > -1)
183 	io_close(ctx->mcx->io, ctx->fd_out);
184     free(ctx);
185 }
186 
187 static void write_to_child(struct context *, int);
188 
189 #define HAVE_mavis_init_in
mavis_init_in(mavis_ctx * mcx)190 static int mavis_init_in(mavis_ctx * mcx)
191 {
192     int i;
193 
194     DebugIn(DEBUG_MAVIS);
195 
196     mcx->lastdump = mcx->startup_time = time(NULL);
197 
198     if (!mcx->path)
199 	logmsg("Warning: %s: module lacks path definition", MAVIS_name);
200     else if (!mcx->argv[0]) {
201 	mcx->argv[0] = Xstrdup(basename(mcx->path));
202 	mcx->argv[1] = NULL;
203     }
204 
205     if (mcx->child_min > mcx->child_max)
206 	mcx->child_min = mcx->child_max;
207 
208     if (!mcx->io_context_parent)
209 	mcx->io_context_local = mcx->io = io_init();
210     mcx->cx = Xcalloc(mcx->child_max, sizeof(struct context *));
211     mcx->cx_stat = Xcalloc(mcx->child_max, sizeof(struct context_stat));
212     for (i = 0; i < mcx->child_min; i++)
213 	fork_child(mcx, i);
214 
215     mcx->backlog_serial = RB_tree_new(compare_serial, NULL);
216     mcx->backlog_app_ctx = RB_tree_new(compare_app_ctx, NULL);
217     mcx->backlog_fifo = RB_tree_new(compare_fifo, free_payload);
218     mcx->outgoing = RB_tree_new(compare_app_ctx, free_payload);
219     mcx->junkcontexts = RB_tree_new(compare_ctx, free_context);
220 
221     DebugOut(DEBUG_MAVIS);
222     return MAVIS_INIT_OK;
223 }
224 
225 /*
226 exec = path argv0 argv1 argv2...
227 setenv a = b
228 childs min = n
229 childs max = n
230 home = dir
231 user-id = uid
232 group-id = gid
233 */
234 #define HAVE_mavis_parse_in
mavis_parse_in(mavis_ctx * mcx,struct sym * sym)235 static int mavis_parse_in(mavis_ctx * mcx, struct sym *sym)
236 {
237     u_int line;
238     char *env_name;
239     size_t len;
240     struct stat st;
241 
242     while (1) {
243 	switch (sym->code) {
244 	case S_script:
245 	    mavis_script_parse(mcx, sym);
246 	    continue;
247 	case S_userid:
248 	    parse_userid(sym, &mcx->uid, &mcx->gid);
249 	    continue;
250 	case S_groupid:
251 	    parse_groupid(sym, &mcx->gid);
252 	    continue;
253 	case S_home:
254 	    sym_get(sym);
255 	    parse(sym, S_equal);
256 	    strset(&mcx->home, sym->buf);
257 	    sym_get(sym);
258 	    continue;
259 	case S_childs:
260 	    sym_get(sym);
261 	    switch (sym->code) {
262 	    case S_min:
263 		sym_get(sym);
264 		parse(sym, S_equal);
265 		mcx->child_min = parse_int(sym);
266 		continue;
267 	    case S_max:
268 		sym_get(sym);
269 		parse(sym, S_equal);
270 		mcx->child_max = parse_int(sym);
271 		continue;
272 	    default:
273 		parse_error_expect(sym, S_min, S_max, S_unknown);
274 	    }
275 
276 	case S_setenv:
277 	    sym_get(sym);
278 	    env_name = alloca(strlen(sym->buf) + 1);
279 	    strcpy(env_name, sym->buf);
280 	    sym_get(sym);
281 	    parse(sym, S_equal);
282 	    len = strlen(env_name) + strlen(sym->buf) + 2;
283 	    mcx->env = Xrealloc(mcx->env, (mcx->envcount + 2) * sizeof(char *));
284 	    mcx->env[mcx->envcount] = Xcalloc(1, len);
285 	    snprintf(mcx->env[mcx->envcount++], len, "%s=%s", env_name, sym->buf);
286 	    mcx->env[mcx->envcount] = NULL;
287 	    sym_get(sym);
288 	    continue;
289 
290 	case S_exec:{
291 		char buf[MAX_INPUT_LINE_LEN];
292 		sym_get(sym);
293 		parse(sym, S_equal);
294 		mcx->argv = calloc(1, sizeof(char *));
295 		line = sym->line;
296 		ostypef(sym->buf, buf, sizeof(buf));
297 		if (stat(buf, &st))
298 		    parse_error(sym, "%s: %s", buf, strerror(errno));
299 		strset(&mcx->path, buf);
300 		sym_get(sym);
301 		while (sym->line == line) {
302 		    mcx->argv = realloc(mcx->argv, (mcx->argc + 2) * sizeof(char *));
303 		    mcx->argv[mcx->argc] = strdup(sym->buf);
304 		    mcx->argc++;
305 		    mcx->argv[mcx->argc] = NULL;
306 		    sym_get(sym);
307 		}
308 		if (!mcx->argv[0]) {
309 		    mcx->argv = realloc(mcx->argv, 2 * sizeof(char *));
310 		    mcx->argv[0] = strdup(mcx->path);
311 		    mcx->argv[1] = NULL;
312 		}
313 		continue;
314 	    }
315 	case S_eof:
316 	case S_closebra:
317 	    if (!mcx->argv)
318 		parse_error(sym, "Missing \"exec\" declaration.");
319 	    return MAVIS_CONF_OK;
320 	default:
321 	    parse_error_expect(sym, S_script, S_userid, S_groupid, S_home, S_childs, S_setenv, S_exec, S_closebra, S_unknown);
322 	}
323     }
324 }
325 
326 #define HAVE_mavis_drop_in
mavis_drop_in(mavis_ctx * mcx)327 static void mavis_drop_in(mavis_ctx * mcx)
328 {
329     int i;
330 
331     free(mcx->path);
332 
333     for (i = 0; mcx->argv[i]; i++)
334 	Xfree(&mcx->argv[i]);
335 
336     for (i = 0; i < mcx->child_max; i++)
337 	if (mcx->cx[i]) {
338 	    if (mcx->cx[i]->fd_in > -1)
339 		io_close(mcx->io, mcx->cx[i]->fd_in);
340 	    if (mcx->cx[i]->fd_out > -1)
341 		io_close(mcx->io, mcx->cx[i]->fd_out);
342 	    if (mcx->cx[i]->fd_err > -1)
343 		io_close(mcx->io, mcx->cx[i]->fd_err);
344 	    if (mcx->cx[i]) {
345 		kill(mcx->cx[i]->pid, SIGTERM);
346 	    }
347 
348 	    av_free(mcx->cx[i]->ac);
349 
350 	    free(mcx->cx[i]);
351 	}
352 
353     RB_tree_delete(mcx->junkcontexts);
354 
355     RB_tree_delete(mcx->backlog_app_ctx);
356     RB_tree_delete(mcx->backlog_serial);
357     RB_tree_delete(mcx->backlog_fifo);
358     RB_tree_delete(mcx->outgoing);
359 
360     if (mcx->env) {
361 	for (i = 0; i < mcx->envcount; i++)
362 	    free(mcx->env[i]);
363 	free(mcx->env);
364     }
365 
366     free(mcx->cx);
367     free(mcx->cx_stat);
368 
369     io_destroy(mcx->io_context_local, NULL);
370 }
371 
child_closed_stderr(struct context * ctx,int cur)372 static void child_closed_stderr(struct context *ctx, int cur __attribute__ ((unused)))
373 {
374     if (ctx->b_err_len) {
375 	logmsg("%s: %lu: %s", ctx->mcx->argv[0], (u_long) ctx->pid, ctx->b_err);
376 	ctx->b_err_len = 0;
377     }
378     RB_search_and_delete(ctx->mcx->junkcontexts, ctx);
379 }
380 
381 static void write_to_child(struct context *, int);
382 static void start_query(struct context *);
383 static int mavis_send_in(mavis_ctx *, av_ctx **);
384 
child_died(struct context * ctx,int cur)385 static void child_died(struct context *ctx, int cur __attribute__ ((unused)))
386 {
387     if (ctx->ac) {		// might be called multiple times else
388 	int i = ctx->index;
389 	DebugIn(DEBUG_PROC);
390 
391 	if (ctx->mcx->cx[i]->counter < 2) {
392 	    logmsg("%s: %lu: terminated before finishing first request", ctx->mcx->argv[0], (u_long) ctx->pid);
393 	    ctx->mcx->reaphist[ctx->mcx->reapcur] = io_now.tv_sec + REAPINT;
394 	    ctx->mcx->reapcur++;
395 	    ctx->mcx->reapcur %= REAPMAX;
396 	    ctx->mcx->usage--;
397 	} else
398 	    logmsg("%s: %lu: terminated after processing %llu requests", ctx->mcx->argv[0], (u_long) ctx->pid, ctx->mcx->cx[i]->counter);
399 
400 	ctx->mcx->cx[i]->counter = 0;
401 
402 	io_child_set(ctx->pid, NULL, NULL);
403 
404 	if (ctx->fd_in > -1) {
405 	    io_close(ctx->mcx->io, ctx->fd_in);
406 	    ctx->fd_in = -1;
407 	}
408 	if (ctx->fd_out > -1) {
409 	    io_close(ctx->mcx->io, ctx->fd_out);
410 	    ctx->fd_out = -1;
411 	}
412 
413 	ctx->index = -1;
414 
415 	RB_insert(ctx->mcx->junkcontexts, ctx);
416 
417 #ifdef DEBUG_RB
418 	fprintf(stderr, "EXT insert junkcontexts %p\n", ctx);
419 #endif
420 
421 	ctx->mcx->cx[i] = NULL;
422 	ctx->mcx->child_cur--;
423 
424 	fork_child(ctx->mcx, i);
425 
426 	if (ctx->mcx->cx[i]) {
427 	    ctx->mcx->cx[i]->ac = ctx->ac;
428 	    ctx->ac = NULL;
429 
430 	    ctx->mcx->cx_stat[i].counter++;
431 	    ctx->mcx->cx_stat[i].counter_p++;
432 	    start_query(ctx->mcx->cx[i]);
433 	}
434 
435 	DebugOut(DEBUG_PROC);
436     }
437 }
438 
read_from_child(struct context * ctx,int cur)439 static void read_from_child(struct context *ctx, int cur)
440 {
441     ssize_t len;
442     DebugIn(DEBUG_MAVIS);
443 
444     len = Read(ctx->fd_in, ctx->b_in + ctx->b_in_len, sizeof(ctx->b_in) - ctx->b_in_len - 1);
445 
446     if (len > 0) {
447 	char *t;
448 	int matchlevel = 0;
449 
450 	Debug((DEBUG_PROC, "%s:%d %s\n", __FILE__, __LINE__, ctx->mcx->path));
451 	ctx->b_in_len += len;
452 	ctx->b_in[ctx->b_in_len] = 0;
453 
454 	for (t = ctx->b_in + ctx->b_in_len - 1; t > ctx->b_in; t--)
455 	    switch (matchlevel) {
456 	    case 0:
457 		if (*t != '\n') {
458 		    DebugOut(DEBUG_MAVIS);
459 		    return;
460 		}
461 		matchlevel++;
462 		break;
463 	    case 1:
464 		if (!isdigit((int) *t)) {
465 		    DebugOut(DEBUG_MAVIS);
466 		    return;
467 		}
468 		matchlevel++;
469 		break;
470 	    case 2:
471 		if (!isdigit((int) *t) && *t != '-' && *t != '=') {
472 		    DebugOut(DEBUG_MAVIS);
473 		    return;
474 		}
475 		if (*t == '=')
476 		    matchlevel++;
477 		break;
478 	    case 3:
479 		if (*t == '\n') {
480 		    rb_node_t *r;
481 		    struct query *q;
482 		    char *serial = av_get(ctx->ac, AV_A_SERIAL);
483 		    char *serial_old = alloca(strlen(serial) + 1);
484 		    int result;
485 
486 		    strcpy(serial_old, serial);
487 
488 		    io_clr_i(ctx->mcx->io, ctx->fd_in);
489 
490 		    av_clear(ctx->ac);
491 		    *++t = 0;
492 		    av_char_to_array(ctx->ac, ctx->b_in, NULL);
493 		    result = atoi(++t);
494 
495 		    ctx->in_use = 0;
496 		    ctx->mcx->usage--;
497 
498 		    serial = av_get(ctx->ac, AV_A_SERIAL);
499 
500 		    if (!serial || strcmp(serial, serial_old)) {
501 			if (serial)
502 			    logmsg("%s: %lu: out of sync: " "got %s, expected %s. Terminating.", ctx->mcx->argv[0], (u_long) ctx->pid, serial, serial_old);
503 			else
504 			    logmsg("%s: %lu: missing serial. Terminating.", ctx->mcx->argv[0], (u_long) ctx->pid);
505 			av_free(ctx->ac);
506 			ctx->ac = NULL;
507 			kill(ctx->pid, SIGTERM);
508 			child_died(ctx, ctx->fd_in);
509 			DebugOut(DEBUG_MAVIS);
510 			return;
511 		    }
512 
513 		    q = Xcalloc(1, sizeof(struct context));
514 		    q->ac = ctx->ac;
515 		    ctx->ac = NULL;
516 
517 		    q->result = result;
518 
519 		    q->canceled = ctx->canceled;
520 		    ctx->canceled = 0;
521 
522 		    RB_insert(ctx->mcx->outgoing, q);
523 #ifdef DEBUG_RB
524 		    fprintf(stderr, "EXT insert outgoing %p\n", q);
525 #endif
526 
527 		    if (ctx->mcx->io_context_parent) {
528 			if (!RB_empty(ctx->mcx->backlog_fifo)) {
529 			    rb_node_t *rbn = RB_first(ctx->mcx->backlog_fifo);
530 			    struct query *qp = RB_payload(rbn, struct query *);
531 			    Debug((DEBUG_PROC, "%s:%d\n", __FILE__, __LINE__));
532 			    RB_search_and_delete(ctx->mcx->backlog_app_ctx, qp);
533 			    RB_search_and_delete(ctx->mcx->backlog_serial, qp);
534 			    ctx->ac = qp->ac;
535 			    qp->ac = NULL;
536 			    RB_delete(ctx->mcx->backlog_fifo, rbn);
537 #ifdef DEBUG_RB
538 			    fprintf(stderr, "EXT remove backlog_fifo %p\n", RB_payload(rbn, void *));
539 #endif
540 			    ctx->mcx->backlog_cur--;
541 			    ctx->mcx->usage++;
542 			    ctx->mcx->cx_stat[ctx->index].counter++;
543 			    ctx->mcx->cx_stat[ctx->index].counter_p++;
544 			    start_query(ctx);
545 			}
546 
547 			while ((r = RB_first(ctx->mcx->outgoing))) {
548 			    struct query *qp = RB_payload(r, struct query *);
549 
550 			    if (ctx->mcx->ac_bak)
551 				av_free(ctx->mcx->ac_bak);
552 			    ctx->mcx->ac_bak = qp->ac_bak;
553 			    qp->ac_bak = NULL;
554 
555 			    if (q->canceled) {
556 				av_free(ctx->mcx->ac_bak);
557 				ctx->mcx->ac_bak = NULL;
558 				RB_delete(ctx->mcx->outgoing, r);
559 			    } else
560 				((void (*)(void *)) qp->ac->app_cb) (qp->ac->app_ctx);
561 			}
562 		    }
563 		    DebugOut(DEBUG_MAVIS);
564 		    return;
565 		}
566 	    }
567     } else			//if(errno != EAGAIN)
568 	child_died(ctx, cur);
569     DebugOut(DEBUG_MAVIS);
570 }
571 
write_to_child(struct context * ctx,int cur)572 static void write_to_child(struct context *ctx, int cur)
573 {
574     ssize_t len;
575     DebugIn(DEBUG_PROC);
576 
577     len = Write(ctx->fd_out, ctx->b_out + ctx->b_out_off, ctx->b_out_len - ctx->b_out_off);
578 
579     if (len > 0) {
580 	ctx->b_out_off += len;
581 	if (ctx->b_out_len == ctx->b_out_off) {
582 	    io_clr_o(ctx->mcx->io, ctx->fd_out);
583 	    io_set_i(ctx->mcx->io, ctx->fd_in);
584 	} else
585 	    io_set_o(ctx->mcx->io, ctx->fd_out);
586     } else			//if(errno != EAGAIN)
587 	child_died(ctx, cur);
588 
589     DebugOut(DEBUG_PROC);
590 }
591 
read_err_from_child(struct context * ctx,int cur)592 static void read_err_from_child(struct context *ctx, int cur __attribute__ ((unused)))
593 {
594     ssize_t len;
595 
596     DebugIn(DEBUG_PROC);
597 
598     len = Read(ctx->fd_err, ctx->b_err + ctx->b_err_len, sizeof(ctx->b_err) - ctx->b_err_len - 1);
599 
600     Debug((DEBUG_PROC, " fd %d: read %d bytes (errno: %d, pid: %d\n", cur, (int) len, errno, (int) ctx->pid));
601 
602     Debug((DEBUG_ALL, ">>>%.*s<<<\n", (int) len, ctx->b_err + ctx->b_err_len));
603 
604     if (len > 0) {
605 	char *linestart = ctx->b_err;
606 	char *lineend;
607 
608 	ctx->b_err_len += len;
609 	ctx->b_err[ctx->b_err_len] = 0;
610 
611 	while ((lineend = strchr(linestart, '\n'))) {
612 	    *lineend = 0;
613 	    logmsg("%s: %lu: %s", ctx->mcx->argv[0], (u_long) ctx->pid, linestart);
614 	    linestart = lineend + 1;
615 	}
616 
617 	ctx->b_err_off = linestart - ctx->b_err;
618 	if (ctx->b_err_off)
619 	    memmove(ctx->b_err, linestart, ctx->b_err_len - ctx->b_err_off + 1);
620 	ctx->b_err_len -= ctx->b_err_off;
621 	ctx->b_err_off = 0;
622     } else			//if (errno != EAGAIN)
623 	child_closed_stderr(ctx, cur);
624 
625     DebugOut(DEBUG_PROC);
626 }
627 
/* Close both descriptors of a pipe pair. */
static void fork_child_close_pipe(int fd[2])
{
    close(fd[0]);
    close(fd[1]);
}

/*
 * Start a helper process for slot i of mcx->cx[].
 *
 * Three pipes are created and connected to the child's stdin, stdout and
 * stderr.  The child optionally changes directory, group and user, then
 * executes mcx->path with mcx->argv (and mcx->env, if set).  In the parent
 * the remaining pipe ends are made close-on-exec and non-blocking, a fresh
 * context is stored in mcx->cx[i] and the descriptors are registered with
 * the I/O layer.
 *
 * Returns 0 on success, -1 if the slot index is invalid, respawning is
 * currently throttled, or pipe/fork failed.  On failure mcx->cx[i] is left
 * untouched.
 *
 * Changes compared to the previous version:
 *  - the slot index is range-checked before mcx->cx[] is written;
 *  - the child closes the original pipe descriptors after dup2(), so they
 *    are not inherited by the executed program;
 *  - setgid()/setuid() failures are logged, and the child refuses to run
 *    the program if it would still have root privileges;
 *  - a failed exec ends the child with _exit() and a non-zero status, so
 *    stdio buffers and exit handlers inherited from the parent are not
 *    run a second time;
 *  - the throttling message prints its u_long argument with %lu.
 */
static int fork_child(mavis_ctx * mcx, int i)
{
    int fi[2], fo[2], fe[2];
    pid_t childpid;
    struct context *ctx;

    if (i < 0 || i >= mcx->child_max) {
	logmsg("%s: no free child slot available", MAVIS_name);
	return -1;
    }

    /* reaphist[reapcur] is the oldest entry of the early-death history */
    if (mcx->reaphist[mcx->reapcur] >= io_now.tv_sec) {
	logmsg("%s: %s respawning too fast; throttling for %lu seconds.", MAVIS_name, mcx->path, (u_long) (mcx->reaphist[mcx->reapcur] - io_now.tv_sec));
	return -1;
    }

    Debug((DEBUG_PROC, "forking child number %d\n", i));

    signal(SIGPIPE, SIG_IGN);

    if (pipe(fi) < 0) {
	logerr("pipe (%s:%d)", __FILE__, __LINE__);
	return -1;
    }
    if (pipe(fo) < 0) {
	logerr("pipe (%s:%d)", __FILE__, __LINE__);
	fork_child_close_pipe(fi);
	return -1;
    }
    if (pipe(fe) < 0) {
	logerr("pipe (%s:%d)", __FILE__, __LINE__);
	fork_child_close_pipe(fi);
	fork_child_close_pipe(fo);
	return -1;
    }
#ifdef DEBUG
    fflush(stderr);
#endif

    switch ((childpid = io_child_fork(NULL, NULL))) {
    case 0:
	signal(SIGCHLD, SIG_DFL);
	close(fi[1]);
	close(fo[0]);
	close(fe[0]);
	dup2(fi[0], 0);
	dup2(fo[1], 1);
	dup2(fe[1], 2);
	/* the duplicates are all the program needs */
	if (fi[0] > 2)
	    close(fi[0]);
	if (fo[1] > 2)
	    close(fo[1]);
	if (fe[1] > 2)
	    close(fe[1]);
	if (mcx->home && chdir(mcx->home)) {
	    logerr("chdir(%s) (%s:%d)", mcx->home, __FILE__, __LINE__);
	    //FIXME
	}
	if (mcx->gid && setgid(mcx->gid))
	    logerr("setgid(%lu) (%s:%d)", (u_long) mcx->gid, __FILE__, __LINE__);
	if (mcx->uid && setuid(mcx->uid)) {
	    logerr("setuid(%lu) (%s:%d)", (u_long) mcx->uid, __FILE__, __LINE__);
	    /* An unprivileged uid was requested: never run the program
	     * as root instead. */
	    if (geteuid() == 0) {
		fflush(stderr);
		_exit(EXIT_FAILURE);
	    }
	}

	if (mcx->env)
	    execve(mcx->path, mcx->argv, mcx->env);
	else
	    execv(mcx->path, mcx->argv);

	/* only reached if exec failed */
	logerr("exec (%s) (%s:%d)", mcx->path, __FILE__, __LINE__);
	fflush(stderr);
	_exit(EXIT_FAILURE);
    case -1:
	logerr("fork (%s:%d)", __FILE__, __LINE__);
	fork_child_close_pipe(fi);
	fork_child_close_pipe(fo);
	fork_child_close_pipe(fe);
	return -1;
    }
    signal(SIGCHLD, SIG_IGN);

    /* parent: drop the child's ends of the pipes */
    close(fi[0]);
    close(fo[1]);
    close(fe[1]);

#ifdef SO_NOSIGPIPE
    {
	int one = 1;
	setsockopt(fi[1], SOL_SOCKET, SO_NOSIGPIPE, (const char *) &one, sizeof(one));
	setsockopt(fo[0], SOL_SOCKET, SO_NOSIGPIPE, (const char *) &one, sizeof(one));
	setsockopt(fe[0], SOL_SOCKET, SO_NOSIGPIPE, (const char *) &one, sizeof(one));
    }
#endif

    fcntl(fi[1], F_SETFD, FD_CLOEXEC);
    fcntl(fo[0], F_SETFD, FD_CLOEXEC);
    fcntl(fe[0], F_SETFD, FD_CLOEXEC);

    fcntl(fi[1], F_SETFL, O_NONBLOCK);
    fcntl(fo[0], F_SETFL, O_NONBLOCK);
    fcntl(fe[0], F_SETFL, O_NONBLOCK);

    ctx = Xcalloc(1, sizeof(struct context));
    ctx->mcx = mcx;
    ctx->index = i;
    ctx->pid = childpid;
    ctx->fd_out = fi[1];	/* we write to the child's stdin */
    ctx->fd_in = fo[0];		/* we read the child's stdout */
    ctx->fd_err = fe[0];	/* we read the child's stderr */
    mcx->cx[i] = ctx;

    mcx->child_cur++;

    io_register(mcx->io, ctx->fd_out, ctx);
    io_set_cb_o(mcx->io, ctx->fd_out, (void *) write_to_child);
    io_clr_cb_i(mcx->io, ctx->fd_out);
    io_set_cb_h(mcx->io, ctx->fd_out, (void *) child_died);
    io_set_cb_e(mcx->io, ctx->fd_out, (void *) child_died);

    io_register(mcx->io, ctx->fd_in, ctx);
    io_clr_cb_o(mcx->io, ctx->fd_in);
    io_set_cb_i(mcx->io, ctx->fd_in, (void *) read_from_child);
    io_set_cb_h(mcx->io, ctx->fd_in, (void *) child_died);
    io_set_cb_e(mcx->io, ctx->fd_in, (void *) child_died);

    /* stderr is the only descriptor watched from the start; the others
     * are enabled by start_query()/write_to_child() */
    io_register(mcx->io, ctx->fd_err, ctx);
    io_clr_cb_o(mcx->io, ctx->fd_err);
    io_set_cb_i(mcx->io, ctx->fd_err, (void *) read_err_from_child);
    io_set_cb_h(mcx->io, ctx->fd_err, (void *) child_closed_stderr);
    io_set_cb_e(mcx->io, ctx->fd_err, (void *) child_closed_stderr);
    io_set_i(mcx->io, ctx->fd_err);
    mcx->cx_stat[i].startup++;
    mcx->cx_stat[i].startup_p++;

    return 0;
}
755 
start_query(struct context * ctx)756 static void start_query(struct context *ctx)
757 {
758 
759     if (ctx) {
760 	int l;
761 
762 	Debug((DEBUG_PROC, "starting query on child %d (%s)\n", ctx->index, av_get(ctx->ac, AV_A_SERIAL)));
763 	ctx->in_use = 1;
764 
765 	ctx->b_in_len = ctx->b_in_off = ctx->b_out_len = ctx->b_out_off = 0;
766 	l = av_array_to_char(ctx->ac, ctx->b_out, sizeof(ctx->b_out) - 3, NULL);
767 	if (l > -1) {
768 	    strcpy(ctx->b_out + l, "=\n");
769 	    ctx->b_out_len = l + 2;
770 	    write_to_child(ctx, ctx->fd_out);
771 	} else
772 	    logmsg("%s: query too long, ignoring", MAVIS_name);
773     }
774 }
775 
776 #define HAVE_mavis_send_in
mavis_send_in(mavis_ctx * mcx,av_ctx ** ac)777 static int mavis_send_in(mavis_ctx * mcx, av_ctx ** ac)
778 {
779     int i = -1;
780     int res = MAVIS_DEFERRED;
781 
782     if (!strcasecmp(av_get(*ac, AV_A_TYPE), AV_V_TYPE_LOGSTATS)) {
783 	unsigned long long counter = 0;
784 	unsigned long long counter_p = 0;
785 	u_long startup = 0;
786 	u_long startup_p = 0;
787 
788 	for (i = 0; i < mcx->child_max; i++)
789 	    if (mcx->cx[i]) {
790 		logmsg("STAT %s: %d: Q=%llu F=%lu q=%llu f=%lu",
791 		       MAVIS_name, i, mcx->cx_stat[i].counter, mcx->cx_stat[i].startup, mcx->cx_stat[i].counter_p, mcx->cx_stat[i].startup_p);
792 
793 		counter += mcx->cx_stat[i].counter;
794 		counter_p += mcx->cx_stat[i].counter_p;
795 		startup += mcx->cx_stat[i].startup;
796 		startup_p += mcx->cx_stat[i].startup_p;
797 		mcx->cx_stat[i].counter_p = 0;
798 		mcx->cx_stat[i].startup_p = 0;
799 	    }
800 
801 	logmsg
802 	    ("STAT %s: Q=%llu F=%lu B=%lu T=%d q=%llu f=%lu b=%lu t=%d",
803 	     MAVIS_name, counter, startup, mcx->backlog_max,
804 	     (int) (io_now.tv_sec - mcx->startup_time), counter_p, startup_p, mcx->backlog_max_p, (int) (io_now.tv_sec - mcx->lastdump));
805 
806 	mcx->backlog_max_p = mcx->backlog_cur;
807 	mcx->lastdump = io_now.tv_sec;
808 
809 	res = MAVIS_DOWN;
810     } else if (mcx->usage == mcx->child_max) {
811 	struct query *q = Xcalloc(1, sizeof(struct query));
812 	char *serial = av_get(*ac, AV_A_SERIAL);
813 	q->mcx = mcx;
814 	q->ac = *ac;
815 	*ac = NULL;
816 
817 	q->ac_bak = mcx->ac_bak;
818 	mcx->ac_bak = NULL;
819 
820 	q->serial_crc = crc32_update(INITCRC32, (u_char *) serial, strlen(serial));
821 	q->when = io_now.tv_sec;
822 	q->counter = mcx->counter++;
823 
824 	RB_insert(mcx->backlog_fifo, q);
825 #ifdef DEBUG_RB
826 	fprintf(stderr, "EXT insert backlog_fifo %p\n", q);
827 #endif
828 	RB_insert(mcx->backlog_app_ctx, q);
829 #ifdef DEBUG_RB
830 	fprintf(stderr, "EXT insert backlog_app_ctx %p\n", q);
831 #endif
832 	RB_insert(mcx->backlog_serial, q);
833 #ifdef DEBUG_RB
834 	fprintf(stderr, "EXT insert backlog_serial %p\n", q);
835 #endif
836 
837 	mcx->backlog_cur++;
838 	if (mcx->backlog_cur > mcx->backlog_max)
839 	    mcx->backlog_max = mcx->backlog_cur;
840 	if (mcx->backlog_cur > mcx->backlog_max_p)
841 	    mcx->backlog_max_p = mcx->backlog_cur;
842     } else {
843 	/* First, look for active childs that are idle */
844 	for (i = 0; i < mcx->child_max && (!mcx->cx[i] || mcx->cx[i]->in_use); i++);
845 
846 	/* If none found: fork a new child process */
847 	if (i == mcx->child_max) {
848 	    for (i = 0; i < mcx->child_max && mcx->cx[i]; i++);
849 	    if (0 > fork_child(mcx, i))
850 		return MAVIS_DEFERRED;
851 	}
852 
853 	mcx->cx[i]->ac = *ac;
854 	*ac = NULL;
855 
856 	mcx->usage++;
857 	mcx->cx[i]->counter++;
858 	mcx->cx_stat[i].counter++;
859 	mcx->cx_stat[i].counter_p++;
860 	start_query(mcx->cx[i]);
861 
862 	if (!mcx->io_context_parent) {
863 	    rb_node_t *r;
864 
865 	    while (mcx->cx[i] && mcx->cx[i]->in_use)
866 		io_poll(mcx->io, -1);
867 
868 	    r = RB_first(mcx->outgoing);
869 	    if (r) {
870 		struct query *q = RB_payload(r, struct query *);
871 		*ac = q->ac;
872 		q->ac = NULL;
873 		res = q->result;
874 		RB_delete(mcx->outgoing, r);
875 #ifdef DEBUG_RB
876 		fprintf(stderr, "EXT delete outgoing %p\n", r);
877 #endif
878 	    } else
879 		res = MAVIS_IGNORE;
880 	}
881     }
882     return res;
883 }
884 
885 #define HAVE_mavis_cancel_in
mavis_cancel_in(mavis_ctx * mcx,void * app_ctx)886 static int mavis_cancel_in(mavis_ctx * mcx, void *app_ctx)
887 {
888     struct query q;
889     rb_node_t *r;
890     int res = MAVIS_DOWN;
891     int i;
892 
893     q.ac = av_new(NULL, app_ctx);
894 
895     if ((r = RB_search(mcx->backlog_app_ctx, &q))) {
896 	struct query *qp = RB_payload(r, struct query *);
897 	io_sched_pop(mcx->io, qp);
898 	mcx->backlog_cur--;
899 	RB_search_and_delete(mcx->backlog_app_ctx, qp);
900 	RB_search_and_delete(mcx->backlog_fifo, qp);
901 	RB_delete(mcx->backlog_serial, r);
902 #ifdef DEBUG_RB
903 	fprintf(stderr, "EXT delete backlog_serial %p\n", r);
904 #endif
905     } else if ((r = RB_search(mcx->outgoing, &q))) {
906 	struct query *qp = RB_payload(r, struct query *);
907 	io_sched_pop(mcx->io, qp);
908 	RB_delete(mcx->outgoing, r);
909 #ifdef DEBUG_RB
910 	fprintf(stderr, "EXT delete outgoing %p\n", r);
911 #endif
912 	res = MAVIS_FINAL;
913     }
914 
915     for (i = 0; i < mcx->child_max; i++)
916 	if (mcx->cx[i] && mcx->cx[i]->ac && mcx->cx[i]->ac->app_ctx == app_ctx) {
917 	    mcx->cx[i]->canceled = 1;
918 	    break;
919 	}
920 
921     av_free(q.ac);
922     return res;
923 }
924 
925 #define HAVE_mavis_recv_in
mavis_recv_in(mavis_ctx * mcx,av_ctx ** ac,void * app_ctx)926 static int mavis_recv_in(mavis_ctx * mcx, av_ctx ** ac, void *app_ctx)
927 {
928     struct query q;
929     rb_node_t *r;
930     int res = MAVIS_DOWN;
931 
932     DebugIn(DEBUG_MAVIS);
933 
934     q.ac = av_new(NULL, app_ctx);
935     r = RB_search(mcx->outgoing, &q);
936     av_free(q.ac);
937 
938     if (r) {
939 	struct query *qp = RB_payload(r, struct query *);
940 	res = qp->result;
941 	*ac = qp->ac;
942 	qp->ac = NULL;
943 #ifdef DEBUG_RB
944 	fprintf(stderr, "EXT delete outgoing %p\n", r);
945 #endif
946 	RB_delete(mcx->outgoing, r);
947     }
948 
949     DebugOut(DEBUG_MAVIS);
950     return res;
951 }
952 
953 #define HAVE_mavis_new
mavis_new(mavis_ctx * mcx)954 static void mavis_new(mavis_ctx * mcx)
955 {
956     if (mcx->io)
957 	mcx->child_min = 4, mcx->child_max = 20;
958     else
959 	mcx->child_min = 1, mcx->child_max = 1;
960     mcx->io_context_parent = mcx->io;
961 }
962 
963 #include "mavis_glue.c"
964