1 /*
2  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License, version 2.0,
6  as published by the Free Software Foundation.
7 
8  This program is also distributed with certain software (including
9  but not limited to OpenSSL) that is licensed under separate terms,
10  as designated in a particular file or component or in included license
11  documentation.  The authors of MySQL hereby grant you an additional
12  permission to link the program and your derivative works with the
13  separately licensed software that they have included with MySQL.
14 
15  This program is distributed in the hope that it will be useful,
16  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  GNU General Public License, version 2.0, for more details.
19 
20  You should have received a copy of the GNU General Public License
21  along with this program; if not, write to the Free Software
22  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 #include "my_config.h"
25 #include <stdlib.h>
26 #include <string.h>
27 #include <assert.h>
28 #include <pthread.h>
29 
30 #include "config.h"
31 
32 #include "default_engine.h"
33 
34 #include <memcached/util.h>
35 #include <memcached/config_parser.h>
36 #include <memcached/extension.h>
37 #include <memcached/extension_loggers.h>
38 
39 #include "ndb_engine.h"
40 #include "ndb_configuration.h"
41 #include "workitem.h"
42 #include "ndb_engine_private.h"
43 #include "debug.h"
44 #include "Scheduler.h"
45 #include "thread_identifier.h"
46 #include "timing.h"
47 #include "ndb_error_logger.h"
48 
/* Global variables */
/* Logger extension descriptor; assigned in create_instance() from the
   server API's extension registry and used for all log output in this file. */
EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Static and local to this file */
/* Printable names of the store operations.  ndb_store() indexes this table
   with its ENGINE_STORE_OPERATION argument when building debug messages;
   slot 0 is an empty placeholder.
   NOTE(review): the array is not declared `static`, so it appears to have
   external linkage despite the heading above -- confirm whether any other
   translation unit refers to it before changing that. */
