1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <assert.h>
8 #include <errno.h>
9 #include <ctype.h>
10 #include <unistd.h>
11 #include <stddef.h>
12 #include <inttypes.h>
13
14 #include "default_engine.h"
15 #include "memcached/util.h"
16 #include "memcached/config_parser.h"
17
18 static const engine_info* default_get_info(ENGINE_HANDLE* handle);
19 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
20 const char* config_str);
21 static void default_destroy(ENGINE_HANDLE* handle,
22 const bool force);
23 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
24 const void* cookie,
25 item **item,
26 const void* key,
27 const size_t nkey,
28 const size_t nbytes,
29 const int flags,
30 const rel_time_t exptime);
31 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
32 const void* cookie,
33 const void* key,
34 const size_t nkey,
35 uint64_t cas,
36 uint16_t vbucket);
37
38 static void default_item_release(ENGINE_HANDLE* handle, const void *cookie,
39 item* item);
40 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
41 const void* cookie,
42 item** item,
43 const void* key,
44 const int nkey,
45 uint16_t vbucket);
46 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
47 const void *cookie,
48 const char *stat_key,
49 int nkey,
50 ADD_STAT add_stat);
51 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie);
52 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
53 const void *cookie,
54 item* item,
55 uint64_t *cas,
56 ENGINE_STORE_OPERATION operation,
57 uint16_t vbucket);
58 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
59 const void* cookie,
60 const void* key,
61 const int nkey,
62 const bool increment,
63 const bool create,
64 const uint64_t delta,
65 const uint64_t initial,
66 const rel_time_t exptime,
67 uint64_t *cas,
68 uint64_t *result,
69 uint16_t vbucket);
70 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
71 const void* cookie, time_t when);
72 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
73 const char *cfg_str);
74 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
75 const void* cookie,
76 protocol_binary_request_header *request,
77 ADD_RESPONSE response);
78
79 static ENGINE_ERROR_CODE default_tap_notify(ENGINE_HANDLE* handle,
80 const void *cookie,
81 void *engine_specific,
82 uint16_t nengine,
83 uint8_t ttl,
84 uint16_t tap_flags,
85 tap_event_t tap_event,
86 uint32_t tap_seqno,
87 const void *key,
88 size_t nkey,
89 uint32_t flags,
90 uint32_t exptime,
91 uint64_t cas,
92 const void *data,
93 size_t ndata,
94 uint16_t vbucket);
95
96 static TAP_ITERATOR default_get_tap_iterator(ENGINE_HANDLE* handle,
97 const void* cookie,
98 const void* client,
99 size_t nclient,
100 uint32_t flags,
101 const void* userdata,
102 size_t nuserdata);
103
104 static void default_handle_disconnect(const void *cookie,
105 ENGINE_EVENT_TYPE type,
106 const void *event_data,
107 const void *cb_data);
108
109 union vbucket_info_adapter {
110 char c;
111 struct vbucket_info v;
112 };
113
set_vbucket_state(struct default_engine * e,uint16_t vbid,vbucket_state_t to)114 static void set_vbucket_state(struct default_engine *e,
115 uint16_t vbid, vbucket_state_t to) {
116 union vbucket_info_adapter vi;
117 vi.c = e->vbucket_infos[vbid];
118 vi.v.state = to;
119 e->vbucket_infos[vbid] = vi.c;
120 }
121
get_vbucket_state(struct default_engine * e,uint16_t vbid)122 static vbucket_state_t get_vbucket_state(struct default_engine *e,
123 uint16_t vbid) {
124 union vbucket_info_adapter vi;
125 vi.c = e->vbucket_infos[vbid];
126 return vi.v.state;
127 }
128
handled_vbucket(struct default_engine * e,uint16_t vbid)129 static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
130 return e->config.ignore_vbucket
131 || (get_vbucket_state(e, vbid) == vbucket_state_active);
132 }
133
134 /* mechanism for handling bad vbucket requests */
135 #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; }
136
137 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
138 const item* item, item_info *item_info);
139
vbucket_state_name(vbucket_state_t s)140 static const char const * vbucket_state_name(vbucket_state_t s) {
141 static const char const * vbucket_states[] = {
142 [vbucket_state_active] = "active",
143 [vbucket_state_replica] = "replica",
144 [vbucket_state_pending] = "pending",
145 [vbucket_state_dead] = "dead"
146 };
147 if (is_valid_vbucket_state_t(s)) {
148 return vbucket_states[s];
149 } else {
150 return "Illegal vbucket state";
151 }
152 }
153
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)154 ENGINE_ERROR_CODE create_instance(uint64_t interface,
155 GET_SERVER_API get_server_api,
156 ENGINE_HANDLE **handle) {
157 SERVER_HANDLE_V1 *api = get_server_api();
158 if (interface != 1 || api == NULL) {
159 return ENGINE_ENOTSUP;
160 }
161
162 struct default_engine *engine = malloc(sizeof(*engine));
163 if (engine == NULL) {
164 return ENGINE_ENOMEM;
165 }
166
167 struct default_engine default_engine = {
168 .engine = {
169 .interface = {
170 .interface = 1
171 },
172 .get_info = default_get_info,
173 .initialize = default_initialize,
174 .destroy = default_destroy,
175 .allocate = default_item_allocate,
176 .remove = default_item_delete,
177 .release = default_item_release,
178 .get = default_get,
179 .get_stats = default_get_stats,
180 .reset_stats = default_reset_stats,
181 .store = default_store,
182 .arithmetic = default_arithmetic,
183 .flush = default_flush,
184 .unknown_command = default_unknown_command,
185 .tap_notify = default_tap_notify,
186 .get_tap_iterator = default_get_tap_iterator,
187 .item_set_cas = item_set_cas,
188 .get_item_info = get_item_info
189 },
190 .server = *api,
191 .get_server_api = get_server_api,
192 .initialized = true,
193 .assoc = {
194 .hashpower = 16,
195 },
196 .slabs = {
197 .lock = PTHREAD_MUTEX_INITIALIZER
198 },
199 .cache_lock = PTHREAD_MUTEX_INITIALIZER,
200 .stats = {
201 .lock = PTHREAD_MUTEX_INITIALIZER,
202 },
203 .config = {
204 .use_cas = true,
205 .verbose = 0,
206 .oldest_live = 0,
207 .evict_to_free = true,
208 .maxbytes = 64 * 1024 * 1024,
209 .preallocate = false,
210 .factor = 1.25,
211 .chunk_size = 48,
212 .item_size_max= 1024 * 1024,
213 },
214 .scrubber = {
215 .lock = PTHREAD_MUTEX_INITIALIZER,
216 },
217 .tap_connections = {
218 .lock = PTHREAD_MUTEX_INITIALIZER,
219 .size = 10,
220 }
221 };
222 default_engine.info.engine_info.description = "Default engine v0.1";
223 default_engine.info.engine_info.num_features = 1;
224 default_engine.info.engine_info.features[0].feature = ENGINE_FEATURE_LRU;
225
226 *engine = default_engine;
227 engine->tap_connections.clients = calloc(default_engine.tap_connections.size, sizeof(void*));
228 if (engine->tap_connections.clients == NULL) {
229 free(engine);
230 return ENGINE_ENOMEM;
231 }
232 *handle = (ENGINE_HANDLE*)&engine->engine;
233 return ENGINE_SUCCESS;
234 }
235
get_handle(ENGINE_HANDLE * handle)236 static inline struct default_engine* get_handle(ENGINE_HANDLE* handle) {
237 return (struct default_engine*)handle;
238 }
239
get_real_item(item * item)240 static inline hash_item* get_real_item(item* item) {
241 return (hash_item*)item;
242 }
243
default_get_info(ENGINE_HANDLE * handle)244 static const engine_info* default_get_info(ENGINE_HANDLE* handle) {
245 return &get_handle(handle)->info.engine_info;
246 }
247
default_initialize(ENGINE_HANDLE * handle,const char * config_str)248 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
249 const char* config_str) {
250 struct default_engine* se = get_handle(handle);
251
252 ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
253 if (ret != ENGINE_SUCCESS) {
254 return ret;
255 }
256
257 /* fixup feature_info */
258 if (se->config.use_cas) {
259 se->info.engine_info.features[se->info.engine_info.num_features++].feature = ENGINE_FEATURE_CAS;
260 }
261
262 ret = assoc_init(se);
263 if (ret != ENGINE_SUCCESS) {
264 return ret;
265 }
266
267 ret = slabs_init(se, se->config.maxbytes, se->config.factor,
268 se->config.preallocate);
269 if (ret != ENGINE_SUCCESS) {
270 return ret;
271 }
272
273 se->server.callback->register_callback(handle, ON_DISCONNECT, default_handle_disconnect, handle);
274
275 return ENGINE_SUCCESS;
276 }
277
default_destroy(ENGINE_HANDLE * handle,const bool force)278 static void default_destroy(ENGINE_HANDLE* handle, const bool force) {
279 (void) force;
280 struct default_engine* se = get_handle(handle);
281
282 if (se->initialized) {
283 pthread_mutex_destroy(&se->cache_lock);
284 pthread_mutex_destroy(&se->stats.lock);
285 pthread_mutex_destroy(&se->slabs.lock);
286 se->initialized = false;
287 free(se->tap_connections.clients);
288 free(se);
289 }
290 }
291
default_item_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)292 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
293 const void* cookie,
294 item **item,
295 const void* key,
296 const size_t nkey,
297 const size_t nbytes,
298 const int flags,
299 const rel_time_t exptime) {
300 struct default_engine* engine = get_handle(handle);
301 size_t ntotal = sizeof(hash_item) + nkey + nbytes;
302 if (engine->config.use_cas) {
303 ntotal += sizeof(uint64_t);
304 }
305 unsigned int id = slabs_clsid(engine, ntotal);
306 if (id == 0) {
307 return ENGINE_E2BIG;
308 }
309
310 hash_item *it;
311 it = item_alloc(engine, key, nkey, flags, engine->server.core->realtime(exptime),
312 nbytes, cookie);
313
314 if (it != NULL) {
315 *item = it;
316 return ENGINE_SUCCESS;
317 } else {
318 return ENGINE_ENOMEM;
319 }
320 }
321
default_item_delete(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)322 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
323 const void* cookie,
324 const void* key,
325 const size_t nkey,
326 uint64_t cas,
327 uint16_t vbucket)
328 {
329 struct default_engine* engine = get_handle(handle);
330 VBUCKET_GUARD(engine, vbucket);
331
332 hash_item *it = item_get(engine, key, nkey);
333 if (it == NULL) {
334 return ENGINE_KEY_ENOENT;
335 }
336
337 if (cas == 0 || cas == item_get_cas(it)) {
338 item_unlink(engine, it);
339 item_release(engine, it);
340 } else {
341 return ENGINE_KEY_EEXISTS;
342 }
343
344 return ENGINE_SUCCESS;
345 }
346
default_item_release(ENGINE_HANDLE * handle,const void * cookie,item * item)347 static void default_item_release(ENGINE_HANDLE* handle,
348 const void *cookie,
349 item* item) {
350 item_release(get_handle(handle), get_real_item(item));
351 }
352
default_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)353 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
354 const void* cookie,
355 item** item,
356 const void* key,
357 const int nkey,
358 uint16_t vbucket) {
359 struct default_engine *engine = get_handle(handle);
360 VBUCKET_GUARD(engine, vbucket);
361
362 *item = item_get(engine, key, nkey);
363 if (*item != NULL) {
364 return ENGINE_SUCCESS;
365 } else {
366 return ENGINE_KEY_ENOENT;
367 }
368 }
369
stats_vbucket(struct default_engine * e,ADD_STAT add_stat,const void * cookie)370 static void stats_vbucket(struct default_engine *e,
371 ADD_STAT add_stat,
372 const void *cookie) {
373 for (int i = 0; i < NUM_VBUCKETS; i++) {
374 vbucket_state_t state = get_vbucket_state(e, i);
375 if (state != vbucket_state_dead) {
376 char buf[16];
377 snprintf(buf, sizeof(buf), "vb_%d", i);
378 const char * state_name = vbucket_state_name(state);
379 add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie);
380 }
381 }
382 }
383
default_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)384 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
385 const void* cookie,
386 const char* stat_key,
387 int nkey,
388 ADD_STAT add_stat)
389 {
390 struct default_engine* engine = get_handle(handle);
391 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
392
393 if (stat_key == NULL) {
394 char val[128];
395 int len;
396
397 pthread_mutex_lock(&engine->stats.lock);
398 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.evictions);
399 add_stat("evictions", 9, val, len, cookie);
400 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_items);
401 add_stat("curr_items", 10, val, len, cookie);
402 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.total_items);
403 add_stat("total_items", 11, val, len, cookie);
404 len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_bytes);
405 add_stat("bytes", 5, val, len, cookie);
406 len = sprintf(val, "%"PRIu64, engine->stats.reclaimed);
407 add_stat("reclaimed", 9, val, len, cookie);
408 len = sprintf(val, "%"PRIu64, (uint64_t)engine->config.maxbytes);
409 add_stat("engine_maxbytes", 15, val, len, cookie);
410 pthread_mutex_unlock(&engine->stats.lock);
411 } else if (strncmp(stat_key, "slabs", 5) == 0) {
412 slabs_stats(engine, add_stat, cookie);
413 } else if (strncmp(stat_key, "items", 5) == 0) {
414 item_stats(engine, add_stat, cookie);
415 } else if (strncmp(stat_key, "sizes", 5) == 0) {
416 item_stats_sizes(engine, add_stat, cookie);
417 } else if (strncmp(stat_key, "vbucket", 7) == 0) {
418 stats_vbucket(engine, add_stat, cookie);
419 } else if (strncmp(stat_key, "scrub", 5) == 0) {
420 char val[128];
421 int len;
422
423 pthread_mutex_lock(&engine->scrubber.lock);
424 if (engine->scrubber.running) {
425 add_stat("scrubber:status", 15, "running", 7, cookie);
426 } else {
427 add_stat("scrubber:status", 15, "stopped", 7, cookie);
428 }
429
430 if (engine->scrubber.started != 0) {
431 if (engine->scrubber.stopped != 0) {
432 time_t diff = engine->scrubber.started - engine->scrubber.stopped;
433 len = sprintf(val, "%"PRIu64, (uint64_t)diff);
434 add_stat("scrubber:last_run", 17, val, len, cookie);
435 }
436
437 len = sprintf(val, "%"PRIu64, engine->scrubber.visited);
438 add_stat("scrubber:visited", 16, val, len, cookie);
439 len = sprintf(val, "%"PRIu64, engine->scrubber.cleaned);
440 add_stat("scrubber:cleaned", 16, val, len, cookie);
441 }
442 pthread_mutex_unlock(&engine->scrubber.lock);
443 } else {
444 ret = ENGINE_KEY_ENOENT;
445 }
446
447 return ret;
448 }
449
default_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,uint16_t vbucket)450 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
451 const void *cookie,
452 item* item,
453 uint64_t *cas,
454 ENGINE_STORE_OPERATION operation,
455 uint16_t vbucket) {
456 struct default_engine *engine = get_handle(handle);
457 VBUCKET_GUARD(engine, vbucket);
458 return store_item(engine, get_real_item(item), cas, operation,
459 cookie);
460 }
461
default_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)462 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
463 const void* cookie,
464 const void* key,
465 const int nkey,
466 const bool increment,
467 const bool create,
468 const uint64_t delta,
469 const uint64_t initial,
470 const rel_time_t exptime,
471 uint64_t *cas,
472 uint64_t *result,
473 uint16_t vbucket) {
474 struct default_engine *engine = get_handle(handle);
475 VBUCKET_GUARD(engine, vbucket);
476
477 return arithmetic(engine, cookie, key, nkey, increment,
478 create, delta, initial, engine->server.core->realtime(exptime), cas,
479 result);
480 }
481
default_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)482 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
483 const void* cookie, time_t when) {
484 item_flush_expired(get_handle(handle), when);
485
486 return ENGINE_SUCCESS;
487 }
488
default_reset_stats(ENGINE_HANDLE * handle,const void * cookie)489 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie) {
490 struct default_engine *engine = get_handle(handle);
491 item_stats_reset(engine);
492
493 pthread_mutex_lock(&engine->stats.lock);
494 engine->stats.evictions = 0;
495 engine->stats.reclaimed = 0;
496 engine->stats.total_items = 0;
497 pthread_mutex_unlock(&engine->stats.lock);
498 }
499
initalize_configuration(struct default_engine * se,const char * cfg_str)500 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
501 const char *cfg_str) {
502 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
503
504 se->config.vb0 = true;
505
506 if (cfg_str != NULL) {
507 struct config_item items[] = {
508 { .key = "use_cas",
509 .datatype = DT_BOOL,
510 .value.dt_bool = &se->config.use_cas },
511 { .key = "verbose",
512 .datatype = DT_SIZE,
513 .value.dt_size = &se->config.verbose },
514 { .key = "eviction",
515 .datatype = DT_BOOL,
516 .value.dt_bool = &se->config.evict_to_free },
517 { .key = "cache_size",
518 .datatype = DT_SIZE,
519 .value.dt_size = &se->config.maxbytes },
520 { .key = "preallocate",
521 .datatype = DT_BOOL,
522 .value.dt_bool = &se->config.preallocate },
523 { .key = "factor",
524 .datatype = DT_FLOAT,
525 .value.dt_float = &se->config.factor },
526 { .key = "chunk_size",
527 .datatype = DT_SIZE,
528 .value.dt_size = &se->config.chunk_size },
529 { .key = "item_size_max",
530 .datatype = DT_SIZE,
531 .value.dt_size = &se->config.item_size_max },
532 { .key = "ignore_vbucket",
533 .datatype = DT_BOOL,
534 .value.dt_bool = &se->config.ignore_vbucket },
535 { .key = "vb0",
536 .datatype = DT_BOOL,
537 .value.dt_bool = &se->config.vb0 },
538 { .key = "config_file",
539 .datatype = DT_CONFIGFILE },
540 { .key = NULL}
541 };
542
543 ret = se->server.core->parse_config(cfg_str, items, stderr);
544 }
545
546 if (se->config.vb0) {
547 set_vbucket_state(se, 0, vbucket_state_active);
548 }
549
550 return ret;
551 }
552
set_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_set_vbucket * req,ADD_RESPONSE response)553 static bool set_vbucket(struct default_engine *e,
554 const void* cookie,
555 protocol_binary_request_set_vbucket *req,
556 ADD_RESPONSE response) {
557 size_t bodylen = ntohl(req->message.header.request.bodylen)
558 - ntohs(req->message.header.request.keylen);
559 if (bodylen != sizeof(vbucket_state_t)) {
560 const char *msg = "Incorrect packet format";
561 return response(NULL, 0, NULL, 0, msg, strlen(msg),
562 PROTOCOL_BINARY_RAW_BYTES,
563 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
564 }
565 vbucket_state_t state;
566 memcpy(&state, &req->message.body.state, sizeof(state));
567 state = ntohl(state);
568
569 if (!is_valid_vbucket_state_t(state)) {
570 const char *msg = "Invalid vbucket state";
571 return response(NULL, 0, NULL, 0, msg, strlen(msg),
572 PROTOCOL_BINARY_RAW_BYTES,
573 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
574 }
575
576 set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
577 return response(NULL, 0, NULL, 0, &state, sizeof(state),
578 PROTOCOL_BINARY_RAW_BYTES,
579 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
580 }
581
get_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_get_vbucket * req,ADD_RESPONSE response)582 static bool get_vbucket(struct default_engine *e,
583 const void* cookie,
584 protocol_binary_request_get_vbucket *req,
585 ADD_RESPONSE response) {
586 vbucket_state_t state;
587 state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
588 state = ntohl(state);
589
590 return response(NULL, 0, NULL, 0, &state, sizeof(state),
591 PROTOCOL_BINARY_RAW_BYTES,
592 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
593 }
594
rm_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_header * req,ADD_RESPONSE response)595 static bool rm_vbucket(struct default_engine *e,
596 const void *cookie,
597 protocol_binary_request_header *req,
598 ADD_RESPONSE response) {
599 set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
600 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
601 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
602 }
603
scrub_cmd(struct default_engine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)604 static bool scrub_cmd(struct default_engine *e,
605 const void *cookie,
606 protocol_binary_request_header *request,
607 ADD_RESPONSE response) {
608
609 protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
610 if (!item_start_scrub(e)) {
611 res = PROTOCOL_BINARY_RESPONSE_EBUSY;
612 }
613
614 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
615 res, 0, cookie);
616 }
617
touch(struct default_engine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)618 static bool touch(struct default_engine *e, const void *cookie,
619 protocol_binary_request_header *request,
620 ADD_RESPONSE response) {
621 if (request->request.extlen != 4 || request->request.keylen == 0) {
622 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
623 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
624 }
625
626 protocol_binary_request_touch *t = (void*)request;
627 void *key = t->bytes + sizeof(t->bytes);
628 uint32_t exptime = ntohl(t->message.body.expiration);
629 uint16_t nkey = ntohs(request->request.keylen);
630
631 hash_item *item = touch_item(e, key, nkey,
632 e->server.core->realtime(exptime));
633 if (item == NULL) {
634 if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
635 return true;
636 } else {
637 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
638 PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
639 }
640 } else {
641 bool ret;
642 if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
643 ret = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
644 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
645 } else {
646 ret = response(NULL, 0, &item->flags, sizeof(item->flags),
647 item_get_data(item), item->nbytes,
648 PROTOCOL_BINARY_RAW_BYTES,
649 PROTOCOL_BINARY_RESPONSE_SUCCESS,
650 item_get_cas(item), cookie);
651 }
652 item_release(e, item);
653 return ret;
654 }
655 }
656
default_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)657 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
658 const void* cookie,
659 protocol_binary_request_header *request,
660 ADD_RESPONSE response)
661 {
662 struct default_engine* e = get_handle(handle);
663 bool sent;
664
665 switch(request->request.opcode) {
666 case PROTOCOL_BINARY_CMD_SCRUB:
667 sent = scrub_cmd(e, cookie, request, response);
668 break;
669 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
670 sent = rm_vbucket(e, cookie, request, response);
671 break;
672 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
673 sent = set_vbucket(e, cookie, (void*)request, response);
674 break;
675 case PROTOCOL_BINARY_CMD_GET_VBUCKET:
676 sent = get_vbucket(e, cookie, (void*)request, response);
677 break;
678 case PROTOCOL_BINARY_CMD_TOUCH:
679 case PROTOCOL_BINARY_CMD_GAT:
680 case PROTOCOL_BINARY_CMD_GATQ:
681 sent = touch(e, cookie, request, response);
682 break;
683 default:
684 sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
685 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
686 break;
687 }
688
689 if (sent) {
690 return ENGINE_SUCCESS;
691 } else {
692 return ENGINE_FAILED;
693 }
694 }
695
696
item_get_cas(const hash_item * item)697 uint64_t item_get_cas(const hash_item* item)
698 {
699 if (item->iflag & ITEM_WITH_CAS) {
700 return *(uint64_t*)(item + 1);
701 }
702 return 0;
703 }
704
item_set_cas(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t val)705 void item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
706 item* item, uint64_t val)
707 {
708 hash_item* it = get_real_item(item);
709 if (it->iflag & ITEM_WITH_CAS) {
710 *(uint64_t*)(it + 1) = val;
711 }
712 }
713
item_get_key(const hash_item * item)714 const void* item_get_key(const hash_item* item)
715 {
716 char *ret = (void*)(item + 1);
717 if (item->iflag & ITEM_WITH_CAS) {
718 ret += sizeof(uint64_t);
719 }
720
721 return ret;
722 }
723
item_get_data(const hash_item * item)724 char* item_get_data(const hash_item* item)
725 {
726 return ((char*)item_get_key(item)) + item->nkey;
727 }
728
item_get_clsid(const hash_item * item)729 uint8_t item_get_clsid(const hash_item* item)
730 {
731 return 0;
732 }
733
get_item_info(ENGINE_HANDLE * handle,const void * cookie,const item * item,item_info * item_info)734 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
735 const item* item, item_info *item_info)
736 {
737 hash_item* it = (hash_item*)item;
738 if (item_info->nvalue < 1) {
739 return false;
740 }
741 item_info->cas = item_get_cas(it);
742 item_info->exptime = it->exptime;
743 item_info->nbytes = it->nbytes;
744 item_info->flags = it->flags;
745 item_info->clsid = it->slabs_clsid;
746 item_info->nkey = it->nkey;
747 item_info->nvalue = 1;
748 item_info->key = item_get_key(it);
749 item_info->value[0].iov_base = item_get_data(it);
750 item_info->value[0].iov_len = it->nbytes;
751 return true;
752 }
753
default_tap_notify(ENGINE_HANDLE * handle,const void * cookie,void * engine_specific,uint16_t nengine,uint8_t ttl,uint16_t tap_flags,tap_event_t tap_event,uint32_t tap_seqno,const void * key,size_t nkey,uint32_t flags,uint32_t exptime,uint64_t cas,const void * data,size_t ndata,uint16_t vbucket)754 static ENGINE_ERROR_CODE default_tap_notify(ENGINE_HANDLE* handle,
755 const void *cookie,
756 void *engine_specific,
757 uint16_t nengine,
758 uint8_t ttl,
759 uint16_t tap_flags,
760 tap_event_t tap_event,
761 uint32_t tap_seqno,
762 const void *key,
763 size_t nkey,
764 uint32_t flags,
765 uint32_t exptime,
766 uint64_t cas,
767 const void *data,
768 size_t ndata,
769 uint16_t vbucket) {
770 struct default_engine* engine = get_handle(handle);
771 vbucket_state_t state;
772 item *it;
773 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
774
775 switch (tap_event) {
776 case TAP_ACK:
777 /* We don't provide a tap stream, so we should never receive this */
778 abort();
779
780 case TAP_FLUSH:
781 return default_flush(handle, cookie, 0);
782
783 case TAP_DELETION:
784 return default_item_delete(handle, cookie, key, nkey, cas, vbucket);
785
786 case TAP_MUTATION:
787 it = engine->server.cookie->get_engine_specific(cookie);
788 if (it == NULL) {
789 ret = default_item_allocate(handle, cookie, &it, key, nkey, ndata, flags, exptime);
790 switch (ret) {
791 case ENGINE_SUCCESS:
792 break;
793 case ENGINE_ENOMEM:
794 return ENGINE_TMPFAIL;
795 default:
796 return ret;
797 }
798 }
799 memcpy(item_get_data(it), data, ndata);
800 engine->server.cookie->store_engine_specific(cookie, NULL);
801 item_set_cas(handle, cookie, it, cas);
802 ret = default_store(handle, cookie, it, &cas, OPERATION_SET, vbucket);
803 if (ret == ENGINE_EWOULDBLOCK) {
804 engine->server.cookie->store_engine_specific(cookie, it);
805 } else {
806 item_release(engine, it);
807 }
808
809 break;
810
811 case TAP_VBUCKET_SET:
812 if (nengine != sizeof(vbucket_state_t)) {
813 // illegal size of the vbucket set package...
814 return ENGINE_DISCONNECT;
815 }
816
817 memcpy(&state, engine_specific, nengine);
818 state = (vbucket_state_t)ntohl(state);
819
820 if (!is_valid_vbucket_state_t(state)) {
821 return ENGINE_DISCONNECT;
822 }
823
824 set_vbucket_state(engine, vbucket, state);
825 return ENGINE_SUCCESS;
826
827 case TAP_OPAQUE:
828 // not supported, ignore
829 default:
830 engine->server.log->get_logger()->log(EXTENSION_LOG_DEBUG, cookie,
831 "Ignoring unknown tap event: %x", tap_event);
832 }
833
834 return ret;
835 }
836
default_get_tap_iterator(ENGINE_HANDLE * handle,const void * cookie,const void * client,size_t nclient,uint32_t flags,const void * userdata,size_t nuserdata)837 static TAP_ITERATOR default_get_tap_iterator(ENGINE_HANDLE* handle,
838 const void* cookie,
839 const void* client,
840 size_t nclient,
841 uint32_t flags,
842 const void* userdata,
843 size_t nuserdata) {
844 struct default_engine* engine = get_handle(handle);
845
846 if ((flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) { /* Not supported */
847 return NULL;
848 }
849
850 pthread_mutex_lock(&engine->tap_connections.lock);
851 int ii;
852 for (ii = 0; ii < engine->tap_connections.size; ++ii) {
853 if (engine->tap_connections.clients[ii] == NULL) {
854 engine->tap_connections.clients[ii] = cookie;
855 break;
856 }
857 }
858 pthread_mutex_unlock(&engine->tap_connections.lock);
859 if (ii == engine->tap_connections.size) {
860 // @todo allow more connections :)
861 return NULL;
862 }
863
864 if (!initialize_item_tap_walker(engine, cookie)) {
865 /* Failed to create */
866 pthread_mutex_lock(&engine->tap_connections.lock);
867 engine->tap_connections.clients[ii] = NULL;
868 pthread_mutex_unlock(&engine->tap_connections.lock);
869 return NULL;
870 }
871
872 return item_tap_walker;
873 }
874
default_handle_disconnect(const void * cookie,ENGINE_EVENT_TYPE type,const void * event_data,const void * cb_data)875 static void default_handle_disconnect(const void *cookie,
876 ENGINE_EVENT_TYPE type,
877 const void *event_data,
878 const void *cb_data) {
879 struct default_engine *engine = (struct default_engine*)cb_data;
880 pthread_mutex_lock(&engine->tap_connections.lock);
881 int ii;
882 for (ii = 0; ii < engine->tap_connections.size; ++ii) {
883 if (engine->tap_connections.clients[ii] == cookie) {
884 free(engine->server.cookie->get_engine_specific(cookie));
885 break;
886 }
887 }
888 pthread_mutex_unlock(&engine->tap_connections.lock);
889 }
890