1 /*
2  * Copyright (c) 2007-2012, Vsevolod Stakhov
3  * All rights reserved.
4 
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer. Redistributions in binary form
9  * must reproduce the above copyright notice, this list of conditions and the
10  * following disclaimer in the documentation and/or other materials provided with
11  * the distribution. Neither the name of the author nor the names of its
12  * contributors may be used to endorse or promote products derived from this
13  * software without specific prior written permission.
14 
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
22  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
23  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
24  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #include "config.h"
28 #include "rmilter.h"
29 #include "upstream.h"
30 #include "xxhash.h"
31 
/*
 * Debug logging. When built with WITH_DEBUG, msg_debug() forwards its
 * printf-style arguments to syslog(3) at LOG_DEBUG priority; otherwise it
 * expands to an empty statement and its arguments are not evaluated.
 * Uses the GNU named-variadic macro syntax (`args...` / `##args`).
 */
#ifdef WITH_DEBUG
#define msg_debug(args...) syslog(LOG_DEBUG, ##args)
#else
#define msg_debug(args...) do {} while(0)
#endif

/*
 * Locking for upstream state. In _THREAD_SAFE builds one rwlock,
 * upstream_mtx, guards the upstream arrays handled in this file:
 * U_RLOCK() acquires it shared, U_WLOCK() acquires it exclusive and
 * U_UNLOCK() releases it. The pthread return codes are ignored.
 * Without _THREAD_SAFE all three macros are no-ops.
 */
#ifdef _THREAD_SAFE
pthread_rwlock_t upstream_mtx = PTHREAD_RWLOCK_INITIALIZER;
#define U_RLOCK() do { pthread_rwlock_rdlock (&upstream_mtx); } while (0)
#define U_WLOCK() do { pthread_rwlock_wrlock (&upstream_mtx); } while (0)
#define U_UNLOCK() do { pthread_rwlock_unlock (&upstream_mtx); } while (0)
#else
#define U_RLOCK() do {} while (0)
#define U_WLOCK() do {} while (0)
#define U_UNLOCK() do {} while (0)
#endif

/* NOTE(review): MAX_TRIES is not referenced in this part of the file;
 * presumably used elsewhere — confirm before removing. */
