1 /*
2 Copyright (c) 2011, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 // Implements the functions declared in ndb_schema_dist.h
26 #include "storage/ndb/plugin/ndb_schema_dist.h"
27
28 #include <atomic>
29 #include <mutex>
30
31 #include "my_dbug.h"
32 #include "mysqld_error.h"
33 #include "ndbapi/ndb_cluster_connection.hpp"
34 #include "sql/query_options.h" // OPTION_BIN_LOG
35 #include "sql/sql_error.h"
36 #include "sql/sql_thd_internal_api.h"
37 #include "storage/ndb/plugin/ndb_anyvalue.h"
38 #include "storage/ndb/plugin/ndb_dist_priv_util.h"
39 #include "storage/ndb/plugin/ndb_name_util.h"
40 #include "storage/ndb/plugin/ndb_require.h"
41 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
42 #include "storage/ndb/plugin/ndb_schema_result_table.h"
43 #include "storage/ndb/plugin/ndb_share.h"
44 #include "storage/ndb/plugin/ndb_thd.h"
45 #include "storage/ndb/plugin/ndb_thd_ndb.h"
46 #include "storage/ndb/plugin/ndb_upgrade_util.h"
47
48 // Temporarily use a fixed string on the form "./mysql/ndb_schema" as key
49 // for retrieving the NDB_SHARE for mysql.ndb_schema. This will subsequently
50 // be removed when a NDB_SHARE can be acquired using db+table_name and the
51 // key is formatted behind the curtains in NDB_SHARE without using
52 // build_table_filename() etc.
53 static constexpr const char *NDB_SCHEMA_TABLE_KEY =
54 IF_WIN(".\\mysql\\ndb_schema", "./mysql/ndb_schema");
55
is_ready(void * requestor)56 bool Ndb_schema_dist::is_ready(void *requestor) {
57 DBUG_TRACE;
58
59 std::stringstream ss;
60 ss << "is_ready_" << std::hex << requestor;
61 const std::string reference = ss.str();
62
63 NDB_SHARE *schema_share = NDB_SHARE::acquire_reference_by_key(
64 NDB_SCHEMA_TABLE_KEY, reference.c_str());
65 if (schema_share == nullptr) return false; // Not ready
66
67 if (!schema_share->have_event_operation()) {
68 NDB_SHARE::release_reference(schema_share, reference.c_str());
69 return false; // Not ready
70 }
71
72 NDB_SHARE::release_reference(schema_share, reference.c_str());
73 return true;
74 }
75
76 bool Ndb_schema_dist_client::m_ddl_blocked = true;
77
is_schema_dist_table(const char * db,const char * table_name)78 bool Ndb_schema_dist_client::is_schema_dist_table(const char *db,
79 const char *table_name) {
80 if (db == Ndb_schema_dist_table::DB_NAME &&
81 table_name == Ndb_schema_dist_table::TABLE_NAME) {
82 // This is the NDB table used for schema distribution
83 return true;
84 }
85 return false;
86 }
87
is_schema_dist_result_table(const char * db,const char * table_name)88 bool Ndb_schema_dist_client::is_schema_dist_result_table(
89 const char *db, const char *table_name) {
90 if (db == Ndb_schema_result_table::DB_NAME &&
91 table_name == Ndb_schema_result_table::TABLE_NAME) {
92 // This is the NDB table used for schema distribution results
93 return true;
94 }
95 return false;
96 }
97
98 /*
99 Actual schema change operations that effect the local Data Dictionary are
100 performed with the Global Schema Lock held, but ACL operations are not.
101 Use acl_change_mutex to serialize all ACL changes on this server.
102 */
103 static std::mutex acl_change_mutex;
104
acquire_acl_lock()105 void Ndb_schema_dist_client::acquire_acl_lock() {
106 acl_change_mutex.lock();
107 m_holding_acl_mutex = true;
108 }
109
unique_reference(void * owner)110 static std::string unique_reference(void *owner) {
111 std::stringstream ss;
112 ss << "ndb_schema_dist_client" << std::hex << owner;
113 return ss.str();
114 }
115
Ndb_schema_dist_client(THD * thd)116 Ndb_schema_dist_client::Ndb_schema_dist_client(THD *thd)
117 : m_thd(thd),
118 m_thd_ndb(get_thd_ndb(thd)),
119 m_share_reference(unique_reference(this)),
120 m_holding_acl_mutex(false) {}
121
prepare(const char * db,const char * tabname)122 bool Ndb_schema_dist_client::prepare(const char *db, const char *tabname) {
123 DBUG_TRACE;
124
125 // Acquire reference on mysql.ndb_schema
126 m_share = NDB_SHARE::acquire_reference_by_key(NDB_SCHEMA_TABLE_KEY,
127 m_share_reference.c_str());
128
129 if (m_share == nullptr || m_share->have_event_operation() == false ||
130 DBUG_EVALUATE_IF("ndb_schema_dist_not_ready_early", true, false)) {
131 // The NDB_SHARE for mysql.ndb_schema hasn't been created or not setup
132 // yet -> schema distribution is not ready
133 push_warning(m_thd, Sql_condition::SL_WARNING, ER_GET_ERRMSG,
134 "Schema distribution is not ready");
135 return false;
136 }
137
138 if (unlikely(m_ddl_blocked)) {
139 // If a data node gets upgraded after this MySQL Server is upgraded, this
140 // MySQL Server will not be aware of the upgrade due to Bug#30930132.
141 // So as a workaround, re-evaluate again if the DDL needs to be blocked
142 if (ndb_all_nodes_support_mysql_dd()) {
143 // All nodes connected to cluster support MySQL DD.
144 // No need to continue blocking the DDL.
145 m_ddl_blocked = false;
146 } else {
147 // Non database DDLs are blocked in plugin due to an ongoing upgrade.
148 // Database DDLs are allowed as they are actually executed in the Server
149 // layer and ndbcluster is only responsible for distributing the change to
150 // other MySQL Servers.
151 if (strlen(tabname) != 0) {
152 // If the tablename is not empty, it is a non database DDL. Block it.
153 my_printf_error(
154 ER_DISALLOWED_OPERATION,
155 "DDLs are disallowed on NDB SE as there is atleast one node "
156 "without MySQL DD support connected to the cluster.",
157 MYF(0));
158 return false;
159 }
160 }
161 }
162
163 // Save the prepared "keys"(which are used when communicating with
164 // the other MySQL Servers), they should match the keys used in later calls.
165 m_prepared_keys.add_key(db, tabname);
166
167 Ndb_schema_dist_table schema_dist_table(m_thd_ndb);
168 if (!schema_dist_table.open()) {
169 return false;
170 }
171
172 if (!schema_dist_table.check_schema()) {
173 return false;
174 }
175
176 // Open the ndb_schema_result table, the table is created by ndbcluster
177 // when connecting to NDB and thus it shall exist at this time.
178 Ndb_schema_result_table schema_result_table(m_thd_ndb);
179 if (!schema_result_table.open()) {
180 return false;
181 }
182
183 if (!schema_result_table.check_schema()) {
184 return false;
185 }
186
187 // Schema distribution is ready
188 return true;
189 }
190
prepare_rename(const char * db,const char * tabname,const char * new_db,const char * new_tabname)191 bool Ndb_schema_dist_client::prepare_rename(const char *db, const char *tabname,
192 const char *new_db,
193 const char *new_tabname) {
194 DBUG_TRACE;
195
196 // Normal prepare first
197 if (!prepare(db, tabname)) {
198 /* During upgrade to 8.0, distributed privilege tables must get renamed
199 as part of a statement "ALTER TABLE ... ENGINE=innodb" before schema
200 distribution has started running.
201 */
202 if (Ndb_dist_priv_util::is_privilege_table(db, tabname)) return true;
203
204 return false;
205 }
206
207 // Allow additional keys for rename which will use the "old" name
208 // when communicating with participants until the rename is done.
209 // After rename has occurred, the new name will be used
210 m_prepared_keys.add_key(new_db, new_tabname);
211
212 // Schema distribution is ready
213 return true;
214 }
215
prepare_acl_change(uint node_id)216 bool Ndb_schema_dist_client::prepare_acl_change(uint node_id) {
217 /* Acquire the ACL change mutex. It will be released by the destructor.
218 */
219 acquire_acl_lock();
220
221 /*
222 There is no table name required to log an ACL operation, but the table
223 name is a part of the primary key in ndb_schema. Fabricate a name
224 that is unique to this MySQL server, so that ACL changes originating
225 from different servers use different rows in ndb_schema.
226 */
227 std::string server_key = "acl_dist_from_" + std::to_string(node_id);
228
229 /*
230 Always use "mysql" as the db part of the primary key.
231 If the current database is set to something other than "mysql", the
232 database will be transmitted as part of GRANT and REVOKE statements.
233 */
234 return prepare("mysql", server_key.c_str());
235 }
236
check_identifier_limits(std::string & invalid_identifier)237 bool Ndb_schema_dist_client::check_identifier_limits(
238 std::string &invalid_identifier) {
239 DBUG_TRACE;
240
241 Ndb_schema_dist_table schema_dist_table(m_thd_ndb);
242 if (!schema_dist_table.open()) {
243 invalid_identifier = "<open failed>";
244 return false;
245 }
246
247 // Check that identifiers does not exceed the limits imposed
248 // by the ndb_schema table layout
249 for (auto key : m_prepared_keys.keys()) {
250 // db
251 if (!schema_dist_table.check_column_identifier_limit(
252 Ndb_schema_dist_table::COL_DB, key.first)) {
253 invalid_identifier = key.first;
254 return false;
255 }
256 // name
257 if (!schema_dist_table.check_column_identifier_limit(
258 Ndb_schema_dist_table::COL_NAME, key.second)) {
259 invalid_identifier = key.second;
260 return false;
261 }
262 }
263 return true;
264 }
265
add_key(const char * db,const char * tabname)266 void Ndb_schema_dist_client::Prepared_keys::add_key(const char *db,
267 const char *tabname) {
268 m_keys.emplace_back(db, tabname);
269 }
270
check_key(const char * db,const char * tabname) const271 bool Ndb_schema_dist_client::Prepared_keys::check_key(
272 const char *db, const char *tabname) const {
273 for (auto key : m_keys) {
274 if (key.first == db && key.second == tabname) {
275 return true; // OK, key has been prepared
276 }
277 }
278 return false;
279 }
280
281 extern void update_slave_api_stats(const Ndb *);
282
~Ndb_schema_dist_client()283 Ndb_schema_dist_client::~Ndb_schema_dist_client() {
284 if (m_share) {
285 // Release the reference to mysql.ndb_schema table
286 NDB_SHARE::release_reference(m_share, m_share_reference.c_str());
287 }
288
289 if (m_thd_ndb && m_thd_ndb->is_slave_thread()) {
290 // Copy-out slave thread statistics
291 // NOTE! This is just a "convenient place" to call this
292 // function, it could be moved to "end of statement"(if there
293 // was such a place..).
294 update_slave_api_stats(m_thd_ndb->ndb);
295 }
296
297 if (m_holding_acl_mutex) {
298 acl_change_mutex.unlock();
299 }
300 }
301
302 /*
303 Produce unique identifier for distributing objects that
304 does not have any global id from NDB. Use a sequence counter
305 which is unique in this node.
306 */
307 static std::atomic<uint32> schema_dist_id_sequence{0};
unique_id() const308 uint32 Ndb_schema_dist_client::unique_id() const {
309 uint32 id = ++schema_dist_id_sequence;
310 // Handle wraparound
311 if (id == 0) {
312 id = ++schema_dist_id_sequence;
313 }
314 DBUG_ASSERT(id != 0);
315 return id;
316 }
317
318 /*
319 Produce unique identifier for distributing objects that
320 does not have any global version from NDB. Use own nodeid
321 which is unique in NDB.
322 */
unique_version() const323 uint32 Ndb_schema_dist_client::unique_version() const {
324 const uint32 ver = m_thd_ndb->connection->node_id();
325 DBUG_ASSERT(ver != 0);
326 return ver;
327 }
328
push_and_clear_schema_op_results()329 void Ndb_schema_dist_client::push_and_clear_schema_op_results() {
330 if (m_schema_op_results.empty()) {
331 return;
332 }
333
334 // Push results received from participant(s) as warnings. These are meant to
335 // indicate that schema distribution has failed on one of the nodes. For more
336 // information on how and why the failure occured, the relevant error log
337 // remains the place to look
338 for (const Schema_op_result &op_result : m_schema_op_results) {
339 // Warning consists of the node id and message but not result code since
340 // that's an internal detail
341 m_thd_ndb->push_warning("Node %d: '%s'", op_result.nodeid,
342 op_result.message.c_str());
343 }
344 // Clear the results. This is needed when the Ndb_schema_dist_client object
345 // is reused as is the case during an inplace alter where the same object is
346 // used during both prepare and commit
347 m_schema_op_results.clear();
348 }
349
log_schema_op(const char * query,size_t query_length,const char * db,const char * table_name,uint32 id,uint32 version,SCHEMA_OP_TYPE type,bool log_query_on_participant)350 bool Ndb_schema_dist_client::log_schema_op(const char *query,
351 size_t query_length, const char *db,
352 const char *table_name, uint32 id,
353 uint32 version, SCHEMA_OP_TYPE type,
354 bool log_query_on_participant) {
355 DBUG_TRACE;
356 DBUG_ASSERT(db && table_name);
357 DBUG_ASSERT(id != 0 && version != 0);
358 DBUG_ASSERT(m_thd_ndb);
359
360 // Never allow temporary names when communicating with participant
361 if (ndb_name_is_temp(db) || ndb_name_is_temp(table_name)) {
362 DBUG_ASSERT(false);
363 return false;
364 }
365
366 // Require that m_share has been initialized to reference the
367 // schema distribution table
368 ndbcluster::ndbrequire(m_share);
369
370 // Check that prepared keys match
371 if (!m_prepared_keys.check_key(db, table_name)) {
372 m_thd_ndb->push_warning("INTERNAL ERROR: prepared keys didn't match");
373 DBUG_ASSERT(false); // Catch in debug
374 return false;
375 }
376
377 // Don't distribute if thread has turned off schema distribution
378 if (m_thd_ndb->check_option(Thd_ndb::NO_LOG_SCHEMA_OP)) {
379 DBUG_PRINT("info", ("NO_LOG_SCHEMA_OP set - > skip schema distribution"));
380 return true; // Ok, skipped
381 }
382
383 // Verify identifier limits, this should already have been caught earlier
384 {
385 std::string invalid_identifier;
386 if (!check_identifier_limits(invalid_identifier)) {
387 m_thd_ndb->push_warning("INTERNAL ERROR: identifier limits exceeded");
388 DBUG_ASSERT(false); // Catch in debug
389 return false;
390 }
391 }
392
393 // Calculate anyvalue
394 const Uint32 anyvalue = calculate_anyvalue(log_query_on_participant);
395
396 const bool result =
397 log_schema_op_impl(m_thd_ndb->ndb, query, static_cast<int>(query_length),
398 db, table_name, id, version, type, anyvalue);
399 if (!result) {
400 // Schema distribution failed
401 push_and_clear_schema_op_results();
402 m_thd_ndb->push_warning("Schema distribution failed");
403 return false;
404 }
405
406 // Schema distribution passed but the schema op may have failed on
407 // participants. Push and clear results (if any)
408 push_and_clear_schema_op_results();
409 return true;
410 }
411
create_table(const char * db,const char * table_name,int id,int version)412 bool Ndb_schema_dist_client::create_table(const char *db,
413 const char *table_name, int id,
414 int version) {
415 DBUG_TRACE;
416
417 if (is_schema_dist_table(db, table_name)) {
418 // Create of the schema distribution table is not distributed. Instead,
419 // every MySQL Server have special handling to create it if not
420 // exists and then open it as first step of connecting to the cluster
421 return true;
422 }
423
424 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
425 table_name, id, version, SOT_CREATE_TABLE);
426 }
427
truncate_table(const char * db,const char * table_name,int id,int version)428 bool Ndb_schema_dist_client::truncate_table(const char *db,
429 const char *table_name, int id,
430 int version) {
431 DBUG_TRACE;
432 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
433 table_name, id, version, SOT_TRUNCATE_TABLE);
434 }
435
alter_table(const char * db,const char * table_name,int id,int version,bool log_on_participant)436 bool Ndb_schema_dist_client::alter_table(const char *db, const char *table_name,
437 int id, int version,
438 bool log_on_participant) {
439 DBUG_TRACE;
440 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
441 table_name, id, version, SOT_ALTER_TABLE_COMMIT,
442 log_on_participant);
443 }
444
alter_table_inplace_prepare(const char * db,const char * table_name,int id,int version)445 bool Ndb_schema_dist_client::alter_table_inplace_prepare(const char *db,
446 const char *table_name,
447 int id, int version) {
448 DBUG_TRACE;
449 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
450 table_name, id, version, SOT_ONLINE_ALTER_TABLE_PREPARE);
451 }
452
alter_table_inplace_commit(const char * db,const char * table_name,int id,int version)453 bool Ndb_schema_dist_client::alter_table_inplace_commit(const char *db,
454 const char *table_name,
455 int id, int version) {
456 DBUG_TRACE;
457 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
458 table_name, id, version, SOT_ONLINE_ALTER_TABLE_COMMIT);
459 }
460
rename_table_prepare(const char * db,const char * table_name,int id,int version,const char * new_key_for_table)461 bool Ndb_schema_dist_client::rename_table_prepare(
462 const char *db, const char *table_name, int id, int version,
463 const char *new_key_for_table) {
464 DBUG_TRACE;
465 // NOTE! The rename table prepare phase is primarily done in order to
466 // pass the "new key"(i.e db/table_name) for the table to be renamed,
467 // that's since there isn't enough placeholders in the subsequent rename
468 // table phase.
469 return log_schema_op(new_key_for_table, strlen(new_key_for_table), db,
470 table_name, id, version, SOT_RENAME_TABLE_PREPARE);
471 }
472
rename_table(const char * db,const char * table_name,int id,int version,const char * new_dbname,const char * new_tabname,bool log_on_participant)473 bool Ndb_schema_dist_client::rename_table(const char *db,
474 const char *table_name, int id,
475 int version, const char *new_dbname,
476 const char *new_tabname,
477 bool log_on_participant) {
478 DBUG_TRACE;
479
480 /*
481 Rewrite the query, the original query may contain several tables but
482 rename_table() is called once for each table in the query.
483 ie. RENAME TABLE t1 to tx, t2 to ty;
484 -> RENAME TABLE t1 to tx + RENAME TABLE t2 to ty
485 */
486 std::string rewritten_query;
487 rewritten_query.append("rename table `")
488 .append(db)
489 .append("`.`")
490 .append(table_name)
491 .append("` to `")
492 .append(new_dbname)
493 .append("`.`")
494 .append(new_tabname)
495 .append("`");
496 DBUG_PRINT("info", ("rewritten query: '%s'", rewritten_query.c_str()));
497
498 return log_schema_op(rewritten_query.c_str(), rewritten_query.length(), db,
499 table_name, id, version, SOT_RENAME_TABLE,
500 log_on_participant);
501 }
502
drop_table(const char * db,const char * table_name,int id,int version,bool log_on_participant)503 bool Ndb_schema_dist_client::drop_table(const char *db, const char *table_name,
504 int id, int version,
505 bool log_on_participant) {
506 DBUG_TRACE;
507
508 /*
509 Never distribute each dropped table as part of DROP DATABASE:
510 1) as only the DROP DATABASE command should go into binlog
511 2) as this MySQL Server is dropping the tables from NDB, when
512 the participants get the DROP DATABASE it will remove
513 any tables from the DD and then remove the database.
514 */
515 DBUG_ASSERT(thd_sql_command(m_thd) != SQLCOM_DROP_DB);
516
517 /*
518 Rewrite the query, the original query may contain several tables but
519 drop_table() is called once for each table in the query.
520 ie. DROP TABLE t1, t2;
521 -> DROP TABLE t1 + DROP TABLE t2
522 */
523 std::string rewritten_query;
524 rewritten_query.append("drop table `")
525 .append(db)
526 .append("`.`")
527 .append(table_name)
528 .append("`");
529 DBUG_PRINT("info", ("rewritten query: '%s'", rewritten_query.c_str()));
530
531 // Special case where the table to be dropped was already dropped in the
532 // client. This is considered acceptable behavior and the query is distributed
533 // to ensure that the table is dropped in the pariticipants. Assign values to
534 // id and version to workaround the assumption that they will always be != 0
535 if (id == 0 && version == 0) {
536 id = unique_id();
537 version = unique_version();
538 }
539
540 return log_schema_op(rewritten_query.c_str(), rewritten_query.length(), db,
541 table_name, id, version, SOT_DROP_TABLE,
542 log_on_participant);
543 }
544
create_db(const char * query,uint query_length,const char * db,unsigned int id,unsigned int version)545 bool Ndb_schema_dist_client::create_db(const char *query, uint query_length,
546 const char *db, unsigned int id,
547 unsigned int version) {
548 DBUG_TRACE;
549
550 // Checking identifier limits "late", there is no way to return
551 // an error to fail the CREATE DATABASE command
552 std::string invalid_identifier;
553 if (!check_identifier_limits(invalid_identifier)) {
554 // Check of db name limit failed
555 m_thd_ndb->push_warning("Identifier name '%-.100s' is too long",
556 invalid_identifier.c_str());
557 return false;
558 }
559
560 return log_schema_op(query, query_length, db, "", id, version, SOT_CREATE_DB);
561 }
562
alter_db(const char * query,uint query_length,const char * db,unsigned int id,unsigned int version)563 bool Ndb_schema_dist_client::alter_db(const char *query, uint query_length,
564 const char *db, unsigned int id,
565 unsigned int version) {
566 DBUG_TRACE;
567
568 // Checking identifier limits "late", there is no way to return
569 // an error to fail the ALTER DATABASE command
570 std::string invalid_identifier;
571 if (!check_identifier_limits(invalid_identifier)) {
572 // Check of db name limit failed
573 m_thd_ndb->push_warning("Identifier name '%-.100s' is too long",
574 invalid_identifier.c_str());
575 return false;
576 }
577
578 return log_schema_op(query, query_length, db, "", id, version, SOT_ALTER_DB);
579 }
580
drop_db(const char * db)581 bool Ndb_schema_dist_client::drop_db(const char *db) {
582 DBUG_TRACE;
583
584 // Checking identifier limits "late", there is no way to return
585 // an error to fail the DROP DATABASE command
586 std::string invalid_identifier;
587 if (!check_identifier_limits(invalid_identifier)) {
588 // Check of db name limit failed
589 m_thd_ndb->push_warning("Identifier name '%-.100s' is too long",
590 invalid_identifier.c_str());
591 return false;
592 }
593
594 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
595 "", unique_id(), unique_version(), SOT_DROP_DB);
596 }
597
598 /* STATEMENT-style ACL change distribution */
acl_notify(const char * database,const char * query,uint query_length,bool participant_refresh)599 bool Ndb_schema_dist_client::acl_notify(const char *database, const char *query,
600 uint query_length,
601 bool participant_refresh) {
602 DBUG_TRACE;
603 DBUG_ASSERT(m_holding_acl_mutex);
604 auto key = m_prepared_keys.keys()[0];
605 std::string new_query("use ");
606 if (database != nullptr && strcmp(database, "mysql")) {
607 new_query.append(database).append(";").append(query, query_length);
608 query = new_query.c_str();
609 query_length = new_query.size();
610 }
611 SCHEMA_OP_TYPE type =
612 participant_refresh ? SOT_ACL_STATEMENT : SOT_ACL_STATEMENT_REFRESH;
613 return log_schema_op(query, query_length, key.first.c_str(),
614 key.second.c_str(), unique_id(), unique_version(), type);
615 }
616
617 /* SNAPSHOT-style ACL change distribution */
acl_notify(std::string user_list)618 bool Ndb_schema_dist_client::acl_notify(std::string user_list) {
619 DBUG_TRACE;
620 DBUG_ASSERT(m_holding_acl_mutex);
621 auto key = m_prepared_keys.keys()[0];
622
623 return log_schema_op(user_list.c_str(), user_list.length(), key.first.c_str(),
624 key.second.c_str(), unique_id(), unique_version(),
625 SOT_ACL_SNAPSHOT);
626 }
627
tablespace_changed(const char * tablespace_name,int id,int version)628 bool Ndb_schema_dist_client::tablespace_changed(const char *tablespace_name,
629 int id, int version) {
630 DBUG_TRACE;
631 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
632 tablespace_name, id, version, SOT_TABLESPACE);
633 }
634
logfilegroup_changed(const char * logfilegroup_name,int id,int version)635 bool Ndb_schema_dist_client::logfilegroup_changed(const char *logfilegroup_name,
636 int id, int version) {
637 DBUG_TRACE;
638 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
639 logfilegroup_name, id, version, SOT_LOGFILE_GROUP);
640 }
641
create_tablespace(const char * tablespace_name,int id,int version)642 bool Ndb_schema_dist_client::create_tablespace(const char *tablespace_name,
643 int id, int version) {
644 DBUG_TRACE;
645 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
646 tablespace_name, id, version, SOT_CREATE_TABLESPACE);
647 }
648
alter_tablespace(const char * tablespace_name,int id,int version)649 bool Ndb_schema_dist_client::alter_tablespace(const char *tablespace_name,
650 int id, int version) {
651 DBUG_TRACE;
652 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
653 tablespace_name, id, version, SOT_ALTER_TABLESPACE);
654 }
655
drop_tablespace(const char * tablespace_name,int id,int version)656 bool Ndb_schema_dist_client::drop_tablespace(const char *tablespace_name,
657 int id, int version) {
658 DBUG_TRACE;
659 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
660 tablespace_name, id, version, SOT_DROP_TABLESPACE);
661 }
662
create_logfile_group(const char * logfile_group_name,int id,int version)663 bool Ndb_schema_dist_client::create_logfile_group(
664 const char *logfile_group_name, int id, int version) {
665 DBUG_TRACE;
666 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
667 logfile_group_name, id, version,
668 SOT_CREATE_LOGFILE_GROUP);
669 }
670
alter_logfile_group(const char * logfile_group_name,int id,int version)671 bool Ndb_schema_dist_client::alter_logfile_group(const char *logfile_group_name,
672 int id, int version) {
673 DBUG_TRACE;
674 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
675 logfile_group_name, id, version,
676 SOT_ALTER_LOGFILE_GROUP);
677 }
678
drop_logfile_group(const char * logfile_group_name,int id,int version)679 bool Ndb_schema_dist_client::drop_logfile_group(const char *logfile_group_name,
680 int id, int version) {
681 DBUG_TRACE;
682 return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
683 logfile_group_name, id, version, SOT_DROP_LOGFILE_GROUP);
684 }
685
type_name(SCHEMA_OP_TYPE type)686 const char *Ndb_schema_dist_client::type_name(SCHEMA_OP_TYPE type) {
687 switch (type) {
688 case SOT_DROP_TABLE:
689 return "DROP_TABLE";
690 case SOT_CREATE_TABLE:
691 return "CREATE_TABLE";
692 case SOT_ALTER_TABLE_COMMIT:
693 return "ALTER_TABLE_COMMIT";
694 case SOT_DROP_DB:
695 return "DROP_DB";
696 case SOT_CREATE_DB:
697 return "CREATE_DB";
698 case SOT_ALTER_DB:
699 return "ALTER_DB";
700 case SOT_CLEAR_SLOCK:
701 return "CLEAR_SLOCK";
702 case SOT_TABLESPACE:
703 return "TABLESPACE";
704 case SOT_LOGFILE_GROUP:
705 return "LOGFILE_GROUP";
706 case SOT_RENAME_TABLE:
707 return "RENAME_TABLE";
708 case SOT_TRUNCATE_TABLE:
709 return "TRUNCATE_TABLE";
710 case SOT_RENAME_TABLE_PREPARE:
711 return "RENAME_TABLE_PREPARE";
712 case SOT_ONLINE_ALTER_TABLE_PREPARE:
713 return "ONLINE_ALTER_TABLE_PREPARE";
714 case SOT_ONLINE_ALTER_TABLE_COMMIT:
715 return "ONLINE_ALTER_TABLE_COMMIT";
716 case SOT_CREATE_USER:
717 return "CREATE_USER";
718 case SOT_DROP_USER:
719 return "DROP_USER";
720 case SOT_RENAME_USER:
721 return "RENAME_USER";
722 case SOT_GRANT:
723 return "GRANT";
724 case SOT_REVOKE:
725 return "REVOKE";
726 case SOT_CREATE_TABLESPACE:
727 return "CREATE_TABLESPACE";
728 case SOT_ALTER_TABLESPACE:
729 return "ALTER_TABLESPACE";
730 case SOT_DROP_TABLESPACE:
731 return "DROP_TABLESPACE";
732 case SOT_CREATE_LOGFILE_GROUP:
733 return "CREATE_LOGFILE_GROUP";
734 case SOT_ALTER_LOGFILE_GROUP:
735 return "ALTER_LOGFILE_GROUP";
736 case SOT_DROP_LOGFILE_GROUP:
737 return "DROP_LOGFILE_GROUP";
738 case SOT_ACL_SNAPSHOT:
739 return "ACL_SNAPSHOT";
740 case SOT_ACL_STATEMENT:
741 return "ACL_STATEMENT";
742 case SOT_ACL_STATEMENT_REFRESH:
743 return "ACL_STATEMENT_REFRESH";
744 default:
745 break;
746 }
747 DBUG_ASSERT(false);
748 return "<unknown>";
749 }
750
calculate_anyvalue(bool force_nologging) const751 uint32 Ndb_schema_dist_client::calculate_anyvalue(bool force_nologging) const {
752 Uint32 anyValue = 0;
753 if (!thd_slave_thread(m_thd)) {
754 /* Schema change originating from this MySQLD, check SQL_LOG_BIN
755 * variable and pass 'setting' to all logging MySQLDs via AnyValue
756 */
757 if (thd_test_options(m_thd, OPTION_BIN_LOG)) /* e.g. SQL_LOG_BIN == on */
758 {
759 DBUG_PRINT("info", ("Schema event for binlogging"));
760 ndbcluster_anyvalue_set_normal(anyValue);
761 } else {
762 DBUG_PRINT("info", ("Schema event not for binlogging"));
763 ndbcluster_anyvalue_set_nologging(anyValue);
764 }
765
766 if (!force_nologging) {
767 DBUG_PRINT("info", ("Forcing query not to be binlogged on participant"));
768 ndbcluster_anyvalue_set_nologging(anyValue);
769 }
770 } else {
771 /*
772 Slave propagating replicated schema event in ndb_schema
773 In case replicated serverId is composite
774 (server-id-bits < 31) we copy it into the
775 AnyValue as-is
776 This is for 'future', as currently Schema operations
777 do not have composite AnyValues.
778 In future it may be useful to support *not* mapping composite
779 AnyValues to/from Binlogged server-ids.
780 */
781 DBUG_PRINT("info", ("Replicated schema event with original server id"));
782 anyValue = thd_unmasked_server_id(m_thd);
783 }
784
785 #ifndef DBUG_OFF
786 /*
787 MySQLD will set the user-portion of AnyValue (if any) to all 1s
788 This tests code filtering ServerIds on the value of server-id-bits.
789 */
790 const char *p = getenv("NDB_TEST_ANYVALUE_USERDATA");
791 if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N') {
792 dbug_ndbcluster_anyvalue_set_userbits(anyValue);
793 }
794 #endif
795 DBUG_PRINT("info", ("anyvalue: %u", anyValue));
796 return anyValue;
797 }
798