1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 #include <fcntl.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <string.h>
8 #include <time.h>
9 #include <assert.h>
10 #include <inttypes.h>
11
12 #include "default_engine.h"
13
14 /* Forward Declarations */
15 static void item_link_q(struct default_engine *engine, hash_item *it);
16 static void item_unlink_q(struct default_engine *engine, hash_item *it);
17 static hash_item *do_item_alloc(struct default_engine *engine,
18 const void *key, const size_t nkey,
19 const int flags, const rel_time_t exptime,
20 const int nbytes,
21 const void *cookie);
22 static hash_item *do_item_get(struct default_engine *engine,
23 const char *key, const size_t nkey);
24 static int do_item_link(struct default_engine *engine, hash_item *it);
25 static void do_item_unlink(struct default_engine *engine, hash_item *it);
26 static void do_item_release(struct default_engine *engine, hash_item *it);
27 static void do_item_update(struct default_engine *engine, hash_item *it);
28 static int do_item_replace(struct default_engine *engine,
29 hash_item *it, hash_item *new_it);
30 static void item_free(struct default_engine *engine, hash_item *it);
31
32 /*
33 * We only reposition items in the LRU queue if they haven't been repositioned
34 * in this many seconds. That saves us from churning on frequently-accessed
35 * items.
36 */
37 #define ITEM_UPDATE_INTERVAL 60
38 /*
39 * To avoid scanning through the complete cache in some circumstances we'll
40 * just give up and return an error after inspecting a fixed number of objects.
41 */
42 static const int search_items = 50;
43
item_stats_reset(struct default_engine * engine)44 void item_stats_reset(struct default_engine *engine) {
45 pthread_mutex_lock(&engine->cache_lock);
46 memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47 pthread_mutex_unlock(&engine->cache_lock);
48 }
49
50
51 /* warning: don't use these macros with a function, as it evals its arg twice */
ITEM_ntotal(struct default_engine * engine,const hash_item * item)52 static inline size_t ITEM_ntotal(struct default_engine *engine,
53 const hash_item *item) {
54 size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55 if (engine->config.use_cas) {
56 ret += sizeof(uint64_t);
57 }
58
59 return ret;
60 }
61
/*
 * Hand out the next CAS identifier: 1, 2, 3, ...
 * The counter is plain static state with no locking of its own; every
 * caller in this file invokes it while holding engine->cache_lock.
 */
static uint64_t get_cas_id(void) {
    static uint64_t next_cas = 0;
    next_cas += 1;
    return next_cas;
}
67
/*
 * Enable this for reference-count debugging.
 * Prints the item address, the operation tag, the current refcount and
 * whether the item is linked (L) and/or back in the slab allocator (S).
 */
#if 0
# define DEBUG_REFCNT(it,op) \
    fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
            (void *)(it), (op), (int)(it)->refcount, \
            ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
            ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
#else
/* Statement-shaped no-op so "DEBUG_REFCNT(x, y);" is valid everywhere. */
# define DEBUG_REFCNT(it,op) do { } while (0)
#endif
78
79
80 /*@null@*/
do_item_alloc(struct default_engine * engine,const void * key,const size_t nkey,const int flags,const rel_time_t exptime,const int nbytes,const void * cookie)81 hash_item *do_item_alloc(struct default_engine *engine,
82 const void *key,
83 const size_t nkey,
84 const int flags,
85 const rel_time_t exptime,
86 const int nbytes,
87 const void *cookie) {
88 hash_item *it = NULL;
89 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
90 if (engine->config.use_cas) {
91 ntotal += sizeof(uint64_t);
92 }
93
94 unsigned int id = slabs_clsid(engine, ntotal);
95 if (id == 0)
96 return 0;
97
98 /* do a quick check if we have any expired items in the tail.. */
99 int tries = search_items;
100 hash_item *search;
101 rel_time_t oldest_live = engine->config.oldest_live;
102 rel_time_t current_time = engine->server.core->get_current_time();
103
104 for (search = engine->items.tails[id];
105 tries > 0 && search != NULL;
106 tries--, search=search->prev) {
107 if (search->refcount == 0 &&
108 ((search->time < oldest_live) || //dead by flush
109 (search->exptime != 0 && search->exptime < current_time))) {
110 it = search;
111 /* I don't want to actually free the object, just steal
112 * the item to avoid to grab the slab mutex twice ;-)
113 */
114 pthread_mutex_lock(&engine->stats.lock);
115 engine->stats.reclaimed++;
116 pthread_mutex_unlock(&engine->stats.lock);
117 engine->items.itemstats[id].reclaimed++;
118 it->refcount = 1;
119 slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
120 do_item_unlink(engine, it);
121 /* Initialize the item block: */
122 it->slabs_clsid = 0;
123 it->refcount = 0;
124 break;
125 }
126 }
127
128 if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
129 /*
130 ** Could not find an expired item at the tail, and memory allocation
131 ** failed. Try to evict some items!
132 */
133 tries = search_items;
134
135 /* If requested to not push old items out of cache when memory runs out,
136 * we're out of luck at this point...
137 */
138
139 if (engine->config.evict_to_free == 0) {
140 engine->items.itemstats[id].outofmemory++;
141 return NULL;
142 }
143
144 /*
145 * try to get one off the right LRU
146 * don't necessariuly unlink the tail because it may be locked: refcount>0
147 * search up from tail an item with refcount==0 and unlink it; give up after search_items
148 * tries
149 */
150
151 if (engine->items.tails[id] == 0) {
152 engine->items.itemstats[id].outofmemory++;
153 return NULL;
154 }
155
156 for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
157 if (search->refcount == 0) {
158 if (search->exptime == 0 || search->exptime > current_time) {
159 engine->items.itemstats[id].evicted++;
160 engine->items.itemstats[id].evicted_time = current_time - search->time;
161 if (search->exptime != 0) {
162 engine->items.itemstats[id].evicted_nonzero++;
163 }
164 pthread_mutex_lock(&engine->stats.lock);
165 engine->stats.evictions++;
166 pthread_mutex_unlock(&engine->stats.lock);
167 engine->server.stat->evicting(cookie,
168 item_get_key(search),
169 search->nkey);
170 } else {
171 engine->items.itemstats[id].reclaimed++;
172 pthread_mutex_lock(&engine->stats.lock);
173 engine->stats.reclaimed++;
174 pthread_mutex_unlock(&engine->stats.lock);
175 }
176 do_item_unlink(engine, search);
177 break;
178 }
179 }
180 it = slabs_alloc(engine, ntotal, id);
181 if (it == 0) {
182 engine->items.itemstats[id].outofmemory++;
183 /* Last ditch effort. There is a very rare bug which causes
184 * refcount leaks. We've fixed most of them, but it still happens,
185 * and it may happen in the future.
186 * We can reasonably assume no item can stay locked for more than
187 * three hours, so if we find one in the tail which is that old,
188 * free it anyway.
189 */
190 tries = search_items;
191 for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
192 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
193 engine->items.itemstats[id].tailrepairs++;
194 search->refcount = 0;
195 do_item_unlink(engine, search);
196 break;
197 }
198 }
199 it = slabs_alloc(engine, ntotal, id);
200 if (it == 0) {
201 return NULL;
202 }
203 }
204 }
205
206 assert(it->slabs_clsid == 0);
207
208 it->slabs_clsid = id;
209
210 assert(it != engine->items.heads[it->slabs_clsid]);
211
212 it->next = it->prev = it->h_next = 0;
213 it->refcount = 1; /* the caller will have a reference */
214 DEBUG_REFCNT(it, '*');
215 it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
216 it->nkey = nkey;
217 it->nbytes = nbytes;
218 it->flags = flags;
219 memcpy((void*)item_get_key(it), key, nkey);
220 it->exptime = exptime;
221 return it;
222 }
223
/*
 * Hand an unlinked, unreferenced item back to the slab allocator.
 * Before that, the item is tagged ITEM_SLABBED and its slabs_clsid reset
 * to 0 so that later inspection can tell the chunk is free.
 */
