1 /*
2 * Copyright (c) 2007-2012, Vsevolod Stakhov
3 * All rights reserved.
4
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer. Redistributions in binary form
9 * must reproduce the above copyright notice, this list of conditions and the
10 * following disclaimer in the documentation and/or other materials provided with
11 * the distribution. Neither the name of the author nor the names of its
12 * contributors may be used to endorse or promote products derived from this
13 * software without specific prior written permission.
14
15 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
22 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
23 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 #include "config.h"
28 #include "rmilter.h"
29 #include "upstream.h"
30 #include "xxhash.h"
31
/*
 * Debug logging: compiled in only when WITH_DEBUG is defined. Otherwise the
 * macro expands to an empty statement and its arguments are not evaluated.
 * Uses the standard C99 variadic form rather than the GNU "args..." extension;
 * every call site passes at least a format string.
 */
#ifdef WITH_DEBUG
#define msg_debug(...) syslog(LOG_DEBUG, __VA_ARGS__)
#else
#define msg_debug(...) do {} while (0)
#endif

/*
 * Upstream state in this file is guarded by one global read/write lock when
 * built with _THREAD_SAFE. Without it the lock macros are no-ops.
 * Return values of the pthread calls are not checked.
 */
#ifdef _THREAD_SAFE
pthread_rwlock_t upstream_mtx = PTHREAD_RWLOCK_INITIALIZER;
#define U_RLOCK() do { pthread_rwlock_rdlock (&upstream_mtx); } while (0)
#define U_WLOCK() do { pthread_rwlock_wrlock (&upstream_mtx); } while (0)
#define U_UNLOCK() do { pthread_rwlock_unlock (&upstream_mtx); } while (0)
#else
#define U_RLOCK() do {} while (0)
#define U_WLOCK() do {} while (0)
#define U_UNLOCK() do {} while (0)
#endif

/* NOTE(review): not referenced by any code in this file; candidate for removal. */
#define MAX_TRIES 20
50
51 /*
52 * Check upstream parameters and mark it whether valid or dead
53 */
check_upstream(struct upstream * up,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)54 static void check_upstream(struct upstream *up, time_t now,
55 time_t error_timeout, time_t revive_timeout, unsigned int max_errors,
56 const struct mlfi_priv *priv)
57 {
58 if (up->dead) {
59 if (now - up->time >= revive_timeout) {
60 msg_debug("<%s>; check_upstream: reviving upstream after %ld seconds",
61 priv->mlfi_id, (long int ) now - up->time);
62 U_WLOCK ();
63 up->dead = 0;
64 up->errors = 0;
65 up->time = 0;
66 up->weight = up->priority;
67 U_UNLOCK ();
68 }
69 }
70 else {
71 if (now - up->time >= error_timeout && up->errors >= max_errors) {
72 msg_debug(
73 "<%s>; check_upstream: marking upstreams as dead after %ld errors",
74 priv->mlfi_id, (long int ) up->errors);
75 U_WLOCK ();
76 up->dead = 1;
77 up->time = now;
78 up->weight = 0;
79 U_UNLOCK ();
80 }
81 }
82 }
83
84 /*
85 * Call this function after failed upstream request
86 */
upstream_fail(struct upstream * up,time_t now)87 void upstream_fail(struct upstream *up, time_t now)
88 {
89 if (up->time != 0) {
90 up->errors++;
91 }
92 else {
93 U_WLOCK ();
94 up->time = now;
95 up->errors++;
96 U_UNLOCK ();
97 }
98 }
99 /*
100 * Call this function after successful upstream request
101 */
upstream_ok(struct upstream * up,time_t now)102 void upstream_ok(struct upstream *up, time_t now)
103 {
104 if (up->errors != 0) {
105 U_WLOCK ();
106 up->errors = 0;
107 up->time = 0;
108 U_UNLOCK ();
109 }
110
111 up->weight--;
112 }
113 /*
114 * Mark all upstreams as active. This function is used when all upstreams are marked as inactive
115 */
revive_all_upstreams(void * ups,unsigned int members,unsigned int msize,const struct mlfi_priv * priv)116 void revive_all_upstreams(void *ups, unsigned int members, unsigned int msize,
117 const struct mlfi_priv *priv)
118 {
119 unsigned int i;
120 struct upstream *cur;
121 u_char *p;
122
123 U_WLOCK ();
124 msg_debug("<%s>; revive_all_upstreams: starting reviving all upstreams", priv->mlfi_id);
125 p = ups;
126 for (i = 0; i < members; i++) {
127 cur = (struct upstream *) p;
128 cur->time = 0;
129 cur->errors = 0;
130 cur->dead = 0;
131 cur->weight = cur->priority;
132 p += msize;
133 }
134 U_UNLOCK ();
135 }
136
137 /*
138 * Scan all upstreams for errors and mark upstreams dead or alive depends on conditions,
139 * return number of alive upstreams
140 */
rescan_upstreams(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)141 static int rescan_upstreams(void *ups, unsigned int members, unsigned int msize,
142 time_t now, time_t error_timeout, time_t revive_timeout,
143 unsigned int max_errors, const struct mlfi_priv *priv)
144 {
145 unsigned int i, alive;
146 struct upstream *cur;
147 u_char *p;
148
149 /* Recheck all upstreams */
150 p = ups;
151 alive = members;
152 for (i = 0; i < members; i++) {
153 cur = (struct upstream *) p;
154 check_upstream (cur, now, error_timeout, revive_timeout, max_errors, priv);
155 alive--;
156 p += msize;
157 }
158
159 /* All upstreams are dead */
160 if (alive == 0) {
161 revive_all_upstreams (ups, members, msize, priv);
162 alive = members;
163 }
164
165 msg_debug("<%s>; rescan_upstreams: %d upstreams alive", priv->mlfi_id, alive);
166
167 return (int) alive;
168
169 }
170
171 /* Return alive upstream by its number */
172 static struct upstream *
get_upstream_by_number(void * ups,unsigned int members,unsigned int msize,int selected)173 get_upstream_by_number(void *ups, unsigned int members, unsigned int msize,
174 int selected)
175 {
176 int i;
177 u_char *p, *c;
178 struct upstream *cur;
179
180 i = 0;
181 p = ups;
182 c = ups;
183 U_RLOCK ();
184 for (;;) {
185 /* Out of range, return NULL */
186 if (p > c + members * msize) {
187 break;
188 }
189
190 cur = (struct upstream *) p;
191 p += msize;
192
193 if (cur->dead) {
194 /* Skip inactive upstreams */
195 continue;
196 }
197 /* Return selected upstream */
198 if (i == selected) {
199 U_UNLOCK ();
200 return cur;
201 }
202 i++;
203 }
204 U_UNLOCK ();
205
206 /* Error */
207 return NULL;
208
209 }
210
/*
 * Hash `keylen` bytes of `key` with XXH64 using a fixed seed, so a given key
 * always produces the same 64-bit value.
 */
