1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11 
12 #include "default_engine.h"
13 
14 /* Forward Declarations */
15 static void item_link_q(struct default_engine *engine, hash_item *it);
16 static void item_unlink_q(struct default_engine *engine, hash_item *it);
17 static hash_item *do_item_alloc(struct default_engine *engine,
18                                 const void *key, const size_t nkey,
19                                 const int flags, const rel_time_t exptime,
20                                 const int nbytes,
21                                 const void *cookie);
22 static hash_item *do_item_get(struct default_engine *engine,
23                               const char *key, const size_t nkey);
24 static int do_item_link(struct default_engine *engine, hash_item *it);
25 static void do_item_unlink(struct default_engine *engine, hash_item *it);
26 static void do_item_release(struct default_engine *engine, hash_item *it);
27 static void do_item_update(struct default_engine *engine, hash_item *it);
28 static int do_item_replace(struct default_engine *engine,
29                             hash_item *it, hash_item *new_it);
30 static void item_free(struct default_engine *engine, hash_item *it);
31 
/*
 * Minimum number of seconds between two LRU repositionings of the same
 * item. Frequently-read items would otherwise be unlinked and relinked at
 * the LRU head on every access.
 */
#define ITEM_UPDATE_INTERVAL (60)

/*
 * Upper bound on how many items a single tail scan inspects before giving
 * up, so that no operation ever has to walk an entire LRU list.
 */
static const int search_items = 50;
43 
item_stats_reset(struct default_engine * engine)44 void item_stats_reset(struct default_engine *engine) {
45     pthread_mutex_lock(&engine->cache_lock);
46     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47     pthread_mutex_unlock(&engine->cache_lock);
48 }
49 
50 
51 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)52 static inline size_t ITEM_ntotal(struct default_engine *engine,
53                                  const hash_item *item) {
54     size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55     if (engine->config.use_cas) {
56         ret += sizeof(uint64_t);
57     }
58 
59     return ret;
60 }
61 
/*
 * Hand out the next CAS identifier: 1, 2, 3, ... for the life of the
 * process. The counter is not synchronized here; callers in this file use
 * it from the do_* routines, presumably with engine->cache_lock held.
 */
static uint64_t get_cas_id(void) {
    static uint64_t last_issued = 0;

    last_issued += 1;
    return last_issued;
}
67 
/*
 * Reference-count tracing. Change "#if 0" to "#if 1" to log each refcount
 * transition to stderr. `op` is a one-character tag used by the callers in
 * this file: '*' on allocation, '+' on get, '-' on release, 'F' on free.
 * The L/S columns show whether ITEM_LINKED / ITEM_SLABBED are set.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
78 
79 
80 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)81 hash_item *do_item_alloc(struct default_engine *engine,
82                          const void *key,
83                          const size_t nkey,
84                          const int flags,
85                          const rel_time_t exptime,
86                          const int nbytes,
87                          const void *cookie) {
88     hash_item *it = NULL;
89     size_t ntotal = sizeof(hash_item) + nkey + nbytes;
90     if (engine->config.use_cas) {
91         ntotal += sizeof(uint64_t);
92     }
93 
94     unsigned int id = slabs_clsid(engine, ntotal);
95     if (id == 0)
96         return 0;
97 
98     /* do a quick check if we have any expired items in the tail.. */
99     int tries = search_items;
100     hash_item *search;
101 
102     rel_time_t current_time = engine->server.core->get_current_time();
103 
104     for (search = engine->items.tails[id];
105          tries > 0 && search != NULL;
106          tries--, search=search->prev) {
107         if (search->refcount == 0 &&
108             (search->exptime != 0 && search->exptime < current_time)) {
109             it = search;
110             /* I don't want to actually free the object, just steal
111              * the item to avoid to grab the slab mutex twice ;-)
112              */
113             pthread_mutex_lock(&engine->stats.lock);
114             engine->stats.reclaimed++;
115             pthread_mutex_unlock(&engine->stats.lock);
116             engine->items.itemstats[id].reclaimed++;
117             it->refcount = 1;
118             slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
119             do_item_unlink(engine, it);
120             /* Initialize the item block: */
121             it->slabs_clsid = 0;
122             it->refcount = 0;
123             break;
124         }
125     }
126 
127     if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
128         /*
129         ** Could not find an expired item at the tail, and memory allocation
130         ** failed. Try to evict some items!
131         */
132         tries = search_items;
133 
134         /* If requested to not push old items out of cache when memory runs out,
135          * we're out of luck at this point...
136          */
137 
138         if (engine->config.evict_to_free == 0) {
139             engine->items.itemstats[id].outofmemory++;
140             return NULL;
141         }
142 
143         /*
144          * try to get one off the right LRU
145          * don't necessariuly unlink the tail because it may be locked: refcount>0
146          * search up from tail an item with refcount==0 and unlink it; give up after search_items
147          * tries
148          */
149 
150         if (engine->items.tails[id] == 0) {
151             engine->items.itemstats[id].outofmemory++;
152             return NULL;
153         }
154 
155         for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
156             if (search->refcount == 0) {
157                 if (search->exptime == 0 || search->exptime > current_time) {
158                     engine->items.itemstats[id].evicted++;
159                     engine->items.itemstats[id].evicted_time = current_time - search->time;
160                     if (search->exptime != 0) {
161                         engine->items.itemstats[id].evicted_nonzero++;
162                     }
163                     pthread_mutex_lock(&engine->stats.lock);
164                     engine->stats.evictions++;
165                     pthread_mutex_unlock(&engine->stats.lock);
166                     engine->server.stat->evicting(cookie,
167                                                   item_get_key(search),
168                                                   search->nkey);
169                 } else {
170                     engine->items.itemstats[id].reclaimed++;
171                     pthread_mutex_lock(&engine->stats.lock);
172                     engine->stats.reclaimed++;
173                     pthread_mutex_unlock(&engine->stats.lock);
174                 }
175                 do_item_unlink(engine, search);
176                 break;
177             }
178         }
179         it = slabs_alloc(engine, ntotal, id);
180         if (it == 0) {
181             engine->items.itemstats[id].outofmemory++;
182             /* Last ditch effort. There is a very rare bug which causes
183              * refcount leaks. We've fixed most of them, but it still happens,
184              * and it may happen in the future.
185              * We can reasonably assume no item can stay locked for more than
186              * three hours, so if we find one in the tail which is that old,
187              * free it anyway.
188              */
189             tries = search_items;
190             for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
191                 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
192                     engine->items.itemstats[id].tailrepairs++;
193                     search->refcount = 0;
194                     do_item_unlink(engine, search);
195                     break;
196                 }
197             }
198             it = slabs_alloc(engine, ntotal, id);
199             if (it == 0) {
200                 return NULL;
201             }
202         }
203     }
204 
205     assert(it->slabs_clsid == 0);
206 
207     it->slabs_clsid = id;
208 
209     assert(it != engine->items.heads[it->slabs_clsid]);
210 
211     it->next = it->prev = it->h_next = 0;
212     it->refcount = 1;     /* the caller will have a reference */
213     DEBUG_REFCNT(it, '*');
214     it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
215     it->nkey = nkey;
216     it->nbytes = nbytes;
217     it->flags = flags;
218     memcpy((void*)item_get_key(it), key, nkey);
219     it->exptime = exptime;
220     return it;
221 }
222 
/*
 * Return an unlinked, unreferenced item's memory to its slab class.
 * The item is marked ITEM_SLABBED and its slabs_clsid zeroed before the
 * hand-off, so later code can tell the block is already free.
 */
