1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License, version 2.0, as published by the
8 Free Software Foundation.
9 
10 This program is also distributed with certain software (including but not
11 limited to OpenSSL) that is licensed under separate terms, as designated in a
12 particular file or component or in included license documentation. The authors
13 of MySQL hereby grant you an additional permission to link the program and
14 your derivative works with the separately licensed software that they have
15 included with MySQL.
16 
17 This program is distributed in the hope that it will be useful, but WITHOUT
18 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
19 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
20 for more details.
21 
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
25 
26 *****************************************************************************/
27 
28 /** @file dict/dict0dict.cc
29  Data dictionary system
30 
31  Created 1/8/1996 Heikki Tuuri
32  ***********************************************************************/
33 
34 #include "my_config.h"
35 
36 #include <stdlib.h>
37 #include <strfunc.h>
38 #include <sys/types.h>
39 #include <algorithm>
40 #include <string>
41 
42 #ifndef UNIV_HOTBACKUP
43 #include "current_thd.h"
44 #endif /* !UNIV_HOTBACKUP */
45 #include "dict0dict.h"
46 #include "fil0fil.h"
47 #ifndef UNIV_HOTBACKUP
48 #include "fts0fts.h"
49 #endif /* !UNIV_HOTBACKUP */
50 #include "ha_prototypes.h"
51 #include "my_dbug.h"
52 
53 #ifndef UNIV_HOTBACKUP
54 #include "clone0api.h"
55 #include "mysqld.h"  // system_charset_info
56 #include "que0types.h"
57 #include "row0sel.h"
58 #endif /* !UNIV_HOTBACKUP */
59 
60 #ifdef UNIV_HOTBACKUP
61 #define dict_lru_validate(x) (true)
62 #define dict_lru_find_table(x) (true)
63 #define dict_non_lru_find_table(x) (true)
64 #endif /* UNIV_HOTBACKUP */
65 
66 /** dummy index for ROW_FORMAT=REDUNDANT supremum and infimum records */
67 dict_index_t *dict_ind_redundant;
68 
69 #if defined UNIV_DEBUG || defined UNIV_IBUF_DEBUG
70 /** Flag to control insert buffer debugging. */
71 extern uint ibuf_debug;
72 #endif /* UNIV_DEBUG || UNIV_IBUF_DEBUG */
73 
74 #include <algorithm>
75 #include <vector>
76 
77 #include "btr0btr.h"
78 #include "btr0cur.h"
79 #include "btr0sea.h"
80 #include "buf0buf.h"
81 #include "data0type.h"
82 #include "dict0boot.h"
83 #include "dict0crea.h"
84 #ifndef UNIV_HOTBACKUP
85 #include "dict0dd.h"
86 #endif /* !UNIV_HOTBACKUP */
87 #include "dict0mem.h"
88 #include "dict0priv.h"
89 #ifndef UNIV_HOTBACKUP
90 #include "dict0stats.h"
91 #endif /* !UNIV_HOTBACKUP */
92 #include "fsp0sysspace.h"
93 #ifndef UNIV_HOTBACKUP
94 #include "fts0fts.h"
95 #include "fts0types.h"
96 #include "lock0lock.h"
97 #endif /* !UNIV_HOTBACKUP */
98 #include "mach0data.h"
99 #include "mem0mem.h"
100 #include "os0once.h"
101 #include "page0page.h"
102 #include "page0zip.h"
103 #ifndef UNIV_HOTBACKUP
104 #include "pars0pars.h"
105 #include "pars0sym.h"
106 #include "que0que.h"
107 #endif /* !UNIV_HOTBACKUP */
108 #include "rem0cmp.h"
109 #include "row0ins.h"
110 #include "row0log.h"
111 #ifndef UNIV_HOTBACKUP
112 #include "row0merge.h"
113 #include "row0mysql.h"
114 #endif /* !UNIV_HOTBACKUP */
115 #include "row0upd.h"
116 #ifndef UNIV_HOTBACKUP
117 #include "ha_innodb.h"
118 #include "srv0mon.h"
119 #include "srv0start.h"
120 #include "sync0sync.h"
121 #include "trx0undo.h"
122 #include "ut0new.h"
123 #endif /* !UNIV_HOTBACKUP */
124 
125 static_assert(DATA_ROW_ID == 0, "DATA_ROW_ID != 0");
126 static_assert(DATA_TRX_ID == 1, "DATA_TRX_ID != 1");
127 static_assert(DATA_ROLL_PTR == 2, "DATA_ROLL_PTR != 2");
128 static_assert(DATA_N_SYS_COLS == 3, "DATA_N_SYS_COLS != 3");
129 static_assert(DATA_TRX_ID_LEN == 6, "DATA_TRX_ID_LEN != 6");
130 static_assert(DATA_ITT_N_SYS_COLS == 2, "DATA_ITT_N_SYS_COLS != 2");
131 
132 /** the dictionary system */
133 dict_sys_t *dict_sys = nullptr;
134 
135 /** The set of SE private IDs of DD tables. Used to tell whether a table is
136 a DD table. Since the DD tables can be rebuilt with new SE private IDs,
137 this set replaces checks based on ranges of IDs. */
138 std::set<dd::Object_id> dict_sys_t::s_dd_table_ids = {};
139 
140 /** The name of the data dictionary tablespace. */
141 const char *dict_sys_t::s_dd_space_name = "mysql";
142 
143 /** The file name of the data dictionary tablespace */
144 const char *dict_sys_t::s_dd_space_file_name = "mysql.ibd";
145 
146 /** The name of the hard-coded system tablespace. */
147 const char *dict_sys_t::s_sys_space_name = "innodb_system";
148 
149 /** The name of the predefined temporary tablespace. */
150 const char *dict_sys_t::s_temp_space_name = "innodb_temporary";
151 
152 /** The file name of the predefined temporary tablespace */
153 const char *dict_sys_t::s_temp_space_file_name = "ibtmp1";
154 
155 /** The hard-coded tablespace name innodb_file_per_table. */
156 const char *dict_sys_t::s_file_per_table_name = "innodb_file_per_table";
157 
158 /** These two undo tablespaces cannot be dropped. */
159 const char *dict_sys_t::s_default_undo_space_name_1 = "innodb_undo_001";
160 const char *dict_sys_t::s_default_undo_space_name_2 = "innodb_undo_002";
161 
162 /** the dictionary persisting structure */
163 dict_persist_t *dict_persist = nullptr;
164 
/** @brief the data dictionary rw-latch protecting dict_sys

table create, drop, etc. reserve this in X-mode; implicit or
background operations purge, rollback, foreign key checks reserve this
in S-mode; we cannot trust that MySQL protects implicit or background
operations from a table drop since MySQL does not know of them; therefore
we need this; NOTE: a transaction which reserves this must keep book
on the mode in trx_t::dict_operation_lock_mode */
173 rw_lock_t *dict_operation_lock;
174 
175 /** Percentage of compression failures that are allowed in a single
176 round */
177 ulong zip_failure_threshold_pct = 5;
178 
179 /** Maximum percentage of a page that can be allowed as a pad to avoid
180 compression failures */
181 ulong zip_pad_max = 50;
182 
183 #define DICT_POOL_PER_TABLE_HASH          \
184   512 /*!< buffer pool max size per table \
185       hash table fixed size in bytes */
186 
187 #ifndef UNIV_HOTBACKUP
188 /** Identifies generated InnoDB foreign key names */
189 static char dict_ibfk[] = "_ibfk_";
190 
191 /** Array to store table_ids of INNODB_SYS_* TABLES */
192 static table_id_t dict_sys_table_id[SYS_NUM_SYSTEM_TABLES];
193 
194 /** Tries to find column names for the index and sets the col field of the
195 index.
196 @param[in]	table	table
197 @param[in]	index	index
198 @param[in]	add_v	new virtual columns added along with an add index call
199 @return true if the column names were found */
200 static ibool dict_index_find_and_set_cols(const dict_table_t *table,
201                                           dict_index_t *index,
202                                           const dict_add_v_col_t *add_v);
203 /** Builds the internal dictionary cache representation for a clustered
204  index, containing also system fields not defined by the user.
205  @return own: the internal representation of the clustered index */
206 static dict_index_t *dict_index_build_internal_clust(
207     const dict_table_t *table, /*!< in: table */
208     dict_index_t *index);      /*!< in: user representation of
209                                a clustered index */
210 /** Builds the internal dictionary cache representation for a non-clustered
211  index, containing also system fields not defined by the user.
212  @return own: the internal representation of the non-clustered index */
213 static dict_index_t *dict_index_build_internal_non_clust(
214     const dict_table_t *table, /*!< in: table */
215     dict_index_t *index);      /*!< in: user representation of
216                                a non-clustered index */
217 /** Builds the internal dictionary cache representation for an FTS index.
218  @return own: the internal representation of the FTS index */
219 static dict_index_t *dict_index_build_internal_fts(
220     dict_table_t *table,  /*!< in: table */
221     dict_index_t *index); /*!< in: user representation of an FTS index */
222 
223 /** Removes an index from the dictionary cache. */
224 static void dict_index_remove_from_cache_low(
225     dict_table_t *table, /*!< in/out: table */
226     dict_index_t *index, /*!< in, own: index */
227     ibool lru_evict);    /*!< in: TRUE if page being evicted
228                          to make room in the table LRU list */
229 
230 /** Calculate and update the redo log margin for current tables which
231 have some changed dynamic metadata in memory and have not been written
232 back to mysql.innodb_dynamic_metadata. Update LSN limit, which is used
233 to stop user threads when redo log is running out of space and they
234 do not hold latches (log.free_check_limit_sn). */
235 static void dict_persist_update_log_margin(void);
236 
237 /** Removes a table object from the dictionary cache. */
238 static void dict_table_remove_from_cache_low(
239     dict_table_t *table, /*!< in, own: table */
240     ibool lru_evict);    /*!< in: TRUE if evicting from LRU */
241 
242 #ifdef UNIV_DEBUG
243 /** Validate the dictionary table LRU list.
244  @return true if validate OK */
245 static ibool dict_lru_validate(void);
246 /** Check if table is in the dictionary table LRU list.
247  @return true if table found */
248 static ibool dict_lru_find_table(
249     const dict_table_t *find_table); /*!< in: table to find */
250 /** Check if a table exists in the dict table non-LRU list.
251  @return true if table found */
252 static ibool dict_non_lru_find_table(
253     const dict_table_t *find_table); /*!< in: table to find */
254 #endif                               /* UNIV_DEBUG */
255 
256 /* Stream for storing detailed information about the latest foreign key
257 and unique key errors. Only created if !srv_read_only_mode */
258 FILE *dict_foreign_err_file = nullptr;
259 /* mutex protecting the foreign and unique error buffers */
260 ib_mutex_t dict_foreign_err_mutex;
261 
262 /** Checks if the database name in two table names is the same.
263  @return true if same db name */
dict_tables_have_same_db(const char * name1,const char * name2)264 ibool dict_tables_have_same_db(const char *name1, /*!< in: table name in the
265                                                   form dbname '/' tablename */
266                                const char *name2) /*!< in: table name in the
267                                                   form dbname '/' tablename */
268 {
269   for (; *name1 == *name2; name1++, name2++) {
270     if (*name1 == '/') {
271       return (TRUE);
272     }
273     ut_a(*name1); /* the names must contain '/' */
274   }
275   return (FALSE);
276 }
277 
278 /** Return the end of table name where we have removed dbname and '/'.
279  @return table name */
dict_remove_db_name(const char * name)280 const char *dict_remove_db_name(const char *name) /*!< in: table name in the
281                                                   form dbname '/' tablename */
282 {
283   const char *s = strchr(name, '/');
284   ut_a(s);
285 
286   return (s + 1);
287 }
288 #endif /* !UNIV_HOTBACKUP */
289 
290 /** Get the database name length in a table name.
291  @return database name length */
dict_get_db_name_len(const char * name)292 ulint dict_get_db_name_len(const char *name) /*!< in: table name in the form
293                                              dbname '/' tablename */
294 {
295   const char *s;
296   s = strchr(name, '/');
297   if (s == nullptr) {
298     return (0);
299   }
300   return (s - name);
301 }
302 
303 #ifndef UNIV_HOTBACKUP
304 /** Reserves the dictionary system mutex for MySQL. */
dict_mutex_enter_for_mysql(void)305 void dict_mutex_enter_for_mysql(void) { mutex_enter(&dict_sys->mutex); }
306 
307 /** Releases the dictionary system mutex for MySQL. */
dict_mutex_exit_for_mysql(void)308 void dict_mutex_exit_for_mysql(void) { mutex_exit(&dict_sys->mutex); }
309 
310 /** Allocate and init a dict_table_t's stats latch.
311 This function must not be called concurrently on the same table object.
312 @param[in,out]	table_void	table whose stats latch to create */
dict_table_stats_latch_alloc(void * table_void)313 static void dict_table_stats_latch_alloc(void *table_void) {
314   dict_table_t *table = static_cast<dict_table_t *>(table_void);
315 
316   /* Note: rw_lock_create() will call the constructor */
317 
318   table->stats_latch =
319       static_cast<rw_lock_t *>(ut_malloc_nokey(sizeof(rw_lock_t)));
320 
321   ut_a(table->stats_latch != nullptr);
322 
323   rw_lock_create(dict_table_stats_key, table->stats_latch, SYNC_INDEX_TREE);
324 }
325 
326 /** Deinit and free a dict_table_t's stats latch.
327 This function must not be called concurrently on the same table object.
328 @param[in,out]	table	table whose stats latch to free */
dict_table_stats_latch_free(dict_table_t * table)329 static void dict_table_stats_latch_free(dict_table_t *table) {
330   rw_lock_free(table->stats_latch);
331   ut_free(table->stats_latch);
332 }
333 
334 /** Create a dict_table_t's stats latch or delay for lazy creation.
335 This function is only called from either single threaded environment
336 or from a thread that has not shared the table object with other threads.
337 @param[in,out]	table	table whose stats latch to create
338 @param[in]	enabled	if false then the latch is disabled
339 and dict_table_stats_lock()/unlock() become noop on this table. */
dict_table_stats_latch_create(dict_table_t * table,bool enabled)340 void dict_table_stats_latch_create(dict_table_t *table, bool enabled) {
341   if (!enabled) {
342     table->stats_latch = nullptr;
343     table->stats_latch_created = os_once::DONE;
344     return;
345   }
346 
347   /* We create this lazily the first time it is used. */
348   table->stats_latch = nullptr;
349   table->stats_latch_created = os_once::NEVER_DONE;
350 }
351 
352 /** Destroy a dict_table_t's stats latch.
353 This function is only called from either single threaded environment
354 or from a thread that has not shared the table object with other threads.
355 @param[in,out]	table	table whose stats latch to destroy */
dict_table_stats_latch_destroy(dict_table_t * table)356 void dict_table_stats_latch_destroy(dict_table_t *table) {
357   if (table->stats_latch_created == os_once::DONE &&
358       table->stats_latch != nullptr) {
359     dict_table_stats_latch_free(table);
360   }
361 }
362 
363 /** Lock the appropriate latch to protect a given table's statistics.
364 @param[in]	table		table whose stats to lock
365 @param[in]	latch_mode	RW_S_LATCH or RW_X_LATCH */
dict_table_stats_lock(dict_table_t * table,ulint latch_mode)366 void dict_table_stats_lock(dict_table_t *table, ulint latch_mode) {
367   ut_ad(table != nullptr);
368   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
369 
370   os_once::do_or_wait_for_done(&table->stats_latch_created,
371                                dict_table_stats_latch_alloc, table);
372 
373   if (table->stats_latch == nullptr) {
374     /* This is a dummy table object that is private in the current
375     thread and is not shared between multiple threads, thus we
376     skip any locking. */
377     return;
378   }
379 
380   switch (latch_mode) {
381     case RW_S_LATCH:
382       rw_lock_s_lock(table->stats_latch);
383       break;
384     case RW_X_LATCH:
385       rw_lock_x_lock(table->stats_latch);
386       break;
387     case RW_NO_LATCH:
388       /* fall through */
389     default:
390       ut_error;
391   }
392 }
393 
394 /** Unlock the latch that has been locked by dict_table_stats_lock().
395 @param[in]	table		table whose stats to unlock
396 @param[in]	latch_mode	RW_S_LATCH or RW_X_LATCH */
dict_table_stats_unlock(dict_table_t * table,ulint latch_mode)397 void dict_table_stats_unlock(dict_table_t *table, ulint latch_mode) {
398   ut_ad(table != nullptr);
399   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
400 
401   if (table->stats_latch == nullptr) {
402     /* This is a dummy table object that is private in the current
403     thread and is not shared between multiple threads, thus we
404     skip any locking. */
405     return;
406   }
407 
408   switch (latch_mode) {
409     case RW_S_LATCH:
410       rw_lock_s_unlock(table->stats_latch);
411       break;
412     case RW_X_LATCH:
413       rw_lock_x_unlock(table->stats_latch);
414       break;
415     case RW_NO_LATCH:
416       /* fall through */
417     default:
418       ut_error;
419   }
420 }
421 
422 /** Try to drop any indexes after an aborted index creation.
423  This can also be after a server kill during DROP INDEX. */
dict_table_try_drop_aborted(dict_table_t * table,table_id_t table_id,ulint ref_count)424 static void dict_table_try_drop_aborted(
425     dict_table_t *table, /*!< in: table, or NULL if it
426                          needs to be looked up again */
427     table_id_t table_id, /*!< in: table identifier */
428     ulint ref_count)     /*!< in: expected table->n_ref_count */
429 {
430   trx_t *trx;
431 
432   trx = trx_allocate_for_background();
433   trx->op_info = "try to drop any indexes after an aborted index creation";
434   row_mysql_lock_data_dictionary(trx);
435   trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
436 
437   if (table == nullptr) {
438     table = dd_table_open_on_id(table_id, nullptr, nullptr, true, true);
439 
440     /* Decrement the ref count. The table is MDL locked, so should
441     not be dropped */
442     if (table) {
443       dd_table_close(table, nullptr, nullptr, true);
444     }
445   } else {
446     ut_ad(table->id == table_id);
447   }
448 
449   if (table && table->get_ref_count() == ref_count && table->drop_aborted) {
450     /* Silence a debug assertion in row_merge_drop_indexes(). */
451     ut_d(table->acquire());
452     row_merge_drop_indexes(trx, table, TRUE);
453     ut_d(table->release());
454     ut_ad(table->get_ref_count() == ref_count);
455     trx_commit_for_mysql(trx);
456   }
457 
458   row_mysql_unlock_data_dictionary(trx);
459   trx_free_for_background(trx);
460 }
461 
462 /** When opening a table,
463  try to drop any indexes after an aborted index creation.
464  Release the dict_sys->mutex. */
dict_table_try_drop_aborted_and_mutex_exit(dict_table_t * table,ibool try_drop)465 static void dict_table_try_drop_aborted_and_mutex_exit(
466     dict_table_t *table, /*!< in: table (may be NULL) */
467     ibool try_drop)      /*!< in: FALSE if should try to
468                          drop indexes whose online creation
469                          was aborted */
470 {
471   if (try_drop && table != nullptr && table->drop_aborted &&
472       table->get_ref_count() == 1 && table->first_index()) {
473     /* Attempt to drop the indexes whose online creation
474     was aborted. */
475     table_id_t table_id = table->id;
476 
477     mutex_exit(&dict_sys->mutex);
478 
479     dict_table_try_drop_aborted(table, table_id, 1);
480   } else {
481     mutex_exit(&dict_sys->mutex);
482   }
483 }
484 #endif /* !UNIV_HOTBACKUP */
485 
486 /** Decrements the count of open handles to a table. */
dict_table_close(dict_table_t * table,ibool dict_locked,ibool try_drop)487 void dict_table_close(dict_table_t *table, /*!< in/out: table */
488                       ibool dict_locked, /*!< in: TRUE=data dictionary locked */
489                       ibool try_drop)    /*!< in: TRUE=try to drop any orphan
490                                          indexes after an aborted online
491                                          index creation */
492 {
493   ibool drop_aborted;
494 
495   ut_a(table->get_ref_count() > 0);
496 
497 #ifndef UNIV_HOTBACKUP
498 #ifdef UNIV_DEBUG
499   if (!table->is_intrinsic()) {
500     /* This is now only for validation in debug mode */
501     if (!dict_locked) {
502       mutex_enter(&dict_sys->mutex);
503     }
504 
505     ut_ad(dict_lru_validate());
506 
507     if (table->can_be_evicted) {
508       ut_ad(dict_lru_find_table(table));
509     } else {
510       ut_ad(dict_non_lru_find_table(table));
511     }
512 
513     if (!dict_locked) {
514       mutex_exit(&dict_sys->mutex);
515     }
516   }
517 #endif /* UNIV_DEBUG */
518 #endif /* !UNIV_HOTBACKUP */
519 
520   if (!table->is_intrinsic()) {
521     /* Ask for lock to prevent concurrent table open,
522     in case the race of n_ref_count and stat_initialized in
523     dict_stats_deinit(). See dict_table_t::acquire_with_lock() too.
524     We don't actually need dict_sys mutex any more here. */
525     table->lock();
526   }
527 
528   drop_aborted = try_drop && table->drop_aborted &&
529                  table->get_ref_count() == 1 && table->first_index();
530 
531   table->release();
532 
533 #ifndef UNIV_HOTBACKUP
534   /* Intrinsic table is not added to dictionary cache so skip other
535   cache specific actions. */
536   if (table->is_intrinsic()) {
537     return;
538   }
539 
540   /* Force persistent stats re-read upon next open of the table
541   so that FLUSH TABLE can be used to forcibly fetch stats from disk
542   if they have been manually modified. We reset table->stat_initialized
543   only if table reference count is 0 because we do not want too frequent
544   stats re-reads (e.g. in other cases than FLUSH TABLE). */
545   if (strchr(table->name.m_name, '/') != nullptr &&
546       table->get_ref_count() == 0 && dict_stats_is_persistent_enabled(table)) {
547     dict_stats_deinit(table);
548   }
549 
550   if (!dict_locked) {
551     table_id_t table_id = table->id;
552 
553     if (drop_aborted) {
554       ut_ad(0);
555       dict_table_try_drop_aborted(nullptr, table_id, 0);
556     }
557   }
558 #endif /* !UNIV_HOTBACKUP */
559 
560   if (!table->is_intrinsic()) {
561     table->unlock();
562   }
563 }
564 
565 #ifndef UNIV_HOTBACKUP
566 /** Closes the only open handle to a table and drops a table while assuring
567  that dict_sys->mutex is held the whole time.  This assures that the table
568  is not evicted after the close when the count of open handles goes to zero.
569  Because dict_sys->mutex is held, we do not need to call
570  dict_table_prevent_eviction().  */
dict_table_close_and_drop(trx_t * trx,dict_table_t * table)571 void dict_table_close_and_drop(
572     trx_t *trx,          /*!< in: data dictionary transaction */
573     dict_table_t *table) /*!< in/out: table */
574 {
575   ut_ad(mutex_own(&dict_sys->mutex));
576   ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
577   ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
578 
579   dict_table_close(table, TRUE, FALSE);
580 
581 #if defined UNIV_DEBUG || defined UNIV_DDL_DEBUG
582   /* Nobody should have initialized the stats of the newly created
583   table when this is called. So we know that it has not been added
584   for background stats gathering. */
585   ut_a(!table->stat_initialized);
586 #endif /* UNIV_DEBUG || UNIV_DDL_DEBUG */
587 
588   row_merge_drop_table(trx, table);
589 }
590 
591 /** Check if the table has a given (non_virtual) column.
592 @param[in]	table		table object
593 @param[in]	col_name	column name
594 @param[in]	col_nr		column number guessed, 0 as default
595 @return column number if the table has the specified column,
596 otherwise table->n_def */
dict_table_has_column(const dict_table_t * table,const char * col_name,ulint col_nr)597 ulint dict_table_has_column(const dict_table_t *table, const char *col_name,
598                             ulint col_nr) {
599   ulint col_max = table->n_def;
600 
601   ut_ad(table);
602   ut_ad(col_name);
603   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
604 
605   if (col_nr < col_max &&
606       innobase_strcasecmp(col_name, table->get_col_name(col_nr)) == 0) {
607     return (col_nr);
608   }
609 
610   /** The order of column may changed, check it with other columns */
611   for (ulint i = 0; i < col_max; i++) {
612     if (i != col_nr &&
613         innobase_strcasecmp(col_name, table->get_col_name(i)) == 0) {
614       return (i);
615     }
616   }
617 
618   return (col_max);
619 }
620 
621 /** Returns a virtual column's name.
622 @param[in]	table	target table
623 @param[in]	col_nr	virtual column number (nth virtual column)
624 @return column name or NULL if column number out of range. */
dict_table_get_v_col_name(const dict_table_t * table,ulint col_nr)625 const char *dict_table_get_v_col_name(const dict_table_t *table, ulint col_nr) {
626   const char *s;
627 
628   ut_ad(table);
629   ut_ad(col_nr < table->n_v_def);
630   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
631 
632   if (col_nr >= table->n_v_def) {
633     return (nullptr);
634   }
635 
636   s = table->v_col_names;
637 
638   if (s != nullptr) {
639     for (ulint i = 0; i < col_nr; i++) {
640       s += strlen(s) + 1;
641     }
642   }
643 
644   return (s);
645 }
646 
647 /** Search virtual column's position in InnoDB according to its position
648 in original table's position
649 @param[in]	table	target table
650 @param[in]	col_nr	column number (nth column in the MySQL table)
651 @return virtual column's position in InnoDB, ULINT_UNDEFINED if not find */
dict_table_get_v_col_pos_for_mysql(const dict_table_t * table,ulint col_nr)652 static ulint dict_table_get_v_col_pos_for_mysql(const dict_table_t *table,
653                                                 ulint col_nr) {
654   ulint i;
655 
656   ut_ad(table);
657   ut_ad(col_nr < static_cast<ulint>(table->n_t_def));
658   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
659 
660   for (i = 0; i < table->n_v_def; i++) {
661     if (col_nr == dict_get_v_col_mysql_pos(table->v_cols[i].m_col.ind)) {
662       break;
663     }
664   }
665 
666   if (i == table->n_v_def) {
667     return (ULINT_UNDEFINED);
668   }
669 
670   return (i);
671 }
672 
673 /** Returns a virtual column's name according to its original
674 MySQL table position.
675 @param[in]	table	target table
676 @param[in]	col_nr	column number (nth column in the table)
677 @return column name. */
dict_table_get_v_col_name_mysql(const dict_table_t * table,ulint col_nr)678 const char *dict_table_get_v_col_name_mysql(const dict_table_t *table,
679                                             ulint col_nr) {
680   ulint i = dict_table_get_v_col_pos_for_mysql(table, col_nr);
681 
682   if (i == ULINT_UNDEFINED) {
683     return (nullptr);
684   }
685 
686   return (dict_table_get_v_col_name(table, i));
687 }
688 
689 /** Get nth virtual column according to its original MySQL table position
690 @param[in]	table	target table
691 @param[in]	col_nr	column number in MySQL Table definition
692 @return dict_v_col_t ptr */
dict_table_get_nth_v_col_mysql(const dict_table_t * table,ulint col_nr)693 dict_v_col_t *dict_table_get_nth_v_col_mysql(const dict_table_t *table,
694                                              ulint col_nr) {
695   ulint i = dict_table_get_v_col_pos_for_mysql(table, col_nr);
696 
697   if (i == ULINT_UNDEFINED) {
698     return (nullptr);
699   }
700 
701   return (dict_table_get_nth_v_col(table, i));
702 }
703 
704 /** Allocate and init the autoinc latch of a given table.
705 This function must not be called concurrently on the same table object.
706 @param[in,out]	table_void	table whose autoinc latch to create */
dict_table_autoinc_alloc(void * table_void)707 static void dict_table_autoinc_alloc(void *table_void) {
708   dict_table_t *table = static_cast<dict_table_t *>(table_void);
709 
710   table->autoinc_mutex = UT_NEW_NOKEY(ib_mutex_t());
711   ut_a(table->autoinc_mutex != nullptr);
712   mutex_create(LATCH_ID_AUTOINC, table->autoinc_mutex);
713 
714   table->autoinc_persisted_mutex = UT_NEW_NOKEY(ib_mutex_t());
715   ut_a(table->autoinc_persisted_mutex != nullptr);
716   mutex_create(LATCH_ID_PERSIST_AUTOINC, table->autoinc_persisted_mutex);
717 }
718 
719 /** Allocate and init the zip_pad_mutex of a given index.
720 This function must not be called concurrently on the same index object.
721 @param[in,out]	index_void	index whose zip_pad_mutex to create */
dict_index_zip_pad_alloc(void * index_void)722 static void dict_index_zip_pad_alloc(void *index_void) {
723   dict_index_t *index = static_cast<dict_index_t *>(index_void);
724   index->zip_pad.mutex = UT_NEW_NOKEY(SysMutex());
725   ut_a(index->zip_pad.mutex != nullptr);
726   mutex_create(LATCH_ID_ZIP_PAD_MUTEX, index->zip_pad.mutex);
727 }
728 
729 /** Acquire the autoinc lock. */
dict_table_autoinc_lock(dict_table_t * table)730 void dict_table_autoinc_lock(dict_table_t *table) /*!< in/out: table */
731 {
732   os_once::do_or_wait_for_done(&table->autoinc_mutex_created,
733                                dict_table_autoinc_alloc, table);
734 
735   mutex_enter(table->autoinc_mutex);
736 }
737 
738 /** Acquire the zip_pad_mutex latch.
739 @param[in,out]	index	the index whose zip_pad_mutex to acquire.*/
dict_index_zip_pad_lock(dict_index_t * index)740 static void dict_index_zip_pad_lock(dict_index_t *index) {
741   os_once::do_or_wait_for_done(&index->zip_pad.mutex_created,
742                                dict_index_zip_pad_alloc, index);
743 
744   mutex_enter(index->zip_pad.mutex);
745 }
746 
747 /** Unconditionally set the autoinc counter. */
dict_table_autoinc_initialize(dict_table_t * table,ib_uint64_t value)748 void dict_table_autoinc_initialize(
749     dict_table_t *table, /*!< in/out: table */
750     ib_uint64_t value)   /*!< in: next value to assign to a row */
751 {
752   ut_ad(dict_table_autoinc_own(table));
753 
754   table->autoinc = value;
755 }
756 
757 /** Write redo logs for autoinc counter that is to be inserted, or to
758 update some existing smaller one to bigger.
759 @param[in,out]	table	InnoDB table object
760 @param[in]	value	AUTOINC counter to log
761 @param[in,out]	mtr	mini-transaction */
dict_table_autoinc_log(dict_table_t * table,uint64_t value,mtr_t * mtr)762 void dict_table_autoinc_log(dict_table_t *table, uint64_t value, mtr_t *mtr) {
763   bool log = false;
764 
765   mutex_enter(table->autoinc_persisted_mutex);
766 
767   if (table->autoinc_persisted < value) {
768     dict_table_autoinc_persisted_update(table, value);
769 
770     /* The only concern here is some concurrent thread may
771     change the dirty_status to METADATA_BUFFERED. And the
772     only function is dict_table_persist_to_dd_table_buffer_low(),
773     which could be called by checkpoint and will first set the
774     dirty_status to METADATA_BUFFERED, and then write back
775     the latest changes to DDTableBuffer, all of which are under
776     protection of dict_persist->mutex.
777 
778     If that function sets the dirty_status to METADATA_BUFFERED
779     first, below checking will force current thread to wait on
780     dict_persist->mutex. Above update to AUTOINC would be either
781     written back to DDTableBuffer or not. But the redo logs for
782     current change won't be counted into current checkpoint.
783     See how log_sys->dict_max_allowed_checkpoint_lsn is set.
784     So even a crash after below redo log flushed, no change lost.
785 
786     If that function sets the dirty_status after below checking,
787     which means current change would be written back to
788     DDTableBuffer. It's also safe. */
789     if (table->dirty_status.load() == METADATA_DIRTY) {
790       ut_ad(table->in_dirty_dict_tables_list);
791     } else {
792       dict_table_mark_dirty(table);
793     }
794 
795     log = true;
796   }
797 
798   mutex_exit(table->autoinc_persisted_mutex);
799 
800   if (log) {
801     PersistentTableMetadata metadata(table->id, table->version);
802     metadata.set_autoinc(value);
803 
804     Persister *persister = dict_persist->persisters->get(PM_TABLE_AUTO_INC);
805     persister->write_log(table->id, metadata, mtr);
806     /* No need to flush due to performance reason */
807   }
808 }
809 
810 /** Get all the FTS indexes on a table.
811 @param[in]	table	table
812 @param[out]	indexes	all FTS indexes on this table
813 @return number of FTS indexes */
dict_table_get_all_fts_indexes(dict_table_t * table,ib_vector_t * indexes)814 ulint dict_table_get_all_fts_indexes(dict_table_t *table,
815                                      ib_vector_t *indexes) {
816   dict_index_t *index;
817 
818   ut_a(ib_vector_size(indexes) == 0);
819 
820   for (index = table->first_index(); index; index = index->next()) {
821     if (index->type == DICT_FTS) {
822       ib_vector_push(indexes, &index);
823     }
824   }
825 
826   return (ib_vector_size(indexes));
827 }
828 
829 /** Reads the next autoinc value (== autoinc counter value), 0 if not yet
830  initialized.
831  @return value for a new row, or 0 */
dict_table_autoinc_read(const dict_table_t * table)832 ib_uint64_t dict_table_autoinc_read(const dict_table_t *table) /*!< in: table */
833 {
834   ut_ad(dict_table_autoinc_own(table));
835 
836   return (table->autoinc);
837 }
838 
839 /** Updates the autoinc counter if the value supplied is greater than the
840  current value. */
dict_table_autoinc_update_if_greater(dict_table_t * table,ib_uint64_t value)841 void dict_table_autoinc_update_if_greater(
842 
843     dict_table_t *table, /*!< in/out: table */
844     ib_uint64_t value)   /*!< in: value which was assigned to a row */
845 {
846   ut_ad(dict_table_autoinc_own(table));
847 
848   if (value > table->autoinc) {
849     table->autoinc = value;
850   }
851 }
852 
853 /** Release the autoinc lock. */
dict_table_autoinc_unlock(dict_table_t * table)854 void dict_table_autoinc_unlock(dict_table_t *table) /*!< in/out: table */
855 {
856   mutex_exit(table->autoinc_mutex);
857 }
858 
859 /** Returns TRUE if the index contains a column or a prefix of that column.
860 @param[in]	index		index
861 @param[in]	n		column number
862 @param[in]	is_virtual	whether it is a virtual col
863 @return true if contains the column or its prefix */
dict_index_contains_col_or_prefix(const dict_index_t * index,ulint n,bool is_virtual)864 ibool dict_index_contains_col_or_prefix(const dict_index_t *index, ulint n,
865                                         bool is_virtual) {
866   const dict_field_t *field;
867   const dict_col_t *col;
868   ulint pos;
869   ulint n_fields;
870 
871   ut_ad(index);
872   ut_ad(index->magic_n == DICT_INDEX_MAGIC_N);
873 
874   if (index->is_clustered()) {
875     return (TRUE);
876   }
877 
878   if (is_virtual) {
879     col = &dict_table_get_nth_v_col(index->table, n)->m_col;
880   } else {
881     col = index->table->get_col(n);
882   }
883 
884   n_fields = dict_index_get_n_fields(index);
885 
886   for (pos = 0; pos < n_fields; pos++) {
887     field = index->get_field(pos);
888 
889     if (col == field->col) {
890       return (TRUE);
891     }
892   }
893 
894   return (FALSE);
895 }
896 
897 /** Looks for a matching field in an index. The column has to be the same. The
898  column in index must be complete, or must contain a prefix longer than the
899  column in index2. That is, we must be able to construct the prefix in index2
900  from the prefix in index.
901  @return position in internal representation of the index;
902  ULINT_UNDEFINED if not contained */
dict_index_get_nth_field_pos(const dict_index_t * index,const dict_index_t * index2,ulint n)903 ulint dict_index_get_nth_field_pos(
904     const dict_index_t *index,  /*!< in: index from which to search */
905     const dict_index_t *index2, /*!< in: index */
906     ulint n)                    /*!< in: field number in index2 */
907 {
908   const dict_field_t *field;
909   const dict_field_t *field2;
910   ulint n_fields;
911   ulint pos;
912 
913   ut_ad(index);
914   ut_ad(index->magic_n == DICT_INDEX_MAGIC_N);
915 
916   field2 = index2->get_field(n);
917 
918   n_fields = dict_index_get_n_fields(index);
919 
920   /* Are we looking for a MBR (Minimum Bound Box) field of
921   a spatial index */
922   bool is_mbr_fld = (n == 0 && dict_index_is_spatial(index2));
923 
924   for (pos = 0; pos < n_fields; pos++) {
925     field = index->get_field(pos);
926 
927     /* The first field of a spatial index is a transformed
928     MBR (Minimum Bound Box) field made out of original column,
929     so its field->col still points to original cluster index
930     col, but the actual content is different. So we cannot
931     consider them equal if neither of them is MBR field */
932     if (pos == 0 && dict_index_is_spatial(index) && !is_mbr_fld) {
933       continue;
934     }
935 
936     if (field->col == field2->col &&
937         (field->prefix_len == 0 || (field->prefix_len >= field2->prefix_len &&
938                                     field2->prefix_len != 0))) {
939       return (pos);
940     }
941   }
942 
943   return (ULINT_UNDEFINED);
944 }
945 
946 /** Looks for non-virtual column n position in the clustered index.
947  @return position in internal representation of the clustered index */
dict_table_get_nth_col_pos(const dict_table_t * table,ulint n)948 ulint dict_table_get_nth_col_pos(const dict_table_t *table, /*!< in: table */
949                                  ulint n) /*!< in: column number */
950 {
951   return (table->first_index()->get_col_pos(n));
952 }
953 
954 /** Get the innodb column position for a non-virtual column according to
955 its original MySQL table position n
956 @param[in]	table	table
957 @param[in]	n	MySQL column position
958 @return column position in InnoDB */
dict_table_mysql_pos_to_innodb(const dict_table_t * table,ulint n)959 ulint dict_table_mysql_pos_to_innodb(const dict_table_t *table, ulint n) {
960   ut_ad(n < table->n_t_cols);
961 
962   if (table->n_v_def == 0) {
963     /* No virtual columns, the MySQL position is the same
964     as InnoDB position */
965     return (n);
966   }
967 
968   /* Find out how many virtual columns are stored in front of 'n' */
969   ulint v_before = 0;
970   for (ulint i = 0; i < table->n_v_def; ++i) {
971     if (table->v_cols[i].m_col.ind > n) {
972       break;
973     }
974 
975     ++v_before;
976   }
977 
978   ut_ad(n >= v_before);
979 
980   return (n - v_before);
981 }
982 
983 /** Checks if a column is in the ordering columns of the clustered index of a
984  table. Column prefixes are treated like whole columns.
985  @return true if the column, or its prefix, is in the clustered key */
dict_table_col_in_clustered_key(const dict_table_t * table,ulint n)986 ibool dict_table_col_in_clustered_key(
987     const dict_table_t *table, /*!< in: table */
988     ulint n)                   /*!< in: column number */
989 {
990   const dict_index_t *index;
991   const dict_field_t *field;
992   const dict_col_t *col;
993   ulint pos;
994   ulint n_fields;
995 
996   ut_ad(table);
997 
998   col = table->get_col(n);
999 
1000   index = table->first_index();
1001 
1002   n_fields = dict_index_get_n_unique(index);
1003 
1004   for (pos = 0; pos < n_fields; pos++) {
1005     field = index->get_field(pos);
1006 
1007     if (col == field->col) {
1008       return (TRUE);
1009     }
1010   }
1011 
1012   return (FALSE);
1013 }
1014 #endif /* !UNIV_HOTBACKUP */
1015 
1016 /** Inits the data dictionary module. */
dict_init(void)1017 void dict_init(void) {
1018   dict_operation_lock =
1019       static_cast<rw_lock_t *>(ut_zalloc_nokey(sizeof(*dict_operation_lock)));
1020 
1021   dict_sys = static_cast<dict_sys_t *>(ut_zalloc_nokey(sizeof(*dict_sys)));
1022 
1023   UT_LIST_INIT(dict_sys->table_LRU, &dict_table_t::table_LRU);
1024   UT_LIST_INIT(dict_sys->table_non_LRU, &dict_table_t::table_LRU);
1025 
1026   mutex_create(LATCH_ID_DICT_SYS, &dict_sys->mutex);
1027 
1028   dict_sys->table_hash = hash_create(
1029       buf_pool_get_curr_size() / (DICT_POOL_PER_TABLE_HASH * UNIV_WORD_SIZE));
1030 
1031   dict_sys->table_id_hash = hash_create(
1032       buf_pool_get_curr_size() / (DICT_POOL_PER_TABLE_HASH * UNIV_WORD_SIZE));
1033 
1034   rw_lock_create(dict_operation_lock_key, dict_operation_lock,
1035                  SYNC_DICT_OPERATION);
1036 
1037 #ifndef UNIV_HOTBACKUP
1038   if (!srv_read_only_mode) {
1039     dict_foreign_err_file = os_file_create_tmpfile(nullptr);
1040     ut_a(dict_foreign_err_file);
1041   }
1042 #endif /* !UNIV_HOTBACKUP */
1043 
1044   mutex_create(LATCH_ID_DICT_FOREIGN_ERR, &dict_foreign_err_mutex);
1045 }
1046 
1047 #ifndef UNIV_HOTBACKUP
1048 /** Move to the most recently used segment of the LRU list. */
dict_move_to_mru(dict_table_t * table)1049 void dict_move_to_mru(dict_table_t *table) /*!< in: table to move to MRU */
1050 {
1051   ut_ad(mutex_own(&dict_sys->mutex));
1052   ut_ad(dict_lru_validate());
1053   ut_ad(dict_lru_find_table(table));
1054 
1055   ut_a(table->can_be_evicted);
1056 
1057   UT_LIST_REMOVE(dict_sys->table_LRU, table);
1058 
1059   UT_LIST_ADD_FIRST(dict_sys->table_LRU, table);
1060 
1061   ut_ad(dict_lru_validate());
1062 }
1063 
1064 /** Returns a table object and increment its open handle count.
1065  NOTE! This is a high-level function to be used mainly from outside the
1066  'dict' module. Inside this directory dict_table_get_low
1067  is usually the appropriate function.
1068  @return table, NULL if does not exist */
dict_table_open_on_name(const char * table_name,ibool dict_locked,ibool try_drop,dict_err_ignore_t ignore_err)1069 dict_table_t *dict_table_open_on_name(
1070     const char *table_name,       /*!< in: table name */
1071     ibool dict_locked,            /*!< in: TRUE=data dictionary locked */
1072     ibool try_drop,               /*!< in: TRUE=try to drop any orphan
1073                                   indexes after an aborted online
1074                                   index creation */
1075     dict_err_ignore_t ignore_err) /*!< in: error to be ignored when
1076                                   loading a table definition */
1077 {
1078   dict_table_t *table;
1079   DBUG_TRACE;
1080   DBUG_PRINT("dict_table_open_on_name", ("table: '%s'", table_name));
1081 
1082   if (!dict_locked) {
1083     mutex_enter(&dict_sys->mutex);
1084   }
1085 
1086   ut_ad(table_name);
1087   ut_ad(mutex_own(&dict_sys->mutex));
1088 
1089   std::string table_str(table_name);
1090   /* Check and convert 5.7 table name. We always keep 8.0 format name in cache
1091   during upgrade. */
1092   if (dict_name::is_partition(table_name)) {
1093     dict_name::rebuild(table_str);
1094   }
1095   table = dict_table_check_if_in_cache_low(table_str.c_str());
1096 
1097   if (table == nullptr) {
1098     table = dict_load_table(table_name, true, ignore_err);
1099   }
1100 
1101   ut_ad(!table || table->cached);
1102 
1103   if (table != nullptr) {
1104     if (ignore_err == DICT_ERR_IGNORE_NONE && table->is_corrupted()) {
1105       /* Make life easy for drop table. */
1106       dict_table_prevent_eviction(table);
1107 
1108       if (!dict_locked) {
1109         mutex_exit(&dict_sys->mutex);
1110       }
1111 
1112       ib::info(ER_IB_MSG_175) << "Table " << table->name
1113                               << " is corrupted. Please drop the table"
1114                                  " and recreate it";
1115       return nullptr;
1116     }
1117 
1118     if (table->can_be_evicted) {
1119       dict_move_to_mru(table);
1120     }
1121 
1122     table->acquire();
1123   }
1124 
1125   ut_ad(dict_lru_validate());
1126 
1127   if (!dict_locked) {
1128     dict_table_try_drop_aborted_and_mutex_exit(table, try_drop);
1129   }
1130 
1131   return table;
1132 }
1133 #endif /* !UNIV_HOTBACKUP */
1134 
1135 /** Adds system columns to a table object. */
dict_table_add_system_columns(dict_table_t * table,mem_heap_t * heap)1136 void dict_table_add_system_columns(dict_table_t *table, /*!< in/out: table */
1137                                    mem_heap_t *heap) /*!< in: temporary heap */
1138 {
1139   ut_ad(table);
1140   ut_ad(table->n_def == (table->n_cols - table->get_n_sys_cols()));
1141   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
1142   ut_ad(!table->cached);
1143 
1144   /* NOTE: the system columns MUST be added in the following order
1145   (so that they can be indexed by the numerical value of DATA_ROW_ID,
1146   etc.) and as the last columns of the table memory object.
1147   The clustered index will not always physically contain all system
1148   columns.
1149   Intrinsic table don't need DB_ROLL_PTR as UNDO logging is turned off
1150   for these tables. */
1151 
1152   dict_mem_table_add_col(table, heap, "DB_ROW_ID", DATA_SYS,
1153                          DATA_ROW_ID | DATA_NOT_NULL, DATA_ROW_ID_LEN, false);
1154 
1155   dict_mem_table_add_col(table, heap, "DB_TRX_ID", DATA_SYS,
1156                          DATA_TRX_ID | DATA_NOT_NULL, DATA_TRX_ID_LEN, false);
1157 
1158   if (!table->is_intrinsic()) {
1159     dict_mem_table_add_col(table, heap, "DB_ROLL_PTR", DATA_SYS,
1160                            DATA_ROLL_PTR | DATA_NOT_NULL, DATA_ROLL_PTR_LEN,
1161                            false);
1162 
1163     /* This check reminds that if a new system column is added to
1164     the program, it should be dealt with here */
1165   }
1166 }
1167 
1168 #ifndef UNIV_HOTBACKUP
1169 /** Mark if table has big rows.
1170 @param[in,out]	table	table handler */
dict_table_set_big_rows(dict_table_t * table)1171 void dict_table_set_big_rows(dict_table_t *table) {
1172   ulint row_len = 0;
1173   for (ulint i = 0; i < table->n_def; i++) {
1174     ulint col_len = table->get_col(i)->get_max_size();
1175 
1176     row_len += col_len;
1177 
1178     /* If we have a single unbounded field, or several gigantic
1179     fields, mark the maximum row size as BIG_ROW_SIZE. */
1180     if (row_len >= BIG_ROW_SIZE || col_len >= BIG_ROW_SIZE) {
1181       row_len = BIG_ROW_SIZE;
1182 
1183       break;
1184     }
1185   }
1186 
1187   table->big_rows = (row_len >= BIG_ROW_SIZE) ? TRUE : FALSE;
1188 }
1189 
/** Adds a table object to the dictionary cache: inserts it into the name
and id hash tables of dict_sys, links it into the LRU or the non-LRU list,
and accounts for its memory in dict_sys->size. A table with the same name
or the same id must not be in the cache yet (enforced with ut_a).
The caller must hold dict_sys->mutex (asserted in debug builds).
@param[in,out]	table		table
@param[in]	can_be_evicted	true if can be evicted
@param[in,out]	heap		temporary heap; not referenced in this
                                function body
*/
void dict_table_add_to_cache(dict_table_t *table, ibool can_be_evicted,
                             mem_heap_t *heap) {
  ulint fold;
  ulint id_fold;

  ut_ad(dict_lru_validate());
  ut_ad(mutex_own(&dict_sys->mutex));

  /* Set before the hash searches below, whose per-element assertion
  checks the 'cached' flag of every table they visit. */
  table->cached = true;

  /* Hash values for the name hash and the id hash, respectively. */
  fold = ut_fold_string(table->name.m_name);
  id_fold = ut_fold_ull(table->id);

  dict_table_set_big_rows(table);

  /* Look for a table with the same name: error if such exists */
  {
    dict_table_t *table2;
    HASH_SEARCH(name_hash, dict_sys->table_hash, fold, dict_table_t *, table2,
                ut_ad(table2->cached),
                !strcmp(table2->name.m_name, table->name.m_name));
    ut_a(table2 == nullptr);

#ifdef UNIV_DEBUG
    /* Look for the same table pointer with a different name */
    HASH_SEARCH_ALL(name_hash, dict_sys->table_hash, dict_table_t *, table2,
                    ut_ad(table2->cached), table2 == table);
    ut_ad(table2 == nullptr);
#endif /* UNIV_DEBUG */
  }

  /* Look for a table with the same id: error if such exists */
  {
    dict_table_t *table2;
    HASH_SEARCH(id_hash, dict_sys->table_id_hash, id_fold, dict_table_t *,
                table2, ut_ad(table2->cached), table2->id == table->id);
    ut_a(table2 == nullptr);

#ifdef UNIV_DEBUG
    /* Look for the same table pointer with a different id */
    HASH_SEARCH_ALL(id_hash, dict_sys->table_id_hash, dict_table_t *, table2,
                    ut_ad(table2->cached), table2 == table);
    ut_ad(table2 == nullptr);
#endif /* UNIV_DEBUG */
  }

  /* Add table to hash table of tables */
  HASH_INSERT(dict_table_t, name_hash, dict_sys->table_hash, fold, table);

  /* Add table to hash table of tables based on table id */
  HASH_INSERT(dict_table_t, id_hash, dict_sys->table_id_hash, id_fold, table);

  table->can_be_evicted = can_be_evicted;

  /* Evictable tables go to the front of the LRU list, all others to the
  front of the non-LRU list. */
  if (table->can_be_evicted) {
    UT_LIST_ADD_FIRST(dict_sys->table_LRU, table);
  } else {
    UT_LIST_ADD_FIRST(dict_sys->table_non_LRU, table);
  }

  ut_ad(dict_lru_validate());

  table->dirty_status.store(METADATA_CLEAN);

  /* Account for the table heap plus the name string including its
  terminating NUL. */
  dict_sys->size +=
      mem_heap_get_size(table->heap) + strlen(table->name.m_name) + 1;
  /* Debug-only tracing, active when the "dd_upgrade" keyword is set. */
  DBUG_EXECUTE_IF(
      "dd_upgrade", if (srv_is_upgrade_mode && srv_upgrade_old_undo_found) {
        ib::info(ER_IB_MSG_176) << "Adding table to cache: " << table->name;
      });
}
1266 
1267 /** Test whether a table can be evicted from the LRU cache.
1268  @return true if table can be evicted. */
dict_table_can_be_evicted(dict_table_t * table)1269 static ibool dict_table_can_be_evicted(
1270     dict_table_t *table) /*!< in: table to test */
1271 {
1272   ut_ad(mutex_own(&dict_sys->mutex));
1273   ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1274 
1275   ut_a(table->can_be_evicted);
1276   ut_a(table->foreign_set.empty());
1277   ut_a(table->referenced_set.empty());
1278 
1279   if (table->get_ref_count() == 0) {
1280     const dict_index_t *index;
1281 
1282     /* The transaction commit and rollback are called from
1283     outside the handler interface. This means that there is
1284     a window where the table->n_ref_count can be zero but
1285     the table instance is in "use". */
1286 
1287     if (lock_table_has_locks(table)) {
1288       return (FALSE);
1289     }
1290 
1291     for (index = table->first_index(); index != nullptr;
1292          index = index->next()) {
1293       const btr_search_t *info = btr_search_get_info(index);
1294 
1295       /* We are not allowed to free the in-memory index
1296       struct dict_index_t until all entries in the adaptive
1297       hash index that point to any of the page belonging to
1298       his b-tree index are dropped. This is so because
1299       dropping of these entries require access to
1300       dict_index_t struct. To avoid such scenario we keep
1301       a count of number of such pages in the search_info and
1302       only free the dict_index_t struct when this count
1303       drops to zero.
1304 
1305       See also: dict_index_remove_from_cache_low() */
1306 
1307       if (btr_search_info_get_ref_count(info, index) > 0) {
1308         return (FALSE);
1309       }
1310     }
1311 
1312     return (TRUE);
1313   }
1314 
1315   return (FALSE);
1316 }
1317 
1318 /** Make room in the table cache by evicting an unused table. The unused table
1319  should not be part of FK relationship and currently not used in any user
1320  transaction. There is no guarantee that it will remove a table.
1321  @return number of tables evicted. If the number of tables in the dict_LRU
1322  is less than max_tables it will not do anything. */
dict_make_room_in_cache(ulint max_tables,ulint pct_check)1323 ulint dict_make_room_in_cache(
1324     ulint max_tables, /*!< in: max tables allowed in cache */
1325     ulint pct_check)  /*!< in: max percent to check */
1326 {
1327   ulint i;
1328   ulint len;
1329   dict_table_t *table;
1330   ulint check_up_to;
1331   ulint n_evicted = 0;
1332 
1333   ut_a(pct_check > 0);
1334   ut_a(pct_check <= 100);
1335   ut_ad(mutex_own(&dict_sys->mutex));
1336   ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
1337   ut_ad(dict_lru_validate());
1338 
1339   i = len = UT_LIST_GET_LEN(dict_sys->table_LRU);
1340 
1341   if (len < max_tables) {
1342     return (0);
1343   }
1344 
1345   check_up_to = len - ((len * pct_check) / 100);
1346 
1347   /* Check for overflow */
1348   ut_a(i == 0 || check_up_to <= i);
1349 
1350   /* Find a suitable candidate to evict from the cache. Don't scan the
1351   entire LRU list. Only scan pct_check list entries. */
1352 
1353   for (table = UT_LIST_GET_LAST(dict_sys->table_LRU);
1354        table != nullptr && i > check_up_to && (len - n_evicted) > max_tables;
1355        --i) {
1356     dict_table_t *prev_table;
1357 
1358     prev_table = UT_LIST_GET_PREV(table_LRU, table);
1359 
1360     table->lock();
1361 
1362     if (dict_table_can_be_evicted(table)) {
1363       table->unlock();
1364       DBUG_EXECUTE_IF("crash_if_fts_table_is_evicted", {
1365         if (table->fts && dict_table_has_fts_index(table)) {
1366           ut_ad(0);
1367         }
1368       };);
1369       dict_table_remove_from_cache_low(table, TRUE);
1370 
1371       ++n_evicted;
1372     } else {
1373       table->unlock();
1374     }
1375 
1376     table = prev_table;
1377   }
1378 
1379   return (n_evicted);
1380 }
1381 
1382 /** Move a table to the non-LRU list from the LRU list. */
dict_table_move_from_lru_to_non_lru(dict_table_t * table)1383 void dict_table_move_from_lru_to_non_lru(
1384     dict_table_t *table) /*!< in: table to move from LRU to non-LRU */
1385 {
1386   ut_ad(mutex_own(&dict_sys->mutex));
1387   ut_ad(dict_lru_find_table(table));
1388 
1389   ut_a(table->can_be_evicted);
1390 
1391   UT_LIST_REMOVE(dict_sys->table_LRU, table);
1392 
1393   UT_LIST_ADD_LAST(dict_sys->table_non_LRU, table);
1394 
1395   table->can_be_evicted = FALSE;
1396 }
1397 #endif /* !UNIV_HOTBACKUP */
1398 
1399 /** Move a table to the LRU end from the non LRU list.
1400 @param[in]	table	InnoDB table object */
dict_table_move_from_non_lru_to_lru(dict_table_t * table)1401 void dict_table_move_from_non_lru_to_lru(dict_table_t *table) {
1402   ut_ad(mutex_own(&dict_sys->mutex));
1403   ut_ad(dict_non_lru_find_table(table));
1404 
1405   ut_a(!table->can_be_evicted);
1406 
1407   UT_LIST_REMOVE(dict_sys->table_non_LRU, table);
1408 
1409   UT_LIST_ADD_LAST(dict_sys->table_LRU, table);
1410 
1411   table->can_be_evicted = TRUE;
1412 }
1413 
1414 /** Look up an index in a table.
1415 @param[in]	table	table
1416 @param[in]	id	index identifier
1417 @return index
1418 @retval NULL if not found */
dict_table_find_index_on_id(const dict_table_t * table,const index_id_t & id)1419 static const dict_index_t *dict_table_find_index_on_id(
1420     const dict_table_t *table, const index_id_t &id) {
1421   for (const dict_index_t *index = table->first_index(); index != nullptr;
1422        index = index->next()) {
1423     if (index->space == id.m_space_id && index->id == id.m_index_id) {
1424       return (index);
1425     }
1426   }
1427 
1428   return (nullptr);
1429 }
1430 
1431 #ifndef UNIV_HOTBACKUP
1432 /** Look up an index.
1433 @param[in]	id	index identifier
1434 @return index or NULL if not found */
dict_index_find(const index_id_t & id)1435 const dict_index_t *dict_index_find(const index_id_t &id) {
1436   const dict_table_t *table;
1437 
1438   ut_ad(mutex_own(&dict_sys->mutex));
1439 
1440   for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU); table != nullptr;
1441        table = UT_LIST_GET_NEXT(table_LRU, table)) {
1442     const dict_index_t *index = dict_table_find_index_on_id(table, id);
1443     if (index != nullptr) {
1444       return (index);
1445     }
1446   }
1447 
1448   for (table = UT_LIST_GET_FIRST(dict_sys->table_non_LRU); table != nullptr;
1449        table = UT_LIST_GET_NEXT(table_LRU, table)) {
1450     const dict_index_t *index = dict_table_find_index_on_id(table, id);
1451     if (index != nullptr) {
1452       return (index);
1453     }
1454   }
1455 
1456   return (nullptr);
1457 }
1458 
1459 /** Function object to remove a foreign key constraint from the
1460 referenced_set of the referenced table.  The foreign key object is
1461 also removed from the dictionary cache.  The foreign key constraint
1462 is not removed from the foreign_set of the table containing the
1463 constraint. */
1464 struct dict_foreign_remove_partial {
operator ()dict_foreign_remove_partial1465   void operator()(dict_foreign_t *foreign) {
1466     dict_table_t *table = foreign->referenced_table;
1467     if (table != nullptr) {
1468       table->referenced_set.erase(foreign);
1469     }
1470     dict_foreign_free(foreign);
1471   }
1472 };
1473 
/** Renames a table object.
 @return DB_SUCCESS on success, otherwise an error code */
