1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *  Modifications Copyright (c) 2013, 2021, Oracle and/or its affiliates.
4  *  All rights reserved.
5  */
6 #include "config.h"
7 #include <fcntl.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <time.h>
13 #include <assert.h>
14 #include <inttypes.h>
15 
16 #include "default_engine.h"
17 
/*
 * Build switch: when defined, the INNODB_MEMCACHED variants of
 * do_item_alloc() and item_alloc() further down in this file are compiled
 * instead of the #else variants.
 */
#define INNODB_MEMCACHED
/*
 * Forward Declarations
 *
 * These give the helpers internal linkage. Several of the definitions
 * further down omit `static`; in C a function redeclared without a
 * storage-class specifier keeps the linkage established by the earlier
 * declaration.
 */
static void item_link_q(struct default_engine *engine, hash_item *it);
static void item_unlink_q(struct default_engine *engine, hash_item *it);
static hash_item *do_item_alloc(struct default_engine *engine,
                                const void *key, const size_t nkey,
                                const int flags, const rel_time_t exptime,
                                const int nbytes,
                                const void *cookie);
static hash_item *do_item_get(struct default_engine *engine,
                              const char *key, const size_t nkey);
static int do_item_link(struct default_engine *engine, hash_item *it);
static void do_item_unlink(struct default_engine *engine, hash_item *it);
static void do_item_release(struct default_engine *engine, hash_item *it);
static void do_item_update(struct default_engine *engine, hash_item *it);
static int do_item_replace(struct default_engine *engine,
                            hash_item *it, hash_item *new_it);
static void item_free(struct default_engine *engine, hash_item *it);

/*
 * We only reposition items in the LRU queue if they haven't been repositioned
 * in this many seconds. That saves us from churning on frequently-accessed
 * items.
 */
#define ITEM_UPDATE_INTERVAL 60

/* Seeds cas_id; declared without `static`, so it has external linkage. */
void cache_set_initial_cas_id(uint64_t cas);
/*
 * Most recently issued CAS id. Written by cache_set_initial_cas_id() and
 * pre-incremented by get_cas_id() each time a new id is handed out.
 */
static uint64_t cas_id;
46 
item_stats_reset(struct default_engine * engine)47 void item_stats_reset(struct default_engine *engine) {
48     pthread_mutex_lock(&engine->cache_lock);
49     memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
50     pthread_mutex_unlock(&engine->cache_lock);
51 }
52 
53 
54 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)55 static inline size_t ITEM_ntotal(struct default_engine *engine,
56                                  const hash_item *item) {
57     size_t ret = sizeof(*item) + item->nkey + item->nbytes;
58     if (engine->config.use_cas) {
59         ret += sizeof(uint64_t);
60     }
61 
62     return ret;
63 }
64 
65 /* Set the initial CAS id for the hash table */
cache_set_initial_cas_id(uint64_t cas)66 void cache_set_initial_cas_id(uint64_t cas) {
67   cas_id = cas;
68 }
69 
70 /* Get the next CAS id for a new item. */
get_cas_id(void)71 static uint64_t get_cas_id(void) {
72     return ++cas_id;
73 }
74 
/*
 * Reference-count tracing.
 *
 * Change "#if 0" to "#if 1" to log every refcount change to stderr as
 *   item <ptr> refcnt(<op>) <count> <L if linked><S if slabbed>
 * In the default build DEBUG_REFCNT() expands to an empty statement.
 *
 * it  pointer to the hash_item being traced
 * op  single character identifying the operation ('*', '+', '-', 'F', ...)
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
85 
86 
87 #ifdef INNODB_MEMCACHED /* INNODB_MEMCACHED */
88 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)89 hash_item *do_item_alloc(struct default_engine *engine,
90                          const void *key,
91                          const size_t nkey,
92                          const int flags,
93                          const rel_time_t exptime,
94                          const int nbytes,
95                          const void *cookie) {
96     hash_item *it = NULL;
97     // Avoid potential underflows.
98     if (nbytes < 0)
99         return 0;
100 
101     size_t ntotal = sizeof(hash_item) + nkey + nbytes;
102     if (engine->config.use_cas) {
103         ntotal += sizeof(uint64_t);
104     }
105 
106     unsigned int id = slabs_clsid(engine, ntotal);
107     if (id == 0)
108         return 0;
109 
110     if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
111         return NULL;
112     }
113 
114     assert(it->slabs_clsid == 0);
115 
116     it->slabs_clsid = id;
117 
118     assert(it != engine->items.heads[it->slabs_clsid]);
119 
120     it->next = it->prev = it->h_next = 0;
121     it->refcount = 1;     /* the caller will have a reference */
122     DEBUG_REFCNT(it, '*');
123     it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
124     it->nkey = nkey;
125     it->nbytes = nbytes;
126     it->flags = flags;
127     memcpy((void*)item_get_key(it), key, nkey);
128     it->exptime = exptime;
129     return it;
130 }
131 
132 #else /* INNODB_MEMCACHED */
/*
 * Allocate and initialise an item (build without INNODB_MEMCACHED).
 *
 * Unlike the INNODB_MEMCACHED variant, this one works the LRU of the target
 * slab class when memory is tight. In order:
 *   1. Look at up to 50 items from the tail for an unreferenced item whose
 *      non-zero exptime is already in the past and reuse its memory.
 *   2. Otherwise try slabs_alloc(). If that fails and evict_to_free is set,
 *      unlink the first unreferenced item found within 50 steps of the tail
 *      and try slabs_alloc() again.
 *   3. If that still fails, look for a referenced item within 50 steps of
 *      the tail that was last touched more than TAIL_REPAIR_TIME ago, force
 *      its refcount to 0, unlink it and try slabs_alloc() one last time.
 *
 * cookie is only used for the server.stat->evicting() notification.
 *
 * Returns the new item with refcount 1, or NULL when nbytes is negative,
 * slabs_clsid() yields class 0, or no memory could be obtained.
 */
