1 /*
2  Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
3  reserved.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License, version 2.0,
7  as published by the Free Software Foundation.
8 
9  This program is also distributed with certain software (including
10  but not limited to OpenSSL) that is licensed under separate terms,
11  as designated in a particular file or component or in included license
12  documentation.  The authors of MySQL hereby grant you an additional
13  permission to link the program and your derivative works with the
14  separately licensed software that they have included with MySQL.
15 
16  This program is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  GNU General Public License, version 2.0, for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with this program; if not, write to the Free Software
23  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24  02110-1301  USA
25  */
26 
27 #include <my_config.h>
28 #include <unistd.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <assert.h>
32 #include <string.h>
33 #include <time.h>
34 
35 #include "NdbApi.hpp"
36 
37 #include <memcached/extension_loggers.h>
38 #include <memcached/util.h>
39 
40 #include "ndbmemcache_global.h"
41 #include "debug.h"
42 #include "timing.h"
43 #include "Config_v1.h"
44 #include "TableSpec.h"
45 #include "QueryPlan.h"
46 #include "Operation.h"
47 #include "ExternalValue.h"
48 #include "ndb_error_logger.h"
49 
50 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
51 
52 
53 /*********** VERSION 1 METADATA *******************/
54 
55 
config_v1(Configuration * cf)56 config_v1::config_v1(Configuration * cf) :
57   db(cf->primary_conn),
58   conf(*cf),
59   server_role_id(-1),
60   nclusters(0),
61   policies_map(0),
62   containers_map(0)
63 {
64   db.init(2);
65 };
66 
~config_v1()67 config_v1::~config_v1() {
68   DEBUG_ENTER_METHOD("config_v1 destructor");
69   if(containers_map) {
70     delete containers_map;
71   }
72   if(policies_map) {
73     policies_map->do_free_values = true;
74     delete policies_map;
75   }
76 }
77 
read_configuration()78 bool config_v1::read_configuration() {
79   DEBUG_ENTER_METHOD("config_v1::read_configuration");
80 
81   for(int i = 0 ; i < MAX_CLUSTERS ; i++) cluster_ids[i] = 0;
82 
83   containers_map  = new LookupTable<TableSpec>();
84   policies_map    = new LookupTable<prefix_info_t>();
85 
86   bool success = false;
87   NdbTransaction *tx = db.startTransaction();
88   if(tx) {
89     server_role_id = get_server_role_id(tx);
90     if(! (server_role_id < 0)) success = get_policies(tx);
91     if(success) success = get_connections(tx);
92     if(success) success = get_prefixes(server_role_id, tx);
93     if(success) {
94       log_signon(tx);
95       set_initial_cas();
96       tx->execute(NdbTransaction::Commit);
97       minor_version_config();
98     }
99     else {
100       logger->log(LOG_WARNING, 0, "Configuration failed.\n");
101       tx->execute(NdbTransaction::Rollback);
102     }
103 
104     tx->close();
105   }
106   else {
107     log_ndb_error(db.getNdbError());
108   }
109 
110   return success;
111 }
112 
113 
114 /* get_server_role_id():
115  SELECT role_id, max_tps
116  FROM memcache_server_roles
117  WHERE role_name = conf.server_role;
118  Returns the integer ID, or -1 if the record was not found.
119  */
get_server_role_id(NdbTransaction * tx)120 int config_v1::get_server_role_id(NdbTransaction *tx) {
121   uint32_t val = -1;
122 
123   TableSpec spec("ndbmemcache.memcache_server_roles",
124                  "role_name", "role_id,max_tps");
125   QueryPlan plan(&db, &spec);
126   Operation op(&plan, OP_READ);
127 
128   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
129   op.buffer =     (char *) malloc(op.requiredBuffer());
130 
131   op.clearKeyNullBits();
132   op.setKeyPart(COL_STORE_KEY, conf.server_role, strlen(conf.server_role));
133   op.readTuple(tx);
134   tx->execute(NdbTransaction::NoCommit);
135 
136   if(tx->getNdbError().classification == NdbError::NoError) {
137     val = op.getIntValue(COL_STORE_VALUE+0);
138     conf.max_tps = op.getIntValue(COL_STORE_VALUE+1);
139   }
140   else {
141     logger->log(LOG_WARNING, 0, "\nServer role \"%s\" not found in "
142                 "configuration database.\n\n", conf.server_role);
143   }
144 
145   free(op.key_buffer);
146   free(op.buffer);
147 
148   DEBUG_PRINT("Name: \"%s\" -- ID: %d", conf.server_role, val);
149   return val;
150 }
151 
152 /* get_policies():
153  SELECT * FROM cache_policies;
154  Creates the policies map (name => prefix_info).
155  Returns true on success.
156 
157  `get_policy` ENUM('cache_only','ndb_only','caching','disabled') NOT NULL ,
158  `set_policy` ENUM('cache_only','ndb_only','caching','disabled') NOT NULL ,
159  `delete_policy` ENUM('cache_only','ndb_only','caching','disabled') NOT NULL ,
160  `flush_from_db` ENUM('false', 'true') NOT NULL DEFAULT 'false'
161  */
get_policies(NdbTransaction * tx)162 bool config_v1::get_policies(NdbTransaction *tx) {
163   DEBUG_ENTER_METHOD("config_v1::get_policies");
164   bool success = true;
165   int res;
166   TableSpec spec("ndbmemcache.cache_policies",
167                  "policy_name",
168                  "get_policy,set_policy,delete_policy,flush_from_db");
169   QueryPlan plan(&db, &spec);
170   Operation op(&plan, OP_SCAN);
171 
172   NdbScanOperation *scan = op.scanTable(tx);
173   if(! scan) {
174     log_ndb_error(tx->getNdbError());
175     success = false;
176   }
177   if(tx->execute(NdbTransaction::NoCommit)) {
178     log_ndb_error(tx->getNdbError());
179     success = false;
180   }
181 
182   res = scan->nextResult((const char **) &op.buffer, true, false);
183   while((res == 0) || (res == 2)) {
184     prefix_info_t * info = (prefix_info_t *) calloc(1, sizeof(prefix_info_t));
185 
186     char name[41];          //   `policy_name` VARCHAR(40) NOT NULL
187     size_t name_len = op.copyValue(COL_STORE_KEY, name);
188     assert(name_len > 0);
189 
190     /*  ENUM('cache_only','ndb_only','caching','disabled') NOT NULL
191      is:      1            2          3         4                   */
192     unsigned int get_policy = op.getIntValue(COL_STORE_VALUE+0);
193     assert((get_policy > 0) && (get_policy < 5));
194     if(get_policy == 1 || get_policy == 3) info->do_mc_read = 1;
195     if(get_policy == 2 || get_policy == 3) info->do_db_read = 1;
196 
197     unsigned int set_policy = op.getIntValue(COL_STORE_VALUE+1);
198     assert((set_policy > 0) && (set_policy < 5));
199     if(set_policy == 1 || set_policy == 3) info->do_mc_write = 1;
200     if(set_policy == 2 || set_policy == 3) info->do_db_write = 1;
201 
202     unsigned int del_policy = op.getIntValue(COL_STORE_VALUE+2);
203     assert((del_policy > 0) && (del_policy < 5));
204     if(del_policy == 1 || del_policy == 3) info->do_mc_delete = 1;
205     if(del_policy == 2 || del_policy == 3) info->do_db_delete = 1;
206 
207 
208     /* `flush_from_db` ENUM('false', 'true') NOT NULL DEFAULT 'false'
209      is:                   1        2          */
210     int flush_policy = op.getIntValue(COL_STORE_VALUE+3);
211     if(flush_policy == 2) info->do_db_flush = 1;
212 
213     DEBUG_PRINT("%s:  get-%d set-%d del-%d flush-%d addr-%p",
214                 name, get_policy, set_policy, del_policy, flush_policy, info);
215 
216     policies_map->insert(name, info);
217     res = scan->nextResult((const char **) &op.buffer, true, false);
218   }
219   if(res == -1) {
220     log_ndb_error(scan->getNdbError());
221     success = false;
222   }
223 
224   return success;
225 }
226 
227 /* get_connections():
228  SELECT * FROM ndb_clusters.
229  Creates the cluster_ids map <cfg_data_cluster_id => connections_index>.
230  Returns true on success.
231  */
get_connections(NdbTransaction * tx)232 bool config_v1::get_connections(NdbTransaction *tx) {
233   DEBUG_ENTER_METHOD("config_v1::get_connections");
234   bool success = true;
235   int res;
236   TableSpec spec("ndbmemcache.ndb_clusters",
237                  "cluster_id",
238                  "ndb_connectstring,microsec_rtt");
239   /* Scan the ndb_clusters table */
240   QueryPlan plan(&db, &spec);
241   Operation op(&plan, OP_SCAN);
242 
243   NdbScanOperation *scan = op.scanTable(tx);
244   if(! scan) {
245     log_ndb_error(scan->getNdbError());
246     success = false;
247   }
248   if(tx->execute(NdbTransaction::NoCommit)) {
249     log_ndb_error(tx->getNdbError());
250     success = false;
251   }
252 
253   res = scan->nextResult((const char **) &op.buffer, true, false);
254   while((res == 0) || (res == 2)) {
255     bool str_is_null;
256     char connectstring[129];
257     unsigned int rtt;
258     /*  `cluster_id` INT NOT NULL             */
259     int cfg_data_id = op.getIntValue(COL_STORE_KEY);
260 
261     /* `ndb_connectstring` VARCHAR(128) NULL  */
262     str_is_null = op.isNull(COL_STORE_VALUE+0);
263     if(! str_is_null) op.copyValue(COL_STORE_VALUE+0, connectstring);
264 
265     /* `microsec_rtt` INT UNSIGNED NOT NULL default 300  */
266     rtt = op.getIntValue(COL_STORE_VALUE+1);
267 
268     /* Add the connection to the configuration */
269     int connection_idx;
270     if(str_is_null)
271       connection_idx = conf.storeConnection(0, rtt);
272     else
273       connection_idx = conf.storeConnection(strdup(connectstring), rtt);
274 
275     DEBUG_PRINT("[%d]:  { %d => \"%s\" [rtt: %d]}", connection_idx,
276                 cfg_data_id, str_is_null ? "" : connectstring, rtt);
277 
278     nclusters++;
279     cluster_ids[connection_idx] = cfg_data_id;
280     res = scan->nextResult((const char **) &op.buffer, true, false);
281   }
282   if(res == -1) {
283     log_ndb_error(scan->getNdbError());
284     success = false;
285   }
286   DEBUG_PRINT("clusters: %d", nclusters);
287   return success;
288 }
289 
290 
get_container(char * name,NdbTransaction * tx)291 TableSpec * config_v1::get_container(char *name, NdbTransaction *tx) {
292   TableSpec *c = containers_map->find(name);
293 
294   if(c == NULL) {
295     c = get_container_record(name, tx);
296     containers_map->insert(name, c);
297   }
298   else {
299     DEBUG_PRINT("\"%s\" found in local map (\"%s\").", name, c->table_name);
300   }
301   assert(c);
302   return c;
303 }
304 
305 
get_container_record(char * name,NdbTransaction * tx)306 TableSpec * config_v1::get_container_record(char *name, NdbTransaction *tx) {
307   TableSpec *container;
308   TableSpec spec("ndbmemcache.containers",
309                  "name",
310                  "db_schema,db_table,key_columns,value_columns,flags,"
311                  "increment_column,cas_column,expire_time_column");
312   QueryPlan plan(&db, &spec);
313   Operation op(&plan, OP_READ);
314 
315   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
316   op.buffer     = (char *) malloc(op.requiredBuffer());
317 
318   op.clearKeyNullBits();
319   op.setKeyPart(COL_STORE_KEY, name, strlen(name));
320   op.readTuple(tx);
321   tx->execute(NdbTransaction::NoCommit);
322 
323   if(tx->getNdbError().classification == NdbError::NoError) {
324     char val[256];
325     char *schema, *table, *keycols, *valcols;
326 
327     //  `db_schema` VARCHAR(250) NOT NULL,
328     //  `db_table` VARCHAR(250) NOT NULL,
329     //  `key_columns` VARCHAR(250) NOT NULL,
330     op.copyValue(COL_STORE_VALUE + 0, val); schema = strdup(val);
331     op.copyValue(COL_STORE_VALUE + 1, val); table = strdup(val);
332     op.copyValue(COL_STORE_VALUE + 2, val); keycols = strdup(val);
333 
334     //  `value_columns` VARCHAR(250),
335     // TODO: testcase for value_columns is null
336     if(op.isNull(COL_STORE_VALUE + 3)) valcols = 0;
337     else {
338       op.copyValue(COL_STORE_VALUE + 3, val);
339       valcols = strdup(val);
340     }
341 
342     /* Instantiate a TableSpec for this container */
343     container = new TableSpec(0, keycols, valcols);
344     container->setTable(schema, table);
345 
346     if(keycols) free(keycols);
347     if(valcols) free(valcols);
348 
349     //  `flags` VARCHAR(250) NOT NULL DEFAULT "0",
350     /* If the value is non-numeric, use it to set the flags_column field */
351     container->flags_column = 0;
352     container->static_flags = 0;
353     op.copyValue(COL_STORE_VALUE + 4, val);
354     if(!safe_strtoul(val, & container->static_flags))
355       container->flags_column = strdup(val);
356 
357     //  `increment_column` VARCHAR(250),
358     if(op.isNull(COL_STORE_VALUE + 5)) container->math_column = 0;
359     else {
360       op.copyValue(COL_STORE_VALUE + 5, val);
361       container->math_column = strdup(val);
362     }
363 
364     //  `cas_column` VARCHAR(250),
365     if(op.isNull(COL_STORE_VALUE + 6)) container->cas_column = 0;
366     else {
367       op.copyValue(COL_STORE_VALUE + 6, val);
368       container->cas_column = strdup(val);
369     }
370 
371     //  `expire_time_column` VARCHAR(250)
372     if(op.isNull(COL_STORE_VALUE + 7)) container->exp_column = 0;
373     else {
374       op.copyValue(COL_STORE_VALUE + 7, val);
375       container->exp_column = strdup(val);
376     }
377     DEBUG_PRINT("\"%s\" found in database (%s).", name, table);
378   }
379   else {
380     container = 0;
381     logger->log(LOG_WARNING, 0, "\"%s\" NOT FOUND in database.\n", name);
382   }
383 
384   free(op.key_buffer);
385   free(op.buffer);
386 
387   return container;
388 }
389 
390 
391 /* get_prefixes():
392  SELECT * FROM key_prefixes where server_role_id = role_id;
393  This is an ordered index scan.  It fetches key prefixes from the
394  configuration metadata and passes them on to store_prefix().
395  Returns true on success.
396  */
get_prefixes(int role_id,NdbTransaction * tx)397 bool config_v1::get_prefixes(int role_id, NdbTransaction *tx) {
398   DEBUG_ENTER_METHOD("config_v1::get_prefixes");
399   bool success = true;
400   int res;
401   TableSpec spec("ndbmemcache.key_prefixes",
402                  "server_role_id,key_prefix",
403                  "cluster_id,policy,container");
404   QueryPlan plan(&db, &spec, PKScan);
405   Operation op(&plan, OP_SCAN);
406 
407   // `server_role_id` INT UNSIGNED NOT NULL DEFAULT 0,
408   // PRIMARY KEY (`server_role_id`, `key_prefix`) )
409   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
410   op.setKeyPartInt(COL_STORE_KEY, role_id);
411 
412   NdbIndexScanOperation::IndexBound bound;
413   bound.low_key = bound.high_key = op.key_buffer;
414   bound.low_key_count = bound.high_key_count = 1;
415   bound.low_inclusive = bound.high_inclusive = true;
416   bound.range_no = 0;
417 
418   NdbIndexScanOperation *scan = op.scanIndex(tx, &bound);
419   if(! scan) {
420     record_ndb_error(tx->getNdbError());
421     logger->log(LOG_WARNING, 0, "scanIndex(): %s\n", tx->getNdbError().message);
422     success = false;
423   }
424   if(tx->execute(NdbTransaction::NoCommit)) {
425     record_ndb_error(tx->getNdbError());
426     logger->log(LOG_WARNING, 0, "execute(): %s\n", tx->getNdbError().message);
427     success = false;
428   }
429 
430   res = scan->nextResult((const char **) &op.buffer, true, false);
431   while((res == 0) || (res == 2)) {
432     char key_prefix[251], policy_name[41], container[51];
433     int cluster_id = 0;
434     TableSpec * container_spec;
435 
436     // `key_prefix` VARCHAR(250) NOT NULL ,
437     op.copyValue(COL_STORE_KEY + 1, key_prefix);
438 
439     // `cluster_id` INT UNSIGNED NOT NULL DEFAULT 0,
440     cluster_id = op.getIntValue(COL_STORE_VALUE + 0);
441 
442     // `policy` VARCHAR(40) NOT NULL,
443     op.copyValue(COL_STORE_VALUE + 1, policy_name);
444 
445     // `container` VARCHAR(50),
446     if(op.isNull(COL_STORE_VALUE + 2)) container_spec = 0;
447     else {
448       op.copyValue(COL_STORE_VALUE + 2, container);
449       container_spec = get_container(container, tx);
450       if(! container_spec) {
451         logger->log(LOG_WARNING, 0, "Cannot find container \"%s\" for "
452                     "key prefix \"%s\".\n", container, key_prefix);
453         free(op.key_buffer);
454         return false;
455       }
456     }
457 
458     if(! store_prefix(key_prefix, container_spec, cluster_id, policy_name)) {
459       delete[] op.key_buffer;
460       return false;
461     }
462     res = scan->nextResult((const char **) &op.buffer, true, false);
463   }
464 
465   free(op.key_buffer);
466 
467   if(res == -1) {
468     log_ndb_error(scan->getNdbError());
469     return false;
470   }
471   return true;
472 }
473 
474 
475 /* store_prefix():
476  Takes everything needed to build a KeyPrefix.
477  If the config is valid, build the KeyPrefix, store it in the configuration,
478  and return true.  Otherwise print a warning to the log and return false.
479  */
store_prefix(const char * name,TableSpec * table,int cluster_id,char * cache_policy)480 bool config_v1::store_prefix(const char * name,
481                              TableSpec *table,
482                              int cluster_id,
483                              char *cache_policy) {
484   KeyPrefix prefix(name);
485   prefix_info_t * info_ptr;
486 
487   info_ptr = policies_map->find(cache_policy);
488   if(info_ptr == 0) {
489     /* policy from key_prefixes doesn't exist in cache_policies */
490     logger->log(LOG_WARNING, 0, "Invalid cache policy \"%s\" named in "
491                 "key prefix \"%s\"\n", cache_policy, name);
492     return false;
493   }
494 
495   memcpy(& prefix.info, info_ptr, sizeof(prefix_info_t));
496 
497   if(prefix.info.do_db_read || prefix.info.do_db_write
498      || prefix.info.do_db_delete || prefix.info.do_db_flush) {
499     prefix.info.use_ndb = 1;
500     /* At least one of math_col or value_col is required.  */
501     if((! table->math_column) && (! table->value_columns[0])) {
502       logger->log(LOG_WARNING, 0, "Error at key prefix \"%s\": "
503                   "No value container.\n");
504       return false;
505     }
506     if(table->cas_column)   prefix.info.has_cas_col    = 1;
507     if(table->math_column)  prefix.info.has_math_col   = 1;
508     if(table->exp_column)   prefix.info.has_expire_col = 1;
509     if(table->flags_column) prefix.info.has_flags_col  = 1;
510   }
511   else {
512     /* If the prefix does not use NDB, you cannot specify a container. */
513     if(table != 0) {
514       logger->log(LOG_WARNING, 0, "Error at key prefix \"%s\": "
515                   "Cache policy \"%s\" does not use NDB, so container "
516                   " must be null.\n", name, cache_policy);
517       return false;
518     }
519   }
520 
521   int internal_cluster_idx = -1;
522 
523   if(prefix.info.use_ndb) {
524     /* The cluster_id must refer to a known cluster: */
525     for(int i = 0 ; i < nclusters ; i++)
526       if(cluster_ids[i] == cluster_id)
527         internal_cluster_idx = i;
528 
529     if(internal_cluster_idx == -1) {
530       logger->log(LOG_WARNING, 0, "Error at key prefix \"%s\": cluster_id %d "
531                   "does not exist in ndb_clusters table.\n",
532                   name, cluster_id);
533       return false;
534     }
535   }
536 
537   /* Tie it all together */
538   prefix.info.cluster_id = internal_cluster_idx;
539   prefix.table = table;
540   prefix.info.usable = 1;
541 
542   /* Configuration::storePrefix() will make a copy of the KeyPrefix,
543    and fill in the prefix_id of the copy.
544    */
545   prefix.info.prefix_id = conf.storePrefix(prefix);
546 
547   return true;
548 }
549 
550 
551 /* log_signon()
552  UPDATE last_memcached_signon SET hostname=?, server_role=?, signon_time=?
553  WHERE ndb_node_id = MY_NODE_ID.
554  This has the side effect of providing us with the global checkpoint ID
555  for server startup.
556  */
log_signon(NdbTransaction * tx)557 void config_v1::log_signon(NdbTransaction *tx) {
558   DEBUG_ENTER_METHOD("config_v1::log_signon");
559   char my_hostname[256];
560   gethostname(my_hostname, 256);
561   TableSpec spec("ndbmemcache.last_memcached_signon",
562                  "ndb_node_id", "hostname,server_role,signon_time");
563   QueryPlan plan(&db, &spec);
564 
565   Operation op(&plan, OPERATION_SET);
566   op.buffer     = (char *) malloc(op.requiredBuffer());
567   op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
568   op.setKeyPartInt(COL_STORE_KEY,   db.getNodeId());  // node ID (in key)
569   op.setColumnInt(COL_STORE_KEY,    db.getNodeId());  // node ID (in row)
570   op.setColumn(COL_STORE_VALUE+0,   my_hostname, strlen(my_hostname));           // hostname
571   op.setColumn(COL_STORE_VALUE+1,   conf.server_role, strlen(conf.server_role)); // role
572   op.setColumnInt(COL_STORE_VALUE+2,time(NULL));                                 // timestamp
573 
574   op.writeTuple(tx);
575   tx->execute(NdbTransaction::NoCommit);
576   tx->getGCI(&signon_gci);
577 
578   free(op.key_buffer);
579   free(op.buffer);
580   return;
581 }
582 
583 
584 /* set_initial_cas():
585  Create an initial value for the cas_unique sequence.
586  Use the latest GCI (obtained when signing on) and the NDB node id.
587  TODO: This scheme probably spends too many bits on an unchanging initial GCI
588  leaving not enough bits for the counter.
589  */
set_initial_cas()590 void config_v1::set_initial_cas() {
591   /*  ----------------------------------------------------------------
592    |   27 bits Initial GCI    | eng bit|     28 bits counter        |
593    |                          | + 8bit |                            |
594    |                          | NodeId |                            |
595    ----------------------------------------------------------------   */
596   const uint64_t MASK_GCI   = 0x07FFFFFF00000000LLU; // Use these 27 bits of GCI
597   const uint64_t ENGINE_BIT = 0x0000001000000000LLU; // bit 36
598 
599   uint64_t node_id = ((uint64_t) db.getNodeId()) << 28;
600   uint64_t gci_bits = (signon_gci & MASK_GCI) << 5;
601   uint64_t def_eng_cas = gci_bits | node_id;
602   uint64_t ndb_eng_cas = gci_bits | ENGINE_BIT | node_id;
603 
604   //  void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas);
605   conf.storeCAS(ndb_eng_cas, def_eng_cas);
606   DEBUG_PRINT("Sign On GCI: 0x%llx | Node Id: [%d] 0x%llx | Engine bit: 0x%llx",
607               signon_gci, db.getNodeId(), node_id, ENGINE_BIT);
608   DEBUG_PRINT("Initial CAS: %llu 0x%llx ", ndb_eng_cas, ndb_eng_cas);
609 
610   return;
611 }
612 
613 
614 /***************** VERSION 1.0 ****************/
minor_version_config()615 void config_v1_0::minor_version_config() {
616   conf.onlineReloadFlag = 0;
617   conf.reload_waiter = 0;
618 }
619 
620 
621 /***************** VERSION 1.1 ****************/
622 int server_roles_reload_waiter(Ndb_cluster_connection *, const char *);
minor_version_config()623 void config_v1_1::minor_version_config() {
624   conf.onlineReloadFlag = 1;
625   conf.reload_waiter = server_roles_reload_waiter;
626 }
627 
628 
629 /******** RELOAD WAITER ON ndbmemcache.memcache_server_roles *********/
create_event(NdbDictionary::Dictionary * dict,const char * event_name)630 int create_event(NdbDictionary::Dictionary *dict, const char *event_name) {
631   DEBUG_ENTER();
632   const NdbDictionary::Table *tab = dict->getTable("memcache_server_roles");
633   if(tab == 0) {
634     log_ndb_error(dict->getNdbError());
635     return -1;
636   }
637 
638   NdbDictionary::Event event(event_name, *tab);
639   event.addTableEvent(NdbDictionary::Event::TE_UPDATE);
640   event.addEventColumn("update_timestamp");
641   if(dict->createEvent(event) != 0) {
642     log_ndb_error(dict->getNdbError());
643     return -1;
644   }
645 
646   return 0;
647 }
648 
649 
server_roles_reload_waiter(Ndb_cluster_connection * conn,const char * server_role)650 int server_roles_reload_waiter(Ndb_cluster_connection *conn,
651                                const char *server_role) {
652   DEBUG_ENTER();
653   const char * event_name = "MEMCACHE$conf_reload_v1.1";
654   Ndb db(conn, "ndbmemcache");
655   db.init(4);
656   NdbDictionary::Dictionary *dict = db.getDictionary();
657 
658   const NdbDictionary::Event * stored_event = dict->getEvent(event_name);
659   if(stored_event == 0) {
660     if(create_event(dict, event_name) != 0) {
661       return -1;
662     }
663   }
664 
665   NdbEventOperation *wait_op = db.createEventOperation(event_name);
666   if(wait_op == 0) { // error
667     log_ndb_error(db.getNdbError());
668     return -1;
669   }
670 
671   /* Create RecAttrs for the PK and the timestamp */
672   NdbRecAttr *recattr1 = wait_op->getValue("role_name");
673   NdbRecAttr *recattr2 = wait_op->getPreValue("role_name");
674   NdbRecAttr *recattr3 = wait_op->getValue("update_timestamp");
675   NdbRecAttr *recattr4 = wait_op->getPreValue("update_timestamp");
676   assert(recattr1 && recattr2 && recattr3 && recattr4);
677 
678   if(wait_op->execute() != 0) {
679     log_ndb_error(wait_op->getNdbError());
680     return -1;
681   }
682 
683   while(1) {
684     // TODO: if conf.shutdown return 0.
685     int waiting = db.pollEvents2(1000);
686 
687     if(waiting < 0) {
688       /* error */
689       db.dropEventOperation(wait_op);
690       log_ndb_error(db.getNdbError());
691       return -1;
692     }
693     else if(waiting > 0) {
694       NdbEventOperation *event = db.nextEvent2();
695       if(event) {
696         switch(event->getEventType2()) {
697           case NdbDictionary::Event::TE_UPDATE:
698             if(recattr1->isNULL() == 0) {
699               uint role_name_len = *(const unsigned char*) recattr1->aRef();
700               char *role_name = recattr1->aRef() + 1;
701               if(role_name_len == strlen(server_role) && ! strcmp(server_role, role_name)) {
702                 /* Time to reconfigure! */
703                 logger->log(LOG_WARNING, 0, "Received update to server role %s", role_name);
704                 db.dropEventOperation(wait_op);
705                 return 1;
706               } else DEBUG_PRINT("Got update event for %s, but that aint me.", role_name);
707             } else DEBUG_PRINT("Got update event for NULL role");
708             break;
709           case NdbDictionary::Event::TE_NODE_FAILURE:
710             logger->log(LOG_WARNING, 0, "Event thread got TE_NODE_FAILURE");
711             break;
712           case NdbDictionary::Event::TE_INCONSISTENT:
713             logger->log(LOG_WARNING, 0, "Event thread got TE_INCONSISTENT");
714             break;
715           case NdbDictionary::Event::TE_OUT_OF_MEMORY:
716             logger->log(LOG_WARNING, 0, "Event buffer overflow.  "
717                         "Event thread got TE_OUT_OF_MEMORY.");
718             break;
719           default:
720             /* No need to discuss other event types. */
721             break;
722         }
723       } else DEBUG_PRINT("Spurious wakeup: nextEvent2() returned > 0.");
724     } // pollEvents2() returned 0 = timeout.  just wait again.
725   } // End of while(1) loop
726 }
727 
728 
729 /***************** VERSION 1.2 ****************/
minor_version_config()730 void config_v1_2::minor_version_config() {
731   conf.onlineReloadFlag = 1;
732   conf.reload_waiter = server_roles_reload_waiter;
733 }
734 
735 
get_container_record(char * name,NdbTransaction * tx)736 TableSpec * config_v1_2::get_container_record(char *name, NdbTransaction *tx) {
737   TableSpec * cont = config_v1::get_container_record(name, tx);
738   if(cont) {
739     TableSpec spec("ndbmemcache.containers", "name", "large_values_table");
740     QueryPlan plan(&db, &spec);
741     Operation op(&plan, OP_READ);
742 
743     op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
744     op.buffer     = (char *) malloc(op.requiredBuffer());
745 
746     op.clearKeyNullBits();
747     op.setKeyPart(COL_STORE_KEY, name, strlen(name));
748     op.readTuple(tx);
749     tx->execute(NdbTransaction::NoCommit);
750 
751     if(tx->getNdbError().classification == NdbError::NoError) {
752       char val[256];
753       if(! op.isNull(COL_STORE_VALUE + 0)) {
754         op.copyValue(COL_STORE_VALUE + 0, val);
755         cont->external_table = ExternalValue::createContainerRecord(val);
756       }
757     }
758 
759     free(op.key_buffer);
760     free(op.buffer);
761   }
762   return cont;
763 }
764 
765