1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11 
12 #include "default_engine.h"
13 
/*
 * Forward Declarations
 *
 * Prototypes for helpers that are defined further down in this file, so
 * they can be called before their definitions appear.
 */
/* LRU list maintenance for a single slab class. */
static void item_link_q(struct default_engine *engine, hash_item *it);
static void item_unlink_q(struct default_engine *engine, hash_item *it);
/* Item allocation, lookup and link/unlink helpers. */
static hash_item *do_item_alloc(struct default_engine *engine,
                                const void *key, const size_t nkey,
                                const int flags, const rel_time_t exptime,
                                const int nbytes,
                                const void *cookie);
static hash_item *do_item_get(struct default_engine *engine,
                              const char *key, const size_t nkey);
static int do_item_link(struct default_engine *engine, hash_item *it);
static void do_item_unlink(struct default_engine *engine, hash_item *it);
static void do_item_release(struct default_engine *engine, hash_item *it);
static void do_item_update(struct default_engine *engine, hash_item *it);
static int do_item_replace(struct default_engine *engine,
                            hash_item *it, hash_item *new_it);
/* Hands an unlinked, unreferenced item back to the slab allocator. */
static void item_free(struct default_engine *engine, hash_item *it);
31 
/*
 * We only reposition items in the LRU queue if they haven't been repositioned
 * in this many seconds. That saves us from churning on frequently-accessed
 * items. The value is compared against item timestamps in do_item_update().
 */
#define ITEM_UPDATE_INTERVAL 60
/*
 * To avoid scanning through the complete cache in some circumstances we'll
 * just give up and return an error after inspecting a fixed number of objects.
 * This is the per-walk budget for the LRU tail scans in do_item_alloc() and
 * do_item_stats().
 */
static const int search_items = 50;
43 
item_stats_reset(struct default_engine * engine)44 void item_stats_reset(struct default_engine *engine) {
45     pthread_mutex_lock(&engine->cache_lock);
46     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47     pthread_mutex_unlock(&engine->cache_lock);
48 }
49 
50 
51 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)52 static inline size_t ITEM_ntotal(struct default_engine *engine,
53                                  const hash_item *item) {
54     size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55     if (engine->config.use_cas) {
56         ret += sizeof(uint64_t);
57     }
58 
59     return ret;
60 }
61 
/*
 * Hand out the next CAS id for a new item.
 *
 * Ids come from a function-local static counter that starts at zero and is
 * incremented before being returned, so the first id handed out is 1. The
 * counter is updated without any locking of its own.
 */
static uint64_t get_cas_id(void) {
    static uint64_t last_cas_id;

    last_cas_id += 1;
    return last_cas_id;
}
67 
/*
 * Reference-count debugging.
 *
 * Change "#if 0" to "#if 1" to trace every reference-count operation to
 * stderr: the item's address, the one-character operation tag supplied by the
 * call site, the current refcount and the LINKED / SLABBED flag bits.
 *
 * When disabled the macro expands to an empty loop, so a call site written as
 * "DEBUG_REFCNT(it, '+');" compiles to nothing.
 *
 * The trace format carries exactly one conversion per argument, prints the
 * pointer with %p, and reads the flags from `iflag`, the field used by the
 * rest of this file.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) while(0)
#endif
78 
79 
80 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)81 hash_item *do_item_alloc(struct default_engine *engine,
82                          const void *key,
83                          const size_t nkey,
84                          const int flags,
85                          const rel_time_t exptime,
86                          const int nbytes,
87                          const void *cookie) {
88     hash_item *it = NULL;
89     size_t ntotal = sizeof(hash_item) + nkey + nbytes;
90     if (engine->config.use_cas) {
91         ntotal += sizeof(uint64_t);
92     }
93 
94     unsigned int id = slabs_clsid(engine, ntotal);
95     if (id == 0)
96         return 0;
97 
98     /* do a quick check if we have any expired items in the tail.. */
99     int tries = search_items;
100     hash_item *search;
101     rel_time_t oldest_live = engine->config.oldest_live;
102     rel_time_t current_time = engine->server.core->get_current_time();
103 
104     for (search = engine->items.tails[id];
105          tries > 0 && search != NULL;
106          tries--, search=search->prev) {
107         if (search->refcount == 0 &&
108             ((search->time < oldest_live) || //dead by flush
109              (search->exptime != 0 && search->exptime < current_time))) {
110             it = search;
111             /* I don't want to actually free the object, just steal
112              * the item to avoid to grab the slab mutex twice ;-)
113              */
114             pthread_mutex_lock(&engine->stats.lock);
115             engine->stats.reclaimed++;
116             pthread_mutex_unlock(&engine->stats.lock);
117             engine->items.itemstats[id].reclaimed++;
118             it->refcount = 1;
119             slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
120             do_item_unlink(engine, it);
121             /* Initialize the item block: */
122             it->slabs_clsid = 0;
123             it->refcount = 0;
124             break;
125         }
126     }
127 
128     if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
129         /*
130         ** Could not find an expired item at the tail, and memory allocation
131         ** failed. Try to evict some items!
132         */
133         tries = search_items;
134 
135         /* If requested to not push old items out of cache when memory runs out,
136          * we're out of luck at this point...
137          */
138 
139         if (engine->config.evict_to_free == 0) {
140             engine->items.itemstats[id].outofmemory++;
141             return NULL;
142         }
143 
144         /*
145          * try to get one off the right LRU
146          * don't necessariuly unlink the tail because it may be locked: refcount>0
147          * search up from tail an item with refcount==0 and unlink it; give up after search_items
148          * tries
149          */
150 
151         if (engine->items.tails[id] == 0) {
152             engine->items.itemstats[id].outofmemory++;
153             return NULL;
154         }
155 
156         for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
157             if (search->refcount == 0) {
158                 if (search->exptime == 0 || search->exptime > current_time) {
159                     engine->items.itemstats[id].evicted++;
160                     engine->items.itemstats[id].evicted_time = current_time - search->time;
161                     if (search->exptime != 0) {
162                         engine->items.itemstats[id].evicted_nonzero++;
163                     }
164                     pthread_mutex_lock(&engine->stats.lock);
165                     engine->stats.evictions++;
166                     pthread_mutex_unlock(&engine->stats.lock);
167                     engine->server.stat->evicting(cookie,
168                                                   item_get_key(search),
169                                                   search->nkey);
170                 } else {
171                     engine->items.itemstats[id].reclaimed++;
172                     pthread_mutex_lock(&engine->stats.lock);
173                     engine->stats.reclaimed++;
174                     pthread_mutex_unlock(&engine->stats.lock);
175                 }
176                 do_item_unlink(engine, search);
177                 break;
178             }
179         }
180         it = slabs_alloc(engine, ntotal, id);
181         if (it == 0) {
182             engine->items.itemstats[id].outofmemory++;
183             /* Last ditch effort. There is a very rare bug which causes
184              * refcount leaks. We've fixed most of them, but it still happens,
185              * and it may happen in the future.
186              * We can reasonably assume no item can stay locked for more than
187              * three hours, so if we find one in the tail which is that old,
188              * free it anyway.
189              */
190             tries = search_items;
191             for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
192                 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
193                     engine->items.itemstats[id].tailrepairs++;
194                     search->refcount = 0;
195                     do_item_unlink(engine, search);
196                     break;
197                 }
198             }
199             it = slabs_alloc(engine, ntotal, id);
200             if (it == 0) {
201                 return NULL;
202             }
203         }
204     }
205 
206     assert(it->slabs_clsid == 0);
207 
208     it->slabs_clsid = id;
209 
210     assert(it != engine->items.heads[it->slabs_clsid]);
211 
212     it->next = it->prev = it->h_next = 0;
213     it->refcount = 1;     /* the caller will have a reference */
214     DEBUG_REFCNT(it, '*');
215     it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
216     it->nkey = nkey;
217     it->nbytes = nbytes;
218     it->flags = flags;
219     memcpy((void*)item_get_key(it), key, nkey);
220     it->exptime = exptime;
221     return it;
222 }
223 
/*
 * Hand an item's memory back to the slab allocator.
 *
 * The item must be unlinked, unreferenced and neither the head nor the tail
 * of its class's LRU list (all asserted). Its slabs_clsid is cleared and
 * ITEM_SLABBED is set before slabs_free() is called with the item's total
 * size and its former class id.
 */
