/*
 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
 reserved.

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License, version 2.0,
 as published by the Free Software Foundation.

 This program is also distributed with certain software (including
 but not limited to OpenSSL) that is licensed under separate terms,
 as designated in a particular file or component or in included license
 documentation. The authors of MySQL hereby grant you an additional
 permission to link the program and your derivative works with the
 separately licensed software that they have included with MySQL.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License, version 2.0, for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 02110-1301  USA
*/
26 #include <my_config.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <assert.h>
30 #include <pthread.h>
31
32 #include "config.h"
33
34 #include "default_engine.h"
35
36 #include <memcached/util.h>
37 #include <memcached/config_parser.h>
38 #include <memcached/extension.h>
39 #include <memcached/extension_loggers.h>
40
41 #include "ndb_engine.h"
42 #include "ndb_configuration.h"
43 #include "workitem.h"
44 #include "ndb_engine_private.h"
45 #include "debug.h"
46 #include "Scheduler.h"
47 #include "thread_identifier.h"
48 #include "timing.h"
49 #include "ndb_error_logger.h"
50
51 /* Global variables */
52 EXTENSION_LOGGER_DESCRIPTOR *logger;
53
54 /* Static and local to this file */
55 const char * set_ops[] = { "","add","set","replace","append","prepend","cas" };
56
57
/* Downcast the opaque engine handle to this engine's private struct. */
static inline struct ndb_engine *
ndb_handle(ENGINE_HANDLE *handle) {
  return (struct ndb_engine *) handle;
}
62
default_handle(struct ndb_engine * eng)63 static inline struct default_engine* default_handle(struct ndb_engine *eng)
64 {
65 return (struct default_engine*) eng->m_default_engine;
66 }
67
68
69 /*********** PRIVATE UTILITY FUNCTIONS BEGIN HERE ***********************/
70
get_my_pipeline_config(struct ndb_engine * eng)71 ndb_pipeline * get_my_pipeline_config(struct ndb_engine *eng) {
72 const thread_identifier * thread_id;
73
74 /* Try to fetch the pipeline from the thread identity */
75 thread_id = get_thread_id();
76 if(thread_id) {
77 return thread_id->pipeline;
78 }
79 else {
80 /* On the first call from each thread, initialize the pipeline */
81 return ndb_pipeline_initialize(eng);
82 }
83 }
84
85
86 /*********** FUNCTIONS IMPLEMENTING THE PUBLISHED API BEGIN HERE ********/
87
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)88 ENGINE_ERROR_CODE create_instance(uint64_t interface,
89 GET_SERVER_API get_server_api,
90 ENGINE_HANDLE **handle ) {
91
92 struct ndb_engine *ndb_eng;
93 const char * env_connectstring;
94 ENGINE_ERROR_CODE return_status;
95
96 SERVER_HANDLE_V1 *api = get_server_api();
97 if (interface != 1 || api == NULL) {
98 return ENGINE_ENOTSUP;
99 }
100
101 ndb_eng = malloc(sizeof(struct ndb_engine));
102 if(ndb_eng == NULL) {
103 return ENGINE_ENOMEM;
104 }
105
106 logger = api->extension->get_extension(EXTENSION_LOGGER);
107
108 ndb_eng->npipelines = 0;
109 ndb_eng->connected = false;
110
111 ndb_eng->engine.interface.interface = 1;
112 ndb_eng->engine.get_info = ndb_get_info;
113 ndb_eng->engine.initialize = ndb_initialize;
114 ndb_eng->engine.destroy = ndb_destroy;
115 ndb_eng->engine.allocate = ndb_allocate;
116 ndb_eng->engine.remove = ndb_remove;
117 ndb_eng->engine.release = ndb_release;
118 ndb_eng->engine.get = ndb_get;
119 ndb_eng->engine.get_stats = ndb_get_stats;
120 ndb_eng->engine.reset_stats = ndb_reset_stats;
121 ndb_eng->engine.store = ndb_store;
122 ndb_eng->engine.arithmetic = ndb_arithmetic;
123 ndb_eng->engine.flush = ndb_flush;
124 ndb_eng->engine.unknown_command = ndb_unknown_command;
125 ndb_eng->engine.item_set_cas = item_set_cas; /* reused */
126 ndb_eng->engine.get_item_info = ndb_get_item_info;
127 ndb_eng->engine.get_stats_struct = NULL;
128 ndb_eng->engine.aggregate_stats = NULL;
129 ndb_eng->engine.tap_notify = NULL;
130 ndb_eng->engine.get_tap_iterator = NULL;
131 ndb_eng->engine.errinfo = NULL;
132
133 ndb_eng->server = *api;
134 ndb_eng->get_server_api = get_server_api;
135
136 /* configuration, with default values*/
137 ndb_eng->startup_options.connectstring = "localhost:1186";
138 ndb_eng->startup_options.server_role = "default_role";
139 ndb_eng->startup_options.scheduler = 0;
140 ndb_eng->startup_options.debug_enable = false;
141 ndb_eng->startup_options.debug_detail = false;
142 ndb_eng->startup_options.reconf_enable = true;
143
144 /* Now let NDB_CONNECTSRING environment variable override the default */
145 env_connectstring = getenv("NDB_CONNECTSTRING");
146 if(env_connectstring)
147 ndb_eng->startup_options.connectstring = env_connectstring;
148
149 /* Set engine informational structure */
150 ndb_eng->info.info.description = "NDB Memcache " VERSION;
151 ndb_eng->info.info.num_features = 3;
152 ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS;
153 ndb_eng->info.info.features[0].description = NULL;
154 ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE;
155 ndb_eng->info.info.features[1].description = NULL;
156 ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU;
157 ndb_eng->info.info.features[2].description = NULL;
158
159 /* Now call create_instace() for the default engine */
160 return_status = default_engine_create_instance(interface, get_server_api,
161 & (ndb_eng->m_default_engine));
162
163 if(return_status == ENGINE_SUCCESS)
164 *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
165
166 return return_status;
167 }
168
169
170 /*** get_info ***/
ndb_get_info(ENGINE_HANDLE * handle)171 static const engine_info* ndb_get_info(ENGINE_HANDLE* handle)
172 {
173 return & ndb_handle(handle)->info.info;
174 }
175
176
177 /*** initialize ***/
ndb_initialize(ENGINE_HANDLE * handle,const char * config_str)178 static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle,
179 const char* config_str)
180 {
181 int i, nthreads, debug_level;
182 time_point_t pump_time = 0;
183 ENGINE_ERROR_CODE return_status;
184 struct ndb_engine *ndb_eng = ndb_handle(handle);
185 struct default_engine *def_eng = default_handle(ndb_eng);
186 scheduler_options sched_opts;
187
188 /* Process options for both the ndb engine and the default engine: */
189 read_cmdline_options(ndb_eng, def_eng, config_str);
190
191 /* Initalize the debug library */
192 if(ndb_eng->startup_options.debug_detail)
193 debug_level = 2;
194 else if(ndb_eng->startup_options.debug_enable)
195 debug_level = 1;
196 else debug_level = 0;
197 DEBUG_INIT(NULL, debug_level);
198 DEBUG_ENTER();
199
200 /* Connect to the Primary cluster */
201 if(!(connect_to_primary_cluster(ndb_eng->startup_options.connectstring,
202 ndb_eng->startup_options.server_role))) {
203 logger->log(LOG_WARNING, 0, "Could not connect to NDB. Shutting down.\n");
204 return ENGINE_FAILED;
205 }
206 ndb_eng->connected = true;
207
208 /* Read configuration */
209 if(!(get_config())) {
210 logger->log(LOG_WARNING, 0, "Failed to read configuration -- shutting down.\n"
211 "(Did you run ndb_memcache_metadata.sql?)\n");
212 return ENGINE_FAILED;
213 }
214
215 /* Connect to additional clusters */
216 if(! open_connections_to_all_clusters()) {
217 logger->log(LOG_WARNING, 0, "open_connections_to_all_clusters() failed \n");
218 return ENGINE_FAILED;
219 }
220
221 /* Initialize Thread-Specific Storage */
222 initialize_thread_id_key();
223
224 /* Fetch some settings from the memcached core */
225 fetch_core_settings(ndb_eng, def_eng);
226 nthreads = ndb_eng->server_options.nthreads;
227
228 /* Initialize the error handler */
229 ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
230
231 logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
232 logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
233 timing_point(& pump_time);
234
235 /* prefetch data dictionary objects */
236 prefetch_dictionary_objects();
237
238 /* Build the scheduler options structure */
239 sched_opts.nthreads = ndb_eng->server_options.nthreads;
240 sched_opts.max_clients = ndb_eng->server_options.maxconns;
241
242 /* Allocate and initailize the pipelines, and their schedulers.
243 This will take some time; each pipeline creates slab and pool allocators,
244 and each scheduler may preallocate a large number of Ndb objects and
245 transactions, requiring multiple round trips to the data nodes. We
246 do this now to avoid the latency cost of setting up those objects at
247 runtime.
248 When the first request comes in, the pipeline, scheduler, and thread
249 will all get stitched together.
250 */
251 ndb_eng->pipelines = malloc(nthreads * sizeof(void *));
252 for(i = 0 ; i < nthreads ; i++) {
253 ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng);
254 if(! scheduler_initialize(ndb_eng->pipelines[i], & sched_opts)) {
255 logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
256 ndb_eng->startup_options.scheduler);
257 abort();
258 }
259 }
260
261 logger->log(LOG_WARNING, NULL, "done [%5.3f sec].\n",
262 (double) (timing_point(& pump_time) / (double) 1000000000));
263
264 /* Now initialize the default engine with no options (it already has them) */
265 return_status = def_eng->engine.initialize(ndb_eng->m_default_engine, "");
266
267 if(return_status == ENGINE_SUCCESS) {
268 set_initial_cas_ids(& ndb_eng->cas_hi, & ndb_eng->cas_lo);
269 }
270
271 print_debug_startup_info();
272
273 /* Listen for reconfiguration signals */
274 if(ndb_eng->startup_options.reconf_enable) {
275 start_reconfig_listener(ndb_eng->pipelines[0]);
276 }
277
278 return return_status;
279 }
280
281
282 /*** destroy ***/
/* Engine shutdown: stop every pipeline's scheduler, free the pipelines,
   drop all cluster connections, then destroy the wrapped cache engine. */
