1 /*	$NetBSD: bufferevent_filter.c,v 1.1.1.1 2013/04/11 16:43:26 christos Exp $	*/
2 /*
3  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include <sys/types.h>
31 
32 #include "event2/event-config.h"
33 #include <sys/cdefs.h>
34 __RCSID("$NetBSD: bufferevent_filter.c,v 1.1.1.1 2013/04/11 16:43:26 christos Exp $");
35 
36 #ifdef _EVENT_HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39 
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #ifdef _EVENT_HAVE_STDARG_H
45 #include <stdarg.h>
46 #endif
47 
48 #ifdef WIN32
49 #include <winsock2.h>
50 #endif
51 
52 #include "event2/util.h"
53 #include "event2/bufferevent.h"
54 #include "event2/buffer.h"
55 #include "event2/bufferevent_struct.h"
56 #include "event2/event.h"
57 #include "log-internal.h"
58 #include "mm-internal.h"
59 #include "bufferevent-internal.h"
60 #include "util-internal.h"
61 
/* prototypes */

/* Operations installed in bufferevent_ops_filter. */
static int be_filter_enable(struct bufferevent *, short);
static int be_filter_disable(struct bufferevent *, short);
static void be_filter_destruct(struct bufferevent *);

/* Callbacks that bufferevent_filter_new() registers on the underlying
 * bufferevent. */
static void be_filter_readcb(struct bufferevent *, void *);
static void be_filter_writecb(struct bufferevent *, void *);
static void be_filter_eventcb(struct bufferevent *, short, void *);
/* Remaining operations installed in bufferevent_ops_filter. */
static int be_filter_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

/* Callback that bufferevent_filter_new() attaches to the filtering
 * bufferevent's own output evbuffer. */
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *info, void *arg);
76 
/** State of a filtering bufferevent: a bufferevent whose data is passed
 * through filter callbacks on its way to and from another ("underlying")
 * bufferevent. */
struct bufferevent_filtered {
	/** Common bufferevent state.  upcast() and downcast() convert between
	 * a pointer to bev.bev and a pointer to this structure. */
	struct bufferevent_private bev;

	/** The bufferevent that we read/write filtered data from/to. */
	struct bufferevent *underlying;
	/** A callback on our outbuf to notice when somebody adds data */
	struct evbuffer_cb_entry *outbuf_cb;
	/** True iff we have received an EOF callback from the underlying
	 * bufferevent.  When set, be_filter_readcb() runs the input filter in
	 * BEV_FINISHED mode instead of BEV_NORMAL.
	 * NOTE(review): nothing in this file ever sets this flag -- confirm
	 * whether that is intended. */
	unsigned got_eof;

	/** Function to free context when we're done.  May be NULL; otherwise
	 * be_filter_destruct() calls it with `context`. */
	void (*free_context)(void *);
	/** Input filter: moves data from underlying->input to our input.
	 * Never NULL; bufferevent_filter_new() substitutes be_null_filter. */
	bufferevent_filter_cb process_in;
	/** Output filter: moves data from our output to underlying->output.
	 * Never NULL; bufferevent_filter_new() substitutes be_null_filter. */
	bufferevent_filter_cb process_out;
	/** User-supplied argument to the filters. */
	void *context;
};
97 
/** Operations table for filtering bufferevents.  upcast() compares a
 * bufferevent's be_ops pointer with the address of this table to recognize
 * bufferevents created by bufferevent_filter_new(). */
const struct bufferevent_ops bufferevent_ops_filter = {
	"filter",
	/* Offset of the embedded struct bufferevent inside
	 * struct bufferevent_filtered. */
	evutil_offsetof(struct bufferevent_filtered, bev.bev),
	be_filter_enable,
	be_filter_disable,
	be_filter_destruct,
	/* Shared helper; not defined in this file. */
	_bufferevent_generic_adj_timeouts,
	be_filter_flush,
	be_filter_ctrl,
};
108 
109 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
110  * return that bufferevent_filtered. Returns NULL otherwise.*/
111 static inline struct bufferevent_filtered *
upcast(struct bufferevent * bev)112 upcast(struct bufferevent *bev)
113 {
114 	struct bufferevent_filtered *bev_f;
115 	if (bev->be_ops != &bufferevent_ops_filter)
116 		return NULL;
117 	bev_f = (void*)( ((char*)bev) -
118 			 evutil_offsetof(struct bufferevent_filtered, bev.bev));
119 	EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
120 	return bev_f;
121 }
122 
/* Inverse of upcast(): yields a pointer to the struct bufferevent embedded
 * in the bufferevent_filtered that bev_f points to. */
