1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 #include "config.h"
3 
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <string.h>
7 #include <assert.h>
8 #include <errno.h>
9 #include <ctype.h>
10 #include <unistd.h>
11 #include <stddef.h>
12 #include <inttypes.h>
13 
14 #include "default_engine.h"
15 #include "memcached/util.h"
16 #include "memcached/config_parser.h"
17 
18 static const engine_info* default_get_info(ENGINE_HANDLE* handle);
19 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
20                                             const char* config_str);
21 static void default_destroy(ENGINE_HANDLE* handle,
22                             const bool force);
23 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
24                                                const void* cookie,
25                                                item **item,
26                                                const void* key,
27                                                const size_t nkey,
28                                                const size_t nbytes,
29                                                const int flags,
30                                                const rel_time_t exptime);
31 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
32                                              const void* cookie,
33                                              const void* key,
34                                              const size_t nkey,
35                                              uint64_t cas,
36                                              uint16_t vbucket);
37 
38 static void default_item_release(ENGINE_HANDLE* handle, const void *cookie,
39                                  item* item);
40 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
41                                      const void* cookie,
42                                      item** item,
43                                      const void* key,
44                                      const int nkey,
45                                      uint16_t vbucket);
46 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
47                   const void *cookie,
48                   const char *stat_key,
49                   int nkey,
50                   ADD_STAT add_stat);
51 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie);
52 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
53                                        const void *cookie,
54                                        item* item,
55                                        uint64_t *cas,
56                                        ENGINE_STORE_OPERATION operation,
57                                        uint16_t vbucket);
58 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
59                                             const void* cookie,
60                                             const void* key,
61                                             const int nkey,
62                                             const bool increment,
63                                             const bool create,
64                                             const uint64_t delta,
65                                             const uint64_t initial,
66                                             const rel_time_t exptime,
67                                             uint64_t *cas,
68                                             uint64_t *result,
69                                             uint16_t vbucket);
70 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
71                                        const void* cookie, time_t when);
72 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
73                                                  const char *cfg_str);
74 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
75                                                  const void* cookie,
76                                                  protocol_binary_request_header *request,
77                                                  ADD_RESPONSE response);
78 
79 static ENGINE_ERROR_CODE default_tap_notify(ENGINE_HANDLE* handle,
80                                             const void *cookie,
81                                             void *engine_specific,
82                                             uint16_t nengine,
83                                             uint8_t ttl,
84                                             uint16_t tap_flags,
85                                             tap_event_t tap_event,
86                                             uint32_t tap_seqno,
87                                             const void *key,
88                                             size_t nkey,
89                                             uint32_t flags,
90                                             uint32_t exptime,
91                                             uint64_t cas,
92                                             const void *data,
93                                             size_t ndata,
94                                             uint16_t vbucket);
95 
96 static TAP_ITERATOR default_get_tap_iterator(ENGINE_HANDLE* handle,
97                                              const void* cookie,
98                                              const void* client,
99                                              size_t nclient,
100                                              uint32_t flags,
101                                              const void* userdata,
102                                              size_t nuserdata);
103 
104 static void default_handle_disconnect(const void *cookie,
105                                       ENGINE_EVENT_TYPE type,
106                                       const void *event_data,
107                                       const void *cb_data);
108 
109 union vbucket_info_adapter {
110     char c;
111     struct vbucket_info v;
112 };
113 
set_vbucket_state(struct default_engine * e,uint16_t vbid,vbucket_state_t to)114 static void set_vbucket_state(struct default_engine *e,
115                               uint16_t vbid, vbucket_state_t to) {
116     union vbucket_info_adapter vi;
117     vi.c = e->vbucket_infos[vbid];
118     vi.v.state = to;
119     e->vbucket_infos[vbid] = vi.c;
120 }
121 
get_vbucket_state(struct default_engine * e,uint16_t vbid)122 static vbucket_state_t get_vbucket_state(struct default_engine *e,
123                                          uint16_t vbid) {
124     union vbucket_info_adapter vi;
125     vi.c = e->vbucket_infos[vbid];
126     return vi.v.state;
127 }
128 
handled_vbucket(struct default_engine * e,uint16_t vbid)129 static bool handled_vbucket(struct default_engine *e, uint16_t vbid) {
130     return e->config.ignore_vbucket
131         || (get_vbucket_state(e, vbid) == vbucket_state_active);
132 }
133 
134 /* mechanism for handling bad vbucket requests */
135 #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; }
136 
137 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
138                           const item* item, item_info *item_info);
139 
vbucket_state_name(vbucket_state_t s)140 static const char const * vbucket_state_name(vbucket_state_t s) {
141     static const char const * vbucket_states[] = {
142         [vbucket_state_active] = "active",
143         [vbucket_state_replica] = "replica",
144         [vbucket_state_pending] = "pending",
145         [vbucket_state_dead] = "dead"
146     };
147     if (is_valid_vbucket_state_t(s)) {
148         return vbucket_states[s];
149     } else {
150         return "Illegal vbucket state";
151     }
152 }
153 
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)154 ENGINE_ERROR_CODE create_instance(uint64_t interface,
155                                   GET_SERVER_API get_server_api,
156                                   ENGINE_HANDLE **handle) {
157    SERVER_HANDLE_V1 *api = get_server_api();
158    if (interface != 1 || api == NULL) {
159       return ENGINE_ENOTSUP;
160    }
161 
162    struct default_engine *engine = malloc(sizeof(*engine));
163    if (engine == NULL) {
164       return ENGINE_ENOMEM;
165    }
166 
167    struct default_engine default_engine = {
168       .engine = {
169          .interface = {
170             .interface = 1
171          },
172          .get_info = default_get_info,
173          .initialize = default_initialize,
174          .destroy = default_destroy,
175          .allocate = default_item_allocate,
176          .remove = default_item_delete,
177          .release = default_item_release,
178          .get = default_get,
179          .get_stats = default_get_stats,
180          .reset_stats = default_reset_stats,
181          .store = default_store,
182          .arithmetic = default_arithmetic,
183          .flush = default_flush,
184          .unknown_command = default_unknown_command,
185          .tap_notify = default_tap_notify,
186          .get_tap_iterator = default_get_tap_iterator,
187          .item_set_cas = item_set_cas,
188          .get_item_info = get_item_info
189       },
190       .server = *api,
191       .get_server_api = get_server_api,
192       .initialized = true,
193       .assoc = {
194          .hashpower = 16,
195       },
196       .slabs = {
197          .lock = PTHREAD_MUTEX_INITIALIZER
198       },
199       .cache_lock = PTHREAD_MUTEX_INITIALIZER,
200       .stats = {
201          .lock = PTHREAD_MUTEX_INITIALIZER,
202       },
203       .config = {
204          .use_cas = true,
205          .verbose = 0,
206          .oldest_live = 0,
207          .evict_to_free = true,
208          .maxbytes = 64 * 1024 * 1024,
209          .preallocate = false,
210          .factor = 1.25,
211          .chunk_size = 48,
212          .item_size_max= 1024 * 1024,
213        },
214       .scrubber = {
215          .lock = PTHREAD_MUTEX_INITIALIZER,
216       },
217       .tap_connections = {
218          .lock = PTHREAD_MUTEX_INITIALIZER,
219          .size = 10,
220       }
221   };
222   default_engine.info.engine_info.description = "Default engine v0.1";
223   default_engine.info.engine_info.num_features = 1;
224   default_engine.info.engine_info.features[0].feature = ENGINE_FEATURE_LRU;
225 
226    *engine = default_engine;
227    engine->tap_connections.clients = calloc(default_engine.tap_connections.size, sizeof(void*));
228    if (engine->tap_connections.clients == NULL) {
229        free(engine);
230        return ENGINE_ENOMEM;
231    }
232    *handle = (ENGINE_HANDLE*)&engine->engine;
233    return ENGINE_SUCCESS;
234 }
235 
get_handle(ENGINE_HANDLE * handle)236 static inline struct default_engine* get_handle(ENGINE_HANDLE* handle) {
237    return (struct default_engine*)handle;
238 }
239 
get_real_item(item * item)240 static inline hash_item* get_real_item(item* item) {
241     return (hash_item*)item;
242 }
243 
default_get_info(ENGINE_HANDLE * handle)244 static const engine_info* default_get_info(ENGINE_HANDLE* handle) {
245     return &get_handle(handle)->info.engine_info;
246 }
247 
default_initialize(ENGINE_HANDLE * handle,const char * config_str)248 static ENGINE_ERROR_CODE default_initialize(ENGINE_HANDLE* handle,
249                                             const char* config_str) {
250    struct default_engine* se = get_handle(handle);
251 
252    ENGINE_ERROR_CODE ret = initalize_configuration(se, config_str);
253    if (ret != ENGINE_SUCCESS) {
254       return ret;
255    }
256 
257    /* fixup feature_info */
258    if (se->config.use_cas) {
259        se->info.engine_info.features[se->info.engine_info.num_features++].feature = ENGINE_FEATURE_CAS;
260    }
261 
262    ret = assoc_init(se);
263    if (ret != ENGINE_SUCCESS) {
264       return ret;
265    }
266 
267    ret = slabs_init(se, se->config.maxbytes, se->config.factor,
268                     se->config.preallocate);
269    if (ret != ENGINE_SUCCESS) {
270       return ret;
271    }
272 
273    se->server.callback->register_callback(handle, ON_DISCONNECT, default_handle_disconnect, handle);
274 
275    return ENGINE_SUCCESS;
276 }
277 
default_destroy(ENGINE_HANDLE * handle,const bool force)278 static void default_destroy(ENGINE_HANDLE* handle, const bool force) {
279    (void) force;
280    struct default_engine* se = get_handle(handle);
281 
282    if (se->initialized) {
283       pthread_mutex_destroy(&se->cache_lock);
284       pthread_mutex_destroy(&se->stats.lock);
285       pthread_mutex_destroy(&se->slabs.lock);
286       se->initialized = false;
287       free(se->tap_connections.clients);
288       free(se);
289    }
290 }
291 
default_item_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)292 static ENGINE_ERROR_CODE default_item_allocate(ENGINE_HANDLE* handle,
293                                                const void* cookie,
294                                                item **item,
295                                                const void* key,
296                                                const size_t nkey,
297                                                const size_t nbytes,
298                                                const int flags,
299                                                const rel_time_t exptime) {
300    struct default_engine* engine = get_handle(handle);
301    size_t ntotal = sizeof(hash_item) + nkey + nbytes;
302    if (engine->config.use_cas) {
303       ntotal += sizeof(uint64_t);
304    }
305    unsigned int id = slabs_clsid(engine, ntotal);
306    if (id == 0) {
307       return ENGINE_E2BIG;
308    }
309 
310    hash_item *it;
311    it = item_alloc(engine, key, nkey, flags, engine->server.core->realtime(exptime),
312                    nbytes, cookie);
313 
314    if (it != NULL) {
315       *item = it;
316       return ENGINE_SUCCESS;
317    } else {
318       return ENGINE_ENOMEM;
319    }
320 }
321 
default_item_delete(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)322 static ENGINE_ERROR_CODE default_item_delete(ENGINE_HANDLE* handle,
323                                              const void* cookie,
324                                              const void* key,
325                                              const size_t nkey,
326                                              uint64_t cas,
327                                              uint16_t vbucket)
328 {
329    struct default_engine* engine = get_handle(handle);
330    VBUCKET_GUARD(engine, vbucket);
331 
332    hash_item *it = item_get(engine, key, nkey);
333    if (it == NULL) {
334       return ENGINE_KEY_ENOENT;
335    }
336 
337    if (cas == 0 || cas == item_get_cas(it)) {
338       item_unlink(engine, it);
339       item_release(engine, it);
340    } else {
341       return ENGINE_KEY_EEXISTS;
342    }
343 
344    return ENGINE_SUCCESS;
345 }
346 
default_item_release(ENGINE_HANDLE * handle,const void * cookie,item * item)347 static void default_item_release(ENGINE_HANDLE* handle,
348                                  const void *cookie,
349                                  item* item) {
350    item_release(get_handle(handle), get_real_item(item));
351 }
352 
default_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)353 static ENGINE_ERROR_CODE default_get(ENGINE_HANDLE* handle,
354                                      const void* cookie,
355                                      item** item,
356                                      const void* key,
357                                      const int nkey,
358                                      uint16_t vbucket) {
359    struct default_engine *engine = get_handle(handle);
360    VBUCKET_GUARD(engine, vbucket);
361 
362    *item = item_get(engine, key, nkey);
363    if (*item != NULL) {
364       return ENGINE_SUCCESS;
365    } else {
366       return ENGINE_KEY_ENOENT;
367    }
368 }
369 
stats_vbucket(struct default_engine * e,ADD_STAT add_stat,const void * cookie)370 static void stats_vbucket(struct default_engine *e,
371                           ADD_STAT add_stat,
372                           const void *cookie) {
373     for (int i = 0; i < NUM_VBUCKETS; i++) {
374         vbucket_state_t state = get_vbucket_state(e, i);
375         if (state != vbucket_state_dead) {
376             char buf[16];
377             snprintf(buf, sizeof(buf), "vb_%d", i);
378             const char * state_name = vbucket_state_name(state);
379             add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie);
380         }
381     }
382 }
383 
default_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)384 static ENGINE_ERROR_CODE default_get_stats(ENGINE_HANDLE* handle,
385                                            const void* cookie,
386                                            const char* stat_key,
387                                            int nkey,
388                                            ADD_STAT add_stat)
389 {
390    struct default_engine* engine = get_handle(handle);
391    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
392 
393    if (stat_key == NULL) {
394       char val[128];
395       int len;
396 
397       pthread_mutex_lock(&engine->stats.lock);
398       len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.evictions);
399       add_stat("evictions", 9, val, len, cookie);
400       len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_items);
401       add_stat("curr_items", 10, val, len, cookie);
402       len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.total_items);
403       add_stat("total_items", 11, val, len, cookie);
404       len = sprintf(val, "%"PRIu64, (uint64_t)engine->stats.curr_bytes);
405       add_stat("bytes", 5, val, len, cookie);
406       len = sprintf(val, "%"PRIu64, engine->stats.reclaimed);
407       add_stat("reclaimed", 9, val, len, cookie);
408       len = sprintf(val, "%"PRIu64, (uint64_t)engine->config.maxbytes);
409       add_stat("engine_maxbytes", 15, val, len, cookie);
410       pthread_mutex_unlock(&engine->stats.lock);
411    } else if (strncmp(stat_key, "slabs", 5) == 0) {
412       slabs_stats(engine, add_stat, cookie);
413    } else if (strncmp(stat_key, "items", 5) == 0) {
414       item_stats(engine, add_stat, cookie);
415    } else if (strncmp(stat_key, "sizes", 5) == 0) {
416       item_stats_sizes(engine, add_stat, cookie);
417    } else if (strncmp(stat_key, "vbucket", 7) == 0) {
418       stats_vbucket(engine, add_stat, cookie);
419    } else if (strncmp(stat_key, "scrub", 5) == 0) {
420       char val[128];
421       int len;
422 
423       pthread_mutex_lock(&engine->scrubber.lock);
424       if (engine->scrubber.running) {
425          add_stat("scrubber:status", 15, "running", 7, cookie);
426       } else {
427          add_stat("scrubber:status", 15, "stopped", 7, cookie);
428       }
429 
430       if (engine->scrubber.started != 0) {
431          if (engine->scrubber.stopped != 0) {
432             time_t diff = engine->scrubber.started - engine->scrubber.stopped;
433             len = sprintf(val, "%"PRIu64, (uint64_t)diff);
434             add_stat("scrubber:last_run", 17, val, len, cookie);
435          }
436 
437          len = sprintf(val, "%"PRIu64, engine->scrubber.visited);
438          add_stat("scrubber:visited", 16, val, len, cookie);
439          len = sprintf(val, "%"PRIu64, engine->scrubber.cleaned);
440          add_stat("scrubber:cleaned", 16, val, len, cookie);
441       }
442       pthread_mutex_unlock(&engine->scrubber.lock);
443    } else {
444       ret = ENGINE_KEY_ENOENT;
445    }
446 
447    return ret;
448 }
449 
default_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION operation,uint16_t vbucket)450 static ENGINE_ERROR_CODE default_store(ENGINE_HANDLE* handle,
451                                        const void *cookie,
452                                        item* item,
453                                        uint64_t *cas,
454                                        ENGINE_STORE_OPERATION operation,
455                                        uint16_t vbucket) {
456     struct default_engine *engine = get_handle(handle);
457     VBUCKET_GUARD(engine, vbucket);
458     return store_item(engine, get_real_item(item), cas, operation,
459                       cookie);
460 }
461 
default_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)462 static ENGINE_ERROR_CODE default_arithmetic(ENGINE_HANDLE* handle,
463                                             const void* cookie,
464                                             const void* key,
465                                             const int nkey,
466                                             const bool increment,
467                                             const bool create,
468                                             const uint64_t delta,
469                                             const uint64_t initial,
470                                             const rel_time_t exptime,
471                                             uint64_t *cas,
472                                             uint64_t *result,
473                                             uint16_t vbucket) {
474    struct default_engine *engine = get_handle(handle);
475    VBUCKET_GUARD(engine, vbucket);
476 
477    return arithmetic(engine, cookie, key, nkey, increment,
478                      create, delta, initial, engine->server.core->realtime(exptime), cas,
479                      result);
480 }
481 
default_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)482 static ENGINE_ERROR_CODE default_flush(ENGINE_HANDLE* handle,
483                                        const void* cookie, time_t when) {
484    item_flush_expired(get_handle(handle), when);
485 
486    return ENGINE_SUCCESS;
487 }
488 
default_reset_stats(ENGINE_HANDLE * handle,const void * cookie)489 static void default_reset_stats(ENGINE_HANDLE* handle, const void *cookie) {
490    struct default_engine *engine = get_handle(handle);
491    item_stats_reset(engine);
492 
493    pthread_mutex_lock(&engine->stats.lock);
494    engine->stats.evictions = 0;
495    engine->stats.reclaimed = 0;
496    engine->stats.total_items = 0;
497    pthread_mutex_unlock(&engine->stats.lock);
498 }
499 
initalize_configuration(struct default_engine * se,const char * cfg_str)500 static ENGINE_ERROR_CODE initalize_configuration(struct default_engine *se,
501                                                  const char *cfg_str) {
502    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
503 
504    se->config.vb0 = true;
505 
506    if (cfg_str != NULL) {
507       struct config_item items[] = {
508          { .key = "use_cas",
509            .datatype = DT_BOOL,
510            .value.dt_bool = &se->config.use_cas },
511          { .key = "verbose",
512            .datatype = DT_SIZE,
513            .value.dt_size = &se->config.verbose },
514          { .key = "eviction",
515            .datatype = DT_BOOL,
516            .value.dt_bool = &se->config.evict_to_free },
517          { .key = "cache_size",
518            .datatype = DT_SIZE,
519            .value.dt_size = &se->config.maxbytes },
520          { .key = "preallocate",
521            .datatype = DT_BOOL,
522            .value.dt_bool = &se->config.preallocate },
523          { .key = "factor",
524            .datatype = DT_FLOAT,
525            .value.dt_float = &se->config.factor },
526          { .key = "chunk_size",
527            .datatype = DT_SIZE,
528            .value.dt_size = &se->config.chunk_size },
529          { .key = "item_size_max",
530            .datatype = DT_SIZE,
531            .value.dt_size = &se->config.item_size_max },
532          { .key = "ignore_vbucket",
533            .datatype = DT_BOOL,
534            .value.dt_bool = &se->config.ignore_vbucket },
535          { .key = "vb0",
536            .datatype = DT_BOOL,
537            .value.dt_bool = &se->config.vb0 },
538          { .key = "config_file",
539            .datatype = DT_CONFIGFILE },
540          { .key = NULL}
541       };
542 
543       ret = se->server.core->parse_config(cfg_str, items, stderr);
544    }
545 
546    if (se->config.vb0) {
547        set_vbucket_state(se, 0, vbucket_state_active);
548    }
549 
550    return ret;
551 }
552 
set_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_set_vbucket * req,ADD_RESPONSE response)553 static bool set_vbucket(struct default_engine *e,
554                         const void* cookie,
555                         protocol_binary_request_set_vbucket *req,
556                         ADD_RESPONSE response) {
557     size_t bodylen = ntohl(req->message.header.request.bodylen)
558         - ntohs(req->message.header.request.keylen);
559     if (bodylen != sizeof(vbucket_state_t)) {
560         const char *msg = "Incorrect packet format";
561         return response(NULL, 0, NULL, 0, msg, strlen(msg),
562                         PROTOCOL_BINARY_RAW_BYTES,
563                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
564     }
565     vbucket_state_t state;
566     memcpy(&state, &req->message.body.state, sizeof(state));
567     state = ntohl(state);
568 
569     if (!is_valid_vbucket_state_t(state)) {
570         const char *msg = "Invalid vbucket state";
571         return response(NULL, 0, NULL, 0, msg, strlen(msg),
572                         PROTOCOL_BINARY_RAW_BYTES,
573                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
574     }
575 
576     set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
577     return response(NULL, 0, NULL, 0, &state, sizeof(state),
578                     PROTOCOL_BINARY_RAW_BYTES,
579                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
580 }
581 
get_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_get_vbucket * req,ADD_RESPONSE response)582 static bool get_vbucket(struct default_engine *e,
583                         const void* cookie,
584                         protocol_binary_request_get_vbucket *req,
585                         ADD_RESPONSE response) {
586     vbucket_state_t state;
587     state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
588     state = ntohl(state);
589 
590     return response(NULL, 0, NULL, 0, &state, sizeof(state),
591                     PROTOCOL_BINARY_RAW_BYTES,
592                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
593 }
594 
rm_vbucket(struct default_engine * e,const void * cookie,protocol_binary_request_header * req,ADD_RESPONSE response)595 static bool rm_vbucket(struct default_engine *e,
596                        const void *cookie,
597                        protocol_binary_request_header *req,
598                        ADD_RESPONSE response) {
599     set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
600     return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
601                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
602 }
603 
scrub_cmd(struct default_engine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)604 static bool scrub_cmd(struct default_engine *e,
605                       const void *cookie,
606                       protocol_binary_request_header *request,
607                       ADD_RESPONSE response) {
608 
609     protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_SUCCESS;
610     if (!item_start_scrub(e)) {
611         res = PROTOCOL_BINARY_RESPONSE_EBUSY;
612     }
613 
614     return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
615                     res, 0, cookie);
616 }
617 
touch(struct default_engine * e,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)618 static bool touch(struct default_engine *e, const void *cookie,
619                   protocol_binary_request_header *request,
620                   ADD_RESPONSE response) {
621     if (request->request.extlen != 4 || request->request.keylen == 0) {
622         return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
623                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
624     }
625 
626     protocol_binary_request_touch *t = (void*)request;
627     void *key = t->bytes + sizeof(t->bytes);
628     uint32_t exptime = ntohl(t->message.body.expiration);
629     uint16_t nkey = ntohs(request->request.keylen);
630 
631     hash_item *item = touch_item(e, key, nkey,
632                                  e->server.core->realtime(exptime));
633     if (item == NULL) {
634         if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
635             return true;
636         } else {
637             return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
638                             PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
639         }
640     } else {
641         bool ret;
642         if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
643             ret = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
644                            PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
645         } else {
646             ret = response(NULL, 0, &item->flags, sizeof(item->flags),
647                            item_get_data(item), item->nbytes,
648                            PROTOCOL_BINARY_RAW_BYTES,
649                            PROTOCOL_BINARY_RESPONSE_SUCCESS,
650                            item_get_cas(item), cookie);
651         }
652         item_release(e, item);
653         return ret;
654     }
655 }
656 
default_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)657 static ENGINE_ERROR_CODE default_unknown_command(ENGINE_HANDLE* handle,
658                                                  const void* cookie,
659                                                  protocol_binary_request_header *request,
660                                                  ADD_RESPONSE response)
661 {
662     struct default_engine* e = get_handle(handle);
663     bool sent;
664 
665     switch(request->request.opcode) {
666     case PROTOCOL_BINARY_CMD_SCRUB:
667         sent = scrub_cmd(e, cookie, request, response);
668         break;
669     case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
670         sent = rm_vbucket(e, cookie, request, response);
671         break;
672     case PROTOCOL_BINARY_CMD_SET_VBUCKET:
673         sent = set_vbucket(e, cookie, (void*)request, response);
674         break;
675     case PROTOCOL_BINARY_CMD_GET_VBUCKET:
676         sent = get_vbucket(e, cookie, (void*)request, response);
677         break;
678     case PROTOCOL_BINARY_CMD_TOUCH:
679     case PROTOCOL_BINARY_CMD_GAT:
680     case PROTOCOL_BINARY_CMD_GATQ:
681         sent = touch(e, cookie, request, response);
682         break;
683     default:
684         sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
685                         PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
686         break;
687     }
688 
689     if (sent) {
690         return ENGINE_SUCCESS;
691     } else {
692         return ENGINE_FAILED;
693     }
694 }
695 
696 
item_get_cas(const hash_item * item)697 uint64_t item_get_cas(const hash_item* item)
698 {
699     if (item->iflag & ITEM_WITH_CAS) {
700         return *(uint64_t*)(item + 1);
701     }
702     return 0;
703 }
704 
item_set_cas(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t val)705 void item_set_cas(ENGINE_HANDLE *handle, const void *cookie,
706                   item* item, uint64_t val)
707 {
708     hash_item* it = get_real_item(item);
709     if (it->iflag & ITEM_WITH_CAS) {
710         *(uint64_t*)(it + 1) = val;
711     }
712 }
713 
item_get_key(const hash_item * item)714 const void* item_get_key(const hash_item* item)
715 {
716     char *ret = (void*)(item + 1);
717     if (item->iflag & ITEM_WITH_CAS) {
718         ret += sizeof(uint64_t);
719     }
720 
721     return ret;
722 }
723 
item_get_data(const hash_item * item)724 char* item_get_data(const hash_item* item)
725 {
726     return ((char*)item_get_key(item)) + item->nkey;
727 }
728 
item_get_clsid(const hash_item * item)729 uint8_t item_get_clsid(const hash_item* item)
730 {
731     return 0;
732 }
733 
get_item_info(ENGINE_HANDLE * handle,const void * cookie,const item * item,item_info * item_info)734 static bool get_item_info(ENGINE_HANDLE *handle, const void *cookie,
735                           const item* item, item_info *item_info)
736 {
737     hash_item* it = (hash_item*)item;
738     if (item_info->nvalue < 1) {
739         return false;
740     }
741     item_info->cas = item_get_cas(it);
742     item_info->exptime = it->exptime;
743     item_info->nbytes = it->nbytes;
744     item_info->flags = it->flags;
745     item_info->clsid = it->slabs_clsid;
746     item_info->nkey = it->nkey;
747     item_info->nvalue = 1;
748     item_info->key = item_get_key(it);
749     item_info->value[0].iov_base = item_get_data(it);
750     item_info->value[0].iov_len = it->nbytes;
751     return true;
752 }
753 
default_tap_notify(ENGINE_HANDLE * handle,const void * cookie,void * engine_specific,uint16_t nengine,uint8_t ttl,uint16_t tap_flags,tap_event_t tap_event,uint32_t tap_seqno,const void * key,size_t nkey,uint32_t flags,uint32_t exptime,uint64_t cas,const void * data,size_t ndata,uint16_t vbucket)754 static ENGINE_ERROR_CODE default_tap_notify(ENGINE_HANDLE* handle,
755                                             const void *cookie,
756                                             void *engine_specific,
757                                             uint16_t nengine,
758                                             uint8_t ttl,
759                                             uint16_t tap_flags,
760                                             tap_event_t tap_event,
761                                             uint32_t tap_seqno,
762                                             const void *key,
763                                             size_t nkey,
764                                             uint32_t flags,
765                                             uint32_t exptime,
766                                             uint64_t cas,
767                                             const void *data,
768                                             size_t ndata,
769                                             uint16_t vbucket) {
770     struct default_engine* engine = get_handle(handle);
771     vbucket_state_t state;
772     item *it;
773     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
774 
775     switch (tap_event) {
776     case TAP_ACK:
777         /* We don't provide a tap stream, so we should never receive this */
778         abort();
779 
780     case TAP_FLUSH:
781         return default_flush(handle, cookie, 0);
782 
783     case TAP_DELETION:
784         return default_item_delete(handle, cookie, key, nkey, cas, vbucket);
785 
786     case TAP_MUTATION:
787         it = engine->server.cookie->get_engine_specific(cookie);
788         if (it == NULL) {
789             ret = default_item_allocate(handle, cookie, &it, key, nkey, ndata, flags, exptime);
790             switch (ret) {
791             case ENGINE_SUCCESS:
792                 break;
793             case ENGINE_ENOMEM:
794                 return ENGINE_TMPFAIL;
795             default:
796                 return ret;
797             }
798         }
799         memcpy(item_get_data(it), data, ndata);
800         engine->server.cookie->store_engine_specific(cookie, NULL);
801         item_set_cas(handle, cookie, it, cas);
802         ret = default_store(handle, cookie, it, &cas, OPERATION_SET, vbucket);
803         if (ret == ENGINE_EWOULDBLOCK) {
804             engine->server.cookie->store_engine_specific(cookie, it);
805         } else {
806             item_release(engine, it);
807         }
808 
809         break;
810 
811     case TAP_VBUCKET_SET:
812         if (nengine != sizeof(vbucket_state_t)) {
813             // illegal size of the vbucket set package...
814             return ENGINE_DISCONNECT;
815         }
816 
817         memcpy(&state, engine_specific, nengine);
818         state = (vbucket_state_t)ntohl(state);
819 
820         if (!is_valid_vbucket_state_t(state)) {
821             return ENGINE_DISCONNECT;
822         }
823 
824         set_vbucket_state(engine, vbucket, state);
825         return ENGINE_SUCCESS;
826 
827     case TAP_OPAQUE:
828         // not supported, ignore
829     default:
830         engine->server.log->get_logger()->log(EXTENSION_LOG_DEBUG, cookie,
831                     "Ignoring unknown tap event: %x", tap_event);
832     }
833 
834     return ret;
835 }
836 
default_get_tap_iterator(ENGINE_HANDLE * handle,const void * cookie,const void * client,size_t nclient,uint32_t flags,const void * userdata,size_t nuserdata)837 static TAP_ITERATOR default_get_tap_iterator(ENGINE_HANDLE* handle,
838                                              const void* cookie,
839                                              const void* client,
840                                              size_t nclient,
841                                              uint32_t flags,
842                                              const void* userdata,
843                                              size_t nuserdata) {
844     struct default_engine* engine = get_handle(handle);
845 
846     if ((flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) { /* Not supported */
847         return NULL;
848     }
849 
850     pthread_mutex_lock(&engine->tap_connections.lock);
851     int ii;
852     for (ii = 0; ii < engine->tap_connections.size; ++ii) {
853         if (engine->tap_connections.clients[ii] == NULL) {
854             engine->tap_connections.clients[ii] = cookie;
855             break;
856         }
857     }
858     pthread_mutex_unlock(&engine->tap_connections.lock);
859     if (ii == engine->tap_connections.size) {
860         // @todo allow more connections :)
861         return NULL;
862     }
863 
864     if (!initialize_item_tap_walker(engine, cookie)) {
865         /* Failed to create */
866         pthread_mutex_lock(&engine->tap_connections.lock);
867         engine->tap_connections.clients[ii] = NULL;
868         pthread_mutex_unlock(&engine->tap_connections.lock);
869         return NULL;
870     }
871 
872     return item_tap_walker;
873  }
874 
default_handle_disconnect(const void * cookie,ENGINE_EVENT_TYPE type,const void * event_data,const void * cb_data)875 static void default_handle_disconnect(const void *cookie,
876                                       ENGINE_EVENT_TYPE type,
877                                       const void *event_data,
878                                       const void *cb_data) {
879     struct default_engine *engine = (struct default_engine*)cb_data;
880     pthread_mutex_lock(&engine->tap_connections.lock);
881     int ii;
882     for (ii = 0; ii < engine->tap_connections.size; ++ii) {
883         if (engine->tap_connections.clients[ii] == cookie) {
884             free(engine->server.cookie->get_engine_specific(cookie));
885             break;
886         }
887     }
888     pthread_mutex_unlock(&engine->tap_connections.lock);
889 }
890