static void ndb_destroy(ENGINE_HANDLE* handle, bool force)
{
  struct ndb_engine *eng;
  struct default_engine *cache;
  atomic_int32_t i;
  DEBUG_ENTER();

  eng   = ndb_handle(handle);
  cache = default_handle(eng);

  // TODO: Shutdown just the Scheduler Global
  for(i = 0 ; i < eng->npipelines ; i++) {
    ndb_pipeline *p = eng->pipelines[i];
    if(p == NULL) continue;
    scheduler_shutdown(p);
    ndb_pipeline_free(p);
  }

  disconnect_all();
  cache->engine.destroy(eng->m_default_engine, force);
}
304
305
/* CALL FLOWS
   ----------
   GET:       eng.get(), eng.get_item_info()*, eng.release()*
   DELETE:    eng.remove()
   SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(),
              eng.store(), eng.release()*
   INCR:      eng.arithmetic()
   FLUSH:     eng.flush()

   * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK)
*/
317
318
319
320 /*** Release scheduler resources and free workitem ****/
/* Hand the workitem's scheduler resources back, then free the item itself. */
void release_and_free(workitem *wqitem) {
  DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
  scheduler_release(wqitem->pipeline, wqitem);
  workitem_free(wqitem);
}
326
327
328 /*** allocate ***/
329
/* Allocate gets a struct item from the slab allocator, and fills in
   everything but the value.  It seems like we can just pass this on to
   the default engine; we'll intercept it later in store().
   This is also called directly from finalize_read() in the commit thread.
*/
ndb_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)335 static ENGINE_ERROR_CODE ndb_allocate(ENGINE_HANDLE* handle,
336 const void* cookie,
337 item **item,
338 const void* key,
339 const size_t nkey,
340 const size_t nbytes,
341 const int flags,
342 const rel_time_t exptime)
343 {
344 struct ndb_engine* ndb_eng;
345 struct default_engine *def_eng;
346 DEBUG_ENTER_DETAIL();
347
348 ndb_eng = ndb_handle(handle);
349 def_eng = default_handle(ndb_eng);
350
351 return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item,
352 key, nkey, nbytes, flags, exptime);
353 }
354
355
356 /*** remove ***/
ndb_remove(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)357 static ENGINE_ERROR_CODE ndb_remove(ENGINE_HANDLE* handle,
358 const void* cookie,
359 const void* key,
360 const size_t nkey,
361 uint64_t cas,
362 uint16_t vbucket __attribute__((unused)))
363 {
364 struct ndb_engine* ndb_eng = ndb_handle(handle);
365 struct default_engine *def_eng = default_handle(ndb_eng);
366 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
367 ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
368 prefix_info_t prefix;
369 workitem *wqitem;
370
371 /* Is this a callback after completed I/O? */
372 wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
373 if(wqitem) {
374 DEBUG_PRINT_DETAIL("Got callback: %s", wqitem->status->comment);
375 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop
376 release_and_free(wqitem);
377 return wqitem->status->status;
378 }
379
380 prefix = get_prefix_info_for_key(nkey, key);
381 DEBUG_PRINT_DETAIL("prefix: %d", prefix.prefix_id);
382
383 /* DELETE.
384 You should attempt the cache delete, regardless of whether the database
385 delete succeeds or fails. So, we simply perform the cache delete first,
386 here, and then queue the database delete.
387 */
388
389 if(prefix.do_mc_delete) { /* Cache Delete */
390 hash_item *it = item_get(def_eng, key, nkey);
391 return_status = ENGINE_KEY_ENOENT;
392 if (it != NULL) {
393 // ACTUALLY NO???
394 /* In the binary protocol there is such a thing as a CAS delete.
395 This is the CAS check. If we will also be deleting from the database,
396 there are two possibilities:
397 1: The CAS matches; perform the delete.
398 2: The CAS doesn't match; delete the item because it's stale.
399 Therefore we skip the check altogether if(do_db_delete).
400 */
401 if(! prefix.do_db_delete)
402 if(cas && (cas != item_get_cas(it)))
403 return ENGINE_KEY_EEXISTS;
404
405 item_unlink(def_eng, it);
406 item_release(def_eng, it);
407 return_status = ENGINE_SUCCESS;
408 }
409 }
410
411 if(prefix.do_db_delete) { /* Database Delete */
412 wqitem = new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, key, & cas);
413 DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
414 return_status = scheduler_schedule(pipeline, wqitem);
415 if(return_status != ENGINE_EWOULDBLOCK) {
416 release_and_free(wqitem);
417 }
418 }
419
420 return return_status;
421 }
422
423
424 /*** release ***/
/* Release handler.  Pops and frees any workitem pending on the cookie;
   if "item" is a cache-engine hash item (and not the workitem itself),
   releases it back to the cache engine. */