static void item_free(struct default_engine *engine, hash_item *it) {
    const size_t ntotal = ITEM_ntotal(engine, it);

    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[it->slabs_clsid]);
    assert(it != engine->items.tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    const unsigned int clsid = it->slabs_clsid;
    it->slabs_clsid = 0;
    it->iflag |= ITEM_SLABBED;
    DEBUG_REFCNT(it, 'F');
    slabs_free(engine, it, ntotal, clsid);
}
238 
/*
 * Push `it` onto the head (most-recently-used end) of the LRU list of its
 * slab class and bump that class's item count. If the list was empty,
 * `it` also becomes the tail.
 */
static void item_link_q(struct default_engine *engine, hash_item *it) {
    hash_item **head;
    hash_item **tail;

    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    head = engine->items.heads + it->slabs_clsid;
    tail = engine->items.tails + it->slabs_clsid;
    assert(it != *head);
    assert((*head && *tail) || (*head == 0 && *tail == 0));

    hash_item *const old_head = *head;
    it->prev = NULL;
    it->next = old_head;
    if (old_head != NULL) {
        old_head->prev = it;
    }
    *head = it;
    if (*tail == NULL) {
        *tail = it;
    }
    engine->items.sizes[it->slabs_clsid] += 1;
}
256 
/*
 * Detach `it` from the LRU list of its slab class, repairing head/tail and
 * neighbour links, and decrement that class's item count. The item's own
 * next/prev pointers are left as they were.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);

    hash_item **const headp = engine->items.heads + it->slabs_clsid;
    hash_item **const tailp = engine->items.tails + it->slabs_clsid;

    if (it == *headp) {
        assert(it->prev == 0);
        *headp = it->next;
    }
    if (it == *tailp) {
        assert(it->next == 0);
        *tailp = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    hash_item *const before = it->prev;
    hash_item *const after = it->next;
    if (after != NULL) {
        after->prev = before;
    }
    if (before != NULL) {
        before->next = after;
    }
    engine->items.sizes[it->slabs_clsid] -= 1;
}
279 
/*
 * Make a freshly allocated item visible: mark it ITEM_LINKED, stamp its
 * access time, insert it into the hash table, account for it in the global
 * stats, give it a new CAS id and put it at the head of its LRU.
 * Always returns 1.
 */
