1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11
12 #include "default_engine.h"
13
14 #define INNODB_MEMCACHED
15 /* Forward Declarations */
16 static void item_link_q(struct default_engine *engine, hash_item *it);
17 static void item_unlink_q(struct default_engine *engine, hash_item *it);
18 static hash_item *do_item_alloc(struct default_engine *engine,
19 const void *key, const size_t nkey,
20 const int flags, const rel_time_t exptime,
21 const int nbytes,
22 const void *cookie);
23 static hash_item *do_item_get(struct default_engine *engine,
24 const char *key, const size_t nkey);
25 static int do_item_link(struct default_engine *engine, hash_item *it);
26 static void do_item_unlink(struct default_engine *engine, hash_item *it);
27 static void do_item_release(struct default_engine *engine, hash_item *it);
28 static void do_item_update(struct default_engine *engine, hash_item *it);
29 static int do_item_replace(struct default_engine *engine,
30 hash_item *it, hash_item *new_it);
31 static void item_free(struct default_engine *engine, hash_item *it);
32
33 /*
34 * We only reposition items in the LRU queue if they haven't been repositioned
35 * in this many seconds. That saves us from churning on frequently-accessed
36 * items.
37 */
38 #define ITEM_UPDATE_INTERVAL 60
39
40 void cache_set_initial_cas_id(uint64_t cas);
41 static uint64_t cas_id;
42
item_stats_reset(struct default_engine * engine)43 void item_stats_reset(struct default_engine *engine) {
44 pthread_mutex_lock(&engine->cache_lock);
45 memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
46 pthread_mutex_unlock(&engine->cache_lock);
47 }
48
49
50 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)51 static inline size_t ITEM_ntotal(struct default_engine *engine,
52 const hash_item *item) {
53 size_t ret = sizeof(*item) + item->nkey + item->nbytes;
54 if (engine->config.use_cas) {
55 ret += sizeof(uint64_t);
56 }
57
58 return ret;
59 }
60
61 /* Set the initial CAS id for the hash table */
cache_set_initial_cas_id(uint64_t cas)62 void cache_set_initial_cas_id(uint64_t cas) {
63 cas_id = cas;
64 }
65
66 /* Get the next CAS id for a new item. */
get_cas_id(void)67 static uint64_t get_cas_id(void) {
68 return ++cas_id;
69 }
70
/*
 * Reference-count tracing.
 *
 * Change "#if 0" to "#if 1" to print, at every DEBUG_REFCNT() call site, the
 * item's address, the one-character operation tag, the current refcount and
 * an 'L' / 'S' marker for the ITEM_LINKED / ITEM_SLABBED flags to stderr.
 * With tracing off the macro expands to an empty statement that still needs
 * its trailing semicolon.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
81
82
83 #ifdef INNODB_MEMCACHED /* INNODB_MEMCACHED */
84 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)85 hash_item *do_item_alloc(struct default_engine *engine,
86 const void *key,
87 const size_t nkey,
88 const int flags,
89 const rel_time_t exptime,
90 const int nbytes,
91 const void *cookie) {
92 hash_item *it = NULL;
93 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
94 if (engine->config.use_cas) {
95 ntotal += sizeof(uint64_t);
96 }
97
98 unsigned int id = slabs_clsid(engine, ntotal);
99 if (id == 0)
100 return 0;
101
102 if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
103 return NULL;
104 }
105
106 assert(it->slabs_clsid == 0);
107
108 it->slabs_clsid = id;
109
110 assert(it != engine->items.heads[it->slabs_clsid]);
111
112 it->next = it->prev = it->h_next = 0;
113 it->refcount = 1; /* the caller will have a reference */
114 DEBUG_REFCNT(it, '*');
115 it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
116 it->nkey = nkey;
117 it->nbytes = nbytes;
118 it->flags = flags;
119 memcpy((void*)item_get_key(it), key, nkey);
120 it->exptime = exptime;
121 return it;
122 }
123
124 #else /* INNODB_MEMCACHED */
125 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)126 hash_item *do_item_alloc(struct default_engine *engine,
127 const void *key,
128 const size_t nkey,
129 const int flags,
130 const rel_time_t exptime,
131 const int nbytes,
132 const void *cookie) {
133 hash_item *it = NULL;
134 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
135 if (engine->config.use_cas) {
136 ntotal += sizeof(uint64_t);
137 }
138
139 unsigned int id = slabs_clsid(engine, ntotal);
140 if (id == 0)
141 return 0;
142
143 /* do a quick check if we have any expired items in the tail.. */
144 int tries = 50;
145 hash_item *search;
146
147 rel_time_t current_time = engine->server.core->get_current_time();
148
149 for (search = engine->items.tails[id];
150 tries > 0 && search != NULL;
151 tries--, search=search->prev) {
152 if (search->refcount == 0 &&
153 (search->exptime != 0 && search->exptime < current_time)) {
154 it = search;
155 /* I don't want to actually free the object, just steal
156 * the item to avoid to grab the slab mutex twice ;-)
157 */
158 pthread_mutex_lock(&engine->stats.lock);
159 engine->stats.reclaimed++;
160 pthread_mutex_unlock(&engine->stats.lock);
161 engine->items.itemstats[id].reclaimed++;
162 it->refcount = 1;
163 do_item_unlink(engine, it);
164 /* Initialize the item block: */
165 it->slabs_clsid = 0;
166 it->refcount = 0;
167 break;
168 }
169 }
170
171 if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
172 /*
173 ** Could not find an expired item at the tail, and memory allocation
174 ** failed. Try to evict some items!
175 */
176 tries = 50;
177
178 /* If requested to not push old items out of cache when memory runs out,
179 * we're out of luck at this point...
180 */
181
182 if (engine->config.evict_to_free == 0) {
183 engine->items.itemstats[id].outofmemory++;
184 return NULL;
185 }
186
187 /*
188 * try to get one off the right LRU
189 * don't necessariuly unlink the tail because it may be locked: refcount>0
190 * search up from tail an item with refcount==0 and unlink it; give up after 50
191 * tries
192 */
193
194 if (engine->items.tails[id] == 0) {
195 engine->items.itemstats[id].outofmemory++;
196 return NULL;
197 }
198
199 for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
200 if (search->refcount == 0) {
201 if (search->exptime == 0 || search->exptime > current_time) {
202 engine->items.itemstats[id].evicted++;
203 engine->items.itemstats[id].evicted_time = current_time - search->time;
204 if (search->exptime != 0) {
205 engine->items.itemstats[id].evicted_nonzero++;
206 }
207 pthread_mutex_lock(&engine->stats.lock);
208 engine->stats.evictions++;
209 pthread_mutex_unlock(&engine->stats.lock);
210 engine->server.stat->evicting(cookie,
211 item_get_key(search),
212 search->nkey);
213 } else {
214 engine->items.itemstats[id].reclaimed++;
215 pthread_mutex_lock(&engine->stats.lock);
216 engine->stats.reclaimed++;
217 pthread_mutex_unlock(&engine->stats.lock);
218 }
219 do_item_unlink(engine, search);
220 break;
221 }
222 }
223 it = slabs_alloc(engine, ntotal, id);
224 if (it == 0) {
225 engine->items.itemstats[id].outofmemory++;
226 /* Last ditch effort. There is a very rare bug which causes
227 * refcount leaks. We've fixed most of them, but it still happens,
228 * and it may happen in the future.
229 * We can reasonably assume no item can stay locked for more than
230 * three hours, so if we find one in the tail which is that old,
231 * free it anyway.
232 */
233 tries = 50;
234 for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
235 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
236 engine->items.itemstats[id].tailrepairs++;
237 search->refcount = 0;
238 do_item_unlink(engine, search);
239 break;
240 }
241 }
242 it = slabs_alloc(engine, ntotal, id);
243 if (it == 0) {
244 return NULL;
245 }
246 }
247 }
248
249 assert(it->slabs_clsid == 0);
250
251 it->slabs_clsid = id;
252
253 assert(it != engine->items.heads[it->slabs_clsid]);
254
255 it->next = it->prev = it->h_next = 0;
256 it->refcount = 1; /* the caller will have a reference */
257 DEBUG_REFCNT(it, '*');
258 it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
259 it->nkey = nkey;
260 it->nbytes = nbytes;
261 it->flags = flags;
262 memcpy((void*)item_get_key(it), key, nkey);
263 it->exptime = exptime;
264 return it;
265 }
266 #endif /* INNODB_MEMCACHED */
267
/*
 * Hands an item's memory back to the slab allocator.
 *
 * The item must already be unlinked, must not be the head or tail of its
 * LRU, and must have no references left (all checked with assert). Before
 * the release its slab class id is zeroed and ITEM_SLABBED is raised.
 */