#define MAX_TRIES 20
50 
51 /*
52  * Check upstream parameters and mark it whether valid or dead
53  */
check_upstream(struct upstream * up,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)54 static void check_upstream(struct upstream *up, time_t now,
55 		time_t error_timeout, time_t revive_timeout, unsigned int max_errors,
56 		const struct mlfi_priv *priv)
57 {
58 	if (up->dead) {
59 		if (now - up->time >= revive_timeout) {
60 			msg_debug("<%s>; check_upstream: reviving upstream after %ld seconds",
61 					priv->mlfi_id, (long int ) now - up->time);
62 			U_WLOCK ();
63 			up->dead = 0;
64 			up->errors = 0;
65 			up->time = 0;
66 			up->weight = up->priority;
67 			U_UNLOCK ();
68 		}
69 	}
70 	else {
71 		if (now - up->time >= error_timeout && up->errors >= max_errors) {
72 			msg_debug(
73 					"<%s>; check_upstream: marking upstreams as dead after %ld errors",
74 					priv->mlfi_id, (long int ) up->errors);
75 			U_WLOCK ();
76 			up->dead = 1;
77 			up->time = now;
78 			up->weight = 0;
79 			U_UNLOCK ();
80 		}
81 	}
82 }
83 
84 /*
85  * Call this function after failed upstream request
86  */
upstream_fail(struct upstream * up,time_t now)87 void upstream_fail(struct upstream *up, time_t now)
88 {
89 	if (up->time != 0) {
90 		up->errors++;
91 	}
92 	else {
93 		U_WLOCK ();
94 		up->time = now;
95 		up->errors++;
96 		U_UNLOCK ();
97 	}
98 }
99 /*
100  * Call this function after successful upstream request
101  */
upstream_ok(struct upstream * up,time_t now)102 void upstream_ok(struct upstream *up, time_t now)
103 {
104 	if (up->errors != 0) {
105 		U_WLOCK ();
106 		up->errors = 0;
107 		up->time = 0;
108 		U_UNLOCK ();
109 	}
110 
111 	up->weight--;
112 }
113 /*
114  * Mark all upstreams as active. This function is used when all upstreams are marked as inactive
115  */
revive_all_upstreams(void * ups,unsigned int members,unsigned int msize,const struct mlfi_priv * priv)116 void revive_all_upstreams(void *ups, unsigned int members, unsigned int msize,
117 		const struct mlfi_priv *priv)
118 {
119 	unsigned int i;
120 	struct upstream *cur;
121 	u_char *p;
122 
123 	U_WLOCK ();
124 	msg_debug("<%s>; revive_all_upstreams: starting reviving all upstreams", priv->mlfi_id);
125 	p = ups;
126 	for (i = 0; i < members; i++) {
127 		cur = (struct upstream *) p;
128 		cur->time = 0;
129 		cur->errors = 0;
130 		cur->dead = 0;
131 		cur->weight = cur->priority;
132 		p += msize;
133 	}
134 	U_UNLOCK ();
135 }
136 
137 /*
138  * Scan all upstreams for errors and mark upstreams dead or alive depends on conditions,
139  * return number of alive upstreams
140  */
rescan_upstreams(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)141 static int rescan_upstreams(void *ups, unsigned int members, unsigned int msize,
142 		time_t now, time_t error_timeout, time_t revive_timeout,
143 		unsigned int max_errors, const struct mlfi_priv *priv)
144 {
145 	unsigned int i, alive;
146 	struct upstream *cur;
147 	u_char *p;
148 
149 	/* Recheck all upstreams */
150 	p = ups;
151 	alive = members;
152 	for (i = 0; i < members; i++) {
153 		cur = (struct upstream *) p;
154 		check_upstream (cur, now, error_timeout, revive_timeout, max_errors, priv);
155 		alive--;
156 		p += msize;
157 	}
158 
159 	/* All upstreams are dead */
160 	if (alive == 0) {
161 		revive_all_upstreams (ups, members, msize, priv);
162 		alive = members;
163 	}
164 
165 	msg_debug("<%s>; rescan_upstreams: %d upstreams alive", priv->mlfi_id, alive);
166 
167 	return (int) alive;
168 
169 }
170 
171 /* Return alive upstream by its number */
172 static struct upstream *
get_upstream_by_number(void * ups,unsigned int members,unsigned int msize,int selected)173 get_upstream_by_number(void *ups, unsigned int members, unsigned int msize,
174 		int selected)
175 {
176 	int i;
177 	u_char *p, *c;
178 	struct upstream *cur;
179 
180 	i = 0;
181 	p = ups;
182 	c = ups;
183 	U_RLOCK ();
184 	for (;;) {
185 		/* Out of range, return NULL */
186 		if (p > c + members * msize) {
187 			break;
188 		}
189 
190 		cur = (struct upstream *) p;
191 		p += msize;
192 
193 		if (cur->dead) {
194 			/* Skip inactive upstreams */
195 			continue;
196 		}
197 		/* Return selected upstream */
198 		if (i == selected) {
199 			U_UNLOCK ();
200 			return cur;
201 		}
202 		i++;
203 	}
204 	U_UNLOCK ();
205 
206 	/* Error */
207 	return NULL;
208 
209 }
210 
/*
 * Hash `keylen` bytes of `key` with 64-bit xxHash using this module's
 * fixed seed 0xdeadbabe, so the same key always yields the same value.
 */
