1*7c5f803cSrillig /* $NetBSD: bufferevent_ratelim.c,v 1.6 2021/04/10 19:18:45 rillig Exp $ */
24109d450Schristos
393dcc084Schristos /*
493dcc084Schristos * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
593dcc084Schristos * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
693dcc084Schristos * All rights reserved.
793dcc084Schristos *
893dcc084Schristos * Redistribution and use in source and binary forms, with or without
993dcc084Schristos * modification, are permitted provided that the following conditions
1093dcc084Schristos * are met:
1193dcc084Schristos * 1. Redistributions of source code must retain the above copyright
1293dcc084Schristos * notice, this list of conditions and the following disclaimer.
1393dcc084Schristos * 2. Redistributions in binary form must reproduce the above copyright
1493dcc084Schristos * notice, this list of conditions and the following disclaimer in the
1593dcc084Schristos * documentation and/or other materials provided with the distribution.
1693dcc084Schristos * 3. The name of the author may not be used to endorse or promote products
1793dcc084Schristos * derived from this software without specific prior written permission.
1893dcc084Schristos *
1993dcc084Schristos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
2093dcc084Schristos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
2193dcc084Schristos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
2293dcc084Schristos * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
2393dcc084Schristos * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
2493dcc084Schristos * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
2593dcc084Schristos * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
2693dcc084Schristos * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
2793dcc084Schristos * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
2893dcc084Schristos * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2993dcc084Schristos */
30615c371fSchristos #include "evconfig-private.h"
3193dcc084Schristos
3293dcc084Schristos #include <sys/types.h>
3393dcc084Schristos #include <limits.h>
3493dcc084Schristos #include <string.h>
3593dcc084Schristos #include <stdlib.h>
3693dcc084Schristos
3793dcc084Schristos #include "event2/event.h"
3893dcc084Schristos #include "event2/event_struct.h"
3993dcc084Schristos #include "event2/util.h"
4093dcc084Schristos #include "event2/bufferevent.h"
4193dcc084Schristos #include "event2/bufferevent_struct.h"
4293dcc084Schristos #include "event2/buffer.h"
4393dcc084Schristos
4493dcc084Schristos #include "ratelim-internal.h"
4593dcc084Schristos
4693dcc084Schristos #include "bufferevent-internal.h"
4793dcc084Schristos #include "mm-internal.h"
4893dcc084Schristos #include "util-internal.h"
4993dcc084Schristos #include "event-internal.h"
5093dcc084Schristos
5193dcc084Schristos int
ev_token_bucket_init_(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick,int reinitialize)52615c371fSchristos ev_token_bucket_init_(struct ev_token_bucket *bucket,
5393dcc084Schristos const struct ev_token_bucket_cfg *cfg,
5493dcc084Schristos ev_uint32_t current_tick,
5593dcc084Schristos int reinitialize)
5693dcc084Schristos {
5793dcc084Schristos if (reinitialize) {
5893dcc084Schristos /* on reinitialization, we only clip downwards, since we've
5993dcc084Schristos already used who-knows-how-much bandwidth this tick. We
6093dcc084Schristos leave "last_updated" as it is; the next update will add the
6193dcc084Schristos appropriate amount of bandwidth to the bucket.
6293dcc084Schristos */
6393dcc084Schristos if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
6493dcc084Schristos bucket->read_limit = cfg->read_maximum;
6593dcc084Schristos if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
6693dcc084Schristos bucket->write_limit = cfg->write_maximum;
6793dcc084Schristos } else {
6893dcc084Schristos bucket->read_limit = cfg->read_rate;
6993dcc084Schristos bucket->write_limit = cfg->write_rate;
7093dcc084Schristos bucket->last_updated = current_tick;
7193dcc084Schristos }
7293dcc084Schristos return 0;
7393dcc084Schristos }
7493dcc084Schristos
7593dcc084Schristos int
ev_token_bucket_update_(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick)76615c371fSchristos ev_token_bucket_update_(struct ev_token_bucket *bucket,
7793dcc084Schristos const struct ev_token_bucket_cfg *cfg,
7893dcc084Schristos ev_uint32_t current_tick)
7993dcc084Schristos {
8093dcc084Schristos /* It's okay if the tick number overflows, since we'll just
8193dcc084Schristos * wrap around when we do the unsigned substraction. */
8293dcc084Schristos unsigned n_ticks = current_tick - bucket->last_updated;
8393dcc084Schristos
8493dcc084Schristos /* Make sure some ticks actually happened, and that time didn't
8593dcc084Schristos * roll back. */
8693dcc084Schristos if (n_ticks == 0 || n_ticks > INT_MAX)
8793dcc084Schristos return 0;
8893dcc084Schristos
8993dcc084Schristos /* Naively, we would say
9093dcc084Schristos bucket->limit += n_ticks * cfg->rate;
9193dcc084Schristos
9293dcc084Schristos if (bucket->limit > cfg->maximum)
9393dcc084Schristos bucket->limit = cfg->maximum;
9493dcc084Schristos
9593dcc084Schristos But we're worried about overflow, so we do it like this:
9693dcc084Schristos */
9793dcc084Schristos
9893dcc084Schristos if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
9993dcc084Schristos bucket->read_limit = cfg->read_maximum;
10093dcc084Schristos else
10193dcc084Schristos bucket->read_limit += n_ticks * cfg->read_rate;
10293dcc084Schristos
10393dcc084Schristos
10493dcc084Schristos if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
10593dcc084Schristos bucket->write_limit = cfg->write_maximum;
10693dcc084Schristos else
10793dcc084Schristos bucket->write_limit += n_ticks * cfg->write_rate;
10893dcc084Schristos
10993dcc084Schristos
11093dcc084Schristos bucket->last_updated = current_tick;
11193dcc084Schristos
11293dcc084Schristos return 1;
11393dcc084Schristos }
11493dcc084Schristos
11593dcc084Schristos static inline void
bufferevent_update_buckets(struct bufferevent_private * bev)11693dcc084Schristos bufferevent_update_buckets(struct bufferevent_private *bev)
11793dcc084Schristos {
11893dcc084Schristos /* Must hold lock on bev. */
11993dcc084Schristos struct timeval now;
12093dcc084Schristos unsigned tick;
12193dcc084Schristos event_base_gettimeofday_cached(bev->bev.ev_base, &now);
122615c371fSchristos tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
12393dcc084Schristos if (tick != bev->rate_limiting->limit.last_updated)
124615c371fSchristos ev_token_bucket_update_(&bev->rate_limiting->limit,
12593dcc084Schristos bev->rate_limiting->cfg, tick);
12693dcc084Schristos }
12793dcc084Schristos
12893dcc084Schristos ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval * tv,const struct ev_token_bucket_cfg * cfg)129615c371fSchristos ev_token_bucket_get_tick_(const struct timeval *tv,
13093dcc084Schristos const struct ev_token_bucket_cfg *cfg)
13193dcc084Schristos {
13293dcc084Schristos /* This computation uses two multiplies and a divide. We could do
13393dcc084Schristos * fewer if we knew that the tick length was an integer number of
13493dcc084Schristos * seconds, or if we knew it divided evenly into a second. We should
13593dcc084Schristos * investigate that more.
13693dcc084Schristos */
13793dcc084Schristos
13893dcc084Schristos /* We cast to an ev_uint64_t first, since we don't want to overflow
13993dcc084Schristos * before we do the final divide. */
14093dcc084Schristos ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
14193dcc084Schristos return (unsigned)(msec / cfg->msec_per_tick);
14293dcc084Schristos }
14393dcc084Schristos
14493dcc084Schristos struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate,size_t read_burst,size_t write_rate,size_t write_burst,const struct timeval * tick_len)14593dcc084Schristos ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
14693dcc084Schristos size_t write_rate, size_t write_burst,
14793dcc084Schristos const struct timeval *tick_len)
14893dcc084Schristos {
14993dcc084Schristos struct ev_token_bucket_cfg *r;
15093dcc084Schristos struct timeval g;
15193dcc084Schristos if (! tick_len) {
15293dcc084Schristos g.tv_sec = 1;
15393dcc084Schristos g.tv_usec = 0;
15493dcc084Schristos tick_len = &g;
15593dcc084Schristos }
15693dcc084Schristos if (read_rate > read_burst || write_rate > write_burst ||
15793dcc084Schristos read_rate < 1 || write_rate < 1)
15893dcc084Schristos return NULL;
15993dcc084Schristos if (read_rate > EV_RATE_LIMIT_MAX ||
16093dcc084Schristos write_rate > EV_RATE_LIMIT_MAX ||
16193dcc084Schristos read_burst > EV_RATE_LIMIT_MAX ||
16293dcc084Schristos write_burst > EV_RATE_LIMIT_MAX)
16393dcc084Schristos return NULL;
16493dcc084Schristos r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
16593dcc084Schristos if (!r)
16693dcc084Schristos return NULL;
16793dcc084Schristos r->read_rate = read_rate;
16893dcc084Schristos r->write_rate = write_rate;
16993dcc084Schristos r->read_maximum = read_burst;
17093dcc084Schristos r->write_maximum = write_burst;
17193dcc084Schristos memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
17293dcc084Schristos r->msec_per_tick = (tick_len->tv_sec * 1000) +
17393dcc084Schristos (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
17493dcc084Schristos return r;
17593dcc084Schristos }
17693dcc084Schristos
/**
 * Release a configuration allocated by ev_token_bucket_cfg_new().
 *
 * @param cfg  configuration to free; it is passed straight to mm_free()
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
18293dcc084Schristos
/* Default values, in bytes, for the max_single_read and max_single_write
 * variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock of rate-limit group g. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend/unsuspend helpers, defined further down. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
19493dcc084Schristos
19593dcc084Schristos /** Helper: figure out the maximum amount we should write if is_write, or
19693dcc084Schristos the maximum amount we should read if is_read. Return that maximum, or
19793dcc084Schristos 0 if our bucket is wholly exhausted.
19893dcc084Schristos */
19993dcc084Schristos static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private * bev,int is_write)200615c371fSchristos bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
20193dcc084Schristos {
20293dcc084Schristos /* needs lock on bev. */
203615c371fSchristos ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
20493dcc084Schristos
20593dcc084Schristos #define LIM(x) \
20693dcc084Schristos (is_write ? (x).write_limit : (x).read_limit)
20793dcc084Schristos
20893dcc084Schristos #define GROUP_SUSPENDED(g) \
20993dcc084Schristos (is_write ? (g)->write_suspended : (g)->read_suspended)
21093dcc084Schristos
21193dcc084Schristos /* Sets max_so_far to MIN(x, max_so_far) */
21293dcc084Schristos #define CLAMPTO(x) \
21393dcc084Schristos do { \
21493dcc084Schristos if (max_so_far > (x)) \
21593dcc084Schristos max_so_far = (x); \
216*7c5f803cSrillig } while (0);
21793dcc084Schristos
21893dcc084Schristos if (!bev->rate_limiting)
21993dcc084Schristos return max_so_far;
22093dcc084Schristos
22193dcc084Schristos /* If rate-limiting is enabled at all, update the appropriate
22293dcc084Schristos bucket, and take the smaller of our rate limit and the group
22393dcc084Schristos rate limit.
22493dcc084Schristos */
22593dcc084Schristos
22693dcc084Schristos if (bev->rate_limiting->cfg) {
22793dcc084Schristos bufferevent_update_buckets(bev);
22893dcc084Schristos max_so_far = LIM(bev->rate_limiting->limit);
22993dcc084Schristos }
23093dcc084Schristos if (bev->rate_limiting->group) {
23193dcc084Schristos struct bufferevent_rate_limit_group *g =
23293dcc084Schristos bev->rate_limiting->group;
23393dcc084Schristos ev_ssize_t share;
23493dcc084Schristos LOCK_GROUP(g);
23593dcc084Schristos if (GROUP_SUSPENDED(g)) {
23693dcc084Schristos /* We can get here if we failed to lock this
23793dcc084Schristos * particular bufferevent while suspending the whole
23893dcc084Schristos * group. */
23993dcc084Schristos if (is_write)
240615c371fSchristos bufferevent_suspend_write_(&bev->bev,
24193dcc084Schristos BEV_SUSPEND_BW_GROUP);
24293dcc084Schristos else
243615c371fSchristos bufferevent_suspend_read_(&bev->bev,
24493dcc084Schristos BEV_SUSPEND_BW_GROUP);
24593dcc084Schristos share = 0;
24693dcc084Schristos } else {
24793dcc084Schristos /* XXXX probably we should divide among the active
24893dcc084Schristos * members, not the total members. */
24993dcc084Schristos share = LIM(g->rate_limit) / g->n_members;
25093dcc084Schristos if (share < g->min_share)
25193dcc084Schristos share = g->min_share;
25293dcc084Schristos }
25393dcc084Schristos UNLOCK_GROUP(g);
25493dcc084Schristos CLAMPTO(share);
25593dcc084Schristos }
25693dcc084Schristos
25793dcc084Schristos if (max_so_far < 0)
25893dcc084Schristos max_so_far = 0;
25993dcc084Schristos return max_so_far;
26093dcc084Schristos }
26193dcc084Schristos
26293dcc084Schristos ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private * bev)263615c371fSchristos bufferevent_get_read_max_(struct bufferevent_private *bev)
26493dcc084Schristos {
265615c371fSchristos return bufferevent_get_rlim_max_(bev, 0);
26693dcc084Schristos }
26793dcc084Schristos
26893dcc084Schristos ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private * bev)269615c371fSchristos bufferevent_get_write_max_(struct bufferevent_private *bev)
27093dcc084Schristos {
271615c371fSchristos return bufferevent_get_rlim_max_(bev, 1);
27293dcc084Schristos }
27393dcc084Schristos
27493dcc084Schristos int
bufferevent_decrement_read_buckets_(struct bufferevent_private * bev,ev_ssize_t bytes)275615c371fSchristos bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
27693dcc084Schristos {
27793dcc084Schristos /* XXXXX Make sure all users of this function check its return value */
27893dcc084Schristos int r = 0;
27993dcc084Schristos /* need to hold lock on bev */
28093dcc084Schristos if (!bev->rate_limiting)
28193dcc084Schristos return 0;
28293dcc084Schristos
28393dcc084Schristos if (bev->rate_limiting->cfg) {
28493dcc084Schristos bev->rate_limiting->limit.read_limit -= bytes;
28593dcc084Schristos if (bev->rate_limiting->limit.read_limit <= 0) {
286615c371fSchristos bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
28793dcc084Schristos if (event_add(&bev->rate_limiting->refill_bucket_event,
28893dcc084Schristos &bev->rate_limiting->cfg->tick_timeout) < 0)
28993dcc084Schristos r = -1;
29093dcc084Schristos } else if (bev->read_suspended & BEV_SUSPEND_BW) {
29193dcc084Schristos if (!(bev->write_suspended & BEV_SUSPEND_BW))
29293dcc084Schristos event_del(&bev->rate_limiting->refill_bucket_event);
293615c371fSchristos bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
29493dcc084Schristos }
29593dcc084Schristos }
29693dcc084Schristos
29793dcc084Schristos if (bev->rate_limiting->group) {
29893dcc084Schristos LOCK_GROUP(bev->rate_limiting->group);
29993dcc084Schristos bev->rate_limiting->group->rate_limit.read_limit -= bytes;
30093dcc084Schristos bev->rate_limiting->group->total_read += bytes;
30193dcc084Schristos if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
302615c371fSchristos bev_group_suspend_reading_(bev->rate_limiting->group);
30393dcc084Schristos } else if (bev->rate_limiting->group->read_suspended) {
304615c371fSchristos bev_group_unsuspend_reading_(bev->rate_limiting->group);
30593dcc084Schristos }
30693dcc084Schristos UNLOCK_GROUP(bev->rate_limiting->group);
30793dcc084Schristos }
30893dcc084Schristos
30993dcc084Schristos return r;
31093dcc084Schristos }
31193dcc084Schristos
31293dcc084Schristos int
bufferevent_decrement_write_buckets_(struct bufferevent_private * bev,ev_ssize_t bytes)313615c371fSchristos bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
31493dcc084Schristos {
31593dcc084Schristos /* XXXXX Make sure all users of this function check its return value */
31693dcc084Schristos int r = 0;
31793dcc084Schristos /* need to hold lock */
31893dcc084Schristos if (!bev->rate_limiting)
31993dcc084Schristos return 0;
32093dcc084Schristos
32193dcc084Schristos if (bev->rate_limiting->cfg) {
32293dcc084Schristos bev->rate_limiting->limit.write_limit -= bytes;
32393dcc084Schristos if (bev->rate_limiting->limit.write_limit <= 0) {
324615c371fSchristos bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
32593dcc084Schristos if (event_add(&bev->rate_limiting->refill_bucket_event,
32693dcc084Schristos &bev->rate_limiting->cfg->tick_timeout) < 0)
32793dcc084Schristos r = -1;
32893dcc084Schristos } else if (bev->write_suspended & BEV_SUSPEND_BW) {
32993dcc084Schristos if (!(bev->read_suspended & BEV_SUSPEND_BW))
33093dcc084Schristos event_del(&bev->rate_limiting->refill_bucket_event);
331615c371fSchristos bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
33293dcc084Schristos }
33393dcc084Schristos }
33493dcc084Schristos
33593dcc084Schristos if (bev->rate_limiting->group) {
33693dcc084Schristos LOCK_GROUP(bev->rate_limiting->group);
33793dcc084Schristos bev->rate_limiting->group->rate_limit.write_limit -= bytes;
33893dcc084Schristos bev->rate_limiting->group->total_written += bytes;
33993dcc084Schristos if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
340615c371fSchristos bev_group_suspend_writing_(bev->rate_limiting->group);
34193dcc084Schristos } else if (bev->rate_limiting->group->write_suspended) {
342615c371fSchristos bev_group_unsuspend_writing_(bev->rate_limiting->group);
34393dcc084Schristos }
34493dcc084Schristos UNLOCK_GROUP(bev->rate_limiting->group);
34593dcc084Schristos }
34693dcc084Schristos
34793dcc084Schristos return r;
34893dcc084Schristos }
34993dcc084Schristos
35093dcc084Schristos /** Stop reading on every bufferevent in <b>g</b> */
35193dcc084Schristos static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group * g)352615c371fSchristos bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
35393dcc084Schristos {
35493dcc084Schristos /* Needs group lock */
35593dcc084Schristos struct bufferevent_private *bev;
35693dcc084Schristos g->read_suspended = 1;
35793dcc084Schristos g->pending_unsuspend_read = 0;
35893dcc084Schristos
359615c371fSchristos /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
36093dcc084Schristos to prevent a deadlock. (Ordinarily, the group lock nests inside
36193dcc084Schristos the bufferevent locks. If we are unable to lock any individual
36293dcc084Schristos bufferevent, it will find out later when it looks at its limit
363615c371fSchristos and sees that its group is suspended.)
36493dcc084Schristos */
365615c371fSchristos LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
366615c371fSchristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
367615c371fSchristos bufferevent_suspend_read_(&bev->bev,
36893dcc084Schristos BEV_SUSPEND_BW_GROUP);
36993dcc084Schristos EVLOCK_UNLOCK(bev->lock, 0);
37093dcc084Schristos }
37193dcc084Schristos }
37293dcc084Schristos return 0;
37393dcc084Schristos }
37493dcc084Schristos
37593dcc084Schristos /** Stop writing on every bufferevent in <b>g</b> */
37693dcc084Schristos static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group * g)377615c371fSchristos bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
37893dcc084Schristos {
37993dcc084Schristos /* Needs group lock */
38093dcc084Schristos struct bufferevent_private *bev;
38193dcc084Schristos g->write_suspended = 1;
38293dcc084Schristos g->pending_unsuspend_write = 0;
383615c371fSchristos LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
384615c371fSchristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
385615c371fSchristos bufferevent_suspend_write_(&bev->bev,
38693dcc084Schristos BEV_SUSPEND_BW_GROUP);
38793dcc084Schristos EVLOCK_UNLOCK(bev->lock, 0);
38893dcc084Schristos }
38993dcc084Schristos }
39093dcc084Schristos return 0;
39193dcc084Schristos }
39293dcc084Schristos
39393dcc084Schristos /** Timer callback invoked on a single bufferevent with one or more exhausted
39493dcc084Schristos buckets when they are ready to refill. */
39593dcc084Schristos static void
bev_refill_callback_(evutil_socket_t fd,short what,void * arg)396615c371fSchristos bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
39793dcc084Schristos {
39893dcc084Schristos unsigned tick;
39993dcc084Schristos struct timeval now;
40093dcc084Schristos struct bufferevent_private *bev = arg;
40193dcc084Schristos int again = 0;
40293dcc084Schristos BEV_LOCK(&bev->bev);
40393dcc084Schristos if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
40493dcc084Schristos BEV_UNLOCK(&bev->bev);
40593dcc084Schristos return;
40693dcc084Schristos }
40793dcc084Schristos
40893dcc084Schristos /* First, update the bucket */
40993dcc084Schristos event_base_gettimeofday_cached(bev->bev.ev_base, &now);
410615c371fSchristos tick = ev_token_bucket_get_tick_(&now,
41193dcc084Schristos bev->rate_limiting->cfg);
412615c371fSchristos ev_token_bucket_update_(&bev->rate_limiting->limit,
41393dcc084Schristos bev->rate_limiting->cfg,
41493dcc084Schristos tick);
41593dcc084Schristos
41693dcc084Schristos /* Now unsuspend any read/write operations as appropriate. */
41793dcc084Schristos if ((bev->read_suspended & BEV_SUSPEND_BW)) {
41893dcc084Schristos if (bev->rate_limiting->limit.read_limit > 0)
419615c371fSchristos bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
42093dcc084Schristos else
42193dcc084Schristos again = 1;
42293dcc084Schristos }
42393dcc084Schristos if ((bev->write_suspended & BEV_SUSPEND_BW)) {
42493dcc084Schristos if (bev->rate_limiting->limit.write_limit > 0)
425615c371fSchristos bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
42693dcc084Schristos else
42793dcc084Schristos again = 1;
42893dcc084Schristos }
42993dcc084Schristos if (again) {
43093dcc084Schristos /* One or more of the buckets may need another refill if they
43193dcc084Schristos started negative.
43293dcc084Schristos
43393dcc084Schristos XXXX if we need to be quiet for more ticks, we should
43493dcc084Schristos maybe figure out what timeout we really want.
43593dcc084Schristos */
43693dcc084Schristos /* XXXX Handle event_add failure somehow */
43793dcc084Schristos event_add(&bev->rate_limiting->refill_bucket_event,
43893dcc084Schristos &bev->rate_limiting->cfg->tick_timeout);
43993dcc084Schristos }
44093dcc084Schristos BEV_UNLOCK(&bev->bev);
44193dcc084Schristos }
44293dcc084Schristos
443615c371fSchristos /** Helper: grab a random element from a bufferevent group.
444615c371fSchristos *
445615c371fSchristos * Requires that we hold the lock on the group.
446615c371fSchristos */
44793dcc084Schristos static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group * group)448615c371fSchristos bev_group_random_element_(struct bufferevent_rate_limit_group *group)
44993dcc084Schristos {
45093dcc084Schristos int which;
45193dcc084Schristos struct bufferevent_private *bev;
45293dcc084Schristos
45393dcc084Schristos /* requires group lock */
45493dcc084Schristos
45593dcc084Schristos if (!group->n_members)
45693dcc084Schristos return NULL;
45793dcc084Schristos
458615c371fSchristos EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
45993dcc084Schristos
460615c371fSchristos which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
46193dcc084Schristos
462615c371fSchristos bev = LIST_FIRST(&group->members);
46393dcc084Schristos while (which--)
464615c371fSchristos bev = LIST_NEXT(bev, rate_limiting->next_in_group);
46593dcc084Schristos
46693dcc084Schristos return bev;
46793dcc084Schristos }
46893dcc084Schristos
/** Run 'block' once for every member of the rate-limiting group 'g',
    starting from a randomly chosen member and wrapping around to the
    head of the list.

    The expansion uses three names from the enclosing scope: 'g' (the
    group), 'bev' (set to each member in turn) and 'first' (the random
    starting member).  'block' is a single macro argument, so it must not
    contain commas outside parentheses.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		/* Pass 1: 'first' to the list end. */	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		/* Pass 2: list head up to 'first'. */		 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