static void item_free(struct default_engine *engine, hash_item *it)
{
    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[it->slabs_clsid]);
    assert(it != engine->items.tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    const size_t footprint = ITEM_ntotal(engine, it);
    const unsigned int owner_class = it->slabs_clsid;

    /* so slab size changer can tell later if item is already free or not */
    it->slabs_clsid = 0;
    it->iflag |= ITEM_SLABBED;
    DEBUG_REFCNT(it, 'F');

    slabs_free(engine, it, footprint, owner_class);
}
283
/*
 * Pushes `it` onto the front of the LRU list of its slab class, making it
 * the new head (and also the tail when the list was empty), and bumps the
 * class's item count.
 */
static void item_link_q(struct default_engine *engine, hash_item *it)
{
    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    const unsigned int cls = it->slabs_clsid;
    hash_item *const old_head = engine->items.heads[cls];

    assert(it != old_head);
    /* head and tail are either both set or both empty */
    assert((old_head != NULL) == (engine->items.tails[cls] != NULL));

    it->prev = NULL;
    it->next = old_head;
    if (old_head != NULL) {
        old_head->prev = it;
    }
    engine->items.heads[cls] = it;

    if (engine->items.tails[cls] == NULL) {
        engine->items.tails[cls] = it;
    }

    engine->items.sizes[cls]++;
}
301
/*
 * Detaches `it` from the LRU list of its slab class: fixes up the class's
 * head/tail pointers when `it` was at either end, splices its neighbours
 * together and drops the class's item count by one. The item's own
 * next/prev fields are left as they were.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it)
{
    assert(it->slabs_clsid < POWER_LARGEST);

    const unsigned int cls = it->slabs_clsid;
    hash_item *const after = it->next;
    hash_item *const before = it->prev;

    if (engine->items.heads[cls] == it) {
        assert(before == NULL);
        engine->items.heads[cls] = after;
    }
    if (engine->items.tails[cls] == it) {
        assert(after == NULL);
        engine->items.tails[cls] = before;
    }

    assert(after != it);
    assert(before != it);

    if (after != NULL) {
        after->prev = before;
    }
    if (before != NULL) {
        before->next = after;
    }

    engine->items.sizes[cls]--;
}
324
/*
 * Publishes an item: marks it ITEM_LINKED, stamps it with the current time,
 * inserts it into the hash table, accounts for it in the engine statistics,
 * gives it a freshly generated CAS id and puts it at the head of its LRU.
 *
 * The item must be neither linked nor slabbed, and its value must be smaller
 * than 1MB (both asserted). Always returns 1.
 */
