#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "cluster.h"

static size_t lazyfree_objects = 0;
pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
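
/* Note: the mutex above is not used directly in this file. It exists for
 * atomicvar.h, whose fallback implementation (used when no atomic builtins
 * are available) is expected to lock a mutex named <varname>_mutex around
 * each atomic* operation on the variable. */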

/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
    size_t aux;
    atomicGet(lazyfree_objects,aux);
    return aux;
}

/* Return the amount of work needed in order to free an object.
 * The return value is not always the actual number of allocations the
 * object is composed of, but a number proportional to it.
 *
 * For strings the function always returns 1.
 *
 * For aggregated objects represented by hash tables or other data structures
 * the function just returns the number of elements the object is composed of.
 *
 * Objects composed of single allocations are always reported as having a
 * single item even if they are actually logically composed of multiple
 * elements.
 *
 * For lists the function returns the number of quicklist nodes representing
 * the list, since each node is a separate allocation. */
size_t lazyfreeGetFreeEffort(robj *obj) {
    if (obj->type == OBJ_LIST) {
        quicklist *ql = obj->ptr;
        return ql->len;
    } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = obj->ptr;
        return zs->zsl->length;
    } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_STREAM) {
        size_t effort = 0;
        stream *s = obj->ptr;

        /* Make a best effort estimate to maintain constant runtime. Every
         * macro node in the Stream is one allocation. */
        effort += s->rax->numnodes;

        /* Every consumer group is an allocation and so are the entries in its
         * PEL. We use the size of the first group's PEL as an estimate for
         * all the others. */
        if (s->cgroups && raxSize(s->cgroups)) {
            raxIterator ri;
            streamCG *cg;
            raxStart(&ri,s->cgroups);
            raxSeek(&ri,"^",NULL,0);
            /* There must be at least one group so the following should always
             * work. */
            serverAssert(raxNext(&ri));
            cg = ri.data;
            effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
            raxStop(&ri);
        }
        return effort;
    } else {
        return 1; /* Everything else is a single allocation. */
    }
}
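
/* As a worked example of the stream estimate above: a stream whose radix
 * tree has 100 nodes, with 2 consumer groups where the first group's PEL
 * holds 9 entries, is reported as 100 + 2*(1+9) = 120 units of work. */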

/* Delete a key, value, and associated expiration entry, if any, from the DB.
 * If there are enough allocations to free, the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, freeing it in a lazy
     * way is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);

        /* If releasing the object is too much work, do it in the background
         * by adding the object to the lazy free list.
         * Note that if the object is shared, it is not possible to reclaim
         * it now. This rarely happens, however sometimes the implementation
         * of parts of the Redis core may call incrRefCount() to protect
         * objects, and then call dbDelete(). In this case we'll fall
         * through and reach the dictFreeUnlinkedEntry() call, which will be
         * equivalent to just calling decrRefCount(). */
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key->ptr);
        return 1;
    } else {
        return 0;
    }
}
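
/* To illustrate the shared-object case handled above, a caller doing:
 *
 *   incrRefCount(val);        (protect the value across the delete)
 *   dbAsyncDelete(db,key);    (refcount is now 2, not 1)
 *   decrRefCount(val);
 *
 * never queues a background job: the refcount check fails, so
 * dictFreeUnlinkedEntry() drops the database's reference synchronously and
 * the caller's decrRefCount() releases the object. */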

/* Free an object. If the object is big enough, free it in an async way. */
void freeObjAsync(robj *o) {
    size_t free_effort = lazyfreeGetFreeEffort(o);
    if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
        atomicIncr(lazyfree_objects,1);
        bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
    } else {
        decrRefCount(o);
    }
}
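
/* As in dbAsyncDelete(), a shared object (refcount > 1) is never handed to
 * the background thread: decrRefCount() only drops our reference, and
 * whoever drops the last one actually frees the object. */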

/* Empty a Redis DB asynchronously. What the function actually does is
 * create a new empty set of hash tables and schedule the old ones for
 * lazy freeing. */
void emptyDbAsync(redisDb *db) {
    dict *oldht1 = db->dict, *oldht2 = db->expires;
    db->dict = dictCreate(&dbDictType,NULL);
    db->expires = dictCreate(&keyptrDictType,NULL);
    atomicIncr(lazyfree_objects,dictSize(oldht1));
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
}
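
/* Note that only the main dictionary's size is added to lazyfree_objects;
 * lazyfreeFreeDatabaseFromBioThread() subtracts the same dictSize(ht1) once
 * both tables have been released, keeping the pending counter balanced. */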

/* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */
void freeSlotsToKeysMapAsync(rax *rt) {
    atomicIncr(lazyfree_objects,rt->numele);
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt);
}
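
/* The three handlers below run in the bio.c lazy free thread. Which one is
 * invoked follows from the job's arguments, mirroring the
 * bioCreateBackgroundJob() calls above: a non-NULL first argument is a plain
 * object, the second and third together are a database's two hash tables,
 * and the third alone is the slots-to-keys radix tree. */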

/* Release objects from the lazyfree thread. It's just a decrRefCount()
 * call, plus updating the count of objects still pending release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
}

/* Release a database from the lazyfree thread. The 'ht1' and 'ht2' hash
 * tables belong to a database that was substituted with a fresh one in the
 * main thread when the database was logically deleted. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
    size_t numkeys = dictSize(ht1);
    dictRelease(ht1);
    dictRelease(ht2);
    atomicDecr(lazyfree_objects,numkeys);
}

/* Release the radix tree mapping Redis Cluster keys to slots in the
 * lazyfree thread. */
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
    size_t len = rt->numele;
    raxFree(rt);
    atomicDecr(lazyfree_objects,len);
}