1 /*
2 * Copyright (c) 2016, Vsevolod Stakhov
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
14 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
16 * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
17 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
18 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
19 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
20 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
21 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
22 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 */
24
25 #include "config.h"
26 #ifdef WITH_MEMCACHED
27 #include "libmemcached/memcached.h"
28 #endif
29 #include "cfg_file.h"
30 #include "cache.h"
31 #include "hiredis.h"
32 #include "rmilter.h"
33 #include "upstream.h"
34 #include "util.h"
35 #include <assert.h>
36
37 #define DEFAULT_REDIS_PORT 6379
38
39 #ifdef WITH_MEMCACHED
compat_memcached_success(int rc)40 static inline bool compat_memcached_success(int rc)
41 {
42 return (rc == MEMCACHED_BUFFERED ||
43 rc == MEMCACHED_DELETED ||
44 rc == MEMCACHED_END ||
45 rc == MEMCACHED_ITEM ||
46 rc == MEMCACHED_STAT ||
47 rc == MEMCACHED_STORED ||
48 rc == MEMCACHED_SUCCESS ||
49 rc == MEMCACHED_VALUE);
50 }
51
compat_memcached_fatal(int rc)52 static inline bool compat_memcached_fatal(int rc)
53 {
54 return (
55 rc != MEMCACHED_BUFFERED &&
56 rc != MEMCACHED_DATA_EXISTS &&
57 rc != MEMCACHED_DELETED &&
58 rc != MEMCACHED_END &&
59 rc != MEMCACHED_ITEM &&
60 rc != MEMCACHED_NOTFOUND &&
61 rc != MEMCACHED_NOTSTORED &&
62 rc != MEMCACHED_STAT &&
63 rc != MEMCACHED_STORED &&
64 rc != MEMCACHED_SUCCESS &&
65 rc != MEMCACHED_VALUE);
66 }
67 #endif
68
69 static struct cache_server *
rmilter_get_server(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,struct mlfi_priv * priv)70 rmilter_get_server (struct config_file *cfg, enum rmilter_query_type type,
71 const unsigned char *key, size_t keylen, struct mlfi_priv *priv)
72 {
73 struct cache_server *serv = NULL;
74 void *ptr = NULL;
75 unsigned mlen = 0;
76
77 switch (type) {
78 case RMILTER_QUERY_GREYLIST:
79 if (cfg->cache_servers_grey_num > 0) {
80 ptr = cfg->cache_servers_grey;
81 mlen = cfg->cache_servers_grey_num;
82 }
83 break;
84 case RMILTER_QUERY_WHITELIST:
85 if (cfg->cache_servers_white_num > 0) {
86 ptr = cfg->cache_servers_white;
87 mlen = cfg->cache_servers_white_num;
88 }
89 else if (cfg->cache_servers_grey_num > 0) {
90 ptr = cfg->cache_servers_grey;
91 mlen = cfg->cache_servers_grey_num;
92 }
93 break;
94 case RMILTER_QUERY_RATELIMIT:
95 if (cfg->cache_servers_limits_num > 0) {
96 ptr = cfg->cache_servers_limits;
97 mlen = cfg->cache_servers_limits_num;
98 }
99 break;
100 case RMILTER_QUERY_ID:
101 if (cfg->cache_servers_id_num > 0) {
102 ptr = cfg->cache_servers_id;
103 mlen = cfg->cache_servers_id_num;
104 }
105 break;
106 }
107
108 if (ptr) {
109 serv = (struct cache_server *)get_upstream_by_hash (ptr, mlen,
110 sizeof (*serv), time (NULL),
111 cfg->cache_error_time, cfg->cache_dead_time,
112 cfg->cache_maxerrors, key, keylen, priv);
113 }
114
115 return serv;
116 }
117
118 static struct cache_server *
rmilter_get_publish_server(struct config_file * cfg,enum rmilter_publish_type type,struct mlfi_priv * priv)119 rmilter_get_publish_server (struct config_file *cfg, enum rmilter_publish_type type,
120 struct mlfi_priv *priv)
121 {
122 struct cache_server *serv = NULL;
123 void *ptr = NULL;
124 unsigned mlen = 0;
125
126 switch (type) {
127 case RMILTER_PUBLISH_COPY:
128 if (cfg->cache_servers_copy_num > 0) {
129 ptr = cfg->cache_servers_copy;
130 mlen = cfg->cache_servers_copy_num;
131 }
132 break;
133 case RMILTER_PUBLISH_SPAM:
134 if (cfg->cache_servers_spam_num > 0) {
135 ptr = cfg->cache_servers_spam;
136 mlen = cfg->cache_servers_spam_num;
137 }
138 break;
139 }
140
141 if (ptr) {
142 serv = (struct cache_server *)get_upstream_master_slave (ptr, mlen,
143 sizeof (*serv), time (NULL),
144 cfg->cache_error_time, cfg->cache_dead_time,
145 cfg->cache_maxerrors, priv);
146 }
147
148 return serv;
149 }
150
151 #ifdef WITH_MEMCACHED
152 static void
rmilter_format_libmemcached_config(struct config_file * cfg,struct cache_server * serv,memcached_st * ctx)153 rmilter_format_libmemcached_config (struct config_file *cfg,
154 struct cache_server *serv,
155 memcached_st *ctx)
156 {
157 if (serv->addr[0] == '/' || serv->addr[0] == '.') {
158 /* Assume unix socket */
159 memcached_server_add_unix_socket (ctx, serv->addr);
160 }
161 else {
162 memcached_server_add (ctx, serv->addr, serv->port);
163 }
164
165 memcached_behavior_set (ctx, MEMCACHED_BEHAVIOR_CONNECT_TIMEOUT,
166 cfg->cache_connect_timeout);
167 memcached_behavior_set (ctx, MEMCACHED_BEHAVIOR_POLL_TIMEOUT,
168 cfg->cache_connect_timeout);
169 }
170 #endif
171
172 bool
rmilter_query_cache(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,unsigned char ** data,size_t * datalen,struct mlfi_priv * priv)173 rmilter_query_cache (struct config_file *cfg, enum rmilter_query_type type,
174 const unsigned char *key, size_t keylen,
175 unsigned char **data, size_t *datalen, struct mlfi_priv *priv)
176 {
177 struct cache_server *serv;
178 redisContext *redis;
179 redisReply *r = NULL;
180 struct timeval tv;
181 bool ret = false;
182 size_t nelems = 1;
183 int rep;
184
185 serv = rmilter_get_server (cfg, type, key, keylen, priv);
186
187 if (serv) {
188 if (cfg->cache_use_redis) {
189 /* Special workaround */
190 if (serv->port == DEFAULT_MEMCACHED_PORT) {
191 serv->port = DEFAULT_REDIS_PORT;
192 }
193
194 msec_to_tv (cfg->cache_connect_timeout, &tv);
195 redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
196
197 if (redis == NULL || redis->err != 0) {
198 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
199 (int)serv->port, redis ? redis->errstr : "unknown error");
200 upstream_fail (&serv->up, time (NULL));
201
202 if (redis) {
203 redisFree (redis);
204 }
205
206 return false;
207 }
208 else {
209 rep = 1;
210 if (cfg->cache_password) {
211 redisAppendCommand (redis, "AUTH %s",
212 cfg->cache_password);
213 rep ++;
214 }
215 if (cfg->cache_dbname) {
216 redisAppendCommand (redis, "SELECT %s",
217 cfg->cache_dbname);
218 rep ++;
219 }
220
221 redisAppendCommand (redis, "GET %b", key, keylen);
222
223 while (rep > 0) {
224 redisGetReply (redis, (void **)&r);
225
226 /* Ignore all replies but the last one */
227 if (r != NULL && rep != 1) {
228 freeReplyObject (r);
229 r = NULL;
230 }
231 rep --;
232 }
233
234 if (r != NULL) {
235 if (r->type == REDIS_REPLY_STRING && r->len > 0) {
236 *data = malloc (r->len);
237 if (*data) {
238 memcpy (*data, r->str, r->len);
239 ret = true;
240 if (*datalen) {
241 *datalen = r->len;
242 }
243 }
244 }
245
246 freeReplyObject (r);
247 }
248
249 redisFree (redis);
250 upstream_ok (&serv->up, time (NULL));
251 }
252 }
253 else {
254 #ifdef WITH_MEMCACHED
255 char *kval;
256 size_t value_len = 0;
257 uint32_t mflags;
258 int mret;
259 memcached_st *mctx;
260
261 mctx = memcached_create (NULL);
262 rmilter_format_libmemcached_config (cfg, serv, mctx);
263
264 if (mctx == NULL) {
265 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
266 (int)serv->port, strerror (errno));
267 upstream_fail (&serv->up, time (NULL));
268
269 return false;
270 }
271
272 kval = memcached_get (mctx, key, keylen,
273 &value_len, &mflags, &mret);
274
275 if (!compat_memcached_success (mret)) {
276 if (kval) {
277 free (kval);
278 }
279 *datalen = 0;
280
281 if (compat_memcached_fatal (mret)) {
282 msg_err ("<%s>; cannot get key on %s:%d: %s", priv->mlfi_id, serv->addr,
283 (int)serv->port, memcached_strerror (mctx, mret));
284 upstream_fail (&serv->up, time (NULL));
285 }
286 else {
287 upstream_ok (&serv->up, time (NULL));
288 }
289 }
290 else {
291 *data = kval;
292 *datalen = value_len;
293 upstream_ok (&serv->up, time (NULL));
294 ret = true;
295 }
296
297 memcached_free (mctx);
298 #else
299 msg_err ("<%s>; memcached query requested when memcached support is"
300 " not compiled", priv->mlfi_id);
301 #endif
302 }
303 }
304
305 return ret;
306 }
307
308 bool
rmilter_set_cache(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,const unsigned char * data,size_t datalen,unsigned expire,struct mlfi_priv * priv)309 rmilter_set_cache (struct config_file *cfg, enum rmilter_query_type type ,
310 const unsigned char *key, size_t keylen,
311 const unsigned char *data, size_t datalen,
312 unsigned expire, struct mlfi_priv *priv)
313 {
314 struct cache_server *serv;
315 redisContext *redis;
316 redisReply *r = NULL;
317 struct timeval tv;
318 size_t nelems = 1;
319 int rep;
320
321 serv = rmilter_get_server (cfg, type, key, keylen, priv);
322
323 if (serv) {
324 if (cfg->cache_use_redis) {
325 if (serv->port == DEFAULT_MEMCACHED_PORT) {
326 serv->port = DEFAULT_REDIS_PORT;
327 }
328
329 msec_to_tv (cfg->cache_connect_timeout, &tv);
330 redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
331
332 if (redis == NULL || redis->err != 0) {
333 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
334 (int)serv->port, redis ? redis->errstr : "unknown error");
335 upstream_fail (&serv->up, time (NULL));
336
337 if (redis) {
338 redisFree (redis);
339 }
340
341 return false;
342 }
343 else {
344 rep = 1;
345 if (cfg->cache_password) {
346 redisAppendCommand (redis, "AUTH %s",
347 cfg->cache_password);
348 rep ++;
349 }
350 if (cfg->cache_dbname) {
351 redisAppendCommand (redis, "SELECT %s",
352 cfg->cache_dbname);
353 rep ++;
354 }
355
356 if (expire > 0) {
357 redisAppendCommand (redis, "SETEX %b %d %b", key, keylen,
358 expire, data, datalen);
359 }
360 else {
361 redisAppendCommand (redis, "SET %b %b", key, keylen,
362 data, datalen);
363 }
364
365 while (rep > 0) {
366 redisGetReply (redis, (void **)&r);
367
368 /* Ignore all replies but the last one */
369 if (r != NULL && rep != 1) {
370 freeReplyObject (r);
371 r = NULL;
372 }
373 rep --;
374 }
375
376 if (r != NULL) {
377 freeReplyObject (r);
378 }
379
380 redisFree (redis);
381 upstream_ok (&serv->up, time (NULL));
382 }
383 }
384 else {
385 #ifdef WITH_MEMCACHED
386 char *kval;
387 size_t value_len = 0;
388 uint32_t mflags;
389 int mret;
390 memcached_st *mctx;
391
392 mctx = memcached_create (NULL);
393 rmilter_format_libmemcached_config (cfg, serv, mctx);
394
395 if (mctx == NULL) {
396 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
397 (int)serv->port, strerror (errno));
398 upstream_fail (&serv->up, time (NULL));
399
400 return false;
401 }
402
403 mret = memcached_set (mctx, key, keylen, data, datalen,
404 expire, 0);
405
406 if (!compat_memcached_success (mret)) {
407 msg_err ("<%s>; cannot set key on %s:%d: %s", priv->mlfi_id, serv->addr,
408 (int)serv->port, memcached_strerror (mctx, mret));
409 upstream_fail (&serv->up, time (NULL));
410 memcached_free (mctx);
411
412 return false;
413 }
414 else {
415 upstream_ok (&serv->up, time (NULL));
416 }
417
418 memcached_free (mctx);
419 #else
420 msg_err ("<%s>; memcached query requested when memcached support is"
421 " not compiled", priv->mlfi_id);
422 #endif
423 }
424 }
425
426 return true;
427 }
428
429 bool
rmilter_delete_cache(struct config_file * cfg,enum rmilter_query_type type,const unsigned char * key,size_t keylen,struct mlfi_priv * priv)430 rmilter_delete_cache (struct config_file *cfg, enum rmilter_query_type type ,
431 const unsigned char *key, size_t keylen, struct mlfi_priv *priv)
432 {
433 struct cache_server *serv;
434 redisContext *redis;
435 redisReply *r = NULL;
436 struct timeval tv;
437 size_t nelems = 1;
438 int rep;
439
440 serv = rmilter_get_server (cfg, type, key, keylen, priv);
441
442 if (serv) {
443 if (cfg->cache_use_redis) {
444 if (serv->port == DEFAULT_MEMCACHED_PORT) {
445 serv->port = DEFAULT_REDIS_PORT;
446 }
447
448 msec_to_tv (cfg->cache_connect_timeout, &tv);
449 redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
450
451 if (redis == NULL || redis->err != 0) {
452 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
453 (int)serv->port, redis ? redis->errstr : "unknown error");
454 upstream_fail (&serv->up, time (NULL));
455
456 if (redis) {
457 redisFree (redis);
458 }
459
460 return false;
461 }
462 else {
463
464 rep = 1;
465 if (cfg->cache_password) {
466 redisAppendCommand (redis, "AUTH %s",
467 cfg->cache_password);
468 rep ++;
469 }
470 if (cfg->cache_dbname) {
471 redisAppendCommand (redis, "SELECT %s",
472 cfg->cache_dbname);
473 rep ++;
474 }
475
476 redisAppendCommand (redis, "DELETE %b", key, keylen);
477
478 while (rep > 0) {
479 redisGetReply (redis, (void **)&r);
480
481 /* Ignore all replies but the last one */
482 if (r != NULL && rep != 1) {
483 freeReplyObject (r);
484 r = NULL;
485 }
486 rep --;
487 }
488
489 if (r != NULL) {
490 freeReplyObject (r);
491 }
492
493 redisFree (redis);
494 upstream_ok (&serv->up, time (NULL));
495 }
496 }
497 else {
498 #ifdef WITH_MEMCACHED
499 char *kval;
500 size_t value_len = 0;
501 uint32_t mflags;
502 int mret;
503 memcached_st *mctx;
504
505 mctx = memcached_create (NULL);
506 rmilter_format_libmemcached_config (cfg, serv, mctx);
507
508 if (mctx == NULL) {
509 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
510 (int)serv->port, strerror (errno));
511 upstream_fail (&serv->up, time (NULL));
512
513 return false;
514 }
515
516 mret = memcached_delete (mctx, key, keylen, 0);
517
518 if (!compat_memcached_success (mret)) {
519 if (compat_memcached_fatal (mret)) {
520 msg_err ("<%s>; cannot delete key on %s:%d: %s", priv->mlfi_id, serv->addr,
521 (int)serv->port, memcached_strerror (mctx, mret));
522 upstream_fail (&serv->up, time (NULL));
523 memcached_free (mctx);
524
525 return false;
526 }
527 }
528
529 upstream_ok (&serv->up, time (NULL));
530 memcached_free (mctx);
531 #else
532 msg_err ("<%s>; memcached query requested when memcached support is"
533 " not compiled", priv->mlfi_id);
534 #endif
535 }
536
537 }
538
539 return true;
540 }
541
542 int
rmilter_publish_cache(struct config_file * cfg,enum rmilter_publish_type type,const unsigned char * channel,size_t channel_len,const unsigned char * data,size_t datalen,struct mlfi_priv * priv)543 rmilter_publish_cache (struct config_file *cfg, enum rmilter_publish_type type,
544 const unsigned char *channel, size_t channel_len,
545 const unsigned char *data, size_t datalen,
546 struct mlfi_priv *priv)
547 {
548 struct cache_server *serv;
549 redisContext *redis;
550 redisReply *r = NULL;
551 struct timeval tv;
552 size_t nelems = 1;
553 int rep, n = 0;
554
555 serv = rmilter_get_publish_server (cfg, type, priv);
556
557 if (serv) {
558 if (cfg->cache_use_redis) {
559 if (serv->port == DEFAULT_MEMCACHED_PORT) {
560 serv->port = DEFAULT_REDIS_PORT;
561 }
562
563 msec_to_tv (cfg->cache_connect_timeout, &tv);
564 redis = redisConnectWithTimeout (serv->addr, serv->port, tv);
565
566 if (redis == NULL || redis->err != 0) {
567 msg_err ("<%s>; cannot connect to %s:%d: %s", priv->mlfi_id, serv->addr,
568 (int)serv->port, redis ? redis->errstr : "unknown error");
569 upstream_fail (&serv->up, time (NULL));
570
571 if (redis) {
572 redisFree (redis);
573 }
574
575 return -1;
576 }
577 else {
578 rep = 1;
579 if (cfg->cache_password) {
580 redisAppendCommand (redis, "AUTH %s",
581 cfg->cache_password);
582 rep ++;
583 }
584 if (cfg->cache_dbname) {
585 redisAppendCommand (redis, "SELECT %s",
586 cfg->cache_dbname);
587 rep ++;
588 }
589
590
591 redisAppendCommand (redis, "PUBLISH %b %b", channel, channel_len,
592 data, datalen);
593
594 while (rep > 0) {
595 redisGetReply (redis, (void **)&r);
596
597 /* Ignore all replies but the last one */
598 if (r != NULL && rep != 1) {
599 freeReplyObject (r);
600 r = NULL;
601 }
602 rep --;
603 }
604
605 if (r != NULL) {
606 n = r->integer;
607 freeReplyObject (r);
608 }
609
610 redisFree (redis);
611 upstream_ok (&serv->up, time (NULL));
612 }
613 }
614 else {
615 msg_err ("<%s>; memcached query requested when pubsub is available "
616 "only for redis", priv->mlfi_id);
617
618 return -1;
619 }
620 }
621
622 return n;
623 }
624