/*@null@*/
hash_item *do_item_alloc(struct default_engine *engine,
                         const void *key,
                         const size_t nkey,
                         const int flags,
                         const rel_time_t exptime,
                         const int nbytes,
                         const void *cookie) {
    hash_item *it = NULL;
    // Avoid potential underflows.
    if (nbytes < 0)
        return 0;

    /* Header + key + value, plus the CAS slot when CAS is enabled. */
    size_t ntotal = sizeof(hash_item) + nkey + nbytes;
    if (engine->config.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    unsigned int id = slabs_clsid(engine, ntotal);
    if (id == 0)
        return 0;

    /* do a quick check if we have any expired items in the tail.. */
    int tries = 50;
    hash_item *search;

    rel_time_t current_time = engine->server.core->get_current_time();

    for (search = engine->items.tails[id];
         tries > 0 && search != NULL;
         tries--, search=search->prev) {
        if (search->refcount == 0 &&
            (search->exptime != 0 && search->exptime < current_time)) {
            it = search;
            /* I don't want to actually free the object, just steal
             * the item to avoid to grab the slab mutex twice ;-)
             */
            pthread_mutex_lock(&engine->stats.lock);
            engine->stats.reclaimed++;
            pthread_mutex_unlock(&engine->stats.lock);
            engine->items.itemstats[id].reclaimed++;
            /* Hold a temporary reference: do_item_unlink() only calls
             * item_free() when refcount is 0, and we want to keep the block.
             */
            it->refcount = 1;
            do_item_unlink(engine, it);
            /* Initialize the item block: */
            it->slabs_clsid = 0;
            it->refcount = 0;
            break;
        }
    }

    if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
        /*
        ** Could not find an expired item at the tail, and memory allocation
        ** failed. Try to evict some items!
        */
        tries = 50;

        /* If requested to not push old items out of cache when memory runs out,
         * we're out of luck at this point...
         */

        if (engine->config.evict_to_free == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        /*
         * try to get one off the right LRU
         * don't necessarily unlink the tail because it may be locked: refcount>0
         * search up from tail an item with refcount==0 and unlink it; give up after 50
         * tries
         */

        /* Nothing on this class's LRU at all, so nothing can be evicted. */
        if (engine->items.tails[id] == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
            if (search->refcount == 0) {
                if (search->exptime == 0 || search->exptime > current_time) {
                    /* Still-live item: this is a genuine eviction. */
                    engine->items.itemstats[id].evicted++;
                    engine->items.itemstats[id].evicted_time = current_time - search->time;
                    if (search->exptime != 0) {
                        engine->items.itemstats[id].evicted_nonzero++;
                    }
                    pthread_mutex_lock(&engine->stats.lock);
                    engine->stats.evictions++;
                    pthread_mutex_unlock(&engine->stats.lock);
                    engine->server.stat->evicting(cookie,
                                                  item_get_key(search),
                                                  search->nkey);
                } else {
                    /* Already expired: count it as reclaimed instead. */
                    engine->items.itemstats[id].reclaimed++;
                    pthread_mutex_lock(&engine->stats.lock);
                    engine->stats.reclaimed++;
                    pthread_mutex_unlock(&engine->stats.lock);
                }
                /* refcount is 0 here, so this unlink also frees the item. */
                do_item_unlink(engine, search);
                break;
            }
        }
        it = slabs_alloc(engine, ntotal, id);
        if (it == 0) {
            engine->items.itemstats[id].outofmemory++;
            /* Last ditch effort. There is a very rare bug which causes
             * refcount leaks. We've fixed most of them, but it still happens,
             * and it may happen in the future.
             * We can reasonably assume no item can stay locked for more than
             * three hours, so if we find one in the tail which is that old,
             * free it anyway.
             */
            tries = 50;
            for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
                if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
                    engine->items.itemstats[id].tailrepairs++;
                    /* Drop the stale references so the unlink frees it. */
                    search->refcount = 0;
                    do_item_unlink(engine, search);
                    break;
                }
            }
            it = slabs_alloc(engine, ntotal, id);
            if (it == 0) {
                return NULL;
            }
        }
    }

    assert(it->slabs_clsid == 0);

    it->slabs_clsid = id;

    assert(it != engine->items.heads[it->slabs_clsid]);

    /* Not on any LRU list or hash chain yet. */
    it->next = it->prev = it->h_next = 0;
    it->refcount = 1;     /* the caller will have a reference */
    DEBUG_REFCNT(it, '*');
    it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    it->flags = flags;
    memcpy((void*)item_get_key(it), key, nkey);
    it->exptime = exptime;
    return it;
}
278 #endif /* INNODB_MEMCACHED */
279 
/*
 * Hand an item's memory back to the slab allocator.
 *
 * The item must already be unlinked, must not be the head or tail of its
 * LRU list, and must have no outstanding references (all asserted).
 */
