1 /*
2 Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights
3 reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24 02110-1301 USA
25 */
26
27 #include <my_config.h>
28 #include <unistd.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <assert.h>
32 #include <string.h>
33 #include <time.h>
34
35 #include "NdbApi.hpp"
36
37 #include <memcached/extension_loggers.h>
38 #include <memcached/util.h>
39
40 #include "ndbmemcache_global.h"
41 #include "debug.h"
42 #include "timing.h"
43 #include "Config_v1.h"
44 #include "TableSpec.h"
45 #include "QueryPlan.h"
46 #include "Operation.h"
47 #include "ExternalValue.h"
48 #include "ndb_error_logger.h"
49
50 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
51
52
53 /*********** VERSION 1 METADATA *******************/
54
55
config_v1(Configuration * cf)56 config_v1::config_v1(Configuration * cf) :
57 db(cf->primary_conn),
58 conf(*cf),
59 server_role_id(-1),
60 nclusters(0),
61 policies_map(0),
62 containers_map(0)
63 {
64 db.init(2);
65 };
66
~config_v1()67 config_v1::~config_v1() {
68 DEBUG_ENTER_METHOD("config_v1 destructor");
69 if(containers_map) {
70 delete containers_map;
71 }
72 if(policies_map) {
73 policies_map->do_free_values = true;
74 delete policies_map;
75 }
76 }
77
read_configuration()78 bool config_v1::read_configuration() {
79 DEBUG_ENTER_METHOD("config_v1::read_configuration");
80
81 for(int i = 0 ; i < MAX_CLUSTERS ; i++) cluster_ids[i] = 0;
82
83 containers_map = new LookupTable<TableSpec>();
84 policies_map = new LookupTable<prefix_info_t>();
85
86 bool success = false;
87 NdbTransaction *tx = db.startTransaction();
88 if(tx) {
89 server_role_id = get_server_role_id(tx);
90 if(! (server_role_id < 0)) success = get_policies(tx);
91 if(success) success = get_connections(tx);
92 if(success) success = get_prefixes(server_role_id, tx);
93 if(success) {
94 log_signon(tx);
95 set_initial_cas();
96 tx->execute(NdbTransaction::Commit);
97 minor_version_config();
98 }
99 else {
100 logger->log(LOG_WARNING, 0, "Configuration failed.\n");
101 tx->execute(NdbTransaction::Rollback);
102 }
103
104 tx->close();
105 }
106 else {
107 log_ndb_error(db.getNdbError());
108 }
109
110 return success;
111 }
112
113
114 /* get_server_role_id():
115 SELECT role_id, max_tps
116 FROM memcache_server_roles
117 WHERE role_name = conf.server_role;
118 Returns the integer ID, or -1 if the record was not found.
119 */
get_server_role_id(NdbTransaction * tx)120 int config_v1::get_server_role_id(NdbTransaction *tx) {
121 uint32_t val = -1;
122
123 TableSpec spec("ndbmemcache.memcache_server_roles",
124 "role_name", "role_id,max_tps");
125 QueryPlan plan(&db, &spec);
126 Operation op(&plan, OP_READ);
127
128 op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
129 op.buffer = (char *) malloc(op.requiredBuffer());
130
131 op.clearKeyNullBits();
132 op.setKeyPart(COL_STORE_KEY, conf.server_role, strlen(conf.server_role));
133 op.readTuple(tx);
134 tx->execute(NdbTransaction::NoCommit);
135
136 if(tx->getNdbError().classification == NdbError::NoError) {
137 val = op.getIntValue(COL_STORE_VALUE+0);
138 conf.max_tps = op.getIntValue(COL_STORE_VALUE+1);
139 }
140 else {
141 logger->log(LOG_WARNING, 0, "\nServer role \"%s\" not found in "
142 "configuration database.\n\n", conf.server_role);
143 }
144
145 free(op.key_buffer);
146 free(op.buffer);
147
148 DEBUG_PRINT("Name: \"%s\" -- ID: %d", conf.server_role, val);
149 return val;
150 }
151
152 /* get_policies():
153 SELECT * FROM cache_policies;
154 Creates the policies map (name => prefix_info).
155 Returns true on success.
156
157 `get_policy` ENUM('cache_only','ndb_only','caching','disabled') NOT NULL ,
158 `set_policy` ENUM('cache_only','ndb_only','caching','disabled') NOT NULL ,
159 `delete_policy` ENUM('cache_only','ndb_only','caching','disabled') NOT NULL ,
160 `flush_from_db` ENUM('false', 'true') NOT NULL DEFAULT 'false'
161 */
get_policies(NdbTransaction * tx)162 bool config_v1::get_policies(NdbTransaction *tx) {
163 DEBUG_ENTER_METHOD("config_v1::get_policies");
164 bool success = true;
165 int res;
166 TableSpec spec("ndbmemcache.cache_policies",
167 "policy_name",
168 "get_policy,set_policy,delete_policy,flush_from_db");
169 QueryPlan plan(&db, &spec);
170 Operation op(&plan, OP_SCAN);
171
172 NdbScanOperation *scan = op.scanTable(tx);
173 if(! scan) {
174 log_ndb_error(tx->getNdbError());
175 success = false;
176 }
177 if(tx->execute(NdbTransaction::NoCommit)) {
178 log_ndb_error(tx->getNdbError());
179 success = false;
180 }
181
182 res = scan->nextResult((const char **) &op.buffer, true, false);
183 while((res == 0) || (res == 2)) {
184 prefix_info_t * info = (prefix_info_t *) calloc(1, sizeof(prefix_info_t));
185
186 char name[41]; // `policy_name` VARCHAR(40) NOT NULL
187 size_t name_len = op.copyValue(COL_STORE_KEY, name);
188 assert(name_len > 0);
189
190 /* ENUM('cache_only','ndb_only','caching','disabled') NOT NULL
191 is: 1 2 3 4 */
192 unsigned int get_policy = op.getIntValue(COL_STORE_VALUE+0);
193 assert((get_policy > 0) && (get_policy < 5));
194 if(get_policy == 1 || get_policy == 3) info->do_mc_read = 1;
195 if(get_policy == 2 || get_policy == 3) info->do_db_read = 1;
196
197 unsigned int set_policy = op.getIntValue(COL_STORE_VALUE+1);
198 assert((set_policy > 0) && (set_policy < 5));
199 if(set_policy == 1 || set_policy == 3) info->do_mc_write = 1;
200 if(set_policy == 2 || set_policy == 3) info->do_db_write = 1;
201
202 unsigned int del_policy = op.getIntValue(COL_STORE_VALUE+2);
203 assert((del_policy > 0) && (del_policy < 5));
204 if(del_policy == 1 || del_policy == 3) info->do_mc_delete = 1;
205 if(del_policy == 2 || del_policy == 3) info->do_db_delete = 1;
206
207
208 /* `flush_from_db` ENUM('false', 'true') NOT NULL DEFAULT 'false'
209 is: 1 2 */
210 int flush_policy = op.getIntValue(COL_STORE_VALUE+3);
211 if(flush_policy == 2) info->do_db_flush = 1;
212
213 DEBUG_PRINT("%s: get-%d set-%d del-%d flush-%d addr-%p",
214 name, get_policy, set_policy, del_policy, flush_policy, info);
215
216 policies_map->insert(name, info);
217 res = scan->nextResult((const char **) &op.buffer, true, false);
218 }
219 if(res == -1) {
220 log_ndb_error(scan->getNdbError());
221 success = false;
222 }
223
224 return success;
225 }
226
227 /* get_connections():
228 SELECT * FROM ndb_clusters.
229 Creates the cluster_ids map <cfg_data_cluster_id => connections_index>.
230 Returns true on success.
231 */
get_connections(NdbTransaction * tx)232 bool config_v1::get_connections(NdbTransaction *tx) {
233 DEBUG_ENTER_METHOD("config_v1::get_connections");
234 bool success = true;
235 int res;
236 TableSpec spec("ndbmemcache.ndb_clusters",
237 "cluster_id",
238 "ndb_connectstring,microsec_rtt");
239 /* Scan the ndb_clusters table */
240 QueryPlan plan(&db, &spec);
241 Operation op(&plan, OP_SCAN);
242
243 NdbScanOperation *scan = op.scanTable(tx);
244 if(! scan) {
245 log_ndb_error(scan->getNdbError());
246 success = false;
247 }
248 if(tx->execute(NdbTransaction::NoCommit)) {
249 log_ndb_error(tx->getNdbError());
250 success = false;
251 }
252
253 res = scan->nextResult((const char **) &op.buffer, true, false);
254 while((res == 0) || (res == 2)) {
255 bool str_is_null;
256 char connectstring[129];
257 unsigned int rtt;
258 /* `cluster_id` INT NOT NULL */
259 int cfg_data_id = op.getIntValue(COL_STORE_KEY);
260
261 /* `ndb_connectstring` VARCHAR(128) NULL */
262 str_is_null = op.isNull(COL_STORE_VALUE+0);
263 if(! str_is_null) op.copyValue(COL_STORE_VALUE+0, connectstring);
264
265 /* `microsec_rtt` INT UNSIGNED NOT NULL default 300 */
266 rtt = op.getIntValue(COL_STORE_VALUE+1);
267
268 /* Add the connection to the configuration */
269 int connection_idx;
270 if(str_is_null)
271 connection_idx = conf.storeConnection(0, rtt);
272 else
273 connection_idx = conf.storeConnection(strdup(connectstring), rtt);
274
275 DEBUG_PRINT("[%d]: { %d => \"%s\" [rtt: %d]}", connection_idx,
276 cfg_data_id, str_is_null ? "" : connectstring, rtt);
277
278 nclusters++;
279 cluster_ids[connection_idx] = cfg_data_id;
280 res = scan->nextResult((const char **) &op.buffer, true, false);
281 }
282 if(res == -1) {
283 log_ndb_error(scan->getNdbError());
284 success = false;
285 }
286 DEBUG_PRINT("clusters: %d", nclusters);
287 return success;
288 }
289
290
get_container(char * name,NdbTransaction * tx)291 TableSpec * config_v1::get_container(char *name, NdbTransaction *tx) {
292 TableSpec *c = containers_map->find(name);
293
294 if(c == NULL) {
295 c = get_container_record(name, tx);
296 containers_map->insert(name, c);
297 }
298 else {
299 DEBUG_PRINT("\"%s\" found in local map (\"%s\").", name, c->table_name);
300 }
301 assert(c);
302 return c;
303 }
304
305
get_container_record(char * name,NdbTransaction * tx)306 TableSpec * config_v1::get_container_record(char *name, NdbTransaction *tx) {
307 TableSpec *container;
308 TableSpec spec("ndbmemcache.containers",
309 "name",
310 "db_schema,db_table,key_columns,value_columns,flags,"
311 "increment_column,cas_column,expire_time_column");
312 QueryPlan plan(&db, &spec);
313 Operation op(&plan, OP_READ);
314
315 op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
316 op.buffer = (char *) malloc(op.requiredBuffer());
317
318 op.clearKeyNullBits();
319 op.setKeyPart(COL_STORE_KEY, name, strlen(name));
320 op.readTuple(tx);
321 tx->execute(NdbTransaction::NoCommit);
322
323 if(tx->getNdbError().classification == NdbError::NoError) {
324 char val[256];
325 char *schema, *table, *keycols, *valcols;
326
327 // `db_schema` VARCHAR(250) NOT NULL,
328 // `db_table` VARCHAR(250) NOT NULL,
329 // `key_columns` VARCHAR(250) NOT NULL,
330 op.copyValue(COL_STORE_VALUE + 0, val); schema = strdup(val);
331 op.copyValue(COL_STORE_VALUE + 1, val); table = strdup(val);
332 op.copyValue(COL_STORE_VALUE + 2, val); keycols = strdup(val);
333
334 // `value_columns` VARCHAR(250),
335 // TODO: testcase for value_columns is null
336 if(op.isNull(COL_STORE_VALUE + 3)) valcols = 0;
337 else {
338 op.copyValue(COL_STORE_VALUE + 3, val);
339 valcols = strdup(val);
340 }
341
342 /* Instantiate a TableSpec for this container */
343 container = new TableSpec(0, keycols, valcols);
344 container->setTable(schema, table);
345
346 if(keycols) free(keycols);
347 if(valcols) free(valcols);
348
349 // `flags` VARCHAR(250) NOT NULL DEFAULT "0",
350 /* If the value is non-numeric, use it to set the flags_column field */
351 container->flags_column = 0;
352 container->static_flags = 0;
353 op.copyValue(COL_STORE_VALUE + 4, val);
354 if(!safe_strtoul(val, & container->static_flags))
355 container->flags_column = strdup(val);
356
357 // `increment_column` VARCHAR(250),
358 if(op.isNull(COL_STORE_VALUE + 5)) container->math_column = 0;
359 else {
360 op.copyValue(COL_STORE_VALUE + 5, val);
361 container->math_column = strdup(val);
362 }
363
364 // `cas_column` VARCHAR(250),
365 if(op.isNull(COL_STORE_VALUE + 6)) container->cas_column = 0;
366 else {
367 op.copyValue(COL_STORE_VALUE + 6, val);
368 container->cas_column = strdup(val);
369 }
370
371 // `expire_time_column` VARCHAR(250)
372 if(op.isNull(COL_STORE_VALUE + 7)) container->exp_column = 0;
373 else {
374 op.copyValue(COL_STORE_VALUE + 7, val);
375 container->exp_column = strdup(val);
376 }
377 DEBUG_PRINT("\"%s\" found in database (%s).", name, table);
378 }
379 else {
380 container = 0;
381 logger->log(LOG_WARNING, 0, "\"%s\" NOT FOUND in database.\n", name);
382 }
383
384 free(op.key_buffer);
385 free(op.buffer);
386
387 return container;
388 }
389
390
391 /* get_prefixes():
392 SELECT * FROM key_prefixes where server_role_id = role_id;
393 This is an ordered index scan. It fetches key prefixes from the
394 configuration metadata and passes them on to store_prefix().
395 Returns true on success.
396 */
get_prefixes(int role_id,NdbTransaction * tx)397 bool config_v1::get_prefixes(int role_id, NdbTransaction *tx) {
398 DEBUG_ENTER_METHOD("config_v1::get_prefixes");
399 bool success = true;
400 int res;
401 TableSpec spec("ndbmemcache.key_prefixes",
402 "server_role_id,key_prefix",
403 "cluster_id,policy,container");
404 QueryPlan plan(&db, &spec, PKScan);
405 Operation op(&plan, OP_SCAN);
406
407 // `server_role_id` INT UNSIGNED NOT NULL DEFAULT 0,
408 // PRIMARY KEY (`server_role_id`, `key_prefix`) )
409 op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
410 op.setKeyPartInt(COL_STORE_KEY, role_id);
411
412 NdbIndexScanOperation::IndexBound bound;
413 bound.low_key = bound.high_key = op.key_buffer;
414 bound.low_key_count = bound.high_key_count = 1;
415 bound.low_inclusive = bound.high_inclusive = true;
416 bound.range_no = 0;
417
418 NdbIndexScanOperation *scan = op.scanIndex(tx, &bound);
419 if(! scan) {
420 record_ndb_error(tx->getNdbError());
421 logger->log(LOG_WARNING, 0, "scanIndex(): %s\n", tx->getNdbError().message);
422 success = false;
423 }
424 if(tx->execute(NdbTransaction::NoCommit)) {
425 record_ndb_error(tx->getNdbError());
426 logger->log(LOG_WARNING, 0, "execute(): %s\n", tx->getNdbError().message);
427 success = false;
428 }
429
430 res = scan->nextResult((const char **) &op.buffer, true, false);
431 while((res == 0) || (res == 2)) {
432 char key_prefix[251], policy_name[41], container[51];
433 int cluster_id = 0;
434 TableSpec * container_spec;
435
436 // `key_prefix` VARCHAR(250) NOT NULL ,
437 op.copyValue(COL_STORE_KEY + 1, key_prefix);
438
439 // `cluster_id` INT UNSIGNED NOT NULL DEFAULT 0,
440 cluster_id = op.getIntValue(COL_STORE_VALUE + 0);
441
442 // `policy` VARCHAR(40) NOT NULL,
443 op.copyValue(COL_STORE_VALUE + 1, policy_name);
444
445 // `container` VARCHAR(50),
446 if(op.isNull(COL_STORE_VALUE + 2)) container_spec = 0;
447 else {
448 op.copyValue(COL_STORE_VALUE + 2, container);
449 container_spec = get_container(container, tx);
450 if(! container_spec) {
451 logger->log(LOG_WARNING, 0, "Cannot find container \"%s\" for "
452 "key prefix \"%s\".\n", container, key_prefix);
453 free(op.key_buffer);
454 return false;
455 }
456 }
457
458 if(! store_prefix(key_prefix, container_spec, cluster_id, policy_name)) {
459 delete[] op.key_buffer;
460 return false;
461 }
462 res = scan->nextResult((const char **) &op.buffer, true, false);
463 }
464
465 free(op.key_buffer);
466
467 if(res == -1) {
468 log_ndb_error(scan->getNdbError());
469 return false;
470 }
471 return true;
472 }
473
474
475 /* store_prefix():
476 Takes everything needed to build a KeyPrefix.
477 If the config is valid, build the KeyPrefix, store it in the configuration,
478 and return true. Otherwise print a warning to the log and return false.
479 */
store_prefix(const char * name,TableSpec * table,int cluster_id,char * cache_policy)480 bool config_v1::store_prefix(const char * name,
481 TableSpec *table,
482 int cluster_id,
483 char *cache_policy) {
484 KeyPrefix prefix(name);
485 prefix_info_t * info_ptr;
486
487 info_ptr = policies_map->find(cache_policy);
488 if(info_ptr == 0) {
489 /* policy from key_prefixes doesn't exist in cache_policies */
490 logger->log(LOG_WARNING, 0, "Invalid cache policy \"%s\" named in "
491 "key prefix \"%s\"\n", cache_policy, name);
492 return false;
493 }
494
495 memcpy(& prefix.info, info_ptr, sizeof(prefix_info_t));
496
497 if(prefix.info.do_db_read || prefix.info.do_db_write
498 || prefix.info.do_db_delete || prefix.info.do_db_flush) {
499 prefix.info.use_ndb = 1;
500 /* At least one of math_col or value_col is required. */
501 if((! table->math_column) && (! table->value_columns[0])) {
502 logger->log(LOG_WARNING, 0, "Error at key prefix \"%s\": "
503 "No value container.\n");
504 return false;
505 }
506 if(table->cas_column) prefix.info.has_cas_col = 1;
507 if(table->math_column) prefix.info.has_math_col = 1;
508 if(table->exp_column) prefix.info.has_expire_col = 1;
509 if(table->flags_column) prefix.info.has_flags_col = 1;
510 }
511 else {
512 /* If the prefix does not use NDB, you cannot specify a container. */
513 if(table != 0) {
514 logger->log(LOG_WARNING, 0, "Error at key prefix \"%s\": "
515 "Cache policy \"%s\" does not use NDB, so container "
516 " must be null.\n", name, cache_policy);
517 return false;
518 }
519 }
520
521 int internal_cluster_idx = -1;
522
523 if(prefix.info.use_ndb) {
524 /* The cluster_id must refer to a known cluster: */
525 for(int i = 0 ; i < nclusters ; i++)
526 if(cluster_ids[i] == cluster_id)
527 internal_cluster_idx = i;
528
529 if(internal_cluster_idx == -1) {
530 logger->log(LOG_WARNING, 0, "Error at key prefix \"%s\": cluster_id %d "
531 "does not exist in ndb_clusters table.\n",
532 name, cluster_id);
533 return false;
534 }
535 }
536
537 /* Tie it all together */
538 prefix.info.cluster_id = internal_cluster_idx;
539 prefix.table = table;
540 prefix.info.usable = 1;
541
542 /* Configuration::storePrefix() will make a copy of the KeyPrefix,
543 and fill in the prefix_id of the copy.
544 */
545 prefix.info.prefix_id = conf.storePrefix(prefix);
546
547 return true;
548 }
549
550
551 /* log_signon()
552 UPDATE last_memcached_signon SET hostname=?, server_role=?, signon_time=?
553 WHERE ndb_node_id = MY_NODE_ID.
554 This has the side effect of providing us with the global checkpoint ID
555 for server startup.
556 */
log_signon(NdbTransaction * tx)557 void config_v1::log_signon(NdbTransaction *tx) {
558 DEBUG_ENTER_METHOD("config_v1::log_signon");
559 char my_hostname[256];
560 gethostname(my_hostname, 256);
561 TableSpec spec("ndbmemcache.last_memcached_signon",
562 "ndb_node_id", "hostname,server_role,signon_time");
563 QueryPlan plan(&db, &spec);
564
565 Operation op(&plan, OPERATION_SET);
566 op.buffer = (char *) malloc(op.requiredBuffer());
567 op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
568 op.setKeyPartInt(COL_STORE_KEY, db.getNodeId()); // node ID (in key)
569 op.setColumnInt(COL_STORE_KEY, db.getNodeId()); // node ID (in row)
570 op.setColumn(COL_STORE_VALUE+0, my_hostname, strlen(my_hostname)); // hostname
571 op.setColumn(COL_STORE_VALUE+1, conf.server_role, strlen(conf.server_role)); // role
572 op.setColumnInt(COL_STORE_VALUE+2,time(NULL)); // timestamp
573
574 op.writeTuple(tx);
575 tx->execute(NdbTransaction::NoCommit);
576 tx->getGCI(&signon_gci);
577
578 free(op.key_buffer);
579 free(op.buffer);
580 return;
581 }
582
583
584 /* set_initial_cas():
585 Create an initial value for the cas_unique sequence.
586 Use the latest GCI (obtained when signing on) and the NDB node id.
587 TODO: This scheme probably spends too many bits on an unchanging initial GCI
588 leaving not enough bits for the counter.
589 */
set_initial_cas()590 void config_v1::set_initial_cas() {
591 /* ----------------------------------------------------------------
592 | 27 bits Initial GCI | eng bit| 28 bits counter |
593 | | + 8bit | |
594 | | NodeId | |
595 ---------------------------------------------------------------- */
596 const uint64_t MASK_GCI = 0x07FFFFFF00000000LLU; // Use these 27 bits of GCI
597 const uint64_t ENGINE_BIT = 0x0000001000000000LLU; // bit 36
598
599 uint64_t node_id = ((uint64_t) db.getNodeId()) << 28;
600 uint64_t gci_bits = (signon_gci & MASK_GCI) << 5;
601 uint64_t def_eng_cas = gci_bits | node_id;
602 uint64_t ndb_eng_cas = gci_bits | ENGINE_BIT | node_id;
603
604 // void storeCAS(uint64_t ndb_engine_cas, uint64_t default_engine_cas);
605 conf.storeCAS(ndb_eng_cas, def_eng_cas);
606 DEBUG_PRINT("Sign On GCI: 0x%llx | Node Id: [%d] 0x%llx | Engine bit: 0x%llx",
607 signon_gci, db.getNodeId(), node_id, ENGINE_BIT);
608 DEBUG_PRINT("Initial CAS: %llu 0x%llx ", ndb_eng_cas, ndb_eng_cas);
609
610 return;
611 }
612
613
614 /***************** VERSION 1.0 ****************/
minor_version_config()615 void config_v1_0::minor_version_config() {
616 conf.onlineReloadFlag = 0;
617 conf.reload_waiter = 0;
618 }
619
620
621 /***************** VERSION 1.1 ****************/
622 int server_roles_reload_waiter(Ndb_cluster_connection *, const char *);
minor_version_config()623 void config_v1_1::minor_version_config() {
624 conf.onlineReloadFlag = 1;
625 conf.reload_waiter = server_roles_reload_waiter;
626 }
627
628
629 /******** RELOAD WAITER ON ndbmemcache.memcache_server_roles *********/
create_event(NdbDictionary::Dictionary * dict,const char * event_name)630 int create_event(NdbDictionary::Dictionary *dict, const char *event_name) {
631 DEBUG_ENTER();
632 const NdbDictionary::Table *tab = dict->getTable("memcache_server_roles");
633 if(tab == 0) {
634 log_ndb_error(dict->getNdbError());
635 return -1;
636 }
637
638 NdbDictionary::Event event(event_name, *tab);
639 event.addTableEvent(NdbDictionary::Event::TE_UPDATE);
640 event.addEventColumn("update_timestamp");
641 if(dict->createEvent(event) != 0) {
642 log_ndb_error(dict->getNdbError());
643 return -1;
644 }
645
646 return 0;
647 }
648
649
server_roles_reload_waiter(Ndb_cluster_connection * conn,const char * server_role)650 int server_roles_reload_waiter(Ndb_cluster_connection *conn,
651 const char *server_role) {
652 DEBUG_ENTER();
653 const char * event_name = "MEMCACHE$conf_reload_v1.1";
654 Ndb db(conn, "ndbmemcache");
655 db.init(4);
656 NdbDictionary::Dictionary *dict = db.getDictionary();
657
658 const NdbDictionary::Event * stored_event = dict->getEvent(event_name);
659 if(stored_event == 0) {
660 if(create_event(dict, event_name) != 0) {
661 return -1;
662 }
663 }
664
665 NdbEventOperation *wait_op = db.createEventOperation(event_name);
666 if(wait_op == 0) { // error
667 log_ndb_error(db.getNdbError());
668 return -1;
669 }
670
671 /* Create RecAttrs for the PK and the timestamp */
672 NdbRecAttr *recattr1 = wait_op->getValue("role_name");
673 NdbRecAttr *recattr2 = wait_op->getPreValue("role_name");
674 NdbRecAttr *recattr3 = wait_op->getValue("update_timestamp");
675 NdbRecAttr *recattr4 = wait_op->getPreValue("update_timestamp");
676 assert(recattr1 && recattr2 && recattr3 && recattr4);
677
678 if(wait_op->execute() != 0) {
679 log_ndb_error(wait_op->getNdbError());
680 return -1;
681 }
682
683 while(1) {
684 // TODO: if conf.shutdown return 0.
685 int waiting = db.pollEvents2(1000);
686
687 if(waiting < 0) {
688 /* error */
689 db.dropEventOperation(wait_op);
690 log_ndb_error(db.getNdbError());
691 return -1;
692 }
693 else if(waiting > 0) {
694 NdbEventOperation *event = db.nextEvent2();
695 if(event) {
696 switch(event->getEventType2()) {
697 case NdbDictionary::Event::TE_UPDATE:
698 if(recattr1->isNULL() == 0) {
699 uint role_name_len = *(const unsigned char*) recattr1->aRef();
700 char *role_name = recattr1->aRef() + 1;
701 if(role_name_len == strlen(server_role) && ! strcmp(server_role, role_name)) {
702 /* Time to reconfigure! */
703 logger->log(LOG_WARNING, 0, "Received update to server role %s", role_name);
704 db.dropEventOperation(wait_op);
705 return 1;
706 } else DEBUG_PRINT("Got update event for %s, but that aint me.", role_name);
707 } else DEBUG_PRINT("Got update event for NULL role");
708 break;
709 case NdbDictionary::Event::TE_NODE_FAILURE:
710 logger->log(LOG_WARNING, 0, "Event thread got TE_NODE_FAILURE");
711 break;
712 case NdbDictionary::Event::TE_INCONSISTENT:
713 logger->log(LOG_WARNING, 0, "Event thread got TE_INCONSISTENT");
714 break;
715 case NdbDictionary::Event::TE_OUT_OF_MEMORY:
716 logger->log(LOG_WARNING, 0, "Event buffer overflow. "
717 "Event thread got TE_OUT_OF_MEMORY.");
718 break;
719 default:
720 /* No need to discuss other event types. */
721 break;
722 }
723 } else DEBUG_PRINT("Spurious wakeup: nextEvent2() returned > 0.");
724 } // pollEvents2() returned 0 = timeout. just wait again.
725 } // End of while(1) loop
726 }
727
728
729 /***************** VERSION 1.2 ****************/
minor_version_config()730 void config_v1_2::minor_version_config() {
731 conf.onlineReloadFlag = 1;
732 conf.reload_waiter = server_roles_reload_waiter;
733 }
734
735
get_container_record(char * name,NdbTransaction * tx)736 TableSpec * config_v1_2::get_container_record(char *name, NdbTransaction *tx) {
737 TableSpec * cont = config_v1::get_container_record(name, tx);
738 if(cont) {
739 TableSpec spec("ndbmemcache.containers", "name", "large_values_table");
740 QueryPlan plan(&db, &spec);
741 Operation op(&plan, OP_READ);
742
743 op.key_buffer = (char *) malloc(op.requiredKeyBuffer());
744 op.buffer = (char *) malloc(op.requiredBuffer());
745
746 op.clearKeyNullBits();
747 op.setKeyPart(COL_STORE_KEY, name, strlen(name));
748 op.readTuple(tx);
749 tx->execute(NdbTransaction::NoCommit);
750
751 if(tx->getNdbError().classification == NdbError::NoError) {
752 char val[256];
753 if(! op.isNull(COL_STORE_VALUE + 0)) {
754 op.copyValue(COL_STORE_VALUE + 0, val);
755 cont->external_table = ExternalValue::createContainerRecord(val);
756 }
757 }
758
759 free(op.key_buffer);
760 free(op.buffer);
761 }
762 return cont;
763 }
764
765