/*	$NetBSD: bufferevent_ratelim.c,v 1.5 2020/05/25 20:47:33 christos Exp $	*/

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
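
/* Worked example of the overflow-safe refill above (illustrative numbers,
 * not from the original source): with read_maximum = 100, read_limit = 40,
 * read_rate = 25 and n_ticks = 3, the check computes (100 - 40) / 3 = 20,
 * which is less than 25, so the bucket is clamped to read_maximum instead
 * of evaluating 40 + 3 * 25 = 115.  The division form reaches the same
 * clamping decision as the naive "add, then clamp" version, but it never
 * forms the product n_ticks * rate in the case where that product could
 * overflow the limit's type. */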

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
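
/* Illustrative example (numbers are assumptions, not from the original
 * source): with msec_per_tick = 500 and tv = { 3, 250000 }, msec is
 * 3 * 1000 + 250000 / 1000 = 3250, so this returns 3250 / 500 = 6. */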

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
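
/* Illustrative usage sketch (not part of the original source; assumes a
 * connected bufferevent 'bev'): allow roughly 256 KiB/s sustained with
 * 1 MiB bursts in each direction, using the default one-second tick.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(256*1024, 1024*1024,
 *		256*1024, 1024*1024, NULL);
 *	if (cfg)
 *	    bufferevent_set_rate_limit(bev, cfg);
 */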

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read otherwise.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
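
/* Illustrative usage sketch (not part of the original source; assumes an
 * event_base 'base', a cfg from ev_token_bucket_cfg_new(), and a bufferevent
 * 'bev'): make several connections share one bucket.
 *
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	if (grp)
 *	    bufferevent_add_to_rate_limit_group(bev, grp);
 */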

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}