1 /* $NetBSD: bufferevent_ratelim.c,v 1.4 2021/04/07 03:36:48 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 5 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. The name of the author may not be used to endorse or promote products 17 * derived from this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 21 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 22 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 24 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 #include "evconfig-private.h" 31 32 #include <sys/types.h> 33 #include <limits.h> 34 #include <string.h> 35 #include <stdlib.h> 36 37 #include "event2/event.h" 38 #include "event2/event_struct.h" 39 #include "event2/util.h" 40 #include "event2/bufferevent.h" 41 #include "event2/bufferevent_struct.h" 42 #include "event2/buffer.h" 43 44 #include "ratelim-internal.h" 45 46 #include "bufferevent-internal.h" 47 #include "mm-internal.h" 48 #include "util-internal.h" 49 #include "event-internal.h" 50 51 int 52 ev_token_bucket_init_(struct ev_token_bucket *bucket, 53 const struct ev_token_bucket_cfg *cfg, 54 ev_uint32_t current_tick, 55 int reinitialize) 56 { 57 if (reinitialize) { 58 /* on reinitialization, we only clip downwards, since we've 59 already used who-knows-how-much bandwidth this tick. We 60 leave "last_updated" as it is; the next update will add the 61 appropriate amount of bandwidth to the bucket. 62 */ 63 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum) 64 bucket->read_limit = cfg->read_maximum; 65 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum) 66 bucket->write_limit = cfg->write_maximum; 67 } else { 68 bucket->read_limit = cfg->read_rate; 69 bucket->write_limit = cfg->write_rate; 70 bucket->last_updated = current_tick; 71 } 72 return 0; 73 } 74 75 int 76 ev_token_bucket_update_(struct ev_token_bucket *bucket, 77 const struct ev_token_bucket_cfg *cfg, 78 ev_uint32_t current_tick) 79 { 80 /* It's okay if the tick number overflows, since we'll just 81 * wrap around when we do the unsigned substraction. */ 82 unsigned n_ticks = current_tick - bucket->last_updated; 83 84 /* Make sure some ticks actually happened, and that time didn't 85 * roll back. */ 86 if (n_ticks == 0 || n_ticks > INT_MAX) 87 return 0; 88 89 /* Naively, we would say 90 bucket->limit += n_ticks * cfg->rate; 91 92 if (bucket->limit > cfg->maximum) 93 bucket->limit = cfg->maximum; 94 95 But we're worried about overflow, so we do it like this: 96 */ 97 98 if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate) 99 bucket->read_limit = cfg->read_maximum; 100 else 101 bucket->read_limit += n_ticks * cfg->read_rate; 102 103 104 if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate) 105 bucket->write_limit = cfg->write_maximum; 106 else 107 bucket->write_limit += n_ticks * cfg->write_rate; 108 109 110 bucket->last_updated = current_tick; 111 112 return 1; 113 } 114 115 static inline void 116 bufferevent_update_buckets(struct bufferevent_private *bev) 117 { 118 /* Must hold lock on bev. */ 119 struct timeval now; 120 unsigned tick; 121 event_base_gettimeofday_cached(bev->bev.ev_base, &now); 122 tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); 123 if (tick != bev->rate_limiting->limit.last_updated) 124 ev_token_bucket_update_(&bev->rate_limiting->limit, 125 bev->rate_limiting->cfg, tick); 126 } 127 128 ev_uint32_t 129 ev_token_bucket_get_tick_(const struct timeval *tv, 130 const struct ev_token_bucket_cfg *cfg) 131 { 132 /* This computation uses two multiplies and a divide. We could do 133 * fewer if we knew that the tick length was an integer number of 134 * seconds, or if we knew it divided evenly into a second. We should 135 * investigate that more. 136 */ 137 138 /* We cast to an ev_uint64_t first, since we don't want to overflow 139 * before we do the final divide. */ 140 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; 141 return (unsigned)(msec / cfg->msec_per_tick); 142 } 143 144 struct ev_token_bucket_cfg * 145 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, 146 size_t write_rate, size_t write_burst, 147 const struct timeval *tick_len) 148 { 149 struct ev_token_bucket_cfg *r; 150 struct timeval g; 151 if (! tick_len) { 152 g.tv_sec = 1; 153 g.tv_usec = 0; 154 tick_len = &g; 155 } 156 if (read_rate > read_burst || write_rate > write_burst || 157 read_rate < 1 || write_rate < 1) 158 return NULL; 159 if (read_rate > EV_RATE_LIMIT_MAX || 160 write_rate > EV_RATE_LIMIT_MAX || 161 read_burst > EV_RATE_LIMIT_MAX || 162 write_burst > EV_RATE_LIMIT_MAX) 163 return NULL; 164 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); 165 if (!r) 166 return NULL; 167 r->read_rate = read_rate; 168 r->write_rate = write_rate; 169 r->read_maximum = read_burst; 170 r->write_maximum = write_burst; 171 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); 172 r->msec_per_tick = (tick_len->tv_sec * 1000) + 173 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; 174 return r; 175 } 176 177 void 178 ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) 179 { 180 mm_free(cfg); 181 } 182 183 /* Default values for max_single_read & max_single_write variables. */ 184 #define MAX_SINGLE_READ_DEFAULT 16384 185 #define MAX_SINGLE_WRITE_DEFAULT 16384 186 187 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) 188 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) 189 190 static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g); 191 static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g); 192 static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g); 193 static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g); 194 195 /** Helper: figure out the maximum amount we should write if is_write, or 196 the maximum amount we should read if is_read. Return that maximum, or 197 0 if our bucket is wholly exhausted. 198 */ 199 static inline ev_ssize_t 200 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write) 201 { 202 /* needs lock on bev. */ 203 ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read; 204 205 #define LIM(x) \ 206 (is_write ? (x).write_limit : (x).read_limit) 207 208 #define GROUP_SUSPENDED(g) \ 209 (is_write ? (g)->write_suspended : (g)->read_suspended) 210 211 /* Sets max_so_far to MIN(x, max_so_far) */ 212 #define CLAMPTO(x) \ 213 do { \ 214 if (max_so_far > (x)) \ 215 max_so_far = (x); \ 216 } while (/*CONSTCOND*/0); 217 218 if (!bev->rate_limiting) 219 return max_so_far; 220 221 /* If rate-limiting is enabled at all, update the appropriate 222 bucket, and take the smaller of our rate limit and the group 223 rate limit. 224 */ 225 226 if (bev->rate_limiting->cfg) { 227 bufferevent_update_buckets(bev); 228 max_so_far = LIM(bev->rate_limiting->limit); 229 } 230 if (bev->rate_limiting->group) { 231 struct bufferevent_rate_limit_group *g = 232 bev->rate_limiting->group; 233 ev_ssize_t share; 234 LOCK_GROUP(g); 235 if (GROUP_SUSPENDED(g)) { 236 /* We can get here if we failed to lock this 237 * particular bufferevent while suspending the whole 238 * group. */ 239 if (is_write) 240 bufferevent_suspend_write_(&bev->bev, 241 BEV_SUSPEND_BW_GROUP); 242 else 243 bufferevent_suspend_read_(&bev->bev, 244 BEV_SUSPEND_BW_GROUP); 245 share = 0; 246 } else { 247 /* XXXX probably we should divide among the active 248 * members, not the total members. */ 249 share = LIM(g->rate_limit) / g->n_members; 250 if (share < g->min_share) 251 share = g->min_share; 252 } 253 UNLOCK_GROUP(g); 254 CLAMPTO(share); 255 } 256 257 if (max_so_far < 0) 258 max_so_far = 0; 259 return max_so_far; 260 } 261 262 ev_ssize_t 263 bufferevent_get_read_max_(struct bufferevent_private *bev) 264 { 265 return bufferevent_get_rlim_max_(bev, 0); 266 } 267 268 ev_ssize_t 269 bufferevent_get_write_max_(struct bufferevent_private *bev) 270 { 271 return bufferevent_get_rlim_max_(bev, 1); 272 } 273 274 int 275 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 276 { 277 /* XXXXX Make sure all users of this function check its return value */ 278 int r = 0; 279 /* need to hold lock on bev */ 280 if (!bev->rate_limiting) 281 return 0; 282 283 if (bev->rate_limiting->cfg) { 284 bev->rate_limiting->limit.read_limit -= bytes; 285 if (bev->rate_limiting->limit.read_limit <= 0) { 286 bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW); 287 if (event_add(&bev->rate_limiting->refill_bucket_event, 288 &bev->rate_limiting->cfg->tick_timeout) < 0) 289 r = -1; 290 } else if (bev->read_suspended & BEV_SUSPEND_BW) { 291 if (!(bev->write_suspended & BEV_SUSPEND_BW)) 292 event_del(&bev->rate_limiting->refill_bucket_event); 293 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 294 } 295 } 296 297 if (bev->rate_limiting->group) { 298 LOCK_GROUP(bev->rate_limiting->group); 299 bev->rate_limiting->group->rate_limit.read_limit -= bytes; 300 bev->rate_limiting->group->total_read += bytes; 301 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { 302 bev_group_suspend_reading_(bev->rate_limiting->group); 303 } else if (bev->rate_limiting->group->read_suspended) { 304 bev_group_unsuspend_reading_(bev->rate_limiting->group); 305 } 306 UNLOCK_GROUP(bev->rate_limiting->group); 307 } 308 309 return r; 310 } 311 312 int 313 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 314 { 315 /* XXXXX Make sure all users of this function check its return value */ 316 int r = 0; 317 /* need to hold lock */ 318 if (!bev->rate_limiting) 319 return 0; 320 321 if (bev->rate_limiting->cfg) { 322 bev->rate_limiting->limit.write_limit -= bytes; 323 if (bev->rate_limiting->limit.write_limit <= 0) { 324 bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW); 325 if (event_add(&bev->rate_limiting->refill_bucket_event, 326 &bev->rate_limiting->cfg->tick_timeout) < 0) 327 r = -1; 328 } else if (bev->write_suspended & BEV_SUSPEND_BW) { 329 if (!(bev->read_suspended & BEV_SUSPEND_BW)) 330 event_del(&bev->rate_limiting->refill_bucket_event); 331 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 332 } 333 } 334 335 if (bev->rate_limiting->group) { 336 LOCK_GROUP(bev->rate_limiting->group); 337 bev->rate_limiting->group->rate_limit.write_limit -= bytes; 338 bev->rate_limiting->group->total_written += bytes; 339 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { 340 bev_group_suspend_writing_(bev->rate_limiting->group); 341 } else if (bev->rate_limiting->group->write_suspended) { 342 bev_group_unsuspend_writing_(bev->rate_limiting->group); 343 } 344 UNLOCK_GROUP(bev->rate_limiting->group); 345 } 346 347 return r; 348 } 349 350 /** Stop reading on every bufferevent in <b>g</b> */ 351 static int 352 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g) 353 { 354 /* Needs group lock */ 355 struct bufferevent_private *bev; 356 g->read_suspended = 1; 357 g->pending_unsuspend_read = 0; 358 359 /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK, 360 to prevent a deadlock. (Ordinarily, the group lock nests inside 361 the bufferevent locks. If we are unable to lock any individual 362 bufferevent, it will find out later when it looks at its limit 363 and sees that its group is suspended.) 364 */ 365 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 366 if (EVLOCK_TRY_LOCK_(bev->lock)) { 367 bufferevent_suspend_read_(&bev->bev, 368 BEV_SUSPEND_BW_GROUP); 369 EVLOCK_UNLOCK(bev->lock, 0); 370 } 371 } 372 return 0; 373 } 374 375 /** Stop writing on every bufferevent in <b>g</b> */ 376 static int 377 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g) 378 { 379 /* Needs group lock */ 380 struct bufferevent_private *bev; 381 g->write_suspended = 1; 382 g->pending_unsuspend_write = 0; 383 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 384 if (EVLOCK_TRY_LOCK_(bev->lock)) { 385 bufferevent_suspend_write_(&bev->bev, 386 BEV_SUSPEND_BW_GROUP); 387 EVLOCK_UNLOCK(bev->lock, 0); 388 } 389 } 390 return 0; 391 } 392 393 /** Timer callback invoked on a single bufferevent with one or more exhausted 394 buckets when they are ready to refill. */ 395 static void 396 bev_refill_callback_(evutil_socket_t fd, short what, void *arg) 397 { 398 unsigned tick; 399 struct timeval now; 400 struct bufferevent_private *bev = arg; 401 int again = 0; 402 BEV_LOCK(&bev->bev); 403 if (!bev->rate_limiting || !bev->rate_limiting->cfg) { 404 BEV_UNLOCK(&bev->bev); 405 return; 406 } 407 408 /* First, update the bucket */ 409 event_base_gettimeofday_cached(bev->bev.ev_base, &now); 410 tick = ev_token_bucket_get_tick_(&now, 411 bev->rate_limiting->cfg); 412 ev_token_bucket_update_(&bev->rate_limiting->limit, 413 bev->rate_limiting->cfg, 414 tick); 415 416 /* Now unsuspend any read/write operations as appropriate. */ 417 if ((bev->read_suspended & BEV_SUSPEND_BW)) { 418 if (bev->rate_limiting->limit.read_limit > 0) 419 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 420 else 421 again = 1; 422 } 423 if ((bev->write_suspended & BEV_SUSPEND_BW)) { 424 if (bev->rate_limiting->limit.write_limit > 0) 425 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 426 else 427 again = 1; 428 } 429 if (again) { 430 /* One or more of the buckets may need another refill if they 431 started negative. 432 433 XXXX if we need to be quiet for more ticks, we should 434 maybe figure out what timeout we really want. 435 */ 436 /* XXXX Handle event_add failure somehow */ 437 event_add(&bev->rate_limiting->refill_bucket_event, 438 &bev->rate_limiting->cfg->tick_timeout); 439 } 440 BEV_UNLOCK(&bev->bev); 441 } 442 443 /** Helper: grab a random element from a bufferevent group. 444 * 445 * Requires that we hold the lock on the group. 446 */ 447 static struct bufferevent_private * 448 bev_group_random_element_(struct bufferevent_rate_limit_group *group) 449 { 450 int which; 451 struct bufferevent_private *bev; 452 453 /* requires group lock */ 454 455 if (!group->n_members) 456 return NULL; 457 458 EVUTIL_ASSERT(! LIST_EMPTY(&group->members)); 459 460 which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members); 461 462 bev = LIST_FIRST(&group->members); 463 while (which--) 464 bev = LIST_NEXT(bev, rate_limiting->next_in_group); 465 466 return bev; 467 } 468 469 /** Iterate over the elements of a rate-limiting group 'g' with a random 470 starting point, assigning each to the variable 'bev', and executing the 471 block 'block'. 472 473 We do this in a half-baked effort to get fairness among group members. 474 XXX Round-robin or some kind of priority queue would be even more fair. 475 */ 476 #define FOREACH_RANDOM_ORDER(block) \ 477 do { \ 478 first = bev_group_random_element_(g); \ 479 for (bev = first; bev != LIST_END(&g->members); \ 480 bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 481 block ; \ 482 } \ 483 for (bev = LIST_FIRST(&g->members); bev && bev != first; \ 484 bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 485 block ; \ 486 } \ 487 } while (/*CONSTCOND*/0) 488 489 static void 490 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g) 491 { 492 int again = 0; 493 struct bufferevent_private *bev, *first; 494 495 g->read_suspended = 0; 496 FOREACH_RANDOM_ORDER({ 497 if (EVLOCK_TRY_LOCK_(bev->lock)) { 498 bufferevent_unsuspend_read_(&bev->bev, 499 BEV_SUSPEND_BW_GROUP); 500 EVLOCK_UNLOCK(bev->lock, 0); 501 } else { 502 again = 1; 503 } 504 }); 505 g->pending_unsuspend_read = again; 506 } 507 508 static void 509 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g) 510 { 511 int again = 0; 512 struct bufferevent_private *bev, *first; 513 g->write_suspended = 0; 514 515 FOREACH_RANDOM_ORDER({ 516 if (EVLOCK_TRY_LOCK_(bev->lock)) { 517 bufferevent_unsuspend_write_(&bev->bev, 518 BEV_SUSPEND_BW_GROUP); 519 EVLOCK_UNLOCK(bev->lock, 0); 520 } else { 521 again = 1; 522 } 523 }); 524 g->pending_unsuspend_write = again; 525 } 526 527 /** Callback invoked every tick to add more elements to the group bucket 528 and unsuspend group members as needed. 529 */ 530 static void 531 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg) 532 { 533 struct bufferevent_rate_limit_group *g = arg; 534 unsigned tick; 535 struct timeval now; 536 537 event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); 538 539 LOCK_GROUP(g); 540 541 tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg); 542 ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick); 543 544 if (g->pending_unsuspend_read || 545 (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { 546 bev_group_unsuspend_reading_(g); 547 } 548 if (g->pending_unsuspend_write || 549 (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ 550 bev_group_unsuspend_writing_(g); 551 } 552 553 /* XXXX Rather than waiting to the next tick to unsuspend stuff 554 * with pending_unsuspend_write/read, we should do it on the 555 * next iteration of the mainloop. 556 */ 557 558 UNLOCK_GROUP(g); 559 } 560 561 int 562 bufferevent_set_rate_limit(struct bufferevent *bev, 563 struct ev_token_bucket_cfg *cfg) 564 { 565 struct bufferevent_private *bevp = BEV_UPCAST(bev); 566 int r = -1; 567 struct bufferevent_rate_limit *rlim; 568 struct timeval now; 569 ev_uint32_t tick; 570 int reinit = 0, suspended = 0; 571 /* XXX reference-count cfg */ 572 573 BEV_LOCK(bev); 574 575 if (cfg == NULL) { 576 if (bevp->rate_limiting) { 577 rlim = bevp->rate_limiting; 578 rlim->cfg = NULL; 579 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 580 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 581 if (event_initialized(&rlim->refill_bucket_event)) 582 event_del(&rlim->refill_bucket_event); 583 } 584 r = 0; 585 goto done; 586 } 587 588 event_base_gettimeofday_cached(bev->ev_base, &now); 589 tick = ev_token_bucket_get_tick_(&now, cfg); 590 591 if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { 592 /* no-op */ 593 r = 0; 594 goto done; 595 } 596 if (bevp->rate_limiting == NULL) { 597 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 598 if (!rlim) 599 goto done; 600 bevp->rate_limiting = rlim; 601 } else { 602 rlim = bevp->rate_limiting; 603 } 604 reinit = rlim->cfg != NULL; 605 606 rlim->cfg = cfg; 607 ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit); 608 609 if (reinit) { 610 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); 611 event_del(&rlim->refill_bucket_event); 612 } 613 event_assign(&rlim->refill_bucket_event, bev->ev_base, 614 -1, EV_FINALIZE, bev_refill_callback_, bevp); 615 616 if (rlim->limit.read_limit > 0) { 617 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 618 } else { 619 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 620 suspended=1; 621 } 622 if (rlim->limit.write_limit > 0) { 623 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 624 } else { 625 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 626 suspended = 1; 627 } 628 629 if (suspended) 630 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout); 631 632 r = 0; 633 634 done: 635 BEV_UNLOCK(bev); 636 return r; 637 } 638 639 struct bufferevent_rate_limit_group * 640 bufferevent_rate_limit_group_new(struct event_base *base, 641 const struct ev_token_bucket_cfg *cfg) 642 { 643 struct bufferevent_rate_limit_group *g; 644 struct timeval now; 645 ev_uint32_t tick; 646 647 event_base_gettimeofday_cached(base, &now); 648 tick = ev_token_bucket_get_tick_(&now, cfg); 649 650 g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); 651 if (!g) 652 return NULL; 653 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 654 LIST_INIT(&g->members); 655 656 ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); 657 658 event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, 659 bev_group_refill_callback_, g); 660 /*XXXX handle event_add failure */ 661 event_add(&g->master_refill_event, &cfg->tick_timeout); 662 663 EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 664 665 bufferevent_rate_limit_group_set_min_share(g, 64); 666 667 evutil_weakrand_seed_(&g->weakrand_seed, 668 (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g)); 669 670 return g; 671 } 672 673 int 674 bufferevent_rate_limit_group_set_cfg( 675 struct bufferevent_rate_limit_group *g, 676 const struct ev_token_bucket_cfg *cfg) 677 { 678 int same_tick; 679 if (!g || !cfg) 680 return -1; 681 682 LOCK_GROUP(g); 683 same_tick = evutil_timercmp( 684 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==); 685 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 686 687 if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum) 688 g->rate_limit.read_limit = cfg->read_maximum; 689 if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum) 690 g->rate_limit.write_limit = cfg->write_maximum; 691 692 if (!same_tick) { 693 /* This can cause a hiccup in the schedule */ 694 event_add(&g->master_refill_event, &cfg->tick_timeout); 695 } 696 697 /* The new limits might force us to adjust min_share differently. */ 698 bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share); 699 700 UNLOCK_GROUP(g); 701 return 0; 702 } 703 704 int 705 bufferevent_rate_limit_group_set_min_share( 706 struct bufferevent_rate_limit_group *g, 707 size_t share) 708 { 709 if (share > EV_SSIZE_MAX) 710 return -1; 711 712 g->configured_min_share = share; 713 714 /* Can't set share to less than the one-tick maximum. IOW, at steady 715 * state, at least one connection can go per tick. */ 716 if (share > g->rate_limit_cfg.read_rate) 717 share = g->rate_limit_cfg.read_rate; 718 if (share > g->rate_limit_cfg.write_rate) 719 share = g->rate_limit_cfg.write_rate; 720 721 g->min_share = share; 722 return 0; 723 } 724 725 void 726 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g) 727 { 728 LOCK_GROUP(g); 729 EVUTIL_ASSERT(0 == g->n_members); 730 event_del(&g->master_refill_event); 731 UNLOCK_GROUP(g); 732 EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 733 mm_free(g); 734 } 735 736 int 737 bufferevent_add_to_rate_limit_group(struct bufferevent *bev, 738 struct bufferevent_rate_limit_group *g) 739 { 740 int wsuspend, rsuspend; 741 struct bufferevent_private *bevp = BEV_UPCAST(bev); 742 BEV_LOCK(bev); 743 744 if (!bevp->rate_limiting) { 745 struct bufferevent_rate_limit *rlim; 746 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 747 if (!rlim) { 748 BEV_UNLOCK(bev); 749 return -1; 750 } 751 event_assign(&rlim->refill_bucket_event, bev->ev_base, 752 -1, EV_FINALIZE, bev_refill_callback_, bevp); 753 bevp->rate_limiting = rlim; 754 } 755 756 if (bevp->rate_limiting->group == g) { 757 BEV_UNLOCK(bev); 758 return 0; 759 } 760 if (bevp->rate_limiting->group) 761 bufferevent_remove_from_rate_limit_group(bev); 762 763 LOCK_GROUP(g); 764 bevp->rate_limiting->group = g; 765 ++g->n_members; 766 LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group); 767 768 rsuspend = g->read_suspended; 769 wsuspend = g->write_suspended; 770 771 UNLOCK_GROUP(g); 772 773 if (rsuspend) 774 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP); 775 if (wsuspend) 776 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP); 777 778 BEV_UNLOCK(bev); 779 return 0; 780 } 781 782 int 783 bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) 784 { 785 return bufferevent_remove_from_rate_limit_group_internal_(bev, 1); 786 } 787 788 int 789 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev, 790 int unsuspend) 791 { 792 struct bufferevent_private *bevp = BEV_UPCAST(bev); 793 BEV_LOCK(bev); 794 if (bevp->rate_limiting && bevp->rate_limiting->group) { 795 struct bufferevent_rate_limit_group *g = 796 bevp->rate_limiting->group; 797 LOCK_GROUP(g); 798 bevp->rate_limiting->group = NULL; 799 --g->n_members; 800 LIST_REMOVE(bevp, rate_limiting->next_in_group); 801 UNLOCK_GROUP(g); 802 } 803 if (unsuspend) { 804 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP); 805 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP); 806 } 807 BEV_UNLOCK(bev); 808 return 0; 809 } 810 811 /* === 812 * API functions to expose rate limits. 813 * 814 * Don't use these from inside Libevent; they're meant to be for use by 815 * the program. 816 * === */ 817 818 /* Mostly you don't want to use this function from inside libevent; 819 * bufferevent_get_read_max_() is more likely what you want*/ 820 ev_ssize_t 821 bufferevent_get_read_limit(struct bufferevent *bev) 822 { 823 ev_ssize_t r; 824 struct bufferevent_private *bevp; 825 BEV_LOCK(bev); 826 bevp = BEV_UPCAST(bev); 827 if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 828 bufferevent_update_buckets(bevp); 829 r = bevp->rate_limiting->limit.read_limit; 830 } else { 831 r = EV_SSIZE_MAX; 832 } 833 BEV_UNLOCK(bev); 834 return r; 835 } 836 837 /* Mostly you don't want to use this function from inside libevent; 838 * bufferevent_get_write_max_() is more likely what you want*/ 839 ev_ssize_t 840 bufferevent_get_write_limit(struct bufferevent *bev) 841 { 842 ev_ssize_t r; 843 struct bufferevent_private *bevp; 844 BEV_LOCK(bev); 845 bevp = BEV_UPCAST(bev); 846 if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 847 bufferevent_update_buckets(bevp); 848 r = bevp->rate_limiting->limit.write_limit; 849 } else { 850 r = EV_SSIZE_MAX; 851 } 852 BEV_UNLOCK(bev); 853 return r; 854 } 855 856 int 857 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size) 858 { 859 struct bufferevent_private *bevp; 860 BEV_LOCK(bev); 861 bevp = BEV_UPCAST(bev); 862 if (size == 0 || size > EV_SSIZE_MAX) 863 bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; 864 else 865 bevp->max_single_read = size; 866 BEV_UNLOCK(bev); 867 return 0; 868 } 869 870 int 871 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size) 872 { 873 struct bufferevent_private *bevp; 874 BEV_LOCK(bev); 875 bevp = BEV_UPCAST(bev); 876 if (size == 0 || size > EV_SSIZE_MAX) 877 bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 878 else 879 bevp->max_single_write = size; 880 BEV_UNLOCK(bev); 881 return 0; 882 } 883 884 ev_ssize_t 885 bufferevent_get_max_single_read(struct bufferevent *bev) 886 { 887 ev_ssize_t r; 888 889 BEV_LOCK(bev); 890 r = BEV_UPCAST(bev)->max_single_read; 891 BEV_UNLOCK(bev); 892 return r; 893 } 894 895 ev_ssize_t 896 bufferevent_get_max_single_write(struct bufferevent *bev) 897 { 898 ev_ssize_t r; 899 900 BEV_LOCK(bev); 901 r = BEV_UPCAST(bev)->max_single_write; 902 BEV_UNLOCK(bev); 903 return r; 904 } 905 906 ev_ssize_t 907 bufferevent_get_max_to_read(struct bufferevent *bev) 908 { 909 ev_ssize_t r; 910 BEV_LOCK(bev); 911 r = bufferevent_get_read_max_(BEV_UPCAST(bev)); 912 BEV_UNLOCK(bev); 913 return r; 914 } 915 916 ev_ssize_t 917 bufferevent_get_max_to_write(struct bufferevent *bev) 918 { 919 ev_ssize_t r; 920 BEV_LOCK(bev); 921 r = bufferevent_get_write_max_(BEV_UPCAST(bev)); 922 BEV_UNLOCK(bev); 923 return r; 924 } 925 926 const struct ev_token_bucket_cfg * 927 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) { 928 struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 929 struct ev_token_bucket_cfg *cfg; 930 931 BEV_LOCK(bev); 932 933 if (bufev_private->rate_limiting) { 934 cfg = bufev_private->rate_limiting->cfg; 935 } else { 936 cfg = NULL; 937 } 938 939 BEV_UNLOCK(bev); 940 941 return cfg; 942 } 943 944 /* Mostly you don't want to use this function from inside libevent; 945 * bufferevent_get_read_max_() is more likely what you want*/ 946 ev_ssize_t 947 bufferevent_rate_limit_group_get_read_limit( 948 struct bufferevent_rate_limit_group *grp) 949 { 950 ev_ssize_t r; 951 LOCK_GROUP(grp); 952 r = grp->rate_limit.read_limit; 953 UNLOCK_GROUP(grp); 954 return r; 955 } 956 957 /* Mostly you don't want to use this function from inside libevent; 958 * bufferevent_get_write_max_() is more likely what you want. */ 959 ev_ssize_t 960 bufferevent_rate_limit_group_get_write_limit( 961 struct bufferevent_rate_limit_group *grp) 962 { 963 ev_ssize_t r; 964 LOCK_GROUP(grp); 965 r = grp->rate_limit.write_limit; 966 UNLOCK_GROUP(grp); 967 return r; 968 } 969 970 int 971 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) 972 { 973 int r = 0; 974 ev_ssize_t old_limit, new_limit; 975 struct bufferevent_private *bevp; 976 BEV_LOCK(bev); 977 bevp = BEV_UPCAST(bev); 978 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 979 old_limit = bevp->rate_limiting->limit.read_limit; 980 981 new_limit = (bevp->rate_limiting->limit.read_limit -= decr); 982 if (old_limit > 0 && new_limit <= 0) { 983 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 984 if (event_add(&bevp->rate_limiting->refill_bucket_event, 985 &bevp->rate_limiting->cfg->tick_timeout) < 0) 986 r = -1; 987 } else if (old_limit <= 0 && new_limit > 0) { 988 if (!(bevp->write_suspended & BEV_SUSPEND_BW)) 989 event_del(&bevp->rate_limiting->refill_bucket_event); 990 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 991 } 992 993 BEV_UNLOCK(bev); 994 return r; 995 } 996 997 int 998 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) 999 { 1000 /* XXXX this is mostly copy-and-paste from 1001 * bufferevent_decrement_read_limit */ 1002 int r = 0; 1003 ev_ssize_t old_limit, new_limit; 1004 struct bufferevent_private *bevp; 1005 BEV_LOCK(bev); 1006 bevp = BEV_UPCAST(bev); 1007 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 1008 old_limit = bevp->rate_limiting->limit.write_limit; 1009 1010 new_limit = (bevp->rate_limiting->limit.write_limit -= decr); 1011 if (old_limit > 0 && new_limit <= 0) { 1012 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 1013 if (event_add(&bevp->rate_limiting->refill_bucket_event, 1014 &bevp->rate_limiting->cfg->tick_timeout) < 0) 1015 r = -1; 1016 } else if (old_limit <= 0 && new_limit > 0) { 1017 if (!(bevp->read_suspended & BEV_SUSPEND_BW)) 1018 event_del(&bevp->rate_limiting->refill_bucket_event); 1019 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 1020 } 1021 1022 BEV_UNLOCK(bev); 1023 return r; 1024 } 1025 1026 int 1027 bufferevent_rate_limit_group_decrement_read( 1028 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1029 { 1030 int r = 0; 1031 ev_ssize_t old_limit, new_limit; 1032 LOCK_GROUP(grp); 1033 old_limit = grp->rate_limit.read_limit; 1034 new_limit = (grp->rate_limit.read_limit -= decr); 1035 1036 if (old_limit > 0 && new_limit <= 0) { 1037 bev_group_suspend_reading_(grp); 1038 } else if (old_limit <= 0 && new_limit > 0) { 1039 bev_group_unsuspend_reading_(grp); 1040 } 1041 1042 UNLOCK_GROUP(grp); 1043 return r; 1044 } 1045 1046 int 1047 bufferevent_rate_limit_group_decrement_write( 1048 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1049 { 1050 int r = 0; 1051 ev_ssize_t old_limit, new_limit; 1052 LOCK_GROUP(grp); 1053 old_limit = grp->rate_limit.write_limit; 1054 new_limit = (grp->rate_limit.write_limit -= decr); 1055 1056 if (old_limit > 0 && new_limit <= 0) { 1057 bev_group_suspend_writing_(grp); 1058 } else if (old_limit <= 0 && new_limit > 0) { 1059 bev_group_unsuspend_writing_(grp); 1060 } 1061 1062 UNLOCK_GROUP(grp); 1063 return r; 1064 } 1065 1066 void 1067 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp, 1068 ev_uint64_t *total_read_out, ev_uint64_t *total_written_out) 1069 { 1070 EVUTIL_ASSERT(grp != NULL); 1071 if (total_read_out) 1072 *total_read_out = grp->total_read; 1073 if (total_written_out) 1074 *total_written_out = grp->total_written; 1075 } 1076 1077 void 1078 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp) 1079 { 1080 grp->total_read = grp->total_written = 0; 1081 } 1082 1083 int 1084 bufferevent_ratelim_init_(struct bufferevent_private *bev) 1085 { 1086 bev->rate_limiting = NULL; 1087 bev->max_single_read = MAX_SINGLE_READ_DEFAULT; 1088 bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 1089 1090 return 0; 1091 } 1092