1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "atomicvar.h"
33 
34 #include <signal.h>
35 #include <ctype.h>
36 
37 /* Database backup. */
38 struct dbBackup {
39     redisDb *dbarray;
40     rax *slots_to_keys;
41     uint64_t slots_keys_count[CLUSTER_SLOTS];
42 };
43 
44 /*-----------------------------------------------------------------------------
45  * C-level DB API
46  *----------------------------------------------------------------------------*/
47 
48 int keyIsExpired(redisDb *db, robj *key);
49 
50 /* Update LFU when an object is accessed.
51  * Firstly, decrement the counter if the decrement time is reached.
52  * Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
    /* Decay first, then apply the logarithmic increment for this access. */
    unsigned long freq = LFUDecrAndReturn(val);
    freq = LFULogIncr(freq);

    /* The current time in minutes is stored shifted left by 8 bits, with
     * the counter OR-ed into the low bits. */
    unsigned long now_minutes = LFUGetTimeInMinutes();
    val->lru = (now_minutes<<8) | freq;
}
58 
59 /* Low level key lookup API, not actually called directly from commands
60  * implementations that should instead rely on lookupKeyRead(),
61  * lookupKeyWrite() and lookupKeyReadWithFlags(). */
robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *entry = dictFind(db->dict,key->ptr);
    if (entry == NULL) return NULL;

    robj *found = dictGetVal(entry);

    /* Leave the access information untouched when a child process is
     * active (writing to the object would trigger copy-on-write) or when
     * the caller passed LOOKUP_NOTOUCH. */
    if (hasActiveChildProcess() || (flags & LOOKUP_NOTOUCH)) return found;

    /* Refresh the data used by the ageing algorithm: the LFU counter when
     * the maxmemory policy is LFU based, the LRU clock otherwise. */
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
        updateLFU(found);
    else
        found->lru = LRU_CLOCK();
    return found;
}
82 
83 /* Lookup a key for read operations, or return NULL if the key is not found
84  * in the specified DB.
85  *
86  * As a side effect of calling this function:
87  * 1. A key gets expired if it reached its TTL.
88  * 2. The key last access time is updated.
89  * 3. The global keys hits/misses stats are updated (reported in INFO).
90  * 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
91  *
92  * This API should not be used when we write to the key after obtaining
93  * the object linked to the key, but only for read only operations.
94  *
95  * Flags change the behavior of this command:
96  *
97  *  LOOKUP_NONE (or zero): no special flags are passed.
98  *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
99  *
100  * Note: this function also returns NULL if the key is logically expired
101  * but still existing, in case this is a slave, since this API is called only
102  * for read operations. Even if the key expiry is master-driven, we can
103  * correctly report a key is expired on slaves even if the master is lagging
104  * expiring our key via DELs in the replication link. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    int miss = 0; /* Set when we already know the lookup must fail. */

    if (expireIfNeeded(db,key) == 1) {
        if (server.masterhost == NULL) {
            /* Master context: the key is expired, so report the miss
             * without looking into the dictionary at all. */
            miss = 1;
        } else {
            /* Slave context: expireIfNeeded() only reports the logical
             * status of the key, since expiring it is up to the master.
             * Still, when the caller is not the master link and the
             * running command is flagged read-only, report the key as
             * missing so that readers get a consistent view. Notably this
             * covers GETs when slaves are used to scale reads. */
            client *cur = server.current_client;
            if (cur && cur != server.master && cur->cmd &&
                (cur->cmd->flags & CMD_READONLY))
            {
                miss = 1;
            }
        }
    }

    if (!miss) {
        robj *found = lookupKey(db,key,flags);
        if (found != NULL) {
            server.stat_keyspace_hits++;
            return found;
        }
    }

    /* Miss path: with LOOKUP_NONOTIFY both the misses counter update and
     * the "keymiss" keyspace notification are skipped. */
    if ((flags & LOOKUP_NONOTIFY) == 0) {
        server.stat_keyspace_misses++;
        notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
    }
    return NULL;
}
148 
149 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
150  * common case. */
robj *lookupKeyRead(redisDb *db, robj *key) {
    /* Plain read lookup: no special flags. */
    robj *val = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
    return val;
}
154 
155 /* Lookup a key for write operations, and as a side effect, if needed, expires
156  * the key if its TTL is reached.
157  *
158  * Returns the linked value object if the key exists or NULL if the key
159  * does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
    /* Give the key a chance to expire before looking it up; the result of
     * expireIfNeeded() itself is not used here. */
    (void) expireIfNeeded(db,key);

    robj *val = lookupKey(db,key,flags);
    return val;
}
164 
/* Like lookupKeyWriteWithFlags(), but without any special flag. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
    robj *val = lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
    return val;
}
168 
/* Read lookup of 'key' in the client's selected DB. When the key is not
 * found, 'reply' is sent to the client and NULL is returned. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyRead(c->db, key);
    if (val != NULL) return val;

    addReply(c,reply);
    return NULL;
}
174 
/* Write lookup of 'key' in the client's selected DB. When the key is not
 * found, 'reply' is sent to the client and NULL is returned. */
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyWrite(c->db, key);
    if (val != NULL) return val;

    addReply(c,reply);
    return NULL;
}
180 
181 /* Add the key to the DB. It's up to the caller to increment the reference
182  * counter of the value if needed.
183  *
184  * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
    /* The dictionary gets its own copy of the key name. */
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);
    serverAssertWithInfo(NULL,key,retval == DICT_OK);

    /* Lists, sorted sets and streams are signaled as ready when created. */
    switch (val->type) {
    case OBJ_LIST:
    case OBJ_ZSET:
    case OBJ_STREAM:
        signalKeyAsReady(db, key);
        break;
    default:
        break;
    }

    if (server.cluster_enabled) slotToKeyAdd(key->ptr);
}
196 
197 /* This is a special version of dbAdd() that is used only when loading
198  * keys from the RDB file: the key is passed as an SDS string that is
199  * retained by the function (and not freed by the caller).
200  *
201  * Moreover this function will not abort if the key is already busy, to
202  * give more control to the caller, nor will signal the key as ready
203  * since it is not useful in this context.
204  *
205  * The function returns 1 if the key was added to the database, taking
206  * ownership of the SDS string, otherwise 0 is returned, and is up to the
207  * caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
    if (dictAdd(db->dict, key, val) == DICT_OK) {
        /* The key is now owned by the dictionary. */
        if (server.cluster_enabled) slotToKeyAdd(key);
        return 1;
    }
    /* Not added: the caller keeps ownership of 'key'. */
    return 0;
}
214 
215 /* Overwrite an existing key with a new value. Incrementing the reference
216  * count of the new value is up to the caller.
217  * This function does not modify the expire time of the existing key.
218  *
219  * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,de != NULL);

    /* Keep a by-value snapshot of the entry: it still references the old
     * value after the live entry is switched to the new one, so it can be
     * used to release the old value below. */
    dictEntry stale = *de;
    robj *previous = dictGetVal(de);

    /* With an LFU policy the new value inherits the old access data. */
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
        val->lru = previous->lru;

    dictSetVal(db->dict, de, val);

    if (server.lazyfree_lazy_server_del) {
        /* The old object is handed to freeObjAsync(); clear it from the
         * snapshot so that dictFreeVal() below does not release it too. */
        freeObjAsync(previous);
        dictSetVal(db->dict, &stale, NULL);
    }

    dictFreeVal(db->dict, &stale);
}
238 
239 /* High level Set operation. This function can be used in order to set
240  * a key, whether it already exists or not, to a new object.
241  *
242  * 1) The ref count of the value object is incremented.
243  * 2) Clients WATCHing the destination key are notified, if 'signal' is true.
244  * 3) The expire time of the key is reset (the key is made persistent),
245  *    unless 'keepttl' is true.
246  *
247  * All the new keys in the database should be created via this interface.
248  * The client 'c' argument may be set to NULL if the operation is performed
249  * in a context where there is no clear client performing the operation. */
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
    /* Overwrite when the key is already there, create it otherwise. */
    int exists = (lookupKeyWrite(db,key) != NULL);
    if (exists)
        dbOverwrite(db,key,val);
    else
        dbAdd(db,key,val);

    incrRefCount(val);

    if (!keepttl) {
        removeExpire(db,key);
    }
    if (signal) {
        signalModifiedKey(c,db,key);
    }
}
260 
261 /* Common case for genericSetKey() where the TTL is not retained. */
void setKey(client *c, redisDb *db, robj *key, robj *val) {
    const int keep_ttl = 0;   /* Drop any existing TTL. */
    const int notify = 1;     /* Signal the key as modified. */
    genericSetKey(c,db,key,val,keep_ttl,notify);
}
265 
266 /* Return true if the specified key exists in the specified database.
267  * LRU/LFU info is not updated in any way. */
int dbExists(redisDb *db, robj *key) {
    /* Raw dictionary lookup: no expire check, no access time update. */
    dictEntry *entry = dictFind(db->dict,key->ptr);
    return entry != NULL;
}
271 
272 /* Return a random key, in form of a Redis object.
273  * If there are no keys, NULL is returned.
274  *
275  * The function makes sure to return keys not already expired. */
dbRandomKey(redisDb * db)276 robj *dbRandomKey(redisDb *db) {
277     dictEntry *de;
278     int maxtries = 100;
279     int allvolatile = dictSize(db->dict) == dictSize(db->expires);
280 
281     while(1) {
282         sds key;
283         robj *keyobj;
284 
285         de = dictGetFairRandomKey(db->dict);
286         if (de == NULL) return NULL;
287 
288         key = dictGetKey(de);
289         keyobj = createStringObject(key,sdslen(key));
290         if (dictFind(db->expires,key)) {
291             if (allvolatile && server.masterhost && --maxtries == 0) {
292                 /* If the DB is composed only of keys with an expire set,
293                  * it could happen that all the keys are already logically
294                  * expired in the slave, so the function cannot stop because
295                  * expireIfNeeded() is false, nor it can stop because
296                  * dictGetRandomKey() returns NULL (there are keys to return).
297                  * To prevent the infinite loop we do some tries, but if there
298                  * are the conditions for an infinite loop, eventually we
299                  * return a key name that may be already expired. */
300                 return keyobj;
301             }
302             if (expireIfNeeded(db,keyobj)) {
303                 decrRefCount(keyobj);
304                 continue; /* search for another key. This expired. */
305             }
306         }
307         return keyobj;
308     }
309 }
310 
311 /* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
    /* Removing the entry from the expires dict does not free the key sds,
     * because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    if (dictDelete(db->dict,key->ptr) != DICT_OK) return 0;

    if (server.cluster_enabled) slotToKeyDel(key->ptr);
    return 1;
}
323 
324 /* This is a wrapper whose behavior depends on the Redis lazy free
325  * configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
    /* The lazyfree-lazy-server-del setting selects the deletion flavor. */
    if (server.lazyfree_lazy_server_del) return dbAsyncDelete(db,key);
    return dbSyncDelete(db,key);
}
330 
331 /* Prepare the string object stored at 'key' to be modified destructively
332  * to implement commands like SETBIT or APPEND.
333  *
334  * An object is usually ready to be modified unless one of the two conditions
335  * are true:
336  *
337  * 1) The object 'o' is shared (refcount > 1), we don't want to affect
338  *    other users.
339  * 2) The object encoding is not "RAW".
340  *
341  * If the object is found in one of the above conditions (or both) by the
342  * function, an unshared / not-encoded copy of the string object is stored
343  * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
344  * returned.
345  *
346  * USAGE:
347  *
348  * The object 'o' is what the caller already obtained by looking up 'key'
349  * in 'db', the usage pattern looks like this:
350  *
351  * o = lookupKeyWrite(db,key);
352  * if (checkType(c,o,OBJ_STRING)) return;
353  * o = dbUnshareStringValue(db,key,o);
354  *
355  * At this point the caller is ready to modify the object, for example
356  * using an sdscat() call to append some data, or anything else.
357  */
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
    serverAssert(o->type == OBJ_STRING);

    /* Already private and RAW encoded: safe to modify in place. */
    if (o->refcount == 1 && o->encoding == OBJ_ENCODING_RAW) return o;

    /* Otherwise build a fresh RAW string with the same content and store
     * it at 'key' in place of the old object. */
    robj *decoded = getDecodedObject(o);
    robj *fresh = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
    decrRefCount(decoded);
    dbOverwrite(db,key,fresh);
    return fresh;
}
368 
369 /* Remove all keys from the database(s) structure. The dbarray argument
370  * may not be the server main DBs (could be a backup).
371  *
372  * The dbnum can be -1 if all the DBs should be emptied, or the specified
373  * DB index if we want to empty only a single database.
374  * The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
                           void(callback)(void*))
{
    /* -1 selects the whole array, any other value a single database. */
    int first = (dbnum == -1) ? 0 : dbnum;
    int last = (dbnum == -1) ? server.dbnum-1 : dbnum;
    long long removed = 0;

    for (int idx = first; idx <= last; idx++) {
        redisDb *db = &dbarray[idx];

        removed += dictSize(db->dict);
        if (async) {
            emptyDbAsync(db);
        } else {
            dictEmpty(db->dict,callback);
            dictEmpty(db->expires,callback);
        }
        /* No keys are left in this database: reset the average TTL and
         * the expires cursor. */
        db->avg_ttl = 0;
        db->expires_cursor = 0;
    }

    return removed;
}
403 
404 /* Remove all keys from all the databases in a Redis server.
405  * If callback is given the function is called from time to time to
406  * signal that work is in progress.
407  *
408  * The dbnum can be -1 if all the DBs should be flushed, or the specified
409  * DB number if we want to flush only a single Redis database number.
410  *
411  * Flags can be EMPTYDB_NO_FLAGS if no special flags are specified or
412  * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
413  * and the function to return ASAP.
414  *
415  * On success the function returns the number of keys removed from the
416  * database(s). Otherwise -1 is returned in the specific case the
417  * DB number is out of range, and errno is set to EINVAL. */
emptyDb(int dbnum,int flags,void (callback)(void *))418 long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
419     int async = (flags & EMPTYDB_ASYNC);
420     RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
421     long long removed = 0;
422 
423     if (dbnum < -1 || dbnum >= server.dbnum) {
424         errno = EINVAL;
425         return -1;
426     }
427 
428     /* Fire the flushdb modules event. */
429     moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
430                           REDISMODULE_SUBEVENT_FLUSHDB_START,
431                           &fi);
432 
433     /* Make sure the WATCHed keys are affected by the FLUSH* commands.
434      * Note that we need to call the function while the keys are still
435      * there. */
436     signalFlushedDb(dbnum);
437 
438     /* Empty redis database structure. */
439     removed = emptyDbStructure(server.db, dbnum, async, callback);
440 
441     /* Flush slots to keys map if enable cluster, we can flush entire
442      * slots to keys map whatever dbnum because only support one DB
443      * in cluster mode. */
444     if (server.cluster_enabled) slotToKeyFlush(async);
445 
446     if (dbnum == -1) flushSlaveKeysWithExpireList();
447 
448     /* Also fire the end event. Note that this event will fire almost
449      * immediately after the start event if the flush is asynchronous. */
450     moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
451                           REDISMODULE_SUBEVENT_FLUSHDB_END,
452                           &fi);
453 
454     return removed;
455 }
456 
457 /* Store a backup of the database for later use, and put an empty one
458  * instead of it. */
backupDb(void)459 dbBackup *backupDb(void) {
460     dbBackup *backup = zmalloc(sizeof(dbBackup));
461 
462     /* Backup main DBs. */
463     backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
464     for (int i=0; i<server.dbnum; i++) {
465         backup->dbarray[i] = server.db[i];
466         server.db[i].dict = dictCreate(&dbDictType,NULL);
467         server.db[i].expires = dictCreate(&keyptrDictType,NULL);
468     }
469 
470     /* Backup cluster slots to keys map if enable cluster. */
471     if (server.cluster_enabled) {
472         backup->slots_to_keys = server.cluster->slots_to_keys;
473         memcpy(backup->slots_keys_count, server.cluster->slots_keys_count,
474             sizeof(server.cluster->slots_keys_count));
475         server.cluster->slots_to_keys = raxNew();
476         memset(server.cluster->slots_keys_count, 0,
477             sizeof(server.cluster->slots_keys_count));
478     }
479 
480     return backup;
481 }
482 
483 /* Discard a previously created backup, this can be slow (similar to FLUSHALL)
484  * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)) {
    int async = (flags & EMPTYDB_ASYNC);
    redisDb *saved = buckup->dbarray;

    /* Empty all the saved databases, then release their dictionaries. */
    emptyDbStructure(saved, -1, async, callback);
    for (int i = 0; i < server.dbnum; i++) {
        dictRelease(saved[i].dict);
        dictRelease(saved[i].expires);
    }

    /* The slots to keys map was saved only in cluster mode. */
    if (server.cluster_enabled) {
        freeSlotsToKeysMap(buckup->slots_to_keys, async);
    }

    /* Finally release the array and the backup container itself. */
    zfree(saved);
    zfree(buckup);
}
502 
503 /* Restore the previously created backup (discarding what currently resides
504  * in the db).
505  * This function should be called after the current contents of the database
506  * was emptied with a previous call to emptyDb (possibly using the async mode). */
void restoreDbBackup(dbBackup *buckup) {
    /* Put the saved databases back. The live ones are asserted to be empty:
     * their dictionaries are released before being replaced. */
    int i = 0;
    while (i < server.dbnum) {
        serverAssert(dictSize(server.db[i].dict) == 0);
        serverAssert(dictSize(server.db[i].expires) == 0);
        dictRelease(server.db[i].dict);
        dictRelease(server.db[i].expires);
        server.db[i] = buckup->dbarray[i];
        i++;
    }

    /* In cluster mode also put back the slots to keys map and the per-slot
     * counters; the current map is asserted to be empty and is freed. */
    if (server.cluster_enabled) {
        serverAssert(server.cluster->slots_to_keys->numele == 0);
        raxFree(server.cluster->slots_to_keys);
        server.cluster->slots_to_keys = buckup->slots_to_keys;
        memcpy(server.cluster->slots_keys_count, buckup->slots_keys_count,
                sizeof(server.cluster->slots_keys_count));
    }

    /* The content now belongs to the server again: free only the array and
     * the backup container. */
    zfree(buckup->dbarray);
    zfree(buckup);
}
530 
/* Make the database with index 'id' the one selected by client 'c'.
 * Returns C_OK on success, or C_ERR (leaving the client untouched) when
 * 'id' is outside the 0 .. server.dbnum-1 range. */
