/*	$NetBSD: bufferevent_ratelim.c,v 1.2 2013/04/11 16:56:41 christos Exp $	*/
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/
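	/* Worked example with made-up numbers: read_maximum = 100,
	 * read_limit = 40, n_ticks = 4, read_rate = 20.  The naive sum would
	 * be 40 + 4*20 = 120, past the maximum; here the headroom check sees
	 * (100 - 40) / 4 = 15 < 20 and clamps straight to 100, never forming
	 * the possibly overflowing product n_ticks * rate. */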

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
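
/* Worked example with made-up numbers: for tv = { .tv_sec = 2,
 * .tv_usec = 500000 } and cfg->msec_per_tick = 100, msec is
 * 2*1000 + 500000/1000 = 2500, giving tick 2500/100 = 25.  Tick numbers
 * may wrap around; ev_token_bucket_update() above copes with that via its
 * unsigned subtraction. */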

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
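
/* Usage sketch (application code, illustrative values): limit a
 * bufferevent "bev" to roughly 32 KiB/s with 64 KiB bursts on the default
 * one-second tick, via bufferevent_set_rate_limit() defined later in this
 * file:
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(32768, 65536, 32768, 65536, NULL);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 */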

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g)	EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g)	EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read if it is false.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)				\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)					\
	do {						\
		if (max_so_far > (x))			\
			max_so_far = (x);		\
	} while (/*CONSTCOND*/0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
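
/* Illustrative combination (made-up numbers): if this bufferevent's own
 * bucket holds 4000 bytes but its group bucket holds 16000 across 8
 * members, the per-member share is 16000/8 = 2000, so the result is
 * MIN(MAX_TO_READ_EVER, 4000, 2000) = 2000.  min_share puts a floor under
 * tiny shares so that members of a large group can still make progress. */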

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
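
/* Hypothetical walk-through: suppose the read bucket holds 1500 tokens and
 * an overshooting read consumes 2000 (negative buckets are expected; see
 * the "started negative" note in _bev_refill_callback() below).  read_limit
 * becomes -500, reading is suspended, and the refill event fires one tick
 * later.  With read_rate = 300, the first refill only reaches -200, so the
 * callback re-arms itself until the bucket is positive again. */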

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	 */
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)				\
	do {							\
		first = _bev_group_random_element(g);		\
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
	} while (/*CONSTCOND*/0)
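
/* Illustration: with members [A, B, C, D] and random starting element C,
 * the first loop runs 'block' for C and D, and the second wraps around for
 * A and B, so each member runs the block exactly once per invocation, from
 * a rotated starting point. */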

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
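
/* If any EVLOCK_TRY_LOCK above fails, 'again' leaves pending_unsuspend_*
 * set, and _bev_group_refill_callback() below retries the unsuspend on the
 * next tick rather than blocking here. */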

/** Callback invoked every tick to add more tokens to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
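
/* Note for callers (derived from the code above): unlike the group API,
 * this function stores 'cfg' by pointer rather than copying it (see the
 * "XXX reference-count cfg" note), so the configuration must outlive the
 * rate limit; freeing it while still installed is unsafe. */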

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
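
/* Usage sketch (application code, illustrative values): cap a set of
 * connections at a shared 1 MiB/s in each direction.  Unlike
 * bufferevent_set_rate_limit(), the group copies the cfg, so the caller
 * may free it afterwards.
 *
 *	struct ev_token_bucket_cfg *cfg = ev_token_bucket_cfg_new(
 *	    1048576, 1048576, 1048576, 1048576, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	ev_token_bucket_cfg_free(cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 */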

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let the effective min_share exceed the one-tick refill rate.
	 * IOW, at steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
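
/* Numeric illustration (hypothetical values): with read_rate = 10000 tokens
 * per tick and 1000 group members, the naive per-member share would be only
 * 10 bytes per tick.  A min_share of 64 lets each member claim 64 instead,
 * and the clamp above keeps min_share no larger than what one tick's refill
 * can cover. */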

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}


/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}