/* $NetBSD: bufferevent_ratelim.c,v 1.5 2020/05/25 20:47:33 christos Exp $ */

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick. We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
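
/* Illustration only (the numbers are made up, not taken from any caller):
 * with read_rate = 100 tokens per tick, read_maximum = 400, a current
 * read_limit of 250 and n_ticks = 3, the naive sum 250 + 3*100 = 550 would
 * exceed the maximum; since (400 - 250) / 3 = 50 < 100, the clamp branch
 * fires and the bucket is set to 400 without ever computing the
 * possibly-overflowing product.
 */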

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
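
/* For example (illustrative values only): with a tick length of 500 msec,
 * msec_per_tick is 500; at tv = { .tv_sec = 10, .tv_usec = 250000 } the
 * elapsed time is 10*1000 + 250 = 10250 msec, so this returns tick 20.
 */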

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
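
/* A minimal usage sketch (not part of this file; the rate and burst numbers
 * are arbitrary and error handling is omitted).  A caller that wants to cap
 * a bufferevent at roughly 64 KiB per second in each direction, with a
 * 256 KiB burst and the default one-second tick, might do:
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 262144, 65536, 262144, NULL);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * The cfg must not be freed or modified while a bufferevent is using it;
 * release it with ev_token_bucket_cfg_free() once no bufferevent refers
 * to it.
 */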

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)				\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)					\
	do {						\
		if (max_so_far > (x))			\
			max_so_far = (x);		\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)				\
	do {							\
		first = bev_group_random_element_(g);		\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
	} while (0)

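/** Undo bev_group_suspend_reading_: resume reading on every member of
    <b>g</b> whose lock we can acquire, and record in
    g->pending_unsuspend_read whether any member still has to be
    unsuspended on a later pass.  Needs the group lock. */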
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

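/** Undo bev_group_suspend_writing_: resume writing on every member of
    <b>g</b> whose lock we can acquire, and record in
    g->pending_unsuspend_write whether any member still has to be
    unsuspended on a later pass.  Needs the group lock. */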
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
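
/* A minimal usage sketch (illustrative only; the numbers are arbitrary and
 * error handling is omitted).  To share roughly 1 MiB per second of read
 * and write bandwidth among many bufferevents on one event_base:
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1048576, 1048576, 1048576, 1048576, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 *
 * Members are detached with bufferevent_remove_from_rate_limit_group(), and
 * the group itself is released with bufferevent_rate_limit_group_free()
 * once it has no members.
 */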

int
bufferevent_rate_limit_group_set_cfg(
    struct bufferevent_rate_limit_group *g,
    const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
	    &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
    struct bufferevent_rate_limit_group *g,
    size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let the effective min_share exceed what the bucket gains in
	 * one tick; IOW, at steady state, at least one connection can go
	 * per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
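
/* Illustration (made-up numbers): with read_rate = 1000 bytes per tick and
 * 100 group members, the naive per-member share computed in
 * bufferevent_get_rlim_max_() would be 10 bytes; a min_share of 64 raises
 * that to 64 so members are not throttled down to uselessly small reads.
 */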

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want*/
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want*/
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want*/
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
    struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}