int selectDb(client *c, int id) {
    if (id >= 0 && id < server.dbnum) {
        c->db = &server.db[id];
        return C_OK;
    }
    return C_ERR;
}
537 
dbTotalServerKeyCount()538 long long dbTotalServerKeyCount() {
539     long long total = 0;
540     int j;
541     for (j = 0; j < server.dbnum; j++) {
542         total += dictSize(server.db[j].dict);
543     }
544     return total;
545 }
546 
547 /*-----------------------------------------------------------------------------
548  * Hooks for key space changes.
549  *
550  * Every time a key in the database is modified the function
551  * signalModifiedKey() is called.
552  *
553  * Every time a DB is flushed the function signalFlushedDb() is called.
554  *----------------------------------------------------------------------------*/
555 
556 /* Note that the 'c' argument may be NULL if the key was modified out of
557  * a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
    /* Two hooks run for every modified key, in this order: the WATCH one
     * (per database) and the tracking invalidation one, which also gets
     * the client 'c' (possibly NULL). */
    touchWatchedKey(db,key);
    trackingInvalidateKey(c,key);
}
562 
signalFlushedDb(int dbid)563 void signalFlushedDb(int dbid) {
564     int startdb, enddb;
565     if (dbid == -1) {
566         startdb = 0;
567         enddb = server.dbnum-1;
568     } else {
569         startdb = enddb = dbid;
570     }
571 
572     for (int j = startdb; j <= enddb; j++) {
573         touchAllWatchedKeysInDb(&server.db[j], NULL);
574     }
575 
576     trackingInvalidateKeysOnFlush(dbid);
577 }
578 
579 /*-----------------------------------------------------------------------------
580  * Type agnostic commands operating on the key space
581  *----------------------------------------------------------------------------*/
582 
583 /* Return the set of flags to use for the emptyDb() call for FLUSHALL
584  * and FLUSHDB commands.
585  *
586  * Currently the command just attempts to parse the "ASYNC" option. It
587  * also checks if the command arity is wrong.
588  *
589  * On success C_OK is returned and the flags are stored in *flags, otherwise
590  * C_ERR is returned and the function sends an error to the client. */
int getFlushCommandFlags(client *c, int *flags) {
    /* No option at all. */
    if (c->argc <= 1) {
        *flags = EMPTYDB_NO_FLAGS;
        return C_OK;
    }

    /* Exactly one option, and it must be ASYNC (case insensitive). */
    if (c->argc == 2 && strcasecmp(c->argv[1]->ptr,"async") == 0) {
        *flags = EMPTYDB_ASYNC;
        return C_OK;
    }

    /* Too many arguments or unknown option: *flags is left untouched. */
    addReply(c,shared.syntaxerr);
    return C_ERR;
}
604 
605 /* Flushes the whole server data set. */
flushAllDataAndResetRDB(int flags)606 void flushAllDataAndResetRDB(int flags) {
607     server.dirty += emptyDb(-1,flags,NULL);
608     if (server.rdb_child_pid != -1) killRDBChild();
609     if (server.saveparamslen > 0) {
610         /* Normally rdbSave() will reset dirty, but we don't want this here
611          * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
612         int saved_dirty = server.dirty;
613         rdbSaveInfo rsi, *rsiptr;
614         rsiptr = rdbPopulateSaveInfo(&rsi);
615         rdbSave(server.rdb_filename,rsiptr);
616         server.dirty = saved_dirty;
617     }
618     server.dirty++;
619 #if defined(USE_JEMALLOC)
620     /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
621      * for large databases, flushdb blocks for long anyway, so a bit more won't
622      * harm and this way the flush and purge will be synchroneus. */
623     if (!(flags & EMPTYDB_ASYNC))
624         jemalloc_purge();
625 #endif
626 }
627 
628 /* FLUSHDB [ASYNC]
629  *
630  * Flushes the currently SELECTed Redis DB. */
void flushdbCommand(client *c) {
    int flags;
    if (getFlushCommandFlags(c,&flags) != C_OK) return;

    /* Every removed key counts as a change to the data set. */
    long long removed = emptyDb(c->db->id,flags,NULL);
    server.dirty += removed;
    addReply(c,shared.ok);
#if defined(USE_JEMALLOC)
    /* jemalloc 5 doesn't give pages back to the OS when there is no
     * traffic. A synchronous flush of a large database blocks for long
     * anyway, so purge right away and keep flush and purge synchronous. */
    if ((flags & EMPTYDB_ASYNC) == 0) jemalloc_purge();
#endif
}
645 
646 /* FLUSHALL [ASYNC]
647  *
648  * Flushes the whole server data set. */
void flushallCommand(client *c) {
    int flags;

    /* On a parsing error the reply was already sent by the helper. */
    if (getFlushCommandFlags(c,&flags) != C_OK) return;

    flushAllDataAndResetRDB(flags);
    addReply(c,shared.ok);
}
655 
656 /* This function implements the DEL and UNLINK commands. */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0;

    for (int j = 1; j < c->argc; j++) {
        int deleted;

        expireIfNeeded(c->db,c->argv[j]);
        if (lazy)
            deleted = dbAsyncDelete(c->db,c->argv[j]);
        else
            deleted = dbSyncDelete(c->db,c->argv[j]);

        /* Nothing was removed for this name: go on with the next one. */
        if (!deleted) continue;

        signalModifiedKey(c,c->db,c->argv[j]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,
            "del",c->argv[j],c->db->id);
        server.dirty++;
        numdel++;
    }

    /* The reply is the number of keys actually deleted. */
    addReplyLongLong(c,numdel);
}
674 
/* DEL: sync or async deletion depending on lazyfree_lazy_user_del. */
void delCommand(client *c) {
    int lazy = server.lazyfree_lazy_user_del;
    delGenericCommand(c,lazy);
}
678 
/* UNLINK: like DEL, but always takes the asynchronous deletion path. */
void unlinkCommand(client *c) {
    const int lazy = 1;
    delGenericCommand(c,lazy);
}
682 
683 /* EXISTS key1 key2 ... key_N.
684  * Return value is the number of keys existing. */
void existsCommand(client *c) {
    long long found = 0;

    /* Every argument is checked on its own, so a key name given more than
     * once is counted more than once. LOOKUP_NOTOUCH keeps the access
     * time of the keys unchanged. */
    for (int j = 1; j < c->argc; j++) {
        robj *val = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH);
        if (val != NULL) found++;
    }
    addReplyLongLong(c,found);
}
694 
/* SELECT index
 *
 * Switch the client to the database with the given index. Replies with an
 * error when the index is not an integer, when it is not zero in cluster
 * mode, or when it is outside the 0 .. server.dbnum-1 range. */
