1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32 #include "atomicvar.h"
33
34 #include <signal.h>
35 #include <ctype.h>
36
/* Database backup: a snapshot of the keyspace taken by backupDb(), which can
 * later be put back with restoreDbBackup() or released with
 * discardDbBackup(). */
struct dbBackup {
    redisDb *dbarray;   /* Copies of server.db[0..server.dbnum-1]. */
    rax *slots_to_keys; /* Saved cluster slot->keys map; only set by
                         * backupDb() when cluster mode is enabled. */
    uint64_t slots_keys_count[CLUSTER_SLOTS]; /* Saved per-slot key counts
                                               * (cluster mode only). */
};
43
44 /*-----------------------------------------------------------------------------
45 * C-level DB API
46 *----------------------------------------------------------------------------*/
47
/* Forward declaration: keysCommand() below uses it to skip keys that are
 * logically expired without deleting them. */
int keyIsExpired(redisDb *db, robj *key);
49
50 /* Update LFU when an object is accessed.
51 * Firstly, decrement the counter if the decrement time is reached.
52 * Then logarithmically increment the counter, and update the access time. */
updateLFU(robj * val)53 void updateLFU(robj *val) {
54 unsigned long counter = LFUDecrAndReturn(val);
55 counter = LFULogIncr(counter);
56 val->lru = (LFUGetTimeInMinutes()<<8) | counter;
57 }
58
59 /* Low level key lookup API, not actually called directly from commands
60 * implementations that should instead rely on lookupKeyRead(),
61 * lookupKeyWrite() and lookupKeyReadWithFlags(). */
lookupKey(redisDb * db,robj * key,int flags)62 robj *lookupKey(redisDb *db, robj *key, int flags) {
63 dictEntry *de = dictFind(db->dict,key->ptr);
64 if (de) {
65 robj *val = dictGetVal(de);
66
67 /* Update the access time for the ageing algorithm.
68 * Don't do it if we have a saving child, as this will trigger
69 * a copy on write madness. */
70 if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
71 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
72 updateLFU(val);
73 } else {
74 val->lru = LRU_CLOCK();
75 }
76 }
77 return val;
78 } else {
79 return NULL;
80 }
81 }
82
83 /* Lookup a key for read operations, or return NULL if the key is not found
84 * in the specified DB.
85 *
86 * As a side effect of calling this function:
87 * 1. A key gets expired if it reached it's TTL.
88 * 2. The key last access time is updated.
89 * 3. The global keys hits/misses stats are updated (reported in INFO).
90 * 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
91 *
92 * This API should not be used when we write to the key after obtaining
93 * the object linked to the key, but only for read only operations.
94 *
95 * Flags change the behavior of this command:
96 *
97 * LOOKUP_NONE (or zero): no special flags are passed.
98 * LOOKUP_NOTOUCH: don't alter the last access time of the key.
99 *
100 * Note: this function also returns NULL if the key is logically expired
101 * but still existing, in case this is a slave, since this API is called only
102 * for read operations. Even if the key expiry is master-driven, we can
103 * correctly report a key is expired on slaves even if the master is lagging
104 * expiring our key via DELs in the replication link. */
lookupKeyReadWithFlags(redisDb * db,robj * key,int flags)105 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
106 robj *val;
107
108 if (expireIfNeeded(db,key) == 1) {
109 /* Key expired. If we are in the context of a master, expireIfNeeded()
110 * returns 0 only when the key does not exist at all, so it's safe
111 * to return NULL ASAP. */
112 if (server.masterhost == NULL)
113 goto keymiss;
114
115 /* However if we are in the context of a slave, expireIfNeeded() will
116 * not really try to expire the key, it only returns information
117 * about the "logical" status of the key: key expiring is up to the
118 * master in order to have a consistent view of master's data set.
119 *
120 * However, if the command caller is not the master, and as additional
121 * safety measure, the command invoked is a read-only command, we can
122 * safely return NULL here, and provide a more consistent behavior
123 * to clients accessing expired values in a read-only fashion, that
124 * will say the key as non existing.
125 *
126 * Notably this covers GETs when slaves are used to scale reads. */
127 if (server.current_client &&
128 server.current_client != server.master &&
129 server.current_client->cmd &&
130 server.current_client->cmd->flags & CMD_READONLY)
131 {
132 goto keymiss;
133 }
134 }
135 val = lookupKey(db,key,flags);
136 if (val == NULL)
137 goto keymiss;
138 server.stat_keyspace_hits++;
139 return val;
140
141 keymiss:
142 if (!(flags & LOOKUP_NONOTIFY)) {
143 server.stat_keyspace_misses++;
144 notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
145 }
146 return NULL;
147 }
148
149 /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
150 * common case. */
lookupKeyRead(redisDb * db,robj * key)151 robj *lookupKeyRead(redisDb *db, robj *key) {
152 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
153 }
154
155 /* Lookup a key for write operations, and as a side effect, if needed, expires
156 * the key if its TTL is reached.
157 *
158 * Returns the linked value object if the key exists or NULL if the key
159 * does not exist in the specified DB. */
lookupKeyWriteWithFlags(redisDb * db,robj * key,int flags)160 robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
161 expireIfNeeded(db,key);
162 return lookupKey(db,key,flags);
163 }
164
/* Write lookup with no flags: the common case of lookupKeyWriteWithFlags(). */
robj *lookupKeyWrite(redisDb *db, robj *key) {
    const int no_flags = LOOKUP_NONE;
    return lookupKeyWriteWithFlags(db, key, no_flags);
}
168
/* Read lookup in the client's current DB. When the key is missing, 'reply'
 * is queued to the client and NULL is returned. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyRead(c->db, key);
    if (val == NULL) {
        addReply(c, reply);
        return NULL;
    }
    return val;
}
174
/* Write lookup in the client's current DB. When the key is missing, 'reply'
 * is queued to the client and NULL is returned. */
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *val = lookupKeyWrite(c->db, key);
    if (val == NULL) {
        addReply(c, reply);
        return NULL;
    }
    return val;
}
180
181 /* Add the key to the DB. It's up to the caller to increment the reference
182 * counter of the value if needed.
183 *
184 * The program is aborted if the key already exists. */
dbAdd(redisDb * db,robj * key,robj * val)185 void dbAdd(redisDb *db, robj *key, robj *val) {
186 sds copy = sdsdup(key->ptr);
187 int retval = dictAdd(db->dict, copy, val);
188
189 serverAssertWithInfo(NULL,key,retval == DICT_OK);
190 if (val->type == OBJ_LIST ||
191 val->type == OBJ_ZSET ||
192 val->type == OBJ_STREAM)
193 signalKeyAsReady(db, key);
194 if (server.cluster_enabled) slotToKeyAdd(key->ptr);
195 }
196
197 /* This is a special version of dbAdd() that is used only when loading
198 * keys from the RDB file: the key is passed as an SDS string that is
199 * retained by the function (and not freed by the caller).
200 *
201 * Moreover this function will not abort if the key is already busy, to
202 * give more control to the caller, nor will signal the key as ready
203 * since it is not useful in this context.
204 *
205 * The function returns 1 if the key was added to the database, taking
206 * ownership of the SDS string, otherwise 0 is returned, and is up to the
207 * caller to free the SDS string. */
dbAddRDBLoad(redisDb * db,sds key,robj * val)208 int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
209 int retval = dictAdd(db->dict, key, val);
210 if (retval != DICT_OK) return 0;
211 if (server.cluster_enabled) slotToKeyAdd(key);
212 return 1;
213 }
214
215 /* Overwrite an existing key with a new value. Incrementing the reference
216 * count of the new value is up to the caller.
217 * This function does not modify the expire time of the existing key.
218 *
219 * The program is aborted if the key was not already present. */
dbOverwrite(redisDb * db,robj * key,robj * val)220 void dbOverwrite(redisDb *db, robj *key, robj *val) {
221 dictEntry *de = dictFind(db->dict,key->ptr);
222
223 serverAssertWithInfo(NULL,key,de != NULL);
224 dictEntry auxentry = *de;
225 robj *old = dictGetVal(de);
226 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
227 val->lru = old->lru;
228 }
229 dictSetVal(db->dict, de, val);
230
231 if (server.lazyfree_lazy_server_del) {
232 freeObjAsync(old);
233 dictSetVal(db->dict, &auxentry, NULL);
234 }
235
236 dictFreeVal(db->dict, &auxentry);
237 }
238
239 /* High level Set operation. This function can be used in order to set
240 * a key, whatever it was existing or not, to a new object.
241 *
242 * 1) The ref count of the value object is incremented.
243 * 2) clients WATCHing for the destination key notified.
244 * 3) The expire time of the key is reset (the key is made persistent),
245 * unless 'keepttl' is true.
246 *
247 * All the new keys in the database should be created via this interface.
248 * The client 'c' argument may be set to NULL if the operation is performed
249 * in a context where there is no clear client performing the operation. */
genericSetKey(client * c,redisDb * db,robj * key,robj * val,int keepttl,int signal)250 void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
251 if (lookupKeyWrite(db,key) == NULL) {
252 dbAdd(db,key,val);
253 } else {
254 dbOverwrite(db,key,val);
255 }
256 incrRefCount(val);
257 if (!keepttl) removeExpire(db,key);
258 if (signal) signalModifiedKey(c,db,key);
259 }
260
261 /* Common case for genericSetKey() where the TTL is not retained. */
setKey(client * c,redisDb * db,robj * key,robj * val)262 void setKey(client *c, redisDb *db, robj *key, robj *val) {
263 genericSetKey(c,db,key,val,0,1);
264 }
265
266 /* Return true if the specified key exists in the specified database.
267 * LRU/LFU info is not updated in any way. */
dbExists(redisDb * db,robj * key)268 int dbExists(redisDb *db, robj *key) {
269 return dictFind(db->dict,key->ptr) != NULL;
270 }
271
272 /* Return a random key, in form of a Redis object.
273 * If there are no keys, NULL is returned.
274 *
275 * The function makes sure to return keys not already expired. */
dbRandomKey(redisDb * db)276 robj *dbRandomKey(redisDb *db) {
277 dictEntry *de;
278 int maxtries = 100;
279 int allvolatile = dictSize(db->dict) == dictSize(db->expires);
280
281 while(1) {
282 sds key;
283 robj *keyobj;
284
285 de = dictGetFairRandomKey(db->dict);
286 if (de == NULL) return NULL;
287
288 key = dictGetKey(de);
289 keyobj = createStringObject(key,sdslen(key));
290 if (dictFind(db->expires,key)) {
291 if (allvolatile && server.masterhost && --maxtries == 0) {
292 /* If the DB is composed only of keys with an expire set,
293 * it could happen that all the keys are already logically
294 * expired in the slave, so the function cannot stop because
295 * expireIfNeeded() is false, nor it can stop because
296 * dictGetRandomKey() returns NULL (there are keys to return).
297 * To prevent the infinite loop we do some tries, but if there
298 * are the conditions for an infinite loop, eventually we
299 * return a key name that may be already expired. */
300 return keyobj;
301 }
302 if (expireIfNeeded(db,keyobj)) {
303 decrRefCount(keyobj);
304 continue; /* search for another key. This expired. */
305 }
306 }
307 return keyobj;
308 }
309 }
310
311 /* Delete a key, value, and associated expiration entry if any, from the DB */
dbSyncDelete(redisDb * db,robj * key)312 int dbSyncDelete(redisDb *db, robj *key) {
313 /* Deleting an entry from the expires dict will not free the sds of
314 * the key, because it is shared with the main dictionary. */
315 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
316 if (dictDelete(db->dict,key->ptr) == DICT_OK) {
317 if (server.cluster_enabled) slotToKeyDel(key->ptr);
318 return 1;
319 } else {
320 return 0;
321 }
322 }
323
324 /* This is a wrapper whose behavior depends on the Redis lazy free
325 * configuration. Deletes the key synchronously or asynchronously. */
dbDelete(redisDb * db,robj * key)326 int dbDelete(redisDb *db, robj *key) {
327 return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
328 dbSyncDelete(db,key);
329 }
330
331 /* Prepare the string object stored at 'key' to be modified destructively
332 * to implement commands like SETBIT or APPEND.
333 *
334 * An object is usually ready to be modified unless one of the two conditions
335 * are true:
336 *
337 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
338 * other users.
339 * 2) The object encoding is not "RAW".
340 *
341 * If the object is found in one of the above conditions (or both) by the
342 * function, an unshared / not-encoded copy of the string object is stored
343 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
344 * returned.
345 *
346 * USAGE:
347 *
348 * The object 'o' is what the caller already obtained by looking up 'key'
349 * in 'db', the usage pattern looks like this:
350 *
351 * o = lookupKeyWrite(db,key);
352 * if (checkType(c,o,OBJ_STRING)) return;
353 * o = dbUnshareStringValue(db,key,o);
354 *
355 * At this point the caller is ready to modify the object, for example
356 * using an sdscat() call to append some data, or anything else.
357 */
dbUnshareStringValue(redisDb * db,robj * key,robj * o)358 robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
359 serverAssert(o->type == OBJ_STRING);
360 if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
361 robj *decoded = getDecodedObject(o);
362 o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
363 decrRefCount(decoded);
364 dbOverwrite(db,key,o);
365 }
366 return o;
367 }
368
369 /* Remove all keys from the database(s) structure. The dbarray argument
370 * may not be the server main DBs (could be a backup).
371 *
372 * The dbnum can be -1 if all the DBs should be emptied, or the specified
373 * DB index if we want to empty only a single database.
374 * The function returns the number of keys removed from the database(s). */
emptyDbStructure(redisDb * dbarray,int dbnum,int async,void (callback)(void *))375 long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
376 void(callback)(void*))
377 {
378 long long removed = 0;
379 int startdb, enddb;
380
381 if (dbnum == -1) {
382 startdb = 0;
383 enddb = server.dbnum-1;
384 } else {
385 startdb = enddb = dbnum;
386 }
387
388 for (int j = startdb; j <= enddb; j++) {
389 removed += dictSize(dbarray[j].dict);
390 if (async) {
391 emptyDbAsync(&dbarray[j]);
392 } else {
393 dictEmpty(dbarray[j].dict,callback);
394 dictEmpty(dbarray[j].expires,callback);
395 }
396 /* Because all keys of database are removed, reset average ttl. */
397 dbarray[j].avg_ttl = 0;
398 dbarray[j].expires_cursor = 0;
399 }
400
401 return removed;
402 }
403
404 /* Remove all keys from all the databases in a Redis server.
405 * If callback is given the function is called from time to time to
406 * signal that work is in progress.
407 *
408 * The dbnum can be -1 if all the DBs should be flushed, or the specified
409 * DB number if we want to flush only a single Redis database number.
410 *
411 * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
412 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
413 * and the function to return ASAP.
414 *
415 * On success the function returns the number of keys removed from the
416 * database(s). Otherwise -1 is returned in the specific case the
417 * DB number is out of range, and errno is set to EINVAL. */
emptyDb(int dbnum,int flags,void (callback)(void *))418 long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
419 int async = (flags & EMPTYDB_ASYNC);
420 RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
421 long long removed = 0;
422
423 if (dbnum < -1 || dbnum >= server.dbnum) {
424 errno = EINVAL;
425 return -1;
426 }
427
428 /* Fire the flushdb modules event. */
429 moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
430 REDISMODULE_SUBEVENT_FLUSHDB_START,
431 &fi);
432
433 /* Make sure the WATCHed keys are affected by the FLUSH* commands.
434 * Note that we need to call the function while the keys are still
435 * there. */
436 signalFlushedDb(dbnum);
437
438 /* Empty redis database structure. */
439 removed = emptyDbStructure(server.db, dbnum, async, callback);
440
441 /* Flush slots to keys map if enable cluster, we can flush entire
442 * slots to keys map whatever dbnum because only support one DB
443 * in cluster mode. */
444 if (server.cluster_enabled) slotToKeyFlush(async);
445
446 if (dbnum == -1) flushSlaveKeysWithExpireList();
447
448 /* Also fire the end event. Note that this event will fire almost
449 * immediately after the start event if the flush is asynchronous. */
450 moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
451 REDISMODULE_SUBEVENT_FLUSHDB_END,
452 &fi);
453
454 return removed;
455 }
456
457 /* Store a backup of the database for later use, and put an empty one
458 * instead of it. */
backupDb(void)459 dbBackup *backupDb(void) {
460 dbBackup *backup = zmalloc(sizeof(dbBackup));
461
462 /* Backup main DBs. */
463 backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
464 for (int i=0; i<server.dbnum; i++) {
465 backup->dbarray[i] = server.db[i];
466 server.db[i].dict = dictCreate(&dbDictType,NULL);
467 server.db[i].expires = dictCreate(&keyptrDictType,NULL);
468 }
469
470 /* Backup cluster slots to keys map if enable cluster. */
471 if (server.cluster_enabled) {
472 backup->slots_to_keys = server.cluster->slots_to_keys;
473 memcpy(backup->slots_keys_count, server.cluster->slots_keys_count,
474 sizeof(server.cluster->slots_keys_count));
475 server.cluster->slots_to_keys = raxNew();
476 memset(server.cluster->slots_keys_count, 0,
477 sizeof(server.cluster->slots_keys_count));
478 }
479
480 return backup;
481 }
482
483 /* Discard a previously created backup, this can be slow (similar to FLUSHALL)
484 * Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
discardDbBackup(dbBackup * buckup,int flags,void (callback)(void *))485 void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)) {
486 int async = (flags & EMPTYDB_ASYNC);
487
488 /* Release main DBs backup . */
489 emptyDbStructure(buckup->dbarray, -1, async, callback);
490 for (int i=0; i<server.dbnum; i++) {
491 dictRelease(buckup->dbarray[i].dict);
492 dictRelease(buckup->dbarray[i].expires);
493 }
494
495 /* Release slots to keys map backup if enable cluster. */
496 if (server.cluster_enabled) freeSlotsToKeysMap(buckup->slots_to_keys, async);
497
498 /* Release buckup. */
499 zfree(buckup->dbarray);
500 zfree(buckup);
501 }
502
503 /* Restore the previously created backup (discarding what currently resides
504 * in the db).
505 * This function should be called after the current contents of the database
506 * was emptied with a previous call to emptyDb (possibly using the async mode). */
restoreDbBackup(dbBackup * buckup)507 void restoreDbBackup(dbBackup *buckup) {
508 /* Restore main DBs. */
509 for (int i=0; i<server.dbnum; i++) {
510 serverAssert(dictSize(server.db[i].dict) == 0);
511 serverAssert(dictSize(server.db[i].expires) == 0);
512 dictRelease(server.db[i].dict);
513 dictRelease(server.db[i].expires);
514 server.db[i] = buckup->dbarray[i];
515 }
516
517 /* Restore slots to keys map backup if enable cluster. */
518 if (server.cluster_enabled) {
519 serverAssert(server.cluster->slots_to_keys->numele == 0);
520 raxFree(server.cluster->slots_to_keys);
521 server.cluster->slots_to_keys = buckup->slots_to_keys;
522 memcpy(server.cluster->slots_keys_count, buckup->slots_keys_count,
523 sizeof(server.cluster->slots_keys_count));
524 }
525
526 /* Release buckup. */
527 zfree(buckup->dbarray);
528 zfree(buckup);
529 }
530
/* Point the client at DB 'id'. Returns C_ERR if 'id' is outside
 * [0, server.dbnum), C_OK otherwise. */
