1 /*
2    Copyright (c) 2011, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 // Implements the functions declared in ndb_schema_dist.h
26 #include "storage/ndb/plugin/ndb_schema_dist.h"
27 
28 #include <atomic>
29 #include <mutex>
30 
31 #include "my_dbug.h"
32 #include "mysqld_error.h"
33 #include "ndbapi/ndb_cluster_connection.hpp"
34 #include "sql/query_options.h"  // OPTION_BIN_LOG
35 #include "sql/sql_error.h"
36 #include "sql/sql_thd_internal_api.h"
37 #include "storage/ndb/plugin/ndb_anyvalue.h"
38 #include "storage/ndb/plugin/ndb_dist_priv_util.h"
39 #include "storage/ndb/plugin/ndb_name_util.h"
40 #include "storage/ndb/plugin/ndb_require.h"
41 #include "storage/ndb/plugin/ndb_schema_dist_table.h"
42 #include "storage/ndb/plugin/ndb_schema_result_table.h"
43 #include "storage/ndb/plugin/ndb_share.h"
44 #include "storage/ndb/plugin/ndb_thd.h"
45 #include "storage/ndb/plugin/ndb_thd_ndb.h"
46 #include "storage/ndb/plugin/ndb_upgrade_util.h"
47 
48 // Temporarily use a fixed string on the form "./mysql/ndb_schema" as key
49 // for retrieving the NDB_SHARE for mysql.ndb_schema. This will subsequently
50 // be removed when a NDB_SHARE can be acquired using db+table_name and the
51 // key is formatted behind the curtains in NDB_SHARE without using
52 // build_table_filename() etc.
53 static constexpr const char *NDB_SCHEMA_TABLE_KEY =
54     IF_WIN(".\\mysql\\ndb_schema", "./mysql/ndb_schema");
55 
is_ready(void * requestor)56 bool Ndb_schema_dist::is_ready(void *requestor) {
57   DBUG_TRACE;
58 
59   std::stringstream ss;
60   ss << "is_ready_" << std::hex << requestor;
61   const std::string reference = ss.str();
62 
63   NDB_SHARE *schema_share = NDB_SHARE::acquire_reference_by_key(
64       NDB_SCHEMA_TABLE_KEY, reference.c_str());
65   if (schema_share == nullptr) return false;  // Not ready
66 
67   if (!schema_share->have_event_operation()) {
68     NDB_SHARE::release_reference(schema_share, reference.c_str());
69     return false;  // Not ready
70   }
71 
72   NDB_SHARE::release_reference(schema_share, reference.c_str());
73   return true;
74 }
75 
76 bool Ndb_schema_dist_client::m_ddl_blocked = true;
77 
is_schema_dist_table(const char * db,const char * table_name)78 bool Ndb_schema_dist_client::is_schema_dist_table(const char *db,
79                                                   const char *table_name) {
80   if (db == Ndb_schema_dist_table::DB_NAME &&
81       table_name == Ndb_schema_dist_table::TABLE_NAME) {
82     // This is the NDB table used for schema distribution
83     return true;
84   }
85   return false;
86 }
87 
is_schema_dist_result_table(const char * db,const char * table_name)88 bool Ndb_schema_dist_client::is_schema_dist_result_table(
89     const char *db, const char *table_name) {
90   if (db == Ndb_schema_result_table::DB_NAME &&
91       table_name == Ndb_schema_result_table::TABLE_NAME) {
92     // This is the NDB table used for schema distribution results
93     return true;
94   }
95   return false;
96 }
97 
98 /*
99   Actual schema change operations that effect the local Data Dictionary are
100   performed with the Global Schema Lock held, but ACL operations are not.
101   Use acl_change_mutex to serialize all ACL changes on this server.
102 */
103 static std::mutex acl_change_mutex;
104 
acquire_acl_lock()105 void Ndb_schema_dist_client::acquire_acl_lock() {
106   acl_change_mutex.lock();
107   m_holding_acl_mutex = true;
108 }
109 
// Build the NDB_SHARE reference name for a schema dist client, the name
// consists of a fixed prefix followed by the address of the owner.
static std::string unique_reference(void *owner) {
  std::ostringstream name;
  name << "ndb_schema_dist_client" << std::hex << owner;
  return name.str();
}
115 
Ndb_schema_dist_client(THD * thd)116 Ndb_schema_dist_client::Ndb_schema_dist_client(THD *thd)
117     : m_thd(thd),
118       m_thd_ndb(get_thd_ndb(thd)),
119       m_share_reference(unique_reference(this)),
120       m_holding_acl_mutex(false) {}
121 
prepare(const char * db,const char * tabname)122 bool Ndb_schema_dist_client::prepare(const char *db, const char *tabname) {
123   DBUG_TRACE;
124 
125   // Acquire reference on mysql.ndb_schema
126   m_share = NDB_SHARE::acquire_reference_by_key(NDB_SCHEMA_TABLE_KEY,
127                                                 m_share_reference.c_str());
128 
129   if (m_share == nullptr || m_share->have_event_operation() == false ||
130       DBUG_EVALUATE_IF("ndb_schema_dist_not_ready_early", true, false)) {
131     // The NDB_SHARE for mysql.ndb_schema hasn't been created or not setup
132     // yet -> schema distribution is not ready
133     push_warning(m_thd, Sql_condition::SL_WARNING, ER_GET_ERRMSG,
134                  "Schema distribution is not ready");
135     return false;
136   }
137 
138   if (unlikely(m_ddl_blocked)) {
139     // If a data node gets upgraded after this MySQL Server is upgraded, this
140     // MySQL Server will not be aware of the upgrade due to Bug#30930132.
141     // So as a workaround, re-evaluate again if the DDL needs to be blocked
142     if (ndb_all_nodes_support_mysql_dd()) {
143       // All nodes connected to cluster support MySQL DD.
144       // No need to continue blocking the DDL.
145       m_ddl_blocked = false;
146     } else {
147       // Non database DDLs are blocked in plugin due to an ongoing upgrade.
148       // Database DDLs are allowed as they are actually executed in the Server
149       // layer and ndbcluster is only responsible for distributing the change to
150       // other MySQL Servers.
151       if (strlen(tabname) != 0) {
152         // If the tablename is not empty, it is a non database DDL. Block it.
153         my_printf_error(
154             ER_DISALLOWED_OPERATION,
155             "DDLs are disallowed on NDB SE as there is atleast one node "
156             "without MySQL DD support connected to the cluster.",
157             MYF(0));
158         return false;
159       }
160     }
161   }
162 
163   // Save the prepared "keys"(which are used when communicating with
164   // the other MySQL Servers), they should match the keys used in later calls.
165   m_prepared_keys.add_key(db, tabname);
166 
167   Ndb_schema_dist_table schema_dist_table(m_thd_ndb);
168   if (!schema_dist_table.open()) {
169     return false;
170   }
171 
172   if (!schema_dist_table.check_schema()) {
173     return false;
174   }
175 
176   // Open the ndb_schema_result table, the table is created by ndbcluster
177   // when connecting to NDB and thus it shall exist at this time.
178   Ndb_schema_result_table schema_result_table(m_thd_ndb);
179   if (!schema_result_table.open()) {
180     return false;
181   }
182 
183   if (!schema_result_table.check_schema()) {
184     return false;
185   }
186 
187   // Schema distribution is ready
188   return true;
189 }
190 
prepare_rename(const char * db,const char * tabname,const char * new_db,const char * new_tabname)191 bool Ndb_schema_dist_client::prepare_rename(const char *db, const char *tabname,
192                                             const char *new_db,
193                                             const char *new_tabname) {
194   DBUG_TRACE;
195 
196   // Normal prepare first
197   if (!prepare(db, tabname)) {
198     /* During upgrade to 8.0, distributed privilege tables must get renamed
199        as part of a statement "ALTER TABLE ... ENGINE=innodb" before schema
200        distribution has started running.
201     */
202     if (Ndb_dist_priv_util::is_privilege_table(db, tabname)) return true;
203 
204     return false;
205   }
206 
207   // Allow additional keys for rename which will use the "old" name
208   // when communicating with participants until the rename is done.
209   // After rename has occurred, the new name will be used
210   m_prepared_keys.add_key(new_db, new_tabname);
211 
212   // Schema distribution is ready
213   return true;
214 }
215 
prepare_acl_change(uint node_id)216 bool Ndb_schema_dist_client::prepare_acl_change(uint node_id) {
217   /* Acquire the ACL change mutex. It will be released by the destructor.
218    */
219   acquire_acl_lock();
220 
221   /*
222     There is no table name required to log an ACL operation, but the table
223     name is a part of the primary key in ndb_schema. Fabricate a name
224     that is unique to this MySQL server, so that ACL changes originating
225     from different servers use different rows in ndb_schema.
226   */
227   std::string server_key = "acl_dist_from_" + std::to_string(node_id);
228 
229   /*
230     Always use "mysql" as the db part of the primary key.
231     If the current database is set to something other than "mysql", the
232     database will be transmitted as part of GRANT and REVOKE statements.
233   */
234   return prepare("mysql", server_key.c_str());
235 }
236 
check_identifier_limits(std::string & invalid_identifier)237 bool Ndb_schema_dist_client::check_identifier_limits(
238     std::string &invalid_identifier) {
239   DBUG_TRACE;
240 
241   Ndb_schema_dist_table schema_dist_table(m_thd_ndb);
242   if (!schema_dist_table.open()) {
243     invalid_identifier = "<open failed>";
244     return false;
245   }
246 
247   // Check that identifiers does not exceed the limits imposed
248   // by the ndb_schema table layout
249   for (auto key : m_prepared_keys.keys()) {
250     // db
251     if (!schema_dist_table.check_column_identifier_limit(
252             Ndb_schema_dist_table::COL_DB, key.first)) {
253       invalid_identifier = key.first;
254       return false;
255     }
256     // name
257     if (!schema_dist_table.check_column_identifier_limit(
258             Ndb_schema_dist_table::COL_NAME, key.second)) {
259       invalid_identifier = key.second;
260       return false;
261     }
262   }
263   return true;
264 }
265 
add_key(const char * db,const char * tabname)266 void Ndb_schema_dist_client::Prepared_keys::add_key(const char *db,
267                                                     const char *tabname) {
268   m_keys.emplace_back(db, tabname);
269 }
270 
check_key(const char * db,const char * tabname) const271 bool Ndb_schema_dist_client::Prepared_keys::check_key(
272     const char *db, const char *tabname) const {
273   for (auto key : m_keys) {
274     if (key.first == db && key.second == tabname) {
275       return true;  // OK, key has been prepared
276     }
277   }
278   return false;
279 }
280 
281 extern void update_slave_api_stats(const Ndb *);
282 
~Ndb_schema_dist_client()283 Ndb_schema_dist_client::~Ndb_schema_dist_client() {
284   if (m_share) {
285     // Release the reference to mysql.ndb_schema table
286     NDB_SHARE::release_reference(m_share, m_share_reference.c_str());
287   }
288 
289   if (m_thd_ndb && m_thd_ndb->is_slave_thread()) {
290     // Copy-out slave thread statistics
291     // NOTE! This is just a "convenient place" to call this
292     // function, it could be moved to "end of statement"(if there
293     // was such a place..).
294     update_slave_api_stats(m_thd_ndb->ndb);
295   }
296 
297   if (m_holding_acl_mutex) {
298     acl_change_mutex.unlock();
299   }
300 }
301 
302 /*
303   Produce unique identifier for distributing objects that
304   does not have any global id from NDB. Use a sequence counter
305   which is unique in this node.
306 */
307 static std::atomic<uint32> schema_dist_id_sequence{0};
unique_id() const308 uint32 Ndb_schema_dist_client::unique_id() const {
309   uint32 id = ++schema_dist_id_sequence;
310   // Handle wraparound
311   if (id == 0) {
312     id = ++schema_dist_id_sequence;
313   }
314   DBUG_ASSERT(id != 0);
315   return id;
316 }
317 
318 /*
319   Produce unique identifier for distributing objects that
320   does not have any global version from NDB. Use own nodeid
321   which is unique in NDB.
322 */
unique_version() const323 uint32 Ndb_schema_dist_client::unique_version() const {
324   const uint32 ver = m_thd_ndb->connection->node_id();
325   DBUG_ASSERT(ver != 0);
326   return ver;
327 }
328 
push_and_clear_schema_op_results()329 void Ndb_schema_dist_client::push_and_clear_schema_op_results() {
330   if (m_schema_op_results.empty()) {
331     return;
332   }
333 
334   // Push results received from participant(s) as warnings. These are meant to
335   // indicate that schema distribution has failed on one of the nodes. For more
336   // information on how and why the failure occured, the relevant error log
337   // remains the place to look
338   for (const Schema_op_result &op_result : m_schema_op_results) {
339     // Warning consists of the node id and message but not result code since
340     // that's an internal detail
341     m_thd_ndb->push_warning("Node %d: '%s'", op_result.nodeid,
342                             op_result.message.c_str());
343   }
344   // Clear the results. This is needed when the Ndb_schema_dist_client object
345   // is reused as is the case during an inplace alter where the same object is
346   // used during both prepare and commit
347   m_schema_op_results.clear();
348 }
349 
log_schema_op(const char * query,size_t query_length,const char * db,const char * table_name,uint32 id,uint32 version,SCHEMA_OP_TYPE type,bool log_query_on_participant)350 bool Ndb_schema_dist_client::log_schema_op(const char *query,
351                                            size_t query_length, const char *db,
352                                            const char *table_name, uint32 id,
353                                            uint32 version, SCHEMA_OP_TYPE type,
354                                            bool log_query_on_participant) {
355   DBUG_TRACE;
356   DBUG_ASSERT(db && table_name);
357   DBUG_ASSERT(id != 0 && version != 0);
358   DBUG_ASSERT(m_thd_ndb);
359 
360   // Never allow temporary names when communicating with participant
361   if (ndb_name_is_temp(db) || ndb_name_is_temp(table_name)) {
362     DBUG_ASSERT(false);
363     return false;
364   }
365 
366   // Require that m_share has been initialized to reference the
367   // schema distribution table
368   ndbcluster::ndbrequire(m_share);
369 
370   // Check that prepared keys match
371   if (!m_prepared_keys.check_key(db, table_name)) {
372     m_thd_ndb->push_warning("INTERNAL ERROR: prepared keys didn't match");
373     DBUG_ASSERT(false);  // Catch in debug
374     return false;
375   }
376 
377   // Don't distribute if thread has turned off schema distribution
378   if (m_thd_ndb->check_option(Thd_ndb::NO_LOG_SCHEMA_OP)) {
379     DBUG_PRINT("info", ("NO_LOG_SCHEMA_OP set - > skip schema distribution"));
380     return true;  // Ok, skipped
381   }
382 
383   // Verify identifier limits, this should already have been caught earlier
384   {
385     std::string invalid_identifier;
386     if (!check_identifier_limits(invalid_identifier)) {
387       m_thd_ndb->push_warning("INTERNAL ERROR: identifier limits exceeded");
388       DBUG_ASSERT(false);  // Catch in debug
389       return false;
390     }
391   }
392 
393   // Calculate anyvalue
394   const Uint32 anyvalue = calculate_anyvalue(log_query_on_participant);
395 
396   const bool result =
397       log_schema_op_impl(m_thd_ndb->ndb, query, static_cast<int>(query_length),
398                          db, table_name, id, version, type, anyvalue);
399   if (!result) {
400     // Schema distribution failed
401     push_and_clear_schema_op_results();
402     m_thd_ndb->push_warning("Schema distribution failed");
403     return false;
404   }
405 
406   // Schema distribution passed but the schema op may have failed on
407   // participants. Push and clear results (if any)
408   push_and_clear_schema_op_results();
409   return true;
410 }
411 
create_table(const char * db,const char * table_name,int id,int version)412 bool Ndb_schema_dist_client::create_table(const char *db,
413                                           const char *table_name, int id,
414                                           int version) {
415   DBUG_TRACE;
416 
417   if (is_schema_dist_table(db, table_name)) {
418     // Create of the schema distribution table is not distributed. Instead,
419     // every MySQL Server have special handling to create it if not
420     // exists and then open it as first step of connecting to the cluster
421     return true;
422   }
423 
424   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
425                        table_name, id, version, SOT_CREATE_TABLE);
426 }
427 
truncate_table(const char * db,const char * table_name,int id,int version)428 bool Ndb_schema_dist_client::truncate_table(const char *db,
429                                             const char *table_name, int id,
430                                             int version) {
431   DBUG_TRACE;
432   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
433                        table_name, id, version, SOT_TRUNCATE_TABLE);
434 }
435 
alter_table(const char * db,const char * table_name,int id,int version,bool log_on_participant)436 bool Ndb_schema_dist_client::alter_table(const char *db, const char *table_name,
437                                          int id, int version,
438                                          bool log_on_participant) {
439   DBUG_TRACE;
440   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
441                        table_name, id, version, SOT_ALTER_TABLE_COMMIT,
442                        log_on_participant);
443 }
444 
alter_table_inplace_prepare(const char * db,const char * table_name,int id,int version)445 bool Ndb_schema_dist_client::alter_table_inplace_prepare(const char *db,
446                                                          const char *table_name,
447                                                          int id, int version) {
448   DBUG_TRACE;
449   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
450                        table_name, id, version, SOT_ONLINE_ALTER_TABLE_PREPARE);
451 }
452 
alter_table_inplace_commit(const char * db,const char * table_name,int id,int version)453 bool Ndb_schema_dist_client::alter_table_inplace_commit(const char *db,
454                                                         const char *table_name,
455                                                         int id, int version) {
456   DBUG_TRACE;
457   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
458                        table_name, id, version, SOT_ONLINE_ALTER_TABLE_COMMIT);
459 }
460 
rename_table_prepare(const char * db,const char * table_name,int id,int version,const char * new_key_for_table)461 bool Ndb_schema_dist_client::rename_table_prepare(
462     const char *db, const char *table_name, int id, int version,
463     const char *new_key_for_table) {
464   DBUG_TRACE;
465   // NOTE! The rename table prepare phase is primarily done in order to
466   // pass the "new key"(i.e db/table_name) for the table to be renamed,
467   // that's since there isn't enough placeholders in the subsequent rename
468   // table phase.
469   return log_schema_op(new_key_for_table, strlen(new_key_for_table), db,
470                        table_name, id, version, SOT_RENAME_TABLE_PREPARE);
471 }
472 
rename_table(const char * db,const char * table_name,int id,int version,const char * new_dbname,const char * new_tabname,bool log_on_participant)473 bool Ndb_schema_dist_client::rename_table(const char *db,
474                                           const char *table_name, int id,
475                                           int version, const char *new_dbname,
476                                           const char *new_tabname,
477                                           bool log_on_participant) {
478   DBUG_TRACE;
479 
480   /*
481     Rewrite the query, the original query may contain several tables but
482     rename_table() is called once for each table in the query.
483       ie. RENAME TABLE t1 to tx, t2 to ty;
484           -> RENAME TABLE t1 to tx + RENAME TABLE t2 to ty
485   */
486   std::string rewritten_query;
487   rewritten_query.append("rename table `")
488       .append(db)
489       .append("`.`")
490       .append(table_name)
491       .append("` to `")
492       .append(new_dbname)
493       .append("`.`")
494       .append(new_tabname)
495       .append("`");
496   DBUG_PRINT("info", ("rewritten query: '%s'", rewritten_query.c_str()));
497 
498   return log_schema_op(rewritten_query.c_str(), rewritten_query.length(), db,
499                        table_name, id, version, SOT_RENAME_TABLE,
500                        log_on_participant);
501 }
502 
drop_table(const char * db,const char * table_name,int id,int version,bool log_on_participant)503 bool Ndb_schema_dist_client::drop_table(const char *db, const char *table_name,
504                                         int id, int version,
505                                         bool log_on_participant) {
506   DBUG_TRACE;
507 
508   /*
509     Never distribute each dropped table as part of DROP DATABASE:
510     1) as only the DROP DATABASE command should go into binlog
511     2) as this MySQL Server is dropping the tables from NDB, when
512        the participants get the DROP DATABASE it will remove
513        any tables from the DD and then remove the database.
514   */
515   DBUG_ASSERT(thd_sql_command(m_thd) != SQLCOM_DROP_DB);
516 
517   /*
518     Rewrite the query, the original query may contain several tables but
519     drop_table() is called once for each table in the query.
520     ie. DROP TABLE t1, t2;
521       -> DROP TABLE t1 + DROP TABLE t2
522   */
523   std::string rewritten_query;
524   rewritten_query.append("drop table `")
525       .append(db)
526       .append("`.`")
527       .append(table_name)
528       .append("`");
529   DBUG_PRINT("info", ("rewritten query: '%s'", rewritten_query.c_str()));
530 
531   // Special case where the table to be dropped was already dropped in the
532   // client. This is considered acceptable behavior and the query is distributed
533   // to ensure that the table is dropped in the pariticipants. Assign values to
534   // id and version to workaround the assumption that they will always be != 0
535   if (id == 0 && version == 0) {
536     id = unique_id();
537     version = unique_version();
538   }
539 
540   return log_schema_op(rewritten_query.c_str(), rewritten_query.length(), db,
541                        table_name, id, version, SOT_DROP_TABLE,
542                        log_on_participant);
543 }
544 
create_db(const char * query,uint query_length,const char * db,unsigned int id,unsigned int version)545 bool Ndb_schema_dist_client::create_db(const char *query, uint query_length,
546                                        const char *db, unsigned int id,
547                                        unsigned int version) {
548   DBUG_TRACE;
549 
550   // Checking identifier limits "late", there is no way to return
551   // an error to fail the CREATE DATABASE command
552   std::string invalid_identifier;
553   if (!check_identifier_limits(invalid_identifier)) {
554     // Check of db name limit failed
555     m_thd_ndb->push_warning("Identifier name '%-.100s' is too long",
556                             invalid_identifier.c_str());
557     return false;
558   }
559 
560   return log_schema_op(query, query_length, db, "", id, version, SOT_CREATE_DB);
561 }
562 
alter_db(const char * query,uint query_length,const char * db,unsigned int id,unsigned int version)563 bool Ndb_schema_dist_client::alter_db(const char *query, uint query_length,
564                                       const char *db, unsigned int id,
565                                       unsigned int version) {
566   DBUG_TRACE;
567 
568   // Checking identifier limits "late", there is no way to return
569   // an error to fail the ALTER DATABASE command
570   std::string invalid_identifier;
571   if (!check_identifier_limits(invalid_identifier)) {
572     // Check of db name limit failed
573     m_thd_ndb->push_warning("Identifier name '%-.100s' is too long",
574                             invalid_identifier.c_str());
575     return false;
576   }
577 
578   return log_schema_op(query, query_length, db, "", id, version, SOT_ALTER_DB);
579 }
580 
drop_db(const char * db)581 bool Ndb_schema_dist_client::drop_db(const char *db) {
582   DBUG_TRACE;
583 
584   // Checking identifier limits "late", there is no way to return
585   // an error to fail the DROP DATABASE command
586   std::string invalid_identifier;
587   if (!check_identifier_limits(invalid_identifier)) {
588     // Check of db name limit failed
589     m_thd_ndb->push_warning("Identifier name '%-.100s' is too long",
590                             invalid_identifier.c_str());
591     return false;
592   }
593 
594   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), db,
595                        "", unique_id(), unique_version(), SOT_DROP_DB);
596 }
597 
598 /* STATEMENT-style ACL change distribution */
acl_notify(const char * database,const char * query,uint query_length,bool participant_refresh)599 bool Ndb_schema_dist_client::acl_notify(const char *database, const char *query,
600                                         uint query_length,
601                                         bool participant_refresh) {
602   DBUG_TRACE;
603   DBUG_ASSERT(m_holding_acl_mutex);
604   auto key = m_prepared_keys.keys()[0];
605   std::string new_query("use ");
606   if (database != nullptr && strcmp(database, "mysql")) {
607     new_query.append(database).append(";").append(query, query_length);
608     query = new_query.c_str();
609     query_length = new_query.size();
610   }
611   SCHEMA_OP_TYPE type =
612       participant_refresh ? SOT_ACL_STATEMENT : SOT_ACL_STATEMENT_REFRESH;
613   return log_schema_op(query, query_length, key.first.c_str(),
614                        key.second.c_str(), unique_id(), unique_version(), type);
615 }
616 
617 /* SNAPSHOT-style ACL change distribution */
acl_notify(std::string user_list)618 bool Ndb_schema_dist_client::acl_notify(std::string user_list) {
619   DBUG_TRACE;
620   DBUG_ASSERT(m_holding_acl_mutex);
621   auto key = m_prepared_keys.keys()[0];
622 
623   return log_schema_op(user_list.c_str(), user_list.length(), key.first.c_str(),
624                        key.second.c_str(), unique_id(), unique_version(),
625                        SOT_ACL_SNAPSHOT);
626 }
627 
tablespace_changed(const char * tablespace_name,int id,int version)628 bool Ndb_schema_dist_client::tablespace_changed(const char *tablespace_name,
629                                                 int id, int version) {
630   DBUG_TRACE;
631   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
632                        tablespace_name, id, version, SOT_TABLESPACE);
633 }
634 
logfilegroup_changed(const char * logfilegroup_name,int id,int version)635 bool Ndb_schema_dist_client::logfilegroup_changed(const char *logfilegroup_name,
636                                                   int id, int version) {
637   DBUG_TRACE;
638   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
639                        logfilegroup_name, id, version, SOT_LOGFILE_GROUP);
640 }
641 
create_tablespace(const char * tablespace_name,int id,int version)642 bool Ndb_schema_dist_client::create_tablespace(const char *tablespace_name,
643                                                int id, int version) {
644   DBUG_TRACE;
645   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
646                        tablespace_name, id, version, SOT_CREATE_TABLESPACE);
647 }
648 
alter_tablespace(const char * tablespace_name,int id,int version)649 bool Ndb_schema_dist_client::alter_tablespace(const char *tablespace_name,
650                                               int id, int version) {
651   DBUG_TRACE;
652   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
653                        tablespace_name, id, version, SOT_ALTER_TABLESPACE);
654 }
655 
drop_tablespace(const char * tablespace_name,int id,int version)656 bool Ndb_schema_dist_client::drop_tablespace(const char *tablespace_name,
657                                              int id, int version) {
658   DBUG_TRACE;
659   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
660                        tablespace_name, id, version, SOT_DROP_TABLESPACE);
661 }
662 
create_logfile_group(const char * logfile_group_name,int id,int version)663 bool Ndb_schema_dist_client::create_logfile_group(
664     const char *logfile_group_name, int id, int version) {
665   DBUG_TRACE;
666   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
667                        logfile_group_name, id, version,
668                        SOT_CREATE_LOGFILE_GROUP);
669 }
670 
alter_logfile_group(const char * logfile_group_name,int id,int version)671 bool Ndb_schema_dist_client::alter_logfile_group(const char *logfile_group_name,
672                                                  int id, int version) {
673   DBUG_TRACE;
674   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
675                        logfile_group_name, id, version,
676                        SOT_ALTER_LOGFILE_GROUP);
677 }
678 
drop_logfile_group(const char * logfile_group_name,int id,int version)679 bool Ndb_schema_dist_client::drop_logfile_group(const char *logfile_group_name,
680                                                 int id, int version) {
681   DBUG_TRACE;
682   return log_schema_op(ndb_thd_query(m_thd), ndb_thd_query_length(m_thd), "",
683                        logfile_group_name, id, version, SOT_DROP_LOGFILE_GROUP);
684 }
685 
type_name(SCHEMA_OP_TYPE type)686 const char *Ndb_schema_dist_client::type_name(SCHEMA_OP_TYPE type) {
687   switch (type) {
688     case SOT_DROP_TABLE:
689       return "DROP_TABLE";
690     case SOT_CREATE_TABLE:
691       return "CREATE_TABLE";
692     case SOT_ALTER_TABLE_COMMIT:
693       return "ALTER_TABLE_COMMIT";
694     case SOT_DROP_DB:
695       return "DROP_DB";
696     case SOT_CREATE_DB:
697       return "CREATE_DB";
698     case SOT_ALTER_DB:
699       return "ALTER_DB";
700     case SOT_CLEAR_SLOCK:
701       return "CLEAR_SLOCK";
702     case SOT_TABLESPACE:
703       return "TABLESPACE";
704     case SOT_LOGFILE_GROUP:
705       return "LOGFILE_GROUP";
706     case SOT_RENAME_TABLE:
707       return "RENAME_TABLE";
708     case SOT_TRUNCATE_TABLE:
709       return "TRUNCATE_TABLE";
710     case SOT_RENAME_TABLE_PREPARE:
711       return "RENAME_TABLE_PREPARE";
712     case SOT_ONLINE_ALTER_TABLE_PREPARE:
713       return "ONLINE_ALTER_TABLE_PREPARE";
714     case SOT_ONLINE_ALTER_TABLE_COMMIT:
715       return "ONLINE_ALTER_TABLE_COMMIT";
716     case SOT_CREATE_USER:
717       return "CREATE_USER";
718     case SOT_DROP_USER:
719       return "DROP_USER";
720     case SOT_RENAME_USER:
721       return "RENAME_USER";
722     case SOT_GRANT:
723       return "GRANT";
724     case SOT_REVOKE:
725       return "REVOKE";
726     case SOT_CREATE_TABLESPACE:
727       return "CREATE_TABLESPACE";
728     case SOT_ALTER_TABLESPACE:
729       return "ALTER_TABLESPACE";
730     case SOT_DROP_TABLESPACE:
731       return "DROP_TABLESPACE";
732     case SOT_CREATE_LOGFILE_GROUP:
733       return "CREATE_LOGFILE_GROUP";
734     case SOT_ALTER_LOGFILE_GROUP:
735       return "ALTER_LOGFILE_GROUP";
736     case SOT_DROP_LOGFILE_GROUP:
737       return "DROP_LOGFILE_GROUP";
738     case SOT_ACL_SNAPSHOT:
739       return "ACL_SNAPSHOT";
740     case SOT_ACL_STATEMENT:
741       return "ACL_STATEMENT";
742     case SOT_ACL_STATEMENT_REFRESH:
743       return "ACL_STATEMENT_REFRESH";
744     default:
745       break;
746   }
747   DBUG_ASSERT(false);
748   return "<unknown>";
749 }
750 
calculate_anyvalue(bool force_nologging) const751 uint32 Ndb_schema_dist_client::calculate_anyvalue(bool force_nologging) const {
752   Uint32 anyValue = 0;
753   if (!thd_slave_thread(m_thd)) {
754     /* Schema change originating from this MySQLD, check SQL_LOG_BIN
755      * variable and pass 'setting' to all logging MySQLDs via AnyValue
756      */
757     if (thd_test_options(m_thd, OPTION_BIN_LOG)) /* e.g. SQL_LOG_BIN == on */
758     {
759       DBUG_PRINT("info", ("Schema event for binlogging"));
760       ndbcluster_anyvalue_set_normal(anyValue);
761     } else {
762       DBUG_PRINT("info", ("Schema event not for binlogging"));
763       ndbcluster_anyvalue_set_nologging(anyValue);
764     }
765 
766     if (!force_nologging) {
767       DBUG_PRINT("info", ("Forcing query not to be binlogged on participant"));
768       ndbcluster_anyvalue_set_nologging(anyValue);
769     }
770   } else {
771     /*
772        Slave propagating replicated schema event in ndb_schema
773        In case replicated serverId is composite
774        (server-id-bits < 31) we copy it into the
775        AnyValue as-is
776        This is for 'future', as currently Schema operations
777        do not have composite AnyValues.
778        In future it may be useful to support *not* mapping composite
779        AnyValues to/from Binlogged server-ids.
780     */
781     DBUG_PRINT("info", ("Replicated schema event with original server id"));
782     anyValue = thd_unmasked_server_id(m_thd);
783   }
784 
785 #ifndef DBUG_OFF
786   /*
787     MySQLD will set the user-portion of AnyValue (if any) to all 1s
788     This tests code filtering ServerIds on the value of server-id-bits.
789   */
790   const char *p = getenv("NDB_TEST_ANYVALUE_USERDATA");
791   if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N') {
792     dbug_ndbcluster_anyvalue_set_userbits(anyValue);
793   }
794 #endif
795   DBUG_PRINT("info", ("anyvalue: %u", anyValue));
796   return anyValue;
797 }
798