void selectCommand(client *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    /* Check the range on the full-width 'long' first. selectDb() takes an
     * int, so a value that does not fit in an int would be narrowed and
     * could wrap to a valid index, selecting a database the client never
     * asked for. */
    if (id < 0 || id >= server.dbnum || selectDb(c,(int)id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
        return;
    }
    addReply(c,shared.ok);
}
712 
/* RANDOMKEY: reply with a random key name, or null when dbRandomKey()
 * returns no key. */
void randomkeyCommand(client *c) {
    robj *key = dbRandomKey(c->db);

    if (key != NULL) {
        addReplyBulk(c,key);
        /* dbRandomKey() returned a new object: release it. */
        decrRefCount(key);
    } else {
        addReplyNull(c);
    }
}
724 
/* KEYS pattern: reply with the names of all the non expired keys matching
 * the pattern. */
void keysCommand(client *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    unsigned long numkeys = 0;

    /* The number of matches is known only at the end of the scan. */
    void *replylen = addReplyDeferredLen(c);

    dictIterator *di = dictGetSafeIterator(c->db->dict);
    /* A lone "*" matches everything: skip the pattern matching. */
    int allkeys = (plen == 1 && pattern[0] == '*');

    dictEntry *de;
    while ((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);

        if (!allkeys && !stringmatchlen(pattern,plen,key,sdslen(key),0))
            continue;

        robj *keyobj = createStringObject(key,sdslen(key));
        if (!keyIsExpired(c->db,keyobj)) {
            addReplyBulk(c,keyobj);
            numkeys++;
        }
        decrRefCount(keyobj);
    }
    dictReleaseIterator(di);
    setDeferredArrayLen(c,replylen,numkeys);
}
751 
752 /* This callback is used by scanGenericCommand in order to collect elements
753  * returned by the dictionary iterator into a list. */
scanCallback(void * privdata,const dictEntry * de)754 void scanCallback(void *privdata, const dictEntry *de) {
755     void **pd = (void**) privdata;
756     list *keys = pd[0];
757     robj *o = pd[1];
758     robj *key, *val = NULL;
759 
760     if (o == NULL) {
761         sds sdskey = dictGetKey(de);
762         key = createStringObject(sdskey, sdslen(sdskey));
763     } else if (o->type == OBJ_SET) {
764         sds keysds = dictGetKey(de);
765         key = createStringObject(keysds,sdslen(keysds));
766     } else if (o->type == OBJ_HASH) {
767         sds sdskey = dictGetKey(de);
768         sds sdsval = dictGetVal(de);
769         key = createStringObject(sdskey,sdslen(sdskey));
770         val = createStringObject(sdsval,sdslen(sdsval));
771     } else if (o->type == OBJ_ZSET) {
772         sds sdskey = dictGetKey(de);
773         key = createStringObject(sdskey,sdslen(sdskey));
774         val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
775     } else {
776         serverPanic("Type not handled in SCAN callback.");
777     }
778 
779     listAddNodeTail(keys, key);
780     if (val) listAddNodeTail(keys, val);
781 }
782 
783 /* Try to parse a SCAN cursor stored at object 'o':
784  * if the cursor is valid, store it as unsigned integer into *cursor and
785  * returns C_OK. Otherwise return C_ERR and send an error to the
786  * client. */
parseScanCursorOrReply(client * c,robj * o,unsigned long * cursor)787 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
788     char *eptr;
789 
790     /* Use strtoul() because we need an *unsigned* long, so
791      * getLongLongFromObject() does not cover the whole cursor space. */
792     errno = 0;
793     *cursor = strtoul(o->ptr, &eptr, 10);
794     if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
795     {
796         addReplyError(c, "invalid cursor");
797         return C_ERR;
798     }
799     return C_OK;
800 }
801 
802 /* This command implements SCAN, HSCAN and SSCAN commands.
803  * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
804  * if 'o' is NULL the command will operate on the dictionary associated with
805  * the current database.
806  *
807  * When 'o' is not NULL the function assumes that the first argument in
808  * the client arguments vector is a key so it skips it before iterating
809  * in order to parse options.
810  *
811  * In the case of a Hash object the function returns both the field and value
812  * of every element on the Hash. */
scanGenericCommand(client * c,robj * o,unsigned long cursor)813 void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
814     int i, j;
815     list *keys = listCreate();
816     listNode *node, *nextnode;
817     long count = 10;
818     sds pat = NULL;
819     sds typename = NULL;
820     int patlen = 0, use_pattern = 0;
821     dict *ht;
822 
823     /* Object must be NULL (to iterate keys names), or the type of the object
824      * must be Set, Sorted Set, or Hash. */
825     serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
826                 o->type == OBJ_ZSET);
827 
828     /* Set i to the first option argument. The previous one is the cursor. */
829     i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
830 
831     /* Step 1: Parse options. */
832     while (i < c->argc) {
833         j = c->argc - i;
834         if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
835             if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
836                 != C_OK)
837             {
838                 goto cleanup;
839             }
840 
841             if (count < 1) {
842                 addReply(c,shared.syntaxerr);
843                 goto cleanup;
844             }
845 
846             i += 2;
847         } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
848             pat = c->argv[i+1]->ptr;
849             patlen = sdslen(pat);
850 
851             /* The pattern always matches if it is exactly "*", so it is
852              * equivalent to disabling it. */
853             use_pattern = !(pat[0] == '*' && patlen == 1);
854 
855             i += 2;
856         } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
857             /* SCAN for a particular type only applies to the db dict */
858             typename = c->argv[i+1]->ptr;
859             i+= 2;
860         } else {
861             addReply(c,shared.syntaxerr);
862             goto cleanup;
863         }
864     }
865 
866     /* Step 2: Iterate the collection.
867      *
868      * Note that if the object is encoded with a ziplist, intset, or any other
869      * representation that is not a hash table, we are sure that it is also
870      * composed of a small number of elements. So to avoid taking state we
871      * just return everything inside the object in a single call, setting the
872      * cursor to zero to signal the end of the iteration. */
873 
874     /* Handle the case of a hash table. */
875     ht = NULL;
876     if (o == NULL) {
877         ht = c->db->dict;
878     } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
879         ht = o->ptr;
880     } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
881         ht = o->ptr;
882         count *= 2; /* We return key / value for this type. */
883     } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
884         zset *zs = o->ptr;
885         ht = zs->dict;
886         count *= 2; /* We return key / value for this type. */
887     }
888 
889     if (ht) {
890         void *privdata[2];
891         /* We set the max number of iterations to ten times the specified
892          * COUNT, so if the hash table is in a pathological state (very
893          * sparsely populated) we avoid to block too much time at the cost
894          * of returning no or very few elements. */
895         long maxiterations = count*10;
896 
897         /* We pass two pointers to the callback: the list to which it will
898          * add new elements, and the object containing the dictionary so that
899          * it is possible to fetch more data in a type-dependent way. */
900         privdata[0] = keys;
901         privdata[1] = o;
902         do {
903             cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
904         } while (cursor &&
905               maxiterations-- &&
906               listLength(keys) < (unsigned long)count);
907     } else if (o->type == OBJ_SET) {
908         int pos = 0;
909         int64_t ll;
910 
911         while(intsetGet(o->ptr,pos++,&ll))
912             listAddNodeTail(keys,createStringObjectFromLongLong(ll));
913         cursor = 0;
914     } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
915         unsigned char *p = ziplistIndex(o->ptr,0);
916         unsigned char *vstr;
917         unsigned int vlen;
918         long long vll;
919 
920         while(p) {
921             ziplistGet(p,&vstr,&vlen,&vll);
922             listAddNodeTail(keys,
923                 (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
924                                  createStringObjectFromLongLong(vll));
925             p = ziplistNext(o->ptr,p);
926         }
927         cursor = 0;
928     } else {
929         serverPanic("Not handled encoding in SCAN.");
930     }
931 
932     /* Step 3: Filter elements. */
933     node = listFirst(keys);
934     while (node) {
935         robj *kobj = listNodeValue(node);
936         nextnode = listNextNode(node);
937         int filter = 0;
938 
939         /* Filter element if it does not match the pattern. */
940         if (!filter && use_pattern) {
941             if (sdsEncodedObject(kobj)) {
942                 if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
943                     filter = 1;
944             } else {
945                 char buf[LONG_STR_SIZE];
946                 int len;
947 
948                 serverAssert(kobj->encoding == OBJ_ENCODING_INT);
949                 len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
950                 if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
951             }
952         }
953 
954         /* Filter an element if it isn't the type we want. */
955         if (!filter && o == NULL && typename){
956             robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
957             char* type = getObjectTypeName(typecheck);
958             if (strcasecmp((char*) typename, type)) filter = 1;
959         }
960 
961         /* Filter element if it is an expired key. */
962         if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
963 
964         /* Remove the element and its associated value if needed. */
965         if (filter) {
966             decrRefCount(kobj);
967             listDelNode(keys, node);
968         }
969 
970         /* If this is a hash or a sorted set, we have a flat list of
971          * key-value elements, so if this element was filtered, remove the
972          * value, or skip it if it was not filtered: we only match keys. */
973         if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
974             node = nextnode;
975             nextnode = listNextNode(node);
976             if (filter) {
977                 kobj = listNodeValue(node);
978                 decrRefCount(kobj);
979                 listDelNode(keys, node);
980             }
981         }
982         node = nextnode;
983     }
984 
985     /* Step 4: Reply to the client. */
986     addReplyArrayLen(c, 2);
987     addReplyBulkLongLong(c,cursor);
988 
989     addReplyArrayLen(c, listLength(keys));
990     while ((node = listFirst(keys)) != NULL) {
991         robj *kobj = listNodeValue(node);
992         addReplyBulk(c, kobj);
993         decrRefCount(kobj);
994         listDelNode(keys, node);
995     }
996 
997 cleanup:
998     listSetFreeMethod(keys,decrRefCountVoid);
999     listRelease(keys);
1000 }
1001 
1002 /* The SCAN command completely relies on scanGenericCommand. */
scanCommand(client * c)1003 void scanCommand(client *c) {
1004     unsigned long cursor;
1005     if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
1006     scanGenericCommand(c,NULL,cursor);
1007 }
1008 
/* Reply with the number of entries stored in the main dictionary of the
 * database currently selected by the client. */
