1 /*****************************************************************************
2 This file was modified by Oracle on 2015/02/05
3 Modifications copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
4 *****************************************************************************/
5
6 #include "config.h"
7
8 #include <stdlib.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <assert.h>
12 #include <errno.h>
13 #include <ctype.h>
14 #include <unistd.h>
15 #include <stddef.h>
16 #include <inttypes.h>
17 #include <arpa/inet.h>
18
19 #include "default_engine.h"
20 #include "memcached/util.h"
21 #include "memcached/config_parser.h"
22
/*
 * Binary-protocol opcodes for the vbucket management commands that
 * default_unknown_command() dispatches on.
 */
#define CMD_SET_VBUCKET 0x83    /* handled by set_vbucket() */
#define CMD_GET_VBUCKET 0x84    /* handled by get_vbucket() */
#define CMD_DEL_VBUCKET 0x85    /* handled by rm_vbucket() */
26
27 static const engine_info* default_get_info(ENGINE_HANDLE* handle);
28 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
29 const char* config_str);
30 static void default_destroy(ENGINE_HANDLE* handle, bool force);
31 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
32 const void* cookie,
33 item **item,
34 const void* key,
35 const size_t nkey,
36 const size_t nbytes,
37 const int flags,
38 const rel_time_t exptime);
39 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
40 const void* cookie,
41 const void* key,
42 const size_t nkey,
43 uint64_t cas,
44 uint16_t vbucket);
45
46 static void default_item_release(ENGINE_HANDLE* handle, const void *cookie,
47 item* item);
48 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
49 const void* cookie,
50 item** item,
51 const void* key,
52 const int nkey,
53 uint16_t vbucket);
54 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
55 const void *cookie,
56 const char *stat_key,
57 int nkey,
58 ADD_STAT add_stat);
59 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie);
60 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
61 const void *cookie,
62 item* item,
63 uint64_t *cas,
64 ENGINE_STORE_OPERATION operation,
65 uint16_t vbucket);
66 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
67 const void* cookie,
68 const void* key,
69 const int nkey,
70 const bool increment,
71 const bool create,
72 const uint64_t delta,
73 const uint64_t initial,
74 const rel_time_t exptime,
75 uint64_t *cas,
76 uint64_t *result,
77 uint16_t vbucket);
78 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
79 const void* cookie, time_t when);
80 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
81 const char *cfg_str);
82 static TAP_ITERATOR get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie,
83 const void* client, size_t nclient,
84 uint32_t flags,
85 const void* userdata, size_t nuserdata);
86 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
87 const void* cookie,
88 protocol_binary_request_header *request,
89 ADD_RESPONSE response);
90
91 union vbucket_info_adapter {
92 char c;
93 struct vbucket_info v;
94 };
95
set_vbucket_state(struct default_engine * e,uint16_t vbid,enum vbucket_state to)96 static void set_vbucket_state(struct default_engine *e,
97 uint16_t vbid, enum vbucket_state to) {
98 union vbucket_info_adapter vi;
99 vi.c = e->vbucket_infos[vbid];
100 vi.v.state = to;
101 e->vbucket_infos[vbid] = vi.c;
102 }
103
get_vbucket_state(struct default_engine * e,uint16_t vbid)104 static enum vbucket_state get_vbucket_state(struct default_engine *e,
105 uint16_t vbid) {
106 union vbucket_info_adapter vi;
107 vi.c = e->vbucket_infos[vbid];
108 return vi.v.state;
109 }
110
handled_vbucket(struct default_engine * e,uint16_t vbid)111 static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
112 return e->config.ignore_vbucket
113 || (get_vbucket_state(e, vbid) == VBUCKET_STATE_ACTIVE);
114 }
115
116 /* mechanism for handling bad vbucket requests */
117 #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; }
118
119 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
120 const item* item, item_info *item_info);
121
/*
 * Map a vbucket state to a printable name.
 *
 * Returns a pointer to a static string.  A value that does not index
 * into the name table yields "unknown" instead of reading outside the
 * table.
 */