static uint64_t
get_hash_for_key(const unsigned char *key, unsigned int keylen)
{
	const uint64_t seed = 0xdeadbabeULL;
	uint64_t digest;

	digest = XXH64 (key, keylen, seed);

	return digest;
}
216
217 /*
218 * Recheck all upstreams and return random active upstream
219 */
220 struct upstream *
get_random_upstream(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)221 get_random_upstream(void *ups, unsigned int members, unsigned int msize,
222 time_t now, time_t error_timeout, time_t revive_timeout,
223 unsigned int max_errors, const struct mlfi_priv *priv)
224 {
225 int alive, selected;
226
227 alive = rescan_upstreams (ups, members, msize, now, error_timeout,
228 revive_timeout, max_errors, priv);
229 selected = rand () % alive;
230 msg_debug("<%s>; get_random_upstream: return upstream with number %d of %d",
231 priv->mlfi_id, selected, alive);
232
233 return get_upstream_by_number (ups, members, msize, selected);
234 }
235
/*
 * Jump consistent hash: map a 64-bit key to a bucket in [0, nbuckets).
 * When nbuckets grows by one, a key either keeps its bucket or moves to the
 * new bucket (nbuckets - 1). nbuckets must be non-zero; for 0 the loop never
 * runs and (uint32_t) -1 is returned.
 *
 * The key idea of this function is obtained from the following paper:
 * A Fast, Minimal Memory, Consistent Hash Algorithm
 * John Lamping, Eric Veach
 *
 * http://arxiv.org/abs/1406.2294
 */
