1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Modifications Copyright (c) 2013, 2021, Oracle and/or its affiliates.
4 * All rights reserved.
5 */
6 #include "config.h"
7 #include <fcntl.h>
8 #include <errno.h>
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <string.h>
12 #include <time.h>
13 #include <assert.h>
14 #include <inttypes.h>
15
16 #include "default_engine.h"
17
/*
 * Select the InnoDB memcached variant of this file: its do_item_alloc()
 * never reclaims expired items or evicts from the LRU, and its item_alloc()
 * does not take cache_lock (see the #ifdef INNODB_MEMCACHED sections).
 */
#define INNODB_MEMCACHED
/* Forward Declarations */
static void item_link_q(struct default_engine *engine, hash_item *it);
static void item_unlink_q(struct default_engine *engine, hash_item *it);
static hash_item *do_item_alloc(struct default_engine *engine,
                                const void *key, const size_t nkey,
                                const int flags, const rel_time_t exptime,
                                const int nbytes,
                                const void *cookie);
static hash_item *do_item_get(struct default_engine *engine,
                              const char *key, const size_t nkey);
static int do_item_link(struct default_engine *engine, hash_item *it);
static void do_item_unlink(struct default_engine *engine, hash_item *it);
static void do_item_release(struct default_engine *engine, hash_item *it);
static void do_item_update(struct default_engine *engine, hash_item *it);
static int do_item_replace(struct default_engine *engine,
                            hash_item *it, hash_item *new_it);
static void item_free(struct default_engine *engine, hash_item *it);

/*
 * We only reposition items in the LRU queue if they haven't been repositioned
 * in this many seconds. That saves us from churning on frequently-accessed
 * items.
 */
#define ITEM_UPDATE_INTERVAL 60

/* Seeds the CAS counter below; the next id handed out is cas + 1. */
void cache_set_initial_cas_id(uint64_t cas);
/*
 * Last CAS id handed out; get_cas_id() pre-increments it. It is a plain,
 * non-atomic counter. NOTE(review): it appears to be updated only under
 * cache_lock (via do_item_link) -- confirm before calling it elsewhere.
 */
static uint64_t cas_id;
46
item_stats_reset(struct default_engine * engine)47 void item_stats_reset(struct default_engine *engine) {
48 pthread_mutex_lock(&engine->cache_lock);
49 memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
50 pthread_mutex_unlock(&engine->cache_lock);
51 }
52
53
54 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)55 static inline size_t ITEM_ntotal(struct default_engine *engine,
56 const hash_item *item) {
57 size_t ret = sizeof(*item) + item->nkey + item->nbytes;
58 if (engine->config.use_cas) {
59 ret += sizeof(uint64_t);
60 }
61
62 return ret;
63 }
64
65 /* Set the initial CAS id for the hash table */
cache_set_initial_cas_id(uint64_t cas)66 void cache_set_initial_cas_id(uint64_t cas) {
67 cas_id = cas;
68 }
69
70 /* Get the next CAS id for a new item. */
get_cas_id(void)71 static uint64_t get_cas_id(void) {
72 return ++cas_id;
73 }
74
/*
 * Reference-count tracing hook. Change "#if 0" to "#if 1" to print every
 * refcount transition to stderr.
 *
 * it: the hash_item being traced.
 * op: one-character tag for the operation; this file uses '*' (alloc),
 *     '+' (get), '-' (release) and 'F' (free).
 *
 * The traced line shows the item address, the tag, the refcount and two
 * flag columns: 'L' when ITEM_LINKED is set, 'S' when ITEM_SLABBED is set.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
/* Disabled: expands to an empty statement and evaluates neither argument. */
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
85
86
87 #ifdef INNODB_MEMCACHED /* INNODB_MEMCACHED */
88 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)89 hash_item *do_item_alloc(struct default_engine *engine,
90 const void *key,
91 const size_t nkey,
92 const int flags,
93 const rel_time_t exptime,
94 const int nbytes,
95 const void *cookie) {
96 hash_item *it = NULL;
97 // Avoid potential underflows.
98 if (nbytes < 0)
99 return 0;
100
101 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
102 if (engine->config.use_cas) {
103 ntotal += sizeof(uint64_t);
104 }
105
106 unsigned int id = slabs_clsid(engine, ntotal);
107 if (id == 0)
108 return 0;
109
110 if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
111 return NULL;
112 }
113
114 assert(it->slabs_clsid == 0);
115
116 it->slabs_clsid = id;
117
118 assert(it != engine->items.heads[it->slabs_clsid]);
119
120 it->next = it->prev = it->h_next = 0;
121 it->refcount = 1; /* the caller will have a reference */
122 DEBUG_REFCNT(it, '*');
123 it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
124 it->nkey = nkey;
125 it->nbytes = nbytes;
126 it->flags = flags;
127 memcpy((void*)item_get_key(it), key, nkey);
128 it->exptime = exptime;
129 return it;
130 }
131
132 #else /* INNODB_MEMCACHED */
/*@null@*/
/*
 * Allocate and initialise a new, unlinked item (stock engine build; this
 * definition is compiled only when INNODB_MEMCACHED is not defined).
 *
 * Strategy, in order:
 *   1. Reclaim: scan up to 50 items from the tail of class `id`'s LRU for
 *      one with refcount 0 that has already expired, unlink it without
 *      freeing it, and reuse its chunk directly.
 *   2. Otherwise ask the slab allocator for a fresh chunk.
 *   3. If that fails and evict_to_free is enabled, unlink (and thereby free)
 *      the first refcount-0 item among the 50 at the tail. This counts as an
 *      eviction if the item had not expired and as a reclaim if it had.
 *      Then retry the allocation.
 *   4. If that still fails, forcibly unlink the first tail item that is
 *      still referenced but whose time is older than TAIL_REPAIR_TIME
 *      (assumed to be a leaked reference), then retry once more.
 *
 * cookie is passed to the server's evicting() stat callback in step 3.
 *
 * Returns the item with refcount 1 (owned by the caller), or NULL if nbytes
 * is negative, no slab class fits, or every step above fails.
 */
