1 /*****************************************************************************
2 
3 Copyright (c) 2017, 2020, Oracle and/or its affiliates.
4 
5 Portions of this file contain modifications contributed and copyrighted by
6 Google, Inc. Those modifications are gratefully acknowledged and are described
7 briefly in the InnoDB documentation. The contributions by Google are
8 incorporated with their permission, and subject to the conditions contained in
9 the file COPYING.Google.
10 
11 This program is free software; you can redistribute it and/or modify it under
12 the terms of the GNU General Public License, version 2.0, as published by the
13 Free Software Foundation.
14 
15 This program is also distributed with certain software (including but not
16 limited to OpenSSL) that is licensed under separate terms, as designated in a
17 particular file or component or in included license documentation. The authors
18 of MySQL hereby grant you an additional permission to link the program and
19 your derivative works with the separately licensed software that they have
20 included with MySQL.
21 
22 This program is distributed in the hope that it will be useful, but WITHOUT
23 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
25 for more details.
26 
27 You should have received a copy of the GNU General Public License along with
28 this program; if not, write to the Free Software Foundation, Inc.,
29 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
30 
31 *****************************************************************************/
32 
33 /** @file log/log0ddl.cc
34  DDL log
35 
36  Created 12/1/2016 Shaohua Wang
37  *******************************************************/
38 
39 #include <debug_sync.h>
40 #include "ha_prototypes.h"
41 
42 #include <current_thd.h>
43 #include <sql_thd_internal_api.h>
44 
45 #include "btr0sea.h"
46 #include "dict0dd.h"
47 #include "dict0mem.h"
48 #include "dict0stats.h"
49 #include "ha_innodb.h"
50 #include "log0ddl.h"
51 #include "mysql/plugin.h"
52 #include "pars0pars.h"
53 #include "que0que.h"
54 #include "row0ins.h"
55 #include "row0row.h"
56 #include "row0sel.h"
57 #include "trx0trx.h"
58 
/** Global object used to handle DDL logging (Log_DDL). It is defined here
as a null pointer. */
Log_DDL *log_ddl = nullptr;

/** Per-thread flag: whether the current thread is replaying DDL logs.
Note: DDL logs must not be written while DDL logs are being replayed;
Log_DDL::skip() checks this flag. */
thread_local bool thread_local_ddl_log_replay = false;

/** Whether DDL logs are being recovered (replayed) during startup. */
bool Log_DDL::s_in_recovery = false;
68 
69 #ifdef UNIV_DEBUG
70 
/** Used by SET GLOBAL innodb_ddl_log_crash_counter_reset_debug = 1; */
bool innodb_ddl_log_crash_reset_debug;

/** The counters below are used for crash injection around the following
types of DDL log:
1. FREE TREE
2. DELETE SPACE
3. RENAME SPACE
4. DROP
5. ALTER ENCRYPT TABLESPACE
RENAME TABLE and REMOVE CACHE logs do not touch the data files at all,
so they are skipped. Every counter starts at 1. */

/** Crash injection counter used before writing FREE TREE log */
static uint32_t crash_before_free_tree_log_counter = 1;

/** Crash injection counter used after writing FREE TREE log */
static uint32_t crash_after_free_tree_log_counter = 1;

/** Crash injection counter used after deleting FREE TREE log */
static uint32_t crash_after_free_tree_delete_counter = 1;

/** Crash injection counter used before writing DELETE SPACE log */
static uint32_t crash_before_delete_space_log_counter = 1;

/** Crash injection counter used after writing DELETE SPACE log */
static uint32_t crash_after_delete_space_log_counter = 1;

/** Crash injection counter used after deleting DELETE SPACE log */
static uint32_t crash_after_delete_space_delete_counter = 1;

/** Crash injection counter used before writing RENAME SPACE log */
static uint32_t crash_before_rename_space_log_counter = 1;

/** Crash injection counter used after writing RENAME SPACE log */
static uint32_t crash_after_rename_space_log_counter = 1;

/** Crash injection counter used after deleting RENAME SPACE log */
static uint32_t crash_after_rename_space_delete_counter = 1;

/** Crash injection counter used before writing DROP log */
static uint32_t crash_before_drop_log_counter = 1;

/** Crash injection counter used after writing DROP log */
static uint32_t crash_after_drop_log_counter = 1;

/** Crash injection counter used after any replay */
static uint32_t crash_after_replay_counter = 1;

/** Crash injection counter used before writing ALTER ENCRYPT TABLESPACE log */
static uint32_t crash_before_alter_encrypt_space_log_counter = 1;

/** Crash injection counter used after writing ALTER ENCRYPT TABLESPACE log */
static uint32_t crash_after_alter_encrypt_space_log_counter = 1;
123 
ddl_log_crash_reset(THD * thd,SYS_VAR * var,void * var_ptr,const void * save)124 void ddl_log_crash_reset(THD *thd, SYS_VAR *var, void *var_ptr,
125                          const void *save) {
126   const bool reset = *static_cast<const bool *>(save);
127 
128   innodb_ddl_log_crash_reset_debug = reset;
129 
130   if (reset) {
131     crash_before_free_tree_log_counter = 1;
132     crash_after_free_tree_log_counter = 1;
133     crash_after_free_tree_delete_counter = 1;
134     crash_before_delete_space_log_counter = 1;
135     crash_after_delete_space_log_counter = 1;
136     crash_after_delete_space_delete_counter = 1;
137     crash_before_rename_space_log_counter = 1;
138     crash_after_rename_space_log_counter = 1;
139     crash_after_rename_space_delete_counter = 1;
140     crash_before_drop_log_counter = 1;
141     crash_after_drop_log_counter = 1;
142     crash_after_replay_counter = 1;
143   }
144 }
145 
146 #endif /* UNIV_DEBUG */
147 
/** Construct an empty DDL record. The ids and the page number are set to
their "undefined" sentinels (ULINT_UNDEFINED, SPACE_UNKNOWN, FIL_NULL),
both file paths and the private heap are null, and the record is marked
deletable.
NOTE(review): m_type is not set in this initializer list; presumably
callers are expected to call set_type() before using the record - confirm
there is no in-class default. */
DDL_Record::DDL_Record()
    : m_id(ULINT_UNDEFINED),
      m_thread_id(ULINT_UNDEFINED),
      m_space_id(SPACE_UNKNOWN),
      m_page_no(FIL_NULL),
      m_index_id(ULINT_UNDEFINED),
      m_table_id(ULINT_UNDEFINED),
      m_old_file_path(nullptr),
      m_new_file_path(nullptr),
      m_heap(nullptr),
      m_deletable(true) {}