dict_table_rename_in_cache(dict_table_t * table,const char * new_name,ibool rename_also_foreigns)1476 dberr_t dict_table_rename_in_cache(
1477     dict_table_t *table,        /*!< in/out: table */
1478     const char *new_name,       /*!< in: new name */
1479     ibool rename_also_foreigns) /*!< in: in ALTER TABLE we want
1480                            to preserve the original table name
1481                            in constraints which reference it */
1482 {
1483   dberr_t err;
1484   dict_foreign_t *foreign;
1485   dict_index_t *index;
1486   ulint fold;
1487   char old_name[MAX_FULL_NAME_LEN + 1];
1488 
1489   ut_ad(mutex_own(&dict_sys->mutex));
1490 
1491   /* store the old/current name to an automatic variable */
1492   if (strlen(table->name.m_name) + 1 <= sizeof(old_name)) {
1493     strcpy(old_name, table->name.m_name);
1494   } else {
1495     ib::fatal(ER_IB_MSG_177) << "Too long table name: " << table->name
1496                              << ", max length is " << MAX_FULL_NAME_LEN;
1497   }
1498 
1499   fold = ut_fold_string(new_name);
1500 
1501   /* Look for a table with the same name: error if such exists */
1502   dict_table_t *table2;
1503   HASH_SEARCH(name_hash, dict_sys->table_hash, fold, dict_table_t *, table2,
1504               ut_ad(table2->cached),
1505               (ut_strcmp(table2->name.m_name, new_name) == 0));
1506 
1507   DBUG_EXECUTE_IF(
1508       "dict_table_rename_in_cache_failure",
1509       if (table2 == nullptr) { table2 = (dict_table_t *)-1; });
1510 
1511   if (table2 != nullptr) {
1512     ib::error(ER_IB_MSG_178)
1513         << "Cannot rename table '" << old_name << "' to '" << new_name
1514         << "' since the"
1515            " dictionary cache already contains '"
1516         << new_name << "'.";
1517 
1518     return (DB_ERROR);
1519   }
1520 
1521   /* If the table is stored in a single-table tablespace,
1522   rename the tablespace file. */
1523 
1524   if (dict_table_is_discarded(table)) {
1525     char *filepath;
1526 
1527     ut_ad(dict_table_is_file_per_table(table));
1528     ut_ad(!table->is_temporary());
1529 
1530     /* Make sure the data_dir_path is set. */
1531     dd_get_and_save_data_dir_path<dd::Table>(table, nullptr, true);
1532 
1533     std::string path = dict_table_get_datadir(table);
1534 
1535     filepath = Fil_path::make(path, table->name.m_name, IBD, true);
1536 
1537     if (filepath == nullptr) {
1538       return (DB_OUT_OF_MEMORY);
1539     }
1540 
1541     err = fil_delete_tablespace(table->space, BUF_REMOVE_ALL_NO_WRITE);
1542 
1543     ut_a(err == DB_SUCCESS || err == DB_TABLESPACE_NOT_FOUND ||
1544          err == DB_IO_ERROR);
1545 
1546     if (err == DB_IO_ERROR) {
1547       ib::info(ER_IB_MSG_179) << "IO error while deleting: " << table->space
1548                               << " during rename of '" << old_name << "' to"
1549                               << " '" << new_name << "'";
1550     }
1551 
1552     /* Delete any temp file hanging around. */
1553     os_file_type_t ftype;
1554     bool exists;
1555     if (os_file_status(filepath, &exists, &ftype) && exists &&
1556         !os_file_delete_if_exists(innodb_temp_file_key, filepath, nullptr)) {
1557       ib::info(ER_IB_MSG_180) << "Delete of " << filepath << " failed.";
1558     }
1559 
1560     ut_free(filepath);
1561 
1562   } else if (dict_table_is_file_per_table(table)) {
1563     char *new_path = nullptr;
1564     char *old_path = fil_space_get_first_path(table->space);
1565 
1566     ut_ad(!table->is_temporary());
1567 
1568     if (DICT_TF_HAS_DATA_DIR(table->flags)) {
1569       std::string new_ibd;
1570 
1571       new_ibd = Fil_path::make_new_path(old_path, new_name, IBD);
1572 
1573       new_path = mem_strdup(new_ibd.c_str());
1574 
1575       /* InnoDB adds the db directory to the data directory.
1576       If the RENAME changes database, then it is possible that
1577       the a directory named for the new db does not exist
1578       in this remote location. */
1579       err = os_file_create_subdirs_if_needed(new_path);
1580       if (err != DB_SUCCESS) {
1581         ut_free(old_path);
1582         ut_free(new_path);
1583         return (err);
1584       }
1585     } else {
1586       new_path = Fil_path::make_ibd_from_table_name(new_name);
1587     }
1588 
1589     /* New filepath must not exist. */
1590     err = fil_rename_tablespace_check(table->space, old_path, new_path, false);
1591     if (err != DB_SUCCESS) {
1592       ut_free(old_path);
1593       ut_free(new_path);
1594       return (err);
1595     }
1596 
1597     clone_mark_abort(true);
1598 
1599     std::string new_tablespace_name(new_name);
1600     dict_name::convert_to_space(new_tablespace_name);
1601 
1602     dberr_t err = fil_rename_tablespace(table->space, old_path,
1603                                         new_tablespace_name.c_str(), new_path);
1604 
1605     clone_mark_active();
1606 
1607     ut_free(old_path);
1608     ut_free(new_path);
1609 
1610     if (err != DB_SUCCESS) {
1611       return (err);
1612     }
1613   }
1614 
1615   err = log_ddl->write_rename_table_log(table, new_name, table->name.m_name);
1616   if (err != DB_SUCCESS) {
1617     return (err);
1618   }
1619 
1620   /* Remove table from the hash tables of tables */
1621   HASH_DELETE(dict_table_t, name_hash, dict_sys->table_hash,
1622               ut_fold_string(old_name), table);
1623 
1624   if (strlen(new_name) > strlen(table->name.m_name)) {
1625     /* We allocate MAX_FULL_NAME_LEN + 1 bytes here to avoid
1626     memory fragmentation, we assume a repeated calls of
1627     ut_realloc() with the same size do not cause fragmentation */
1628     ut_a(strlen(new_name) <= MAX_FULL_NAME_LEN);
1629 
1630     table->name.m_name = static_cast<char *>(
1631         ut_realloc(table->name.m_name, MAX_FULL_NAME_LEN + 1));
1632   }
1633   strcpy(table->name.m_name, new_name);
1634 
1635   /* Add table to hash table of tables */
1636   HASH_INSERT(dict_table_t, name_hash, dict_sys->table_hash, fold, table);
1637 
1638   dict_sys->size += strlen(new_name) - strlen(old_name);
1639   ut_a(dict_sys->size > 0);
1640 
1641   /* Update the table_name field in indexes */
1642   for (index = table->first_index(); index != nullptr; index = index->next()) {
1643     index->table_name = table->name.m_name;
1644   }
1645 
1646   if (!rename_also_foreigns) {
1647     /* In ALTER TABLE we think of the rename table operation
1648     in the direction table -> temporary table (#sql...)
1649     as dropping the table with the old name and creating
1650     a new with the new name. Thus we kind of drop the
1651     constraints from the dictionary cache here. The foreign key
1652     constraints will be inherited to the new table from the
1653     system tables through a call of dict_load_foreigns. */
1654 
1655     /* Remove the foreign constraints from the cache */
1656     std::for_each(table->foreign_set.begin(), table->foreign_set.end(),
1657                   dict_foreign_remove_partial());
1658     table->foreign_set.clear();
1659 
1660     /* Reset table field in referencing constraints */
1661     for (dict_foreign_set::iterator it = table->referenced_set.begin();
1662          it != table->referenced_set.end(); ++it) {
1663       foreign = *it;
1664       foreign->referenced_table = nullptr;
1665       foreign->referenced_index = nullptr;
1666     }
1667 
1668     /* Make the set of referencing constraints empty */
1669     table->referenced_set.clear();
1670 
1671     return (DB_SUCCESS);
1672   }
1673 
1674   /* Update the table name fields in foreign constraints, and update also
1675   the constraint id of new format >= 4.0.18 constraints. Note that at
1676   this point we have already changed table->name to the new name. */
1677 
1678   dict_foreign_set fk_set;
1679 
1680   for (;;) {
1681     dict_foreign_set::iterator it = table->foreign_set.begin();
1682 
1683     if (it == table->foreign_set.end()) {
1684       break;
1685     }
1686 
1687     foreign = *it;
1688 
1689     if (foreign->referenced_table) {
1690       foreign->referenced_table->referenced_set.erase(foreign);
1691     }
1692 
1693     if (ut_strlen(foreign->foreign_table_name) <
1694         ut_strlen(table->name.m_name)) {
1695       /* Allocate a longer name buffer;
1696       TODO: store buf len to save memory */
1697 
1698       foreign->foreign_table_name =
1699           mem_heap_strdup(foreign->heap, table->name.m_name);
1700       dict_mem_foreign_table_name_lookup_set(foreign, TRUE);
1701     } else {
1702       strcpy(foreign->foreign_table_name, table->name.m_name);
1703       dict_mem_foreign_table_name_lookup_set(foreign, FALSE);
1704     }
1705     if (strchr(foreign->id, '/')) {
1706       /* This is a >= 4.0.18 format id */
1707 
1708       ulint db_len;
1709       char *old_id;
1710       char old_name_cs_filename[MAX_FULL_NAME_LEN + 1];
1711       uint errors = 0;
1712 
1713       /* All table names are internally stored in charset
1714       my_charset_filename (except the temp tables and the
1715       partition identifier suffix in partition tables). The
1716       foreign key constraint names are internally stored
1717       in UTF-8 charset.  The variable fkid here is used
1718       to store foreign key constraint name in charset
1719       my_charset_filename for comparison further below. */
1720       char fkid[MAX_TABLE_NAME_LEN + 20];
1721       ibool on_tmp = FALSE;
1722 
1723       /* The old table name in my_charset_filename is stored
1724       in old_name_cs_filename */
1725 
1726       strncpy(old_name_cs_filename, old_name, sizeof(old_name_cs_filename));
1727       if (strstr(old_name, TEMP_TABLE_PATH_PREFIX) == nullptr) {
1728         innobase_convert_to_system_charset(
1729             strchr(old_name_cs_filename, '/') + 1, strchr(old_name, '/') + 1,
1730             MAX_TABLE_NAME_LEN, &errors);
1731 
1732         if (errors) {
1733           /* There has been an error to convert
1734           old table into UTF-8.  This probably
1735           means that the old table name is
1736           actually in UTF-8. */
1737           innobase_convert_to_filename_charset(
1738               strchr(old_name_cs_filename, '/') + 1, strchr(old_name, '/') + 1,
1739               MAX_TABLE_NAME_LEN);
1740         } else {
1741           /* Old name already in
1742           my_charset_filename */
1743           strncpy(old_name_cs_filename, old_name, sizeof(old_name_cs_filename));
1744         }
1745       }
1746 
1747       strncpy(fkid, foreign->id, MAX_TABLE_NAME_LEN);
1748 
1749       if (strstr(fkid, TEMP_TABLE_PATH_PREFIX) == nullptr) {
1750         innobase_convert_to_filename_charset(strchr(fkid, '/') + 1,
1751                                              strchr(foreign->id, '/') + 1,
1752                                              MAX_TABLE_NAME_LEN + 20);
1753       } else {
1754         on_tmp = TRUE;
1755       }
1756 
1757       old_id = mem_strdup(foreign->id);
1758 
1759       if (ut_strlen(fkid) >
1760               ut_strlen(old_name_cs_filename) + ((sizeof dict_ibfk) - 1) &&
1761           !memcmp(fkid, old_name_cs_filename,
1762                   ut_strlen(old_name_cs_filename)) &&
1763           !memcmp(fkid + ut_strlen(old_name_cs_filename), dict_ibfk,
1764                   (sizeof dict_ibfk) - 1)) {
1765         /* This is a generated >= 4.0.18 format id */
1766 
1767         char table_name[MAX_TABLE_NAME_LEN + 1] = "";
1768         uint errors = 0;
1769 
1770         if (strlen(table->name.m_name) > strlen(old_name)) {
1771           foreign->id = static_cast<char *>(mem_heap_alloc(
1772               foreign->heap, strlen(table->name.m_name) + strlen(old_id) + 1));
1773         }
1774 
1775         /* Convert the table name to UTF-8 */
1776         strncpy(table_name, table->name.m_name, MAX_TABLE_NAME_LEN);
1777         innobase_convert_to_system_charset(strchr(table_name, '/') + 1,
1778                                            strchr(table->name.m_name, '/') + 1,
1779                                            MAX_TABLE_NAME_LEN, &errors);
1780 
1781         if (errors) {
1782           /* Table name could not be converted
1783           from charset my_charset_filename to
1784           UTF-8. This means that the table name
1785           is already in UTF-8 (#mysql#50). */
1786           strncpy(table_name, table->name.m_name, MAX_TABLE_NAME_LEN);
1787         }
1788 
1789         /* Replace the prefix 'databasename/tablename'
1790         with the new names */
1791         strcpy(foreign->id, table_name);
1792         if (on_tmp) {
1793           strcat(foreign->id, old_id + ut_strlen(old_name));
1794         } else {
1795           sprintf(strchr(foreign->id, '/') + 1, "%s%s",
1796                   strchr(table_name, '/') + 1, strstr(old_id, "_ibfk_"));
1797         }
1798 
1799       } else {
1800         /* This is a >= 4.0.18 format id where the user
1801         gave the id name */
1802         db_len = dict_get_db_name_len(table->name.m_name) + 1;
1803 
1804         if (db_len - 1 > dict_get_db_name_len(foreign->id)) {
1805           foreign->id = static_cast<char *>(
1806               mem_heap_alloc(foreign->heap, db_len + strlen(old_id) + 1));
1807         }
1808 
1809         /* Replace the database prefix in id with the
1810         one from table->name */
1811 
1812         ut_memcpy(foreign->id, table->name.m_name, db_len);
1813 
1814         strcpy(foreign->id + db_len, dict_remove_db_name(old_id));
1815       }
1816 
1817       ut_free(old_id);
1818     }
1819 
1820     table->foreign_set.erase(it);
1821     fk_set.insert(foreign);
1822 
1823     if (foreign->referenced_table) {
1824       foreign->referenced_table->referenced_set.insert(foreign);
1825     }
1826   }
1827 
1828   ut_a(table->foreign_set.empty());
1829   table->foreign_set.swap(fk_set);
1830 
1831   for (dict_foreign_set::iterator it = table->referenced_set.begin();
1832        it != table->referenced_set.end(); ++it) {
1833     foreign = *it;
1834 
1835     if (ut_strlen(foreign->referenced_table_name) <
1836         ut_strlen(table->name.m_name)) {
1837       /* Allocate a longer name buffer;
1838       TODO: store buf len to save memory */
1839 
1840       foreign->referenced_table_name =
1841           mem_heap_strdup(foreign->heap, table->name.m_name);
1842 
1843       dict_mem_referenced_table_name_lookup_set(foreign, TRUE);
1844     } else {
1845       /* Use the same buffer */
1846       strcpy(foreign->referenced_table_name, table->name.m_name);
1847 
1848       dict_mem_referenced_table_name_lookup_set(foreign, FALSE);
1849     }
1850   }
1851 
1852   return (DB_SUCCESS);
1853 }
1854 
1855 /** Change the id of a table object in the dictionary cache. This is used in
1856  DISCARD TABLESPACE. */
dict_table_change_id_in_cache(dict_table_t * table,table_id_t new_id)1857 void dict_table_change_id_in_cache(
1858     dict_table_t *table, /*!< in/out: table object already in cache */
1859     table_id_t new_id)   /*!< in: new id to set */
1860 {
1861   ut_ad(table);
1862   ut_ad(mutex_own(&dict_sys->mutex));
1863   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
1864 
1865   /* Remove the table from the hash table of id's */
1866 
1867   HASH_DELETE(dict_table_t, id_hash, dict_sys->table_id_hash,
1868               ut_fold_ull(table->id), table);
1869   table->id = new_id;
1870 
1871   /* Add the table back to the hash table */
1872   HASH_INSERT(dict_table_t, id_hash, dict_sys->table_id_hash,
1873               ut_fold_ull(table->id), table);
1874 }
1875 
1876 /** Removes a table object from the dictionary cache. */
dict_table_remove_from_cache_low(dict_table_t * table,ibool lru_evict)1877 static void dict_table_remove_from_cache_low(
1878     dict_table_t *table, /*!< in, own: table */
1879     ibool lru_evict)     /*!< in: TRUE if table being evicted
1880                          to make room in the table LRU list */
1881 {
1882   dict_foreign_t *foreign;
1883   dict_index_t *index;
1884   lint size;
1885 
1886   ut_ad(table);
1887   ut_ad(dict_lru_validate());
1888   ut_a(table->get_ref_count() == 0);
1889   ut_a(table->n_rec_locks.load() == 0);
1890   ut_ad(mutex_own(&dict_sys->mutex));
1891   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
1892 
1893   /* We first dirty read the status which could be changed from
1894   METADATA_DIRTY to METADATA_BUFFERED by checkpoint, and check again
1895   when persistence is necessary */
1896   switch (table->dirty_status.load()) {
1897     case METADATA_DIRTY:
1898       /* Write back the dirty metadata to DDTableBuffer */
1899       dict_table_persist_to_dd_table_buffer(table);
1900       ut_ad(table->dirty_status.load() != METADATA_DIRTY);
1901       /* Fall through */
1902     case METADATA_BUFFERED:
1903       /* We have to remove it away here, since it's evicted.
1904       And we will add it again once it's re-loaded if possible */
1905       mutex_enter(&dict_persist->mutex);
1906       ut_ad(table->in_dirty_dict_tables_list);
1907       UT_LIST_REMOVE(dict_persist->dirty_dict_tables, table);
1908       mutex_exit(&dict_persist->mutex);
1909       break;
1910     case METADATA_CLEAN:
1911       break;
1912   }
1913 
1914   /* Remove the foreign constraints from the cache */
1915   std::for_each(table->foreign_set.begin(), table->foreign_set.end(),
1916                 dict_foreign_remove_partial());
1917   table->foreign_set.clear();
1918 
1919   /* Reset table field in referencing constraints */
1920   for (dict_foreign_set::iterator it = table->referenced_set.begin();
1921        it != table->referenced_set.end(); ++it) {
1922     foreign = *it;
1923     foreign->referenced_table = nullptr;
1924     foreign->referenced_index = nullptr;
1925   }
1926 
1927   /* Remove the indexes from the cache */
1928 
1929   for (index = UT_LIST_GET_LAST(table->indexes); index != nullptr;
1930        index = UT_LIST_GET_LAST(table->indexes)) {
1931     dict_index_remove_from_cache_low(table, index, lru_evict);
1932   }
1933 
1934   /* Remove table from the hash tables of tables */
1935 
1936   HASH_DELETE(dict_table_t, name_hash, dict_sys->table_hash,
1937               ut_fold_string(table->name.m_name), table);
1938 
1939   HASH_DELETE(dict_table_t, id_hash, dict_sys->table_id_hash,
1940               ut_fold_ull(table->id), table);
1941 
1942   /* Remove table from LRU or non-LRU list. */
1943   if (table->can_be_evicted) {
1944     ut_ad(dict_lru_find_table(table));
1945     UT_LIST_REMOVE(dict_sys->table_LRU, table);
1946   } else {
1947     ut_ad(dict_non_lru_find_table(table));
1948     UT_LIST_REMOVE(dict_sys->table_non_LRU, table);
1949   }
1950 
1951   ut_ad(dict_lru_validate());
1952 
1953   /* Free virtual column template if any */
1954   if (table->vc_templ != nullptr) {
1955     dict_free_vc_templ(table->vc_templ);
1956     UT_DELETE(table->vc_templ);
1957   }
1958 
1959   size = mem_heap_get_size(table->heap) + strlen(table->name.m_name) + 1;
1960 
1961   ut_ad(dict_sys->size >= size);
1962 
1963   dict_sys->size -= size;
1964 
1965   dict_mem_table_free(table);
1966 }
1967 
1968 /** Removes a table object from the dictionary cache. */
dict_table_remove_from_cache(dict_table_t * table)1969 void dict_table_remove_from_cache(dict_table_t *table) /*!< in, own: table */
1970 {
1971   dict_table_remove_from_cache_low(table, FALSE);
1972 }
1973 
1974 /** Try to invalidate an entry from the dict cache, for a partitioned table,
1975 if any table found.
1976 @param[in]	name	Table name */
dict_partitioned_table_remove_from_cache(const char * name)1977 void dict_partitioned_table_remove_from_cache(const char *name) {
1978   ut_ad(mutex_own(&dict_sys->mutex));
1979 
1980   size_t name_len = strlen(name);
1981 
1982   for (uint32_t i = 0; i < hash_get_n_cells(dict_sys->table_id_hash); ++i) {
1983     dict_table_t *table;
1984 
1985     table =
1986         static_cast<dict_table_t *>(HASH_GET_FIRST(dict_sys->table_hash, i));
1987 
1988     while (table != nullptr) {
1989       dict_table_t *prev_table = table;
1990 
1991       table = static_cast<dict_table_t *>(HASH_GET_NEXT(name_hash, prev_table));
1992       ut_ad(prev_table->magic_n == DICT_TABLE_MAGIC_N);
1993 
1994       if (prev_table->is_dd_table) {
1995         continue;
1996       }
1997 
1998       if ((strncmp(name, prev_table->name.m_name, name_len) == 0) &&
1999           dict_table_is_partition(prev_table)) {
2000         btr_drop_ahi_for_table(prev_table);
2001         dict_table_remove_from_cache(prev_table);
2002       }
2003     }
2004   }
2005 }
2006 
2007 #ifdef UNIV_DEBUG
2008 /** Removes a table object from the dictionary cache, for debug purpose
2009 @param[in,out]	table		table object
2010 @param[in]	lru_evict	true if table being evicted to make room
2011                                 in the table LRU list */
dict_table_remove_from_cache_debug(dict_table_t * table,bool lru_evict)2012 void dict_table_remove_from_cache_debug(dict_table_t *table, bool lru_evict) {
2013   dict_table_remove_from_cache_low(table, lru_evict);
2014 }
2015 #endif /* UNIV_DEBUG */
2016 
2017 /** If the given column name is reserved for InnoDB system columns, return
2018  TRUE.
2019  @return true if name is reserved */
dict_col_name_is_reserved(const char * name)2020 ibool dict_col_name_is_reserved(const char *name) /*!< in: column name */
2021 {
2022 /* This check reminds that if a new system column is added to
2023 the program, it should be dealt with here. */
2024 #if DATA_N_SYS_COLS != 3
2025 #error "DATA_N_SYS_COLS != 3"
2026 #endif
2027 
2028   static const char *reserved_names[] = {"DB_ROW_ID", "DB_TRX_ID",
2029                                          "DB_ROLL_PTR"};
2030 
2031   ulint i;
2032 
2033   for (i = 0; i < UT_ARR_SIZE(reserved_names); i++) {
2034     if (innobase_strcasecmp(name, reserved_names[i]) == 0) {
2035       return (TRUE);
2036     }
2037   }
2038 
2039   return (FALSE);
2040 }
2041 
2042 /** Return maximum size of the node pointer record.
2043  @return maximum size of the record in bytes */
dict_index_node_ptr_max_size(const dict_index_t * index)2044 ulint dict_index_node_ptr_max_size(const dict_index_t *index) /*!< in: index */
2045 {
2046   ulint comp;
2047   ulint i;
2048   /* maximum possible storage size of a record */
2049   ulint rec_max_size;
2050 
2051   if (dict_index_is_ibuf(index)) {
2052     /* cannot estimate accurately */
2053     /* This is universal index for change buffer.
2054     The max size of the entry is about max key length * 2.
2055     (index key + primary key to be inserted to the index)
2056     (The max key length is UNIV_PAGE_SIZE / 16 * 3 at
2057      ha_innobase::max_supported_key_length(),
2058      considering MAX_KEY_LENGTH = 3072 at MySQL imposes
2059      the 3500 historical InnoDB value for 16K page size case.)
2060     For the universal index, node_ptr contains most of the entry.
2061     And 512 is enough to contain ibuf columns and meta-data */
2062     return (UNIV_PAGE_SIZE / 8 * 3 + 512);
2063   }
2064 
2065   comp = dict_table_is_comp(index->table);
2066 
2067   /* Each record has page_no, length of page_no and header. */
2068   rec_max_size = comp ? REC_NODE_PTR_SIZE + 1 + REC_N_NEW_EXTRA_BYTES
2069                       : REC_NODE_PTR_SIZE + 2 + REC_N_OLD_EXTRA_BYTES;
2070 
2071   if (comp) {
2072     /* Include the "null" flags in the
2073     maximum possible record size. */
2074     rec_max_size += UT_BITS_IN_BYTES(index->n_nullable);
2075   } else {
2076     /* For each column, include a 2-byte offset and a
2077     "null" flag. */
2078     rec_max_size += 2 * index->n_fields;
2079   }
2080 
2081   /* Compute the maximum possible record size. */
2082   for (i = 0; i < dict_index_get_n_unique_in_tree(index); i++) {
2083     const dict_field_t *field = index->get_field(i);
2084     const dict_col_t *col = field->col;
2085     ulint field_max_size;
2086     ulint field_ext_max_size;
2087 
2088     /* Determine the maximum length of the index field. */
2089 
2090     field_max_size = col->get_fixed_size(comp);
2091     if (field_max_size) {
2092       /* dict_index_add_col() should guarantee this */
2093       ut_ad(!field->prefix_len || field->fixed_len == field->prefix_len);
2094       /* Fixed lengths are not encoded
2095       in ROW_FORMAT=COMPACT. */
2096       rec_max_size += field_max_size;
2097       continue;
2098     }
2099 
2100     field_max_size = col->get_max_size();
2101     field_ext_max_size = field_max_size < 256 ? 1 : 2;
2102 
2103     if (field->prefix_len && field->prefix_len < field_max_size) {
2104       field_max_size = field->prefix_len;
2105     }
2106 
2107     if (comp) {
2108       /* Add the extra size for ROW_FORMAT=COMPACT.
2109       For ROW_FORMAT=REDUNDANT, these bytes were
2110       added to rec_max_size before this loop. */
2111       rec_max_size += field_ext_max_size;
2112     }
2113 
2114     rec_max_size += field_max_size;
2115   }
2116 
2117   return (rec_max_size);
2118 }
2119 
/** Checks whether a record of this index might not fit on a single B-tree
 page. The worst-case record size is accumulated field by field and compared
 against the limit for leaf page records after every field; once all fields
 that are unique in the tree have been added, it is also compared against
 the limit for node pointer records. FTS indexes are never reported as too
 big.
 @return true if the index record could become too big */