static void item_free(struct default_engine *engine, hash_item *it) {
    const size_t ntotal = ITEM_ntotal(engine, it);
    unsigned int clsid;

    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[it->slabs_clsid]);
    assert(it != engine->items.tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    /*
     * Remember the class for slabs_free(), then zero it in the item so
     * that the slab size changer can later tell the item is already free.
     */
    clsid = it->slabs_clsid;
    it->iflag |= ITEM_SLABBED;
    it->slabs_clsid = 0;
    DEBUG_REFCNT(it, 'F');

    slabs_free(engine, it, ntotal, clsid);
}
295 
/*
 * Insert an item at the front of the LRU list of its slab class. The item
 * becomes the new head (and also the tail if the list was empty) and the
 * class's item count is incremented.
 */
static void item_link_q(struct default_engine *engine, hash_item *it) {
    hash_item **head, **tail;

    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    tail = &engine->items.tails[it->slabs_clsid];
    head = &engine->items.heads[it->slabs_clsid];
    assert(it != *head);
    assert((*head && *tail) || (*head == 0 && *tail == 0));

    /* Splice in ahead of the current head. */
    it->next = *head;
    it->prev = NULL;
    if (*head != NULL) {
        (*head)->prev = it;
    }
    *head = it;

    /* The first element of an empty list is the tail as well. */
    if (*tail == NULL) {
        *tail = it;
    }

    engine->items.sizes[it->slabs_clsid] += 1;
}
313 
/*
 * Remove an item from the LRU list of its slab class, fixing up the list's
 * head/tail pointers and the neighbours' links, and decrement the class's
 * item count. The item's own next/prev fields are left untouched.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
    hash_item **head, **tail;
    hash_item *succ, *pred;

    assert(it->slabs_clsid < POWER_LARGEST);
    tail = &engine->items.tails[it->slabs_clsid];
    head = &engine->items.heads[it->slabs_clsid];

    /* Move the list ends off this item if it currently is one. */
    if (it == *head) {
        assert(it->prev == 0);
        *head = it->next;
    }
    if (it == *tail) {
        assert(it->next == 0);
        *tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    /* Join the two neighbours to each other. */
    succ = it->next;
    pred = it->prev;
    if (succ != NULL) {
        succ->prev = pred;
    }
    if (pred != NULL) {
        pred->next = succ;
    }

    engine->items.sizes[it->slabs_clsid] -= 1;
}
336 
/*
 * Publish an item: mark it linked, stamp it with the current time, insert
 * it into the hash table, account for it in the engine statistics, give it
 * a fresh CAS id and put it at the head of its LRU list.
 *
 * The item must be neither linked nor slabbed on entry (asserted).
 * Always returns 1.
 */
int do_item_link(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */

    it->time = engine->server.core->get_current_time();
    it->iflag |= ITEM_LINKED;

    assoc_insert(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 it);

    /* Global counters are protected by the stats lock. */
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.total_items++;
    engine->stats.curr_items++;
    engine->stats.curr_bytes += ITEM_ntotal(engine, it);
    pthread_mutex_unlock(&engine->stats.lock);

    /* Every link gets a new CAS id. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);

    return 1;
}
360 
/*
 * Withdraw an item from the cache: clear its linked flag, subtract it from
 * the engine statistics, and remove it from the hash table and its LRU
 * list. If nobody holds a reference the item is freed immediately.
 *
 * Does nothing (beyond the trace probe) when the item is not linked.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }
    it->iflag &= ~ITEM_LINKED;

    /* Global counters are protected by the stats lock. */
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_items -= 1;
    engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
    pthread_mutex_unlock(&engine->stats.lock);

    assoc_delete(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 item_get_key(it), it->nkey);
    item_unlink_q(engine, it);

    /* With outstanding references, do_item_release() frees it later. */
    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
378 
/*
 * Drop one reference to an item (a refcount that is already 0 stays 0).
 * Once the count is 0 and the item is no longer linked, its memory is
 * freed.
 */
void do_item_release(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);

    if (it->refcount != 0) {
        it->refcount--;
        DEBUG_REFCNT(it, '-');
    }

    /* Still referenced, or still reachable through the cache: keep it. */
    if (it->refcount != 0) {
        return;
    }
    if ((it->iflag & ITEM_LINKED) != 0) {
        return;
    }

    item_free(engine, it);
}
389 
/*
 * Refresh an item's position in its LRU list. A linked item whose `time`
 * is older than ITEM_UPDATE_INTERVAL seconds is moved to the head of the
 * list and re-stamped with the current time; anything more recent is left
 * alone to avoid churning on hot items.
 */