hash_item *do_item_alloc(struct default_engine *engine,
                         const void *key,
                         const size_t nkey,
                         const int flags,
                         const rel_time_t exptime,
                         const int nbytes,
                         const void *cookie) {
    hash_item *it = NULL;
    // Avoid potential underflows.
    if (nbytes < 0)
        return 0;

    size_t ntotal = sizeof(hash_item) + nkey + nbytes;
    if (engine->config.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    unsigned int id = slabs_clsid(engine, ntotal);
    if (id == 0)
        return 0;

    /* do a quick check if we have any expired items in the tail.. */
    int tries = 50;
    hash_item *search;

    rel_time_t current_time = engine->server.core->get_current_time();

    for (search = engine->items.tails[id];
         tries > 0 && search != NULL;
         tries--, search=search->prev) {
        if (search->refcount == 0 &&
            (search->exptime != 0 && search->exptime < current_time)) {
            it = search;
            /* I don't want to actually free the object, just steal
             * the item to avoid to grab the slab mutex twice ;-)
             */
            pthread_mutex_lock(&engine->stats.lock);
            engine->stats.reclaimed++;
            pthread_mutex_unlock(&engine->stats.lock);
            engine->items.itemstats[id].reclaimed++;
            /* Temporary refcount 1 keeps do_item_unlink() from freeing it. */
            it->refcount = 1;
            do_item_unlink(engine, it);
            /* Initialize the item block: */
            it->slabs_clsid = 0;
            it->refcount = 0;
            break;
        }
    }

    if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
        /*
        ** Could not find an expired item at the tail, and memory allocation
        ** failed. Try to evict some items!
        */
        tries = 50;

        /* If requested to not push old items out of cache when memory runs out,
         * we're out of luck at this point...
         */

        if (engine->config.evict_to_free == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        /*
         * try to get one off the right LRU
         * don't necessarily unlink the tail because it may be locked: refcount>0
         * search up from tail an item with refcount==0 and unlink it; give up after 50
         * tries
         */

        if (engine->items.tails[id] == 0) {
            engine->items.itemstats[id].outofmemory++;
            return NULL;
        }

        for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
            if (search->refcount == 0) {
                /* Still live (no expiry, or not yet expired): a real eviction. */
                if (search->exptime == 0 || search->exptime > current_time) {
                    engine->items.itemstats[id].evicted++;
                    engine->items.itemstats[id].evicted_time = current_time - search->time;
                    if (search->exptime != 0) {
                        engine->items.itemstats[id].evicted_nonzero++;
                    }
                    pthread_mutex_lock(&engine->stats.lock);
                    engine->stats.evictions++;
                    pthread_mutex_unlock(&engine->stats.lock);
                    engine->server.stat->evicting(cookie,
                                                  item_get_key(search),
                                                  search->nkey);
                } else {
                    /* Already expired: account it as a reclaim instead. */
                    engine->items.itemstats[id].reclaimed++;
                    pthread_mutex_lock(&engine->stats.lock);
                    engine->stats.reclaimed++;
                    pthread_mutex_unlock(&engine->stats.lock);
                }
                /* refcount is 0, so this also returns the chunk to the slab. */
                do_item_unlink(engine, search);
                break;
            }
        }
        it = slabs_alloc(engine, ntotal, id);
        if (it == 0) {
            engine->items.itemstats[id].outofmemory++;
            /* Last ditch effort. There is a very rare bug which causes
             * refcount leaks. We've fixed most of them, but it still happens,
             * and it may happen in the future.
             * We can reasonably assume no item can stay locked for more than
             * three hours, so if we find one in the tail which is that old,
             * free it anyway.
             */
            tries = 50;
            for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
                if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
                    engine->items.itemstats[id].tailrepairs++;
                    /* Drop the presumed-leaked references so unlink frees it. */
                    search->refcount = 0;
                    do_item_unlink(engine, search);
                    break;
                }
            }
            it = slabs_alloc(engine, ntotal, id);
            if (it == 0) {
                return NULL;
            }
        }
    }

    assert(it->slabs_clsid == 0);

    it->slabs_clsid = id;

    assert(it != engine->items.heads[it->slabs_clsid]);

    it->next = it->prev = it->h_next = 0;
    it->refcount = 1;     /* the caller will have a reference */
    DEBUG_REFCNT(it, '*');
    it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    it->flags = flags;
    memcpy((void*)item_get_key(it), key, nkey);
    it->exptime = exptime;
    return it;
}
278 #endif /* INNODB_MEMCACHED */
279
/*
 * Hand an unlinked, unreferenced item back to the slab allocator.
 *
 * Before the chunk is released, slabs_clsid is cleared and ITEM_SLABBED is
 * set. This lets later code (e.g. a slab size changer) tell that the chunk
 * is already free.
 */
