1 /* 2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include "evconfig-private.h" 30 31 #include <sys/types.h> 32 33 #include "event2/event-config.h" 34 35 #ifdef EVENT__HAVE_SYS_TIME_H 36 #include <sys/time.h> 37 #endif 38 39 #include <errno.h> 40 #include <stdio.h> 41 #include <stdlib.h> 42 #include <string.h> 43 #ifdef EVENT__HAVE_STDARG_H 44 #include <stdarg.h> 45 #endif 46 47 #ifdef _WIN32 48 #include <winsock2.h> 49 #endif 50 51 #include "event2/util.h" 52 #include "event2/bufferevent.h" 53 #include "event2/buffer.h" 54 #include "event2/bufferevent_struct.h" 55 #include "event2/event.h" 56 #include "log-internal.h" 57 #include "mm-internal.h" 58 #include "bufferevent-internal.h" 59 #include "util-internal.h" 60 61 /* prototypes */ 62 static int be_filter_enable(struct bufferevent *, short); 63 static int be_filter_disable(struct bufferevent *, short); 64 static void be_filter_unlink(struct bufferevent *); 65 static void be_filter_destruct(struct bufferevent *); 66 67 static void be_filter_readcb(struct bufferevent *, void *); 68 static void be_filter_writecb(struct bufferevent *, void *); 69 static void be_filter_eventcb(struct bufferevent *, short, void *); 70 static int be_filter_flush(struct bufferevent *bufev, 71 short iotype, enum bufferevent_flush_mode mode); 72 static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 73 74 static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf, 75 const struct evbuffer_cb_info *cbinfo, void *arg); 76 77 static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 78 const struct evbuffer_cb_info *info, void *arg); 79 80 struct bufferevent_filtered { 81 struct bufferevent_private bev; 82 83 /** The bufferevent that we read/write filtered data from/to. */ 84 struct bufferevent *underlying; 85 /** A callback on our inbuf to notice somebory removes data */ 86 struct evbuffer_cb_entry *inbuf_cb; 87 /** A callback on our outbuf to notice when somebody adds data */ 88 struct evbuffer_cb_entry *outbuf_cb; 89 /** True iff we have received an EOF callback from the underlying 90 * bufferevent. */ 91 unsigned got_eof; 92 93 /** Function to free context when we're done. */ 94 void (*free_context)(void *); 95 /** Input filter */ 96 bufferevent_filter_cb process_in; 97 /** Output filter */ 98 bufferevent_filter_cb process_out; 99 /** User-supplied argument to the filters. */ 100 void *context; 101 }; 102 103 const struct bufferevent_ops bufferevent_ops_filter = { 104 "filter", 105 evutil_offsetof(struct bufferevent_filtered, bev.bev), 106 be_filter_enable, 107 be_filter_disable, 108 be_filter_unlink, 109 be_filter_destruct, 110 bufferevent_generic_adj_timeouts_, 111 be_filter_flush, 112 be_filter_ctrl, 113 }; 114 115 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered, 116 * return that bufferevent_filtered. Returns NULL otherwise.*/ 117 static inline struct bufferevent_filtered * 118 upcast(struct bufferevent *bev) 119 { 120 struct bufferevent_filtered *bev_f; 121 if (!BEV_IS_FILTER(bev)) 122 return NULL; 123 bev_f = (void*)( ((char*)bev) - 124 evutil_offsetof(struct bufferevent_filtered, bev.bev)); 125 EVUTIL_ASSERT(BEV_IS_FILTER(&bev_f->bev.bev)); 126 return bev_f; 127 } 128 129 #define downcast(bev_f) (&(bev_f)->bev.bev) 130 131 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or 132 * over its high watermark such that we should not write to it in a given 133 * flush mode. */ 134 static int 135 be_underlying_writebuf_full(struct bufferevent_filtered *bevf, 136 enum bufferevent_flush_mode state) 137 { 138 struct bufferevent *u = bevf->underlying; 139 return state == BEV_NORMAL && 140 u->wm_write.high && 141 evbuffer_get_length(u->output) >= u->wm_write.high; 142 } 143 144 /** Return 1 if our input buffer is at or over its high watermark such that we 145 * should not write to it in a given flush mode. */ 146 static int 147 be_readbuf_full(struct bufferevent_filtered *bevf, 148 enum bufferevent_flush_mode state) 149 { 150 struct bufferevent *bufev = downcast(bevf); 151 return state == BEV_NORMAL && 152 bufev->wm_read.high && 153 evbuffer_get_length(bufev->input) >= bufev->wm_read.high; 154 } 155 156 157 /* Filter to use when we're created with a NULL filter. */ 158 static enum bufferevent_filter_result 159 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim, 160 enum bufferevent_flush_mode state, void *ctx) 161 { 162 (void)state; 163 if (evbuffer_remove_buffer(src, dst, lim) >= 0) 164 return BEV_OK; 165 else 166 return BEV_ERROR; 167 } 168 169 struct bufferevent * 170 bufferevent_filter_new(struct bufferevent *underlying, 171 bufferevent_filter_cb input_filter, 172 bufferevent_filter_cb output_filter, 173 int options, 174 void (*free_context)(void *), 175 void *ctx) 176 { 177 struct bufferevent_filtered *bufev_f; 178 int tmp_options = options & ~BEV_OPT_THREADSAFE; 179 180 if (!underlying) 181 return NULL; 182 183 if (!input_filter) 184 input_filter = be_null_filter; 185 if (!output_filter) 186 output_filter = be_null_filter; 187 188 bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered)); 189 if (!bufev_f) 190 return NULL; 191 192 if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base, 193 &bufferevent_ops_filter, tmp_options) < 0) { 194 mm_free(bufev_f); 195 return NULL; 196 } 197 if (options & BEV_OPT_THREADSAFE) { 198 bufferevent_enable_locking_(downcast(bufev_f), NULL); 199 } 200 201 bufev_f->underlying = underlying; 202 203 bufev_f->process_in = input_filter; 204 bufev_f->process_out = output_filter; 205 bufev_f->free_context = free_context; 206 bufev_f->context = ctx; 207 208 bufferevent_setcb(bufev_f->underlying, 209 be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f); 210 211 bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input, 212 bufferevent_filtered_inbuf_cb, bufev_f); 213 evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb, 214 EVBUFFER_CB_ENABLED); 215 216 bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, 217 bufferevent_filtered_outbuf_cb, bufev_f); 218 219 bufferevent_init_generic_timeout_cbs_(downcast(bufev_f)); 220 bufferevent_incref_(underlying); 221 222 bufferevent_enable(underlying, EV_READ|EV_WRITE); 223 bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ); 224 225 return downcast(bufev_f); 226 } 227 228 static void 229 be_filter_unlink(struct bufferevent *bev) 230 { 231 struct bufferevent_filtered *bevf = upcast(bev); 232 EVUTIL_ASSERT(bevf); 233 234 if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { 235 /* Yes, there is also a decref in bufferevent_decref_. 236 * That decref corresponds to the incref when we set 237 * underlying for the first time. This decref is an 238 * extra one to remove the last reference. 239 */ 240 if (BEV_UPCAST(bevf->underlying)->refcnt < 2) { 241 event_warnx("BEV_OPT_CLOSE_ON_FREE set on an " 242 "bufferevent with too few references"); 243 } else { 244 bufferevent_free(bevf->underlying); 245 } 246 } else { 247 if (bevf->underlying) { 248 if (bevf->underlying->errorcb == be_filter_eventcb) 249 bufferevent_setcb(bevf->underlying, 250 NULL, NULL, NULL, NULL); 251 bufferevent_unsuspend_read_(bevf->underlying, 252 BEV_SUSPEND_FILT_READ); 253 } 254 } 255 } 256 257 static void 258 be_filter_destruct(struct bufferevent *bev) 259 { 260 struct bufferevent_filtered *bevf = upcast(bev); 261 EVUTIL_ASSERT(bevf); 262 if (bevf->free_context) 263 bevf->free_context(bevf->context); 264 265 if (bevf->inbuf_cb) 266 evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb); 267 268 if (bevf->outbuf_cb) 269 evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb); 270 } 271 272 static int 273 be_filter_enable(struct bufferevent *bev, short event) 274 { 275 struct bufferevent_filtered *bevf = upcast(bev); 276 if (event & EV_WRITE) 277 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 278 279 if (event & EV_READ) { 280 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 281 bufferevent_unsuspend_read_(bevf->underlying, 282 BEV_SUSPEND_FILT_READ); 283 } 284 return 0; 285 } 286 287 static int 288 be_filter_disable(struct bufferevent *bev, short event) 289 { 290 struct bufferevent_filtered *bevf = upcast(bev); 291 if (event & EV_WRITE) 292 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 293 if (event & EV_READ) { 294 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 295 bufferevent_suspend_read_(bevf->underlying, 296 BEV_SUSPEND_FILT_READ); 297 } 298 return 0; 299 } 300 301 static enum bufferevent_filter_result 302 be_filter_process_input(struct bufferevent_filtered *bevf, 303 enum bufferevent_flush_mode state, 304 int *processed_out) 305 { 306 enum bufferevent_filter_result res; 307 struct bufferevent *bev = downcast(bevf); 308 309 if (state == BEV_NORMAL) { 310 /* If we're in 'normal' mode, don't urge data on the filter 311 * unless we're reading data and under our high-water mark.*/ 312 if (!(bev->enabled & EV_READ) || 313 be_readbuf_full(bevf, state)) 314 return BEV_OK; 315 } 316 317 do { 318 ev_ssize_t limit = -1; 319 if (state == BEV_NORMAL && bev->wm_read.high) 320 limit = bev->wm_read.high - 321 evbuffer_get_length(bev->input); 322 323 res = bevf->process_in(bevf->underlying->input, 324 bev->input, limit, state, bevf->context); 325 326 if (res == BEV_OK) 327 *processed_out = 1; 328 } while (res == BEV_OK && 329 (bev->enabled & EV_READ) && 330 evbuffer_get_length(bevf->underlying->input) && 331 !be_readbuf_full(bevf, state)); 332 333 if (*processed_out) 334 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 335 336 return res; 337 } 338 339 340 static enum bufferevent_filter_result 341 be_filter_process_output(struct bufferevent_filtered *bevf, 342 enum bufferevent_flush_mode state, 343 int *processed_out) 344 { 345 /* Requires references and lock: might call writecb */ 346 enum bufferevent_filter_result res = BEV_OK; 347 struct bufferevent *bufev = downcast(bevf); 348 int again = 0; 349 350 if (state == BEV_NORMAL) { 351 /* If we're in 'normal' mode, don't urge data on the 352 * filter unless we're writing data, and the underlying 353 * bufferevent is accepting data, and we have data to 354 * give the filter. If we're in 'flush' or 'finish', 355 * call the filter no matter what. */ 356 if (!(bufev->enabled & EV_WRITE) || 357 be_underlying_writebuf_full(bevf, state) || 358 !evbuffer_get_length(bufev->output)) 359 return BEV_OK; 360 } 361 362 /* disable the callback that calls this function 363 when the user adds to the output buffer. */ 364 evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb, 365 EVBUFFER_CB_ENABLED); 366 367 do { 368 int processed = 0; 369 again = 0; 370 371 do { 372 ev_ssize_t limit = -1; 373 if (state == BEV_NORMAL && 374 bevf->underlying->wm_write.high) 375 limit = bevf->underlying->wm_write.high - 376 evbuffer_get_length(bevf->underlying->output); 377 378 res = bevf->process_out(downcast(bevf)->output, 379 bevf->underlying->output, 380 limit, 381 state, 382 bevf->context); 383 384 if (res == BEV_OK) 385 processed = *processed_out = 1; 386 } while (/* Stop if the filter wasn't successful...*/ 387 res == BEV_OK && 388 /* Or if we aren't writing any more. */ 389 (bufev->enabled & EV_WRITE) && 390 /* Of if we have nothing more to write and we are 391 * not flushing. */ 392 evbuffer_get_length(bufev->output) && 393 /* Or if we have filled the underlying output buffer. */ 394 !be_underlying_writebuf_full(bevf,state)); 395 396 if (processed) { 397 /* call the write callback.*/ 398 bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); 399 400 if (res == BEV_OK && 401 (bufev->enabled & EV_WRITE) && 402 evbuffer_get_length(bufev->output) && 403 !be_underlying_writebuf_full(bevf, state)) { 404 again = 1; 405 } 406 } 407 } while (again); 408 409 /* reenable the outbuf_cb */ 410 evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb, 411 EVBUFFER_CB_ENABLED); 412 413 if (*processed_out) 414 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 415 416 return res; 417 } 418 419 /* Called when the size of our outbuf changes. */ 420 static void 421 bufferevent_filtered_outbuf_cb(struct evbuffer *buf, 422 const struct evbuffer_cb_info *cbinfo, void *arg) 423 { 424 struct bufferevent_filtered *bevf = arg; 425 struct bufferevent *bev = downcast(bevf); 426 427 if (cbinfo->n_added) { 428 int processed_any = 0; 429 /* Somebody added more data to the output buffer. Try to 430 * process it, if we should. */ 431 bufferevent_incref_and_lock_(bev); 432 be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 433 bufferevent_decref_and_unlock_(bev); 434 } 435 } 436 437 static void 438 be_filter_read_nolock_(struct bufferevent *underlying, void *me_) 439 { 440 struct bufferevent_filtered *bevf = me_; 441 enum bufferevent_filter_result res; 442 enum bufferevent_flush_mode state; 443 struct bufferevent *bufev = downcast(bevf); 444 struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); 445 int processed_any = 0; 446 447 // It's possible our refcount is 0 at this point if another thread free'd our filterevent 448 EVUTIL_ASSERT(bufev_private->refcnt >= 0); 449 450 // If our refcount is > 0 451 if (bufev_private->refcnt > 0) { 452 453 if (bevf->got_eof) 454 state = BEV_FINISHED; 455 else 456 state = BEV_NORMAL; 457 458 /* XXXX use return value */ 459 res = be_filter_process_input(bevf, state, &processed_any); 460 (void)res; 461 462 /* XXX This should be in process_input, not here. There are 463 * other places that can call process-input, and they should 464 * force readcb calls as needed. */ 465 if (processed_any) { 466 bufferevent_trigger_nolock_(bufev, EV_READ, 0); 467 if (evbuffer_get_length(underlying->input) > 0 && 468 be_readbuf_full(bevf, state)) { 469 /* data left in underlying buffer and filter input buffer 470 * hit its read high watermark. 471 * Schedule callback to avoid data gets stuck in underlying 472 * input buffer. 473 */ 474 evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb, 475 EVBUFFER_CB_ENABLED); 476 } 477 } 478 } 479 } 480 481 /* Called when the size of our inbuf changes. */ 482 static void 483 bufferevent_filtered_inbuf_cb(struct evbuffer *buf, 484 const struct evbuffer_cb_info *cbinfo, void *arg) 485 { 486 struct bufferevent_filtered *bevf = arg; 487 enum bufferevent_flush_mode state; 488 struct bufferevent *bev = downcast(bevf); 489 490 BEV_LOCK(bev); 491 492 if (bevf->got_eof) 493 state = BEV_FINISHED; 494 else 495 state = BEV_NORMAL; 496 497 498 if (!be_readbuf_full(bevf, state)) { 499 /* opportunity to read data which was left in underlying 500 * input buffer because filter input buffer hit read 501 * high watermark. 502 */ 503 evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb, 504 EVBUFFER_CB_ENABLED); 505 if (evbuffer_get_length(bevf->underlying->input) > 0) 506 be_filter_read_nolock_(bevf->underlying, bevf); 507 } 508 509 BEV_UNLOCK(bev); 510 } 511 512 /* Called when the underlying socket has read. */ 513 static void 514 be_filter_readcb(struct bufferevent *underlying, void *me_) 515 { 516 struct bufferevent_filtered *bevf = me_; 517 struct bufferevent *bev = downcast(bevf); 518 519 BEV_LOCK(bev); 520 521 be_filter_read_nolock_(underlying, me_); 522 523 BEV_UNLOCK(bev); 524 } 525 526 /* Called when the underlying socket has drained enough that we can write to 527 it. */ 528 static void 529 be_filter_writecb(struct bufferevent *underlying, void *me_) 530 { 531 struct bufferevent_filtered *bevf = me_; 532 struct bufferevent *bev = downcast(bevf); 533 struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 534 int processed_any = 0; 535 536 BEV_LOCK(bev); 537 538 // It's possible our refcount is 0 at this point if another thread free'd our filterevent 539 EVUTIL_ASSERT(bufev_private->refcnt >= 0); 540 541 // If our refcount is > 0 542 if (bufev_private->refcnt > 0) { 543 be_filter_process_output(bevf, BEV_NORMAL, &processed_any); 544 } 545 546 BEV_UNLOCK(bev); 547 } 548 549 /* Called when the underlying socket has given us an error */ 550 static void 551 be_filter_eventcb(struct bufferevent *underlying, short what, void *me_) 552 { 553 struct bufferevent_filtered *bevf = me_; 554 struct bufferevent *bev = downcast(bevf); 555 struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 556 557 BEV_LOCK(bev); 558 559 // It's possible our refcount is 0 at this point if another thread free'd our filterevent 560 EVUTIL_ASSERT(bufev_private->refcnt >= 0); 561 562 // If our refcount is > 0 563 if (bufev_private->refcnt > 0) { 564 565 /* All we can really to is tell our own eventcb. */ 566 bufferevent_run_eventcb_(bev, what, 0); 567 } 568 569 BEV_UNLOCK(bev); 570 } 571 572 static int 573 be_filter_flush(struct bufferevent *bufev, 574 short iotype, enum bufferevent_flush_mode mode) 575 { 576 struct bufferevent_filtered *bevf = upcast(bufev); 577 int processed_any = 0; 578 EVUTIL_ASSERT(bevf); 579 580 bufferevent_incref_and_lock_(bufev); 581 582 if (iotype & EV_READ) { 583 be_filter_process_input(bevf, mode, &processed_any); 584 } 585 if (iotype & EV_WRITE) { 586 be_filter_process_output(bevf, mode, &processed_any); 587 } 588 /* XXX check the return value? */ 589 /* XXX does this want to recursively call lower-level flushes? */ 590 bufferevent_flush(bevf->underlying, iotype, mode); 591 592 bufferevent_decref_and_unlock_(bufev); 593 594 return processed_any; 595 } 596 597 static int 598 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 599 union bufferevent_ctrl_data *data) 600 { 601 struct bufferevent_filtered *bevf; 602 switch (op) { 603 case BEV_CTRL_GET_UNDERLYING: 604 bevf = upcast(bev); 605 data->ptr = bevf->underlying; 606 return 0; 607 case BEV_CTRL_SET_FD: 608 case BEV_CTRL_GET_FD: 609 bevf = upcast(bev); 610 611 if (bevf->underlying && 612 bevf->underlying->be_ops && 613 bevf->underlying->be_ops->ctrl) { 614 return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data); 615 } 616 EVUTIL_FALLTHROUGH; 617 618 case BEV_CTRL_CANCEL_ALL: 619 EVUTIL_FALLTHROUGH; 620 default: 621 return -1; 622 } 623 624 return -1; 625 } 626