1 #include "server.h"
2 #include "bio.h"
3 #include "atomicvar.h"
4 #include "cluster.h"
5
6 static size_t lazyfree_objects = 0;
7 pthread_mutex_t lazyfree_objects_mutex = PTHREAD_MUTEX_INITIALIZER;
8
9 /* Return the number of currently pending objects to free. */
lazyfreeGetPendingObjectsCount(void)10 size_t lazyfreeGetPendingObjectsCount(void) {
11 size_t aux;
12 atomicGet(lazyfree_objects,aux);
13 return aux;
14 }
15
16 /* Return the amount of work needed in order to free an object.
17 * The return value is not always the actual number of allocations the
18 * object is composed of, but a number proportional to it.
19 *
20 * For strings the function always returns 1.
21 *
22 * For aggregated objects represented by hash tables or other data structures
23 * the function just returns the number of elements the object is composed of.
24 *
25 * Objects composed of single allocations are always reported as having a
26 * single item even if they are actually logical composed of multiple
27 * elements.
28 *
29 * For lists the function returns the number of elements in the quicklist
30 * representing the list. */
lazyfreeGetFreeEffort(robj * obj)31 size_t lazyfreeGetFreeEffort(robj *obj) {
32 if (obj->type == OBJ_LIST) {
33 quicklist *ql = obj->ptr;
34 return ql->len;
35 } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
36 dict *ht = obj->ptr;
37 return dictSize(ht);
38 } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
39 zset *zs = obj->ptr;
40 return zs->zsl->length;
41 } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
42 dict *ht = obj->ptr;
43 return dictSize(ht);
44 } else if (obj->type == OBJ_STREAM) {
45 size_t effort = 0;
46 stream *s = obj->ptr;
47
48 /* Make a best effort estimate to maintain constant runtime. Every macro
49 * node in the Stream is one allocation. */
50 effort += s->rax->numnodes;
51
52 /* Every consumer group is an allocation and so are the entries in its
53 * PEL. We use size of the first group's PEL as an estimate for all
54 * others. */
55 if (s->cgroups && raxSize(s->cgroups)) {
56 raxIterator ri;
57 streamCG *cg;
58 raxStart(&ri,s->cgroups);
59 raxSeek(&ri,"^",NULL,0);
60 /* There must be at least one group so the following should always
61 * work. */
62 serverAssert(raxNext(&ri));
63 cg = ri.data;
64 effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
65 raxStop(&ri);
66 }
67 return effort;
68 } else {
69 return 1; /* Everything else is a single allocation. */
70 }
71 }
72
73 /* Delete a key, value, and associated expiration entry if any, from the DB.
74 * If there are enough allocations to free the value object may be put into
75 * a lazy free list instead of being freed synchronously. The lazy free list
76 * will be reclaimed in a different bio.c thread. */
77 #define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Removing the expire entry never frees the key sds: it is shared
     * with the main dictionary entry. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* Unlink (but do not yet free) the entry from the main dict. */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de == NULL) return 0;

    robj *val = dictGetVal(de);
    size_t free_effort = lazyfreeGetFreeEffort(val);

    /* Freeing a value made of only a few allocations synchronously is
     * cheaper than queueing it, so only large values go to the background
     * thread. A shared object (refcount > 1) cannot be reclaimed here at
     * all: parts of the core may incrRefCount() an object and then call
     * dbDelete(), in which case dictFreeUnlinkedEntry() below simply acts
     * as a decrRefCount(). */
    if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
        atomicIncr(lazyfree_objects,1);
        bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
        /* Detach the value so the entry release below frees only the key. */
        dictSetVal(db->dict,de,NULL);
    }

    /* Release the key-val pair (or just the key, if the value was handed
     * to the lazy-free thread above). */
    dictFreeUnlinkedEntry(db->dict,de);
    if (server.cluster_enabled) slotToKeyDel(key->ptr);
    return 1;
}
116
117 /* Free an object, if the object is huge enough, free it in async way. */
void freeObjAsync(robj *o) {
    size_t effort = lazyfreeGetFreeEffort(o);

    /* Cheap objects, and objects we do not exclusively own, go through the
     * ordinary refcount path. */
    if (effort <= LAZYFREE_THRESHOLD || o->refcount != 1) {
        decrRefCount(o);
        return;
    }

    /* Expensive and exclusively owned: hand it to the background thread. */
    atomicIncr(lazyfree_objects,1);
    bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
}
127
128 /* Empty a Redis DB asynchronously. What the function does actually is to
129 * create a new empty set of hash tables and scheduling the old ones for
130 * lazy freeing. */
void emptyDbAsync(redisDb *db) {
    /* Swap in fresh empty tables so the DB is immediately usable, then
     * queue the old ones for background reclamation. */
    dict *old_dict = db->dict;
    dict *old_expires = db->expires;
    db->dict = dictCreate(&dbDictType,NULL);
    db->expires = dictCreate(&keyptrDictType,NULL);
    atomicIncr(lazyfree_objects,dictSize(old_dict));
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,old_dict,old_expires);
}
138
139 /* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */
void freeSlotsToKeysMapAsync(rax *rt) {
    /* Account every element of the map as a pending object, then queue the
     * whole radix tree for the lazy-free thread. */
    size_t elements = rt->numele;
    atomicIncr(lazyfree_objects,elements);
    bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,rt);
}
144
145 /* Release objects from the lazyfree thread. It's just decrRefCount()
146 * updating the count of objects to release. */
void lazyfreeFreeObjectFromBioThread(robj *o) {
    /* Release the object, then account for it: the pending counter is
     * decremented only after the memory has actually been returned. */
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
}
151
152 /* Release a database from the lazyfree thread. The 'db' pointer is the
153 * database which was substituted with a fresh one in the main thread
154 * when the database was logically deleted. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
    /* Capture the key count before dictRelease() invalidates ht1. */
    size_t released = dictSize(ht1);
    dictRelease(ht1);
    dictRelease(ht2);
    atomicDecr(lazyfree_objects,released);
}
161
162 /* Release the radix tree mapping Redis Cluster keys to slots in the
163 * lazyfree thread. */
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
    /* Read the element count before raxFree() invalidates rt. */
    size_t elements = rt->numele;
    raxFree(rt);
    atomicDecr(lazyfree_objects,elements);
}
169