1 /*
2  Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
3  reserved.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License, version 2.0,
7  as published by the Free Software Foundation.
8 
9  This program is also distributed with certain software (including
10  but not limited to OpenSSL) that is licensed under separate terms,
11  as designated in a particular file or component or in included license
12  documentation.  The authors of MySQL hereby grant you an additional
13  permission to link the program and your derivative works with the
14  separately licensed software that they have included with MySQL.
15 
16  This program is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  GNU General Public License, version 2.0, for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with this program; if not, write to the Free Software
23  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24  02110-1301  USA
25  */
26 #include <my_config.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <assert.h>
30 #include <pthread.h>
31 
32 #include "config.h"
33 
34 #include "default_engine.h"
35 
36 #include <memcached/util.h>
37 #include <memcached/config_parser.h>
38 #include <memcached/extension.h>
39 #include <memcached/extension_loggers.h>
40 
41 #include "ndb_engine.h"
42 #include "ndb_configuration.h"
43 #include "workitem.h"
44 #include "ndb_engine_private.h"
45 #include "debug.h"
46 #include "Scheduler.h"
47 #include "thread_identifier.h"
48 #include "timing.h"
49 #include "ndb_error_logger.h"
50 
51 /* Global variables */
52 EXTENSION_LOGGER_DESCRIPTOR *logger;
53 
54 /* Static and local to this file */
55 const char * set_ops[] = { "","add","set","replace","append","prepend","cas" };
56 
57 
ndb_handle(ENGINE_HANDLE * handle)58 static inline struct ndb_engine* ndb_handle(ENGINE_HANDLE* handle)
59 {
60   return (struct ndb_engine*) handle;
61 }
62 
default_handle(struct ndb_engine * eng)63 static inline struct default_engine* default_handle(struct ndb_engine *eng)
64 {
65   return (struct default_engine*) eng->m_default_engine;
66 }
67 
68 
69 /*********** PRIVATE UTILITY FUNCTIONS BEGIN HERE ***********************/
70 
get_my_pipeline_config(struct ndb_engine * eng)71 ndb_pipeline * get_my_pipeline_config(struct ndb_engine *eng) {
72   const thread_identifier * thread_id;
73 
74   /* Try to fetch the pipeline from the thread identity */
75   thread_id = get_thread_id();
76   if(thread_id) {
77     return thread_id->pipeline;
78   }
79   else {
80     /* On the first call from each thread, initialize the pipeline */
81     return ndb_pipeline_initialize(eng);
82   }
83 }
84 
85 
86 /*********** FUNCTIONS IMPLEMENTING THE PUBLISHED API BEGIN HERE ********/
87 
create_instance(uint64_t interface,GET_SERVER_API get_server_api,ENGINE_HANDLE ** handle)88 ENGINE_ERROR_CODE create_instance(uint64_t interface,
89                                   GET_SERVER_API get_server_api,
90                                   ENGINE_HANDLE **handle ) {
91 
92   struct ndb_engine *ndb_eng;
93   const char * env_connectstring;
94   ENGINE_ERROR_CODE return_status;
95 
96   SERVER_HANDLE_V1 *api = get_server_api();
97   if (interface != 1 || api == NULL) {
98     return ENGINE_ENOTSUP;
99   }
100 
101   ndb_eng = malloc(sizeof(struct ndb_engine));
102   if(ndb_eng == NULL) {
103     return ENGINE_ENOMEM;
104   }
105 
106   logger = api->extension->get_extension(EXTENSION_LOGGER);
107 
108   ndb_eng->npipelines = 0;
109   ndb_eng->connected  = false;
110 
111   ndb_eng->engine.interface.interface = 1;
112   ndb_eng->engine.get_info         = ndb_get_info;
113   ndb_eng->engine.initialize       = ndb_initialize;
114   ndb_eng->engine.destroy          = ndb_destroy;
115   ndb_eng->engine.allocate         = ndb_allocate;
116   ndb_eng->engine.remove           = ndb_remove;
117   ndb_eng->engine.release          = ndb_release;
118   ndb_eng->engine.get              = ndb_get;
119   ndb_eng->engine.get_stats        = ndb_get_stats;
120   ndb_eng->engine.reset_stats      = ndb_reset_stats;
121   ndb_eng->engine.store            = ndb_store;
122   ndb_eng->engine.arithmetic       = ndb_arithmetic;
123   ndb_eng->engine.flush            = ndb_flush;
124   ndb_eng->engine.unknown_command  = ndb_unknown_command;
125   ndb_eng->engine.item_set_cas     = item_set_cas;           /* reused */
126   ndb_eng->engine.get_item_info    = ndb_get_item_info;
127   ndb_eng->engine.get_stats_struct = NULL;
128   ndb_eng->engine.aggregate_stats  = NULL;
129   ndb_eng->engine.tap_notify       = NULL;
130   ndb_eng->engine.get_tap_iterator = NULL;
131   ndb_eng->engine.errinfo          = NULL;
132 
133   ndb_eng->server = *api;
134   ndb_eng->get_server_api = get_server_api;
135 
136   /* configuration, with default values*/
137   ndb_eng->startup_options.connectstring = "localhost:1186";
138   ndb_eng->startup_options.server_role   = "default_role";
139   ndb_eng->startup_options.scheduler     = 0;
140   ndb_eng->startup_options.debug_enable  = false;
141   ndb_eng->startup_options.debug_detail  = false;
142   ndb_eng->startup_options.reconf_enable = true;
143 
144   /* Now let NDB_CONNECTSRING environment variable override the default */
145   env_connectstring = getenv("NDB_CONNECTSTRING");
146   if(env_connectstring)
147     ndb_eng->startup_options.connectstring = env_connectstring;
148 
149   /* Set engine informational structure */
150   ndb_eng->info.info.description = "NDB Memcache " VERSION;
151   ndb_eng->info.info.num_features = 3;
152   ndb_eng->info.info.features[0].feature = ENGINE_FEATURE_CAS;
153   ndb_eng->info.info.features[0].description = NULL;
154   ndb_eng->info.info.features[1].feature = ENGINE_FEATURE_PERSISTENT_STORAGE;
155   ndb_eng->info.info.features[1].description = NULL;
156   ndb_eng->info.info.features[2].feature = ENGINE_FEATURE_LRU;
157   ndb_eng->info.info.features[2].description = NULL;
158 
159   /* Now call create_instace() for the default engine */
160   return_status = default_engine_create_instance(interface, get_server_api,
161                                                  & (ndb_eng->m_default_engine));
162 
163   if(return_status == ENGINE_SUCCESS)
164     *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
165 
166   return return_status;
167 }
168 
169 
170 /*** get_info ***/
ndb_get_info(ENGINE_HANDLE * handle)171 static const engine_info* ndb_get_info(ENGINE_HANDLE* handle)
172 {
173   return & ndb_handle(handle)->info.info;
174 }
175 
176 
177 /*** initialize ***/
ndb_initialize(ENGINE_HANDLE * handle,const char * config_str)178 static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle,
179                                         const char* config_str)
180 {
181   int i, nthreads, debug_level;
182   time_point_t pump_time = 0;
183   ENGINE_ERROR_CODE return_status;
184   struct ndb_engine *ndb_eng = ndb_handle(handle);
185   struct default_engine *def_eng = default_handle(ndb_eng);
186   scheduler_options sched_opts;
187 
188   /* Process options for both the ndb engine and the default engine:  */
189   read_cmdline_options(ndb_eng, def_eng, config_str);
190 
191   /* Initalize the debug library */
192   if(ndb_eng->startup_options.debug_detail)
193     debug_level = 2;
194   else if(ndb_eng->startup_options.debug_enable)
195     debug_level = 1;
196   else debug_level = 0;
197   DEBUG_INIT(NULL, debug_level);
198   DEBUG_ENTER();
199 
200   /* Connect to the Primary cluster */
201   if(!(connect_to_primary_cluster(ndb_eng->startup_options.connectstring,
202                                   ndb_eng->startup_options.server_role))) {
203      logger->log(LOG_WARNING, 0, "Could not connect to NDB.  Shutting down.\n");
204      return ENGINE_FAILED;
205   }
206   ndb_eng->connected = true;
207 
208   /* Read configuration */
209   if(!(get_config())) {
210      logger->log(LOG_WARNING, 0, "Failed to read configuration -- shutting down.\n"
211                  "(Did you run ndb_memcache_metadata.sql?)\n");
212      return ENGINE_FAILED;
213   }
214 
215   /* Connect to additional clusters */
216   if(! open_connections_to_all_clusters()) {
217     logger->log(LOG_WARNING, 0, "open_connections_to_all_clusters() failed \n");
218    return ENGINE_FAILED;
219   }
220 
221   /* Initialize Thread-Specific Storage */
222   initialize_thread_id_key();
223 
224   /* Fetch some settings from the memcached core */
225   fetch_core_settings(ndb_eng, def_eng);
226   nthreads = ndb_eng->server_options.nthreads;
227 
228   /* Initialize the error handler */
229   ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
230 
231   logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
232   logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
233   timing_point(& pump_time);
234 
235   /* prefetch data dictionary objects */
236   prefetch_dictionary_objects();
237 
238   /* Build the scheduler options structure */
239   sched_opts.nthreads = ndb_eng->server_options.nthreads;
240   sched_opts.max_clients = ndb_eng->server_options.maxconns;
241 
242   /* Allocate and initailize the pipelines, and their schedulers.
243      This will take some time; each pipeline creates slab and pool allocators,
244      and each scheduler may preallocate a large number of Ndb objects and
245      transactions, requiring multiple round trips to the data nodes.  We
246      do this now to avoid the latency cost of setting up those objects at
247      runtime.
248      When the first request comes in, the pipeline, scheduler, and thread
249      will all get stitched together.
250   */
251   ndb_eng->pipelines  = malloc(nthreads * sizeof(void *));
252   for(i = 0 ; i < nthreads ; i++) {
253     ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng);
254     if(! scheduler_initialize(ndb_eng->pipelines[i], & sched_opts)) {
255       logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
256                   ndb_eng->startup_options.scheduler);
257       abort();
258     }
259   }
260 
261   logger->log(LOG_WARNING, NULL, "done [%5.3f sec].\n",
262               (double) (timing_point(& pump_time) / (double) 1000000000));
263 
264   /* Now initialize the default engine with no options (it already has them) */
265   return_status = def_eng->engine.initialize(ndb_eng->m_default_engine, "");
266 
267   if(return_status == ENGINE_SUCCESS) {
268     set_initial_cas_ids(& ndb_eng->cas_hi, & ndb_eng->cas_lo);
269   }
270 
271   print_debug_startup_info();
272 
273   /* Listen for reconfiguration signals */
274   if(ndb_eng->startup_options.reconf_enable) {
275     start_reconfig_listener(ndb_eng->pipelines[0]);
276   }
277 
278   return return_status;
279 }
280 
281 
282 /*** destroy ***/
ndb_destroy(ENGINE_HANDLE * handle,bool force)283 static void ndb_destroy(ENGINE_HANDLE* handle, bool force)
284 {
285   struct ndb_engine* ndb_eng;
286   struct default_engine *def_eng;
287   DEBUG_ENTER();
288 
289   ndb_eng = ndb_handle(handle);
290   def_eng = default_handle(ndb_eng);
291 
292   // TODO: Shutdown just the Scheduler Global
293   for(atomic_int32_t i = 0 ; i < ndb_eng->npipelines; i ++) {
294     ndb_pipeline *p = ndb_eng->pipelines[i];
295     if(p) {
296       scheduler_shutdown(p);
297       ndb_pipeline_free(p);
298     }
299   }
300 
301   disconnect_all();
302   def_eng->engine.destroy(ndb_eng->m_default_engine, force);
303 }
304 
305 
306 /* CALL FLOWS
307    ----------
308    GET:       eng.get(), eng.get_item_info()*, eng.release()*
309    DELETE:    eng.remove()
310    SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(),
311                  eng.store(), eng.release()*
312    INCR:      eng.arithmetic()
313    FLUSH:     eng.flush()
314 
315    * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK)
316 */
317 
318 
319 
320 /*** Release scheduler resources and free workitem ****/
release_and_free(workitem * wqitem)321 void release_and_free(workitem *wqitem) {
322   DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
323   scheduler_release(wqitem->pipeline, wqitem);
324   workitem_free(wqitem);
325 }
326 
327 
328 /*** allocate ***/
329 
330 /* Allocate gets a struct item from the slab allocator, and fills in
331    everything but the value.  It seems like we can just pass this on to
332    the default engine; we'll intercept it later in store().
333    This is also called directly from finalize_read() in the commit thread.
334 */
ndb_allocate(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const size_t nkey,const size_t nbytes,const int flags,const rel_time_t exptime)335 static ENGINE_ERROR_CODE ndb_allocate(ENGINE_HANDLE* handle,
336                                            const void* cookie,
337                                            item **item,
338                                            const void* key,
339                                            const size_t nkey,
340                                            const size_t nbytes,
341                                            const int flags,
342                                            const rel_time_t exptime)
343 {
344   struct ndb_engine* ndb_eng;
345   struct default_engine *def_eng;
346   DEBUG_ENTER_DETAIL();
347 
348   ndb_eng = ndb_handle(handle);
349   def_eng = default_handle(ndb_eng);
350 
351   return def_eng->engine.allocate(ndb_eng->m_default_engine, cookie, item,
352                                   key, nkey, nbytes, flags, exptime);
353 }
354 
355 
356 /*** remove ***/
ndb_remove(ENGINE_HANDLE * handle,const void * cookie,const void * key,const size_t nkey,uint64_t cas,uint16_t vbucket)357 static ENGINE_ERROR_CODE ndb_remove(ENGINE_HANDLE* handle,
358                                          const void* cookie,
359                                          const void* key,
360                                          const size_t nkey,
361                                          uint64_t cas,
362                                          uint16_t vbucket  __attribute__((unused)))
363 {
364   struct ndb_engine* ndb_eng = ndb_handle(handle);
365   struct default_engine *def_eng = default_handle(ndb_eng);
366   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
367   ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
368   prefix_info_t prefix;
369   workitem *wqitem;
370 
371   /* Is this a callback after completed I/O? */
372   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
373   if(wqitem) {
374     DEBUG_PRINT_DETAIL("Got callback: %s", wqitem->status->comment);
375     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop
376     release_and_free(wqitem);
377     return wqitem->status->status;
378   }
379 
380   prefix = get_prefix_info_for_key(nkey, key);
381   DEBUG_PRINT_DETAIL("prefix: %d", prefix.prefix_id);
382 
383   /* DELETE.
384      You should attempt the cache delete, regardless of whether the database
385      delete succeeds or fails.  So, we simply perform the cache delete first,
386      here, and then queue the database delete.
387   */
388 
389   if(prefix.do_mc_delete) {                         /* Cache Delete */
390     hash_item *it = item_get(def_eng, key, nkey);
391     return_status = ENGINE_KEY_ENOENT;
392     if (it != NULL) {
393       // ACTUALLY NO???
394       /* In the binary protocol there is such a thing as a CAS delete.
395          This is the CAS check.  If we will also be deleting from the database,
396          there are two possibilities:
397           1: The CAS matches; perform the delete.
398           2: The CAS doesn't match; delete the item because it's stale.
399          Therefore we skip the check altogether if(do_db_delete).
400       */
401       if(! prefix.do_db_delete)
402         if(cas && (cas != item_get_cas(it)))
403           return ENGINE_KEY_EEXISTS;
404 
405       item_unlink(def_eng, it);
406       item_release(def_eng, it);
407       return_status  = ENGINE_SUCCESS;
408     }
409   }
410 
411   if(prefix.do_db_delete) {                        /* Database Delete */
412     wqitem = new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, key, & cas);
413     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
414     return_status = scheduler_schedule(pipeline, wqitem);
415     if(return_status != ENGINE_EWOULDBLOCK) {
416       release_and_free(wqitem);
417     }
418   }
419 
420   return return_status;
421 }
422 
423 
424 /*** release ***/
ndb_release(ENGINE_HANDLE * handle,const void * cookie,item * item)425 static void ndb_release(ENGINE_HANDLE* handle, const void *cookie,
426                         item* item)
427 {
428   struct ndb_engine* ndb_eng = ndb_handle(handle);
429   struct default_engine *def_eng = default_handle(ndb_eng);
430 
431   workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
432   if(wqitem) {
433     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
434     release_and_free(wqitem);
435   }
436 
437   if(item && (item != wqitem)) {
438     DEBUG_PRINT_DETAIL("Releasing a hash item.");
439     item_release(def_eng, (hash_item *) item);
440   }
441 }
442 
443 
444 /*** get ***/
ndb_get(ENGINE_HANDLE * handle,const void * cookie,item ** item,const void * key,const int nkey,uint16_t vbucket)445 static ENGINE_ERROR_CODE ndb_get(ENGINE_HANDLE* handle,
446                                  const void* cookie,
447                                  item** item,
448                                  const void* key,
449                                  const int nkey,
450                                  uint16_t vbucket __attribute__((unused)))
451 {
452   struct ndb_engine* ndb_eng = ndb_handle(handle);
453   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
454   struct workitem *wqitem;
455   prefix_info_t prefix;
456   ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
457 
458   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
459 
460   /* Is this a callback after completed I/O? */
461   if(wqitem && ! wqitem->base.complete) {
462     DEBUG_PRINT_DETAIL("Got read callback on workitem %d.%d: %s",
463                 wqitem->pipeline->id, wqitem->id, wqitem->status->comment);
464     *item = wqitem->cache_item;
465     wqitem->base.complete = 1;
466     return_status = wqitem->status->status;
467 
468     /* On success the workitem will be read in ndb_get_item_info, then released.
469        Otherwise: */
470     if(return_status != ENGINE_SUCCESS) {
471       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
472       release_and_free(wqitem);
473     }
474 
475     return return_status;
476   }
477 
478   prefix = get_prefix_info_for_key(nkey, key);
479 
480   /* Cache read */
481   /* FIXME: Use the public APIs */
482   if(prefix.do_mc_read) {
483     *item = item_get(default_handle(ndb_eng), key, nkey);
484     if (*item != NULL) {
485       DEBUG_PRINT(" cache hit");
486       return ENGINE_SUCCESS;
487     }
488     DEBUG_PRINT(" cache miss");
489   }
490 
491   /* Build and send the NDB transaction */
492   if(prefix.do_db_read) {
493     wqitem = new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, key);
494     DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
495     return_status = scheduler_schedule(pipeline, wqitem);
496     if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
497       /* On error we must pop and free */
498       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
499       release_and_free(wqitem);
500     }
501   }
502 
503   return return_status;
504 }
505 
506 
507 /*** get_stats ***/
ndb_get_stats(ENGINE_HANDLE * handle,const void * cookie,const char * stat_key,int nkey,ADD_STAT add_stat)508 static ENGINE_ERROR_CODE ndb_get_stats(ENGINE_HANDLE* handle,
509                                        const void *cookie,
510                                        const char *stat_key,
511                                        int nkey,
512                                        ADD_STAT add_stat)
513 {
514   struct ndb_engine* ndb_eng = ndb_handle(handle);
515   struct default_engine *def_eng = default_handle(ndb_eng);
516   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
517 
518   DEBUG_ENTER_DETAIL();
519 
520   if(stat_key) {
521     if(strncasecmp(stat_key, "menu", 4) == 0)
522       return stats_menu(add_stat, cookie);
523 
524    if((strncasecmp(stat_key, "ndb", 3) == 0)       ||
525        (strncasecmp(stat_key, "scheduler", 9) == 0) ||
526        (strncasecmp(stat_key, "reconf", 6) == 0)    ||
527        (strncasecmp(stat_key, "errors", 6) == 0))
528     {
529       /* NDB Engine stats */
530       pipeline_add_stats(pipeline, stat_key, add_stat, cookie);
531       return ENGINE_SUCCESS;
532     }
533   }
534 
535   /* Default engine stats */
536   return def_eng->engine.get_stats(ndb_eng->m_default_engine, cookie,
537                                    stat_key, nkey, add_stat);
538 }
539 
540 
541 /*** reset_stats ***/
ndb_reset_stats(ENGINE_HANDLE * handle,const void * cookie)542 static void ndb_reset_stats(ENGINE_HANDLE* handle,
543                             const void *cookie)
544 {
545   struct ndb_engine* ndb_eng = ndb_handle(handle);
546   struct default_engine *def_eng = default_handle(ndb_eng);
547   /* ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); */
548   /* DEBUG_ENTER(); */
549   def_eng->engine.reset_stats(ndb_eng->m_default_engine, cookie);
550 }
551 
552 
553 /*** store ***/
554 
ndb_store(ENGINE_HANDLE * handle,const void * cookie,item * item,uint64_t * cas,ENGINE_STORE_OPERATION op,uint16_t vbucket)555 static ENGINE_ERROR_CODE ndb_store(ENGINE_HANDLE* handle,
556                                    const void *cookie,
557                                    item* item,
558                                    uint64_t *cas,
559                                    ENGINE_STORE_OPERATION op,
560                                    uint16_t vbucket  __attribute__((unused)))
561 {
562   struct ndb_engine* ndb_eng = ndb_handle(handle);
563   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
564   ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED;
565   prefix_info_t prefix;
566 
567   /* Is this a callback after completed I/O? */
568   workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
569   if(wqitem) {
570     DEBUG_PRINT_DETAIL("Got callback on workitem %d.%d: %s",
571                 pipeline->id, wqitem->id, wqitem->status->comment);
572     return wqitem->status->status;
573   }
574 
575   prefix = get_prefix_info_for_key(hash_item_get_key_len(item),
576                                    hash_item_get_key(item));
577 
578 
579   /* Build and send the NDB transaction.
580      If there is also a cache operation, it must be deferred until we know
581      whether the database operation has succeeded or failed.
582   */
583   if(prefix.do_db_write) {
584     wqitem = new_workitem_for_store_op(pipeline, op, prefix, cookie, item, cas);
585     DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d  --  creating workitem %d.%d",
586                 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
587                 prefix.do_mc_write, prefix.do_db_write,
588                 pipeline->id, wqitem->id);
589     return_status = scheduler_schedule(pipeline, wqitem);
590     if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
591       ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
592       release_and_free(wqitem);
593     }
594   }
595   else if(prefix.do_mc_write) {
596     DEBUG_PRINT("[%s] prefix %d; CAS %llu; use mc/db: %d/%d --  cache-only store.",
597                 set_ops[op], prefix.prefix_id, cas ? *cas : 0,
598                 prefix.do_mc_write, prefix.do_db_write);
599     return_status = store_item(default_handle(ndb_eng), item, cas, op, cookie);
600   }
601 
602   return return_status;
603 }
604 
605 
606 /*** arithmetic ***/
ndb_arithmetic(ENGINE_HANDLE * handle,const void * cookie,const void * key,const int nkey,const bool increment,const bool create,const uint64_t delta,const uint64_t initial,const rel_time_t exptime,uint64_t * cas,uint64_t * result,uint16_t vbucket)607 static ENGINE_ERROR_CODE ndb_arithmetic(ENGINE_HANDLE* handle,
608                                         const void* cookie,
609                                         const void* key,
610                                         const int nkey,
611                                         const bool increment,
612                                         const bool create,
613                                         const uint64_t delta,
614                                         const uint64_t initial,
615                                         const rel_time_t exptime,
616                                         uint64_t *cas,
617                                         uint64_t *result,
618                                         uint16_t vbucket)
619 {
620   struct ndb_engine* ndb_eng = ndb_handle(handle);
621   struct default_engine *def_eng = default_handle(ndb_eng);
622   ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
623   struct workitem *wqitem;
624   prefix_info_t prefix;
625   ENGINE_ERROR_CODE return_status;
626 
627   /* Is this a callback after completed I/O? */
628   wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
629   if(wqitem && ! wqitem->base.complete) {
630     DEBUG_PRINT_DETAIL("Got arithmetic callback: %s", wqitem->status->comment);
631     return_status = wqitem->status->status;
632     wqitem->base.complete = 1;
633     *result = wqitem->math_value;
634     /* There will be no call to release(), so pop and free now. */
635     ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
636     release_and_free(wqitem);
637     return return_status;
638   }
639 
640   prefix = get_prefix_info_for_key(nkey, key);
641   DEBUG_PRINT("prefix: %d   delta: %d  create: %d   initial: %d ",
642                      prefix.prefix_id, (int) delta, (int) create, (int) initial);
643 
644   /* For cache-only prefixes, forward this to the default engine */
645   if(! prefix.use_ndb) {
646     return def_eng->engine.arithmetic(ndb_eng->m_default_engine, cookie,
647                                       key, nkey, increment, create,
648                                       delta, initial, exptime, cas,
649                                       result, vbucket);
650   }
651 
652   /* A math operation contains both a read and a write. */
653   if(! (prefix.has_math_col && prefix.do_db_read && prefix.do_db_write)) {
654     logger->log(LOG_WARNING, 0, "NDB INCR/DECR is not allowed for this key.\n");
655     DEBUG_PRINT("REJECTED : %d %d %d", (int) prefix.has_math_col,
656                        (int) prefix.do_db_read , (int)  prefix.do_db_write);
657     return ENGINE_NOT_STORED;
658   }
659 
660   wqitem = new_workitem_for_arithmetic(pipeline, prefix, cookie, key, nkey,
661                                        increment, create, delta, initial, cas);
662   DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
663 
664   return_status = scheduler_schedule(pipeline, wqitem);
665 
666   if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS)))
667     release_and_free(wqitem);
668 
669   return return_status;
670 }
671 
672 /*** flush ***/
ndb_flush(ENGINE_HANDLE * handle,const void * cookie,time_t when)673 static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
674                                    const void* cookie, time_t when)
675 {
676 /*
677    Notes on flush:
678    The server will call *only* into ndb_flush (not to allocate or release).
679    The NDB engine ignores the "when" parameter.
680    Flush operations have special handling, outside of the scheduler.
681    They are performed synchronously.
682    And we always send the flush command to the cache engine.
683 */
684   struct ndb_engine* ndb_eng;
685   struct default_engine *def_eng;
686   ndb_pipeline *pipeline;
687   DEBUG_ENTER();
688 
689   ndb_eng = ndb_handle(handle);
690   def_eng = default_handle(ndb_eng);
691   pipeline = get_my_pipeline_config(ndb_eng);
692 
693   (void) def_eng->engine.flush(ndb_eng->m_default_engine, cookie, when);
694   // TODO: Why not return ENGINE_EWOULDBLOCK first?
695   return pipeline_flush_all(pipeline);
696 }
697 
698 
699 /*** unknown_command ***/
ndb_unknown_command(ENGINE_HANDLE * handle,const void * cookie,protocol_binary_request_header * request,ADD_RESPONSE response)700 static ENGINE_ERROR_CODE ndb_unknown_command(ENGINE_HANDLE* handle,
701                                              const void* cookie,
702                                              protocol_binary_request_header *request,
703                                              ADD_RESPONSE response)
704 {
705   struct ndb_engine* ndb_eng;
706   struct default_engine *def_eng;
707   DEBUG_ENTER();
708 
709   ndb_eng = ndb_handle(handle);
710   def_eng = default_handle(ndb_eng);
711 
712   return def_eng->engine.unknown_command(ndb_eng->m_default_engine, cookie,
713                                          request, response);
714 }
715 
716 
717 /*** get_item_info ***/
ndb_get_item_info(ENGINE_HANDLE * handle,const void * cookie,const item * item,item_info * item_info)718 static bool ndb_get_item_info(ENGINE_HANDLE *handle,
719                               const void *cookie,
720                               const item* item,
721                               item_info *item_info)
722 {
723   struct ndb_engine* ndb_eng = ndb_handle(handle);
724   struct default_engine *def_eng = default_handle(ndb_eng);
725 
726   workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
727 
728   if(wqitem == NULL) {
729     DEBUG_PRINT_DETAIL(" cache-only");
730     return def_eng->engine.get_item_info(handle, cookie, item, item_info);
731   }
732 
733   if (item_info->nvalue < 1) {
734     DEBUG_PRINT_DETAIL("nvalue too small.");
735     return false;
736   }
737 
738   if(wqitem->base.has_value) {
739     /* Use the workitem. */
740     item_info->cas = wqitem->cas ? *(wqitem->cas) : 0;
741     item_info->exptime = 0;
742     item_info->nbytes = wqitem->value_size;
743     item_info->flags = wqitem->math_flags;
744     item_info->clsid = slabs_clsid(default_handle(ndb_eng), wqitem->value_size);
745     item_info->nkey = wqitem->base.nkey;
746     item_info->nvalue = 1;  /* how many iovecs */
747     item_info->key = wqitem->key;
748     item_info->value[0].iov_base = wqitem->value_ptr;
749     item_info->value[0].iov_len = wqitem->value_size;
750     DEBUG_PRINT_DETAIL("workitem %d.%d [%s].", wqitem->pipeline->id, wqitem->id,
751                        workitem_get_operation(wqitem));
752     return true;
753   }
754   else {
755     /* Use a hash item */
756     hash_item *it = (hash_item*) item;
757     item_info->cas = hash_item_get_cas(it);
758     item_info->exptime = it->exptime;
759     item_info->nbytes = wqitem ? wqitem->value_size : 0;
760     item_info->flags = it->flags;
761     item_info->clsid = it->slabs_clsid;
762     item_info->nkey = it->nkey;
763     item_info->nvalue = 1;
764     item_info->key = hash_item_get_key(it);
765     item_info->value[0].iov_base = hash_item_get_data(it);
766     item_info->value[0].iov_len = item_info->nbytes;
767     if(item_info->nbytes) {
768       DEBUG_PRINT_DETAIL("hash_item [KEY: %.*s][CAS: %llu][nbytes: %d].", it->nkey,
769                           hash_item_get_key(it), item_info->cas, item_info->nbytes);
770     }
771     else {
772       DEBUG_PRINT_DETAIL(" new hash_item");
773     }
774     return true;
775   }
776 }
777 
778 
779 /* read_cmdline_options requires duplicating code from the default engine.
780    If the default engine supports a new option, you will need to add it here.
781    We create a single config_items structure containing options for both
782    engines.
783 */
read_cmdline_options(struct ndb_engine * ndb,struct default_engine * se,const char * conf)784 void read_cmdline_options(struct ndb_engine *ndb, struct default_engine *se,
785                           const char * conf)
786 {
787   int did_parse;
788   DEBUG_ENTER();
789 
790   did_parse = 0;   /* 0 = success from parse_config() */
791 
792   if (conf != NULL) {
793     struct config_item items[] = {
794       /* NDB OPTIONS */
795       { .key = "connectstring",
796         .datatype = DT_STRING,
797         .value.dt_string = (char **) &(ndb->startup_options.connectstring) },
798       { .key = "role",
799         .datatype = DT_STRING,
800         .value.dt_string = (char **) &(ndb->startup_options.server_role) },
801       { .key = "scheduler",
802         .datatype = DT_STRING,
803         .value.dt_string = (char **) &(ndb->startup_options.scheduler) },
804 #ifdef DEBUG_OUTPUT
805       { .key = "debug",
806         .datatype = DT_BOOL,
807         .value.dt_bool = &(ndb->startup_options.debug_enable) },
808       { .key = "detail",
809         .datatype = DT_BOOL,
810         .value.dt_bool = &(ndb->startup_options.debug_detail) },
811 #endif
812       { .key = "reconf",
813         .datatype = DT_BOOL,
814         .value.dt_bool = &(ndb->startup_options.reconf_enable) },
815 
816       /* DEFAULT ENGINE OPTIONS */
817       { .key = "use_cas",
818         .datatype = DT_BOOL,
819         .value.dt_bool = &se->config.use_cas },
820       { .key = "verbose",
821         .datatype = DT_SIZE,
822         .value.dt_size = &se->config.verbose },
823       { .key = "eviction",
824         .datatype = DT_BOOL,
825         .value.dt_bool = &se->config.evict_to_free },
826       { .key = "cache_size",
827         .datatype = DT_SIZE,
828         .value.dt_size = &se->config.maxbytes },
829       { .key = "preallocate",
830         .datatype = DT_BOOL,
831         .value.dt_bool = &se->config.preallocate },
832       { .key = "factor",
833         .datatype = DT_FLOAT,
834         .value.dt_float = &se->config.factor },
835       { .key = "chunk_size",
836         .datatype = DT_SIZE,
837         .value.dt_size = &se->config.chunk_size },
838       { .key = "item_size_max",
839         .datatype = DT_SIZE,
840         .value.dt_size = &se->config.item_size_max },
841       { .key = "config_file",
842         .datatype = DT_CONFIGFILE },
843       { .key = NULL}
844     };
845 
846     did_parse = se->server.core->parse_config(conf, items, stderr);
847   }
848   switch(did_parse) {
849       case -1:
850         logger->log(LOG_WARNING, NULL,
851                     "Unknown tokens in config string \"%s\"\n", conf);
852         break;
853       case 1:
854         logger->log(LOG_WARNING, NULL,
855                     "Illegal values in config string: \"%s\"\n", conf);
856         break;
857       case 0: /* success */
858         break;
859   }
860 
861   global_max_item_size = se->config.item_size_max;
862 }
863 
864 
fetch_core_settings(struct ndb_engine * engine,struct default_engine * se)865 int fetch_core_settings(struct ndb_engine *engine,
866                          struct default_engine *se) {
867 
868   /* Set up a struct config_item containing the keys we're interested in. */
869   struct config_item items[] = {
870     { .key = "cas_enabled",
871       .datatype = DT_BOOL,
872       .value.dt_bool = &engine->server_options.cas_enabled },
873     { .key = "maxconns",
874       .datatype = DT_SIZE,
875       .value.dt_size = &engine->server_options.maxconns },
876     { .key = "num_threads",
877       .datatype = DT_SIZE,
878       .value.dt_size = &engine->server_options.nthreads },
879     { .key = "verbosity",
880       .datatype = DT_SIZE,
881       .value.dt_size = &engine->server_options.verbose },
882     { .key = NULL }
883   };
884 
885   DEBUG_ENTER();
886 
887   /* This will call "stats settings" and parse the output into the config */
888   return se->server.core->get_config(items);
889 }
890 
891 
stats_menu(ADD_STAT add_stat,const void * cookie)892 ENGINE_ERROR_CODE stats_menu(ADD_STAT add_stat, const void *cookie) {
893   char key[128];
894   char val[128];
895   int klen, vlen;
896 
897   klen = sprintf(key, "ndb");
898   vlen = sprintf(val, "          NDB Engine: NDBAPI statistics");
899   add_stat(key, klen, val, vlen, cookie);
900 
901   klen = sprintf(key, "errors");
902   vlen = sprintf(val, "       NDB Engine: Error message counters");
903   add_stat(key, klen, val, vlen, cookie);
904 
905   klen = sprintf(key, "scheduler");
906   vlen = sprintf(val, "    NDB Engine: Scheduler internal statistics");
907   add_stat(key, klen, val, vlen, cookie);
908 
909   klen = sprintf(key, "reconf");
910   vlen = sprintf(val, "       NDB Engine: Current configuration version");
911   add_stat(key, klen, val, vlen, cookie);
912 
913   klen = sprintf(key, "settings");
914   vlen = sprintf(val, "     Server core: configurable settings");
915   add_stat(key, klen, val, vlen, cookie);
916 
917   klen = sprintf(key, "reset");
918   vlen = sprintf(val, "        Server core: reset counters");
919   add_stat(key, klen, val, vlen, cookie);
920 
921   klen = sprintf(key, "detail");
922   vlen = sprintf(val, "       Server core: use stats detail on|off|dump");
923   add_stat(key, klen, val, vlen, cookie);
924 
925   klen = sprintf(key, "aggregate");
926   vlen = sprintf(val, "    Server core: aggregated");
927   add_stat(key, klen, val, vlen, cookie);
928 
929   klen = sprintf(key, "slabs");
930   vlen = sprintf(val, "        Cache Engine: allocator");
931   add_stat(key, klen, val, vlen, cookie);
932 
933   klen = sprintf(key, "items");
934   vlen = sprintf(val, "        Cache Engine: itemes cached");
935   add_stat(key, klen, val, vlen, cookie);
936 
937   klen = sprintf(key, "sizes");
938   vlen = sprintf(val, "        Cache Engine: items per allocation class");
939   add_stat(key, klen, val, vlen, cookie);
940 
941   klen = sprintf(key, "vbucket");
942   vlen = sprintf(val, "      Cache Engine: dump vbucket table");
943   add_stat(key, klen, val, vlen, cookie);
944 
945   klen = sprintf(key, "scrub");
946   vlen = sprintf(val, "        Cache Engine: scrubber status");
947   add_stat(key, klen, val, vlen, cookie);
948 
949   return ENGINE_SUCCESS;
950 }
951 
952