1 /* $NetBSD: bufferevent.c,v 1.2 2013/04/11 16:56:41 christos Exp $ */ 2 /* 3 * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu> 4 * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include <sys/types.h> 30 31 #include "event2/event-config.h" 32 #include <sys/cdefs.h> 33 __RCSID("$NetBSD: bufferevent.c,v 1.2 2013/04/11 16:56:41 christos Exp $"); 34 35 #ifdef _EVENT_HAVE_SYS_TIME_H 36 #include <sys/time.h> 37 #endif 38 39 #include <errno.h> 40 #include <stdio.h> 41 #include <stdlib.h> 42 #include <string.h> 43 #ifdef _EVENT_HAVE_STDARG_H 44 #include <stdarg.h> 45 #endif 46 47 #ifdef WIN32 48 #include <winsock2.h> 49 #endif 50 #include <errno.h> 51 52 #include "event2/util.h" 53 #include "event2/buffer.h" 54 #include "event2/buffer_compat.h" 55 #include "event2/bufferevent.h" 56 #include "event2/bufferevent_struct.h" 57 #include "event2/bufferevent_compat.h" 58 #include "event2/event.h" 59 #include "log-internal.h" 60 #include "mm-internal.h" 61 #include "bufferevent-internal.h" 62 #include "evbuffer-internal.h" 63 #include "util-internal.h" 64 65 static void _bufferevent_cancel_all(struct bufferevent *bev); 66 67 68 void 69 bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what) 70 { 71 struct bufferevent_private *bufev_private = 72 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 73 BEV_LOCK(bufev); 74 if (!bufev_private->read_suspended) 75 bufev->be_ops->disable(bufev, EV_READ); 76 bufev_private->read_suspended |= what; 77 BEV_UNLOCK(bufev); 78 } 79 80 void 81 bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what) 82 { 83 struct bufferevent_private *bufev_private = 84 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 85 BEV_LOCK(bufev); 86 bufev_private->read_suspended &= ~what; 87 if (!bufev_private->read_suspended && (bufev->enabled & EV_READ)) 88 bufev->be_ops->enable(bufev, EV_READ); 89 BEV_UNLOCK(bufev); 90 } 91 92 void 93 bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what) 94 { 95 struct bufferevent_private *bufev_private = 96 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 97 BEV_LOCK(bufev); 98 if (!bufev_private->write_suspended) 99 bufev->be_ops->disable(bufev, EV_WRITE); 100 bufev_private->write_suspended |= what; 101 BEV_UNLOCK(bufev); 102 } 103 104 void 105 bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what) 106 { 107 struct bufferevent_private *bufev_private = 108 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 109 BEV_LOCK(bufev); 110 bufev_private->write_suspended &= ~what; 111 if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE)) 112 bufev->be_ops->enable(bufev, EV_WRITE); 113 BEV_UNLOCK(bufev); 114 } 115 116 117 /* Callback to implement watermarks on the input buffer. Only enabled 118 * if the watermark is set. */ 119 static void 120 bufferevent_inbuf_wm_cb(struct evbuffer *buf, 121 const struct evbuffer_cb_info *cbinfo, 122 void *arg) 123 { 124 struct bufferevent *bufev = arg; 125 size_t size; 126 127 size = evbuffer_get_length(buf); 128 129 if (size >= bufev->wm_read.high) 130 bufferevent_wm_suspend_read(bufev); 131 else 132 bufferevent_wm_unsuspend_read(bufev); 133 } 134 135 static void 136 bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg) 137 { 138 struct bufferevent_private *bufev_private = arg; 139 struct bufferevent *bufev = &bufev_private->bev; 140 141 BEV_LOCK(bufev); 142 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 143 bufev->errorcb) { 144 /* The "connected" happened before any reads or writes, so 145 send it first. */ 146 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 147 bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg); 148 } 149 if (bufev_private->readcb_pending && bufev->readcb) { 150 bufev_private->readcb_pending = 0; 151 bufev->readcb(bufev, bufev->cbarg); 152 } 153 if (bufev_private->writecb_pending && bufev->writecb) { 154 bufev_private->writecb_pending = 0; 155 bufev->writecb(bufev, bufev->cbarg); 156 } 157 if (bufev_private->eventcb_pending && bufev->errorcb) { 158 short what = bufev_private->eventcb_pending; 159 int err = bufev_private->errno_pending; 160 bufev_private->eventcb_pending = 0; 161 bufev_private->errno_pending = 0; 162 EVUTIL_SET_SOCKET_ERROR(err); 163 bufev->errorcb(bufev, what, bufev->cbarg); 164 } 165 _bufferevent_decref_and_unlock(bufev); 166 } 167 168 static void 169 bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg) 170 { 171 struct bufferevent_private *bufev_private = arg; 172 struct bufferevent *bufev = &bufev_private->bev; 173 174 BEV_LOCK(bufev); 175 #define UNLOCKED(stmt) \ 176 do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0) 177 178 if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) && 179 bufev->errorcb) { 180 /* The "connected" happened before any reads or writes, so 181 send it first. */ 182 bufferevent_event_cb errorcb = bufev->errorcb; 183 void *cbarg = bufev->cbarg; 184 bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED; 185 UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg)); 186 } 187 if (bufev_private->readcb_pending && bufev->readcb) { 188 bufferevent_data_cb readcb = bufev->readcb; 189 void *cbarg = bufev->cbarg; 190 bufev_private->readcb_pending = 0; 191 UNLOCKED(readcb(bufev, cbarg)); 192 } 193 if (bufev_private->writecb_pending && bufev->writecb) { 194 bufferevent_data_cb writecb = bufev->writecb; 195 void *cbarg = bufev->cbarg; 196 bufev_private->writecb_pending = 0; 197 UNLOCKED(writecb(bufev, cbarg)); 198 } 199 if (bufev_private->eventcb_pending && bufev->errorcb) { 200 bufferevent_event_cb errorcb = bufev->errorcb; 201 void *cbarg = bufev->cbarg; 202 short what = bufev_private->eventcb_pending; 203 int err = bufev_private->errno_pending; 204 bufev_private->eventcb_pending = 0; 205 bufev_private->errno_pending = 0; 206 EVUTIL_SET_SOCKET_ERROR(err); 207 UNLOCKED(errorcb(bufev,what,cbarg)); 208 } 209 _bufferevent_decref_and_unlock(bufev); 210 #undef UNLOCKED 211 } 212 213 #define SCHEDULE_DEFERRED(bevp) \ 214 do { \ 215 bufferevent_incref(&(bevp)->bev); \ 216 event_deferred_cb_schedule( \ 217 event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \ 218 &(bevp)->deferred); \ 219 } while (/*CONSTCOND*/0) 220 221 222 void 223 _bufferevent_run_readcb(struct bufferevent *bufev) 224 { 225 /* Requires that we hold the lock and a reference */ 226 struct bufferevent_private *p = 227 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 228 if (bufev->readcb == NULL) 229 return; 230 if (p->options & BEV_OPT_DEFER_CALLBACKS) { 231 p->readcb_pending = 1; 232 if (!p->deferred.queued) 233 SCHEDULE_DEFERRED(p); 234 } else { 235 bufev->readcb(bufev, bufev->cbarg); 236 } 237 } 238 239 void 240 _bufferevent_run_writecb(struct bufferevent *bufev) 241 { 242 /* Requires that we hold the lock and a reference */ 243 struct bufferevent_private *p = 244 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 245 if (bufev->writecb == NULL) 246 return; 247 if (p->options & BEV_OPT_DEFER_CALLBACKS) { 248 p->writecb_pending = 1; 249 if (!p->deferred.queued) 250 SCHEDULE_DEFERRED(p); 251 } else { 252 bufev->writecb(bufev, bufev->cbarg); 253 } 254 } 255 256 void 257 _bufferevent_run_eventcb(struct bufferevent *bufev, short what) 258 { 259 /* Requires that we hold the lock and a reference */ 260 struct bufferevent_private *p = 261 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 262 if (bufev->errorcb == NULL) 263 return; 264 if (p->options & BEV_OPT_DEFER_CALLBACKS) { 265 p->eventcb_pending |= what; 266 p->errno_pending = EVUTIL_SOCKET_ERROR(); 267 if (!p->deferred.queued) 268 SCHEDULE_DEFERRED(p); 269 } else { 270 bufev->errorcb(bufev, what, bufev->cbarg); 271 } 272 } 273 274 int 275 bufferevent_init_common(struct bufferevent_private *bufev_private, 276 struct event_base *base, 277 const struct bufferevent_ops *ops, 278 enum bufferevent_options options) 279 { 280 struct bufferevent *bufev = &bufev_private->bev; 281 282 if (!bufev->input) { 283 if ((bufev->input = evbuffer_new()) == NULL) 284 return -1; 285 } 286 287 if (!bufev->output) { 288 if ((bufev->output = evbuffer_new()) == NULL) { 289 evbuffer_free(bufev->input); 290 return -1; 291 } 292 } 293 294 bufev_private->refcnt = 1; 295 bufev->ev_base = base; 296 297 /* Disable timeouts. */ 298 evutil_timerclear(&bufev->timeout_read); 299 evutil_timerclear(&bufev->timeout_write); 300 301 bufev->be_ops = ops; 302 303 /* 304 * Set to EV_WRITE so that using bufferevent_write is going to 305 * trigger a callback. Reading needs to be explicitly enabled 306 * because otherwise no data will be available. 307 */ 308 bufev->enabled = EV_WRITE; 309 310 #ifndef _EVENT_DISABLE_THREAD_SUPPORT 311 if (options & BEV_OPT_THREADSAFE) { 312 if (bufferevent_enable_locking(bufev, NULL) < 0) { 313 /* cleanup */ 314 evbuffer_free(bufev->input); 315 evbuffer_free(bufev->output); 316 bufev->input = NULL; 317 bufev->output = NULL; 318 return -1; 319 } 320 } 321 #endif 322 if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS)) 323 == BEV_OPT_UNLOCK_CALLBACKS) { 324 event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS"); 325 return -1; 326 } 327 if (options & BEV_OPT_DEFER_CALLBACKS) { 328 if (options & BEV_OPT_UNLOCK_CALLBACKS) 329 event_deferred_cb_init(&bufev_private->deferred, 330 bufferevent_run_deferred_callbacks_unlocked, 331 bufev_private); 332 else 333 event_deferred_cb_init(&bufev_private->deferred, 334 bufferevent_run_deferred_callbacks_locked, 335 bufev_private); 336 } 337 338 bufev_private->options = options; 339 340 evbuffer_set_parent(bufev->input, bufev); 341 evbuffer_set_parent(bufev->output, bufev); 342 343 return 0; 344 } 345 346 void 347 bufferevent_setcb(struct bufferevent *bufev, 348 bufferevent_data_cb readcb, bufferevent_data_cb writecb, 349 bufferevent_event_cb eventcb, void *cbarg) 350 { 351 BEV_LOCK(bufev); 352 353 bufev->readcb = readcb; 354 bufev->writecb = writecb; 355 bufev->errorcb = eventcb; 356 357 bufev->cbarg = cbarg; 358 BEV_UNLOCK(bufev); 359 } 360 361 struct evbuffer * 362 bufferevent_get_input(struct bufferevent *bufev) 363 { 364 return bufev->input; 365 } 366 367 struct evbuffer * 368 bufferevent_get_output(struct bufferevent *bufev) 369 { 370 return bufev->output; 371 } 372 373 struct event_base * 374 bufferevent_get_base(struct bufferevent *bufev) 375 { 376 return bufev->ev_base; 377 } 378 379 int 380 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 381 { 382 if (evbuffer_add(bufev->output, data, size) == -1) 383 return (-1); 384 385 return 0; 386 } 387 388 int 389 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 390 { 391 if (evbuffer_add_buffer(bufev->output, buf) == -1) 392 return (-1); 393 394 return 0; 395 } 396 397 size_t 398 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 399 { 400 return (evbuffer_remove(bufev->input, data, size)); 401 } 402 403 int 404 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf) 405 { 406 return (evbuffer_add_buffer(buf, bufev->input)); 407 } 408 409 int 410 bufferevent_enable(struct bufferevent *bufev, short event) 411 { 412 struct bufferevent_private *bufev_private = 413 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 414 short impl_events = event; 415 int r = 0; 416 417 _bufferevent_incref_and_lock(bufev); 418 if (bufev_private->read_suspended) 419 impl_events &= ~EV_READ; 420 if (bufev_private->write_suspended) 421 impl_events &= ~EV_WRITE; 422 423 bufev->enabled |= event; 424 425 if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0) 426 r = -1; 427 428 _bufferevent_decref_and_unlock(bufev); 429 return r; 430 } 431 432 int 433 bufferevent_set_timeouts(struct bufferevent *bufev, 434 const struct timeval *tv_read, 435 const struct timeval *tv_write) 436 { 437 int r = 0; 438 BEV_LOCK(bufev); 439 if (tv_read) { 440 bufev->timeout_read = *tv_read; 441 } else { 442 evutil_timerclear(&bufev->timeout_read); 443 } 444 if (tv_write) { 445 bufev->timeout_write = *tv_write; 446 } else { 447 evutil_timerclear(&bufev->timeout_write); 448 } 449 450 if (bufev->be_ops->adj_timeouts) 451 r = bufev->be_ops->adj_timeouts(bufev); 452 BEV_UNLOCK(bufev); 453 454 return r; 455 } 456 457 458 /* Obsolete; use bufferevent_set_timeouts */ 459 void 460 bufferevent_settimeout(struct bufferevent *bufev, 461 int timeout_read, int timeout_write) 462 { 463 struct timeval tv_read, tv_write; 464 struct timeval *ptv_read = NULL, *ptv_write = NULL; 465 466 memset(&tv_read, 0, sizeof(tv_read)); 467 memset(&tv_write, 0, sizeof(tv_write)); 468 469 if (timeout_read) { 470 tv_read.tv_sec = timeout_read; 471 ptv_read = &tv_read; 472 } 473 if (timeout_write) { 474 tv_write.tv_sec = timeout_write; 475 ptv_write = &tv_write; 476 } 477 478 bufferevent_set_timeouts(bufev, ptv_read, ptv_write); 479 } 480 481 482 int 483 bufferevent_disable_hard(struct bufferevent *bufev, short event) 484 { 485 int r = 0; 486 struct bufferevent_private *bufev_private = 487 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 488 489 BEV_LOCK(bufev); 490 bufev->enabled &= ~event; 491 492 bufev_private->connecting = 0; 493 if (bufev->be_ops->disable(bufev, event) < 0) 494 r = -1; 495 496 BEV_UNLOCK(bufev); 497 return r; 498 } 499 500 int 501 bufferevent_disable(struct bufferevent *bufev, short event) 502 { 503 int r = 0; 504 505 BEV_LOCK(bufev); 506 bufev->enabled &= ~event; 507 508 if (bufev->be_ops->disable(bufev, event) < 0) 509 r = -1; 510 511 BEV_UNLOCK(bufev); 512 return r; 513 } 514 515 /* 516 * Sets the water marks 517 */ 518 519 void 520 bufferevent_setwatermark(struct bufferevent *bufev, short events, 521 size_t lowmark, size_t highmark) 522 { 523 struct bufferevent_private *bufev_private = 524 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 525 526 BEV_LOCK(bufev); 527 if (events & EV_WRITE) { 528 bufev->wm_write.low = lowmark; 529 bufev->wm_write.high = highmark; 530 } 531 532 if (events & EV_READ) { 533 bufev->wm_read.low = lowmark; 534 bufev->wm_read.high = highmark; 535 536 if (highmark) { 537 /* There is now a new high-water mark for read. 538 enable the callback if needed, and see if we should 539 suspend/bufferevent_wm_unsuspend. */ 540 541 if (bufev_private->read_watermarks_cb == NULL) { 542 bufev_private->read_watermarks_cb = 543 evbuffer_add_cb(bufev->input, 544 bufferevent_inbuf_wm_cb, 545 bufev); 546 } 547 evbuffer_cb_set_flags(bufev->input, 548 bufev_private->read_watermarks_cb, 549 EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER); 550 551 if (evbuffer_get_length(bufev->input) > highmark) 552 bufferevent_wm_suspend_read(bufev); 553 else if (evbuffer_get_length(bufev->input) < highmark) 554 bufferevent_wm_unsuspend_read(bufev); 555 } else { 556 /* There is now no high-water mark for read. */ 557 if (bufev_private->read_watermarks_cb) 558 evbuffer_cb_clear_flags(bufev->input, 559 bufev_private->read_watermarks_cb, 560 EVBUFFER_CB_ENABLED); 561 bufferevent_wm_unsuspend_read(bufev); 562 } 563 } 564 BEV_UNLOCK(bufev); 565 } 566 567 int 568 bufferevent_flush(struct bufferevent *bufev, 569 short iotype, 570 enum bufferevent_flush_mode mode) 571 { 572 int r = -1; 573 BEV_LOCK(bufev); 574 if (bufev->be_ops->flush) 575 r = bufev->be_ops->flush(bufev, iotype, mode); 576 BEV_UNLOCK(bufev); 577 return r; 578 } 579 580 void 581 _bufferevent_incref_and_lock(struct bufferevent *bufev) 582 { 583 struct bufferevent_private *bufev_private = 584 BEV_UPCAST(bufev); 585 BEV_LOCK(bufev); 586 ++bufev_private->refcnt; 587 } 588 589 #if 0 590 static void 591 _bufferevent_transfer_lock_ownership(struct bufferevent *donor, 592 struct bufferevent *recipient) 593 { 594 struct bufferevent_private *d = BEV_UPCAST(donor); 595 struct bufferevent_private *r = BEV_UPCAST(recipient); 596 if (d->lock != r->lock) 597 return; 598 if (r->own_lock) 599 return; 600 if (d->own_lock) { 601 d->own_lock = 0; 602 r->own_lock = 1; 603 } 604 } 605 #endif 606 607 int 608 _bufferevent_decref_and_unlock(struct bufferevent *bufev) 609 { 610 struct bufferevent_private *bufev_private = 611 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 612 struct bufferevent *underlying; 613 614 EVUTIL_ASSERT(bufev_private->refcnt > 0); 615 616 if (--bufev_private->refcnt) { 617 BEV_UNLOCK(bufev); 618 return 0; 619 } 620 621 underlying = bufferevent_get_underlying(bufev); 622 623 /* Clean up the shared info */ 624 if (bufev->be_ops->destruct) 625 bufev->be_ops->destruct(bufev); 626 627 /* XXX what happens if refcnt for these buffers is > 1? 628 * The buffers can share a lock with this bufferevent object, 629 * but the lock might be destroyed below. */ 630 /* evbuffer will free the callbacks */ 631 evbuffer_free(bufev->input); 632 evbuffer_free(bufev->output); 633 634 if (bufev_private->rate_limiting) { 635 if (bufev_private->rate_limiting->group) 636 bufferevent_remove_from_rate_limit_group_internal(bufev,0); 637 if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event)) 638 event_del(&bufev_private->rate_limiting->refill_bucket_event); 639 event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event); 640 mm_free(bufev_private->rate_limiting); 641 bufev_private->rate_limiting = NULL; 642 } 643 644 event_debug_unassign(&bufev->ev_read); 645 event_debug_unassign(&bufev->ev_write); 646 647 BEV_UNLOCK(bufev); 648 if (bufev_private->own_lock) 649 EVTHREAD_FREE_LOCK(bufev_private->lock, 650 EVTHREAD_LOCKTYPE_RECURSIVE); 651 652 /* Free the actual allocated memory. */ 653 mm_free(((char*)bufev) - bufev->be_ops->mem_offset); 654 655 /* Release the reference to underlying now that we no longer need the 656 * reference to it. We wait this long mainly in case our lock is 657 * shared with underlying. 658 * 659 * The 'destruct' function will also drop a reference to underlying 660 * if BEV_OPT_CLOSE_ON_FREE is set. 661 * 662 * XXX Should we/can we just refcount evbuffer/bufferevent locks? 663 * It would probably save us some headaches. 664 */ 665 if (underlying) 666 bufferevent_decref(underlying); 667 668 return 1; 669 } 670 671 int 672 bufferevent_decref(struct bufferevent *bufev) 673 { 674 BEV_LOCK(bufev); 675 return _bufferevent_decref_and_unlock(bufev); 676 } 677 678 void 679 bufferevent_free(struct bufferevent *bufev) 680 { 681 BEV_LOCK(bufev); 682 bufferevent_setcb(bufev, NULL, NULL, NULL, NULL); 683 _bufferevent_cancel_all(bufev); 684 _bufferevent_decref_and_unlock(bufev); 685 } 686 687 void 688 bufferevent_incref(struct bufferevent *bufev) 689 { 690 struct bufferevent_private *bufev_private = 691 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); 692 693 BEV_LOCK(bufev); 694 ++bufev_private->refcnt; 695 BEV_UNLOCK(bufev); 696 } 697 698 int 699 bufferevent_enable_locking(struct bufferevent *bufev, void *lock) 700 { 701 #ifdef _EVENT_DISABLE_THREAD_SUPPORT 702 return -1; 703 #else 704 struct bufferevent *underlying; 705 706 if (BEV_UPCAST(bufev)->lock) 707 return -1; 708 underlying = bufferevent_get_underlying(bufev); 709 710 if (!lock && underlying && BEV_UPCAST(underlying)->lock) { 711 lock = BEV_UPCAST(underlying)->lock; 712 BEV_UPCAST(bufev)->lock = lock; 713 BEV_UPCAST(bufev)->own_lock = 0; 714 } else if (!lock) { 715 EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE); 716 if (!lock) 717 return -1; 718 BEV_UPCAST(bufev)->lock = lock; 719 BEV_UPCAST(bufev)->own_lock = 1; 720 } else { 721 BEV_UPCAST(bufev)->lock = lock; 722 BEV_UPCAST(bufev)->own_lock = 0; 723 } 724 evbuffer_enable_locking(bufev->input, lock); 725 evbuffer_enable_locking(bufev->output, lock); 726 727 if (underlying && !BEV_UPCAST(underlying)->lock) 728 bufferevent_enable_locking(underlying, lock); 729 730 return 0; 731 #endif 732 } 733 734 int 735 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd) 736 { 737 union bufferevent_ctrl_data d; 738 int res = -1; 739 d.fd = fd; 740 BEV_LOCK(bev); 741 if (bev->be_ops->ctrl) 742 res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d); 743 BEV_UNLOCK(bev); 744 return res; 745 } 746 747 evutil_socket_t 748 bufferevent_getfd(struct bufferevent *bev) 749 { 750 union bufferevent_ctrl_data d; 751 int res = -1; 752 d.fd = -1; 753 BEV_LOCK(bev); 754 if (bev->be_ops->ctrl) 755 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); 756 BEV_UNLOCK(bev); 757 return (res<0) ? -1 : d.fd; 758 } 759 760 static void 761 _bufferevent_cancel_all(struct bufferevent *bev) 762 { 763 union bufferevent_ctrl_data d; 764 memset(&d, 0, sizeof(d)); 765 BEV_LOCK(bev); 766 if (bev->be_ops->ctrl) 767 bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d); 768 BEV_UNLOCK(bev); 769 } 770 771 short 772 bufferevent_get_enabled(struct bufferevent *bufev) 773 { 774 short r; 775 BEV_LOCK(bufev); 776 r = bufev->enabled; 777 BEV_UNLOCK(bufev); 778 return r; 779 } 780 781 struct bufferevent * 782 bufferevent_get_underlying(struct bufferevent *bev) 783 { 784 union bufferevent_ctrl_data d; 785 int res = -1; 786 d.ptr = NULL; 787 BEV_LOCK(bev); 788 if (bev->be_ops->ctrl) 789 res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d); 790 BEV_UNLOCK(bev); 791 return (res<0) ? NULL : d.ptr; 792 } 793 794 static void 795 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx) 796 { 797 struct bufferevent *bev = ctx; 798 _bufferevent_incref_and_lock(bev); 799 bufferevent_disable(bev, EV_READ); 800 _bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING); 801 _bufferevent_decref_and_unlock(bev); 802 } 803 static void 804 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) 805 { 806 struct bufferevent *bev = ctx; 807 _bufferevent_incref_and_lock(bev); 808 bufferevent_disable(bev, EV_WRITE); 809 _bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING); 810 _bufferevent_decref_and_unlock(bev); 811 } 812 813 void 814 _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev) 815 { 816 evtimer_assign(&bev->ev_read, bev->ev_base, 817 bufferevent_generic_read_timeout_cb, bev); 818 evtimer_assign(&bev->ev_write, bev->ev_base, 819 bufferevent_generic_write_timeout_cb, bev); 820 } 821 822 int 823 _bufferevent_del_generic_timeout_cbs(struct bufferevent *bev) 824 { 825 int r1,r2; 826 r1 = event_del(&bev->ev_read); 827 r2 = event_del(&bev->ev_write); 828 if (r1<0 || r2<0) 829 return -1; 830 return 0; 831 } 832 833 int 834 _bufferevent_generic_adj_timeouts(struct bufferevent *bev) 835 { 836 const short enabled = bev->enabled; 837 struct bufferevent_private *bev_p = 838 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 839 int r1=0, r2=0; 840 if ((enabled & EV_READ) && !bev_p->read_suspended && 841 evutil_timerisset(&bev->timeout_read)) 842 r1 = event_add(&bev->ev_read, &bev->timeout_read); 843 else 844 r1 = event_del(&bev->ev_read); 845 846 if ((enabled & EV_WRITE) && !bev_p->write_suspended && 847 evutil_timerisset(&bev->timeout_write) && 848 evbuffer_get_length(bev->output)) 849 r2 = event_add(&bev->ev_write, &bev->timeout_write); 850 else 851 r2 = event_del(&bev->ev_write); 852 if (r1 < 0 || r2 < 0) 853 return -1; 854 return 0; 855 } 856 857 int 858 _bufferevent_add_event(struct event *ev, const struct timeval *tv) 859 { 860 if (tv->tv_sec == 0 && tv->tv_usec == 0) 861 return event_add(ev, NULL); 862 else 863 return event_add(ev, tv); 864 } 865 866 /* For use by user programs only; internally, we should be calling 867 either _bufferevent_incref_and_lock(), or BEV_LOCK. */ 868 void 869 bufferevent_lock(struct bufferevent *bev) 870 { 871 _bufferevent_incref_and_lock(bev); 872 } 873 874 void 875 bufferevent_unlock(struct bufferevent *bev) 876 { 877 _bufferevent_decref_and_unlock(bev); 878 } 879