static bool dict_index_too_big_for_tree(
    const dict_table_t *table,     /*!< in: table */
    const dict_index_t *new_index, /*!< in: index */
    bool strict)                   /*!< in: forwarded to
                                   ib::error_or_warn() when a
                                   record could be too big to
                                   fit on a leaf page */
{
  ulint comp;
  ulint i;
  /* maximum possible storage size of a record */
  ulint rec_max_size;
  /* maximum allowed size of a record on a leaf page */
  ulint page_rec_max;
  /* maximum allowed size of a node pointer record */
  ulint page_ptr_max;

  /* FTS index consists of auxiliary tables, they shall be excluded from
  index row size check */
  if (new_index->type & DICT_FTS) {
    return (false);
  }

  /* Debug injection point: when the "ib_force_create_table" keyword is
  active, the index is reported as fitting without any size check. */
  DBUG_EXECUTE_IF("ib_force_create_table", return (FALSE););

  comp = dict_table_is_comp(table);

  const page_size_t page_size(dict_table_page_size(table));

  if (page_size.is_compressed() &&
      page_size.physical() < univ_page_size.physical()) {
    /* On a compressed page, two records must fit in the
    uncompressed page modification log. On compressed pages
    with size.physical() == univ_page_size.physical(),
    this limit will never be reached. */
    ut_ad(comp);
    /* The maximum allowed record size is the size of
    an empty page, minus a byte for recording the heap
    number in the page modification log.  The maximum
    allowed node pointer size is half that. */
    page_rec_max =
        page_zip_empty_size(new_index->n_fields, page_size.physical());
    /* Guard the decrement so that a zero result does not wrap around. */
    if (page_rec_max) {
      page_rec_max--;
    }
    page_ptr_max = page_rec_max / 2;
    /* On a compressed page, there is a two-byte entry in
    the dense page directory for every record.  But there
    is no record header. */
    rec_max_size = 2;
  } else {
    /* The maximum allowed record size is half a B-tree
    page(16k for 64k page size).  No additional sparse
    page directory entry will be generated for the first
    few user records. */
    page_rec_max = srv_page_size == UNIV_PAGE_SIZE_MAX
                       ? REC_MAX_DATA_SIZE - 1
                       : page_get_free_space_of_empty(comp) / 2;
    page_ptr_max = page_rec_max;
    /* Each record has a header. */
    rec_max_size = comp ? REC_N_NEW_EXTRA_BYTES : REC_N_OLD_EXTRA_BYTES;
  }

  if (comp) {
    /* Include the "null" flags in the
    maximum possible record size. */
    rec_max_size += UT_BITS_IN_BYTES(new_index->n_nullable);
  } else {
    /* For each column, include a 2-byte offset and a
    "null" flag.  The 1-byte format is only used in short
    records that do not contain externally stored columns.
    Such records could never exceed the page limit, even
    when using the 2-byte format. */
    rec_max_size += 2 * new_index->n_fields;
  }

  /* Compute the maximum possible record size, checking the limits
  after each field has been added. */
  for (i = 0; i < new_index->n_fields; i++) {
    const dict_field_t *field = new_index->get_field(i);
    const dict_col_t *col = field->col;
    ulint field_max_size;
    ulint field_ext_max_size;

    /* In dtuple_convert_big_rec(), variable-length columns
    that are longer than BTR_EXTERN_LOCAL_STORED_MAX_SIZE
    may be chosen for external storage.

    Fixed-length columns, and all columns of secondary
    index records are always stored inline. */

    /* Determine the maximum length of the index field.
    The field_ext_max_size should be computed as the worst
    case in rec_get_converted_size_comp() for
    REC_STATUS_ORDINARY records. */

    field_max_size = col->get_fixed_size(comp);
    if (field_max_size && field->fixed_len != 0) {
      /* dict_index_add_col() should guarantee this */
      ut_ad(!field->prefix_len || field->fixed_len == field->prefix_len);
      /* Fixed lengths are not encoded
      in ROW_FORMAT=COMPACT. The jump skips the addition of
      the length bytes below, so only the data size is counted. */
      field_ext_max_size = 0;
      goto add_field_size;
    }

    field_max_size = col->get_max_size();
    /* The number of length bytes is chosen from the full column
    maximum, before a prefix length is applied. */
    field_ext_max_size = field_max_size < 256 ? 1 : 2;

    if (field->prefix_len) {
      if (field->prefix_len < field_max_size) {
        field_max_size = field->prefix_len;
      }
    } else if (field_max_size > BTR_EXTERN_LOCAL_STORED_MAX_SIZE &&
               new_index->is_clustered()) {
      /* In the worst case, we have a locally stored
      column of BTR_EXTERN_LOCAL_STORED_MAX_SIZE bytes.
      The length can be stored in one byte.  If the
      column were stored externally, the lengths in
      the clustered index page would be
      BTR_EXTERN_FIELD_REF_SIZE and 2. */
      field_max_size = BTR_EXTERN_LOCAL_STORED_MAX_SIZE;
      field_ext_max_size = 1;
    }

    if (comp) {
      /* Add the extra size for ROW_FORMAT=COMPACT.
      For ROW_FORMAT=REDUNDANT, these bytes were
      added to rec_max_size before this loop. */
      rec_max_size += field_ext_max_size;
    }
  add_field_size:
    rec_max_size += field_max_size;

    /* Check the size limit on leaf pages. The limit counts as exceeded
    as soon as the size reaches page_rec_max. */
    if (rec_max_size >= page_rec_max) {
      ib::error_or_warn(strict)
          << "Cannot add field " << field->name << " in table " << table->name
          << " because after adding it, the row size is " << rec_max_size
          << " which is greater than maximum allowed"
             " size ("
          << page_rec_max << ") for a record on index leaf page.";

      return (true);
    }

    /* Check the size limit on non-leaf pages.  Records
    stored in non-leaf B-tree pages consist of the unique
    columns of the record (the key columns of the B-tree)
    and a node pointer field.  When we have processed the
    unique columns, rec_max_size equals the size of the
    node pointer record minus the node pointer column.
    Unlike the leaf page check above, no message is written
    when this limit is hit. */
    if (i + 1 == dict_index_get_n_unique_in_tree(new_index) &&
        rec_max_size + REC_NODE_PTR_SIZE >= page_ptr_max) {
      return (true);
    }
  }

  return (false);
}
2281 
2282 /** Adds an index to the dictionary cache.
2283 @param[in,out]	table	table on which the index is
2284 @param[in,out]	index	index; NOTE! The index memory
2285                         object is freed in this function!
2286 @param[in]	page_no	root page number of the index
2287 @param[in]	strict	TRUE=refuse to create the index
2288                         if records could be too big to fit in
2289                         an B-tree page
2290 @return DB_SUCCESS, DB_TOO_BIG_RECORD, or DB_CORRUPTION */
dict_index_add_to_cache(dict_table_t * table,dict_index_t * index,page_no_t page_no,ibool strict)2291 dberr_t dict_index_add_to_cache(dict_table_t *table, dict_index_t *index,
2292                                 page_no_t page_no, ibool strict) {
2293   ut_ad(!mutex_own(&dict_sys->mutex));
2294   return (
2295       dict_index_add_to_cache_w_vcol(table, index, nullptr, page_no, strict));
2296 }
2297 
2298 /** Clears the virtual column's index list before index is being freed.
2299 @param[in]  index   Index being freed */
dict_index_remove_from_v_col_list(dict_index_t * index)2300 void dict_index_remove_from_v_col_list(dict_index_t *index) {
2301   /* Index is not completely formed */
2302   if (!index->cached) {
2303     return;
2304   }
2305   if (dict_index_has_virtual(index)) {
2306     const dict_col_t *col;
2307     const dict_v_col_t *vcol;
2308 
2309     for (ulint i = 0; i < dict_index_get_n_fields(index); i++) {
2310       col = index->get_col(i);
2311       if (col->is_virtual()) {
2312         vcol = reinterpret_cast<const dict_v_col_t *>(col);
2313         /* This could be NULL, when we do add
2314         virtual column, add index together. We do not
2315         need to track this virtual column's index */
2316         if (vcol->v_indexes == nullptr) {
2317           continue;
2318         }
2319         dict_v_idx_list::iterator it;
2320         for (it = vcol->v_indexes->begin(); it != vcol->v_indexes->end();
2321              ++it) {
2322           dict_v_idx_t v_index = *it;
2323           if (v_index.index == index) {
2324             vcol->v_indexes->erase(it);
2325             break;
2326           }
2327         }
2328       }
2329     }
2330   }
2331 }
2332 
2333 /** Adds an index to the dictionary cache, with possible indexing newly
2334 added column.
2335 @param[in,out]	table	table on which the index is
2336 @param[in,out]	index	index; NOTE! The index memory
2337                         object is freed in this function!
2338 @param[in]	add_v	new virtual column that being added along with
2339                         an add index call
2340 @param[in]	page_no	root page number of the index
2341 @param[in]	strict	TRUE=refuse to create the index
2342                         if records could be too big to fit in
2343                         an B-tree page
2344 @return DB_SUCCESS, DB_TOO_BIG_RECORD, or DB_CORRUPTION */
dict_index_add_to_cache_w_vcol(dict_table_t * table,dict_index_t * index,const dict_add_v_col_t * add_v,page_no_t page_no,ibool strict)2345 dberr_t dict_index_add_to_cache_w_vcol(dict_table_t *table, dict_index_t *index,
2346                                        const dict_add_v_col_t *add_v,
2347                                        page_no_t page_no, ibool strict) {
2348   dict_index_t *new_index;
2349   ulint n_ord;
2350   ulint i;
2351 
2352   ut_ad(index);
2353   ut_ad(!mutex_own(&dict_sys->mutex));
2354   ut_ad(index->n_def == index->n_fields);
2355   ut_ad(index->magic_n == DICT_INDEX_MAGIC_N);
2356   ut_ad(!dict_index_is_online_ddl(index));
2357   ut_ad(!dict_index_is_ibuf(index));
2358 
2359   ut_d(mem_heap_validate(index->heap));
2360   ut_a(!index->is_clustered() || UT_LIST_GET_LEN(table->indexes) == 0);
2361 
2362   if (!dict_index_find_and_set_cols(table, index, add_v)) {
2363     dict_mem_index_free(index);
2364     return (DB_CORRUPTION);
2365   }
2366 
2367   /* Build the cache internal representation of the index,
2368   containing also the added system fields */
2369 
2370   if (index->type == DICT_FTS) {
2371     new_index = dict_index_build_internal_fts(table, index);
2372   } else if (index->is_clustered()) {
2373     new_index = dict_index_build_internal_clust(table, index);
2374   } else {
2375     new_index = dict_index_build_internal_non_clust(table, index);
2376   }
2377 
2378   /* Set the n_fields value in new_index to the actual defined
2379   number of fields in the cache internal representation */
2380 
2381   new_index->n_fields = new_index->n_def;
2382   new_index->trx_id = index->trx_id;
2383   new_index->set_committed(index->is_committed());
2384   new_index->allow_duplicates = index->allow_duplicates;
2385   new_index->nulls_equal = index->nulls_equal;
2386   new_index->disable_ahi = index->disable_ahi;
2387   new_index->srid_is_valid = index->srid_is_valid;
2388   new_index->srid = index->srid;
2389 
2390   new_index->srid = index->srid;
2391   new_index->srid_is_valid = index->srid_is_valid;
2392   if (index->rtr_srs.get() != nullptr)
2393     new_index->rtr_srs.reset(index->rtr_srs->clone());
2394 
2395   if (dict_index_too_big_for_tree(table, new_index, strict)) {
2396     if (strict) {
2397       dict_mem_index_free(new_index);
2398       dict_mem_index_free(index);
2399       return (DB_TOO_BIG_RECORD);
2400     } else if (current_thd != nullptr) {
2401       /* Avoid the warning to be printed
2402       during recovery. */
2403       ib_warn_row_too_big(table);
2404     }
2405   }
2406 
2407   n_ord = new_index->n_uniq;
2408 
2409   /* Flag the ordering columns and also set column max_prefix */
2410 
2411   for (i = 0; i < n_ord; i++) {
2412     const dict_field_t *field = new_index->get_field(i);
2413 
2414     /* Check the column being added in the index for
2415     the first time and flag the ordering column. */
2416     if (field->col->ord_part == 0) {
2417       field->col->max_prefix = field->prefix_len;
2418       field->col->ord_part = 1;
2419     } else if (field->prefix_len == 0) {
2420       /* Set the max_prefix for a column to 0 if
2421       its prefix length is 0 (for this index)
2422       even if it was a part of any other index
2423       with some prefix length. */
2424       field->col->max_prefix = 0;
2425     } else if (field->col->max_prefix != 0 &&
2426                field->prefix_len > field->col->max_prefix) {
2427       /* Set the max_prefix value based on the
2428       prefix_len. */
2429       field->col->max_prefix = field->prefix_len;
2430     }
2431     ut_ad(field->col->ord_part == 1);
2432   }
2433 
2434   new_index->stat_n_diff_key_vals = static_cast<ib_uint64_t *>(mem_heap_zalloc(
2435       new_index->heap, dict_index_get_n_unique(new_index) *
2436                            sizeof(*new_index->stat_n_diff_key_vals)));
2437 
2438   new_index->stat_n_sample_sizes = static_cast<ib_uint64_t *>(mem_heap_zalloc(
2439       new_index->heap, dict_index_get_n_unique(new_index) *
2440                            sizeof(*new_index->stat_n_sample_sizes)));
2441 
2442   new_index->stat_n_non_null_key_vals =
2443       static_cast<ib_uint64_t *>(mem_heap_zalloc(
2444           new_index->heap, dict_index_get_n_unique(new_index) *
2445                                sizeof(*new_index->stat_n_non_null_key_vals)));
2446 
2447   new_index->stat_index_size = 1;
2448   new_index->stat_n_leaf_pages = 1;
2449 
2450   new_index->table = table;
2451   new_index->table_name = table->name.m_name;
2452   new_index->search_info = btr_search_info_create(new_index->heap);
2453 
2454   new_index->page = page_no;
2455   rw_lock_create(index_tree_rw_lock_key, &new_index->lock, SYNC_INDEX_TREE);
2456 
2457   mutex_enter(&dict_sys->mutex);
2458 
2459   /* Add the new index as the last index for the table */
2460   UT_LIST_ADD_LAST(table->indexes, new_index);
2461 
2462   /* Intrinsic table are not added to dictionary cache instead are
2463   cached to session specific thread cache. */
2464   if (!table->is_intrinsic()) {
2465     dict_sys->size += mem_heap_get_size(new_index->heap);
2466   }
2467 
2468   mutex_exit(&dict_sys->mutex);
2469 
2470   /* Check if key part of the index is unique. */
2471   if (table->is_intrinsic()) {
2472     new_index->rec_cache.fixed_len_key = true;
2473     for (i = 0; i < new_index->n_uniq; i++) {
2474       const dict_field_t *field;
2475       field = new_index->get_field(i);
2476 
2477       if (!field->fixed_len) {
2478         new_index->rec_cache.fixed_len_key = false;
2479         break;
2480       }
2481     }
2482 
2483     new_index->rec_cache.key_has_null_cols = false;
2484     for (i = 0; i < new_index->n_uniq; i++) {
2485       const dict_field_t *field;
2486       field = new_index->get_field(i);
2487 
2488       if (!(field->col->prtype & DATA_NOT_NULL)) {
2489         new_index->rec_cache.key_has_null_cols = true;
2490         break;
2491       }
2492     }
2493   }
2494 
2495   if (dict_index_has_virtual(index)) {
2496     const dict_col_t *col;
2497     const dict_v_col_t *vcol;
2498 
2499     for (ulint i = 0; i < dict_index_get_n_fields(index); i++) {
2500       col = index->get_col(i);
2501       if (col->is_virtual()) {
2502         vcol = reinterpret_cast<const dict_v_col_t *>(col);
2503 
2504         /* This could be NULL, when we do add virtual
2505         column, add index together. We do not need to
2506         track this virtual column's index */
2507         if (vcol->v_indexes == nullptr) {
2508           continue;
2509         }
2510 
2511         dict_v_idx_list::iterator it;
2512 
2513         for (it = vcol->v_indexes->begin(); it != vcol->v_indexes->end();) {
2514           dict_v_idx_t v_index = *it;
2515           if (v_index.index == index) {
2516             vcol->v_indexes->erase(it++);
2517           } else {
2518             it++;
2519           }
2520         }
2521       }
2522     }
2523   }
2524 
2525   if (new_index->table->has_instant_cols() && new_index->is_clustered()) {
2526     new_index->instant_cols = true;
2527     new_index->n_instant_nullable =
2528         new_index->get_n_nullable_before(new_index->get_instant_fields());
2529   } else {
2530     new_index->instant_cols = false;
2531     new_index->n_instant_nullable = new_index->n_nullable;
2532   }
2533 
2534   dict_mem_index_free(index);
2535 
2536   return (DB_SUCCESS);
2537 }
2538 
/** Removes an index from the dictionary cache.
The function first waits until the adaptive hash index reference count
reported for the index drops to zero, then frees the index lock, drops the
per-index compression statistics when applicable, unlinks the index from
table->indexes and from the index lists of the virtual columns it covers,
subtracts the index heap size from dict_sys->size and finally frees the
index object. The caller must own dict_sys->mutex. */
static void dict_index_remove_from_cache_low(
    dict_table_t *table, /*!< in/out: table */
    dict_index_t *index, /*!< in, own: index */
    ibool lru_evict)     /*!< in: TRUE if index being evicted
                         to make room in the table LRU list */
{
  lint size;
  ulint retries = 0;
  btr_search_t *info;

  ut_ad(table && index);
  ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
  ut_ad(index->magic_n == DICT_INDEX_MAGIC_N);
  ut_ad(mutex_own(&dict_sys->mutex));

  /* No need to acquire the dict_index_t::lock here because
  there can't be any active operations on this index (or table). */

  /* Release the online log, if one is still attached to the index. */
  if (index->online_log) {
    ut_ad(index->online_status == ONLINE_INDEX_CREATION);
    row_log_free(index->online_log);
  }

  /* We always create search info whether or not adaptive
  hash index is enabled or not. */
  info = btr_search_get_info(index);
  ut_ad(info);

  /* We are not allowed to free the in-memory index struct
  dict_index_t until all entries in the adaptive hash index
  that point to any of the pages belonging to this B-tree index
  are dropped. This is so because dropping of these entries
  requires access to the dict_index_t struct. To avoid such a scenario
  we keep a count of the number of such pages in the search_info and
  only free the dict_index_t struct when this count drops to
  zero. See also: dict_table_can_be_evicted() */

  /* Poll the reference count every 10ms. The loop is left as soon as the
  count is zero; it is also left without reaching zero when this is an LRU
  eviction and srv_shutdown_state has reached SRV_SHUTDOWN_CLEANUP. */
  do {
    ulint ref_count = btr_search_info_get_ref_count(info, index);

    if (ref_count == 0) {
      break;
    }

    /* Sleep for 10ms before trying again. */
    os_thread_sleep(10000);
    ++retries;

    /* 500 retries of 10ms each are 5 seconds; retries / 100 converts
    the retry count to seconds for the message. */
    if (retries % 500 == 0) {
      /* No luck after 5 seconds of wait. */
      ib::error(ER_IB_MSG_181) << "Waited for " << retries / 100
                               << " secs for hash index"
                                  " ref_count ("
                               << ref_count
                               << ") to drop to 0."
                                  " index: "
                               << index->name << " table: " << table->name;
    }

    /* To avoid a hang here we commit suicide if the
    ref_count doesn't drop to zero in 600 seconds
    (60000 retries of 10ms each). */
    if (retries >= 60000) {
      ut_error;
    }
  } while (srv_shutdown_state.load() < SRV_SHUTDOWN_CLEANUP || !lru_evict);

  rw_lock_free(&index->lock);

  /* The index is being dropped, remove any compression stats for it.
  This is skipped for LRU evictions, for tables whose flags yield a zero
  DICT_TF_GET_ZIP_SSIZE(), and for tables marked discard_after_ddl. */
  if (!lru_evict && DICT_TF_GET_ZIP_SSIZE(index->table->flags) &&
      !index->table->discard_after_ddl) {
    index_id_t id(index->space, index->id);
    mutex_enter(&page_zip_stat_per_index_mutex);
    page_zip_stat_per_index.erase(id);
    mutex_exit(&page_zip_stat_per_index_mutex);
  }

  /* Remove the index from the list of indexes of the table */
  UT_LIST_REMOVE(table->indexes, index);

  /* Remove the index from affected virtual column index list */
  if (dict_index_has_virtual(index)) {
    const dict_col_t *col;
    const dict_v_col_t *vcol;

    for (ulint i = 0; i < dict_index_get_n_fields(index); i++) {
      col = index->get_col(i);
      if (col->is_virtual()) {
        vcol = reinterpret_cast<const dict_v_col_t *>(col);

        /* This could be NULL, when we do add virtual
        column, add index together. We do not need to
        track this virtual column's index */
        if (vcol->v_indexes == nullptr) {
          continue;
        }

        dict_v_idx_list::iterator it;

        /* Erase the first entry that refers to this index and stop
        scanning; the iterator is not used again after the erase. */
        for (it = vcol->v_indexes->begin(); it != vcol->v_indexes->end();
             ++it) {
          dict_v_idx_t v_index = *it;
          if (v_index.index == index) {
            vcol->v_indexes->erase(it);
            break;
          }
        }
      }
    }
  }

  /* Give back the memory accounted for this index in dict_sys->size. */
  size = mem_heap_get_size(index->heap);

  ut_ad(!table->is_intrinsic());
  ut_ad(dict_sys->size >= size);

  dict_sys->size -= size;

  dict_mem_index_free(index);
}
2660 
/** Removes an index from the dictionary cache.
Forwards to dict_index_remove_from_cache_low() with lru_evict = FALSE,
i.e. the removal is not treated as an eviction from the table LRU list. */
void dict_index_remove_from_cache(dict_table_t *table, /*!< in/out: table */
                                  dict_index_t *index) /*!< in, own: index */
{
  dict_index_remove_from_cache_low(table, index, FALSE);
}
2667 
2668 /** Duplicate a virtual column information
2669 @param[in]	v_col	virtual column information to duplicate
2670 @param[in,out]	heap	memory heap
2671 @return the duplicated virtual column */
dict_duplicate_v_col(const dict_v_col_t * v_col,mem_heap_t * heap)2672 static dict_v_col_t *dict_duplicate_v_col(const dict_v_col_t *v_col,
2673                                           mem_heap_t *heap) {
2674   dict_v_col_t *new_v_col =
2675       static_cast<dict_v_col_t *>(mem_heap_zalloc(heap, sizeof(*v_col)));
2676 
2677   ut_ad(v_col->v_indexes == nullptr);
2678 
2679   /* Currently, only m_col and v_indexes would be cared in future use,
2680   and v_indexes is always nullptr. So the memcpy can work for it */
2681   memcpy(new_v_col, v_col, sizeof(*v_col));
2682 
2683   return (new_v_col);
2684 }
2685 
2686 /** Tries to find column names for the index and sets the col field of the
2687 index.
2688 @param[in]	table	table
2689 @param[in,out]	index	index
2690 @param[in]	add_v	new virtual columns added along with an add index call
2691 @return true if the column names were found */
dict_index_find_and_set_cols(const dict_table_t * table,dict_index_t * index,const dict_add_v_col_t * add_v)2692 static ibool dict_index_find_and_set_cols(const dict_table_t *table,
2693                                           dict_index_t *index,
2694                                           const dict_add_v_col_t *add_v) {
2695   std::vector<ulint, ut_allocator<ulint>> col_added;
2696   std::vector<ulint, ut_allocator<ulint>> v_col_added;
2697 
2698   ut_ad(table != nullptr && index != nullptr);
2699   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
2700   ut_ad(!mutex_own(&dict_sys->mutex));
2701 
2702   for (ulint i = 0; i < index->n_fields; i++) {
2703     ulint j;
2704     dict_field_t *field = index->get_field(i);
2705 
2706     for (j = 0; j < table->n_cols; j++) {
2707       if (!strcmp(table->get_col_name(j), field->name)) {
2708         /* Check if same column is being assigned again
2709         which suggest that column has duplicate name. */
2710         bool exists =
2711             std::find(col_added.begin(), col_added.end(), j) != col_added.end();
2712 
2713         if (exists) {
2714           /* Duplicate column found. */
2715           goto dup_err;
2716         }
2717 
2718         field->col = table->get_col(j);
2719 
2720         col_added.push_back(j);
2721 
2722         goto found;
2723       }
2724     }
2725 
2726     /* Let's check if it is a virtual column */
2727     for (j = 0; j < table->n_v_cols; j++) {
2728       if (!strcmp(dict_table_get_v_col_name(table, j), field->name)) {
2729         /* Check if same column is being assigned again
2730         which suggest that column has duplicate name. */
2731         bool exists = std::find(v_col_added.begin(), v_col_added.end(), j) !=
2732                       v_col_added.end();
2733 
2734         if (exists) {
2735           /* Duplicate column found. */
2736           break;
2737         }
2738 
2739         field->col =
2740             reinterpret_cast<dict_col_t *>(dict_table_get_nth_v_col(table, j));
2741 
2742         v_col_added.push_back(j);
2743 
2744         goto found;
2745       }
2746     }
2747 
2748     if (add_v) {
2749       for (j = 0; j < add_v->n_v_col; j++) {
2750         if (!strcmp(add_v->v_col_name[j], field->name)) {
2751           /* Once add_v is not nullptr, it comes from ALTER TABLE.
2752           To make sure the index can work after ALTER TABLE path,
2753           which may happen when the ALTER TABLE gets rolled back,
2754           it is a must to duplicate the virtual column information,
2755           in case the passed in object would be freed after ALTER TABLE. */
2756 
2757           mutex_enter(&dict_sys->mutex);
2758           uint64_t old_size = mem_heap_get_size(table->heap);
2759           dict_v_col_t *vcol =
2760               dict_duplicate_v_col(&add_v->v_col[j], table->heap);
2761           field->col = &vcol->m_col;
2762           dict_sys->size += mem_heap_get_size(table->heap) - old_size;
2763           mutex_exit(&dict_sys->mutex);
2764 
2765           goto found;
2766         }
2767       }
2768     }
2769 
2770   dup_err:
2771 #ifdef UNIV_DEBUG
2772     /* It is an error not to find a matching column. */
2773     ib::error(ER_IB_MSG_182)
2774         << "No matching column for " << field->name << " in index "
2775         << index->name << " of table " << table->name;
2776 #endif /* UNIV_DEBUG */
2777     return (FALSE);
2778 
2779   found:;
2780   }
2781 
2782   return (TRUE);
2783 }
2784 
2785 /** Copies fields contained in index2 to index1. */
dict_index_copy(dict_index_t * index1,dict_index_t * index2,const dict_table_t * table,ulint start,ulint end)2786 static void dict_index_copy(dict_index_t *index1, /*!< in: index to copy to */
2787                             dict_index_t *index2, /*!< in: index to copy from */
2788                             const dict_table_t *table, /*!< in: table */
2789                             ulint start, /*!< in: first position to copy */
2790                             ulint end)   /*!< in: last position to copy */
2791 {
2792   dict_field_t *field;
2793   ulint i;
2794 
2795   /* Copy fields contained in index2 */
2796 
2797   for (i = start; i < end; i++) {
2798     field = index2->get_field(i);
2799 
2800     dict_index_add_col(index1, table, field->col, field->prefix_len,
2801                        field->is_ascending);
2802   }
2803 }
2804 
2805 /** Copies types of fields contained in index to tuple. */
dict_index_copy_types(dtuple_t * tuple,const dict_index_t * index,ulint n_fields)2806 void dict_index_copy_types(dtuple_t *tuple,           /*!< in/out: data tuple */
2807                            const dict_index_t *index, /*!< in: index */
2808                            ulint n_fields)            /*!< in: number of
2809                                                       field types to copy */
2810 {
2811   ulint i;
2812 
2813   if (dict_index_is_ibuf(index)) {
2814     /* For IBUF index set field types explicitly. */
2815     for (ulint i = 0; i < n_fields; i++) {
2816       dtype_t *dfield_type = dfield_get_type(dtuple_get_nth_field(tuple, i));
2817       dtype_set(dfield_type, DATA_BINARY, 0, 0);
2818     }
2819 
2820     return;
2821   }
2822 
2823   for (i = 0; i < n_fields; i++) {
2824     const dict_field_t *ifield;
2825     dtype_t *dfield_type;
2826 
2827     ifield = index->get_field(i);
2828     dfield_type = dfield_get_type(dtuple_get_nth_field(tuple, i));
2829     ifield->col->copy_type(dfield_type);
2830     if (dict_index_is_spatial(index) &&
2831         DATA_GEOMETRY_MTYPE(dfield_type->mtype)) {
2832       dfield_type->prtype |= DATA_GIS_MBR;
2833     }
2834   }
2835 }
2836 
2837 /** Copies types of virtual columns contained in table to tuple and sets all
2838 fields of the tuple to the SQL NULL value.  This function should
2839 be called right after dtuple_create().
2840 @param[in,out]	tuple	data tuple
2841 @param[in]	table	table
2842 */
dict_table_copy_v_types(dtuple_t * tuple,const dict_table_t * table)2843 void dict_table_copy_v_types(dtuple_t *tuple, const dict_table_t *table) {
2844   /* tuple could have more virtual columns than existing table,
2845   if we are calling this for creating index along with adding
2846   virtual columns */
2847   ulint n_fields =
2848       ut_min(dtuple_get_n_v_fields(tuple), static_cast<ulint>(table->n_v_def));
2849 
2850   for (ulint i = 0; i < n_fields; i++) {
2851     dfield_t *dfield = dtuple_get_nth_v_field(tuple, i);
2852     dtype_t *dtype = dfield_get_type(dfield);
2853 
2854     dfield_set_null(dfield);
2855     dict_table_get_nth_v_col(table, i)->m_col.copy_type(dtype);
2856   }
2857 }
2858 /** Copies types of columns contained in table to tuple and sets all
2859  fields of the tuple to the SQL NULL value.  This function should
2860  be called right after dtuple_create(). */
dict_table_copy_types(dtuple_t * tuple,const dict_table_t * table)2861 void dict_table_copy_types(dtuple_t *tuple,           /*!< in/out: data tuple */
2862                            const dict_table_t *table) /*!< in: table */
2863 {
2864   ulint i;
2865 
2866   for (i = 0; i < dtuple_get_n_fields(tuple); i++) {
2867     dfield_t *dfield = dtuple_get_nth_field(tuple, i);
2868     dtype_t *dtype = dfield_get_type(dfield);
2869 
2870     dfield_set_null(dfield);
2871     table->get_col(i)->copy_type(dtype);
2872   }
2873 
2874   dict_table_copy_v_types(tuple, table);
2875 }
2876 
2877 /********************************************************************
2878 Wait until all the background threads of the given table have exited, i.e.,
2879 bg_threads == 0. Note: bg_threads_mutex must be reserved when
2880 calling this. */
dict_table_wait_for_bg_threads_to_exit(dict_table_t * table,ulint delay)2881 void dict_table_wait_for_bg_threads_to_exit(
2882     dict_table_t *table, /*!< in: table */
2883     ulint delay)         /*!< in: time in microseconds to wait between
2884                          checks of bg_threads. */
2885 {
2886   fts_t *fts = table->fts;
2887 
2888   ut_ad(mutex_own(&fts->bg_threads_mutex));
2889 
2890   while (fts->bg_threads > 0) {
2891     mutex_exit(&fts->bg_threads_mutex);
2892 
2893     os_thread_sleep(delay);
2894 
2895     mutex_enter(&fts->bg_threads_mutex);
2896   }
2897 }
2898 
/** Builds the internal dictionary cache representation for a clustered
 index, containing also system fields not defined by the user.
 The fields of the new index are, in order: the user-defined fields,
 DATA_ROW_ID (only when the index is not unique), DATA_TRX_ID,
 DATA_ROLL_PTR (not for intrinsic tables), and then every remaining
 non-system column of the table that is not already contained in full.
 The table is asserted to have no indexes yet, and dict_sys->mutex must not
 be held by the caller.
 @return own: the internal representation of the clustered index */