static void item_free(struct default_engine *engine, hash_item *it) {
    const size_t item_size = ITEM_ntotal(engine, it);

    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[it->slabs_clsid]);
    assert(it != engine->items.tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    /* remember the class before wiping it from the item */
    const unsigned int owner_class = it->slabs_clsid;

    /* so slab size changer can tell later if item is already free or not */
    it->slabs_clsid = 0;
    it->iflag |= ITEM_SLABBED;
    DEBUG_REFCNT(it, 'F');
    slabs_free(engine, it, item_size, owner_class);
}
239 
/*
 * Push an item onto the head of the LRU list of its slab class and increment
 * that class's item count.
 *
 * The item must not be marked ITEM_SLABBED and must not already be the head
 * (both asserted). If the list was empty the item becomes the tail as well.
 */
static void item_link_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    hash_item **headp = &engine->items.heads[it->slabs_clsid];
    hash_item **tailp = &engine->items.tails[it->slabs_clsid];
    hash_item *old_head = *headp;

    assert(it != old_head);
    /* head and tail are either both set or both empty */
    assert((old_head && *tailp) || (old_head == 0 && *tailp == 0));

    it->prev = NULL;
    it->next = old_head;
    if (old_head != NULL) {
        old_head->prev = it;
    }
    *headp = it;
    if (*tailp == NULL) {
        *tailp = it;
    }
    engine->items.sizes[it->slabs_clsid]++;
}
257 
/*
 * Remove an item from the LRU list of its slab class and decrement that
 * class's item count.
 *
 * Head and tail pointers are moved off the item when it occupies either end,
 * then its neighbours are spliced together. The item's own next/prev fields
 * are left as they were.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);

    hash_item **headp = &engine->items.heads[it->slabs_clsid];
    hash_item **tailp = &engine->items.tails[it->slabs_clsid];

    if (it == *headp) {
        assert(it->prev == 0);
        *headp = it->next;
    }
    if (it == *tailp) {
        assert(it->next == 0);
        *tailp = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    /* splice the neighbours around the departing item */
    hash_item *after = it->next;
    hash_item *before = it->prev;
    if (after != NULL) {
        after->prev = before;
    }
    if (before != NULL) {
        before->next = after;
    }
    engine->items.sizes[it->slabs_clsid]--;
}
280 
/*
 * Make an item visible in the cache.
 *
 * Marks the item ITEM_LINKED, stamps it with the current time, inserts it
 * into the hash table, adds it to the engine-wide byte/item counters (under
 * stats.lock), assigns it a fresh CAS id and finally puts it at the head of
 * its LRU list.
 *
 * The item must be neither linked nor slabbed and smaller than 1MB (both
 * asserted). Always returns 1.
 */
