/*	$NetBSD: bufferevent_ratelim.c,v 1.3 2017/01/31 23:17:39 christos Exp $	*/
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"
int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
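
/* Worked example of the overflow guard above (illustrative numbers, not
 * from the original source): with read_rate = 1000, read_maximum = 4000,
 * read_limit = 2500, and n_ticks = 2, the guard computes
 * (4000 - 2500) / 2 = 750 < 1000, so the bucket saturates at
 * read_maximum (4000) without first forming the intermediate sum
 * 2500 + 2 * 1000 = 4500; for very large rates or tick counts, that
 * intermediate sum is what could overflow. */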

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
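
/* Example of the arithmetic above (illustrative values, not from the
 * original source): with tv = { 2, 500000 } and msec_per_tick = 100,
 * msec = 2 * 1000 + 500000 / 1000 = 2500, so the current tick is
 * 2500 / 100 = 25. */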

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
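
/* Usage sketch (hypothetical values; not part of this file): build a
 * configuration that refills 4096 bytes per 100 ms tick in each
 * direction and allows an 8192-byte burst.  Passing a NULL tick_len
 * would give the one-second default above.
 *
 *	struct timeval tick = { 0, 100 * 1000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(4096, 8192, 4096, 8192, &tick);
 *	if (!cfg)
 *		... handle error (invalid rates/bursts, or OOM) ...
 */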

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* Needs lock on bev. */
	ev_ssize_t max_so_far =
	    is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (/*CONSTCOND*/0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
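
/* Worked example (illustrative numbers): suppose max_single_read is the
 * 16384 default, the per-bufferevent read bucket holds 5000 bytes, and
 * the group bucket holds 20000 bytes across 10 members with a min_share
 * of 64.  The bucket limit replaces the 16384 starting point, the group
 * share is max(20000 / 10, 64) = 2000, and CLAMPTO leaves
 * min(5000, 2000) = 2000 as the return value. */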

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (/*CONSTCOND*/0)
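
/* Illustration (not from the original source): with members listed
 * A -> B -> C -> D and a random first = C, the first loop visits C and
 * D, and the second visits A and B, so every member is processed
 * exactly once starting from a random offset. */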

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
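
/* Usage sketch (hypothetical; assumes a bufferevent 'bev' created
 * elsewhere): attach a 32768-byte-per-second limit using the default
 * one-second tick, then detach it by passing NULL.  Since cfg is not
 * reference-counted (see the XXX above), it should stay alive while
 * attached and be freed only after it is detached.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(32768, 32768, 32768, 32768, NULL);
 *	if (cfg && bufferevent_set_rate_limit(bev, cfg) == 0) {
 *		... run the event loop ...
 *		bufferevent_set_rate_limit(bev, NULL);
 *		ev_token_bucket_cfg_free(cfg);
 *	}
 */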

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
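
/* Usage sketch (hypothetical; assumes an event_base 'base'): share one
 * megabyte per second among any number of bufferevents.  The cfg is
 * copied into the group above, so the caller may free it afterwards.
 *
 *	struct ev_token_bucket_cfg *cfg = ev_token_bucket_cfg_new(
 *	    1048576, 1048576, 1048576, 1048576, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	ev_token_bucket_cfg_free(cfg);
 *	... add members with bufferevent_add_to_rate_limit_group() ...
 */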

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let the effective share exceed the one-tick refill.  IOW,
	 * at steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
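
/* Example (illustrative numbers): with read_rate = 100 and
 * write_rate = 200 bytes per tick, set_min_share(g, 1000) records
 * configured_min_share = 1000 but clips the effective min_share to 100,
 * so a single tick's refill always covers at least one member's share. */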

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
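
/* Usage sketch (hypothetical; 'bev' and 'grp' created elsewhere):
 *
 *	if (bufferevent_add_to_rate_limit_group(bev, grp) < 0)
 *		... allocation failed; bev is unchanged ...
 *	...
 *	bufferevent_remove_from_rate_limit_group(bev);
 */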

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}
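
/* Usage sketch (hypothetical): cap each individual read at 4096 bytes
 * while leaving writes at the 16384-byte default; passing 0 (or a value
 * above EV_SSIZE_MAX) restores the default.
 *
 *	bufferevent_set_max_single_read(bev, 4096);
 *	...
 *	bufferevent_set_max_single_read(bev, 0);
 */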

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
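
/* Usage sketch (hypothetical): charge 512 bytes of traffic accounted
 * elsewhere against a bufferevent's write bucket.  The call may suspend
 * writing and schedule a refill if the bucket goes non-positive; -1
 * means the refill event could not be scheduled.
 *
 *	if (bufferevent_decrement_write_limit(bev, 512) < 0)
 *		... refill event could not be added ...
 */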

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}