void do_item_update(struct default_engine *engine, hash_item *it) {
    rel_time_t current_time = engine->server.core->get_current_time();
    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);

    /* Repositioned recently enough: nothing to do. */
    if (!(it->time < current_time - ITEM_UPDATE_INTERVAL)) {
        return;
    }

    assert((it->iflag & ITEM_SLABBED) == 0);

    /* Only items that are on an LRU list can be moved. */
    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    item_unlink_q(engine, it);
    it->time = current_time;
    item_link_q(engine, it);
}
403 
/*
 * Swap one cached item for another: unlink `it`, then link `new_it`.
 * `it` must not be slabbed (asserted). Returns whatever do_item_link()
 * returns for new_it.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    int linked;

    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);
    linked = do_item_link(engine, new_it);

    return linked;
}
413 
/*
 * Dump the keys on one slab class's LRU list as text.
 *
 * As compiled today (FUTURE undefined) this is a stub: the arguments are
 * ignored and NULL is returned.
 *
 * With FUTURE defined, the body would walk the list from the head, emit one
 * "ITEM <key> [<n> b; <t> s]\r\n" line per item, stop after `limit` items
 * (0 meaning no limit) or when the 2MB buffer would overflow, append
 * "END\r\n", store the length in *bytes and return the malloc'ed buffer
 * (NULL if malloc fails).
 *
 * NOTE(review): the FUTURE body refers to `engine`, which is not a
 * parameter of this function, and to `process_started`, which is not
 * declared in the visible part of this file; it presumably needs rework
 * before it can be enabled.
 */
