1 /***************************************************************************** 2 3 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved. 4 5 This program is free software; you can redistribute it and/or modify it under 6 the terms of the GNU General Public License, version 2.0, as published by the 7 Free Software Foundation. 8 9 This program is also distributed with certain software (including but not 10 limited to OpenSSL) that is licensed under separate terms, as designated in a 11 particular file or component or in included license documentation. The authors 12 of MySQL hereby grant you an additional permission to link the program and 13 your derivative works with the separately licensed software that they have 14 included with MySQL. 15 16 This program is distributed in the hope that it will be useful, but WITHOUT 17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, 19 for more details. 20 21 You should have received a copy of the GNU General Public License along with 22 this program; if not, write to the Free Software Foundation, Inc., 23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 24 25 *****************************************************************************/ 26 27 /** @file include/clone0clone.h 28 Innodb Clone System 29 30 *******************************************************/ 31 32 #ifndef CLONE_CLONE_INCLUDE 33 #define CLONE_CLONE_INCLUDE 34 35 #include <chrono> 36 #include "db0err.h" 37 #include "mysql/plugin.h" // thd_killed() 38 #include "sql/handler.h" 39 #include "univ.i" 40 #include "ut0mutex.h" 41 42 #include "clone0desc.h" 43 #include "clone0repl.h" 44 #include "clone0snapshot.h" 45 46 /** Directory under data directory for all clone status files. */ 47 #define CLONE_FILES_DIR OS_FILE_PREFIX "clone" OS_PATH_SEPARATOR_STR 48 49 /** Clone in progress file name length. */ 50 const size_t CLONE_INNODB_FILE_LEN = 64; 51 52 #ifdef UNIV_DEBUG 53 /** Clone simulate recovery error file name. */ 54 const char CLONE_INNODB_RECOVERY_CRASH_POINT[] = 55 CLONE_FILES_DIR OS_FILE_PREFIX "status_crash_point"; 56 #endif 57 58 /** Clone in progress file name. */ 59 const char CLONE_INNODB_IN_PROGRESS_FILE[] = 60 CLONE_FILES_DIR OS_FILE_PREFIX "status_in_progress"; 61 62 /** Clone error file name. */ 63 const char CLONE_INNODB_ERROR_FILE[] = 64 CLONE_FILES_DIR OS_FILE_PREFIX "status_error"; 65 66 /** Clone fix up file name. Present when clone needs table fix up. */ 67 const char CLONE_INNODB_FIXUP_FILE[] = 68 CLONE_FILES_DIR OS_FILE_PREFIX "status_fix"; 69 70 /** Clone recovery status. */ 71 const char CLONE_INNODB_RECOVERY_FILE[] = 72 CLONE_FILES_DIR OS_FILE_PREFIX "status_recovery"; 73 74 /** Clone file name for list of files cloned in place. */ 75 const char CLONE_INNODB_NEW_FILES[] = 76 CLONE_FILES_DIR OS_FILE_PREFIX "new_files"; 77 78 /** Clone file name for list of files to be replaced. */ 79 const char CLONE_INNODB_REPLACED_FILES[] = 80 CLONE_FILES_DIR OS_FILE_PREFIX "replace_files"; 81 82 /** Clone file name for list of old files to be removed. */ 83 const char CLONE_INNODB_OLD_FILES[] = 84 CLONE_FILES_DIR OS_FILE_PREFIX "old_files"; 85 86 /** Clone file extension for files to be replaced. */ 87 const char CLONE_INNODB_REPLACED_FILE_EXTN[] = "." OS_FILE_PREFIX "clone"; 88 89 /** Clone file extension for saved old files. */ 90 const char CLONE_INNODB_SAVED_FILE_EXTN[] = "." OS_FILE_PREFIX "clone_save"; 91 92 using Clone_Msec = std::chrono::milliseconds; 93 using Clone_Sec = std::chrono::seconds; 94 using Clone_Min = std::chrono::minutes; 95 96 /** Default sleep time while waiting: 100 ms */ 97 const Clone_Msec CLONE_DEF_SLEEP{100}; 98 99 /** Default alert interval in multiple of sleep time: 5 seconds */ 100 const Clone_Sec CLONE_DEF_ALERT_INTERVAL{5}; 101 102 /** Default timeout in multiple of sleep time: 30 minutes */ 103 const Clone_Min CLONE_DEF_TIMEOUT{30}; 104 105 /** Clone system state */ 106 enum Clone_System_State { 107 CLONE_SYS_INACTIVE, 108 CLONE_SYS_ACTIVE, 109 CLONE_SYS_ABORT 110 }; 111 112 using Clone_Sys_State = std::atomic<Clone_System_State>; 113 114 /** Clone Handle State */ 115 enum Clone_Handle_State { 116 CLONE_STATE_INIT = 1, 117 CLONE_STATE_ACTIVE, 118 CLONE_STATE_IDLE, 119 CLONE_STATE_ABORT 120 }; 121 122 /** Clone task state */ 123 enum Clone_Task_State { CLONE_TASK_INACTIVE = 1, CLONE_TASK_ACTIVE }; 124 125 /** Maximum number of concurrent snapshots */ 126 const int MAX_SNAPSHOTS = 1; 127 128 /** Maximum number of concurrent clones */ 129 const int MAX_CLONES = 1; 130 131 /** Clone system array size */ 132 const int CLONE_ARR_SIZE = 2 * MAX_CLONES; 133 134 /** Snapshot system array size */ 135 const int SNAPSHOT_ARR_SIZE = 2 * MAX_SNAPSHOTS; 136 137 /** Task for clone operation. Multiple task can concurrently work 138 on a clone operation. */ 139 struct Clone_Task { 140 /** Task Meta data */ 141 Clone_Task_Meta m_task_meta; 142 143 /** Task state */ 144 Clone_Task_State m_task_state; 145 146 /** Serial descriptor byte string */ 147 byte *m_serial_desc; 148 149 /** Serial descriptor allocated length */ 150 uint m_alloc_len; 151 152 /** Current file descriptor */ 153 pfs_os_file_t m_current_file_des; 154 155 /** Current file index */ 156 uint m_current_file_index; 157 158 /** Data files are read using OS buffer cache */ 159 bool m_file_cache; 160 161 /** If master task */ 162 bool m_is_master; 163 164 /** If task has associated session */ 165 bool m_has_thd; 166 167 #ifdef UNIV_DEBUG 168 /** Ignore debug sync point */ 169 bool m_ignore_sync; 170 171 /** Counter to restart in different state */ 172 int m_debug_counter; 173 #endif /* UNIV_DEBUG */ 174 175 /** Allocated buffer */ 176 byte *m_current_buffer; 177 178 /** Allocated buffer length */ 179 uint m_buffer_alloc_len; 180 181 /** Data transferred for current chunk in bytes */ 182 uint32_t m_data_size; 183 }; 184 185 class Clone_Handle; 186 187 /** Task manager for manging the tasks for a clone operation */ 188 class Clone_Task_Manager { 189 public: 190 /** Initialize task manager for clone handle 191 @param[in] snapshot snapshot */ 192 void init(Clone_Snapshot *snapshot); 193 194 /** Get task state mutex 195 @return state mutex */ get_mutex()196 ib_mutex_t *get_mutex() { return (&m_state_mutex); } 197 198 /** Handle any error raised by concurrent tasks. 199 @param[in] raise_error raise error if true 200 @return error code */ 201 int handle_error_other_task(bool raise_error); 202 203 /** Set error number 204 @param[in] err error number 205 @param[in] file_name associated file name if any */ set_error(int err,const char * file_name)206 void set_error(int err, const char *file_name) { 207 mutex_enter(&m_state_mutex); 208 209 ib::info(ER_IB_CLONE_OPERATION) << "Clone Set Error code: " << err 210 << " Saved Error code: " << m_saved_error; 211 212 /* Override any network error as we should not be waiting for restart 213 if other errors have occurred. */ 214 if (m_saved_error == 0 || is_network_error(m_saved_error)) { 215 m_saved_error = err; 216 217 if (file_name != nullptr) { 218 ut_ad(m_err_file_name != nullptr); 219 ut_ad(m_err_file_len != 0); 220 221 strncpy(m_err_file_name, file_name, m_err_file_len); 222 } 223 } 224 225 mutex_exit(&m_state_mutex); 226 } 227 228 /** Add a task to task manager 229 @param[in] thd server THD object 230 @param[in] ref_loc reference locator from remote 231 @param[in] loc_len locator length in bytes 232 @param[out] task_id task identifier 233 @return error code */ 234 int add_task(THD *thd, const byte *ref_loc, uint loc_len, uint &task_id); 235 236 /** Drop task from task manager 237 @param[in] thd server THD object 238 @param[in] task_id current task ID 239 @param[out] is_master true, if master task 240 @return true if needs to wait for re-start */ 241 bool drop_task(THD *thd, uint task_id, bool &is_master); 242 243 /** Reset chunk information for task 244 @param[in] task current task */ reset_chunk(Clone_Task * task)245 void reset_chunk(Clone_Task *task) { 246 ut_ad(mutex_own(&m_state_mutex)); 247 248 /* Reset current processing chunk */ 249 task->m_task_meta.m_chunk_num = 0; 250 task->m_task_meta.m_block_num = 0; 251 252 if (task->m_data_size > 0) { 253 ut_ad(get_state() != CLONE_SNAPSHOT_NONE); 254 ut_ad(get_state() != CLONE_SNAPSHOT_INIT); 255 ut_ad(get_state() != CLONE_SNAPSHOT_DONE); 256 257 auto &monitor = m_clone_snapshot->get_clone_monitor(); 258 259 monitor.update_work(task->m_data_size); 260 } 261 262 task->m_data_size = 0; 263 } 264 265 /** Get task by index 266 @param[in] index task index 267 @return task */ get_task_by_index(uint index)268 Clone_Task *get_task_by_index(uint index) { 269 auto task = (m_clone_tasks + index); 270 ut_ad(task->m_task_state == CLONE_TASK_ACTIVE); 271 272 return (task); 273 } 274 275 /** Reserve next chunk from task manager. Called by individual tasks. 276 @param[in] task requesting task 277 @param[out] ret_chunk reserved chunk number 278 @param[out] ret_block start block number 279 '0' if no more chunk. 280 @return error code */ 281 int reserve_next_chunk(Clone_Task *task, uint32_t &ret_chunk, 282 uint32_t &ret_block); 283 284 /** Set current chunk and block information 285 @param[in,out] task requesting task 286 @param[in] new_meta updated task metadata 287 @return error code */ 288 int set_chunk(Clone_Task *task, Clone_Task_Meta *new_meta); 289 290 /** Track any incomplete chunks handled by the task 291 @param[in,out] task current task */ 292 void add_incomplete_chunk(Clone_Task *task); 293 294 /** Initialize task manager for current state */ 295 void init_state(); 296 297 /** Reinitialize state using locator 298 @param[in] loc locator from remote client 299 @param[in] loc_len locator length in bytes */ 300 void reinit_copy_state(const byte *loc, uint loc_len); 301 302 /** Reinitialize state using locator 303 @param[in] ref_loc current locator 304 @param[in] ref_len current locator length 305 @param[out] new_loc new locator to be sent to remote server 306 @param[out] new_len length of new locator 307 @param[in,out] alloc_len allocated length for locator buffer */ 308 void reinit_apply_state(const byte *ref_loc, uint ref_len, byte *&new_loc, 309 uint &new_len, uint &alloc_len); 310 311 /** Reset state transition information */ reset_transition()312 void reset_transition() { 313 m_num_tasks_transit = 0; 314 m_num_tasks_finished = 0; 315 m_next_state = CLONE_SNAPSHOT_NONE; 316 } 317 318 /** Reset error information */ reset_error()319 void reset_error() { 320 m_saved_error = 0; 321 strncpy(m_err_file_name, "Clone File", m_err_file_len); 322 } 323 324 /** Get current clone state 325 @return clone state */ get_state()326 Snapshot_State get_state() { return (m_current_state); } 327 328 /** Check if in state transition 329 @return true if state transition is in progress */ in_transit_state()330 bool in_transit_state() { return (m_next_state != CLONE_SNAPSHOT_NONE); } 331 332 /** Get attached snapshot 333 @return snapshot */ get_snapshot()334 Clone_Snapshot *get_snapshot() { return (m_clone_snapshot); } 335 336 /** Move to next snapshot state. Each task must call this after 337 no more chunk is left in current state. The state can be changed 338 only after all tasks have finished transferring the reserved chunks. 339 @param[in] task clone task 340 @param[in] state_desc descriptor for next state 341 @param[in] new_state next state to move to 342 @param[in] cbk alert callback for long wait 343 @param[out] num_wait unfinished tasks in current state 344 @return error code */ 345 int change_state(Clone_Task *task, Clone_Desc_State *state_desc, 346 Snapshot_State new_state, Clone_Alert_Func cbk, 347 uint &num_wait); 348 349 /** Check if state transition is over and all tasks moved to next state 350 @param[in] task requesting task 351 @param[in] new_state next state to move to 352 @param[in] exit_on_wait exit from transition if needs to wait 353 @param[in] in_err input error if already occurred 354 @param[out] num_wait number of tasks to move to next state 355 @return error code */ 356 int check_state(Clone_Task *task, Snapshot_State new_state, bool exit_on_wait, 357 int in_err, uint32_t &num_wait); 358 359 /** Check if needs to send state metadata once 360 @param[in] task current task 361 @return true if needs to send state metadata */ is_restart_metadata(Clone_Task * task)362 bool is_restart_metadata(Clone_Task *task) { 363 if (task->m_is_master && m_send_state_meta) { 364 m_send_state_meta = false; 365 return (true); 366 } 367 368 return (false); 369 } 370 371 /** @return true if file metadata is transferred */ is_file_metadata_transferred()372 bool is_file_metadata_transferred() const { 373 return (m_transferred_file_meta); 374 } 375 376 /** Set sub-state: all file metadata is transferred */ set_file_meta_transferred()377 void set_file_meta_transferred() { m_transferred_file_meta = true; } 378 379 /** Mark state finished for current task 380 @param[in] task current task 381 @return error code */ 382 int finish_state(Clone_Task *task); 383 384 /** Set acknowledged state 385 @param[in] state_desc State descriptor */ 386 void ack_state(const Clone_Desc_State *state_desc); 387 388 /** Wait for acknowledgement 389 @param[in] clone parent clone handle 390 @param[in] task current task 391 @param[in] callback user callback interface 392 @return error code */ 393 int wait_ack(Clone_Handle *clone, Clone_Task *task, Ha_clone_cbk *callback); 394 395 /** Check if state ACK is needed 396 @param[in] state_desc State descriptor 397 @return true if need to wait for ACK from remote */ check_ack(const Clone_Desc_State * state_desc)398 bool check_ack(const Clone_Desc_State *state_desc) { 399 bool ret = true; 400 401 mutex_enter(&m_state_mutex); 402 403 /* Check if state is already acknowledged */ 404 if (m_ack_state == state_desc->m_state) { 405 ut_ad(m_restart_count > 0); 406 ret = false; 407 ++m_num_tasks_finished; 408 } 409 410 mutex_exit(&m_state_mutex); 411 412 return (ret); 413 } 414 415 /** Check if clone is restarted after failure 416 @return true if restarted */ is_restarted()417 bool is_restarted() { return (m_restart_count > 0); } 418 419 /** Allocate buffers for current task 420 @param[in,out] task current task 421 @return error code */ 422 int alloc_buffer(Clone_Task *task); 423 424 #ifdef UNIV_DEBUG 425 /** Wait during clone operation 426 @param[in] chunk_num chunk number to process 427 @param[in] task current task */ 428 void debug_wait(uint chunk_num, Clone_Task *task); 429 430 /** Force restart clone operation by raising network error 431 @param[in] task current task 432 @param[in] in_err any err that has occurred 433 @param[in] restart_count restart counter 434 @return error code */ 435 int debug_restart(Clone_Task *task, int in_err, int restart_count); 436 #endif /* UNIV_DEBUG */ 437 438 private: 439 /** Check if we need to wait before adding current task 440 @param[in] ref_loc reference locator from remote 441 @param[in] loc_len reference locator length 442 @return true, if needs to wait */ 443 bool wait_before_add(const byte *ref_loc, uint loc_len); 444 445 private: 446 /** Check if network error 447 @param[in] err error code 448 @return true if network error */ is_network_error(int err)449 bool is_network_error(int err) { 450 if (err == ER_NET_ERROR_ON_WRITE || err == ER_NET_READ_ERROR || 451 err == ER_NET_WRITE_INTERRUPTED || err == ER_NET_READ_INTERRUPTED || 452 err == ER_NET_WAIT_ERROR) { 453 return (true); 454 } 455 return (false); 456 } 457 458 /** Reserve free task from task manager and initialize 459 @param[in] thd server THD object 460 @param[out] task_id initialized task ID */ 461 void reserve_task(THD *thd, uint &task_id); 462 463 /** Check if we should process incomplete chunk next. Incomplete 464 chunks could be there after a re-start from network failure. We always 465 process the chunks in order and need to choose accordingly. 466 @return if need to process incomplete chunk next. */ process_inclomplete_chunk()467 inline bool process_inclomplete_chunk() { 468 /* 1. Check if there is any incomplete chunk. */ 469 auto &chunks = m_chunk_info.m_incomplete_chunks; 470 if (chunks.empty()) { 471 return (false); 472 } 473 474 /* 2. Check if all complete chunks are processed. */ 475 auto min_complete_chunk = m_chunk_info.m_min_unres_chunk; 476 if (min_complete_chunk > m_chunk_info.m_total_chunks) { 477 return (true); 478 } 479 480 /* 3. Compare the minimum chunk number for complete and incomplete chunk */ 481 auto it = chunks.begin(); 482 auto min_incomplete_chunk = it->first; 483 484 ut_ad(min_complete_chunk != min_incomplete_chunk); 485 return (min_incomplete_chunk < min_complete_chunk); 486 } 487 488 /** Get next in complete chunk if any 489 @param[out] block_num first block number in chunk 490 @return incomplete chunk number */ 491 uint32_t get_next_incomplete_chunk(uint32 &block_num); 492 493 /** Get next unreserved chunk 494 @return chunk number */ 495 uint32_t get_next_chunk(); 496 497 private: 498 /** Mutex synchronizing access by concurrent tasks */ 499 ib_mutex_t m_state_mutex; 500 501 /** Finished and incomplete chunk information */ 502 Chunk_Info m_chunk_info; 503 504 /** Clone task array */ 505 Clone_Task m_clone_tasks[CLONE_MAX_TASKS]; 506 507 /** Current number of tasks */ 508 uint m_num_tasks; 509 510 /** Number of tasks finished current state */ 511 uint m_num_tasks_finished; 512 513 /** Number of tasks in transit state */ 514 uint m_num_tasks_transit; 515 516 /** Number of times clone is restarted */ 517 uint m_restart_count; 518 519 /** Acknowledged state from client */ 520 Snapshot_State m_ack_state; 521 522 /** Current state for clone */ 523 Snapshot_State m_current_state; 524 525 /** Next state: used during state transfer */ 526 Snapshot_State m_next_state; 527 528 /* Sub state: File metadata is transferred */ 529 bool m_transferred_file_meta; 530 531 /** Send state metadata before starting: Used for restart */ 532 bool m_send_state_meta; 533 534 /** Save any error raised by a task */ 535 int m_saved_error; 536 537 /** File name related to the saved error */ 538 char *m_err_file_name; 539 540 /** File name length */ 541 size_t m_err_file_len; 542 543 /** Attached snapshot handle */ 544 Clone_Snapshot *m_clone_snapshot; 545 }; 546 547 /** Clone Handle for copying or applying data */ 548 class Clone_Handle { 549 public: 550 /** Construct clone handle 551 @param[in] handle_type clone handle type 552 @param[in] clone_version clone version 553 @param[in] clone_index index in clone array */ 554 Clone_Handle(Clone_Handle_Type handle_type, uint clone_version, 555 uint clone_index); 556 557 /** Destructor: Detach from snapshot */ 558 ~Clone_Handle(); 559 560 /** Initialize clone handle 561 @param[in] ref_loc reference locator 562 @param[in] ref_len reference locator length 563 @param[in] type clone type 564 @param[in] data_dir data directory for apply 565 @return error code */ 566 int init(const byte *ref_loc, uint ref_len, Ha_clone_type type, 567 const char *data_dir); 568 569 /** Attach to the clone handle */ attach()570 void attach() { ++m_ref_count; } 571 572 /** Detach from the clone handle 573 @return reference count */ detach()574 uint detach() { 575 ut_a(m_ref_count > 0); 576 --m_ref_count; 577 578 return (m_ref_count); 579 } 580 581 /** Get locator for the clone handle. 582 @param[out] loc_len serialized locator length 583 @return serialized clone locator */ 584 byte *get_locator(uint &loc_len); 585 586 /** @return clone data directory */ get_datadir()587 const char *get_datadir() const { return (m_clone_dir); } 588 589 /** @return true, if clone is replacing current data directory. */ replace_datadir()590 bool replace_datadir() const { 591 return (!is_copy_clone() && m_clone_dir == nullptr); 592 } 593 594 /** Build locator descriptor for the clone handle 595 @param[out] loc_desc locator descriptor */ 596 void build_descriptor(Clone_Desc_Locator *loc_desc); 597 598 /** Add a task to clone handle 599 @param[in] thd server THD object 600 @param[in] ref_loc reference locator from remote 601 @param[in] ref_len reference locator length 602 @param[out] task_id task identifier 603 @return error code */ add_task(THD * thd,const byte * ref_loc,uint ref_len,uint & task_id)604 int add_task(THD *thd, const byte *ref_loc, uint ref_len, uint &task_id) { 605 return (m_clone_task_manager.add_task(thd, ref_loc, ref_len, task_id)); 606 } 607 608 /** Drop task from clone handle 609 @param[in] thd server THD object 610 @param[in] task_id current task ID 611 @param[in] in_err input error 612 @param[out] is_master true, if master task 613 @return true if needs to wait for re-start */ 614 bool drop_task(THD *thd, uint task_id, int in_err, bool &is_master); 615 616 /** Save current error number 617 @param[in] err error number */ save_error(int err)618 void save_error(int err) { 619 if (err != 0) { 620 m_clone_task_manager.set_error(err, nullptr); 621 } 622 } 623 624 /** Check for error from other tasks and DDL 625 @param[in,out] thd session THD 626 @return error code */ check_error(THD * thd)627 int check_error(THD *thd) { 628 bool has_thd = (thd != nullptr); 629 auto err = m_clone_task_manager.handle_error_other_task(has_thd); 630 /* Save any error reported */ 631 save_error(err); 632 return (err); 633 } 634 635 /** @return true if any task is interrupted */ is_interrupted()636 bool is_interrupted() { 637 auto err = m_clone_task_manager.handle_error_other_task(false); 638 return (err == ER_QUERY_INTERRUPTED); 639 } 640 641 /** Get clone handle index in clone array 642 @return array index */ get_index()643 uint get_index() { return (m_clone_arr_index); } 644 645 /** Get clone data descriptor version 646 @return version */ get_version()647 uint get_version() { return (m_clone_desc_version); } 648 649 /** Check if it is copy clone 650 @return true if copy clone handle */ is_copy_clone()651 bool is_copy_clone() const { return (m_clone_handle_type == CLONE_HDL_COPY); } 652 653 /** Check if clone type matches 654 @param[in] other_handle_type type to match with 655 @return true if type matches with clone handle type */ match_hdl_type(Clone_Handle_Type other_handle_type)656 bool match_hdl_type(Clone_Handle_Type other_handle_type) { 657 return (m_clone_handle_type == other_handle_type); 658 } 659 660 /** Set current clone state 661 @param[in] state clone handle state */ set_state(Clone_Handle_State state)662 void set_state(Clone_Handle_State state) { m_clone_handle_state = state; } 663 664 /** Check if clone state is active 665 @return true if in active state */ is_active()666 bool is_active() { return (m_clone_handle_state == CLONE_STATE_ACTIVE); } 667 668 /** Check if clone is initialized 669 @return true if in initial state */ is_init()670 bool is_init() { return (m_clone_handle_state == CLONE_STATE_INIT); } 671 672 /** Check if clone is idle waiting for restart 673 @return true if clone is in idle state */ is_idle()674 bool is_idle() { return (m_clone_handle_state == CLONE_STATE_IDLE); } 675 676 /** Check if clone is aborted 677 @return true if clone is aborted */ is_abort()678 bool is_abort() { return (m_clone_handle_state == CLONE_STATE_ABORT); } 679 680 /** Restart copy after a network failure 681 @param[in] thd server THD object 682 @param[in] loc locator wit copy state from remote client 683 @param[in] loc_len locator length in bytes 684 @return error code */ 685 int restart_copy(THD *thd, const byte *loc, uint loc_len); 686 687 /** Build locator with current state and restart apply 688 @param[in] thd server THD object 689 @param[in,out] loc loctor with current state information 690 @param[in,out] loc_len locator length in bytes 691 @return error code */ 692 int restart_apply(THD *thd, const byte *&loc, uint &loc_len); 693 694 /** Transfer snapshot data via callback 695 @param[in] thd server THD object 696 @param[in] task_id current task ID 697 @param[in] callback user callback interface 698 @return error code */ 699 int copy(THD *thd, uint task_id, Ha_clone_cbk *callback); 700 701 /** Apply snapshot data received via callback 702 @param[in] thd server THD 703 @param[in] task_id current task ID 704 @param[in] callback user callback interface 705 @return error code */ 706 int apply(THD *thd, uint task_id, Ha_clone_cbk *callback); 707 708 /** Send keep alive while during long wait 709 @param[in] task task that is sending the information 710 @param[in] callback callback interface 711 @return error code */ 712 int send_keep_alive(Clone_Task *task, Ha_clone_cbk *callback); 713 714 private: 715 /** Check if enough space is there to clone. 716 @return error if not enough space */ 717 int check_space(); 718 719 /** Create clone data directory. 720 @return error code */ 721 int create_clone_directory(); 722 723 /** Display clone progress 724 @param[in] cur_chunk current chunk number 725 @param[in] max_chunk total number of chunks 726 @param[in,out] percent_done percentage completed 727 @param[in,out] disp_time last displayed time */ 728 void display_progress(uint32_t cur_chunk, uint32_t max_chunk, 729 uint32_t &percent_done, 730 ib_time_monotonic_ms_t &disp_time); 731 732 /** Open file for the task 733 @param[in] task clone task 734 @param[in] file_meta file information 735 @param[in] file_type file type (data, log etc.) 736 @param[in] create_file create if not present 737 @param[in] set_and_close set size and close 738 @return error code */ 739 int open_file(Clone_Task *task, Clone_File_Meta *file_meta, ulint file_type, 740 bool create_file, bool set_and_close); 741 742 /** Close file for the task 743 @param[in] task clone task 744 @return error code */ 745 int close_file(Clone_Task *task); 746 747 /** Callback providing the file reference and data length to copy 748 @param[in] cbk callback interface 749 @param[in] task clone task 750 @param[in] len data length 751 @param[in] buf_cbk invoke buffer callback 752 @param[in] offset file offset 753 @param[in] name file name where func invoked 754 @param[in] line line where the func invoked 755 @return error code */ 756 int file_callback(Ha_clone_cbk *cbk, Clone_Task *task, uint len, bool buf_cbk, 757 uint64_t offset 758 #ifdef UNIV_PFS_IO 759 , 760 const char *name, uint line 761 #endif /* UNIV_PFS_IO */ 762 ); 763 764 /** Move to next state 765 @param[in] task clone task 766 @param[in] callback callback interface 767 @param[in] state_desc descriptor for next state to move to 768 @return error code */ 769 int move_to_next_state(Clone_Task *task, Ha_clone_cbk *callback, 770 Clone_Desc_State *state_desc); 771 772 /** Send current state information via callback 773 @param[in] task task that is sending the information 774 @param[in] callback callback interface 775 @param[in] is_start if it is the start of current state 776 @return error code */ 777 int send_state_metadata(Clone_Task *task, Ha_clone_cbk *callback, 778 bool is_start); 779 780 /** Send current task information via callback 781 @param[in] task task that is sending the information 782 @param[in] callback callback interface 783 @return error code */ 784 int send_task_metadata(Clone_Task *task, Ha_clone_cbk *callback); 785 786 /** Send all file information via callback 787 @param[in] task task that is sending the information 788 @param[in] callback callback interface 789 @return error code */ 790 int send_all_file_metadata(Clone_Task *task, Ha_clone_cbk *callback); 791 792 /** Send current file information via callback 793 @param[in] task task that is sending the information 794 @param[in] file_meta file meta information 795 @param[in] callback callback interface 796 @return error code */ 797 int send_file_metadata(Clone_Task *task, Clone_File_Meta *file_meta, 798 Ha_clone_cbk *callback); 799 800 /** Send cloned data via callback 801 @param[in] task task that is sending the information 802 @param[in] file_meta file information 803 @param[in] offset file offset 804 @param[in] buffer data buffer or NULL if send from file 805 @param[in] size data buffer size 806 @param[in] callback callback interface 807 @return error code */ 808 int send_data(Clone_Task *task, Clone_File_Meta *file_meta, 809 ib_uint64_t offset, byte *buffer, uint size, 810 Ha_clone_cbk *callback); 811 812 /** Process a data chunk and send data blocks via callback 813 @param[in] task task that is sending the information 814 @param[in] chunk_num chunk number to process 815 @param[in] block_num start block number 816 @param[in] callback callback interface 817 @return error code */ 818 int process_chunk(Clone_Task *task, uint32_t chunk_num, uint32_t block_num, 819 Ha_clone_cbk *callback); 820 821 /** Create apply task based on task metadata in callback 822 @param[in] task current task 823 @param[in] callback callback interface 824 @return error code */ 825 int apply_task_metadata(Clone_Task *task, Ha_clone_cbk *callback); 826 827 /** Move to next state based on state metadata and set 828 state information 829 @param[in] task current task 830 @param[in,out] callback callback interface 831 @param[in,out] state_desc clone state descriptor 832 @return error code */ 833 int ack_state_metadata(Clone_Task *task, Ha_clone_cbk *callback, 834 Clone_Desc_State *state_desc); 835 836 /** Notify state change via callback. 837 @param[in] task current task 838 @param[in,out] callback callback interface 839 @param[in,out] state_desc clone state descriptor */ 840 void notify_state_change(Clone_Task *task, Ha_clone_cbk *callback, 841 Clone_Desc_State *state_desc); 842 843 /** Move to next state based on state metadata and set 844 state information 845 @param[in] task current task 846 @param[in] callback callback interface 847 @return error code */ 848 int apply_state_metadata(Clone_Task *task, Ha_clone_cbk *callback); 849 850 /** Create file metadata based on callback 851 @param[in] task current task 852 @param[in] callback callback interface 853 @return error code */ 854 int apply_file_metadata(Clone_Task *task, Ha_clone_cbk *callback); 855 856 /** Apply data received via callback 857 @param[in] task current task 858 @param[in] callback callback interface 859 @return error code */ 860 int apply_data(Clone_Task *task, Ha_clone_cbk *callback); 861 862 /** Receive data from callback and apply 863 @param[in] task task that is receiving the information 864 @param[in] offset file offset for applying data 865 @param[in] file_size updated file size 866 @param[in] size data length in bytes 867 @param[in] callback callback interface 868 @return error code */ 869 int receive_data(Clone_Task *task, uint64_t offset, uint64_t file_size, 870 uint32_t size, Ha_clone_cbk *callback); 871 872 /** Punch holes for multiple pages during apply. 873 @param[in] file file descriptor 874 @param[in] buffer data buffer 875 @param[in] len buffer length 876 @param[in] start_off starting offset in file 877 @param[in] page_len page length 878 @param[in] block_size file system block size 879 @return innodb error code */ 880 dberr_t punch_holes(os_file_t file, const byte *buffer, uint32_t len, 881 uint64_t start_off, uint32_t page_len, 882 uint32_t block_size); 883 884 /** Modify page encryption attribute and/or punch hole. 885 @param[in] task task that is applying data 886 @param[in] offset file offset for applying data 887 @param[in,out] buffer data to apply 888 @param[in] buf_len data buffer length 889 @return error code */ 890 int modify_and_write(const Clone_Task *task, uint64_t offset, 891 unsigned char *buffer, uint32_t buf_len); 892 893 private: 894 /** Clone handle type: Copy, Apply */ 895 Clone_Handle_Type m_clone_handle_type; 896 897 /** Clone handle state */ 898 Clone_Handle_State m_clone_handle_state; 899 900 /** Fixed locator for version negotiation. */ 901 byte m_version_locator[CLONE_DESC_MAX_BASE_LEN]; 902 903 /** Serialized locator */ 904 byte *m_clone_locator; 905 906 /** Locator length in bytes */ 907 uint m_locator_length; 908 909 /** Serialized Restart locator */ 910 byte *m_restart_loc; 911 912 /** Restart locator length in bytes */ 913 uint m_restart_loc_len; 914 915 /** Clone descriptor version in use */ 916 uint m_clone_desc_version; 917 918 /** Index in global array */ 919 uint m_clone_arr_index; 920 921 /** Unique clone identifier */ 922 ib_uint64_t m_clone_id; 923 924 /** Reference count */ 925 uint m_ref_count; 926 927 /** Allow restart of clone operation after network failure */ 928 bool m_allow_restart; 929 930 /** Clone data directory */ 931 const char *m_clone_dir; 932 933 /** Clone task manager */ 934 Clone_Task_Manager m_clone_task_manager; 935 }; 936 937 /** Clone System */ 938 class Clone_Sys { 939 public: 940 /** Construct clone system */ 941 Clone_Sys(); 942 943 /** Destructor: Call during system shutdown */ 944 ~Clone_Sys(); 945 946 /** Create and add a new clone handle to clone system 947 @param[in] loc locator 948 @param[in] hdl_type handle type 949 @param[out] clone_hdl clone handle 950 @return error code */ 951 int add_clone(const byte *loc, Clone_Handle_Type hdl_type, 952 Clone_Handle *&clone_hdl); 953 954 /** drop a clone handle from clone system 955 @param[in] clone_handle Clone handle */ 956 void drop_clone(Clone_Handle *clone_handle); 957 958 /** Find if a clone is already running for the reference locator 959 @param[in] ref_loc reference locator 960 @param[in] loc_len reference locator length 961 @param[in] hdl_type clone type 962 @return clone handle if found, NULL otherwise */ 963 Clone_Handle *find_clone(const byte *ref_loc, uint loc_len, 964 Clone_Handle_Type hdl_type); 965 966 /** Get the clone handle from locator by index 967 @param[in] loc locator 968 @param[in] loc_len locator length in bytes 969 @return clone handle */ 970 Clone_Handle *get_clone_by_index(const byte *loc, uint loc_len); 971 972 /** Get or create a snapshot for clone and attach 973 @param[in] hdl_type handle type 974 @param[in] clone_type clone type 975 @param[in] snapshot_id snapshot identifier 976 @param[in] is_pfs_monitor true, if needs PFS monitoring 977 @param[out] snapshot clone snapshot 978 @return error code */ 979 int attach_snapshot(Clone_Handle_Type hdl_type, Ha_clone_type clone_type, 980 ib_uint64_t snapshot_id, bool is_pfs_monitor, 981 Clone_Snapshot *&snapshot); 982 983 /** Detach clone handle from snapshot 984 @param[in] snapshot snapshot 985 @param[in] hdl_type handle type */ 986 void detach_snapshot(Clone_Snapshot *snapshot, Clone_Handle_Type hdl_type); 987 988 /** Mark clone state to abort if no active clone. If force is set, 989 abort all active clones and set state to abort. 990 @param[in] force force active clones to abort 991 @return true if global state is set to abort successfully */ 992 bool mark_abort(bool force); 993 994 /** Mark clone state to active if no other abort request */ 995 void mark_active(); 996 997 /** Mark to indicate that new clone operations should wait. 998 @return true, if no active clone and mark is set successfully */ 999 bool mark_wait(); 1000 1001 /** Free the wait marker. */ 1002 void mark_free(); 1003 1004 /** Wait for marker to get freed. 1005 @param[in,out] thd user session 1006 @return, error if timeout */ 1007 int wait_for_free(THD *thd); 1008 1009 /** Get next unique ID 1010 @return unique ID */ 1011 ib_uint64_t get_next_id(); 1012 1013 /** Get clone sys mutex 1014 @return clone system mutex */ get_mutex()1015 ib_mutex_t *get_mutex() { return (&m_clone_sys_mutex); } 1016 1017 /** Clone System state */ 1018 static Clone_Sys_State s_clone_sys_state; 1019 1020 /** Number of active abort requests */ 1021 static uint s_clone_abort_count; 1022 1023 /** Number of active wait requests */ 1024 static uint s_clone_wait_count; 1025 1026 /** Function to check wait condition 1027 @param[in] is_alert print alert message 1028 @param[out] result true, if condition is satisfied 1029 @return error code */ 1030 using Wait_Cond_Cbk_Func = std::function<int(bool is_alert, bool &result)>; 1031 1032 /** Wait till the condition is satisfied or timeout. 1033 @param[in] sleep_time sleep time in milliseconds 1034 @param[in] timeout total time to wait in seconds 1035 @param[in] alert_interval alert interval in seconds 1036 @param[in] func callback function for condition check 1037 @param[in] mutex release during sleep and re-acquire 1038 @param[out] is_timeout true if timeout 1039 @return error code returned by callback function. */ wait(Clone_Msec sleep_time,Clone_Sec timeout,Clone_Sec alert_interval,Wait_Cond_Cbk_Func && func,ib_mutex_t * mutex,bool & is_timeout)1040 static int wait(Clone_Msec sleep_time, Clone_Sec timeout, 1041 Clone_Sec alert_interval, Wait_Cond_Cbk_Func &&func, 1042 ib_mutex_t *mutex, bool &is_timeout) { 1043 int err = 0; 1044 bool wait = true; 1045 is_timeout = false; 1046 1047 int loop_count = 0; 1048 auto alert_count = static_cast<int>(alert_interval / sleep_time); 1049 auto total_count = static_cast<int>(timeout / sleep_time); 1050 1051 /* Call function once before waiting. */ 1052 err = func(false, wait); 1053 1054 while (!is_timeout && wait && err == 0) { 1055 ++loop_count; 1056 1057 /* Release input mutex */ 1058 if (mutex != nullptr) { 1059 ut_ad(mutex_own(mutex)); 1060 mutex_exit(mutex); 1061 } 1062 1063 std::this_thread::sleep_for(sleep_time); 1064 1065 /* Acquire input mutex back */ 1066 if (mutex != nullptr) { 1067 mutex_enter(mutex); 1068 } 1069 1070 auto alert = (alert_count > 0) ? (loop_count % alert_count == 0) : true; 1071 1072 err = func(alert, wait); 1073 1074 is_timeout = (loop_count > total_count); 1075 } 1076 return (err); 1077 } 1078 1079 /** Wait till the condition is satisfied or default timeout. 1080 @param[in] func callback function for condition check 1081 @param[in] mutex release during sleep and re-acquire 1082 @param[out] is_timeout true if timeout 1083 @return error code returned by callback function. */ wait_default(Wait_Cond_Cbk_Func && func,ib_mutex_t * mutex,bool & is_timeout)1084 static int wait_default(Wait_Cond_Cbk_Func &&func, ib_mutex_t *mutex, 1085 bool &is_timeout) { 1086 return (wait(CLONE_DEF_SLEEP, Clone_Sec(CLONE_DEF_TIMEOUT), 1087 CLONE_DEF_ALERT_INTERVAL, 1088 std::forward<Wait_Cond_Cbk_Func>(func), mutex, is_timeout)); 1089 } 1090 1091 /** Check if any active clone is running. 1092 @param[in] print_alert print alert message 1093 @return true, if concurrent clone in progress */ 1094 bool check_active_clone(bool print_alert); 1095 1096 /** @return GTID persistor */ get_gtid_persistor()1097 Clone_persist_gtid &get_gtid_persistor() { return (m_gtid_persister); } 1098 1099 private: 1100 /** Find free index to allocate new clone handle. 1101 @param[in] hdl_type clone handle type 1102 @param[out] free_index free index in array 1103 @return error code */ 1104 int find_free_index(Clone_Handle_Type hdl_type, uint &free_index); 1105 1106 private: 1107 /** Array of clone handles */ 1108 Clone_Handle *m_clone_arr[CLONE_ARR_SIZE]; 1109 1110 /** Number of copy clones */ 1111 uint m_num_clones; 1112 1113 /** Number of apply clones */ 1114 uint m_num_apply_clones; 1115 1116 /** Array of clone snapshots */ 1117 Clone_Snapshot *m_snapshot_arr[SNAPSHOT_ARR_SIZE]; 1118 1119 /** Number of copy snapshots */ 1120 uint m_num_snapshots; 1121 1122 /** Number of apply snapshots */ 1123 uint m_num_apply_snapshots; 1124 1125 /** Clone system mutex */ 1126 ib_mutex_t m_clone_sys_mutex; 1127 1128 /** Clone unique ID generator */ 1129 ib_uint64_t m_clone_id_generator; 1130 1131 /** GTID persister */ 1132 Clone_persist_gtid m_gtid_persister; 1133 }; 1134 1135 /** Clone system global */ 1136 extern Clone_Sys *clone_sys; 1137 1138 #endif /* CLONE_CLONE_INCLUDE */ 1139