int selectDb(client *c, int id) {
    int in_range = id >= 0 && id < server.dbnum;
    if (!in_range) return C_ERR;

    c->db = server.db + id;
    return C_OK;
}
537
dbTotalServerKeyCount()538 long long dbTotalServerKeyCount() {
539 long long total = 0;
540 int j;
541 for (j = 0; j < server.dbnum; j++) {
542 total += dictSize(server.db[j].dict);
543 }
544 return total;
545 }
546
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushedDb() is called.
 *----------------------------------------------------------------------------*/
555
556 /* Note that the 'c' argument may be NULL if the key was modified out of
557 * a context of a client. */
signalModifiedKey(client * c,redisDb * db,robj * key)558 void signalModifiedKey(client *c, redisDb *db, robj *key) {
559 touchWatchedKey(db,key);
560 trackingInvalidateKey(c,key);
561 }
562
signalFlushedDb(int dbid)563 void signalFlushedDb(int dbid) {
564 int startdb, enddb;
565 if (dbid == -1) {
566 startdb = 0;
567 enddb = server.dbnum-1;
568 } else {
569 startdb = enddb = dbid;
570 }
571
572 for (int j = startdb; j <= enddb; j++) {
573 touchAllWatchedKeysInDb(&server.db[j], NULL);
574 }
575
576 trackingInvalidateKeysOnFlush(dbid);
577 }
578
579 /*-----------------------------------------------------------------------------
580 * Type agnostic commands operating on the key space
581 *----------------------------------------------------------------------------*/
582
583 /* Return the set of flags to use for the emptyDb() call for FLUSHALL
584 * and FLUSHDB commands.
585 *
586 * Currently the command just attempts to parse the "ASYNC" option. It
587 * also checks if the command arity is wrong.
588 *
589 * On success C_OK is returned and the flags are stored in *flags, otherwise
590 * C_ERR is returned and the function sends an error to the client. */
getFlushCommandFlags(client * c,int * flags)591 int getFlushCommandFlags(client *c, int *flags) {
592 /* Parse the optional ASYNC option. */
593 if (c->argc > 1) {
594 if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) {
595 addReply(c,shared.syntaxerr);
596 return C_ERR;
597 }
598 *flags = EMPTYDB_ASYNC;
599 } else {
600 *flags = EMPTYDB_NO_FLAGS;
601 }
602 return C_OK;
603 }
604
605 /* Flushes the whole server data set. */
flushAllDataAndResetRDB(int flags)606 void flushAllDataAndResetRDB(int flags) {
607 server.dirty += emptyDb(-1,flags,NULL);
608 if (server.rdb_child_pid != -1) killRDBChild();
609 if (server.saveparamslen > 0) {
610 /* Normally rdbSave() will reset dirty, but we don't want this here
611 * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
612 int saved_dirty = server.dirty;
613 rdbSaveInfo rsi, *rsiptr;
614 rsiptr = rdbPopulateSaveInfo(&rsi);
615 rdbSave(server.rdb_filename,rsiptr);
616 server.dirty = saved_dirty;
617 }
618 server.dirty++;
619 #if defined(USE_JEMALLOC)
620 /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
621 * for large databases, flushdb blocks for long anyway, so a bit more won't
622 * harm and this way the flush and purge will be synchroneus. */
623 if (!(flags & EMPTYDB_ASYNC))
624 jemalloc_purge();
625 #endif
626 }
627
628 /* FLUSHDB [ASYNC]
629 *
630 * Flushes the currently SELECTed Redis DB. */
flushdbCommand(client * c)631 void flushdbCommand(client *c) {
632 int flags;
633
634 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
635 server.dirty += emptyDb(c->db->id,flags,NULL);
636 addReply(c,shared.ok);
637 #if defined(USE_JEMALLOC)
638 /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
639 * for large databases, flushdb blocks for long anyway, so a bit more won't
640 * harm and this way the flush and purge will be synchroneus. */
641 if (!(flags & EMPTYDB_ASYNC))
642 jemalloc_purge();
643 #endif
644 }
645
646 /* FLUSHALL [ASYNC]
647 *
648 * Flushes the whole server data set. */
flushallCommand(client * c)649 void flushallCommand(client *c) {
650 int flags;
651 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
652 flushAllDataAndResetRDB(flags);
653 addReply(c,shared.ok);
654 }
655
656 /* This command implements DEL and LAZYDEL. */
delGenericCommand(client * c,int lazy)657 void delGenericCommand(client *c, int lazy) {
658 int numdel = 0, j;
659
660 for (j = 1; j < c->argc; j++) {
661 expireIfNeeded(c->db,c->argv[j]);
662 int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
663 dbSyncDelete(c->db,c->argv[j]);
664 if (deleted) {
665 signalModifiedKey(c,c->db,c->argv[j]);
666 notifyKeyspaceEvent(NOTIFY_GENERIC,
667 "del",c->argv[j],c->db->id);
668 server.dirty++;
669 numdel++;
670 }
671 }
672 addReplyLongLong(c,numdel);
673 }
674
/* DEL key [key ...]: lazy (async) deletion only if
 * server.lazyfree_lazy_user_del is set. */
void delCommand(client *c) {
    int lazy = server.lazyfree_lazy_user_del;
    delGenericCommand(c,lazy);
}
678
/* UNLINK key [key ...]: always deletes lazily (async). */
void unlinkCommand(client *c) {
    const int always_lazy = 1;
    delGenericCommand(c,always_lazy);
}
682
683 /* EXISTS key1 key2 ... key_N.
684 * Return value is the number of keys existing. */
existsCommand(client * c)685 void existsCommand(client *c) {
686 long long count = 0;
687 int j;
688
689 for (j = 1; j < c->argc; j++) {
690 if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++;
691 }
692 addReplyLongLong(c,count);
693 }
694
/* SELECT index
 *
 * Switch the client to DB 'index'. In cluster mode only DB 0 is allowed.
 * Non-integer arguments reply "invalid DB index"; out-of-range indices
 * reply "DB index is out of range". */
void selectCommand(client *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != C_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    /* Range-check while 'id' is still a long. selectDb() takes an int, and
     * narrowing first is implementation-defined: on common LP64 platforms a
     * value such as 4294967296 would wrap to 0 and silently select DB 0. */
    if (id < 0 || id >= server.dbnum || selectDb(c,(int)id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
    } else {
        addReply(c,shared.ok);
    }
}
712
/* RANDOMKEY: reply with a random key name from the current DB, or a null
 * reply if the DB is empty. */
void randomkeyCommand(client *c) {
    robj *key = dbRandomKey(c->db);

    if (key) {
        addReplyBulk(c,key);
        decrRefCount(key);
    } else {
        addReplyNull(c);
    }
}
724
/* KEYS pattern
 *
 * Replies with every key of the current DB that matches the glob 'pattern'
 * (a lone "*" skips matching) and is not logically expired. The array length
 * is deferred because the number of matches is only known at the end. */
void keysCommand(client *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    unsigned long emitted = 0;
    void *replylen = addReplyDeferredLen(c);
    dictIterator *it = dictGetSafeIterator(c->db->dict);
    int match_all = (pattern[0] == '*' && plen == 1);
    dictEntry *de;

    while ((de = dictNext(it)) != NULL) {
        sds name = dictGetKey(de);

        if (!match_all && !stringmatchlen(pattern,plen,name,sdslen(name),0))
            continue;

        robj *keyobj = createStringObject(name,sdslen(name));
        if (!keyIsExpired(c->db,keyobj)) {
            addReplyBulk(c,keyobj);
            emitted++;
        }
        decrRefCount(keyobj);
    }

    dictReleaseIterator(it);
    setDeferredArrayLen(c,replylen,emitted);
}
751
752 /* This callback is used by scanGenericCommand in order to collect elements
753 * returned by the dictionary iterator into a list. */
scanCallback(void * privdata,const dictEntry * de)754 void scanCallback(void *privdata, const dictEntry *de) {
755 void **pd = (void**) privdata;
756 list *keys = pd[0];
757 robj *o = pd[1];
758 robj *key, *val = NULL;
759
760 if (o == NULL) {
761 sds sdskey = dictGetKey(de);
762 key = createStringObject(sdskey, sdslen(sdskey));
763 } else if (o->type == OBJ_SET) {
764 sds keysds = dictGetKey(de);
765 key = createStringObject(keysds,sdslen(keysds));
766 } else if (o->type == OBJ_HASH) {
767 sds sdskey = dictGetKey(de);
768 sds sdsval = dictGetVal(de);
769 key = createStringObject(sdskey,sdslen(sdskey));
770 val = createStringObject(sdsval,sdslen(sdsval));
771 } else if (o->type == OBJ_ZSET) {
772 sds sdskey = dictGetKey(de);
773 key = createStringObject(sdskey,sdslen(sdskey));
774 val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
775 } else {
776 serverPanic("Type not handled in SCAN callback.");
777 }
778
779 listAddNodeTail(keys, key);
780 if (val) listAddNodeTail(keys, val);
781 }
782
783 /* Try to parse a SCAN cursor stored at object 'o':
784 * if the cursor is valid, store it as unsigned integer into *cursor and
785 * returns C_OK. Otherwise return C_ERR and send an error to the
786 * client. */
parseScanCursorOrReply(client * c,robj * o,unsigned long * cursor)787 int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
788 char *eptr;
789
790 /* Use strtoul() because we need an *unsigned* long, so
791 * getLongLongFromObject() does not cover the whole cursor space. */
792 errno = 0;
793 *cursor = strtoul(o->ptr, &eptr, 10);
794 if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
795 {
796 addReplyError(c, "invalid cursor");
797 return C_ERR;
798 }
799 return C_OK;
800 }
801
802 /* This command implements SCAN, HSCAN and SSCAN commands.
803 * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
804 * if 'o' is NULL the command will operate on the dictionary associated with
805 * the current database.
806 *
807 * When 'o' is not NULL the function assumes that the first argument in
808 * the client arguments vector is a key so it skips it before iterating
809 * in order to parse options.
810 *
811 * In the case of a Hash object the function returns both the field and value
812 * of every element on the Hash. */
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
    int i, j;
    list *keys = listCreate();
    listNode *node, *nextnode;
    long count = 10;
    sds pat = NULL;
    sds typename = NULL;
    int patlen = 0, use_pattern = 0;
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
                o->type == OBJ_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */

    /* Step 1: Parse options. */
    while (i < c->argc) {
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != C_OK)
            {
                goto cleanup;
            }

            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(pat[0] == '*' && patlen == 1);

            i += 2;
        } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
            /* SCAN for a particular type only applies to the db dict */
            typename = c->argv[i+1]->ptr;
            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
     * representation that is not a hash table, we are sure that it is also
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

    /* Handle the case of a hash table. */
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
        /* We return key / value for this type. COUNT is user supplied and
         * may be as large as LONG_MAX: saturate instead of overflowing a
         * signed long (undefined behavior). */
        count = (count > LONG_MAX / 2) ? LONG_MAX : count * 2;
    } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        /* We return key / value for this type (saturating, see above). */
        count = (count > LONG_MAX / 2) ? LONG_MAX : count * 2;
    }

    if (ht) {
        void *privdata[2];
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid to block too much time at the cost
         * of returning no or very few elements. The product is saturated
         * at LONG_MAX so a huge COUNT cannot overflow it. */
        long maxiterations = (count > LONG_MAX / 10) ? LONG_MAX : count * 10;

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);
    } else if (o->type == OBJ_SET) {
        int pos = 0;
        int64_t ll;

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
        cursor = 0;
    } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
            p = ziplistNext(o->ptr,p);
        }
        cursor = 0;
    } else {
        serverPanic("Not handled encoding in SCAN.");
    }

    /* Step 3: Filter elements. */
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        nextnode = listNextNode(node);
        int filter = 0;

        /* Filter element if it does not match the pattern. */
        if (!filter && use_pattern) {
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
                char buf[LONG_STR_SIZE];
                int len;

                serverAssert(kobj->encoding == OBJ_ENCODING_INT);
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }

        /* Filter element if it is an expired key. This must run before the
         * TYPE filter: the lookupKey*() call used to fetch the type goes
         * through expireIfNeeded(), which on a master deletes an expired
         * key. A later expireIfNeeded() would then find no TTL and return 0,
         * letting the expired key through a "TYPE none" filter. */
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Filter an element if it isn't the type we want. */
        if (!filter && o == NULL && typename) {
            robj* typecheck = lookupKeyReadWithFlags(c->db, kobj, LOOKUP_NOTOUCH);
            char* type = getObjectTypeName(typecheck);
            if (strcasecmp((char*) typename, type)) filter = 1;
        }

        /* Remove the element and its associated value if needed. */
        if (filter) {
            decrRefCount(kobj);
            listDelNode(keys, node);
        }

        /* If this is a hash or a sorted set, we have a flat list of
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
            node = nextnode;
            nextnode = listNextNode(node);
            if (filter) {
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
        }
        node = nextnode;
    }

    /* Step 4: Reply to the client. */
    addReplyArrayLen(c, 2);
    addReplyBulkLongLong(c,cursor);

    addReplyArrayLen(c, listLength(keys));
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
        listDelNode(keys, node);
    }

