1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11 
12 #include "default_engine.h"
13 
/*
 * Selects the InnoDB memcached variants of do_item_alloc() and item_alloc()
 * (the #ifdef INNODB_MEMCACHED branches later in this file). It is defined
 * here unconditionally, so the corresponding #else branches are not compiled.
 */
#define INNODB_MEMCACHED
/*
 * Forward Declarations
 *
 * These prototypes are `static`. Several later definitions (do_item_alloc,
 * do_item_get, do_item_link, do_item_unlink, do_item_release,
 * do_item_update, do_item_replace) omit the storage class and therefore
 * inherit internal linkage from these prior declarations (C11 6.2.2p5).
 */
static void item_link_q(struct default_engine *engine, hash_item *it);
static void item_unlink_q(struct default_engine *engine, hash_item *it);
static hash_item *do_item_alloc(struct default_engine *engine,
                                const void *key, const size_t nkey,
                                const int flags, const rel_time_t exptime,
                                const int nbytes,
                                const void *cookie);
static hash_item *do_item_get(struct default_engine *engine,
                              const char *key, const size_t nkey);
static int do_item_link(struct default_engine *engine, hash_item *it);
static void do_item_unlink(struct default_engine *engine, hash_item *it);
static void do_item_release(struct default_engine *engine, hash_item *it);
static void do_item_update(struct default_engine *engine, hash_item *it);
static int do_item_replace(struct default_engine *engine,
                            hash_item *it, hash_item *new_it);
static void item_free(struct default_engine *engine, hash_item *it);
32 
33 /*
34  * We only reposition items in the LRU queue if they haven't been repositioned
35  * in this many seconds. That saves us from churning on frequently-accessed
36  * items.
37  */
38 #define ITEM_UPDATE_INTERVAL 60
39 
40 void cache_set_initial_cas_id(uint64_t cas);
41 static uint64_t cas_id;
42 
item_stats_reset(struct default_engine * engine)43 void item_stats_reset(struct default_engine *engine) {
44     pthread_mutex_lock(&engine->cache_lock);
45     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
46     pthread_mutex_unlock(&engine->cache_lock);
47 }
48 
49 
50 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)51 static inline size_t ITEM_ntotal(struct default_engine *engine,
52                                  const hash_item *item) {
53     size_t ret = sizeof(*item) + item->nkey + item->nbytes;
54     if (engine->config.use_cas) {
55         ret += sizeof(uint64_t);
56     }
57 
58     return ret;
59 }
60 
61 /* Set the initial CAS id for the hash table */
cache_set_initial_cas_id(uint64_t cas)62 void cache_set_initial_cas_id(uint64_t cas) {
63   cas_id = cas;
64 }
65 
66 /* Get the next CAS id for a new item. */
get_cas_id(void)67 static uint64_t get_cas_id(void) {
68     return ++cas_id;
69 }
70 
/*
 * Enable this for reference-count debugging.
 *
 * The enabled form prints the item address, the operation character, the
 * current refcount and L/S markers for ITEM_LINKED / ITEM_SLABBED. The
 * format string matches its arguments: %p takes a void *, and there is one
 * %c per marker. The flags live in `iflag`, the member the rest of this file
 * uses.
 *
 * The disabled form expands to a single do/while(0) statement, so it is
 * safe in unbraced if/else bodies and does not evaluate its arguments.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
81 
82 
83 #ifdef INNODB_MEMCACHED /* INNODB_MEMCACHED */
84 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)85 hash_item *do_item_alloc(struct default_engine *engine,
86                          const void *key,
87                          const size_t nkey,
88                          const int flags,
89                          const rel_time_t exptime,
90                          const int nbytes,
91                          const void *cookie) {
92     hash_item *it = NULL;
93     size_t ntotal = sizeof(hash_item) + nkey + nbytes;
94     if (engine->config.use_cas) {
95         ntotal += sizeof(uint64_t);
96     }
97 
98     unsigned int id = slabs_clsid(engine, ntotal);
99     if (id == 0)
100         return 0;
101 
102     if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
103         return NULL;
104     }
105 
106     assert(it->slabs_clsid == 0);
107 
108     it->slabs_clsid = id;
109 
110     assert(it != engine->items.heads[it->slabs_clsid]);
111 
112     it->next = it->prev = it->h_next = 0;
113     it->refcount = 1;     /* the caller will have a reference */
114     DEBUG_REFCNT(it, '*');
115     it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
116     it->nkey = nkey;
117     it->nbytes = nbytes;
118     it->flags = flags;
119     memcpy((void*)item_get_key(it), key, nkey);
120     it->exptime = exptime;
121     return it;
122 }
123 
124 #else /* INNODB_MEMCACHED */
/*@null@*/
/*
 * Non-InnoDB variant of do_item_alloc(). It is compiled only when
 * INNODB_MEMCACHED is not defined; this file #defines it near the top, so
 * this branch is normally inactive.
 *
 * Allocates an unlinked item sized for the header, the key, `nbytes` of
 * value and (if CAS is enabled) a 64-bit CAS field. Before giving up it
 * tries, in order:
 *   1. reusing an unreferenced, already-expired item found within 50
 *      entries of the slab class's LRU tail;
 *   2. if slabs_alloc() fails and config.evict_to_free is non-zero,
 *      unlinking the first unreferenced item within 50 entries of the tail
 *      (counted as an eviction, or as a reclaim if it had expired), then
 *      retrying the allocation;
 *   3. if that still fails, force-unlinking a *referenced* tail item whose
 *      time + TAIL_REPAIR_TIME is before current_time ("tail repair"),
 *      then retrying once more.
 *
 * `cookie` is only passed to the server's evicting() stat callback.
 * Returns the item with refcount 1 (the caller's reference), or NULL when
 * no slab class fits (slabs_clsid() == 0) or every attempt fails.
 */
