1 /*****************************************************************************
2 
3 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file clone/clone0api.cc
28  Innodb Clone Interface
29 
30  *******************************************************/
31 #include <cstdio>
32 #include <fstream>
33 #include <iostream>
34 
35 #include "clone0api.h"
36 #include "clone0clone.h"
37 #include "os0thread-create.h"
38 
39 #include "sql/clone_handler.h"
40 #include "sql/mysqld.h"
41 #include "sql/sql_class.h"
42 #include "sql/sql_prepare.h"
43 #include "sql/sql_table.h"
44 #include "sql/sql_thd_internal_api.h"
45 #include "sql/strfunc.h"
46 
47 #include "dict0dd.h"
48 #include "sql/dd/cache/dictionary_client.h"
49 #include "sql/dd/dictionary.h"
50 #include "sql/dd/impl/dictionary_impl.h"  // dd::dd_tablespace_id()
51 #include "sql/dd/impl/sdi.h"
52 #include "sql/dd/impl/utils.h"
53 #include "sql/dd/types/schema.h"
54 #include "sql/dd/types/table.h"
55 #include "sql/rpl_msr.h"  // is_slave_configured()
56 
/** Check whether a clone status file exists.
The check is done by opening the file for reading, so a file that exists but
cannot be opened is reported as absent.
@param[in]	file_name	file name
@return true if the file could be opened. */
static bool file_exists(std::string &file_name) {
  std::ifstream probe(file_name.c_str());
  const bool found = probe.is_open();

  if (found) {
    probe.close();
  }
  return (found);
}
69 
70 /** Rename clone status file. The operation is expected to be atomic
71 when the files belong to same directory.
72 @param[in]	from_file	name of current file
73 @param[in]	to_file		name of new file */
rename_file(std::string & from_file,std::string & to_file)74 static void rename_file(std::string &from_file, std::string &to_file) {
75   auto ret = std::rename(from_file.c_str(), to_file.c_str());
76 
77   if (ret != 0) {
78     ib::fatal(ER_IB_CLONE_STATUS_FILE)
79         << "Error renaming file from: " << from_file.c_str()
80         << " to: " << to_file.c_str();
81   }
82 }
83 
84 /** Create clone status file.
85 @param[in]	file_name	file name */
create_file(std::string & file_name)86 static void create_file(std::string &file_name) {
87   std::ofstream file(file_name.c_str());
88 
89   if (file.is_open()) {
90     file.close();
91     return;
92   }
93   ib::error(ER_IB_CLONE_STATUS_FILE)
94       << "Error creating file : " << file_name.c_str();
95 }
96 
97 /** Delete clone status file.
98 @param[in]	file	name of file */
remove_file(std::string & file)99 static void remove_file(std::string &file) {
100   auto ret = std::remove(file.c_str());
101 
102   if (ret != 0) {
103     ib::error(ER_IB_CLONE_STATUS_FILE)
104         << "Error removing file : " << file.c_str();
105   }
106 }
107 
108 /** Create clone in progress file and error file.
109 @param[in]	clone	clone handle */
create_status_file(const Clone_Handle * clone)110 static void create_status_file(const Clone_Handle *clone) {
111   const char *path = clone->get_datadir();
112   std::string file_name;
113 
114   if (clone->replace_datadir()) {
115     /* Create error file for rollback. */
116     file_name.assign(CLONE_INNODB_ERROR_FILE);
117     create_file(file_name);
118     return;
119   }
120 
121   file_name.assign(path);
122   /* Add path separator if needed. */
123   if (file_name.back() != OS_PATH_SEPARATOR) {
124     file_name.append(OS_PATH_SEPARATOR_STR);
125   }
126   file_name.append(CLONE_INNODB_IN_PROGRESS_FILE);
127 
128   create_file(file_name);
129 }
130 
131 /** Drop clone in progress file and error file.
132 @param[in]	clone	clone handle */
drop_status_file(const Clone_Handle * clone)133 static void drop_status_file(const Clone_Handle *clone) {
134   const char *path = clone->get_datadir();
135   std::string file_name;
136 
137   if (clone->replace_datadir()) {
138     /* Indicate that clone needs table fix up on recovery. */
139     file_name.assign(CLONE_INNODB_FIXUP_FILE);
140     create_file(file_name);
141 
142     /* drop error file on success. */
143     file_name.assign(CLONE_INNODB_ERROR_FILE);
144     remove_file(file_name);
145 
146     DBUG_EXECUTE_IF("clone_recovery_crash_point", {
147       file_name.assign(CLONE_INNODB_RECOVERY_CRASH_POINT);
148       create_file(file_name);
149     });
150     return;
151   }
152 
153   std::string path_name(path);
154   /* Add path separator if needed. */
155   if (path_name.back() != OS_PATH_SEPARATOR) {
156     path_name.append(OS_PATH_SEPARATOR_STR);
157   }
158 
159   /* Indicate that clone needs table fix up on recovery. */
160   file_name.assign(path_name);
161   file_name.append(CLONE_INNODB_FIXUP_FILE);
162   create_file(file_name);
163 
164   /* Indicate clone needs to update recovery status. */
165   file_name.assign(path_name);
166   file_name.append(CLONE_INNODB_REPLACED_FILES);
167   create_file(file_name);
168 
169   /* Mark successful clone operation. */
170   file_name.assign(path_name);
171   file_name.append(CLONE_INNODB_IN_PROGRESS_FILE);
172   remove_file(file_name);
173 }
174 
clone_init_list_files()175 void clone_init_list_files() {
176   /* Remove any existing list files. */
177   std::string new_files(CLONE_INNODB_NEW_FILES);
178   if (file_exists(new_files)) {
179     remove_file(new_files);
180   }
181   std::string old_files(CLONE_INNODB_OLD_FILES);
182   if (file_exists(old_files)) {
183     remove_file(old_files);
184   }
185   std::string replaced_files(CLONE_INNODB_REPLACED_FILES);
186   if (file_exists(replaced_files)) {
187     remove_file(replaced_files);
188   }
189   std::string recovery_file(CLONE_INNODB_RECOVERY_FILE);
190   if (file_exists(recovery_file)) {
191     remove_file(recovery_file);
192   }
193 }
194 
clone_add_to_list_file(const char * list_file_name,const char * file_name)195 int clone_add_to_list_file(const char *list_file_name, const char *file_name) {
196   std::ofstream list_file;
197   list_file.open(list_file_name, std::ofstream::app);
198 
199   if (list_file.is_open()) {
200     list_file << file_name << std::endl;
201 
202     if (list_file.good()) {
203       list_file.close();
204       return (0);
205     }
206     list_file.close();
207   }
208   /* This is an error case. Either open or write call failed. */
209   char errbuf[MYSYS_STRERROR_SIZE];
210   my_error(ER_ERROR_ON_WRITE, MYF(0), list_file_name, errno,
211            my_strerror(errbuf, sizeof(errbuf), errno));
212   return (ER_ERROR_ON_WRITE);
213 }
214 
215 /** Add all existing redo files to old file list. */
track_redo_files()216 static void track_redo_files() {
217   std::string log_file;
218   for (uint32_t index = 0; index < srv_n_log_files; ++index) {
219     /* Build redo log file name. */
220     char file_name[MAX_LOG_FILE_NAME + 1];
221     snprintf(file_name, MAX_LOG_FILE_NAME, "%s%u", ib_logfile_basename, index);
222 
223     log_file.assign(srv_log_group_home_dir);
224     if (!log_file.empty() && log_file.back() != OS_PATH_SEPARATOR) {
225       log_file.append(OS_PATH_SEPARATOR_STR);
226     }
227     log_file.append(file_name);
228     clone_add_to_list_file(CLONE_INNODB_OLD_FILES, log_file.c_str());
229   }
230 }
231 
/* Forward declarations of static helpers; being static, their definitions
must appear later in this translation unit. */
232 /** Execute an SQL statement in the given session.
233 @param[in,out]	thd		current THD
234 @param[in]	sql_stmt	SQL statement
235 @param[in]	thread_number	executing thread number
236 @param[in]	skip_error	skip statement on error
237 @return false, if successful. */
238 static bool clone_execute_query(THD *thd, const char *sql_stmt,
239                                 size_t thread_number, bool skip_error);
240 
241 /** Delete all binary logs before clone.
242 @param[in]	thd	current THD
243 @return error code (presumably 0 on success -- TODO confirm) */
244 static int clone_drop_binary_logs(THD *thd);
245 
246 /** Drop all user data before starting clone.
247 @param[in,out]	thd		current THD
248 @param[in]	allow_threads	allow multiple threads (innodb_clone_apply_begin
passes false because the BACKUP MDL lock blocks DDL from other threads)
249 @return error code; 0 means success */
250 static int clone_drop_user_data(THD *thd, bool allow_threads);
251 
252 /** Set security context to skip privilege check.
253 @param[in,out]	thd	session THD
254 @param[in,out]	sctx	security context */
skip_grants(THD * thd,Security_context & sctx)255 static void skip_grants(THD *thd, Security_context &sctx) {
256   /* Take care of the possible side effect of skipping grant i.e.
257   setting SYSTEM_USER privilege flag. */
258   bool saved_flag = thd->is_system_user();
259   sctx.skip_grants();
260   ut_ad(thd->is_system_user() == saved_flag);
261   thd->set_system_user(saved_flag);
262 }
263 
innodb_clone_get_capability(Ha_clone_flagset & flags)264 void innodb_clone_get_capability(Ha_clone_flagset &flags) {
265   flags.reset();
266 
267   flags.set(HA_CLONE_HYBRID);
268   flags.set(HA_CLONE_MULTI_TASK);
269   flags.set(HA_CLONE_RESTART);
270 }
271 
innodb_clone_begin(handlerton * hton,THD * thd,const byte * & loc,uint & loc_len,uint & task_id,Ha_clone_type type,Ha_clone_mode mode)272 int innodb_clone_begin(handlerton *hton, THD *thd, const byte *&loc,
273                        uint &loc_len, uint &task_id, Ha_clone_type type,
274                        Ha_clone_mode mode) {
275   /* Check if reference locator is valid */
276   if (loc != nullptr && !clone_validate_locator(loc, loc_len)) {
277     int err = ER_CLONE_PROTOCOL;
278     my_error(err, MYF(0), "Wrong Clone RPC: Invalid Locator");
279     return (err);
280   }
281 
282   /* Acquire clone system mutex which would automatically get released
283   when we return from the function [RAII]. */
284   IB_mutex_guard sys_mutex(clone_sys->get_mutex());
285 
286   /* Check if concurrent ddl has marked abort. */
287   if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
288     if (thd != nullptr) {
289       my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
290     }
291 
292     return (ER_CLONE_DDL_IN_PROGRESS);
293   }
294 
295   if (!mtr_t::s_logging.is_enabled()) {
296     if (thd != nullptr) {
297       my_error(ER_INNODB_REDO_DISABLED, MYF(0));
298     }
299     return (ER_INNODB_REDO_DISABLED);
300   }
301 
302   /* Check of clone is already in progress for the reference locator. */
303   auto clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_COPY);
304 
305   int err = 0;
306 
307   switch (mode) {
308     case HA_CLONE_MODE_RESTART:
309       /* Error out if existing clone is not found */
310       if (clone_hdl == nullptr) {
311         my_error(ER_INTERNAL_ERROR, MYF(0),
312                  "Innodb Clone Restart could not find existing clone");
313         return (ER_INTERNAL_ERROR);
314       }
315 
316       ib::info(ER_IB_CLONE_START_STOP) << "Clone Begin Master Task: Restart";
317       err = clone_hdl->restart_copy(thd, loc, loc_len);
318 
319       break;
320 
321     case HA_CLONE_MODE_START: {
322       /* Should not find existing clone for the locator */
323       if (clone_hdl != nullptr) {
324         clone_sys->drop_clone(clone_hdl);
325         my_error(ER_INTERNAL_ERROR, MYF(0),
326                  "Innodb Clone Begin refers existing clone");
327         return (ER_INTERNAL_ERROR);
328       }
329       LEX_CSTRING sctx_user = thd->m_main_security_ctx.user();
330       LEX_CSTRING sctx_host = thd->m_main_security_ctx.host_or_ip();
331 
332       /* Should not become a donor when provisioning is started. */
333       if (Clone_handler::is_provisioning()) {
334         if (0 == strcmp(my_localhost, sctx_host.str)) {
335           my_error(ER_CLONE_LOOPBACK, MYF(0));
336           return (ER_CLONE_LOOPBACK);
337         }
338         my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_CLONES);
339         return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
340       }
341 
342       /* Log user and host beginning clone operation. */
343       ib::info(ER_IB_CLONE_START_STOP) << "Clone Begin Master Task by "
344                                        << sctx_user.str << "@" << sctx_host.str;
345       break;
346     }
347 
348     case HA_CLONE_MODE_ADD_TASK:
349       /* Should find existing clone for the locator */
350       if (clone_hdl == nullptr) {
351         /* Operation has finished already */
352         my_error(ER_INTERNAL_ERROR, MYF(0),
353                  "Innodb Clone add task refers non-existing clone");
354 
355         return (ER_INTERNAL_ERROR);
356       }
357       break;
358 
359     case HA_CLONE_MODE_VERSION:
360     case HA_CLONE_MODE_MAX:
361     default:
362       ut_ad(false);
363       my_error(ER_INTERNAL_ERROR, MYF(0), "Innodb Clone Begin Invalid Mode");
364 
365       return (ER_INTERNAL_ERROR);
366   }
367 
368   if (clone_hdl == nullptr) {
369     ut_ad(thd != nullptr);
370     ut_ad(mode == HA_CLONE_MODE_START);
371 
372     /* Create new clone handle for copy. Reference locator
373     is used for matching the version. */
374     auto err = clone_sys->add_clone(loc, CLONE_HDL_COPY, clone_hdl);
375     if (err != 0) {
376       return (err);
377     }
378 
379     err = clone_hdl->init(loc, loc_len, type, nullptr);
380 
381     /* Check and wait if clone is marked for wait. */
382     if (err == 0) {
383       err = clone_sys->wait_for_free(thd);
384     }
385 
386     if (err != 0) {
387       clone_sys->drop_clone(clone_hdl);
388       return (err);
389     }
390   }
391 
392   /* Add new task for the clone copy operation. */
393   if (err == 0) {
394     /* Release clone system mutex here as we might need to wait while
395     adding task. It is safe as the clone handle is acquired and cannot
396     be freed till we release it. */
397     mutex_exit(clone_sys->get_mutex());
398     err = clone_hdl->add_task(thd, nullptr, 0, task_id);
399     mutex_enter(clone_sys->get_mutex());
400   }
401 
402   if (err != 0) {
403     clone_sys->drop_clone(clone_hdl);
404     return (err);
405   }
406 
407   if (task_id > 0) {
408     ib::info(ER_IB_CLONE_START_STOP) << "Clone Begin Task ID: " << task_id;
409   }
410 
411   /* Get the current locator from clone handle. */
412   loc = clone_hdl->get_locator(loc_len);
413   return (0);
414 }
415 
innodb_clone_copy(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,Ha_clone_cbk * cbk)416 int innodb_clone_copy(handlerton *hton, THD *thd, const byte *loc, uint loc_len,
417                       uint task_id, Ha_clone_cbk *cbk) {
418   cbk->set_hton(hton);
419 
420   /* Get clone handle by locator index. */
421   auto clone_hdl = clone_sys->get_clone_by_index(loc, loc_len);
422 
423   auto err = clone_hdl->check_error(thd);
424   if (err != 0) {
425     return (err);
426   }
427 
428   /* Start data copy. */
429   err = clone_hdl->copy(thd, task_id, cbk);
430   clone_hdl->save_error(err);
431 
432   return (err);
433 }
434 
innodb_clone_ack(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err,Ha_clone_cbk * cbk)435 int innodb_clone_ack(handlerton *hton, THD *thd, const byte *loc, uint loc_len,
436                      uint task_id, int in_err, Ha_clone_cbk *cbk) {
437   cbk->set_hton(hton);
438 
439   /* Check if reference locator is valid */
440   if (loc != nullptr && !clone_validate_locator(loc, loc_len)) {
441     int err = ER_CLONE_PROTOCOL;
442     my_error(err, MYF(0), "Wrong Clone RPC: Invalid Locator");
443     return (err);
444   }
445   mutex_enter(clone_sys->get_mutex());
446 
447   /* Find attach clone handle using the reference locator. */
448   auto clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_COPY);
449 
450   mutex_exit(clone_sys->get_mutex());
451 
452   /* Must find existing clone for the locator */
453   if (clone_hdl == nullptr) {
454     my_error(ER_INTERNAL_ERROR, MYF(0),
455 
456              "Innodb Clone ACK refers non-existing clone");
457     return (ER_INTERNAL_ERROR);
458   }
459 
460   int err = 0;
461 
462   /* If thread is interrupted, then set interrupt error instead. */
463   if (thd_killed(thd)) {
464     my_error(ER_QUERY_INTERRUPTED, MYF(0));
465     in_err = ER_QUERY_INTERRUPTED;
466   }
467 
468   if (in_err == 0) {
469     /* Apply acknowledged data */
470     err = clone_hdl->apply(thd, task_id, cbk);
471 
472     clone_hdl->save_error(err);
473   } else {
474     /* For error input, return after saving it */
475     ib::info(ER_IB_CLONE_OPERATION) << "Clone set error ACK: " << in_err;
476     clone_hdl->save_error(in_err);
477   }
478 
479   mutex_enter(clone_sys->get_mutex());
480 
481   /* Detach from clone handle */
482   clone_sys->drop_clone(clone_hdl);
483 
484   mutex_exit(clone_sys->get_mutex());
485 
486   return (err);
487 }
488 
/** End a clone copy or apply task.
Saves the error (replaced by ER_QUERY_INTERRUPTED if the session was killed)
on the clone handle and drops the task. If no reconnect wait is needed, or the
clone was aborted, the handle is released, after status files are finalized
when this is the apply master. Otherwise the copy master sets the handle idle
and waits, holding the clone system mutex, up to 5 minutes for another task
to restart it.
@param[in]	hton	handlerton (not used in this function)
@param[in]	thd	server session
@param[in]	loc	locator
@param[in]	loc_len	locator length
@param[in]	task_id	task ID to drop
@param[in]	in_err	error reported by the caller
@return always 0; errors are reported through my_error() */
innodb_clone_end(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err)489 int innodb_clone_end(handlerton *hton, THD *thd, const byte *loc, uint loc_len,
490                      uint task_id, int in_err) {
491   /* Acquire clone system mutex which would automatically get released
492   when we return from the function [RAII]. */
493   IB_mutex_guard sys_mutex(clone_sys->get_mutex());
494
495   /* Get clone handle by locator index. */
496   auto clone_hdl = clone_sys->get_clone_by_index(loc, loc_len);
497
498   /* If thread is interrupted, then set interrupt error instead. */
499   if (thd_killed(thd)) {
500     my_error(ER_QUERY_INTERRUPTED, MYF(0));
501     in_err = ER_QUERY_INTERRUPTED;
502   }
503   /* Set error, if already not set */
504   clone_hdl->save_error(in_err);
505
506   /* Drop current task. */
      /* drop_task() sets is_master for the master task; its return value is
      used below as the "wait for reconnect" decision. */
507   bool is_master = false;
508   auto wait_reconnect = clone_hdl->drop_task(thd, task_id, in_err, is_master);
509   auto is_copy = clone_hdl->is_copy_clone();
510   auto is_init = clone_hdl->is_init();
511   auto is_abort = clone_hdl->is_abort();
512
      /* Finish now: either no reconnect is expected or the clone was aborted. */
513   if (!wait_reconnect || is_abort) {
514     if (is_copy && is_master) {
515       if (is_abort) {
516         ib::info(ER_IB_CLONE_RESTART)
517             << "Clone Master aborted by concurrent clone";
518
519       } else if (in_err != 0) {
520         /* Make sure re-start attempt fails immediately */
521         clone_hdl->set_state(CLONE_STATE_ABORT);
522       }
523     }
524
        /* Apply master (not the version-check handle): finalize status files. */
525     if (!is_copy && !is_init && is_master) {
526       if (in_err == 0) {
527         /* On success for apply handle, drop status file. */
528         drop_status_file(clone_hdl);
529       } else if (clone_hdl->replace_datadir()) {
530         /* On failure, rollback if replacing current data directory. */
531         clone_files_error();
532       }
533     }
534     clone_sys->drop_clone(clone_hdl);
535
        /* Log the end of the task; the diagnostics message is included only on
        failure. */
536     auto da = thd->get_stmt_da();
537     ib::info(ER_IB_CLONE_START_STOP)
538         << "Clone"
539         << (is_copy ? " End" : (is_init ? " Apply Version End" : " Apply End"))
540         << (is_master ? " Master" : "") << " Task ID: " << task_id
541         << (in_err != 0 ? " Failed, code: " : " Passed, code: ") << in_err
542         << ": "
543         << ((in_err == 0 || da == nullptr || !da->is_error())
544                 ? ""
545                 : da->message_text());
546     return (0);
547   }
548
      /* Reconnect path: only the copy master is expected here (asserted
      below). */
549   auto da = thd->get_stmt_da();
550   ib::info(ER_IB_CLONE_RESTART)
551       << "Clone Master wait for restart"
552       << " after n/w error code: " << in_err << ": "
553       << ((da == nullptr || !da->is_error()) ? "" : da->message_text());
554
555   ut_ad(clone_hdl->is_copy_clone());
556   ut_ad(is_master);
557
558   /* Set state to idle and wait for re-connect */
559   clone_hdl->set_state(CLONE_STATE_IDLE);
560   /* Sleep for 1 second */
561   Clone_Msec sleep_time(Clone_Sec(1));
562   /* Generate alert message every minute. */
563   Clone_Sec alert_interval(Clone_Min(1));
564   /* Wait for 5 minutes for client to reconnect back */
565   Clone_Sec time_out(Clone_Min(5));
566
567   bool is_timeout = false;
568   auto err = Clone_Sys::wait(
569       sleep_time, time_out, alert_interval,
570       [&](bool alert, bool &result) {
571         ut_ad(mutex_own(clone_sys->get_mutex()));
              /* result stays true while the handle is inactive (presumably
              meaning "keep waiting"); it becomes false once another task has
              made the handle active again. A non-zero return ends the wait
              with an error. */
572         result = !clone_hdl->is_active();
573
574         if (thd_killed(thd) || clone_hdl->is_interrupted()) {
575           ib::info(ER_IB_CLONE_RESTART)
576               << "Clone End Master wait for Restart interrupted";
577           my_error(ER_QUERY_INTERRUPTED, MYF(0));
578           return (ER_QUERY_INTERRUPTED);
579
580         } else if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
581           ib::info(ER_IB_CLONE_RESTART)
582               << "Clone End Master wait for Restart aborted by DDL";
583           my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
584           return (ER_CLONE_DDL_IN_PROGRESS);
585
586         } else if (clone_hdl->is_abort()) {
587           result = false;
588           ib::info(ER_IB_CLONE_RESTART) << "Clone End Master wait for Restart"
589                                            " aborted by concurrent clone";
590           return (0);
591         }
592
593         if (!result) {
594           ib::info(ER_IB_CLONE_RESTART)
595               << "Clone Master restarted successfully by "
596                  "other task after n/w failure";
597
598         } else if (alert) {
599           ib::info(ER_IB_CLONE_RESTART)
600               << "Clone Master still waiting for restart";
601         }
602         return (0);
603       },
604       clone_sys->get_mutex(), is_timeout);
605
      /* A timeout with the handle still idle means no task restarted the
      clone. */
