1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11
12 #include "default_engine.h"
13
14 /* Forward Declarations */
15 static void item_link_q(struct default_engine *engine, hash_item *it);
16 static void item_unlink_q(struct default_engine *engine, hash_item *it);
17 static hash_item *do_item_alloc(struct default_engine *engine,
18 const void *key, const size_t nkey,
19 const int flags, const rel_time_t exptime,
20 const int nbytes,
21 const void *cookie);
22 static hash_item *do_item_get(struct default_engine *engine,
23 const char *key, const size_t nkey);
24 static int do_item_link(struct default_engine *engine, hash_item *it);
25 static void do_item_unlink(struct default_engine *engine, hash_item *it);
26 static void do_item_release(struct default_engine *engine, hash_item *it);
27 static void do_item_update(struct default_engine *engine, hash_item *it);
28 static int do_item_replace(struct default_engine *engine,
29 hash_item *it, hash_item *new_it);
30 static void item_free(struct default_engine *engine, hash_item *it);
31
32 /*
33 * We only reposition items in the LRU queue if they haven't been repositioned
34 * in this many seconds. That saves us from churning on frequently-accessed
35 * items.
36 */
37 #define ITEM_UPDATE_INTERVAL 60
38 /*
39 * To avoid scanning through the complete cache in some circumstances we'll
40 * just give up and return an error after inspecting a fixed number of objects.
41 */
42 static const int search_items = 50;
43
item_stats_reset(struct default_engine * engine)44 void item_stats_reset(struct default_engine *engine) {
45 pthread_mutex_lock(&engine->cache_lock);
46 memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47 pthread_mutex_unlock(&engine->cache_lock);
48 }
49
50
51 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)52 static inline size_t ITEM_ntotal(struct default_engine *engine,
53 const hash_item *item) {
54 size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55 if (engine->config.use_cas) {
56 ret += sizeof(uint64_t);
57 }
58
59 return ret;
60 }
61
/*
 * Hands out the next CAS id. Ids come from a function-local counter that
 * starts at zero, so the first id returned is 1.
 */
static uint64_t get_cas_id(void) {
    static uint64_t last_issued = 0;

    last_issued += 1;
    return last_issued;
}
67
/*
 * Reference-count tracing. Change the "#if 0" to "#if 1" to print one line
 * to stderr for every traced refcount operation:
 *
 *   item <address> refcnt(<op>) <refcount> <L if linked><S if slabbed>
 *
 * it  pointer to the hash_item being traced
 * op  single character identifying the operation ('*', '+', '-', 'F')
 *
 * When tracing is disabled the macro expands to an empty statement that
 * still requires the trailing semicolon at the call site.
 */
