1 /* $NetBSD: bufferevent_ratelim.c,v 1.3 2017/01/31 23:17:39 christos Exp $ */ 2 /* 3 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 4 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
28 */ 29 #include "evconfig-private.h" 30 31 #include <sys/types.h> 32 #include <limits.h> 33 #include <string.h> 34 #include <stdlib.h> 35 36 #include "event2/event.h" 37 #include "event2/event_struct.h" 38 #include "event2/util.h" 39 #include "event2/bufferevent.h" 40 #include "event2/bufferevent_struct.h" 41 #include "event2/buffer.h" 42 43 #include "ratelim-internal.h" 44 45 #include "bufferevent-internal.h" 46 #include "mm-internal.h" 47 #include "util-internal.h" 48 #include "event-internal.h" 49 50 int 51 ev_token_bucket_init_(struct ev_token_bucket *bucket, 52 const struct ev_token_bucket_cfg *cfg, 53 ev_uint32_t current_tick, 54 int reinitialize) 55 { 56 if (reinitialize) { 57 /* on reinitialization, we only clip downwards, since we've 58 already used who-knows-how-much bandwidth this tick. We 59 leave "last_updated" as it is; the next update will add the 60 appropriate amount of bandwidth to the bucket. 61 */ 62 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum) 63 bucket->read_limit = cfg->read_maximum; 64 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum) 65 bucket->write_limit = cfg->write_maximum; 66 } else { 67 bucket->read_limit = cfg->read_rate; 68 bucket->write_limit = cfg->write_rate; 69 bucket->last_updated = current_tick; 70 } 71 return 0; 72 } 73 74 int 75 ev_token_bucket_update_(struct ev_token_bucket *bucket, 76 const struct ev_token_bucket_cfg *cfg, 77 ev_uint32_t current_tick) 78 { 79 /* It's okay if the tick number overflows, since we'll just 80 * wrap around when we do the unsigned substraction. */ 81 unsigned n_ticks = current_tick - bucket->last_updated; 82 83 /* Make sure some ticks actually happened, and that time didn't 84 * roll back. 
*/ 85 if (n_ticks == 0 || n_ticks > INT_MAX) 86 return 0; 87 88 /* Naively, we would say 89 bucket->limit += n_ticks * cfg->rate; 90 91 if (bucket->limit > cfg->maximum) 92 bucket->limit = cfg->maximum; 93 94 But we're worried about overflow, so we do it like this: 95 */ 96 97 if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate) 98 bucket->read_limit = cfg->read_maximum; 99 else 100 bucket->read_limit += n_ticks * cfg->read_rate; 101 102 103 if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate) 104 bucket->write_limit = cfg->write_maximum; 105 else 106 bucket->write_limit += n_ticks * cfg->write_rate; 107 108 109 bucket->last_updated = current_tick; 110 111 return 1; 112 } 113 114 static inline void 115 bufferevent_update_buckets(struct bufferevent_private *bev) 116 { 117 /* Must hold lock on bev. */ 118 struct timeval now; 119 unsigned tick; 120 event_base_gettimeofday_cached(bev->bev.ev_base, &now); 121 tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); 122 if (tick != bev->rate_limiting->limit.last_updated) 123 ev_token_bucket_update_(&bev->rate_limiting->limit, 124 bev->rate_limiting->cfg, tick); 125 } 126 127 ev_uint32_t 128 ev_token_bucket_get_tick_(const struct timeval *tv, 129 const struct ev_token_bucket_cfg *cfg) 130 { 131 /* This computation uses two multiplies and a divide. We could do 132 * fewer if we knew that the tick length was an integer number of 133 * seconds, or if we knew it divided evenly into a second. We should 134 * investigate that more. 135 */ 136 137 /* We cast to an ev_uint64_t first, since we don't want to overflow 138 * before we do the final divide. 
*/ 139 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; 140 return (unsigned)(msec / cfg->msec_per_tick); 141 } 142 143 struct ev_token_bucket_cfg * 144 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, 145 size_t write_rate, size_t write_burst, 146 const struct timeval *tick_len) 147 { 148 struct ev_token_bucket_cfg *r; 149 struct timeval g; 150 if (! tick_len) { 151 g.tv_sec = 1; 152 g.tv_usec = 0; 153 tick_len = &g; 154 } 155 if (read_rate > read_burst || write_rate > write_burst || 156 read_rate < 1 || write_rate < 1) 157 return NULL; 158 if (read_rate > EV_RATE_LIMIT_MAX || 159 write_rate > EV_RATE_LIMIT_MAX || 160 read_burst > EV_RATE_LIMIT_MAX || 161 write_burst > EV_RATE_LIMIT_MAX) 162 return NULL; 163 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); 164 if (!r) 165 return NULL; 166 r->read_rate = read_rate; 167 r->write_rate = write_rate; 168 r->read_maximum = read_burst; 169 r->write_maximum = write_burst; 170 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); 171 r->msec_per_tick = (tick_len->tv_sec * 1000) + 172 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; 173 return r; 174 } 175 176 void 177 ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) 178 { 179 mm_free(cfg); 180 } 181 182 /* Default values for max_single_read & max_single_write variables. 
 */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

