1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/dd/impl/dictionary_impl.h"
24 
25 #include <string.h>
26 #include <memory>
27 
28 #include "m_ctype.h"
29 #include "m_string.h"
30 #include "my_dbug.h"
31 #include "my_inttypes.h"
32 #include "my_sys.h"
33 #include "mysql/thread_type.h"
34 #include "mysql/udf_registration_types.h"
35 #include "mysql_com.h"
36 #include "mysqld_error.h"
37 #include "sql/auth/auth_common.h"  // acl_init
38 #include "sql/auth/sql_security_ctx.h"
39 #include "sql/auto_thd.h"                        // Auto_thd
40 #include "sql/bootstrap.h"                       // bootstrap::bootstrap_functor
41 #include "sql/dd/cache/dictionary_client.h"      // dd::Dictionary_client
42 #include "sql/dd/dd.h"                           // enum_dd_init_type
43 #include "sql/dd/dd_schema.h"                    // dd::Schema_MDL_locker
44 #include "sql/dd/dd_version.h"                   // dd::DD_VERSION
45 #include "sql/dd/impl/bootstrap/bootstrapper.h"  // dd::Bootstrapper
46 #include "sql/dd/impl/cache/shared_dictionary_cache.h"  // Shared_dictionary_cache
47 #include "sql/dd/impl/system_registry.h"                // dd::System_tables
48 #include "sql/dd/impl/tables/columns.h"                 // dd::tables::Columns
49 #include "sql/dd/impl/tables/dd_properties.h"     // get_actual_dd_version()
50 #include "sql/dd/impl/tables/indexes.h"           // dd::tables::Indexes
51 #include "sql/dd/impl/tables/table_partitions.h"  // dd::tables::Table_partitions
52 #include "sql/dd/impl/tables/tables.h"            // dd::tables::Tables
53 #include "sql/dd/impl/tables/tablespaces.h"       // dd::tables::Tablespaces
54 #include "sql/dd/impl/utils.h"                    // dd::tables::Tablespaces
55 #include "sql/dd/info_schema/metadata.h"  // dd::info_schema::store_dynamic...
56 #include "sql/dd/types/abstract_table.h"  // dd::Abstract_table::DD_table
57 #include "sql/dd/types/column.h"          // dd::Column::DD_table
58 #include "sql/dd/types/index.h"           // dd::Index::DD_table
59 #include "sql/dd/types/object_table_definition.h"
60 #include "sql/dd/types/partition.h"  // dd::Partition::DD_table
61 #include "sql/dd/types/system_view.h"
62 #include "sql/dd/types/table.h"         // dd::Table::DD_table
63 #include "sql/dd/types/tablespace.h"    // dd::Tablespace::DD_table
64 #include "sql/dd/upgrade_57/upgrade.h"  // dd::upgrade
65 #include "sql/derror.h"
66 #include "sql/handler.h"
67 #include "sql/mdl.h"
68 #include "sql/opt_costconstantcache.h"  // init_optimizer_cost_module
69 #include "sql/plugin_table.h"
70 #include "sql/sql_base.h"   // close_cached_tables
71 #include "sql/sql_class.h"  // THD
72 #include "sql/system_variables.h"
73 #include "sql/thd_raii.h"                       // Disable_autocommit_guard
74 #include "sql/transaction.h"                    // trans_commit()
75 #include "storage/perfschema/pfs_dd_version.h"  // PFS_DD_VERSION
76 
// Optimizer cost constant cache pointer, defined in opt_costconstantcache.cc.
// Dictionary_impl::init() compares it against nullptr to decide whether it
// has to set up (and later tear down) the cost module itself.
extern Cost_constant_cache *cost_constant_cache;
79 
80 ///////////////////////////////////////////////////////////////////////////
81 
82 namespace dd {
83 
84 ///////////////////////////////////////////////////////////////////////////
85 // Implementation details.
86 ///////////////////////////////////////////////////////////////////////////
87 
// Forward declarations of dictionary types referenced in this file.
class Object_table;
class Table;

// Singleton pointer. Assigned in init() for the DD_INITIALIZE and
// DD_RESTART_OR_UPGRADE modes, deleted and reset to nullptr in shutdown().
Dictionary_impl *Dictionary_impl::s_instance = nullptr;
92 
instance()93 Dictionary_impl *Dictionary_impl::instance() { return s_instance; }
94 
// Definitions of the static identifiers declared in Dictionary_impl:
// the default catalog id and name, and the id of the DD tablespace.
Object_id Dictionary_impl::DEFAULT_CATALOG_ID = 1;
Object_id Dictionary_impl::DD_TABLESPACE_ID = 1;
const String_type Dictionary_impl::DEFAULT_CATALOG_NAME("def");
98 
99 ///////////////////////////////////////////////////////////////////////////
100 
init(enum_dd_init_type dd_init)101 bool Dictionary_impl::init(enum_dd_init_type dd_init) {
102   if (dd_init == enum_dd_init_type::DD_INITIALIZE ||
103       dd_init == enum_dd_init_type::DD_RESTART_OR_UPGRADE) {
104     DBUG_ASSERT(!Dictionary_impl::s_instance);
105 
106     if (Dictionary_impl::s_instance) return false; /* purecov: inspected */
107 
108     std::unique_ptr<Dictionary_impl> d(new Dictionary_impl());
109 
110     Dictionary_impl::s_instance = d.release();
111   }
112 
113   /*
114     Initialize the cost model, but delete it after the dd is initialized.
115     This is because the cost model is needed for the dd initialization, but
116     it must be re-initialized later after the plugins have been initialized.
117     Upgrade process needs heap engine initialized, hence parameter 'true'
118     is passed to the function.
119   */
120   bool cost_constant_inited = false;
121   if (cost_constant_cache == nullptr) {
122     init_optimizer_cost_module(true);
123     cost_constant_inited = true;
124   }
125 
126   // Disable table encryption privilege checks for system threads.
127   bool saved_table_encryption_privilege_check =
128       opt_table_encryption_privilege_check;
129   opt_table_encryption_privilege_check = false;
130 
131   /*
132     Install or start or upgrade the dictionary
133     depending on bootstrapping option.
134   */
135 
136   bool result = false;
137 
138   // Creation of Data Dictionary through current server
139   if (dd_init == enum_dd_init_type::DD_INITIALIZE)
140     result = ::bootstrap::run_bootstrap_thread(
141         nullptr, nullptr, &bootstrap::initialize, SYSTEM_THREAD_DD_INITIALIZE);
142 
143   // Creation of INFORMATION_SCHEMA system views.
144   else if (dd_init == enum_dd_init_type::DD_INITIALIZE_SYSTEM_VIEWS)
145     result = ::bootstrap::run_bootstrap_thread(nullptr, nullptr,
146                                                &dd::info_schema::initialize,
147                                                SYSTEM_THREAD_DD_INITIALIZE);
148 
149   /*
150     Creation of Dictionary Tables in old Data Directory
151     This function also takes care of normal server restart.
152   */
153   else if (dd_init == enum_dd_init_type::DD_RESTART_OR_UPGRADE)
154     result = ::bootstrap::run_bootstrap_thread(
155         nullptr, nullptr, &upgrade_57::do_pre_checks_and_initialize_dd,
156         SYSTEM_THREAD_DD_INITIALIZE);
157 
158   // Populate metadata in DD tables from old data directory and do cleanup.
159   else if (dd_init == enum_dd_init_type::DD_POPULATE_UPGRADE)
160     result = ::bootstrap::run_bootstrap_thread(
161         nullptr, nullptr, &upgrade_57::fill_dd_and_finalize,
162         SYSTEM_THREAD_DD_INITIALIZE);
163 
164   // Delete DD tables and do cleanup in case of error in upgrade
165   else if (dd_init == enum_dd_init_type::DD_DELETE)
166     result = ::bootstrap::run_bootstrap_thread(
167         nullptr, nullptr, &upgrade_57::terminate, SYSTEM_THREAD_DD_INITIALIZE);
168 
169   // Update server and plugin I_S table metadata into DD tables.
170   else if (dd_init == enum_dd_init_type::DD_UPDATE_I_S_METADATA)
171     result = ::bootstrap::run_bootstrap_thread(
172         nullptr, nullptr, &dd::info_schema::update_I_S_metadata,
173         SYSTEM_THREAD_DD_INITIALIZE);
174 
175   // Creation of non-dd-based INFORMATION_SCHEMA system views.
176   else if (dd_init ==
177            enum_dd_init_type::DD_INITIALIZE_NON_DD_BASED_SYSTEM_VIEWS)
178     result = ::bootstrap::run_bootstrap_thread(
179         nullptr, nullptr, &dd::info_schema::init_non_dd_based_system_view,
180         SYSTEM_THREAD_DD_INITIALIZE);
181 
182   // Restore the table_encryption_privilege_check.
183   opt_table_encryption_privilege_check = saved_table_encryption_privilege_check;
184 
185   /* Now that the dd is initialized, delete the cost model. */
186   if (cost_constant_inited) delete_optimizer_cost_module();
187 
188   return result;
189 }
190 
191 ///////////////////////////////////////////////////////////////////////////
192 
shutdown()193 bool Dictionary_impl::shutdown() {
194   if (!Dictionary_impl::s_instance) return true;
195 
196   delete Dictionary_impl::s_instance;
197   Dictionary_impl::s_instance = nullptr;
198 
199   return false;
200 }
201 
202 ///////////////////////////////////////////////////////////////////////////
203 // Implementation details.
204 ///////////////////////////////////////////////////////////////////////////
205 
get_target_dd_version()206 uint Dictionary_impl::get_target_dd_version() { return dd::DD_VERSION; }
207 
208 ///////////////////////////////////////////////////////////////////////////
209 
get_actual_dd_version(THD * thd)210 uint Dictionary_impl::get_actual_dd_version(THD *thd) {
211   bool exists = false;
212   uint version = 0;
213   bool error MY_ATTRIBUTE((unused)) = tables::DD_properties::instance().get(
214       thd, "DD_VERSION", &version, &exists);
215   DBUG_ASSERT(!error);
216   DBUG_ASSERT(exists);
217   return version;
218 }
219 
220 ///////////////////////////////////////////////////////////////////////////
221 
get_target_I_S_version()222 uint Dictionary_impl::get_target_I_S_version() {
223   return dd::info_schema::IS_DD_VERSION;
224 }
225 
226 ///////////////////////////////////////////////////////////////////////////
227 
get_actual_I_S_version(THD * thd)228 uint Dictionary_impl::get_actual_I_S_version(THD *thd) {
229   bool exists = false;
230   uint version = 0;
231   bool error MY_ATTRIBUTE((unused)) = tables::DD_properties::instance().get(
232       thd, "IS_VERSION", &version, &exists);
233   DBUG_ASSERT(!error);
234   DBUG_ASSERT(exists);
235   return version;
236 }
237 
238 ///////////////////////////////////////////////////////////////////////////
239 
set_I_S_version(THD * thd,uint version)240 uint Dictionary_impl::set_I_S_version(THD *thd, uint version) {
241   return tables::DD_properties::instance().set(thd, "IS_VERSION", version);
242 }
243 
244 ///////////////////////////////////////////////////////////////////////////
245 
get_target_P_S_version()246 uint Dictionary_impl::get_target_P_S_version() { return PFS_DD_VERSION; }
247 
248 ///////////////////////////////////////////////////////////////////////////
249 
get_actual_P_S_version(THD * thd)250 uint Dictionary_impl::get_actual_P_S_version(THD *thd) {
251   bool exists = false;
252   uint version = 0;
253   bool error MY_ATTRIBUTE((unused)) = tables::DD_properties::instance().get(
254       thd, "PS_VERSION", &version, &exists);
255   DBUG_ASSERT(!error);
256   DBUG_ASSERT(exists);
257   return version;
258 }
259 
260 ///////////////////////////////////////////////////////////////////////////
261 
get_actual_ndbinfo_schema_version(THD * thd,uint * ver)262 bool Dictionary_impl::get_actual_ndbinfo_schema_version(THD *thd, uint *ver) {
263   bool exists = false;
264   tables::DD_properties::instance().get(thd, "NDBINFO_VERSION", ver, &exists);
265   return exists;
266 }
267 
268 ///////////////////////////////////////////////////////////////////////////
269 
set_ndbinfo_schema_version(THD * thd,uint version)270 uint Dictionary_impl::set_ndbinfo_schema_version(THD *thd, uint version) {
271   return tables::DD_properties::instance().set(thd, "NDBINFO_VERSION", version);
272 }
273 
274 ///////////////////////////////////////////////////////////////////////////
275 
set_P_S_version(THD * thd,uint version)276 uint Dictionary_impl::set_P_S_version(THD *thd, uint version) {
277   return tables::DD_properties::instance().set(thd, "PS_VERSION", version);
278 }
279 
280 ///////////////////////////////////////////////////////////////////////////
281 
get_dd_table(const String_type & schema_name,const String_type & table_name) const282 const Object_table *Dictionary_impl::get_dd_table(
283     const String_type &schema_name, const String_type &table_name) const {
284   if (!is_dd_schema_name(schema_name)) return nullptr;
285 
286   return System_tables::instance()->find_table(schema_name, table_name);
287 }
288 
289 ///////////////////////////////////////////////////////////////////////////
290 
is_dd_table_name(const String_type & schema_name,const String_type & table_name) const291 bool Dictionary_impl::is_dd_table_name(const String_type &schema_name,
292                                        const String_type &table_name) const {
293   if (!is_dd_schema_name(schema_name)) return false;
294 
295   const System_tables::Types *table_type =
296       System_tables::instance()->find_type(schema_name, table_name);
297 
298   return (table_type != nullptr &&
299           (*table_type == System_tables::Types::CORE ||
300            *table_type == System_tables::Types::INERT ||
301            *table_type == System_tables::Types::SECOND ||
302            *table_type == System_tables::Types::DDSE_PRIVATE ||
303            *table_type == System_tables::Types::DDSE_PROTECTED));
304 }
305 
306 ///////////////////////////////////////////////////////////////////////////
307 
is_system_table_name(const String_type & schema_name,const String_type & table_name) const308 bool Dictionary_impl::is_system_table_name(
309     const String_type &schema_name, const String_type &table_name) const {
310   if (!is_dd_schema_name(schema_name)) return false;
311 
312   const System_tables::Types *table_type =
313       System_tables::instance()->find_type(schema_name, table_name);
314 
315   return (table_type != nullptr &&
316           (*table_type == System_tables::Types::SYSTEM));
317 }
318 
319 ///////////////////////////////////////////////////////////////////////////
320 
table_type_error_code(const String_type & schema_name,const String_type & table_name) const321 int Dictionary_impl::table_type_error_code(
322     const String_type &schema_name, const String_type &table_name) const {
323   const System_tables::Types *type =
324       System_tables::instance()->find_type(schema_name, table_name);
325   if (type != nullptr) return System_tables::type_name_error_code(*type);
326   return ER_NO_SYSTEM_TABLE_ACCESS_FOR_TABLE;
327 }
328 
329 ///////////////////////////////////////////////////////////////////////////
330 
is_dd_table_access_allowed(bool is_dd_internal_thread,bool is_ddl_statement,const char * schema_name,size_t schema_length,const char * table_name) const331 bool Dictionary_impl::is_dd_table_access_allowed(bool is_dd_internal_thread,
332                                                  bool is_ddl_statement,
333                                                  const char *schema_name,
334                                                  size_t schema_length,
335                                                  const char *table_name) const {
336   /*
337     From WL#6391, we have the following matrix describing access:
338 
339     ---------+---------------------+
340              | Dictionary internal |
341     ---------+----------+----------+
342              |   DDL    |   DML    |
343     ---------+-----+----+-----+----+
344              | IN  | EX | IN  | EX |
345     ---------+-----+----+-----+----+
346     Inert    |  X          X       |
347     Core     |  X          X       |
348     Second   |  X          X       |
349     DDSE_priv|  X          X       |
350     DDSE_prot|  X          X    X  |
351     SYSTEM   |  X    X     X    X  |
352     ---------+---------------------+
353 
354     For performance reasons, we first check the schema
355     name to shortcut the evaluation. If the table is not in
356     the 'mysql' schema, we don't need any further checks. Same for
357     checking for internal threads - an internal thread has full
358     access. We also allow access if the appropriate debug flag
359     is set.
360   */
361   if (schema_length != MYSQL_SCHEMA_NAME.length ||
362       strncmp(schema_name, MYSQL_SCHEMA_NAME.str, MYSQL_SCHEMA_NAME.length) ||
363       is_dd_internal_thread ||
364       DBUG_EVALUATE_IF("skip_dd_table_access_check", true, false))
365     return true;
366 
367   // Now we need to get the table type.
368   const String_type schema_str(schema_name);
369   const String_type table_str(table_name);
370   const System_tables::Types *table_type =
371       System_tables::instance()->find_type(schema_str, table_str);
372 
373   /*
374     Access allowed for external DD tables, for DML on protected DDSE tables,
375     and for any operation on SYSTEM tables.
376   */
377   return (table_type == nullptr ||
378           (*table_type == System_tables::Types::DDSE_PROTECTED &&
379            !is_ddl_statement) ||
380           *table_type == System_tables::Types::SYSTEM);
381 }
382 
383 ///////////////////////////////////////////////////////////////////////////
384 
is_system_view_name(const char * schema_name,const char * table_name,bool * hidden) const385 bool Dictionary_impl::is_system_view_name(const char *schema_name,
386                                           const char *table_name,
387                                           bool *hidden) const {
388   /*
389     TODO One possible improvement here could be to try and use the variant
390     of is_infoschema_db() that takes length as a parameter. Then, if the
391     schema name length is different, this can quickly be used to conclude
392     that this is indeed not a system view, without having to do a strcmp at
393     all.
394   */
395   if (schema_name == nullptr || table_name == nullptr ||
396       is_infoschema_db(schema_name) == false)
397     return false;
398 
399   // The System_views registry stores the view name in uppercase.
400   // So convert the input to uppercase before search.
401   char tab_name_buf[NAME_LEN + 1];
402   my_stpcpy(tab_name_buf, table_name);
403   my_caseup_str(system_charset_info, tab_name_buf);
404 
405   const system_views::System_view *s =
406       System_views::instance()->find(INFORMATION_SCHEMA_NAME.str, tab_name_buf);
407 
408   if (s)
409     *hidden = s->hidden();
410   else
411     *hidden = false;
412 
413   return s != nullptr;
414 }
415 
416 ///////////////////////////////////////////////////////////////////////////
417 
/*
  Global interface methods in the 'dd' namespace.
  The following are a couple of APIs that InnoDB needs in order to acquire
  MDL locks.
*/
422 
acquire_mdl(THD * thd,MDL_key::enum_mdl_namespace lock_namespace,const char * schema_name,const char * table_name,bool no_wait,ulong lock_wait_timeout,enum_mdl_type lock_type,enum_mdl_duration lock_duration,MDL_ticket ** out_mdl_ticket)423 static bool acquire_mdl(THD *thd, MDL_key::enum_mdl_namespace lock_namespace,
424                         const char *schema_name, const char *table_name,
425                         bool no_wait, ulong lock_wait_timeout,
426                         enum_mdl_type lock_type,
427                         enum_mdl_duration lock_duration,
428                         MDL_ticket **out_mdl_ticket) {
429   DBUG_TRACE;
430 
431   MDL_request mdl_request;
432   MDL_REQUEST_INIT(&mdl_request, lock_namespace, schema_name, table_name,
433                    lock_type, lock_duration);
434 
435   /*
436     If there is a request for an exclusive lock, we also need to acquire
437     a transactional intention exclusive backup lock and global read lock
438     (this is not done to get a lock, but rather to protect against others
439     setting the backup- or global read lock).
440   */
441   MDL_request_list mdl_requests;
442   mdl_requests.push_front(&mdl_request);
443 
444   MDL_request *grl_request = nullptr;
445   MDL_request *bl_request = nullptr;
446   if (lock_type == MDL_EXCLUSIVE) {
447     // If we cannot acquire protection against GRL, err out.
448     if (thd->global_read_lock.can_acquire_protection()) return true;
449 
450     grl_request = new (thd->mem_root) MDL_request;
451     MDL_REQUEST_INIT(grl_request, MDL_key::GLOBAL, "", "",
452                      MDL_INTENTION_EXCLUSIVE, MDL_TRANSACTION);
453     mdl_requests.push_front(grl_request);
454 
455     bl_request = new (thd->mem_root) MDL_request;
456     MDL_REQUEST_INIT(bl_request, MDL_key::BACKUP_LOCK, "", "",
457                      MDL_INTENTION_EXCLUSIVE, MDL_TRANSACTION);
458     mdl_requests.push_front(bl_request);
459   }
460 
461   /*
462     With no_wait, we acquire the locks one by one. When waiting,
463     we use the lock request list to get either all or none in
464     the same acquisition.
465   */
466   if (no_wait) {
467     if (thd->mdl_context.try_acquire_lock(&mdl_request) ||
468         (grl_request != nullptr &&
469          thd->mdl_context.try_acquire_lock(bl_request)) ||
470         (bl_request != nullptr &&
471          thd->mdl_context.try_acquire_lock(grl_request))) {
472       return true;
473     }
474   } else if (thd->mdl_context.acquire_locks(&mdl_requests, lock_wait_timeout))
475     return true;
476 
477   /*
478     Unlike in other places where we acquire protection against global read
479     lock, the read_only state is not checked here since it is handled by
480     the caller or extra steps are taken to correctly ignore it. Also checking
481     read_only state can be problematic for background threads like drop table
482     thread and purge thread which can be initiated on behalf of statements
483     executed by replication thread where the read_only state does not apply.
484   */
485 
486   if (out_mdl_ticket) *out_mdl_ticket = mdl_request.ticket;
487 
488   return false;
489 }
490 
acquire_shared_table_mdl(THD * thd,const char * schema_name,const char * table_name,bool no_wait,MDL_ticket ** out_mdl_ticket)491 bool acquire_shared_table_mdl(THD *thd, const char *schema_name,
492                               const char *table_name, bool no_wait,
493                               MDL_ticket **out_mdl_ticket) {
494   return acquire_mdl(thd, MDL_key::TABLE, schema_name, table_name, no_wait,
495                      thd->variables.lock_wait_timeout, MDL_SHARED, MDL_EXPLICIT,
496                      out_mdl_ticket);
497 }
498 
has_shared_table_mdl(THD * thd,const char * schema_name,const char * table_name)499 bool has_shared_table_mdl(THD *thd, const char *schema_name,
500                           const char *table_name) {
501   return thd->mdl_context.owns_equal_or_stronger_lock(
502       MDL_key::TABLE, schema_name, table_name, MDL_SHARED);
503 }
504 
has_exclusive_table_mdl(THD * thd,const char * schema_name,const char * table_name)505 bool has_exclusive_table_mdl(THD *thd, const char *schema_name,
506                              const char *table_name) {
507   return thd->mdl_context.owns_equal_or_stronger_lock(
508       MDL_key::TABLE, schema_name, table_name, MDL_EXCLUSIVE);
509 }
510 
acquire_exclusive_tablespace_mdl(THD * thd,const char * tablespace_name,bool no_wait,MDL_ticket ** ticket,bool for_trx)511 bool acquire_exclusive_tablespace_mdl(THD *thd, const char *tablespace_name,
512                                       bool no_wait, MDL_ticket **ticket,
513                                       bool for_trx) {
514   enum_mdl_duration duration = (for_trx ? MDL_TRANSACTION : MDL_EXPLICIT);
515   return acquire_mdl(thd, MDL_key::TABLESPACE, "", tablespace_name, no_wait,
516                      thd->variables.lock_wait_timeout, MDL_EXCLUSIVE, duration,
517                      ticket);
518 }
519 
acquire_shared_tablespace_mdl(THD * thd,const char * tablespace_name,bool no_wait,MDL_ticket ** ticket,bool for_trx)520 bool acquire_shared_tablespace_mdl(THD *thd, const char *tablespace_name,
521                                    bool no_wait, MDL_ticket **ticket,
522                                    bool for_trx) {
523   // When requesting a tablespace name lock, we leave the schema name empty.
524   enum_mdl_duration duration = (for_trx ? MDL_TRANSACTION : MDL_EXPLICIT);
525   return acquire_mdl(thd, MDL_key::TABLESPACE, "", tablespace_name, no_wait,
526                      thd->variables.lock_wait_timeout, MDL_SHARED, duration,
527                      ticket);
528 }
529 
has_shared_tablespace_mdl(THD * thd,const char * tablespace_name)530 bool has_shared_tablespace_mdl(THD *thd, const char *tablespace_name) {
531   // When checking a tablespace name lock, we leave the schema name empty.
532   return thd->mdl_context.owns_equal_or_stronger_lock(
533       MDL_key::TABLESPACE, "", tablespace_name, MDL_SHARED);
534 }
535 
has_exclusive_tablespace_mdl(THD * thd,const char * tablespace_name)536 bool has_exclusive_tablespace_mdl(THD *thd, const char *tablespace_name) {
537   // When checking a tablespace name lock, we leave the schema name empty.
538   return thd->mdl_context.owns_equal_or_stronger_lock(
539       MDL_key::TABLESPACE, "", tablespace_name, MDL_EXCLUSIVE);
540 }
541 
acquire_exclusive_table_mdl(THD * thd,const char * schema_name,const char * table_name,bool no_wait,MDL_ticket ** out_mdl_ticket)542 bool acquire_exclusive_table_mdl(THD *thd, const char *schema_name,
543                                  const char *table_name, bool no_wait,
544                                  MDL_ticket **out_mdl_ticket) {
545   return acquire_mdl(thd, MDL_key::TABLE, schema_name, table_name, no_wait,
546                      thd->variables.lock_wait_timeout, MDL_EXCLUSIVE,
547                      MDL_TRANSACTION, out_mdl_ticket);
548 }
549 
acquire_exclusive_table_mdl(THD * thd,const char * schema_name,const char * table_name,unsigned long int lock_wait_timeout,MDL_ticket ** out_mdl_ticket)550 bool acquire_exclusive_table_mdl(THD *thd, const char *schema_name,
551                                  const char *table_name,
552                                  unsigned long int lock_wait_timeout,
553                                  MDL_ticket **out_mdl_ticket) {
554   return acquire_mdl(thd, MDL_key::TABLE, schema_name, table_name, false,
555                      lock_wait_timeout, MDL_EXCLUSIVE, MDL_TRANSACTION,
556                      out_mdl_ticket);
557 }
558 
acquire_exclusive_schema_mdl(THD * thd,const char * schema_name,bool no_wait,MDL_ticket ** out_mdl_ticket)559 bool acquire_exclusive_schema_mdl(THD *thd, const char *schema_name,
560                                   bool no_wait, MDL_ticket **out_mdl_ticket) {
561   return acquire_mdl(thd, MDL_key::SCHEMA, schema_name, "", no_wait,
562                      thd->variables.lock_wait_timeout, MDL_EXCLUSIVE,
563                      MDL_EXPLICIT, out_mdl_ticket);
564 }
565 
release_mdl(THD * thd,MDL_ticket * mdl_ticket)566 void release_mdl(THD *thd, MDL_ticket *mdl_ticket) {
567   DBUG_TRACE;
568 
569   thd->mdl_context.release_lock(mdl_ticket);
570 }
571 
572 /* purecov: begin deadcode */
get_dd_client(THD * thd)573 cache::Dictionary_client *get_dd_client(THD *thd) { return thd->dd_client(); }
574 /* purecov: end */
575 
/**
  Create a plugin's native table by executing the DDL supplied by the
  Plugin_table, after removing any dictionary metadata already present for
  a table of the same name.

  @param thd  Thread context used for locking, dictionary access and DDL
              execution.
  @param pt   Description of the table: schema name, table name and DDL.

  @return false on success, true on failure. Failure covers: the name
          belongs to a DD table (ER_NO_SYSTEM_TABLE_ACCESS is raised), the
          metadata locks or the existing definition cannot be acquired,
          dropping the old metadata fails, or the DDL fails.
*/
bool create_native_table(THD *thd, const Plugin_table *pt) {
  // Plugins must not create tables that clash with data dictionary tables.
  if (dd::get_dictionary()->is_dd_table_name(pt->get_schema_name(),
                                             pt->get_name())) {
    my_error(ER_NO_SYSTEM_TABLE_ACCESS, MYF(0),
             ER_THD_NONCONST(thd, dd::get_dictionary()->table_type_error_code(
                                      pt->get_schema_name(), pt->get_name())),
             pt->get_schema_name(), pt->get_name());

    return true;
  }

  // Acquire MDL on new native table that we would create.
  bool error = false;
  MDL_request mdl_request;
  MDL_REQUEST_INIT(&mdl_request, MDL_key::TABLE, pt->get_schema_name(),
                   pt->get_name(), MDL_EXCLUSIVE, MDL_TRANSACTION);
  dd::Schema_MDL_locker mdl_locker(thd);
  if (mdl_locker.ensure_locked(pt->get_schema_name()) ||
      thd->mdl_context.acquire_lock(&mdl_request,
                                    thd->variables.lock_wait_timeout))
    return true;

  /*
    1. Mark that we are executing a special DDL during
    plugin initialization. This will enable DDL to not be
    committed or binlogged. The caller of this API would commit
    the transaction.

    2. Remove metadata of native table if already exists. This could
    happen if server was crashed and restarted.

    3. Create native table.

    4. Undo 1.
  */
  dd::cache::Dictionary_client *client = thd->dd_client();
  const dd::Table *table_def = nullptr;
  // table_def stays nullptr when no such table is known to the dictionary.
  if (client->acquire(pt->get_schema_name(), pt->get_name(), &table_def))
    return true;

  // From here on there are no early returns: the fake-DDL mark and the
  // raised master access set below are always undone before returning.
  thd->mark_plugin_fake_ddl(true);
  ulong master_access = thd->security_context()->master_access();
  // Temporarily set every access bit for the duration of the DDL.
  thd->security_context()->set_master_access(~(ulong)0);
  {
    // Scoped guard; presumably suppresses binary logging until the end of
    // this block - confirm against Disable_binlog_guard.
    Disable_binlog_guard guard(thd);

    // Drop the table and related dynamic statistics too.
    if (table_def) {
      error =
          client->drop(table_def) || client->remove_table_dynamic_statistics(
                                         pt->get_schema_name(), pt->get_name());
    }

    // Only run the DDL if removing the stale metadata succeeded.
    if (!error) error = dd::execute_query(thd, pt->get_ddl());
  }

  // Restore the saved access mask and clear the fake-DDL mark.
  thd->security_context()->set_master_access(master_access);
  thd->mark_plugin_fake_ddl(false);

  return error;
}
637 
638 // Remove metadata of native table from DD tables.
drop_native_table(THD * thd,const char * schema_name,const char * table_name)639 bool drop_native_table(THD *thd, const char *schema_name,
640                        const char *table_name) {
641   if (dd::get_dictionary()->is_dd_table_name(schema_name, table_name)) {
642     my_error(ER_NO_SYSTEM_TABLE_ACCESS, MYF(0),
643              ER_THD_NONCONST(thd, dd::get_dictionary()->table_type_error_code(
644                                       schema_name, table_name)),
645              schema_name, table_name);
646 
647     return true;
648   }
649 
650   // Acquire MDL on schema and table.
651   MDL_request mdl_request;
652   MDL_REQUEST_INIT(&mdl_request, MDL_key::TABLE, schema_name, table_name,
653                    MDL_EXCLUSIVE, MDL_TRANSACTION);
654   dd::Schema_MDL_locker mdl_locker(thd);
655   if (mdl_locker.ensure_locked(schema_name) ||
656       thd->mdl_context.acquire_lock(&mdl_request,
657                                     thd->variables.lock_wait_timeout))
658     return true;
659 
660   dd::cache::Dictionary_client *client = thd->dd_client();
661   const dd::Table *table_def = nullptr;
662   if (client->acquire(schema_name, table_name, &table_def)) {
663     // Error is reported by the dictionary subsystem.
664     return true;
665   }
666 
667   // Not error is reported if table is not present.
668   if (!table_def) return false;
669 
670   // Drop the table and related dynamic statistics too.
671   return client->drop(table_def) ||
672          client->remove_table_dynamic_statistics(schema_name, table_name);
673 }
674 
reset_tables_and_tablespaces()675 bool reset_tables_and_tablespaces() {
676   Auto_THD thd;
677   handlerton *ddse = ha_resolve_by_legacy_type(thd.thd, DB_TYPE_INNODB);
678 
679   // Acquire transactional metadata locks and evict all cached objects.
680   if (dd::cache::Shared_dictionary_cache::reset_tables_and_tablespaces(thd.thd))
681     return true;
682 
683   // Evict all cached objects in the DD cache in the DDSE.
684   if (ddse->dict_cache_reset_tables_and_tablespaces != nullptr)
685     ddse->dict_cache_reset_tables_and_tablespaces();
686 
687   bool ret = close_cached_tables(nullptr, nullptr, false, LONG_TIMEOUT);
688 
689   // Release transactional metadata locks.
690   thd.thd->mdl_context.release_transactional_locks();
691 
692   return ret;
693 }
694 
commit_or_rollback_tablespace_change(THD * thd,dd::Tablespace * space,bool error,bool release_mdl_on_commit_only)695 bool commit_or_rollback_tablespace_change(THD *thd, dd::Tablespace *space,
696                                           bool error,
697                                           bool release_mdl_on_commit_only) {
698   dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
699   Disable_autocommit_guard autocommit_guard(thd);
700 
701   if (!error && space != nullptr) {
702     error = thd->dd_client()->update(space);
703   }
704 
705   if (error) {
706     trans_rollback_stmt(thd);
707     trans_rollback(thd);
708   } else {
709     error = trans_commit_stmt(thd) || trans_commit(thd);
710   }
711 
712   if (!error || !release_mdl_on_commit_only) {
713     thd->mdl_context.release_transactional_locks();
714   }
715   return error;
716 }
717 
/**
  Get the DD table definition singleton associated with an entity type.

  @tparam Entity_object_type  Entity type that exposes a nested DD_table
                              type with a static instance() function.

  @return Reference to Entity_object_type::DD_table::instance().
*/
template <typename Entity_object_type>
const Object_table &get_dd_table() {
  return Entity_object_type::DD_table::instance();
}

// Explicit instantiations for the entity types this translation unit
// provides the template for.
template const Object_table &get_dd_table<dd::Column>();
template const Object_table &get_dd_table<dd::Index>();
template const Object_table &get_dd_table<dd::Partition>();
template const Object_table &get_dd_table<dd::Table>();
template const Object_table &get_dd_table<dd::Tablespace>();
728 
rename_tablespace_mdl_hook(THD * thd,MDL_ticket * src,MDL_ticket * dst)729 void rename_tablespace_mdl_hook(THD *thd, MDL_ticket *src, MDL_ticket *dst) {
730   if (!thd->locked_tables_mode) {
731     return;
732   }
733   thd->locked_tables_list.add_rename_tablespace_mdls(src, dst);
734 }
735 
736 }  // namespace dd
737