1 /*****************************************************************************
2 
3 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file include/clone0clone.h
28  Innodb Clone System
29 
30  *******************************************************/
31 
32 #ifndef CLONE_CLONE_INCLUDE
33 #define CLONE_CLONE_INCLUDE
34 
35 #include <chrono>
36 #include "db0err.h"
37 #include "mysql/plugin.h"  // thd_killed()
38 #include "sql/handler.h"
39 #include "univ.i"
40 #include "ut0mutex.h"
41 
42 #include "clone0desc.h"
43 #include "clone0repl.h"
44 #include "clone0snapshot.h"
45 
46 /** Directory under data directory for all clone status files. */
47 #define CLONE_FILES_DIR OS_FILE_PREFIX "clone" OS_PATH_SEPARATOR_STR
48 
49 /** Clone in progress file name length. */
50 const size_t CLONE_INNODB_FILE_LEN = 64;
51 
52 #ifdef UNIV_DEBUG
53 /** Clone simulate recovery error file name. */
54 const char CLONE_INNODB_RECOVERY_CRASH_POINT[] =
55     CLONE_FILES_DIR OS_FILE_PREFIX "status_crash_point";
56 #endif
57 
58 /** Clone in progress file name. */
59 const char CLONE_INNODB_IN_PROGRESS_FILE[] =
60     CLONE_FILES_DIR OS_FILE_PREFIX "status_in_progress";
61 
62 /** Clone error file name. */
63 const char CLONE_INNODB_ERROR_FILE[] =
64     CLONE_FILES_DIR OS_FILE_PREFIX "status_error";
65 
66 /** Clone fix up file name. Present when clone needs table fix up. */
67 const char CLONE_INNODB_FIXUP_FILE[] =
68     CLONE_FILES_DIR OS_FILE_PREFIX "status_fix";
69 
70 /** Clone recovery status. */
71 const char CLONE_INNODB_RECOVERY_FILE[] =
72     CLONE_FILES_DIR OS_FILE_PREFIX "status_recovery";
73 
74 /** Clone file name for list of files cloned in place. */
75 const char CLONE_INNODB_NEW_FILES[] =
76     CLONE_FILES_DIR OS_FILE_PREFIX "new_files";
77 
78 /** Clone file name for list of files to be replaced. */
79 const char CLONE_INNODB_REPLACED_FILES[] =
80     CLONE_FILES_DIR OS_FILE_PREFIX "replace_files";
81 
82 /** Clone file name for list of old files to be removed. */
83 const char CLONE_INNODB_OLD_FILES[] =
84     CLONE_FILES_DIR OS_FILE_PREFIX "old_files";
85 
86 /** Clone file extension for files to be replaced. */
87 const char CLONE_INNODB_REPLACED_FILE_EXTN[] = "." OS_FILE_PREFIX "clone";
88 
89 /** Clone file extension for saved old files. */
90 const char CLONE_INNODB_SAVED_FILE_EXTN[] = "." OS_FILE_PREFIX "clone_save";
91 
/** Duration types used by clone for waits and timeouts. */
using Clone_Msec = std::chrono::milliseconds;
using Clone_Sec = std::chrono::seconds;
using Clone_Min = std::chrono::minutes;

/** Default sleep time for one wait iteration: 100 milliseconds. */
constexpr Clone_Msec CLONE_DEF_SLEEP{100};

/** Default interval between alerts during a wait: 5 seconds. The value is
a duration, not a count of sleep periods. */
constexpr Clone_Sec CLONE_DEF_ALERT_INTERVAL{5};

/** Default timeout for a wait: 30 minutes. The value is a duration, not a
count of sleep periods. */
constexpr Clone_Min CLONE_DEF_TIMEOUT{30};
104 
/** State of the clone system as a whole. */
enum Clone_System_State {
  CLONE_SYS_INACTIVE = 0,
  CLONE_SYS_ACTIVE = 1,
  CLONE_SYS_ABORT = 2
};

/** Clone system state that can be read and written atomically. */
using Clone_Sys_State = std::atomic<Clone_System_State>;

/** State of a single clone handle. Numbering starts at 1. */
enum Clone_Handle_State {
  CLONE_STATE_INIT = 1,
  CLONE_STATE_ACTIVE = 2,
  CLONE_STATE_IDLE = 3,
  CLONE_STATE_ABORT = 4
};

/** State of a single clone task. Numbering starts at 1. */
enum Clone_Task_State {
  CLONE_TASK_INACTIVE = 1,
  CLONE_TASK_ACTIVE = 2
};
124 
/** Upper limit on snapshots that may exist concurrently. */
constexpr int MAX_SNAPSHOTS = 1;

/** Upper limit on clone operations that may run concurrently. */
constexpr int MAX_CLONES = 1;

/** Number of slots in the clone system array: twice MAX_CLONES. */
constexpr int CLONE_ARR_SIZE = 2 * MAX_CLONES;