static uint64_t
get_hash_for_key(const unsigned char *key, unsigned int keylen)
{
	const unsigned long long seed = 0xdeadbabe;
	uint64_t digest;

	digest = XXH64 (key, keylen, seed);

	return digest;
}
216 
217 /*
218  * Recheck all upstreams and return random active upstream
219  */
220 struct upstream *
get_random_upstream(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)221 get_random_upstream(void *ups, unsigned int members, unsigned int msize,
222 		time_t now, time_t error_timeout, time_t revive_timeout,
223 		unsigned int max_errors, const struct mlfi_priv *priv)
224 {
225 	int alive, selected;
226 
227 	alive = rescan_upstreams (ups, members, msize, now, error_timeout,
228 			revive_timeout, max_errors, priv);
229 	selected = rand () % alive;
230 	msg_debug("<%s>; get_random_upstream: return upstream with number %d of %d",
231 			priv->mlfi_id, selected, alive);
232 
233 	return get_upstream_by_number (ups, members, msize, selected);
234 }
235 
236 /*
237  * The key idea of this function is obtained from the following paper:
238  * A Fast, Minimal Memory, Consistent Hash Algorithm
239  * John Lamping, Eric Veach
240  *
241  * http://arxiv.org/abs/1406.2294
242  */
rmilter_consistent_hash(uint64_t key,uint32_t nbuckets)243 static uint32_t rmilter_consistent_hash(uint64_t key, uint32_t nbuckets)
244 {
245 	int64_t b = -1, j = 0;
246 
247 	while (j < nbuckets) {
248 		b = j;
249 		key *= 2862933555777941757ULL + 1;
250 		j = (b + 1) * (double) (1ULL << 31) / (double) ((key >> 33) + 1ULL);
251 	}
252 
253 	return b;
254 }
255 
256 /*
257  * Return upstream by hash, that is calculated from active upstreams number
258  */
259 struct upstream *
get_upstream_by_hash(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const unsigned char * key,unsigned int keylen,const struct mlfi_priv * priv)260 get_upstream_by_hash(void *ups, unsigned int members, unsigned int msize,
261 		time_t now, time_t error_timeout, time_t revive_timeout,
262 		unsigned int max_errors, const unsigned char *key,
263 		unsigned int keylen, const struct mlfi_priv *priv)
264 {
265 	int alive, i = 0, sel;
266 	uint64_t h = 0;
267 	char numbuf[4];
268 	u_char *c, *p;
269 	struct upstream *cur = NULL;
270 
271 	alive = rescan_upstreams (ups, members, msize, now, error_timeout,
272 			revive_timeout, max_errors, priv);
273 
274 	if (alive == 0) {
275 		return NULL;
276 	}
277 
278 	h = get_hash_for_key (key, keylen);
279 	sel = rmilter_consistent_hash (h, alive);
280 
281 	msg_debug("<%s>; get_upstream_by_hash: try to select upstream number %d of %d",
282 			priv->mlfi_id, sel, alive);
283 	U_RLOCK ();
284 	p = ups;
285 	c = ups;
286 
287 	for (;;) {
288 		/* Out of range, return NULL */
289 		if (p > c + members * msize) {
290 			break;
291 		}
292 
293 		cur = (struct upstream *) p;
294 		p += msize;
295 
296 		if (cur->dead) {
297 			/* Skip inactive upstreams */
298 			continue;
299 		}
300 		/* Return selected upstream */
301 		if (i == sel) {
302 			U_UNLOCK ();
303 			return cur;
304 		}
305 		i++;
306 	}
307 	U_UNLOCK ();
308 
309 	return cur;
310 }
311 
312 /*
313  * Recheck all upstreams and return upstream in round-robin order according to weight and priority
314  */
315 struct upstream *
get_upstream_round_robin(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)316 get_upstream_round_robin(void *ups, unsigned int members, unsigned int msize,
317 		time_t now, time_t error_timeout, time_t revive_timeout,
318 		unsigned int max_errors, const struct mlfi_priv *priv)
319 {
320 	unsigned int max_weight, i;
321 	struct upstream *cur, *selected = NULL;
322 	u_char *p;
323 
324 	/* Recheck all upstreams */
325 	rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout,
326 			max_errors, priv);
327 
328 	p = ups;
329 	max_weight = 0;
330 	selected = (struct upstream *) p;
331 	U_RLOCK ();
332 	for (i = 0; i < members; i++) {
333 		cur = (struct upstream *) p;
334 		if (!cur->dead) {
335 			if ((int) max_weight < cur->weight) {
336 				max_weight = cur->weight;
337 				selected = cur;
338 			}
339 		}
340 		p += msize;
341 	}
342 	U_UNLOCK ();
343 
344 	if (max_weight == 0) {
345 		p = ups;
346 		U_WLOCK ();
347 		for (i = 0; i < members; i++) {
348 			cur = (struct upstream *) p;
349 			cur->weight = cur->priority;
350 			if (!cur->dead) {
351 				if (max_weight < cur->priority) {
352 					max_weight = cur->priority;
353 					selected = cur;
354 				}
355 			}
356 			p += msize;
357 		}
358 		U_UNLOCK ();
359 	}
360 	msg_debug("<%s>; get_upstream_round_robin: selecting upstream with weight %d",
361 			priv->mlfi_id, max_weight);
362 
363 	return selected;
364 }
365 
366 /*
367  * Recheck all upstreams and return upstream in round-robin order according to only priority (master-slaves)
368  */
369 struct upstream *
get_upstream_master_slave(void * ups,unsigned int members,unsigned int msize,time_t now,time_t error_timeout,time_t revive_timeout,unsigned int max_errors,const struct mlfi_priv * priv)370 get_upstream_master_slave(void *ups, unsigned int members, unsigned int msize,
371 		time_t now, time_t error_timeout, time_t revive_timeout,
372 		unsigned int max_errors, const struct mlfi_priv *priv)
373 {
374 	unsigned int max_weight, i;
375 	struct upstream *cur, *selected = NULL;
376 	u_char *p;
377 
378 	/* Recheck all upstreams */
379 	rescan_upstreams (ups, members, msize, now, error_timeout, revive_timeout,
380 			max_errors, priv);
381 
382 	p = ups;
383 	max_weight = 0;
384 	selected = (struct upstream *) p;
385 	U_RLOCK ();
386 	for (i = 0; i < members; i++) {
387 		cur = (struct upstream *) p;
388 		if (!cur->dead) {
389 			if (max_weight < cur->priority) {
390 				max_weight = cur->priority;
391 				selected = cur;
392 			}
393 		}
394 		p += msize;
395 	}
396 	U_UNLOCK ();
397 	msg_debug("<%s>; get_upstream_master_slave: selecting upstream with priority %d",
398 			priv->mlfi_id, max_weight);
399 
400 	return selected;
401 }
402 
/* Drop the file-local lock helpers (U_LOCK never existed; the real names
 * are U_RLOCK and U_WLOCK) */
#undef U_RLOCK
#undef U_WLOCK
#undef U_UNLOCK
405