#if 0
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
78
79
80 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)81 hash_item *do_item_alloc(struct default_engine *engine,
82 const void *key,
83 const size_t nkey,
84 const int flags,
85 const rel_time_t exptime,
86 const int nbytes,
87 const void *cookie) {
88 hash_item *it = NULL;
89 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
90 if (engine->config.use_cas) {
91 ntotal += sizeof(uint64_t);
92 }
93
94 unsigned int id = slabs_clsid(engine, ntotal);
95 if (id == 0)
96 return 0;
97
98 /* do a quick check if we have any expired items in the tail.. */
99 int tries = search_items;
100 hash_item *search;
101
102 rel_time_t current_time = engine->server.core->get_current_time();
103
104 for (search = engine->items.tails[id];
105 tries > 0 && search != NULL;
106 tries--, search=search->prev) {
107 if (search->refcount == 0 &&
108 (search->exptime != 0 && search->exptime < current_time)) {
109 it = search;
110 /* I don't want to actually free the object, just steal
111 * the item to avoid to grab the slab mutex twice ;-)
112 */
113 pthread_mutex_lock(&engine->stats.lock);
114 engine->stats.reclaimed++;
115 pthread_mutex_unlock(&engine->stats.lock);
116 engine->items.itemstats[id].reclaimed++;
117 it->refcount = 1;
118 slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
119 do_item_unlink(engine, it);
120 /* Initialize the item block: */
121 it->slabs_clsid = 0;
122 it->refcount = 0;
123 break;
124 }
125 }
126
127 if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
128 /*
129 ** Could not find an expired item at the tail, and memory allocation
130 ** failed. Try to evict some items!
131 */
132 tries = search_items;
133
134 /* If requested to not push old items out of cache when memory runs out,
135 * we're out of luck at this point...
136 */
137
138 if (engine->config.evict_to_free == 0) {
139 engine->items.itemstats[id].outofmemory++;
140 return NULL;
141 }
142
143 /*
144 * try to get one off the right LRU
145 * don't necessariuly unlink the tail because it may be locked: refcount>0
146 * search up from tail an item with refcount==0 and unlink it; give up after search_items
147 * tries
148 */
149
150 if (engine->items.tails[id] == 0) {
151 engine->items.itemstats[id].outofmemory++;
152 return NULL;
153 }
154
155 for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
156 if (search->refcount == 0) {
157 if (search->exptime == 0 || search->exptime > current_time) {
158 engine->items.itemstats[id].evicted++;
159 engine->items.itemstats[id].evicted_time = current_time - search->time;
160 if (search->exptime != 0) {
161 engine->items.itemstats[id].evicted_nonzero++;
162 }
163 pthread_mutex_lock(&engine->stats.lock);
164 engine->stats.evictions++;
165 pthread_mutex_unlock(&engine->stats.lock);
166 engine->server.stat->evicting(cookie,
167 item_get_key(search),
168 search->nkey);
169 } else {
170 engine->items.itemstats[id].reclaimed++;
171 pthread_mutex_lock(&engine->stats.lock);
172 engine->stats.reclaimed++;
173 pthread_mutex_unlock(&engine->stats.lock);
174 }
175 do_item_unlink(engine, search);
176 break;
177 }
178 }
179 it = slabs_alloc(engine, ntotal, id);
180 if (it == 0) {
181 engine->items.itemstats[id].outofmemory++;
182 /* Last ditch effort. There is a very rare bug which causes
183 * refcount leaks. We've fixed most of them, but it still happens,
184 * and it may happen in the future.
185 * We can reasonably assume no item can stay locked for more than
186 * three hours, so if we find one in the tail which is that old,
187 * free it anyway.
188 */
189 tries = search_items;
190 for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
191 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
192 engine->items.itemstats[id].tailrepairs++;
193 search->refcount = 0;
194 do_item_unlink(engine, search);
195 break;
196 }
197 }
198 it = slabs_alloc(engine, ntotal, id);
199 if (it == 0) {
200 return NULL;
201 }
202 }
203 }
204
205 assert(it->slabs_clsid == 0);
206
207 it->slabs_clsid = id;
208
209 assert(it != engine->items.heads[it->slabs_clsid]);
210
211 it->next = it->prev = it->h_next = 0;
212 it->refcount = 1; /* the caller will have a reference */
213 DEBUG_REFCNT(it, '*');
214 it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
215 it->nkey = nkey;
216 it->nbytes = nbytes;
217 it->flags = flags;
218 memcpy((void*)item_get_key(it), key, nkey);
219 it->exptime = exptime;
220 return it;
221 }
222
/*
 * Hands an item's memory back to the slab allocator.
 *
 * The item must be unlinked, unreferenced and must not be the head or tail
 * of its LRU (all asserted). Its slab class is reset to 0 and it is flagged
 * ITEM_SLABBED before slabs_free() is called with the original class id.
 */
static void item_free(struct default_engine *engine, hash_item *it) {
    const size_t footprint = ITEM_ntotal(engine, it);
    unsigned int owner_class;

    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[it->slabs_clsid]);
    assert(it != engine->items.tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    /* Remember the class, then clear it so the slab code can later tell
     * that this item is already free. */
    owner_class = it->slabs_clsid;
    it->slabs_clsid = 0;
    it->iflag |= ITEM_SLABBED;
    DEBUG_REFCNT(it, 'F');

    slabs_free(engine, it, footprint, owner_class);
}
238
/*
 * Inserts an item at the head of the LRU queue of its slab class and bumps
 * the queue's item count. The item becomes the tail as well when the queue
 * was empty.
 */
static void item_link_q(struct default_engine *engine, hash_item *it) {
    hash_item **head;
    hash_item **tail;

    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    head = &engine->items.heads[it->slabs_clsid];
    tail = &engine->items.tails[it->slabs_clsid];

    assert(it != *head);
    /* head and tail are either both set or both empty */
    assert((*head && *tail) || (*head == 0 && *tail == 0));

    it->prev = NULL;
    it->next = *head;
    if (it->next != NULL) {
        it->next->prev = it;
    }
    *head = it;

    if (*tail == NULL) {
        *tail = it;
    }

    engine->items.sizes[it->slabs_clsid]++;
}
256
/*
 * Removes an item from the LRU queue of its slab class, fixing up the
 * queue's head and tail pointers and its neighbours, and decrements the
 * queue's item count. The item's own next/prev pointers are left as is.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
    hash_item **head;
    hash_item **tail;

    assert(it->slabs_clsid < POWER_LARGEST);

    head = &engine->items.heads[it->slabs_clsid];
    tail = &engine->items.tails[it->slabs_clsid];

    if (it == *head) {
        assert(it->prev == 0);
        *head = it->next;
    }
    if (it == *tail) {
        assert(it->next == 0);
        *tail = it->prev;
    }

    assert(it->next != it);
    assert(it->prev != it);

    /* bridge the neighbours over the removed item */
    if (it->next != NULL) {
        it->next->prev = it->prev;
    }
    if (it->prev != NULL) {
        it->prev->next = it->next;
    }

    engine->items.sizes[it->slabs_clsid]--;
}
279
/*
 * Makes an item visible: flags it ITEM_LINKED, stamps it with the current
 * time, inserts it into the hash table, updates the global byte/item
 * counters, assigns a fresh CAS id and puts it at the head of its LRU.
 *
 * Always returns 1.
 */
