1 /*
2  * Copyright (c) 2016 Justin Zhu
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to
6  * deal in the Software without restriction, including without limitation the
7  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8  * sell copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20  * IN THE SOFTWARE.
21  */
22 #include <stdlib.h>
23 #include "h2o.h"
24 
/* Numerator of the per-second -> per-tick scaling applied to the traffic limit in
 * on_setup_ostream (value 100). Can be overridden at build time. */
#ifndef HUNDRED_MS
#define HUNDRED_MS 100
#endif

/* Denominator of the same scaling (value 1000): the limit is multiplied by HUNDRED_MS and
 * divided by ONE_SECOND. Can be overridden at build time. */
#ifndef ONE_SECOND
#define ONE_SECOND 1000
#endif
32 
/**
 * Per-response state of the throttling output stream.
 */
typedef struct st_throttle_resp_t {
    /* ostream base; must stay the first member because on_send / on_stop cast the
     * h2o_ostream_t pointer they receive straight back to throttle_resp_t */
    h2o_ostream_t super;
    /* timer whose callback (add_token) refills the bucket */
    h2o_timer_t timeout_entry;
    /* current bucket level; signed because real_send subtracts the full size of the saved chunk,
     * which may exceed what is left in the bucket */
    int64_t tokens;
    /* amount added to `tokens` on every timer tick */
    size_t token_inc;
    /* context whose event loop the timer is linked to */
    h2o_context_t *ctx;
    /* request this stream belongs to; passed on to the next ostream when sending */
    h2o_req_t *req;
    /* the most recent chunk received via on_send, kept until it is forwarded */
    struct {
        /* copy of the send vectors handed to on_send */
        H2O_VECTOR(h2o_sendvec_t) bufs;
        /* send state that accompanied those vectors */
        h2o_send_state_t stream_state;
    } state;
} throttle_resp_t;
45 
/**
 * Forwards the chunk saved in `self->state` to the next ostream and charges its total length
 * against the token bucket.
 *
 * Must only be called while the bucket level is positive (asserted). The whole saved chunk is
 * always sent in one go, so the level may become negative; callers in this file only invoke
 * this function again once refills have brought the level back above zero.
 */
static void real_send(throttle_resp_t *self)
{
    size_t i, token_consume = 0;

    /* a really simple token bucket implementation */
    assert(self->tokens > 0);

    for (i = 0; i != self->state.bufs.size; ++i)
        token_consume += self->state.bufs.entries[i].len;

    /* explicit cast: keep the subtraction in the signed domain of `tokens` */
    self->tokens -= (int64_t)token_consume;

    /* Stop the refill timer *before* handing over the last chunk, so that nothing in `self` has
     * to be read or modified after the end of the response has been passed downstream. */
    if (!h2o_send_state_is_in_progress(self->state.stream_state))
        h2o_timer_unlink(&self->timeout_entry);

    h2o_ostream_send_next(&self->super, self->req, self->state.bufs.entries, self->state.bufs.size, self->state.stream_state);
}
64 
/**
 * Timer callback: re-arms the timer for the next tick, adds `token_inc` to the bucket and, if
 * the bucket level is positive afterwards, forwards the saved chunk via real_send.
 *
 * The timer is re-linked before real_send is called because real_send unlinks it again when the
 * saved chunk is the final one.
 */
static void add_token(h2o_timer_t *entry)
{
    throttle_resp_t *self = H2O_STRUCT_FROM_MEMBER(throttle_resp_t, timeout_entry, entry);

    /* tick length must match the HUNDRED_MS factor used to derive token_inc */
    h2o_timer_link(self->ctx->loop, HUNDRED_MS, &self->timeout_entry);
    self->tokens += self->token_inc;

    /* NOTE(review): this forwards whatever is currently stored in self->state, without tracking
     * whether on_send has already forwarded it. Confirm that a fresh on_send always arrives
     * between two sends, otherwise the same buffers could be passed downstream twice. */
    if (self->tokens > 0)
        real_send(self);
}
75 
/**
 * `do_send` callback of the throttling ostream.
 *
 * Stores a copy of the incoming send vectors together with the send state, then forwards them
 * right away if the token bucket is positive. Otherwise the chunk stays stored until the timer
 * callback has refilled the bucket.
 */
