1 /*
2 * ProFTPD - FTP server daemon
3 * Copyright (c) 2017-2020 The ProFTPD Project team
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Suite 500, Boston, MA 02110-1335, USA.
18 *
19 * As a special exemption, The ProFTPD Project team and other respective
20 * copyright holders give permission to link this program with OpenSSL, and
21 * distribute the resulting executable, without including the source code for
22 * OpenSSL in the source distribution.
23 */
24
25 /* Redis management */
26
27 #include "conf.h"
28
29 #ifdef PR_USE_REDIS
30
31 #include <hiredis/hiredis.h>
32
33 #ifndef REDIS_CONNECT_RETRIES
34 # define REDIS_CONNECT_RETRIES 10
35 #endif /* REDIS_CONNECT_RETRIES */
36
37 /* When scanning for keys/lists, how many items to request per command? */
38 #define PR_REDIS_SCAN_SIZE 100
39
40 struct redis_rec {
41 pool *pool;
42 module *owner;
43 redisContext *ctx;
44 unsigned long flags;
45
46 /* For tracking the number of "opens"/"closes" on a shared redis_rec,
47 * as the same struct might be used by multiple modules in the same
48 * session, each module doing a conn_get()/conn_close().
49 */
50 unsigned int refcount;
51
52 /* Redis server version. */
53 unsigned int major_version;
54 unsigned int minor_version;
55 unsigned int patch_version;
56
57 /* Table mapping modules to their namespaces */
58 pr_table_t *namespace_tab;
59 };
60
61 static array_header *redis_sentinels = NULL;
62 static const char *redis_sentinel_master = NULL;
63
64 static const char *redis_server = NULL;
65 static int redis_port = -1;
66 static unsigned long redis_flags = 0UL;
67 static const char *redis_username = NULL;
68 static const char *redis_password = NULL;
69 static const char *redis_db_idx = NULL;
70
71 static pr_redis_t *sess_redis = NULL;
72
73 static unsigned long redis_connect_millis = 500;
74 static unsigned long redis_io_millis = 500;
75
76 static const char *trace_channel = "redis";
77
78 static const char *get_reply_type(int reply_type);
79
millis2timeval(struct timeval * tv,unsigned long millis)80 static void millis2timeval(struct timeval *tv, unsigned long millis) {
81 tv->tv_sec = (millis / 1000);
82 tv->tv_usec = (millis - (tv->tv_sec * 1000)) * 1000;
83 }
84
redis_strerror(pool * p,pr_redis_t * redis,int rerrno)85 static const char *redis_strerror(pool *p, pr_redis_t *redis, int rerrno) {
86 const char *err;
87
88 switch (redis->ctx->err) {
89 case REDIS_ERR_IO:
90 err = pstrcat(p, "[io] ", strerror(rerrno), NULL);
91 break;
92
93 case REDIS_ERR_EOF:
94 err = pstrcat(p, "[eof] ", redis->ctx->errstr, NULL);
95 break;
96
97 case REDIS_ERR_PROTOCOL:
98 err = pstrcat(p, "[protocol] ", redis->ctx->errstr, NULL);
99 break;
100
101 case REDIS_ERR_OOM:
102 err = pstrcat(p, "[oom] ", redis->ctx->errstr, NULL);
103 break;
104
105 case REDIS_ERR_OTHER:
106 err = pstrcat(p, "[other] ", redis->ctx->errstr, NULL);
107 break;
108
109 case REDIS_OK:
110 default:
111 err = "OK";
112 break;
113 }
114
115 return err;
116 }
117
conn_reconnect(pool * p,pr_redis_t * redis)118 static int conn_reconnect(pool *p, pr_redis_t *redis) {
119 int xerrno = 0;
120 #ifdef HAVE_HIREDIS_REDISRECONNECT
121 register unsigned int i;
122
123 if (redis->flags & PR_REDIS_CONN_FL_NO_RECONNECT) {
124 errno = EPERM;
125 return -1;
126 }
127
128 /* Use the already-provided REDIS_CONNECT_RETRIES from <hiredis/hiredis.h>
129 * rather than defining our own.
130 *
131 * Currently that is a rather low number (3), so I do not feel the need
132 * for retry delays or exponential backoff at this time.
133 */
134 for (i = 0; i < REDIS_CONNECT_RETRIES; i++) {
135 int res;
136
137 pr_trace_msg(trace_channel, 9, "attempt #%u to reconnect", i+1);
138
139 res = redisReconnect(redis->ctx);
140 xerrno = errno;
141 if (res == REDIS_OK) {
142 pr_trace_msg(trace_channel, 9, "attempt #%u to reconnect succeeded", i+1);
143 return 0;
144 }
145
146 pr_trace_msg(trace_channel, 9, "attempt #%u to reconnect failed: %s",
147 i+ 1, redis_strerror(p, redis, xerrno));
148 }
149 #else
150 xerrno = ENOSYS;
151 #endif /* No redisReconnect() */
152
153 errno = xerrno;
154 return -1;
155 }
156
handle_reply(pr_redis_t * redis,const char * cmd,redisReply * reply)157 static redisReply *handle_reply(pr_redis_t *redis, const char *cmd,
158 redisReply *reply) {
159 int xerrno;
160 pool *tmp_pool;
161
162 if (reply != NULL) {
163 return reply;
164 }
165
166 xerrno = errno;
167 tmp_pool = make_sub_pool(redis->pool);
168 pr_trace_msg(trace_channel, 2, "error executing %s command: %s", cmd,
169 redis_strerror(tmp_pool, redis, xerrno));
170
171 if (redis->ctx->err == REDIS_ERR_IO ||
172 redis->ctx->err == REDIS_ERR_EOF) {
173 int res;
174
175 res = conn_reconnect(tmp_pool, redis);
176 if (res < 0) {
177 pr_trace_msg(trace_channel, 9, "failed to reconnect: %s",
178 strerror(errno));
179 }
180 }
181
182 destroy_pool(tmp_pool);
183 errno = xerrno;
184 return NULL;
185 }
186
ping_server(pr_redis_t * redis)187 static int ping_server(pr_redis_t *redis) {
188 const char *cmd;
189 redisReply *reply;
190
191 cmd = "PING";
192 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
193 reply = redisCommand(redis->ctx, "%s", cmd);
194 reply = handle_reply(redis, cmd, reply);
195 if (reply == NULL) {
196 return -1;
197 }
198
199 /* We COULD assert a "PONG" response here, but really, anything is OK. */
200 pr_trace_msg(trace_channel, 7, "%s reply: %s", cmd, reply->str);
201 freeReplyObject(reply);
202 return 0;
203 }
204
parse_redis_version(pr_redis_t * redis,redisReply * info)205 static void parse_redis_version(pr_redis_t *redis, redisReply *info) {
206 pool *tmp_pool;
207 unsigned int major, minor, patch;
208 char *text, *version_text;
209
210 if (info->type != REDIS_REPLY_STRING) {
211 pr_trace_msg(trace_channel, 1, "expected STRING reply for INFO, got %s",
212 get_reply_type(info->type));
213 return;
214 }
215
216 tmp_pool = make_sub_pool(redis->pool);
217 pr_pool_tag(tmp_pool, "Redis version parsing pool");
218 text = pstrndup(tmp_pool, info->str, info->len);
219
220 /* Scan the entire INFO string for "redis_version:N.N.N". */
221
222 version_text = strstr(text, "redis_version:");
223 if (version_text == NULL) {
224 pr_trace_msg(trace_channel, 1, "no `redis_version` found in INFO reply");
225 destroy_pool(tmp_pool);
226 return;
227 }
228
229 if (sscanf(version_text, "redis_version:%u.%u.%u", &major, &minor,
230 &patch) == 3) {
231 redis->major_version = major;
232 redis->minor_version = minor;
233 redis->patch_version = patch;
234
235 pr_trace_msg(trace_channel, 9,
236 "parsed Redis version %u (major), %u (minor), %u (patch) out of INFO",
237 redis->major_version, redis->minor_version, redis->patch_version);
238
239 } else {
240 pr_trace_msg(trace_channel, 1, "failed to scan Redis version '%s'",
241 version_text);
242 }
243
244 destroy_pool(tmp_pool);
245 }
246
stat_server(pr_redis_t * redis,const char * section)247 static int stat_server(pr_redis_t *redis, const char *section) {
248 const char *cmd;
249 redisReply *reply;
250
251 cmd = "INFO";
252 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
253 reply = redisCommand(redis->ctx, "%s %s", cmd, section);
254 reply = handle_reply(redis, cmd, reply);
255 if (reply == NULL) {
256 return -1;
257 }
258
259 if (pr_trace_get_level(trace_channel) >= 25) {
260 pr_trace_msg(trace_channel, 25, "%s reply: %s", cmd, reply->str);
261
262 } else {
263 pr_trace_msg(trace_channel, 7, "%s reply: (text, %lu bytes)", cmd,
264 (unsigned long) reply->len);
265 }
266
267 if (redis->major_version == 0 &&
268 (strcmp(section, "server") == 0 || strcmp(section, "") == 0)) {
269 /* We are particularly interested in the Redis server version; we key
270 * off of this version to detect when to change our command semantics,
271 * such as for AUTH.
272 *
273 * Thus we parse the server version out of the "server" info, unless
274 * the version is already known.
275 */
276 parse_redis_version(redis, reply);
277 }
278
279 freeReplyObject(reply);
280 return 0;
281 }
282
pr_redis_conn_get(pool * p,unsigned long flags)283 pr_redis_t *pr_redis_conn_get(pool *p, unsigned long flags) {
284 if (p == NULL) {
285 errno = EINVAL;
286 return NULL;
287 }
288
289 if (sess_redis != NULL) {
290 sess_redis->refcount++;
291 return sess_redis;
292 }
293
294 return pr_redis_conn_new(p, NULL, flags);
295 }
296
set_conn_options(pr_redis_t * redis)297 static int set_conn_options(pr_redis_t *redis) {
298 int res, xerrno;
299 struct timeval tv;
300 pool *tmp_pool;
301
302 tmp_pool = make_sub_pool(redis->pool);
303
304 millis2timeval(&tv, redis_io_millis);
305 res = redisSetTimeout(redis->ctx, tv);
306 xerrno = errno;
307
308 if (res == REDIS_ERR) {
309 pr_trace_msg(trace_channel, 4,
310 "error setting %lu ms timeout: %s", redis_io_millis,
311 redis_strerror(tmp_pool, redis, xerrno));
312 }
313
314 #if HIREDIS_MAJOR >= 0 && \
315 HIREDIS_MINOR >= 12
316 res = redisEnableKeepAlive(redis->ctx);
317 xerrno = errno;
318
319 if (res == REDIS_ERR) {
320 pr_trace_msg(trace_channel, 4,
321 "error setting keepalive: %s", redis_strerror(tmp_pool, redis, xerrno));
322 }
323 #endif /* HiRedis 0.12.0 and later */
324
325 destroy_pool(tmp_pool);
326 return 0;
327 }
328
sess_redis_cleanup(void * data)329 static void sess_redis_cleanup(void *data) {
330 sess_redis = NULL;
331 }
332
make_redis_conn(pool * p,const char * host,int port)333 static pr_redis_t *make_redis_conn(pool *p, const char *host, int port) {
334 int uses_ip = TRUE, xerrno;
335 pr_redis_t *redis;
336 pool *sub_pool;
337 redisContext *ctx;
338 struct timeval tv;
339
340 millis2timeval(&tv, redis_connect_millis);
341
342 /* If the given redis "server" string starts with a '/' character, assume
343 * that it is a Unix socket path.
344 */
345 if (*host == '/') {
346 uses_ip = FALSE;
347 ctx = redisConnectUnixWithTimeout(host, tv);
348
349 } else {
350 ctx = redisConnectWithTimeout(host, port, tv);
351 }
352
353 xerrno = errno;
354
355 if (ctx == NULL) {
356 errno = ENOMEM;
357 return NULL;
358 }
359
360 if (ctx->err != 0) {
361 const char *err_type, *err_msg;
362
363 switch (ctx->err) {
364 case REDIS_ERR_IO:
365 err_type = "io";
366 err_msg = strerror(xerrno);
367 break;
368
369 case REDIS_ERR_EOF:
370 err_type = "eof";
371 err_msg = ctx->errstr;
372 break;
373
374 case REDIS_ERR_PROTOCOL:
375 err_type = "protocol";
376 err_msg = ctx->errstr;
377 break;
378
379 case REDIS_ERR_OOM:
380 err_type = "oom";
381 err_msg = ctx->errstr;
382 break;
383
384 case REDIS_ERR_OTHER:
385 err_type = "other";
386 err_msg = ctx->errstr;
387 break;
388
389 default:
390 err_type = "unknown";
391 err_msg = ctx->errstr;
392 break;
393 }
394
395 if (uses_ip == TRUE) {
396 pr_trace_msg(trace_channel, 3,
397 "error connecting to %s#%d: [%s] %s", host, port, err_type, err_msg);
398
399 } else {
400 pr_trace_msg(trace_channel, 3,
401 "error connecting to '%s': [%s] %s", host, err_type, err_msg);
402 }
403
404 redisFree(ctx);
405 errno = EIO;
406 return NULL;
407 }
408
409 sub_pool = make_sub_pool(p);
410 pr_pool_tag(sub_pool, "Redis connection pool");
411
412 redis = pcalloc(sub_pool, sizeof(pr_redis_t));
413 redis->pool = sub_pool;
414 redis->ctx = ctx;
415
416 return redis;
417 }
418
discover_redis_master(pool * p,const char * host,int port,const char * master)419 static int discover_redis_master(pool *p, const char *host, int port,
420 const char *master) {
421 int res = 0, xerrno = 0;
422 pool *tmp_pool;
423 pr_redis_t *redis;
424 pr_netaddr_t *addr = NULL;
425
426 tmp_pool = make_sub_pool(p);
427
428 redis = make_redis_conn(tmp_pool, host, port);
429 xerrno = errno;
430
431 if (redis == NULL) {
432 destroy_pool(tmp_pool);
433
434 errno = xerrno;
435 return -1;
436 }
437
438 if (master == NULL) {
439 array_header *masters = NULL;
440
441 res = pr_redis_sentinel_get_masters(tmp_pool, redis, &masters);
442 if (res < 0) {
443 pr_trace_msg(trace_channel, 14, "error getting masters from Sentinel: %s",
444 strerror(errno));
445
446 } else {
447 master = ((char **) masters->elts)[0];
448 pr_trace_msg(trace_channel, 17, "discovered master '%s'", master);
449 }
450 }
451
452 res = pr_redis_sentinel_get_master_addr(tmp_pool, redis, master, &addr);
453 xerrno = errno;
454
455 if (res < 0) {
456 pr_redis_conn_destroy(redis);
457 destroy_pool(tmp_pool);
458
459 errno = xerrno;
460 return -1;
461 }
462
463 redis_server = pstrdup(p, pr_netaddr_get_ipstr(addr));
464 redis_port = ntohs(pr_netaddr_get_port(addr));
465
466 pr_redis_conn_destroy(redis);
467 destroy_pool(tmp_pool);
468
469 errno = xerrno;
470 return res;
471 }
472
pr_redis_conn_new(pool * p,module * m,unsigned long flags)473 pr_redis_t *pr_redis_conn_new(pool *p, module *m, unsigned long flags) {
474 int default_port, res, xerrno;
475 pr_redis_t *redis;
476 const char *default_host;
477
478 if (p == NULL) {
479 errno = EINVAL;
480 return NULL;
481 }
482
483 default_host = redis_server;
484 default_port = redis_port;
485
486 /* Do we have a list of Sentinels configured? If so, try those first. */
487 if (redis_sentinels != NULL) {
488 register unsigned int i;
489
490 pr_trace_msg(trace_channel, 17,
491 "querying Sentinels (%u count) for Redis server",
492 redis_sentinels->nelts);
493
494 for (i = 0; i < redis_sentinels->nelts; i++) {
495 pr_netaddr_t *addr;
496 const char *sentinel;
497 int port;
498
499 addr = ((pr_netaddr_t **) redis_sentinels->elts)[i];
500 sentinel = pr_netaddr_get_ipstr(addr);
501 port = ntohs(pr_netaddr_get_port(addr));
502
503 if (discover_redis_master(p, sentinel, port,
504 redis_sentinel_master) == 0) {
505 pr_trace_msg(trace_channel, 17,
506 "discovered Redis server %s:%d using Sentinel #%u (%s:%d)",
507 redis_server, redis_port, i+1, sentinel, port);
508 }
509 }
510 }
511
512 /* If the Sentinels failed to provide a usable host, fall back to the
513 * default.
514 */
515 if (redis_server == NULL) {
516 redis_server = default_host;
517 redis_port = default_port;
518 }
519
520 if (redis_server == NULL) {
521 pr_trace_msg(trace_channel, 9, "%s",
522 "unable to create new Redis connection: No server configured");
523 errno = EPERM;
524 return NULL;
525 }
526
527 redis = make_redis_conn(p, redis_server, redis_port);
528 if (redis == NULL) {
529 return NULL;
530 }
531
532 redis->owner = m;
533 redis->refcount = 1;
534 redis->flags = flags;
535
536 /* The namespace table is null; it will be created if/when callers
537 * configure namespace prefixes.
538 */
539 redis->namespace_tab = NULL;
540
541 /* Set some of the desired behavior flags on the connection */
542 res = set_conn_options(redis);
543 if (res < 0) {
544 xerrno = errno;
545
546 pr_redis_conn_destroy(redis);
547 errno = xerrno;
548 return NULL;
549 }
550
551 res = ping_server(redis);
552 if (res < 0) {
553 xerrno = errno;
554
555 pr_redis_conn_destroy(redis);
556 errno = xerrno;
557 return NULL;
558 }
559
560 /* Make sure we are connected to the configured server by querying
561 * some stats/info from it.
562 */
563 res = stat_server(redis, "server");
564 if (res < 0) {
565 xerrno = errno;
566
567 pr_redis_conn_destroy(redis);
568 errno = xerrno;
569 return NULL;
570 }
571
572 if (redis_password != NULL) {
573 if (redis_username != NULL) {
574 res = pr_redis_auth2(redis, redis_username, redis_password);
575
576 } else {
577 res = pr_redis_auth(redis, redis_password);
578 }
579
580 if (res < 0) {
581 xerrno = errno;
582
583 pr_redis_conn_destroy(redis);
584 errno = xerrno;
585 return NULL;
586 }
587 }
588
589 if (redis_db_idx != NULL) {
590 res = pr_redis_select(redis, redis_db_idx);
591 if (res < 0) {
592 xerrno = errno;
593
594 pr_redis_conn_destroy(redis);
595 errno = xerrno;
596 return NULL;
597 }
598 }
599
600 if (sess_redis == NULL) {
601 sess_redis = redis;
602
603 /* Register a cleanup on this redis, so that when it is destroyed, we
604 * clear this sess_redis pointer, lest it remaining dangling.
605 */
606 register_cleanup2(redis->pool, NULL, sess_redis_cleanup);
607 }
608
609 return redis;
610 }
611
612 /* Return TRUE if we actually closed the connection, FALSE if we simply
613 * decremented the refcount.
614 */
pr_redis_conn_close(pr_redis_t * redis)615 int pr_redis_conn_close(pr_redis_t *redis) {
616 int closed = FALSE;
617
618 if (redis == NULL) {
619 errno = EINVAL;
620 return -1;
621 }
622
623 if (redis->refcount > 0) {
624 redis->refcount--;
625 }
626
627 if (redis->refcount == 0) {
628 if (redis->ctx != NULL) {
629 const char *cmd = NULL;
630 redisReply *reply;
631
632 cmd = "QUIT";
633 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
634 reply = redisCommand(redis->ctx, "%s", cmd);
635 if (reply != NULL) {
636 freeReplyObject(reply);
637 }
638
639 redisFree(redis->ctx);
640 redis->ctx = NULL;
641 }
642
643 if (redis->namespace_tab != NULL) {
644 (void) pr_table_empty(redis->namespace_tab);
645 (void) pr_table_free(redis->namespace_tab);
646 redis->namespace_tab = NULL;
647 }
648
649 closed = TRUE;
650 }
651
652 return closed;
653 }
654
655 /* Return TRUE if we actually closed the connection, FALSE if we simply
656 * decremented the refcount.
657 */
pr_redis_conn_destroy(pr_redis_t * redis)658 int pr_redis_conn_destroy(pr_redis_t *redis) {
659 int closed, destroyed = FALSE;
660
661 if (redis == NULL) {
662 errno = EINVAL;
663 return -1;
664 }
665
666 closed = pr_redis_conn_close(redis);
667 if (closed < 0) {
668 return -1;
669 }
670
671 if (closed == TRUE) {
672 if (redis == sess_redis) {
673 sess_redis = NULL;
674 }
675
676 destroy_pool(redis->pool);
677 destroyed = TRUE;
678 }
679
680 return destroyed;
681 }
682
modptr_cmp_cb(const void * k1,size_t ksz1,const void * k2,size_t ksz2)683 static int modptr_cmp_cb(const void *k1, size_t ksz1, const void *k2,
684 size_t ksz2) {
685
686 /* Return zero to indicate a match, non-zero otherwise. */
687 return (((module *) k1) == ((module *) k2) ? 0 : 1);
688 }
689
modptr_hash_cb(const void * k,size_t ksz)690 static unsigned int modptr_hash_cb(const void *k, size_t ksz) {
691 unsigned int key = 0;
692
693 /* XXX Yes, this is a bit hacky for "hashing" a pointer value. */
694
695 memcpy(&key, k, sizeof(key));
696 key ^= (key >> 16);
697
698 return key;
699 }
700
pr_redis_conn_set_namespace(pr_redis_t * redis,module * m,const void * prefix,size_t prefixsz)701 int pr_redis_conn_set_namespace(pr_redis_t *redis, module *m,
702 const void *prefix, size_t prefixsz) {
703
704 if (redis == NULL ||
705 m == NULL) {
706 errno = EINVAL;
707 return -1;
708 }
709
710 if (prefix != NULL &&
711 prefixsz == 0) {
712 errno = EINVAL;
713 return -1;
714 }
715
716 if (redis->namespace_tab == NULL) {
717 pr_table_t *tab;
718
719 tab = pr_table_alloc(redis->pool, 0);
720
721 (void) pr_table_ctl(tab, PR_TABLE_CTL_SET_KEY_CMP, modptr_cmp_cb);
722 (void) pr_table_ctl(tab, PR_TABLE_CTL_SET_KEY_HASH, modptr_hash_cb);
723 redis->namespace_tab = tab;
724 }
725
726 if (prefix != NULL) {
727 int count;
728 void *val;
729 size_t valsz;
730
731 valsz = prefixsz;
732 val = palloc(redis->pool, valsz);
733 memcpy(val, prefix, prefixsz);
734
735 count = pr_table_kexists(redis->namespace_tab, m, sizeof(module *));
736 if (count <= 0) {
737 (void) pr_table_kadd(redis->namespace_tab, m, sizeof(module *), val,
738 valsz);
739
740 } else {
741 (void) pr_table_kset(redis->namespace_tab, m, sizeof(module *), val,
742 valsz);
743 }
744
745 } else {
746 /* A NULL prefix means the caller is removing their namespace mapping. */
747 (void) pr_table_kremove(redis->namespace_tab, m, sizeof(module *), NULL);
748 }
749
750 return 0;
751 }
752
pr_redis_conn_get_version(pr_redis_t * redis,unsigned int * major_version,unsigned int * minor_version,unsigned int * patch_version)753 int pr_redis_conn_get_version(pr_redis_t *redis, unsigned int *major_version,
754 unsigned int *minor_version, unsigned int *patch_version) {
755
756 if (redis == NULL) {
757 errno = EINVAL;
758 return -1;
759 }
760
761 if (major_version == NULL &&
762 minor_version == NULL &&
763 patch_version == NULL) {
764 errno = EINVAL;
765 return -1;
766 }
767
768 if (major_version != NULL) {
769 *major_version = redis->major_version;
770 }
771
772 if (minor_version != NULL) {
773 *minor_version = redis->minor_version;
774 }
775
776 if (patch_version != NULL) {
777 *patch_version = redis->patch_version;
778 }
779
780 return 0;
781 }
782
pr_redis_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)783 int pr_redis_add(pr_redis_t *redis, module *m, const char *key, void *value,
784 size_t valuesz, time_t expires) {
785 int res;
786
787 if (key == NULL) {
788 errno = EINVAL;
789 return -1;
790 }
791
792 res = pr_redis_kadd(redis, m, key, strlen(key), value, valuesz, expires);
793 if (res < 0) {
794 int xerrno = errno;
795
796 pr_trace_msg(trace_channel, 2,
797 "error adding key '%s', value (%lu bytes): %s", key,
798 (unsigned long) valuesz, strerror(xerrno));
799
800 errno = xerrno;
801 return -1;
802 }
803
804 return 0;
805 }
806
pr_redis_decr(pr_redis_t * redis,module * m,const char * key,uint32_t decr,uint64_t * value)807 int pr_redis_decr(pr_redis_t *redis, module *m, const char *key, uint32_t decr,
808 uint64_t *value) {
809 int res;
810
811 if (key == NULL) {
812 errno = EINVAL;
813 return -1;
814 }
815
816 res = pr_redis_kdecr(redis, m, key, strlen(key), decr, value);
817 if (res < 0) {
818 int xerrno = errno;
819
820 pr_trace_msg(trace_channel, 2,
821 "error decrementing key '%s' by %lu: %s", key,
822 (unsigned long) decr, strerror(xerrno));
823
824 errno = xerrno;
825 return -1;
826 }
827
828 return 0;
829 }
830
pr_redis_get(pool * p,pr_redis_t * redis,module * m,const char * key,size_t * valuesz)831 void *pr_redis_get(pool *p, pr_redis_t *redis, module *m, const char *key,
832 size_t *valuesz) {
833 void *ptr = NULL;
834
835 if (key == NULL) {
836 errno = EINVAL;
837 return NULL;
838 }
839
840 ptr = pr_redis_kget(p, redis, m, key, strlen(key), valuesz);
841 if (ptr == NULL) {
842 int xerrno = errno;
843
844 pr_trace_msg(trace_channel, 2,
845 "error getting data for key '%s': %s", key, strerror(xerrno));
846
847 errno = xerrno;
848 return NULL;
849 }
850
851 return ptr;
852 }
853
pr_redis_get_str(pool * p,pr_redis_t * redis,module * m,const char * key)854 char *pr_redis_get_str(pool *p, pr_redis_t *redis, module *m, const char *key) {
855 char *ptr = NULL;
856
857 if (key == NULL) {
858 errno = EINVAL;
859 return NULL;
860 }
861
862 ptr = pr_redis_kget_str(p, redis, m, key, strlen(key));
863 if (ptr == NULL) {
864 int xerrno = errno;
865
866 pr_trace_msg(trace_channel, 2,
867 "error getting data for key '%s': %s", key, strerror(xerrno));
868
869 errno = xerrno;
870 return NULL;
871 }
872
873 return ptr;
874 }
875
pr_redis_incr(pr_redis_t * redis,module * m,const char * key,uint32_t incr,uint64_t * value)876 int pr_redis_incr(pr_redis_t *redis, module *m, const char *key, uint32_t incr,
877 uint64_t *value) {
878 int res;
879
880 if (key == NULL) {
881 errno = EINVAL;
882 return -1;
883 }
884
885 res = pr_redis_kincr(redis, m, key, strlen(key), incr, value);
886 if (res < 0) {
887 int xerrno = errno;
888
889 pr_trace_msg(trace_channel, 2,
890 "error incrementing key '%s' by %lu: %s", key,
891 (unsigned long) incr, strerror(xerrno));
892
893 errno = xerrno;
894 return -1;
895 }
896
897 return 0;
898 }
899
pr_redis_remove(pr_redis_t * redis,module * m,const char * key)900 int pr_redis_remove(pr_redis_t *redis, module *m, const char *key) {
901 int res;
902
903 if (key == NULL) {
904 errno = EINVAL;
905 return -1;
906 }
907
908 res = pr_redis_kremove(redis, m, key, strlen(key));
909 if (res < 0) {
910 int xerrno = errno;
911
912 pr_trace_msg(trace_channel, 2,
913 "error removing key '%s': %s", key, strerror(xerrno));
914
915 errno = xerrno;
916 return -1;
917 }
918
919 return 0;
920 }
921
pr_redis_rename(pr_redis_t * redis,module * m,const char * from,const char * to)922 int pr_redis_rename(pr_redis_t *redis, module *m, const char *from,
923 const char *to) {
924 int res;
925
926 if (from == NULL ||
927 to == NULL) {
928 errno = EINVAL;
929 return -1;
930 }
931
932 res = pr_redis_krename(redis, m, from, strlen(from), to, strlen(to));
933 if (res < 0) {
934 int xerrno = errno;
935
936 pr_trace_msg(trace_channel, 2,
937 "error renaming key '%s' to '%s': %s", from, to, strerror(xerrno));
938
939 errno = xerrno;
940 return -1;
941 }
942
943 return 0;
944 }
945
pr_redis_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)946 int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
947 size_t valuesz, time_t expires) {
948 int res;
949
950 if (key == NULL) {
951 errno = EINVAL;
952 return -1;
953 }
954
955 res = pr_redis_kset(redis, m, key, strlen(key), value, valuesz, expires);
956 if (res < 0) {
957 int xerrno = errno;
958
959 pr_trace_msg(trace_channel, 2,
960 "error setting key '%s', value (%lu bytes): %s", key,
961 (unsigned long) valuesz, strerror(xerrno));
962
963 errno = xerrno;
964 return -1;
965 }
966
967 return 0;
968 }
969
970 /* Hash operations */
pr_redis_hash_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)971 int pr_redis_hash_count(pr_redis_t *redis, module *m, const char *key,
972 uint64_t *count) {
973 int res;
974
975 if (key == NULL) {
976 errno = EINVAL;
977 return -1;
978 }
979
980 res = pr_redis_hash_kcount(redis, m, key, strlen(key), count);
981 if (res < 0) {
982 int xerrno = errno;
983
984 pr_trace_msg(trace_channel, 2,
985 "error counting hash using key '%s': %s", key, strerror(xerrno));
986
987 errno = xerrno;
988 return -1;
989 }
990
991 return 0;
992 }
993
pr_redis_hash_delete(pr_redis_t * redis,module * m,const char * key,const char * field)994 int pr_redis_hash_delete(pr_redis_t *redis, module *m, const char *key,
995 const char *field) {
996 int res;
997
998 if (key == NULL ||
999 field == NULL) {
1000 errno = EINVAL;
1001 return -1;
1002 }
1003
1004 res = pr_redis_hash_kdelete(redis, m, key, strlen(key), field, strlen(field));
1005 if (res < 0) {
1006 int xerrno = errno;
1007
1008 pr_trace_msg(trace_channel, 2,
1009 "error deleting field from hash using key '%s', field '%s': %s", key,
1010 field, strerror(xerrno));
1011
1012 errno = xerrno;
1013 return -1;
1014 }
1015
1016 return 0;
1017 }
1018
pr_redis_hash_exists(pr_redis_t * redis,module * m,const char * key,const char * field)1019 int pr_redis_hash_exists(pr_redis_t *redis, module *m, const char *key,
1020 const char *field) {
1021 int res;
1022
1023 if (key == NULL ||
1024 field == NULL) {
1025 errno = EINVAL;
1026 return -1;
1027 }
1028
1029 res = pr_redis_hash_kexists(redis, m, key, strlen(key), field, strlen(field));
1030 if (res < 0) {
1031 int xerrno = errno;
1032
1033 pr_trace_msg(trace_channel, 2,
1034 "error checking existence of hash using key '%s', field '%s': %s", key,
1035 field, strerror(xerrno));
1036
1037 errno = xerrno;
1038 return -1;
1039 }
1040
1041 return res;
1042 }
1043
pr_redis_hash_get(pool * p,pr_redis_t * redis,module * m,const char * key,const char * field,void ** value,size_t * valuesz)1044 int pr_redis_hash_get(pool *p, pr_redis_t *redis, module *m, const char *key,
1045 const char *field, void **value, size_t *valuesz) {
1046 int res;
1047
1048 if (key == NULL ||
1049 field == NULL) {
1050 errno = EINVAL;
1051 return -1;
1052 }
1053
1054 res = pr_redis_hash_kget(p, redis, m, key, strlen(key), field, strlen(field),
1055 value, valuesz);
1056 if (res < 0) {
1057 int xerrno = errno;
1058
1059 pr_trace_msg(trace_channel, 2,
1060 "error getting field from hash using key '%s', field '%s': %s", key,
1061 field, strerror(xerrno));
1062
1063 errno = xerrno;
1064 return -1;
1065 }
1066
1067 return 0;
1068 }
1069
pr_redis_hash_getall(pool * p,pr_redis_t * redis,module * m,const char * key,pr_table_t ** hash)1070 int pr_redis_hash_getall(pool *p, pr_redis_t *redis, module *m,
1071 const char *key, pr_table_t **hash) {
1072 int res;
1073
1074 if (key == NULL) {
1075 errno = EINVAL;
1076 return -1;
1077 }
1078
1079 res = pr_redis_hash_kgetall(p, redis, m, key, strlen(key), hash);
1080 if (res < 0) {
1081 int xerrno = errno;
1082
1083 pr_trace_msg(trace_channel, 2,
1084 "error entire hash using key '%s': %s", key, strerror(xerrno));
1085
1086 errno = xerrno;
1087 return -1;
1088 }
1089
1090 return 0;
1091 }
1092
pr_redis_hash_incr(pr_redis_t * redis,module * m,const char * key,const char * field,int32_t incr,int64_t * value)1093 int pr_redis_hash_incr(pr_redis_t *redis, module *m, const char *key,
1094 const char *field, int32_t incr, int64_t *value) {
1095 int res;
1096
1097 if (key == NULL ||
1098 field == NULL) {
1099 errno = EINVAL;
1100 return -1;
1101 }
1102
1103 res = pr_redis_hash_kincr(redis, m, key, strlen(key), field, strlen(field),
1104 incr, value);
1105 if (res < 0) {
1106 int xerrno = errno;
1107
1108 pr_trace_msg(trace_channel, 2,
1109 "error incrementing field in hash using key '%s', field '%s': %s", key,
1110 field, strerror(xerrno));
1111
1112 errno = xerrno;
1113 return -1;
1114 }
1115
1116 return 0;
1117 }
1118
pr_redis_hash_keys(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** fields)1119 int pr_redis_hash_keys(pool *p, pr_redis_t *redis, module *m, const char *key,
1120 array_header **fields) {
1121 int res;
1122
1123 if (key == NULL) {
1124 errno = EINVAL;
1125 return -1;
1126 }
1127
1128 res = pr_redis_hash_kkeys(p, redis, m, key, strlen(key), fields);
1129 if (res < 0) {
1130 int xerrno = errno;
1131
1132 pr_trace_msg(trace_channel, 2,
1133 "error obtaining keys from hash using key '%s': %s", key,
1134 strerror(xerrno));
1135
1136 errno = xerrno;
1137 return -1;
1138 }
1139
1140 return 0;
1141 }
1142
pr_redis_hash_remove(pr_redis_t * redis,module * m,const char * key)1143 int pr_redis_hash_remove(pr_redis_t *redis, module *m, const char *key) {
1144 int res;
1145
1146 if (key == NULL) {
1147 errno = EINVAL;
1148 return -1;
1149 }
1150
1151 res = pr_redis_hash_kremove(redis, m, key, strlen(key));
1152 if (res < 0) {
1153 int xerrno = errno;
1154
1155 pr_trace_msg(trace_channel, 2,
1156 "error removing hash using key '%s': %s", key, strerror(xerrno));
1157
1158 errno = xerrno;
1159 return -1;
1160 }
1161
1162 return 0;
1163 }
1164
pr_redis_hash_set(pr_redis_t * redis,module * m,const char * key,const char * field,void * value,size_t valuesz)1165 int pr_redis_hash_set(pr_redis_t *redis, module *m, const char *key,
1166 const char *field, void *value, size_t valuesz) {
1167 int res;
1168
1169 if (key == NULL ||
1170 field == NULL) {
1171 errno = EINVAL;
1172 return -1;
1173 }
1174
1175 res = pr_redis_hash_kset(redis, m, key, strlen(key), field, strlen(field),
1176 value, valuesz);
1177 if (res < 0) {
1178 int xerrno = errno;
1179
1180 pr_trace_msg(trace_channel, 2,
1181 "error setting field in hash using key '%s', field '%s': %s", key, field,
1182 strerror(xerrno));
1183
1184 errno = xerrno;
1185 return -1;
1186 }
1187
1188 return 0;
1189 }
1190
pr_redis_hash_setall(pr_redis_t * redis,module * m,const char * key,pr_table_t * hash)1191 int pr_redis_hash_setall(pr_redis_t *redis, module *m, const char *key,
1192 pr_table_t *hash) {
1193 int res;
1194
1195 if (key == NULL) {
1196 errno = EINVAL;
1197 return -1;
1198 }
1199
1200 res = pr_redis_hash_ksetall(redis, m, key, strlen(key), hash);
1201 if (res < 0) {
1202 int xerrno = errno;
1203
1204 pr_trace_msg(trace_channel, 2,
1205 "error setting hash using key '%s': %s", key, strerror(xerrno));
1206
1207 errno = xerrno;
1208 return -1;
1209 }
1210
1211 return 0;
1212 }
1213
pr_redis_hash_values(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values)1214 int pr_redis_hash_values(pool *p, pr_redis_t *redis, module *m,
1215 const char *key, array_header **values) {
1216 int res;
1217
1218 if (key == NULL) {
1219 errno = EINVAL;
1220 return -1;
1221 }
1222
1223 res = pr_redis_hash_kvalues(p, redis, m, key, strlen(key), values);
1224 if (res < 0) {
1225 int xerrno = errno;
1226
1227 pr_trace_msg(trace_channel, 2,
1228 "error getting values of hash using key '%s': %s", key, strerror(xerrno));
1229
1230 errno = xerrno;
1231 return -1;
1232 }
1233
1234 return 0;
1235 }
1236
1237 /* List operations */
pr_redis_list_append(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1238 int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
1239 void *value, size_t valuesz) {
1240 int res;
1241
1242 if (key == NULL) {
1243 errno = EINVAL;
1244 return -1;
1245 }
1246
1247 res = pr_redis_list_kappend(redis, m, key, strlen(key), value, valuesz);
1248 if (res < 0) {
1249 int xerrno = errno;
1250
1251 pr_trace_msg(trace_channel, 2,
1252 "error appending to list using key '%s': %s", key, strerror(xerrno));
1253
1254 errno = xerrno;
1255 return -1;
1256 }
1257
1258 return 0;
1259 }
1260
pr_redis_list_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)1261 int pr_redis_list_count(pr_redis_t *redis, module *m, const char *key,
1262 uint64_t *count) {
1263 int res;
1264
1265 if (key == NULL) {
1266 errno = EINVAL;
1267 return -1;
1268 }
1269
1270 res = pr_redis_list_kcount(redis, m, key, strlen(key), count);
1271 if (res < 0) {
1272 int xerrno = errno;
1273
1274 pr_trace_msg(trace_channel, 2,
1275 "error counting list using key '%s': %s", key, strerror(xerrno));
1276
1277 errno = xerrno;
1278 return -1;
1279 }
1280
1281 return 0;
1282 }
1283
pr_redis_list_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1284 int pr_redis_list_delete(pr_redis_t *redis, module *m, const char *key,
1285 void *value, size_t valuesz) {
1286 int res;
1287
1288 if (value == NULL) {
1289 errno = EINVAL;
1290 return -1;
1291 }
1292
1293 res = pr_redis_list_kdelete(redis, m, key, strlen(key), value, valuesz);
1294 if (res < 0) {
1295 int xerrno = errno;
1296
1297 pr_trace_msg(trace_channel, 2,
1298 "error deleting item from list using key '%s': %s", key,
1299 strerror(xerrno));
1300
1301 errno = xerrno;
1302 return -1;
1303 }
1304
1305 return 0;
1306 }
1307
pr_redis_list_exists(pr_redis_t * redis,module * m,const char * key,unsigned int idx)1308 int pr_redis_list_exists(pr_redis_t *redis, module *m, const char *key,
1309 unsigned int idx) {
1310 int res;
1311
1312 if (key == NULL) {
1313 errno = EINVAL;
1314 return -1;
1315 }
1316
1317 res = pr_redis_list_kexists(redis, m, key, strlen(key), idx);
1318 if (res < 0) {
1319 int xerrno = errno;
1320
1321 pr_trace_msg(trace_channel, 2,
1322 "error checking item at index %u in list using key '%s': %s", idx, key,
1323 strerror(xerrno));
1324
1325 errno = xerrno;
1326 return -1;
1327 }
1328
1329 return res;
1330 }
1331
pr_redis_list_get(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int idx,void ** value,size_t * valuesz)1332 int pr_redis_list_get(pool *p, pr_redis_t *redis, module *m, const char *key,
1333 unsigned int idx, void **value, size_t *valuesz) {
1334 int res;
1335
1336 if (key == NULL) {
1337 errno = EINVAL;
1338 return -1;
1339 }
1340
1341 res = pr_redis_list_kget(p, redis, m, key, strlen(key), idx, value, valuesz);
1342 if (res < 0) {
1343 int xerrno = errno;
1344
1345 pr_trace_msg(trace_channel, 2,
1346 "error getting item at index %u in list using key '%s': %s", idx, key,
1347 strerror(xerrno));
1348
1349 errno = xerrno;
1350 return -1;
1351 }
1352
1353 return res;
1354 }
1355
pr_redis_list_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)1356 int pr_redis_list_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
1357 array_header **values, array_header **valueszs) {
1358 int res;
1359
1360 if (key == NULL) {
1361 errno = EINVAL;
1362 return -1;
1363 }
1364
1365 res = pr_redis_list_kgetall(p, redis, m, key, strlen(key), values, valueszs);
1366 if (res < 0) {
1367 int xerrno = errno;
1368
1369 pr_trace_msg(trace_channel, 2,
1370 "error getting items in list using key '%s': %s", key, strerror(xerrno));
1371
1372 errno = xerrno;
1373 return -1;
1374 }
1375
1376 return res;
1377 }
1378
pr_redis_list_pop(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz,int flags)1379 int pr_redis_list_pop(pool *p, pr_redis_t *redis, module *m, const char *key,
1380 void **value, size_t *valuesz, int flags) {
1381 int res;
1382
1383 if (key == NULL) {
1384 errno = EINVAL;
1385 return -1;
1386 }
1387
1388 res = pr_redis_list_kpop(p, redis, m, key, strlen(key), value, valuesz,
1389 flags);
1390 if (res < 0) {
1391 int xerrno = errno;
1392
1393 pr_trace_msg(trace_channel, 2,
1394 "error popping item from list using key '%s': %s", key, strerror(xerrno));
1395
1396 errno = xerrno;
1397 return -1;
1398 }
1399
1400 return res;
1401 }
1402
pr_redis_list_push(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,int flags)1403 int pr_redis_list_push(pr_redis_t *redis, module *m, const char *key,
1404 void *value, size_t valuesz, int flags) {
1405 int res;
1406
1407 if (key == NULL) {
1408 errno = EINVAL;
1409 return -1;
1410 }
1411
1412 res = pr_redis_list_kpush(redis, m, key, strlen(key), value, valuesz, flags);
1413 if (res < 0) {
1414 int xerrno = errno;
1415
1416 pr_trace_msg(trace_channel, 2,
1417 "error pushing item into list using key '%s': %s", key, strerror(xerrno));
1418
1419 errno = xerrno;
1420 return -1;
1421 }
1422
1423 return 0;
1424 }
1425
pr_redis_list_remove(pr_redis_t * redis,module * m,const char * key)1426 int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key) {
1427 int res;
1428
1429 if (key == NULL) {
1430 errno = EINVAL;
1431 return -1;
1432 }
1433
1434 res = pr_redis_list_kremove(redis, m, key, strlen(key));
1435 if (res < 0) {
1436 int xerrno = errno;
1437
1438 pr_trace_msg(trace_channel, 2,
1439 "error removing list using key '%s': %s", key, strerror(xerrno));
1440
1441 errno = xerrno;
1442 return -1;
1443 }
1444
1445 return 0;
1446 }
1447
pr_redis_list_rotate(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz)1448 int pr_redis_list_rotate(pool *p, pr_redis_t *redis, module *m,
1449 const char *key, void **value, size_t *valuesz) {
1450 int res;
1451
1452 if (key == NULL) {
1453 errno = EINVAL;
1454 return -1;
1455 }
1456
1457 res = pr_redis_list_krotate(p, redis, m, key, strlen(key), value, valuesz);
1458 if (res < 0) {
1459 int xerrno = errno;
1460
1461 pr_trace_msg(trace_channel, 2,
1462 "error rotating list using key '%s': %s", key, strerror(xerrno));
1463
1464 errno = xerrno;
1465 return -1;
1466 }
1467
1468 return 0;
1469 }
1470
pr_redis_list_set(pr_redis_t * redis,module * m,const char * key,unsigned int idx,void * value,size_t valuesz)1471 int pr_redis_list_set(pr_redis_t *redis, module *m, const char *key,
1472 unsigned int idx, void *value, size_t valuesz) {
1473 int res;
1474
1475 if (key == NULL) {
1476 errno = EINVAL;
1477 return -1;
1478 }
1479
1480 res = pr_redis_list_kset(redis, m, key, strlen(key), idx, value, valuesz);
1481 if (res < 0) {
1482 int xerrno = errno;
1483
1484 pr_trace_msg(trace_channel, 2,
1485 "error setting item in list using key '%s', index %u: %s", key, idx,
1486 strerror(xerrno));
1487
1488 errno = xerrno;
1489 return -1;
1490 }
1491
1492 return 0;
1493 }
1494
pr_redis_list_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)1495 int pr_redis_list_setall(pr_redis_t *redis, module *m, const char *key,
1496 array_header *values, array_header *valueszs) {
1497 int res;
1498
1499 if (key == NULL) {
1500 errno = EINVAL;
1501 return -1;
1502 }
1503
1504 res = pr_redis_list_ksetall(redis, m, key, strlen(key), values, valueszs);
1505 if (res < 0) {
1506 int xerrno = errno;
1507
1508 pr_trace_msg(trace_channel, 2,
1509 "error setting items in list using key '%s': %s", key, strerror(xerrno));
1510
1511 errno = xerrno;
1512 return -1;
1513 }
1514
1515 return 0;
1516 }
1517
1518 /* Set operations */
pr_redis_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1519 int pr_redis_set_add(pr_redis_t *redis, module *m, const char *key,
1520 void *value, size_t valuesz) {
1521 int res;
1522
1523 if (key == NULL) {
1524 errno = EINVAL;
1525 return -1;
1526 }
1527
1528 res = pr_redis_set_kadd(redis, m, key, strlen(key), value, valuesz);
1529 if (res < 0) {
1530 int xerrno = errno;
1531
1532 pr_trace_msg(trace_channel, 2,
1533 "error adding item to set using key '%s': %s", key, strerror(xerrno));
1534
1535 errno = xerrno;
1536 return -1;
1537 }
1538
1539 return 0;
1540 }
1541
pr_redis_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)1542 int pr_redis_set_count(pr_redis_t *redis, module *m, const char *key,
1543 uint64_t *count) {
1544 int res;
1545
1546 if (key == NULL) {
1547 errno = EINVAL;
1548 return -1;
1549 }
1550
1551 res = pr_redis_set_kcount(redis, m, key, strlen(key), count);
1552 if (res < 0) {
1553 int xerrno = errno;
1554
1555 pr_trace_msg(trace_channel, 2,
1556 "error counting set using key '%s': %s", key, strerror(xerrno));
1557
1558 errno = xerrno;
1559 return -1;
1560 }
1561
1562 return 0;
1563 }
1564
pr_redis_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1565 int pr_redis_set_delete(pr_redis_t *redis, module *m, const char *key,
1566 void *value, size_t valuesz) {
1567 int res;
1568
1569 if (key == NULL) {
1570 errno = EINVAL;
1571 return -1;
1572 }
1573
1574 res = pr_redis_set_kdelete(redis, m, key, strlen(key), value, valuesz);
1575 if (res < 0) {
1576 int xerrno = errno;
1577
1578 pr_trace_msg(trace_channel, 2,
1579 "error deleting item from set using key '%s': %s", key, strerror(xerrno));
1580
1581 errno = xerrno;
1582 return -1;
1583 }
1584
1585 return 0;
1586 }
1587
pr_redis_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1588 int pr_redis_set_exists(pr_redis_t *redis, module *m, const char *key,
1589 void *value, size_t valuesz) {
1590 int res;
1591
1592 if (key == NULL) {
1593 errno = EINVAL;
1594 return -1;
1595 }
1596
1597 res = pr_redis_set_kexists(redis, m, key, strlen(key), value, valuesz);
1598 if (res < 0) {
1599 int xerrno = errno;
1600
1601 pr_trace_msg(trace_channel, 2,
1602 "error checking item in set using key '%s': %s", key, strerror(xerrno));
1603
1604 errno = xerrno;
1605 return -1;
1606 }
1607
1608 return res;
1609 }
1610
pr_redis_set_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)1611 int pr_redis_set_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
1612 array_header **values, array_header **valueszs) {
1613 int res;
1614
1615 if (key == NULL) {
1616 errno = EINVAL;
1617 return -1;
1618 }
1619
1620 res = pr_redis_set_kgetall(p, redis, m, key, strlen(key), values, valueszs);
1621 if (res < 0) {
1622 int xerrno = errno;
1623
1624 pr_trace_msg(trace_channel, 2,
1625 "error getting items in set using key '%s': %s", key, strerror(xerrno));
1626
1627 errno = xerrno;
1628 return -1;
1629 }
1630
1631 return res;
1632 }
1633
pr_redis_set_remove(pr_redis_t * redis,module * m,const char * key)1634 int pr_redis_set_remove(pr_redis_t *redis, module *m, const char *key) {
1635 int res;
1636
1637 if (key == NULL) {
1638 errno = EINVAL;
1639 return -1;
1640 }
1641
1642 res = pr_redis_set_kremove(redis, m, key, strlen(key));
1643 if (res < 0) {
1644 int xerrno = errno;
1645
1646 pr_trace_msg(trace_channel, 2,
1647 "error removing set using key '%s': %s", key, strerror(xerrno));
1648
1649 errno = xerrno;
1650 return -1;
1651 }
1652
1653 return 0;
1654 }
1655
pr_redis_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)1656 int pr_redis_set_setall(pr_redis_t *redis, module *m, const char *key,
1657 array_header *values, array_header *valueszs) {
1658 int res;
1659
1660 if (key == NULL) {
1661 errno = EINVAL;
1662 return -1;
1663 }
1664
1665 res = pr_redis_set_ksetall(redis, m, key, strlen(key), values, valueszs);
1666 if (res < 0) {
1667 int xerrno = errno;
1668
1669 pr_trace_msg(trace_channel, 2,
1670 "error setting items in set using key '%s': %s", key, strerror(xerrno));
1671
1672 errno = xerrno;
1673 return -1;
1674 }
1675
1676 return 0;
1677 }
1678
1679 /* Sorted Set operations */
pr_redis_sorted_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)1680 int pr_redis_sorted_set_add(pr_redis_t *redis, module *m, const char *key,
1681 void *value, size_t valuesz, float score) {
1682 int res;
1683
1684 if (key == NULL) {
1685 errno = EINVAL;
1686 return -1;
1687 }
1688
1689 res = pr_redis_sorted_set_kadd(redis, m, key, strlen(key), value, valuesz,
1690 score);
1691 if (res < 0) {
1692 int xerrno = errno;
1693
1694 pr_trace_msg(trace_channel, 2,
1695 "error adding item with score %0.3f to sorted set using key '%s': %s",
1696 score, key, strerror(xerrno));
1697
1698 errno = xerrno;
1699 return -1;
1700 }
1701
1702 return 0;
1703 }
1704
pr_redis_sorted_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)1705 int pr_redis_sorted_set_count(pr_redis_t *redis, module *m, const char *key,
1706 uint64_t *count) {
1707 int res;
1708
1709 if (key == NULL) {
1710 errno = EINVAL;
1711 return -1;
1712 }
1713
1714 res = pr_redis_sorted_set_kcount(redis, m, key, strlen(key), count);
1715 if (res < 0) {
1716 int xerrno = errno;
1717
1718 pr_trace_msg(trace_channel, 2,
1719 "error counting sorted set using key '%s': %s", key, strerror(xerrno));
1720
1721 errno = xerrno;
1722 return -1;
1723 }
1724
1725 return 0;
1726 }
1727
pr_redis_sorted_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1728 int pr_redis_sorted_set_delete(pr_redis_t *redis, module *m, const char *key,
1729 void *value, size_t valuesz) {
1730 int res;
1731
1732 if (key == NULL) {
1733 errno = EINVAL;
1734 return -1;
1735 }
1736
1737 res = pr_redis_sorted_set_kdelete(redis, m, key, strlen(key), value, valuesz);
1738 if (res < 0) {
1739 int xerrno = errno;
1740
1741 pr_trace_msg(trace_channel, 2,
1742 "error deleting item from sorted set using key '%s': %s", key,
1743 strerror(xerrno));
1744
1745 errno = xerrno;
1746 return -1;
1747 }
1748
1749 return 0;
1750 }
1751
pr_redis_sorted_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)1752 int pr_redis_sorted_set_exists(pr_redis_t *redis, module *m, const char *key,
1753 void *value, size_t valuesz) {
1754 int res;
1755
1756 if (key == NULL) {
1757 errno = EINVAL;
1758 return -1;
1759 }
1760
1761 res = pr_redis_sorted_set_kexists(redis, m, key, strlen(key), value, valuesz);
1762 if (res < 0) {
1763 int xerrno = errno;
1764
1765 pr_trace_msg(trace_channel, 2,
1766 "error checking item in sorted set using key '%s': %s", key,
1767 strerror(xerrno));
1768
1769 errno = xerrno;
1770 return -1;
1771 }
1772
1773 return res;
1774 }
1775
pr_redis_sorted_set_getn(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)1776 int pr_redis_sorted_set_getn(pool *p, pr_redis_t *redis, module *m,
1777 const char *key, unsigned int offset, unsigned int len,
1778 array_header **values, array_header **valueszs, int flags) {
1779 int res;
1780
1781 if (key == NULL) {
1782 errno = EINVAL;
1783 return -1;
1784 }
1785
1786 res = pr_redis_sorted_set_kgetn(p, redis, m, key, strlen(key), offset, len,
1787 values, valueszs, flags);
1788 if (res < 0) {
1789 int xerrno = errno;
1790
1791 pr_trace_msg(trace_channel, 2,
1792 "error getting %u %s from sorted set using key '%s': %s", len,
1793 len != 1 ? "items" : "item", key, strerror(xerrno));
1794
1795 errno = xerrno;
1796 return -1;
1797 }
1798
1799 return res;
1800 }
1801
pr_redis_sorted_set_incr(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float incr,float * score)1802 int pr_redis_sorted_set_incr(pr_redis_t *redis, module *m, const char *key,
1803 void *value, size_t valuesz, float incr, float *score) {
1804 int res;
1805
1806 if (key == NULL) {
1807 errno = EINVAL;
1808 return -1;
1809 }
1810
1811 res = pr_redis_sorted_set_kincr(redis, m, key, strlen(key), value, valuesz,
1812 incr, score);
1813 if (res < 0) {
1814 int xerrno = errno;
1815
1816 pr_trace_msg(trace_channel, 2,
1817 "error incrementing item by %0.3f in sorted set using key '%s': %s",
1818 incr, key, strerror(xerrno));
1819
1820 errno = xerrno;
1821 return -1;
1822 }
1823
1824 return res;
1825 }
1826
pr_redis_sorted_set_remove(pr_redis_t * redis,module * m,const char * key)1827 int pr_redis_sorted_set_remove(pr_redis_t *redis, module *m, const char *key) {
1828 int res;
1829
1830 if (key == NULL) {
1831 errno = EINVAL;
1832 return -1;
1833 }
1834
1835 res = pr_redis_sorted_set_kremove(redis, m, key, strlen(key));
1836 if (res < 0) {
1837 int xerrno = errno;
1838
1839 pr_trace_msg(trace_channel, 2,
1840 "error removing sorted set using key '%s': %s", key, strerror(xerrno));
1841
1842 errno = xerrno;
1843 return -1;
1844 }
1845
1846 return 0;
1847 }
1848
pr_redis_sorted_set_score(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float * score)1849 int pr_redis_sorted_set_score(pr_redis_t *redis, module *m, const char *key,
1850 void *value, size_t valuesz, float *score) {
1851 int res;
1852
1853 if (key == NULL) {
1854 errno = EINVAL;
1855 return -1;
1856 }
1857
1858 res = pr_redis_sorted_set_kscore(redis, m, key, strlen(key), value, valuesz,
1859 score);
1860 if (res < 0) {
1861 int xerrno = errno;
1862
1863 pr_trace_msg(trace_channel, 2,
1864 "error getting score for item in sorted set using key '%s': %s", key,
1865 strerror(xerrno));
1866
1867 errno = xerrno;
1868 return -1;
1869 }
1870
1871 return res;
1872 }
1873
pr_redis_sorted_set_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)1874 int pr_redis_sorted_set_set(pr_redis_t *redis, module *m, const char *key,
1875 void *value, size_t valuesz, float score) {
1876 int res;
1877
1878 if (key == NULL) {
1879 errno = EINVAL;
1880 return -1;
1881 }
1882
1883 res = pr_redis_sorted_set_kset(redis, m, key, strlen(key), value, valuesz,
1884 score);
1885 if (res < 0) {
1886 int xerrno = errno;
1887
1888 pr_trace_msg(trace_channel, 2,
1889 "error setting item to score %0.3f in sorted set using key '%s': %s",
1890 score, key, strerror(xerrno));
1891
1892 errno = xerrno;
1893 return -1;
1894 }
1895
1896 return 0;
1897 }
1898
pr_redis_sorted_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs,array_header * scores)1899 int pr_redis_sorted_set_setall(pr_redis_t *redis, module *m, const char *key,
1900 array_header *values, array_header *valueszs, array_header *scores) {
1901 int res;
1902
1903 if (key == NULL) {
1904 errno = EINVAL;
1905 return -1;
1906 }
1907
1908 res = pr_redis_sorted_set_ksetall(redis, m, key, strlen(key), values,
1909 valueszs, scores);
1910 if (res < 0) {
1911 int xerrno = errno;
1912
1913 pr_trace_msg(trace_channel, 2,
1914 "error setting items in sorted set using key '%s': %s", key,
1915 strerror(xerrno));
1916
1917 errno = xerrno;
1918 return -1;
1919 }
1920
1921 return 0;
1922 }
1923
get_namespace_key(pool * p,pr_redis_t * redis,module * m,const char * key,size_t * keysz)1924 static const char *get_namespace_key(pool *p, pr_redis_t *redis, module *m,
1925 const char *key, size_t *keysz) {
1926
1927 if (m != NULL &&
1928 redis->namespace_tab != NULL) {
1929 const char *prefix = NULL;
1930 size_t prefixsz = 0;
1931
1932 prefix = pr_table_kget(redis->namespace_tab, m, sizeof(module *),
1933 &prefixsz);
1934 if (prefix != NULL) {
1935 char *new_key;
1936 size_t new_keysz;
1937
1938 pr_trace_msg(trace_channel, 25,
1939 "using namespace prefix '%s' for module 'mod_%s.c'", prefix, m->name);
1940
1941 /* Since the given key may not be text, we cannot simply use pstrcat()
1942 * to prepend our namespace value.
1943 */
1944 new_keysz = prefixsz + *keysz;
1945 new_key = palloc(p, new_keysz);
1946 memcpy(new_key, prefix, prefixsz);
1947 memcpy(new_key + prefixsz, key, *keysz);
1948
1949 key = new_key;
1950 *keysz = new_keysz;
1951 }
1952 }
1953
1954 return key;
1955 }
1956
get_reply_type(int reply_type)1957 static const char *get_reply_type(int reply_type) {
1958 const char *type_name;
1959
1960 switch (reply_type) {
1961 case REDIS_REPLY_STRING:
1962 type_name = "STRING";
1963 break;
1964
1965 case REDIS_REPLY_ARRAY:
1966 type_name = "ARRAY";
1967 break;
1968
1969 case REDIS_REPLY_INTEGER:
1970 type_name = "INTEGER";
1971 break;
1972
1973 case REDIS_REPLY_NIL:
1974 type_name = "NIL";
1975 break;
1976
1977 case REDIS_REPLY_STATUS:
1978 type_name = "STATUS";
1979 break;
1980
1981 case REDIS_REPLY_ERROR:
1982 type_name = "ERROR";
1983 break;
1984
1985 default:
1986 type_name = "unknown";
1987 }
1988
1989 return type_name;
1990 }
1991
pr_redis_command(pr_redis_t * redis,const array_header * args,int reply_type)1992 int pr_redis_command(pr_redis_t *redis, const array_header *args,
1993 int reply_type) {
1994 register unsigned int i;
1995 pool *tmp_pool = NULL;
1996 array_header *arglens;
1997 const char *cmd = NULL;
1998 redisReply *reply;
1999 int redis_reply_type;
2000
2001 if (redis == NULL ||
2002 args == NULL ||
2003 args->nelts == 0) {
2004 errno = EINVAL;
2005 return -1;
2006 }
2007
2008 switch (reply_type) {
2009 case PR_REDIS_REPLY_TYPE_STRING:
2010 redis_reply_type = REDIS_REPLY_STRING;
2011 break;
2012
2013 case PR_REDIS_REPLY_TYPE_INTEGER:
2014 redis_reply_type = REDIS_REPLY_INTEGER;
2015 break;
2016
2017 case PR_REDIS_REPLY_TYPE_NIL:
2018 redis_reply_type = REDIS_REPLY_NIL;
2019 break;
2020
2021 case PR_REDIS_REPLY_TYPE_ARRAY:
2022 redis_reply_type = REDIS_REPLY_ARRAY;
2023 break;
2024
2025 case PR_REDIS_REPLY_TYPE_STATUS:
2026 redis_reply_type = REDIS_REPLY_STATUS;
2027 break;
2028
2029 case PR_REDIS_REPLY_TYPE_ERROR:
2030 redis_reply_type = REDIS_REPLY_ERROR;
2031 break;
2032
2033 default:
2034 errno = EINVAL;
2035 return -1;
2036 }
2037
2038 tmp_pool = make_sub_pool(redis->pool);
2039 pr_pool_tag(tmp_pool, "Redis Command pool");
2040
2041 arglens = make_array(tmp_pool, args->nelts, sizeof(size_t));
2042 for (i = 0; i < args->nelts; i++) {
2043 pr_signals_handle();
2044 *((size_t *) push_array(arglens)) = strlen(((char **) args->elts)[i]);
2045 }
2046
2047 cmd = ((char **) args->elts)[0];
2048 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2049 reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
2050 reply = handle_reply(redis, cmd, reply);
2051 if (reply == NULL) {
2052 destroy_pool(tmp_pool);
2053 errno = EIO;
2054 return -1;
2055 }
2056
2057 if (reply->type != redis_reply_type) {
2058 pr_trace_msg(trace_channel, 2,
2059 "expected %s reply for %s, got %s", get_reply_type(redis_reply_type), cmd,
2060 get_reply_type(reply->type));
2061
2062 if (reply->type == REDIS_REPLY_ERROR) {
2063 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2064 }
2065
2066 freeReplyObject(reply);
2067 destroy_pool(tmp_pool);
2068 errno = EINVAL;
2069 return -1;
2070 }
2071
2072 switch (reply->type) {
2073 case REDIS_REPLY_STRING:
2074 case REDIS_REPLY_STATUS:
2075 case REDIS_REPLY_ERROR:
2076 pr_trace_msg(trace_channel, 7, "%s %s reply: %.*s", cmd,
2077 get_reply_type(reply->type), (int) reply->len, reply->str);
2078 break;
2079
2080 case REDIS_REPLY_INTEGER:
2081 pr_trace_msg(trace_channel, 7, "%s INTEGER reply: %lld", cmd,
2082 reply->integer);
2083 break;
2084
2085 case REDIS_REPLY_NIL:
2086 pr_trace_msg(trace_channel, 7, "%s NIL reply", cmd);
2087 break;
2088
2089 case REDIS_REPLY_ARRAY:
2090 pr_trace_msg(trace_channel, 7, "%s ARRAY reply: (%lu %s)", cmd,
2091 (unsigned long) reply->elements,
2092 reply->elements != 1 ? "elements" : "element");
2093 break;
2094
2095 default:
2096 break;
2097 }
2098
2099 freeReplyObject(reply);
2100 destroy_pool(tmp_pool);
2101 return 0;
2102 }
2103
pr_redis_auth2(pr_redis_t * redis,const char * username,const char * password)2104 int pr_redis_auth2(pr_redis_t *redis, const char *username,
2105 const char *password) {
2106 const char *cmd;
2107 pool *tmp_pool;
2108 redisReply *reply;
2109
2110 if (redis == NULL ||
2111 username == NULL ||
2112 password == NULL) {
2113 errno = EINVAL;
2114 return -1;
2115 }
2116
2117 tmp_pool = make_sub_pool(redis->pool);
2118 pr_pool_tag(tmp_pool, "Redis AUTH pool");
2119
2120 cmd = "AUTH";
2121 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2122
2123 /* Redis 6.x changed the AUTH semantics, now requiring a username. */
2124 if (redis->major_version >= 6) {
2125 reply = redisCommand(redis->ctx, "%s %s %s", cmd, username, password);
2126
2127 } else {
2128 reply = redisCommand(redis->ctx, "%s %s", cmd, password);
2129 }
2130
2131 reply = handle_reply(redis, cmd, reply);
2132 if (reply == NULL) {
2133 pr_trace_msg(trace_channel, 2,
2134 "error authenticating client: %s", strerror(errno));
2135 destroy_pool(tmp_pool);
2136 errno = EIO;
2137 return -1;
2138 }
2139
2140 if (reply->type != REDIS_REPLY_STRING &&
2141 reply->type != REDIS_REPLY_STATUS) {
2142 pr_trace_msg(trace_channel, 2,
2143 "expected STRING or STATUS reply for %s, got %s", cmd,
2144 get_reply_type(reply->type));
2145
2146 if (reply->type == REDIS_REPLY_ERROR) {
2147 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2148 }
2149
2150 freeReplyObject(reply);
2151 destroy_pool(tmp_pool);
2152 errno = EINVAL;
2153 return -1;
2154 }
2155
2156 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2157 reply->str);
2158
2159 freeReplyObject(reply);
2160 destroy_pool(tmp_pool);
2161 return 0;
2162 }
2163
pr_redis_auth(pr_redis_t * redis,const char * password)2164 int pr_redis_auth(pr_redis_t *redis, const char *password) {
2165 return pr_redis_auth2(redis, "default", password);
2166 }
2167
pr_redis_select(pr_redis_t * redis,const char * db_idx)2168 int pr_redis_select(pr_redis_t *redis, const char *db_idx) {
2169 const char *cmd;
2170 pool *tmp_pool;
2171 redisReply *reply;
2172
2173 if (redis == NULL ||
2174 db_idx == NULL) {
2175 errno = EINVAL;
2176 return -1;
2177 }
2178
2179 tmp_pool = make_sub_pool(redis->pool);
2180 pr_pool_tag(tmp_pool, "Redis SELECT pool");
2181
2182 cmd = "SELECT";
2183 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2184 reply = redisCommand(redis->ctx, "%s %s", cmd, db_idx);
2185 reply = handle_reply(redis, cmd, reply);
2186 if (reply == NULL) {
2187 pr_trace_msg(trace_channel, 2,
2188 "error selecting database '%s': %s", db_idx, strerror(errno));
2189 destroy_pool(tmp_pool);
2190 errno = EIO;
2191 return -1;
2192 }
2193
2194 if (reply->type != REDIS_REPLY_STRING &&
2195 reply->type != REDIS_REPLY_STATUS) {
2196 pr_trace_msg(trace_channel, 2,
2197 "expected STRING or STATUS reply for %s, got %s", cmd,
2198 get_reply_type(reply->type));
2199
2200 if (reply->type == REDIS_REPLY_ERROR) {
2201 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2202 }
2203
2204 freeReplyObject(reply);
2205 destroy_pool(tmp_pool);
2206 errno = EINVAL;
2207 return -1;
2208 }
2209
2210 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2211 reply->str);
2212
2213 freeReplyObject(reply);
2214 destroy_pool(tmp_pool);
2215 return 0;
2216 }
2217
pr_redis_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)2218 int pr_redis_kadd(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2219 void *value, size_t valuesz, time_t expires) {
2220 return pr_redis_kset(redis, m, key, keysz, value, valuesz, expires);
2221 }
2222
pr_redis_kdecr(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint32_t decr,uint64_t * value)2223 int pr_redis_kdecr(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2224 uint32_t decr, uint64_t *value) {
2225 pool *tmp_pool = NULL;
2226 const char *cmd = NULL;
2227 redisReply *reply;
2228
2229 if (redis == NULL ||
2230 m == NULL ||
2231 key == NULL ||
2232 decr == 0) {
2233 errno = EINVAL;
2234 return -1;
2235 }
2236
2237 tmp_pool = make_sub_pool(redis->pool);
2238 pr_pool_tag(tmp_pool, "Redis DECRBY pool");
2239
2240 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2241
2242 cmd = "DECRBY";
2243 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2244 reply = redisCommand(redis->ctx, "%s %b %lu", cmd, key, keysz,
2245 (unsigned long) decr);
2246 reply = handle_reply(redis, cmd, reply);
2247 if (reply == NULL) {
2248 pr_trace_msg(trace_channel, 2,
2249 "error decrementing key (%lu bytes) by %lu using %s: %s",
2250 (unsigned long) keysz, (unsigned long) decr, cmd, strerror(errno));
2251 destroy_pool(tmp_pool);
2252 errno = EIO;
2253 return -1;
2254 }
2255
2256 if (reply->type != REDIS_REPLY_INTEGER) {
2257 pr_trace_msg(trace_channel, 2,
2258 "expected INTEGER reply for %s, got %s", cmd,
2259 get_reply_type(reply->type));
2260
2261 if (reply->type == REDIS_REPLY_ERROR) {
2262 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2263 }
2264
2265 freeReplyObject(reply);
2266 destroy_pool(tmp_pool);
2267 errno = EINVAL;
2268 return -1;
2269 }
2270
2271 /* Note: DECRBY will automatically set the key value to zero if it does
2272 * not already exist. To detect a nonexistent key, then, we look to
2273 * see if the return value is exactly our requested decrement. If so,
2274 * REMOVE the auto-created key, and return ENOENT.
2275 */
2276 if ((decr * -1) == (uint32_t) reply->integer) {
2277 freeReplyObject(reply);
2278 destroy_pool(tmp_pool);
2279 (void) pr_redis_kremove(redis, m, key, keysz);
2280 errno = ENOENT;
2281 return -1;
2282 }
2283
2284 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2285
2286 if (value != NULL) {
2287 *value = (uint64_t) reply->integer;
2288 }
2289
2290 freeReplyObject(reply);
2291 destroy_pool(tmp_pool);
2292 return 0;
2293 }
2294
pr_redis_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,size_t * valuesz)2295 void *pr_redis_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
2296 size_t keysz, size_t *valuesz) {
2297 const char *cmd;
2298 pool *tmp_pool;
2299 redisReply *reply;
2300 char *data = NULL;
2301
2302 if (p == NULL ||
2303 redis == NULL ||
2304 m == NULL ||
2305 key == NULL ||
2306 valuesz == NULL) {
2307 errno = EINVAL;
2308 return NULL;
2309 }
2310
2311 tmp_pool = make_sub_pool(redis->pool);
2312 pr_pool_tag(tmp_pool, "Redis GET pool");
2313
2314 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2315
2316 cmd = "GET";
2317 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2318 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2319 reply = handle_reply(redis, cmd, reply);
2320 if (reply == NULL) {
2321 pr_trace_msg(trace_channel, 2,
2322 "error getting data for key (%lu bytes) using %s: %s",
2323 (unsigned long) keysz, cmd, strerror(errno));
2324 destroy_pool(tmp_pool);
2325 errno = EIO;
2326 return NULL;
2327 }
2328
2329 if (reply->type == REDIS_REPLY_NIL) {
2330 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
2331 freeReplyObject(reply);
2332 destroy_pool(tmp_pool);
2333 errno = ENOENT;
2334 return NULL;
2335 }
2336
2337 if (reply->type != REDIS_REPLY_STRING) {
2338 pr_trace_msg(trace_channel, 2,
2339 "expected STRING reply for %s, got %s", cmd, get_reply_type(reply->type));
2340 freeReplyObject(reply);
2341 destroy_pool(tmp_pool);
2342 errno = EINVAL;
2343 return NULL;
2344 }
2345
2346 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2347 reply->str);
2348
2349 if (valuesz != NULL) {
2350 *valuesz = (uint64_t) reply->len;
2351 }
2352
2353 data = palloc(p, reply->len);
2354 memcpy(data, reply->str, reply->len);
2355
2356 freeReplyObject(reply);
2357 destroy_pool(tmp_pool);
2358 return data;
2359 }
2360
pr_redis_kget_str(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz)2361 char *pr_redis_kget_str(pool *p, pr_redis_t *redis, module *m, const char *key,
2362 size_t keysz) {
2363 const char *cmd;
2364 pool *tmp_pool;
2365 redisReply *reply;
2366 char *data = NULL;
2367
2368 if (p == NULL ||
2369 redis == NULL ||
2370 m == NULL ||
2371 key == NULL) {
2372 errno = EINVAL;
2373 return NULL;
2374 }
2375
2376 tmp_pool = make_sub_pool(redis->pool);
2377 pr_pool_tag(tmp_pool, "Redis GET pool");
2378
2379 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2380
2381 cmd = "GET";
2382 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2383 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2384 reply = handle_reply(redis, cmd, reply);
2385 if (reply == NULL) {
2386 pr_trace_msg(trace_channel, 2,
2387 "error getting data for key (%lu bytes) using %s: %s",
2388 (unsigned long) keysz, cmd, strerror(errno));
2389 destroy_pool(tmp_pool);
2390 errno = EIO;
2391 return NULL;
2392 }
2393
2394 if (reply->type == REDIS_REPLY_NIL) {
2395 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
2396 freeReplyObject(reply);
2397 destroy_pool(tmp_pool);
2398 errno = ENOENT;
2399 return NULL;
2400 }
2401
2402 if (reply->type != REDIS_REPLY_STRING) {
2403 pr_trace_msg(trace_channel, 2,
2404 "expected STRING reply for %s, got %s", cmd, get_reply_type(reply->type));
2405 freeReplyObject(reply);
2406 destroy_pool(tmp_pool);
2407 errno = EINVAL;
2408 return NULL;
2409 }
2410
2411 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2412 reply->str);
2413
2414 data = pstrndup(p, reply->str, reply->len);
2415
2416 freeReplyObject(reply);
2417 destroy_pool(tmp_pool);
2418 return data;
2419 }
2420
pr_redis_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint32_t incr,uint64_t * value)2421 int pr_redis_kincr(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2422 uint32_t incr, uint64_t *value) {
2423 pool *tmp_pool = NULL;
2424 const char *cmd = NULL;
2425 redisReply *reply;
2426
2427 if (redis == NULL ||
2428 m == NULL ||
2429 key == NULL ||
2430 incr == 0) {
2431 errno = EINVAL;
2432 return -1;
2433 }
2434
2435 tmp_pool = make_sub_pool(redis->pool);
2436 pr_pool_tag(tmp_pool, "Redis INCRBY pool");
2437
2438 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2439
2440 cmd = "INCRBY";
2441 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2442 reply = redisCommand(redis->ctx, "%s %b %lu", cmd, key, keysz,
2443 (unsigned long) incr);
2444 reply = handle_reply(redis, cmd, reply);
2445 if (reply == NULL) {
2446 pr_trace_msg(trace_channel, 2,
2447 "error incrementing key (%lu bytes) by %lu using %s: %s",
2448 (unsigned long) keysz, (unsigned long) incr, cmd, strerror(errno));
2449 destroy_pool(tmp_pool);
2450 errno = EIO;
2451 return -1;
2452 }
2453
2454 if (reply->type != REDIS_REPLY_INTEGER) {
2455 pr_trace_msg(trace_channel, 2,
2456 "expected INTEGER reply for %s, got %s", cmd,
2457 get_reply_type(reply->type));
2458
2459 if (reply->type == REDIS_REPLY_ERROR) {
2460 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2461 }
2462
2463 freeReplyObject(reply);
2464 destroy_pool(tmp_pool);
2465 errno = EINVAL;
2466 return -1;
2467 }
2468
2469 /* Note: INCRBY will automatically set the key value to zero if it does
2470 * not already exist. To detect a nonexistent key, then, we look to
2471 * see if the return value is exactly our requested increment. If so,
2472 * REMOVE the auto-created key, and return ENOENT.
2473 */
2474 if (incr == (uint32_t) reply->integer) {
2475 freeReplyObject(reply);
2476 destroy_pool(tmp_pool);
2477 (void) pr_redis_kremove(redis, m, key, keysz);
2478 errno = ENOENT;
2479 return -1;
2480 }
2481
2482 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2483
2484 if (value != NULL) {
2485 *value = (uint64_t) reply->integer;
2486 }
2487
2488 freeReplyObject(reply);
2489 destroy_pool(tmp_pool);
2490 return 0;
2491 }
2492
pr_redis_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)2493 int pr_redis_kremove(pr_redis_t *redis, module *m, const char *key,
2494 size_t keysz) {
2495 int xerrno = 0;
2496 pool *tmp_pool = NULL;
2497 const char *cmd = NULL;
2498 redisReply *reply;
2499 long long count;
2500
2501 if (redis == NULL ||
2502 m == NULL ||
2503 key == NULL) {
2504 errno = EINVAL;
2505 return -1;
2506 }
2507
2508 tmp_pool = make_sub_pool(redis->pool);
2509 pr_pool_tag(tmp_pool, "Redis DEL pool");
2510
2511 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2512
2513 cmd = "DEL";
2514 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2515 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2516 xerrno = errno;
2517
2518 reply = handle_reply(redis, cmd, reply);
2519 if (reply == NULL) {
2520 pr_trace_msg(trace_channel, 2,
2521 "error removing key (%lu bytes): %s", (unsigned long) keysz,
2522 strerror(errno));
2523 destroy_pool(tmp_pool);
2524 errno = xerrno;
2525 return -1;
2526 }
2527
2528 if (reply->type != REDIS_REPLY_INTEGER) {
2529 pr_trace_msg(trace_channel, 2,
2530 "expected INTEGER reply for %s, got %s", cmd,
2531 get_reply_type(reply->type));
2532 freeReplyObject(reply);
2533 destroy_pool(tmp_pool);
2534 errno = EINVAL;
2535 return -1;
2536 }
2537
2538 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2539 count = reply->integer;
2540
2541 freeReplyObject(reply);
2542 destroy_pool(tmp_pool);
2543
2544 if (count == 0) {
2545 /* No keys removed. */
2546 errno = ENOENT;
2547 return -1;
2548 }
2549
2550 return 0;
2551 }
2552
pr_redis_krename(pr_redis_t * redis,module * m,const char * from,size_t fromsz,const char * to,size_t tosz)2553 int pr_redis_krename(pr_redis_t *redis, module *m, const char *from,
2554 size_t fromsz, const char *to, size_t tosz) {
2555 int xerrno = 0;
2556 pool *tmp_pool = NULL;
2557 const char *cmd = NULL;
2558 redisReply *reply;
2559
2560 if (redis == NULL ||
2561 m == NULL ||
2562 from == NULL ||
2563 fromsz == 0 ||
2564 to == NULL ||
2565 tosz == 0) {
2566 errno = EINVAL;
2567 return -1;
2568 }
2569
2570 tmp_pool = make_sub_pool(redis->pool);
2571 pr_pool_tag(tmp_pool, "Redis RENAME pool");
2572
2573 from = get_namespace_key(tmp_pool, redis, m, from, &fromsz);
2574 to = get_namespace_key(tmp_pool, redis, m, to, &tosz);
2575
2576 cmd = "RENAME";
2577 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2578 reply = redisCommand(redis->ctx, "%s %b %b", cmd, from, fromsz, to, tosz);
2579 xerrno = errno;
2580
2581 reply = handle_reply(redis, cmd, reply);
2582 if (reply == NULL) {
2583 pr_trace_msg(trace_channel, 2,
2584 "error renaming key (from %lu bytes, to %lu bytes): %s",
2585 (unsigned long) fromsz, (unsigned long) tosz, strerror(errno));
2586 destroy_pool(tmp_pool);
2587 errno = xerrno;
2588 return -1;
2589 }
2590
2591 if (reply->type != REDIS_REPLY_STRING &&
2592 reply->type != REDIS_REPLY_STATUS) {
2593 xerrno = EINVAL;
2594
2595 pr_trace_msg(trace_channel, 2,
2596 "expected STRING or STATUS reply for %s, got %s", cmd,
2597 get_reply_type(reply->type));
2598
2599 if (reply->type == REDIS_REPLY_ERROR) {
2600 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2601
2602 /* Note: In order to provide ENOENT semantics here, we have to be
2603 * naughty, and assume the contents of this error message.
2604 */
2605 if (strstr(reply->str, "no such key") != NULL) {
2606 xerrno = ENOENT;
2607 }
2608 }
2609
2610 freeReplyObject(reply);
2611 destroy_pool(tmp_pool);
2612 errno = xerrno;
2613 return -1;
2614 }
2615
2616 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
2617 reply->str);
2618
2619 freeReplyObject(reply);
2620 destroy_pool(tmp_pool);
2621 return 0;
2622 }
2623
pr_redis_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)2624 int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
2625 void *value, size_t valuesz, time_t expires) {
2626 pool *tmp_pool = NULL;
2627 const char *cmd = NULL;
2628 redisReply *reply;
2629
2630 /* XXX Should we allow null values to be added, thus allowing use of keys
2631 * as sentinels?
2632 */
2633 if (redis == NULL ||
2634 m == NULL ||
2635 key == NULL ||
2636 value == NULL) {
2637 errno = EINVAL;
2638 return -1;
2639 }
2640
2641 tmp_pool = make_sub_pool(redis->pool);
2642 pr_pool_tag(tmp_pool, "Redis SET pool");
2643
2644 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2645
2646 if (expires > 0) {
2647 cmd = "SETEX";
2648 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2649 reply = redisCommand(redis->ctx, "%s %b %lu %b", cmd, key, keysz,
2650 (unsigned long) expires, value, valuesz);
2651
2652 } else {
2653 cmd = "SET";
2654 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2655 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value,
2656 valuesz);
2657 }
2658
2659 reply = handle_reply(redis, cmd, reply);
2660 if (reply == NULL) {
2661 pr_trace_msg(trace_channel, 2,
2662 "error adding key (%lu bytes), value (%lu bytes) using %s: %s",
2663 (unsigned long) keysz, (unsigned long) valuesz, cmd, strerror(errno));
2664 destroy_pool(tmp_pool);
2665 errno = EIO;
2666 return -1;
2667 }
2668
2669 pr_trace_msg(trace_channel, 7, "%s reply: %s", cmd, reply->str);
2670 freeReplyObject(reply);
2671 destroy_pool(tmp_pool);
2672 return 0;
2673 }
2674
pr_redis_hash_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)2675 int pr_redis_hash_kcount(pr_redis_t *redis, module *m, const char *key,
2676 size_t keysz, uint64_t *count) {
2677 int xerrno = 0;
2678 pool *tmp_pool = NULL;
2679 const char *cmd = NULL;
2680 redisReply *reply;
2681
2682 if (redis == NULL ||
2683 m == NULL ||
2684 key == NULL ||
2685 keysz == 0 ||
2686 count == NULL) {
2687 errno = EINVAL;
2688 return -1;
2689 }
2690
2691 tmp_pool = make_sub_pool(redis->pool);
2692 pr_pool_tag(tmp_pool, "Redis HLEN pool");
2693
2694 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2695
2696 cmd = "HLEN";
2697 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2698 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2699 xerrno = errno;
2700
2701 reply = handle_reply(redis, cmd, reply);
2702 if (reply == NULL) {
2703 pr_trace_msg(trace_channel, 2,
2704 "error getting count of hash using key (%lu bytes): %s",
2705 (unsigned long) keysz, strerror(errno));
2706 destroy_pool(tmp_pool);
2707 errno = xerrno;
2708 return -1;
2709 }
2710
2711 if (reply->type != REDIS_REPLY_INTEGER) {
2712 pr_trace_msg(trace_channel, 2,
2713 "expected INTEGER reply for %s, got %s", cmd,
2714 get_reply_type(reply->type));
2715
2716 if (reply->type == REDIS_REPLY_ERROR) {
2717 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2718 }
2719
2720 freeReplyObject(reply);
2721 destroy_pool(tmp_pool);
2722 errno = EINVAL;
2723 return -1;
2724 }
2725
2726 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2727 *count = reply->integer;
2728
2729 freeReplyObject(reply);
2730 destroy_pool(tmp_pool);
2731 return 0;
2732 }
2733
pr_redis_hash_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)2734 int pr_redis_hash_kdelete(pr_redis_t *redis, module *m, const char *key,
2735 size_t keysz, const char *field, size_t fieldsz) {
2736 int xerrno = 0, exists = FALSE;
2737 pool *tmp_pool = NULL;
2738 const char *cmd = NULL;
2739 redisReply *reply;
2740
2741 if (redis == NULL ||
2742 m == NULL ||
2743 key == NULL ||
2744 keysz == 0 ||
2745 field == NULL ||
2746 fieldsz == 0) {
2747 errno = EINVAL;
2748 return -1;
2749 }
2750
2751 tmp_pool = make_sub_pool(redis->pool);
2752 pr_pool_tag(tmp_pool, "Redis HDEL pool");
2753
2754 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2755
2756 cmd = "HDEL";
2757 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2758 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, field, fieldsz);
2759 xerrno = errno;
2760
2761 reply = handle_reply(redis, cmd, reply);
2762 if (reply == NULL) {
2763 pr_trace_msg(trace_channel, 2,
2764 "error getting count of hash using key (%lu bytes): %s",
2765 (unsigned long) keysz, strerror(errno));
2766 destroy_pool(tmp_pool);
2767 errno = xerrno;
2768 return -1;
2769 }
2770
2771 if (reply->type != REDIS_REPLY_INTEGER) {
2772 pr_trace_msg(trace_channel, 2,
2773 "expected INTEGER reply for %s, got %s", cmd,
2774 get_reply_type(reply->type));
2775
2776 if (reply->type == REDIS_REPLY_ERROR) {
2777 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2778 }
2779
2780 freeReplyObject(reply);
2781 destroy_pool(tmp_pool);
2782 errno = EINVAL;
2783 return -1;
2784 }
2785
2786 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2787 exists = reply->integer ? TRUE : FALSE;
2788
2789 freeReplyObject(reply);
2790 destroy_pool(tmp_pool);
2791
2792 if (exists == FALSE) {
2793 errno = ENOENT;
2794 return -1;
2795 }
2796
2797 return 0;
2798 }
2799
pr_redis_hash_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)2800 int pr_redis_hash_kexists(pr_redis_t *redis, module *m, const char *key,
2801 size_t keysz, const char *field, size_t fieldsz) {
2802 int xerrno = 0, exists = FALSE;
2803 pool *tmp_pool = NULL;
2804 const char *cmd = NULL;
2805 redisReply *reply;
2806
2807 if (redis == NULL ||
2808 m == NULL ||
2809 key == NULL ||
2810 keysz == 0 ||
2811 field == NULL ||
2812 fieldsz == 0) {
2813 errno = EINVAL;
2814 return -1;
2815 }
2816
2817 tmp_pool = make_sub_pool(redis->pool);
2818 pr_pool_tag(tmp_pool, "Redis HEXISTS pool");
2819
2820 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2821
2822 cmd = "HEXISTS";
2823 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2824 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, field, fieldsz);
2825 xerrno = errno;
2826
2827 reply = handle_reply(redis, cmd, reply);
2828 if (reply == NULL) {
2829 pr_trace_msg(trace_channel, 2,
2830 "error getting count of hash using key (%lu bytes): %s",
2831 (unsigned long) keysz, strerror(errno));
2832 destroy_pool(tmp_pool);
2833 errno = xerrno;
2834 return -1;
2835 }
2836
2837 if (reply->type != REDIS_REPLY_INTEGER) {
2838 pr_trace_msg(trace_channel, 2,
2839 "expected INTEGER reply for %s, got %s", cmd,
2840 get_reply_type(reply->type));
2841
2842 if (reply->type == REDIS_REPLY_ERROR) {
2843 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2844 }
2845
2846 freeReplyObject(reply);
2847 destroy_pool(tmp_pool);
2848 errno = EINVAL;
2849 return -1;
2850 }
2851
2852 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
2853 exists = reply->integer ? TRUE : FALSE;
2854
2855 freeReplyObject(reply);
2856 destroy_pool(tmp_pool);
2857 return exists;
2858 }
2859
pr_redis_hash_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void ** value,size_t * valuesz)2860 int pr_redis_hash_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
2861 size_t keysz, const char *field, size_t fieldsz, void **value,
2862 size_t *valuesz) {
2863 int xerrno = 0, exists = FALSE;
2864 pool *tmp_pool = NULL;
2865 const char *cmd = NULL;
2866 redisReply *reply;
2867
2868 if (p == NULL ||
2869 redis == NULL ||
2870 m == NULL ||
2871 key == NULL ||
2872 keysz == 0 ||
2873 field == NULL ||
2874 fieldsz == 0 ||
2875 value == NULL) {
2876 errno = EINVAL;
2877 return -1;
2878 }
2879
2880 tmp_pool = make_sub_pool(redis->pool);
2881 pr_pool_tag(tmp_pool, "Redis HGET pool");
2882
2883 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2884
2885 cmd = "HGET";
2886 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2887 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, field, fieldsz);
2888 xerrno = errno;
2889
2890 reply = handle_reply(redis, cmd, reply);
2891 if (reply == NULL) {
2892 pr_trace_msg(trace_channel, 2,
2893 "error getting item for field in hash using key (%lu bytes): %s",
2894 (unsigned long) keysz, strerror(errno));
2895 destroy_pool(tmp_pool);
2896 errno = xerrno;
2897 return -1;
2898 }
2899
2900 if (reply->type != REDIS_REPLY_STRING &&
2901 reply->type != REDIS_REPLY_NIL) {
2902 pr_trace_msg(trace_channel, 2,
2903 "expected STRING or NIL reply for %s, got %s", cmd,
2904 get_reply_type(reply->type));
2905
2906 if (reply->type == REDIS_REPLY_ERROR) {
2907 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2908 }
2909
2910 freeReplyObject(reply);
2911 destroy_pool(tmp_pool);
2912 errno = EINVAL;
2913 return -1;
2914 }
2915
2916 if (reply->type == REDIS_REPLY_STRING) {
2917 pr_trace_msg(trace_channel, 7, "%s reply: (%lu bytes)", cmd,
2918 (unsigned long) reply->len);
2919
2920 *value = palloc(p, reply->len);
2921 memcpy(*value, reply->str, reply->len);
2922
2923 if (valuesz != NULL) {
2924 *valuesz = reply->len;
2925 }
2926
2927 exists = TRUE;
2928
2929 } else {
2930 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
2931 }
2932
2933 freeReplyObject(reply);
2934 destroy_pool(tmp_pool);
2935
2936 if (exists == FALSE) {
2937 errno = ENOENT;
2938 return -1;
2939 }
2940
2941 return 0;
2942 }
2943
pr_redis_hash_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t ** hash)2944 int pr_redis_hash_kgetall(pool *p, pr_redis_t *redis, module *m,
2945 const char *key, size_t keysz, pr_table_t **hash) {
2946 int res, xerrno = 0;
2947 pool *tmp_pool = NULL;
2948 const char *cmd = NULL;
2949 redisReply *reply;
2950
2951 if (p == NULL ||
2952 redis == NULL ||
2953 m == NULL ||
2954 key == NULL ||
2955 keysz == 0 ||
2956 hash == NULL) {
2957 errno = EINVAL;
2958 return -1;
2959 }
2960
2961 tmp_pool = make_sub_pool(redis->pool);
2962 pr_pool_tag(tmp_pool, "Redis HGETALL pool");
2963
2964 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
2965
2966 cmd = "HGETALL";
2967 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
2968 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
2969 xerrno = errno;
2970
2971 reply = handle_reply(redis, cmd, reply);
2972 if (reply == NULL) {
2973 pr_trace_msg(trace_channel, 2,
2974 "error getting hash using key (%lu bytes): %s",
2975 (unsigned long) keysz, strerror(errno));
2976 destroy_pool(tmp_pool);
2977 errno = xerrno;
2978 return -1;
2979 }
2980
2981 if (reply->type != REDIS_REPLY_ARRAY) {
2982 pr_trace_msg(trace_channel, 2,
2983 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
2984
2985 if (reply->type == REDIS_REPLY_ERROR) {
2986 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
2987 }
2988
2989 freeReplyObject(reply);
2990 destroy_pool(tmp_pool);
2991 errno = EINVAL;
2992 return -1;
2993 }
2994
2995 if (reply->elements > 0) {
2996 register unsigned int i;
2997
2998 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
2999 (unsigned long) reply->elements,
3000 reply->elements != 1 ? "elements" : "element");
3001
3002 *hash = pr_table_alloc(p, 0);
3003
3004 for (i = 0; i < reply->elements; i += 2) {
3005 redisReply *key_elt, *value_elt;
3006 void *key_data = NULL, *value_data = NULL;
3007 size_t key_datasz = 0, value_datasz = 0;
3008
3009 key_elt = reply->element[i];
3010 if (key_elt->type == REDIS_REPLY_STRING) {
3011 key_datasz = key_elt->len;
3012 key_data = palloc(p, key_datasz);
3013 memcpy(key_data, key_elt->str, key_datasz);
3014
3015 } else {
3016 pr_trace_msg(trace_channel, 2,
3017 "expected STRING element at index %u, got %s", i,
3018 get_reply_type(key_elt->type));
3019 }
3020
3021 value_elt = reply->element[i+1];
3022 if (value_elt->type == REDIS_REPLY_STRING) {
3023 value_datasz = value_elt->len;
3024 value_data = palloc(p, value_datasz);
3025 memcpy(value_data, value_elt->str, value_datasz);
3026
3027 } else {
3028 pr_trace_msg(trace_channel, 2,
3029 "expected STRING element at index %u, got %s", i + 2,
3030 get_reply_type(value_elt->type));
3031 }
3032
3033 if (key_data != NULL &&
3034 value_data != NULL) {
3035 if (pr_table_kadd(*hash, key_data, key_datasz, value_data,
3036 value_datasz) < 0) {
3037 pr_trace_msg(trace_channel, 2,
3038 "error adding key (%lu bytes), value (%lu bytes) to hash: %s",
3039 (unsigned long) key_datasz, (unsigned long) value_datasz,
3040 strerror(errno));
3041 }
3042 }
3043 }
3044
3045 res = 0;
3046
3047 } else {
3048 xerrno = ENOENT;
3049 res = -1;
3050 }
3051
3052 freeReplyObject(reply);
3053 destroy_pool(tmp_pool);
3054
3055 errno = xerrno;
3056 return res;
3057 }
3058
pr_redis_hash_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,int32_t incr,int64_t * value)3059 int pr_redis_hash_kincr(pr_redis_t *redis, module *m, const char *key,
3060 size_t keysz, const char *field, size_t fieldsz, int32_t incr,
3061 int64_t *value) {
3062 int xerrno = 0, exists = FALSE;
3063 pool *tmp_pool = NULL;
3064 const char *cmd = NULL;
3065 redisReply *reply;
3066
3067 if (redis == NULL ||
3068 m == NULL ||
3069 key == NULL ||
3070 keysz == 0 ||
3071 field == NULL ||
3072 fieldsz == 0) {
3073 errno = EINVAL;
3074 return -1;
3075 }
3076
3077 exists = pr_redis_hash_kexists(redis, m, key, keysz, field, fieldsz);
3078 if (exists == FALSE) {
3079 errno = ENOENT;
3080 return -1;
3081 }
3082
3083 tmp_pool = make_sub_pool(redis->pool);
3084 pr_pool_tag(tmp_pool, "Redis HINCRBY pool");
3085
3086 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3087
3088 cmd = "HINCRBY";
3089 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3090 reply = redisCommand(redis->ctx, "%s %b %b %d", cmd, key, keysz, field,
3091 fieldsz, incr);
3092 xerrno = errno;
3093
3094 reply = handle_reply(redis, cmd, reply);
3095 if (reply == NULL) {
3096 pr_trace_msg(trace_channel, 2,
3097 "error incrementing field in hash using key (%lu bytes): %s",
3098 (unsigned long) keysz, strerror(errno));
3099 destroy_pool(tmp_pool);
3100 errno = xerrno;
3101 return -1;
3102 }
3103
3104 if (reply->type != REDIS_REPLY_INTEGER) {
3105 pr_trace_msg(trace_channel, 2,
3106 "expected INTEGER reply for %s, got %s", cmd,
3107 get_reply_type(reply->type));
3108
3109 if (reply->type == REDIS_REPLY_ERROR) {
3110 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3111 }
3112
3113 freeReplyObject(reply);
3114 destroy_pool(tmp_pool);
3115 errno = EINVAL;
3116 return -1;
3117 }
3118
3119 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3120 if (value != NULL) {
3121 *value = (int64_t) reply->integer;
3122 }
3123
3124 freeReplyObject(reply);
3125 destroy_pool(tmp_pool);
3126 return 0;
3127 }
3128
hash_scan(pool * p,pr_redis_t * redis,const char * key,size_t keysz,array_header * fields,char ** cursor,int count)3129 static int hash_scan(pool *p, pr_redis_t *redis, const char *key,
3130 size_t keysz, array_header *fields, char **cursor, int count) {
3131 int res = 0, xerrno = 0;
3132 const char *cmd = NULL;
3133 redisReply *reply;
3134
3135 cmd = "HSCAN";
3136 pr_trace_msg(trace_channel, 7, "sending command: %s %s", cmd, *cursor);
3137 reply = redisCommand(redis->ctx, "%s %b %s COUNT %d", cmd, key, keysz,
3138 *cursor, count);
3139 xerrno = errno;
3140
3141 reply = handle_reply(redis, cmd, reply);
3142 if (reply == NULL) {
3143 pr_trace_msg(trace_channel, 2,
3144 "error getting fields of hash using key (%lu bytes), cursor '%s': %s",
3145 (unsigned long) keysz, *cursor, strerror(errno));
3146 errno = xerrno;
3147 return -1;
3148 }
3149
3150 if (reply->type != REDIS_REPLY_ARRAY) {
3151 pr_trace_msg(trace_channel, 2,
3152 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
3153
3154 if (reply->type == REDIS_REPLY_ERROR) {
3155 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3156 }
3157
3158 freeReplyObject(reply);
3159 errno = EINVAL;
3160 return -1;
3161 }
3162
3163 if (reply->elements == 2) {
3164 redisReply *elt;
3165
3166 elt = reply->element[0];
3167 if (elt->type == REDIS_REPLY_STRING) {
3168 *cursor = pstrndup(p, elt->str, elt->len);
3169
3170 } else {
3171 pr_trace_msg(trace_channel, 2,
3172 "expected STRING element at index 0, got %s",
3173 get_reply_type(elt->type));
3174
3175 xerrno = EINVAL;
3176 res = -1;
3177 }
3178
3179 if (res == 0) {
3180 elt = reply->element[1];
3181 if (elt->type == REDIS_REPLY_ARRAY) {
3182 register unsigned int i;
3183
3184 pr_trace_msg(trace_channel, 7, "%s reply: %s %lu %s", cmd, *cursor,
3185 (unsigned long) elt->elements,
3186 elt->elements != 1 ? "elements" : "element");
3187
3188 /* When using HSCAN, we iterate over ALL the fields of the hash,
3189 * key AND value. Thus to get just the keys, we need every other
3190 * item.
3191 */
3192 for (i = 1; i < elt->elements; i += 2) {
3193 redisReply *item;
3194
3195 item = elt->element[i];
3196 if (item->type == REDIS_REPLY_STRING) {
3197 char *field;
3198
3199 field = pstrndup(p, item->str, item->len);
3200 *((char **) push_array(fields)) = field;
3201
3202 } else {
3203 pr_trace_msg(trace_channel, 2,
3204 "expected STRING element at index %u, got %s", i,
3205 get_reply_type(elt->type));
3206 }
3207 }
3208
3209 if (strcmp(*cursor, "0") == 0) {
3210 /* Set the cursor to NULL, to indicate to the caller the end
3211 * of the iteration.
3212 */
3213 *cursor = NULL;
3214 }
3215
3216 } else {
3217 pr_trace_msg(trace_channel, 2,
3218 "expected ARRAY element at index 1, got %s",
3219 get_reply_type(elt->type));
3220
3221 xerrno = EINVAL;
3222 res = -1;
3223 }
3224 }
3225
3226 } else {
3227 xerrno = ENOENT;
3228 res = -1;
3229 }
3230
3231 freeReplyObject(reply);
3232 errno = xerrno;
3233 return res;
3234 }
3235
pr_redis_hash_kkeys(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** fields)3236 int pr_redis_hash_kkeys(pool *p, pr_redis_t *redis, module *m, const char *key,
3237 size_t keysz, array_header **fields) {
3238 int res = 0, xerrno = 0;
3239 pool *tmp_pool = NULL;
3240 char *cursor;
3241
3242 if (p == NULL ||
3243 redis == NULL ||
3244 m == NULL ||
3245 key == NULL ||
3246 keysz == 0 ||
3247 fields == NULL) {
3248 errno = EINVAL;
3249 return -1;
3250 }
3251
3252 tmp_pool = make_sub_pool(redis->pool);
3253 pr_pool_tag(tmp_pool, "Redis HSCAN pool");
3254
3255 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3256
3257 cursor = "0";
3258 res = 0;
3259 *fields = make_array(p, 0, sizeof(char *));
3260
3261 while (res == 0 &&
3262 cursor != NULL) {
3263 pr_signals_handle();
3264
3265 res = hash_scan(tmp_pool, redis, key, keysz, *fields, &cursor,
3266 PR_REDIS_SCAN_SIZE);
3267 xerrno = errno;
3268
3269 if (res < 0) {
3270 destroy_pool(tmp_pool);
3271
3272 errno = xerrno;
3273 return -1;
3274 }
3275 }
3276
3277 if ((*fields)->nelts == 0) {
3278 *fields = NULL;
3279 xerrno = ENOENT;
3280 res = -1;
3281 }
3282
3283 destroy_pool(tmp_pool);
3284 errno = xerrno;
3285 return res;
3286 }
3287
pr_redis_hash_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)3288 int pr_redis_hash_kremove(pr_redis_t *redis, module *m, const char *key,
3289 size_t keysz) {
3290
3291 /* Note: We can actually use just DEL here. */
3292 return pr_redis_kremove(redis, m, key, keysz);
3293 }
3294
pr_redis_hash_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void * value,size_t valuesz)3295 int pr_redis_hash_kset(pr_redis_t *redis, module *m, const char *key,
3296 size_t keysz, const char *field, size_t fieldsz, void *value,
3297 size_t valuesz) {
3298 int xerrno = 0;
3299 pool *tmp_pool = NULL;
3300 const char *cmd = NULL;
3301 redisReply *reply;
3302
3303 if (redis == NULL ||
3304 m == NULL ||
3305 key == NULL ||
3306 keysz == 0 ||
3307 field == NULL ||
3308 fieldsz == 0 ||
3309 value == NULL ||
3310 valuesz == 0) {
3311 errno = EINVAL;
3312 return -1;
3313 }
3314
3315 tmp_pool = make_sub_pool(redis->pool);
3316 pr_pool_tag(tmp_pool, "Redis HSET pool");
3317
3318 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3319
3320 cmd = "HSET";
3321 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3322 reply = redisCommand(redis->ctx, "%s %b %b %b", cmd, key, keysz, field,
3323 fieldsz, value, valuesz);
3324 xerrno = errno;
3325
3326 reply = handle_reply(redis, cmd, reply);
3327 if (reply == NULL) {
3328 pr_trace_msg(trace_channel, 2,
3329 "error setting item for field in hash using key (%lu bytes): %s",
3330 (unsigned long) keysz, strerror(errno));
3331 destroy_pool(tmp_pool);
3332 errno = xerrno;
3333 return -1;
3334 }
3335
3336 if (reply->type != REDIS_REPLY_INTEGER) {
3337 pr_trace_msg(trace_channel, 2,
3338 "expected INTEGER reply for %s, got %s", cmd,
3339 get_reply_type(reply->type));
3340
3341 if (reply->type == REDIS_REPLY_ERROR) {
3342 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3343 }
3344
3345 freeReplyObject(reply);
3346 destroy_pool(tmp_pool);
3347 errno = EINVAL;
3348 return -1;
3349 }
3350
3351 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3352
3353 freeReplyObject(reply);
3354 destroy_pool(tmp_pool);
3355 return 0;
3356 }
3357
pr_redis_hash_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t * hash)3358 int pr_redis_hash_ksetall(pr_redis_t *redis, module *m, const char *key,
3359 size_t keysz, pr_table_t *hash) {
3360 int count, xerrno = 0;
3361 pool *tmp_pool = NULL;
3362 const char *cmd = NULL;
3363 array_header *args, *arglens;
3364 redisReply *reply;
3365 const void *key_data;
3366 size_t key_datasz;
3367
3368 if (redis == NULL ||
3369 m == NULL ||
3370 key == NULL ||
3371 keysz == 0 ||
3372 hash == NULL) {
3373 errno = EINVAL;
3374 return -1;
3375 }
3376
3377 /* Skip any empty hashes. */
3378 count = pr_table_count(hash);
3379 if (count <= 0) {
3380 pr_trace_msg(trace_channel, 9, "skipping empty table");
3381 errno = EINVAL;
3382 return -1;
3383 }
3384
3385 tmp_pool = make_sub_pool(redis->pool);
3386 pr_pool_tag(tmp_pool, "Redis HMSET pool");
3387
3388 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3389
3390 cmd = "HMSET";
3391 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3392
3393 args = make_array(tmp_pool, count + 1, sizeof(char *));
3394 arglens = make_array(tmp_pool, count + 1, sizeof(size_t));
3395
3396 *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
3397 *((size_t *) push_array(arglens)) = strlen(cmd);
3398
3399 *((char **) push_array(args)) = (char *) key;
3400 *((size_t *) push_array(arglens)) = keysz;
3401
3402 pr_table_rewind(hash);
3403 key_data = pr_table_knext(hash, &key_datasz);
3404 while (key_data != NULL) {
3405 const void *value_data;
3406 size_t value_datasz;
3407
3408 pr_signals_handle();
3409
3410 value_data = pr_table_kget(hash, key_data, key_datasz, &value_datasz);
3411 if (value_data != NULL) {
3412 char *key_dup, *value_dup;
3413
3414 key_dup = palloc(tmp_pool, key_datasz);
3415 memcpy(key_dup, key_data, key_datasz);
3416 *((char **) push_array(args)) = key_dup;
3417 *((size_t *) push_array(arglens)) = key_datasz;
3418
3419 value_dup = palloc(tmp_pool, value_datasz);
3420 memcpy(value_dup, value_data, value_datasz);
3421 *((char **) push_array(args)) = value_dup;
3422 *((size_t *) push_array(arglens)) = value_datasz;
3423 }
3424
3425 key_data = pr_table_knext(hash, &key_datasz);
3426 }
3427
3428 reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
3429 xerrno = errno;
3430
3431 reply = handle_reply(redis, cmd, reply);
3432 if (reply == NULL) {
3433 pr_trace_msg(trace_channel, 2,
3434 "error setting hash using key (%lu bytes): %s",
3435 (unsigned long) keysz, strerror(errno));
3436 destroy_pool(tmp_pool);
3437 errno = xerrno;
3438 return -1;
3439 }
3440
3441 if (reply->type != REDIS_REPLY_STRING &&
3442 reply->type != REDIS_REPLY_STATUS) {
3443 pr_trace_msg(trace_channel, 2,
3444 "expected STRING or STATUS reply for %s, got %s", cmd,
3445 get_reply_type(reply->type));
3446
3447 if (reply->type == REDIS_REPLY_ERROR) {
3448 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3449 }
3450
3451 freeReplyObject(reply);
3452 destroy_pool(tmp_pool);
3453 errno = EINVAL;
3454 return -1;
3455 }
3456
3457 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
3458 reply->str);
3459
3460 freeReplyObject(reply);
3461 destroy_pool(tmp_pool);
3462 return 0;
3463 }
3464
pr_redis_hash_kvalues(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values)3465 int pr_redis_hash_kvalues(pool *p, pr_redis_t *redis, module *m,
3466 const char *key, size_t keysz, array_header **values) {
3467 int res, xerrno = 0;
3468 pool *tmp_pool = NULL;
3469 const char *cmd = NULL;
3470 redisReply *reply;
3471
3472 if (p == NULL ||
3473 redis == NULL ||
3474 m == NULL ||
3475 key == NULL ||
3476 keysz == 0 ||
3477 values == NULL) {
3478 errno = EINVAL;
3479 return -1;
3480 }
3481
3482 tmp_pool = make_sub_pool(redis->pool);
3483 pr_pool_tag(tmp_pool, "Redis HVALS pool");
3484
3485 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3486
3487 cmd = "HVALS";
3488 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3489 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
3490 xerrno = errno;
3491
3492 reply = handle_reply(redis, cmd, reply);
3493 if (reply == NULL) {
3494 pr_trace_msg(trace_channel, 2,
3495 "error getting values of hash using key (%lu bytes): %s",
3496 (unsigned long) keysz, strerror(errno));
3497 destroy_pool(tmp_pool);
3498 errno = xerrno;
3499 return -1;
3500 }
3501
3502 if (reply->type != REDIS_REPLY_ARRAY) {
3503 pr_trace_msg(trace_channel, 2,
3504 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
3505
3506 if (reply->type == REDIS_REPLY_ERROR) {
3507 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3508 }
3509
3510 freeReplyObject(reply);
3511 destroy_pool(tmp_pool);
3512 errno = EINVAL;
3513 return -1;
3514 }
3515
3516 if (reply->elements > 0) {
3517 register unsigned int i;
3518
3519 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
3520 (unsigned long) reply->elements,
3521 reply->elements != 1 ? "elements" : "element");
3522
3523 *values = make_array(p, reply->elements, sizeof(char *));
3524 for (i = 0; i < reply->elements; i++) {
3525 redisReply *elt;
3526
3527 elt = reply->element[i];
3528 if (elt->type == REDIS_REPLY_STRING) {
3529 char *value;
3530
3531 value = pcalloc(p, reply->len + 1);
3532 memcpy(value, reply->str, reply->len);
3533 *((char **) push_array(*values)) = value;
3534
3535 } else {
3536 pr_trace_msg(trace_channel, 2,
3537 "expected STRING element at index %u, got %s", i,
3538 get_reply_type(elt->type));
3539 }
3540 }
3541
3542 res = 0;
3543
3544 } else {
3545 xerrno = ENOENT;
3546 res = -1;
3547 }
3548
3549 freeReplyObject(reply);
3550 destroy_pool(tmp_pool);
3551
3552 errno = xerrno;
3553 return res;
3554 }
3555
pr_redis_list_kappend(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)3556 int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
3557 size_t keysz, void *value, size_t valuesz) {
3558 return pr_redis_list_kpush(redis, m, key, keysz, value, valuesz,
3559 PR_REDIS_LIST_FL_RIGHT);
3560 }
3561
pr_redis_list_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)3562 int pr_redis_list_kcount(pr_redis_t *redis, module *m, const char *key,
3563 size_t keysz, uint64_t *count) {
3564 int xerrno = 0;
3565 pool *tmp_pool = NULL;
3566 const char *cmd = NULL;
3567 redisReply *reply;
3568
3569 if (redis == NULL ||
3570 m == NULL ||
3571 key == NULL ||
3572 keysz == 0 ||
3573 count == NULL) {
3574 errno = EINVAL;
3575 return -1;
3576 }
3577
3578 tmp_pool = make_sub_pool(redis->pool);
3579 pr_pool_tag(tmp_pool, "Redis LLEN pool");
3580
3581 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3582
3583 cmd = "LLEN";
3584 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3585 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
3586 xerrno = errno;
3587
3588 reply = handle_reply(redis, cmd, reply);
3589 if (reply == NULL) {
3590 pr_trace_msg(trace_channel, 2,
3591 "error getting count of list using key (%lu bytes): %s",
3592 (unsigned long) keysz, strerror(errno));
3593 destroy_pool(tmp_pool);
3594 errno = xerrno;
3595 return -1;
3596 }
3597
3598 if (reply->type != REDIS_REPLY_INTEGER) {
3599 pr_trace_msg(trace_channel, 2,
3600 "expected INTEGER reply for %s, got %s", cmd,
3601 get_reply_type(reply->type));
3602
3603 if (reply->type == REDIS_REPLY_ERROR) {
3604 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3605 }
3606
3607 freeReplyObject(reply);
3608 destroy_pool(tmp_pool);
3609 errno = EINVAL;
3610 return -1;
3611 }
3612
3613 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3614 *count = (uint64_t) reply->integer;
3615
3616 freeReplyObject(reply);
3617 destroy_pool(tmp_pool);
3618 return 0;
3619 }
3620
pr_redis_list_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)3621 int pr_redis_list_kdelete(pr_redis_t *redis, module *m, const char *key,
3622 size_t keysz, void *value, size_t valuesz) {
3623 int xerrno = 0;
3624 pool *tmp_pool = NULL;
3625 const char *cmd = NULL;
3626 redisReply *reply;
3627 long long count = 0;
3628
3629 if (redis == NULL ||
3630 m == NULL ||
3631 key == NULL ||
3632 keysz == 0 ||
3633 value == NULL ||
3634 valuesz == 0) {
3635 errno = EINVAL;
3636 return -1;
3637 }
3638
3639 tmp_pool = make_sub_pool(redis->pool);
3640 pr_pool_tag(tmp_pool, "Redis LREM pool");
3641
3642 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3643
3644 cmd = "LREM";
3645 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3646 reply = redisCommand(redis->ctx, "%s %b 0 %b", cmd, key, keysz, value,
3647 valuesz);
3648 xerrno = errno;
3649
3650 reply = handle_reply(redis, cmd, reply);
3651 if (reply == NULL) {
3652 pr_trace_msg(trace_channel, 2,
3653 "error deleting item from set using key (%lu bytes): %s",
3654 (unsigned long) keysz, strerror(errno));
3655 destroy_pool(tmp_pool);
3656 errno = xerrno;
3657 return -1;
3658 }
3659
3660 if (reply->type != REDIS_REPLY_INTEGER) {
3661 pr_trace_msg(trace_channel, 2,
3662 "expected INTEGER reply for %s, got %s", cmd,
3663 get_reply_type(reply->type));
3664
3665 if (reply->type == REDIS_REPLY_ERROR) {
3666 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3667 }
3668
3669 freeReplyObject(reply);
3670 destroy_pool(tmp_pool);
3671 errno = EINVAL;
3672 return -1;
3673 }
3674
3675 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
3676 count = reply->integer;
3677
3678 freeReplyObject(reply);
3679 destroy_pool(tmp_pool);
3680
3681 if (count == 0) {
3682 /* No items removed. */
3683 errno = ENOENT;
3684 return -1;
3685 }
3686
3687 return 0;
3688 }
3689
pr_redis_list_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx)3690 int pr_redis_list_kexists(pr_redis_t *redis, module *m, const char *key,
3691 size_t keysz, unsigned int idx) {
3692 pool *tmp_pool;
3693 int res, xerrno = 0;
3694 void *val = NULL;
3695 size_t valsz = 0;
3696
3697 tmp_pool = make_sub_pool(NULL);
3698 res = pr_redis_list_kget(tmp_pool, redis, m, key, keysz, idx, &val, &valsz);
3699 xerrno = errno;
3700 destroy_pool(tmp_pool);
3701
3702 if (res < 0) {
3703 if (xerrno != ENOENT) {
3704 errno = xerrno;
3705 return -1;
3706 }
3707
3708 return FALSE;
3709 }
3710
3711 return TRUE;
3712 }
3713
pr_redis_list_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void ** value,size_t * valuesz)3714 int pr_redis_list_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
3715 size_t keysz, unsigned int idx, void **value, size_t *valuesz) {
3716 int res, xerrno = 0;
3717 pool *tmp_pool = NULL;
3718 const char *cmd = NULL;
3719 redisReply *reply;
3720 uint64_t count;
3721
3722 if (p == NULL ||
3723 redis == NULL ||
3724 m == NULL ||
3725 key == NULL ||
3726 keysz == 0 ||
3727 value == NULL ||
3728 valuesz == NULL) {
3729 errno = EINVAL;
3730 return -1;
3731 }
3732
3733 if (pr_redis_list_kcount(redis, m, key, keysz, &count) == 0) {
3734 if (count > 0 &&
3735 idx > 0 &&
3736 idx >= count) {
3737 pr_trace_msg(trace_channel, 14,
3738 "requested index %u exceeds list length %lu", idx,
3739 (unsigned long) count);
3740 errno = ERANGE;
3741 return -1;
3742 }
3743 }
3744
3745 tmp_pool = make_sub_pool(redis->pool);
3746 pr_pool_tag(tmp_pool, "Redis LINDEX pool");
3747
3748 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3749
3750 cmd = "LINDEX";
3751 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3752 reply = redisCommand(redis->ctx, "%s %b %u", cmd, key, keysz, idx);
3753 xerrno = errno;
3754
3755 reply = handle_reply(redis, cmd, reply);
3756 if (reply == NULL) {
3757 pr_trace_msg(trace_channel, 2,
3758 "error getting item at index %u of list using key (%lu bytes): %s", idx,
3759 (unsigned long) keysz, strerror(errno));
3760 destroy_pool(tmp_pool);
3761 errno = xerrno;
3762 return -1;
3763 }
3764
3765 if (reply->type != REDIS_REPLY_STRING &&
3766 reply->type != REDIS_REPLY_NIL) {
3767 pr_trace_msg(trace_channel, 2,
3768 "expected STRING or NIL reply for %s, got %s", cmd,
3769 get_reply_type(reply->type));
3770
3771 if (reply->type == REDIS_REPLY_ERROR) {
3772 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3773 }
3774
3775 freeReplyObject(reply);
3776 destroy_pool(tmp_pool);
3777 errno = EINVAL;
3778 return -1;
3779 }
3780
3781 if (reply->type == REDIS_REPLY_STRING) {
3782 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
3783 reply->str);
3784 *valuesz = reply->len;
3785 *value = palloc(p, reply->len);
3786 memcpy(*value, reply->str, reply->len);
3787 res = 0;
3788
3789 } else {
3790 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
3791 xerrno = ENOENT;
3792 res = -1;
3793 }
3794
3795 freeReplyObject(reply);
3796 destroy_pool(tmp_pool);
3797
3798 errno = xerrno;
3799 return res;
3800 }
3801
list_scan(pool * p,pr_redis_t * redis,const char * key,size_t keysz,array_header * values,array_header * valueszs,int * cursor,int count)3802 static int list_scan(pool *p, pr_redis_t *redis, const char *key, size_t keysz,
3803 array_header *values, array_header *valueszs, int *cursor, int count) {
3804 int res = 0, xerrno = 0, range;
3805 const char *cmd = NULL;
3806 redisReply *reply;
3807
3808 cmd = "LRANGE";
3809
3810 /* Note: We use one less than the count to preserve [...) semantics of the
3811 * requested range, rather than Redis' [...] inclusive semantics.
3812 */
3813 range = *cursor + count - 1;
3814 pr_trace_msg(trace_channel, 7, "sending command: %s %d %d", cmd, *cursor,
3815 range);
3816 reply = redisCommand(redis->ctx, "%s %b %d %d", cmd, key, keysz,
3817 *cursor, range);
3818 xerrno = errno;
3819
3820 reply = handle_reply(redis, cmd, reply);
3821 if (reply == NULL) {
3822 pr_trace_msg(trace_channel, 2,
3823 "error getting items in list using key (%lu bytes), cursor %d: %s",
3824 (unsigned long) keysz, *cursor, strerror(errno));
3825 errno = xerrno;
3826 return -1;
3827 }
3828
3829 if (reply->type != REDIS_REPLY_ARRAY) {
3830 pr_trace_msg(trace_channel, 2,
3831 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
3832
3833 if (reply->type == REDIS_REPLY_ERROR) {
3834 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
3835 }
3836
3837 freeReplyObject(reply);
3838 errno = EINVAL;
3839 return -1;
3840 }
3841
3842 if (reply->elements > 0) {
3843 register unsigned int i;
3844
3845 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
3846 (unsigned long) reply->elements,
3847 reply->elements != 1 ? "elements" : "element");
3848
3849 for (i = 0; i < reply->elements; i++) {
3850 redisReply *value_elt;
3851 void *value_data = NULL;
3852 size_t value_datasz = 0;
3853
3854 value_elt = reply->element[i];
3855 if (value_elt->type == REDIS_REPLY_STRING) {
3856 value_datasz = value_elt->len;
3857 value_data = palloc(p, value_datasz);
3858 memcpy(value_data, value_elt->str, value_datasz);
3859
3860 } else {
3861 pr_trace_msg(trace_channel, 2,
3862 "expected STRING element at index %u, got %s", i+1,
3863 get_reply_type(value_elt->type));
3864 }
3865
3866 if (value_data != NULL) {
3867 *((void **) push_array(values)) = value_data;
3868 *((size_t *) push_array(valueszs)) = value_datasz;
3869 }
3870 }
3871
3872 if (reply->elements == 0) {
3873 /* Set the cursor to -1, to indicate to the caller the end of the
3874 * iteration.
3875 */
3876 *cursor = -1;
3877
3878 } else {
3879 (*cursor) += reply->elements;
3880 }
3881
3882 res = 0;
3883
3884 } else {
3885 if (*cursor > 0) {
3886 /* If cursor is greater than zero, then we have found some elements,
3887 * and have reached the end of the iteration.
3888 */
3889 *cursor = -1;
3890 res = 0;
3891
3892 } else {
3893 xerrno = ENOENT;
3894 res = -1;
3895 }
3896 }
3897
3898 freeReplyObject(reply);
3899 errno = xerrno;
3900 return res;
3901 }
3902
pr_redis_list_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)3903 int pr_redis_list_kgetall(pool *p, pr_redis_t *redis, module *m,
3904 const char *key, size_t keysz, array_header **values,
3905 array_header **valueszs) {
3906 int cursor, res = 0, xerrno = 0;
3907 pool *tmp_pool = NULL;
3908
3909 if (p == NULL ||
3910 redis == NULL ||
3911 m == NULL ||
3912 key == NULL ||
3913 keysz == 0 ||
3914 values == NULL ||
3915 valueszs == NULL) {
3916 errno = EINVAL;
3917 return -1;
3918 }
3919
3920 tmp_pool = make_sub_pool(redis->pool);
3921 pr_pool_tag(tmp_pool, "Redis LRANGE pool");
3922
3923 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3924
3925 cursor = 0;
3926 res = 0;
3927 *values = make_array(p, 0, sizeof(void *));
3928 *valueszs = make_array(p, 0, sizeof(size_t));
3929
3930 while (res == 0 &&
3931 cursor != -1) {
3932 pr_signals_handle();
3933
3934 res = list_scan(tmp_pool, redis, key, keysz, *values, *valueszs, &cursor,
3935 PR_REDIS_SCAN_SIZE);
3936 xerrno = errno;
3937
3938 if (res < 0) {
3939 destroy_pool(tmp_pool);
3940
3941 errno = xerrno;
3942 return -1;
3943 }
3944 }
3945
3946 if ((*values)->nelts == 0) {
3947 *values = *valueszs = NULL;
3948 xerrno = ENOENT;
3949 res = -1;
3950 }
3951
3952 destroy_pool(tmp_pool);
3953 errno = xerrno;
3954 return res;
3955 }
3956
pr_redis_list_kpop(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz,int flags)3957 int pr_redis_list_kpop(pool *p, pr_redis_t *redis, module *m, const char *key,
3958 size_t keysz, void **value, size_t *valuesz, int flags) {
3959 int res, xerrno = 0;
3960 pool *tmp_pool = NULL;
3961 const char *cmd = NULL;
3962 redisReply *reply;
3963
3964 if (p == NULL ||
3965 redis == NULL ||
3966 m == NULL ||
3967 key == NULL ||
3968 keysz == 0 ||
3969 value == NULL ||
3970 valuesz == NULL) {
3971 errno = EINVAL;
3972 return -1;
3973 }
3974
3975 tmp_pool = make_sub_pool(redis->pool);
3976
3977 switch (flags) {
3978 case PR_REDIS_LIST_FL_RIGHT:
3979 pr_pool_tag(tmp_pool, "Redis RPOP pool");
3980 cmd = "RPOP";
3981 break;
3982
3983 case PR_REDIS_LIST_FL_LEFT:
3984 pr_pool_tag(tmp_pool, "Redis LPOP pool");
3985 cmd = "LPOP";
3986 break;
3987
3988 default:
3989 destroy_pool(tmp_pool);
3990 errno = EINVAL;
3991 return -1;
3992 }
3993
3994 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
3995
3996 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
3997 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
3998 xerrno = errno;
3999
4000 reply = handle_reply(redis, cmd, reply);
4001 if (reply == NULL) {
4002 pr_trace_msg(trace_channel, 2,
4003 "error popping item from list using key (%lu bytes): %s",
4004 (unsigned long) keysz, strerror(errno));
4005 destroy_pool(tmp_pool);
4006 errno = xerrno;
4007 return -1;
4008 }
4009
4010 if (reply->type != REDIS_REPLY_STRING &&
4011 reply->type != REDIS_REPLY_NIL) {
4012 pr_trace_msg(trace_channel, 2,
4013 "expected STRING or NIL reply for %s, got %s", cmd,
4014 get_reply_type(reply->type));
4015
4016 if (reply->type == REDIS_REPLY_ERROR) {
4017 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4018 }
4019
4020 freeReplyObject(reply);
4021 destroy_pool(tmp_pool);
4022 errno = EINVAL;
4023 return -1;
4024 }
4025
4026 if (reply->type == REDIS_REPLY_STRING) {
4027 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
4028 reply->str);
4029 *valuesz = reply->len;
4030 *value = palloc(p, reply->len);
4031 memcpy(*value, reply->str, reply->len);
4032 res = 0;
4033
4034 } else {
4035 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
4036 xerrno = ENOENT;
4037 res = -1;
4038 }
4039
4040 freeReplyObject(reply);
4041 destroy_pool(tmp_pool);
4042
4043 errno = xerrno;
4044 return res;
4045 }
4046
pr_redis_list_kpush(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,int flags)4047 int pr_redis_list_kpush(pr_redis_t *redis, module *m, const char *key,
4048 size_t keysz, void *value, size_t valuesz, int flags) {
4049 int xerrno = 0;
4050 pool *tmp_pool = NULL;
4051 const char *cmd = NULL;
4052 redisReply *reply;
4053
4054 if (redis == NULL ||
4055 m == NULL ||
4056 key == NULL ||
4057 keysz == 0 ||
4058 value == NULL ||
4059 valuesz == 0) {
4060 errno = EINVAL;
4061 return -1;
4062 }
4063
4064 tmp_pool = make_sub_pool(redis->pool);
4065
4066 switch (flags) {
4067 case PR_REDIS_LIST_FL_RIGHT:
4068 pr_pool_tag(tmp_pool, "Redis RPUSH pool");
4069 cmd = "RPUSH";
4070 break;
4071
4072 case PR_REDIS_LIST_FL_LEFT:
4073 pr_pool_tag(tmp_pool, "Redis LPUSH pool");
4074 cmd = "LPUSH";
4075 break;
4076
4077 default:
4078 destroy_pool(tmp_pool);
4079 errno = EINVAL;
4080 return -1;
4081 }
4082
4083 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4084
4085 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4086 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4087 xerrno = errno;
4088
4089 reply = handle_reply(redis, cmd, reply);
4090 if (reply == NULL) {
4091 pr_trace_msg(trace_channel, 2,
4092 "error pushing to list using key (%lu bytes): %s",
4093 (unsigned long) keysz, strerror(errno));
4094 destroy_pool(tmp_pool);
4095 errno = xerrno;
4096 return -1;
4097 }
4098
4099 if (reply->type != REDIS_REPLY_INTEGER) {
4100 pr_trace_msg(trace_channel, 2,
4101 "expected INTEGER reply for %s, got %s", cmd,
4102 get_reply_type(reply->type));
4103
4104 if (reply->type == REDIS_REPLY_ERROR) {
4105 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4106 }
4107
4108 freeReplyObject(reply);
4109 destroy_pool(tmp_pool);
4110 errno = EINVAL;
4111 return -1;
4112 }
4113
4114 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4115
4116 freeReplyObject(reply);
4117 destroy_pool(tmp_pool);
4118 return 0;
4119 }
4120
pr_redis_list_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)4121 int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
4122 size_t keysz) {
4123
4124 /* Note: We can actually use just DEL here. */
4125 return pr_redis_kremove(redis, m, key, keysz);
4126 }
4127
pr_redis_list_krotate(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz)4128 int pr_redis_list_krotate(pool *p, pr_redis_t *redis, module *m,
4129 const char *key, size_t keysz, void **value, size_t *valuesz) {
4130 int res = 0, xerrno = 0;
4131 pool *tmp_pool = NULL;
4132 const char *cmd = NULL;
4133 redisReply *reply;
4134
4135 if (p == NULL ||
4136 redis == NULL ||
4137 m == NULL ||
4138 key == NULL ||
4139 keysz == 0 ||
4140 value == NULL ||
4141 valuesz == NULL) {
4142 errno = EINVAL;
4143 return -1;
4144 }
4145
4146 tmp_pool = make_sub_pool(redis->pool);
4147 pr_pool_tag(tmp_pool, "Redis RPOPLPUSH pool");
4148
4149 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4150
4151 cmd = "RPOPLPUSH";
4152 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4153 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, key, keysz);
4154 xerrno = errno;
4155
4156 reply = handle_reply(redis, cmd, reply);
4157 if (reply == NULL) {
4158 pr_trace_msg(trace_channel, 2,
4159 "error rotating list using key (%lu bytes): %s", (unsigned long) keysz,
4160 strerror(errno));
4161 destroy_pool(tmp_pool);
4162 errno = xerrno;
4163 return -1;
4164 }
4165
4166 if (reply->type != REDIS_REPLY_STRING &&
4167 reply->type != REDIS_REPLY_NIL) {
4168 pr_trace_msg(trace_channel, 2,
4169 "expected STRING or NIL reply for %s, got %s", cmd,
4170 get_reply_type(reply->type));
4171
4172 if (reply->type == REDIS_REPLY_ERROR) {
4173 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4174 }
4175
4176 freeReplyObject(reply);
4177 destroy_pool(tmp_pool);
4178 errno = EINVAL;
4179 return -1;
4180 }
4181
4182 if (reply->type == REDIS_REPLY_STRING) {
4183 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
4184 reply->str);
4185 *valuesz = reply->len;
4186 *value = palloc(p, reply->len);
4187 memcpy(*value, reply->str, reply->len);
4188 res = 0;
4189
4190 } else {
4191 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
4192 xerrno = ENOENT;
4193 res = -1;
4194 }
4195
4196 freeReplyObject(reply);
4197 destroy_pool(tmp_pool);
4198
4199 errno = xerrno;
4200 return res;
4201 }
4202
pr_redis_list_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void * value,size_t valuesz)4203 int pr_redis_list_kset(pr_redis_t *redis, module *m, const char *key,
4204 size_t keysz, unsigned int idx, void *value, size_t valuesz) {
4205 int xerrno = 0;
4206 pool *tmp_pool = NULL;
4207 const char *cmd = NULL;
4208 redisReply *reply;
4209
4210 if (redis == NULL ||
4211 m == NULL ||
4212 key == NULL ||
4213 keysz == 0 ||
4214 value == NULL ||
4215 valuesz == 0) {
4216 errno = EINVAL;
4217 return -1;
4218 }
4219
4220 tmp_pool = make_sub_pool(redis->pool);
4221 pr_pool_tag(tmp_pool, "Redis LSET pool");
4222
4223 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4224
4225 cmd = "LSET";
4226 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4227 reply = redisCommand(redis->ctx, "%s %b %u %b", cmd, key, keysz, idx, value,
4228 valuesz);
4229 xerrno = errno;
4230
4231 reply = handle_reply(redis, cmd, reply);
4232 if (reply == NULL) {
4233 pr_trace_msg(trace_channel, 2,
4234 "error setting item at index %u in list using key (%lu bytes): %s", idx,
4235 (unsigned long) keysz, strerror(errno));
4236 destroy_pool(tmp_pool);
4237 errno = xerrno;
4238 return -1;
4239 }
4240
4241 if (reply->type != REDIS_REPLY_STRING &&
4242 reply->type != REDIS_REPLY_STATUS) {
4243 pr_trace_msg(trace_channel, 2,
4244 "expected STRING or STATUS reply for %s, got %s", cmd,
4245 get_reply_type(reply->type));
4246
4247 if (reply->type == REDIS_REPLY_ERROR) {
4248 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4249 }
4250
4251 freeReplyObject(reply);
4252 destroy_pool(tmp_pool);
4253 errno = EINVAL;
4254 return -1;
4255 }
4256
4257 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
4258 reply->str);
4259
4260 freeReplyObject(reply);
4261 destroy_pool(tmp_pool);
4262 return 0;
4263 }
4264
pr_redis_list_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)4265 int pr_redis_list_ksetall(pr_redis_t *redis, module *m, const char *key,
4266 size_t keysz, array_header *values, array_header *valueszs) {
4267 register unsigned int i;
4268 int res, xerrno = 0;
4269 pool *tmp_pool = NULL;
4270 array_header *args, *arglens;
4271 const char *cmd = NULL;
4272 redisReply *reply;
4273
4274 if (redis == NULL ||
4275 m == NULL ||
4276 key == NULL ||
4277 keysz == 0 ||
4278 values == NULL ||
4279 values->nelts == 0 ||
4280 valueszs == NULL ||
4281 valueszs->nelts == 0 ||
4282 values->nelts != valueszs->nelts) {
4283 errno = EINVAL;
4284 return -1;
4285 }
4286
4287 /* First, delete any existing list at this key; a set operation, in my mind,
4288 * is a complete overwrite.
4289 */
4290 res = pr_redis_list_kremove(redis, m, key, keysz);
4291 if (res < 0 &&
4292 errno != ENOENT) {
4293 return -1;
4294 }
4295
4296 tmp_pool = make_sub_pool(redis->pool);
4297 pr_pool_tag(tmp_pool, "Redis RPUSH pool");
4298
4299 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4300
4301 cmd = "RPUSH";
4302 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4303
4304 args = make_array(tmp_pool, 0, sizeof(char *));
4305 arglens = make_array(tmp_pool, 0, sizeof(size_t));
4306
4307 *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
4308 *((size_t *) push_array(arglens)) = strlen(cmd);
4309
4310 *((char **) push_array(args)) = (char *) key;
4311 *((size_t *) push_array(arglens)) = keysz;
4312
4313 for (i = 0; i < values->nelts; i++) {
4314 pr_signals_handle();
4315
4316 *((char **) push_array(args)) = ((char **) values->elts)[i];
4317 *((size_t *) push_array(arglens)) = ((size_t *) valueszs->elts)[i];
4318 }
4319
4320 reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
4321 xerrno = errno;
4322
4323 reply = handle_reply(redis, cmd, reply);
4324 if (reply == NULL) {
4325 pr_trace_msg(trace_channel, 2,
4326 "error setting items in list using key (%lu bytes): %s",
4327 (unsigned long) keysz, strerror(errno));
4328 destroy_pool(tmp_pool);
4329 errno = xerrno;
4330 return -1;
4331 }
4332
4333 if (reply->type != REDIS_REPLY_INTEGER) {
4334 pr_trace_msg(trace_channel, 2,
4335 "expected INTEGER reply for %s, got %s", cmd,
4336 get_reply_type(reply->type));
4337
4338 if (reply->type == REDIS_REPLY_ERROR) {
4339 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4340 }
4341 freeReplyObject(reply);
4342 destroy_pool(tmp_pool);
4343 errno = EINVAL;
4344 return -1;
4345 }
4346
4347 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4348
4349 freeReplyObject(reply);
4350 destroy_pool(tmp_pool);
4351 return 0;
4352 }
4353
pr_redis_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4354 int pr_redis_set_kadd(pr_redis_t *redis, module *m, const char *key,
4355 size_t keysz, void *value, size_t valuesz) {
4356 int xerrno = 0, exists = FALSE;
4357 pool *tmp_pool = NULL;
4358 const char *cmd = NULL;
4359 redisReply *reply;
4360
4361 if (redis == NULL ||
4362 m == NULL ||
4363 key == NULL ||
4364 keysz == 0 ||
4365 value == NULL ||
4366 valuesz == 0) {
4367 errno = EINVAL;
4368 return -1;
4369 }
4370
4371 exists = pr_redis_set_kexists(redis, m, key, keysz, value, valuesz);
4372 if (exists == TRUE) {
4373 errno = EEXIST;
4374 return -1;
4375 }
4376
4377 tmp_pool = make_sub_pool(redis->pool);
4378 pr_pool_tag(tmp_pool, "Redis SADD pool");
4379
4380 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4381
4382 cmd = "SADD";
4383 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4384 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4385 xerrno = errno;
4386
4387 reply = handle_reply(redis, cmd, reply);
4388 if (reply == NULL) {
4389 pr_trace_msg(trace_channel, 2,
4390 "error adding to set using key (%lu bytes): %s",
4391 (unsigned long) keysz, strerror(errno));
4392 destroy_pool(tmp_pool);
4393 errno = xerrno;
4394 return -1;
4395 }
4396
4397 if (reply->type != REDIS_REPLY_INTEGER) {
4398 pr_trace_msg(trace_channel, 2,
4399 "expected INTEGER reply for %s, got %s", cmd,
4400 get_reply_type(reply->type));
4401
4402 if (reply->type == REDIS_REPLY_ERROR) {
4403 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4404 }
4405
4406 freeReplyObject(reply);
4407 destroy_pool(tmp_pool);
4408 errno = EINVAL;
4409 return -1;
4410 }
4411
4412 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4413
4414 freeReplyObject(reply);
4415 destroy_pool(tmp_pool);
4416 return 0;
4417 }
4418
pr_redis_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)4419 int pr_redis_set_kcount(pr_redis_t *redis, module *m, const char *key,
4420 size_t keysz, uint64_t *count) {
4421 int xerrno = 0;
4422 pool *tmp_pool = NULL;
4423 const char *cmd = NULL;
4424 redisReply *reply;
4425
4426 if (redis == NULL ||
4427 m == NULL ||
4428 key == NULL ||
4429 keysz == 0 ||
4430 count == NULL) {
4431 errno = EINVAL;
4432 return -1;
4433 }
4434
4435 tmp_pool = make_sub_pool(redis->pool);
4436 pr_pool_tag(tmp_pool, "Redis SCARD pool");
4437
4438 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4439
4440 cmd = "SCARD";
4441 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4442 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
4443 xerrno = errno;
4444
4445 reply = handle_reply(redis, cmd, reply);
4446 if (reply == NULL) {
4447 pr_trace_msg(trace_channel, 2,
4448 "error getting count of set using key (%lu bytes): %s",
4449 (unsigned long) keysz, strerror(errno));
4450 destroy_pool(tmp_pool);
4451 errno = xerrno;
4452 return -1;
4453 }
4454
4455 if (reply->type != REDIS_REPLY_INTEGER) {
4456 pr_trace_msg(trace_channel, 2,
4457 "expected INTEGER reply for %s, got %s", cmd,
4458 get_reply_type(reply->type));
4459
4460 if (reply->type == REDIS_REPLY_ERROR) {
4461 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4462 }
4463
4464 freeReplyObject(reply);
4465 destroy_pool(tmp_pool);
4466 errno = EINVAL;
4467 return -1;
4468 }
4469
4470 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4471 *count = (uint64_t) reply->integer;
4472
4473 freeReplyObject(reply);
4474 destroy_pool(tmp_pool);
4475 return 0;
4476 }
4477
pr_redis_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4478 int pr_redis_set_kdelete(pr_redis_t *redis, module *m, const char *key,
4479 size_t keysz, void *value, size_t valuesz) {
4480 int xerrno = 0;
4481 pool *tmp_pool = NULL;
4482 const char *cmd = NULL;
4483 redisReply *reply;
4484 long long count = 0;
4485
4486 if (redis == NULL ||
4487 m == NULL ||
4488 key == NULL ||
4489 keysz == 0 ||
4490 value == NULL ||
4491 valuesz == 0) {
4492 errno = EINVAL;
4493 return -1;
4494 }
4495
4496 tmp_pool = make_sub_pool(redis->pool);
4497 pr_pool_tag(tmp_pool, "Redis SREM pool");
4498
4499 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4500
4501 cmd = "SREM";
4502 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4503 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4504 xerrno = errno;
4505
4506 reply = handle_reply(redis, cmd, reply);
4507 if (reply == NULL) {
4508 pr_trace_msg(trace_channel, 2,
4509 "error deleting item from set using key (%lu bytes): %s",
4510 (unsigned long) keysz, strerror(errno));
4511 destroy_pool(tmp_pool);
4512 errno = xerrno;
4513 return -1;
4514 }
4515
4516 if (reply->type != REDIS_REPLY_INTEGER) {
4517 pr_trace_msg(trace_channel, 2,
4518 "expected INTEGER reply for %s, got %s", cmd,
4519 get_reply_type(reply->type));
4520
4521 if (reply->type == REDIS_REPLY_ERROR) {
4522 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4523 }
4524
4525 freeReplyObject(reply);
4526 destroy_pool(tmp_pool);
4527 errno = EINVAL;
4528 return -1;
4529 }
4530
4531 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4532 count = reply->integer;
4533
4534 freeReplyObject(reply);
4535 destroy_pool(tmp_pool);
4536
4537 if (count == 0) {
4538 /* No items removed. */
4539 errno = ENOENT;
4540 return -1;
4541 }
4542
4543 return 0;
4544 }
4545
pr_redis_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4546 int pr_redis_set_kexists(pr_redis_t *redis, module *m, const char *key,
4547 size_t keysz, void *value, size_t valuesz) {
4548 int xerrno = 0, exists = FALSE;
4549 pool *tmp_pool = NULL;
4550 const char *cmd = NULL;
4551 redisReply *reply;
4552
4553 if (redis == NULL ||
4554 m == NULL ||
4555 key == NULL ||
4556 keysz == 0 ||
4557 value == NULL ||
4558 valuesz == 0) {
4559 errno = EINVAL;
4560 return -1;
4561 }
4562
4563 tmp_pool = make_sub_pool(redis->pool);
4564 pr_pool_tag(tmp_pool, "Redis SISMEMBER pool");
4565
4566 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4567
4568 cmd = "SISMEMBER";
4569 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4570 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4571 xerrno = errno;
4572
4573 reply = handle_reply(redis, cmd, reply);
4574 if (reply == NULL) {
4575 pr_trace_msg(trace_channel, 2,
4576 "error checking item in set using key (%lu bytes): %s",
4577 (unsigned long) keysz, strerror(errno));
4578 destroy_pool(tmp_pool);
4579 errno = xerrno;
4580 return -1;
4581 }
4582
4583 if (reply->type != REDIS_REPLY_INTEGER) {
4584 pr_trace_msg(trace_channel, 2,
4585 "expected INTEGER reply for %s, got %s", cmd,
4586 get_reply_type(reply->type));
4587
4588 if (reply->type == REDIS_REPLY_ERROR) {
4589 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4590 }
4591
4592 freeReplyObject(reply);
4593 destroy_pool(tmp_pool);
4594 errno = EINVAL;
4595 return -1;
4596 }
4597
4598 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4599 exists = reply->integer ? TRUE : FALSE;
4600
4601 freeReplyObject(reply);
4602 destroy_pool(tmp_pool);
4603 return exists;
4604 }
4605
pr_redis_set_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)4606 int pr_redis_set_kgetall(pool *p, pr_redis_t *redis, module *m, const char *key,
4607 size_t keysz, array_header **values, array_header **valueszs) {
4608 int res = 0, xerrno = 0;
4609 pool *tmp_pool = NULL;
4610 const char *cmd = NULL;
4611 redisReply *reply;
4612
4613 if (p == NULL ||
4614 redis == NULL ||
4615 m == NULL ||
4616 key == NULL ||
4617 keysz == 0 ||
4618 values == NULL ||
4619 valueszs == NULL) {
4620 errno = EINVAL;
4621 return -1;
4622 }
4623
4624 tmp_pool = make_sub_pool(redis->pool);
4625 pr_pool_tag(tmp_pool, "Redis SMEMBERS pool");
4626
4627 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4628
4629 cmd = "SMEMBERS";
4630 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4631 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
4632 xerrno = errno;
4633
4634 reply = handle_reply(redis, cmd, reply);
4635 if (reply == NULL) {
4636 pr_trace_msg(trace_channel, 2,
4637 "error getting items in set using key (%lu bytes): %s",
4638 (unsigned long) keysz, strerror(errno));
4639 destroy_pool(tmp_pool);
4640 errno = xerrno;
4641 return -1;
4642 }
4643
4644 if (reply->type != REDIS_REPLY_ARRAY) {
4645 pr_trace_msg(trace_channel, 2,
4646 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
4647
4648 if (reply->type == REDIS_REPLY_ERROR) {
4649 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4650 }
4651
4652 freeReplyObject(reply);
4653 destroy_pool(tmp_pool);
4654 errno = EINVAL;
4655 return -1;
4656 }
4657
4658 if (reply->elements > 0) {
4659 register unsigned int i;
4660
4661 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
4662 (unsigned long) reply->elements,
4663 reply->elements != 1 ? "elements" : "element");
4664
4665 *values = make_array(p, 0, sizeof(void *));
4666 *valueszs = make_array(p, 0, sizeof(size_t));
4667
4668 for (i = 0; i < reply->elements; i++) {
4669 redisReply *value_elt;
4670 void *value_data = NULL;
4671 size_t value_datasz = 0;
4672
4673 value_elt = reply->element[i];
4674 if (value_elt->type == REDIS_REPLY_STRING) {
4675 value_datasz = value_elt->len;
4676 value_data = palloc(p, value_datasz);
4677 memcpy(value_data, value_elt->str, value_datasz);
4678
4679 } else {
4680 pr_trace_msg(trace_channel, 2,
4681 "expected STRING element at index %u, got %s", i + 1,
4682 get_reply_type(value_elt->type));
4683 }
4684
4685 if (value_data != NULL) {
4686 *((void **) push_array(*values)) = value_data;
4687 *((size_t *) push_array(*valueszs)) = value_datasz;
4688 }
4689 }
4690
4691 res = 0;
4692
4693 } else {
4694 xerrno = ENOENT;
4695 res = -1;
4696 }
4697
4698 freeReplyObject(reply);
4699 destroy_pool(tmp_pool);
4700
4701 errno = xerrno;
4702 return res;
4703 }
4704
pr_redis_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)4705 int pr_redis_set_kremove(pr_redis_t *redis, module *m, const char *key,
4706 size_t keysz) {
4707
4708 /* Note: We can actually use just DEL here. */
4709 return pr_redis_kremove(redis, m, key, keysz);
4710 }
4711
pr_redis_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)4712 int pr_redis_set_ksetall(pr_redis_t *redis, module *m, const char *key,
4713 size_t keysz, array_header *values, array_header *valueszs) {
4714 register unsigned int i;
4715 int res, xerrno = 0;
4716 pool *tmp_pool = NULL;
4717 array_header *args, *arglens;
4718 const char *cmd = NULL;
4719 redisReply *reply;
4720
4721 if (redis == NULL ||
4722 m == NULL ||
4723 key == NULL ||
4724 keysz == 0 ||
4725 values == NULL ||
4726 values->nelts == 0 ||
4727 valueszs == NULL ||
4728 valueszs->nelts == 0 ||
4729 values->nelts != valueszs->nelts) {
4730 errno = EINVAL;
4731 return -1;
4732 }
4733
4734 /* First, delete any existing set at this key; a set operation, in my mind,
4735 * is a complete overwrite.
4736 */
4737 res = pr_redis_set_kremove(redis, m, key, keysz);
4738 if (res < 0 &&
4739 errno != ENOENT) {
4740 return -1;
4741 }
4742
4743 tmp_pool = make_sub_pool(redis->pool);
4744 pr_pool_tag(tmp_pool, "Redis SADD pool");
4745
4746 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4747
4748 cmd = "SADD";
4749 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4750
4751 args = make_array(tmp_pool, 0, sizeof(char *));
4752 arglens = make_array(tmp_pool, 0, sizeof(size_t));
4753
4754 *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
4755 *((size_t *) push_array(arglens)) = strlen(cmd);
4756
4757 *((char **) push_array(args)) = (char *) key;
4758 *((size_t *) push_array(arglens)) = keysz;
4759
4760 for (i = 0; i < values->nelts; i++) {
4761 pr_signals_handle();
4762
4763 *((char **) push_array(args)) = ((char **) values->elts)[i];
4764 *((size_t *) push_array(arglens)) = ((size_t *) valueszs->elts)[i];
4765 }
4766
4767 reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
4768 xerrno = errno;
4769
4770 reply = handle_reply(redis, cmd, reply);
4771 if (reply == NULL) {
4772 pr_trace_msg(trace_channel, 2,
4773 "error setting items in set using key (%lu bytes): %s",
4774 (unsigned long) keysz, strerror(errno));
4775 destroy_pool(tmp_pool);
4776 errno = xerrno;
4777 return -1;
4778 }
4779
4780 if (reply->type != REDIS_REPLY_INTEGER) {
4781 pr_trace_msg(trace_channel, 2,
4782 "expected INTEGER reply for %s, got %s", cmd,
4783 get_reply_type(reply->type));
4784
4785 if (reply->type == REDIS_REPLY_ERROR) {
4786 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4787 }
4788 freeReplyObject(reply);
4789 destroy_pool(tmp_pool);
4790 errno = EINVAL;
4791 return -1;
4792 }
4793
4794 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4795
4796 freeReplyObject(reply);
4797 destroy_pool(tmp_pool);
4798 return 0;
4799 }
4800
pr_redis_sorted_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)4801 int pr_redis_sorted_set_kadd(pr_redis_t *redis, module *m, const char *key,
4802 size_t keysz, void *value, size_t valuesz, float score) {
4803 int xerrno = 0, exists = FALSE;
4804 pool *tmp_pool = NULL;
4805 const char *cmd = NULL;
4806 redisReply *reply;
4807
4808 if (redis == NULL ||
4809 m == NULL ||
4810 key == NULL ||
4811 keysz == 0 ||
4812 value == NULL ||
4813 valuesz == 0) {
4814 errno = EINVAL;
4815 return -1;
4816 }
4817
4818 /* Note: We should probably detect the server version, and instead of using
4819 * a separate existence check, if server >= 3.0.2, use the NX/XX flags of
4820 * the ZADD command.
4821 */
4822 exists = pr_redis_sorted_set_kexists(redis, m, key, keysz, value, valuesz);
4823 if (exists == TRUE) {
4824 errno = EEXIST;
4825 return -1;
4826 }
4827
4828 tmp_pool = make_sub_pool(redis->pool);
4829 pr_pool_tag(tmp_pool, "Redis ZADD pool");
4830
4831 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4832
4833 cmd = "ZADD";
4834 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4835 reply = redisCommand(redis->ctx, "%s %b %f %b", cmd, key, keysz, score,
4836 value, valuesz);
4837 xerrno = errno;
4838
4839 reply = handle_reply(redis, cmd, reply);
4840 if (reply == NULL) {
4841 pr_trace_msg(trace_channel, 2,
4842 "error adding to sorted set using key (%lu bytes): %s",
4843 (unsigned long) keysz, strerror(errno));
4844 destroy_pool(tmp_pool);
4845 errno = xerrno;
4846 return -1;
4847 }
4848
4849 if (reply->type != REDIS_REPLY_INTEGER) {
4850 pr_trace_msg(trace_channel, 2,
4851 "expected INTEGER reply for %s, got %s", cmd,
4852 get_reply_type(reply->type));
4853
4854 if (reply->type == REDIS_REPLY_ERROR) {
4855 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4856 }
4857 freeReplyObject(reply);
4858 destroy_pool(tmp_pool);
4859 errno = EINVAL;
4860 return -1;
4861 }
4862
4863 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4864
4865 freeReplyObject(reply);
4866 destroy_pool(tmp_pool);
4867 return 0;
4868 }
4869
pr_redis_sorted_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)4870 int pr_redis_sorted_set_kcount(pr_redis_t *redis, module *m, const char *key,
4871 size_t keysz, uint64_t *count) {
4872 int xerrno = 0;
4873 pool *tmp_pool = NULL;
4874 const char *cmd = NULL;
4875 redisReply *reply;
4876
4877 if (redis == NULL ||
4878 m == NULL ||
4879 key == NULL ||
4880 keysz == 0 ||
4881 count == NULL) {
4882 errno = EINVAL;
4883 return -1;
4884 }
4885
4886 tmp_pool = make_sub_pool(redis->pool);
4887 pr_pool_tag(tmp_pool, "Redis ZCARD pool");
4888
4889 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4890
4891 cmd = "ZCARD";
4892 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4893 reply = redisCommand(redis->ctx, "%s %b", cmd, key, keysz);
4894 xerrno = errno;
4895
4896 reply = handle_reply(redis, cmd, reply);
4897 if (reply == NULL) {
4898 pr_trace_msg(trace_channel, 2,
4899 "error getting count of sorted set using key (%lu bytes): %s",
4900 (unsigned long) keysz, strerror(errno));
4901 destroy_pool(tmp_pool);
4902 errno = xerrno;
4903 return -1;
4904 }
4905
4906 if (reply->type != REDIS_REPLY_INTEGER) {
4907 pr_trace_msg(trace_channel, 2,
4908 "expected INTEGER reply for %s, got %s", cmd,
4909 get_reply_type(reply->type));
4910
4911 if (reply->type == REDIS_REPLY_ERROR) {
4912 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4913 }
4914
4915 freeReplyObject(reply);
4916 destroy_pool(tmp_pool);
4917 errno = EINVAL;
4918 return -1;
4919 }
4920
4921 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4922 *count = (uint64_t) reply->integer;
4923
4924 freeReplyObject(reply);
4925 destroy_pool(tmp_pool);
4926 return 0;
4927 }
4928
pr_redis_sorted_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4929 int pr_redis_sorted_set_kdelete(pr_redis_t *redis, module *m, const char *key,
4930 size_t keysz, void *value, size_t valuesz) {
4931 int xerrno = 0;
4932 pool *tmp_pool = NULL;
4933 const char *cmd = NULL;
4934 redisReply *reply;
4935 long long count = 0;
4936
4937 if (redis == NULL ||
4938 m == NULL ||
4939 key == NULL ||
4940 keysz == 0 ||
4941 value == NULL ||
4942 valuesz == 0) {
4943 errno = EINVAL;
4944 return -1;
4945 }
4946
4947 tmp_pool = make_sub_pool(redis->pool);
4948 pr_pool_tag(tmp_pool, "Redis ZREM pool");
4949
4950 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
4951
4952 cmd = "ZREM";
4953 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
4954 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
4955 xerrno = errno;
4956
4957 reply = handle_reply(redis, cmd, reply);
4958 if (reply == NULL) {
4959 pr_trace_msg(trace_channel, 2,
4960 "error deleting item from sorted set using key (%lu bytes): %s",
4961 (unsigned long) keysz, strerror(errno));
4962 destroy_pool(tmp_pool);
4963 errno = xerrno;
4964 return -1;
4965 }
4966
4967 if (reply->type != REDIS_REPLY_INTEGER) {
4968 pr_trace_msg(trace_channel, 2,
4969 "expected INTEGER reply for %s, got %s", cmd,
4970 get_reply_type(reply->type));
4971
4972 if (reply->type == REDIS_REPLY_ERROR) {
4973 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
4974 }
4975
4976 freeReplyObject(reply);
4977 destroy_pool(tmp_pool);
4978 errno = EINVAL;
4979 return -1;
4980 }
4981
4982 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
4983 count = reply->integer;
4984
4985 freeReplyObject(reply);
4986 destroy_pool(tmp_pool);
4987
4988 if (count == 0) {
4989 /* No items removed. */
4990 errno = ENOENT;
4991 return -1;
4992 }
4993
4994 return 0;
4995 }
4996
pr_redis_sorted_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)4997 int pr_redis_sorted_set_kexists(pr_redis_t *redis, module *m, const char *key,
4998 size_t keysz, void *value, size_t valuesz) {
4999 int xerrno = 0, exists = FALSE;
5000 pool *tmp_pool = NULL;
5001 const char *cmd = NULL;
5002 redisReply *reply;
5003
5004 if (redis == NULL ||
5005 m == NULL ||
5006 key == NULL ||
5007 keysz == 0 ||
5008 value == NULL ||
5009 valuesz == 0) {
5010 errno = EINVAL;
5011 return -1;
5012 }
5013
5014 tmp_pool = make_sub_pool(redis->pool);
5015 pr_pool_tag(tmp_pool, "Redis ZRANK pool");
5016
5017 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5018
5019 cmd = "ZRANK";
5020 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5021 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
5022 xerrno = errno;
5023
5024 reply = handle_reply(redis, cmd, reply);
5025 if (reply == NULL) {
5026 pr_trace_msg(trace_channel, 2,
5027 "error checking item in sorted set using key (%lu bytes): %s",
5028 (unsigned long) keysz, strerror(errno));
5029 destroy_pool(tmp_pool);
5030 errno = xerrno;
5031 return -1;
5032 }
5033
5034 if (reply->type != REDIS_REPLY_INTEGER &&
5035 reply->type != REDIS_REPLY_NIL) {
5036 pr_trace_msg(trace_channel, 2,
5037 "expected INTEGER or NIL reply for %s, got %s", cmd,
5038 get_reply_type(reply->type));
5039
5040 if (reply->type == REDIS_REPLY_ERROR) {
5041 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5042 }
5043
5044 freeReplyObject(reply);
5045 destroy_pool(tmp_pool);
5046 errno = EINVAL;
5047 return -1;
5048 }
5049
5050 if (reply->type == REDIS_REPLY_INTEGER) {
5051 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
5052 exists = TRUE;
5053
5054 } else {
5055 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5056 exists = FALSE;
5057 }
5058
5059 freeReplyObject(reply);
5060 destroy_pool(tmp_pool);
5061 return exists;
5062 }
5063
pr_redis_sorted_set_kgetn(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)5064 int pr_redis_sorted_set_kgetn(pool *p, pr_redis_t *redis, module *m,
5065 const char *key, size_t keysz, unsigned int offset, unsigned int len,
5066 array_header **values, array_header **valueszs, int flags) {
5067 int res = 0, xerrno = 0;
5068 pool *tmp_pool = NULL;
5069 const char *cmd = NULL;
5070 redisReply *reply;
5071
5072 if (p == NULL ||
5073 redis == NULL ||
5074 m == NULL ||
5075 key == NULL ||
5076 keysz == 0 ||
5077 values == NULL ||
5078 valueszs == NULL) {
5079 errno = EINVAL;
5080 return -1;
5081 }
5082
5083 tmp_pool = make_sub_pool(redis->pool);
5084
5085 switch (flags) {
5086 case PR_REDIS_SORTED_SET_FL_ASC:
5087 pr_pool_tag(tmp_pool, "Redis ZRANGE pool");
5088 cmd = "ZRANGE";
5089 break;
5090
5091 case PR_REDIS_SORTED_SET_FL_DESC:
5092 pr_pool_tag(tmp_pool, "Redis ZREVRANGE pool");
5093 cmd = "ZREVRANGE";
5094 break;
5095
5096 default:
5097 errno = EINVAL;
5098 return -1;
5099 }
5100
5101 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5102
5103 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5104
5105 /* Since the the range is [start, stop] inclusive, and the function takes
5106 * a length, we need to subtract one for whose items these are. Consider
5107 * an offset of 0, and a len of 1 -- to get just one item. In that case,
5108 * stop would be 0 as well.
5109 */
5110 reply = redisCommand(redis->ctx, "%s %b %u %u", cmd, key, keysz, offset,
5111 offset + len - 1);
5112 xerrno = errno;
5113
5114 reply = handle_reply(redis, cmd, reply);
5115 if (reply == NULL) {
5116 pr_trace_msg(trace_channel, 2,
5117 "error getting %u %s in sorted set using key (%lu bytes): %s", len,
5118 len != 1 ? "items" : "item", (unsigned long) keysz, strerror(errno));
5119 destroy_pool(tmp_pool);
5120 errno = xerrno;
5121 return -1;
5122 }
5123
5124 if (reply->type != REDIS_REPLY_ARRAY) {
5125 pr_trace_msg(trace_channel, 2,
5126 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
5127
5128 if (reply->type == REDIS_REPLY_ERROR) {
5129 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5130 }
5131
5132 freeReplyObject(reply);
5133 destroy_pool(tmp_pool);
5134 errno = EINVAL;
5135 return -1;
5136 }
5137
5138 if (reply->elements > 0) {
5139 register unsigned int i;
5140
5141 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
5142 (unsigned long) reply->elements,
5143 reply->elements != 1 ? "elements" : "element");
5144
5145 *values = make_array(p, 0, sizeof(void *));
5146 *valueszs = make_array(p, 0, sizeof(size_t));
5147
5148 for (i = 0; i < reply->elements; i++) {
5149 redisReply *value_elt;
5150 void *value_data = NULL;
5151 size_t value_datasz = 0;
5152
5153 value_elt = reply->element[i];
5154 if (value_elt->type == REDIS_REPLY_STRING) {
5155 value_datasz = value_elt->len;
5156 value_data = palloc(p, value_datasz);
5157 memcpy(value_data, value_elt->str, value_datasz);
5158
5159 } else {
5160 pr_trace_msg(trace_channel, 2,
5161 "expected STRING element at index %u, got %s", i + 1,
5162 get_reply_type(value_elt->type));
5163 }
5164
5165 if (value_data != NULL) {
5166 *((void **) push_array(*values)) = value_data;
5167 *((size_t *) push_array(*valueszs)) = value_datasz;
5168 }
5169 }
5170
5171 res = 0;
5172
5173 } else {
5174 xerrno = ENOENT;
5175 res = -1;
5176 }
5177
5178 freeReplyObject(reply);
5179 destroy_pool(tmp_pool);
5180
5181 errno = xerrno;
5182 return res;
5183 }
5184
pr_redis_sorted_set_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float incr,float * score)5185 int pr_redis_sorted_set_kincr(pr_redis_t *redis, module *m, const char *key,
5186 size_t keysz, void *value, size_t valuesz, float incr, float *score) {
5187 int res, xerrno, exists;
5188 pool *tmp_pool = NULL;
5189 const char *cmd = NULL;
5190 redisReply *reply;
5191
5192 if (redis == NULL ||
5193 m == NULL ||
5194 key == NULL ||
5195 keysz == 0 ||
5196 value == NULL ||
5197 valuesz == 0 ||
5198 score == NULL) {
5199 errno = EINVAL;
5200 return -1;
5201 }
5202
5203 exists = pr_redis_sorted_set_kexists(redis, m, key, keysz, value, valuesz);
5204 if (exists == FALSE) {
5205 errno = ENOENT;
5206 return -1;
5207 }
5208
5209 tmp_pool = make_sub_pool(redis->pool);
5210 pr_pool_tag(tmp_pool, "Redis ZINCRBY pool");
5211
5212 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5213
5214 cmd = "ZINCRBY";
5215 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5216 reply = redisCommand(redis->ctx, "%s %b %f %b", cmd, key, keysz, incr,
5217 value, valuesz);
5218 xerrno = errno;
5219
5220 reply = handle_reply(redis, cmd, reply);
5221 if (reply == NULL) {
5222 pr_trace_msg(trace_channel, 2,
5223 "error incrementing key (%lu bytes) by %0.3f in sorted set using %s: %s",
5224 (unsigned long) keysz, incr, cmd, strerror(errno));
5225 destroy_pool(tmp_pool);
5226 errno = EIO;
5227 return -1;
5228 }
5229
5230 if (reply->type != REDIS_REPLY_STRING) {
5231 pr_trace_msg(trace_channel, 2,
5232 "expected STRING reply for %s, got %s", cmd,
5233 get_reply_type(reply->type));
5234
5235 if (reply->type == REDIS_REPLY_ERROR) {
5236 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5237 }
5238
5239 freeReplyObject(reply);
5240 destroy_pool(tmp_pool);
5241 errno = EINVAL;
5242 return -1;
5243 }
5244
5245 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
5246 reply->str);
5247
5248 res = sscanf(reply->str, "%f", score);
5249 if (res != 1) {
5250 pr_trace_msg(trace_channel, 3, "error parsing '%.*s' as float",
5251 (int) reply->len, reply->str);
5252 xerrno = EINVAL;
5253 res = -1;
5254
5255 } else {
5256 res = 0;
5257 }
5258
5259 freeReplyObject(reply);
5260 destroy_pool(tmp_pool);
5261
5262 errno = xerrno;
5263 return res;
5264 }
5265
pr_redis_sorted_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)5266 int pr_redis_sorted_set_kremove(pr_redis_t *redis, module *m, const char *key,
5267 size_t keysz) {
5268
5269 /* Note: We can actually use just DEL here. */
5270 return pr_redis_kremove(redis, m, key, keysz);
5271 }
5272
pr_redis_sorted_set_kscore(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float * score)5273 int pr_redis_sorted_set_kscore(pr_redis_t *redis, module *m, const char *key,
5274 size_t keysz, void *value, size_t valuesz, float *score) {
5275 int res, xerrno;
5276 pool *tmp_pool = NULL;
5277 const char *cmd = NULL;
5278 redisReply *reply;
5279
5280 if (redis == NULL ||
5281 m == NULL ||
5282 key == NULL ||
5283 keysz == 0 ||
5284 value == NULL ||
5285 valuesz == 0 ||
5286 score == NULL) {
5287 errno = EINVAL;
5288 return -1;
5289 }
5290
5291 tmp_pool = make_sub_pool(redis->pool);
5292 pr_pool_tag(tmp_pool, "Redis ZSCORE pool");
5293
5294 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5295
5296 cmd = "ZSCORE";
5297 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5298 reply = redisCommand(redis->ctx, "%s %b %b", cmd, key, keysz, value, valuesz);
5299 xerrno = errno;
5300
5301 reply = handle_reply(redis, cmd, reply);
5302 if (reply == NULL) {
5303 pr_trace_msg(trace_channel, 2,
5304 "error gettin score for key (%lu bytes) using %s: %s",
5305 (unsigned long) keysz, cmd, strerror(errno));
5306 destroy_pool(tmp_pool);
5307 errno = EIO;
5308 return -1;
5309 }
5310
5311 if (reply->type != REDIS_REPLY_STRING &&
5312 reply->type != REDIS_REPLY_NIL) {
5313 pr_trace_msg(trace_channel, 2,
5314 "expected STRING or NIL reply for %s, got %s", cmd,
5315 get_reply_type(reply->type));
5316
5317 if (reply->type == REDIS_REPLY_ERROR) {
5318 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5319 }
5320
5321 freeReplyObject(reply);
5322 destroy_pool(tmp_pool);
5323 errno = EINVAL;
5324 return -1;
5325 }
5326
5327 if (reply->type == REDIS_REPLY_STRING) {
5328 pr_trace_msg(trace_channel, 7, "%s reply: %.*s", cmd, (int) reply->len,
5329 reply->str);
5330
5331 res = sscanf(reply->str, "%f", score);
5332 if (res != 1) {
5333 pr_trace_msg(trace_channel, 3, "error parsing '%.*s' as float",
5334 (int) reply->len, reply->str);
5335 xerrno = EINVAL;
5336 res = -1;
5337
5338 } else {
5339 res = 0;
5340 }
5341
5342 } else {
5343 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5344 xerrno = ENOENT;
5345 res = -1;
5346 }
5347
5348 freeReplyObject(reply);
5349 destroy_pool(tmp_pool);
5350
5351 errno = xerrno;
5352 return res;
5353 }
5354
pr_redis_sorted_set_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)5355 int pr_redis_sorted_set_kset(pr_redis_t *redis, module *m, const char *key,
5356 size_t keysz, void *value, size_t valuesz, float score) {
5357 int xerrno = 0, exists = FALSE;
5358 pool *tmp_pool = NULL;
5359 const char *cmd = NULL;
5360 redisReply *reply;
5361
5362 if (redis == NULL ||
5363 m == NULL ||
5364 key == NULL ||
5365 keysz == 0 ||
5366 value == NULL ||
5367 valuesz == 0) {
5368 errno = EINVAL;
5369 return -1;
5370 }
5371
5372 /* Note: We should probably detect the server version, and instead of using
5373 * a separate existence check, if server >= 3.0.2, use the NX/XX flags of
5374 * the ZADD command.
5375 */
5376 exists = pr_redis_sorted_set_kexists(redis, m, key, keysz, value, valuesz);
5377 if (exists == FALSE) {
5378 errno = ENOENT;
5379 return -1;
5380 }
5381
5382 tmp_pool = make_sub_pool(redis->pool);
5383 pr_pool_tag(tmp_pool, "Redis ZADD pool");
5384
5385 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5386
5387 cmd = "ZADD";
5388 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5389 reply = redisCommand(redis->ctx, "%s %b %f %b", cmd, key, keysz, score,
5390 value, valuesz);
5391 xerrno = errno;
5392
5393 reply = handle_reply(redis, cmd, reply);
5394 if (reply == NULL) {
5395 pr_trace_msg(trace_channel, 2,
5396 "error setting item in sorted set using key (%lu bytes): %s",
5397 (unsigned long) keysz, strerror(errno));
5398 destroy_pool(tmp_pool);
5399 errno = xerrno;
5400 return -1;
5401 }
5402
5403 if (reply->type != REDIS_REPLY_INTEGER) {
5404 pr_trace_msg(trace_channel, 2,
5405 "expected INTEGER reply for %s, got %s", cmd,
5406 get_reply_type(reply->type));
5407
5408 if (reply->type == REDIS_REPLY_ERROR) {
5409 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5410 }
5411 freeReplyObject(reply);
5412 destroy_pool(tmp_pool);
5413 errno = EINVAL;
5414 return -1;
5415 }
5416
5417 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
5418
5419 freeReplyObject(reply);
5420 destroy_pool(tmp_pool);
5421 return 0;
5422 }
5423
f2s(pool * p,float num,size_t * len)5424 static char *f2s(pool *p, float num, size_t *len) {
5425 int res;
5426 char *s;
5427 size_t sz;
5428
5429 sz = 32;
5430 s = pcalloc(p, sz + 1);
5431 res = pr_snprintf(s, sz, "%0.3f", num);
5432
5433 *len = res;
5434 return s;
5435 }
5436
pr_redis_sorted_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs,array_header * scores)5437 int pr_redis_sorted_set_ksetall(pr_redis_t *redis, module *m, const char *key,
5438 size_t keysz, array_header *values, array_header *valueszs,
5439 array_header *scores) {
5440 register unsigned int i;
5441 int res, xerrno = 0;
5442 pool *tmp_pool = NULL;
5443 array_header *args, *arglens;
5444 const char *cmd = NULL;
5445 redisReply *reply;
5446
5447 if (redis == NULL ||
5448 m == NULL ||
5449 key == NULL ||
5450 keysz == 0 ||
5451 values == NULL ||
5452 values->nelts == 0 ||
5453 valueszs == NULL ||
5454 valueszs->nelts == 0 ||
5455 scores == NULL ||
5456 scores->nelts == 0) {
5457 errno = EINVAL;
5458 return -1;
5459 }
5460
5461 if (values->nelts != valueszs->nelts ||
5462 values->nelts != scores->nelts) {
5463 errno = EINVAL;
5464 return -1;
5465 }
5466
5467 /* First, delete any existing sorted set at this key; a set operation,
5468 * in my mind, is a complete overwrite.
5469 */
5470 res = pr_redis_sorted_set_kremove(redis, m, key, keysz);
5471 if (res < 0 &&
5472 errno != ENOENT) {
5473 return -1;
5474 }
5475
5476 tmp_pool = make_sub_pool(redis->pool);
5477 pr_pool_tag(tmp_pool, "Redis ZADD pool");
5478
5479 key = get_namespace_key(tmp_pool, redis, m, key, &keysz);
5480
5481 cmd = "ZADD";
5482 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5483
5484 args = make_array(tmp_pool, 0, sizeof(char *));
5485 arglens = make_array(tmp_pool, 0, sizeof(size_t));
5486
5487 *((char **) push_array(args)) = pstrdup(tmp_pool, cmd);
5488 *((size_t *) push_array(arglens)) = strlen(cmd);
5489
5490 *((char **) push_array(args)) = (char *) key;
5491 *((size_t *) push_array(arglens)) = keysz;
5492
5493 for (i = 0; i < values->nelts; i++) {
5494 size_t scoresz = 0;
5495
5496 pr_signals_handle();
5497
5498 *((char **) push_array(args)) = f2s(tmp_pool, ((float *) scores->elts)[i],
5499 &scoresz);
5500 *((size_t *) push_array(arglens)) = scoresz;
5501
5502 *((char **) push_array(args)) = ((char **) values->elts)[i];
5503 *((size_t *) push_array(arglens)) = ((size_t *) valueszs->elts)[i];
5504 }
5505
5506 reply = redisCommandArgv(redis->ctx, args->nelts, args->elts, arglens->elts);
5507 xerrno = errno;
5508
5509 reply = handle_reply(redis, cmd, reply);
5510 if (reply == NULL) {
5511 pr_trace_msg(trace_channel, 2,
5512 "error setting items in sorted set using key (%lu bytes): %s",
5513 (unsigned long) keysz, strerror(errno));
5514 destroy_pool(tmp_pool);
5515 errno = xerrno;
5516 return -1;
5517 }
5518
5519 if (reply->type != REDIS_REPLY_INTEGER) {
5520 pr_trace_msg(trace_channel, 2,
5521 "expected INTEGER reply for %s, got %s", cmd,
5522 get_reply_type(reply->type));
5523
5524 if (reply->type == REDIS_REPLY_ERROR) {
5525 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5526 }
5527 freeReplyObject(reply);
5528 destroy_pool(tmp_pool);
5529 errno = EINVAL;
5530 return -1;
5531 }
5532
5533 pr_trace_msg(trace_channel, 7, "%s reply: %lld", cmd, reply->integer);
5534
5535 freeReplyObject(reply);
5536 destroy_pool(tmp_pool);
5537 return 0;
5538 }
5539
pr_redis_sentinel_get_master_addr(pool * p,pr_redis_t * redis,const char * name,pr_netaddr_t ** addr)5540 int pr_redis_sentinel_get_master_addr(pool *p, pr_redis_t *redis,
5541 const char *name, pr_netaddr_t **addr) {
5542 int res = 0, xerrno = 0;
5543 pool *tmp_pool = NULL;
5544 const char *cmd = NULL;
5545 redisReply *reply;
5546
5547 if (p == NULL ||
5548 redis == NULL ||
5549 name == NULL ||
5550 addr == NULL) {
5551 errno = EINVAL;
5552 return -1;
5553 }
5554
5555 tmp_pool = make_sub_pool(redis->pool);
5556 pr_pool_tag(tmp_pool, "Redis SENTINEL pool");
5557
5558 cmd = "SENTINEL";
5559 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5560 reply = redisCommand(redis->ctx, "%s get-master-addr-by-name %s", cmd, name);
5561 xerrno = errno;
5562
5563 reply = handle_reply(redis, cmd, reply);
5564 if (reply == NULL) {
5565 pr_trace_msg(trace_channel, 2,
5566 "error getting address for master '%s': %s", name, strerror(errno));
5567 destroy_pool(tmp_pool);
5568 errno = EIO;
5569 return -1;
5570 }
5571
5572 if (reply->type == REDIS_REPLY_NIL) {
5573 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5574 freeReplyObject(reply);
5575 destroy_pool(tmp_pool);
5576
5577 errno = ENOENT;
5578 return -1;
5579 }
5580
5581 if (reply->type != REDIS_REPLY_ARRAY) {
5582 pr_trace_msg(trace_channel, 2,
5583 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
5584
5585 if (reply->type == REDIS_REPLY_ERROR) {
5586 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5587 }
5588
5589 freeReplyObject(reply);
5590 destroy_pool(tmp_pool);
5591 errno = EINVAL;
5592 return -1;
5593 }
5594
5595 if (reply->elements > 0) {
5596 redisReply *elt;
5597 char *host = NULL;
5598 int port = -1;
5599
5600 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
5601 (unsigned long) reply->elements,
5602 reply->elements != 1 ? "elements" : "element");
5603
5604 elt = reply->element[0];
5605 if (elt->type == REDIS_REPLY_STRING) {
5606 host = pstrndup(tmp_pool, elt->str, elt->len);
5607
5608 } else {
5609 pr_trace_msg(trace_channel, 2,
5610 "expected STRING element at index 0, got %s",
5611 get_reply_type(elt->type));
5612 }
5613
5614 elt = reply->element[1];
5615 if (elt->type == REDIS_REPLY_STRING) {
5616 char *port_str;
5617
5618 port_str = pstrndup(tmp_pool, elt->str, elt->len);
5619 port = atoi(port_str);
5620
5621 } else {
5622 pr_trace_msg(trace_channel, 2,
5623 "expected STRING element at index 1, got %s",
5624 get_reply_type(elt->type));
5625 }
5626
5627 if (host != NULL &&
5628 port != -1) {
5629 *addr = (pr_netaddr_t *) pr_netaddr_get_addr(p, host, NULL);
5630 if (*addr != NULL) {
5631 pr_netaddr_set_port2(*addr, port);
5632
5633 } else {
5634 xerrno = errno;
5635 res = -1;
5636 }
5637
5638 } else {
5639 xerrno = ENOENT;
5640 res = -1;
5641 }
5642
5643 } else {
5644 xerrno = ENOENT;
5645 res = -1;
5646 }
5647
5648 freeReplyObject(reply);
5649 destroy_pool(tmp_pool);
5650
5651 errno = xerrno;
5652 return res;
5653 }
5654
pr_redis_sentinel_get_masters(pool * p,pr_redis_t * redis,array_header ** masters)5655 int pr_redis_sentinel_get_masters(pool *p, pr_redis_t *redis,
5656 array_header **masters) {
5657 int res = 0, xerrno = 0;
5658 pool *tmp_pool = NULL;
5659 const char *cmd = NULL;
5660 redisReply *reply;
5661
5662 if (p == NULL ||
5663 redis == NULL ||
5664 masters == NULL) {
5665 errno = EINVAL;
5666 return -1;
5667 }
5668
5669 tmp_pool = make_sub_pool(redis->pool);
5670 pr_pool_tag(tmp_pool, "Redis SENTINEL pool");
5671
5672 cmd = "SENTINEL";
5673 pr_trace_msg(trace_channel, 7, "sending command: %s", cmd);
5674 reply = redisCommand(redis->ctx, "%s masters", cmd);
5675 xerrno = errno;
5676
5677 reply = handle_reply(redis, cmd, reply);
5678 if (reply == NULL) {
5679 pr_trace_msg(trace_channel, 2,
5680 "error getting masters: %s", strerror(errno));
5681 destroy_pool(tmp_pool);
5682 errno = EIO;
5683 return -1;
5684 }
5685
5686 if (reply->type == REDIS_REPLY_NIL) {
5687 pr_trace_msg(trace_channel, 7, "%s reply: nil", cmd);
5688 freeReplyObject(reply);
5689 destroy_pool(tmp_pool);
5690
5691 errno = ENOENT;
5692 return -1;
5693 }
5694
5695 if (reply->type != REDIS_REPLY_ARRAY) {
5696 pr_trace_msg(trace_channel, 2,
5697 "expected ARRAY reply for %s, got %s", cmd, get_reply_type(reply->type));
5698
5699 if (reply->type == REDIS_REPLY_ERROR) {
5700 pr_trace_msg(trace_channel, 2, "%s error: %s", cmd, reply->str);
5701 }
5702
5703 freeReplyObject(reply);
5704 destroy_pool(tmp_pool);
5705 errno = EINVAL;
5706 return -1;
5707 }
5708
5709 if (reply->elements > 0) {
5710 register unsigned int i;
5711
5712 pr_trace_msg(trace_channel, 7, "%s reply: %lu %s", cmd,
5713 (unsigned long) reply->elements,
5714 reply->elements != 1 ? "elements" : "element");
5715
5716 *masters = make_array(p, reply->elements, sizeof(char *));
5717
5718 for (i = 0; i < reply->elements; i++) {
5719 redisReply *elt;
5720
5721 elt = reply->element[i];
5722 if (elt->type == REDIS_REPLY_ARRAY) {
5723 redisReply *info;
5724
5725 info = elt->element[1];
5726 *((char **) push_array(*masters)) = pstrndup(p, info->str, info->len);
5727
5728 } else {
5729 pr_trace_msg(trace_channel, 2,
5730 "expected ARRAY element at index %u, got %s", i,
5731 get_reply_type(elt->type));
5732 }
5733 }
5734
5735 } else {
5736 xerrno = ENOENT;
5737 res = -1;
5738 }
5739
5740 freeReplyObject(reply);
5741 destroy_pool(tmp_pool);
5742
5743 errno = xerrno;
5744 return res;
5745 }
5746
redis_set_server2(const char * server,int port,unsigned long flags,const char * username,const char * password,const char * db_idx)5747 int redis_set_server2(const char *server, int port, unsigned long flags,
5748 const char *username, const char *password, const char *db_idx) {
5749
5750 if (server == NULL) {
5751 /* By using a port of -2 specifically, we can use this function to
5752 * clear the server/port, for testing purposes ONLY.
5753 */
5754 if (port < 1 &&
5755 port != -2) {
5756 errno = EINVAL;
5757 return -1;
5758 }
5759 }
5760
5761 redis_server = server;
5762 redis_port = port;
5763 redis_flags = flags;
5764 redis_username = username;
5765 redis_password = password;
5766 redis_db_idx = db_idx;
5767
5768 return 0;
5769 }
5770
redis_set_server(const char * server,int port,unsigned long flags,const char * password,const char * db_idx)5771 int redis_set_server(const char *server, int port, unsigned long flags,
5772 const char *password, const char *db_idx) {
5773 return redis_set_server2(server, port, flags, "default", password, db_idx);
5774 }
5775
redis_set_sentinels(array_header * sentinels,const char * name)5776 int redis_set_sentinels(array_header *sentinels, const char *name) {
5777
5778 if (sentinels != NULL &&
5779 sentinels->nelts == 0) {
5780 errno = EINVAL;
5781 return -1;
5782 }
5783
5784 redis_sentinels = sentinels;
5785 redis_sentinel_master = name;
5786
5787 return 0;
5788 }
5789
redis_set_timeouts(unsigned long connect_millis,unsigned long io_millis)5790 int redis_set_timeouts(unsigned long connect_millis, unsigned long io_millis) {
5791 redis_connect_millis = connect_millis;
5792 redis_io_millis = io_millis;
5793
5794 return 0;
5795 }
5796
redis_clear(void)5797 int redis_clear(void) {
5798 if (sess_redis != NULL) {
5799 pr_redis_conn_destroy(sess_redis);
5800 sess_redis = NULL;
5801 }
5802
5803 return 0;
5804 }
5805
redis_init(void)5806 int redis_init(void) {
5807 return 0;
5808 }
5809
5810 #else
5811
pr_redis_conn_get(pool * p,unsigned long flags)5812 pr_redis_t *pr_redis_conn_get(pool *p, unsigned long flags) {
5813 errno = ENOSYS;
5814 return NULL;
5815 }
5816
pr_redis_conn_new(pool * p,module * m,unsigned long flags)5817 pr_redis_t *pr_redis_conn_new(pool *p, module *m, unsigned long flags) {
5818 errno = ENOSYS;
5819 return NULL;
5820 }
5821
pr_redis_conn_close(pr_redis_t * redis)5822 int pr_redis_conn_close(pr_redis_t *redis) {
5823 errno = ENOSYS;
5824 return -1;
5825 }
5826
pr_redis_conn_destroy(pr_redis_t * redis)5827 int pr_redis_conn_destroy(pr_redis_t *redis) {
5828 errno = ENOSYS;
5829 return -1;
5830 }
5831
pr_redis_conn_set_namespace(pr_redis_t * redis,module * m,const void * prefix,size_t prefixsz)5832 int pr_redis_conn_set_namespace(pr_redis_t *redis, module *m,
5833 const void *prefix, size_t prefixsz) {
5834 errno = ENOSYS;
5835 return -1;
5836 }
5837
pr_redis_auth(pr_redis_t * redis,const char * password)5838 int pr_redis_auth(pr_redis_t *redis, const char *password) {
5839 errno = ENOSYS;
5840 return -1;
5841 }
5842
pr_redis_select(pr_redis_t * redis,const char * db_idx)5843 int pr_redis_select(pr_redis_t *redis, const char *db_idx) {
5844 errno = ENOSYS;
5845 return -1;
5846 }
5847
pr_redis_command(pr_redis_t * redis,const array_header * args,int reply_type)5848 int pr_redis_command(pr_redis_t *redis, const array_header *args,
5849 int reply_type) {
5850 errno = ENOSYS;
5851 return -1;
5852 }
5853
pr_redis_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)5854 int pr_redis_add(pr_redis_t *redis, module *m, const char *key, void *value,
5855 size_t valuesz, time_t expires) {
5856 errno = ENOSYS;
5857 return -1;
5858 }
5859
pr_redis_decr(pr_redis_t * redis,module * m,const char * key,uint32_t decr,uint64_t * value)5860 int pr_redis_decr(pr_redis_t *redis, module *m, const char *key, uint32_t decr,
5861 uint64_t *value) {
5862 errno = ENOSYS;
5863 return -1;
5864 }
5865
pr_redis_get(pool * p,pr_redis_t * redis,module * m,const char * key,size_t * valuesz)5866 void *pr_redis_get(pool *p, pr_redis_t *redis, module *m, const char *key,
5867 size_t *valuesz) {
5868 errno = ENOSYS;
5869 return NULL;
5870 }
5871
pr_redis_get_str(pool * p,pr_redis_t * redis,module * m,const char * key)5872 char *pr_redis_get_str(pool *p, pr_redis_t *redis, module *m, const char *key) {
5873 errno = ENOSYS;
5874 return NULL;
5875 }
5876
pr_redis_incr(pr_redis_t * redis,module * m,const char * key,uint32_t incr,uint64_t * value)5877 int pr_redis_incr(pr_redis_t *redis, module *m, const char *key, uint32_t incr,
5878 uint64_t *value) {
5879 errno = ENOSYS;
5880 return -1;
5881 }
5882
pr_redis_remove(pr_redis_t * redis,module * m,const char * key)5883 int pr_redis_remove(pr_redis_t *redis, module *m, const char *key) {
5884 errno = ENOSYS;
5885 return -1;
5886 }
5887
pr_redis_rename(pr_redis_t * redis,module * m,const char * from,const char * to)5888 int pr_redis_rename(pr_redis_t *redis, module *m, const char *from,
5889 const char *to) {
5890 errno = ENOSYS;
5891 return -1;
5892 }
5893
pr_redis_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,time_t expires)5894 int pr_redis_set(pr_redis_t *redis, module *m, const char *key, void *value,
5895 size_t valuesz, time_t expires) {
5896 errno = ENOSYS;
5897 return -1;
5898 }
5899
pr_redis_hash_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)5900 int pr_redis_hash_count(pr_redis_t *redis, module *m, const char *key,
5901 uint64_t *count) {
5902 errno = ENOSYS;
5903 return -1;
5904 }
5905
pr_redis_hash_delete(pr_redis_t * redis,module * m,const char * key,const char * field)5906 int pr_redis_hash_delete(pr_redis_t *redis, module *m, const char *key,
5907 const char *field) {
5908 errno = ENOSYS;
5909 return -1;
5910 }
5911
pr_redis_hash_exists(pr_redis_t * redis,module * m,const char * key,const char * field)5912 int pr_redis_hash_exists(pr_redis_t *redis, module *m, const char *key,
5913 const char *field) {
5914 errno = ENOSYS;
5915 return -1;
5916 }
5917
pr_redis_hash_get(pool * p,pr_redis_t * redis,module * m,const char * key,const char * field,void ** value,size_t * valuesz)5918 int pr_redis_hash_get(pool *p, pr_redis_t *redis, module *m, const char *key,
5919 const char *field, void **value, size_t *valuesz) {
5920 errno = ENOSYS;
5921 return -1;
5922 }
5923
pr_redis_hash_getall(pool * p,pr_redis_t * redis,module * m,const char * key,pr_table_t ** hash)5924 int pr_redis_hash_getall(pool *p, pr_redis_t *redis, module *m,
5925 const char *key, pr_table_t **hash) {
5926 errno = ENOSYS;
5927 return -1;
5928 }
5929
pr_redis_hash_incr(pr_redis_t * redis,module * m,const char * key,const char * field,int32_t incr,int64_t * value)5930 int pr_redis_hash_incr(pr_redis_t *redis, module *m, const char *key,
5931 const char *field, int32_t incr, int64_t *value) {
5932 errno = ENOSYS;
5933 return -1;
5934 }
5935
pr_redis_hash_keys(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** fields)5936 int pr_redis_hash_keys(pool *p, pr_redis_t *redis, module *m, const char *key,
5937 array_header **fields) {
5938 errno = ENOSYS;
5939 return -1;
5940 }
5941
pr_redis_hash_remove(pr_redis_t * redis,module * m,const char * key)5942 int pr_redis_hash_remove(pr_redis_t *redis, module *m, const char *key) {
5943 errno = ENOSYS;
5944 return -1;
5945 }
5946
pr_redis_hash_set(pr_redis_t * redis,module * m,const char * key,const char * field,void * value,size_t valuesz)5947 int pr_redis_hash_set(pr_redis_t *redis, module *m, const char *key,
5948 const char *field, void *value, size_t valuesz) {
5949 errno = ENOSYS;
5950 return -1;
5951 }
5952
pr_redis_hash_setall(pr_redis_t * redis,module * m,const char * key,pr_table_t * hash)5953 int pr_redis_hash_setall(pr_redis_t *redis, module *m, const char *key,
5954 pr_table_t *hash) {
5955 errno = ENOSYS;
5956 return -1;
5957 }
5958
pr_redis_hash_values(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values)5959 int pr_redis_hash_values(pool *p, pr_redis_t *redis, module *m,
5960 const char *key, array_header **values) {
5961 errno = ENOSYS;
5962 return -1;
5963 }
5964
pr_redis_list_append(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)5965 int pr_redis_list_append(pr_redis_t *redis, module *m, const char *key,
5966 void *value, size_t valuesz) {
5967 errno = ENOSYS;
5968 return -1;
5969 }
5970
pr_redis_list_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)5971 int pr_redis_list_count(pr_redis_t *redis, module *m, const char *key,
5972 uint64_t *count) {
5973 errno = ENOSYS;
5974 return -1;
5975 }
5976
pr_redis_list_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)5977 int pr_redis_list_delete(pr_redis_t *redis, module *m, const char *key,
5978 void *value, size_t valuesz) {
5979 errno = ENOSYS;
5980 return -1;
5981 }
5982
pr_redis_list_exists(pr_redis_t * redis,module * m,const char * key,unsigned int idx)5983 int pr_redis_list_exists(pr_redis_t *redis, module *m, const char *key,
5984 unsigned int idx) {
5985 errno = ENOSYS;
5986 return -1;
5987 }
5988
pr_redis_list_get(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int idx,void ** value,size_t * valuesz)5989 int pr_redis_list_get(pool *p, pr_redis_t *redis, module *m, const char *key,
5990 unsigned int idx, void **value, size_t *valuesz) {
5991 errno = ENOSYS;
5992 return -1;
5993 }
5994
pr_redis_list_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)5995 int pr_redis_list_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
5996 array_header **values, array_header **valueszs) {
5997 errno = ENOSYS;
5998 return -1;
5999 }
6000
pr_redis_list_pop(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz,int flags)6001 int pr_redis_list_pop(pool *p, pr_redis_t *redis, module *m, const char *key,
6002 void **value, size_t *valuesz, int flags) {
6003 errno = ENOSYS;
6004 return -1;
6005 }
6006
pr_redis_list_push(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,int flags)6007 int pr_redis_list_push(pr_redis_t *redis, module *m, const char *key,
6008 void *value, size_t valuesz, int flags) {
6009 errno = ENOSYS;
6010 return -1;
6011 }
6012
pr_redis_list_remove(pr_redis_t * redis,module * m,const char * key)6013 int pr_redis_list_remove(pr_redis_t *redis, module *m, const char *key) {
6014 errno = ENOSYS;
6015 return -1;
6016 }
6017
pr_redis_list_rotate(pool * p,pr_redis_t * redis,module * m,const char * key,void ** value,size_t * valuesz)6018 int pr_redis_list_rotate(pool *p, pr_redis_t *redis, module *m,
6019 const char *key, void **value, size_t *valuesz) {
6020 errno = ENOSYS;
6021 return -1;
6022 }
6023
pr_redis_list_set(pr_redis_t * redis,module * m,const char * key,unsigned int idx,void * value,size_t valuesz)6024 int pr_redis_list_set(pr_redis_t *redis, module *m, const char *key,
6025 unsigned int idx, void *value, size_t valuesz) {
6026 errno = ENOSYS;
6027 return -1;
6028 }
6029
pr_redis_list_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)6030 int pr_redis_list_setall(pr_redis_t *redis, module *m, const char *key,
6031 array_header *values, array_header *valueszs) {
6032 errno = ENOSYS;
6033 return -1;
6034 }
6035
pr_redis_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6036 int pr_redis_set_add(pr_redis_t *redis, module *m, const char *key,
6037 void *value, size_t valuesz) {
6038 errno = ENOSYS;
6039 return -1;
6040 }
6041
pr_redis_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)6042 int pr_redis_set_count(pr_redis_t *redis, module *m, const char *key,
6043 uint64_t *count) {
6044 errno = ENOSYS;
6045 return -1;
6046 }
6047
pr_redis_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6048 int pr_redis_set_delete(pr_redis_t *redis, module *m, const char *key,
6049 void *value, size_t valuesz) {
6050 errno = ENOSYS;
6051 return -1;
6052 }
6053
pr_redis_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6054 int pr_redis_set_exists(pr_redis_t *redis, module *m, const char *key,
6055 void *value, size_t valuesz) {
6056 errno = ENOSYS;
6057 return -1;
6058 }
6059
pr_redis_set_getall(pool * p,pr_redis_t * redis,module * m,const char * key,array_header ** values,array_header ** valueszs)6060 int pr_redis_set_getall(pool *p, pr_redis_t *redis, module *m, const char *key,
6061 array_header **values, array_header **valueszs) {
6062 errno = ENOSYS;
6063 return -1;
6064 }
6065
pr_redis_set_remove(pr_redis_t * redis,module * m,const char * key)6066 int pr_redis_set_remove(pr_redis_t *redis, module *m, const char *key) {
6067 errno = ENOSYS;
6068 return -1;
6069 }
6070
pr_redis_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs)6071 int pr_redis_set_setall(pr_redis_t *redis, module *m, const char *key,
6072 array_header *values, array_header *valueszs) {
6073 errno = ENOSYS;
6074 return -1;
6075 }
6076
pr_redis_sorted_set_add(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)6077 int pr_redis_sorted_set_add(pr_redis_t *redis, module *m, const char *key,
6078 void *value, size_t valuesz, float score) {
6079 errno = ENOSYS;
6080 return -1;
6081 }
6082
pr_redis_sorted_set_count(pr_redis_t * redis,module * m,const char * key,uint64_t * count)6083 int pr_redis_sorted_set_count(pr_redis_t *redis, module *m, const char *key,
6084 uint64_t *count) {
6085 errno = ENOSYS;
6086 return -1;
6087 }
6088
pr_redis_sorted_set_delete(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6089 int pr_redis_sorted_set_delete(pr_redis_t *redis, module *m, const char *key,
6090 void *value, size_t valuesz) {
6091 errno = ENOSYS;
6092 return -1;
6093 }
6094
pr_redis_sorted_set_exists(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz)6095 int pr_redis_sorted_set_exists(pr_redis_t *redis, module *m, const char *key,
6096 void *value, size_t valuesz) {
6097 errno = ENOSYS;
6098 return -1;
6099 }
6100
pr_redis_sorted_set_getn(pool * p,pr_redis_t * redis,module * m,const char * key,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)6101 int pr_redis_sorted_set_getn(pool *p, pr_redis_t *redis, module *m,
6102 const char *key, unsigned int offset, unsigned int len,
6103 array_header **values, array_header **valueszs, int flags) {
6104 errno = ENOSYS;
6105 return -1;
6106 }
6107
pr_redis_sorted_set_incr(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float incr,float * score)6108 int pr_redis_sorted_set_incr(pr_redis_t *redis, module *m, const char *key,
6109 void *value, size_t valuesz, float incr, float *score) {
6110 errno = ENOSYS;
6111 return -1;
6112 }
6113
pr_redis_sorted_set_remove(pr_redis_t * redis,module * m,const char * key)6114 int pr_redis_sorted_set_remove(pr_redis_t *redis, module *m, const char *key) {
6115 errno = ENOSYS;
6116 return -1;
6117 }
6118
pr_redis_sorted_set_score(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float * score)6119 int pr_redis_sorted_set_score(pr_redis_t *redis, module *m, const char *key,
6120 void *value, size_t valuesz, float *score) {
6121 errno = ENOSYS;
6122 return -1;
6123 }
6124
pr_redis_sorted_set_set(pr_redis_t * redis,module * m,const char * key,void * value,size_t valuesz,float score)6125 int pr_redis_sorted_set_set(pr_redis_t *redis, module *m, const char *key,
6126 void *value, size_t valuesz, float score) {
6127 errno = ENOSYS;
6128 return -1;
6129 }
6130
pr_redis_sorted_set_setall(pr_redis_t * redis,module * m,const char * key,array_header * values,array_header * valueszs,array_header * scores)6131 int pr_redis_sorted_set_setall(pr_redis_t *redis, module *m, const char *key,
6132 array_header *values, array_header *valueszs, array_header *scores) {
6133 errno = ENOSYS;
6134 return -1;
6135 }
6136
pr_redis_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)6137 int pr_redis_kadd(pr_redis_t *redis, module *m, const char *key, size_t keysz,
6138 void *value, size_t valuesz, time_t expires) {
6139 errno = ENOSYS;
6140 return -1;
6141 }
6142
pr_redis_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,size_t * valuesz)6143 void *pr_redis_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
6144 size_t keysz, size_t *valuesz) {
6145 errno = ENOSYS;
6146 return NULL;
6147 }
6148
pr_redis_kget_str(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz)6149 char *pr_redis_kget_str(pool *p, pr_redis_t *redis, module *m, const char *key,
6150 size_t keysz) {
6151 errno = ENOSYS;
6152 return NULL;
6153 }
6154
pr_redis_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6155 int pr_redis_kremove(pr_redis_t *redis, module *m, const char *key,
6156 size_t keysz) {
6157 errno = ENOSYS;
6158 return -1;
6159 }
6160
pr_redis_krename(pr_redis_t * redis,module * m,const char * from,size_t fromsz,const char * to,size_t tosz)6161 int pr_redis_krename(pr_redis_t *redis, module *m, const char *from,
6162 size_t fromsz, const char *to, size_t tosz) {
6163 errno = ENOSYS;
6164 return -1;
6165 }
6166
pr_redis_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,time_t expires)6167 int pr_redis_kset(pr_redis_t *redis, module *m, const char *key, size_t keysz,
6168 void *value, size_t valuesz, time_t expires) {
6169 errno = ENOSYS;
6170 return -1;
6171 }
6172
pr_redis_hash_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6173 int pr_redis_hash_kcount(pr_redis_t *redis, module *m, const char *key,
6174 size_t keysz, uint64_t *count) {
6175 errno = ENOSYS;
6176 return -1;
6177 }
6178
pr_redis_hash_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)6179 int pr_redis_hash_kdelete(pr_redis_t *redis, module *m, const char *key,
6180 size_t keysz, const char *field, size_t fieldsz) {
6181 errno = ENOSYS;
6182 return -1;
6183 }
6184
pr_redis_hash_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz)6185 int pr_redis_hash_kexists(pr_redis_t *redis, module *m, const char *key,
6186 size_t keysz, const char *field, size_t fieldsz) {
6187 errno = ENOSYS;
6188 return -1;
6189 }
6190
pr_redis_hash_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void ** value,size_t * valuesz)6191 int pr_redis_hash_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
6192 size_t keysz, const char *field, size_t fieldsz, void **value,
6193 size_t *valuesz) {
6194 errno = ENOSYS;
6195 return -1;
6196 }
6197
pr_redis_hash_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t ** hash)6198 int pr_redis_hash_kgetall(pool *p, pr_redis_t *redis, module *m,
6199 const char *key, size_t keysz, pr_table_t **hash) {
6200 errno = ENOSYS;
6201 return -1;
6202 }
6203
pr_redis_hash_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,int32_t incr,int64_t * value)6204 int pr_redis_hash_kincr(pr_redis_t *redis, module *m, const char *key,
6205 size_t keysz, const char *field, size_t fieldsz, int32_t incr,
6206 int64_t *value) {
6207 errno = ENOSYS;
6208 return -1;
6209 }
6210
pr_redis_hash_kkeys(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** fields)6211 int pr_redis_hash_kkeys(pool *p, pr_redis_t *redis, module *m, const char *key,
6212 size_t keysz, array_header **fields) {
6213 errno = ENOSYS;
6214 return -1;
6215 }
6216
pr_redis_hash_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6217 int pr_redis_hash_kremove(pr_redis_t *redis, module *m, const char *key,
6218 size_t keysz) {
6219 errno = ENOSYS;
6220 return -1;
6221 }
6222
pr_redis_hash_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,const char * field,size_t fieldsz,void * value,size_t valuesz)6223 int pr_redis_hash_kset(pr_redis_t *redis, module *m, const char *key,
6224 size_t keysz, const char *field, size_t fieldsz, void *value,
6225 size_t valuesz) {
6226 errno = ENOSYS;
6227 return -1;
6228 }
6229
pr_redis_hash_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,pr_table_t * hash)6230 int pr_redis_hash_ksetall(pr_redis_t *redis, module *m, const char *key,
6231 size_t keysz, pr_table_t *hash) {
6232 errno = ENOSYS;
6233 return -1;
6234 }
6235
pr_redis_hash_kvalues(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values)6236 int pr_redis_hash_kvalues(pool *p, pr_redis_t *redis, module *m,
6237 const char *key, size_t keysz, array_header **values) {
6238 errno = ENOSYS;
6239 return -1;
6240 }
6241
pr_redis_list_kappend(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6242 int pr_redis_list_kappend(pr_redis_t *redis, module *m, const char *key,
6243 size_t keysz, void *value, size_t valuesz) {
6244 errno = ENOSYS;
6245 return -1;
6246 }
6247
pr_redis_list_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6248 int pr_redis_list_kcount(pr_redis_t *redis, module *m, const char *key,
6249 size_t keysz, uint64_t *count) {
6250 errno = ENOSYS;
6251 return -1;
6252 }
6253
pr_redis_list_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6254 int pr_redis_list_kdelete(pr_redis_t *redis, module *m, const char *key,
6255 size_t keysz, void *value, size_t valuesz) {
6256 errno = ENOSYS;
6257 return -1;
6258 }
6259
pr_redis_list_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx)6260 int pr_redis_list_kexists(pr_redis_t *redis, module *m, const char *key,
6261 size_t keysz, unsigned int idx) {
6262 errno = ENOSYS;
6263 return -1;
6264 }
6265
pr_redis_list_kget(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void ** value,size_t * valuesz)6266 int pr_redis_list_kget(pool *p, pr_redis_t *redis, module *m, const char *key,
6267 size_t keysz, unsigned int idx, void **value, size_t *valuesz) {
6268 errno = ENOSYS;
6269 return -1;
6270 }
6271
pr_redis_list_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)6272 int pr_redis_list_kgetall(pool *p, pr_redis_t *redis, module *m,
6273 const char *key, size_t keysz, array_header **values,
6274 array_header **valueszs) {
6275 errno = ENOSYS;
6276 return -1;
6277 }
6278
pr_redis_list_kpop(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz,int flags)6279 int pr_redis_list_kpop(pool *p, pr_redis_t *redis, module *m, const char *key,
6280 size_t keysz, void **value, size_t *valuesz, int flags) {
6281 errno = ENOSYS;
6282 return -1;
6283 }
6284
pr_redis_list_kpush(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,int flags)6285 int pr_redis_list_kpush(pr_redis_t *redis, module *m, const char *key,
6286 size_t keysz, void *value, size_t valuesz, int flags) {
6287 errno = ENOSYS;
6288 return -1;
6289 }
6290
pr_redis_list_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6291 int pr_redis_list_kremove(pr_redis_t *redis, module *m, const char *key,
6292 size_t keysz) {
6293 errno = ENOSYS;
6294 return -1;
6295 }
6296
pr_redis_list_krotate(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,void ** value,size_t * valuesz)6297 int pr_redis_list_krotate(pool *p, pr_redis_t *redis, module *m,
6298 const char *key, size_t keysz, void **value, size_t *valuesz) {
6299 errno = ENOSYS;
6300 return -1;
6301 }
6302
pr_redis_list_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int idx,void * value,size_t valuesz)6303 int pr_redis_list_kset(pr_redis_t *redis, module *m, const char *key,
6304 size_t keysz, unsigned int idx, void *value, size_t valuesz) {
6305 errno = ENOSYS;
6306 return -1;
6307 }
6308
pr_redis_list_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)6309 int pr_redis_list_ksetall(pr_redis_t *redis, module *m, const char *key,
6310 size_t keysz, array_header *values, array_header *valueszs) {
6311 errno = ENOSYS;
6312 return -1;
6313 }
6314
pr_redis_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6315 int pr_redis_set_kadd(pr_redis_t *redis, module *m, const char *key,
6316 size_t keysz, void *value, size_t valuesz) {
6317 errno = ENOSYS;
6318 return -1;
6319 }
6320
pr_redis_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6321 int pr_redis_set_kcount(pr_redis_t *redis, module *m, const char *key,
6322 size_t keysz, uint64_t *count) {
6323 errno = ENOSYS;
6324 return -1;
6325 }
6326
pr_redis_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6327 int pr_redis_set_kdelete(pr_redis_t *redis, module *m, const char *key,
6328 size_t keysz, void *value, size_t valuesz) {
6329 errno = ENOSYS;
6330 return -1;
6331 }
6332
pr_redis_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6333 int pr_redis_set_kexists(pr_redis_t *redis, module *m, const char *key,
6334 size_t keysz, void *value, size_t valuesz) {
6335 errno = ENOSYS;
6336 return -1;
6337 }
6338
pr_redis_set_kgetall(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header ** values,array_header ** valueszs)6339 int pr_redis_set_kgetall(pool *p, pr_redis_t *redis, module *m, const char *key,
6340 size_t keysz, array_header **values, array_header **valueszs) {
6341 errno = ENOSYS;
6342 return -1;
6343 }
6344
pr_redis_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6345 int pr_redis_set_kremove(pr_redis_t *redis, module *m, const char *key,
6346 size_t keysz) {
6347 errno = ENOSYS;
6348 return -1;
6349 }
6350
pr_redis_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs)6351 int pr_redis_set_ksetall(pr_redis_t *redis, module *m, const char *key,
6352 size_t keysz, array_header *values, array_header *valueszs) {
6353 errno = ENOSYS;
6354 return -1;
6355 }
6356
pr_redis_sorted_set_kadd(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)6357 int pr_redis_sorted_set_kadd(pr_redis_t *redis, module *m, const char *key,
6358 size_t keysz, void *value, size_t valuesz, float score) {
6359 errno = ENOSYS;
6360 return -1;
6361 }
6362
pr_redis_sorted_set_kcount(pr_redis_t * redis,module * m,const char * key,size_t keysz,uint64_t * count)6363 int pr_redis_sorted_set_kcount(pr_redis_t *redis, module *m, const char *key,
6364 size_t keysz, uint64_t *count) {
6365 errno = ENOSYS;
6366 return -1;
6367 }
6368
pr_redis_sorted_set_kdelete(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6369 int pr_redis_sorted_set_kdelete(pr_redis_t *redis, module *m, const char *key,
6370 size_t keysz, void *value, size_t valuesz) {
6371 errno = ENOSYS;
6372 return -1;
6373 }
6374
pr_redis_sorted_set_kexists(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz)6375 int pr_redis_sorted_set_kexists(pr_redis_t *redis, module *m, const char *key,
6376 size_t keysz, void *value, size_t valuesz) {
6377 errno = ENOSYS;
6378 return -1;
6379 }
6380
pr_redis_sorted_set_kgetn(pool * p,pr_redis_t * redis,module * m,const char * key,size_t keysz,unsigned int offset,unsigned int len,array_header ** values,array_header ** valueszs,int flags)6381 int pr_redis_sorted_set_kgetn(pool *p, pr_redis_t *redis, module *m,
6382 const char *key, size_t keysz, unsigned int offset, unsigned int len,
6383 array_header **values, array_header **valueszs, int flags) {
6384 errno = ENOSYS;
6385 return -1;
6386 }
6387
pr_redis_sorted_set_kincr(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float incr,float * score)6388 int pr_redis_sorted_set_kincr(pr_redis_t *redis, module *m, const char *key,
6389 size_t keysz, void *value, size_t valuesz, float incr, float *score) {
6390 errno = ENOSYS;
6391 return -1;
6392 }
6393
pr_redis_sorted_set_kremove(pr_redis_t * redis,module * m,const char * key,size_t keysz)6394 int pr_redis_sorted_set_kremove(pr_redis_t *redis, module *m, const char *key,
6395 size_t keysz) {
6396 errno = ENOSYS;
6397 return -1;
6398 }
6399
pr_redis_sorted_set_kscore(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float * score)6400 int pr_redis_sorted_set_kscore(pr_redis_t *redis, module *m, const char *key,
6401 size_t keysz, void *value, size_t valuesz, float *score) {
6402 errno = ENOSYS;
6403 return -1;
6404 }
6405
pr_redis_sorted_set_kset(pr_redis_t * redis,module * m,const char * key,size_t keysz,void * value,size_t valuesz,float score)6406 int pr_redis_sorted_set_kset(pr_redis_t *redis, module *m, const char *key,
6407 size_t keysz, void *value, size_t valuesz, float score) {
6408 errno = ENOSYS;
6409 return -1;
6410 }
6411
pr_redis_sorted_set_ksetall(pr_redis_t * redis,module * m,const char * key,size_t keysz,array_header * values,array_header * valueszs,array_header * scores)6412 int pr_redis_sorted_set_ksetall(pr_redis_t *redis, module *m, const char *key,
6413 size_t keysz, array_header *values, array_header *valueszs,
6414 array_header *scores) {
6415 errno = ENOSYS;
6416 return -1;
6417 }
6418
pr_redis_sentinel_get_master_addr(pool * p,pr_redis_t * redis,const char * name,pr_netaddr_t ** addr)6419 int pr_redis_sentinel_get_master_addr(pool *p, pr_redis_t *redis,
6420 const char *name, pr_netaddr_t **addr) {
6421 errno = ENOSYS;
6422 return -1;
6423 }
6424
pr_redis_sentinel_get_masters(pool * p,pr_redis_t * redis,array_header ** masters)6425 int pr_redis_sentinel_get_masters(pool *p, pr_redis_t *redis,
6426 array_header **masters) {
6427 errno = ENOSYS;
6428 return -1;
6429 }
6430
redis_set_server(const char * server,int port,unsigned long flags,const char * password,const char * db_idx)6431 int redis_set_server(const char *server, int port, unsigned long flags,
6432 const char *password, const char *db_idx) {
6433 errno = ENOSYS;
6434 return -1;
6435 }
6436
redis_set_sentinels(array_header * sentinels,const char * name)6437 int redis_set_sentinels(array_header *sentinels, const char *name) {
6438 errno = ENOSYS;
6439 return -1;
6440 }
6441
redis_set_timeouts(unsigned long conn_millis,unsigned long io_millis)6442 int redis_set_timeouts(unsigned long conn_millis, unsigned long io_millis) {
6443 errno = ENOSYS;
6444 return -1;
6445 }
6446
redis_clear(void)6447 int redis_clear(void) {
6448 errno = ENOSYS;
6449 return -1;
6450 }
6451
redis_init(void)6452 int redis_init(void) {
6453 errno = ENOSYS;
6454 return -1;
6455 }
6456
6457 #endif /* PR_USE_REDIS */
6458