/*@null@*/
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer;
    unsigned int bufcurr;
    hash_item *it;
    unsigned int len;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == 0) return NULL;
    bufcurr = 0;


    while (it != NULL && (limit == 0 || shown < limit)) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes - 2,
                       (unsigned long)it->exptime + process_started);
        if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
            break;
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }


    /* Copies the terminating NUL too, but only counts the 5 visible bytes. */
    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#endif
    /* Stub path: silence unused-parameter warnings. */
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
}
463 
/*
 * Report the per-slab-class item statistics through add_statistics().
 * Only classes whose LRU list is non-empty are reported; each one emits
 * its item count, the `time` of its tail item (as "age"), and the
 * evicted / evicted_nonzero / evicted_time / outofmemory / tailrepairs /
 * reclaimed counters, all under the "items" prefix.
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c) {
    const char *prefix = "items";
    int cls;

    for (cls = 0; cls < POWER_LARGEST; cls++) {
        /* An empty LRU has no tail; skip the class entirely. */
        if (engine->items.tails[cls] == NULL) {
            continue;
        }

        add_statistics(c, add_stats, prefix, cls, "number", "%u",
                       engine->items.sizes[cls]);
        add_statistics(c, add_stats, prefix, cls, "age", "%u",
                       engine->items.tails[cls]->time);
        add_statistics(c, add_stats, prefix, cls, "evicted", "%u",
                       engine->items.itemstats[cls].evicted);
        add_statistics(c, add_stats, prefix, cls, "evicted_nonzero", "%u",
                       engine->items.itemstats[cls].evicted_nonzero);
        add_statistics(c, add_stats, prefix, cls, "evicted_time", "%u",
                       engine->items.itemstats[cls].evicted_time);
        add_statistics(c, add_stats, prefix, cls, "outofmemory", "%u",
                       engine->items.itemstats[cls].outofmemory);
        add_statistics(c, add_stats, prefix, cls, "tailrepairs", "%u",
                       engine->items.itemstats[cls].tailrepairs);
        add_statistics(c, add_stats, prefix, cls, "reclaimed", "%u",
                       engine->items.itemstats[cls].reclaimed);
    }
}
489 
490 /** dumps out a list of objects of each size, with granularity of 32 bytes */
491 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)492 static void do_item_stats_sizes(struct default_engine *engine,
493                                 ADD_STAT add_stats, const void *c) {
494 
495     /* max 1MB object, divided into 32 bytes size buckets */
496     const int num_buckets = 32768;
497     unsigned int *histogram = calloc(num_buckets, sizeof(int));
498 
499     if (histogram != NULL) {
500         int i;
501 
502         /* build the histogram */
503         for (i = 0; i < POWER_LARGEST; i++) {
504             hash_item *iter = engine->items.heads[i];
505             while (iter) {
506                 int ntotal = ITEM_ntotal(engine, iter);
507                 int bucket = ntotal / 32;
508                 if ((ntotal % 32) != 0) bucket++;
509                 if (bucket < num_buckets) histogram[bucket]++;
510                 iter = iter->next;
511             }
512         }
513 
514         /* write the buffer */
515         for (i = 0; i < num_buckets; i++) {
516             if (histogram[i] != 0) {
517                 char key[8], val[32];
518                 int klen, vlen;
519                 klen = snprintf(key, sizeof(key), "%d", i * 32);
520                 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
521                 assert(klen < sizeof(key));
522                 assert(vlen < sizeof(val));
523                 add_stats(key, klen, val, vlen, c);
524             }
525         }
526         free(histogram);
527     }
528 }
529 
530 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)531 hash_item *do_item_get(struct default_engine *engine,
532                        const char *key, const size_t nkey) {
533     rel_time_t current_time = engine->server.core->get_current_time();
534     hash_item *it = assoc_find(engine, engine->server.core->hash(key,
535                                                                       nkey, 0),
536                                key, nkey);
537     int was_found = 0;
538 
539     if (engine->config.verbose > 2) {
540         if (it == NULL) {
541             fprintf(stderr, "> NOT FOUND %s", key);
542         } else {
543             fprintf(stderr, "> FOUND KEY %s", (const char*)item_get_key(it));
544             was_found++;
545         }
546     }
547 
548     if (it != NULL && engine->config.oldest_live != 0 &&
549         engine->config.oldest_live <= current_time &&
550         it->time <= engine->config.oldest_live) {
551         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
552         it = NULL;
553     }
554 
555     if (it == NULL && was_found) {
556         fprintf(stderr, " -nuked by flush");
557         was_found--;
558     }
559 
560     if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
561         do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
562         it = NULL;
563     }
564 
565     if (it == NULL && was_found) {
566         fprintf(stderr, " -nuked by expire");
567         was_found--;
568     }
569 
570     if (it != NULL) {
571         it->refcount++;
572         DEBUG_REFCNT(it, '+');
573         do_item_update(engine, it);
574     }
575 
576     if (engine->config.verbose > 2)
577         fprintf(stderr, "\n");
578 
579     return it;
580 }
581 
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. In threaded mode, this is protected by the cache lock.
 *
 * engine     engine instance
 * it         the new item, already allocated and filled in by the caller
 * cas        receives the CAS id of the stored item on success
 * operation  which store command is being executed
 * cookie     passed through to do_item_alloc() for append/prepend
 *
 * Returns the state of storage:
 *   ENGINE_SUCCESS      the item (or, for append/prepend, the combined
 *                       item) is now linked into the cache
 *   ENGINE_NOT_STORED   ADD on an existing key; REPLACE/APPEND/PREPEND on a
 *                       missing key; or the combined item for
 *                       append/prepend could not be allocated
 *   ENGINE_KEY_ENOENT   CAS on a missing key
 *   ENGINE_KEY_EEXISTS  CAS id mismatch (CAS, or APPEND/PREPEND carrying a
 *                       non-zero CAS id)
 */
