1 /* $NetBSD: bufferevent_filter.c,v 1.1.1.1 2013/04/11 16:43:26 christos Exp $ */
2 /*
3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <sys/types.h>
31
32 #include "event2/event-config.h"
33 #include <sys/cdefs.h>
34 __RCSID("$NetBSD: bufferevent_filter.c,v 1.1.1.1 2013/04/11 16:43:26 christos Exp $");
35
36 #ifdef _EVENT_HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #ifdef _EVENT_HAVE_STDARG_H
45 #include <stdarg.h>
46 #endif
47
48 #ifdef WIN32
49 #include <winsock2.h>
50 #endif
51
52 #include "event2/util.h"
53 #include "event2/bufferevent.h"
54 #include "event2/buffer.h"
55 #include "event2/bufferevent_struct.h"
56 #include "event2/event.h"
57 #include "log-internal.h"
58 #include "mm-internal.h"
59 #include "bufferevent-internal.h"
60 #include "util-internal.h"
61
62 /* prototypes */
63 static int be_filter_enable(struct bufferevent *, short);
64 static int be_filter_disable(struct bufferevent *, short);
65 static void be_filter_destruct(struct bufferevent *);
66
67 static void be_filter_readcb(struct bufferevent *, void *);
68 static void be_filter_writecb(struct bufferevent *, void *);
69 static void be_filter_eventcb(struct bufferevent *, short, void *);
70 static int be_filter_flush(struct bufferevent *bufev,
71 short iotype, enum bufferevent_flush_mode mode);
72 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
73
74 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
75 const struct evbuffer_cb_info *info, void *arg);
76
77 struct bufferevent_filtered {
78 struct bufferevent_private bev;
79
80 /** The bufferevent that we read/write filtered data from/to. */
81 struct bufferevent *underlying;
82 /** A callback on our outbuf to notice when somebody adds data */
83 struct evbuffer_cb_entry *outbuf_cb;
84 /** True iff we have received an EOF callback from the underlying
85 * bufferevent. */
86 unsigned got_eof;
87
88 /** Function to free context when we're done. */
89 void (*free_context)(void *);
90 /** Input filter */
91 bufferevent_filter_cb process_in;
92 /** Output filter */
93 bufferevent_filter_cb process_out;
94 /** User-supplied argument to the filters. */
95 void *context;
96 };
97
98 const struct bufferevent_ops bufferevent_ops_filter = {
99 "filter",
100 evutil_offsetof(struct bufferevent_filtered, bev.bev),
101 be_filter_enable,
102 be_filter_disable,
103 be_filter_destruct,
104 _bufferevent_generic_adj_timeouts,
105 be_filter_flush,
106 be_filter_ctrl,
107 };
108
109 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
110 * return that bufferevent_filtered. Returns NULL otherwise.*/
111 static inline struct bufferevent_filtered *
upcast(struct bufferevent * bev)112 upcast(struct bufferevent *bev)
113 {
114 struct bufferevent_filtered *bev_f;
115 if (bev->be_ops != &bufferevent_ops_filter)
116 return NULL;
117 bev_f = (void*)( ((char*)bev) -
118 evutil_offsetof(struct bufferevent_filtered, bev.bev));
119 EVUTIL_ASSERT(bev_f->bev.bev.be_ops == &bufferevent_ops_filter);
120 return bev_f;
121 }
122
123 #define downcast(bev_f) (&(bev_f)->bev.bev)
124
125 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
126 * over its high watermark such that we should not write to it in a given
127 * flush mode. */
128 static int
be_underlying_writebuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)129 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
130 enum bufferevent_flush_mode state)
131 {
132 struct bufferevent *u = bevf->underlying;
133 return state == BEV_NORMAL &&
134 u->wm_write.high &&
135 evbuffer_get_length(u->output) >= u->wm_write.high;
136 }
137
138 /** Return 1 if our input buffer is at or over its high watermark such that we
139 * should not write to it in a given flush mode. */
140 static int
be_readbuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)141 be_readbuf_full(struct bufferevent_filtered *bevf,
142 enum bufferevent_flush_mode state)
143 {
144 struct bufferevent *bufev = downcast(bevf);
145 return state == BEV_NORMAL &&
146 bufev->wm_read.high &&
147 evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
148 }
149
150
151 /* Filter to use when we're created with a NULL filter. */
152 static enum bufferevent_filter_result
be_null_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)153 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
154 enum bufferevent_flush_mode state, void *ctx)
155 {
156 (void)state;
157 if (evbuffer_remove_buffer(src, dst, lim) == 0)
158 return BEV_OK;
159 else
160 return BEV_ERROR;
161 }
162
163 struct bufferevent *
bufferevent_filter_new(struct bufferevent * underlying,bufferevent_filter_cb input_filter,bufferevent_filter_cb output_filter,int options,void (* free_context)(void *),void * ctx)164 bufferevent_filter_new(struct bufferevent *underlying,
165 bufferevent_filter_cb input_filter,
166 bufferevent_filter_cb output_filter,
167 int options,
168 void (*free_context)(void *),
169 void *ctx)
170 {
171 struct bufferevent_filtered *bufev_f;
172 int tmp_options = options & ~BEV_OPT_THREADSAFE;
173
174 if (!underlying)
175 return NULL;
176
177 if (!input_filter)
178 input_filter = be_null_filter;
179 if (!output_filter)
180 output_filter = be_null_filter;
181
182 bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
183 if (!bufev_f)
184 return NULL;
185
186 if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base,
187 &bufferevent_ops_filter, tmp_options) < 0) {
188 mm_free(bufev_f);
189 return NULL;
190 }
191 if (options & BEV_OPT_THREADSAFE) {
192 bufferevent_enable_locking(downcast(bufev_f), NULL);
193 }
194
195 bufev_f->underlying = underlying;
196
197 bufev_f->process_in = input_filter;
198 bufev_f->process_out = output_filter;
199 bufev_f->free_context = free_context;
200 bufev_f->context = ctx;
201
202 bufferevent_setcb(bufev_f->underlying,
203 be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
204
205 bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
206 bufferevent_filtered_outbuf_cb, bufev_f);
207
208 _bufferevent_init_generic_timeout_cbs(downcast(bufev_f));
209 bufferevent_incref(underlying);
210
211 bufferevent_enable(underlying, EV_READ|EV_WRITE);
212 bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ);
213
214 return downcast(bufev_f);
215 }
216
217 static void
be_filter_destruct(struct bufferevent * bev)218 be_filter_destruct(struct bufferevent *bev)
219 {
220 struct bufferevent_filtered *bevf = upcast(bev);
221 EVUTIL_ASSERT(bevf);
222 if (bevf->free_context)
223 bevf->free_context(bevf->context);
224
225 if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
226 /* Yes, there is also a decref in bufferevent_decref.
227 * That decref corresponds to the incref when we set
228 * underlying for the first time. This decref is an
229 * extra one to remove the last reference.
230 */
231 if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
232 event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
233 "bufferevent with too few references");
234 } else {
235 bufferevent_free(bevf->underlying);
236 }
237 } else {
238 if (bevf->underlying) {
239 if (bevf->underlying->errorcb == be_filter_eventcb)
240 bufferevent_setcb(bevf->underlying,
241 NULL, NULL, NULL, NULL);
242 bufferevent_unsuspend_read(bevf->underlying,
243 BEV_SUSPEND_FILT_READ);
244 }
245 }
246
247 _bufferevent_del_generic_timeout_cbs(bev);
248 }
249
250 static int
be_filter_enable(struct bufferevent * bev,short event)251 be_filter_enable(struct bufferevent *bev, short event)
252 {
253 struct bufferevent_filtered *bevf = upcast(bev);
254 if (event & EV_WRITE)
255 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
256
257 if (event & EV_READ) {
258 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
259 bufferevent_unsuspend_read(bevf->underlying,
260 BEV_SUSPEND_FILT_READ);
261 }
262 return 0;
263 }
264
265 static int
be_filter_disable(struct bufferevent * bev,short event)266 be_filter_disable(struct bufferevent *bev, short event)
267 {
268 struct bufferevent_filtered *bevf = upcast(bev);
269 if (event & EV_WRITE)
270 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
271 if (event & EV_READ) {
272 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
273 bufferevent_suspend_read(bevf->underlying,
274 BEV_SUSPEND_FILT_READ);
275 }
276 return 0;
277 }
278
279 static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)280 be_filter_process_input(struct bufferevent_filtered *bevf,
281 enum bufferevent_flush_mode state,
282 int *processed_out)
283 {
284 enum bufferevent_filter_result res;
285 struct bufferevent *bev = downcast(bevf);
286
287 if (state == BEV_NORMAL) {
288 /* If we're in 'normal' mode, don't urge data on the filter
289 * unless we're reading data and under our high-water mark.*/
290 if (!(bev->enabled & EV_READ) ||
291 be_readbuf_full(bevf, state))
292 return BEV_OK;
293 }
294
295 do {
296 ev_ssize_t limit = -1;
297 if (state == BEV_NORMAL && bev->wm_read.high)
298 limit = bev->wm_read.high -
299 evbuffer_get_length(bev->input);
300
301 res = bevf->process_in(bevf->underlying->input,
302 bev->input, limit, state, bevf->context);
303
304 if (res == BEV_OK)
305 *processed_out = 1;
306 } while (res == BEV_OK &&
307 (bev->enabled & EV_READ) &&
308 evbuffer_get_length(bevf->underlying->input) &&
309 !be_readbuf_full(bevf, state));
310
311 if (*processed_out)
312 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
313
314 return res;
315 }
316
317
318 static enum bufferevent_filter_result
be_filter_process_output(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)319 be_filter_process_output(struct bufferevent_filtered *bevf,
320 enum bufferevent_flush_mode state,
321 int *processed_out)
322 {
323 /* Requires references and lock: might call writecb */
324 enum bufferevent_filter_result res = BEV_OK;
325 struct bufferevent *bufev = downcast(bevf);
326 int again = 0;
327
328 if (state == BEV_NORMAL) {
329 /* If we're in 'normal' mode, don't urge data on the
330 * filter unless we're writing data, and the underlying
331 * bufferevent is accepting data, and we have data to
332 * give the filter. If we're in 'flush' or 'finish',
333 * call the filter no matter what. */
334 if (!(bufev->enabled & EV_WRITE) ||
335 be_underlying_writebuf_full(bevf, state) ||
336 !evbuffer_get_length(bufev->output))
337 return BEV_OK;
338 }
339
340 /* disable the callback that calls this function
341 when the user adds to the output buffer. */
342 evbuffer_cb_set_flags(bufev->output, bevf->outbuf_cb, 0);
343
344 do {
345 int processed = 0;
346 again = 0;
347
348 do {
349 ev_ssize_t limit = -1;
350 if (state == BEV_NORMAL &&
351 bevf->underlying->wm_write.high)
352 limit = bevf->underlying->wm_write.high -
353 evbuffer_get_length(bevf->underlying->output);
354
355 res = bevf->process_out(downcast(bevf)->output,
356 bevf->underlying->output,
357 limit,
358 state,
359 bevf->context);
360
361 if (res == BEV_OK)
362 processed = *processed_out = 1;
363 } while (/* Stop if the filter wasn't successful...*/
364 res == BEV_OK &&
365 /* Or if we aren't writing any more. */
366 (bufev->enabled & EV_WRITE) &&
367 /* Of if we have nothing more to write and we are
368 * not flushing. */
369 evbuffer_get_length(bufev->output) &&
370 /* Or if we have filled the underlying output buffer. */
371 !be_underlying_writebuf_full(bevf,state));
372
373 if (processed &&
374 evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
375 /* call the write callback.*/
376 _bufferevent_run_writecb(bufev);
377
378 if (res == BEV_OK &&
379 (bufev->enabled & EV_WRITE) &&
380 evbuffer_get_length(bufev->output) &&
381 !be_underlying_writebuf_full(bevf, state)) {
382 again = 1;
383 }
384 }
385 } while (again);
386
387 /* reenable the outbuf_cb */
388 evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
389 EVBUFFER_CB_ENABLED);
390
391 if (*processed_out)
392 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
393
394 return res;
395 }
396
397 /* Called when the size of our outbuf changes. */
398 static void
bufferevent_filtered_outbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)399 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
400 const struct evbuffer_cb_info *cbinfo, void *arg)
401 {
402 struct bufferevent_filtered *bevf = arg;
403 struct bufferevent *bev = downcast(bevf);
404
405 if (cbinfo->n_added) {
406 int processed_any = 0;
407 /* Somebody added more data to the output buffer. Try to
408 * process it, if we should. */
409 _bufferevent_incref_and_lock(bev);
410 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
411 _bufferevent_decref_and_unlock(bev);
412 }
413 }
414
415 /* Called when the underlying socket has read. */
416 static void
be_filter_readcb(struct bufferevent * underlying,void * _me)417 be_filter_readcb(struct bufferevent *underlying, void *_me)
418 {
419 struct bufferevent_filtered *bevf = _me;
420 enum bufferevent_filter_result res;
421 enum bufferevent_flush_mode state;
422 struct bufferevent *bufev = downcast(bevf);
423 int processed_any = 0;
424
425 _bufferevent_incref_and_lock(bufev);
426
427 if (bevf->got_eof)
428 state = BEV_FINISHED;
429 else
430 state = BEV_NORMAL;
431
432 /* XXXX use return value */
433 res = be_filter_process_input(bevf, state, &processed_any);
434 (void)res;
435
436 /* XXX This should be in process_input, not here. There are
437 * other places that can call process-input, and they should
438 * force readcb calls as needed. */
439 if (processed_any &&
440 evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
441 _bufferevent_run_readcb(bufev);
442
443 _bufferevent_decref_and_unlock(bufev);
444 }
445
446 /* Called when the underlying socket has drained enough that we can write to
447 it. */
448 static void
be_filter_writecb(struct bufferevent * underlying,void * _me)449 be_filter_writecb(struct bufferevent *underlying, void *_me)
450 {
451 struct bufferevent_filtered *bevf = _me;
452 struct bufferevent *bev = downcast(bevf);
453 int processed_any = 0;
454
455 _bufferevent_incref_and_lock(bev);
456 be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
457 _bufferevent_decref_and_unlock(bev);
458 }
459
460 /* Called when the underlying socket has given us an error */
461 static void
be_filter_eventcb(struct bufferevent * underlying,short what,void * _me)462 be_filter_eventcb(struct bufferevent *underlying, short what, void *_me)
463 {
464 struct bufferevent_filtered *bevf = _me;
465 struct bufferevent *bev = downcast(bevf);
466
467 _bufferevent_incref_and_lock(bev);
468 /* All we can really to is tell our own eventcb. */
469 _bufferevent_run_eventcb(bev, what);
470 _bufferevent_decref_and_unlock(bev);
471 }
472
473 static int
be_filter_flush(struct bufferevent * bufev,short iotype,enum bufferevent_flush_mode mode)474 be_filter_flush(struct bufferevent *bufev,
475 short iotype, enum bufferevent_flush_mode mode)
476 {
477 struct bufferevent_filtered *bevf = upcast(bufev);
478 int processed_any = 0;
479 EVUTIL_ASSERT(bevf);
480
481 _bufferevent_incref_and_lock(bufev);
482
483 if (iotype & EV_READ) {
484 be_filter_process_input(bevf, mode, &processed_any);
485 }
486 if (iotype & EV_WRITE) {
487 be_filter_process_output(bevf, mode, &processed_any);
488 }
489 /* XXX check the return value? */
490 /* XXX does this want to recursively call lower-level flushes? */
491 bufferevent_flush(bevf->underlying, iotype, mode);
492
493 _bufferevent_decref_and_unlock(bufev);
494
495 return processed_any;
496 }
497
498 static int
be_filter_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)499 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
500 union bufferevent_ctrl_data *data)
501 {
502 struct bufferevent_filtered *bevf;
503 switch (op) {
504 case BEV_CTRL_GET_UNDERLYING:
505 bevf = upcast(bev);
506 data->ptr = bevf->underlying;
507 return 0;
508 case BEV_CTRL_GET_FD:
509 case BEV_CTRL_SET_FD:
510 case BEV_CTRL_CANCEL_ALL:
511 default:
512 return -1;
513 }
514 }
515