static void on_send(h2o_ostream_t *_self, h2o_req_t *req, h2o_sendvec_t *inbufs, size_t inbufcnt, h2o_send_state_t state)
{
    throttle_resp_t *self = (void *)_self;
    size_t idx = 0;

    /* make sure the saved vector list (reserved from the request's pool) can hold every entry */
    h2o_vector_reserve(&req->pool, &self->state.bufs, inbufcnt);

    /* remember what we were asked to send */
    while (idx < inbufcnt) {
        self->state.bufs.entries[idx] = inbufs[idx];
        ++idx;
    }
    self->state.stream_state = state;
    self->state.bufs.size = inbufcnt;

    /* out of tokens: leave the chunk stored for the timer callback */
    if (self->tokens <= 0)
        return;

    real_send(self);
}
94 
/**
 * `stop` callback of the throttling ostream: cancels the refill timer if it is still linked.
 */
static void on_stop(h2o_ostream_t *_self, h2o_req_t *req)
{
    throttle_resp_t *throttle = (void *)_self;

    if (!h2o_timer_is_linked(&throttle->timeout_entry))
        return;

    h2o_timer_unlink(&throttle->timeout_entry);
}
101 
/**
 * Filter hook: installs a throttling ostream for the response when applicable, then continues
 * with the next filter.
 *
 * Throttling is applied only when the response status is 200, the request method is not HEAD,
 * and the response carries an X-Traffic header whose value parses as a size. The value is
 * treated as a per-second limit and converted to a per-tick token increment; the header is
 * removed from the response once consumed.
 *
 * @param self  the filter (unused here)
 * @param req   request whose response is being set up
 * @param slot  where the next ostream is to be attached
 */
static void on_setup_ostream(h2o_filter_t *self, h2o_req_t *req, h2o_ostream_t **slot)
{
    throttle_resp_t *throttle;
    h2o_iovec_t traffic_header_value;
    size_t traffic_limit;
    ssize_t xt_index;
    char *buf;

    if (req->res.status != 200)
        goto Next;
    if (h2o_memis(req->input.method.base, req->input.method.len, H2O_STRLIT("HEAD")))
        goto Next;

    if ((xt_index = h2o_find_header(&req->res.headers, H2O_TOKEN_X_TRAFFIC, -1)) == -1)
        goto Next;

    traffic_header_value = req->res.headers.entries[xt_index].value;
    buf = traffic_header_value.base;

    if (H2O_UNLIKELY((traffic_limit = h2o_strtosizefwd(&buf, traffic_header_value.len)) == SIZE_MAX))
        goto Next;

    throttle = (void *)h2o_add_ostream(req, H2O_ALIGNOF(*throttle), sizeof(*throttle), slot);

    /* calculate the token increment per tick; divide first if the product would wrap size_t */
    if (traffic_limit > SIZE_MAX / HUNDRED_MS)
        throttle->token_inc = traffic_limit / ONE_SECOND * HUNDRED_MS;
    else
        throttle->token_inc = traffic_limit * HUNDRED_MS / ONE_SECOND;
    /* A zero increment would leave the bucket empty forever (nothing would ever be sent) and
     * would force preferred_chunk_size down to 0, so always grant at least one token per tick. */
    if (throttle->token_inc == 0)
        throttle->token_inc = 1;
    if (req->preferred_chunk_size > throttle->token_inc)
        req->preferred_chunk_size = throttle->token_inc;

    h2o_delete_header(&req->res.headers, xt_index);

    throttle->super.do_send = on_send;
    throttle->super.stop = on_stop;
    throttle->ctx = req->conn->ctx;
    throttle->req = req;
    throttle->state.bufs.capacity = 0;
    throttle->state.bufs.size = 0;

    /* start with one tick's worth of tokens so the first chunk can go out immediately */
    throttle->tokens = throttle->token_inc;
    slot = &throttle->super.next;

    /* tick length must match the HUNDRED_MS factor used to derive token_inc above */
    h2o_timer_init(&throttle->timeout_entry, add_token);
    h2o_timer_link(throttle->ctx->loop, HUNDRED_MS, &throttle->timeout_entry);

Next:
    h2o_setup_next_ostream(req, slot);
}
148 
/**
 * Registers the response-throttling filter on the given path configuration.
 *
 * @param pathconf  path configuration the filter is created for
 */
void h2o_throttle_resp_register(h2o_pathconf_t *pathconf)
{
    h2o_filter_t *filter;

    filter = h2o_create_filter(pathconf, sizeof(h2o_filter_t));
    filter->on_setup_ostream = on_setup_ostream;
}
154