static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
                                       hash_item *it, uint64_t *cas,
                                       ENGINE_STORE_OPERATION operation,
                                       const void *cookie) {
    const char *key = item_get_key(it);
    /* do_item_get() takes a reference on old_it; released before returning. */
    hash_item *old_it = do_item_get(engine, key, it->nkey);
    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;

    /* Combined item built for append/prepend; NULL for every other path. */
    hash_item *new_it = NULL;

    if (old_it != NULL && operation == OPERATION_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
        do_item_update(engine, old_it);
    } else if (!old_it && (operation == OPERATION_REPLACE
        || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (operation == OPERATION_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            // LRU expired
            stored = ENGINE_KEY_ENOENT;
        }
        else if (item_get_cas(it) == item_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            do_item_replace(engine, old_it, it);
            stored = ENGINE_SUCCESS;
        } else {
            if (engine->config.verbose > 1) {
                fprintf(stderr,
                        "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
                        item_get_cas(old_it),
                        item_get_cas(it));
            }
            stored = ENGINE_KEY_EEXISTS;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */
        if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
            /*
             * Validate CAS
             */
            /* old_it is non-NULL here: the missing-key case was handled above. */
            if (item_get_cas(it) != 0) {
                // CAS must be equal
                if (item_get_cas(it) != item_get_cas(old_it)) {
                    stored = ENGINE_KEY_EEXISTS;
                }
            }

            if (stored == ENGINE_NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                /* Both values end in CRLF; the combined value keeps only one. */
                new_it = do_item_alloc(engine, key, it->nkey,
                                       old_it->flags,
                                       old_it->exptime,
                                       it->nbytes + old_it->nbytes - 2 /* CRLF */,
                                       cookie);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    if (old_it != NULL) {
                        do_item_release(engine, old_it);
                    }

                    return ENGINE_NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                /* The second copy starts 2 bytes early so that it overwrites
                 * the first value's trailing CRLF.
                 */
                if (operation == OPERATION_APPEND) {
                    memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
                    memcpy(item_get_data(new_it) + old_it->nbytes - 2 /* CRLF */, item_get_data(it), it->nbytes);
                } else {
                    /* OPERATION_PREPEND */
                    memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
                    memcpy(item_get_data(new_it) + it->nbytes - 2 /* CRLF */, item_get_data(old_it), old_it->nbytes);
                }

                /* From here on the combined item is the one being stored. */
                it = new_it;
            }
        }

        if (stored == ENGINE_NOT_STORED) {
            if (old_it != NULL) {
                do_item_replace(engine, old_it, it);
            } else {
                do_item_link(engine, it);
            }

            *cas = item_get_cas(it);
            stored = ENGINE_SUCCESS;
        }
    }

    if (old_it != NULL) {
        do_item_release(engine, old_it);         /* release our reference */
    }

    /* Drop the allocation reference on the combined item, if one was built. */
    if (new_it != NULL) {
        do_item_release(engine, new_it);
    }

    /* Covers the CAS path too, which does not set *cas above. */
    if (stored == ENGINE_SUCCESS) {
        *cas = item_get_cas(it);
    }

    return stored;
}
700 
701 
702 /*
703  * adds a delta value to a numeric item.
704  *
705  * c     connection requesting the operation
706  * it    item to adjust
707  * incr  true to increment value, false to decrement
708  * delta amount to adjust value by
709  * buf   buffer for response string
710  *
711  * returns a response string to send back to the client.
712  */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)713 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
714                                       hash_item *it, const bool incr,
715                                       const int64_t delta, uint64_t *rcas,
716                                       uint64_t *result, const void *cookie) {
717     const char *ptr;
718     uint64_t value;
719     int res;
720 
721     ptr = item_get_data(it);
722 
723     if (!safe_strtoull(ptr, &value)) {
724         return ENGINE_EINVAL;
725     }
726 
727     if (incr) {
728         value += delta;
729     } else {
730         if(delta > value) {
731             value = 0;
732         } else {
733             value -= delta;
734         }
735     }
736 
737     *result = value;
738     char buf[80];
739     if ((res = snprintf(buf, sizeof(buf), "%" PRIu64 "\r\n", value)) == -1) {
740         return ENGINE_EINVAL;
741     }
742     hash_item *new_it = do_item_alloc(engine, item_get_key(it),
743                                       it->nkey, it->flags,
744                                       it->exptime, res,
745                                       cookie );
746     if (new_it == 0) {
747         do_item_unlink(engine, it);
748         return ENGINE_ENOMEM;
749     }
750     memcpy(item_get_data(new_it), buf, res);
751     do_item_replace(engine, it, new_it);
752     *rcas = item_get_cas(new_it);
753     do_item_release(engine, new_it);       /* release our reference */
754 
755     return ENGINE_SUCCESS;
756 }
757 
758 /********************************* ITEM ACCESS *******************************/
759 #ifdef INNODB_MEMCACHED
760 
761 /*
762  * Allocates a new item.
763  */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)764 hash_item *item_alloc(struct default_engine *engine,
765                       const void *key, size_t nkey, int flags,
766                       rel_time_t exptime, int nbytes, const void *cookie) {
767     hash_item *it;
768     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
769     return it;
770 }
#else /* INNODB_MEMCACHED */
772 /*
773  * Allocates a new item.
774  */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)775 hash_item *item_alloc(struct default_engine *engine,
776                       const void *key, size_t nkey, int flags,
777                       rel_time_t exptime, int nbytes, const void *cookie) {
778     hash_item *it;
779     pthread_mutex_lock(&engine->cache_lock);
780     it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
781     pthread_mutex_unlock(&engine->cache_lock);
782     return it;
783 }
#endif /* INNODB_MEMCACHED */
785 
786 /*
787  * Returns an item if it hasn't been marked as expired,
788  * lazy-expiring as needed.
789  */
item_get(struct default_engine * engine,const void * key,const size_t nkey)790 hash_item *item_get(struct default_engine *engine,
791                     const void *key, const size_t nkey) {
792     hash_item *it;
793     pthread_mutex_lock(&engine->cache_lock);
794     it = do_item_get(engine, key, nkey);
795     pthread_mutex_unlock(&engine->cache_lock);
796     return it;
797 }
798 
799 /*
800  * Decrements the reference count on an item and adds it to the freelist if
801  * needed.
802  */
item_release(struct default_engine * engine,hash_item * item)803 void item_release(struct default_engine *engine, hash_item *item) {
804     pthread_mutex_lock(&engine->cache_lock);
805     do_item_release(engine, item);
806     pthread_mutex_unlock(&engine->cache_lock);
807 }
808 
809 /*
810  * Unlinks an item from the LRU and hashtable.
811  */
item_unlink(struct default_engine * engine,hash_item * item)812 void item_unlink(struct default_engine *engine, hash_item *item) {
813     pthread_mutex_lock(&engine->cache_lock);
814     do_item_unlink(engine, item);
815     pthread_mutex_unlock(&engine->cache_lock);
816 }
817 
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)818 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
819                                        const void* cookie,
820                                        const void* key,
821                                        const int nkey,
822                                        const bool increment,
823                                        const bool create,
824                                        const uint64_t delta,
825                                        const uint64_t initial,
826                                        const rel_time_t exptime,
827                                        uint64_t *cas,
828                                        uint64_t *result)
829 {
830    hash_item *item = do_item_get(engine, key, nkey);
831    ENGINE_ERROR_CODE ret;
832 
833    if (item == NULL) {
834       if (!create) {
835          return ENGINE_KEY_ENOENT;
836       } else {
837          char buffer[128];
838          int len = snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n",
839                             (uint64_t)initial);
840 
841          item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
842          if (item == NULL) {
843             return ENGINE_ENOMEM;
844          }
845          memcpy((void*)item_get_data(item), buffer, len);
846          if ((ret = do_store_item(engine, item, cas,
847                                   OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
848              *result = initial;
849              *cas = item_get_cas(item);
850          }
851          do_item_release(engine, item);
852       }
853    } else {
854       ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
855       do_item_release(engine, item);
856    }
857 
858    return ret;
859 }
860 
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)861 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
862                              const void* cookie,
863                              const void* key,
864                              const int nkey,
865                              const bool increment,
866                              const bool create,
867                              const uint64_t delta,
868                              const uint64_t initial,
869                              const rel_time_t exptime,
870                              uint64_t *cas,
871                              uint64_t *result)
872 {
873     ENGINE_ERROR_CODE ret;
874 
875     pthread_mutex_lock(&engine->cache_lock);
876     ret = do_arithmetic(engine, cookie, key, nkey, increment,
877                         create, delta, initial, exptime, cas,
878                         result);
879     pthread_mutex_unlock(&engine->cache_lock);
880     return ret;
881 }
882 
883 /*
884  * Stores an item in the cache (high level, obeys set/add/replace semantics)
885  */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)886 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
887                              hash_item *item, uint64_t *cas,
888                              ENGINE_STORE_OPERATION operation,
889                              const void *cookie) {
890     ENGINE_ERROR_CODE ret;
891 
892     pthread_mutex_lock(&engine->cache_lock);
893     ret = do_store_item(engine, item, cas, operation, cookie);
894     pthread_mutex_unlock(&engine->cache_lock);
895     return ret;
896 }
897 
898 /*
899  * Flushes expired items after a flush_all call
900  */
item_flush_expired(struct default_engine * engine,time_t when)901 void item_flush_expired(struct default_engine *engine, time_t when) {
902     int i;
903     hash_item *iter, *next;
904 
905     pthread_mutex_lock(&engine->cache_lock);
906 
907     if (when == 0) {
908         engine->config.oldest_live = engine->server.core->get_current_time() - 1;
909     } else {
910         engine->config.oldest_live = engine->server.core->realtime(when) - 1;
911     }
912 
913     if (engine->config.oldest_live != 0) {
914         for (i = 0; i < POWER_LARGEST; i++) {
915             /*
916              * The LRU is sorted in decreasing time order, and an item's
917              * timestamp is never newer than its last access time, so we
918              * only need to walk back until we hit an item older than the
919              * oldest_live time.
920              * The oldest_live checking will auto-expire the remaining items.
921              */
922             for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
923                 if (iter->time >= engine->config.oldest_live) {
924                     next = iter->next;
925                     if ((iter->iflag & ITEM_SLABBED) == 0) {
926                         do_item_unlink(engine, iter);
927                     }
928                 } else {
929                     /* We've hit the first old item. Continue to the next queue. */
930                     break;
931                 }
932             }
933         }
934     }
935     pthread_mutex_unlock(&engine->cache_lock);
936 }
937 
938 /*
939  * Dumps part of the cache
940  */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)941 char *item_cachedump(struct default_engine *engine,
942                      unsigned int slabs_clsid,
943                      unsigned int limit,
944                      unsigned int *bytes) {
945     char *ret;
946 
947     pthread_mutex_lock(&engine->cache_lock);
948     ret = do_item_cachedump(slabs_clsid, limit, bytes);
949     pthread_mutex_unlock(&engine->cache_lock);
950     return ret;
951 }
952 
/*
 * Locked front end for do_item_stats(): holds engine->cache_lock while the
 * statistics are reported through add_stat.
 */