static const char *vbucket_state_name(enum vbucket_state s) {
    /* Looked up by the numeric value of `s`; both the strings and the
     * table itself are read-only. */
    static const char * const vbucket_states[] = {
        "dead", "active", "replica", "pending"
    };
    const size_t nstates = sizeof(vbucket_states) / sizeof(vbucket_states[0]);

    /* The size_t cast also rejects values that would be negative. */
    if ((size_t)s >= nstates) {
        return "unknown";
    }
    return vbucket_states[s];
}
128
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)129 ENGINE_ERROR_CODE create_instance(uint64_t interface,
130 GET_SERVER_API get_server_api,
131 ENGINE_HANDLE **handle) {
132 SERVER_HANDLE_V1 *api = get_server_api();
133 if (interface != 1 || api == NULL) {
134 return ENGINE_ENOTSUP;
135 }
136
137 struct default_engine *engine = malloc(sizeof(*engine));
138 if (engine == NULL) {
139 return ENGINE_ENOMEM;
140 }
141
142 struct default_engine default_engine = {
143 .engine = {
144 .interface = {
145 .interface = 1
146 },
147 .get_info = default_get_info,
148 .initialize = default_initialize,
149 .destroy = default_destroy,
150 .allocate = default_item_allocate,
151 .remove = default_item_delete,
152 .release = default_item_release,
153 .get = default_get,
154 .get_stats = default_get_stats,
155 .reset_stats = default_reset_stats,
156 .store = default_store,
157 .arithmetic = default_arithmetic,
158 .flush = default_flush,
159 .unknown_command = default_unknown_command,
160 .item_set_cas = item_set_cas,
161 .get_item_info = get_item_info,
162 .get_tap_iterator = get_tap_iterator
163 },
164 .server = *api,
165 .get_server_api = get_server_api,
166 .initialized = true,
167 .assoc = {
168 .hashpower = 16,
169 },
170 .slabs = {
171 .lock = PTHREAD_MUTEX_INITIALIZER
172 },
173 .cache_lock = PTHREAD_MUTEX_INITIALIZER,
174 .stats = {
175 .lock = PTHREAD_MUTEX_INITIALIZER,
176 },
177 .config = {
178 .use_cas = true,
179 .verbose = 0,
180 .oldest_live = 0,
181 .evict_to_free = true,
182 .maxbytes = 64 * 1024 * 1024,
183 .preallocate = false,
184 .factor = 1.25,
185 .chunk_size = 48,
186 .item_size_max= 1024 * 1024,
187 },
188 .scrubber = {
189 .lock = PTHREAD_MUTEX_INITIALIZER,
190 },
191 /* FIXME: compilation issue on solaris x86
192 .info.engine_info = {
193 .description = "Default engine v0.1",
194 .num_features = 1,
195 .features = {
196 [0].feature = ENGINE_FEATURE_LRU
197 }
198 } */
199 };
200
201 *engine = default_engine;
202
203 *handle = (ENGINE_HANDLE*)&engine->engine;
204 return ENGINE_SUCCESS;
205 }
206
/* Reinterpret the opaque engine handle as the default_engine behind it. */
static inline struct default_engine* get_handle(ENGINE_HANDLE* handle) {
    struct default_engine *engine = (struct default_engine*)handle;
    return engine;
}
210
get_real_item(item * item)211 static inline hash_item* get_real_item(item* item) {
212 return (hash_item*)item;
213 }
214
default_get_info(ENGINE_HANDLE * handle)215 static const engine_info* default_get_info(ENGINE_HANDLE* handle) {
216 return &get_handle(handle)->info.engine_info;
217 }
218
default_initialize(ENGINE_HANDLE * handle,const char * config_str)219 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
220 const char* config_str) {
221 struct default_engine* se = get_handle(handle);
222
223 ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
224 if (ret != ENGINE_SUCCESS) {
225 return ret;
226 }
227
228 /* fixup feature_info */
229 if (se->config.use_cas) {
230 se->info.engine_info.features[se->info.engine_info.num_features++].feature = ENGINE_FEATURE_CAS;
231 }
232
233 ret = assoc_init(se);
234 if (ret != ENGINE_SUCCESS) {
235 return ret;
236 }
237
238 ret = slabs_init(se, se->config.maxbytes, se->config.factor,
239 se->config.preallocate);
240 if (ret != ENGINE_SUCCESS) {
241 return ret;
242 }
243
244 return ENGINE_SUCCESS;
245 }
246
/*
 * Tear down an engine created by create_instance(): destroy the cache,
 * stats and slabs mutexes and free the engine object.  Does nothing when
 * the engine is not marked initialized.  `force` is not consulted.
 *
 * NOTE(review): scrubber.lock is initialized in create_instance() but is
 * not destroyed here -- confirm whether that is intentional.
 */