hash_item *do_item_alloc(struct default_engine *engine,
                         const void *key,
                         const size_t nkey,
                         const int flags,
                         const rel_time_t exptime,
                         const int nbytes,
                         const void *cookie) {
    hash_item *it = NULL;
    size_t ntotal = sizeof(hash_item) + nkey + nbytes;
    if (engine->config.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    /* slabs_clsid() == 0 is treated as "no slab class can hold ntotal". */
    unsigned int id = slabs_clsid(engine, ntotal);
    if (id == 0)
        return 0;

    /* do a quick check if we have any expired items in the tail.. */
    int tries = 50;
    hash_item *search;

    rel_time_t current_time = engine->server.core->get_current_time();

    /* Phase 1: walk toward the head from the LRU tail looking for an
     * unreferenced item whose non-zero exptime has passed. */
    for (search = engine->items.tails[id];
         tries > 0 && search != NULL;
         tries--, search=search->prev) {
        if (search->refcount == 0 &&
            (search->exptime != 0 && search->exptime < current_time)) {
            it = search;
            /* I don't want to actually free the object, just steal
             * the item to avoid to grab the slab mutex twice ;-)
             */
            pthread_mutex_lock(&engine->stats.lock);
            engine->stats.reclaimed++;
            pthread_mutex_unlock(&engine->stats.lock);
            engine->items.itemstats[id].reclaimed++;
            /* Hold a temporary reference so do_item_unlink() removes the
             * item from the hash table and LRU without calling item_free()
             * (it only frees when refcount == 0). */
            it->refcount = 1;
            do_item_unlink(engine, it);
            /* Initialize the item block: */
            it->slabs_clsid = 0;
            it->refcount = 0;
            break;
        }
    }

    if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
        /*
        ** Could not find an expired item at the tail, and memory allocation
        ** failed. Try to evict some items!
        */
        tries = 50;

        /* If requested to not push old items out of cache when memory runs out,
         * we're out of luck at this point...
         */

        if (engine->config.evict_to_free == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        /*
         * try to get one off the right LRU
         * don't necessarily unlink the tail because it may be locked: refcount>0
         * search up from tail an item with refcount==0 and unlink it; give up after 50
         * tries
         */

        if (engine->items.tails[id] == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        /* Phase 2: evict (or reclaim, if already expired) the first
         * unreferenced item near the tail. do_item_unlink() frees it
         * because its refcount is 0. */
        for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
            if (search->refcount == 0) {
                if (search->exptime == 0 || search->exptime > current_time) {
                    engine->items.itemstats[id].evicted++;
                    engine->items.itemstats[id].evicted_time = current_time - search->time;
                    if (search->exptime != 0) {
                        engine->items.itemstats[id].evicted_nonzero++;
                    }
                    pthread_mutex_lock(&engine->stats.lock);
                    engine->stats.evictions++;
                    pthread_mutex_unlock(&engine->stats.lock);
                    engine->server.stat->evicting(cookie,
                                                  item_get_key(search),
                                                  search->nkey);
                } else {
                    engine->items.itemstats[id].reclaimed++;
                    pthread_mutex_lock(&engine->stats.lock);
                    engine->stats.reclaimed++;
                    pthread_mutex_unlock(&engine->stats.lock);
                }
                do_item_unlink(engine, search);
                break;
            }
        }
        it = slabs_alloc(engine, ntotal, id);
        if (it == 0) {
            engine->items.itemstats[id].outofmemory++;
            /* Last ditch effort. There is a very rare bug which causes
             * refcount leaks. We've fixed most of them, but it still happens,
             * and it may happen in the future.
             * We can reasonably assume no item can stay locked for more than
             * three hours, so if we find one in the tail which is that old,
             * free it anyway.
             * (The actual threshold is TAIL_REPAIR_TIME, defined elsewhere.)
             */
            tries = 50;
            /* Phase 3: forcibly drop a stale reference and unlink (and
             * thereby free) that item. */
            for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
                if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
                    engine->items.itemstats[id].tailrepairs++;
                    search->refcount = 0;
                    do_item_unlink(engine, search);
                    break;
                }
            }
            it = slabs_alloc(engine, ntotal, id);
            if (it == 0) {
                return NULL;
            }
        }
    }

    /* Whichever path produced `it`, its slabs_clsid is expected to be 0
     * here (fresh from slabs_alloc or reset in phase 1). */
    assert(it->slabs_clsid == 0);

    it->slabs_clsid = id;

    assert(it != engine->items.heads[it->slabs_clsid]);

    it->next = it->prev = it->h_next = 0;
    it->refcount = 1;     /* the caller will have a reference */
    DEBUG_REFCNT(it, '*');
    /* iflag is set before item_get_key() is used to place the key. */
    it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    it->flags = flags;
    memcpy((void*)item_get_key(it), key, nkey);
    it->exptime = exptime;
    return it;
}
266 #endif /* INNODB_MEMCACHED */
267 
item_free(struct default_engine * engine,hash_item * it)268 static void item_free(struct default_engine *engine, hash_item *it) {
269     size_t ntotal = ITEM_ntotal(engine, it);
270     unsigned int clsid;
271     assert((it->iflag & ITEM_LINKED) == 0);
272     assert(it != engine->items.heads[it->slabs_clsid]);
273     assert(it != engine->items.tails[it->slabs_clsid]);
274     assert(it->refcount == 0);
275 
276     /* so slab size changer can tell later if item is already free or not */
277     clsid = it->slabs_clsid;
278     it->slabs_clsid = 0;
279     it->iflag |= ITEM_SLABBED;
280     DEBUG_REFCNT(it, 'F');
281     slabs_free(engine, it, ntotal, clsid);
282 }
283 
item_link_q(struct default_engine * engine,hash_item * it)284 static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
285     hash_item **head, **tail;
286     assert(it->slabs_clsid < POWER_LARGEST);
287     assert((it->iflag & ITEM_SLABBED) == 0);
288 
289     head = &engine->items.heads[it->slabs_clsid];
290     tail = &engine->items.tails[it->slabs_clsid];
291     assert(it != *head);
292     assert((*head && *tail) || (*head == 0 && *tail == 0));
293     it->prev = 0;
294     it->next = *head;
295     if (it->next) it->next->prev = it;
296     *head = it;
297     if (*tail == 0) *tail = it;
298     engine->items.sizes[it->slabs_clsid]++;
299     return;
300 }
301 
item_unlink_q(struct default_engine * engine,hash_item * it)302 static void item_unlink_q(struct default_engine *engine, hash_item *it) {
303     hash_item **head, **tail;
304     assert(it->slabs_clsid < POWER_LARGEST);
305     head = &engine->items.heads[it->slabs_clsid];
306     tail = &engine->items.tails[it->slabs_clsid];
307 
308     if (*head == it) {
309         assert(it->prev == 0);
310         *head = it->next;
311     }
312     if (*tail == it) {
313         assert(it->next == 0);
314         *tail = it->prev;
315     }
316     assert(it->next != it);
317     assert(it->prev != it);
318 
319     if (it->next) it->next->prev = it->prev;
320     if (it->prev) it->prev->next = it->next;
321     engine->items.sizes[it->slabs_clsid]--;
322     return;
323 }
324 
do_item_link(struct default_engine * engine,hash_item * it)325 int do_item_link(struct default_engine *engine, hash_item *it) {
326     MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
327     assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
328     assert(it->nbytes < (1024 * 1024));  /* 1MB max size */
329     it->iflag |= ITEM_LINKED;
330     it->time = engine->server.core->get_current_time();
331     assoc_insert(engine, engine->server.core->hash(item_get_key(it),
332                                                         it->nkey, 0),
333                  it);
334 
335     pthread_mutex_lock(&engine->stats.lock);
336     engine->stats.curr_bytes += ITEM_ntotal(engine, it);
337     engine->stats.curr_items += 1;
338     engine->stats.total_items += 1;
339     pthread_mutex_unlock(&engine->stats.lock);
340 
341     /* Allocate a new CAS ID on link. */
342     item_set_cas(NULL, NULL, it, get_cas_id());
343 
344     item_link_q(engine, it);
345 
346     return 1;
347 }
348 
do_item_unlink(struct default_engine * engine,hash_item * it)349 void do_item_unlink(struct default_engine *engine, hash_item *it) {
350     MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
351     if ((it->iflag & ITEM_LINKED) != 0) {
352         it->iflag &= ~ITEM_LINKED;
353         pthread_mutex_lock(&engine->stats.lock);
354         engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
355         engine->stats.curr_items -= 1;
356         pthread_mutex_unlock(&engine->stats.lock);
357         assoc_delete(engine, engine->server.core->hash(item_get_key(it),
358                                                             it->nkey, 0),
359                      item_get_key(it), it->nkey);
360         item_unlink_q(engine, it);
361         if (it->refcount == 0) {
362             item_free(engine, it);
363         }
364     }
365 }
366 
do_item_release(struct default_engine * engine,hash_item * it)367 void do_item_release(struct default_engine *engine, hash_item *it) {
368     MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
369     if (it->refcount != 0) {
370         it->refcount--;
371         DEBUG_REFCNT(it, '-');
372     }
373     if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
374         item_free(engine, it);
375     }
376 }
377 
do_item_update(struct default_engine * engine,hash_item * it)378 void do_item_update(struct default_engine *engine, hash_item *it) {
379     rel_time_t current_time = engine->server.core->get_current_time();
380     MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
381     if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
382         assert((it->iflag & ITEM_SLABBED) == 0);
383 
384         if ((it->iflag & ITEM_LINKED) != 0) {
385             item_unlink_q(engine, it);
386             it->time = current_time;
387             item_link_q(engine, it);
388         }
389     }
390 }
391 
do_item_replace(struct default_engine * engine,hash_item * it,hash_item * new_it)392 int do_item_replace(struct default_engine *engine,
393                     hash_item *it, hash_item *new_it) {
394     MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
395                            item_get_key(new_it), new_it->nkey, new_it->nbytes);
396     assert((it->iflag & ITEM_SLABBED) == 0);
397 
398     do_item_unlink(engine, it);
399     return do_item_link(engine, new_it);
400 }
401 
/*@null@*/
/*
 * Intended to render up to `limit` items (0 = no limit) of slab class
 * `slabs_clsid` as "ITEM <key> [<n> b; <t> s]\r\n" lines followed by
 * "END\r\n", storing the length in *bytes and returning a malloc'd buffer.
 *
 * Unless FUTURE is defined this is a stub: it ignores its arguments, leaves
 * *bytes untouched and returns NULL.
 */
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    /*
     * NOTE(review): this branch references `engine` and `process_started`,
     * neither of which is in scope here, so it will not compile with FUTURE
     * defined until the function gains an engine parameter.
     */
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer;
    unsigned int bufcurr;
    hash_item *it;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == NULL) return NULL;
    bufcurr = 0;

    while (it != NULL && (limit == 0 || shown < limit)) {
        int len;

        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it is not null-terminated in the struct */
        memcpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = '\0';
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes - 2,
                       (unsigned long)it->exptime + process_started);
        /* snprintf returns the untruncated length; never copy past temp. */
        if (len < 0 || (size_t)len >= sizeof(temp))
            break;
        if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
            break;
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }

    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#else
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
#endif
}
451 
do_item_stats(struct default_engine * engine,ADD_STAT add_stats,const void * c)452 static void do_item_stats(struct default_engine *engine,
453                           ADD_STAT add_stats, const void *c) {
454     int i;
455     for (i = 0; i < POWER_LARGEST; i++) {
456         if (engine->items.tails[i] != NULL) {
457             const char *prefix = "items";
458             add_statistics(c, add_stats, prefix, i, "number", "%u",
459                            engine->items.sizes[i]);
460             add_statistics(c, add_stats, prefix, i, "age", "%u",
461                            engine->items.tails[i]->time);
462             add_statistics(c, add_stats, prefix, i, "evicted",
463                            "%u", engine->items.itemstats[i].evicted);
464             add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
465                            "%u", engine->items.itemstats[i].evicted_nonzero);
466             add_statistics(c, add_stats, prefix, i, "evicted_time",
467                            "%u", engine->items.itemstats[i].evicted_time);
468             add_statistics(c, add_stats, prefix, i, "outofmemory",
469                            "%u", engine->items.itemstats[i].outofmemory);
470             add_statistics(c, add_stats, prefix, i, "tailrepairs",
471                            "%u", engine->items.itemstats[i].tailrepairs);;
472             add_statistics(c, add_stats, prefix, i, "reclaimed",
473                            "%u", engine->items.itemstats[i].reclaimed);;
474         }
475     }
476 }
477 
478 /** dumps out a list of objects of each size, with granularity of 32 bytes */
479 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)480 static void do_item_stats_sizes(struct default_engine *engine,
481                                 ADD_STAT add_stats, const void *c) {
482 
483     /* max 1MB object, divided into 32 bytes size buckets */
484     const int num_buckets = 32768;
485     unsigned int *histogram = calloc(num_buckets, sizeof(int));
486 
487     if (histogram != NULL) {
488         int i;
489 
490         /* build the histogram */
491         for (i = 0; i < POWER_LARGEST; i++) {
492             hash_item *iter = engine->items.heads[i];
493             while (iter) {
494                 int ntotal = ITEM_ntotal(engine, iter);
495                 int bucket = ntotal / 32;
496                 if ((ntotal % 32) != 0) bucket++;
497                 if (bucket < num_buckets) histogram[bucket]++;
498                 iter = iter->next;
499             }
500         }
501 
502         /* write the buffer */
503         for (i = 0; i < num_buckets; i++) {
504             if (histogram[i] != 0) {
505                 char key[8], val[32];
506                 int klen, vlen;
507                 klen = snprintf(key, sizeof(key), "%d", i * 32);
508                 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
509                 assert(klen < sizeof(key));
510                 assert(vlen < sizeof(val));
511                 add_stats(key, klen, val, vlen, c);
512             }
513         }
514         free(histogram);
515     }
516 }
517 
518 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)519 hash_item *do_item_get(struct default_engine *engine,
520                        const char *key, const size_t nkey) {
521     rel_time_t current_time = engine->server.core->get_current_time();
522     hash_item *it = assoc_find(engine, engine->server.core->hash(key,
523                                                                       nkey, 0),
524                                key, nkey);
525     int was_found = 0;
526 
527     if (engine->config.verbose > 2) {
528         if (it == NULL) {
529             fprintf(stderr, "> NOT FOUND %s", key);
530         } else {
531             fprintf(stderr, "> FOUND KEY %s", (const char*)item_get_key(it));
532             was_found++;
533         }
534     }
535 
536     if (it != NULL && engine->config.oldest_live != 0 &&
537         engine->config.oldest_live <= current_time &&
538         it->time <= engine->config.oldest_live) {
539         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
540         it = NULL;
541     }
542 
543     if (it == NULL && was_found) {
544         fprintf(stderr, " -nuked by flush");
545         was_found--;
546     }
547 
548     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
549         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
550         it = NULL;
551     }
552 
553     if (it == NULL && was_found) {
554         fprintf(stderr, " -nuked by expire");
555         was_found--;
556     }
557 
558     if (it != NULL) {
559         it->refcount++;
560         DEBUG_REFCNT(it, '+');
561         do_item_update(engine, it);
562     }
563 
564     if (engine->config.verbose > 2)
565         fprintf(stderr, "\n");
566 
567     return it;
568 }
569 
570 /*
571  * Stores an item in the cache according to the semantics of one of the set
572  * commands. In threaded mode, this is protected by the cache lock.
573  *
574  * Returns the state of storage.
575  */
do_store_item(struct default_engine * engine,hash_item * it,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)576 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
577                                        hash_item *it, uint64_t *cas,
578                                        ENGINE_STORE_OPERATION operation,
579                                        const void *cookie) {
580     const char *key = item_get_key(it);
581     hash_item *old_it = do_item_get(engine, key, it->nkey);
582     ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
583 
584     hash_item *new_it = NULL;
585 
586     if (old_it != NULL && operation == OPERATION_ADD) {
587         /* add only adds a nonexistent item, but promote to head of LRU */
588         do_item_update(engine, old_it);
589     } else if (!old_it && (operation == OPERATION_REPLACE
590         || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
591     {
592         /* replace only replaces an existing value; don't store */
593     } else if (operation == OPERATION_CAS) {
594         /* validate cas operation */
595         if(old_it == NULL) {
596             // LRU expired
597             stored = ENGINE_KEY_ENOENT;
598         }
599         else if (item_get_cas(it) == item_get_cas(old_it)) {
600             // cas validates
601             // it and old_it may belong to different classes.
602             // I'm updating the stats for the one that's getting pushed out
603             do_item_replace(engine, old_it, it);
604             stored = ENGINE_SUCCESS;
605         } else {
606             if (engine->config.verbose > 1) {
607                 fprintf(stderr,
608                         "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
609                         item_get_cas(old_it),
610                         item_get_cas(it));
611             }
612             stored = ENGINE_KEY_EEXISTS;
613         }
614     } else {
615         /*
616          * Append - combine new and old record into single one. Here it's
617          * atomic and thread-safe.
618          */
619         if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
620             /*
621              * Validate CAS
622              */
623             if (item_get_cas(it) != 0) {
624                 // CAS much be equal
625                 if (item_get_cas(it) != item_get_cas(old_it)) {
626                     stored = ENGINE_KEY_EEXISTS;
627                 }
628             }
629 
630             if (stored == ENGINE_NOT_STORED) {
631                 /* we have it and old_it here - alloc memory to hold both */
632                 new_it = do_item_alloc(engine, key, it->nkey,
633                                        old_it->flags,
634                                        old_it->exptime,
635                                        it->nbytes + old_it->nbytes - 2 /* CRLF */,
636                                        cookie);
637 
638                 if (new_it == NULL) {
639                     /* SERVER_ERROR out of memory */
640                     if (old_it != NULL) {
641                         do_item_release(engine, old_it);
642                     }
643 
644                     return ENGINE_NOT_STORED;
645                 }
646 
647                 /* copy data from it and old_it to new_it */
648 
649                 if (operation == OPERATION_APPEND) {
650                     memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
651                     memcpy(item_get_data(new_it) + old_it->nbytes - 2 /* CRLF */, item_get_data(it), it->nbytes);
652                 } else {
653                     /* OPERATION_PREPEND */
654                     memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
655                     memcpy(item_get_data(new_it) + it->nbytes - 2 /* CRLF */, item_get_data(old_it), old_it->nbytes);
656                 }
657 
658                 it = new_it;
659             }
660         }
661 
662         if (stored == ENGINE_NOT_STORED) {
663             if (old_it != NULL) {
664                 do_item_replace(engine, old_it, it);
665             } else {
666                 do_item_link(engine, it);
667             }
668 
669             *cas = item_get_cas(it);
670             stored = ENGINE_SUCCESS;
671         }
672     }
673 
674     if (old_it != NULL) {
675         do_item_release(engine, old_it);         /* release our reference */
676     }
677 
678     if (new_it != NULL) {
679         do_item_release(engine, new_it);
680     }
681 
682     if (stored == ENGINE_SUCCESS) {
683         *cas = item_get_cas(it);
684     }
685 
686     return stored;
687 }
688 
689 
690 /*
691  * adds a delta value to a numeric item.
692  *
693  * c     connection requesting the operation
694  * it    item to adjust
695  * incr  true to increment value, false to decrement
696  * delta amount to adjust value by
697  * buf   buffer for response string
698  *
699  * returns a response string to send back to the client.
700  */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)701 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
702                                       hash_item *it, const bool incr,
703                                       const int64_t delta, uint64_t *rcas,
704                                       uint64_t *result, const void *cookie) {
705     const char *ptr;
706     uint64_t value;
707     int res;
708 
709     ptr = item_get_data(it);
710 
711     if (!safe_strtoull(ptr, &value)) {
712         return ENGINE_EINVAL;
713     }
714 
715     if (incr) {
716         value += delta;
717     } else {
718         if(delta > value) {
719             value = 0;
720         } else {
721             value -= delta;
722         }
723     }
724 
725     *result = value;
726     char buf[80];
727     if ((res = snprintf(buf, sizeof(buf), "%" PRIu64 "\r\n", value)) == -1) {
728         return ENGINE_EINVAL;
729     }
730     hash_item *new_it = do_item_alloc(engine, item_get_key(it),
731                                       it->nkey, it->flags,
732                                       it->exptime, res,
733                                       cookie );
734     if (new_it == 0) {
735         do_item_unlink(engine, it);
736         return ENGINE_ENOMEM;
737     }
738     memcpy(item_get_data(new_it), buf, res);
739     do_item_replace(engine, it, new_it);
740     *rcas = item_get_cas(new_it);
741     do_item_release(engine, new_it);       /* release our reference */
742 
743     return ENGINE_SUCCESS;
744 }
745 
746 /********************************* ITEM ACCESS *******************************/
747 #ifdef INNODB_MEMCACHED
748 
749 /*
750  * Allocates a new item.
751  */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)752 hash_item *item_alloc(struct default_engine *engine,
753                       const void *key, size_t nkey, int flags,
754                       rel_time_t exptime, int nbytes, const void *cookie) {
755     hash_item *it;
756     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
757     return it;
758 }
#else /* INNODB_MEMCACHED */
760 /*
761  * Allocates a new item.
762  */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)763 hash_item *item_alloc(struct default_engine *engine,
764                       const void *key, size_t nkey, int flags,
765                       rel_time_t exptime, int nbytes, const void *cookie) {
766     hash_item *it;
767     pthread_mutex_lock(&engine->cache_lock);
768     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
769     pthread_mutex_unlock(&engine->cache_lock);
770     return it;
771 }
#endif /* INNODB_MEMCACHED */
773 
774 /*
775  * Returns an item if it hasn't been marked as expired,
776  * lazy-expiring as needed.
777  */
item_get(struct default_engine * engine,const void * key,const size_t nkey)778 hash_item *item_get(struct default_engine *engine,
779                     const void *key, const size_t nkey) {
780     hash_item *it;
781     pthread_mutex_lock(&engine->cache_lock);
782     it = do_item_get(engine, key, nkey);
783     pthread_mutex_unlock(&engine->cache_lock);
784     return it;
785 }
786 
787 /*
788  * Decrements the reference count on an item and adds it to the freelist if
789  * needed.
790  */
item_release(struct default_engine * engine,hash_item * item)791 void item_release(struct default_engine *engine, hash_item *item) {
792     pthread_mutex_lock(&engine->cache_lock);
793     do_item_release(engine, item);
794     pthread_mutex_unlock(&engine->cache_lock);
795 }
796 
797 /*
798  * Unlinks an item from the LRU and hashtable.
799  */
item_unlink(struct default_engine * engine,hash_item * item)800 void item_unlink(struct default_engine *engine, hash_item *item) {
801     pthread_mutex_lock(&engine->cache_lock);
802     do_item_unlink(engine, item);
803     pthread_mutex_unlock(&engine->cache_lock);
804 }
805 
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)806 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
807                                        const void* cookie,
808                                        const void* key,
809                                        const int nkey,
810                                        const bool increment,
811                                        const bool create,
812                                        const uint64_t delta,
813                                        const uint64_t initial,
814                                        const rel_time_t exptime,
815                                        uint64_t *cas,
816                                        uint64_t *result)
817 {
818    hash_item *item = do_item_get(engine, key, nkey);
819    ENGINE_ERROR_CODE ret;
820 
821    if (item == NULL) {
822       if (!create) {
823          return ENGINE_KEY_ENOENT;
824       } else {
825          char buffer[128];
826          int len = snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n",
827                             (uint64_t)initial);
828 
829          item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
830          if (item == NULL) {
831             return ENGINE_ENOMEM;
832          }
833          memcpy((void*)item_get_data(item), buffer, len);
834          if ((ret = do_store_item(engine, item, cas,
835                                   OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
836              *result = initial;
837              *cas = item_get_cas(item);
838          }
839          do_item_release(engine, item);
840       }
841    } else {
842       ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
843       do_item_release(engine, item);
844    }
845 
846    return ret;
847 }
848 
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)849 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
850                              const void* cookie,
851                              const void* key,
852                              const int nkey,
853                              const bool increment,
854                              const bool create,
855                              const uint64_t delta,
856                              const uint64_t initial,
857                              const rel_time_t exptime,
858                              uint64_t *cas,
859                              uint64_t *result)
860 {
861     ENGINE_ERROR_CODE ret;
862 
863     pthread_mutex_lock(&engine->cache_lock);
864     ret = do_arithmetic(engine, cookie, key, nkey, increment,
865                         create, delta, initial, exptime, cas,
866                         result);
867     pthread_mutex_unlock(&engine->cache_lock);
868     return ret;
869 }
870 
871 /*
872  * Stores an item in the cache (high level, obeys set/add/replace semantics)
873  */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)874 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
875                              hash_item *item, uint64_t *cas,
876                              ENGINE_STORE_OPERATION operation,
877                              const void *cookie) {
878     ENGINE_ERROR_CODE ret;
879 
880     pthread_mutex_lock(&engine->cache_lock);
881     ret = do_store_item(engine, item, cas, operation, cookie);
882     pthread_mutex_unlock(&engine->cache_lock);
883     return ret;
884 }
885 
886 /*
887  * Flushes expired items after a flush_all call
888  */
item_flush_expired(struct default_engine * engine,time_t when)889 void item_flush_expired(struct default_engine *engine, time_t when) {
890     int i;
891     hash_item *iter, *next;
892 
893     pthread_mutex_lock(&engine->cache_lock);
894 
895     if (when == 0) {
896         engine->config.oldest_live = engine->server.core->get_current_time() - 1;
897     } else {
898         engine->config.oldest_live = engine->server.core->realtime(when) - 1;
899     }
900 
901     if (engine->config.oldest_live != 0) {
902         for (i = 0; i < POWER_LARGEST; i++) {
903             /*
904              * The LRU is sorted in decreasing time order, and an item's
905              * timestamp is never newer than its last access time, so we
906              * only need to walk back until we hit an item older than the
907              * oldest_live time.
908              * The oldest_live checking will auto-expire the remaining items.
909              */
910             for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
911                 if (iter->time >= engine->config.oldest_live) {
912                     next = iter->next;
913                     if ((iter->iflag & ITEM_SLABBED) == 0) {
914                         do_item_unlink(engine, iter);
915                     }
916                 } else {
917                     /* We've hit the first old item. Continue to the next queue. */
918                     break;
919                 }
920             }
921         }
922     }
923     pthread_mutex_unlock(&engine->cache_lock);
924 }
925 
926 /*
927  * Dumps part of the cache
928  */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)929 char *item_cachedump(struct default_engine *engine,
930                      unsigned int slabs_clsid,
931                      unsigned int limit,
932                      unsigned int *bytes) {
933     char *ret;
934 
935     pthread_mutex_lock(&engine->cache_lock);
936     ret = do_item_cachedump(slabs_clsid, limit, bytes);
937     pthread_mutex_unlock(&engine->cache_lock);
938     return ret;
939 }
940 
/*
 * Emits item statistics through `add_stat` (with `cookie` passed along) by
 * running do_item_stats() under engine->cache_lock.
 */