606   if (err == 0 && is_timeout && clone_hdl->is_idle()) {
607     ib::info(ER_IB_CLONE_TIMEOUT) << "Clone End Master wait "
608                                      "for restart timed out after "
609                                      "5 Minutes. Dropping Snapshot";
610   }
611   /* Last task should drop the clone handle. */
612   clone_sys->drop_clone(clone_hdl);
613   return (0);
614 }
615 
innodb_clone_apply_begin(handlerton * hton,THD * thd,const byte * & loc,uint & loc_len,uint & task_id,Ha_clone_mode mode,const char * data_dir)616 int innodb_clone_apply_begin(handlerton *hton, THD *thd, const byte *&loc,
617                              uint &loc_len, uint &task_id, Ha_clone_mode mode,
618                              const char *data_dir) {
619   /* Check if reference locator is valid */
620   if (loc != nullptr && !clone_validate_locator(loc, loc_len)) {
621     int err = ER_CLONE_PROTOCOL;
622     my_error(err, MYF(0), "Wrong Clone RPC: Invalid Locator");
623     return (err);
624   }
625 
626   /* Acquire clone system mutex which would automatically get released
627   when we return from the function [RAII]. */
628   IB_mutex_guard sys_mutex(clone_sys->get_mutex());
629 
630   /* Check if clone is already in progress for the reference locator. */
631   auto clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_APPLY);
632 
633   switch (mode) {
634     case HA_CLONE_MODE_RESTART: {
635       ib::info(ER_IB_CLONE_RESTART) << "Clone Apply Begin Master Task: Restart";
636       auto err = clone_hdl->restart_apply(thd, loc, loc_len);
637 
638       /* Reduce reference count */
639       clone_sys->drop_clone(clone_hdl);
640 
641       /* Restart is done by master task */
642       ut_ad(task_id == 0);
643       task_id = 0;
644 
645       return (err);
646     }
647     case HA_CLONE_MODE_START:
648 
649       if (clone_hdl != nullptr) {
650         ut_ad(false);
651         clone_sys->drop_clone(clone_hdl);
652         ib::error(ER_IB_CLONE_INTERNAL)
653             << "Clone Apply Begin Master found duplicate clone";
654         clone_hdl = nullptr;
655       }
656 
657       /* Check if the locator is from current mysqld server. */
658       clone_hdl = clone_sys->find_clone(loc, loc_len, CLONE_HDL_COPY);
659 
660       if (clone_hdl != nullptr) {
661         clone_sys->drop_clone(clone_hdl);
662         clone_hdl = nullptr;
663         ib::info(ER_IB_CLONE_START_STOP) << "Clone Apply Master Loop Back";
664         ut_ad(data_dir != nullptr);
665       }
666       ib::info(ER_IB_CLONE_START_STOP) << "Clone Apply Begin Master Task";
667       break;
668 
669     case HA_CLONE_MODE_ADD_TASK:
670       /* Should find existing clone for the locator */
671       if (clone_hdl == nullptr) {
672         /* Operation has finished already */
673         my_error(ER_INTERNAL_ERROR, MYF(0),
674                  "Innodb Clone Apply add task to non-existing clone");
675 
676         return (ER_INTERNAL_ERROR);
677       }
678       break;
679 
680     case HA_CLONE_MODE_VERSION:
681       /* Cannot have input locator or existing clone */
682       ib::info(ER_IB_CLONE_START_STOP)
683           << "Clone Apply Begin Master Version Check";
684       ut_ad(loc == nullptr);
685       ut_ad(clone_hdl == nullptr);
686       break;
687 
688     case HA_CLONE_MODE_MAX:
689     default:
690       ut_ad(false);
691 
692       my_error(ER_INTERNAL_ERROR, MYF(0),
693                "Innodb Clone Appply Begin Invalid Mode");
694 
695       return (ER_INTERNAL_ERROR);
696   }
697 
698   if (clone_hdl == nullptr) {
699     ut_ad(thd != nullptr);
700 
701     ut_ad(mode == HA_CLONE_MODE_VERSION || mode == HA_CLONE_MODE_START);
702 
703     /* Create new clone handle for apply. Reference locator
704     is used for matching the version. */
705     auto err = clone_sys->add_clone(loc, CLONE_HDL_APPLY, clone_hdl);
706     if (err != 0) {
707       return (err);
708     }
709 
710     err = clone_hdl->init(loc, loc_len, HA_CLONE_BLOCKING, data_dir);
711 
712     if (err != 0) {
713       clone_sys->drop_clone(clone_hdl);
714       return (err);
715     }
716   }
717 
718   if (clone_hdl->is_active()) {
719     /* Release clone system mutex here as we might need to wait while
720     adding task. It is safe as the clone handle is acquired and cannot
721     be freed till we release it. */
722     mutex_exit(clone_sys->get_mutex());
723 
724     /* Create status file to indicate active clone directory. */
725     if (mode == HA_CLONE_MODE_START) {
726       create_status_file(clone_hdl);
727     }
728 
729     int err = 0;
730     /* Drop any user data after acquiring backup lock. Don't allow
731     concurrent threads as the BACKUP MDL lock would not allow any
732     other threads to execute DDL. */
733     if (clone_hdl->replace_datadir() && mode == HA_CLONE_MODE_START) {
734       /* Safeguard to throw error if innodb read only mode is on. Currently
735       not reachable as we would get error much earlier while dropping user
736       tables. */
737       if (srv_read_only_mode) {
738         ut_ad(false);
739         err = ER_INTERNAL_ERROR;
740         my_error(err, MYF(0),
741                  "Clone cannot replace data with innodb_read_only = ON");
742       } else {
743         track_redo_files();
744         err = clone_drop_user_data(thd, false);
745         if (err != 0) {
746           clone_files_error();
747         }
748       }
749     }
750 
751     /* Add new task for the clone apply operation. */
752     if (err == 0) {
753       ut_ad(loc != nullptr);
754       err = clone_hdl->add_task(thd, loc, loc_len, task_id);
755     }
756     mutex_enter(clone_sys->get_mutex());
757 
758     if (err != 0) {
759       clone_sys->drop_clone(clone_hdl);
760       return (err);
761     }
762 
763   } else {
764     ut_ad(mode == HA_CLONE_MODE_VERSION);
765 
766     /* Set all clone status files empty. */
767     if (clone_hdl->replace_datadir()) {
768       clone_init_list_files();
769     }
770   }
771 
772   if (task_id > 0) {
773     ib::info(ER_IB_CLONE_START_STOP)
774         << "Clone Apply Begin Task ID: " << task_id;
775   }
776   /* Get the current locator from clone handle. */
777   if (mode != HA_CLONE_MODE_ADD_TASK) {
778     loc = clone_hdl->get_locator(loc_len);
779   }
780   return (0);
781 }
782 
innodb_clone_apply(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err,Ha_clone_cbk * cbk)783 int innodb_clone_apply(handlerton *hton, THD *thd, const byte *loc,
784                        uint loc_len, uint task_id, int in_err,
785                        Ha_clone_cbk *cbk) {
786   /* Get clone handle by locator index. */
787   auto clone_hdl = clone_sys->get_clone_by_index(loc, loc_len);
788   ut_ad(in_err != 0 || cbk != nullptr);
789 
790   /* For error input, return after saving it */
791   if (in_err != 0 || cbk == nullptr) {
792     clone_hdl->save_error(in_err);
793     auto da = thd->get_stmt_da();
794     ib::info(ER_IB_CLONE_OPERATION)
795         << "Clone Apply set error code: " << in_err << ": "
796         << ((in_err == 0 || da == nullptr || !da->is_error())
797                 ? ""
798                 : da->message_text());
799     return (0);
800   }
801 
802   cbk->set_hton(hton);
803   auto err = clone_hdl->check_error(thd);
804   if (err != 0) {
805     return (err);
806   }
807 
808   /* Apply data received from callback. */
809   err = clone_hdl->apply(thd, task_id, cbk);
810   clone_hdl->save_error(err);
811 
812   return (err);
813 }
814 
innodb_clone_apply_end(handlerton * hton,THD * thd,const byte * loc,uint loc_len,uint task_id,int in_err)815 int innodb_clone_apply_end(handlerton *hton, THD *thd, const byte *loc,
816                            uint loc_len, uint task_id, int in_err) {
817   auto err = innodb_clone_end(hton, thd, loc, loc_len, task_id, in_err);
818   return (err);
819 }
820 
/* Clone file state flags. Each kind of file owns one decimal digit, so a
file state is the sum of the flags of the files that are present. */

