/* $NetBSD: bufferevent_ratelim.c,v 1.6 2021/04/10 19:18:45 rillig Exp $ */

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
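
/*
 * Illustrative sketch, not part of the library and not compiled: a worked
 * example of the overflow-safe refill above, using hypothetical values.
 */
#if 0
static void
example_token_bucket_refill(void)
{
	struct ev_token_bucket b;
	struct ev_token_bucket_cfg *c;

	/* 25 tokens per tick, 100-token burst, default 1-second ticks. */
	c = ev_token_bucket_cfg_new(25, 100, 25, 100, NULL);
	if (!c)
		return;
	b.read_limit = b.write_limit = 40;
	b.last_updated = 0;

	/* Two ticks elapse: (100 - 40) / 2 == 30 >= 25, so no clipping;
	 * both limits become 40 + 2 * 25 == 90. */
	ev_token_bucket_update_(&b, c, 2);

	/* Four more ticks: (100 - 90) / 4 == 2 < 25, so the naive sum would
	 * overshoot, and the limits clip to the 100-token maximum instead. */
	ev_token_bucket_update_(&b, c, 6);

	ev_token_bucket_cfg_free(c);
}
#endif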

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
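
/*
 * Illustrative sketch, not compiled: with 100 msec ticks, a cached time of
 * 2.550 seconds gives msec == 2 * 1000 + 550000 / 1000 == 2550, and
 * 2550 / 100 == tick 25.
 */
#if 0
static ev_uint32_t
example_tick_for_100ms(void)
{
	struct timeval tick_len = { 0, 100 * 1000 };	/* 100 msec per tick */
	struct timeval now = { 2, 550 * 1000 };		/* t == 2.550 sec */
	struct ev_token_bucket_cfg *c;
	ev_uint32_t tick;

	c = ev_token_bucket_cfg_new(1000, 1000, 1000, 1000, &tick_len);
	if (!c)
		return 0;
	tick = ev_token_bucket_get_tick_(&now, c);	/* == 25 */
	ev_token_bucket_cfg_free(c);
	return tick;
}
#endif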

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
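
/*
 * Illustrative sketch, not compiled: typical application use of the
 * configuration API above.  The bufferevent and the chosen rates are
 * hypothetical.  Note that the rates are per *tick*, so 4096 bytes per
 * 250 msec tick is roughly 16 KiB/sec.
 */
#if 0
static int
example_limit_bufferevent(struct bufferevent *bev)
{
	struct timeval tick = { 0, 250 * 1000 };
	struct ev_token_bucket_cfg *cfg;

	cfg = ev_token_bucket_cfg_new(4096, 16384, 4096, 16384, &tick);
	if (!cfg)
		return -1;
	if (bufferevent_set_rate_limit(bev, cfg) < 0) {
		ev_token_bucket_cfg_free(cfg);
		return -1;
	}
	/* The cfg is not copied or reference-counted here (see the XXX in
	 * bufferevent_set_rate_limit), so it must outlive the bufferevent's
	 * use of it. */
	return 0;
}
#endif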

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

#define LIM(x) \
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g) \
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x) \
	do { \
		if (max_so_far > (x)) \
			max_so_far = (x); \
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block) \
	do { \
		first = bev_group_random_element_(g); \
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting until the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
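
/*
 * Illustrative sketch, not compiled: sharing one bucket among several
 * bufferevents.  "base", "bev1", and "bev2" are assumed to exist already.
 */
#if 0
static struct bufferevent_rate_limit_group *
example_shared_limit(struct event_base *base,
    struct bufferevent *bev1, struct bufferevent *bev2)
{
	/* All members together get 32 KiB per default 1-second tick. */
	struct ev_token_bucket_cfg *cfg;
	struct bufferevent_rate_limit_group *g;

	cfg = ev_token_bucket_cfg_new(32768, 32768, 32768, 32768, NULL);
	if (!cfg)
		return NULL;
	g = bufferevent_rate_limit_group_new(base, cfg);
	/* The group copies the cfg (see the memcpy above), so it can be
	 * freed right away. */
	ev_token_bucket_cfg_free(cfg);
	if (!g)
		return NULL;
	bufferevent_add_to_rate_limit_group(bev1, g);
	bufferevent_add_to_rate_limit_group(bev2, g);
	return g;
}
#endif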

int
bufferevent_rate_limit_group_set_cfg(
    struct bufferevent_rate_limit_group *g,
    const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
	    &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
    struct bufferevent_rate_limit_group *g,
    size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}
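
/*
 * Illustrative sketch, not compiled: an application checking its remaining
 * allowance before queueing a large write.  "bev" and "wanted" are
 * hypothetical.
 */
#if 0
static int
example_can_write_now(struct bufferevent *bev, size_t wanted)
{
	ev_ssize_t w = bufferevent_get_write_limit(bev);
	/* EV_SSIZE_MAX means no per-bufferevent limit is configured. */
	if (w == EV_SSIZE_MAX)
		return 1;
	return w > 0 && (size_t)w >= wanted;
}
#endif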

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
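
/*
 * Illustrative sketch, not compiled: charging traffic that Libevent did not
 * see (say, bytes sent on a hypothetical side channel) against a
 * bufferevent's write bucket.  A negative decrement would add tokens back.
 */
#if 0
static int
example_charge_side_channel(struct bufferevent *bev, size_t n_sent)
{
	return bufferevent_decrement_write_limit(bev, (ev_ssize_t)n_sent);
}
#endif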

int
bufferevent_rate_limit_group_decrement_read(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}