void item_stats(struct default_engine *engine,
                ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
948 
949 
/*
 * Emits item-size statistics through `add_stat` (with `cookie` passed along)
 * by running do_item_stats_sizes() under engine->cache_lock.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
957 
/*
 * Appends the placeholder `cursor` to the tail of LRU list `ii`, tags it with
 * that slab class and bumps the list's size counter.
 *
 * Dereferences the current tail, so list `ii` must be non-empty;
 * item_scubber_main() checks heads[ii] != NULL under cache_lock before
 * calling this.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *const old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->prev = old_tail;
    cursor->next = NULL;

    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;
    ++engine->items.sizes[ii];
}
968 
969 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
970                                       hash_item *item, void *cookie);
971 
do_item_walk_cursor(struct default_engine * engine,hash_item * cursor,int steplength,ITERFUNC itemfunc,void * itemdata,ENGINE_ERROR_CODE * error)972 static bool do_item_walk_cursor(struct default_engine *engine,
973                                 hash_item *cursor,
974                                 int steplength,
975                                 ITERFUNC itemfunc,
976                                 void* itemdata,
977                                 ENGINE_ERROR_CODE *error)
978 {
979     int ii = 0;
980     *error = ENGINE_SUCCESS;
981 
982     while (cursor->prev != NULL && ii < steplength) {
983         ++ii;
984         /* Move cursor */
985         hash_item *ptr = cursor->prev;
986         item_unlink_q(engine, cursor);
987 
988         bool done = false;
989 
990         if (ptr == engine->items.heads[cursor->slabs_clsid]) {
991             done = true;
992         } else {
993             cursor->next = ptr;
994             cursor->prev = ptr->prev;
995             cursor->prev->next = cursor;
996             ptr->prev = cursor;
997         }
998 
999         /* Ignore cursors */
1000         if (!(ptr->nkey == 0 && ptr->nbytes == 0)) {
1001             *error = itemfunc(engine, ptr, itemdata);
1002             if (*error != ENGINE_SUCCESS) {
1003                 return false;
1004             }
1005         }
1006 
1007         if (done) {
1008             return false;
1009         }
1010    }
1011 
1012     return true;
1013 }
1014 
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1015 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1016                                     hash_item *item,
1017                                     void *cookie) {
1018     (void)cookie;
1019     engine->scrubber.visited++;
1020     rel_time_t current_time = engine->server.core->get_current_time();
1021     if (item->refcount == 0 &&
1022         (item->exptime != 0 && item->exptime < current_time)) {
1023         do_item_unlink(engine, item);
1024         engine->scrubber.cleaned++;
1025     }
1026     return ENGINE_SUCCESS;
1027 }
1028 
/*
 * Scrubs one LRU list by repeatedly advancing `cursor` (already linked into
 * that list) in batches of 200 steps. cache_lock is held only for the
 * duration of each batch. Stops when the walk reports it is finished or
 * item_scrub returns an error.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor) {
    for (;;) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        const bool again = do_item_walk_cursor(engine, cursor, 200,
                                               item_scrub, NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !again) {
            break;
        }
    }
}
1043 
item_scubber_main(void * arg)1044 static void *item_scubber_main(void *arg)
1045 {
1046     struct default_engine *engine = arg;
1047     hash_item cursor = { .refcount = 1 };
1048 
1049     for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1050         pthread_mutex_lock(&engine->cache_lock);
1051         bool skip = false;
1052         if (engine->items.heads[ii] == NULL) {
1053             skip = true;
1054         } else {
1055             // add the item at the tail
1056             do_item_link_cursor(engine, &cursor, ii);
1057         }
1058         pthread_mutex_unlock(&engine->cache_lock);
1059 
1060         if (!skip) {
1061             item_scrub_class(engine, &cursor);
1062         }
1063     }
1064 
1065     pthread_mutex_lock(&engine->scrubber.lock);
1066     engine->scrubber.stopped = time(NULL);
1067     engine->scrubber.running = false;
1068     pthread_mutex_unlock(&engine->scrubber.lock);
1069 
1070     return NULL;
1071 }
1072 
item_start_scrub(struct default_engine * engine)1073 bool item_start_scrub(struct default_engine *engine)
1074 {
1075     bool ret = false;
1076     pthread_mutex_lock(&engine->scrubber.lock);
1077     if (!engine->scrubber.running) {
1078         engine->scrubber.started = time(NULL);
1079         engine->scrubber.stopped = 0;
1080         engine->scrubber.visited = 0;
1081         engine->scrubber.cleaned = 0;
1082         engine->scrubber.running = true;
1083 
1084         pthread_t t;
1085         pthread_attr_t attr;
1086 
1087         if (pthread_attr_init(&attr) != 0 ||
1088             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1089             pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1090         {
1091             engine->scrubber.running = false;
1092         } else {
1093             ret = true;
1094         }
1095     }
1096     pthread_mutex_unlock(&engine->scrubber.lock);
1097 
1098     return ret;
1099 }
1100