static void ndb_release(ENGINE_HANDLE* handle, const void *cookie,
                        item* item)
{
  struct ndb_engine* ndb_eng = ndb_handle(handle);
  struct default_engine *def_eng = default_handle(ndb_eng);

  workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);

  /* BUG FIX: decide whether "item" is a hash item BEFORE the workitem is
     freed; the previous code compared "item" against the already-freed
     wqitem pointer (use of an indeterminate pointer value). */
  bool item_is_hash_item = (item && ((void *) item != (void *) wqitem));

  if(wqitem) {
    ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
    release_and_free(wqitem);
  }

  if(item_is_hash_item) {
    DEBUG_PRINT_DETAIL("Releasing a hash item.");
    item_release(def_eng, (hash_item *) item);
  }
}
442
443
444 /*** get ***/
ndb_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)445 static ENGINE_ERROR_CODE ndb_get(ENGINE_HANDLE* handle,
446 const void* cookie,
447 item** item,
448 const void* key,
449 const int nkey,
450 uint16_t vbucket __attribute__((unused)))
451 {
452 struct ndb_engine* ndb_eng = ndb_handle(handle);
453 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
454 struct workitem *wqitem;
455 prefix_info_t prefix;
456 ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
457
458 wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
459
460 /* Is this a callback after completed I/O? */
461 if(wqitem && ! wqitem->base.complete) {
462 DEBUG_PRINT_DETAIL("Got read callback on workitem %d.%d: %s",
463 wqitem->pipeline->id, wqitem->id, wqitem->status->comment);
464 *item = wqitem->cache_item;
465 wqitem->base.complete = 1;
466 return_status = wqitem->status->status;
467
468 /* On success the workitem will be read in ndb_get_item_info, then released.
469 Otherwise: */
470 if(return_status != ENGINE_SUCCESS) {
471 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
472 release_and_free(wqitem);
473 }
474
475 return return_status;
476 }
477
478 prefix = get_prefix_info_for_key(nkey, key);
479
480 /* Cache read */
481 /* FIXME: Use the public APIs */
482 if(prefix.do_mc_read) {
483 *item = item_get(default_handle(ndb_eng), key, nkey);
484 if (*item != NULL) {
485 DEBUG_PRINT(" cache hit");
486 return ENGINE_SUCCESS;
487 }
488 DEBUG_PRINT(" cache miss");
489 }
490
491 /* Build and send the NDB transaction */
492 if(prefix.do_db_read) {
493 wqitem = new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, key);
494 DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
495 return_status = scheduler_schedule(pipeline, wqitem);
496 if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
497 /* On error we must pop and free */
498 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
499 release_and_free(wqitem);
500 }
501 }
502
503 return return_status;
504 }
505
506
507 /*** get_stats ***/
ndb_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)508 static ENGINE_ERROR_CODE ndb_get_stats(ENGINE_HANDLE* handle,
509 const void *cookie,
510 const char *stat_key,
511 int nkey,
512 ADD_STAT add_stat)
513 {
514 struct ndb_engine* ndb_eng = ndb_handle(handle);
515 struct default_engine *def_eng = default_handle(ndb_eng);
516 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
517
518 DEBUG_ENTER_DETAIL();
519
520 if(stat_key) {
521 if(strncasecmp(stat_key, "menu", 4) == 0)
522 return stats_menu(add_stat, cookie);
523
524 if((strncasecmp(stat_key, "ndb", 3) == 0) ||
525 (strncasecmp(stat_key, "scheduler", 9) == 0) ||
526 (strncasecmp(stat_key, "reconf", 6) == 0) ||
527 (strncasecmp(stat_key, "errors", 6) == 0))
528 {
529 /* NDB Engine stats */
530 pipeline_add_stats(pipeline, stat_key, add_stat, cookie);
531 return ENGINE_SUCCESS;
532 }
533 }
534
535 /* Default engine stats */
536 return def_eng->engine.get_stats(ndb_eng->m_default_engine, cookie,
537 stat_key, nkey, add_stat);
538 }
539
540
541 /*** reset_stats ***/
/* "stats reset" handler: only the cache engine keeps resettable counters,
   so the call is forwarded there. */