int do_item_link(struct default_engine *engine, hash_item *it)
{
    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */

    it->iflag |= ITEM_LINKED;
    it->time = engine->server.core->get_current_time();
    assoc_insert(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 it);

    const size_t footprint = ITEM_ntotal(engine, it);

    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes += footprint;
    engine->stats.curr_items++;
    engine->stats.total_items++;
    pthread_mutex_unlock(&engine->stats.lock);

    /* Allocate a new CAS ID on link. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);

    return 1;
}
348
/*
 * Withdraws an item from the cache. Nothing happens unless the item is
 * currently ITEM_LINKED; otherwise the flag is cleared, the statistics are
 * adjusted, and the item is removed from the hash table and its LRU. If no
 * one holds a reference any more the memory is freed immediately.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it)
{
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    it->iflag &= ~ITEM_LINKED;

    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
    engine->stats.curr_items -= 1;
    pthread_mutex_unlock(&engine->stats.lock);

    assoc_delete(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 item_get_key(it), it->nkey);
    item_unlink_q(engine, it);

    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
366
/*
 * Gives up one reference to an item. A reference count that is already zero
 * is left alone. Once the count is zero and the item is not linked, its
 * memory is freed.
 */
void do_item_release(struct default_engine *engine, hash_item *it)
{
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);

    if (it->refcount != 0) {
        it->refcount--;
        DEBUG_REFCNT(it, '-');
    }

    const int unreferenced = (it->refcount == 0);
    const int unlinked = ((it->iflag & ITEM_LINKED) == 0);
    if (unreferenced && unlinked) {
        item_free(engine, it);
    }
}
377
/*
 * Moves a linked item back to the head of its LRU and refreshes its
 * timestamp, but only when the timestamp is older than ITEM_UPDATE_INTERVAL
 * seconds; more recent items are left where they are to avoid churn.
 */