static void item_free(struct default_engine *engine, hash_item *it) {
    const unsigned int clsid = it->slabs_clsid;
    const size_t ntotal = ITEM_ntotal(engine, it);

    assert((it->iflag & ITEM_LINKED) == 0);
    assert(it != engine->items.heads[clsid]);
    assert(it != engine->items.tails[clsid]);
    assert(it->refcount == 0);

    /* Mark the chunk as free for the slab size changer. */
    it->slabs_clsid = 0;
    it->iflag |= ITEM_SLABBED;
    DEBUG_REFCNT(it, 'F');

    slabs_free(engine, it, ntotal, clsid);
}
239
/* Push `it` onto the head (most recently used end) of its slab class LRU. */
static void item_link_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);
    assert((it->iflag & ITEM_SLABBED) == 0);

    hash_item **const headp = engine->items.heads + it->slabs_clsid;
    hash_item **const tailp = engine->items.tails + it->slabs_clsid;
    hash_item *const old_head = *headp;

    assert(it != old_head);
    /* Head and tail must be both set or both empty. */
    assert((old_head != NULL && *tailp != NULL) ||
           (old_head == NULL && *tailp == NULL));

    it->prev = NULL;
    it->next = old_head;
    if (old_head != NULL) {
        old_head->prev = it;
    }

    *headp = it;
    if (*tailp == NULL) {
        *tailp = it;
    }

    engine->items.sizes[it->slabs_clsid] += 1;
}
257
/*
 * Detach `it` from its slab class LRU, repairing head/tail as needed.
 * The item's own next/prev fields are left as they were.
 */
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
    assert(it->slabs_clsid < POWER_LARGEST);

    hash_item **const headp = engine->items.heads + it->slabs_clsid;
    hash_item **const tailp = engine->items.tails + it->slabs_clsid;
    hash_item *const before = it->prev;
    hash_item *const after = it->next;

    if (it == *headp) {
        assert(before == NULL);
        *headp = after;
    }
    if (it == *tailp) {
        assert(after == NULL);
        *tailp = before;
    }
    assert(after != it);
    assert(before != it);

    if (after != NULL) {
        after->prev = before;
    }
    if (before != NULL) {
        before->next = after;
    }

    engine->items.sizes[it->slabs_clsid] -= 1;
}
280
/*
 * Make `it` visible in the cache. The function does the following:
 *   - flags the item ITEM_LINKED;
 *   - stamps its access time;
 *   - inserts it into the hash table;
 *   - accounts it in the global stats;
 *   - assigns it a fresh CAS id;
 *   - pushes it onto the LRU head.
 * Always returns 1. Caller holds engine->cache_lock.
 */
int do_item_link(struct default_engine *engine, hash_item *it) {
    const char *key = item_get_key(it);

    MEMCACHED_ITEM_LINK(key, it->nkey, it->nbytes);
    assert((it->iflag & (ITEM_LINKED | ITEM_SLABBED)) == 0);
    assert(it->nbytes < (1024 * 1024)); /* 1MB max size */

    it->iflag |= ITEM_LINKED;
    it->time = engine->server.core->get_current_time();
    assoc_insert(engine, engine->server.core->hash(key, it->nkey, 0), it);

    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes += ITEM_ntotal(engine, it);
    engine->stats.curr_items += 1;
    engine->stats.total_items += 1;
    pthread_mutex_unlock(&engine->stats.lock);

    /* Every (re)link gets a brand new CAS id. */
    item_set_cas(NULL, NULL, it, get_cas_id());

    item_link_q(engine, it);
    return 1;
}
304
/*
 * Remove `it` from the hash table and the LRU and un-account it from the
 * global stats. The memory is returned right away if nobody holds a
 * reference; otherwise the last do_item_release() frees it.
 * Items that are not linked are ignored.
 */