int do_item_link(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */

    it->iflag |= ITEM_LINKED;
    it->time = engine->server.core->get_current_time();

    const char *key = item_get_key(it);
    assoc_insert(engine, engine->server.core->hash(key, it->nkey, 0), it);

    const size_t footprint = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes += footprint;
    ++engine->stats.curr_items;
    ++engine->stats.total_items;
    pthread_mutex_unlock(&engine->stats.lock);

    /* Every link gets a fresh CAS id. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);
    return 1;
}
303 
/*
 * Remove a linked item from the hash table and its LRU and subtract it from
 * the global stats. If nobody holds a reference it is freed immediately;
 * otherwise the last do_item_release() frees it. Unlinked items are ignored.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    it->iflag &= ~ITEM_LINKED;

    const size_t footprint = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes -= footprint;
    --engine->stats.curr_items;
    pthread_mutex_unlock(&engine->stats.lock);

    const char *key = item_get_key(it);
    assoc_delete(engine, engine->server.core->hash(key, it->nkey, 0),
                 key, it->nkey);
    item_unlink_q(engine, it);

    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
321 
/*
 * Drop one reference to `it` (if any are held). An item that ends up with
 * no references and is no longer linked is returned to the slab allocator.
 */
void do_item_release(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
    if (it->refcount != 0) {
        --it->refcount;
        DEBUG_REFCNT(it, '-');
    }

    if (it->refcount != 0 || (it->iflag & ITEM_LINKED) != 0) {
        return;   /* still referenced or still reachable from the cache */
    }
    item_free(engine, it);
}
332 
/*
 * Move a linked item to the head of its LRU and refresh its access time,
 * but only if it was last repositioned more than ITEM_UPDATE_INTERVAL
 * seconds ago; this avoids churning the LRU for hot items.
 *
 * The age test is written as an addition (the same form the tail-repair
 * check in do_item_alloc uses). The previous form
 * `it->time < current_time - ITEM_UPDATE_INTERVAL` wraps around when
 * rel_time_t is unsigned (TODO confirm typedef) and current_time is still
 * below the interval, i.e. during the first minute after start-up, which
 * made every access reposition the item.
 */
void do_item_update(struct default_engine *engine, hash_item *it) {
    rel_time_t current_time = engine->server.core->get_current_time();
    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
    if (it->time + ITEM_UPDATE_INTERVAL < current_time) {
        assert((it->iflag & ITEM_SLABBED) == 0);

        if ((it->iflag & ITEM_LINKED) != 0) {
            item_unlink_q(engine, it);
            it->time = current_time;
            item_link_q(engine, it);
        }
    }
}
346 
/*
 * Swap `new_it` in for `it`: unlink the old item (freeing it if it is
 * unreferenced) and link the new one. Returns do_item_link()'s result.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    int linked;

    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);
    linked = do_item_link(engine, new_it);
    return linked;
}
356 
/*@null@*/
/*
 * Intended to render a text listing ("ITEM <key> [<n> b; <t> s]" lines
 * followed by "END\r\n") of up to `limit` items (0 = no limit) from the LRU
 * of slab class `slabs_clsid`, returning a malloc'd buffer and its length
 * in *bytes.
 *
 * Unless FUTURE is defined this is a stub: it ignores its arguments, leaves
 * *bytes untouched and always returns NULL.
 *
 * NOTE(review): the FUTURE branch refers to `engine` and `process_started`,
 * neither of which is a parameter of this function; unless they exist at
 * file scope elsewhere, defining FUTURE would not compile. It also formats
 * nbytes with %d and copies the key with strncpy (terminated manually).
 */
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer;
    unsigned int bufcurr;
    hash_item *it;
    unsigned int len;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == 0) return NULL;
    bufcurr = 0;


    while (it != NULL && (limit == 0 || shown < limit)) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes,
                       (unsigned long)it->exptime + process_started);
        if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
            break;
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }


    /* Copy the terminator including its NUL, but report only 5 bytes. */
    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#endif
    /* Stub build: silence unused-parameter warnings. */
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
}
406 
/*
 * Report per-slab-class LRU statistics through add_statistics() (prefix
 * "items") for every class whose LRU is non-empty: item count, age of the
 * oldest (tail) item in seconds, and the eviction / out-of-memory /
 * tail-repair / reclaim counters.
 *
 * Before reporting, up to search_items items at each LRU tail that have
 * been invalidated by a flush (oldest_live) or whose exptime has passed are
 * unlinked, so the reported tail is a live item. The purge stops at the
 * first such item that is still referenced.
 *
 * "age" is current_time minus the tail item's last access time; previously
 * the raw rel_time_t timestamp was reported, which is not an age.
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c) {
    int i;
    rel_time_t current_time = engine->server.core->get_current_time();
    for (i = 0; i < POWER_LARGEST; i++) {
        if (engine->items.tails[i] == NULL) {
            continue;
        }

        int search = search_items;
        while (search > 0 &&
               engine->items.tails[i] != NULL &&
               ((engine->config.oldest_live != 0 && /* item was flushed */
                 engine->config.oldest_live <= current_time &&
                 engine->items.tails[i]->time <= engine->config.oldest_live) ||
                (engine->items.tails[i]->exptime != 0 && /* or has expired */
                 engine->items.tails[i]->exptime < current_time))) {
            --search;
            if (engine->items.tails[i]->refcount == 0) {
                do_item_unlink(engine, engine->items.tails[i]);
            } else {
                break;
            }
        }
        if (engine->items.tails[i] == NULL) {
            /* We removed all of the items in this slab class */
            continue;
        }

        const char *prefix = "items";
        add_statistics(c, add_stats, prefix, i, "number", "%u",
                       engine->items.sizes[i]);
        add_statistics(c, add_stats, prefix, i, "age", "%u",
                       current_time - engine->items.tails[i]->time);
        add_statistics(c, add_stats, prefix, i, "evicted",
                       "%u", engine->items.itemstats[i].evicted);
        add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
                       "%u", engine->items.itemstats[i].evicted_nonzero);
        add_statistics(c, add_stats, prefix, i, "evicted_time",
                       "%u", engine->items.itemstats[i].evicted_time);
        add_statistics(c, add_stats, prefix, i, "outofmemory",
                       "%u", engine->items.itemstats[i].outofmemory);
        add_statistics(c, add_stats, prefix, i, "tailrepairs",
                       "%u", engine->items.itemstats[i].tailrepairs);
        add_statistics(c, add_stats, prefix, i, "reclaimed",
                       "%u", engine->items.itemstats[i].reclaimed);
    }
}
453 
454 /** dumps out a list of objects of each size, with granularity of 32 bytes */
455 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)456 static void do_item_stats_sizes(struct default_engine *engine,
457                                 ADD_STAT add_stats, const void *c) {
458 
459     /* max 1MB object, divided into 32 bytes size buckets */
460     const int num_buckets = 32768;
461     unsigned int *histogram = calloc(num_buckets, sizeof(int));
462 
463     if (histogram != NULL) {
464         int i;
465 
466         /* build the histogram */
467         for (i = 0; i < POWER_LARGEST; i++) {
468             hash_item *iter = engine->items.heads[i];
469             while (iter) {
470                 int ntotal = ITEM_ntotal(engine, iter);
471                 int bucket = ntotal / 32;
472                 if ((ntotal % 32) != 0) bucket++;
473                 if (bucket < num_buckets) histogram[bucket]++;
474                 iter = iter->next;
475             }
476         }
477 
478         /* write the buffer */
479         for (i = 0; i < num_buckets; i++) {
480             if (histogram[i] != 0) {
481                 char key[8], val[32];
482                 int klen, vlen;
483                 klen = snprintf(key, sizeof(key), "%d", i * 32);
484                 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
485                 assert(klen < sizeof(key));
486                 assert(vlen < sizeof(val));
487                 add_stats(key, klen, val, vlen, c);
488             }
489         }
490         free(histogram);
491     }
492 }
493 
494 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)495 hash_item *do_item_get(struct default_engine *engine,
496                        const char *key, const size_t nkey) {
497     rel_time_t current_time = engine->server.core->get_current_time();
498     hash_item *it = assoc_find(engine, engine->server.core->hash(key,
499                                                                       nkey, 0),
500                                key, nkey);
501     int was_found = 0;
502 
503     if (engine->config.verbose > 2) {
504         EXTENSION_LOGGER_DESCRIPTOR *logger;
505         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
506         if (it == NULL) {
507             logger->log(EXTENSION_LOG_DEBUG, NULL,
508                         "> NOT FOUND %s", key);
509         } else {
510             logger->log(EXTENSION_LOG_DEBUG, NULL,
511                         "> FOUND KEY %s",
512                         (const char*)item_get_key(it));
513             was_found++;
514         }
515     }
516 
517     if (it != NULL && engine->config.oldest_live != 0 &&
518         engine->config.oldest_live <= current_time &&
519         it->time <= engine->config.oldest_live) {
520         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
521         it = NULL;
522     }
523 
524     if (it == NULL && was_found) {
525         EXTENSION_LOGGER_DESCRIPTOR *logger;
526         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
527         logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
528         was_found--;
529     }
530 
531     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
532         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
533         it = NULL;
534     }
535 
536     if (it == NULL && was_found) {
537         EXTENSION_LOGGER_DESCRIPTOR *logger;
538         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
539         logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
540         was_found--;
541     }
542 
543     if (it != NULL) {
544         it->refcount++;
545         DEBUG_REFCNT(it, '+');
546         do_item_update(engine, it);
547     }
548 
549     return it;
550 }
551 
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. In threaded mode, this is protected by the cache lock.
 *
 * it         the new item supplied by the caller; this function never
 *            releases the caller's reference to it. For append/prepend a
 *            freshly allocated combined item is linked instead of `it`.
 * cas        out: set to the CAS of the stored item on ENGINE_SUCCESS.
 * operation  add / set / replace / append / prepend / cas.
 * cookie     passed through to do_item_alloc() (eviction callback).
 *
 * Returns ENGINE_SUCCESS when stored, ENGINE_NOT_STORED when the operation's
 * precondition failed (add on an existing key, replace/append/prepend on a
 * missing key) or allocation for append/prepend failed, ENGINE_KEY_ENOENT
 * for a CAS on a missing key and ENGINE_KEY_EEXISTS on a CAS mismatch.
 */