void item_stats(struct default_engine *engine,
                ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
960 
961 
/*
 * Locked front end for do_item_stats_sizes(): holds engine->cache_lock
 * while the statistics are reported through add_stat.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
969 
/*
 * Appends `cursor` to the tail of the LRU queue for slab class `ii` and
 * bumps that queue's size counter.
 *
 * The current tail is dereferenced unconditionally, so the queue must not
 * be empty when this is called.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *const old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->prev = old_tail;
    cursor->next = NULL;

    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;

    engine->items.sizes[ii]++;
}
980 
981 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
982                                       hash_item *item, void *cookie);
983 
do_item_walk_cursor(struct default_engine * engine,hash_item * cursor,int steplength,ITERFUNC itemfunc,void * itemdata,ENGINE_ERROR_CODE * error)984 static bool do_item_walk_cursor(struct default_engine *engine,
985                                 hash_item *cursor,
986                                 int steplength,
987                                 ITERFUNC itemfunc,
988                                 void* itemdata,
989                                 ENGINE_ERROR_CODE *error)
990 {
991     int ii = 0;
992     *error = ENGINE_SUCCESS;
993 
994     while (cursor->prev != NULL && ii < steplength) {
995         ++ii;
996         /* Move cursor */
997         hash_item *ptr = cursor->prev;
998         item_unlink_q(engine, cursor);
999 
1000         bool done = false;
1001 
1002         if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1003             done = true;
1004         } else {
1005             cursor->next = ptr;
1006             cursor->prev = ptr->prev;
1007             cursor->prev->next = cursor;
1008             ptr->prev = cursor;
1009         }
1010 
1011         /* Ignore cursors */
1012         if (!(ptr->nkey == 0 && ptr->nbytes == 0)) {
1013             *error = itemfunc(engine, ptr, itemdata);
1014             if (*error != ENGINE_SUCCESS) {
1015                 return false;
1016             }
1017         }
1018 
1019         if (done) {
1020             return false;
1021         }
1022    }
1023 
1024     return true;
1025 }
1026 
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1027 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1028                                     hash_item *item,
1029                                     void *cookie) {
1030     (void)cookie;
1031     engine->scrubber.visited++;
1032     rel_time_t current_time = engine->server.core->get_current_time();
1033     if (item->refcount == 0 &&
1034         (item->exptime != 0 && item->exptime < current_time)) {
1035         do_item_unlink(engine, item);
1036         engine->scrubber.cleaned++;
1037     }
1038     return ENGINE_SUCCESS;
1039 }
1040 
/*
 * Scrubs the LRU queue that `cursor` is linked into.
 *
 * The queue is processed in batches of 200 steps; engine->cache_lock is
 * taken for each batch and released in between. Stops when the walk
 * reports an error or that there is nothing more to do.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor)
{
    for (;;) {
        ENGINE_ERROR_CODE status;
        bool keep_going;

        pthread_mutex_lock(&engine->cache_lock);
        keep_going = do_item_walk_cursor(engine, cursor, 200, item_scrub,
                                         NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !keep_going) {
            break;
        }
    }
}
1055 
item_scubber_main(void * arg)1056 static void *item_scubber_main(void *arg)
1057 {
1058     struct default_engine *engine = arg;
1059     hash_item cursor = { .refcount = 1 };
1060 
1061     for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1062         pthread_mutex_lock(&engine->cache_lock);
1063         bool skip = false;
1064         if (engine->items.heads[ii] == NULL) {
1065             skip = true;
1066         } else {
1067             // add the item at the tail
1068             do_item_link_cursor(engine, &cursor, ii);
1069         }
1070         pthread_mutex_unlock(&engine->cache_lock);
1071 
1072         if (!skip) {
1073             item_scrub_class(engine, &cursor);
1074         }
1075     }
1076 
1077     pthread_mutex_lock(&engine->scrubber.lock);
1078     engine->scrubber.stopped = time(NULL);
1079     engine->scrubber.running = false;
1080     pthread_mutex_unlock(&engine->scrubber.lock);
1081 
1082     return NULL;
1083 }
1084 
item_start_scrub(struct default_engine * engine)1085 bool item_start_scrub(struct default_engine *engine)
1086 {
1087     bool ret = false;
1088     pthread_mutex_lock(&engine->scrubber.lock);
1089     if (!engine->scrubber.running) {
1090         engine->scrubber.started = time(NULL);
1091         engine->scrubber.stopped = 0;
1092         engine->scrubber.visited = 0;
1093         engine->scrubber.cleaned = 0;
1094         engine->scrubber.running = true;
1095 
1096         pthread_t t;
1097         pthread_attr_t attr;
1098 
1099         if (pthread_attr_init(&attr) != 0 ||
1100             pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1101             pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1102         {
1103             engine->scrubber.running = false;
1104         } else {
1105             ret = true;
1106         }
1107     }
1108     pthread_mutex_unlock(&engine->scrubber.lock);
1109 
1110     return ret;
1111 }
1112