static dict_index_t *dict_index_build_internal_clust(
    const dict_table_t *table, /*!< in: table */
    dict_index_t *index)       /*!< in: user representation of
                               a clustered index */
{
  dict_index_t *new_index;
  dict_field_t *field;
  ulint trx_id_pos;
  ulint i;
  ibool *indexed;

  ut_ad(table && index);
  ut_ad(index->is_clustered());
  ut_ad(!dict_index_is_ibuf(index));

  ut_ad(!mutex_own(&dict_sys->mutex));
  ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);

  /* Create a new index object with certainly enough fields */
  new_index =
      dict_mem_index_create(table->name.m_name, index->name, table->space,
                            index->type, index->n_fields + table->n_cols);

  /* Copy other relevant data from the old index struct to the new
  struct: it inherits the values */

  new_index->n_user_defined_cols = index->n_fields;

  new_index->id = index->id;

  /* Copy the fields of index */
  dict_index_copy(new_index, index, table, 0, index->n_fields);

  if (dict_index_is_unique(index)) {
    /* Only the fields defined so far are needed to identify
    the index entry uniquely */

    new_index->n_uniq = new_index->n_def;
  } else {
    /* Also the row id is needed to identify the entry */
    new_index->n_uniq = 1 + new_index->n_def;
  }

  new_index->trx_id_offset = 0;

  /* Add system columns, trx id first */

  /* trx_id_pos is the number of fields that precede DATA_TRX_ID: the
  user-defined fields, plus DATA_ROW_ID when the index is not unique. */
  trx_id_pos = new_index->n_def;

  if (!dict_index_is_unique(index)) {
    dict_index_add_col(new_index, table, table->get_sys_col(DATA_ROW_ID), 0,
                       true);
    trx_id_pos++;
  }

  dict_index_add_col(new_index, table, table->get_sys_col(DATA_TRX_ID), 0,
                     true);

  /* Compute trx_id_offset as the sum of the fixed sizes of all fields
  preceding DATA_TRX_ID. It is left at 0 as soon as one of those fields
  has no fixed size or is a column prefix. */
  for (i = 0; i < trx_id_pos; i++) {
    ulint fixed_size =
        new_index->get_col(i)->get_fixed_size(dict_table_is_comp(table));

    if (fixed_size == 0) {
      new_index->trx_id_offset = 0;

      break;
    }

    /* NOTE: this declaration shadows the function-scope 'field'. */
    dict_field_t *field = new_index->get_field(i);
    if (field->prefix_len > 0) {
      new_index->trx_id_offset = 0;

      break;
    }

    /* Add fixed_size to new_index->trx_id_offset.
    Because the latter is a bit-field, an overflow
    can theoretically occur. Check for it. */
    fixed_size += new_index->trx_id_offset;

    new_index->trx_id_offset = fixed_size;

    /* If the value read back differs, it did not fit in the bit-field. */
    if (new_index->trx_id_offset != fixed_size) {
      /* Overflow. Pretend that this is a
      variable-length PRIMARY KEY. */
      ut_ad(0);
      new_index->trx_id_offset = 0;
      break;
    }
  }

  /* UNDO logging is turned-off for intrinsic table and so
  DATA_ROLL_PTR system columns are not added as default system
  columns to such tables. */
  if (!table->is_intrinsic()) {
    dict_index_add_col(new_index, table, table->get_sys_col(DATA_ROLL_PTR), 0,
                       true);
  }

  /* Remember the table columns already contained in new_index.
  The zero-initialized array has one flag per table column and is
  addressed by dict_col_t::ind. */
  indexed =
      static_cast<ibool *>(ut_zalloc_nokey(table->n_cols * sizeof *indexed));

  /* Mark the table columns already contained in new_index */
  for (i = 0; i < new_index->n_def; i++) {
    field = new_index->get_field(i);

    /* If there is only a prefix of the column in the index
    field, do not mark the column as contained in the index */

    if (field->prefix_len == 0) {
      indexed[field->col->ind] = TRUE;
    }
  }

  /* Add to new_index non-system columns of table not yet included
  there. The loop bound leaves out the last get_n_sys_cols() columns of
  the table. */
  ulint n_sys_cols = table->get_n_sys_cols();
  for (i = 0; i + n_sys_cols < (ulint)table->n_cols; i++) {
    dict_col_t *col = table->get_col(i);
    ut_ad(col->mtype != DATA_SYS);

    if (!indexed[col->ind]) {
      dict_index_add_col(new_index, table, col, 0, true);
    }
  }

  ut_free(indexed);

  /* The clustered index is expected to be the first index of the table. */
  ut_ad(UT_LIST_GET_LEN(table->indexes) == 0);

  new_index->cached = TRUE;

  return (new_index);
}
3037 
3038 /** Builds the internal dictionary cache representation for a non-clustered
3039  index, containing also system fields not defined by the user.
3040  @return own: the internal representation of the non-clustered index */
dict_index_build_internal_non_clust(const dict_table_t * table,dict_index_t * index)3041 static dict_index_t *dict_index_build_internal_non_clust(
3042     const dict_table_t *table, /*!< in: table */
3043     dict_index_t *index)       /*!< in: user representation of
3044                                a non-clustered index */
3045 {
3046   dict_field_t *field;
3047   dict_index_t *new_index;
3048   dict_index_t *clust_index;
3049   ulint i;
3050   ibool *indexed;
3051 
3052   ut_ad(table && index);
3053   ut_ad(!index->is_clustered());
3054   ut_ad(!dict_index_is_ibuf(index));
3055   ut_ad(!mutex_own(&dict_sys->mutex));
3056   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
3057 
3058   /* The clustered index should be the first in the list of indexes */
3059   clust_index = UT_LIST_GET_FIRST(table->indexes);
3060 
3061   ut_ad(clust_index);
3062   ut_ad(clust_index->is_clustered());
3063   ut_ad(!dict_index_is_ibuf(clust_index));
3064 
3065   /* Create a new index */
3066   new_index = dict_mem_index_create(table->name.m_name, index->name,
3067                                     index->space, index->type,
3068                                     index->n_fields + 1 + clust_index->n_uniq);
3069 
3070   /* Copy other relevant data from the old index
3071   struct to the new struct: it inherits the values */
3072 
3073   new_index->n_user_defined_cols = index->n_fields;
3074 
3075   new_index->id = index->id;
3076 
3077   /* Copy fields from index to new_index */
3078   dict_index_copy(new_index, index, table, 0, index->n_fields);
3079 
3080   /* Remember the table columns already contained in new_index */
3081   indexed =
3082       static_cast<ibool *>(ut_zalloc_nokey(table->n_cols * sizeof *indexed));
3083 
3084   /* Mark the table columns already contained in new_index */
3085   for (i = 0; i < new_index->n_def; i++) {
3086     field = new_index->get_field(i);
3087 
3088     if (field->col->is_virtual()) {
3089       continue;
3090     }
3091 
3092     /* If there is only a prefix of the column in the index
3093     field, do not mark the column as contained in the index */
3094 
3095     if (field->prefix_len == 0) {
3096       indexed[field->col->ind] = TRUE;
3097     }
3098   }
3099 
3100   /* Add to new_index the columns necessary to determine the clustered
3101   index entry uniquely */
3102 
3103   for (i = 0; i < clust_index->n_uniq; i++) {
3104     field = clust_index->get_field(i);
3105 
3106     if (!indexed[field->col->ind]) {
3107       dict_index_add_col(new_index, table, field->col, field->prefix_len,
3108                          field->is_ascending);
3109     } else if (dict_index_is_spatial(index)) {
3110       /*For spatial index, we still need to add the
3111       field to index. */
3112       dict_index_add_col(new_index, table, field->col, field->prefix_len,
3113                          field->is_ascending);
3114     }
3115   }
3116 
3117   ut_free(indexed);
3118 
3119   if (dict_index_is_unique(index)) {
3120     new_index->n_uniq = index->n_fields;
3121   } else {
3122     new_index->n_uniq = new_index->n_def;
3123   }
3124 
3125   /* Set the n_fields value in new_index to the actual defined
3126   number of fields */
3127 
3128   new_index->n_fields = new_index->n_def;
3129 
3130   new_index->cached = TRUE;
3131 
3132   return (new_index);
3133 }
3134 
3135 /***********************************************************************
3136 Builds the internal dictionary cache representation for an FTS index.
3137 @return own: the internal representation of the FTS index */
dict_index_build_internal_fts(dict_table_t * table,dict_index_t * index)3138 static dict_index_t *dict_index_build_internal_fts(
3139     dict_table_t *table, /*!< in: table */
3140     dict_index_t *index) /*!< in: user representation of an FTS index */
3141 {
3142   dict_index_t *new_index;
3143 
3144   ut_ad(table && index);
3145   ut_ad(index->type == DICT_FTS);
3146   ut_ad(!mutex_own(&dict_sys->mutex));
3147   ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
3148 
3149   /* Create a new index */
3150   new_index = dict_mem_index_create(table->name.m_name, index->name,
3151                                     index->space, index->type, index->n_fields);
3152 
3153   /* Copy other relevant data from the old index struct to the new
3154   struct: it inherits the values */
3155 
3156   new_index->n_user_defined_cols = index->n_fields;
3157 
3158   new_index->id = index->id;
3159 
3160   /* Copy fields from index to new_index */
3161   dict_index_copy(new_index, index, table, 0, index->n_fields);
3162 
3163   new_index->n_uniq = 0;
3164   new_index->cached = TRUE;
3165 
3166   if (table->fts->cache == nullptr) {
3167     table->fts->cache = fts_cache_create(table);
3168   }
3169 
3170   rw_lock_x_lock(&table->fts->cache->init_lock);
3171   /* Notify the FTS cache about this index. */
3172   fts_cache_index_cache_create(table, new_index);
3173   rw_lock_x_unlock(&table->fts->cache->init_lock);
3174 
3175   return (new_index);
3176 }
3177 /*====================== FOREIGN KEY PROCESSING ========================*/
3178 
/** Checks if a table is referenced by foreign keys, i.e. whether its
 referenced_set contains at least one constraint.
 @return true if table is referenced by a foreign key */