void do_item_unlink(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);

    if ((it->iflag & ITEM_LINKED) == 0) {
        return; /* already unlinked */
    }
    it->iflag &= ~ITEM_LINKED;

    pthread_mutex_lock(&engine->stats.lock);
    engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
    engine->stats.curr_items -= 1;
    pthread_mutex_unlock(&engine->stats.lock);

    const char *key = item_get_key(it);
    assoc_delete(engine, engine->server.core->hash(key, it->nkey, 0),
                 key, it->nkey);
    item_unlink_q(engine, it);

    if (it->refcount == 0) {
        item_free(engine, it);
    }
}
322
/*
 * Drop one reference to `it` (the count never goes below zero). The item is
 * freed once it is both unreferenced and no longer linked in the cache.
 */
void do_item_release(struct default_engine *engine, hash_item *it) {
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);

    if (it->refcount != 0) {
        --it->refcount;
        DEBUG_REFCNT(it, '-');
    }

    if (it->refcount != 0 || (it->iflag & ITEM_LINKED) != 0) {
        return; /* still referenced, or still reachable from the cache */
    }
    item_free(engine, it);
}
333
/*
 * Move a linked item back to the head of its LRU and refresh its access
 * time. This only happens if it was last repositioned more than
 * ITEM_UPDATE_INTERVAL seconds ago, so hot items do not churn the queue.
 * Unlinked items are left untouched. Caller holds engine->cache_lock.
 */
void do_item_update(struct default_engine *engine, hash_item *it) {
    rel_time_t current_time = engine->server.core->get_current_time();
    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);

    /*
     * Written as "time + interval < now" rather than "time < now - interval"
     * (same form as the TAIL_REPAIR_TIME check in do_item_alloc).
     * NOTE(review): rel_time_t is presumably unsigned. If so,
     * "now - interval" wraps while now < ITEM_UPDATE_INTERVAL, which made
     * every access reshuffle the LRU.
     */
    if (it->time + ITEM_UPDATE_INTERVAL < current_time) {
        assert((it->iflag & ITEM_SLABBED) == 0);

        if ((it->iflag & ITEM_LINKED) != 0) {
            item_unlink_q(engine, it);
            it->time = current_time;
            item_link_q(engine, it);
        }
    }
}
347
/*
 * Put `new_it` in the cache in place of `it`. The old item is unlinked
 * (and freed if unreferenced), then the new one is linked.
 * Returns do_item_link()'s result.
 */
int do_item_replace(struct default_engine *engine,
                    hash_item *it, hash_item *new_it) {
    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->iflag & ITEM_SLABBED) == 0);

    do_item_unlink(engine, it);
    const int link_result = do_item_link(engine, new_it);
    return link_result;
}
357
/*@null@*/
/*
 * Textual dump of up to `limit` items (0 = no limit) of one slab class.
 *
 * Not implemented in regular builds: the real dump only exists under
 * FUTURE. NOTE(review): that code does not build as written, because it
 * uses `engine`, which is not a parameter of this function.
 *
 * The active path always returns NULL. It now also reports a length of 0
 * through `bytes` (when non-NULL), so callers never read an uninitialized
 * length.
 */
static char *do_item_cachedump(const unsigned int slabs_clsid,
                               const unsigned int limit,
                               unsigned int *bytes) {
#ifdef FUTURE
    unsigned int memlimit = 2 * 1024 * 1024; /* 2MB max response size */
    char *buffer;
    unsigned int bufcurr;
    hash_item *it;
    unsigned int len;
    unsigned int shown = 0;
    char key_temp[KEY_MAX_LENGTH + 1];
    char temp[512];

    it = engine->items.heads[slabs_clsid];

    buffer = malloc((size_t)memlimit);
    if (buffer == 0) return NULL;
    bufcurr = 0;


    while (it != NULL && (limit == 0 || shown < limit)) {
        assert(it->nkey <= KEY_MAX_LENGTH);
        /* Copy the key since it may not be null-terminated in the struct */
        strncpy(key_temp, item_get_key(it), it->nkey);
        key_temp[it->nkey] = 0x00; /* terminate */
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
                       key_temp, it->nbytes,
                       (unsigned long)it->exptime + process_started);
        if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
            break;
        memcpy(buffer + bufcurr, temp, len);
        bufcurr += len;
        shown++;
        it = it->next;
    }


    memcpy(buffer + bufcurr, "END\r\n", 6);
    bufcurr += 5;

    *bytes = bufcurr;
    return buffer;
#endif
    (void)slabs_clsid;
    (void)limit;
    if (bytes != NULL) {
        *bytes = 0;
    }
    return NULL;
}
407
/*
 * Emit the per-slab-class "items" statistics through add_stats.
 *
 * Before a class is reported, dead items at its LRU tail are unlinked. Dead
 * means hidden by an in-effect flush_all, or expired. At most search_items
 * candidates are examined, and the scan stops at the first dead item that
 * is still referenced. Classes emptied by this are skipped. "age" is the
 * number of seconds since the LRU tail item was last accessed.
 * Caller holds engine->cache_lock.
 */