/** Data file is found. */
constexpr int FILE_DATA = 1;
/** Saved data file is found */
constexpr int FILE_SAVED = 10;
/** Cloned data file is found */
constexpr int FILE_CLONED = 100;

/** NONE state: file not present. */
constexpr int FILE_STATE_NONE = 0;
/** Normal state: only data file is present. */
constexpr int FILE_STATE_NORMAL = FILE_DATA;
/** Saved state: only saved data file is present. */
constexpr int FILE_STATE_SAVED = FILE_SAVED;
/** Cloned state: data file and cloned data file are present. */
constexpr int FILE_STATE_CLONED = FILE_DATA + FILE_CLONED;
/** Saved clone state: saved data file and cloned data file are present. */
constexpr int FILE_STATE_CLONE_SAVED = FILE_SAVED + FILE_CLONED;
/** Replaced state: saved data file and data file are present. */
constexpr int FILE_STATE_REPLACED = FILE_SAVED + FILE_DATA;
842 
843 /* Clone data File state transfer.
844   [FILE_STATE_NORMAL] --> [FILE_STATE_CLONED]
845     Remote data is cloned into another file named <file_name>.clone.
846 
847   [FILE_STATE_CLONED] --> [FILE_STATE_CLONE_SAVED]
848     Before recovery the datafile is saved in a file named <file_name>.save.
849 
850   [FILE_STATE_CLONE_SAVED] --> [FILE_STATE_REPLACED]
851     Before recovery the cloned file is moved to datafile.
852 
853   [FILE_STATE_REPLACED] --> [FILE_STATE_NORMAL]
854     After successful recovery the saved data file is removed.
855 
856   Every state transition involves a single file create, delete or rename and
857   we consider them atomic. In case of a failure the state rolls back exactly
858   in reverse order.
859 */
860 
861 /** Get current state of a clone file.
862 @param[in]	data_file	data file name
863 @return current file state. */
get_file_state(std::string data_file)864 static int get_file_state(std::string data_file) {
865   int state = 0;
866   /* Check if data file is there. */
867   if (file_exists(data_file)) {
868     state += FILE_DATA;
869   }
870 
871   std::string saved_file(data_file);
872   saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
873 
874   /* Check if saved old file is there. */
875   if (file_exists(saved_file)) {
876     state += FILE_SAVED;
877   }
878 
879   std::string cloned_file(data_file);
880   cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
881 
882   /* Check if cloned file is there. */
883   if (file_exists(cloned_file)) {
884     state += FILE_CLONED;
885   }
886   return (state);
887 }
888 
/** Roll forward clone file state till final state.

Starting from the state currently found on disk, the switch applies the
forward transitions CLONED -> CLONE_SAVED -> REPLACED -> NORMAL one after
another by falling through the cases. It stops at the first case whose
state equals final_state. Each transition is a single rename or remove
followed by an info log line. Any other on-disk state (for example NONE or
SAVED) is reported through ib::fatal.
@param[in]	data_file	data file name
@param[in]	final_state	data file state to forward to
@return file state found on disk before any transition was applied. */
static int file_roll_forward(std::string &data_file, int final_state) {
  auto cur_state = get_file_state(data_file);

  switch (cur_state) {
    case FILE_STATE_CLONED: {
      if (final_state == FILE_STATE_CLONED) {
        break;
      }
      /* Save data file */
      /* CLONED -> CLONE_SAVED: rename the data file to
      <data_file> + CLONE_INNODB_SAVED_FILE_EXTN. */
      std::string saved_file(data_file);
      saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
      rename_file(data_file, saved_file);
      /* Note: the logged state is the initial state, not the new one. */
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Forward: Save data file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_CLONE_SAVED: {
      if (final_state == FILE_STATE_CLONE_SAVED) {
        break;
      }
      /* Replace data file with cloned file. */
      /* CLONE_SAVED -> REPLACED: rename
      <data_file> + CLONE_INNODB_REPLACED_FILE_EXTN to the data file. */
      std::string cloned_file(data_file);
      cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
      rename_file(cloned_file, data_file);
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Forward: Rename clone to data file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_REPLACED: {
      if (final_state == FILE_STATE_REPLACED) {
        break;
      }
      /* Remove saved data file */
      /* REPLACED -> NORMAL: delete the saved copy of the old data file. */
      std::string saved_file(data_file);
      saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
      remove_file(saved_file);
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Forward: Remove saved data file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_NORMAL:
      /* Nothing to do. */
      break;

    default:
      /* NONE, SAVED or an unexpected flag combination. */
      ib::fatal(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Forward: Invalid File State: " << cur_state;
  }
  return (cur_state);
}
949 
/** Roll back clone file state to normal state.

Undoes the forward transitions in reverse order, starting from the state
found on disk: REPLACED -> CLONE_SAVED -> CLONED -> NORMAL. Every case
falls through to the next one, so rollback always continues down to
FILE_STATE_NORMAL. Any other on-disk state is reported through ib::fatal.
@param[in]	data_file	data file name */
static void file_rollback(std::string &data_file) {
  auto cur_state = get_file_state(data_file);

  switch (cur_state) {
    case FILE_STATE_REPLACED: {
      /* Replace data file back to cloned file. */
      /* REPLACED -> CLONE_SAVED: move the (cloned) data file back to
      <data_file> + CLONE_INNODB_REPLACED_FILE_EXTN. */
      std::string cloned_file(data_file);
      cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
      rename_file(data_file, cloned_file);
      /* Note: the logged state is the initial state, not the new one. */
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Back: Rename data to cloned file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_CLONE_SAVED: {
      /* Replace data file with saved file. */
      /* CLONE_SAVED -> CLONED: restore the saved original data file. */
      std::string saved_file(data_file);
      saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
      rename_file(saved_file, data_file);
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Back: Rename saved to data file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_CLONED: {
      /* Remove cloned data file. */
      /* CLONED -> NORMAL: discard the cloned copy. */
      std::string cloned_file(data_file);
      cloned_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
      remove_file(cloned_file);
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Back: Remove cloned file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_NORMAL:
      /* Nothing to do. */
      break;

    default:
      /* NONE, SAVED or an unexpected flag combination. */
      ib::fatal(ER_IB_CLONE_STATUS_FILE)
          << "Clone File Roll Back: Invalid File State: " << cur_state;
  }
}
998 
/* Clone old data file state transitions. These files are present only in
the recipient, and we have not dropped the database objects (table or
tablespace) that use them before clone. Currently this is used for
user-created undo tablespaces: dropping an undo tablespace could be
expensive because we need to wait for purge to finish.
  [FILE_STATE_NORMAL] --> [FILE_STATE_SAVED]
    Before recovery the old data file is saved in a file named <file_name>.save.

  [FILE_STATE_SAVED] --> [FILE_STATE_NONE]
    After successful recovery the saved data file is removed.

  These state transitions involve a single file delete or rename and
  we consider them atomic. In case of a failure the state rolls back.

  [FILE_STATE_SAVED] --> [FILE_STATE_NORMAL]
    On failure the saved data file is moved back to the original data file.
*/
1015 
/** Roll forward old data file state till final state.

Old files follow the chain NORMAL -> SAVED -> NONE, applied by falling
through the switch cases until final_state is reached. A file that is also
found in a cloned state (CLONED, CLONE_SAVED or REPLACED) is only logged
and skipped, because it is processed with the list of cloned files. Any
other unexpected state is reported through ib::fatal.
@param[in]	data_file	data file name
@param[in]	final_state	data file state to forward to (the caller in this
                                file passes FILE_STATE_SAVED or FILE_STATE_NONE) */
static void old_file_roll_forward(std::string &data_file, int final_state) {
  auto cur_state = get_file_state(data_file);

  switch (cur_state) {
    case FILE_STATE_CLONED:
    case FILE_STATE_CLONE_SAVED:
    case FILE_STATE_REPLACED:
      /* If the file is also cloned, we can skip here as it would be handled
      with other cloned files. */
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone Old File Roll Forward: Skipped cloned file " << data_file
          << " state: " << cur_state;
      break;
    case FILE_STATE_NORMAL: {
      /* Forwarding an old file to NORMAL is not expected (debug assert). */
      if (final_state == FILE_STATE_NORMAL) {
        ut_ad(false);
        break;
      }
      /* Save data file */
      /* NORMAL -> SAVED: rename to <data_file> + CLONE_INNODB_SAVED_FILE_EXTN. */
      std::string saved_file(data_file);
      saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
      rename_file(data_file, saved_file);
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone Old File Roll Forward: Saved data file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_SAVED: {
      if (final_state == FILE_STATE_SAVED) {
        break;
      }
      /* Remove saved data file */
      /* SAVED -> NONE: the old file is no longer needed. */
      std::string saved_file(data_file);
      saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
      remove_file(saved_file);
      ib::info(ER_IB_CLONE_STATUS_FILE)
          << "Clone Old File Roll Forward: Remove saved file " << data_file
          << " state: " << cur_state;
    }
      /* Fall through */

    case FILE_STATE_NONE:
      /* Nothing to do. */
      break;

    default:
      ib::fatal(ER_IB_CLONE_STATUS_FILE)
          << "Clone Old File Roll Forward: Invalid File State: " << cur_state;
  }
}
1070 
1071 /** Roll back old data file state to normal state.
1072 @param[in]	data_file	data file name */
old_file_rollback(std::string & data_file)1073 static void old_file_rollback(std::string &data_file) {
1074   auto cur_state = get_file_state(data_file);
1075 
1076   switch (cur_state) {
1077     case FILE_STATE_CLONED:
1078     case FILE_STATE_CLONE_SAVED:
1079     case FILE_STATE_REPLACED:
1080       /* If the file is also cloned, we can skip here as it would be handled
1081       with other cloned files. */
1082       ib::info(ER_IB_CLONE_STATUS_FILE)
1083           << "Clone Old File Roll Back: Skip cloned file " << data_file
1084           << " state: " << cur_state;
1085       break;
1086 
1087     case FILE_STATE_SAVED: {
1088       /* Replace data file with saved file. */
1089       std::string saved_file(data_file);
1090       saved_file.append(CLONE_INNODB_SAVED_FILE_EXTN);
1091       rename_file(saved_file, data_file);
1092       ib::info(ER_IB_CLONE_STATUS_FILE)
1093           << "Clone Old File Roll Back: Renamed saved data file " << data_file
1094           << " state: " << cur_state;
1095     }
1096       /* Fall through */
1097 
1098     case FILE_STATE_NORMAL:
1099     case FILE_STATE_NONE:
1100       /* Nothing to do. */
1101       break;
1102 
1103     default:
1104       ib::fatal(ER_IB_CLONE_STATUS_FILE)
1105           << "Clone Old File Roll Back: Invalid File State: " << cur_state;
1106   }
1107 }
1108 
1109 /** Fatal error callback function. Don't call other functions from here. Don't
1110 use ut_a, ut_ad asserts or ib::fatal to avoid recursive invocation. */
clone_files_fatal_error()1111 static void clone_files_fatal_error() {
1112   /* Safeguard to avoid recursive call. */
1113   static bool started_error_handling = false;
1114   if (started_error_handling) {
1115     return;
1116   }
1117   started_error_handling = true;
1118 
1119   std::ifstream err_file(CLONE_INNODB_ERROR_FILE);
1120   if (err_file.is_open()) {
1121     err_file.close();
1122   } else {
1123     /* Create error file if not there. */
1124     std::ofstream new_file(CLONE_INNODB_ERROR_FILE);
1125     /* On creation failure, return and abort. */
1126     if (!new_file.is_open()) {
1127       return;
1128     }
1129     new_file.close();
1130   }
1131   /* In case of fatal error, from ib::fatal and ut_a asserts
1132   we terminate the process here and send the exit status so that a
1133   managed server can be restarted with older data files. */
1134   std::_Exit(MYSQLD_RESTART_EXIT);
1135 }
1136 
/** Update recovery status file at end of clone recovery.

Called with finished == false to mark the start of clone recovery (see
clone_init_recovery_status()). When is_replace is true, that call also
installs clone_files_fatal_error() as the assert callback.
Called with finished == true to mark the end. The is_replace argument is
then ignored and the value remembered from the start call is used instead.
If recovery was in progress and CLONE_INNODB_RECOVERY_FILE exists, one of
the following is appended:
 - on error: a single "0" end-time line, and clone_recovery_error is set to
   the remembered replace flag;
 - on success: the end time, the binary log file name and the binary log
   offset read from the TRX_SYS header, as long as the binlog magic number
   matches and the stream stays good. clone_startup is then set to the
   replace flag.
@param[in]	finished	true if finishing clone recovery
@param[in]	is_error	if recovery error
@param[in]	is_replace	true, if replacing current directory */
static void clone_update_recovery_status(bool finished, bool is_error,
                                         bool is_replace) {
  /* true, when we are recovering a cloned database. */
  static bool recovery_in_progress = false;
  /* true, when replacing current data directory. */
  static bool recovery_replace = false;

  /* Empty unless assigned below. At the end of recovery the empty function
  is passed to ut_set_assert_callback(); presumably this removes the
  callback installed at the start. TODO confirm in ut_set_assert_callback. */
  std::function<void()> callback_function;

  /* Mark the beginning of clone recovery. */
  if (!finished) {
    recovery_in_progress = true;
    if (is_replace) {
      recovery_replace = true;
      callback_function = clone_files_fatal_error;
      ut_set_assert_callback(callback_function);
    }
    return;
  }
  /* The replace flag recorded at the start overrides the argument. */
  is_replace = recovery_replace;
  recovery_replace = false;

  /* Update status only if clone recovery in progress. */
  if (!recovery_in_progress) {
    return;
  }

  /* Mark end of clone recovery process. */
  recovery_in_progress = false;
  ut_set_assert_callback(callback_function);

  std::string file_name;

  file_name.assign(CLONE_INNODB_RECOVERY_FILE);
  if (!file_exists(file_name)) {
    return;
  }

  std::ofstream status_file;
  status_file.open(file_name, std::ofstream::app);
  if (!status_file.is_open()) {
    return;
  }

  /* Write zero for unsuccessful recovery. */
  uint64_t end_time = 0;
  if (is_error) {
    status_file << end_time << std::endl;
    status_file.close();
    /* Set recovery error so that server can restart only for replace. */
    clone_recovery_error = is_replace;
    return;
  }

  /* Write recovery end time */
  end_time = my_micro_time();
  status_file << end_time << std::endl;
  if (!status_file.good()) {
    status_file.close();
    return;
  }

  /* Read the binary log position stored in the TRX_SYS page under a
  mini-transaction. Every return path below commits it. */
  mtr_t mtr;
  mtr.start();
  byte *binlog_pos = trx_sysf_get(&mtr) + TRX_SYS_MYSQL_LOG_INFO;

  /* Check logfile magic number. */
  if (mach_read_from_4(binlog_pos + TRX_SYS_MYSQL_LOG_MAGIC_N_FLD) !=
      TRX_SYS_MYSQL_LOG_MAGIC_N) {
    mtr.commit();
    status_file.close();
    return;
  }
  /* Write binary log file name. */
  status_file << binlog_pos + TRX_SYS_MYSQL_LOG_NAME << std::endl;
  if (!status_file.good()) {
    mtr.commit();
    status_file.close();
    return;
  }

  /* The 64-bit offset is stored as two 32-bit halves. */
  auto high = mach_read_from_4(binlog_pos + TRX_SYS_MYSQL_LOG_OFFSET_HIGH);
  auto low = mach_read_from_4(binlog_pos + TRX_SYS_MYSQL_LOG_OFFSET_LOW);

  auto log_offset = static_cast<uint64_t>(high);
  log_offset = (log_offset << 32);
  log_offset |= static_cast<uint64_t>(low);

  /* Write log file offset. */
  status_file << log_offset << std::endl;

  mtr.commit();
  status_file.close();
  /* Set clone startup for GR, only during replace. */
  clone_startup = is_replace;
}
1237 
1238 /** Initialize recovery status for cloned recovery.
1239 @param[in]	replace		we are replacing current directory. */
clone_init_recovery_status(bool replace)1240 static void clone_init_recovery_status(bool replace) {
1241   std::string file_name;
1242   file_name.assign(CLONE_INNODB_RECOVERY_FILE);
1243 
1244   std::ofstream status_file;
1245   status_file.open(file_name, std::ofstream::out | std::ofstream::trunc);
1246   if (!status_file.is_open()) {
1247     return;
1248   }
1249   /* Write recovery begin time */
1250   uint64_t begin_time = my_micro_time();
1251   status_file << begin_time << std::endl;
1252   status_file.close();
1253   clone_update_recovery_status(false, false, replace);
1254 }
1255 
clone_update_gtid_status(std::string & gtids)1256 void clone_update_gtid_status(std::string &gtids) {
1257   /* Return if not clone database recovery. */
1258   std::string replace_files(CLONE_INNODB_REPLACED_FILES);
1259   if (!file_exists(replace_files)) {
1260     return;
1261   }
1262   /* Return if status file is not created. */
1263   std::string recovery_file(CLONE_INNODB_RECOVERY_FILE);
1264   if (!file_exists(recovery_file)) {
1265     ut_ad(false);
1266     return;
1267   }
1268   /* Open status file to append GTID. */
1269   std::ofstream status_file;
1270   status_file.open(recovery_file, std::ofstream::app);
1271   if (!status_file.is_open()) {
1272     return;
1273   }
1274   status_file << gtids << std::endl;
1275   status_file.close();
1276 
1277   /* Remove replace file after successful recovery and status update. */
1278   std::ifstream files;
1279   files.open(replace_files);
1280 
1281   if (files.is_open()) {
1282     /* If file is not empty, we are replacing data directory. */
1283     std::string file_name;
1284     if (std::getline(files, file_name)) {
1285       clone_startup = true;
1286     }
1287     files.close();
1288   }
1289   remove_file(replace_files);
1290 }
1291 
clone_files_error()1292 void clone_files_error() {
1293   /* Check if clone file directory exists. */
1294   if (!os_file_exists(CLONE_FILES_DIR)) {
1295     return;
1296   }
1297 
1298   std::string err_file(CLONE_INNODB_ERROR_FILE);
1299 
1300   /* Create error status file if not there. */
1301   if (!file_exists(err_file)) {
1302     create_file(err_file);
1303   }
1304 
1305   std::ifstream files;
1306   std::string data_file;
1307 
1308   /* Open old file to get all files to be moved. */
1309   files.open(CLONE_INNODB_OLD_FILES);
1310   if (files.is_open()) {
1311     /* Extract and process all files to be moved */
1312     while (std::getline(files, data_file)) {
1313       old_file_rollback(data_file);
1314     }
1315     files.close();
1316     std::string old_files(CLONE_INNODB_OLD_FILES);
1317     remove_file(old_files);
1318   }
1319 
1320   /* Open file to get all files to be replaced. */
1321   files.open(CLONE_INNODB_REPLACED_FILES);
1322   if (files.is_open()) {
1323     /* Extract and process all files to be replaced */
1324     while (std::getline(files, data_file)) {
1325       file_rollback(data_file);
1326     }
1327     files.close();
1328     std::string replace_files(CLONE_INNODB_REPLACED_FILES);
1329     remove_file(replace_files);
1330   }
1331 
1332   /* Open file to get all new files to delete. */
1333   files.open(CLONE_INNODB_NEW_FILES);
1334   if (files.is_open()) {
1335     /* Extract and process all files to be replaced */
1336     while (std::getline(files, data_file)) {
1337       remove_file(data_file);
1338     }
1339     files.close();
1340     std::string new_files(CLONE_INNODB_NEW_FILES);
1341     remove_file(new_files);
1342   }
1343 
1344   /* Remove error status file. */
1345   remove_file(err_file);
1346 
1347   /* Update recovery status file for recovery error. */
1348   clone_update_recovery_status(true, true, true);
1349 }
1350 
#ifdef UNIV_DEBUG
/** Debug-only crash point check for cloned database recovery.

If the database is cloned and the crash point marker file
(CLONE_INNODB_RECOVERY_CRASH_POINT) exists, the marker is consumed
(removed) and false is returned. Otherwise true is returned.
@param[in]	is_cloned_db	true if recovering a cloned database
@return false if the crash point marker was found and removed */
bool clone_check_recovery_crashpoint(bool is_cloned_db) {
  if (is_cloned_db) {
    std::string crash_file(CLONE_INNODB_RECOVERY_CRASH_POINT);
    if (file_exists(crash_file)) {
      remove_file(crash_file);
      return (false);
    }
  }
  return (true);
}
#endif
1365 
clone_files_recovery(bool finished)1366 void clone_files_recovery(bool finished) {
1367   /* Clone error file is present in case of error. */
1368   std::string file_name;
1369   file_name.assign(CLONE_INNODB_ERROR_FILE);
1370 
1371   if (file_exists(file_name)) {
1372     ut_ad(!finished);
1373     clone_files_error();
1374     return;
1375   }
1376 
1377   /* if replace file is not present, remove old file. */
1378   if (!finished) {
1379     std::string replace_files(CLONE_INNODB_REPLACED_FILES);
1380     std::string old_files(CLONE_INNODB_OLD_FILES);
1381     if (!file_exists(replace_files) && file_exists(old_files)) {
1382       ut_ad(false);
1383       remove_file(old_files);
1384     }
1385   }
1386 
1387   std::ifstream files;
1388 
1389   /* Open files to get all old files to be saved or removed. Must handle
1390   the old files before cloned files. This is because during old file
1391   processing we need to skip the common files based on cloned state. If
1392   the cloned state is reset then these files would be considered as old
1393   files and removed. */
1394   int end_state = finished ? FILE_STATE_NONE : FILE_STATE_SAVED;
1395   files.open(CLONE_INNODB_OLD_FILES);
1396   if (files.is_open()) {
1397     /* Extract and process all files to be saved or removed */
1398     while (std::getline(files, file_name)) {
1399       old_file_roll_forward(file_name, end_state);
1400     }
1401     files.close();
1402 
1403     /* Remove clone file after successful recovery. */
1404     if (finished) {
1405       std::string old_files(CLONE_INNODB_OLD_FILES);
1406       remove_file(old_files);
1407     }
1408   }
1409 
1410   /* Open file to get all files to be replaced. */
1411   end_state = finished ? FILE_STATE_NORMAL : FILE_STATE_REPLACED;
1412   files.open(CLONE_INNODB_REPLACED_FILES);
1413 
1414   if (files.is_open()) {
1415     int prev_state = FILE_STATE_NORMAL;
1416     /* If file is empty, it is not replace. */
1417     bool replace = false;
1418 
1419     /* Extract and process all files to be replaced */
1420     while (std::getline(files, file_name)) {
1421       replace = true;
1422       prev_state = file_roll_forward(file_name, end_state);
1423     }
1424 
1425     files.close();
1426 
1427     if (finished) {
1428       /* Update recovery status file at the end of clone recovery. We don't
1429       remove the replace file here. It would be removed only after updating
1430       GTID state. */
1431       clone_update_recovery_status(true, false, replace);
1432     } else {
1433       /* If previous state was normal, clone recovery is already done. */
1434       if (!replace || prev_state != FILE_STATE_NORMAL) {
1435         /* Clone database recovery is started. */
1436         clone_init_recovery_status(replace);
1437       }
1438     }
1439   }
1440 
1441   file_name.assign(CLONE_INNODB_NEW_FILES);
1442   auto exists = file_exists(file_name);
1443 
1444   if (exists && finished) {
1445     /* Remove clone file after successful recovery. */
1446     std::string new_files(CLONE_INNODB_NEW_FILES);
1447     remove_file(new_files);
1448   }
1449 }
1450 
clone_init()1451 dberr_t clone_init() {
1452   /* Check if incomplete cloned data directory */
1453   if (os_file_exists(CLONE_INNODB_IN_PROGRESS_FILE)) {
1454     return (DB_ABORT_INCOMPLETE_CLONE);
1455   }
1456 
1457   /* Initialize clone files before starting recovery. */
1458   clone_files_recovery(false);
1459 
1460   if (clone_sys == nullptr) {
1461     ut_ad(Clone_Sys::s_clone_sys_state == CLONE_SYS_INACTIVE);
1462     clone_sys = UT_NEW(Clone_Sys(), mem_key_clone);
1463   }
1464   Clone_Sys::s_clone_sys_state = CLONE_SYS_ACTIVE;
1465   Clone_handler::init_xa();
1466 
1467   return (DB_SUCCESS);
1468 }
1469 
clone_free()1470 void clone_free() {
1471   Clone_handler::uninit_xa();
1472   if (clone_sys != nullptr) {
1473     ut_ad(Clone_Sys::s_clone_sys_state == CLONE_SYS_ACTIVE);
1474 
1475     UT_DELETE(clone_sys);
1476     clone_sys = nullptr;
1477   }
1478 
1479   Clone_Sys::s_clone_sys_state = CLONE_SYS_INACTIVE;
1480 }
1481 
clone_mark_abort(bool force)1482 bool clone_mark_abort(bool force) {
1483   bool aborted;
1484 
1485   mutex_enter(clone_sys->get_mutex());
1486 
1487   aborted = clone_sys->mark_abort(force);
1488 
1489   mutex_exit(clone_sys->get_mutex());
1490 
1491   DEBUG_SYNC_C("clone_marked_abort2");
1492 
1493   return (aborted);
1494 }
1495 
clone_mark_active()1496 void clone_mark_active() {
1497   mutex_enter(clone_sys->get_mutex());
1498 
1499   clone_sys->mark_active();
1500 
1501   mutex_exit(clone_sys->get_mutex());
1502 }
1503 
clone_check_active()1504 bool clone_check_active() {
1505   mutex_enter(clone_sys->get_mutex());
1506   auto is_active = clone_sys->check_active_clone(false);
1507   mutex_exit(clone_sys->get_mutex());
1508 
1509   return (is_active || Clone_handler::is_provisioning());
1510 }
1511 
clone_mark_wait()1512 bool clone_mark_wait() {
1513   mutex_enter(clone_sys->get_mutex());
1514   auto success = clone_sys->mark_wait();
1515   mutex_exit(clone_sys->get_mutex());
1516   return (success);
1517 }
1518 
clone_mark_free()1519 void clone_mark_free() {
1520   mutex_enter(clone_sys->get_mutex());
1521   clone_sys->mark_free();
1522   mutex_exit(clone_sys->get_mutex());
1523 }
1524 
/** List of read-only pointers to DD objects of type Obj. */
template <typename Obj>
using DD_Objs = std::vector<const Obj *>;

/** Read-only iterator over a DD_Objs<Obj> list. */
template <typename Obj>
using DD_Objs_Iter = typename DD_Objs<Obj>::const_iterator;
1530 
/** Short alias for the DD dictionary client's Auto_releaser type. */
using Releaser = dd::cache::Dictionary_client::Auto_releaser;
1532 
1533 namespace {
1534 
/** Fix schema, table and tablespace. Used for two different purposes.
1. After recovery from a cloned database:
A. Create empty data files for non-InnoDB tables that are not cloned.
B. Create any schema directory that is not present.

2. Before cloning into the current data directory:
A. Drop all user tables.
B. Drop all user schemas.
C. Drop all user tablespaces.  */
1544 
1545 class Fixup_data {
1546  public:
  /** Constructor.
  @param[in]	concurrent	allow spawning multiple threads (see
                                set_num_tasks())
  @param[in]	is_drop		true to drop objects, false to truncate them */
  Fixup_data(bool concurrent, bool is_drop)
      : m_num_tasks(), m_concurrent(concurrent), m_drop(is_drop) {
    /* Start with no recorded errors. */
    m_num_errors.store(0);
  }
1554 
1555   /** Fix tables for which data is not cloned.
1556   @param[in,out]	thd		current	THD
1557   @param[in]		dd_objects	table/schema/tablespace from DD
1558   @return true if error */
1559   template <typename T>
fix(THD * thd,const DD_Objs<T> & dd_objects)1560   bool fix(THD *thd, const DD_Objs<T> &dd_objects) {
1561     set_num_tasks(dd_objects.size());
1562 
1563     using namespace std::placeholders;
1564     auto fixup_function =
1565         std::bind(&Fixup_data::fix_objects<T>, this, thd, _1, _2, _3);
1566 
1567     par_for(PFS_NOT_INSTRUMENTED, dd_objects, get_num_tasks(), fixup_function);
1568 
1569     return (failed());
1570   }
1571 
  /** Remove data cloned from configuration tables which are not relevant
  in recipient.
  @param[in,out]	thd	current THD
  @return true if error */
  bool fix_config_tables(THD *thd);

  /** Number of system configuration tables. It is currently zero, so
  s_config_tables is an empty array. */
  static const size_t S_NUM_CONFIG_TABLES = 0;

  /** Array of configuration tables (S_NUM_CONFIG_TABLES entries). */
  static const std::array<const char *, S_NUM_CONFIG_TABLES> s_config_tables;

 private:
  /** Check and fix specific DD object.
  @param[in,out]	thd		current THD
  @param[in]		object		DD object
  @param[in]		thread_number	current thread number.
  @return NOTE(review): presumably true on error, like fix() and
  fix_config_tables(); confirm against the definition. */
  template <typename T>
  bool fix_one_object(THD *thd, const T *object, size_t thread_number);

  /** Check and fix a range of DD objects. fix() passes this function to
  par_for() as the per-slice worker.
  @param[in,out]	thd		current THD
  @param[in]		begin		first element in current slice
  @param[in]		end		end of current slice
  @param[in]		thread_number	current thread number. */
  template <typename T>
  void fix_objects(THD *thd, const DD_Objs_Iter<T> &begin,
                   const DD_Objs_Iter<T> &end, size_t thread_number);
1600 
1601   /** @return number of tasks. */
get_num_tasks() const1602   size_t get_num_tasks() const { return (m_num_tasks); }
1603 
1604   /** Calculate and set number of new tasks to spawn.
1605   @param[in]	num_entries	number of entries to handle */
set_num_tasks(size_t num_entries)1606   void set_num_tasks(size_t num_entries) {
1607     /* Check if we are allowed to spawn multiple threads. Disable
1608     multithreading while dropping objects for now. We need more
1609     work to handle and pass interrupt signal to workers. */
1610     if (is_drop() || !allow_concurrent()) {
1611       m_num_tasks = 0;
1612       return;
1613     }
1614     /* Have one task for every 100 entries. */
1615     m_num_tasks = num_entries / 100;
1616 
1617 #ifdef UNIV_DEBUG
1618     /* Test operation in newly spawned thread. */
1619     if (m_num_tasks == 0) {
1620       ++m_num_tasks;
1621     }
1622 #endif /* UNIV_DEBUG */
1623 
1624     /* Don't go beyond 8 threads for now. */
1625     if (m_num_tasks > 8) {
1626       m_num_tasks = 8;
1627     }
1628     m_num_errors.store(0);
1629   }
1630 
1631   /** @return true, if current operation is drop. */
is_drop() const1632   bool is_drop() const { return (m_drop); }
1633 
1634   /** @return true, if concurrency is allowed. */
allow_concurrent() const1635   bool allow_concurrent() const { return (m_concurrent); }
1636 
1637   /** Get the table operation string.
1638   @return sql key word for the operation. */
sql_operation()1639   const char *sql_operation() {
1640     if (is_drop()) {
1641       return ("DROP");
1642     }
1643     /* Alternative action is truncate. */
1644     return ("TRUNCATE");
1645   }
1646 
1647   /** Check if the current SE type should be skipped.
1648   @param[in]	type	SE type
1649   @return true iff the SE needs to be skipped. */
skip_se_tables(enum legacy_db_type type)1650   bool skip_se_tables(enum legacy_db_type type) {
1651     /* Don't skip any specific DB during drop operation. All existing
1652     user tables are dropped before cloning a remote database. */
1653     if (is_drop()) {
1654       return (false);
1655     }
1656     /* Truncate only MyISAM and CSV tables. After clone we need to create
1657     empty tables for engines that are not cloned. */
1658     if (type == DB_TYPE_MYISAM || type == DB_TYPE_CSV_DB) {
1659       return (false);
1660     }
1661     return (true);
1662   }
1663 
1664   /** Check if the schema is performance schema.
1665   @param[in]	schema_name	schema name
1666   @return true iff performance schema. */
is_performance_schema(const char * schema_name) const1667   bool is_performance_schema(const char *schema_name) const {
1668     return (0 == strcmp(schema_name, PERFORMANCE_SCHEMA_DB_NAME.str));
1669   }
1670 
1671   /** Check if the current schema is system schema
1672   @param[in]	schema_name	schema name
1673   @return true iff system schema. */
is_system_schema(const char * schema_name) const1674   bool is_system_schema(const char *schema_name) const {
1675     if (0 == strcmp(schema_name, MYSQL_SCHEMA_NAME.str) ||
1676         0 == strcmp(schema_name, "sys") ||
1677         0 == strcmp(schema_name, PERFORMANCE_SCHEMA_DB_NAME.str) ||
1678         0 == strcmp(schema_name, INFORMATION_SCHEMA_NAME.str)) {
1679       return (true);
1680     }
1681     return (false);
1682   }
1683 
1684   /** Check if the current schema tables needs to be skipped.
1685   @param[in]	table		DD table
1686   @param[in]	table_name	table name
1687   @param[in]	schema_name	schema name
1688   @return true iff table needs to be skipped. */
skip_schema_tables(const dd::Table * table,const char * table_name,const char * schema_name)1689   bool skip_schema_tables(const dd::Table *table, const char *table_name,
1690                           const char *schema_name) {
1691     /* Skip specific tables only during drop. */
1692     if (!is_drop()) {
1693       return (false);
1694     }
1695 
1696     /* Handle only visible base tables. */
1697     if (table->type() != dd::enum_table_type::BASE_TABLE ||
1698         table->hidden() != dd::Abstract_table::HT_VISIBLE) {
1699       return (true);
1700     }
1701 
1702     /* Don't Skip tables in non-system schemas. */
1703     if (!is_system_schema(schema_name)) {
1704       return (false);
1705     }
1706 
1707     /* Skip DD system tables. */
1708     if (table->is_explicit_tablespace() &&
1709         table->tablespace_id() == dd::Dictionary_impl::dd_tablespace_id()) {
1710       return (true);
1711     }
1712 
1713     /* Skip all in information_schema and performance_schema tables. */
1714     if (0 == strcmp(schema_name, PERFORMANCE_SCHEMA_DB_NAME.str) ||
1715         0 == strcmp(schema_name, INFORMATION_SCHEMA_NAME.str)) {
1716       return (true);
1717     }
1718 
1719     /* Skip specific tables in mysql schema. */
1720     if (0 == strcmp(schema_name, MYSQL_SCHEMA_NAME.str) &&
1721         (0 == strcmp(table_name, GENERAL_LOG_NAME.str) ||
1722          0 == strcmp(table_name, SLOW_LOG_NAME.str))) {
1723       return (true);
1724     }
1725 
1726     /* Skip specific tables in sys schema. */
1727     if (0 == strcmp(schema_name, "sys") &&
1728         0 == strcmp(table_name, "sys_config")) {
1729       return (true);
1730     }
1731 
1732     return (false);
1733   }
1734 
1735   /** Check if the current schema needs to be skipped.
1736   @param[in]	schema_name	schema name
1737   @return true iff schema needs to be skipped. */
skip_schema(const char * schema_name)1738   bool skip_schema(const char *schema_name) {
1739     /* Don't drop system schema. */
1740     if (is_drop()) {
1741       return (is_system_schema(schema_name));
1742     }
1743     /* Information schema has no directory */
1744     if (0 == strcmp(schema_name, INFORMATION_SCHEMA_NAME.str)) {
1745       return (true);
1746     }
1747     return (false);
1748   }
1749 
1750   /** Check if the current tablespace needs to be skipped.
1751   @param[in,out]	thd		current	THD
1752   @param[in]		dd_space	dd tablespace
1753   @return true iff tablespace needs to be skipped. */
skip_tablespace(THD * thd,const dd::Tablespace * dd_space)1754   bool skip_tablespace(THD *thd, const dd::Tablespace *dd_space) {
1755     /* System tablespaces are in Innodb. Skip other engines. */
1756     auto se =
1757         ha_resolve_by_name_raw(thd, lex_cstring_handle(dd_space->engine()));
1758     auto se_type = ha_legacy_type(se ? plugin_data<handlerton *>(se) : nullptr);
1759     plugin_unlock(thd, se);
1760     if (se_type != DB_TYPE_INNODB) {
1761       return (false);
1762     }
1763 
1764     /* Skip system tablespace by name. */
1765     const auto space_name = dd_space->name().c_str();
1766     const char *innodb_prefix = "innodb_";
1767     const char *sys_prefix = "sys/";
1768     if (0 == strcmp(space_name, "mysql") ||
1769         0 == strncmp(space_name, sys_prefix, strlen(sys_prefix)) ||
1770         0 == strncmp(space_name, innodb_prefix, strlen(innodb_prefix))) {
1771       return (true);
1772     }
1773 
1774     /* Skip undo tablespaces. */
1775     auto &se_data = dd_space->se_private_data();
1776     space_id_t space_id = SPACE_UNKNOWN;
1777 
1778     if (se_data.get(dd_space_key_strings[DD_SPACE_ID], &space_id) ||
1779         space_id == SPACE_UNKNOWN) {
1780       ut_ad(false);
1781       return (false);
1782     }
1783     bool is_undo = fsp_is_undo_tablespace(space_id);
1784 
1785     /* Add skipped undo tablespace files to list of old files to remove. */
1786     if (is_undo && !allow_concurrent()) {
1787       auto dd_file = *(dd_space->files().begin());
1788       clone_add_to_list_file(CLONE_INNODB_OLD_FILES,
1789                              dd_file->filename().c_str());
1790       /* In rare case, the undo might be kept halfway truncated due to some
1791       error during truncate. Check and add truncate log file as old file if
1792       present. */
1793       undo::Tablespace undo_space(space_id);
1794       const char *log_file_name = undo_space.log_file_name();
1795 
1796       if (os_file_exists(log_file_name)) {
1797         clone_add_to_list_file(CLONE_INNODB_OLD_FILES, log_file_name);
1798       }
1799     }
1800 
1801     /* Skip all undo tablespaces. */
1802     if (is_undo) {
1803       return (true);
1804     }
1805 
1806     /* Check and skip file per table tablespace. */
1807     uint32_t flags = 0;
1808     if (se_data.get(dd_space_key_strings[DD_SPACE_FLAGS], &flags)) {
1809       ut_ad(false);
1810       return (false);
1811     }
1812 
1813     if (fsp_is_file_per_table(space_id, flags)) {
1814       return (true);
1815     }
1816     return (false);
1817   }
1818 
  /** Form and execute sql command.
  The statement is chosen from the non-null names:
  - DROP TABLESPACE when tablespace_name is given;
  - otherwise a table-level statement (the operation comes from
    sql_operation()) on schema_name.table_name when table_name is given;
  - otherwise DROP SCHEMA on schema_name.
  Failures are counted in m_num_errors, so failed() reflects them.
  @param[in,out]	thd		current	THD
  @param[in]		schema_name	schema name
  @param[in]		table_name	table name
  @param[in]		tablespace_name	tablespace name
  @param[in]		thread_number	current thread number.
  @return true if executing the statement reported an error. */
  bool execute_sql(THD *thd, const char *schema_name, const char *table_name,
                   const char *tablespace_name, size_t thread_number);
1827 
1828   /** @return true, if any thread has failed. */
failed() const1829   bool failed() const { return (m_num_errors.load() != 0); }
1830 
 private:
  /** Number of errors recorded by all tasks. Atomic because worker threads
  spawned for fix_objects() increment it while other tasks read it. */
  std::atomic_size_t m_num_errors;

  /** Number of tasks. NOTE(review): presumably what get_num_tasks() returns;
  the task whose number equals it reuses the caller's THD in fix_objects(). */
  size_t m_num_tasks;

  /** Allow concurrent threads. Presumably backs allow_concurrent(). */
  bool m_concurrent;

  /** If the objects need to be dropped. Presumably backs is_drop(). */
  bool m_drop;
1843 };
1844 
/** All configuration tables for which data should not be cloned. From
replication configurations only clone slave_master_info table needed by GR.
Each entry is used as a table name in the "mysql" schema when
Fixup_data::fix_config_tables() calls execute_sql().
NOTE(review): the initializer below is empty, so if S_NUM_CONFIG_TABLES > 0
every entry is value-initialized to nullptr. A nullptr table name makes
execute_sql() take its schema branch (a DROP SCHEMA statement for "mysql")
instead of a table operation. Confirm the intended table names are listed
here. */
const std::array<const char *, Fixup_data::S_NUM_CONFIG_TABLES>
    Fixup_data::s_config_tables = {};
1849 
fix_config_tables(THD * thd)1850 bool Fixup_data::fix_config_tables(THD *thd) {
1851   /* No privilege check needed for individual tables. */
1852   auto saved_sctx = thd->security_context();
1853   Security_context sctx(*saved_sctx);
1854   skip_grants(thd, sctx);
1855   thd->set_security_context(&sctx);
1856 
1857   /* Disable binary logging. */
1858   char sql_stmt[FN_LEN + FN_LEN + 64];
1859   snprintf(sql_stmt, sizeof(sql_stmt), "SET SQL_LOG_BIN = OFF");
1860   static_cast<void>(clone_execute_query(thd, &sql_stmt[0], 1, false));
1861 
1862   /* Loop through all objects and fix. */
1863   bool ret = false;
1864   for (auto table : s_config_tables) {
1865     ret = execute_sql(thd, "mysql", table, nullptr, 1);
1866     if (ret) break;
1867   }
1868   /* Set back old security context. */
1869   thd->set_security_context(saved_sctx);
1870   return (ret);
1871 }
1872 
1873 template <typename T>
fix_objects(THD * thd,const DD_Objs_Iter<T> & begin,const DD_Objs_Iter<T> & end,size_t thread_number)1874 void Fixup_data::fix_objects(THD *thd, const DD_Objs_Iter<T> &begin,
1875                              const DD_Objs_Iter<T> &end, size_t thread_number) {
1876   ib::info(ER_IB_CLONE_SQL) << "Clone: Fix Object count: " << (end - begin)
1877                             << " task: " << thread_number;
1878 
1879   bool thread_created = false;
1880 
1881   /* For newly spawned threads, create server THD */
1882   if (thread_number != get_num_tasks()) {
1883     thd = create_thd(false, true, true, PSI_NOT_INSTRUMENTED);
1884     thread_created = true;
1885   }
1886 
1887   /* Save system thread type to be safe. */
1888   auto saved_thd_system = thd->system_thread;
1889 
1890   /* No privilege check needed for individual tables. */
1891   auto saved_sctx = thd->security_context();
1892   Security_context sctx(*saved_sctx);
1893   skip_grants(thd, sctx);
1894   thd->set_security_context(&sctx);
1895 
1896   char sql_stmt[FN_LEN + FN_LEN + 64];
1897 
1898   /* Disable binary logging. */
1899   snprintf(sql_stmt, sizeof(sql_stmt), "SET SQL_LOG_BIN = OFF");
1900   if (clone_execute_query(thd, &sql_stmt[0], thread_number, false)) {
1901     ++m_num_errors;
1902   }
1903 
1904   /* Disable foreign key check. */
1905   snprintf(sql_stmt, sizeof(sql_stmt), "SET FOREIGN_KEY_CHECKS=0");
1906   if (clone_execute_query(thd, &sql_stmt[0], thread_number, false)) {
1907     ++m_num_errors;
1908   }
1909 
1910   if (thread_created) {
1911     /* For concurrent worker threads set timeout for MDL lock. */
1912     snprintf(sql_stmt, sizeof(sql_stmt), "SET LOCAL LOCK_WAIT_TIMEOUT=1");
1913     if (clone_execute_query(thd, &sql_stmt[0], thread_number, false)) {
1914       ++m_num_errors;
1915     }
1916   }
1917 
1918   /* Loop through all objects and fix. */
1919   for (auto it = begin; it != end && m_num_errors == 0; ++it) {
1920     if (fix_one_object<T>(thd, *it, thread_number)) {
1921       break;
1922     }
1923   }
1924 
1925   /* Set back old security context. */
1926   thd->set_security_context(saved_sctx);
1927   thd->system_thread = saved_thd_system;
1928 
1929   /* Destroy thread if newly spawned task */
1930   if (thread_created) {
1931     destroy_thd(thd);
1932   }
1933 }
1934 
1935 template <>
fix_one_object(THD * thd,const dd::Table * table,size_t thread_number)1936 bool Fixup_data::fix_one_object(THD *thd, const dd::Table *table,
1937                                 size_t thread_number) {
1938   auto se = ha_resolve_by_name_raw(thd, lex_cstring_handle(table->engine()));
1939   auto se_type = ha_legacy_type(se ? plugin_data<handlerton *>(se) : nullptr);
1940 
1941   plugin_unlock(thd, se);
1942 
1943   if (skip_se_tables(se_type)) {
1944     return (false);
1945   }
1946 
1947   auto dc = dd::get_dd_client(thd);
1948   Releaser releaser(dc);
1949 
1950   const dd::Schema *table_schema = nullptr;
1951 
1952   auto saved_thread_type = thd->system_thread;
1953   thd->system_thread = SYSTEM_THREAD_DD_INITIALIZE;
1954 
1955   if (dc->acquire(table->schema_id(), &table_schema)) {
1956     ++m_num_errors;
1957     thd->system_thread = saved_thread_type;
1958     return (true);
1959   }
1960 
1961   const auto schema_name = table_schema->name().c_str();
1962   const auto table_name = table->name().c_str();
1963 
1964   /* For performance schema drop the SDI table. */
1965   if (is_drop() && is_performance_schema(schema_name)) {
1966     dd::sdi::drop(thd, table);
1967   }
1968   thd->system_thread = saved_thread_type;
1969 
1970   if (skip_schema_tables(table, table_name, schema_name)) {
1971     return (false);
1972   }
1973 
1974   /* Throw warning for MyIsam and CSV tables for which data is
1975   not cloned. These tables would be empty after clone. */
1976   if (!is_drop() && !is_system_schema(schema_name)) {
1977     ib::warn(ER_IB_CLONE_NON_INNODB_TABLE, schema_name, table_name);
1978   }
1979 
1980   auto ret_val =
1981       execute_sql(thd, schema_name, table_name, nullptr, thread_number);
1982   return (ret_val);
1983 }
1984 
execute_sql(THD * thd,const char * schema_name,const char * table_name,const char * tablespace_name,size_t thread_number)1985 bool Fixup_data::execute_sql(THD *thd, const char *schema_name,
1986                              const char *table_name,
1987                              const char *tablespace_name,
1988                              size_t thread_number) {
1989   char sql_stmt[FN_LEN + FN_LEN + 64];
1990 
1991   if (tablespace_name != nullptr) {
1992     /* TABLESPACE operation */
1993     snprintf(sql_stmt, sizeof(sql_stmt), "DROP TABLESPACE `%s`",
1994              tablespace_name);
1995 
1996   } else if (table_name != nullptr) {
1997     /* TABLE operation */
1998     snprintf(sql_stmt, sizeof(sql_stmt), "%s TABLE `%s`.`%s`", sql_operation(),
1999              schema_name, table_name);
2000   } else {
2001     /* SCHEMA operation */
2002     snprintf(sql_stmt, sizeof(sql_stmt), "DROP SCHEMA `%s`", schema_name);
2003   }
2004 
2005   auto saved_thread_type = thd->system_thread;
2006   if (!is_drop()) {
2007     /* No MDL locks during initialization phase. */
2008     thd->system_thread = SYSTEM_THREAD_DD_INITIALIZE;
2009   }
2010 
2011   /* Skip error while attempting drop concurrently using multiple workers.
2012   We will handle the skipped objects later in in main thread.*/
2013   bool skip_error = is_drop() && allow_concurrent();
2014 
2015   auto ret_val =
2016       clone_execute_query(thd, &sql_stmt[0], thread_number, skip_error);
2017   if (ret_val) {
2018     ++m_num_errors;
2019   }
2020 
2021   thd->system_thread = saved_thread_type;
2022 
2023   if (is_drop() && !ret_val && !thd->check_clone_vio()) {
2024     auto err = ER_QUERY_INTERRUPTED;
2025     my_error(ER_QUERY_INTERRUPTED, MYF(0));
2026     ++m_num_errors;
2027 
2028     auto da = thd->get_stmt_da();
2029     ib::info(ER_IB_CLONE_SQL)
2030         << "Clone: Failed to " << sql_stmt << " task: " << thread_number
2031         << " code: " << err << ": "
2032         << ((da == nullptr || !da->is_error()) ? "" : da->message_text());
2033   }
2034   return (ret_val);
2035 }
2036 
2037 template <>
fix_one_object(THD * thd,const dd::Schema * schema,size_t thread_number)2038 bool Fixup_data::fix_one_object(THD *thd, const dd::Schema *schema,
2039                                 size_t thread_number) {
2040   const auto schema_name = schema->name().c_str();
2041 
2042   if (skip_schema(schema_name)) {
2043     return (false);
2044   }
2045 
2046   if (is_drop()) {
2047     auto ret_val =
2048         execute_sql(thd, schema_name, nullptr, nullptr, thread_number);
2049     return (ret_val);
2050   }
2051 
2052   /* Convert schema name to directory name to handle special characters. */
2053   char schema_dir[FN_REFLEN];
2054   static_cast<void>(
2055       tablename_to_filename(schema_name, schema_dir, sizeof(schema_dir)));
2056   MY_STAT stat_info;
2057   if (mysql_file_stat(key_file_misc, schema_dir, &stat_info, MYF(0)) !=
2058       nullptr) {
2059     /* Schema directory exists */
2060     return (false);
2061   }
2062 
2063   if (my_mkdir(schema_dir, 0777, MYF(0)) < 0) {
2064     ib::error(ER_IB_CLONE_INTERNAL)
2065         << "Clone: Failed to create schema directory: " << schema_name
2066         << " task: " << thread_number;
2067     ++m_num_errors;
2068     return (true);
2069   }
2070 
2071   ib::info(ER_IB_CLONE_SQL)
2072       << "Clone: Fixed Schema: " << schema_name << " task: " << thread_number;
2073   return (false);
2074 }
2075 
2076 template <>
fix_one_object(THD * thd,const dd::Tablespace * tablespace,size_t thread_number)2077 bool Fixup_data::fix_one_object(THD *thd, const dd::Tablespace *tablespace,
2078                                 size_t thread_number) {
2079   ut_ad(is_drop());
2080 
2081   if (skip_tablespace(thd, tablespace)) {
2082     return (false);
2083   }
2084 
2085   const auto tablespace_name = tablespace->name().c_str();
2086 
2087   auto ret_val =
2088       execute_sql(thd, nullptr, nullptr, tablespace_name, thread_number);
2089   return (ret_val);
2090 }
2091 } /* namespace */
2092 
fix_cloned_tables(THD * thd)2093 bool fix_cloned_tables(THD *thd) {
2094   std::string fixup_file(CLONE_INNODB_FIXUP_FILE);
2095 
2096   /* Check if table fix up is needed. */
2097   if (!file_exists(fixup_file)) {
2098     return (false);
2099   }
2100 
2101   auto dc = dd::get_dd_client(thd);
2102   Releaser releaser(dc);
2103 
2104   Fixup_data clone_fixup(true, false);
2105 
2106   ib::info(ER_IB_CLONE_SQL) << "Clone Fixup: check and create schema directory";
2107   DD_Objs<dd::Schema> schemas;
2108 
2109   if (dc->fetch_global_components(&schemas) || clone_fixup.fix(thd, schemas)) {
2110     return (true);
2111   }
2112 
2113   ib::info(ER_IB_CLONE_SQL)
2114       << "Clone Fixup: create empty MyIsam and CSV tables";
2115   DD_Objs<dd::Table> tables;
2116 
2117   if (dc->fetch_global_components(&tables) || clone_fixup.fix(thd, tables)) {
2118     return (true);
2119   }
2120 
2121   ib::info(ER_IB_CLONE_SQL) << "Clone Fixup: replication configuration tables";
2122   if (clone_fixup.fix_config_tables(thd)) {
2123     return (true);
2124   }
2125 
2126   ib::info(ER_IB_CLONE_SQL) << "Clone Fixup: finished successfully";
2127   remove_file(fixup_file);
2128   return (false);
2129 }
2130 
clone_execute_query(THD * thd,const char * sql_stmt,size_t thread_number,bool skip_error)2131 static bool clone_execute_query(THD *thd, const char *sql_stmt,
2132                                 size_t thread_number, bool skip_error) {
2133   thd->set_query_id(next_query_id());
2134 
2135   /* We use the code from dd::excute_query here to capture the error. */
2136   Ed_connection con(thd);
2137   std::string query(sql_stmt);
2138 
2139   LEX_STRING str;
2140   lex_string_strmake(thd->mem_root, &str, query.c_str(), query.length());
2141 
2142   auto saved_thd_system = thd->system_thread;
2143   /* For visibility in SHOW PROCESS LIST during execute direct. */
2144   if (thd->system_thread == NON_SYSTEM_THREAD) {
2145     thd->system_thread = SYSTEM_THREAD_BACKGROUND;
2146   }
2147 
2148   if (con.execute_direct(str)) {
2149     thd->system_thread = saved_thd_system;
2150     auto sql_errno = con.get_last_errno();
2151     const char *sql_state = mysql_errno_to_sqlstate(sql_errno);
2152     const char *sql_errmsg = con.get_last_error();
2153 
2154     /* Skip error, if asked. Don't skip query interruption request. */
2155     if (skip_error && sql_errno != ER_QUERY_INTERRUPTED) {
2156       ib::info(ER_IB_CLONE_SQL)
2157           << "Clone: Skipped " << sql_stmt << " task: " << thread_number
2158           << " Reason = " << sql_errno << ": " << sql_errmsg;
2159       return (false);
2160     }
2161 
2162     ib::info(ER_IB_CLONE_SQL)
2163         << "Clone: Failed to " << sql_stmt << " task: " << thread_number
2164         << " code: " << sql_errno << ": " << sql_errmsg;
2165 
2166     /* Update the error to THD. */
2167     auto da = thd->get_stmt_da();
2168     if (da != nullptr) {
2169       da->set_overwrite_status(true);
2170       da->set_error_status(sql_errno, sql_errmsg, sql_state);
2171       da->push_warning(thd, sql_errno, sql_state, Sql_condition::SL_ERROR,
2172                        sql_errmsg);
2173       da->set_overwrite_status(false);
2174     }
2175     return (true);
2176   }
2177 
2178   thd->system_thread = saved_thd_system;
2179   return (false);
2180 }
2181 
clone_drop_binary_logs(THD * thd)2182 static int clone_drop_binary_logs(THD *thd) {
2183   int err = 0;
2184   /* No privilege check needed for individual tables. */
2185   auto saved_sctx = thd->security_context();
2186   Security_context sctx(*saved_sctx);
2187   skip_grants(thd, sctx);
2188   thd->set_security_context(&sctx);
2189 
2190   /* 1. Attempt to stop slaves if any. */
2191   char sql_stmt[FN_LEN + FN_LEN + 64];
2192   snprintf(sql_stmt, sizeof(sql_stmt), "STOP SLAVE");
2193 
2194   channel_map.rdlock();
2195   auto is_slave = is_slave_configured();
2196   channel_map.unlock();
2197 
2198   if (is_slave && clone_execute_query(thd, &sql_stmt[0], 1, false)) {
2199     err = ER_INTERNAL_ERROR;
2200     my_error(err, MYF(0), "Clone failed to stop slave");
2201   }
2202 
2203   if (err == 0) {
2204     /* Clear warnings if any. */
2205     thd->clear_error();
2206 
2207     /* 2. Clear all binary logs and GTID. */
2208     snprintf(sql_stmt, sizeof(sql_stmt), "RESET MASTER");
2209 
2210     if (clone_execute_query(thd, &sql_stmt[0], 1, false)) {
2211       err = ER_INTERNAL_ERROR;
2212       my_error(err, MYF(0), "Clone failed to reset binary logs");
2213     }
2214   }
2215 
2216   /* Set back old security context. */
2217   thd->set_security_context(saved_sctx);
2218   return (err);
2219 }
2220 
clone_drop_user_data(THD * thd,bool allow_threads)2221 static int clone_drop_user_data(THD *thd, bool allow_threads) {
2222   ib::warn(ER_IB_CLONE_USER_DATA, "Started");
2223   Clone_handler::set_drop_data();
2224 
2225   auto dc = dd::get_dd_client(thd);
2226   Releaser releaser(dc);
2227   Fixup_data clone_fixup(allow_threads, true);
2228 
2229   ib::info(ER_IB_CLONE_SQL) << "Clone Drop all user data";
2230   DD_Objs<dd::Table> tables;
2231 
2232   if (dc->fetch_global_components(&tables) || clone_fixup.fix(thd, tables)) {
2233     ib::info(ER_IB_CLONE_SQL) << "Clone failed to drop all user tables";
2234     my_error(ER_INTERNAL_ERROR, MYF(0), "Clone failed to drop all user tables");
2235 
2236     /* Get the first error reported. */
2237     auto da = thd->get_stmt_da();
2238     return (da->mysql_errno());
2239   }
2240 
2241   ib::info(ER_IB_CLONE_SQL) << "Clone Drop User schemas";
2242   DD_Objs<dd::Schema> schemas;
2243 
2244   if (dc->fetch_global_components(&schemas) || clone_fixup.fix(thd, schemas)) {
2245     ib::info(ER_IB_CLONE_SQL) << "Clone failed to drop all user schemas";
2246     my_error(ER_INTERNAL_ERROR, MYF(0),
2247              "Clone failed to drop all user schemas");
2248 
2249     /* Get the first error reported. */
2250     auto da = thd->get_stmt_da();
2251     return (da->mysql_errno());
2252   }
2253 
2254   ib::info(ER_IB_CLONE_SQL) << "Clone Drop User tablespaces";
2255   DD_Objs<dd::Tablespace> tablesps;
2256 
2257   if (dc->fetch_global_components(&tablesps) ||
2258       clone_fixup.fix(thd, tablesps)) {
2259     ib::info(ER_IB_CLONE_SQL) << "Clone failed to drop all user tablespaces";
2260     my_error(ER_INTERNAL_ERROR, MYF(0),
2261              "Clone failed to drop all user tablespaces");
2262 
2263     /* Get the first error reported. */
2264     auto da = thd->get_stmt_da();
2265     return (da->mysql_errno());
2266   }
2267 
2268   /* Clean binary logs after removing all user data. */
2269   if (!allow_threads) {
2270     auto err = clone_drop_binary_logs(thd);
2271     if (err != 0) {
2272       return (err);
2273     }
2274   }
2275   ib::info(ER_IB_CLONE_SQL) << "Clone Drop: finished successfully";
2276   ib::warn(ER_IB_CLONE_USER_DATA, "Finished");
2277   return (0);
2278 }
2279