void do_item_update(struct default_engine *engine, hash_item *it)
{
    const rel_time_t now = engine->server.core->get_current_time();

    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);

    if (it->time >= now - ITEM_UPDATE_INTERVAL) {
        return;   /* repositioned recently enough */
    }

    assert((it->iflag & ITEM_SLABBED) == 0);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    item_unlink_q(engine, it);
    it->time = now;
    item_link_q(engine, it);
}
391
/*
 * Swaps one cache entry for another: unlinks `it`, links `new_it` in its
 * place and returns whatever do_item_link() reports. `it` must not have been
 * handed back to the slab allocator (asserted).
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it)
{
    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);

    const int linked = do_item_link(engine, new_it);
    return linked;
}
401
/*
 * Placeholder for dumping the keys held in one slab class.
 *
 * Unless FUTURE is defined at build time this ignores its arguments and
 * returns NULL. The FUTURE variant formats one "ITEM ..." line per entry
 * into a malloc'ed buffer capped at 2MB, terminates it with "END\r\n",
 * stores the length in *bytes and returns the buffer.
 * NOTE(review): the FUTURE variant refers to `engine` and `process_started`,
 * neither of which is declared in this function; it needs reworking before
 * FUTURE can be enabled.
 */
/*@null@*/
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes)
{
#ifdef FUTURE
    const unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];
    unsigned int bufcurr = 0;
    unsigned int shown = 0;

    char *buffer = malloc((size_t)memlimit);
    if (buffer == 0) {
        return NULL;
    }

    for (hash_item *it = engine->items.heads[slabs_clsid];
         it != NULL && (limit == 0 || shown < limit);
         it = it->next) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */

        const unsigned int len = snprintf(temp, sizeof(temp),
                                          "ITEM %s [%d b; %lu s]\r\n",
                                          key_temp, it->nbytes - 2,
                                          (unsigned long)it->exptime + process_started);
        if (bufcurr + len + 6 > memlimit) { /* 6 is END\r\n\0 */
            break;
        }
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
    }

    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#else
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
#endif
}
451
/*
 * Reports the "items" statistics for every slab class whose LRU is not
 * empty: the item count, the timestamp of the tail item (under the name
 * "age") and the class's eviction / out-of-memory / tail-repair / reclaim
 * counters. Each value is emitted through add_statistics().
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c)
{
    const char *prefix = "items";

    for (int cls = 0; cls < POWER_LARGEST; cls++) {
        if (engine->items.tails[cls] == NULL) {
            continue;   /* nothing linked in this class */
        }

        add_statistics(c, add_stats, prefix, cls, "number", "%u",
                       engine->items.sizes[cls]);
        add_statistics(c, add_stats, prefix, cls, "age", "%u",
                       engine->items.tails[cls]->time);
        add_statistics(c, add_stats, prefix, cls, "evicted", "%u",
                       engine->items.itemstats[cls].evicted);
        add_statistics(c, add_stats, prefix, cls, "evicted_nonzero", "%u",
                       engine->items.itemstats[cls].evicted_nonzero);
        add_statistics(c, add_stats, prefix, cls, "evicted_time", "%u",
                       engine->items.itemstats[cls].evicted_time);
        add_statistics(c, add_stats, prefix, cls, "outofmemory", "%u",
                       engine->items.itemstats[cls].outofmemory);
        add_statistics(c, add_stats, prefix, cls, "tailrepairs", "%u",
                       engine->items.itemstats[cls].tailrepairs);
        add_statistics(c, add_stats, prefix, cls, "reclaimed", "%u",
                       engine->items.itemstats[cls].reclaimed);
    }
}
477
478 /** dumps out a list of objects of each size, with granularity of 32 bytes */
479 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)480 static void do_item_stats_sizes(struct default_engine *engine,
481 ADD_STAT add_stats, const void *c) {
482
483 /* max 1MB object, divided into 32 bytes size buckets */
484 const int num_buckets = 32768;
485 unsigned int *histogram = calloc(num_buckets, sizeof(int));
486
487 if (histogram != NULL) {
488 int i;
489
490 /* build the histogram */
491 for (i = 0; i < POWER_LARGEST; i++) {
492 hash_item *iter = engine->items.heads[i];
493 while (iter) {
494 int ntotal = ITEM_ntotal(engine, iter);
495 int bucket = ntotal / 32;
496 if ((ntotal % 32) != 0) bucket++;
497 if (bucket < num_buckets) histogram[bucket]++;
498 iter = iter->next;
499 }
500 }
501
502 /* write the buffer */
503 for (i = 0; i < num_buckets; i++) {
504 if (histogram[i] != 0) {
505 char key[8], val[32];
506 int klen, vlen;
507 klen = snprintf(key, sizeof(key), "%d", i * 32);
508 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
509 assert(klen < sizeof(key));
510 assert(vlen < sizeof(val));
511 add_stats(key, klen, val, vlen, c);
512 }
513 }
514 free(histogram);
515 }
516 }
517
518 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)519 hash_item *do_item_get(struct default_engine *engine,
520 const char *key, const size_t nkey) {
521 rel_time_t current_time = engine->server.core->get_current_time();
522 hash_item *it = assoc_find(engine, engine->server.core->hash(key,
523 nkey, 0),
524 key, nkey);
525 int was_found = 0;
526
527 if (engine->config.verbose > 2) {
528 if (it == NULL) {
529 fprintf(stderr, "> NOT FOUND %s", key);
530 } else {
531 fprintf(stderr, "> FOUND KEY %s", (const char*)item_get_key(it));
532 was_found++;
533 }
534 }
535
536 if (it != NULL && engine->config.oldest_live != 0 &&
537 engine->config.oldest_live <= current_time &&
538 it->time <= engine->config.oldest_live) {
539 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
540 it = NULL;
541 }
542
543 if (it == NULL && was_found) {
544 fprintf(stderr, " -nuked by flush");
545 was_found--;
546 }
547
548 if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
549 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
550 it = NULL;
551 }
552
553 if (it == NULL && was_found) {
554 fprintf(stderr, " -nuked by expire");
555 was_found--;
556 }
557
558 if (it != NULL) {
559 it->refcount++;
560 DEBUG_REFCNT(it, '+');
561 do_item_update(engine, it);
562 }
563
564 if (engine->config.verbose > 2)
565 fprintf(stderr, "\n");
566
567 return it;
568 }
569
570 /*
571 * Stores an item in the cache according to the semantics of one of the set
572 * commands. In threaded mode, this is protected by the cache lock.
573 *
574 * Returns the state of storage.
575 */
do_store_item(struct default_engine * engine,hash_item * it,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)576 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
577 hash_item *it, uint64_t *cas,
578 ENGINE_STORE_OPERATION operation,
579 const void *cookie) {
580 const char *key = item_get_key(it);
581 hash_item *old_it = do_item_get(engine, key, it->nkey);
582 ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
583
584 hash_item *new_it = NULL;
585
586 if (old_it != NULL && operation == OPERATION_ADD) {
587 /* add only adds a nonexistent item, but promote to head of LRU */
588 do_item_update(engine, old_it);
589 } else if (!old_it && (operation == OPERATION_REPLACE
590 || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
591 {
592 /* replace only replaces an existing value; don't store */
593 } else if (operation == OPERATION_CAS) {
594 /* validate cas operation */
595 if(old_it == NULL) {
596 // LRU expired
597 stored = ENGINE_KEY_ENOENT;
598 }
599 else if (item_get_cas(it) == item_get_cas(old_it)) {
600 // cas validates
601 // it and old_it may belong to different classes.
602 // I'm updating the stats for the one that's getting pushed out
603 do_item_replace(engine, old_it, it);
604 stored = ENGINE_SUCCESS;
605 } else {
606 if (engine->config.verbose > 1) {
607 fprintf(stderr,
608 "CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
609 item_get_cas(old_it),
610 item_get_cas(it));
611 }
612 stored = ENGINE_KEY_EEXISTS;
613 }
614 } else {
615 /*
616 * Append - combine new and old record into single one. Here it's
617 * atomic and thread-safe.
618 */
619 if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
620 /*
621 * Validate CAS
622 */
623 if (item_get_cas(it) != 0) {
624 // CAS much be equal
625 if (item_get_cas(it) != item_get_cas(old_it)) {
626 stored = ENGINE_KEY_EEXISTS;
627 }
628 }
629
630 if (stored == ENGINE_NOT_STORED) {
631 /* we have it and old_it here - alloc memory to hold both */
632 new_it = do_item_alloc(engine, key, it->nkey,
633 old_it->flags,
634 old_it->exptime,
635 it->nbytes + old_it->nbytes - 2 /* CRLF */,
636 cookie);
637
638 if (new_it == NULL) {
639 /* SERVER_ERROR out of memory */
640 if (old_it != NULL) {
641 do_item_release(engine, old_it);
642 }
643
644 return ENGINE_NOT_STORED;
645 }
646
647 /* copy data from it and old_it to new_it */
648
649 if (operation == OPERATION_APPEND) {
650 memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
651 memcpy(item_get_data(new_it) + old_it->nbytes - 2 /* CRLF */, item_get_data(it), it->nbytes);
652 } else {
653 /* OPERATION_PREPEND */
654 memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
655 memcpy(item_get_data(new_it) + it->nbytes - 2 /* CRLF */, item_get_data(old_it), old_it->nbytes);
656 }
657
658 it = new_it;
659 }
660 }
661
662 if (stored == ENGINE_NOT_STORED) {
663 if (old_it != NULL) {
664 do_item_replace(engine, old_it, it);
665 } else {
666 do_item_link(engine, it);
667 }
668
669 *cas = item_get_cas(it);
670 stored = ENGINE_SUCCESS;
671 }
672 }
673
674 if (old_it != NULL) {
675 do_item_release(engine, old_it); /* release our reference */
676 }
677
678 if (new_it != NULL) {
679 do_item_release(engine, new_it);
680 }
681
682 if (stored == ENGINE_SUCCESS) {
683 *cas = item_get_cas(it);
684 }
685
686 return stored;
687 }
688
689
690 /*
691 * adds a delta value to a numeric item.
692 *
693 * c connection requesting the operation
694 * it item to adjust
695 * incr true to increment value, false to decrement
696 * delta amount to adjust value by
697 * buf buffer for response string
698 *
699 * returns a response string to send back to the client.
700 */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)701 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
702 hash_item *it, const bool incr,
703 const int64_t delta, uint64_t *rcas,
704 uint64_t *result, const void *cookie) {
705 const char *ptr;
706 uint64_t value;
707 int res;
708
709 ptr = item_get_data(it);
710
711 if (!safe_strtoull(ptr, &value)) {
712 return ENGINE_EINVAL;
713 }
714
715 if (incr) {
716 value += delta;
717 } else {
718 if(delta > value) {
719 value = 0;
720 } else {
721 value -= delta;
722 }
723 }
724
725 *result = value;
726 char buf[80];
727 if ((res = snprintf(buf, sizeof(buf), "%" PRIu64 "\r\n", value)) == -1) {
728 return ENGINE_EINVAL;
729 }
730 hash_item *new_it = do_item_alloc(engine, item_get_key(it),
731 it->nkey, it->flags,
732 it->exptime, res,
733 cookie );
734 if (new_it == 0) {
735 do_item_unlink(engine, it);
736 return ENGINE_ENOMEM;
737 }
738 memcpy(item_get_data(new_it), buf, res);
739 do_item_replace(engine, it, new_it);
740 *rcas = item_get_cas(new_it);
741 do_item_release(engine, new_it); /* release our reference */
742
743 return ENGINE_SUCCESS;
744 }
745
746 /********************************* ITEM ACCESS *******************************/
747 #ifdef INNODB_MEMCACHED
748
749 /*
750 * Allocates a new item.
751 */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)752 hash_item *item_alloc(struct default_engine *engine,
753 const void *key, size_t nkey, int flags,
754 rel_time_t exptime, int nbytes, const void *cookie) {
755 hash_item *it;
756 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
757 return it;
758 }
759 #else /* INNODB_MEMCAHED */
760 /*
761 * Allocates a new item.
762 */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)763 hash_item *item_alloc(struct default_engine *engine,
764 const void *key, size_t nkey, int flags,
765 rel_time_t exptime, int nbytes, const void *cookie) {
766 hash_item *it;
767 pthread_mutex_lock(&engine->cache_lock);
768 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
769 pthread_mutex_unlock(&engine->cache_lock);
770 return it;
771 }
772 #endif /* INNODB_MEMCAHED */
773
774 /*
775 * Returns an item if it hasn't been marked as expired,
776 * lazy-expiring as needed.
777 */
item_get(struct default_engine * engine,const void * key,const size_t nkey)778 hash_item *item_get(struct default_engine *engine,
779 const void *key, const size_t nkey) {
780 hash_item *it;
781 pthread_mutex_lock(&engine->cache_lock);
782 it = do_item_get(engine, key, nkey);
783 pthread_mutex_unlock(&engine->cache_lock);
784 return it;
785 }
786
787 /*
788 * Decrements the reference count on an item and adds it to the freelist if
789 * needed.
790 */
item_release(struct default_engine * engine,hash_item * item)791 void item_release(struct default_engine *engine, hash_item *item) {
792 pthread_mutex_lock(&engine->cache_lock);
793 do_item_release(engine, item);
794 pthread_mutex_unlock(&engine->cache_lock);
795 }
796
797 /*
798 * Unlinks an item from the LRU and hashtable.
799 */
item_unlink(struct default_engine * engine,hash_item * item)800 void item_unlink(struct default_engine *engine, hash_item *item) {
801 pthread_mutex_lock(&engine->cache_lock);
802 do_item_unlink(engine, item);
803 pthread_mutex_unlock(&engine->cache_lock);
804 }
805
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)806 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
807 const void* cookie,
808 const void* key,
809 const int nkey,
810 const bool increment,
811 const bool create,
812 const uint64_t delta,
813 const uint64_t initial,
814 const rel_time_t exptime,
815 uint64_t *cas,
816 uint64_t *result)
817 {
818 hash_item *item = do_item_get(engine, key, nkey);
819 ENGINE_ERROR_CODE ret;
820
821 if (item == NULL) {
822 if (!create) {
823 return ENGINE_KEY_ENOENT;
824 } else {
825 char buffer[128];
826 int len = snprintf(buffer, sizeof(buffer), "%"PRIu64"\r\n",
827 (uint64_t)initial);
828
829 item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
830 if (item == NULL) {
831 return ENGINE_ENOMEM;
832 }
833 memcpy((void*)item_get_data(item), buffer, len);
834 if ((ret = do_store_item(engine, item, cas,
835 OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
836 *result = initial;
837 *cas = item_get_cas(item);
838 }
839 do_item_release(engine, item);
840 }
841 } else {
842 ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
843 do_item_release(engine, item);
844 }
845
846 return ret;
847 }
848
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)849 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
850 const void* cookie,
851 const void* key,
852 const int nkey,
853 const bool increment,
854 const bool create,
855 const uint64_t delta,
856 const uint64_t initial,
857 const rel_time_t exptime,
858 uint64_t *cas,
859 uint64_t *result)
860 {
861 ENGINE_ERROR_CODE ret;
862
863 pthread_mutex_lock(&engine->cache_lock);
864 ret = do_arithmetic(engine, cookie, key, nkey, increment,
865 create, delta, initial, exptime, cas,
866 result);
867 pthread_mutex_unlock(&engine->cache_lock);
868 return ret;
869 }
870
871 /*
872 * Stores an item in the cache (high level, obeys set/add/replace semantics)
873 */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)874 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
875 hash_item *item, uint64_t *cas,
876 ENGINE_STORE_OPERATION operation,
877 const void *cookie) {
878 ENGINE_ERROR_CODE ret;
879
880 pthread_mutex_lock(&engine->cache_lock);
881 ret = do_store_item(engine, item, cas, operation, cookie);
882 pthread_mutex_unlock(&engine->cache_lock);
883 return ret;
884 }
885
886 /*
887 * Flushes expired items after a flush_all call
888 */
item_flush_expired(struct default_engine * engine,time_t when)889 void item_flush_expired(struct default_engine *engine, time_t when) {
890 int i;
891 hash_item *iter, *next;
892
893 pthread_mutex_lock(&engine->cache_lock);
894
895 if (when == 0) {
896 engine->config.oldest_live = engine->server.core->get_current_time() - 1;
897 } else {
898 engine->config.oldest_live = engine->server.core->realtime(when) - 1;
899 }
900
901 if (engine->config.oldest_live != 0) {
902 for (i = 0; i < POWER_LARGEST; i++) {
903 /*
904 * The LRU is sorted in decreasing time order, and an item's
905 * timestamp is never newer than its last access time, so we
906 * only need to walk back until we hit an item older than the
907 * oldest_live time.
908 * The oldest_live checking will auto-expire the remaining items.
909 */
910 for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
911 if (iter->time >= engine->config.oldest_live) {
912 next = iter->next;
913 if ((iter->iflag & ITEM_SLABBED) == 0) {
914 do_item_unlink(engine, iter);
915 }
916 } else {
917 /* We've hit the first old item. Continue to the next queue. */
918 break;
919 }
920 }
921 }
922 }
923 pthread_mutex_unlock(&engine->cache_lock);
924 }
925
926 /*
927 * Dumps part of the cache
928 */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)929 char *item_cachedump(struct default_engine *engine,
930 unsigned int slabs_clsid,
931 unsigned int limit,
932 unsigned int *bytes) {
933 char *ret;
934
935 pthread_mutex_lock(&engine->cache_lock);
936 ret = do_item_cachedump(slabs_clsid, limit, bytes);
937 pthread_mutex_unlock(&engine->cache_lock);
938 return ret;
939 }
940
/*
 * Reports the per-slab-class item statistics through `add_stat` while
 * holding the cache lock.
 */