static void do_item_stats(struct default_engine *engine,
                          ADD_STAT add_stats, const void *c) {
    rel_time_t current_time = engine->server.core->get_current_time();
    const char *prefix = "items";

    for (int i = 0; i < POWER_LARGEST; i++) {
        if (engine->items.tails[i] == NULL) {
            continue;
        }

        int search = search_items;
        while (search > 0 && engine->items.tails[i] != NULL) {
            hash_item *tail = engine->items.tails[i];
            /* Item flushed: a flush_all is in effect and covers it ... */
            const bool flushed = engine->config.oldest_live != 0 &&
                                 engine->config.oldest_live <= current_time &&
                                 tail->time <= engine->config.oldest_live;
            /* ... or the item is past its expiry time. */
            const bool expired = tail->exptime != 0 &&
                                 tail->exptime < current_time;
            if (!flushed && !expired) {
                break;
            }
            --search;
            if (tail->refcount != 0) {
                break;
            }
            do_item_unlink(engine, tail);
        }

        hash_item *oldest = engine->items.tails[i];
        if (oldest == NULL) {
            /* We removed all of the items in this slab class */
            continue;
        }

        add_statistics(c, add_stats, prefix, i, "number", "%u",
                       engine->items.sizes[i]);
        add_statistics(c, add_stats, prefix, i, "age", "%u",
                       (rel_time_t)(current_time - oldest->time));
        add_statistics(c, add_stats, prefix, i, "evicted",
                       "%u", engine->items.itemstats[i].evicted);
        add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
                       "%u", engine->items.itemstats[i].evicted_nonzero);
        add_statistics(c, add_stats, prefix, i, "evicted_time",
                       "%u", engine->items.itemstats[i].evicted_time);
        add_statistics(c, add_stats, prefix, i, "outofmemory",
                       "%u", engine->items.itemstats[i].outofmemory);
        add_statistics(c, add_stats, prefix, i, "tailrepairs",
                       "%u", engine->items.itemstats[i].tailrepairs);
        add_statistics(c, add_stats, prefix, i, "reclaimed",
                       "%u", engine->items.itemstats[i].reclaimed);
    }
}
454
455 /** dumps out a list of objects of each size, with granularity of 32 bytes */
456 /*@null@*/
do_item_stats_sizes(struct default_engine * engine,ADD_STAT add_stats,const void * c)457 static void do_item_stats_sizes(struct default_engine *engine,
458 ADD_STAT add_stats, const void *c) {
459
460 /* max 1MB object, divided into 32 bytes size buckets */
461 const int num_buckets = 32768;
462 unsigned int *histogram = calloc(num_buckets, sizeof(int));
463
464 if (histogram != NULL) {
465 int i;
466
467 /* build the histogram */
468 for (i = 0; i < POWER_LARGEST; i++) {
469 hash_item *iter = engine->items.heads[i];
470 while (iter) {
471 int ntotal = ITEM_ntotal(engine, iter);
472 int bucket = ntotal / 32;
473 if ((ntotal % 32) != 0) bucket++;
474 if (bucket < num_buckets) histogram[bucket]++;
475 iter = iter->next;
476 }
477 }
478
479 /* write the buffer */
480 for (i = 0; i < num_buckets; i++) {
481 if (histogram[i] != 0) {
482 char key[8], val[32];
483 int klen, vlen;
484 klen = snprintf(key, sizeof(key), "%d", i * 32);
485 vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
486 assert(klen < sizeof(key));
487 assert(vlen < sizeof(val));
488 add_stats(key, klen, val, vlen, c);
489 }
490 }
491 free(histogram);
492 }
493 }
494
495 /** wrapper around assoc_find which does the lazy expiration logic */
do_item_get(struct default_engine * engine,const char * key,const size_t nkey)496 hash_item *do_item_get(struct default_engine *engine,
497 const char *key, const size_t nkey) {
498 rel_time_t current_time = engine->server.core->get_current_time();
499 hash_item *it = assoc_find(engine, engine->server.core->hash(key,
500 nkey, 0),
501 key, nkey);
502 int was_found = 0;
503
504 if (engine->config.verbose > 2) {
505 EXTENSION_LOGGER_DESCRIPTOR *logger;
506 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
507 if (it == NULL) {
508 logger->log(EXTENSION_LOG_DEBUG, NULL,
509 "> NOT FOUND %s", key);
510 } else {
511 logger->log(EXTENSION_LOG_DEBUG, NULL,
512 "> FOUND KEY %s",
513 (const char*)item_get_key(it));
514 was_found++;
515 }
516 }
517
518 if (it != NULL && engine->config.oldest_live != 0 &&
519 engine->config.oldest_live <= current_time &&
520 it->time <= engine->config.oldest_live) {
521 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
522 it = NULL;
523 }
524
525 if (it == NULL && was_found) {
526 EXTENSION_LOGGER_DESCRIPTOR *logger;
527 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
528 logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
529 was_found--;
530 }
531
532 if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
533 do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
534 it = NULL;
535 }
536
537 if (it == NULL && was_found) {
538 EXTENSION_LOGGER_DESCRIPTOR *logger;
539 logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
540 logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
541 was_found--;
542 }
543
544 if (it != NULL) {
545 it->refcount++;
546 DEBUG_REFCNT(it, '+');
547 do_item_update(engine, it);
548 }
549
550 return it;
551 }
552
/*
 * Stores an item in the cache according to the semantics of one of the set
 * commands. In threaded mode, this is protected by the cache lock.
 *
 * Outcome by operation:
 *   ADD             - key exists: ENGINE_NOT_STORED (existing item is bumped
 *                     in the LRU); key missing: `it` is linked.
 *   REPLACE         - key missing: ENGINE_NOT_STORED; otherwise `it`
 *                     replaces the existing item.
 *   APPEND/PREPEND  - key missing: ENGINE_NOT_STORED. If it's CAS is
 *                     non-zero and differs from the stored one:
 *                     ENGINE_KEY_EEXISTS. Otherwise a new item holding the
 *                     combined data replaces the old one; if that
 *                     allocation fails, ENGINE_NOT_STORED.
 *   CAS             - key missing: ENGINE_KEY_ENOENT; CAS mismatch:
 *                     ENGINE_KEY_EEXISTS; otherwise `it` replaces the old
 *                     item.
 *   anything else   - (e.g. SET) `it` is linked, replacing any existing
 *                     item.
 *
 * On ENGINE_SUCCESS, *cas receives the CAS of the item now in the cache.
 * The caller keeps its own reference to `it`; only the references taken
 * here (old_it via do_item_get, new_it via do_item_alloc) are released.
 *
 * Returns the state of storage.
 */
