1*9034ec65Schristos /* $NetBSD: bufferevent_ratelim.c,v 1.5 2020/05/25 20:47:33 christos Exp $ */
22b3787f6Schristos
32b3787f6Schristos /*
42b3787f6Schristos * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
52b3787f6Schristos * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
62b3787f6Schristos * All rights reserved.
72b3787f6Schristos *
82b3787f6Schristos * Redistribution and use in source and binary forms, with or without
92b3787f6Schristos * modification, are permitted provided that the following conditions
102b3787f6Schristos * are met:
112b3787f6Schristos * 1. Redistributions of source code must retain the above copyright
122b3787f6Schristos * notice, this list of conditions and the following disclaimer.
132b3787f6Schristos * 2. Redistributions in binary form must reproduce the above copyright
142b3787f6Schristos * notice, this list of conditions and the following disclaimer in the
152b3787f6Schristos * documentation and/or other materials provided with the distribution.
162b3787f6Schristos * 3. The name of the author may not be used to endorse or promote products
172b3787f6Schristos * derived from this software without specific prior written permission.
182b3787f6Schristos *
192b3787f6Schristos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
202b3787f6Schristos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
212b3787f6Schristos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
222b3787f6Schristos * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
232b3787f6Schristos * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
242b3787f6Schristos * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
252b3787f6Schristos * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
262b3787f6Schristos * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
272b3787f6Schristos * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
282b3787f6Schristos * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
292b3787f6Schristos */
302b3787f6Schristos #include "evconfig-private.h"
312b3787f6Schristos
322b3787f6Schristos #include <sys/types.h>
332b3787f6Schristos #include <limits.h>
342b3787f6Schristos #include <string.h>
352b3787f6Schristos #include <stdlib.h>
362b3787f6Schristos
372b3787f6Schristos #include "event2/event.h"
382b3787f6Schristos #include "event2/event_struct.h"
392b3787f6Schristos #include "event2/util.h"
402b3787f6Schristos #include "event2/bufferevent.h"
412b3787f6Schristos #include "event2/bufferevent_struct.h"
422b3787f6Schristos #include "event2/buffer.h"
432b3787f6Schristos
442b3787f6Schristos #include "ratelim-internal.h"
452b3787f6Schristos
462b3787f6Schristos #include "bufferevent-internal.h"
472b3787f6Schristos #include "mm-internal.h"
482b3787f6Schristos #include "util-internal.h"
492b3787f6Schristos #include "event-internal.h"
502b3787f6Schristos
512b3787f6Schristos int
ev_token_bucket_init_(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick,int reinitialize)522b3787f6Schristos ev_token_bucket_init_(struct ev_token_bucket *bucket,
532b3787f6Schristos const struct ev_token_bucket_cfg *cfg,
542b3787f6Schristos ev_uint32_t current_tick,
552b3787f6Schristos int reinitialize)
562b3787f6Schristos {
572b3787f6Schristos if (reinitialize) {
582b3787f6Schristos /* on reinitialization, we only clip downwards, since we've
592b3787f6Schristos already used who-knows-how-much bandwidth this tick. We
602b3787f6Schristos leave "last_updated" as it is; the next update will add the
612b3787f6Schristos appropriate amount of bandwidth to the bucket.
622b3787f6Schristos */
632b3787f6Schristos if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
642b3787f6Schristos bucket->read_limit = cfg->read_maximum;
652b3787f6Schristos if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
662b3787f6Schristos bucket->write_limit = cfg->write_maximum;
672b3787f6Schristos } else {
682b3787f6Schristos bucket->read_limit = cfg->read_rate;
692b3787f6Schristos bucket->write_limit = cfg->write_rate;
702b3787f6Schristos bucket->last_updated = current_tick;
712b3787f6Schristos }
722b3787f6Schristos return 0;
732b3787f6Schristos }
742b3787f6Schristos
752b3787f6Schristos int
ev_token_bucket_update_(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick)762b3787f6Schristos ev_token_bucket_update_(struct ev_token_bucket *bucket,
772b3787f6Schristos const struct ev_token_bucket_cfg *cfg,
782b3787f6Schristos ev_uint32_t current_tick)
792b3787f6Schristos {
802b3787f6Schristos /* It's okay if the tick number overflows, since we'll just
812b3787f6Schristos * wrap around when we do the unsigned substraction. */
822b3787f6Schristos unsigned n_ticks = current_tick - bucket->last_updated;
832b3787f6Schristos
842b3787f6Schristos /* Make sure some ticks actually happened, and that time didn't
852b3787f6Schristos * roll back. */
862b3787f6Schristos if (n_ticks == 0 || n_ticks > INT_MAX)
872b3787f6Schristos return 0;
882b3787f6Schristos
892b3787f6Schristos /* Naively, we would say
902b3787f6Schristos bucket->limit += n_ticks * cfg->rate;
912b3787f6Schristos
922b3787f6Schristos if (bucket->limit > cfg->maximum)
932b3787f6Schristos bucket->limit = cfg->maximum;
942b3787f6Schristos
952b3787f6Schristos But we're worried about overflow, so we do it like this:
962b3787f6Schristos */
972b3787f6Schristos
982b3787f6Schristos if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
992b3787f6Schristos bucket->read_limit = cfg->read_maximum;
1002b3787f6Schristos else
1012b3787f6Schristos bucket->read_limit += n_ticks * cfg->read_rate;
1022b3787f6Schristos
1032b3787f6Schristos
1042b3787f6Schristos if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
1052b3787f6Schristos bucket->write_limit = cfg->write_maximum;
1062b3787f6Schristos else
1072b3787f6Schristos bucket->write_limit += n_ticks * cfg->write_rate;
1082b3787f6Schristos
1092b3787f6Schristos
1102b3787f6Schristos bucket->last_updated = current_tick;
1112b3787f6Schristos
1122b3787f6Schristos return 1;
1132b3787f6Schristos }
1142b3787f6Schristos
1152b3787f6Schristos static inline void
bufferevent_update_buckets(struct bufferevent_private * bev)1162b3787f6Schristos bufferevent_update_buckets(struct bufferevent_private *bev)
1172b3787f6Schristos {
1182b3787f6Schristos /* Must hold lock on bev. */
1192b3787f6Schristos struct timeval now;
1202b3787f6Schristos unsigned tick;
1212b3787f6Schristos event_base_gettimeofday_cached(bev->bev.ev_base, &now);
1222b3787f6Schristos tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
1232b3787f6Schristos if (tick != bev->rate_limiting->limit.last_updated)
1242b3787f6Schristos ev_token_bucket_update_(&bev->rate_limiting->limit,
1252b3787f6Schristos bev->rate_limiting->cfg, tick);
1262b3787f6Schristos }
1272b3787f6Schristos
1282b3787f6Schristos ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval * tv,const struct ev_token_bucket_cfg * cfg)1292b3787f6Schristos ev_token_bucket_get_tick_(const struct timeval *tv,
1302b3787f6Schristos const struct ev_token_bucket_cfg *cfg)
1312b3787f6Schristos {
1322b3787f6Schristos /* This computation uses two multiplies and a divide. We could do
1332b3787f6Schristos * fewer if we knew that the tick length was an integer number of
1342b3787f6Schristos * seconds, or if we knew it divided evenly into a second. We should
1352b3787f6Schristos * investigate that more.
1362b3787f6Schristos */
1372b3787f6Schristos
1382b3787f6Schristos /* We cast to an ev_uint64_t first, since we don't want to overflow
1392b3787f6Schristos * before we do the final divide. */
1402b3787f6Schristos ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
1412b3787f6Schristos return (unsigned)(msec / cfg->msec_per_tick);
1422b3787f6Schristos }
1432b3787f6Schristos
1442b3787f6Schristos struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate,size_t read_burst,size_t write_rate,size_t write_burst,const struct timeval * tick_len)1452b3787f6Schristos ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
1462b3787f6Schristos size_t write_rate, size_t write_burst,
1472b3787f6Schristos const struct timeval *tick_len)
1482b3787f6Schristos {
1492b3787f6Schristos struct ev_token_bucket_cfg *r;
1502b3787f6Schristos struct timeval g;
1512b3787f6Schristos if (! tick_len) {
1522b3787f6Schristos g.tv_sec = 1;
1532b3787f6Schristos g.tv_usec = 0;
1542b3787f6Schristos tick_len = &g;
1552b3787f6Schristos }
1562b3787f6Schristos if (read_rate > read_burst || write_rate > write_burst ||
1572b3787f6Schristos read_rate < 1 || write_rate < 1)
1582b3787f6Schristos return NULL;
1592b3787f6Schristos if (read_rate > EV_RATE_LIMIT_MAX ||
1602b3787f6Schristos write_rate > EV_RATE_LIMIT_MAX ||
1612b3787f6Schristos read_burst > EV_RATE_LIMIT_MAX ||
1622b3787f6Schristos write_burst > EV_RATE_LIMIT_MAX)
1632b3787f6Schristos return NULL;
1642b3787f6Schristos r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
1652b3787f6Schristos if (!r)
1662b3787f6Schristos return NULL;
1672b3787f6Schristos r->read_rate = read_rate;
1682b3787f6Schristos r->write_rate = write_rate;
1692b3787f6Schristos r->read_maximum = read_burst;
1702b3787f6Schristos r->write_maximum = write_burst;
1712b3787f6Schristos memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
1722b3787f6Schristos r->msec_per_tick = (tick_len->tv_sec * 1000) +
1732b3787f6Schristos (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
1742b3787f6Schristos return r;
1752b3787f6Schristos }
1762b3787f6Schristos
/** Release a configuration obtained from ev_token_bucket_cfg_new().
    The pointer is handed straight to mm_free().
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
1822b3787f6Schristos
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Shorthand for EVLOCK_LOCK / EVLOCK_UNLOCK on the lock member of the
 * rate-limit group 'g'. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: the bucket-decrement functions use these group
 * helpers before their definitions appear. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
1942b3787f6Schristos
1952b3787f6Schristos /** Helper: figure out the maximum amount we should write if is_write, or
1962b3787f6Schristos the maximum amount we should read if is_read. Return that maximum, or
1972b3787f6Schristos 0 if our bucket is wholly exhausted.
1982b3787f6Schristos */
1992b3787f6Schristos static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private * bev,int is_write)2002b3787f6Schristos bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
2012b3787f6Schristos {
2022b3787f6Schristos /* needs lock on bev. */
2032b3787f6Schristos ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
2042b3787f6Schristos
2052b3787f6Schristos #define LIM(x) \
2062b3787f6Schristos (is_write ? (x).write_limit : (x).read_limit)
2072b3787f6Schristos
2082b3787f6Schristos #define GROUP_SUSPENDED(g) \
2092b3787f6Schristos (is_write ? (g)->write_suspended : (g)->read_suspended)
2102b3787f6Schristos
2112b3787f6Schristos /* Sets max_so_far to MIN(x, max_so_far) */
2122b3787f6Schristos #define CLAMPTO(x) \
2132b3787f6Schristos do { \
2142b3787f6Schristos if (max_so_far > (x)) \
2152b3787f6Schristos max_so_far = (x); \
2162b3787f6Schristos } while (0);
2172b3787f6Schristos
2182b3787f6Schristos if (!bev->rate_limiting)
2192b3787f6Schristos return max_so_far;
2202b3787f6Schristos
2212b3787f6Schristos /* If rate-limiting is enabled at all, update the appropriate
2222b3787f6Schristos bucket, and take the smaller of our rate limit and the group
2232b3787f6Schristos rate limit.
2242b3787f6Schristos */
2252b3787f6Schristos
2262b3787f6Schristos if (bev->rate_limiting->cfg) {
2272b3787f6Schristos bufferevent_update_buckets(bev);
2282b3787f6Schristos max_so_far = LIM(bev->rate_limiting->limit);
2292b3787f6Schristos }
2302b3787f6Schristos if (bev->rate_limiting->group) {
2312b3787f6Schristos struct bufferevent_rate_limit_group *g =
2322b3787f6Schristos bev->rate_limiting->group;
2332b3787f6Schristos ev_ssize_t share;
2342b3787f6Schristos LOCK_GROUP(g);
2352b3787f6Schristos if (GROUP_SUSPENDED(g)) {
2362b3787f6Schristos /* We can get here if we failed to lock this
2372b3787f6Schristos * particular bufferevent while suspending the whole
2382b3787f6Schristos * group. */
2392b3787f6Schristos if (is_write)
2402b3787f6Schristos bufferevent_suspend_write_(&bev->bev,
2412b3787f6Schristos BEV_SUSPEND_BW_GROUP);
2422b3787f6Schristos else
2432b3787f6Schristos bufferevent_suspend_read_(&bev->bev,
2442b3787f6Schristos BEV_SUSPEND_BW_GROUP);
2452b3787f6Schristos share = 0;
2462b3787f6Schristos } else {
2472b3787f6Schristos /* XXXX probably we should divide among the active
2482b3787f6Schristos * members, not the total members. */
2492b3787f6Schristos share = LIM(g->rate_limit) / g->n_members;
2502b3787f6Schristos if (share < g->min_share)
2512b3787f6Schristos share = g->min_share;
2522b3787f6Schristos }
2532b3787f6Schristos UNLOCK_GROUP(g);
2542b3787f6Schristos CLAMPTO(share);
2552b3787f6Schristos }
2562b3787f6Schristos
2572b3787f6Schristos if (max_so_far < 0)
2582b3787f6Schristos max_so_far = 0;
2592b3787f6Schristos return max_so_far;
2602b3787f6Schristos }
2612b3787f6Schristos
2622b3787f6Schristos ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private * bev)2632b3787f6Schristos bufferevent_get_read_max_(struct bufferevent_private *bev)
2642b3787f6Schristos {
2652b3787f6Schristos return bufferevent_get_rlim_max_(bev, 0);
2662b3787f6Schristos }
2672b3787f6Schristos
2682b3787f6Schristos ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private * bev)2692b3787f6Schristos bufferevent_get_write_max_(struct bufferevent_private *bev)
2702b3787f6Schristos {
2712b3787f6Schristos return bufferevent_get_rlim_max_(bev, 1);
2722b3787f6Schristos }
2732b3787f6Schristos
2742b3787f6Schristos int
bufferevent_decrement_read_buckets_(struct bufferevent_private * bev,ev_ssize_t bytes)2752b3787f6Schristos bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
2762b3787f6Schristos {
2772b3787f6Schristos /* XXXXX Make sure all users of this function check its return value */
2782b3787f6Schristos int r = 0;
2792b3787f6Schristos /* need to hold lock on bev */
2802b3787f6Schristos if (!bev->rate_limiting)
2812b3787f6Schristos return 0;
2822b3787f6Schristos
2832b3787f6Schristos if (bev->rate_limiting->cfg) {
2842b3787f6Schristos bev->rate_limiting->limit.read_limit -= bytes;
2852b3787f6Schristos if (bev->rate_limiting->limit.read_limit <= 0) {
2862b3787f6Schristos bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
2872b3787f6Schristos if (event_add(&bev->rate_limiting->refill_bucket_event,
2882b3787f6Schristos &bev->rate_limiting->cfg->tick_timeout) < 0)
2892b3787f6Schristos r = -1;
2902b3787f6Schristos } else if (bev->read_suspended & BEV_SUSPEND_BW) {
2912b3787f6Schristos if (!(bev->write_suspended & BEV_SUSPEND_BW))
2922b3787f6Schristos event_del(&bev->rate_limiting->refill_bucket_event);
2932b3787f6Schristos bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
2942b3787f6Schristos }
2952b3787f6Schristos }
2962b3787f6Schristos
2972b3787f6Schristos if (bev->rate_limiting->group) {
2982b3787f6Schristos LOCK_GROUP(bev->rate_limiting->group);
2992b3787f6Schristos bev->rate_limiting->group->rate_limit.read_limit -= bytes;
3002b3787f6Schristos bev->rate_limiting->group->total_read += bytes;
3012b3787f6Schristos if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
3022b3787f6Schristos bev_group_suspend_reading_(bev->rate_limiting->group);
3032b3787f6Schristos } else if (bev->rate_limiting->group->read_suspended) {
3042b3787f6Schristos bev_group_unsuspend_reading_(bev->rate_limiting->group);
3052b3787f6Schristos }
3062b3787f6Schristos UNLOCK_GROUP(bev->rate_limiting->group);
3072b3787f6Schristos }
3082b3787f6Schristos
3092b3787f6Schristos return r;
3102b3787f6Schristos }
3112b3787f6Schristos
3122b3787f6Schristos int
bufferevent_decrement_write_buckets_(struct bufferevent_private * bev,ev_ssize_t bytes)3132b3787f6Schristos bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
3142b3787f6Schristos {
3152b3787f6Schristos /* XXXXX Make sure all users of this function check its return value */
3162b3787f6Schristos int r = 0;
3172b3787f6Schristos /* need to hold lock */
3182b3787f6Schristos if (!bev->rate_limiting)
3192b3787f6Schristos return 0;
3202b3787f6Schristos
3212b3787f6Schristos if (bev->rate_limiting->cfg) {
3222b3787f6Schristos bev->rate_limiting->limit.write_limit -= bytes;
3232b3787f6Schristos if (bev->rate_limiting->limit.write_limit <= 0) {
3242b3787f6Schristos bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
3252b3787f6Schristos if (event_add(&bev->rate_limiting->refill_bucket_event,
3262b3787f6Schristos &bev->rate_limiting->cfg->tick_timeout) < 0)
3272b3787f6Schristos r = -1;
3282b3787f6Schristos } else if (bev->write_suspended & BEV_SUSPEND_BW) {
3292b3787f6Schristos if (!(bev->read_suspended & BEV_SUSPEND_BW))
3302b3787f6Schristos event_del(&bev->rate_limiting->refill_bucket_event);
3312b3787f6Schristos bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
3322b3787f6Schristos }
3332b3787f6Schristos }
3342b3787f6Schristos
3352b3787f6Schristos if (bev->rate_limiting->group) {
3362b3787f6Schristos LOCK_GROUP(bev->rate_limiting->group);
3372b3787f6Schristos bev->rate_limiting->group->rate_limit.write_limit -= bytes;
3382b3787f6Schristos bev->rate_limiting->group->total_written += bytes;
3392b3787f6Schristos if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
3402b3787f6Schristos bev_group_suspend_writing_(bev->rate_limiting->group);
3412b3787f6Schristos } else if (bev->rate_limiting->group->write_suspended) {
3422b3787f6Schristos bev_group_unsuspend_writing_(bev->rate_limiting->group);
3432b3787f6Schristos }
3442b3787f6Schristos UNLOCK_GROUP(bev->rate_limiting->group);
3452b3787f6Schristos }
3462b3787f6Schristos
3472b3787f6Schristos return r;
3482b3787f6Schristos }
3492b3787f6Schristos
3502b3787f6Schristos /** Stop reading on every bufferevent in <b>g</b> */
3512b3787f6Schristos static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group * g)3522b3787f6Schristos bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
3532b3787f6Schristos {
3542b3787f6Schristos /* Needs group lock */
3552b3787f6Schristos struct bufferevent_private *bev;
3562b3787f6Schristos g->read_suspended = 1;
3572b3787f6Schristos g->pending_unsuspend_read = 0;
3582b3787f6Schristos
3592b3787f6Schristos /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
3602b3787f6Schristos to prevent a deadlock. (Ordinarily, the group lock nests inside
3612b3787f6Schristos the bufferevent locks. If we are unable to lock any individual
3622b3787f6Schristos bufferevent, it will find out later when it looks at its limit
3632b3787f6Schristos and sees that its group is suspended.)
3642b3787f6Schristos */
3652b3787f6Schristos LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
3662b3787f6Schristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
3672b3787f6Schristos bufferevent_suspend_read_(&bev->bev,
3682b3787f6Schristos BEV_SUSPEND_BW_GROUP);
3692b3787f6Schristos EVLOCK_UNLOCK(bev->lock, 0);
3702b3787f6Schristos }
3712b3787f6Schristos }
3722b3787f6Schristos return 0;
3732b3787f6Schristos }
3742b3787f6Schristos
3752b3787f6Schristos /** Stop writing on every bufferevent in <b>g</b> */
3762b3787f6Schristos static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group * g)3772b3787f6Schristos bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
3782b3787f6Schristos {
3792b3787f6Schristos /* Needs group lock */
3802b3787f6Schristos struct bufferevent_private *bev;
3812b3787f6Schristos g->write_suspended = 1;
3822b3787f6Schristos g->pending_unsuspend_write = 0;
3832b3787f6Schristos LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
3842b3787f6Schristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
3852b3787f6Schristos bufferevent_suspend_write_(&bev->bev,
3862b3787f6Schristos BEV_SUSPEND_BW_GROUP);
3872b3787f6Schristos EVLOCK_UNLOCK(bev->lock, 0);
3882b3787f6Schristos }
3892b3787f6Schristos }
3902b3787f6Schristos return 0;
3912b3787f6Schristos }
3922b3787f6Schristos
3932b3787f6Schristos /** Timer callback invoked on a single bufferevent with one or more exhausted
3942b3787f6Schristos buckets when they are ready to refill. */
3952b3787f6Schristos static void
bev_refill_callback_(evutil_socket_t fd,short what,void * arg)3962b3787f6Schristos bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
3972b3787f6Schristos {
3982b3787f6Schristos unsigned tick;
3992b3787f6Schristos struct timeval now;
4002b3787f6Schristos struct bufferevent_private *bev = arg;
4012b3787f6Schristos int again = 0;
4022b3787f6Schristos BEV_LOCK(&bev->bev);
4032b3787f6Schristos if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
4042b3787f6Schristos BEV_UNLOCK(&bev->bev);
4052b3787f6Schristos return;
4062b3787f6Schristos }
4072b3787f6Schristos
4082b3787f6Schristos /* First, update the bucket */
4092b3787f6Schristos event_base_gettimeofday_cached(bev->bev.ev_base, &now);
4102b3787f6Schristos tick = ev_token_bucket_get_tick_(&now,
4112b3787f6Schristos bev->rate_limiting->cfg);
4122b3787f6Schristos ev_token_bucket_update_(&bev->rate_limiting->limit,
4132b3787f6Schristos bev->rate_limiting->cfg,
4142b3787f6Schristos tick);
4152b3787f6Schristos
4162b3787f6Schristos /* Now unsuspend any read/write operations as appropriate. */
4172b3787f6Schristos if ((bev->read_suspended & BEV_SUSPEND_BW)) {
4182b3787f6Schristos if (bev->rate_limiting->limit.read_limit > 0)
4192b3787f6Schristos bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
4202b3787f6Schristos else
4212b3787f6Schristos again = 1;
4222b3787f6Schristos }
4232b3787f6Schristos if ((bev->write_suspended & BEV_SUSPEND_BW)) {
4242b3787f6Schristos if (bev->rate_limiting->limit.write_limit > 0)
4252b3787f6Schristos bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
4262b3787f6Schristos else
4272b3787f6Schristos again = 1;
4282b3787f6Schristos }
4292b3787f6Schristos if (again) {
4302b3787f6Schristos /* One or more of the buckets may need another refill if they
4312b3787f6Schristos started negative.
4322b3787f6Schristos
4332b3787f6Schristos XXXX if we need to be quiet for more ticks, we should
4342b3787f6Schristos maybe figure out what timeout we really want.
4352b3787f6Schristos */
4362b3787f6Schristos /* XXXX Handle event_add failure somehow */
4372b3787f6Schristos event_add(&bev->rate_limiting->refill_bucket_event,
4382b3787f6Schristos &bev->rate_limiting->cfg->tick_timeout);
4392b3787f6Schristos }
4402b3787f6Schristos BEV_UNLOCK(&bev->bev);
4412b3787f6Schristos }
4422b3787f6Schristos
4432b3787f6Schristos /** Helper: grab a random element from a bufferevent group.
4442b3787f6Schristos *
4452b3787f6Schristos * Requires that we hold the lock on the group.
4462b3787f6Schristos */
4472b3787f6Schristos static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group * group)4482b3787f6Schristos bev_group_random_element_(struct bufferevent_rate_limit_group *group)
4492b3787f6Schristos {
4502b3787f6Schristos int which;
4512b3787f6Schristos struct bufferevent_private *bev;
4522b3787f6Schristos
4532b3787f6Schristos /* requires group lock */
4542b3787f6Schristos
4552b3787f6Schristos if (!group->n_members)
4562b3787f6Schristos return NULL;
4572b3787f6Schristos
4582b3787f6Schristos EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
4592b3787f6Schristos
4602b3787f6Schristos which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
4612b3787f6Schristos
4622b3787f6Schristos bev = LIST_FIRST(&group->members);
4632b3787f6Schristos while (which--)
4642b3787f6Schristos bev = LIST_NEXT(bev, rate_limiting->next_in_group);
4652b3787f6Schristos
4662b3787f6Schristos return bev;
4672b3787f6Schristos }
4682b3787f6Schristos
/** Visit every member of the rate-limiting group 'g', starting from a
    randomly chosen member: each one is assigned to 'bev' in turn and
    'block' is executed for it.  The first loop runs from the random
    member to the end of the list; the second runs from the head of the
    list up to, but not including, the random member.

    The enclosing scope must provide 'g', 'bev' and 'first'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first;					\
		    bev != LIST_END(&g->members);			\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		for (bev = LIST_FIRST(&g->members);			\
		    bev && bev != first;				\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
4882b3787f6Schristos
4892b3787f6Schristos static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group * g)4902b3787f6Schristos bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
4912b3787f6Schristos {
4922b3787f6Schristos int again = 0;
4932b3787f6Schristos struct bufferevent_private *bev, *first;
4942b3787f6Schristos
4952b3787f6Schristos g->read_suspended = 0;
4962b3787f6Schristos FOREACH_RANDOM_ORDER({
4972b3787f6Schristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
4982b3787f6Schristos bufferevent_unsuspend_read_(&bev->bev,
4992b3787f6Schristos BEV_SUSPEND_BW_GROUP);
5002b3787f6Schristos EVLOCK_UNLOCK(bev->lock, 0);
5012b3787f6Schristos } else {
5022b3787f6Schristos again = 1;
5032b3787f6Schristos }
5042b3787f6Schristos });
5052b3787f6Schristos g->pending_unsuspend_read = again;
5062b3787f6Schristos }
5072b3787f6Schristos
5082b3787f6Schristos static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group * g)5092b3787f6Schristos bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
5102b3787f6Schristos {
5112b3787f6Schristos int again = 0;
5122b3787f6Schristos struct bufferevent_private *bev, *first;
5132b3787f6Schristos g->write_suspended = 0;
5142b3787f6Schristos
5152b3787f6Schristos FOREACH_RANDOM_ORDER({
5162b3787f6Schristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
5172b3787f6Schristos bufferevent_unsuspend_write_(&bev->bev,
5182b3787f6Schristos BEV_SUSPEND_BW_GROUP);
5192b3787f6Schristos EVLOCK_UNLOCK(bev->lock, 0);
5202b3787f6Schristos } else {
5212b3787f6Schristos again = 1;
5222b3787f6Schristos }
5232b3787f6Schristos });
5242b3787f6Schristos g->pending_unsuspend_write = again;
5252b3787f6Schristos }
5262b3787f6Schristos
5272b3787f6Schristos /** Callback invoked every tick to add more elements to the group bucket
5282b3787f6Schristos and unsuspend group members as needed.
5292b3787f6Schristos */
5302b3787f6Schristos static void
bev_group_refill_callback_(evutil_socket_t fd,short what,void * arg)5312b3787f6Schristos bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
5322b3787f6Schristos {
5332b3787f6Schristos struct bufferevent_rate_limit_group *g = arg;
5342b3787f6Schristos unsigned tick;
5352b3787f6Schristos struct timeval now;
5362b3787f6Schristos
5372b3787f6Schristos event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
5382b3787f6Schristos
5392b3787f6Schristos LOCK_GROUP(g);
5402b3787f6Schristos
5412b3787f6Schristos tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
5422b3787f6Schristos ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
5432b3787f6Schristos
5442b3787f6Schristos if (g->pending_unsuspend_read ||
5452b3787f6Schristos (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
5462b3787f6Schristos bev_group_unsuspend_reading_(g);
5472b3787f6Schristos }
5482b3787f6Schristos if (g->pending_unsuspend_write ||
5492b3787f6Schristos (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
5502b3787f6Schristos bev_group_unsuspend_writing_(g);
5512b3787f6Schristos }
5522b3787f6Schristos
5532b3787f6Schristos /* XXXX Rather than waiting to the next tick to unsuspend stuff
5542b3787f6Schristos * with pending_unsuspend_write/read, we should do it on the
5552b3787f6Schristos * next iteration of the mainloop.
5562b3787f6Schristos */
5572b3787f6Schristos
5582b3787f6Schristos UNLOCK_GROUP(g);
5592b3787f6Schristos }
5602b3787f6Schristos
5612b3787f6Schristos int
bufferevent_set_rate_limit(struct bufferevent * bev,struct ev_token_bucket_cfg * cfg)5622b3787f6Schristos bufferevent_set_rate_limit(struct bufferevent *bev,
5632b3787f6Schristos struct ev_token_bucket_cfg *cfg)
5642b3787f6Schristos {
5652b3787f6Schristos struct bufferevent_private *bevp =
5662b3787f6Schristos EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
5672b3787f6Schristos int r = -1;
5682b3787f6Schristos struct bufferevent_rate_limit *rlim;
5692b3787f6Schristos struct timeval now;
5702b3787f6Schristos ev_uint32_t tick;
5712b3787f6Schristos int reinit = 0, suspended = 0;
5722b3787f6Schristos /* XXX reference-count cfg */
5732b3787f6Schristos
5742b3787f6Schristos BEV_LOCK(bev);
5752b3787f6Schristos
5762b3787f6Schristos if (cfg == NULL) {
5772b3787f6Schristos if (bevp->rate_limiting) {
5782b3787f6Schristos rlim = bevp->rate_limiting;
5792b3787f6Schristos rlim->cfg = NULL;
5802b3787f6Schristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
5812b3787f6Schristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
5822b3787f6Schristos if (event_initialized(&rlim->refill_bucket_event))
5832b3787f6Schristos event_del(&rlim->refill_bucket_event);
5842b3787f6Schristos }
5852b3787f6Schristos r = 0;
5862b3787f6Schristos goto done;
5872b3787f6Schristos }
5882b3787f6Schristos
5892b3787f6Schristos event_base_gettimeofday_cached(bev->ev_base, &now);
5902b3787f6Schristos tick = ev_token_bucket_get_tick_(&now, cfg);
5912b3787f6Schristos
5922b3787f6Schristos if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
5932b3787f6Schristos /* no-op */
5942b3787f6Schristos r = 0;
5952b3787f6Schristos goto done;
5962b3787f6Schristos }
5972b3787f6Schristos if (bevp->rate_limiting == NULL) {
5982b3787f6Schristos rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
5992b3787f6Schristos if (!rlim)
6002b3787f6Schristos goto done;
6012b3787f6Schristos bevp->rate_limiting = rlim;
6022b3787f6Schristos } else {
6032b3787f6Schristos rlim = bevp->rate_limiting;
6042b3787f6Schristos }
6052b3787f6Schristos reinit = rlim->cfg != NULL;
6062b3787f6Schristos
6072b3787f6Schristos rlim->cfg = cfg;
6082b3787f6Schristos ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
6092b3787f6Schristos
6102b3787f6Schristos if (reinit) {
6112b3787f6Schristos EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
6122b3787f6Schristos event_del(&rlim->refill_bucket_event);
6132b3787f6Schristos }
6141b6f2cd4Schristos event_assign(&rlim->refill_bucket_event, bev->ev_base,
6151b6f2cd4Schristos -1, EV_FINALIZE, bev_refill_callback_, bevp);
6162b3787f6Schristos
6172b3787f6Schristos if (rlim->limit.read_limit > 0) {
6182b3787f6Schristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
6192b3787f6Schristos } else {
6202b3787f6Schristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
6212b3787f6Schristos suspended=1;
6222b3787f6Schristos }
6232b3787f6Schristos if (rlim->limit.write_limit > 0) {
6242b3787f6Schristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
6252b3787f6Schristos } else {
6262b3787f6Schristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
6272b3787f6Schristos suspended = 1;
6282b3787f6Schristos }
6292b3787f6Schristos
6302b3787f6Schristos if (suspended)
6312b3787f6Schristos event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
6322b3787f6Schristos
6332b3787f6Schristos r = 0;
6342b3787f6Schristos
6352b3787f6Schristos done:
6362b3787f6Schristos BEV_UNLOCK(bev);
6372b3787f6Schristos return r;
6382b3787f6Schristos }
6392b3787f6Schristos
6402b3787f6Schristos struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base * base,const struct ev_token_bucket_cfg * cfg)6412b3787f6Schristos bufferevent_rate_limit_group_new(struct event_base *base,
6422b3787f6Schristos const struct ev_token_bucket_cfg *cfg)
6432b3787f6Schristos {
6442b3787f6Schristos struct bufferevent_rate_limit_group *g;
6452b3787f6Schristos struct timeval now;
6462b3787f6Schristos ev_uint32_t tick;
6472b3787f6Schristos
6482b3787f6Schristos event_base_gettimeofday_cached(base, &now);
6492b3787f6Schristos tick = ev_token_bucket_get_tick_(&now, cfg);
6502b3787f6Schristos
6512b3787f6Schristos g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
6522b3787f6Schristos if (!g)
6532b3787f6Schristos return NULL;
6542b3787f6Schristos memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
6552b3787f6Schristos LIST_INIT(&g->members);
6562b3787f6Schristos
6572b3787f6Schristos ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
6582b3787f6Schristos
6591b6f2cd4Schristos event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
6602b3787f6Schristos bev_group_refill_callback_, g);
6612b3787f6Schristos /*XXXX handle event_add failure */
6622b3787f6Schristos event_add(&g->master_refill_event, &cfg->tick_timeout);
6632b3787f6Schristos
6642b3787f6Schristos EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
6652b3787f6Schristos
6662b3787f6Schristos bufferevent_rate_limit_group_set_min_share(g, 64);
6672b3787f6Schristos
6682b3787f6Schristos evutil_weakrand_seed_(&g->weakrand_seed,
6692b3787f6Schristos (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
6702b3787f6Schristos
6712b3787f6Schristos return g;
6722b3787f6Schristos }
6732b3787f6Schristos
6742b3787f6Schristos int
bufferevent_rate_limit_group_set_cfg(struct bufferevent_rate_limit_group * g,const struct ev_token_bucket_cfg * cfg)6752b3787f6Schristos bufferevent_rate_limit_group_set_cfg(
6762b3787f6Schristos struct bufferevent_rate_limit_group *g,
6772b3787f6Schristos const struct ev_token_bucket_cfg *cfg)
6782b3787f6Schristos {
6792b3787f6Schristos int same_tick;
6802b3787f6Schristos if (!g || !cfg)
6812b3787f6Schristos return -1;
6822b3787f6Schristos
6832b3787f6Schristos LOCK_GROUP(g);
6842b3787f6Schristos same_tick = evutil_timercmp(
6852b3787f6Schristos &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
6862b3787f6Schristos memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
6872b3787f6Schristos
6882b3787f6Schristos if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
6892b3787f6Schristos g->rate_limit.read_limit = cfg->read_maximum;
6902b3787f6Schristos if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
6912b3787f6Schristos g->rate_limit.write_limit = cfg->write_maximum;
6922b3787f6Schristos
6932b3787f6Schristos if (!same_tick) {
6942b3787f6Schristos /* This can cause a hiccup in the schedule */
6952b3787f6Schristos event_add(&g->master_refill_event, &cfg->tick_timeout);
6962b3787f6Schristos }
6972b3787f6Schristos
6982b3787f6Schristos /* The new limits might force us to adjust min_share differently. */
6992b3787f6Schristos bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
7002b3787f6Schristos
7012b3787f6Schristos UNLOCK_GROUP(g);
7022b3787f6Schristos return 0;
7032b3787f6Schristos }
7042b3787f6Schristos
7052b3787f6Schristos int
bufferevent_rate_limit_group_set_min_share(struct bufferevent_rate_limit_group * g,size_t share)7062b3787f6Schristos bufferevent_rate_limit_group_set_min_share(
7072b3787f6Schristos struct bufferevent_rate_limit_group *g,
7082b3787f6Schristos size_t share)
7092b3787f6Schristos {
7102b3787f6Schristos if (share > EV_SSIZE_MAX)
7112b3787f6Schristos return -1;
7122b3787f6Schristos
7132b3787f6Schristos g->configured_min_share = share;
7142b3787f6Schristos
7152b3787f6Schristos /* Can't set share to less than the one-tick maximum. IOW, at steady
7162b3787f6Schristos * state, at least one connection can go per tick. */
7172b3787f6Schristos if (share > g->rate_limit_cfg.read_rate)
7182b3787f6Schristos share = g->rate_limit_cfg.read_rate;
7192b3787f6Schristos if (share > g->rate_limit_cfg.write_rate)
7202b3787f6Schristos share = g->rate_limit_cfg.write_rate;
7212b3787f6Schristos
7222b3787f6Schristos g->min_share = share;
7232b3787f6Schristos return 0;
7242b3787f6Schristos }
7252b3787f6Schristos
7262b3787f6Schristos void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group * g)7272b3787f6Schristos bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
7282b3787f6Schristos {
7292b3787f6Schristos LOCK_GROUP(g);
7302b3787f6Schristos EVUTIL_ASSERT(0 == g->n_members);
7312b3787f6Schristos event_del(&g->master_refill_event);
7322b3787f6Schristos UNLOCK_GROUP(g);
7332b3787f6Schristos EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
7342b3787f6Schristos mm_free(g);
7352b3787f6Schristos }
7362b3787f6Schristos
7372b3787f6Schristos int
bufferevent_add_to_rate_limit_group(struct bufferevent * bev,struct bufferevent_rate_limit_group * g)7382b3787f6Schristos bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
7392b3787f6Schristos struct bufferevent_rate_limit_group *g)
7402b3787f6Schristos {
7412b3787f6Schristos int wsuspend, rsuspend;
7422b3787f6Schristos struct bufferevent_private *bevp =
7432b3787f6Schristos EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
7442b3787f6Schristos BEV_LOCK(bev);
7452b3787f6Schristos
7462b3787f6Schristos if (!bevp->rate_limiting) {
7472b3787f6Schristos struct bufferevent_rate_limit *rlim;
7482b3787f6Schristos rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
7492b3787f6Schristos if (!rlim) {
7502b3787f6Schristos BEV_UNLOCK(bev);
7512b3787f6Schristos return -1;
7522b3787f6Schristos }
7531b6f2cd4Schristos event_assign(&rlim->refill_bucket_event, bev->ev_base,
7541b6f2cd4Schristos -1, EV_FINALIZE, bev_refill_callback_, bevp);
7552b3787f6Schristos bevp->rate_limiting = rlim;
7562b3787f6Schristos }
7572b3787f6Schristos
7582b3787f6Schristos if (bevp->rate_limiting->group == g) {
7592b3787f6Schristos BEV_UNLOCK(bev);
7602b3787f6Schristos return 0;
7612b3787f6Schristos }
7622b3787f6Schristos if (bevp->rate_limiting->group)
7632b3787f6Schristos bufferevent_remove_from_rate_limit_group(bev);
7642b3787f6Schristos
7652b3787f6Schristos LOCK_GROUP(g);
7662b3787f6Schristos bevp->rate_limiting->group = g;
7672b3787f6Schristos ++g->n_members;
7682b3787f6Schristos LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
7692b3787f6Schristos
7702b3787f6Schristos rsuspend = g->read_suspended;
7712b3787f6Schristos wsuspend = g->write_suspended;
7722b3787f6Schristos
7732b3787f6Schristos UNLOCK_GROUP(g);
7742b3787f6Schristos
7752b3787f6Schristos if (rsuspend)
7762b3787f6Schristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
7772b3787f6Schristos if (wsuspend)
7782b3787f6Schristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
7792b3787f6Schristos
7802b3787f6Schristos BEV_UNLOCK(bev);
7812b3787f6Schristos return 0;
7822b3787f6Schristos }
7832b3787f6Schristos
/*
 * Remove 'bev' from its rate-limit group, if it has one, and lift any
 * group-level suspension on it.  Returns whatever the internal helper
 * returns.
 */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    unsuspend);
}
7892b3787f6Schristos
7902b3787f6Schristos int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent * bev,int unsuspend)7912b3787f6Schristos bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
7922b3787f6Schristos int unsuspend)
7932b3787f6Schristos {
7942b3787f6Schristos struct bufferevent_private *bevp =
7952b3787f6Schristos EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
7962b3787f6Schristos BEV_LOCK(bev);
7972b3787f6Schristos if (bevp->rate_limiting && bevp->rate_limiting->group) {
7982b3787f6Schristos struct bufferevent_rate_limit_group *g =
7992b3787f6Schristos bevp->rate_limiting->group;
8002b3787f6Schristos LOCK_GROUP(g);
8012b3787f6Schristos bevp->rate_limiting->group = NULL;
8022b3787f6Schristos --g->n_members;
8032b3787f6Schristos LIST_REMOVE(bevp, rate_limiting->next_in_group);
8042b3787f6Schristos UNLOCK_GROUP(g);
8052b3787f6Schristos }
8062b3787f6Schristos if (unsuspend) {
8072b3787f6Schristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
8082b3787f6Schristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
8092b3787f6Schristos }
8102b3787f6Schristos BEV_UNLOCK(bev);
8112b3787f6Schristos return 0;
8122b3787f6Schristos }
8132b3787f6Schristos
/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they are meant for use by the
 * program.
 * === */
