/*	$NetBSD: bufferevent_ratelim.c,v 1.5 2020/05/25 20:47:33 christos Exp $	*/

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/
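
	/* Worked example (illustrative numbers, not from the code): suppose
	 * read_maximum = 10000, read_limit = 4000, n_ticks = 3, and
	 * read_rate = 1500.  The per-tick headroom is (10000 - 4000) / 3
	 * = 2000, which is >= 1500, so adding 3 * 1500 = 4500 cannot exceed
	 * the maximum.  If read_rate were 2500 instead, 2000 < 2500 tells us
	 * the refill would overshoot, so we saturate at read_maximum.
	 * Because floor(h / n) < r is equivalent to h < n * r for these
	 * integers, the test is exact, and we never compute n_ticks * rate
	 * on a value that could overflow.
	 */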

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */
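
	/* Worked example (illustrative numbers, not from the code): with
	 * 100-msec ticks and tv = { 5, 250000 }, msec works out to
	 * 5 * 1000 + 250000 / 1000 = 5250, giving tick 5250 / 100 = 52.
	 * The same timeval with 1-second ticks gives 5250 / 1000 = 5.
	 */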

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
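
/* Usage sketch (illustrative, not from this file): build a config allowing
 * roughly 25600 bytes/sec in each direction with 100-msec ticks.  The rates
 * are expressed per tick, not per second, and the burst values cap how many
 * bytes the bucket can accumulate.
 *
 *	struct timeval tick = { 0, 100 * 1000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(2560, 5120, 2560, 5120, &tick);
 *	if (!cfg)
 *		return -1;
 */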

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* Needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
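
/* Worked example (illustrative numbers, not from the code): a bufferevent
 * with max_single_read = 16384 and 6000 bytes left in its own read bucket,
 * belonging to a 4-member group whose bucket holds 10000 bytes, gets
 * min(16384, 6000, 10000 / 4) = 2500 bytes.  Had the per-member share
 * fallen below min_share (64 by default), it would be raised back to
 * min_share so that a large group doesn't starve everyone into tiny reads.
 */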

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* Requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
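
/* Illustrative trace (made-up membership, not from the code): if the member
 * list is A -> B -> C -> D and bev_group_random_element_() picks C, the
 * first loop visits C and D (from the random start to the end of the list)
 * and the second loop visits A and B (from the head up to, but not
 * including, the start element), so every member runs 'block' exactly once.
 */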

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
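
/* Usage sketch (illustrative, not from this file; assumes an existing
 * bufferevent 'bev'): apply a per-connection limit of 4096 bytes/sec with
 * an 8192-byte burst, using the default 1-second tick.  The bufferevent
 * keeps a pointer to cfg (it is not copied; see the reference-count XXX
 * above), so cfg must stay alive until the limit is removed.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(4096, 8192, 4096, 8192, NULL);
 *	if (!cfg || bufferevent_set_rate_limit(bev, cfg) < 0)
 *		return -1;
 *
 * And later, to remove the limit:
 *
 *	bufferevent_set_rate_limit(bev, NULL);
 *	ev_token_bucket_cfg_free(cfg);
 */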

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
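
/* Usage sketch (illustrative, not from this file; assumes an event_base
 * 'base' and bufferevents 'bev1' and 'bev2'): cap the aggregate bandwidth
 * of several connections.  Unlike bufferevent_set_rate_limit(), the group
 * copies cfg, so the caller may free it immediately.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 *	ev_token_bucket_cfg_free(cfg);
 */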

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* The effective min_share must not exceed the one-tick rate; this
	 * way, at steady state, at least one connection can make progress
	 * each tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
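
/* Worked example (illustrative numbers, not from the code): with a group
 * read_rate of 1000 bytes/tick and 100 members, the naive per-member share
 * would be only 10 bytes, so the default min_share of 64 raises it to 64;
 * members then effectively take turns rather than all trickling along.  If
 * the configured min_share exceeded the rate itself (say min_share 2000
 * against a rate of 1000), it would be clipped to 1000 here so that at
 * least one member can still proceed each tick.
 */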

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
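
/* Usage sketch (illustrative, not from this file; assumes 'bev' already has
 * a rate limit configured, as the assertions above require): charge 512
 * bytes against the read bucket for traffic that bypassed the normal
 * bufferevent I/O path.  A negative decr credits the bucket instead.
 *
 *	if (bufferevent_decrement_read_limit(bev, 512) < 0)
 *		return -1;
 */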

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}