void dbsizeCommand(client *c) {
    dict *keyspace = c->db->dict;

    addReplyLongLong(c,dictSize(keyspace));
}
1012 
/* Reply with the current value of server.lastsave as an integer. */
void lastsaveCommand(client *c) {
    long long lastsave = server.lastsave;

    addReplyLongLong(c,lastsave);
}
1016 
/* Return the name of the type of object 'o'.
 *
 * A NULL object is reported as "none", a module value reports the name
 * stored in its module type, and any type not listed here is "unknown".
 * The returned pointer is either a string literal or the name field of the
 * module type: the caller must not free it. */
char* getObjectTypeName(robj *o) {
    if (o == NULL) return "none";

    switch(o->type) {
    case OBJ_STRING: return "string";
    case OBJ_LIST: return "list";
    case OBJ_SET: return "set";
    case OBJ_ZSET: return "zset";
    case OBJ_HASH: return "hash";
    case OBJ_STREAM: return "stream";
    case OBJ_MODULE: {
        moduleValue *mv = o->ptr;
        return mv->type->name;
    }
    default: return "unknown";
    }
}
1038 
/* Reply with a status line holding the type name of the value stored at
 * the key in argv[1], as computed by getObjectTypeName(). The lookup uses
 * LOOKUP_NOTOUCH. */