const char * set_ops[] = { "","add","set","replace","append","prepend","cas" };
54 
55 
ndb_handle(ENGINE_HANDLE * handle)56 static inline struct ndb_engine* ndb_handle(ENGINE_HANDLE* handle)
57 {
58   return (struct ndb_engine*) handle;
59 }
60 
default_handle(struct ndb_engine * eng)61 static inline struct default_engine* default_handle(struct ndb_engine *eng)
62 {
63   return (struct default_engine*) eng->m_default_engine;
64 }
65 
66 
67 /*********** PRIVATE UTILITY FUNCTIONS BEGIN HERE ***********************/
68 
get_my_pipeline_config(struct ndb_engine * eng)69 ndb_pipeline * get_my_pipeline_config(struct ndb_engine *eng) {
70   const thread_identifier * thread_id;
71 
72   /* Try to fetch the pipeline from the thread identity */
73   thread_id = get_thread_id();
74   if(thread_id) {
75     return thread_id->pipeline;
76   }
77   else {
78     /* On the first call from each thread, initialize the pipeline */
79     return ndb_pipeline_initialize(eng);
80   }
81 }
82 
83 
84 /*********** FUNCTIONS IMPLEMENTING THE PUBLISHED API BEGIN HERE ********/
85 
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)86 ENGINE_ERROR_CODE create_instance(uint64_t interface,
87                                   GET_SERVER_API get_server_api,
88                                   ENGINE_HANDLE **handle ) {
89 
90   struct ndb_engine *ndb_eng;
91   const char * env_connectstring;
92   ENGINE_ERROR_CODE return_status;
93 
94   SERVER_HANDLE_V1 *api = get_server_api();
95   if (interface != 1 || api == NULL) {
96     return ENGINE_ENOTSUP;
97   }
98 
99   ndb_eng = (ndb_engine *)malloc(sizeof(struct ndb_engine));
100   if(ndb_eng == NULL) {
101     return ENGINE_ENOMEM;
102   }
103 
104   logger = (EXTENSION_LOGGER_DESCRIPTOR *)api->extension->get_extension(EXTENSION_LOGGER);
105 
106   ndb_eng->npipelines = 0;
107   ndb_eng->connected  = false;
108 
109   ndb_eng->engine.interface.interface = 1;
110   ndb_eng->engine.get_info         = ndb_get_info;
111   ndb_eng->engine.initialize       = ndb_initialize;
112   ndb_eng->engine.destroy          = ndb_destroy;
113   ndb_eng->engine.allocate         = ndb_allocate;
114   ndb_eng->engine.remove           = ndb_remove;
115   ndb_eng->engine.release          = ndb_release;
116   ndb_eng->engine.get              = ndb_get;
117   ndb_eng->engine.get_stats        = ndb_get_stats;
118   ndb_eng->engine.reset_stats      = ndb_reset_stats;
119   ndb_eng->engine.store            = ndb_store;
120   ndb_eng->engine.arithmetic       = ndb_arithmetic;
121   ndb_eng->engine.flush            = ndb_flush;
122   ndb_eng->engine.unknown_command  = ndb_unknown_command;
123   ndb_eng->engine.item_set_cas     = item_set_cas;           /* reused */
124   ndb_eng->engine.get_item_info    = ndb_get_item_info;
125   ndb_eng->engine.get_stats_struct = NULL;
126   ndb_eng->engine.aggregate_stats  = NULL;
127   ndb_eng->engine.tap_notify       = NULL;
128   ndb_eng->engine.get_tap_iterator = NULL;
129   ndb_eng->engine.errinfo          = NULL;
130 
131   ndb_eng->server = *api;
132   ndb_eng->get_server_api = get_server_api;
133 
134   /* configuration, with default values*/
135   ndb_eng->startup_options.connectstring = "localhost:1186";
136   ndb_eng->startup_options.server_role   = "default_role";
137   ndb_eng->startup_options.scheduler     = 0;
138   ndb_eng->startup_options.debug_enable  = false;
139   ndb_eng->startup_options.debug_detail  = false;
140   ndb_eng->startup_options.reconf_enable = true;
141 
142   /* Now let NDB_CONNECTSRING environment variable override the default */
143   env_connectstring = getenv("NDB_CONNECTSTRING");
144   if(env_connectstring)
145     ndb_eng->startup_options.connectstring = env_connectstring;
146 
147   /* Set engine informational structure */
148   ndb_eng->info.info.description = "NDB Memcache " VERSION;
149   ndb_eng->info.info.num_features = 3;
150   ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS;
151   ndb_eng->info.info.features[0].description = NULL;
152   ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE;
153   ndb_eng->info.info.features[1].description = NULL;
154   ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU;
155   ndb_eng->info.info.features[2].description = NULL;
156 
157   /* Now call create_instace() for the default engine */
158   return_status = default_engine_create_instance(interface, get_server_api,
159                                                  & (ndb_eng->m_default_engine));
160 
161   if(return_status == ENGINE_SUCCESS)
162     *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
163 
164   return return_status;
165 }
166 
167 
168 /*** get_info ***/
ndb_get_info(ENGINE_HANDLE * handle)169 static const engine_info* ndb_get_info(ENGINE_HANDLE* handle)
170 {
171   return & ndb_handle(handle)->info.info;
172 }
173 
174 
175 /*** initialize ***/
ndb_initialize(ENGINE_HANDLE * handle,const char * config_str)176 static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle,
177                                         const char* config_str)
178 {
179   int i, nthreads, debug_level;
180   time_point_t pump_time = 0;
181   ENGINE_ERROR_CODE return_status;
182   struct ndb_engine *ndb_eng = ndb_handle(handle);
183   struct default_engine *def_eng = default_handle(ndb_eng);
184   scheduler_options sched_opts;
185 
186   /* Process options for both the ndb engine and the default engine:  */
187   read_cmdline_options(ndb_eng, def_eng, config_str);
188 
189   /* Initalize the debug library */
190   if(ndb_eng->startup_options.debug_detail)
191     debug_level = 2;
192   else if(ndb_eng->startup_options.debug_enable)
193     debug_level = 1;
194   else debug_level = 0;
195   DEBUG_INIT(NULL, debug_level);
196   DEBUG_ENTER();
197 
198   /* Connect to the Primary cluster */
199   if(!(connect_to_primary_cluster(ndb_eng->startup_options.connectstring,
200                                   ndb_eng->startup_options.server_role))) {
201      logger->log(LOG_WARNING, 0, "Could not connect to NDB.  Shutting down.\n");
202      return ENGINE_FAILED;
203   }
204   ndb_eng->connected = true;
205 
206   /* Read configuration */
207   if(!(get_config())) {
208      logger->log(LOG_WARNING, 0, "Failed to read configuration -- shutting down.\n"
209                  "(Did you run ndb_memcache_metadata.sql?)\n");
210      return ENGINE_FAILED;
211   }
212 
213   /* Connect to additional clusters */
214   if(! open_connections_to_all_clusters()) {
215     logger->log(LOG_WARNING, 0, "open_connections_to_all_clusters() failed \n");
216    return ENGINE_FAILED;
217   }
218 
219   /* Initialize Thread-Specific Storage */
220   initialize_thread_id_key();
221 
222   /* Fetch some settings from the memcached core */
223   fetch_core_settings(ndb_eng, def_eng);
224   nthreads = ndb_eng->server_options.nthreads;
225 
226   /* Initialize the error handler */
227   ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
228 
229   logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
230   logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
231   timing_point(& pump_time);
232 
233   /* prefetch data dictionary objects */
234   prefetch_dictionary_objects();
235 
236   /* Build the scheduler options structure */
237   sched_opts.nthreads = ndb_eng->server_options.nthreads;
238   sched_opts.max_clients = ndb_eng->server_options.maxconns;
239 
240   /* Allocate and initailize the pipelines, and their schedulers.
241      This will take some time; each pipeline creates slab and pool allocators,
242      and each scheduler may preallocate a large number of Ndb objects and
243      transactions, requiring multiple round trips to the data nodes.  We
244      do this now to avoid the latency cost of setting up those objects at
245      runtime.
246      When the first request comes in, the pipeline, scheduler, and thread
247      will all get stitched together.
248   */
249   ndb_eng->pipelines  = (void **)malloc(nthreads * sizeof(void *));
250   for(i = 0 ; i < nthreads ; i++) {
251     ndb_eng->pipelines[i] = (ndb_pipeline *)get_request_pipeline(i, ndb_eng);
252     if(! scheduler_initialize((ndb_pipeline *)ndb_eng->pipelines[i], & sched_opts)) {
253       logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
254                   ndb_eng->startup_options.scheduler);
255       abort();
256     }
257   }
258 
259   logger->log(LOG_WARNING, NULL, "done [%5.3f sec].\n",
260               (double) (timing_point(& pump_time) / (double) 1000000000));
261 
262   /* Now initialize the default engine with no options (it already has them) */
263   return_status = def_eng->engine.initialize(ndb_eng->m_default_engine, "");
264 
265   if(return_status == ENGINE_SUCCESS) {
266     set_initial_cas_ids(& ndb_eng->cas_hi, & ndb_eng->cas_lo);
267   }
268 
269   print_debug_startup_info();
270 
271   /* Listen for reconfiguration signals */
272   if(ndb_eng->startup_options.reconf_enable) {
273     start_reconfig_listener(ndb_eng->pipelines[0]);
274   }
275 
276   return return_status;
277 }
278 
279 
280 /*** destroy ***/
ndb_destroy(ENGINE_HANDLE * handle,bool force)281 static void ndb_destroy(ENGINE_HANDLE* handle, bool force)
282 {
283   struct ndb_engine* ndb_eng;
284   struct default_engine *def_eng;
285   DEBUG_ENTER();
286 
287   ndb_eng = ndb_handle(handle);
288   def_eng = default_handle(ndb_eng);
289 
290   // TODO: Shutdown just the Scheduler Global
291   for(atomic_int32_t i = 0 ; i < ndb_eng->npipelines; i ++) {
292     ndb_pipeline *p = (ndb_pipeline *)ndb_eng->pipelines[i];
293     if(p) {
294       scheduler_shutdown(p);
295       ndb_pipeline_free(p);
296     }
297   }
298 
299   disconnect_all();
300   def_eng->engine.destroy(ndb_eng->m_default_engine, force);
301 }
302 
303 
304 /* CALL FLOWS
305    ----------
306    GET:       eng.get(), eng.get_item_info()*, eng.release()*
307    DELETE:    eng.remove()
308    SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(),
309                  eng.store(), eng.release()*
310    INCR:      eng.arithmetic()
311    FLUSH:     eng.flush()
312 
313    * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK)
314 */
315 
316 
317 
318 /*** Release scheduler resources and free workitem ****/
release_and_free(workitem * wqitem)319 void release_and_free(workitem *wqitem) {
320   DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
321   scheduler_release(wqitem->pipeline, wqitem);
322   workitem_free(wqitem);
323 }
324 
325 
326 /*** allocate ***/
327 
328 /* Allocate gets a struct item from the slab allocator, and fills in
329    everything but the value.  It seems like we can just pass this on to
330    the default engine; we'll intercept it later in store().
331    This is also called directly from finalize_read() in the commit thread.
332 */
ndb_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)333 static ENGINE_ERROR_CODE ndb_allocate(ENGINE_HANDLE* handle,
334                                            const void* cookie,
335                                            item **item,
336                                            const void* key,
337                                            const size_t nkey,
338                                            const size_t nbytes,
339                                            const int flags,
340                                            const rel_time_t exptime)
341 {
342   struct ndb_engine* ndb_eng;
343   struct default_engine *def_eng;
344   DEBUG_ENTER_DETAIL();
345 
346   ndb_eng = ndb_handle(handle);
347   def_eng = default_handle(ndb_eng);
348 
349   return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item,
350                                   key, nkey, nbytes, flags, exptime);
351 }
352 
353 
354 /*** remove ***/
ndb_remove(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)355 static ENGINE_ERROR_CODE ndb_remove(ENGINE_HANDLE* handle,
356                                          const void* cookie,
357                                          const void* key,
358                                          const size_t nkey,
359                                          uint64_t cas,
360                                          uint16_t vbucket  __attribute__((unused)))
361 {
362   struct ndb_engine* ndb_eng = ndb_handle(handle);
363   struct default_engine *def_eng = default_handle(ndb_eng);
364   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
365   ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
366   prefix_info_t prefix;
367   workitem *wqitem;
368 
369   /* Is this a callback after completed I/O? */
370   wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
371   if(wqitem) {
372     DEBUG_PRINT_DETAIL("Got callback: %s", wqitem->status->comment);
373     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop
374     release_and_free(wqitem);
375     return wqitem->status->status;
376   }
377 
378   prefix = get_prefix_info_for_key(nkey, (const char *)key);
379   DEBUG_PRINT_DETAIL("prefix: %d", prefix.prefix_id);
380 
381   /* DELETE.
382      You should attempt the cache delete, regardless of whether the database
383      delete succeeds or fails.  So, we simply perform the cache delete first,
384      here, and then queue the database delete.
385   */
386 
387   if(prefix.do_mc_delete) {                         /* Cache Delete */
388     hash_item *it = item_get(def_eng, key, nkey);
389     return_status = ENGINE_KEY_ENOENT;
390     if (it != NULL) {
391       // ACTUALLY NO???
392       /* In the binary protocol there is such a thing as a CAS delete.
393          This is the CAS check.  If we will also be deleting from the database,
394          there are two possibilities:
395           1: The CAS matches; perform the delete.
396           2: The CAS doesn't match; delete the item because it's stale.
397          Therefore we skip the check altogether if(do_db_delete).
398       */
399       if(! prefix.do_db_delete)
400         if(cas && (cas != item_get_cas(it)))
401           return ENGINE_KEY_EEXISTS;
402 
403       item_unlink(def_eng, it);
404       item_release(def_eng, it);
405       return_status  = ENGINE_SUCCESS;
406     }
407   }
408 
409   if(prefix.do_db_delete) {                        /* Database Delete */
410     wqitem = (workitem *)new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, (const char *)key, & cas);
411     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
412     return_status = scheduler_schedule(pipeline, wqitem);
413     if(return_status != ENGINE_EWOULDBLOCK) {
414       release_and_free(wqitem);
415     }
416   }
417 
418   return return_status;
419 }
420 
421 
422 /*** release ***/
ndb_release(ENGINE_HANDLE * handle,const void * cookie,item * item)423 static void ndb_release(ENGINE_HANDLE* handle, const void *cookie,
424                         item* item)
425 {
426   struct ndb_engine* ndb_eng = ndb_handle(handle);
427   struct default_engine *def_eng = default_handle(ndb_eng);
428 
429   workitem *wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
430   if(wqitem) {
431     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
432     release_and_free(wqitem);
433   }
434 
435   if(item && (item != wqitem)) {
436     DEBUG_PRINT_DETAIL("Releasing a hash item.");
437     item_release(def_eng, (hash_item *) item);
438   }
439 }
440 
441 
442 /*** get ***/
ndb_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)443 static ENGINE_ERROR_CODE ndb_get(ENGINE_HANDLE* handle,
444                                  const void* cookie,
445                                  item** item,
446                                  const void* key,
447                                  const int nkey,
448                                  uint16_t vbucket __attribute__((unused)))
449 {
450   struct ndb_engine* ndb_eng = ndb_handle(handle);
451   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
452   struct workitem *wqitem;
453   prefix_info_t prefix;
454   ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
455 
456   wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
457 
458   /* Is this a callback after completed I/O? */
459   if(wqitem && ! wqitem->base.complete) {
460     DEBUG_PRINT_DETAIL("Got read callback on workitem %d.%d: %s",
461                 wqitem->pipeline->id, wqitem->id, wqitem->status->comment);
462     *item = wqitem->cache_item;
463     wqitem->base.complete = 1;
464     return_status = wqitem->status->status;
465 
466     /* On success the workitem will be read in ndb_get_item_info, then released.
467        Otherwise: */
468     if(return_status != ENGINE_SUCCESS) {
469       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
470       release_and_free(wqitem);
471     }
472 
473     return return_status;
474   }
475 
476   prefix = get_prefix_info_for_key(nkey, (const char *)key);
477 
478   /* Cache read */
479   /* FIXME: Use the public APIs */
480   if(prefix.do_mc_read) {
481     *item = item_get(default_handle(ndb_eng), key, nkey);
482     if (*item != NULL) {
483       DEBUG_PRINT(" cache hit");
484       return ENGINE_SUCCESS;
485     }
486     DEBUG_PRINT(" cache miss");
487   }
488 
489   /* Build and send the NDB transaction */
490   if(prefix.do_db_read) {
491     wqitem = (workitem *)new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, (const char *)key);
492     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
493     return_status = scheduler_schedule(pipeline, wqitem);
494     if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
495       /* On error we must pop and free */
496       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
497       release_and_free(wqitem);
498     }
499   }
500 
501   return return_status;
502 }
503 
504 
505 /*** get_stats ***/
ndb_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)506 static ENGINE_ERROR_CODE ndb_get_stats(ENGINE_HANDLE* handle,
507                                        const void *cookie,
508                                        const char *stat_key,
509                                        int nkey,
510                                        ADD_STAT add_stat)
511 {
512   struct ndb_engine* ndb_eng = ndb_handle(handle);
513   struct default_engine *def_eng = default_handle(ndb_eng);
514   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
515 
516   DEBUG_ENTER_DETAIL();
517 
518   if(stat_key) {
519     if(strncasecmp(stat_key, "menu", 4) == 0)
520       return stats_menu(add_stat, cookie);
521 
522    if((strncasecmp(stat_key, "ndb", 3) == 0)       ||
523        (strncasecmp(stat_key, "scheduler", 9) == 0) ||
524        (strncasecmp(stat_key, "reconf", 6) == 0)    ||
525        (strncasecmp(stat_key, "errors", 6) == 0))
526     {
527       /* NDB Engine stats */
528       pipeline_add_stats(pipeline, stat_key, add_stat, cookie);
529       return ENGINE_SUCCESS;
530     }
531   }
532 
533   /* Default engine stats */
534   return def_eng->engine.get_stats(ndb_eng->m_default_engine, cookie,
535                                    stat_key, nkey, add_stat);
536 }
537 
538 
539 /*** reset_stats ***/
ndb_reset_stats(ENGINE_HANDLE * handle,const void * cookie)540 static void ndb_reset_stats(ENGINE_HANDLE* handle,
541                             const void *cookie)
542 {
543   struct ndb_engine* ndb_eng = ndb_handle(handle);
544   struct default_engine *def_eng = default_handle(ndb_eng);
545   /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
546   /* DEBUG_ENTER(); */
547   def_eng->engine.reset_stats(ndb_eng->m_default_engine, cookie);
548 }
549 
550 
551 /*** store ***/
552 
ndb_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION op,uint16_t vbucket)553 static ENGINE_ERROR_CODE ndb_store(ENGINE_HANDLE* handle,
554                                    const void *cookie,
555                                    item* item,
556                                    uint64_t *cas,
557                                    ENGINE_STORE_OPERATION op,
558                                    uint16_t vbucket  __attribute__((unused)))
559 {
560   struct ndb_engine* ndb_eng = ndb_handle(handle);
561   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
562   ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED;
563   prefix_info_t prefix;
564 
565   /* Is this a callback after completed I/O? */
566   workitem *wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
567   if(wqitem) {
568     DEBUG_PRINT_DETAIL("Got callback on workitem %d.%d: %s",
569                 pipeline->id, wqitem->id, wqitem->status->comment);
570     return wqitem->status->status;
571   }
572 
573   prefix = get_prefix_info_for_key(hash_item_get_key_len((hash_item *)item),
574                                    hash_item_get_key((hash_item *)item));
575 
576 
577   /* Build and send the NDB transaction.
578      If there is also a cache operation, it must be deferred until we know
579      whether the database operation has succeeded or failed.
580   */
581   if(prefix.do_db_write) {
582     wqitem = (workitem *)new_workitem_for_store_op(pipeline, op, prefix, cookie, (hash_item *)item, cas);
583     DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d  --  creating workitem %d.%d",
584                 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
585                 prefix.do_mc_write, prefix.do_db_write,
586                 pipeline->id, wqitem->id);
587     return_status = scheduler_schedule(pipeline, wqitem);
588     if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
589       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
590       release_and_free(wqitem);
591     }
592   }
593   else if(prefix.do_mc_write) {
594     DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d --  cache-only store.",
595                 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
596                 prefix.do_mc_write, prefix.do_db_write);
597     return_status = store_item(default_handle(ndb_eng), (hash_item *)item, cas, op, cookie);
598   }
599 
600   return return_status;
601 }
602 
603 
604 /*** arithmetic ***/
ndb_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)605 static ENGINE_ERROR_CODE ndb_arithmetic(ENGINE_HANDLE* handle,
606                                         const void* cookie,
607                                         const void* key,
608                                         const int nkey,
609                                         const bool increment,
610                                         const bool create,
611                                         const uint64_t delta,
612                                         const uint64_t initial,
613                                         const rel_time_t exptime,
614                                         uint64_t *cas,
615                                         uint64_t *result,
616                                         uint16_t vbucket)
617 {
618   struct ndb_engine* ndb_eng = ndb_handle(handle);
619   struct default_engine *def_eng = default_handle(ndb_eng);
620   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
621   struct workitem *wqitem;
622   prefix_info_t prefix;
623   ENGINE_ERROR_CODE return_status;
624 
625   /* Is this a callback after completed I/O? */
626   wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
627   if(wqitem && ! wqitem->base.complete) {
628     DEBUG_PRINT_DETAIL("Got arithmetic callback: %s", wqitem->status->comment);
629     return_status = wqitem->status->status;
630     wqitem->base.complete = 1;
631     *result = wqitem->math_value;
632     /* There will be no call to release(), so pop and free now. */
633     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
634     release_and_free(wqitem);
635     return return_status;
636   }
637 
638   prefix = get_prefix_info_for_key(nkey, (const char *)key);
639   DEBUG_PRINT("prefix: %d   delta: %d  create: %d   initial: %d ",
640                      prefix.prefix_id, (int) delta, (int) create, (int) initial);
641 
642   /* For cache-only prefixes, forward this to the default engine */
643   if(! prefix.use_ndb) {
644     return def_eng->engine.arithmetic(ndb_eng->m_default_engine, cookie,
645                                       key, nkey, increment, create,
646                                       delta, initial, exptime, cas,
647                                       result, vbucket);
648   }
649 
650   /* A math operation contains both a read and a write. */
651   if(! (prefix.has_math_col && prefix.do_db_read && prefix.do_db_write)) {
652     logger->log(LOG_WARNING, 0, "NDB INCR/DECR is not allowed for this key.\n");
653     DEBUG_PRINT("REJECTED : %d %d %d", (int) prefix.has_math_col,
654                        (int) prefix.do_db_read , (int)  prefix.do_db_write);
655     return ENGINE_NOT_STORED;
656   }
657 
658   wqitem = (workitem *)new_workitem_for_arithmetic(pipeline, prefix, cookie, (const char *)key, nkey,
659                                                    increment, create, delta, initial, cas);
660   DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
661 
662   return_status = scheduler_schedule(pipeline, wqitem);
663 
664   if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS)))
665     release_and_free(wqitem);
666 
667   return return_status;
668 }
669 
670 /*** flush ***/
ndb_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)671 static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
672                                    const void* cookie, time_t when)
673 {
674 /*
675    Notes on flush:
676    The server will call *only* into ndb_flush (not to allocate or release).
677    The NDB engine ignores the "when" parameter.
678    Flush operations have special handling, outside of the scheduler.
679    They are performed synchronously.
680    And we always send the flush command to the cache engine.
681 */
682   struct ndb_engine* ndb_eng;
683   struct default_engine *def_eng;
684   ndb_pipeline *pipeline;
685   DEBUG_ENTER();
686 
687   ndb_eng = ndb_handle(handle);
688   def_eng = default_handle(ndb_eng);
689   pipeline = get_my_pipeline_config(ndb_eng);
690 
691   (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
692   // TODO: Why not return ENGINE_EWOULDBLOCK first?
693   return pipeline_flush_all(pipeline);
694 }
695 
696 
697 /*** unknown_command ***/
ndb_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)698 static ENGINE_ERROR_CODE ndb_unknown_command(ENGINE_HANDLE* handle,
699                                              const void* cookie,
700                                              protocol_binary_request_header *request,
701                                              ADD_RESPONSE response)
702 {
703   struct ndb_engine* ndb_eng;
704   struct default_engine *def_eng;
705   DEBUG_ENTER();
706 
707   ndb_eng = ndb_handle(handle);
708   def_eng = default_handle(ndb_eng);
709 
710   return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie,
711                                          request, response);
712 }
713 
714 
715 /*** get_item_info ***/
ndb_get_item_info(ENGINE_HANDLE * handle,const void * cookie,const item * item,item_info * item_info)716 static bool ndb_get_item_info(ENGINE_HANDLE *handle,
717                               const void *cookie,
718                               const item* item,
719                               item_info *item_info)
720 {
721   struct ndb_engine* ndb_eng = ndb_handle(handle);
722   struct default_engine *def_eng = default_handle(ndb_eng);
723 
724   workitem *wqitem = (workitem *)ndb_eng->server.cookie->get_engine_specific(cookie);
725 
726   if(wqitem == NULL) {
727     DEBUG_PRINT_DETAIL(" cache-only");
728     return def_eng->engine.get_item_info(handle, cookie, item, item_info);
729   }
730 
731   if (item_info->nvalue < 1) {
732     DEBUG_PRINT_DETAIL("nvalue too small.");
733     return false;
734   }
735 
736   if(wqitem->base.has_value) {
737     /* Use the workitem. */
738     item_info->cas = wqitem->cas ? *(wqitem->cas) : 0;
739     item_info->exptime = 0;
740     item_info->nbytes = wqitem->value_size;
741     item_info->flags = wqitem->math_flags;
742     item_info->clsid = slabs_clsid(default_handle(ndb_eng), wqitem->value_size);
743     item_info->nkey = wqitem->base.nkey;
744     item_info->nvalue = 1;  /* how many iovecs */
745     item_info->key = wqitem->key;
746     item_info->value[0].iov_base = wqitem->value_ptr;
747     item_info->value[0].iov_len = wqitem->value_size;
748     DEBUG_PRINT_DETAIL("workitem %d.%d [%s].", wqitem->pipeline->id, wqitem->id,
749                        workitem_get_operation(wqitem));
750     return true;
751   }
752   else {
753     /* Use a hash item */
754     hash_item *it = (hash_item*) item;
755     item_info->cas = hash_item_get_cas(it);
756     item_info->exptime = it->exptime;
757     item_info->nbytes = wqitem ? wqitem->value_size : 0;
758     item_info->flags = it->flags;
759     item_info->clsid = it->slabs_clsid;
760     item_info->nkey = it->nkey;
761     item_info->nvalue = 1;
762     item_info->key = hash_item_get_key(it);
763     item_info->value[0].iov_base = hash_item_get_data(it);
764     item_info->value[0].iov_len = item_info->nbytes;
765     if(item_info->nbytes) {
766       DEBUG_PRINT_DETAIL("hash_item [KEY: %.*s][CAS: %llu][nbytes: %d].", it->nkey,
767                           hash_item_get_key(it), item_info->cas, item_info->nbytes);
768     }
769     else {
770       DEBUG_PRINT_DETAIL(" new hash_item");
771     }
772     return true;
773   }
774 }
775 
776 namespace {
777 
778 /*
779   C++ doesn't have C99's designated initializers until C++20, and we need to
780   initialize an union, so create some constructors for config_item as convenience.
781 */
Config_item_string(const char * key,char ** value)782 config_item Config_item_string(const char *key, char **value)
783 {
784   config_item item;
785   item.key= key;
786   item.datatype= DT_STRING;
787   item.value.dt_string= value;
788   return item;
789 }
790 
Config_item_bool(const char * key,bool * value)791 config_item Config_item_bool(const char *key, bool *value)
792 {
793   config_item item;
794   item.key= key;
795   item.datatype= DT_BOOL;
796   item.value.dt_bool= value;
797   return item;
798 }
799 
Config_item_size_t(const char * key,size_t * value)800 config_item Config_item_size_t(const char *key, size_t *value)
801 {
802   config_item item;
803   item.key= key;
804   item.datatype= DT_SIZE;
805   item.value.dt_size= value;
806   return item;
807 }
808 
Config_item_float(const char * key,float * value)809 config_item Config_item_float(const char *key, float *value)
810 {
811   config_item item;
812   item.key= key;
813   item.datatype= DT_FLOAT;
814   item.value.dt_float= value;
815   return item;
816 }
817 
Config_item_configfile(const char * key)818 config_item Config_item_configfile(const char *key)
819 {
820   config_item item;
821   item.key= key;
822   item.datatype= DT_CONFIGFILE;
823   return item;
824 }
825 
826 }  // namespace
827 
828 /* read_cmdline_options requires duplicating code from the default engine.
829    If the default engine supports a new option, you will need to add it here.
830    We create a single config_items structure containing options for both
831    engines.
832 */
read_cmdline_options(struct ndb_engine * ndb,struct default_engine * se,const char * conf)833 void read_cmdline_options(struct ndb_engine *ndb, struct default_engine *se,
834                           const char * conf)
835 {
836   int did_parse;
837   DEBUG_ENTER();
838 
839   did_parse = 0;   /* 0 = success from parse_config() */
840 
841   if (conf != NULL) {
842     struct config_item items[] = {
843       /* NDB OPTIONS */
844       Config_item_string("connectstring", (char **) &(ndb->startup_options.connectstring)),
845       Config_item_string("role", (char **) &(ndb->startup_options.server_role)),
846       Config_item_string("scheduler", (char **) &(ndb->startup_options.scheduler)),
847 #ifdef DEBUG_OUTPUT
848       Config_item_bool("debug", &(ndb->startup_options.debug_enable)),
849       Config_item_bool("detail", &(ndb->startup_options.debug_detail)),
850 #endif
851       Config_item_bool("reconf", &(ndb->startup_options.reconf_enable)),
852 
853       /* DEFAULT ENGINE OPTIONS */
854       Config_item_bool("use_cas", &se->config.use_cas),
855       Config_item_size_t("verbose", &se->config.verbose),
856       Config_item_bool("eviction", &se->config.evict_to_free),
857       Config_item_size_t("cache_size", &se->config.maxbytes),
858       Config_item_bool("preallocate", &se->config.preallocate),
859       Config_item_float("factor", &se->config.factor),
860       Config_item_size_t("chunk_size", &se->config.chunk_size),
861       Config_item_size_t("item_size_max", &se->config.item_size_max),
862       Config_item_configfile("config_file"),
863       { nullptr }
864     };
865 
866     did_parse = se->server.core->parse_config(conf, items, stderr);
867   }
868   switch(did_parse) {
869       case -1:
870         logger->log(LOG_WARNING, NULL,
871                     "Unknown tokens in config string \"%s\"\n", conf);
872         break;
873       case 1:
874         logger->log(LOG_WARNING, NULL,
875                     "Illegal values in config string: \"%s\"\n", conf);
876         break;
877       case 0: /* success */
878         break;
879   }
880 
881   global_max_item_size = se->config.item_size_max;
882 }
883 
884 
fetch_core_settings(struct ndb_engine * engine,struct default_engine * se)885 int fetch_core_settings(struct ndb_engine *engine,
886                          struct default_engine *se) {
887 
888   /* Set up a struct config_item containing the keys we're interested in. */
889   struct config_item items[] = {
890     Config_item_bool("cas_enabled", &engine->server_options.cas_enabled),
891     Config_item_size_t("maxconns", &engine->server_options.maxconns),
892     Config_item_size_t("num_threads", &engine->server_options.nthreads),
893     Config_item_size_t("verbosity", &engine->server_options.verbose),
894     { nullptr }
895   };
896 
897   DEBUG_ENTER();
898 
899   /* This will call "stats settings" and parse the output into the config */
900   return se->server.core->get_config(items);
901 }
902 
903 
stats_menu(ADD_STAT add_stat,const void * cookie)904 ENGINE_ERROR_CODE stats_menu(ADD_STAT add_stat, const void *cookie) {
905   char key[128];
906   char val[128];
907   int klen, vlen;
908 
909   klen = sprintf(key, "ndb");
910   vlen = sprintf(val, "          NDB Engine: NDBAPI statistics");
911   add_stat(key, klen, val, vlen, cookie);
912 
913   klen = sprintf(key, "errors");
914   vlen = sprintf(val, "       NDB Engine: Error message counters");
915   add_stat(key, klen, val, vlen, cookie);
916 
917   klen = sprintf(key, "scheduler");
918   vlen = sprintf(val, "    NDB Engine: Scheduler internal statistics");
919   add_stat(key, klen, val, vlen, cookie);
920 
921   klen = sprintf(key, "reconf");
922   vlen = sprintf(val, "       NDB Engine: Current configuration version");
923   add_stat(key, klen, val, vlen, cookie);
924 
925   klen = sprintf(key, "settings");
926   vlen = sprintf(val, "     Server core: configurable settings");
927   add_stat(key, klen, val, vlen, cookie);
928 
929   klen = sprintf(key, "reset");
930   vlen = sprintf(val, "        Server core: reset counters");
931   add_stat(key, klen, val, vlen, cookie);
932 
933   klen = sprintf(key, "detail");
934   vlen = sprintf(val, "       Server core: use stats detail on|off|dump");
935   add_stat(key, klen, val, vlen, cookie);
936 
937   klen = sprintf(key, "aggregate");
938   vlen = sprintf(val, "    Server core: aggregated");
939   add_stat(key, klen, val, vlen, cookie);
940 
941   klen = sprintf(key, "slabs");
942   vlen = sprintf(val, "        Cache Engine: allocator");
943   add_stat(key, klen, val, vlen, cookie);
944 
945   klen = sprintf(key, "items");
946   vlen = sprintf(val, "        Cache Engine: itemes cached");
947   add_stat(key, klen, val, vlen, cookie);
948 
949   klen = sprintf(key, "sizes");
950   vlen = sprintf(val, "        Cache Engine: items per allocation class");
951   add_stat(key, klen, val, vlen, cookie);
952 
953   klen = sprintf(key, "vbucket");
954   vlen = sprintf(val, "      Cache Engine: dump vbucket table");
955   add_stat(key, klen, val, vlen, cookie);
956 
957   klen = sprintf(key, "scrub");
958   vlen = sprintf(val, "        Cache Engine: scrubber status");
959   add_stat(key, klen, val, vlen, cookie);
960 
961   return ENGINE_SUCCESS;
962 }
963 
964