static void item_free(struct default_engine *engine, hash_item *it) {
    const size_t chunk_size = ITEM_ntotal(engine, it);
    const unsigned int owner_class = it->slabs_clsid;

    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[owner_class]);
    assert(it != engine->items.tails[owner_class]);
    assert(it->refcount == 0);

    it->slabs_clsid = 0;
    it->iflag |= ITEM_SLABBED;
    DEBUG_REFCNT(it, 'F');
    slabs_free(engine, it, chunk_size, owner_class);
}
295
/*
 * Push an item onto the head of its slab class's LRU list and bump that
 * class's item count. The item must not already be the head, and the
 * class's head/tail pointers must be either both set or both NULL.
 */
static void item_link_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    const unsigned int cls = it->slabs_clsid;
    hash_item *old_head = engine->items.heads[cls];

    assert(it != old_head);
    assert((old_head && engine->items.tails[cls]) ||
           (old_head == 0 && engine->items.tails[cls] == 0));

    it->prev = NULL;
    it->next = old_head;
    if (old_head != NULL) {
        old_head->prev = it;
    }

    engine->items.heads[cls] = it;
    if (engine->items.tails[cls] == NULL) {
        engine->items.tails[cls] = it; /* list was empty */
    }
    engine->items.sizes[cls]++;
}
313
/*
 * Detach an item from its slab class's LRU list, fixing up the class's
 * head/tail pointers and decrementing its item count. The item's own
 * next/prev fields are left as they were.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);

    const unsigned int cls = it->slabs_clsid;
    hash_item *before = it->prev;
    hash_item *after = it->next;

    if (engine->items.heads[cls] == it) {
        assert(before == 0);
        engine->items.heads[cls] = after;
    }
    if (engine->items.tails[cls] == it) {
        assert(after == 0);
        engine->items.tails[cls] = before;
    }
    assert(after != it);
    assert(before != it);

    if (after != NULL) {
        after->prev = before;
    }
    if (before != NULL) {
        before->next = after;
    }
    engine->items.sizes[cls]--;
}
336
/*
 * Publish a freshly allocated item. In order, this:
 *   - marks it ITEM_LINKED and stamps its time;
 *   - inserts it into the hash table;
 *   - adds it to the global byte/item counters;
 *   - assigns it a new CAS id;
 *   - puts it at the head of its LRU list.
 * The item must be neither linked nor slabbed. Always returns 1.
 */
int do_item_link(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED | ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024)); /* 1MB max size */

    it->iflag |= ITEM_LINKED;
    it->time = engine->server.core->get_current_time();

    const char *key = item_get_key(it);
    assoc_insert(engine,
                 engine->server.core->hash(key, it->nkey, 0),
                 it);

    const size_t item_bytes = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes += item_bytes;
    engine->stats.curr_items += 1;
    engine->stats.total_items += 1;
    pthread_mutex_unlock(&engine->stats.lock);

    /* Every link gets a brand-new CAS id. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);

    const int linked = 1;
    return linked;
}
360
/*
 * Remove an item from the hash table and its LRU list and subtract it from
 * the global counters. If nobody holds a reference, its memory is freed
 * right away; otherwise the last do_item_release() frees it. Calling this
 * on an item that is not linked does nothing.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return; /* already unlinked */
    }

    it->iflag &= ~ITEM_LINKED;

    const size_t item_bytes = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes -= item_bytes;
    engine->stats.curr_items -= 1;
    pthread_mutex_unlock(&engine->stats.lock);

    const char *key = item_get_key(it);
    assoc_delete(engine,
                 engine->server.core->hash(key, it->nkey, 0),
                 key, it->nkey);
    item_unlink_q(engine, it);

    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