void typeCommand(client *c) {
    robj *val = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
    char *name = getObjectTypeName(val);

    addReplyStatus(c,name);
}
1044 
/* SHUTDOWN [NOSAVE|SAVE]
 *
 * Accepts at most one optional, case insensitive argument that selects the
 * SHUTDOWN_NOSAVE or SHUTDOWN_SAVE flag. If prepareForShutdown() succeeds
 * the process exits with status 0 without replying, otherwise an error is
 * sent to the client. */
void shutdownCommand(client *c) {
    int flags = 0;

    /* More than one argument is always a syntax error. */
    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if (c->argc == 2) {
        const char *opt = c->argv[1]->ptr;

        if (strcasecmp(opt,"nosave") == 0) {
            flags |= SHUTDOWN_NOSAVE;
        } else if (strcasecmp(opt,"save") == 0) {
            flags |= SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* On success we never return from here. */
    if (prepareForShutdown(flags) == C_OK) exit(0);
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
1064 
/* Shared implementation of renameCommand() (nx == 0) and renamenxCommand()
 * (nx != 0): move the value stored at argv[1] to the key argv[2], keeping
 * its expire if any.
 *
 * With nx set the destination must not exist: the reply is the integer 1
 * when the rename happened and 0 otherwise. Without nx an existing
 * destination is overwritten and the reply is OK. A missing source key is
 * always an error. */
void renameGenericCommand(client *c, int nx) {
    robj *srckey = c->argv[1], *dstkey = c->argv[2];
    robj *val;
    long long expire;
    int samekey;

    /* When source and dest key is the same, no operation is performed,
     * if the key exists, however we still return an error on unexisting key. */
    samekey = (sdscmp(srckey->ptr,dstkey->ptr) == 0);

    val = lookupKeyWriteOrReply(c,srckey,shared.nokeyerr);
    if (val == NULL) return;

    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

    /* Hold an extra reference: the same object is added under the new name
     * and the old name is deleted below. */
    incrRefCount(val);
    expire = getExpire(c->db,srckey);

    if (lookupKeyWrite(c->db,dstkey) != NULL) {
        if (nx) {
            /* Destination exists: undo the reference and report 0. */
            decrRefCount(val);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
        dbDelete(c->db,dstkey);
    }

    dbAdd(c->db,dstkey,val);
    if (expire != -1) setExpire(c,c->db,dstkey,expire);
    dbDelete(c->db,srckey);

    signalModifiedKey(c,c->db,srckey);
    signalModifiedKey(c,c->db,dstkey);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",srckey,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",dstkey,c->db->id);
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}
1106 
/* Rename with the nx flag off: see renameGenericCommand(). */
void renameCommand(client *c) {
    const int nx = 0;

    renameGenericCommand(c,nx);
}
1110 
/* Rename with the nx flag on: see renameGenericCommand(). */
void renamenxCommand(client *c) {
    const int nx = 1;

    renameGenericCommand(c,nx);
}
1114 
/* Move the key in argv[1] from the currently selected database to the
 * database whose index is in argv[2], together with its expire if any.
 *
 * The reply is the integer 1 if the key was moved, 0 if the key does not
 * exist in the source database or already exists in the target one. An
 * error is replied in cluster mode, when the index is invalid, or when
 * source and target are the same database. */
void moveCommand(client *c) {
    robj *key = c->argv[1];
    robj *val;
    redisDb *src, *dst;
    int srcid, target_ok;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers: the target is resolved by
     * temporarily selecting it on the client. */
    src = c->db;
    srcid = c->db->id;

    target_ok = getLongLongFromObject(c->argv[2],&dbid) != C_ERR &&
                dbid >= INT_MIN && dbid <= INT_MAX &&
                selectDb(c,dbid) != C_ERR;
    if (!target_ok) {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    val = lookupKeyWrite(c->db,key);
    if (val == NULL) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,key);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,key) != NULL) {
        addReply(c,shared.czero);
        return;
    }

    dbAdd(dst,key,val);
    if (expire != -1) setExpire(c,dst,key,expire);
    incrRefCount(val);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,key);
    signalModifiedKey(c,src,key);
    signalModifiedKey(c,dst,key);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"move_from",key,src->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"move_to",key,dst->id);

    server.dirty++;
    addReply(c,shared.cone);
}
1176 
1177 /* Helper function for dbSwapDatabases(): scans the list of keys that have
1178  * one or more blocked clients for B[LR]POP or other blocking commands
1179  * and signal the keys as ready if they are of the right type. See the comment
1180  * where the function is used for more info. */
scanDatabaseForReadyLists(redisDb * db)1181 void scanDatabaseForReadyLists(redisDb *db) {
1182     dictEntry *de;
1183     dictIterator *di = dictGetSafeIterator(db->blocking_keys);
1184     while((de = dictNext(di)) != NULL) {
1185         robj *key = dictGetKey(de);
1186         robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
1187         if (value && (value->type == OBJ_LIST ||
1188                       value->type == OBJ_STREAM ||
1189                       value->type == OBJ_ZSET))
1190             signalKeyAsReady(db, key);
1191     }
1192     dictReleaseIterator(di);
1193 }
1194 
1195 /* Swap two databases at runtime so that all clients will magically see
1196  * the new database even if already connected. Note that the client
1197  * structure c->db points to a given DB, so we need to be smarter and
1198  * swap the underlying referenced structures, otherwise we would need
1199  * to fix all the references to the Redis DB structure.
1200  *
1201  * Returns C_ERR if at least one of the DB ids are out of range, otherwise
1202  * C_OK is returned. */
dbSwapDatabases(long id1,long id2)1203 int dbSwapDatabases(long id1, long id2) {
1204     if (id1 < 0 || id1 >= server.dbnum ||
1205         id2 < 0 || id2 >= server.dbnum) return C_ERR;
1206     if (id1 == id2) return C_OK;
1207     redisDb aux = server.db[id1];
1208     redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1209 
1210     /* Swap hash tables. Note that we don't swap blocking_keys,
1211      * ready_keys and watched_keys, since we want clients to
1212      * remain in the same DB they were. */
1213     db1->dict = db2->dict;
1214     db1->expires = db2->expires;
1215     db1->avg_ttl = db2->avg_ttl;
1216     db1->expires_cursor = db2->expires_cursor;
1217 
1218     db2->dict = aux.dict;
1219     db2->expires = aux.expires;
1220     db2->avg_ttl = aux.avg_ttl;
1221     db2->expires_cursor = aux.expires_cursor;
1222 
1223     /* Now we need to handle clients blocked on lists: as an effect
1224      * of swapping the two DBs, a client that was waiting for list
1225      * X in a given DB, may now actually be unblocked if X happens
1226      * to exist in the new version of the DB, after the swap.
1227      *
1228      * However normally we only do this check for efficiency reasons
1229      * in dbAdd() when a list is created. So here we need to rescan
1230      * the list of clients blocked on lists and signal lists as ready
1231      * if needed.
1232      *
1233      * Also the swapdb should make transaction fail if there is any
1234      * client watching keys */
1235     scanDatabaseForReadyLists(db1);
1236     touchAllWatchedKeysInDb(db1, db2);
1237     scanDatabaseForReadyLists(db2);
1238     touchAllWatchedKeysInDb(db2, db1);
1239     return C_OK;
1240 }
1241 
1242 /* SWAPDB db1 db2 */
swapdbCommand(client * c)1243 void swapdbCommand(client *c) {
1244     long id1, id2;
1245 
1246     /* Not allowed in cluster mode: we have just DB 0 there. */
1247     if (server.cluster_enabled) {
1248         addReplyError(c,"SWAPDB is not allowed in cluster mode");
1249         return;
1250     }
1251 
1252     /* Get the two DBs indexes. */
1253     if (getLongFromObjectOrReply(c, c->argv[1], &id1,
1254         "invalid first DB index") != C_OK)
1255         return;
1256 
1257     if (getLongFromObjectOrReply(c, c->argv[2], &id2,
1258         "invalid second DB index") != C_OK)
1259         return;
1260 
1261     /* Swap... */
1262     if (dbSwapDatabases(id1,id2) == C_ERR) {
1263         addReplyError(c,"DB index is out of range");
1264         return;
1265     } else {
1266         RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
1267         moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
1268         server.dirty++;
1269         addReply(c,shared.ok);
1270     }
1271 }
1272 
1273 /*-----------------------------------------------------------------------------
1274  * Expires API
1275  *----------------------------------------------------------------------------*/
1276 
/* Remove the expire associated with 'key' in 'db'. Returns 1 if an expire
 * was found and removed, 0 if the key had no expire. The key must exist in
 * the main dict (asserted). */
