1 /*
2 Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24 #include "my_config.h"
25 #include <stdlib.h>
26 #include <string.h>
27 #include <assert.h>
28 #include <pthread.h>
29
30 #include "config.h"
31
32 #include "default_engine.h"
33
34 #include <memcached/util.h>
35 #include <memcached/config_parser.h>
36 #include <memcached/extension.h>
37 #include <memcached/extension_loggers.h>
38
39 #include "ndb_engine.h"
40 #include "ndb_configuration.h"
41 #include "workitem.h"
42 #include "ndb_engine_private.h"
43 #include "debug.h"
44 #include "Scheduler.h"
45 #include "thread_identifier.h"
46 #include "timing.h"
47 #include "ndb_error_logger.h"
48
/* Global variables */

/* Logger used throughout this file.  It is assigned in create_instance()
   from the server's EXTENSION_LOGGER extension. */
EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Lookup tables */

/* Printable names for store operations; ndb_store() indexes this array with
   its ENGINE_STORE_OPERATION argument when writing debug output.
   NOTE(review): this array is not declared static, so it has external
   linkage -- confirm whether other files refer to it. */
const char * set_ops[] = { "","add","set","replace","append","prepend","cas" };
54
55
ndb_handle(ENGINE_HANDLE * handle)56 static inline struct ndb_engine* ndb_handle(ENGINE_HANDLE* handle)
57 {
58 return (struct ndb_engine*) handle;
59 }
60
default_handle(struct ndb_engine * eng)61 static inline struct default_engine* default_handle(struct ndb_engine *eng)
62 {
63 return (struct default_engine*) eng->m_default_engine;
64 }
65
66
67 /*********** PRIVATE UTILITY FUNCTIONS BEGIN HERE ***********************/
68
get_my_pipeline_config(struct ndb_engine * eng)69 ndb_pipeline * get_my_pipeline_config(struct ndb_engine *eng) {
70 const thread_identifier * thread_id;
71
72 /* Try to fetch the pipeline from the thread identity */
73 thread_id = get_thread_id();
74 if(thread_id) {
75 return thread_id->pipeline;
76 }
77 else {
78 /* On the first call from each thread, initialize the pipeline */
79 return ndb_pipeline_initialize(eng);
80 }
81 }
82
83
84 /*********** FUNCTIONS IMPLEMENTING THE PUBLISHED API BEGIN HERE ********/
85
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)86 ENGINE_ERROR_CODE create_instance(uint64_t interface,
87 GET_SERVER_API get_server_api,
88 ENGINE_HANDLE **handle ) {
89
90 struct ndb_engine *ndb_eng;
91 const char * env_connectstring;
92 ENGINE_ERROR_CODE return_status;
93
94 SERVER_HANDLE_V1 *api = get_server_api();
95 if (interface != 1 || api == NULL) {
96 return ENGINE_ENOTSUP;
97 }
98
99 ndb_eng = (ndb_engine *)malloc(sizeof(struct ndb_engine));
100 if(ndb_eng == NULL) {
101 return ENGINE_ENOMEM;
102 }
103
104 logger = (EXTENSION_LOGGER_DESCRIPTOR *)api->extension->get_extension(EXTENSION_LOGGER);
105
106 ndb_eng->npipelines = 0;
107 ndb_eng->connected = false;
108
109 ndb_eng->engine.interface.interface = 1;
110 ndb_eng->engine.get_info = ndb_get_info;
111 ndb_eng->engine.initialize = ndb_initialize;
112 ndb_eng->engine.destroy = ndb_destroy;
113 ndb_eng->engine.allocate = ndb_allocate;
114 ndb_eng->engine.remove = ndb_remove;
115 ndb_eng->engine.release = ndb_release;
116 ndb_eng->engine.get = ndb_get;
117 ndb_eng->engine.get_stats = ndb_get_stats;
118 ndb_eng->engine.reset_stats = ndb_reset_stats;
119 ndb_eng->engine.store = ndb_store;
120 ndb_eng->engine.arithmetic = ndb_arithmetic;
121 ndb_eng->engine.flush = ndb_flush;
122 ndb_eng->engine.unknown_command = ndb_unknown_command;
123 ndb_eng->engine.item_set_cas = item_set_cas; /* reused */
124 ndb_eng->engine.get_item_info = ndb_get_item_info;
125 ndb_eng->engine.get_stats_struct = NULL;
126 ndb_eng->engine.aggregate_stats = NULL;
127 ndb_eng->engine.tap_notify = NULL;
128 ndb_eng->engine.get_tap_iterator = NULL;
129 ndb_eng->engine.errinfo = NULL;
130
131 ndb_eng->server = *api;
132 ndb_eng->get_server_api = get_server_api;
133
134 /* configuration, with default values*/
135 ndb_eng->startup_options.connectstring = "localhost:1186";
136 ndb_eng->startup_options.server_role = "default_role";
137 ndb_eng->startup_options.scheduler = 0;
138 ndb_eng->startup_options.debug_enable = false;
139 ndb_eng->startup_options.debug_detail = false;
140 ndb_eng->startup_options.reconf_enable = true;
141
142 /* Now let NDB_CONNECTSRING environment variable override the default */
143 env_connectstring = getenv("NDB_CONNECTSTRING");
144 if(env_connectstring)
145 ndb_eng->startup_options.connectstring = env_connectstring;
146
147 /* Set engine informational structure */
148 ndb_eng->info.info.description = "NDB Memcache " VERSION;
149 ndb_eng->info.info.num_features = 3;
150 ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS;
151 ndb_eng->info.info.features[0].description = NULL;
152 ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE;
153 ndb_eng->info.info.features[1].description = NULL;
154 ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU;
155 ndb_eng->info.info.features[2].description = NULL;
156
157 /* Now call create_instace() for the default engine */
158 return_status = default_engine_create_instance(interface, get_server_api,
159 & (ndb_eng->m_default_engine));
160
161 if(return_status == ENGINE_SUCCESS)
162 *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
163
164 return return_status;
165 }
166
167
168 /*** get_info ***/
ndb_get_info(ENGINE_HANDLE * handle)169 static const engine_info* ndb_get_info(ENGINE_HANDLE* handle)
170 {
171 return & ndb_handle(handle)->info.info;
172 }
173
174
175 /*** initialize ***/
ndb_initialize(ENGINE_HANDLE * handle,const char * config_str)176 static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle,
177 const char* config_str)
178 {
179 int i, nthreads, debug_level;
180 time_point_t pump_time = 0;
181 ENGINE_ERROR_CODE return_status;
182 struct ndb_engine *ndb_eng = ndb_handle(handle);
183 struct default_engine *def_eng = default_handle(ndb_eng);
184 scheduler_options sched_opts;
185
186 /* Process options for both the ndb engine and the default engine: */
187 read_cmdline_options(ndb_eng, def_eng, config_str);
188
189 /* Initalize the debug library */
190 if(ndb_eng->startup_options.debug_detail)
191 debug_level = 2;
192 else if(ndb_eng->startup_options.debug_enable)
193 debug_level = 1;
194 else debug_level = 0;
195 DEBUG_INIT(NULL, debug_level);
196 DEBUG_ENTER();
197
198 /* Connect to the Primary cluster */
199 if(!(connect_to_primary_cluster(ndb_eng->startup_options.connectstring,
200 ndb_eng->startup_options.server_role))) {
201 logger->log(LOG_WARNING, 0, "Could not connect to NDB. Shutting down.\n");
202 return ENGINE_FAILED;
203 }
204 ndb_eng->connected = true;
205
206 /* Read configuration */
207 if(!(get_config())) {
208 logger->log(LOG_WARNING, 0, "Failed to read configuration -- shutting down.\n"
209 "(Did you run ndb_memcache_metadata.sql?)\n");
210 return ENGINE_FAILED;
211 }
212
213 /* Connect to additional clusters */
214 if(! open_connections_to_all_clusters()) {
215 logger->log(LOG_WARNING, 0, "open_connections_to_all_clusters() failed \n");
216 return ENGINE_FAILED;
217 }
218
219 /* Initialize Thread-Specific Storage */
220 initialize_thread_id_key();
221
222 /* Fetch some settings from the memcached core */
223 fetch_core_settings(ndb_eng, def_eng);
224 nthreads = ndb_eng->server_options.nthreads;
225
226 /* Initialize the error handler */
227 ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
228
229 logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
230 logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
231 timing_point(& pump_time);
232
233 /* prefetch data dictionary objects */
234 prefetch_dictionary_objects();
235
236 /* Build the scheduler options structure */
237 sched_opts.nthreads = ndb_eng->server_options.nthreads;
238 sched_opts.max_clients = ndb_eng->server_options.maxconns;
239
240 /* Allocate and initailize the pipelines, and their schedulers.
241 This will take some time; each pipeline creates slab and pool allocators,
242 and each scheduler may preallocate a large number of Ndb objects and
243 transactions, requiring multiple round trips to the data nodes. We
244 do this now to avoid the latency cost of setting up those objects at
245 runtime.
246 When the first request comes in, the pipeline, scheduler, and thread
247 will all get stitched together.
248 */
249 ndb_eng->pipelines = (void **)malloc(nthreads * sizeof(void *));
250 for(i = 0 ; i < nthreads ; i++) {
251 ndb_eng->pipelines[i] = (ndb_pipeline *)get_request_pipeline(i, ndb_eng);
252 if(! scheduler_initialize((ndb_pipeline *)ndb_eng->pipelines[i], & sched_opts)) {
253 logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
254 ndb_eng->startup_options.scheduler);
255 abort();
256 }
257 }
258
259 logger->log(LOG_WARNING, NULL, "done [%5.3f sec].\n",
260 (double) (timing_point(& pump_time) / (double) 1000000000));
261
262 /* Now initialize the default engine with no options (it already has them) */
263 return_status = def_eng->engine.initialize(ndb_eng->m_default_engine, "");
264
265 if(return_status == ENGINE_SUCCESS) {
266 set_initial_cas_ids(& ndb_eng->cas_hi, & ndb_eng->cas_lo);
267 }
268
269 print_debug_startup_info();
270
271 /* Listen for reconfiguration signals */
272 if(ndb_eng->startup_options.reconf_enable) {
273 start_reconfig_listener(ndb_eng->pipelines[0]);
274 }
275
276 return return_status;
277 }
278
279
280 /*** destroy ***/
ndb_destroy(ENGINE_HANDLE * handle,bool force)281 static void ndb_destroy(ENGINE_HANDLE* handle, bool force)
282 {
283 struct ndb_engine* ndb_eng;
284 struct default_engine *def_eng;
285 DEBUG_ENTER();
286
287 ndb_eng = ndb_handle(handle);
288 def_eng = default_handle(ndb_eng);
289
290 // TODO: Shutdown just the Scheduler Global
291 for(atomic_int32_t i = 0 ; i < ndb_eng->npipelines; i ++) {
292 ndb_pipeline *p = (ndb_pipeline *)ndb_eng->pipelines[i];
293 if(p) {
294 scheduler_shutdown(p);
295 ndb_pipeline_free(p);
296 }
297 }
298
299 disconnect_all();
300 def_eng->engine.destroy(ndb_eng->m_default_engine, force);
301 }
302
303
304 /* CALL FLOWS
305 ----------
306 GET: eng.get(), eng.get_item_info()*, eng.release()*
307 DELETE: eng.remove()
308 SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(),
309 eng.store(), eng.release()*
310 INCR: eng.arithmetic()
311 FLUSH: eng.flush()
312
313 * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK)
314 */
315
316
317
318 /*** Release scheduler resources and free workitem ****/
release_and_free(workitem * wqitem)319 void release_and_free(workitem *wqitem) {
320 DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
321 scheduler_release(wqitem->pipeline, wqitem);
322 workitem_free(wqitem);
323 }
324
325
326 /*** allocate ***/
327
328 /* Allocate gets a struct item from the slab allocator, and fills in
329 everything but the value. It seems like we can just pass this on to
330 the default engine; we'll intercept it later in store().
331 This is also called directly from finalize_read() in the commit thread.
332 */
ndb_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)333 static ENGINE_ERROR_CODE ndb_allocate(ENGINE_HANDLE* handle,
334 const void* cookie,
335 item **item,
336 const void* key,
337 const size_t nkey,
338 const size_t nbytes,
339 const int flags,
340 const rel_time_t exptime)
341 {
342 struct ndb_engine* ndb_eng;
343 struct default_engine *def_eng;
344 DEBUG_ENTER_DETAIL();
345
346 ndb_eng = ndb_handle(handle);
347 def_eng = default_handle(ndb_eng);
348
349 return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item,
350 key, nkey, nbytes, flags, exptime);
351 }
352
353
354 /*** remove ***/
ndb_remove(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)355 static ENGINE_ERROR_CODE ndb_remove(ENGINE_HANDLE* handle,
356 const void* cookie,
357 const void* key,
358 const size_t nkey,
359 uint64_t cas,
360 uint16_t vbucket __attribute__((unused)))
361 {
362 struct ndb_engine* ndb_eng = ndb_handle(handle);
363 struct default_engine *def_eng = default_handle(ndb_eng);
364 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
365 ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
366 prefix_info_t prefix;
367 workitem *wqitem;
368
369 /* Is this a callback after completed I/O? */
370 wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
371 if(wqitem) {
372 DEBUG_PRINT_DETAIL("Got callback: %s", wqitem->status->comment);
373 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop
374 release_and_free(wqitem);
375 return wqitem->status->status;
376 }
377
378 prefix = get_prefix_info_for_key(nkey, (const char *)key);
379 DEBUG_PRINT_DETAIL("prefix: %d", prefix.prefix_id);
380
381 /* DELETE.
382 You should attempt the cache delete, regardless of whether the database
383 delete succeeds or fails. So, we simply perform the cache delete first,
384 here, and then queue the database delete.
385 */
386
387 if(prefix.do_mc_delete) { /* Cache Delete */
388 hash_item *it = item_get(def_eng, key, nkey);
389 return_status = ENGINE_KEY_ENOENT;
390 if (it != NULL) {
391 // ACTUALLY NO???
392 /* In the binary protocol there is such a thing as a CAS delete.
393 This is the CAS check. If we will also be deleting from the database,
394 there are two possibilities:
395 1: The CAS matches; perform the delete.
396 2: The CAS doesn't match; delete the item because it's stale.
397 Therefore we skip the check altogether if(do_db_delete).
398 */
399 if(! prefix.do_db_delete)
400 if(cas && (cas != item_get_cas(it)))
401 return ENGINE_KEY_EEXISTS;
402
403 item_unlink(def_eng, it);
404 item_release(def_eng, it);
405 return_status = ENGINE_SUCCESS;
406 }
407 }
408
409 if(prefix.do_db_delete) { /* Database Delete */
410 wqitem = (workitem *)new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, (const char *)key, & cas);
411 DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
412 return_status = scheduler_schedule(pipeline, wqitem);
413 if(return_status != ENGINE_EWOULDBLOCK) {
414 release_and_free(wqitem);
415 }
416 }
417
418 return return_status;
419 }
420
421
422 /*** release ***/
ndb_release(ENGINE_HANDLE * handle,const void * cookie,item * item)423 static void ndb_release(ENGINE_HANDLE* handle, const void *cookie,
424 item* item)
425 {
426 struct ndb_engine* ndb_eng = ndb_handle(handle);
427 struct default_engine *def_eng = default_handle(ndb_eng);
428
429 workitem *wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
430 if(wqitem) {
431 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
432 release_and_free(wqitem);
433 }
434
435 if(item && (item != wqitem)) {
436 DEBUG_PRINT_DETAIL("Releasing a hash item.");
437 item_release(def_eng, (hash_item *) item);
438 }
439 }
440
441
442 /*** get ***/
ndb_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)443 static ENGINE_ERROR_CODE ndb_get(ENGINE_HANDLE* handle,
444 const void* cookie,
445 item** item,
446 const void* key,
447 const int nkey,
448 uint16_t vbucket __attribute__((unused)))
449 {
450 struct ndb_engine* ndb_eng = ndb_handle(handle);
451 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
452 struct workitem *wqitem;
453 prefix_info_t prefix;
454 ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
455
456 wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
457
458 /* Is this a callback after completed I/O? */
459 if(wqitem && ! wqitem->base.complete) {
460 DEBUG_PRINT_DETAIL("Got read callback on workitem %d.%d: %s",
461 wqitem->pipeline->id, wqitem->id, wqitem->status->comment);
462 *item = wqitem->cache_item;
463 wqitem->base.complete = 1;
464 return_status = wqitem->status->status;
465
466 /* On success the workitem will be read in ndb_get_item_info, then released.
467 Otherwise: */
468 if(return_status != ENGINE_SUCCESS) {
469 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
470 release_and_free(wqitem);
471 }
472
473 return return_status;
474 }
475
476 prefix = get_prefix_info_for_key(nkey, (const char *)key);
477
478 /* Cache read */
479 /* FIXME: Use the public APIs */
480 if(prefix.do_mc_read) {
481 *item = item_get(default_handle(ndb_eng), key, nkey);
482 if (*item != NULL) {
483 DEBUG_PRINT(" cache hit");
484 return ENGINE_SUCCESS;
485 }
486 DEBUG_PRINT(" cache miss");
487 }
488
489 /* Build and send the NDB transaction */
490 if(prefix.do_db_read) {
491 wqitem = (workitem *)new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, (const char *)key);
492 DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
493 return_status = scheduler_schedule(pipeline, wqitem);
494 if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
495 /* On error we must pop and free */
496 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
497 release_and_free(wqitem);
498 }
499 }
500
501 return return_status;
502 }
503
504
505 /*** get_stats ***/
ndb_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)506 static ENGINE_ERROR_CODE ndb_get_stats(ENGINE_HANDLE* handle,
507 const void *cookie,
508 const char *stat_key,
509 int nkey,
510 ADD_STAT add_stat)
511 {
512 struct ndb_engine* ndb_eng = ndb_handle(handle);
513 struct default_engine *def_eng = default_handle(ndb_eng);
514 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
515
516 DEBUG_ENTER_DETAIL();
517
518 if(stat_key) {
519 if(strncasecmp(stat_key, "menu", 4) == 0)
520 return stats_menu(add_stat, cookie);
521
522 if((strncasecmp(stat_key, "ndb", 3) == 0) ||
523 (strncasecmp(stat_key, "scheduler", 9) == 0) ||
524 (strncasecmp(stat_key, "reconf", 6) == 0) ||
525 (strncasecmp(stat_key, "errors", 6) == 0))
526 {
527 /* NDB Engine stats */
528 pipeline_add_stats(pipeline, stat_key, add_stat, cookie);
529 return ENGINE_SUCCESS;
530 }
531 }
532
533 /* Default engine stats */
534 return def_eng->engine.get_stats(ndb_eng->m_default_engine, cookie,
535 stat_key, nkey, add_stat);
536 }
537
538
539 /*** reset_stats ***/
ndb_reset_stats(ENGINE_HANDLE * handle,const void * cookie)540 static void ndb_reset_stats(ENGINE_HANDLE* handle,
541 const void *cookie)
542 {
543 struct ndb_engine* ndb_eng = ndb_handle(handle);
544 struct default_engine *def_eng = default_handle(ndb_eng);
545 /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
546 /* DEBUG_ENTER(); */
547 def_eng->engine.reset_stats(ndb_eng->m_default_engine, cookie);
548 }
549
550
551 /*** store ***/
552
ndb_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION op,uint16_t vbucket)553 static ENGINE_ERROR_CODE ndb_store(ENGINE_HANDLE* handle,
554 const void *cookie,
555 item* item,
556 uint64_t *cas,
557 ENGINE_STORE_OPERATION op,
558 uint16_t vbucket __attribute__((unused)))
559 {
560 struct ndb_engine* ndb_eng = ndb_handle(handle);
561 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
562 ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED;
563 prefix_info_t prefix;
564
565 /* Is this a callback after completed I/O? */
566 workitem *wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
567 if(wqitem) {
568 DEBUG_PRINT_DETAIL("Got callback on workitem %d.%d: %s",
569 pipeline->id, wqitem->id, wqitem->status->comment);
570 return wqitem->status->status;
571 }
572
573 prefix = get_prefix_info_for_key(hash_item_get_key_len((hash_item *)item),
574 hash_item_get_key((hash_item *)item));
575
576
577 /* Build and send the NDB transaction.
578 If there is also a cache operation, it must be deferred until we know
579 whether the database operation has succeeded or failed.
580 */
581 if(prefix.do_db_write) {
582 wqitem = (workitem *)new_workitem_for_store_op(pipeline, op, prefix, cookie, (hash_item *)item, cas);
583 DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d -- creating workitem %d.%d",
584 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
585 prefix.do_mc_write, prefix.do_db_write,
586 pipeline->id, wqitem->id);
587 return_status = scheduler_schedule(pipeline, wqitem);
588 if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
589 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
590 release_and_free(wqitem);
591 }
592 }
593 else if(prefix.do_mc_write) {
594 DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d -- cache-only store.",
595 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
596 prefix.do_mc_write, prefix.do_db_write);
597 return_status = store_item(default_handle(ndb_eng), (hash_item *)item, cas, op, cookie);
598 }
599
600 return return_status;
601 }
602
603
604 /*** arithmetic ***/
ndb_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)605 static ENGINE_ERROR_CODE ndb_arithmetic(ENGINE_HANDLE* handle,
606 const void* cookie,
607 const void* key,
608 const int nkey,
609 const bool increment,
610 const bool create,
611 const uint64_t delta,
612 const uint64_t initial,
613 const rel_time_t exptime,
614 uint64_t *cas,
615 uint64_t *result,
616 uint16_t vbucket)
617 {
618 struct ndb_engine* ndb_eng = ndb_handle(handle);
619 struct default_engine *def_eng = default_handle(ndb_eng);
620 ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
621 struct workitem *wqitem;
622 prefix_info_t prefix;
623 ENGINE_ERROR_CODE return_status;
624
625 /* Is this a callback after completed I/O? */
626 wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
627 if(wqitem && ! wqitem->base.complete) {
628 DEBUG_PRINT_DETAIL("Got arithmetic callback: %s", wqitem->status->comment);
629 return_status = wqitem->status->status;
630 wqitem->base.complete = 1;
631 *result = wqitem->math_value;
632 /* There will be no call to release(), so pop and free now. */
633 ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
634 release_and_free(wqitem);
635 return return_status;
636 }
637
638 prefix = get_prefix_info_for_key(nkey, (const char *)key);
639 DEBUG_PRINT("prefix: %d delta: %d create: %d initial: %d ",
640 prefix.prefix_id, (int) delta, (int) create, (int) initial);
641
642 /* For cache-only prefixes, forward this to the default engine */
643 if(! prefix.use_ndb) {
644 return def_eng->engine.arithmetic(ndb_eng->m_default_engine, cookie,
645 key, nkey, increment, create,
646 delta, initial, exptime, cas,
647 result, vbucket);
648 }
649
650 /* A math operation contains both a read and a write. */
651 if(! (prefix.has_math_col && prefix.do_db_read && prefix.do_db_write)) {
652 logger->log(LOG_WARNING, 0, "NDB INCR/DECR is not allowed for this key.\n");
653 DEBUG_PRINT("REJECTED : %d %d %d", (int) prefix.has_math_col,
654 (int) prefix.do_db_read , (int) prefix.do_db_write);
655 return ENGINE_NOT_STORED;
656 }
657
658 wqitem = (workitem *)new_workitem_for_arithmetic(pipeline, prefix, cookie, (const char *)key, nkey,
659 increment, create, delta, initial, cas);
660 DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
661
662 return_status = scheduler_schedule(pipeline, wqitem);
663
664 if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS)))
665 release_and_free(wqitem);
666
667 return return_status;
668 }
669
670 /*** flush ***/
ndb_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)671 static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
672 const void* cookie, time_t when)
673 {
674 /*
675 Notes on flush:
676 The server will call *only* into ndb_flush (not to allocate or release).
677 The NDB engine ignores the "when" parameter.
678 Flush operations have special handling, outside of the scheduler.
679 They are performed synchronously.
680 And we always send the flush command to the cache engine.
681 */
682 struct ndb_engine* ndb_eng;
683 struct default_engine *def_eng;
684 ndb_pipeline *pipeline;
685 DEBUG_ENTER();
686
687 ndb_eng = ndb_handle(handle);
688 def_eng = default_handle(ndb_eng);
689 pipeline = get_my_pipeline_config(ndb_eng);
690
691 (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
692 // TODO: Why not return ENGINE_EWOULDBLOCK first?
693 return pipeline_flush_all(pipeline);
694 }
695
696
697 /*** unknown_command ***/
ndb_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)698 static ENGINE_ERROR_CODE ndb_unknown_command(ENGINE_HANDLE* handle,
699 const void* cookie,
700 protocol_binary_request_header *request,
701 ADD_RESPONSE response)
702 {
703 struct ndb_engine* ndb_eng;
704 struct default_engine *def_eng;
705 DEBUG_ENTER();
706
707 ndb_eng = ndb_handle(handle);
708 def_eng = default_handle(ndb_eng);
709
710 return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie,
711 request, response);
712 }
713
714
715 /*** get_item_info ***/
ndb_get_item_info(ENGINE_HANDLE * handle,const void * cookie,const item * item,item_info * item_info)716 static bool ndb_get_item_info(ENGINE_HANDLE *handle,
717 const void *cookie,
718 const item* item,
719 item_info *item_info)
720 {
721 struct ndb_engine* ndb_eng = ndb_handle(handle);
722 struct default_engine *def_eng = default_handle(ndb_eng);
723
724 workitem *wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
725
726 if(wqitem == NULL) {
727 DEBUG_PRINT_DETAIL(" cache-only");
728 return def_eng->engine.get_item_info(handle, cookie, item, item_info);
729 }
730
731 if (item_info->nvalue < 1) {
732 DEBUG_PRINT_DETAIL("nvalue too small.");
733 return false;
734 }
735
736 if(wqitem->base.has_value) {
737 /* Use the workitem. */
738 item_info->cas = wqitem->cas ? *(wqitem->cas) : 0;
739 item_info->exptime = 0;
740 item_info->nbytes = wqitem->value_size;
741 item_info->flags = wqitem->math_flags;
742 item_info->clsid = slabs_clsid(default_handle(ndb_eng), wqitem->value_size);
743 item_info->nkey = wqitem->base.nkey;
744 item_info->nvalue = 1; /* how many iovecs */
745 item_info->key = wqitem->key;
746 item_info->value[0].iov_base = wqitem->value_ptr;
747 item_info->value[0].iov_len = wqitem->value_size;
748 DEBUG_PRINT_DETAIL("workitem %d.%d [%s].", wqitem->pipeline->id, wqitem->id,
749 workitem_get_operation(wqitem));
750 return true;
751 }
752 else {
753 /* Use a hash item */
754 hash_item *it = (hash_item*) item;
755 item_info->cas = hash_item_get_cas(it);
756 item_info->exptime = it->exptime;
757 item_info->nbytes = wqitem ? wqitem->value_size : 0;
758 item_info->flags = it->flags;
759 item_info->clsid = it->slabs_clsid;
760 item_info->nkey = it->nkey;
761 item_info->nvalue = 1;
762 item_info->key = hash_item_get_key(it);
763 item_info->value[0].iov_base = hash_item_get_data(it);
764 item_info->value[0].iov_len = item_info->nbytes;
765 if(item_info->nbytes) {
766 DEBUG_PRINT_DETAIL("hash_item [KEY: %.*s][CAS: %llu][nbytes: %d].", it->nkey,
767 hash_item_get_key(it), item_info->cas, item_info->nbytes);
768 }
769 else {
770 DEBUG_PRINT_DETAIL(" new hash_item");
771 }
772 return true;
773 }
774 }
775
776 namespace {
777
778 /*
779 C++ doesn't have C99's designated initializers until C++20, and we need to
780 initialize an union, so create some constructors for config_item as convenience.
781 */
Config_item_string(const char * key,char ** value)782 config_item Config_item_string(const char *key, char **value)
783 {
784 config_item item;
785 item.key= key;
786 item.datatype= DT_STRING;
787 item.value.dt_string= value;
788 return item;
789 }
790
Config_item_bool(const char * key,bool * value)791 config_item Config_item_bool(const char *key, bool *value)
792 {
793 config_item item;
794 item.key= key;
795 item.datatype= DT_BOOL;
796 item.value.dt_bool= value;
797 return item;
798 }
799
Config_item_size_t(const char * key,size_t * value)800 config_item Config_item_size_t(const char *key, size_t *value)
801 {
802 config_item item;
803 item.key= key;
804 item.datatype= DT_SIZE;
805 item.value.dt_size= value;
806 return item;
807 }
808
Config_item_float(const char * key,float * value)809 config_item Config_item_float(const char *key, float *value)
810 {
811 config_item item;
812 item.key= key;
813 item.datatype= DT_FLOAT;
814 item.value.dt_float= value;
815 return item;
816 }
817
Config_item_configfile(const char * key)818 config_item Config_item_configfile(const char *key)
819 {
820 config_item item;
821 item.key= key;
822 item.datatype= DT_CONFIGFILE;
823 return item;
824 }
825
826 } // namespace
827
828 /* read_cmdline_options requires duplicating code from the default engine.
829 If the default engine supports a new option, you will need to add it here.
830 We create a single config_items structure containing options for both
831 engines.
832 */
read_cmdline_options(struct ndb_engine * ndb,struct default_engine * se,const char * conf)833 void read_cmdline_options(struct ndb_engine *ndb, struct default_engine *se,
834 const char * conf)
835 {
836 int did_parse;
837 DEBUG_ENTER();
838
839 did_parse = 0; /* 0 = success from parse_config() */
840
841 if (conf != NULL) {
842 struct config_item items[] = {
843 /* NDB OPTIONS */
844 Config_item_string("connectstring", (char **) &(ndb->startup_options.connectstring)),
845 Config_item_string("role", (char **) &(ndb->startup_options.server_role)),
846 Config_item_string("scheduler", (char **) &(ndb->startup_options.scheduler)),
847 #ifdef DEBUG_OUTPUT
848 Config_item_bool("debug", &(ndb->startup_options.debug_enable)),
849 Config_item_bool("detail", &(ndb->startup_options.debug_detail)),
850 #endif
851 Config_item_bool("reconf", &(ndb->startup_options.reconf_enable)),
852
853 /* DEFAULT ENGINE OPTIONS */
854 Config_item_bool("use_cas", &se->config.use_cas),
855 Config_item_size_t("verbose", &se->config.verbose),
856 Config_item_bool("eviction", &se->config.evict_to_free),
857 Config_item_size_t("cache_size", &se->config.maxbytes),
858 Config_item_bool("preallocate", &se->config.preallocate),
859 Config_item_float("factor", &se->config.factor),
860 Config_item_size_t("chunk_size", &se->config.chunk_size),
861 Config_item_size_t("item_size_max", &se->config.item_size_max),
862 Config_item_configfile("config_file"),
863 { nullptr }
864 };
865
866 did_parse = se->server.core->parse_config(conf, items, stderr);
867 }
868 switch(did_parse) {
869 case -1:
870 logger->log(LOG_WARNING, NULL,
871 "Unknown tokens in config string \"%s\"\n", conf);
872 break;
873 case 1:
874 logger->log(LOG_WARNING, NULL,
875 "Illegal values in config string: \"%s\"\n", conf);
876 break;
877 case 0: /* success */
878 break;
879 }
880
881 global_max_item_size = se->config.item_size_max;
882 }
883
884
fetch_core_settings(struct ndb_engine * engine,struct default_engine * se)885 int fetch_core_settings(struct ndb_engine *engine,
886 struct default_engine *se) {
887
888 /* Set up a struct config_item containing the keys we're interested in. */
889 struct config_item items[] = {
890 Config_item_bool("cas_enabled", &engine->server_options.cas_enabled),
891 Config_item_size_t("maxconns", &engine->server_options.maxconns),
892 Config_item_size_t("num_threads", &engine->server_options.nthreads),
893 Config_item_size_t("verbosity", &engine->server_options.verbose),
894 { nullptr }
895 };
896
897 DEBUG_ENTER();
898
899 /* This will call "stats settings" and parse the output into the config */
900 return se->server.core->get_config(items);
901 }
902
903
stats_menu(ADD_STAT add_stat,const void * cookie)904 ENGINE_ERROR_CODE stats_menu(ADD_STAT add_stat, const void *cookie) {
905 char key[128];
906 char val[128];
907 int klen, vlen;
908
909 klen = sprintf(key, "ndb");
910 vlen = sprintf(val, " NDB Engine: NDBAPI statistics");
911 add_stat(key, klen, val, vlen, cookie);
912
913 klen = sprintf(key, "errors");
914 vlen = sprintf(val, " NDB Engine: Error message counters");
915 add_stat(key, klen, val, vlen, cookie);
916
917 klen = sprintf(key, "scheduler");
918 vlen = sprintf(val, " NDB Engine: Scheduler internal statistics");
919 add_stat(key, klen, val, vlen, cookie);
920
921 klen = sprintf(key, "reconf");
922 vlen = sprintf(val, " NDB Engine: Current configuration version");
923 add_stat(key, klen, val, vlen, cookie);
924
925 klen = sprintf(key, "settings");
926 vlen = sprintf(val, " Server core: configurable settings");
927 add_stat(key, klen, val, vlen, cookie);
928
929 klen = sprintf(key, "reset");
930 vlen = sprintf(val, " Server core: reset counters");
931 add_stat(key, klen, val, vlen, cookie);
932
933 klen = sprintf(key, "detail");
934 vlen = sprintf(val, " Server core: use stats detail on|off|dump");
935 add_stat(key, klen, val, vlen, cookie);
936
937 klen = sprintf(key, "aggregate");
938 vlen = sprintf(val, " Server core: aggregated");
939 add_stat(key, klen, val, vlen, cookie);
940
941 klen = sprintf(key, "slabs");
942 vlen = sprintf(val, " Cache Engine: allocator");
943 add_stat(key, klen, val, vlen, cookie);
944
945 klen = sprintf(key, "items");
946 vlen = sprintf(val, " Cache Engine: itemes cached");
947 add_stat(key, klen, val, vlen, cookie);
948
949 klen = sprintf(key, "sizes");
950 vlen = sprintf(val, " Cache Engine: items per allocation class");
951 add_stat(key, klen, val, vlen, cookie);
952
953 klen = sprintf(key, "vbucket");
954 vlen = sprintf(val, " Cache Engine: dump vbucket table");
955 add_stat(key, klen, val, vlen, cookie);
956
957 klen = sprintf(key, "scrub");
958 vlen = sprintf(val, " Cache Engine: scrubber status");
959 add_stat(key, klen, val, vlen, cookie);
960
961 return ENGINE_SUCCESS;
962 }
963
964