int do_item_link(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */

    it->iflag |= ITEM_LINKED;
    it->time = engine->server.core->get_current_time();

    /* hash table first */
    assoc_insert(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 it);

    /* engine-wide accounting */
    const size_t item_size = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes += item_size;
    engine->stats.curr_items += 1;
    engine->stats.total_items += 1;
    pthread_mutex_unlock(&engine->stats.lock);

    /* Allocate a new CAS ID on link. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);

    return 1;
}
304 
/*
 * Remove an item from the cache, if it is currently linked.
 *
 * Clears ITEM_LINKED, subtracts the item from the engine-wide byte/item
 * counters (under stats.lock), deletes it from the hash table and takes it
 * off its LRU list. When nobody holds a reference the memory is freed right
 * away; otherwise the item stays allocated. Does nothing for an item that is
 * not linked.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;     /* not linked: nothing to undo */
    }
    it->iflag &= ~ITEM_LINKED;

    const size_t item_size = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes -= item_size;
    engine->stats.curr_items -= 1;
    pthread_mutex_unlock(&engine->stats.lock);

    assoc_delete(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 item_get_key(it), it->nkey);
    item_unlink_q(engine, it);

    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
322 
/*
 * Drop one reference to an item.
 *
 * A refcount that is already zero is left alone. Once the count is zero and
 * the item is no longer linked, its memory is freed.
 */
void do_item_release(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);

    if (it->refcount != 0) {
        it->refcount--;
        DEBUG_REFCNT(it, '-');
    }

    if (it->refcount != 0) {
        return;     /* somebody else still holds a reference */
    }
    if ((it->iflag & ITEM_LINKED) != 0) {
        return;     /* still in the cache; freed when it gets unlinked */
    }
    item_free(engine, it);
}
333 
/*
 * Bump a linked item to the head of its LRU list and refresh its timestamp,
 * but only when its timestamp is more than ITEM_UPDATE_INTERVAL seconds
 * behind the current time. Unlinked items are left untouched.
 *
 * NOTE(review): if rel_time_t is an unsigned type the subtraction below wraps
 * while the clock is still below ITEM_UPDATE_INTERVAL, which would make every
 * call reposition the item during that window -- confirm this is acceptable.
 */
void do_item_update(struct default_engine *engine, hash_item *it) {
    const rel_time_t now = engine->server.core->get_current_time();
    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);

    if (!(it->time < now - ITEM_UPDATE_INTERVAL)) {
        return;     /* repositioned recently enough */
    }
    assert((it->iflag & ITEM_SLABBED) == 0);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    /* requeue at the head with a fresh timestamp */
    item_unlink_q(engine, it);
    it->time = now;
    item_link_q(engine, it);
}
347 
/*
 * Swap one cached item for another: unlink `it`, then link `new_it`.
 *
 * `it` must not be marked ITEM_SLABBED (asserted). Returns whatever
 * do_item_link() returns for `new_it`.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);

    const int link_result = do_item_link(engine, new_it);
    return link_result;
}
357 
/*
 * Build a textual dump ("ITEM <key> [<n> b; <t> s]" lines followed by "END")
 * of the items on one slab class's LRU list.
 *
 * The real implementation is only compiled when FUTURE is defined. As written
 * it refers to an `engine` that is not among this function's parameters, so
 * it needs rework before it can be enabled. Without FUTURE the function
 * ignores all of its arguments and returns NULL.
 *
 * slabs_clsid  slab class whose LRU list is dumped
 * limit        maximum number of items to list; 0 means no limit
 * bytes        out: length of the returned text, not counting the final NUL
 *
 * Returns a malloc()ed buffer of at most 2MB, or NULL.
 */
/*@null@*/
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    const unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer = malloc((size_t)memlimit);
    if (buffer == NULL) {
        return NULL;
    }

    unsigned int used = 0;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char line[512];

    for (hash_item *it = engine->items.heads[slabs_clsid];
         it != NULL && (limit == 0 || shown < limit);
         it = it->next) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        unsigned int len = snprintf(line, sizeof(line), "ITEM %s [%d b; %lu s]\r\n",
                                    key_temp, it->nbytes,
                                    (unsigned long)it->exptime + process_started);
        if (used + len + 6 > memlimit) {  /* 6 is END\r\n\0 */
            break;
        }
        memcpy(buffer + used, line, len);
        used += len;
        shown++;
    }

    /* copies the terminating NUL too, but it is not counted in the length */
    memcpy(buffer + used, "END\r\n", 6);
    used += 5;

    *bytes = used;
    return buffer;