int do_item_link(struct default_engine *engine, hash_item *it) {
    size_t footprint;

    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */

    it->iflag |= ITEM_LINKED;
    it->time = engine->server.core->get_current_time();
    assoc_insert(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 it);

    footprint = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes += footprint;
    engine->stats.curr_items += 1;
    engine->stats.total_items += 1;
    pthread_mutex_unlock(&engine->stats.lock);

    /* Every link gets a new CAS id. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);

    return 1;
}
303
/*
 * Takes a linked item out of the hash table and its LRU queue and adjusts
 * the global byte/item counters. The item is freed right away when nobody
 * holds a reference to it. Calling this on an item that is not linked does
 * nothing beyond firing the trace probe.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it) {
    size_t footprint;

    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    it->iflag &= ~ITEM_LINKED;

    footprint = ITEM_ntotal(engine, it);
    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes -= footprint;
    engine->stats.curr_items -= 1;
    pthread_mutex_unlock(&engine->stats.lock);

    assoc_delete(engine,
                 engine->server.core->hash(item_get_key(it), it->nkey, 0),
                 item_get_key(it), it->nkey);
    item_unlink_q(engine, it);

    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
321
/*
 * Drops one reference to an item (a refcount of zero is left untouched)
 * and frees the item once it is both unreferenced and unlinked.
 */
void do_item_release(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);

    if (it->refcount != 0) {
        it->refcount--;
        DEBUG_REFCNT(it, '-');
    }

    if (it->refcount != 0) {
        return;
    }
    if ((it->iflag & ITEM_LINKED) != 0) {
        return;
    }

    item_free(engine, it);
}
332
/*
 * Moves a linked item to the head of its LRU queue and refreshes its
 * timestamp, but only when the timestamp is older than
 * ITEM_UPDATE_INTERVAL seconds so that hot items do not churn the queue.
 */
void do_item_update(struct default_engine *engine, hash_item *it) {
    rel_time_t now = engine->server.core->get_current_time();

    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);

    if (it->time >= now - ITEM_UPDATE_INTERVAL) {
        /* repositioned recently enough */
        return;
    }

    assert((it->iflag & ITEM_SLABBED) == 0);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return;
    }

    item_unlink_q(engine, it);
    it->time = now;
    item_link_q(engine, it);
}
346
/*
 * Swaps one item for another: the old item is unlinked and the new one is
 * linked in its place. Returns whatever do_item_link() returns for new_it.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    int linked;

    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);
    linked = do_item_link(engine, new_it);
    return linked;
}
356
/*
 * Dumps the keys of one slab class as text.
 *
 * Unless FUTURE is defined this is a stub that ignores its arguments and
 * returns NULL without touching *bytes.
 *
 * With FUTURE defined it would produce a malloc()ed buffer of at most 2MB
 * holding one "ITEM ..." line per item followed by "END\r\n", and store
 * the buffer length (not counting the terminating NUL) in *bytes.
 * NOTE(review): that branch reads `engine`, which is not a parameter of
 * this function, so it cannot compile as written.
 */
/*@null@*/
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    const unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
    char *buffer;
    unsigned int used;
    unsigned int shown = 0;
    hash_item *it;
    char key_temp[KEY_MAX_LENGTH + 1];
    char line[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == NULL) {
        return NULL;
    }
    used = 0;

    for (; it != NULL && (limit == 0 || shown < limit); it = it->next) {
        unsigned int len;

        assert(it->nkey <= KEY_MAX_LENGTH);
        /* The key may not be NUL-terminated inside the item: copy it. */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        len = snprintf(line, sizeof(line), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes,
                       (unsigned long)it->exptime + process_started);
        if (used + len + 6 > memlimit) { /* 6 is END\r\n\0 */
            break;
        }
        memcpy(buffer + used, line, len);
        used += len;
        shown++;
    }

    memcpy(buffer + used, "END\r\n", 6);
    used += 5;

    *bytes = used;
    return buffer;
#else
    (void)slabs_clsid;
    (void)limit;
    (void)bytes;
    return NULL;