static void ndb_reset_stats(ENGINE_HANDLE* handle,
                            const void *cookie)
{
  struct ndb_engine *eng = ndb_handle(handle);
  struct default_engine *cache = default_handle(eng);
  cache->engine.reset_stats(eng->m_default_engine, cookie);
}
551
552
553 /*** store ***/
554
ndb_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION op,uint16_t vbucket)555 static ENGINE_ERROR_CODE ndb_store(ENGINE_HANDLE* handle,
556 const void *cookie,
557 item* item,
558 uint64_t *cas,
559 ENGINE_STORE_OPERATION op,
560 uint16_t vbucket __attribute__((unused)))
561 {
562 struct ndb_engine* ndb_eng = ndb_handle(handle);
563 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
564 ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED;
565 prefix_info_t prefix;
566
567 /* Is this a callback after completed I/O? */
568 workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
569 if(wqitem) {
570 DEBUG_PRINT_DETAIL("Got callback on workitem %d.%d: %s",
571 pipeline->id, wqitem->id, wqitem->status->comment);
572 return wqitem->status->status;
573 }
574
575 prefix = get_prefix_info_for_key(hash_item_get_key_len(item),
576 hash_item_get_key(item));
577
578
579 /* Build and send the NDB transaction.
580 If there is also a cache operation, it must be deferred until we know
581 whether the database operation has succeeded or failed.
582 */
583 if(prefix.do_db_write) {
584 wqitem = new_workitem_for_store_op(pipeline, op, prefix, cookie, item, cas);
585 DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d -- creating workitem %d.%d",
586 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
587 prefix.do_mc_write, prefix.do_db_write,
588 pipeline->id, wqitem->id);
589 return_status = scheduler_schedule(pipeline, wqitem);
590 if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
591 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
592 release_and_free(wqitem);
593 }
594 }
595 else if(prefix.do_mc_write) {
596 DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d -- cache-only store.",
597 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
598 prefix.do_mc_write, prefix.do_db_write);
599 return_status = store_item(default_handle(ndb_eng), item, cas, op, cookie);
600 }
601
602 return return_status;
603 }
604
605
606 /*** arithmetic ***/
ndb_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)607 static ENGINE_ERROR_CODE ndb_arithmetic(ENGINE_HANDLE* handle,
608 const void* cookie,
609 const void* key,
610 const int nkey,
611 const bool increment,
612 const bool create,
613 const uint64_t delta,
614 const uint64_t initial,
615 const rel_time_t exptime,
616 uint64_t *cas,
617 uint64_t *result,
618 uint16_t vbucket)
619 {
620 struct ndb_engine* ndb_eng = ndb_handle(handle);
621 struct default_engine *def_eng = default_handle(ndb_eng);
622 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
623 struct workitem *wqitem;
624 prefix_info_t prefix;
625 ENGINE_ERROR_CODE return_status;
626
627 /* Is this a callback after completed I/O? */
628 wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
629 if(wqitem && ! wqitem->base.complete) {
630 DEBUG_PRINT_DETAIL("Got arithmetic callback: %s", wqitem->status->comment);
631 return_status = wqitem->status->status;
632 wqitem->base.complete = 1;
633 *result = wqitem->math_value;
634 /* There will be no call to release(), so pop and free now. */
635 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
636 release_and_free(wqitem);
637 return return_status;
638 }
639
640 prefix = get_prefix_info_for_key(nkey, key);
641 DEBUG_PRINT("prefix: %d delta: %d create: %d initial: %d ",
642 prefix.prefix_id, (int) delta, (int) create, (int) initial);
643
644 /* For cache-only prefixes, forward this to the default engine */
645 if(! prefix.use_ndb) {
646 return def_eng->engine.arithmetic(ndb_eng->m_default_engine, cookie,
647 key, nkey, increment, create,
648 delta, initial, exptime, cas,
649 result, vbucket);
650 }
651
652 /* A math operation contains both a read and a write. */
653 if(! (prefix.has_math_col && prefix.do_db_read && prefix.do_db_write)) {
654 logger->log(LOG_WARNING, 0, "NDB INCR/DECR is not allowed for this key.\n");
655 DEBUG_PRINT("REJECTED : %d %d %d", (int) prefix.has_math_col,
656 (int) prefix.do_db_read , (int) prefix.do_db_write);
657 return ENGINE_NOT_STORED;
658 }
659
660 wqitem = new_workitem_for_arithmetic(pipeline, prefix, cookie, key, nkey,
661 increment, create, delta, initial, cas);
662 DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
663
664 return_status = scheduler_schedule(pipeline, wqitem);
665
666 if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS)))
667 release_and_free(wqitem);
668
669 return return_status;
670 }
671
672 /*** flush ***/
ndb_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)673 static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
674 const void* cookie, time_t when)
675 {
676 /*
677 Notes on flush:
678 The server will call *only* into ndb_flush (not to allocate or release).
679 The NDB engine ignores the "when" parameter.
680 Flush operations have special handling, outside of the scheduler.
681 They are performed synchronously.
682 And we always send the flush command to the cache engine.
683 */
684 struct ndb_engine* ndb_eng;
685 struct default_engine *def_eng;
686 ndb_pipeline *pipeline;
687 DEBUG_ENTER();
688
689 ndb_eng = ndb_handle(handle);
690 def_eng = default_handle(ndb_eng);
691 pipeline = get_my_pipeline_config(ndb_eng);
692
693 (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
694 // TODO: Why not return ENGINE_EWOULDBLOCK first?
695 return pipeline_flush_all(pipeline);
696 }
697
698
699 /*** unknown_command ***/
ndb_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)700 static ENGINE_ERROR_CODE ndb_unknown_command(ENGINE_HANDLE* handle,
701 const void* cookie,
702 protocol_binary_request_header *request,
703 ADD_RESPONSE response)
704 {
705 struct ndb_engine* ndb_eng;
706 struct default_engine *def_eng;
707 DEBUG_ENTER();
708
709 ndb_eng = ndb_handle(handle);
710 def_eng = default_handle(ndb_eng);
711
712 return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie,
713 request, response);
714 }
715
716
717 /*** get_item_info ***/
/* Fill in item_info for the server.  With no workitem on the cookie this
   is a cache-only item and is delegated to the cache engine; otherwise
   the data comes either from the workitem's own value buffer or from the
   hash item it references. */