static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
                                       hash_item *it, uint64_t *cas,
                                       ENGINE_STORE_OPERATION operation,
                                       const void *cookie) {
    const char *key = item_get_key(it);
    /* old_it (if any) carries an extra reference that is released below. */
    hash_item *old_it = do_item_get(engine, key, it->nkey);
    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;

    /* Combined item allocated for append/prepend; released at the end. */
    hash_item *new_it = NULL;

    if (old_it != NULL && operation == OPERATION_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
        do_item_update(engine, old_it);
    } else if (!old_it && (operation == OPERATION_REPLACE
        || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (operation == OPERATION_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            // LRU expired
            stored = ENGINE_KEY_ENOENT;
        }
        else if (item_get_cas(it) == item_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            do_item_replace(engine, old_it, it);
            stored = ENGINE_SUCCESS;
        } else {
            if (engine->config.verbose > 1) {
                EXTENSION_LOGGER_DESCRIPTOR *logger;
                logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
                logger->log(EXTENSION_LOG_INFO, NULL,
                        "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
                        item_get_cas(old_it),
                        item_get_cas(it));
            }
            stored = ENGINE_KEY_EEXISTS;
        }
    } else {
        /*
         * Reached for set, add of a missing key, replace of an existing key,
         * and append/prepend of an existing key (old_it != NULL here).
         *
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */
        if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
            /*
             * Validate CAS
             */
            if (item_get_cas(it) != 0) {
                // CAS much be equal
                if (item_get_cas(it) != item_get_cas(old_it)) {
                    stored = ENGINE_KEY_EEXISTS;
                }
            }

            if (stored == ENGINE_NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                new_it = do_item_alloc(engine, key, it->nkey,
                                       old_it->flags,
                                       old_it->exptime,
                                       it->nbytes + old_it->nbytes,
                                       cookie);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    /* NOTE(review): old_it is always non-NULL on this path,
                     * and the caller gets ENGINE_NOT_STORED rather than an
                     * out-of-memory code despite the comment above. */
                    if (old_it != NULL) {
                        do_item_release(engine, old_it);
                    }

                    return ENGINE_NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                if (operation == OPERATION_APPEND) {
                    memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
                    memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
                } else {
                    /* OPERATION_PREPEND */
                    memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
                    memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
                }

                /* From here on the combined item is what gets stored. */
                it = new_it;
            }
        }

        if (stored == ENGINE_NOT_STORED) {
            if (old_it != NULL) {
                do_item_replace(engine, old_it, it);
            } else {
                do_item_link(engine, it);
            }

            *cas = item_get_cas(it);
            stored = ENGINE_SUCCESS;
        }
    }

    if (old_it != NULL) {
        do_item_release(engine, old_it);         /* release our reference */
    }

    /* Drops the allocation reference; a linked new_it stays in the cache. */
    if (new_it != NULL) {
        do_item_release(engine, new_it);
    }

    if (stored == ENGINE_SUCCESS) {
        *cas = item_get_cas(it);
    }

    return stored;
}
672 
673 
674 /*
675  * adds a delta value to a numeric item.
676  *
677  * c     connection requesting the operation
678  * it    item to adjust
679  * incr  true to increment value, false to decrement
680  * delta amount to adjust value by
681  * buf   buffer for response string
682  *
683  * returns a response string to send back to the client.
684  */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)685 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
686                                       hash_item *it, const bool incr,
687                                       const int64_t delta, uint64_t *rcas,
688                                       uint64_t *result, const void *cookie) {
689     const char *ptr;
690     uint64_t value;
691     char buf[80];
692     int res;
693 
694     if (it->nbytes >= (sizeof(buf) - 1)) {
695         return ENGINE_EINVAL;
696     }
697 
698     ptr = item_get_data(it);
699     memcpy(buf, ptr, it->nbytes);
700     buf[it->nbytes] = '\0';
701 
702     if (!safe_strtoull(buf, &value)) {
703         return ENGINE_EINVAL;
704     }
705 
706     if (incr) {
707         value += delta;
708     } else {
709         if(delta > value) {
710             value = 0;
711         } else {
712             value -= delta;
713         }
714     }
715 
716     *result = value;
717     if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
718         return ENGINE_EINVAL;
719     }
720 
721     if (it->refcount == 1 && res <= it->nbytes) {
722         // we can do inline replacement
723         memcpy(item_get_data(it), buf, res);
724         memset(item_get_data(it) + res, ' ', it->nbytes - res);
725         item_set_cas(NULL, NULL, it, get_cas_id());
726         *rcas = item_get_cas(it);
727     } else {
728         hash_item *new_it = do_item_alloc(engine, item_get_key(it),
729                                           it->nkey, it->flags,
730                                           it->exptime, res,
731                                           cookie);
732         if (new_it == NULL) {
733             do_item_unlink(engine, it);
734             return ENGINE_ENOMEM;
735         }
736         memcpy(item_get_data(new_it), buf, res);
737         do_item_replace(engine, it, new_it);
738         *rcas = item_get_cas(new_it);
739         do_item_release(engine, new_it);       /* release our reference */
740     }
741 
742     return ENGINE_SUCCESS;
743 }
744 
745 /********************************* ITEM ACCESS *******************************/
746 
747 /*
748  * Allocates a new item.
749  */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)750 hash_item *item_alloc(struct default_engine *engine,
751                       const void *key, size_t nkey, int flags,
752                       rel_time_t exptime, int nbytes, const void *cookie) {
753     hash_item *it;
754     pthread_mutex_lock(&engine->cache_lock);
755     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
756     pthread_mutex_unlock(&engine->cache_lock);
757     return it;
758 }
759 
760 /*
761  * Returns an item if it hasn't been marked as expired,
762  * lazy-expiring as needed.
763  */
item_get(struct default_engine * engine,const void * key,const size_t nkey)764 hash_item *item_get(struct default_engine *engine,
765                     const void *key, const size_t nkey) {
766     hash_item *it;
767     pthread_mutex_lock(&engine->cache_lock);
768     it = do_item_get(engine, key, nkey);
769     pthread_mutex_unlock(&engine->cache_lock);
770     return it;
771 }
772 
773 /*
774  * Decrements the reference count on an item and adds it to the freelist if
775  * needed.
776  */
/*
 * Drops one reference to `item` under the cache lock; the actual
 * reference-count bookkeeping lives in do_item_release().
 */
