/*	$NetBSD: bufferevent_ratelim.c,v 1.5 2021/04/10 19:02:37 rillig Exp $	*/

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

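/* Overview: rate limiting here is a token-bucket scheme.  Each rate-limited
 * bufferevent (and, optionally, each group of bufferevents) has a bucket
 * that holds up to cfg->{read,write}_maximum tokens; every tick of
 * cfg->tick_timeout adds cfg->{read,write}_rate tokens, and reading or
 * writing spends them.  Once a bucket reaches zero (or goes negative), the
 * bufferevent is suspended until a refill callback brings the bucket back
 * above zero. */
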
int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

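/** Add the tokens accrued since bucket->last_updated to the bucket, clamping
    each limit at the configured maximum.  Returns 1 if the bucket changed,
    0 if no complete tick has elapsed (or if time seems to have moved
    backwards).
 */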
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

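	/* The guard below is "limit + n_ticks * rate > maximum", rearranged
	 * as "(maximum - limit) / n_ticks < rate" so that the product
	 * n_ticks * rate is never computed in the case where it would
	 * overflow.  (A negative limit only widens the headroom, so the
	 * division stays safe.) */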
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
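	/* For example, with tv = 2.5 sec and msec_per_tick = 100, msec
	 * works out to 2500 and the result is tick 25. */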
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

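/* Example use (a sketch; "bev" stands for some caller-created bufferevent):
 * allow roughly 4096 bytes/sec each way, in bursts of up to 16384 bytes,
 * refilling every 250 msec.  Note that the rates are per *tick*, so 1024
 * bytes per 250-msec tick is 4096 bytes/sec:
 *
 *	struct timeval tick = { 0, 250 * 1000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1024, 16384, 1024, 16384, &tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 */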
struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far =
	    is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (/*CONSTCOND*/0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock an individual
	   bufferevent, that bufferevent will find out later, when it looks
	   at its limit and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.  (We walk from the random starting element to the end of
    the list, then wrap around from the head back to the starting element.)

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

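/* A sketch of typical use, assuming "bev" and "cfg" are the caller's:
 *
 *	bufferevent_set_rate_limit(bev, cfg);	(apply or replace a limit)
 *	...
 *	bufferevent_set_rate_limit(bev, NULL);	(remove the limit again)
 *
 * The cfg is not copied or reference-counted (see the XXX below), so the
 * caller must keep it alive for as long as the limit is set.
 */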
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

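/* Example (a sketch, assuming "base" is the caller's event_base and "bev"
 * an existing bufferevent; a NULL tick_len means one-second ticks, and
 * error checks are omitted): share a single 64 KB/sec bucket among many
 * bufferevents:
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 *
 * Unlike bufferevent_set_rate_limit(), the group copies the cfg into
 * itself, so the cfg may be freed once the group has been created.
 */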
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

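/* Example: with a group read rate of 10000 bytes/tick and 1000 members,
 * the fair share computed in bufferevent_get_rlim_max_() would be just 10
 * bytes; a min_share of 64 lets whichever members run this tick do a
 * usefully sized read instead of dribbling 10 bytes apiece.
 */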
int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* The min_share can't exceed the one-tick refill amount.  IOW, at
	 * steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

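/* Note: passing size == 0 (or anything above EV_SSIZE_MAX) restores the
 * 16384-byte default, e.g. bufferevent_set_max_single_read(bev, 0). */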
int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

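/* Example (a sketch; it requires that a rate limit is currently set on
 * "bev"): charge 512 bytes against the read allowance, as though they had
 * been read from the transport:
 *
 *	bufferevent_decrement_read_limit(bev, 512);
 *
 * A negative decr refunds tokens instead.
 */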
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}