1 /*
2  * Stream filters related variables and functions.
3  *
4  * Copyright (C) 2015 Qualys Inc., Christopher Faulet <cfaulet@qualys.com>
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  *
11  */
12 
13 #include <haproxy/api.h>
14 #include <haproxy/buf-t.h>
15 #include <haproxy/cfgparse.h>
16 #include <haproxy/compression.h>
17 #include <haproxy/errors.h>
18 #include <haproxy/filters.h>
19 #include <haproxy/flt_http_comp.h>
20 #include <haproxy/http_ana.h>
21 #include <haproxy/http_htx.h>
22 #include <haproxy/htx.h>
23 #include <haproxy/namespace.h>
24 #include <haproxy/stream.h>
25 #include <haproxy/stream_interface.h>
26 #include <haproxy/tools.h>
27 #include <haproxy/trace.h>
28 
29 
30 #define TRACE_SOURCE &trace_strm
31 
32 /* Pool used to allocate filters */
33 DECLARE_STATIC_POOL(pool_head_filter, "filter", sizeof(struct filter));
34 
35 static int handle_analyzer_result(struct stream *s, struct channel *chn, unsigned int an_bit, int ret);
36 
37 /* - RESUME_FILTER_LOOP and RESUME_FILTER_END must always be used together.
38  *   The first one begins a loop and the seconds one ends it.
39  *
40  * - BREAK_EXECUTION must be used to break the loop and set the filter from
41  *   which to resume the next time.
42  *
43  *  Here is an example:
44  *
45  *    RESUME_FILTER_LOOP(stream, channel) {
46  *        ...
47  *        if (cond)
48  *             BREAK_EXECUTION(stream, channel, label);
49  *        ...
50  *    } RESUME_FILTER_END;
51  *    ...
52  *     label:
53  *    ...
54  *
55  */
56 #define RESUME_FILTER_LOOP(strm, chn)					\
57 	do {								\
58 		struct filter *filter;					\
59 									\
60 		if (strm_flt(strm)->current[CHN_IDX(chn)]) {	\
61 			filter = strm_flt(strm)->current[CHN_IDX(chn)]; \
62 			strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \
63 			goto resume_execution;				\
64 		}							\
65 									\
66 		list_for_each_entry(filter, &strm_flt(s)->filters, list) { \
67 		resume_execution:
68 
69 #define RESUME_FILTER_END					\
70 		}						\
71 	} while(0)
72 
73 #define BREAK_EXECUTION(strm, chn, label)				\
74 	do {								\
75 		strm_flt(strm)->current[CHN_IDX(chn)] = filter;	\
76 		goto label;						\
77 	} while (0)
78 
79 
80 /* List head of all known filter keywords */
81 static struct flt_kw_list flt_keywords = {
82 	.list = LIST_HEAD_INIT(flt_keywords.list)
83 };
84 
85 /*
86  * Registers the filter keyword list <kwl> as a list of valid keywords for next
87  * parsing sessions.
88  */
89 void
flt_register_keywords(struct flt_kw_list * kwl)90 flt_register_keywords(struct flt_kw_list *kwl)
91 {
92 	LIST_ADDQ(&flt_keywords.list, &kwl->list);
93 }
94 
95 /*
96  * Returns a pointer to the filter keyword <kw>, or NULL if not found. If the
97  * keyword is found with a NULL ->parse() function, then an attempt is made to
98  * find one with a valid ->parse() function. This way it is possible to declare
99  * platform-dependant, known keywords as NULL, then only declare them as valid
100  * if some options are met. Note that if the requested keyword contains an
101  * opening parenthesis, everything from this point is ignored.
102  */
103 struct flt_kw *
flt_find_kw(const char * kw)104 flt_find_kw(const char *kw)
105 {
106 	int index;
107 	const char *kwend;
108 	struct flt_kw_list *kwl;
109 	struct flt_kw *ret = NULL;
110 
111 	kwend = strchr(kw, '(');
112 	if (!kwend)
113 		kwend = kw + strlen(kw);
114 
115 	list_for_each_entry(kwl, &flt_keywords.list, list) {
116 		for (index = 0; kwl->kw[index].kw != NULL; index++) {
117 			if ((strncmp(kwl->kw[index].kw, kw, kwend - kw) == 0) &&
118 			    kwl->kw[index].kw[kwend-kw] == 0) {
119 				if (kwl->kw[index].parse)
120 					return &kwl->kw[index]; /* found it !*/
121 				else
122 					ret = &kwl->kw[index];  /* may be OK */
123 			}
124 		}
125 	}
126 	return ret;
127 }
128 
129 /*
130  * Dumps all registered "filter" keywords to the <out> string pointer. The
131  * unsupported keywords are only dumped if their supported form was not found.
132  */
133 void
flt_dump_kws(char ** out)134 flt_dump_kws(char **out)
135 {
136 	struct flt_kw_list *kwl;
137 	int index;
138 
139 	if (!out)
140 		return;
141 
142 	*out = NULL;
143 	list_for_each_entry(kwl, &flt_keywords.list, list) {
144 		for (index = 0; kwl->kw[index].kw != NULL; index++) {
145 			if (kwl->kw[index].parse ||
146 			    flt_find_kw(kwl->kw[index].kw) == &kwl->kw[index]) {
147 				memprintf(out, "%s[%4s] %s%s\n", *out ? *out : "",
148 				          kwl->scope,
149 				          kwl->kw[index].kw,
150 				          kwl->kw[index].parse ? "" : " (not supported)");
151 			}
152 		}
153 	}
154 }
155 
156 /*
157  * Lists the known filters on <out>
158  */
159 void
list_filters(FILE * out)160 list_filters(FILE *out)
161 {
162 	char *filters, *p, *f;
163 
164 	fprintf(out, "Available filters :\n");
165 	flt_dump_kws(&filters);
166 	for (p = filters; (f = strtok_r(p,"\n",&p));)
167 		fprintf(out, "\t%s\n", f);
168 	free(filters);
169 }
170 
171 /*
172  * Parses the "filter" keyword. All keywords must be handled by filters
173  * themselves
174  */
175 static int
parse_filter(char ** args,int section_type,struct proxy * curpx,struct proxy * defpx,const char * file,int line,char ** err)176 parse_filter(char **args, int section_type, struct proxy *curpx,
177 	     struct proxy *defpx, const char *file, int line, char **err)
178 {
179 	struct flt_conf *fconf = NULL;
180 
181 	/* Filter cannot be defined on a default proxy */
182 	if (curpx == defpx) {
183 		memprintf(err, "parsing [%s:%d] : %s is not allowed in a 'default' section.",
184 			  file, line, args[0]);
185 		return -1;
186 	}
187 	if (!strcmp(args[0], "filter")) {
188 		struct flt_kw *kw;
189 		int cur_arg;
190 
191 		if (!*args[1]) {
192 			memprintf(err,
193 				  "parsing [%s:%d] : missing argument for '%s' in %s '%s'.",
194 				  file, line, args[0], proxy_type_str(curpx), curpx->id);
195 			goto error;
196 		}
197 		fconf = calloc(1, sizeof(*fconf));
198 		if (!fconf) {
199 			memprintf(err, "'%s' : out of memory", args[0]);
200 			goto error;
201 		}
202 
203 		cur_arg = 1;
204 		kw = flt_find_kw(args[cur_arg]);
205 		if (kw) {
206 			if (!kw->parse) {
207 				memprintf(err, "parsing [%s:%d] : '%s' : "
208 					  "'%s' option is not implemented in this version (check build options).",
209 					  file, line, args[0], args[cur_arg]);
210 				goto error;
211 			}
212 			if (kw->parse(args, &cur_arg, curpx, fconf, err, kw->private) != 0) {
213 				if (err && *err)
214 					memprintf(err, "'%s' : '%s'",
215 						  args[0], *err);
216 				else
217 					memprintf(err, "'%s' : error encountered while processing '%s'",
218 						  args[0], args[cur_arg]);
219 				goto error;
220 			}
221 		}
222 		else {
223 			flt_dump_kws(err);
224 			indent_msg(err, 4);
225 			memprintf(err, "'%s' : unknown keyword '%s'.%s%s",
226 			          args[0], args[cur_arg],
227 			          err && *err ? " Registered keywords :" : "", err && *err ? *err : "");
228 			goto error;
229 		}
230 		if (*args[cur_arg]) {
231 			memprintf(err, "'%s %s' : unknown keyword '%s'.",
232 			          args[0], args[1], args[cur_arg]);
233 			goto error;
234 		}
235 		if (fconf->ops == NULL) {
236 			memprintf(err, "'%s %s' : no callbacks defined.",
237 			          args[0], args[1]);
238 			goto error;
239 		}
240 
241 		LIST_ADDQ(&curpx->filter_configs, &fconf->list);
242 	}
243 	return 0;
244 
245   error:
246 	free(fconf);
247 	return -1;
248 
249 
250 }
251 
252 /*
253  * Calls 'init' callback for all filters attached to a proxy. This happens after
254  * the configuration parsing. Filters can finish to fill their config. Returns
255  * (ERR_ALERT|ERR_FATAL) if an error occurs, 0 otherwise.
256  */
257 static int
flt_init(struct proxy * proxy)258 flt_init(struct proxy *proxy)
259 {
260 	struct flt_conf *fconf;
261 
262 	list_for_each_entry(fconf, &proxy->filter_configs, list) {
263 		if (fconf->ops->init && fconf->ops->init(proxy, fconf) < 0)
264 			return ERR_ALERT|ERR_FATAL;
265 	}
266 	return 0;
267 }
268 
269 /*
270  * Calls 'init_per_thread' callback for all filters attached to a proxy for each
271  * threads. This happens after the thread creation. Filters can finish to fill
272  * their config. Returns (ERR_ALERT|ERR_FATAL) if an error occurs, 0 otherwise.
273  */
274 static int
flt_init_per_thread(struct proxy * proxy)275 flt_init_per_thread(struct proxy *proxy)
276 {
277 	struct flt_conf *fconf;
278 
279 	list_for_each_entry(fconf, &proxy->filter_configs, list) {
280 		if (fconf->ops->init_per_thread && fconf->ops->init_per_thread(proxy, fconf) < 0)
281 			return ERR_ALERT|ERR_FATAL;
282 	}
283 	return 0;
284 }
285 
286 /* Calls flt_init() for all proxies, see above */
287 static int
flt_init_all()288 flt_init_all()
289 {
290 	struct proxy *px;
291 	int err_code = 0;
292 
293 	for (px = proxies_list; px; px = px->next) {
294 		if (px->disabled) {
295 			flt_deinit(px);
296 			continue;
297 		}
298 		err_code |= flt_init(px);
299 		if (err_code & (ERR_ABORT|ERR_FATAL)) {
300 			ha_alert("Failed to initialize filters for proxy '%s'.\n",
301 				 px->id);
302 			return err_code;
303 		}
304 	}
305 	return 0;
306 }
307 
308 /* Calls flt_init_per_thread() for all proxies, see above.  Be careful here, it
309  * returns 0 if an error occurred. This is the opposite of flt_init_all. */
310 static int
flt_init_all_per_thread()311 flt_init_all_per_thread()
312 {
313 	struct proxy *px;
314 	int err_code = 0;
315 
316 	for (px = proxies_list; px; px = px->next) {
317 		if (px->disabled)
318 			continue;
319 
320 		err_code = flt_init_per_thread(px);
321 		if (err_code & (ERR_ABORT|ERR_FATAL)) {
322 			ha_alert("Failed to initialize filters for proxy '%s' for thread %u.\n",
323 				 px->id, tid);
324 			return 0;
325 		}
326 	}
327 	return 1;
328 }
329 
330 /*
331  * Calls 'check' callback for all filters attached to a proxy. This happens
332  * after the configuration parsing but before filters initialization. Returns
333  * the number of encountered errors.
334  */
335 int
flt_check(struct proxy * proxy)336 flt_check(struct proxy *proxy)
337 {
338 	struct flt_conf *fconf;
339 	int err = 0;
340 
341 	err += check_implicit_http_comp_flt(proxy);
342 	list_for_each_entry(fconf, &proxy->filter_configs, list) {
343 		if (fconf->ops->check)
344 			err += fconf->ops->check(proxy, fconf);
345 	}
346 	return err;
347 }
348 
349 /*
350  * Calls 'denit' callback for all filters attached to a proxy. This happens when
351  * HAProxy is stopped.
352  */
353 void
flt_deinit(struct proxy * proxy)354 flt_deinit(struct proxy *proxy)
355 {
356 	struct flt_conf *fconf, *back;
357 
358 	list_for_each_entry_safe(fconf, back, &proxy->filter_configs, list) {
359 		if (fconf->ops->deinit)
360 			fconf->ops->deinit(proxy, fconf);
361 		LIST_DEL(&fconf->list);
362 		free(fconf);
363 	}
364 }
365 
366 /*
367  * Calls 'denit_per_thread' callback for all filters attached to a proxy for
368  * each threads. This happens before exiting a thread.
369  */
370 void
flt_deinit_per_thread(struct proxy * proxy)371 flt_deinit_per_thread(struct proxy *proxy)
372 {
373 	struct flt_conf *fconf, *back;
374 
375 	list_for_each_entry_safe(fconf, back, &proxy->filter_configs, list) {
376 		if (fconf->ops->deinit_per_thread)
377 			fconf->ops->deinit_per_thread(proxy, fconf);
378 	}
379 }
380 
381 
382 /* Calls flt_deinit_per_thread() for all proxies, see above */
383 static void
flt_deinit_all_per_thread()384 flt_deinit_all_per_thread()
385 {
386 	struct proxy *px;
387 
388 	for (px = proxies_list; px; px = px->next)
389 		flt_deinit_per_thread(px);
390 }
391 
392 /* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */
393 static int
flt_stream_add_filter(struct stream * s,struct flt_conf * fconf,unsigned int flags)394 flt_stream_add_filter(struct stream *s, struct flt_conf *fconf, unsigned int flags)
395 {
396 	struct filter *f;
397 
398 	if (IS_HTX_STRM(s) && !(fconf->flags & FLT_CFG_FL_HTX))
399 		return 0;
400 
401 	f = pool_alloc(pool_head_filter);
402 	if (!f) /* not enough memory */
403 		return -1;
404 	memset(f, 0, sizeof(*f));
405 	f->config = fconf;
406 	f->flags |= flags;
407 
408 	if (FLT_OPS(f)->attach) {
409 		int ret = FLT_OPS(f)->attach(s, f);
410 		if (ret <= 0) {
411 			pool_free(pool_head_filter, f);
412 			return ret;
413 		}
414 	}
415 
416 	LIST_ADDQ(&strm_flt(s)->filters, &f->list);
417 	strm_flt(s)->flags |= STRM_FLT_FL_HAS_FILTERS;
418 	return 0;
419 }
420 
421 /*
422  * Called when a stream is created. It attaches all frontend filters to the
423  * stream. Returns -1 if an error occurs, 0 otherwise.
424  */
425 int
flt_stream_init(struct stream * s)426 flt_stream_init(struct stream *s)
427 {
428 	struct flt_conf *fconf;
429 
430 	memset(strm_flt(s), 0, sizeof(*strm_flt(s)));
431 	LIST_INIT(&strm_flt(s)->filters);
432 	list_for_each_entry(fconf, &strm_fe(s)->filter_configs, list) {
433 		if (flt_stream_add_filter(s, fconf, 0) < 0)
434 			return -1;
435 	}
436 	return 0;
437 }
438 
439 /*
440  * Called when a stream is closed or when analyze ends (For an HTTP stream, this
441  * happens after each request/response exchange). When analyze ends, backend
442  * filters are removed. When the stream is closed, all filters attached to the
443  * stream are removed.
444  */
445 void
flt_stream_release(struct stream * s,int only_backend)446 flt_stream_release(struct stream *s, int only_backend)
447 {
448 	struct filter *filter, *back;
449 
450 	list_for_each_entry_safe(filter, back, &strm_flt(s)->filters, list) {
451 		if (!only_backend || (filter->flags & FLT_FL_IS_BACKEND_FILTER)) {
452 			if (FLT_OPS(filter)->detach)
453 				FLT_OPS(filter)->detach(s, filter);
454 			LIST_DEL(&filter->list);
455 			pool_free(pool_head_filter, filter);
456 		}
457 	}
458 	if (LIST_ISEMPTY(&strm_flt(s)->filters))
459 		strm_flt(s)->flags &= ~STRM_FLT_FL_HAS_FILTERS;
460 }
461 
462 /*
463  * Calls 'stream_start' for all filters attached to a stream. This happens when
464  * the stream is created, just after calling flt_stream_init
465  * function. Returns -1 if an error occurs, 0 otherwise.
466  */
467 int
flt_stream_start(struct stream * s)468 flt_stream_start(struct stream *s)
469 {
470 	struct filter *filter;
471 
472 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
473 		if (FLT_OPS(filter)->stream_start && FLT_OPS(filter)->stream_start(s, filter) < 0)
474 			return -1;
475 	}
476 	if (strm_li(s) && (strm_li(s)->analysers & AN_REQ_FLT_START_FE)) {
477 		s->req.flags |= CF_FLT_ANALYZE;
478 		s->req.analysers |= AN_REQ_FLT_END;
479 	}
480 	return 0;
481 }
482 
483 /*
484  * Calls 'stream_stop' for all filters attached to a stream. This happens when
485  * the stream is stopped, just before calling flt_stream_release function.
486  */
487 void
flt_stream_stop(struct stream * s)488 flt_stream_stop(struct stream *s)
489 {
490 	struct filter *filter;
491 
492 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
493 		if (FLT_OPS(filter)->stream_stop)
494 			FLT_OPS(filter)->stream_stop(s, filter);
495 	}
496 }
497 
498 /*
499  * Calls 'check_timeouts' for all filters attached to a stream. This happens when
500  * the stream is woken up because of expired timer.
501  */
502 void
flt_stream_check_timeouts(struct stream * s)503 flt_stream_check_timeouts(struct stream *s)
504 {
505 	struct filter *filter;
506 
507 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
508 		if (FLT_OPS(filter)->check_timeouts)
509 			FLT_OPS(filter)->check_timeouts(s, filter);
510 	}
511 }
512 
513 /*
514  * Called when a backend is set for a stream. If the frontend and the backend
515  * are not the same, this function attaches all backend filters to the
516  * stream. Returns -1 if an error occurs, 0 otherwise.
517  */
518 int
flt_set_stream_backend(struct stream * s,struct proxy * be)519 flt_set_stream_backend(struct stream *s, struct proxy *be)
520 {
521 	struct flt_conf *fconf;
522 	struct filter   *filter;
523 
524 	if (strm_fe(s) == be)
525 		goto end;
526 
527 	list_for_each_entry(fconf, &be->filter_configs, list) {
528 		if (flt_stream_add_filter(s, fconf, FLT_FL_IS_BACKEND_FILTER) < 0)
529 			return -1;
530 	}
531 
532   end:
533 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
534 		if (FLT_OPS(filter)->stream_set_backend &&
535 		    FLT_OPS(filter)->stream_set_backend(s, filter, be) < 0)
536 			return -1;
537 	}
538 	if (be->be_req_ana & AN_REQ_FLT_START_BE) {
539 		s->req.flags |= CF_FLT_ANALYZE;
540 		s->req.analysers |= AN_REQ_FLT_END;
541 	}
542 	if ((strm_fe(s)->fe_rsp_ana | be->be_rsp_ana) & (AN_RES_FLT_START_FE|AN_RES_FLT_START_BE)) {
543 		s->res.flags |= CF_FLT_ANALYZE;
544 		s->res.analysers |= AN_RES_FLT_END;
545 	}
546 
547 	return 0;
548 }
549 
550 
551 /*
552  * Calls 'http_end' callback for all filters attached to a stream. All filters
553  * are called here, but only if there is at least one "data" filter. This
554  * functions is called when all data were parsed and forwarded. 'http_end'
555  * callback is resumable, so this function returns a negative value if an error
556  * occurs, 0 if it needs to wait for some reason, any other value otherwise.
557  */
558 int
flt_http_end(struct stream * s,struct http_msg * msg)559 flt_http_end(struct stream *s, struct http_msg *msg)
560 {
561 	unsigned long long *strm_off = &FLT_STRM_OFF(s, msg->chn);
562 	unsigned int offset = 0;
563 	int ret = 1;
564 
565 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s, s->txn, msg);
566 	RESUME_FILTER_LOOP(s, msg->chn) {
567 		unsigned long long flt_off = FLT_OFF(filter, msg->chn);
568 		offset = flt_off - *strm_off;
569 
570 		/* Call http_end for data filters only. But the filter offset is
571 		 * still valid for all filters
572 		 . */
573 		if (!IS_DATA_FILTER(filter, msg->chn))
574 			continue;
575 
576 		if (FLT_OPS(filter)->http_end) {
577 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
578 			ret = FLT_OPS(filter)->http_end(s, filter, msg);
579 			if (ret <= 0)
580 				BREAK_EXECUTION(s, msg->chn, end);
581 		}
582 	} RESUME_FILTER_END;
583 
584 	c_adv(msg->chn, offset);
585 	*strm_off += offset;
586 
587 end:
588 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
589 	return ret;
590 }
591 
592 /*
593  * Calls 'http_reset' callback for all filters attached to a stream. This
594  * happens when a 100-continue response is received.
595  */
596 void
flt_http_reset(struct stream * s,struct http_msg * msg)597 flt_http_reset(struct stream *s, struct http_msg *msg)
598 {
599 	struct filter *filter;
600 
601 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s, s->txn, msg);
602 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
603 		if (FLT_OPS(filter)->http_reset) {
604 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
605 			FLT_OPS(filter)->http_reset(s, filter, msg);
606 		}
607 	}
608 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
609 }
610 
611 /*
612  * Calls 'http_reply' callback for all filters attached to a stream when HA
613  * decides to stop the HTTP message processing.
614  */
615 void
flt_http_reply(struct stream * s,short status,const struct buffer * msg)616 flt_http_reply(struct stream *s, short status, const struct buffer *msg)
617 {
618 	struct filter *filter;
619 
620 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s, s->txn, msg);
621 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
622 		if (FLT_OPS(filter)->http_reply) {
623 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
624 			FLT_OPS(filter)->http_reply(s, filter, status, msg);
625 		}
626 	}
627 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
628 }
629 
630 /*
631  * Calls 'http_payload' callback for all "data" filters attached to a
632  * stream. This function is called when some data can be forwarded in the
633  * AN_REQ_HTTP_XFER_BODY and AN_RES_HTTP_XFER_BODY analyzers. It takes care to
634  * update the filters and the stream offset to be sure that a filter cannot
635  * forward more data than its predecessors. A filter can choose to not forward
636  * all data. Returns a negative value if an error occurs, else the number of
637  * forwarded bytes.
638  */
639 int
flt_http_payload(struct stream * s,struct http_msg * msg,unsigned int len)640 flt_http_payload(struct stream *s, struct http_msg *msg, unsigned int len)
641 {
642 	struct filter *filter;
643 	unsigned long long *strm_off = &FLT_STRM_OFF(s, msg->chn);
644 	unsigned int out = co_data(msg->chn);
645 	int ret, data;
646 
647 	strm_flt(s)->flags &= ~STRM_FLT_FL_HOLD_HTTP_HDRS;
648 
649 	ret = data = len - out;
650 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s, s->txn, msg);
651 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
652 		unsigned long long *flt_off = &FLT_OFF(filter, msg->chn);
653 		unsigned int offset = *flt_off - *strm_off;
654 
655 		/* Call http_payload for filters only. Forward all data for
656 		 * others and update the filter offset
657 		 */
658 		if (!IS_DATA_FILTER(filter, msg->chn)) {
659 			*flt_off += data - offset;
660 			continue;
661 		}
662 
663 		if (FLT_OPS(filter)->http_payload) {
664 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
665 			ret = FLT_OPS(filter)->http_payload(s, filter, msg, out + offset, data - offset);
666 			if (ret < 0)
667 				goto end;
668 			data = ret + *flt_off - *strm_off;
669 			*flt_off += ret;
670 		}
671 	}
672 
673 	/* If nothing was forwarded yet, we take care to hold the headers if
674 	 * following conditions are met :
675 	 *
676 	 *  - *strm_off == 0 (nothing forwarded yet)
677 	 *  - ret == 0       (no data forwarded at all on this turn)
678 	 *  - STRM_FLT_FL_HOLD_HTTP_HDRS flag set (at least one filter want to hold the headers)
679 	 *
680 	 * Be careful, STRM_FLT_FL_HOLD_HTTP_HDRS is removed before each http_payload loop.
681 	 * Thus, it must explicitly be set when necessary. We must do that to hold the headers
682 	 * when there is no payload.
683 	 */
684 	if (!ret && !*strm_off && (strm_flt(s)->flags & STRM_FLT_FL_HOLD_HTTP_HDRS))
685 		goto end;
686 
687 	ret = data;
688 	*strm_off += ret;
689  end:
690 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
691 	return ret;
692 }
693 
694 /*
695  * Calls 'channel_start_analyze' callback for all filters attached to a
696  * stream. This function is called when we start to analyze a request or a
697  * response. For frontend filters, it is called before all other analyzers. For
698  * backend ones, it is called before all backend
699  * analyzers. 'channel_start_analyze' callback is resumable, so this function
700  * returns 0 if an error occurs or if it needs to wait, any other value
701  * otherwise.
702  */
703 int
flt_start_analyze(struct stream * s,struct channel * chn,unsigned int an_bit)704 flt_start_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
705 {
706 	int ret = 1;
707 
708 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
709 
710 	/* If this function is called, this means there is at least one filter,
711 	 * so we do not need to check the filter list's emptiness. */
712 
713 	/* Set flag on channel to tell that the channel is filtered */
714 	chn->flags |= CF_FLT_ANALYZE;
715 	chn->analysers |= ((chn->flags & CF_ISRESP) ? AN_RES_FLT_END : AN_REQ_FLT_END);
716 
717 	RESUME_FILTER_LOOP(s, chn) {
718 		if (!(chn->flags & CF_ISRESP)) {
719 			if (an_bit == AN_REQ_FLT_START_BE &&
720 			    !(filter->flags & FLT_FL_IS_BACKEND_FILTER))
721 				continue;
722 		}
723 		else {
724 			if (an_bit == AN_RES_FLT_START_BE &&
725 			    !(filter->flags & FLT_FL_IS_BACKEND_FILTER))
726 				continue;
727 		}
728 
729 		FLT_OFF(filter, chn) = 0;
730 		if (FLT_OPS(filter)->channel_start_analyze) {
731 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
732 			ret = FLT_OPS(filter)->channel_start_analyze(s, filter, chn);
733 			if (ret <= 0)
734 				BREAK_EXECUTION(s, chn, end);
735 		}
736 	} RESUME_FILTER_END;
737 
738  end:
739 	ret = handle_analyzer_result(s, chn, an_bit, ret);
740 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
741 	return ret;
742 }
743 
744 /*
745  * Calls 'channel_pre_analyze' callback for all filters attached to a
746  * stream. This function is called BEFORE each analyzer attached to a channel,
747  * expects analyzers responsible for data sending. 'channel_pre_analyze'
748  * callback is resumable, so this function returns 0 if an error occurs or if it
749  * needs to wait, any other value otherwise.
750  *
751  * Note this function can be called many times for the same analyzer. In fact,
752  * it is called until the analyzer finishes its processing.
753  */
754 int
flt_pre_analyze(struct stream * s,struct channel * chn,unsigned int an_bit)755 flt_pre_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
756 {
757 	int ret = 1;
758 
759 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
760 
761 	RESUME_FILTER_LOOP(s, chn) {
762 		if (FLT_OPS(filter)->channel_pre_analyze && (filter->pre_analyzers & an_bit)) {
763 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
764 			ret = FLT_OPS(filter)->channel_pre_analyze(s, filter, chn, an_bit);
765 			if (ret <= 0)
766 				BREAK_EXECUTION(s, chn, check_result);
767 			filter->pre_analyzers &= ~an_bit;
768 		}
769 	} RESUME_FILTER_END;
770 
771  check_result:
772 	ret = handle_analyzer_result(s, chn, 0, ret);
773 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
774 	return ret;
775 }
776 
777 /*
778  * Calls 'channel_post_analyze' callback for all filters attached to a
779  * stream. This function is called AFTER each analyzer attached to a channel,
780  * expects analyzers responsible for data sending. 'channel_post_analyze'
781  * callback is NOT resumable, so this function returns a 0 if an error occurs,
782  * any other value otherwise.
783  *
784  * Here, AFTER means when the analyzer finishes its processing.
785  */
786 int
flt_post_analyze(struct stream * s,struct channel * chn,unsigned int an_bit)787 flt_post_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
788 {
789 	struct filter *filter;
790 	int            ret = 1;
791 
792 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
793 
794 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
795 		if (FLT_OPS(filter)->channel_post_analyze &&  (filter->post_analyzers & an_bit)) {
796 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
797 			ret = FLT_OPS(filter)->channel_post_analyze(s, filter, chn, an_bit);
798 			if (ret < 0)
799 				break;
800 			filter->post_analyzers &= ~an_bit;
801 		}
802 	}
803 	ret = handle_analyzer_result(s, chn, 0, ret);
804 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
805 	return ret;
806 }
807 
808 /*
809  * This function is the AN_REQ/RES_FLT_HTTP_HDRS analyzer, used to filter HTTP
810  * headers or a request or a response. Returns 0 if an error occurs or if it
811  * needs to wait, any other value otherwise.
812  */
813 int
flt_analyze_http_headers(struct stream * s,struct channel * chn,unsigned int an_bit)814 flt_analyze_http_headers(struct stream *s, struct channel *chn, unsigned int an_bit)
815 {
816 	struct http_msg *msg;
817 	int              ret = 1;
818 
819 	msg = ((chn->flags & CF_ISRESP) ? &s->txn->rsp : &s->txn->req);
820 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s, s->txn, msg);
821 
822 	RESUME_FILTER_LOOP(s, chn) {
823 		if (FLT_OPS(filter)->http_headers) {
824 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
825 			ret = FLT_OPS(filter)->http_headers(s, filter, msg);
826 			if (ret <= 0)
827 				BREAK_EXECUTION(s, chn, check_result);
828 		}
829 	} RESUME_FILTER_END;
830 
831 	if (HAS_DATA_FILTERS(s, chn)) {
832 		size_t data = http_get_hdrs_size(htxbuf(&chn->buf));
833 		struct filter *f;
834 
835 		list_for_each_entry(f, &strm_flt(s)->filters, list)
836 			FLT_OFF(f, chn) = data;
837 	}
838 
839  check_result:
840 	ret = handle_analyzer_result(s, chn, an_bit, ret);
841 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA|STRM_EV_FLT_ANA, s);
842 	return ret;
843 }
844 
845 /*
846  * Calls 'channel_end_analyze' callback for all filters attached to a
847  * stream. This function is called when we stop to analyze a request or a
848  * response. It is called after all other analyzers. 'channel_end_analyze'
849  * callback is resumable, so this function returns 0 if an error occurs or if it
850  * needs to wait, any other value otherwise.
851  */
852 int
flt_end_analyze(struct stream * s,struct channel * chn,unsigned int an_bit)853 flt_end_analyze(struct stream *s, struct channel *chn, unsigned int an_bit)
854 {
855 	int ret = 1;
856 
857 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
858 
859 	/* Check if all filters attached on the stream have finished their
860 	 * processing on this channel. */
861 	if (!(chn->flags & CF_FLT_ANALYZE))
862 		goto sync;
863 
864 	RESUME_FILTER_LOOP(s, chn) {
865 		FLT_OFF(filter, chn) = 0;
866 		unregister_data_filter(s, chn, filter);
867 
868 		if (FLT_OPS(filter)->channel_end_analyze) {
869 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
870 			ret = FLT_OPS(filter)->channel_end_analyze(s, filter, chn);
871 			if (ret <= 0)
872 				BREAK_EXECUTION(s, chn, end);
873 		}
874 	} RESUME_FILTER_END;
875 
876  end:
877 	/* We don't remove yet this analyzer because we need to synchronize the
878 	 * both channels. So here, we just remove the flag CF_FLT_ANALYZE. */
879 	ret = handle_analyzer_result(s, chn, 0, ret);
880 	if (ret) {
881 		chn->flags &= ~CF_FLT_ANALYZE;
882 
883 		/* Pretend there is an activity on both channels. Flag on the
884 		 * current one will be automatically removed, so only the other
885 		 * one will remain. This is a way to be sure that
886 		 * 'channel_end_analyze' callback will have a chance to be
887 		 * called at least once for the other side to finish the current
888 		 * processing. Of course, this is the filter responsibility to
889 		 * wakeup the stream if it choose to loop on this callback. */
890 		s->req.flags |= CF_WAKE_ONCE;
891 		s->res.flags |= CF_WAKE_ONCE;
892 	}
893 
894 
895  sync:
896 	/* Now we can check if filters have finished their work on the both
897 	 * channels */
898 	if (!(s->req.flags & CF_FLT_ANALYZE) && !(s->res.flags & CF_FLT_ANALYZE)) {
899 		/* Sync channels by removing this analyzer for the both channels */
900 		s->req.analysers &= ~AN_REQ_FLT_END;
901 		s->res.analysers &= ~AN_RES_FLT_END;
902 
903 		/* Remove backend filters from the list */
904 		flt_stream_release(s, 1);
905 		DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
906 	}
907 	else {
908 		DBG_TRACE_DEVEL("waiting for sync", STRM_EV_STRM_ANA|STRM_EV_FLT_ANA, s);
909 	}
910 	return ret;
911 }
912 
913 
914 /*
915  * Calls 'tcp_payload' callback for all "data" filters attached to a
916  * stream. This function is called when some data can be forwarded in the
917  * AN_REQ_FLT_XFER_BODY and AN_RES_FLT_XFER_BODY analyzers. It takes care to
918  * update the filters and the stream offset to be sure that a filter cannot
919  * forward more data than its predecessors. A filter can choose to not forward
920  * all data. Returns a negative value if an error occurs, else the number of
921  * forwarded bytes.
922  */
923 int
flt_tcp_payload(struct stream * s,struct channel * chn,unsigned int len)924 flt_tcp_payload(struct stream *s, struct channel *chn, unsigned int len)
925 {
926 	struct filter *filter;
927 	unsigned long long *strm_off = &FLT_STRM_OFF(s, chn);
928 	unsigned int out = co_data(chn);
929 	int ret, data;
930 
931 	ret = data = len - out;
932 	DBG_TRACE_ENTER(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
933 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
934 		unsigned long long *flt_off = &FLT_OFF(filter, chn);
935 		unsigned int offset = *flt_off - *strm_off;
936 
937 		/* Call tcp_payload for filters only. Forward all data for
938 		 * others and update the filter offset
939 		 */
940 		if (!IS_DATA_FILTER(filter, chn)) {
941 			*flt_off += data - offset;
942 			continue;
943 		}
944 
945 		if (FLT_OPS(filter)->tcp_payload) {
946 
947 			DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
948 			ret = FLT_OPS(filter)->tcp_payload(s, filter, chn, out + offset, data - offset);
949 			if (ret < 0)
950 				goto end;
951 			data = ret + *flt_off - *strm_off;
952 			*flt_off += ret;
953 		}
954 	}
955 
956 	/* Only forward data if the last filter decides to forward something */
957 	if (ret > 0) {
958 		ret = data;
959 		*strm_off += ret;
960 	}
961  end:
962 	DBG_TRACE_LEAVE(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
963 	return ret;
964 }
965 
966 /*
967  * Called when TCP data must be filtered on a channel. This function is the
968  * AN_REQ/RES_FLT_XFER_DATA analyzer. When called, it is responsible to forward
969  * data when the proxy is not in http mode. Behind the scene, it calls
970  * consecutively 'tcp_data' and 'tcp_forward_data' callbacks for all "data"
971  * filters attached to a stream. Returns 0 if an error occurs or if it needs to
972  * wait, any other value otherwise.
973  */
974 int
flt_xfer_data(struct stream * s,struct channel * chn,unsigned int an_bit)975 flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit)
976 {
977 	unsigned int len;
978 	int ret = 1;
979 
980 	DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
981 
982 	/* If there is no "data" filters, we do nothing */
983 	if (!HAS_DATA_FILTERS(s, chn))
984 		goto end;
985 
986 	/* Be sure that the output is still opened. Else we stop the data
987 	 * filtering. */
988 	if ((chn->flags & (CF_READ_ERROR|CF_READ_TIMEOUT|CF_WRITE_ERROR|CF_WRITE_TIMEOUT)) ||
989 	    ((chn->flags & CF_SHUTW) && (chn->to_forward || co_data(chn))))
990 		goto end;
991 
992 	if (s->flags & SF_HTX) {
993 		struct htx *htx = htxbuf(&chn->buf);
994 		len = htx->data;
995 	}
996 	else
997 		len = c_data(chn);
998 
999 	ret = flt_tcp_payload(s, chn, len);
1000 	if (ret < 0)
1001 		goto end;
1002 	c_adv(chn, ret);
1003 
1004 	/* Stop waiting data if the input in closed and no data is pending or if
1005 	 * the output is closed. */
1006 	if (chn->flags & CF_SHUTW) {
1007 		ret = 1;
1008 		goto end;
1009 	}
1010 	if (chn->flags & CF_SHUTR) {
1011 		if (((s->flags & SF_HTX) && htx_is_empty(htxbuf(&chn->buf))) || c_empty(chn)) {
1012 			ret = 1;
1013 			goto end;
1014 		}
1015 	}
1016 
1017 	/* Wait for data */
1018 	DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
1019 	return 0;
1020  end:
1021 	/* Terminate the data filtering. If <ret> is negative, an error was
1022 	 * encountered during the filtering. */
1023 	ret = handle_analyzer_result(s, chn, an_bit, ret);
1024 	DBG_TRACE_LEAVE(STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
1025 	return ret;
1026 }
1027 
1028 /*
1029  * Handles result of filter's analyzers. It returns 0 if an error occurs or if
1030  * it needs to wait, any other value otherwise.
1031  */
1032 static int
handle_analyzer_result(struct stream * s,struct channel * chn,unsigned int an_bit,int ret)1033 handle_analyzer_result(struct stream *s, struct channel *chn,
1034 		       unsigned int an_bit, int ret)
1035 {
1036 	int finst;
1037 	int status = 0;
1038 
1039 	if (ret < 0)
1040 		goto return_bad_req;
1041 	else if (!ret)
1042 		goto wait;
1043 
1044 	/* End of job, return OK */
1045 	if (an_bit) {
1046 		chn->analysers  &= ~an_bit;
1047 		chn->analyse_exp = TICK_ETERNITY;
1048 	}
1049 	return 1;
1050 
1051  return_bad_req:
1052 	/* An error occurs */
1053 	channel_abort(&s->req);
1054 	channel_abort(&s->res);
1055 
1056 	if (!(chn->flags & CF_ISRESP)) {
1057 		s->req.analysers &= AN_REQ_FLT_END;
1058 		finst = SF_FINST_R;
1059 		status = 400;
1060 		/* FIXME: incr counters */
1061 	}
1062 	else {
1063 		s->res.analysers &= AN_RES_FLT_END;
1064 		finst = SF_FINST_H;
1065 		status = 502;
1066 		/* FIXME: incr counters */
1067 	}
1068 
1069 	if (IS_HTX_STRM(s)) {
1070 		/* Do not do that when we are waiting for the next request */
1071 		if (s->txn->status > 0)
1072 			http_reply_and_close(s, s->txn->status, NULL);
1073 		else {
1074 			s->txn->status = status;
1075 			http_reply_and_close(s, status, http_error_message(s));
1076 		}
1077 	}
1078 
1079 	if (!(s->flags & SF_ERR_MASK))
1080 		s->flags |= SF_ERR_PRXCOND;
1081 	if (!(s->flags & SF_FINST_MASK))
1082 		s->flags |= finst;
1083 	DBG_TRACE_DEVEL("leaving on error", STRM_EV_FLT_ANA|STRM_EV_FLT_ERR, s);
1084 	return 0;
1085 
1086  wait:
1087 	if (!(chn->flags & CF_ISRESP))
1088 		channel_dont_connect(chn);
1089 	DBG_TRACE_DEVEL("wairing for more data", STRM_EV_FLT_ANA, s);
1090 	return 0;
1091 }
1092 
1093 
1094 /* Note: must not be declared <const> as its list will be overwritten.
1095  * Please take care of keeping this list alphabetically sorted, doing so helps
1096  * all code contributors.
1097  * Optional keywords are also declared with a NULL ->parse() function so that
1098  * the config parser can report an appropriate error when a known keyword was
1099  * not enabled. */
1100 static struct cfg_kw_list cfg_kws = {ILH, {
1101 		{ CFG_LISTEN, "filter", parse_filter },
1102 		{ 0, NULL, NULL },
1103 	}
1104 };
1105 
1106 INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
1107 
1108 REGISTER_POST_CHECK(flt_init_all);
1109 REGISTER_PER_THREAD_INIT(flt_init_all_per_thread);
1110 REGISTER_PER_THREAD_DEINIT(flt_deinit_all_per_thread);
1111 
1112 /*
1113  * Local variables:
1114  *  c-indent-level: 8
1115  *  c-basic-offset: 8
1116  * End:
1117  */
1118