1 /*****************************************************************************
2
3 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 /** @file clone/clone0api.cc
28 Innodb Clone Interface
29
30 *******************************************************/
31 #include <cstdio>
32 #include <fstream>
33 #include <iostream>
34
35 #include "clone0api.h"
36 #include "clone0clone.h"
37 #include "os0thread-create.h"
38
39 #include "sql/clone_handler.h"
40 #include "sql/mysqld.h"
41 #include "sql/sql_class.h"
42 #include "sql/sql_prepare.h"
43 #include "sql/sql_table.h"
44 #include "sql/sql_thd_internal_api.h"
45 #include "sql/strfunc.h"
46
47 #include "dict0dd.h"
48 #include "sql/dd/cache/dictionary_client.h"
49 #include "sql/dd/dictionary.h"
50 #include "sql/dd/impl/dictionary_impl.h" // dd::dd_tablespace_id()
51 #include "sql/dd/impl/sdi.h"
52 #include "sql/dd/impl/utils.h"
53 #include "sql/dd/types/schema.h"
54 #include "sql/dd/types/table.h"
55 #include "sql/rpl_msr.h" // is_slave_configured()
56
/** Check if clone status file exists. The check is done by trying to open
the file for reading.
@param[in]  file_name  file name
@return true if the file could be opened. */
static bool file_exists(const std::string &file_name) {
  /* The stream closes the file when it goes out of scope. */
  std::ifstream file(file_name.c_str());

  return (file.is_open());
}
69
70 /** Rename clone status file. The operation is expected to be atomic
71 when the files belong to same directory.
72 @param[in] from_file name of current file
73 @param[in] to_file name of new file */
rename_file(std::string & from_file,std::string & to_file)74 static void rename_file(std::string &from_file, std::string &to_file) {
75 auto ret = std::rename(from_file.c_str(), to_file.c_str());
76
77 if (ret != 0) {
78 ib::fatal(ER_IB_CLONE_STATUS_FILE)
79 << "Error renaming file from: " << from_file.c_str()
80 << " to: " << to_file.c_str();
81 }
82 }
83
84 /** Create clone status file.
85 @param[in] file_name file name */
create_file(std::string & file_name)86 static void create_file(std::string &file_name) {
87 std::ofstream file(file_name.c_str());
88
89 if (file.is_open()) {
90 file.close();
91 return;
92 }
93 ib::error(ER_IB_CLONE_STATUS_FILE)
94 << "Error creating file : " << file_name.c_str();
95 }
96
97 /** Delete clone status file.
98 @param[in] file name of file */
remove_file(std::string & file)99 static void remove_file(std::string &file) {
100 auto ret = std::remove(file.c_str());
101
102 if (ret != 0) {
103 ib::error(ER_IB_CLONE_STATUS_FILE)
104 << "Error removing file : " << file.c_str();
105 }
106 }
107
108 /** Create clone in progress file and error file.
109 @param[in] clone clone handle */
create_status_file(const Clone_Handle * clone)110 static void create_status_file(const Clone_Handle *clone) {
111 const char *path = clone->get_datadir();
112 std::string file_name;
113
114 if (clone->replace_datadir()) {
115 /* Create error file for rollback. */
116 file_name.assign(CLONE_INNODB_ERROR_FILE);
117 create_file(file_name);
118 return;
119 }
120
121 file_name.assign(path);
122 /* Add path separator if needed. */
123 if (file_name.back() != OS_PATH_SEPARATOR) {
124 file_name.append(OS_PATH_SEPARATOR_STR);
125 }
126 file_name.append(CLONE_INNODB_IN_PROGRESS_FILE);
127
128 create_file(file_name);
129 }
130
131 /** Drop clone in progress file and error file.
132 @param[in] clone clone handle */
drop_status_file(const Clone_Handle * clone)133 static void drop_status_file(const Clone_Handle *clone) {
134 const char *path = clone->get_datadir();
135 std::string file_name;
136
137 if (clone->replace_datadir()) {
138 /* Indicate that clone needs table fix up on recovery. */
139 file_name.assign(CLONE_INNODB_FIXUP_FILE);
140 create_file(file_name);
141
142 /* drop error file on success. */
143 file_name.assign(CLONE_INNODB_ERROR_FILE);
144 remove_file(file_name);
145
146 DBUG_EXECUTE_IF("clone_recovery_crash_point", {
147 file_name.assign(CLONE_INNODB_RECOVERY_CRASH_POINT);
148 create_file(file_name);
149 });
150 return;
151 }
152
153 std::string path_name(path);
154 /* Add path separator if needed. */
155 if (path_name.back() != OS_PATH_SEPARATOR) {
156 path_name.append(OS_PATH_SEPARATOR_STR);
157 }
158
159 /* Indicate that clone needs table fix up on recovery. */
160 file_name.assign(path_name);
161 file_name.append(CLONE_INNODB_FIXUP_FILE);
162 create_file(file_name);
163
164 /* Indicate clone needs to update recovery status. */
165 file_name.assign(path_name);
166 file_name.append(CLONE_INNODB_REPLACED_FILES);
167 create_file(file_name);
168
169 /* Mark successful clone operation. */
170 file_name.assign(path_name);
171 file_name.append(CLONE_INNODB_IN_PROGRESS_FILE);
172 remove_file(file_name);
173 }
174
clone_init_list_files()175 void clone_init_list_files() {
176 /* Remove any existing list files. */
177 std::string new_files(CLONE_INNODB_NEW_FILES);
178 if (file_exists(new_files)) {
179 remove_file(new_files);
180 }
181 std::string old_files(CLONE_INNODB_OLD_FILES);
182 if (file_exists(old_files)) {
183 remove_file(old_files);
184 }
185 std::string replaced_files(CLONE_INNODB_REPLACED_FILES);
186 if (file_exists(replaced_files)) {
187 remove_file(replaced_files);
188 }
189 std::string recovery_file(CLONE_INNODB_RECOVERY_FILE);
190 if (file_exists(recovery_file)) {
191 remove_file(recovery_file);
192 }
193 }
194
clone_add_to_list_file(const char * list_file_name,const char * file_name)195 int clone_add_to_list_file(const char *list_file_name, const char *file_name) {
196 std::ofstream list_file;
197 list_file.open(list_file_name, std::ofstream::app);
198
199 if (list_file.is_open()) {
200 list_file << file_name << std::endl;
201
202 if (list_file.good()) {
203 list_file.close();
204 return (0);
205 }
206 list_file.close();
207 }
208 /* This is an error case. Either open or write call failed. */
209 char errbuf[MYSYS_STRERROR_SIZE];
210 my_error(ER_ERROR_ON_WRITE, MYF(0), list_file_name, errno,
211 my_strerror(errbuf, sizeof(errbuf), errno));
212 return (ER_ERROR_ON_WRITE);
213 }
214
215 /** Add all existing redo files to old file list. */
track_redo_files()216 static void track_redo_files() {
217 std::string log_file;
218 for (uint32_t index = 0; index < srv_n_log_files; ++index) {
219 /* Build redo log file name. */
220 char file_name[MAX_LOG_FILE_NAME + 1];
221 snprintf(file_name, MAX_LOG_FILE_NAME, "%s%u", ib_logfile_basename, index);
222
223 log_file.assign(srv_log_group_home_dir);
224 if (!log_file.empty() && log_file.back() != OS_PATH_SEPARATOR) {
225 log_file.append(OS_PATH_SEPARATOR_STR);
226 }
227 log_file.append(file_name);
228 clone_add_to_list_file(CLONE_INNODB_OLD_FILES, log_file.c_str());
229 }
230 }
231
232 /** Execute sql statement.
233 @param[in,out] thd current THD
234 @param[in] sql_stmt SQL statement
235 @param[in] thread_number executing thread number
236 @param[in] skip_error skip statement on error
237 @return false, if successful. */
238 static bool clone_execute_query(THD *thd, const char *sql_stmt,
239 size_t thread_number, bool skip_error);
240
241 /** Delete all binary logs before clone.
242 @param[in] thd current THD
243 @return error code */
244 static int clone_drop_binary_logs(THD *thd);
245
246 /** Drop all user data before starting clone.
247 @param[in,out] thd current THD
248 @param[in] allow_threads allow multiple threads
249 @return error code */
250 static int clone_drop_user_data(THD *thd, bool allow_threads);
251
252 /** Set security context to skip privilege check.
253 @param[in,out] thd session THD
254 @param[in,out] sctx security context */
skip_grants(THD * thd,Security_context & sctx)255 static void skip_grants(THD *thd, Security_context &sctx) {
256 /* Take care of the possible side effect of skipping grant i.e.
257 setting SYSTEM_USER privilege flag. */
258 bool saved_flag = thd->is_system_user();
259 sctx.skip_grants();
260 ut_ad(thd->is_system_user() == saved_flag);
261 thd->set_system_user(saved_flag);
262 }
263
innodb_clone_get_capability(Ha_clone_flagset & flags)264 void innodb_clone_get_capability(Ha_clone_flagset &flags) {
265 flags.reset();
266
267 flags.set(HA_CLONE_HYBRID);
268 flags.set(HA_CLONE_MULTI_TASK);
269 flags.set(HA_CLONE_RESTART);
270 }
271
innodb_clone_begin(handlerton * hton,THD * thd,const byte * & loc,uint & loc_len,uint & task_id,Ha_clone_type type,Ha_clone_mode mode)272 int innodb_clone_begin(handlerton *hton, THD *thd, const byte *&loc,
273 uint &loc_len, uint &task_id, Ha_clone_type type,
274 Ha_clone_mode mode) {
275 /* Check if reference locator is valid */
276 if (loc != nullptr && !clone_validate_locator(loc, loc_len)) {
277 int err = ER_CLONE_PROTOCOL;
278 my_error(err, MYF(0), "Wrong Clone RPC: Invalid Locator");
279 return (err);
280 }
281
282 /* Acquire clone system mutex which would automatically get released
283 when we return from the function [RAII]. */
284 IB_mutex_guard sys_mutex(clone_sys->get_mutex());
285
286 /* Check if concurrent ddl has marked abort. */
287 if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
288 if (thd != nullptr) {
289 my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
290 }
291
292 return (ER_CLONE_DDL_IN_PROGRESS);
293 }
294
295 if (!mtr_t::s_logging.is_enabled()) {
296 if (thd != nullptr) {
297 my_error(ER_INNODB_REDO_DISABLED, MYF(0));
298 }
299 return (ER_INNODB_REDO_DISABLED);
300 }
301
302 /* Check of clone is already in progress for the reference locator. */
303 auto clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_COPY);
304
305 int err = 0;
306
307 switch (mode) {
308 case HA_CLONE_MODE_RESTART:
309 /* Error out if existing clone is not found */
310 if (clone_hdl == nullptr) {
311 my_error(ER_INTERNAL_ERROR, MYF(0),
312 "Innodb Clone Restart could not find existing clone");
313 return (ER_INTERNAL_ERROR);
314 }
315
316 ib::info(ER_IB_CLONE_START_STOP) << "Clone Begin Master Task: Restart";
317 err = clone_hdl->restart_copy(thd, loc, loc_len);
318
319 break;
320
321 case HA_CLONE_MODE_START: {
322 /* Should not find existing clone for the locator */
323 if (clone_hdl != nullptr) {
324 clone_sys->drop_clone(clone_hdl);
325 my_error(ER_INTERNAL_ERROR, MYF(0),
326 "Innodb Clone Begin refers existing clone");
327 return (ER_INTERNAL_ERROR);
328 }
329 LEX_CSTRING sctx_user = thd->m_main_security_ctx.user();
330 LEX_CSTRING sctx_host = thd->m_main_security_ctx.host_or_ip();
331
332 /* Should not become a donor when provisioning is started. */
333 if (Clone_handler::is_provisioning()) {
334 if (0 == strcmp(my_localhost, sctx_host.str)) {
335 my_error(ER_CLONE_LOOPBACK, MYF(0));
336 return (ER_CLONE_LOOPBACK);
337 }
338 my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_CLONES);
339 return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
340 }
341
342 /* Log user and host beginning clone operation. */
343 ib::info(ER_IB_CLONE_START_STOP) << "Clone Begin Master Task by "
344 << sctx_user.str << "@" << sctx_host.str;
345 break;
346 }
347
348 case HA_CLONE_MODE_ADD_TASK:
349 /* Should find existing clone for the locator */
350 if (clone_hdl == nullptr) {
351 /* Operation has finished already */
352 my_error(ER_INTERNAL_ERROR, MYF(0),
353 "Innodb Clone add task refers non-existing clone");
354
355 return (ER_INTERNAL_ERROR);
356 }
357 break;
358
359 case HA_CLONE_MODE_VERSION:
360 case HA_CLONE_MODE_MAX:
361 default:
362 ut_ad(false);
363 my_error(ER_INTERNAL_ERROR, MYF(0), "Innodb Clone Begin Invalid Mode");
364
365 return (ER_INTERNAL_ERROR);
366 }
367
368 if (clone_hdl == nullptr) {
369 ut_ad(thd != nullptr);
370 ut_ad(mode == HA_CLONE_MODE_START);
371
372 /* Create new clone handle for copy. Reference locator
373 is used for matching the version. */
374 auto err = clone_sys->add_clone(loc, CLONE_HDL_COPY, clone_hdl);
375 if (err != 0) {
376 return (err);
377 }
378
379 err = clone_hdl->init(loc, loc_len, type, nullptr);
380
381 /* Check and wait if clone is marked for wait. */
382 if (err == 0) {
383 err = clone_sys->wait_for_free(thd);
384 }
385
386 if (err != 0) {
387 clone_sys->drop_clone(clone_hdl);
388 return (err);
389 }
390 }
391
392 /* Add new task for the clone copy operation. */
393 if (err == 0) {
394 /* Release clone system mutex here as we might need to wait while
395 adding task. It is safe as the clone handle is acquired and cannot
396 be freed till we release it. */
397 mutex_exit(clone_sys->get_mutex());
398 err = clone_hdl->add_task(thd, nullptr, 0, task_id);
399 mutex_enter(clone_sys->get_mutex());
400 }
401
402 if (err != 0) {
403 clone_sys->drop_clone(clone_hdl);
404 return (err);
405 }
406
407 if (task_id > 0) {
408 ib::info(ER_IB_CLONE_START_STOP) << "Clone Begin Task ID: " << task_id;
409 }
410
411 /* Get the current locator from clone handle. */
412 loc = clone_hdl->get_locator(loc_len);
413 return (0);
414 }
415
innodb_clone_copy(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,Ha_clone_cbk * cbk)416 int innodb_clone_copy(handlerton *hton, THD *thd, const byte *loc, uint loc_len,
417 uint task_id, Ha_clone_cbk *cbk) {
418 cbk->set_hton(hton);
419
420 /* Get clone handle by locator index. */
421 auto clone_hdl = clone_sys->get_clone_by_index(loc, loc_len);
422
423 auto err = clone_hdl->check_error(thd);
424 if (err != 0) {
425 return (err);
426 }
427
428 /* Start data copy. */
429 err = clone_hdl->copy(thd, task_id, cbk);
430 clone_hdl->save_error(err);
431
432 return (err);
433 }
434
innodb_clone_ack(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err,Ha_clone_cbk * cbk)435 int innodb_clone_ack(handlerton *hton, THD *thd, const byte *loc, uint loc_len,
436 uint task_id, int in_err, Ha_clone_cbk *cbk) {
437 cbk->set_hton(hton);
438
439 /* Check if reference locator is valid */
440 if (loc != nullptr && !clone_validate_locator(loc, loc_len)) {
441 int err = ER_CLONE_PROTOCOL;
442 my_error(err, MYF(0), "Wrong Clone RPC: Invalid Locator");
443 return (err);
444 }
445 mutex_enter(clone_sys->get_mutex());
446
447 /* Find attach clone handle using the reference locator. */
448 auto clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_COPY);
449
450 mutex_exit(clone_sys->get_mutex());
451
452 /* Must find existing clone for the locator */
453 if (clone_hdl == nullptr) {
454 my_error(ER_INTERNAL_ERROR, MYF(0),
455
456 "Innodb Clone ACK refers non-existing clone");
457 return (ER_INTERNAL_ERROR);
458 }
459
460 int err = 0;
461
462 /* If thread is interrupted, then set interrupt error instead. */
463 if (thd_killed(thd)) {
464 my_error(ER_QUERY_INTERRUPTED, MYF(0));
465 in_err = ER_QUERY_INTERRUPTED;
466 }
467
468 if (in_err == 0) {
469 /* Apply acknowledged data */
470 err = clone_hdl->apply(thd, task_id, cbk);
471
472 clone_hdl->save_error(err);
473 } else {
474 /* For error input, return after saving it */
475 ib::info(ER_IB_CLONE_OPERATION) << "Clone set error ACK: " << in_err;
476 clone_hdl->save_error(in_err);
477 }
478
479 mutex_enter(clone_sys->get_mutex());
480
481 /* Detach from clone handle */
482 clone_sys->drop_clone(clone_hdl);
483
484 mutex_exit(clone_sys->get_mutex());
485
486 return (err);
487 }
488
innodb_clone_end(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err)489 int innodb_clone_end(handlerton *hton, THD *thd, const byte *loc, uint loc_len,
490 uint task_id, int in_err) {
491 /* Acquire clone system mutex which would automatically get released
492 when we return from the function [RAII]. */
493 IB_mutex_guard sys_mutex(clone_sys->get_mutex());
494
495 /* Get clone handle by locator index. */
496 auto clone_hdl = clone_sys->get_clone_by_index(loc, loc_len);
497
498 /* If thread is interrupted, then set interrupt error instead. */
499 if (thd_killed(thd)) {
500 my_error(ER_QUERY_INTERRUPTED, MYF(0));
501 in_err = ER_QUERY_INTERRUPTED;
502 }
503 /* Set error, if already not set */
504 clone_hdl->save_error(in_err);
505
506 /* Drop current task. */
507 bool is_master = false;
508 auto wait_reconnect = clone_hdl->drop_task(thd, task_id, in_err, is_master);
509 auto is_copy = clone_hdl->is_copy_clone();
510 auto is_init = clone_hdl->is_init();
511 auto is_abort = clone_hdl->is_abort();
512
513 if (!wait_reconnect || is_abort) {
514 if (is_copy && is_master) {
515 if (is_abort) {
516 ib::info(ER_IB_CLONE_RESTART)
517 << "Clone Master aborted by concurrent clone";
518
519 } else if (in_err != 0) {
520 /* Make sure re-start attempt fails immediately */
521 clone_hdl->set_state(CLONE_STATE_ABORT);
522 }
523 }
524
525 if (!is_copy && !is_init && is_master) {
526 if (in_err == 0) {
527 /* On success for apply handle, drop status file. */
528 drop_status_file(clone_hdl);
529 } else if (clone_hdl->replace_datadir()) {
530 /* On failure, rollback if replacing current data directory. */
531 clone_files_error();
532 }
533 }
534 clone_sys->drop_clone(clone_hdl);
535
536 auto da = thd->get_stmt_da();
537 ib::info(ER_IB_CLONE_START_STOP)
538 << "Clone"
539 << (is_copy ? " End" : (is_init ? " Apply Version End" : " Apply End"))
540 << (is_master ? " Master" : "") << " Task ID: " << task_id
541 << (in_err != 0 ? " Failed, code: " : " Passed, code: ") << in_err
542 << ": "
543 << ((in_err == 0 || da == nullptr || !da->is_error())
544 ? ""
545 : da->message_text());
546 return (0);
547 }
548
549 auto da = thd->get_stmt_da();
550 ib::info(ER_IB_CLONE_RESTART)
551 << "Clone Master wait for restart"
552 << " after n/w error code: " << in_err << ": "
553 << ((da == nullptr || !da->is_error()) ? "" : da->message_text());
554
555 ut_ad(clone_hdl->is_copy_clone());
556 ut_ad(is_master);
557
558 /* Set state to idle and wait for re-connect */
559 clone_hdl->set_state(CLONE_STATE_IDLE);
560 /* Sleep for 1 second */
561 Clone_Msec sleep_time(Clone_Sec(1));
562 /* Generate alert message every minute. */
563 Clone_Sec alert_interval(Clone_Min(1));
564 /* Wait for 5 minutes for client to reconnect back */
565 Clone_Sec time_out(Clone_Min(5));
566
567 bool is_timeout = false;
568 auto err = Clone_Sys::wait(
569 sleep_time, time_out, alert_interval,
570 [&](bool alert, bool &result) {
571 ut_ad(mutex_own(clone_sys->get_mutex()));
572 result = !clone_hdl->is_active();
573
574 if (thd_killed(thd) || clone_hdl->is_interrupted()) {
575 ib::info(ER_IB_CLONE_RESTART)
576 << "Clone End Master wait for Restart interrupted";
577 my_error(ER_QUERY_INTERRUPTED, MYF(0));
578 return (ER_QUERY_INTERRUPTED);
579
580 } else if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
581 ib::info(ER_IB_CLONE_RESTART)
582 << "Clone End Master wait for Restart aborted by DDL";
583 my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
584 return (ER_CLONE_DDL_IN_PROGRESS);
585
586 } else if (clone_hdl->is_abort()) {
587 result = false;
588 ib::info(ER_IB_CLONE_RESTART) << "Clone End Master wait for Restart"
589 " aborted by concurrent clone";
590 return (0);
591 }
592
593 if (!result) {
594 ib::info(ER_IB_CLONE_RESTART)
595 << "Clone Master restarted successfully by "
596 "other task after n/w failure";
597
598 } else if (alert) {
599 ib::info(ER_IB_CLONE_RESTART)
600 << "Clone Master still waiting for restart";
601 }
602 return (0);
603 },
604 clone_sys->get_mutex(), is_timeout);
605
606 if (err == 0 && is_timeout && clone_hdl->is_idle()) {
607 ib::info(ER_IB_CLONE_TIMEOUT) << "Clone End Master wait "
608 "for restart timed out after "
609 "5 Minutes. Dropping Snapshot";
610 }
611 /* Last task should drop the clone handle. */
612 clone_sys->drop_clone(clone_hdl);
613 return (0);
614 }
615
innodb_clone_apply_begin(handlerton * hton,THD * thd,const byte * & loc,uint & loc_len,uint & task_id,Ha_clone_mode mode,const char * data_dir)616 int innodb_clone_apply_begin(handlerton *hton, THD *thd, const byte *&loc,
617 uint &loc_len, uint &task_id, Ha_clone_mode mode,
618 const char *data_dir) {
619 /* Check if reference locator is valid */
620 if (loc != nullptr && !clone_validate_locator(loc, loc_len)) {
621 int err = ER_CLONE_PROTOCOL;
622 my_error(err, MYF(0), "Wrong Clone RPC: Invalid Locator");
623 return (err);
624 }
625
626 /* Acquire clone system mutex which would automatically get released
627 when we return from the function [RAII]. */
628 IB_mutex_guard sys_mutex(clone_sys->get_mutex());
629
630 /* Check if clone is already in progress for the reference locator. */
631 auto clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_APPLY);
632
633 switch (mode) {
634 case HA_CLONE_MODE_RESTART: {
635 ib::info(ER_IB_CLONE_RESTART) << "Clone Apply Begin Master Task: Restart";
636 auto err = clone_hdl->restart_apply(thd, loc, loc_len);
637
638 /* Reduce reference count */
639 clone_sys->drop_clone(clone_hdl);
640
641 /* Restart is done by master task */
642 ut_ad(task_id == 0);
643 task_id = 0;
644
645 return (err);
646 }
647 case HA_CLONE_MODE_START:
648
649 if (clone_hdl != nullptr) {
650 ut_ad(false);
651 clone_sys->drop_clone(clone_hdl);
652 ib::error(ER_IB_CLONE_INTERNAL)
653 << "Clone Apply Begin Master found duplicate clone";
654 clone_hdl = nullptr;
655 }
656
657 /* Check if the locator is from current mysqld server. */
658 clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_COPY);
659
660 if (clone_hdl != nullptr) {
661 clone_sys->drop_clone(clone_hdl);
662 clone_hdl = nullptr;
663 ib::info(ER_IB_CLONE_START_STOP) << "Clone Apply Master Loop Back";
664 ut_ad(data_dir != nullptr);
665 }
666 ib::info(ER_IB_CLONE_START_STOP) << "Clone Apply Begin Master Task";
667 break;
668
669 case HA_CLONE_MODE_ADD_TASK:
670 /* Should find existing clone for the locator */
671 if (clone_hdl == nullptr) {
672 /* Operation has finished already */
673 my_error(ER_INTERNAL_ERROR, MYF(0),
674 "Innodb Clone Apply add task to non-existing clone");
675
676 return (ER_INTERNAL_ERROR);
677 }
678 break;
679
680 case HA_CLONE_MODE_VERSION:
681 /* Cannot have input locator or existing clone */
682 ib::info(ER_IB_CLONE_START_STOP)
683 << "Clone Apply Begin Master Version Check";
684 ut_ad(loc == nullptr);
685 ut_ad(clone_hdl == nullptr);
686 break;
687
688 case HA_CLONE_MODE_MAX:
689 default:
690 ut_ad(false);
691
692 my_error(ER_INTERNAL_ERROR, MYF(0),
693 "Innodb Clone Appply Begin Invalid Mode");
694
695 return (ER_INTERNAL_ERROR);
696 }
697
698 if (clone_hdl == nullptr) {
699 ut_ad(thd != nullptr);
700
701 ut_ad(mode == HA_CLONE_MODE_VERSION || mode == HA_CLONE_MODE_START);
702
703 /* Create new clone handle for apply. Reference locator
704 is used for matching the version. */
705 auto err = clone_sys->add_clone(loc, CLONE_HDL_APPLY, clone_hdl);
706 if (err != 0) {
707 return (err);
708 }
709
710 err = clone_hdl->init(loc, loc_len, HA_CLONE_BLOCKING, data_dir);
711
712 if (err != 0) {
713 clone_sys->drop_clone(clone_hdl);
714 return (err);
715 }
716 }
717
718 if (clone_hdl->is_active()) {
719 /* Release clone system mutex here as we might need to wait while
720 adding task. It is safe as the clone handle is acquired and cannot
721 be freed till we release it. */
722 mutex_exit(clone_sys->get_mutex());
723
724 /* Create status file to indicate active clone directory. */
725 if (mode == HA_CLONE_MODE_START) {
726 create_status_file(clone_hdl);
727 }
728
729 int err = 0;
730 /* Drop any user data after acquiring backup lock. Don't allow
731 concurrent threads as the BACKUP MDL lock would not allow any
732 other threads to execute DDL. */
733 if (clone_hdl->replace_datadir() && mode == HA_CLONE_MODE_START) {
734 /* Safeguard to throw error if innodb read only mode is on. Currently
735 not reachable as we would get error much earlier while dropping user
736 tables. */
737 if (srv_read_only_mode) {
738 ut_ad(false);
739 err = ER_INTERNAL_ERROR;
740 my_error(err, MYF(0),
741 "Clone cannot replace data with innodb_read_only = ON");
742 } else {
743 track_redo_files();
744 err = clone_drop_user_data(thd, false);
745 if (err != 0) {
746 clone_files_error();
747 }
748 }
749 }
750
751 /* Add new task for the clone apply operation. */
752 if (err == 0) {
753 ut_ad(loc != nullptr);
754 err = clone_hdl->add_task(thd, loc, loc_len, task_id);
755 }
756 mutex_enter(clone_sys->get_mutex());
757
758 if (err != 0) {
759 clone_sys->drop_clone(clone_hdl);
760 return (err);
761 }
762
763 } else {
764 ut_ad(mode == HA_CLONE_MODE_VERSION);
765
766 /* Set all clone status files empty. */
767 if (clone_hdl->replace_datadir()) {
768 clone_init_list_files();
769 }
770 }
771
772 if (task_id > 0) {
773 ib::info(ER_IB_CLONE_START_STOP)
774 << "Clone Apply Begin Task ID: " << task_id;
775 }
776 /* Get the current locator from clone handle. */
777 if (mode != HA_CLONE_MODE_ADD_TASK) {
778 loc = clone_hdl->get_locator(loc_len);
779 }
780 return (0);
781 }
782
innodb_clone_apply(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err,Ha_clone_cbk * cbk)783 int innodb_clone_apply(handlerton *hton, THD *thd, const byte *loc,
784 uint loc_len, uint task_id, int in_err,
785 Ha_clone_cbk *cbk) {
786 /* Get clone handle by locator index. */
787 auto clone_hdl = clone_sys->get_clone_by_index(loc, loc_len);
788 ut_ad(in_err != 0 || cbk != nullptr);
789
790 /* For error input, return after saving it */
791 if (in_err != 0 || cbk == nullptr) {
792 clone_hdl->save_error(in_err);
793 auto da = thd->get_stmt_da();
794 ib::info(ER_IB_CLONE_OPERATION)
795 << "Clone Apply set error code: " << in_err << ": "
796 << ((in_err == 0 || da == nullptr || !da->is_error())
797 ? ""
798 : da->message_text());
799 return (0);
800 }
801
802 cbk->set_hton(hton);
803 auto err = clone_hdl->check_error(thd);
804 if (err != 0) {
805 return (err);
806 }
807
808 /* Apply data received from callback. */
809 err = clone_hdl->apply(thd, task_id, cbk);
810 clone_hdl->save_error(err);
811
812 return (err);
813 }
814
innodb_clone_apply_end(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err)815 int innodb_clone_apply_end(handlerton *hton, THD *thd, const byte *loc,
816 uint loc_len, uint task_id, int in_err) {
817 auto err = innodb_clone_end(hton, thd, loc, loc_len, task_id, in_err);
818 return (err);
819 }
820
/* Logical flags for clone file state. Each flag uses its own decimal
digit, so every sum of distinct flags is a unique state value. */