cleanup:
    listSetFreeMethod(keys,decrRefCountVoid);
    listRelease(keys);
}
1001
1002 /* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
    unsigned long cursor;

    /* argv[1] is the cursor. When it cannot be parsed an error reply has
     * already been queued, so there is nothing left to do. */
    if (parseScanCursorOrReply(c,c->argv[1],&cursor) != C_ERR)
        scanGenericCommand(c,NULL,cursor);
}
1008
void dbsizeCommand(client *c) {
    /* Reply with the number of entries in the selected DB's main dict. */
    long long nkeys = dictSize(c->db->dict);
    addReplyLongLong(c,nkeys);
}
1012
void lastsaveCommand(client *c) {
    /* Reply with the timestamp kept in server.lastsave. */
    long long when = server.lastsave;
    addReplyLongLong(c,when);
}
1016
char* getObjectTypeName(robj *o) {
    /* A missing object (NULL) is reported as "none". */
    if (o == NULL) return "none";

    switch (o->type) {
    case OBJ_STRING: return "string";
    case OBJ_LIST:   return "list";
    case OBJ_SET:    return "set";
    case OBJ_ZSET:   return "zset";
    case OBJ_HASH:   return "hash";
    case OBJ_STREAM: return "stream";
    case OBJ_MODULE: {
        /* Module values report the name stored in their module type. */
        moduleValue *mv = o->ptr;
        return mv->type->name;
    }
    default:
        return "unknown";
    }
}
1038
void typeCommand(client *c) {
    /* Look the key up with LOOKUP_NOTOUCH and reply with its type name;
     * getObjectTypeName() yields "none" when the lookup returns NULL. */
    addReplyStatus(c,
        getObjectTypeName(lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH)));
}
1044
void shutdownCommand(client *c) {
    int flags = 0;

    /* SHUTDOWN accepts at most one optional argument: NOSAVE or SAVE. */
    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }
    if (c->argc == 2) {
        char *mode = c->argv[1]->ptr;

        if (!strcasecmp(mode,"nosave")) {
            flags |= SHUTDOWN_NOSAVE;
        } else if (!strcasecmp(mode,"save")) {
            flags |= SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* When preparation succeeds the process exits right here; otherwise
     * the client is told to look at the logs. */
    if (prepareForShutdown(flags) != C_OK) {
        addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
        return;
    }
    exit(0);
}
1064
void renameGenericCommand(client *c, int nx) {
    robj *srckey = c->argv[1];
    robj *dstkey = c->argv[2];
    robj *val;
    long long ttl;

    /* Renaming a key onto itself performs no operation when the key exists,
     * but a missing source key must still produce an error, so the
     * comparison is recorded before the lookup. */
    int samekey = (sdscmp(srckey->ptr,dstkey->ptr) == 0);

    val = lookupKeyWriteOrReply(c,srckey,shared.nokeyerr);
    if (val == NULL) return;

    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

    /* Keep our own reference: the value gets linked under the destination
     * name and then unlinked from the source name. */
    incrRefCount(val);
    ttl = getExpire(c->db,srckey);

    if (lookupKeyWrite(c->db,dstkey) != NULL) {
        if (nx) {
            /* RENAMENX never overwrites an existing destination. */
            decrRefCount(val);
            addReply(c,shared.czero);
            return;
        }
        /* RENAME overwrites: drop the old destination before re-creating
         * a key with the same name. */
        dbDelete(c->db,dstkey);
    }

    dbAdd(c->db,dstkey,val);
    if (ttl != -1) setExpire(c,c->db,dstkey,ttl);
    dbDelete(c->db,srckey);

    signalModifiedKey(c,c->db,srckey);
    signalModifiedKey(c,c->db,dstkey);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",srckey,c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",dstkey,c->db->id);
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}
1106
void renameCommand(client *c) {
    /* RENAME: an existing destination key is overwritten. */
    const int nx = 0;
    renameGenericCommand(c,nx);
}
1110
void renamenxCommand(client *c) {
    /* RENAMENX: refuse to touch an existing destination key. */
    const int nx = 1;
    renameGenericCommand(c,nx);
}
1114
void moveCommand(client *c) {
    robj *val;
    redisDb *srcdb, *dstdb;
    int srcid;
    long long dbid, ttl;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Resolve both DBs. The target index is validated by selecting it,
     * which also gives us its pointer; then we switch back. */
    srcdb = c->db;
    srcid = srcdb->id;

    int badindex = getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
                   dbid < INT_MIN || dbid > INT_MAX ||
                   selectDb(c,dbid) == C_ERR;
    if (badindex) {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dstdb = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* Moving a key into the DB it already lives in is most likely a
     * mistake on the caller's side. */
    if (srcdb == dstdb) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Nothing to move when the key is missing. */
    val = lookupKeyWrite(c->db,c->argv[1]);
    if (val == NULL) {
        addReply(c,shared.czero);
        return;
    }
    ttl = getExpire(c->db,c->argv[1]);

    /* Never clobber a key that already exists in the target DB. */
    if (lookupKeyWrite(dstdb,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }

    dbAdd(dstdb,c->argv[1],val);
    if (ttl != -1) setExpire(c,dstdb,c->argv[1],ttl);
    incrRefCount(val);

    /* The value now lives in the target DB; drop the source entry. */
    dbDelete(srcdb,c->argv[1]);
    signalModifiedKey(c,srcdb,c->argv[1]);
    signalModifiedKey(c,dstdb,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"move_from",c->argv[1],srcdb->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"move_to",c->argv[1],dstdb->id);

    server.dirty++;
    addReply(c,shared.cone);
}
1176
1177 /* Helper function for dbSwapDatabases(): scans the list of keys that have
1178 * one or more blocked clients for B[LR]POP or other blocking commands
1179 * and signal the keys as ready if they are of the right type. See the comment
1180 * where the function is used for more info. */
void scanDatabaseForReadyLists(redisDb *db) {
    dictEntry *entry;
    dictIterator *iter = dictGetSafeIterator(db->blocking_keys);

    while ((entry = dictNext(iter)) != NULL) {
        robj *key = dictGetKey(entry);
        robj *val = lookupKey(db,key,LOOKUP_NOTOUCH);

        /* Only existing lists, streams and sorted sets are signaled. */
        if (val == NULL) continue;
        if (val->type != OBJ_LIST &&
            val->type != OBJ_STREAM &&
            val->type != OBJ_ZSET) continue;

        signalKeyAsReady(db,key);
    }
    dictReleaseIterator(iter);
}
1194
1195 /* Swap two databases at runtime so that all clients will magically see
1196 * the new database even if already connected. Note that the client
1197 * structure c->db points to a given DB, so we need to be smarter and
1198 * swap the underlying referenced structures, otherwise we would need
1199 * to fix all the references to the Redis DB structure.
1200 *
1201 * Returns C_ERR if at least one of the DB ids are out of range, otherwise
1202 * C_OK is returned. */
dbSwapDatabases(long id1,long id2)1203 int dbSwapDatabases(long id1, long id2) {
1204 if (id1 < 0 || id1 >= server.dbnum ||
1205 id2 < 0 || id2 >= server.dbnum) return C_ERR;
1206 if (id1 == id2) return C_OK;
1207 redisDb aux = server.db[id1];
1208 redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
1209
1210 /* Swap hash tables. Note that we don't swap blocking_keys,
1211 * ready_keys and watched_keys, since we want clients to
1212 * remain in the same DB they were. */
1213 db1->dict = db2->dict;
1214 db1->expires = db2->expires;
1215 db1->avg_ttl = db2->avg_ttl;
1216 db1->expires_cursor = db2->expires_cursor;
1217
1218 db2->dict = aux.dict;
1219 db2->expires = aux.expires;
1220 db2->avg_ttl = aux.avg_ttl;
1221 db2->expires_cursor = aux.expires_cursor;
1222
1223 /* Now we need to handle clients blocked on lists: as an effect
1224 * of swapping the two DBs, a client that was waiting for list
1225 * X in a given DB, may now actually be unblocked if X happens
1226 * to exist in the new version of the DB, after the swap.
1227 *
1228 * However normally we only do this check for efficiency reasons
1229 * in dbAdd() when a list is created. So here we need to rescan
1230 * the list of clients blocked on lists and signal lists as ready
1231 * if needed.
1232 *
1233 * Also the swapdb should make transaction fail if there is any
1234 * client watching keys */
1235 scanDatabaseForReadyLists(db1);
1236 touchAllWatchedKeysInDb(db1, db2);
1237 scanDatabaseForReadyLists(db2);
1238 touchAllWatchedKeysInDb(db2, db1);
1239 return C_OK;
1240 }
1241
1242 /* SWAPDB db1 db2 */
void swapdbCommand(client *c) {
    long id1, id2;

    /* Cluster mode only has DB 0, so there is nothing to swap. */
    if (server.cluster_enabled) {
        addReplyError(c,"SWAPDB is not allowed in cluster mode");
        return;
    }

    /* Parse both indexes; the helper replies with the given error text. */
    if (getLongFromObjectOrReply(c, c->argv[1], &id1,
            "invalid first DB index") != C_OK ||
        getLongFromObjectOrReply(c, c->argv[2], &id2,
            "invalid second DB index") != C_OK)
        return;

    if (dbSwapDatabases(id1,id2) == C_ERR) {
        addReplyError(c,"DB index is out of range");
        return;
    }

    /* Let modules know about the swap, then acknowledge it. */
    RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
    moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
    server.dirty++;
    addReply(c,shared.ok);
}
1272
1273 /*-----------------------------------------------------------------------------
1274 * Expires API
1275 *----------------------------------------------------------------------------*/
1276
int removeExpire(redisDb *db, robj *key) {
    sds name = key->ptr;

    /* The key must still be present in the main dict: an expire entry
     * without its key would mean the key is never freed. */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,name) != NULL);

    int removed = (dictDelete(db->expires,name) == DICT_OK);
    return removed;
}
1283
/* Set an expire to the specified key. If the expire is set in the context
 * of a user calling a command, 'c' is the client, otherwise 'c' is set
 * to NULL. The 'when' parameter is the absolute unix time in milliseconds
 * after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
    /* The expires dict shares the key sds already owned by the main dict. */
    dictEntry *mainde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,mainde != NULL);

    dictEntry *expde = dictAddOrFind(db->expires,dictGetKey(mainde));
    dictSetSignedIntegerVal(expde,when);

    /* On a writable replica, a TTL set by a regular client (i.e. not by
     * the master link) is recorded via rememberSlaveKeyWithExpire(). */
    if (c == NULL || (c->flags & CLIENT_MASTER)) return;
    if (server.masterhost && server.repl_slave_ro == 0)
        rememberSlaveKeyWithExpire(db,key);
}
1301
1302 /* Return the expire time of the specified key, or -1 if no expire
1303 * is associated with this key (i.e. the key is non volatile) */
long long getExpire(redisDb *db, robj *key) {
    dictEntry *de;

    /* Cheap exit when no key in this DB has a TTL at all. */
    if (dictSize(db->expires) == 0) return -1;

    de = dictFind(db->expires,key->ptr);
    if (de == NULL) return -1;

    /* Safety check: an expire entry always has a matching main-dict key. */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
    return dictGetSignedIntegerVal(de);
}
1316
1317 /* Propagate expires into slaves and the AOF file.
1318 * When a key expires in the master, a DEL operation for this key is sent
1319 * to all the slaves and the AOF file if enabled.
1320 *
1321 * This way the key expiry is centralized in one place, and since both
1322 * AOF and the master->slave link guarantee operation ordering, everything
1323 * will be consistent even if we allow write operations against expiring
1324 * keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) {
    /* Build "UNLINK <key>" when lazy, "DEL <key>" otherwise. */
    robj *argv[2] = { lazy ? shared.unlink : shared.del, key };
    int k;

    /* Hold a reference on both objects while they are fed downstream. */
    for (k = 0; k < 2; k++) incrRefCount(argv[k]);

    if (server.aof_state != AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
    replicationFeedSlaves(server.slaves,db->id,argv,2);

    for (k = 0; k < 2; k++) decrRefCount(argv[k]);
}
1340
1341 /* Check if the key is expired. */
int keyIsExpired(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;

    /* Keys without a TTL never expire. */
    if (when < 0) return 0;

    /* Nothing expires while loading; that is handled later. */
    if (server.loading) return 0;

    if (server.lua_caller) {
        /* Inside a Lua script time is frozen at the script start, so a key
         * can only expire on its first access and never halfway through
         * the script, keeping replica / AOF propagation consistent
         * (GitHub issue #1525). */
        now = server.lua_time_start;
    } else if (server.fixed_time_expire > 0) {
        /* During command execution use the cached time refreshed before
         * each call(): commands that reopen the same key several times
         * (e.g. RPOPLPUSH) must not see it expire between two opens. */
        now = server.mstime;
    } else {
        /* Otherwise use the freshest time available. */
        now = mstime();
    }

    /* Expired once the (virtual or real) time is past the expire time. */
    return now > when;
}
1378
1379 /* This function is called when we are going to perform some operation
1380 * in a given key, but such key may be already logically expired even if
1381 * it still exists in the database. The main way this function is called
1382 * is via lookupKey*() family of functions.
1383 *
1384 * The behavior of the function depends on the replication role of the
1385 * instance, because slave instances do not expire keys, they wait
1386 * for DELs from the master for consistency matters. However even
1387 * slaves will try to have a coherent return value for the function,
1388 * so that read commands executed in the slave side will be able to
1389 * behave like if the key is expired even if still present (because the
1390 * master has yet to propagate the DEL).
1391 *
1392 * In masters as a side effect of finding a key which is expired, such
1393 * key will be evicted from the database. Also this may trigger the
1394 * propagation of a DEL/UNLINK command in AOF / replication stream.
1395 *
1396 * The return value of the function is 0 if the key is still valid,
1397 * otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
    if (!keyIsExpired(db,key)) return 0;

    /* A replica never deletes expired keys itself: it waits for the DEL
     * synthesized by its master. It still reports the key as expired
     * so that the caller sees a consistent answer. */
    if (server.masterhost != NULL) return 1;

    /* Master: account for the expiration, propagate it, notify, then
     * remove the key (lazily or synchronously per configuration). */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",key,db->id);

    int deleted;
    if (server.lazyfree_lazy_expire)
        deleted = dbAsyncDelete(db,key);
    else
        deleted = dbSyncDelete(db,key);

    if (deleted) signalModifiedKey(NULL,db,key);
    return deleted;
}
1421
1422 /* -----------------------------------------------------------------------------
1423 * API to get key arguments from commands
1424 * ---------------------------------------------------------------------------*/
1425
1426 /* Prepare the getKeysResult struct to hold numkeys, either by using the
1427 * pre-allocated keysbuf or by allocating a new array on the heap.
1428 *
1429 * This function must be called at least once before starting to populate
1430 * the result, and can be called repeatedly to enlarge the result array.
1431 */
int *getKeysPrepareResult(getKeysResult *result, int numkeys) {
    /* GETKEYS_RESULT_INIT leaves keys NULL: begin with the embedded
     * keysbuf, which must still be empty at this point. */
    if (result->keys == NULL) {
        serverAssert(result->numkeys == 0);
        result->keys = result->keysbuf;
    }

    /* Current capacity is enough: hand back the existing array. */
    if (numkeys <= result->size) return result->keys;

    if (result->keys == result->keysbuf) {
        /* Leaving the embedded buffer: allocate and carry over entries. */
        int *heap = zmalloc(numkeys * sizeof(int));
        if (result->numkeys)
            memcpy(heap, result->keysbuf, result->numkeys * sizeof(int));
        result->keys = heap;
    } else {
        /* Already heap allocated: just grow it. */
        result->keys = zrealloc(result->keys, numkeys * sizeof(int));
    }
    result->size = numkeys;
    return result->keys;
}
1456
1457 /* The base case is to use the keys position as given in the command table
1458 * (firstkey, lastkey, step). */
int getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result) {
    int j, i = 0, last, *keys;
    UNUSED(argv);

    /* Commands without key arguments. */
    if (cmd->firstkey == 0) {
        result->numkeys = 0;
        return 0;
    }

    /* A negative lastkey counts backwards from the end of argv. */
    last = cmd->lastkey;
    if (last < 0) last = argc+last;

    int count = ((last - cmd->firstkey)+1);
    keys = getKeysPrepareResult(result, count);

    for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
        if (j >= argc) {
            /* Modules commands, and standard commands with a not fixed number
             * of arguments (negative arity parameter) do not have dispatch
             * time arity checks, so we need to handle the case where the user
             * passed an invalid number of arguments here. In this case we
             * return no keys and expect the command implementation to report
             * an arity or syntax error.
             *
             * The key buffer is deliberately NOT freed here: getKeysFreeResult()
             * leaves result->keys pointing at the released memory, so the
             * caller's own getKeysFreeResult() would free it a second time
             * (and getKeysPrepareResult() would zrealloc() a freed pointer).
             * The caller releases the buffer as for any other result. */
            if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
                result->numkeys = 0;
                return 0;
            } else {
                serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
            }
        }
        keys[i++] = j;
    }
    result->numkeys = i;
    return i;
}
1495
/* Return all the arguments that are keys in the command passed via argc / argv.
 *
 * The positions of all the key arguments inside argv are stored into
 * 'result' (result->keys, holding result->numkeys entries), and the number
 * of keys found is returned. Release the result with getKeysFreeResult().
 *
 * 'cmd' must point to the corresponding entry in the redisCommand
 * table, according to the command name in argv[0].
 *
 * This function uses the command table if a command-specific helper function
 * is not required, otherwise it calls the command-specific function. */
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    /* Module commands flagged CMD_MODULE_GETKEYS resolve keys via the
     * module API. */
    if (cmd->flags & CMD_MODULE_GETKEYS)
        return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);

    /* Non-module commands may ship a dedicated extraction helper. */
    int has_helper = !(cmd->flags & CMD_MODULE) && cmd->getkeys_proc;
    if (has_helper)
        return cmd->getkeys_proc(cmd,argv,argc,result);

    /* Default: the (firstkey, lastkey, keystep) triple from the table. */
    return getKeysUsingCommandTable(cmd,argv,argc,result);
}
1516
1517 /* Free the result of getKeysFromCommand. */
void getKeysFreeResult(getKeysResult *result) {
    /* Nothing to release without a result, or while the keys still live
     * in the embedded keysbuf. */
    if (result == NULL) return;
    if (result->keys == result->keysbuf) return;
    zfree(result->keys);
}
1522
1523 /* Helper function to extract keys from following commands:
1524 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
1525 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
int zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    /* argv[2] is numkeys. Report no keys when the value is one the command
     * will reject with a syntax error anyway. */
    int nsrc = atoi(argv[2]->ptr);
    if (nsrc < 1 || nsrc > argc - 3) {
        result->numkeys = 0;
        return 0;
    }

    /* Source keys argv[3 .. 3+nsrc-1] come first, followed by the
     * destination key argv[1]: nsrc + 1 positions in total. */
    int *keys = getKeysPrepareResult(result, nsrc + 1);
    int k = 0;
    while (k < nsrc) {
        keys[k] = 3 + k;
        k++;
    }
    keys[nsrc] = 1;

    result->numkeys = nsrc + 1;
    return result->numkeys;
}
1553
1554 /* Helper function to extract keys from the following commands:
1555 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
1556 * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    /* argv[2] is numkeys. Report no keys when the value is one the command
     * will reject with a syntax error anyway. */
    int nkeys = atoi(argv[2]->ptr);
    if (nkeys < 1 || nkeys > argc - 3) {
        result->numkeys = 0;
        return 0;
    }

    /* The keys are the nkeys arguments starting at argv[3]. */
    int *keys = getKeysPrepareResult(result, nkeys);
    for (int k = 0; k < nkeys; k++) keys[k] = k + 3;

    result->numkeys = nkeys;
    return nkeys;
}
1577
1578 /* Helper function to extract keys from the SORT command.
1579 *
1580 * SORT <sort-key> ... STORE <store-key> ...
1581 *
1582 * The first argument of SORT is always a key, however a list of options
1583 * follow in SQL-alike style. Here we parse just the minimum in order to
1584 * correctly identify keys in the "STORE" option. */
int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    /* Options that take arguments, with how many arguments each consumes.
     * Any other word is assumed to be an argument-less option. */
    static const struct {
        const char *name;
        int skip;
    } optargs[] = {
        {"limit", 2},
        {"get", 1},
        {"by", 1},
        {NULL, 0}
    };

    /* Worst case is two keys: the sort key and the STORE destination. */
    int *keys = getKeysPrepareResult(result, 2);
    int have_store = 0;
    keys[0] = 1; /* <sort-key> is always argv[1]. */

    for (int i = 2; i < argc; i++) {
        const char *opt = argv[i]->ptr;

        if (!strcasecmp(opt,"store")) {
            /* Keep scanning after a STORE so that the *last* one wins,
             * matching what SORT itself does. */
            if (i + 1 < argc) {
                keys[1] = i + 1;
                have_store = 1;
            }
            continue;
        }

        for (int k = 0; optargs[k].name != NULL; k++) {
            if (!strcasecmp(opt,optargs[k].name)) {
                i += optargs[k].skip;
                break;
            }
        }
    }

    result->numkeys = 1 + have_store;
    return result->numkeys;
}
1625
/* Extract keys from:
 *   MIGRATE host port key db timeout [COPY] [REPLACE]
 *           [AUTH password] [AUTH2 username password] [KEYS key ...]
 * In the single-key form the key is argv[3]. When argv[3] is the empty
 * string and a KEYS option is present, every argument after KEYS is a key. */
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    int i, num, first, *keys;
    UNUSED(cmd);

    /* Assume the obvious form. */
    first = 3;
    num = 1;

    /* But check for the extended one with the KEYS option. The arguments of
     * AUTH <password> and AUTH2 <username> <password> are skipped so that a
     * credential spelled "keys" is not mistaken for the KEYS keyword (which
     * would also report the real KEYS token as a key). */
    for (i = 6; i < argc; i++) {
        char *arg = argv[i]->ptr;

        if (!strcasecmp(arg,"auth")) {
            i += 1;
        } else if (!strcasecmp(arg,"auth2")) {
            i += 2;
        } else if (!strcasecmp(arg,"keys") && sdslen(argv[3]->ptr) == 0) {
            first = i+1;
            num = argc-first;
            break;
        }
    }

    keys = getKeysPrepareResult(result, num);
    for (i = 0; i < num; i++) keys[i] = first+i;
    result->numkeys = num;
    return num;
}
1652
1653 /* Helper function to extract keys from following commands:
1654 * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
1655 * [COUNT count] [STORE key] [STOREDIST key]
1656 * GEORADIUSBYMEMBER key member radius unit ... options ... */
int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    /* Options start at argv[5]. Look for STORE / STOREDIST followed by a
     * destination key; if both are given the later one wins, which is how
     * georadiusCommand behaves too. */
    int storekey = -1;
    int i = 5;
    while (i < argc) {
        const char *opt = argv[i]->ptr;
        int is_store = !strcasecmp(opt,"store") || !strcasecmp(opt,"storedist");

        if (is_store && i + 1 < argc) {
            storekey = i + 1;
            i++; /* Step over the destination key itself. */
        }
        i++;
    }

    /* argv[1] is always a key; the store destination is optional. */
    int num = (storekey == -1) ? 1 : 2;
    int *keys = getKeysPrepareResult(result, num);
    keys[0] = 1;
    if (storekey != -1) keys[1] = storekey;

    result->numkeys = num;
    return num;
}
1690
1691 /* LCS ... [KEYS <key1> <key2>] ... */
int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    int *keys = getKeysPrepareResult(result, 2);
    UNUSED(cmd);

    /* Only a "KEYS <key1> <key2>" option that appears before any "STRINGS"
     * argument names keys; scanning stops at "STRINGS". */
    for (int i = 1; i < argc && strcasecmp(argv[i]->ptr,"strings"); i++) {
        int remaining = (argc - 1) - i;

        if (remaining >= 2 && !strcasecmp(argv[i]->ptr,"keys")) {
            keys[0] = i + 1;
            keys[1] = i + 2;
            result->numkeys = 2;
            return result->numkeys;
        }
    }

    result->numkeys = 0;
    return result->numkeys;
}
1715
1716 /* Helper function to extract keys from memory command.
1717 * MEMORY USAGE <key> */
int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    getKeysPrepareResult(result, 1);

    /* Only "MEMORY USAGE <key>" carries a key, at argv[2]. */
    if (argc < 3 || strcasecmp(argv[1]->ptr,"usage") != 0) {
        result->numkeys = 0;
        return 0;
    }

    result->keys[0] = 2;
    result->numkeys = 1;
    return result->numkeys;
}
1730
/* XREAD [BLOCK <milliseconds>] [COUNT <count>]
 *       STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
 * XREADGROUP GROUP <groupname> <consumer> [COUNT <count>]
 *       [BLOCK <milliseconds>] [NOACK] STREAMS key_1 ... key_N ID_1 ... ID_N */