48893dcc084Schristos
48993dcc084Schristos static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group * g)490615c371fSchristos bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
49193dcc084Schristos {
49293dcc084Schristos int again = 0;
49393dcc084Schristos struct bufferevent_private *bev, *first;
49493dcc084Schristos
49593dcc084Schristos g->read_suspended = 0;
49693dcc084Schristos FOREACH_RANDOM_ORDER({
497615c371fSchristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
498615c371fSchristos bufferevent_unsuspend_read_(&bev->bev,
49993dcc084Schristos BEV_SUSPEND_BW_GROUP);
50093dcc084Schristos EVLOCK_UNLOCK(bev->lock, 0);
50193dcc084Schristos } else {
50293dcc084Schristos again = 1;
50393dcc084Schristos }
50493dcc084Schristos });
50593dcc084Schristos g->pending_unsuspend_read = again;
50693dcc084Schristos }
50793dcc084Schristos
50893dcc084Schristos static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group * g)509615c371fSchristos bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
51093dcc084Schristos {
51193dcc084Schristos int again = 0;
51293dcc084Schristos struct bufferevent_private *bev, *first;
51393dcc084Schristos g->write_suspended = 0;
51493dcc084Schristos
51593dcc084Schristos FOREACH_RANDOM_ORDER({
516615c371fSchristos if (EVLOCK_TRY_LOCK_(bev->lock)) {
517615c371fSchristos bufferevent_unsuspend_write_(&bev->bev,
51893dcc084Schristos BEV_SUSPEND_BW_GROUP);
51993dcc084Schristos EVLOCK_UNLOCK(bev->lock, 0);
52093dcc084Schristos } else {
52193dcc084Schristos again = 1;
52293dcc084Schristos }
52393dcc084Schristos });
52493dcc084Schristos g->pending_unsuspend_write = again;
52593dcc084Schristos }
52693dcc084Schristos
52793dcc084Schristos /** Callback invoked every tick to add more elements to the group bucket
52893dcc084Schristos and unsuspend group members as needed.
52993dcc084Schristos */
53093dcc084Schristos static void
bev_group_refill_callback_(evutil_socket_t fd,short what,void * arg)531615c371fSchristos bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
53293dcc084Schristos {
53393dcc084Schristos struct bufferevent_rate_limit_group *g = arg;
53493dcc084Schristos unsigned tick;
53593dcc084Schristos struct timeval now;
53693dcc084Schristos
53793dcc084Schristos event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
53893dcc084Schristos
53993dcc084Schristos LOCK_GROUP(g);
54093dcc084Schristos
541615c371fSchristos tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
542615c371fSchristos ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
54393dcc084Schristos
54493dcc084Schristos if (g->pending_unsuspend_read ||
54593dcc084Schristos (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
546615c371fSchristos bev_group_unsuspend_reading_(g);
54793dcc084Schristos }
54893dcc084Schristos if (g->pending_unsuspend_write ||
54993dcc084Schristos (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
550615c371fSchristos bev_group_unsuspend_writing_(g);
55193dcc084Schristos }
55293dcc084Schristos
55393dcc084Schristos /* XXXX Rather than waiting to the next tick to unsuspend stuff
55493dcc084Schristos * with pending_unsuspend_write/read, we should do it on the
55593dcc084Schristos * next iteration of the mainloop.
55693dcc084Schristos */
55793dcc084Schristos
55893dcc084Schristos UNLOCK_GROUP(g);
55993dcc084Schristos }
56093dcc084Schristos
56193dcc084Schristos int
bufferevent_set_rate_limit(struct bufferevent * bev,struct ev_token_bucket_cfg * cfg)56293dcc084Schristos bufferevent_set_rate_limit(struct bufferevent *bev,
56393dcc084Schristos struct ev_token_bucket_cfg *cfg)
56493dcc084Schristos {
5654109d450Schristos struct bufferevent_private *bevp = BEV_UPCAST(bev);
56693dcc084Schristos int r = -1;
56793dcc084Schristos struct bufferevent_rate_limit *rlim;
56893dcc084Schristos struct timeval now;
56993dcc084Schristos ev_uint32_t tick;
57093dcc084Schristos int reinit = 0, suspended = 0;
57193dcc084Schristos /* XXX reference-count cfg */
57293dcc084Schristos
57393dcc084Schristos BEV_LOCK(bev);
57493dcc084Schristos
57593dcc084Schristos if (cfg == NULL) {
57693dcc084Schristos if (bevp->rate_limiting) {
57793dcc084Schristos rlim = bevp->rate_limiting;
57893dcc084Schristos rlim->cfg = NULL;
579615c371fSchristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
580615c371fSchristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
58193dcc084Schristos if (event_initialized(&rlim->refill_bucket_event))
58293dcc084Schristos event_del(&rlim->refill_bucket_event);
58393dcc084Schristos }
58493dcc084Schristos r = 0;
58593dcc084Schristos goto done;
58693dcc084Schristos }
58793dcc084Schristos
58893dcc084Schristos event_base_gettimeofday_cached(bev->ev_base, &now);
589615c371fSchristos tick = ev_token_bucket_get_tick_(&now, cfg);
59093dcc084Schristos
59193dcc084Schristos if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
59293dcc084Schristos /* no-op */
59393dcc084Schristos r = 0;
59493dcc084Schristos goto done;
59593dcc084Schristos }
59693dcc084Schristos if (bevp->rate_limiting == NULL) {
59793dcc084Schristos rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
59893dcc084Schristos if (!rlim)
59993dcc084Schristos goto done;
60093dcc084Schristos bevp->rate_limiting = rlim;
60193dcc084Schristos } else {
60293dcc084Schristos rlim = bevp->rate_limiting;
60393dcc084Schristos }
60493dcc084Schristos reinit = rlim->cfg != NULL;
60593dcc084Schristos
60693dcc084Schristos rlim->cfg = cfg;
607615c371fSchristos ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
60893dcc084Schristos
60993dcc084Schristos if (reinit) {
61093dcc084Schristos EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
61193dcc084Schristos event_del(&rlim->refill_bucket_event);
61293dcc084Schristos }
613615c371fSchristos event_assign(&rlim->refill_bucket_event, bev->ev_base,
614615c371fSchristos -1, EV_FINALIZE, bev_refill_callback_, bevp);
61593dcc084Schristos
61693dcc084Schristos if (rlim->limit.read_limit > 0) {
617615c371fSchristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
61893dcc084Schristos } else {
619615c371fSchristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
62093dcc084Schristos suspended=1;
62193dcc084Schristos }
62293dcc084Schristos if (rlim->limit.write_limit > 0) {
623615c371fSchristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
62493dcc084Schristos } else {
625615c371fSchristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
62693dcc084Schristos suspended = 1;
62793dcc084Schristos }
62893dcc084Schristos
62993dcc084Schristos if (suspended)
63093dcc084Schristos event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
63193dcc084Schristos
63293dcc084Schristos r = 0;
63393dcc084Schristos
63493dcc084Schristos done:
63593dcc084Schristos BEV_UNLOCK(bev);
63693dcc084Schristos return r;
63793dcc084Schristos }
63893dcc084Schristos
63993dcc084Schristos struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base * base,const struct ev_token_bucket_cfg * cfg)64093dcc084Schristos bufferevent_rate_limit_group_new(struct event_base *base,
64193dcc084Schristos const struct ev_token_bucket_cfg *cfg)
64293dcc084Schristos {
64393dcc084Schristos struct bufferevent_rate_limit_group *g;
64493dcc084Schristos struct timeval now;
64593dcc084Schristos ev_uint32_t tick;
64693dcc084Schristos
64793dcc084Schristos event_base_gettimeofday_cached(base, &now);
648615c371fSchristos tick = ev_token_bucket_get_tick_(&now, cfg);
64993dcc084Schristos
65093dcc084Schristos g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
65193dcc084Schristos if (!g)
65293dcc084Schristos return NULL;
65393dcc084Schristos memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
654615c371fSchristos LIST_INIT(&g->members);
65593dcc084Schristos
656615c371fSchristos ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
65793dcc084Schristos
658615c371fSchristos event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
659615c371fSchristos bev_group_refill_callback_, g);
66093dcc084Schristos /*XXXX handle event_add failure */
66193dcc084Schristos event_add(&g->master_refill_event, &cfg->tick_timeout);
66293dcc084Schristos
66393dcc084Schristos EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
66493dcc084Schristos
66593dcc084Schristos bufferevent_rate_limit_group_set_min_share(g, 64);
66693dcc084Schristos
667615c371fSchristos evutil_weakrand_seed_(&g->weakrand_seed,
668615c371fSchristos (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
669615c371fSchristos
67093dcc084Schristos return g;
67193dcc084Schristos }
67293dcc084Schristos
67393dcc084Schristos int
bufferevent_rate_limit_group_set_cfg(struct bufferevent_rate_limit_group * g,const struct ev_token_bucket_cfg * cfg)67493dcc084Schristos bufferevent_rate_limit_group_set_cfg(
67593dcc084Schristos struct bufferevent_rate_limit_group *g,
67693dcc084Schristos const struct ev_token_bucket_cfg *cfg)
67793dcc084Schristos {
67893dcc084Schristos int same_tick;
67993dcc084Schristos if (!g || !cfg)
68093dcc084Schristos return -1;
68193dcc084Schristos
68293dcc084Schristos LOCK_GROUP(g);
68393dcc084Schristos same_tick = evutil_timercmp(
68493dcc084Schristos &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
68593dcc084Schristos memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
68693dcc084Schristos
68793dcc084Schristos if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
68893dcc084Schristos g->rate_limit.read_limit = cfg->read_maximum;
68993dcc084Schristos if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
69093dcc084Schristos g->rate_limit.write_limit = cfg->write_maximum;
69193dcc084Schristos
69293dcc084Schristos if (!same_tick) {
69393dcc084Schristos /* This can cause a hiccup in the schedule */
69493dcc084Schristos event_add(&g->master_refill_event, &cfg->tick_timeout);
69593dcc084Schristos }
69693dcc084Schristos
69793dcc084Schristos /* The new limits might force us to adjust min_share differently. */
69893dcc084Schristos bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
69993dcc084Schristos
70093dcc084Schristos UNLOCK_GROUP(g);
70193dcc084Schristos return 0;
70293dcc084Schristos }
70393dcc084Schristos
70493dcc084Schristos int
bufferevent_rate_limit_group_set_min_share(struct bufferevent_rate_limit_group * g,size_t share)70593dcc084Schristos bufferevent_rate_limit_group_set_min_share(
70693dcc084Schristos struct bufferevent_rate_limit_group *g,
70793dcc084Schristos size_t share)
70893dcc084Schristos {
70993dcc084Schristos if (share > EV_SSIZE_MAX)
71093dcc084Schristos return -1;
71193dcc084Schristos
71293dcc084Schristos g->configured_min_share = share;
71393dcc084Schristos
71493dcc084Schristos /* Can't set share to less than the one-tick maximum. IOW, at steady
71593dcc084Schristos * state, at least one connection can go per tick. */
71693dcc084Schristos if (share > g->rate_limit_cfg.read_rate)
71793dcc084Schristos share = g->rate_limit_cfg.read_rate;
71893dcc084Schristos if (share > g->rate_limit_cfg.write_rate)
71993dcc084Schristos share = g->rate_limit_cfg.write_rate;
72093dcc084Schristos
72193dcc084Schristos g->min_share = share;
72293dcc084Schristos return 0;
72393dcc084Schristos }
72493dcc084Schristos
72593dcc084Schristos void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group * g)72693dcc084Schristos bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
72793dcc084Schristos {
72893dcc084Schristos LOCK_GROUP(g);
72993dcc084Schristos EVUTIL_ASSERT(0 == g->n_members);
73093dcc084Schristos event_del(&g->master_refill_event);
73193dcc084Schristos UNLOCK_GROUP(g);
73293dcc084Schristos EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
73393dcc084Schristos mm_free(g);
73493dcc084Schristos }
73593dcc084Schristos
73693dcc084Schristos int
bufferevent_add_to_rate_limit_group(struct bufferevent * bev,struct bufferevent_rate_limit_group * g)73793dcc084Schristos bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
73893dcc084Schristos struct bufferevent_rate_limit_group *g)
73993dcc084Schristos {
74093dcc084Schristos int wsuspend, rsuspend;
7414109d450Schristos struct bufferevent_private *bevp = BEV_UPCAST(bev);
74293dcc084Schristos BEV_LOCK(bev);
74393dcc084Schristos
74493dcc084Schristos if (!bevp->rate_limiting) {
74593dcc084Schristos struct bufferevent_rate_limit *rlim;
74693dcc084Schristos rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
74793dcc084Schristos if (!rlim) {
74893dcc084Schristos BEV_UNLOCK(bev);
74993dcc084Schristos return -1;
75093dcc084Schristos }
751615c371fSchristos event_assign(&rlim->refill_bucket_event, bev->ev_base,
752615c371fSchristos -1, EV_FINALIZE, bev_refill_callback_, bevp);
75393dcc084Schristos bevp->rate_limiting = rlim;
75493dcc084Schristos }
75593dcc084Schristos
75693dcc084Schristos if (bevp->rate_limiting->group == g) {
75793dcc084Schristos BEV_UNLOCK(bev);
75893dcc084Schristos return 0;
75993dcc084Schristos }
76093dcc084Schristos if (bevp->rate_limiting->group)
76193dcc084Schristos bufferevent_remove_from_rate_limit_group(bev);
76293dcc084Schristos
76393dcc084Schristos LOCK_GROUP(g);
76493dcc084Schristos bevp->rate_limiting->group = g;
76593dcc084Schristos ++g->n_members;
766615c371fSchristos LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
76793dcc084Schristos
76893dcc084Schristos rsuspend = g->read_suspended;
76993dcc084Schristos wsuspend = g->write_suspended;
77093dcc084Schristos
77193dcc084Schristos UNLOCK_GROUP(g);
77293dcc084Schristos
77393dcc084Schristos if (rsuspend)
774615c371fSchristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
77593dcc084Schristos if (wsuspend)
776615c371fSchristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
77793dcc084Schristos
77893dcc084Schristos BEV_UNLOCK(bev);
77993dcc084Schristos return 0;
78093dcc084Schristos }
78193dcc084Schristos
/*
 * Remove 'bev' from its rate-limit group, if any, and lift any group-level
 * read/write suspension on it.  Returns the result of the internal helper.
 */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    unsuspend);
}
78793dcc084Schristos
78893dcc084Schristos int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent * bev,int unsuspend)789615c371fSchristos bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
79093dcc084Schristos int unsuspend)
79193dcc084Schristos {
7924109d450Schristos struct bufferevent_private *bevp = BEV_UPCAST(bev);
79393dcc084Schristos BEV_LOCK(bev);
79493dcc084Schristos if (bevp->rate_limiting && bevp->rate_limiting->group) {
79593dcc084Schristos struct bufferevent_rate_limit_group *g =
79693dcc084Schristos bevp->rate_limiting->group;
79793dcc084Schristos LOCK_GROUP(g);
79893dcc084Schristos bevp->rate_limiting->group = NULL;
79993dcc084Schristos --g->n_members;
800615c371fSchristos LIST_REMOVE(bevp, rate_limiting->next_in_group);
80193dcc084Schristos UNLOCK_GROUP(g);
80293dcc084Schristos }
80393dcc084Schristos if (unsuspend) {
804615c371fSchristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
805615c371fSchristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
80693dcc084Schristos }
80793dcc084Schristos BEV_UNLOCK(bev);
80893dcc084Schristos return 0;
80993dcc084Schristos }
81093dcc084Schristos
81193dcc084Schristos /* ===
81293dcc084Schristos * API functions to expose rate limits.
81393dcc084Schristos *
81493dcc084Schristos * Don't use these from inside Libevent; they're meant to be for use by
81593dcc084Schristos * the program.
81693dcc084Schristos * === */
81793dcc084Schristos
81893dcc084Schristos /* Mostly you don't want to use this function from inside libevent;
819615c371fSchristos * bufferevent_get_read_max_() is more likely what you want*/
82093dcc084Schristos ev_ssize_t
bufferevent_get_read_limit(struct bufferevent * bev)82193dcc084Schristos bufferevent_get_read_limit(struct bufferevent *bev)
82293dcc084Schristos {
82393dcc084Schristos ev_ssize_t r;
82493dcc084Schristos struct bufferevent_private *bevp;
82593dcc084Schristos BEV_LOCK(bev);
82693dcc084Schristos bevp = BEV_UPCAST(bev);
82793dcc084Schristos if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
82893dcc084Schristos bufferevent_update_buckets(bevp);
82993dcc084Schristos r = bevp->rate_limiting->limit.read_limit;
83093dcc084Schristos } else {
83193dcc084Schristos r = EV_SSIZE_MAX;
83293dcc084Schristos }
83393dcc084Schristos BEV_UNLOCK(bev);
83493dcc084Schristos return r;
83593dcc084Schristos }
83693dcc084Schristos
83793dcc084Schristos /* Mostly you don't want to use this function from inside libevent;
838615c371fSchristos * bufferevent_get_write_max_() is more likely what you want*/
83993dcc084Schristos ev_ssize_t
bufferevent_get_write_limit(struct bufferevent * bev)84093dcc084Schristos bufferevent_get_write_limit(struct bufferevent *bev)
84193dcc084Schristos {
84293dcc084Schristos ev_ssize_t r;
84393dcc084Schristos struct bufferevent_private *bevp;
84493dcc084Schristos BEV_LOCK(bev);
84593dcc084Schristos bevp = BEV_UPCAST(bev);
84693dcc084Schristos if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
84793dcc084Schristos bufferevent_update_buckets(bevp);
84893dcc084Schristos r = bevp->rate_limiting->limit.write_limit;
84993dcc084Schristos } else {
85093dcc084Schristos r = EV_SSIZE_MAX;
85193dcc084Schristos }
85293dcc084Schristos BEV_UNLOCK(bev);
85393dcc084Schristos return r;
85493dcc084Schristos }
85593dcc084Schristos
856615c371fSchristos int
bufferevent_set_max_single_read(struct bufferevent * bev,size_t size)857615c371fSchristos bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
858615c371fSchristos {
859615c371fSchristos struct bufferevent_private *bevp;
860615c371fSchristos BEV_LOCK(bev);
861615c371fSchristos bevp = BEV_UPCAST(bev);
862615c371fSchristos if (size == 0 || size > EV_SSIZE_MAX)
863615c371fSchristos bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
864615c371fSchristos else
865615c371fSchristos bevp->max_single_read = size;
866615c371fSchristos BEV_UNLOCK(bev);
867615c371fSchristos return 0;
868615c371fSchristos }
869615c371fSchristos
870615c371fSchristos int
bufferevent_set_max_single_write(struct bufferevent * bev,size_t size)871615c371fSchristos bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
872615c371fSchristos {
873615c371fSchristos struct bufferevent_private *bevp;
874615c371fSchristos BEV_LOCK(bev);
875615c371fSchristos bevp = BEV_UPCAST(bev);
876615c371fSchristos if (size == 0 || size > EV_SSIZE_MAX)
877615c371fSchristos bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
878615c371fSchristos else
879615c371fSchristos bevp->max_single_write = size;
880615c371fSchristos BEV_UNLOCK(bev);
881615c371fSchristos return 0;
882615c371fSchristos }
883615c371fSchristos
884615c371fSchristos ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent * bev)885615c371fSchristos bufferevent_get_max_single_read(struct bufferevent *bev)
886615c371fSchristos {
887615c371fSchristos ev_ssize_t r;
888615c371fSchristos
889615c371fSchristos BEV_LOCK(bev);
890615c371fSchristos r = BEV_UPCAST(bev)->max_single_read;
891615c371fSchristos BEV_UNLOCK(bev);
892615c371fSchristos return r;
893615c371fSchristos }
894615c371fSchristos
895615c371fSchristos ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent * bev)896615c371fSchristos bufferevent_get_max_single_write(struct bufferevent *bev)
897615c371fSchristos {
898615c371fSchristos ev_ssize_t r;
899615c371fSchristos
900615c371fSchristos BEV_LOCK(bev);
901615c371fSchristos r = BEV_UPCAST(bev)->max_single_write;
902615c371fSchristos BEV_UNLOCK(bev);
903615c371fSchristos return r;
904615c371fSchristos }
905615c371fSchristos
90693dcc084Schristos ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent * bev)90793dcc084Schristos bufferevent_get_max_to_read(struct bufferevent *bev)
90893dcc084Schristos {
90993dcc084Schristos ev_ssize_t r;
91093dcc084Schristos BEV_LOCK(bev);
911615c371fSchristos r = bufferevent_get_read_max_(BEV_UPCAST(bev));
91293dcc084Schristos BEV_UNLOCK(bev);
91393dcc084Schristos return r;
91493dcc084Schristos }
91593dcc084Schristos
91693dcc084Schristos ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent * bev)91793dcc084Schristos bufferevent_get_max_to_write(struct bufferevent *bev)
91893dcc084Schristos {
91993dcc084Schristos ev_ssize_t r;
92093dcc084Schristos BEV_LOCK(bev);
921615c371fSchristos r = bufferevent_get_write_max_(BEV_UPCAST(bev));
92293dcc084Schristos BEV_UNLOCK(bev);
92393dcc084Schristos return r;
92493dcc084Schristos }
92593dcc084Schristos
926615c371fSchristos const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent * bev)927615c371fSchristos bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
928615c371fSchristos struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
929615c371fSchristos struct ev_token_bucket_cfg *cfg;
930615c371fSchristos
931615c371fSchristos BEV_LOCK(bev);
932615c371fSchristos
933615c371fSchristos if (bufev_private->rate_limiting) {
934615c371fSchristos cfg = bufev_private->rate_limiting->cfg;
935615c371fSchristos } else {
936615c371fSchristos cfg = NULL;
937615c371fSchristos }
938615c371fSchristos
939615c371fSchristos BEV_UNLOCK(bev);
940615c371fSchristos
941615c371fSchristos return cfg;
942615c371fSchristos }
94393dcc084Schristos
94493dcc084Schristos /* Mostly you don't want to use this function from inside libevent;
945615c371fSchristos * bufferevent_get_read_max_() is more likely what you want*/
94693dcc084Schristos ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(struct bufferevent_rate_limit_group * grp)94793dcc084Schristos bufferevent_rate_limit_group_get_read_limit(
94893dcc084Schristos struct bufferevent_rate_limit_group *grp)
94993dcc084Schristos {
95093dcc084Schristos ev_ssize_t r;
95193dcc084Schristos LOCK_GROUP(grp);
95293dcc084Schristos r = grp->rate_limit.read_limit;
95393dcc084Schristos UNLOCK_GROUP(grp);
95493dcc084Schristos return r;
95593dcc084Schristos }
95693dcc084Schristos
95793dcc084Schristos /* Mostly you don't want to use this function from inside libevent;
958615c371fSchristos * bufferevent_get_write_max_() is more likely what you want. */
95993dcc084Schristos ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(struct bufferevent_rate_limit_group * grp)96093dcc084Schristos bufferevent_rate_limit_group_get_write_limit(
96193dcc084Schristos struct bufferevent_rate_limit_group *grp)
96293dcc084Schristos {
96393dcc084Schristos ev_ssize_t r;
96493dcc084Schristos LOCK_GROUP(grp);
96593dcc084Schristos r = grp->rate_limit.write_limit;
96693dcc084Schristos UNLOCK_GROUP(grp);
96793dcc084Schristos return r;
96893dcc084Schristos }
96993dcc084Schristos
97093dcc084Schristos int
bufferevent_decrement_read_limit(struct bufferevent * bev,ev_ssize_t decr)97193dcc084Schristos bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
97293dcc084Schristos {
97393dcc084Schristos int r = 0;
97493dcc084Schristos ev_ssize_t old_limit, new_limit;
97593dcc084Schristos struct bufferevent_private *bevp;
97693dcc084Schristos BEV_LOCK(bev);
97793dcc084Schristos bevp = BEV_UPCAST(bev);
97893dcc084Schristos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
97993dcc084Schristos old_limit = bevp->rate_limiting->limit.read_limit;
98093dcc084Schristos
98193dcc084Schristos new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
98293dcc084Schristos if (old_limit > 0 && new_limit <= 0) {
983615c371fSchristos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
98493dcc084Schristos if (event_add(&bevp->rate_limiting->refill_bucket_event,
98593dcc084Schristos &bevp->rate_limiting->cfg->tick_timeout) < 0)
98693dcc084Schristos r = -1;
98793dcc084Schristos } else if (old_limit <= 0 && new_limit > 0) {
98893dcc084Schristos if (!(bevp->write_suspended & BEV_SUSPEND_BW))
98993dcc084Schristos event_del(&bevp->rate_limiting->refill_bucket_event);
990615c371fSchristos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
99193dcc084Schristos }
99293dcc084Schristos
99393dcc084Schristos BEV_UNLOCK(bev);
99493dcc084Schristos return r;
99593dcc084Schristos }
99693dcc084Schristos
99793dcc084Schristos int
bufferevent_decrement_write_limit(struct bufferevent * bev,ev_ssize_t decr)99893dcc084Schristos bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
99993dcc084Schristos {
100093dcc084Schristos /* XXXX this is mostly copy-and-paste from
100193dcc084Schristos * bufferevent_decrement_read_limit */
100293dcc084Schristos int r = 0;
100393dcc084Schristos ev_ssize_t old_limit, new_limit;
100493dcc084Schristos struct bufferevent_private *bevp;
100593dcc084Schristos BEV_LOCK(bev);
100693dcc084Schristos bevp = BEV_UPCAST(bev);
100793dcc084Schristos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
100893dcc084Schristos old_limit = bevp->rate_limiting->limit.write_limit;
100993dcc084Schristos
101093dcc084Schristos new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
101193dcc084Schristos if (old_limit > 0 && new_limit <= 0) {
1012615c371fSchristos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
101393dcc084Schristos if (event_add(&bevp->rate_limiting->refill_bucket_event,
101493dcc084Schristos &bevp->rate_limiting->cfg->tick_timeout) < 0)
101593dcc084Schristos r = -1;
101693dcc084Schristos } else if (old_limit <= 0 && new_limit > 0) {
101793dcc084Schristos if (!(bevp->read_suspended & BEV_SUSPEND_BW))
101893dcc084Schristos event_del(&bevp->rate_limiting->refill_bucket_event);
1019615c371fSchristos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
102093dcc084Schristos }
102193dcc084Schristos
102293dcc084Schristos BEV_UNLOCK(bev);
102393dcc084Schristos return r;
102493dcc084Schristos }
102593dcc084Schristos
102693dcc084Schristos int
bufferevent_rate_limit_group_decrement_read(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)102793dcc084Schristos bufferevent_rate_limit_group_decrement_read(
102893dcc084Schristos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
102993dcc084Schristos {
103093dcc084Schristos int r = 0;
103193dcc084Schristos ev_ssize_t old_limit, new_limit;
103293dcc084Schristos LOCK_GROUP(grp);
103393dcc084Schristos old_limit = grp->rate_limit.read_limit;
103493dcc084Schristos new_limit = (grp->rate_limit.read_limit -= decr);
103593dcc084Schristos
103693dcc084Schristos if (old_limit > 0 && new_limit <= 0) {
1037615c371fSchristos bev_group_suspend_reading_(grp);
103893dcc084Schristos } else if (old_limit <= 0 && new_limit > 0) {
1039615c371fSchristos bev_group_unsuspend_reading_(grp);
104093dcc084Schristos }
104193dcc084Schristos
104293dcc084Schristos UNLOCK_GROUP(grp);
104393dcc084Schristos return r;
104493dcc084Schristos }
104593dcc084Schristos
104693dcc084Schristos int
bufferevent_rate_limit_group_decrement_write(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)104793dcc084Schristos bufferevent_rate_limit_group_decrement_write(
104893dcc084Schristos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
104993dcc084Schristos {
105093dcc084Schristos int r = 0;
105193dcc084Schristos ev_ssize_t old_limit, new_limit;
105293dcc084Schristos LOCK_GROUP(grp);
105393dcc084Schristos old_limit = grp->rate_limit.write_limit;
105493dcc084Schristos new_limit = (grp->rate_limit.write_limit -= decr);
105593dcc084Schristos
105693dcc084Schristos if (old_limit > 0 && new_limit <= 0) {
1057615c371fSchristos bev_group_suspend_writing_(grp);
105893dcc084Schristos } else if (old_limit <= 0 && new_limit > 0) {
1059615c371fSchristos bev_group_unsuspend_writing_(grp);
106093dcc084Schristos }
106193dcc084Schristos
106293dcc084Schristos UNLOCK_GROUP(grp);
106393dcc084Schristos return r;
106493dcc084Schristos }
106593dcc084Schristos
106693dcc084Schristos void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group * grp,ev_uint64_t * total_read_out,ev_uint64_t * total_written_out)106793dcc084Schristos bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
106893dcc084Schristos ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
106993dcc084Schristos {
107093dcc084Schristos EVUTIL_ASSERT(grp != NULL);
107193dcc084Schristos if (total_read_out)
107293dcc084Schristos *total_read_out = grp->total_read;
107393dcc084Schristos if (total_written_out)
107493dcc084Schristos *total_written_out = grp->total_written;
107593dcc084Schristos }
107693dcc084Schristos
107793dcc084Schristos void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group * grp)107893dcc084Schristos bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
107993dcc084Schristos {
108093dcc084Schristos grp->total_read = grp->total_written = 0;
108193dcc084Schristos }
1082615c371fSchristos
1083615c371fSchristos int
bufferevent_ratelim_init_(struct bufferevent_private * bev)1084615c371fSchristos bufferevent_ratelim_init_(struct bufferevent_private *bev)
1085615c371fSchristos {
1086615c371fSchristos bev->rate_limiting = NULL;
1087615c371fSchristos bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1088615c371fSchristos bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1089615c371fSchristos
1090615c371fSchristos return 0;
1091615c371fSchristos }
1092