1 /*
2  * Copyright (c) 2016, Vsevolod Stakhov
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *	 * Redistributions of source code must retain the above copyright
8  *	   notice, this list of conditions and the following disclaimer.
9  *	 * Redistributions in binary form must reproduce the above copyright
10  *	   notice, this list of conditions and the following disclaimer in the
11  *	   documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
14  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
16  * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
17  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
18  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
19  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
20  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
21  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
22  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23  */
24 
25 #include "config.h"
26 #ifdef WITH_MEMCACHED
27 #include "libmemcached/memcached.h"
28 #endif
29 #include "cfg_file.h"
30 #include "cache.h"
31 #include "hiredis.h"
32 #include "rmilter.h"
33 #include "upstream.h"
34 #include "util.h"
35 #include <assert.h>
36 
37 #define DEFAULT_REDIS_PORT 6379
38 
39 #ifdef WITH_MEMCACHED
compat_memcached_success(int rc)40 static inline bool compat_memcached_success(int rc)
41 {
42 	return (rc == MEMCACHED_BUFFERED ||
43 			rc == MEMCACHED_DELETED ||
44 			rc == MEMCACHED_END ||
45 			rc == MEMCACHED_ITEM ||
46 			rc == MEMCACHED_STAT ||
47 			rc == MEMCACHED_STORED ||
48 			rc == MEMCACHED_SUCCESS ||
49 			rc == MEMCACHED_VALUE);
50 }
51 
compat_memcached_fatal(int rc)52 static inline bool compat_memcached_fatal(int rc)
53 {
54 	return (
55 			rc != MEMCACHED_BUFFERED &&
56 			rc != MEMCACHED_DATA_EXISTS &&
57 			rc != MEMCACHED_DELETED &&
58 			rc != MEMCACHED_END &&
59 			rc != MEMCACHED_ITEM &&
60 			rc != MEMCACHED_NOTFOUND &&
61 			rc != MEMCACHED_NOTSTORED &&
62 			rc != MEMCACHED_STAT &&
63 			rc != MEMCACHED_STORED &&
64 			rc != MEMCACHED_SUCCESS &&
65 			rc != MEMCACHED_VALUE);
66 }
67 #endif
68 
69 static struct cache_server *
rmilter_get_server(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,struct mlfi_priv * priv)70 rmilter_get_server (struct config_file *cfg, enum rmilter_query_type type,
71 		const unsigned char *key, size_t keylen, struct mlfi_priv *priv)
72 {
73 	struct cache_server *serv = NULL;
74 	void *ptr = NULL;
75 	unsigned mlen = 0;
76 
77 	switch (type) {
78 	case RMILTER_QUERY_GREYLIST:
79 		if (cfg->cache_servers_grey_num > 0) {
80 			ptr = cfg->cache_servers_grey;
81 			mlen = cfg->cache_servers_grey_num;
82 		}
83 		break;
84 	case RMILTER_QUERY_WHITELIST:
85 		if (cfg->cache_servers_white_num > 0) {
86 			ptr = cfg->cache_servers_white;
87 			mlen = cfg->cache_servers_white_num;
88 		}
89 		else if (cfg->cache_servers_grey_num > 0) {
90 			ptr = cfg->cache_servers_grey;
91 			mlen = cfg->cache_servers_grey_num;
92 		}
93 		break;
94 	case RMILTER_QUERY_RATELIMIT:
95 		if (cfg->cache_servers_limits_num > 0) {
96 			ptr = cfg->cache_servers_limits;
97 			mlen = cfg->cache_servers_limits_num;
98 		}
99 		break;
100 	case RMILTER_QUERY_ID:
101 		if (cfg->cache_servers_id_num > 0) {
102 			ptr = cfg->cache_servers_id;
103 			mlen = cfg->cache_servers_id_num;
104 		}
105 		break;
106 	}
107 
108 	if (ptr) {
109 		serv = (struct cache_server *)get_upstream_by_hash (ptr, mlen,
110 				sizeof (*serv), time (NULL),
111 				cfg->cache_error_time, cfg->cache_dead_time,
112 				cfg->cache_maxerrors, key, keylen, priv);
113 	}
114 
115 	return serv;
116 }
117 
118 static struct cache_server *
rmilter_get_publish_server(struct config_file * cfg,enum rmilter_publish_type type,struct mlfi_priv * priv)119 rmilter_get_publish_server (struct config_file *cfg, enum rmilter_publish_type type,
120 		struct mlfi_priv *priv)
121 {
122 	struct cache_server *serv = NULL;
123 	void *ptr = NULL;
124 	unsigned mlen = 0;
125 
126 	switch (type) {
127 	case RMILTER_PUBLISH_COPY:
128 		if (cfg->cache_servers_copy_num > 0) {
129 			ptr = cfg->cache_servers_copy;
130 			mlen = cfg->cache_servers_copy_num;
131 		}
132 		break;
133 	case RMILTER_PUBLISH_SPAM:
134 		if (cfg->cache_servers_spam_num > 0) {
135 			ptr = cfg->cache_servers_spam;
136 			mlen = cfg->cache_servers_spam_num;
137 		}
138 		break;
139 	}
140 
141 	if (ptr) {
142 		serv = (struct cache_server *)get_upstream_master_slave (ptr, mlen,
143 				sizeof (*serv), time (NULL),
144 				cfg->cache_error_time, cfg->cache_dead_time,
145 				cfg->cache_maxerrors, priv);
146 	}
147 
148 	return serv;
149 }
150 
151 #ifdef WITH_MEMCACHED
152 static void
rmilter_format_libmemcached_config(struct config_file * cfg,struct cache_server * serv,memcached_st * ctx)153 rmilter_format_libmemcached_config (struct config_file *cfg,
154 		struct cache_server *serv,
155 		memcached_st *ctx)
156 {
157 	if (serv->addr[0] == '/' || serv->addr[0] == '.') {
158 		/* Assume unix socket */
159 		memcached_server_add_unix_socket (ctx, serv->addr);
160 	}
161 	else {
162 		memcached_server_add (ctx, serv->addr, serv->port);
163 	}
164 
165 	memcached_behavior_set (ctx, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT,
166 			cfg->cache_connect_timeout);
167 	memcached_behavior_set (ctx, MEMCACHED_BEHAVIOR_POLL_TIMEOUT,
168 				cfg->cache_connect_timeout);
169 }
170 #endif
171 
172 bool
rmilter_query_cache(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,unsigned char ** data,size_t * datalen,struct mlfi_priv * priv)173 rmilter_query_cache (struct config_file *cfg, enum rmilter_query_type type,
174 		const unsigned char *key, size_t keylen,
175 		unsigned char **data, size_t *datalen, struct mlfi_priv *priv)
176 {
177 	struct cache_server *serv;
178 	redisContext *redis;
179 	redisReply *r = NULL;
180 	struct timeval tv;
181 	bool ret = false;
182 	size_t nelems = 1;
183 	int rep;
184 
185 	serv = rmilter_get_server (cfg, type, key, keylen, priv);
186 
187 	if (serv) {
188 		if (cfg->cache_use_redis) {
189 			/* Special workaround */
190 			if (serv->port == DEFAULT_MEMCACHED_PORT) {
191 				serv->port = DEFAULT_REDIS_PORT;
192 			}
193 
194 			msec_to_tv (cfg->cache_connect_timeout, &tv);
195 			redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
196 
197 			if (redis == NULL || redis->err != 0) {
198 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
199 					(int)serv->port, redis ? redis->errstr : "unknown error");
200 				upstream_fail (&serv->up, time (NULL));
201 
202 				if (redis) {
203 					redisFree (redis);
204 				}
205 
206 				return false;
207 			}
208 			else {
209 				rep = 1;
210 				if (cfg->cache_password) {
211 					redisAppendCommand (redis, "AUTH %s",
212 							cfg->cache_password);
213 					rep ++;
214 				}
215 				if (cfg->cache_dbname) {
216 					redisAppendCommand (redis, "SELECT %s",
217 							cfg->cache_dbname);
218 					rep ++;
219 				}
220 
221 				redisAppendCommand (redis, "GET %b", key, keylen);
222 
223 				while (rep > 0) {
224 					redisGetReply (redis, (void **)&r);
225 
226 					/* Ignore all replies but the last one */
227 					if (r != NULL && rep != 1) {
228 						freeReplyObject (r);
229 						r = NULL;
230 					}
231 					rep --;
232 				}
233 
234 				if (r != NULL) {
235 					if (r->type == REDIS_REPLY_STRING && r->len > 0) {
236 						*data = malloc (r->len);
237 						if (*data) {
238 							memcpy (*data, r->str, r->len);
239 							ret = true;
240 							if (*datalen) {
241 								*datalen = r->len;
242 							}
243 						}
244 					}
245 
246 					freeReplyObject (r);
247 				}
248 
249 				redisFree (redis);
250 				upstream_ok (&serv->up, time (NULL));
251 			}
252 		}
253 		else {
254 #ifdef WITH_MEMCACHED
255 			char *kval;
256 			size_t value_len = 0;
257 			uint32_t mflags;
258 			int mret;
259 			memcached_st *mctx;
260 
261 			mctx = memcached_create (NULL);
262 			rmilter_format_libmemcached_config (cfg, serv, mctx);
263 
264 			if (mctx == NULL) {
265 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
266 					(int)serv->port, strerror (errno));
267 				upstream_fail (&serv->up, time (NULL));
268 
269 				return false;
270 			}
271 
272 			kval = memcached_get (mctx, key, keylen,
273 					&value_len, &mflags, &mret);
274 
275 			if (!compat_memcached_success (mret)) {
276 				if (kval) {
277 					free (kval);
278 				}
279 				*datalen = 0;
280 
281 				if (compat_memcached_fatal (mret)) {
282 					msg_err ("<%s>; cannot get key on %s:%d: %s", priv->mlfi_id, serv->addr,
283 						(int)serv->port, memcached_strerror (mctx, mret));
284 					upstream_fail (&serv->up, time (NULL));
285 				}
286 				else {
287 					upstream_ok (&serv->up, time (NULL));
288 				}
289 			}
290 			else {
291 				*data = kval;
292 				*datalen = value_len;
293 				upstream_ok (&serv->up, time (NULL));
294 				ret = true;
295 			}
296 
297 			memcached_free (mctx);
298 #else
299 			msg_err ("<%s>; memcached query requested when memcached support is"
300 					" not compiled", priv->mlfi_id);
301 #endif
302 		}
303 	}
304 
305 	return ret;
306 }
307 
308 bool
rmilter_set_cache(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,const unsigned char * data,size_t datalen,unsigned expire,struct mlfi_priv * priv)309 rmilter_set_cache (struct config_file *cfg, enum rmilter_query_type type ,
310 		const unsigned char *key, size_t keylen,
311 		const unsigned char *data, size_t datalen,
312 		unsigned expire, struct mlfi_priv *priv)
313 {
314 	struct cache_server *serv;
315 	redisContext *redis;
316 	redisReply *r = NULL;
317 	struct timeval tv;
318 	size_t nelems = 1;
319 	int rep;
320 
321 	serv = rmilter_get_server (cfg, type, key, keylen, priv);
322 
323 	if (serv) {
324 		if (cfg->cache_use_redis) {
325 			if (serv->port == DEFAULT_MEMCACHED_PORT) {
326 				serv->port = DEFAULT_REDIS_PORT;
327 			}
328 
329 			msec_to_tv (cfg->cache_connect_timeout, &tv);
330 			redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
331 
332 			if (redis == NULL || redis->err != 0) {
333 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
334 					(int)serv->port, redis ? redis->errstr : "unknown error");
335 				upstream_fail (&serv->up, time (NULL));
336 
337 				if (redis) {
338 					redisFree (redis);
339 				}
340 
341 				return false;
342 			}
343 			else {
344 				rep = 1;
345 				if (cfg->cache_password) {
346 					redisAppendCommand (redis, "AUTH %s",
347 							cfg->cache_password);
348 					rep ++;
349 				}
350 				if (cfg->cache_dbname) {
351 					redisAppendCommand (redis, "SELECT %s",
352 							cfg->cache_dbname);
353 					rep ++;
354 				}
355 
356 				if (expire > 0) {
357 					redisAppendCommand (redis, "SETEX %b %d %b", key, keylen,
358 							expire, data, datalen);
359 				}
360 				else {
361 					redisAppendCommand (redis, "SET %b %b", key, keylen,
362 							data, datalen);
363 				}
364 
365 				while (rep > 0) {
366 					redisGetReply (redis, (void **)&r);
367 
368 					/* Ignore all replies but the last one */
369 					if (r != NULL && rep != 1) {
370 						freeReplyObject (r);
371 						r = NULL;
372 					}
373 					rep --;
374 				}
375 
376 				if (r != NULL) {
377 					freeReplyObject (r);
378 				}
379 
380 				redisFree (redis);
381 				upstream_ok (&serv->up, time (NULL));
382 			}
383 		}
384 		else {
385 #ifdef WITH_MEMCACHED
386 			char *kval;
387 			size_t value_len = 0;
388 			uint32_t mflags;
389 			int mret;
390 			memcached_st *mctx;
391 
392 			mctx = memcached_create (NULL);
393 			rmilter_format_libmemcached_config (cfg, serv, mctx);
394 
395 			if (mctx == NULL) {
396 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
397 					(int)serv->port, strerror (errno));
398 				upstream_fail (&serv->up, time (NULL));
399 
400 				return false;
401 			}
402 
403 			mret = memcached_set (mctx, key, keylen, data, datalen,
404 					expire, 0);
405 
406 			if (!compat_memcached_success (mret)) {
407 				msg_err ("<%s>; cannot set key on %s:%d: %s", priv->mlfi_id, serv->addr,
408 					    (int)serv->port, memcached_strerror (mctx, mret));
409 				upstream_fail (&serv->up, time (NULL));
410 				memcached_free (mctx);
411 
412 				return false;
413 			}
414 			else {
415 				upstream_ok (&serv->up, time (NULL));
416 			}
417 
418 			memcached_free (mctx);
419 #else
420 		msg_err ("<%s>; memcached query requested when memcached support is"
421 				" not compiled", priv->mlfi_id);
422 #endif
423 		}
424 	}
425 
426 	return true;
427 }
428 
429 bool
rmilter_delete_cache(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,struct mlfi_priv * priv)430 rmilter_delete_cache (struct config_file *cfg, enum rmilter_query_type type ,
431 		const unsigned char *key, size_t keylen, struct mlfi_priv *priv)
432 {
433 	struct cache_server *serv;
434 	redisContext *redis;
435 	redisReply *r = NULL;
436 	struct timeval tv;
437 	size_t nelems = 1;
438 	int rep;
439 
440 	serv = rmilter_get_server (cfg, type, key, keylen, priv);
441 
442 	if (serv) {
443 		if (cfg->cache_use_redis) {
444 			if (serv->port == DEFAULT_MEMCACHED_PORT) {
445 				serv->port = DEFAULT_REDIS_PORT;
446 			}
447 
448 			msec_to_tv (cfg->cache_connect_timeout, &tv);
449 			redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
450 
451 			if (redis == NULL || redis->err != 0) {
452 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
453 						(int)serv->port, redis ? redis->errstr : "unknown error");
454 				upstream_fail (&serv->up, time (NULL));
455 
456 				if (redis) {
457 					redisFree (redis);
458 				}
459 
460 				return false;
461 			}
462 			else {
463 
464 				rep = 1;
465 				if (cfg->cache_password) {
466 					redisAppendCommand (redis, "AUTH %s",
467 							cfg->cache_password);
468 					rep ++;
469 				}
470 				if (cfg->cache_dbname) {
471 					redisAppendCommand (redis, "SELECT %s",
472 							cfg->cache_dbname);
473 					rep ++;
474 				}
475 
476 				redisAppendCommand (redis, "DELETE %b", key, keylen);
477 
478 				while (rep > 0) {
479 					redisGetReply (redis, (void **)&r);
480 
481 					/* Ignore all replies but the last one */
482 					if (r != NULL && rep != 1) {
483 						freeReplyObject (r);
484 						r = NULL;
485 					}
486 					rep --;
487 				}
488 
489 				if (r != NULL) {
490 					freeReplyObject (r);
491 				}
492 
493 				redisFree (redis);
494 				upstream_ok (&serv->up, time (NULL));
495 			}
496 		}
497 		else {
498 #ifdef WITH_MEMCACHED
499 			char *kval;
500 			size_t value_len = 0;
501 			uint32_t mflags;
502 			int mret;
503 			memcached_st *mctx;
504 
505 			mctx = memcached_create (NULL);
506 			rmilter_format_libmemcached_config (cfg, serv, mctx);
507 
508 			if (mctx == NULL) {
509 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
510 						(int)serv->port, strerror (errno));
511 				upstream_fail (&serv->up, time (NULL));
512 
513 				return false;
514 			}
515 
516 			mret = memcached_delete (mctx, key, keylen, 0);
517 
518 			if (!compat_memcached_success (mret)) {
519 				if (compat_memcached_fatal (mret)) {
520 					msg_err ("<%s>; cannot delete key on %s:%d: %s", priv->mlfi_id, serv->addr,
521 							(int)serv->port, memcached_strerror (mctx, mret));
522 					upstream_fail (&serv->up, time (NULL));
523 					memcached_free (mctx);
524 
525 					return false;
526 				}
527 			}
528 
529 			upstream_ok (&serv->up, time (NULL));
530 			memcached_free (mctx);
531 #else
532 			msg_err ("<%s>; memcached query requested when memcached support is"
533 					" not compiled", priv->mlfi_id);
534 #endif
535 		}
536 
537 	}
538 
539 	return true;
540 }
541 
542 int
rmilter_publish_cache(struct config_file * cfg,enum rmilter_publish_type type,const unsigned char * channel,size_t channel_len,const unsigned char * data,size_t datalen,struct mlfi_priv * priv)543 rmilter_publish_cache (struct config_file *cfg, enum rmilter_publish_type type,
544 		const unsigned char *channel, size_t channel_len,
545 		const unsigned char *data, size_t datalen,
546 		struct mlfi_priv *priv)
547 {
548 	struct cache_server *serv;
549 	redisContext *redis;
550 	redisReply *r = NULL;
551 	struct timeval tv;
552 	size_t nelems = 1;
553 	int rep, n = 0;
554 
555 	serv = rmilter_get_publish_server (cfg, type, priv);
556 
557 	if (serv) {
558 		if (cfg->cache_use_redis) {
559 			if (serv->port == DEFAULT_MEMCACHED_PORT) {
560 				serv->port = DEFAULT_REDIS_PORT;
561 			}
562 
563 			msec_to_tv (cfg->cache_connect_timeout, &tv);
564 			redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
565 
566 			if (redis == NULL || redis->err != 0) {
567 				msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
568 					(int)serv->port, redis ? redis->errstr : "unknown error");
569 				upstream_fail (&serv->up, time (NULL));
570 
571 				if (redis) {
572 					redisFree (redis);
573 				}
574 
575 				return -1;
576 			}
577 			else {
578 				rep = 1;
579 				if (cfg->cache_password) {
580 					redisAppendCommand (redis, "AUTH %s",
581 							cfg->cache_password);
582 					rep ++;
583 				}
584 				if (cfg->cache_dbname) {
585 					redisAppendCommand (redis, "SELECT %s",
586 							cfg->cache_dbname);
587 					rep ++;
588 				}
589 
590 
591 				redisAppendCommand (redis, "PUBLISH %b %b", channel, channel_len,
592 						data, datalen);
593 
594 				while (rep > 0) {
595 					redisGetReply (redis, (void **)&r);
596 
597 					/* Ignore all replies but the last one */
598 					if (r != NULL && rep != 1) {
599 						freeReplyObject (r);
600 						r = NULL;
601 					}
602 					rep --;
603 				}
604 
605 				if (r != NULL) {
606 					n = r->integer;
607 					freeReplyObject (r);
608 				}
609 
610 				redisFree (redis);
611 				upstream_ok (&serv->up, time (NULL));
612 			}
613 		}
614 		else {
615 			msg_err ("<%s>; memcached query requested when pubsub is available "
616 					"only for redis", priv->mlfi_id);
617 
618 			return -1;
619 		}
620 	}
621 
622 	return n;
623 }
624