void item_release(struct default_engine *engine, hash_item *item) {
    pthread_mutex_t *const lock = &engine->cache_lock;
    pthread_mutex_lock(lock);
    do_item_release(engine, item);
    pthread_mutex_unlock(lock);
}
782 
783 /*
784  * Unlinks an item from the LRU and hashtable.
785  */
/*
 * Removes `item` from the LRU and hash table by calling do_item_unlink()
 * with the cache lock held.
 */
void item_unlink(struct default_engine *engine, hash_item *item) {
    pthread_mutex_t *const lock = &engine->cache_lock;
    pthread_mutex_lock(lock);
    do_item_unlink(engine, item);
    pthread_mutex_unlock(lock);
}
791 
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)792 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
793                                        const void* cookie,
794                                        const void* key,
795                                        const int nkey,
796                                        const bool increment,
797                                        const bool create,
798                                        const uint64_t delta,
799                                        const uint64_t initial,
800                                        const rel_time_t exptime,
801                                        uint64_t *cas,
802                                        uint64_t *result)
803 {
804    hash_item *item = do_item_get(engine, key, nkey);
805    ENGINE_ERROR_CODE ret;
806 
807    if (item == NULL) {
808       if (!create) {
809          return ENGINE_KEY_ENOENT;
810       } else {
811          char buffer[128];
812          int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
813                             (uint64_t)initial);
814 
815          item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
816          if (item == NULL) {
817             return ENGINE_ENOMEM;
818          }
819          memcpy((void*)item_get_data(item), buffer, len);
820          if ((ret = do_store_item(engine, item, cas,
821                                   OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
822              *result = initial;
823              *cas = item_get_cas(item);
824          }
825          do_item_release(engine, item);
826       }
827    } else {
828       ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
829       do_item_release(engine, item);
830    }
831 
832    return ret;
833 }
834 
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)835 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
836                              const void* cookie,
837                              const void* key,
838                              const int nkey,
839                              const bool increment,
840                              const bool create,
841                              const uint64_t delta,
842                              const uint64_t initial,
843                              const rel_time_t exptime,
844                              uint64_t *cas,
845                              uint64_t *result)
846 {
847     ENGINE_ERROR_CODE ret;
848 
849     pthread_mutex_lock(&engine->cache_lock);
850     ret = do_arithmetic(engine, cookie, key, nkey, increment,
851                         create, delta, initial, exptime, cas,
852                         result);
853     pthread_mutex_unlock(&engine->cache_lock);
854     return ret;
855 }
856 
857 /*
858  * Stores an item in the cache (high level, obeys set/add/replace semantics)
859  */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)860 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
861                              hash_item *item, uint64_t *cas,
862                              ENGINE_STORE_OPERATION operation,
863                              const void *cookie) {
864     ENGINE_ERROR_CODE ret;
865 
866     pthread_mutex_lock(&engine->cache_lock);
867     ret = do_store_item(engine, item, cas, operation, cookie);
868     pthread_mutex_unlock(&engine->cache_lock);
869     return ret;
870 }
871 
do_touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)872 static hash_item *do_touch_item(struct default_engine *engine,
873                                      const void *key,
874                                      uint16_t nkey,
875                                      uint32_t exptime)
876 {
877    hash_item *item = do_item_get(engine, key, nkey);
878    if (item != NULL) {
879        item->exptime = exptime;
880    }
881    return item;
882 }
883 
touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)884 hash_item *touch_item(struct default_engine *engine,
885                            const void *key,
886                            uint16_t nkey,
887                            uint32_t exptime)
888 {
889     hash_item *ret;
890 
891     pthread_mutex_lock(&engine->cache_lock);
892     ret = do_touch_item(engine, key, nkey, exptime);
893     pthread_mutex_unlock(&engine->cache_lock);
894     return ret;
895 }
896 
897 /*
898  * Flushes expired items after a flush_all call
899  */
item_flush_expired(struct default_engine * engine,time_t when)900 void item_flush_expired(struct default_engine *engine, time_t when) {
901     int i;
902     hash_item *iter, *next;
903 
904     pthread_mutex_lock(&engine->cache_lock);
905 
906     if (when == 0) {
907         engine->config.oldest_live = engine->server.core->get_current_time() - 1;
908     } else {
909         engine->config.oldest_live = engine->server.core->realtime(when) - 1;
910     }
911 
912     if (engine->config.oldest_live != 0) {
913         for (i = 0; i < POWER_LARGEST; i++) {
914             /*
915              * The LRU is sorted in decreasing time order, and an item's
916              * timestamp is never newer than its last access time, so we
917              * only need to walk back until we hit an item older than the
918              * oldest_live time.
919              * The oldest_live checking will auto-expire the remaining items.
920              */
921             for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
922                 if (iter->time >= engine->config.oldest_live) {
923                     next = iter->next;
924                     if ((iter->iflag & ITEM_SLABBED) == 0) {
925                         do_item_unlink(engine, iter);
926                     }
927                 } else {
928                     /* We've hit the first old item. Continue to the next queue. */
929                     break;
930                 }
931             }
932         }
933     }
934     pthread_mutex_unlock(&engine->cache_lock);
935 }
936 
937 /*
938  * Dumps part of the cache
939  */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)940 char *item_cachedump(struct default_engine *engine,
941                      unsigned int slabs_clsid,
942                      unsigned int limit,
943                      unsigned int *bytes) {
944     char *ret;
945 
946     pthread_mutex_lock(&engine->cache_lock);
947     ret = do_item_cachedump(slabs_clsid, limit, bytes);
948     pthread_mutex_unlock(&engine->cache_lock);
949     return ret;
950 }
951 
/*
 * Emits per-slab-class item statistics through `add_stat` by calling
 * do_item_stats() with the cache lock held.
 */