#else
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
#endif
}
407 
/*
 * Report per-slab-class item statistics through add_statistics().
 *
 * For every class with a non-empty LRU list, up to search_items dead tail
 * items (invalidated by a flush that has taken effect, or expired) are
 * unlinked first; the sweep stops early at the first tail that is still live
 * or still referenced. Classes that end up empty are skipped. For the rest
 * the item count, the tail item's timestamp (reported as "age") and the
 * class's evicted / evicted_nonzero / evicted_time / outofmemory /
 * tailrepairs / reclaimed counters are emitted under the "items" prefix.
 *
 * engine     engine instance
 * add_stats  callback handed to add_statistics()
 * c          opaque cookie handed to add_statistics()
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c) {
    const char *prefix = "items";
    const rel_time_t now = engine->server.core->get_current_time();

    for (int cls = 0; cls < POWER_LARGEST; cls++) {
        if (engine->items.tails[cls] == NULL) {
            continue;
        }

        /* sweep dead items off the tail, within a bounded budget */
        for (int budget = search_items; budget > 0; --budget) {
            hash_item *tail = engine->items.tails[cls];
            if (tail == NULL) {
                break;
            }

            const int flushed = engine->config.oldest_live != 0 &&
                                engine->config.oldest_live <= now &&
                                tail->time <= engine->config.oldest_live;
            const int expired = tail->exptime != 0 && tail->exptime < now;
            if (!flushed && !expired) {
                break;      /* tail is live: nothing more to sweep */
            }
            if (tail->refcount != 0) {
                break;      /* dead but still referenced: leave it */
            }
            do_item_unlink(engine, tail);
        }

        if (engine->items.tails[cls] == NULL) {
            /* We removed all of the items in this slab class */
            continue;
        }

        add_statistics(c, add_stats, prefix, cls, "number", "%u",
                       engine->items.sizes[cls]);
        add_statistics(c, add_stats, prefix, cls, "age", "%u",
                       engine->items.tails[cls]->time);
        add_statistics(c, add_stats, prefix, cls, "evicted",
                       "%u", engine->items.itemstats[cls].evicted);
        add_statistics(c, add_stats, prefix, cls, "evicted_nonzero",
                       "%u", engine->items.itemstats[cls].evicted_nonzero);
        add_statistics(c, add_stats, prefix, cls, "evicted_time",
                       "%u", engine->items.itemstats[cls].evicted_time);
        add_statistics(c, add_stats, prefix, cls, "outofmemory",
                       "%u", engine->items.itemstats[cls].outofmemory);
        add_statistics(c, add_stats, prefix, cls, "tailrepairs",
                       "%u", engine->items.itemstats[cls].tailrepairs);
        add_statistics(c, add_stats, prefix, cls, "reclaimed",
                       "%u", engine->items.itemstats[cls].reclaimed);
    }
}
454 
455 /** dumps out a list of objects of each size, with granularity of 32 bytes */
456 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)457 static void do_item_stats_sizes(struct default_engine *engine,
458                                 ADD_STAT add_stats, const void *c) {
459 
460     /* max 1MB object, divided into 32 bytes size buckets */
461     const int num_buckets = 32768;
462     unsigned int *histogram = calloc(num_buckets, sizeof(int));
463 
464     if (histogram != NULL) {
465         int i;
466 
467         /* build the histogram */
468         for (i = 0; i < POWER_LARGEST; i++) {
469             hash_item *iter = engine->items.heads[i];
470             while (iter) {
471                 int ntotal = ITEM_ntotal(engine, iter);
472                 int bucket = ntotal / 32;
473                 if ((ntotal % 32) != 0) bucket++;
474                 if (bucket < num_buckets) histogram[bucket]++;
475                 iter = iter->next;
476             }
477         }
478 
479         /* write the buffer */
480         for (i = 0; i < num_buckets; i++) {
481             if (histogram[i] != 0) {
482                 char key[8], val[32];
483                 int klen, vlen;
484                 klen = snprintf(key, sizeof(key), "%d", i * 32);
485                 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
486                 assert(klen < sizeof(key));
487                 assert(vlen < sizeof(val));
488                 add_stats(key, klen, val, vlen, c);
489             }
490         }
491         free(histogram);
492     }
493 }
494 
495 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)496 hash_item *do_item_get(struct default_engine *engine,
497                        const char *key, const size_t nkey) {
498     rel_time_t current_time = engine->server.core->get_current_time();
499     hash_item *it = assoc_find(engine, engine->server.core->hash(key,
500                                                                       nkey, 0),
501                                key, nkey);
502     int was_found = 0;
503 
504     if (engine->config.verbose > 2) {
505         EXTENSION_LOGGER_DESCRIPTOR *logger;
506         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
507         if (it == NULL) {
508             logger->log(EXTENSION_LOG_DEBUG, NULL,
509                         "> NOT FOUND %s", key);
510         } else {
511             logger->log(EXTENSION_LOG_DEBUG, NULL,
512                         "> FOUND KEY %s",
513                         (const char*)item_get_key(it));
514             was_found++;
515         }
516     }
517 
518     if (it != NULL && engine->config.oldest_live != 0 &&
519         engine->config.oldest_live <= current_time &&
520         it->time <= engine->config.oldest_live) {
521         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
522         it = NULL;
523     }
524 
525     if (it == NULL && was_found) {
526         EXTENSION_LOGGER_DESCRIPTOR *logger;
527         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
528         logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
529         was_found--;
530     }
531 
532     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
533         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
534         it = NULL;
535     }
536 
537     if (it == NULL && was_found) {
538         EXTENSION_LOGGER_DESCRIPTOR *logger;
539         logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
540         logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
541         was_found--;
542     }
543 
544     if (it != NULL) {
545         it->refcount++;
546         DEBUG_REFCNT(it, '+');
547         do_item_update(engine, it);
548     }
549 
550     return it;
551 }
552 
553 /*
554  * Stores an item in the cache according to the semantics of one of the set
555  * commands. In threaded mode, this is protected by the cache lock.
556  *
557  * Returns the state of storage.
558  */
do_store_item(struct default_engine * engine,hash_item * it,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)559 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
560                                        hash_item *it, uint64_t *cas,
561                                        ENGINE_STORE_OPERATION operation,
562                                        const void *cookie) {
563     const char *key = item_get_key(it);
564     hash_item *old_it = do_item_get(engine, key, it->nkey);
565     ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
566 
567     hash_item *new_it = NULL;
568 
569     if (old_it != NULL && operation == OPERATION_ADD) {
570         /* add only adds a nonexistent item, but promote to head of LRU */
571         do_item_update(engine, old_it);
572     } else if (!old_it && (operation == OPERATION_REPLACE
573         || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
574     {
575         /* replace only replaces an existing value; don't store */
576     } else if (operation == OPERATION_CAS) {
577         /* validate cas operation */
578         if(old_it == NULL) {
579             // LRU expired
580             stored = ENGINE_KEY_ENOENT;
581         }
582         else if (item_get_cas(it) == item_get_cas(old_it)) {
583             // cas validates
584             // it and old_it may belong to different classes.
585             // I'm updating the stats for the one that's getting pushed out
586             do_item_replace(engine, old_it, it);
587             stored = ENGINE_SUCCESS;
588         } else {
589             if (engine->config.verbose > 1) {
590                 EXTENSION_LOGGER_DESCRIPTOR *logger;
591                 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
592                 logger->log(EXTENSION_LOG_INFO, NULL,
593                         "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
594                         item_get_cas(old_it),
595                         item_get_cas(it));
596             }
597             stored = ENGINE_KEY_EEXISTS;
598         }
599     } else {
600         /*
601          * Append - combine new and old record into single one. Here it's
602          * atomic and thread-safe.
603          */
604         if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
605             /*
606              * Validate CAS
607              */
608             if (item_get_cas(it) != 0) {
609                 // CAS much be equal
610                 if (item_get_cas(it) != item_get_cas(old_it)) {
611                     stored = ENGINE_KEY_EEXISTS;
612                 }
613             }
614 
615             if (stored == ENGINE_NOT_STORED) {
616                 /* we have it and old_it here - alloc memory to hold both */
617                 new_it = do_item_alloc(engine, key, it->nkey,
618                                        old_it->flags,
619                                        old_it->exptime,
620                                        it->nbytes + old_it->nbytes,
621                                        cookie);
622 
623                 if (new_it == NULL) {
624                     /* SERVER_ERROR out of memory */
625                     if (old_it != NULL) {
626                         do_item_release(engine, old_it);
627                     }
628 
629                     return ENGINE_NOT_STORED;
630                 }
631 
632                 /* copy data from it and old_it to new_it */
633 
634                 if (operation == OPERATION_APPEND) {
635                     memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
636                     memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
637                 } else {
638                     /* OPERATION_PREPEND */
639                     memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
640                     memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
641                 }
642 
643                 it = new_it;
644             }
645         }
646 
647         if (stored == ENGINE_NOT_STORED) {
648             if (old_it != NULL) {
649                 do_item_replace(engine, old_it, it);
650             } else {
651                 do_item_link(engine, it);
652             }
653 
654             *cas = item_get_cas(it);
655             stored = ENGINE_SUCCESS;
656         }
657     }
658 
659     if (old_it != NULL) {
660         do_item_release(engine, old_it);         /* release our reference */
661     }
662 
663     if (new_it != NULL) {
664         do_item_release(engine, new_it);
665     }
666 
667     if (stored == ENGINE_SUCCESS) {
668         *cas = item_get_cas(it);
669     }
670 
671     return stored;
672 }
673 
674 
675 /*
676  * adds a delta value to a numeric item.
677  *
678  * c     connection requesting the operation
679  * it    item to adjust
680  * incr  true to increment value, false to decrement
681  * delta amount to adjust value by
682  * buf   buffer for response string
683  *
684  * returns a response string to send back to the client.
685  */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)686 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
687                                       hash_item *it, const bool incr,
688                                       const int64_t delta, uint64_t *rcas,
689                                       uint64_t *result, const void *cookie) {
690     const char *ptr;
691     uint64_t value;
692     char buf[80];
693     int res;
694 
695     if (it->nbytes >= (sizeof(buf) - 1)) {
696         return ENGINE_EINVAL;
697     }
698 
699     ptr = item_get_data(it);
700     memcpy(buf, ptr, it->nbytes);
701     buf[it->nbytes] = '\0';
702 
703     if (!safe_strtoull(buf, &value)) {
704         return ENGINE_EINVAL;
705     }
706 
707     if (incr) {
708         value += delta;
709     } else {
710         if(delta > value) {
711             value = 0;
712         } else {
713             value -= delta;
714         }
715     }
716 
717     *result = value;
718     if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
719         return ENGINE_EINVAL;
720     }
721 
722     if (it->refcount == 1 && res <= it->nbytes) {
723         // we can do inline replacement
724         memcpy(item_get_data(it), buf, res);
725         memset(item_get_data(it) + res, ' ', it->nbytes - res);
726         item_set_cas(NULL, NULL, it, get_cas_id());
727         *rcas = item_get_cas(it);
728     } else {
729         hash_item *new_it = do_item_alloc(engine, item_get_key(it),
730                                           it->nkey, it->flags,
731                                           it->exptime, res,
732                                           cookie);
733         if (new_it == NULL) {
734             do_item_unlink(engine, it);
735             return ENGINE_ENOMEM;
736         }
737         memcpy(item_get_data(new_it), buf, res);
738         do_item_replace(engine, it, new_it);
739         *rcas = item_get_cas(new_it);
740         do_item_release(engine, new_it);       /* release our reference */
741     }
742 
743     return ENGINE_SUCCESS;
744 }
745 
746 /********************************* ITEM ACCESS *******************************/
747 
748 /*
749  * Allocates a new item.
750  */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)751 hash_item *item_alloc(struct default_engine *engine,
752                       const void *key, size_t nkey, int flags,
753                       rel_time_t exptime, int nbytes, const void *cookie) {
754     hash_item *it;
755     pthread_mutex_lock(&engine->cache_lock);
756     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
757     pthread_mutex_unlock(&engine->cache_lock);
758     return it;
759 }
760 
761 /*
762  * Returns an item if it hasn't been marked as expired,
763  * lazy-expiring as needed.
764  */
item_get(struct default_engine * engine,const void * key,const size_t nkey)765 hash_item *item_get(struct default_engine *engine,
766                     const void *key, const size_t nkey) {
767     hash_item *it;
768     pthread_mutex_lock(&engine->cache_lock);
769     it = do_item_get(engine, key, nkey);
770     pthread_mutex_unlock(&engine->cache_lock);
771     return it;
772 }
773 
774 /*
775  * Decrements the reference count on an item and adds it to the freelist if
776  * needed.
777  */
item_release(struct default_engine * engine,hash_item * item)778 void item_release(struct default_engine *engine, hash_item *item) {
779     pthread_mutex_lock(&engine->cache_lock);
780     do_item_release(engine, item);
781     pthread_mutex_unlock(&engine->cache_lock);
782 }
783 
784 /*
785  * Unlinks an item from the LRU and hashtable.
786  */
item_unlink(struct default_engine * engine,hash_item * item)787 void item_unlink(struct default_engine *engine, hash_item *item) {
788     pthread_mutex_lock(&engine->cache_lock);
789     do_item_unlink(engine, item);
790     pthread_mutex_unlock(&engine->cache_lock);
791 }
792 
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)793 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
794                                        const void* cookie,
795                                        const void* key,
796                                        const int nkey,
797                                        const bool increment,
798                                        const bool create,
799                                        const uint64_t delta,
800                                        const uint64_t initial,
801                                        const rel_time_t exptime,
802                                        uint64_t *cas,
803                                        uint64_t *result)
804 {
805    hash_item *item = do_item_get(engine, key, nkey);
806    ENGINE_ERROR_CODE ret;
807 
808    if (item == NULL) {
809       if (!create) {
810          return ENGINE_KEY_ENOENT;
811       } else {
812          char buffer[128];
813          int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
814                             (uint64_t)initial);
815 
816          item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
817          if (item == NULL) {
818             return ENGINE_ENOMEM;
819          }
820          memcpy((void*)item_get_data(item), buffer, len);
821          if ((ret = do_store_item(engine, item, cas,
822                                   OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
823              *result = initial;
824              *cas = item_get_cas(item);
825          }
826          do_item_release(engine, item);
827       }
828    } else {
829       ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
830       do_item_release(engine, item);
831    }
832 
833    return ret;
834 }
835 
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)836 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
837                              const void* cookie,
838                              const void* key,
839                              const int nkey,
840                              const bool increment,
841                              const bool create,
842                              const uint64_t delta,
843                              const uint64_t initial,
844                              const rel_time_t exptime,
845                              uint64_t *cas,
846                              uint64_t *result)
847 {
848     ENGINE_ERROR_CODE ret;
849 
850     pthread_mutex_lock(&engine->cache_lock);
851     ret = do_arithmetic(engine, cookie, key, nkey, increment,
852                         create, delta, initial, exptime, cas,
853                         result);
854     pthread_mutex_unlock(&engine->cache_lock);
855     return ret;
856 }
857 
858 /*
859  * Stores an item in the cache (high level, obeys set/add/replace semantics)
860  */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)861 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
862                              hash_item *item, uint64_t *cas,
863                              ENGINE_STORE_OPERATION operation,
864                              const void *cookie) {
865     ENGINE_ERROR_CODE ret;
866 
867     pthread_mutex_lock(&engine->cache_lock);
868     ret = do_store_item(engine, item, cas, operation, cookie);
869     pthread_mutex_unlock(&engine->cache_lock);
870     return ret;
871 }
872 
do_touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)873 static hash_item *do_touch_item(struct default_engine *engine,
874                                      const void *key,
875                                      uint16_t nkey,
876                                      uint32_t exptime)
877 {
878    hash_item *item = do_item_get(engine, key, nkey);
879    if (item != NULL) {
880        item->exptime = exptime;
881    }
882    return item;
883 }
884 
touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)885 hash_item *touch_item(struct default_engine *engine,
886                            const void *key,
887                            uint16_t nkey,
888                            uint32_t exptime)
889 {
890     hash_item *ret;
891 
892     pthread_mutex_lock(&engine->cache_lock);
893     ret = do_touch_item(engine, key, nkey, exptime);
894     pthread_mutex_unlock(&engine->cache_lock);
895     return ret;
896 }
897 
898 /*
899  * Flushes expired items after a flush_all call
900  */
item_flush_expired(struct default_engine * engine,time_t when)901 void item_flush_expired(struct default_engine *engine, time_t when) {
902     int i;
903     hash_item *iter, *next;
904 
905     pthread_mutex_lock(&engine->cache_lock);
906 
907     if (when == 0) {
908         engine->config.oldest_live = engine->server.core->get_current_time() - 1;
909     } else {
910         engine->config.oldest_live = engine->server.core->realtime(when) - 1;
911     }
912 
913     if (engine->config.oldest_live != 0) {
914         for (i = 0; i < POWER_LARGEST; i++) {
915             /*
916              * The LRU is sorted in decreasing time order, and an item's
917              * timestamp is never newer than its last access time, so we
918              * only need to walk back until we hit an item older than the
919              * oldest_live time.
920              * The oldest_live checking will auto-expire the remaining items.
921              */
922             for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
923                 if (iter->time >= engine->config.oldest_live) {
924                     next = iter->next;
925                     if ((iter->iflag & ITEM_SLABBED) == 0) {
926                         do_item_unlink(engine, iter);
927                     }
928                 } else {
929                     /* We've hit the first old item. Continue to the next queue. */
930                     break;
931                 }
932             }
933         }
934     }
935     pthread_mutex_unlock(&engine->cache_lock);
936 }
937 
938 /*
939  * Dumps part of the cache
940  */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)941 char *item_cachedump(struct default_engine *engine,
942                      unsigned int slabs_clsid,
943                      unsigned int limit,
944                      unsigned int *bytes) {
945     char *ret;
946 
947     pthread_mutex_lock(&engine->cache_lock);
948     ret = do_item_cachedump(slabs_clsid, limit, bytes);
949     pthread_mutex_unlock(&engine->cache_lock);
950     return ret;
951 }
952 
/*
 * Reports item statistics through `add_stat`.
 *
 * Locking wrapper: runs do_item_stats() with the engine's cache lock held.
 */
