1 /*
2  * ProFTPD - FTP server daemon
3  * Copyright (c) 2017-2020 The ProFTPD Project team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Suite 500, Boston, MA 02110-1335, USA.
18  *
19  * As a special exemption, The ProFTPD Project team and other respective
20  * copyright holders give permission to link this program with OpenSSL, and
21  * distribute the resulting executable, without including the source code for
22  * OpenSSL in the source distribution.
23  */
24 
25 /* Redis management */
26 
27 #include "conf.h"
28 
29 #ifdef PR_USE_REDIS
30 
31 #include <hiredis/hiredis.h>
32 
33 #ifndef REDIS_CONNECT_RETRIES
34 # define REDIS_CONNECT_RETRIES	10
35 #endif /* REDIS_CONNECT_RETRIES */
36 
37 /* When scanning for keys/lists, how many items to request per command? */
38 #define PR_REDIS_SCAN_SIZE	100
39 
40 struct redis_rec {
41   pool *pool;
42   module *owner;
43   redisContext *ctx;
44   unsigned long flags;
45 
46   /* For tracking the number of "opens"/"closes" on a shared redis_rec,
47    * as the same struct might be used by multiple modules in the same
48    * session, each module doing a conn_get()/conn_close().
49    */
50   unsigned int refcount;
51 
52   /* Redis server version. */
53   unsigned int major_version;
54   unsigned int minor_version;
55   unsigned int patch_version;
56 
57   /* Table mapping modules to their namespaces */
58   pr_table_t *namespace_tab;
59 };
60 
61 static array_header *redis_sentinels = NULL;
62 static const char *redis_sentinel_master = NULL;
63 
64 static const char *redis_server = NULL;
65 static int redis_port = -1;
66 static unsigned long redis_flags = 0UL;
67 static const char *redis_username = NULL;
68 static const char *redis_password = NULL;
69 static const char *redis_db_idx = NULL;
70 
71 static pr_redis_t *sess_redis = NULL;
72 
73 static unsigned long redis_connect_millis = 500;
74 static unsigned long redis_io_millis = 500;
75 
76 static const char *trace_channel = "redis";
77 
78 static const char *get_reply_type(int reply_type);
79 
millis2timeval(struct timeval * tv,unsigned long millis)80 static void millis2timeval(struct timeval *tv, unsigned long millis) {
81   tv->tv_sec = (millis / 1000);
82   tv->tv_usec = (millis - (tv->tv_sec * 1000)) * 1000;
83 }
84 
redis_strerror(pool * p,pr_redis_t * redis,int rerrno)85 static const char *redis_strerror(pool *p, pr_redis_t *redis, int rerrno) {
86   const char *err;
87 
88   switch (redis->ctx->err) {
89     case REDIS_ERR_IO:
90       err = pstrcat(p, "[io] ", strerror(rerrno), NULL);
91       break;
92 
93     case REDIS_ERR_EOF:
94       err = pstrcat(p, "[eof] ", redis->ctx->errstr, NULL);
95       break;
96 
97     case REDIS_ERR_PROTOCOL:
98       err = pstrcat(p, "[protocol] ", redis->ctx->errstr, NULL);
99       break;
100 
101     case REDIS_ERR_OOM:
102       err = pstrcat(p, "[oom] ", redis->ctx->errstr, NULL);
103       break;
104 
105     case REDIS_ERR_OTHER:
106       err = pstrcat(p, "[other] ", redis->ctx->errstr, NULL);
107       break;
108 
109     case REDIS_OK:
110     default:
111       err = "OK";
112       break;
113   }
114 
115   return err;
116 }
117 
conn_reconnect(pool * p,pr_redis_t * redis)118 static int conn_reconnect(pool *p, pr_redis_t *redis) {
119   int xerrno = 0;
120 #ifdef HAVE_HIREDIS_REDISRECONNECT
121   register unsigned int i;
122 
123   if (redis->flags & PR_REDIS_CONN_FL_NO_RECONNECT) {
124     errno = EPERM;
125     return -1;
126   }
127 
128   /* Use the already-provided REDIS_CONNECT_RETRIES from <hiredis/hiredis.h>
129    * rather than defining our own.
130    *
131    * Currently that is a rather low number (3), so I do not feel the need
132    * for retry delays or exponential backoff at this time.
133    */
134   for (i = 0; i < REDIS_CONNECT_RETRIES; i++) {
135     int res;
136 
137     pr_trace_msg(trace_channel, 9, "attempt #%u to reconnect", i+1);
138 
139     res = redisReconnect(redis->ctx);
140     xerrno = errno;
141     if (res == REDIS_OK) {
142       pr_trace_msg(trace_channel, 9, "attempt #%u to reconnect succeeded", i+1);
143       return 0;
144     }
145 
146     pr_trace_msg(trace_channel, 9, "attempt #%u to reconnect failed: %s",
147       i+ 1, redis_strerror(p, redis, xerrno));
148   }
149 #else
150   xerrno = ENOSYS;
151 #endif /* No redisReconnect() */
152 
153   errno = xerrno;
154   return -1;
155 }
156 
handle_reply(pr_redis_t * redis,const char * cmd,redisReply * reply)157 static redisReply *handle_reply(pr_redis_t *redis, const char *cmd,
158     redisReply *reply) {
159   int xerrno;
160   pool *tmp_pool;
161 
162   if (reply != NULL) {
163     return reply;
164   }
165 
166   xerrno = errno;
167   tmp_pool = make_sub_pool(redis->pool);
168   pr_trace_msg(trace_channel, 2, "error executing %s command: %s", cmd,
169     redis_strerror(tmp_pool, redis, xerrno));
170 
171   if (redis->ctx->err == REDIS_ERR_IO ||
172       redis->ctx->err == REDIS_ERR_EOF) {
173     int res;
174 
175     res = conn_reconnect(tmp_pool, redis);
176     if (res < 0) {
177       pr_trace_msg(trace_channel, 9, "failed to reconnect: %s",
178         strerror(errno));
179     }
180   }
181 
182   destroy_pool(tmp_pool);
183   errno = xerrno;
184   return NULL;
185 }
186 
ping_server(pr_redis_t * redis)187 static int ping_server(pr_redis_t *redis) {
188   const char *cmd;
189   redisReply *reply;
190 
191   cmd = "PING";
192   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
193   reply = redisCommand(redis->ctx, "%s", cmd);
194   reply = handle_reply(redis, cmd, reply);
195   if (reply == NULL) {
196     return -1;
197   }
198 
199   /* We COULD assert a "PONG" response here, but really, anything is OK. */
200   pr_trace_msg(trace_channel, 7, "%s reply: %s", cmd, reply->str);
201   freeReplyObject(reply);
202   return 0;
203 }
204 
parse_redis_version(pr_redis_t * redis,redisReply * info)205 static void parse_redis_version(pr_redis_t *redis, redisReply *info) {
206   pool *tmp_pool;
207   unsigned int major, minor, patch;
208   char *text, *version_text;
209 
210   if (info->type != REDIS_REPLY_STRING) {
211     pr_trace_msg(trace_channel, 1, "expected STRING reply for INFO, got %s",
212       get_reply_type(info->type));
213     return;
214   }
215 
216   tmp_pool = make_sub_pool(redis->pool);
217   pr_pool_tag(tmp_pool, "Redis version parsing pool");
218   text = pstrndup(tmp_pool, info->str, info->len);
219 
220   /* Scan the entire INFO string for "redis_version:N.N.N". */
221 
222   version_text = strstr(text, "redis_version:");
223   if (version_text == NULL) {
224     pr_trace_msg(trace_channel, 1, "no `redis_version` found in INFO reply");
225     destroy_pool(tmp_pool);
226     return;
227   }
228 
229   if (sscanf(version_text, "redis_version:%u.%u.%u", &major, &minor,
230       &patch) == 3) {
231     redis->major_version = major;
232     redis->minor_version = minor;
233     redis->patch_version = patch;
234 
235     pr_trace_msg(trace_channel, 9,
236       "parsed Redis version %u (major), %u (minor), %u (patch) out of INFO",
237       redis->major_version, redis->minor_version, redis->patch_version);
238 
239   } else {
240     pr_trace_msg(trace_channel, 1, "failed to scan Redis version '%s'",
241       version_text);
242   }
243 
244   destroy_pool(tmp_pool);
245 }
246 
stat_server(pr_redis_t * redis,const char * section)247 static int stat_server(pr_redis_t *redis, const char *section) {
248   const char *cmd;
249   redisReply *reply;
250 
251   cmd = "INFO";
252   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
253   reply = redisCommand(redis->ctx, "%s %s", cmd, section);
254   reply = handle_reply(redis, cmd, reply);
255   if (reply == NULL) {
256     return -1;
257   }
258 
259   if (pr_trace_get_level(trace_channel) >= 25) {
260     pr_trace_msg(trace_channel, 25, "%s reply: %s", cmd, reply->str);
261 
262   } else {
263     pr_trace_msg(trace_channel, 7, "%s reply: (text, %lu bytes)", cmd,
264       (unsigned long) reply->len);
265   }
266 
267   if (redis->major_version == 0 &&
268       (strcmp(section, "server") == 0 || strcmp(section, "") == 0)) {
269     /* We are particularly interested in the Redis server version; we key
270      * off of this version to detect when to change our command semantics,
271      * such as for AUTH.
272      *
273      * Thus we parse the server version out of the "server" info, unless
274      * the version is already known.
275      */
276     parse_redis_version(redis, reply);
277   }
278 
279   freeReplyObject(reply);
280   return 0;
281 }
282 
pr_redis_conn_get(pool * p,unsigned long flags)283 pr_redis_t *pr_redis_conn_get(pool *p, unsigned long flags) {
284   if (p == NULL) {
285     errno = EINVAL;
286     return NULL;
287   }
288 
289   if (sess_redis != NULL) {
290     sess_redis->refcount++;
291     return sess_redis;
292   }
293 
294   return pr_redis_conn_new(p, NULL, flags);
295 }
296 
set_conn_options(pr_redis_t * redis)297 static int set_conn_options(pr_redis_t *redis) {
298   int res, xerrno;
299   struct timeval tv;
300   pool *tmp_pool;
301 
302   tmp_pool = make_sub_pool(redis->pool);
303 
304   millis2timeval(&tv, redis_io_millis);
305   res = redisSetTimeout(redis->ctx, tv);
306   xerrno = errno;
307 
308   if (res == REDIS_ERR) {
309     pr_trace_msg(trace_channel, 4,
310       "error setting %lu ms timeout: %s", redis_io_millis,
311       redis_strerror(tmp_pool, redis, xerrno));
312   }
313 
314 #if HIREDIS_MAJOR >= 0 && \
315     HIREDIS_MINOR >= 12
316   res = redisEnableKeepAlive(redis->ctx);
317   xerrno = errno;
318 
319   if (res == REDIS_ERR) {
320     pr_trace_msg(trace_channel, 4,
321       "error setting keepalive: %s", redis_strerror(tmp_pool, redis, xerrno));
322   }
323 #endif /* HiRedis 0.12.0 and later */
324 
325   destroy_pool(tmp_pool);
326   return 0;
327 }
328 
sess_redis_cleanup(void * data)329 static void sess_redis_cleanup(void *data) {
330   sess_redis = NULL;
331 }
332 
make_redis_conn(pool * p,const char * host,int port)333 static pr_redis_t *make_redis_conn(pool *p, const char *host, int port) {
334   int uses_ip = TRUE, xerrno;
335   pr_redis_t *redis;
336   pool *sub_pool;
337   redisContext *ctx;
338   struct timeval tv;
339 
340   millis2timeval(&tv, redis_connect_millis);
341 
342   /* If the given redis "server" string starts with a '/' character, assume
343    * that it is a Unix socket path.
344    */
345   if (*host == '/') {
346     uses_ip = FALSE;
347     ctx = redisConnectUnixWithTimeout(host, tv);
348 
349   } else {
350     ctx = redisConnectWithTimeout(host, port, tv);
351   }
352 
353   xerrno = errno;
354 
355   if (ctx == NULL) {
356     errno = ENOMEM;
357     return NULL;
358   }
359 
360   if (ctx->err != 0) {
361     const char *err_type, *err_msg;
362 
363     switch (ctx->err) {
364       case REDIS_ERR_IO:
365         err_type = "io";
366         err_msg = strerror(xerrno);
367         break;
368 
369       case REDIS_ERR_EOF:
370         err_type = "eof";
371         err_msg = ctx->errstr;
372         break;
373 
374       case REDIS_ERR_PROTOCOL:
375         err_type = "protocol";
376         err_msg = ctx->errstr;
377         break;
378 
379       case REDIS_ERR_OOM:
380         err_type = "oom";
381         err_msg = ctx->errstr;
382         break;
383 
384       case REDIS_ERR_OTHER:
385         err_type = "other";
386         err_msg = ctx->errstr;
387         break;
388 
389       default:
390         err_type = "unknown";
391         err_msg = ctx->errstr;
392         break;
393     }
394 
395     if (uses_ip == TRUE) {
396       pr_trace_msg(trace_channel, 3,
397         "error connecting to %s#%d: [%s] %s", host, port, err_type, err_msg);
398 
399     } else {
400       pr_trace_msg(trace_channel, 3,
401         "error connecting to '%s': [%s] %s", host, err_type, err_msg);
402     }
403 
404     redisFree(ctx);
405     errno = EIO;
406     return NULL;
407   }
408 
409   sub_pool = make_sub_pool(p);
410   pr_pool_tag(sub_pool, "Redis connection pool");
411 
412   redis = pcalloc(sub_pool, sizeof(pr_redis_t));
413   redis->pool = sub_pool;
414   redis->ctx = ctx;
415 
416   return redis;
417 }
418 
discover_redis_master(pool * p,const char * host,int port,const char * master)419 static int discover_redis_master(pool *p, const char *host, int port,
420     const char *master) {
421   int res = 0, xerrno = 0;
422   pool *tmp_pool;
423   pr_redis_t *redis;
424   pr_netaddr_t *addr = NULL;
425 
426   tmp_pool = make_sub_pool(p);
427 
428   redis = make_redis_conn(tmp_pool, host, port);
429   xerrno = errno;
430 
431   if (redis == NULL) {
432     destroy_pool(tmp_pool);
433 
434     errno = xerrno;
435     return -1;
436   }
437 
438   if (master == NULL) {
439     array_header *masters = NULL;
440 
441     res = pr_redis_sentinel_get_masters(tmp_pool, redis, &masters);
442     if (res < 0) {
443       pr_trace_msg(trace_channel, 14, "error getting masters from Sentinel: %s",
444         strerror(errno));
445 
446     } else {
447       master = ((char **) masters->elts)[0];
448       pr_trace_msg(trace_channel, 17, "discovered master '%s'", master);
449     }
450   }
451 
452   res = pr_redis_sentinel_get_master_addr(tmp_pool, redis, master, &addr);
453   xerrno = errno;
454 
455   if (res < 0) {
456     pr_redis_conn_destroy(redis);
457     destroy_pool(tmp_pool);
458 
459     errno = xerrno;
460     return -1;
461   }
462 
463   redis_server = pstrdup(p, pr_netaddr_get_ipstr(addr));
464   redis_port = ntohs(pr_netaddr_get_port(addr));
465 
466   pr_redis_conn_destroy(redis);
467   destroy_pool(tmp_pool);
468 
469   errno = xerrno;
470   return res;
471 }
472 
pr_redis_conn_new(pool * p,module * m,unsigned long flags)473 pr_redis_t *pr_redis_conn_new(pool *p, module *m, unsigned long flags) {
474   int default_port, res, xerrno;
475   pr_redis_t *redis;
476   const char *default_host;
477 
478   if (p == NULL) {
479     errno = EINVAL;
480     return NULL;
481   }
482 
483   default_host = redis_server;
484   default_port = redis_port;
485 
486   /* Do we have a list of Sentinels configured?  If so, try those first. */
487   if (redis_sentinels != NULL) {
488     register unsigned int i;
489 
490     pr_trace_msg(trace_channel, 17,
491       "querying Sentinels (%u count) for Redis server",
492       redis_sentinels->nelts);
493 
494     for (i = 0; i < redis_sentinels->nelts; i++) {
495       pr_netaddr_t *addr;
496       const char *sentinel;
497       int port;
498 
499       addr = ((pr_netaddr_t **) redis_sentinels->elts)[i];
500       sentinel = pr_netaddr_get_ipstr(addr);
501       port = ntohs(pr_netaddr_get_port(addr));
502 
503       if (discover_redis_master(p, sentinel, port,
504           redis_sentinel_master) == 0) {
505         pr_trace_msg(trace_channel, 17,
506           "discovered Redis server %s:%d using Sentinel #%u (%s:%d)",
507           redis_server, redis_port, i+1, sentinel, port);
508       }
509     }
510   }
511 
512   /* If the Sentinels failed to provide a usable host, fall back to the
513    * default.
514    */
515   if (redis_server == NULL) {
516     redis_server = default_host;
517     redis_port = default_port;
518   }
519 
520   if (redis_server == NULL) {
521     pr_trace_msg(trace_channel, 9, "%s",
522       "unable to create new Redis connection: No server configured");
523     errno = EPERM;
524     return NULL;
525   }
526 
527   redis = make_redis_conn(p, redis_server, redis_port);
528   if (redis == NULL) {
529     return NULL;
530   }
531 
532   redis->owner = m;
533   redis->refcount = 1;
534   redis->flags = flags;
535 
536   /* The namespace table is null; it will be created if/when callers
537    * configure namespace prefixes.
538    */
539   redis->namespace_tab = NULL;
540 
541   /* Set some of the desired behavior flags on the connection */
542   res = set_conn_options(redis);
543   if (res < 0) {
544     xerrno = errno;
545 
546     pr_redis_conn_destroy(redis);
547     errno = xerrno;
548     return NULL;
549   }
550 
551   res = ping_server(redis);
552   if (res < 0) {
553     xerrno = errno;
554 
555     pr_redis_conn_destroy(redis);
556     errno = xerrno;
557     return NULL;
558   }
559 
560   /* Make sure we are connected to the configured server by querying
561    * some stats/info from it.
562    */
563   res = stat_server(redis, "server");
564   if (res < 0) {
565     xerrno = errno;
566 
567     pr_redis_conn_destroy(redis);
568     errno = xerrno;
569     return NULL;
570   }
571 
572   if (redis_password != NULL) {
573     if (redis_username != NULL) {
574       res = pr_redis_auth2(redis, redis_username, redis_password);
575 
576     } else {
577       res = pr_redis_auth(redis, redis_password);
578     }
579 
580     if (res < 0) {
581       xerrno = errno;
582 
583       pr_redis_conn_destroy(redis);
584       errno = xerrno;
585       return NULL;
586     }
587   }
588 
589   if (redis_db_idx != NULL) {
590     res = pr_redis_select(redis, redis_db_idx);
591     if (res < 0) {
592       xerrno = errno;
593 
594       pr_redis_conn_destroy(redis);
595       errno = xerrno;
596       return NULL;
597     }
598   }
599 
600   if (sess_redis == NULL) {
601     sess_redis = redis;
602 
603     /* Register a cleanup on this redis, so that when it is destroyed, we
604      * clear this sess_redis pointer, lest it remaining dangling.
605      */
606     register_cleanup2(redis->pool, NULL, sess_redis_cleanup);
607   }
608 
609   return redis;
610 }
611 
612 /* Return TRUE if we actually closed the connection, FALSE if we simply
613  * decremented the refcount.
614  */
pr_redis_conn_close(pr_redis_t * redis)615 int pr_redis_conn_close(pr_redis_t *redis) {
616   int closed = FALSE;
617 
618   if (redis == NULL) {
619     errno = EINVAL;
620     return -1;
621   }
622 
623   if (redis->refcount > 0) {
624     redis->refcount--;
625   }
626 
627   if (redis->refcount == 0) {
628     if (redis->ctx != NULL) {
629       const char *cmd = NULL;
630       redisReply *reply;
631 
632       cmd = "QUIT";
633       pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
634       reply = redisCommand(redis->ctx, "%s", cmd);
635       if (reply != NULL) {
636         freeReplyObject(reply);
637       }
638 
639       redisFree(redis->ctx);
640       redis->ctx = NULL;
641     }
642 
643     if (redis->namespace_tab != NULL) {
644       (void) pr_table_empty(redis->namespace_tab);
645       (void) pr_table_free(redis->namespace_tab);
646       redis->namespace_tab = NULL;
647     }
648 
649     closed = TRUE;
650   }
651 
652   return closed;
653 }
654 
655 /* Return TRUE if we actually closed the connection, FALSE if we simply
656  * decremented the refcount.
657  */
pr_redis_conn_destroy(pr_redis_t * redis)658 int pr_redis_conn_destroy(pr_redis_t *redis) {
659   int closed, destroyed = FALSE;
660 
661   if (redis == NULL) {
662     errno = EINVAL;
663     return -1;
664   }
665 
666   closed = pr_redis_conn_close(redis);
667   if (closed < 0) {
668     return -1;
669   }
670 
671   if (closed == TRUE) {
672     if (redis == sess_redis) {
673       sess_redis = NULL;
674     }
675 
676     destroy_pool(redis->pool);
677     destroyed = TRUE;
678   }
679 
680   return destroyed;
681 }
682 
modptr_cmp_cb(const void * k1,size_t ksz1,const void * k2,size_t ksz2)683 static int modptr_cmp_cb(const void *k1, size_t ksz1, const void *k2,
684     size_t ksz2) {
685 
686   /* Return zero to indicate a match, non-zero otherwise. */
687   return (((module *) k1) == ((module *) k2) ? 0 : 1);
688 }
689 
modptr_hash_cb(const void * k,size_t ksz)690 static unsigned int modptr_hash_cb(const void *k, size_t ksz) {
691   unsigned int key = 0;
692 
693   /* XXX Yes, this is a bit hacky for "hashing" a pointer value. */
694 
695   memcpy(&key, k, sizeof(key));
696   key ^= (key >> 16);
697 
698   return key;
699 }
700 
pr_redis_conn_set_namespace(pr_redis_t * redis,module * m,const void * prefix,size_t prefixsz)701 int pr_redis_conn_set_namespace(pr_redis_t *redis, module *m,
702     const void *prefix, size_t prefixsz) {
703 
704   if (redis == NULL ||
705       m == NULL) {
706     errno = EINVAL;
707     return -1;
708   }
709 
710   if (prefix != NULL &&
711       prefixsz == 0) {
712     errno = EINVAL;
713     return -1;
714   }
715 
716   if (redis->namespace_tab == NULL) {
717     pr_table_t *tab;
718 
719     tab = pr_table_alloc(redis->pool, 0);
720 
721     (void) pr_table_ctl(tab, PR_TABLE_CTL_SET_KEY_CMP, modptr_cmp_cb);
722     (void) pr_table_ctl(tab, PR_TABLE_CTL_SET_KEY_HASH, modptr_hash_cb);
723     redis->namespace_tab = tab;
724   }
725 
726   if (prefix != NULL) {
727     int count;
728     void *val;
729     size_t valsz;
730 
731     valsz = prefixsz;
732     val = palloc(redis->pool, valsz);
733     memcpy(val, prefix, prefixsz);
734 
735     count = pr_table_kexists(redis->namespace_tab, m, sizeof(module *));
736     if (count <= 0) {
737       (void) pr_table_kadd(redis->namespace_tab, m, sizeof(module *), val,
738         valsz);
739 
740     } else {
741       (void) pr_table_kset(redis->namespace_tab, m, sizeof(module *), val,
742         valsz);
743     }
744 
745   } else {
746     /* A NULL prefix means the caller is removing their namespace mapping. */
747     (void) pr_table_kremove(redis->namespace_tab, m, sizeof(module *), NULL);
748   }
749 
750   return 0;
751 }
752 
pr_redis_conn_get_version(pr_redis_t * redis,unsigned int * major_version,unsigned int * minor_version,unsigned int * patch_version)753 int pr_redis_conn_get_version(pr_redis_t *redis, unsigned int *major_version,
754     unsigned int *minor_version, unsigned int *patch_version) {
755 
756   if (redis == NULL) {
757     errno = EINVAL;
758     return -1;
759   }
760 
761   if (major_version == NULL &&
762       minor_version == NULL &&
763       patch_version == NULL) {
764     errno = EINVAL;
765     return -1;
766   }
767 
768   if (major_version != NULL) {
769     *major_version = redis->major_version;
770   }
771 
772   if (minor_version != NULL) {
773     *minor_version = redis->minor_version;
774   }
775 
776   if (patch_version != NULL) {
777     *patch_version = redis->patch_version;
778   }
779 
780   return 0;
781 }
782 
pr_redis_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)783 int pr_redis_add(pr_redis_t *redis, module *m, const char *key, void *value,
784     size_t valuesz, time_t expires) {
785   int res;
786 
787   if (key == NULL) {
788     errno = EINVAL;
789     return -1;
790   }
791 
792   res = pr_redis_kadd(redis, m, key, strlen(key), value, valuesz, expires);
793   if (res < 0) {
794     int xerrno = errno;
795 
796     pr_trace_msg(trace_channel, 2,
797       "error adding key '%s', value (%lu bytes): %s", key,
798       (unsigned long) valuesz, strerror(xerrno));
799 
800     errno = xerrno;
801     return -1;
802   }
803 
804   return 0;
805 }
806 
pr_redis_decr(pr_redis_t * redis,module * m,const char * key,uint32_t decr,uint64_t * value)807 int pr_redis_decr(pr_redis_t *redis, module *m, const char *key, uint32_t decr,
808     uint64_t *value) {
809   int res;
810 
811   if (key == NULL) {
812     errno = EINVAL;
813     return -1;
814   }
815 
816   res = pr_redis_kdecr(redis, m, key, strlen(key), decr, value);
817   if (res < 0) {
818     int xerrno = errno;
819 
820     pr_trace_msg(trace_channel, 2,
821       "error decrementing key '%s' by %lu: %s", key,
822       (unsigned long) decr, strerror(xerrno));
823 
824     errno = xerrno;
825     return -1;
826   }
827 
828   return 0;
829 }
830 
pr_redis_get(pool * p,pr_redis_t * redis,module * m,const char * key,size_t * valuesz)831 void *pr_redis_get(pool *p, pr_redis_t *redis, module *m, const char *key,
832     size_t *valuesz) {
833   void *ptr = NULL;
834 
835   if (key == NULL) {
836     errno = EINVAL;
837     return NULL;
838   }
839 
840   ptr = pr_redis_kget(p, redis, m, key, strlen(key), valuesz);
841   if (ptr == NULL) {
842     int xerrno = errno;
843 
844     pr_trace_msg(trace_channel, 2,
845       "error getting data for key '%s': %s", key, strerror(xerrno));
846 
847     errno = xerrno;
848     return NULL;
849   }
850 
851   return ptr;
852 }
853 
pr_redis_get_str(pool * p,pr_redis_t * redis,module * m,const char * key)854 char *pr_redis_get_str(pool *p, pr_redis_t *redis, module *m, const char *key) {
855   char *ptr = NULL;
856 
857   if (key == NULL) {
858     errno = EINVAL;
859     return NULL;
860   }
861 
862   ptr = pr_redis_kget_str(p, redis, m, key, strlen(key));
863   if (ptr == NULL) {
864     int xerrno = errno;
865 
866     pr_trace_msg(trace_channel, 2,
867       "error getting data for key '%s': %s", key, strerror(xerrno));
868 
869     errno = xerrno;
870     return NULL;
871   }
872 
873   return ptr;
874 }
875 
pr_redis_incr(pr_redis_t * redis,module * m,const char * key,uint32_t incr,uint64_t * value)876 int pr_redis_incr(pr_redis_t *redis, module *m, const char *key, uint32_t incr,
877     uint64_t *value) {
878   int res;
879 
880   if (key == NULL) {
881     errno = EINVAL;
882     return -1;
883   }
884 
885   res = pr_redis_kincr(redis, m, key, strlen(key), incr, value);
886   if (res < 0) {
887     int xerrno = errno;
888 
889     pr_trace_msg(trace_channel, 2,
890       "error incrementing key '%s' by %lu: %s", key,
891       (unsigned long) incr, strerror(xerrno));
892 
893     errno = xerrno;
894     return -1;
895   }
896 
897   return 0;
898 }
899 
pr_redis_remove(pr_redis_t * redis,module * m,const char * key)900 int pr_redis_remove(pr_redis_t *redis, module *m, const char *key) {
901   int res;
902 
903   if (key == NULL) {
904     errno = EINVAL;
905     return -1;
906   }
907 
908   res = pr_redis_kremove(redis, m, key, strlen(key));
909   if (res < 0) {
910     int xerrno = errno;
911 
912     pr_trace_msg(trace_channel, 2,
913       "error removing key '%s': %s", key, strerror(xerrno));
914 
915     errno = xerrno;
916     return -1;
917   }
918 
919   return 0;
920 }
921 
pr_redis_rename(pr_redis_t * redis,module * m,const char * from,const char * to)922 int pr_redis_rename(pr_redis_t *redis, module *m, const char *from,
923     const char *to) {
924   int res;
925 
926   if (from == NULL ||
927       to == NULL) {
928     errno = EINVAL;
929     return -1;
930   }
931 
932   res = pr_redis_krename(redis, m, from, strlen(from), to, strlen(to));
933   if (res < 0) {
934     int xerrno = errno;
935 
936     pr_trace_msg(trace_channel, 2,
937       "error renaming key '%s' to '%s': %s", from, to, strerror(xerrno));
938 
939     errno = xerrno;
940     return -1;
941   }
942 
943   return 0;
944 }
945 
pr_redis_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)946 int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
947     size_t valuesz, time_t expires) {
948   int res;
949 
950   if (key == NULL) {
951     errno = EINVAL;
952     return -1;
953   }
954 
955   res = pr_redis_kset(redis, m, key, strlen(key), value, valuesz, expires);
956   if (res < 0) {
957     int xerrno = errno;
958 
959     pr_trace_msg(trace_channel, 2,
960       "error setting key '%s', value (%lu bytes): %s", key,
961       (unsigned long) valuesz, strerror(xerrno));
962 
963     errno = xerrno;
964     return -1;
965   }
966 
967   return 0;
968 }
969 
970 /* Hash operations */
pr_redis_hash_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)971 int pr_redis_hash_count(pr_redis_t *redis, module *m, const char *key,
972     uint64_t *count) {
973   int res;
974 
975   if (key == NULL) {
976     errno = EINVAL;
977     return -1;
978   }
979 
980   res = pr_redis_hash_kcount(redis, m, key, strlen(key), count);
981   if (res < 0) {
982     int xerrno = errno;
983 
984     pr_trace_msg(trace_channel, 2,
985       "error counting hash using key '%s': %s", key, strerror(xerrno));
986 
987     errno = xerrno;
988     return -1;
989   }
990 
991   return 0;
992 }
993 
pr_redis_hash_delete(pr_redis_t * redis,module * m,const char * key,const char * field)994 int pr_redis_hash_delete(pr_redis_t *redis, module *m, const char *key,
995     const char *field) {
996   int res;
997 
998   if (key == NULL ||
999       field == NULL) {
1000     errno = EINVAL;
1001     return -1;
1002   }
1003 
1004   res = pr_redis_hash_kdelete(redis, m, key, strlen(key), field, strlen(field));
1005   if (res < 0) {
1006     int xerrno = errno;
1007 
1008     pr_trace_msg(trace_channel, 2,
1009       "error deleting field from hash using key '%s', field '%s': %s", key,
1010       field, strerror(xerrno));
1011 
1012     errno = xerrno;
1013     return -1;
1014   }
1015 
1016   return 0;
1017 }
1018 
pr_redis_hash_exists(pr_redis_t * redis,module * m,const char * key,const char * field)1019 int pr_redis_hash_exists(pr_redis_t *redis, module *m, const char *key,
1020     const char *field) {
1021   int res;
1022 
1023   if (key == NULL ||
1024       field == NULL) {
1025     errno = EINVAL;
1026     return -1;
1027   }
1028 
1029   res = pr_redis_hash_kexists(redis, m, key, strlen(key), field, strlen(field));
1030   if (res < 0) {
1031     int xerrno = errno;
1032 
1033     pr_trace_msg(trace_channel, 2,
1034       "error checking existence of hash using key '%s', field '%s': %s", key,
1035       field, strerror(xerrno));
1036 
1037     errno = xerrno;
1038     return -1;
1039   }
1040 
1041   return res;
1042 }
1043 
pr_redis_hash_get(pool * p,pr_redis_t * redis,module * m,const char * key,const char * field,void ** value,size_t * valuesz)1044 int pr_redis_hash_get(pool *p, pr_redis_t *redis, module *m, const char *key,
1045     const char *field, void **value, size_t *valuesz) {
1046   int res;
1047 
1048   if (key == NULL ||
1049       field == NULL) {
1050     errno = EINVAL;
1051     return -1;
1052   }
1053 
1054   res = pr_redis_hash_kget(p, redis, m, key, strlen(key), field, strlen(field),
1055     value, valuesz);
1056   if (res < 0) {
1057     int xerrno = errno;
1058 
1059     pr_trace_msg(trace_channel, 2,
1060       "error getting field from hash using key '%s', field '%s': %s", key,
1061       field, strerror(xerrno));
1062 
1063     errno = xerrno;
1064     return -1;
1065   }
1066 
1067   return 0;
1068 }
1069 
pr_redis_hash_getall(pool * p,pr_redis_t * redis,module * m,const char * key,pr_table_t ** hash)1070 int pr_redis_hash_getall(pool *p, pr_redis_t *redis, module *m,
1071     const char *key, pr_table_t **hash) {
1072   int res;
1073 
1074   if (key == NULL) {
1075     errno = EINVAL;
1076     return -1;
1077   }
1078 
1079   res = pr_redis_hash_kgetall(p, redis, m, key, strlen(key), hash);
1080   if (res < 0) {
1081     int xerrno = errno;
1082 
1083     pr_trace_msg(trace_channel, 2,
1084       "error entire hash using key '%s': %s", key, strerror(xerrno));
1085 
1086     errno = xerrno;
1087     return -1;
1088   }
1089 
1090   return 0;
1091 }
1092 
pr_redis_hash_incr(pr_redis_t * redis,module * m,const char * key,const char * field,int32_t incr,int64_t * value)1093 int pr_redis_hash_incr(pr_redis_t *redis, module *m, const char *key,
1094     const char *field, int32_t incr, int64_t *value) {
1095   int res;
1096 
1097   if (key == NULL ||
1098       field == NULL) {
1099     errno = EINVAL;
1100     return -1;
1101   }
1102 
1103   res = pr_redis_hash_kincr(redis, m, key, strlen(key), field, strlen(field),
1104     incr, value);
1105   if (res < 0) {
1106     int xerrno = errno;
1107 
1108     pr_trace_msg(trace_channel, 2,
1109       "error incrementing field in hash using key '%s', field '%s': %s", key,
1110       field, strerror(xerrno));
1111 
1112     errno = xerrno;
1113     return -1;
1114   }
1115 
1116   return 0;
1117 }
1118 
pr_redis_hash_keys(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** fields)1119 int pr_redis_hash_keys(pool *p, pr_redis_t *redis, module *m, const char *key,
1120     array_header **fields) {
1121   int res;
1122 
1123   if (key == NULL) {
1124     errno = EINVAL;
1125     return -1;
1126   }
1127 
1128   res = pr_redis_hash_kkeys(p, redis, m, key, strlen(key), fields);
1129   if (res < 0) {
1130     int xerrno = errno;
1131 
1132     pr_trace_msg(trace_channel, 2,
1133       "error obtaining keys from hash using key '%s': %s", key,
1134       strerror(xerrno));
1135 
1136     errno = xerrno;
1137     return -1;
1138   }
1139 
1140   return 0;
1141 }
1142 
pr_redis_hash_remove(pr_redis_t * redis,module * m,const char * key)1143 int pr_redis_hash_remove(pr_redis_t *redis, module *m, const char *key) {
1144   int res;
1145 
1146   if (key == NULL) {
1147     errno = EINVAL;
1148     return -1;
1149   }
1150 
1151   res = pr_redis_hash_kremove(redis, m, key, strlen(key));
1152   if (res < 0) {
1153     int xerrno = errno;
1154 
1155     pr_trace_msg(trace_channel, 2,
1156       "error removing hash using key '%s': %s", key, strerror(xerrno));
1157 
1158     errno = xerrno;
1159     return -1;
1160   }
1161 
1162   return 0;
1163 }
1164 
pr_redis_hash_set(pr_redis_t * redis,module * m,const char * key,const char * field,void * value,size_t valuesz)1165 int pr_redis_hash_set(pr_redis_t *redis, module *m, const char *key,
1166     const char *field, void *value, size_t valuesz) {
1167   int res;
1168 
1169   if (key == NULL ||
1170       field == NULL) {
1171     errno = EINVAL;
1172     return -1;
1173   }
1174 
1175   res = pr_redis_hash_kset(redis, m, key, strlen(key), field, strlen(field),
1176     value, valuesz);
1177   if (res < 0) {
1178     int xerrno = errno;
1179 
1180     pr_trace_msg(trace_channel, 2,
1181       "error setting field in hash using key '%s', field '%s': %s", key, field,
1182       strerror(xerrno));
1183 
1184     errno = xerrno;
1185     return -1;
1186   }
1187 
1188   return 0;
1189 }
1190 
pr_redis_hash_setall(pr_redis_t * redis,module * m,const char * key,pr_table_t * hash)1191 int pr_redis_hash_setall(pr_redis_t *redis, module *m, const char *key,
1192     pr_table_t *hash) {
1193   int res;
1194 
1195   if (key == NULL) {
1196     errno = EINVAL;
1197     return -1;
1198   }
1199 
1200   res = pr_redis_hash_ksetall(redis, m, key, strlen(key), hash);
1201   if (res < 0) {
1202     int xerrno = errno;
1203 
1204     pr_trace_msg(trace_channel, 2,
1205       "error setting hash using key '%s': %s", key, strerror(xerrno));
1206 
1207     errno = xerrno;
1208     return -1;
1209   }
1210 
1211   return 0;
1212 }
1213 
pr_redis_hash_values(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values)1214 int pr_redis_hash_values(pool *p, pr_redis_t *redis, module *m,
1215     const char *key, array_header **values) {
1216   int res;
1217 
1218   if (key == NULL) {
1219     errno = EINVAL;
1220     return -1;
1221   }
1222 
1223   res = pr_redis_hash_kvalues(p, redis, m, key, strlen(key), values);
1224   if (res < 0) {
1225     int xerrno = errno;
1226 
1227     pr_trace_msg(trace_channel, 2,
1228       "error getting values of hash using key '%s': %s", key, strerror(xerrno));
1229 
1230     errno = xerrno;
1231     return -1;
1232   }
1233 
1234   return 0;
1235 }
1236 
1237 /* List operations */
pr_redis_list_append(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1238 int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
1239     void *value, size_t valuesz) {
1240   int res;
1241 
1242   if (key == NULL) {
1243     errno = EINVAL;
1244     return -1;
1245   }
1246 
1247   res = pr_redis_list_kappend(redis, m, key, strlen(key), value, valuesz);
1248   if (res < 0) {
1249     int xerrno = errno;
1250 
1251     pr_trace_msg(trace_channel, 2,
1252       "error appending to list using key '%s': %s", key, strerror(xerrno));
1253 
1254     errno = xerrno;
1255     return -1;
1256   }
1257 
1258   return 0;
1259 }
1260 
pr_redis_list_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)1261 int pr_redis_list_count(pr_redis_t *redis, module *m, const char *key,
1262     uint64_t *count) {
1263   int res;
1264 
1265   if (key == NULL) {
1266     errno = EINVAL;
1267     return -1;
1268   }
1269 
1270   res = pr_redis_list_kcount(redis, m, key, strlen(key), count);
1271   if (res < 0) {
1272     int xerrno = errno;
1273 
1274     pr_trace_msg(trace_channel, 2,
1275       "error counting list using key '%s': %s", key, strerror(xerrno));
1276 
1277     errno = xerrno;
1278     return -1;
1279   }
1280 
1281   return 0;
1282 }
1283 
pr_redis_list_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1284 int pr_redis_list_delete(pr_redis_t *redis, module *m, const char *key,
1285     void *value, size_t valuesz) {
1286   int res;
1287 
1288   if (value == NULL) {
1289     errno = EINVAL;
1290     return -1;
1291   }
1292 
1293   res = pr_redis_list_kdelete(redis, m, key, strlen(key), value, valuesz);
1294   if (res < 0) {
1295     int xerrno = errno;
1296 
1297     pr_trace_msg(trace_channel, 2,
1298       "error deleting item from list using key '%s': %s", key,
1299       strerror(xerrno));
1300 
1301     errno = xerrno;
1302     return -1;
1303   }
1304 
1305   return 0;
1306 }
1307 
pr_redis_list_exists(pr_redis_t * redis,module * m,const char * key,unsigned int idx)1308 int pr_redis_list_exists(pr_redis_t *redis, module *m, const char *key,
1309     unsigned int idx) {
1310   int res;
1311 
1312   if (key == NULL) {
1313     errno = EINVAL;
1314     return -1;
1315   }
1316 
1317   res = pr_redis_list_kexists(redis, m, key, strlen(key), idx);
1318   if (res < 0) {
1319     int xerrno = errno;
1320 
1321     pr_trace_msg(trace_channel, 2,
1322       "error checking item at index %u in list using key '%s': %s", idx, key,
1323       strerror(xerrno));
1324 
1325     errno = xerrno;
1326     return -1;
1327   }
1328 
1329   return res;
1330 }
1331 
pr_redis_list_get(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int idx,void ** value,size_t * valuesz)1332 int pr_redis_list_get(pool *p, pr_redis_t *redis, module *m, const char *key,
1333     unsigned int idx, void **value, size_t *valuesz) {
1334   int res;
1335 
1336   if (key == NULL) {
1337     errno = EINVAL;
1338     return -1;
1339   }
1340 
1341   res = pr_redis_list_kget(p, redis, m, key, strlen(key), idx, value, valuesz);
1342   if (res < 0) {
1343     int xerrno = errno;
1344 
1345     pr_trace_msg(trace_channel, 2,
1346       "error getting item at index %u in list using key '%s': %s", idx, key,
1347       strerror(xerrno));
1348 
1349     errno = xerrno;
1350     return -1;
1351   }
1352 
1353   return res;
1354 }
1355 
pr_redis_list_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)1356 int pr_redis_list_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
1357     array_header **values, array_header **valueszs) {
1358   int res;
1359 
1360   if (key == NULL) {
1361     errno = EINVAL;
1362     return -1;
1363   }
1364 
1365   res = pr_redis_list_kgetall(p, redis, m, key, strlen(key), values, valueszs);
1366   if (res < 0) {
1367     int xerrno = errno;
1368 
1369     pr_trace_msg(trace_channel, 2,
1370       "error getting items in list using key '%s': %s", key, strerror(xerrno));
1371 
1372     errno = xerrno;
1373     return -1;
1374   }
1375 
1376   return res;
1377 }
1378 
pr_redis_list_pop(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz,int flags)1379 int pr_redis_list_pop(pool *p, pr_redis_t *redis, module *m, const char *key,
1380     void **value, size_t *valuesz, int flags) {
1381   int res;
1382 
1383   if (key == NULL) {
1384     errno = EINVAL;
1385     return -1;
1386   }
1387 
1388   res = pr_redis_list_kpop(p, redis, m, key, strlen(key), value, valuesz,
1389     flags);
1390   if (res < 0) {
1391     int xerrno = errno;
1392 
1393     pr_trace_msg(trace_channel, 2,
1394       "error popping item from list using key '%s': %s", key, strerror(xerrno));
1395 
1396     errno = xerrno;
1397     return -1;
1398   }
1399 
1400   return res;
1401 }
1402 
pr_redis_list_push(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,int flags)1403 int pr_redis_list_push(pr_redis_t *redis, module *m, const char *key,
1404     void *value, size_t valuesz, int flags) {
1405   int res;
1406 
1407   if (key == NULL) {
1408     errno = EINVAL;
1409     return -1;
1410   }
1411 
1412   res = pr_redis_list_kpush(redis, m, key, strlen(key), value, valuesz, flags);
1413   if (res < 0) {
1414     int xerrno = errno;
1415 
1416     pr_trace_msg(trace_channel, 2,
1417       "error pushing item into list using key '%s': %s", key, strerror(xerrno));
1418 
1419     errno = xerrno;
1420     return -1;
1421   }
1422 
1423   return 0;
1424 }
1425 
pr_redis_list_remove(pr_redis_t * redis,module * m,const char * key)1426 int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key) {
1427   int res;
1428 
1429   if (key == NULL) {
1430     errno = EINVAL;
1431     return -1;
1432   }
1433 
1434   res = pr_redis_list_kremove(redis, m, key, strlen(key));
1435   if (res < 0) {
1436     int xerrno = errno;
1437 
1438     pr_trace_msg(trace_channel, 2,
1439       "error removing list using key '%s': %s", key, strerror(xerrno));
1440 
1441     errno = xerrno;
1442     return -1;
1443   }
1444 
1445   return 0;
1446 }
1447 
pr_redis_list_rotate(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz)1448 int pr_redis_list_rotate(pool *p, pr_redis_t *redis, module *m,
1449     const char *key, void **value, size_t *valuesz) {
1450   int res;
1451 
1452   if (key == NULL) {
1453     errno = EINVAL;
1454     return -1;
1455   }
1456 
1457   res = pr_redis_list_krotate(p, redis, m, key, strlen(key), value, valuesz);
1458   if (res < 0) {
1459     int xerrno = errno;
1460 
1461     pr_trace_msg(trace_channel, 2,
1462       "error rotating list using key '%s': %s", key, strerror(xerrno));
1463 
1464     errno = xerrno;
1465     return -1;
1466   }
1467 
1468   return 0;
1469 }
1470 
pr_redis_list_set(pr_redis_t * redis,module * m,const char * key,unsigned int idx,void * value,size_t valuesz)1471 int pr_redis_list_set(pr_redis_t *redis, module *m, const char *key,
1472     unsigned int idx, void *value, size_t valuesz) {
1473   int res;
1474 
1475   if (key == NULL) {
1476     errno = EINVAL;
1477     return -1;
1478   }
1479 
1480   res = pr_redis_list_kset(redis, m, key, strlen(key), idx, value, valuesz);
1481   if (res < 0) {
1482     int xerrno = errno;
1483 
1484     pr_trace_msg(trace_channel, 2,
1485       "error setting item in list using key '%s', index %u: %s", key, idx,
1486       strerror(xerrno));
1487 
1488     errno = xerrno;
1489     return -1;
1490   }
1491 
1492   return 0;
1493 }
1494 
pr_redis_list_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)1495 int pr_redis_list_setall(pr_redis_t *redis, module *m, const char *key,
1496     array_header *values, array_header *valueszs) {
1497   int res;
1498 
1499   if (key == NULL) {
1500     errno = EINVAL;
1501     return -1;
1502   }
1503 
1504   res = pr_redis_list_ksetall(redis, m, key, strlen(key), values, valueszs);
1505   if (res < 0) {
1506     int xerrno = errno;
1507 
1508     pr_trace_msg(trace_channel, 2,
1509       "error setting items in list using key '%s': %s", key, strerror(xerrno));
1510 
1511     errno = xerrno;
1512     return -1;
1513   }
1514 
1515   return 0;
1516 }
1517 
1518 /* Set operations */
pr_redis_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1519 int pr_redis_set_add(pr_redis_t *redis, module *m, const char *key,
1520     void *value, size_t valuesz) {
1521   int res;
1522 
1523   if (key == NULL) {
1524     errno = EINVAL;
1525     return -1;
1526   }
1527 
1528   res = pr_redis_set_kadd(redis, m, key, strlen(key), value, valuesz);
1529   if (res < 0) {
1530     int xerrno = errno;
1531 
1532     pr_trace_msg(trace_channel, 2,
1533       "error adding item to set using key '%s': %s", key, strerror(xerrno));
1534 
1535     errno = xerrno;
1536     return -1;
1537   }
1538 
1539   return 0;
1540 }
1541 
pr_redis_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)1542 int pr_redis_set_count(pr_redis_t *redis, module *m, const char *key,
1543     uint64_t *count) {
1544   int res;
1545 
1546   if (key == NULL) {
1547     errno = EINVAL;
1548     return -1;
1549   }
1550 
1551   res = pr_redis_set_kcount(redis, m, key, strlen(key), count);
1552   if (res < 0) {
1553     int xerrno = errno;
1554 
1555     pr_trace_msg(trace_channel, 2,
1556       "error counting set using key '%s': %s", key, strerror(xerrno));
1557 
1558     errno = xerrno;
1559     return -1;
1560   }
1561 
1562   return 0;
1563 }
1564 
pr_redis_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1565 int pr_redis_set_delete(pr_redis_t *redis, module *m, const char *key,
1566     void *value, size_t valuesz) {
1567   int res;
1568 
1569   if (key == NULL) {
1570     errno = EINVAL;
1571     return -1;
1572   }
1573 
1574   res = pr_redis_set_kdelete(redis, m, key, strlen(key), value, valuesz);
1575   if (res < 0) {
1576     int xerrno = errno;
1577 
1578     pr_trace_msg(trace_channel, 2,
1579       "error deleting item from set using key '%s': %s", key, strerror(xerrno));
1580 
1581     errno = xerrno;
1582     return -1;
1583   }
1584 
1585   return 0;
1586 }
1587 
pr_redis_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1588 int pr_redis_set_exists(pr_redis_t *redis, module *m, const char *key,
1589     void *value, size_t valuesz) {
1590   int res;
1591 
1592   if (key == NULL) {
1593     errno = EINVAL;
1594     return -1;
1595   }
1596 
1597   res = pr_redis_set_kexists(redis, m, key, strlen(key), value, valuesz);
1598   if (res < 0) {
1599     int xerrno = errno;
1600 
1601     pr_trace_msg(trace_channel, 2,
1602       "error checking item in set using key '%s': %s", key, strerror(xerrno));
1603 
1604     errno = xerrno;
1605     return -1;
1606   }
1607 
1608   return res;
1609 }
1610 
pr_redis_set_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)1611 int pr_redis_set_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
1612     array_header **values, array_header **valueszs) {
1613   int res;
1614 
1615   if (key == NULL) {
1616     errno = EINVAL;
1617     return -1;
1618   }
1619 
1620   res = pr_redis_set_kgetall(p, redis, m, key, strlen(key), values, valueszs);
1621   if (res < 0) {
1622     int xerrno = errno;
1623 
1624     pr_trace_msg(trace_channel, 2,
1625       "error getting items in set using key '%s': %s", key, strerror(xerrno));
1626 
1627     errno = xerrno;
1628     return -1;
1629   }
1630 
1631   return res;
1632 }
1633 
pr_redis_set_remove(pr_redis_t * redis,module * m,const char * key)1634 int pr_redis_set_remove(pr_redis_t *redis, module *m, const char *key) {
1635   int res;
1636 
1637   if (key == NULL) {
1638     errno = EINVAL;
1639     return -1;
1640   }
1641 
1642   res = pr_redis_set_kremove(redis, m, key, strlen(key));
1643   if (res < 0) {
1644     int xerrno = errno;
1645 
1646     pr_trace_msg(trace_channel, 2,
1647       "error removing set using key '%s': %s", key, strerror(xerrno));
1648 
1649     errno = xerrno;
1650     return -1;
1651   }
1652 
1653   return 0;
1654 }
1655 
pr_redis_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)1656 int pr_redis_set_setall(pr_redis_t *redis, module *m, const char *key,
1657     array_header *values, array_header *valueszs) {
1658   int res;
1659 
1660   if (key == NULL) {
1661     errno = EINVAL;
1662     return -1;
1663   }
1664 
1665   res = pr_redis_set_ksetall(redis, m, key, strlen(key), values, valueszs);
1666   if (res < 0) {
1667     int xerrno = errno;
1668 
1669     pr_trace_msg(trace_channel, 2,
1670       "error setting items in set using key '%s': %s", key, strerror(xerrno));
1671 
1672     errno = xerrno;
1673     return -1;
1674   }
1675 
1676   return 0;
1677 }
1678 
1679 /* Sorted Set operations */
pr_redis_sorted_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)1680 int pr_redis_sorted_set_add(pr_redis_t *redis, module *m, const char *key,
1681     void *value, size_t valuesz, float score) {
1682   int res;
1683 
1684   if (key == NULL) {
1685     errno = EINVAL;
1686     return -1;
1687   }
1688 
1689   res = pr_redis_sorted_set_kadd(redis, m, key, strlen(key), value, valuesz,
1690     score);
1691   if (res < 0) {
1692     int xerrno = errno;
1693 
1694     pr_trace_msg(trace_channel, 2,
1695       "error adding item with score %0.3f to sorted set using key '%s': %s",
1696       score, key, strerror(xerrno));
1697 
1698     errno = xerrno;
1699     return -1;
1700   }
1701 
1702   return 0;
1703 }
1704 
pr_redis_sorted_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)1705 int pr_redis_sorted_set_count(pr_redis_t *redis, module *m, const char *key,
1706     uint64_t *count) {
1707   int res;
1708 
1709   if (key == NULL) {
1710     errno = EINVAL;
1711     return -1;
1712   }
1713 
1714   res = pr_redis_sorted_set_kcount(redis, m, key, strlen(key), count);
1715   if (res < 0) {
1716     int xerrno = errno;
1717 
1718     pr_trace_msg(trace_channel, 2,
1719       "error counting sorted set using key '%s': %s", key, strerror(xerrno));
1720 
1721     errno = xerrno;
1722     return -1;
1723   }
1724 
1725   return 0;
1726 }
1727 
pr_redis_sorted_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1728 int pr_redis_sorted_set_delete(pr_redis_t *redis, module *m, const char *key,
1729     void *value, size_t valuesz) {
1730   int res;
1731 
1732   if (key == NULL) {
1733     errno = EINVAL;
1734     return -1;
1735   }
1736 
1737   res = pr_redis_sorted_set_kdelete(redis, m, key, strlen(key), value, valuesz);
1738   if (res < 0) {
1739     int xerrno = errno;
1740 
1741     pr_trace_msg(trace_channel, 2,
1742       "error deleting item from sorted set using key '%s': %s", key,
1743       strerror(xerrno));
1744 
1745     errno = xerrno;
1746     return -1;
1747   }
1748 
1749   return 0;
1750 }
1751 
pr_redis_sorted_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1752 int pr_redis_sorted_set_exists(pr_redis_t *redis, module *m, const char *key,
1753     void *value, size_t valuesz) {
1754   int res;
1755 
1756   if (key == NULL) {
1757     errno = EINVAL;
1758     return -1;
1759   }
1760 
1761   res = pr_redis_sorted_set_kexists(redis, m, key, strlen(key), value, valuesz);
1762   if (res < 0) {
1763     int xerrno = errno;
1764 
1765     pr_trace_msg(trace_channel, 2,
1766       "error checking item in sorted set using key '%s': %s", key,
1767       strerror(xerrno));
1768 
1769     errno = xerrno;
1770     return -1;
1771   }
1772 
1773   return res;
1774 }
1775 
pr_redis_sorted_set_getn(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)1776 int pr_redis_sorted_set_getn(pool *p, pr_redis_t *redis, module *m,
1777     const char *key, unsigned int offset, unsigned int len,
1778     array_header **values, array_header **valueszs, int flags) {
1779   int res;
1780 
1781   if (key == NULL) {
1782     errno = EINVAL;
1783     return -1;
1784   }
1785 
1786   res = pr_redis_sorted_set_kgetn(p, redis, m, key, strlen(key), offset, len,
1787     values, valueszs, flags);
1788   if (res < 0) {
1789     int xerrno = errno;
1790 
1791     pr_trace_msg(trace_channel, 2,
1792       "error getting %u %s from sorted set using key '%s': %s", len,
1793       len != 1 ? "items" : "item", key, strerror(xerrno));
1794 
1795     errno = xerrno;
1796     return -1;
1797   }
1798 
1799   return res;
1800 }
1801 
pr_redis_sorted_set_incr(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float incr,float * score)1802 int pr_redis_sorted_set_incr(pr_redis_t *redis, module *m, const char *key,
1803     void *value, size_t valuesz, float incr, float *score) {
1804   int res;
1805 
1806   if (key == NULL) {
1807     errno = EINVAL;
1808     return -1;
1809   }
1810 
1811   res = pr_redis_sorted_set_kincr(redis, m, key, strlen(key), value, valuesz,
1812     incr, score);
1813   if (res < 0) {
1814     int xerrno = errno;
1815 
1816     pr_trace_msg(trace_channel, 2,
1817       "error incrementing item by %0.3f in sorted set using key '%s': %s",
1818       incr, key, strerror(xerrno));
1819 
1820     errno = xerrno;
1821     return -1;
1822   }
1823 
1824   return res;
1825 }
1826 
pr_redis_sorted_set_remove(pr_redis_t * redis,module * m,const char * key)1827 int pr_redis_sorted_set_remove(pr_redis_t *redis, module *m, const char *key) {
1828   int res;
1829 
1830   if (key == NULL) {
1831     errno = EINVAL;
1832     return -1;
1833   }
1834 
1835   res = pr_redis_sorted_set_kremove(redis, m, key, strlen(key));
1836   if (res < 0) {
1837     int xerrno = errno;
1838 
1839     pr_trace_msg(trace_channel, 2,
1840       "error removing sorted set using key '%s': %s", key, strerror(xerrno));
1841 
1842     errno = xerrno;
1843     return -1;
1844   }
1845 
1846   return 0;
1847 }
1848 
pr_redis_sorted_set_score(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float * score)1849 int pr_redis_sorted_set_score(pr_redis_t *redis, module *m, const char *key,
1850     void *value, size_t valuesz, float *score) {
1851   int res;
1852 
1853   if (key == NULL) {
1854     errno = EINVAL;
1855     return -1;
1856   }
1857 
1858   res = pr_redis_sorted_set_kscore(redis, m, key, strlen(key), value, valuesz,
1859     score);
1860   if (res < 0) {
1861     int xerrno = errno;
1862 
1863     pr_trace_msg(trace_channel, 2,
1864       "error getting score for item in sorted set using key '%s': %s", key,
1865       strerror(xerrno));
1866 
1867     errno = xerrno;
1868     return -1;
1869   }
1870 
1871   return res;
1872 }
1873 
pr_redis_sorted_set_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)1874 int pr_redis_sorted_set_set(pr_redis_t *redis, module *m, const char *key,
1875     void *value, size_t valuesz, float score) {
1876   int res;
1877 
1878   if (key == NULL) {
1879     errno = EINVAL;
1880     return -1;
1881   }
1882 
1883   res = pr_redis_sorted_set_kset(redis, m, key, strlen(key), value, valuesz,
1884     score);
1885   if (res < 0) {
1886     int xerrno = errno;
1887 
1888     pr_trace_msg(trace_channel, 2,
1889       "error setting item to score %0.3f in sorted set using key '%s': %s",
1890       score, key, strerror(xerrno));
1891 
1892     errno = xerrno;
1893     return -1;
1894   }
1895 
1896   return 0;
1897 }
1898 
pr_redis_sorted_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs,array_header * scores)1899 int pr_redis_sorted_set_setall(pr_redis_t *redis, module *m, const char *key,
1900     array_header *values, array_header *valueszs, array_header *scores) {
1901   int res;
1902 
1903   if (key == NULL) {
1904     errno = EINVAL;
1905     return -1;
1906   }
1907 
1908   res = pr_redis_sorted_set_ksetall(redis, m, key, strlen(key), values,
1909     valueszs, scores);
1910   if (res < 0) {
1911     int xerrno = errno;
1912 
1913     pr_trace_msg(trace_channel, 2,
1914       "error setting items in sorted set using key '%s': %s", key,
1915       strerror(xerrno));
1916 
1917     errno = xerrno;
1918     return -1;
1919   }
1920 
1921   return 0;
1922 }
1923 
get_namespace_key(pool * p,pr_redis_t * redis,module * m,const char * key,size_t * keysz)1924 static const char *get_namespace_key(pool *p, pr_redis_t *redis, module *m,
1925     const char *key, size_t *keysz) {
1926 
1927   if (m != NULL &&
1928       redis->namespace_tab != NULL) {
1929     const char *prefix = NULL;
1930     size_t prefixsz = 0;
1931 
1932     prefix = pr_table_kget(redis->namespace_tab, m, sizeof(module *),
1933       &prefixsz);
1934     if (prefix != NULL) {
1935       char *new_key;
1936       size_t new_keysz;
1937 
1938       pr_trace_msg(trace_channel, 25,
1939         "using namespace prefix '%s' for module 'mod_%s.c'", prefix, m->name);
1940 
1941       /* Since the given key may not be text, we cannot simply use pstrcat()
1942        * to prepend our namespace value.
1943        */
1944       new_keysz = prefixsz + *keysz;
1945       new_key = palloc(p, new_keysz);
1946       memcpy(new_key, prefix, prefixsz);
1947       memcpy(new_key + prefixsz, key, *keysz);
1948 
1949       key = new_key;
1950       *keysz = new_keysz;
1951     }
1952   }
1953 
1954   return key;
1955 }
1956 
get_reply_type(int reply_type)1957 static const char *get_reply_type(int reply_type) {
1958   const char *type_name;
1959 
1960   switch (reply_type) {
1961     case REDIS_REPLY_STRING:
1962       type_name = "STRING";
1963       break;
1964 
1965     case REDIS_REPLY_ARRAY:
1966       type_name = "ARRAY";
1967       break;
1968 
1969     case REDIS_REPLY_INTEGER:
1970       type_name = "INTEGER";
1971       break;
1972 
1973     case REDIS_REPLY_NIL:
1974       type_name = "NIL";
1975       break;
1976 
1977     case REDIS_REPLY_STATUS:
1978       type_name = "STATUS";
1979       break;
1980 
1981     case REDIS_REPLY_ERROR:
1982       type_name = "ERROR";
1983       break;
1984 
1985     default:
1986       type_name = "unknown";
1987   }
1988 
1989   return type_name;
1990 }
1991 
pr_redis_command(pr_redis_t * redis,const array_header * args,int reply_type)1992 int pr_redis_command(pr_redis_t *redis, const array_header *args,
1993     int reply_type) {
1994   register unsigned int i;
1995   pool *tmp_pool = NULL;
1996   array_header *arglens;
1997   const char *cmd = NULL;
1998   redisReply *reply;
1999   int redis_reply_type;
2000 
2001   if (redis == NULL ||
2002       args == NULL ||
2003       args->nelts == 0) {
2004     errno = EINVAL;
2005     return -1;
2006   }
2007 
2008   switch (reply_type) {
2009     case PR_REDIS_REPLY_TYPE_STRING:
2010       redis_reply_type = REDIS_REPLY_STRING;
2011       break;
2012 
2013     case PR_REDIS_REPLY_TYPE_INTEGER:
2014       redis_reply_type = REDIS_REPLY_INTEGER;
2015       break;
2016 
2017     case PR_REDIS_REPLY_TYPE_NIL:
2018       redis_reply_type = REDIS_REPLY_NIL;
2019       break;
2020 
2021     case PR_REDIS_REPLY_TYPE_ARRAY:
2022       redis_reply_type = REDIS_REPLY_ARRAY;
2023       break;
2024 
2025     case PR_REDIS_REPLY_TYPE_STATUS:
2026       redis_reply_type = REDIS_REPLY_STATUS;
2027       break;
2028 
2029     case PR_REDIS_REPLY_TYPE_ERROR:
2030       redis_reply_type = REDIS_REPLY_ERROR;
2031       break;
2032 
2033     default:
2034       errno = EINVAL;
2035       return -1;
2036   }
2037 
2038   tmp_pool = make_sub_pool(redis->pool);
2039   pr_pool_tag(tmp_pool, "Redis Command pool");
2040 
2041   arglens = make_array(tmp_pool, args->nelts, sizeof(size_t));
2042   for (i = 0; i < args->nelts; i++) {
2043     pr_signals_handle();
2044     *((size_t *) push_array(arglens)) = strlen(((char **) args->elts)[i]);
2045   }
2046 
2047   cmd = ((char **) args->elts)[0];
2048   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2049   reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
2050   reply = handle_reply(redis, cmd, reply);
2051   if (reply == NULL) {
2052     destroy_pool(tmp_pool);
2053     errno = EIO;
2054     return -1;
2055   }
2056 
2057   if (reply->type != redis_reply_type) {
2058     pr_trace_msg(trace_channel, 2,
2059       "expected %s reply for %s, got %s", get_reply_type(redis_reply_type), cmd,
2060       get_reply_type(reply->type));
2061 
2062     if (reply->type == REDIS_REPLY_ERROR) {
2063       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2064     }
2065 
2066     freeReplyObject(reply);
2067     destroy_pool(tmp_pool);
2068     errno = EINVAL;
2069     return -1;
2070   }
2071 
2072   switch (reply->type) {
2073     case REDIS_REPLY_STRING:
2074     case REDIS_REPLY_STATUS:
2075     case REDIS_REPLY_ERROR:
2076       pr_trace_msg(trace_channel, 7, "%s %s reply: %.*s", cmd,
2077         get_reply_type(reply->type), (int) reply->len, reply->str);
2078       break;
2079 
2080     case REDIS_REPLY_INTEGER:
2081       pr_trace_msg(trace_channel, 7, "%s INTEGER reply: %lld", cmd,
2082         reply->integer);
2083       break;
2084 
2085     case REDIS_REPLY_NIL:
2086       pr_trace_msg(trace_channel, 7, "%s NIL reply", cmd);
2087       break;
2088 
2089     case REDIS_REPLY_ARRAY:
2090       pr_trace_msg(trace_channel, 7, "%s ARRAY reply: (%lu %s)", cmd,
2091         (unsigned long) reply->elements,
2092         reply->elements != 1 ? "elements" : "element");
2093       break;
2094 
2095     default:
2096       break;
2097   }
2098 
2099   freeReplyObject(reply);
2100   destroy_pool(tmp_pool);
2101   return 0;
2102 }
2103 
pr_redis_auth2(pr_redis_t * redis,const char * username,const char * password)2104 int pr_redis_auth2(pr_redis_t *redis, const char *username,
2105     const char *password) {
2106   const char *cmd;
2107   pool *tmp_pool;
2108   redisReply *reply;
2109 
2110   if (redis == NULL ||
2111       username == NULL ||
2112       password == NULL) {
2113     errno = EINVAL;
2114     return -1;
2115   }
2116 
2117   tmp_pool = make_sub_pool(redis->pool);
2118   pr_pool_tag(tmp_pool, "Redis AUTH pool");
2119 
2120   cmd = "AUTH";
2121   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2122 
2123   /* Redis 6.x changed the AUTH semantics, now requiring a username. */
2124   if (redis->major_version >= 6) {
2125     reply = redisCommand(redis->ctx, "%s %s %s", cmd, username, password);
2126 
2127   } else {
2128     reply = redisCommand(redis->ctx, "%s %s", cmd, password);
2129   }
2130 
2131   reply = handle_reply(redis, cmd, reply);
2132   if (reply == NULL) {
2133     pr_trace_msg(trace_channel, 2,
2134       "error authenticating client: %s", strerror(errno));
2135     destroy_pool(tmp_pool);
2136     errno = EIO;
2137     return -1;
2138   }
2139 
2140   if (reply->type != REDIS_REPLY_STRING &&
2141       reply->type != REDIS_REPLY_STATUS) {
2142     pr_trace_msg(trace_channel, 2,
2143       "expected STRING or STATUS reply for %s, got %s", cmd,
2144       get_reply_type(reply->type));
2145 
2146     if (reply->type == REDIS_REPLY_ERROR) {
2147       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2148     }
2149 
2150     freeReplyObject(reply);
2151     destroy_pool(tmp_pool);
2152     errno = EINVAL;
2153     return -1;
2154   }
2155 
2156   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2157     reply->str);
2158 
2159   freeReplyObject(reply);
2160   destroy_pool(tmp_pool);
2161   return 0;
2162 }
2163 
pr_redis_auth(pr_redis_t * redis,const char * password)2164 int pr_redis_auth(pr_redis_t *redis, const char *password) {
2165   return pr_redis_auth2(redis, "default", password);
2166 }
2167 
pr_redis_select(pr_redis_t * redis,const char * db_idx)2168 int pr_redis_select(pr_redis_t *redis, const char *db_idx) {
2169   const char *cmd;
2170   pool *tmp_pool;
2171   redisReply *reply;
2172 
2173   if (redis == NULL ||
2174       db_idx == NULL) {
2175     errno = EINVAL;
2176     return -1;
2177   }
2178 
2179   tmp_pool = make_sub_pool(redis->pool);
2180   pr_pool_tag(tmp_pool, "Redis SELECT pool");
2181 
2182   cmd = "SELECT";
2183   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2184   reply = redisCommand(redis->ctx, "%s %s", cmd, db_idx);
2185   reply = handle_reply(redis, cmd, reply);
2186   if (reply == NULL) {
2187     pr_trace_msg(trace_channel, 2,
2188       "error selecting database '%s': %s", db_idx, strerror(errno));
2189     destroy_pool(tmp_pool);
2190     errno = EIO;
2191     return -1;
2192   }
2193 
2194   if (reply->type != REDIS_REPLY_STRING &&
2195       reply->type != REDIS_REPLY_STATUS) {
2196     pr_trace_msg(trace_channel, 2,
2197       "expected STRING or STATUS reply for %s, got %s", cmd,
2198       get_reply_type(reply->type));
2199 
2200     if (reply->type == REDIS_REPLY_ERROR) {
2201       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2202     }
2203 
2204     freeReplyObject(reply);
2205     destroy_pool(tmp_pool);
2206     errno = EINVAL;
2207     return -1;
2208   }
2209 
2210   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2211     reply->str);
2212 
2213   freeReplyObject(reply);
2214   destroy_pool(tmp_pool);
2215   return 0;
2216 }
2217 
pr_redis_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)2218 int pr_redis_kadd(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2219     void *value, size_t valuesz, time_t expires) {
2220   return pr_redis_kset(redis, m, key, keysz, value, valuesz, expires);
2221 }
2222 
pr_redis_kdecr(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint32_t decr,uint64_t * value)2223 int pr_redis_kdecr(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2224     uint32_t decr, uint64_t *value) {
2225   pool *tmp_pool = NULL;
2226   const char *cmd = NULL;
2227   redisReply *reply;
2228 
2229   if (redis == NULL ||
2230       m == NULL ||
2231       key == NULL ||
2232       decr == 0) {
2233     errno = EINVAL;
2234     return -1;
2235   }
2236 
2237   tmp_pool = make_sub_pool(redis->pool);
2238   pr_pool_tag(tmp_pool, "Redis DECRBY pool");
2239 
2240   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2241 
2242   cmd = "DECRBY";
2243   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2244   reply = redisCommand(redis->ctx, "%s %b %lu", cmd, key, keysz,
2245     (unsigned long) decr);
2246   reply = handle_reply(redis, cmd, reply);
2247   if (reply == NULL) {
2248     pr_trace_msg(trace_channel, 2,
2249       "error decrementing key (%lu bytes) by %lu using %s: %s",
2250       (unsigned long) keysz, (unsigned long) decr, cmd, strerror(errno));
2251     destroy_pool(tmp_pool);
2252     errno = EIO;
2253     return -1;
2254   }
2255 
2256   if (reply->type != REDIS_REPLY_INTEGER) {
2257     pr_trace_msg(trace_channel, 2,
2258       "expected INTEGER reply for %s, got %s", cmd,
2259       get_reply_type(reply->type));
2260 
2261     if (reply->type == REDIS_REPLY_ERROR) {
2262       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2263     }
2264 
2265     freeReplyObject(reply);
2266     destroy_pool(tmp_pool);
2267     errno = EINVAL;
2268     return -1;
2269   }
2270 
2271   /* Note: DECRBY will automatically set the key value to zero if it does
2272    * not already exist.  To detect a nonexistent key, then, we look to
2273    * see if the return value is exactly our requested decrement.  If so,
2274    * REMOVE the auto-created key, and return ENOENT.
2275    */
2276   if ((decr * -1) == (uint32_t) reply->integer) {
2277     freeReplyObject(reply);
2278     destroy_pool(tmp_pool);
2279     (void) pr_redis_kremove(redis, m, key, keysz);
2280     errno = ENOENT;
2281     return -1;
2282   }
2283 
2284   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2285 
2286   if (value != NULL) {
2287     *value = (uint64_t) reply->integer;
2288   }
2289 
2290   freeReplyObject(reply);
2291   destroy_pool(tmp_pool);
2292   return 0;
2293 }
2294 
pr_redis_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,size_t * valuesz)2295 void *pr_redis_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
2296     size_t keysz, size_t *valuesz) {
2297   const char *cmd;
2298   pool *tmp_pool;
2299   redisReply *reply;
2300   char *data = NULL;
2301 
2302   if (p == NULL ||
2303       redis == NULL ||
2304       m == NULL ||
2305       key == NULL ||
2306       valuesz == NULL) {
2307     errno = EINVAL;
2308     return NULL;
2309   }
2310 
2311   tmp_pool = make_sub_pool(redis->pool);
2312   pr_pool_tag(tmp_pool, "Redis GET pool");
2313 
2314   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2315 
2316   cmd = "GET";
2317   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2318   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2319   reply = handle_reply(redis, cmd, reply);
2320   if (reply == NULL) {
2321     pr_trace_msg(trace_channel, 2,
2322       "error getting data for key (%lu bytes) using %s: %s",
2323       (unsigned long) keysz, cmd, strerror(errno));
2324     destroy_pool(tmp_pool);
2325     errno = EIO;
2326     return NULL;
2327   }
2328 
2329   if (reply->type == REDIS_REPLY_NIL) {
2330     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
2331     freeReplyObject(reply);
2332     destroy_pool(tmp_pool);
2333     errno = ENOENT;
2334     return NULL;
2335   }
2336 
2337   if (reply->type != REDIS_REPLY_STRING) {
2338     pr_trace_msg(trace_channel, 2,
2339       "expected STRING reply for %s, got %s", cmd, get_reply_type(reply->type));
2340     freeReplyObject(reply);
2341     destroy_pool(tmp_pool);
2342     errno = EINVAL;
2343     return NULL;
2344   }
2345 
2346   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2347     reply->str);
2348 
2349   if (valuesz != NULL) {
2350     *valuesz = (uint64_t) reply->len;
2351   }
2352 
2353   data = palloc(p, reply->len);
2354   memcpy(data, reply->str, reply->len);
2355 
2356   freeReplyObject(reply);
2357   destroy_pool(tmp_pool);
2358   return data;
2359 }
2360 
pr_redis_kget_str(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz)2361 char *pr_redis_kget_str(pool *p, pr_redis_t *redis, module *m, const char *key,
2362     size_t keysz) {
2363   const char *cmd;
2364   pool *tmp_pool;
2365   redisReply *reply;
2366   char *data = NULL;
2367 
2368   if (p == NULL ||
2369       redis == NULL ||
2370       m == NULL ||
2371       key == NULL) {
2372     errno = EINVAL;
2373     return NULL;
2374   }
2375 
2376   tmp_pool = make_sub_pool(redis->pool);
2377   pr_pool_tag(tmp_pool, "Redis GET pool");
2378 
2379   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2380 
2381   cmd = "GET";
2382   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2383   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2384   reply = handle_reply(redis, cmd, reply);
2385   if (reply == NULL) {
2386     pr_trace_msg(trace_channel, 2,
2387       "error getting data for key (%lu bytes) using %s: %s",
2388       (unsigned long) keysz, cmd, strerror(errno));
2389     destroy_pool(tmp_pool);
2390     errno = EIO;
2391     return NULL;
2392   }
2393 
2394   if (reply->type == REDIS_REPLY_NIL) {
2395     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
2396     freeReplyObject(reply);
2397     destroy_pool(tmp_pool);
2398     errno = ENOENT;
2399     return NULL;
2400   }
2401 
2402   if (reply->type != REDIS_REPLY_STRING) {
2403     pr_trace_msg(trace_channel, 2,
2404       "expected STRING reply for %s, got %s", cmd, get_reply_type(reply->type));
2405     freeReplyObject(reply);
2406     destroy_pool(tmp_pool);
2407     errno = EINVAL;
2408     return NULL;
2409   }
2410 
2411   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2412     reply->str);
2413 
2414   data = pstrndup(p, reply->str, reply->len);
2415 
2416   freeReplyObject(reply);
2417   destroy_pool(tmp_pool);
2418   return data;
2419 }
2420 
pr_redis_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint32_t incr,uint64_t * value)2421 int pr_redis_kincr(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2422     uint32_t incr, uint64_t *value) {
2423   pool *tmp_pool = NULL;
2424   const char *cmd = NULL;
2425   redisReply *reply;
2426 
2427   if (redis == NULL ||
2428       m == NULL ||
2429       key == NULL ||
2430       incr == 0) {
2431     errno = EINVAL;
2432     return -1;
2433   }
2434 
2435   tmp_pool = make_sub_pool(redis->pool);
2436   pr_pool_tag(tmp_pool, "Redis INCRBY pool");
2437 
2438   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2439 
2440   cmd = "INCRBY";
2441   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2442   reply = redisCommand(redis->ctx, "%s %b %lu", cmd, key, keysz,
2443     (unsigned long) incr);
2444   reply = handle_reply(redis, cmd, reply);
2445   if (reply == NULL) {
2446     pr_trace_msg(trace_channel, 2,
2447       "error incrementing key (%lu bytes) by %lu using %s: %s",
2448       (unsigned long) keysz, (unsigned long) incr, cmd, strerror(errno));
2449     destroy_pool(tmp_pool);
2450     errno = EIO;
2451     return -1;
2452   }
2453 
2454   if (reply->type != REDIS_REPLY_INTEGER) {
2455     pr_trace_msg(trace_channel, 2,
2456       "expected INTEGER reply for %s, got %s", cmd,
2457       get_reply_type(reply->type));
2458 
2459     if (reply->type == REDIS_REPLY_ERROR) {
2460       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2461     }
2462 
2463     freeReplyObject(reply);
2464     destroy_pool(tmp_pool);
2465     errno = EINVAL;
2466     return -1;
2467   }
2468 
2469   /* Note: INCRBY will automatically set the key value to zero if it does
2470    * not already exist.  To detect a nonexistent key, then, we look to
2471    * see if the return value is exactly our requested increment.  If so,
2472    * REMOVE the auto-created key, and return ENOENT.
2473    */
2474   if (incr == (uint32_t) reply->integer) {
2475     freeReplyObject(reply);
2476     destroy_pool(tmp_pool);
2477     (void) pr_redis_kremove(redis, m, key, keysz);
2478     errno = ENOENT;
2479     return -1;
2480   }
2481 
2482   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2483 
2484   if (value != NULL) {
2485     *value = (uint64_t) reply->integer;
2486   }
2487 
2488   freeReplyObject(reply);
2489   destroy_pool(tmp_pool);
2490   return 0;
2491 }
2492 
pr_redis_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)2493 int pr_redis_kremove(pr_redis_t *redis, module *m, const char *key,
2494     size_t keysz) {
2495   int xerrno = 0;
2496   pool *tmp_pool = NULL;
2497   const char *cmd = NULL;
2498   redisReply *reply;
2499   long long count;
2500 
2501   if (redis == NULL ||
2502       m == NULL ||
2503       key == NULL) {
2504     errno = EINVAL;
2505     return -1;
2506   }
2507 
2508   tmp_pool = make_sub_pool(redis->pool);
2509   pr_pool_tag(tmp_pool, "Redis DEL pool");
2510 
2511   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2512 
2513   cmd = "DEL";
2514   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2515   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2516   xerrno = errno;
2517 
2518   reply = handle_reply(redis, cmd, reply);
2519   if (reply == NULL) {
2520     pr_trace_msg(trace_channel, 2,
2521       "error removing key (%lu bytes): %s", (unsigned long) keysz,
2522       strerror(errno));
2523     destroy_pool(tmp_pool);
2524     errno = xerrno;
2525     return -1;
2526   }
2527 
2528   if (reply->type != REDIS_REPLY_INTEGER) {
2529     pr_trace_msg(trace_channel, 2,
2530       "expected INTEGER reply for %s, got %s", cmd,
2531       get_reply_type(reply->type));
2532     freeReplyObject(reply);
2533     destroy_pool(tmp_pool);
2534     errno = EINVAL;
2535     return -1;
2536   }
2537 
2538   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2539   count = reply->integer;
2540 
2541   freeReplyObject(reply);
2542   destroy_pool(tmp_pool);
2543 
2544   if (count == 0) {
2545     /* No keys removed. */
2546     errno = ENOENT;
2547     return -1;
2548   }
2549 
2550   return 0;
2551 }
2552 
pr_redis_krename(pr_redis_t * redis,module * m,const char * from,size_t fromsz,const char * to,size_t tosz)2553 int pr_redis_krename(pr_redis_t *redis, module *m, const char *from,
2554     size_t fromsz, const char *to, size_t tosz) {
2555   int xerrno = 0;
2556   pool *tmp_pool = NULL;
2557   const char *cmd = NULL;
2558   redisReply *reply;
2559 
2560   if (redis == NULL ||
2561       m == NULL ||
2562       from == NULL ||
2563       fromsz == 0 ||
2564       to == NULL ||
2565       tosz == 0) {
2566     errno = EINVAL;
2567     return -1;
2568   }
2569 
2570   tmp_pool = make_sub_pool(redis->pool);
2571   pr_pool_tag(tmp_pool, "Redis RENAME pool");
2572 
2573   from = get_namespace_key(tmp_pool, redis, m, from, &fromsz);
2574   to = get_namespace_key(tmp_pool, redis, m, to, &tosz);
2575 
2576   cmd = "RENAME";
2577   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2578   reply = redisCommand(redis->ctx, "%s %b %b", cmd, from, fromsz, to, tosz);
2579   xerrno = errno;
2580 
2581   reply = handle_reply(redis, cmd, reply);
2582   if (reply == NULL) {
2583     pr_trace_msg(trace_channel, 2,
2584       "error renaming key (from %lu bytes, to %lu bytes): %s",
2585       (unsigned long) fromsz, (unsigned long) tosz, strerror(errno));
2586     destroy_pool(tmp_pool);
2587     errno = xerrno;
2588     return -1;
2589   }
2590 
2591   if (reply->type != REDIS_REPLY_STRING &&
2592       reply->type != REDIS_REPLY_STATUS) {
2593     xerrno = EINVAL;
2594 
2595     pr_trace_msg(trace_channel, 2,
2596       "expected STRING or STATUS reply for %s, got %s", cmd,
2597       get_reply_type(reply->type));
2598 
2599     if (reply->type == REDIS_REPLY_ERROR) {
2600       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2601 
2602       /* Note: In order to provide ENOENT semantics here, we have to be
2603        * naughty, and assume the contents of this error message.
2604        */
2605       if (strstr(reply->str, "no such key") != NULL) {
2606         xerrno = ENOENT;
2607       }
2608     }
2609 
2610     freeReplyObject(reply);
2611     destroy_pool(tmp_pool);
2612     errno = xerrno;
2613     return -1;
2614   }
2615 
2616   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2617     reply->str);
2618 
2619   freeReplyObject(reply);
2620   destroy_pool(tmp_pool);
2621   return 0;
2622 }
2623 
pr_redis_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)2624 int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2625     void *value, size_t valuesz, time_t expires) {
2626   pool *tmp_pool = NULL;
2627   const char *cmd = NULL;
2628   redisReply *reply;
2629 
2630   /* XXX Should we allow null values to be added, thus allowing use of keys
2631    * as sentinels?
2632    */
2633   if (redis == NULL ||
2634       m == NULL ||
2635       key == NULL ||
2636       value == NULL) {
2637     errno = EINVAL;
2638     return -1;
2639   }
2640 
2641   tmp_pool = make_sub_pool(redis->pool);
2642   pr_pool_tag(tmp_pool, "Redis SET pool");
2643 
2644   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2645 
2646   if (expires > 0) {
2647     cmd = "SETEX";
2648     pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2649     reply = redisCommand(redis->ctx, "%s %b %lu %b", cmd, key, keysz,
2650       (unsigned long) expires, value, valuesz);
2651 
2652   } else {
2653     cmd = "SET";
2654     pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2655     reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value,
2656       valuesz);
2657   }
2658 
2659   reply = handle_reply(redis, cmd, reply);
2660   if (reply == NULL) {
2661     pr_trace_msg(trace_channel, 2,
2662       "error adding key (%lu bytes), value (%lu bytes) using %s: %s",
2663       (unsigned long) keysz, (unsigned long) valuesz, cmd, strerror(errno));
2664     destroy_pool(tmp_pool);
2665     errno = EIO;
2666     return -1;
2667   }
2668 
2669   pr_trace_msg(trace_channel, 7, "%s reply: %s", cmd, reply->str);
2670   freeReplyObject(reply);
2671   destroy_pool(tmp_pool);
2672   return 0;
2673 }
2674 
pr_redis_hash_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)2675 int pr_redis_hash_kcount(pr_redis_t *redis, module *m, const char *key,
2676     size_t keysz, uint64_t *count) {
2677   int xerrno = 0;
2678   pool *tmp_pool = NULL;
2679   const char *cmd = NULL;
2680   redisReply *reply;
2681 
2682   if (redis == NULL ||
2683       m == NULL ||
2684       key == NULL ||
2685       keysz == 0 ||
2686       count == NULL) {
2687     errno = EINVAL;
2688     return -1;
2689   }
2690 
2691   tmp_pool = make_sub_pool(redis->pool);
2692   pr_pool_tag(tmp_pool, "Redis HLEN pool");
2693 
2694   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2695 
2696   cmd = "HLEN";
2697   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2698   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2699   xerrno = errno;
2700 
2701   reply = handle_reply(redis, cmd, reply);
2702   if (reply == NULL) {
2703     pr_trace_msg(trace_channel, 2,
2704       "error getting count of hash using key (%lu bytes): %s",
2705       (unsigned long) keysz, strerror(errno));
2706     destroy_pool(tmp_pool);
2707     errno = xerrno;
2708     return -1;
2709   }
2710 
2711   if (reply->type != REDIS_REPLY_INTEGER) {
2712     pr_trace_msg(trace_channel, 2,
2713       "expected INTEGER reply for %s, got %s", cmd,
2714       get_reply_type(reply->type));
2715 
2716     if (reply->type == REDIS_REPLY_ERROR) {
2717       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2718     }
2719 
2720     freeReplyObject(reply);
2721     destroy_pool(tmp_pool);
2722     errno = EINVAL;
2723     return -1;
2724   }
2725 
2726   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2727   *count = reply->integer;
2728 
2729   freeReplyObject(reply);
2730   destroy_pool(tmp_pool);
2731   return 0;
2732 }
2733 
pr_redis_hash_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)2734 int pr_redis_hash_kdelete(pr_redis_t *redis, module *m, const char *key,
2735     size_t keysz, const char *field, size_t fieldsz) {
2736   int xerrno = 0, exists = FALSE;
2737   pool *tmp_pool = NULL;
2738   const char *cmd = NULL;
2739   redisReply *reply;
2740 
2741   if (redis == NULL ||
2742       m == NULL ||
2743       key == NULL ||
2744       keysz == 0 ||
2745       field == NULL ||
2746       fieldsz == 0) {
2747     errno = EINVAL;
2748     return -1;
2749   }
2750 
2751   tmp_pool = make_sub_pool(redis->pool);
2752   pr_pool_tag(tmp_pool, "Redis HDEL pool");
2753 
2754   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2755 
2756   cmd = "HDEL";
2757   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2758   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, field, fieldsz);
2759   xerrno = errno;
2760 
2761   reply = handle_reply(redis, cmd, reply);
2762   if (reply == NULL) {
2763     pr_trace_msg(trace_channel, 2,
2764       "error getting count of hash using key (%lu bytes): %s",
2765       (unsigned long) keysz, strerror(errno));
2766     destroy_pool(tmp_pool);
2767     errno = xerrno;
2768     return -1;
2769   }
2770 
2771   if (reply->type != REDIS_REPLY_INTEGER) {
2772     pr_trace_msg(trace_channel, 2,
2773       "expected INTEGER reply for %s, got %s", cmd,
2774       get_reply_type(reply->type));
2775 
2776     if (reply->type == REDIS_REPLY_ERROR) {
2777       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2778     }
2779 
2780     freeReplyObject(reply);
2781     destroy_pool(tmp_pool);
2782     errno = EINVAL;
2783     return -1;
2784   }
2785 
2786   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2787   exists = reply->integer ? TRUE : FALSE;
2788 
2789   freeReplyObject(reply);
2790   destroy_pool(tmp_pool);
2791 
2792   if (exists == FALSE) {
2793     errno = ENOENT;
2794     return -1;
2795   }
2796 
2797   return 0;
2798 }
2799 
pr_redis_hash_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)2800 int pr_redis_hash_kexists(pr_redis_t *redis, module *m, const char *key,
2801     size_t keysz, const char *field, size_t fieldsz) {
2802   int xerrno = 0, exists = FALSE;
2803   pool *tmp_pool = NULL;
2804   const char *cmd = NULL;
2805   redisReply *reply;
2806 
2807   if (redis == NULL ||
2808       m == NULL ||
2809       key == NULL ||
2810       keysz == 0 ||
2811       field == NULL ||
2812       fieldsz == 0) {
2813     errno = EINVAL;
2814     return -1;
2815   }
2816 
2817   tmp_pool = make_sub_pool(redis->pool);
2818   pr_pool_tag(tmp_pool, "Redis HEXISTS pool");
2819 
2820   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2821 
2822   cmd = "HEXISTS";
2823   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2824   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, field, fieldsz);
2825   xerrno = errno;
2826 
2827   reply = handle_reply(redis, cmd, reply);
2828   if (reply == NULL) {
2829     pr_trace_msg(trace_channel, 2,
2830       "error getting count of hash using key (%lu bytes): %s",
2831       (unsigned long) keysz, strerror(errno));
2832     destroy_pool(tmp_pool);
2833     errno = xerrno;
2834     return -1;
2835   }
2836 
2837   if (reply->type != REDIS_REPLY_INTEGER) {
2838     pr_trace_msg(trace_channel, 2,
2839       "expected INTEGER reply for %s, got %s", cmd,
2840       get_reply_type(reply->type));
2841 
2842     if (reply->type == REDIS_REPLY_ERROR) {
2843       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2844     }
2845 
2846     freeReplyObject(reply);
2847     destroy_pool(tmp_pool);
2848     errno = EINVAL;
2849     return -1;
2850   }
2851 
2852   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2853   exists = reply->integer ? TRUE : FALSE;
2854 
2855   freeReplyObject(reply);
2856   destroy_pool(tmp_pool);
2857   return exists;
2858 }
2859 
pr_redis_hash_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void ** value,size_t * valuesz)2860 int pr_redis_hash_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
2861     size_t keysz, const char *field, size_t fieldsz, void **value,
2862     size_t *valuesz) {
2863   int xerrno = 0, exists = FALSE;
2864   pool *tmp_pool = NULL;
2865   const char *cmd = NULL;
2866   redisReply *reply;
2867 
2868   if (p == NULL ||
2869       redis == NULL ||
2870       m == NULL ||
2871       key == NULL ||
2872       keysz == 0 ||
2873       field == NULL ||
2874       fieldsz == 0 ||
2875       value == NULL) {
2876     errno = EINVAL;
2877     return -1;
2878   }
2879 
2880   tmp_pool = make_sub_pool(redis->pool);
2881   pr_pool_tag(tmp_pool, "Redis HGET pool");
2882 
2883   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2884 
2885   cmd = "HGET";
2886   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2887   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, field, fieldsz);
2888   xerrno = errno;
2889 
2890   reply = handle_reply(redis, cmd, reply);
2891   if (reply == NULL) {
2892     pr_trace_msg(trace_channel, 2,
2893       "error getting item for field in hash using key (%lu bytes): %s",
2894       (unsigned long) keysz, strerror(errno));
2895     destroy_pool(tmp_pool);
2896     errno = xerrno;
2897     return -1;
2898   }
2899 
2900   if (reply->type != REDIS_REPLY_STRING &&
2901       reply->type != REDIS_REPLY_NIL) {
2902     pr_trace_msg(trace_channel, 2,
2903       "expected STRING or NIL reply for %s, got %s", cmd,
2904       get_reply_type(reply->type));
2905 
2906     if (reply->type == REDIS_REPLY_ERROR) {
2907       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2908     }
2909 
2910     freeReplyObject(reply);
2911     destroy_pool(tmp_pool);
2912     errno = EINVAL;
2913     return -1;
2914   }
2915 
2916   if (reply->type == REDIS_REPLY_STRING) {
2917     pr_trace_msg(trace_channel, 7, "%s reply: (%lu bytes)", cmd,
2918       (unsigned long) reply->len);
2919 
2920     *value = palloc(p, reply->len);
2921     memcpy(*value, reply->str, reply->len);
2922 
2923     if (valuesz != NULL) {
2924       *valuesz = reply->len;
2925     }
2926 
2927     exists = TRUE;
2928 
2929   } else {
2930     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
2931   }
2932 
2933   freeReplyObject(reply);
2934   destroy_pool(tmp_pool);
2935 
2936   if (exists == FALSE) {
2937     errno = ENOENT;
2938     return -1;
2939   }
2940 
2941   return 0;
2942 }
2943 
pr_redis_hash_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t ** hash)2944 int pr_redis_hash_kgetall(pool *p, pr_redis_t *redis, module *m,
2945     const char *key, size_t keysz, pr_table_t **hash) {
2946   int res, xerrno = 0;
2947   pool *tmp_pool = NULL;
2948   const char *cmd = NULL;
2949   redisReply *reply;
2950 
2951   if (p == NULL ||
2952       redis == NULL ||
2953       m == NULL ||
2954       key == NULL ||
2955       keysz == 0 ||
2956       hash == NULL) {
2957     errno = EINVAL;
2958     return -1;
2959   }
2960 
2961   tmp_pool = make_sub_pool(redis->pool);
2962   pr_pool_tag(tmp_pool, "Redis HGETALL pool");
2963 
2964   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2965 
2966   cmd = "HGETALL";
2967   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2968   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2969   xerrno = errno;
2970 
2971   reply = handle_reply(redis, cmd, reply);
2972   if (reply == NULL) {
2973     pr_trace_msg(trace_channel, 2,
2974       "error getting hash using key (%lu bytes): %s",
2975       (unsigned long) keysz, strerror(errno));
2976     destroy_pool(tmp_pool);
2977     errno = xerrno;
2978     return -1;
2979   }
2980 
2981   if (reply->type != REDIS_REPLY_ARRAY) {
2982     pr_trace_msg(trace_channel, 2,
2983       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
2984 
2985     if (reply->type == REDIS_REPLY_ERROR) {
2986       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2987     }
2988 
2989     freeReplyObject(reply);
2990     destroy_pool(tmp_pool);
2991     errno = EINVAL;
2992     return -1;
2993   }
2994 
2995   if (reply->elements > 0) {
2996     register unsigned int i;
2997 
2998     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
2999       (unsigned long) reply->elements,
3000       reply->elements != 1 ? "elements" : "element");
3001 
3002     *hash = pr_table_alloc(p, 0);
3003 
3004     for (i = 0; i < reply->elements; i += 2) {
3005       redisReply *key_elt, *value_elt;
3006       void *key_data = NULL, *value_data = NULL;
3007       size_t key_datasz = 0, value_datasz = 0;
3008 
3009       key_elt = reply->element[i];
3010       if (key_elt->type == REDIS_REPLY_STRING) {
3011         key_datasz = key_elt->len;
3012         key_data = palloc(p, key_datasz);
3013         memcpy(key_data, key_elt->str, key_datasz);
3014 
3015       } else {
3016         pr_trace_msg(trace_channel, 2,
3017           "expected STRING element at index %u, got %s", i,
3018           get_reply_type(key_elt->type));
3019       }
3020 
3021       value_elt = reply->element[i+1];
3022       if (value_elt->type == REDIS_REPLY_STRING) {
3023         value_datasz = value_elt->len;
3024         value_data = palloc(p, value_datasz);
3025         memcpy(value_data, value_elt->str, value_datasz);
3026 
3027       } else {
3028         pr_trace_msg(trace_channel, 2,
3029           "expected STRING element at index %u, got %s", i + 2,
3030           get_reply_type(value_elt->type));
3031       }
3032 
3033       if (key_data != NULL &&
3034           value_data != NULL) {
3035         if (pr_table_kadd(*hash, key_data, key_datasz, value_data,
3036             value_datasz) < 0) {
3037           pr_trace_msg(trace_channel, 2,
3038             "error adding key (%lu bytes), value (%lu bytes) to hash: %s",
3039             (unsigned long) key_datasz, (unsigned long) value_datasz,
3040             strerror(errno));
3041         }
3042       }
3043     }
3044 
3045     res = 0;
3046 
3047   } else {
3048     xerrno = ENOENT;
3049     res = -1;
3050   }
3051 
3052   freeReplyObject(reply);
3053   destroy_pool(tmp_pool);
3054 
3055   errno = xerrno;
3056   return res;
3057 }
3058 
pr_redis_hash_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,int32_t incr,int64_t * value)3059 int pr_redis_hash_kincr(pr_redis_t *redis, module *m, const char *key,
3060     size_t keysz, const char *field, size_t fieldsz, int32_t incr,
3061     int64_t *value) {
3062   int xerrno = 0, exists = FALSE;
3063   pool *tmp_pool = NULL;
3064   const char *cmd = NULL;
3065   redisReply *reply;
3066 
3067   if (redis == NULL ||
3068       m == NULL ||
3069       key == NULL ||
3070       keysz == 0 ||
3071       field == NULL ||
3072       fieldsz == 0) {
3073     errno = EINVAL;
3074     return -1;
3075   }
3076 
3077   exists = pr_redis_hash_kexists(redis, m, key, keysz, field, fieldsz);
3078   if (exists == FALSE) {
3079     errno = ENOENT;
3080     return -1;
3081   }
3082 
3083   tmp_pool = make_sub_pool(redis->pool);
3084   pr_pool_tag(tmp_pool, "Redis HINCRBY pool");
3085 
3086   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3087 
3088   cmd = "HINCRBY";
3089   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3090   reply = redisCommand(redis->ctx, "%s %b %b %d", cmd, key, keysz, field,
3091     fieldsz, incr);
3092   xerrno = errno;
3093 
3094   reply = handle_reply(redis, cmd, reply);
3095   if (reply == NULL) {
3096     pr_trace_msg(trace_channel, 2,
3097       "error incrementing field in hash using key (%lu bytes): %s",
3098       (unsigned long) keysz, strerror(errno));
3099     destroy_pool(tmp_pool);
3100     errno = xerrno;
3101     return -1;
3102   }
3103 
3104   if (reply->type != REDIS_REPLY_INTEGER) {
3105     pr_trace_msg(trace_channel, 2,
3106       "expected INTEGER reply for %s, got %s", cmd,
3107       get_reply_type(reply->type));
3108 
3109     if (reply->type == REDIS_REPLY_ERROR) {
3110       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3111     }
3112 
3113     freeReplyObject(reply);
3114     destroy_pool(tmp_pool);
3115     errno = EINVAL;
3116     return -1;
3117   }
3118 
3119   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3120   if (value != NULL) {
3121     *value = (int64_t) reply->integer;
3122   }
3123 
3124   freeReplyObject(reply);
3125   destroy_pool(tmp_pool);
3126   return 0;
3127 }
3128 
hash_scan(pool * p,pr_redis_t * redis,const char * key,size_t keysz,array_header * fields,char ** cursor,int count)3129 static int hash_scan(pool *p, pr_redis_t *redis, const char *key,
3130     size_t keysz, array_header *fields, char **cursor, int count) {
3131   int res = 0, xerrno = 0;
3132   const char *cmd = NULL;
3133   redisReply *reply;
3134 
3135   cmd = "HSCAN";
3136   pr_trace_msg(trace_channel, 7, "sending command: %s %s", cmd, *cursor);
3137   reply = redisCommand(redis->ctx, "%s %b %s COUNT %d", cmd, key, keysz,
3138     *cursor, count);
3139   xerrno = errno;
3140 
3141   reply = handle_reply(redis, cmd, reply);
3142   if (reply == NULL) {
3143     pr_trace_msg(trace_channel, 2,
3144       "error getting fields of hash using key (%lu bytes), cursor '%s': %s",
3145       (unsigned long) keysz, *cursor, strerror(errno));
3146     errno = xerrno;
3147     return -1;
3148   }
3149 
3150   if (reply->type != REDIS_REPLY_ARRAY) {
3151     pr_trace_msg(trace_channel, 2,
3152       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
3153 
3154     if (reply->type == REDIS_REPLY_ERROR) {
3155       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3156     }
3157 
3158     freeReplyObject(reply);
3159     errno = EINVAL;
3160     return -1;
3161   }
3162 
3163   if (reply->elements == 2) {
3164     redisReply *elt;
3165 
3166     elt = reply->element[0];
3167     if (elt->type == REDIS_REPLY_STRING) {
3168       *cursor = pstrndup(p, elt->str, elt->len);
3169 
3170     } else {
3171       pr_trace_msg(trace_channel, 2,
3172         "expected STRING element at index 0, got %s",
3173         get_reply_type(elt->type));
3174 
3175       xerrno = EINVAL;
3176       res = -1;
3177     }
3178 
3179     if (res == 0) {
3180       elt = reply->element[1];
3181       if (elt->type == REDIS_REPLY_ARRAY) {
3182         register unsigned int i;
3183 
3184         pr_trace_msg(trace_channel, 7, "%s reply: %s %lu %s", cmd, *cursor,
3185           (unsigned long) elt->elements,
3186           elt->elements != 1 ? "elements" : "element");
3187 
3188         /* When using HSCAN, we iterate over ALL the fields of the hash,
3189          * key AND value.  Thus to get just the keys, we need every other
3190          * item.
3191          */
3192         for (i = 1; i < elt->elements; i += 2) {
3193           redisReply *item;
3194 
3195           item = elt->element[i];
3196           if (item->type == REDIS_REPLY_STRING) {
3197             char *field;
3198 
3199             field = pstrndup(p, item->str, item->len);
3200             *((char **) push_array(fields)) = field;
3201 
3202           } else {
3203             pr_trace_msg(trace_channel, 2,
3204               "expected STRING element at index %u, got %s", i,
3205               get_reply_type(elt->type));
3206           }
3207         }
3208 
3209         if (strcmp(*cursor, "0") == 0) {
3210           /* Set the cursor to NULL, to indicate to the caller the end
3211            * of the iteration.
3212            */
3213           *cursor = NULL;
3214         }
3215 
3216       } else {
3217         pr_trace_msg(trace_channel, 2,
3218           "expected ARRAY element at index 1, got %s",
3219           get_reply_type(elt->type));
3220 
3221         xerrno = EINVAL;
3222         res = -1;
3223       }
3224     }
3225 
3226   } else {
3227     xerrno = ENOENT;
3228     res = -1;
3229   }
3230 
3231   freeReplyObject(reply);
3232   errno = xerrno;
3233   return res;
3234 }
3235 
pr_redis_hash_kkeys(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** fields)3236 int pr_redis_hash_kkeys(pool *p, pr_redis_t *redis, module *m, const char *key,
3237     size_t keysz, array_header **fields) {
3238   int res = 0, xerrno = 0;
3239   pool *tmp_pool = NULL;
3240   char *cursor;
3241 
3242   if (p == NULL ||
3243       redis == NULL ||
3244       m == NULL ||
3245       key == NULL ||
3246       keysz == 0 ||
3247       fields == NULL) {
3248     errno = EINVAL;
3249     return -1;
3250   }
3251 
3252   tmp_pool = make_sub_pool(redis->pool);
3253   pr_pool_tag(tmp_pool, "Redis HSCAN pool");
3254 
3255   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3256 
3257   cursor = "0";
3258   res = 0;
3259   *fields = make_array(p, 0, sizeof(char *));
3260 
3261   while (res == 0 &&
3262          cursor != NULL) {
3263     pr_signals_handle();
3264 
3265     res = hash_scan(tmp_pool, redis, key, keysz, *fields, &cursor,
3266       PR_REDIS_SCAN_SIZE);
3267     xerrno = errno;
3268 
3269     if (res < 0) {
3270       destroy_pool(tmp_pool);
3271 
3272       errno = xerrno;
3273       return -1;
3274     }
3275   }
3276 
3277   if ((*fields)->nelts == 0) {
3278     *fields = NULL;
3279     xerrno = ENOENT;
3280     res = -1;
3281   }
3282 
3283   destroy_pool(tmp_pool);
3284   errno = xerrno;
3285   return res;
3286 }
3287 
pr_redis_hash_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)3288 int pr_redis_hash_kremove(pr_redis_t *redis, module *m, const char *key,
3289     size_t keysz) {
3290 
3291   /* Note: We can actually use just DEL here. */
3292   return pr_redis_kremove(redis, m, key, keysz);
3293 }
3294 
pr_redis_hash_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void * value,size_t valuesz)3295 int pr_redis_hash_kset(pr_redis_t *redis, module *m, const char *key,
3296     size_t keysz, const char *field, size_t fieldsz, void *value,
3297     size_t valuesz) {
3298   int xerrno = 0;
3299   pool *tmp_pool = NULL;
3300   const char *cmd = NULL;
3301   redisReply *reply;
3302 
3303   if (redis == NULL ||
3304       m == NULL ||
3305       key == NULL ||
3306       keysz == 0 ||
3307       field == NULL ||
3308       fieldsz == 0 ||
3309       value == NULL ||
3310       valuesz == 0) {
3311     errno = EINVAL;
3312     return -1;
3313   }
3314 
3315   tmp_pool = make_sub_pool(redis->pool);
3316   pr_pool_tag(tmp_pool, "Redis HSET pool");
3317 
3318   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3319 
3320   cmd = "HSET";
3321   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3322   reply = redisCommand(redis->ctx, "%s %b %b %b", cmd, key, keysz, field,
3323     fieldsz, value, valuesz);
3324   xerrno = errno;
3325 
3326   reply = handle_reply(redis, cmd, reply);
3327   if (reply == NULL) {
3328     pr_trace_msg(trace_channel, 2,
3329       "error setting item for field in hash using key (%lu bytes): %s",
3330       (unsigned long) keysz, strerror(errno));
3331     destroy_pool(tmp_pool);
3332     errno = xerrno;
3333     return -1;
3334   }
3335 
3336   if (reply->type != REDIS_REPLY_INTEGER) {
3337     pr_trace_msg(trace_channel, 2,
3338       "expected INTEGER reply for %s, got %s", cmd,
3339       get_reply_type(reply->type));
3340 
3341     if (reply->type == REDIS_REPLY_ERROR) {
3342       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3343     }
3344 
3345     freeReplyObject(reply);
3346     destroy_pool(tmp_pool);
3347     errno = EINVAL;
3348     return -1;
3349   }
3350 
3351   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3352 
3353   freeReplyObject(reply);
3354   destroy_pool(tmp_pool);
3355   return 0;
3356 }
3357 
pr_redis_hash_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t * hash)3358 int pr_redis_hash_ksetall(pr_redis_t *redis, module *m, const char *key,
3359     size_t keysz, pr_table_t *hash) {
3360   int count, xerrno = 0;
3361   pool *tmp_pool = NULL;
3362   const char *cmd = NULL;
3363   array_header *args, *arglens;
3364   redisReply *reply;
3365   const void *key_data;
3366   size_t key_datasz;
3367 
3368   if (redis == NULL ||
3369       m == NULL ||
3370       key == NULL ||
3371       keysz == 0 ||
3372       hash == NULL) {
3373     errno = EINVAL;
3374     return -1;
3375   }
3376 
3377   /* Skip any empty hashes. */
3378   count = pr_table_count(hash);
3379   if (count <= 0) {
3380     pr_trace_msg(trace_channel, 9, "skipping empty table");
3381     errno = EINVAL;
3382     return -1;
3383   }
3384 
3385   tmp_pool = make_sub_pool(redis->pool);
3386   pr_pool_tag(tmp_pool, "Redis HMSET pool");
3387 
3388   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3389 
3390   cmd = "HMSET";
3391   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3392 
3393   args = make_array(tmp_pool, count + 1, sizeof(char *));
3394   arglens = make_array(tmp_pool, count + 1, sizeof(size_t));
3395 
3396   *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
3397   *((size_t *) push_array(arglens)) = strlen(cmd);
3398 
3399   *((char **) push_array(args)) = (char *) key;
3400   *((size_t *) push_array(arglens)) = keysz;
3401 
3402   pr_table_rewind(hash);
3403   key_data = pr_table_knext(hash, &key_datasz);
3404   while (key_data != NULL) {
3405     const void *value_data;
3406     size_t value_datasz;
3407 
3408     pr_signals_handle();
3409 
3410     value_data = pr_table_kget(hash, key_data, key_datasz, &value_datasz);
3411     if (value_data != NULL) {
3412       char *key_dup, *value_dup;
3413 
3414       key_dup = palloc(tmp_pool, key_datasz);
3415       memcpy(key_dup, key_data, key_datasz);
3416       *((char **) push_array(args)) = key_dup;
3417       *((size_t *) push_array(arglens)) = key_datasz;
3418 
3419       value_dup = palloc(tmp_pool, value_datasz);
3420       memcpy(value_dup, value_data, value_datasz);
3421       *((char **) push_array(args)) = value_dup;
3422       *((size_t *) push_array(arglens)) = value_datasz;
3423     }
3424 
3425     key_data = pr_table_knext(hash, &key_datasz);
3426   }
3427 
3428   reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
3429   xerrno = errno;
3430 
3431   reply = handle_reply(redis, cmd, reply);
3432   if (reply == NULL) {
3433     pr_trace_msg(trace_channel, 2,
3434       "error setting hash using key (%lu bytes): %s",
3435       (unsigned long) keysz, strerror(errno));
3436     destroy_pool(tmp_pool);
3437     errno = xerrno;
3438     return -1;
3439   }
3440 
3441   if (reply->type != REDIS_REPLY_STRING &&
3442       reply->type != REDIS_REPLY_STATUS) {
3443     pr_trace_msg(trace_channel, 2,
3444       "expected STRING or STATUS reply for %s, got %s", cmd,
3445       get_reply_type(reply->type));
3446 
3447     if (reply->type == REDIS_REPLY_ERROR) {
3448       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3449     }
3450 
3451     freeReplyObject(reply);
3452     destroy_pool(tmp_pool);
3453     errno = EINVAL;
3454     return -1;
3455   }
3456 
3457   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
3458     reply->str);
3459 
3460   freeReplyObject(reply);
3461   destroy_pool(tmp_pool);
3462   return 0;
3463 }
3464 
pr_redis_hash_kvalues(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values)3465 int pr_redis_hash_kvalues(pool *p, pr_redis_t *redis, module *m,
3466     const char *key, size_t keysz, array_header **values) {
3467   int res, xerrno = 0;
3468   pool *tmp_pool = NULL;
3469   const char *cmd = NULL;
3470   redisReply *reply;
3471 
3472   if (p == NULL ||
3473       redis == NULL ||
3474       m == NULL ||
3475       key == NULL ||
3476       keysz == 0 ||
3477       values == NULL) {
3478     errno = EINVAL;
3479     return -1;
3480   }
3481 
3482   tmp_pool = make_sub_pool(redis->pool);
3483   pr_pool_tag(tmp_pool, "Redis HVALS pool");
3484 
3485   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3486 
3487   cmd = "HVALS";
3488   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3489   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
3490   xerrno = errno;
3491 
3492   reply = handle_reply(redis, cmd, reply);
3493   if (reply == NULL) {
3494     pr_trace_msg(trace_channel, 2,
3495       "error getting values of hash using key (%lu bytes): %s",
3496       (unsigned long) keysz, strerror(errno));
3497     destroy_pool(tmp_pool);
3498     errno = xerrno;
3499     return -1;
3500   }
3501 
3502   if (reply->type != REDIS_REPLY_ARRAY) {
3503     pr_trace_msg(trace_channel, 2,
3504       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
3505 
3506     if (reply->type == REDIS_REPLY_ERROR) {
3507       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3508     }
3509 
3510     freeReplyObject(reply);
3511     destroy_pool(tmp_pool);
3512     errno = EINVAL;
3513     return -1;
3514   }
3515 
3516   if (reply->elements > 0) {
3517     register unsigned int i;
3518 
3519     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
3520       (unsigned long) reply->elements,
3521       reply->elements != 1 ? "elements" : "element");
3522 
3523     *values = make_array(p, reply->elements, sizeof(char *));
3524     for (i = 0; i < reply->elements; i++) {
3525       redisReply *elt;
3526 
3527       elt = reply->element[i];
3528       if (elt->type == REDIS_REPLY_STRING) {
3529         char *value;
3530 
3531         value = pcalloc(p, reply->len + 1);
3532         memcpy(value, reply->str, reply->len);
3533         *((char **) push_array(*values)) = value;
3534 
3535       } else {
3536         pr_trace_msg(trace_channel, 2,
3537           "expected STRING element at index %u, got %s", i,
3538           get_reply_type(elt->type));
3539       }
3540     }
3541 
3542     res = 0;
3543 
3544   } else {
3545     xerrno = ENOENT;
3546     res = -1;
3547   }
3548 
3549   freeReplyObject(reply);
3550   destroy_pool(tmp_pool);
3551 
3552   errno = xerrno;
3553   return res;
3554 }
3555 
pr_redis_list_kappend(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)3556 int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
3557     size_t keysz, void *value, size_t valuesz) {
3558   return pr_redis_list_kpush(redis, m, key, keysz, value, valuesz,
3559     PR_REDIS_LIST_FL_RIGHT);
3560 }
3561 
pr_redis_list_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)3562 int pr_redis_list_kcount(pr_redis_t *redis, module *m, const char *key,
3563     size_t keysz, uint64_t *count) {
3564   int xerrno = 0;
3565   pool *tmp_pool = NULL;
3566   const char *cmd = NULL;
3567   redisReply *reply;
3568 
3569   if (redis == NULL ||
3570       m == NULL ||
3571       key == NULL ||
3572       keysz == 0 ||
3573       count == NULL) {
3574     errno = EINVAL;
3575     return -1;
3576   }
3577 
3578   tmp_pool = make_sub_pool(redis->pool);
3579   pr_pool_tag(tmp_pool, "Redis LLEN pool");
3580 
3581   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3582 
3583   cmd = "LLEN";
3584   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3585   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
3586   xerrno = errno;
3587 
3588   reply = handle_reply(redis, cmd, reply);
3589   if (reply == NULL) {
3590     pr_trace_msg(trace_channel, 2,
3591       "error getting count of list using key (%lu bytes): %s",
3592       (unsigned long) keysz, strerror(errno));
3593     destroy_pool(tmp_pool);
3594     errno = xerrno;
3595     return -1;
3596   }
3597 
3598   if (reply->type != REDIS_REPLY_INTEGER) {
3599     pr_trace_msg(trace_channel, 2,
3600       "expected INTEGER reply for %s, got %s", cmd,
3601       get_reply_type(reply->type));
3602 
3603     if (reply->type == REDIS_REPLY_ERROR) {
3604       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3605     }
3606 
3607     freeReplyObject(reply);
3608     destroy_pool(tmp_pool);
3609     errno = EINVAL;
3610     return -1;
3611   }
3612 
3613   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3614   *count = (uint64_t) reply->integer;
3615 
3616   freeReplyObject(reply);
3617   destroy_pool(tmp_pool);
3618   return 0;
3619 }
3620 
pr_redis_list_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)3621 int pr_redis_list_kdelete(pr_redis_t *redis, module *m, const char *key,
3622     size_t keysz, void *value, size_t valuesz) {
3623   int xerrno = 0;
3624   pool *tmp_pool = NULL;
3625   const char *cmd = NULL;
3626   redisReply *reply;
3627   long long count = 0;
3628 
3629   if (redis == NULL ||
3630       m == NULL ||
3631       key == NULL ||
3632       keysz == 0 ||
3633       value == NULL ||
3634       valuesz == 0) {
3635     errno = EINVAL;
3636     return -1;
3637   }
3638 
3639   tmp_pool = make_sub_pool(redis->pool);
3640   pr_pool_tag(tmp_pool, "Redis LREM pool");
3641 
3642   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3643 
3644   cmd = "LREM";
3645   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3646   reply = redisCommand(redis->ctx, "%s %b 0 %b", cmd, key, keysz, value,
3647     valuesz);
3648   xerrno = errno;
3649 
3650   reply = handle_reply(redis, cmd, reply);
3651   if (reply == NULL) {
3652     pr_trace_msg(trace_channel, 2,
3653       "error deleting item from set using key (%lu bytes): %s",
3654       (unsigned long) keysz, strerror(errno));
3655     destroy_pool(tmp_pool);
3656     errno = xerrno;
3657     return -1;
3658   }
3659 
3660   if (reply->type != REDIS_REPLY_INTEGER) {
3661     pr_trace_msg(trace_channel, 2,
3662       "expected INTEGER reply for %s, got %s", cmd,
3663       get_reply_type(reply->type));
3664 
3665     if (reply->type == REDIS_REPLY_ERROR) {
3666       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3667     }
3668 
3669     freeReplyObject(reply);
3670     destroy_pool(tmp_pool);
3671     errno = EINVAL;
3672     return -1;
3673   }
3674 
3675   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3676   count = reply->integer;
3677 
3678   freeReplyObject(reply);
3679   destroy_pool(tmp_pool);
3680 
3681   if (count == 0) {
3682     /* No items removed. */
3683     errno = ENOENT;
3684     return -1;
3685   }
3686 
3687   return 0;
3688 }
3689 
pr_redis_list_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx)3690 int pr_redis_list_kexists(pr_redis_t *redis, module *m, const char *key,
3691     size_t keysz, unsigned int idx) {
3692   pool *tmp_pool;
3693   int res, xerrno = 0;
3694   void *val = NULL;
3695   size_t valsz = 0;
3696 
3697   tmp_pool = make_sub_pool(NULL);
3698   res = pr_redis_list_kget(tmp_pool, redis, m, key, keysz, idx, &val, &valsz);
3699   xerrno = errno;
3700   destroy_pool(tmp_pool);
3701 
3702   if (res < 0) {
3703     if (xerrno != ENOENT) {
3704       errno = xerrno;
3705       return -1;
3706     }
3707 
3708     return FALSE;
3709   }
3710 
3711   return TRUE;
3712 }
3713 
pr_redis_list_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void ** value,size_t * valuesz)3714 int pr_redis_list_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
3715     size_t keysz, unsigned int idx, void **value, size_t *valuesz) {
3716   int res, xerrno = 0;
3717   pool *tmp_pool = NULL;
3718   const char *cmd = NULL;
3719   redisReply *reply;
3720   uint64_t count;
3721 
3722   if (p == NULL ||
3723       redis == NULL ||
3724       m == NULL ||
3725       key == NULL ||
3726       keysz == 0 ||
3727       value == NULL ||
3728       valuesz == NULL) {
3729     errno = EINVAL;
3730     return -1;
3731   }
3732 
3733   if (pr_redis_list_kcount(redis, m, key, keysz, &count) == 0) {
3734     if (count > 0 &&
3735         idx > 0 &&
3736         idx >= count) {
3737       pr_trace_msg(trace_channel, 14,
3738         "requested index %u exceeds list length %lu", idx,
3739         (unsigned long) count);
3740       errno = ERANGE;
3741       return -1;
3742     }
3743   }
3744 
3745   tmp_pool = make_sub_pool(redis->pool);
3746   pr_pool_tag(tmp_pool, "Redis LINDEX pool");
3747 
3748   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3749 
3750   cmd = "LINDEX";
3751   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3752   reply = redisCommand(redis->ctx, "%s %b %u", cmd, key, keysz, idx);
3753   xerrno = errno;
3754 
3755   reply = handle_reply(redis, cmd, reply);
3756   if (reply == NULL) {
3757     pr_trace_msg(trace_channel, 2,
3758       "error getting item at index %u of list using key (%lu bytes): %s", idx,
3759       (unsigned long) keysz, strerror(errno));
3760     destroy_pool(tmp_pool);
3761     errno = xerrno;
3762     return -1;
3763   }
3764 
3765   if (reply->type != REDIS_REPLY_STRING &&
3766       reply->type != REDIS_REPLY_NIL) {
3767     pr_trace_msg(trace_channel, 2,
3768       "expected STRING or NIL reply for %s, got %s", cmd,
3769       get_reply_type(reply->type));
3770 
3771     if (reply->type == REDIS_REPLY_ERROR) {
3772       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3773     }
3774 
3775     freeReplyObject(reply);
3776     destroy_pool(tmp_pool);
3777     errno = EINVAL;
3778     return -1;
3779   }
3780 
3781   if (reply->type == REDIS_REPLY_STRING) {
3782     pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
3783       reply->str);
3784     *valuesz = reply->len;
3785     *value = palloc(p, reply->len);
3786     memcpy(*value, reply->str, reply->len);
3787     res = 0;
3788 
3789   } else {
3790     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
3791     xerrno = ENOENT;
3792     res = -1;
3793   }
3794 
3795   freeReplyObject(reply);
3796   destroy_pool(tmp_pool);
3797 
3798   errno = xerrno;
3799   return res;
3800 }
3801 
list_scan(pool * p,pr_redis_t * redis,const char * key,size_t keysz,array_header * values,array_header * valueszs,int * cursor,int count)3802 static int list_scan(pool *p, pr_redis_t *redis, const char *key, size_t keysz,
3803     array_header *values, array_header *valueszs, int *cursor, int count) {
3804   int res = 0, xerrno = 0, range;
3805   const char *cmd = NULL;
3806   redisReply *reply;
3807 
3808   cmd = "LRANGE";
3809 
3810   /* Note: We use one less than the count to preserve [...) semantics of the
3811    * requested range, rather than Redis' [...] inclusive semantics.
3812    */
3813   range = *cursor + count - 1;
3814   pr_trace_msg(trace_channel, 7, "sending command: %s %d %d", cmd, *cursor,
3815     range);
3816   reply = redisCommand(redis->ctx, "%s %b %d %d", cmd, key, keysz,
3817     *cursor, range);
3818   xerrno = errno;
3819 
3820   reply = handle_reply(redis, cmd, reply);
3821   if (reply == NULL) {
3822     pr_trace_msg(trace_channel, 2,
3823       "error getting items in list using key (%lu bytes), cursor %d: %s",
3824       (unsigned long) keysz, *cursor, strerror(errno));
3825     errno = xerrno;
3826     return -1;
3827   }
3828 
3829   if (reply->type != REDIS_REPLY_ARRAY) {
3830     pr_trace_msg(trace_channel, 2,
3831       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
3832 
3833     if (reply->type == REDIS_REPLY_ERROR) {
3834       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3835     }
3836 
3837     freeReplyObject(reply);
3838     errno = EINVAL;
3839     return -1;
3840   }
3841 
3842   if (reply->elements > 0) {
3843     register unsigned int i;
3844 
3845     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
3846       (unsigned long) reply->elements,
3847       reply->elements != 1 ? "elements" : "element");
3848 
3849     for (i = 0; i < reply->elements; i++) {
3850       redisReply *value_elt;
3851       void *value_data = NULL;
3852       size_t value_datasz = 0;
3853 
3854       value_elt = reply->element[i];
3855       if (value_elt->type == REDIS_REPLY_STRING) {
3856         value_datasz = value_elt->len;
3857         value_data = palloc(p, value_datasz);
3858         memcpy(value_data, value_elt->str, value_datasz);
3859 
3860       } else {
3861         pr_trace_msg(trace_channel, 2,
3862           "expected STRING element at index %u, got %s", i+1,
3863           get_reply_type(value_elt->type));
3864       }
3865 
3866       if (value_data != NULL) {
3867         *((void **) push_array(values)) = value_data;
3868         *((size_t *) push_array(valueszs)) = value_datasz;
3869       }
3870     }
3871 
3872     if (reply->elements == 0) {
3873       /* Set the cursor to -1, to indicate to the caller the end of the
3874        * iteration.
3875        */
3876       *cursor = -1;
3877 
3878     } else {
3879       (*cursor) += reply->elements;
3880     }
3881 
3882     res = 0;
3883 
3884   } else {
3885     if (*cursor > 0) {
3886       /* If cursor is greater than zero, then we have found some elements,
3887        * and have reached the end of the iteration.
3888        */
3889       *cursor = -1;
3890       res = 0;
3891 
3892     } else {
3893       xerrno = ENOENT;
3894       res = -1;
3895     }
3896   }
3897 
3898   freeReplyObject(reply);
3899   errno = xerrno;
3900   return res;
3901 }
3902 
pr_redis_list_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)3903 int pr_redis_list_kgetall(pool *p, pr_redis_t *redis, module *m,
3904     const char *key, size_t keysz, array_header **values,
3905     array_header **valueszs) {
3906   int cursor, res = 0, xerrno = 0;
3907   pool *tmp_pool = NULL;
3908 
3909   if (p == NULL ||
3910       redis == NULL ||
3911       m == NULL ||
3912       key == NULL ||
3913       keysz == 0 ||
3914       values == NULL ||
3915       valueszs == NULL) {
3916     errno = EINVAL;
3917     return -1;
3918   }
3919 
3920   tmp_pool = make_sub_pool(redis->pool);
3921   pr_pool_tag(tmp_pool, "Redis LRANGE pool");
3922 
3923   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3924 
3925   cursor = 0;
3926   res = 0;
3927   *values = make_array(p, 0, sizeof(void *));
3928   *valueszs = make_array(p, 0, sizeof(size_t));
3929 
3930   while (res == 0 &&
3931          cursor != -1) {
3932     pr_signals_handle();
3933 
3934     res = list_scan(tmp_pool, redis, key, keysz, *values, *valueszs, &cursor,
3935       PR_REDIS_SCAN_SIZE);
3936     xerrno = errno;
3937 
3938     if (res < 0) {
3939       destroy_pool(tmp_pool);
3940 
3941       errno = xerrno;
3942       return -1;
3943     }
3944   }
3945 
3946   if ((*values)->nelts == 0) {
3947     *values = *valueszs = NULL;
3948     xerrno = ENOENT;
3949     res = -1;
3950   }
3951 
3952   destroy_pool(tmp_pool);
3953   errno = xerrno;
3954   return res;
3955 }
3956 
pr_redis_list_kpop(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz,int flags)3957 int pr_redis_list_kpop(pool *p, pr_redis_t *redis, module *m, const char *key,
3958     size_t keysz, void **value, size_t *valuesz, int flags) {
3959   int res, xerrno = 0;
3960   pool *tmp_pool = NULL;
3961   const char *cmd = NULL;
3962   redisReply *reply;
3963 
3964   if (p == NULL ||
3965       redis == NULL ||
3966       m == NULL ||
3967       key == NULL ||
3968       keysz == 0 ||
3969       value == NULL ||
3970       valuesz == NULL) {
3971     errno = EINVAL;
3972     return -1;
3973   }
3974 
3975   tmp_pool = make_sub_pool(redis->pool);
3976 
3977   switch (flags) {
3978     case PR_REDIS_LIST_FL_RIGHT:
3979       pr_pool_tag(tmp_pool, "Redis RPOP pool");
3980       cmd = "RPOP";
3981       break;
3982 
3983     case PR_REDIS_LIST_FL_LEFT:
3984       pr_pool_tag(tmp_pool, "Redis LPOP pool");
3985       cmd = "LPOP";
3986       break;
3987 
3988     default:
3989       destroy_pool(tmp_pool);
3990       errno = EINVAL;
3991       return -1;
3992   }
3993 
3994   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3995 
3996   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3997   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
3998   xerrno = errno;
3999 
4000   reply = handle_reply(redis, cmd, reply);
4001   if (reply == NULL) {
4002     pr_trace_msg(trace_channel, 2,
4003       "error popping item from list using key (%lu bytes): %s",
4004       (unsigned long) keysz, strerror(errno));
4005     destroy_pool(tmp_pool);
4006     errno = xerrno;
4007     return -1;
4008   }
4009 
4010   if (reply->type != REDIS_REPLY_STRING &&
4011       reply->type != REDIS_REPLY_NIL) {
4012     pr_trace_msg(trace_channel, 2,
4013       "expected STRING or NIL reply for %s, got %s", cmd,
4014       get_reply_type(reply->type));
4015 
4016     if (reply->type == REDIS_REPLY_ERROR) {
4017       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4018     }
4019 
4020     freeReplyObject(reply);
4021     destroy_pool(tmp_pool);
4022     errno = EINVAL;
4023     return -1;
4024   }
4025 
4026   if (reply->type == REDIS_REPLY_STRING) {
4027     pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
4028       reply->str);
4029     *valuesz = reply->len;
4030     *value = palloc(p, reply->len);
4031     memcpy(*value, reply->str, reply->len);
4032     res = 0;
4033 
4034   } else {
4035     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
4036     xerrno = ENOENT;
4037     res = -1;
4038   }
4039 
4040   freeReplyObject(reply);
4041   destroy_pool(tmp_pool);
4042 
4043   errno = xerrno;
4044   return res;
4045 }
4046 
pr_redis_list_kpush(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,int flags)4047 int pr_redis_list_kpush(pr_redis_t *redis, module *m, const char *key,
4048     size_t keysz, void *value, size_t valuesz, int flags) {
4049   int xerrno = 0;
4050   pool *tmp_pool = NULL;
4051   const char *cmd = NULL;
4052   redisReply *reply;
4053 
4054   if (redis == NULL ||
4055       m == NULL ||
4056       key == NULL ||
4057       keysz == 0 ||
4058       value == NULL ||
4059       valuesz == 0) {
4060     errno = EINVAL;
4061     return -1;
4062   }
4063 
4064   tmp_pool = make_sub_pool(redis->pool);
4065 
4066   switch (flags) {
4067     case PR_REDIS_LIST_FL_RIGHT:
4068       pr_pool_tag(tmp_pool, "Redis RPUSH pool");
4069       cmd = "RPUSH";
4070       break;
4071 
4072     case PR_REDIS_LIST_FL_LEFT:
4073       pr_pool_tag(tmp_pool, "Redis LPUSH pool");
4074       cmd = "LPUSH";
4075       break;
4076 
4077     default:
4078       destroy_pool(tmp_pool);
4079       errno = EINVAL;
4080       return -1;
4081   }
4082 
4083   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4084 
4085   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4086   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4087   xerrno = errno;
4088 
4089   reply = handle_reply(redis, cmd, reply);
4090   if (reply == NULL) {
4091     pr_trace_msg(trace_channel, 2,
4092       "error pushing to list using key (%lu bytes): %s",
4093       (unsigned long) keysz, strerror(errno));
4094     destroy_pool(tmp_pool);
4095     errno = xerrno;
4096     return -1;
4097   }
4098 
4099   if (reply->type != REDIS_REPLY_INTEGER) {
4100     pr_trace_msg(trace_channel, 2,
4101       "expected INTEGER reply for %s, got %s", cmd,
4102       get_reply_type(reply->type));
4103 
4104     if (reply->type == REDIS_REPLY_ERROR) {
4105       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4106     }
4107 
4108     freeReplyObject(reply);
4109     destroy_pool(tmp_pool);
4110     errno = EINVAL;
4111     return -1;
4112   }
4113 
4114   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4115 
4116   freeReplyObject(reply);
4117   destroy_pool(tmp_pool);
4118   return 0;
4119 }
4120 
pr_redis_list_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)4121 int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
4122     size_t keysz) {
4123 
4124   /* Note: We can actually use just DEL here. */
4125   return pr_redis_kremove(redis, m, key, keysz);
4126 }
4127 
pr_redis_list_krotate(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz)4128 int pr_redis_list_krotate(pool *p, pr_redis_t *redis, module *m,
4129     const char *key, size_t keysz, void **value, size_t *valuesz) {
4130   int res = 0, xerrno = 0;
4131   pool *tmp_pool = NULL;
4132   const char *cmd = NULL;
4133   redisReply *reply;
4134 
4135   if (p == NULL ||
4136       redis == NULL ||
4137       m == NULL ||
4138       key == NULL ||
4139       keysz == 0 ||
4140       value == NULL ||
4141       valuesz == NULL) {
4142     errno = EINVAL;
4143     return -1;
4144   }
4145 
4146   tmp_pool = make_sub_pool(redis->pool);
4147   pr_pool_tag(tmp_pool, "Redis RPOPLPUSH pool");
4148 
4149   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4150 
4151   cmd = "RPOPLPUSH";
4152   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4153   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, key, keysz);
4154   xerrno = errno;
4155 
4156   reply = handle_reply(redis, cmd, reply);
4157   if (reply == NULL) {
4158     pr_trace_msg(trace_channel, 2,
4159       "error rotating list using key (%lu bytes): %s", (unsigned long) keysz,
4160       strerror(errno));
4161     destroy_pool(tmp_pool);
4162     errno = xerrno;
4163     return -1;
4164   }
4165 
4166   if (reply->type != REDIS_REPLY_STRING &&
4167       reply->type != REDIS_REPLY_NIL) {
4168     pr_trace_msg(trace_channel, 2,
4169       "expected STRING or NIL reply for %s, got %s", cmd,
4170       get_reply_type(reply->type));
4171 
4172     if (reply->type == REDIS_REPLY_ERROR) {
4173       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4174     }
4175 
4176     freeReplyObject(reply);
4177     destroy_pool(tmp_pool);
4178     errno = EINVAL;
4179     return -1;
4180   }
4181 
4182   if (reply->type == REDIS_REPLY_STRING) {
4183     pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
4184       reply->str);
4185     *valuesz = reply->len;
4186     *value = palloc(p, reply->len);
4187     memcpy(*value, reply->str, reply->len);
4188     res = 0;
4189 
4190   } else {
4191     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
4192     xerrno = ENOENT;
4193     res = -1;
4194   }
4195 
4196   freeReplyObject(reply);
4197   destroy_pool(tmp_pool);
4198 
4199   errno = xerrno;
4200   return res;
4201 }
4202 
pr_redis_list_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void * value,size_t valuesz)4203 int pr_redis_list_kset(pr_redis_t *redis, module *m, const char *key,
4204     size_t keysz, unsigned int idx, void *value, size_t valuesz) {
4205   int xerrno = 0;
4206   pool *tmp_pool = NULL;
4207   const char *cmd = NULL;
4208   redisReply *reply;
4209 
4210   if (redis == NULL ||
4211       m == NULL ||
4212       key == NULL ||
4213       keysz == 0 ||
4214       value == NULL ||
4215       valuesz == 0) {
4216     errno = EINVAL;
4217     return -1;
4218   }
4219 
4220   tmp_pool = make_sub_pool(redis->pool);
4221   pr_pool_tag(tmp_pool, "Redis LSET pool");
4222 
4223   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4224 
4225   cmd = "LSET";
4226   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4227   reply = redisCommand(redis->ctx, "%s %b %u %b", cmd, key, keysz, idx, value,
4228     valuesz);
4229   xerrno = errno;
4230 
4231   reply = handle_reply(redis, cmd, reply);
4232   if (reply == NULL) {
4233     pr_trace_msg(trace_channel, 2,
4234       "error setting item at index %u in list using key (%lu bytes): %s", idx,
4235       (unsigned long) keysz, strerror(errno));
4236     destroy_pool(tmp_pool);
4237     errno = xerrno;
4238     return -1;
4239   }
4240 
4241   if (reply->type != REDIS_REPLY_STRING &&
4242       reply->type != REDIS_REPLY_STATUS) {
4243     pr_trace_msg(trace_channel, 2,
4244       "expected STRING or STATUS reply for %s, got %s", cmd,
4245       get_reply_type(reply->type));
4246 
4247     if (reply->type == REDIS_REPLY_ERROR) {
4248       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4249     }
4250 
4251     freeReplyObject(reply);
4252     destroy_pool(tmp_pool);
4253     errno = EINVAL;
4254     return -1;
4255   }
4256 
4257   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
4258     reply->str);
4259 
4260   freeReplyObject(reply);
4261   destroy_pool(tmp_pool);
4262   return 0;
4263 }
4264 
pr_redis_list_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)4265 int pr_redis_list_ksetall(pr_redis_t *redis, module *m, const char *key,
4266     size_t keysz, array_header *values, array_header *valueszs) {
4267   register unsigned int i;
4268   int res, xerrno = 0;
4269   pool *tmp_pool = NULL;
4270   array_header *args, *arglens;
4271   const char *cmd = NULL;
4272   redisReply *reply;
4273 
4274   if (redis == NULL ||
4275       m == NULL ||
4276       key == NULL ||
4277       keysz == 0 ||
4278       values == NULL ||
4279       values->nelts == 0 ||
4280       valueszs == NULL ||
4281       valueszs->nelts == 0 ||
4282       values->nelts != valueszs->nelts) {
4283     errno = EINVAL;
4284     return -1;
4285   }
4286 
4287   /* First, delete any existing list at this key; a set operation, in my mind,
4288    * is a complete overwrite.
4289    */
4290   res = pr_redis_list_kremove(redis, m, key, keysz);
4291   if (res < 0 &&
4292       errno != ENOENT) {
4293     return -1;
4294   }
4295 
4296   tmp_pool = make_sub_pool(redis->pool);
4297   pr_pool_tag(tmp_pool, "Redis RPUSH pool");
4298 
4299   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4300 
4301   cmd = "RPUSH";
4302   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4303 
4304   args = make_array(tmp_pool, 0, sizeof(char *));
4305   arglens = make_array(tmp_pool, 0, sizeof(size_t));
4306 
4307   *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
4308   *((size_t *) push_array(arglens)) = strlen(cmd);
4309 
4310   *((char **) push_array(args)) = (char *) key;
4311   *((size_t *) push_array(arglens)) = keysz;
4312 
4313   for (i = 0; i < values->nelts; i++) {
4314     pr_signals_handle();
4315 
4316     *((char **) push_array(args)) = ((char **) values->elts)[i];
4317     *((size_t *) push_array(arglens)) = ((size_t *) valueszs->elts)[i];
4318   }
4319 
4320   reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
4321   xerrno = errno;
4322 
4323   reply = handle_reply(redis, cmd, reply);
4324   if (reply == NULL) {
4325     pr_trace_msg(trace_channel, 2,
4326       "error setting items in list using key (%lu bytes): %s",
4327       (unsigned long) keysz, strerror(errno));
4328     destroy_pool(tmp_pool);
4329     errno = xerrno;
4330     return -1;
4331   }
4332 
4333   if (reply->type != REDIS_REPLY_INTEGER) {
4334     pr_trace_msg(trace_channel, 2,
4335       "expected INTEGER reply for %s, got %s", cmd,
4336       get_reply_type(reply->type));
4337 
4338     if (reply->type == REDIS_REPLY_ERROR) {
4339       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4340     }
4341     freeReplyObject(reply);
4342     destroy_pool(tmp_pool);
4343     errno = EINVAL;
4344     return -1;
4345   }
4346 
4347   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4348 
4349   freeReplyObject(reply);
4350   destroy_pool(tmp_pool);
4351   return 0;
4352 }
4353 
pr_redis_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4354 int pr_redis_set_kadd(pr_redis_t *redis, module *m, const char *key,
4355     size_t keysz, void *value, size_t valuesz) {
4356   int xerrno = 0, exists = FALSE;
4357   pool *tmp_pool = NULL;
4358   const char *cmd = NULL;
4359   redisReply *reply;
4360 
4361   if (redis == NULL ||
4362       m == NULL ||
4363       key == NULL ||
4364       keysz == 0 ||
4365       value == NULL ||
4366       valuesz == 0) {
4367     errno = EINVAL;
4368     return -1;
4369   }
4370 
4371   exists = pr_redis_set_kexists(redis, m, key, keysz, value, valuesz);
4372   if (exists == TRUE) {
4373     errno = EEXIST;
4374     return -1;
4375   }
4376 
4377   tmp_pool = make_sub_pool(redis->pool);
4378   pr_pool_tag(tmp_pool, "Redis SADD pool");
4379 
4380   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4381 
4382   cmd = "SADD";
4383   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4384   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4385   xerrno = errno;
4386 
4387   reply = handle_reply(redis, cmd, reply);
4388   if (reply == NULL) {
4389     pr_trace_msg(trace_channel, 2,
4390       "error adding to set using key (%lu bytes): %s",
4391       (unsigned long) keysz, strerror(errno));
4392     destroy_pool(tmp_pool);
4393     errno = xerrno;
4394     return -1;
4395   }
4396 
4397   if (reply->type != REDIS_REPLY_INTEGER) {
4398     pr_trace_msg(trace_channel, 2,
4399       "expected INTEGER reply for %s, got %s", cmd,
4400       get_reply_type(reply->type));
4401 
4402     if (reply->type == REDIS_REPLY_ERROR) {
4403       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4404     }
4405 
4406     freeReplyObject(reply);
4407     destroy_pool(tmp_pool);
4408     errno = EINVAL;
4409     return -1;
4410   }
4411 
4412   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4413 
4414   freeReplyObject(reply);
4415   destroy_pool(tmp_pool);
4416   return 0;
4417 }
4418 
pr_redis_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)4419 int pr_redis_set_kcount(pr_redis_t *redis, module *m, const char *key,
4420     size_t keysz, uint64_t *count) {
4421   int xerrno = 0;
4422   pool *tmp_pool = NULL;
4423   const char *cmd = NULL;
4424   redisReply *reply;
4425 
4426   if (redis == NULL ||
4427       m == NULL ||
4428       key == NULL ||
4429       keysz == 0 ||
4430       count == NULL) {
4431     errno = EINVAL;
4432     return -1;
4433   }
4434 
4435   tmp_pool = make_sub_pool(redis->pool);
4436   pr_pool_tag(tmp_pool, "Redis SCARD pool");
4437 
4438   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4439 
4440   cmd = "SCARD";
4441   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4442   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
4443   xerrno = errno;
4444 
4445   reply = handle_reply(redis, cmd, reply);
4446   if (reply == NULL) {
4447     pr_trace_msg(trace_channel, 2,
4448       "error getting count of set using key (%lu bytes): %s",
4449       (unsigned long) keysz, strerror(errno));
4450     destroy_pool(tmp_pool);
4451     errno = xerrno;
4452     return -1;
4453   }
4454 
4455   if (reply->type != REDIS_REPLY_INTEGER) {
4456     pr_trace_msg(trace_channel, 2,
4457       "expected INTEGER reply for %s, got %s", cmd,
4458       get_reply_type(reply->type));
4459 
4460     if (reply->type == REDIS_REPLY_ERROR) {
4461       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4462     }
4463 
4464     freeReplyObject(reply);
4465     destroy_pool(tmp_pool);
4466     errno = EINVAL;
4467     return -1;
4468   }
4469 
4470   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4471   *count = (uint64_t) reply->integer;
4472 
4473   freeReplyObject(reply);
4474   destroy_pool(tmp_pool);
4475   return 0;
4476 }
4477 
pr_redis_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4478 int pr_redis_set_kdelete(pr_redis_t *redis, module *m, const char *key,
4479     size_t keysz, void *value, size_t valuesz) {
4480   int xerrno = 0;
4481   pool *tmp_pool = NULL;
4482   const char *cmd = NULL;
4483   redisReply *reply;
4484   long long count = 0;
4485 
4486   if (redis == NULL ||
4487       m == NULL ||
4488       key == NULL ||
4489       keysz == 0 ||
4490       value == NULL ||
4491       valuesz == 0) {
4492     errno = EINVAL;
4493     return -1;
4494   }
4495 
4496   tmp_pool = make_sub_pool(redis->pool);
4497   pr_pool_tag(tmp_pool, "Redis SREM pool");
4498 
4499   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4500 
4501   cmd = "SREM";
4502   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4503   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4504   xerrno = errno;
4505 
4506   reply = handle_reply(redis, cmd, reply);
4507   if (reply == NULL) {
4508     pr_trace_msg(trace_channel, 2,
4509       "error deleting item from set using key (%lu bytes): %s",
4510       (unsigned long) keysz, strerror(errno));
4511     destroy_pool(tmp_pool);
4512     errno = xerrno;
4513     return -1;
4514   }
4515 
4516   if (reply->type != REDIS_REPLY_INTEGER) {
4517     pr_trace_msg(trace_channel, 2,
4518       "expected INTEGER reply for %s, got %s", cmd,
4519       get_reply_type(reply->type));
4520 
4521     if (reply->type == REDIS_REPLY_ERROR) {
4522       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4523     }
4524 
4525     freeReplyObject(reply);
4526     destroy_pool(tmp_pool);
4527     errno = EINVAL;
4528     return -1;
4529   }
4530 
4531   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4532   count = reply->integer;
4533 
4534   freeReplyObject(reply);
4535   destroy_pool(tmp_pool);
4536 
4537   if (count == 0) {
4538     /* No items removed. */
4539     errno = ENOENT;
4540     return -1;
4541   }
4542 
4543   return 0;
4544 }
4545 
pr_redis_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4546 int pr_redis_set_kexists(pr_redis_t *redis, module *m, const char *key,
4547     size_t keysz, void *value, size_t valuesz) {
4548   int xerrno = 0, exists = FALSE;
4549   pool *tmp_pool = NULL;
4550   const char *cmd = NULL;
4551   redisReply *reply;
4552 
4553   if (redis == NULL ||
4554       m == NULL ||
4555       key == NULL ||
4556       keysz == 0 ||
4557       value == NULL ||
4558       valuesz == 0) {
4559     errno = EINVAL;
4560     return -1;
4561   }
4562 
4563   tmp_pool = make_sub_pool(redis->pool);
4564   pr_pool_tag(tmp_pool, "Redis SISMEMBER pool");
4565 
4566   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4567 
4568   cmd = "SISMEMBER";
4569   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4570   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4571   xerrno = errno;
4572 
4573   reply = handle_reply(redis, cmd, reply);
4574   if (reply == NULL) {
4575     pr_trace_msg(trace_channel, 2,
4576       "error checking item in set using key (%lu bytes): %s",
4577       (unsigned long) keysz, strerror(errno));
4578     destroy_pool(tmp_pool);
4579     errno = xerrno;
4580     return -1;
4581   }
4582 
4583   if (reply->type != REDIS_REPLY_INTEGER) {
4584     pr_trace_msg(trace_channel, 2,
4585       "expected INTEGER reply for %s, got %s", cmd,
4586       get_reply_type(reply->type));
4587 
4588     if (reply->type == REDIS_REPLY_ERROR) {
4589       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4590     }
4591 
4592     freeReplyObject(reply);
4593     destroy_pool(tmp_pool);
4594     errno = EINVAL;
4595     return -1;
4596   }
4597 
4598   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4599   exists = reply->integer ? TRUE : FALSE;
4600 
4601   freeReplyObject(reply);
4602   destroy_pool(tmp_pool);
4603   return exists;
4604 }
4605 
pr_redis_set_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)4606 int pr_redis_set_kgetall(pool *p, pr_redis_t *redis, module *m, const char *key,
4607     size_t keysz, array_header **values, array_header **valueszs) {
4608   int res = 0, xerrno = 0;
4609   pool *tmp_pool = NULL;
4610   const char *cmd = NULL;
4611   redisReply *reply;
4612 
4613   if (p == NULL ||
4614       redis == NULL ||
4615       m == NULL ||
4616       key == NULL ||
4617       keysz == 0 ||
4618       values == NULL ||
4619       valueszs == NULL) {
4620     errno = EINVAL;
4621     return -1;
4622   }
4623 
4624   tmp_pool = make_sub_pool(redis->pool);
4625   pr_pool_tag(tmp_pool, "Redis SMEMBERS pool");
4626 
4627   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4628 
4629   cmd = "SMEMBERS";
4630   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4631   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
4632   xerrno = errno;
4633 
4634   reply = handle_reply(redis, cmd, reply);
4635   if (reply == NULL) {
4636     pr_trace_msg(trace_channel, 2,
4637       "error getting items in set using key (%lu bytes): %s",
4638       (unsigned long) keysz, strerror(errno));
4639     destroy_pool(tmp_pool);
4640     errno = xerrno;
4641     return -1;
4642   }
4643 
4644   if (reply->type != REDIS_REPLY_ARRAY) {
4645     pr_trace_msg(trace_channel, 2,
4646       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
4647 
4648     if (reply->type == REDIS_REPLY_ERROR) {
4649       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4650     }
4651 
4652     freeReplyObject(reply);
4653     destroy_pool(tmp_pool);
4654     errno = EINVAL;
4655     return -1;
4656   }
4657 
4658   if (reply->elements > 0) {
4659     register unsigned int i;
4660 
4661     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
4662       (unsigned long) reply->elements,
4663       reply->elements != 1 ? "elements" : "element");
4664 
4665     *values = make_array(p, 0, sizeof(void *));
4666     *valueszs = make_array(p, 0, sizeof(size_t));
4667 
4668     for (i = 0; i < reply->elements; i++) {
4669       redisReply *value_elt;
4670       void *value_data = NULL;
4671       size_t value_datasz = 0;
4672 
4673       value_elt = reply->element[i];
4674       if (value_elt->type == REDIS_REPLY_STRING) {
4675         value_datasz = value_elt->len;
4676         value_data = palloc(p, value_datasz);
4677         memcpy(value_data, value_elt->str, value_datasz);
4678 
4679       } else {
4680         pr_trace_msg(trace_channel, 2,
4681           "expected STRING element at index %u, got %s", i + 1,
4682           get_reply_type(value_elt->type));
4683       }
4684 
4685       if (value_data != NULL) {
4686         *((void **) push_array(*values)) = value_data;
4687         *((size_t *) push_array(*valueszs)) = value_datasz;
4688       }
4689     }
4690 
4691     res = 0;
4692 
4693   } else {
4694     xerrno = ENOENT;
4695     res = -1;
4696   }
4697 
4698   freeReplyObject(reply);
4699   destroy_pool(tmp_pool);
4700 
4701   errno = xerrno;
4702   return res;
4703 }
4704 
pr_redis_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)4705 int pr_redis_set_kremove(pr_redis_t *redis, module *m, const char *key,
4706     size_t keysz) {
4707 
4708   /* Note: We can actually use just DEL here. */
4709   return pr_redis_kremove(redis, m, key, keysz);
4710 }
4711 
pr_redis_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)4712 int pr_redis_set_ksetall(pr_redis_t *redis, module *m, const char *key,
4713     size_t keysz, array_header *values, array_header *valueszs) {
4714   register unsigned int i;
4715   int res, xerrno = 0;
4716   pool *tmp_pool = NULL;
4717   array_header *args, *arglens;
4718   const char *cmd = NULL;
4719   redisReply *reply;
4720 
4721   if (redis == NULL ||
4722       m == NULL ||
4723       key == NULL ||
4724       keysz == 0 ||
4725       values == NULL ||
4726       values->nelts == 0 ||
4727       valueszs == NULL ||
4728       valueszs->nelts == 0 ||
4729       values->nelts != valueszs->nelts) {
4730     errno = EINVAL;
4731     return -1;
4732   }
4733 
4734   /* First, delete any existing set at this key; a set operation, in my mind,
4735    * is a complete overwrite.
4736    */
4737   res = pr_redis_set_kremove(redis, m, key, keysz);
4738   if (res < 0 &&
4739       errno != ENOENT) {
4740     return -1;
4741   }
4742 
4743   tmp_pool = make_sub_pool(redis->pool);
4744   pr_pool_tag(tmp_pool, "Redis SADD pool");
4745 
4746   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4747 
4748   cmd = "SADD";
4749   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4750 
4751   args = make_array(tmp_pool, 0, sizeof(char *));
4752   arglens = make_array(tmp_pool, 0, sizeof(size_t));
4753 
4754   *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
4755   *((size_t *) push_array(arglens)) = strlen(cmd);
4756 
4757   *((char **) push_array(args)) = (char *) key;
4758   *((size_t *) push_array(arglens)) = keysz;
4759 
4760   for (i = 0; i < values->nelts; i++) {
4761     pr_signals_handle();
4762 
4763     *((char **) push_array(args)) = ((char **) values->elts)[i];
4764     *((size_t *) push_array(arglens)) = ((size_t *) valueszs->elts)[i];
4765   }
4766 
4767   reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
4768   xerrno = errno;
4769 
4770   reply = handle_reply(redis, cmd, reply);
4771   if (reply == NULL) {
4772     pr_trace_msg(trace_channel, 2,
4773       "error setting items in set using key (%lu bytes): %s",
4774       (unsigned long) keysz, strerror(errno));
4775     destroy_pool(tmp_pool);
4776     errno = xerrno;
4777     return -1;
4778   }
4779 
4780   if (reply->type != REDIS_REPLY_INTEGER) {
4781     pr_trace_msg(trace_channel, 2,
4782       "expected INTEGER reply for %s, got %s", cmd,
4783       get_reply_type(reply->type));
4784 
4785     if (reply->type == REDIS_REPLY_ERROR) {
4786       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4787     }
4788     freeReplyObject(reply);
4789     destroy_pool(tmp_pool);
4790     errno = EINVAL;
4791     return -1;
4792   }
4793 
4794   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4795 
4796   freeReplyObject(reply);
4797   destroy_pool(tmp_pool);
4798   return 0;
4799 }
4800 
pr_redis_sorted_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)4801 int pr_redis_sorted_set_kadd(pr_redis_t *redis, module *m, const char *key,
4802     size_t keysz, void *value, size_t valuesz, float score) {
4803   int xerrno = 0, exists = FALSE;
4804   pool *tmp_pool = NULL;
4805   const char *cmd = NULL;
4806   redisReply *reply;
4807 
4808   if (redis == NULL ||
4809       m == NULL ||
4810       key == NULL ||
4811       keysz == 0 ||
4812       value == NULL ||
4813       valuesz == 0) {
4814     errno = EINVAL;
4815     return -1;
4816   }
4817 
4818   /* Note: We should probably detect the server version, and instead of using
4819    * a separate existence check, if server >= 3.0.2, use the NX/XX flags of
4820    * the ZADD command.
4821    */
4822   exists = pr_redis_sorted_set_kexists(redis, m, key, keysz, value, valuesz);
4823   if (exists == TRUE) {
4824     errno = EEXIST;
4825     return -1;
4826   }
4827 
4828   tmp_pool = make_sub_pool(redis->pool);
4829   pr_pool_tag(tmp_pool, "Redis ZADD pool");
4830 
4831   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4832 
4833   cmd = "ZADD";
4834   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4835   reply = redisCommand(redis->ctx, "%s %b %f %b", cmd, key, keysz, score,
4836     value, valuesz);
4837   xerrno = errno;
4838 
4839   reply = handle_reply(redis, cmd, reply);
4840   if (reply == NULL) {
4841     pr_trace_msg(trace_channel, 2,
4842       "error adding to sorted set using key (%lu bytes): %s",
4843       (unsigned long) keysz, strerror(errno));
4844     destroy_pool(tmp_pool);
4845     errno = xerrno;
4846     return -1;
4847   }
4848 
4849   if (reply->type != REDIS_REPLY_INTEGER) {
4850     pr_trace_msg(trace_channel, 2,
4851       "expected INTEGER reply for %s, got %s", cmd,
4852       get_reply_type(reply->type));
4853 
4854     if (reply->type == REDIS_REPLY_ERROR) {
4855       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4856     }
4857     freeReplyObject(reply);
4858     destroy_pool(tmp_pool);
4859     errno = EINVAL;
4860     return -1;
4861   }
4862 
4863   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4864 
4865   freeReplyObject(reply);
4866   destroy_pool(tmp_pool);
4867   return 0;
4868 }
4869 
pr_redis_sorted_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)4870 int pr_redis_sorted_set_kcount(pr_redis_t *redis, module *m, const char *key,
4871     size_t keysz, uint64_t *count) {
4872   int xerrno = 0;
4873   pool *tmp_pool = NULL;
4874   const char *cmd = NULL;
4875   redisReply *reply;
4876 
4877   if (redis == NULL ||
4878       m == NULL ||
4879       key == NULL ||
4880       keysz == 0 ||
4881       count == NULL) {
4882     errno = EINVAL;
4883     return -1;
4884   }
4885 
4886   tmp_pool = make_sub_pool(redis->pool);
4887   pr_pool_tag(tmp_pool, "Redis ZCARD pool");
4888 
4889   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4890 
4891   cmd = "ZCARD";
4892   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4893   reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
4894   xerrno = errno;
4895 
4896   reply = handle_reply(redis, cmd, reply);
4897   if (reply == NULL) {
4898     pr_trace_msg(trace_channel, 2,
4899       "error getting count of sorted set using key (%lu bytes): %s",
4900       (unsigned long) keysz, strerror(errno));
4901     destroy_pool(tmp_pool);
4902     errno = xerrno;
4903     return -1;
4904   }
4905 
4906   if (reply->type != REDIS_REPLY_INTEGER) {
4907     pr_trace_msg(trace_channel, 2,
4908       "expected INTEGER reply for %s, got %s", cmd,
4909       get_reply_type(reply->type));
4910 
4911     if (reply->type == REDIS_REPLY_ERROR) {
4912       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4913     }
4914 
4915     freeReplyObject(reply);
4916     destroy_pool(tmp_pool);
4917     errno = EINVAL;
4918     return -1;
4919   }
4920 
4921   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4922   *count = (uint64_t) reply->integer;
4923 
4924   freeReplyObject(reply);
4925   destroy_pool(tmp_pool);
4926   return 0;
4927 }
4928 
pr_redis_sorted_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4929 int pr_redis_sorted_set_kdelete(pr_redis_t *redis, module *m, const char *key,
4930     size_t keysz, void *value, size_t valuesz) {
4931   int xerrno = 0;
4932   pool *tmp_pool = NULL;
4933   const char *cmd = NULL;
4934   redisReply *reply;
4935   long long count = 0;
4936 
4937   if (redis == NULL ||
4938       m == NULL ||
4939       key == NULL ||
4940       keysz == 0 ||
4941       value == NULL ||
4942       valuesz == 0) {
4943     errno = EINVAL;
4944     return -1;
4945   }
4946 
4947   tmp_pool = make_sub_pool(redis->pool);
4948   pr_pool_tag(tmp_pool, "Redis ZREM pool");
4949 
4950   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4951 
4952   cmd = "ZREM";
4953   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4954   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4955   xerrno = errno;
4956 
4957   reply = handle_reply(redis, cmd, reply);
4958   if (reply == NULL) {
4959     pr_trace_msg(trace_channel, 2,
4960       "error deleting item from sorted set using key (%lu bytes): %s",
4961       (unsigned long) keysz, strerror(errno));
4962     destroy_pool(tmp_pool);
4963     errno = xerrno;
4964     return -1;
4965   }
4966 
4967   if (reply->type != REDIS_REPLY_INTEGER) {
4968     pr_trace_msg(trace_channel, 2,
4969       "expected INTEGER reply for %s, got %s", cmd,
4970       get_reply_type(reply->type));
4971 
4972     if (reply->type == REDIS_REPLY_ERROR) {
4973       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4974     }
4975 
4976     freeReplyObject(reply);
4977     destroy_pool(tmp_pool);
4978     errno = EINVAL;
4979     return -1;
4980   }
4981 
4982   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4983   count = reply->integer;
4984 
4985   freeReplyObject(reply);
4986   destroy_pool(tmp_pool);
4987 
4988   if (count == 0) {
4989     /* No items removed. */
4990     errno = ENOENT;
4991     return -1;
4992   }
4993 
4994   return 0;
4995 }
4996 
pr_redis_sorted_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4997 int pr_redis_sorted_set_kexists(pr_redis_t *redis, module *m, const char *key,
4998     size_t keysz, void *value, size_t valuesz) {
4999   int xerrno = 0, exists = FALSE;
5000   pool *tmp_pool = NULL;
5001   const char *cmd = NULL;
5002   redisReply *reply;
5003 
5004   if (redis == NULL ||
5005       m == NULL ||
5006       key == NULL ||
5007       keysz == 0 ||
5008       value == NULL ||
5009       valuesz == 0) {
5010     errno = EINVAL;
5011     return -1;
5012   }
5013 
5014   tmp_pool = make_sub_pool(redis->pool);
5015   pr_pool_tag(tmp_pool, "Redis ZRANK pool");
5016 
5017   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5018 
5019   cmd = "ZRANK";
5020   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5021   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
5022   xerrno = errno;
5023 
5024   reply = handle_reply(redis, cmd, reply);
5025   if (reply == NULL) {
5026     pr_trace_msg(trace_channel, 2,
5027       "error checking item in sorted set using key (%lu bytes): %s",
5028       (unsigned long) keysz, strerror(errno));
5029     destroy_pool(tmp_pool);
5030     errno = xerrno;
5031     return -1;
5032   }
5033 
5034   if (reply->type != REDIS_REPLY_INTEGER &&
5035       reply->type != REDIS_REPLY_NIL) {
5036     pr_trace_msg(trace_channel, 2,
5037       "expected INTEGER or NIL reply for %s, got %s", cmd,
5038       get_reply_type(reply->type));
5039 
5040     if (reply->type == REDIS_REPLY_ERROR) {
5041       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5042     }
5043 
5044     freeReplyObject(reply);
5045     destroy_pool(tmp_pool);
5046     errno = EINVAL;
5047     return -1;
5048   }
5049 
5050   if (reply->type == REDIS_REPLY_INTEGER) {
5051     pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
5052     exists = TRUE;
5053 
5054   } else {
5055     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5056     exists = FALSE;
5057   }
5058 
5059   freeReplyObject(reply);
5060   destroy_pool(tmp_pool);
5061   return exists;
5062 }
5063 
pr_redis_sorted_set_kgetn(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)5064 int pr_redis_sorted_set_kgetn(pool *p, pr_redis_t *redis, module *m,
5065     const char *key, size_t keysz, unsigned int offset, unsigned int len,
5066     array_header **values, array_header **valueszs, int flags) {
5067   int res = 0, xerrno = 0;
5068   pool *tmp_pool = NULL;
5069   const char *cmd = NULL;
5070   redisReply *reply;
5071 
5072   if (p == NULL ||
5073       redis == NULL ||
5074       m == NULL ||
5075       key == NULL ||
5076       keysz == 0 ||
5077       values == NULL ||
5078       valueszs == NULL) {
5079     errno = EINVAL;
5080     return -1;
5081   }
5082 
5083   tmp_pool = make_sub_pool(redis->pool);
5084 
5085   switch (flags) {
5086     case PR_REDIS_SORTED_SET_FL_ASC:
5087       pr_pool_tag(tmp_pool, "Redis ZRANGE pool");
5088       cmd = "ZRANGE";
5089       break;
5090 
5091     case PR_REDIS_SORTED_SET_FL_DESC:
5092       pr_pool_tag(tmp_pool, "Redis ZREVRANGE pool");
5093       cmd = "ZREVRANGE";
5094       break;
5095 
5096     default:
5097       errno = EINVAL;
5098       return -1;
5099   }
5100 
5101   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5102 
5103   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5104 
5105   /* Since the the range is [start, stop] inclusive, and the function takes
5106    * a length, we need to subtract one for whose items these are.  Consider
5107    * an offset of 0, and a len of 1 -- to get just one item.  In that case,
5108    * stop would be 0 as well.
5109    */
5110   reply = redisCommand(redis->ctx, "%s %b %u %u", cmd, key, keysz, offset,
5111     offset + len - 1);
5112   xerrno = errno;
5113 
5114   reply = handle_reply(redis, cmd, reply);
5115   if (reply == NULL) {
5116     pr_trace_msg(trace_channel, 2,
5117       "error getting %u %s in sorted set using key (%lu bytes): %s", len,
5118       len != 1 ? "items" : "item", (unsigned long) keysz, strerror(errno));
5119     destroy_pool(tmp_pool);
5120     errno = xerrno;
5121     return -1;
5122   }
5123 
5124   if (reply->type != REDIS_REPLY_ARRAY) {
5125     pr_trace_msg(trace_channel, 2,
5126       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
5127 
5128     if (reply->type == REDIS_REPLY_ERROR) {
5129       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5130     }
5131 
5132     freeReplyObject(reply);
5133     destroy_pool(tmp_pool);
5134     errno = EINVAL;
5135     return -1;
5136   }
5137 
5138   if (reply->elements > 0) {
5139     register unsigned int i;
5140 
5141     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
5142       (unsigned long) reply->elements,
5143       reply->elements != 1 ? "elements" : "element");
5144 
5145     *values = make_array(p, 0, sizeof(void *));
5146     *valueszs = make_array(p, 0, sizeof(size_t));
5147 
5148     for (i = 0; i < reply->elements; i++) {
5149       redisReply *value_elt;
5150       void *value_data = NULL;
5151       size_t value_datasz = 0;
5152 
5153       value_elt = reply->element[i];
5154       if (value_elt->type == REDIS_REPLY_STRING) {
5155         value_datasz = value_elt->len;
5156         value_data = palloc(p, value_datasz);
5157         memcpy(value_data, value_elt->str, value_datasz);
5158 
5159       } else {
5160         pr_trace_msg(trace_channel, 2,
5161           "expected STRING element at index %u, got %s", i + 1,
5162           get_reply_type(value_elt->type));
5163       }
5164 
5165       if (value_data != NULL) {
5166         *((void **) push_array(*values)) = value_data;
5167         *((size_t *) push_array(*valueszs)) = value_datasz;
5168       }
5169     }
5170 
5171     res = 0;
5172 
5173   } else {
5174     xerrno = ENOENT;
5175     res = -1;
5176   }
5177 
5178   freeReplyObject(reply);
5179   destroy_pool(tmp_pool);
5180 
5181   errno = xerrno;
5182   return res;
5183 }
5184 
pr_redis_sorted_set_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float incr,float * score)5185 int pr_redis_sorted_set_kincr(pr_redis_t *redis, module *m, const char *key,
5186     size_t keysz, void *value, size_t valuesz, float incr, float *score) {
5187   int res, xerrno, exists;
5188   pool *tmp_pool = NULL;
5189   const char *cmd = NULL;
5190   redisReply *reply;
5191 
5192   if (redis == NULL ||
5193       m == NULL ||
5194       key == NULL ||
5195       keysz == 0 ||
5196       value == NULL ||
5197       valuesz == 0 ||
5198       score == NULL) {
5199     errno = EINVAL;
5200     return -1;
5201   }
5202 
5203   exists = pr_redis_sorted_set_kexists(redis, m, key, keysz, value, valuesz);
5204   if (exists == FALSE) {
5205     errno = ENOENT;
5206     return -1;
5207   }
5208 
5209   tmp_pool = make_sub_pool(redis->pool);
5210   pr_pool_tag(tmp_pool, "Redis ZINCRBY pool");
5211 
5212   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5213 
5214   cmd = "ZINCRBY";
5215   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5216   reply = redisCommand(redis->ctx, "%s %b %f %b", cmd, key, keysz, incr,
5217     value, valuesz);
5218   xerrno = errno;
5219 
5220   reply = handle_reply(redis, cmd, reply);
5221   if (reply == NULL) {
5222     pr_trace_msg(trace_channel, 2,
5223       "error incrementing key (%lu bytes) by %0.3f in sorted set using %s: %s",
5224       (unsigned long) keysz, incr, cmd, strerror(errno));
5225     destroy_pool(tmp_pool);
5226     errno = EIO;
5227     return -1;
5228   }
5229 
5230   if (reply->type != REDIS_REPLY_STRING) {
5231     pr_trace_msg(trace_channel, 2,
5232       "expected STRING reply for %s, got %s", cmd,
5233       get_reply_type(reply->type));
5234 
5235     if (reply->type == REDIS_REPLY_ERROR) {
5236       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5237     }
5238 
5239     freeReplyObject(reply);
5240     destroy_pool(tmp_pool);
5241     errno = EINVAL;
5242     return -1;
5243   }
5244 
5245   pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
5246     reply->str);
5247 
5248   res = sscanf(reply->str, "%f", score);
5249   if (res != 1) {
5250     pr_trace_msg(trace_channel, 3, "error parsing '%.*s' as float",
5251       (int) reply->len, reply->str);
5252     xerrno = EINVAL;
5253     res = -1;
5254 
5255   } else {
5256     res = 0;
5257   }
5258 
5259   freeReplyObject(reply);
5260   destroy_pool(tmp_pool);
5261 
5262   errno = xerrno;
5263   return res;
5264 }
5265 
pr_redis_sorted_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)5266 int pr_redis_sorted_set_kremove(pr_redis_t *redis, module *m, const char *key,
5267     size_t keysz) {
5268 
5269   /* Note: We can actually use just DEL here. */
5270   return pr_redis_kremove(redis, m, key, keysz);
5271 }
5272 
pr_redis_sorted_set_kscore(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float * score)5273 int pr_redis_sorted_set_kscore(pr_redis_t *redis, module *m, const char *key,
5274     size_t keysz, void *value, size_t valuesz, float *score) {
5275   int res, xerrno;
5276   pool *tmp_pool = NULL;
5277   const char *cmd = NULL;
5278   redisReply *reply;
5279 
5280   if (redis == NULL ||
5281       m == NULL ||
5282       key == NULL ||
5283       keysz == 0 ||
5284       value == NULL ||
5285       valuesz == 0 ||
5286       score == NULL) {
5287     errno = EINVAL;
5288     return -1;
5289   }
5290 
5291   tmp_pool = make_sub_pool(redis->pool);
5292   pr_pool_tag(tmp_pool, "Redis ZSCORE pool");
5293 
5294   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5295 
5296   cmd = "ZSCORE";
5297   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5298   reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
5299   xerrno = errno;
5300 
5301   reply = handle_reply(redis, cmd, reply);
5302   if (reply == NULL) {
5303     pr_trace_msg(trace_channel, 2,
5304       "error gettin score for key (%lu bytes) using %s: %s",
5305       (unsigned long) keysz, cmd, strerror(errno));
5306     destroy_pool(tmp_pool);
5307     errno = EIO;
5308     return -1;
5309   }
5310 
5311   if (reply->type != REDIS_REPLY_STRING &&
5312       reply->type != REDIS_REPLY_NIL) {
5313     pr_trace_msg(trace_channel, 2,
5314       "expected STRING or NIL reply for %s, got %s", cmd,
5315       get_reply_type(reply->type));
5316 
5317     if (reply->type == REDIS_REPLY_ERROR) {
5318       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5319     }
5320 
5321     freeReplyObject(reply);
5322     destroy_pool(tmp_pool);
5323     errno = EINVAL;
5324     return -1;
5325   }
5326 
5327   if (reply->type == REDIS_REPLY_STRING) {
5328     pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
5329       reply->str);
5330 
5331     res = sscanf(reply->str, "%f", score);
5332     if (res != 1) {
5333       pr_trace_msg(trace_channel, 3, "error parsing '%.*s' as float",
5334         (int) reply->len, reply->str);
5335       xerrno = EINVAL;
5336       res = -1;
5337 
5338     } else {
5339       res = 0;
5340     }
5341 
5342   } else {
5343     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5344     xerrno = ENOENT;
5345     res = -1;
5346   }
5347 
5348   freeReplyObject(reply);
5349   destroy_pool(tmp_pool);
5350 
5351   errno = xerrno;
5352   return res;
5353 }
5354 
pr_redis_sorted_set_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)5355 int pr_redis_sorted_set_kset(pr_redis_t *redis, module *m, const char *key,
5356     size_t keysz, void *value, size_t valuesz, float score) {
5357   int xerrno = 0, exists = FALSE;
5358   pool *tmp_pool = NULL;
5359   const char *cmd = NULL;
5360   redisReply *reply;
5361 
5362   if (redis == NULL ||
5363       m == NULL ||
5364       key == NULL ||
5365       keysz == 0 ||
5366       value == NULL ||
5367       valuesz == 0) {
5368     errno = EINVAL;
5369     return -1;
5370   }
5371 
5372   /* Note: We should probably detect the server version, and instead of using
5373    * a separate existence check, if server >= 3.0.2, use the NX/XX flags of
5374    * the ZADD command.
5375    */
5376   exists = pr_redis_sorted_set_kexists(redis, m, key, keysz, value, valuesz);
5377   if (exists == FALSE) {
5378     errno = ENOENT;
5379     return -1;
5380   }
5381 
5382   tmp_pool = make_sub_pool(redis->pool);
5383   pr_pool_tag(tmp_pool, "Redis ZADD pool");
5384 
5385   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5386 
5387   cmd = "ZADD";
5388   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5389   reply = redisCommand(redis->ctx, "%s %b %f %b", cmd, key, keysz, score,
5390     value, valuesz);
5391   xerrno = errno;
5392 
5393   reply = handle_reply(redis, cmd, reply);
5394   if (reply == NULL) {
5395     pr_trace_msg(trace_channel, 2,
5396       "error setting item in sorted set using key (%lu bytes): %s",
5397       (unsigned long) keysz, strerror(errno));
5398     destroy_pool(tmp_pool);
5399     errno = xerrno;
5400     return -1;
5401   }
5402 
5403   if (reply->type != REDIS_REPLY_INTEGER) {
5404     pr_trace_msg(trace_channel, 2,
5405       "expected INTEGER reply for %s, got %s", cmd,
5406       get_reply_type(reply->type));
5407 
5408     if (reply->type == REDIS_REPLY_ERROR) {
5409       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5410     }
5411     freeReplyObject(reply);
5412     destroy_pool(tmp_pool);
5413     errno = EINVAL;
5414     return -1;
5415   }
5416 
5417   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
5418 
5419   freeReplyObject(reply);
5420   destroy_pool(tmp_pool);
5421   return 0;
5422 }
5423 
f2s(pool * p,float num,size_t * len)5424 static char *f2s(pool *p, float num, size_t *len) {
5425   int res;
5426   char *s;
5427   size_t sz;
5428 
5429   sz = 32;
5430   s = pcalloc(p, sz + 1);
5431   res = pr_snprintf(s, sz, "%0.3f", num);
5432 
5433   *len = res;
5434   return s;
5435 }
5436 
pr_redis_sorted_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs,array_header * scores)5437 int pr_redis_sorted_set_ksetall(pr_redis_t *redis, module *m, const char *key,
5438     size_t keysz, array_header *values, array_header *valueszs,
5439     array_header *scores) {
5440   register unsigned int i;
5441   int res, xerrno = 0;
5442   pool *tmp_pool = NULL;
5443   array_header *args, *arglens;
5444   const char *cmd = NULL;
5445   redisReply *reply;
5446 
5447   if (redis == NULL ||
5448       m == NULL ||
5449       key == NULL ||
5450       keysz == 0 ||
5451       values == NULL ||
5452       values->nelts == 0 ||
5453       valueszs == NULL ||
5454       valueszs->nelts == 0 ||
5455       scores == NULL ||
5456       scores->nelts == 0) {
5457     errno = EINVAL;
5458     return -1;
5459   }
5460 
5461   if (values->nelts != valueszs->nelts ||
5462       values->nelts != scores->nelts) {
5463     errno = EINVAL;
5464     return -1;
5465   }
5466 
5467   /* First, delete any existing sorted set at this key; a set operation,
5468    * in my mind, is a complete overwrite.
5469    */
5470   res = pr_redis_sorted_set_kremove(redis, m, key, keysz);
5471   if (res < 0 &&
5472       errno != ENOENT) {
5473     return -1;
5474   }
5475 
5476   tmp_pool = make_sub_pool(redis->pool);
5477   pr_pool_tag(tmp_pool, "Redis ZADD pool");
5478 
5479   key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5480 
5481   cmd = "ZADD";
5482   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5483 
5484   args = make_array(tmp_pool, 0, sizeof(char *));
5485   arglens = make_array(tmp_pool, 0, sizeof(size_t));
5486 
5487   *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
5488   *((size_t *) push_array(arglens)) = strlen(cmd);
5489 
5490   *((char **) push_array(args)) = (char *) key;
5491   *((size_t *) push_array(arglens)) = keysz;
5492 
5493   for (i = 0; i < values->nelts; i++) {
5494     size_t scoresz = 0;
5495 
5496     pr_signals_handle();
5497 
5498     *((char **) push_array(args)) = f2s(tmp_pool, ((float *) scores->elts)[i],
5499       &scoresz);
5500     *((size_t *) push_array(arglens)) = scoresz;
5501 
5502     *((char **) push_array(args)) = ((char **) values->elts)[i];
5503     *((size_t *) push_array(arglens)) = ((size_t *) valueszs->elts)[i];
5504   }
5505 
5506   reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
5507   xerrno = errno;
5508 
5509   reply = handle_reply(redis, cmd, reply);
5510   if (reply == NULL) {
5511     pr_trace_msg(trace_channel, 2,
5512       "error setting items in sorted set using key (%lu bytes): %s",
5513       (unsigned long) keysz, strerror(errno));
5514     destroy_pool(tmp_pool);
5515     errno = xerrno;
5516     return -1;
5517   }
5518 
5519   if (reply->type != REDIS_REPLY_INTEGER) {
5520     pr_trace_msg(trace_channel, 2,
5521       "expected INTEGER reply for %s, got %s", cmd,
5522       get_reply_type(reply->type));
5523 
5524     if (reply->type == REDIS_REPLY_ERROR) {
5525       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5526     }
5527     freeReplyObject(reply);
5528     destroy_pool(tmp_pool);
5529     errno = EINVAL;
5530     return -1;
5531   }
5532 
5533   pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
5534 
5535   freeReplyObject(reply);
5536   destroy_pool(tmp_pool);
5537   return 0;
5538 }
5539 
pr_redis_sentinel_get_master_addr(pool * p,pr_redis_t * redis,const char * name,pr_netaddr_t ** addr)5540 int pr_redis_sentinel_get_master_addr(pool *p, pr_redis_t *redis,
5541     const char *name, pr_netaddr_t **addr) {
5542   int res = 0, xerrno = 0;
5543   pool *tmp_pool = NULL;
5544   const char *cmd = NULL;
5545   redisReply *reply;
5546 
5547   if (p == NULL ||
5548       redis == NULL ||
5549       name == NULL ||
5550       addr == NULL) {
5551     errno = EINVAL;
5552     return -1;
5553   }
5554 
5555   tmp_pool = make_sub_pool(redis->pool);
5556   pr_pool_tag(tmp_pool, "Redis SENTINEL pool");
5557 
5558   cmd = "SENTINEL";
5559   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5560   reply = redisCommand(redis->ctx, "%s get-master-addr-by-name %s", cmd, name);
5561   xerrno = errno;
5562 
5563   reply = handle_reply(redis, cmd, reply);
5564   if (reply == NULL) {
5565     pr_trace_msg(trace_channel, 2,
5566       "error getting address for master '%s': %s", name, strerror(errno));
5567     destroy_pool(tmp_pool);
5568     errno = EIO;
5569     return -1;
5570   }
5571 
5572   if (reply->type == REDIS_REPLY_NIL) {
5573     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5574     freeReplyObject(reply);
5575     destroy_pool(tmp_pool);
5576 
5577     errno = ENOENT;
5578     return -1;
5579   }
5580 
5581   if (reply->type != REDIS_REPLY_ARRAY) {
5582     pr_trace_msg(trace_channel, 2,
5583       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
5584 
5585     if (reply->type == REDIS_REPLY_ERROR) {
5586       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5587     }
5588 
5589     freeReplyObject(reply);
5590     destroy_pool(tmp_pool);
5591     errno = EINVAL;
5592     return -1;
5593   }
5594 
5595   if (reply->elements > 0) {
5596     redisReply *elt;
5597     char *host = NULL;
5598     int port = -1;
5599 
5600     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
5601       (unsigned long) reply->elements,
5602       reply->elements != 1 ? "elements" : "element");
5603 
5604     elt = reply->element[0];
5605     if (elt->type == REDIS_REPLY_STRING) {
5606       host = pstrndup(tmp_pool, elt->str, elt->len);
5607 
5608     } else {
5609       pr_trace_msg(trace_channel, 2,
5610         "expected STRING element at index 0, got %s",
5611         get_reply_type(elt->type));
5612     }
5613 
5614     elt = reply->element[1];
5615     if (elt->type == REDIS_REPLY_STRING) {
5616       char *port_str;
5617 
5618       port_str = pstrndup(tmp_pool, elt->str, elt->len);
5619       port = atoi(port_str);
5620 
5621     } else {
5622       pr_trace_msg(trace_channel, 2,
5623         "expected STRING element at index 1, got %s",
5624         get_reply_type(elt->type));
5625     }
5626 
5627     if (host != NULL &&
5628         port != -1) {
5629       *addr = (pr_netaddr_t *) pr_netaddr_get_addr(p, host, NULL);
5630       if (*addr != NULL) {
5631         pr_netaddr_set_port2(*addr, port);
5632 
5633       } else {
5634         xerrno = errno;
5635         res = -1;
5636       }
5637 
5638     } else {
5639       xerrno = ENOENT;
5640       res = -1;
5641     }
5642 
5643   } else {
5644     xerrno = ENOENT;
5645     res = -1;
5646   }
5647 
5648   freeReplyObject(reply);
5649   destroy_pool(tmp_pool);
5650 
5651   errno = xerrno;
5652   return res;
5653 }
5654 
pr_redis_sentinel_get_masters(pool * p,pr_redis_t * redis,array_header ** masters)5655 int pr_redis_sentinel_get_masters(pool *p, pr_redis_t *redis,
5656     array_header **masters) {
5657   int res = 0, xerrno = 0;
5658   pool *tmp_pool = NULL;
5659   const char *cmd = NULL;
5660   redisReply *reply;
5661 
5662   if (p == NULL ||
5663       redis == NULL ||
5664       masters == NULL) {
5665     errno = EINVAL;
5666     return -1;
5667   }
5668 
5669   tmp_pool = make_sub_pool(redis->pool);
5670   pr_pool_tag(tmp_pool, "Redis SENTINEL pool");
5671 
5672   cmd = "SENTINEL";
5673   pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5674   reply = redisCommand(redis->ctx, "%s masters", cmd);
5675   xerrno = errno;
5676 
5677   reply = handle_reply(redis, cmd, reply);
5678   if (reply == NULL) {
5679     pr_trace_msg(trace_channel, 2,
5680       "error getting masters: %s", strerror(errno));
5681     destroy_pool(tmp_pool);
5682     errno = EIO;
5683     return -1;
5684   }
5685 
5686   if (reply->type == REDIS_REPLY_NIL) {
5687     pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5688     freeReplyObject(reply);
5689     destroy_pool(tmp_pool);
5690 
5691     errno = ENOENT;
5692     return -1;
5693   }
5694 
5695   if (reply->type != REDIS_REPLY_ARRAY) {
5696     pr_trace_msg(trace_channel, 2,
5697       "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
5698 
5699     if (reply->type == REDIS_REPLY_ERROR) {
5700       pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5701     }
5702 
5703     freeReplyObject(reply);
5704     destroy_pool(tmp_pool);
5705     errno = EINVAL;
5706     return -1;
5707   }
5708 
5709   if (reply->elements > 0) {
5710     register unsigned int i;
5711 
5712     pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
5713       (unsigned long) reply->elements,
5714       reply->elements != 1 ? "elements" : "element");
5715 
5716     *masters = make_array(p, reply->elements, sizeof(char *));
5717 
5718     for (i = 0; i < reply->elements; i++) {
5719       redisReply *elt;
5720 
5721       elt = reply->element[i];
5722       if (elt->type == REDIS_REPLY_ARRAY) {
5723         redisReply *info;
5724 
5725         info = elt->element[1];
5726         *((char **) push_array(*masters)) = pstrndup(p, info->str, info->len);
5727 
5728       } else {
5729         pr_trace_msg(trace_channel, 2,
5730           "expected ARRAY element at index %u, got %s", i,
5731           get_reply_type(elt->type));
5732       }
5733     }
5734 
5735   } else {
5736     xerrno = ENOENT;
5737     res = -1;
5738   }
5739 
5740   freeReplyObject(reply);
5741   destroy_pool(tmp_pool);
5742 
5743   errno = xerrno;
5744   return res;
5745 }
5746 
redis_set_server2(const char * server,int port,unsigned long flags,const char * username,const char * password,const char * db_idx)5747 int redis_set_server2(const char *server, int port, unsigned long flags,
5748     const char *username, const char *password, const char *db_idx) {
5749 
5750   if (server == NULL) {
5751     /* By using a port of -2 specifically, we can use this function to
5752      * clear the server/port, for testing purposes ONLY.
5753      */
5754     if (port < 1 &&
5755         port != -2) {
5756       errno = EINVAL;
5757       return -1;
5758     }
5759   }
5760 
5761   redis_server = server;
5762   redis_port = port;
5763   redis_flags = flags;
5764   redis_username = username;
5765   redis_password = password;
5766   redis_db_idx = db_idx;
5767 
5768   return 0;
5769 }
5770 
redis_set_server(const char * server,int port,unsigned long flags,const char * password,const char * db_idx)5771 int redis_set_server(const char *server, int port, unsigned long flags,
5772     const char *password, const char *db_idx) {
5773   return redis_set_server2(server, port, flags, "default", password, db_idx);
5774 }
5775 
redis_set_sentinels(array_header * sentinels,const char * name)5776 int redis_set_sentinels(array_header *sentinels, const char *name) {
5777 
5778   if (sentinels != NULL &&
5779       sentinels->nelts == 0) {
5780     errno = EINVAL;
5781     return -1;
5782   }
5783 
5784   redis_sentinels = sentinels;
5785   redis_sentinel_master = name;
5786 
5787   return 0;
5788 }
5789 
redis_set_timeouts(unsigned long connect_millis,unsigned long io_millis)5790 int redis_set_timeouts(unsigned long connect_millis, unsigned long io_millis) {
5791   redis_connect_millis = connect_millis;
5792   redis_io_millis = io_millis;
5793 
5794   return 0;
5795 }
5796 
redis_clear(void)5797 int redis_clear(void) {
5798   if (sess_redis != NULL) {
5799     pr_redis_conn_destroy(sess_redis);
5800     sess_redis = NULL;
5801   }
5802 
5803   return 0;
5804 }
5805 
redis_init(void)5806 int redis_init(void) {
5807   return 0;
5808 }
5809 
5810 #else
5811 
pr_redis_conn_get(pool * p,unsigned long flags)5812 pr_redis_t *pr_redis_conn_get(pool *p, unsigned long flags) {
5813   errno = ENOSYS;
5814   return NULL;
5815 }
5816 
pr_redis_conn_new(pool * p,module * m,unsigned long flags)5817 pr_redis_t *pr_redis_conn_new(pool *p, module *m, unsigned long flags) {
5818   errno = ENOSYS;
5819   return NULL;
5820 }
5821 
pr_redis_conn_close(pr_redis_t * redis)5822 int pr_redis_conn_close(pr_redis_t *redis) {
5823   errno = ENOSYS;
5824   return -1;
5825 }
5826 
pr_redis_conn_destroy(pr_redis_t * redis)5827 int pr_redis_conn_destroy(pr_redis_t *redis) {
5828   errno = ENOSYS;
5829   return -1;
5830 }
5831 
pr_redis_conn_set_namespace(pr_redis_t * redis,module * m,const void * prefix,size_t prefixsz)5832 int pr_redis_conn_set_namespace(pr_redis_t *redis, module *m,
5833     const void *prefix, size_t prefixsz) {
5834   errno = ENOSYS;
5835   return -1;
5836 }
5837 
pr_redis_auth(pr_redis_t * redis,const char * password)5838 int pr_redis_auth(pr_redis_t *redis, const char *password) {
5839   errno = ENOSYS;
5840   return -1;
5841 }
5842 
pr_redis_select(pr_redis_t * redis,const char * db_idx)5843 int pr_redis_select(pr_redis_t *redis, const char *db_idx) {
5844   errno = ENOSYS;
5845   return -1;
5846 }
5847 
pr_redis_command(pr_redis_t * redis,const array_header * args,int reply_type)5848 int pr_redis_command(pr_redis_t *redis, const array_header *args,
5849     int reply_type) {
5850   errno = ENOSYS;
5851   return -1;
5852 }
5853 
pr_redis_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)5854 int pr_redis_add(pr_redis_t *redis, module *m, const char *key, void *value,
5855     size_t valuesz, time_t expires) {
5856   errno = ENOSYS;
5857   return -1;
5858 }
5859 
pr_redis_decr(pr_redis_t * redis,module * m,const char * key,uint32_t decr,uint64_t * value)5860 int pr_redis_decr(pr_redis_t *redis, module *m, const char *key, uint32_t decr,
5861     uint64_t *value) {
5862   errno = ENOSYS;
5863   return -1;
5864 }
5865 
pr_redis_get(pool * p,pr_redis_t * redis,module * m,const char * key,size_t * valuesz)5866 void *pr_redis_get(pool *p, pr_redis_t *redis, module *m, const char *key,
5867     size_t *valuesz) {
5868   errno = ENOSYS;
5869   return NULL;
5870 }
5871 
pr_redis_get_str(pool * p,pr_redis_t * redis,module * m,const char * key)5872 char *pr_redis_get_str(pool *p, pr_redis_t *redis, module *m, const char *key) {
5873   errno = ENOSYS;
5874   return NULL;
5875 }
5876 
pr_redis_incr(pr_redis_t * redis,module * m,const char * key,uint32_t incr,uint64_t * value)5877 int pr_redis_incr(pr_redis_t *redis, module *m, const char *key, uint32_t incr,
5878     uint64_t *value) {
5879   errno = ENOSYS;
5880   return -1;
5881 }
5882 
pr_redis_remove(pr_redis_t * redis,module * m,const char * key)5883 int pr_redis_remove(pr_redis_t *redis, module *m, const char *key) {
5884   errno = ENOSYS;
5885   return -1;
5886 }
5887 
pr_redis_rename(pr_redis_t * redis,module * m,const char * from,const char * to)5888 int pr_redis_rename(pr_redis_t *redis, module *m, const char *from,
5889     const char *to) {
5890   errno = ENOSYS;
5891   return -1;
5892 }
5893 
pr_redis_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)5894 int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
5895     size_t valuesz, time_t expires) {
5896   errno = ENOSYS;
5897   return -1;
5898 }
5899 
pr_redis_hash_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)5900 int pr_redis_hash_count(pr_redis_t *redis, module *m, const char *key,
5901     uint64_t *count) {
5902   errno = ENOSYS;
5903   return -1;
5904 }
5905 
pr_redis_hash_delete(pr_redis_t * redis,module * m,const char * key,const char * field)5906 int pr_redis_hash_delete(pr_redis_t *redis, module *m, const char *key,
5907     const char *field) {
5908   errno = ENOSYS;
5909   return -1;
5910 }
5911 
pr_redis_hash_exists(pr_redis_t * redis,module * m,const char * key,const char * field)5912 int pr_redis_hash_exists(pr_redis_t *redis, module *m, const char *key,
5913     const char *field) {
5914   errno = ENOSYS;
5915   return -1;
5916 }
5917 
pr_redis_hash_get(pool * p,pr_redis_t * redis,module * m,const char * key,const char * field,void ** value,size_t * valuesz)5918 int pr_redis_hash_get(pool *p, pr_redis_t *redis, module *m, const char *key,
5919     const char *field, void **value, size_t *valuesz) {
5920   errno = ENOSYS;
5921   return -1;
5922 }
5923 
pr_redis_hash_getall(pool * p,pr_redis_t * redis,module * m,const char * key,pr_table_t ** hash)5924 int pr_redis_hash_getall(pool *p, pr_redis_t *redis, module *m,
5925     const char *key, pr_table_t **hash) {
5926   errno = ENOSYS;
5927   return -1;
5928 }
5929 
pr_redis_hash_incr(pr_redis_t * redis,module * m,const char * key,const char * field,int32_t incr,int64_t * value)5930 int pr_redis_hash_incr(pr_redis_t *redis, module *m, const char *key,
5931     const char *field, int32_t incr, int64_t *value) {
5932   errno = ENOSYS;
5933   return -1;
5934 }
5935 
pr_redis_hash_keys(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** fields)5936 int pr_redis_hash_keys(pool *p, pr_redis_t *redis, module *m, const char *key,
5937     array_header **fields) {
5938   errno = ENOSYS;
5939   return -1;
5940 }
5941 
pr_redis_hash_remove(pr_redis_t * redis,module * m,const char * key)5942 int pr_redis_hash_remove(pr_redis_t *redis, module *m, const char *key) {
5943   errno = ENOSYS;
5944   return -1;
5945 }
5946 
pr_redis_hash_set(pr_redis_t * redis,module * m,const char * key,const char * field,void * value,size_t valuesz)5947 int pr_redis_hash_set(pr_redis_t *redis, module *m, const char *key,
5948     const char *field, void *value, size_t valuesz) {
5949   errno = ENOSYS;
5950   return -1;
5951 }
5952 
pr_redis_hash_setall(pr_redis_t * redis,module * m,const char * key,pr_table_t * hash)5953 int pr_redis_hash_setall(pr_redis_t *redis, module *m, const char *key,
5954     pr_table_t *hash) {
5955   errno = ENOSYS;
5956   return -1;
5957 }
5958 
pr_redis_hash_values(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values)5959 int pr_redis_hash_values(pool *p, pr_redis_t *redis, module *m,
5960     const char *key, array_header **values) {
5961   errno = ENOSYS;
5962   return -1;
5963 }
5964 
pr_redis_list_append(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)5965 int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
5966     void *value, size_t valuesz) {
5967   errno = ENOSYS;
5968   return -1;
5969 }
5970 
pr_redis_list_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)5971 int pr_redis_list_count(pr_redis_t *redis, module *m, const char *key,
5972     uint64_t *count) {
5973   errno = ENOSYS;
5974   return -1;
5975 }
5976 
pr_redis_list_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)5977 int pr_redis_list_delete(pr_redis_t *redis, module *m, const char *key,
5978     void *value, size_t valuesz) {
5979   errno = ENOSYS;
5980   return -1;
5981 }
5982 
pr_redis_list_exists(pr_redis_t * redis,module * m,const char * key,unsigned int idx)5983 int pr_redis_list_exists(pr_redis_t *redis, module *m, const char *key,
5984     unsigned int idx) {
5985   errno = ENOSYS;
5986   return -1;
5987 }
5988 
pr_redis_list_get(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int idx,void ** value,size_t * valuesz)5989 int pr_redis_list_get(pool *p, pr_redis_t *redis, module *m, const char *key,
5990     unsigned int idx, void **value, size_t *valuesz) {
5991   errno = ENOSYS;
5992   return -1;
5993 }
5994 
pr_redis_list_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)5995 int pr_redis_list_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
5996     array_header **values, array_header **valueszs) {
5997   errno = ENOSYS;
5998   return -1;
5999 }
6000 
pr_redis_list_pop(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz,int flags)6001 int pr_redis_list_pop(pool *p, pr_redis_t *redis, module *m, const char *key,
6002     void **value, size_t *valuesz, int flags) {
6003   errno = ENOSYS;
6004   return -1;
6005 }
6006 
pr_redis_list_push(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,int flags)6007 int pr_redis_list_push(pr_redis_t *redis, module *m, const char *key,
6008     void *value, size_t valuesz, int flags) {
6009   errno = ENOSYS;
6010   return -1;
6011 }
6012 
pr_redis_list_remove(pr_redis_t * redis,module * m,const char * key)6013 int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key) {
6014   errno = ENOSYS;
6015   return -1;
6016 }
6017 
pr_redis_list_rotate(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz)6018 int pr_redis_list_rotate(pool *p, pr_redis_t *redis, module *m,
6019     const char *key, void **value, size_t *valuesz) {
6020   errno = ENOSYS;
6021   return -1;
6022 }
6023 
pr_redis_list_set(pr_redis_t * redis,module * m,const char * key,unsigned int idx,void * value,size_t valuesz)6024 int pr_redis_list_set(pr_redis_t *redis, module *m, const char *key,
6025     unsigned int idx, void *value, size_t valuesz) {
6026   errno = ENOSYS;
6027   return -1;
6028 }
6029 
pr_redis_list_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)6030 int pr_redis_list_setall(pr_redis_t *redis, module *m, const char *key,
6031     array_header *values, array_header *valueszs) {
6032   errno = ENOSYS;
6033   return -1;
6034 }
6035 
pr_redis_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6036 int pr_redis_set_add(pr_redis_t *redis, module *m, const char *key,
6037     void *value, size_t valuesz) {
6038   errno = ENOSYS;
6039   return -1;
6040 }
6041 
pr_redis_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)6042 int pr_redis_set_count(pr_redis_t *redis, module *m, const char *key,
6043     uint64_t *count) {
6044   errno = ENOSYS;
6045   return -1;
6046 }
6047 
pr_redis_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6048 int pr_redis_set_delete(pr_redis_t *redis, module *m, const char *key,
6049     void *value, size_t valuesz) {
6050   errno = ENOSYS;
6051   return -1;
6052 }
6053 
pr_redis_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6054 int pr_redis_set_exists(pr_redis_t *redis, module *m, const char *key,
6055     void *value, size_t valuesz) {
6056   errno = ENOSYS;
6057   return -1;
6058 }
6059 
pr_redis_set_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)6060 int pr_redis_set_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
6061     array_header **values, array_header **valueszs) {
6062   errno = ENOSYS;
6063   return -1;
6064 }
6065 
pr_redis_set_remove(pr_redis_t * redis,module * m,const char * key)6066 int pr_redis_set_remove(pr_redis_t *redis, module *m, const char *key) {
6067   errno = ENOSYS;
6068   return -1;
6069 }
6070 
pr_redis_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)6071 int pr_redis_set_setall(pr_redis_t *redis, module *m, const char *key,
6072     array_header *values, array_header *valueszs) {
6073   errno = ENOSYS;
6074   return -1;
6075 }
6076 
pr_redis_sorted_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)6077 int pr_redis_sorted_set_add(pr_redis_t *redis, module *m, const char *key,
6078     void *value, size_t valuesz, float score) {
6079   errno = ENOSYS;
6080   return -1;
6081 }
6082 
pr_redis_sorted_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)6083 int pr_redis_sorted_set_count(pr_redis_t *redis, module *m, const char *key,
6084     uint64_t *count) {
6085   errno = ENOSYS;
6086   return -1;
6087 }
6088 
pr_redis_sorted_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6089 int pr_redis_sorted_set_delete(pr_redis_t *redis, module *m, const char *key,
6090     void *value, size_t valuesz) {
6091   errno = ENOSYS;
6092   return -1;
6093 }
6094 
pr_redis_sorted_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6095 int pr_redis_sorted_set_exists(pr_redis_t *redis, module *m, const char *key,
6096     void *value, size_t valuesz) {
6097   errno = ENOSYS;
6098   return -1;
6099 }
6100 
pr_redis_sorted_set_getn(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)6101 int pr_redis_sorted_set_getn(pool *p, pr_redis_t *redis, module *m,
6102     const char *key, unsigned int offset, unsigned int len,
6103     array_header **values, array_header **valueszs, int flags) {
6104   errno = ENOSYS;
6105   return -1;
6106 }
6107 
pr_redis_sorted_set_incr(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float incr,float * score)6108 int pr_redis_sorted_set_incr(pr_redis_t *redis, module *m, const char *key,
6109     void *value, size_t valuesz, float incr, float *score) {
6110   errno = ENOSYS;
6111   return -1;
6112 }
6113 
pr_redis_sorted_set_remove(pr_redis_t * redis,module * m,const char * key)6114 int pr_redis_sorted_set_remove(pr_redis_t *redis, module *m, const char *key) {
6115   errno = ENOSYS;
6116   return -1;
6117 }
6118 
pr_redis_sorted_set_score(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float * score)6119 int pr_redis_sorted_set_score(pr_redis_t *redis, module *m, const char *key,
6120     void *value, size_t valuesz, float *score) {
6121   errno = ENOSYS;
6122   return -1;
6123 }
6124 
pr_redis_sorted_set_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)6125 int pr_redis_sorted_set_set(pr_redis_t *redis, module *m, const char *key,
6126     void *value, size_t valuesz, float score) {
6127   errno = ENOSYS;
6128   return -1;
6129 }
6130 
pr_redis_sorted_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs,array_header * scores)6131 int pr_redis_sorted_set_setall(pr_redis_t *redis, module *m, const char *key,
6132     array_header *values, array_header *valueszs, array_header *scores) {
6133   errno = ENOSYS;
6134   return -1;
6135 }
6136 
pr_redis_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)6137 int pr_redis_kadd(pr_redis_t *redis, module *m, const char *key, size_t keysz,
6138     void *value, size_t valuesz, time_t expires) {
6139   errno = ENOSYS;
6140   return -1;
6141 }
6142 
pr_redis_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,size_t * valuesz)6143 void *pr_redis_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
6144     size_t keysz, size_t *valuesz) {
6145   errno = ENOSYS;
6146   return NULL;
6147 }
6148 
pr_redis_kget_str(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz)6149 char *pr_redis_kget_str(pool *p, pr_redis_t *redis, module *m, const char *key,
6150     size_t keysz) {
6151   errno = ENOSYS;
6152   return NULL;
6153 }
6154 
pr_redis_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6155 int pr_redis_kremove(pr_redis_t *redis, module *m, const char *key,
6156     size_t keysz) {
6157   errno = ENOSYS;
6158   return -1;
6159 }
6160 
pr_redis_krename(pr_redis_t * redis,module * m,const char * from,size_t fromsz,const char * to,size_t tosz)6161 int pr_redis_krename(pr_redis_t *redis, module *m, const char *from,
6162     size_t fromsz, const char *to, size_t tosz) {
6163   errno = ENOSYS;
6164   return -1;
6165 }
6166 
pr_redis_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)6167 int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
6168     void *value, size_t valuesz, time_t expires) {
6169   errno = ENOSYS;
6170   return -1;
6171 }
6172 
pr_redis_hash_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6173 int pr_redis_hash_kcount(pr_redis_t *redis, module *m, const char *key,
6174     size_t keysz, uint64_t *count) {
6175   errno = ENOSYS;
6176   return -1;
6177 }
6178 
pr_redis_hash_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)6179 int pr_redis_hash_kdelete(pr_redis_t *redis, module *m, const char *key,
6180     size_t keysz, const char *field, size_t fieldsz) {
6181   errno = ENOSYS;
6182   return -1;
6183 }
6184 
pr_redis_hash_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)6185 int pr_redis_hash_kexists(pr_redis_t *redis, module *m, const char *key,
6186     size_t keysz, const char *field, size_t fieldsz) {
6187   errno = ENOSYS;
6188   return -1;
6189 }
6190 
pr_redis_hash_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void ** value,size_t * valuesz)6191 int pr_redis_hash_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
6192     size_t keysz, const char *field, size_t fieldsz, void **value,
6193     size_t *valuesz) {
6194   errno = ENOSYS;
6195   return -1;
6196 }
6197 
pr_redis_hash_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t ** hash)6198 int pr_redis_hash_kgetall(pool *p, pr_redis_t *redis, module *m,
6199     const char *key, size_t keysz, pr_table_t **hash) {
6200   errno = ENOSYS;
6201   return -1;
6202 }
6203 
pr_redis_hash_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,int32_t incr,int64_t * value)6204 int pr_redis_hash_kincr(pr_redis_t *redis, module *m, const char *key,
6205     size_t keysz, const char *field, size_t fieldsz, int32_t incr,
6206     int64_t *value) {
6207   errno = ENOSYS;
6208   return -1;
6209 }
6210 
pr_redis_hash_kkeys(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** fields)6211 int pr_redis_hash_kkeys(pool *p, pr_redis_t *redis, module *m, const char *key,
6212     size_t keysz, array_header **fields) {
6213   errno = ENOSYS;
6214   return -1;
6215 }
6216 
pr_redis_hash_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6217 int pr_redis_hash_kremove(pr_redis_t *redis, module *m, const char *key,
6218     size_t keysz) {
6219   errno = ENOSYS;
6220   return -1;
6221 }
6222 
pr_redis_hash_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void * value,size_t valuesz)6223 int pr_redis_hash_kset(pr_redis_t *redis, module *m, const char *key,
6224     size_t keysz, const char *field, size_t fieldsz, void *value,
6225     size_t valuesz) {
6226   errno = ENOSYS;
6227   return -1;
6228 }
6229 
pr_redis_hash_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t * hash)6230 int pr_redis_hash_ksetall(pr_redis_t *redis, module *m, const char *key,
6231     size_t keysz, pr_table_t *hash) {
6232   errno = ENOSYS;
6233   return -1;
6234 }
6235 
pr_redis_hash_kvalues(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values)6236 int pr_redis_hash_kvalues(pool *p, pr_redis_t *redis, module *m,
6237     const char *key, size_t keysz, array_header **values) {
6238   errno = ENOSYS;
6239   return -1;
6240 }
6241 
pr_redis_list_kappend(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6242 int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
6243     size_t keysz, void *value, size_t valuesz) {
6244   errno = ENOSYS;
6245   return -1;
6246 }
6247 
pr_redis_list_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6248 int pr_redis_list_kcount(pr_redis_t *redis, module *m, const char *key,
6249     size_t keysz, uint64_t *count) {
6250   errno = ENOSYS;
6251   return -1;
6252 }
6253 
pr_redis_list_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6254 int pr_redis_list_kdelete(pr_redis_t *redis, module *m, const char *key,
6255     size_t keysz, void *value, size_t valuesz) {
6256   errno = ENOSYS;
6257   return -1;
6258 }
6259 
pr_redis_list_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx)6260 int pr_redis_list_kexists(pr_redis_t *redis, module *m, const char *key,
6261     size_t keysz, unsigned int idx) {
6262   errno = ENOSYS;
6263   return -1;
6264 }
6265 
pr_redis_list_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void ** value,size_t * valuesz)6266 int pr_redis_list_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
6267     size_t keysz, unsigned int idx, void **value, size_t *valuesz) {
6268   errno = ENOSYS;
6269   return -1;
6270 }
6271 
pr_redis_list_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)6272 int pr_redis_list_kgetall(pool *p, pr_redis_t *redis, module *m,
6273     const char *key, size_t keysz, array_header **values,
6274     array_header **valueszs) {
6275   errno = ENOSYS;
6276   return -1;
6277 }
6278 
pr_redis_list_kpop(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz,int flags)6279 int pr_redis_list_kpop(pool *p, pr_redis_t *redis, module *m, const char *key,
6280     size_t keysz, void **value, size_t *valuesz, int flags) {
6281   errno = ENOSYS;
6282   return -1;
6283 }
6284 
pr_redis_list_kpush(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,int flags)6285 int pr_redis_list_kpush(pr_redis_t *redis, module *m, const char *key,
6286     size_t keysz, void *value, size_t valuesz, int flags) {
6287   errno = ENOSYS;
6288   return -1;
6289 }
6290 
pr_redis_list_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6291 int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
6292     size_t keysz) {
6293   errno = ENOSYS;
6294   return -1;
6295 }
6296 
pr_redis_list_krotate(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz)6297 int pr_redis_list_krotate(pool *p, pr_redis_t *redis, module *m,
6298     const char *key, size_t keysz, void **value, size_t *valuesz) {
6299   errno = ENOSYS;
6300   return -1;
6301 }
6302 
pr_redis_list_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void * value,size_t valuesz)6303 int pr_redis_list_kset(pr_redis_t *redis, module *m, const char *key,
6304     size_t keysz, unsigned int idx, void *value, size_t valuesz) {
6305   errno = ENOSYS;
6306   return -1;
6307 }
6308 
pr_redis_list_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)6309 int pr_redis_list_ksetall(pr_redis_t *redis, module *m, const char *key,
6310     size_t keysz, array_header *values, array_header *valueszs) {
6311   errno = ENOSYS;
6312   return -1;
6313 }
6314 
pr_redis_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6315 int pr_redis_set_kadd(pr_redis_t *redis, module *m, const char *key,
6316     size_t keysz, void *value, size_t valuesz) {
6317   errno = ENOSYS;
6318   return -1;
6319 }
6320 
pr_redis_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6321 int pr_redis_set_kcount(pr_redis_t *redis, module *m, const char *key,
6322     size_t keysz, uint64_t *count) {
6323   errno = ENOSYS;
6324   return -1;
6325 }
6326 
pr_redis_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6327 int pr_redis_set_kdelete(pr_redis_t *redis, module *m, const char *key,
6328     size_t keysz, void *value, size_t valuesz) {
6329   errno = ENOSYS;
6330   return -1;
6331 }
6332 
pr_redis_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6333 int pr_redis_set_kexists(pr_redis_t *redis, module *m, const char *key,
6334     size_t keysz, void *value, size_t valuesz) {
6335   errno = ENOSYS;
6336   return -1;
6337 }
6338 
pr_redis_set_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)6339 int pr_redis_set_kgetall(pool *p, pr_redis_t *redis, module *m, const char *key,
6340     size_t keysz, array_header **values, array_header **valueszs) {
6341   errno = ENOSYS;
6342   return -1;
6343 }
6344 
pr_redis_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6345 int pr_redis_set_kremove(pr_redis_t *redis, module *m, const char *key,
6346     size_t keysz) {
6347   errno = ENOSYS;
6348   return -1;
6349 }
6350 
pr_redis_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)6351 int pr_redis_set_ksetall(pr_redis_t *redis, module *m, const char *key,
6352     size_t keysz, array_header *values, array_header *valueszs) {
6353   errno = ENOSYS;
6354   return -1;
6355 }
6356 
pr_redis_sorted_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)6357 int pr_redis_sorted_set_kadd(pr_redis_t *redis, module *m, const char *key,
6358     size_t keysz, void *value, size_t valuesz, float score) {
6359   errno = ENOSYS;
6360   return -1;
6361 }
6362 
pr_redis_sorted_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6363 int pr_redis_sorted_set_kcount(pr_redis_t *redis, module *m, const char *key,
6364     size_t keysz, uint64_t *count) {
6365   errno = ENOSYS;
6366   return -1;
6367 }
6368 
pr_redis_sorted_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6369 int pr_redis_sorted_set_kdelete(pr_redis_t *redis, module *m, const char *key,
6370     size_t keysz, void *value, size_t valuesz) {
6371   errno = ENOSYS;
6372   return -1;
6373 }
6374 
pr_redis_sorted_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6375 int pr_redis_sorted_set_kexists(pr_redis_t *redis, module *m, const char *key,
6376     size_t keysz, void *value, size_t valuesz) {
6377   errno = ENOSYS;
6378   return -1;
6379 }
6380 
pr_redis_sorted_set_kgetn(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)6381 int pr_redis_sorted_set_kgetn(pool *p, pr_redis_t *redis, module *m,
6382     const char *key, size_t keysz, unsigned int offset, unsigned int len,
6383     array_header **values, array_header **valueszs, int flags) {
6384   errno = ENOSYS;
6385   return -1;
6386 }
6387 
pr_redis_sorted_set_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float incr,float * score)6388 int pr_redis_sorted_set_kincr(pr_redis_t *redis, module *m, const char *key,
6389     size_t keysz, void *value, size_t valuesz, float incr, float *score) {
6390   errno = ENOSYS;
6391   return -1;
6392 }
6393 
pr_redis_sorted_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6394 int pr_redis_sorted_set_kremove(pr_redis_t *redis, module *m, const char *key,
6395     size_t keysz) {
6396   errno = ENOSYS;
6397   return -1;
6398 }
6399 
pr_redis_sorted_set_kscore(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float * score)6400 int pr_redis_sorted_set_kscore(pr_redis_t *redis, module *m, const char *key,
6401     size_t keysz, void *value, size_t valuesz, float *score) {
6402   errno = ENOSYS;
6403   return -1;
6404 }
6405 
pr_redis_sorted_set_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)6406 int pr_redis_sorted_set_kset(pr_redis_t *redis, module *m, const char *key,
6407     size_t keysz, void *value, size_t valuesz, float score) {
6408   errno = ENOSYS;
6409   return -1;
6410 }
6411 
pr_redis_sorted_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs,array_header * scores)6412 int pr_redis_sorted_set_ksetall(pr_redis_t *redis, module *m, const char *key,
6413     size_t keysz, array_header *values, array_header *valueszs,
6414     array_header *scores) {
6415   errno = ENOSYS;
6416   return -1;
6417 }
6418 
pr_redis_sentinel_get_master_addr(pool * p,pr_redis_t * redis,const char * name,pr_netaddr_t ** addr)6419 int pr_redis_sentinel_get_master_addr(pool *p, pr_redis_t *redis,
6420     const char *name, pr_netaddr_t **addr) {
6421   errno = ENOSYS;
6422   return -1;
6423 }
6424 
pr_redis_sentinel_get_masters(pool * p,pr_redis_t * redis,array_header ** masters)6425 int pr_redis_sentinel_get_masters(pool *p, pr_redis_t *redis,
6426     array_header **masters) {
6427   errno = ENOSYS;
6428   return -1;
6429 }
6430 
redis_set_server(const char * server,int port,unsigned long flags,const char * password,const char * db_idx)6431 int redis_set_server(const char *server, int port, unsigned long flags,
6432     const char *password, const char *db_idx) {
6433   errno = ENOSYS;
6434   return -1;
6435 }
6436 
redis_set_sentinels(array_header * sentinels,const char * name)6437 int redis_set_sentinels(array_header *sentinels, const char *name) {
6438   errno = ENOSYS;
6439   return -1;
6440 }
6441 
redis_set_timeouts(unsigned long conn_millis,unsigned long io_millis)6442 int redis_set_timeouts(unsigned long conn_millis, unsigned long io_millis) {
6443   errno = ENOSYS;
6444   return -1;
6445 }
6446 
redis_clear(void)6447 int redis_clear(void) {
6448   errno = ENOSYS;
6449   return -1;
6450 }
6451 
redis_init(void)6452 int redis_init(void) {
6453   errno = ENOSYS;
6454   return -1;
6455 }
6456 
6457 #endif /* PR_USE_REDIS */
6458