static uint32_t rmilter_consistent_hash(uint64_t key, uint32_t nbuckets)
{
	int64_t b = -1, j = 0;

	while (j < nbuckets) {
		b = j;
		/*
		 * 64-bit LCG step from the paper: key = key * C + 1.
		 * (`key *= C + 1` would multiply by an even constant instead,
		 * shifting bits out of the state and skewing the jumps.)
		 */
		key = key * 2862933555777941757ULL + 1;
		j = (int64_t) ((double) (b + 1) *
				((double) (1LL << 31) / (double) ((key >> 33) + 1)));
	}

	return (uint32_t) b;
}
255
256 /*
257 * Return upstream by hash, that is calculated from active upstreams number
258 */
259 struct upstream *
get_upstream_by_hash(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const unsigned char * key,unsigned int keylen,const struct mlfi_priv * priv)260 get_upstream_by_hash(void *ups, unsigned int members, unsigned int msize,
261 time_t now, time_t error_timeout, time_t revive_timeout,
262 unsigned int max_errors, const unsigned char *key,
263 unsigned int keylen, const struct mlfi_priv *priv)
264 {
265 int alive, i = 0, sel;
266 uint64_t h = 0;
267 char numbuf[4];
268 u_char *c, *p;
269 struct upstream *cur = NULL;
270
271 alive = rescan_upstreams (ups, members, msize, now, error_timeout,
272 revive_timeout, max_errors, priv);
273
274 if (alive == 0) {
275 return NULL;
276 }
277
278 h = get_hash_for_key (key, keylen);
279 sel = rmilter_consistent_hash (h, alive);
280
281 msg_debug("<%s>; get_upstream_by_hash: try to select upstream number %d of %d",
282 priv->mlfi_id, sel, alive);
283 U_RLOCK ();
284 p = ups;
285 c = ups;
286
287 for (;;) {
288 /* Out of range, return NULL */
289 if (p > c + members * msize) {
290 break;
291 }
292
293 cur = (struct upstream *) p;
294 p += msize;
295
296 if (cur->dead) {
297 /* Skip inactive upstreams */
298 continue;
299 }
300 /* Return selected upstream */
301 if (i == sel) {
302 U_UNLOCK ();
303 return cur;
304 }
305 i++;
306 }
307 U_UNLOCK ();
308
309 return cur;
310 }
311
312 /*
313 * Recheck all upstreams and return upstream in round-robin order according to weight and priority
314 */
315 struct upstream *
get_upstream_round_robin(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)316 get_upstream_round_robin(void *ups, unsigned int members, unsigned int msize,
317 time_t now, time_t error_timeout, time_t revive_timeout,
318 unsigned int max_errors, const struct mlfi_priv *priv)
319 {
320 unsigned int max_weight, i;
321 struct upstream *cur, *selected = NULL;
322 u_char *p;
323
324 /* Recheck all upstreams */
325 rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout,
326 max_errors, priv);
327
328 p = ups;
329 max_weight = 0;
330 selected = (struct upstream *) p;
331 U_RLOCK ();
332 for (i = 0; i < members; i++) {
333 cur = (struct upstream *) p;
334 if (!cur->dead) {
335 if ((int) max_weight < cur->weight) {
336 max_weight = cur->weight;
337 selected = cur;
338 }
339 }
340 p += msize;
341 }
342 U_UNLOCK ();
343
344 if (max_weight == 0) {
345 p = ups;
346 U_WLOCK ();
347 for (i = 0; i < members; i++) {
348 cur = (struct upstream *) p;
349 cur->weight = cur->priority;
350 if (!cur->dead) {
351 if (max_weight < cur->priority) {
352 max_weight = cur->priority;
353 selected = cur;
354 }
355 }
356 p += msize;
357 }
358 U_UNLOCK ();
359 }
360 msg_debug("<%s>; get_upstream_round_robin: selecting upstream with weight %d",
361 priv->mlfi_id, max_weight);
362
363 return selected;
364 }
365
366 /*
367 * Recheck all upstreams and return upstream in round-robin order according to only priority (master-slaves)
368 */
369 struct upstream *
get_upstream_master_slave(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)370 get_upstream_master_slave(void *ups, unsigned int members, unsigned int msize,
371 time_t now, time_t error_timeout, time_t revive_timeout,
372 unsigned int max_errors, const struct mlfi_priv *priv)
373 {
374 unsigned int max_weight, i;
375 struct upstream *cur, *selected = NULL;
376 u_char *p;
377
378 /* Recheck all upstreams */
379 rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout,
380 max_errors, priv);
381
382 p = ups;
383 max_weight = 0;
384 selected = (struct upstream *) p;
385 U_RLOCK ();
386 for (i = 0; i < members; i++) {
387 cur = (struct upstream *) p;
388 if (!cur->dead) {
389 if (max_weight < cur->priority) {
390 max_weight = cur->priority;
391 selected = cur;
392 }
393 }
394 p += msize;
395 }
396 U_UNLOCK ();
397 msg_debug("<%s>; get_upstream_master_slave: selecting upstream with priority %d",
398 priv->mlfi_id, max_weight);
399
400 return selected;
401 }
402
/* Undefine the file-local lock helper macros defined at the top of this file. */
#undef U_RLOCK
#undef U_WLOCK
#undef U_UNLOCK
405