#endif
}
406
/*
 * Reports the per-slab-class "items" statistics through add_statistics().
 *
 * Before a class is reported, up to search_items flushed or expired items
 * are pruned from the tail of its LRU; pruning stops at the first such
 * item that is still referenced. Classes whose LRU is (or becomes) empty
 * are not reported.
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c) {
    const char *prefix = "items";
    rel_time_t now = engine->server.core->get_current_time();
    int cls;

    for (cls = 0; cls < POWER_LARGEST; cls++) {
        int budget;

        if (engine->items.tails[cls] == NULL) {
            continue;
        }

        for (budget = search_items; budget > 0; ) {
            hash_item *tail = engine->items.tails[cls];
            int flushed;
            int expired;

            if (tail == NULL) {
                break;
            }

            /* item predates the last flush */
            flushed = engine->config.oldest_live != 0 &&
                      engine->config.oldest_live <= now &&
                      tail->time <= engine->config.oldest_live;
            /* item has an expiry time that has passed */
            expired = tail->exptime != 0 && tail->exptime < now;
            if (!flushed && !expired) {
                break;
            }

            --budget;
            if (tail->refcount != 0) {
                break;
            }
            do_item_unlink(engine, tail);
        }

        if (engine->items.tails[cls] == NULL) {
            /* We removed all of the items in this slab class */
            continue;
        }

        add_statistics(c, add_stats, prefix, cls, "number", "%u",
                       engine->items.sizes[cls]);
        add_statistics(c, add_stats, prefix, cls, "age", "%u",
                       engine->items.tails[cls]->time);
        add_statistics(c, add_stats, prefix, cls, "evicted",
                       "%u", engine->items.itemstats[cls].evicted);
        add_statistics(c, add_stats, prefix, cls, "evicted_nonzero",
                       "%u", engine->items.itemstats[cls].evicted_nonzero);
        add_statistics(c, add_stats, prefix, cls, "evicted_time",
                       "%u", engine->items.itemstats[cls].evicted_time);
        add_statistics(c, add_stats, prefix, cls, "outofmemory",
                       "%u", engine->items.itemstats[cls].outofmemory);
        add_statistics(c, add_stats, prefix, cls, "tailrepairs",
                       "%u", engine->items.itemstats[cls].tailrepairs);
        add_statistics(c, add_stats, prefix, cls, "reclaimed",
                       "%u", engine->items.itemstats[cls].reclaimed);
    }
}
453
454 /** dumps out a list of objects of each size, with granularity of 32 bytes */
455 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)456 static void do_item_stats_sizes(struct default_engine *engine,
457 ADD_STAT add_stats, const void *c) {
458
459 /* max 1MB object, divided into 32 bytes size buckets */
460 const int num_buckets = 32768;
461 unsigned int *histogram = calloc(num_buckets, sizeof(int));
462
463 if (histogram != NULL) {
464 int i;
465
466 /* build the histogram */
467 for (i = 0; i < POWER_LARGEST; i++) {
468 hash_item *iter = engine->items.heads[i];
469 while (iter) {
470 int ntotal = ITEM_ntotal(engine, iter);
471 int bucket = ntotal / 32;
472 if ((ntotal % 32) != 0) bucket++;
473 if (bucket < num_buckets) histogram[bucket]++;
474 iter = iter->next;
475 }
476 }
477
478 /* write the buffer */
479 for (i = 0; i < num_buckets; i++) {
480 if (histogram[i] != 0) {
481 char key[8], val[32];
482 int klen, vlen;
483 klen = snprintf(key, sizeof(key), "%d", i * 32);
484 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
485 assert(klen < sizeof(key));
486 assert(vlen < sizeof(val));
487 add_stats(key, klen, val, vlen, c);
488 }
489 }
490 free(histogram);
491 }
492 }
493
494 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)495 hash_item *do_item_get(struct default_engine *engine,
496 const char *key, const size_t nkey) {
497 rel_time_t current_time = engine->server.core->get_current_time();
498 hash_item *it = assoc_find(engine, engine->server.core->hash(key,
499 nkey, 0),
500 key, nkey);
501 int was_found = 0;
502
503 if (engine->config.verbose > 2) {
504 EXTENSION_LOGGER_DESCRIPTOR *logger;
505 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
506 if (it == NULL) {
507 logger->log(EXTENSION_LOG_DEBUG, NULL,
508 "> NOT FOUND %s", key);
509 } else {
510 logger->log(EXTENSION_LOG_DEBUG, NULL,
511 "> FOUND KEY %s",
512 (const char*)item_get_key(it));
513 was_found++;
514 }
515 }
516
517 if (it != NULL && engine->config.oldest_live != 0 &&
518 engine->config.oldest_live <= current_time &&
519 it->time <= engine->config.oldest_live) {
520 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
521 it = NULL;
522 }
523
524 if (it == NULL && was_found) {
525 EXTENSION_LOGGER_DESCRIPTOR *logger;
526 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
527 logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
528 was_found--;
529 }
530
531 if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
532 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
533 it = NULL;
534 }
535
536 if (it == NULL && was_found) {
537 EXTENSION_LOGGER_DESCRIPTOR *logger;
538 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
539 logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
540 was_found--;
541 }
542
543 if (it != NULL) {
544 it->refcount++;
545 DEBUG_REFCNT(it, '+');
546 do_item_update(engine, it);
547 }
548
549 return it;
550 }
551
552 /*
553 * Stores an item in the cache according to the semantics of one of the set
554 * commands. In threaded mode, this is protected by the cache lock.
555 *
556 * Returns the state of storage.
557 */
do_store_item(struct default_engine * engine,hash_item * it,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)558 static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
559 hash_item *it, uint64_t *cas,
560 ENGINE_STORE_OPERATION operation,
561 const void *cookie) {
562 const char *key = item_get_key(it);
563 hash_item *old_it = do_item_get(engine, key, it->nkey);
564 ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
565
566 hash_item *new_it = NULL;
567
568 if (old_it != NULL && operation == OPERATION_ADD) {
569 /* add only adds a nonexistent item, but promote to head of LRU */
570 do_item_update(engine, old_it);
571 } else if (!old_it && (operation == OPERATION_REPLACE
572 || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
573 {
574 /* replace only replaces an existing value; don't store */
575 } else if (operation == OPERATION_CAS) {
576 /* validate cas operation */
577 if(old_it == NULL) {
578 // LRU expired
579 stored = ENGINE_KEY_ENOENT;
580 }
581 else if (item_get_cas(it) == item_get_cas(old_it)) {
582 // cas validates
583 // it and old_it may belong to different classes.
584 // I'm updating the stats for the one that's getting pushed out
585 do_item_replace(engine, old_it, it);
586 stored = ENGINE_SUCCESS;
587 } else {
588 if (engine->config.verbose > 1) {
589 EXTENSION_LOGGER_DESCRIPTOR *logger;
590 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
591 logger->log(EXTENSION_LOG_INFO, NULL,
592 "CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
593 item_get_cas(old_it),
594 item_get_cas(it));
595 }
596 stored = ENGINE_KEY_EEXISTS;
597 }
598 } else {
599 /*
600 * Append - combine new and old record into single one. Here it's
601 * atomic and thread-safe.
602 */
603 if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
604 /*
605 * Validate CAS
606 */
607 if (item_get_cas(it) != 0) {
608 // CAS much be equal
609 if (item_get_cas(it) != item_get_cas(old_it)) {
610 stored = ENGINE_KEY_EEXISTS;
611 }
612 }
613
614 if (stored == ENGINE_NOT_STORED) {
615 /* we have it and old_it here - alloc memory to hold both */
616 new_it = do_item_alloc(engine, key, it->nkey,
617 old_it->flags,
618 old_it->exptime,
619 it->nbytes + old_it->nbytes,
620 cookie);
621
622 if (new_it == NULL) {
623 /* SERVER_ERROR out of memory */
624 if (old_it != NULL) {
625 do_item_release(engine, old_it);
626 }
627
628 return ENGINE_NOT_STORED;
629 }
630
631 /* copy data from it and old_it to new_it */
632
633 if (operation == OPERATION_APPEND) {
634 memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
635 memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
636 } else {
637 /* OPERATION_PREPEND */
638 memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
639 memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
640 }
641
642 it = new_it;
643 }
644 }
645
646 if (stored == ENGINE_NOT_STORED) {
647 if (old_it != NULL) {
648 do_item_replace(engine, old_it, it);
649 } else {
650 do_item_link(engine, it);
651 }
652
653 *cas = item_get_cas(it);
654 stored = ENGINE_SUCCESS;
655 }
656 }
657
658 if (old_it != NULL) {
659 do_item_release(engine, old_it); /* release our reference */
660 }
661
662 if (new_it != NULL) {
663 do_item_release(engine, new_it);
664 }
665
666 if (stored == ENGINE_SUCCESS) {
667 *cas = item_get_cas(it);
668 }
669
670 return stored;
671 }
672
673
674 /*
675 * adds a delta value to a numeric item.
676 *
677 * c connection requesting the operation
678 * it item to adjust
679 * incr true to increment value, false to decrement
680 * delta amount to adjust value by
681 * buf buffer for response string
682 *
683 * returns a response string to send back to the client.
684 */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)685 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
686 hash_item *it, const bool incr,
687 const int64_t delta, uint64_t *rcas,
688 uint64_t *result, const void *cookie) {
689 const char *ptr;
690 uint64_t value;
691 char buf[80];
692 int res;
693
694 if (it->nbytes >= (sizeof(buf) - 1)) {
695 return ENGINE_EINVAL;
696 }
697
698 ptr = item_get_data(it);
699 memcpy(buf, ptr, it->nbytes);
700 buf[it->nbytes] = '\0';
701
702 if (!safe_strtoull(buf, &value)) {
703 return ENGINE_EINVAL;
704 }
705
706 if (incr) {
707 value += delta;
708 } else {
709 if(delta > value) {
710 value = 0;
711 } else {
712 value -= delta;
713 }
714 }
715
716 *result = value;
717 if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
718 return ENGINE_EINVAL;
719 }
720
721 if (it->refcount == 1 && res <= it->nbytes) {
722 // we can do inline replacement
723 memcpy(item_get_data(it), buf, res);
724 memset(item_get_data(it) + res, ' ', it->nbytes - res);
725 item_set_cas(NULL, NULL, it, get_cas_id());
726 *rcas = item_get_cas(it);
727 } else {
728 hash_item *new_it = do_item_alloc(engine, item_get_key(it),
729 it->nkey, it->flags,
730 it->exptime, res,
731 cookie);
732 if (new_it == NULL) {
733 do_item_unlink(engine, it);
734 return ENGINE_ENOMEM;
735 }
736 memcpy(item_get_data(new_it), buf, res);
737 do_item_replace(engine, it, new_it);
738 *rcas = item_get_cas(new_it);
739 do_item_release(engine, new_it); /* release our reference */
740 }
741
742 return ENGINE_SUCCESS;
743 }
744
745 /********************************* ITEM ACCESS *******************************/
746
747 /*
748 * Allocates a new item.
749 */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)750 hash_item *item_alloc(struct default_engine *engine,
751 const void *key, size_t nkey, int flags,
752 rel_time_t exptime, int nbytes, const void *cookie) {
753 hash_item *it;
754 pthread_mutex_lock(&engine->cache_lock);
755 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
756 pthread_mutex_unlock(&engine->cache_lock);
757 return it;
758 }
759
760 /*
761 * Returns an item if it hasn't been marked as expired,
762 * lazy-expiring as needed.
763 */
item_get(struct default_engine * engine,const void * key,const size_t nkey)764 hash_item *item_get(struct default_engine *engine,
765 const void *key, const size_t nkey) {
766 hash_item *it;
767 pthread_mutex_lock(&engine->cache_lock);
768 it = do_item_get(engine, key, nkey);
769 pthread_mutex_unlock(&engine->cache_lock);
770 return it;
771 }
772
773 /*
774 * Decrements the reference count on an item and adds it to the freelist if
775 * needed.
776 */
item_release(struct default_engine * engine,hash_item * item)777 void item_release(struct default_engine *engine, hash_item *item) {
778 pthread_mutex_lock(&engine->cache_lock);
779 do_item_release(engine, item);
780 pthread_mutex_unlock(&engine->cache_lock);
781 }
782
783 /*
784 * Unlinks an item from the LRU and hashtable.
785 */
item_unlink(struct default_engine * engine,hash_item * item)786 void item_unlink(struct default_engine *engine, hash_item *item) {
787 pthread_mutex_lock(&engine->cache_lock);
788 do_item_unlink(engine, item);
789 pthread_mutex_unlock(&engine->cache_lock);
790 }
791
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)792 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
793 const void* cookie,
794 const void* key,
795 const int nkey,
796 const bool increment,
797 const bool create,
798 const uint64_t delta,
799 const uint64_t initial,
800 const rel_time_t exptime,
801 uint64_t *cas,
802 uint64_t *result)
803 {
804 hash_item *item = do_item_get(engine, key, nkey);
805 ENGINE_ERROR_CODE ret;
806
807 if (item == NULL) {
808 if (!create) {
809 return ENGINE_KEY_ENOENT;
810 } else {
811 char buffer[128];
812 int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
813 (uint64_t)initial);
814
815 item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
816 if (item == NULL) {
817 return ENGINE_ENOMEM;
818 }
819 memcpy((void*)item_get_data(item), buffer, len);
820 if ((ret = do_store_item(engine, item, cas,
821 OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
822 *result = initial;
823 *cas = item_get_cas(item);
824 }
825 do_item_release(engine, item);
826 }
827 } else {
828 ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
829 do_item_release(engine, item);
830 }
831
832 return ret;
833 }
834
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)835 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
836 const void* cookie,
837 const void* key,
838 const int nkey,
839 const bool increment,
840 const bool create,
841 const uint64_t delta,
842 const uint64_t initial,
843 const rel_time_t exptime,
844 uint64_t *cas,
845 uint64_t *result)
846 {
847 ENGINE_ERROR_CODE ret;
848
849 pthread_mutex_lock(&engine->cache_lock);
850 ret = do_arithmetic(engine, cookie, key, nkey, increment,
851 create, delta, initial, exptime, cas,
852 result);
853 pthread_mutex_unlock(&engine->cache_lock);
854 return ret;
855 }
856
857 /*
858 * Stores an item in the cache (high level, obeys set/add/replace semantics)
859 */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)860 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
861 hash_item *item, uint64_t *cas,
862 ENGINE_STORE_OPERATION operation,
863 const void *cookie) {
864 ENGINE_ERROR_CODE ret;
865
866 pthread_mutex_lock(&engine->cache_lock);
867 ret = do_store_item(engine, item, cas, operation, cookie);
868 pthread_mutex_unlock(&engine->cache_lock);
869 return ret;
870 }
871
do_touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)872 static hash_item *do_touch_item(struct default_engine *engine,
873 const void *key,
874 uint16_t nkey,
875 uint32_t exptime)
876 {
877 hash_item *item = do_item_get(engine, key, nkey);
878 if (item != NULL) {
879 item->exptime = exptime;
880 }
881 return item;
882 }
883
touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)884 hash_item *touch_item(struct default_engine *engine,
885 const void *key,
886 uint16_t nkey,
887 uint32_t exptime)
888 {
889 hash_item *ret;
890
891 pthread_mutex_lock(&engine->cache_lock);
892 ret = do_touch_item(engine, key, nkey, exptime);
893 pthread_mutex_unlock(&engine->cache_lock);
894 return ret;
895 }
896
897 /*
898 * Flushes expired items after a flush_all call
899 */
item_flush_expired(struct default_engine * engine,time_t when)900 void item_flush_expired(struct default_engine *engine, time_t when) {
901 int i;
902 hash_item *iter, *next;
903
904 pthread_mutex_lock(&engine->cache_lock);
905
906 if (when == 0) {
907 engine->config.oldest_live = engine->server.core->get_current_time() - 1;
908 } else {
909 engine->config.oldest_live = engine->server.core->realtime(when) - 1;
910 }
911
912 if (engine->config.oldest_live != 0) {
913 for (i = 0; i < POWER_LARGEST; i++) {
914 /*
915 * The LRU is sorted in decreasing time order, and an item's
916 * timestamp is never newer than its last access time, so we
917 * only need to walk back until we hit an item older than the
918 * oldest_live time.
919 * The oldest_live checking will auto-expire the remaining items.
920 */
921 for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
922 if (iter->time >= engine->config.oldest_live) {
923 next = iter->next;
924 if ((iter->iflag & ITEM_SLABBED) == 0) {
925 do_item_unlink(engine, iter);
926 }
927 } else {
928 /* We've hit the first old item. Continue to the next queue. */
929 break;
930 }
931 }
932 }
933 }
934 pthread_mutex_unlock(&engine->cache_lock);
935 }
936
937 /*
938 * Dumps part of the cache
939 */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)940 char *item_cachedump(struct default_engine *engine,
941 unsigned int slabs_clsid,
942 unsigned int limit,
943 unsigned int *bytes) {
944 char *ret;
945
946 pthread_mutex_lock(&engine->cache_lock);
947 ret = do_item_cachedump(slabs_clsid, limit, bytes);
948 pthread_mutex_unlock(&engine->cache_lock);
949 return ret;
950 }
951
/*
 * Reports the per-slab-class item statistics: runs do_item_stats() with
 * the cache lock held.
 */