int removeExpire(redisDb *db, robj *key) {
    int removed;

    /* An expire may only be removed if there is a corresponding entry in the
     * main dict. Otherwise, the key will never be freed. */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);

    removed = (dictDelete(db->expires,key->ptr) == DICT_OK);
    return removed;
}
1283 
1284 /* Set an expire to the specified key. If the expire is set in the context
1285  * of an user calling a command 'c' is the client, otherwise 'c' is set
1286  * to NULL. The 'when' parameter is the absolute unix time in milliseconds
1287  * after which the key will no longer be considered valid. */
setExpire(client * c,redisDb * db,robj * key,long long when)1288 void setExpire(client *c, redisDb *db, robj *key, long long when) {
1289     dictEntry *kde, *de;
1290 
1291     /* Reuse the sds from the main dict in the expire dict */
1292     kde = dictFind(db->dict,key->ptr);
1293     serverAssertWithInfo(NULL,key,kde != NULL);
1294     de = dictAddOrFind(db->expires,dictGetKey(kde));
1295     dictSetSignedIntegerVal(de,when);
1296 
1297     int writable_slave = server.masterhost && server.repl_slave_ro == 0;
1298     if (c && writable_slave && !(c->flags & CLIENT_MASTER))
1299         rememberSlaveKeyWithExpire(db,key);
1300 }
1301 
1302 /* Return the expire time of the specified key, or -1 if no expire
1303  * is associated with this key (i.e. the key is non volatile) */
getExpire(redisDb * db,robj * key)1304 long long getExpire(redisDb *db, robj *key) {
1305     dictEntry *de;
1306 
1307     /* No expire? return ASAP */
1308     if (dictSize(db->expires) == 0 ||
1309        (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
1310 
1311     /* The entry was found in the expire dict, this means it should also
1312      * be present in the main dict (safety check). */
1313     serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
1314     return dictGetSignedIntegerVal(de);
1315 }
1316 
1317 /* Propagate expires into slaves and the AOF file.
1318  * When a key expires in the master, a DEL operation for this key is sent
1319  * to all the slaves and the AOF file if enabled.
1320  *
1321  * This way the key expiry is centralized in one place, and since both
1322  * AOF and the master->slave link guarantee operation ordering, everything
1323  * will be consistent even if we allow write operations against expiring
1324  * keys. */
propagateExpire(redisDb * db,robj * key,int lazy)1325 void propagateExpire(redisDb *db, robj *key, int lazy) {
1326     robj *argv[2];
1327 
1328     argv[0] = lazy ? shared.unlink : shared.del;
1329     argv[1] = key;
1330     incrRefCount(argv[0]);
1331     incrRefCount(argv[1]);
1332 
1333     if (server.aof_state != AOF_OFF)
1334         feedAppendOnlyFile(server.delCommand,db->id,argv,2);
1335     replicationFeedSlaves(server.slaves,db->id,argv,2);
1336 
1337     decrRefCount(argv[0]);
1338     decrRefCount(argv[1]);
1339 }
1340 
1341 /* Check if the key is expired. */
keyIsExpired(redisDb * db,robj * key)1342 int keyIsExpired(redisDb *db, robj *key) {
1343     mstime_t when = getExpire(db,key);
1344     mstime_t now;
1345 
1346     if (when < 0) return 0; /* No expire for this key */
1347 
1348     /* Don't expire anything while loading. It will be done later. */
1349     if (server.loading) return 0;
1350 
1351     /* If we are in the context of a Lua script, we pretend that time is
1352      * blocked to when the Lua script started. This way a key can expire
1353      * only the first time it is accessed and not in the middle of the
1354      * script execution, making propagation to slaves / AOF consistent.
1355      * See issue #1525 on Github for more information. */
1356     if (server.lua_caller) {
1357         now = server.lua_time_start;
1358     }
1359     /* If we are in the middle of a command execution, we still want to use
1360      * a reference time that does not change: in that case we just use the
1361      * cached time, that we update before each call in the call() function.
1362      * This way we avoid that commands such as RPOPLPUSH or similar, that
1363      * may re-open the same key multiple times, can invalidate an already
1364      * open object in a next call, if the next call will see the key expired,
1365      * while the first did not. */
1366     else if (server.fixed_time_expire > 0) {
1367         now = server.mstime;
1368     }
1369     /* For the other cases, we want to use the most fresh time we have. */
1370     else {
1371         now = mstime();
1372     }
1373 
1374     /* The key expired if the current (virtual or real) time is greater
1375      * than the expire time of the key. */
1376     return now > when;
1377 }
1378 
1379 /* This function is called when we are going to perform some operation
1380  * in a given key, but such key may be already logically expired even if
1381  * it still exists in the database. The main way this function is called
1382  * is via lookupKey*() family of functions.
1383  *
1384  * The behavior of the function depends on the replication role of the
1385  * instance, because slave instances do not expire keys, they wait
1386  * for DELs from the master for consistency matters. However even
1387  * slaves will try to have a coherent return value for the function,
1388  * so that read commands executed in the slave side will be able to
1389  * behave like if the key is expired even if still present (because the
1390  * master has yet to propagate the DEL).
1391  *
1392  * In masters as a side effect of finding a key which is expired, such
1393  * key will be evicted from the database. Also this may trigger the
1394  * propagation of a DEL/UNLINK command in AOF / replication stream.
1395  *
1396  * The return value of the function is 0 if the key is still valid,
1397  * otherwise the function returns 1 if the key is expired. */
expireIfNeeded(redisDb * db,robj * key)1398 int expireIfNeeded(redisDb *db, robj *key) {
1399     if (!keyIsExpired(db,key)) return 0;
1400 
1401     /* If we are running in the context of a slave, instead of
1402      * evicting the expired key from the database, we return ASAP:
1403      * the slave key expiration is controlled by the master that will
1404      * send us synthesized DEL operations for expired keys.
1405      *
1406      * Still we try to return the right information to the caller,
1407      * that is, 0 if we think the key should be still valid, 1 if
1408      * we think the key is expired at this time. */
1409     if (server.masterhost != NULL) return 1;
1410 
1411     /* Delete the key */
1412     server.stat_expiredkeys++;
1413     propagateExpire(db,key,server.lazyfree_lazy_expire);
1414     notifyKeyspaceEvent(NOTIFY_EXPIRED,
1415         "expired",key,db->id);
1416     int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
1417                                                dbSyncDelete(db,key);
1418     if (retval) signalModifiedKey(NULL,db,key);
1419     return retval;
1420 }
1421 
1422 /* -----------------------------------------------------------------------------
1423  * API to get key arguments from commands
1424  * ---------------------------------------------------------------------------*/
1425 
1426 /* Prepare the getKeysResult struct to hold numkeys, either by using the
1427  * pre-allocated keysbuf or by allocating a new array on the heap.
1428  *
1429  * This function must be called at least once before starting to populate
1430  * the result, and can be called repeatedly to enlarge the result array.
1431  */
getKeysPrepareResult(getKeysResult * result,int numkeys)1432 int *getKeysPrepareResult(getKeysResult *result, int numkeys) {
1433     /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack
1434      * buffer here. */
1435     if (!result->keys) {
1436         serverAssert(!result->numkeys);
1437         result->keys = result->keysbuf;
1438     }
1439 
1440     /* Resize if necessary */
1441     if (numkeys > result->size) {
1442         if (result->keys != result->keysbuf) {
1443             /* We're not using a static buffer, just (re)alloc */
1444             result->keys = zrealloc(result->keys, numkeys * sizeof(int));
1445         } else {
1446             /* We are using a static buffer, copy its contents */
1447             result->keys = zmalloc(numkeys * sizeof(int));
1448             if (result->numkeys)
1449                 memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(int));
1450         }
1451         result->size = numkeys;
1452     }
1453 
1454     return result->keys;
1455 }
1456 
1457 /* The base case is to use the keys position as given in the command table
1458  * (firstkey, lastkey, step). */
getKeysUsingCommandTable(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1459 int getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result) {
1460     int j, i = 0, last, *keys;
1461     UNUSED(argv);
1462 
1463     if (cmd->firstkey == 0) {
1464         result->numkeys = 0;
1465         return 0;
1466     }
1467 
1468     last = cmd->lastkey;
1469     if (last < 0) last = argc+last;
1470 
1471     int count = ((last - cmd->firstkey)+1);
1472     keys = getKeysPrepareResult(result, count);
1473 
1474     for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
1475         if (j >= argc) {
1476             /* Modules commands, and standard commands with a not fixed number
1477              * of arguments (negative arity parameter) do not have dispatch
1478              * time arity checks, so we need to handle the case where the user
1479              * passed an invalid number of arguments here. In this case we
1480              * return no keys and expect the command implementation to report
1481              * an arity or syntax error. */
1482             if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
1483                 getKeysFreeResult(result);
1484                 result->numkeys = 0;
1485                 return 0;
1486             } else {
1487                 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
1488             }
1489         }
1490         keys[i++] = j;
1491     }
1492     result->numkeys = i;
1493     return i;
1494 }
1495 
1496 /* Return all the arguments that are keys in the command passed via argc / argv.
1497  *
1498  * The command returns the positions of all the key arguments inside the array,
1499  * so the actual return value is a heap allocated array of integers. The
1500  * length of the array is returned by reference into *numkeys.
1501  *
1502  * 'cmd' must be point to the corresponding entry into the redisCommand
1503  * table, according to the command name in argv[0].
1504  *
1505  * This function uses the command table if a command-specific helper function
1506  * is not required, otherwise it calls the command-specific function. */
getKeysFromCommand(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1507 int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1508     if (cmd->flags & CMD_MODULE_GETKEYS) {
1509         return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
1510     } else if (!(cmd->flags & CMD_MODULE) && cmd->getkeys_proc) {
1511         return cmd->getkeys_proc(cmd,argv,argc,result);
1512     } else {
1513         return getKeysUsingCommandTable(cmd,argv,argc,result);
1514     }
1515 }
1516 
1517 /* Free the result of getKeysFromCommand. */
getKeysFreeResult(getKeysResult * result)1518 void getKeysFreeResult(getKeysResult *result) {
1519     if (result && result->keys != result->keysbuf)
1520         zfree(result->keys);
1521 }
1522 
1523 /* Helper function to extract keys from following commands:
1524  * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
1525  * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
zunionInterGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1526 int zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1527     int i, num, *keys;
1528     UNUSED(cmd);
1529 
1530     num = atoi(argv[2]->ptr);
1531     /* Sanity check. Don't return any key if the command is going to
1532      * reply with syntax error. */
1533     if (num < 1 || num > (argc-3)) {
1534         result->numkeys = 0;
1535         return 0;
1536     }
1537 
1538     /* Keys in z{union,inter}store come from two places:
1539      * argv[1] = storage key,
1540      * argv[3...n] = keys to intersect */
1541     /* Total keys = {union,inter} keys + storage key */
1542     keys = getKeysPrepareResult(result, num+1);
1543     result->numkeys = num+1;
1544 
1545     /* Add all key positions for argv[3...n] to keys[] */
1546     for (i = 0; i < num; i++) keys[i] = 3+i;
1547 
1548     /* Finally add the argv[1] key position (the storage key target). */
1549     keys[num] = 1;
1550 
1551     return result->numkeys;
1552 }
1553 
1554 /* Helper function to extract keys from the following commands:
1555  * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
1556  * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
evalGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1557 int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1558     int i, num, *keys;
1559     UNUSED(cmd);
1560 
1561     num = atoi(argv[2]->ptr);
1562     /* Sanity check. Don't return any key if the command is going to
1563      * reply with syntax error. */
1564     if (num <= 0 || num > (argc-3)) {
1565         result->numkeys = 0;
1566         return 0;
1567     }
1568 
1569     keys = getKeysPrepareResult(result, num);
1570     result->numkeys = num;
1571 
1572     /* Add all key positions for argv[3...n] to keys[] */
1573     for (i = 0; i < num; i++) keys[i] = 3+i;
1574 
1575     return result->numkeys;
1576 }
1577 
1578 /* Helper function to extract keys from the SORT command.
1579  *
1580  * SORT <sort-key> ... STORE <store-key> ...
1581  *
1582  * The first argument of SORT is always a key, however a list of options
1583  * follow in SQL-alike style. Here we parse just the minimum in order to
1584  * correctly identify keys in the "STORE" option. */
sortGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1585 int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1586     int i, j, num, *keys, found_store = 0;
1587     UNUSED(cmd);
1588 
1589     num = 0;
1590     keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */
1591     keys[num++] = 1; /* <sort-key> is always present. */
1592 
1593     /* Search for STORE option. By default we consider options to don't
1594      * have arguments, so if we find an unknown option name we scan the
1595      * next. However there are options with 1 or 2 arguments, so we
1596      * provide a list here in order to skip the right number of args. */
1597     struct {
1598         char *name;
1599         int skip;
1600     } skiplist[] = {
1601         {"limit", 2},
1602         {"get", 1},
1603         {"by", 1},
1604         {NULL, 0} /* End of elements. */
1605     };
1606 
1607     for (i = 2; i < argc; i++) {
1608         for (j = 0; skiplist[j].name != NULL; j++) {
1609             if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
1610                 i += skiplist[j].skip;
1611                 break;
1612             } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1613                 /* Note: we don't increment "num" here and continue the loop
1614                  * to be sure to process the *last* "STORE" option if multiple
1615                  * ones are provided. This is same behavior as SORT. */
1616                 found_store = 1;
1617                 keys[num] = i+1; /* <store-key> */
1618                 break;
1619             }
1620         }
1621     }
1622     result->numkeys = num + found_store;
1623     return result->numkeys;
1624 }
1625 
/* Key extraction helper for the MIGRATE command.
 *
 * By default the single key is argv[3]. When argv[3] is an empty string and
 * a "KEYS" argument (case insensitive) is found at position 6 or later, the
 * keys are instead all the arguments following the first such "KEYS". */
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    /* Assume the plain single-key form. */
    int first_key = 3;
    int nkeys = 1;

    /* The extended form is only considered when argv[3] is empty; that
     * condition does not change while scanning, so test it once. */
    if (argc > 6 && sdslen(argv[3]->ptr) == 0) {
        int pos;
        for (pos = 6; pos < argc; pos++) {
            if (strcasecmp(argv[pos]->ptr,"keys") != 0) continue;
            first_key = pos+1;
            nkeys = argc-first_key;
            break;
        }
    }

    int *positions = getKeysPrepareResult(result, nkeys);
    int k;
    for (k = 0; k < nkeys; k++) positions[k] = first_key+k;
    result->numkeys = nkeys;
    return nkeys;
}
1652 
1653 /* Helper function to extract keys from following commands:
1654  * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
1655  *                             [COUNT count] [STORE key] [STOREDIST key]
1656  * GEORADIUSBYMEMBER key member radius unit ... options ... */
georadiusGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1657 int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1658     int i, num, *keys;
1659     UNUSED(cmd);
1660 
1661     /* Check for the presence of the stored key in the command */
1662     int stored_key = -1;
1663     for (i = 5; i < argc; i++) {
1664         char *arg = argv[i]->ptr;
1665         /* For the case when user specifies both "store" and "storedist" options, the
1666          * second key specified would override the first key. This behavior is kept
1667          * the same as in georadiusCommand method.
1668          */
1669         if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
1670             stored_key = i+1;
1671             i++;
1672         }
1673     }
1674     num = 1 + (stored_key == -1 ? 0 : 1);
1675 
1676     /* Keys in the command come from two places:
1677      * argv[1] = key,
1678      * argv[5...n] = stored key if present
1679      */
1680     keys = getKeysPrepareResult(result, num);
1681 
1682     /* Add all key positions to keys[] */
1683     keys[0] = 1;
1684     if(num > 1) {
1685          keys[1] = stored_key;
1686     }
1687     result->numkeys = num;
1688     return num;
1689 }
1690 
1691 /* LCS ... [KEYS <key1> <key2>] ... */
lcsGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1692 int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1693     int i;
1694     int *keys = getKeysPrepareResult(result, 2);
1695     UNUSED(cmd);
1696 
1697     /* We need to parse the options of the command in order to check for the
1698      * "KEYS" argument before the "STRINGS" argument. */
1699     for (i = 1; i < argc; i++) {
1700         char *arg = argv[i]->ptr;
1701         int moreargs = (argc-1) - i;
1702 
1703         if (!strcasecmp(arg, "strings")) {
1704             break;
1705         } else if (!strcasecmp(arg, "keys") && moreargs >= 2) {
1706             keys[0] = i+1;
1707             keys[1] = i+2;
1708             result->numkeys = 2;
1709             return result->numkeys;
1710         }
1711     }
1712     result->numkeys = 0;
1713     return result->numkeys;
1714 }
1715 
1716 /* Helper function to extract keys from memory command.
1717  * MEMORY USAGE <key> */
memoryGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1718 int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1719     UNUSED(cmd);
1720 
1721     getKeysPrepareResult(result, 1);
1722     if (argc >= 3 && !strcasecmp(argv[1]->ptr,"usage")) {
1723         result->keys[0] = 2;
1724         result->numkeys = 1;
1725         return result->numkeys;
1726     }
1727     result->numkeys = 0;
1728     return 0;
1729 }
1730 
1731 /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
1732  *       STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */
xreadGetKeys(struct redisCommand * cmd,robj ** argv,int argc,getKeysResult * result)1733 int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
1734     int i, num = 0, *keys;
1735     UNUSED(cmd);
1736 
1737     /* We need to parse the options of the command in order to seek the first
1738      * "STREAMS" string which is actually the option. This is needed because
1739      * "STREAMS" could also be the name of the consumer group and even the
1740      * name of the stream key. */
1741     int streams_pos = -1;
1742     for (i = 1; i < argc; i++) {
1743         char *arg = argv[i]->ptr;
1744         if (!strcasecmp(arg, "block")) {
1745             i++; /* Skip option argument. */
1746         } else if (!strcasecmp(arg, "count")) {
1747             i++; /* Skip option argument. */
1748         } else if (!strcasecmp(arg, "group")) {
1749             i += 2; /* Skip option argument. */
1750         } else if (!strcasecmp(arg, "noack")) {
1751             /* Nothing to do. */
1752         } else if (!strcasecmp(arg, "streams")) {
1753             streams_pos = i;
1754             break;
1755         } else {
1756             break; /* Syntax error. */
1757         }
1758     }
1759     if (streams_pos != -1) num = argc - streams_pos - 1;
1760 
1761     /* Syntax error. */
1762     if (streams_pos == -1 || num == 0 || num % 2 != 0) {
1763         result->numkeys = 0;
1764         return 0;
1765     }
1766     num /= 2; /* We have half the keys as there are arguments because
1767                  there are also the IDs, one per key. */
1768 
1769     keys = getKeysPrepareResult(result, num);
1770     for (i = streams_pos+1; i < argc-num; i++) keys[i-streams_pos-1] = i;
1771     result->numkeys = num;
1772     return num;
1773 }
1774 
1775 /* Slot to Key API. This is used by Redis Cluster in order to obtain in
1776  * a fast way a key that belongs to a specified hash slot. This is useful
1777  * while rehashing the cluster and in other conditions when we need to
1778  * understand if we have keys for a given hash slot. */
slotToKeyUpdateKey(sds key,int add)1779 void slotToKeyUpdateKey(sds key, int add) {
1780     size_t keylen = sdslen(key);
1781     unsigned int hashslot = keyHashSlot(key,keylen);
1782     unsigned char buf[64];
1783     unsigned char *indexed = buf;
1784 
1785     server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
1786     if (keylen+2 > 64) indexed = zmalloc(keylen+2);
1787     indexed[0] = (hashslot >> 8) & 0xff;
1788     indexed[1] = hashslot & 0xff;
1789     memcpy(indexed+2,key,keylen);
1790     if (add) {
1791         raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
1792     } else {
1793         raxRemove(server.cluster->slots_to_keys,indexed,keylen+2,NULL);
1794     }
1795     if (indexed != buf) zfree(indexed);
1796 }
1797 
/* Add 'key' to the slot-to-keys index: slotToKeyUpdateKey() with add = 1. */
void slotToKeyAdd(sds key) {
    const int add = 1;
    slotToKeyUpdateKey(key,add);
}
1801 
/* Remove 'key' from the slot-to-keys index: slotToKeyUpdateKey() with
 * add = 0. */