static bool ndb_get_item_info(ENGINE_HANDLE *handle,
                              const void *cookie,
                              const item* item,
                              item_info *item_info)
{
  struct ndb_engine *eng = ndb_handle(handle);
  struct default_engine *cache = default_handle(eng);
  workitem *wq = eng->server.cookie->get_engine_specific(cookie);

  if(wq == NULL) {
    DEBUG_PRINT_DETAIL(" cache-only");
    return cache->engine.get_item_info(handle, cookie, item, item_info);
  }

  if (item_info->nvalue < 1) {
    DEBUG_PRINT_DETAIL("nvalue too small.");
    return false;
  }

  if(wq->base.has_value) {
    /* Use the workitem. */
    item_info->cas     = wq->cas ? *(wq->cas) : 0;
    item_info->exptime = 0;
    item_info->nbytes  = wq->value_size;
    item_info->flags   = wq->math_flags;
    item_info->clsid   = slabs_clsid(default_handle(eng), wq->value_size);
    item_info->nkey    = wq->base.nkey;
    item_info->nvalue  = 1;    /* how many iovecs */
    item_info->key     = wq->key;
    item_info->value[0].iov_base = wq->value_ptr;
    item_info->value[0].iov_len  = wq->value_size;
    DEBUG_PRINT_DETAIL("workitem %d.%d [%s].", wq->pipeline->id, wq->id,
                       workitem_get_operation(wq));
  }
  else {
    /* Use a hash item */
    hash_item *it = (hash_item*) item;
    item_info->cas     = hash_item_get_cas(it);
    item_info->exptime = it->exptime;
    item_info->nbytes  = wq ? wq->value_size : 0;
    item_info->flags   = it->flags;
    item_info->clsid   = it->slabs_clsid;
    item_info->nkey    = it->nkey;
    item_info->nvalue  = 1;
    item_info->key     = hash_item_get_key(it);
    item_info->value[0].iov_base = hash_item_get_data(it);
    item_info->value[0].iov_len  = item_info->nbytes;
    if(item_info->nbytes) {
      DEBUG_PRINT_DETAIL("hash_item [KEY: %.*s][CAS: %llu][nbytes: %d].", it->nkey,
                         hash_item_get_key(it), item_info->cas, item_info->nbytes);
    }
    else {
      DEBUG_PRINT_DETAIL(" new hash_item");
    }
  }
  return true;
}
777
778
779 /* read_cmdline_options requires duplicating code from the default engine.
780 If the default engine supports a new option, you will need to add it here.
781 We create a single config_items structure containing options for both
782 engines.
783 */
read_cmdline_options(struct ndb_engine * ndb,struct default_engine * se,const char * conf)784 void read_cmdline_options(struct ndb_engine *ndb, struct default_engine *se,
785 const char * conf)
786 {
787 int did_parse;
788 DEBUG_ENTER();
789
790 did_parse = 0; /* 0 = success from parse_config() */
791
792 if (conf != NULL) {
793 struct config_item items[] = {
794 /* NDB OPTIONS */
795 { .key = "connectstring",
796 .datatype = DT_STRING,
797 .value.dt_string = (char **) &(ndb->startup_options.connectstring) },
798 { .key = "role",
799 .datatype = DT_STRING,
800 .value.dt_string = (char **) &(ndb->startup_options.server_role) },
801 { .key = "scheduler",
802 .datatype = DT_STRING,
803 .value.dt_string = (char **) &(ndb->startup_options.scheduler) },
804 #ifdef DEBUG_OUTPUT
805 { .key = "debug",
806 .datatype = DT_BOOL,
807 .value.dt_bool = &(ndb->startup_options.debug_enable) },
808 { .key = "detail",
809 .datatype = DT_BOOL,
810 .value.dt_bool = &(ndb->startup_options.debug_detail) },
811 #endif
812 { .key = "reconf",
813 .datatype = DT_BOOL,
814 .value.dt_bool = &(ndb->startup_options.reconf_enable) },
815
816 /* DEFAULT ENGINE OPTIONS */
817 { .key = "use_cas",
818 .datatype = DT_BOOL,
819 .value.dt_bool = &se->config.use_cas },
820 { .key = "verbose",
821 .datatype = DT_SIZE,
822 .value.dt_size = &se->config.verbose },
823 { .key = "eviction",
824 .datatype = DT_BOOL,
825 .value.dt_bool = &se->config.evict_to_free },
826 { .key = "cache_size",
827 .datatype = DT_SIZE,
828 .value.dt_size = &se->config.maxbytes },
829 { .key = "preallocate",
830 .datatype = DT_BOOL,
831 .value.dt_bool = &se->config.preallocate },
832 { .key = "factor",
833 .datatype = DT_FLOAT,
834 .value.dt_float = &se->config.factor },
835 { .key = "chunk_size",
836 .datatype = DT_SIZE,
837 .value.dt_size = &se->config.chunk_size },
838 { .key = "item_size_max",
839 .datatype = DT_SIZE,
840 .value.dt_size = &se->config.item_size_max },
841 { .key = "config_file",
842 .datatype = DT_CONFIGFILE },
843 { .key = NULL}
844 };
845
846 did_parse = se->server.core->parse_config(conf, items, stderr);
847 }
848 switch(did_parse) {
849 case -1:
850 logger->log(LOG_WARNING, NULL,
851 "Unknown tokens in config string \"%s\"\n", conf);
852 break;
853 case 1:
854 logger->log(LOG_WARNING, NULL,
855 "Illegal values in config string: \"%s\"\n", conf);
856 break;
857 case 0: /* success */
858 break;
859 }
860
861 global_max_item_size = se->config.item_size_max;
862 }
863
864
fetch_core_settings(struct ndb_engine * engine,struct default_engine * se)865 int fetch_core_settings(struct ndb_engine *engine,
866 struct default_engine *se) {
867
868 /* Set up a struct config_item containing the keys we're interested in. */
869 struct config_item items[] = {
870 { .key = "cas_enabled",
871 .datatype = DT_BOOL,
872 .value.dt_bool = &engine->server_options.cas_enabled },
873 { .key = "maxconns",
874 .datatype = DT_SIZE,
875 .value.dt_size = &engine->server_options.maxconns },
876 { .key = "num_threads",
877 .datatype = DT_SIZE,
878 .value.dt_size = &engine->server_options.nthreads },
879 { .key = "verbosity",
880 .datatype = DT_SIZE,
881 .value.dt_size = &engine->server_options.verbose },
882 { .key = NULL }
883 };
884
885 DEBUG_ENTER();
886
887 /* This will call "stats settings" and parse the output into the config */
888 return se->server.core->get_config(items);
889 }
890
891
stats_menu(ADD_STAT add_stat,const void * cookie)892 ENGINE_ERROR_CODE stats_menu(ADD_STAT add_stat, const void *cookie) {
893 char key[128];
894 char val[128];
895 int klen, vlen;
896
897 klen = sprintf(key, "ndb");
898 vlen = sprintf(val, " NDB Engine: NDBAPI statistics");
899 add_stat(key, klen, val, vlen, cookie);
900
901 klen = sprintf(key, "errors");
902 vlen = sprintf(val, " NDB Engine: Error message counters");
903 add_stat(key, klen, val, vlen, cookie);
904
905 klen = sprintf(key, "scheduler");
906 vlen = sprintf(val, " NDB Engine: Scheduler internal statistics");
907 add_stat(key, klen, val, vlen, cookie);
908
909 klen = sprintf(key, "reconf");
910 vlen = sprintf(val, " NDB Engine: Current configuration version");
911 add_stat(key, klen, val, vlen, cookie);
912
913 klen = sprintf(key, "settings");
914 vlen = sprintf(val, " Server core: configurable settings");
915 add_stat(key, klen, val, vlen, cookie);
916
917 klen = sprintf(key, "reset");
918 vlen = sprintf(val, " Server core: reset counters");
919 add_stat(key, klen, val, vlen, cookie);
920
921 klen = sprintf(key, "detail");
922 vlen = sprintf(val, " Server core: use stats detail on|off|dump");
923 add_stat(key, klen, val, vlen, cookie);
924
925 klen = sprintf(key, "aggregate");
926 vlen = sprintf(val, " Server core: aggregated");
927 add_stat(key, klen, val, vlen, cookie);
928
929 klen = sprintf(key, "slabs");
930 vlen = sprintf(val, " Cache Engine: allocator");
931 add_stat(key, klen, val, vlen, cookie);
932
933 klen = sprintf(key, "items");
934 vlen = sprintf(val, " Cache Engine: itemes cached");
935 add_stat(key, klen, val, vlen, cookie);
936
937 klen = sprintf(key, "sizes");
938 vlen = sprintf(val, " Cache Engine: items per allocation class");
939 add_stat(key, klen, val, vlen, cookie);
940
941 klen = sprintf(key, "vbucket");
942 vlen = sprintf(val, " Cache Engine: dump vbucket table");
943 add_stat(key, klen, val, vlen, cookie);
944
945 klen = sprintf(key, "scrub");
946 vlen = sprintf(val, " Cache Engine: scrubber status");
947 add_stat(key, klen, val, vlen, cookie);
948
949 return ENGINE_SUCCESS;
950 }
951
952