159 
~DDL_Record()160 DDL_Record::~DDL_Record() {
161   if (m_heap != nullptr) {
162     mem_heap_free(m_heap);
163   }
164 }
165 
set_old_file_path(const char * name)166 void DDL_Record::set_old_file_path(const char *name) {
167   ulint len = strlen(name);
168 
169   if (m_heap == nullptr) {
170     m_heap = mem_heap_create(FN_REFLEN + 1);
171   }
172 
173   m_old_file_path = mem_heap_strdupl(m_heap, name, len);
174 }
175 
set_old_file_path(const byte * data,ulint len)176 void DDL_Record::set_old_file_path(const byte *data, ulint len) {
177   if (m_heap == nullptr) {
178     m_heap = mem_heap_create(FN_REFLEN + 1);
179   }
180 
181   m_old_file_path = static_cast<char *>(mem_heap_dup(m_heap, data, len + 1));
182   m_old_file_path[len] = '\0';
183 }
184 
set_new_file_path(const char * name)185 void DDL_Record::set_new_file_path(const char *name) {
186   ulint len = strlen(name);
187 
188   if (m_heap == nullptr) {
189     m_heap = mem_heap_create(FN_REFLEN + 1);
190   }
191 
192   m_new_file_path = mem_heap_strdupl(m_heap, name, len);
193 }
194 
set_new_file_path(const byte * data,ulint len)195 void DDL_Record::set_new_file_path(const byte *data, ulint len) {
196   if (m_heap == nullptr) {
197     m_heap = mem_heap_create(FN_REFLEN + 1);
198   }
199 
200   m_new_file_path = static_cast<char *>(mem_heap_dup(m_heap, data, len + 1));
201   m_new_file_path[len] = '\0';
202 }
203 
print(std::ostream & out) const204 std::ostream &DDL_Record::print(std::ostream &out) const {
205   ut_ad(m_type >= Log_Type::SMALLEST_LOG);
206   ut_ad(m_type <= Log_Type::BIGGEST_LOG);
207 
208   bool printed = false;
209 
210   out << "[DDL record: ";
211 
212   switch (m_type) {
213     case Log_Type::FREE_TREE_LOG:
214       out << "FREE";
215       break;
216     case Log_Type::DELETE_SPACE_LOG:
217       out << "DELETE SPACE";
218       break;
219     case Log_Type::RENAME_SPACE_LOG:
220       out << "RENAME SPACE";
221       break;
222     case Log_Type::DROP_LOG:
223       out << "DROP";
224       break;
225     case Log_Type::RENAME_TABLE_LOG:
226       out << "RENAME TABLE";
227       break;
228     case Log_Type::REMOVE_CACHE_LOG:
229       out << "REMOVE CACHE";
230       break;
231     case Log_Type::ALTER_ENCRYPT_TABLESPACE_LOG:
232       out << "ALTER ENCRYPT TABLESPACE";
233       break;
234     default:
235       ut_ad(0);
236   }
237 
238   out << ",";
239 
240   if (m_id != ULINT_UNDEFINED) {
241     out << " id=" << m_id;
242     printed = true;
243   }
244 
245   if (m_thread_id != ULINT_UNDEFINED) {
246     if (printed) {
247       out << ",";
248     }
249     out << " thread_id=" << m_thread_id;
250     printed = true;
251   }
252 
253   if (m_space_id != SPACE_UNKNOWN) {
254     if (printed) {
255       out << ",";
256     }
257     out << " space_id=" << m_space_id;
258     printed = true;
259   }
260 
261   if (m_table_id != ULINT_UNDEFINED) {
262     if (printed) {
263       out << ",";
264     }
265     out << " table_id=" << m_table_id;
266     printed = true;
267   }
268 
269   if (m_index_id != ULINT_UNDEFINED) {
270     if (printed) {
271       out << ",";
272     }
273     out << " index_id=" << m_index_id;
274     printed = true;
275   }
276 
277   if (m_page_no != FIL_NULL) {
278     if (printed) {
279       out << ",";
280     }
281     out << " page_no=" << m_page_no;
282     printed = true;
283   }
284 
285   if (m_old_file_path != nullptr) {
286     if (printed) {
287       out << ",";
288     }
289     out << " old_file_path=" << m_old_file_path;
290     printed = true;
291   }
292 
293   if (m_new_file_path != nullptr) {
294     if (printed) {
295       out << ",";
296     }
297     out << " new_file_path=" << m_new_file_path;
298   }
299 
300   out << "]";
301 
302   return (out);
303 }
304 
305 /** Display a DDL record
306 @param[in,out]	o	output stream
307 @param[in]	record	DDL record to display
308 @return the output stream */
operator <<(std::ostream & o,const DDL_Record & record)309 std::ostream &operator<<(std::ostream &o, const DDL_Record &record) {
310   return (record.print(o));
311 }
312 
/** Default constructor: delegates to DDL_Log_Table(trx_t *) with a null
transaction, so no query thread is started. */
DDL_Log_Table::DDL_Log_Table() : DDL_Log_Table(nullptr) {}
314 
DDL_Log_Table(trx_t * trx)315 DDL_Log_Table::DDL_Log_Table(trx_t *trx)
316     : m_table(dict_sys->ddl_log), m_tuple(nullptr), m_trx(trx), m_thr(nullptr) {
317   ut_ad(m_trx == nullptr || m_trx->ddl_operation);
318   m_heap = mem_heap_create(1000);
319   if (m_trx != nullptr) {
320     start_query_thread();
321   }
322 }
323 
/** Stop the query thread, if one was started, and then free the heap
owned by this object. */
DDL_Log_Table::~DDL_Log_Table() {
  stop_query_thread();
  mem_heap_free(m_heap);
}
328 
start_query_thread()329 void DDL_Log_Table::start_query_thread() {
330   que_t *graph = static_cast<que_fork_t *>(que_node_get_parent(
331       pars_complete_graph_for_exec(nullptr, m_trx, m_heap, nullptr)));
332   m_thr = que_fork_start_command(graph);
333   ut_ad(m_trx->lock.n_active_thrs == 1);
334 }
335 
stop_query_thread()336 void DDL_Log_Table::stop_query_thread() {
337   if (m_thr != nullptr) {
338     que_thr_stop_for_mysql_no_error(m_thr, m_trx);
339   }
340 }
341 
create_tuple(const DDL_Record & record)342 void DDL_Log_Table::create_tuple(const DDL_Record &record) {
343   const dict_col_t *col;
344   dfield_t *dfield;
345   byte *buf;
346 
347   m_tuple = dtuple_create(m_heap, m_table->get_n_cols());
348   dict_table_copy_types(m_tuple, m_table);
349   buf = static_cast<byte *>(mem_heap_alloc(m_heap, 8));
350   memset(buf, 0xFF, 8);
351 
352   col = m_table->get_sys_col(DATA_ROW_ID);
353   dfield = dtuple_get_nth_field(m_tuple, dict_col_get_no(col));
354   dfield_set_data(dfield, buf, DATA_ROW_ID_LEN);
355 
356   col = m_table->get_sys_col(DATA_ROLL_PTR);
357   dfield = dtuple_get_nth_field(m_tuple, dict_col_get_no(col));
358   dfield_set_data(dfield, buf, DATA_ROLL_PTR_LEN);
359 
360   buf = static_cast<byte *>(mem_heap_alloc(m_heap, DATA_TRX_ID_LEN));
361   mach_write_to_6(buf, m_trx->id);
362   col = m_table->get_sys_col(DATA_TRX_ID);
363   dfield = dtuple_get_nth_field(m_tuple, dict_col_get_no(col));
364   dfield_set_data(dfield, buf, DATA_TRX_ID_LEN);
365 
366   const ulint rec_id = record.get_id();
367 
368   if (rec_id != ULINT_UNDEFINED) {
369     buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_id_col_len));
370     mach_write_to_8(buf, rec_id);
371     dfield = dtuple_get_nth_field(m_tuple, s_id_col_no);
372     dfield_set_data(dfield, buf, s_id_col_len);
373   }
374 
375   if (record.get_thread_id() != ULINT_UNDEFINED) {
376     buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_thread_id_col_len));
377     mach_write_to_8(buf, record.get_thread_id());
378     dfield = dtuple_get_nth_field(m_tuple, s_thread_id_col_no);
379     dfield_set_data(dfield, buf, s_thread_id_col_len);
380   }
381 
382   ut_ad(record.get_type() >= Log_Type::SMALLEST_LOG);
383   ut_ad(record.get_type() <= Log_Type::BIGGEST_LOG);
384   buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_type_col_len));
385   mach_write_to_4(buf,
386                   static_cast<typename std::underlying_type<Log_Type>::type>(
387                       record.get_type()));
388   dfield = dtuple_get_nth_field(m_tuple, s_type_col_no);
389   dfield_set_data(dfield, buf, s_type_col_len);
390 
391   if (record.get_space_id() != SPACE_UNKNOWN) {
392     buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_space_id_col_len));
393     mach_write_to_4(buf, record.get_space_id());
394     dfield = dtuple_get_nth_field(m_tuple, s_space_id_col_no);
395     dfield_set_data(dfield, buf, s_space_id_col_len);
396   }
397 
398   if (record.get_page_no() != FIL_NULL) {
399     buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_page_no_col_len));
400     mach_write_to_4(buf, record.get_page_no());
401     dfield = dtuple_get_nth_field(m_tuple, s_page_no_col_no);
402     dfield_set_data(dfield, buf, s_page_no_col_len);
403   }
404 
405   if (record.get_index_id() != ULINT_UNDEFINED) {
406     buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_index_id_col_len));
407     mach_write_to_8(buf, record.get_index_id());
408     dfield = dtuple_get_nth_field(m_tuple, s_index_id_col_no);
409     dfield_set_data(dfield, buf, s_index_id_col_len);
410   }
411 
412   if (record.get_table_id() != ULINT_UNDEFINED) {
413     buf = static_cast<byte *>(mem_heap_alloc(m_heap, s_table_id_col_len));
414     mach_write_to_8(buf, record.get_table_id());
415     dfield = dtuple_get_nth_field(m_tuple, s_table_id_col_no);
416     dfield_set_data(dfield, buf, s_table_id_col_len);
417   }
418 
419   if (record.get_old_file_path() != nullptr) {
420     ulint m_len = strlen(record.get_old_file_path()) + 1;
421     dfield = dtuple_get_nth_field(m_tuple, s_old_file_path_col_no);
422     dfield_set_data(dfield, record.get_old_file_path(), m_len);
423   }
424 
425   if (record.get_new_file_path() != nullptr) {
426     ulint m_len = strlen(record.get_new_file_path()) + 1;
427     dfield = dtuple_get_nth_field(m_tuple, s_new_file_path_col_no);
428     dfield_set_data(dfield, record.get_new_file_path(), m_len);
429   }
430 }
431 
create_tuple(ulint id,const dict_index_t * index)432 void DDL_Log_Table::create_tuple(ulint id, const dict_index_t *index) {
433   ut_ad(id != ULINT_UNDEFINED);
434 
435   dfield_t *dfield;
436   ulint len;
437   ulint table_col_offset;
438   ulint index_col_offset;
439 
440   m_tuple = dtuple_create(m_heap, 1);
441   dict_index_copy_types(m_tuple, index, 1);
442 
443   if (index->is_clustered()) {
444     len = s_id_col_len;
445     table_col_offset = s_id_col_no;
446   } else {
447     len = s_thread_id_col_len;
448     table_col_offset = s_thread_id_col_no;
449   }
450 
451   index_col_offset = index->get_col_pos(table_col_offset);
452   byte *buf = static_cast<byte *>(mem_heap_alloc(m_heap, len));
453   mach_write_to_8(buf, id);
454   dfield = dtuple_get_nth_field(m_tuple, index_col_offset);
455   dfield_set_data(dfield, buf, len);
456 }
457 
insert(const DDL_Record & record)458 dberr_t DDL_Log_Table::insert(const DDL_Record &record) {
459   dberr_t error;
460   dict_index_t *index = m_table->first_index();
461   dtuple_t *entry;
462   uint32_t flags = BTR_NO_LOCKING_FLAG;
463   mem_heap_t *offsets_heap = mem_heap_create(1000);
464   static std::atomic<uint64_t> count(0);
465 
466   if (count++ % 64 == 0) {
467     log_free_check();
468   }
469 
470   create_tuple(record);
471   entry = row_build_index_entry(m_tuple, nullptr, index, m_heap);
472 
473 #ifdef UNIV_DEBUG
474   bool insert = true;
475   DBUG_EXECUTE_IF("ddl_log_return_error_from_insert", insert = false;);
476 
477   if (insert) {
478 #endif
479     error = row_ins_clust_index_entry_low(flags, BTR_MODIFY_LEAF, index,
480                                           index->n_uniq, entry, m_thr, false);
481 #ifdef UNIV_DEBUG
482   } else {
483     error = DB_ERROR;
484   }
485 #endif
486 
487   if (error == DB_FAIL) {
488     error = row_ins_clust_index_entry_low(flags, BTR_MODIFY_TREE, index,
489                                           index->n_uniq, entry, m_thr, false);
490     ut_ad(error == DB_SUCCESS);
491   }
492 
493   if (error != DB_SUCCESS) {
494     ib::error(ER_IB_ERR_DDL_LOG_INSERT_FAILURE);
495     mem_heap_free(offsets_heap);
496     return error;
497   }
498 
499   index = index->next();
500 
501   entry = row_build_index_entry(m_tuple, nullptr, index, m_heap);
502 
503   error =
504       row_ins_sec_index_entry_low(flags, BTR_MODIFY_LEAF, index, offsets_heap,
505                                   m_heap, entry, m_trx->id, m_thr, false);
506 
507   if (error == DB_FAIL) {
508     error =
509         row_ins_sec_index_entry_low(flags, BTR_MODIFY_TREE, index, offsets_heap,
510                                     m_heap, entry, m_trx->id, m_thr, false);
511   }
512 
513   mem_heap_free(offsets_heap);
514   ut_ad(error == DB_SUCCESS);
515   return (error);
516 }
517 
convert_to_ddl_record(bool is_clustered,rec_t * rec,const ulint * offsets,DDL_Record & record)518 void DDL_Log_Table::convert_to_ddl_record(bool is_clustered, rec_t *rec,
519                                           const ulint *offsets,
520                                           DDL_Record &record) {
521   if (is_clustered) {
522     for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
523       const byte *data;
524       ulint len;
525 
526       if (i == DATA_ROLL_PTR || i == DATA_TRX_ID) {
527         continue;
528       }
529 
530       data = rec_get_nth_field(rec, offsets, i, &len);
531 
532       if (len != UNIV_SQL_NULL) {
533         set_field(data, i, len, record);
534       }
535     }
536   } else {
537     /* For secondary index, only the ID would be stored */
538     record.set_id(parse_id(m_table->first_index()->next(), rec, offsets));
539   }
540 }
541 
/** Read the id column from an index record.
@param[in]	index	index that rec belongs to
@param[in]	rec	index record
@param[in]	offsets	field offsets of rec
@return the id, read as an 8-byte integer */
ulint DDL_Log_Table::parse_id(const dict_index_t *index, rec_t *rec,
                              const ulint *offsets) {
  const ulint id_pos = index->get_col_pos(s_id_col_no);

  ulint id_len;
  const byte *id_field = rec_get_nth_field(rec, offsets, id_pos, &id_len);

  ut_ad(id_len == s_id_col_len);

  return mach_read_from_8(id_field);
}
552 
set_field(const byte * data,ulint index_offset,ulint len,DDL_Record & record)553 void DDL_Log_Table::set_field(const byte *data, ulint index_offset, ulint len,
554                               DDL_Record &record) {
555   dict_index_t *index = dict_sys->ddl_log->first_index();
556   ulint col_offset = index->get_col_no(index_offset);
557 
558   if (col_offset == s_new_file_path_col_no) {
559     record.set_new_file_path(data, len);
560     return;
561   }
562 
563   if (col_offset == s_old_file_path_col_no) {
564     record.set_old_file_path(data, len);
565     return;
566   }
567 
568   ulint value = fetch_value(data, col_offset);
569   switch (col_offset) {
570     case s_id_col_no:
571       record.set_id(value);
572       break;
573     case s_thread_id_col_no:
574       record.set_thread_id(value);
575       break;
576     case s_type_col_no:
577       record.set_type(static_cast<Log_Type>(value));
578       break;
579     case s_space_id_col_no:
580       record.set_space_id(static_cast<space_id_t>(value));
581       break;
582     case s_page_no_col_no:
583       record.set_page_no(static_cast<page_no_t>(value));
584       break;
585     case s_index_id_col_no:
586       record.set_index_id(value);
587       break;
588     case s_table_id_col_no:
589       record.set_table_id(value);
590       break;
591     case s_old_file_path_col_no:
592     case s_new_file_path_col_no:
593     default:
594       ut_ad(0);
595   }
596 }
597 
fetch_value(const byte * data,ulint offset)598 ulint DDL_Log_Table::fetch_value(const byte *data, ulint offset) {
599   ulint value = 0;
600   switch (offset) {
601     case s_id_col_no:
602     case s_thread_id_col_no:
603     case s_index_id_col_no:
604     case s_table_id_col_no:
605       value = mach_read_from_8(data);
606       return (value);
607     case s_type_col_no:
608     case s_space_id_col_no:
609     case s_page_no_col_no:
610       value = mach_read_from_4(data);
611       return (value);
612     case s_new_file_path_col_no:
613     case s_old_file_path_col_no:
614     default:
615       ut_ad(0);
616       break;
617   }
618 
619   return (value);
620 }
621 
search_all(DDL_Records & records)622 dberr_t DDL_Log_Table::search_all(DDL_Records &records) {
623   mtr_t mtr;
624   btr_pcur_t pcur;
625   rec_t *rec;
626   bool move = true;
627   ulint *offsets;
628   dict_index_t *index = m_table->first_index();
629   dberr_t error = DB_SUCCESS;
630 
631   mtr_start(&mtr);
632 
633   /** Scan the index in decreasing order. */
634   btr_pcur_open_at_index_side(false, index, BTR_SEARCH_LEAF, &pcur, true, 0,
635                               &mtr);
636 
637   for (; move == true; move = btr_pcur_move_to_prev(&pcur, &mtr)) {
638     rec = btr_pcur_get_rec(&pcur);
639 
640     if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
641       continue;
642     }
643 
644     offsets = rec_get_offsets(rec, index, nullptr, ULINT_UNDEFINED, &m_heap);
645 
646     if (rec_get_deleted_flag(rec, dict_table_is_comp(m_table))) {
647       continue;
648     }
649 
650     DDL_Record *record = UT_NEW_NOKEY(DDL_Record());
651     convert_to_ddl_record(index->is_clustered(), rec, offsets, *record);
652     records.push_back(record);
653   }
654 
655   btr_pcur_close(&pcur);
656   mtr_commit(&mtr);
657 
658   return (error);
659 }
660 
search(ulint thread_id,DDL_Records & records)661 dberr_t DDL_Log_Table::search(ulint thread_id, DDL_Records &records) {
662   dberr_t error;
663   DDL_Records records_of_thread_id;
664 
665   error = search_by_id(thread_id, m_table->first_index()->next(),
666                        records_of_thread_id);
667   ut_ad(error == DB_SUCCESS);
668 
669   for (auto it = records_of_thread_id.rbegin();
670        it != records_of_thread_id.rend(); ++it) {
671     error = search_by_id((*it)->get_id(), m_table->first_index(), records);
672     ut_ad(error == DB_SUCCESS);
673   }
674 
675   for (auto record : records_of_thread_id) {
676     UT_DELETE(record);
677   }
678 
679   return (error);
680 }
681 
search_by_id(ulint id,dict_index_t * index,DDL_Records & records)682 dberr_t DDL_Log_Table::search_by_id(ulint id, dict_index_t *index,
683                                     DDL_Records &records) {
684   mtr_t mtr;
685   btr_pcur_t pcur;
686   rec_t *rec;
687   bool move = true;
688   ulint *offsets;
689   dberr_t error = DB_SUCCESS;
690 
691   mtr_start(&mtr);
692 
693   create_tuple(id, index);
694   btr_pcur_open_with_no_init(index, m_tuple, PAGE_CUR_GE, BTR_SEARCH_LEAF,
695                              &pcur, 0, &mtr);
696 
697   for (; move == true; move = btr_pcur_move_to_next(&pcur, &mtr)) {
698     rec = btr_pcur_get_rec(&pcur);
699 
700     if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
701       continue;
702     }
703 
704     offsets = rec_get_offsets(rec, index, nullptr, ULINT_UNDEFINED, &m_heap);
705 
706     if (cmp_dtuple_rec(m_tuple, rec, index, offsets) != 0) {
707       break;
708     }
709 
710     if (rec_get_deleted_flag(rec, dict_table_is_comp(m_table))) {
711       continue;
712     }
713 
714     DDL_Record *record = UT_NEW_NOKEY(DDL_Record());
715     convert_to_ddl_record(index->is_clustered(), rec, offsets, *record);
716     records.push_back(record);
717   }
718 
719   mtr_commit(&mtr);
720 
721   return (error);
722 }
723 
remove(ulint id)724 dberr_t DDL_Log_Table::remove(ulint id) {
725   mtr_t mtr;
726   dict_index_t *clust_index = m_table->first_index();
727   btr_pcur_t pcur;
728   ulint *offsets;
729   rec_t *rec;
730   dict_index_t *index;
731   dtuple_t *row;
732   btr_cur_t *btr_cur;
733   dtuple_t *entry;
734   dberr_t err = DB_SUCCESS;
735   enum row_search_result search_result;
736   ulint flags = BTR_NO_LOCKING_FLAG;
737   static uint64_t count = 0;
738 
739   if (count++ % 64 == 0) {
740     log_free_check();
741   }
742 
743   create_tuple(id, clust_index);
744 
745   mtr_start(&mtr);
746 
747   btr_pcur_open(clust_index, m_tuple, PAGE_CUR_LE,
748                 BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE, &pcur, &mtr);
749 
750   btr_cur = btr_pcur_get_btr_cur(&pcur);
751 
752   if (page_rec_is_infimum(btr_pcur_get_rec(&pcur)) ||
753       btr_pcur_get_low_match(&pcur) < clust_index->n_uniq) {
754     btr_pcur_close(&pcur);
755     mtr_commit(&mtr);
756     return (DB_SUCCESS);
757   }
758 
759   offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), clust_index, nullptr,
760                             ULINT_UNDEFINED, &m_heap);
761 
762   row = row_build(ROW_COPY_DATA, clust_index, btr_pcur_get_rec(&pcur), offsets,
763                   nullptr, nullptr, nullptr, nullptr, m_heap);
764 
765   rec = btr_cur_get_rec(btr_cur);
766 
767   if (!rec_get_deleted_flag(rec, dict_table_is_comp(m_table))) {
768     err = btr_cur_del_mark_set_clust_rec(flags, btr_cur_get_block(btr_cur), rec,
769                                          clust_index, offsets, m_thr, m_tuple,
770                                          &mtr);
771   }
772 
773   btr_pcur_close(&pcur);
774   mtr_commit(&mtr);
775 
776   if (err != DB_SUCCESS) {
777     return (err);
778   }
779 
780   mtr_start(&mtr);
781 
782   index = clust_index->next();
783   entry = row_build_index_entry(row, nullptr, index, m_heap);
784   search_result = row_search_index_entry(
785       index, entry, BTR_MODIFY_LEAF | BTR_DELETE_MARK, &pcur, &mtr);
786   btr_cur = btr_pcur_get_btr_cur(&pcur);
787 
788   if (search_result == ROW_NOT_FOUND) {
789     btr_pcur_close(&pcur);
790     mtr_commit(&mtr);
791     ut_ad(0);
792     return (DB_CORRUPTION);
793   }
794 
795   rec = btr_cur_get_rec(btr_cur);
796 
797   if (!rec_get_deleted_flag(rec, dict_table_is_comp(m_table))) {
798     err = btr_cur_del_mark_set_sec_rec(flags, btr_cur, TRUE, m_thr, &mtr);
799   }
800 
801   btr_pcur_close(&pcur);
802   mtr_commit(&mtr);
803 
804   return (err);
805 }
806 
remove(const DDL_Records & records)807 dberr_t DDL_Log_Table::remove(const DDL_Records &records) {
808   dberr_t ret = DB_SUCCESS;
809 
810   for (auto record : records) {
811     if (record->get_deletable()) {
812       dberr_t err = remove(record->get_id());
813 
814       ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
815       if (err != DB_SUCCESS) {
816         ret = err;
817       }
818     }
819   }
820 
821   return (ret);
822 }
823 
/** Constructor. In debug builds it asserts that the DDL log table
(dict_sys->ddl_log) exists and has an auto-increment column; next_id()
relies on that column's counter. */
Log_DDL::Log_DDL() {
  ut_ad(dict_sys->ddl_log != nullptr);
  ut_ad(dict_table_has_autoinc_col(dict_sys->ddl_log));
}
828 
next_id()829 inline uint64_t Log_DDL::next_id() {
830   uint64_t autoinc;
831 
832   dict_table_autoinc_lock(dict_sys->ddl_log);
833   autoinc = dict_table_autoinc_read(dict_sys->ddl_log);
834   ++autoinc;
835   dict_table_autoinc_update_if_greater(dict_sys->ddl_log, autoinc);
836   dict_table_autoinc_unlock(dict_sys->ddl_log);
837 
838   return (autoinc);
839 }
840 
skip(const dict_table_t * table,THD * thd)841 inline bool Log_DDL::skip(const dict_table_t *table, THD *thd) {
842   return (recv_recovery_on || thread_local_ddl_log_replay ||
843           (table != nullptr && table->is_temporary()) ||
844           thd_is_bootstrap_thread(thd));
845 }
846 
write_free_tree_log(trx_t * trx,const dict_index_t * index,bool is_drop_table)847 dberr_t Log_DDL::write_free_tree_log(trx_t *trx, const dict_index_t *index,
848                                      bool is_drop_table) {
849   ut_ad(trx == thd_to_trx(current_thd));
850 
851   if (skip(index->table, trx->mysql_thd)) {
852     return (DB_SUCCESS);
853   }
854 
855   if (index->type & DICT_FTS) {
856     ut_ad(index->page == FIL_NULL);
857     return (DB_SUCCESS);
858   }
859 
860   if (dict_index_get_online_status(index) != ONLINE_INDEX_COMPLETE) {
861     /* To skip any previously aborted index. This is because this kind
862     of index should be already freed in previous post_ddl. It's inproper
863     to log it and may free it again later, which may trigger some
864     double free page problem. */
865     return (DB_SUCCESS);
866   }
867 
868   uint64_t id = next_id();
869   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
870   dberr_t err;
871 
872   trx->ddl_operation = true;
873 
874   DBUG_INJECT_CRASH("ddl_log_crash_before_free_tree_log",
875                     crash_before_free_tree_log_counter++);
876 
877   if (is_drop_table) {
878     /* Drop index case, if committed, will be redo only */
879     err = insert_free_tree_log(trx, index, id, thread_id);
880     if (err != DB_SUCCESS) {
881       return err;
882     }
883 
884     DBUG_INJECT_CRASH("ddl_log_crash_after_free_tree_log",
885                       crash_after_free_tree_log_counter++);
886   } else {
887     /* This is the case of building index during create table
888     scenario. The index will be dropped if ddl is rolled back */
889     err = insert_free_tree_log(nullptr, index, id, thread_id);
890     if (err != DB_SUCCESS) {
891       return err;
892     }
893 
894     DBUG_INJECT_CRASH("ddl_log_crash_after_free_tree_log",
895                       crash_after_free_tree_log_counter++);
896 
897     DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_1",
898                     srv_inject_too_many_concurrent_trxs = true;);
899 
900     /* Delete this operation if the create trx is committed */
901     err = delete_by_id(trx, id, false);
902     ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
903 
904     DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_1",
905                     srv_inject_too_many_concurrent_trxs = false;);
906 
907     DBUG_INJECT_CRASH("ddl_log_crash_after_free_tree_delete",
908                       crash_after_free_tree_delete_counter++);
909   }
910 
911   return (err);
912 }
913 
insert_free_tree_log(trx_t * trx,const dict_index_t * index,uint64_t id,ulint thread_id)914 dberr_t Log_DDL::insert_free_tree_log(trx_t *trx, const dict_index_t *index,
915                                       uint64_t id, ulint thread_id) {
916   ut_ad(index->page != FIL_NULL);
917 
918   dberr_t error;
919   bool has_dd_trx = (trx != nullptr);
920   if (!has_dd_trx) {
921     trx = trx_allocate_for_background();
922     trx_start_internal(trx);
923     trx->ddl_operation = true;
924   } else {
925     trx_start_if_not_started(trx, true);
926   }
927 
928   ut_ad(trx->ddl_operation);
929 
930   DDL_Record record;
931   record.set_id(id);
932   record.set_thread_id(thread_id);
933   record.set_type(Log_Type::FREE_TREE_LOG);
934   record.set_space_id(index->space);
935   record.set_page_no(index->page);
936   record.set_index_id(index->id);
937 
938   {
939     DDL_Log_Table ddl_log(trx);
940     error = ddl_log.insert(record);
941   }
942 
943   if (!has_dd_trx) {
944     trx_commit_for_mysql(trx);
945     trx_free_for_background(trx);
946   }
947 
948   if (error == DB_SUCCESS && srv_print_ddl_logs) {
949     ib::info(ER_IB_MSG_647) << "DDL log insert : " << record;
950   }
951 
952   return (error);
953 }
954 
write_delete_space_log(trx_t * trx,const dict_table_t * table,space_id_t space_id,const char * file_path,bool is_drop,bool dict_locked)955 dberr_t Log_DDL::write_delete_space_log(trx_t *trx, const dict_table_t *table,
956                                         space_id_t space_id,
957                                         const char *file_path, bool is_drop,
958                                         bool dict_locked) {
959   ut_ad(trx == thd_to_trx(current_thd));
960   ut_ad(table == nullptr || dict_table_is_file_per_table(table));
961 
962   if (skip(table, trx->mysql_thd)) {
963     return (DB_SUCCESS);
964   }
965 
966   uint64_t id = next_id();
967   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
968   dberr_t err;
969 
970   trx->ddl_operation = true;
971 
972   DBUG_INJECT_CRASH("ddl_log_crash_before_delete_space_log",
973                     crash_before_delete_space_log_counter++);
974 
975   if (is_drop) {
976     err = insert_delete_space_log(trx, id, thread_id, space_id, file_path,
977                                   dict_locked);
978     if (err != DB_SUCCESS) {
979       return err;
980     }
981 
982     DBUG_INJECT_CRASH("ddl_log_crash_after_delete_space_log",
983                       crash_after_delete_space_log_counter++);
984   } else {
985     err = insert_delete_space_log(nullptr, id, thread_id, space_id, file_path,
986                                   dict_locked);
987     if (err != DB_SUCCESS) {
988       return err;
989     }
990 
991     DBUG_INJECT_CRASH("ddl_log_crash_after_delete_space_log",
992                       crash_after_delete_space_log_counter++);
993 
994     DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_2",
995                     srv_inject_too_many_concurrent_trxs = true;);
996 
997     err = delete_by_id(trx, id, dict_locked);
998     ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
999 
1000     DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_2",
1001                     srv_inject_too_many_concurrent_trxs = false;);
1002 
1003     DBUG_INJECT_CRASH("ddl_log_crash_after_delete_space_delete",
1004                       crash_after_delete_space_delete_counter++);
1005   }
1006 
1007   return (err);
1008 }
1009 
insert_delete_space_log(trx_t * trx,uint64_t id,ulint thread_id,space_id_t space_id,const char * file_path,bool dict_locked)1010 dberr_t Log_DDL::insert_delete_space_log(trx_t *trx, uint64_t id,
1011                                          ulint thread_id, space_id_t space_id,
1012                                          const char *file_path,
1013                                          bool dict_locked) {
1014   dberr_t error;
1015   bool has_dd_trx = (trx != nullptr);
1016 
1017   if (!has_dd_trx) {
1018     trx = trx_allocate_for_background();
1019     trx_start_internal(trx);
1020     trx->ddl_operation = true;
1021   } else {
1022     trx_start_if_not_started(trx, true);
1023   }
1024 
1025   ut_ad(trx->ddl_operation);
1026 
1027   if (dict_locked) {
1028     mutex_exit(&dict_sys->mutex);
1029   }
1030 
1031   DDL_Record record;
1032   record.set_id(id);
1033   record.set_thread_id(thread_id);
1034   record.set_type(Log_Type::DELETE_SPACE_LOG);
1035   record.set_space_id(space_id);
1036   record.set_old_file_path(file_path);
1037 
1038   {
1039     DDL_Log_Table ddl_log(trx);
1040     error = ddl_log.insert(record);
1041   }
1042 
1043   if (dict_locked) {
1044     mutex_enter(&dict_sys->mutex);
1045   }
1046 
1047   if (!has_dd_trx) {
1048     trx_commit_for_mysql(trx);
1049     trx_free_for_background(trx);
1050   }
1051 
1052   if (error == DB_SUCCESS && srv_print_ddl_logs) {
1053     ib::info(ER_IB_MSG_648) << "DDL log insert : " << record;
1054   }
1055 
1056   return (error);
1057 }
1058 
write_rename_space_log(space_id_t space_id,const char * old_file_path,const char * new_file_path)1059 dberr_t Log_DDL::write_rename_space_log(space_id_t space_id,
1060                                         const char *old_file_path,
1061                                         const char *new_file_path) {
1062   /* Missing current_thd, it happens during crash recovery */
1063   if (!current_thd) {
1064     return (DB_SUCCESS);
1065   }
1066 
1067   trx_t *trx = thd_to_trx(current_thd);
1068 
1069   /* This is special case for fil_rename_tablespace during recovery */
1070   if (trx == nullptr) {
1071     return (DB_SUCCESS);
1072   }
1073 
1074   if (skip(nullptr, trx->mysql_thd)) {
1075     return (DB_SUCCESS);
1076   }
1077 
1078   uint64_t id = next_id();
1079   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
1080 
1081   trx->ddl_operation = true;
1082 
1083   DBUG_INJECT_CRASH("ddl_log_crash_before_rename_space_log",
1084                     crash_before_rename_space_log_counter++);
1085 
1086   dberr_t err = insert_rename_space_log(id, thread_id, space_id, old_file_path,
1087                                         new_file_path);
1088   if (err != DB_SUCCESS) {
1089     return err;
1090   }
1091 
1092   DBUG_INJECT_CRASH("ddl_log_crash_after_rename_space_log",
1093                     crash_after_rename_space_log_counter++);
1094 
1095   DBUG_EXECUTE_IF("ddl_log_crash_after_rename_space_log_insert",
1096                   DBUG_SUICIDE(););
1097 
1098   DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_4",
1099                   srv_inject_too_many_concurrent_trxs = true;);
1100 
1101   err = delete_by_id(trx, id, true);
1102   ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1103 
1104   DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_4",
1105                   srv_inject_too_many_concurrent_trxs = false;);
1106 
1107   DBUG_INJECT_CRASH("ddl_log_crash_after_rename_space_delete",
1108                     crash_after_rename_space_delete_counter++);
1109 
1110   return (err);
1111 }
1112 
insert_rename_space_log(uint64_t id,ulint thread_id,space_id_t space_id,const char * old_file_path,const char * new_file_path)1113 dberr_t Log_DDL::insert_rename_space_log(uint64_t id, ulint thread_id,
1114                                          space_id_t space_id,
1115                                          const char *old_file_path,
1116                                          const char *new_file_path) {
1117   dberr_t error;
1118   trx_t *trx = trx_allocate_for_background();
1119   trx_start_internal(trx);
1120   trx->ddl_operation = true;
1121 
1122   ut_ad(mutex_own(&dict_sys->mutex));
1123   mutex_exit(&dict_sys->mutex);
1124 
1125   DDL_Record record;
1126   record.set_id(id);
1127   record.set_thread_id(thread_id);
1128   record.set_type(Log_Type::RENAME_SPACE_LOG);
1129   record.set_space_id(space_id);
1130   record.set_old_file_path(old_file_path);
1131   record.set_new_file_path(new_file_path);
1132 
1133   {
1134     DDL_Log_Table ddl_log(trx);
1135     error = ddl_log.insert(record);
1136   }
1137 
1138   mutex_enter(&dict_sys->mutex);
1139 
1140   trx_commit_for_mysql(trx);
1141   trx_free_for_background(trx);
1142 
1143   if (error == DB_SUCCESS && srv_print_ddl_logs) {
1144     ib::info(ER_IB_MSG_649) << "DDL log insert : " << record;
1145   }
1146 
1147   return (error);
1148 }
1149 
write_alter_encrypt_space_log(space_id_t space_id)1150 dberr_t Log_DDL::write_alter_encrypt_space_log(space_id_t space_id) {
1151   /* Missing current_thd, it happens during crash recovery */
1152   if (!current_thd) {
1153     return (DB_SUCCESS);
1154   }
1155 
1156   trx_t *trx = thd_to_trx(current_thd);
1157 
1158   if (skip(nullptr, trx->mysql_thd)) {
1159     return (DB_SUCCESS);
1160   }
1161 
1162   uint64_t id = next_id();
1163   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
1164 
1165   trx->ddl_operation = true;
1166 
1167   DBUG_INJECT_CRASH("ddl_log_crash_before_alter_encrypt_space_log",
1168                     crash_before_alter_encrypt_space_log_counter++);
1169 
1170   dberr_t err = insert_alter_encrypt_space_log(id, thread_id, space_id);
1171   if (err != DB_SUCCESS) {
1172     return err;
1173   }
1174 
1175   DBUG_INJECT_CRASH("ddl_log_crash_after_alter_encrypt_space_log",
1176                     crash_after_alter_encrypt_space_log_counter++);
1177 
1178   return (err);
1179 }
1180 
insert_alter_encrypt_space_log(uint64_t id,ulint thread_id,space_id_t space_id)1181 dberr_t Log_DDL::insert_alter_encrypt_space_log(uint64_t id, ulint thread_id,
1182                                                 space_id_t space_id) {
1183   dberr_t error;
1184   trx_t *trx = trx_allocate_for_background();
1185   trx_start_internal(trx);
1186   trx->ddl_operation = true;
1187 
1188   ut_ad(mutex_own(&dict_sys->mutex));
1189   mutex_exit(&dict_sys->mutex);
1190 
1191   DDL_Record record;
1192   record.set_id(id);
1193   record.set_thread_id(thread_id);
1194   record.set_type(Log_Type::ALTER_ENCRYPT_TABLESPACE_LOG);
1195   record.set_space_id(space_id);
1196 
1197   {
1198     DDL_Log_Table ddl_log(trx);
1199     error = ddl_log.insert(record);
1200   }
1201 
1202   mutex_enter(&dict_sys->mutex);
1203 
1204   trx_commit_for_mysql(trx);
1205   trx_free_for_background(trx);
1206 
1207   if (error == DB_SUCCESS && srv_print_ddl_logs) {
1208     ib::info(ER_IB_MSG_1284) << "DDL log insert : " << record;
1209   }
1210 
1211   return (error);
1212 }
1213 
write_drop_log(trx_t * trx,const table_id_t table_id)1214 dberr_t Log_DDL::write_drop_log(trx_t *trx, const table_id_t table_id) {
1215   if (skip(nullptr, trx->mysql_thd)) {
1216     return (DB_SUCCESS);
1217   }
1218 
1219   trx->ddl_operation = true;
1220 
1221   uint64_t id = next_id();
1222   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
1223 
1224   DBUG_INJECT_CRASH("ddl_log_crash_before_drop_log",
1225                     crash_before_drop_log_counter++);
1226 
1227   dberr_t err;
1228   err = insert_drop_log(trx, id, thread_id, table_id);
1229   if (err != DB_SUCCESS) {
1230     return err;
1231   }
1232 
1233   DBUG_INJECT_CRASH("ddl_log_crash_after_drop_log",
1234                     crash_after_drop_log_counter++);
1235 
1236   return (err);
1237 }
1238 
insert_drop_log(trx_t * trx,uint64_t id,ulint thread_id,const table_id_t table_id)1239 dberr_t Log_DDL::insert_drop_log(trx_t *trx, uint64_t id, ulint thread_id,
1240                                  const table_id_t table_id) {
1241   ut_ad(trx->ddl_operation);
1242   ut_ad(mutex_own(&dict_sys->mutex));
1243 
1244   trx_start_if_not_started(trx, true);
1245 
1246   mutex_exit(&dict_sys->mutex);
1247 
1248   dberr_t error;
1249   DDL_Record record;
1250   record.set_id(id);
1251   record.set_thread_id(thread_id);
1252   record.set_type(Log_Type::DROP_LOG);
1253   record.set_table_id(table_id);
1254 
1255   {
1256     DDL_Log_Table ddl_log(trx);
1257     error = ddl_log.insert(record);
1258   }
1259 
1260   mutex_enter(&dict_sys->mutex);
1261 
1262   if (error == DB_SUCCESS && srv_print_ddl_logs) {
1263     ib::info(ER_IB_MSG_650) << "DDL log insert : " << record;
1264   }
1265 
1266   return (error);
1267 }
1268 
write_rename_table_log(dict_table_t * table,const char * old_name,const char * new_name)1269 dberr_t Log_DDL::write_rename_table_log(dict_table_t *table,
1270                                         const char *old_name,
1271                                         const char *new_name) {
1272   trx_t *trx = thd_to_trx(current_thd);
1273 
1274   if (skip(table, trx->mysql_thd)) {
1275     return (DB_SUCCESS);
1276   }
1277 
1278   uint64_t id = next_id();
1279   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
1280 
1281   trx->ddl_operation = true;
1282 
1283   dberr_t err =
1284       insert_rename_table_log(id, thread_id, table->id, old_name, new_name);
1285   if (err != DB_SUCCESS) {
1286     return err;
1287   }
1288 
1289   DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_5",
1290                   srv_inject_too_many_concurrent_trxs = true;);
1291 
1292   err = delete_by_id(trx, id, true);
1293   ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1294 
1295   DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_5",
1296                   srv_inject_too_many_concurrent_trxs = false;);
1297 
1298   return (err);
1299 }
1300 
insert_rename_table_log(uint64_t id,ulint thread_id,table_id_t table_id,const char * old_name,const char * new_name)1301 dberr_t Log_DDL::insert_rename_table_log(uint64_t id, ulint thread_id,
1302                                          table_id_t table_id,
1303                                          const char *old_name,
1304                                          const char *new_name) {
1305   dberr_t error;
1306   trx_t *trx = trx_allocate_for_background();
1307   trx_start_internal(trx);
1308   trx->ddl_operation = true;
1309 
1310   ut_ad(mutex_own(&dict_sys->mutex));
1311   mutex_exit(&dict_sys->mutex);
1312 
1313   DDL_Record record;
1314   record.set_id(id);
1315   record.set_thread_id(thread_id);
1316   record.set_type(Log_Type::RENAME_TABLE_LOG);
1317   record.set_table_id(table_id);
1318   record.set_old_file_path(old_name);
1319   record.set_new_file_path(new_name);
1320 
1321   {
1322     DDL_Log_Table ddl_log(trx);
1323     error = ddl_log.insert(record);
1324   }
1325 
1326   mutex_enter(&dict_sys->mutex);
1327 
1328   trx_commit_for_mysql(trx);
1329   trx_free_for_background(trx);
1330 
1331   if (error == DB_SUCCESS && srv_print_ddl_logs) {
1332     ib::info(ER_IB_MSG_651) << "DDL log insert : " << record;
1333   }
1334 
1335   return (error);
1336 }
1337 
write_remove_cache_log(trx_t * trx,dict_table_t * table)1338 dberr_t Log_DDL::write_remove_cache_log(trx_t *trx, dict_table_t *table) {
1339   ut_ad(trx == thd_to_trx(current_thd));
1340 
1341   if (skip(table, trx->mysql_thd)) {
1342     return (DB_SUCCESS);
1343   }
1344 
1345   uint64_t id = next_id();
1346   ulint thread_id = thd_get_thread_id(trx->mysql_thd);
1347 
1348   trx->ddl_operation = true;
1349 
1350   dberr_t err =
1351       insert_remove_cache_log(id, thread_id, table->id, table->name.m_name);
1352   if (err != DB_SUCCESS) {
1353     return err;
1354   }
1355 
1356   DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_3",
1357                   srv_inject_too_many_concurrent_trxs = true;);
1358 
1359   err = delete_by_id(trx, id, false);
1360   ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1361 
1362   DBUG_EXECUTE_IF("DDL_Log_remove_inject_error_3",
1363                   srv_inject_too_many_concurrent_trxs = false;);
1364 
1365   return (err);
1366 }
1367 
insert_remove_cache_log(uint64_t id,ulint thread_id,table_id_t table_id,const char * table_name)1368 dberr_t Log_DDL::insert_remove_cache_log(uint64_t id, ulint thread_id,
1369                                          table_id_t table_id,
1370                                          const char *table_name) {
1371   dberr_t error;
1372   trx_t *trx = trx_allocate_for_background();
1373   trx_start_internal(trx);
1374   trx->ddl_operation = true;
1375 
1376   DDL_Record record;
1377   record.set_id(id);
1378   record.set_thread_id(thread_id);
1379   record.set_type(Log_Type::REMOVE_CACHE_LOG);
1380   record.set_table_id(table_id);
1381   record.set_new_file_path(table_name);
1382 
1383   {
1384     DDL_Log_Table ddl_log(trx);
1385     error = ddl_log.insert(record);
1386   }
1387 
1388   trx_commit_for_mysql(trx);
1389   trx_free_for_background(trx);
1390 
1391   if (error == DB_SUCCESS && srv_print_ddl_logs) {
1392     ib::info(ER_IB_MSG_652) << "DDL log insert : " << record;
1393   }
1394 
1395   return (error);
1396 }
1397 
delete_by_id(trx_t * trx,uint64_t id,bool dict_locked)1398 dberr_t Log_DDL::delete_by_id(trx_t *trx, uint64_t id, bool dict_locked) {
1399   dberr_t err;
1400 
1401   trx_start_if_not_started(trx, true);
1402 
1403   ut_ad(trx->ddl_operation);
1404 
1405   if (dict_locked) {
1406     mutex_exit(&dict_sys->mutex);
1407   }
1408 
1409   {
1410     DDL_Log_Table ddl_log(trx);
1411     err = ddl_log.remove(id);
1412     ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1413 
1414     if (err == DB_TOO_MANY_CONCURRENT_TRXS) {
1415       ib::error(ER_IB_MSG_DDL_LOG_DELETE_BY_ID_TMCT);
1416     }
1417   }
1418 
1419   if (dict_locked) {
1420     mutex_enter(&dict_sys->mutex);
1421   }
1422 
1423   if (srv_print_ddl_logs && err == DB_SUCCESS) {
1424     ib::info(ER_IB_MSG_DDL_LOG_DELETE_BY_ID_OK) << "DDL log delete : " << id;
1425   }
1426 
1427   return (err);
1428 }
1429 
replay_all()1430 dberr_t Log_DDL::replay_all() {
1431   ut_ad(is_in_recovery());
1432 
1433   DDL_Log_Table ddl_log;
1434   DDL_Records records;
1435 
1436   dberr_t err = ddl_log.search_all(records);
1437   ut_ad(err == DB_SUCCESS);
1438 
1439   for (auto record : records) {
1440     log_ddl->replay(*record);
1441     /* If this is alter tablespace encrypt entry, don't delete it yet.
1442     This is to handle crash during resume operation. This entry will be deleted
1443     once resume operation is finished. */
1444     if (record->get_type() == Log_Type::ALTER_ENCRYPT_TABLESPACE_LOG) {
1445       ts_encrypt_ddl_records.push_back(record);
1446       record->set_deletable(false);
1447     }
1448   }
1449 
1450   err = delete_by_ids(records);
1451   ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1452 
1453   for (auto record : records) {
1454     if (record->get_deletable()) {
1455       UT_DELETE(record);
1456     }
1457   }
1458 
1459   return (err);
1460 }
1461 
replay_by_thread_id(ulint thread_id)1462 dberr_t Log_DDL::replay_by_thread_id(ulint thread_id) {
1463   DDL_Log_Table ddl_log;
1464   DDL_Records records;
1465 
1466   dberr_t err = ddl_log.search(thread_id, records);
1467   ut_ad(err == DB_SUCCESS);
1468 
1469   for (auto record : records) {
1470     log_ddl->replay(*record);
1471   }
1472 
1473   err = delete_by_ids(records);
1474   ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1475 
1476   for (auto record : records) {
1477     if (record->get_deletable()) {
1478       UT_DELETE(record);
1479     }
1480   }
1481 
1482   return (err);
1483 }
1484 
1485 #define DELETE_IDS_RETRIES_MAX 10
1486 
delete_by_ids(DDL_Records & records)1487 dberr_t Log_DDL::delete_by_ids(DDL_Records &records) {
1488   dberr_t err = DB_SUCCESS;
1489 
1490   if (records.empty()) {
1491     return (err);
1492   }
1493 
1494   int t;
1495   for (t = DELETE_IDS_RETRIES_MAX; t > 0; t--) {
1496     trx_t *trx;
1497     trx = trx_allocate_for_background();
1498     trx_start_if_not_started(trx, true);
1499     trx->ddl_operation = true;
1500 
1501     {
1502       DDL_Log_Table ddl_log(trx);
1503       err = ddl_log.remove(records);
1504       ut_ad(err == DB_SUCCESS || err == DB_TOO_MANY_CONCURRENT_TRXS);
1505     }
1506 
1507     trx_commit_for_mysql(trx);
1508     trx_free_for_background(trx);
1509 
1510 #ifdef UNIV_DEBUG
1511     if (srv_inject_too_many_concurrent_trxs) {
1512       srv_inject_too_many_concurrent_trxs = false;
1513     }
1514 #endif /* UNIV_DEBUG */
1515 
1516     if (err != DB_TOO_MANY_CONCURRENT_TRXS) {
1517       break;
1518     }
1519   }
1520 
1521   return (err);
1522 }
1523 
replay(DDL_Record & record)1524 dberr_t Log_DDL::replay(DDL_Record &record) {
1525   dberr_t err = DB_SUCCESS;
1526 
1527   if (srv_print_ddl_logs) {
1528     ib::info(ER_IB_MSG_654) << "DDL log replay : " << record;
1529   }
1530 
1531   switch (record.get_type()) {
1532     case Log_Type::FREE_TREE_LOG:
1533       replay_free_tree_log(record.get_space_id(), record.get_page_no(),
1534                            record.get_index_id());
1535       break;
1536 
1537     case Log_Type::DELETE_SPACE_LOG:
1538       replay_delete_space_log(record.get_space_id(),
1539                               record.get_old_file_path());
1540       break;
1541 
1542     case Log_Type::RENAME_SPACE_LOG:
1543       replay_rename_space_log(record.get_space_id(), record.get_old_file_path(),
1544                               record.get_new_file_path());
1545       break;
1546 
1547     case Log_Type::DROP_LOG:
1548       replay_drop_log(record.get_table_id());
1549       break;
1550 
1551     case Log_Type::RENAME_TABLE_LOG:
1552       replay_rename_table_log(record.get_table_id(), record.get_old_file_path(),
1553                               record.get_new_file_path());
1554       break;
1555 
1556     case Log_Type::REMOVE_CACHE_LOG:
1557       replay_remove_cache_log(record.get_table_id(),
1558                               record.get_new_file_path());
1559       break;
1560 
1561     case Log_Type::ALTER_ENCRYPT_TABLESPACE_LOG:
1562       replay_alter_encrypt_space_log(record.get_space_id());
1563       break;
1564 
1565     default:
1566       ut_error;
1567   }
1568 
1569   return (err);
1570 }
1571 
/** Replay a FREE_TREE_LOG record: pass the logged page to
btr_free_if_exists() inside its own mtr, with dict_sys->mutex held. Nothing
is done if the tablespace cannot be found.
@param[in]  space_id  tablespace id from the log record
@param[in]  page_no   page number from the log record
@param[in]  index_id  index id from the log record */
void Log_DDL::replay_free_tree_log(space_id_t space_id, page_no_t page_no,
                                   ulint index_id) {
  ut_ad(space_id != SPACE_UNKNOWN);
  ut_ad(page_no != FIL_NULL);

  /* 'found' is filled in by fil_space_get_page_size(). */
  bool found;
  const page_size_t page_size(fil_space_get_page_size(space_id, &found));

  /* Skip if it is a single table tablespace and the .ibd
  file is missing */
  if (!found) {
    if (srv_print_ddl_logs) {
      ib::info(ER_IB_MSG_655)
          << "DDL log replay : FREE tablespace " << space_id << " is missing.";
    }

    return;
  }

  /* This is required by dropping hash index afterwards. */
  mutex_enter(&dict_sys->mutex);

  mtr_t mtr;
  mtr_start(&mtr);

  btr_free_if_exists(page_id_t(space_id, page_no), page_size, index_id, &mtr);

  mtr_commit(&mtr);

  mutex_exit(&dict_sys->mutex);

  /* Debug-only crash point, reached only when the tablespace was found. */
  DBUG_INJECT_CRASH("ddl_log_crash_after_replay", crash_after_replay_counter++);
}
1605 
/** Replay a DELETE_SPACE_LOG record: drop the tablespace and its file via
row_drop_tablespace(), with extra bookkeeping before and after when the id
belongs to an undo tablespace.
@param[in]  space_id   tablespace id from the log record
@param[in]  file_path  file path from the log record */
void Log_DDL::replay_delete_space_log(space_id_t space_id,
                                      const char *file_path) {
  THD *thd = current_thd;

  if (fsp_is_undo_tablespace(space_id)) {
    /* If this is called during DROP UNDO TABLESPACE, then the undo_space
    is already gone. But if this is called at startup after a crash, that
    memory object might exist. If the crash occurred just before the file
    was deleted, then at startup it was opened in srv_undo_tablespaces_open().
    Then in trx_rsegs_init(), any explicit undo tablespace that did not
    contain any undo logs was set to empty.  That prevented any new undo
    logs to be added during the startup process up till now.  So whether
    we are at runtime or startup, we assert that the undo tablespace is
    empty and delete the undo::Tablespace object if it exists. */
    undo::spaces->x_lock();
    space_id_t space_num = undo::id2num(space_id);
    undo::Tablespace *undo_space = undo::spaces->find(space_num);
    if (undo_space != nullptr) {
      ut_a(undo_space->is_empty());
      undo::spaces->drop(undo_space);
    }
    undo::spaces->x_unlock();
  }

  /* The SDI cache entry is only removed when a THD is attached to the
  current thread. */
  if (thd != nullptr) {
    /* For general tablespace, MDL on SDI tables is already
    acquired at innobase_drop_tablespace() and for file_per_table
    tablespace, MDL is acquired at row_drop_table_for_mysql() */
    mutex_enter(&dict_sys->mutex);
    dict_sdi_remove_from_cache(space_id, nullptr, true);
    mutex_exit(&dict_sys->mutex);
  }

  /* A master key rotation blocks all DDLs using backup_lock, so it is assured
  that during CREATE/DROP TABLE, master key will not change. */

  DBUG_EXECUTE_IF("ddl_log_replay_delete_space_crash_before_drop",
                  DBUG_SUICIDE(););

  /* Update filename with correct partition case, if needed. path_str owns
  the possibly updated path; file_path points into it from here on. */
  std::string path_str(file_path);
  std::string space_name;
  fil_update_partition_name(space_id, 0, false, space_name, path_str);
  file_path = path_str.c_str();

  row_drop_tablespace(space_id, file_path);

  /* If this is an undo space_id, allow the undo number for it
  to be reused. */
  if (fsp_is_undo_tablespace(space_id)) {
    undo::spaces->x_lock();
    undo::unuse_space_id(space_id);
    undo::spaces->x_unlock();
  }

  DBUG_INJECT_CRASH("ddl_log_crash_after_replay", crash_after_replay_counter++);
}
1663 
replay_rename_space_log(space_id_t space_id,const char * old_file_path,const char * new_file_path)1664 void Log_DDL::replay_rename_space_log(space_id_t space_id,
1665                                       const char *old_file_path,
1666                                       const char *new_file_path) {
1667   bool ret;
1668   page_id_t page_id(space_id, 0);
1669 
1670   std::string space_name;
1671 
1672   /* Update old filename with correct partition case, of needed. */
1673   std::string old_path(old_file_path);
1674   fil_update_partition_name(space_id, 0, false, space_name, old_path);
1675   old_file_path = old_path.c_str();
1676 
1677   /* Update new filename with correct partition case, of needed. */
1678   std::string new_path(new_file_path);
1679   fil_update_partition_name(space_id, 0, false, space_name, new_path);
1680   new_file_path = new_path.c_str();
1681 
1682   ret = fil_op_replay_rename_for_ddl(page_id, old_file_path, new_file_path);
1683 
1684   if (!ret && srv_print_ddl_logs) {
1685     ib::info(ER_IB_MSG_656) << "DDL log replay : RENAME from " << old_file_path
1686                             << " to " << new_file_path << " failed";
1687   }
1688 
1689   DBUG_INJECT_CRASH("ddl_log_crash_after_replay", crash_after_replay_counter++);
1690 }
1691 
/** Replay an ALTER_ENCRYPT_TABLESPACE_LOG record. No work is done here
apart from the debug crash injection point.
@param[in]  space_id  tablespace id from the log record (unused) */
void Log_DDL::replay_alter_encrypt_space_log(space_id_t space_id) {
  /* NOOP */
  DBUG_INJECT_CRASH("ddl_log_crash_after_replay", crash_after_replay_counter++);
}
1696 
/** Replay a DROP_LOG record: remove the table id from
dict_persist->table_buffer while holding dict_persist->mutex.
@param[in]  table_id  table id from the log record */
void Log_DDL::replay_drop_log(const table_id_t table_id) {
  mutex_enter(&dict_persist->mutex);
  /* The remove() call is made in every build. The ut_d() wrapper only
  controls whether its result is captured in 'error' for the assertion
  on the next line. */
  ut_d(dberr_t error =) dict_persist->table_buffer->remove(table_id);
  ut_ad(error == DB_SUCCESS);
  mutex_exit(&dict_persist->mutex);

  DBUG_INJECT_CRASH("ddl_log_crash_after_replay", crash_after_replay_counter++);
}
1705 
/** Replay a RENAME_TABLE_LOG record by renaming the table from old_name to
new_name with row_rename_table_for_mysql(), in a background transaction and
under the data dictionary lock. Skipped entirely during recovery.
@param[in]  table_id  table id from the log record (not used in the body)
@param[in]  old_name  old table name from the log record
@param[in]  new_name  new table name from the log record */
void Log_DDL::replay_rename_table_log(table_id_t table_id, const char *old_name,
                                      const char *new_name) {
  if (is_in_recovery()) {
    if (srv_print_ddl_logs) {
      ib::info(ER_IB_MSG_657) << "DDL log replay : in recovery,"
                              << " skip RENAME TABLE";
    }

    return;
  }

  /* The rename runs in its own background transaction, tied to the
  current THD. */
  trx_t *trx;
  trx = trx_allocate_for_background();
  trx->mysql_thd = current_thd;
  trx_start_if_not_started(trx, true);

  row_mysql_lock_data_dictionary(trx);
  trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);

  /* Convert partition table name DDL log, if needed. Required if
  upgrading a crashed database. */
  std::string old_table(old_name);
  dict_name::rebuild(old_table);
  old_name = old_table.c_str();

  std::string new_table(new_name);
  dict_name::rebuild(new_table);
  new_name = new_table.c_str();

  dberr_t err;
  err = row_rename_table_for_mysql(old_name, new_name, nullptr, trx, true);

  /* If a table is found in memory under the new name, release and close
  it again before the dictionary lock is dropped. */
  dict_table_t *table;
  table = dd_table_open_on_name_in_mem(new_name, true);
  if (table != nullptr) {
    dict_table_ddl_release(table);
    dd_table_close(table, nullptr, nullptr, true);
  }

  row_mysql_unlock_data_dictionary(trx);

  trx_commit_for_mysql(trx);
  trx_free_for_background(trx);

  if (err != DB_SUCCESS) {
    /* NOTE(review): this branch is the failure path, yet the message does
    not say that the rename failed - confirm whether that is intended. */
    if (srv_print_ddl_logs) {
      ib::info(ER_IB_MSG_658)
          << "DDL log replay : rename table"
          << " in cache from " << old_name << " to " << new_name;
    }
  } else {
    /* TODO: Once we get rid of dict_operation_lock,
    we may consider to do this in row_rename_table_for_mysql,
    so no need to worry this rename here */
    /* errstr is filled by dict_stats_rename_table() and is not examined
    here, nor is the call's return value. */
    char errstr[512];

    dict_stats_rename_table(old_name, new_name, errstr, sizeof(errstr));
  }
}
1765 
/** Replay a REMOVE_CACHE_LOG record: if the table with the given id is
found in memory, close it, drop its AHI entries and remove it from the
dictionary cache. Skipped entirely during recovery.
@param[in]  table_id    table id from the log record
@param[in]  table_name  table name from the log record; only used by a
                        debug assertion against the name of the opened table */