static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
                                       hash_item *it, uint64_t *cas,
                                       ENGINE_STORE_OPERATION operation,
                                       const void *cookie) {
    const char *key = item_get_key(it);
    /* Holds a reference on the current item for this key (if any). */
    hash_item *old_it = do_item_get(engine, key, it->nkey);
    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;

    /* Combined item built for APPEND/PREPEND; we own one reference. */
    hash_item *new_it = NULL;

    if (old_it != NULL && operation == OPERATION_ADD) {
        /* add only adds a nonexistent item, but promote to head of LRU */
        do_item_update(engine, old_it);
    } else if (!old_it && (operation == OPERATION_REPLACE
               || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
    {
        /* replace only replaces an existing value; don't store */
    } else if (operation == OPERATION_CAS) {
        /* validate cas operation */
        if(old_it == NULL) {
            // key not found (never stored, removed, or expired)
            stored = ENGINE_KEY_ENOENT;
        }
        else if (item_get_cas(it) == item_get_cas(old_it)) {
            // cas validates
            // it and old_it may belong to different classes.
            // I'm updating the stats for the one that's getting pushed out
            do_item_replace(engine, old_it, it);
            stored = ENGINE_SUCCESS;
        } else {
            if (engine->config.verbose > 1) {
                EXTENSION_LOGGER_DESCRIPTOR *logger;
                logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
                logger->log(EXTENSION_LOG_INFO, NULL,
                            "CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
                            item_get_cas(old_it),
                            item_get_cas(it));
            }
            stored = ENGINE_KEY_EEXISTS;
        }
    } else {
        /*
         * Append - combine new and old record into single one. Here it's
         * atomic and thread-safe.
         */
        if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
            /*
             * Validate CAS
             */
            if (item_get_cas(it) != 0) {
                // CAS must be equal
                if (item_get_cas(it) != item_get_cas(old_it)) {
                    stored = ENGINE_KEY_EEXISTS;
                }
            }

            if (stored == ENGINE_NOT_STORED) {
                /* we have it and old_it here - alloc memory to hold both */
                new_it = do_item_alloc(engine, key, it->nkey,
                                       old_it->flags,
                                       old_it->exptime,
                                       it->nbytes + old_it->nbytes,
                                       cookie);

                if (new_it == NULL) {
                    /* SERVER_ERROR out of memory: reported to the caller as
                     * ENGINE_NOT_STORED after dropping our old_it reference. */
                    if (old_it != NULL) {
                        do_item_release(engine, old_it);
                    }

                    return ENGINE_NOT_STORED;
                }

                /* copy data from it and old_it to new_it */

                if (operation == OPERATION_APPEND) {
                    memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
                    memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
                } else {
                    /* OPERATION_PREPEND */
                    memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
                    memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
                }

                /* From here on the combined item is the one to store. */
                it = new_it;
            }
        }

        if (stored == ENGINE_NOT_STORED) {
            if (old_it != NULL) {
                do_item_replace(engine, old_it, it);
            } else {
                do_item_link(engine, it);
            }

            *cas = item_get_cas(it);
            stored = ENGINE_SUCCESS;
        }
    }

    if (old_it != NULL) {
        do_item_release(engine, old_it);         /* release our reference */
    }

    /* new_it is linked at this point, so releasing it does not free it. */
    if (new_it != NULL) {
        do_item_release(engine, new_it);
    }

    if (stored == ENGINE_SUCCESS) {
        *cas = item_get_cas(it);
    }

    return stored;
}
673
674
675 /*
676 * adds a delta value to a numeric item.
677 *
678 * c connection requesting the operation
679 * it item to adjust
680 * incr true to increment value, false to decrement
681 * delta amount to adjust value by
682 * buf buffer for response string
683 *
684 * returns a response string to send back to the client.
685 */
do_add_delta(struct default_engine * engine,hash_item * it,const bool incr,const int64_t delta,uint64_t * rcas,uint64_t * result,const void * cookie)686 static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
687 hash_item *it, const bool incr,
688 const int64_t delta, uint64_t *rcas,
689 uint64_t *result, const void *cookie) {
690 const char *ptr;
691 uint64_t value;
692 char buf[80];
693 int res;
694
695 if (it->nbytes >= (sizeof(buf) - 1)) {
696 return ENGINE_EINVAL;
697 }
698
699 ptr = item_get_data(it);
700 memcpy(buf, ptr, it->nbytes);
701 buf[it->nbytes] = '\0';
702
703 if (!safe_strtoull(buf, &value)) {
704 return ENGINE_EINVAL;
705 }
706
707 if (incr) {
708 value += delta;
709 } else {
710 if(delta > value) {
711 value = 0;
712 } else {
713 value -= delta;
714 }
715 }
716
717 *result = value;
718 if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
719 return ENGINE_EINVAL;
720 }
721
722 if (it->refcount == 1 && res <= it->nbytes) {
723 // we can do inline replacement
724 memcpy(item_get_data(it), buf, res);
725 memset(item_get_data(it) + res, ' ', it->nbytes - res);
726 item_set_cas(NULL, NULL, it, get_cas_id());
727 *rcas = item_get_cas(it);
728 } else {
729 hash_item *new_it = do_item_alloc(engine, item_get_key(it),
730 it->nkey, it->flags,
731 it->exptime, res,
732 cookie);
733 if (new_it == NULL) {
734 do_item_unlink(engine, it);
735 return ENGINE_ENOMEM;
736 }
737 memcpy(item_get_data(new_it), buf, res);
738 do_item_replace(engine, it, new_it);
739 *rcas = item_get_cas(new_it);
740 do_item_release(engine, new_it); /* release our reference */
741 }
742
743 return ENGINE_SUCCESS;
744 }
745
746 /********************************* ITEM ACCESS *******************************/
747
748 /*
749 * Allocates a new item.
750 */
item_alloc(struct default_engine * engine,const void * key,size_t nkey,int flags,rel_time_t exptime,int nbytes,const void * cookie)751 hash_item *item_alloc(struct default_engine *engine,
752 const void *key, size_t nkey, int flags,
753 rel_time_t exptime, int nbytes, const void *cookie) {
754 hash_item *it;
755 pthread_mutex_lock(&engine->cache_lock);
756 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
757 pthread_mutex_unlock(&engine->cache_lock);
758 return it;
759 }
760
761 /*
762 * Returns an item if it hasn't been marked as expired,
763 * lazy-expiring as needed.
764 */
item_get(struct default_engine * engine,const void * key,const size_t nkey)765 hash_item *item_get(struct default_engine *engine,
766 const void *key, const size_t nkey) {
767 hash_item *it;
768 pthread_mutex_lock(&engine->cache_lock);
769 it = do_item_get(engine, key, nkey);
770 pthread_mutex_unlock(&engine->cache_lock);
771 return it;
772 }
773
774 /*
775 * Decrements the reference count on an item and adds it to the freelist if
776 * needed.
777 */
item_release(struct default_engine * engine,hash_item * item)778 void item_release(struct default_engine *engine, hash_item *item) {
779 pthread_mutex_lock(&engine->cache_lock);
780 do_item_release(engine, item);
781 pthread_mutex_unlock(&engine->cache_lock);
782 }
783
784 /*
785 * Unlinks an item from the LRU and hashtable.
786 */
item_unlink(struct default_engine * engine,hash_item * item)787 void item_unlink(struct default_engine *engine, hash_item *item) {
788 pthread_mutex_lock(&engine->cache_lock);
789 do_item_unlink(engine, item);
790 pthread_mutex_unlock(&engine->cache_lock);
791 }
792
do_arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)793 static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
794 const void* cookie,
795 const void* key,
796 const int nkey,
797 const bool increment,
798 const bool create,
799 const uint64_t delta,
800 const uint64_t initial,
801 const rel_time_t exptime,
802 uint64_t *cas,
803 uint64_t *result)
804 {
805 hash_item *item = do_item_get(engine, key, nkey);
806 ENGINE_ERROR_CODE ret;
807
808 if (item == NULL) {
809 if (!create) {
810 return ENGINE_KEY_ENOENT;
811 } else {
812 char buffer[128];
813 int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
814 (uint64_t)initial);
815
816 item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
817 if (item == NULL) {
818 return ENGINE_ENOMEM;
819 }
820 memcpy((void*)item_get_data(item), buffer, len);
821 if ((ret = do_store_item(engine, item, cas,
822 OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
823 *result = initial;
824 *cas = item_get_cas(item);
825 }
826 do_item_release(engine, item);
827 }
828 } else {
829 ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
830 do_item_release(engine, item);
831 }
832
833 return ret;
834 }
835
arithmetic(struct default_engine * engine,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result)836 ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
837 const void* cookie,
838 const void* key,
839 const int nkey,
840 const bool increment,
841 const bool create,
842 const uint64_t delta,
843 const uint64_t initial,
844 const rel_time_t exptime,
845 uint64_t *cas,
846 uint64_t *result)
847 {
848 ENGINE_ERROR_CODE ret;
849
850 pthread_mutex_lock(&engine->cache_lock);
851 ret = do_arithmetic(engine, cookie, key, nkey, increment,
852 create, delta, initial, exptime, cas,
853 result);
854 pthread_mutex_unlock(&engine->cache_lock);
855 return ret;
856 }
857
858 /*
859 * Stores an item in the cache (high level, obeys set/add/replace semantics)
860 */
store_item(struct default_engine * engine,hash_item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,const void * cookie)861 ENGINE_ERROR_CODE store_item(struct default_engine *engine,
862 hash_item *item, uint64_t *cas,
863 ENGINE_STORE_OPERATION operation,
864 const void *cookie) {
865 ENGINE_ERROR_CODE ret;
866
867 pthread_mutex_lock(&engine->cache_lock);
868 ret = do_store_item(engine, item, cas, operation, cookie);
869 pthread_mutex_unlock(&engine->cache_lock);
870 return ret;
871 }
872
do_touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)873 static hash_item *do_touch_item(struct default_engine *engine,
874 const void *key,
875 uint16_t nkey,
876 uint32_t exptime)
877 {
878 hash_item *item = do_item_get(engine, key, nkey);
879 if (item != NULL) {
880 item->exptime = exptime;
881 }
882 return item;
883 }
884
touch_item(struct default_engine * engine,const void * key,uint16_t nkey,uint32_t exptime)885 hash_item *touch_item(struct default_engine *engine,
886 const void *key,
887 uint16_t nkey,
888 uint32_t exptime)
889 {
890 hash_item *ret;
891
892 pthread_mutex_lock(&engine->cache_lock);
893 ret = do_touch_item(engine, key, nkey, exptime);
894 pthread_mutex_unlock(&engine->cache_lock);
895 return ret;
896 }
897
898 /*
899 * Flushes expired items after a flush_all call
900 */
item_flush_expired(struct default_engine * engine,time_t when)901 void item_flush_expired(struct default_engine *engine, time_t when) {
902 int i;
903 hash_item *iter, *next;
904
905 pthread_mutex_lock(&engine->cache_lock);
906
907 if (when == 0) {
908 engine->config.oldest_live = engine->server.core->get_current_time() - 1;
909 } else {
910 engine->config.oldest_live = engine->server.core->realtime(when) - 1;
911 }
912
913 if (engine->config.oldest_live != 0) {
914 for (i = 0; i < POWER_LARGEST; i++) {
915 /*
916 * The LRU is sorted in decreasing time order, and an item's
917 * timestamp is never newer than its last access time, so we
918 * only need to walk back until we hit an item older than the
919 * oldest_live time.
920 * The oldest_live checking will auto-expire the remaining items.
921 */
922 for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
923 if (iter->time >= engine->config.oldest_live) {
924 next = iter->next;
925 if ((iter->iflag & ITEM_SLABBED) == 0) {
926 do_item_unlink(engine, iter);
927 }
928 } else {
929 /* We've hit the first old item. Continue to the next queue. */
930 break;
931 }
932 }
933 }
934 }
935 pthread_mutex_unlock(&engine->cache_lock);
936 }
937
938 /*
939 * Dumps part of the cache
940 */
item_cachedump(struct default_engine * engine,unsigned int slabs_clsid,unsigned int limit,unsigned int * bytes)941 char *item_cachedump(struct default_engine *engine,
942 unsigned int slabs_clsid,
943 unsigned int limit,
944 unsigned int *bytes) {
945 char *ret;
946
947 pthread_mutex_lock(&engine->cache_lock);
948 ret = do_item_cachedump(slabs_clsid, limit, bytes);
949 pthread_mutex_unlock(&engine->cache_lock);
950 return ret;
951 }
952
/* Report per-slab-class item statistics under engine->cache_lock. */
void item_stats(struct default_engine *engine,
                ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
960
961
/*
 * Reports the item size histogram through add_stat/cookie by running
 * do_item_stats_sizes() with cache_lock held.
 */
void item_stats_sizes(struct default_engine *engine,
                      ADD_STAT add_stat, const void *cookie)
{
    pthread_mutex_t *const lock = &engine->cache_lock;

    pthread_mutex_lock(lock);
    do_item_stats_sizes(engine, add_stat, cookie);
    pthread_mutex_unlock(lock);
}
969
/*
 * Appends the placeholder `cursor` to the tail of LRU class `ii`, records
 * the class in cursor->slabs_clsid and counts it in items.sizes[ii].
 *
 * Precondition: class `ii` is non-empty — the current tail is dereferenced
 * unconditionally. Every caller in this file checks items.heads[ii] != NULL
 * and holds cache_lock before calling.
 */
static void do_item_link_cursor(struct default_engine *engine,
                                hash_item *cursor, int ii)
{
    hash_item *const old_tail = engine->items.tails[ii];

    cursor->slabs_clsid = (uint8_t)ii;
    cursor->prev = old_tail;
    cursor->next = NULL;

    old_tail->next = cursor;
    engine->items.tails[ii] = cursor;
    ++engine->items.sizes[ii];
}
980
981 typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
982 hash_item *item, void *cookie);
983
do_item_walk_cursor(struct default_engine * engine,hash_item * cursor,int steplength,ITERFUNC itemfunc,void * itemdata,ENGINE_ERROR_CODE * error)984 static bool do_item_walk_cursor(struct default_engine *engine,
985 hash_item *cursor,
986 int steplength,
987 ITERFUNC itemfunc,
988 void* itemdata,
989 ENGINE_ERROR_CODE *error)
990 {
991 int ii = 0;
992 *error = ENGINE_SUCCESS;
993
994 while (cursor->prev != NULL && ii < steplength) {
995 ++ii;
996 /* Move cursor */
997 hash_item *ptr = cursor->prev;
998 item_unlink_q(engine, cursor);
999
1000 bool done = false;
1001 if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1002 done = true;
1003 cursor->prev = NULL;
1004 } else {
1005 cursor->next = ptr;
1006 cursor->prev = ptr->prev;
1007 cursor->prev->next = cursor;
1008 ptr->prev = cursor;
1009 }
1010
1011 /* Ignore cursors */
1012 if (ptr->nkey == 0 && ptr->nbytes == 0) {
1013 --ii;
1014 } else {
1015 *error = itemfunc(engine, ptr, itemdata);
1016 if (*error != ENGINE_SUCCESS) {
1017 return false;
1018 }
1019 }
1020
1021 if (done) {
1022 return false;
1023 }
1024 }
1025
1026 return (cursor->prev != NULL);
1027 }
1028
item_scrub(struct default_engine * engine,hash_item * item,void * cookie)1029 static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1030 hash_item *item,
1031 void *cookie) {
1032 (void)cookie;
1033 engine->scrubber.visited++;
1034 rel_time_t current_time = engine->server.core->get_current_time();
1035 if (item->refcount == 0 &&
1036 (item->exptime != 0 && item->exptime < current_time)) {
1037 do_item_unlink(engine, item);
1038 engine->scrubber.cleaned++;
1039 }
1040 return ENGINE_SUCCESS;
1041 }
1042
/*
 * Walks `cursor` through its whole LRU class with item_scrub, in batches.
 * cache_lock is taken only for the duration of each batch so other
 * threads can make progress in between. Stops when the walk reports the
 * class is finished or a callback fails.
 */