#define downcast(bev_f) (&(bev_f)->bev.bev)
124 
125 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
126  * over its high watermark such that we should not write to it in a given
127  * flush mode. */
128 static int
be_underlying_writebuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)129 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
130     enum bufferevent_flush_mode state)
131 {
132 	struct bufferevent *u = bevf->underlying;
133 	return state == BEV_NORMAL &&
134 	    u->wm_write.high &&
135 	    evbuffer_get_length(u->output) >= u->wm_write.high;
136 }
137 
138 /** Return 1 if our input buffer is at or over its high watermark such that we
139  * should not write to it in a given flush mode. */
140 static int
be_readbuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)141 be_readbuf_full(struct bufferevent_filtered *bevf,
142     enum bufferevent_flush_mode state)
143 {
144 	struct bufferevent *bufev = downcast(bevf);
145 	return state == BEV_NORMAL &&
146 	    bufev->wm_read.high &&
147 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
148 }
149 
150 
151 /* Filter to use when we're created with a NULL filter. */
152 static enum bufferevent_filter_result
be_null_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)153 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
154 	       enum bufferevent_flush_mode state, void *ctx)
155 {
156 	(void)state;
157 	if (evbuffer_remove_buffer(src, dst, lim) == 0)
158 		return BEV_OK;
159 	else
160 		return BEV_ERROR;
161 }
162 
163 struct bufferevent *
bufferevent_filter_new(struct bufferevent * underlying,bufferevent_filter_cb input_filter,bufferevent_filter_cb output_filter,int options,void (* free_context)(void *),void * ctx)164 bufferevent_filter_new(struct bufferevent *underlying,
165 		       bufferevent_filter_cb input_filter,
166 		       bufferevent_filter_cb output_filter,
167 		       int options,
168 		       void (*free_context)(void *),
169 		       void *ctx)
170 {
171 	struct bufferevent_filtered *bufev_f;
172 	int tmp_options = options & ~BEV_OPT_THREADSAFE;
173 
174 	if (!underlying)
175 		return NULL;
176 
177 	if (!input_filter)
178 		input_filter = be_null_filter;
179 	if (!output_filter)
180 		output_filter = be_null_filter;
181 
182 	bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
183 	if (!bufev_f)
184 		return NULL;
185 
186 	if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
187 				    &bufferevent_ops_filter, tmp_options) < 0) {
188 		mm_free(bufev_f);
189 		return NULL;
190 	}
191 	if (options & BEV_OPT_THREADSAFE) {
192 		bufferevent_enable_locking(downcast(bufev_f), NULL);
193 	}
194 
195 	bufev_f->underlying = underlying;
196 
197 	bufev_f->process_in = input_filter;
198 	bufev_f->process_out = output_filter;
199 	bufev_f->free_context = free_context;
200 	bufev_f->context = ctx;
201 
202 	bufferevent_setcb(bufev_f->underlying,
203 	    be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
204 
205 	bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
206 	   bufferevent_filtered_outbuf_cb, bufev_f);
207 
208 	_bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
209 	bufferevent_incref(underlying);
210 
211 	bufferevent_enable(underlying, EV_READ|EV_WRITE);
212 	bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ);
213 
214 	return downcast(bufev_f);
215 }
216 
217 static void
be_filter_destruct(struct bufferevent * bev)218 be_filter_destruct(struct bufferevent *bev)
219 {
220 	struct bufferevent_filtered *bevf = upcast(bev);
221 	EVUTIL_ASSERT(bevf);
222 	if (bevf->free_context)
223 		bevf->free_context(bevf->context);
224 
225 	if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
226 		/* Yes, there is also a decref in bufferevent_decref.
227 		 * That decref corresponds to the incref when we set
228 		 * underlying for the first time.  This decref is an
229 		 * extra one to remove the last reference.
230 		 */
231 		if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
232 			event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
233 			    "bufferevent with too few references");
234 		} else {
235 			bufferevent_free(bevf->underlying);
236 		}
237 	} else {
238 		if (bevf->underlying) {
239 			if (bevf->underlying->errorcb == be_filter_eventcb)
240 				bufferevent_setcb(bevf->underlying,
241 				    NULL, NULL, NULL, NULL);
242 			bufferevent_unsuspend_read(bevf->underlying,
243 			    BEV_SUSPEND_FILT_READ);
244 		}
245 	}
246 
247 	_bufferevent_del_generic_timeout_cbs(bev);
248 }
249 
250 static int
be_filter_enable(struct bufferevent * bev,short event)251 be_filter_enable(struct bufferevent *bev, short event)
252 {
253 	struct bufferevent_filtered *bevf = upcast(bev);
254 	if (event & EV_WRITE)
255 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
256 
257 	if (event & EV_READ) {
258 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
259 		bufferevent_unsuspend_read(bevf->underlying,
260 		    BEV_SUSPEND_FILT_READ);
261 	}
262 	return 0;
263 }
264 
265 static int
be_filter_disable(struct bufferevent * bev,short event)266 be_filter_disable(struct bufferevent *bev, short event)
267 {
268 	struct bufferevent_filtered *bevf = upcast(bev);
269 	if (event & EV_WRITE)
270 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
271 	if (event & EV_READ) {
272 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
273 		bufferevent_suspend_read(bevf->underlying,
274 		    BEV_SUSPEND_FILT_READ);
275 	}
276 	return 0;
277 }
278 
279 static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)280 be_filter_process_input(struct bufferevent_filtered *bevf,
281 			enum bufferevent_flush_mode state,
282 			int *processed_out)
283 {
284 	enum bufferevent_filter_result res;
285 	struct bufferevent *bev = downcast(bevf);
286 
287 	if (state == BEV_NORMAL) {
288 		/* If we're in 'normal' mode, don't urge data on the filter
289 		 * unless we're reading data and under our high-water mark.*/
290 		if (!(bev->enabled & EV_READ) ||
291 		    be_readbuf_full(bevf, state))
292 			return BEV_OK;
293 	}
294 
295 	do {
296 		ev_ssize_t limit = -1;
297 		if (state == BEV_NORMAL && bev->wm_read.high)
298 			limit = bev->wm_read.high -
299 			    evbuffer_get_length(bev->input);
300 
301 		res = bevf->process_in(bevf->underlying->input,
302 		    bev->input, limit, state, bevf->context);
303 
304 		if (res == BEV_OK)
305 			*processed_out = 1;
306 	} while (res == BEV_OK &&
307 		 (bev->enabled & EV_READ) &&
308 		 evbuffer_get_length(bevf->underlying->input) &&
309 		 !be_readbuf_full(bevf, state));
310 
311 	if (*processed_out)
312 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
313 
314 	return res;
315 }
316 
317 
/** Run the output filter, moving data from our own output buffer into the
 * underlying bufferevent's output buffer, and invoke the user's write
 * callback when our output buffer drains to its low-watermark.
 *
 * @param bevf the filtering bufferevent
 * @param state the flush mode passed on to the filter
 * @param processed_out set to 1 if the filter returned BEV_OK at least once;
 *    never cleared here, so the caller must initialize it
 * @return the result of the last filter call, or BEV_OK if the filter was
 *    not called at all
 */