void Log_DDL::replay_remove_cache_log(table_id_t table_id,
                                      const char *table_name) {
  if (is_in_recovery()) {
    if (srv_print_ddl_logs) {
      ib::info(ER_IB_MSG_659) << "DDL log replay : in recovery,"
                              << " skip REMOVE CACHE";
    }

    return;
  }

  dict_table_t *table;

  table = dd_table_open_on_id_in_mem(table_id, false);

  /* Convert partition table name DDL log, if needed. Required if
  upgrading a crashed database. */
  std::string table_str(table_name);
  dict_name::rebuild(table_str);
  table_name = table_str.c_str();

  if (table != nullptr) {
    ut_ad(strcmp(table->name.m_name, table_name) == 0);

    /* The close, the AHI drop and the cache removal all happen under
    dict_sys->mutex. */
    mutex_enter(&dict_sys->mutex);
    dd_table_close(table, nullptr, nullptr, true);
    btr_drop_ahi_for_table(table);
    dict_table_remove_from_cache(table);
    mutex_exit(&dict_sys->mutex);
  }
}
1797 
post_ddl(THD * thd)1798 dberr_t Log_DDL::post_ddl(THD *thd) {
1799   if (skip(nullptr, thd)) {
1800     return (DB_SUCCESS);
1801   }
1802 
1803   if (srv_read_only_mode || srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN) {
1804     return (DB_SUCCESS);
1805   }
1806 
1807   DEBUG_SYNC(thd, "innodb_ddl_log_before_enter");
1808 
1809   DBUG_EXECUTE_IF("ddl_log_before_post_ddl", DBUG_SUICIDE(););
1810 
1811   /* If srv_force_recovery > 0, DROP TABLE is allowed, and here only
1812   DELETE and DROP log can be replayed. */
1813 
1814   ulint thread_id = thd_get_thread_id(thd);
1815 
1816   if (srv_print_ddl_logs) {
1817     ib::info(ER_IB_MSG_660)
1818         << "DDL log post ddl : begin for thread id : " << thread_id;
1819   }
1820 
1821   thread_local_ddl_log_replay = true;
1822 
1823   dberr_t err = replay_by_thread_id(thread_id);
1824 
1825   thread_local_ddl_log_replay = false;
1826 
1827   if (srv_print_ddl_logs) {
1828     ib::info(ER_IB_MSG_661)
1829         << "DDL log post ddl : end for thread id : " << thread_id;
1830   }
1831 
1832   return (err);
1833 }
1834 
recover()1835 dberr_t Log_DDL::recover() {
1836   if (srv_read_only_mode || srv_force_recovery > 0) {
1837     return (DB_SUCCESS);
1838   }
1839 
1840   ib::info(ER_IB_MSG_662) << "DDL log recovery : begin";
1841 
1842   thread_local_ddl_log_replay = true;
1843   s_in_recovery = true;
1844 
1845   dberr_t err = replay_all();
1846 
1847   thread_local_ddl_log_replay = false;
1848   s_in_recovery = false;
1849 
1850   ib::info(ER_IB_MSG_663) << "DDL log recovery : end";
1851 
1852   return (err);
1853 }
1854 
post_ts_encryption(DDL_Records & records)1855 dberr_t Log_DDL::post_ts_encryption(DDL_Records &records) {
1856   for (auto record : records) {
1857     record->set_deletable(true);
1858   }
1859 
1860   dberr_t err = Log_DDL::delete_by_ids(records);
1861 
1862   for (auto record : records) {
1863     UT_DELETE(record);
1864   }
1865 
1866   return (err);
1867 }
1868