ibool dict_table_is_referenced_by_foreign_key(
    const dict_table_t *table) /*!< in: InnoDB table */
{
  return (!table->referenced_set.empty());
}
3186 
3187 /** Removes a foreign constraint struct from the dictionary cache. */
dict_foreign_remove_from_cache(dict_foreign_t * foreign)3188 void dict_foreign_remove_from_cache(
3189     dict_foreign_t *foreign) /*!< in, own: foreign constraint */
3190 {
3191   ut_ad(mutex_own(&dict_sys->mutex));
3192   ut_a(foreign);
3193 
3194   if (foreign->referenced_table != nullptr) {
3195     foreign->referenced_table->referenced_set.erase(foreign);
3196   }
3197 
3198   if (foreign->foreign_table != nullptr) {
3199     foreign->foreign_table->foreign_set.erase(foreign);
3200   }
3201 
3202   dict_foreign_free(foreign);
3203 }
3204 
3205 /** Looks for the foreign constraint from the foreign and referenced lists
3206  of a table.
3207  @return foreign constraint */
dict_foreign_find(dict_table_t * table,dict_foreign_t * foreign)3208 static dict_foreign_t *dict_foreign_find(
3209     dict_table_t *table,     /*!< in: table object */
3210     dict_foreign_t *foreign) /*!< in: foreign constraint */
3211 {
3212   ut_ad(mutex_own(&dict_sys->mutex));
3213 
3214   ut_ad(dict_foreign_set_validate(table->foreign_set));
3215   ut_ad(dict_foreign_set_validate(table->referenced_set));
3216 
3217   dict_foreign_set::iterator it = table->foreign_set.find(foreign);
3218 
3219   if (it != table->foreign_set.end()) {
3220     return (*it);
3221   }
3222 
3223   it = table->referenced_set.find(foreign);
3224 
3225   if (it != table->referenced_set.end()) {
3226     return (*it);
3227   }
3228 
3229   return (nullptr);
3230 }
3231 
3232 /** Tries to find an index whose first fields are the columns in the array,
3233  in the same order and is not marked for deletion and is not the same
3234  as types_idx.
3235  @return matching index, NULL if not found */
dict_foreign_find_index(const dict_table_t * table,const char ** col_names,const char ** columns,ulint n_cols,const dict_index_t * types_idx,bool check_charsets,ulint check_null)3236 dict_index_t *dict_foreign_find_index(
3237     const dict_table_t *table, /*!< in: table */
3238     const char **col_names,
3239     /*!< in: column names, or NULL
3240     to use table->col_names */
3241     const char **columns, /*!< in: array of column names */
3242     ulint n_cols,         /*!< in: number of columns */
3243     const dict_index_t *types_idx,
3244     /*!< in: NULL or an index
3245     whose types the column types
3246     must match */
3247     bool check_charsets,
3248     /*!< in: whether to check
3249     charsets.  only has an effect
3250     if types_idx != NULL */
3251     ulint check_null)
3252 /*!< in: nonzero if none of
3253 the columns must be declared
3254 NOT NULL */
3255 {
3256   const dict_index_t *index;
3257 
3258   ut_ad(mutex_own(&dict_sys->mutex));
3259 
3260   index = table->first_index();
3261 
3262   while (index != nullptr) {
3263     if (types_idx != index && !(index->type & DICT_FTS) &&
3264         !dict_index_is_spatial(index) && !index->to_be_dropped &&
3265         (!(index->uncommitted &&
3266            ((index->online_status == ONLINE_INDEX_ABORTED_DROPPED) ||
3267             (index->online_status == ONLINE_INDEX_ABORTED)))) &&
3268         dict_foreign_qualify_index(table, col_names, columns, n_cols, index,
3269                                    types_idx, check_charsets, check_null)) {
3270       return const_cast<dict_index_t *>(index);
3271     }
3272 
3273     index = index->next();
3274   }
3275 
3276   return (nullptr);
3277 }
3278 
/** Writes the header of a foreign key error report: repositions the stream
 to its beginning, prints a timestamp and then the line naming the table.
 This function does not acquire any mutex itself. */
static void dict_foreign_error_report_low(
    FILE *file,       /*!< in: output stream */
    const char *name) /*!< in: table name */
{
  /* Start writing from the beginning of the stream. */
  rewind(file);
  ut_print_timestamp(file);
  fprintf(file, " Error in foreign key constraint of table %s:\n", name);
}
3288 
3289 /** Report an error in a foreign key definition. */
dict_foreign_error_report(FILE * file,dict_foreign_t * fk,const char * msg)3290 static void dict_foreign_error_report(
3291     FILE *file,         /*!< in: output stream */
3292     dict_foreign_t *fk, /*!< in: foreign key constraint */
3293     const char *msg)    /*!< in: the error message */
3294 {
3295   mutex_enter(&dict_foreign_err_mutex);
3296   dict_foreign_error_report_low(file, fk->foreign_table_name);
3297   fputs(msg, file);
3298   fputs(" Constraint:\n", file);
3299   dict_print_info_on_foreign_key_in_create_format(file, nullptr, fk, TRUE);
3300   putc('\n', file);
3301   if (fk->foreign_index) {
3302     fprintf(file,
3303             "The index in the foreign key in table is"
3304             " %s\n%s\n",
3305             fk->foreign_index->name(), FOREIGN_KEY_CONSTRAINTS_MSG);
3306   }
3307   mutex_exit(&dict_foreign_err_mutex);
3308 }
3309 
3310 /** Adds a foreign key constraint object to the dictionary cache. May free
3311  the object if there already is an object with the same identifier in.
3312  At least one of the foreign table and the referenced table must already
3313  be in the dictionary cache!
3314  @return DB_SUCCESS or error code */
dict_foreign_add_to_cache(dict_foreign_t * foreign,const char ** col_names,bool check_charsets,bool can_free_fk,dict_err_ignore_t ignore_err)3315 dberr_t dict_foreign_add_to_cache(dict_foreign_t *foreign,
3316                                   /*!< in, own: foreign key constraint */
3317                                   const char **col_names,
3318                                   /*!< in: column names, or NULL to use
3319                                   foreign->foreign_table->col_names */
3320                                   bool check_charsets,
3321                                   /*!< in: whether to check charset
3322                                   compatibility */
3323                                   bool can_free_fk,
3324                                   /*!< in: whether free existing FK */
3325                                   dict_err_ignore_t ignore_err)
3326 /*!< in: error to be ignored */
3327 {
3328   dict_table_t *for_table;
3329   dict_table_t *ref_table;
3330   dict_foreign_t *for_in_cache = nullptr;
3331   dict_index_t *index;
3332   ibool added_to_referenced_list = FALSE;
3333   FILE *ef = dict_foreign_err_file;
3334 
3335   DBUG_TRACE;
3336   DBUG_PRINT("dict_foreign_add_to_cache", ("id: %s", foreign->id));
3337 
3338   ut_ad(mutex_own(&dict_sys->mutex));
3339 
3340   for_table =
3341       dict_table_check_if_in_cache_low(foreign->foreign_table_name_lookup);
3342 
3343   ref_table =
3344       dict_table_check_if_in_cache_low(foreign->referenced_table_name_lookup);
3345   ut_a(for_table || ref_table);
3346 
3347   if (for_table) {
3348     for_in_cache = dict_foreign_find(for_table, foreign);
3349   }
3350 
3351   if (!for_in_cache && ref_table) {
3352     for_in_cache = dict_foreign_find(ref_table, foreign);
3353   }
3354 
3355   if (for_in_cache && for_in_cache != foreign) {
3356     /* Free the foreign object */
3357     dict_foreign_free(foreign);
3358   } else {
3359     for_in_cache = foreign;
3360   }
3361 
3362   if (ref_table && !for_in_cache->referenced_table) {
3363     index = dict_foreign_find_index(
3364         ref_table, nullptr, for_in_cache->referenced_col_names,
3365         for_in_cache->n_fields, for_in_cache->foreign_index, check_charsets,
3366         false);
3367 
3368     if (index == nullptr && !(ignore_err & DICT_ERR_IGNORE_FK_NOKEY)) {
3369       dict_foreign_error_report(ef, for_in_cache,
3370                                 "there is no index in referenced table"
3371                                 " which would contain\n"
3372                                 "the columns as the first columns,"
3373                                 " or the data types in the\n"
3374                                 "referenced table do not match"
3375                                 " the ones in table.");
3376 
3377       if (for_in_cache == foreign && can_free_fk) {
3378         mem_heap_free(foreign->heap);
3379       }
3380 
3381       return DB_CANNOT_ADD_CONSTRAINT;
3382     }
3383 
3384     for_in_cache->referenced_table = ref_table;
3385     for_in_cache->referenced_index = index;
3386 
3387     std::pair<dict_foreign_set::iterator, bool> ret =
3388         ref_table->referenced_set.insert(for_in_cache);
3389 
3390     ut_a(ret.second); /* second is true if the insertion
3391                       took place */
3392     added_to_referenced_list = TRUE;
3393   }
3394 
3395   if (for_table && !for_in_cache->foreign_table) {
3396     index = dict_foreign_find_index(
3397         for_table, col_names, for_in_cache->foreign_col_names,
3398         for_in_cache->n_fields, for_in_cache->referenced_index, check_charsets,
3399         for_in_cache->type & (DICT_FOREIGN_ON_DELETE_SET_NULL |
3400                               DICT_FOREIGN_ON_UPDATE_SET_NULL));
3401 
3402     if (index == nullptr && !(ignore_err & DICT_ERR_IGNORE_FK_NOKEY)) {
3403       dict_foreign_error_report(ef, for_in_cache,
3404                                 "there is no index in the table"
3405                                 " which would contain\n"
3406                                 "the columns as the first columns,"
3407                                 " or the data types in the\n"
3408                                 "table do not match"
3409                                 " the ones in the referenced table\n"
3410                                 "or one of the ON ... SET NULL columns"
3411                                 " is declared NOT NULL.");
3412 
3413       if (for_in_cache == foreign) {
3414         if (added_to_referenced_list) {
3415           const dict_foreign_set::size_type n =
3416               ref_table->referenced_set.erase(for_in_cache);
3417 
3418           ut_a(n == 1); /* the number of
3419                         elements removed must
3420                         be one */
3421         }
3422         mem_heap_free(foreign->heap);
3423       }
3424 
3425       return DB_CANNOT_ADD_CONSTRAINT;
3426     }
3427 
3428     for_in_cache->foreign_table = for_table;
3429     for_in_cache->foreign_index = index;
3430 
3431     std::pair<dict_foreign_set::iterator, bool> ret =
3432         for_table->foreign_set.insert(for_in_cache);
3433 
3434     ut_a(ret.second); /* second is true if the insertion
3435                       took place */
3436   }
3437 
3438   /* We need to move the table to the non-LRU end of the table LRU
3439   list. Otherwise it will be evicted from the cache. */
3440 
3441   if (ref_table != nullptr) {
3442     dict_table_prevent_eviction(ref_table);
3443   }
3444 
3445   if (for_table != nullptr) {
3446     dict_table_prevent_eviction(for_table);
3447   }
3448 
3449   ut_ad(dict_lru_validate());
3450   return DB_SUCCESS;
3451 }
3452 
3453 /** Finds the highest [number] for foreign key constraints of the table. Looks
3454  only at the >= 4.0.18-format id's, which are of the form
3455  databasename/tablename_ibfk_[number].
3456  TODO: Remove this function once in-place ALTER TABLE code is
3457  updated to avoid its use.
3458  @return highest number, 0 if table has no new format foreign key constraints */
dict_table_get_highest_foreign_id(dict_table_t * table)3459 ulint dict_table_get_highest_foreign_id(
3460     dict_table_t *table) /*!< in: table in the dictionary memory cache */
3461 {
3462   dict_foreign_t *foreign;
3463   char *endp;
3464   ulint biggest_id = 0;
3465   ulint id;
3466   ulint len;
3467 
3468   DBUG_TRACE;
3469 
3470   ut_a(table);
3471 
3472   len = ut_strlen(table->name.m_name);
3473 
3474   for (dict_foreign_set::iterator it = table->foreign_set.begin();
3475        it != table->foreign_set.end(); ++it) {
3476     foreign = *it;
3477 
3478     if (ut_strlen(foreign->id) > ((sizeof dict_ibfk) - 1) + len &&
3479         0 == ut_memcmp(foreign->id, table->name.m_name, len) &&
3480         0 == ut_memcmp(foreign->id + len, dict_ibfk, (sizeof dict_ibfk) - 1) &&
3481         foreign->id[len + ((sizeof dict_ibfk) - 1)] != '0') {
3482       /* It is of the >= 4.0.18 format */
3483 
3484       id = strtoul(foreign->id + len + ((sizeof dict_ibfk) - 1), &endp, 10);
3485       if (*endp == '\0') {
3486         ut_a(id != biggest_id);
3487 
3488         if (id > biggest_id) {
3489           biggest_id = id;
3490         }
3491       }
3492     }
3493   }
3494 
3495   ulint size = table->foreign_set.size();
3496 
3497   biggest_id = (size > biggest_id) ? size : biggest_id;
3498 
3499   DBUG_PRINT("dict_table_get_highest_foreign_id", ("id: %lu", biggest_id));
3500 
3501   return biggest_id;
3502 }
3503 
3504 /*==================== END OF FOREIGN KEY PROCESSING ====================*/
3505 
3506 #ifdef UNIV_DEBUG
3507 /** Checks that a tuple has n_fields_cmp value in a sensible range, so that
3508  no comparison can occur with the page number field in a node pointer.
3509  @return true if ok */
dict_index_check_search_tuple(const dict_index_t * index,const dtuple_t * tuple)3510 ibool dict_index_check_search_tuple(
3511     const dict_index_t *index, /*!< in: index tree */
3512     const dtuple_t *tuple)     /*!< in: tuple used in a search */
3513 {
3514   ut_a(index);
3515   ut_a(dtuple_get_n_fields_cmp(tuple) <=
3516        dict_index_get_n_unique_in_tree(index));
3517   ut_ad(index->page != FIL_NULL);
3518   ut_ad(index->page >= FSP_FIRST_INODE_PAGE_NO);
3519   ut_ad(dtuple_check_typed(tuple));
3520   ut_ad(!(index->type & DICT_FTS));
3521   return (TRUE);
3522 }
3523 #endif /* UNIV_DEBUG */
3524 
3525 /** Builds a node pointer out of a physical record and a page number.
3526  @return own: node pointer */
dict_index_build_node_ptr(const dict_index_t * index,const rec_t * rec,page_no_t page_no,mem_heap_t * heap,ulint level)3527 dtuple_t *dict_index_build_node_ptr(const dict_index_t *index, /*!< in: index */
3528                                     const rec_t *rec,  /*!< in: record for which
3529                                                        to build node  pointer */
3530                                     page_no_t page_no, /*!< in: page number to
3531                                                        put in node pointer */
3532                                     mem_heap_t *heap, /*!< in: memory heap where
3533                                                       pointer created */
3534                                     ulint level) /*!< in: level of rec in tree:
3535                                                  0 means leaf level */
3536 {
3537   dtuple_t *tuple;
3538   dfield_t *field;
3539   byte *buf;
3540   ulint n_unique;
3541 
3542   if (dict_index_is_ibuf(index)) {
3543     /* In a universal index tree, we take the whole record as
3544     the node pointer if the record is on the leaf level,
3545     on non-leaf levels we remove the last field, which
3546     contains the page number of the child page */
3547 
3548     ut_a(!dict_table_is_comp(index->table));
3549     n_unique = rec_get_n_fields_old_raw(rec);
3550 
3551     if (level > 0) {
3552       ut_a(n_unique > 1);
3553       n_unique--;
3554     }
3555   } else {
3556     n_unique = dict_index_get_n_unique_in_tree_nonleaf(index);
3557   }
3558 
3559   tuple = dtuple_create(heap, n_unique + 1);
3560 
3561   /* When searching in the tree for the node pointer, we must not do
3562   comparison on the last field, the page number field, as on upper
3563   levels in the tree there may be identical node pointers with a
3564   different page number; therefore, we set the n_fields_cmp to one
3565   less: */
3566 
3567   dtuple_set_n_fields_cmp(tuple, n_unique);
3568 
3569   dict_index_copy_types(tuple, index, n_unique);
3570 
3571   buf = static_cast<byte *>(mem_heap_alloc(heap, 4));
3572 
3573   mach_write_to_4(buf, page_no);
3574 
3575   field = dtuple_get_nth_field(tuple, n_unique);
3576   dfield_set_data(field, buf, 4);
3577 
3578   dtype_set(dfield_get_type(field), DATA_SYS_CHILD, DATA_NOT_NULL, 4);
3579 
3580   rec_copy_prefix_to_dtuple(tuple, rec, index, n_unique, heap);
3581   dtuple_set_info_bits(tuple,
3582                        dtuple_get_info_bits(tuple) | REC_STATUS_NODE_PTR);
3583 
3584   ut_ad(dtuple_check_typed(tuple));
3585 
3586   return (tuple);
3587 }
3588 
/** Copies a prefix of a physical record into a buffer via
 rec_copy_prefix_to_buf(). The number of fields copied depends on the
 index and on whether the record resides on a leaf page.
 @param[in]	index		index of the record
 @param[in]	rec		record whose prefix is copied
 @param[out]	n_fields	number of fields that were copied
 @param[in,out]	buf		buffer handed on to rec_copy_prefix_to_buf()
 @param[in,out]	buf_size	buffer size handed on to
                                rec_copy_prefix_to_buf()
 @return the value returned by rec_copy_prefix_to_buf() */