int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
    UNUSED(cmd);

    /* Walk the options to find the STREAMS keyword that really is the
     * option: the same word may also appear as a consumer group name or
     * even as a stream key, so a plain search would be wrong. */
    int streams_pos = -1;
    for (int i = 1; i < argc; i++) {
        const char *opt = argv[i]->ptr;

        if (!strcasecmp(opt,"block") || !strcasecmp(opt,"count")) {
            i += 1; /* One option argument. */
        } else if (!strcasecmp(opt,"group")) {
            i += 2; /* Two option arguments. */
        } else if (!strcasecmp(opt,"noack")) {
            /* Flag without arguments. */
        } else {
            /* Either STREAMS, or an unknown word (syntax error). */
            if (!strcasecmp(opt,"streams")) streams_pos = i;
            break;
        }
    }

    /* After STREAMS come N keys followed by N IDs: a non-empty, even
     * number of arguments is required. */
    int nargs = (streams_pos == -1) ? 0 : argc - streams_pos - 1;
    if (streams_pos == -1 || nargs == 0 || nargs % 2 != 0) {
        result->numkeys = 0;
        return 0;
    }

    int nkeys = nargs / 2;
    int *keys = getKeysPrepareResult(result, nkeys);
    for (int k = 0; k < nkeys; k++) keys[k] = streams_pos + 1 + k;

    result->numkeys = nkeys;
    return nkeys;
}
1774
1775 /* Slot to Key API. This is used by Redis Cluster in order to obtain in
1776 * a fast way a key that belongs to a specified hash slot. This is useful
1777 * while rehashing the cluster and in other conditions when we need to
1778 * understand if we have keys for a given hash slot. */
slotToKeyUpdateKey(sds key,int add)1779 void slotToKeyUpdateKey(sds key, int add) {
1780 size_t keylen = sdslen(key);
1781 unsigned int hashslot = keyHashSlot(key,keylen);
1782 unsigned char buf[64];
1783 unsigned char *indexed = buf;
1784
1785 server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
1786 if (keylen+2 > 64) indexed = zmalloc(keylen+2);
1787 indexed[0] = (hashslot >> 8) & 0xff;
1788 indexed[1] = hashslot & 0xff;
1789 memcpy(indexed+2,key,keylen);
1790 if (add) {
1791 raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
1792 } else {
1793 raxRemove(server.cluster->slots_to_keys,indexed,keylen+2,NULL);
1794 }
1795 if (indexed != buf) zfree(indexed);
1796 }
1797
slotToKeyAdd(sds key)1798 void slotToKeyAdd(sds key) {
1799 slotToKeyUpdateKey(key,1);
1800 }
1801
slotToKeyDel(sds key)1802 void slotToKeyDel(sds key) {
1803 slotToKeyUpdateKey(key,0);
1804 }
1805
1806 /* Release the radix tree mapping Redis Cluster keys to slots. If 'async'
1807 * is true, we release it asynchronously. */
freeSlotsToKeysMap(rax * rt,int async)1808 void freeSlotsToKeysMap(rax *rt, int async) {
1809 if (async) {
1810 freeSlotsToKeysMapAsync(rt);
1811 } else {
1812 raxFree(rt);
1813 }
1814 }
1815
1816 /* Empty the slots-keys map of Redis CLuster by creating a new empty one and
1817 * freeing the old one. */
slotToKeyFlush(int async)1818 void slotToKeyFlush(int async) {
1819 rax *old = server.cluster->slots_to_keys;
1820
1821 server.cluster->slots_to_keys = raxNew();
1822 memset(server.cluster->slots_keys_count,0,
1823 sizeof(server.cluster->slots_keys_count));
1824 freeSlotsToKeysMap(old, async);
1825 }
1826
1827 /* Pupulate the specified array of objects with keys in the specified slot.
1828 * New objects are returned to represent keys, it's up to the caller to
1829 * decrement the reference count to release the keys names. */
getKeysInSlot(unsigned int hashslot,robj ** keys,unsigned int count)1830 unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
1831 raxIterator iter;
1832 int j = 0;
1833 unsigned char indexed[2];
1834
1835 indexed[0] = (hashslot >> 8) & 0xff;
1836 indexed[1] = hashslot & 0xff;
1837 raxStart(&iter,server.cluster->slots_to_keys);
1838 raxSeek(&iter,">=",indexed,2);
1839 while(count-- && raxNext(&iter)) {
1840 if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
1841 keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
1842 }
1843 raxStop(&iter);
1844 return j;
1845 }
1846
1847 /* Remove all the keys in the specified hash slot.
1848 * The number of removed items is returned. */
delKeysInSlot(unsigned int hashslot)1849 unsigned int delKeysInSlot(unsigned int hashslot) {
1850 raxIterator iter;
1851 int j = 0;
1852 unsigned char indexed[2];
1853
1854 indexed[0] = (hashslot >> 8) & 0xff;
1855 indexed[1] = hashslot & 0xff;
1856 raxStart(&iter,server.cluster->slots_to_keys);
1857 while(server.cluster->slots_keys_count[hashslot]) {
1858 raxSeek(&iter,">=",indexed,2);
1859 raxNext(&iter);
1860
1861 robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
1862 dbDelete(&server.db[0],key);
1863 decrRefCount(key);
1864 j++;
1865 }
1866 raxStop(&iter);
1867 return j;
1868 }
1869
countKeysInSlot(unsigned int hashslot)1870 unsigned int countKeysInSlot(unsigned int hashslot) {
1871 return server.cluster->slots_keys_count[hashslot];
1872 }
1873