static enum bufferevent_filter_result
be_filter_process_output(struct bufferevent_filtered *bevf,
			 enum bufferevent_flush_mode state,
			 int *processed_out)
{
	/* Requires references and lock: might call writecb */
	enum bufferevent_filter_result res = BEV_OK;
	struct bufferevent *bufev = downcast(bevf);
	int again = 0;

	if (state == BEV_NORMAL) {
		/* If we're in 'normal' mode, don't urge data on the
		 * filter unless we're writing data, and the underlying
		 * bufferevent is accepting data, and we have data to
		 * give the filter.  If we're in 'flush' or 'finish',
		 * call the filter no matter what. */
		if (!(bufev->enabled & EV_WRITE) ||
		    be_underlying_writebuf_full(bevf, state) ||
		    !evbuffer_get_length(bufev->output))
			return BEV_OK;
	}

	/* disable the callback that calls this function
	   when the user adds to the output buffer. */
	evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);

	/* Outer loop: repeat the filtering pass as long as the user's write
	 * callback leaves more work that can be done right away. */
	do {
		/* Whether the filter succeeded at least once in this pass. */
		int processed = 0;
		again = 0;

		/* Inner loop: feed the filter until it stops succeeding or
		 * one of the conditions below says to stop. */
		do {
			/* Room left under the underlying write
			 * high-watermark; -1 means there is no limit. */
			ev_ssize_t limit = -1;
			if (state == BEV_NORMAL &&
			    bevf->underlying->wm_write.high)
				limit = bevf->underlying->wm_write.high -
				    evbuffer_get_length(bevf->underlying->output);

			res = bevf->process_out(downcast(bevf)->output,
			    bevf->underlying->output,
			    limit,
			    state,
			    bevf->context);

			if (res == BEV_OK)
				processed = *processed_out = 1;
		} while (/* Stop if the filter wasn't successful...*/
			res == BEV_OK &&
			/* Or if we aren't writing any more. */
			(bufev->enabled & EV_WRITE) &&
			/* Or if we have nothing more to write and we are
			 * not flushing. */
			evbuffer_get_length(bufev->output) &&
			/* Or if we have filled the underlying output buffer. */
			!be_underlying_writebuf_full(bevf,state));

		if (processed &&
		    evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
			/* call the write callback.*/
			_bufferevent_run_writecb(bufev);

			/* Same conditions as the inner loop: if they all
			 * still hold after the callback, run another pass. */
			if (res == BEV_OK &&
			    (bufev->enabled & EV_WRITE) &&
			    evbuffer_get_length(bufev->output) &&
			    !be_underlying_writebuf_full(bevf, state)) {
				again = 1;
			}
		}
	} while (again);

	/* reenable the outbuf_cb */
	evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
	    EVBUFFER_CB_ENABLED);

	if (*processed_out)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);

	return res;
}
396 
397 /* Called when the size of our outbuf changes. */
398 static void
bufferevent_filtered_outbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)399 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
400     const struct evbuffer_cb_info *cbinfo, void *arg)
401 {
402 	struct bufferevent_filtered *bevf = arg;
403 	struct bufferevent *bev = downcast(bevf);
404 
405 	if (cbinfo->n_added) {
406 		int processed_any = 0;
407 		/* Somebody added more data to the output buffer. Try to
408 		 * process it, if we should. */
409 		_bufferevent_incref_and_lock(bev);
410 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
411 		_bufferevent_decref_and_unlock(bev);
412 	}
413 }
414 
415 /* Called when the underlying socket has read. */
416 static void
be_filter_readcb(struct bufferevent * underlying,void * _me)417 be_filter_readcb(struct bufferevent *underlying, void *_me)
418 {
419 	struct bufferevent_filtered *bevf = _me;
420 	enum bufferevent_filter_result res;
421 	enum bufferevent_flush_mode state;
422 	struct bufferevent *bufev = downcast(bevf);
423 	int processed_any = 0;
424 
425 	_bufferevent_incref_and_lock(bufev);
426 
427 	if (bevf->got_eof)
428 		state = BEV_FINISHED;
429 	else
430 		state = BEV_NORMAL;
431 
432 	/* XXXX use return value */
433 	res = be_filter_process_input(bevf, state, &processed_any);
434 	(void)res;
435 
436 	/* XXX This should be in process_input, not here.  There are
437 	 * other places that can call process-input, and they should
438 	 * force readcb calls as needed. */
439 	if (processed_any &&
440 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
441 		_bufferevent_run_readcb(bufev);
442 
443 	_bufferevent_decref_and_unlock(bufev);
444 }
445 
446 /* Called when the underlying socket has drained enough that we can write to
447    it. */
448 static void
be_filter_writecb(struct bufferevent * underlying,void * _me)449 be_filter_writecb(struct bufferevent *underlying, void *_me)
450 {
451 	struct bufferevent_filtered *bevf = _me;
452 	struct bufferevent *bev = downcast(bevf);
453 	int processed_any = 0;
454 
455 	_bufferevent_incref_and_lock(bev);
456 	be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
457 	_bufferevent_decref_and_unlock(bev);
458 }
459 
/** Event callback installed on the underlying bufferevent; called when the
 * underlying bufferevent reports an event such as an error.
 *
 * @param underlying the underlying bufferevent (unused)
 * @param what the event flags reported by the underlying bufferevent
 * @param _me the struct bufferevent_filtered registered with the callback
 */
static void
be_filter_eventcb(struct bufferevent *underlying, short what, void *_me)
{
	struct bufferevent_filtered *filt = _me;
	struct bufferevent *self = downcast(filt);

	_bufferevent_incref_and_lock(self);
	/* All we can really do is pass the event on to our own eventcb. */
	_bufferevent_run_eventcb(self, what);
	_bufferevent_decref_and_unlock(self);
}
472 
473 static int
be_filter_flush(struct bufferevent * bufev,short iotype,enum bufferevent_flush_mode mode)474 be_filter_flush(struct bufferevent *bufev,
475     short iotype, enum bufferevent_flush_mode mode)
476 {
477 	struct bufferevent_filtered *bevf = upcast(bufev);
478 	int processed_any = 0;
479 	EVUTIL_ASSERT(bevf);
480 
481 	_bufferevent_incref_and_lock(bufev);
482 
483 	if (iotype & EV_READ) {
484 		be_filter_process_input(bevf, mode, &processed_any);
485 	}
486 	if (iotype & EV_WRITE) {
487 		be_filter_process_output(bevf, mode, &processed_any);
488 	}
489 	/* XXX check the return value? */
490 	/* XXX does this want to recursively call lower-level flushes? */
491 	bufferevent_flush(bevf->underlying, iotype, mode);
492 
493 	_bufferevent_decref_and_unlock(bufev);
494 
495 	return processed_any;
496 }
497 
498 static int
be_filter_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)499 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
500     union bufferevent_ctrl_data *data)
501 {
502 	struct bufferevent_filtered *bevf;
503 	switch (op) {
504 	case BEV_CTRL_GET_UNDERLYING:
505 		bevf = upcast(bev);
506 		data->ptr = bevf->underlying;
507 		return 0;
508 	case BEV_CTRL_GET_FD:
509 	case BEV_CTRL_SET_FD:
510 	case BEV_CTRL_CANCEL_ALL:
511 	default:
512 		return -1;
513 	}
514 }
515