/* Pick the read or write limit of bucket x, depending on is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

/* Is the group suspended in this direction? */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (/*CONSTCOND*/0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket that has gone negative means "nothing allowed". */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

/** Return the largest amount we should read from bev right now.
 * Caller must hold the lock on bev. */
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

/** Return the largest amount we should write to bev right now.
 * Caller must hold the lock on bev. */
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

/** Charge 'bytes' just read against bev's read bucket(s), suspending or
 * unsuspending reads as the buckets cross zero.  Caller must hold the
 * lock on bev.  Returns 0 on success, -1 if the refill event could not
 * be scheduled. */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading and arm the refill
			 * timer for the next tick. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket refilled (bytes may be negative).  Only drop
			 * the refill event if the write side doesn't still
			 * need it. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Charge 'bytes' just written against bev's write bucket(s); mirror image
 * of bufferevent_decrement_read_buckets_().  Caller must hold the lock on
 * bev.  Returns 0 on success, -1 if the refill event could not be
 * scheduled. */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.
	   (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	/* Same try-lock pattern as bev_group_suspend_reading_: skip members
	   we cannot lock; they will notice the group suspension later. */
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		/* Rate limiting was removed while this event was pending. */
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	/* Walk 'which' links forward from the head of the member list. */
	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.

    Note: expects 'g', 'bev', and 'first' to be declared in the caller's
    scope.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;				 \
		}					 \
	} while (/*CONSTCOND*/0)

/** Resume reading on the members of 'g'.  Members we fail to lock are
 * recorded in pending_unsuspend_read so a later tick can retry.
 * Requires the group lock. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

/** Resume writing on the members of 'g'; mirror image of
 * bev_group_unsuspend_reading_().  Requires the group lock. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Wake suspended members once the bucket holds at least one
	   min_share, or if a previous unsuspend pass could not lock
	   everybody (pending_unsuspend_*). */
	if (g->pending_unsuspend_read ||
		(g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
		(g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

/** Attach (or detach, if cfg is NULL) a per-bufferevent rate limit.
 * Returns 0 on success, -1 on allocation failure.  The caller retains
 * ownership of cfg, which must outlive the bufferevent's use of it. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: lift any bandwidth suspensions and
		   cancel the refill timer, but keep the rlim struct (and
		   group membership) intact. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Apply the initial bucket state; if either direction starts out
	   exhausted, arm the refill timer. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

/** Allocate a new rate-limit group on 'base' configured by 'cfg' (which is
 * copied, so the caller may free it afterwards).  Returns NULL on
 * allocation failure.  Free with bufferevent_rate_limit_group_free(). */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Default minimum share per member per tick is 64 bytes. */
	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}

/** Replace the configuration of group 'g' with a copy of 'cfg', clipping
 * the current buckets down to the new maxima.  Returns 0 on success, -1 if
 * either argument is NULL. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

/** Set the smallest share of the group bucket a member may be handed per
 * tick.  The requested value is remembered in configured_min_share; the
 * effective min_share is capped at the per-tick rates below.  Returns 0 on
 * success, -1 if share is out of range. */
int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick.
	 */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

/** Free a rate-limit group.  The group must be empty (asserted). */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

/** Add 'bev' to group 'g', removing it from any previous group, and apply
 * the group's current suspension state to it.  Returns 0 on success, -1 on
 * allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	/* Lazily allocate the rate_limiting struct if this bufferevent has
	   never had a limit or group before. */
	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the suspension flags while holding the group lock; apply
	   them after releasing it (we still hold the bev lock). */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

/** Remove 'bev' from its rate-limit group (if any), lifting any
 * group-imposed suspensions.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

/** As bufferevent_remove_from_rate_limit_group(), but only lifts the
 * group-imposed suspensions if 'unsuspend' is true.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want*/
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		/* No per-bufferevent limit configured: effectively unlimited. */
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want*/
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/** Set the largest single read for 'bev'; 0 or an out-of-range size
 * restores the default.  Always returns 0. */
int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

/** Set the largest single write for 'bev'; 0 or an out-of-range size
 * restores the default.  Always returns 0. */
int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

/** Return the largest single read configured for 'bev'. */
ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

/** Return the largest single write configured for 'bev'. */
ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

/** Return the maximum 'bev' may read right now, honoring both its own and
 * its group's buckets. */
ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/** Return the maximum 'bev' may write right now, honoring both its own and
 * its group's buckets. */
ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/** Return the token-bucket configuration attached to 'bev', or NULL if it
 * has none. */
const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to
   use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want*/
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/** Subtract 'decr' (which may be negative) from bev's read bucket,
 * suspending or unsuspending reads when the bucket crosses zero.  The
 * bufferevent must have a rate-limit cfg (asserted).  Returns 0 on
 * success, -1 if the refill event could not be scheduled. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Crossed into exhaustion: stop reads, arm the refill event. */
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Crossed out of exhaustion: only drop the refill event if
		   the write side doesn't still need it. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

/** Subtract 'decr' from bev's write bucket; mirror image of
 * bufferevent_decrement_read_limit().  Returns 0 on success, -1 if the
 * refill event could not be scheduled. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

/** Subtract 'decr' from the group's read bucket, suspending or
 * unsuspending the whole group when the bucket crosses zero.  Always
 * returns 0. */
int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

/** Subtract 'decr' from the group's write bucket; mirror image of
 * bufferevent_rate_limit_group_decrement_read().  Always returns 0. */
int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

/** Report the total bytes read and written by members of 'grp' since
 * creation or the last reset.  Either out-pointer may be NULL. */
void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

/** Reset the group's running byte totals to zero. */
void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

/** Initialize the rate-limiting fields of a freshly created bufferevent:
 * no limit attached, default single-read/write caps.  Always returns 0. */
int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}