1 /*
2    Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 // Implements the interface defined in
26 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
27 
28 #include <sstream>
29 
30 #include "sql/item_strfunc.h"
31 #include "storage/ndb/plugin/ndb_log.h"
32 #include "storage/ndb/plugin/ndb_retry.h"
33 #include "storage/ndb/plugin/ndb_schema_dist.h"
34 #include "storage/ndb/plugin/ndb_thd_ndb.h"
35 
36 const std::string Ndb_schema_dist_table::DB_NAME = "mysql";
37 const std::string Ndb_schema_dist_table::TABLE_NAME = "ndb_schema";
38 
39 const char *Ndb_schema_dist_table::COL_DB = "db";
40 const char *Ndb_schema_dist_table::COL_NAME = "name";
41 const char *Ndb_schema_dist_table::COL_QUERY = "query";
42 const char *Ndb_schema_dist_table::COL_ID = "id";
43 const char *Ndb_schema_dist_table::COL_VERSION = "version";
44 std::string Ndb_schema_dist_table::old_ndb_schema_uuid = "";
45 static const char *COL_SLOCK = "slock";
46 static const char *COL_NODEID = "node_id";
47 static const char *COL_EPOCH = "epoch";
48 static const char *COL_TYPE = "type";
49 static const char *COL_SCHEMA_OP_ID = "schema_op_id";
50 
51 // Length of the schema object identifiers which can be distributed by the
52 // ndb_schema table. The legacy limit of 63 was increased in 8.0.18 to allow for
53 // "any" identifier to be distributed. NOTE! Code still supports working with a
54 // ndb_schema table using the legacy length, warning will be printed suggesting
55 // upgrade
56 static constexpr int IDENTIFIER_LENGTH = 255;
57 static constexpr int LEGACY_IDENTIFIER_LENGTH = 63;
58 
59 static const char *SCHEMA_UUID_KEY = "schema_uuid";
60 
Ndb_schema_dist_table(Thd_ndb * thd_ndb)61 Ndb_schema_dist_table::Ndb_schema_dist_table(Thd_ndb *thd_ndb)
62     : Ndb_util_table(thd_ndb, DB_NAME, TABLE_NAME, true) {}
63 
~Ndb_schema_dist_table()64 Ndb_schema_dist_table::~Ndb_schema_dist_table() {}
65 
check_schema() const66 bool Ndb_schema_dist_table::check_schema() const {
67   // db
68   // varbinary, at least 63 bytes long
69   // NOTE! The 63 bytes length for the db and name column is a legacy bug
70   // which doesn't have enough room for MySQL's max identitfier size. For
71   // backwards compatiblity reasons it's allowed to use such a schema
72   // distribution table but not all identifiers will be possible to distribute.
73   if (!(check_column_exist(COL_DB) && check_column_varbinary(COL_DB) &&
74         check_column_minlength(COL_DB, LEGACY_IDENTIFIER_LENGTH))) {
75     return false;
76   }
77 
78   // name
79   // varbinary, at least 63 bytes long
80   if (!(check_column_exist(COL_NAME) && check_column_varbinary(COL_NAME) &&
81         check_column_minlength(COL_NAME, LEGACY_IDENTIFIER_LENGTH))) {
82     return false;
83   }
84   // Check that db + name is the primary key, otherwise pk operations
85   // using that key won't work
86   if (!check_primary_key({COL_DB, COL_NAME})) {
87     return false;
88   }
89 
90   // slock
91   // binary, need room for at least 32 bytes(i.e 32*8 bits for 256 nodes)
92   if (!(check_column_exist(COL_SLOCK) && check_column_binary(COL_SLOCK) &&
93         check_column_minlength(COL_SLOCK, 32))) {
94     return false;
95   }
96 
97   // query
98   // blob
99   if (!(check_column_exist(COL_QUERY) && check_column_blob(COL_QUERY))) {
100     return false;
101   }
102 
103   // nodeid
104   // unsigned int
105   if (!(check_column_exist(COL_NODEID) && check_column_unsigned(COL_NODEID))) {
106     return false;
107   }
108 
109   // epoch
110   // unsigned bigint
111   if (!(check_column_exist(COL_EPOCH) && check_column_bigunsigned(COL_EPOCH))) {
112     return false;
113   }
114 
115   // id
116   // unsigned int
117   if (!(check_column_exist(COL_ID) && check_column_unsigned(COL_ID))) {
118     return false;
119   }
120 
121   // version
122   // unsigned int
123   if (!(check_column_exist(COL_VERSION) &&
124         check_column_unsigned(COL_VERSION))) {
125     return false;
126   }
127 
128   // type
129   // unsigned int
130   if (!(check_column_exist(COL_TYPE) && check_column_unsigned(COL_TYPE))) {
131     return false;
132   }
133 
134   // schema_op_id
135   // unsigned int
136   if (!check_column_exist(COL_SCHEMA_OP_ID)) {
137     // This is an optional column added in 8.0.17. Functionality depending on
138     // this column will be conditional but warnings are pushed to alert the user
139     // that upgrade is eventually necessary
140     ;
141   } else {
142     // Column exists, check proper type
143     if (!(check_column_unsigned(COL_SCHEMA_OP_ID) &&
144           check_column_nullable(COL_SCHEMA_OP_ID, true))) {
145       return false;
146     }
147   }
148 
149   return true;
150 }
151 
check_column_identifier_limit(const char * column_name,const std::string & identifier) const152 bool Ndb_schema_dist_table::check_column_identifier_limit(
153     const char *column_name, const std::string &identifier) const {
154   DBUG_TRACE;
155   DBUG_PRINT("enter", ("column_name: '%s', identifier: '%s'", column_name,
156                        identifier.c_str()));
157 
158   if (!check_column_exist(column_name)) {
159     return false;
160   }
161 
162   const int max_length = DBUG_EVALUATE_IF("ndb_schema_dist_63byte_limit", 63,
163                                           get_column_max_length(column_name));
164 
165   if (identifier.length() > static_cast<size_t>(max_length)) {
166     push_warning("Identifier length exceeds the %d byte limit", max_length);
167     return false;
168   }
169   return true;
170 }
171 
define_table_ndb(NdbDictionary::Table & new_table,unsigned mysql_version) const172 bool Ndb_schema_dist_table::define_table_ndb(NdbDictionary::Table &new_table,
173                                              unsigned mysql_version) const {
174   // Set metadata for backwards compatibility support, earlier versions
175   // will see what they expect and can connect to NDB properly. The physical
176   // table in NDB may be extended to support new functionality but should still
177   // be possible to use.
178   constexpr uint8 legacy_metadata[418] = {
179       0x01, 0x00, 0x00, 0x00, 0x6a, 0x22, 0x00, 0x00, 0x96, 0x01, 0x00, 0x00,
180       0x78, 0x9c, 0xed, 0xd8, 0x3d, 0x4b, 0xc3, 0x50, 0x14, 0x06, 0xe0, 0x37,
181       0x89, 0x89, 0x37, 0xb1, 0xd4, 0x0f, 0x82, 0x83, 0xd3, 0x75, 0x10, 0xb4,
182       0x83, 0x6d, 0x45, 0xdd, 0xa4, 0xa6, 0x28, 0x5a, 0xfc, 0x2a, 0xa5, 0x83,
183       0x9d, 0xc4, 0x36, 0x01, 0xeb, 0x47, 0xab, 0xad, 0x0a, 0x0e, 0x4a, 0xfd,
184       0x29, 0xce, 0x0e, 0x8e, 0x0e, 0x0e, 0x42, 0x07, 0x7f, 0x88, 0xbf, 0x43,
185       0x7a, 0x3d, 0x89, 0x55, 0x3a, 0xba, 0x45, 0xf0, 0x3c, 0x4b, 0xce, 0x79,
186       0x39, 0xe1, 0xde, 0x33, 0x26, 0x3d, 0xcd, 0x49, 0x1a, 0xc0, 0x98, 0x06,
187       0x64, 0x80, 0xba, 0xd6, 0xc5, 0x0f, 0x3d, 0x05, 0x1b, 0x30, 0xc3, 0x52,
188       0x7c, 0x67, 0x75, 0x9a, 0x9b, 0x79, 0x03, 0xf6, 0xa3, 0x2e, 0x09, 0xa4,
189       0xd3, 0x80, 0x04, 0x63, 0x8c, 0x31, 0xc6, 0x18, 0x63, 0x8c, 0x31, 0xc6,
190       0xfe, 0x32, 0x4d, 0x07, 0x1c, 0x7a, 0xde, 0x41, 0x37, 0xa8, 0xeb, 0xd0,
191       0xf7, 0xbd, 0xb6, 0x9a, 0x83, 0xde, 0xf1, 0xbe, 0x0a, 0x55, 0x2c, 0x15,
192       0x76, 0xbc, 0x52, 0x45, 0xc5, 0x7d, 0x51, 0x16, 0x3f, 0x07, 0x0d, 0xbf,
193       0x5a, 0x3b, 0xbd, 0x6a, 0x5f, 0x06, 0xad, 0x79, 0xea, 0x65, 0xd1, 0x2b,
194       0x95, 0x0b, 0xe5, 0xc2, 0xde, 0xae, 0xcc, 0x57, 0xe4, 0xd6, 0x7a, 0x45,
195       0xa6, 0x53, 0xd3, 0x4b, 0x99, 0xe5, 0x6c, 0x56, 0x7a, 0xdb, 0x1b, 0x7b,
196       0xa5, 0x42, 0x79, 0x73, 0x47, 0xae, 0xc8, 0x05, 0x99, 0x4a, 0xcb, 0xd9,
197       0x39, 0x68, 0x13, 0x71, 0x2f, 0xc0, 0x18, 0x63, 0x8c, 0x31, 0xc6, 0x18,
198       0x63, 0xff, 0xd7, 0xb1, 0x8e, 0xb1, 0xb8, 0xef, 0x10, 0x27, 0x0d, 0x36,
199       0x6e, 0xf1, 0x4e, 0x55, 0x17, 0x8b, 0x3f, 0x69, 0x11, 0x93, 0xfd, 0xea,
200       0x16, 0x8e, 0xad, 0xbb, 0x73, 0xf2, 0x97, 0x30, 0x04, 0xc3, 0xaf, 0xc2,
201       0x84, 0xd9, 0x38, 0x3c, 0x0b, 0x60, 0xc1, 0x6a, 0x9f, 0x36, 0x6b, 0x27,
202       0x18, 0x86, 0x75, 0x71, 0x15, 0xb4, 0x6e, 0x20, 0x20, 0x1a, 0x4d, 0x3f,
203       0x38, 0xa8, 0xfb, 0x74, 0xb0, 0x15, 0x9c, 0x37, 0x6b, 0x47, 0xf4, 0x59,
204       0x6d, 0x50, 0x3b, 0x02, 0x71, 0x1d, 0xb4, 0xda, 0xf5, 0x66, 0x03, 0x09,
205       0x98, 0x97, 0x37, 0xe7, 0x01, 0x86, 0x8c, 0x5c, 0x0e, 0xd1, 0x1f, 0x19,
206       0xba, 0xc9, 0x68, 0x0e, 0x30, 0x4d, 0x0a, 0xbc, 0x81, 0xc0, 0xb2, 0xe8,
207       0xcc, 0xfb, 0x7e, 0xd0, 0xa3, 0x60, 0xd8, 0x12, 0x02, 0x0f, 0xc0, 0xf8,
208       0x9a, 0x0b, 0x7c, 0x50, 0x20, 0x84, 0xe3, 0xe0, 0x11, 0x98, 0x0a, 0x27,
209       0x0c, 0x01, 0xd8, 0x96, 0xeb, 0xe2, 0x09, 0xc8, 0x87, 0x01, 0x0d, 0xc3,
210       0x31, 0x68, 0xe2, 0x79, 0x60, 0x62, 0x24, 0x7c, 0xe5, 0x65, 0x20, 0x48,
211       0x98, 0x14, 0xbc, 0x0e, 0x04, 0xca, 0xaf, 0xaa, 0x70, 0x41, 0x15, 0x6d,
212       0xa7, 0xa2, 0xd5, 0x54, 0x7f, 0x2f, 0x15, 0x2d, 0xa5, 0xa8, 0xe8, 0xaf,
213       0xa3, 0xc2, 0x5d, 0x14, 0x3e, 0x01, 0x4d, 0x53, 0x5e, 0x81};
214   if (new_table.setFrm(legacy_metadata, sizeof(legacy_metadata)) != 0) {
215     push_warning("Failed to set legacy metadata");
216     return false;
217   }
218 
219   new_table.setForceVarPart(true);
220 
221   // Allow table to be read+write also in single user mode
222   new_table.setSingleUserMode(NdbDictionary::Table::SingleUserModeReadWrite);
223 
224   // The length of "db" and "name" was adjusted in 8.0.18 to allow
225   // passing 255 bytes long identifiers
226   int db_and_name_length = IDENTIFIER_LENGTH;
227   if (mysql_version < 80018) {
228     // Use legacy identifier length when creating the table for
229     // backwards compatibility testing
230     db_and_name_length = LEGACY_IDENTIFIER_LENGTH;
231   }
232 
233   {
234     // db VARBINARY(255) NOT NULL
235     NdbDictionary::Column col_db(COL_DB);
236     col_db.setType(NdbDictionary::Column::Varbinary);
237     col_db.setLength(db_and_name_length);
238     col_db.setNullable(false);
239     col_db.setPrimaryKey(true);
240     if (!define_table_add_column(new_table, col_db)) return false;
241   }
242 
243   {
244     // name VARBINARY(255) NOT NULL
245     NdbDictionary::Column col_name(COL_NAME);
246     col_name.setType(NdbDictionary::Column::Varbinary);
247     col_name.setLength(db_and_name_length);
248     col_name.setNullable(false);
249     col_name.setPrimaryKey(true);
250     if (!define_table_add_column(new_table, col_name)) return false;
251   }
252 
253   {
254     // slock BINARY(32) NOT NULL
255     NdbDictionary::Column col_slock(COL_SLOCK);
256     col_slock.setType(NdbDictionary::Column::Binary);
257     col_slock.setLength(32);
258     col_slock.setNullable(false);
259     if (!define_table_add_column(new_table, col_slock)) return false;
260   }
261 
262   {
263     // query BLOB NOT NULL
264     NdbDictionary::Column col_query(COL_QUERY);
265     col_query.setType(NdbDictionary::Column::Blob);
266     col_query.setInlineSize(256);
267     col_query.setPartSize(2000);
268     col_query.setStripeSize(0);
269     col_query.setNullable(false);
270     if (!define_table_add_column(new_table, col_query)) return false;
271   }
272 
273   {
274     // node_id INT UNSIGNED NOT NULL
275     NdbDictionary::Column col_nodeid(COL_NODEID);
276     col_nodeid.setType(NdbDictionary::Column::Unsigned);
277     col_nodeid.setNullable(false);
278     if (!define_table_add_column(new_table, col_nodeid)) return false;
279   }
280 
281   {
282     // epoch BIGINT UNSIGNED NOT NULL
283     NdbDictionary::Column col_epoch(COL_EPOCH);
284     col_epoch.setType(NdbDictionary::Column::Bigunsigned);
285     col_epoch.setNullable(false);
286     if (!define_table_add_column(new_table, col_epoch)) return false;
287   }
288 
289   {
290     // id INT UNSIGNED NOT NULL
291     NdbDictionary::Column col_id(COL_ID);
292     col_id.setType(NdbDictionary::Column::Unsigned);
293     col_id.setNullable(false);
294     if (!define_table_add_column(new_table, col_id)) return false;
295   }
296 
297   {
298     // version INT UNSIGNED NOT NULL
299     NdbDictionary::Column col_version(COL_VERSION);
300     col_version.setType(NdbDictionary::Column::Unsigned);
301     col_version.setNullable(false);
302     if (!define_table_add_column(new_table, col_version)) return false;
303   }
304 
305   {
306     // type INT UNSIGNED NOT NULL
307     NdbDictionary::Column col_type(COL_TYPE);
308     col_type.setType(NdbDictionary::Column::Unsigned);
309     col_type.setNullable(false);
310     if (!define_table_add_column(new_table, col_type)) return false;
311   }
312 
313   if (mysql_version >= 80017) {
314     // schema_op_id INT UNSIGNED NULL
315     NdbDictionary::Column col_schema_op_id(COL_SCHEMA_OP_ID);
316     col_schema_op_id.setType(NdbDictionary::Column::Unsigned);
317     col_schema_op_id.setNullable(true);  // NULL!
318     if (!define_table_add_column(new_table, col_schema_op_id)) return false;
319   }
320 
321   return true;
322 }
323 
need_upgrade() const324 bool Ndb_schema_dist_table::need_upgrade() const {
325   // Check that 'schema_op_id' column exists. If exists, it's used for sending
326   // the schema_op_id from client to participants who can then use it when
327   // replying using ndb_schema_result (if they support that table)
328   if (!have_schema_op_id_column()) {
329     return true;
330   }
331   // The 'db' and 'name' column need to be upgrade if length is shorter than
332   // current identifier length
333   if (get_column_max_length(COL_DB) < IDENTIFIER_LENGTH ||
334       get_column_max_length(COL_NAME) < IDENTIFIER_LENGTH) {
335     return true;
336   }
337   return false;
338 }
339 
drop_events_in_NDB() const340 bool Ndb_schema_dist_table::drop_events_in_NDB() const {
341   // Drop the default event on ndb_schema table
342   if (!drop_event_in_NDB("REPL$mysql/ndb_schema")) return false;
343 
344   // Legacy event on ndb_schema table, drop since it might
345   // have been created(although ages ago)
346   if (!drop_event_in_NDB("REPLF$mysql/ndb_schema")) return false;
347 
348   return true;
349 }
350 
define_table_dd() const351 std::string Ndb_schema_dist_table::define_table_dd() const {
352   std::stringstream ss;
353   ss << "CREATE TABLE " << db_name() << "." << table_name() << "(\n";
354   ss << "db VARBINARY(" << get_column_max_length(COL_DB) << ") NOT NULL,";
355   ss << "name VARBINARY(" << get_column_max_length(COL_NAME) << ") NOT NULL,";
356   ss << "slock BINARY(32) NOT NULL,"
357         "query BLOB NOT NULL,"
358         "node_id INT UNSIGNED NOT NULL,"
359         "epoch BIGINT UNSIGNED NOT NULL,"
360         "id INT UNSIGNED NOT NULL,"
361         "version INT UNSIGNED NOT NULL,"
362         "type INT UNSIGNED NOT NULL,";
363   if (have_schema_op_id_column()) {
364     ss << "schema_op_id INT UNSIGNED NULL,";
365   }
366   ss << "PRIMARY KEY USING HASH (db,name)"
367      << ") ENGINE=ndbcluster CHARACTER SET latin1";
368   return ss.str();
369 }
370 
get_slock_bytes() const371 int Ndb_schema_dist_table::get_slock_bytes() const {
372   return get_column_max_length(COL_SLOCK);
373 }
374 
have_schema_op_id_column() const375 bool Ndb_schema_dist_table::have_schema_op_id_column() const {
376   const NdbDictionary::Table *ndb_tab = get_table();
377   return ndb_tab->getColumn(COL_SCHEMA_OP_ID);
378 }
379 
380 // Helper functions to read and write properties into the query column of
381 // the schema UUID tuple. The properties are written in the form of
382 // "key1=value1;key2=value2;"
// Serialize all entries of 'kv_map' into one string of the form
// "key1=value1;key2=value2;". The entries are written in the iteration order
// of the map and each one is terminated by ';'. An empty map gives an empty
// string.
static std::string map_extract_key_value_string(
    const std::map<std::string, std::string> &kv_map) {
  std::string key_value_str;
  // Iterate by const reference, there is no need to copy each entry
  for (const auto &entry : kv_map) {
    key_value_str.append(entry.first);
    key_value_str.append("=");
    key_value_str.append(entry.second);
    key_value_str.append(";");
  }
  return key_value_str;
}
395 
// Look up the value of 'key' in a string of the form
// "key1=value1;key2=value2;".
//
// kv_str  the string to search
// key     the key to look for, must be a null terminated string
//
// Returns the value of the first entry whose key matches exactly, or the
// empty string if there is no such entry. The value is everything after the
// first '=' of the entry. Entries without '=' are malformed and skipped, they
// can never be returned as a value.
static std::string key_value_str_get_value(const std::string &kv_str,
                                           const char *key) {
  std::istringstream kv_ss(kv_str);
  std::string kv_pair;
  while (std::getline(kv_ss, kv_pair, ';')) {
    // Keep the position in the type returned by find(), narrowing npos to
    // int would turn it into a value which substr() treats as "rest of
    // string"
    const std::string::size_type key_length = kv_pair.find('=');
    if (key_length == std::string::npos) {
      // No separator, not a key=value pair
      continue;
    }
    // Compare the key part in place
    if (kv_pair.compare(0, key_length, key) == 0) {
      return kv_pair.substr(key_length + 1);
    }
  }
  return "";
}
409 
get_schema_uuid(std::string * schema_uuid) const410 bool Ndb_schema_dist_table::get_schema_uuid(std::string *schema_uuid) const {
411   DBUG_TRACE;
412   const NdbDictionary::Table *ndb_table = get_table();
413 
414   // Pack the table and db names to be used during read into table
415   char db_name_buf[FN_REFLEN];
416   char table_name_buf[FN_REFLEN];
417   pack_varbinary(COL_DB, DB_NAME.c_str(), db_name_buf);
418   pack_varbinary(COL_NAME, TABLE_NAME.c_str(), table_name_buf);
419   std::string query_col_value;
420 
421   // Lambda read function to execute using ndb_trans_retry()
422   std::function<const NdbError *(NdbTransaction *)> read_ndb_schema_func =
423       [&](NdbTransaction *trans) -> const NdbError * {
424     DBUG_TRACE;
425     NdbOperation *read_op = trans->getNdbOperation(ndb_table);
426     if (read_op == nullptr) return &trans->getNdbError();
427 
428     // Define read operation based on 'db_name, table_name' key
429     if (read_op->readTuple() != 0 || read_op->equal(COL_DB, db_name_buf) != 0 ||
430         read_op->equal(COL_NAME, table_name_buf) != 0) {
431       return &read_op->getNdbError();
432     }
433 
434     // Setup read for the query column value.
435     NdbBlob *query_blob_handle = read_op->getBlobHandle(COL_QUERY);
436     if (!query_blob_handle) {
437       return &read_op->getNdbError();
438     }
439 
440     if (trans->execute(NdbTransaction::NoCommit,
441                        NdbOperation::DefaultAbortOption,
442                        1 /* force send */) != 0) {
443       // Execute failed.
444       return &trans->getNdbError();
445     }
446 
447     // Transaction execute succeeded. Check the operation for errors
448     const NdbError &read_op_error = read_op->getNdbError();
449     if (read_op_error.code == 0) {
450       // The tuple exists. Read the value from query blob and return
451       if (!unpack_blob_not_null(query_blob_handle, &query_col_value)) {
452         return &query_blob_handle->getNdbError();
453       }
454       return nullptr;
455     } else if (read_op_error.classification ==
456                NdbError::Classification::NoDataFound) {
457       // The tuple doesn't exist.
458       ndb_log_verbose(19, "The schema UUID tuple doesn't exist");
459       return nullptr;
460     } else {
461       // Operation failed with an unexpected error
462       return &read_op_error;
463     }
464   };
465 
466   NdbError ndb_err;
467   if (!ndb_trans_retry(get_ndb(), get_thd(), ndb_err, read_ndb_schema_func)) {
468     push_warning("Failed to read the schema UUID tuple: %s(%d).",
469                  ndb_err.message, ndb_err.code);
470     return false;
471   }
472 
473   if (query_col_value.size() == 0) {
474     // Schema UUID is not present
475     ndb_log_info("Schema UUID not present in ndb_schema table");
476     return true;
477   }
478 
479   // The tuple with Schema UUID exists. It is stored as a key value
480   // pair of form "schema_uuid=<UUID>;". Extract the value and return.
481   schema_uuid->assign(
482       key_value_str_get_value(query_col_value, SCHEMA_UUID_KEY));
483   DBUG_ASSERT(schema_uuid->size() == UUID_LENGTH);
484   ndb_log_verbose(19, "Schema UUID read from NDB : %s", schema_uuid->c_str());
485   return true;
486 }
487 
update_schema_uuid_in_NDB(const std::string & schema_uuid) const488 bool Ndb_schema_dist_table::update_schema_uuid_in_NDB(
489     const std::string &schema_uuid) const {
490   DBUG_TRACE;
491 
492   const NdbDictionary::Table *ndb_table = get_table();
493   DBUG_ASSERT(ndb_table != nullptr);
494 
495   // Store the UUID as a key value pair of form "schema_uuid=<UUID>;"
496   std::map<std::string, std::string> ndb_schema_props;
497   ndb_schema_props.insert({SCHEMA_UUID_KEY, schema_uuid.c_str()});
498   const std::string ndb_schema_props_str =
499       map_extract_key_value_string(ndb_schema_props);
500 
501   // Pack db and table_name
502   char db_buf[FN_REFLEN];
503   char name_buf[FN_REFLEN];
504   pack_varbinary(COL_DB, DB_NAME.c_str(), db_buf);
505   pack_varbinary(COL_NAME, TABLE_NAME.c_str(), name_buf);
506 
507   // Function for writing row to ndb_schema
508   std::function<const NdbError *(NdbTransaction *)> write_schema_op_func =
509       [&](NdbTransaction *trans) -> const NdbError * {
510     DBUG_TRACE;
511 
512     NdbOperation *op = trans->getNdbOperation(ndb_table);
513     if (op == nullptr) return &trans->getNdbError();
514 
515     // Buffer with zeroes for slock
516     std::vector<char> slock_zeroes;
517     slock_zeroes.assign(get_slock_bytes(), 0);
518     const char *slock_buf = slock_zeroes.data();
519 
520     const Uint64 log_epoch = 0;
521     if (op->writeTuple() != 0 || op->equal(COL_DB, db_buf) != 0 ||
522         op->equal(COL_NAME, name_buf) != 0 ||
523         op->setValue(COL_SLOCK, slock_buf) != 0 ||
524         op->setValue(COL_NODEID, 0) != 0 ||
525         op->setValue(COL_EPOCH, log_epoch) != 0 ||
526         op->setValue(COL_ID, 0) != 0 || op->setValue(COL_VERSION, 0) != 0 ||
527         op->setValue(COL_TYPE, SOT_CREATE_TABLE) != 0 ||
528         op->setAnyValue(0) != 0)
529       return &op->getNdbError();
530 
531     if (have_schema_op_id_column() && op->setValue(COL_SCHEMA_OP_ID, 0) != 0)
532       return &op->getNdbError();
533 
534     NdbBlob *ndb_blob = op->getBlobHandle(COL_QUERY);
535     if (ndb_blob == nullptr) return &op->getNdbError();
536 
537     if (ndb_blob->setValue(ndb_schema_props_str.c_str(),
538                            ndb_schema_props_str.length()) != 0)
539       return &ndb_blob->getNdbError();
540 
541     if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
542                        1 /* force send */) != 0) {
543       return &trans->getNdbError();
544     }
545 
546     return nullptr;
547   };
548 
549   NdbError ndb_err;
550   if (!ndb_trans_retry(get_ndb(), get_thd(), ndb_err, write_schema_op_func)) {
551     push_warning(
552         "Failed to update schema UUID in 'mysql.ndb_schema' table. Code : %d. "
553         "Error : %s",
554         ndb_err.code, ndb_err.message);
555     return false;
556   }
557 
558   return true;
559 }
560 
pre_upgrade() const561 bool Ndb_schema_dist_table::pre_upgrade() const {
562   // During upgrade, the schema UUID need not be regenerated.
563   // Save it for restoring it later after upgrade
564   if (!get_schema_uuid(&old_ndb_schema_uuid)) return false;
565   return true;
566 }
567 
post_install() const568 bool Ndb_schema_dist_table::post_install() const {
569   DBUG_TRACE;
570 
571   std::string schema_uuid;
572   if (old_ndb_schema_uuid.empty()) {
573     // The table was just created
574     // Generate a new schema uuid for the table
575     String schema_uuid_buf;
576     mysql_generate_uuid(&schema_uuid_buf);
577     schema_uuid.assign(schema_uuid_buf.ptr(), schema_uuid_buf.length());
578     ndb_log_verbose(19, "Generated new schema UUID : %s", schema_uuid.c_str());
579   } else {
580     // The table was just upgraded
581     // Restore the old schema UUID
582     schema_uuid = std::move(old_ndb_schema_uuid);
583     ndb_log_verbose(19, "Restoring schema UUID : %s after upgrade",
584                     schema_uuid.c_str());
585   }
586 
587   // Update the UUID in the ndb_schema_table
588   if (!update_schema_uuid_in_NDB(schema_uuid)) {
589     return false;
590   }
591   return true;
592 }
593