8202b3787f6Schristos
8212b3787f6Schristos /* Mostly you don't want to use this function from inside libevent;
8222b3787f6Schristos * bufferevent_get_read_max_() is more likely what you want*/
8232b3787f6Schristos ev_ssize_t
bufferevent_get_read_limit(struct bufferevent * bev)8242b3787f6Schristos bufferevent_get_read_limit(struct bufferevent *bev)
8252b3787f6Schristos {
8262b3787f6Schristos ev_ssize_t r;
8272b3787f6Schristos struct bufferevent_private *bevp;
8282b3787f6Schristos BEV_LOCK(bev);
8292b3787f6Schristos bevp = BEV_UPCAST(bev);
8302b3787f6Schristos if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
8312b3787f6Schristos bufferevent_update_buckets(bevp);
8322b3787f6Schristos r = bevp->rate_limiting->limit.read_limit;
8332b3787f6Schristos } else {
8342b3787f6Schristos r = EV_SSIZE_MAX;
8352b3787f6Schristos }
8362b3787f6Schristos BEV_UNLOCK(bev);
8372b3787f6Schristos return r;
8382b3787f6Schristos }
8392b3787f6Schristos
8402b3787f6Schristos /* Mostly you don't want to use this function from inside libevent;
8412b3787f6Schristos * bufferevent_get_write_max_() is more likely what you want*/
8422b3787f6Schristos ev_ssize_t
bufferevent_get_write_limit(struct bufferevent * bev)8432b3787f6Schristos bufferevent_get_write_limit(struct bufferevent *bev)
8442b3787f6Schristos {
8452b3787f6Schristos ev_ssize_t r;
8462b3787f6Schristos struct bufferevent_private *bevp;
8472b3787f6Schristos BEV_LOCK(bev);
8482b3787f6Schristos bevp = BEV_UPCAST(bev);
8492b3787f6Schristos if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
8502b3787f6Schristos bufferevent_update_buckets(bevp);
8512b3787f6Schristos r = bevp->rate_limiting->limit.write_limit;
8522b3787f6Schristos } else {
8532b3787f6Schristos r = EV_SSIZE_MAX;
8542b3787f6Schristos }
8552b3787f6Schristos BEV_UNLOCK(bev);
8562b3787f6Schristos return r;
8572b3787f6Schristos }
8582b3787f6Schristos
8592b3787f6Schristos int
bufferevent_set_max_single_read(struct bufferevent * bev,size_t size)8602b3787f6Schristos bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
8612b3787f6Schristos {
8622b3787f6Schristos struct bufferevent_private *bevp;
8632b3787f6Schristos BEV_LOCK(bev);
8642b3787f6Schristos bevp = BEV_UPCAST(bev);
8652b3787f6Schristos if (size == 0 || size > EV_SSIZE_MAX)
8662b3787f6Schristos bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
8672b3787f6Schristos else
8682b3787f6Schristos bevp->max_single_read = size;
8692b3787f6Schristos BEV_UNLOCK(bev);
8702b3787f6Schristos return 0;
8712b3787f6Schristos }
8722b3787f6Schristos
8732b3787f6Schristos int
bufferevent_set_max_single_write(struct bufferevent * bev,size_t size)8742b3787f6Schristos bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
8752b3787f6Schristos {
8762b3787f6Schristos struct bufferevent_private *bevp;
8772b3787f6Schristos BEV_LOCK(bev);
8782b3787f6Schristos bevp = BEV_UPCAST(bev);
8792b3787f6Schristos if (size == 0 || size > EV_SSIZE_MAX)
8802b3787f6Schristos bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
8812b3787f6Schristos else
8822b3787f6Schristos bevp->max_single_write = size;
8832b3787f6Schristos BEV_UNLOCK(bev);
8842b3787f6Schristos return 0;
8852b3787f6Schristos }
8862b3787f6Schristos
8872b3787f6Schristos ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent * bev)8882b3787f6Schristos bufferevent_get_max_single_read(struct bufferevent *bev)
8892b3787f6Schristos {
8902b3787f6Schristos ev_ssize_t r;
8912b3787f6Schristos
8922b3787f6Schristos BEV_LOCK(bev);
8932b3787f6Schristos r = BEV_UPCAST(bev)->max_single_read;
8942b3787f6Schristos BEV_UNLOCK(bev);
8952b3787f6Schristos return r;
8962b3787f6Schristos }
8972b3787f6Schristos
8982b3787f6Schristos ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent * bev)8992b3787f6Schristos bufferevent_get_max_single_write(struct bufferevent *bev)
9002b3787f6Schristos {
9012b3787f6Schristos ev_ssize_t r;
9022b3787f6Schristos
9032b3787f6Schristos BEV_LOCK(bev);
9042b3787f6Schristos r = BEV_UPCAST(bev)->max_single_write;
9052b3787f6Schristos BEV_UNLOCK(bev);
9062b3787f6Schristos return r;
9072b3787f6Schristos }
9082b3787f6Schristos
9092b3787f6Schristos ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent * bev)9102b3787f6Schristos bufferevent_get_max_to_read(struct bufferevent *bev)
9112b3787f6Schristos {
9122b3787f6Schristos ev_ssize_t r;
9132b3787f6Schristos BEV_LOCK(bev);
9142b3787f6Schristos r = bufferevent_get_read_max_(BEV_UPCAST(bev));
9152b3787f6Schristos BEV_UNLOCK(bev);
9162b3787f6Schristos return r;
9172b3787f6Schristos }
9182b3787f6Schristos
9192b3787f6Schristos ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent * bev)9202b3787f6Schristos bufferevent_get_max_to_write(struct bufferevent *bev)
9212b3787f6Schristos {
9222b3787f6Schristos ev_ssize_t r;
9232b3787f6Schristos BEV_LOCK(bev);
9242b3787f6Schristos r = bufferevent_get_write_max_(BEV_UPCAST(bev));
9252b3787f6Schristos BEV_UNLOCK(bev);
9262b3787f6Schristos return r;
9272b3787f6Schristos }
9282b3787f6Schristos
9291b6f2cd4Schristos const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent * bev)9301b6f2cd4Schristos bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
9311b6f2cd4Schristos struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
9321b6f2cd4Schristos struct ev_token_bucket_cfg *cfg;
9331b6f2cd4Schristos
9341b6f2cd4Schristos BEV_LOCK(bev);
9351b6f2cd4Schristos
9361b6f2cd4Schristos if (bufev_private->rate_limiting) {
9371b6f2cd4Schristos cfg = bufev_private->rate_limiting->cfg;
9381b6f2cd4Schristos } else {
9391b6f2cd4Schristos cfg = NULL;
9401b6f2cd4Schristos }
9411b6f2cd4Schristos
9421b6f2cd4Schristos BEV_UNLOCK(bev);
9431b6f2cd4Schristos
9441b6f2cd4Schristos return cfg;
9451b6f2cd4Schristos }
9462b3787f6Schristos
9472b3787f6Schristos /* Mostly you don't want to use this function from inside libevent;
9482b3787f6Schristos * bufferevent_get_read_max_() is more likely what you want*/
9492b3787f6Schristos ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(struct bufferevent_rate_limit_group * grp)9502b3787f6Schristos bufferevent_rate_limit_group_get_read_limit(
9512b3787f6Schristos struct bufferevent_rate_limit_group *grp)
9522b3787f6Schristos {
9532b3787f6Schristos ev_ssize_t r;
9542b3787f6Schristos LOCK_GROUP(grp);
9552b3787f6Schristos r = grp->rate_limit.read_limit;
9562b3787f6Schristos UNLOCK_GROUP(grp);
9572b3787f6Schristos return r;
9582b3787f6Schristos }
9592b3787f6Schristos
9602b3787f6Schristos /* Mostly you don't want to use this function from inside libevent;
9612b3787f6Schristos * bufferevent_get_write_max_() is more likely what you want. */
9622b3787f6Schristos ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(struct bufferevent_rate_limit_group * grp)9632b3787f6Schristos bufferevent_rate_limit_group_get_write_limit(
9642b3787f6Schristos struct bufferevent_rate_limit_group *grp)
9652b3787f6Schristos {
9662b3787f6Schristos ev_ssize_t r;
9672b3787f6Schristos LOCK_GROUP(grp);
9682b3787f6Schristos r = grp->rate_limit.write_limit;
9692b3787f6Schristos UNLOCK_GROUP(grp);
9702b3787f6Schristos return r;
9712b3787f6Schristos }
9722b3787f6Schristos
9732b3787f6Schristos int
bufferevent_decrement_read_limit(struct bufferevent * bev,ev_ssize_t decr)9742b3787f6Schristos bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
9752b3787f6Schristos {
9762b3787f6Schristos int r = 0;
9772b3787f6Schristos ev_ssize_t old_limit, new_limit;
9782b3787f6Schristos struct bufferevent_private *bevp;
9792b3787f6Schristos BEV_LOCK(bev);
9802b3787f6Schristos bevp = BEV_UPCAST(bev);
9812b3787f6Schristos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
9822b3787f6Schristos old_limit = bevp->rate_limiting->limit.read_limit;
9832b3787f6Schristos
9842b3787f6Schristos new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
9852b3787f6Schristos if (old_limit > 0 && new_limit <= 0) {
9862b3787f6Schristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
9872b3787f6Schristos if (event_add(&bevp->rate_limiting->refill_bucket_event,
9882b3787f6Schristos &bevp->rate_limiting->cfg->tick_timeout) < 0)
9892b3787f6Schristos r = -1;
9902b3787f6Schristos } else if (old_limit <= 0 && new_limit > 0) {
9912b3787f6Schristos if (!(bevp->write_suspended & BEV_SUSPEND_BW))
9922b3787f6Schristos event_del(&bevp->rate_limiting->refill_bucket_event);
9932b3787f6Schristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
9942b3787f6Schristos }
9952b3787f6Schristos
9962b3787f6Schristos BEV_UNLOCK(bev);
9972b3787f6Schristos return r;
9982b3787f6Schristos }
9992b3787f6Schristos
10002b3787f6Schristos int
bufferevent_decrement_write_limit(struct bufferevent * bev,ev_ssize_t decr)10012b3787f6Schristos bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
10022b3787f6Schristos {
10032b3787f6Schristos /* XXXX this is mostly copy-and-paste from
10042b3787f6Schristos * bufferevent_decrement_read_limit */
10052b3787f6Schristos int r = 0;
10062b3787f6Schristos ev_ssize_t old_limit, new_limit;
10072b3787f6Schristos struct bufferevent_private *bevp;
10082b3787f6Schristos BEV_LOCK(bev);
10092b3787f6Schristos bevp = BEV_UPCAST(bev);
10102b3787f6Schristos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
10112b3787f6Schristos old_limit = bevp->rate_limiting->limit.write_limit;
10122b3787f6Schristos
10132b3787f6Schristos new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
10142b3787f6Schristos if (old_limit > 0 && new_limit <= 0) {
10152b3787f6Schristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
10162b3787f6Schristos if (event_add(&bevp->rate_limiting->refill_bucket_event,
10172b3787f6Schristos &bevp->rate_limiting->cfg->tick_timeout) < 0)
10182b3787f6Schristos r = -1;
10192b3787f6Schristos } else if (old_limit <= 0 && new_limit > 0) {
10202b3787f6Schristos if (!(bevp->read_suspended & BEV_SUSPEND_BW))
10212b3787f6Schristos event_del(&bevp->rate_limiting->refill_bucket_event);
10222b3787f6Schristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
10232b3787f6Schristos }
10242b3787f6Schristos
10252b3787f6Schristos BEV_UNLOCK(bev);
10262b3787f6Schristos return r;
10272b3787f6Schristos }
10282b3787f6Schristos
10292b3787f6Schristos int
bufferevent_rate_limit_group_decrement_read(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)10302b3787f6Schristos bufferevent_rate_limit_group_decrement_read(
10312b3787f6Schristos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
10322b3787f6Schristos {
10332b3787f6Schristos int r = 0;
10342b3787f6Schristos ev_ssize_t old_limit, new_limit;
10352b3787f6Schristos LOCK_GROUP(grp);
10362b3787f6Schristos old_limit = grp->rate_limit.read_limit;
10372b3787f6Schristos new_limit = (grp->rate_limit.read_limit -= decr);
10382b3787f6Schristos
10392b3787f6Schristos if (old_limit > 0 && new_limit <= 0) {
10402b3787f6Schristos bev_group_suspend_reading_(grp);
10412b3787f6Schristos } else if (old_limit <= 0 && new_limit > 0) {
10422b3787f6Schristos bev_group_unsuspend_reading_(grp);
10432b3787f6Schristos }
10442b3787f6Schristos
10452b3787f6Schristos UNLOCK_GROUP(grp);
10462b3787f6Schristos return r;
10472b3787f6Schristos }
10482b3787f6Schristos
10492b3787f6Schristos int
bufferevent_rate_limit_group_decrement_write(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)10502b3787f6Schristos bufferevent_rate_limit_group_decrement_write(
10512b3787f6Schristos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
10522b3787f6Schristos {
10532b3787f6Schristos int r = 0;
10542b3787f6Schristos ev_ssize_t old_limit, new_limit;
10552b3787f6Schristos LOCK_GROUP(grp);
10562b3787f6Schristos old_limit = grp->rate_limit.write_limit;
10572b3787f6Schristos new_limit = (grp->rate_limit.write_limit -= decr);
10582b3787f6Schristos
10592b3787f6Schristos if (old_limit > 0 && new_limit <= 0) {
10602b3787f6Schristos bev_group_suspend_writing_(grp);
10612b3787f6Schristos } else if (old_limit <= 0 && new_limit > 0) {
10622b3787f6Schristos bev_group_unsuspend_writing_(grp);
10632b3787f6Schristos }
10642b3787f6Schristos
10652b3787f6Schristos UNLOCK_GROUP(grp);
10662b3787f6Schristos return r;
10672b3787f6Schristos }
10682b3787f6Schristos
10692b3787f6Schristos void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group * grp,ev_uint64_t * total_read_out,ev_uint64_t * total_written_out)10702b3787f6Schristos bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
10712b3787f6Schristos ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
10722b3787f6Schristos {
10732b3787f6Schristos EVUTIL_ASSERT(grp != NULL);
10742b3787f6Schristos if (total_read_out)
10752b3787f6Schristos *total_read_out = grp->total_read;
10762b3787f6Schristos if (total_written_out)
10772b3787f6Schristos *total_written_out = grp->total_written;
10782b3787f6Schristos }
10792b3787f6Schristos
10802b3787f6Schristos void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group * grp)10812b3787f6Schristos bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
10822b3787f6Schristos {
10832b3787f6Schristos grp->total_read = grp->total_written = 0;
10842b3787f6Schristos }
10852b3787f6Schristos
10862b3787f6Schristos int
bufferevent_ratelim_init_(struct bufferevent_private * bev)10872b3787f6Schristos bufferevent_ratelim_init_(struct bufferevent_private *bev)
10882b3787f6Schristos {
10892b3787f6Schristos bev->rate_limiting = NULL;
10902b3787f6Schristos bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
10912b3787f6Schristos bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
10922b3787f6Schristos
10932b3787f6Schristos return 0;
10942b3787f6Schristos }
1095