1 /*
2 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32 #include "rdb.h"
33 #include <dlfcn.h>
34
35 #define REDISMODULE_CORE 1
36 #include "redismodule.h"
37
38 /* --------------------------------------------------------------------------
39 * Private data structures used by the modules system. Those are data
40 * structures that are never exposed to Redis Modules, if not as void
41 * pointers that have an API the module can call with them)
42 * -------------------------------------------------------------------------- */
43
44 /* This structure represents a module inside the system. */
45 struct RedisModule {
46 void *handle; /* Module dlopen() handle. */
47 char *name; /* Module name. */
48 int ver; /* Module version. We use just progressive integers. */
49 int apiver; /* Module API version as requested during initialization.*/
50 list *types; /* Module data types. */
51 list *usedby; /* List of modules using APIs from this one. */
52 list *using; /* List of modules we use some APIs of. */
53 list *filters; /* List of filters the module has registered. */
54 int in_call; /* RM_Call() nesting level */
55 };
56 typedef struct RedisModule RedisModule;
57
58 /* This represents a shared API. Shared APIs will be used to populate
59 * the server.sharedapi dictionary, mapping names of APIs exported by
60 * modules for other modules to use, to their structure specifying the
61 * function pointer that can be called. */
62 struct RedisModuleSharedAPI {
63 void *func;
64 RedisModule *module;
65 };
66 typedef struct RedisModuleSharedAPI RedisModuleSharedAPI;
67
68 static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/
69
70 /* Entries in the context->amqueue array, representing objects to free
71 * when the callback returns. */
72 struct AutoMemEntry {
73 void *ptr;
74 int type;
75 };
76
77 /* AutMemEntry type field values. */
78 #define REDISMODULE_AM_KEY 0
79 #define REDISMODULE_AM_STRING 1
80 #define REDISMODULE_AM_REPLY 2
81 #define REDISMODULE_AM_FREED 3 /* Explicitly freed by user already. */
82 #define REDISMODULE_AM_DICT 4
83
84 /* The pool allocator block. Redis Modules can allocate memory via this special
85 * allocator that will automatically release it all once the callback returns.
86 * This means that it can only be used for ephemeral allocations. However
87 * there are two advantages for modules to use this API:
88 *
89 * 1) The memory is automatically released when the callback returns.
90 * 2) This allocator is faster for many small allocations since whole blocks
91 * are allocated, and small pieces returned to the caller just advancing
92 * the index of the allocation.
93 *
94 * Allocations are always rounded to the size of the void pointer in order
95 * to always return aligned memory chunks. */
96
97 #define REDISMODULE_POOL_ALLOC_MIN_SIZE (1024*8)
98 #define REDISMODULE_POOL_ALLOC_ALIGN (sizeof(void*))
99
100 typedef struct RedisModulePoolAllocBlock {
101 uint32_t size;
102 uint32_t used;
103 struct RedisModulePoolAllocBlock *next;
104 char memory[];
105 } RedisModulePoolAllocBlock;
106
107 /* This structure represents the context in which Redis modules operate.
108 * Most APIs module can access, get a pointer to the context, so that the API
109 * implementation can hold state across calls, or remember what to free after
110 * the call and so forth.
111 *
112 * Note that not all the context structure is always filled with actual values
113 * but only the fields needed in a given context. */
114
115 struct RedisModuleBlockedClient;
116
117 struct RedisModuleCtx {
118 void *getapifuncptr; /* NOTE: Must be the first field. */
119 struct RedisModule *module; /* Module reference. */
120 client *client; /* Client calling a command. */
121 struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
122 thread safe context. */
123 struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
124 int amqueue_len; /* Number of slots in amqueue. */
125 int amqueue_used; /* Number of used slots in amqueue. */
126 int flags; /* REDISMODULE_CTX_... flags. */
127 void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
128 int postponed_arrays_count; /* Number of entries in postponed_arrays. */
129 void *blocked_privdata; /* Privdata set when unblocking a client. */
130
131 /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
132 int *keys_pos;
133 int keys_count;
134
135 struct RedisModulePoolAllocBlock *pa_head;
136 redisOpArray saved_oparray; /* When propagating commands in a callback
137 we reallocate the "also propagate" op
138 array. Here we save the old one to
139 restore it later. */
140 };
141 typedef struct RedisModuleCtx RedisModuleCtx;
142
143 #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
144 #define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
145 #define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
146 #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
147 #define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
148 #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
149 #define REDISMODULE_CTX_THREAD_SAFE (1<<5)
150 #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
151 #define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<7)
152
153 /* This represents a Redis key opened with RM_OpenKey(). */
154 struct RedisModuleKey {
155 RedisModuleCtx *ctx;
156 redisDb *db;
157 robj *key; /* Key name object. */
158 robj *value; /* Value object, or NULL if the key was not found. */
159 void *iter; /* Iterator. */
160 int mode; /* Opening mode. */
161
162 /* Zset iterator. */
163 uint32_t ztype; /* REDISMODULE_ZSET_RANGE_* */
164 zrangespec zrs; /* Score range. */
165 zlexrangespec zlrs; /* Lex range. */
166 uint32_t zstart; /* Start pos for positional ranges. */
167 uint32_t zend; /* End pos for positional ranges. */
168 void *zcurrent; /* Zset iterator current node. */
169 int zer; /* Zset iterator end reached flag
170 (true if end was reached). */
171 };
172 typedef struct RedisModuleKey RedisModuleKey;
173
174 /* RedisModuleKey 'ztype' values. */
175 #define REDISMODULE_ZSET_RANGE_NONE 0 /* This must always be 0. */
176 #define REDISMODULE_ZSET_RANGE_LEX 1
177 #define REDISMODULE_ZSET_RANGE_SCORE 2
178 #define REDISMODULE_ZSET_RANGE_POS 3
179
180 /* Function pointer type of a function representing a command inside
181 * a Redis module. */
182 struct RedisModuleBlockedClient;
183 typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
184 typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
185
186 /* This struct holds the information about a command registered by a module.*/
187 struct RedisModuleCommandProxy {
188 struct RedisModule *module;
189 RedisModuleCmdFunc func;
190 struct redisCommand *rediscmd;
191 };
192 typedef struct RedisModuleCommandProxy RedisModuleCommandProxy;
193
194 #define REDISMODULE_REPLYFLAG_NONE 0
195 #define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */
196 #define REDISMODULE_REPLYFLAG_NESTED (1<<1) /* Nested reply object. No proto
197 or struct free. */
198
199 /* Reply of RM_Call() function. The function is filled in a lazy
200 * way depending on the function called on the reply structure. By default
201 * only the type, proto and protolen are filled. */
202 typedef struct RedisModuleCallReply {
203 RedisModuleCtx *ctx;
204 int type; /* REDISMODULE_REPLY_... */
205 int flags; /* REDISMODULE_REPLYFLAG_... */
206 size_t len; /* Len of strings or num of elements of arrays. */
207 char *proto; /* Raw reply protocol. An SDS string at top-level object. */
208 size_t protolen;/* Length of protocol. */
209 union {
210 const char *str; /* String pointer for string and error replies. This
211 does not need to be freed, always points inside
212 a reply->proto buffer of the reply object or, in
213 case of array elements, of parent reply objects. */
214 long long ll; /* Reply value for integer reply. */
215 struct RedisModuleCallReply *array; /* Array of sub-reply elements. */
216 } val;
217 } RedisModuleCallReply;
218
219 /* Structure representing a blocked client. We get a pointer to such
220 * an object when blocking from modules. */
221 typedef struct RedisModuleBlockedClient {
222 client *client; /* Pointer to the blocked client. or NULL if the client
223 was destroyed during the life of this object. */
224 RedisModule *module; /* Module blocking the client. */
225 RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
226 RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
227 RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
228 void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
229 void *privdata; /* Module private data that may be used by the reply
230 or timeout callback. It is set via the
231 RedisModule_UnblockClient() API. */
232 client *reply_client; /* Fake client used to accumulate replies
233 in thread safe contexts. */
234 int dbid; /* Database number selected by the original client. */
235 } RedisModuleBlockedClient;
236
237 static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
238 static list *moduleUnblockedClients;
239
240 /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
241 * allow thread safe contexts to execute commands at a safe moment. */
242 static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
243
244
245 /* Function pointer type for keyspace event notification subscriptions from modules. */
246 typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
247
248 /* Keyspace notification subscriber information.
249 * See RM_SubscribeToKeyspaceEvents() for more information. */
250 typedef struct RedisModuleKeyspaceSubscriber {
251 /* The module subscribed to the event */
252 RedisModule *module;
253 /* Notification callback in the module*/
254 RedisModuleNotificationFunc notify_callback;
255 /* A bit mask of the events the module is interested in */
256 int event_mask;
257 /* Active flag set on entry, to avoid reentrant subscribers
258 * calling themselves */
259 int active;
260 } RedisModuleKeyspaceSubscriber;
261
262 /* The module keyspace notification subscribers list */
263 static list *moduleKeyspaceSubscribers;
264
265 /* Static client recycled for when we need to provide a context with a client
266 * in a situation where there is no client to provide. This avoidsallocating
267 * a new client per round. For instance this is used in the keyspace
268 * notifications, timers and cluster messages callbacks. */
269 static client *moduleFreeContextReusedClient;
270
271 /* Data structures related to the exported dictionary data structure. */
272 typedef struct RedisModuleDict {
273 rax *rax; /* The radix tree. */
274 } RedisModuleDict;
275
276 typedef struct RedisModuleDictIter {
277 RedisModuleDict *dict;
278 raxIterator ri;
279 } RedisModuleDictIter;
280
281 typedef struct RedisModuleCommandFilterCtx {
282 RedisModuleString **argv;
283 int argc;
284 } RedisModuleCommandFilterCtx;
285
286 typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
287
288 typedef struct RedisModuleCommandFilter {
289 /* The module that registered the filter */
290 RedisModule *module;
291 /* Filter callback function */
292 RedisModuleCommandFilterFunc callback;
293 /* REDISMODULE_CMDFILTER_* flags */
294 int flags;
295 } RedisModuleCommandFilter;
296
297 /* Registered filters */
298 static list *moduleCommandFilters;
299
300 /* Flags for moduleCreateArgvFromUserFormat(). */
301 #define REDISMODULE_ARGV_REPLICATE (1<<0)
302 #define REDISMODULE_ARGV_NO_AOF (1<<1)
303 #define REDISMODULE_ARGV_NO_REPLICAS (1<<2)
304
305 /* --------------------------------------------------------------------------
306 * Prototypes
307 * -------------------------------------------------------------------------- */
308
309 void RM_FreeCallReply(RedisModuleCallReply *reply);
310 void RM_CloseKey(RedisModuleKey *key);
311 void autoMemoryCollect(RedisModuleCtx *ctx);
312 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
313 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
314 void RM_ZsetRangeStop(RedisModuleKey *kp);
315 static void zsetKeyReset(RedisModuleKey *key);
316 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d);
317
318 /* --------------------------------------------------------------------------
319 * Heap allocation raw functions
320 * -------------------------------------------------------------------------- */
321
322 /* Use like malloc(). Memory allocated with this function is reported in
323 * Redis INFO memory, used for keys eviction according to maxmemory settings
324 * and in general is taken into account as memory allocated by Redis.
325 * You should avoid using malloc(). */
RM_Alloc(size_t bytes)326 void *RM_Alloc(size_t bytes) {
327 return zmalloc(bytes);
328 }
329
330 /* Use like calloc(). Memory allocated with this function is reported in
331 * Redis INFO memory, used for keys eviction according to maxmemory settings
332 * and in general is taken into account as memory allocated by Redis.
333 * You should avoid using calloc() directly. */
RM_Calloc(size_t nmemb,size_t size)334 void *RM_Calloc(size_t nmemb, size_t size) {
335 return zcalloc(nmemb*size);
336 }
337
338 /* Use like realloc() for memory obtained with RedisModule_Alloc(). */
RM_Realloc(void * ptr,size_t bytes)339 void* RM_Realloc(void *ptr, size_t bytes) {
340 return zrealloc(ptr,bytes);
341 }
342
343 /* Use like free() for memory obtained by RedisModule_Alloc() and
344 * RedisModule_Realloc(). However you should never try to free with
345 * RedisModule_Free() memory allocated with malloc() inside your module. */
RM_Free(void * ptr)346 void RM_Free(void *ptr) {
347 zfree(ptr);
348 }
349
350 /* Like strdup() but returns memory allocated with RedisModule_Alloc(). */
RM_Strdup(const char * str)351 char *RM_Strdup(const char *str) {
352 return zstrdup(str);
353 }
354
355 /* --------------------------------------------------------------------------
356 * Pool allocator
357 * -------------------------------------------------------------------------- */
358
359 /* Release the chain of blocks used for pool allocations. */
poolAllocRelease(RedisModuleCtx * ctx)360 void poolAllocRelease(RedisModuleCtx *ctx) {
361 RedisModulePoolAllocBlock *head = ctx->pa_head, *next;
362
363 while(head != NULL) {
364 next = head->next;
365 zfree(head);
366 head = next;
367 }
368 ctx->pa_head = NULL;
369 }
370
371 /* Return heap allocated memory that will be freed automatically when the
372 * module callback function returns. Mostly suitable for small allocations
373 * that are short living and must be released when the callback returns
374 * anyway. The returned memory is aligned to the architecture word size
375 * if at least word size bytes are requested, otherwise it is just
376 * aligned to the next power of two, so for example a 3 bytes request is
377 * 4 bytes aligned while a 2 bytes request is 2 bytes aligned.
378 *
379 * There is no realloc style function since when this is needed to use the
380 * pool allocator is not a good idea.
381 *
382 * The function returns NULL if `bytes` is 0. */
RM_PoolAlloc(RedisModuleCtx * ctx,size_t bytes)383 void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
384 if (bytes == 0) return NULL;
385 RedisModulePoolAllocBlock *b = ctx->pa_head;
386 size_t left = b ? b->size - b->used : 0;
387
388 /* Fix alignment. */
389 if (left >= bytes) {
390 size_t alignment = REDISMODULE_POOL_ALLOC_ALIGN;
391 while (bytes < alignment && alignment/2 >= bytes) alignment /= 2;
392 if (b->used % alignment)
393 b->used += alignment - (b->used % alignment);
394 left = (b->used > b->size) ? 0 : b->size - b->used;
395 }
396
397 /* Create a new block if needed. */
398 if (left < bytes) {
399 size_t blocksize = REDISMODULE_POOL_ALLOC_MIN_SIZE;
400 if (blocksize < bytes) blocksize = bytes;
401 b = zmalloc(sizeof(*b) + blocksize);
402 b->size = blocksize;
403 b->used = 0;
404 b->next = ctx->pa_head;
405 ctx->pa_head = b;
406 }
407
408 char *retval = b->memory + b->used;
409 b->used += bytes;
410 return retval;
411 }
412
413 /* --------------------------------------------------------------------------
414 * Helpers for modules API implementation
415 * -------------------------------------------------------------------------- */
416
417 /* Create an empty key of the specified type. 'kp' must point to a key object
418 * opened for writing where the .value member is set to NULL because the
419 * key was found to be non existing.
420 *
421 * On success REDISMODULE_OK is returned and the key is populated with
422 * the value of the specified type. The function fails and returns
423 * REDISMODULE_ERR if:
424 *
425 * 1) The key is not open for writing.
426 * 2) The key is not empty.
427 * 3) The specified type is unknown.
428 */
moduleCreateEmptyKey(RedisModuleKey * key,int type)429 int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
430 robj *obj;
431
432 /* The key must be open for writing and non existing to proceed. */
433 if (!(key->mode & REDISMODULE_WRITE) || key->value)
434 return REDISMODULE_ERR;
435
436 switch(type) {
437 case REDISMODULE_KEYTYPE_LIST:
438 obj = createQuicklistObject();
439 quicklistSetOptions(obj->ptr, server.list_max_ziplist_size,
440 server.list_compress_depth);
441 break;
442 case REDISMODULE_KEYTYPE_ZSET:
443 obj = createZsetZiplistObject();
444 break;
445 case REDISMODULE_KEYTYPE_HASH:
446 obj = createHashObject();
447 break;
448 default: return REDISMODULE_ERR;
449 }
450 dbAdd(key->db,key->key,obj);
451 key->value = obj;
452 return REDISMODULE_OK;
453 }
454
455 /* This function is called in low-level API implementation functions in order
456 * to check if the value associated with the key remained empty after an
457 * operation that removed elements from an aggregate data type.
458 *
459 * If this happens, the key is deleted from the DB and the key object state
460 * is set to the right one in order to be targeted again by write operations
461 * possibly recreating the key if needed.
462 *
463 * The function returns 1 if the key value object is found empty and is
464 * deleted, otherwise 0 is returned. */
moduleDelKeyIfEmpty(RedisModuleKey * key)465 int moduleDelKeyIfEmpty(RedisModuleKey *key) {
466 if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL) return 0;
467 int isempty;
468 robj *o = key->value;
469
470 switch(o->type) {
471 case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
472 case OBJ_SET: isempty = setTypeSize(o) == 0; break;
473 case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
474 case OBJ_HASH: isempty = hashTypeLength(o) == 0; break;
475 case OBJ_STREAM: isempty = streamLength(o) == 0; break;
476 default: isempty = 0;
477 }
478
479 if (isempty) {
480 dbDelete(key->db,key->key);
481 key->value = NULL;
482 return 1;
483 } else {
484 return 0;
485 }
486 }
487
488 /* --------------------------------------------------------------------------
489 * Service API exported to modules
490 *
491 * Note that all the exported APIs are called RM_<funcname> in the core
492 * and RedisModule_<funcname> in the module side (defined as function
493 * pointers in redismodule.h). In this way the dynamic linker does not
494 * mess with our global function pointers, overriding it with the symbols
495 * defined in the main executable having the same names.
496 * -------------------------------------------------------------------------- */
497
498 /* Lookup the requested module API and store the function pointer into the
499 * target pointer. The function returns REDISMODULE_ERR if there is no such
500 * named API, otherwise REDISMODULE_OK.
501 *
502 * This function is not meant to be used by modules developer, it is only
503 * used implicitly by including redismodule.h. */
RM_GetApi(const char * funcname,void ** targetPtrPtr)504 int RM_GetApi(const char *funcname, void **targetPtrPtr) {
505 dictEntry *he = dictFind(server.moduleapi, funcname);
506 if (!he) return REDISMODULE_ERR;
507 *targetPtrPtr = dictGetVal(he);
508 return REDISMODULE_OK;
509 }
510
511 /* Helper function for when a command callback is called, in order to handle
512 * details needed to correctly replicate commands. */
moduleHandlePropagationAfterCommandCallback(RedisModuleCtx * ctx)513 void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
514 client *c = ctx->client;
515
516 /* We don't need to do anything here if the context was never used
517 * in order to propagate commands. */
518 if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
519
520 if (c->flags & CLIENT_LUA) return;
521
522 /* Handle the replication of the final EXEC, since whatever a command
523 * emits is always wrapped around MULTI/EXEC. */
524 robj *propargv[1];
525 propargv[0] = createStringObject("EXEC",4);
526 alsoPropagate(server.execCommand,c->db->id,propargv,1,
527 PROPAGATE_AOF|PROPAGATE_REPL);
528 decrRefCount(propargv[0]);
529
530 /* If this is not a module command context (but is instead a simple
531 * callback context), we have to handle directly the "also propagate"
532 * array and emit it. In a module command call this will be handled
533 * directly by call(). */
534 if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) &&
535 server.also_propagate.numops)
536 {
537 for (int j = 0; j < server.also_propagate.numops; j++) {
538 redisOp *rop = &server.also_propagate.ops[j];
539 int target = rop->target;
540 if (target)
541 propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
542 }
543 redisOpArrayFree(&server.also_propagate);
544 /* Restore the previous oparray in case of nexted use of the API. */
545 server.also_propagate = ctx->saved_oparray;
546 /* We're done with saved_oparray, let's invalidate it. */
547 redisOpArrayInit(&ctx->saved_oparray);
548 }
549 }
550
551 /* Free the context after the user function was called. */
moduleFreeContext(RedisModuleCtx * ctx)552 void moduleFreeContext(RedisModuleCtx *ctx) {
553 moduleHandlePropagationAfterCommandCallback(ctx);
554 autoMemoryCollect(ctx);
555 poolAllocRelease(ctx);
556 if (ctx->postponed_arrays) {
557 zfree(ctx->postponed_arrays);
558 ctx->postponed_arrays_count = 0;
559 serverLog(LL_WARNING,
560 "API misuse detected in module %s: "
561 "RedisModule_ReplyWithArray(REDISMODULE_POSTPONED_ARRAY_LEN) "
562 "not matched by the same number of RedisModule_SetReplyArrayLen() "
563 "calls.",
564 ctx->module->name);
565 }
566 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
567 }
568
569 /* This Redis command binds the normal Redis command invocation with commands
570 * exported by modules. */
RedisModuleCommandDispatcher(client * c)571 void RedisModuleCommandDispatcher(client *c) {
572 RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
573 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
574
575 ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL;
576 ctx.module = cp->module;
577 ctx.client = c;
578 cp->func(&ctx,(void**)c->argv,c->argc);
579 moduleFreeContext(&ctx);
580
581 /* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
582 * expand the query buffer, and in order to avoid a big object copy
583 * the query buffer SDS may be used directly as the SDS string backing
584 * the client argument vectors: sometimes this will result in the SDS
585 * string having unused space at the end. Later if a module takes ownership
586 * of the RedisString, such space will be wasted forever. Inside the
587 * Redis core this is not a problem because tryObjectEncoding() is called
588 * before storing strings in the key space. Here we need to do it
589 * for the module. */
590 for (int i = 0; i < c->argc; i++) {
591 /* Only do the work if the module took ownership of the object:
592 * in that case the refcount is no longer 1. */
593 if (c->argv[i]->refcount > 1)
594 trimStringObjectIfNeeded(c->argv[i]);
595 }
596 }
597
598 /* This function returns the list of keys, with the same interface as the
599 * 'getkeys' function of the native commands, for module commands that exported
600 * the "getkeys-api" flag during the registration. This is done when the
601 * list of keys are not at fixed positions, so that first/last/step cannot
602 * be used.
603 *
604 * In order to accomplish its work, the module command is called, flagging
605 * the context in a way that the command can recognize this is a special
606 * "get keys" call by calling RedisModule_IsKeysPositionRequest(ctx). */
moduleGetCommandKeysViaAPI(struct redisCommand * cmd,robj ** argv,int argc,int * numkeys)607 int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
608 RedisModuleCommandProxy *cp = (void*)(unsigned long)cmd->getkeys_proc;
609 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
610
611 ctx.module = cp->module;
612 ctx.client = NULL;
613 ctx.flags |= REDISMODULE_CTX_KEYS_POS_REQUEST;
614 cp->func(&ctx,(void**)argv,argc);
615 int *res = ctx.keys_pos;
616 if (numkeys) *numkeys = ctx.keys_count;
617 moduleFreeContext(&ctx);
618 return res;
619 }
620
621 /* Return non-zero if a module command, that was declared with the
622 * flag "getkeys-api", is called in a special way to get the keys positions
623 * and not to get executed. Otherwise zero is returned. */
RM_IsKeysPositionRequest(RedisModuleCtx * ctx)624 int RM_IsKeysPositionRequest(RedisModuleCtx *ctx) {
625 return (ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST) != 0;
626 }
627
628 /* When a module command is called in order to obtain the position of
629 * keys, since it was flagged as "getkeys-api" during the registration,
630 * the command implementation checks for this special call using the
631 * RedisModule_IsKeysPositionRequest() API and uses this function in
632 * order to report keys, like in the following example:
633 *
634 * if (RedisModule_IsKeysPositionRequest(ctx)) {
635 * RedisModule_KeyAtPos(ctx,1);
636 * RedisModule_KeyAtPos(ctx,2);
637 * }
638 *
639 * Note: in the example below the get keys API would not be needed since
640 * keys are at fixed positions. This interface is only used for commands
641 * with a more complex structure. */
RM_KeyAtPos(RedisModuleCtx * ctx,int pos)642 void RM_KeyAtPos(RedisModuleCtx *ctx, int pos) {
643 if (!(ctx->flags & REDISMODULE_CTX_KEYS_POS_REQUEST)) return;
644 if (pos <= 0) return;
645 ctx->keys_pos = zrealloc(ctx->keys_pos,sizeof(int)*(ctx->keys_count+1));
646 ctx->keys_pos[ctx->keys_count++] = pos;
647 }
648
649 /* Helper for RM_CreateCommand(). Turns a string representing command
650 * flags into the command flags used by the Redis core.
651 *
652 * It returns the set of flags, or -1 if unknown flags are found. */
commandFlagsFromString(char * s)653 int64_t commandFlagsFromString(char *s) {
654 int count, j;
655 int64_t flags = 0;
656 sds *tokens = sdssplitlen(s,strlen(s)," ",1,&count);
657 for (j = 0; j < count; j++) {
658 char *t = tokens[j];
659 if (!strcasecmp(t,"write")) flags |= CMD_WRITE;
660 else if (!strcasecmp(t,"readonly")) flags |= CMD_READONLY;
661 else if (!strcasecmp(t,"admin")) flags |= CMD_ADMIN;
662 else if (!strcasecmp(t,"deny-oom")) flags |= CMD_DENYOOM;
663 else if (!strcasecmp(t,"deny-script")) flags |= CMD_NOSCRIPT;
664 else if (!strcasecmp(t,"allow-loading")) flags |= CMD_LOADING;
665 else if (!strcasecmp(t,"pubsub")) flags |= CMD_PUBSUB;
666 else if (!strcasecmp(t,"random")) flags |= CMD_RANDOM;
667 else if (!strcasecmp(t,"allow-stale")) flags |= CMD_STALE;
668 else if (!strcasecmp(t,"no-monitor")) flags |= CMD_SKIP_MONITOR;
669 else if (!strcasecmp(t,"fast")) flags |= CMD_FAST;
670 else if (!strcasecmp(t,"getkeys-api")) flags |= CMD_MODULE_GETKEYS;
671 else if (!strcasecmp(t,"no-cluster")) flags |= CMD_MODULE_NO_CLUSTER;
672 else break;
673 }
674 sdsfreesplitres(tokens,count);
675 if (j != count) return -1; /* Some token not processed correctly. */
676 return flags;
677 }
678
679 /* Register a new command in the Redis server, that will be handled by
680 * calling the function pointer 'func' using the RedisModule calling
681 * convention. The function returns REDISMODULE_ERR if the specified command
682 * name is already busy or a set of invalid flags were passed, otherwise
683 * REDISMODULE_OK is returned and the new command is registered.
684 *
685 * This function must be called during the initialization of the module
686 * inside the RedisModule_OnLoad() function. Calling this function outside
687 * of the initialization function is not defined.
688 *
689 * The command function type is the following:
690 *
691 * int MyCommand_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
692 *
693 * And is supposed to always return REDISMODULE_OK.
694 *
695 * The set of flags 'strflags' specify the behavior of the command, and should
696 * be passed as a C string composed of space separated words, like for
697 * example "write deny-oom". The set of flags are:
698 *
699 * * **"write"**: The command may modify the data set (it may also read
700 * from it).
701 * * **"readonly"**: The command returns data from keys but never writes.
702 * * **"admin"**: The command is an administrative command (may change
703 * replication or perform similar tasks).
704 * * **"deny-oom"**: The command may use additional memory and should be
705 * denied during out of memory conditions.
706 * * **"deny-script"**: Don't allow this command in Lua scripts.
707 * * **"allow-loading"**: Allow this command while the server is loading data.
708 * Only commands not interacting with the data set
709 * should be allowed to run in this mode. If not sure
710 * don't use this flag.
711 * * **"pubsub"**: The command publishes things on Pub/Sub channels.
712 * * **"random"**: The command may have different outputs even starting
713 * from the same input arguments and key values.
714 * * **"allow-stale"**: The command is allowed to run on slaves that don't
715 * serve stale data. Don't use if you don't know what
716 * this means.
717 * * **"no-monitor"**: Don't propagate the command on monitor. Use this if
718 * the command has sensible data among the arguments.
719 * * **"fast"**: The command time complexity is not greater
720 * than O(log(N)) where N is the size of the collection or
721 * anything else representing the normal scalability
722 * issue with the command.
723 * * **"getkeys-api"**: The command implements the interface to return
724 * the arguments that are keys. Used when start/stop/step
725 * is not enough because of the command syntax.
726 * * **"no-cluster"**: The command should not register in Redis Cluster
727 * since is not designed to work with it because, for
728 * example, is unable to report the position of the
729 * keys, programmatically creates key names, or any
730 * other reason.
731 */
RM_CreateCommand(RedisModuleCtx * ctx,const char * name,RedisModuleCmdFunc cmdfunc,const char * strflags,int firstkey,int lastkey,int keystep)732 int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
733 int64_t flags = strflags ? commandFlagsFromString((char*)strflags) : 0;
734 if (flags == -1) return REDISMODULE_ERR;
735 if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
736 return REDISMODULE_ERR;
737
738 struct redisCommand *rediscmd;
739 RedisModuleCommandProxy *cp;
740 sds cmdname = sdsnew(name);
741
742 /* Check if the command name is busy. */
743 if (lookupCommand(cmdname) != NULL) {
744 sdsfree(cmdname);
745 return REDISMODULE_ERR;
746 }
747
748 /* Create a command "proxy", which is a structure that is referenced
749 * in the command table, so that the generic command that works as
750 * binding between modules and Redis, can know what function to call
751 * and what the module is.
752 *
753 * Note that we use the Redis command table 'getkeys_proc' in order to
754 * pass a reference to the command proxy structure. */
755 cp = zmalloc(sizeof(*cp));
756 cp->module = ctx->module;
757 cp->func = cmdfunc;
758 cp->rediscmd = zmalloc(sizeof(*rediscmd));
759 cp->rediscmd->name = cmdname;
760 cp->rediscmd->proc = RedisModuleCommandDispatcher;
761 cp->rediscmd->arity = -1;
762 cp->rediscmd->flags = flags | CMD_MODULE;
763 cp->rediscmd->getkeys_proc = (redisGetKeysProc*)(unsigned long)cp;
764 cp->rediscmd->firstkey = firstkey;
765 cp->rediscmd->lastkey = lastkey;
766 cp->rediscmd->keystep = keystep;
767 cp->rediscmd->microseconds = 0;
768 cp->rediscmd->calls = 0;
769 dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd);
770 dictAdd(server.orig_commands,sdsdup(cmdname),cp->rediscmd);
771 return REDISMODULE_OK;
772 }
773
774 /* Called by RM_Init() to setup the `ctx->module` structure.
775 *
776 * This is an internal function, Redis modules developers don't need
777 * to use it. */
RM_SetModuleAttribs(RedisModuleCtx * ctx,const char * name,int ver,int apiver)778 void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
779 RedisModule *module;
780
781 if (ctx->module != NULL) return;
782 module = zmalloc(sizeof(*module));
783 module->name = sdsnew((char*)name);
784 module->ver = ver;
785 module->apiver = apiver;
786 module->types = listCreate();
787 module->usedby = listCreate();
788 module->using = listCreate();
789 module->filters = listCreate();
790 module->in_call = 0;
791 ctx->module = module;
792 }
793
794 /* Return non-zero if the module name is busy.
795 * Otherwise zero is returned. */
RM_IsModuleNameBusy(const char * name)796 int RM_IsModuleNameBusy(const char *name) {
797 sds modulename = sdsnew(name);
798 dictEntry *de = dictFind(modules,modulename);
799 sdsfree(modulename);
800 return de != NULL;
801 }
802
803 /* Return the current UNIX time in milliseconds. */
RM_Milliseconds(void)804 long long RM_Milliseconds(void) {
805 return mstime();
806 }
807
808 /* --------------------------------------------------------------------------
809 * Automatic memory management for modules
810 * -------------------------------------------------------------------------- */
811
812 /* Enable automatic memory management. See API.md for more information.
813 *
814 * The function must be called as the first function of a command implementation
815 * that wants to use automatic memory. */
RM_AutoMemory(RedisModuleCtx * ctx)816 void RM_AutoMemory(RedisModuleCtx *ctx) {
817 ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
818 }
819
820 /* Add a new object to release automatically when the callback returns. */
autoMemoryAdd(RedisModuleCtx * ctx,int type,void * ptr)821 void autoMemoryAdd(RedisModuleCtx *ctx, int type, void *ptr) {
822 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
823 if (ctx->amqueue_used == ctx->amqueue_len) {
824 ctx->amqueue_len *= 2;
825 if (ctx->amqueue_len < 16) ctx->amqueue_len = 16;
826 ctx->amqueue = zrealloc(ctx->amqueue,sizeof(struct AutoMemEntry)*ctx->amqueue_len);
827 }
828 ctx->amqueue[ctx->amqueue_used].type = type;
829 ctx->amqueue[ctx->amqueue_used].ptr = ptr;
830 ctx->amqueue_used++;
831 }
832
833 /* Mark an object as freed in the auto release queue, so that users can still
834 * free things manually if they want.
835 *
836 * The function returns 1 if the object was actually found in the auto memory
837 * pool, otherwise 0 is returned. */
autoMemoryFreed(RedisModuleCtx * ctx,int type,void * ptr)838 int autoMemoryFreed(RedisModuleCtx *ctx, int type, void *ptr) {
839 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return 0;
840
841 int count = (ctx->amqueue_used+1)/2;
842 for (int j = 0; j < count; j++) {
843 for (int side = 0; side < 2; side++) {
844 /* For side = 0 check right side of the array, for
845 * side = 1 check the left side instead (zig-zag scanning). */
846 int i = (side == 0) ? (ctx->amqueue_used - 1 - j) : j;
847 if (ctx->amqueue[i].type == type &&
848 ctx->amqueue[i].ptr == ptr)
849 {
850 ctx->amqueue[i].type = REDISMODULE_AM_FREED;
851
852 /* Switch the freed element and the last element, to avoid growing
853 * the queue unnecessarily if we allocate/free in a loop */
854 if (i != ctx->amqueue_used-1) {
855 ctx->amqueue[i] = ctx->amqueue[ctx->amqueue_used-1];
856 }
857
858 /* Reduce the size of the queue because we either moved the top
859 * element elsewhere or freed it */
860 ctx->amqueue_used--;
861 return 1;
862 }
863 }
864 }
865 return 0;
866 }
867
868 /* Release all the objects in queue. */
autoMemoryCollect(RedisModuleCtx * ctx)869 void autoMemoryCollect(RedisModuleCtx *ctx) {
870 if (!(ctx->flags & REDISMODULE_CTX_AUTO_MEMORY)) return;
871 /* Clear the AUTO_MEMORY flag from the context, otherwise the functions
872 * we call to free the resources, will try to scan the auto release
873 * queue to mark the entries as freed. */
874 ctx->flags &= ~REDISMODULE_CTX_AUTO_MEMORY;
875 int j;
876 for (j = 0; j < ctx->amqueue_used; j++) {
877 void *ptr = ctx->amqueue[j].ptr;
878 switch(ctx->amqueue[j].type) {
879 case REDISMODULE_AM_STRING: decrRefCount(ptr); break;
880 case REDISMODULE_AM_REPLY: RM_FreeCallReply(ptr); break;
881 case REDISMODULE_AM_KEY: RM_CloseKey(ptr); break;
882 case REDISMODULE_AM_DICT: RM_FreeDict(NULL,ptr); break;
883 }
884 }
885 ctx->flags |= REDISMODULE_CTX_AUTO_MEMORY;
886 zfree(ctx->amqueue);
887 ctx->amqueue = NULL;
888 ctx->amqueue_len = 0;
889 ctx->amqueue_used = 0;
890 }
891
892 /* --------------------------------------------------------------------------
893 * String objects APIs
894 * -------------------------------------------------------------------------- */
895
896 /* Create a new module string object. The returned string must be freed
897 * with RedisModule_FreeString(), unless automatic memory is enabled.
898 *
899 * The string is created by copying the `len` bytes starting
900 * at `ptr`. No reference is retained to the passed buffer.
901 *
902 * The module context 'ctx' is optional and may be NULL if you want to create
903 * a string out of the context scope. However in that case, the automatic
904 * memory management will not be available, and the string memory must be
905 * managed manually. */
RM_CreateString(RedisModuleCtx * ctx,const char * ptr,size_t len)906 RedisModuleString *RM_CreateString(RedisModuleCtx *ctx, const char *ptr, size_t len) {
907 RedisModuleString *o = createStringObject(ptr,len);
908 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
909 return o;
910 }
911
912 /* Create a new module string object from a printf format and arguments.
913 * The returned string must be freed with RedisModule_FreeString(), unless
914 * automatic memory is enabled.
915 *
916 * The string is created using the sds formatter function sdscatvprintf().
917 *
918 * The passed context 'ctx' may be NULL if necessary, see the
919 * RedisModule_CreateString() documentation for more info. */
RM_CreateStringPrintf(RedisModuleCtx * ctx,const char * fmt,...)920 RedisModuleString *RM_CreateStringPrintf(RedisModuleCtx *ctx, const char *fmt, ...) {
921 sds s = sdsempty();
922
923 va_list ap;
924 va_start(ap, fmt);
925 s = sdscatvprintf(s, fmt, ap);
926 va_end(ap);
927
928 RedisModuleString *o = createObject(OBJ_STRING, s);
929 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
930
931 return o;
932 }
933
934
935 /* Like RedisModule_CreatString(), but creates a string starting from a long long
936 * integer instead of taking a buffer and its length.
937 *
938 * The returned string must be released with RedisModule_FreeString() or by
939 * enabling automatic memory management.
940 *
941 * The passed context 'ctx' may be NULL if necessary, see the
942 * RedisModule_CreateString() documentation for more info. */
RM_CreateStringFromLongLong(RedisModuleCtx * ctx,long long ll)943 RedisModuleString *RM_CreateStringFromLongLong(RedisModuleCtx *ctx, long long ll) {
944 char buf[LONG_STR_SIZE];
945 size_t len = ll2string(buf,sizeof(buf),ll);
946 return RM_CreateString(ctx,buf,len);
947 }
948
949 /* Like RedisModule_CreatString(), but creates a string starting from another
950 * RedisModuleString.
951 *
952 * The returned string must be released with RedisModule_FreeString() or by
953 * enabling automatic memory management.
954 *
955 * The passed context 'ctx' may be NULL if necessary, see the
956 * RedisModule_CreateString() documentation for more info. */
RM_CreateStringFromString(RedisModuleCtx * ctx,const RedisModuleString * str)957 RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisModuleString *str) {
958 RedisModuleString *o = dupStringObject(str);
959 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_STRING,o);
960 return o;
961 }
962
963 /* Free a module string object obtained with one of the Redis modules API calls
964 * that return new string objects.
965 *
966 * It is possible to call this function even when automatic memory management
967 * is enabled. In that case the string will be released ASAP and removed
968 * from the pool of string to release at the end.
969 *
970 * If the string was created with a NULL context 'ctx', it is also possible to
971 * pass ctx as NULL when releasing the string (but passing a context will not
972 * create any issue). Strings created with a context should be freed also passing
973 * the context, so if you want to free a string out of context later, make sure
974 * to create it using a NULL context. */
RM_FreeString(RedisModuleCtx * ctx,RedisModuleString * str)975 void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
976 decrRefCount(str);
977 if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
978 }
979
980 /* Every call to this function, will make the string 'str' requiring
981 * an additional call to RedisModule_FreeString() in order to really
982 * free the string. Note that the automatic freeing of the string obtained
983 * enabling modules automatic memory management counts for one
984 * RedisModule_FreeString() call (it is just executed automatically).
985 *
986 * Normally you want to call this function when, at the same time
987 * the following conditions are true:
988 *
989 * 1) You have automatic memory management enabled.
990 * 2) You want to create string objects.
991 * 3) Those string objects you create need to live *after* the callback
992 * function(for example a command implementation) creating them returns.
993 *
994 * Usually you want this in order to store the created string object
995 * into your own data structure, for example when implementing a new data
996 * type.
997 *
998 * Note that when memory management is turned off, you don't need
999 * any call to RetainString() since creating a string will always result
1000 * into a string that lives after the callback function returns, if
1001 * no FreeString() call is performed.
1002 *
1003 * It is possible to call this function with a NULL context. */
RM_RetainString(RedisModuleCtx * ctx,RedisModuleString * str)1004 void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
1005 if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) {
1006 /* Increment the string reference counting only if we can't
1007 * just remove the object from the list of objects that should
1008 * be reclaimed. Why we do that, instead of just incrementing
1009 * the refcount in any case, and let the automatic FreeString()
1010 * call at the end to bring the refcount back at the desired
1011 * value? Because this way we ensure that the object refcount
1012 * value is 1 (instead of going to 2 to be dropped later to 1)
1013 * after the call to this function. This is needed for functions
1014 * like RedisModule_StringAppendBuffer() to work. */
1015 incrRefCount(str);
1016 }
1017 }
1018
1019 /* Given a string module object, this function returns the string pointer
1020 * and length of the string. The returned pointer and length should only
1021 * be used for read only accesses and never modified. */
RM_StringPtrLen(const RedisModuleString * str,size_t * len)1022 const char *RM_StringPtrLen(const RedisModuleString *str, size_t *len) {
1023 if (str == NULL) {
1024 const char *errmsg = "(NULL string reply referenced in module)";
1025 if (len) *len = strlen(errmsg);
1026 return errmsg;
1027 }
1028 if (len) *len = sdslen(str->ptr);
1029 return str->ptr;
1030 }
1031
1032 /* --------------------------------------------------------------------------
1033 * Higher level string operations
1034 * ------------------------------------------------------------------------- */
1035
1036 /* Convert the string into a long long integer, storing it at `*ll`.
1037 * Returns REDISMODULE_OK on success. If the string can't be parsed
1038 * as a valid, strict long long (no spaces before/after), REDISMODULE_ERR
1039 * is returned. */
RM_StringToLongLong(const RedisModuleString * str,long long * ll)1040 int RM_StringToLongLong(const RedisModuleString *str, long long *ll) {
1041 return string2ll(str->ptr,sdslen(str->ptr),ll) ? REDISMODULE_OK :
1042 REDISMODULE_ERR;
1043 }
1044
1045 /* Convert the string into a double, storing it at `*d`.
1046 * Returns REDISMODULE_OK on success or REDISMODULE_ERR if the string is
1047 * not a valid string representation of a double value. */
RM_StringToDouble(const RedisModuleString * str,double * d)1048 int RM_StringToDouble(const RedisModuleString *str, double *d) {
1049 int retval = getDoubleFromObject(str,d);
1050 return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
1051 }
1052
1053 /* Compare two string objects, returning -1, 0 or 1 respectively if
1054 * a < b, a == b, a > b. Strings are compared byte by byte as two
1055 * binary blobs without any encoding care / collation attempt. */
RM_StringCompare(RedisModuleString * a,RedisModuleString * b)1056 int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
1057 return compareStringObjects(a,b);
1058 }
1059
1060 /* Return the (possibly modified in encoding) input 'str' object if
1061 * the string is unshared, otherwise NULL is returned. */
moduleAssertUnsharedString(RedisModuleString * str)1062 RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
1063 if (str->refcount != 1) {
1064 serverLog(LL_WARNING,
1065 "Module attempted to use an in-place string modify operation "
1066 "with a string referenced multiple times. Please check the code "
1067 "for API usage correctness.");
1068 return NULL;
1069 }
1070 if (str->encoding == OBJ_ENCODING_EMBSTR) {
1071 /* Note: here we "leak" the additional allocation that was
1072 * used in order to store the embedded string in the object. */
1073 str->ptr = sdsnewlen(str->ptr,sdslen(str->ptr));
1074 str->encoding = OBJ_ENCODING_RAW;
1075 } else if (str->encoding == OBJ_ENCODING_INT) {
1076 /* Convert the string from integer to raw encoding. */
1077 str->ptr = sdsfromlonglong((long)str->ptr);
1078 str->encoding = OBJ_ENCODING_RAW;
1079 }
1080 return str;
1081 }
1082
1083 /* Append the specified buffer to the string 'str'. The string must be a
1084 * string created by the user that is referenced only a single time, otherwise
1085 * REDISMODULE_ERR is returned and the operation is not performed. */
RM_StringAppendBuffer(RedisModuleCtx * ctx,RedisModuleString * str,const char * buf,size_t len)1086 int RM_StringAppendBuffer(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len) {
1087 UNUSED(ctx);
1088 str = moduleAssertUnsharedString(str);
1089 if (str == NULL) return REDISMODULE_ERR;
1090 str->ptr = sdscatlen(str->ptr,buf,len);
1091 return REDISMODULE_OK;
1092 }
1093
1094 /* --------------------------------------------------------------------------
1095 * Reply APIs
1096 *
1097 * Most functions always return REDISMODULE_OK so you can use it with
1098 * 'return' in order to return from the command implementation with:
1099 *
1100 * if (... some condition ...)
1101 * return RM_ReplyWithLongLong(ctx,mycount);
1102 * -------------------------------------------------------------------------- */
1103
1104 /* Send an error about the number of arguments given to the command,
1105 * citing the command name in the error message.
1106 *
1107 * Example:
1108 *
1109 * if (argc != 3) return RedisModule_WrongArity(ctx);
1110 */
RM_WrongArity(RedisModuleCtx * ctx)1111 int RM_WrongArity(RedisModuleCtx *ctx) {
1112 addReplyErrorFormat(ctx->client,
1113 "wrong number of arguments for '%s' command",
1114 (char*)ctx->client->argv[0]->ptr);
1115 return REDISMODULE_OK;
1116 }
1117
1118 /* Return the client object the `RM_Reply*` functions should target.
1119 * Normally this is just `ctx->client`, that is the client that called
1120 * the module command, however in the case of thread safe contexts there
1121 * is no directly associated client (since it would not be safe to access
1122 * the client from a thread), so instead the blocked client object referenced
1123 * in the thread safe context, has a fake client that we just use to accumulate
1124 * the replies. Later, when the client is unblocked, the accumulated replies
1125 * are appended to the actual client.
1126 *
1127 * The function returns the client pointer depending on the context, or
1128 * NULL if there is no potential client. This happens when we are in the
1129 * context of a thread safe context that was not initialized with a blocked
1130 * client object. Other contexts without associated clients are the ones
1131 * initialized to run the timers callbacks. */
moduleGetReplyClient(RedisModuleCtx * ctx)1132 client *moduleGetReplyClient(RedisModuleCtx *ctx) {
1133 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
1134 if (ctx->blocked_client)
1135 return ctx->blocked_client->reply_client;
1136 else
1137 return NULL;
1138 } else {
1139 /* If this is a non thread safe context, just return the client
1140 * that is running the command if any. This may be NULL as well
1141 * in the case of contexts that are not executed with associated
1142 * clients, like timer contexts. */
1143 return ctx->client;
1144 }
1145 }
1146
1147 /* Send an integer reply to the client, with the specified long long value.
1148 * The function always returns REDISMODULE_OK. */
RM_ReplyWithLongLong(RedisModuleCtx * ctx,long long ll)1149 int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) {
1150 client *c = moduleGetReplyClient(ctx);
1151 if (c == NULL) return REDISMODULE_OK;
1152 addReplyLongLong(c,ll);
1153 return REDISMODULE_OK;
1154 }
1155
1156 /* Reply with an error or simple string (status message). Used to implement
1157 * ReplyWithSimpleString() and ReplyWithError().
1158 * The function always returns REDISMODULE_OK. */
replyWithStatus(RedisModuleCtx * ctx,const char * msg,char * prefix)1159 int replyWithStatus(RedisModuleCtx *ctx, const char *msg, char *prefix) {
1160 client *c = moduleGetReplyClient(ctx);
1161 if (c == NULL) return REDISMODULE_OK;
1162 sds strmsg = sdsnewlen(prefix,1);
1163 strmsg = sdscat(strmsg,msg);
1164 strmsg = sdscatlen(strmsg,"\r\n",2);
1165 addReplySds(c,strmsg);
1166 return REDISMODULE_OK;
1167 }
1168
1169 /* Reply with the error 'err'.
1170 *
1171 * Note that 'err' must contain all the error, including
1172 * the initial error code. The function only provides the initial "-", so
1173 * the usage is, for example:
1174 *
1175 * RedisModule_ReplyWithError(ctx,"ERR Wrong Type");
1176 *
1177 * and not just:
1178 *
1179 * RedisModule_ReplyWithError(ctx,"Wrong Type");
1180 *
1181 * The function always returns REDISMODULE_OK.
1182 */
RM_ReplyWithError(RedisModuleCtx * ctx,const char * err)1183 int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
1184 return replyWithStatus(ctx,err,"-");
1185 }
1186
1187 /* Reply with a simple string (+... \r\n in RESP protocol). This replies
1188 * are suitable only when sending a small non-binary string with small
1189 * overhead, like "OK" or similar replies.
1190 *
1191 * The function always returns REDISMODULE_OK. */
RM_ReplyWithSimpleString(RedisModuleCtx * ctx,const char * msg)1192 int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) {
1193 return replyWithStatus(ctx,msg,"+");
1194 }
1195
1196 /* Reply with an array type of 'len' elements. However 'len' other calls
1197 * to `ReplyWith*` style functions must follow in order to emit the elements
1198 * of the array.
1199 *
1200 * When producing arrays with a number of element that is not known beforehand
1201 * the function can be called with the special count
1202 * REDISMODULE_POSTPONED_ARRAY_LEN, and the actual number of elements can be
1203 * later set with RedisModule_ReplySetArrayLength() (which will set the
1204 * latest "open" count if there are multiple ones).
1205 *
1206 * The function always returns REDISMODULE_OK. */
RM_ReplyWithArray(RedisModuleCtx * ctx,long len)1207 int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) {
1208 client *c = moduleGetReplyClient(ctx);
1209 if (c == NULL) return REDISMODULE_OK;
1210 if (len == REDISMODULE_POSTPONED_ARRAY_LEN) {
1211 ctx->postponed_arrays = zrealloc(ctx->postponed_arrays,sizeof(void*)*
1212 (ctx->postponed_arrays_count+1));
1213 ctx->postponed_arrays[ctx->postponed_arrays_count] =
1214 addDeferredMultiBulkLength(c);
1215 ctx->postponed_arrays_count++;
1216 } else {
1217 addReplyMultiBulkLen(c,len);
1218 }
1219 return REDISMODULE_OK;
1220 }
1221
1222 /* When RedisModule_ReplyWithArray() is used with the argument
1223 * REDISMODULE_POSTPONED_ARRAY_LEN, because we don't know beforehand the number
1224 * of items we are going to output as elements of the array, this function
1225 * will take care to set the array length.
1226 *
1227 * Since it is possible to have multiple array replies pending with unknown
1228 * length, this function guarantees to always set the latest array length
1229 * that was created in a postponed way.
1230 *
1231 * For example in order to output an array like [1,[10,20,30]] we
1232 * could write:
1233 *
1234 * RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
1235 * RedisModule_ReplyWithLongLong(ctx,1);
1236 * RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
1237 * RedisModule_ReplyWithLongLong(ctx,10);
1238 * RedisModule_ReplyWithLongLong(ctx,20);
1239 * RedisModule_ReplyWithLongLong(ctx,30);
1240 * RedisModule_ReplySetArrayLength(ctx,3); // Set len of 10,20,30 array.
1241 * RedisModule_ReplySetArrayLength(ctx,2); // Set len of top array
1242 *
1243 * Note that in the above example there is no reason to postpone the array
1244 * length, since we produce a fixed number of elements, but in the practice
1245 * the code may use an iterator or other ways of creating the output so
1246 * that is not easy to calculate in advance the number of elements.
1247 */
RM_ReplySetArrayLength(RedisModuleCtx * ctx,long len)1248 void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) {
1249 client *c = moduleGetReplyClient(ctx);
1250 if (c == NULL) return;
1251 if (ctx->postponed_arrays_count == 0) {
1252 serverLog(LL_WARNING,
1253 "API misuse detected in module %s: "
1254 "RedisModule_ReplySetArrayLength() called without previous "
1255 "RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN) "
1256 "call.", ctx->module->name);
1257 return;
1258 }
1259 ctx->postponed_arrays_count--;
1260 setDeferredMultiBulkLength(c,
1261 ctx->postponed_arrays[ctx->postponed_arrays_count],
1262 len);
1263 if (ctx->postponed_arrays_count == 0) {
1264 zfree(ctx->postponed_arrays);
1265 ctx->postponed_arrays = NULL;
1266 }
1267 }
1268
1269 /* Reply with a bulk string, taking in input a C buffer pointer and length.
1270 *
1271 * The function always returns REDISMODULE_OK. */
RM_ReplyWithStringBuffer(RedisModuleCtx * ctx,const char * buf,size_t len)1272 int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) {
1273 client *c = moduleGetReplyClient(ctx);
1274 if (c == NULL) return REDISMODULE_OK;
1275 addReplyBulkCBuffer(c,(char*)buf,len);
1276 return REDISMODULE_OK;
1277 }
1278
1279 /* Reply with a bulk string, taking in input a C buffer pointer that is
1280 * assumed to be null-terminated.
1281 *
1282 * The function always returns REDISMODULE_OK. */
RM_ReplyWithCString(RedisModuleCtx * ctx,const char * buf)1283 int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) {
1284 client *c = moduleGetReplyClient(ctx);
1285 if (c == NULL) return REDISMODULE_OK;
1286 addReplyBulkCString(c,(char*)buf);
1287 return REDISMODULE_OK;
1288 }
1289
1290 /* Reply with a bulk string, taking in input a RedisModuleString object.
1291 *
1292 * The function always returns REDISMODULE_OK. */
RM_ReplyWithString(RedisModuleCtx * ctx,RedisModuleString * str)1293 int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) {
1294 client *c = moduleGetReplyClient(ctx);
1295 if (c == NULL) return REDISMODULE_OK;
1296 addReplyBulk(c,str);
1297 return REDISMODULE_OK;
1298 }
1299
1300 /* Reply to the client with a NULL. In the RESP protocol a NULL is encoded
1301 * as the string "$-1\r\n".
1302 *
1303 * The function always returns REDISMODULE_OK. */
RM_ReplyWithNull(RedisModuleCtx * ctx)1304 int RM_ReplyWithNull(RedisModuleCtx *ctx) {
1305 client *c = moduleGetReplyClient(ctx);
1306 if (c == NULL) return REDISMODULE_OK;
1307 addReply(c,shared.nullbulk);
1308 return REDISMODULE_OK;
1309 }
1310
1311 /* Reply exactly what a Redis command returned us with RedisModule_Call().
1312 * This function is useful when we use RedisModule_Call() in order to
1313 * execute some command, as we want to reply to the client exactly the
1314 * same reply we obtained by the command.
1315 *
1316 * The function always returns REDISMODULE_OK. */
RM_ReplyWithCallReply(RedisModuleCtx * ctx,RedisModuleCallReply * reply)1317 int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
1318 client *c = moduleGetReplyClient(ctx);
1319 if (c == NULL) return REDISMODULE_OK;
1320 sds proto = sdsnewlen(reply->proto, reply->protolen);
1321 addReplySds(c,proto);
1322 return REDISMODULE_OK;
1323 }
1324
1325 /* Send a string reply obtained converting the double 'd' into a bulk string.
1326 * This function is basically equivalent to converting a double into
1327 * a string into a C buffer, and then calling the function
1328 * RedisModule_ReplyWithStringBuffer() with the buffer and length.
1329 *
1330 * The function always returns REDISMODULE_OK. */
RM_ReplyWithDouble(RedisModuleCtx * ctx,double d)1331 int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) {
1332 client *c = moduleGetReplyClient(ctx);
1333 if (c == NULL) return REDISMODULE_OK;
1334 addReplyDouble(c,d);
1335 return REDISMODULE_OK;
1336 }
1337
1338 /* --------------------------------------------------------------------------
1339 * Commands replication API
1340 * -------------------------------------------------------------------------- */
1341
1342 /* Helper function to replicate MULTI the first time we replicate something
1343 * in the context of a command execution. EXEC will be handled by the
1344 * RedisModuleCommandDispatcher() function. */
moduleReplicateMultiIfNeeded(RedisModuleCtx * ctx)1345 void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
1346 /* Skip this if client explicitly wrap the command with MULTI, or if
1347 * the module command was called by a script. */
1348 if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return;
1349 /* If we already emitted MULTI return ASAP. */
1350 if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
1351 /* If this is a thread safe context, we do not want to wrap commands
1352 * executed into MULTI/EXEC, they are executed as single commands
1353 * from an external client in essence. */
1354 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) return;
1355 /* If this is a callback context, and not a module command execution
1356 * context, we have to setup the op array for the "also propagate" API
1357 * so that RM_Replicate() will work. */
1358 if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) {
1359 ctx->saved_oparray = server.also_propagate;
1360 redisOpArrayInit(&server.also_propagate);
1361 }
1362 execCommandPropagateMulti(ctx->client);
1363 ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
1364 }
1365
1366 /* Replicate the specified command and arguments to slaves and AOF, as effect
1367 * of execution of the calling command implementation.
1368 *
1369 * The replicated commands are always wrapped into the MULTI/EXEC that
1370 * contains all the commands replicated in a given module command
1371 * execution. However the commands replicated with RedisModule_Call()
1372 * are the first items, the ones replicated with RedisModule_Replicate()
1373 * will all follow before the EXEC.
1374 *
1375 * Modules should try to use one interface or the other.
1376 *
1377 * This command follows exactly the same interface of RedisModule_Call(),
1378 * so a set of format specifiers must be passed, followed by arguments
1379 * matching the provided format specifiers.
1380 *
1381 * Please refer to RedisModule_Call() for more information.
1382 *
1383 * Using the special "A" and "R" modifiers, the caller can exclude either
1384 * the AOF or the replicas from the propagation of the specified command.
1385 * Otherwise, by default, the command will be propagated in both channels.
1386 *
1387 * ## Note about calling this function from a thread safe context:
1388 *
1389 * Normally when you call this function from the callback implementing a
1390 * module command, or any other callback provided by the Redis Module API,
1391 * Redis will accumulate all the calls to this function in the context of
1392 * the callback, and will propagate all the commands wrapped in a MULTI/EXEC
1393 * transaction. However when calling this function from a threaded safe context
1394 * that can live an undefined amount of time, and can be locked/unlocked in
1395 * at will, the behavior is different: MULTI/EXEC wrapper is not emitted
1396 * and the command specified is inserted in the AOF and replication stream
1397 * immediately.
1398 *
1399 * ## Return value
1400 *
1401 * The command returns REDISMODULE_ERR if the format specifiers are invalid
1402 * or the command name does not belong to a known command. */
RM_Replicate(RedisModuleCtx * ctx,const char * cmdname,const char * fmt,...)1403 int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
1404 struct redisCommand *cmd;
1405 robj **argv = NULL;
1406 int argc = 0, flags = 0, j;
1407 va_list ap;
1408
1409 cmd = lookupCommandByCString((char*)cmdname);
1410 if (!cmd) return REDISMODULE_ERR;
1411
1412 /* Create the client and dispatch the command. */
1413 va_start(ap, fmt);
1414 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
1415 va_end(ap);
1416 if (argv == NULL) return REDISMODULE_ERR;
1417
1418 /* Select the propagation target. Usually is AOF + replicas, however
1419 * the caller can exclude one or the other using the "A" or "R"
1420 * modifiers. */
1421 int target = 0;
1422 if (!(flags & REDISMODULE_ARGV_NO_AOF)) target |= PROPAGATE_AOF;
1423 if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) target |= PROPAGATE_REPL;
1424
1425 /* Replicate! When we are in a threaded context, we want to just insert
1426 * the replicated command ASAP, since it is not clear when the context
1427 * will stop being used, so accumulating stuff does not make much sense,
1428 * nor we could easily use the alsoPropagate() API from threads. */
1429 if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
1430 propagate(cmd,ctx->client->db->id,argv,argc,target);
1431 } else {
1432 moduleReplicateMultiIfNeeded(ctx);
1433 alsoPropagate(cmd,ctx->client->db->id,argv,argc,target);
1434 }
1435
1436 /* Release the argv. */
1437 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
1438 zfree(argv);
1439 server.dirty++;
1440 return REDISMODULE_OK;
1441 }
1442
1443 /* This function will replicate the command exactly as it was invoked
1444 * by the client. Note that this function will not wrap the command into
1445 * a MULTI/EXEC stanza, so it should not be mixed with other replication
1446 * commands.
1447 *
1448 * Basically this form of replication is useful when you want to propagate
1449 * the command to the slaves and AOF file exactly as it was called, since
1450 * the command can just be re-executed to deterministically re-create the
1451 * new state starting from the old one.
1452 *
1453 * The function always returns REDISMODULE_OK. */
RM_ReplicateVerbatim(RedisModuleCtx * ctx)1454 int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
1455 alsoPropagate(ctx->client->cmd,ctx->client->db->id,
1456 ctx->client->argv,ctx->client->argc,
1457 PROPAGATE_AOF|PROPAGATE_REPL);
1458 server.dirty++;
1459 return REDISMODULE_OK;
1460 }
1461
1462 /* --------------------------------------------------------------------------
1463 * DB and Key APIs -- Generic API
1464 * -------------------------------------------------------------------------- */
1465
1466 /* Return the ID of the current client calling the currently active module
1467 * command. The returned ID has a few guarantees:
1468 *
1469 * 1. The ID is different for each different client, so if the same client
1470 * executes a module command multiple times, it can be recognized as
1471 * having the same ID, otherwise the ID will be different.
1472 * 2. The ID increases monotonically. Clients connecting to the server later
1473 * are guaranteed to get IDs greater than any past ID previously seen.
1474 *
1475 * Valid IDs are from 1 to 2^64-1. If 0 is returned it means there is no way
1476 * to fetch the ID in the context the function was currently called. */
RM_GetClientId(RedisModuleCtx * ctx)1477 unsigned long long RM_GetClientId(RedisModuleCtx *ctx) {
1478 if (ctx->client == NULL) return 0;
1479 return ctx->client->id;
1480 }
1481
1482 /* Return the currently selected DB. */
RM_GetSelectedDb(RedisModuleCtx * ctx)1483 int RM_GetSelectedDb(RedisModuleCtx *ctx) {
1484 return ctx->client->db->id;
1485 }
1486
1487
1488 /* Return the current context's flags. The flags provide information on the
1489 * current request context (whether the client is a Lua script or in a MULTI),
1490 * and about the Redis instance in general, i.e replication and persistence.
1491 *
1492 * The available flags are:
1493 *
1494 * * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script
1495 *
1496 * * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction
1497 *
1498 * * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication
1499 * link by the MASTER
1500 *
1501 * * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master
1502 *
1503 * * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave
1504 *
1505 * * REDISMODULE_CTX_FLAGS_READONLY: The Redis instance is read-only
1506 *
1507 * * REDISMODULE_CTX_FLAGS_CLUSTER: The Redis instance is in cluster mode
1508 *
1509 * * REDISMODULE_CTX_FLAGS_AOF: The Redis instance has AOF enabled
1510 *
1511 * * REDISMODULE_CTX_FLAGS_RDB: The instance has RDB enabled
1512 *
1513 * * REDISMODULE_CTX_FLAGS_MAXMEMORY: The instance has Maxmemory set
1514 *
1515 * * REDISMODULE_CTX_FLAGS_EVICT: Maxmemory is set and has an eviction
1516 * policy that may delete keys
1517 *
1518 * * REDISMODULE_CTX_FLAGS_OOM: Redis is out of memory according to the
1519 * maxmemory setting.
1520 *
1521 * * REDISMODULE_CTX_FLAGS_OOM_WARNING: Less than 25% of memory remains before
1522 * reaching the maxmemory level.
1523 *
1524 * * REDISMODULE_CTX_FLAGS_LOADING: Server is loading RDB/AOF
1525 *
1526 * * REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE: No active link with the master.
1527 *
1528 * * REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING: The replica is trying to
1529 * connect with the master.
1530 *
1531 * * REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING: Master -> Replica RDB
1532 * transfer is in progress.
1533 *
1534 * * REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE: The replica has an active link
1535 * with its master. This is the
1536 * contrary of STALE state.
1537 *
1538 * * REDISMODULE_CTX_FLAGS_ACTIVE_CHILD: There is currently some background
1539 * process active (RDB, AUX or module).
1540 */
RM_GetContextFlags(RedisModuleCtx * ctx)1541 int RM_GetContextFlags(RedisModuleCtx *ctx) {
1542
1543 int flags = 0;
1544 /* Client specific flags */
1545 if (ctx->client) {
1546 if (ctx->client->flags & CLIENT_LUA)
1547 flags |= REDISMODULE_CTX_FLAGS_LUA;
1548 if (ctx->client->flags & CLIENT_MULTI)
1549 flags |= REDISMODULE_CTX_FLAGS_MULTI;
1550 /* Module command recieved from MASTER, is replicated. */
1551 if (ctx->client->flags & CLIENT_MASTER)
1552 flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
1553 }
1554
1555 if (server.cluster_enabled)
1556 flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
1557
1558 if (server.loading)
1559 flags |= REDISMODULE_CTX_FLAGS_LOADING;
1560
1561 /* Maxmemory and eviction policy */
1562 if (server.maxmemory > 0) {
1563 flags |= REDISMODULE_CTX_FLAGS_MAXMEMORY;
1564
1565 if (server.maxmemory_policy != MAXMEMORY_NO_EVICTION)
1566 flags |= REDISMODULE_CTX_FLAGS_EVICT;
1567 }
1568
1569 /* Persistence flags */
1570 if (server.aof_state != AOF_OFF)
1571 flags |= REDISMODULE_CTX_FLAGS_AOF;
1572 if (server.saveparamslen > 0)
1573 flags |= REDISMODULE_CTX_FLAGS_RDB;
1574
1575 /* Replication flags */
1576 if (server.masterhost == NULL) {
1577 flags |= REDISMODULE_CTX_FLAGS_MASTER;
1578 } else {
1579 flags |= REDISMODULE_CTX_FLAGS_SLAVE;
1580 if (server.repl_slave_ro)
1581 flags |= REDISMODULE_CTX_FLAGS_READONLY;
1582
1583 /* Replica state flags. */
1584 if (server.repl_state == REPL_STATE_CONNECT ||
1585 server.repl_state == REPL_STATE_CONNECTING)
1586 {
1587 flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING;
1588 } else if (server.repl_state == REPL_STATE_TRANSFER) {
1589 flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING;
1590 } else if (server.repl_state == REPL_STATE_CONNECTED) {
1591 flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE;
1592 }
1593
1594 if (server.repl_state != REPL_STATE_CONNECTED)
1595 flags |= REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE;
1596 }
1597
1598 /* OOM flag. */
1599 float level;
1600 int retval = getMaxmemoryState(NULL,NULL,NULL,&level);
1601 if (retval == C_ERR) flags |= REDISMODULE_CTX_FLAGS_OOM;
1602 if (level > 0.75) flags |= REDISMODULE_CTX_FLAGS_OOM_WARNING;
1603
1604 /* Presence of children processes. */
1605 if (hasActiveChildProcess()) flags |= REDISMODULE_CTX_FLAGS_ACTIVE_CHILD;
1606
1607 return flags;
1608 }
1609
1610 /* Change the currently selected DB. Returns an error if the id
1611 * is out of range.
1612 *
1613 * Note that the client will retain the currently selected DB even after
1614 * the Redis command implemented by the module calling this function
1615 * returns.
1616 *
1617 * If the module command wishes to change something in a different DB and
1618 * returns back to the original one, it should call RedisModule_GetSelectedDb()
1619 * before in order to restore the old DB number before returning. */
RM_SelectDb(RedisModuleCtx * ctx,int newid)1620 int RM_SelectDb(RedisModuleCtx *ctx, int newid) {
1621 int retval = selectDb(ctx->client,newid);
1622 return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR;
1623 }
1624
1625 /* Return an handle representing a Redis key, so that it is possible
1626 * to call other APIs with the key handle as argument to perform
1627 * operations on the key.
1628 *
1629 * The return value is the handle representing the key, that must be
1630 * closed with RM_CloseKey().
1631 *
1632 * If the key does not exist and WRITE mode is requested, the handle
1633 * is still returned, since it is possible to perform operations on
1634 * a yet not existing key (that will be created, for example, after
1635 * a list push operation). If the mode is just READ instead, and the
1636 * key does not exist, NULL is returned. However it is still safe to
1637 * call RedisModule_CloseKey() and RedisModule_KeyType() on a NULL
1638 * value. */
RM_OpenKey(RedisModuleCtx * ctx,robj * keyname,int mode)1639 void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
1640 RedisModuleKey *kp;
1641 robj *value;
1642
1643 if (mode & REDISMODULE_WRITE) {
1644 value = lookupKeyWrite(ctx->client->db,keyname);
1645 } else {
1646 value = lookupKeyRead(ctx->client->db,keyname);
1647 if (value == NULL) {
1648 return NULL;
1649 }
1650 }
1651
1652 /* Setup the key handle. */
1653 kp = zmalloc(sizeof(*kp));
1654 kp->ctx = ctx;
1655 kp->db = ctx->client->db;
1656 kp->key = keyname;
1657 incrRefCount(keyname);
1658 kp->value = value;
1659 kp->iter = NULL;
1660 kp->mode = mode;
1661 zsetKeyReset(kp);
1662 autoMemoryAdd(ctx,REDISMODULE_AM_KEY,kp);
1663 return (void*)kp;
1664 }
1665
1666 /* Close a key handle. */
RM_CloseKey(RedisModuleKey * key)1667 void RM_CloseKey(RedisModuleKey *key) {
1668 if (key == NULL) return;
1669 if (key->mode & REDISMODULE_WRITE) signalModifiedKey(key->db,key->key);
1670 /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
1671 RM_ZsetRangeStop(key);
1672 decrRefCount(key->key);
1673 autoMemoryFreed(key->ctx,REDISMODULE_AM_KEY,key);
1674 zfree(key);
1675 }
1676
1677 /* Return the type of the key. If the key pointer is NULL then
1678 * REDISMODULE_KEYTYPE_EMPTY is returned. */
RM_KeyType(RedisModuleKey * key)1679 int RM_KeyType(RedisModuleKey *key) {
1680 if (key == NULL || key->value == NULL) return REDISMODULE_KEYTYPE_EMPTY;
1681 /* We map between defines so that we are free to change the internal
1682 * defines as desired. */
1683 switch(key->value->type) {
1684 case OBJ_STRING: return REDISMODULE_KEYTYPE_STRING;
1685 case OBJ_LIST: return REDISMODULE_KEYTYPE_LIST;
1686 case OBJ_SET: return REDISMODULE_KEYTYPE_SET;
1687 case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
1688 case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
1689 case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
1690 /* case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM; - don't wanna add new API to 5.0 */
1691 default: return 0;
1692 }
1693 }
1694
1695 /* Return the length of the value associated with the key.
1696 * For strings this is the length of the string. For all the other types
1697 * is the number of elements (just counting keys for hashes).
1698 *
1699 * If the key pointer is NULL or the key is empty, zero is returned. */
RM_ValueLength(RedisModuleKey * key)1700 size_t RM_ValueLength(RedisModuleKey *key) {
1701 if (key == NULL || key->value == NULL) return 0;
1702 switch(key->value->type) {
1703 case OBJ_STRING: return stringObjectLen(key->value);
1704 case OBJ_LIST: return listTypeLength(key->value);
1705 case OBJ_SET: return setTypeSize(key->value);
1706 case OBJ_ZSET: return zsetLength(key->value);
1707 case OBJ_HASH: return hashTypeLength(key->value);
1708 case OBJ_STREAM: return streamLength(key->value);
1709 default: return 0;
1710 }
1711 }
1712
1713 /* If the key is open for writing, remove it, and setup the key to
1714 * accept new writes as an empty key (that will be created on demand).
1715 * On success REDISMODULE_OK is returned. If the key is not open for
1716 * writing REDISMODULE_ERR is returned. */
RM_DeleteKey(RedisModuleKey * key)1717 int RM_DeleteKey(RedisModuleKey *key) {
1718 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1719 if (key->value) {
1720 dbDelete(key->db,key->key);
1721 key->value = NULL;
1722 }
1723 return REDISMODULE_OK;
1724 }
1725
1726 /* If the key is open for writing, unlink it (that is delete it in a
1727 * non-blocking way, not reclaiming memory immediately) and setup the key to
1728 * accept new writes as an empty key (that will be created on demand).
1729 * On success REDISMODULE_OK is returned. If the key is not open for
1730 * writing REDISMODULE_ERR is returned. */
RM_UnlinkKey(RedisModuleKey * key)1731 int RM_UnlinkKey(RedisModuleKey *key) {
1732 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1733 if (key->value) {
1734 dbAsyncDelete(key->db,key->key);
1735 key->value = NULL;
1736 }
1737 return REDISMODULE_OK;
1738 }
1739
1740 /* Return the key expire value, as milliseconds of remaining TTL.
1741 * If no TTL is associated with the key or if the key is empty,
1742 * REDISMODULE_NO_EXPIRE is returned. */
RM_GetExpire(RedisModuleKey * key)1743 mstime_t RM_GetExpire(RedisModuleKey *key) {
1744 mstime_t expire = getExpire(key->db,key->key);
1745 if (expire == -1 || key->value == NULL) return -1;
1746 expire -= mstime();
1747 return expire >= 0 ? expire : 0;
1748 }
1749
1750 /* Set a new expire for the key. If the special expire
1751 * REDISMODULE_NO_EXPIRE is set, the expire is cancelled if there was
1752 * one (the same as the PERSIST command).
1753 *
1754 * Note that the expire must be provided as a positive integer representing
1755 * the number of milliseconds of TTL the key should have.
1756 *
1757 * The function returns REDISMODULE_OK on success or REDISMODULE_ERR if
1758 * the key was not open for writing or is an empty key. */
RM_SetExpire(RedisModuleKey * key,mstime_t expire)1759 int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
1760 if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL)
1761 return REDISMODULE_ERR;
1762 if (expire != REDISMODULE_NO_EXPIRE) {
1763 expire += mstime();
1764 setExpire(key->ctx->client,key->db,key->key,expire);
1765 } else {
1766 removeExpire(key->db,key->key);
1767 }
1768 return REDISMODULE_OK;
1769 }
1770
1771 /* --------------------------------------------------------------------------
1772 * Key API for String type
1773 * -------------------------------------------------------------------------- */
1774
1775 /* If the key is open for writing, set the specified string 'str' as the
1776 * value of the key, deleting the old value if any.
1777 * On success REDISMODULE_OK is returned. If the key is not open for
1778 * writing or there is an active iterator, REDISMODULE_ERR is returned. */
RM_StringSet(RedisModuleKey * key,RedisModuleString * str)1779 int RM_StringSet(RedisModuleKey *key, RedisModuleString *str) {
1780 if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
1781 RM_DeleteKey(key);
1782 setKey(key->db,key->key,str);
1783 key->value = str;
1784 return REDISMODULE_OK;
1785 }
1786
1787 /* Prepare the key associated string value for DMA access, and returns
1788 * a pointer and size (by reference), that the user can use to read or
1789 * modify the string in-place accessing it directly via pointer.
1790 *
1791 * The 'mode' is composed by bitwise OR-ing the following flags:
1792 *
1793 * REDISMODULE_READ -- Read access
1794 * REDISMODULE_WRITE -- Write access
1795 *
1796 * If the DMA is not requested for writing, the pointer returned should
1797 * only be accessed in a read-only fashion.
1798 *
1799 * On error (wrong type) NULL is returned.
1800 *
1801 * DMA access rules:
1802 *
1803 * 1. No other key writing function should be called since the moment
1804 * the pointer is obtained, for all the time we want to use DMA access
1805 * to read or modify the string.
1806 *
1807 * 2. Each time RM_StringTruncate() is called, to continue with the DMA
1808 * access, RM_StringDMA() should be called again to re-obtain
1809 * a new pointer and length.
1810 *
1811 * 3. If the returned pointer is not NULL, but the length is zero, no
1812 * byte can be touched (the string is empty, or the key itself is empty)
1813 * so a RM_StringTruncate() call should be used if there is to enlarge
1814 * the string, and later call StringDMA() again to get the pointer.
1815 */
RM_StringDMA(RedisModuleKey * key,size_t * len,int mode)1816 char *RM_StringDMA(RedisModuleKey *key, size_t *len, int mode) {
1817 /* We need to return *some* pointer for empty keys, we just return
1818 * a string literal pointer, that is the advantage to be mapped into
1819 * a read only memory page, so the module will segfault if a write
1820 * attempt is performed. */
1821 char *emptystring = "<dma-empty-string>";
1822 if (key->value == NULL) {
1823 *len = 0;
1824 return emptystring;
1825 }
1826
1827 if (key->value->type != OBJ_STRING) return NULL;
1828
1829 /* For write access, and even for read access if the object is encoded,
1830 * we unshare the string (that has the side effect of decoding it). */
1831 if ((mode & REDISMODULE_WRITE) || key->value->encoding != OBJ_ENCODING_RAW)
1832 key->value = dbUnshareStringValue(key->db, key->key, key->value);
1833
1834 *len = sdslen(key->value->ptr);
1835 return key->value->ptr;
1836 }
1837
1838 /* If the string is open for writing and is of string type, resize it, padding
1839 * with zero bytes if the new length is greater than the old one.
1840 *
1841 * After this call, RM_StringDMA() must be called again to continue
1842 * DMA access with the new pointer.
1843 *
1844 * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR on
1845 * error, that is, the key is not open for writing, is not a string
1846 * or resizing for more than 512 MB is requested.
1847 *
1848 * If the key is empty, a string key is created with the new string value
1849 * unless the new length value requested is zero. */
RM_StringTruncate(RedisModuleKey * key,size_t newlen)1850 int RM_StringTruncate(RedisModuleKey *key, size_t newlen) {
1851 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1852 if (key->value && key->value->type != OBJ_STRING) return REDISMODULE_ERR;
1853 if (newlen > 512*1024*1024) return REDISMODULE_ERR;
1854
1855 /* Empty key and new len set to 0. Just return REDISMODULE_OK without
1856 * doing anything. */
1857 if (key->value == NULL && newlen == 0) return REDISMODULE_OK;
1858
1859 if (key->value == NULL) {
1860 /* Empty key: create it with the new size. */
1861 robj *o = createObject(OBJ_STRING,sdsnewlen(NULL, newlen));
1862 setKey(key->db,key->key,o);
1863 key->value = o;
1864 decrRefCount(o);
1865 } else {
1866 /* Unshare and resize. */
1867 key->value = dbUnshareStringValue(key->db, key->key, key->value);
1868 size_t curlen = sdslen(key->value->ptr);
1869 if (newlen > curlen) {
1870 key->value->ptr = sdsgrowzero(key->value->ptr,newlen);
1871 } else if (newlen < curlen) {
1872 sdsrange(key->value->ptr,0,newlen-1);
1873 /* If the string is too wasteful, reallocate it. */
1874 if (sdslen(key->value->ptr) < sdsavail(key->value->ptr))
1875 key->value->ptr = sdsRemoveFreeSpace(key->value->ptr);
1876 }
1877 }
1878 return REDISMODULE_OK;
1879 }
1880
1881 /* --------------------------------------------------------------------------
1882 * Key API for List type
1883 * -------------------------------------------------------------------------- */
1884
1885 /* Push an element into a list, on head or tail depending on 'where' argument.
1886 * If the key pointer is about an empty key opened for writing, the key
1887 * is created. On error (key opened for read-only operations or of the wrong
1888 * type) REDISMODULE_ERR is returned, otherwise REDISMODULE_OK is returned. */
RM_ListPush(RedisModuleKey * key,int where,RedisModuleString * ele)1889 int RM_ListPush(RedisModuleKey *key, int where, RedisModuleString *ele) {
1890 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1891 if (key->value && key->value->type != OBJ_LIST) return REDISMODULE_ERR;
1892 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_LIST);
1893 listTypePush(key->value, ele,
1894 (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
1895 return REDISMODULE_OK;
1896 }
1897
1898 /* Pop an element from the list, and returns it as a module string object
1899 * that the user should be free with RM_FreeString() or by enabling
1900 * automatic memory. 'where' specifies if the element should be popped from
1901 * head or tail. The command returns NULL if:
1902 * 1) The list is empty.
1903 * 2) The key was not open for writing.
1904 * 3) The key is not a list. */
RM_ListPop(RedisModuleKey * key,int where)1905 RedisModuleString *RM_ListPop(RedisModuleKey *key, int where) {
1906 if (!(key->mode & REDISMODULE_WRITE) ||
1907 key->value == NULL ||
1908 key->value->type != OBJ_LIST) return NULL;
1909 robj *ele = listTypePop(key->value,
1910 (where == REDISMODULE_LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL);
1911 robj *decoded = getDecodedObject(ele);
1912 decrRefCount(ele);
1913 moduleDelKeyIfEmpty(key);
1914 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,decoded);
1915 return decoded;
1916 }
1917
1918 /* --------------------------------------------------------------------------
1919 * Key API for Sorted Set type
1920 * -------------------------------------------------------------------------- */
1921
1922 /* Conversion from/to public flags of the Modules API and our private flags,
1923 * so that we have everything decoupled. */
RM_ZsetAddFlagsToCoreFlags(int flags)1924 int RM_ZsetAddFlagsToCoreFlags(int flags) {
1925 int retflags = 0;
1926 if (flags & REDISMODULE_ZADD_XX) retflags |= ZADD_XX;
1927 if (flags & REDISMODULE_ZADD_NX) retflags |= ZADD_NX;
1928 return retflags;
1929 }
1930
1931 /* See previous function comment. */
RM_ZsetAddFlagsFromCoreFlags(int flags)1932 int RM_ZsetAddFlagsFromCoreFlags(int flags) {
1933 int retflags = 0;
1934 if (flags & ZADD_ADDED) retflags |= REDISMODULE_ZADD_ADDED;
1935 if (flags & ZADD_UPDATED) retflags |= REDISMODULE_ZADD_UPDATED;
1936 if (flags & ZADD_NOP) retflags |= REDISMODULE_ZADD_NOP;
1937 return retflags;
1938 }
1939
1940 /* Add a new element into a sorted set, with the specified 'score'.
1941 * If the element already exists, the score is updated.
1942 *
1943 * A new sorted set is created at value if the key is an empty open key
1944 * setup for writing.
1945 *
1946 * Additional flags can be passed to the function via a pointer, the flags
1947 * are both used to receive input and to communicate state when the function
1948 * returns. 'flagsptr' can be NULL if no special flags are used.
1949 *
1950 * The input flags are:
1951 *
1952 * REDISMODULE_ZADD_XX: Element must already exist. Do nothing otherwise.
1953 * REDISMODULE_ZADD_NX: Element must not exist. Do nothing otherwise.
1954 *
1955 * The output flags are:
1956 *
1957 * REDISMODULE_ZADD_ADDED: The new element was added to the sorted set.
1958 * REDISMODULE_ZADD_UPDATED: The score of the element was updated.
1959 * REDISMODULE_ZADD_NOP: No operation was performed because XX or NX flags.
1960 *
1961 * On success the function returns REDISMODULE_OK. On the following errors
1962 * REDISMODULE_ERR is returned:
1963 *
1964 * * The key was not opened for writing.
1965 * * The key is of the wrong type.
1966 * * 'score' double value is not a number (NaN).
1967 */
RM_ZsetAdd(RedisModuleKey * key,double score,RedisModuleString * ele,int * flagsptr)1968 int RM_ZsetAdd(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr) {
1969 int flags = 0;
1970 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1971 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1972 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
1973 if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
1974 if (zsetAdd(key->value,score,ele->ptr,&flags,NULL) == 0) {
1975 if (flagsptr) *flagsptr = 0;
1976 return REDISMODULE_ERR;
1977 }
1978 if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
1979 return REDISMODULE_OK;
1980 }
1981
1982 /* This function works exactly like RM_ZsetAdd(), but instead of setting
1983 * a new score, the score of the existing element is incremented, or if the
1984 * element does not already exist, it is added assuming the old score was
1985 * zero.
1986 *
1987 * The input and output flags, and the return value, have the same exact
1988 * meaning, with the only difference that this function will return
1989 * REDISMODULE_ERR even when 'score' is a valid double number, but adding it
1990 * to the existing score results into a NaN (not a number) condition.
1991 *
1992 * This function has an additional field 'newscore', if not NULL is filled
1993 * with the new score of the element after the increment, if no error
1994 * is returned. */
RM_ZsetIncrby(RedisModuleKey * key,double score,RedisModuleString * ele,int * flagsptr,double * newscore)1995 int RM_ZsetIncrby(RedisModuleKey *key, double score, RedisModuleString *ele, int *flagsptr, double *newscore) {
1996 int flags = 0;
1997 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
1998 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
1999 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_ZSET);
2000 if (flagsptr) flags = RM_ZsetAddFlagsToCoreFlags(*flagsptr);
2001 flags |= ZADD_INCR;
2002 if (zsetAdd(key->value,score,ele->ptr,&flags,newscore) == 0) {
2003 if (flagsptr) *flagsptr = 0;
2004 return REDISMODULE_ERR;
2005 }
2006 /* zsetAdd() may signal back that the resulting score is not a number. */
2007 if (flagsptr && (*flagsptr & ZADD_NAN)) {
2008 *flagsptr = 0;
2009 return REDISMODULE_ERR;
2010 }
2011 if (flagsptr) *flagsptr = RM_ZsetAddFlagsFromCoreFlags(flags);
2012 return REDISMODULE_OK;
2013 }
2014
2015 /* Remove the specified element from the sorted set.
2016 * The function returns REDISMODULE_OK on success, and REDISMODULE_ERR
2017 * on one of the following conditions:
2018 *
2019 * * The key was not opened for writing.
2020 * * The key is of the wrong type.
2021 *
2022 * The return value does NOT indicate the fact the element was really
2023 * removed (since it existed) or not, just if the function was executed
2024 * with success.
2025 *
2026 * In order to know if the element was removed, the additional argument
2027 * 'deleted' must be passed, that populates the integer by reference
2028 * setting it to 1 or 0 depending on the outcome of the operation.
2029 * The 'deleted' argument can be NULL if the caller is not interested
2030 * to know if the element was really removed.
2031 *
2032 * Empty keys will be handled correctly by doing nothing. */
RM_ZsetRem(RedisModuleKey * key,RedisModuleString * ele,int * deleted)2033 int RM_ZsetRem(RedisModuleKey *key, RedisModuleString *ele, int *deleted) {
2034 if (!(key->mode & REDISMODULE_WRITE)) return REDISMODULE_ERR;
2035 if (key->value && key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
2036 if (key->value != NULL && zsetDel(key->value,ele->ptr)) {
2037 if (deleted) *deleted = 1;
2038 moduleDelKeyIfEmpty(key);
2039 } else {
2040 if (deleted) *deleted = 0;
2041 }
2042 return REDISMODULE_OK;
2043 }
2044
2045 /* On success retrieve the double score associated at the sorted set element
2046 * 'ele' and returns REDISMODULE_OK. Otherwise REDISMODULE_ERR is returned
2047 * to signal one of the following conditions:
2048 *
2049 * * There is no such element 'ele' in the sorted set.
2050 * * The key is not a sorted set.
2051 * * The key is an open empty key.
2052 */
RM_ZsetScore(RedisModuleKey * key,RedisModuleString * ele,double * score)2053 int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) {
2054 if (key->value == NULL) return REDISMODULE_ERR;
2055 if (key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
2056 if (zsetScore(key->value,ele->ptr,score) == C_ERR) return REDISMODULE_ERR;
2057 return REDISMODULE_OK;
2058 }
2059
2060 /* --------------------------------------------------------------------------
2061 * Key API for Sorted Set iterator
2062 * -------------------------------------------------------------------------- */
2063
zsetKeyReset(RedisModuleKey * key)2064 void zsetKeyReset(RedisModuleKey *key) {
2065 key->ztype = REDISMODULE_ZSET_RANGE_NONE;
2066 key->zcurrent = NULL;
2067 key->zer = 1;
2068 }
2069
2070 /* Stop a sorted set iteration. */
RM_ZsetRangeStop(RedisModuleKey * key)2071 void RM_ZsetRangeStop(RedisModuleKey *key) {
2072 /* Free resources if needed. */
2073 if (key->ztype == REDISMODULE_ZSET_RANGE_LEX)
2074 zslFreeLexRange(&key->zlrs);
2075 /* Setup sensible values so that misused iteration API calls when an
2076 * iterator is not active will result into something more sensible
2077 * than crashing. */
2078 zsetKeyReset(key);
2079 }
2080
2081 /* Return the "End of range" flag value to signal the end of the iteration. */
RM_ZsetRangeEndReached(RedisModuleKey * key)2082 int RM_ZsetRangeEndReached(RedisModuleKey *key) {
2083 if (!key->value || key->value->type != OBJ_ZSET) return 1;
2084 return key->zer;
2085 }
2086
2087 /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange().
2088 * Setup the sorted set iteration according to the specified score range
2089 * (see the functions calling it for more info). If 'first' is true the
2090 * first element in the range is used as a starting point for the iterator
2091 * otherwise the last. Return REDISMODULE_OK on success otherwise
2092 * REDISMODULE_ERR. */
zsetInitScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex,int first)2093 int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex, int first) {
2094 if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
2095
2096 RM_ZsetRangeStop(key);
2097 key->ztype = REDISMODULE_ZSET_RANGE_SCORE;
2098 key->zer = 0;
2099
2100 /* Setup the range structure used by the sorted set core implementation
2101 * in order to seek at the specified element. */
2102 zrangespec *zrs = &key->zrs;
2103 zrs->min = min;
2104 zrs->max = max;
2105 zrs->minex = minex;
2106 zrs->maxex = maxex;
2107
2108 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2109 key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) :
2110 zzlLastInRange(key->value->ptr,zrs);
2111 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2112 zset *zs = key->value->ptr;
2113 zskiplist *zsl = zs->zsl;
2114 key->zcurrent = first ? zslFirstInRange(zsl,zrs) :
2115 zslLastInRange(zsl,zrs);
2116 } else {
2117 serverPanic("Unsupported zset encoding");
2118 }
2119 if (key->zcurrent == NULL) key->zer = 1;
2120 return REDISMODULE_OK;
2121 }
2122
2123 /* Setup a sorted set iterator seeking the first element in the specified
2124 * range. Returns REDISMODULE_OK if the iterator was correctly initialized
2125 * otherwise REDISMODULE_ERR is returned in the following conditions:
2126 *
2127 * 1. The value stored at key is not a sorted set or the key is empty.
2128 *
2129 * The range is specified according to the two double values 'min' and 'max'.
2130 * Both can be infinite using the following two macros:
2131 *
2132 * REDISMODULE_POSITIVE_INFINITE for positive infinite value
2133 * REDISMODULE_NEGATIVE_INFINITE for negative infinite value
2134 *
2135 * 'minex' and 'maxex' parameters, if true, respectively setup a range
2136 * where the min and max value are exclusive (not included) instead of
2137 * inclusive. */
RM_ZsetFirstInScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex)2138 int RM_ZsetFirstInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
2139 return zsetInitScoreRange(key,min,max,minex,maxex,1);
2140 }
2141
2142 /* Exactly like RedisModule_ZsetFirstInScoreRange() but the last element of
2143 * the range is selected for the start of the iteration instead. */
RM_ZsetLastInScoreRange(RedisModuleKey * key,double min,double max,int minex,int maxex)2144 int RM_ZsetLastInScoreRange(RedisModuleKey *key, double min, double max, int minex, int maxex) {
2145 return zsetInitScoreRange(key,min,max,minex,maxex,0);
2146 }
2147
2148 /* Helper function for RM_ZsetFirstInLexRange() and RM_ZsetLastInLexRange().
2149 * Setup the sorted set iteration according to the specified lexicographical
2150 * range (see the functions calling it for more info). If 'first' is true the
2151 * first element in the range is used as a starting point for the iterator
2152 * otherwise the last. Return REDISMODULE_OK on success otherwise
2153 * REDISMODULE_ERR.
2154 *
2155 * Note that this function takes 'min' and 'max' in the same form of the
2156 * Redis ZRANGEBYLEX command. */
zsetInitLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max,int first)2157 int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max, int first) {
2158 if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
2159
2160 RM_ZsetRangeStop(key);
2161 key->zer = 0;
2162
2163 /* Setup the range structure used by the sorted set core implementation
2164 * in order to seek at the specified element. */
2165 zlexrangespec *zlrs = &key->zlrs;
2166 if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR;
2167
2168 /* Set the range type to lex only after successfully parsing the range,
2169 * otherwise we don't want the zlexrangespec to be freed. */
2170 key->ztype = REDISMODULE_ZSET_RANGE_LEX;
2171
2172 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2173 key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) :
2174 zzlLastInLexRange(key->value->ptr,zlrs);
2175 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2176 zset *zs = key->value->ptr;
2177 zskiplist *zsl = zs->zsl;
2178 key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) :
2179 zslLastInLexRange(zsl,zlrs);
2180 } else {
2181 serverPanic("Unsupported zset encoding");
2182 }
2183 if (key->zcurrent == NULL) key->zer = 1;
2184
2185 return REDISMODULE_OK;
2186 }
2187
2188 /* Setup a sorted set iterator seeking the first element in the specified
2189 * lexicographical range. Returns REDISMODULE_OK if the iterator was correctly
2190 * initialized otherwise REDISMODULE_ERR is returned in the
2191 * following conditions:
2192 *
2193 * 1. The value stored at key is not a sorted set or the key is empty.
2194 * 2. The lexicographical range 'min' and 'max' format is invalid.
2195 *
2196 * 'min' and 'max' should be provided as two RedisModuleString objects
2197 * in the same format as the parameters passed to the ZRANGEBYLEX command.
2198 * The function does not take ownership of the objects, so they can be released
2199 * ASAP after the iterator is setup. */
RM_ZsetFirstInLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max)2200 int RM_ZsetFirstInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
2201 return zsetInitLexRange(key,min,max,1);
2202 }
2203
2204 /* Exactly like RedisModule_ZsetFirstInLexRange() but the last element of
2205 * the range is selected for the start of the iteration instead. */
RM_ZsetLastInLexRange(RedisModuleKey * key,RedisModuleString * min,RedisModuleString * max)2206 int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleString *max) {
2207 return zsetInitLexRange(key,min,max,0);
2208 }
2209
2210 /* Return the current sorted set element of an active sorted set iterator
2211 * or NULL if the range specified in the iterator does not include any
2212 * element. */
RM_ZsetRangeCurrentElement(RedisModuleKey * key,double * score)2213 RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) {
2214 RedisModuleString *str;
2215
2216 if (key->zcurrent == NULL) return NULL;
2217 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2218 unsigned char *eptr, *sptr;
2219 eptr = key->zcurrent;
2220 sds ele = ziplistGetObject(eptr);
2221 if (score) {
2222 sptr = ziplistNext(key->value->ptr,eptr);
2223 *score = zzlGetScore(sptr);
2224 }
2225 str = createObject(OBJ_STRING,ele);
2226 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2227 zskiplistNode *ln = key->zcurrent;
2228 if (score) *score = ln->score;
2229 str = createStringObject(ln->ele,sdslen(ln->ele));
2230 } else {
2231 serverPanic("Unsupported zset encoding");
2232 }
2233 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,str);
2234 return str;
2235 }
2236
2237 /* Go to the next element of the sorted set iterator. Returns 1 if there was
2238 * a next element, 0 if we are already at the latest element or the range
2239 * does not include any item at all. */
RM_ZsetRangeNext(RedisModuleKey * key)2240 int RM_ZsetRangeNext(RedisModuleKey *key) {
2241 if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
2242
2243 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2244 unsigned char *zl = key->value->ptr;
2245 unsigned char *eptr = key->zcurrent;
2246 unsigned char *next;
2247 next = ziplistNext(zl,eptr); /* Skip element. */
2248 if (next) next = ziplistNext(zl,next); /* Skip score. */
2249 if (next == NULL) {
2250 key->zer = 1;
2251 return 0;
2252 } else {
2253 /* Are we still within the range? */
2254 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
2255 /* Fetch the next element score for the
2256 * range check. */
2257 unsigned char *saved_next = next;
2258 next = ziplistNext(zl,next); /* Skip next element. */
2259 double score = zzlGetScore(next); /* Obtain the next score. */
2260 if (!zslValueLteMax(score,&key->zrs)) {
2261 key->zer = 1;
2262 return 0;
2263 }
2264 next = saved_next;
2265 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2266 if (!zzlLexValueLteMax(next,&key->zlrs)) {
2267 key->zer = 1;
2268 return 0;
2269 }
2270 }
2271 key->zcurrent = next;
2272 return 1;
2273 }
2274 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2275 zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward;
2276 if (next == NULL) {
2277 key->zer = 1;
2278 return 0;
2279 } else {
2280 /* Are we still within the range? */
2281 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
2282 !zslValueLteMax(next->score,&key->zrs))
2283 {
2284 key->zer = 1;
2285 return 0;
2286 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2287 if (!zslLexValueLteMax(next->ele,&key->zlrs)) {
2288 key->zer = 1;
2289 return 0;
2290 }
2291 }
2292 key->zcurrent = next;
2293 return 1;
2294 }
2295 } else {
2296 serverPanic("Unsupported zset encoding");
2297 }
2298 }
2299
2300 /* Go to the previous element of the sorted set iterator. Returns 1 if there was
2301 * a previous element, 0 if we are already at the first element or the range
2302 * does not include any item at all. */
RM_ZsetRangePrev(RedisModuleKey * key)2303 int RM_ZsetRangePrev(RedisModuleKey *key) {
2304 if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
2305
2306 if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
2307 unsigned char *zl = key->value->ptr;
2308 unsigned char *eptr = key->zcurrent;
2309 unsigned char *prev;
2310 prev = ziplistPrev(zl,eptr); /* Go back to previous score. */
2311 if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */
2312 if (prev == NULL) {
2313 key->zer = 1;
2314 return 0;
2315 } else {
2316 /* Are we still within the range? */
2317 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
2318 /* Fetch the previous element score for the
2319 * range check. */
2320 unsigned char *saved_prev = prev;
2321 prev = ziplistNext(zl,prev); /* Skip element to get the score.*/
2322 double score = zzlGetScore(prev); /* Obtain the prev score. */
2323 if (!zslValueGteMin(score,&key->zrs)) {
2324 key->zer = 1;
2325 return 0;
2326 }
2327 prev = saved_prev;
2328 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2329 if (!zzlLexValueGteMin(prev,&key->zlrs)) {
2330 key->zer = 1;
2331 return 0;
2332 }
2333 }
2334 key->zcurrent = prev;
2335 return 1;
2336 }
2337 } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
2338 zskiplistNode *ln = key->zcurrent, *prev = ln->backward;
2339 if (prev == NULL) {
2340 key->zer = 1;
2341 return 0;
2342 } else {
2343 /* Are we still within the range? */
2344 if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
2345 !zslValueGteMin(prev->score,&key->zrs))
2346 {
2347 key->zer = 1;
2348 return 0;
2349 } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
2350 if (!zslLexValueGteMin(prev->ele,&key->zlrs)) {
2351 key->zer = 1;
2352 return 0;
2353 }
2354 }
2355 key->zcurrent = prev;
2356 return 1;
2357 }
2358 } else {
2359 serverPanic("Unsupported zset encoding");
2360 }
2361 }
2362
2363 /* --------------------------------------------------------------------------
2364 * Key API for Hash type
2365 * -------------------------------------------------------------------------- */
2366
2367 /* Set the field of the specified hash field to the specified value.
2368 * If the key is an empty key open for writing, it is created with an empty
2369 * hash value, in order to set the specified field.
2370 *
2371 * The function is variadic and the user must specify pairs of field
2372 * names and values, both as RedisModuleString pointers (unless the
2373 * CFIELD option is set, see later). At the end of the field/value-ptr pairs,
2374 * NULL must be specified as last argument to signal the end of the arguments
2375 * in the variadic function.
2376 *
2377 * Example to set the hash argv[1] to the value argv[2]:
2378 *
2379 * RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],argv[2],NULL);
2380 *
2381 * The function can also be used in order to delete fields (if they exist)
2382 * by setting them to the specified value of REDISMODULE_HASH_DELETE:
2383 *
2384 * RedisModule_HashSet(key,REDISMODULE_HASH_NONE,argv[1],
2385 * REDISMODULE_HASH_DELETE,NULL);
2386 *
2387 * The behavior of the command changes with the specified flags, that can be
2388 * set to REDISMODULE_HASH_NONE if no special behavior is needed.
2389 *
2390 * REDISMODULE_HASH_NX: The operation is performed only if the field was not
2391 * already existing in the hash.
2392 * REDISMODULE_HASH_XX: The operation is performed only if the field was
2393 * already existing, so that a new value could be
2394 * associated to an existing filed, but no new fields
2395 * are created.
2396 * REDISMODULE_HASH_CFIELDS: The field names passed are null terminated C
2397 * strings instead of RedisModuleString objects.
2398 *
2399 * Unless NX is specified, the command overwrites the old field value with
2400 * the new one.
2401 *
2402 * When using REDISMODULE_HASH_CFIELDS, field names are reported using
2403 * normal C strings, so for example to delete the field "foo" the following
2404 * code can be used:
2405 *
2406 * RedisModule_HashSet(key,REDISMODULE_HASH_CFIELDS,"foo",
2407 * REDISMODULE_HASH_DELETE,NULL);
2408 *
2409 * Return value:
2410 *
2411 * The number of fields updated (that may be less than the number of fields
2412 * specified because of the XX or NX options).
2413 *
2414 * In the following case the return value is always zero:
2415 *
2416 * * The key was not open for writing.
2417 * * The key was associated with a non Hash value.
2418 */
RM_HashSet(RedisModuleKey * key,int flags,...)2419 int RM_HashSet(RedisModuleKey *key, int flags, ...) {
2420 va_list ap;
2421 if (!(key->mode & REDISMODULE_WRITE)) return 0;
2422 if (key->value && key->value->type != OBJ_HASH) return 0;
2423 if (key->value == NULL) moduleCreateEmptyKey(key,REDISMODULE_KEYTYPE_HASH);
2424
2425 int updated = 0;
2426 va_start(ap, flags);
2427 while(1) {
2428 RedisModuleString *field, *value;
2429 /* Get the field and value objects. */
2430 if (flags & REDISMODULE_HASH_CFIELDS) {
2431 char *cfield = va_arg(ap,char*);
2432 if (cfield == NULL) break;
2433 field = createRawStringObject(cfield,strlen(cfield));
2434 } else {
2435 field = va_arg(ap,RedisModuleString*);
2436 if (field == NULL) break;
2437 }
2438 value = va_arg(ap,RedisModuleString*);
2439
2440 /* Handle XX and NX */
2441 if (flags & (REDISMODULE_HASH_XX|REDISMODULE_HASH_NX)) {
2442 int exists = hashTypeExists(key->value, field->ptr);
2443 if (((flags & REDISMODULE_HASH_XX) && !exists) ||
2444 ((flags & REDISMODULE_HASH_NX) && exists))
2445 {
2446 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2447 continue;
2448 }
2449 }
2450
2451 /* Handle deletion if value is REDISMODULE_HASH_DELETE. */
2452 if (value == REDISMODULE_HASH_DELETE) {
2453 updated += hashTypeDelete(key->value, field->ptr);
2454 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2455 continue;
2456 }
2457
2458 int low_flags = HASH_SET_COPY;
2459 /* If CFIELDS is active, we can pass the ownership of the
2460 * SDS object to the low level function that sets the field
2461 * to avoid a useless copy. */
2462 if (flags & REDISMODULE_HASH_CFIELDS)
2463 low_flags |= HASH_SET_TAKE_FIELD;
2464
2465 robj *argv[2] = {field,value};
2466 hashTypeTryConversion(key->value,argv,0,1);
2467 updated += hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
2468
2469 /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
2470 * however we still have to release the 'field' object shell. */
2471 if (flags & REDISMODULE_HASH_CFIELDS) {
2472 field->ptr = NULL; /* Prevent the SDS string from being freed. */
2473 decrRefCount(field);
2474 }
2475 }
2476 va_end(ap);
2477 moduleDelKeyIfEmpty(key);
2478 return updated;
2479 }
2480
2481 /* Get fields from an hash value. This function is called using a variable
2482 * number of arguments, alternating a field name (as a StringRedisModule
2483 * pointer) with a pointer to a StringRedisModule pointer, that is set to the
2484 * value of the field if the field exist, or NULL if the field did not exist.
2485 * At the end of the field/value-ptr pairs, NULL must be specified as last
2486 * argument to signal the end of the arguments in the variadic function.
2487 *
2488 * This is an example usage:
2489 *
2490 * RedisModuleString *first, *second;
2491 * RedisModule_HashGet(mykey,REDISMODULE_HASH_NONE,argv[1],&first,
2492 * argv[2],&second,NULL);
2493 *
2494 * As with RedisModule_HashSet() the behavior of the command can be specified
2495 * passing flags different than REDISMODULE_HASH_NONE:
2496 *
2497 * REDISMODULE_HASH_CFIELD: field names as null terminated C strings.
2498 *
2499 * REDISMODULE_HASH_EXISTS: instead of setting the value of the field
2500 * expecting a RedisModuleString pointer to pointer, the function just
2501 * reports if the field esists or not and expects an integer pointer
2502 * as the second element of each pair.
2503 *
2504 * Example of REDISMODULE_HASH_CFIELD:
2505 *
2506 * RedisModuleString *username, *hashedpass;
2507 * RedisModule_HashGet(mykey,"username",&username,"hp",&hashedpass, NULL);
2508 *
2509 * Example of REDISMODULE_HASH_EXISTS:
2510 *
2511 * int exists;
2512 * RedisModule_HashGet(mykey,argv[1],&exists,NULL);
2513 *
2514 * The function returns REDISMODULE_OK on success and REDISMODULE_ERR if
2515 * the key is not an hash value.
2516 *
2517 * Memory management:
2518 *
2519 * The returned RedisModuleString objects should be released with
2520 * RedisModule_FreeString(), or by enabling automatic memory management.
2521 */
RM_HashGet(RedisModuleKey * key,int flags,...)2522 int RM_HashGet(RedisModuleKey *key, int flags, ...) {
2523 va_list ap;
2524 if (key->value && key->value->type != OBJ_HASH) return REDISMODULE_ERR;
2525
2526 va_start(ap, flags);
2527 while(1) {
2528 RedisModuleString *field, **valueptr;
2529 int *existsptr;
2530 /* Get the field object and the value pointer to pointer. */
2531 if (flags & REDISMODULE_HASH_CFIELDS) {
2532 char *cfield = va_arg(ap,char*);
2533 if (cfield == NULL) break;
2534 field = createRawStringObject(cfield,strlen(cfield));
2535 } else {
2536 field = va_arg(ap,RedisModuleString*);
2537 if (field == NULL) break;
2538 }
2539
2540 /* Query the hash for existence or value object. */
2541 if (flags & REDISMODULE_HASH_EXISTS) {
2542 existsptr = va_arg(ap,int*);
2543 if (key->value)
2544 *existsptr = hashTypeExists(key->value,field->ptr);
2545 else
2546 *existsptr = 0;
2547 } else {
2548 valueptr = va_arg(ap,RedisModuleString**);
2549 if (key->value) {
2550 *valueptr = hashTypeGetValueObject(key->value,field->ptr);
2551 if (*valueptr) {
2552 robj *decoded = getDecodedObject(*valueptr);
2553 decrRefCount(*valueptr);
2554 *valueptr = decoded;
2555 }
2556 if (*valueptr)
2557 autoMemoryAdd(key->ctx,REDISMODULE_AM_STRING,*valueptr);
2558 } else {
2559 *valueptr = NULL;
2560 }
2561 }
2562
2563 /* Cleanup */
2564 if (flags & REDISMODULE_HASH_CFIELDS) decrRefCount(field);
2565 }
2566 va_end(ap);
2567 return REDISMODULE_OK;
2568 }
2569
2570 /* --------------------------------------------------------------------------
2571 * Redis <-> Modules generic Call() API
2572 * -------------------------------------------------------------------------- */
2573
2574 /* Create a new RedisModuleCallReply object. The processing of the reply
2575 * is lazy, the object is just populated with the raw protocol and later
2576 * is processed as needed. Initially we just make sure to set the right
2577 * reply type, which is extremely cheap to do. */
moduleCreateCallReplyFromProto(RedisModuleCtx * ctx,sds proto)2578 RedisModuleCallReply *moduleCreateCallReplyFromProto(RedisModuleCtx *ctx, sds proto) {
2579 RedisModuleCallReply *reply = zmalloc(sizeof(*reply));
2580 reply->ctx = ctx;
2581 reply->proto = proto;
2582 reply->protolen = sdslen(proto);
2583 reply->flags = REDISMODULE_REPLYFLAG_TOPARSE; /* Lazy parsing. */
2584 switch(proto[0]) {
2585 case '$':
2586 case '+': reply->type = REDISMODULE_REPLY_STRING; break;
2587 case '-': reply->type = REDISMODULE_REPLY_ERROR; break;
2588 case ':': reply->type = REDISMODULE_REPLY_INTEGER; break;
2589 case '*': reply->type = REDISMODULE_REPLY_ARRAY; break;
2590 default: reply->type = REDISMODULE_REPLY_UNKNOWN; break;
2591 }
2592 if ((proto[0] == '*' || proto[0] == '$') && proto[1] == '-')
2593 reply->type = REDISMODULE_REPLY_NULL;
2594 return reply;
2595 }
2596
2597 void moduleParseCallReply_Int(RedisModuleCallReply *reply);
2598 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply);
2599 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply);
2600 void moduleParseCallReply_Array(RedisModuleCallReply *reply);
2601
2602 /* Do nothing if REDISMODULE_REPLYFLAG_TOPARSE is false, otherwise
2603 * use the protcol of the reply in reply->proto in order to fill the
2604 * reply with parsed data according to the reply type. */
moduleParseCallReply(RedisModuleCallReply * reply)2605 void moduleParseCallReply(RedisModuleCallReply *reply) {
2606 if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) return;
2607 reply->flags &= ~REDISMODULE_REPLYFLAG_TOPARSE;
2608
2609 switch(reply->proto[0]) {
2610 case ':': moduleParseCallReply_Int(reply); break;
2611 case '$': moduleParseCallReply_BulkString(reply); break;
2612 case '-': /* handled by next item. */
2613 case '+': moduleParseCallReply_SimpleString(reply); break;
2614 case '*': moduleParseCallReply_Array(reply); break;
2615 }
2616 }
2617
moduleParseCallReply_Int(RedisModuleCallReply * reply)2618 void moduleParseCallReply_Int(RedisModuleCallReply *reply) {
2619 char *proto = reply->proto;
2620 char *p = strchr(proto+1,'\r');
2621
2622 string2ll(proto+1,p-proto-1,&reply->val.ll);
2623 reply->protolen = p-proto+2;
2624 reply->type = REDISMODULE_REPLY_INTEGER;
2625 }
2626
moduleParseCallReply_BulkString(RedisModuleCallReply * reply)2627 void moduleParseCallReply_BulkString(RedisModuleCallReply *reply) {
2628 char *proto = reply->proto;
2629 char *p = strchr(proto+1,'\r');
2630 long long bulklen;
2631
2632 string2ll(proto+1,p-proto-1,&bulklen);
2633 if (bulklen == -1) {
2634 reply->protolen = p-proto+2;
2635 reply->type = REDISMODULE_REPLY_NULL;
2636 } else {
2637 reply->val.str = p+2;
2638 reply->len = bulklen;
2639 reply->protolen = p-proto+2+bulklen+2;
2640 reply->type = REDISMODULE_REPLY_STRING;
2641 }
2642 }
2643
moduleParseCallReply_SimpleString(RedisModuleCallReply * reply)2644 void moduleParseCallReply_SimpleString(RedisModuleCallReply *reply) {
2645 char *proto = reply->proto;
2646 char *p = strchr(proto+1,'\r');
2647
2648 reply->val.str = proto+1;
2649 reply->len = p-proto-1;
2650 reply->protolen = p-proto+2;
2651 reply->type = proto[0] == '+' ? REDISMODULE_REPLY_STRING :
2652 REDISMODULE_REPLY_ERROR;
2653 }
2654
moduleParseCallReply_Array(RedisModuleCallReply * reply)2655 void moduleParseCallReply_Array(RedisModuleCallReply *reply) {
2656 char *proto = reply->proto;
2657 char *p = strchr(proto+1,'\r');
2658 long long arraylen, j;
2659
2660 string2ll(proto+1,p-proto-1,&arraylen);
2661 p += 2;
2662
2663 if (arraylen == -1) {
2664 reply->protolen = p-proto;
2665 reply->type = REDISMODULE_REPLY_NULL;
2666 return;
2667 }
2668
2669 reply->val.array = zmalloc(sizeof(RedisModuleCallReply)*arraylen);
2670 reply->len = arraylen;
2671 for (j = 0; j < arraylen; j++) {
2672 RedisModuleCallReply *ele = reply->val.array+j;
2673 ele->flags = REDISMODULE_REPLYFLAG_NESTED |
2674 REDISMODULE_REPLYFLAG_TOPARSE;
2675 ele->proto = p;
2676 ele->ctx = reply->ctx;
2677 moduleParseCallReply(ele);
2678 p += ele->protolen;
2679 }
2680 reply->protolen = p-proto;
2681 reply->type = REDISMODULE_REPLY_ARRAY;
2682 }
2683
2684 /* Free a Call reply and all the nested replies it contains if it's an
2685 * array. */
RM_FreeCallReply_Rec(RedisModuleCallReply * reply,int freenested)2686 void RM_FreeCallReply_Rec(RedisModuleCallReply *reply, int freenested){
2687 /* Don't free nested replies by default: the user must always free the
2688 * toplevel reply. However be gentle and don't crash if the module
2689 * misuses the API. */
2690 if (!freenested && reply->flags & REDISMODULE_REPLYFLAG_NESTED) return;
2691
2692 if (!(reply->flags & REDISMODULE_REPLYFLAG_TOPARSE)) {
2693 if (reply->type == REDISMODULE_REPLY_ARRAY) {
2694 size_t j;
2695 for (j = 0; j < reply->len; j++)
2696 RM_FreeCallReply_Rec(reply->val.array+j,1);
2697 zfree(reply->val.array);
2698 }
2699 }
2700
2701 /* For nested replies, we don't free reply->proto (which if not NULL
2702 * references the parent reply->proto buffer), nor the structure
2703 * itself which is allocated as an array of structures, and is freed
2704 * when the array value is released. */
2705 if (!(reply->flags & REDISMODULE_REPLYFLAG_NESTED)) {
2706 if (reply->proto) sdsfree(reply->proto);
2707 zfree(reply);
2708 }
2709 }
2710
2711 /* Wrapper for the recursive free reply function. This is needed in order
2712 * to have the first level function to return on nested replies, but only
2713 * if called by the module API. */
RM_FreeCallReply(RedisModuleCallReply * reply)2714 void RM_FreeCallReply(RedisModuleCallReply *reply) {
2715
2716 RedisModuleCtx *ctx = reply->ctx;
2717 RM_FreeCallReply_Rec(reply,0);
2718 autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
2719 }
2720
2721 /* Return the reply type. */
RM_CallReplyType(RedisModuleCallReply * reply)2722 int RM_CallReplyType(RedisModuleCallReply *reply) {
2723 if (!reply) return REDISMODULE_REPLY_UNKNOWN;
2724 return reply->type;
2725 }
2726
2727 /* Return the reply type length, where applicable. */
RM_CallReplyLength(RedisModuleCallReply * reply)2728 size_t RM_CallReplyLength(RedisModuleCallReply *reply) {
2729 moduleParseCallReply(reply);
2730 switch(reply->type) {
2731 case REDISMODULE_REPLY_STRING:
2732 case REDISMODULE_REPLY_ERROR:
2733 case REDISMODULE_REPLY_ARRAY:
2734 return reply->len;
2735 default:
2736 return 0;
2737 }
2738 }
2739
2740 /* Return the 'idx'-th nested call reply element of an array reply, or NULL
2741 * if the reply type is wrong or the index is out of range. */
RM_CallReplyArrayElement(RedisModuleCallReply * reply,size_t idx)2742 RedisModuleCallReply *RM_CallReplyArrayElement(RedisModuleCallReply *reply, size_t idx) {
2743 moduleParseCallReply(reply);
2744 if (reply->type != REDISMODULE_REPLY_ARRAY) return NULL;
2745 if (idx >= reply->len) return NULL;
2746 return reply->val.array+idx;
2747 }
2748
2749 /* Return the long long of an integer reply. */
RM_CallReplyInteger(RedisModuleCallReply * reply)2750 long long RM_CallReplyInteger(RedisModuleCallReply *reply) {
2751 moduleParseCallReply(reply);
2752 if (reply->type != REDISMODULE_REPLY_INTEGER) return LLONG_MIN;
2753 return reply->val.ll;
2754 }
2755
2756 /* Return the pointer and length of a string or error reply. */
RM_CallReplyStringPtr(RedisModuleCallReply * reply,size_t * len)2757 const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
2758 moduleParseCallReply(reply);
2759 if (reply->type != REDISMODULE_REPLY_STRING &&
2760 reply->type != REDISMODULE_REPLY_ERROR) return NULL;
2761 if (len) *len = reply->len;
2762 return reply->val.str;
2763 }
2764
2765 /* Return a new string object from a call reply of type string, error or
2766 * integer. Otherwise (wrong reply type) return NULL. */
RM_CreateStringFromCallReply(RedisModuleCallReply * reply)2767 RedisModuleString *RM_CreateStringFromCallReply(RedisModuleCallReply *reply) {
2768 moduleParseCallReply(reply);
2769 switch(reply->type) {
2770 case REDISMODULE_REPLY_STRING:
2771 case REDISMODULE_REPLY_ERROR:
2772 return RM_CreateString(reply->ctx,reply->val.str,reply->len);
2773 case REDISMODULE_REPLY_INTEGER: {
2774 char buf[64];
2775 int len = ll2string(buf,sizeof(buf),reply->val.ll);
2776 return RM_CreateString(reply->ctx,buf,len);
2777 }
2778 default: return NULL;
2779 }
2780 }
2781
2782 /* Returns an array of robj pointers, and populates *argc with the number
2783 * of items, by parsing the format specifier "fmt" as described for
2784 * the RM_Call(), RM_Replicate() and other module APIs.
2785 *
2786 * The integer pointed by 'flags' is populated with flags according
2787 * to special modifiers in "fmt". For now only one exists:
2788 *
2789 * "!" -> REDISMODULE_ARGV_REPLICATE
2790 * "A" -> REDISMODULE_ARGV_NO_AOF
2791 * "R" -> REDISMODULE_ARGV_NO_REPLICAS
2792 *
2793 * On error (format specifier error) NULL is returned and nothing is
2794 * allocated. On success the argument vector is returned. */
moduleCreateArgvFromUserFormat(const char * cmdname,const char * fmt,int * argcp,int * flags,va_list ap)2795 robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap) {
2796 int argc = 0, argv_size, j;
2797 robj **argv = NULL;
2798
2799 /* As a first guess to avoid useless reallocations, size argv to
2800 * hold one argument for each char specifier in 'fmt'. */
2801 argv_size = strlen(fmt)+1; /* +1 because of the command name. */
2802 argv = zrealloc(argv,sizeof(robj*)*argv_size);
2803
2804 /* Build the arguments vector based on the format specifier. */
2805 argv[0] = createStringObject(cmdname,strlen(cmdname));
2806 argc++;
2807
2808 /* Create the client and dispatch the command. */
2809 const char *p = fmt;
2810 while(*p) {
2811 if (*p == 'c') {
2812 char *cstr = va_arg(ap,char*);
2813 argv[argc++] = createStringObject(cstr,strlen(cstr));
2814 } else if (*p == 's') {
2815 robj *obj = va_arg(ap,void*);
2816 argv[argc++] = obj;
2817 incrRefCount(obj);
2818 } else if (*p == 'b') {
2819 char *buf = va_arg(ap,char*);
2820 size_t len = va_arg(ap,size_t);
2821 argv[argc++] = createStringObject(buf,len);
2822 } else if (*p == 'l') {
2823 long long ll = va_arg(ap,long long);
2824 argv[argc++] = createObject(OBJ_STRING,sdsfromlonglong(ll));
2825 } else if (*p == 'v') {
2826 /* A vector of strings */
2827 robj **v = va_arg(ap, void*);
2828 size_t vlen = va_arg(ap, size_t);
2829
2830 /* We need to grow argv to hold the vector's elements.
2831 * We resize by vector_len-1 elements, because we held
2832 * one element in argv for the vector already */
2833 argv_size += vlen-1;
2834 argv = zrealloc(argv,sizeof(robj*)*argv_size);
2835
2836 size_t i = 0;
2837 for (i = 0; i < vlen; i++) {
2838 incrRefCount(v[i]);
2839 argv[argc++] = v[i];
2840 }
2841 } else if (*p == '!') {
2842 if (flags) (*flags) |= REDISMODULE_ARGV_REPLICATE;
2843 } else if (*p == 'A') {
2844 if (flags) (*flags) |= REDISMODULE_ARGV_NO_AOF;
2845 } else if (*p == 'R') {
2846 if (flags) (*flags) |= REDISMODULE_ARGV_NO_REPLICAS;
2847 } else {
2848 goto fmterr;
2849 }
2850 p++;
2851 }
2852 *argcp = argc;
2853 return argv;
2854
2855 fmterr:
2856 for (j = 0; j < argc; j++)
2857 decrRefCount(argv[j]);
2858 zfree(argv);
2859 return NULL;
2860 }
2861
2862 /* Exported API to call any Redis command from modules.
2863 * On success a RedisModuleCallReply object is returned, otherwise
2864 * NULL is returned and errno is set to the following values:
2865 *
2866 * EINVAL: command non existing, wrong arity, wrong format specifier.
2867 * EPERM: operation in Cluster instance with key in non local slot.
2868 *
2869 * This API is documented here: https://redis.io/topics/modules-intro
2870 */
RM_Call(RedisModuleCtx * ctx,const char * cmdname,const char * fmt,...)2871 RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) {
2872 struct redisCommand *cmd;
2873 client *c = NULL;
2874 robj **argv = NULL;
2875 int argc = 0, flags = 0;
2876 va_list ap;
2877 RedisModuleCallReply *reply = NULL;
2878 int replicate = 0; /* Replicate this command? */
2879
2880 /* Create the client and dispatch the command. */
2881 va_start(ap, fmt);
2882 c = createClient(-1);
2883 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
2884 replicate = flags & REDISMODULE_ARGV_REPLICATE;
2885 va_end(ap);
2886
2887 /* Setup our fake client for command execution. */
2888 c->flags |= CLIENT_MODULE;
2889 c->db = ctx->client->db;
2890 c->argv = argv;
2891 c->argc = argc;
2892 if (ctx->module) ctx->module->in_call++;
2893
2894 /* We handle the above format error only when the client is setup so that
2895 * we can free it normally. */
2896 if (argv == NULL) goto cleanup;
2897
2898 /* Call command filters */
2899 moduleCallCommandFilters(c);
2900
2901 /* Lookup command now, after filters had a chance to make modifications
2902 * if necessary.
2903 */
2904 cmd = lookupCommand(c->argv[0]->ptr);
2905 if (!cmd) {
2906 errno = EINVAL;
2907 goto cleanup;
2908 }
2909 c->cmd = c->lastcmd = cmd;
2910
2911 /* Basic arity checks. */
2912 if ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity)) {
2913 errno = EINVAL;
2914 goto cleanup;
2915 }
2916
2917 /* If this is a Redis Cluster node, we need to make sure the module is not
2918 * trying to access non-local keys, with the exception of commands
2919 * received from our master. */
2920 if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
2921 /* Duplicate relevant flags in the module client. */
2922 c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
2923 c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
2924 if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,NULL) !=
2925 server.cluster->myself)
2926 {
2927 errno = EPERM;
2928 goto cleanup;
2929 }
2930 }
2931
2932 /* If we are using single commands replication, we need to wrap what
2933 * we propagate into a MULTI/EXEC block, so that it will be atomic like
2934 * a Lua script in the context of AOF and slaves. */
2935 if (replicate) moduleReplicateMultiIfNeeded(ctx);
2936
2937 /* Run the command */
2938 int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
2939 if (replicate) {
2940 if (!(flags & REDISMODULE_ARGV_NO_AOF))
2941 call_flags |= CMD_CALL_PROPAGATE_AOF;
2942 if (!(flags & REDISMODULE_ARGV_NO_REPLICAS))
2943 call_flags |= CMD_CALL_PROPAGATE_REPL;
2944 }
2945 call(c,call_flags);
2946
2947 /* Convert the result of the Redis command into a suitable Lua type.
2948 * The first thing we need is to create a single string from the client
2949 * output buffers. */
2950 sds proto = sdsnewlen(c->buf,c->bufpos);
2951 c->bufpos = 0;
2952 while(listLength(c->reply)) {
2953 clientReplyBlock *o = listNodeValue(listFirst(c->reply));
2954
2955 proto = sdscatlen(proto,o->buf,o->used);
2956 listDelNode(c->reply,listFirst(c->reply));
2957 }
2958 reply = moduleCreateCallReplyFromProto(ctx,proto);
2959 autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
2960
2961 cleanup:
2962 if (ctx->module) ctx->module->in_call--;
2963 freeClient(c);
2964 return reply;
2965 }
2966
2967 /* Return a pointer, and a length, to the protocol returned by the command
2968 * that returned the reply object. */
RM_CallReplyProto(RedisModuleCallReply * reply,size_t * len)2969 const char *RM_CallReplyProto(RedisModuleCallReply *reply, size_t *len) {
2970 if (reply->proto) *len = sdslen(reply->proto);
2971 return reply->proto;
2972 }
2973
2974 /* --------------------------------------------------------------------------
2975 * Modules data types
2976 *
2977 * When String DMA or using existing data structures is not enough, it is
2978 * possible to create new data types from scratch and export them to
2979 * Redis. The module must provide a set of callbacks for handling the
2980 * new values exported (for example in order to provide RDB saving/loading,
2981 * AOF rewrite, and so forth). In this section we define this API.
2982 * -------------------------------------------------------------------------- */
2983
2984 /* Turn a 9 chars name in the specified charset and a 10 bit encver into
2985 * a single 64 bit unsigned integer that represents this exact module name
2986 * and version. This final number is called a "type ID" and is used when
2987 * writing module exported values to RDB files, in order to re-associate the
2988 * value to the right module to load them during RDB loading.
2989 *
2990 * If the string is not of the right length or the charset is wrong, or
2991 * if encver is outside the unsigned 10 bit integer range, 0 is returned,
2992 * otherwise the function returns the right type ID.
2993 *
2994 * The resulting 64 bit integer is composed as follows:
2995 *
2996 * (high order bits) 6|6|6|6|6|6|6|6|6|10 (low order bits)
2997 *
2998 * The first 6 bits value is the first character, name[0], while the last
2999 * 6 bits value, immediately before the 10 bits integer, is name[8].
3000 * The last 10 bits are the encoding version.
3001 *
3002 * Note that a name and encver combo of "AAAAAAAAA" and 0, will produce
3003 * zero as return value, that is the same we use to signal errors, thus
3004 * this combination is invalid, and also useless since type names should
3005 * try to be vary to avoid collisions. */
3006
3007 const char *ModuleTypeNameCharSet =
3008 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
3009 "abcdefghijklmnopqrstuvwxyz"
3010 "0123456789-_";
3011
moduleTypeEncodeId(const char * name,int encver)3012 uint64_t moduleTypeEncodeId(const char *name, int encver) {
3013 /* We use 64 symbols so that we can map each character into 6 bits
3014 * of the final output. */
3015 const char *cset = ModuleTypeNameCharSet;
3016 if (strlen(name) != 9) return 0;
3017 if (encver < 0 || encver > 1023) return 0;
3018
3019 uint64_t id = 0;
3020 for (int j = 0; j < 9; j++) {
3021 char *p = strchr(cset,name[j]);
3022 if (!p) return 0;
3023 unsigned long pos = p-cset;
3024 id = (id << 6) | pos;
3025 }
3026 id = (id << 10) | encver;
3027 return id;
3028 }
3029
3030 /* Search, in the list of exported data types of all the modules registered,
3031 * a type with the same name as the one given. Returns the moduleType
3032 * structure pointer if such a module is found, or NULL otherwise. */
moduleTypeLookupModuleByName(const char * name)3033 moduleType *moduleTypeLookupModuleByName(const char *name) {
3034 dictIterator *di = dictGetIterator(modules);
3035 dictEntry *de;
3036
3037 while ((de = dictNext(di)) != NULL) {
3038 struct RedisModule *module = dictGetVal(de);
3039 listIter li;
3040 listNode *ln;
3041
3042 listRewind(module->types,&li);
3043 while((ln = listNext(&li))) {
3044 moduleType *mt = ln->value;
3045 if (memcmp(name,mt->name,sizeof(mt->name)) == 0) {
3046 dictReleaseIterator(di);
3047 return mt;
3048 }
3049 }
3050 }
3051 dictReleaseIterator(di);
3052 return NULL;
3053 }
3054
3055 /* Lookup a module by ID, with caching. This function is used during RDB
3056 * loading. Modules exporting data types should never be able to unload, so
3057 * our cache does not need to expire. */
3058 #define MODULE_LOOKUP_CACHE_SIZE 3
3059
moduleTypeLookupModuleByID(uint64_t id)3060 moduleType *moduleTypeLookupModuleByID(uint64_t id) {
3061 static struct {
3062 uint64_t id;
3063 moduleType *mt;
3064 } cache[MODULE_LOOKUP_CACHE_SIZE];
3065
3066 /* Search in cache to start. */
3067 int j;
3068 for (j = 0; j < MODULE_LOOKUP_CACHE_SIZE && cache[j].mt != NULL; j++)
3069 if (cache[j].id == id) return cache[j].mt;
3070
3071 /* Slow module by module lookup. */
3072 moduleType *mt = NULL;
3073 dictIterator *di = dictGetIterator(modules);
3074 dictEntry *de;
3075
3076 while ((de = dictNext(di)) != NULL && mt == NULL) {
3077 struct RedisModule *module = dictGetVal(de);
3078 listIter li;
3079 listNode *ln;
3080
3081 listRewind(module->types,&li);
3082 while((ln = listNext(&li))) {
3083 moduleType *this_mt = ln->value;
3084 /* Compare only the 54 bit module identifier and not the
3085 * encoding version. */
3086 if (this_mt->id >> 10 == id >> 10) {
3087 mt = this_mt;
3088 break;
3089 }
3090 }
3091 }
3092 dictReleaseIterator(di);
3093
3094 /* Add to cache if possible. */
3095 if (mt && j < MODULE_LOOKUP_CACHE_SIZE) {
3096 cache[j].id = id;
3097 cache[j].mt = mt;
3098 }
3099 return mt;
3100 }
3101
3102 /* Turn an (unresolved) module ID into a type name, to show the user an
3103 * error when RDB files contain module data we can't load.
3104 * The buffer pointed by 'name' must be 10 bytes at least. The function will
3105 * fill it with a null terminated module name. */
moduleTypeNameByID(char * name,uint64_t moduleid)3106 void moduleTypeNameByID(char *name, uint64_t moduleid) {
3107 const char *cset = ModuleTypeNameCharSet;
3108
3109 name[9] = '\0';
3110 char *p = name+8;
3111 moduleid >>= 10;
3112 for (int j = 0; j < 9; j++) {
3113 *p-- = cset[moduleid & 63];
3114 moduleid >>= 6;
3115 }
3116 }
3117
3118 /* Register a new data type exported by the module. The parameters are the
3119 * following. Please for in depth documentation check the modules API
3120 * documentation, especially the TYPES.md file.
3121 *
3122 * * **name**: A 9 characters data type name that MUST be unique in the Redis
3123 * Modules ecosystem. Be creative... and there will be no collisions. Use
3124 * the charset A-Z a-z 9-0, plus the two "-_" characters. A good
3125 * idea is to use, for example `<typename>-<vendor>`. For example
3126 * "tree-AntZ" may mean "Tree data structure by @antirez". To use both
3127 * lower case and upper case letters helps in order to prevent collisions.
3128 * * **encver**: Encoding version, which is, the version of the serialization
3129 * that a module used in order to persist data. As long as the "name"
3130 * matches, the RDB loading will be dispatched to the type callbacks
3131 * whatever 'encver' is used, however the module can understand if
3132 * the encoding it must load are of an older version of the module.
3133 * For example the module "tree-AntZ" initially used encver=0. Later
3134 * after an upgrade, it started to serialize data in a different format
3135 * and to register the type with encver=1. However this module may
3136 * still load old data produced by an older version if the rdb_load
3137 * callback is able to check the encver value and act accordingly.
3138 * The encver must be a positive value between 0 and 1023.
3139 * * **typemethods_ptr** is a pointer to a RedisModuleTypeMethods structure
3140 * that should be populated with the methods callbacks and structure
3141 * version, like in the following example:
3142 *
3143 * RedisModuleTypeMethods tm = {
3144 * .version = REDISMODULE_TYPE_METHOD_VERSION,
3145 * .rdb_load = myType_RDBLoadCallBack,
3146 * .rdb_save = myType_RDBSaveCallBack,
3147 * .aof_rewrite = myType_AOFRewriteCallBack,
3148 * .free = myType_FreeCallBack,
3149 *
3150 * // Optional fields
3151 * .digest = myType_DigestCallBack,
3152 * .mem_usage = myType_MemUsageCallBack,
3153 * }
3154 *
3155 * * **rdb_load**: A callback function pointer that loads data from RDB files.
3156 * * **rdb_save**: A callback function pointer that saves data to RDB files.
3157 * * **aof_rewrite**: A callback function pointer that rewrites data as commands.
3158 * * **digest**: A callback function pointer that is used for `DEBUG DIGEST`.
3159 * * **free**: A callback function pointer that can free a type value.
3160 *
3161 * The **digest* and **mem_usage** methods should currently be omitted since
3162 * they are not yet implemented inside the Redis modules core.
3163 *
3164 * Note: the module name "AAAAAAAAA" is reserved and produces an error, it
3165 * happens to be pretty lame as well.
3166 *
3167 * If there is already a module registering a type with the same name,
3168 * and if the module name or encver is invalid, NULL is returned.
3169 * Otherwise the new type is registered into Redis, and a reference of
3170 * type RedisModuleType is returned: the caller of the function should store
3171 * this reference into a gobal variable to make future use of it in the
3172 * modules type API, since a single module may register multiple types.
3173 * Example code fragment:
3174 *
3175 * static RedisModuleType *BalancedTreeType;
3176 *
3177 * int RedisModule_OnLoad(RedisModuleCtx *ctx) {
3178 * // some code here ...
3179 * BalancedTreeType = RM_CreateDataType(...);
3180 * }
3181 */
RM_CreateDataType(RedisModuleCtx * ctx,const char * name,int encver,void * typemethods_ptr)3182 moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver, void *typemethods_ptr) {
3183 uint64_t id = moduleTypeEncodeId(name,encver);
3184 if (id == 0) return NULL;
3185 if (moduleTypeLookupModuleByName(name) != NULL) return NULL;
3186
3187 long typemethods_version = ((long*)typemethods_ptr)[0];
3188 if (typemethods_version == 0) return NULL;
3189
3190 struct typemethods {
3191 uint64_t version;
3192 moduleTypeLoadFunc rdb_load;
3193 moduleTypeSaveFunc rdb_save;
3194 moduleTypeRewriteFunc aof_rewrite;
3195 moduleTypeMemUsageFunc mem_usage;
3196 moduleTypeDigestFunc digest;
3197 moduleTypeFreeFunc free;
3198 struct {
3199 moduleTypeAuxLoadFunc aux_load;
3200 moduleTypeAuxSaveFunc aux_save;
3201 int aux_save_triggers;
3202 } v2;
3203 } *tms = (struct typemethods*) typemethods_ptr;
3204
3205 moduleType *mt = zcalloc(sizeof(*mt));
3206 mt->id = id;
3207 mt->module = ctx->module;
3208 mt->rdb_load = tms->rdb_load;
3209 mt->rdb_save = tms->rdb_save;
3210 mt->aof_rewrite = tms->aof_rewrite;
3211 mt->mem_usage = tms->mem_usage;
3212 mt->digest = tms->digest;
3213 mt->free = tms->free;
3214 if (tms->version >= 2) {
3215 mt->aux_load = tms->v2.aux_load;
3216 mt->aux_save = tms->v2.aux_save;
3217 mt->aux_save_triggers = tms->v2.aux_save_triggers;
3218 }
3219 memcpy(mt->name,name,sizeof(mt->name));
3220 listAddNodeTail(ctx->module->types,mt);
3221 return mt;
3222 }
3223
3224 /* If the key is open for writing, set the specified module type object
3225 * as the value of the key, deleting the old value if any.
3226 * On success REDISMODULE_OK is returned. If the key is not open for
3227 * writing or there is an active iterator, REDISMODULE_ERR is returned. */
RM_ModuleTypeSetValue(RedisModuleKey * key,moduleType * mt,void * value)3228 int RM_ModuleTypeSetValue(RedisModuleKey *key, moduleType *mt, void *value) {
3229 if (!(key->mode & REDISMODULE_WRITE) || key->iter) return REDISMODULE_ERR;
3230 RM_DeleteKey(key);
3231 robj *o = createModuleObject(mt,value);
3232 setKey(key->db,key->key,o);
3233 decrRefCount(o);
3234 key->value = o;
3235 return REDISMODULE_OK;
3236 }
3237
3238 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
3239 * the key, returns the module type pointer of the value stored at key.
3240 *
3241 * If the key is NULL, is not associated with a module type, or is empty,
3242 * then NULL is returned instead. */
RM_ModuleTypeGetType(RedisModuleKey * key)3243 moduleType *RM_ModuleTypeGetType(RedisModuleKey *key) {
3244 if (key == NULL ||
3245 key->value == NULL ||
3246 RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
3247 moduleValue *mv = key->value->ptr;
3248 return mv->type;
3249 }
3250
3251 /* Assuming RedisModule_KeyType() returned REDISMODULE_KEYTYPE_MODULE on
3252 * the key, returns the module type low-level value stored at key, as
3253 * it was set by the user via RedisModule_ModuleTypeSet().
3254 *
3255 * If the key is NULL, is not associated with a module type, or is empty,
3256 * then NULL is returned instead. */
RM_ModuleTypeGetValue(RedisModuleKey * key)3257 void *RM_ModuleTypeGetValue(RedisModuleKey *key) {
3258 if (key == NULL ||
3259 key->value == NULL ||
3260 RM_KeyType(key) != REDISMODULE_KEYTYPE_MODULE) return NULL;
3261 moduleValue *mv = key->value->ptr;
3262 return mv->value;
3263 }
3264
3265 /* --------------------------------------------------------------------------
3266 * RDB loading and saving functions
3267 * -------------------------------------------------------------------------- */
3268
3269 /* Called when there is a load error in the context of a module. This cannot
3270 * be recovered like for the built-in types. */
moduleRDBLoadError(RedisModuleIO * io)3271 void moduleRDBLoadError(RedisModuleIO *io) {
3272 serverLog(LL_WARNING,
3273 "Error loading data from RDB (short read or EOF). "
3274 "Read performed by module '%s' about type '%s' "
3275 "after reading '%llu' bytes of a value.",
3276 io->type->module->name,
3277 io->type->name,
3278 (unsigned long long)io->bytes);
3279 exit(1);
3280 }
3281
3282 /* Save an unsigned 64 bit value into the RDB file. This function should only
3283 * be called in the context of the rdb_save method of modules implementing new
3284 * data types. */
RM_SaveUnsigned(RedisModuleIO * io,uint64_t value)3285 void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
3286 if (io->error) return;
3287 /* Save opcode. */
3288 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT);
3289 if (retval == -1) goto saveerr;
3290 io->bytes += retval;
3291 /* Save value. */
3292 retval = rdbSaveLen(io->rio, value);
3293 if (retval == -1) goto saveerr;
3294 io->bytes += retval;
3295 return;
3296
3297 saveerr:
3298 io->error = 1;
3299 }
3300
3301 /* Load an unsigned 64 bit value from the RDB file. This function should only
3302 * be called in the context of the rdb_load method of modules implementing
3303 * new data types. */
RM_LoadUnsigned(RedisModuleIO * io)3304 uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
3305 if (io->ver == 2) {
3306 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3307 if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
3308 }
3309 uint64_t value;
3310 int retval = rdbLoadLenByRef(io->rio, NULL, &value);
3311 if (retval == -1) goto loaderr;
3312 return value;
3313
3314 loaderr:
3315 moduleRDBLoadError(io);
3316 return 0; /* Never reached. */
3317 }
3318
3319 /* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
RM_SaveSigned(RedisModuleIO * io,int64_t value)3320 void RM_SaveSigned(RedisModuleIO *io, int64_t value) {
3321 union {uint64_t u; int64_t i;} conv;
3322 conv.i = value;
3323 RM_SaveUnsigned(io,conv.u);
3324 }
3325
3326 /* Like RedisModule_LoadUnsigned() but for signed 64 bit values. */
RM_LoadSigned(RedisModuleIO * io)3327 int64_t RM_LoadSigned(RedisModuleIO *io) {
3328 union {uint64_t u; int64_t i;} conv;
3329 conv.u = RM_LoadUnsigned(io);
3330 return conv.i;
3331 }
3332
3333 /* In the context of the rdb_save method of a module type, saves a
3334 * string into the RDB file taking as input a RedisModuleString.
3335 *
3336 * The string can be later loaded with RedisModule_LoadString() or
3337 * other Load family functions expecting a serialized string inside
3338 * the RDB file. */
RM_SaveString(RedisModuleIO * io,RedisModuleString * s)3339 void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
3340 if (io->error) return;
3341 /* Save opcode. */
3342 ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
3343 if (retval == -1) goto saveerr;
3344 io->bytes += retval;
3345 /* Save value. */
3346 retval = rdbSaveStringObject(io->rio, s);
3347 if (retval == -1) goto saveerr;
3348 io->bytes += retval;
3349 return;
3350
3351 saveerr:
3352 io->error = 1;
3353 }
3354
3355 /* Like RedisModule_SaveString() but takes a raw C pointer and length
3356 * as input. */
RM_SaveStringBuffer(RedisModuleIO * io,const char * str,size_t len)3357 void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
3358 if (io->error) return;
3359 /* Save opcode. */
3360 ssize_t retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
3361 if (retval == -1) goto saveerr;
3362 io->bytes += retval;
3363 /* Save value. */
3364 retval = rdbSaveRawString(io->rio, (unsigned char*)str,len);
3365 if (retval == -1) goto saveerr;
3366 io->bytes += retval;
3367 return;
3368
3369 saveerr:
3370 io->error = 1;
3371 }
3372
3373 /* Implements RM_LoadString() and RM_LoadStringBuffer() */
moduleLoadString(RedisModuleIO * io,int plain,size_t * lenptr)3374 void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
3375 if (io->ver == 2) {
3376 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3377 if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
3378 }
3379 void *s = rdbGenericLoadStringObject(io->rio,
3380 plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr);
3381 if (s == NULL) goto loaderr;
3382 return s;
3383
3384 loaderr:
3385 moduleRDBLoadError(io);
3386 return NULL; /* Never reached. */
3387 }
3388
3389 /* In the context of the rdb_load method of a module data type, loads a string
3390 * from the RDB file, that was previously saved with RedisModule_SaveString()
3391 * functions family.
3392 *
3393 * The returned string is a newly allocated RedisModuleString object, and
3394 * the user should at some point free it with a call to RedisModule_FreeString().
3395 *
3396 * If the data structure does not store strings as RedisModuleString objects,
3397 * the similar function RedisModule_LoadStringBuffer() could be used instead. */
RM_LoadString(RedisModuleIO * io)3398 RedisModuleString *RM_LoadString(RedisModuleIO *io) {
3399 return moduleLoadString(io,0,NULL);
3400 }
3401
3402 /* Like RedisModule_LoadString() but returns an heap allocated string that
3403 * was allocated with RedisModule_Alloc(), and can be resized or freed with
3404 * RedisModule_Realloc() or RedisModule_Free().
3405 *
3406 * The size of the string is stored at '*lenptr' if not NULL.
3407 * The returned string is not automatically NULL termianted, it is loaded
3408 * exactly as it was stored inisde the RDB file. */
RM_LoadStringBuffer(RedisModuleIO * io,size_t * lenptr)3409 char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
3410 return moduleLoadString(io,1,lenptr);
3411 }
3412
3413 /* In the context of the rdb_save method of a module data type, saves a double
3414 * value to the RDB file. The double can be a valid number, a NaN or infinity.
3415 * It is possible to load back the value with RedisModule_LoadDouble(). */
RM_SaveDouble(RedisModuleIO * io,double value)3416 void RM_SaveDouble(RedisModuleIO *io, double value) {
3417 if (io->error) return;
3418 /* Save opcode. */
3419 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE);
3420 if (retval == -1) goto saveerr;
3421 io->bytes += retval;
3422 /* Save value. */
3423 retval = rdbSaveBinaryDoubleValue(io->rio, value);
3424 if (retval == -1) goto saveerr;
3425 io->bytes += retval;
3426 return;
3427
3428 saveerr:
3429 io->error = 1;
3430 }
3431
3432 /* In the context of the rdb_save method of a module data type, loads back the
3433 * double value saved by RedisModule_SaveDouble(). */
RM_LoadDouble(RedisModuleIO * io)3434 double RM_LoadDouble(RedisModuleIO *io) {
3435 if (io->ver == 2) {
3436 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3437 if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
3438 }
3439 double value;
3440 int retval = rdbLoadBinaryDoubleValue(io->rio, &value);
3441 if (retval == -1) goto loaderr;
3442 return value;
3443
3444 loaderr:
3445 moduleRDBLoadError(io);
3446 return 0; /* Never reached. */
3447 }
3448
3449 /* In the context of the rdb_save method of a module data type, saves a float
3450 * value to the RDB file. The float can be a valid number, a NaN or infinity.
3451 * It is possible to load back the value with RedisModule_LoadFloat(). */
RM_SaveFloat(RedisModuleIO * io,float value)3452 void RM_SaveFloat(RedisModuleIO *io, float value) {
3453 if (io->error) return;
3454 /* Save opcode. */
3455 int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT);
3456 if (retval == -1) goto saveerr;
3457 io->bytes += retval;
3458 /* Save value. */
3459 retval = rdbSaveBinaryFloatValue(io->rio, value);
3460 if (retval == -1) goto saveerr;
3461 io->bytes += retval;
3462 return;
3463
3464 saveerr:
3465 io->error = 1;
3466 }
3467
3468 /* In the context of the rdb_save method of a module data type, loads back the
3469 * float value saved by RedisModule_SaveFloat(). */
RM_LoadFloat(RedisModuleIO * io)3470 float RM_LoadFloat(RedisModuleIO *io) {
3471 if (io->ver == 2) {
3472 uint64_t opcode = rdbLoadLen(io->rio,NULL);
3473 if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
3474 }
3475 float value;
3476 int retval = rdbLoadBinaryFloatValue(io->rio, &value);
3477 if (retval == -1) goto loaderr;
3478 return value;
3479
3480 loaderr:
3481 moduleRDBLoadError(io);
3482 return 0; /* Never reached. */
3483 }
3484
3485 /* Iterate over modules, and trigger rdb aux saving for the ones modules types
3486 * who asked for it. */
rdbSaveModulesAux(rio * rdb,int when)3487 ssize_t rdbSaveModulesAux(rio *rdb, int when) {
3488 size_t total_written = 0;
3489 dictIterator *di = dictGetIterator(modules);
3490 dictEntry *de;
3491
3492 while ((de = dictNext(di)) != NULL) {
3493 struct RedisModule *module = dictGetVal(de);
3494 listIter li;
3495 listNode *ln;
3496
3497 listRewind(module->types,&li);
3498 while((ln = listNext(&li))) {
3499 moduleType *mt = ln->value;
3500 if (!mt->aux_save || !(mt->aux_save_triggers & when))
3501 continue;
3502 ssize_t ret = rdbSaveSingleModuleAux(rdb, when, mt);
3503 if (ret==-1) {
3504 dictReleaseIterator(di);
3505 return -1;
3506 }
3507 total_written += ret;
3508 }
3509 }
3510
3511 dictReleaseIterator(di);
3512 return total_written;
3513 }
3514
3515 /* --------------------------------------------------------------------------
3516 * Key digest API (DEBUG DIGEST interface for modules types)
3517 * -------------------------------------------------------------------------- */
3518
3519 /* Add a new element to the digest. This function can be called multiple times
3520 * one element after the other, for all the elements that constitute a given
3521 * data structure. The function call must be followed by the call to
3522 * `RedisModule_DigestEndSequence` eventually, when all the elements that are
3523 * always in a given order are added. See the Redis Modules data types
3524 * documentation for more info. However this is a quick example that uses Redis
3525 * data types as an example.
3526 *
3527 * To add a sequence of unordered elements (for example in the case of a Redis
3528 * Set), the pattern to use is:
3529 *
3530 * foreach element {
3531 * AddElement(element);
3532 * EndSequence();
3533 * }
3534 *
3535 * Because Sets are not ordered, so every element added has a position that
3536 * does not depend from the other. However if instead our elements are
3537 * ordered in pairs, like field-value pairs of an Hash, then one should
3538 * use:
3539 *
3540 * foreach key,value {
3541 * AddElement(key);
3542 * AddElement(value);
3543 * EndSquence();
3544 * }
3545 *
3546 * Because the key and value will be always in the above order, while instead
3547 * the single key-value pairs, can appear in any position into a Redis hash.
3548 *
3549 * A list of ordered elements would be implemented with:
3550 *
3551 * foreach element {
3552 * AddElement(element);
3553 * }
3554 * EndSequence();
3555 *
3556 */
RM_DigestAddStringBuffer(RedisModuleDigest * md,unsigned char * ele,size_t len)3557 void RM_DigestAddStringBuffer(RedisModuleDigest *md, unsigned char *ele, size_t len) {
3558 mixDigest(md->o,ele,len);
3559 }
3560
3561 /* Like `RedisModule_DigestAddStringBuffer()` but takes a long long as input
3562 * that gets converted into a string before adding it to the digest. */
RM_DigestAddLongLong(RedisModuleDigest * md,long long ll)3563 void RM_DigestAddLongLong(RedisModuleDigest *md, long long ll) {
3564 char buf[LONG_STR_SIZE];
3565 size_t len = ll2string(buf,sizeof(buf),ll);
3566 mixDigest(md->o,buf,len);
3567 }
3568
3569 /* See the documentation for `RedisModule_DigestAddElement()`. */
RM_DigestEndSequence(RedisModuleDigest * md)3570 void RM_DigestEndSequence(RedisModuleDigest *md) {
3571 xorDigest(md->x,md->o,sizeof(md->o));
3572 memset(md->o,0,sizeof(md->o));
3573 }
3574
3575 /* --------------------------------------------------------------------------
3576 * AOF API for modules data types
3577 * -------------------------------------------------------------------------- */
3578
3579 /* Emits a command into the AOF during the AOF rewriting process. This function
3580 * is only called in the context of the aof_rewrite method of data types exported
3581 * by a module. The command works exactly like RedisModule_Call() in the way
3582 * the parameters are passed, but it does not return anything as the error
3583 * handling is performed by Redis itself. */
RM_EmitAOF(RedisModuleIO * io,const char * cmdname,const char * fmt,...)3584 void RM_EmitAOF(RedisModuleIO *io, const char *cmdname, const char *fmt, ...) {
3585 if (io->error) return;
3586 struct redisCommand *cmd;
3587 robj **argv = NULL;
3588 int argc = 0, flags = 0, j;
3589 va_list ap;
3590
3591 cmd = lookupCommandByCString((char*)cmdname);
3592 if (!cmd) {
3593 serverLog(LL_WARNING,
3594 "Fatal: AOF method for module data type '%s' tried to "
3595 "emit unknown command '%s'",
3596 io->type->name, cmdname);
3597 io->error = 1;
3598 errno = EINVAL;
3599 return;
3600 }
3601
3602 /* Emit the arguments into the AOF in Redis protocol format. */
3603 va_start(ap, fmt);
3604 argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap);
3605 va_end(ap);
3606 if (argv == NULL) {
3607 serverLog(LL_WARNING,
3608 "Fatal: AOF method for module data type '%s' tried to "
3609 "call RedisModule_EmitAOF() with wrong format specifiers '%s'",
3610 io->type->name, fmt);
3611 io->error = 1;
3612 errno = EINVAL;
3613 return;
3614 }
3615
3616 /* Bulk count. */
3617 if (!io->error && rioWriteBulkCount(io->rio,'*',argc) == 0)
3618 io->error = 1;
3619
3620 /* Arguments. */
3621 for (j = 0; j < argc; j++) {
3622 if (!io->error && rioWriteBulkObject(io->rio,argv[j]) == 0)
3623 io->error = 1;
3624 decrRefCount(argv[j]);
3625 }
3626 zfree(argv);
3627 return;
3628 }
3629
3630 /* --------------------------------------------------------------------------
3631 * IO context handling
3632 * -------------------------------------------------------------------------- */
3633
RM_GetContextFromIO(RedisModuleIO * io)3634 RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
3635 if (io->ctx) return io->ctx; /* Can't have more than one... */
3636 RedisModuleCtx ctxtemplate = REDISMODULE_CTX_INIT;
3637 io->ctx = zmalloc(sizeof(RedisModuleCtx));
3638 *(io->ctx) = ctxtemplate;
3639 io->ctx->module = io->type->module;
3640 io->ctx->client = NULL;
3641 return io->ctx;
3642 }
3643
3644 /* Returns a RedisModuleString with the name of the key currently saving or
3645 * loading, when an IO data type callback is called. There is no guarantee
3646 * that the key name is always available, so this may return NULL.
3647 */
RM_GetKeyNameFromIO(RedisModuleIO * io)3648 const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
3649 return io->key;
3650 }
3651
3652 /* --------------------------------------------------------------------------
3653 * Logging
3654 * -------------------------------------------------------------------------- */
3655
3656 /* This is the low level function implementing both:
3657 *
3658 * RM_Log()
3659 * RM_LogIOError()
3660 *
3661 */
RM_LogRaw(RedisModule * module,const char * levelstr,const char * fmt,va_list ap)3662 void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_list ap) {
3663 char msg[LOG_MAX_LEN];
3664 size_t name_len;
3665 int level;
3666
3667 if (!strcasecmp(levelstr,"debug")) level = LL_DEBUG;
3668 else if (!strcasecmp(levelstr,"verbose")) level = LL_VERBOSE;
3669 else if (!strcasecmp(levelstr,"notice")) level = LL_NOTICE;
3670 else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING;
3671 else level = LL_VERBOSE; /* Default. */
3672
3673 if (level < server.verbosity) return;
3674
3675 name_len = snprintf(msg, sizeof(msg),"<%s> ", module? module->name: "module");
3676 vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap);
3677 serverLogRaw(level,msg);
3678 }
3679
3680 /* Produces a log message to the standard Redis log, the format accepts
3681 * printf-alike specifiers, while level is a string describing the log
3682 * level to use when emitting the log, and must be one of the following:
3683 *
3684 * * "debug"
3685 * * "verbose"
3686 * * "notice"
3687 * * "warning"
3688 *
3689 * If the specified log level is invalid, verbose is used by default.
3690 * There is a fixed limit to the length of the log line this function is able
3691 * to emit, this limit is not specified but is guaranteed to be more than
3692 * a few lines of text.
3693 *
3694 * The ctx argument may be NULL if cannot be provided in the context of the
3695 * caller for instance threads or callbacks, in which case a generic "module"
3696 * will be used instead of the module name.
3697 */
RM_Log(RedisModuleCtx * ctx,const char * levelstr,const char * fmt,...)3698 void RM_Log(RedisModuleCtx *ctx, const char *levelstr, const char *fmt, ...) {
3699 va_list ap;
3700 va_start(ap, fmt);
3701 RM_LogRaw(ctx? ctx->module: NULL,levelstr,fmt,ap);
3702 va_end(ap);
3703 }
3704
3705 /* Log errors from RDB / AOF serialization callbacks.
3706 *
3707 * This function should be used when a callback is returning a critical
3708 * error to the caller since cannot load or save the data for some
3709 * critical reason. */
RM_LogIOError(RedisModuleIO * io,const char * levelstr,const char * fmt,...)3710 void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...) {
3711 va_list ap;
3712 va_start(ap, fmt);
3713 RM_LogRaw(io->type->module,levelstr,fmt,ap);
3714 va_end(ap);
3715 }
3716
3717 /* --------------------------------------------------------------------------
3718 * Blocking clients from modules
3719 * -------------------------------------------------------------------------- */
3720
3721 /* Readable handler for the awake pipe. We do nothing here, the awake bytes
3722 * will be actually read in a more appropriate place in the
3723 * moduleHandleBlockedClients() function that is where clients are actually
3724 * served. */
moduleBlockedClientPipeReadable(aeEventLoop * el,int fd,void * privdata,int mask)3725 void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
3726 UNUSED(el);
3727 UNUSED(fd);
3728 UNUSED(mask);
3729 UNUSED(privdata);
3730 }
3731
3732 /* This is called from blocked.c in order to unblock a client: may be called
3733 * for multiple reasons while the client is in the middle of being blocked
3734 * because the client is terminated, but is also called for cleanup when a
3735 * client is unblocked in a clean way after replaying.
3736 *
3737 * What we do here is just to set the client to NULL in the redis module
3738 * blocked client handle. This way if the client is terminated while there
3739 * is a pending threaded operation involving the blocked client, we'll know
3740 * that the client no longer exists and no reply callback should be called.
3741 *
3742 * The structure RedisModuleBlockedClient will be always deallocated when
3743 * running the list of clients blocked by a module that need to be unblocked. */
unblockClientFromModule(client * c)3744 void unblockClientFromModule(client *c) {
3745 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3746
3747 /* Call the disconnection callback if any. */
3748 if (bc->disconnect_callback) {
3749 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3750 ctx.blocked_privdata = bc->privdata;
3751 ctx.module = bc->module;
3752 ctx.client = bc->client;
3753 bc->disconnect_callback(&ctx,bc);
3754 moduleFreeContext(&ctx);
3755 }
3756
3757 bc->client = NULL;
3758 /* Reset the client for a new query since, for blocking commands implemented
3759 * into modules, we do not it immediately after the command returns (and
3760 * the client blocks) in order to be still able to access the argument
3761 * vector from callbacks. */
3762 resetClient(c);
3763 }
3764
3765 /* Block a client in the context of a blocking command, returning an handle
3766 * which will be used, later, in order to unblock the client with a call to
3767 * RedisModule_UnblockClient(). The arguments specify callback functions
3768 * and a timeout after which the client is unblocked.
3769 *
3770 * The callbacks are called in the following contexts:
3771 *
3772 * reply_callback: called after a successful RedisModule_UnblockClient()
3773 * call in order to reply to the client and unblock it.
3774 *
3775 * reply_timeout: called when the timeout is reached in order to send an
3776 * error to the client.
3777 *
3778 * free_privdata: called in order to free the private data that is passed
3779 * by RedisModule_UnblockClient() call.
3780 */
RM_BlockClient(RedisModuleCtx * ctx,RedisModuleCmdFunc reply_callback,RedisModuleCmdFunc timeout_callback,void (* free_privdata)(RedisModuleCtx *,void *),long long timeout_ms)3781 RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
3782 client *c = ctx->client;
3783 int islua = c->flags & CLIENT_LUA;
3784 int ismulti = c->flags & CLIENT_MULTI;
3785
3786 c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
3787 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3788
3789 /* We need to handle the invalid operation of calling modules blocking
3790 * commands from Lua or MULTI. We actually create an already aborted
3791 * (client set to NULL) blocked client handle, and actually reply with
3792 * an error. */
3793 bc->client = (islua || ismulti) ? NULL : c;
3794 bc->module = ctx->module;
3795 bc->reply_callback = reply_callback;
3796 bc->timeout_callback = timeout_callback;
3797 bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
3798 bc->free_privdata = free_privdata;
3799 bc->privdata = NULL;
3800 bc->reply_client = createClient(-1);
3801 bc->reply_client->flags |= CLIENT_MODULE;
3802 bc->dbid = c->db->id;
3803 c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
3804
3805 if (islua || ismulti) {
3806 c->bpop.module_blocked_handle = NULL;
3807 addReplyError(c, islua ?
3808 "Blocking module command called from Lua script" :
3809 "Blocking module command called from transaction");
3810 } else {
3811 blockClient(c,BLOCKED_MODULE);
3812 }
3813 return bc;
3814 }
3815
3816 /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
3817 * the reply callbacks to be called in order to reply to the client.
3818 * The 'privdata' argument will be accessible by the reply callback, so
3819 * the caller of this function can pass any value that is needed in order to
3820 * actually reply to the client.
3821 *
3822 * A common usage for 'privdata' is a thread that computes something that
3823 * needs to be passed to the client, included but not limited some slow
3824 * to compute reply or some reply obtained via networking.
3825 *
3826 * Note: this function can be called from threads spawned by the module. */
RM_UnblockClient(RedisModuleBlockedClient * bc,void * privdata)3827 int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
3828 pthread_mutex_lock(&moduleUnblockedClientsMutex);
3829 bc->privdata = privdata;
3830 listAddNodeTail(moduleUnblockedClients,bc);
3831 if (write(server.module_blocked_pipe[1],"A",1) != 1) {
3832 /* Ignore the error, this is best-effort. */
3833 }
3834 pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3835 return REDISMODULE_OK;
3836 }
3837
3838 /* Abort a blocked client blocking operation: the client will be unblocked
3839 * without firing any callback. */
RM_AbortBlock(RedisModuleBlockedClient * bc)3840 int RM_AbortBlock(RedisModuleBlockedClient *bc) {
3841 bc->reply_callback = NULL;
3842 bc->disconnect_callback = NULL;
3843 return RM_UnblockClient(bc,NULL);
3844 }
3845
3846 /* Set a callback that will be called if a blocked client disconnects
3847 * before the module has a chance to call RedisModule_UnblockClient()
3848 *
3849 * Usually what you want to do there, is to cleanup your module state
3850 * so that you can call RedisModule_UnblockClient() safely, otherwise
3851 * the client will remain blocked forever if the timeout is large.
3852 *
3853 * Notes:
3854 *
3855 * 1. It is not safe to call Reply* family functions here, it is also
3856 * useless since the client is gone.
3857 *
3858 * 2. This callback is not called if the client disconnects because of
3859 * a timeout. In such a case, the client is unblocked automatically
3860 * and the timeout callback is called.
3861 */
RM_SetDisconnectCallback(RedisModuleBlockedClient * bc,RedisModuleDisconnectFunc callback)3862 void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback) {
3863 bc->disconnect_callback = callback;
3864 }
3865
3866 /* This function will check the moduleUnblockedClients queue in order to
3867 * call the reply callback and really unblock the client.
3868 *
3869 * Clients end into this list because of calls to RM_UnblockClient(),
3870 * however it is possible that while the module was doing work for the
3871 * blocked client, it was terminated by Redis (for timeout or other reasons).
3872 * When this happens the RedisModuleBlockedClient structure in the queue
3873 * will have the 'client' field set to NULL. */
moduleHandleBlockedClients(void)3874 void moduleHandleBlockedClients(void) {
3875 listNode *ln;
3876 RedisModuleBlockedClient *bc;
3877
3878 pthread_mutex_lock(&moduleUnblockedClientsMutex);
3879 /* Here we unblock all the pending clients blocked in modules operations
3880 * so we can read every pending "awake byte" in the pipe. */
3881 char buf[1];
3882 while (read(server.module_blocked_pipe[0],buf,1) == 1);
3883 while (listLength(moduleUnblockedClients)) {
3884 ln = listFirst(moduleUnblockedClients);
3885 bc = ln->value;
3886 client *c = bc->client;
3887 listDelNode(moduleUnblockedClients,ln);
3888 pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3889
3890 /* Release the lock during the loop, as long as we don't
3891 * touch the shared list. */
3892
3893 /* Call the reply callback if the client is valid and we have
3894 * any callback. */
3895 if (c && bc->reply_callback) {
3896 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3897 ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
3898 ctx.blocked_privdata = bc->privdata;
3899 ctx.module = bc->module;
3900 ctx.client = bc->client;
3901 ctx.blocked_client = bc;
3902 bc->reply_callback(&ctx,(void**)c->argv,c->argc);
3903 moduleFreeContext(&ctx);
3904 }
3905
3906 /* Free privdata if any. */
3907 if (bc->privdata && bc->free_privdata) {
3908 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3909 if (c == NULL)
3910 ctx.flags |= REDISMODULE_CTX_BLOCKED_DISCONNECTED;
3911 ctx.blocked_privdata = bc->privdata;
3912 ctx.module = bc->module;
3913 ctx.client = bc->client;
3914 bc->free_privdata(&ctx,bc->privdata);
3915 moduleFreeContext(&ctx);
3916 }
3917
3918 /* It is possible that this blocked client object accumulated
3919 * replies to send to the client in a thread safe context.
3920 * We need to glue such replies to the client output buffer and
3921 * free the temporary client we just used for the replies. */
3922 if (c) AddReplyFromClient(c, bc->reply_client);
3923 freeClient(bc->reply_client);
3924
3925 if (c != NULL) {
3926 /* Before unblocking the client, set the disconnect callback
3927 * to NULL, because if we reached this point, the client was
3928 * properly unblocked by the module. */
3929 bc->disconnect_callback = NULL;
3930 unblockClient(c);
3931 /* Put the client in the list of clients that need to write
3932 * if there are pending replies here. This is needed since
3933 * during a non blocking command the client may receive output. */
3934 if (clientHasPendingReplies(c) &&
3935 !(c->flags & CLIENT_PENDING_WRITE))
3936 {
3937 c->flags |= CLIENT_PENDING_WRITE;
3938 listAddNodeHead(server.clients_pending_write,c);
3939 }
3940 }
3941
3942 /* Free 'bc' only after unblocking the client, since it is
3943 * referenced in the client blocking context, and must be valid
3944 * when calling unblockClient(). */
3945 zfree(bc);
3946
3947 /* Lock again before to iterate the loop. */
3948 pthread_mutex_lock(&moduleUnblockedClientsMutex);
3949 }
3950 pthread_mutex_unlock(&moduleUnblockedClientsMutex);
3951 }
3952
3953 /* Called when our client timed out. After this function unblockClient()
3954 * is called, and it will invalidate the blocked client. So this function
3955 * does not need to do any cleanup. Eventually the module will call the
3956 * API to unblock the client and the memory will be released. */
moduleBlockedClientTimedOut(client * c)3957 void moduleBlockedClientTimedOut(client *c) {
3958 RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
3959 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
3960 ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
3961 ctx.module = bc->module;
3962 ctx.client = bc->client;
3963 ctx.blocked_client = bc;
3964 bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
3965 moduleFreeContext(&ctx);
3966 /* For timeout events, we do not want to call the disconnect callback,
3967 * because the blocked client will be automatically disconnected in
3968 * this case, and the user can still hook using the timeout callback. */
3969 bc->disconnect_callback = NULL;
3970 }
3971
3972 /* Return non-zero if a module command was called in order to fill the
3973 * reply for a blocked client. */
RM_IsBlockedReplyRequest(RedisModuleCtx * ctx)3974 int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
3975 return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
3976 }
3977
3978 /* Return non-zero if a module command was called in order to fill the
3979 * reply for a blocked client that timed out. */
RM_IsBlockedTimeoutRequest(RedisModuleCtx * ctx)3980 int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
3981 return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
3982 }
3983
3984 /* Get the private data set by RedisModule_UnblockClient() */
RM_GetBlockedClientPrivateData(RedisModuleCtx * ctx)3985 void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
3986 return ctx->blocked_privdata;
3987 }
3988
3989 /* Get the blocked client associated with a given context.
3990 * This is useful in the reply and timeout callbacks of blocked clients,
3991 * before sometimes the module has the blocked client handle references
3992 * around, and wants to cleanup it. */
RM_GetBlockedClientHandle(RedisModuleCtx * ctx)3993 RedisModuleBlockedClient *RM_GetBlockedClientHandle(RedisModuleCtx *ctx) {
3994 return ctx->blocked_client;
3995 }
3996
3997 /* Return true if when the free callback of a blocked client is called,
3998 * the reason for the client to be unblocked is that it disconnected
3999 * while it was blocked. */
RM_BlockedClientDisconnected(RedisModuleCtx * ctx)4000 int RM_BlockedClientDisconnected(RedisModuleCtx *ctx) {
4001 return (ctx->flags & REDISMODULE_CTX_BLOCKED_DISCONNECTED) != 0;
4002 }
4003
4004 /* --------------------------------------------------------------------------
4005 * Thread Safe Contexts
4006 * -------------------------------------------------------------------------- */
4007
4008 /* Return a context which can be used inside threads to make Redis context
4009 * calls with certain modules APIs. If 'bc' is not NULL then the module will
4010 * be bound to a blocked client, and it will be possible to use the
4011 * `RedisModule_Reply*` family of functions to accumulate a reply for when the
4012 * client will be unblocked. Otherwise the thread safe context will be
4013 * detached by a specific client.
4014 *
4015 * To call non-reply APIs, the thread safe context must be prepared with:
4016 *
4017 * RedisModule_ThreadSafeContextLock(ctx);
4018 * ... make your call here ...
4019 * RedisModule_ThreadSafeContextUnlock(ctx);
4020 *
4021 * This is not needed when using `RedisModule_Reply*` functions, assuming
4022 * that a blocked client was used when the context was created, otherwise
4023 * no RedisModule_Reply* call should be made at all.
4024 *
4025 * TODO: thread safe contexts do not inherit the blocked client
4026 * selected database. */
RM_GetThreadSafeContext(RedisModuleBlockedClient * bc)4027 RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
4028 RedisModuleCtx *ctx = zmalloc(sizeof(*ctx));
4029 RedisModuleCtx empty = REDISMODULE_CTX_INIT;
4030 memcpy(ctx,&empty,sizeof(empty));
4031 if (bc) {
4032 ctx->blocked_client = bc;
4033 ctx->module = bc->module;
4034 }
4035 ctx->flags |= REDISMODULE_CTX_THREAD_SAFE;
4036 /* Even when the context is associated with a blocked client, we can't
4037 * access it safely from another thread, so we create a fake client here
4038 * in order to keep things like the currently selected database and similar
4039 * things. */
4040 ctx->client = createClient(-1);
4041 if (bc) {
4042 selectDb(ctx->client,bc->dbid);
4043 if (bc->client) ctx->client->id = bc->client->id;
4044 }
4045 return ctx;
4046 }
4047
4048 /* Release a thread safe context. */
RM_FreeThreadSafeContext(RedisModuleCtx * ctx)4049 void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
4050 moduleFreeContext(ctx);
4051 zfree(ctx);
4052 }
4053
4054 /* Acquire the server lock before executing a thread safe API call.
4055 * This is not needed for `RedisModule_Reply*` calls when there is
4056 * a blocked client connected to the thread safe context. */
RM_ThreadSafeContextLock(RedisModuleCtx * ctx)4057 void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
4058 UNUSED(ctx);
4059 moduleAcquireGIL();
4060 }
4061
4062 /* Release the server lock after a thread safe API call was executed. */
RM_ThreadSafeContextUnlock(RedisModuleCtx * ctx)4063 void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
4064 UNUSED(ctx);
4065 moduleReleaseGIL();
4066 }
4067
moduleAcquireGIL(void)4068 void moduleAcquireGIL(void) {
4069 pthread_mutex_lock(&moduleGIL);
4070 }
4071
moduleReleaseGIL(void)4072 void moduleReleaseGIL(void) {
4073 pthread_mutex_unlock(&moduleGIL);
4074 }
4075
4076
4077 /* --------------------------------------------------------------------------
4078 * Module Keyspace Notifications API
4079 * -------------------------------------------------------------------------- */
4080
4081 /* Subscribe to keyspace notifications. This is a low-level version of the
4082 * keyspace-notifications API. A module can register callbacks to be notified
4083 * when keyspce events occur.
4084 *
4085 * Notification events are filtered by their type (string events, set events,
4086 * etc), and the subscriber callback receives only events that match a specific
4087 * mask of event types.
4088 *
4089 * When subscribing to notifications with RedisModule_SubscribeToKeyspaceEvents
4090 * the module must provide an event type-mask, denoting the events the subscriber
4091 * is interested in. This can be an ORed mask of any of the following flags:
4092 *
4093 * - REDISMODULE_NOTIFY_GENERIC: Generic commands like DEL, EXPIRE, RENAME
4094 * - REDISMODULE_NOTIFY_STRING: String events
4095 * - REDISMODULE_NOTIFY_LIST: List events
4096 * - REDISMODULE_NOTIFY_SET: Set events
4097 * - REDISMODULE_NOTIFY_HASH: Hash events
4098 * - REDISMODULE_NOTIFY_ZSET: Sorted Set events
4099 * - REDISMODULE_NOTIFY_EXPIRED: Expiration events
4100 * - REDISMODULE_NOTIFY_EVICTED: Eviction events
4101 * - REDISMODULE_NOTIFY_STREAM: Stream events
4102 * - REDISMODULE_NOTIFY_ALL: All events
4103 *
4104 * We do not distinguish between key events and keyspace events, and it is up
4105 * to the module to filter the actions taken based on the key.
4106 *
4107 * The subscriber signature is:
4108 *
4109 * int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type,
4110 * const char *event,
4111 * RedisModuleString *key);
4112 *
4113 * `type` is the event type bit, that must match the mask given at registration
4114 * time. The event string is the actual command being executed, and key is the
4115 * relevant Redis key.
4116 *
4117 * Notification callback gets executed with a redis context that can not be
4118 * used to send anything to the client, and has the db number where the event
4119 * occurred as its selected db number.
4120 *
4121 * Notice that it is not necessary to enable notifications in redis.conf for
4122 * module notifications to work.
4123 *
4124 * Warning: the notification callbacks are performed in a synchronous manner,
4125 * so notification callbacks must to be fast, or they would slow Redis down.
4126 * If you need to take long actions, use threads to offload them.
4127 *
4128 * See https://redis.io/topics/notifications for more information.
4129 */
RM_SubscribeToKeyspaceEvents(RedisModuleCtx * ctx,int types,RedisModuleNotificationFunc callback)4130 int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
4131 RedisModuleKeyspaceSubscriber *sub = zmalloc(sizeof(*sub));
4132 sub->module = ctx->module;
4133 sub->event_mask = types;
4134 sub->notify_callback = callback;
4135 sub->active = 0;
4136
4137 listAddNodeTail(moduleKeyspaceSubscribers, sub);
4138 return REDISMODULE_OK;
4139 }
4140
4141 /* Dispatcher for keyspace notifications to module subscriber functions.
4142 * This gets called only if at least one module requested to be notified on
4143 * keyspace notifications */
moduleNotifyKeyspaceEvent(int type,const char * event,robj * key,int dbid)4144 void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
4145 /* Don't do anything if there aren't any subscribers */
4146 if (listLength(moduleKeyspaceSubscribers) == 0) return;
4147
4148 listIter li;
4149 listNode *ln;
4150 listRewind(moduleKeyspaceSubscribers,&li);
4151
4152 /* Remove irrelevant flags from the type mask */
4153 type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);
4154
4155 while((ln = listNext(&li))) {
4156 RedisModuleKeyspaceSubscriber *sub = ln->value;
4157 /* Only notify subscribers on events matching they registration,
4158 * and avoid subscribers triggering themselves */
4159 if ((sub->event_mask & type) && sub->active == 0) {
4160 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4161 ctx.module = sub->module;
4162 ctx.client = moduleFreeContextReusedClient;
4163 selectDb(ctx.client, dbid);
4164
4165 /* mark the handler as active to avoid reentrant loops.
4166 * If the subscriber performs an action triggering itself,
4167 * it will not be notified about it. */
4168 sub->active = 1;
4169 sub->notify_callback(&ctx, type, event, key);
4170 sub->active = 0;
4171 moduleFreeContext(&ctx);
4172 }
4173 }
4174 }
4175
4176 /* Unsubscribe any notification subscribers this module has upon unloading */
moduleUnsubscribeNotifications(RedisModule * module)4177 void moduleUnsubscribeNotifications(RedisModule *module) {
4178 listIter li;
4179 listNode *ln;
4180 listRewind(moduleKeyspaceSubscribers,&li);
4181 while((ln = listNext(&li))) {
4182 RedisModuleKeyspaceSubscriber *sub = ln->value;
4183 if (sub->module == module) {
4184 listDelNode(moduleKeyspaceSubscribers, ln);
4185 zfree(sub);
4186 }
4187 }
4188 }
4189
4190 /* --------------------------------------------------------------------------
4191 * Modules Cluster API
4192 * -------------------------------------------------------------------------- */
4193
4194 /* The Cluster message callback function pointer type. */
4195 typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
4196
4197 /* This structure identifies a registered caller: it must match a given module
4198 * ID, for a given message type. The callback function is just the function
4199 * that was registered as receiver. */
4200 typedef struct moduleClusterReceiver {
4201 uint64_t module_id;
4202 RedisModuleClusterMessageReceiver callback;
4203 struct RedisModule *module;
4204 struct moduleClusterReceiver *next;
4205 } moduleClusterReceiver;
4206
4207 typedef struct moduleClusterNodeInfo {
4208 int flags;
4209 char ip[NET_IP_STR_LEN];
4210 int port;
4211 char master_id[40]; /* Only if flags & REDISMODULE_NODE_MASTER is true. */
4212 } mdouleClusterNodeInfo;
4213
4214 /* We have an array of message types: each bucket is a linked list of
4215 * configured receivers. */
4216 static moduleClusterReceiver *clusterReceivers[UINT8_MAX];
4217
4218 /* Dispatch the message to the right module receiver. */
moduleCallClusterReceivers(const char * sender_id,uint64_t module_id,uint8_t type,const unsigned char * payload,uint32_t len)4219 void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len) {
4220 moduleClusterReceiver *r = clusterReceivers[type];
4221 while(r) {
4222 if (r->module_id == module_id) {
4223 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4224 ctx.module = r->module;
4225 ctx.client = moduleFreeContextReusedClient;
4226 selectDb(ctx.client, 0);
4227 r->callback(&ctx,sender_id,type,payload,len);
4228 moduleFreeContext(&ctx);
4229 return;
4230 }
4231 r = r->next;
4232 }
4233 }
4234
4235 /* Register a callback receiver for cluster messages of type 'type'. If there
4236 * was already a registered callback, this will replace the callback function
4237 * with the one provided, otherwise if the callback is set to NULL and there
4238 * is already a callback for this function, the callback is unregistered
4239 * (so this API call is also used in order to delete the receiver). */
RM_RegisterClusterMessageReceiver(RedisModuleCtx * ctx,uint8_t type,RedisModuleClusterMessageReceiver callback)4240 void RM_RegisterClusterMessageReceiver(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) {
4241 if (!server.cluster_enabled) return;
4242
4243 uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
4244 moduleClusterReceiver *r = clusterReceivers[type], *prev = NULL;
4245 while(r) {
4246 if (r->module_id == module_id) {
4247 /* Found! Set or delete. */
4248 if (callback) {
4249 r->callback = callback;
4250 } else {
4251 /* Delete the receiver entry if the user is setting
4252 * it to NULL. Just unlink the receiver node from the
4253 * linked list. */
4254 if (prev)
4255 prev->next = r->next;
4256 else
4257 clusterReceivers[type]->next = r->next;
4258 zfree(r);
4259 }
4260 return;
4261 }
4262 prev = r;
4263 r = r->next;
4264 }
4265
4266 /* Not found, let's add it. */
4267 if (callback) {
4268 r = zmalloc(sizeof(*r));
4269 r->module_id = module_id;
4270 r->module = ctx->module;
4271 r->callback = callback;
4272 r->next = clusterReceivers[type];
4273 clusterReceivers[type] = r;
4274 }
4275 }
4276
4277 /* Send a message to all the nodes in the cluster if `target` is NULL, otherwise
4278 * at the specified target, which is a REDISMODULE_NODE_ID_LEN bytes node ID, as
4279 * returned by the receiver callback or by the nodes iteration functions.
4280 *
4281 * The function returns REDISMODULE_OK if the message was successfully sent,
4282 * otherwise if the node is not connected or such node ID does not map to any
4283 * known cluster node, REDISMODULE_ERR is returned. */
RM_SendClusterMessage(RedisModuleCtx * ctx,char * target_id,uint8_t type,unsigned char * msg,uint32_t len)4284 int RM_SendClusterMessage(RedisModuleCtx *ctx, char *target_id, uint8_t type, unsigned char *msg, uint32_t len) {
4285 if (!server.cluster_enabled) return REDISMODULE_ERR;
4286 uint64_t module_id = moduleTypeEncodeId(ctx->module->name,0);
4287 if (clusterSendModuleMessageToTarget(target_id,module_id,type,msg,len) == C_OK)
4288 return REDISMODULE_OK;
4289 else
4290 return REDISMODULE_ERR;
4291 }
4292
4293 /* Return an array of string pointers, each string pointer points to a cluster
4294 * node ID of exactly REDISMODULE_NODE_ID_SIZE bytes (without any null term).
4295 * The number of returned node IDs is stored into `*numnodes`.
4296 * However if this function is called by a module not running an a Redis
4297 * instance with Redis Cluster enabled, NULL is returned instead.
4298 *
4299 * The IDs returned can be used with RedisModule_GetClusterNodeInfo() in order
4300 * to get more information about single nodes.
4301 *
4302 * The array returned by this function must be freed using the function
4303 * RedisModule_FreeClusterNodesList().
4304 *
4305 * Example:
4306 *
4307 * size_t count, j;
4308 * char **ids = RedisModule_GetClusterNodesList(ctx,&count);
4309 * for (j = 0; j < count; j++) {
4310 * RedisModule_Log("notice","Node %.*s",
4311 * REDISMODULE_NODE_ID_LEN,ids[j]);
4312 * }
4313 * RedisModule_FreeClusterNodesList(ids);
4314 */
RM_GetClusterNodesList(RedisModuleCtx * ctx,size_t * numnodes)4315 char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) {
4316 UNUSED(ctx);
4317
4318 if (!server.cluster_enabled) return NULL;
4319 size_t count = dictSize(server.cluster->nodes);
4320 char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN);
4321 dictIterator *di = dictGetIterator(server.cluster->nodes);
4322 dictEntry *de;
4323 int j = 0;
4324 while((de = dictNext(di)) != NULL) {
4325 clusterNode *node = dictGetVal(de);
4326 if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
4327 ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN);
4328 memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN);
4329 j++;
4330 }
4331 *numnodes = j;
4332 ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
4333 * to also get the count argument. */
4334 dictReleaseIterator(di);
4335 return ids;
4336 }
4337
4338 /* Free the node list obtained with RedisModule_GetClusterNodesList. */
RM_FreeClusterNodesList(char ** ids)4339 void RM_FreeClusterNodesList(char **ids) {
4340 if (ids == NULL) return;
4341 for (int j = 0; ids[j]; j++) zfree(ids[j]);
4342 zfree(ids);
4343 }
4344
4345 /* Return this node ID (REDISMODULE_CLUSTER_ID_LEN bytes) or NULL if the cluster
4346 * is disabled. */
RM_GetMyClusterID(void)4347 const char *RM_GetMyClusterID(void) {
4348 if (!server.cluster_enabled) return NULL;
4349 return server.cluster->myself->name;
4350 }
4351
4352 /* Return the number of nodes in the cluster, regardless of their state
4353 * (handshake, noaddress, ...) so that the number of active nodes may actually
4354 * be smaller, but not greater than this number. If the instance is not in
4355 * cluster mode, zero is returned. */
RM_GetClusterSize(void)4356 size_t RM_GetClusterSize(void) {
4357 if (!server.cluster_enabled) return 0;
4358 return dictSize(server.cluster->nodes);
4359 }
4360
4361 /* Populate the specified info for the node having as ID the specified 'id',
4362 * then returns REDISMODULE_OK. Otherwise if the node ID does not exist from
4363 * the POV of this local node, REDISMODULE_ERR is returned.
4364 *
4365 * The arguments ip, master_id, port and flags can be NULL in case we don't
4366 * need to populate back certain info. If an ip and master_id (only populated
4367 * if the instance is a slave) are specified, they point to buffers holding
4368 * at least REDISMODULE_NODE_ID_LEN bytes. The strings written back as ip
4369 * and master_id are not null terminated.
4370 *
4371 * The list of flags reported is the following:
4372 *
4373 * * REDISMODULE_NODE_MYSELF This node
4374 * * REDISMODULE_NODE_MASTER The node is a master
4375 * * REDISMODULE_NODE_SLAVE The node is a replica
4376 * * REDISMODULE_NODE_PFAIL We see the node as failing
4377 * * REDISMODULE_NODE_FAIL The cluster agrees the node is failing
4378 * * REDISMODULE_NODE_NOFAILOVER The slave is configured to never failover
4379 */
4380
4381 clusterNode *clusterLookupNode(const char *name); /* We need access to internals */
4382
RM_GetClusterNodeInfo(RedisModuleCtx * ctx,const char * id,char * ip,char * master_id,int * port,int * flags)4383 int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *master_id, int *port, int *flags) {
4384 UNUSED(ctx);
4385
4386 clusterNode *node = clusterLookupNode(id);
4387 if (node == NULL ||
4388 node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
4389 {
4390 return REDISMODULE_ERR;
4391 }
4392
4393 if (ip) strncpy(ip,node->ip,NET_IP_STR_LEN);
4394
4395 if (master_id) {
4396 /* If the information is not available, the function will set the
4397 * field to zero bytes, so that when the field can't be populated the
4398 * function kinda remains predictable. */
4399 if (node->flags & CLUSTER_NODE_MASTER && node->slaveof)
4400 memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
4401 else
4402 memset(master_id,0,REDISMODULE_NODE_ID_LEN);
4403 }
4404 if (port) *port = node->port;
4405
4406 /* As usually we have to remap flags for modules, in order to ensure
4407 * we can provide binary compatibility. */
4408 if (flags) {
4409 *flags = 0;
4410 if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF;
4411 if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER;
4412 if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE;
4413 if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL;
4414 if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL;
4415 if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER;
4416 }
4417 return REDISMODULE_OK;
4418 }
4419
4420 /* Set Redis Cluster flags in order to change the normal behavior of
4421 * Redis Cluster, especially with the goal of disabling certain functions.
4422 * This is useful for modules that use the Cluster API in order to create
4423 * a different distributed system, but still want to use the Redis Cluster
4424 * message bus. Flags that can be set:
4425 *
4426 * CLUSTER_MODULE_FLAG_NO_FAILOVER
4427 * CLUSTER_MODULE_FLAG_NO_REDIRECTION
4428 *
4429 * With the following effects:
4430 *
4431 * NO_FAILOVER: prevent Redis Cluster slaves to failover a failing master.
4432 * Also disables the replica migration feature.
4433 *
4434 * NO_REDIRECTION: Every node will accept any key, without trying to perform
4435 * partitioning according to the user Redis Cluster algorithm.
4436 * Slots informations will still be propagated across the
4437 * cluster, but without effects. */
RM_SetClusterFlags(RedisModuleCtx * ctx,uint64_t flags)4438 void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) {
4439 UNUSED(ctx);
4440 if (flags & REDISMODULE_CLUSTER_FLAG_NO_FAILOVER)
4441 server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_FAILOVER;
4442 if (flags & REDISMODULE_CLUSTER_FLAG_NO_REDIRECTION)
4443 server.cluster_module_flags |= CLUSTER_MODULE_FLAG_NO_REDIRECTION;
4444 }
4445
4446 /* --------------------------------------------------------------------------
4447 * Modules Timers API
4448 *
4449 * Module timers are an high precision "green timers" abstraction where
4450 * every module can register even millions of timers without problems, even if
4451 * the actual event loop will just have a single timer that is used to awake the
4452 * module timers subsystem in order to process the next event.
4453 *
4454 * All the timers are stored into a radix tree, ordered by expire time, when
4455 * the main Redis event loop timer callback is called, we try to process all
4456 * the timers already expired one after the other. Then we re-enter the event
4457 * loop registering a timer that will expire when the next to process module
4458 * timer will expire.
4459 *
4460 * Every time the list of active timers drops to zero, we unregister the
4461 * main event loop timer, so that there is no overhead when such feature is
4462 * not used.
4463 * -------------------------------------------------------------------------- */
4464
4465 static rax *Timers; /* The radix tree of all the timers sorted by expire. */
4466 long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */
4467
4468 typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
4469
4470 /* The timer descriptor, stored as value in the radix tree. */
4471 typedef struct RedisModuleTimer {
4472 RedisModule *module; /* Module reference. */
4473 RedisModuleTimerProc callback; /* The callback to invoke on expire. */
4474 void *data; /* Private data for the callback. */
4475 int dbid; /* Database number selected by the original client. */
4476 } RedisModuleTimer;
4477
4478 /* This is the timer handler that is called by the main event loop. We schedule
4479 * this timer to be called when the nearest of our module timers will expire. */
moduleTimerHandler(struct aeEventLoop * eventLoop,long long id,void * clientData)4480 int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *clientData) {
4481 UNUSED(eventLoop);
4482 UNUSED(id);
4483 UNUSED(clientData);
4484
4485 /* To start let's try to fire all the timers already expired. */
4486 raxIterator ri;
4487 raxStart(&ri,Timers);
4488 uint64_t now = ustime();
4489 long long next_period = 0;
4490 while(1) {
4491 raxSeek(&ri,"^",NULL,0);
4492 if (!raxNext(&ri)) break;
4493 uint64_t expiretime;
4494 memcpy(&expiretime,ri.key,sizeof(expiretime));
4495 expiretime = ntohu64(expiretime);
4496 if (now >= expiretime) {
4497 RedisModuleTimer *timer = ri.data;
4498 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
4499
4500 ctx.module = timer->module;
4501 ctx.client = moduleFreeContextReusedClient;
4502 selectDb(ctx.client, timer->dbid);
4503 timer->callback(&ctx,timer->data);
4504 moduleFreeContext(&ctx);
4505 raxRemove(Timers,(unsigned char*)ri.key,ri.key_len,NULL);
4506 zfree(timer);
4507 } else {
4508 next_period = (expiretime-now)/1000; /* Scale to milliseconds. */
4509 break;
4510 }
4511 }
4512 raxStop(&ri);
4513
4514 /* Reschedule the next timer or cancel it. */
4515 if (next_period <= 0) next_period = 1;
4516 return (raxSize(Timers) > 0) ? next_period : AE_NOMORE;
4517 }
4518
4519 /* Create a new timer that will fire after `period` milliseconds, and will call
4520 * the specified function using `data` as argument. The returned timer ID can be
4521 * used to get information from the timer or to stop it before it fires. */
RM_CreateTimer(RedisModuleCtx * ctx,mstime_t period,RedisModuleTimerProc callback,void * data)4522 RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
4523 RedisModuleTimer *timer = zmalloc(sizeof(*timer));
4524 timer->module = ctx->module;
4525 timer->callback = callback;
4526 timer->data = data;
4527 timer->dbid = ctx->client->db->id;
4528 uint64_t expiretime = ustime()+period*1000;
4529 uint64_t key;
4530
4531 while(1) {
4532 key = htonu64(expiretime);
4533 if (raxFind(Timers, (unsigned char*)&key,sizeof(key)) == raxNotFound) {
4534 raxInsert(Timers,(unsigned char*)&key,sizeof(key),timer,NULL);
4535 break;
4536 } else {
4537 expiretime++;
4538 }
4539 }
4540
4541 /* We need to install the main event loop timer if it's not already
4542 * installed, or we may need to refresh its period if we just installed
4543 * a timer that will expire sooner than any other else. */
4544 if (aeTimer != -1) {
4545 raxIterator ri;
4546 raxStart(&ri,Timers);
4547 raxSeek(&ri,"^",NULL,0);
4548 raxNext(&ri);
4549 if (memcmp(ri.key,&key,sizeof(key)) == 0) {
4550 /* This is the first key, we need to re-install the timer according
4551 * to the just added event. */
4552 aeDeleteTimeEvent(server.el,aeTimer);
4553 aeTimer = -1;
4554 }
4555 raxStop(&ri);
4556 }
4557
4558 /* If we have no main timer (the old one was invalidated, or this is the
4559 * first module timer we have), install one. */
4560 if (aeTimer == -1)
4561 aeTimer = aeCreateTimeEvent(server.el,period,moduleTimerHandler,NULL,NULL);
4562
4563 return key;
4564 }
4565
4566 /* Stop a timer, returns REDISMODULE_OK if the timer was found, belonged to the
4567 * calling module, and was stopped, otherwise REDISMODULE_ERR is returned.
4568 * If not NULL, the data pointer is set to the value of the data argument when
4569 * the timer was created. */
RM_StopTimer(RedisModuleCtx * ctx,RedisModuleTimerID id,void ** data)4570 int RM_StopTimer(RedisModuleCtx *ctx, RedisModuleTimerID id, void **data) {
4571 RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id));
4572 if (timer == raxNotFound || timer->module != ctx->module)
4573 return REDISMODULE_ERR;
4574 if (data) *data = timer->data;
4575 raxRemove(Timers,(unsigned char*)&id,sizeof(id),NULL);
4576 zfree(timer);
4577 return REDISMODULE_OK;
4578 }
4579
4580 /* Obtain information about a timer: its remaining time before firing
4581 * (in milliseconds), and the private data pointer associated with the timer.
4582 * If the timer specified does not exist or belongs to a different module
4583 * no information is returned and the function returns REDISMODULE_ERR, otherwise
4584 * REDISMODULE_OK is returned. The arguments remaining or data can be NULL if
4585 * the caller does not need certain information. */
RM_GetTimerInfo(RedisModuleCtx * ctx,RedisModuleTimerID id,uint64_t * remaining,void ** data)4586 int RM_GetTimerInfo(RedisModuleCtx *ctx, RedisModuleTimerID id, uint64_t *remaining, void **data) {
4587 RedisModuleTimer *timer = raxFind(Timers,(unsigned char*)&id,sizeof(id));
4588 if (timer == raxNotFound || timer->module != ctx->module)
4589 return REDISMODULE_ERR;
4590 if (remaining) {
4591 int64_t rem = ntohu64(id)-ustime();
4592 if (rem < 0) rem = 0;
4593 *remaining = rem/1000; /* Scale to milliseconds. */
4594 }
4595 if (data) *data = timer->data;
4596 return REDISMODULE_OK;
4597 }
4598
4599 /* --------------------------------------------------------------------------
4600 * Modules Dictionary API
4601 *
4602 * Implements a sorted dictionary (actually backed by a radix tree) with
4603 * the usual get / set / del / num-items API, together with an iterator
4604 * capable of going back and forth.
4605 * -------------------------------------------------------------------------- */
4606
4607 /* Create a new dictionary. The 'ctx' pointer can be the current module context
4608 * or NULL, depending on what you want. Please follow the following rules:
4609 *
4610 * 1. Use a NULL context if you plan to retain a reference to this dictionary
4611 * that will survive the time of the module callback where you created it.
4612 * 2. Use a NULL context if no context is available at the time you are creating
4613 * the dictionary (of course...).
4614 * 3. However use the current callback context as 'ctx' argument if the
4615 * dictionary time to live is just limited to the callback scope. In this
4616 * case, if enabled, you can enjoy the automatic memory management that will
4617 * reclaim the dictionary memory, as well as the strings returned by the
4618 * Next / Prev dictionary iterator calls.
4619 */
RM_CreateDict(RedisModuleCtx * ctx)4620 RedisModuleDict *RM_CreateDict(RedisModuleCtx *ctx) {
4621 struct RedisModuleDict *d = zmalloc(sizeof(*d));
4622 d->rax = raxNew();
4623 if (ctx != NULL) autoMemoryAdd(ctx,REDISMODULE_AM_DICT,d);
4624 return d;
4625 }
4626
4627 /* Free a dictionary created with RM_CreateDict(). You need to pass the
4628 * context pointer 'ctx' only if the dictionary was created using the
4629 * context instead of passing NULL. */
RM_FreeDict(RedisModuleCtx * ctx,RedisModuleDict * d)4630 void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d) {
4631 if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_DICT,d);
4632 raxFree(d->rax);
4633 zfree(d);
4634 }
4635
4636 /* Return the size of the dictionary (number of keys). */
RM_DictSize(RedisModuleDict * d)4637 uint64_t RM_DictSize(RedisModuleDict *d) {
4638 return raxSize(d->rax);
4639 }
4640
4641 /* Store the specified key into the dictionary, setting its value to the
4642 * pointer 'ptr'. If the key was added with success, since it did not
4643 * already exist, REDISMODULE_OK is returned. Otherwise if the key already
4644 * exists the function returns REDISMODULE_ERR. */
RM_DictSetC(RedisModuleDict * d,void * key,size_t keylen,void * ptr)4645 int RM_DictSetC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) {
4646 int retval = raxTryInsert(d->rax,key,keylen,ptr,NULL);
4647 return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR;
4648 }
4649
4650 /* Like RedisModule_DictSetC() but will replace the key with the new
4651 * value if the key already exists. */
RM_DictReplaceC(RedisModuleDict * d,void * key,size_t keylen,void * ptr)4652 int RM_DictReplaceC(RedisModuleDict *d, void *key, size_t keylen, void *ptr) {
4653 int retval = raxInsert(d->rax,key,keylen,ptr,NULL);
4654 return (retval == 1) ? REDISMODULE_OK : REDISMODULE_ERR;
4655 }
4656
4657 /* Like RedisModule_DictSetC() but takes the key as a RedisModuleString. */
RM_DictSet(RedisModuleDict * d,RedisModuleString * key,void * ptr)4658 int RM_DictSet(RedisModuleDict *d, RedisModuleString *key, void *ptr) {
4659 return RM_DictSetC(d,key->ptr,sdslen(key->ptr),ptr);
4660 }
4661
4662 /* Like RedisModule_DictReplaceC() but takes the key as a RedisModuleString. */
RM_DictReplace(RedisModuleDict * d,RedisModuleString * key,void * ptr)4663 int RM_DictReplace(RedisModuleDict *d, RedisModuleString *key, void *ptr) {
4664 return RM_DictReplaceC(d,key->ptr,sdslen(key->ptr),ptr);
4665 }
4666
4667 /* Return the value stored at the specified key. The function returns NULL
4668 * both in the case the key does not exist, or if you actually stored
4669 * NULL at key. So, optionally, if the 'nokey' pointer is not NULL, it will
4670 * be set by reference to 1 if the key does not exist, or to 0 if the key
4671 * exists. */
RM_DictGetC(RedisModuleDict * d,void * key,size_t keylen,int * nokey)4672 void *RM_DictGetC(RedisModuleDict *d, void *key, size_t keylen, int *nokey) {
4673 void *res = raxFind(d->rax,key,keylen);
4674 if (nokey) *nokey = (res == raxNotFound);
4675 return (res == raxNotFound) ? NULL : res;
4676 }
4677
4678 /* Like RedisModule_DictGetC() but takes the key as a RedisModuleString. */
RM_DictGet(RedisModuleDict * d,RedisModuleString * key,int * nokey)4679 void *RM_DictGet(RedisModuleDict *d, RedisModuleString *key, int *nokey) {
4680 return RM_DictGetC(d,key->ptr,sdslen(key->ptr),nokey);
4681 }
4682
4683 /* Remove the specified key from the dictionary, returning REDISMODULE_OK if
4684 * the key was found and delted, or REDISMODULE_ERR if instead there was
4685 * no such key in the dictionary. When the operation is successful, if
4686 * 'oldval' is not NULL, then '*oldval' is set to the value stored at the
4687 * key before it was deleted. Using this feature it is possible to get
4688 * a pointer to the value (for instance in order to release it), without
4689 * having to call RedisModule_DictGet() before deleting the key. */
RM_DictDelC(RedisModuleDict * d,void * key,size_t keylen,void * oldval)4690 int RM_DictDelC(RedisModuleDict *d, void *key, size_t keylen, void *oldval) {
4691 int retval = raxRemove(d->rax,key,keylen,oldval);
4692 return retval ? REDISMODULE_OK : REDISMODULE_ERR;
4693 }
4694
4695 /* Like RedisModule_DictDelC() but gets the key as a RedisModuleString. */
RM_DictDel(RedisModuleDict * d,RedisModuleString * key,void * oldval)4696 int RM_DictDel(RedisModuleDict *d, RedisModuleString *key, void *oldval) {
4697 return RM_DictDelC(d,key->ptr,sdslen(key->ptr),oldval);
4698 }
4699
4700 /* Return an interator, setup in order to start iterating from the specified
4701 * key by applying the operator 'op', which is just a string specifying the
4702 * comparison operator to use in order to seek the first element. The
4703 * operators avalable are:
4704 *
4705 * "^" -- Seek the first (lexicographically smaller) key.
4706 * "$" -- Seek the last (lexicographically biffer) key.
4707 * ">" -- Seek the first element greter than the specified key.
4708 * ">=" -- Seek the first element greater or equal than the specified key.
4709 * "<" -- Seek the first element smaller than the specified key.
4710 * "<=" -- Seek the first element smaller or equal than the specified key.
4711 * "==" -- Seek the first element matching exactly the specified key.
4712 *
4713 * Note that for "^" and "$" the passed key is not used, and the user may
4714 * just pass NULL with a length of 0.
4715 *
4716 * If the element to start the iteration cannot be seeked based on the
4717 * key and operator passed, RedisModule_DictNext() / Prev() will just return
4718 * REDISMODULE_ERR at the first call, otherwise they'll produce elements.
4719 */
RM_DictIteratorStartC(RedisModuleDict * d,const char * op,void * key,size_t keylen)4720 RedisModuleDictIter *RM_DictIteratorStartC(RedisModuleDict *d, const char *op, void *key, size_t keylen) {
4721 RedisModuleDictIter *di = zmalloc(sizeof(*di));
4722 di->dict = d;
4723 raxStart(&di->ri,d->rax);
4724 raxSeek(&di->ri,op,key,keylen);
4725 return di;
4726 }
4727
4728 /* Exactly like RedisModule_DictIteratorStartC, but the key is passed as a
4729 * RedisModuleString. */
RM_DictIteratorStart(RedisModuleDict * d,const char * op,RedisModuleString * key)4730 RedisModuleDictIter *RM_DictIteratorStart(RedisModuleDict *d, const char *op, RedisModuleString *key) {
4731 return RM_DictIteratorStartC(d,op,key->ptr,sdslen(key->ptr));
4732 }
4733
4734 /* Release the iterator created with RedisModule_DictIteratorStart(). This call
4735 * is mandatory otherwise a memory leak is introduced in the module. */
RM_DictIteratorStop(RedisModuleDictIter * di)4736 void RM_DictIteratorStop(RedisModuleDictIter *di) {
4737 raxStop(&di->ri);
4738 zfree(di);
4739 }
4740
4741 /* After its creation with RedisModule_DictIteratorStart(), it is possible to
4742 * change the currently selected element of the iterator by using this
4743 * API call. The result based on the operator and key is exactly like
4744 * the function RedisModule_DictIteratorStart(), however in this case the
4745 * return value is just REDISMODULE_OK in case the seeked element was found,
4746 * or REDISMODULE_ERR in case it was not possible to seek the specified
4747 * element. It is possible to reseek an iterator as many times as you want. */
RM_DictIteratorReseekC(RedisModuleDictIter * di,const char * op,void * key,size_t keylen)4748 int RM_DictIteratorReseekC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) {
4749 return raxSeek(&di->ri,op,key,keylen);
4750 }
4751
4752 /* Like RedisModule_DictIteratorReseekC() but takes the key as as a
4753 * RedisModuleString. */
RM_DictIteratorReseek(RedisModuleDictIter * di,const char * op,RedisModuleString * key)4754 int RM_DictIteratorReseek(RedisModuleDictIter *di, const char *op, RedisModuleString *key) {
4755 return RM_DictIteratorReseekC(di,op,key->ptr,sdslen(key->ptr));
4756 }
4757
4758 /* Return the current item of the dictionary iterator 'di' and steps to the
4759 * next element. If the iterator already yield the last element and there
4760 * are no other elements to return, NULL is returned, otherwise a pointer
4761 * to a string representing the key is provided, and the '*keylen' length
4762 * is set by reference (if keylen is not NULL). The '*dataptr', if not NULL
4763 * is set to the value of the pointer stored at the returned key as auxiliary
4764 * data (as set by the RedisModule_DictSet API).
4765 *
4766 * Usage example:
4767 *
4768 * ... create the iterator here ...
4769 * char *key;
4770 * void *data;
4771 * while((key = RedisModule_DictNextC(iter,&keylen,&data)) != NULL) {
4772 * printf("%.*s %p\n", (int)keylen, key, data);
4773 * }
4774 *
4775 * The returned pointer is of type void because sometimes it makes sense
4776 * to cast it to a char* sometimes to an unsigned char* depending on the
4777 * fact it contains or not binary data, so this API ends being more
4778 * comfortable to use.
4779 *
4780 * The validity of the returned pointer is until the next call to the
4781 * next/prev iterator step. Also the pointer is no longer valid once the
4782 * iterator is released. */
RM_DictNextC(RedisModuleDictIter * di,size_t * keylen,void ** dataptr)4783 void *RM_DictNextC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) {
4784 if (!raxNext(&di->ri)) return NULL;
4785 if (keylen) *keylen = di->ri.key_len;
4786 if (dataptr) *dataptr = di->ri.data;
4787 return di->ri.key;
4788 }
4789
4790 /* This function is exactly like RedisModule_DictNext() but after returning
4791 * the currently selected element in the iterator, it selects the previous
4792 * element (laxicographically smaller) instead of the next one. */
RM_DictPrevC(RedisModuleDictIter * di,size_t * keylen,void ** dataptr)4793 void *RM_DictPrevC(RedisModuleDictIter *di, size_t *keylen, void **dataptr) {
4794 if (!raxPrev(&di->ri)) return NULL;
4795 if (keylen) *keylen = di->ri.key_len;
4796 if (dataptr) *dataptr = di->ri.data;
4797 return di->ri.key;
4798 }
4799
4800 /* Like RedisModuleNextC(), but instead of returning an internally allocated
4801 * buffer and key length, it returns directly a module string object allocated
4802 * in the specified context 'ctx' (that may be NULL exactly like for the main
4803 * API RedisModule_CreateString).
4804 *
4805 * The returned string object should be deallocated after use, either manually
4806 * or by using a context that has automatic memory management active. */
RM_DictNext(RedisModuleCtx * ctx,RedisModuleDictIter * di,void ** dataptr)4807 RedisModuleString *RM_DictNext(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) {
4808 size_t keylen;
4809 void *key = RM_DictNextC(di,&keylen,dataptr);
4810 if (key == NULL) return NULL;
4811 return RM_CreateString(ctx,key,keylen);
4812 }
4813
4814 /* Like RedisModule_DictNext() but after returning the currently selected
4815 * element in the iterator, it selects the previous element (laxicographically
4816 * smaller) instead of the next one. */
RM_DictPrev(RedisModuleCtx * ctx,RedisModuleDictIter * di,void ** dataptr)4817 RedisModuleString *RM_DictPrev(RedisModuleCtx *ctx, RedisModuleDictIter *di, void **dataptr) {
4818 size_t keylen;
4819 void *key = RM_DictPrevC(di,&keylen,dataptr);
4820 if (key == NULL) return NULL;
4821 return RM_CreateString(ctx,key,keylen);
4822 }
4823
4824 /* Compare the element currently pointed by the iterator to the specified
4825 * element given by key/keylen, according to the operator 'op' (the set of
4826 * valid operators are the same valid for RedisModule_DictIteratorStart).
4827 * If the comparision is successful the command returns REDISMODULE_OK
4828 * otherwise REDISMODULE_ERR is returned.
4829 *
4830 * This is useful when we want to just emit a lexicographical range, so
4831 * in the loop, as we iterate elements, we can also check if we are still
4832 * on range.
4833 *
4834 * The function returne REDISMODULE_ERR if the iterator reached the
4835 * end of elements condition as well. */
RM_DictCompareC(RedisModuleDictIter * di,const char * op,void * key,size_t keylen)4836 int RM_DictCompareC(RedisModuleDictIter *di, const char *op, void *key, size_t keylen) {
4837 if (raxEOF(&di->ri)) return REDISMODULE_ERR;
4838 int res = raxCompare(&di->ri,op,key,keylen);
4839 return res ? REDISMODULE_OK : REDISMODULE_ERR;
4840 }
4841
4842 /* Like RedisModule_DictCompareC but gets the key to compare with the current
4843 * iterator key as a RedisModuleString. */
RM_DictCompare(RedisModuleDictIter * di,const char * op,RedisModuleString * key)4844 int RM_DictCompare(RedisModuleDictIter *di, const char *op, RedisModuleString *key) {
4845 if (raxEOF(&di->ri)) return REDISMODULE_ERR;
4846 int res = raxCompare(&di->ri,op,key->ptr,sdslen(key->ptr));
4847 return res ? REDISMODULE_OK : REDISMODULE_ERR;
4848 }
4849
4850 /* --------------------------------------------------------------------------
4851 * Modules utility APIs
4852 * -------------------------------------------------------------------------- */
4853
4854 /* Return random bytes using SHA1 in counter mode with a /dev/urandom
4855 * initialized seed. This function is fast so can be used to generate
4856 * many bytes without any effect on the operating system entropy pool.
4857 * Currently this function is not thread safe. */
RM_GetRandomBytes(unsigned char * dst,size_t len)4858 void RM_GetRandomBytes(unsigned char *dst, size_t len) {
4859 getRandomBytes(dst,len);
4860 }
4861
4862 /* Like RedisModule_GetRandomBytes() but instead of setting the string to
4863 * random bytes the string is set to random characters in the in the
4864 * hex charset [0-9a-f]. */
RM_GetRandomHexChars(char * dst,size_t len)4865 void RM_GetRandomHexChars(char *dst, size_t len) {
4866 getRandomHexChars(dst,len);
4867 }
4868
4869 /* --------------------------------------------------------------------------
4870 * Modules API exporting / importing
4871 * -------------------------------------------------------------------------- */
4872
4873 /* This function is called by a module in order to export some API with a
4874 * given name. Other modules will be able to use this API by calling the
4875 * symmetrical function RM_GetSharedAPI() and casting the return value to
4876 * the right function pointer.
4877 *
4878 * The function will return REDISMODULE_OK if the name is not already taken,
4879 * otherwise REDISMODULE_ERR will be returned and no operation will be
4880 * performed.
4881 *
4882 * IMPORTANT: the apiname argument should be a string literal with static
4883 * lifetime. The API relies on the fact that it will always be valid in
4884 * the future. */
RM_ExportSharedAPI(RedisModuleCtx * ctx,const char * apiname,void * func)4885 int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) {
4886 RedisModuleSharedAPI *sapi = zmalloc(sizeof(*sapi));
4887 sapi->module = ctx->module;
4888 sapi->func = func;
4889 if (dictAdd(server.sharedapi, (char*)apiname, sapi) != DICT_OK) {
4890 zfree(sapi);
4891 return REDISMODULE_ERR;
4892 }
4893 return REDISMODULE_OK;
4894 }
4895
4896 /* Request an exported API pointer. The return value is just a void pointer
4897 * that the caller of this function will be required to cast to the right
4898 * function pointer, so this is a private contract between modules.
4899 *
4900 * If the requested API is not available then NULL is returned. Because
4901 * modules can be loaded at different times with different order, this
4902 * function calls should be put inside some module generic API registering
4903 * step, that is called every time a module attempts to execute a
4904 * command that requires external APIs: if some API cannot be resolved, the
4905 * command should return an error.
4906 *
4907 * Here is an exmaple:
4908 *
4909 * int ... myCommandImplementation() {
4910 * if (getExternalAPIs() == 0) {
4911 * reply with an error here if we cannot have the APIs
4912 * }
4913 * // Use the API:
4914 * myFunctionPointer(foo);
4915 * }
4916 *
4917 * And the function registerAPI() is:
4918 *
4919 * int getExternalAPIs(void) {
4920 * static int api_loaded = 0;
4921 * if (api_loaded != 0) return 1; // APIs already resolved.
4922 *
4923 * myFunctionPointer = RedisModule_GetOtherModuleAPI("...");
4924 * if (myFunctionPointer == NULL) return 0;
4925 *
4926 * return 1;
4927 * }
4928 */
RM_GetSharedAPI(RedisModuleCtx * ctx,const char * apiname)4929 void *RM_GetSharedAPI(RedisModuleCtx *ctx, const char *apiname) {
4930 dictEntry *de = dictFind(server.sharedapi, apiname);
4931 if (de == NULL) return NULL;
4932 RedisModuleSharedAPI *sapi = dictGetVal(de);
4933 if (listSearchKey(sapi->module->usedby,ctx->module) == NULL) {
4934 listAddNodeTail(sapi->module->usedby,ctx->module);
4935 listAddNodeTail(ctx->module->using,sapi->module);
4936 }
4937 return sapi->func;
4938 }
4939
4940 /* Remove all the APIs registered by the specified module. Usually you
4941 * want this when the module is going to be unloaded. This function
4942 * assumes that's caller responsibility to make sure the APIs are not
4943 * used by other modules.
4944 *
4945 * The number of unregistered APIs is returned. */
moduleUnregisterSharedAPI(RedisModule * module)4946 int moduleUnregisterSharedAPI(RedisModule *module) {
4947 int count = 0;
4948 dictIterator *di = dictGetSafeIterator(server.sharedapi);
4949 dictEntry *de;
4950 while ((de = dictNext(di)) != NULL) {
4951 const char *apiname = dictGetKey(de);
4952 RedisModuleSharedAPI *sapi = dictGetVal(de);
4953 if (sapi->module == module) {
4954 dictDelete(server.sharedapi,apiname);
4955 zfree(sapi);
4956 count++;
4957 }
4958 }
4959 dictReleaseIterator(di);
4960 return count;
4961 }
4962
4963 /* Remove the specified module as an user of APIs of ever other module.
4964 * This is usually called when a module is unloaded.
4965 *
4966 * Returns the number of modules this module was using APIs from. */
moduleUnregisterUsedAPI(RedisModule * module)4967 int moduleUnregisterUsedAPI(RedisModule *module) {
4968 listIter li;
4969 listNode *ln;
4970 int count = 0;
4971
4972 listRewind(module->using,&li);
4973 while((ln = listNext(&li))) {
4974 RedisModule *used = ln->value;
4975 listNode *ln = listSearchKey(used->usedby,module);
4976 if (ln) {
4977 listDelNode(module->using,ln);
4978 count++;
4979 }
4980 }
4981 return count;
4982 }
4983
4984 /* Unregister all filters registered by a module.
4985 * This is called when a module is being unloaded.
4986 *
4987 * Returns the number of filters unregistered. */
moduleUnregisterFilters(RedisModule * module)4988 int moduleUnregisterFilters(RedisModule *module) {
4989 listIter li;
4990 listNode *ln;
4991 int count = 0;
4992
4993 listRewind(module->filters,&li);
4994 while((ln = listNext(&li))) {
4995 RedisModuleCommandFilter *filter = ln->value;
4996 listNode *ln = listSearchKey(moduleCommandFilters,filter);
4997 if (ln) {
4998 listDelNode(moduleCommandFilters,ln);
4999 count++;
5000 }
5001 zfree(filter);
5002 }
5003 return count;
5004 }
5005
5006 /* --------------------------------------------------------------------------
5007 * Module Command Filter API
5008 * -------------------------------------------------------------------------- */
5009
5010 /* Register a new command filter function.
5011 *
5012 * Command filtering makes it possible for modules to extend Redis by plugging
5013 * into the execution flow of all commands.
5014 *
5015 * A registered filter gets called before Redis executes *any* command. This
5016 * includes both core Redis commands and commands registered by any module. The
5017 * filter applies in all execution paths including:
5018 *
5019 * 1. Invocation by a client.
5020 * 2. Invocation through `RedisModule_Call()` by any module.
5021 * 3. Invocation through Lua 'redis.call()`.
5022 * 4. Replication of a command from a master.
5023 *
5024 * The filter executes in a special filter context, which is different and more
5025 * limited than a RedisModuleCtx. Because the filter affects any command, it
5026 * must be implemented in a very efficient way to reduce the performance impact
5027 * on Redis. All Redis Module API calls that require a valid context (such as
5028 * `RedisModule_Call()`, `RedisModule_OpenKey()`, etc.) are not supported in a
5029 * filter context.
5030 *
5031 * The `RedisModuleCommandFilterCtx` can be used to inspect or modify the
5032 * executed command and its arguments. As the filter executes before Redis
5033 * begins processing the command, any change will affect the way the command is
5034 * processed. For example, a module can override Redis commands this way:
5035 *
5036 * 1. Register a `MODULE.SET` command which implements an extended version of
5037 * the Redis `SET` command.
5038 * 2. Register a command filter which detects invocation of `SET` on a specific
5039 * pattern of keys. Once detected, the filter will replace the first
5040 * argument from `SET` to `MODULE.SET`.
5041 * 3. When filter execution is complete, Redis considers the new command name
5042 * and therefore executes the module's own command.
5043 *
5044 * Note that in the above use case, if `MODULE.SET` itself uses
5045 * `RedisModule_Call()` the filter will be applied on that call as well. If
5046 * that is not desired, the `REDISMODULE_CMDFILTER_NOSELF` flag can be set when
5047 * registering the filter.
5048 *
5049 * The `REDISMODULE_CMDFILTER_NOSELF` flag prevents execution flows that
5050 * originate from the module's own `RM_Call()` from reaching the filter. This
5051 * flag is effective for all execution flows, including nested ones, as long as
5052 * the execution begins from the module's command context or a thread-safe
5053 * context that is associated with a blocking command.
5054 *
5055 * Detached thread-safe contexts are *not* associated with the module and cannot
5056 * be protected by this flag.
5057 *
5058 * If multiple filters are registered (by the same or different modules), they
5059 * are executed in the order of registration.
5060 */
5061
RM_RegisterCommandFilter(RedisModuleCtx * ctx,RedisModuleCommandFilterFunc callback,int flags)5062 RedisModuleCommandFilter *RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback, int flags) {
5063 RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter));
5064 filter->module = ctx->module;
5065 filter->callback = callback;
5066 filter->flags = flags;
5067
5068 listAddNodeTail(moduleCommandFilters, filter);
5069 listAddNodeTail(ctx->module->filters, filter);
5070 return filter;
5071 }
5072
5073 /* Unregister a command filter.
5074 */
RM_UnregisterCommandFilter(RedisModuleCtx * ctx,RedisModuleCommandFilter * filter)5075 int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *filter) {
5076 listNode *ln;
5077
5078 /* A module can only remove its own filters */
5079 if (filter->module != ctx->module) return REDISMODULE_ERR;
5080
5081 ln = listSearchKey(moduleCommandFilters,filter);
5082 if (!ln) return REDISMODULE_ERR;
5083 listDelNode(moduleCommandFilters,ln);
5084
5085 ln = listSearchKey(ctx->module->filters,filter);
5086 if (!ln) return REDISMODULE_ERR; /* Shouldn't happen */
5087 listDelNode(ctx->module->filters,ln);
5088
5089 return REDISMODULE_OK;
5090 }
5091
moduleCallCommandFilters(client * c)5092 void moduleCallCommandFilters(client *c) {
5093 if (listLength(moduleCommandFilters) == 0) return;
5094
5095 listIter li;
5096 listNode *ln;
5097 listRewind(moduleCommandFilters,&li);
5098
5099 RedisModuleCommandFilterCtx filter = {
5100 .argv = c->argv,
5101 .argc = c->argc
5102 };
5103
5104 while((ln = listNext(&li))) {
5105 RedisModuleCommandFilter *f = ln->value;
5106
5107 /* Skip filter if REDISMODULE_CMDFILTER_NOSELF is set and module is
5108 * currently processing a command.
5109 */
5110 if ((f->flags & REDISMODULE_CMDFILTER_NOSELF) && f->module->in_call) continue;
5111
5112 /* Call filter */
5113 f->callback(&filter);
5114 }
5115
5116 c->argv = filter.argv;
5117 c->argc = filter.argc;
5118 }
5119
5120 /* Return the number of arguments a filtered command has. The number of
5121 * arguments include the command itself.
5122 */
RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx * fctx)5123 int RM_CommandFilterArgsCount(RedisModuleCommandFilterCtx *fctx)
5124 {
5125 return fctx->argc;
5126 }
5127
5128 /* Return the specified command argument. The first argument (position 0) is
5129 * the command itself, and the rest are user-provided args.
5130 */
RM_CommandFilterArgGet(RedisModuleCommandFilterCtx * fctx,int pos)5131 const RedisModuleString *RM_CommandFilterArgGet(RedisModuleCommandFilterCtx *fctx, int pos)
5132 {
5133 if (pos < 0 || pos >= fctx->argc) return NULL;
5134 return fctx->argv[pos];
5135 }
5136
5137 /* Modify the filtered command by inserting a new argument at the specified
5138 * position. The specified RedisModuleString argument may be used by Redis
5139 * after the filter context is destroyed, so it must not be auto-memory
5140 * allocated, freed or used elsewhere.
5141 */
5142
RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx * fctx,int pos,RedisModuleString * arg)5143 int RM_CommandFilterArgInsert(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
5144 {
5145 int i;
5146
5147 if (pos < 0 || pos > fctx->argc) return REDISMODULE_ERR;
5148
5149 fctx->argv = zrealloc(fctx->argv, (fctx->argc+1)*sizeof(RedisModuleString *));
5150 for (i = fctx->argc; i > pos; i--) {
5151 fctx->argv[i] = fctx->argv[i-1];
5152 }
5153 fctx->argv[pos] = arg;
5154 fctx->argc++;
5155
5156 return REDISMODULE_OK;
5157 }
5158
5159 /* Modify the filtered command by replacing an existing argument with a new one.
5160 * The specified RedisModuleString argument may be used by Redis after the
5161 * filter context is destroyed, so it must not be auto-memory allocated, freed
5162 * or used elsewhere.
5163 */
5164
RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx * fctx,int pos,RedisModuleString * arg)5165 int RM_CommandFilterArgReplace(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg)
5166 {
5167 if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
5168
5169 decrRefCount(fctx->argv[pos]);
5170 fctx->argv[pos] = arg;
5171
5172 return REDISMODULE_OK;
5173 }
5174
5175 /* Modify the filtered command by deleting an argument at the specified
5176 * position.
5177 */
RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx * fctx,int pos)5178 int RM_CommandFilterArgDelete(RedisModuleCommandFilterCtx *fctx, int pos)
5179 {
5180 int i;
5181 if (pos < 0 || pos >= fctx->argc) return REDISMODULE_ERR;
5182
5183 decrRefCount(fctx->argv[pos]);
5184 for (i = pos; i < fctx->argc-1; i++) {
5185 fctx->argv[i] = fctx->argv[i+1];
5186 }
5187 fctx->argc--;
5188
5189 return REDISMODULE_OK;
5190 }
5191
5192 /* --------------------------------------------------------------------------
5193 * Modules API internals
5194 * -------------------------------------------------------------------------- */
5195
5196 /* server.moduleapi dictionary type. Only uses plain C strings since
5197 * this gets queries from modules. */
5198
dictCStringKeyHash(const void * key)5199 uint64_t dictCStringKeyHash(const void *key) {
5200 return dictGenHashFunction((unsigned char*)key, strlen((char*)key));
5201 }
5202
dictCStringKeyCompare(void * privdata,const void * key1,const void * key2)5203 int dictCStringKeyCompare(void *privdata, const void *key1, const void *key2) {
5204 UNUSED(privdata);
5205 return strcmp(key1,key2) == 0;
5206 }
5207
5208 dictType moduleAPIDictType = {
5209 dictCStringKeyHash, /* hash function */
5210 NULL, /* key dup */
5211 NULL, /* val dup */
5212 dictCStringKeyCompare, /* key compare */
5213 NULL, /* key destructor */
5214 NULL /* val destructor */
5215 };
5216
moduleRegisterApi(const char * funcname,void * funcptr)5217 int moduleRegisterApi(const char *funcname, void *funcptr) {
5218 return dictAdd(server.moduleapi, (char*)funcname, funcptr);
5219 }
5220
5221 #define REGISTER_API(name) \
5222 moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)
5223
5224 /* Global initialization at Redis startup. */
5225 void moduleRegisterCoreAPI(void);
5226
moduleInitModulesSystem(void)5227 void moduleInitModulesSystem(void) {
5228 moduleUnblockedClients = listCreate();
5229 server.loadmodule_queue = listCreate();
5230 modules = dictCreate(&modulesDictType,NULL);
5231
5232 /* Set up the keyspace notification susbscriber list and static client */
5233 moduleKeyspaceSubscribers = listCreate();
5234 moduleFreeContextReusedClient = createClient(-1);
5235 moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
5236
5237 /* Set up filter list */
5238 moduleCommandFilters = listCreate();
5239
5240 moduleRegisterCoreAPI();
5241 if (pipe(server.module_blocked_pipe) == -1) {
5242 serverLog(LL_WARNING,
5243 "Can't create the pipe for module blocking commands: %s",
5244 strerror(errno));
5245 exit(1);
5246 }
5247 /* Make the pipe non blocking. This is just a best effort aware mechanism
5248 * and we do not want to block not in the read nor in the write half. */
5249 anetNonBlock(NULL,server.module_blocked_pipe[0]);
5250 anetNonBlock(NULL,server.module_blocked_pipe[1]);
5251
5252 /* Create the timers radix tree. */
5253 Timers = raxNew();
5254
5255 /* Our thread-safe contexts GIL must start with already locked:
5256 * it is just unlocked when it's safe. */
5257 pthread_mutex_lock(&moduleGIL);
5258 }
5259
5260 /* Load all the modules in the server.loadmodule_queue list, which is
5261 * populated by `loadmodule` directives in the configuration file.
5262 * We can't load modules directly when processing the configuration file
5263 * because the server must be fully initialized before loading modules.
5264 *
5265 * The function aborts the server on errors, since to start with missing
5266 * modules is not considered sane: clients may rely on the existence of
5267 * given commands, loading AOF also may need some modules to exist, and
5268 * if this instance is a slave, it must understand commands from master. */
moduleLoadFromQueue(void)5269 void moduleLoadFromQueue(void) {
5270 listIter li;
5271 listNode *ln;
5272
5273 listRewind(server.loadmodule_queue,&li);
5274 while((ln = listNext(&li))) {
5275 struct moduleLoadQueueEntry *loadmod = ln->value;
5276 if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
5277 == C_ERR)
5278 {
5279 serverLog(LL_WARNING,
5280 "Can't load module from %s: server aborting",
5281 loadmod->path);
5282 exit(1);
5283 }
5284 }
5285 }
5286
moduleFreeModuleStructure(struct RedisModule * module)5287 void moduleFreeModuleStructure(struct RedisModule *module) {
5288 listRelease(module->types);
5289 listRelease(module->filters);
5290 sdsfree(module->name);
5291 zfree(module);
5292 }
5293
moduleUnregisterCommands(struct RedisModule * module)5294 void moduleUnregisterCommands(struct RedisModule *module) {
5295 /* Unregister all the commands registered by this module. */
5296 dictIterator *di = dictGetSafeIterator(server.commands);
5297 dictEntry *de;
5298 while ((de = dictNext(di)) != NULL) {
5299 struct redisCommand *cmd = dictGetVal(de);
5300 if (cmd->proc == RedisModuleCommandDispatcher) {
5301 RedisModuleCommandProxy *cp =
5302 (void*)(unsigned long)cmd->getkeys_proc;
5303 sds cmdname = cp->rediscmd->name;
5304 if (cp->module == module) {
5305 dictDelete(server.commands,cmdname);
5306 dictDelete(server.orig_commands,cmdname);
5307 sdsfree(cmdname);
5308 zfree(cp->rediscmd);
5309 zfree(cp);
5310 }
5311 }
5312 }
5313 dictReleaseIterator(di);
5314 }
5315
5316 /* Load a module and initialize it. On success C_OK is returned, otherwise
5317 * C_ERR is returned. */
moduleLoad(const char * path,void ** module_argv,int module_argc)5318 int moduleLoad(const char *path, void **module_argv, int module_argc) {
5319 int (*onload)(void *, void **, int);
5320 void *handle;
5321 RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
5322
5323 handle = dlopen(path,RTLD_NOW|RTLD_LOCAL);
5324 if (handle == NULL) {
5325 serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
5326 return C_ERR;
5327 }
5328 onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad");
5329 if (onload == NULL) {
5330 dlclose(handle);
5331 serverLog(LL_WARNING,
5332 "Module %s does not export RedisModule_OnLoad() "
5333 "symbol. Module not loaded.",path);
5334 return C_ERR;
5335 }
5336 if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) {
5337 if (ctx.module) {
5338 moduleUnregisterCommands(ctx.module);
5339 moduleUnregisterSharedAPI(ctx.module);
5340 moduleUnregisterUsedAPI(ctx.module);
5341 moduleFreeModuleStructure(ctx.module);
5342 }
5343 dlclose(handle);
5344 serverLog(LL_WARNING,
5345 "Module %s initialization failed. Module not loaded",path);
5346 return C_ERR;
5347 }
5348
5349 /* Redis module loaded! Register it. */
5350 dictAdd(modules,ctx.module->name,ctx.module);
5351 ctx.module->handle = handle;
5352 serverLog(LL_NOTICE,"Module '%s' loaded from %s",ctx.module->name,path);
5353 moduleFreeContext(&ctx);
5354 return C_OK;
5355 }
5356
5357
5358 /* Unload the module registered with the specified name. On success
5359 * C_OK is returned, otherwise C_ERR is returned and errno is set
5360 * to the following values depending on the type of error:
5361 *
5362 * * ENONET: No such module having the specified name.
5363 * * EBUSY: The module exports a new data type and can only be reloaded. */
moduleUnload(sds name)5364 int moduleUnload(sds name) {
5365 struct RedisModule *module = dictFetchValue(modules,name);
5366
5367 if (module == NULL) {
5368 errno = ENOENT;
5369 return REDISMODULE_ERR;
5370 } else if (listLength(module->types)) {
5371 errno = EBUSY;
5372 return REDISMODULE_ERR;
5373 } else if (listLength(module->usedby)) {
5374 errno = EPERM;
5375 return REDISMODULE_ERR;
5376 }
5377
5378 moduleUnregisterCommands(module);
5379 moduleUnregisterSharedAPI(module);
5380 moduleUnregisterUsedAPI(module);
5381 moduleUnregisterFilters(module);
5382
5383 /* Remove any notification subscribers this module might have */
5384 moduleUnsubscribeNotifications(module);
5385
5386 /* Unregister all the hooks. TODO: Yet no hooks support here. */
5387
5388 /* Unload the dynamic library. */
5389 if (dlclose(module->handle) == -1) {
5390 char *error = dlerror();
5391 if (error == NULL) error = "Unknown error";
5392 serverLog(LL_WARNING,"Error when trying to close the %s module: %s",
5393 module->name, error);
5394 }
5395
5396 /* Remove from list of modules. */
5397 serverLog(LL_NOTICE,"Module %s unloaded",module->name);
5398 dictDelete(modules,module->name);
5399 module->name = NULL; /* The name was already freed by dictDelete(). */
5400 moduleFreeModuleStructure(module);
5401
5402 return REDISMODULE_OK;
5403 }
5404
5405 /* Redis MODULE command.
5406 *
5407 * MODULE LOAD <path> [args...] */
moduleCommand(client * c)5408 void moduleCommand(client *c) {
5409 char *subcmd = c->argv[1]->ptr;
5410 if (c->argc == 2 && !strcasecmp(subcmd,"help")) {
5411 const char *help[] = {
5412 "LIST -- Return a list of loaded modules.",
5413 "LOAD <path> [arg ...] -- Load a module library from <path>.",
5414 "UNLOAD <name> -- Unload a module.",
5415 NULL
5416 };
5417 addReplyHelp(c, help);
5418 } else
5419 if (!strcasecmp(subcmd,"load") && c->argc >= 3) {
5420 robj **argv = NULL;
5421 int argc = 0;
5422
5423 if (c->argc > 3) {
5424 argc = c->argc - 3;
5425 argv = &c->argv[3];
5426 }
5427
5428 if (moduleLoad(c->argv[2]->ptr,(void **)argv,argc) == C_OK)
5429 addReply(c,shared.ok);
5430 else
5431 addReplyError(c,
5432 "Error loading the extension. Please check the server logs.");
5433 } else if (!strcasecmp(subcmd,"unload") && c->argc == 3) {
5434 if (moduleUnload(c->argv[2]->ptr) == C_OK)
5435 addReply(c,shared.ok);
5436 else {
5437 char *errmsg;
5438 switch(errno) {
5439 case ENOENT:
5440 errmsg = "no such module with that name";
5441 break;
5442 case EBUSY:
5443 errmsg = "the module exports one or more module-side data "
5444 "types, can't unload";
5445 break;
5446 case EPERM:
5447 errmsg = "the module exports APIs used by other modules. "
5448 "Please unload them first and try again";
5449 break;
5450 default:
5451 errmsg = "operation not possible.";
5452 break;
5453 }
5454 addReplyErrorFormat(c,"Error unloading module: %s",errmsg);
5455 }
5456 } else if (!strcasecmp(subcmd,"list") && c->argc == 2) {
5457 dictIterator *di = dictGetIterator(modules);
5458 dictEntry *de;
5459
5460 addReplyMultiBulkLen(c,dictSize(modules));
5461 while ((de = dictNext(di)) != NULL) {
5462 sds name = dictGetKey(de);
5463 struct RedisModule *module = dictGetVal(de);
5464 addReplyMultiBulkLen(c,4);
5465 addReplyBulkCString(c,"name");
5466 addReplyBulkCBuffer(c,name,sdslen(name));
5467 addReplyBulkCString(c,"ver");
5468 addReplyLongLong(c,module->ver);
5469 }
5470 dictReleaseIterator(di);
5471 } else {
5472 addReplySubcommandSyntaxError(c);
5473 return;
5474 }
5475 }
5476
5477 /* Return the number of registered modules. */
moduleCount(void)5478 size_t moduleCount(void) {
5479 return dictSize(modules);
5480 }
5481
5482 /* Register all the APIs we export. Keep this function at the end of the
5483 * file so that's easy to seek it to add new entries. */
moduleRegisterCoreAPI(void)5484 void moduleRegisterCoreAPI(void) {
5485 server.moduleapi = dictCreate(&moduleAPIDictType,NULL);
5486 server.sharedapi = dictCreate(&moduleAPIDictType,NULL);
5487 REGISTER_API(Alloc);
5488 REGISTER_API(Calloc);
5489 REGISTER_API(Realloc);
5490 REGISTER_API(Free);
5491 REGISTER_API(Strdup);
5492 REGISTER_API(CreateCommand);
5493 REGISTER_API(SetModuleAttribs);
5494 REGISTER_API(IsModuleNameBusy);
5495 REGISTER_API(WrongArity);
5496 REGISTER_API(ReplyWithLongLong);
5497 REGISTER_API(ReplyWithError);
5498 REGISTER_API(ReplyWithSimpleString);
5499 REGISTER_API(ReplyWithArray);
5500 REGISTER_API(ReplySetArrayLength);
5501 REGISTER_API(ReplyWithString);
5502 REGISTER_API(ReplyWithStringBuffer);
5503 REGISTER_API(ReplyWithCString);
5504 REGISTER_API(ReplyWithNull);
5505 REGISTER_API(ReplyWithCallReply);
5506 REGISTER_API(ReplyWithDouble);
5507 REGISTER_API(GetSelectedDb);
5508 REGISTER_API(SelectDb);
5509 REGISTER_API(OpenKey);
5510 REGISTER_API(CloseKey);
5511 REGISTER_API(KeyType);
5512 REGISTER_API(ValueLength);
5513 REGISTER_API(ListPush);
5514 REGISTER_API(ListPop);
5515 REGISTER_API(StringToLongLong);
5516 REGISTER_API(StringToDouble);
5517 REGISTER_API(Call);
5518 REGISTER_API(CallReplyProto);
5519 REGISTER_API(FreeCallReply);
5520 REGISTER_API(CallReplyInteger);
5521 REGISTER_API(CallReplyType);
5522 REGISTER_API(CallReplyLength);
5523 REGISTER_API(CallReplyArrayElement);
5524 REGISTER_API(CallReplyStringPtr);
5525 REGISTER_API(CreateStringFromCallReply);
5526 REGISTER_API(CreateString);
5527 REGISTER_API(CreateStringFromLongLong);
5528 REGISTER_API(CreateStringFromString);
5529 REGISTER_API(CreateStringPrintf);
5530 REGISTER_API(FreeString);
5531 REGISTER_API(StringPtrLen);
5532 REGISTER_API(AutoMemory);
5533 REGISTER_API(Replicate);
5534 REGISTER_API(ReplicateVerbatim);
5535 REGISTER_API(DeleteKey);
5536 REGISTER_API(UnlinkKey);
5537 REGISTER_API(StringSet);
5538 REGISTER_API(StringDMA);
5539 REGISTER_API(StringTruncate);
5540 REGISTER_API(SetExpire);
5541 REGISTER_API(GetExpire);
5542 REGISTER_API(ZsetAdd);
5543 REGISTER_API(ZsetIncrby);
5544 REGISTER_API(ZsetScore);
5545 REGISTER_API(ZsetRem);
5546 REGISTER_API(ZsetRangeStop);
5547 REGISTER_API(ZsetFirstInScoreRange);
5548 REGISTER_API(ZsetLastInScoreRange);
5549 REGISTER_API(ZsetFirstInLexRange);
5550 REGISTER_API(ZsetLastInLexRange);
5551 REGISTER_API(ZsetRangeCurrentElement);
5552 REGISTER_API(ZsetRangeNext);
5553 REGISTER_API(ZsetRangePrev);
5554 REGISTER_API(ZsetRangeEndReached);
5555 REGISTER_API(HashSet);
5556 REGISTER_API(HashGet);
5557 REGISTER_API(IsKeysPositionRequest);
5558 REGISTER_API(KeyAtPos);
5559 REGISTER_API(GetClientId);
5560 REGISTER_API(GetContextFlags);
5561 REGISTER_API(PoolAlloc);
5562 REGISTER_API(CreateDataType);
5563 REGISTER_API(ModuleTypeSetValue);
5564 REGISTER_API(ModuleTypeGetType);
5565 REGISTER_API(ModuleTypeGetValue);
5566 REGISTER_API(SaveUnsigned);
5567 REGISTER_API(LoadUnsigned);
5568 REGISTER_API(SaveSigned);
5569 REGISTER_API(LoadSigned);
5570 REGISTER_API(SaveString);
5571 REGISTER_API(SaveStringBuffer);
5572 REGISTER_API(LoadString);
5573 REGISTER_API(LoadStringBuffer);
5574 REGISTER_API(SaveDouble);
5575 REGISTER_API(LoadDouble);
5576 REGISTER_API(SaveFloat);
5577 REGISTER_API(LoadFloat);
5578 REGISTER_API(EmitAOF);
5579 REGISTER_API(Log);
5580 REGISTER_API(LogIOError);
5581 REGISTER_API(StringAppendBuffer);
5582 REGISTER_API(RetainString);
5583 REGISTER_API(StringCompare);
5584 REGISTER_API(GetContextFromIO);
5585 REGISTER_API(GetKeyNameFromIO);
5586 REGISTER_API(BlockClient);
5587 REGISTER_API(UnblockClient);
5588 REGISTER_API(IsBlockedReplyRequest);
5589 REGISTER_API(IsBlockedTimeoutRequest);
5590 REGISTER_API(GetBlockedClientPrivateData);
5591 REGISTER_API(AbortBlock);
5592 REGISTER_API(Milliseconds);
5593 REGISTER_API(GetThreadSafeContext);
5594 REGISTER_API(FreeThreadSafeContext);
5595 REGISTER_API(ThreadSafeContextLock);
5596 REGISTER_API(ThreadSafeContextUnlock);
5597 REGISTER_API(DigestAddStringBuffer);
5598 REGISTER_API(DigestAddLongLong);
5599 REGISTER_API(DigestEndSequence);
5600 REGISTER_API(SubscribeToKeyspaceEvents);
5601 REGISTER_API(RegisterClusterMessageReceiver);
5602 REGISTER_API(SendClusterMessage);
5603 REGISTER_API(GetClusterNodeInfo);
5604 REGISTER_API(GetClusterNodesList);
5605 REGISTER_API(FreeClusterNodesList);
5606 REGISTER_API(CreateTimer);
5607 REGISTER_API(StopTimer);
5608 REGISTER_API(GetTimerInfo);
5609 REGISTER_API(GetMyClusterID);
5610 REGISTER_API(GetClusterSize);
5611 REGISTER_API(GetRandomBytes);
5612 REGISTER_API(GetRandomHexChars);
5613 REGISTER_API(BlockedClientDisconnected);
5614 REGISTER_API(SetDisconnectCallback);
5615 REGISTER_API(GetBlockedClientHandle);
5616 REGISTER_API(SetClusterFlags);
5617 REGISTER_API(CreateDict);
5618 REGISTER_API(FreeDict);
5619 REGISTER_API(DictSize);
5620 REGISTER_API(DictSetC);
5621 REGISTER_API(DictReplaceC);
5622 REGISTER_API(DictSet);
5623 REGISTER_API(DictReplace);
5624 REGISTER_API(DictGetC);
5625 REGISTER_API(DictGet);
5626 REGISTER_API(DictDelC);
5627 REGISTER_API(DictDel);
5628 REGISTER_API(DictIteratorStartC);
5629 REGISTER_API(DictIteratorStart);
5630 REGISTER_API(DictIteratorStop);
5631 REGISTER_API(DictIteratorReseekC);
5632 REGISTER_API(DictIteratorReseek);
5633 REGISTER_API(DictNextC);
5634 REGISTER_API(DictPrevC);
5635 REGISTER_API(DictNext);
5636 REGISTER_API(DictPrev);
5637 REGISTER_API(DictCompareC);
5638 REGISTER_API(DictCompare);
5639 REGISTER_API(ExportSharedAPI);
5640 REGISTER_API(GetSharedAPI);
5641 REGISTER_API(RegisterCommandFilter);
5642 REGISTER_API(UnregisterCommandFilter);
5643 REGISTER_API(CommandFilterArgsCount);
5644 REGISTER_API(CommandFilterArgGet);
5645 REGISTER_API(CommandFilterArgInsert);
5646 REGISTER_API(CommandFilterArgReplace);
5647 REGISTER_API(CommandFilterArgDelete);
5648 }
5649