static void item_scrub_class(struct default_engine *engine,
                             hash_item *cursor) {
    const int batch = 200;

    for (;;) {
        ENGINE_ERROR_CODE status;

        pthread_mutex_lock(&engine->cache_lock);
        const bool again = do_item_walk_cursor(engine, cursor, batch,
                                               item_scrub, NULL, &status);
        pthread_mutex_unlock(&engine->cache_lock);

        if (status != ENGINE_SUCCESS || !again) {
            return;
        }
    }
}
1057
item_scubber_main(void * arg)1058 static void *item_scubber_main(void *arg)
1059 {
1060 struct default_engine *engine = arg;
1061 hash_item cursor = { .refcount = 1 };
1062
1063 for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1064 pthread_mutex_lock(&engine->cache_lock);
1065 bool skip = false;
1066 if (engine->items.heads[ii] == NULL) {
1067 skip = true;
1068 } else {
1069 // add the item at the tail
1070 do_item_link_cursor(engine, &cursor, ii);
1071 }
1072 pthread_mutex_unlock(&engine->cache_lock);
1073
1074 if (!skip) {
1075 item_scrub_class(engine, &cursor);
1076 }
1077 }
1078
1079 pthread_mutex_lock(&engine->scrubber.lock);
1080 engine->scrubber.stopped = time(NULL);
1081 engine->scrubber.running = false;
1082 pthread_mutex_unlock(&engine->scrubber.lock);
1083
1084 return NULL;
1085 }
1086
item_start_scrub(struct default_engine * engine)1087 bool item_start_scrub(struct default_engine *engine)
1088 {
1089 bool ret = false;
1090 pthread_mutex_lock(&engine->scrubber.lock);
1091 if (!engine->scrubber.running) {
1092 engine->scrubber.started = time(NULL);
1093 engine->scrubber.stopped = 0;
1094 engine->scrubber.visited = 0;
1095 engine->scrubber.cleaned = 0;
1096 engine->scrubber.running = true;
1097
1098 pthread_t t;
1099 pthread_attr_t attr;
1100
1101 if (pthread_attr_init(&attr) != 0 ||
1102 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1103 pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1104 {
1105 engine->scrubber.running = false;
1106 } else {
1107 ret = true;
1108 }
1109 }
1110 pthread_mutex_unlock(&engine->scrubber.lock);
1111
1112 return ret;
1113 }
1114
1115 struct tap_client {
1116 hash_item cursor;
1117 hash_item *it;
1118 };
1119
item_tap_iterfunc(struct default_engine * engine,hash_item * item,void * cookie)1120 static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1121 hash_item *item,
1122 void *cookie) {
1123 struct tap_client *client = cookie;
1124 client->it = item;
1125 ++client->it->refcount;
1126 return ENGINE_SUCCESS;
1127 }
1128
do_item_tap_walker(struct default_engine * engine,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1129 static tap_event_t do_item_tap_walker(struct default_engine *engine,
1130 const void *cookie, item **itm,
1131 void **es, uint16_t *nes, uint8_t *ttl,
1132 uint16_t *flags, uint32_t *seqno,
1133 uint16_t *vbucket)
1134 {
1135 struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1136 if (client == NULL) {
1137 return TAP_DISCONNECT;
1138 }
1139
1140 *es = NULL;
1141 *nes = 0;
1142 *ttl = (uint8_t)-1;
1143 *seqno = 0;
1144 *flags = 0;
1145 *vbucket = 0;
1146 client->it = NULL;
1147
1148 ENGINE_ERROR_CODE r;
1149 do {
1150 if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1151 // find next slab class to look at..
1152 bool linked = false;
1153 for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked; ++ii) {
1154 if (engine->items.heads[ii] != NULL) {
1155 // add the item at the tail
1156 do_item_link_cursor(engine, &client->cursor, ii);
1157 linked = true;
1158 }
1159 }
1160 if (!linked) {
1161 break;
1162 }
1163 }
1164 } while (client->it == NULL);
1165 *itm = client->it;
1166
1167 return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1168 }
1169
item_tap_walker(ENGINE_HANDLE * handle,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)1170 tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1171 const void *cookie, item **itm,
1172 void **es, uint16_t *nes, uint8_t *ttl,
1173 uint16_t *flags, uint32_t *seqno,
1174 uint16_t *vbucket)
1175 {
1176 tap_event_t ret;
1177 struct default_engine *engine = (struct default_engine*)handle;
1178 pthread_mutex_lock(&engine->cache_lock);
1179 ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1180 pthread_mutex_unlock(&engine->cache_lock);
1181
1182 return ret;
1183 }
1184
initialize_item_tap_walker(struct default_engine * engine,const void * cookie)1185 bool initialize_item_tap_walker(struct default_engine *engine,
1186 const void* cookie)
1187 {
1188 struct tap_client *client = calloc(1, sizeof(*client));
1189 if (client == NULL) {
1190 return false;
1191 }
1192 client->cursor.refcount = 1;
1193
1194 /* Link the cursor! */
1195 bool linked = false;
1196 for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1197 pthread_mutex_lock(&engine->cache_lock);
1198 if (engine->items.heads[ii] != NULL) {
1199 // add the item at the tail
1200 do_item_link_cursor(engine, &client->cursor, ii);
1201 linked = true;
1202 }
1203 pthread_mutex_unlock(&engine->cache_lock);
1204 }
1205
1206 engine->server.cookie->store_engine_specific(cookie, client);
1207 return true;
1208 }
1209