1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "httpd.h"
18 #include "http_config.h"
19 #include "http_log.h"
20 #include "util_filter.h"
21
22 #include "mod_ratelimit.h"
23
24 #define RATE_LIMIT_FILTER_NAME "RATE_LIMIT"
25 #define RATE_INTERVAL_MS (200)
26
27 typedef enum rl_state_e
28 {
29 RATE_LIMIT,
30 RATE_FULLSPEED
31 } rl_state_e;
32
33 typedef struct rl_ctx_t
34 {
35 int speed;
36 int chunk_size;
37 int burst;
38 int do_sleep;
39 rl_state_e state;
40 apr_bucket_brigade *tmpbb;
41 apr_bucket_brigade *holdingbb;
42 } rl_ctx_t;
43
44 #if defined(RLFDEBUG)
brigade_dump(request_rec * r,apr_bucket_brigade * bb)45 static void brigade_dump(request_rec *r, apr_bucket_brigade *bb)
46 {
47 apr_bucket *e;
48 int i = 0;
49
50 for (e = APR_BRIGADE_FIRST(bb);
51 e != APR_BRIGADE_SENTINEL(bb); e = APR_BUCKET_NEXT(e), i++) {
52 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(03193)
53 "brigade: [%d] %s", i, e->type->name);
54
55 }
56 }
57 #endif /* RLFDEBUG */
58
59 static apr_status_t
rate_limit_filter(ap_filter_t * f,apr_bucket_brigade * bb)60 rate_limit_filter(ap_filter_t *f, apr_bucket_brigade *bb)
61 {
62 apr_status_t rv = APR_SUCCESS;
63 rl_ctx_t *ctx = f->ctx;
64 apr_bucket_alloc_t *ba = f->r->connection->bucket_alloc;
65
66 /* Set up our rl_ctx_t on first use */
67 if (ctx == NULL) {
68 const char *rl = NULL;
69 int ratelimit;
70 int burst = 0;
71
72 /* no subrequests. */
73 if (f->r->main != NULL) {
74 ap_remove_output_filter(f);
75 return ap_pass_brigade(f->next, bb);
76 }
77
78 /* Configuration: rate limit */
79 rl = apr_table_get(f->r->subprocess_env, "rate-limit");
80
81 if (rl == NULL) {
82 ap_remove_output_filter(f);
83 return ap_pass_brigade(f->next, bb);
84 }
85
86 /* rl is in kilo bytes / second */
87 ratelimit = atoi(rl) * 1024;
88 if (ratelimit <= 0) {
89 /* remove ourselves */
90 ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, f->r,
91 APLOGNO(03488) "rl: disabling: rate-limit = %s (too high?)", rl);
92 ap_remove_output_filter(f);
93 return ap_pass_brigade(f->next, bb);
94 }
95
96 /* Configuration: optional initial burst */
97 rl = apr_table_get(f->r->subprocess_env, "rate-initial-burst");
98 if (rl != NULL) {
99 burst = atoi(rl) * 1024;
100 if (burst <= 0) {
101 ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, f->r,
102 APLOGNO(03489) "rl: disabling burst: rate-initial-burst = %s (too high?)", rl);
103 burst = 0;
104 }
105 }
106
107 /* Set up our context */
108 ctx = apr_palloc(f->r->pool, sizeof(rl_ctx_t));
109 f->ctx = ctx;
110 ctx->state = RATE_LIMIT;
111 ctx->speed = ratelimit;
112 ctx->burst = burst;
113 ctx->do_sleep = 0;
114
115 /* calculate how many bytes / interval we want to send */
116 /* speed is bytes / second, so, how many (speed / 1000 % interval) */
117 ctx->chunk_size = (ctx->speed / (1000 / RATE_INTERVAL_MS));
118 ctx->tmpbb = apr_brigade_create(f->r->pool, ba);
119 ctx->holdingbb = apr_brigade_create(f->r->pool, ba);
120 }
121 else {
122 APR_BRIGADE_PREPEND(bb, ctx->holdingbb);
123 }
124
125 while (!APR_BRIGADE_EMPTY(bb)) {
126 apr_bucket *e;
127
128 if (ctx->state == RATE_FULLSPEED) {
129 /* Find where we 'stop' going full speed. */
130 for (e = APR_BRIGADE_FIRST(bb);
131 e != APR_BRIGADE_SENTINEL(bb); e = APR_BUCKET_NEXT(e)) {
132 if (AP_RL_BUCKET_IS_END(e)) {
133 apr_brigade_split_ex(bb, e, ctx->holdingbb);
134 ctx->state = RATE_LIMIT;
135 break;
136 }
137 }
138
139 e = apr_bucket_flush_create(ba);
140 APR_BRIGADE_INSERT_TAIL(bb, e);
141 rv = ap_pass_brigade(f->next, bb);
142 apr_brigade_cleanup(bb);
143
144 if (rv != APR_SUCCESS) {
145 ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, f->r, APLOGNO(01455)
146 "rl: full speed brigade pass failed.");
147 return rv;
148 }
149 }
150 else {
151 for (e = APR_BRIGADE_FIRST(bb);
152 e != APR_BRIGADE_SENTINEL(bb); e = APR_BUCKET_NEXT(e)) {
153 if (AP_RL_BUCKET_IS_START(e)) {
154 apr_brigade_split_ex(bb, e, ctx->holdingbb);
155 ctx->state = RATE_FULLSPEED;
156 break;
157 }
158 }
159
160 while (!APR_BRIGADE_EMPTY(bb)) {
161 apr_off_t len = ctx->chunk_size + ctx->burst;
162
163 APR_BRIGADE_CONCAT(ctx->tmpbb, bb);
164
165 /*
166 * Pull next chunk of data; the initial amount is our
167 * burst allotment (if any) plus a chunk. All subsequent
168 * iterations are just chunks with whatever remaining
169 * burst amounts we have left (in case not done in the
170 * first bucket).
171 */
172 rv = apr_brigade_partition(ctx->tmpbb, len, &e);
173 if (rv != APR_SUCCESS && rv != APR_INCOMPLETE) {
174 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, f->r, APLOGNO(01456)
175 "rl: partition failed.");
176 return rv;
177 }
178 /* Send next metadata now if any */
179 while (e != APR_BRIGADE_SENTINEL(ctx->tmpbb)
180 && APR_BUCKET_IS_METADATA(e)) {
181 e = APR_BUCKET_NEXT(e);
182 }
183 if (e != APR_BRIGADE_SENTINEL(ctx->tmpbb)) {
184 apr_brigade_split_ex(ctx->tmpbb, e, bb);
185 }
186 else {
187 apr_brigade_length(ctx->tmpbb, 1, &len);
188 }
189
190 /*
191 * Adjust the burst amount depending on how much
192 * we've done up to now.
193 */
194 if (ctx->burst) {
195 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r,
196 APLOGNO(03485) "rl: burst %d; len %"APR_OFF_T_FMT, ctx->burst, len);
197 if (len < ctx->burst) {
198 ctx->burst -= len;
199 }
200 else {
201 ctx->burst = 0;
202 }
203 }
204
205 e = APR_BRIGADE_LAST(ctx->tmpbb);
206 if (APR_BUCKET_IS_EOS(e)) {
207 ap_remove_output_filter(f);
208 }
209 else if (!APR_BUCKET_IS_FLUSH(e)) {
210 if (APR_BRIGADE_EMPTY(bb)) {
211 /* Wait for more (or next call) */
212 break;
213 }
214 e = apr_bucket_flush_create(ba);
215 APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
216 }
217
218 #if defined(RLFDEBUG)
219 brigade_dump(f->r, ctx->tmpbb);
220 brigade_dump(f->r, bb);
221 #endif /* RLFDEBUG */
222
223 if (ctx->do_sleep) {
224 apr_sleep(RATE_INTERVAL_MS * 1000);
225 }
226 else {
227 ctx->do_sleep = 1;
228 }
229
230 rv = ap_pass_brigade(f->next, ctx->tmpbb);
231 apr_brigade_cleanup(ctx->tmpbb);
232
233 if (rv != APR_SUCCESS) {
234 /* Most often, user disconnects from stream */
235 ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, f->r, APLOGNO(01457)
236 "rl: brigade pass failed.");
237 return rv;
238 }
239 }
240 }
241
242 if (!APR_BRIGADE_EMPTY(ctx->holdingbb)) {
243 /* Any rate-limited data in tmpbb is sent unlimited along
244 * with the rest.
245 */
246 APR_BRIGADE_CONCAT(bb, ctx->tmpbb);
247 APR_BRIGADE_CONCAT(bb, ctx->holdingbb);
248 }
249 }
250
251 #if defined(RLFDEBUG)
252 brigade_dump(f->r, ctx->tmpbb);
253 #endif /* RLFDEBUG */
254
255 /* Save remaining tmpbb with the correct lifetime for the next call */
256 return ap_save_brigade(f, &ctx->holdingbb, &ctx->tmpbb, f->r->pool);
257 }
258
259
260 static apr_status_t
rl_bucket_read(apr_bucket * b,const char ** str,apr_size_t * len,apr_read_type_e block)261 rl_bucket_read(apr_bucket *b, const char **str,
262 apr_size_t *len, apr_read_type_e block)
263 {
264 *str = NULL;
265 *len = 0;
266 return APR_SUCCESS;
267 }
268
269 AP_RL_DECLARE(apr_bucket *)
ap_rl_end_create(apr_bucket_alloc_t * list)270 ap_rl_end_create(apr_bucket_alloc_t *list)
271 {
272 apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
273
274 APR_BUCKET_INIT(b);
275 b->free = apr_bucket_free;
276 b->list = list;
277 b->length = 0;
278 b->start = 0;
279 b->data = NULL;
280 b->type = &ap_rl_bucket_type_end;
281
282 return b;
283 }
284
285 AP_RL_DECLARE(apr_bucket *)
ap_rl_start_create(apr_bucket_alloc_t * list)286 ap_rl_start_create(apr_bucket_alloc_t *list)
287 {
288 apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);
289
290 APR_BUCKET_INIT(b);
291 b->free = apr_bucket_free;
292 b->list = list;
293 b->length = 0;
294 b->start = 0;
295 b->data = NULL;
296 b->type = &ap_rl_bucket_type_start;
297
298 return b;
299 }
300
301
302
303 AP_RL_DECLARE_DATA const apr_bucket_type_t ap_rl_bucket_type_end = {
304 "RL_END", 5, APR_BUCKET_METADATA,
305 apr_bucket_destroy_noop,
306 rl_bucket_read,
307 apr_bucket_setaside_noop,
308 apr_bucket_split_notimpl,
309 apr_bucket_simple_copy
310 };
311
312
313 AP_RL_DECLARE_DATA const apr_bucket_type_t ap_rl_bucket_type_start = {
314 "RL_START", 5, APR_BUCKET_METADATA,
315 apr_bucket_destroy_noop,
316 rl_bucket_read,
317 apr_bucket_setaside_noop,
318 apr_bucket_split_notimpl,
319 apr_bucket_simple_copy
320 };
321
322
323
324
register_hooks(apr_pool_t * p)325 static void register_hooks(apr_pool_t *p)
326 {
327 /* run after mod_deflate etc etc, but not at connection level, ie, mod_ssl. */
328 ap_register_output_filter(RATE_LIMIT_FILTER_NAME, rate_limit_filter,
329 NULL, AP_FTYPE_CONNECTION - 1);
330 }
331
332 AP_DECLARE_MODULE(ratelimit) = {
333 STANDARD20_MODULE_STUFF,
334 NULL, /* create per-directory config structure */
335 NULL, /* merge per-directory config structures */
336 NULL, /* create per-server config structure */
337 NULL, /* merge per-server config structures */
338 NULL, /* command apr_table_t */
339 register_hooks
340 };
341