/** Data file is found. */
constexpr int FILE_DATA = 1;
/** Saved data file is found */
constexpr int FILE_SAVED = 10;
/** Cloned data file is found */
constexpr int FILE_CLONED = 100;

/** NONE state: no file is present. */
constexpr int FILE_STATE_NONE = 0;
/** Normal state: only the data file is present. */
constexpr int FILE_STATE_NORMAL = FILE_DATA;
/** Saved state: only the saved data file is present. */
constexpr int FILE_STATE_SAVED = FILE_SAVED;
/** Cloned state: the data file and the cloned data file are present. */
constexpr int FILE_STATE_CLONED = FILE_DATA + FILE_CLONED;
/** Saved clone state: the saved data file and the cloned data file are
present. */
constexpr int FILE_STATE_CLONE_SAVED = FILE_SAVED + FILE_CLONED;
/** Replaced state: the saved data file and the data file are present. */
constexpr int FILE_STATE_REPLACED = FILE_SAVED + FILE_DATA;
843 /* Clone data File state transfer.
844 [FILE_STATE_NORMAL] --> [FILE_STATE_CLONED]
845 Remote data is cloned into another file named <file_name>.clone.
846
847 [FILE_STATE_CLONED] --> [FILE_STATE_CLONE_SAVED]
848 Before recovery the datafile is saved in a file named <file_name>.save.
849
850 [FILE_STATE_CLONE_SAVED] --> [FILE_STATE_REPLACED]
851 Before recovery the cloned file is moved to datafile.
852
853 [FILE_STATE_REPLACED] --> [FILE_STATE_NORMAL]
854 After successful recovery the saved data file is removed.
855
856 Every state transition involves a single file create, delete or rename and
857 we consider them atomic. In case of a failure the state rolls back exactly
858 in reverse order.
859 */
860
861 /** Get current state of a clone file.
862 @param[in] data_file data file name
863 @return current file state. */
get_file_state(std::string data_file)864 static int get_file_state(std::string data_file) {
865 int state = 0;
866 /* Check if data file is there. */
867 if (file_exists(data_file)) {
868 state += FILE_DATA;
869 }
870
871 std::string saved_file(data_file);
872 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
873
874 /* Check if saved old file is there. */
875 if (file_exists(saved_file)) {
876 state += FILE_SAVED;
877 }
878
879 std::string cloned_file(data_file);
880 cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
881
882 /* Check if cloned file is there. */
883 if (file_exists(cloned_file)) {
884 state += FILE_CLONED;
885 }
886 return (state);
887 }
888
889 /** Roll forward clone file state till final state.
890 @param[in] data_file data file name
891 @param[in] final_state data file state to forward to
892 @return previous file state before roll forward. */
file_roll_forward(std::string & data_file,int final_state)893 static int file_roll_forward(std::string &data_file, int final_state) {
894 auto cur_state = get_file_state(data_file);
895
896 switch (cur_state) {
897 case FILE_STATE_CLONED: {
898 if (final_state == FILE_STATE_CLONED) {
899 break;
900 }
901 /* Save data file */
902 std::string saved_file(data_file);
903 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
904 rename_file(data_file, saved_file);
905 ib::info(ER_IB_CLONE_STATUS_FILE)
906 << "Clone File Roll Forward: Save data file " << data_file
907 << " state: " << cur_state;
908 }
909 /* Fall through */
910
911 case FILE_STATE_CLONE_SAVED: {
912 if (final_state == FILE_STATE_CLONE_SAVED) {
913 break;
914 }
915 /* Replace data file with cloned file. */
916 std::string cloned_file(data_file);
917 cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
918 rename_file(cloned_file, data_file);
919 ib::info(ER_IB_CLONE_STATUS_FILE)
920 << "Clone File Roll Forward: Rename clone to data file " << data_file
921 << " state: " << cur_state;
922 }
923 /* Fall through */
924
925 case FILE_STATE_REPLACED: {
926 if (final_state == FILE_STATE_REPLACED) {
927 break;
928 }
929 /* Remove saved data file */
930 std::string saved_file(data_file);
931 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
932 remove_file(saved_file);
933 ib::info(ER_IB_CLONE_STATUS_FILE)
934 << "Clone File Roll Forward: Remove saved data file " << data_file
935 << " state: " << cur_state;
936 }
937 /* Fall through */
938
939 case FILE_STATE_NORMAL:
940 /* Nothing to do. */
941 break;
942
943 default:
944 ib::fatal(ER_IB_CLONE_STATUS_FILE)
945 << "Clone File Roll Forward: Invalid File State: " << cur_state;
946 }
947 return (cur_state);
948 }
949
950 /** Roll back clone file state to normal state.
951 @param[in] data_file data file name */
file_rollback(std::string & data_file)952 static void file_rollback(std::string &data_file) {
953 auto cur_state = get_file_state(data_file);
954
955 switch (cur_state) {
956 case FILE_STATE_REPLACED: {
957 /* Replace data file back to cloned file. */
958 std::string cloned_file(data_file);
959 cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
960 rename_file(data_file, cloned_file);
961 ib::info(ER_IB_CLONE_STATUS_FILE)
962 << "Clone File Roll Back: Rename data to cloned file " << data_file
963 << " state: " << cur_state;
964 }
965 /* Fall through */
966
967 case FILE_STATE_CLONE_SAVED: {
968 /* Replace data file with saved file. */
969 std::string saved_file(data_file);
970 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
971 rename_file(saved_file, data_file);
972 ib::info(ER_IB_CLONE_STATUS_FILE)
973 << "Clone File Roll Back: Rename saved to data file " << data_file
974 << " state: " << cur_state;
975 }
976 /* Fall through */
977
978 case FILE_STATE_CLONED: {
979 /* Remove cloned data file. */
980 std::string cloned_file(data_file);
981 cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
982 remove_file(cloned_file);
983 ib::info(ER_IB_CLONE_STATUS_FILE)
984 << "Clone File Roll Back: Remove cloned file " << data_file
985 << " state: " << cur_state;
986 }
987 /* Fall through */
988
989 case FILE_STATE_NORMAL:
990 /* Nothing to do. */
991 break;
992
993 default:
994 ib::fatal(ER_IB_CLONE_STATUS_FILE)
995 << "Clone File Roll Back: Invalid File State: " << cur_state;
996 }
997 }
998
/* Clone old data File state transfer. These files are present only in
recipient and we haven't dropped the database objects (table/tablespace)
before clone. Currently used for user created undo tablespace. Dropping
undo tablespace could be expensive as we need to wait for purge to finish.
[FILE_STATE_NORMAL] --> [FILE_STATE_SAVED]
Before recovery the old datafile is saved in a file named <file_name>.save.

[FILE_STATE_SAVED] --> [FILE_STATE_NONE]
After successful recovery the saved data file is removed.

These state transitions involve a single file delete or rename and
we consider them atomic. In case of a failure the state rolls back.

[FILE_STATE_SAVED] --> [FILE_STATE_NORMAL]
On failure saved data file is moved back to original data file.
*/
1015
1016 /** Roll forward old data file state till final state.
1017 @param[in] data_file data file name
1018 @param[in] final_state data file state to forward to */
old_file_roll_forward(std::string & data_file,int final_state)1019 static void old_file_roll_forward(std::string &data_file, int final_state) {
1020 auto cur_state = get_file_state(data_file);
1021
1022 switch (cur_state) {
1023 case FILE_STATE_CLONED:
1024 case FILE_STATE_CLONE_SAVED:
1025 case FILE_STATE_REPLACED:
1026 /* If the file is also cloned, we can skip here as it would be handled
1027 with other cloned files. */
1028 ib::info(ER_IB_CLONE_STATUS_FILE)
1029 << "Clone Old File Roll Forward: Skipped cloned file " << data_file
1030 << " state: " << cur_state;
1031 break;
1032 case FILE_STATE_NORMAL: {
1033 if (final_state == FILE_STATE_NORMAL) {
1034 ut_ad(false);
1035 break;
1036 }
1037 /* Save data file */
1038 std::string saved_file(data_file);
1039 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
1040 rename_file(data_file, saved_file);
1041 ib::info(ER_IB_CLONE_STATUS_FILE)
1042 << "Clone Old File Roll Forward: Saved data file " << data_file
1043 << " state: " << cur_state;
1044 }
1045 /* Fall through */
1046
1047 case FILE_STATE_SAVED: {
1048 if (final_state == FILE_STATE_SAVED) {
1049 break;
1050 }
1051 /* Remove saved data file */
1052 std::string saved_file(data_file);
1053 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
1054 remove_file(saved_file);
1055 ib::info(ER_IB_CLONE_STATUS_FILE)
1056 << "Clone Old File Roll Forward: Remove saved file " << data_file
1057 << " state: " << cur_state;
1058 }
1059 /* Fall through */
1060
1061 case FILE_STATE_NONE:
1062 /* Nothing to do. */
1063 break;
1064
1065 default:
1066 ib::fatal(ER_IB_CLONE_STATUS_FILE)
1067 << "Clone Old File Roll Forward: Invalid File State: " << cur_state;
1068 }
1069 }
1070
1071 /** Roll back old data file state to normal state.
1072 @param[in] data_file data file name */
old_file_rollback(std::string & data_file)1073 static void old_file_rollback(std::string &data_file) {
1074 auto cur_state = get_file_state(data_file);
1075
1076 switch (cur_state) {
1077 case FILE_STATE_CLONED:
1078 case FILE_STATE_CLONE_SAVED:
1079 case FILE_STATE_REPLACED:
1080 /* If the file is also cloned, we can skip here as it would be handled
1081 with other cloned files. */
1082 ib::info(ER_IB_CLONE_STATUS_FILE)
1083 << "Clone Old File Roll Back: Skip cloned file " << data_file
1084 << " state: " << cur_state;
1085 break;
1086
1087 case FILE_STATE_SAVED: {
1088 /* Replace data file with saved file. */
1089 std::string saved_file(data_file);
1090 saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
1091 rename_file(saved_file, data_file);
1092 ib::info(ER_IB_CLONE_STATUS_FILE)
1093 << "Clone Old File Roll Back: Renamed saved data file " << data_file
1094 << " state: " << cur_state;
1095 }
1096 /* Fall through */
1097
1098 case FILE_STATE_NORMAL:
1099 case FILE_STATE_NONE:
1100 /* Nothing to do. */
1101 break;
1102
1103 default:
1104 ib::fatal(ER_IB_CLONE_STATUS_FILE)
1105 << "Clone Old File Roll Back: Invalid File State: " << cur_state;
1106 }
1107 }
1108
1109 /** Fatal error callback function. Don't call other functions from here. Don't
1110 use ut_a, ut_ad asserts or ib::fatal to avoid recursive invocation. */
clone_files_fatal_error()1111 static void clone_files_fatal_error() {
1112 /* Safeguard to avoid recursive call. */
1113 static bool started_error_handling = false;
1114 if (started_error_handling) {
1115 return;
1116 }
1117 started_error_handling = true;
1118
1119 std::ifstream err_file(CLONE_INNODB_ERROR_FILE);
1120 if (err_file.is_open()) {
1121 err_file.close();
1122 } else {
1123 /* Create error file if not there. */
1124 std::ofstream new_file(CLONE_INNODB_ERROR_FILE);
1125 /* On creation failure, return and abort. */
1126 if (!new_file.is_open()) {
1127 return;
1128 }
1129 new_file.close();
1130 }
1131 /* In case of fatal error, from ib::fatal and ut_a asserts
1132 we terminate the process here and send the exit status so that a
1133 managed server can be restarted with older data files. */
1134 std::_Exit(MYSQLD_RESTART_EXIT);
1135 }
1136
1137 /** Update recovery status file at end of clone recovery.
1138 @param[in] finished true if finishing clone recovery
1139 @param[in] is_error if recovery error
1140 @param[in] is_replace true, if replacing current directory */
clone_update_recovery_status(bool finished,bool is_error,bool is_replace)1141 static void clone_update_recovery_status(bool finished, bool is_error,
1142 bool is_replace) {
1143 /* true, when we are recovering a cloned database. */
1144 static bool recovery_in_progress = false;
1145 /* true, when replacing current data directory. */
1146 static bool recovery_replace = false;
1147
1148 std::function<void()> callback_function;
1149
1150 /* Mark the beginning of clone recovery. */
1151 if (!finished) {
1152 recovery_in_progress = true;
1153 if (is_replace) {
1154 recovery_replace = true;
1155 callback_function = clone_files_fatal_error;
1156 ut_set_assert_callback(callback_function);
1157 }
1158 return;
1159 }
1160 is_replace = recovery_replace;
1161 recovery_replace = false;
1162
1163 /* Update status only if clone recovery in progress. */
1164 if (!recovery_in_progress) {
1165 return;
1166 }
1167
1168 /* Mark end of clone recovery process. */
1169 recovery_in_progress = false;
1170 ut_set_assert_callback(callback_function);
1171
1172 std::string file_name;
1173
1174 file_name.assign(CLONE_INNODB_RECOVERY_FILE);
1175 if (!file_exists(file_name)) {
1176 return;
1177 }
1178
1179 std::ofstream status_file;
1180 status_file.open(file_name, std::ofstream::app);
1181 if (!status_file.is_open()) {
1182 return;
1183 }
1184
1185 /* Write zero for unsuccessful recovery. */
1186 uint64_t end_time = 0;
1187 if (is_error) {
1188 status_file << end_time << std::endl;
1189 status_file.close();
1190 /* Set recovery error so that server can restart only for replace. */
1191 clone_recovery_error = is_replace;
1192 return;
1193 }
1194
1195 /* Write recovery end time */
1196 end_time = my_micro_time();
1197 status_file << end_time << std::endl;
1198 if (!status_file.good()) {
1199 status_file.close();
1200 return;
1201 }
1202
1203 mtr_t mtr;
1204 mtr.start();
1205 byte *binlog_pos = trx_sysf_get(&mtr) + TRX_SYS_MYSQL_LOG_INFO;
1206
1207 /* Check logfile magic number. */
1208 if (mach_read_from_4(binlog_pos + TRX_SYS_MYSQL_LOG_MAGIC_N_FLD) !=
1209 TRX_SYS_MYSQL_LOG_MAGIC_N) {
1210 mtr.commit();
1211 status_file.close();
1212 return;
1213 }
1214 /* Write binary log file name. */
1215 status_file << binlog_pos + TRX_SYS_MYSQL_LOG_NAME << std::endl;
1216 if (!status_file.good()) {
1217 mtr.commit();
1218 status_file.close();
1219 return;
1220 }
1221
1222 auto high = mach_read_from_4(binlog_pos + TRX_SYS_MYSQL_LOG_OFFSET_HIGH);
1223 auto low = mach_read_from_4(binlog_pos + TRX_SYS_MYSQL_LOG_OFFSET_LOW);
1224
1225 auto log_offset = static_cast<uint64_t>(high);
1226 log_offset = (log_offset << 32);
1227 log_offset |= static_cast<uint64_t>(low);
1228
1229 /* Write log file offset. */
1230 status_file << log_offset << std::endl;
1231
1232 mtr.commit();
1233 status_file.close();
1234 /* Set clone startup for GR, only during replace. */
1235 clone_startup = is_replace;
1236 }
1237
1238 /** Initialize recovery status for cloned recovery.
1239 @param[in] replace we are replacing current directory. */
clone_init_recovery_status(bool replace)1240 static void clone_init_recovery_status(bool replace) {
1241 std::string file_name;
1242 file_name.assign(CLONE_INNODB_RECOVERY_FILE);
1243
1244 std::ofstream status_file;
1245 status_file.open(file_name, std::ofstream::out | std::ofstream::trunc);
1246 if (!status_file.is_open()) {
1247 return;
1248 }
1249 /* Write recovery begin time */
1250 uint64_t begin_time = my_micro_time();
1251 status_file << begin_time << std::endl;
1252 status_file.close();
1253 clone_update_recovery_status(false, false, replace);
1254 }
1255
clone_update_gtid_status(std::string & gtids)1256 void clone_update_gtid_status(std::string >ids) {
1257 /* Return if not clone database recovery. */
1258 std::string replace_files(CLONE_INNODB_REPLACED_FILES);
1259 if (!file_exists(replace_files)) {
1260 return;
1261 }
1262 /* Return if status file is not created. */
1263 std::string recovery_file(CLONE_INNODB_RECOVERY_FILE);
1264 if (!file_exists(recovery_file)) {
1265 ut_ad(false);
1266 return;
1267 }
1268 /* Open status file to append GTID. */
1269 std::ofstream status_file;
1270 status_file.open(recovery_file, std::ofstream::app);
1271 if (!status_file.is_open()) {
1272 return;
1273 }
1274 status_file << gtids << std::endl;
1275 status_file.close();
1276
1277 /* Remove replace file after successful recovery and status update. */
1278 std::ifstream files;
1279 files.open(replace_files);
1280
1281 if (files.is_open()) {
1282 /* If file is not empty, we are replacing data directory. */
1283 std::string file_name;
1284 if (std::getline(files, file_name)) {
1285 clone_startup = true;
1286 }
1287 files.close();
1288 }
1289 remove_file(replace_files);
1290 }
1291
clone_files_error()1292 void clone_files_error() {
1293 /* Check if clone file directory exists. */
1294 if (!os_file_exists(CLONE_FILES_DIR)) {
1295 return;
1296 }
1297
1298 std::string err_file(CLONE_INNODB_ERROR_FILE);
1299
1300 /* Create error status file if not there. */
1301 if (!file_exists(err_file)) {
1302 create_file(err_file);
1303 }
1304
1305 std::ifstream files;
1306 std::string data_file;
1307
1308 /* Open old file to get all files to be moved. */
1309 files.open(CLONE_INNODB_OLD_FILES);
1310 if (files.is_open()) {
1311 /* Extract and process all files to be moved */
1312 while (std::getline(files, data_file)) {
1313 old_file_rollback(data_file);
1314 }
1315 files.close();
1316 std::string old_files(CLONE_INNODB_OLD_FILES);
1317 remove_file(old_files);
1318 }
1319
1320 /* Open file to get all files to be replaced. */
1321 files.open(CLONE_INNODB_REPLACED_FILES);
1322 if (files.is_open()) {
1323 /* Extract and process all files to be replaced */
1324 while (std::getline(files, data_file)) {
1325 file_rollback(data_file);
1326 }
1327 files.close();
1328 std::string replace_files(CLONE_INNODB_REPLACED_FILES);
1329 remove_file(replace_files);
1330 }
1331
1332 /* Open file to get all new files to delete. */
1333 files.open(CLONE_INNODB_NEW_FILES);
1334 if (files.is_open()) {
1335 /* Extract and process all files to be replaced */
1336 while (std::getline(files, data_file)) {
1337 remove_file(data_file);
1338 }
1339 files.close();
1340 std::string new_files(CLONE_INNODB_NEW_FILES);
1341 remove_file(new_files);
1342 }
1343
1344 /* Remove error status file. */
1345 remove_file(err_file);
1346
1347 /* Update recovery status file for recovery error. */
1348 clone_update_recovery_status(true, true, true);
1349 }
1350
#ifdef UNIV_DEBUG
/** Debug only: consume the recovery crash point file of a cloned database.
@param[in]	is_cloned_db	true, if the database is a cloned one
@return false if the crash point file existed (it is removed), true
otherwise or if the database is not cloned. */
bool clone_check_recovery_crashpoint(bool is_cloned_db) {
  if (is_cloned_db) {
    std::string crash_file(CLONE_INNODB_RECOVERY_CRASH_POINT);

    if (file_exists(crash_file)) {
      remove_file(crash_file);
      return (false);
    }
  }
  return (true);
}
#endif
1365
clone_files_recovery(bool finished)1366 void clone_files_recovery(bool finished) {
1367 /* Clone error file is present in case of error. */
1368 std::string file_name;
1369 file_name.assign(CLONE_INNODB_ERROR_FILE);
1370
1371 if (file_exists(file_name)) {
1372 ut_ad(!finished);
1373 clone_files_error();
1374 return;
1375 }
1376
1377 /* if replace file is not present, remove old file. */
1378 if (!finished) {
1379 std::string replace_files(CLONE_INNODB_REPLACED_FILES);
1380 std::string old_files(CLONE_INNODB_OLD_FILES);
1381 if (!file_exists(replace_files) && file_exists(old_files)) {
1382 ut_ad(false);
1383 remove_file(old_files);
1384 }
1385 }
1386
1387 std::ifstream files;
1388
1389 /* Open files to get all old files to be saved or removed. Must handle
1390 the old files before cloned files. This is because during old file
1391 processing we need to skip the common files based on cloned state. If
1392 the cloned state is reset then these files would be considered as old
1393 files and removed. */
1394 int end_state = finished ? FILE_STATE_NONE : FILE_STATE_SAVED;
1395 files.open(CLONE_INNODB_OLD_FILES);
1396 if (files.is_open()) {
1397 /* Extract and process all files to be saved or removed */
1398 while (std::getline(files, file_name)) {
1399 old_file_roll_forward(file_name, end_state);
1400 }
1401 files.close();
1402
1403 /* Remove clone file after successful recovery. */
1404 if (finished) {
1405 std::string old_files(CLONE_INNODB_OLD_FILES);
1406 remove_file(old_files);
1407 }
1408 }
1409
1410 /* Open file to get all files to be replaced. */
1411 end_state = finished ? FILE_STATE_NORMAL : FILE_STATE_REPLACED;
1412 files.open(CLONE_INNODB_REPLACED_FILES);
1413
1414 if (files.is_open()) {
1415 int prev_state = FILE_STATE_NORMAL;
1416 /* If file is empty, it is not replace. */
1417 bool replace = false;
1418
1419 /* Extract and process all files to be replaced */
1420 while (std::getline(files, file_name)) {
1421 replace = true;
1422 prev_state = file_roll_forward(file_name, end_state);
1423 }
1424
1425 files.close();
1426
1427 if (finished) {
1428 /* Update recovery status file at the end of clone recovery. We don't
1429 remove the replace file here. It would be removed only after updating
1430 GTID state. */
1431 clone_update_recovery_status(true, false, replace);
1432 } else {
1433 /* If previous state was normal, clone recovery is already done. */
1434 if (!replace || prev_state != FILE_STATE_NORMAL) {
1435 /* Clone database recovery is started. */
1436 clone_init_recovery_status(replace);
1437 }
1438 }
1439 }
1440
1441 file_name.assign(CLONE_INNODB_NEW_FILES);
1442 auto exists = file_exists(file_name);
1443
1444 if (exists && finished) {
1445 /* Remove clone file after successful recovery. */
1446 std::string new_files(CLONE_INNODB_NEW_FILES);
1447 remove_file(new_files);
1448 }
1449 }
1450
clone_init()1451 dberr_t clone_init() {
1452 /* Check if incomplete cloned data directory */
1453 if (os_file_exists(CLONE_INNODB_IN_PROGRESS_FILE)) {
1454 return (DB_ABORT_INCOMPLETE_CLONE);
1455 }
1456
1457 /* Initialize clone files before starting recovery. */
1458 clone_files_recovery(false);
1459
1460 if (clone_sys == nullptr) {
1461 ut_ad(Clone_Sys::s_clone_sys_state == CLONE_SYS_INACTIVE);
1462 clone_sys = UT_NEW(Clone_Sys(), mem_key_clone);
1463 }
1464 Clone_Sys::s_clone_sys_state = CLONE_SYS_ACTIVE;
1465 Clone_handler::init_xa();
1466
1467 return (DB_SUCCESS);
1468 }
1469
clone_free()1470 void clone_free() {
1471 Clone_handler::uninit_xa();
1472 if (clone_sys != nullptr) {
1473 ut_ad(Clone_Sys::s_clone_sys_state == CLONE_SYS_ACTIVE);
1474
1475 UT_DELETE(clone_sys);
1476 clone_sys = nullptr;
1477 }
1478
1479 Clone_Sys::s_clone_sys_state = CLONE_SYS_INACTIVE;
1480 }
1481
clone_mark_abort(bool force)1482 bool clone_mark_abort(bool force) {
1483 bool aborted;
1484
1485 mutex_enter(clone_sys->get_mutex());
1486
1487 aborted = clone_sys->mark_abort(force);
1488
1489 mutex_exit(clone_sys->get_mutex());
1490
1491 DEBUG_SYNC_C("clone_marked_abort2");
1492
1493 return (aborted);
1494 }
1495
clone_mark_active()1496 void clone_mark_active() {
1497 mutex_enter(clone_sys->get_mutex());
1498
1499 clone_sys->mark_active();
1500
1501 mutex_exit(clone_sys->get_mutex());
1502 }
1503
clone_check_active()1504 bool clone_check_active() {
1505 mutex_enter(clone_sys->get_mutex());
1506 auto is_active = clone_sys->check_active_clone(false);
1507 mutex_exit(clone_sys->get_mutex());
1508
1509 return (is_active || Clone_handler::is_provisioning());
1510 }
1511
clone_mark_wait()1512 bool clone_mark_wait() {
1513 mutex_enter(clone_sys->get_mutex());
1514 auto success = clone_sys->mark_wait();
1515 mutex_exit(clone_sys->get_mutex());
1516 return (success);
1517 }
1518
clone_mark_free()1519 void clone_mark_free() {
1520 mutex_enter(clone_sys->get_mutex());
1521 clone_sys->mark_free();
1522 mutex_exit(clone_sys->get_mutex());
1523 }
1524
1525 template <typename T>
1526 using DD_Objs = std::vector<const T *>;
1527
1528 template <typename T>
1529 using DD_Objs_Iter = typename DD_Objs<T>::const_iterator;
1530
1531 using Releaser = dd::cache::Dictionary_client::Auto_releaser;
1532
1533 namespace {
1534
1535 /** Fix schema, table and tablespace. Used for two different purposes.
1536 1. After recovery from cloned database:
1537 A. Create empty data file for non-Innodb tables that are not cloned.
1538 B. Create any schema directory that is not present.
1539
1540 2. Before cloning into current data directory:
1541 A. Drop all user tables.
1542 B. Drop all user schema
1543 C. Drop all user tablespaces. */
1544
1545 class Fixup_data {
1546 public:
1547 /** Constructor.
1548 @param[in] concurrent spawn multiple threads
1549 @param[in] is_drop the operation is drop */
Fixup_data(bool concurrent,bool is_drop)1550 Fixup_data(bool concurrent, bool is_drop)
1551 : m_num_tasks(), m_concurrent(concurrent), m_drop(is_drop) {
1552 m_num_errors.store(0);
1553 }
1554
1555 /** Fix tables for which data is not cloned.
1556 @param[in,out] thd current THD
1557 @param[in] dd_objects table/schema/tablespace from DD
1558 @return true if error */
1559 template <typename T>
fix(THD * thd,const DD_Objs<T> & dd_objects)1560 bool fix(THD *thd, const DD_Objs<T> &dd_objects) {
1561 set_num_tasks(dd_objects.size());
1562
1563 using namespace std::placeholders;
1564 auto fixup_function =
1565 std::bind(&Fixup_data::fix_objects<T>, this, thd, _1, _2, _3);
1566
1567 par_for(PFS_NOT_INSTRUMENTED, dd_objects, get_num_tasks(), fixup_function);
1568
1569 return (failed());
1570 }
1571
1572 /** Remove data cloned from configuration tables which are not relevant
1573 in recipient.
1574 @param[in,out] thd current THD
1575 @return true if error */
1576 bool fix_config_tables(THD *thd);
1577
1578 /** Number of system configuration tables. */
1579 static const size_t S_NUM_CONFIG_TABLES = 0;
1580
1581 /** Array of configuration tables. */
1582 static const std::array<const char *, S_NUM_CONFIG_TABLES> s_config_tables;
1583
1584 private:
1585 /** Check and fix specific DD object.
1586 @param[in,out] thd current THD
1587 @param[in] object DD object
1588 @param[in] thread_number current thread number. */
1589 template <typename T>
1590 bool fix_one_object(THD *thd, const T *object, size_t thread_number);
1591
1592 /** Check and fix a rangle of DD objects
1593 @param[in,out] thd current THD
1594 @param[in] begin first element in current slice
1595 @param[in] end last element in current slice
1596 @param[in] thread_number current thread number. */
1597 template <typename T>
1598 void fix_objects(THD *thd, const DD_Objs_Iter<T> &begin,
1599 const DD_Objs_Iter<T> &end, size_t thread_number);
1600
1601 /** @return number of tasks. */
get_num_tasks() const1602 size_t get_num_tasks() const { return (m_num_tasks); }
1603
1604 /** Calculate and set number of new tasks to spawn.
1605 @param[in] num_entries number of entries to handle */
set_num_tasks(size_t num_entries)1606 void set_num_tasks(size_t num_entries) {
1607 /* Check if we are allowed to spawn multiple threads. Disable
1608 multithreading while dropping objects for now. We need more
1609 work to handle and pass interrupt signal to workers. */
1610 if (is_drop() || !allow_concurrent()) {
1611 m_num_tasks = 0;
1612 return;
1613 }
1614 /* Have one task for every 100 entries. */
1615 m_num_tasks = num_entries / 100;
1616
1617 #ifdef UNIV_DEBUG
1618 /* Test operation in newly spawned thread. */
1619 if (m_num_tasks == 0) {
1620 ++m_num_tasks;
1621 }
1622 #endif /* UNIV_DEBUG */
1623
1624 /* Don't go beyond 8 threads for now. */
1625 if (m_num_tasks > 8) {
1626 m_num_tasks = 8;
1627 }
1628 m_num_errors.store(0);
1629 }
1630
1631 /** @return true, if current operation is drop. */
is_drop() const1632 bool is_drop() const { return (m_drop); }
1633
1634 /** @return true, if concurrency is allowed. */
allow_concurrent() const1635 bool allow_concurrent() const { return (m_concurrent); }
1636
1637 /** Get the table operation string.
1638 @return sql key word for the operation. */
sql_operation()1639 const char *sql_operation() {
1640 if (is_drop()) {
1641 return ("DROP");
1642 }
1643 /* Alternative action is truncate. */
1644 return ("TRUNCATE");
1645 }
1646
1647 /** Check if the current SE type should be skipped.
1648 @param[in] type SE type
1649 @return true iff the SE needs to be skipped. */
skip_se_tables(enum legacy_db_type type)1650 bool skip_se_tables(enum legacy_db_type type) {
1651 /* Don't skip any specific DB during drop operation. All existing
1652 user tables are dropped before cloning a remote database. */
1653 if (is_drop()) {
1654 return (false);
1655 }
1656 /* Truncate only MyISAM and CSV tables. After clone we need to create
1657 empty tables for engines that are not cloned. */
1658 if (type == DB_TYPE_MYISAM || type == DB_TYPE_CSV_DB) {
1659 return (false);
1660 }
1661 return (true);
1662 }
1663
1664 /** Check if the schema is performance schema.
1665 @param[in] schema_name schema name
1666 @return true iff performance schema. */
is_performance_schema(const char * schema_name) const1667 bool is_performance_schema(const char *schema_name) const {
1668 return (0 == strcmp(schema_name, PERFORMANCE_SCHEMA_DB_NAME.str));
1669 }
1670
1671 /** Check if the current schema is system schema
1672 @param[in] schema_name schema name
1673 @return true iff system schema. */
is_system_schema(const char * schema_name) const1674 bool is_system_schema(const char *schema_name) const {
1675 if (0 == strcmp(schema_name, MYSQL_SCHEMA_NAME.str) ||
1676 0 == strcmp(schema_name, "sys") ||
1677 0 == strcmp(schema_name, PERFORMANCE_SCHEMA_DB_NAME.str) ||
1678 0 == strcmp(schema_name, INFORMATION_SCHEMA_NAME.str)) {
1679 return (true);
1680 }
1681 return (false);
1682 }
1683
1684 /** Check if the current schema tables needs to be skipped.
1685 @param[in] table DD table
1686 @param[in] table_name table name
1687 @param[in] schema_name schema name
1688 @return true iff table needs to be skipped. */
skip_schema_tables(const dd::Table * table,const char * table_name,const char * schema_name)1689 bool skip_schema_tables(const dd::Table *table, const char *table_name,
1690 const char *schema_name) {
1691 /* Skip specific tables only during drop. */
1692 if (!is_drop()) {
1693 return (false);
1694 }
1695
1696 /* Handle only visible base tables. */
1697 if (table->type() != dd::enum_table_type::BASE_TABLE ||
1698 table->hidden() != dd::Abstract_table::HT_VISIBLE) {
1699 return (true);
1700 }
1701
1702 /* Don't Skip tables in non-system schemas. */
1703 if (!is_system_schema(schema_name)) {
1704 return (false);
1705 }
1706
1707 /* Skip DD system tables. */
1708 if (table->is_explicit_tablespace() &&
1709 table->tablespace_id() == dd::Dictionary_impl::dd_tablespace_id()) {
1710 return (true);
1711 }
1712
1713 /* Skip all in information_schema and performance_schema tables. */
1714 if (0 == strcmp(schema_name, PERFORMANCE_SCHEMA_DB_NAME.str) ||
1715 0 == strcmp(schema_name, INFORMATION_SCHEMA_NAME.str)) {
1716 return (true);
1717 }
1718
1719 /* Skip specific tables in mysql schema. */
1720 if (0 == strcmp(schema_name, MYSQL_SCHEMA_NAME.str) &&
1721 (0 == strcmp(table_name, GENERAL_LOG_NAME.str) ||
1722 0 == strcmp(table_name, SLOW_LOG_NAME.str))) {
1723 return (true);
1724 }
1725
1726 /* Skip specific tables in sys schema. */
1727 if (0 == strcmp(schema_name, "sys") &&
1728 0 == strcmp(table_name, "sys_config")) {
1729 return (true);
1730 }
1731
1732 return (false);
1733 }
1734
1735 /** Check if the current schema needs to be skipped.
1736 @param[in] schema_name schema name
1737 @return true iff schema needs to be skipped. */
skip_schema(const char * schema_name)1738 bool skip_schema(const char *schema_name) {
1739 /* Don't drop system schema. */
1740 if (is_drop()) {
1741 return (is_system_schema(schema_name));
1742 }
1743 /* Information schema has no directory */
1744 if (0 == strcmp(schema_name, INFORMATION_SCHEMA_NAME.str)) {
1745 return (true);
1746 }
1747 return (false);
1748 }
1749
1750 /** Check if the current tablespace needs to be skipped.
1751 @param[in,out] thd current THD
1752 @param[in] dd_space dd tablespace
1753 @return true iff tablespace needs to be skipped. */
skip_tablespace(THD * thd,const dd::Tablespace * dd_space)1754 bool skip_tablespace(THD *thd, const dd::Tablespace *dd_space) {
1755 /* System tablespaces are in Innodb. Skip other engines. */
1756 auto se =
1757 ha_resolve_by_name_raw(thd, lex_cstring_handle(dd_space->engine()));
1758 auto se_type = ha_legacy_type(se ? plugin_data<handlerton *>(se) : nullptr);
1759 plugin_unlock(thd, se);
1760 if (se_type != DB_TYPE_INNODB) {
1761 return (false);
1762 }
1763
1764 /* Skip system tablespace by name. */
1765 const auto space_name = dd_space->name().c_str();
1766 const char *innodb_prefix = "innodb_";
1767 const char *sys_prefix = "sys/";
1768 if (0 == strcmp(space_name, "mysql") ||
1769 0 == strncmp(space_name, sys_prefix, strlen(sys_prefix)) ||
1770 0 == strncmp(space_name, innodb_prefix, strlen(innodb_prefix))) {
1771 return (true);
1772 }
1773
1774 /* Skip undo tablespaces. */
1775 auto &se_data = dd_space->se_private_data();
1776 space_id_t space_id = SPACE_UNKNOWN;
1777
1778 if (se_data.get(dd_space_key_strings[DD_SPACE_ID], &space_id) ||
1779 space_id == SPACE_UNKNOWN) {
1780 ut_ad(false);
1781 return (false);
1782 }
1783 bool is_undo = fsp_is_undo_tablespace(space_id);
1784
1785 /* Add skipped undo tablespace files to list of old files to remove. */
1786 if (is_undo && !allow_concurrent()) {
1787 auto dd_file = *(dd_space->files().begin());
1788 clone_add_to_list_file(CLONE_INNODB_OLD_FILES,
1789 dd_file->filename().c_str());
1790 /* In rare case, the undo might be kept halfway truncated due to some
1791 error during truncate. Check and add truncate log file as old file if
1792 present. */
1793 undo::Tablespace undo_space(space_id);
1794 const char *log_file_name = undo_space.log_file_name();
1795
1796 if (os_file_exists(log_file_name)) {
1797 clone_add_to_list_file(CLONE_INNODB_OLD_FILES, log_file_name);
1798 }
1799 }
1800
1801 /* Skip all undo tablespaces. */
1802 if (is_undo) {
1803 return (true);
1804 }
1805
1806 /* Check and skip file per table tablespace. */
1807 uint32_t flags = 0;
1808 if (se_data.get(dd_space_key_strings[DD_SPACE_FLAGS], &flags)) {
1809 ut_ad(false);
1810 return (false);
1811 }
1812
1813 if (fsp_is_file_per_table(space_id, flags)) {
1814 return (true);
1815 }
1816 return (false);
1817 }
1818
1819 /** Form and execute sql command.
1820 @param[in,out] thd current THD
1821 @param[in] schema_name schema name
1822 @param[in] table_name table name
1823 @param[in] tablespace_name tablespace name
1824 @param[in] thread_number current thread number. */
1825 bool execute_sql(THD *thd, const char *schema_name, const char *table_name,
1826 const char *tablespace_name, size_t thread_number);
1827
1828 /** @return true, if any thread has failed. */
failed() const1829 bool failed() const { return (m_num_errors.load() != 0); }
1830
1831 private:
1832 /** Number of tasks failed. */
1833 std::atomic_size_t m_num_errors;
1834
1835 /** Number of tasks. */
1836 size_t m_num_tasks;
1837
1838 /** Allow concurrent threads. */
1839 bool m_concurrent;
1840
1841 /** If the objects need to be dropped. */
1842 bool m_drop;
1843 };
1844
1845 /** All configuration tables for which data should not be cloned. From
1846 replication configurations only clone slave_master_info table needed by GR. */
1847 const std::array<const char *, Fixup_data::S_NUM_CONFIG_TABLES>
1848 Fixup_data::s_config_tables = {};
1849
fix_config_tables(THD * thd)1850 bool Fixup_data::fix_config_tables(THD *thd) {
1851 /* No privilege check needed for individual tables. */
1852 auto saved_sctx = thd->security_context();
1853 Security_context sctx(*saved_sctx);
1854 skip_grants(thd, sctx);
1855 thd->set_security_context(&sctx);
1856
1857 /* Disable binary logging. */
1858 char sql_stmt[FN_LEN + FN_LEN + 64];
1859 snprintf(sql_stmt, sizeof(sql_stmt), "SET SQL_LOG_BIN = OFF");
1860 static_cast<void>(clone_execute_query(thd, &sql_stmt[0], 1, false));
1861
1862 /* Loop through all objects and fix. */
1863 bool ret = false;
1864 for (auto table : s_config_tables) {
1865 ret = execute_sql(thd, "mysql", table, nullptr, 1);
1866 if (ret) break;
1867 }
1868 /* Set back old security context. */
1869 thd->set_security_context(saved_sctx);
1870 return (ret);
1871 }
1872
1873 template <typename T>
fix_objects(THD * thd,const DD_Objs_Iter<T> & begin,const DD_Objs_Iter<T> & end,size_t thread_number)1874 void Fixup_data::fix_objects(THD *thd, const DD_Objs_Iter<T> &begin,
1875 const DD_Objs_Iter<T> &end, size_t thread_number) {
1876 ib::info(ER_IB_CLONE_SQL) << "Clone: Fix Object count: " << (end - begin)
1877 << " task: " << thread_number;
1878
1879 bool thread_created = false;
1880
1881 /* For newly spawned threads, create server THD */
1882 if (thread_number != get_num_tasks()) {
1883 thd = create_thd(false, true, true, PSI_NOT_INSTRUMENTED);
1884 thread_created = true;
1885 }
1886
1887 /* Save system thread type to be safe. */
1888 auto saved_thd_system = thd->system_thread;
1889
1890 /* No privilege check needed for individual tables. */
1891 auto saved_sctx = thd->security_context();
1892 Security_context sctx(*saved_sctx);
1893 skip_grants(thd, sctx);
1894 thd->set_security_context(&sctx);
1895
1896 char sql_stmt[FN_LEN + FN_LEN + 64];
1897
1898 /* Disable binary logging. */
1899 snprintf(sql_stmt, sizeof(sql_stmt), "SET SQL_LOG_BIN = OFF");
1900 if (clone_execute_query(thd, &sql_stmt[0], thread_number, false)) {
1901 ++m_num_errors;
1902 }
1903
1904 /* Disable foreign key check. */
1905 snprintf(sql_stmt, sizeof(sql_stmt), "SET FOREIGN_KEY_CHECKS=0");
1906 if (clone_execute_query(thd, &sql_stmt[0], thread_number, false)) {
1907 ++m_num_errors;
1908 }
1909
1910 if (thread_created) {
1911 /* For concurrent worker threads set timeout for MDL lock. */
1912 snprintf(sql_stmt, sizeof(sql_stmt), "SET LOCAL LOCK_WAIT_TIMEOUT=1");
1913 if (clone_execute_query(thd, &sql_stmt[0], thread_number, false)) {
1914 ++m_num_errors;
1915 }
1916 }
1917
1918 /* Loop through all objects and fix. */
1919 for (auto it = begin; it != end && m_num_errors == 0; ++it) {
1920 if (fix_one_object<T>(thd, *it, thread_number)) {
1921 break;
1922 }
1923 }
1924
1925 /* Set back old security context. */
1926 thd->set_security_context(saved_sctx);
1927 thd->system_thread = saved_thd_system;
1928
1929 /* Destroy thread if newly spawned task */
1930 if (thread_created) {
1931 destroy_thd(thd);
1932 }
1933 }
1934
1935 template <>
fix_one_object(THD * thd,const dd::Table * table,size_t thread_number)1936 bool Fixup_data::fix_one_object(THD *thd, const dd::Table *table,
1937 size_t thread_number) {
1938 auto se = ha_resolve_by_name_raw(thd, lex_cstring_handle(table->engine()));
1939 auto se_type = ha_legacy_type(se ? plugin_data<handlerton *>(se) : nullptr);
1940
1941 plugin_unlock(thd, se);
1942
1943 if (skip_se_tables(se_type)) {
1944 return (false);
1945 }
1946
1947 auto dc = dd::get_dd_client(thd);
1948 Releaser releaser(dc);
1949
1950 const dd::Schema *table_schema = nullptr;
1951
1952 auto saved_thread_type = thd->system_thread;
1953 thd->system_thread = SYSTEM_THREAD_DD_INITIALIZE;
1954
1955 if (dc->acquire(table->schema_id(), &table_schema)) {
1956 ++m_num_errors;
1957 thd->system_thread = saved_thread_type;
1958 return (true);
1959 }
1960
1961 const auto schema_name = table_schema->name().c_str();
1962 const auto table_name = table->name().c_str();
1963
1964 /* For performance schema drop the SDI table. */
1965 if (is_drop() && is_performance_schema(schema_name)) {
1966 dd::sdi::drop(thd, table);
1967 }
1968 thd->system_thread = saved_thread_type;
1969
1970 if (skip_schema_tables(table, table_name, schema_name)) {
1971 return (false);
1972 }
1973
1974 /* Throw warning for MyIsam and CSV tables for which data is
1975 not cloned. These tables would be empty after clone. */
1976 if (!is_drop() && !is_system_schema(schema_name)) {
1977 ib::warn(ER_IB_CLONE_NON_INNODB_TABLE, schema_name, table_name);
1978 }
1979
1980 auto ret_val =
1981 execute_sql(thd, schema_name, table_name, nullptr, thread_number);
1982 return (ret_val);
1983 }
1984
execute_sql(THD * thd,const char * schema_name,const char * table_name,const char * tablespace_name,size_t thread_number)1985 bool Fixup_data::execute_sql(THD *thd, const char *schema_name,
1986 const char *table_name,
1987 const char *tablespace_name,
1988 size_t thread_number) {
1989 char sql_stmt[FN_LEN + FN_LEN + 64];
1990
1991 if (tablespace_name != nullptr) {
1992 /* TABLESPACE operation */
1993 snprintf(sql_stmt, sizeof(sql_stmt), "DROP TABLESPACE `%s`",
1994 tablespace_name);
1995
1996 } else if (table_name != nullptr) {
1997 /* TABLE operation */
1998 snprintf(sql_stmt, sizeof(sql_stmt), "%s TABLE `%s`.`%s`", sql_operation(),
1999 schema_name, table_name);
2000 } else {
2001 /* SCHEMA operation */
2002 snprintf(sql_stmt, sizeof(sql_stmt), "DROP SCHEMA `%s`", schema_name);
2003 }
2004
2005 auto saved_thread_type = thd->system_thread;
2006 if (!is_drop()) {
2007 /* No MDL locks during initialization phase. */
2008 thd->system_thread = SYSTEM_THREAD_DD_INITIALIZE;
2009 }
2010
2011 /* Skip error while attempting drop concurrently using multiple workers.
2012 We will handle the skipped objects later in in main thread.*/
2013 bool skip_error = is_drop() && allow_concurrent();
2014
2015 auto ret_val =
2016 clone_execute_query(thd, &sql_stmt[0], thread_number, skip_error);
2017 if (ret_val) {
2018 ++m_num_errors;
2019 }
2020
2021 thd->system_thread = saved_thread_type;
2022
2023 if (is_drop() && !ret_val && !thd->check_clone_vio()) {
2024 auto err = ER_QUERY_INTERRUPTED;
2025 my_error(ER_QUERY_INTERRUPTED, MYF(0));
2026 ++m_num_errors;
2027
2028 auto da = thd->get_stmt_da();
2029 ib::info(ER_IB_CLONE_SQL)
2030 << "Clone: Failed to " << sql_stmt << " task: " << thread_number
2031 << " code: " << err << ": "
2032 << ((da == nullptr || !da->is_error()) ? "" : da->message_text());
2033 }
2034 return (ret_val);
2035 }
2036
2037 template <>
fix_one_object(THD * thd,const dd::Schema * schema,size_t thread_number)2038 bool Fixup_data::fix_one_object(THD *thd, const dd::Schema *schema,
2039 size_t thread_number) {
2040 const auto schema_name = schema->name().c_str();
2041
2042 if (skip_schema(schema_name)) {
2043 return (false);
2044 }
2045
2046 if (is_drop()) {
2047 auto ret_val =
2048 execute_sql(thd, schema_name, nullptr, nullptr, thread_number);
2049 return (ret_val);
2050 }
2051
2052 /* Convert schema name to directory name to handle special characters. */
2053 char schema_dir[FN_REFLEN];
2054 static_cast<void>(
2055 tablename_to_filename(schema_name, schema_dir, sizeof(schema_dir)));
2056 MY_STAT stat_info;
2057 if (mysql_file_stat(key_file_misc, schema_dir, &stat_info, MYF(0)) !=
2058 nullptr) {
2059 /* Schema directory exists */
2060 return (false);
2061 }
2062
2063 if (my_mkdir(schema_dir, 0777, MYF(0)) < 0) {
2064 ib::error(ER_IB_CLONE_INTERNAL)
2065 << "Clone: Failed to create schema directory: " << schema_name
2066 << " task: " << thread_number;
2067 ++m_num_errors;
2068 return (true);
2069 }
2070
2071 ib::info(ER_IB_CLONE_SQL)
2072 << "Clone: Fixed Schema: " << schema_name << " task: " << thread_number;
2073 return (false);
2074 }
2075
2076 template <>
fix_one_object(THD * thd,const dd::Tablespace * tablespace,size_t thread_number)2077 bool Fixup_data::fix_one_object(THD *thd, const dd::Tablespace *tablespace,
2078 size_t thread_number) {
2079 ut_ad(is_drop());
2080
2081 if (skip_tablespace(thd, tablespace)) {
2082 return (false);
2083 }
2084
2085 const auto tablespace_name = tablespace->name().c_str();
2086
2087 auto ret_val =
2088 execute_sql(thd, nullptr, nullptr, tablespace_name, thread_number);
2089 return (ret_val);
2090 }
2091 } /* namespace */
2092
fix_cloned_tables(THD * thd)2093 bool fix_cloned_tables(THD *thd) {
2094 std::string fixup_file(CLONE_INNODB_FIXUP_FILE);
2095
2096 /* Check if table fix up is needed. */
2097 if (!file_exists(fixup_file)) {
2098 return (false);
2099 }
2100
2101 auto dc = dd::get_dd_client(thd);
2102 Releaser releaser(dc);
2103
2104 Fixup_data clone_fixup(true, false);
2105
2106 ib::info(ER_IB_CLONE_SQL) << "Clone Fixup: check and create schema directory";
2107 DD_Objs<dd::Schema> schemas;
2108
2109 if (dc->fetch_global_components(&schemas) || clone_fixup.fix(thd, schemas)) {
2110 return (true);
2111 }
2112
2113 ib::info(ER_IB_CLONE_SQL)
2114 << "Clone Fixup: create empty MyIsam and CSV tables";
2115 DD_Objs<dd::Table> tables;
2116
2117 if (dc->fetch_global_components(&tables) || clone_fixup.fix(thd, tables)) {
2118 return (true);
2119 }
2120
2121 ib::info(ER_IB_CLONE_SQL) << "Clone Fixup: replication configuration tables";
2122 if (clone_fixup.fix_config_tables(thd)) {
2123 return (true);
2124 }
2125
2126 ib::info(ER_IB_CLONE_SQL) << "Clone Fixup: finished successfully";
2127 remove_file(fixup_file);
2128 return (false);
2129 }
2130
clone_execute_query(THD * thd,const char * sql_stmt,size_t thread_number,bool skip_error)2131 static bool clone_execute_query(THD *thd, const char *sql_stmt,
2132 size_t thread_number, bool skip_error) {
2133 thd->set_query_id(next_query_id());
2134
2135 /* We use the code from dd::excute_query here to capture the error. */
2136 Ed_connection con(thd);
2137 std::string query(sql_stmt);
2138
2139 LEX_STRING str;
2140 lex_string_strmake(thd->mem_root, &str, query.c_str(), query.length());
2141
2142 auto saved_thd_system = thd->system_thread;
2143 /* For visibility in SHOW PROCESS LIST during execute direct. */
2144 if (thd->system_thread == NON_SYSTEM_THREAD) {
2145 thd->system_thread = SYSTEM_THREAD_BACKGROUND;
2146 }
2147
2148 if (con.execute_direct(str)) {
2149 thd->system_thread = saved_thd_system;
2150 auto sql_errno = con.get_last_errno();
2151 const char *sql_state = mysql_errno_to_sqlstate(sql_errno);
2152 const char *sql_errmsg = con.get_last_error();
2153
2154 /* Skip error, if asked. Don't skip query interruption request. */
2155 if (skip_error && sql_errno != ER_QUERY_INTERRUPTED) {
2156 ib::info(ER_IB_CLONE_SQL)
2157 << "Clone: Skipped " << sql_stmt << " task: " << thread_number
2158 << " Reason = " << sql_errno << ": " << sql_errmsg;
2159 return (false);
2160 }
2161
2162 ib::info(ER_IB_CLONE_SQL)
2163 << "Clone: Failed to " << sql_stmt << " task: " << thread_number
2164 << " code: " << sql_errno << ": " << sql_errmsg;
2165
2166 /* Update the error to THD. */
2167 auto da = thd->get_stmt_da();
2168 if (da != nullptr) {
2169 da->set_overwrite_status(true);
2170 da->set_error_status(sql_errno, sql_errmsg, sql_state);
2171 da->push_warning(thd, sql_errno, sql_state, Sql_condition::SL_ERROR,
2172 sql_errmsg);
2173 da->set_overwrite_status(false);
2174 }
2175 return (true);
2176 }
2177
2178 thd->system_thread = saved_thd_system;
2179 return (false);
2180 }
2181
clone_drop_binary_logs(THD * thd)2182 static int clone_drop_binary_logs(THD *thd) {
2183 int err = 0;
2184 /* No privilege check needed for individual tables. */
2185 auto saved_sctx = thd->security_context();
2186 Security_context sctx(*saved_sctx);
2187 skip_grants(thd, sctx);
2188 thd->set_security_context(&sctx);
2189
2190 /* 1. Attempt to stop slaves if any. */
2191 char sql_stmt[FN_LEN + FN_LEN + 64];
2192 snprintf(sql_stmt, sizeof(sql_stmt), "STOP SLAVE");
2193
2194 channel_map.rdlock();
2195 auto is_slave = is_slave_configured();
2196 channel_map.unlock();
2197
2198 if (is_slave && clone_execute_query(thd, &sql_stmt[0], 1, false)) {
2199 err = ER_INTERNAL_ERROR;
2200 my_error(err, MYF(0), "Clone failed to stop slave");
2201 }
2202
2203 if (err == 0) {
2204 /* Clear warnings if any. */
2205 thd->clear_error();
2206
2207 /* 2. Clear all binary logs and GTID. */
2208 snprintf(sql_stmt, sizeof(sql_stmt), "RESET MASTER");
2209
2210 if (clone_execute_query(thd, &sql_stmt[0], 1, false)) {
2211 err = ER_INTERNAL_ERROR;
2212 my_error(err, MYF(0), "Clone failed to reset binary logs");
2213 }
2214 }
2215
2216 /* Set back old security context. */
2217 thd->set_security_context(saved_sctx);
2218 return (err);
2219 }
2220
clone_drop_user_data(THD * thd,bool allow_threads)2221 static int clone_drop_user_data(THD *thd, bool allow_threads) {
2222 ib::warn(ER_IB_CLONE_USER_DATA, "Started");
2223 Clone_handler::set_drop_data();
2224
2225 auto dc = dd::get_dd_client(thd);
2226 Releaser releaser(dc);
2227 Fixup_data clone_fixup(allow_threads, true);
2228
2229 ib::info(ER_IB_CLONE_SQL) << "Clone Drop all user data";
2230 DD_Objs<dd::Table> tables;
2231
2232 if (dc->fetch_global_components(&tables) || clone_fixup.fix(thd, tables)) {
2233 ib::info(ER_IB_CLONE_SQL) << "Clone failed to drop all user tables";
2234 my_error(ER_INTERNAL_ERROR, MYF(0), "Clone failed to drop all user tables");
2235
2236 /* Get the first error reported. */
2237 auto da = thd->get_stmt_da();
2238 return (da->mysql_errno());
2239 }
2240
2241 ib::info(ER_IB_CLONE_SQL) << "Clone Drop User schemas";
2242 DD_Objs<dd::Schema> schemas;
2243
2244 if (dc->fetch_global_components(&schemas) || clone_fixup.fix(thd, schemas)) {
2245 ib::info(ER_IB_CLONE_SQL) << "Clone failed to drop all user schemas";
2246 my_error(ER_INTERNAL_ERROR, MYF(0),
2247 "Clone failed to drop all user schemas");
2248
2249 /* Get the first error reported. */
2250 auto da = thd->get_stmt_da();
2251 return (da->mysql_errno());
2252 }
2253
2254 ib::info(ER_IB_CLONE_SQL) << "Clone Drop User tablespaces";
2255 DD_Objs<dd::Tablespace> tablesps;
2256
2257 if (dc->fetch_global_components(&tablesps) ||
2258 clone_fixup.fix(thd, tablesps)) {
2259 ib::info(ER_IB_CLONE_SQL) << "Clone failed to drop all user tablespaces";
2260 my_error(ER_INTERNAL_ERROR, MYF(0),
2261 "Clone failed to drop all user tablespaces");
2262
2263 /* Get the first error reported. */
2264 auto da = thd->get_stmt_da();
2265 return (da->mysql_errno());
2266 }
2267
2268 /* Clean binary logs after removing all user data. */
2269 if (!allow_threads) {
2270 auto err = clone_drop_binary_logs(thd);
2271 if (err != 0) {
2272 return (err);
2273 }
2274 }
2275 ib::info(ER_IB_CLONE_SQL) << "Clone Drop: finished successfully";
2276 ib::warn(ER_IB_CLONE_USER_DATA, "Finished");
2277 return (0);
2278 }
2279