void item_stats(struct default_engine *engine,
                   ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
960 
961 
/*
 * Reports item size statistics through `add_stat`.
 *
 * Locking wrapper: runs do_item_stats_sizes() with the engine's cache lock
 * held.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
969 
/*
 * Appends `cursor` at the tail of the LRU queue for slab class `ii` and
 * bumps that queue's size counter.
 *
 * The current tail is dereferenced unconditionally, so the queue must not
 * be empty; every caller in this file checks heads[ii] != NULL first and
 * holds the cache lock.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->next = NULL;
    cursor->prev = old_tail;

    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;
    engine->items.sizes[ii]++;
}
980 
/*
 * Callback type used by do_item_walk_cursor(): invoked once for every
 * non-cursor item the cursor steps over. `cookie` is the opaque `itemdata`
 * pointer handed to the walk. Returning anything other than ENGINE_SUCCESS
 * stops the walk.
 */
typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
                                      hash_item *item, void *cookie);
983 
/*
 * Moves `cursor` through its LRU queue towards the head (following `prev`
 * links), calling `itemfunc` on each item it steps over.
 *
 * At most `steplength` real items are visited per call; entries with
 * nkey == 0 and nbytes == 0 are treated as other cursors and do not count
 * against that budget.
 *
 * `*error` is set to ENGINE_SUCCESS, or to the first non-success value
 * returned by `itemfunc`.
 *
 * Returns true when the budget ran out and the cursor still has a
 * predecessor (i.e. there is more to walk). Returns false when the head of
 * the queue was processed (the cursor is then left out of the queue with
 * prev == NULL), when `itemfunc` reported an error, or when the cursor had
 * no predecessor to begin with.
 *
 * All callers in this file invoke this with the cache lock held.
 *
 * NOTE(review): when `itemfunc` fails on a non-head item the cursor has
 * already been re-inserted into the queue and stays linked on return.
 * NOTE(review): item_unlink_q() is defined outside this block; if it also
 * adjusts the per-class size counter, the manual re-insert below does not
 * undo that - confirm.
 */
