1 /* Copyright (c) 2014, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/dd/impl/bootstrap/bootstrapper.h"
24 
25 #include <stddef.h>
26 #include <sys/types.h>
27 #include <memory>
28 #include <new>
29 #include <set>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 #include "m_ctype.h"
35 #include "my_dbug.h"
36 #include "my_loglevel.h"
37 #include "my_sys.h"
38 #include "mysql/components/services/log_builtins.h"
39 #include "mysql_version.h"  // MYSQL_VERSION_ID
40 #include "mysqld_error.h"
41 #include "sql/auth/sql_security_ctx.h"
42 #include "sql/dd/cache/dictionary_client.h"  // dd::cache::Dictionary_client
43 #include "sql/dd/dd.h"                       // dd::create_object
44 #include "sql/dd/impl/bootstrap/bootstrap_ctx.h"        // DD_bootstrap_ctx
45 #include "sql/dd/impl/cache/shared_dictionary_cache.h"  // Shared_dictionary_cache
46 #include "sql/dd/impl/cache/storage_adapter.h"          // Storage_adapter
47 #include "sql/dd/impl/dictionary_impl.h"                // dd::Dictionary_impl
48 #include "sql/dd/impl/raw/object_keys.h"
49 #include "sql/dd/impl/sdi.h"                    // dd::sdi::store
50 #include "sql/dd/impl/tables/character_sets.h"  // dd::tables::Character_sets
51 #include "sql/dd/impl/tables/collations.h"      // dd::tables::Collations
52 #include "sql/dd/impl/tables/dd_properties.h"   // dd::tables::DD_properties
53 #include "sql/dd/impl/tables/tables.h"          // dd::tables::Tables
54 #include "sql/dd/impl/types/schema_impl.h"      // dd::Schema_impl
55 #include "sql/dd/impl/types/table_impl.h"       // dd::Table_impl
56 #include "sql/dd/impl/types/tablespace_impl.h"  // dd::Table_impl
57 #include "sql/dd/impl/upgrade/dd.h"             // dd::upgrade::upgrade_tables
58 #include "sql/dd/impl/upgrade/server.h"  // dd::upgrade::do_server_upgrade_checks
59 #include "sql/dd/impl/utils.h"           // dd::execute_query
60 #include "sql/dd/object_id.h"
61 #include "sql/dd/types/abstract_table.h"
62 #include "sql/dd/types/object_table.h"             // dd::Object_table
63 #include "sql/dd/types/object_table_definition.h"  // dd::Object_table_definition
64 #include "sql/dd/types/table.h"
65 #include "sql/dd/types/tablespace.h"
66 #include "sql/dd/types/tablespace_file.h"  // dd::Tablespace_file
67 #include "sql/dd/upgrade/server.h"         // UPGRADE_NONE
68 #include "sql/dd/upgrade_57/upgrade.h"     // upgrade_57::Upgrade_status
69 #include "sql/handler.h"                   // dict_init_mode_t
70 #include "sql/mdl.h"
71 #include "sql/mysqld.h"
72 #include "sql/sd_notify.h"  // sysd::notify
73 #include "sql/thd_raii.h"
74 
75 using namespace dd;
76 
77 ///////////////////////////////////////////////////////////////////////////
78 
79 namespace {
80 
81 // Initialize recovery in the DDSE.
DDSE_dict_recover(THD * thd,dict_recovery_mode_t dict_recovery_mode,uint version)82 bool DDSE_dict_recover(THD *thd, dict_recovery_mode_t dict_recovery_mode,
83                        uint version) {
84   handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_INNODB);
85   if (ddse->dict_recover == nullptr) return true;
86 
87   bool error = ddse->dict_recover(dict_recovery_mode, version);
88 
89   /*
90     Commit when tablespaces have been initialized, since in that
91     case, tablespace meta data is added.
92   */
93   if (dict_recovery_mode == DICT_RECOVERY_INITIALIZE_TABLESPACES)
94     return dd::end_transaction(thd, error);
95 
96   return error;
97 }
98 
99 /*
100   Update the System_tables registry with meta data from 'dd_properties'.
101   Iterate over the tables in the DD_properties. If this is minor downgrade,
102   add new tables that were added in the newer version to the System_tables
103   registry. If this is not minor downgrade, assert that all tables in the
104   DD_properties indeed have a corresponding entry in the System_tables
105   registry.
106 */
update_system_tables(THD * thd)107 bool update_system_tables(THD *thd) {
108   std::unique_ptr<dd::Properties> system_tables_props;
109   bool exists = false;
110 
111   if (dd::tables::DD_properties::instance().get(
112           thd, "SYSTEM_TABLES", &system_tables_props, &exists) ||
113       !exists) {
114     my_error(ER_DD_INIT_FAILED, MYF(0));
115     return true;
116   }
117   /*
118     We would normally use a range based loop here, but the developerstudio
119     compiler on Solaris does not handle this when the base collection has
120     pure virtual begin() and end() functions.
121   */
122   for (Properties::const_iterator it = system_tables_props->begin();
123        it != system_tables_props->end(); ++it) {
124     // Check if this is a CORE, INERT, SECOND or DDSE table.
125     if (!dd::get_dictionary()->is_dd_table_name(MYSQL_SCHEMA_NAME.str,
126                                                 it->first)) {
127       if (bootstrap::DD_bootstrap_ctx::instance().is_minor_downgrade()) {
128         /*
129           Add tables as type CORE regardless of the actual type, which
130           is irrelevant in this case.
131         */
132         System_tables::instance()->add(MYSQL_SCHEMA_NAME.str, it->first,
133                                        System_tables::Types::CORE, nullptr);
134       } else {
135         my_error(ER_DD_METADATA_NOT_FOUND, MYF(0), it->first.c_str());
136         return true;
137       }
138     } else {
139       /*
140         The table is a known DD table. Then, we get its definition
141         and add it to the Object_table instance. The definition might
142         not exist if the table was added after the version that we
143         are upgrading from.
144       */
145       String_type tbl_prop_str;
146       if (!system_tables_props->exists(it->first) ||
147           system_tables_props->get(it->first, &tbl_prop_str))
148         continue;
149 
150       const Object_table *table_def = System_tables::instance()->find_table(
151           MYSQL_SCHEMA_NAME.str, it->first);
152       DBUG_ASSERT(table_def);
153 
154       std::unique_ptr<dd::Properties> tbl_props(
155           Properties::parse_properties(tbl_prop_str));
156 
157       String_type def;
158       if (tbl_props->get(dd::tables::DD_properties::dd_key(
159                              dd::tables::DD_properties::DD_property::DEF),
160                          &def)) {
161         my_error(ER_DD_METADATA_NOT_FOUND, MYF(0), it->first.c_str());
162         return true;
163       }
164       std::unique_ptr<Properties> table_def_properties(
165           Properties::parse_properties(def));
166       table_def->set_actual_table_definition(*table_def_properties);
167     }
168   }
169 
170   return false;
171 }
172 
173 // Create a DD table using the target table definition.
create_target_table(THD * thd,const Object_table * object_table)174 bool create_target_table(THD *thd, const Object_table *object_table) {
175   DBUG_ASSERT(object_table != nullptr);
176 
177   /*
178    The target table definition may not be present if the table
179    is abandoned. That's ok, not an error.
180   */
181   if (object_table->is_abandoned()) return false;
182 
183   String_type target_ddl_statement("");
184   const Object_table_definition *target_table_def =
185       object_table->target_table_definition();
186 
187   DBUG_ASSERT(target_table_def != nullptr);
188   target_ddl_statement = target_table_def->get_ddl();
189   DBUG_ASSERT(!target_ddl_statement.empty());
190 
191   return dd::execute_query(thd, target_ddl_statement);
192 }
193 
194 // Create a DD table using the actual table definition.
195 /* purecov: begin inspected */
create_actual_table(THD * thd,const Object_table * object_table)196 bool create_actual_table(THD *thd, const Object_table *object_table) {
197   /*
198     For minor downgrade, tables might have been added in the upgraded
199     server that we do not have any Object_table instance for. In that
200     case, we just skip them.
201   */
202   if (object_table == nullptr) {
203     DBUG_ASSERT(bootstrap::DD_bootstrap_ctx::instance().is_minor_downgrade());
204     return false;
205   }
206 
207   String_type actual_ddl_statement("");
208   const Object_table_definition *actual_table_def =
209       object_table->actual_table_definition();
210 
211   /*
212     The actual definition may not be present. This will happen during
213     upgrade if the new DD version adds a new DD table which was not
214     present in the DD we are upgrading from. This is OK, not an error.
215   */
216   if (actual_table_def == nullptr) return false;
217 
218   actual_ddl_statement = actual_table_def->get_ddl();
219   DBUG_ASSERT(!actual_ddl_statement.empty());
220 
221   return dd::execute_query(thd, actual_ddl_statement);
222 }
223 /* purecov: end */
224 
225 /*
226   Acquire exclusive meta data locks for the DD schema, tablespace and
227   table names.
228 */
acquire_exclusive_mdl(THD * thd)229 bool acquire_exclusive_mdl(THD *thd) {
230   // All MDL requests.
231   MDL_request_list mdl_requests;
232 
233   // Prepare MDL request for the schema name.
234   MDL_request schema_request;
235   MDL_REQUEST_INIT(&schema_request, MDL_key::SCHEMA, MYSQL_SCHEMA_NAME.str, "",
236                    MDL_EXCLUSIVE, MDL_TRANSACTION);
237   mdl_requests.push_front(&schema_request);
238 
239   // Prepare MDL request for the tablespace name.
240   MDL_request tablespace_request;
241   MDL_REQUEST_INIT(&tablespace_request, MDL_key::TABLESPACE, "",
242                    MYSQL_TABLESPACE_NAME.str, MDL_EXCLUSIVE, MDL_TRANSACTION);
243   mdl_requests.push_front(&tablespace_request);
244 
245   // Prepare MDL requests for all tables names.
246   for (System_tables::Const_iterator it = System_tables::instance()->begin();
247        it != System_tables::instance()->end(); ++it) {
248     // Skip extraneous tables during minor downgrade.
249     if ((*it)->entity() == nullptr) continue;
250 
251     MDL_request *table_request = new (thd->mem_root) MDL_request;
252     if (table_request == nullptr) return true;
253     MDL_REQUEST_INIT(table_request, MDL_key::TABLE, MYSQL_SCHEMA_NAME.str,
254                      (*it)->entity()->name().c_str(), MDL_EXCLUSIVE,
255                      MDL_TRANSACTION);
256     mdl_requests.push_front(table_request);
257   }
258 
259   // Finally, acquire all the MDL locks.
260   return (thd->mdl_context.acquire_locks(&mdl_requests,
261                                          thd->variables.lock_wait_timeout));
262 }
263 
264 /*
265   Acquire the DD schema, tablespace and table objects. Clone the objects,
266   reset ID, store persistently, and update the storage adapter.
267 */
flush_meta_data(THD * thd)268 bool flush_meta_data(THD *thd) {
269   // Acquire exclusive meta data locks for the relevant DD objects.
270   if (acquire_exclusive_mdl(thd)) return true;
271 
272   {
273     /*
274       Use a scoped auto releaser to make sure the cached objects are released
275       before the shared cache is reset.
276     */
277     dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
278 
279     /*
280       First, we acquire the DD schema and tablespace and keep them in
281       local variables. We also clone them, the clones will be used for
282       updating the ids. We also acquire all the DD table objects to make
283       sure the shared cache is populated, and we keep the original objects
284       as well as clones in a vector. The auto releaser will make sure the
285       objects are not evicted. This must be ensured since we need to make
286       sure the ids stay consistent across all objects in the shared cache.
287     */
288     const Schema *dd_schema = nullptr;
289     const Tablespace *dd_tspace = nullptr;
290     std::vector<const Table_impl *> dd_tables;  // Owned by the shared cache.
291     std::vector<std::unique_ptr<Table_impl>> dd_table_clones;
292 
293     if (thd->dd_client()->acquire(dd::String_type(MYSQL_SCHEMA_NAME.str),
294                                   &dd_schema) ||
295         thd->dd_client()->acquire(dd::String_type(MYSQL_TABLESPACE_NAME.str),
296                                   &dd_tspace))
297       return dd::end_transaction(thd, true);
298 
299     std::unique_ptr<Schema_impl> dd_schema_clone(
300         dynamic_cast<Schema_impl *>(dd_schema->clone()));
301 
302     std::unique_ptr<Tablespace_impl> dd_tspace_clone(
303         dynamic_cast<Tablespace_impl *>(dd_tspace->clone()));
304 
305     for (System_tables::Const_iterator it = System_tables::instance()->begin();
306          it != System_tables::instance()->end(); ++it) {
307       /*
308         We add nullptr to the dd_tables vector for abandoned
309         tables and system tables to have the same number of objects
310         in the System_tables list, the dd_tables vector and the
311         dd_table_clones vector.
312       */
313       const dd::Table *dd_table = nullptr;
314       if ((*it)->property() != System_tables::Types::SYSTEM &&
315           thd->dd_client()->acquire(MYSQL_SCHEMA_NAME.str,
316                                     (*it)->entity()->name(), &dd_table))
317         return dd::end_transaction(thd, true);
318 
319       dd_tables.push_back(dynamic_cast<const Table_impl *>(dd_table));
320 
321       /*
322         If this is an abandoned table, we can't clone it. Thus, we
323         push back a nullptr to make sure we have the same number of
324         elements in the dd_table_clones as in the System_tables.
325       */
326       std::unique_ptr<Table_impl> dd_table_clone;
327       if (dd_table != nullptr) {
328         dd_table_clones.push_back(std::unique_ptr<Table_impl>(
329             dynamic_cast<Table_impl *>(dd_table->clone())));
330       } else {
331         dd_table_clones.push_back(nullptr);
332       }
333     }
334 
335     /*
336       We have now populated the shared cache with the core objects, and kept
337       clones of all DD objects. The scoped auto releaser makes sure we will
338       not evict the objects from the shared cache until the auto releaser
339       exits scope. Thus, within the scope of the auto releaser, we can modify
340       the contents of the core registry in the storage adapter without risking
341       that this will interfere with the contents of the shared cache, because
342       the DD transactions will acquire the core objects from the shared cache.
343     */
344 
345     /*
346       First, we modify and store the DD schema without changing the cached
347       copy. We cannot use acquire_for_modification() here, because that
348       would make the DD sub-transactions (e.g. when calling store()) see a
349       partially modified set of core objects, where e.g. the mysql
350       schema object has got its new, real id (from the auto-inc column
351       in the dd.schemata table), whereas the core DD table objects still
352       refer to the id that was allocated when creating the scaffolding.
353 
354       So we first store all the objects persistently, and make sure that
355       the on-disk data will have correct and consistent ids. When all objects
356       are stored, we update the contents of the core registry in the
357       storage adapter to reflect the persisted data. Finally, the shared
358       cache is reset so that on next acquisition, the DD objects will be
359       fetched from the core registry in the storage adapter.
360     */
361 
362     /*
363       We must set the ID to INVALID to make the object get a fresh ID from
364       the auto inc ID column.
365     */
366     dd_schema_clone->set_id(INVALID_OBJECT_ID);
367     dd_tspace_clone->set_id(INVALID_OBJECT_ID);
368     if (dd::cache::Storage_adapter::instance()->store(
369             thd, static_cast<Schema *>(dd_schema_clone.get())) ||
370         dd::cache::Storage_adapter::instance()->store(
371             thd, static_cast<Tablespace *>(dd_tspace_clone.get())))
372       return dd::end_transaction(thd, true);
373 
374     /*
375       Now, the DD schema and DD tablespace are stored persistently. We will
376       not update the core registry until after we have stored all DD tables.
377       At that point, we can update all the core registry objects in one go
378       and avoid using a partially update core registry for e.g. object
379       acquisition.
380     */
381     std::vector<std::unique_ptr<Table_impl>>::iterator clone_it =
382         dd_table_clones.begin();
383     for (System_tables::Const_iterator it = System_tables::instance()->begin();
384          it != System_tables::instance()->end() &&
385          clone_it != dd_table_clones.end();
386          ++it, ++clone_it) {
387       // Skip abandoned tables and system tables.
388       if ((*clone_it) == nullptr ||
389           (*it)->property() == System_tables::Types::SYSTEM)
390         continue;
391 
392       DBUG_ASSERT((*it)->entity()->name() == (*clone_it)->name());
393 
394       // We must set the ID to INVALID to let the object get an auto inc ID.
395       (*clone_it)->set_id(INVALID_OBJECT_ID);
396 
397       /*
398         Change the schema and tablespace id to match the ids of the
399         persisted objects. Note that this means the persisted DD table
400         objects will have consistent IDs, but the IDs in the objects in
401         the core registry will not be updated yet.
402       */
403       (*clone_it)->set_schema_id(dd_schema_clone->id());
404       (*clone_it)->set_tablespace_id(dd_tspace_clone->id());
405       if (dd::cache::Storage_adapter::instance()->store(
406               thd, static_cast<Table *>((*clone_it).get())))
407         return dd::end_transaction(thd, true);
408     }
409 
410     /*
411       Update and store the predefined tablespace objects. The DD tablespace
412       has already been stored above, so we iterate only over the tablespaces
413       of type PREDEFINED_DDSE.
414     */
415     for (System_tablespaces::Const_iterator it =
416              System_tablespaces::instance()->begin(
417                  System_tablespaces::Types::PREDEFINED_DDSE);
418          it != System_tablespaces::instance()->end();
419          it = System_tablespaces::instance()->next(
420              it, System_tablespaces::Types::PREDEFINED_DDSE)) {
421       const dd::Tablespace *tspace = nullptr;
422       if (thd->dd_client()->acquire((*it)->key().second, &tspace))
423         return dd::end_transaction(thd, true);
424 
425       std::unique_ptr<Tablespace_impl> tspace_clone(
426           dynamic_cast<Tablespace_impl *>(tspace->clone()));
427 
428       // We must set the ID to INVALID to enable storing the object.
429       tspace_clone->set_id(INVALID_OBJECT_ID);
430       if (dd::cache::Storage_adapter::instance()->store(
431               thd, static_cast<Tablespace *>(tspace_clone.get())))
432         return dd::end_transaction(thd, true);
433 
434       /*
435         Only the DD tablespace is needed to handle cache misses, so we can
436         just drop the predefined tablespaces from the core registry now that
437         it has been persisted.
438       */
439       dd::cache::Storage_adapter::instance()->core_drop(thd, tspace);
440     }
441 
442     /*
443       Now, the DD schema and tablespace as well as the DD tables have been
444       persisted. The last thing we do before resetting the shared cache is
445       to update the contents of the core registry to match the persisted
446       objects. First, we update the core registry with the persisted DD
447       schema and tablespace.
448      */
449     dd::cache::Storage_adapter::instance()->core_drop(thd, dd_schema);
450     dd::cache::Storage_adapter::instance()->core_store(
451         thd, static_cast<Schema *>(dd_schema_clone.get()));
452 
453     dd::cache::Storage_adapter::instance()->core_drop(thd, dd_tspace);
454     dd::cache::Storage_adapter::instance()->core_store(
455         thd, static_cast<Tablespace *>(dd_tspace_clone.get()));
456 
457     // Make sure the IDs after storing are as expected.
458     DBUG_ASSERT(dd_schema_clone->id() == 1);
459     DBUG_ASSERT(dd_tspace_clone->id() == 1);
460 
461     /*
462       Finally, we update the core registry of the DD tables. This must be
463       done in two loops to avoid issues related to overlapping ID sequences.
464     */
465     std::vector<const Table_impl *>::const_iterator table_it =
466         dd_tables.begin();
467     for (System_tables::Const_iterator it = System_tables::instance()->begin();
468          it != System_tables::instance()->end() && table_it != dd_tables.end();
469          ++it, ++table_it) {
470       // Skip abandoned tables and system tables.
471       if ((*table_it) == nullptr ||
472           (*it)->property() == System_tables::Types::SYSTEM)
473         continue;
474 
475       DBUG_ASSERT((*it)->entity()->name() == (*table_it)->name());
476       dd::cache::Storage_adapter::instance()->core_drop(
477           thd, static_cast<const Table *>(*table_it));
478     }
479 
480     clone_it = dd_table_clones.begin();
481     for (System_tables::Const_iterator it = System_tables::instance()->begin();
482          it != System_tables::instance()->end() &&
483          clone_it != dd_table_clones.end();
484          ++it, ++clone_it) {
485       // Skip abandoned tables and system tables.
486       if ((*clone_it) == nullptr ||
487           (*it)->property() == System_tables::Types::SYSTEM)
488         continue;
489 
490       if ((*it)->property() == System_tables::Types::CORE) {
491         DBUG_ASSERT((*it)->entity()->name() == (*clone_it)->name());
492         dd::cache::Storage_adapter::instance()->core_store(
493             thd, static_cast<Table *>((*clone_it).get()));
494       }
495     }
496   }
497 
498   /*
499     Now, the auto releaser has released the objects, and we can go ahead and
500     reset the shared cache.
501   */
502   dd::cache::Shared_dictionary_cache::instance()->reset(true);
503 
504   if (dd::end_transaction(thd, false)) {
505     return true;
506   }
507 
508   /*
509     Use a scoped auto releaser to make sure the objects cached for SDI
510     writing, FK parent information reload, and DD property storage are
511     released.
512   */
513   dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
514 
515   // Acquire the DD tablespace and write SDI
516   const Tablespace *dd_tspace = nullptr;
517   if (thd->dd_client()->acquire(dd::String_type(MYSQL_TABLESPACE_NAME.str),
518                                 &dd_tspace) ||
519       dd::sdi::store(thd, dd_tspace)) {
520     return dd::end_transaction(thd, true);
521   }
522 
523   // Acquire the DD schema and write SDI
524   const Schema *dd_schema = nullptr;
525   if (thd->dd_client()->acquire(dd::String_type(MYSQL_SCHEMA_NAME.str),
526                                 &dd_schema) ||
527       dd::sdi::store(thd, dd_schema)) {
528     return dd::end_transaction(thd, true);
529   }
530 
531   /*
532     Acquire the DD table objects and write SDI for them. Also sync from
533     the DD tables in order to get the FK parent information reloaded.
534   */
535   for (System_tables::Const_iterator it = System_tables::instance()->begin();
536        it != System_tables::instance()->end(); ++it) {
537     // Skip system tables.
538     if ((*it)->property() == System_tables::Types::SYSTEM) continue;
539 
540     const dd::Table *dd_table = nullptr;
541     if (thd->dd_client()->acquire(MYSQL_SCHEMA_NAME.str,
542                                   (*it)->entity()->name(), &dd_table)) {
543       return dd::end_transaction(thd, true);
544     }
545 
546     // Skip abandoned tables.
547     if (dd_table == nullptr) continue;
548 
549     /*
550       Make sure the registry of the core DD objects is updated with an
551       object read from the DD tables, with updated FK parent information.
552       Store the object to make sure SDI is written.
553     */
554     Abstract_table::Name_key table_key;
555     Abstract_table::update_name_key(&table_key, dd_schema->id(),
556                                     dd_table->name());
557     const dd::Abstract_table *persisted_dd_table = nullptr;
558     if (dd::cache::Storage_adapter::instance()->get(
559             thd, table_key, ISO_READ_COMMITTED, true, &persisted_dd_table) ||
560         persisted_dd_table == nullptr ||
561         dd::sdi::store(thd, dynamic_cast<const Table *>(persisted_dd_table))) {
562       if (persisted_dd_table != nullptr) delete persisted_dd_table;
563       return dd::end_transaction(thd, true);
564     }
565 
566     if ((*it)->property() == System_tables::Types::CORE) {
567       dd::cache::Storage_adapter::instance()->core_drop(thd, dd_table);
568       dd::cache::Storage_adapter::instance()->core_store(
569           thd, dynamic_cast<Table *>(
570                    const_cast<Abstract_table *>(persisted_dd_table)));
571     }
572 
573     if (persisted_dd_table != nullptr) delete persisted_dd_table;
574   }
575 
576   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::SYNCED);
577 
578   return dd::end_transaction(thd, false);
579 }
580 
581 // Insert additional data into the DD tables.
populate_tables(THD * thd)582 bool populate_tables(THD *thd) {
583   // Iterate over DD tables, populate tables.
584   for (System_tables::Const_iterator it = System_tables::instance()->begin();
585        it != System_tables::instance()->end(); ++it) {
586     // Skip system tables.
587     if ((*it)->property() == System_tables::Types::SYSTEM) continue;
588 
589     // Retrieve list of SQL statements to execute.
590     const Object_table_definition *table_def =
591         (*it)->entity()->target_table_definition();
592 
593     // Skip abandoned tables.
594     if (table_def == nullptr) continue;
595 
596     bool error = false;
597     std::vector<dd::String_type> stmt = table_def->get_dml();
598     for (std::vector<dd::String_type>::iterator stmt_it = stmt.begin();
599          stmt_it != stmt.end() && !error; ++stmt_it)
600       error = dd::execute_query(thd, *stmt_it);
601 
602     // Commit the statement based population.
603     if (dd::end_transaction(thd, error)) return true;
604 
605     // If no error, call the low level table population method, and commit it.
606     error = (*it)->entity()->populate(thd);
607     if (dd::end_transaction(thd, error)) return true;
608   }
609 
610   bootstrap::DD_bootstrap_ctx::instance().set_stage(
611       bootstrap::Stage::POPULATED);
612 
613   return false;
614 }
615 
616 // Re-populate character sets and collations upon normal restart.
repopulate_charsets_and_collations(THD * thd)617 bool repopulate_charsets_and_collations(THD *thd) {
618   /*
619     We must check if the DDSE is started in a way that makes the DD
620     read only. For now, we only support InnoDB as SE for the DD. The call
621     to retrieve the handlerton for the DDSE should be replaced by a more
622     generic mechanism.
623   */
624   handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_INNODB);
625   if (ddse->is_dict_readonly && ddse->is_dict_readonly()) {
626     LogErr(WARNING_LEVEL, ER_DD_NO_WRITES_NO_REPOPULATION, "InnoDB", " ");
627     return false;
628   }
629 
630   /*
631     Otherwise, turn off FK checks, re-populate and commit.
632     The FK checks must be turned off since the collations and
633     character sets reference each other.
634   */
635   bool error = dd::execute_query(thd, "SET FOREIGN_KEY_CHECKS= 0") ||
636                tables::Collations::instance().populate(thd) ||
637                tables::Character_sets::instance().populate(thd);
638 
639   /*
640     We must commit the re-population before executing a new query, which
641     expects the transaction to be empty, and finally, turn FK checks back on.
642   */
643   error |= dd::end_transaction(thd, error);
644   error |= dd::execute_query(thd, "SET FOREIGN_KEY_CHECKS= 1");
645   bootstrap::DD_bootstrap_ctx::instance().set_stage(
646       bootstrap::Stage::POPULATED);
647 
648   return error;
649 }
650 
651 /*
652   Verify that the storage adapter contains the core DD objects and
653   nothing else.
654 */
verify_contents(THD * thd)655 bool verify_contents(THD *thd) {
656   // Verify that the DD schema is present, and that its id == 1.
657   Schema::Name_key schema_key;
658   Schema::update_name_key(&schema_key, MYSQL_SCHEMA_NAME.str);
659   Object_id dd_schema_id =
660       cache::Storage_adapter::instance()->core_get_id<Schema>(schema_key);
661 
662   DBUG_ASSERT(dd_schema_id == MYSQL_SCHEMA_DD_ID);
663   if (dd_schema_id == INVALID_OBJECT_ID) {
664     LogErr(ERROR_LEVEL, ER_DD_SCHEMA_NOT_FOUND, MYSQL_SCHEMA_NAME.str);
665     return dd::end_transaction(thd, true);
666   }
667   DBUG_ASSERT(cache::Storage_adapter::instance()->core_size<Schema>() == 1);
668 
669   // Verify that the core DD tables are present.
670 #ifndef DBUG_OFF
671   size_t n_core_tables = 0;
672 #endif
673   for (System_tables::Const_iterator it =
674            System_tables::instance()->begin(System_tables::Types::CORE);
675        it != System_tables::instance()->end();
676        it = System_tables::instance()->next(it, System_tables::Types::CORE)) {
677     // Skip extraneous tables for minor downgrade.
678     if ((*it)->entity() == nullptr) continue;
679 
680 #ifndef DBUG_OFF
681     n_core_tables++;
682 #endif
683 
684     Table::Name_key table_key;
685     Table::update_name_key(&table_key, dd_schema_id, (*it)->entity()->name());
686     Object_id dd_table_id =
687         cache::Storage_adapter::instance()->core_get_id<Table>(table_key);
688 
689     DBUG_ASSERT(dd_table_id != INVALID_OBJECT_ID);
690     if (dd_table_id == INVALID_OBJECT_ID) {
691       LogErr(ERROR_LEVEL, ER_DD_TABLE_NOT_FOUND,
692              (*it)->entity()->name().c_str());
693       return dd::end_transaction(thd, true);
694     }
695   }
696   DBUG_ASSERT(cache::Storage_adapter::instance()->core_size<Abstract_table>() ==
697               n_core_tables);
698 
699   // Verify that the dictionary tablespace is present and that its id == 1.
700   Tablespace::Name_key tspace_key;
701   Tablespace::update_name_key(&tspace_key, MYSQL_TABLESPACE_NAME.str);
702   Object_id dd_tspace_id =
703       cache::Storage_adapter::instance()->core_get_id<Tablespace>(tspace_key);
704 
705   DBUG_ASSERT(dd_tspace_id == MYSQL_TABLESPACE_DD_ID);
706   if (dd_tspace_id == INVALID_OBJECT_ID) {
707     LogErr(ERROR_LEVEL, ER_DD_TABLESPACE_NOT_FOUND, MYSQL_TABLESPACE_NAME.str);
708     return dd::end_transaction(thd, true);
709   }
710   DBUG_ASSERT(cache::Storage_adapter::instance()->core_size<Tablespace>() == 1);
711 
712   return dd::end_transaction(thd, false);
713 }
714 
715 }  // namespace
716 
717 ///////////////////////////////////////////////////////////////////////////
718 
719 namespace dd {
720 namespace bootstrap {
721 
722 /*
723   Do the necessary DD-related initialization in the DDSE, and get the
724   predefined tables and tablespaces.
725 */
DDSE_dict_init(THD * thd,dict_init_mode_t dict_init_mode,uint version)726 bool DDSE_dict_init(THD *thd, dict_init_mode_t dict_init_mode, uint version) {
727   handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_INNODB);
728 
729   /*
730     The lists with element wrappers are mem root allocated. The wrapped
731     instances are allocated dynamically in the DDSE. These instances will be
732     owned by the System_tables registry by the end of this function.
733   */
734   List<const Object_table> ddse_tables;
735   List<const Plugin_tablespace> ddse_tablespaces;
736   if (ddse->ddse_dict_init == nullptr ||
737       ddse->ddse_dict_init(dict_init_mode, version, &ddse_tables,
738                            &ddse_tablespaces))
739     return true;
740 
741   /*
742     Iterate over the table definitions and add them to the System_tables
743     registry. The Object_table instances will later be used to execute
744     CREATE TABLE statements to actually create the tables.
745 
746     If Object_table::is_hidden(), then we add the tables as type DDSE_PRIVATE
747     (not available neither for DDL nor DML), otherwise, we add them as type
748     DDSE_PROTECTED (available for DML, not for DDL).
749   */
750   List_iterator<const Object_table> table_it(ddse_tables);
751   const Object_table *ddse_table = nullptr;
752   while ((ddse_table = table_it++)) {
753     System_tables::Types table_type = System_tables::Types::DDSE_PROTECTED;
754     if (ddse_table->is_hidden()) {
755       table_type = System_tables::Types::DDSE_PRIVATE;
756     }
757     System_tables::instance()->add(MYSQL_SCHEMA_NAME.str, ddse_table->name(),
758                                    table_type, ddse_table);
759   }
760 
761   /*
762     Get the server version number from the DD tablespace header and verify
763     that we are allowed to upgrade from that version. The error handling is done
764     after adding the ddse tables into the system registry to avoid memory leaks.
765   */
766   if (!opt_initialize) {
767     uint server_version = 0;
768     if (ddse->dict_get_server_version == nullptr ||
769         ddse->dict_get_server_version(&server_version)) {
770       LogErr(ERROR_LEVEL, ER_CANNOT_GET_SERVER_VERSION_FROM_TABLESPACE_HEADER);
771       return true;
772     }
773 
774     if (server_version != MYSQL_VERSION_ID) {
775       if (opt_upgrade_mode == UPGRADE_NONE) {
776         LogErr(ERROR_LEVEL, ER_SERVER_UPGRADE_OFF);
777         return true;
778       }
779       if (!DD_bootstrap_ctx::instance().supported_server_version(
780               server_version)) {
781         LogErr(ERROR_LEVEL, ER_SERVER_UPGRADE_VERSION_NOT_SUPPORTED,
782                server_version);
783         return true;
784       }
785     }
786   }
787 
788   /*
789     At this point, the Systen_tables registry contains the INERT DD tables,
790     and the DDSE tables. Before we continue, we must add the remaining
791     DD tables.
792   */
793   System_tables::instance()->add_remaining_dd_tables();
794 
795   /*
796     Iterate over the tablespace definitions, add the names and the
797     tablespace meta data to the System_tablespaces registry. The
798     meta data will be used later to create dd::Tablespace objects.
799     The Plugin_tablespace instances are owned by the DDSE.
800   */
801   List_iterator<const Plugin_tablespace> tablespace_it(ddse_tablespaces);
802   const Plugin_tablespace *tablespace = nullptr;
803   while ((tablespace = tablespace_it++)) {
804     // Add the name and the object instance to the registry with the
805     // appropriate property.
806     if (my_strcasecmp(system_charset_info, MYSQL_TABLESPACE_NAME.str,
807                       tablespace->get_name()) == 0)
808       System_tablespaces::instance()->add(
809           tablespace->get_name(), System_tablespaces::Types::DD, tablespace);
810     else
811       System_tablespaces::instance()->add(
812           tablespace->get_name(), System_tablespaces::Types::PREDEFINED_DDSE,
813           tablespace);
814   }
815 
816   return false;
817 }
818 
819 // Initialize the data dictionary.
initialize_dictionary(THD * thd,bool is_dd_upgrade_57,Dictionary_impl * d)820 bool initialize_dictionary(THD *thd, bool is_dd_upgrade_57,
821                            Dictionary_impl *d) {
822   if (is_dd_upgrade_57)
823     bootstrap::DD_bootstrap_ctx::instance().set_stage(
824         bootstrap::Stage::STARTED);
825 
826   store_predefined_tablespace_metadata(thd);
827   if (create_dd_schema(thd) || initialize_dd_properties(thd) ||
828       create_tables(thd, nullptr))
829     return true;
830 
831   if (is_dd_upgrade_57) {
832     // Add status to mark creation of dictionary in InnoDB.
833     // Till this step, no new undo log is created by InnoDB.
834     if (upgrade_57::Upgrade_status().update(
835             upgrade_57::Upgrade_status::enum_stage::DICT_TABLES_CREATED))
836       return true;
837   }
838 
839   DBUG_EXECUTE_IF(
840       "dd_upgrade_stage_2", if (is_dd_upgrade_57) {
841         /*
842           Server will crash will upgrading 5.7 data directory.
843           This will leave server is an inconsistent state.
844           File tracking upgrade will have Stage 2 written in it.
845           Next restart of server on same data directory should
846           revert all changes done by upgrade and data directory
847           should be reusable by 5.7 server.
848         */
849         DBUG_SUICIDE();
850       });
851 
852   if (DDSE_dict_recover(thd, DICT_RECOVERY_INITIALIZE_SERVER,
853                         d->get_target_dd_version()) ||
854       flush_meta_data(thd) ||
855       DDSE_dict_recover(thd, DICT_RECOVERY_INITIALIZE_TABLESPACES,
856                         d->get_target_dd_version()) ||
857       populate_tables(thd) ||
858       update_properties(thd, nullptr, nullptr,
859                         String_type(MYSQL_SCHEMA_NAME.str)) ||
860       verify_contents(thd) | update_versions(thd, is_dd_upgrade_57))
861     return true;
862 
863   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::FINISHED);
864 
865   return false;
866 }
867 
868 // First time server start and initialization of the data dictionary.
initialize(THD * thd)869 bool initialize(THD *thd) {
870   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::STARTED);
871 
872   /*
873     Set tx_read_only to false to allow installing DD tables even
874     if the server is started with --transaction-read-only=true.
875   */
876   thd->variables.transaction_read_only = false;
877   thd->tx_read_only = false;
878 
879   Disable_autocommit_guard autocommit_guard(thd);
880 
881   Dictionary_impl *d = dd::Dictionary_impl::instance();
882   DBUG_ASSERT(d);
883   cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
884 
885   /*
886     Each step in the install process below is committed independently,
887     either implicitly (for e.g. "CREATE TABLE") or explicitly (for the
888     operations in the "populate()" methods). Thus, there is no need to
889     commit explicitly here.
890   */
891   if (DDSE_dict_init(thd, DICT_INIT_CREATE_FILES, d->get_target_dd_version()) ||
892       initialize_dictionary(thd, false, d))
893     return true;
894 
895   DBUG_ASSERT(d->get_target_dd_version() == d->get_actual_dd_version(thd));
896   LogErr(INFORMATION_LEVEL, ER_DD_VERSION_INSTALLED,
897          d->get_target_dd_version());
898   return false;
899 }
900 
901 // Normal server restart.
restart(THD * thd)902 bool restart(THD *thd) {
903   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::STARTED);
904 
905   /*
906     Set tx_read_only to false to allow installing DD tables even
907     if the server is started with --transaction-read-only=true.
908   */
909   thd->variables.transaction_read_only = false;
910   thd->tx_read_only = false;
911 
912   // Set explicit_defaults_for_timestamp variable for dictionary creation
913   thd->variables.explicit_defaults_for_timestamp = true;
914 
915   Disable_autocommit_guard autocommit_guard(thd);
916 
917   Dictionary_impl *d = dd::Dictionary_impl::instance();
918   DBUG_ASSERT(d);
919   cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
920 
921   store_predefined_tablespace_metadata(thd);
922 
923   if (create_dd_schema(thd) || initialize_dd_properties(thd) ||
924       create_tables(thd, nullptr) || sync_meta_data(thd) ||
925       DDSE_dict_recover(thd, DICT_RECOVERY_RESTART_SERVER,
926                         d->get_actual_dd_version(thd)) ||
927       upgrade::do_server_upgrade_checks(thd) || upgrade::upgrade_tables(thd) ||
928       repopulate_charsets_and_collations(thd) || verify_contents(thd) ||
929       update_versions(thd, false)) {
930     return true;
931   }
932 
933   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::FINISHED);
934   LogErr(INFORMATION_LEVEL, ER_DD_VERSION_FOUND, d->get_actual_dd_version(thd));
935 
936   return false;
937 }
938 
939 // Initialize dictionary in case of server restart.
recover_innodb_upon_upgrade(THD * thd)940 void recover_innodb_upon_upgrade(THD *thd) {
941   Dictionary_impl *d = dd::Dictionary_impl::instance();
942   store_predefined_tablespace_metadata(thd);
943   // RAII to handle error in execution of CREATE TABLE.
944   Key_length_error_handler key_error_handler;
945   /*
946     Ignore ER_TOO_LONG_KEY for dictionary tables during restart.
947     Do not print the error in error log as we are creating only the
948     cached objects and not physical tables.
949 TODO: Workaround due to bug#20629014. Remove when the bug is fixed.
950    */
951   thd->push_internal_handler(&key_error_handler);
952   if (create_dd_schema(thd) || initialize_dd_properties(thd) ||
953       create_tables(thd, nullptr) ||
954       DDSE_dict_recover(thd, DICT_RECOVERY_RESTART_SERVER,
955                         d->get_actual_dd_version(thd))) {
956     // Error is not be handled in this case as we are on cleanup code path.
957     LogErr(WARNING_LEVEL, ER_DD_INIT_UPGRADE_FAILED);
958   }
959   thd->pop_internal_handler();
960   return;
961 }
962 
setup_dd_objects_and_collations(THD * thd)963 bool setup_dd_objects_and_collations(THD *thd) {
964   // Continue with server startup.
965   bootstrap::DD_bootstrap_ctx::instance().set_stage(
966       bootstrap::Stage::CREATED_TABLES);
967 
968   /*
969     Set tx_read_only to false to allow installing DD tables even
970     if the server is started with --transaction-read-only=true.
971   */
972   thd->variables.transaction_read_only = false;
973   thd->tx_read_only = false;
974 
975   Disable_autocommit_guard autocommit_guard(thd);
976 
977   Dictionary_impl *d = dd::Dictionary_impl::instance();
978   DBUG_ASSERT(d);
979 
980   DBUG_ASSERT(d->get_target_dd_version() == d->get_actual_dd_version(thd));
981 
982   /*
983     In this context, we initialize the target tables directly since this
984     is a restart based on a pre-transactional-DD server, so ordinary
985     upgrade does not need to be considered.
986   */
987   if (sync_meta_data(thd) || repopulate_charsets_and_collations(thd) ||
988       verify_contents(thd) || update_versions(thd, false)) {
989     return true;
990   }
991 
992   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::FINISHED);
993   LogErr(INFORMATION_LEVEL, ER_DD_VERSION_FOUND, d->get_actual_dd_version(thd));
994 
995   return false;
996 }
997 
998 }  // namespace bootstrap
999 
store_predefined_tablespace_metadata(THD * thd)1000 void store_predefined_tablespace_metadata(THD *thd) {
1001   /*
1002     Create dd::Tablespace objects and store them (which will add their meta
1003     data to the storage adapter registry of DD entities). The tablespaces
1004     are already created physically in the DDSE, so we only need to create
1005     the corresponding meta data.
1006   */
1007   for (System_tablespaces::Const_iterator it =
1008            System_tablespaces::instance()->begin();
1009        it != System_tablespaces::instance()->end(); ++it) {
1010     const Plugin_tablespace *tablespace_def = (*it)->entity();
1011 
1012     // Create the dd::Tablespace object.
1013     std::unique_ptr<Tablespace> tablespace(dd::create_object<Tablespace>());
1014     tablespace->set_name(tablespace_def->get_name());
1015     tablespace->set_options(tablespace_def->get_options());
1016     tablespace->set_se_private_data(tablespace_def->get_se_private_data());
1017     tablespace->set_engine(tablespace_def->get_engine());
1018 
1019     // Loop over the tablespace files, create dd::Tablespace_file objects.
1020     List<const Plugin_tablespace::Plugin_tablespace_file> files =
1021         tablespace_def->get_files();
1022     List_iterator<const Plugin_tablespace::Plugin_tablespace_file> file_it(
1023         files);
1024     const Plugin_tablespace::Plugin_tablespace_file *file = nullptr;
1025     while ((file = file_it++)) {
1026       Tablespace_file *space_file = tablespace->add_file();
1027       space_file->set_filename(file->get_name());
1028       space_file->set_se_private_data(file->get_se_private_data());
1029     }
1030 
1031     // All the predefined tablespace are unencrypted (atleast for now).
1032     tablespace->options().set("encryption", "N");
1033 
1034     /*
1035       Here, we just want to populate the core registry in the storage
1036       adapter. We do not want to have the object registered in the
1037       uncommitted registry, this will only add complexity to the
1038       DD cache usage during bootstrap. Thus, we call the storage adapter
1039       directly instead of going through the dictionary client.
1040     */
1041     dd::cache::Storage_adapter::instance()->store(thd, tablespace.get());
1042   }
1043   bootstrap::DD_bootstrap_ctx::instance().set_stage(
1044       bootstrap::Stage::CREATED_TABLESPACES);
1045 }
1046 
create_dd_schema(THD * thd)1047 bool create_dd_schema(THD *thd) {
1048   return dd::execute_query(thd,
1049                            dd::String_type("CREATE SCHEMA ") +
1050                                dd::String_type(MYSQL_SCHEMA_NAME.str) +
1051                                dd::String_type(" DEFAULT COLLATE ") +
1052                                dd::String_type(default_charset_info->name)) ||
1053          dd::execute_query(thd, dd::String_type("USE ") +
1054                                     dd::String_type(MYSQL_SCHEMA_NAME.str));
1055 }
1056 
initialize_dd_properties(THD * thd)1057 bool initialize_dd_properties(THD *thd) {
1058   // Create the dd_properties table.
1059   const Object_table_definition *dd_properties_def =
1060       dd::tables::DD_properties::instance().target_table_definition();
1061   if (dd::execute_query(thd, dd_properties_def->get_ddl())) return true;
1062 
1063   /*
1064     We can now decide which version number we will use for the DD, and
1065     initialize the DD_bootstrap_ctx with the relevant version number.
1066   */
1067   uint actual_version = dd::DD_VERSION;
1068   uint actual_server_version = MYSQL_VERSION_ID;
1069   uint upgraded_server_version = MYSQL_VERSION_ID;
1070 
1071   bootstrap::DD_bootstrap_ctx::instance().set_actual_dd_version(actual_version);
1072   bootstrap::DD_bootstrap_ctx::instance().set_upgraded_server_version(
1073       actual_server_version);
1074 
1075   if (!opt_initialize) {
1076     bool exists = false;
1077     bool exists_server = false;
1078     bool exists_upgraded_version = false;
1079 
1080     // Check 'DD_version' too in order to catch an upgrade from 8.0.3.
1081     if (dd::tables::DD_properties::instance().get(thd, "DD_VERSION",
1082                                                   &actual_version, &exists) ||
1083         !exists) {
1084       LogErr(ERROR_LEVEL, ER_DD_NO_VERSION_FOUND);
1085       return true;
1086     }
1087 
1088     /* purecov: begin inspected */
1089     if (actual_version != dd::DD_VERSION) {
1090       bootstrap::DD_bootstrap_ctx::instance().set_actual_dd_version(
1091           actual_version);
1092       if (opt_no_dd_upgrade) {
1093         push_deprecated_warn(thd, "--no-dd-upgrade", "--upgrade=NONE");
1094         LogErr(ERROR_LEVEL, ER_DD_UPGRADE_OFF);
1095         return true;
1096       }
1097 
1098       if (!bootstrap::DD_bootstrap_ctx::instance().supported_dd_version()) {
1099         /*
1100           If we are attempting on minor downgrade, make sure this is
1101           supported.
1102         */
1103         if (!bootstrap::DD_bootstrap_ctx::instance().is_minor_downgrade()) {
1104           LogErr(ERROR_LEVEL, ER_DD_UPGRADE_VERSION_NOT_SUPPORTED,
1105                  actual_version);
1106           return true;
1107         }
1108 
1109         uint minor_downgrade_threshold = 0;
1110         if (dd::tables::DD_properties::instance().get(
1111                 thd, "MINOR_DOWNGRADE_THRESHOLD", &minor_downgrade_threshold,
1112                 &exists) ||
1113             !exists || minor_downgrade_threshold > dd::DD_VERSION) {
1114           LogErr(ERROR_LEVEL, ER_DD_MINOR_DOWNGRADE_VERSION_NOT_SUPPORTED,
1115                  actual_version);
1116           return true;
1117         }
1118       }
1119     }
1120     /* purecov: end */
1121 
1122     if (dd::tables::DD_properties::instance().get(
1123             thd, "MYSQLD_VERSION", &actual_server_version, &exists_server) ||
1124         !exists_server)
1125       return true;
1126 
1127     if (dd::tables::DD_properties::instance().get(
1128             thd, "MYSQLD_VERSION_UPGRADED", &upgraded_server_version,
1129             &exists_upgraded_version) ||
1130         !exists_upgraded_version)
1131       upgraded_server_version = actual_server_version;
1132     bootstrap::DD_bootstrap_ctx::instance().set_upgraded_server_version(
1133         upgraded_server_version);
1134 
1135     if (DBUG_EVALUATE_IF("simulate_mysql_upgrade_skip_pending", true,
1136                          actual_server_version != upgraded_server_version &&
1137                              actual_server_version != MYSQL_VERSION_ID)) {
1138       LogErr(ERROR_LEVEL, ER_SERVER_UPGRADE_PENDING, MYSQL_VERSION_ID,
1139              upgraded_server_version);
1140       return true;
1141     }
1142 
1143     if (upgraded_server_version != MYSQL_VERSION_ID) {
1144       /*
1145         This check is also done in DDSE_dict_init() based on the version
1146         number from the DD tablespace header. Here, we repeat the check,
1147         this time based on the server version number stored in the DD
1148         table 'dd_properties'. The two checks should give the same result,
1149         so this check should never fail; hence, the debug assert.
1150       */
1151       if (!bootstrap::DD_bootstrap_ctx::instance().supported_server_version()) {
1152         LogErr(ERROR_LEVEL, ER_SERVER_UPGRADE_VERSION_NOT_SUPPORTED,
1153                actual_server_version);
1154         DBUG_ASSERT(false);
1155         return true;
1156       }
1157     }
1158 
1159     /*
1160       Reject restarting with a changed LCTN setting, since the collation
1161       for LCTN-dependent columns is decided during server initialization.
1162     */
1163     uint actual_lctn = 0;
1164     exists = false;
1165     if (dd::tables::DD_properties::instance().get(thd, "LCTN", &actual_lctn,
1166                                                   &exists) ||
1167         !exists) {
1168       LogErr(WARNING_LEVEL, ER_LCTN_NOT_FOUND, lower_case_table_names);
1169     } else if (actual_lctn != lower_case_table_names) {
1170       LogErr(ERROR_LEVEL, ER_LCTN_CHANGED, lower_case_table_names, actual_lctn);
1171       return true;
1172     }
1173   }
1174 
1175   if (bootstrap::DD_bootstrap_ctx::instance().is_initialize())
1176     LogErr(INFORMATION_LEVEL, ER_DD_INITIALIZE, dd::DD_VERSION);
1177   else if (bootstrap::DD_bootstrap_ctx::instance().is_restart())
1178     LogErr(INFORMATION_LEVEL, ER_DD_RESTART, dd::DD_VERSION);
1179   else if (bootstrap::DD_bootstrap_ctx::instance().is_minor_downgrade())
1180     LogErr(INFORMATION_LEVEL, ER_DD_MINOR_DOWNGRADE, actual_version,
1181            dd::DD_VERSION);
1182   else {
1183     /*
1184       If none of the above, then this must be DD upgrade or server
1185       upgrade, or both.
1186     */
1187     if (bootstrap::DD_bootstrap_ctx::instance().is_dd_upgrade()) {
1188       LogErr(SYSTEM_LEVEL, ER_DD_UPGRADE, actual_version, dd::DD_VERSION);
1189       log_sink_buffer_check_timeout();
1190       sysd::notify("STATUS=Data Dictionary upgrade in progress\n");
1191     }
1192     if (bootstrap::DD_bootstrap_ctx::instance().is_server_upgrade()) {
1193       // This condition is hit only if upgrade has been skipped before
1194       if (opt_upgrade_mode == UPGRADE_NONE) {
1195         LogErr(ERROR_LEVEL, ER_SERVER_UPGRADE_OFF);
1196         return true;
1197       }
1198       LogErr(INFORMATION_LEVEL, ER_SERVER_UPGRADE_FROM_VERSION,
1199              upgraded_server_version, MYSQL_VERSION_ID);
1200     }
1201     DBUG_ASSERT(bootstrap::DD_bootstrap_ctx::instance().is_dd_upgrade() ||
1202                 bootstrap::DD_bootstrap_ctx::instance().is_server_upgrade());
1203   }
1204 
1205   /*
1206     Unless this is initialization or restart, we must update the
1207     System_tables registry with the information from the 'dd_properties'
1208     regarding the actual DD tables.
1209   */
1210   if (!bootstrap::DD_bootstrap_ctx::instance().is_initialize() &&
1211       bootstrap::DD_bootstrap_ctx::instance().is_dd_upgrade() &&
1212       update_system_tables(thd)) {
1213     return true;
1214   }
1215 
1216   bootstrap::DD_bootstrap_ctx::instance().set_stage(
1217       bootstrap::Stage::FETCHED_PROPERTIES);
1218 
1219   return false;
1220 }
1221 
is_non_inert_dd_or_ddse_table(System_tables::Types table_type)1222 bool is_non_inert_dd_or_ddse_table(System_tables::Types table_type) {
1223   return table_type == System_tables::Types::CORE ||
1224          table_type == System_tables::Types::SECOND ||
1225          table_type == System_tables::Types::DDSE_PRIVATE ||
1226          table_type == System_tables::Types::DDSE_PROTECTED;
1227 }
1228 
create_tables(THD * thd,const std::set<String_type> * create_set)1229 bool create_tables(THD *thd, const std::set<String_type> *create_set) {
1230   // Turn off FK checks, this is needed since we have cyclic FKs.
1231   if (dd::execute_query(thd, "SET FOREIGN_KEY_CHECKS= 0")) return true;
1232 
1233   /*
1234     Decide whether we should create actual or target tables. For plain
1235     restart and initialize, we create the target tables. For the second
1236     table creation stage during upgrade, we also create target tables.
1237     So we create the actual tables only during the first table creation
1238     stage for upgrade, and for minor downgrade.
1239   */
1240   bool create_target_tables = true;
1241   if (bootstrap::DD_bootstrap_ctx::instance().get_stage() ==
1242           bootstrap::Stage::FETCHED_PROPERTIES &&
1243       (bootstrap::DD_bootstrap_ctx::instance().is_dd_upgrade() ||
1244        bootstrap::DD_bootstrap_ctx::instance().is_minor_downgrade()))
1245     create_target_tables = false;
1246 
1247   /*
1248     Iterate over DD tables and create the tables. Note that we do not iterate
1249     over INERT tables here, there is currently only one INERT table (the
1250     'dd_properties'), and it is created in 'initialize_dd_properties' in
1251     order to get hold of e.g. version information.
1252   */
1253   bool error = false;
1254   for (System_tables::Const_iterator it = System_tables::instance()->begin();
1255        it != System_tables::instance()->end() && !error; ++it) {
1256     if (is_non_inert_dd_or_ddse_table((*it)->property())) {
1257       /*
1258         If a create set is submitted, create only the target tables that
1259         are in the create set.
1260       */
1261       if (create_set == nullptr ||
1262           create_set->find((*it)->entity()->name()) != create_set->end()) {
1263         /*
1264           Use the actual or target definition to create the table depending
1265           on the context.
1266         */
1267         if (create_target_tables)
1268           error = create_target_table(thd, (*it)->entity());
1269         else
1270           error = create_actual_table(thd, (*it)->entity());
1271       }
1272     }
1273   }
1274 
1275   // Turn FK checks back on.
1276   if (error || dd::execute_query(thd, "SET FOREIGN_KEY_CHECKS= 1")) return true;
1277 
1278   bootstrap::DD_bootstrap_ctx::instance().set_stage(
1279       bootstrap::Stage::CREATED_TABLES);
1280 
1281   return false;
1282 }
1283 
sync_meta_data(THD * thd)1284 bool sync_meta_data(THD *thd) {
1285   // Acquire exclusive meta data locks for the relevant DD objects.
1286   if (acquire_exclusive_mdl(thd)) return true;
1287 
1288   {
1289     /*
1290       Use a scoped auto releaser to make sure the cached objects are released
1291       before the shared cache is reset.
1292     */
1293     dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
1294 
1295     /*
1296       First, we acquire the DD schema and tablespace and keep them in
1297       local variables. The DD table objects are acquired and put into
1298       a vector. We also get hold of the corresponding persisted objects.
1299 
1300       In this way, we make sure the shared cache is populated. The auto
1301       releaser will make sure the objects are not evicted. This must be
1302       ensured since we need to make sure the ids stay consistent across
1303       all objects in the shared cache.
1304     */
1305 
1306     const Schema *dd_schema = nullptr;
1307     const Tablespace *dd_tspace = nullptr;
1308     if (thd->dd_client()->acquire(dd::String_type(MYSQL_SCHEMA_NAME.str),
1309                                   &dd_schema) ||
1310         thd->dd_client()->acquire(dd::String_type(MYSQL_TABLESPACE_NAME.str),
1311                                   &dd_tspace))
1312       return dd::end_transaction(thd, true);
1313 
1314     std::vector<Table *> dd_tables;  // Owned by the shared cache.
1315     for (System_tables::Const_iterator it = System_tables::instance()->begin();
1316          it != System_tables::instance()->end(); ++it) {
1317       // Skip extraneous tables during minor downgrade.
1318       if ((*it)->entity() == nullptr) continue;
1319 
1320       const dd::Table *dd_table = nullptr;
1321       if (thd->dd_client()->acquire(MYSQL_SCHEMA_NAME.str,
1322                                     (*it)->entity()->name(), &dd_table))
1323         return dd::end_transaction(thd, true);
1324       dd_tables.push_back(const_cast<Table *>(dd_table));
1325     }
1326 
1327     // Get the persisted DD schema and tablespace.
1328     Schema::Name_key schema_key;
1329     dd_schema->update_name_key(&schema_key);
1330     const Schema *tmp_schema = nullptr;
1331 
1332     Tablespace::Name_key tspace_key;
1333     dd_tspace->update_name_key(&tspace_key);
1334     const Tablespace *tmp_tspace = nullptr;
1335 
1336     if (dd::cache::Storage_adapter::instance()->get(
1337             thd, schema_key, ISO_READ_COMMITTED, true, &tmp_schema) ||
1338         dd::cache::Storage_adapter::instance()->get(
1339             thd, tspace_key, ISO_READ_COMMITTED, true, &tmp_tspace))
1340       return dd::end_transaction(thd, true);
1341 
1342     DBUG_ASSERT(tmp_schema != nullptr && tmp_tspace != nullptr);
1343     std::unique_ptr<Schema> persisted_dd_schema(
1344         const_cast<Schema *>(tmp_schema));
1345     std::unique_ptr<Tablespace> persisted_dd_tspace(
1346         const_cast<Tablespace *>(tmp_tspace));
1347 
1348     // If the persisted meta data indicates that the DD tablespace is
1349     // encrypted, then we record this fact to make sure the DDL statements
1350     // that are genereated during e.g. upgrade will have the correct
1351     // encryption option.
1352     String_type encryption("");
1353     Object_table_definition_impl::set_dd_tablespace_encrypted(
1354         persisted_dd_tspace->options().exists("encryption") &&
1355         !persisted_dd_tspace->options().get("encryption", &encryption) &&
1356         encryption == "Y");
1357 
1358     // Get the persisted DD table objects into a vector.
1359     std::vector<std::unique_ptr<Table_impl>> persisted_dd_tables;
1360     for (System_tables::Const_iterator it = System_tables::instance()->begin();
1361          it != System_tables::instance()->end(); ++it) {
1362       // Skip extraneous tables during minor downgrade.
1363       if ((*it)->entity() == nullptr) continue;
1364 
1365       const dd::Abstract_table *dd_table = nullptr;
1366       dd::Abstract_table::Name_key table_key;
1367       Abstract_table::update_name_key(&table_key, persisted_dd_schema->id(),
1368                                       (*it)->entity()->name());
1369 
1370       if (dd::cache::Storage_adapter::instance()->get(
1371               thd, table_key, ISO_READ_COMMITTED, true, &dd_table))
1372         return dd::end_transaction(thd, true);
1373 
1374       std::unique_ptr<Table_impl> persisted_dd_table(
1375           dynamic_cast<Table_impl *>(const_cast<Abstract_table *>(dd_table)));
1376       persisted_dd_tables.push_back(std::move(persisted_dd_table));
1377     }
1378 
1379     // Drop the tablespaces with type PREDEFINED_DDSE from the storage adapter.
1380     for (System_tablespaces::Const_iterator it =
1381              System_tablespaces::instance()->begin(
1382                  System_tablespaces::Types::PREDEFINED_DDSE);
1383          it != System_tablespaces::instance()->end();
1384          it = System_tablespaces::instance()->next(
1385              it, System_tablespaces::Types::PREDEFINED_DDSE)) {
1386       const Tablespace *tspace = nullptr;
1387       if (thd->dd_client()->acquire((*it)->entity()->get_name(), &tspace))
1388         return dd::end_transaction(thd, true);
1389 
1390       dd::cache::Storage_adapter::instance()->core_drop(thd, tspace);
1391     }
1392     /*
1393       We have now populated the shared cache with the core objects. The
1394       scoped auto releaser makes sure we will not evict the objects from
1395       the shared cache until the auto releaser exits scope. Thus, within
1396       the scope of the auto releaser, we can modify the contents of the
1397       core registry in the storage adapter without risking that this will
1398       interfere with the contents of the shared cache, because the DD
1399       transactions will acquire the core objects from the shared cache.
1400     */
1401 
1402     /*
1403       We have also read the DD schema and tablespace as well as the DD
1404       tables from persistent storage. The last thing we do before resetting
1405       the shared cache is to update the contents of the core registry to
1406       match the persisted objects. First, we update the core registry with
1407       the persisted DD schema and tablespace.
1408     */
1409     dd::cache::Storage_adapter::instance()->core_drop(thd, dd_schema);
1410     dd::cache::Storage_adapter::instance()->core_store(
1411         thd, persisted_dd_schema.get());
1412 
1413     dd::cache::Storage_adapter::instance()->core_drop(thd, dd_tspace);
1414     dd::cache::Storage_adapter::instance()->core_store(
1415         thd, persisted_dd_tspace.get());
1416 
1417     // Make sure the IDs after storing are as expected.
1418     DBUG_ASSERT(persisted_dd_schema->id() == 1);
1419     DBUG_ASSERT(persisted_dd_tspace->id() == 1);
1420 
1421     /*
1422       Finally, we update the core registry of the DD tables. This must be
1423       done in two loops to avoid issues related to overlapping ID sequences.
1424     */
1425     std::vector<Table *>::const_iterator table_it = dd_tables.begin();
1426     for (System_tables::Const_iterator it = System_tables::instance()->begin();
1427          it != System_tables::instance()->end() && table_it != dd_tables.end();
1428          ++it, ++table_it) {
1429       /*
1430         If we are in the process of upgrading, there may not be an entry
1431         in the dd_tables for new tables that have been added after the
1432         version we are upgrading from.
1433       */
1434       if ((*table_it) != nullptr) {
1435         DBUG_ASSERT((*it)->entity()->name() == (*table_it)->name());
1436         dd::cache::Storage_adapter::instance()->core_drop(thd, *table_it);
1437       }
1438     }
1439 
1440     std::vector<std::unique_ptr<Table_impl>>::const_iterator persisted_it =
1441         persisted_dd_tables.begin();
1442     for (System_tables::Const_iterator it = System_tables::instance()->begin();
1443          it != System_tables::instance()->end() &&
1444          persisted_it != persisted_dd_tables.end();
1445          ++it, ++persisted_it) {
1446       /*
1447         If we are in the process of upgrading, there may not be an entry
1448         in the persisted_dd_tables for new tables that have been added after
1449         the version we are upgrading from.
1450       */
1451       if ((*persisted_it) == nullptr) continue;
1452 
1453       if ((*it)->property() == System_tables::Types::CORE) {
1454         dd::cache::Storage_adapter::instance()->core_store(
1455             thd, static_cast<Table *>((*persisted_it).get()));
1456       }
1457     }
1458   }
1459 
1460   /*
1461     Now, the auto releaser has released the objects, and we can go ahead and
1462     reset the shared cache.
1463   */
1464   dd::cache::Shared_dictionary_cache::instance()->reset(true);
1465   bootstrap::DD_bootstrap_ctx::instance().set_stage(bootstrap::Stage::SYNCED);
1466 
1467   // Commit and flush tables to force re-opening using the refreshed meta data.
1468   if (dd::end_transaction(thd, false) || dd::execute_query(thd, "FLUSH TABLES"))
1469     return true;
1470 
1471   // Get hold of the temporary actual and target schema names.
1472   String_type target_schema_name;
1473   bool target_schema_exists = false;
1474   if (dd::tables::DD_properties::instance().get(thd, "UPGRADE_TARGET_SCHEMA",
1475                                                 &target_schema_name,
1476                                                 &target_schema_exists))
1477     return true;
1478 
1479   String_type actual_schema_name;
1480   bool actual_schema_exists = false;
1481   if (dd::tables::DD_properties::instance().get(thd, "UPGRADE_ACTUAL_SCHEMA",
1482                                                 &actual_schema_name,
1483                                                 &actual_schema_exists))
1484     return true;
1485 
1486   // Reset the DDSE local dictionary cache.
1487   handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_INNODB);
1488   if (ddse->dict_cache_reset == nullptr) return true;
1489 
1490   for (System_tables::Const_iterator it = System_tables::instance()->begin();
1491        it != System_tables::instance()->end(); ++it) {
1492     // Skip extraneous tables during minor downgrade.
1493     if ((*it)->entity() == nullptr) continue;
1494 
1495     if ((*it)->property() == System_tables::Types::CORE ||
1496         (*it)->property() == System_tables::Types::SECOND) {
1497       ddse->dict_cache_reset(MYSQL_SCHEMA_NAME.str,
1498                              (*it)->entity()->name().c_str());
1499       if (target_schema_exists && !target_schema_name.empty())
1500         ddse->dict_cache_reset(target_schema_name.c_str(),
1501                                (*it)->entity()->name().c_str());
1502       if (actual_schema_exists && !actual_schema_name.empty())
1503         ddse->dict_cache_reset(actual_schema_name.c_str(),
1504                                (*it)->entity()->name().c_str());
1505     }
1506   }
1507 
1508   /*
1509     At this point, we're to a large extent open for business.
1510     If there are leftover schema names from upgrade, delete them
1511     and remove the names from the DD properties.
1512   */
1513   if (target_schema_exists && !target_schema_name.empty()) {
1514     std::stringstream ss;
1515     ss << "DROP SCHEMA IF EXISTS " << target_schema_name;
1516     if (dd::execute_query(thd, ss.str().c_str())) return true;
1517   }
1518 
1519   if (actual_schema_exists && !actual_schema_name.empty()) {
1520     std::stringstream ss;
1521     ss << "DROP SCHEMA IF EXISTS " << actual_schema_name;
1522     if (dd::execute_query(thd, ss.str().c_str())) return true;
1523   }
1524 
1525   /*
1526    The statements above are auto committed, so there is nothing uncommitted
1527    at this stage. Go ahead and remove the schema keys.
1528   */
1529   if (actual_schema_exists)
1530     (void)dd::tables::DD_properties::instance().remove(thd,
1531                                                        "UPGRADE_ACTUAL_SCHEMA");
1532 
1533   if (target_schema_exists)
1534     (void)dd::tables::DD_properties::instance().remove(thd,
1535                                                        "UPGRADE_TARGET_SCHEMA");
1536 
1537   if (actual_schema_exists || target_schema_exists)
1538     return dd::end_transaction(thd, false);
1539 
1540   return false;
1541 }
1542 
update_properties(THD * thd,const std::set<String_type> * create_set,const std::set<String_type> * remove_set,const String_type & target_table_schema_name)1543 bool update_properties(THD *thd, const std::set<String_type> *create_set,
1544                        const std::set<String_type> *remove_set,
1545                        const String_type &target_table_schema_name) {
1546   /*
1547     Populate the dd properties with the SQL DDL and SE private data.
1548     Store meta data of non-inert tables only.
1549   */
1550   std::unique_ptr<dd::Properties> system_tables_props(
1551       dd::Properties::parse_properties(""));
1552 
1553   dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
1554   for (System_tables::Const_iterator it = System_tables::instance()->begin();
1555        it != System_tables::instance()->end(); ++it) {
1556     if (is_non_inert_dd_or_ddse_table((*it)->property())) {
1557       /*
1558         This will not be called for minor downgrade, so all tables
1559         will have a corresponding Object_table.
1560       */
1561       DBUG_ASSERT((*it)->entity() != nullptr);
1562       const Object_table_definition *table_def =
1563           (*it)->entity()->target_table_definition();
1564 
1565       // May be null for abandoned tables, which should be skipped.
1566       if (table_def == nullptr) {
1567         continue;
1568       }
1569 
1570       /*
1571         Tables that are in the remove_set, but not in the create_set,
1572         should not be reflected in the DD properties.
1573       */
1574       if (remove_set != nullptr && create_set != nullptr &&
1575           remove_set->find((*it)->entity()->name()) != remove_set->end() &&
1576           create_set->find((*it)->entity()->name()) == create_set->end()) {
1577         continue;
1578       }
1579 
1580       /*
1581         If a create set is submitted, use this to decide whether we should
1582         get the meta data from the table in the 'mysql' schema or the temporary
1583         target schema.
1584       */
1585       String_type table_schema_name{MYSQL_SCHEMA_NAME.str};
1586       if (create_set != nullptr &&
1587           create_set->find((*it)->entity()->name()) != create_set->end()) {
1588         table_schema_name = target_table_schema_name;
1589       }
1590 
1591       /*
1592         Acquire the table object to get hold of the se private data etc.
1593         Note that we must acquire it from the appropriate schema.
1594       */
1595       const dd::Table *dd_table = nullptr;
1596       if (thd->dd_client()->acquire(table_schema_name, (*it)->entity()->name(),
1597                                     &dd_table))
1598         return dd::end_transaction(thd, true);
1599 
1600       // All non-abandoned tables should have a table object present.
1601       DBUG_ASSERT(dd_table != nullptr);
1602 
1603       std::unique_ptr<dd::Properties> tbl_props(
1604           dd::Properties::parse_properties(""));
1605 
1606       using dd::tables::DD_properties;
1607       tbl_props->set(DD_properties::dd_key(DD_properties::DD_property::ID),
1608                      dd_table->se_private_id());
1609       tbl_props->set(DD_properties::dd_key(DD_properties::DD_property::DATA),
1610                      dd_table->se_private_data().raw_string());
1611       tbl_props->set(
1612           DD_properties::dd_key(DD_properties::DD_property::SPACE_ID),
1613           dd_table->tablespace_id());
1614 
1615       // Store the structured representation of the table definition.
1616       std::unique_ptr<Properties> definition(Properties::parse_properties(""));
1617       table_def->store_into_properties(definition.get());
1618       tbl_props->set(DD_properties::dd_key(DD_properties::DD_property::DEF),
1619                      definition->raw_string());
1620 
1621       // Store the se private data for each index.
1622       dd::Table::Index_collection::const_iterator idx(
1623           dd_table->indexes().begin());
1624       for (int count = 0; idx != dd_table->indexes().end(); ++idx, ++count) {
1625         std::stringstream ss;
1626         ss << DD_properties::dd_key(DD_properties::DD_property::IDX) << count;
1627         tbl_props->set(ss.str().c_str(),
1628                        (*idx)->se_private_data().raw_string());
1629       }
1630 
1631       // Store the se private data for each column.
1632       dd::Table::Column_collection::const_iterator col(
1633           dd_table->columns().begin());
1634       for (int count = 0; col != dd_table->columns().end(); ++col, ++count) {
1635         std::stringstream ss;
1636         ss << DD_properties::dd_key(DD_properties::DD_property::COL) << count;
1637         tbl_props->set(ss.str().c_str(),
1638                        (*col)->se_private_data().raw_string());
1639       }
1640 
1641       // All tables should be reflected in the System tables list.
1642       system_tables_props->set(dd_table->name(), tbl_props->raw_string());
1643     }
1644   }
1645   if (dd::tables::DD_properties::instance().set(thd, "SYSTEM_TABLES",
1646                                                 *system_tables_props.get()))
1647     return dd::end_transaction(thd, true);
1648 
1649   bootstrap::DD_bootstrap_ctx::instance().set_stage(
1650       bootstrap::Stage::STORED_DD_META_DATA);
1651 
1652   // Delay commit.
1653   return false;
1654 }
1655 
update_versions(THD * thd,bool is_dd_upgrade_57)1656 bool update_versions(THD *thd, bool is_dd_upgrade_57) {
1657   /*
1658     During initialize, store the DD version number, the LCTN used, and the
1659     mysqld server version.
1660   */
1661   if (opt_initialize) {
1662     if (dd::tables::DD_properties::instance().set(thd, "DD_VERSION",
1663                                                   dd::DD_VERSION) ||
1664         dd::tables::DD_properties::instance().set(
1665             thd, "MINOR_DOWNGRADE_THRESHOLD",
1666             dd::DD_VERSION_MINOR_DOWNGRADE_THRESHOLD) ||
1667         dd::tables::DD_properties::instance().set(thd, "SDI_VERSION",
1668                                                   dd::SDI_VERSION) ||
1669         dd::tables::DD_properties::instance().set(thd, "LCTN",
1670                                                   lower_case_table_names) ||
1671         dd::tables::DD_properties::instance().set(thd, "MYSQLD_VERSION_LO",
1672                                                   MYSQL_VERSION_ID) ||
1673         dd::tables::DD_properties::instance().set(thd, "MYSQLD_VERSION_HI",
1674                                                   MYSQL_VERSION_ID) ||
1675         dd::tables::DD_properties::instance().set(thd, "MYSQLD_VERSION",
1676                                                   MYSQL_VERSION_ID))
1677       return dd::end_transaction(thd, true);
1678 
1679     if (is_dd_upgrade_57) {
1680       if (dd::tables::DD_properties::instance().set(
1681               thd, "MYSQLD_VERSION_UPGRADED", bootstrap::SERVER_VERSION_50700))
1682         return true;
1683       bootstrap::DD_bootstrap_ctx::instance().set_upgraded_server_version(
1684           bootstrap::SERVER_VERSION_50700);
1685     } else {
1686       if (dd::tables::DD_properties::instance().set(
1687               thd, "MYSQLD_VERSION_UPGRADED", MYSQL_VERSION_ID))
1688         return true;
1689       bootstrap::DD_bootstrap_ctx::instance().set_upgraded_server_version(
1690           MYSQL_VERSION_ID);
1691     }
1692   } else {
1693     uint mysqld_version_lo = 0;
1694     uint mysqld_version_hi = 0;
1695     uint mysqld_version = 0;
1696     uint upgraded_server_version = 0;
1697     bool exists_lo = false;
1698     bool exists_hi = false;
1699     bool exists = false;
1700     bool exists_upgraded_version = false;
1701     if ((dd::tables::DD_properties::instance().get(
1702              thd, "MYSQLD_VERSION_LO", &mysqld_version_lo, &exists_lo) ||
1703          !exists_lo) ||
1704         (dd::tables::DD_properties::instance().get(
1705              thd, "MYSQLD_VERSION_HI", &mysqld_version_hi, &exists_hi) ||
1706          !exists_hi) ||
1707         (dd::tables::DD_properties::instance().get(thd, "MYSQLD_VERSION",
1708                                                    &mysqld_version, &exists) ||
1709          !exists))
1710       return dd::end_transaction(thd, true);
1711 
1712     if (dd::tables::DD_properties::instance().get(
1713             thd, "MYSQLD_VERSION_UPGRADED", &upgraded_server_version,
1714             &exists_upgraded_version) ||
1715         !exists_upgraded_version) {
1716       if (dd::tables::DD_properties::instance().set(
1717               thd, "MYSQLD_VERSION_UPGRADED", mysqld_version))
1718         return true;
1719       upgraded_server_version = mysqld_version;
1720     }
1721     bootstrap::DD_bootstrap_ctx::instance().set_upgraded_server_version(
1722         upgraded_server_version);
1723 
1724     if ((mysqld_version_lo > MYSQL_VERSION_ID &&
1725          dd::tables::DD_properties::instance().set(thd, "MYSQLD_VERSION_LO",
1726                                                    MYSQL_VERSION_ID)) ||
1727         (mysqld_version_hi < MYSQL_VERSION_ID &&
1728          dd::tables::DD_properties::instance().set(thd, "MYSQLD_VERSION_HI",
1729                                                    MYSQL_VERSION_ID)) ||
1730         (mysqld_version != MYSQL_VERSION_ID &&
1731          dd::tables::DD_properties::instance().set(thd, "MYSQLD_VERSION",
1732                                                    MYSQL_VERSION_ID)))
1733       return dd::end_transaction(thd, true);
1734 
1735     /*
1736       Update the SDI version number in case of upgrade.
1737       Note that on downgrade, we keep the old SDI version.
1738     */
1739     uint stored_sdi_version = 0;
1740     bool exists_sdi = false;
1741     if ((dd::tables::DD_properties::instance().get(
1742              thd, "SDI_VERSION", &stored_sdi_version, &exists_sdi) ||
1743          !exists_sdi) ||
1744         (stored_sdi_version < dd::SDI_VERSION &&
1745          dd::tables::DD_properties::instance().set(thd, "SDI_VERSION",
1746                                                    dd::SDI_VERSION)))
1747       return dd::end_transaction(thd, true);
1748 
1749     /*
1750       Update the DD version number in case of upgrade.
1751       Note that on downgrade, we keep the old DD version.
1752     */
1753     uint dd_version = 0;
1754     bool exists_dd = false;
1755     if ((dd::tables::DD_properties::instance().get(thd, "DD_VERSION",
1756                                                    &dd_version, &exists_dd) ||
1757          !exists_dd) ||
1758         (dd_version < dd::DD_VERSION &&
1759          dd::tables::DD_properties::instance().set(thd, "DD_VERSION",
1760                                                    dd::DD_VERSION)))
1761       return dd::end_transaction(thd, true);
1762 
1763     /*
1764       Update the minor downgrade threshold in case of upgrade.
1765       Note that on downgrade, we keep the threshold version which is
1766       already present.
1767     */
1768     if (dd_version < dd::DD_VERSION &&
1769         dd::tables::DD_properties::instance().set(
1770             thd, "MINOR_DOWNGRADE_THRESHOLD",
1771             dd::DD_VERSION_MINOR_DOWNGRADE_THRESHOLD))
1772       return dd::end_transaction(thd, true);
1773   }
1774 
1775   /*
1776     Update the server version number in the bootstrap ctx and the
1777     DD tablespace header if we have been doing a server upgrade.
1778     Note that the update of the tablespace header is not rolled
1779     back in case of an abort, so this better be the last step we
1780     do before committing.
1781   */
1782   handlerton *ddse = ha_resolve_by_legacy_type(thd, DB_TYPE_INNODB);
1783   if (bootstrap::DD_bootstrap_ctx::instance().is_server_upgrade()) {
1784     if (ddse->dict_set_server_version == nullptr ||
1785         ddse->dict_set_server_version()) {
1786       LogErr(ERROR_LEVEL, ER_CANNOT_SET_SERVER_VERSION_IN_TABLESPACE_HEADER);
1787       return dd::end_transaction(thd, true);
1788     }
1789   }
1790 
1791 #ifndef DBUG_OFF
1792   /*
1793     Debug code to make sure that after updating version numbers, regardless
1794     of the type of initialization, restart or upgrade, the server version
1795     number in the DD tablespace header is indeed the same as this server's
1796     version number.
1797   */
1798   uint version = 0;
1799   DBUG_ASSERT(ddse->dict_get_server_version != nullptr);
1800   DBUG_ASSERT(!ddse->dict_get_server_version(&version));
1801   DBUG_ASSERT(version == MYSQL_VERSION_ID);
1802 #endif
1803 
1804   bootstrap::DD_bootstrap_ctx::instance().set_stage(
1805       bootstrap::Stage::VERSION_UPDATED);
1806 
1807   /*
1808     During upgrade, this will commit the swap of the old and new DD tables.
1809   */
1810   return dd::end_transaction(thd, false);
1811 }
1812 
1813 }  // namespace dd
1814