/** Number of slots in the snapshot system array: twice MAX_SNAPSHOTS. */
constexpr int SNAPSHOT_ARR_SIZE = 2 * MAX_SNAPSHOTS;
136 
137 /** Task for clone operation. Multiple task can concurrently work
138 on a clone operation. */
139 struct Clone_Task {
140   /** Task Meta data */
141   Clone_Task_Meta m_task_meta;
142 
143   /** Task state */
144   Clone_Task_State m_task_state;
145 
146   /** Serial descriptor byte string */
147   byte *m_serial_desc;
148 
149   /** Serial descriptor allocated length */
150   uint m_alloc_len;
151 
152   /** Current file descriptor */
153   pfs_os_file_t m_current_file_des;
154 
155   /** Current file index */
156   uint m_current_file_index;
157 
158   /** Data files are read using OS buffer cache */
159   bool m_file_cache;
160 
161   /** If master task */
162   bool m_is_master;
163 
164   /** If task has associated session */
165   bool m_has_thd;
166 
167 #ifdef UNIV_DEBUG
168   /** Ignore debug sync point */
169   bool m_ignore_sync;
170 
171   /** Counter to restart in different state */
172   int m_debug_counter;
173 #endif /* UNIV_DEBUG */
174 
175   /** Allocated buffer */
176   byte *m_current_buffer;
177 
178   /** Allocated buffer length */
179   uint m_buffer_alloc_len;
180 
181   /** Data transferred for current chunk in bytes */
182   uint32_t m_data_size;
183 };
184 
185 class Clone_Handle;
186 
187 /** Task manager for manging the tasks for a clone operation */
188 class Clone_Task_Manager {
189  public:
190   /** Initialize task manager for clone handle
191   @param[in]	snapshot	snapshot */
192   void init(Clone_Snapshot *snapshot);
193 
194   /** Get task state mutex
195   @return state mutex */
get_mutex()196   ib_mutex_t *get_mutex() { return (&m_state_mutex); }
197 
198   /** Handle any error raised by concurrent tasks.
199   @param[in]	raise_error	raise error if true
200   @return error code */
201   int handle_error_other_task(bool raise_error);
202 
203   /** Set error number
204   @param[in]	err		error number
205   @param[in]	file_name	associated file name if any  */
set_error(int err,const char * file_name)206   void set_error(int err, const char *file_name) {
207     mutex_enter(&m_state_mutex);
208 
209     ib::info(ER_IB_CLONE_OPERATION) << "Clone Set Error code: " << err
210                                     << " Saved Error code: " << m_saved_error;
211 
212     /* Override any network error as we should not be waiting for restart
213     if other errors have occurred. */
214     if (m_saved_error == 0 || is_network_error(m_saved_error)) {
215       m_saved_error = err;
216 
217       if (file_name != nullptr) {
218         ut_ad(m_err_file_name != nullptr);
219         ut_ad(m_err_file_len != 0);
220 
221         strncpy(m_err_file_name, file_name, m_err_file_len);
222       }
223     }
224 
225     mutex_exit(&m_state_mutex);
226   }
227 
228   /** Add a task to task manager
229   @param[in]	thd	server THD object
230   @param[in]	ref_loc	reference locator from remote
231   @param[in]	loc_len	locator length in bytes
232   @param[out]	task_id	task identifier
233   @return error code */
234   int add_task(THD *thd, const byte *ref_loc, uint loc_len, uint &task_id);
235 
236   /** Drop task from task manager
237   @param[in]	thd		server THD object
238   @param[in]	task_id		current task ID
239   @param[out]	is_master	true, if master task
240   @return true if needs to wait for re-start */
241   bool drop_task(THD *thd, uint task_id, bool &is_master);
242 
243   /** Reset chunk information for task
244   @param[in]	task	current task */
reset_chunk(Clone_Task * task)245   void reset_chunk(Clone_Task *task) {
246     ut_ad(mutex_own(&m_state_mutex));
247 
248     /* Reset current processing chunk */
249     task->m_task_meta.m_chunk_num = 0;
250     task->m_task_meta.m_block_num = 0;
251 
252     if (task->m_data_size > 0) {
253       ut_ad(get_state() != CLONE_SNAPSHOT_NONE);
254       ut_ad(get_state() != CLONE_SNAPSHOT_INIT);
255       ut_ad(get_state() != CLONE_SNAPSHOT_DONE);
256 
257       auto &monitor = m_clone_snapshot->get_clone_monitor();
258 
259       monitor.update_work(task->m_data_size);
260     }
261 
262     task->m_data_size = 0;
263   }
264 
265   /** Get task by index
266   @param[in]	index	task index
267   @return task */
get_task_by_index(uint index)268   Clone_Task *get_task_by_index(uint index) {
269     auto task = (m_clone_tasks + index);
270     ut_ad(task->m_task_state == CLONE_TASK_ACTIVE);
271 
272     return (task);
273   }
274 
275   /** Reserve next chunk from task manager. Called by individual tasks.
276   @param[in]	task		requesting task
277   @param[out]	ret_chunk	reserved chunk number
278   @param[out]	ret_block	start block number
279                                   '0' if no more chunk.
280   @return error code */
281   int reserve_next_chunk(Clone_Task *task, uint32_t &ret_chunk,
282                          uint32_t &ret_block);
283 
284   /** Set current chunk and block information
285   @param[in,out]	task		requesting task
286   @param[in]	new_meta	updated task metadata
287   @return error code */
288   int set_chunk(Clone_Task *task, Clone_Task_Meta *new_meta);
289 
290   /** Track any incomplete chunks handled by the task
291   @param[in,out]	task	current task */
292   void add_incomplete_chunk(Clone_Task *task);
293 
294   /** Initialize task manager for current state */
295   void init_state();
296 
297   /** Reinitialize state using locator
298   @param[in]	loc	locator from remote client
299   @param[in]	loc_len	locator length in bytes */
300   void reinit_copy_state(const byte *loc, uint loc_len);
301 
302   /** Reinitialize state using locator
303   @param[in]	ref_loc		current locator
304   @param[in]	ref_len		current locator length
305   @param[out]	new_loc		new locator to be sent to remote server
306   @param[out]	new_len		length of new locator
307   @param[in,out]	alloc_len	allocated length for locator buffer */
308   void reinit_apply_state(const byte *ref_loc, uint ref_len, byte *&new_loc,
309                           uint &new_len, uint &alloc_len);
310 
311   /** Reset state transition information */
reset_transition()312   void reset_transition() {
313     m_num_tasks_transit = 0;
314     m_num_tasks_finished = 0;
315     m_next_state = CLONE_SNAPSHOT_NONE;
316   }
317 
318   /** Reset error information */
reset_error()319   void reset_error() {
320     m_saved_error = 0;
321     strncpy(m_err_file_name, "Clone File", m_err_file_len);
322   }
323 
324   /** Get current clone state
325   @return clone state */
get_state()326   Snapshot_State get_state() { return (m_current_state); }
327 
328   /** Check if in state transition
329   @return true if state transition is in progress */
in_transit_state()330   bool in_transit_state() { return (m_next_state != CLONE_SNAPSHOT_NONE); }
331 
332   /** Get attached snapshot
333   @return snapshot */
get_snapshot()334   Clone_Snapshot *get_snapshot() { return (m_clone_snapshot); }
335 
336   /** Move to next snapshot state. Each task must call this after
337   no more chunk is left in current state. The state can be changed
338   only after all tasks have finished transferring the reserved chunks.
339   @param[in]	task		clone task
340   @param[in]	state_desc	descriptor for next state
341   @param[in]	new_state	next state to move to
342   @param[in]	cbk		alert callback for long wait
343   @param[out]	num_wait	unfinished tasks in current state
344   @return error code */
345   int change_state(Clone_Task *task, Clone_Desc_State *state_desc,
346                    Snapshot_State new_state, Clone_Alert_Func cbk,
347                    uint &num_wait);
348 
349   /** Check if state transition is over and all tasks moved to next state
350   @param[in]	task		requesting task
351   @param[in]	new_state	next state to move to
352   @param[in]	exit_on_wait	exit from transition if needs to wait
353   @param[in]	in_err		input error if already occurred
354   @param[out]	num_wait	number of tasks to move to next state
355   @return error code */
356   int check_state(Clone_Task *task, Snapshot_State new_state, bool exit_on_wait,
357                   int in_err, uint32_t &num_wait);
358 
359   /** Check if needs to send state metadata once
360   @param[in]	task	current task
361   @return true if needs to send state metadata */
is_restart_metadata(Clone_Task * task)362   bool is_restart_metadata(Clone_Task *task) {
363     if (task->m_is_master && m_send_state_meta) {
364       m_send_state_meta = false;
365       return (true);
366     }
367 
368     return (false);
369   }
370 
371   /** @return true if file metadata is transferred */
is_file_metadata_transferred()372   bool is_file_metadata_transferred() const {
373     return (m_transferred_file_meta);
374   }
375 
376   /** Set sub-state: all file metadata is transferred */
set_file_meta_transferred()377   void set_file_meta_transferred() { m_transferred_file_meta = true; }
378 
379   /** Mark state finished for current task
380   @param[in]	task	current task
381   @return error code */
382   int finish_state(Clone_Task *task);
383 
384   /** Set acknowledged state
385   @param[in]	state_desc	State descriptor */
386   void ack_state(const Clone_Desc_State *state_desc);
387 
388   /** Wait for acknowledgement
389   @param[in]	clone		parent clone handle
390   @param[in]	task		current task
391   @param[in]	callback	user callback interface
392   @return error code */
393   int wait_ack(Clone_Handle *clone, Clone_Task *task, Ha_clone_cbk *callback);
394 
395   /** Check if state ACK is needed
396   @param[in]	state_desc	State descriptor
397   @return true if need to wait for ACK from remote */
check_ack(const Clone_Desc_State * state_desc)398   bool check_ack(const Clone_Desc_State *state_desc) {
399     bool ret = true;
400 
401     mutex_enter(&m_state_mutex);
402 
403     /* Check if state is already acknowledged */
404     if (m_ack_state == state_desc->m_state) {
405       ut_ad(m_restart_count > 0);
406       ret = false;
407       ++m_num_tasks_finished;
408     }
409 
410     mutex_exit(&m_state_mutex);
411 
412     return (ret);
413   }
414 
415   /** Check if clone is restarted after failure
416   @return true if restarted */
is_restarted()417   bool is_restarted() { return (m_restart_count > 0); }
418 
419   /** Allocate buffers for current task
420   @param[in,out]	task	current task
421   @return error code */
422   int alloc_buffer(Clone_Task *task);
423 
424 #ifdef UNIV_DEBUG
425   /** Wait during clone operation
426   @param[in]	chunk_num	chunk number to process
427   @param[in]	task		current task */
428   void debug_wait(uint chunk_num, Clone_Task *task);
429 
430   /** Force restart clone operation by raising network error
431   @param[in]	task		current task
432   @param[in]	in_err		any err that has occurred
433   @param[in]	restart_count	restart counter
434   @return error code */
435   int debug_restart(Clone_Task *task, int in_err, int restart_count);
436 #endif /* UNIV_DEBUG */
437 
438  private:
439   /** Check if we need to wait before adding current task
440   @param[in]	ref_loc	reference locator from remote
441   @param[in]	loc_len	reference locator length
442   @return true, if needs to wait */
443   bool wait_before_add(const byte *ref_loc, uint loc_len);
444 
445  private:
446   /** Check if network error
447   @param[in]	err	error code
448   @return true if network error */
is_network_error(int err)449   bool is_network_error(int err) {
450     if (err == ER_NET_ERROR_ON_WRITE || err == ER_NET_READ_ERROR ||
451         err == ER_NET_WRITE_INTERRUPTED || err == ER_NET_READ_INTERRUPTED ||
452         err == ER_NET_WAIT_ERROR) {
453       return (true);
454     }
455     return (false);
456   }
457 
458   /** Reserve free task from task manager and initialize
459   @param[in]	thd	server THD object
460   @param[out]	task_id	initialized task ID */
461   void reserve_task(THD *thd, uint &task_id);
462 
463   /** Check if we should process incomplete chunk next. Incomplete
464   chunks could be there after a re-start from network failure. We always
465   process the chunks in order and need to choose accordingly.
466   @return if need to process incomplete chunk next. */
process_inclomplete_chunk()467   inline bool process_inclomplete_chunk() {
468     /* 1. Check if there is any incomplete chunk. */
469     auto &chunks = m_chunk_info.m_incomplete_chunks;
470     if (chunks.empty()) {
471       return (false);
472     }
473 
474     /* 2. Check if all complete chunks are processed. */
475     auto min_complete_chunk = m_chunk_info.m_min_unres_chunk;
476     if (min_complete_chunk > m_chunk_info.m_total_chunks) {
477       return (true);
478     }
479 
480     /* 3. Compare the minimum chunk number for complete and incomplete chunk */
481     auto it = chunks.begin();
482     auto min_incomplete_chunk = it->first;
483 
484     ut_ad(min_complete_chunk != min_incomplete_chunk);
485     return (min_incomplete_chunk < min_complete_chunk);
486   }
487 
488   /** Get next in complete chunk if any
489   @param[out]	block_num	first block number in chunk
490   @return incomplete chunk number */
491   uint32_t get_next_incomplete_chunk(uint32 &block_num);
492 
493   /** Get next unreserved chunk
494   @return chunk number */
495   uint32_t get_next_chunk();
496 
497  private:
498   /** Mutex synchronizing access by concurrent tasks */
499   ib_mutex_t m_state_mutex;
500 
501   /** Finished and incomplete chunk information */
502   Chunk_Info m_chunk_info;
503 
504   /** Clone task array */
505   Clone_Task m_clone_tasks[CLONE_MAX_TASKS];
506 
507   /** Current number of tasks */
508   uint m_num_tasks;
509 
510   /** Number of tasks finished current state */
511   uint m_num_tasks_finished;
512 
513   /** Number of tasks in transit state */
514   uint m_num_tasks_transit;
515 
516   /** Number of times clone is restarted */
517   uint m_restart_count;
518 
519   /** Acknowledged state from client */
520   Snapshot_State m_ack_state;
521 
522   /** Current state for clone */
523   Snapshot_State m_current_state;
524 
525   /** Next state: used during state transfer */
526   Snapshot_State m_next_state;
527 
528   /* Sub state: File metadata is transferred */
529   bool m_transferred_file_meta;
530 
531   /** Send state metadata before starting: Used for restart */
532   bool m_send_state_meta;
533 
534   /** Save any error raised by a task */
535   int m_saved_error;
536 
537   /** File name related to the saved error */
538   char *m_err_file_name;
539 
540   /** File name length */
541   size_t m_err_file_len;
542 
543   /** Attached snapshot handle */
544   Clone_Snapshot *m_clone_snapshot;
545 };
546 
547 /** Clone Handle for copying or applying data */
548 class Clone_Handle {
549  public:
550   /** Construct clone handle
551   @param[in]	handle_type	clone handle type
552   @param[in]	clone_version	clone version
553   @param[in]	clone_index	index in clone array */
554   Clone_Handle(Clone_Handle_Type handle_type, uint clone_version,
555                uint clone_index);
556 
557   /** Destructor: Detach from snapshot */
558   ~Clone_Handle();
559 
560   /** Initialize clone handle
561   @param[in]	ref_loc		reference locator
562   @param[in]	ref_len		reference locator length
563   @param[in]	type		clone type
564   @param[in]	data_dir	data directory for apply
565   @return error code */
566   int init(const byte *ref_loc, uint ref_len, Ha_clone_type type,
567            const char *data_dir);
568 
569   /** Attach to the clone handle */
attach()570   void attach() { ++m_ref_count; }
571 
572   /** Detach from the clone handle
573   @return reference count */
detach()574   uint detach() {
575     ut_a(m_ref_count > 0);
576     --m_ref_count;
577 
578     return (m_ref_count);
579   }
580 
581   /** Get locator for the clone handle.
582   @param[out]	loc_len	serialized locator length
583   @return serialized clone locator */
584   byte *get_locator(uint &loc_len);
585 
586   /** @return clone data directory */
get_datadir()587   const char *get_datadir() const { return (m_clone_dir); }
588 
589   /** @return true, if clone is replacing current data directory. */
replace_datadir()590   bool replace_datadir() const {
591     return (!is_copy_clone() && m_clone_dir == nullptr);
592   }
593 
594   /** Build locator descriptor for the clone handle
595   @param[out]	loc_desc	locator descriptor */
596   void build_descriptor(Clone_Desc_Locator *loc_desc);
597 
598   /** Add a task to clone handle
599   @param[in]	thd	server THD object
600   @param[in]	ref_loc	reference locator from remote
601   @param[in]	ref_len	reference locator length
602   @param[out]	task_id	task identifier
603   @return error code */
add_task(THD * thd,const byte * ref_loc,uint ref_len,uint & task_id)604   int add_task(THD *thd, const byte *ref_loc, uint ref_len, uint &task_id) {
605     return (m_clone_task_manager.add_task(thd, ref_loc, ref_len, task_id));
606   }
607 
608   /** Drop task from clone handle
609   @param[in]	thd		server THD object
610   @param[in]	task_id		current task ID
611   @param[in]	in_err		input error
612   @param[out]	is_master	true, if master task
613   @return true if needs to wait for re-start */
614   bool drop_task(THD *thd, uint task_id, int in_err, bool &is_master);
615 
616   /** Save current error number
617   @param[in]	err	error number */
save_error(int err)618   void save_error(int err) {
619     if (err != 0) {
620       m_clone_task_manager.set_error(err, nullptr);
621     }
622   }
623 
624   /** Check for error from other tasks and DDL
625   @param[in,out]	thd	session THD
626   @return error code */
check_error(THD * thd)627   int check_error(THD *thd) {
628     bool has_thd = (thd != nullptr);
629     auto err = m_clone_task_manager.handle_error_other_task(has_thd);
630     /* Save any error reported */
631     save_error(err);
632     return (err);
633   }
634 
635   /** @return true if any task is interrupted */
is_interrupted()636   bool is_interrupted() {
637     auto err = m_clone_task_manager.handle_error_other_task(false);
638     return (err == ER_QUERY_INTERRUPTED);
639   }
640 
641   /** Get clone handle index in clone array
642   @return array index */
get_index()643   uint get_index() { return (m_clone_arr_index); }
644 
645   /** Get clone data descriptor version
646   @return version */
get_version()647   uint get_version() { return (m_clone_desc_version); }
648 
649   /** Check if it is copy clone
650   @return true if copy clone handle */
is_copy_clone()651   bool is_copy_clone() const { return (m_clone_handle_type == CLONE_HDL_COPY); }
652 
653   /** Check if clone type matches
654   @param[in]	other_handle_type	type to match with
655   @return true if type matches with clone handle type */
match_hdl_type(Clone_Handle_Type other_handle_type)656   bool match_hdl_type(Clone_Handle_Type other_handle_type) {
657     return (m_clone_handle_type == other_handle_type);
658   }
659 
660   /** Set current clone state
661   @param[in]	state	clone handle state */
set_state(Clone_Handle_State state)662   void set_state(Clone_Handle_State state) { m_clone_handle_state = state; }
663 
664   /** Check if clone state is active
665   @return true if in active state */
is_active()666   bool is_active() { return (m_clone_handle_state == CLONE_STATE_ACTIVE); }
667 
668   /** Check if clone is initialized
669   @return true if in initial state */
is_init()670   bool is_init() { return (m_clone_handle_state == CLONE_STATE_INIT); }
671 
672   /** Check if clone is idle waiting for restart
673   @return true if clone is in idle state */
is_idle()674   bool is_idle() { return (m_clone_handle_state == CLONE_STATE_IDLE); }
675 
676   /** Check if clone is aborted
677   @return true if clone is aborted */
is_abort()678   bool is_abort() { return (m_clone_handle_state == CLONE_STATE_ABORT); }
679 
680   /** Restart copy after a network failure
681   @param[in]	thd	server THD object
  @param[in]	loc	locator with copy state from remote client
683   @param[in]	loc_len	locator length in bytes
684   @return error code */
685   int restart_copy(THD *thd, const byte *loc, uint loc_len);
686 
687   /** Build locator with current state and restart apply
688   @param[in]	thd	server THD object
  @param[in,out]	loc	locator with current state information
690   @param[in,out]	loc_len	locator length in bytes
691   @return error code */
692   int restart_apply(THD *thd, const byte *&loc, uint &loc_len);
693 
694   /** Transfer snapshot data via callback
695   @param[in]	thd		server THD object
696   @param[in]	task_id		current task ID
697   @param[in]	callback	user callback interface
698   @return error code */
699   int copy(THD *thd, uint task_id, Ha_clone_cbk *callback);
700 
701   /** Apply snapshot data received via callback
702   @param[in]	thd		server THD
703   @param[in]	task_id		current task ID
704   @param[in]	callback	user callback interface
705   @return error code */
706   int apply(THD *thd, uint task_id, Ha_clone_cbk *callback);
707 
  /** Send keep alive during long wait
709   @param[in]	task		task that is sending the information
710   @param[in]	callback	callback interface
711   @return error code */
712   int send_keep_alive(Clone_Task *task, Ha_clone_cbk *callback);
713 
714  private:
715   /** Check if enough space is there to clone.
716   @return error if not enough space */
717   int check_space();
718 
719   /** Create clone data directory.
720   @return error code */
721   int create_clone_directory();
722 
723   /** Display clone progress
724   @param[in]	cur_chunk	current chunk number
725   @param[in]	max_chunk	total number of chunks
726   @param[in,out]	percent_done	percentage completed
727   @param[in,out]	disp_time	last displayed time */
728   void display_progress(uint32_t cur_chunk, uint32_t max_chunk,
729                         uint32_t &percent_done,
730                         ib_time_monotonic_ms_t &disp_time);
731 
732   /** Open file for the task
733   @param[in]	task		clone task
734   @param[in]	file_meta	file information
735   @param[in]	file_type	file type (data, log etc.)
736   @param[in]	create_file	create if not present
737   @param[in]	set_and_close	set size and close
738   @return error code */
739   int open_file(Clone_Task *task, Clone_File_Meta *file_meta, ulint file_type,
740                 bool create_file, bool set_and_close);
741 
742   /** Close file for the task
743   @param[in]	task	clone task
744   @return error code */
745   int close_file(Clone_Task *task);
746 
747   /** Callback providing the file reference and data length to copy
748   @param[in]	cbk	callback interface
749   @param[in]	task	clone task
750   @param[in]	len	data length
751   @param[in]	buf_cbk	invoke buffer callback
752   @param[in]	offset	file offset
753   @param[in]	name	file name where func invoked
754   @param[in]	line	line where the func invoked
755   @return error code */
756   int file_callback(Ha_clone_cbk *cbk, Clone_Task *task, uint len, bool buf_cbk,
757                     uint64_t offset
758 #ifdef UNIV_PFS_IO
759                     ,
760                     const char *name, uint line
761 #endif /* UNIV_PFS_IO */
762   );
763 
764   /** Move to next state
765   @param[in]	task		clone task
766   @param[in]	callback	callback interface
767   @param[in]	state_desc	descriptor for next state to move to
768   @return error code */
769   int move_to_next_state(Clone_Task *task, Ha_clone_cbk *callback,
770                          Clone_Desc_State *state_desc);
771 
772   /** Send current state information via callback
773   @param[in]	task		task that is sending the information
774   @param[in]	callback	callback interface
775   @param[in]	is_start	if it is the start of current state
776   @return error code */
777   int send_state_metadata(Clone_Task *task, Ha_clone_cbk *callback,
778                           bool is_start);
779 
780   /** Send current task information via callback
781   @param[in]	task		task that is sending the information
782   @param[in]	callback	callback interface
783   @return error code */
784   int send_task_metadata(Clone_Task *task, Ha_clone_cbk *callback);
785 
786   /** Send all file information via callback
787   @param[in]	task		task that is sending the information
788   @param[in]	callback	callback interface
789   @return error code */
790   int send_all_file_metadata(Clone_Task *task, Ha_clone_cbk *callback);
791 
792   /** Send current file information via callback
793   @param[in]	task		task that is sending the information
794   @param[in]	file_meta	file meta information
795   @param[in]	callback	callback interface
796   @return error code */
797   int send_file_metadata(Clone_Task *task, Clone_File_Meta *file_meta,
798                          Ha_clone_cbk *callback);
799 
800   /** Send cloned data via callback
801   @param[in]	task		task that is sending the information
802   @param[in]	file_meta	file information
803   @param[in]	offset		file offset
804   @param[in]	buffer		data buffer or NULL if send from file
805   @param[in]	size		data buffer size
806   @param[in]	callback	callback interface
807   @return error code */
808   int send_data(Clone_Task *task, Clone_File_Meta *file_meta,
809                 ib_uint64_t offset, byte *buffer, uint size,
810                 Ha_clone_cbk *callback);
811 
812   /** Process a data chunk and send data blocks via callback
813   @param[in]	task		task that is sending the information
814   @param[in]	chunk_num	chunk number to process
815   @param[in]	block_num	start block number
816   @param[in]	callback	callback interface
817   @return error code */
818   int process_chunk(Clone_Task *task, uint32_t chunk_num, uint32_t block_num,
819                     Ha_clone_cbk *callback);
820 
821   /** Create apply task based on task metadata in callback
822   @param[in]	task		current task
823   @param[in]	callback	callback interface
824   @return error code */
825   int apply_task_metadata(Clone_Task *task, Ha_clone_cbk *callback);
826 
827   /** Move to next state based on state metadata and set
828   state information
829   @param[in]		task		current task
830   @param[in,out]	callback	callback interface
831   @param[in,out]	state_desc	clone state descriptor
832   @return error code */
833   int ack_state_metadata(Clone_Task *task, Ha_clone_cbk *callback,
834                          Clone_Desc_State *state_desc);
835 
836   /** Notify state change via callback.
837   @param[in]		task		current task
838   @param[in,out]	callback	callback interface
839   @param[in,out]	state_desc	clone state descriptor */
840   void notify_state_change(Clone_Task *task, Ha_clone_cbk *callback,
841                            Clone_Desc_State *state_desc);
842 
843   /** Move to next state based on state metadata and set
844   state information
845   @param[in]	task		current task
846   @param[in]	callback	callback interface
847   @return error code */
848   int apply_state_metadata(Clone_Task *task, Ha_clone_cbk *callback);
849 
850   /** Create file metadata based on callback
851   @param[in]	task		current task
852   @param[in]	callback	callback interface
853   @return error code */
854   int apply_file_metadata(Clone_Task *task, Ha_clone_cbk *callback);
855 
856   /** Apply the data received through the callback.
857   @param[in]	task		current task
858   @param[in]	callback	callback interface
859   @return error code */
860   int apply_data(Clone_Task *task, Ha_clone_cbk *callback);
861 
862   /** Receive data from the callback and apply it.
863   @param[in]	task		task that is receiving the information
864   @param[in]	offset		file offset at which the data is applied
865   @param[in]	file_size	updated file size
866   @param[in]	size		length of the data in bytes
867   @param[in]	callback	callback interface
868   @return error code */
869   int receive_data(Clone_Task *task, uint64_t offset, uint64_t file_size,
870                    uint32_t size, Ha_clone_cbk *callback);
871 
872   /** Punch holes for multiple pages during apply.
873   @param[in]	file		file descriptor
874   @param[in]	buffer		data buffer (not modified: passed as const)
875   @param[in]	len		length of the data buffer
876   @param[in]	start_off	starting offset in the file
877   @param[in]	page_len	page length
878   @param[in]	block_size	file system block size
879   @return InnoDB error code (dberr_t) */
880   dberr_t punch_holes(os_file_t file, const byte *buffer, uint32_t len,
881                       uint64_t start_off, uint32_t page_len,
882                       uint32_t block_size);
883 
884   /** Modify page encryption attribute and/or punch hole for applied data.
885   @param[in]		task	task that is applying data
886   @param[in]		offset	file offset for applying data
887   @param[in,out]	buffer	data to apply; may be modified by this call
888   @param[in]		buf_len	length of the data buffer
889   @return error code */
890   int modify_and_write(const Clone_Task *task, uint64_t offset,
891                        unsigned char *buffer, uint32_t buf_len);
892 
893  private:
894   /** Clone handle type: Copy or Apply */
895   Clone_Handle_Type m_clone_handle_type;
896 
897   /** State of the clone handle */
898   Clone_Handle_State m_clone_handle_state;
899 
900   /** Fixed locator of CLONE_DESC_MAX_BASE_LEN bytes for version negotiation. */
901   byte m_version_locator[CLONE_DESC_MAX_BASE_LEN];
902 
903   /** Serialized locator */
904   byte *m_clone_locator;
905 
906   /** Locator length in bytes (presumably of m_clone_locator - TODO confirm) */
907   uint m_locator_length;
908 
909   /** Serialized restart locator */
910   byte *m_restart_loc;
911 
912   /** Restart locator length in bytes */
913   uint m_restart_loc_len;
914 
915   /** Clone descriptor version in use */
916   uint m_clone_desc_version;
917 
918   /** Index in global array (presumably the Clone_Sys handle array - confirm) */
919   uint m_clone_arr_index;
920 
921   /** Unique clone identifier */
922   ib_uint64_t m_clone_id;
923 
924   /** Reference count */
925   uint m_ref_count;
926 
927   /** Whether the clone operation may be restarted after a network failure */
928   bool m_allow_restart;
929 
930   /** Clone data directory */
931   const char *m_clone_dir;
932 
933   /** Clone task manager */
934   Clone_Task_Manager m_clone_task_manager;
935 };
936 
937 /** Clone System */
938 class Clone_Sys {
939  public:
940   /** Construct clone system */
941   Clone_Sys();
942 
943   /** Destructor: Call during system shutdown */
944   ~Clone_Sys();
945 
946   /** Create and add a new clone handle to clone system
947   @param[in]	loc		locator
948   @param[in]	hdl_type	handle type
949   @param[out]	clone_hdl	clone handle
950   @return error code */
951   int add_clone(const byte *loc, Clone_Handle_Type hdl_type,
952                 Clone_Handle *&clone_hdl);
953 
954   /** drop a clone handle from clone system
955   @param[in]	clone_handle	Clone handle */
956   void drop_clone(Clone_Handle *clone_handle);
957 
958   /** Find if a clone is already running for the reference locator
959   @param[in]	ref_loc		reference locator
960   @param[in]	loc_len		reference locator length
961   @param[in]	hdl_type	clone type
962   @return clone handle if found, NULL otherwise */
963   Clone_Handle *find_clone(const byte *ref_loc, uint loc_len,
964                            Clone_Handle_Type hdl_type);
965 
966   /** Get the clone handle from locator by index
967   @param[in]	loc	locator
968   @param[in]	loc_len	locator length in bytes
969   @return clone handle */
970   Clone_Handle *get_clone_by_index(const byte *loc, uint loc_len);
971 
972   /** Get or create a snapshot for clone and attach
973   @param[in]	hdl_type	handle type
974   @param[in]	clone_type	clone type
975   @param[in]	snapshot_id	snapshot identifier
976   @param[in]	is_pfs_monitor	true, if needs PFS monitoring
977   @param[out]	snapshot	clone snapshot
978   @return error code */
979   int attach_snapshot(Clone_Handle_Type hdl_type, Ha_clone_type clone_type,
980                       ib_uint64_t snapshot_id, bool is_pfs_monitor,
981                       Clone_Snapshot *&snapshot);
982 
983   /** Detach clone handle from snapshot
984   @param[in]	snapshot	snapshot
985   @param[in]	hdl_type	handle type */
986   void detach_snapshot(Clone_Snapshot *snapshot, Clone_Handle_Type hdl_type);
987 
988   /** Mark clone state to abort if no active clone. If force is set,
989   abort all active clones and set state to abort.
990   @param[in]	force	force active clones to abort
991   @return true if global state is set to abort successfully */
992   bool mark_abort(bool force);
993 
994   /** Mark clone state to active if no other abort request */
995   void mark_active();
996 
997   /** Mark to indicate that new clone operations should wait.
998   @return true, if no active clone and mark is set successfully */
999   bool mark_wait();
1000 
1001   /** Free the wait marker. */
1002   void mark_free();
1003 
1004   /** Wait for marker to get freed.
1005   @param[in,out]	thd	user session
1006   @return, error if timeout */
1007   int wait_for_free(THD *thd);
1008 
1009   /** Get next unique ID
1010   @return unique ID */
1011   ib_uint64_t get_next_id();
1012 
1013   /** Get clone sys mutex
1014   @return clone system mutex */
get_mutex()1015   ib_mutex_t *get_mutex() { return (&m_clone_sys_mutex); }
1016 
1017   /** Clone System state */
1018   static Clone_Sys_State s_clone_sys_state;
1019 
1020   /** Number of active abort requests */
1021   static uint s_clone_abort_count;
1022 
1023   /** Number of active wait requests */
1024   static uint s_clone_wait_count;
1025 
1026   /** Function to check wait condition
1027   @param[in]	is_alert	print alert message
1028   @param[out]	result		true, if condition is satisfied
1029   @return error code */
1030   using Wait_Cond_Cbk_Func = std::function<int(bool is_alert, bool &result)>;
1031 
1032   /** Wait till the condition is satisfied or timeout.
1033   @param[in]	sleep_time	sleep time in milliseconds
1034   @param[in]	timeout		total time to wait in seconds
1035   @param[in]	alert_interval	alert interval in seconds
1036   @param[in]	func		callback function for condition check
1037   @param[in]	mutex		release during sleep and re-acquire
1038   @param[out]	is_timeout	true if timeout
1039   @return error code returned by callback function. */
wait(Clone_Msec sleep_time,Clone_Sec timeout,Clone_Sec alert_interval,Wait_Cond_Cbk_Func && func,ib_mutex_t * mutex,bool & is_timeout)1040   static int wait(Clone_Msec sleep_time, Clone_Sec timeout,
1041                   Clone_Sec alert_interval, Wait_Cond_Cbk_Func &&func,
1042                   ib_mutex_t *mutex, bool &is_timeout) {
1043     int err = 0;
1044     bool wait = true;
1045     is_timeout = false;
1046 
1047     int loop_count = 0;
1048     auto alert_count = static_cast<int>(alert_interval / sleep_time);
1049     auto total_count = static_cast<int>(timeout / sleep_time);
1050 
1051     /* Call function once before waiting. */
1052     err = func(false, wait);
1053 
1054     while (!is_timeout && wait && err == 0) {
1055       ++loop_count;
1056 
1057       /* Release input mutex */
1058       if (mutex != nullptr) {
1059         ut_ad(mutex_own(mutex));
1060         mutex_exit(mutex);
1061       }
1062 
1063       std::this_thread::sleep_for(sleep_time);
1064 
1065       /* Acquire input mutex back */
1066       if (mutex != nullptr) {
1067         mutex_enter(mutex);
1068       }
1069 
1070       auto alert = (alert_count > 0) ? (loop_count % alert_count == 0) : true;
1071 
1072       err = func(alert, wait);
1073 
1074       is_timeout = (loop_count > total_count);
1075     }
1076     return (err);
1077   }
1078 
1079   /** Wait till the condition is satisfied or default timeout.
1080   @param[in]	func		callback function for condition check
1081   @param[in]	mutex		release during sleep and re-acquire
1082   @param[out]	is_timeout	true if timeout
1083   @return error code returned by callback function. */
wait_default(Wait_Cond_Cbk_Func && func,ib_mutex_t * mutex,bool & is_timeout)1084   static int wait_default(Wait_Cond_Cbk_Func &&func, ib_mutex_t *mutex,
1085                           bool &is_timeout) {
1086     return (wait(CLONE_DEF_SLEEP, Clone_Sec(CLONE_DEF_TIMEOUT),
1087                  CLONE_DEF_ALERT_INTERVAL,
1088                  std::forward<Wait_Cond_Cbk_Func>(func), mutex, is_timeout));
1089   }
1090 
1091   /** Check if any active clone is running.
1092   @param[in]	print_alert	print alert message
1093   @return true, if concurrent clone in progress */
1094   bool check_active_clone(bool print_alert);
1095 
1096   /** @return GTID persistor */
get_gtid_persistor()1097   Clone_persist_gtid &get_gtid_persistor() { return (m_gtid_persister); }
1098 
1099  private:
1100   /** Find free index to allocate new clone handle.
1101   @param[in]	hdl_type	clone handle type
1102   @param[out]	free_index	free index in array
1103   @return error code */
1104   int find_free_index(Clone_Handle_Type hdl_type, uint &free_index);
1105 
1106  private:
1107   /** Array of clone handles */
1108   Clone_Handle *m_clone_arr[CLONE_ARR_SIZE];
1109 
1110   /** Number of copy clones */
1111   uint m_num_clones;
1112 
1113   /** Number of apply clones */
1114   uint m_num_apply_clones;
1115 
1116   /** Array of clone snapshots */
1117   Clone_Snapshot *m_snapshot_arr[SNAPSHOT_ARR_SIZE];
1118 
1119   /** Number of copy snapshots */
1120   uint m_num_snapshots;
1121 
1122   /** Number of apply snapshots */
1123   uint m_num_apply_snapshots;
1124 
1125   /** Clone system mutex */
1126   ib_mutex_t m_clone_sys_mutex;
1127 
1128   /** Clone unique ID generator */
1129   ib_uint64_t m_clone_id_generator;
1130 
1131   /** GTID persister */
1132   Clone_persist_gtid m_gtid_persister;
1133 };
1134 
1135 /** Global clone system pointer; only declared here (extern). */
1136 extern Clone_Sys *clone_sys;
1137 
1138 #endif /* CLONE_CLONE_INCLUDE */
1139