void item_stats(struct default_engine *engine,
                   ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *guard = &engine->cache_lock;

    pthread_mutex_lock(guard);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(guard);
}
959
960
/*
 * Reports the item size histogram: runs do_item_stats_sizes() with the
 * cache lock held.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *guard = &engine->cache_lock;

    pthread_mutex_lock(guard);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(guard);
}
968
/*
 * Appends `cursor` to the tail of the queue for slab class `ii` and bumps
 * that queue's size counter.
 *
 * The current tail is dereferenced unconditionally, so the queue must not be
 * empty. The callers in this file check heads[ii] != NULL while holding the
 * cache lock before calling.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->next = NULL;
    cursor->prev = old_tail;

    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;
    engine->items.sizes[ii]++;
}
979
980 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
981 hash_item *item, void *cookie);
982
do_item_walk_cursor(struct default_engine * engine,hash_item * cursor,int steplength,ITERFUNC itemfunc,void * itemdata,ENGINE_ERROR_CODE * error)983 static bool do_item_walk_cursor(struct default_engine *engine,
984 hash_item *cursor,
985 int steplength,
986 ITERFUNC itemfunc,
987 void* itemdata,
988 ENGINE_ERROR_CODE *error)
989 {
990 int ii = 0;
991 *error = ENGINE_SUCCESS;
992
993 while (cursor->prev != NULL && ii < steplength) {
994 ++ii;
995 /* Move cursor */
996 hash_item *ptr = cursor->prev;
997 item_unlink_q(engine, cursor);
998
999 bool done = false;
1000 if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1001 done = true;
1002 cursor->prev = NULL;
1003 } else {
1004 cursor->next = ptr;
1005 cursor->prev = ptr->prev;
1006 cursor->prev->next = cursor;
1007 ptr->prev = cursor;
1008 }
1009
1010 /* Ignore cursors */
1011 if (ptr->nkey == 0 && ptr->nbytes == 0) {
1012 --ii;
1013 } else {
1014 *error = itemfunc(engine, ptr, itemdata);
1015 if (*error != ENGINE_SUCCESS) {
1016 return false;
1017 }
1018 }
1019
1020 if (done) {
1021 return false;
1022 }
1023 }
1024
1025 return (cursor->prev != NULL);
1026 }
1027
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1028 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1029 hash_item *item,
1030 void *cookie) {
1031 (void)cookie;
1032 engine->scrubber.visited++;
1033 rel_time_t current_time = engine->server.core->get_current_time();
1034 if (item->refcount == 0 &&
1035 (item->exptime != 0 && item->exptime < current_time)) {
1036 do_item_unlink(engine, item);
1037 engine->scrubber.cleaned++;
1038 }
1039 return ENGINE_SUCCESS;
1040 }
1041
/*
 * Runs item_scrub over the queue `cursor` is linked into.
 *
 * The walk is done in batches of up to 200 items. engine->cache_lock is taken
 * for each batch and released in between. The loop ends when the walker
 * reports that nothing is left or when it reports an error.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor) {
    const int batch = 200;
    bool more = true;

    while (more) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        more = do_item_walk_cursor(engine, cursor, batch, item_scrub, NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS) {
            more = false;
        }
    }
}
1056
item_scubber_main(void * arg)1057 static void *item_scubber_main(void *arg)
1058 {
1059 struct default_engine *engine = arg;
1060 hash_item cursor = { .refcount = 1 };
1061
1062 for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1063 pthread_mutex_lock(&engine->cache_lock);
1064 bool skip = false;
1065 if (engine->items.heads[ii] == NULL) {
1066 skip = true;
1067 } else {
1068 // add the item at the tail
1069 do_item_link_cursor(engine, &cursor, ii);
1070 }
1071 pthread_mutex_unlock(&engine->cache_lock);
1072
1073 if (!skip) {
1074 item_scrub_class(engine, &cursor);
1075 }
1076 }
1077
1078 pthread_mutex_lock(&engine->scrubber.lock);
1079 engine->scrubber.stopped = time(NULL);
1080 engine->scrubber.running = false;
1081 pthread_mutex_unlock(&engine->scrubber.lock);
1082
1083 return NULL;
1084 }
1085
item_start_scrub(struct default_engine * engine)1086 bool item_start_scrub(struct default_engine *engine)
1087 {
1088 bool ret = false;
1089 pthread_mutex_lock(&engine->scrubber.lock);
1090 if (!engine->scrubber.running) {
1091 engine->scrubber.started = time(NULL);
1092 engine->scrubber.stopped = 0;
1093 engine->scrubber.visited = 0;
1094 engine->scrubber.cleaned = 0;
1095 engine->scrubber.running = true;
1096
1097 pthread_t t;
1098 pthread_attr_t attr;
1099
1100 if (pthread_attr_init(&attr) != 0 ||
1101 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1102 pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1103 {
1104 engine->scrubber.running = false;
1105 } else {
1106 ret = true;
1107 }
1108 }
1109 pthread_mutex_unlock(&engine->scrubber.lock);
1110
1111 return ret;
1112 }
1113
1114 struct tap_client {
1115 hash_item cursor;
1116 hash_item *it;
1117 };
1118
item_tap_iterfunc(struct default_engine * engine,hash_item * item,void * cookie)1119 static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1120 hash_item *item,
1121 void *cookie) {
1122 struct tap_client *client = cookie;
1123 client->it = item;
1124 ++client->it->refcount;
1125 return ENGINE_SUCCESS;
1126 }
1127
do_item_tap_walker(struct default_engine * engine,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1128 static tap_event_t do_item_tap_walker(struct default_engine *engine,
1129 const void *cookie, item **itm,
1130 void **es, uint16_t *nes, uint8_t *ttl,
1131 uint16_t *flags, uint32_t *seqno,
1132 uint16_t *vbucket)
1133 {
1134 struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1135 if (client == NULL) {
1136 return TAP_DISCONNECT;
1137 }
1138
1139 *es = NULL;
1140 *nes = 0;
1141 *ttl = (uint8_t)-1;
1142 *seqno = 0;
1143 *flags = 0;
1144 *vbucket = 0;
1145 client->it = NULL;
1146
1147 ENGINE_ERROR_CODE r;
1148 do {
1149 if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1150 // find next slab class to look at..
1151 bool linked = false;
1152 for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked; ++ii) {
1153 if (engine->items.heads[ii] != NULL) {
1154 // add the item at the tail
1155 do_item_link_cursor(engine, &client->cursor, ii);
1156 linked = true;
1157 }
1158 }
1159 if (!linked) {
1160 break;
1161 }
1162 }
1163 } while (client->it == NULL);
1164 *itm = client->it;
1165
1166 return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1167 }
1168
item_tap_walker(ENGINE_HANDLE * handle,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1169 tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1170 const void *cookie, item **itm,
1171 void **es, uint16_t *nes, uint8_t *ttl,
1172 uint16_t *flags, uint32_t *seqno,
1173 uint16_t *vbucket)
1174 {
1175 tap_event_t ret;
1176 struct default_engine *engine = (struct default_engine*)handle;
1177 pthread_mutex_lock(&engine->cache_lock);
1178 ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1179 pthread_mutex_unlock(&engine->cache_lock);
1180
1181 return ret;
1182 }
1183
initialize_item_tap_walker(struct default_engine * engine,const void * cookie)1184 bool initialize_item_tap_walker(struct default_engine *engine,
1185 const void* cookie)
1186 {
1187 struct tap_client *client = calloc(1, sizeof(*client));
1188 if (client == NULL) {
1189 return false;
1190 }
1191 client->cursor.refcount = 1;
1192
1193 /* Link the cursor! */
1194 bool linked = false;
1195 for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1196 pthread_mutex_lock(&engine->cache_lock);
1197 if (engine->items.heads[ii] != NULL) {
1198 // add the item at the tail
1199 do_item_link_cursor(engine, &client->cursor, ii);
1200 linked = true;
1201 }
1202 pthread_mutex_unlock(&engine->cache_lock);
1203 }
1204
1205 engine->server.cookie->store_engine_specific(cookie, client);
1206 return true;
1207 }
1208