static void default_destroy(ENGINE_HANDLE* handle, bool force) {
    struct default_engine *engine = get_handle(handle);

    if (!engine->initialized) {
        return;
    }

    pthread_mutex_destroy(&engine->cache_lock);
    pthread_mutex_destroy(&engine->stats.lock);
    pthread_mutex_destroy(&engine->slabs.lock);
    engine->initialized = false;
    free(engine);
}
258
default_item_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)259 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
260 const void* cookie,
261 item **item,
262 const void* key,
263 const size_t nkey,
264 const size_t nbytes,
265 const int flags,
266 const rel_time_t exptime) {
267 struct default_engine* engine = get_handle(handle);
268 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
269 if (engine->config.use_cas) {
270 ntotal += sizeof(uint64_t);
271 }
272 unsigned int id = slabs_clsid(engine, ntotal);
273 if (id == 0) {
274 return ENGINE_E2BIG;
275 }
276
277 hash_item *it;
278 it = item_alloc(engine, key, nkey, flags, engine->server.core->realtime(exptime),
279 nbytes, cookie);
280
281 if (it != NULL) {
282 *item = it;
283 return ENGINE_SUCCESS;
284 } else {
285 return ENGINE_ENOMEM;
286 }
287 }
288
default_item_delete(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)289 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
290 const void* cookie,
291 const void* key,
292 const size_t nkey,
293 uint64_t cas,
294 uint16_t vbucket)
295 {
296 struct default_engine* engine = get_handle(handle);
297 VBUCKET_GUARD(engine, vbucket);
298
299 hash_item *it = item_get(engine, key, nkey);
300 if (it == NULL) {
301 return ENGINE_KEY_ENOENT;
302 }
303
304 if (cas == 0 || cas == item_get_cas(it)) {
305 item_unlink(engine, it);
306 item_release(engine, it);
307 } else {
308 return ENGINE_KEY_EEXISTS;
309 }
310
311 return ENGINE_SUCCESS;
312 }
313
/* Hand an item previously returned by this engine back via item_release(). */
static void default_item_release(ENGINE_HANDLE* handle,
                                 const void *cookie,
                                 item* item) {
    struct default_engine *engine = get_handle(handle);
    hash_item *it = get_real_item(item);

    item_release(engine, it);
}
319
default_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)320 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
321 const void* cookie,
322 item** item,
323 const void* key,
324 const int nkey,
325 uint16_t vbucket) {
326 struct default_engine *engine = get_handle(handle);
327 VBUCKET_GUARD(engine, vbucket);
328
329 *item = item_get(engine, key, nkey);
330 if (*item != NULL) {
331 return ENGINE_SUCCESS;
332 } else {
333 return ENGINE_KEY_ENOENT;
334 }
335 }
336
/*
 * Emit one "vb_<n>" stat, whose value is the state name, for every
 * vbucket that is not in VBUCKET_STATE_DEAD.
 */