void item_stats(struct default_engine *engine,
                ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const guard = &engine->cache_lock;

    pthread_mutex_lock(guard);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(guard);
}
948
949
/*
 * Reports the item size histogram through `add_stat` while holding the
 * cache lock.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const guard = &engine->cache_lock;

    pthread_mutex_lock(guard);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(guard);
}
957
/*
 * Appends `cursor` behind the current tail of slab class `ii`, making it
 * the new tail, and counts it in the class's size. The class's LRU must not
 * be empty: the current tail is dereferenced without a check.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *const old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->next = NULL;
    cursor->prev = old_tail;

    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;
    engine->items.sizes[ii]++;
}
968
969 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
970 hash_item *item, void *cookie);
971
do_item_walk_cursor(struct default_engine * engine,hash_item * cursor,int steplength,ITERFUNC itemfunc,void * itemdata,ENGINE_ERROR_CODE * error)972 static bool do_item_walk_cursor(struct default_engine *engine,
973 hash_item *cursor,
974 int steplength,
975 ITERFUNC itemfunc,
976 void* itemdata,
977 ENGINE_ERROR_CODE *error)
978 {
979 int ii = 0;
980 *error = ENGINE_SUCCESS;
981
982 while (cursor->prev != NULL && ii < steplength) {
983 ++ii;
984 /* Move cursor */
985 hash_item *ptr = cursor->prev;
986 item_unlink_q(engine, cursor);
987
988 bool done = false;
989
990 if (ptr == engine->items.heads[cursor->slabs_clsid]) {
991 done = true;
992 } else {
993 cursor->next = ptr;
994 cursor->prev = ptr->prev;
995 cursor->prev->next = cursor;
996 ptr->prev = cursor;
997 }
998
999 /* Ignore cursors */
1000 if (!(ptr->nkey == 0 && ptr->nbytes == 0)) {
1001 *error = itemfunc(engine, ptr, itemdata);
1002 if (*error != ENGINE_SUCCESS) {
1003 return false;
1004 }
1005 }
1006
1007 if (done) {
1008 return false;
1009 }
1010 }
1011
1012 return true;
1013 }
1014
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1015 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1016 hash_item *item,
1017 void *cookie) {
1018 (void)cookie;
1019 engine->scrubber.visited++;
1020 rel_time_t current_time = engine->server.core->get_current_time();
1021 if (item->refcount == 0 &&
1022 (item->exptime != 0 && item->exptime < current_time)) {
1023 do_item_unlink(engine, item);
1024 engine->scrubber.cleaned++;
1025 }
1026 return ENGINE_SUCCESS;
1027 }
1028
/*
 * Scrubs one slab class by walking `cursor` through its LRU in batches of
 * 200 items. The cache lock is taken for each batch and released in
 * between. Stops when the walk reports nothing more to do or a callback
 * fails.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor)
{
    for (;;) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        const bool keep_going = do_item_walk_cursor(engine, cursor, 200,
                                                    item_scrub, NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !keep_going) {
            break;
        }
    }
}
1043
item_scubber_main(void * arg)1044 static void *item_scubber_main(void *arg)
1045 {
1046 struct default_engine *engine = arg;
1047 hash_item cursor = { .refcount = 1 };
1048
1049 for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1050 pthread_mutex_lock(&engine->cache_lock);
1051 bool skip = false;
1052 if (engine->items.heads[ii] == NULL) {
1053 skip = true;
1054 } else {
1055 // add the item at the tail
1056 do_item_link_cursor(engine, &cursor, ii);
1057 }
1058 pthread_mutex_unlock(&engine->cache_lock);
1059
1060 if (!skip) {
1061 item_scrub_class(engine, &cursor);
1062 }
1063 }
1064
1065 pthread_mutex_lock(&engine->scrubber.lock);
1066 engine->scrubber.stopped = time(NULL);
1067 engine->scrubber.running = false;
1068 pthread_mutex_unlock(&engine->scrubber.lock);
1069
1070 return NULL;
1071 }
1072
item_start_scrub(struct default_engine * engine)1073 bool item_start_scrub(struct default_engine *engine)
1074 {
1075 bool ret = false;
1076 pthread_mutex_lock(&engine->scrubber.lock);
1077 if (!engine->scrubber.running) {
1078 engine->scrubber.started = time(NULL);
1079 engine->scrubber.stopped = 0;
1080 engine->scrubber.visited = 0;
1081 engine->scrubber.cleaned = 0;
1082 engine->scrubber.running = true;
1083
1084 pthread_t t;
1085 pthread_attr_t attr;
1086
1087 if (pthread_attr_init(&attr) != 0 ||
1088 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1089 pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1090 {
1091 engine->scrubber.running = false;
1092 } else {
1093 ret = true;
1094 }
1095 }
1096 pthread_mutex_unlock(&engine->scrubber.lock);
1097
1098 return ret;
1099 }
1100