rec_t *dict_index_copy_rec_order_prefix(const dict_index_t *index,
                                        const rec_t *rec, ulint *n_fields,
                                        byte **buf, size_t *buf_size) {
  UNIV_PREFETCH_R(rec);

  ulint n_prefix;

  if (dict_index_is_ibuf(index)) {
    ut_a(!dict_table_is_comp(index->table));
    n_prefix = rec_get_n_fields_old_raw(rec);
  } else if (page_is_leaf(page_align(rec))) {
    n_prefix = dict_index_get_n_unique_in_tree(index);
  } else {
    n_prefix = dict_index_get_n_unique_in_tree_nonleaf(index);

    /* For internal node of R-tree, since we need to
    compare the page no field, so, we need to copy this
    field as well. */
    if (dict_index_is_spatial(index)) {
      ++n_prefix;
    }
  }

  *n_fields = n_prefix;

  return rec_copy_prefix_to_buf(rec, index, n_prefix, buf, buf_size);
}
3617 
3618 /** Builds a typed data tuple out of a physical record.
3619  @return own: data tuple */
dict_index_build_data_tuple(dict_index_t * index,rec_t * rec,ulint n_fields,mem_heap_t * heap)3620 dtuple_t *dict_index_build_data_tuple(
3621     dict_index_t *index, /*!< in: index tree */
3622     rec_t *rec,          /*!< in: record for which to build data tuple */
3623     ulint n_fields,      /*!< in: number of data fields */
3624     mem_heap_t *heap)    /*!< in: memory heap where tuple created */
3625 {
3626   dtuple_t *tuple;
3627 
3628   ut_ad(dict_table_is_comp(index->table) ||
3629         n_fields <= rec_get_n_fields_old(rec, index));
3630 
3631   tuple = dtuple_create(heap, n_fields);
3632 
3633   dict_index_copy_types(tuple, index, n_fields);
3634 
3635   rec_copy_prefix_to_dtuple(tuple, rec, index, n_fields, heap);
3636 
3637   ut_ad(dtuple_check_typed(tuple));
3638 
3639   return (tuple);
3640 }
3641 
3642 /** Calculates the minimum record length in an index. */
dict_index_calc_min_rec_len(const dict_index_t * index)3643 ulint dict_index_calc_min_rec_len(const dict_index_t *index) /*!< in: index */
3644 {
3645   ulint sum = 0;
3646   ulint i;
3647   ulint comp = dict_table_is_comp(index->table);
3648 
3649   if (comp) {
3650     ulint nullable = 0;
3651     sum = REC_N_NEW_EXTRA_BYTES;
3652     for (i = 0; i < dict_index_get_n_fields(index); i++) {
3653       const dict_col_t *col = index->get_col(i);
3654       ulint size = col->get_fixed_size(comp);
3655       sum += size;
3656       if (!size) {
3657         size = col->len;
3658         sum += size < 128 ? 1 : 2;
3659       }
3660       if (!(col->prtype & DATA_NOT_NULL)) {
3661         nullable++;
3662       }
3663     }
3664 
3665     /* round the NULL flags up to full bytes */
3666     sum += UT_BITS_IN_BYTES(nullable);
3667 
3668     return (sum);
3669   }
3670 
3671   for (i = 0; i < dict_index_get_n_fields(index); i++) {
3672     sum += index->get_col(i)->get_fixed_size(comp);
3673   }
3674 
3675   if (sum > 127) {
3676     sum += 2 * dict_index_get_n_fields(index);
3677   } else {
3678     sum += dict_index_get_n_fields(index);
3679   }
3680 
3681   sum += REC_N_OLD_EXTRA_BYTES;
3682 
3683   return (sum);
3684 }
3685 
3686 /** Outputs info on a foreign key of a table in a format suitable for
3687  CREATE TABLE. */
dict_print_info_on_foreign_key_in_create_format(FILE * file,trx_t * trx,dict_foreign_t * foreign,ibool add_newline)3688 void dict_print_info_on_foreign_key_in_create_format(
3689     FILE *file,              /*!< in: file where to print */
3690     trx_t *trx,              /*!< in: transaction */
3691     dict_foreign_t *foreign, /*!< in: foreign key constraint */
3692     ibool add_newline)       /*!< in: whether to add a newline */
3693 {
3694   const char *stripped_id;
3695   ulint i;
3696 
3697   if (strchr(foreign->id, '/')) {
3698     /* Strip the preceding database name from the constraint id */
3699     stripped_id = foreign->id + 1 + dict_get_db_name_len(foreign->id);
3700   } else {
3701     stripped_id = foreign->id;
3702   }
3703 
3704   putc(',', file);
3705 
3706   if (add_newline) {
3707     /* SHOW CREATE TABLE wants constraints each printed nicely
3708     on its own line, while error messages want no newlines
3709     inserted. */
3710     fputs("\n ", file);
3711   }
3712 
3713   fputs(" CONSTRAINT ", file);
3714   innobase_quote_identifier(file, trx, stripped_id);
3715   fputs(" FOREIGN KEY (", file);
3716 
3717   for (i = 0;;) {
3718     innobase_quote_identifier(file, trx, foreign->foreign_col_names[i]);
3719     if (++i < foreign->n_fields) {
3720       fputs(", ", file);
3721     } else {
3722       break;
3723     }
3724   }
3725 
3726   fputs(") REFERENCES ", file);
3727 
3728   if (dict_tables_have_same_db(foreign->foreign_table_name_lookup,
3729                                foreign->referenced_table_name_lookup)) {
3730     /* Do not print the database name of the referenced table */
3731     ut_print_name(file, trx,
3732                   dict_remove_db_name(foreign->referenced_table_name));
3733   } else {
3734     ut_print_name(file, trx, foreign->referenced_table_name);
3735   }
3736 
3737   putc(' ', file);
3738   putc('(', file);
3739 
3740   for (i = 0;;) {
3741     innobase_quote_identifier(file, trx, foreign->referenced_col_names[i]);
3742     if (++i < foreign->n_fields) {
3743       fputs(", ", file);
3744     } else {
3745       break;
3746     }
3747   }
3748 
3749   putc(')', file);
3750 
3751   if (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE) {
3752     fputs(" ON DELETE CASCADE", file);
3753   }
3754 
3755   if (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL) {
3756     fputs(" ON DELETE SET NULL", file);
3757   }
3758 
3759   if (!(foreign->type & DICT_FOREIGN_ON_DELETE_NO_ACTION) &&
3760       !(foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE) &&
3761       !(foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)) {
3762     fputs(" ON DELETE RESTRICT", file);
3763   }
3764 
3765   if (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE) {
3766     fputs(" ON UPDATE CASCADE", file);
3767   }
3768 
3769   if (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL) {
3770     fputs(" ON UPDATE SET NULL", file);
3771   }
3772 
3773   if (!(foreign->type & DICT_FOREIGN_ON_UPDATE_NO_ACTION) &&
3774       !(foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE) &&
3775       !(foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
3776     fputs(" ON UPDATE RESTRICT", file);
3777   }
3778 }
3779 #endif /* !UNIV_HOTBACKUP */
3780 
3781 /** Inits the structure for persisting dynamic metadata */
dict_persist_init(void)3782 void dict_persist_init(void) {
3783   dict_persist =
3784       static_cast<dict_persist_t *>(ut_zalloc_nokey(sizeof(*dict_persist)));
3785 
3786   mutex_create(LATCH_ID_DICT_PERSIST_DIRTY_TABLES, &dict_persist->mutex);
3787 
3788 #ifndef UNIV_HOTBACKUP
3789   UT_LIST_INIT(dict_persist->dirty_dict_tables,
3790                &dict_table_t::dirty_dict_tables);
3791 #endif /* !UNIV_HOTBACKUP */
3792 
3793   dict_persist->num_dirty_tables = 0;
3794 
3795   dict_persist->persisters = UT_NEW_NOKEY(Persisters());
3796   dict_persist->persisters->add(PM_INDEX_CORRUPTED);
3797   dict_persist->persisters->add(PM_TABLE_AUTO_INC);
3798 
3799 #ifndef UNIV_HOTBACKUP
3800   dict_persist_update_log_margin();
3801 #endif /* !UNIV_HOTBACKUP */
3802 }
3803 
3804 /** Clear the structure */
dict_persist_close(void)3805 void dict_persist_close(void) {
3806   UT_DELETE(dict_persist->persisters);
3807 
3808 #ifndef UNIV_HOTBACKUP
3809   UT_DELETE(dict_persist->table_buffer);
3810 #endif /* !UNIV_HOTBACKUP */
3811 
3812   mutex_free(&dict_persist->mutex);
3813 
3814   ut_free(dict_persist);
3815 }
3816 
3817 #ifndef UNIV_HOTBACKUP
3818 /** Initialize the dynamic metadata according to the table object
3819 @param[in]	table		table object
3820 @param[in,out]	metadata	metadata to be initialized */
dict_init_dynamic_metadata(dict_table_t * table,PersistentTableMetadata * metadata)3821 static void dict_init_dynamic_metadata(dict_table_t *table,
3822                                        PersistentTableMetadata *metadata) {
3823   ut_ad(mutex_own(&dict_persist->mutex));
3824 
3825   ut_ad(metadata->get_table_id() == table->id);
3826 
3827   for (const dict_index_t *index = table->first_index(); index != nullptr;
3828        index = index->next()) {
3829     if (index->is_corrupted()) {
3830       metadata->add_corrupted_index(index_id_t(index->space, index->id));
3831     }
3832   }
3833 
3834   if (table->autoinc_persisted != 0) {
3835     metadata->set_autoinc(table->autoinc_persisted);
3836   }
3837 
3838   /* Will initialize other metadata here */
3839 }
3840 #endif /* !UNIV_HOTBACKUP */
3841 
3842 /** Apply the persistent dynamic metadata read from redo logs or
3843 DDTableBuffer to corresponding table during recovery.
3844 @param[in,out]	table		table
3845 @param[in]	metadata	structure of persistent metadata
3846 @return true if we do apply something to the in-memory table object,
3847 otherwise false */
dict_table_apply_dynamic_metadata(dict_table_t * table,const PersistentTableMetadata * metadata)3848 bool dict_table_apply_dynamic_metadata(
3849     dict_table_t *table, const PersistentTableMetadata *metadata) {
3850   bool get_dirty = false;
3851 
3852   ut_ad(mutex_own(&dict_sys->mutex));
3853 
3854   /* Apply corrupted index ids first */
3855   const corrupted_ids_t corrupted_ids = metadata->get_corrupted_indexes();
3856 
3857   for (corrupted_ids_t::const_iterator iter = corrupted_ids.begin();
3858        iter != corrupted_ids.end(); ++iter) {
3859     const index_id_t index_id = *iter;
3860     dict_index_t *index;
3861 
3862     index = const_cast<dict_index_t *>(
3863         dict_table_find_index_on_id(table, index_id));
3864 
3865     if (index != nullptr) {
3866       ut_ad(index->space == index_id.m_space_id);
3867 
3868       if (!index->is_corrupted()) {
3869         index->type |= DICT_CORRUPT;
3870         get_dirty = true;
3871       }
3872 
3873     } else {
3874       /* In some cases, we could only load some indexes
3875       of a table but not all(See dict_load_indexes()).
3876       So we might not find it here */
3877       ib::info(ER_IB_MSG_184)
3878           << "Failed to find the index: " << index_id.m_index_id
3879           << " in space: " << index_id.m_space_id
3880           << " of table: " << table->name << "(table id: " << table->id
3881           << "). The index should have been dropped"
3882           << " or couldn't be loaded.";
3883     }
3884   }
3885 
3886   /* FIXME: Move this to the beginning of this function once corrupted
3887   index IDs are also written back to dd::Table::se_private_data. */
3888   /* Here is how version play role. Basically, version would be increased
3889   by one during every DDL. So applying metadata here should only be
3890   done when the versions match. One reason for this version is that
3891   autoinc counter may not be applied if it's bigger if the version is
3892   older.
3893   If the version of metadata is older than current table,
3894   then table already has the latest metadata, the old one should be
3895   discarded.
3896   If the metadata version is bigger than the one in table.
3897   it could be that an ALTER TABLE has been rolled back, so metadata
3898   in new version should be ignored too. */
3899   if (table->version != metadata->get_version()) {
3900     return (get_dirty);
3901   }
3902 
3903   ib_uint64_t autoinc = metadata->get_autoinc();
3904 
3905   /* This happens during recovery, so no locks are needed. */
3906   if (autoinc > table->autoinc_persisted) {
3907     table->autoinc = autoinc;
3908     table->autoinc_persisted = autoinc;
3909 
3910     get_dirty = true;
3911   }
3912 
3913   /* Will apply other persistent metadata here */
3914 
3915   return (get_dirty);
3916 }
3917 
3918 #ifndef UNIV_HOTBACKUP
3919 /** Read persistent dynamic metadata stored in a buffer
3920 @param[in]	buffer		buffer to read
3921 @param[in]	size		size of data in buffer
3922 @param[in]	metadata	where we store the metadata from buffer */
dict_table_read_dynamic_metadata(const byte * buffer,ulint size,PersistentTableMetadata * metadata)3923 void dict_table_read_dynamic_metadata(const byte *buffer, ulint size,
3924                                       PersistentTableMetadata *metadata) {
3925   const byte *pos = buffer;
3926   persistent_type_t type;
3927   Persister *persister;
3928   ulint consumed;
3929   bool corrupt;
3930 
3931   while (size > 0) {
3932     type = static_cast<persistent_type_t>(pos[0]);
3933     ut_ad(type > PM_SMALLEST_TYPE && type < PM_BIGGEST_TYPE);
3934 
3935     persister = dict_persist->persisters->get(type);
3936     ut_ad(persister != nullptr);
3937 
3938     consumed = persister->read(*metadata, pos, size, &corrupt);
3939     ut_ad(consumed != 0);
3940     ut_ad(size >= consumed);
3941     ut_ad(!corrupt);
3942 
3943     size -= consumed;
3944     pos += consumed;
3945   }
3946 
3947   ut_ad(size == 0);
3948 }
3949 
3950 /** Check if there is any latest persistent dynamic metadata recorded
3951 in DDTableBuffer table of the specific table. If so, read the metadata and
3952 update the table object accordingly. It's used when loading table.
3953 @param[in]	table		table object */
dict_table_load_dynamic_metadata(dict_table_t * table)3954 void dict_table_load_dynamic_metadata(dict_table_t *table) {
3955   DDTableBuffer *table_buffer;
3956 
3957   ut_ad(dict_sys != nullptr);
3958   ut_ad(mutex_own(&dict_sys->mutex));
3959   ut_ad(!table->is_temporary());
3960 
3961   table_buffer = dict_persist->table_buffer;
3962 
3963   mutex_enter(&dict_persist->mutex);
3964 
3965   std::string *readmeta;
3966   uint64 version;
3967   readmeta = table_buffer->get(table->id, &version);
3968 
3969   if (readmeta->length() != 0) {
3970     /* Persistent dynamic metadata of this table have changed
3971     recently, we need to update them to in-memory table */
3972     PersistentTableMetadata metadata(table->id, version);
3973 
3974     dict_table_read_dynamic_metadata(
3975         reinterpret_cast<const byte *>(readmeta->data()), readmeta->length(),
3976         &metadata);
3977 
3978     bool is_dirty = dict_table_apply_dynamic_metadata(table, &metadata);
3979 
3980     /* If !is_dirty, it could be either:
3981     1. It's first time to load this table, and the corrupted
3982     index marked has been dropped. Current dirty_status should
3983     be METADATA_CLEAN.
3984     2. It's the second time to apply dynamic metadata to this
3985     table, current in-memory dynamic metadata is up-to-date.
3986     Current dirty_status should be METADATA_BUFFERED.
3987     In both cases, we don't have to change the dirty_status */
3988     if (is_dirty) {
3989       UT_LIST_ADD_LAST(dict_persist->dirty_dict_tables, table);
3990       table->dirty_status.store(METADATA_BUFFERED);
3991       ut_d(table->in_dirty_dict_tables_list = true);
3992     }
3993   }
3994 
3995   mutex_exit(&dict_persist->mutex);
3996 
3997   UT_DELETE(readmeta);
3998 }
3999 #endif /* !UNIV_HOTBACKUP */
4000 
4001 /** Mark the dirty_status of a table as METADATA_DIRTY, and add it to the
4002 dirty_dict_tables list if necessary.
4003 @param[in,out]	table		table */
dict_table_mark_dirty(dict_table_t * table)4004 void dict_table_mark_dirty(dict_table_t *table) {
4005   ut_ad(!table->is_temporary());
4006 
4007   /* We should not adding dynamic metadata so late in shutdown phase and
4008   this data would only be retrieved during recovery. */
4009   ut_ad(srv_shutdown_state.load() < SRV_SHUTDOWN_FLUSH_PHASE);
4010 
4011   mutex_enter(&dict_persist->mutex);
4012 
4013   switch (table->dirty_status.load()) {
4014     case METADATA_DIRTY:
4015       break;
4016     case METADATA_CLEAN:
4017       /* Not in dirty_tables list, add it now */
4018       UT_LIST_ADD_LAST(dict_persist->dirty_dict_tables, table);
4019       ut_d(table->in_dirty_dict_tables_list = true);
4020       /* Fall through */
4021     case METADATA_BUFFERED:
4022       table->dirty_status.store(METADATA_DIRTY);
4023       ++dict_persist->num_dirty_tables;
4024 #ifndef UNIV_HOTBACKUP
4025       dict_persist_update_log_margin();
4026 #endif /* !UNIV_HOTBACKUP */
4027   }
4028 
4029   ut_ad(table->in_dirty_dict_tables_list);
4030 
4031   mutex_exit(&dict_persist->mutex);
4032 }
4033 
4034 /** Flags an index corrupted in the data dictionary cache only. This
4035 is used to mark a corrupted index when index's own dictionary
4036 is corrupted, and we would force to load such index for repair purpose.
4037 Besides, we have to write a redo log.
4038 We don't want to hold dict_sys->mutex here, so that we can set index as
4039 corrupted in some low-level functions. We would only set the flags from
4040 not corrupted to corrupted when server is running, so it should be safe
4041 to set it directly.
4042 @param[in,out]	index		index, must not be NULL */
dict_set_corrupted(dict_index_t * index)4043 void dict_set_corrupted(dict_index_t *index) {
4044   dict_table_t *table = index->table;
4045 
4046   if (index->type & DICT_CORRUPT) {
4047     return;
4048   }
4049 
4050   index->type |= DICT_CORRUPT;
4051 
4052   if (!srv_read_only_mode && !table->is_temporary()) {
4053     PersistentTableMetadata metadata(table->id, table->version);
4054     metadata.add_corrupted_index(index_id_t(index->space, index->id));
4055 
4056     Persister *persister = dict_persist->persisters->get(PM_INDEX_CORRUPTED);
4057     ut_ad(persister != nullptr);
4058 
4059 #ifndef UNIV_HOTBACKUP
4060     mtr_t mtr;
4061 
4062     mtr.start();
4063     persister->write_log(table->id, metadata, &mtr);
4064     mtr.commit();
4065 
4066     /* Make sure the corruption bit won't be lost */
4067     log_write_up_to(*log_sys, mtr.commit_lsn(), true);
4068 #endif /* !UNIV_HOTBACKUP */
4069 
4070     dict_table_mark_dirty(table);
4071   }
4072 }
4073 
4074 #ifndef UNIV_HOTBACKUP
4075 /** Write the dirty persistent dynamic metadata for a table to
4076 DD TABLE BUFFER table. This is the low level function to write back.
4077 @param[in,out]	table	table to write */
dict_table_persist_to_dd_table_buffer_low(dict_table_t * table)4078 static void dict_table_persist_to_dd_table_buffer_low(dict_table_t *table) {
4079   ut_ad(dict_sys != nullptr);
4080   ut_ad(mutex_own(&dict_persist->mutex));
4081   ut_ad(table->dirty_status.load() == METADATA_DIRTY);
4082   ut_ad(table->in_dirty_dict_tables_list);
4083   ut_ad(!table->is_temporary());
4084 
4085   DDTableBuffer *table_buffer = dict_persist->table_buffer;
4086   PersistentTableMetadata metadata(table->id, table->version);
4087   byte buffer[REC_MAX_DATA_SIZE];
4088   ulint size;
4089 
4090   /* Here the status gets changed first, to make concurrent
4091   update to this table to wait on dict_persist_t::mutex.
4092   See dict_table_autoinc_log(), etc. */
4093   table->dirty_status.store(METADATA_BUFFERED);
4094 
4095   dict_init_dynamic_metadata(table, &metadata);
4096 
4097   size = dict_persist->persisters->write(metadata, buffer);
4098 
4099   dberr_t error =
4100       table_buffer->replace(table->id, table->version, buffer, size);
4101   ut_a(error == DB_SUCCESS);
4102 
4103   ut_ad(dict_persist->num_dirty_tables > 0);
4104   --dict_persist->num_dirty_tables;
4105 #ifndef UNIV_HOTBACKUP
4106   dict_persist_update_log_margin();
4107 #endif /* !UNIV_HOTBACKUP */
4108 }
4109 
4110 /** Write back the dirty persistent dynamic metadata of the table
4111 to DDTableBuffer
4112 @param[in,out]	table	table object */
dict_table_persist_to_dd_table_buffer(dict_table_t * table)4113 void dict_table_persist_to_dd_table_buffer(dict_table_t *table) {
4114   ut_ad(dict_sys != nullptr);
4115   ut_ad(mutex_own(&dict_sys->mutex));
4116 
4117   mutex_enter(&dict_persist->mutex);
4118 
4119   if (table->dirty_status.load() != METADATA_DIRTY) {
4120     /* Double check the status, since a concurrent checkpoint
4121     may have already changed the status to not dirty */
4122     mutex_exit(&dict_persist->mutex);
4123     return;
4124   }
4125 
4126   ut_ad(table->in_dirty_dict_tables_list);
4127 
4128   dict_table_persist_to_dd_table_buffer_low(table);
4129 
4130   mutex_exit(&dict_persist->mutex);
4131 }
4132 
4133 /** Check if any table has any dirty persistent data, if so
4134 write dirty persistent data of table to mysql.innodb_dynamic_metadata
4135 accordingly. */
dict_persist_to_dd_table_buffer()4136 void dict_persist_to_dd_table_buffer() {
4137   bool persisted = false;
4138 
4139   if (dict_sys == nullptr) {
4140     log_set_dict_max_allowed_checkpoint_lsn(*log_sys, 0);
4141     return;
4142   }
4143 
4144   mutex_enter(&dict_persist->mutex);
4145 
4146   for (dict_table_t *table = UT_LIST_GET_FIRST(dict_persist->dirty_dict_tables);
4147        table != nullptr;) {
4148     dict_table_t *next = UT_LIST_GET_NEXT(dirty_dict_tables, table);
4149 
4150     ut_ad(table->dirty_status.load() == METADATA_DIRTY ||
4151           table->dirty_status.load() == METADATA_BUFFERED);
4152     ut_ad(next == nullptr || next->magic_n == DICT_TABLE_MAGIC_N);
4153 
4154     if (table->dirty_status.load() == METADATA_DIRTY) {
4155       /* We should not attempt to write to data pages while shutting down
4156       page cleaners. */
4157       if (srv_shutdown_state.load() >= SRV_SHUTDOWN_FLUSH_PHASE) {
4158         ut_ad(false);
4159       } else {
4160         dict_table_persist_to_dd_table_buffer_low(table);
4161         persisted = true;
4162       }
4163     }
4164 
4165     table = next;
4166   }
4167 
4168   ut_ad(dict_persist->num_dirty_tables == 0);
4169 
4170   /* Get this lsn with dict_persist->mutex held,
4171   so no other concurrent dynamic metadata change logs
4172   would be before this lsn. */
4173   const lsn_t persisted_lsn = log_get_lsn(*log_sys);
4174 
4175   /* As soon as we release the dict_persist->mutex, new dynamic
4176   metadata changes could happen. They would be not persisted
4177   until next call to dict_persist_to_dd_table_buffer.
4178   We must not remove redo which could allow to deduce them.
4179   Therefore the maximum allowed lsn for checkpoint is the
4180   current lsn. */
4181   log_set_dict_max_allowed_checkpoint_lsn(*log_sys, persisted_lsn);
4182 
4183   mutex_exit(&dict_persist->mutex);
4184 
4185   if (persisted) {
4186     log_write_up_to(*log_sys, persisted_lsn, true);
4187   }
4188 }
4189 
4190 #ifndef UNIV_HOTBACKUP
4191 
4192 /** Calculate and update the redo log margin for current tables which
4193 have some changed dynamic metadata in memory and have not been written
4194 back to mysql.innodb_dynamic_metadata. Update LSN limit, which is used
4195 to stop user threads when redo log is running out of space and they
4196 do not hold latches (log.free_check_limit_sn). */
dict_persist_update_log_margin()4197 static void dict_persist_update_log_margin() {
4198   /* Below variables basically considers only the AUTO_INCREMENT counter
4199   and a small margin for corrupted indexes. */
4200 
4201   /* Every table will generate less than 80 bytes without
4202   considering page split */
4203   static constexpr uint32_t log_margin_per_table_no_split = 80;
4204 
4205   /* Every table metadata log may roughly consume such many bytes. */
4206   static constexpr uint32_t record_size_per_table = 50;
4207 
4208   /* How many tables may generate one page split */
4209   static const uint32_t tables_per_split =
4210       (univ_page_size.physical() - PAGE_NEW_SUPREMUM_END) /
4211       record_size_per_table / 2;
4212 
4213   /* Every page split needs at most this log margin, if not root split. */
4214   static const uint32_t log_margin_per_split_no_root = 500;
4215 
4216   /* Extra marge for root split, we always leave this margin,
4217   since we don't know exactly it will split root or not */
4218   static const uint32_t log_margin_per_split_root =
4219       univ_page_size.physical() / 2 * 3; /* Add 50% margin. */
4220 
4221   /* Read without holding the dict_persist_t::mutex */
4222   uint32_t num_dirty_tables = dict_persist->num_dirty_tables;
4223   uint32_t total_splits = 0;
4224   uint32_t num_tables = num_dirty_tables;
4225 
4226   while (num_tables > 0) {
4227     total_splits += num_tables / tables_per_split + 1;
4228     num_tables = num_tables / tables_per_split;
4229   }
4230 
4231   const auto margin = (num_dirty_tables * log_margin_per_table_no_split +
4232                        total_splits * log_margin_per_split_no_root +
4233                        (num_dirty_tables == 0 ? 0 : log_margin_per_split_root));
4234 
4235   if (log_sys != nullptr) {
4236     /* Update margin for redo log */
4237     log_set_dict_persist_margin(*log_sys, margin);
4238   }
4239 }
4240 #endif /* !UNIV_HOTBACKUP */
4241 
4242 #ifdef UNIV_DEBUG
4243 /** Sets merge_threshold for all indexes in the list of tables
4244 @param[in]	list			pointer to the list of tables
4245 @param[in]	merge_threshold_all	value to set for all indexes */
dict_set_merge_threshold_list_debug(UT_LIST_BASE_NODE_T (dict_table_t)* list,uint merge_threshold_all)4246 inline void dict_set_merge_threshold_list_debug(
4247     UT_LIST_BASE_NODE_T(dict_table_t) * list, uint merge_threshold_all) {
4248   for (dict_table_t *table = UT_LIST_GET_FIRST(*list); table != nullptr;
4249        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4250     for (dict_index_t *index = UT_LIST_GET_FIRST(table->indexes);
4251          index != nullptr; index = UT_LIST_GET_NEXT(indexes, index)) {
4252       rw_lock_x_lock(dict_index_get_lock(index));
4253       index->merge_threshold = merge_threshold_all;
4254       rw_lock_x_unlock(dict_index_get_lock(index));
4255     }
4256   }
4257 }
4258 
4259 /** Sets merge_threshold for all indexes in dictionary cache for debug.
4260 @param[in]	merge_threshold_all	value to set for all indexes */
dict_set_merge_threshold_all_debug(uint merge_threshold_all)4261 void dict_set_merge_threshold_all_debug(uint merge_threshold_all) {
4262   mutex_enter(&dict_sys->mutex);
4263 
4264   dict_set_merge_threshold_list_debug(&dict_sys->table_LRU,
4265                                       merge_threshold_all);
4266   dict_set_merge_threshold_list_debug(&dict_sys->table_non_LRU,
4267                                       merge_threshold_all);
4268 
4269   mutex_exit(&dict_sys->mutex);
4270 }
4271 #endif /* UNIV_DEBUG */
4272 
4273 /** Inits dict_ind_redundant. */
dict_ind_init(void)4274 void dict_ind_init(void) {
4275   dict_table_t *table;
4276 
4277   /* create dummy table and index for REDUNDANT infimum and supremum */
4278   table = dict_mem_table_create("SYS_DUMMY1", DICT_HDR_SPACE, 1, 0, 0, 0, 0);
4279   dict_mem_table_add_col(table, nullptr, nullptr, DATA_CHAR,
4280                          DATA_ENGLISH | DATA_NOT_NULL, 8, true);
4281 
4282   dict_ind_redundant =
4283       dict_mem_index_create("SYS_DUMMY1", "SYS_DUMMY1", DICT_HDR_SPACE, 0, 1);
4284   dict_index_add_col(dict_ind_redundant, table, table->get_col(0), 0, true);
4285   dict_ind_redundant->table = table;
4286   /* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
4287   dict_ind_redundant->cached = TRUE;
4288 }
4289 
4290 /** Frees dict_ind_redundant. */
dict_ind_free(void)4291 void dict_ind_free(void) {
4292   dict_table_t *table;
4293 
4294   table = dict_ind_redundant->table;
4295   dict_mem_index_free(dict_ind_redundant);
4296   dict_ind_redundant = nullptr;
4297   dict_mem_table_free(table);
4298 }
4299 
4300 /** Get an index by name.
4301 @param[in]	table		the table where to look for the index
4302 @param[in]	name		the index name to look for
4303 @param[in]	committed	true=search for committed,
4304 false=search for uncommitted
4305 @return index, NULL if does not exist */
dict_table_get_index_on_name(dict_table_t * table,const char * name,bool committed)4306 dict_index_t *dict_table_get_index_on_name(dict_table_t *table,
4307                                            const char *name, bool committed) {
4308   dict_index_t *index;
4309 
4310   index = table->first_index();
4311 
4312   while (index != nullptr) {
4313     if (index->is_committed() == committed &&
4314         innobase_strcasecmp(index->name, name) == 0) {
4315       return (index);
4316     }
4317 
4318     index = index->next();
4319   }
4320 
4321   return (nullptr);
4322 }
4323 
4324 /** Replace the index passed in with another equivalent index in the
4325  foreign key lists of the table.
4326  @return whether all replacements were found */
dict_foreign_replace_index(dict_table_t * table,const char ** col_names,const dict_index_t * index)4327 bool dict_foreign_replace_index(
4328     dict_table_t *table, /*!< in/out: table */
4329     const char **col_names,
4330     /*!< in: column names, or NULL
4331     to use table->col_names */
4332     const dict_index_t *index) /*!< in: index to be replaced */
4333 {
4334   bool found = true;
4335   dict_foreign_t *foreign;
4336 
4337   ut_ad(index->to_be_dropped);
4338   ut_ad(index->table == table);
4339 
4340   for (dict_foreign_set::iterator it = table->foreign_set.begin();
4341        it != table->foreign_set.end(); ++it) {
4342     foreign = *it;
4343     if (foreign->foreign_index == index) {
4344       ut_ad(foreign->foreign_table == index->table);
4345 
4346       dict_index_t *new_index = dict_foreign_find_index(
4347           foreign->foreign_table, col_names, foreign->foreign_col_names,
4348           foreign->n_fields, index,
4349           /*check_charsets=*/TRUE, /*check_null=*/FALSE);
4350       if (new_index) {
4351         ut_ad(new_index->table == index->table);
4352         ut_ad(!new_index->to_be_dropped);
4353       } else {
4354         found = false;
4355       }
4356 
4357       foreign->foreign_index = new_index;
4358     }
4359   }
4360 
4361   for (dict_foreign_set::iterator it = table->referenced_set.begin();
4362        it != table->referenced_set.end(); ++it) {
4363     foreign = *it;
4364     if (foreign->referenced_index == index) {
4365       ut_ad(foreign->referenced_table == index->table);
4366 
4367       dict_index_t *new_index = dict_foreign_find_index(
4368           foreign->referenced_table, nullptr, foreign->referenced_col_names,
4369           foreign->n_fields, index,
4370           /*check_charsets=*/TRUE, /*check_null=*/FALSE);
4371       /* There must exist an alternative index,
4372       since this must have been checked earlier. */
4373       if (new_index) {
4374         ut_ad(new_index->table == index->table);
4375         ut_ad(!new_index->to_be_dropped);
4376       } else {
4377         found = false;
4378       }
4379 
4380       foreign->referenced_index = new_index;
4381     }
4382   }
4383 
4384   return (found);
4385 }
4386 
4387 #ifdef UNIV_DEBUG
4388 /** Check for duplicate index entries in a table [using the index name] */
dict_table_check_for_dup_indexes(const dict_table_t * table,enum check_name check)4389 void dict_table_check_for_dup_indexes(
4390     const dict_table_t *table, /*!< in: Check for dup indexes
4391                                in this table */
4392     enum check_name check)     /*!< in: whether and when to allow
4393                                temporary index names */
4394 {
4395   /* Check for duplicates, ignoring indexes that are marked
4396   as to be dropped */
4397 
4398   const dict_index_t *index1;
4399   const dict_index_t *index2;
4400 
4401   ut_ad(mutex_own(&dict_sys->mutex));
4402 
4403   /* The primary index _must_ exist */
4404   ut_a(UT_LIST_GET_LEN(table->indexes) > 0);
4405 
4406   index1 = UT_LIST_GET_FIRST(table->indexes);
4407 
4408   do {
4409     if (!index1->is_committed()) {
4410       ut_a(!index1->is_clustered());
4411 
4412       switch (check) {
4413         case CHECK_ALL_COMPLETE:
4414           ut_error;
4415         case CHECK_ABORTED_OK:
4416           switch (dict_index_get_online_status(index1)) {
4417             case ONLINE_INDEX_COMPLETE:
4418             case ONLINE_INDEX_CREATION:
4419               ut_error;
4420               break;
4421             case ONLINE_INDEX_ABORTED:
4422             case ONLINE_INDEX_ABORTED_DROPPED:
4423               break;
4424           }
4425           /* fall through */
4426         case CHECK_PARTIAL_OK:
4427           break;
4428       }
4429     }
4430 
4431     for (index2 = UT_LIST_GET_NEXT(indexes, index1); index2 != nullptr;
4432          index2 = UT_LIST_GET_NEXT(indexes, index2)) {
4433       ut_ad(index1->is_committed() != index2->is_committed() ||
4434             strcmp(index1->name, index2->name) != 0);
4435     }
4436 
4437     index1 = UT_LIST_GET_NEXT(indexes, index1);
4438   } while (index1);
4439 }
4440 #endif /* UNIV_DEBUG */
4441 
4442 /** Converts a database and table name from filesystem encoding (e.g.
4443 "@code d@i1b/a@q1b@1Kc @endcode", same format as used in  dict_table_t::name)
4444 in two strings in UTF8 encoding (e.g. dцb and aюbØc). The output buffers must
4445 be at least MAX_DB_UTF8_LEN and MAX_TABLE_UTF8_LEN bytes.
4446 @param[in]	db_and_table	database and table names,
4447                                 e.g. "@code d@i1b/a@q1b@1Kc @endcode"
4448 @param[out]	db_utf8		database name, e.g. dцb
4449 @param[in]	db_utf8_size	dbname_utf8 size
4450 @param[out]	table_utf8	table name, e.g. aюbØc
4451 @param[in]	table_utf8_size	table_utf8 size */
dict_fs2utf8(const char * db_and_table,char * db_utf8,size_t db_utf8_size,char * table_utf8,size_t table_utf8_size)4452 void dict_fs2utf8(const char *db_and_table, char *db_utf8, size_t db_utf8_size,
4453                   char *table_utf8, size_t table_utf8_size) {
4454   char db[MAX_DATABASE_NAME_LEN + 1];
4455   ulint db_len;
4456   uint errors;
4457 
4458   db_len = dict_get_db_name_len(db_and_table);
4459 
4460   ut_a(db_len <= sizeof(db));
4461 
4462   memcpy(db, db_and_table, db_len);
4463   db[db_len] = '\0';
4464 
4465   strconvert(&my_charset_filename, db, system_charset_info, db_utf8,
4466              db_utf8_size, &errors);
4467 
4468   /* convert each # to @0023 in table name and store the result in buf */
4469   const char *table = dict_remove_db_name(db_and_table);
4470   const char *table_p;
4471   char buf[MAX_TABLE_NAME_LEN * 5 + 1];
4472   char *buf_p;
4473   for (table_p = table, buf_p = buf; table_p[0] != '\0'; table_p++) {
4474     if (table_p[0] != '#') {
4475       buf_p[0] = table_p[0];
4476       buf_p++;
4477     } else {
4478       buf_p[0] = '@';
4479       buf_p[1] = '0';
4480       buf_p[2] = '0';
4481       buf_p[3] = '2';
4482       buf_p[4] = '3';
4483       buf_p += 5;
4484     }
4485     ut_a((size_t)(buf_p - buf) < sizeof(buf));
4486   }
4487   buf_p[0] = '\0';
4488 
4489   errors = 0;
4490   strconvert(&my_charset_filename, buf, system_charset_info, table_utf8,
4491              table_utf8_size, &errors);
4492 
4493   if (errors != 0) {
4494     snprintf(table_utf8, table_utf8_size, "%s", table);
4495   }
4496 }
4497 
4498 /** Resize the hash tables besed on the current buffer pool size. */
dict_resize()4499 void dict_resize() {
4500   dict_table_t *table;
4501 
4502   mutex_enter(&dict_sys->mutex);
4503 
4504   /* all table entries are in table_LRU and table_non_LRU lists */
4505   hash_table_free(dict_sys->table_hash);
4506   hash_table_free(dict_sys->table_id_hash);
4507 
4508   dict_sys->table_hash = hash_create(
4509       buf_pool_get_curr_size() / (DICT_POOL_PER_TABLE_HASH * UNIV_WORD_SIZE));
4510 
4511   dict_sys->table_id_hash = hash_create(
4512       buf_pool_get_curr_size() / (DICT_POOL_PER_TABLE_HASH * UNIV_WORD_SIZE));
4513 
4514   for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU); table;
4515        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4516     ulint fold = ut_fold_string(table->name.m_name);
4517     ulint id_fold = ut_fold_ull(table->id);
4518 
4519     HASH_INSERT(dict_table_t, name_hash, dict_sys->table_hash, fold, table);
4520 
4521     HASH_INSERT(dict_table_t, id_hash, dict_sys->table_id_hash, id_fold, table);
4522   }
4523 
4524   for (table = UT_LIST_GET_FIRST(dict_sys->table_non_LRU); table;
4525        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4526     ulint fold = ut_fold_string(table->name.m_name);
4527     ulint id_fold = ut_fold_ull(table->id);
4528 
4529     HASH_INSERT(dict_table_t, name_hash, dict_sys->table_hash, fold, table);
4530 
4531     HASH_INSERT(dict_table_t, id_hash, dict_sys->table_id_hash, id_fold, table);
4532   }
4533 
4534   mutex_exit(&dict_sys->mutex);
4535 }
4536 #endif /* !UNIV_HOTBACKUP */
4537 
4538 /** Closes the data dictionary module. */
dict_close(void)4539 void dict_close(void) {
4540   if (dict_sys == nullptr) {
4541     /* This should only happen if a failure occurred
4542     during redo log processing. */
4543     return;
4544   }
4545 
4546   /* Acquire only because it's a pre-condition. */
4547   mutex_enter(&dict_sys->mutex);
4548 
4549   if (dict_sys->table_stats != nullptr) {
4550     dict_table_close(dict_sys->table_stats, true, false);
4551   }
4552   if (dict_sys->index_stats != nullptr) {
4553     dict_table_close(dict_sys->index_stats, true, false);
4554   }
4555   if (dict_sys->dynamic_metadata != nullptr) {
4556     dict_table_close(dict_sys->dynamic_metadata, true, false);
4557   }
4558   if (dict_sys->ddl_log) {
4559     dict_table_close(dict_sys->ddl_log, true, false);
4560   }
4561 
4562 #ifndef UNIV_HOTBACKUP
4563   /* Free the hash elements. We don't remove them from the table
4564   because we are going to destroy the table anyway. */
4565   for (ulint i = 0; i < hash_get_n_cells(dict_sys->table_id_hash); i++) {
4566     dict_table_t *table;
4567 
4568     table =
4569         static_cast<dict_table_t *>(HASH_GET_FIRST(dict_sys->table_hash, i));
4570 
4571     while (table) {
4572       dict_table_t *prev_table = table;
4573 
4574       table = static_cast<dict_table_t *>(HASH_GET_NEXT(name_hash, prev_table));
4575       ut_ad(prev_table->magic_n == DICT_TABLE_MAGIC_N);
4576       dict_table_remove_from_cache(prev_table);
4577     }
4578   }
4579 #endif /* !UNIV_HOTBACKUP */
4580 
4581   hash_table_free(dict_sys->table_hash);
4582 
4583   /* The elements are the same instance as in dict_sys->table_hash,
4584   therefore we don't delete the individual elements. */
4585   hash_table_free(dict_sys->table_id_hash);
4586 
4587 #ifndef UNIV_HOTBACKUP
4588   dict_ind_free();
4589 #endif /* !UNIV_HOTBACKUP */
4590 
4591   mutex_exit(&dict_sys->mutex);
4592   mutex_free(&dict_sys->mutex);
4593 
4594   rw_lock_free(dict_operation_lock);
4595 
4596   ut_free(dict_operation_lock);
4597   dict_operation_lock = nullptr;
4598 
4599   mutex_free(&dict_foreign_err_mutex);
4600 
4601 #ifndef UNIV_HOTBACKUP
4602   if (dict_foreign_err_file != nullptr) {
4603     fclose(dict_foreign_err_file);
4604     dict_foreign_err_file = nullptr;
4605   }
4606 #endif /* !UNIV_HOTBACKUP */
4607 
4608   ut_ad(dict_sys->size == 0);
4609 
4610   ut_free(dict_sys);
4611   dict_sys = nullptr;
4612 }
4613 
4614 #ifndef UNIV_HOTBACKUP
4615 #ifdef UNIV_DEBUG
4616 /** Validate the dictionary table LRU list.
4617  @return true if valid */
dict_lru_validate(void)4618 static ibool dict_lru_validate(void) {
4619   dict_table_t *table;
4620 
4621   ut_ad(mutex_own(&dict_sys->mutex));
4622 
4623   for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU); table != nullptr;
4624        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4625     ut_a(table->can_be_evicted);
4626   }
4627 
4628   for (table = UT_LIST_GET_FIRST(dict_sys->table_non_LRU); table != nullptr;
4629        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4630     ut_a(!table->can_be_evicted);
4631   }
4632 
4633   return (TRUE);
4634 }
4635 
4636 /** Check if a table exists in the dict table LRU list.
4637  @return true if table found in LRU list */
dict_lru_find_table(const dict_table_t * find_table)4638 static ibool dict_lru_find_table(
4639     const dict_table_t *find_table) /*!< in: table to find */
4640 {
4641   dict_table_t *table;
4642 
4643   ut_ad(find_table != nullptr);
4644   ut_ad(mutex_own(&dict_sys->mutex));
4645 
4646   for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU); table != nullptr;
4647        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4648     ut_a(table->can_be_evicted);
4649 
4650     if (table == find_table) {
4651       return (TRUE);
4652     }
4653   }
4654 
4655   return (FALSE);
4656 }
4657 
4658 /** Check if a table exists in the dict table non-LRU list.
4659  @return true if table found in non-LRU list */
dict_non_lru_find_table(const dict_table_t * find_table)4660 static ibool dict_non_lru_find_table(
4661     const dict_table_t *find_table) /*!< in: table to find */
4662 {
4663   dict_table_t *table;
4664 
4665   ut_ad(find_table != nullptr);
4666   ut_ad(mutex_own(&dict_sys->mutex));
4667 
4668   for (table = UT_LIST_GET_FIRST(dict_sys->table_non_LRU); table != nullptr;
4669        table = UT_LIST_GET_NEXT(table_LRU, table)) {
4670     ut_a(!table->can_be_evicted);
4671 
4672     if (table == find_table) {
4673       return (TRUE);
4674     }
4675   }
4676 
4677   return (FALSE);
4678 }
4679 #endif /* UNIV_DEBUG */
4680 /** Check an index to see whether its first fields are the columns in the array,
4681  in the same order and is not marked for deletion and is not the same
4682  as types_idx.
4683  @return true if the index qualifies, otherwise false */
dict_foreign_qualify_index(const dict_table_t * table,const char ** col_names,const char ** columns,ulint n_cols,const dict_index_t * index,const dict_index_t * types_idx,bool check_charsets,ulint check_null)4684 bool dict_foreign_qualify_index(
4685     const dict_table_t *table, /*!< in: table */
4686     const char **col_names,
4687     /*!< in: column names, or NULL
4688     to use table->col_names */
4689     const char **columns,      /*!< in: array of column names */
4690     ulint n_cols,              /*!< in: number of columns */
4691     const dict_index_t *index, /*!< in: index to check */
4692     const dict_index_t *types_idx,
4693     /*!< in: NULL or an index
4694     whose types the column types
4695     must match */
4696     bool check_charsets,
4697     /*!< in: whether to check
4698     charsets.  only has an effect
4699     if types_idx != NULL */
4700     ulint check_null)
4701 /*!< in: nonzero if none of
4702 the columns must be declared
4703 NOT NULL */
4704 {
4705   if (dict_index_get_n_fields(index) < n_cols) {
4706     return (false);
4707   }
4708 
4709   for (ulint i = 0; i < n_cols; i++) {
4710     dict_field_t *field;
4711     const char *col_name;
4712     ulint col_no;
4713 
4714     field = index->get_field(i);
4715     col_no = dict_col_get_no(field->col);
4716 
4717     if (field->prefix_len != 0) {
4718       /* We do not accept column prefix
4719       indexes here */
4720       return (false);
4721     }
4722 
4723     if (check_null && (field->col->prtype & DATA_NOT_NULL)) {
4724       return (false);
4725     }
4726 
4727     col_name = col_names ? col_names[col_no]
4728                          : (field->col->is_virtual()
4729                                 ? dict_table_get_v_col_name_mysql(table, col_no)
4730                                 : table->get_col_name(col_no));
4731 
4732     if (0 != innobase_strcasecmp(columns[i], col_name)) {
4733       return (false);
4734     }
4735 
4736     if (types_idx &&
4737         !cmp_cols_are_equal(index->get_col(i), types_idx->get_col(i),
4738                             check_charsets)) {
4739       return (false);
4740     }
4741   }
4742 
4743   return (true);
4744 }
4745 
4746 /** Update the state of compression failure padding heuristics. This is
4747  called whenever a compression operation succeeds or fails.
4748  The caller must be holding info->mutex */
dict_index_zip_pad_update(zip_pad_info_t * info,ulint zip_threshold)4749 static void dict_index_zip_pad_update(
4750     zip_pad_info_t *info, /*!< in/out: info to be updated */
4751     ulint zip_threshold)  /*!< in: zip threshold value */
4752 {
4753   ulint total;
4754   ulint fail_pct;
4755 
4756   ut_ad(info);
4757 
4758   total = info->success + info->failure;
4759 
4760   ut_ad(total > 0);
4761 
4762   if (zip_threshold == 0) {
4763     /* User has just disabled the padding. */
4764     return;
4765   }
4766 
4767   if (total < ZIP_PAD_ROUND_LEN) {
4768     /* We are in middle of a round. Do nothing. */
4769     return;
4770   }
4771 
4772   /* We are at a 'round' boundary. Reset the values but first
4773   calculate fail rate for our heuristic. */
4774   fail_pct = (info->failure * 100) / total;
4775   info->failure = 0;
4776   info->success = 0;
4777 
4778   if (fail_pct > zip_threshold) {
4779     /* Compression failures are more then user defined
4780     threshold. Increase the pad size to reduce chances of
4781     compression failures. */
4782     ut_ad(info->pad % ZIP_PAD_INCR == 0);
4783 
4784     /* Only do increment if it won't increase padding
4785     beyond max pad size. */
4786     if (info->pad + ZIP_PAD_INCR < (UNIV_PAGE_SIZE * zip_pad_max) / 100) {
4787       /* Use atomics even though we have the mutex.
4788       This is to ensure that we are able to read
4789       info->pad atomically. */
4790       os_atomic_increment_ulint(&info->pad, ZIP_PAD_INCR);
4791 
4792       MONITOR_INC(MONITOR_PAD_INCREMENTS);
4793     }
4794 
4795     info->n_rounds = 0;
4796 
4797   } else {
4798     /* Failure rate was OK. Another successful round
4799     completed. */
4800     ++info->n_rounds;
4801 
4802     /* If enough successful rounds are completed with
4803     compression failure rate in control, decrease the
4804     padding. */
4805     if (info->n_rounds >= ZIP_PAD_SUCCESSFUL_ROUND_LIMIT && info->pad > 0) {
4806       ut_ad(info->pad % ZIP_PAD_INCR == 0);
4807       /* Use atomics even though we have the mutex.
4808       This is to ensure that we are able to read
4809       info->pad atomically. */
4810       os_atomic_decrement_ulint(&info->pad, ZIP_PAD_INCR);
4811 
4812       info->n_rounds = 0;
4813 
4814       MONITOR_INC(MONITOR_PAD_DECREMENTS);
4815     }
4816   }
4817 }
4818 
4819 /** This function should be called whenever a page is successfully
4820  compressed. Updates the compression padding information. */
dict_index_zip_success(dict_index_t * index)4821 void dict_index_zip_success(
4822     dict_index_t *index) /*!< in/out: index to be updated. */
4823 {
4824   ut_ad(index);
4825 
4826   ulint zip_threshold = zip_failure_threshold_pct;
4827   if (!zip_threshold) {
4828     /* Disabled by user. */
4829     return;
4830   }
4831 
4832   dict_index_zip_pad_lock(index);
4833   ++index->zip_pad.success;
4834   dict_index_zip_pad_update(&index->zip_pad, zip_threshold);
4835   dict_index_zip_pad_unlock(index);
4836 }
4837 
4838 /** This function should be called whenever a page compression attempt
4839  fails. Updates the compression padding information. */
dict_index_zip_failure(dict_index_t * index)4840 void dict_index_zip_failure(
4841     dict_index_t *index) /*!< in/out: index to be updated. */
4842 {
4843   ut_ad(index);
4844 
4845   ulint zip_threshold = zip_failure_threshold_pct;
4846   if (!zip_threshold) {
4847     /* Disabled by user. */
4848     return;
4849   }
4850 
4851   dict_index_zip_pad_lock(index);
4852   ++index->zip_pad.failure;
4853   dict_index_zip_pad_update(&index->zip_pad, zip_threshold);
4854   dict_index_zip_pad_unlock(index);
4855 }
4856 
4857 /** Return the optimal page size, for which page will likely compress.
4858  @return page size beyond which page might not compress */
dict_index_zip_pad_optimal_page_size(dict_index_t * index)4859 ulint dict_index_zip_pad_optimal_page_size(
4860     dict_index_t *index) /*!< in: index for which page size
4861                          is requested */
4862 {
4863   ulint pad;
4864   ulint min_sz;
4865   ulint sz;
4866 
4867   ut_ad(index);
4868 
4869   if (!zip_failure_threshold_pct) {
4870     /* Disabled by user. */
4871     return (UNIV_PAGE_SIZE);
4872   }
4873 
4874   /* We use atomics to read index->zip_pad.pad. Here we use zero
4875   as increment as are not changing the value of the 'pad'. */
4876 
4877   pad = os_atomic_increment_ulint(&index->zip_pad.pad, 0);
4878 
4879   ut_ad(pad < UNIV_PAGE_SIZE);
4880   sz = UNIV_PAGE_SIZE - pad;
4881 
4882   /* Min size allowed by user. */
4883   ut_ad(zip_pad_max < 100);
4884   min_sz = (UNIV_PAGE_SIZE * (100 - zip_pad_max)) / 100;
4885 
4886   return (ut_max(sz, min_sz));
4887 }
4888 
4889 /** Convert a 32 bit integer table flags to the 32 bit FSP Flags.
4890 Fsp Flags are written into the tablespace header at the offset
4891 FSP_SPACE_FLAGS and are also stored in the fil_space_t::flags field.
4892 The following chart shows the translation of the low order bit.
4893 Other bits are the same.
4894                         Low order bit
4895                     | REDUNDANT | COMPACT | COMPRESSED | DYNAMIC
4896 dict_table_t::flags |     0     |    1    |     1      |    1
4897 fil_space_t::flags  |     0     |    0    |     1      |    1
4898 @param[in]	table_flags	dict_table_t::flags
4899 @return tablespace flags (fil_space_t::flags) */
dict_tf_to_fsp_flags(uint32_t table_flags)4900 uint32_t dict_tf_to_fsp_flags(uint32_t table_flags) {
4901   DBUG_EXECUTE_IF("dict_tf_to_fsp_flags_failure", return (UINT32_UNDEFINED););
4902 
4903   bool has_atomic_blobs = DICT_TF_HAS_ATOMIC_BLOBS(table_flags);
4904   page_size_t page_size = dict_tf_get_page_size(table_flags);
4905   bool has_data_dir = DICT_TF_HAS_DATA_DIR(table_flags);
4906   bool is_shared = DICT_TF_HAS_SHARED_SPACE(table_flags);
4907 
4908   ut_ad(!page_size.is_compressed() || has_atomic_blobs);
4909 
4910   /* General tablespaces that are not compressed do not get the
4911   flags for dynamic row format (ATOMIC_BLOBS) */
4912   if (is_shared && !page_size.is_compressed()) {
4913     has_atomic_blobs = false;
4914   }
4915 
4916   uint32_t fsp_flags = fsp_flags_init(page_size, has_atomic_blobs, has_data_dir,
4917                                       is_shared, false);
4918 
4919   return (fsp_flags);
4920 }
4921 
4922 /** Convert table flag to row format string.
4923  @return row format name. */
dict_tf_to_row_format_string(uint32_t table_flag)4924 const char *dict_tf_to_row_format_string(
4925     uint32_t table_flag) /*!< in: row format setting */
4926 {
4927   switch (dict_tf_get_rec_format(table_flag)) {
4928     case REC_FORMAT_REDUNDANT:
4929       return ("ROW_TYPE_REDUNDANT");
4930     case REC_FORMAT_COMPACT:
4931       return ("ROW_TYPE_COMPACT");
4932     case REC_FORMAT_COMPRESSED:
4933       return ("ROW_TYPE_COMPRESSED");
4934     case REC_FORMAT_DYNAMIC:
4935       return ("ROW_TYPE_DYNAMIC");
4936   }
4937 
4938   ut_error;
4939 }
4940 
4941 /** Determine the extent size (in pages) for the given table
4942 @param[in]	table	the table whose extent size is being
4943                         calculated.
4944 @return extent size in pages (256, 128 or 64) */
dict_table_extent_size(const dict_table_t * table)4945 page_no_t dict_table_extent_size(const dict_table_t *table) {
4946   const ulint mb_1 = 1024 * 1024;
4947   const ulint mb_2 = 2 * mb_1;
4948   const ulint mb_4 = 4 * mb_1;
4949 
4950   page_size_t page_size = dict_table_page_size(table);
4951   page_no_t pages_in_extent = FSP_EXTENT_SIZE;
4952 
4953   if (page_size.is_compressed()) {
4954     ulint disk_page_size = page_size.physical();
4955 
4956     switch (disk_page_size) {
4957       case 1024:
4958         pages_in_extent = mb_1 / 1024;
4959         break;
4960       case 2048:
4961         pages_in_extent = mb_1 / 2048;
4962         break;
4963       case 4096:
4964         pages_in_extent = mb_1 / 4096;
4965         break;
4966       case 8192:
4967         pages_in_extent = mb_1 / 8192;
4968         break;
4969       case 16384:
4970         pages_in_extent = mb_1 / 16384;
4971         break;
4972       case 32768:
4973         pages_in_extent = mb_2 / 32768;
4974         break;
4975       case 65536:
4976         pages_in_extent = mb_4 / 65536;
4977         break;
4978       default:
4979         ut_ad(0);
4980     }
4981   }
4982 
4983   return (pages_in_extent);
4984 }
4985 
4986 /** Default constructor */
DDTableBuffer()4987 DDTableBuffer::DDTableBuffer() {
4988   init();
4989 
4990   /* Check if we need to recover it, in case of crash */
4991   btr_truncate_recover(m_index);
4992 }
4993 
4994 /** Destructor */
~DDTableBuffer()4995 DDTableBuffer::~DDTableBuffer() { close(); }
4996 
4997 /* Create the search and replace tuples */
create_tuples()4998 void DDTableBuffer::create_tuples() {
4999   const dict_col_t *col;
5000   dfield_t *dfield;
5001   byte *sys_buf;
5002   byte *id_buf;
5003 
5004   id_buf = static_cast<byte *>(mem_heap_alloc(m_heap, 8));
5005   memset(id_buf, 0, sizeof *id_buf);
5006 
5007   m_search_tuple = dtuple_create(m_heap, 1);
5008   dict_index_copy_types(m_search_tuple, m_index, 1);
5009 
5010   dfield = dtuple_get_nth_field(m_search_tuple, 0);
5011   dfield_set_data(dfield, id_buf, 8);
5012 
5013   /* Allocate another memory for this tuple */
5014   id_buf = static_cast<byte *>(mem_heap_alloc(m_heap, 8));
5015   memset(id_buf, 0, sizeof *id_buf);
5016 
5017   m_replace_tuple = dtuple_create(m_heap, N_COLS);
5018   dict_table_copy_types(m_replace_tuple, m_index->table);
5019 
5020   dfield = dtuple_get_nth_field(m_replace_tuple, TABLE_ID_FIELD_NO);
5021   dfield_set_data(dfield, id_buf, 8);
5022 
5023   /* Initialize system fields, we always write fake value. */
5024   sys_buf = static_cast<byte *>(mem_heap_alloc(m_heap, 8));
5025   memset(sys_buf, 0xFF, 8);
5026 
5027   col = m_index->table->get_sys_col(DATA_ROW_ID);
5028   dfield = dtuple_get_nth_field(m_replace_tuple, dict_col_get_no(col));
5029   dfield_set_data(dfield, sys_buf, DATA_ROW_ID_LEN);
5030 
5031   col = m_index->table->get_sys_col(DATA_TRX_ID);
5032   dfield = dtuple_get_nth_field(m_replace_tuple, dict_col_get_no(col));
5033   dfield_set_data(dfield, sys_buf, DATA_TRX_ID_LEN);
5034 
5035   col = m_index->table->get_sys_col(DATA_ROLL_PTR);
5036   dfield = dtuple_get_nth_field(m_replace_tuple, dict_col_get_no(col));
5037   dfield_set_data(dfield, sys_buf, DATA_ROLL_PTR_LEN);
5038 }
5039 
5040 /** Initialize the in-memory index */
init()5041 void DDTableBuffer::init() {
5042   if (dict_sys->dynamic_metadata != nullptr) {
5043     ut_ad(dict_table_is_comp(dict_sys->dynamic_metadata));
5044     m_index = dict_sys->dynamic_metadata->first_index();
5045   } else {
5046     open();
5047     dict_sys->dynamic_metadata = m_index->table;
5048   }
5049 
5050   ut_ad(m_index->next() == nullptr);
5051   ut_ad(m_index->n_uniq == 1);
5052   ut_ad(N_FIELDS == m_index->n_fields);
5053   ut_ad(m_index->table->n_cols == N_COLS);
5054 
5055   /* We don't need AHI for this table */
5056   m_index->disable_ahi = true;
5057   m_index->cached = true;
5058 
5059   m_heap = mem_heap_create(500);
5060   m_dynamic_heap = mem_heap_create(1000);
5061   m_replace_heap = mem_heap_create(1000);
5062 
5063   create_tuples();
5064 }
5065 
/** Open the mysql.innodb_dynamic_metadata when DD is not fully up.
Builds the in-memory table object and its clustered PRIMARY index by hand,
adds both to the dictionary cache, takes a reference on the table and
leaves m_index pointing at the table's first index. Must only be called
while dict_sys->dynamic_metadata is still unset. */
void DDTableBuffer::open() {
  ut_ad(dict_sys->dynamic_metadata == nullptr);

  dict_table_t *table = nullptr;
  /* Keep it the same with definition of mysql/innodb_dynamic_metadata */
  const char *table_name = "mysql/innodb_dynamic_metadata";
  const char *table_id_name = "table_id";
  const char *version_name = "version";
  const char *metadata_name = "metadata";
  ulint prtype = 0;
  /* Temporary heap, freed at the end of this function. */
  mem_heap_t *heap = mem_heap_create(256);

  /* Get the root page number according to index id, this is
  same with what we do in ha_innobase::get_se_private_data().
  Starting from page 4 for index id 1, each further index id takes the
  next page number, skipping any page that fsp_is_inode_page() reports
  as an inode page, until s_dynamic_meta_index_id is reached. */
  page_no_t root = 4;
  space_index_t index_id = 0;
  while (true) {
    if (fsp_is_inode_page(root)) {
      ++root;
      ut_ad(!fsp_is_inode_page(root));
    }

    if (++index_id == dict_sys_t::s_dynamic_meta_index_id) {
      break;
    }

    ++root;
  }

  table = dict_mem_table_create(table_name, dict_sys_t::s_space_id, N_USER_COLS,
                                0, 0, 0, 0);

  table->id = dict_sys_t::s_dynamic_meta_table_id;
  table->is_dd_table = true;
  table->dd_space_id = dict_sys_t::s_dd_space_id;
  table->flags |= DICT_TF_COMPACT | (1 << DICT_TF_POS_SHARED_SPACE) |
                  (1 << DICT_TF_POS_ATOMIC_BLOBS);

  /* table_id and version: 8-byte unsigned NOT NULL integers. */
  prtype = dtype_form_prtype(
      MYSQL_TYPE_LONGLONG | DATA_NOT_NULL | DATA_UNSIGNED | DATA_BINARY_TYPE,
      0);

  dict_mem_table_add_col(table, heap, table_id_name, DATA_INT, prtype, 8, true);
  dict_mem_table_add_col(table, heap, version_name, DATA_INT, prtype, 8, true);

  /* metadata: NOT NULL binary BLOB. */
  prtype =
      dtype_form_prtype(MYSQL_TYPE_BLOB | DATA_NOT_NULL | DATA_BINARY_TYPE, 63);

  dict_mem_table_add_col(table, heap, metadata_name, DATA_BLOB, prtype, 10,
                         true);

  dict_table_add_system_columns(table, heap);

  /* Clustered unique index on the first column (table_id). */
  m_index = dict_mem_index_create(table_name, "PRIMARY", dict_sys_t::s_space_id,
                                  DICT_CLUSTERED | DICT_UNIQUE, 1);

  dict_index_add_col(m_index, table, &table->cols[0], 0, true);

  m_index->id = dict_sys_t::s_dynamic_meta_index_id;
  m_index->n_uniq = 1;

  /* A failure here is only caught by the debug assertion; release
  builds continue regardless. */
  dberr_t err = dict_index_add_to_cache(table, m_index, root, false);
  if (err != DB_SUCCESS) {
    ut_ad(0);
  }

  /* From here on use the index object that is linked to the table,
  re-read after it was added to the cache. */
  m_index = table->first_index();

  mutex_enter(&dict_sys->mutex);

  dict_table_add_to_cache(table, true, heap);

  /* Take a reference on the table while still holding the mutex. */
  table->acquire();

  mutex_exit(&dict_sys->mutex);

  mem_heap_free(heap);
}
5145 
5146 /** Initialize the id field of tuple
5147 @param[out]	tuple	the tuple to be initialized
5148 @param[in]	id	table id */
init_tuple_with_id(dtuple_t * tuple,table_id_t id)5149 void DDTableBuffer::init_tuple_with_id(dtuple_t *tuple, table_id_t id) {
5150   dfield_t *dfield = dtuple_get_nth_field(tuple, TABLE_ID_FIELD_NO);
5151   void *data = dfield->data;
5152 
5153   mach_write_to_8(data, id);
5154   dfield_set_data(dfield, data, 8);
5155 }
5156 
5157 /** Free the things initialized in init() */
close()5158 void DDTableBuffer::close() {
5159   mem_heap_free(m_heap);
5160   mem_heap_free(m_dynamic_heap);
5161   mem_heap_free(m_replace_heap);
5162 
5163   m_search_tuple = nullptr;
5164   m_replace_tuple = nullptr;
5165 }
5166 
5167 /** Prepare for a update on METADATA field
5168 @param[in]	entry	clustered index entry to replace rec
5169 @param[in]	rec	clustered index record
5170 @return update vector of differing fields without system columns,
5171 or NULL if there isn't any different field */
update_set_metadata(const dtuple_t * entry,const rec_t * rec)5172 upd_t *DDTableBuffer::update_set_metadata(const dtuple_t *entry,
5173                                           const rec_t *rec) {
5174   ulint offsets[N_FIELDS + 1 + REC_OFFS_HEADER_SIZE];
5175   upd_field_t *upd_field;
5176   const dfield_t *version_field;
5177   const dfield_t *metadata_dfield;
5178   const byte *metadata;
5179   const byte *version;
5180   ulint len;
5181   upd_t *update;
5182 
5183   rec_offs_init(offsets);
5184   rec_offs_set_n_fields(offsets, N_FIELDS);
5185   rec_init_offsets_comp_ordinary(rec, false, m_index, offsets);
5186   ut_ad(!rec_get_deleted_flag(rec, 1));
5187 
5188   version = rec_get_nth_field(rec, offsets, VERSION_FIELD_NO, &len);
5189   ut_ad(len == 8);
5190   version_field = dtuple_get_nth_field(entry, VERSION_FIELD_NO);
5191 
5192   metadata = rec_get_nth_field(rec, offsets, METADATA_FIELD_NO, &len);
5193   metadata_dfield = dtuple_get_nth_field(entry, METADATA_FIELD_NO);
5194 
5195   if (dfield_data_is_binary_equal(version_field, 8, version) &&
5196       dfield_data_is_binary_equal(metadata_dfield, len, metadata)) {
5197     return (nullptr);
5198   }
5199 
5200   update = upd_create(2, m_replace_heap);
5201 
5202   upd_field = upd_get_nth_field(update, 0);
5203   dfield_copy(&upd_field->new_val, version_field);
5204   upd_field_set_field_no(upd_field, VERSION_FIELD_NO, m_index, nullptr);
5205 
5206   upd_field = upd_get_nth_field(update, 1);
5207   dfield_copy(&upd_field->new_val, metadata_dfield);
5208   upd_field_set_field_no(upd_field, METADATA_FIELD_NO, m_index, nullptr);
5209 
5210   ut_ad(update->validate());
5211 
5212   return (update);
5213 }
5214 
/** Replace the dynamic metadata for a specific table.
Looks up the row for the table id; if no row exists a new one is inserted,
otherwise the version and metadata fields are updated in place when they
differ from the stored ones. The caller must hold dict_persist->mutex.
@param[in]	id		table id
@param[in]	version		table dynamic metadata version
@param[in]	metadata	the metadata we want to replace
@param[in]	len		the metadata length
@return DB_SUCCESS; insert and update failures are caught by ut_a()
rather than being returned */
dberr_t DDTableBuffer::replace(table_id_t id, uint64_t version,
                               const byte *metadata, size_t len) {
  dtuple_t *entry;
  dfield_t *dfield;
  btr_pcur_t pcur;
  mtr_t mtr;
  byte ver[8];
  dberr_t error;

  ut_ad(mutex_own(&dict_persist->mutex));

  init_tuple_with_id(m_search_tuple, id);

  /* Fill the replace tuple: id, version and metadata. The version field
  points at the local buffer ver and the metadata field at the caller's
  buffer, so both are only valid for the duration of this call. */
  init_tuple_with_id(m_replace_tuple, id);
  mach_write_to_8(ver, version);
  dfield = dtuple_get_nth_field(m_replace_tuple, VERSION_COL_NO);
  dfield_set_data(dfield, ver, sizeof ver);
  dfield = dtuple_get_nth_field(m_replace_tuple, METADATA_COL_NO);
  dfield_set_data(dfield, metadata, len);
  /* Other system fields have been initialized */

  entry =
      row_build_index_entry(m_replace_tuple, nullptr, m_index, m_replace_heap);

  /* Start to search for the to-be-replaced tuple */
  mtr.start();

  btr_pcur_open(m_index, m_search_tuple, PAGE_CUR_LE, BTR_MODIFY_TREE, &pcur,
                &mtr);

  if (page_rec_is_infimum(btr_pcur_get_rec(&pcur)) ||
      btr_pcur_get_low_match(&pcur) < m_index->n_uniq) {
    /* The record was not found, so it's the first time we
    add the row for this table of id, we need to insert it */
    static const ulint flags = (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG |
                                BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG);

    /* The search mini-transaction is committed before the insert,
    which is done through row_ins_clust_index_entry_low(). */
    mtr.commit();

    error =
        row_ins_clust_index_entry_low(flags, BTR_MODIFY_TREE, m_index,
                                      m_index->n_uniq, entry, nullptr, false);
    ut_a(error == DB_SUCCESS);

    /* Reset the scratch heaps for the next call. */
    mem_heap_empty(m_dynamic_heap);
    mem_heap_empty(m_replace_heap);

    return (DB_SUCCESS);
  }

  ut_ad(!rec_get_deleted_flag(btr_pcur_get_rec(&pcur), true));

  /* Prepare to update the record. */
  upd_t *update = update_set_metadata(entry, btr_pcur_get_rec(&pcur));

  /* A null update vector means the stored version and metadata already
  match, so there is nothing to write. */
  if (update != nullptr) {
    ulint *cur_offsets = nullptr;
    big_rec_t *big_rec;
    static const ulint flags =
        (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG |
         BTR_KEEP_POS_FLAG | BTR_KEEP_SYS_FLAG);

    error = btr_cur_pessimistic_update(
        flags, btr_pcur_get_btr_cur(&pcur), &cur_offsets, &m_dynamic_heap,
        m_replace_heap, &big_rec, update, 0, nullptr, 0, 0, &mtr);
    ut_a(error == DB_SUCCESS);
    /* We don't have big rec in this table */
    ut_ad(!big_rec);
  }

  mtr.commit();
  /* Reset the scratch heaps for the next call. */
  mem_heap_empty(m_dynamic_heap);
  mem_heap_empty(m_replace_heap);

  return (DB_SUCCESS);
}
5297 
5298 /** Remove the whole row for a specific table
5299 @param[in]	id	table id
5300 @return DB_SUCCESS or error code */
remove(table_id_t id)5301 dberr_t DDTableBuffer::remove(table_id_t id) {
5302   btr_pcur_t pcur;
5303   mtr_t mtr;
5304   dberr_t error;
5305 
5306   ut_ad(mutex_own(&dict_persist->mutex));
5307 
5308   init_tuple_with_id(m_search_tuple, id);
5309 
5310   mtr.start();
5311 
5312   btr_pcur_open(m_index, m_search_tuple, PAGE_CUR_LE,
5313                 BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE, &pcur, &mtr);
5314 
5315   if (!page_rec_is_infimum(btr_pcur_get_rec(&pcur)) &&
5316       btr_pcur_get_low_match(&pcur) == m_index->n_uniq) {
5317     DEBUG_SYNC_C("delete_metadata_before");
5318 
5319     btr_cur_pessimistic_delete(&error, false, btr_pcur_get_btr_cur(&pcur),
5320                                BTR_CREATE_FLAG, false, 0, 0, 0, &mtr);
5321     ut_ad(error == DB_SUCCESS);
5322   }
5323 
5324   mtr.commit();
5325 
5326   return (DB_SUCCESS);
5327 }
5328 
5329 /** Truncate the table. We can call it after all the dynamic metadata
5330 has been written back to DD table */
truncate()5331 void DDTableBuffer::truncate() {
5332   ut_ad(mutex_own(&dict_persist->mutex));
5333 
5334   btr_truncate(m_index);
5335 }
5336 
5337 /** Get the buffered metadata for a specific table, the caller
5338 has to delete the returned std::string object by UT_DELETE
5339 @param[in]	id	table id
5340 @param[out]	version	table dynamic metadata version
5341 @return the metadata got in a string object, if nothing, the
5342 string would be of length 0 */
get(table_id_t id,uint64 * version)5343 std::string *DDTableBuffer::get(table_id_t id, uint64 *version) {
5344   btr_cur_t cursor;
5345   mtr_t mtr;
5346   ulint len;
5347   const byte *field = nullptr;
5348 
5349   ut_ad(mutex_own(&dict_persist->mutex));
5350 
5351   init_tuple_with_id(m_search_tuple, id);
5352 
5353   mtr.start();
5354 
5355   btr_cur_search_to_nth_level(m_index, 0, m_search_tuple, PAGE_CUR_LE,
5356                               BTR_SEARCH_LEAF, &cursor, 0, __FILE__, __LINE__,
5357                               &mtr);
5358 
5359   if (cursor.low_match == dtuple_get_n_fields(m_search_tuple)) {
5360     ulint offsets[N_FIELDS + 1 + REC_OFFS_HEADER_SIZE];
5361     rec_offs_init(offsets);
5362     rec_offs_set_n_fields(offsets, N_FIELDS);
5363     rec_t *rec = btr_cur_get_rec(&cursor);
5364     rec_init_offsets_comp_ordinary(rec, false, m_index, offsets);
5365     ut_ad(!rec_get_deleted_flag(rec, true));
5366 
5367     const byte *rec_version =
5368         rec_get_nth_field(rec, offsets, VERSION_FIELD_NO, &len);
5369     ut_ad(len == 8);
5370     *version = mach_read_from_8(rec_version);
5371 
5372     field = rec_get_nth_field(rec, offsets, METADATA_FIELD_NO, &len);
5373 
5374     ut_ad(len != UNIV_SQL_NULL);
5375   } else {
5376     len = 0;
5377     *version = 0;
5378   }
5379 
5380   std::string *metadata =
5381       UT_NEW_NOKEY(std::string(reinterpret_cast<const char *>(field), len));
5382 
5383   mtr.commit();
5384 
5385   return (metadata);
5386 }
5387 #endif /* !UNIV_HOTBACKUP */
5388 
5389 /** Write MLOG_TABLE_DYNAMIC_META for persistent dynamic metadata of table
5390 @param[in]	id		table id
5391 @param[in]	metadata	metadata used to write the log
5392 @param[in,out]	mtr		mini-transaction */
write_log(table_id_t id,const PersistentTableMetadata & metadata,mtr_t * mtr) const5393 void Persister::write_log(table_id_t id,
5394                           const PersistentTableMetadata &metadata,
5395                           mtr_t *mtr) const {
5396   byte *log_ptr;
5397   ulint size = get_write_size(metadata);
5398   /* Both table id and version would be written in a compressed format,
5399   each of which would cost 1..11 bytes, and MLOG_TABLE_DYNAMIC_META costs
5400   1 byte. Refer to mlog_write_initial_dict_log_record() as well */
5401   static constexpr uint8_t metadata_log_header_size = 23;
5402 
5403   ut_ad(size > 0);
5404 
5405   if (!mlog_open_metadata(mtr, metadata_log_header_size + size, log_ptr)) {
5406     /* Currently possible only when global redo logging is not enabled. */
5407     ut_ad(!mtr_t::s_logging.is_enabled());
5408     return;
5409   }
5410 
5411   log_ptr = mlog_write_initial_dict_log_record(
5412       MLOG_TABLE_DYNAMIC_META, id, metadata.get_version(), log_ptr, mtr);
5413 
5414   ulint consumed = write(metadata, log_ptr, size);
5415   log_ptr += consumed;
5416 
5417   mlog_close(mtr, log_ptr);
5418 }
5419 
5420 /** Write the corrupted indexes of a table, we can pre-calculate the size
5421 by calling get_write_size()
5422 @param[in]	metadata	persistent data
5423 @param[out]	buffer		write buffer
5424 @param[in]	size		size of write buffer, should be at least
5425                                 get_write_size()
5426 @return the length of bytes written */
write(const PersistentTableMetadata & metadata,byte * buffer,ulint size) const5427 ulint CorruptedIndexPersister::write(const PersistentTableMetadata &metadata,
5428                                      byte *buffer, ulint size) const {
5429   ulint length = 0;
5430   corrupted_ids_t corrupted_ids = metadata.get_corrupted_indexes();
5431   ulint num = corrupted_ids.size();
5432 
5433   ut_ad(num < MAX_INDEXES);
5434 
5435   if (corrupted_ids.empty()) {
5436     return (0);
5437   }
5438 
5439   /* Write the PM_INDEX_CORRUPTED mark first */
5440   mach_write_to_1(buffer, static_cast<byte>(PM_INDEX_CORRUPTED));
5441   ++length;
5442   ++buffer;
5443 
5444   mach_write_to_1(buffer, num);
5445   ++length;
5446   ++buffer;
5447 
5448   for (ulint i = 0; i < num; ++i) {
5449     mach_write_to_4(buffer, corrupted_ids[i].m_space_id);
5450     mach_write_to_8(buffer + 4, corrupted_ids[i].m_index_id);
5451     length += INDEX_ID_LENGTH;
5452     buffer += INDEX_ID_LENGTH;
5453     ut_ad(length <= size);
5454   }
5455 
5456   return (length);
5457 }
5458 
5459 /** Pre-calculate the size of metadata to be written
5460 @param[in]	metadata	metadata to be written
5461 @return the size of metadata */
get_write_size(const PersistentTableMetadata & metadata) const5462 ulint CorruptedIndexPersister::get_write_size(
5463     const PersistentTableMetadata &metadata) const {
5464   ulint length = 0;
5465   corrupted_ids_t corrupted_ids = metadata.get_corrupted_indexes();
5466 
5467   ut_ad(corrupted_ids.size() < MAX_INDEXES);
5468 
5469   if (corrupted_ids.empty()) {
5470     return (0);
5471   }
5472 
5473   /* PM_INDEX_CORRUPTED mark and number of corrupted indexes' ids */
5474   length += 1 + 1;
5475   length += corrupted_ids.size() * INDEX_ID_LENGTH;
5476 
5477   return (length);
5478 }
5479 
5480 /** Read the corrupted indexes from buffer, and store them to
5481 metadata object
5482 @param[out]	metadata	metadata where we store the read data
5483 @param[in]	buffer		buffer to read
5484 @param[in]	size		size of buffer
5485 @param[out]	corrupt		true if we found something wrong in
5486                                 the buffer except incomplete buffer,
5487                                 otherwise false
5488 @return the bytes we read from the buffer if the buffer data
5489 is complete and we get everything, 0 if the buffer is incompleted */
read(PersistentTableMetadata & metadata,const byte * buffer,ulint size,bool * corrupt) const5490 ulint CorruptedIndexPersister::read(PersistentTableMetadata &metadata,
5491                                     const byte *buffer, ulint size,
5492                                     bool *corrupt) const {
5493   const byte *end = buffer + size;
5494   ulint consumed = 0;
5495   byte type;
5496   ulint num;
5497 
5498   *corrupt = false;
5499 
5500   /* It should contain PM_INDEX_CORRUPTED and number at least */
5501   if (size <= 2) {
5502     return (0);
5503   }
5504 
5505   type = *buffer++;
5506   ++consumed;
5507 
5508   if (type != PM_INDEX_CORRUPTED) {
5509     *corrupt = true;
5510     return (consumed);
5511   }
5512 
5513   num = mach_read_from_1(buffer);
5514   ++consumed;
5515   ++buffer;
5516 
5517   if (num == 0 || num > MAX_INDEXES) {
5518     *corrupt = true;
5519     return (consumed);
5520   }
5521 
5522   if (buffer + num * INDEX_ID_LENGTH > end) {
5523     return (0);
5524   }
5525 
5526   for (ulint i = 0; i < num; ++i) {
5527     space_id_t space_id = mach_read_from_4(buffer);
5528     space_index_t index_id = mach_read_from_8(buffer + 4);
5529     metadata.add_corrupted_index(index_id_t(space_id, index_id));
5530 
5531     buffer += INDEX_ID_LENGTH;
5532     consumed += INDEX_ID_LENGTH;
5533   }
5534 
5535   return (consumed);
5536 }
5537 
5538 /** Write the autoinc counter of a table, we can pre-calculate
5539 the size by calling get_write_size()
5540 @param[in]	metadata	persistent metadata
5541 @param[out]	buffer		write buffer
5542 @param[in]	size		size of write buffer, should be
5543                                 at least get_write_size()
5544 @return the length of bytes written */
write(const PersistentTableMetadata & metadata,byte * buffer,ulint size) const5545 ulint AutoIncPersister::write(const PersistentTableMetadata &metadata,
5546                               byte *buffer, ulint size) const {
5547   ulint length = 0;
5548   ib_uint64_t autoinc = metadata.get_autoinc();
5549 
5550   mach_write_to_1(buffer, static_cast<byte>(PM_TABLE_AUTO_INC));
5551   ++length;
5552   ++buffer;
5553 
5554   ulint len = mach_u64_write_much_compressed(buffer, autoinc);
5555   length += len;
5556   buffer += len;
5557 
5558   ut_ad(length <= size);
5559   return (length);
5560 }
5561 
5562 /** Read the autoinc counter from buffer, and store them to
5563 metadata object
5564 @param[out]	metadata	metadata where we store the read data
5565 @param[in]	buffer		buffer to read
5566 @param[in]	size		size of buffer
5567 @param[out]	corrupt		true if we found something wrong in
5568                                 the buffer except incomplete buffer,
5569                                 otherwise false
5570 @return the bytes we read from the buffer if the buffer data
5571 is complete and we get everything, 0 if the buffer is incomplete */
read(PersistentTableMetadata & metadata,const byte * buffer,ulint size,bool * corrupt) const5572 ulint AutoIncPersister::read(PersistentTableMetadata &metadata,
5573                              const byte *buffer, ulint size,
5574                              bool *corrupt) const {
5575   const byte *end = buffer + size;
5576   ulint consumed = 0;
5577   byte type;
5578   ib_uint64_t autoinc;
5579 
5580   *corrupt = false;
5581 
5582   /* It should contain PM_TABLE_AUTO_INC and the counter at least */
5583   if (size < 2) {
5584     return (0);
5585   }
5586 
5587   type = *buffer++;
5588   ++consumed;
5589 
5590   if (type != PM_TABLE_AUTO_INC) {
5591     *corrupt = true;
5592     return (consumed);
5593   }
5594 
5595   const byte *start = buffer;
5596   autoinc = mach_parse_u64_much_compressed(&start, end);
5597 
5598   if (start == nullptr) {
5599     /* Just incomplete data, not corrupted */
5600     return (0);
5601   }
5602 
5603   if (autoinc == 0) {
5604     metadata.set_autoinc(autoinc);
5605   } else {
5606     metadata.set_autoinc_if_bigger(autoinc);
5607   }
5608 
5609   consumed += start - buffer;
5610   ut_ad(consumed <= size);
5611   return (consumed);
5612 }
5613 
5614 /** Destructor */
~Persisters()5615 Persisters::~Persisters() {
5616   persisters_t::iterator iter;
5617   for (iter = m_persisters.begin(); iter != m_persisters.end(); ++iter) {
5618     UT_DELETE(iter->second);
5619   }
5620 }
5621 
5622 /** Get the persister object with specified type
5623 @param[in]	type	persister type
5624 @return Persister object required or NULL if not found */
get(persistent_type_t type) const5625 Persister *Persisters::get(persistent_type_t type) const {
5626   ut_ad(type > PM_SMALLEST_TYPE);
5627   ut_ad(type < PM_BIGGEST_TYPE);
5628 
5629   persisters_t::const_iterator iter = m_persisters.find(type);
5630 
5631   return (iter == m_persisters.end() ? NULL : iter->second);
5632 }
5633 
5634 /** Add a specified persister of type, we will allocate the Persister
5635 if there is no such persister exist, otherwise do nothing and return
5636 the existing one
5637 @param[in]	type	persister type
5638 @return the persister of type */
add(persistent_type_t type)5639 Persister *Persisters::add(persistent_type_t type) {
5640   ut_ad(type > PM_SMALLEST_TYPE);
5641   ut_ad(type < PM_BIGGEST_TYPE);
5642 
5643   Persister *persister = get(type);
5644 
5645   if (persister != nullptr) {
5646     return (persister);
5647   }
5648 
5649   switch (type) {
5650     case PM_INDEX_CORRUPTED:
5651       persister = UT_NEW_NOKEY(CorruptedIndexPersister());
5652       break;
5653     case PM_TABLE_AUTO_INC:
5654       persister = UT_NEW_NOKEY(AutoIncPersister());
5655       break;
5656     default:
5657       ut_ad(0);
5658       break;
5659   }
5660 
5661   m_persisters.insert(std::make_pair(type, persister));
5662 
5663   return (persister);
5664 }
5665 
5666 /** Remove a specified persister of type, we will free the Persister
5667 @param[in]	type	persister type */
remove(persistent_type_t type)5668 void Persisters::remove(persistent_type_t type) {
5669   persisters_t::iterator iter = m_persisters.find(type);
5670 
5671   if (iter != m_persisters.end()) {
5672     UT_DELETE(iter->second);
5673     m_persisters.erase(iter);
5674   }
5675 }
5676 
5677 #ifndef UNIV_HOTBACKUP
5678 /** Serialize the metadata to a buffer
5679 @param[in]	metadata	metadata to serialize
5680 @param[out]	buffer		buffer to store the serialized metadata
5681 @return the length of serialized metadata */
write(PersistentTableMetadata & metadata,byte * buffer)5682 size_t Persisters::write(PersistentTableMetadata &metadata, byte *buffer) {
5683   size_t size = 0;
5684   byte *pos = buffer;
5685   persistent_type_t type;
5686 
5687   for (type = static_cast<persistent_type_t>(PM_SMALLEST_TYPE + 1);
5688        type < PM_BIGGEST_TYPE;
5689        type = static_cast<persistent_type_t>(type + 1)) {
5690     ut_ad(size <= REC_MAX_DATA_SIZE);
5691 
5692     Persister *persister = get(type);
5693     ulint consumed = persister->write(metadata, pos, REC_MAX_DATA_SIZE - size);
5694 
5695     pos += consumed;
5696     size += consumed;
5697   }
5698 
5699   return (size);
5700 }
5701 
5702 /** Close SDI table.
5703 @param[in]	table		the in-meory SDI table object */
dict_sdi_close_table(dict_table_t * table)5704 void dict_sdi_close_table(dict_table_t *table) {
5705   ut_ad(dict_table_is_sdi(table->id));
5706   dict_table_close(table, true, false);
5707 }
5708 
5709 /** Retrieve in-memory index for SDI table.
5710 @param[in]	tablespace_id	innodb tablespace id
5711 @return dict_index_t structure or NULL*/
dict_sdi_get_index(space_id_t tablespace_id)5712 dict_index_t *dict_sdi_get_index(space_id_t tablespace_id) {
5713   dict_table_t *table = dd_table_open_on_id(
5714       dict_sdi_get_table_id(tablespace_id), nullptr, nullptr, true, true);
5715 
5716   if (table != nullptr) {
5717     dict_sdi_close_table(table);
5718     return (table->first_index());
5719   }
5720   return (nullptr);
5721 }
5722 
5723 /** Retrieve in-memory table object for SDI table.
5724 @param[in]	tablespace_id	innodb tablespace id
5725 @param[in]	dict_locked	true if dict_sys mutex is acquired
5726 @param[in]	is_create	true if we are creating index
5727 @return dict_table_t structure */
dict_sdi_get_table(space_id_t tablespace_id,bool dict_locked,bool is_create)5728 dict_table_t *dict_sdi_get_table(space_id_t tablespace_id, bool dict_locked,
5729                                  bool is_create) {
5730   if (is_create) {
5731     if (!dict_locked) {
5732       mutex_enter(&dict_sys->mutex);
5733     }
5734 
5735     dict_sdi_create_idx_in_mem(tablespace_id, false, 0, true);
5736 
5737     if (!dict_locked) {
5738       mutex_exit(&dict_sys->mutex);
5739     }
5740   }
5741   dict_table_t *table =
5742       dd_table_open_on_id(dict_sdi_get_table_id(tablespace_id), nullptr,
5743                           nullptr, dict_locked, true);
5744 
5745   return (table);
5746 }
5747 
5748 /** Remove the SDI table from table cache.
5749 @param[in]	space_id	InnoDB tablespace ID
5750 @param[in]	sdi_table	sdi table
5751 @param[in]	dict_locked	true if dict_sys mutex acquired */
dict_sdi_remove_from_cache(space_id_t space_id,dict_table_t * sdi_table,bool dict_locked)5752 void dict_sdi_remove_from_cache(space_id_t space_id, dict_table_t *sdi_table,
5753                                 bool dict_locked) {
5754   if (sdi_table == nullptr) {
5755     /* Remove SDI table from table cache */
5756     /* We already have MDL protection on tablespace as well
5757     as MDL on SDI table */
5758     sdi_table = dd_table_open_on_id_in_mem(dict_sdi_get_table_id(space_id),
5759                                            dict_locked);
5760     if (sdi_table) {
5761       dd_table_close(sdi_table, nullptr, nullptr, dict_locked);
5762     }
5763   } else {
5764     dd_table_close(sdi_table, nullptr, nullptr, dict_locked);
5765   }
5766 
5767   if (sdi_table) {
5768     if (!dict_locked) {
5769       mutex_enter(&dict_sys->mutex);
5770     }
5771 
5772     dict_table_remove_from_cache(sdi_table);
5773 
5774     if (!dict_locked) {
5775       mutex_exit(&dict_sys->mutex);
5776     }
5777   }
5778 }
5779 
5780 /** Change the table_id of SYS_* tables if they have been created after
5781 an earlier upgrade. This will update the table_id by adding DICT_MAX_DD_TABLES
5782 */
dict_table_change_id_sys_tables()5783 void dict_table_change_id_sys_tables() {
5784   ut_ad(mutex_own(&dict_sys->mutex));
5785 
5786   for (uint32_t i = 0; i < SYS_NUM_SYSTEM_TABLES; i++) {
5787     dict_table_t *system_table = dict_table_get_low(SYSTEM_TABLE_NAME[i]);
5788 
5789     ut_a(system_table != nullptr);
5790     ut_ad(dict_sys_table_id[i] == system_table->id);
5791 
5792     /* During upgrade, table_id of user tables is also
5793     moved by DICT_MAX_DD_TABLES. See dict_load_table_one()*/
5794     table_id_t new_table_id = system_table->id + DICT_MAX_DD_TABLES;
5795 
5796     dict_table_change_id_in_cache(system_table, new_table_id);
5797 
5798     dict_sys_table_id[i] = system_table->id;
5799 
5800     dict_table_prevent_eviction(system_table);
5801   }
5802 }
5803 
5804 /** Evict all tables that are loaded for applying purge.
5805 Since we move the offset of all table ids during upgrade,
5806 these tables cannot exist in cache. Also change table_ids
5807 of SYS_* tables if they are upgraded from earlier versions */
dict_upgrade_evict_tables_cache()5808 void dict_upgrade_evict_tables_cache() {
5809   dict_table_t *table;
5810 
5811   rw_lock_x_lock(dict_operation_lock);
5812   mutex_enter(&dict_sys->mutex);
5813 
5814   ut_ad(dict_lru_validate());
5815   ut_ad(srv_is_upgrade_mode);
5816 
5817   /* Move all tables from non-LRU to LRU */
5818   for (table = UT_LIST_GET_LAST(dict_sys->table_non_LRU); table != nullptr;) {
5819     dict_table_t *prev_table;
5820 
5821     prev_table = UT_LIST_GET_PREV(table_LRU, table);
5822 
5823     if (!dict_table_is_system(table->id)) {
5824       DBUG_EXECUTE_IF("dd_upgrade", ib::info(ER_IB_MSG_185)
5825                                         << "Moving table " << table->name
5826                                         << " from non-LRU to LRU";);
5827 
5828       dict_table_move_from_non_lru_to_lru(table);
5829     }
5830 
5831     table = prev_table;
5832   }
5833 
5834   for (table = UT_LIST_GET_LAST(dict_sys->table_LRU); table != nullptr;) {
5835     dict_table_t *prev_table;
5836 
5837     prev_table = UT_LIST_GET_PREV(table_LRU, table);
5838 
5839     ut_ad(dict_table_can_be_evicted(table));
5840 
5841     DBUG_EXECUTE_IF("dd_upgrade", ib::info(ER_IB_MSG_186)
5842                                       << "Evicting table: LRU: "
5843                                       << table->name;);
5844 
5845     dict_table_remove_from_cache_low(table, TRUE);
5846 
5847     table = prev_table;
5848   }
5849 
5850   dict_table_change_id_sys_tables();
5851 
5852   mutex_exit(&dict_sys->mutex);
5853   rw_lock_x_unlock(dict_operation_lock);
5854 }
5855 
5856 /** Build the table_id array of SYS_* tables. This
5857 array is used to determine if a table is InnoDB SYSTEM
5858 table or not.
5859 @return true if successful, false otherwise */
dict_sys_table_id_build()5860 bool dict_sys_table_id_build() {
5861   mutex_enter(&dict_sys->mutex);
5862   for (uint32_t i = 0; i < SYS_NUM_SYSTEM_TABLES; i++) {
5863     dict_table_t *system_table = dict_table_get_low(SYSTEM_TABLE_NAME[i]);
5864 
5865     if (system_table == nullptr) {
5866       /* Cannot find a system table, this happens only if user trying
5867       to boot server earlier than 5.7 */
5868       mutex_exit(&dict_sys->mutex);
5869       LogErr(ERROR_LEVEL, ER_IB_MSG_1271);
5870       return (false);
5871     }
5872     dict_sys_table_id[i] = system_table->id;
5873   }
5874   mutex_exit(&dict_sys->mutex);
5875   return (true);
5876 }
5877 
5878 /** @return true if table is InnoDB SYS_* table
5879 @param[in]	table_id	table id  */
dict_table_is_system(table_id_t table_id)5880 bool dict_table_is_system(table_id_t table_id) {
5881   for (uint32_t i = 0; i < SYS_NUM_SYSTEM_TABLES; i++) {
5882     if (table_id == dict_sys_table_id[i]) {
5883       return (true);
5884     }
5885   }
5886   return (false);
5887 }
5888 
5889 /** Acquire exclusive MDL on SDI tables. This is acquired to
5890 prevent concurrent DROP table/tablespace when there is purge
5891 happening on SDI table records. Purge will acquired shared
5892 MDL on SDI table.
5893 
5894 Exclusive MDL is transactional(released on trx commit). So
5895 for successful acquistion, there should be valid thd with
5896 trx associated.
5897 
5898 Acquistion order of SDI MDL and SDI table has to be in same
5899 order:
5900 
5901 1. dd_sdi_acquire_exclusive_mdl
5902 2. row_drop_table_from_cache()/innodb_drop_tablespace()
5903    ->dict_sdi_remove_from_cache()->dd_table_open_on_id()
5904 
5905 In purge:
5906 
5907 1. dd_sdi_acquire_shared_mdl
5908 2. dd_table_open_on_id()
5909 
5910 @param[in]	thd		server thread instance
5911 @param[in]	space_id	InnoDB tablespace id
5912 @param[in,out]	sdi_mdl		MDL ticket on SDI table
5913 @retval	DB_SUCESS		on success
5914 @retval	DB_LOCK_WAIT_TIMEOUT	on error */
dd_sdi_acquire_exclusive_mdl(THD * thd,space_id_t space_id,MDL_ticket ** sdi_mdl)5915 dberr_t dd_sdi_acquire_exclusive_mdl(THD *thd, space_id_t space_id,
5916                                      MDL_ticket **sdi_mdl) {
5917   /* Exclusive MDL always need trx context and is
5918   released on trx commit. So check if thd & trx
5919   exists */
5920   ut_ad(thd != nullptr);
5921   ut_ad(check_trx_exists(current_thd) != nullptr);
5922   ut_ad(sdi_mdl != nullptr);
5923   ut_ad(!mutex_own(&dict_sys->mutex));
5924 
5925   char tbl_buf[NAME_LEN + 1];
5926   const char *db_buf = "dummy_sdi_db";
5927 
5928   snprintf(tbl_buf, sizeof(tbl_buf), "SDI_" SPACE_ID_PF, space_id);
5929 
5930   /* Submit a higher than default lock wait timeout */
5931   auto lock_wait_timeout = thd_lock_wait_timeout(thd);
5932   if (lock_wait_timeout < 100000) {
5933     lock_wait_timeout += 100000;
5934   }
5935   if (dd::acquire_exclusive_table_mdl(thd, db_buf, tbl_buf, lock_wait_timeout,
5936                                       sdi_mdl)) {
5937     /* MDL failure can happen with lower timeout
5938     values chosen by user */
5939     return (DB_LOCK_WAIT_TIMEOUT);
5940   }
5941 
5942   /* MDL creation failed */
5943   if (*sdi_mdl == nullptr) {
5944     ut_ad(0);
5945     return (DB_LOCK_WAIT_TIMEOUT);
5946   }
5947 
5948   return (DB_SUCCESS);
5949 }
5950 
5951 /** Acquire shared MDL on SDI tables. This is acquired by purge to
5952 prevent concurrent DROP table/tablespace.
5953 DROP table/tablespace will acquire exclusive MDL on SDI table
5954 
5955 Acquistion order of SDI MDL and SDI table has to be in same
5956 order:
5957 
5958 1. dd_sdi_acquire_exclusive_mdl
5959 2. row_drop_table_from_cache()/innodb_drop_tablespace()
5960    ->dict_sdi_remove_from_cache()->dd_table_open_on_id()
5961 
5962 In purge:
5963 
5964 1. dd_sdi_acquire_shared_mdl
5965 2. dd_table_open_on_id()
5966 
5967 MDL should be released by caller
5968 @param[in]	thd		server thread instance
5969 @param[in]	space_id	InnoDB tablespace id
5970 @param[in,out]	sdi_mdl		MDL ticket on SDI table
5971 @retval	DB_SUCESS		on success
5972 @retval	DB_LOCK_WAIT_TIMEOUT	on error */
dd_sdi_acquire_shared_mdl(THD * thd,space_id_t space_id,MDL_ticket ** sdi_mdl)5973 dberr_t dd_sdi_acquire_shared_mdl(THD *thd, space_id_t space_id,
5974                                   MDL_ticket **sdi_mdl) {
5975   ut_ad(sdi_mdl != nullptr);
5976   ut_ad(!mutex_own(&dict_sys->mutex));
5977 
5978   char tbl_buf[NAME_LEN + 1];
5979   const char *db_buf = "dummy_sdi_db";
5980 
5981   snprintf(tbl_buf, sizeof(tbl_buf), "SDI_" SPACE_ID_PF, space_id);
5982 
5983   if (dd::acquire_shared_table_mdl(thd, db_buf, tbl_buf, false, sdi_mdl)) {
5984     /* MDL failure can happen with lower timeout
5985     values chosen by user */
5986     return (DB_LOCK_WAIT_TIMEOUT);
5987   }
5988 
5989   /* MDL creation failed */
5990   if (*sdi_mdl == nullptr) {
5991     ut_ad(0);
5992     return (DB_LOCK_WAIT_TIMEOUT);
5993   }
5994 
5995   return (DB_SUCCESS);
5996 }
5997 
5998 /** Get the tablespace data directory if set, otherwise empty string.
5999 @return the data directory */
dict_table_get_datadir(const dict_table_t * table)6000 std::string dict_table_get_datadir(const dict_table_t *table) {
6001   std::string path;
6002 
6003   if (DICT_TF_HAS_DATA_DIR(table->flags) && table->data_dir_path != nullptr) {
6004     path.assign(table->data_dir_path);
6005   }
6006 
6007   return (path);
6008 }
6009 #endif /* !UNIV_HOTBACKUP */
6010