void item_stats(struct default_engine *engine,
                   ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;
    pthread_mutex_lock(lock);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
959 
960 
/*
 * Emits item size statistics through `add_stat` by calling
 * do_item_stats_sizes() with the cache lock held.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;
    pthread_mutex_lock(lock);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
968 
/*
 * Appends the walker sentinel `cursor` at the tail of LRU queue `ii` and
 * counts it in items.sizes[ii]. Caller must hold engine->cache_lock and
 * must have checked that the queue is non-empty: the current tail is
 * dereferenced unconditionally.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *const old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->prev = old_tail;
    cursor->next = NULL;
    old_tail->next = cursor;

    engine->items.tails[ii] = cursor;
    engine->items.sizes[ii]++;
}
979 
980 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
981                                       hash_item *item, void *cookie);
982 
/*
 * Moves the walker sentinel `cursor` toward the head of its LRU queue. It
 * steps over up to `steplength` real items and calls `itemfunc` on each.
 * Other cursors met on the way (nkey == 0 && nbytes == 0) are skipped
 * without counting against `steplength`. Caller must hold
 * engine->cache_lock.
 *
 * On each step the cursor is unlinked with item_unlink_q() and spliced back
 * in just in front of the item it passed. That item is `ptr`; from this
 * point it sits behind the cursor, so itemfunc may safely unlink it.
 *
 * do_item_link_cursor() counts the cursor once in items.sizes[]. Unlinking
 * it through item_unlink_q() is assumed to undo that count. Previously the
 * hand-made re-splice never re-counted the cursor, so sizes[clsid] leaked
 * one per step. It is now re-counted whenever the cursor rejoins the queue.
 *
 * Returns true if the cursor is still linked and may have more items ahead.
 * Returns false when:
 *   - the head was passed; the cursor is then no longer in the queue, with
 *     prev/next cleared;
 *   - itemfunc failed; *error holds its code and the cursor stays linked.
 * *error is ENGINE_SUCCESS otherwise.
 */
static bool do_item_walk_cursor(struct default_engine *engine,
                                hash_item *cursor,
                                int steplength,
                                ITERFUNC itemfunc,
                                void* itemdata,
                                ENGINE_ERROR_CODE *error)
{
    int ii = 0;
    *error = ENGINE_SUCCESS;

    while (cursor->prev != NULL && ii < steplength) {
        ++ii;
        /* Move cursor */
        hash_item *ptr = cursor->prev;
        item_unlink_q(engine, cursor);

        bool done = false;
        if (ptr == engine->items.heads[cursor->slabs_clsid]) {
            /* Passed the head: the cursor leaves the queue for good. */
            done = true;
            cursor->prev = NULL;
            cursor->next = NULL;
        } else {
            cursor->next = ptr;
            cursor->prev = ptr->prev;
            cursor->prev->next = cursor;
            ptr->prev = cursor;
            /* The cursor is back in the queue; restore its slot in sizes[]. */
            engine->items.sizes[cursor->slabs_clsid]++;
        }

        /* Ignore cursors */
        if (ptr->nkey == 0 && ptr->nbytes == 0) {
            --ii;
        } else {
            *error = itemfunc(engine, ptr, itemdata);
            if (*error != ENGINE_SUCCESS) {
                return false;
            }
        }

        if (done) {
            return false;
        }
    }

    return (cursor->prev != NULL);
}
1027 
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1028 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1029                                     hash_item *item,
1030                                     void *cookie) {
1031     (void)cookie;
1032     engine->scrubber.visited++;
1033     rel_time_t current_time = engine->server.core->get_current_time();
1034     if (item->refcount == 0 &&
1035         (item->exptime != 0 && item->exptime < current_time)) {
1036         do_item_unlink(engine, item);
1037         engine->scrubber.cleaned++;
1038     }
1039     return ENGINE_SUCCESS;
1040 }
1041 
/*
 * Drives the scrubber cursor through one slab class. The walk proceeds in
 * batches of 200 steps, retaking the cache lock for each batch so other
 * threads can run in between. It stops when the walk reports no more items
 * or the callback fails.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor) {
    for (;;) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        const bool more = do_item_walk_cursor(engine, cursor, 200,
                                              item_scrub, NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !more) {
            return;
        }
    }
}
1056 
item_scubber_main(void * arg)1057 static void *item_scubber_main(void *arg)
1058 {
1059     struct default_engine *engine = arg;
1060     hash_item cursor = { .refcount = 1 };
1061 
1062     for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1063         pthread_mutex_lock(&engine->cache_lock);
1064         bool skip = false;
1065         if (engine->items.heads[ii] == NULL) {
1066             skip = true;
1067         } else {
1068             // add the item at the tail
1069             do_item_link_cursor(engine, &cursor, ii);
1070         }
1071         pthread_mutex_unlock(&engine->cache_lock);
1072 
1073         if (!skip) {
1074             item_scrub_class(engine, &cursor);
1075         }
1076     }
1077 
1078     pthread_mutex_lock(&engine->scrubber.lock);
1079     engine->scrubber.stopped = time(NULL);
1080     engine->scrubber.running = false;
1081     pthread_mutex_unlock(&engine->scrubber.lock);
1082 
1083     return NULL;
1084 }
1085 
item_start_scrub(struct default_engine * engine)1086 bool item_start_scrub(struct default_engine *engine)
1087 {
1088     bool ret = false;
1089     pthread_mutex_lock(&engine->scrubber.lock);
1090     if (!engine->scrubber.running) {
1091         engine->scrubber.started = time(NULL);
1092         engine->scrubber.stopped = 0;
1093         engine->scrubber.visited = 0;
1094         engine->scrubber.cleaned = 0;
1095         engine->scrubber.running = true;
1096 
1097         pthread_t t;
1098         pthread_attr_t attr;
1099 
1100         if (pthread_attr_init(&attr) != 0 ||
1101             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1102             pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1103         {
1104             engine->scrubber.running = false;
1105         } else {
1106             ret = true;
1107         }
1108     }
1109     pthread_mutex_unlock(&engine->scrubber.lock);
1110 
1111     return ret;
1112 }
1113 
/*
 * Per-connection state for a TAP stream that walks the item LRU queues.
 * Allocated by initialize_item_tap_walker() and attached to the connection
 * with store_engine_specific().
 */