static bool do_item_walk_cursor(struct default_engine *engine,
                                hash_item *cursor,
                                int steplength,
                                ITERFUNC itemfunc,
                                void* itemdata,
                                ENGINE_ERROR_CODE *error)
{
    int ii = 0;
    *error = ENGINE_SUCCESS;

    while (cursor->prev != NULL && ii < steplength) {
        ++ii;
        /* Take the cursor out of the queue; `ptr` is the item being passed */
        hash_item *ptr = cursor->prev;
        item_unlink_q(engine, cursor);

        bool done = false;
        if (ptr == engine->items.heads[cursor->slabs_clsid]) {
            /* `ptr` is the head: nothing left beyond it, keep cursor unlinked */
            done = true;
            cursor->prev = NULL;
        } else {
            /* Re-insert the cursor immediately in front of `ptr` */
            cursor->next = ptr;
            cursor->prev = ptr->prev;
            cursor->prev->next = cursor;
            ptr->prev = cursor;
        }

        /* Ignore cursors: they are not real items and cost no step */
        if (ptr->nkey == 0 && ptr->nbytes == 0) {
            --ii;
        } else {
            *error = itemfunc(engine, ptr, itemdata);
            if (*error != ENGINE_SUCCESS) {
                return false;
            }
        }

        if (done) {
            return false;
        }
    }

    return (cursor->prev != NULL);
}
1028 
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1029 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1030                                     hash_item *item,
1031                                     void *cookie) {
1032     (void)cookie;
1033     engine->scrubber.visited++;
1034     rel_time_t current_time = engine->server.core->get_current_time();
1035     if (item->refcount == 0 &&
1036         (item->exptime != 0 && item->exptime < current_time)) {
1037         do_item_unlink(engine, item);
1038         engine->scrubber.cleaned++;
1039     }
1040     return ENGINE_SUCCESS;
1041 }
1042 
/*
 * Scrubs one LRU queue: repeatedly advances `cursor` in batches of up to
 * 200 items, applying item_scrub() to each, until the walk reports that
 * nothing is left or returns an error.
 *
 * The cache lock is taken for each batch only and dropped in between, so
 * other threads can get at the cache while a long queue is being scrubbed.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor) {
    for (;;) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        bool more = do_item_walk_cursor(engine, cursor, 200, item_scrub,
                                        NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !more) {
            break;
        }
    }
}
1057 
item_scubber_main(void * arg)1058 static void *item_scubber_main(void *arg)
1059 {
1060     struct default_engine *engine = arg;
1061     hash_item cursor = { .refcount = 1 };
1062 
1063     for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1064         pthread_mutex_lock(&engine->cache_lock);
1065         bool skip = false;
1066         if (engine->items.heads[ii] == NULL) {
1067             skip = true;
1068         } else {
1069             // add the item at the tail
1070             do_item_link_cursor(engine, &cursor, ii);
1071         }
1072         pthread_mutex_unlock(&engine->cache_lock);
1073 
1074         if (!skip) {
1075             item_scrub_class(engine, &cursor);
1076         }
1077     }
1078 
1079     pthread_mutex_lock(&engine->scrubber.lock);
1080     engine->scrubber.stopped = time(NULL);
1081     engine->scrubber.running = false;
1082     pthread_mutex_unlock(&engine->scrubber.lock);
1083 
1084     return NULL;
1085 }
1086 
item_start_scrub(struct default_engine * engine)1087 bool item_start_scrub(struct default_engine *engine)
1088 {
1089     bool ret = false;
1090     pthread_mutex_lock(&engine->scrubber.lock);
1091     if (!engine->scrubber.running) {
1092         engine->scrubber.started = time(NULL);
1093         engine->scrubber.stopped = 0;
1094         engine->scrubber.visited = 0;
1095         engine->scrubber.cleaned = 0;
1096         engine->scrubber.running = true;
1097 
1098         pthread_t t;
1099         pthread_attr_t attr;
1100 
1101         if (pthread_attr_init(&attr) != 0 ||
1102             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1103             pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1104         {
1105             engine->scrubber.running = false;
1106         } else {
1107             ret = true;
1108         }
1109     }
1110     pthread_mutex_unlock(&engine->scrubber.lock);
1111 
1112     return ret;
1113 }
1114 
/*
 * Per-connection state of a TAP walk, stored as the cookie's
 * engine-specific data by initialize_item_tap_walker().
 */