void slotToKeyDel(sds key) {
    const int add = 0;
    slotToKeyUpdateKey(key,add);
}
1805 
1806 /* Release the radix tree mapping Redis Cluster keys to slots. If 'async'
1807  * is true, we release it asynchronously. */
freeSlotsToKeysMap(rax * rt,int async)1808 void freeSlotsToKeysMap(rax *rt, int async) {
1809     if (async) {
1810         freeSlotsToKeysMapAsync(rt);
1811     } else {
1812         raxFree(rt);
1813     }
1814 }
1815 
1816 /* Empty the slots-keys map of Redis CLuster by creating a new empty one and
1817  * freeing the old one. */
slotToKeyFlush(int async)1818 void slotToKeyFlush(int async) {
1819     rax *old = server.cluster->slots_to_keys;
1820 
1821     server.cluster->slots_to_keys = raxNew();
1822     memset(server.cluster->slots_keys_count,0,
1823            sizeof(server.cluster->slots_keys_count));
1824     freeSlotsToKeysMap(old, async);
1825 }
1826 
1827 /* Pupulate the specified array of objects with keys in the specified slot.
1828  * New objects are returned to represent keys, it's up to the caller to
1829  * decrement the reference count to release the keys names. */
getKeysInSlot(unsigned int hashslot,robj ** keys,unsigned int count)1830 unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
1831     raxIterator iter;
1832     int j = 0;
1833     unsigned char indexed[2];
1834 
1835     indexed[0] = (hashslot >> 8) & 0xff;
1836     indexed[1] = hashslot & 0xff;
1837     raxStart(&iter,server.cluster->slots_to_keys);
1838     raxSeek(&iter,">=",indexed,2);
1839     while(count-- && raxNext(&iter)) {
1840         if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
1841         keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
1842     }
1843     raxStop(&iter);
1844     return j;
1845 }
1846 
1847 /* Remove all the keys in the specified hash slot.
1848  * The number of removed items is returned. */
delKeysInSlot(unsigned int hashslot)1849 unsigned int delKeysInSlot(unsigned int hashslot) {
1850     raxIterator iter;
1851     int j = 0;
1852     unsigned char indexed[2];
1853 
1854     indexed[0] = (hashslot >> 8) & 0xff;
1855     indexed[1] = hashslot & 0xff;
1856     raxStart(&iter,server.cluster->slots_to_keys);
1857     while(server.cluster->slots_keys_count[hashslot]) {
1858         raxSeek(&iter,">=",indexed,2);
1859         raxNext(&iter);
1860 
1861         robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
1862         dbDelete(&server.db[0],key);
1863         decrRefCount(key);
1864         j++;
1865     }
1866     raxStop(&iter);
1867     return j;
1868 }
1869 
countKeysInSlot(unsigned int hashslot)1870 unsigned int countKeysInSlot(unsigned int hashslot) {
1871     return server.cluster->slots_keys_count[hashslot];
1872 }
1873