378
/*
 * Drop one reference to an item (when it has any). An item that ends up
 * unreferenced and is no longer linked is returned to the slab allocator.
 */
void do_item_release(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);

    if (it->refcount != 0) {
        it->refcount--;
        DEBUG_REFCNT(it, '-');
    }

    const bool unreferenced = (it->refcount == 0);
    const bool still_linked = (it->iflag & ITEM_LINKED) != 0;
    if (unreferenced && !still_linked) {
        item_free(engine, it);
    }
}
389
/*
 * Move an item to the head of its LRU list on access, refreshing it->time.
 * This happens at most once per ITEM_UPDATE_INTERVAL so that hot items do
 * not churn the list. Items that are not linked are left alone.
 */
void do_item_update(struct default_engine *engine, hash_item *it) {
    rel_time_t current_time = engine->server.core->get_current_time();
    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);

    /*
     * Compare as "time + interval < now" rather than "time < now - interval".
     * If rel_time_t is an unsigned type at least as wide as int
     * (NOTE(review): believed to be uint32_t -- confirm), the subtraction
     * wraps while now < ITEM_UPDATE_INTERVAL, and then every access would
     * needlessly relink the item.
     */
    if (it->time + ITEM_UPDATE_INTERVAL < current_time) {
        assert((it->iflag & ITEM_SLABBED) == 0);

        if ((it->iflag & ITEM_LINKED) != 0) {
            item_unlink_q(engine, it);
            it->time = current_time;
            item_link_q(engine, it);
        }
    }
}
403
/*
 * Swap `it` out of the cache for `new_it`: unlink the old item, then link
 * the new one (which also assigns it a fresh CAS id). Returns
 * do_item_link()'s result.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);
    const int link_result = do_item_link(engine, new_it);
    return link_result;
}
413
/*@null@*/
/*
 * Build a text dump of up to `limit` items (0 = no limit) from one slab
 * class. Each item becomes an "ITEM <key> [<n> b; <t> s]\r\n" line, and the
 * dump ends with "END\r\n".
 *
 * The real implementation is compiled only when FUTURE is defined. Even
 * then it refers to `engine`, which is not a parameter of this function, so
 * the signature would need to change before it could be enabled. On
 * success the caller owns the returned malloc()ed buffer, and *bytes holds
 * its length excluding the trailing NUL.
 *
 * In the normal build this is a stub: it returns NULL and, when `bytes` is
 * non-NULL, stores 0 there so callers never read an unset length.
 */
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer;
    unsigned int bufcurr;
    hash_item *it;
    unsigned int len;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == 0) return NULL;
    bufcurr = 0;

    while (it != NULL && (limit == 0 || shown < limit)) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        memcpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        int n = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                         key_temp, it->nbytes - 2,
                         (unsigned long)it->exptime + process_started);
        if (n < 0 || (size_t)n >= sizeof(temp))
            break; /* formatting error or truncated line */
        len = (unsigned int)n;
        if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
            break;
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }

    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#endif
    (void)slabs_clsid;
    (void)limit;
    if (bytes != NULL) {
        *bytes = 0;
    }
    return NULL;
}
463
/*
 * Report per-slab-class item statistics through add_statistics(), using
 * the prefix "items" and the class number. Only classes whose LRU list is
 * non-empty are reported. Caller holds cache_lock.
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c) {
    const char *prefix = "items";

    for (int cls = 0; cls < POWER_LARGEST; cls++) {
        const hash_item *tail = engine->items.tails[cls];
        if (tail == NULL) {
            continue; /* empty class: nothing to report */
        }

        add_statistics(c, add_stats, prefix, cls, "number", "%u",
                       engine->items.sizes[cls]);
        add_statistics(c, add_stats, prefix, cls, "age", "%u",
                       tail->time);
        add_statistics(c, add_stats, prefix, cls, "evicted",
                       "%u", engine->items.itemstats[cls].evicted);
        add_statistics(c, add_stats, prefix, cls, "evicted_nonzero",
                       "%u", engine->items.itemstats[cls].evicted_nonzero);
        add_statistics(c, add_stats, prefix, cls, "evicted_time",
                       "%u", engine->items.itemstats[cls].evicted_time);
        add_statistics(c, add_stats, prefix, cls, "outofmemory",
                       "%u", engine->items.itemstats[cls].outofmemory);
        add_statistics(c, add_stats, prefix, cls, "tailrepairs",
                       "%u", engine->items.itemstats[cls].tailrepairs);
        add_statistics(c, add_stats, prefix, cls, "reclaimed",
                       "%u", engine->items.itemstats[cls].reclaimed);
    }
}
489
490 /** dumps out a list of objects of each size, with granularity of 32 bytes */
491 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)492 static void do_item_stats_sizes(struct default_engine *engine,
493 ADD_STAT add_stats, const void *c) {
494
495 /* max 1MB object, divided into 32 bytes size buckets */
496 const int num_buckets = 32768;
497 unsigned int *histogram = calloc(num_buckets, sizeof(int));
498
499 if (histogram != NULL) {
500 int i;
501
502 /* build the histogram */
503 for (i = 0; i < POWER_LARGEST; i++) {
504 hash_item *iter = engine->items.heads[i];
505 while (iter) {
506 int ntotal = ITEM_ntotal(engine, iter);
507 int bucket = ntotal / 32;
508 if ((ntotal % 32) != 0) bucket++;
509 if (bucket < num_buckets) histogram[bucket]++;
510 iter = iter->next;
511 }
512 }
513
514 /* write the buffer */
515 for (i = 0; i < num_buckets; i++) {
516 if (histogram[i] != 0) {
517 char key[8], val[32];
518 int klen, vlen;
519 klen = snprintf(key, sizeof(key), "%d", i * 32);
520 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
521 assert(klen < sizeof(key));
522 assert(vlen < sizeof(val));
523 add_stats(key, klen, val, vlen, c);
524 }
525 }
526 free(histogram);
527 }
528 }
529
530 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)531 hash_item *do_item_get(struct default_engine *engine,
532 const char *key, const size_t nkey) {
533 rel_time_t current_time = engine->server.core->get_current_time();
534 hash_item *it = assoc_find(engine, engine->server.core->hash(key,
535 nkey, 0),
536 key, nkey);
537 int was_found = 0;
538
539 if (engine->config.verbose > 2) {
540 if (it == NULL) {
541 fprintf(stderr, "> NOT FOUND %s", key);
542 } else {
543 fprintf(stderr, "> FOUND KEY %s", (const char*)item_get_key(it));
544 was_found++;
545 }
546 }
547
548 if (it != NULL && engine->config.oldest_live != 0 &&
549 engine->config.oldest_live <= current_time &&
550 it->time <= engine->config.oldest_live) {
551 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
552 it = NULL;
553 }
554
555 if (it == NULL && was_found) {
556 fprintf(stderr, " -nuked by flush");
557 was_found--;
558 }
559
560 if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
561 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
562 it = NULL;
563 }
564
565 if (it == NULL && was_found) {
566 fprintf(stderr, " -nuked by expire");
567 was_found--;
568 }
569
570 if (it != NULL) {
571 it->refcount++;
572 DEBUG_REFCNT(it, '+');
573 do_item_update(engine, it);
574 }
575
576 if (engine->config.verbose > 2)
577 fprintf(stderr, "\n");
578
579 return it;
580 }
581
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. In threaded mode, this is protected by the cache lock.
 *
 *   ADD              store only if the key is absent. If it is present, the
 *                    existing item is bumped in the LRU and
 *                    ENGINE_NOT_STORED is returned.
 *   REPLACE          store only if the key is present.
 *   APPEND/PREPEND   require an existing item. A non-zero CAS on `it` must
 *                    match it. A new item holding both values is built (the
 *                    first value's trailing CRLF is overwritten) and
 *                    replaces the old one. Returns ENGINE_NOT_STORED if that
 *                    item cannot be allocated.
 *   CAS              replace only if `it`'s CAS equals the stored item's.
 *                    Returns ENGINE_KEY_ENOENT if absent, ENGINE_KEY_EEXISTS
 *                    on mismatch.
 *   any other        link `it`, replacing any existing item (presumably SET).
 *
 * On ENGINE_SUCCESS, *cas receives the CAS id of the stored item. This
 * function does not release the caller's reference to `it`.
 *
 * Returns the state of storage.
 */
static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
                                       hash_item *it, uint64_t *cas,
                                       ENGINE_STORE_OPERATION operation,
                                       const void *cookie) {
    const char *key = item_get_key(it);
    /* Takes a reference on the existing item, if any; released at the end. */
    hash_item *old_it = do_item_get(engine, key, it->nkey);
    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;

    /* Concatenated item built for APPEND/PREPEND; released at the end. */
    hash_item *new_it = NULL;

    if (old_it != NULL && operation == OPERATION_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
        do_item_update(engine, old_it);
    } else if (!old_it && (operation == OPERATION_REPLACE
        || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (operation == OPERATION_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            // LRU expired
            stored = ENGINE_KEY_ENOENT;
        }
        else if (item_get_cas(it) == item_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            do_item_replace(engine, old_it, it);
            stored = ENGINE_SUCCESS;
        } else {
            if (engine->config.verbose > 1) {
                fprintf(stderr,
                        "CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
                        item_get_cas(old_it),
                        item_get_cas(it));
            }
            stored = ENGINE_KEY_EEXISTS;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */
        if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
            /*
             * Validate CAS
             */
            if (item_get_cas(it) != 0) {
                // CAS must be equal
                if (item_get_cas(it) != item_get_cas(old_it)) {
                    stored = ENGINE_KEY_EEXISTS;
                }
            }

            if (stored == ENGINE_NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                new_it = do_item_alloc(engine, key, it->nkey,
                                       old_it->flags,
                                       old_it->exptime,
                                       it->nbytes + old_it->nbytes - 2 /* CRLF */,
                                       cookie);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory */
                    if (old_it != NULL) {
                        do_item_release(engine, old_it);
                    }

                    return ENGINE_NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                /* The second copy starts 2 bytes early, overwriting the first
                 * value's CRLF; the second value's CRLF ends the item. */
                if (operation == OPERATION_APPEND) {
                    memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
                    memcpy(item_get_data(new_it) + old_it->nbytes - 2 /* CRLF */, item_get_data(it), it->nbytes);
                } else {
                    /* OPERATION_PREPEND */
                    memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
                    memcpy(item_get_data(new_it) + it->nbytes - 2 /* CRLF */, item_get_data(old_it), old_it->nbytes);
                }

                /* From here on, store the concatenated item instead. */
                it = new_it;
            }
        }

        if (stored == ENGINE_NOT_STORED) {
            if (old_it != NULL) {
                do_item_replace(engine, old_it, it);
            } else {
                do_item_link(engine, it);
            }

            *cas = item_get_cas(it);
            stored = ENGINE_SUCCESS;
        }
    }

    if (old_it != NULL) {
        do_item_release(engine, old_it);         /* release our reference */
    }

    /* new_it is linked by now, so this drops only the allocation reference. */
    if (new_it != NULL) {
        do_item_release(engine, new_it);
    }

    if (stored == ENGINE_SUCCESS) {
        *cas = item_get_cas(it);
    }

    return stored;
}
700
701
702 /*
703 * adds a delta value to a numeric item.
704 *
705 * c connection requesting the operation
706 * it item to adjust
707 * incr true to increment value, false to decrement
708 * delta amount to adjust value by
709 * buf buffer for response string
710 *
711 * returns a response string to send back to the client.
712 */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)713 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
714 hash_item *it, const bool incr,
715 const int64_t delta, uint64_t *rcas,
716 uint64_t *result, const void *cookie) {
717 const char *ptr;
718 uint64_t value;
719 int res;
720
721 ptr = item_get_data(it);
722
723 if (!safe_strtoull(ptr, &value)) {
724 return ENGINE_EINVAL;
725 }
726
727 if (incr) {
728 value += delta;
729 } else {
730 if(delta > value) {
731 value = 0;
732 } else {
733 value -= delta;
734 }
735 }
736
737 *result = value;
738 char buf[80];
739 if ((res = snprintf(buf, sizeof(buf), "%" PRIu64 "\r\n", value)) == -1) {
740 return ENGINE_EINVAL;
741 }
742 hash_item *new_it = do_item_alloc(engine, item_get_key(it),
743 it->nkey, it->flags,
744 it->exptime, res,
745 cookie );
746 if (new_it == 0) {
747 do_item_unlink(engine, it);
748 return ENGINE_ENOMEM;
749 }
750 memcpy(item_get_data(new_it), buf, res);
751 do_item_replace(engine, it, new_it);
752 *rcas = item_get_cas(new_it);
753 do_item_release(engine, new_it); /* release our reference */
754
755 return ENGINE_SUCCESS;
756 }
757
758 /********************************* ITEM ACCESS *******************************/
759 #ifdef INNODB_MEMCACHED
760
761 /*
762 * Allocates a new item.
763 */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)764 hash_item *item_alloc(struct default_engine *engine,
765 const void *key, size_t nkey, int flags,
766 rel_time_t exptime, int nbytes, const void *cookie) {
767 hash_item *it;
768 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
769 return it;
770 }
771 #else /* INNODB_MEMCAHED */
772 /*
773 * Allocates a new item.
774 */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)775 hash_item *item_alloc(struct default_engine *engine,
776 const void *key, size_t nkey, int flags,
777 rel_time_t exptime, int nbytes, const void *cookie) {
778 hash_item *it;
779 pthread_mutex_lock(&engine->cache_lock);
780 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
781 pthread_mutex_unlock(&engine->cache_lock);
782 return it;
783 }
784 #endif /* INNODB_MEMCAHED */
785
786 /*
787 * Returns an item if it hasn't been marked as expired,
788 * lazy-expiring as needed.
789 */
item_get(struct default_engine * engine,const void * key,const size_t nkey)790 hash_item *item_get(struct default_engine *engine,
791 const void *key, const size_t nkey) {
792 hash_item *it;
793 pthread_mutex_lock(&engine->cache_lock);
794 it = do_item_get(engine, key, nkey);
795 pthread_mutex_unlock(&engine->cache_lock);
796 return it;
797 }
798
799 /*
800 * Decrements the reference count on an item and adds it to the freelist if
801 * needed.
802 */
item_release(struct default_engine * engine,hash_item * item)803 void item_release(struct default_engine *engine, hash_item *item) {
804 pthread_mutex_lock(&engine->cache_lock);
805 do_item_release(engine, item);
806 pthread_mutex_unlock(&engine->cache_lock);
807 }
808
809 /*
810 * Unlinks an item from the LRU and hashtable.
811 */
item_unlink(struct default_engine * engine,hash_item * item)812 void item_unlink(struct default_engine *engine, hash_item *item) {
813 pthread_mutex_lock(&engine->cache_lock);
814 do_item_unlink(engine, item);
815 pthread_mutex_unlock(&engine->cache_lock);
816 }
817
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)818 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
819 const void* cookie,
820 const void* key,
821 const int nkey,
822 const bool increment,
823 const bool create,
824 const uint64_t delta,
825 const uint64_t initial,
826 const rel_time_t exptime,
827 uint64_t *cas,
828 uint64_t *result)
829 {
830 hash_item *item = do_item_get(engine, key, nkey);
831 ENGINE_ERROR_CODE ret;
832
833 if (item == NULL) {
834 if (!create) {
835 return ENGINE_KEY_ENOENT;
836 } else {
837 char buffer[128];
838 int len = snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n",
839 (uint64_t)initial);
840
841 item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
842 if (item == NULL) {
843 return ENGINE_ENOMEM;
844 }
845 memcpy((void*)item_get_data(item), buffer, len);
846 if ((ret = do_store_item(engine, item, cas,
847 OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
848 *result = initial;
849 *cas = item_get_cas(item);
850 }
851 do_item_release(engine, item);
852 }
853 } else {
854 ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
855 do_item_release(engine, item);
856 }
857
858 return ret;
859 }
860
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)861 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
862 const void* cookie,
863 const void* key,
864 const int nkey,
865 const bool increment,
866 const bool create,
867 const uint64_t delta,
868 const uint64_t initial,
869 const rel_time_t exptime,
870 uint64_t *cas,
871 uint64_t *result)
872 {
873 ENGINE_ERROR_CODE ret;
874
875 pthread_mutex_lock(&engine->cache_lock);
876 ret = do_arithmetic(engine, cookie, key, nkey, increment,
877 create, delta, initial, exptime, cas,
878 result);
879 pthread_mutex_unlock(&engine->cache_lock);
880 return ret;
881 }
882
883 /*
884 * Stores an item in the cache (high level, obeys set/add/replace semantics)
885 */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)886 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
887 hash_item *item, uint64_t *cas,
888 ENGINE_STORE_OPERATION operation,
889 const void *cookie) {
890 ENGINE_ERROR_CODE ret;
891
892 pthread_mutex_lock(&engine->cache_lock);
893 ret = do_store_item(engine, item, cas, operation, cookie);
894 pthread_mutex_unlock(&engine->cache_lock);
895 return ret;
896 }
897
898 /*
899 * Flushes expired items after a flush_all call
900 */
item_flush_expired(struct default_engine * engine,time_t when)901 void item_flush_expired(struct default_engine *engine, time_t when) {
902 int i;
903 hash_item *iter, *next;
904
905 pthread_mutex_lock(&engine->cache_lock);
906
907 if (when == 0) {
908 engine->config.oldest_live = engine->server.core->get_current_time() - 1;
909 } else {
910 engine->config.oldest_live = engine->server.core->realtime(when) - 1;
911 }
912
913 if (engine->config.oldest_live != 0) {
914 for (i = 0; i < POWER_LARGEST; i++) {
915 /*
916 * The LRU is sorted in decreasing time order, and an item's
917 * timestamp is never newer than its last access time, so we
918 * only need to walk back until we hit an item older than the
919 * oldest_live time.
920 * The oldest_live checking will auto-expire the remaining items.
921 */
922 for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
923 if (iter->time >= engine->config.oldest_live) {
924 next = iter->next;
925 if ((iter->iflag & ITEM_SLABBED) == 0) {
926 do_item_unlink(engine, iter);
927 }
928 } else {
929 /* We've hit the first old item. Continue to the next queue. */
930 break;
931 }
932 }
933 }
934 }
935 pthread_mutex_unlock(&engine->cache_lock);
936 }
937
938 /*
939 * Dumps part of the cache
940 */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)941 char *item_cachedump(struct default_engine *engine,
942 unsigned int slabs_clsid,
943 unsigned int limit,
944 unsigned int *bytes) {
945 char *ret;
946
947 pthread_mutex_lock(&engine->cache_lock);
948 ret = do_item_cachedump(slabs_clsid, limit, bytes);
949 pthread_mutex_unlock(&engine->cache_lock);
950 return ret;
951 }
952
/* Emit per-slab-class item statistics while holding engine->cache_lock. */
void item_stats(struct default_engine *engine,
                ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
960
961
/* Emit the item-size histogram while holding engine->cache_lock. */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
969
/*
 * Append a cursor item to the tail of slab class `ii`'s LRU list and count
 * it in that class's size. The cursor is the placeholder that
 * do_item_walk_cursor() later moves towards the head.
 *
 * Precondition: the class's list must be non-empty. The current tail is
 * dereferenced and heads[ii] is never updated, so callers must skip empty
 * classes. This is asserted below rather than left to a NULL dereference.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    assert(ii >= 0 && ii < POWER_LARGEST);
    assert(engine->items.tails[ii] != NULL);

    hash_item *old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->next = NULL;
    cursor->prev = old_tail;
    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;
    engine->items.sizes[ii]++;
}
980
981 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
982 hash_item *item, void *cookie);
983
do_item_walk_cursor(struct default_engine * engine,hash_item * cursor,int steplength,ITERFUNC itemfunc,void * itemdata,ENGINE_ERROR_CODE * error)984 static bool do_item_walk_cursor(struct default_engine *engine,
985 hash_item *cursor,
986 int steplength,
987 ITERFUNC itemfunc,
988 void* itemdata,
989 ENGINE_ERROR_CODE *error)
990 {
991 int ii = 0;
992 *error = ENGINE_SUCCESS;
993
994 while (cursor->prev != NULL && ii < steplength) {
995 ++ii;
996 /* Move cursor */
997 hash_item *ptr = cursor->prev;
998 item_unlink_q(engine, cursor);
999
1000 bool done = false;
1001
1002 if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1003 done = true;
1004 } else {
1005 cursor->next = ptr;
1006 cursor->prev = ptr->prev;
1007 cursor->prev->next = cursor;
1008 ptr->prev = cursor;
1009 }
1010
1011 /* Ignore cursors */
1012 if (!(ptr->nkey == 0 && ptr->nbytes == 0)) {
1013 *error = itemfunc(engine, ptr, itemdata);
1014 if (*error != ENGINE_SUCCESS) {
1015 return false;
1016 }
1017 }
1018
1019 if (done) {
1020 return false;
1021 }
1022 }
1023
1024 return true;
1025 }
1026
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1027 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1028 hash_item *item,
1029 void *cookie) {
1030 (void)cookie;
1031 engine->scrubber.visited++;
1032 rel_time_t current_time = engine->server.core->get_current_time();
1033 if (item->refcount == 0 &&
1034 (item->exptime != 0 && item->exptime < current_time)) {
1035 do_item_unlink(engine, item);
1036 engine->scrubber.cleaned++;
1037 }
1038 return ENGINE_SUCCESS;
1039 }
1040
/*
 * Walk one slab class with an already-linked scrubber cursor, in batches
 * of 200 entries. cache_lock is taken only per batch, so other threads can
 * use the cache between batches. Stops when the walker reports the class
 * is finished or returns an error code.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor) {
    for (;;) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        const bool again = do_item_walk_cursor(engine, cursor, 200,
                                               item_scrub, NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !again) {
            break;
        }
    }
}
1055
item_scubber_main(void * arg)1056 static void *item_scubber_main(void *arg)
1057 {
1058 struct default_engine *engine = arg;
1059 hash_item cursor = { .refcount = 1 };
1060
1061 for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1062 pthread_mutex_lock(&engine->cache_lock);
1063 bool skip = false;
1064 if (engine->items.heads[ii] == NULL) {
1065 skip = true;
1066 } else {
1067 // add the item at the tail
1068 do_item_link_cursor(engine, &cursor, ii);
1069 }
1070 pthread_mutex_unlock(&engine->cache_lock);
1071
1072 if (!skip) {
1073 item_scrub_class(engine, &cursor);
1074 }
1075 }
1076
1077 pthread_mutex_lock(&engine->scrubber.lock);
1078 engine->scrubber.stopped = time(NULL);
1079 engine->scrubber.running = false;
1080 pthread_mutex_unlock(&engine->scrubber.lock);
1081
1082 return NULL;
1083 }
1084
item_start_scrub(struct default_engine * engine)1085 bool item_start_scrub(struct default_engine *engine)
1086 {
1087 bool ret = false;
1088 pthread_mutex_lock(&engine->scrubber.lock);
1089 if (!engine->scrubber.running) {
1090 engine->scrubber.started = time(NULL);
1091 engine->scrubber.stopped = 0;
1092 engine->scrubber.visited = 0;
1093 engine->scrubber.cleaned = 0;
1094 engine->scrubber.running = true;
1095
1096 pthread_t t;
1097 pthread_attr_t attr;
1098
1099 if (pthread_attr_init(&attr) != 0 ||
1100 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1101 pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1102 {
1103 engine->scrubber.running = false;
1104 } else {
1105 ret = true;
1106 }
1107 }
1108 pthread_mutex_unlock(&engine->scrubber.lock);
1109
1110 return ret;
1111 }
1112