static void stats_vbucket(struct default_engine *e,
                          ADD_STAT add_stat,
                          const void *cookie) {
    for (int vb = 0; vb < NUM_VBUCKETS; vb++) {
        enum vbucket_state state = get_vbucket_state(e, vb);
        if (state == VBUCKET_STATE_DEAD) {
            continue;
        }

        char key[16];
        snprintf(key, sizeof(key), "vb_%d", vb);
        const char *name = vbucket_state_name(state);
        add_stat(key, strlen(key), name, strlen(name), cookie);
    }
}
350
default_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)351 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
352 const void* cookie,
353 const char* stat_key,
354 int nkey,
355 ADD_STAT add_stat)
356 {
357 struct default_engine* engine = get_handle(handle);
358 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
359
360 if (stat_key == NULL) {
361 char val[128];
362 int len;
363
364 pthread_mutex_lock(&engine->stats.lock);
365 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.evictions);
366 add_stat("evictions", 9, val, len, cookie);
367 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_items);
368 add_stat("curr_items", 10, val, len, cookie);
369 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.total_items);
370 add_stat("total_items", 11, val, len, cookie);
371 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_bytes);
372 add_stat("bytes", 5, val, len, cookie);
373 len = sprintf(val, "%"PRIu64, engine->stats.reclaimed);
374 add_stat("reclaimed", 9, val, len, cookie);
375 len = sprintf(val, "%"PRIu64, (uint64_t)engine->config.maxbytes);
376 add_stat("engine_maxbytes", 15, val, len, cookie);
377 pthread_mutex_unlock(&engine->stats.lock);
378 } else if (strncmp(stat_key, "slabs", 5) == 0) {
379 slabs_stats(engine, add_stat, cookie);
380 } else if (strncmp(stat_key, "items", 5) == 0) {
381 item_stats(engine, add_stat, cookie);
382 } else if (strncmp(stat_key, "sizes", 5) == 0) {
383 item_stats_sizes(engine, add_stat, cookie);
384 } else if (strncmp(stat_key, "vbucket", 7) == 0) {
385 stats_vbucket(engine, add_stat, cookie);
386 } else if (strncmp(stat_key, "scrub", 5) == 0) {
387 char val[128];
388 int len;
389
390 pthread_mutex_lock(&engine->scrubber.lock);
391 if (engine->scrubber.running) {
392 add_stat("scrubber:status", 15, "running", 7, cookie);
393 } else {
394 add_stat("scrubber:status", 15, "stopped", 7, cookie);
395 }
396
397 if (engine->scrubber.started != 0) {
398 if (engine->scrubber.stopped != 0) {
399 time_t diff = engine->scrubber.started - engine->scrubber.stopped;
400 len = sprintf(val, "%"PRIu64, (uint64_t)diff);
401 add_stat("scrubber:last_run", 17, val, len, cookie);
402 }
403
404 len = sprintf(val, "%"PRIu64, engine->scrubber.visited);
405 add_stat("scrubber:visited", 16, val, len, cookie);
406 len = sprintf(val, "%"PRIu64, engine->scrubber.cleaned);
407 add_stat("scrubber:cleaned", 16, val, len, cookie);
408 }
409 pthread_mutex_unlock(&engine->scrubber.lock);
410 } else {
411 ret = ENGINE_KEY_ENOENT;
412 }
413
414 return ret;
415 }
416
default_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,uint16_t vbucket)417 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
418 const void *cookie,
419 item* item,
420 uint64_t *cas,
421 ENGINE_STORE_OPERATION operation,
422 uint16_t vbucket) {
423 struct default_engine *engine = get_handle(handle);
424 VBUCKET_GUARD(engine, vbucket);
425 return store_item(engine, get_real_item(item), cas, operation,
426 cookie);
427 }
428
default_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)429 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
430 const void* cookie,
431 const void* key,
432 const int nkey,
433 const bool increment,
434 const bool create,
435 const uint64_t delta,
436 const uint64_t initial,
437 const rel_time_t exptime,
438 uint64_t *cas,
439 uint64_t *result,
440 uint16_t vbucket) {
441 struct default_engine *engine = get_handle(handle);
442 VBUCKET_GUARD(engine, vbucket);
443
444 return arithmetic(engine, cookie, key, nkey, increment,
445 create, delta, initial, exptime, cas,
446 result);
447 }
448
default_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)449 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
450 const void* cookie, time_t when) {
451 item_flush_expired(get_handle(handle), when);
452
453 return ENGINE_SUCCESS;
454 }
455
/*
 * Reset statistics: item_stats_reset() first, then the evictions,
 * reclaimed and total_items counters are zeroed under the stats lock.
 */