struct tap_client {
    /* Sentinel linked into an LRU queue to mark the walk position. */
    hash_item cursor;
    /* Item most recently handed out by the walker (its refcount was bumped
     * in item_tap_iterfunc()), or NULL before one is found. */
    hash_item *it;
};
1118 
item_tap_iterfunc(struct default_engine * engine,hash_item * item,void * cookie)1119 static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1120                                     hash_item *item,
1121                                     void *cookie) {
1122     struct tap_client *client = cookie;
1123     client->it = item;
1124     ++client->it->refcount;
1125     return ENGINE_SUCCESS;
1126 }
1127 
do_item_tap_walker(struct default_engine * engine,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1128 static tap_event_t do_item_tap_walker(struct default_engine *engine,
1129                                          const void *cookie, item **itm,
1130                                          void **es, uint16_t *nes, uint8_t *ttl,
1131                                          uint16_t *flags, uint32_t *seqno,
1132                                          uint16_t *vbucket)
1133 {
1134     struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1135     if (client == NULL) {
1136         return TAP_DISCONNECT;
1137     }
1138 
1139     *es = NULL;
1140     *nes = 0;
1141     *ttl = (uint8_t)-1;
1142     *seqno = 0;
1143     *flags = 0;
1144     *vbucket = 0;
1145     client->it = NULL;
1146 
1147     ENGINE_ERROR_CODE r;
1148     do {
1149         if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1150             // find next slab class to look at..
1151             bool linked = false;
1152             for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
1153                 if (engine->items.heads[ii] != NULL) {
1154                     // add the item at the tail
1155                     do_item_link_cursor(engine, &client->cursor, ii);
1156                     linked = true;
1157                 }
1158             }
1159             if (!linked) {
1160                 break;
1161             }
1162         }
1163     } while (client->it == NULL);
1164     *itm = client->it;
1165 
1166     return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1167 }
1168 
item_tap_walker(ENGINE_HANDLE * handle,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1169 tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1170                             const void *cookie, item **itm,
1171                             void **es, uint16_t *nes, uint8_t *ttl,
1172                             uint16_t *flags, uint32_t *seqno,
1173                             uint16_t *vbucket)
1174 {
1175     tap_event_t ret;
1176     struct default_engine *engine = (struct default_engine*)handle;
1177     pthread_mutex_lock(&engine->cache_lock);
1178     ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1179     pthread_mutex_unlock(&engine->cache_lock);
1180 
1181     return ret;
1182 }
1183 
initialize_item_tap_walker(struct default_engine * engine,const void * cookie)1184 bool initialize_item_tap_walker(struct default_engine *engine,
1185                                 const void* cookie)
1186 {
1187     struct tap_client *client = calloc(1, sizeof(*client));
1188     if (client == NULL) {
1189         return false;
1190     }
1191     client->cursor.refcount = 1;
1192 
1193     /* Link the cursor! */
1194     bool linked = false;
1195     for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1196         pthread_mutex_lock(&engine->cache_lock);
1197         if (engine->items.heads[ii] != NULL) {
1198             // add the item at the tail
1199             do_item_link_cursor(engine, &client->cursor, ii);
1200             linked = true;
1201         }
1202         pthread_mutex_unlock(&engine->cache_lock);
1203     }
1204 
1205     engine->server.cookie->store_engine_specific(cookie, client);
1206     return true;
1207 }
1208