struct tap_client {
    /* Placeholder item linked into an LRU queue to mark the walk position */
    hash_item cursor;
    /* Item picked up by item_tap_iterfunc(); reset to NULL on every walker call */
    hash_item *it;
};
1119 
item_tap_iterfunc(struct default_engine * engine,hash_item * item,void * cookie)1120 static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1121                                     hash_item *item,
1122                                     void *cookie) {
1123     struct tap_client *client = cookie;
1124     client->it = item;
1125     ++client->it->refcount;
1126     return ENGINE_SUCCESS;
1127 }
1128 
do_item_tap_walker(struct default_engine * engine,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1129 static tap_event_t do_item_tap_walker(struct default_engine *engine,
1130                                          const void *cookie, item **itm,
1131                                          void **es, uint16_t *nes, uint8_t *ttl,
1132                                          uint16_t *flags, uint32_t *seqno,
1133                                          uint16_t *vbucket)
1134 {
1135     struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1136     if (client == NULL) {
1137         return TAP_DISCONNECT;
1138     }
1139 
1140     *es = NULL;
1141     *nes = 0;
1142     *ttl = (uint8_t)-1;
1143     *seqno = 0;
1144     *flags = 0;
1145     *vbucket = 0;
1146     client->it = NULL;
1147 
1148     ENGINE_ERROR_CODE r;
1149     do {
1150         if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1151             // find next slab class to look at..
1152             bool linked = false;
1153             for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
1154                 if (engine->items.heads[ii] != NULL) {
1155                     // add the item at the tail
1156                     do_item_link_cursor(engine, &client->cursor, ii);
1157                     linked = true;
1158                 }
1159             }
1160             if (!linked) {
1161                 break;
1162             }
1163         }
1164     } while (client->it == NULL);
1165     *itm = client->it;
1166 
1167     return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1168 }
1169 
item_tap_walker(ENGINE_HANDLE * handle,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1170 tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1171                             const void *cookie, item **itm,
1172                             void **es, uint16_t *nes, uint8_t *ttl,
1173                             uint16_t *flags, uint32_t *seqno,
1174                             uint16_t *vbucket)
1175 {
1176     tap_event_t ret;
1177     struct default_engine *engine = (struct default_engine*)handle;
1178     pthread_mutex_lock(&engine->cache_lock);
1179     ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1180     pthread_mutex_unlock(&engine->cache_lock);
1181 
1182     return ret;
1183 }
1184 
initialize_item_tap_walker(struct default_engine * engine,const void * cookie)1185 bool initialize_item_tap_walker(struct default_engine *engine,
1186                                 const void* cookie)
1187 {
1188     struct tap_client *client = calloc(1, sizeof(*client));
1189     if (client == NULL) {
1190         return false;
1191     }
1192     client->cursor.refcount = 1;
1193 
1194     /* Link the cursor! */
1195     bool linked = false;
1196     for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1197         pthread_mutex_lock(&engine->cache_lock);
1198         if (engine->items.heads[ii] != NULL) {
1199             // add the item at the tail
1200             do_item_link_cursor(engine, &client->cursor, ii);
1201             linked = true;
1202         }
1203         pthread_mutex_unlock(&engine->cache_lock);
1204     }
1205 
1206     engine->server.cookie->store_engine_specific(cookie, client);
1207     return true;
1208 }
1209