1 /*
2 Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 // Implements the interface defined in
26 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
27
28 #include <sstream>
29
30 #include "sql/item_strfunc.h"
31 #include "storage/ndb/plugin/ndb_log.h"
32 #include "storage/ndb/plugin/ndb_retry.h"
33 #include "storage/ndb/plugin/ndb_schema_dist.h"
34 #include "storage/ndb/plugin/ndb_thd_ndb.h"
35
// Database and name of the schema distribution table
const std::string Ndb_schema_dist_table::DB_NAME = "mysql";
const std::string Ndb_schema_dist_table::TABLE_NAME = "ndb_schema";

// Names of the columns which are exposed as members of the class
const char *Ndb_schema_dist_table::COL_DB = "db";
const char *Ndb_schema_dist_table::COL_NAME = "name";
const char *Ndb_schema_dist_table::COL_QUERY = "query";
const char *Ndb_schema_dist_table::COL_ID = "id";
const char *Ndb_schema_dist_table::COL_VERSION = "version";
// Schema UUID read by pre_upgrade() and consumed by post_install(), an
// empty value makes post_install() generate a new UUID
std::string Ndb_schema_dist_table::old_ndb_schema_uuid = "";
// Names of the columns which are referenced only from this file
static const char *COL_SLOCK = "slock";
static const char *COL_NODEID = "node_id";
static const char *COL_EPOCH = "epoch";
static const char *COL_TYPE = "type";
static const char *COL_SCHEMA_OP_ID = "schema_op_id";

// Length of the schema object identifiers which can be distributed by the
// ndb_schema table. The legacy limit of 63 was increased in 8.0.18 to allow
// for "any" identifier to be distributed. NOTE! The code still supports
// working with a ndb_schema table using the legacy length, a warning will be
// printed suggesting an upgrade
static constexpr int IDENTIFIER_LENGTH = 255;
static constexpr int LEGACY_IDENTIFIER_LENGTH = 63;

// Key of the property holding the schema UUID in the 'query' column, the
// property is stored on the form "schema_uuid=<UUID>;"
static const char *SCHEMA_UUID_KEY = "schema_uuid";
60
// Construct the utility table object for the mysql.ndb_schema table, all
// arguments are forwarded to the Ndb_util_table base class.
// NOTE(review): the meaning of the trailing 'true' argument is defined by the
// Ndb_util_table constructor which is not visible from this file - confirm
Ndb_schema_dist_table::Ndb_schema_dist_table(Thd_ndb *thd_ndb)
    : Ndb_util_table(thd_ndb, DB_NAME, TABLE_NAME, true) {}
63
~Ndb_schema_dist_table()64 Ndb_schema_dist_table::~Ndb_schema_dist_table() {}
65
check_schema() const66 bool Ndb_schema_dist_table::check_schema() const {
67 // db
68 // varbinary, at least 63 bytes long
69 // NOTE! The 63 bytes length for the db and name column is a legacy bug
70 // which doesn't have enough room for MySQL's max identitfier size. For
71 // backwards compatiblity reasons it's allowed to use such a schema
72 // distribution table but not all identifiers will be possible to distribute.
73 if (!(check_column_exist(COL_DB) && check_column_varbinary(COL_DB) &&
74 check_column_minlength(COL_DB, LEGACY_IDENTIFIER_LENGTH))) {
75 return false;
76 }
77
78 // name
79 // varbinary, at least 63 bytes long
80 if (!(check_column_exist(COL_NAME) && check_column_varbinary(COL_NAME) &&
81 check_column_minlength(COL_NAME, LEGACY_IDENTIFIER_LENGTH))) {
82 return false;
83 }
84 // Check that db + name is the primary key, otherwise pk operations
85 // using that key won't work
86 if (!check_primary_key({COL_DB, COL_NAME})) {
87 return false;
88 }
89
90 // slock
91 // binary, need room for at least 32 bytes(i.e 32*8 bits for 256 nodes)
92 if (!(check_column_exist(COL_SLOCK) && check_column_binary(COL_SLOCK) &&
93 check_column_minlength(COL_SLOCK, 32))) {
94 return false;
95 }
96
97 // query
98 // blob
99 if (!(check_column_exist(COL_QUERY) && check_column_blob(COL_QUERY))) {
100 return false;
101 }
102
103 // nodeid
104 // unsigned int
105 if (!(check_column_exist(COL_NODEID) && check_column_unsigned(COL_NODEID))) {
106 return false;
107 }
108
109 // epoch
110 // unsigned bigint
111 if (!(check_column_exist(COL_EPOCH) && check_column_bigunsigned(COL_EPOCH))) {
112 return false;
113 }
114
115 // id
116 // unsigned int
117 if (!(check_column_exist(COL_ID) && check_column_unsigned(COL_ID))) {
118 return false;
119 }
120
121 // version
122 // unsigned int
123 if (!(check_column_exist(COL_VERSION) &&
124 check_column_unsigned(COL_VERSION))) {
125 return false;
126 }
127
128 // type
129 // unsigned int
130 if (!(check_column_exist(COL_TYPE) && check_column_unsigned(COL_TYPE))) {
131 return false;
132 }
133
134 // schema_op_id
135 // unsigned int
136 if (!check_column_exist(COL_SCHEMA_OP_ID)) {
137 // This is an optional column added in 8.0.17. Functionality depending on
138 // this column will be conditional but warnings are pushed to alert the user
139 // that upgrade is eventually necessary
140 ;
141 } else {
142 // Column exists, check proper type
143 if (!(check_column_unsigned(COL_SCHEMA_OP_ID) &&
144 check_column_nullable(COL_SCHEMA_OP_ID, true))) {
145 return false;
146 }
147 }
148
149 return true;
150 }
151
check_column_identifier_limit(const char * column_name,const std::string & identifier) const152 bool Ndb_schema_dist_table::check_column_identifier_limit(
153 const char *column_name, const std::string &identifier) const {
154 DBUG_TRACE;
155 DBUG_PRINT("enter", ("column_name: '%s', identifier: '%s'", column_name,
156 identifier.c_str()));
157
158 if (!check_column_exist(column_name)) {
159 return false;
160 }
161
162 const int max_length = DBUG_EVALUATE_IF("ndb_schema_dist_63byte_limit", 63,
163 get_column_max_length(column_name));
164
165 if (identifier.length() > static_cast<size_t>(max_length)) {
166 push_warning("Identifier length exceeds the %d byte limit", max_length);
167 return false;
168 }
169 return true;
170 }
171
// Fill in the NDB definition of the ndb_schema table.
//
// new_table      the table definition to fill in
// mysql_version  the MySQL version to define the table for. Versions lower
//                than 80018 get the legacy 63 byte length of the 'db' and
//                'name' columns, versions lower than 80017 get a table
//                without the 'schema_op_id' column.
//
// Returns true if the table was defined, false if setting the legacy metadata
// or adding any of the columns failed
bool Ndb_schema_dist_table::define_table_ndb(NdbDictionary::Table &new_table,
                                             unsigned mysql_version) const {
  // Set metadata for backwards compatibility support, earlier versions
  // will see what they expect and can connect to NDB properly. The physical
  // table in NDB may be extended to support new functionality but should still
  // be possible to use.
  constexpr uint8 legacy_metadata[418] = {
      0x01, 0x00, 0x00, 0x00, 0x6a, 0x22, 0x00, 0x00, 0x96, 0x01, 0x00, 0x00,
      0x78, 0x9c, 0xed, 0xd8, 0x3d, 0x4b, 0xc3, 0x50, 0x14, 0x06, 0xe0, 0x37,
      0x89, 0x89, 0x37, 0xb1, 0xd4, 0x0f, 0x82, 0x83, 0xd3, 0x75, 0x10, 0xb4,
      0x83, 0x6d, 0x45, 0xdd, 0xa4, 0xa6, 0x28, 0x5a, 0xfc, 0x2a, 0xa5, 0x83,
      0x9d, 0xc4, 0x36, 0x01, 0xeb, 0x47, 0xab, 0xad, 0x0a, 0x0e, 0x4a, 0xfd,
      0x29, 0xce, 0x0e, 0x8e, 0x0e, 0x0e, 0x42, 0x07, 0x7f, 0x88, 0xbf, 0x43,
      0x7a, 0x3d, 0x89, 0x55, 0x3a, 0xba, 0x45, 0xf0, 0x3c, 0x4b, 0xce, 0x79,
      0x39, 0xe1, 0xde, 0x33, 0x26, 0x3d, 0xcd, 0x49, 0x1a, 0xc0, 0x98, 0x06,
      0x64, 0x80, 0xba, 0xd6, 0xc5, 0x0f, 0x3d, 0x05, 0x1b, 0x30, 0xc3, 0x52,
      0x7c, 0x67, 0x75, 0x9a, 0x9b, 0x79, 0x03, 0xf6, 0xa3, 0x2e, 0x09, 0xa4,
      0xd3, 0x80, 0x04, 0x63, 0x8c, 0x31, 0xc6, 0x18, 0x63, 0x8c, 0x31, 0xc6,
      0xfe, 0x32, 0x4d, 0x07, 0x1c, 0x7a, 0xde, 0x41, 0x37, 0xa8, 0xeb, 0xd0,
      0xf7, 0xbd, 0xb6, 0x9a, 0x83, 0xde, 0xf1, 0xbe, 0x0a, 0x55, 0x2c, 0x15,
      0x76, 0xbc, 0x52, 0x45, 0xc5, 0x7d, 0x51, 0x16, 0x3f, 0x07, 0x0d, 0xbf,
      0x5a, 0x3b, 0xbd, 0x6a, 0x5f, 0x06, 0xad, 0x79, 0xea, 0x65, 0xd1, 0x2b,
      0x95, 0x0b, 0xe5, 0xc2, 0xde, 0xae, 0xcc, 0x57, 0xe4, 0xd6, 0x7a, 0x45,
      0xa6, 0x53, 0xd3, 0x4b, 0x99, 0xe5, 0x6c, 0x56, 0x7a, 0xdb, 0x1b, 0x7b,
      0xa5, 0x42, 0x79, 0x73, 0x47, 0xae, 0xc8, 0x05, 0x99, 0x4a, 0xcb, 0xd9,
      0x39, 0x68, 0x13, 0x71, 0x2f, 0xc0, 0x18, 0x63, 0x8c, 0x31, 0xc6, 0x18,
      0x63, 0xff, 0xd7, 0xb1, 0x8e, 0xb1, 0xb8, 0xef, 0x10, 0x27, 0x0d, 0x36,
      0x6e, 0xf1, 0x4e, 0x55, 0x17, 0x8b, 0x3f, 0x69, 0x11, 0x93, 0xfd, 0xea,
      0x16, 0x8e, 0xad, 0xbb, 0x73, 0xf2, 0x97, 0x30, 0x04, 0xc3, 0xaf, 0xc2,
      0x84, 0xd9, 0x38, 0x3c, 0x0b, 0x60, 0xc1, 0x6a, 0x9f, 0x36, 0x6b, 0x27,
      0x18, 0x86, 0x75, 0x71, 0x15, 0xb4, 0x6e, 0x20, 0x20, 0x1a, 0x4d, 0x3f,
      0x38, 0xa8, 0xfb, 0x74, 0xb0, 0x15, 0x9c, 0x37, 0x6b, 0x47, 0xf4, 0x59,
      0x6d, 0x50, 0x3b, 0x02, 0x71, 0x1d, 0xb4, 0xda, 0xf5, 0x66, 0x03, 0x09,
      0x98, 0x97, 0x37, 0xe7, 0x01, 0x86, 0x8c, 0x5c, 0x0e, 0xd1, 0x1f, 0x19,
      0xba, 0xc9, 0x68, 0x0e, 0x30, 0x4d, 0x0a, 0xbc, 0x81, 0xc0, 0xb2, 0xe8,
      0xcc, 0xfb, 0x7e, 0xd0, 0xa3, 0x60, 0xd8, 0x12, 0x02, 0x0f, 0xc0, 0xf8,
      0x9a, 0x0b, 0x7c, 0x50, 0x20, 0x84, 0xe3, 0xe0, 0x11, 0x98, 0x0a, 0x27,
      0x0c, 0x01, 0xd8, 0x96, 0xeb, 0xe2, 0x09, 0xc8, 0x87, 0x01, 0x0d, 0xc3,
      0x31, 0x68, 0xe2, 0x79, 0x60, 0x62, 0x24, 0x7c, 0xe5, 0x65, 0x20, 0x48,
      0x98, 0x14, 0xbc, 0x0e, 0x04, 0xca, 0xaf, 0xaa, 0x70, 0x41, 0x15, 0x6d,
      0xa7, 0xa2, 0xd5, 0x54, 0x7f, 0x2f, 0x15, 0x2d, 0xa5, 0xa8, 0xe8, 0xaf,
      0xa3, 0xc2, 0x5d, 0x14, 0x3e, 0x01, 0x4d, 0x53, 0x5e, 0x81};
  if (new_table.setFrm(legacy_metadata, sizeof(legacy_metadata)) != 0) {
    push_warning("Failed to set legacy metadata");
    return false;
  }

  new_table.setForceVarPart(true);

  // Allow table to be read+write also in single user mode
  new_table.setSingleUserMode(NdbDictionary::Table::SingleUserModeReadWrite);

  // The length of "db" and "name" was adjusted in 8.0.18 to allow
  // passing 255 bytes long identifiers
  int db_and_name_length = IDENTIFIER_LENGTH;
  if (mysql_version < 80018) {
    // Use legacy identifier length when creating the table for
    // backwards compatibility testing
    db_and_name_length = LEGACY_IDENTIFIER_LENGTH;
  }

  {
    // db VARBINARY(<db_and_name_length>) NOT NULL, part of the primary key
    NdbDictionary::Column col_db(COL_DB);
    col_db.setType(NdbDictionary::Column::Varbinary);
    col_db.setLength(db_and_name_length);
    col_db.setNullable(false);
    col_db.setPrimaryKey(true);
    if (!define_table_add_column(new_table, col_db)) return false;
  }

  {
    // name VARBINARY(<db_and_name_length>) NOT NULL, part of the primary key
    NdbDictionary::Column col_name(COL_NAME);
    col_name.setType(NdbDictionary::Column::Varbinary);
    col_name.setLength(db_and_name_length);
    col_name.setNullable(false);
    col_name.setPrimaryKey(true);
    if (!define_table_add_column(new_table, col_name)) return false;
  }

  {
    // slock BINARY(32) NOT NULL
    NdbDictionary::Column col_slock(COL_SLOCK);
    col_slock.setType(NdbDictionary::Column::Binary);
    col_slock.setLength(32);
    col_slock.setNullable(false);
    if (!define_table_add_column(new_table, col_slock)) return false;
  }

  {
    // query BLOB NOT NULL
    NdbDictionary::Column col_query(COL_QUERY);
    col_query.setType(NdbDictionary::Column::Blob);
    col_query.setInlineSize(256);
    col_query.setPartSize(2000);
    col_query.setStripeSize(0);
    col_query.setNullable(false);
    if (!define_table_add_column(new_table, col_query)) return false;
  }

  {
    // node_id INT UNSIGNED NOT NULL
    NdbDictionary::Column col_nodeid(COL_NODEID);
    col_nodeid.setType(NdbDictionary::Column::Unsigned);
    col_nodeid.setNullable(false);
    if (!define_table_add_column(new_table, col_nodeid)) return false;
  }

  {
    // epoch BIGINT UNSIGNED NOT NULL
    NdbDictionary::Column col_epoch(COL_EPOCH);
    col_epoch.setType(NdbDictionary::Column::Bigunsigned);
    col_epoch.setNullable(false);
    if (!define_table_add_column(new_table, col_epoch)) return false;
  }

  {
    // id INT UNSIGNED NOT NULL
    NdbDictionary::Column col_id(COL_ID);
    col_id.setType(NdbDictionary::Column::Unsigned);
    col_id.setNullable(false);
    if (!define_table_add_column(new_table, col_id)) return false;
  }

  {
    // version INT UNSIGNED NOT NULL
    NdbDictionary::Column col_version(COL_VERSION);
    col_version.setType(NdbDictionary::Column::Unsigned);
    col_version.setNullable(false);
    if (!define_table_add_column(new_table, col_version)) return false;
  }

  {
    // type INT UNSIGNED NOT NULL
    NdbDictionary::Column col_type(COL_TYPE);
    col_type.setType(NdbDictionary::Column::Unsigned);
    col_type.setNullable(false);
    if (!define_table_add_column(new_table, col_type)) return false;
  }

  // The optional column is left out when defining the table for a version
  // older than 8.0.17
  if (mysql_version >= 80017) {
    // schema_op_id INT UNSIGNED NULL
    NdbDictionary::Column col_schema_op_id(COL_SCHEMA_OP_ID);
    col_schema_op_id.setType(NdbDictionary::Column::Unsigned);
    col_schema_op_id.setNullable(true);  // NULL!
    if (!define_table_add_column(new_table, col_schema_op_id)) return false;
  }

  return true;
}
323
need_upgrade() const324 bool Ndb_schema_dist_table::need_upgrade() const {
325 // Check that 'schema_op_id' column exists. If exists, it's used for sending
326 // the schema_op_id from client to participants who can then use it when
327 // replying using ndb_schema_result (if they support that table)
328 if (!have_schema_op_id_column()) {
329 return true;
330 }
331 // The 'db' and 'name' column need to be upgrade if length is shorter than
332 // current identifier length
333 if (get_column_max_length(COL_DB) < IDENTIFIER_LENGTH ||
334 get_column_max_length(COL_NAME) < IDENTIFIER_LENGTH) {
335 return true;
336 }
337 return false;
338 }
339
drop_events_in_NDB() const340 bool Ndb_schema_dist_table::drop_events_in_NDB() const {
341 // Drop the default event on ndb_schema table
342 if (!drop_event_in_NDB("REPL$mysql/ndb_schema")) return false;
343
344 // Legacy event on ndb_schema table, drop since it might
345 // have been created(although ages ago)
346 if (!drop_event_in_NDB("REPLF$mysql/ndb_schema")) return false;
347
348 return true;
349 }
350
define_table_dd() const351 std::string Ndb_schema_dist_table::define_table_dd() const {
352 std::stringstream ss;
353 ss << "CREATE TABLE " << db_name() << "." << table_name() << "(\n";
354 ss << "db VARBINARY(" << get_column_max_length(COL_DB) << ") NOT NULL,";
355 ss << "name VARBINARY(" << get_column_max_length(COL_NAME) << ") NOT NULL,";
356 ss << "slock BINARY(32) NOT NULL,"
357 "query BLOB NOT NULL,"
358 "node_id INT UNSIGNED NOT NULL,"
359 "epoch BIGINT UNSIGNED NOT NULL,"
360 "id INT UNSIGNED NOT NULL,"
361 "version INT UNSIGNED NOT NULL,"
362 "type INT UNSIGNED NOT NULL,";
363 if (have_schema_op_id_column()) {
364 ss << "schema_op_id INT UNSIGNED NULL,";
365 }
366 ss << "PRIMARY KEY USING HASH (db,name)"
367 << ") ENGINE=ndbcluster CHARACTER SET latin1";
368 return ss.str();
369 }
370
get_slock_bytes() const371 int Ndb_schema_dist_table::get_slock_bytes() const {
372 return get_column_max_length(COL_SLOCK);
373 }
374
have_schema_op_id_column() const375 bool Ndb_schema_dist_table::have_schema_op_id_column() const {
376 const NdbDictionary::Table *ndb_tab = get_table();
377 return ndb_tab->getColumn(COL_SCHEMA_OP_ID);
378 }
379
// Helper functions to read and write properties into the query column of
// the schema UUID tuple. The properties are written in the form of
// "key1=value1;key2=value2;"

// Serialize the properties in kv_map to a string of the form
// "key1=value1;key2=value2;". The properties are written in the iteration
// order of the map, i.e sorted by key.
// NOTE! Keys and values are appended as is, a '=' or ';' inside a key or
// value is not escaped.
//
// kv_map  the properties to serialize
//
// Returns the serialized properties, empty string for an empty map
static std::string map_extract_key_value_string(
    const std::map<std::string, std::string> &kv_map) {
  std::string key_value_str;
  // Iterate by const reference to avoid copying each key and value
  for (const auto &entry : kv_map) {
    key_value_str.append(entry.first);
    key_value_str.append("=");
    key_value_str.append(entry.second);
    key_value_str.append(";");
  }
  return key_value_str;
}
395
// Look up the value of a property in a string of the form
// "key1=value1;key2=value2;".
//
// kv_str  the property string to search
// key     name of the property to find
//
// Returns the value of the first property whose name is equal to key, i.e
// everything between the first '=' and the terminating ';' of that entry.
// Returns an empty string when no such property exists. An entry without
// '=' is malformed, it's skipped rather than matched against key.
static std::string key_value_str_get_value(const std::string &kv_str,
                                           const char *key) {
  std::istringstream kv_ss(kv_str);
  std::string kv_pair;
  while (std::getline(kv_ss, kv_pair, ';')) {
    // Keep the position as size_type, converting npos to int would make a
    // missing separator look like "use the whole entry as key"
    const std::string::size_type key_length = kv_pair.find('=');
    if (key_length == std::string::npos) {
      continue;
    }
    if (kv_pair.compare(0, key_length, key) == 0) {
      return kv_pair.substr(key_length + 1);
    }
  }
  return "";
}
409
// Read the schema UUID from the ndb_schema table.
//
// The UUID is stored in the 'query' column of the tuple with key
// db='mysql', name='ndb_schema', as a property of the form
// "schema_uuid=<UUID>;".
//
// schema_uuid  [out] assigned the value of the "schema_uuid" property when
//              the tuple exists and its 'query' value is not empty,
//              otherwise left unchanged
//
// Returns false if the read failed, a warning has then been pushed.
// Returns true otherwise, NOTE! also when the tuple doesn't exist or its
// 'query' value is empty.
bool Ndb_schema_dist_table::get_schema_uuid(std::string *schema_uuid) const {
  DBUG_TRACE;
  const NdbDictionary::Table *ndb_table = get_table();

  // Pack the table and db names to be used during read into table
  char db_name_buf[FN_REFLEN];
  char table_name_buf[FN_REFLEN];
  pack_varbinary(COL_DB, DB_NAME.c_str(), db_name_buf);
  pack_varbinary(COL_NAME, TABLE_NAME.c_str(), table_name_buf);
  // Receives the value of the 'query' column, remains empty if the tuple
  // doesn't exist
  std::string query_col_value;

  // Lambda read function to execute using ndb_trans_retry(). Returns nullptr
  // on success (including "tuple doesn't exist"), otherwise a pointer to the
  // error which occurred
  std::function<const NdbError *(NdbTransaction *)> read_ndb_schema_func =
      [&](NdbTransaction *trans) -> const NdbError * {
    DBUG_TRACE;
    NdbOperation *read_op = trans->getNdbOperation(ndb_table);
    if (read_op == nullptr) return &trans->getNdbError();

    // Define read operation based on 'db_name, table_name' key
    if (read_op->readTuple() != 0 || read_op->equal(COL_DB, db_name_buf) != 0 ||
        read_op->equal(COL_NAME, table_name_buf) != 0) {
      return &read_op->getNdbError();
    }

    // Setup read for the query column value.
    NdbBlob *query_blob_handle = read_op->getBlobHandle(COL_QUERY);
    if (!query_blob_handle) {
      return &read_op->getNdbError();
    }

    if (trans->execute(NdbTransaction::NoCommit,
                       NdbOperation::DefaultAbortOption,
                       1 /* force send */) != 0) {
      // Execute failed.
      return &trans->getNdbError();
    }

    // Transaction execute succeeded. Check the operation for errors, a
    // missing tuple is reported on the operation and is not treated as
    // a failure
    const NdbError &read_op_error = read_op->getNdbError();
    if (read_op_error.code == 0) {
      // The tuple exists. Read the value from query blob and return
      if (!unpack_blob_not_null(query_blob_handle, &query_col_value)) {
        return &query_blob_handle->getNdbError();
      }
      return nullptr;
    } else if (read_op_error.classification ==
               NdbError::Classification::NoDataFound) {
      // The tuple doesn't exist.
      ndb_log_verbose(19, "The schema UUID tuple doesn't exist");
      return nullptr;
    } else {
      // Operation failed with an unexpected error
      return &read_op_error;
    }
  };

  NdbError ndb_err;
  if (!ndb_trans_retry(get_ndb(), get_thd(), ndb_err, read_ndb_schema_func)) {
    push_warning("Failed to read the schema UUID tuple: %s(%d).",
                 ndb_err.message, ndb_err.code);
    return false;
  }

  if (query_col_value.size() == 0) {
    // Schema UUID is not present, this is not an error and the out
    // parameter is left as is
    ndb_log_info("Schema UUID not present in ndb_schema table");
    return true;
  }

  // The tuple with Schema UUID exists. It is stored as a key value
  // pair of form "schema_uuid=<UUID>;". Extract the value and return.
  schema_uuid->assign(
      key_value_str_get_value(query_col_value, SCHEMA_UUID_KEY));
  DBUG_ASSERT(schema_uuid->size() == UUID_LENGTH);
  ndb_log_verbose(19, "Schema UUID read from NDB : %s", schema_uuid->c_str());
  return true;
}
487
// Write the schema UUID to the ndb_schema table.
//
// The UUID is written to the 'query' column of the tuple with key
// db='mysql', name='ndb_schema', as a property of the form
// "schema_uuid=<UUID>;". The remaining columns of the tuple are set to zero,
// except 'type' which is set to SOT_CREATE_TABLE.
//
// schema_uuid  the UUID to store
//
// Returns true if the tuple was written and committed, false otherwise. A
// warning is pushed on failure.
bool Ndb_schema_dist_table::update_schema_uuid_in_NDB(
    const std::string &schema_uuid) const {
  DBUG_TRACE;

  const NdbDictionary::Table *ndb_table = get_table();
  DBUG_ASSERT(ndb_table != nullptr);

  // Store the UUID as a key value pair of form "schema_uuid=<UUID>;"
  std::map<std::string, std::string> ndb_schema_props;
  ndb_schema_props.insert({SCHEMA_UUID_KEY, schema_uuid.c_str()});
  const std::string ndb_schema_props_str =
      map_extract_key_value_string(ndb_schema_props);

  // Pack db and table_name
  char db_buf[FN_REFLEN];
  char name_buf[FN_REFLEN];
  pack_varbinary(COL_DB, DB_NAME.c_str(), db_buf);
  pack_varbinary(COL_NAME, TABLE_NAME.c_str(), name_buf);

  // Function for writing row to ndb_schema, executed using ndb_trans_retry().
  // Returns nullptr on success, otherwise a pointer to the error which
  // occurred
  std::function<const NdbError *(NdbTransaction *)> write_schema_op_func =
      [&](NdbTransaction *trans) -> const NdbError * {
    DBUG_TRACE;

    NdbOperation *op = trans->getNdbOperation(ndb_table);
    if (op == nullptr) return &trans->getNdbError();

    // Buffer with zeroes for slock, sized to the length of the slock column
    std::vector<char> slock_zeroes;
    slock_zeroes.assign(get_slock_bytes(), 0);
    const char *slock_buf = slock_zeroes.data();

    // Define the write operation, the key followed by the value of all the
    // mandatory columns except the query blob
    const Uint64 log_epoch = 0;
    if (op->writeTuple() != 0 || op->equal(COL_DB, db_buf) != 0 ||
        op->equal(COL_NAME, name_buf) != 0 ||
        op->setValue(COL_SLOCK, slock_buf) != 0 ||
        op->setValue(COL_NODEID, 0) != 0 ||
        op->setValue(COL_EPOCH, log_epoch) != 0 ||
        op->setValue(COL_ID, 0) != 0 || op->setValue(COL_VERSION, 0) != 0 ||
        op->setValue(COL_TYPE, SOT_CREATE_TABLE) != 0 ||
        op->setAnyValue(0) != 0)
      return &op->getNdbError();

    // The schema_op_id column is optional, set it only if the table has it
    if (have_schema_op_id_column() && op->setValue(COL_SCHEMA_OP_ID, 0) != 0)
      return &op->getNdbError();

    // The properties string is written to the query blob
    NdbBlob *ndb_blob = op->getBlobHandle(COL_QUERY);
    if (ndb_blob == nullptr) return &op->getNdbError();

    if (ndb_blob->setValue(ndb_schema_props_str.c_str(),
                           ndb_schema_props_str.length()) != 0)
      return &ndb_blob->getNdbError();

    if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
                       1 /* force send */) != 0) {
      return &trans->getNdbError();
    }

    return nullptr;
  };

  NdbError ndb_err;
  if (!ndb_trans_retry(get_ndb(), get_thd(), ndb_err, write_schema_op_func)) {
    push_warning(
        "Failed to update schema UUID in 'mysql.ndb_schema' table. Code : %d. "
        "Error : %s",
        ndb_err.code, ndb_err.message);
    return false;
  }

  return true;
}
560
pre_upgrade() const561 bool Ndb_schema_dist_table::pre_upgrade() const {
562 // During upgrade, the schema UUID need not be regenerated.
563 // Save it for restoring it later after upgrade
564 if (!get_schema_uuid(&old_ndb_schema_uuid)) return false;
565 return true;
566 }
567
post_install() const568 bool Ndb_schema_dist_table::post_install() const {
569 DBUG_TRACE;
570
571 std::string schema_uuid;
572 if (old_ndb_schema_uuid.empty()) {
573 // The table was just created
574 // Generate a new schema uuid for the table
575 String schema_uuid_buf;
576 mysql_generate_uuid(&schema_uuid_buf);
577 schema_uuid.assign(schema_uuid_buf.ptr(), schema_uuid_buf.length());
578 ndb_log_verbose(19, "Generated new schema UUID : %s", schema_uuid.c_str());
579 } else {
580 // The table was just upgraded
581 // Restore the old schema UUID
582 schema_uuid = std::move(old_ndb_schema_uuid);
583 ndb_log_verbose(19, "Restoring schema UUID : %s after upgrade",
584 schema_uuid.c_str());
585 }
586
587 // Update the UUID in the ndb_schema_table
588 if (!update_schema_uuid_in_NDB(schema_uuid)) {
589 return false;
590 }
591 return true;
592 }
593