static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie) {
    struct default_engine *se = get_handle(handle);

    item_stats_reset(se);

    pthread_mutex_lock(&se->stats.lock);
    se->stats.total_items = 0;
    se->stats.reclaimed = 0;
    se->stats.evictions = 0;
    pthread_mutex_unlock(&se->stats.lock);
}
466
tap_always_pause(ENGINE_HANDLE * e,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)467 static tap_event_t tap_always_pause(ENGINE_HANDLE *e,
468 const void *cookie, item **itm, void **es,
469 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
470 uint32_t *seqno, uint16_t *vbucket) {
471 return TAP_PAUSE;
472 }
473
tap_always_disconnect(ENGINE_HANDLE * e,const void * cookie,item ** itm,void ** es,uint16_t * nes,uint8_t * ttl,uint16_t * flags,uint32_t * seqno,uint16_t * vbucket)474 static tap_event_t tap_always_disconnect(ENGINE_HANDLE *e,
475 const void *cookie, item **itm, void **es,
476 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
477 uint32_t *seqno, uint16_t *vbucket) {
478 return TAP_DISCONNECT;
479 }
480
get_tap_iterator(ENGINE_HANDLE * handle,const void * cookie,const void * client,size_t nclient,uint32_t flags,const void * userdata,size_t nuserdata)481 static TAP_ITERATOR get_tap_iterator(ENGINE_HANDLE* handle, const void* cookie,
482 const void* client, size_t nclient,
483 uint32_t flags,
484 const void* userdata, size_t nuserdata) {
485 TAP_ITERATOR rv = tap_always_pause;
486 if ((flags & TAP_CONNECT_FLAG_DUMP)
487 || (flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) {
488 rv = tap_always_disconnect;
489 }
490 return rv;
491 }
492
initalize_configuration(struct default_engine * se,const char * cfg_str)493 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
494 const char *cfg_str) {
495 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
496
497 se->config.vb0 = true;
498
499 if (cfg_str != NULL) {
500 struct config_item items[] = {
501 { .key = "use_cas",
502 .datatype = DT_BOOL,
503 .value.dt_bool = &se->config.use_cas },
504 { .key = "verbose",
505 .datatype = DT_SIZE,
506 .value.dt_size = &se->config.verbose },
507 { .key = "eviction",
508 .datatype = DT_BOOL,
509 .value.dt_bool = &se->config.evict_to_free },
510 { .key = "cache_size",
511 .datatype = DT_SIZE,
512 .value.dt_size = &se->config.maxbytes },
513 { .key = "preallocate",
514 .datatype = DT_BOOL,
515 .value.dt_bool = &se->config.preallocate },
516 { .key = "factor",
517 .datatype = DT_FLOAT,
518 .value.dt_float = &se->config.factor },
519 { .key = "chunk_size",
520 .datatype = DT_SIZE,
521 .value.dt_size = &se->config.chunk_size },
522 { .key = "item_size_max",
523 .datatype = DT_SIZE,
524 .value.dt_size = &se->config.item_size_max },
525 { .key = "ignore_vbucket",
526 .datatype = DT_BOOL,
527 .value.dt_bool = &se->config.ignore_vbucket },
528 { .key = "vb0",
529 .datatype = DT_BOOL,
530 .value.dt_bool = &se->config.vb0 },
531 { .key = "config_file",
532 .datatype = DT_CONFIGFILE },
533 { .key = NULL}
534 };
535
536 ret = se->server.core->parse_config(cfg_str, items, stderr);
537 }
538
539 if (se->config.vb0) {
540 set_vbucket_state(se, 0, VBUCKET_STATE_ACTIVE);
541 }
542
543 return ENGINE_SUCCESS;
544 }
545
set_vbucket(struct default_engine * e,protocol_binary_request_header * request,const char ** msg)546 static protocol_binary_response_status set_vbucket(struct default_engine *e,
547 protocol_binary_request_header *request,
548 const char **msg) {
549 protocol_binary_request_no_extras *req =
550 (protocol_binary_request_no_extras*)request;
551 assert(req);
552
553 char keyz[32];
554 char valz[32];
555
556 // Read the key.
557 int keylen = ntohs(req->message.header.request.keylen);
558 if (keylen >= (int)sizeof(keyz)) {
559 *msg = "Key is too large.";
560 return PROTOCOL_BINARY_RESPONSE_EINVAL;
561 }
562 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
563 keyz[keylen] = 0x00;
564
565 // Read the value.
566 size_t bodylen = ntohl(req->message.header.request.bodylen)
567 - ntohs(req->message.header.request.keylen);
568 if (bodylen >= sizeof(valz)) {
569 *msg = "Value is too large.";
570 return PROTOCOL_BINARY_RESPONSE_EINVAL;
571 }
572 memcpy(valz, (char*)request + sizeof(req->message.header)
573 + keylen, bodylen);
574 valz[bodylen] = 0x00;
575
576 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
577 *msg = "Configured";
578
579 enum vbucket_state state;
580 if (strcmp(valz, "active") == 0) {
581 state = VBUCKET_STATE_ACTIVE;
582 } else if(strcmp(valz, "replica") == 0) {
583 state = VBUCKET_STATE_REPLICA;
584 } else if(strcmp(valz, "pending") == 0) {
585 state = VBUCKET_STATE_PENDING;
586 } else if(strcmp(valz, "dead") == 0) {
587 state = VBUCKET_STATE_DEAD;
588 } else {
589 *msg = "Invalid state.";
590 return PROTOCOL_BINARY_RESPONSE_EINVAL;
591 }
592
593 uint32_t vbucket = 0;
594 if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) {
595 *msg = "Value out of range.";
596 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
597 } else {
598 set_vbucket_state(e, (uint16_t)vbucket, state);
599 }
600
601 return rv;
602 }
603
get_vbucket(struct default_engine * e,protocol_binary_request_header * request,const char ** msg)604 static protocol_binary_response_status get_vbucket(struct default_engine *e,
605 protocol_binary_request_header *request,
606 const char **msg) {
607 protocol_binary_request_no_extras *req =
608 (protocol_binary_request_no_extras*)request;
609 assert(req);
610
611 char keyz[8]; // stringy 2^16 int
612
613 // Read the key.
614 int keylen = ntohs(req->message.header.request.keylen);
615 if (keylen >= (int)sizeof(keyz)) {
616 *msg = "Key is too large.";
617 return PROTOCOL_BINARY_RESPONSE_EINVAL;
618 }
619 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
620 keyz[keylen] = 0x00;
621
622 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
623
624 uint32_t vbucket = 0;
625 if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) {
626 *msg = "Value out of range.";
627 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
628 } else {
629 *msg = vbucket_state_name(get_vbucket_state(e, (uint16_t)vbucket));
630 }
631
632 return rv;
633 }
634
rm_vbucket(struct default_engine * e,protocol_binary_request_header * request,const char ** msg)635 static protocol_binary_response_status rm_vbucket(struct default_engine *e,
636 protocol_binary_request_header *request,
637 const char **msg) {
638 protocol_binary_request_no_extras *req =
639 (protocol_binary_request_no_extras*)request;
640 assert(req);
641
642 char keyz[8]; // stringy 2^16 int
643
644 // Read the key.
645 int keylen = ntohs(req->message.header.request.keylen);
646 if (keylen >= (int)sizeof(keyz)) {
647 *msg = "Key is too large.";
648 return PROTOCOL_BINARY_RESPONSE_EINVAL;
649 }
650 memcpy(keyz, ((char*)request) + sizeof(req->message.header), keylen);
651 keyz[keylen] = 0x00;
652
653 protocol_binary_response_status rv = PROTOCOL_BINARY_RESPONSE_SUCCESS;
654
655 uint32_t vbucket = 0;
656 if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) {
657 *msg = "Value out of range.";
658 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
659 } else {
660 set_vbucket_state(e, (uint16_t)vbucket, VBUCKET_STATE_DEAD);
661 }
662
663 assert(msg);
664 return rv;
665 }
666
scrub_cmd(struct default_engine * e,protocol_binary_request_header * request,const char ** msg)667 static protocol_binary_response_status scrub_cmd(struct default_engine *e,
668 protocol_binary_request_header *request,
669 const char **msg) {
670 return item_start_scrub(e) ? PROTOCOL_BINARY_RESPONSE_SUCCESS
671 : PROTOCOL_BINARY_RESPONSE_EBUSY;
672 }
673
default_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)674 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
675 const void* cookie,
676 protocol_binary_request_header *request,
677 ADD_RESPONSE response)
678 {
679 struct default_engine* e = get_handle(handle);
680
681 bool handled = true;
682 const char *msg = NULL;
683 protocol_binary_response_status res =
684 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
685
686 switch(request->request.opcode) {
687 case PROTOCOL_BINARY_CMD_SCRUB:
688 res = scrub_cmd(e, request, &msg);
689 break;
690 case CMD_DEL_VBUCKET:
691 res = rm_vbucket(e, request, &msg);
692 break;
693 case CMD_SET_VBUCKET:
694 res = set_vbucket(e, request, &msg);
695 break;
696 case CMD_GET_VBUCKET:
697 res = get_vbucket(e, request, &msg);
698 break;
699 default:
700 handled = false;
701 break;
702 }
703
704 bool sent = false;
705 if (handled) {
706 size_t msg_size = msg ? strlen(msg) : 0;
707 sent = response(NULL, 0, NULL, 0,
708 msg, (uint16_t)msg_size,
709 PROTOCOL_BINARY_RAW_BYTES,
710 (uint16_t)res, 0, cookie);
711 } else {
712 sent = response(NULL, 0, NULL, 0, NULL, 0,
713 PROTOCOL_BINARY_RAW_BYTES,
714 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
715 }
716
717 if (sent) {
718 return ENGINE_SUCCESS;
719 } else {
720 return ENGINE_FAILED;
721 }
722 }
723
724
item_get_cas(const hash_item * item)725 uint64_t item_get_cas(const hash_item* item)
726 {
727 if (item->iflag & ITEM_WITH_CAS) {
728 return *(uint64_t*)(item + 1);
729 }
730 return 0;
731 }
732
/*
 * Store `val` as the item's CAS value (the 64-bit slot right after the
 * hash_item header).  Does nothing for items without ITEM_WITH_CAS.
 */
void item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
                  item* item, uint64_t val)
{
    hash_item *it = get_real_item(item);

    if ((it->iflag & ITEM_WITH_CAS) == 0) {
        return;
    }

    uint64_t *cas = (uint64_t*)(it + 1);
    *cas = val;
}
741
item_get_key(const hash_item * item)742 const void* item_get_key(const hash_item* item)
743 {
744 char *ret = (void*)(item + 1);
745 if (item->iflag & ITEM_WITH_CAS) {
746 ret += sizeof(uint64_t);
747 }
748
749 return ret;
750 }
751
item_get_data(const hash_item * item)752 char* item_get_data(const hash_item* item)
753 {
754 return ((char*)item_get_key(item)) + item->nkey;
755 }
756
/*
 * Always reports class id 0; `item` is not examined.
 *
 * NOTE(review): get_item_info() reports it->slabs_clsid instead --
 * confirm that the constant here is intentional.
 */
uint8_t item_get_clsid(const hash_item* item)
{
    const uint8_t clsid = 0;
    return clsid;
}
761
/*
 * Fill `item_info` with the metadata of `item` and a single value
 * segment covering the item's data.
 *
 * On entry item_info->nvalue is the number of value slots the caller
 * provides; at least one is required.
 *
 * @return false if fewer than one value slot is available, true otherwise
 */
static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
                          const item* item, item_info *item_info)
{
    if (item_info->nvalue < 1) {
        return false;
    }

    hash_item *it = (hash_item*)item;

    /* Metadata copied from the item header. */
    item_info->cas = item_get_cas(it);
    item_info->exptime = it->exptime;
    item_info->nbytes = it->nbytes;
    item_info->flags = it->flags;
    item_info->clsid = it->slabs_clsid;
    item_info->nkey = it->nkey;

    /* Key pointer and the one value segment. */
    item_info->nvalue = 1;
    item_info->key = item_get_key(it);
    item_info->value[0].iov_base = item_get_data(it);
    item_info->value[0].iov_len = it->nbytes;

    return true;
}
781